关于CTS项目的Spring+ActiveMQ(内置Broker启动)

关于CTS项目的Spring+ActiveMQ(内置Broker启动)

1、引入activemq-all-5.15.0.jar。由于activemq-all-5.15.0.jar包含了spring, 与当前引入的spring冲突, 必须把activemq-all中的spring删除。

2、applicationContext.xml配置。

<?xml version='1.0' encoding='UTF-8' ?>
<!--
  Spring application context for the CTS project: data source, MyBatis integration,
  an embedded ActiveMQ broker, and the JMS producer/consumer beans that use it.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
  xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
       http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
       http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
       http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
  <!-- The ActiveMQ schema location above is deliberately versionless: a version-specific
       location (previously 5.15.2) that does not match the ActiveMQ jar on the classpath
       (5.15.0) cannot be resolved from the jar and has to be fetched over the network. -->

  <!-- Enable annotation-driven configuration -->
  <context:annotation-config />

  <!-- Database connection pool.
       destroy-method="close" releases the pooled connections when the context shuts down.
       NOTE(review): the credentials are hard-coded; consider externalizing them. -->
  <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="jdbc:mysql://localhost:3306/cts" />
    <property name="username" value="root" />
    <property name="password" value="root" />
    <property name="maxActive" value="255" />
    <property name="maxIdle" value="5" />
    <property name="maxWait" value="10000" />
  </bean>

  <!-- MyBatis integration -->
  <bean id="SqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="configLocation" value="classpath:/mybatis/mybatis-config.xml" />
  </bean>

  <!-- Transaction manager for the data source -->
  <bean id="transactionManager"
    class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
  </bean>

  <!-- Create mapper beans by scanning com.ssm.cts for interfaces annotated with @Repository -->
  <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
      <property name="basePackage" value="com.ssm.cts" />
      <property name="SqlSessionFactory" ref="SqlSessionFactory" />
      <property name="annotationClass" value="org.springframework.stereotype.Repository" />
  </bean>

  <!-- Embedded ActiveMQ broker (JMX and persistence disabled) -->
    <amq:broker id="broker" useJmx="false" persistent="false">
        <amq:transportConnectors>
            <!-- The TCP transport lets clients connect to the broker through a TCP socket. -->
            <amq:transportConnector uri="tcp://localhost:61616"/>
            <!-- The VM transport allows communication inside the JVM, avoiding network overhead. -->
            <amq:transportConnector uri="vm://localhost:0"/>
        </amq:transportConnectors>
    </amq:broker>

  <!-- ActiveMQ connection factory.
       NOTE(review): the credentials are hard-coded; consider externalizing them. -->
  <amq:connectionFactory id="activemqConnectionFactory" brokerURL="tcp://127.0.0.1:61616"
    userName="admin" password="admin">
  </amq:connectionFactory>

  <!-- Spring connection factory wrapping the ActiveMQ connection factory.
       depends-on="broker" makes the start-up order explicit: the embedded broker is
       initialized before this factory, independent of bean declaration order. -->
  <bean id="connectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory"
    depends-on="broker">
    <!-- Target factory: the ActiveMQ connection factory defined above -->
    <property name="targetConnectionFactory"
      ref="activemqConnectionFactory"></property>
    <!-- Number of sessions to cache -->
    <property name="sessionCacheSize" value="100" />
  </bean>

  <!-- Queue destination -->
  <bean id="ctsQueueDestination"
    class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Queue name -->
    <constructor-arg>
      <value>cts.queue</value>
    </constructor-arg>
  </bean>

  <!-- Topic destination -->
  <bean id="ctsTopicDestination"
    class="org.apache.activemq.command.ActiveMQTopic">
    <!-- Topic name -->
    <constructor-arg>
      <value>cts.topic</value>
    </constructor-arg>
  </bean>

  <!-- Message producers -->
  <!-- JmsTemplate for the queue -->
  <bean id="jmsQueueTemplate"
    class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="ctsQueueDestination" />
    <property name="receiveTimeout" value="10000" />
    <!-- Not pub/sub, i.e. point-to-point (queue) mode -->
    <property name="pubSubDomain" value="false" />
  </bean>

  <!-- JmsTemplate for the topic -->
  <bean id="jmsTopicTemplate"
    class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="ctsTopicDestination" />
    <property name="receiveTimeout" value="10000" />
    <!-- Pub/sub (publish/subscribe) mode -->
    <property name="pubSubDomain" value="true" />
  </bean>

  <!-- Queue listener -->
  <bean id="queueMessageListen" class="com.ssm.cts.amq.QueueMessageListener" />

  <!-- Topic listener -->
  <bean id="topicMessageListen" class="com.ssm.cts.amq.TopicMessageListener" />

  <!-- Message consumers -->
  <!-- Listener container for the queue: uses connectionFactory, listens on
       ctsQueueDestination and dispatches to queueMessageListen -->
    <bean id="queueListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="ctsQueueDestination" />
        <property name="messageListener" ref="queueMessageListen" />
    </bean>

    <!-- Listener container for the topic: uses connectionFactory, listens on
         ctsTopicDestination and dispatches to topicMessageListen -->
    <bean id="topicListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="ctsTopicDestination" />
        <property name="messageListener" ref="topicMessageListen" />
    </bean>

</beans>

3、QueueMessageListener.java

