IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原][Erlang 0101] Gproc:扩展进程注册机制

    ligaorenvip发表于 2014-01-01 16:23:31
    love 0

    2013-3-25 22:45:01更新:抱歉 抱歉 脑子里面想的是进程注册 手误 一直把进程注册写成了进程字典

    Erlang 进程注册机制目前的限制是:
    • names只能是atom
    • 一个进程只能注册一个name
    • 不能进行高效的搜索和遍历,进程信息的检索是通过遍历检查进程的元数据完成的.
    Ulf T. Wiger的开源项目 Gproc 就是解决上面问题的,难得的是这个项目的文档,范例,测试代码相当完整,还有专门的论文讲述整个项目的来龙去脉,设计取舍.
    Gproc是Erlang进程注册机制的加强版,提供了如下原生进程注册没有的功能:
    • 使用任意Erlang Term作为进程的别名
    • 一个进程注册多个别名
    • 支持QLC和match specification高效查询
    • 自动移交已经注册的names和属性到另外的进程
    • .....
    那它是怎么做的呢?这些额外的信息是维护在哪里,是ETS吗?动手操练一下,验证想法:
    复制代码
    Eshell V5.9  (abort with ^G)
    1> application:start(gproc).
    ok
    2> ets:i().
    id              name              type  size   mem      owner
    ----------------------------------------------------------------------------
    13              code              set   265    10683    code_server
    4110            code_names        set   53     7018     code_server
    8207            shell_records     ordered_set 0      73       <0.26.0>
    ac_tab          ac_tab            set   9      929      application_controller
    file_io_servers file_io_servers   set   0      283      file_server_2
    global_locks    global_locks      set   0      283      global_name_server
    global_names    global_names      set   0      283      global_name_server
    global_names_ext global_names_ext  set   0      283      global_name_server
    global_pid_ids  global_pid_ids    bag   0      283      global_name_server
    global_pid_names global_pid_names  bag   0      283      global_name_server
    gproc           gproc             ordered_set 0      73       gproc_sup
    gproc_monitor   gproc_monitor     ordered_set 0      73       gproc_monitor
    inet_cache      inet_cache        bag   0      283      inet_db
    inet_db         inet_db           set   29     554      inet_db
    inet_hosts_byaddr inet_hosts_byaddr bag   0      283      inet_db
    inet_hosts_byname inet_hosts_byname bag   0      283      inet_db
    inet_hosts_file_byaddr inet_hosts_file_byaddr bag   0      283      inet_db
    inet_hosts_file_byname inet_hosts_file_byname bag   0      283      inet_db
    ok
    复制代码

    应用程序启动之后我们查看ETS表的信息,发现多出来两个表:gproc和gproc_monitor;下面我们就使用Shell进程完成测试,为当前的Shell进程创建别名"Shell",并向它发送消息dvd,之后再shell中flush()查看已经接收到的消息.

    复制代码
    5> gproc:reg({p,l,"Shell"}).
    true
    6> gproc:send({p,l,"Shell"},dvd).
    dvd
    7> flush().
    Shell got dvd
    ok
    复制代码

    现在查看一下ETS表的内容,发现里面已经记录了当前Shell进程的注册信息;
    9> ets:i(gproc).
    <1   > {{<0.43.0>,l}}
    <2   > {{<0.43.0>,{p,l,"Shell"}},[]}
    <3   > {{{p,l,"Shell"},<0.43.0>},<0.43.0>,undefined}
    EOT  (q)uit (p)Digits (k)ill /Regexp -->q
    ok

    紧接上面我们为Shell再创建一个别名,然后查看ETS
    复制代码
    10> gproc:reg({p,l,"Shell_alia"}).
    true
    11> ets:i(gproc).
    <1   > {{<0.43.0>,l}}
    <2   > {{<0.43.0>,{p,l,"Shell"}},[]}
    <3   > {{<0.43.0>,{p,l,"Shell_alia"}},[]}
    <4   > {{{p,l,"Shell"},<0.43.0>},<0.43.0>,undefined}
    <5   > {{{p,l,"Shell_alia"},<0.43.0>},<0.43.0>,undefined}
    EOT  (q)uit (p)Digits (k)ill /Regexp -->q
    ok
     
    复制代码

    下面简单演示一下QLC:
    复制代码
    40>  Q5 = qlc:q([P || {{p,l,'_'},P,C} <- gproc:table( )]).
    {qlc_handle,{qlc_lc,#Fun20.111823515>,
                        {qlc_opt,false,false,-1,any,[],any,524288,allowed}}}
    41>
    41> qlc:eval(Q5).
    [<0.65.0>,<0.61.0>]
    42> qlc:e(qlc:q([N || N <- gproc:table()])).
    [{{p,l,"Hello"},<0.65.0>,undefined},
    {{p,l,"NEW_Process"},<0.61.0>,undefined}]
    复制代码

    上面的几段代码基本上包含了它最重要的feature,我们看下实现,注册name的过程实际上是把注册信息写到了ETS;而发送消息的第一步就是从ETS表中查询name对应的Pid,然后进行发送.以发送为例看一下代码实现:
    复制代码
    https://github.com/esl/gproc/blob/master/src/gproc.erl
     
    %% If Key belongs to a unique object (name or aggregated counter), this
    %% function will send a message to the corresponding process, or fail if there
    %% is no such process. If Key is for a non-unique object type (counter or
    %% property), Msg will be send to all processes that have such an object.
    %% @end
    %%
    send(Key, Msg) ->
        ?CATCH_GPROC_ERROR(send1(Key, Msg), [Key, Msg]).
    
    send1({T,C,_} = Key, Msg) when C==l; C==g ->
        if T == n orelse T == a ->
                case ets:lookup(?TAB, {Key, T}) of
                    [{_, Pid, _}] ->
                        Pid ! Msg;
                    _ ->
                        ?THROW_GPROC_ERROR(badarg)
                end;
           T==p orelse T==c ->
                %% BUG - if the key part contains select wildcards, we may end up
                %% sending multiple messages to the same pid
                lists:foreach(fun(Pid) ->
                                      Pid ! Msg
                              end, lookup_pids(Key)),
                Msg;
           true ->
                erlang:error(badarg)
        end;
    send1(_, _) ->
        ?THROW_GPROC_ERROR(badarg).
     
    复制代码

    细致的实现
    下面的测试我们通过gproc:info查看进程信息,下面的结果注意一下current function属性值,是不是有疑问?
    复制代码
    17> P2=spawn(fun() -> gproc:reg({p,l,"NEW_Process"}),receive a -> a end end ).
    <0.61.0>
    18> ets:i(gproc).
    <1   > {{<0.61.0>,l}}
    <2   > {{<0.61.0>,{p,l,"NEW_Process"}},[]}
    <3   > {{{p,l,"NEW_Process"},<0.61.0>},<0.61.0>,undefined}
    EOT  (q)uit (p)Digits (k)ill /Regexp -->q
    ok
    19> gproc:info(P2).
    [{gproc,[{{p,l,"NEW_Process"},undefined}]},
    {current_function,{erl_eval,receive_clauses,6}},
    {initial_call,{erlang,apply,2}},
    {status,waiting},
    {message_queue_len,0},
    {messages,[]},
    {links,[]},
    {dictionary,[]},
    {trap_exit,false},
    {error_handler,error_handler},
    {priority,normal},
    {group_leader,<0.25.0>},
    {total_heap_size,610},
    {heap_size,233},
    {stack_size,8},
    {reductions,100},
    {garbage_collection,[{min_bin_vheap_size,46368},
                          {min_heap_size,233},
                          {fullsweep_after,65535},
                          {minor_gcs,1}]},
    {suspending,[]}]
    复制代码

    这里内部实现还是做得非常细致的,不是简单的把信息附加在Process_info,比如对current function信息做了修正.
    复制代码
    https://github.com/esl/gproc/blob/master/src/gproc.erl
     
    %% We don't want to return the internal gproc:info() function as current
    %% function, so we grab the 'backtrace' and extract the call stack from it,
    %% filtering out the functions gproc:info/_ and gproc:'-info/1-lc...' entries.
    %%
    %% This is really an indication that wrapping the process_info() BIF was a
    %% bad idea to begin with... :P
    %%
    info_cur_f(T, Default) ->
        {match, Matches} = re:run(T,<<"\\(([^\\)]+):(.+)/([0-9]+)">>,
                         [global,{capture,[1,2,3],list}]),
        case lists:dropwhile(fun(["gproc","info",_]) -> true;
                       (["gproc","'-info/1-lc" ++ _, _]) -> true;
                       (_) -> false
                   end, Matches) of
         [] ->
             Default;
         [[M,F,A]|_] ->
             {current_function,
              {to_atom(M), to_atom(F), list_to_integer(A)}}
        end.
    复制代码

    好了今天就到这里.
    最后小图一张:


沪ICP备19023445号-2号
友情链接