ActiveMQ生產者發送資料第二次開始阻塞,第一次不阻塞,沒有啟動消費者,型別:topic。
呼叫方式:
lz_producer* topic_producer_ = new lz_producer ("failover://(tcp://127.0.0.1:61616)","test.policy",true,false);
頭檔案
class lz_producer {
private:
Session* session_;
Connection* connection_;
Destination* destination_;
MessageProducer* producer_;
std::string dest_uri_;
std::string broker_uri_;
bool use_topic_;
bool client_ack_;
private:
void cleanup(void);
public:
lz_producer(const std::string& broker_uri,const std::string& dest_uri,
bool use_topic = false, bool client_ack = false);
~lz_producer(void);
public:
void initializer(void);
bool send_message(const std::string& content);
bool send_message(const std::string& content, const int priority);
};CPP檔案
lz_producer::lz_producer(const std::string &broker_uri, const std::string &dest_uri,
bool use_topic, bool client_ack) {
session_ = NULL;
producer_ = NULL;
connection_ = NULL;
destination_ = NULL;
use_topic_ = use_topic;
client_ack_ = client_ack;
broker_uri_.assign(broker_uri);
dest_uri_.assign(dest_uri);
activemq::library::ActiveMQCPP::initializeLibrary();
LOG(INFO) << "lz_producer create, " << dest_uri_;
}
vanli::lz_producer::~lz_producer(void) {
cleanup();
activemq::library::ActiveMQCPP::shutdownLibrary();
}
void lz_producer::initializer(void) {
LOG(INFO) << "lz_producer::initializer, " << dest_uri_;
try {
// Create a ConnectionFact
std::auto_ptr<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(broker_uri_));
connectionFactory->setUseAsyncSend(true);
connection_ = connectionFactory->createConnection();
connection_->start();
if (client_ack_) {
LOG(INFO) << "Session::CLIENT_ACKNOWLEDGE";
session_ = connection_->createSession(Session::CLIENT_ACKNOWLEDGE);
} else {
//Session::AUTO_ACKNOWLEDGE
//Session::SESSION_TRANSACTED
LOG(INFO) << "Session::SESSION_TRANSACTED";
session_ = connection_->createSession(Session::SESSION_TRANSACTED);
}
if (use_topic_) {
destination_ = session_->createTopic(dest_uri_);
} else {
destination_ = session_->createQueue(dest_uri_);
}
producer_ = session_->createProducer(destination_);
producer_->setDeliveryMode(DeliveryMode::PERSISTENT);
}catch ( CMSException& e ) {
LOG(ERROR) << e.getMessage();
}
}
void lz_producer::cleanup(void) {
try {
if (destination_ != NULL) {
delete destination_;
}
} catch (CMSException &e) {
LOG(ERROR) << e.getMessage();
}
destination_ = NULL;
try {
if (producer_ != NULL) {
delete producer_;
}
} catch (CMSException &e) {
LOG(ERROR) << e.getMessage();
}
producer_ = NULL;
try {
if (session_ != NULL) {
session_->close();
}
if (connection_ != NULL) {
connection_->close();
}
} catch (CMSException &e) {
LOG(ERROR) << e.getMessage();
}
try {
if (session_ != NULL){
delete session_;
}
} catch (CMSException &e) {
LOG(ERROR) << e.getMessage();
}
session_ = NULL;
try {
if (connection_ != NULL) {
delete connection_;}
} catch (CMSException &e) {
LOG(ERROR) << e.getMessage();
}
connection_ = NULL;
}
bool lz_producer::send_message(const std::string &content) {
bool ret = false;
try {
std::auto_ptr<cms::TextMessage> textMessage(session_->createTextMessage());
textMessage->setText(content);
producer_->send(textMessage.get());
session_->commit();
ret = true;
} catch (CMSException &e) {
LOG(ERROR) << "send_message 1, error: " << e.getMessage();
}
return ret;
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/110306.html
標籤:應用程序開發區
