Apache activemq例子

Apache ActiveMQ 是目前比较流行的开源消息中间件,支持多语言客户端的编写,支持Ajax,支持与Axis的整合,

完全支持JMS1.1和J2EE 1.4规范。 官网: http://activemq.apache.org/ 最新版本下载http://activemq.apache.org/download.html.

所用的Jar包:

activemq-core-5.1.0.jar、geronimo-j2ee-management_1.0_spec-1.0.jar、geronimo-jms_1.1_spec-1.1.1.jar、geronimo-jta_1.0.1B_spec-1.0.1.jar、activemq-web-5.1.0.jar

首先启动activemq.bat服务

activemq.xml配置:

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
   
    http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: xbean -->
<!--
  Spring beans file that declares one ActiveMQ broker.
  NOTE(review): schemaLocation also lists the camel-spring schema although no camel
  namespace is declared or used in this file; confirm whether that entry is needed.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  
  <!--
    Broker definition with useJmx="false".
    NOTE(review): persistent="false" is set while a journal persistence factory is
    configured just below. The two settings look contradictory, and the sample client
    code uses a durable subscriber and PERSISTENT delivery; confirm which is intended.
  -->
  <broker xmlns="http://activemq.apache.org/schema/core"  persistent="false"  useJmx="false">

    <!-- Journal persistence adapter factory: 5 journal log files, data directory ../data. -->
    <persistenceFactory>
      <journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="../data"/>
    </persistenceFactory>
  
    <!-- Transport connectors: tcp on localhost:61616 and stomp on localhost:61613. -->
    <transportConnectors>
      <transportConnector uri="tcp://localhost:61616"/>      
      <transportConnector uri="stomp://localhost:61613"/>
    </transportConnectors>
        
  </broker>
  
</beans>
<!-- END SNIPPET: xbean -->

Tomcat下的context.xml文件配置:

<Context>
  <!-- JNDI resources that expose the ActiveMQ connection factories, the topic and the queue. -->

  <!-- Connection factory that uses the failover transport: initial reconnect delay 100, at most 5 reconnect attempts. -->
  <Resource
    name="jms/FailoverConnectionFactory"
    auth="Container"
    type="org.apache.activemq.ActiveMQConnectionFactory"
    description="JMS Connection Factory"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&amp;maxReconnectAttempts=5"
    brokerName="localhost"
    useEmbeddedBroker="false"/>

  <!-- Connection factory that connects straight to the broker over tcp. -->
  <Resource
    name="jms/NormalConnectionFactory"
    auth="Container"
    type="org.apache.activemq.ActiveMQConnectionFactory"
    description="JMS Connection Factory"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    brokerURL="tcp://localhost:61616"
    brokerName="XXX"
    useEmbeddedBroker="false"/>

  <!-- Topic bound at jms/topic/MyTopic; its physical destination name is MY.TEST.FOO. -->
  <Resource
    name="jms/topic/MyTopic"
    auth="Container"
    type="org.apache.activemq.command.ActiveMQTopic"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    physicalName="MY.TEST.FOO"/>

  <!-- Queue bound at jms/queue/MyQueue; its physical destination name is MY.TEST.FOO.QUEUE. -->
  <Resource
    name="jms/queue/MyQueue"
    auth="Container"
    type="org.apache.activemq.command.ActiveMQQueue"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    physicalName="MY.TEST.FOO.QUEUE"/>

  <!-- Configuration explanation: the broker URL, the topic and the queue are all defined through JNDI. -->
</Context>

web.xml的配置:

<!-- JMS: servlet that hosts the JMS listener, plus the brokerURI context parameter -->
    <servlet>
        <servlet-name>jms-listener</servlet-name>
        <!-- Your listener class -->
        <servlet-class>com.util.jms.JMSListener</servlet-class>
        <!-- Load this servlet when the container starts (it is instantiated and its init() method is called). The number is the load order: servlets with lower non-negative values are loaded first. -->
        <load-on-startup>1</load-on-startup>
    </servlet>