package com.ssm.cts.amq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * Copyright (c)2018-2028: chanpinxue.cn
 * Project: cts 
 * Module Name: QueueMessageListener
 * Comments: 消息监听器
 * JDK version used: JDK1.8 
 * Author: jzh 
 * Create Date: 2018-10-25
 * Modified By: jzh 
 * Modified Date: 2018-10-25
 * Why & What is modified:
 * Version: <1.0>
 */

/**
 * JMS message listener that prints the body of every text message it receives.
 *
 * <p>Messages that are not {@link TextMessage} instances are reported on standard error and
 * skipped, rather than failing with a {@link ClassCastException}.
 */
@Component
public class QueueMessageListener implements MessageListener {

  /**
   * Prints the text of {@code message} to standard output.
   *
   * @param message the delivered JMS message; anything other than a {@link TextMessage}
   *     (including {@code null}) is reported and ignored
   */
  @Override
  public void onMessage(Message message) {
    // Guard the cast: a listener can be handed any JMS message type.
    if (!(message instanceof TextMessage)) {
      System.err.println("QueueMessageListener ignored a non-text message: " + message);
      return;
    }
    TextMessage tm = (TextMessage) message;
    try {
      System.out.println("QueueMessageListener监听到了文本消息:\t" + tm.getText());
    } catch (JMSException e) {
      // Reading the body failed; report it and carry on with the next message.
      e.printStackTrace();
    }
  }
}

4、TopicMessageListener.java

package com.ssm.cts.amq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.MessageListener;

import org.springframework.stereotype.Component;

/**
 * Copyright (c)2018-2028: chanpinxue.cn
 * Project: cts 
 * Module Name: TopicMessageListener
 * Comments: 消息监听器
 * JDK version used: JDK1.8 
 * Author: jzh 
 * Create Date: 2018-10-25
 * Modified By: jzh 
 * Modified Date: 2018-10-25
 * Why & What is modified:
 * Version: <1.0>
 */

/**
 * JMS message listener that prints the body of every text message it receives.
 *
 * <p>Messages that are not {@link TextMessage} instances are reported on standard error and
 * skipped, rather than failing with a {@link ClassCastException}.
 */
@Component
public class TopicMessageListener implements MessageListener {

  /**
   * Prints the text of {@code message} to standard output.
   *
   * @param message the delivered JMS message; anything other than a {@link TextMessage}
   *     (including {@code null}) is reported and ignored
   */
  @Override
  public void onMessage(Message message) {
    // Guard the cast: a listener can be handed any JMS message type.
    if (!(message instanceof TextMessage)) {
      System.err.println("TopicMessageListener ignored a non-text message: " + message);
      return;
    }
    TextMessage tm = (TextMessage) message;
    try {
      System.out.println("TopicMessageListener监听到了文本消息:\t" + tm.getText());
    } catch (JMSException e) {
      // Reading the body failed; report it and carry on with the next message.
      e.printStackTrace();
    }
  }
}

5、ActiveMQController.java

package com.ssm.cts.controller;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.servlet.ModelAndView;

/**
 * Copyright (c)2018-2028: chanpinxue.cn
 * Project: cts 
 * Module Name: ActiveMQController
 * Comments: 控制器
 * JDK version used: JDK1.8 
 * Author: jzh 
 * Create Date: 2018-10-25
 * Modified By: jzh 
 * Modified Date: 2018-10-25
 * Why & What is modified:
 * Version: <1.0>
 */

// @Controller marks this class as a controller, registered under the bean name "amqController".
@Controller("amqController")
// This controller only responds to request URIs under /mq.
@RequestMapping("/mq")
public class ActiveMQController {

  // Text body sent by both handlers.
  private static final String MESSAGE_TEXT = "9001";

  // View name that redirects from this controller to another controller.
  private static final String REDIRECT_TO_INDEX = "redirect:/mi/index.do";

  // Injected JmsTemplate instances.
  // NOTE(review): two JmsTemplate beans are involved, so the field names presumably have to
  // match the bean ids for injection by type to be unambiguous; do not rename them.
  @Autowired
  private JmsTemplate jmsQueueTemplate;

  @Autowired
  private JmsTemplate jmsTopicTemplate;

  // activemq-all-5.15.0.jar bundles Spring, which conflicts with the Spring version used by
  // this project; the Spring classes must be removed from activemq-all.

  // http://127.0.0.1:8161/admin

  /**
   * Sends a text message through the queue template, then redirects.
   *
   * <p>Invoked when the URI is /queue, e.g. http://localhost:8080/cts/mq/queue.do
   *
   * @return the redirect view name
   */
  @RequestMapping("/queue")
  public String queue() {
    jmsQueueTemplate.send(textMessage(MESSAGE_TEXT));
    return REDIRECT_TO_INDEX;
  }

  /**
   * Sends a text message through the topic template, then redirects.
   *
   * <p>Invoked when the URI is /topic, e.g. http://localhost:8080/cts/mq/topic.do
   *
   * @return the redirect view name
   */
  @RequestMapping("/topic")
  public String topic() {
    jmsTopicTemplate.send(textMessage(MESSAGE_TEXT));
    return REDIRECT_TO_INDEX;
  }

  /** Builds a creator that produces a text message carrying {@code text}. */
  private static MessageCreator textMessage(final String text) {
    return session -> session.createTextMessage(text);
  }
}

6、启动。

http://localhost:8080/cts/mq/queue.do

http://localhost:8080/cts/mq/topic.do

发表回复

您的电子邮箱地址不会被公开。