小工具      在线工具  汉语词典  css  js  c++  java

RocketMQ与Spring集成,用代码注册与消费(含生产者消费者)

Java 额外说明

收录于:92天前

遇到的问题:消费者的topic注册失败。

解决方案:我发现我的JAVA Web项目中使用的rocketMQ版本是4.3.0,而我使用的rocketMQ服务器版本是4.2.0。然后将项目中的版本统一为4.2.0即可。

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.2.0</version>
        </dependency>

XML模式的RocketMQ参见这篇文章


一. 启动RocketMQ环境,如上几篇博客的配置。

1.启动mqnamesrv

cd /Users/sunww/Documents/JAVA/MQ/rocketmq-all-4.2.0-bin-release

nohup sh bin/mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

启动成功打印:INFO main - 名称服务器启动成功

2.启动代理

cd /Users/sunww/Documents/JAVA/MQ/rocketmq-all-4.2.0-bin-release

nohup sh bin/mqbroker -n localhost:9876 &

旧版本 nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &

tail -f ~/logs/rocketmqlogs/broker.log // 查看broker日志

启动成功打印: Broker[TF012778.local, 10.50.62.53:10911] 启动成功

3. 关闭服务器

sh bin/mqshutdownbroker //停止broker

sh bin/mqshutdown namesrv //停止名称服务器

见下图:

二。消费端MqConsumer源码

package com.JXWork.util.MQ;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 接口说明
 *
 * @author TF12778 2019/10/28 16:48
 */
@Component
public class MqConsumer implements InitializingBean {

    private static final Logger log = LoggerFactory.getLogger(MqConsumer.class);

    @Value("${consumer.group}")
    private String consumerGroup;

    @Value("${producer.connect.list}")
    private String PRODUCER_CONNECT_LIST;

    @Value("${party.topic}")
    private String PARTY_TOPIC;

    @Value("${test.topic}")
    private String TEST_TOPIC;

    private static DefaultMQPushConsumer consumer = null;

    @Override
    public void afterPropertiesSet() throws Exception {
        init();
    }

    private void init() {

        try {

            consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(PRODUCER_CONNECT_LIST);
            consumer.setInstanceName("Consumer");
            consumer.subscribe(PARTY_TOPIC, "");
            consumer.subscribe(TEST_TOPIC, "*");

            //注册消费的监听
            //在此监听中消费信息,并返回消费的状态信息
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(getMessageListenerConcurrently());

            //Launch the consumer instance.
            consumer.start();

            log.info("MsgConsumer init - success.");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    private MessageListenerConcurrently getMessageListenerConcurrently() {
        return (msgs, context) -> {
            for (MessageExt msg : msgs) {
                log.info("收到的Topic:{},消息内容:{}" ,msg.getTopic() ,new String(msg.getBody()));

                if (PARTY_TOPIC.equals(msg.getTopic())) {

                    handleChangePartyPhoneResult(msg);
                } else if (TEST_TOPIC.equals(msg.getTopic())) {

                    handleChangePartyPhoneResult(msg);
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
    }

    private ConsumeConcurrentlyStatus handleChangePartyPhoneResult(MessageExt msg) {
        log.info("handleChangePartyPhoneResult handle,:{}", JSON.toJSONString(msg));
        String msgStr = new String(msg.getBody());
        if (msgStr.length() == 0) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        // 解析json字符串
        JSONObject jsonObject = JSONObject.parseObject(msgStr);
        if (StringUtils.isBlank(jsonObject.getString("mobilenumber"))
                || StringUtils.isBlank(jsonObject.getString("partyid"))
                || StringUtils.isBlank(jsonObject.getString("event"))
                || !"CHANGE_BINDING".equals(jsonObject.getString("event"))) {
            log.info("MessageListener handleChangePartyPhoneResult 参数异常, jsonObject={}", jsonObject);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        // 具体业务处理
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}

三。生产者MqProducer源代码

package com.JXWork.util.MQ;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * 接口说明
 *
 * @author TF12778 2019/10/28 16:43
 */

@Component
public class MqProducer implements InitializingBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqProducer.class);

    @Value("${producer.group}")
    private String PRODUCER_GROUP;

    @Value("${producer.connect.list}")
    private String PRODUCER_CONNECT_LIST;

    public static DefaultMQProducer rocketMqProducer = null;

    @Override
    public void afterPropertiesSet() throws Exception {

        try {
            rocketMqProducer = new DefaultMQProducer(PRODUCER_GROUP);
            rocketMqProducer.setNamesrvAddr(PRODUCER_CONNECT_LIST);
            rocketMqProducer.start();
            LOGGER.info("RocketMqProducer init - success.");
        } catch (MQClientException e) {
            LOGGER.info("RocketMqProducer init - error");
        }
    }

    /**
     * 方法 sendMsg 作用描述: rocketmq发送消息方法
     *
     * @param topic      组名
     * @param tagName    同一个topic下的不同 分支,同一个消费者只能取一个组的下的不同的tag分支
     * @param key        保持唯一
     * @param msgBody    消息体
     */
    public static void sendMsgIntime(String topic, String tagName, String key, byte[] msgBody) {//,int level
        Message msg = new Message(topic, tagName, key, msgBody);
        try {
            String result = rocketMqProducer.send(msg).toString();
            LOGGER.info("send rockmq uuid:" + new String(msgBody, "UTF-8") + " " + result);
        } catch (Exception e) {
            LOGGER.error("msgBody:" + new String(msgBody), e);
        }
    }


    public static void sendMsgIntimeNoUUID(String topic, String tagName, String key, byte[] msgBody) {//,int level
        Message msg = new Message(topic, tagName, key, msgBody);
        try {
            String result = rocketMqProducer.send(msg).toString();
            LOGGER.info("生产者 send msg 2 rockmq " + new String(msgBody) + " " + result);
        } catch (Exception e) {
            LOGGER.error("msgBody:" + new String(msgBody), e);
        }
    }

    public static void main(String[] xx) throws Exception {
        try {
            rocketMqProducer = new DefaultMQProducer("ljMQProducerGroup");
            rocketMqProducer.setNamesrvAddr("127.0.0.1:9876");
            rocketMqProducer.start();
            LOGGER.info("RocketMqProducer init - success.");
        } catch (MQClientException e) {
            LOGGER.info("RocketMqProducer init - error");
        }

        Map<String, Object> msg = new HashMap<>();
        msg.put("carNum", "浙A888888");
        msg.put("code", "778882222");

        // consumer.subscribe("partyTopic", "partyTopicTags");
        MqProducer.sendMsgIntime("PARTYTOPIC", "", "",
                (JSON.toJSONString(msg)).getBytes());
    }

}

application.properties内容:

producer.group=ljMQProducerGroup
producer.connect.list=localhost:9876

consumer.group=ljMQConsumerGroup

party.topic=partyTopic
test.topic=testTopic

四。测试

    @Test
    public void mqTest() {

        Map<String, Object> msg = new HashMap<>();
        msg.put("carNum", "浙A888888");
        msg.put("code", "778882222");

        MqProducer.sendMsgIntime("partyTopic", "", "",
                (JSON.toJSONString(msg)).getBytes());

        MqProducer.sendMsgIntime("testTopic", "", "",
                (JSON.toJSONString("Test RocketMQ")).getBytes());
    }

检测结果 :

. . .

相关推荐

额外说明

Springboot 中使用 RestTemplate 向另一个 RESTful api 请求 multipart file 报错

No serializer found for class java.io.FileDescriptor 如第三方的文件上传接口:   /** * 上传文件 * @param request

额外说明

按部门或职位分组统计

《SQL 从入门到精通》专栏目录 第 01 篇 和数据打交道的你,一定要学会 SQL 第 02 篇 在 SQL 的世界里一切都是关系 第 03 篇 使用 SELECT 语句初步探索数据库 第 04 篇 通过查询条件实现数据过滤 第 05 篇 如何使用 S

额外说明

【C#】List<T>类型强制转换

List<UIData> datalist=null; datalist.ConvertAll<object>(input => input as object); 将UIData类型的List强制转成object类型。

额外说明

【微服务】spring webflux响应式编程使用详解

目录 一、webflux介绍 1.1 什么是webflux 1.2 什么是响应式编程 1.3 webflux特点 二、Java9中响应式编程

额外说明

餐中餐(3)Lucene--核心类IndexWriter (Part 2)

文章目录 Abstract Graph: 流程分析: 1.获取索引目录的索引文件锁 2.获取封装后的Directory 3.根据不同的OpenMode执行对应的工作 3.1 create模式下的流程 配置检查 初始化一个新的SegmentInfos对象,

额外说明

Java中相除(/)和取余(%)

链接地址: 及其推荐: https://blog.csdn.net/sinat_41152339/article/details/81054379

额外说明

关于 Burrows-Wheeler 变换和 Lempel-Ziv 解析的一些认识

关于 Burrows-Wheeler 变换和 Lempel-Ziv 解析的一些认识 文章目录 关于 Burrows-Wheeler 变换和 Lempel-Ziv 解析的一些认识 一,Burrows-Wheeler Transform 1. 概述 2. 图

额外说明

Java Web实训项目:西蒙购物网(中)

文章目录 四、实现步骤 (八)控制层(XXXServlet) 1、登录处理类LoginServlet 2、注销处理类LogoutServlet 3、注册处理类RegisterServlet 4、显示类别处理类ShowCategoryServlet 5、显

额外说明

每个人都应该了解的WordPress Admin Bar

WordPress 附带一个管理栏,为登录用户显示方便的快捷方式。即使在查看网站的前端时,这也使您可以快速访问管理任务。在本文中,我们将解释什么是 WordPress 管理栏,以及如何根据自己的需要使用或自定义它。 WordPress 附带一个管理栏,为

额外说明

Java 表驱动(映射)函数替换 if...else 代码

Map+函数式接口方​​法取代了很多if...else判断。 什么是表驱动方法? 是一种不使用逻辑语句(if 和 case)从表中查找信息的编程模式。事实上,任何可以通过逻辑语句选择的东西都​​可以通过查找表选择。对于简单的情况,使用逻辑语句更容易、更直

ads via 小工具