IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Redis AE事件库源码剖析

    armsword发表于 2014-11-22 14:24:34
    love 0

    很久之前就看过AE库了,不过这东西好久不看,也忘得差不多了,正好要找工作了,简历上写了之前的项目用过这个AE事件库,索性我就又把AE库看了一遍,本来表达能力就差,复习下吧。

    先上张图,下图左侧是ae.c文件里所有的函数,一目了然。

    AE事件库包含文件事件和时间事件,其包含2个源文件ae.c和ae.h,当然还有四个不同的多用复用封装的文件:ae_epoll.c、ae_select.c、ae_evport.c、ae_kqueue.c。一般常用的是ae_select.c(跨平台用),linux下多用ae_epoll.c。我们就以ae_epoll.c为例吧。

    我们先说说文中用到的几个结构体:

    aeEventLoop 核心数据结构,每个aeEventLoop对应一个event loop。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /* State of an event based program */
    typedef struct aeEventLoop {
    int maxfd; /* 现在注册的最大文件描述符 */
    int setsize; /* 跟踪文件描述符的最大数量 */
    long long timeEventNextId; //下一个timer的id
    time_t lastTime; /* 用来诊断系统时间偏差 */
    aeFileEvent *events; /* 用于保存epoll需要关注的文件事件的fd、触发条件、注册函数 */
    aeFiredEvent *fired; /* poll_wait之后获得可读或者可写的fd数组,通过aeFiredEvent->fd再定位到events*/
    aeTimeEvent *timeEventHead; //以链表形式保存多个时间事件,每隔一段时间机会触发注册的函数
    int stop; //停止运行的标志
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep; //阻塞地等待下一个事件发生之前要执行的函数
    } aeEventLoop;

    aeFileEvent是文件事件结构体,mask是要捕获的事件的掩码(读|写),rfileProc和wfileProc分别指处理文件读和写事件的函数,clientData是函数用的参数。

    1
    2
    3
    4
    5
    6
    7
    /* File event structure */
    typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
    } aeFileEvent;

    aeFiredEvent指的已准备好的文件事件的结构体,包括fd(文件描述符)和mask(掩码):

    1
    2
    3
    4
    5
    /* A fired event */
    typedef struct aeFiredEvent {
    int fd;
    int mask;
    } aeFiredEvent;

    aeTimeEvent 是时间事件用到的结构体,id是指的这个定时器的id,when_sec、when_ms分别指的秒和毫秒,timeproc是时间处理函数,finalizerProc是定时器被删除的时候的回调函数,next指的下一个结点,定时器事件是一个链表形式存储的各个时间事件,无序的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /* Time event structure */
    typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
    } aeTimeEvent;

    下面这个结构体用在epoll创建,aeCreateApi()里使用:

    1
    2
    3
    4
    typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
    } aeApiState;

    下面来说说程序流程吧,再说aeMain()函数之前,我先说下一些准备动作吧,分别为aeCreateEventLoop()、aeCreateTimeEvent()、aeCreateFileEvent()和aeSetBeforeSleepProc()。 aeCreateEventLoop()创建一个aeEventLoop结构体,做些初始化,之后调用函数aeApiCreate(),如上述结构体,epfd由epoll_create创建,*events 保存epoll_wait返回的事件组。 aeCreateFileEvent()为fd注册一个文件事件,使用epoll_ctl加入到全局的epoll fd 进行监控,之后再指定事件可读写处理函数。 aeCreateTimeEvent()注册一个时间事件,使用aeAddMillisecondsToNow()加入时间事件。 beforesleep 就是个回调函数,sleep之前调用的函数,有些事情是每次循环必须做的,并非文件和时间事件。

    1
    2
    3
    void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
    eventLoop->beforesleep = beforesleep;
    }

    下面说说核心循环函数aeMain()吧:

    1
    2
    3
    4
    5
    6
    7
    8
    void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
    if (eventLoop->beforesleep != NULL)
    eventLoop->beforesleep(eventLoop);
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
    }

    由stop来决定是否停止,beforesleep可以决定它是否停止,但redis里beforesleep就是一个实现,做的工作只是保证aeProcessEvents要用到的fd都准备好。aeProcessEvents一般会阻塞的(有例外)等待事件(时间和文件事件)的发生,然后做一些处理,代码如下所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    * The function returns the number of events processed. */
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
    int processed = 0, numevents;
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    /* Note that we want call select() even if there are no
    * file events to process as long as we want to process time
    * events, in order to sleep until the next time event is ready
    * to fire. */
    if (eventLoop->maxfd != -1 ||
    ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
    int j;
    aeTimeEvent *shortest = NULL;
    struct timeval tv, *tvp;
    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
    shortest = aeSearchNearestTimer(eventLoop);
    if (shortest) {
    long now_sec, now_ms;
    /* Calculate the time missing for the nearest
    * timer to fire. */
    aeGetTime(&now;_sec, &now;_ms);
    tvp = &tv;
    tvp->tv_sec = shortest->when_sec - now_sec;
    if (shortest->when_ms < now_ms) {
    tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
    tvp->tv_sec --;
    } else {
    tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
    }
    if (tvp->tv_sec < 0) tvp->tv_sec = 0;
    if (tvp->tv_usec < 0) tvp->tv_usec = 0;
    } else {
    /* If we have to check for events but need to return
    * ASAP because of AE_DONT_WAIT we need to set the timeout
    * to zero */
    if (flags & AE_DONT_WAIT) {
    tv.tv_sec = tv.tv_usec = 0;
    tvp = &tv;
    } else {
    /* Otherwise we can block */
    tvp = NULL; /* wait forever */
    }
    }
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
    aeFileEvent *fe = &eventLoop-;>events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int rfired = 0;
    /* note the fe->mask & mask & ... code: maybe an already processed
    * event removed an element that fired and we still didn't
    * processed, so we check if the event is still valid. */
    if (fe->mask & mask & AE_READABLE) {
    rfired = 1;
    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    }
    if (fe->mask & mask & AE_WRITABLE) {
    if (!rfired || fe->wfileProc != fe->rfileProc)
    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    }
    processed++;
    }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
    }

    注意参数flags,当为AE_DONT_WAIT时就表示是不阻塞的检查关注的事件是否发生了,如果没发生则直接返回0。
    aeProcessEvents处理过程如下:
    首先通过调用aeSearchNearestTimer找到离现在最近的定时器是什么时候,这个地方其实做的不好,不如libevent的定时器,需要O(N)得到最短时间,可以改为最小堆保存时间,然后将得到的时间作为参数传给aeApiPoll,作为其timeout参数,来返回命中的文件事件数目,查看eventLoop->fired,可以取出命中事件的详细信息,然后调用rfileProc或wfileProc做事件读写处理,最后再调用processTimeEvents处理已经触发了的定时器事件,然后返回所有已处理的事件(文件和时间)总和。
    最后,参考链接里的那个Redis执行流程图还是挺有借鉴意义的,图片太大,我就不转过来了,参考链接在最下方。

    PS:边看边写,写到此处竟然又晚上两点多了,现在作息时间越来越乱,等拿到offer后,需要调整下生物钟了。

    参考链接:http://my.oschina.net/qichang/blog/32469



沪ICP备19023445号-2号
友情链接