<!-- Context parameter brokerURI pointing at /WEB-INF/activemq.xml. NOTE(review): nothing in this listing reads it; confirm which listener consumes it. -->
<context-param>

<param-name>brokerURI</param-name>

<param-value>/WEB-INF/activemq.xml</param-value>

</context-param>

JMSListener监听类和接收消息处理:

package com.util.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;

/**
 * JMS listening side <br>
 * Loaded by the container <br>
 * Servlet mode
 * 
 * @author Iverson
 */
public class JMSListener extends HttpServlet implements MessageListener {

    // Serialization version id (HttpServlet is serializable).
    private static final long serialVersionUID = 1L;
    // Public, shared, mutable buffer. No method shown in this listing reads or writes it.
    public static StringBuffer newsContent = new StringBuffer();

    /**
     * Initialises the JMS connection and registers this servlet as the message
     * listener of a durable subscription on the topic bound at
     * {@code jms/topic/MyTopic}.
     *
     * <p>Lookup and JMS failures are printed to {@code System.out} and do not
     * abort servlet initialisation. If setup fails after the connection has been
     * created, the connection is closed again so it does not leak.
     *
     * <p>NOTE(review): on success the connection is held only in a local variable,
     * so nothing shown here can close it when the servlet is destroyed.
     *
     * @param config the servlet configuration supplied by the container
     * @throws ServletException if the superclass initialisation fails
     */
    @Override
    public void init(ServletConfig config) throws ServletException {
        // GenericServlet stores the config in this call; without it
        // getServletConfig() and getServletContext() do not work for this servlet.
        super.init(config);

        Connection connection = null;
        boolean started = false;
        try {
            // Resolve everything from JNDI first, so a missing binding is detected
            // before any broker connection is opened.
            InitialContext initCtx = new InitialContext();
            Context envContext = (Context) initCtx.lookup("java:comp/env");
            ConnectionFactory connectionFactory =
                    (ConnectionFactory) envContext.lookup("jms/FailoverConnectionFactory");
            Topic topic = (Topic) envContext.lookup("jms/topic/MyTopic");

            connection = connectionFactory.createConnection();
            // A durable subscription requires the connection to carry a client id.
            connection.setClientID("clientId");
            Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the durable subscriber on the topic and deliver to onMessage().
            TopicSubscriber consumer = jmsSession.createDurableSubscriber(topic, "MySub");
            consumer.setMessageListener(this);
            connection.start();
            started = true;
        } catch (NamingException e) {
            e.printStackTrace(System.out);
        } catch (JMSException e) {
            e.printStackTrace(System.out);
        } finally {
            // Release a half-configured connection instead of leaking it.
            if (!started && connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace(System.out);
                }
            }
        }
    }

    /**
     * Handles a message delivered to this listener.
     *
     * <p>If the message carries the string property named by
     * {@code JMSLogotype.EMPUP} (an "employee modified" notification, going by the
     * comment in the original code), its value is printed to {@code System.out}.
     * Messages without that property are ignored.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(Message message) {
        // Read the property once and reuse the value.
        String content = checkText(message, JMSLogotype.EMPUP);
        if (content != null) {
            System.out.println(content);
        }
    }

    // 接受消息JSP处理

    /**
     * Reads a string property from a JMS message.
     *
     * @param m the message to read from
     * @param s the name of the property
     * @return the value reported by the message for that property, or
     *         {@code null} when reading it raises a {@link JMSException}
     *         (the stack trace is printed to {@code System.out})
     */
    private static String checkText(Message m, String s) {
        String value = null;
        try {
            value = m.getStringProperty(s);
        } catch (JMSException e) {
            e.printStackTrace(System.out);
        }
        return value;
    }

Publish消息发布类:

package com.util.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;

