消息队列的消息存取需求
- 消息保序
- 重复消息处理
- 消息可靠性保证
基于 List 的消息队列解决方案
List 本身就是按先进先出的顺序对数据进行存取
- LPUSH 命令
- 把要发送的消息依次写入 List
- RPOP 命令
- 从 List 的另一端按照消息的写入顺序,依次读取消息
- BRPOP 命令也称为阻塞式读取
- 客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据
- BRPOPLPUSH 命令
- 保证消息可靠性
- 让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存
- 如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理
基于 Streams 的消息队列解决方案
Streams 是 Redis 5.0 专门针对消息队列场景设计的数据类型 和 List 相比,Streams 同样能够满足消息队列的三大需求。而且,它还支持消费组形式的消息读取
Streams 是 Redis 专门为消息队列设计的数据类型
- XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
- XREAD:用于读取消息,可以按 ID 读取数据;
- XREADGROUP:按消费组形式读取消息;
- 消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了
- 目的是让组内的多个消费者共同分担读取消息
- XPENDING 和 XACK:
- XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息
- XACK 命令用于向消息队列确认消息处理已完成
- 为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。
- 如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
小结
- Streams 是 Redis 5.0 专门针对消息队列场景设计的数据类型,支持:有序、判重、可靠性。
- 一个 Redis 能搞定的消息队列,还要什么其他消息队列?
- Redis 是一个非常轻量级的键值数据库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例
- Kafka、RabbitMQ 部署时,涉及额外的组件,例如 Kafka 的运行就需要再部署 ZooKeeper
- 相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列
精选留言
Kaito
如果一个生产者发送给消息队列的消息,需要被多个消费者进行读取和处理,你会使用Redis的什么数据类型来解决这个问题?
这种情况下,只能使用Streams数据类型来解决。使用Streams数据类型,创建多个消费者组,就可以实现同时消费生产者的数据。每个消费者组内可以再挂多个消费者分担读取消息进行消费,消费完成后,各自向Redis发送XACK,标记自己的消费组已经消费到了哪个位置,而且消费组之间互不影响。
另外,老师在介绍使用List用作队列时,为了保证消息可靠性,使用BRPOPLPUSH命令把消息取出的同时,还把消息插入到备份队列中,从而防止消费者故障导致消息丢失。
这种情况下,还需要额外做一些工作,也就是维护这个备份队列:每次执行BRPOPLPUSH命令后,因为都会把消息插入一份到备份队列中,所以当消费者成功消费取出的消息后,最好把备份队列中的消息删除,防止备份队列存储过多无用的数据,导致内存浪费。
这篇文章主要是讲消息队列的使用,借这个机会,也顺便总结一下使用消息队列时的注意点:
在使用消息队列时,重点需要关注的是如何保证不丢消息?
那么下面就来分析一下,哪些情况下,会丢消息,以及如何解决?
1、生产者在发布消息时异常:
a) 网络故障或其他问题导致发布失败(直接返回错误,消息根本没发出去) b) 网络抖动导致发布超时(可能发送数据包成功,但读取响应结果超时了,不知道结果如何)
情况a还好,消息根本没发出去,那么重新发一次就好了。但是情况b没办法知道到底有没有发布成功,所以也只能再发一次。所以这两种情况,生产者都需要重新发布消息,直到成功为止(一般设定一个最大重试次数,超过最大次数依旧失败的需要报警处理)。这就会导致消费者可能会收到重复消息的问题,所以消费者需要保证在收到重复消息时,依旧能保证业务的正确性(设计幂等逻辑),一般需要根据具体业务来做,例如使用消息的唯一ID,或者版本号配合业务逻辑来处理。
2、消费者在处理消息时异常:
也就是消费者把消息拿出来了,但是还没处理完,消费者就挂了。这种情况,需要消费者恢复时,依旧能处理之前没有消费成功的消息。使用List当作队列时,也就是利用老师文章所讲的备份队列来保证,代价是增加了维护这个备份队列的成本。而Streams则是采用ack的方式,消费成功后告知中间件,这种方式处理起来更优雅,成熟的队列中间件例如RabbitMQ、Kafka都是采用这种方式来保证消费者不丢消息的。
3、消息队列中间件丢失消息
上面2个层面都比较好处理,只要客户端和服务端配合好,就能保证生产者和消费者都不丢消息。但是,如果消息队列中间件本身就不可靠,也有可能会丢失消息,毕竟生产者和消费这都依赖它,如果它不可靠,那么生产者和消费者无论怎么做,都无法保证数据不丢失。
a) 在用Redis当作队列或存储数据时,是有可能丢失数据的:一个场景是,如果打开AOF并且是每秒写盘,因为这个写盘过程是异步的,Redis宕机时会丢失1秒的数据。而如果AOF改为同步写盘,那么写入性能会下降。另一个场景是,如果采用主从集群,如果写入量比较大,从库同步存在延迟,此时进行主从切换,也存在丢失数据的可能(从库还未同步完成主库发来的数据就被提成主库)。总的来说,Redis不保证严格的数据完整性和主从切换时的一致性。我们在使用Redis时需要注意。
b) 而采用RabbitMQ和Kafka这些专业的队列中间件时,就没有这个问题了。这些组件一般是部署一个集群,生产者在发布消息时,队列中间件一般会采用写多个节点+预写磁盘的方式保证消息的完整性,即便其中一个节点挂了,也能保证集群的数据不丢失。当然,为了做到这些,方案肯定比Redis设计的要复杂(毕竟是专们针对队列场景设计的)。
综上,Redis可以用作队列,而且性能很高,部署维护也很轻量,但缺点是无法严格保数据的完整性(个人认为这就是业界有争议要不要使用Redis当作队列的地方)。而使用专业的队列中间件,可以严格保证数据的完整性,但缺点是,部署维护成本高,用起来比较重。
所以我们需要根据具体情况进行选择,如果对于丢数据不敏感的业务,例如发短信、发通知的场景,可以采用Redis作队列。如果是金融相关的业务场景,例如交易、支付这类,建议还是使用专业的队列中间件。