小工具      在线工具  汉语词典  css  js  c++  java

ActiveMQ消息中间件的点对点模式点对点消息队列

* 消息消费者从队列中取出消息并消费该消息

* 消息被消费后,不再存储在队列中,因此消息消费者无法消费已经消费过的消息。

* 队列支持多个消费者,但对于一个消费者来说,只有一个消费者可以消费这条消息,其他消费者不能消费这条消息。

* 当消费者不存在时,消息会被保存,直到有消费者消费该消息(持久化)

ActiveMQ环境配置地址:Centos下ActiveMQ安装

ActiveMQ 版本:apache-activemq-5.16.5

SpringBoot集成ActiveMQ生产端的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>boot.example.queue.provider</groupId>
    <artifactId>boot-example-queue-demo-provider-2.0.5</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>boot-example-queue-demo-provider-2.0.5</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>boot.example.demo.entity</groupId>
            <artifactId>boot-example-demo-entity-2.0.5</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- activeMQ依赖组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- spring.activemq.pool.enabled=true -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.16.5</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>swagger-bootstrap-ui</artifactId>
            <version>1.9.2</version>
        </dependency>

<!--        <dependency>-->
<!--            <groupId>org.apache.activemq</groupId>-->
<!--            <artifactId>activemq-all</artifactId>-->
<!--            <version>5.16.5</version>-->
<!--        </dependency>-->

    </dependencies>

    <build>
        <plugins>
            <!-- 打包成一个可执行jar -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

应用程序属性


server.port=8041


spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.in-memory=false
spring.activemq.packages.trust-all=true
#true连接池
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=6
spring.activemq.pool.idle-timeout=30000
spring.activemq.pool.expire-timeout=0
spring.jms.pub-sub-domain=false

启动类应用程序队列提供者

package boot.example.queue.provider;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

/**
 * 蚂蚁舞
 */
@SpringBootApplication
/**
 * 启动消息队列
 */
@EnableJms
public class AppQueueProvider {
    public static void main( String[] args )
    {
        SpringApplication.run(AppQueueProvider.class, args);
        System.out.println( "Hello World!" );
    }
    
}

配置一个queue ActiveMQ队列配置

package boot.example.queue.provider.config;


import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;


/**
 *  蚂蚁舞
 */
@Configuration
public class ActiveMQQueueConfig {

    public static final String defaultQueue = "myw_queue";

    @Bean
    public Queue queue() {
        return new ActiveMQQueue(defaultQueue);
    }

}


启动默认提供者服务

package boot.example.queue.provider.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Queue;

/**
 *  蚂蚁舞
 */
@Service
public class BootDefaultProviderService {

    @Autowired
    private Queue queue;

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    public void sendStringDefaultQueue(String message) {
        jmsMessagingTemplate.convertAndSend(queue, message);
    }


}

其他模式启动服务

package boot.example.queue.provider.service;

import boot.example.queue.entity.BootProvider;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 *  蚂蚁舞
 */
@Service
public class BootProviderService {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * 发送字符串消息队列
     *
     * @param queueName 队列名称
     * @param message   字符串
     */
    public void sendStringQueue(String queueName, String message) {
        jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);
    }

    /**
     * 发送字符串集合消息队列
     *
     * @param queueName 队列名称
     * @param list      字符串集合
     */
    public void sendStringListQueue(String queueName, List<String> list) {
        jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), list);
    }

    /**
     * 发送对象消息队列
     *
     * @param queueName 队列名称
     * @param obj       对象
     */
    public void sendObjQueue(String queueName, BootProvider obj) {
        jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), obj);
    }

    /**
     * 发送对象集合消息队列
     *
     * @param queueName 队列名称
     * @param objList   对象集合
     */
    public void sendObjListQueue(String queueName, List<BootProvider> objList) {
        jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), objList);
    }

}

使用招摇配置

package boot.example.queue.provider.config;

import com.google.common.base.Predicates;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