/**
 * Publishes business messages to the JMS topic bound at
 * {@code java:comp/env/jms/topic/MyTopic}.
 *
 * <p>The class is used as a singleton obtained through {@link #getInstance()}.
 * One connection, session and producer are created on first use and shared by
 * all callers. {@link #init()}, {@link #send(String)} and {@link #destroy()}
 * are synchronized so the shared session is only used by one thread at a time.
 *
 * <p>NOTE(review): the constructor is private, so a servlet container cannot
 * instantiate this class itself; confirm it is only used via
 * {@link #getInstance()}.
 *
 * @author allen
 */
public class BizPublish extends HttpServlet implements MessageListener {

    private static final long serialVersionUID = 1L;

    /** Name of the string property that carries the content of a published message. */
    private static final String CONTENT_PROPERTY = "faceemployye";

    private Connection connection;
    private Session jmsSession;
    private MessageProducer producer;

    /**
     * Holder for the singleton; the instance is created when this class is
     * first initialised, i.e. on the first call to {@link #getInstance()}.
     */
    static class BizPublishHolder {
        static final BizPublish instance = new BizPublish();
    }

    /**
     * Returns the shared publisher instance.
     *
     * @return the singleton
     */
    public static BizPublish getInstance() {
        return BizPublishHolder.instance;
    }

    /**
     * Creates the singleton and tries to connect. A failed initialisation is
     * printed and leaves the instance unconnected, in which case
     * {@link #send(String)} drops messages.
     */
    private BizPublish() {
        super();
        try {
            init();
        } catch (ServletException e) {
            e.printStackTrace();
        }
    }

    /**
     * Looks up the connection factory and the topic through JNDI, then creates
     * the connection, the session and a producer with PERSISTENT delivery.
     * Any previously opened connection is closed first, so calling this method
     * again does not leak the old one.
     *
     * @throws ServletException if a JNDI lookup or a JMS call fails; the
     *         partially created connection is closed before the exception is thrown
     */
    @Override
    public synchronized void init() throws ServletException {
        closeConnection();
        try {
            Context envContext = (Context) new InitialContext().lookup("java:comp/env");
            ConnectionFactory connectionFactory =
                    (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory");
            Destination topic = (Destination) envContext.lookup("jms/topic/MyTopic");

            connection = connectionFactory.createConnection();
            jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = jmsSession.createProducer(topic);
            // Set the delivery mode once here instead of on every send.
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        } catch (NamingException e) {
            closeConnection();
            throw new ServletException("JNDI lookup failed while initialising BizPublish", e);
        } catch (JMSException e) {
            closeConnection();
            throw new ServletException("JMS setup failed while initialising BizPublish", e);
        }
    }

    /**
     * Required by {@link MessageListener}; this class only publishes, so
     * incoming messages are ignored.
     *
     * @param message the delivered message (unused)
     */
    @Override
    public void onMessage(Message message) {
    }

    /**
     * Publishes a message whose content is carried in a string property.
     * Failures are printed and never propagated to the caller.
     *
     * @param content the text to publish
     */
    public synchronized void send(String content) {
        if (producer == null) {
            // Initialisation failed or destroy() was called; nothing to send with.
            System.err.println("BizPublish is not initialised; message dropped");
            return;
        }
        try {
            Message message = jmsSession.createMessage();
            message.setStringProperty(CONTENT_PROPERTY, content);
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the JMS connection and then runs the superclass clean-up. After
     * this call {@link #send(String)} drops messages until {@link #init()} is
     * called again.
     */
    @Override
    public synchronized void destroy() {
        closeConnection();
        super.destroy();
    }

    /**
     * Closes the current connection, if any, and clears all JMS references.
     * Closing the connection also closes its session and producer.
     */
    private void closeConnection() {
        Connection current = connection;
        connection = null;
        jmsSession = null;
        producer = null;
        if (current != null) {
            try {
                current.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}