`
dengwanchuan
  • 浏览: 45176 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
最近访客 更多访客>>
社区版块
存档分类
最新评论

Java中间件JMS(四)之ActiveMQ整合spring之类转换

阅读更多

原文链接:http://blog.csdn.net/dwc_fly/article/details/11096071

前几章都是直接发送MapMessage类型的数据,拿前面的例子来讲,如果生产者发送的是TextMessage,消费者也是必须TextMessage;如果我们自己要发送的数据不是TextMessage类型,而消费者还是TextMessage的,那该怎么办?难道每次接受后都要增加一个转换方法么?其实spring早就考虑到这种情况了。转化器在很多组件中都是必不缺少的东西Spring的MessageConverter接口提供了对消息转换的支持。

 

1、转换类的相关代码POJO

新建一个类MsgPoJo,就是一个简单的Pojo类。具体代码如下:

[java] view plaincopy
 
  1. package jms.mq.spring;  
  2.   
  3. import java.io.Serializable;  
  4.   
  5. public class MsgPoJo implements Serializable{  
  6.     private String id;  
  7.     private String text;  
  8.     public String getId() {  
  9.         return id;  
  10.     }  
  11.     public void setId(String id) {  
  12.         this.id = id;  
  13.     }  
  14.     public String getText() {  
  15.         return text;  
  16.     }  
  17.     public void setText(String text) {  
  18.         this.text = text;  
  19.     }     
  20. }  

 

2.转换类的实现

新建一个类MsgConverter.java,实现MessageConverter接口。生成的代码如下

 

[java] view plaincopy
 
  1. package jms.mq.spring;  
  2.   
  3. import javax.jms.JMSException;  
  4. import javax.jms.Message;  
  5. import javax.jms.Session;  
  6. import javax.jms.TextMessage;  
  7.   
  8. import org.springframework.jms.support.converter.MessageConversionException;  
  9. import org.springframework.jms.support.converter.MessageConverter;  
  10.   
  11. public class MsgConverter implements MessageConverter{  
  12.   
  13.     @Override  
  14.     public Object fromMessage(Message message) throws JMSException,  
  15.     MessageConversionException {  
  16.         if (!(message instanceof TextMessage)) {  
  17.             throw new MessageConversionException("Message is not TextMessage");  
  18.         }  
  19.         System.out.println("--转换接收的消息--");  
  20.         TextMessage textMessage = (TextMessage) message;  
  21.         MsgPoJo msgPojo = new MsgPoJo();  
  22.         String[] texts=textMessage.getText().split(",");  
  23.         msgPojo.setId(texts[0]);  
  24.         msgPojo.setText(texts[1]);  
  25.         return msgPojo;  
  26.     }  
  27.   
  28.     @Override  
  29.     public Message toMessage(Object object, Session session) throws JMSException,  
  30.     MessageConversionException {  
  31.         if (!(object instanceof MsgPoJo)) {  
  32.             throw new MessageConversionException("obj is not MsgPojo");  
  33.         }  
  34.         System.out.println("--转换发送的消息--");  
  35.         MsgPoJo msgPojo = (MsgPoJo) object;  
  36.         TextMessage textMessage = session.createTextMessage();  
  37.         textMessage.setText(msgPojo.getId()+","+msgPojo.getText());  
  38.         return  textMessage;  
  39.     }  
  40. }  

 

 

代码很简单就是做些转换,有fromMessage和toMessage两个方法,真好对应发送转换toMessage和接受转换fromMessage。此时,发送和接收消息要换成template.convertAndSend(message);template.receiveAndConvert()。接下来我做一些配置,让spring知道我们的转换类。修改applicationContext.xml中jms模版配置的代码,修改后的代码如下:

[html] view plaincopy
 
  1. <!-- 类转换 -->  
  2. <bean id="msgConverter" class="jms.mq.spring.MsgConverter"></bean>  
  3.   
  4. <!-- 配置Jms模板 -->  
  5. <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">  
  6.     <property name="connectionFactory" ref="connectionFactory" />  
  7.     <property name="defaultDestination" ref="queueDest" />  
  8.     <!--<property name="receiveTimeout" value="10000" /> -->  
  9.     <!-- 类转换 -->  
  10.     <property name="messageConverter" ref="msgConverter"></property>  
  11. </bean>  

注意:如果你有队列监听容器配置,配置jmsQueueTemplate和jmsTopicTemplate可能与队列容器配置冲突。

3、业务相关代码和配置

在QueueProducerService.java增加convertAndSend()方法并在其实现类中实现,实现类的代码如下:

 

[java] view plaincopy
 
  1. package jms.mq.spring;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import javax.jms.Destination;  
  6. import javax.jms.JMSException;  
  7. import javax.jms.Message;  
  8. import javax.jms.Session;  
  9. import javax.jms.TextMessage;  
  10. import org.springframework.jms.core.JmsTemplate;  
  11. import org.springframework.jms.core.MessageCreator;  
  12.   
  13. public class QueueProducerService{  
  14.     JmsTemplate jmsTemplate;  
  15.   
  16.     Destination destination;  
  17.   
  18.     public void send() {  
  19.         MessageCreator messageCreator = new MessageCreator() {  
  20.             public Message createMessage(Session session) throws JMSException {  
  21.                 TextMessage message = session.createTextMessage();  
  22.                 message.setText("QueueProducerService发送消息"+new Date());  
  23.                 return message;  
  24.             }  
  25.   
  26.         };  
  27.         jmsTemplate.send(this.destination,messageCreator);  
  28.     }  
  29.       
  30.     public void convertAndSend(){  
  31.         MsgPoJo msgPojo = new MsgPoJo();  
  32.         msgPojo.setId("1");  
  33.         msgPojo.setText("first msg");  
  34.         System.out.println("--发送消息:msgPojo.id为"+msgPojo.getId()+";msgPojo.text为"+msgPojo.getText());  
  35.         jmsTemplate.convertAndSend(this.destination, msgPojo);  
  36.     }  
  37.   
  38.   
  39.     public void setJmsTemplate(JmsTemplate jmsTemplate) {  
  40.         this.jmsTemplate = jmsTemplate;  
  41.     }  
  42.       
  43.     public void setDestination(Destination destination) {  
  44.         this.destination = destination;  
  45.     }  
  46. }  

 

同样在QueueConsumerService.java中增加receiveAndConvert()方法并在其实现类中实现,实现类的代码如下:

 

[java] view plaincopy
 
  1. package jms.mq.spring;  
  2.   
  3. import javax.jms.Destination;  
  4. import javax.jms.JMSException;  
  5. import javax.jms.TextMessage;  
  6. import org.springframework.jms.core.JmsTemplate;  
  7.   
  8.   
  9. public class QueueConsumerService{  
  10.   
  11.     JmsTemplate jmsTemplate;  
  12.   
  13.     Destination destination;  
  14.   
  15.     public void receive() {  
  16.         TextMessage message = (TextMessage) jmsTemplate.receive();  
  17.         try {  
  18.             System.out.println("QueueConsumerService收到消息:"+message.getText());  
  19.   
  20.         } catch (JMSException e) {  
  21.             e.printStackTrace();  
  22.         }  
  23.     }  
  24.   
  25.     public void receiveAndConvert() {  
  26.         MsgPoJo msgPojo = (MsgPoJo)jmsTemplate.receiveAndConvert();  
  27.         if(msgPojo!=null){  
  28.             System.out.println("--收到消息:msgPojo.id为"+msgPojo.getId()+";msgPojo.text为"+msgPojo.getText());  
  29.         }  
  30.     }  
  31.   
  32.     public void setJmsTemplate(JmsTemplate jmsTemplate) {  
  33.         this.jmsTemplate = jmsTemplate;  
  34.     }  
  35.   
  36.     public void setDestination(Destination destination) {  
  37.         this.destination = destination;  
  38.     }  
  39. }  

 

 

修改我们的两个测试类,增加对转换方法的调用,不再赘述,直接上代码:

QueueConsumerTest.java测试类

[java] view plaincopy
 
  1. package jms.mq.spring;  
  2.   
  3. import org.springframework.context.ApplicationContext;  
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  5.   
  6. public class QueueConsumerTest {  
  7.     private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");  
  8.   
  9.     private static void receive() {  
  10.         QueueConsumerService consumerService = (QueueConsumerService) appContext.getBean("queueConsumerService");  
  11.         consumerService.receive();  
  12.     }  
  13.   
  14.     private static void receiveAndConvert() {  
  15.         QueueConsumerService consumerService = (QueueConsumerService) appContext.getBean("queueConsumerService");  
  16.         consumerService.receiveAndConvert();  
  17.     }  
  18.   
  19.   
  20.     public static void main(String[] args) {  
  21.         //receive();  
  22.         receiveAndConvert();  
  23.     }  
  24. }  

QueueProducerTest.java测试类

[java] view plaincopy
 
  1. package jms.mq.spring;  
  2.   
  3. import org.springframework.context.ApplicationContext;  
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  5.   
  6. public class QueueProducerTest {  
  7.     private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");  
  8.   
  9.     private static void send() {  
  10.         QueueProducerService producerService = (QueueProducerService) appContext.getBean("queueProducerService");  
  11.         producerService.send();  
  12.     }  
  13.       
  14.     private static void convertAndSend() {  
  15.         QueueProducerService producerService = (QueueProducerService) appContext.getBean("queueProducerService");  
  16.         producerService.convertAndSend();  
  17.     }  
  18.   
  19.     public static void main(String[] args) {  
  20.         //send();  
  21.         convertAndSend();  
  22.     }  
  23.   
  24. }  

 

代码编写完毕,我们看一下我们的劳动成果。首先运行生产者类和消费者控制台信息如下:

 

收到的内容与发的内容相同,说明转换成功了。如果这一部分的程序使用的队列跟上面的一样,那你会发现发送的时候打印出的信息不值上面的一个,还包括一个接收的信息,这是为什么呢?了解spring原理的人应该知道,spring是把所有类都加载到内容中,当然也包括我们上门写的按个实现MessageListener的一个消费者类,他们也在运行,如果监听的地址跟你送的地址正好相同的话,他也有可能收到这个信息。所以在测试的时候要注意修改配置文件。

 

[html] view plaincopy
 
  1. <bean id="queueProducerService" class="jms.mq.spring.QueueProducerService">  
  2.     <property name="jmsTemplate" ref="jmsQueueTemplate" />  
  3.     <property name="destination" ref="queueDest" />    
  4. </bean>  
  5.   
  6. <bean id="queueConsumerService" class="jms.mq.spring.QueueConsumerService">  
  7.     <property name="jmsTemplate" ref="jmsQueueTemplate" />  
  8.     <property name="destination" ref="queueDest" />   
  9. </bean>  

 

4、监听器上的使用方式

我再来学习一下跟监听器联合使用的方式,只在发布订阅者模式上演示一下。我们先来修改发布者的实现方式,在发布者中增加convertAndSend方法并在其实现类中实现,订阅者监听器没有类转换,不用修改,发布者修改后的代码如下:

 

[java] view plaincopy
 
  1. package jms.mq.spring;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import javax.jms.Destination;  
  6. import javax.jms.JMSException;  
  7. import javax.jms.MapMessage;  
  8. import javax.jms.Message;  
  9. import javax.jms.Session;  
  10. import javax.jms.TextMessage;  
  11.   
  12. import org.springframework.jms.core.JmsTemplate;  
  13. import org.springframework.jms.core.MessageCreator;  
  14.   
  15. import jms.spring.QueueProducerService;  
  16.   
  17. public class TopicPublisherService{  
  18.     JmsTemplate jmsTemplate;  
  19.        
  20.     Destination destination;  
  21.   
  22.     public void send() {  
  23.         MessageCreator messageCreator = new MessageCreator() {  
  24.   
  25.             public Message createMessage(Session session) throws JMSException {  
  26.                 TextMessage message = session.createTextMessage();  
  27.                 message.setText("QueueProducerService发送消息"+new Date());  
  28.                 return message;  
  29.             }  
  30.         };  
  31.         jmsTemplate.send(this.destination,messageCreator);  
  32.     }  
  33.   
  34.     public void convertAndSend(Object obj) {  
  35.         System.out.println("--发送PoJo对象...");  
  36.         jmsTemplate.convertAndSend(destination, obj);  
  37.     }  
  38.   
  39.       
  40.     public void setJmsTemplate(JmsTemplate jmsTemplate) {  
  41.         this.jmsTemplate = jmsTemplate;  
  42.     }  
  43.   
  44.     public void setDestination(Destination destination) {  
  45.         this.destination = destination;  
  46.     }  
  47.   
  48. }  

 

发布订阅者配置文件如下

 

[html] view plaincopy
 
  1. <!-- 配置TopicJms模板 -->  
  2. <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">  
  3.     <property name="connectionFactory" ref="connectionFactory" />  
  4.     <property name="defaultDestination" ref="topicDest" />  
  5.     <!-- 配置是否为发布订阅者模式,默认为false -->  
  6.     <property name="pubSubDomain" value="true" />  
  7.     <!--<property name="receiveTimeout" value="10000" /> -->  
  8.     <property name="messageConverter" ref="msgConverter"></property>  
  9. </bean>  
[html] view plaincopy
 
  1. <bean id="topicPublisherService" class="jms.mq.spring.TopicPublisherService">  
  2.         <property name="jmsTemplate" ref="jmsTopicTemplate" />  
  3.         <!-- <property name="destination" ref="topicDest" /> -->  
  4.         <property name="destination" ref="topicSubscriberMessageListenerDest" />   
  5.     </bean>  
  6.   
  7.     <bean id="topicSubscriberService" class="jms.mq.spring.TopicSubscriberService">  
  8.         <property name="jmsTemplate" ref="jmsTopicTemplate" />  
  9.         <property name="destination" ref="topicDest" />  
  10.     </bean>  

 

修改上面的发布测试类,修改增加对新增方法的调用,修改后的内容如下:

[java] view plaincopy
 
  1. package jms.mq.spring;  
  2.   
  3. import org.springframework.context.ApplicationContext;  
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  5.   
  6. public class TopicPublisherTest {  
  7.     private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");  
  8.   
  9.     private static void send() {  
  10.         TopicPublisherService topicPublisherService = (TopicPublisherService) appContext.getBean("topicPublisherService");  
  11.         topicPublisherService.send();  
  12.     }  
  13.     private static void convertAndSend() {  
  14.         TopicPublisherService topicPublisherService = (TopicPublisherService) appContext.getBean("topicPublisherService");  
  15.         MsgPoJo msgPoJo = new MsgPoJo();  
  16.         msgPoJo.setId("1");  
  17.         msgPoJo.setText("测试内容");  
  18.         topicPublisherService.convertAndSend(msgPoJo);  
  19.     }  
  20.   
  21.   
  22.     public static void main(String[] args) {  
  23.         //send();  
  24.         convertAndSend();  
  25.     }  
  26. }  

运行发布测试类,运行结果如下:

写在到这里,ActiveMQ与spring整合就讲完了,主要讲了ActiveMQ与spring的简单整合,监听器和类转换这些主要功能.

呵呵,写到不好,请大家不要拍砖。

0
1
分享到:
评论
1 楼 zn100200 2013-11-28  
从一到五我都配置成功了也能正常运行,很好,虽然还是有些不明白的但谢谢分享。

相关推荐

    消息中间件ActiveMQ及Spring整合JMS.docx

    ——学习参考资料:仅用于个人学习使用! 本代码仅作学习交流,切勿用于商业用途,否则后果自负。若涉及侵权,请联系,会尽快处理! 未进行详尽测试,请自行调试!

    activemq消息中间件的使用demo,以及spring集合jms实现消息发送和处理。

    activemq消息中间件的使用demo,以及spring集合jms实现消息发送和处理。

    JMS+activeMQ消息中间件

    最全的基于spring mvc的JMS+activeMQ实现的消息中间件代码例子,源程序和apache-activemq-5.10.0-bin.zip

    activemq新手大全

    一、JMS基本概念 二、activemq介绍及安装 1、消息中间件简介 2、activemq ...四、activemq整合spring运用 五、activemq常见问题 5.1 activemq 消息传递 5.2 activemq 消息确认机制 5.3 activemq 持久化机制

    ActiveMQ实例

    activemq与spring整合,activemq与swing整合 activemq与ajax整合

    jms-spring.zip

    java消息中间件学习-Spring集成JMS连接ActiveMQ demo

    ActiveMQ详细入门使用教程_java_MQ_

    MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。?特点:?1、支持多种语言...

    ActiveMQ消息服务器.rar

    1.ActiveMQ消息队列中间件 1.1.什么是ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,...

    消息中间件activemq项目demo

    消息中间件activemq的入门demo,以及集成了spring管理jsm的初始化管理,简化发送消息的步骤。前后两个项目的对比,凸显出spring的优点

    apache-activemq-5.15.0-bin.tar.7z

    MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、支持多种...

    Learn Apache ActiveMQ

    Apache ActiveMQ是Apache软件基金会所研发的开放源码消息中间件;由于ActiveMQ是一个纯Java程式,因此只需要操作系统支援Java虚拟机,ActiveMQ便可执行。 支持Java消息服务 (JMS) 1.1 版本 Spring Framework 集群 ...

    ActiveMQ.rar

    一: ActiveMQ简介 包括:是什么、能干什么、特点;消息中间件的功能、特点、应用场景等 n 二: ActiveMQ安装和基本使用 包括:通过源码安装、基本的配置示例、启动、测试运行、关闭等 n 三:理解和掌握JMS 包括:...

    ActiveMQDemo:这是关于消息中间件ActiveMQ的WEB项目,将覆盖JMS,ActiveMQ的初始入门和API详细使用,两种经典的消息模式(PTP和PubSub),与Spring集成,ActiveMQ进行监控,监控与配置优化等。不多说,直接撸原始码!

    ActiveMQDemo:这是关于消息中间件ActiveMQ的WEB项目,将覆盖JMS,ActiveMQ的初始入门和API详细使用,两种经典的消息模式(PTP和PubSub),与Spring集成,ActiveMQ进行监控,监控与配置优化等。不多说,直接撸原始码...

    将Sun的Open Message Queue与Spring集成

    基于JMS标准的消息中间件实现的产品有很多,JBossMQ、ActiveMQ、OpenMQ、OpenJMS等等,最常用的还是apache的ActiveMQ。有时也使用Sun的OpenMQ。在官网http://mq.java.net/处可以下载。Open Message Queue是Sun Java ...

    实战ActiveMQ集群与应用视频教程.zip

    1:ActiveMQ入门和消息中间件 2:JMS基本概念和模型 3:JMS的可靠性机制 4:JMS的API结构和开发步骤 5:Broker的启动方式 6:ActiveMQ结合Spring开发 7:ActiveMQ支持的传输协议 8:ActiveMQ消息存储持久化 9:...

    JavaEE企业级开发的面试题汇总

    JavaEE企业级开发的面试题汇总,内容...5.JMS中间件:关于ActiveMQ消息中间件相关的面试题 6.RPC中间件DUBBO的面试题 7.注册中心zookeeper的面试题 8.MAVN和solr的面试题 9.分布式事务、分面式锁、高并发相关的面试题等

    ActiveMQ从入门到精通(一)

    本文来自于jianshu,本篇文章介绍了JMS、...andPub/Sub)、与Spring整合、ActiveMQ集群、监控与配置优化等。话不多说,我们来一起瞧一瞧!首先来说较早以前,也就是没有JMS的那个时候,很多应用系统存在一些缺陷

    2小时学会Spring+Dubbo整合ActiveMQ消息队列

    让你更了解互联网是如何解决高并发 学完SSM框架的同学就可以学习,能让你切身感受到企业级开发环境目标1:理解消息中间件、JMS等概念目标2:掌握JMS点对点与发布订阅模式的收发消息目标3:掌握SpringJms目标4:完成...

    java开源包4

    BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加起来就不小了),而相比之下 C3P0 要六百多K。 异步输出框架 AsynWriter 一个Java的类库,用于异步输出记录的简单小框架用于高并发下数据输出使用...

    java开源包11

    BoneCP很小,只有四十几K(运行时需要slf4j和guava的支持,这二者加起来就不小了),而相比之下 C3P0 要六百多K。 异步输出框架 AsynWriter 一个Java的类库,用于异步输出记录的简单小框架用于高并发下数据输出使用...

Global site tag (gtag.js) - Google Analytics