小工具      在线工具  汉语词典  css  js  c++  java

分布式开放消息系统(RocketMQ)的原理与实践

Java 额外说明

收录于:93天前

距离这篇文章的撰写已经过去两年半了(201808)。我无法再保证内容是否已过时。由于目前业务中并没有使用RocketMQ,所以挖掘代码的时间很少,实际操作方面也很多。这个问题不可能对所有人都得到很好的解决。因此,建议您阅读介绍性文章。如果你在实践中遇到问题,在本地机器上运行代码并调试一下,或者去社区,可能对你解决问题更有帮助。当然这是一个简单的问题,我会尽力与大家沟通,谢谢大家。

分布式消息系统作为实现分布式系统扩展性和扩展性的关键组件,需要具备高吞吐量、高可用性等特性。在设计消息系统时,无法回避两个问题:

  1. 消息顺序问题
  2. 消息重复问题

RocketMQ是阿里巴巴开源的高性能、高吞吐量的消息中间件。它是如何解决这两个问题的呢? RocketMQ的主要特点是什么?其实现原理是什么?

主要功能及其实施方式

1. 顺序消息

消息排序意味着消息可以按照发送的顺序进行消费。例如:一个订单生成三个消息,分别是订单创建、订单支付、订单完成。消费时,只有按顺序消费才有意义。同时可以并行消费多个订单。首先看下面的例子:

如果生产者生成了两条消息:M1和M2,那么应该如何保证这两条消息的顺序呢?您想到的可能是这样的:

您可以使用此方法来确保消息顺序

假设M1发送到S1,M2发送到S2。如果要保证M1先于M2被消费,那么M1到达消费端并被消费后,需要通知S2,然后S2将M2发送到消费端。

这种模型的问题在于,如果将M1和M2分别发送到两台服务器,不能保证M1先到达MQ集群,也不能保证M1先被消费。从另一个角度来看,如果M2先于M1到达MQ集群,或者甚至在M2被消费之后,M1到达消费者端,那么消息就会乱序,说明上述模型无法保证消息的顺序。如何保证MQ集群中消息的顺序?一个简单的方法是将M1和M2发送到同一台服务器:

确保消息顺序,您的改进方法

这样可以保证M1先于M2到达MQServer(生产者等待M1发送成功后再发送M2)。按照先到先消费的原则,M1会先于M2被消费,从而保证了消息的顺序。

该模型只能理论上保证消息的顺序。在实际场景中,您可能会遇到以下问题:

网络延迟问题

每当消息从一台服务器发送到另一台服务器时,就会出现网络延迟问题。如上图所示,如果发送M1比发送M2花费的时间多,那么M2仍然会先被消费,消息的顺序仍然无法保证。即使M1和M2同时到达消费端,由于消费端1和消费端2的负载情况不清楚,M2仍然可能先于M1被消费。

那么如何解决这个问题呢?将M1和M2发送给同一个消费者,发送M1后,消费者必须响应成功才能发送M2。

如果你聪明的话,你可能想到了另一个问题:如果消费者1在向消费者发送M1后没有响应,那么你应该继续发送M2,还是重新发送M1?一般为了保证消息被消费,M1会重新发送给另一个消费者2,如下图所示。

确保消息序列的正确姿势

这样的模型严格保证了消息的顺序。如果你细心的话,还是会发现问题的。消费者1没有响应服务器有两种情况。一种是M1没有到达(网络传输过程中数据丢失),另一种是consumer客户端已经消费了M1并发送了响应消息,但是MQ Server还没有收到。如果是第二种情况,重新发送M1会导致M1被重复消耗。这也就引入了我们要讲的第二个问题,消息重复的问题,后面会详细解释。

回过头来看消息顺序问题,严格的顺序消息非常容易理解,也可以按照文章中介绍的方式进行简单处理。综上所述,实现严格顺序消息的简单可行的方式是:

保证生产者 - MQServer - 消费者是一对一对一的关系