/**
 *  蚂蚁舞
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket createRestApi(){
        return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
                .apis(RequestHandlerSelectors.any()).paths(PathSelectors.any())
                .paths(Predicates.not(PathSelectors.regex("/error.*")))
                .paths(PathSelectors.regex("/.*"))
                .build().apiInfo(apiInfo());
    }

    private ApiInfo apiInfo(){
        return new ApiInfoBuilder()
                .title("demo")
                .description("demo接口")
                .version("0.01")
                .build();
    }

    /**
     * http://localhost:XXXX/doc.html  地址和端口根据实际项目查看
     */


}

BootQueueDefaultProviderController

package boot.example.queue.provider.controller;

import boot.example.queue.provider.service.BootDefaultProviderService;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
 *     蚂蚁舞
 */
@RestController
@RequestMapping(value="/provider")
public class BootQueueDefaultProviderController {

    @Resource
    private BootDefaultProviderService providerService;

    @PostMapping(value = "/sendStringDefaultQueue")
    @ResponseBody
    public String sendStringDefaultQueue(@RequestParam(name="message",required = true) String message) throws Exception {
        providerService.sendStringDefaultQueue(message);
        return "success";
    }





}

BootQueueProvider控制器

package boot.example.queue.provider.controller;

import boot.example.queue.entity.BootProvider;
import boot.example.queue.provider.service.BootProviderService;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;

/**
 *     蚂蚁舞
 */
@RestController
@RequestMapping(value="/provider")
public class BootQueueProviderController {

    //  指定queue
    public static final String stringQueue = "stringQueue";

    //  指定list<String>
    public static final String stringListQueue = "stringListQueue";

    //  指定Object
    public static final String objQueue = "objQueue";

    //  指定List<Object>
    public static final String objListQueue = "objListQueue";

    @Resource
    private BootProviderService providerService;

    @PostMapping(value = "/sendStringQueue")
    @ResponseBody
    public String sendStringQueue(@RequestParam(name="message",required = true) String message) throws Exception {
        providerService.sendStringQueue(stringQueue, message);
        return "success";
    }

    @PostMapping(value = "/sendStringListQueue")
    @ResponseBody
    public String sendStringListQueue(@RequestBody List<String> list) throws Exception {
        if(list.isEmpty()){return "fail";}
        providerService.sendStringListQueue(stringListQueue, list);
        return "success";
    }

    @PostMapping(value = "/sendObjQueue")
    @ResponseBody
    public String sendObjQueue(@RequestBody BootProvider bootProvider) throws Exception {
        if(bootProvider == null){return "fail";}
        providerService.sendObjQueue(objQueue, bootProvider);
        return "success";
    }

    @PostMapping(value = "/sendObjListQueue")
    @ResponseBody
    public String sendObjListQueue(@RequestBody  List<BootProvider> list) throws Exception {
        if(list.isEmpty()){return "fail";}
        providerService.sendObjListQueue(objListQueue, list);
        return "success";
    }





}

BootProvider对象

package boot.example.queue.entity;

import java.io.Serializable;
import java.util.Date;

/**
 *  用在activeMq消息,必须保证package一致,不然序列化后反序列化要出错
 *  蚂蚁舞
 */
public class BootProvider implements Serializable {

    private int id;

    private String name;

    private Date date = new Date();

    public BootProvider() {
    }

