IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    java 连接ACTIVE MQ 进行队列读写操作的例子

    asuncool@gmail.com(yihaomen)发表于 2016-04-14 09:16:03
    love 0
    MQ 用过好多年了,不过以前用 WEBSPHERE MQ 比较多,IBM 的,商业版。其实开源的 ACTIVE MQ 也算不错了,最近刚好用到,写了个简单的测试例子,在项目中用到的测试例子而已。
    1. 向队列中保存消息
    2. 从队列中获取消息

    向队列中保存消息
    程序代码 程序代码

    public class SaveMsg {

        static final String BROKER_URL = "tcp://192.168.1.111:61616";
        static ActiveMQConnectionFactory factory;
        protected static final Logger log = LogManager.getLogger(SaveMsg.class);
        

        static {
            factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            factory.setExceptionListener(
                    new ExceptionListener() {
                        @Override
                        public void onException(JMSException ex) {
                           log.warn("监听到ACTIVEMQ JSM 异常" + ex);
                        }
                    }
            );
        }

        final ExecutorService es = Executors.newSingleThreadExecutor();
        String _DESTINATION;
        

        public static void main(String[] args) throws Exception {
            SaveMsg mi = new SaveMsg();      
            for (int i=0; i<100; i++) {            
                mi.sendRoomChat(8888, 80001, 80001, "<FONT style=\\\"FONT-FAMILY:宋体;FONT-SIZE:12px; COLOR:#000000\\\">收到来自android的测试信息" + i + "</FONT>");  
                log.debug("send message,index is :" + i);
            }
            mi.conn.close();
            
          
        }

        public SaveMsg() throws Exception {
            _initConn();
          
        }
      
        public static boolean stat = true;

        private boolean isRunning() {
            return stat;
        }
        Connection conn;

        private void _initConn() throws JMSException {
            conn = factory.createConnection();
            conn.start();
        }
      
        /**
         *
         * @param roomId 房间ID
         * @param srcId 消息发送ID
         * @param toId 消息接收者
         * @param data 聊天消息
         */
        public void sendRoomChat(int roomId, int srcId, int toId, String data) {
            Session session;
            try {
                session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
                // 创建一个消息队列
                Destination destination = session.createQueue("live.test.rsp");//amq.projId.mode.trans
                // 创建消息制作者
                MessageProducer producer = session.createProducer(destination);
                {//非持久
                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                }
                MapMessage msg = session.createMapMessage();
                {//数据封装
                    msg.setInt("relay_cmd_id", 17);//
                    msg.setInt("vcbid", roomId);//房间ID
                    msg.setInt("srcid", srcId);//当前用户ID
                    msg.setInt("toid", toId);//目标用户ID
                    msg.setInt("msgtype", 0);//文字聊天消息
                    msg.setInt("isprivate", 0);//是否私聊(1为私聊)
                    msg.setString("content", data);
                }
                log.debug("android发送给pc的消息内容为:"+msg);
                producer.send(msg);
                session.close();
            } catch (JMSException ex) {
                log.warn("send android message to pc error:", ex);
            }
        }

        
    }


    从队列中获取消息
    例子如下:
    程序代码 程序代码


    import java.util.Enumeration;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.MessageConsumer;
    import javax.jms.Queue;
    import javax.jms.QueueBrowser;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.advisory.DestinationSource;

    /**
    *
    * @author Administrator
    */
    public class ReadMsg {
        
          
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.111:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
          
            //第一种情况
            System.out.println("***********************");
            while (true) {
                Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue("live.test.rsp");
                MessageConsumer consumer = session.createConsumer(destination);
                MapMessage message = (MapMessage) consumer.receive();
                //session.commit();
                
                System.out .println("收到消息:" +message.getString("content"));
                session.close();            
            }
          
        }
        
    }



    这个进程不断的从队列中获取数据, 最简单的方式实现进程间的通信。说白了,MQ 也是ESB 的核心。


沪ICP备19023445号-2号
友情链接