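RocketMQ consists of NameServer, Broker, Producer, and Consumer. NameServer plays a role similar to the registry in an RPC framework, the Broker stores messages, and Producer and Consumer have no direct relationship: they interact only through the Broker. The overall architecture is shown in the figure below:
MQProducer (like MQConsumer) extends the MQAdmin interface, which defines operations such as creating a topic, querying the offset of a given MessageQueue, and looking up a message by its offset ID. It supports sending single or batched messages, transactional or non-transactional, in synchronous, asynchronous, or one-way mode.
In addition, MQProducer supports an RPC-style synchronous call: the request method sends a message and returns the reply message once a Consumer has consumed it.
Let's start with the Producer's constructor and start-up: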
# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.producer.DefaultMQProducer
// 0. Before start-up (or before constructing this object), set the system property "rocketmq.namesrv.addr" (or the environment variable "NAMESRV_ADDR")
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
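this.namespace = namespace; // 1. With a constructor that takes no namespace, the client tries to derive one from the namesrv address: if it matches "{schema}://MQ_INST_{1}_{2}.{domain}", then "MQ_INST_{1}_{2}" is used. Internally, every topic except system topics (those starting with "rmq_sys_" and a few specific ones such as "TBW102"; see TopicValidator) gets the namespace prefix. For retry and dead-letter topics the namespace is inserted in the middle
this.producerGroup = producerGroup; // 2. With a constructor that takes no producerGroup, the default "DEFAULT_PRODUCER" is used
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); // 3. RPCHook runs before and after each send request and may be null. Constructing DefaultMQProducerImpl also creates the thread pool for asynchronous sends, with a queue capacity of 50000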
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook); // 4. When customizedTraceTopic is null, the default topic RMQ_SYS_TRACE_TOPIC is used
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook( // 5. Register the send-message hook
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook( // 6. Register the end-transaction hook
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup)); // 7. Reset producerGroup: if a namespace is set, or one can be inferred from the namesrv address, it becomes "{namespace}%{producerGroup}"
this.defaultMQProducerImpl.start(); // 8. Start the producer implementation
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); // 9. Start the trace dispatcher
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
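The namespace handling described in notes 1 and 7 can be pictured with a small stand-alone model. This is only a sketch under assumptions: `NamespaceSketch`, its constants, and the one-element set of special topics are hypothetical and do not reproduce RocketMQ's own utility class. It shows the `{namespace}%{resource}` form and how a retry or dead-letter prefix stays in front of the namespace.

```java
import java.util.Collections;
import java.util.Set;

/** Simplified model of namespace prefixing; not the RocketMQ implementation. */
final class NamespaceSketch {
    static final String SEPARATOR = "%";
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";
    static final String SYSTEM_PREFIX = "rmq_sys_";
    // Assumption: a tiny subset of the special topics that are never prefixed.
    static final Set<String> SPECIAL_TOPICS = Collections.singleton("TBW102");

    static String wrap(String namespace, String resource) {
        if (namespace == null || namespace.isEmpty() || resource == null || resource.isEmpty()) {
            return resource;
        }
        if (resource.startsWith(SYSTEM_PREFIX) || SPECIAL_TOPICS.contains(resource)) {
            return resource; // system topics keep their plain name
        }
        String prefix = "";
        String bare = resource;
        if (resource.startsWith(RETRY_PREFIX)) {
            prefix = RETRY_PREFIX;
            bare = resource.substring(RETRY_PREFIX.length());
        } else if (resource.startsWith(DLQ_PREFIX)) {
            prefix = DLQ_PREFIX;
            bare = resource.substring(DLQ_PREFIX.length());
        }
        String marker = namespace + SEPARATOR;
        if (bare.startsWith(marker)) {
            return resource; // already wrapped, so wrapping twice is harmless
        }
        // The namespace lands after the retry/DLQ prefix, i.e. "in the middle".
        return prefix + marker + bare;
    }

    public static void main(String[] args) {
        System.out.println(wrap("MQ_INST_1_2", "OrderTopic"));
        System.out.println(wrap("MQ_INST_1_2", "%RETRY%OrderGroup"));
        System.out.println(wrap("MQ_INST_1_2", "TBW102"));
    }
}
```

Making the wrap idempotent matters in this model because the topic is wrapped once in `send` and again on every resend attempt.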
Next, the start-up of DefaultMQProducerImpl:
# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST: // 1. Initial state
this.serviceState = ServiceState.START_FAILED;
this.checkConfig(); // 2. Validate producerGroup (must not be empty, must be at most 255 characters long, may only contain characters from "%|a-zA-Z0-9_-", and must not be "DEFAULT_PRODUCER")
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID(); // 3. The instance name comes from the system property "rocketmq.client.name" and falls back to "DEFAULT". When producerGroup is not "CLIENT_INNER_PRODUCER" and the instance name is still the default, it is reset to the process ID plus the current nano time ({pid}#{nanoTime})
}
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); // 4. Build the clientId from the client IP and the instance name (plus unitName if one is set), create the MQClientInstance, and record the mapping in MQClientManager.factoryTable
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); // 5. Bind producerGroup to this DefaultMQProducerImpl in MQClientInstance.producerTable
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); // 6. Put the createTopicKey topic (default "TBW102") into topicPublishInfoTable
if (startFactory) {
mQClientFactory.start(); // 7. Start the MQClientInstance
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
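this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 8. Send a heartbeat to all brokers. The heartbeat carries {clientId, producerGroup[]}; it is sent only when brokerAddrTable is not empty, and the version reported by each broker is recorded
this.startScheduledTask(); // 9. Start a single-threaded periodic task that scans requestFutureTable every second, removes timed-out RequestResponseFuture entries, sets the failure cause, and runs their callbacks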
}
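The state handling in `start` follows a simple pattern: mark the service as failed up front, run the initialization, and only then mark it as running, so that an exception in between leaves a state that refuses further starts. A minimal sketch of that pattern follows; `LifecycleSketch` and the `init` callback are hypothetical, and the sketch leaves out the real code's reset to CREATE_JUST when group registration fails.

```java
/** Simplified model of the start-once state machine; not the RocketMQ implementation. */
final class LifecycleSketch {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState state = ServiceState.CREATE_JUST;

    /** init stands in for config checks, registration, and factory start-up. */
    void start(Runnable init) {
        switch (state) {
            case CREATE_JUST:
                state = ServiceState.START_FAILED; // pessimistic until init succeeds
                init.run();                        // an exception leaves START_FAILED behind
                state = ServiceState.RUNNING;
                break;
            default:
                throw new IllegalStateException("service state not OK, maybe started once: " + state);
        }
    }

    ServiceState state() {
        return state;
    }

    public static void main(String[] args) {
        LifecycleSketch service = new LifecycleSketch();
        service.start(() -> System.out.println("initializing"));
        System.out.println(service.state());
    }
}
```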
Next, the start-up of MQClientInstance:
# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.factory.MQClientInstance
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
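this.mQClientAPIImpl.fetchNameServerAddr(); // 1. If no namesrv address is configured, fetch it via HTTP GET from a fixed URL. This looks like a Taobao-internal mechanism; the default URL is "http://jmenv.tbsite.net:8080/rocketmq/nsaddr?nofix=1" and can be overridden with system properties (see TopAddressing and MixAll)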
}
// Start request-response channel
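this.mQClientAPIImpl.start(); // 2. Start the Netty client, which encodes and decodes with RocketMQ's NettyEncoder and NettyDecoder and handles traffic with NettyRemotingClient.NettyClientHandler
// Start various schedule tasks
// 3. If no namesrv address is configured, refresh it periodically (every 2 minutes; see step 1, mQClientAPIImpl.fetchNameServerAddr).
// Periodically collect topics from producerTable (and from consumerTable for consumers), fetch TopicRouteData for each from namesrv and, if anything changed, update brokerAddrTable and the topic's TopicPublishInfo
// Periodically (once per heartbeat interval) remove offline brokers from brokerAddrTable, then send a heartbeat to all brokers
// Periodically iterate over the MQConsumerInner entries in consumerTable and persist their offsets (local file in broadcasting mode, remote broker in clustering mode)
// Periodically (every minute) adjust thread pools: iterate over consumerTable and raise or lower the core pool size according to the number of messages in the process queues
this.startScheduledTask();
// Start pull service
this.pullMessageService.start(); // 4. Start a thread whose Runnable is PullMessageService. It blocks waiting for PullRequest objects, looks up the MQConsumerInner in consumerTable by the request's consumerGroup, and pulls messages
// Start rebalance service
this.rebalanceService.start(); // 5. Start a thread whose Runnable is RebalanceService. At a fixed interval it iterates over the MQConsumerInner entries in consumerTable, reassigns consume queues according to the topic, the current set of consumers, and the allocation strategy, and informs the broker through heartbeats
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false); // 6. Start the MQClientInstance's internal producer (group CLIENT_INNER_PRODUCER) through DefaultMQProducerImpl.start(false); passing false keeps it from calling MQClientInstance.start again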
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
Finally, let's look at message sending.
Synchronous sending first:
# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.producer.DefaultMQProducer
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic())); // 1. If a namespace is set, rewrite the topic with it
return this.defaultMQProducerImpl.send(msg);
}
# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
private SendResult sendDefaultImpl( // 1. Asynchronous send ends up in this method too, except that it runs as a task on the thread pool, ignores the return value, and passes CommunicationMode.ASYNC. One-way send also calls this method, with CommunicationMode.ONEWAY.
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK(); // 2. Check that serviceState is RUNNING
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 3. Look up TopicPublishInfo by topic in topicPublishInfoTable (if missing, fetch TopicRouteData from namesrv first and convert it to TopicPublishInfo)
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 4. Use mqFaultStrategy to pick a MessageQueue based on the topic and the most recently used broker. (Some send overloads accept a MessageQueue or MessageQueueSelector directly)
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); // 5. The core send logic, analyzed further below
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
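if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 6. When the send status is not SEND_OK, this property decides whether to retry on another broker (default: no retry)
continue;
}
}
return sendResult; // 7. Only synchronous sends return a result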
default:
break;
}
} catch (RemotingException e) { // 8. Retry under specific conditions (the exception type and, for some types, the response code)
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
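// The retry loop above, reduced to its skeleton: 1 + retryTimes attempts for a synchronous send, each attempt preferring a broker other than the one just used, all sharing one time budget. The lines below are a stand-alone sketch, not the RocketMQ code; RetrySketch, selectBroker, sendWithRetry, and the Function-based sender are hypothetical names, and brokers are plain strings instead of MessageQueue objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Simplified model of the synchronous retry loop; not the RocketMQ implementation. */
final class RetrySketch {
    /** Round-robin selection that skips the broker used by the previous attempt when possible. */
    static String selectBroker(List<String> brokers, int[] cursor, String lastBroker) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get(Math.floorMod(cursor[0]++, brokers.size()));
            if (!candidate.equals(lastBroker)) {
                return candidate;
            }
        }
        return brokers.get(Math.floorMod(cursor[0]++, brokers.size())); // only one broker exists
    }

    static String sendWithRetry(List<String> brokers, int retryTimes, long timeoutMillis,
                                Function<String, String> sender, List<String> brokersSent) {
        long begin = System.currentTimeMillis();
        int timesTotal = 1 + retryTimes; // first attempt plus retries
        int[] cursor = {0};
        String lastBroker = null;
        RuntimeException lastFailure = null;
        for (int times = 0; times < timesTotal; times++) {
            String broker = selectBroker(brokers, cursor, lastBroker);
            lastBroker = broker;
            brokersSent.add(broker);
            long cost = System.currentTimeMillis() - begin;
            if (timeoutMillis < cost) {
                throw new IllegalStateException("call timeout"); // budget is shared by all attempts
            }
            try {
                return sender.apply(broker);
            } catch (RuntimeException e) {
                lastFailure = e; // remember the cause and try the next broker
            }
        }
        throw new IllegalStateException(
            "Send [" + timesTotal + "] times, still failed, BrokersSent: " + brokersSent, lastFailure);
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        String result = sendWithRetry(Arrays.asList("broker-a", "broker-b"), 2, 3000, broker -> {
            if (broker.equals("broker-a")) {
                throw new RuntimeException("broker-a is down");
            }
            return "OK@" + broker;
        }, sent);
        System.out.println(result + " after trying " + sent);
    }
}
```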
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); // 5.1 Look up the master broker address by brokerName in MQClientInstance.brokerAddrTable
if (null == brokerAddr) { // 5.2 Not found: refresh the route info and look again
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); // 5.3 Optionally switch to the VIP channel (enabled only via the system property "com.rocketmq.sendMessageWithVIPChannel"; the VIP address uses the original port minus 2)
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
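MessageClientIDSetter.setUniqID(msg); // 5.4 For a Message that is not a MessageBatch, set the "UNIQ_KEY" property (built from the IP, process ID, ClassLoader hash, time elapsed since class loading, and an incrementing counter)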
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) { // 5.5 When a namespace exists, set the message property "INSTANCE_ID" to the namespace
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
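if (this.tryToCompressMessage(msg)) { // 5.6 When the message is not a MessageBatch and the body is large (4 KB by default), compress the body (level 5 by default) and replace the body with the compressed bytes
sysFlag |= MessageSysFlag.COMPRESSED_FLAG; // 5.7 Mark sysFlag after compressing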
msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; // 5.8 (For transactional messages sent with TransactionMQProducer) when the "TRAN_MSG" property parses to true, mark sysFlag
}
if (hasCheckForbiddenHook()) { // 5.9 If a CheckForbiddenHook exists (it must be registered by the user; none by default), run CheckForbiddenHook.checkForbidden
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
if (this.hasSendMessageHook()) { // 5.10 If a SendMessageHook exists (with tracing enabled there is a SendMessageTraceHookImpl), run SendMessageHook.sendMessageBefore
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 5.11 When the topic starts with "%RETRY%", copy the reconsume count into SendMessageRequestHeader and clear the message's "RECONSUME_TIME" property
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
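case ASYNC: // 5.12 For async sends, strip the namespace from the original message's topic; the body that is sent may be compressed, but the original Message.body must be restored to its uncompressed form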
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( // 5.13 Synchronous (and one-way) send
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context); // 5.14 If a SendMessageHook exists (with tracing enabled there is a SendMessageTraceHookImpl), run SendMessageHook.sendMessageAfter
}
return sendResult;
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally { // 5.15 Restore the uncompressed body and reset the topic to its form without the namespace
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
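Steps 5.6 to 5.8 combine two independent facts about a message into one integer: whether the body was compressed and whether the message is a prepared (half) transactional message. The sketch below illustrates that bit-flag idea together with threshold-based compression using the JDK's `Deflater`. It is written under assumptions: `SysFlagSketch` and its method names are hypothetical, the flag values are the ones I believe `MessageSysFlag` uses, and the `>=` comparison against the 4 KB threshold is assumed rather than taken from the listing.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

/** Simplified model of sysFlag marking and body compression; not the RocketMQ implementation. */
final class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;                // assumed value, bit 0
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // assumed value, bit 2
    static final int COMPRESS_THRESHOLD = 4 * 1024;        // 4 KB default from the article
    static final int COMPRESS_LEVEL = 5;                   // level 5 default from the article

    static boolean shouldCompress(byte[] body, boolean isBatch) {
        return !isBatch && body != null && body.length >= COMPRESS_THRESHOLD;
    }

    static byte[] compress(byte[] body, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(body);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream(Math.max(32, body.length / 2));
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end(); // release native resources
        return out.toByteArray();
    }

    static int buildSysFlag(boolean compressed, boolean transactionPrepared) {
        int sysFlag = 0;
        if (compressed) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (transactionPrepared) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    public static void main(String[] args) {
        byte[] body = new byte[8 * 1024]; // highly compressible: all zeros
        boolean compressed = shouldCompress(body, false);
        byte[] wire = compressed ? compress(body, COMPRESS_LEVEL) : body;
        System.out.println("bytes on the wire: " + wire.length);
        System.out.println("sysFlag: " + buildSysFlag(compressed, true));
    }
}
```

Because the flags occupy separate bits, the receiving side can test each one with a bitwise AND without knowing about the others.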
Going one level deeper into the send code:
# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.MQClientAPIImpl
public SendResult sendMessage( // 1. Overload used for synchronous (and one-way) sends
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
}
public SendResult sendMessage( // 2. Synchronous/asynchronous send
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE); // 3. Not set by default; for the RPC-style call it can be set via MessageUtil.createReplyMessage
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) { // 4. Create the RemotingCommand (sendSmartMsg comes from the system property "org.apache.rocketmq.client.sendSmartMsg" and defaults to true)
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody());
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); // 5. Call NettyRemotingClient.invokeSync (RPCHook.doBeforeRequest runs before the call and RPCHook.doAfterResponse after it), then process the response
default:
assert false;
break;
}
return null;
}
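A one-way send only calls RPCHook.doBeforeRequest before the call, and it must acquire a permit from the one-way semaphore before it can write the message.
An asynchronous send also only calls RPCHook.doBeforeRequest before the call and likewise needs a semaphore permit. It additionally registers an InvokeCallback, and it is in that callback that SendMessageHook and SendCallback are triggered.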
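The semaphore acts as a cap on in-flight one-way and asynchronous requests: a request may only be written after a permit is obtained within the timeout, and the permit goes back when the request completes. The following is a minimal sketch of that idea with the JDK's `Semaphore`; `SemaphoreSketch`, `tryBegin`, and the release handle are hypothetical names, not the remoting module's API.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified model of permit-limited in-flight requests; not the RocketMQ implementation. */
final class SemaphoreSketch {
    private final Semaphore permits;

    SemaphoreSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    /**
     * Tries to reserve a slot for one request.
     * Returns a handle that releases the slot, or null if no permit was free within the timeout.
     */
    Runnable tryBegin(long timeoutMillis) {
        boolean acquired;
        try {
            acquired = permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
        if (!acquired) {
            return null;
        }
        AtomicBoolean released = new AtomicBoolean(false);
        // Release at most once, even if both a callback and a timeout path call the handle.
        return () -> {
            if (released.compareAndSet(false, true)) {
                permits.release();
            }
        };
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        SemaphoreSketch limiter = new SemaphoreSketch(1);
        Runnable done = limiter.tryBegin(10);
        System.out.println("second request admitted: " + (limiter.tryBegin(10) != null));
        done.run();
        System.out.println("permits after completion: " + limiter.available());
    }
}
```

The release-once guard is a design choice worth keeping in any variant: completion and timeout handling can race, and releasing twice would silently raise the limit.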
Next, how transactional messages are handled:
# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); // 1. Set the message properties "TRAN_MSG" and "PGROUP"
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg); // 2. Send the message (see the ordinary send code above)
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) { // 3. Run the local transaction through LocalTransactionExecuter or TransactionListener
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
this.endTransaction(msg, sendResult, localTransactionState, localException); // 4. End the transaction: run EndTransactionHook.endTransaction first if such a hook exists, then send a one-way request to commit (half message sent successfully and local transaction committed) or roll back
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
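The decision logic in `sendMessageInTransaction` boils down to a small mapping: the local transaction runs only if the half message was stored with SEND_OK, any other send status means rollback, and a null result or an exception from the local transaction leaves the state unknown. A stand-alone sketch of that mapping follows. `TransactionFlowSketch` and `resolve` are hypothetical names, the enums are local copies of the names seen in the listing, and the sketch catches `RuntimeException` where the listing catches `Throwable`.

```java
import java.util.function.Supplier;

/** Simplified model of the transactional send decision; not the RocketMQ implementation. */
final class TransactionFlowSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    // "UNKNOW" is spelled as in the listing above.
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Decides what to report for the half message once its send status is known. */
    static LocalTransactionState resolve(SendStatus halfStatus, Supplier<LocalTransactionState> localTx) {
        if (halfStatus != SendStatus.SEND_OK) {
            return LocalTransactionState.ROLLBACK_MESSAGE; // local transaction is never started
        }
        try {
            LocalTransactionState state = localTx.get();
            return state == null ? LocalTransactionState.UNKNOW : state;
        } catch (RuntimeException e) {
            return LocalTransactionState.UNKNOW; // outcome unclear, left for a later check
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(SendStatus.SEND_OK, () -> LocalTransactionState.COMMIT_MESSAGE));
        System.out.println(resolve(SendStatus.FLUSH_DISK_TIMEOUT, () -> LocalTransactionState.COMMIT_MESSAGE));
        System.out.println(resolve(SendStatus.SEND_OK, () -> {
            throw new IllegalStateException("local transaction failed");
        }));
    }
}
```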
Now, how the synchronous RPC is emulated:
# rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
public Message request(Message msg,
long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout); // 1. Set the message properties: a UUID as "CORRELATION_ID", the clientId as "REPLY_TO_CLIENT", and the timeout as "TTL"
final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
try {
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
long cost = System.currentTimeMillis() - beginTimestamp;
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() { // 2. Send the message
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setSendRequestOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
}
}, timeout - cost);
return waitResponse(msg, timeout, requestResponseFuture, cost); // 3. Wait for the future's result
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
}
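The `request` method is an instance of the correlation-ID pattern: register a future under a fresh ID, send the message carrying that ID, block until a reply with the same ID completes the future or the timeout expires, and always remove the entry afterwards. A minimal sketch with `CompletableFuture` follows. `RequestReplySketch`, `onReply`, and the `BiConsumer` transport are hypothetical stand-ins, and strings replace RocketMQ's Message type.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/** Simplified model of request/reply correlation; not the RocketMQ implementation. */
final class RequestReplySketch {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** transport receives (correlationId, body) and stands in for the asynchronous send. */
    String request(String body, long timeoutMillis, BiConsumer<String, String> transport) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        try {
            transport.accept(correlationId, body);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("request timed out after " + timeoutMillis + "ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the reply", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request failed", e.getCause());
        } finally {
            pending.remove(correlationId); // never leak entries, whatever the outcome
        }
    }

    /** Called when a reply arrives; returns false if nobody is waiting for it any more. */
    boolean onReply(String correlationId, String reply) {
        CompletableFuture<String> future = pending.get(correlationId);
        return future != null && future.complete(reply);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        RequestReplySketch client = new RequestReplySketch();
        // The "remote side" answers immediately on the calling thread in this demo.
        String reply = client.request("ping", 1000, (id, body) -> client.onReply(id, "echo:" + body));
        System.out.println(reply);
    }
}
```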
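From the client code alone it is not clear how the sending client receives the reply message. In other words, when a ProducerGroup has several instances, how does request mode ensure that only the Producer that issued the request receives the reply Message? Answering that requires analyzing the Broker code later.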