    public BootProvider(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Date getDate() {
        return date;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    @Override
    public String toString() {
        return "BootProvider{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", date=" + date +
                '}';
    }
}

代码结构

├─boot-example-demo-entity-2.0.5
│  │  pom.xml
│  │  
│  ├─src
│  │  └─main
│  │      └─java
│  │          └─boot
│  │              └─example
│  │                  └─queue
│  │                      └─entity
│  │                              BootProvider.java
│  │                              
├─boot-example-queue-demo-provider-2.0.5
│  │  pom.xml
│  │  
│  ├─src
│  │  ├─main
│  │  │  ├─java
│  │  │  │  └─boot
│  │  │  │      └─example
│  │  │  │          └─queue
│  │  │  │              └─provider
│  │  │  │                  │  AppQueueProvider.java
│  │  │  │                  │  
│  │  │  │                  ├─config
│  │  │  │                  │      ActiveMQQueueConfig.java
│  │  │  │                  │      SwaggerConfig.java
│  │  │  │                  │      
│  │  │  │                  ├─controller
│  │  │  │                  │      BootQueueDefaultProviderController.java
│  │  │  │                  │      BootQueueProviderController.java
│  │  │  │                  │      
│  │  │  │                  └─service
│  │  │  │                          BootDefaultProviderService.java
│  │  │  │                          BootProviderService.java
│  │  │  │                          
│  │  │  └─resources
│  │  │          application.properties
│  │  │          
│  │  └─test
│  │      └─java
│  │          └─boot
│  │              └─example
│  │                  └─queue
│  │                      └─provider
│  │                              ActiveMQQueueConsumer.java
│  │                              ActiveMQQueueProvider.java
│  │                              AppQueueProviderTest.java

SpringBoot中集成了ActiveMQ代码演示。 (ActiveMQ启动后)程序访问开始

http://localhost:8041/doc.html

测试发送,可以看到发送成功。

ActiveMQ与SpringBoot集成默认采用持久事务模式,因此可以开箱即用。没有必要花时间去了解更多关于ActiveMQ的知识。所需要的只是能够将消息从一个服务发送到另一个服务,并且 ActiveMQ 支持事务和持久性。

典型案例演示测试

ActiveMQQueueProvider(生产端)

package boot.example.queue.provider;


import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 *  蚂蚁舞
 */
public class ActiveMQQueueProvider {

    public static void main(String[] args) throws Exception{
        //1 创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");
        //2 创建连接
        Connection connection = connectionFactory.createConnection();
        //3 打开连接
        connection.start();
        //4 创建会话
        //第一个参数:是否开启事务 transacted true开启 false不开启
        //第二个参数:消息是否自动确认  acknowledgeMode
        //当transacted为true时,acknowledgeMode会默认Session.SESSION_TRANSACTED
        Session session = connection.createSession(true, Session.DUPS_OK_ACKNOWLEDGE);
        //创建队列
        Queue queue = session.createQueue("myw_queue_test");
        //5 创建生产者
        MessageProducer producer = session.createProducer(queue);
        //6 创建消息
        Message message = session.createTextMessage("myyhtw蚂蚁也会跳舞");
        //7 发送消息
        producer.send(message);

        //8 关闭消息
        session.commit();  // 开启事务必须提交这个
        producer.close();
        session.close();
        connection.close();
    }
}

ActiveMQQueueConsumer(消费者)

package boot.example.queue.provider;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQQueueConsumer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");
        //创建连接
        Connection connection = connectionFactory.createConnection();
        //开启连接
        connection.start();
        // 创建会话
        // transacted 如果设置true,操作消息队列后,必须使用 session.commit() 如果设置false,操作消息队列后,不使用session.commit();
        // acknowledgeMode
        // 1-Session.AUTO_ACKNOWLEDGE 自动应答
        // 2-Session.CLIENT_ACKNOWLEDGE 手动应答
        // 3-Session.DUPS_OK_ACKNOWLEDGE 延迟应答
        // 0-Session.SESSION_TRANSACTED 事务
        // 当transacted为true时,acknowledgeMode会默认Session.SESSION_TRANSACTED
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //创建队列
        Queue queue = session.createQueue("myw_queue_test");
        //创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        while(true){
            //失效时间,如果10秒内没有收到新的消息,说明没有消息存在,此时可以退出当前循环
            TextMessage message = (TextMessage) consumer.receive(10000);
            if(message!=null){
                System.out.println(message.getText());
                //message.acknowledge();
            } else {
                break;
            }
        }

        //关闭连接
        session.commit();
        session.close();
        connection.close();
    }
}



ActiveMQQueueConsumerListener(消费者监听器)

package boot.example.queue.provider;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQQueueConsumerListener {


    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");
        //创建连接
        Connection connection = connectionFactory.createConnection();
        //开启连接
        connection.start();
        // 创建会话
        // transacted 如果设置true,操作消息队列后,必须使用 session.commit() 如果设置false,操作消息队列后,不使用session.commit();
        // acknowledgeMode
        // 1-Session.AUTO_ACKNOWLEDGE 自动应答
        // 2-Session.CLIENT_ACKNOWLEDGE 手动应答
        // 3-Session.DUPS_OK_ACKNOWLEDGE 延迟应答
        // 0-Session.SESSION_TRANSACTED 事务
        // 当transacted为true时,acknowledgeMode会默认Session.SESSION_TRANSACTED
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        //创建队列
        Queue queue = session.createQueue("myw_queue_test");
        //创建消费者
        MessageConsumer consumer = session.createConsumer(queue);

        //注册消息监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage msg = (TextMessage)message;
                    System.out.println(msg.getText());
                    message.acknowledge();
                } catch (JMSException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        while (true){}
//        consumer.close();
//        session.close();
//        connection.close();
    }

}

