IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Erlang简单并行服务器

    金庆发表于 2015-07-28 07:13:00
    love 0
    Erlang简单并行服务器

    (金庆的专栏)

    Erlang并行服务器为每个Tcp连接创建对应的连接进程,处理客户端数据。

    参考 Erlang程序设计(第2版)
    17.1.3 顺序和并行服务器

    并行服务器的诀窍是:每当gen_tcp:accept收到一个新连接时就立即分裂一个新进程。
    为每个新套接字连接创建一个并行进程。

    -module(gs_svr).
    -author("jinqing").

    -behaviour(gen_server).

    %% API
    -export([start_link/0]).

    init([]) ->
        gs_listener:start_parallel(),
        {ok, #{}}.

    gs_svr(GameServer gen_server)启动Tcp监听,并维护连接,如连接计数,发送广播。

    start_parallel()创建监听端口,然后创建连接进程。

    start_parallel() ->
        Port = server_csv:get_my_port(),
        lager:info("Starting game server on port ~p...", [Port]),
        {ok, ListenSocket} = gen_tcp:listen(Port,
            [binary, {packet, 4},
                {packet_size, 256 * 1024},  % limit packet size
                {reuseaddr, true},
                {nodelay, true},
                {backlog, 999999},
                {active, once}]),
        connection:spawn_connection(ListenSocket).


    spawn_connection()创建连接进程。每接受一个连接就再创建一个新的连接进程。

    -module(connection).
    -author("jinqing").

    %% API
    -export([spawn_connection/1]).
    -export([parallel_connect/1, loop/2]).

    -spec spawn_connection(ListenSocket :: gen_tcp:socket()) -> pid().
    spawn_connection(ListenSocket) ->
        spawn(fun() -> ?MODULE:parallel_connect(ListenSocket) end).

    -spec parallel_connect(ListenSocket :: gen_tcp:socket()) -> ok.
    parallel_connect(ListenSocket) ->
        {ok, Socket} = gen_tcp:accept(ListenSocket),
        spawn_connection(ListenSocket),
        
        gs_svr:cast_connection_new(self()),
        ConnStat = conn_stat:new(),
        erlang:send_after(1000, self(), timer_sec),
        try ?MODULE:loop(Socket, ConnStat)
        catch
            Type:E -> lager:error("loop() ~p:~p. ~p",
                [Type, E, erlang:get_stacktrace()])
        end,
        gs_svr:cast_connection_ended(self()),
        ok.

    -spec loop(Socket :: gen_tcp:socket(), ConnStat :: conn_stat:conn_stat()) -> any().
    loop(Socket, ConnStat) ->
        receive
            {tcp, Socket, Bin} ->
                NewConnStat = rpc_handler:handle_bin(Socket, Bin, ConnStat),
                inet:setopts(Socket, [{active, once}]),
                NewConnStat2 = cutil_dos_checker:on_data(size(Bin), NewConnStat),
                ?MODULE:loop(Socket, NewConnStat2#{idle_sec=>0});
            {tcp_closed, Socket} ->
                save_on_end(ConnStat);
            {tcp_error, Socket, Reason} ->
                save_on_end(ConnStat);

            {gs_to_connection, Msg} ->
                NewConnStat = handle_gs_msg(Msg, Socket, ConnStat),
                ?MODULE:loop(Socket, NewConnStat);

            timer_sec ->
                case conn_timer:timer_sec(ConnStat) of
                    {ok, NewConnStat} ->
                        erlang:send_after(1000, self(), timer_sec),
                        ?MODULE:loop(Socket, NewConnStat);
                end;
            Other ->
                lager:error("Unknown msg: ~p", [Other]),
                ?MODULE:loop(Socket, ConnStat)
        end.  % This is tail-recursive.


    缺点是连接进程没有加入监控树。gs_svr出错重启时,连接进程connection应该断开并退出。


    金庆 2015-07-28 15:13 发表评论


沪ICP备19023445号-2号
友情链接