RocketMQ最佳实践,就看这一篇!

语言: CN / TW / HK

点击上方蓝色字关注我们~

RocketMQ是阿里开源的消息队列框架,如今也已成为Apache顶级项目,RockerMQ是一个非常优秀的框架,现在大部分互联网公司使用的消息队列也是RocketMQ,在我们使用的过程中,如果能一开始就给你最佳实践,可以避免走一些弯路,甚至你看完之后可以自身检查下你们是不是这样使用,没有的话可以进行适当的调整,这篇文章应该能够帮助你更好的使用RockerMQ。

1

〓Producer最佳实践

1、Topic

一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤:message.setTags("TagA")。

2、Key

每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。

每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。

由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。


   

// 订单Id

String orderId = "20034568923546";

message.setKeys(orderId);

3、日志

消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。

send消息方法只要不抛异常,就代表发送成功。

发送成功会有多个状态,在sendResult里定义。

4、重发

对于消息不可丢失应用,务必要有消息重发机制。

例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。

5、sendOneWay

某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。

对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用。

1

Consumer最佳实践

1、幂等

消费过程要做到幂等(即消费端去重)。

RocketMQ目前无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面去重,有以下几种去重方式:

a).将消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等,消费之前判断是否在Db或Tair(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。

b). 用业务层面的状态机去重。

2、批量消费

尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

3、 跳过非重要消息

发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。

例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。

示例代码如下:

public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        long offset = msgs.get(0).getQueueOffset();
        String maxOffset =
                msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
        long diff = Long.parseLong(maxOffset) - offset;
        if (diff > 100000) {
            // TODO 消息堆积情况的特殊处理
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // TODO 正常消费过程
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

4、提高消费并行度

a). 同一个ConsumerGroup下,通过增加Consumer实例数量来提高并行度,超过订阅队列数的Consumer实例无效。

可以通过加机器,或者在已有机器启动多个进程的方式。

b). 提高单个Consumer的消费并行线程,通过修改以下参数: 

consumeThreadMin consumeThreadMax

5、优化每条消息消费过程

举例如下,某条消息的消费过程如下:

  • 根据消息从 DB 查询【数据 1】

  • 根据消息从 DB 查询【数据 2】

  • 复杂的业务计算

  • 向 DB 插入【数据 3】

  • 向 DB 插入【数据 4】

这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。

所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。

6、日志

消费时记录日志,以便后续定位问题。

如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。

   public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
        // TODO 正常消费过程
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }   

如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。

〓其他配置

1、线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false。

RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。

后果 就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

所以基于目前RocketMQ的设计, 建议 关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。

2、JVM选项

推荐使用最新发布的JDK 1.8版本。通过设置相同的Xms和Xmx值来防止JVM调整堆大小以获得更好的性能。

简单的JVM配置如下所示:

-server -Xms8g -Xmx8g -Xmn4g

如果您不关心RocketMQ Broker的启动时间,还有一种更好的选择,就是通过“预触摸”Java堆以确保在JVM初始化期间每个页面都将被分配。

那些不关心启动时间的人可以启用它: -XX:+AlwaysPreTouch

禁用偏置锁定可能会减少JVM暂停,  -XX:-UseBiasedLocking

至于垃圾回收,建议使用带JDK 1.8的G1收集器。


  

-XX:+UseG1GC -XX:G1HeapRegionSize=16m

-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30

这些GC选项看起来有点激进,但事实证明它在我们的生产环境中具有良好的性能。另外不要把-XX:MaxGCPauseMillis的值设置太小,否则JVM将使用一个小的年轻代来实现这个目标,这将导致非常频繁的minor GC,所以建议使用rolling GC日志文件:


  

-XX:+UseGCLogFileRotation

-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m

如果写入GC文件会增加代理的延迟,可以考虑将GC日志文件重定向到内存文件系统:

-Xloggc:/dev/shm/mq_gc_%p.log123

3、Linux内核参数

os.sh脚本在bin文件夹中列出了许多内核参数,可以进行微小的更改然后用于生产用途。

下面的参数需要注意,更多细节请参考 /proc/sys/vm/ *的文档

  • vm.extra_free_kbytes ,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)

  • vm.min_free_kbytes ,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。

  • vm.max_map_count ,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。(agressiveness --> aggressiveness)

  • vm.swappiness ,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。

  • File descriptor limits ,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。

  • Disk scheduler ,RocketMQ建议使用I/O截止时间调度器,它试图为请求提供有保证的延迟。

如果您有更好的实践经验,欢迎留言。

▽参考资料:

http://jm.taobao.org/2017/03/09/20170309/

https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md

分享到: