小工具      在线工具  汉语词典  css  js  c++  java

RocketMQ与Spring集成,用代码注册与消费(含生产者消费者)

Java 额外说明

收录于:42天前

遇到的问题:消费者的topic注册失败。

解决方案:我发现我的JAVA Web项目中使用的rocketMQ版本是4.3.0,而我使用的rocketMQ服务器版本是4.2.0。然后将项目中的版本统一为4.2.0即可。

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.2.0</version>
        </dependency>

XML模式的RocketMQ参见这篇文章


一. 启动RocketMQ环境,如上几篇博客的配置。

1.启动mqnamesrv

cd /Users/sunww/Documents/JAVA/MQ/rocketmq-all-4.2.0-bin-release

nohup sh bin/mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

启动成功打印:INFO main - 名称服务器启动成功

2.启动代理

cd /Users/sunww/Documents/JAVA/MQ/rocketmq-all-4.2.0-bin-release

nohup sh bin/mqbroker -n localhost:9876 &

旧版本 nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &

tail -f ~/logs/rocketmqlogs/broker.log // 查看broker日志

启动成功打印: Broker[TF012778.local, 10.50.62.53:10911] 启动成功

3. 关闭服务器

sh bin/mqshutdownbroker //停止broker

sh bin/mqshutdown namesrv //停止名称服务器

见下图:

二。消费端MqConsumer源码

package com.JXWork.util.MQ;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 接口说明
 *
 * @author TF12778 2019/10/28 16:48
 */
@Component
public class MqConsumer implements InitializingBean {

    private static final Logger log = LoggerFactory.getLogger(MqConsumer.class);

    @Value("${consumer.group}")
    private String consumerGroup;

    @Value("${producer.connect.list}")
    private String PRODUCER_CONNECT_LIST;

    @Value("${party.topic}")
    private String PARTY_TOPIC;

    @Value("${test.topic}")
    private String TEST_TOPIC;

    private static DefaultMQPushConsumer consumer = null;

    @Override
    public void afterPropertiesSet() throws Exception {
        init();
    }

    private void init() {

        try {

            consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(PRODUCER_CONNECT_LIST);
            consumer.setInstanceName("Consumer");
            consumer.subscribe(PARTY_TOPIC, "");
            consumer.subscribe(TEST_TOPIC, "*");

            //注册消费的监听
            //在此监听中消费信息,并返回消费的状态信息
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(getMessageListenerConcurrently());

            //Launch the consumer instance.
            consumer.start();

            log.info("MsgConsumer init - success.");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    private MessageListenerConcurrently getMessageListenerConcurrently() {
        return (msgs, context) -> {
            for (MessageExt msg : msgs) {
                log.info("收到的Topic:{},消息内容:{}" ,msg.getTopic() ,new String(msg.getBody()));

                if (PARTY_TOPIC.equals(msg.getTopic())) {

                    handleChangePartyPhoneResult(msg);
                } else if (TEST_TOPIC.equals(msg.getTopic())) {

                    handleChangePartyPhoneResult(msg);
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
    }

    private ConsumeConcurrentlyStatus handleChangePartyPhoneResult(MessageExt msg) {
        log.info("handleChangePartyPhoneResult handle,:{}", JSON.toJSONString(msg));
        String msgStr = new String(msg.getBody());
        if (msgStr.length() == 0) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        // 解析json字符串
        JSONObject jsonObject = JSONObject.parseObject(msgStr);
        if (StringUtils.isBlank(jsonObject.getString("mobilenumber"))
                || StringUtils.isBlank(jsonObject.getString("partyid"))
                || StringUtils.isBlank(jsonObject.getString("event"))
                || !"CHANGE_BINDING".equals(jsonObject.getString("event"))) {
            log.info("MessageListener handleChangePartyPhoneResult 参数异常, jsonObject={}", jsonObject);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        // 具体业务处理
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}

三。生产者MqProducer源代码

package com.JXWork.util.MQ;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * 接口说明
 *
 * @author TF12778 2019/10/28 16:43
 */

@Component
public class MqProducer implements InitializingBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqProducer.class);

    @Value("${producer.group}")
    private String PRODUCER_GROUP;

    @Value("${producer.connect.list}")
    private String PRODUCER_CONNECT_LIST;

    public static DefaultMQProducer rocketMqProducer = null;

    @Override
    public void afterPropertiesSet() throws Exception {

        try {
            rocketMqProducer = new DefaultMQProducer(PRODUCER_GROUP);
            rocketMqProducer.setNamesrvAddr(PRODUCER_CONNECT_LIST);
            rocketMqProducer.start();
            LOGGER.info("RocketMqProducer init - success.");
        } catch (MQClientException e) {
            LOGGER.info("RocketMqProducer init - error");
        }
    }

    /**
     * 方法 sendMsg 作用描述: rocketmq发送消息方法
     *
     * @param topic      组名
     * @param tagName    同一个topic下的不同 分支,同一个消费者只能取一个组的下的不同的tag分支
     * @param key        保持唯一
     * @param msgBody    消息体
     */
    public static void sendMsgIntime(String topic, String tagName, String key, byte[] msgBody) {//,int level
        Message msg = new Message(topic, tagName, key, msgBody);
        try {
            String result = rocketMqProducer.send(msg).toString();
            LOGGER.info("send rockmq uuid:" + new String(msgBody, "UTF-8") + " " + result);
        } catch (Exception e) {
            LOGGER.error("msgBody:" + new String(msgBody), e);
        }
    }


    public static void sendMsgIntimeNoUUID(String topic, String tagName, String key, byte[] msgBody) {//,int level
        Message msg = new Message(topic, tagName, key, msgBody);
        try {
            String result = rocketMqProducer.send(msg).toString();
            LOGGER.info("生产者 send msg 2 rockmq " + new String(msgBody) + " " + result);
        } catch (Exception e) {
            LOGGER.error("msgBody:" + new String(msgBody), e);
        }
    }

    public static void main(String[] xx) throws Exception {
        try {
            rocketMqProducer = new DefaultMQProducer("ljMQProducerGroup");
            rocketMqProducer.setNamesrvAddr("127.0.0.1:9876");
            rocketMqProducer.start();
            LOGGER.info("RocketMqProducer init - success.");
        } catch (MQClientException e) {
            LOGGER.info("RocketMqProducer init - error");
        }

        Map<String, Object> msg = new HashMap<>();
        msg.put("carNum", "浙A888888");
        msg.put("code", "778882222");

        // consumer.subscribe("partyTopic", "partyTopicTags");
        MqProducer.sendMsgIntime("PARTYTOPIC", "", "",
                (JSON.toJSONString(msg)).getBytes());
    }

}

application.properties内容:

producer.group=ljMQProducerGroup
producer.connect.list=localhost:9876

consumer.group=ljMQConsumerGroup

party.topic=partyTopic
test.topic=testTopic

四。测试

    @Test
    public void mqTest() {

        Map<String, Object> msg = new HashMap<>();
        msg.put("carNum", "浙A888888");
        msg.put("code", "778882222");

        MqProducer.sendMsgIntime("partyTopic", "", "",
                (JSON.toJSONString(msg)).getBytes());

        MqProducer.sendMsgIntime("testTopic", "", "",
                (JSON.toJSONString("Test RocketMQ")).getBytes());
    }

检测结果 :

. . .

相关推荐

额外说明

springboot环境下将对象注入到工具类中或者静态方法中将对象注入到工具类中

众所周知工具类一般为static的,如果我们在工具类中需要操作数据库,那么spring环境下需要注入service或者dao将对象交给spring进行管控,但是静态的的对象是无法被注入的,代码可能会出现各种空指针。需要采用@PostConstruct注解

额外说明

设计模式 观察者模式

前言: 观察者模式(Observer Pattern)定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时,会通知所有观察者对象,使他们能够自己更新自己。 一.观察者模式简介: 观察者模式(Observer P

额外说明

拉姆达表达式

Lambda表达式是一种匿名函数式编程的方式,在某些编程语言中,比如Python,Java,C++11等,它可以作为参数传递给其他函数或方法。它的语法通常包含一个箭头”->”符号,如下所示: (parameter1, parameter2,...) ->

额外说明

JVM常见问题排查的思路汇总

参考文档: 一次JVM爆内存分析 一次JVM爆内存分析 - 持续疯长,往天那边去 - ITeye博客 部分内容提要: 1.应用没有死,但是响应非常慢,通过jstat查看到s0/s1 eden old区都100%了;确认JVM是内存爆了;系统GC时间非常高

额外说明

java判断当前时间是否在某个时间区间内(可精确到毫秒)

直接上代码,不做过多解释了 package testJava; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; pu

额外说明

【Rust 基础篇】Rust 属性宏:定制你的代码

导言 Rust是一门现代的、安全的系统级编程语言,它提供了丰富的元编程特性,其中属性宏(Attribute Macros)是其中之一。属性宏允许开发者在代码上方添加自定义的属性,并对代码进行定制化处理。在本篇博客中,我们将深入探讨Rust中的属性宏,包括

额外说明

MQTT协议简介

一、mqtt协议简介 1.1 MQTT协议特点 1.2 发布和订阅 1.3 QoS(Quality of Service levels) QoS 0 QoS 1 QoS 2 二、MQTT 数据包结构 2.1 MQTT固定头 ( FixedHeader )

额外说明

创建数据库链接(dblink)步骤

1.CREATE DATABASE LINK数据库链接名CONNECT TO 用户名 IDENTIFIED BY 密码 USING ‘数据库连接字符串’;   可以像这样:    create database link orcl2   connect

额外说明

.Net下的MSMQ的同步异步调用

  一、MSMQ简介 MSMQ(微软消息队列)是Windows操作系统中消息应用程序的基础,是用于创建分布式、松散连接的消息通讯应用程序的开发工具。消息队列 和电子邮件有着很多相似处,他们都包含多个属性,用于保存消息,消息类型中都指出发送者和接收者的地址

额外说明

解决Windows系统文件夹filemgmt.dll文件丢失找不到的问题

其实很多用户玩单机游戏或者安装软件的时候就出现过这种问题,如果是新手第一时间会认为是软件或游戏出错了,其实并不是这样,其主要原因就是你电脑系统的该dll文件丢失了或没有安装一些系统软件平台所需要的动态链接库,这时你可以下载这个filemgmt.dll文件

ads via 小工具