- 1、本文档共8页,可阅读全部内容。
- 2、有哪些信誉好的足球投注网站(book118)网站文档一经付费(服务费),不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。
- 3、本站所有内容均由合作方或网友上传,本站不对文档的完整性、权威性及其观点立场正确性做任何保证或承诺!文档内容仅供研究参考,付费前请自行鉴别。如您付费,意味着您自己接受本站规则且自行承担风险,本站不退款、不进行额外附加服务;查看《如何避免下载的几个坑》。如果您已付费下载过本站文档,您可以点击 这里二次下载。
- 4、如文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“版权申诉”(推荐),也可以打举报电话:400-050-0827(电话支持时间:9:00-18:30)。
《Redis开发实战》Lettuce实现Stream机制
业务数据采集Stream消息类型主要是结合大数据时代的数据采集而提供的,所以在实际的开发中,所有的消息基本上都来自于业务数据,所以必然要与实际的应用程序进行结合
创建消费组进行消息接收packagecom.yootk.test.stream;
publicclassTestStreamConsumerGroup{//创建消费者
privatestaticfinalLoggerLOGGER=
LoggerFactory.getLogger(TestStreamConsumerGroup.class);
publicstaticfinalStringGROUP_NAME=group:yootk;//组名称
publicstaticfinalStringKEY=stream:yootk;//数据KEY
publicstaticvoidmain(String[]args)throwsException{
RedisAsyncCommandsString,Stringcommands=
RedisConnectionUtil.getConnection().async();//异步命令
commands.flushdb();//清空当前数据库
XReadArgs.StreamOffsetStringgroupOffset=
XReadArgs.StreamOffset.from(KEY,$);//读取偏移量
LOGGER.debug(【消费组】创建消费组:{},commands.xgroupCreate(groupOffset,
GROUP_NAME,XGroupCreateArgs.Builder.mkstream()).get());
XReadArgs.StreamOffsetStringconsumerOffset=
XReadArgs.StreamOffset.from(KEY,);//读取偏移量
XReadArgsblock=XReadArgs.Builder.block(Duration.ZERO).count(2);//读取参数
for(intx=0;x3;x++){//创建消费者线程
newThread(()-{//创建线程
while(true){//不间断消息接收
RedisFutureListStreamMessageString,Stringresult=
commands.xreadgroup(Consumer.from(GROUP_NAME,
Thread.currentThread().getName()),block,consumerOffset);
try{//Stream消费端在每次会批量接收消息
for(StreamMessageString,Stringmessage:result.get()){
LOGGER.info(【{}-消息接收】ID={}、BODY={},
Thread.currentThread().getName(),
message.getId(),message.getBody());//获取消息内容
commands.xack(KEY,GROUP_NAME,message.getId());//消息应答
}
}catch(Exceptione){}
文档评论(0)