源码案例地址: SpringBoot+ActiveMQ源码案例

. . .

相关推荐

额外说明

mysqld:无法将目录更改为“/usr/local/mysql/data/”(错误代码:13 - 权限被拒绝)错误处理

报错提示: [root@hecs-399223 bin]# ./mysqld --initialize --user=mysql --basedir=/usr/local/mysql --datadir=/usr/local/mysql/data mys

额外说明

只需要这三步,用Java也能图片识别

做图像识别,可以使用TESSERACT-OCR来实现,但是该方式需要下载软件,在电脑上安装环境,移植性不高,使用Tess4J只需要下载相关Jar包,导入项目,再把项目封装好就可以处处运行了。 首先说一下我使用的电脑和JDK版本 电脑:MacBook JD

额外说明

1.spark standalone环境安装

概述 环境是spark 3.2.4 hadoop版本 3.2.4,所以官网下载的包为 spark-3.2.4-bin-hadoop3.2.tgz 在具体安装部署之前,需要先下载Spark的安装包,进到 spark的官网,点击download按钮 使用Sp

额外说明

nodejs操作mongodb之七(mongoose聚合函数的使用)

一、使用聚合函数多表查询和mongodb类似的 1、定义schema const mongoose = require('./db'); const OrderSchema = mongoose.Schema({ order_id: Stri

额外说明

《天天数学》连载30:一月三十日

格言作者:柏拉图(Plato,Πλατών,公元前427年—公元前347年),是古希腊伟大的哲学家,也是整个西方文化中最伟大的哲学家和思想家之一。柏拉图和老师苏格拉底,学生亚里士多德并称为希腊三贤。他创造或发展的概念包括:柏拉图思想、柏拉图主义、柏拉图式

额外说明

Vite 使用 ?raw 将 vue文件转为字符串(两种方式)

一. 前言 学习 vite 时,找到插件 awesome-vite,顺着找,可以看到插件 vite-plugin-raw 起作用。 官方翻译该插件可将任何类型的文件转换为字符串。  二. 使用 这里仅对 .vue 文件做转换。 使用场景,如使用 code

额外说明

SpringBoot的三大核心组件,你了解多少?

在本文中,我们将详细介绍SpringBoot的三大核心组件:Spring Boot Starter、Spring Boot Autoconfigure和Spring Boot Actuator。我们将分别讨论这三个组件的作用、使用方法和优势,以帮助您更好

额外说明

低代码开发之开源数据可视化分析平台datagear

概述 DataGear是一款开源免费的数据可视化分析平台,自由制作任何您想要的数据看板,支持接入SQL、CSV、Excel、HTTP接口、JSON等多种数据源。 系统基于Spring Boot、Jquery、ECharts等技术开发。 系统特点 友好接入

额外说明

打开软件提示找不到jschs.dll文件如何解决?

其实很多用户玩单机游戏或者安装软件的时候就出现过这种问题,如果是新手第一时间会认为是软件或游戏出错了,其实并不是这样,其主要原因就是你电脑系统的该dll文件丢失了或没有安装一些系统软件平台所需要的动态链接库,这时你可以下载这个jschs.dll文件(挑选

额外说明

0基础教你玩转手机摄影:手把手实操讲解,好学易懂!

新书名:手机摄影入门指南:从零开始,轻松掌握实用技巧! 文本: 在当今科技发达的时代,手机摄影已经成为人们记录生活、表达创意的重要方式之一。然而,对于许多初学者来说,手机摄影似乎有点棘手。但别担心!在这篇文章中,我将带领大家逐步学习手机摄影的基础知识和实

ads via 小工具