虽然这种设计简单且易于实现,但也存在一些严重的问题,例如:

  1. 并行性会成为消息系统的瓶颈(吞吐量不足)
  2. 更多的异常处理,比如:只要消费端出现问题,整个处理过程就会被阻塞,我们就得花更多的精力去解决阻塞问题。

但我们的最终目标是实现集群的高容错性和高吞吐量。这看起来是一个不可调和的矛盾,那么阿里是如何解决的呢?

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!—— 沉勋

有些问题,看起来很重要,但实际上我们可以通过设计合理或者分解问题来规避。如果硬要把时间花在解决问题本身,实际上不仅效率低下,而且也是一种浪费。从这个角度来看消息的顺序问题,我们可以得出两个结论:

  1. 其实有大量的应用是不讲究乱序的。
  2. 队列乱序并不意味着消息乱序

那么我们应该寻求从业务层面来保证消息的顺序,而不是仅仅依靠消息系统,是不是更合理的方式呢?

最后我们从源码角度分析RocketMQ是如何实现顺序发送消息的。

RocketMQ通过轮询所有队列(负载均衡策略)来确定消息发送到哪个队列。例如,在下面的示例中,具有相同订单号的消息将被陆续发送到同一个队列:

// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的肯定是同一个队列。

private SendResult send()  {
    // 获取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // 根据我们的算法,选择一个发送队列
        // 这里的arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}

2、消息重复

在解决上面的消息序列问题时,引入了一个新的问题,就是消息重复。那么RocketMQ是如何解决消息重复问题的呢?或者“仅仅”并不能解决问题。

消息重复的根本原因是:网络不可达。只要通过网络交换数据,就没有办法避免这个问题。所以解决这个问题的办法就是绕过它。那么问题就变成了:如果消费者收到两条相同的消息该怎么办?

  1. 消费者端消息处理的业务逻辑仍然是幂等的。
  2. 确保每条消息都有唯一的编号,并且消息处理成功且重复数据删除表的日志同时出现。

第1条很容易理解。只要保持幂等性,无论有多少重复消息来,最终的处理结果都是一样的。第二个原则是使用日志表来记录已成功处理的消息的ID。如果新到达的消息ID已经在日志表中,那么该消息将不再被处理。

第一种方案显然应该在消费者侧实现,而不是消息系统要实现的功能。第2条可以在消息系统中实现,也可以在业务侧实现。一般情况下,重复消息的概率其实很小。如果通过消息系统来实现,肯定会对消息系统的吞吐量和高可用性产生影响。因此,消息重复的问题最好由业务端自己处理。这也是RocketMQ没有解决消息重复的问题。

RocketMQ不保证消息不重复。如果您的业务需要保证消息严格不重复,那么您需要在业务端自行进行去重。

3、交易消息

RocketMQ除了支持普通消息和顺序消息外,还支持事务消息。首先,我们来讨论一下什么是交易消息以及支持交易消息的必要性。我们以一个转账场景为例来说明这个问题:Bob向Smith转账100元。

在单机环境下,事务执行情况大概如下:

单机环境下转账交易示意图

当用户数量达到一定程度后,Bob和Smith的账户和余额信息不再在同一台服务器上,那么上面的流程就变成这样:

集群环境下转账交易示意图

这时候你会发现,同样的转账业务,在集群环境下,时间消耗成倍增加,这显然是不可接受的。那么如何避免这个问题呢?

大事务=小事务+异步

将大事务拆分为多个小事务以异步执行。这样基本上可以优化跨机事务的执行效率与单机一致。转账交易可以分解为以下两个小交易:

小事务+异步消息

图中,执行本地交易(借记Bob的账户)和发送异步消息应该保证同时成功或失败。也就是说,如果扣费成功,那么消息一定发送成功。如果扣费失败,则无法再发送消息。问题是:是先扣钱还是先发消息?

首先我们先看一下发送消息的情况。粗略图如下:

交易消息:先发送消息

问题是:如果消息发送成功但是扣款失败,消费者就会消费该消息,然后给史密斯账户充值。

如果先发消息不行,那就先扣钱吧。总体图如下:

交易消息-借方优先

存在的问题与上面类似:如果扣款成功但消息发送失败,Bob会扣钱,但Smith的账户上不会添加任何钱。

解决这个问题的方法可能有很多,比如直接将消息放到Bob的扣款交易中。如果发送失败,则会抛出异常,并回滚事务。这样的处理方式也符合“只是”不需要解决的原则。

这里需要说明一下:如果使用Spring来管理事物,可以把发送消息的逻辑放到本地事物中。如果消息发送失败,将会抛出异常。 Spring捕获异常后,会回滚事物,以保证Local事物和发送消息的原子性。

RocketMQ支持事务消息。我们来看看RocketMQ是如何实现的。

RocketMQ实现发送交易消息

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。

细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

那我们来看下RocketMQ源码,是如何处理事务消息的。客户端发送事务消息的部分(完整代码请查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

// =============================发送事务消息的一系列准备工作========================================
// 未决事务,MQ服务器回查客户端
// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();

接着查看sendMessageInTransaction方法的源码,总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。

//  ================================事务消息的发送过程=============================================
public TransactionSendResult sendMessageInTransaction(.....)  {
    // 逻辑代码,非实际代码
    // 1.发送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.如果消息发送成功,处理与消息关联的本地事务单元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.结束事务
    this.endTransaction(sendResult, localTransactionState, localException);
}

endTransaction方法会将请求发往broker(mq server)去更新事务消息的最终状态:

  1. 根据sendResult找到Prepared消息sendResult包含事务消息的ID
  2. 根据localTransaction更新消息的最终状态

如果endTransaction方法执行失败,数据没有发送到broker,导致事务消息的 状态更新失败,broker会有回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是prepared状态则会向Producer发起CheckTransaction请求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时回调请求,而checkTransactionState会调用我们的事务设置的决断方法来决定是回滚事务还是继续执行,最后调用endTransactionOnewaybroker来更新消息的最终状态。

回到转账的例子,如果Bob的账户余额已经减少,消息发送成功,而Smith开始消费消息,就会出现两个问题:消费失败和消费超时。解决超时问题的思路是不断重试,直到消费者成功消费完消息。整个过程中可能会出现消息重复的问题,可以按照之前的思路解决。

消费者交易消息

这样基本上可以解决消费端超时问题,但是如果消费失败怎么办?阿里提供给我们的解决方法是:手动解决。大家可以考虑一下,按照事务的流程,因为某种原因Smith加款失败,那么需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升,且很容易出现Bug,估计出现Bug的概率会比消费失败的概率大很多。这也是RocketMQ目前暂时没有解决这个问题的原因,在设计实现消息系统时,我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,这也是大家在解决疑难问题时需要多多思考的地方。

20160321补充:在3.2.6版本中移除了事务消息的实现,所以此版本不支持事务消息,具体情况请参考rocketmq的issues(已失效):
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156

4. Producer如何发送消息

Producer轮询某topic下的所有队列的方式来实现发送方的负载均衡,如下图所示:

Producer发送消息负载均衡


首先分析一下RocketMQ的客户端发送消息的源码:

// 构造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整个应用生命周期内,只需要初始化1次
producer.start();
// 构造Message
Message msg = new Message("TopicTest1",// topic
                        "TagA",// tag:给消息打标签,用于区分一类消息,可为null
                        "OrderID188",// key:自定义Key,可以用于去重,可为null
                        ("Hello MetaQ").getBytes());// body:消息内容
// 发送消息并返回结果
SendResult sendResult = producer.send(msg);
// 清理资源,关闭网络连接,注销自己
producer.shutdown();

在整个应用程序生命周期中,生产者需要调用一次start方法进行初始化。初始化过程中完成的主要任务有:

  1. 如果没有指定namesrv地址,将会自动寻址
  2. 启动计划任务:更新namesrv地址、从namsrv更新主题路由信息、清理失效的broker、向所有broker发送心跳...
  3. 启动负载均衡服务

初始化完成后,开始发送消息。发送消息的主要代码如下:

private SendResult sendDefaultImpl(Message msg,......) {
    // 检查Producer的状态是否是RUNNING
    this.makeSureStateOK();
    // 检查msg是否合法:是否为null、topic,body是否为空、body是否超长
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 获取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    // 从路由信息中选择一个消息队列
    MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
    // 将消息发送到该队列上去
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}

代码中需要关注的两个方法tryToFindTopicPublishInfoselectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,如果没有获取到,则会自己去namesrv获取路由信息。selectOneMessageQueue方法通过轮询的方式,返回一个队列,以达到负载均衡的目的。

如果Producer发送消息失败,它会自动重试。重试策略为:

  1. 重试次数< retryTimesWhenSendFailed(可配置)
  2. 总耗时(包含重试n次的时间)<sendMsgTimeout(发送消息时传入的参数)
  3. 当以上两个条件同时满足后,Producer就会选择另一个队列来发送消息。

5. 消息存储

RocketMQ的消息存储是由consume queuecommit log配合完成的。

1.消费队列

consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。

我们可以在配置中指定consumequeuecommitlog存储的目录
每个topic下的每个queue都有一个对应的consumequeue文件,比如:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue文件组织,如图:

Consume Queue 文件组织图

  1. 根据topicqueueId来组织文件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。
  2. 按照消费端的GroupName来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,比如图中的%RETRY%ConsumerGroupA
  3. 按照消费端的GroupName来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,比如图中的%DLQ%ConsumerGroupA

死信队列(Dead Letter Queue)一般用于存储由于某种原因无法投递的消息,例如处理失败或者过期的消息。

Consume Queue中的存储单元是20字节的定长二进制数据,顺序写入,顺序读取,如下图所示:

ConsumerQueue 文件存储单元格式

  1. CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量
  2. Size存储消息的大小
  3. 消息标签HashCode存储消息的标签的哈希值:主要用于订阅时的消息过滤(如果订阅时指定了标签,则可以根据HashCode快速找到订阅的消息)

2. 提交日志

CommitLog:消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。
文件的默认位置如下,仍然可通过配置文件修改:

${user.home} \store\${commitlog}\${fileName}

CommitLog的消息存储单元长度不固定,文件顺序写入,随机读取。消息的存储结构如下表所示,按照号码顺序以及号码对应的内容依次存储。

Commit Log存储单元结构图

3.消息存储实现

消息存储的实现比较复杂,值得大家深入了解。稍后将在单独的文件中进行分析(目前正在收集材料)。本节仅用代码来说明具体过程。

// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    // Here settings are stored timestamp, in order to ensure an orderly global
    msg.setStoreTimestamp(beginLockTimestamp);
    // MapedFile:操作物理文件在内存中的映射以及将内存数据持久化到物理文件中
    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
    // 将Message追加到文件commitlog
    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
    switch (result.getStatus()) {
    case PUT_OK:break;
    case END_OF_FILE:
         // Create a new file, re-write the message
         mapedFile = this.mapedFileQueue.getLastMapedFile();
         result = mapedFile.appendMessage(msg, this.appendMessageCallback);
     break;
     DispatchRequest dispatchRequest = new DispatchRequest(
                topic,// 1
                queueId,// 2
                result.getWroteOffset(),// 3
                result.getWroteBytes(),// 4
                tagsCode,// 5
                msg.getStoreTimestamp(),// 6
                result.getLogicsOffset(),// 7
                msg.getKeys(),// 8
                /**
                 * Transaction
                 */
                msg.getSysFlag(),// 9
                msg.getPreparedTransactionOffset());// 10
    // 1.分发消息位置到ConsumeQueue
    // 2.分发到IndexService建立索引
    this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}

4.消息索引文件

如果消息包含键值,则使用 IndexFile 来存储消息索引。文件内容结构如下:

留言索引

索引文件主要用于根据key查询消息。主要流程为:

  1. 根据查询的key的hashcode%slotNum获取具体的slot位置(slotNum为索引文件中包含的最大slot数量,例如如图slotNum=5000000)
  2. 根据slotValue(槽位置对应的值)查找索引项列表中的最后一项(倒序排列,slotValue始终指向最新的索引项)
  3. 遍历索引项列表返回查询时间范围内的结果集(默认一次最多返回32条记录)

6.消息订阅

RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动推送给消费者;另一种是Pull模式,即消费者在需要时主动从MQServer拉取。但在具体实现上,Push和Pull模式都采用了消费者主动拉取的方式。

首先我们看一下consumer端的负载均衡:

消费者负载均衡

消费者将使用RebalanceService线程每10秒加载一次基于主题的所有队列:

  1. 遍历Consumer下的所有主题,然后根据主题订阅所有消息
  2. 获取同一主题和消费者组下的所有消费者
  3. 然后根据具体的分配策略来分配消费队列。分配策略包括:平均分配、消费者配置等。

如上图所示:如果有5个队列,2个消费者,那么第一个Consumer消费3个队列,第二个Consumer消费2个队列。这里使用的平均分配策略与分页过程类似。 TOPIC下的所有队列都是记录,Consumer的数量相当于页面总数。那么每页的记录数就类似于某个Consumer会去消费哪些队列。

通过这样的策略,可以达到大致的平均消费。这样的设计还可以在很多方面拓展消费者,提高消费能力。

消费端的Push模式是通过长轮询模式实现的,如下图:

推送模式图

Consumer端定期主动向Broker发送拉取消息请求。 Broker收到Pull请求后,如果有消息就会立即返回数据。 Consumer端收到返回的消息后,会回调消费者设置的Listener方法。如果 Broker 收到 Pull 请求时消息队列中没有数据,则 Broker 会阻塞该请求,直到数据送达或超时才返回。

当然,Consumer端是通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest发送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest时,如果发现没有消息,就会把PullRequest扔到ConcurrentHashMap中缓存起来。broker在启动时,会启动一个线程不停的从ConcurrentHashMap取出PullRequest检查,直到有数据返回。

7、RocketMQ的其他特性

前面的六大功能基本都涵盖了。如果你想深入理解它们,你需要查看源代码并在实践中应用它们。当然,除了已经提到的功能之外,RocketMQ还支持:

  1. 定时消息
  2. 新闻刷新策略
  3. 主动同步策略:同步双写、异步复制
  4. 海量消息积累能力
  5. 高效沟通
  6. …………

其中涉及到的很多设计思路和解决方案都值得我们深入研究:

  1. 消息存储设计:既要满足海量消息的积累能力,又要满足极快的查询效率,还要保证写入效率。
  2. 高效的通信组件设计:高吞吐量和毫秒级的消息传递能力离不开高效的通信。
  3. …………

RocketMQ最佳实践

1. 生产者最佳实践

1、一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。
2、每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
3、消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。
4、对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
5、某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。

2. 消费者最佳实践

1、消费过程要做到幂等(即消费端去重)
2、尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。
3、优化每条消息消费过程

3.其他配置

线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false

RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

因此,基于RocketMQ目前的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小手动创建TOPIC。

RocketMQ设计相关

RocketMQ的设计假设:

每台PC机器都可能宕机不可服务
任意集群都有可能处理能力不足
最坏的情况一定会发生
内网环境需要低延迟来提供最佳用户体验

RocketMQ的关键设计:

分布式集群化
强数据安全
海量数据堆积
毫秒级投递延迟(推拉模式)

这是RocketMQ设计的前提,也是它需要达到的效果。我认为这些假设适用于所有系统设计。随着我们系统的服务越来越多,每个开发者都必须关注自己的程序是否存在单点故障,挂了如何恢复,能否很好的横向扩展,对外接口是否足够高效,以及是否由自己管理。数据足够安全吗?只有通过标准化您的设计,您才能开发高效且强大的程序。

参考

    RocketMQ 用户指南 RocketMQ原理介绍 RocketMQ最佳实践 阿里巴巴分布式开放消息服务(ONS)原理与实践2 阿里巴巴分布式开放消息服务(ONS)原理与实践3 RocketMQ原理分析
. . .

相关推荐

额外说明

在linux上更新新的jar程序

    查看指定jar进程命令 #ps aux|grep xxx.jar 或用命令找到自己8080的jar程序 # netstat -ntlp     接着删除已有的进程 #kill -9 9028     最后使用下面命令可后台运行jar,这时关闭Xs

额外说明

PostgreSQL 如何计算服务器配置参数的大小

文章目录 大家好,我是只谈技术不剪发的 Tony 老师。 PostgreSQL 的默认配置参数通常都偏小,不太适合生产环境。因此,我们在安装数据库之后需要修改服务器的配置参数。上一篇文章中介绍了如何设置服务器的配置参数。今天给大家介绍一个快速计算 Pos

额外说明

Apache HTTP Server 2.4 绿色版 安装成系统服务

以前电脑上装过Apache并配置了PHP环境,后来重做系统了这些就不能用了,这几天又需要弄PHP的东西了,所以又要把这一套架起来。因为第一次安装完成的文件都在,所以这次直接用绿色版。   参考安装的文章是这篇:Apache HTTP Server 2.4

额外说明

java重试机制实现方案

本文内容是目前团队内小磊同学对重试机制实现方案的梳理总结。 从为什么需要重试的背景开始,到重试的场景,大致的一些设计思路,最后通过两个成熟的retry组件进行案例讲解,理论+实战。 背景 重试是系统提高容错能力的一种手段。在一次请求中,往往需要经过多个服

额外说明

Ubuntu安装pygame的过程记录与分享(包括python的卸载,openssl的安装,python3.7的安装,pygame的安装)

目录 讲在前面:本人安装环境为Xubuntu 14.04,系统自带python2.7和python3.6,openssl 1.0.1。 1.卸载python3.6(只是纯粹追求更高版本) 2.下载安装openssl-1.1.1b(由于python3.7太

额外说明

docker部署mysql数据库

docker部署mysql数据库 官方镜像地址:https://hub.docker.com/_/mysql 创建数据库配置文件,从容器中取出默认配置文件 mkdir -p /data/mysql/{ conf.d,data} docker create

额外说明

Java代码弱点与修复之——Open redirect(开放重定向)

弱点描述 Open redirect , 开放重定向,是一种常见的安全漏洞,也被称为“重定向漏洞”。该漏洞通常出现在 Web 应用程序中,攻击者可以利用它将用户重定向到恶意站点,从而进行钓鱼攻击、恶意软件传播、诱骗等活动。 在 Java 中,通过重定向

额外说明

WCF宿主与服务托管

WCF宿主与服务托管 若要公开WCF服务,需要提供一个运行服务的宿主环境。就像.NET CLR需要创建宿主环境以托管代码一般,WCF的宿主环境同样运行在进程的应用程序域中。在应用程序域中可以创建一个或多个ServiceHost实例,其关系如图一所示:  

额外说明

【软考 系统架构设计师】数据库系统⑦ 数据库的安全性与备份

>>回到总目录<< 为了不辜负已经订阅了专栏的同学们的信任,所以本专栏不会有任何的优惠活动。 另外,当订阅人数每次达到 2 n ( n > 2 ) 2^n(n>2) 2

额外说明

模态对话框(antd-design组件库)简单易用

1.模态对话框 模态对话框。 2.何时使用 当用户需要处理事务但又不想跳转到页面而中断工作流程时,可以使用Modal在当前页面中间开辟一个浮层来承载相应的操作。 另外,当需要一个简洁的确认框来询问用户时,可以使用App.useApp封装的语法糖方法。 组

ads via 小工具