IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原]oracle 高级队列技术

    changyanmanman发表于 2015-07-30 14:58:33
    love 0

    Oracle AQ是Oracle数据库中集成的一种消息队列机制,可以用于不同应用程序间的消息交互,例如PL/SQL可以通过相应的Package访问队列、C#应该程序可以通过ODP.NET访问队列、Java应用程序则可以通过OJMS访问队列。AQ内部是通过数据库表实现的(即消息实际上是存储在数据库表中)。

    高级队列的应用范围非常广泛。 除了提供消息在oracle数据库和应用程序以及用户之间传送的功能之外,利用oracle Net Services消息还可以在oracle数据库的客户端和服务端之间或两个数据库之间以及一个oracle 队列到另一个队列之间传递。 而且,基于HTTP/HTTPS或SMTP等传输协议,我们还可以通过Internet执行高级队列操作。 此外ORACLE高级队列通过消息处理网关可以实现同现存的非oracle消息系统的无缝集成。

    另外,由于oracle 高级队列集成于数据库,它便具有一些其它消息队列所不具备的特殊优势:

    首先,它的操作继承了数据库的所有优点,例如可靠性’完整性’高可用性’安全性以及可伸缩性等。

    其次,消息的管理大大方便了, 由于采用数据库表存储消息,因此用户可以利用标准的SQL 语句访问消息信息,包括消息的属性、历史消息、消息负载。同样可以对消息进行审计和跟踪,利用索引可以更好地优化消息管理。

    第三,同其它数据库表一样,队列表还可以被导入、导出。

     高级队列消息传递机制
    oracle高级队列的具体消息传递机制大致如下:消息“ 生产者” 把消息装入队列( 称为Enqueue,入列) ,消息“消费者”从队列中取消息( 称为Dequeue,出列) 。队列表以数据库表的形式存在,队列存储在队列表中。


    应用实例
    在医疗保险系统中,医院的应用程序和医保中心的应用程序之间需要进行通信。 其中一个例子便是医保病人在医院进行的有些医疗项目需要先经过医保中心审批。 否则,医保中心不给予报销。 这样,在医院的应用程序中,如果输入一个需要审批的医疗项目时,应该把这个申请信息传送到医保中心。 审批后,医保中心把结果返回给医院。
    在oracle 高级队列的实现中, 可以使用多种方式:PL/SQL 、java、C 等。在医保应用和医院应用通过oracle 高级队列的通信中,我们使用PL/SQL 使用高级队列进行医院应用和医保中心应用之间通信的大致过程如图 所示:



    以上案例实现过程参考文档:http://download.csdn.net/detail/changyanmanman/8946375








    下面展示了PL/SQL中使用AQ的基本用法,示例假设了一个场景:A是一个被频繁调用的存储过程,每次调用A之前需要调用过程B,B消耗大量的时间,假设A的执行并不依赖于B的执行结果,我们可以把调用B的上下文先存入AQ中,而后异步地进行处理,从而减小了B对应用程序性能的影响。

    1.    创建AQ所需要的权限

    复制代码
    GRANT EXECUTE ON DBMS_AQ TO user1;
    GRANT EXECUTE ON DBMS_AQADM TO user1;
    BEGIN
      DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY', 'user1', FALSE);
      DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('DEQUEUE_ANY', 'user1', FALSE);
    END;
    复制代码

    需要使用sys或system用户发起这些授权语句,欲创建及管理AQ,需要获得两个至关重要的包dbms_aq, dbms_aqadm的执行权限。后两个通过grant_system_privilege进行的授权是可选的,它们表示的是:

    ENQUEUE_ANY means users granted this privilege are allowed to enqueue messages to any queues in the database. DEQUEUE_ANY means users granted this privilege are allowed to dequeue messages from any queues in the database.

    2.    创建一个payload类型

    复制代码
    CREATE OR REPLACE TYPE t_spl_queue_payload AS OBJECT
    (
      ID             CHAR(36),
      EXEC_DATE     TIMESTAMP(6),
      PARAMETER1       NUMBER,
      PARAMETER2       VARCHAR2(500),
      FLAG           CHAR(1)
    );
    复制代码

    通常我们会定义一个对象,用于存储将来需要放置在AQ队列中的信息。

    3.    创建AQ相关表

    BEGIN
      DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table        => 'user1. Spl_queue_table',
                                    multiple_consumers => TRUE,
                                    queue_payload_type => 'user1.t_spl_queue_payload');
    END;

    执行的结果是生成了表Spl_queue_table,以及若干个aq$_ spl_queue_table_表。表Spl_queue_table中除了AQ队列自身需要的一些字段外,有一个类型为t_spl_queue_payload的USER_DATA字段,用于存储队列消息,这也印证了上面说的:AQ内部是通过数据库表实现的。


    4.    创建及启动AQ

    复制代码
    BEGIN
      DBMS_AQADM.CREATE_QUEUE(queue_name  => 'user1.spl_aq',
                              queue_table => 'user1.spl_queue_table');
    END;
    --
    BEGIN
      DBMS_AQADM.START_QUEUE(queue_name => 'user1.spl_aq');
    END;
    复制代码

    如何停止及删除AQ:

    BEGIN
      DBMS_AQADM.STOP_QUEUE (queue_name => 'user1.spl_aq'); 
      DBMS_AQADM.DROP_QUEUE (queue_name => 'user1.spl_aq'); 
      DBMS_AQADM.DROP_QUEUE_TABLE (queue_table => 'user1.spl_queue_table');
    END;

    5.    消息的入队

    复制代码
    PROCEDURE enqueue(p_payload IN t_spl_queue_payload) IS
      --PRAGMA AUTONOMOUS_TRANSACTION;
      enqueue_options    dbms_aq.enqueue_options_t;
      message_properties dbms_aq.message_properties_t;
      message_handle     RAW(16);
      recipients         DBMS_AQ.aq$_recipient_list_t;
    BEGIN
      recipients(1) := sys.aq$_agent('someguy', 'user1.SPL_AQ', NULL);
      message_properties.recipient_list := recipients;
      message_properties.priority := -5;
      message_properties.delay := dbms_aq.no_delay;
      message_properties.expiration := dbms_aq.never;
      --enqueue_options.visibility := dbms_aq.on_commit;
      enqueue_options.visibility := dbms_aq.immediate;
      enqueue_options.sequence_deviation := null;
    
      dbms_aq.enqueue(queue_name         => 'user1.SPL_AQ',
                      enqueue_options    => enqueue_options,
                      message_properties => message_properties,
                      payload            => p_payload,
                      msgid              => message_handle);
    
      --COMMIT;
    END enqueue;
    复制代码

    (1) recipient,其中“someguy”指定的是消息的接收者,出队时你需要指定一样的名字才能接收到消息。(2)visibility,可以是on_commit或者immediate,如果使用on_commit,需要手工调用commit语句之后消息才进入队列(这种情况下,最好使用自治事务);如果使用immediate,则dbms_aq.enqueue完成时消息就进入队列,不需commit,并且默认使用自治事务。

    6.    消息的出队

    复制代码
    PROCEDURE dequeue IS
      l_payload      t_spl_queue_payload;
      l_queue_record NUMBER;
      dequeue_options    dbms_aq.dequeue_options_t;
      message_properties dbms_aq.message_properties_t;
      message_handle     RAW(16);
    BEGIN
      dequeue_options.consumer_name := 'someguy';
      dequeue_options.dequeue_mode  := dbms_aq.remove;
      dequeue_options.navigation    := dbms_aq.next_message;
      dequeue_options.visibility    := dbms_aq.immediate;
      --dequeue_options.wait          := dbms_aq.forever;
      dequeue_options.wait          := dbms_aq.no_wait;
      dequeue_options.msgid         := null;
      --
      SELECT COUNT(*)
        INTO l_queue_record
        FROM AQ$SPL_QUEUE_TABLE
       WHERE msg_state = 'READY';
      --
      FOR i IN 1 .. l_queue_record LOOP
        dbms_aq.dequeue(queue_name         => 'user1.SPL_AQ',
                        dequeue_options    => dequeue_options,
                        message_properties => message_properties,
                        payload            => l_payload,
                        msgid              => message_handle);
      --
      /*……………………………………….
      some time consuming calculation
      ………………………………………….*/
      END LOOP;
    END;
    复制代码

    (1) consumer_name需要和前面在入队时指定的recipient一致。(2)wait的两个值forever和no_wait是指如果当前队列中无消息时,是否进行等待,默认等待。(3) navigation的两个值first_message和next_message,一般出于性能考虑我们使用后者,或者在第一次出队时使用前者而在随后的出队中使用后者:

    The FIRST_MESSAGE navigation option performs a SELECT on the queue. The NEXT_ MESSAGE navigation option fetches from the results of the SELECT run in the FIRST_ MESSAGE navigation. Thus performance is optimized because subsequent dequeues need not run the entire SELECT again.



沪ICP备19023445号-2号
友情链接