java怎么拦截某个对象
360
2022-09-12
Jstorm与RocketMQ整合
如果是经常关注阿里巴巴的朋友们,看到我这篇博客的题目,就知道我在参加今年的中间件比赛。 好了,废话不说,开始了。 首先我们知道,rocketmq的consumer有两种,一种是DefaultMQPushConsumer另外一个是DefaultMQPullConsumer 两个有什么区别呢? 对我们自己写的代码来说,使用push就是被动接受mq的消息,而使用pull就是需要主动的去mq上拉取消息。 那么再与jstorm集成的时候,选择哪个呢? 我最开始选择的是pull,后来遇到各种问题,放弃了。 选择push才是真爱呀。 具体怎么做呢? 在spot的open里初始化DefaultMQPushConsumer,registerMessageListener的时候填入自己,当然我们的spot实现了MessageListenerConcurrently, 然后在spot里面的consumeMessage里面写自己的逻辑,合适的时候,用collector发射消息就是了。 能上点干货么?
public class EmitPaymentSpot extends BaseRichSpout implements MessageListenerConcurrently{ private static final long serialVersionUID = -3085994102089532269L; private SpoutOutputCollector collector; private transient DefaultMQPushConsumer consumer; @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { log.error("init DefaultMQPushConsumer"); consumer = new DefaultMQPushConsumer(RaceConfig.MetaConsumerGroup); // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setNamesrvAddr("ip:port") try { consumer.subscribe(RaceConfig.MqTmallTradeTopic, "*"); consumer.subscribe(RaceConfig.MqTaobaoTradeTopic, "*"); consumer.subscribe(RaceConfig.MqPayTopic, "*"); } catch (MQClientException e) { e.printStackTrace(); } consumer.registerMessageListener(this); try { consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } log.error("Consumer Started."); this.collector = collector; } @Override public void nextTuple() { //do nothing } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { //... } @Override public ConsumeConcurrentlyStatus consumeMessage(List
当然还有第二种方式,是官方推荐的。
其实我觉得和我的方式差不多..
就是在生成consumer的时候使用工厂模式而已。
相关的代码,比较麻烦大家见
https://github.com/alibaba/jstorm/blob/master/jstorm-utility/jstorm-rocket-mq/src/main/java/com/alibaba/aloha/meta/MetaSpout.java
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~