0510_【掌握】Lettuce实现Stream机制.pptx

0510_【掌握】Lettuce实现Stream机制.pptx

  1. 1、本文档共8页,可阅读全部内容。
  2. 2、有哪些信誉好的足球投注网站(book118)网站文档一经付费(服务费),不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。
  3. 3、本站所有内容均由合作方或网友上传,本站不对文档的完整性、权威性及其观点立场正确性做任何保证或承诺!文档内容仅供研究参考,付费前请自行鉴别。如您付费,意味着您自己接受本站规则且自行承担风险,本站不退款、不进行额外附加服务;查看《如何避免下载的几个坑》。如果您已付费下载过本站文档,您可以点击 这里二次下载
  4. 4、如文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“版权申诉”(推荐),也可以打举报电话:400-050-0827(电话支持时间:9:00-18:30)。
查看更多

《Redis开发实战》Lettuce实现Stream机制

业务数据采集Stream消息类型主要是结合大数据时代的数据采集而提供的,所以在实际的开发中,所有的消息基本上都来自于业务数据,所以必然要与实际的应用程序进行结合

创建消费组进行消息接收packagecom.yootk.test.stream;

publicclassTestStreamConsumerGroup{//创建消费者

privatestaticfinalLoggerLOGGER=

LoggerFactory.getLogger(TestStreamConsumerGroup.class);

publicstaticfinalStringGROUP_NAME=group:yootk;//组名称

publicstaticfinalStringKEY=stream:yootk;//数据KEY

publicstaticvoidmain(String[]args)throwsException{

RedisAsyncCommandsString,Stringcommands=

RedisConnectionUtil.getConnection().async();//异步命令

commands.flushdb();//清空当前数据库

XReadArgs.StreamOffsetStringgroupOffset=

XReadArgs.StreamOffset.from(KEY,$);//读取偏移量

LOGGER.debug(【消费组】创建消费组:{},commands.xgroupCreate(groupOffset,

GROUP_NAME,XGroupCreateArgs.Builder.mkstream()).get());

XReadArgs.StreamOffsetStringconsumerOffset=

XReadArgs.StreamOffset.from(KEY,);//读取偏移量

XReadArgsblock=XReadArgs.Builder.block(Duration.ZERO).count(2);//读取参数

for(intx=0;x3;x++){//创建消费者线程

newThread(()-{//创建线程

while(true){//不间断消息接收

RedisFutureListStreamMessageString,Stringresult=

commands.xreadgroup(Consumer.from(GROUP_NAME,

Thread.currentThread().getName()),block,consumerOffset);

try{//Stream消费端在每次会批量接收消息

for(StreamMessageString,Stringmessage:result.get()){

LOGGER.info(【{}-消息接收】ID={}、BODY={},

Thread.currentThread().getName(),

message.getId(),message.getBody());//获取消息内容

commands.xack(KEY,GROUP_NAME,message.getId());//消息应答

}

}catch(Exceptione){}

文档评论(0)

学海无涯而人有崖 + 关注
实名认证
内容提供者

教师资格证、人力资源管理师持证人

该用户很懒,什么也没介绍

领域认证该用户于2023年06月11日上传了教师资格证、人力资源管理师

1亿VIP精品文档

相关文档