Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 83 additions & 88 deletions example/AsyncProducer.cpp
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -36,105 +36,100 @@ SendCallback* g_callback = NULL;
TpsReportService g_tps;

class MySendCallback : public SendCallback {
virtual void onSuccess(SendResult& sendResult) {
g_msgCount--;
g_tps.Increment();
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
virtual void onSuccess(SendResult& sendResult) {
g_msgCount--;
g_tps.Increment();
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
}
}
}
virtual void onException(MQException& e) { cout << "send Exception\n"; }
virtual void onException(MQException& e) { cout << "send Exception\n"; }
};

class MyAutoDeleteSendCallback : public AutoDeleteSendCallBack {
public:
virtual ~MyAutoDeleteSendCallback() {}
virtual void onSuccess(SendResult& sendResult) {
g_msgCount--;
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
public:
virtual ~MyAutoDeleteSendCallback() {}
virtual void onSuccess(SendResult& sendResult) {
g_msgCount--;
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
}
}
}
virtual void onException(MQException& e) {
std::cout << "send Exception" << e << "\n";
}
virtual void onException(MQException& e) { std::cout << "send Exception" << e << "\n"; }
};

void AsyncProducerWorker(RocketmqSendAndConsumerArgs* info,
DefaultMQProducer* producer) {
while (!g_quit.load()) {
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
void AsyncProducerWorker(RocketmqSendAndConsumerArgs* info, DefaultMQProducer* producer) {
while (!g_quit.load()) {
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
}
MQMessage msg(info->topic, // topic
"*", // tag
info->body); // body

if (info->IsAutoDeleteSendCallback) {
g_callback = new MyAutoDeleteSendCallback(); // auto delete
}

try {
producer->send(msg, g_callback);
}
catch (MQException& e) {
std::cout << e << endl; // if catch excepiton , need re-send this msg by
// service
}
}
MQMessage msg(info->topic, // topic
"*", // tag
info->body); // body
}

if (info->IsAutoDeleteSendCallback) {
g_callback = new MyAutoDeleteSendCallback(); // auto delete
int main(int argc, char* argv[]) {
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
}

try {
producer->send(msg, g_callback);
} catch (MQException& e) {
std::cout << e << endl; // if catch excepiton , need re-send this msg by
// service
DefaultMQProducer producer("please_rename_unique_group_name");
if (!info.IsAutoDeleteSendCallback) {
g_callback = new MySendCallback();
}
}
}

int main(int argc, char* argv[]) {
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
}

DefaultMQProducer producer("please_rename_unique_group_name");
if (!info.IsAutoDeleteSendCallback) {
g_callback = new MySendCallback();
}

PrintRocketmqSendAndConsumerArgs(info);

if (!info.namesrv.empty()) producer.setNamesrvAddr(info.namesrv);

producer.setGroupName(info.groupname);
producer.setInstanceName(info.groupname);
producer.setNamesrvDomain(info.namesrv_domain);
producer.start();
g_tps.start();
std::vector<std::shared_ptr<std::thread>> work_pool;
auto start = std::chrono::system_clock::now();
int msgcount = g_msgCount.load();
for (int j = 0; j < info.thread_count; j++) {
std::shared_ptr<std::thread> th =
std::make_shared<std::thread>(AsyncProducerWorker, &info, &producer);
work_pool.push_back(th);
}

{
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.wait(lck);
g_quit.store(true);
}

auto end = std::chrono::system_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

std::cout
<< "per msg time: " << duration.count() / (double)msgcount << "ms \n"
<< "========================finished==============================\n";

producer.shutdown();
for (size_t th = 0; th != work_pool.size(); ++th) {
work_pool[th]->join();
}
if (!info.IsAutoDeleteSendCallback) {
delete g_callback;
}
return 0;
PrintRocketmqSendAndConsumerArgs(info);

if (!info.namesrv.empty()) producer.setNamesrvAddr(info.namesrv);

producer.setGroupName(info.groupname);
producer.setInstanceName(info.groupname);
producer.setNamesrvDomain(info.namesrv_domain);
producer.start();
g_tps.start();
std::vector<std::shared_ptr<std::thread>> work_pool;
auto start = std::chrono::system_clock::now();
int msgcount = g_msgCount.load();
for (int j = 0; j < info.thread_count; j++) {
std::shared_ptr<std::thread> th = std::make_shared<std::thread>(AsyncProducerWorker, &info, &producer);
work_pool.push_back(th);
}

{
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.wait(lck);
g_quit.store(true);
}

auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
<< "========================finished==============================\n";

producer.shutdown();
for (size_t th = 0; th != work_pool.size(); ++th) {
work_pool[th]->join();
}
if (!info.IsAutoDeleteSendCallback) {
delete g_callback;
}
return 0;
}
138 changes: 70 additions & 68 deletions example/AsyncPushConsumer.cpp
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -33,79 +33,81 @@ TpsReportService g_tps;
using namespace rocketmq;

