IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    rabbitmq java client api详解

    五四陈科学院发表于 2014-08-27 15:33:38
    love 0

    以下内容由[五四陈科学院]提供

    AMQP

    • AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。

    基础概念快速入门

    • 每个rabbitmq-server叫做一个Broker,等着tcp连接进入。
    • 在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型。(一对多、直连、多对多等等)
    • Queue是进程内的逻辑队列,有多个,有名字。
    • Binding联系Exchane与Queue。
    • Routing key由生产者指定。Binding key由消费者指定。二者联合决定一条消息的来去。

    java client api

    连接

    1
    2
    3
    4
    
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(hostName);
    Connection conn = factory.newConnection();
    Channel channel = conn.createChannel();
    • 以上是得到一个rabbitmq连接最最基础的代码,当然了,还可以设置一些诸如用户名密码的事情。
    • 最后这个channel就可以用来收和发消息了。

    消息者线程池

    1
    2
    
    ExecutorService es = Executors.newFixedThreadPool(20);
    Connection conn = factory.newConnection(es);
    • 消费者时使用,上述自动开了一20个线程的池来搞。

    地址数组

    1
    2
    3
    
    Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
                                     , new Address(hostname2, portnumber2)};
    Connection conn = factory.newConnection(addrArr);
    • 上述代码如果连hostname1失败了就去hostname2。
    • factory.newConnection()会触发这个检测。

    声明exchange与queue

    1
    2
    3
    
    channel.exchangeDeclare(exchangeName, "direct", true);
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, exchangeName, routingKey);
    • channel.exchangeDeclare 参数有 交换机名字 类型 是否持久化 不使用时是否自动删除 是否是内部的(不能被客户端使用) 其他参数
    • channel.queueDeclare 参数有 queue名字 是否持久化 独占的queue(仅供此连接) 不使用时是否自动删除 其他参数
    • channel.queueBind 参数有 queue名字 交换机名字 此次绑定使用的路由关键字 其他参数

    发出消息

    1
    2
    
    byte[] messageBodyBytes = "Hello, world!".getBytes();
    channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
    • channel.basicPublish 参数有 要发出的交换机名字 路由关键字 是否强制(设置为true时,找不到收的人时可以通过returnListener返回) 是否立即(其实rabbitmq不支持) 其他属性 消息主体

    线程安全

    • Channel是线程好全的,但是最好是每个线程里用自己的Channel,因为在单个Channel里排队是有可能慢一些的。

    最简单的办法消费消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "myConsumerTag",
         new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                 throws IOException
             {
                 String routingKey = envelope.getRoutingKey();
                 String contentType = properties.contentType;
                 long deliveryTag = envelope.getDeliveryTag();
                 // (process the message components here ...)
                 channel.basicAck(deliveryTag, false);
             }
         });
    • 一个Channel一个Consumer。
    • channel.basicAck 回发ACK 参数 tag 是否多个。

    零碎

    • channel.basicQos 指定服务质量设置 参数 最大的投送字节数 最大的投送消息数量 设置是否要应用到整个channel(而不是一个消费者)。
    • factory.setAutomaticRecoveryEnabled(true) 网络有问题时,好后可自动恢复设置。
    • cf.setRequestedHeartbeat(5) 设置心跳时间。
    • exchange type可用的值:direct topic headers fanout。
    • exchange的类型有一个default,basicPublish没有指定时使用,而且,如果routingKey在指定绑定的时候,会去到绑定的exchange。
    • channel.queueDeclare().getQueue() 得到的是一个随机queue,断开连接后即删除。
    • 当exchange为direct的时候routingKey与bindingKey必须完全一致才能消费消息。

    想快点找到作者也可以到Twitter上留言: @54chen
    或者你懒得带梯子上墙,请到新浪微博:@54chen


沪ICP备19023445号-2号
友情链接