- 1、有哪些信誉好的足球投注网站(book118)网站文档一经付费(服务费),不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。。
- 2、本站所有内容均由合作方或网友上传,本站不对文档的完整性、权威性及其观点立场正确性做任何保证或承诺!文档内容仅供研究参考,付费前请自行鉴别。如您付费,意味着您自己接受本站规则且自行承担风险,本站不退款、不进行额外附加服务;查看《如何避免下载的几个坑》。如果您已付费下载过本站文档,您可以点击 这里二次下载。
- 3、如文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“版权申诉”(推荐),也可以打举报电话:400-050-0827(电话支持时间:9:00-18:30)。
查看更多
PushConsumer与PullConsumer的区别
PushConsumer 与PullConsumer 的区别 概念上来说PushConsumer 应该是推给消费者,不考虑消费者的消费 能力,PullConsumer 则是消费者主动去拉队列中的消息。 但是RocketMq 与Kafka 的实现方式不太一样,RocketMQ 的 Consumer 都是从 Broker 拉消息来消费,但是为了能做到实时收消 息,RocketMQ 使用长轮询方式,可以保证消息实时性同 Push 方式一 致。返种长轮询方式类似亍 Web QQ 收収消息机制。 PushConsumer 的consumeFromWhere 参数可以指定在消费者第一次启 动时从哪个位置开始消费,默认情况下CONSUME_FROM_LAST_OFFSET 从 队列最后开始消费,也就是不消费之前的数据。后续启动会从上次消 费的位置开始消费,该参数不再起作用,但是需要注意的是重启 Consumer 有可能造成重复消费数据,因为消费的位置信息并不是实时 保存的,这点非常重要,在业务系统上就要防止重复数据提交,持久 化消费的位置信息间隔时间可以通过参数persistConsumerOffsetInterval 配 置,默认是5 秒,但我自己测试发现似乎无效。 使用PullConsumer 则可以指定某个Offset 开始Pull 消息。 测试代码 import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrently Context ; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyS tatus ; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcu rrently ; import com.alibaba.rocketmq.client.exception.MQClientException ; import mon.consumer.ConsumeFromWhere ; import mon.message.MessageExt ; import tocol.heartbeat.MessageModel ; import java.util.List ; import java.util.concurrent.TimeUnit ; /** * Created by Administrator on 2016/1/6 0006. */ public class PushConsumerTest { static int i = 0 ; /** * 当前例子是PushConsumer 用法,使用方式给用户感觉是消息从 RocketMQ 服务器推到了应用客户端。br * 但是实际PushConsumer 内部是使用长轮询Pull 方式从Broker 拉消息, 然后再回调用户Listener 方法br */ public static void main(String[] args) throws InterruptedException, MQClientException { /** * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局 对象或者单例br * 注意:ConsumerGroupName 需要由应用来保证唯一 */ final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( TestGroup) ; // consumer.setInstanceName(TestConsumer); consumer.setNamesrvAddr( ipAddress:9876); consumer.setConsumeThreadMin( 1000); consumer.setConsumeThreadMax( 1000); /** * 订阅指定topic
您可能关注的文档
最近下载
- 矿区1∶1万地质填图工作细则.pdf VIP
- 新能源场站宽频振荡就地监测控制方法及装置.pdf VIP
- 航空公司飞机失事应急预案演练脚本.docx VIP
- DLT 5210.1-2021 电力建设施工质量验收规程全套表格必威体育精装版201至400页.docx VIP
- 巨量引擎2024卫浴行业白皮书.pptx
- 湖南郴电国际发展股份有限公司校园招聘模拟试题附带答案详解汇编.docx VIP
- 斯巴拓SBT904D2手持操作说明书(二通道232 485变送器).doc VIP
- 中国成人患者围手术期液体治疗临床实践指南(2025版)解读.pptx
- 15 铁路营业线工程施工安全专项处置预案.doc VIP
- 《小学二年级家长会》课件(五套).pptx
文档评论(0)