Apache activemq例子
Apache activemq 是目前比较流行的开源消息,支持多语言客户的编写, 支持Ajax、支持与Axis的整合
完全支持JMS1.1和J2EE 1.4规范。 官网: http://activemq.apache.org/ 最新版本下载http://activemq.apache.org/download.html.
所用的Jar包:
activemq-core-5.1.0.jar、geronimo-j2ee-management_1.0_spec-1.0.jar、geronimo-jms_1.1_spec-1.1.1.jar、geronimo-jta_1.0.1B_spec-1.0.1.jar、activemq-web-5.1.0.jar
首先启动activemq.bat服务
activemq.xml配置:
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <!-- START SNIPPET: xbean --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <broker xmlns="http://activemq.apache.org/schema/core" persistent="false" useJmx="false"> <persistenceFactory> <journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="../data"/> </persistenceFactory> <transportConnectors> <transportConnector uri="tcp://localhost:61616"/> <transportConnector uri="stomp://localhost:61613"/> </transportConnectors> </broker> </beans> <!-- END SNIPPET: xbean -->
Tomcat下的Content.xml文件配置:
<Resource name="jms/FailoverConnectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5" brokerName="localhost" useEmbeddedBroker="false"/> <Resource name="jms/NormalConnectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="tcp://localhost:61616" brokerName="XXX" useEmbeddedBroker="false"/> <Resource name="jms/topic/MyTopic" auth="Container" type="org.apache.activemq.command.ActiveMQTopic" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="MY.TEST.FOO"/> <Resource name="jms/queue/MyQueue" auth="Container" type="org.apache.activemq.command.ActiveMQQueue" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="MY.TEST.FOO.QUEUE"/> <!-- Configuration explanation: the way to JNDI defines the connection ActiveMQ broker url, Topic and Queue--> </Context>
web.xml的配置:
<!-- JMS --> <servlet> <servlet-name>jms-listener</servlet-name> <!-- 你的监听类 --> <servlet-class>com.util.jms.JMSListener</servlet-class> <!-- 容器是在启动的时候就加载这个servlet(实例化并调用其init()方法) ,1代表的是优先级 1-5--> <load-on-startup>1</load-on-startup> </servlet>
<context-param>
<param-name>brokerURI</param-name>
<param-value>/WEB-INF/activemq.xml</param-value>
</context-param>
JMSListener监听类和接受消息处理:
package com.util.jms; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; /** * JMS监听端 <br> * 由容器加载 <br> * Servlet模式 * * @author Iverson */ public class JMSListener extends HttpServlet implements MessageListener { private static final long serialVersionUID = 1L; public static StringBuffer newsContent = new StringBuffer(); /** 初始化jms连接,创建topic监听器 */ public void init(ServletConfig config) throws ServletException { try { // 初始上下文 InitialContext initCtx = new InitialContext(); // 上下文查找 Context envContext = (Context) initCtx.lookup("java:comp/env"); ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/FailoverConnectionFactory"); Connection connection = connectionFactory.createConnection(); // Connection必须指定一个唯一的clientId connection.setClientID("clientId"); Session jmsSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); // //基于Topic创建持久的消息订阅者 TopicSubscriber consumer = jmsSession.createDurableSubscriber((Topic) envContext.lookup("jms/topic/MyTopic"), "MySub"); consumer.setMessageListener(this); connection.start(); } catch (NamingException e) { e.printStackTrace(System.out); } catch (JMSException e) { e.printStackTrace(System.out); } } @Override // 消息处理 public void onMessage(Message message) { String content = ""; // 如果是修改员工 if (checkText(message, JMSLogotype.EMPUP) != null) { content = checkText(message, JMSLogotype.EMPUP); System.out.println(content); } } // 接受消息JSP处理 private static String checkText(Message m, String s) { try { return m.getStringProperty(s); } catch (JMSException e) { e.printStackTrace(System.out); return null; } }
Publish消息发布类:
package com.util.jms; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; /** *biz 消息发布 * * @author allen */ public class BizPublish extends HttpServlet implements MessageListener { private static final long serialVersionUID = 1L; private InitialContext initCtx; private Context envContext; private ConnectionFactory connectionFactory; private Connection connection; private Session jmsSession; private MessageProducer producer; /** * Singleton */ static class BizPublishHolder { static BizPublish instance = new BizPublish(); } public static BizPublish getInstance() { return BizPublishHolder.instance; } private BizPublish() { super(); try { init(); } catch (ServletException e) { e.printStackTrace(); } } /** * Initialization of the servlet. */ public void init() throws ServletException { try { initCtx = new InitialContext(); envContext = (Context) initCtx.lookup("java:comp/env"); connectionFactory = (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory"); connection = connectionFactory.createConnection(); jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = jmsSession.createProducer((Destination) envContext.lookup("jms/topic/MyTopic")); } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } @Override public void onMessage(Message message) { } public void send(String content) { // 设置持久方式 try { producer.setDeliveryMode(DeliveryMode.PERSISTENT); Message testMessage = jmsSession.createMessage(); // 发布消息 testMessage.setStringProperty("faceemployye", content); producer.send(testMessage); // testMessage.clearProperties(); } catch (Exception e) { e.printStackTrace(); } } /** * Destruction of the servlet. */ public void destroy() { super.destroy(); } }