2727
2828namespace rocketmq {
2929// <!***************************************************************************
30- AsyncCallbackWrap::AsyncCallbackWrap (AsyncCallback* pAsyncCallback,
31- MQClientAPIImpl* pclientAPI)
32- : m_pAsyncCallBack(pAsyncCallback), m_pClientAPI(pclientAPI) {}
30+ AsyncCallbackWrap::AsyncCallbackWrap (AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI)
31+ : m_pAsyncCallBack(pAsyncCallback), m_pClientAPI(pClientAPI) {}
3332
3433AsyncCallbackWrap::~AsyncCallbackWrap () {
35- m_pAsyncCallBack = NULL ;
36- m_pClientAPI = NULL ;
34+ m_pAsyncCallBack = nullptr ;
35+ m_pClientAPI = nullptr ;
3736}
3837
3938// <!************************************************************************
40- SendCallbackWrap::SendCallbackWrap (const string& brokerName,
41- const MQMessage& msg,
42- AsyncCallback* pAsyncCallback,
43- MQClientAPIImpl* pclientAPI)
44- : AsyncCallbackWrap(pAsyncCallback, pclientAPI),
39+ SendCallbackWrap::SendCallbackWrap (
40+ const string &brokerName, const MQMessage &msg, AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI)
41+ : AsyncCallbackWrap(pAsyncCallback, pClientAPI),
4542 m_msg (msg),
4643 m_brokerName(brokerName) {}
4744
4845void SendCallbackWrap::onException () {
49- if (m_pAsyncCallBack == NULL ) return ;
46+ if (m_pAsyncCallBack == nullptr ) return ;
5047
51- SendCallback* pCallback = static_cast <SendCallback*>(m_pAsyncCallBack);
48+ auto * pCallback = static_cast <SendCallback *>(m_pAsyncCallBack);
5249 if (pCallback) {
5350 unique_ptr<MQException> exception (new MQException (
54- " send msg failed due to wait response timeout or network error" , -1 ,
55- __FILE__, __LINE__));
51+ " send msg failed due to wait response timeout or network error" , -1 , __FILE__, __LINE__));
5652 pCallback->onException (*exception);
5753 if (pCallback->getSendCallbackType () == autoDeleteSendCallback) {
5854 deleteAndZero (pCallback);
5955 }
6056 }
6157}
6258
63- void SendCallbackWrap::operationComplete (ResponseFuture* pResponseFuture,
64- bool bProducePullRequest) {
59+ void SendCallbackWrap::onComplete (ResponseFuture *pResponseFuture) {
6560 unique_ptr<RemotingCommand> pResponse (pResponseFuture->getCommand ());
6661
67- if (m_pAsyncCallBack == NULL ) {
62+ if (m_pAsyncCallBack == nullptr ) {
6863 return ;
6964 }
7065 int opaque = pResponseFuture->getOpaque ();
71- SendCallback* pCallback = static_cast <SendCallback*>(m_pAsyncCallBack);
66+ auto * pCallback = static_cast <SendCallback *>(m_pAsyncCallBack);
7267
7368 if (!pResponse) {
74- string err = " unknow reseaon " ;
69+ string err = " unknown reason " ;
7570 if (!pResponseFuture->isSendRequestOK ()) {
7671 err = " send request failed" ;
77-
7872 } else if (pResponseFuture->isTimeOut ()) {
79- // pResponseFuture->setAsyncResponseFlag();
8073 err = " wait response timeout" ;
8174 }
8275 if (pCallback) {
@@ -86,44 +79,46 @@ void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture,
8679 LOG_ERROR (" send failed of:%d" , pResponseFuture->getOpaque ());
8780 } else {
8881 try {
89- SendResult ret = m_pClientAPI->processSendResponse (m_brokerName, m_msg, pResponse.get ());
90- if (pCallback) {
91- LOG_DEBUG (" operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d" , opaque, pResponseFuture->getMaxRetrySendTimes (), pResponseFuture->getRetrySendTimes ());
92- pCallback->onSuccess (ret);
93- }
94- } catch (MQException& e) {
95- LOG_ERROR (" operationComplete: processSendResponse exception: %s" , e.what ());
96-
97- // broker may return exception, need consider retry send
98- int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes ();
99- int retryTimes = pResponseFuture->getRetrySendTimes ();
100- if (pResponseFuture->getAsyncFlag () && retryTimes < maxRetryTimes && maxRetryTimes > 1 ) {
101-
102- int64 left_timeout_ms = pResponseFuture->leftTime ();
103- string brokerAddr = pResponseFuture->getBrokerAddr ();
104- const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand ();
105- retryTimes += 1 ;
106- LOG_WARN (" retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s" ,
107- opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data (), m_msg.toString ().data ());
108-
109- bool exception_flag = false ;
110- try {
111- m_pClientAPI->sendMessageAsync (pResponseFuture->getBrokerAddr (), m_brokerName, m_msg, (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes, retryTimes);
112- } catch (MQClientException& e) {
113- LOG_ERROR (" retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again" , e.what (), opaque, retryTimes, m_msg.toString ().data ());
114- exception_flag = true ;
115- }
116-
117- if (exception_flag == false ) {
118- return ; // send retry again, here need return
119- }
120- }
121-
122- if (pCallback) {
123- MQException exception (" process send response error" , -1 , __FILE__,
124- __LINE__);
125- pCallback->onException (exception);
126- }
82+ SendResult ret = m_pClientAPI->processSendResponse (m_brokerName, m_msg, pResponse.get ());
83+ if (pCallback) {
84+ LOG_DEBUG (" operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d" ,
85+ opaque, pResponseFuture->getMaxRetrySendTimes (), pResponseFuture->getRetrySendTimes ());
86+ pCallback->onSuccess (ret);
87+ }
88+ } catch (MQException &e) {
89+ LOG_ERROR (" operationComplete: processSendResponse exception: %s" , e.what ());
90+
91+ // broker may return exception, need consider retry send
92+ int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes ();
93+ int retryTimes = pResponseFuture->getRetrySendTimes ();
94+ if (pResponseFuture->getAsyncFlag () && retryTimes < maxRetryTimes && maxRetryTimes > 1 ) {
95+
96+ int64 left_timeout_ms = pResponseFuture->leftTime ();
97+ string brokerAddr = pResponseFuture->getBrokerAddr ();
98+ auto &requestCommand = const_cast <RemotingCommand &>(pResponseFuture->getRequestCommand ());
99+ retryTimes += 1 ;
100+ LOG_WARN (" retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s" ,
101+ opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data (), m_msg.toString ().data ());
102+
103+ bool exception_flag = false ;
104+ try {
105+ m_pClientAPI->sendMessageAsync (pResponseFuture->getBrokerAddr (), m_brokerName, m_msg, requestCommand,
106+ pCallback, left_timeout_ms, maxRetryTimes, retryTimes);
107+ } catch (MQClientException &e) {
108+ LOG_ERROR (" retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again" ,
109+ e.what (), opaque, retryTimes, m_msg.toString ().data ());
110+ exception_flag = true ;
111+ }
112+
113+ if (!exception_flag) {
114+ return ; // send retry again, here need return
115+ }
116+ }
117+
118+ if (pCallback) {
119+ MQException exception (" process send response error" , -1 , __FILE__, __LINE__);
120+ pCallback->onException (exception);
121+ }
127122 }
128123 }
129124 if (pCallback && pCallback->getSendCallbackType () == autoDeleteSendCallback) {
@@ -132,18 +127,17 @@ void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture,
132127}
133128
134129// <!************************************************************************
135- PullCallbackWarp::PullCallbackWarp (AsyncCallback* pAsyncCallback,
136- MQClientAPIImpl* pclientAPI, void * pArg)
137- : AsyncCallbackWrap(pAsyncCallback, pclientAPI) {
138- m_pArg = *static_cast <AsyncArg*>(pArg);
130+ PullCallbackWarp::PullCallbackWarp (AsyncCallback *pAsyncCallback, MQClientAPIImpl *pClientAPI, void *pArg)
131+ : AsyncCallbackWrap(pAsyncCallback, pClientAPI) {
132+ m_pArg = *static_cast <AsyncArg *>(pArg);
139133}
140134
141- PullCallbackWarp::~PullCallbackWarp () {}
135+ PullCallbackWarp::~PullCallbackWarp () = default ;
142136
143137void PullCallbackWarp::onException () {
144- if (m_pAsyncCallBack == NULL ) return ;
138+ if (m_pAsyncCallBack == nullptr ) return ;
145139
146- PullCallback* pCallback = static_cast <PullCallback*>(m_pAsyncCallBack);
140+ auto * pCallback = static_cast <PullCallback *>(m_pAsyncCallBack);
147141 if (pCallback) {
148142 MQException exception (" wait response timeout" , -1 , __FILE__, __LINE__);
149143 pCallback->onException (exception);
@@ -152,43 +146,42 @@ void PullCallbackWarp::onException() {
152146 }
153147}
154148
155- void PullCallbackWarp::operationComplete (ResponseFuture* pResponseFuture,
156- bool bProducePullRequest) {
149+ void PullCallbackWarp::onComplete (ResponseFuture *pResponseFuture) {
157150 unique_ptr<RemotingCommand> pResponse (pResponseFuture->getCommand ());
158- if (m_pAsyncCallBack == NULL ) {
151+ if (m_pAsyncCallBack == nullptr ) {
159152 LOG_ERROR (" m_pAsyncCallBack is NULL, AsyncPull could not continue" );
160153 return ;
161154 }
162- PullCallback* pCallback = static_cast <PullCallback*>(m_pAsyncCallBack);
155+ auto * pCallback = static_cast <PullCallback *>(m_pAsyncCallBack);
163156 if (!pResponse) {
164- string err = " unknow reseaon " ;
157+ string err = " unknown reason " ;
165158 if (!pResponseFuture->isSendRequestOK ()) {
166159 err = " send request failed" ;
167-
168160 } else if (pResponseFuture->isTimeOut ()) {
169- // pResponseFuture->setAsyncResponseFlag();
170161 err = " wait response timeout" ;
171162 }
172163 MQException exception (err, -1 , __FILE__, __LINE__);
173- LOG_ERROR (" Async pull exception of opaque:%d" ,
174- pResponseFuture->getOpaque ());
175- if (pCallback && bProducePullRequest) pCallback->onException (exception);
164+ LOG_ERROR (" Async pull exception of opaque:%d" , pResponseFuture->getOpaque ());
165+ if (pCallback) {
166+ pCallback->onException (exception);
167+ }
176168 } else {
177169 try {
178170 if (m_pArg.pPullWrapper ) {
179- unique_ptr<PullResult> pullResult (
180- m_pClientAPI->processPullResponse (pResponse.get ()));
181- PullResult result = m_pArg.pPullWrapper ->processPullResult (
182- m_pArg.mq , pullResult.get (), &m_pArg.subData );
183- if (pCallback)
184- pCallback->onSuccess (m_pArg.mq , result, bProducePullRequest);
171+ unique_ptr<PullResult> pullResult (m_pClientAPI->processPullResponse (pResponse.get ()));
172+ PullResult result = m_pArg.pPullWrapper ->processPullResult (m_pArg.mq , pullResult.get (), &m_pArg.subData );
173+ if (pCallback) {
174+ pCallback->onSuccess (m_pArg.mq , result, true );
175+ }
185176 } else {
186177 LOG_ERROR (" pPullWrapper had been destroyed with consumer" );
187178 }
188- } catch (MQException& e) {
179+ } catch (MQException & e) {
189180 LOG_ERROR (e.what ());
190181 MQException exception (" pullResult error" , -1 , __FILE__, __LINE__);
191- if (pCallback && bProducePullRequest) pCallback->onException (exception);
182+ if (pCallback) {
183+ pCallback->onException (exception);
184+ }
192185 }
193186 }
194187}
0 commit comments