PushConsumer与PullConsumer的区别.pdf

  1. 1、本文档共13页,可阅读全部内容。
  2. 2、有哪些信誉好的足球投注网站(book118)网站文档一经付费(服务费),不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。
  3. 3、本站所有内容均由合作方或网友上传,本站不对文档的完整性、权威性及其观点立场正确性做任何保证或承诺!文档内容仅供研究参考,付费前请自行鉴别。如您付费,意味着您自己接受本站规则且自行承担风险,本站不退款、不进行额外附加服务;查看《如何避免下载的几个坑》。如果您已付费下载过本站文档,您可以点击 这里二次下载
  4. 4、如文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“版权申诉”(推荐),也可以打举报电话:400-050-0827(电话支持时间:9:00-18:30)。
查看更多
PushConsumer与PullConsumer的区别

PushConsumer 与PullConsumer 的区别 概念上来说PushConsumer 应该是推给消费者,不考虑消费者的消费 能力,PullConsumer 则是消费者主动去拉队列中的消息。 但是RocketMq 与Kafka 的实现方式不太一样,RocketMQ 的 Consumer 都是从 Broker 拉消息来消费,但是为了能做到实时收消 息,RocketMQ 使用长轮询方式,可以保证消息实时性同 Push 方式一 致。返种长轮询方式类似亍 Web QQ 收収消息机制。 PushConsumer 的consumeFromWhere 参数可以指定在消费者第一次启 动时从哪个位置开始消费,默认情况下CONSUME_FROM_LAST_OFFSET 从 队列最后开始消费,也就是不消费之前的数据。后续启动会从上次消 费的位置开始消费,该参数不再起作用,但是需要注意的是重启 Consumer 有可能造成重复消费数据,因为消费的位置信息并不是实时 保存的,这点非常重要,在业务系统上就要防止重复数据提交,持久 化消费的位置信息间隔时间可以通过参数persistConsumerOffsetInterval 配 置,默认是5 秒,但我自己测试发现似乎无效。 使用PullConsumer 则可以指定某个Offset 开始Pull 消息。 测试代码 import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrently Context ; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyS tatus ; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcu rrently ; import com.alibaba.rocketmq.client.exception.MQClientException ; import mon.consumer.ConsumeFromWhere ; import mon.message.MessageExt ; import tocol.heartbeat.MessageModel ; import java.util.List ; import java.util.concurrent.TimeUnit ; /** * Created by Administrator on 2016/1/6 0006. */ public class PushConsumerTest { static int i = 0 ; /** * 当前例子是PushConsumer 用法,使用方式给用户感觉是消息从 RocketMQ 服务器推到了应用客户端。br * 但是实际PushConsumer 内部是使用长轮询Pull 方式从Broker 拉消息, 然后再回调用户Listener 方法br */ public static void main(String[] args) throws InterruptedException, MQClientException { /** * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局 对象或者单例br * 注意:ConsumerGroupName 需要由应用来保证唯一 */ final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( TestGroup) ; // consumer.setInstanceName(TestConsumer); consumer.setNamesrvAddr( ipAddress:9876); consumer.setConsumeThreadMin( 1000); consumer.setConsumeThreadMax( 1000); /** * 订阅指定topic

文档评论(0)

dajuhyy + 关注
实名认证
内容提供者

该用户很懒,什么也没介绍

1亿VIP精品文档

相关文档