MQ 用了很多年,不过以前用得比较多的是 IBM 的商业产品 WebSphere MQ。其实开源的 ActiveMQ 也相当不错,最近项目中刚好用到,于是写了个简单的测试例子。
1. 向队列中保存消息
2. 从队列中获取消息
向队列中保存消息 程序代码
/**
 * Test producer that puts chat messages on the ActiveMQ queue {@code live.test.rsp}.
 *
 * <p>One JMS connection is opened per instance (in the constructor) and a
 * short-lived session is created for every message sent.
 */
public class SaveMsg {
    /** Address of the ActiveMQ broker this test client connects to. */
    static final String BROKER_URL = "tcp://192.168.1.111:61616";
    /** Connection factory shared by all instances; configured in the static initializer. */
    static ActiveMQConnectionFactory factory;
    protected static final Logger log = LogManager.getLogger(SaveMsg.class);

    static {
        factory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
        factory.setExceptionListener(
            new ExceptionListener() {
                @Override
                public void onException(JMSException ex) {
                    // Pass the exception as the throwable argument so the stack trace is logged,
                    // matching the logging style used in sendRoomChat.
                    log.warn("监听到ACTIVEMQ JMS 异常", ex);
                }
            }
        );
    }

    // NOTE(review): not used anywhere in the visible code — confirm before removing.
    final ExecutorService es = Executors.newSingleThreadExecutor();
    // NOTE(review): not used anywhere in the visible code — confirm before removing.
    String _DESTINATION;

    /**
     * Sends 100 test chat messages and then closes the connection.
     *
     * @param args ignored
     * @throws Exception if the connection cannot be opened or closed
     */
    public static void main(String[] args) throws Exception {
        SaveMsg mi = new SaveMsg();
        try {
            for (int i = 0; i < 100; i++) {
                mi.sendRoomChat(8888, 80001, 80001, "<FONT style=\\\"FONT-FAMILY:宋体;FONT-SIZE:12px; COLOR:#000000\\\">收到来自android的测试信息" + i + "</FONT>");
                log.debug("send message,index is :" + i);
            }
        } finally {
            // Release the broker connection even if the loop fails part-way.
            mi.conn.close();
        }
    }

    /**
     * Creates the sender and opens its JMS connection.
     *
     * @throws Exception if the connection cannot be created or started
     */
    public SaveMsg() throws Exception {
        _initConn();
    }

    public static boolean stat = true;

    /** Returns the current value of the static {@link #stat} flag. */
    private boolean isRunning() {
        return stat;
    }

    /** Connection opened by {@link #_initConn()} and used for every send. */
    Connection conn;

    /**
     * Creates and starts the connection from the shared factory.
     *
     * @throws JMSException if the connection cannot be created or started
     */
    private void _initConn() throws JMSException {
        conn = factory.createConnection();
        conn.start();
    }

    /**
     * Sends one chat message to the queue {@code live.test.rsp} as a non-persistent
     * {@link MapMessage}. JMS failures are logged and not propagated.
     *
     * @param roomId room ID
     * @param srcId  ID of the sending user
     * @param toId   ID of the receiving user
     * @param data   chat message content
     */
    public void sendRoomChat(int roomId, int srcId, int toId, String data) {
        Session session = null;
        try {
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Look up (or create) the target queue.
            Destination destination = session.createQueue("live.test.rsp");//amq.projId.mode.trans
            // Create the producer; messages are sent non-persistent.
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // Pack the chat data into a map message.
            MapMessage msg = session.createMapMessage();
            msg.setInt("relay_cmd_id", 17);
            msg.setInt("vcbid", roomId);   // room ID
            msg.setInt("srcid", srcId);    // current user ID
            msg.setInt("toid", toId);      // target user ID
            msg.setInt("msgtype", 0);      // text chat message
            msg.setInt("isprivate", 0);    // private chat flag (1 = private)
            msg.setString("content", data);

            // Skip building the message dump when debug logging is off.
            if (log.isDebugEnabled()) {
                log.debug("android发送给pc的消息内容为:" + msg);
            }
            producer.send(msg);
        } catch (JMSException ex) {
            log.warn("send android message to pc error:", ex);
        } finally {
            // Always release the session, including when creation or send failed.
            closeQuietly(session);
        }
    }

    /** Closes the session if it was created, logging (not throwing) any failure. */
    private static void closeQuietly(Session session) {
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (JMSException ex) {
            log.warn("close session error:", ex);
        }
    }
}
从队列中获取消息 例子如下:
程序代码
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.DestinationSource;
/**
 * Test consumer that reads map messages from the ActiveMQ queue
 * {@code live.test.rsp} and prints their {@code content} field.
 *
 * @author Administrator
 */
public class ReadMsg {
    /**
     * Connects to the broker and prints every received message until the
     * consumer is closed.
     *
     * @param args ignored
     * @throws JMSException if connecting, receiving, or closing fails
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.111:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            System.out.println("***********************");

            // One session and one consumer are created up front and reused for every message.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("live.test.rsp");
            MessageConsumer consumer = session.createConsumer(destination);

            while (true) {
                Message received = consumer.receive();
                if (received == null) {
                    // receive() returns null when the consumer is closed; stop reading.
                    break;
                }
                if (received instanceof MapMessage) {
                    MapMessage message = (MapMessage) received;
                    System.out.println("收到消息:" + message.getString("content"));
                } else {
                    // Other message types on the queue are reported instead of crashing the reader.
                    System.out.println("Skipping non-MapMessage: " + received);
                }
            }
        } finally {
            // Closing the connection also closes its sessions and consumers.
            connection.close();
        }
    }
}
这个进程不断地从队列中获取数据,这是实现进程间通信最简单的方式。说白了,MQ 也是 ESB 的核心。