T O P

[资源分享]     ActiveMQ 入门实战(3)--SpringBoot 整合 ActiveMQ

  • By - 楼主

  • 2021-09-12 12:00:08
  • Spring JMS 是基于 Spring 框架的 JMS 消息解决方案,提供模板化发送和接收消息的抽象层;本文主要介绍在 SpringBoot 中用 Spring JMS 操作 ActiveMQ,文中使用到的软件版本:ActiveMQ 5.16.2、SpringBoot 2.4.9、Java 1.8.0_191。

    1、SpringBoot 整合 ActiveMQ "Classic"

    1.1、引入依赖

    <!-- Spring Boot starter for ActiveMQ -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <!-- JMS connection pool -->
    <dependency>
        <groupId>org.messaginghub</groupId>
        <artifactId>pooled-jms</artifactId>
    </dependency>

    1.2、配置(application.yml)

    spring:
      activemq:
        # Address of the broker the client connects to
        broker-url: tcp://10.40.100.69:61616
        pool:
          # Turn on connection pooling, limited to at most 5 connections
          enabled: true
          max-connections: 5

    1.3、配置类

    package com.abc.demo.activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.config.JmsListenerContainerFactory;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.jms.core.JmsTemplate;
    
    import javax.jms.ConnectionFactory;
    
    /**
     * JMS wiring for ActiveMQ "Classic": one transacted template for sending and
     * two listener container factories for topic subscriptions (plain and durable).
     */
    @Configuration
    public class ActiveMQConfig {
        @Value("${spring.activemq.broker-url}")
        private String brokerUrl;

        @Autowired
        private ConnectionFactory connectionFactory;

        /**
         * Creates a {@link JmsTemplate} on the injected connection factory with
         * transacted sessions switched on.
         *
         * @return the transacted template
         */
        @Bean
        public JmsTemplate jmsTemplate() {
            JmsTemplate template = new JmsTemplate(connectionFactory);
            // Transacted sessions; intended to be used together with @Transactional.
            template.setSessionTransacted(true);
            return template;
        }

        /**
         * Wraps {@link #jmsTemplate()} in a {@link JmsMessagingTemplate}.
         *
         * @return the messaging template registered as {@code jmsMessagingTemplateTransaction}
         */
        @Bean("jmsMessagingTemplateTransaction")
        public JmsMessagingTemplate jmsMessagingTemplate() {
            return new JmsMessagingTemplate(jmsTemplate());
        }

        /**
         * Listener container factory for topic (publish/subscribe) listeners,
         * backed by the injected connection factory.
         *
         * @return the topic listener container factory
         */
        @Bean("jmsListenerContainerFactoryTopic")
        public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopic() {
            return topicFactoryOn(connectionFactory);
        }

        /**
         * Listener container factory for durable topic subscriptions. It uses its
         * own {@link ActiveMQConnectionFactory} built from the broker URL and the
         * fixed client id {@code "1234"}.
         *
         * @return the durable topic listener container factory
         */
        @Bean("jmsListenerContainerFactoryTopicDurable")
        public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicDurable() {
            DefaultJmsListenerContainerFactory durableFactory =
                    topicFactoryOn(new ActiveMQConnectionFactory(brokerUrl));
            // Durable subscription, identified by the client id set below.
            durableFactory.setSubscriptionDurable(true);
            durableFactory.setClientId("1234");
            return durableFactory;
        }

        /** Builds a container factory in pub/sub mode on the given connection factory. */
        private DefaultJmsListenerContainerFactory topicFactoryOn(ConnectionFactory source) {
            DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
            topicFactory.setConnectionFactory(source);
            topicFactory.setPubSubDomain(true);
            return topicFactory;
        }
    }

    1.4、生产者

    package com.abc.demo.activemq;
    
    import com.abc.demo.util.DateUtil;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    
    import javax.jms.Destination;
    
    /**
     * Producer that sends test messages to the {@code testQueue} queue and the
     * {@code testTopic} topic from cron-scheduled tasks.
     *
     * <p>NOTE(review): the {@code @Transactional} methods only run in a transaction
     * if a transaction manager bean is available in the application context; none
     * is visible in this snippet - confirm.
     */
    @Component
    public class Producer {
        private static final Logger logger = LoggerFactory.getLogger(Producer.class);

        /** Pattern handed to {@code DateUtil.getCurrentDateString} for the message suffix. */
        private static final String TIMESTAMP_PATTERN = "yyyyMMddHHmmss";

        /** Number of messages sent by each transactional task. */
        private static final int BATCH_SIZE = 5;

        private final Destination queue = new ActiveMQQueue("testQueue");
        private final Destination topic = new ActiveMQTopic("testTopic");

        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;

        /** Sends one {@code queue-msg-<timestamp>} message to the queue at second 0 of every minute. */
        @Scheduled(cron = "0 0/1 * * * ?")
        public void sendToQueue() {
            send(queue, "queue-msg-" + timestamp());
        }

        /** Sends one {@code topic-msg-<timestamp>} message to the topic at second 30 of every minute. */
        @Scheduled(cron = "30 0/1 * * * ?")
        public void sendToTopic() {
            send(topic, "topic-msg-" + timestamp());
        }

        /**
         * Transactional variant: sends five {@code queue-msg-transaction-<timestamp>-<i>}
         * messages to the queue at second 5 of every minute.
         */
        @Transactional
        @Scheduled(cron = "5 0/1 * * * ?")
        public void sendToQueueTransaction() {
            sendBatch(queue, "queue-msg-transaction-");
        }

        /**
         * Transactional variant: sends five {@code topic-msg-transaction-<timestamp>-<i>}
         * messages to the topic at second 35 of every minute.
         */
        @Transactional
        @Scheduled(cron = "35 0/1 * * * ?")
        public void sendToTopicTransaction() {
            sendBatch(topic, "topic-msg-transaction-");
        }

        /** Sends {@link #BATCH_SIZE} messages named {@code <prefix><timestamp>-<index>}. */
        private void sendBatch(Destination destination, String prefix) {
            for (int i = 0; i < BATCH_SIZE; i++) {
                send(destination, prefix + timestamp() + "-" + i);
            }
        }

        /** Logs the message text, then sends it to the destination. */
        private void send(Destination destination, String message) {
            logger.info(message);
            jmsMessagingTemplate.convertAndSend(destination, message);
        }

        /** Returns the current date string from the project's {@code DateUtil} helper. */
        private static String timestamp() {
            return DateUtil.getCurrentDateString(TIMESTAMP_PATTERN);
        }
    }

    1.5、消费者

    package com.abc.demo.activemq;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSException;
    import javax.jms.Session;
    
    /**
     * Consumer with three listeners: one on the {@code testQueue} queue and two on
     * the {@code testTopic} topic (plain and durable). Each listener logs the payload.
     */
    @Component
    public class Consumer {
        private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

        /**
         * Receives a message from {@code testQueue} and logs it, prefixed with
         * whether the receiving session is transacted.
         *
         * @param msg     the message payload
         * @param session the JMS session the message was received on
         * @throws JMSException if the session's transacted flag cannot be read
         */
        @JmsListener(destination = "testQueue")
        public void recevieFromQueue(String msg, Session session) throws JMSException {
            logger.info("{}-{}", session.getTransacted(), msg);
        }

        /**
         * Receives a message from {@code testTopic} through the
         * {@code jmsListenerContainerFactoryTopic} container factory.
         *
         * @param msg the message payload
         */
        @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopic")
        public void recevieFromTopic(String msg) {
            logger.info(msg);
        }

        /**
         * Durable subscription on {@code testTopic}, through the
         * {@code jmsListenerContainerFactoryTopicDurable} container factory.
         *
         * <p>NOTE(review): no {@code subscription} name is given here, so the
         * container's default name applies - confirm that is intended.
         *
         * @param msg the message payload
         */
        @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicDurable")
        public void recevieFromTopicDurable(String msg) {
            logger.info(msg);
        }
    }

    2、SpringBoot 整合 ActiveMQ Artemis

    2.1、引入依赖

    <!-- Spring Boot starter for ActiveMQ Artemis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-artemis</artifactId>
    </dependency>
    <!-- JMS connection pool -->
    <dependency>
        <groupId>org.messaginghub</groupId>
        <artifactId>pooled-jms</artifactId>
    </dependency>

    2.2、配置(application.yml)

    spring:
      artemis:
        # Host and port of the Artemis broker the client connects to
        host: 10.40.96.140
        port: 61616
        pool:
          # Turn on connection pooling, limited to at most 5 connections
          enabled: true
          max-connections: 5

    2.3、配置类

    package com.abc.demo.activemq.artemis;
    
    import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.config.JmsListenerContainerFactory;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.jms.core.JmsTemplate;
    
    import javax.jms.ConnectionFactory;
    
    /**
     * JMS wiring for ActiveMQ Artemis: one transacted template for sending and four
     * listener container factories for topic subscriptions (plain, durable, shared,
     * and shared durable). Every bean is built on the injected connection factory.
     */
    @Configuration
    public class ActiveMQConfig {
        @Autowired
        private ConnectionFactory connectionFactory;

        /**
         * Creates a {@link JmsTemplate} on the injected connection factory with
         * transacted sessions switched on.
         *
         * @return the transacted template
         */
        @Bean
        public JmsTemplate jmsTemplate() {
            JmsTemplate template = new JmsTemplate(connectionFactory);
            // Transacted sessions; intended to be used together with @Transactional.
            template.setSessionTransacted(true);
            return template;
        }

        /**
         * Wraps {@link #jmsTemplate()} in a {@link JmsMessagingTemplate}.
         *
         * @return the messaging template registered as {@code jmsMessagingTemplateTransaction}
         */
        @Bean("jmsMessagingTemplateTransaction")
        public JmsMessagingTemplate jmsMessagingTemplate() {
            return new JmsMessagingTemplate(jmsTemplate());
        }

        /**
         * Listener container factory for plain topic (publish/subscribe) listeners.
         *
         * @return the topic listener container factory
         */
        @Bean("jmsListenerContainerFactoryTopic")
        public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopic() {
            return newTopicFactory();
        }

        /**
         * Listener container factory for durable topic subscriptions, using the
         * fixed client id {@code "1234"}.
         *
         * <p>NOTE(review): the client id is applied to connections obtained from the
         * injected connection factory, which the other beans here use as well -
         * confirm this is acceptable when connection pooling is enabled.
         *
         * @return the durable topic listener container factory
         */
        @Bean("jmsListenerContainerFactoryTopicDurable")
        public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicDurable() {
            DefaultJmsListenerContainerFactory factory = newTopicFactory();
            // Durable subscription
            factory.setSubscriptionDurable(true);
            factory.setClientId("1234");
            return factory;
        }

        /**
         * Listener container factory for shared topic subscriptions.
         *
         * @return the shared topic listener container factory
         */
        @Bean("jmsListenerContainerFactoryTopicShare")
        public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicShare() {
            DefaultJmsListenerContainerFactory factory = newTopicFactory();
            // Shared subscription
            factory.setSubscriptionShared(true);
            return factory;
        }

        /**
         * Listener container factory for topic subscriptions that are both shared
         * and durable.
         *
         * @return the shared durable topic listener container factory
         */
        @Bean("jmsListenerContainerFactoryTopicShareDurable")
        public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicShareDurable() {
            DefaultJmsListenerContainerFactory factory = newTopicFactory();
            // Shared subscription
            factory.setSubscriptionShared(true);
            // Durable subscription
            factory.setSubscriptionDurable(true);
            return factory;
        }

        /** Builds a container factory in pub/sub mode on the injected connection factory. */
        private DefaultJmsListenerContainerFactory newTopicFactory() {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setPubSubDomain(true);
            return factory;
        }
    }

    2.4、生产者

    package com.abc.demo.activemq.artemis;
    
    import com.abc.demo.util.DateUtil;
    import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
    import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    
    import javax.jms.Destination;
    
    /**
     * Producer that sends test messages to the {@code testQueue} queue and the
     * {@code testTopic} topic from cron-scheduled tasks.
     *
     * <p>NOTE(review): the {@code @Transactional} methods only run in a transaction
     * if a transaction manager bean is available in the application context; none
     * is visible in this snippet - confirm.
     */
    @Component
    public class Producer {
        private static final Logger logger = LoggerFactory.getLogger(Producer.class);

        /** Pattern handed to {@code DateUtil.getCurrentDateString} for the message suffix. */
        private static final String TIMESTAMP_PATTERN = "yyyyMMddHHmmss";

        /** Number of messages sent by each transactional task. */
        private static final int BATCH_SIZE = 5;

        private final Destination queue = new ActiveMQQueue("testQueue");
        private final Destination topic = new ActiveMQTopic("testTopic");

        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;

        /** Sends one {@code queue-msg-<timestamp>} message to the queue at second 0 of every minute. */
        @Scheduled(cron = "0 0/1 * * * ?")
        public void sendToQueue() {
            send(queue, "queue-msg-" + timestamp());
        }

        /** Sends one {@code topic-msg-<timestamp>} message to the topic at second 30 of every minute. */
        @Scheduled(cron = "30 0/1 * * * ?")
        public void sendToTopic() {
            send(topic, "topic-msg-" + timestamp());
        }

        /**
         * Transactional variant: sends five {@code queue-msg-transaction-<timestamp>-<i>}
         * messages to the queue at second 5 of every minute.
         */
        @Transactional
        @Scheduled(cron = "5 0/1 * * * ?")
        public void sendToQueueTransaction() {
            sendBatch(queue, "queue-msg-transaction-");
        }

        /**
         * Transactional variant: sends five {@code topic-msg-transaction-<timestamp>-<i>}
         * messages to the topic at second 35 of every minute.
         */
        @Transactional
        @Scheduled(cron = "35 0/1 * * * ?")
        public void sendToTopicTransaction() {
            sendBatch(topic, "topic-msg-transaction-");
        }

        /** Sends {@link #BATCH_SIZE} messages named {@code <prefix><timestamp>-<index>}. */
        private void sendBatch(Destination destination, String prefix) {
            for (int i = 0; i < BATCH_SIZE; i++) {
                send(destination, prefix + timestamp() + "-" + i);
            }
        }

        /** Logs the message text, then sends it to the destination. */
        private void send(Destination destination, String message) {
            logger.info(message);
            jmsMessagingTemplate.convertAndSend(destination, message);
        }

        /** Returns the current date string from the project's {@code DateUtil} helper. */
        private static String timestamp() {
            return DateUtil.getCurrentDateString(TIMESTAMP_PATTERN);
        }
    }

    2.5、消费者

    package com.abc.demo.activemq.artemis;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSException;
    import javax.jms.Session;
    
    /**
     * Consumer with one listener on the {@code testQueue} queue and five on the
     * {@code testTopic} topic (plain, durable, two shared, and shared durable).
     * Each listener logs the payload.
     */
    @Component
    public class Consumer {
        private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

        /**
         * Receives a message from {@code testQueue} and logs it, prefixed with
         * whether the receiving session is transacted.
         *
         * @param msg     the message payload
         * @param session the JMS session the message was received on
         * @throws JMSException if the session's transacted flag cannot be read
         */
        @JmsListener(destination = "testQueue")
        public void recevieFromQueue(String msg, Session session) throws JMSException {
            logger.info("{}-{}", session.getTransacted(), msg);
        }

        /**
         * Receives a message from {@code testTopic} through the
         * {@code jmsListenerContainerFactoryTopic} container factory.
         *
         * @param msg the message payload
         */
        @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopic")
        public void recevieFromTopic(String msg) {
            logger.info(msg);
        }

        /**
         * Durable subscription on {@code testTopic}, through the
         * {@code jmsListenerContainerFactoryTopicDurable} container factory.
         *
         * <p>NOTE(review): no {@code subscription} name is given here, so the
         * container's default name applies - confirm that is intended.
         *
         * @param msg the message payload
         */
        @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicDurable")
        public void recevieFromTopicDurable(String msg) {
            logger.info(msg);
        }

        /**
         * Shared subscription on {@code testTopic} under the subscription name
         * {@code test}; the second shared listener in this class uses the same name.
         *
         * @param msg the message payload
         */
        @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicShare", subscription = "test")
        public void recevieFromTopicShare(String msg) {
            logger.info(msg);
        }

        /**
         * Second listener on the shared subscription {@code test}.
         * Presumably the two listeners split the subscription's messages between
         * them (JMS shared-subscription semantics) - confirm against the broker.
         *
         * @param msg the message payload
         */
        @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicShare", subscription = "test")
        public void recevieFromTopicShare2(String msg) {
            logger.info(msg);
        }

        /**
         * Shared durable subscription on {@code testTopic} under the subscription
         * name {@code test2}.
         *
         * @param msg the message payload
         */
        @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicShareDurable", subscription = "test2")
        public void recevieFromTopicShareDurable(String msg) {
            logger.info(msg);
        }
    }

     

    本帖子中包含资源

    您需要 登录 才可以下载,没有帐号?立即注册