class MyMsgListener : public MessageListenerConcurrently {
public:
MyMsgListener() {}
virtual ~MyMsgListener() {}

virtual ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
g_msgCount.store(g_msgCount.load() - msgs.size());
for (size_t i = 0; i < msgs.size(); ++i) {
g_tps.Increment();
public:
MyMsgListener() {}
virtual ~MyMsgListener() {}

virtual ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
g_msgCount.store(g_msgCount.load() - msgs.size());
for (size_t i = 0; i < msgs.size(); ++i) {
g_tps.Increment();
}

if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
}
return CONSUME_SUCCESS;
}

if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
}
return CONSUME_SUCCESS;
}
};

int main(int argc, char *argv[]) {
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
}
PrintRocketmqSendAndConsumerArgs(info);
DefaultMQPushConsumer consumer("please_rename_unique_group_name");
DefaultMQProducer producer("please_rename_unique_group_name");

producer.setNamesrvAddr(info.namesrv);
producer.setGroupName("msg-persist-group_producer_sandbox");
producer.setNamesrvDomain(info.namesrv_domain);
producer.start();

consumer.setNamesrvAddr(info.namesrv);
consumer.setGroupName(info.groupname);
consumer.setNamesrvDomain(info.namesrv_domain);
consumer.setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET);

consumer.setInstanceName(info.groupname);

consumer.subscribe(info.topic, "*");
consumer.setConsumeThreadCount(15);
consumer.setTcpTransportTryLockTimeout(1000);
consumer.setTcpTransportConnectTimeout(400);

MyMsgListener msglistener;
consumer.registerMessageListener(&msglistener);

try {
consumer.start();
} catch (MQClientException &e) {
cout << e << endl;
}
g_tps.start();

int msgcount = g_msgCount.load();
for (int i = 0; i < msgcount; ++i) {
MQMessage msg(info.topic, // topic
"*", // tag
info.body); // body
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
}
PrintRocketmqSendAndConsumerArgs(info);
DefaultMQPushConsumer consumer("please_rename_unique_group_name");
DefaultMQProducer producer("please_rename_unique_group_name");

producer.setNamesrvAddr(info.namesrv);
producer.setGroupName("msg-persist-group_producer_sandbox");
producer.setNamesrvDomain(info.namesrv_domain);
producer.start();

consumer.setNamesrvAddr(info.namesrv);
consumer.setGroupName(info.groupname);
consumer.setNamesrvDomain(info.namesrv_domain);
consumer.setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET);

consumer.setInstanceName(info.groupname);

consumer.subscribe(info.topic, "*");
consumer.setConsumeThreadCount(15);
consumer.setTcpTransportTryLockTimeout(1000);
consumer.setTcpTransportConnectTimeout(400);

MyMsgListener msglistener;
consumer.registerMessageListener(&msglistener);

try {
producer.send(msg);
} catch (MQException &e) {
std::cout << e << endl; // if catch excepiton , need re-send this msg by
// service
consumer.start();
}
catch (MQClientException &e) {
cout << e << endl;
}
g_tps.start();

int msgcount = g_msgCount.load();
for (int i = 0; i < msgcount; ++i) {
MQMessage msg(info.topic, // topic
"*", // tag
info.body); // body

try {
producer.send(msg);
}
catch (MQException &e) {
std::cout << e << endl; // if catch excepiton , need re-send this msg by
// service
}
}

{
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.wait(lck);
}
}

{
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.wait(lck);
}
producer.shutdown();
consumer.shutdown();
return 0;
producer.shutdown();
consumer.shutdown();
return 0;
}
24 changes: 10 additions & 14 deletions example/BatchProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ std::mutex g_mtx;
std::condition_variable g_finished;
TpsReportService g_tps;

void SyncProducerWorker(RocketmqSendAndConsumerArgs* info,
DefaultMQProducer* producer) {
void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, DefaultMQProducer* producer) {
while (!g_quit.load()) {
if (g_msgCount.load() <= 0) {
std::unique_lock<std::mutex> lck(g_mtx);
Expand All @@ -62,13 +61,13 @@ void SyncProducerWorker(RocketmqSendAndConsumerArgs* info,
g_tps.Increment();
--g_msgCount;
auto end = std::chrono::system_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
if (duration.count() >= 500) {
std::cout << "send RT more than: " << duration.count()
<< " ms with msgid: " << sendResult.getMsgId() << endl;
std::cout << "send RT more than: " << duration.count() << " ms with msgid: " << sendResult.getMsgId()
<< endl;
}
} catch (const MQException& e) {
}
catch (const MQException& e) {
std::cout << "send failed: " << e.what() << std::endl;
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
Expand Down Expand Up @@ -101,8 +100,7 @@ int main(int argc, char* argv[]) {

int threadCount = info.thread_count;
for (int j = 0; j < threadCount; j++) {
std::shared_ptr<std::thread> th =
std::make_shared<std::thread>(SyncProducerWorker, &info, &producer);
std::shared_ptr<std::thread> th = std::make_shared<std::thread>(SyncProducerWorker, &info, &producer);
work_pool.push_back(th);
}

Expand All @@ -113,12 +111,10 @@ int main(int argc, char* argv[]) {
}

auto end = std::chrono::system_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

std::cout
<< "per msg time: " << duration.count() / (double)msgcount << "ms \n"
<< "========================finished==============================\n";
std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
<< "========================finished==============================\n";

for (size_t th = 0; th != work_pool.size(); ++th) {
work_pool[th]->join();
Expand Down
Loading