IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    使用HiRedis实现自动重连Redis

    春秋十二月发表于 2021-02-25 07:51:00
    love 0
    主要思路
     1. 首次连接时调用redisConnectWithTimeout或redisConnectUnixWithTimeout连接Redis服务端,若成功则保存返回的redisContext,假设为ctx
     2. 发送命令数据后获取响应,如果是pipeling模式则调用redisGetReply获取响应,再检查redisContext中的错误码,如果为网络出错或关闭,则不置位ctx REDIS_CONNECTED标志
     3. 在下次发送数据时,先检查ctx否置位了REDIS_CONNECTED标志,若没有则调用redisReconnect重连Redis服务端

    实现代码
     自动连接
     1 int redis_auto_connect(CBED_REDIS *redis)
     2 {
     3     if(NULL==redis->ctx){
     4         redisContext *ctx;
     5         if(redis->type == CONN_TCP)
     6             ctx = redisConnectWithTimeout(redis->conn.tcp.ip, redis->conn.tcp.port, redis->timeout_conn);
     7         else
     8             ctx = redisConnectUnixWithTimeout(redis->conn.unix.path, redis->timeout_conn);
     9         
    10         if(NULL==ctx){
    11             zlog_fatal(c_redis, "redis allocate context fail");
    12             return -1;
    13             
    14         }else if(ctx->err){
    15             zlog_fatal(c_redis, "redis connection %s:%d error: %s", 
    16                         redis->type==CONN_TCP?redis->conn.tcp.ip:redis->conn.unix.path, 
    17                         redis->type==CONN_TCP?redis->conn.tcp.port:0, ctx->errstr);
    18             redisFree(ctx);
    19             return -1;
    20         }
    21         
    22         if(REDIS_ERR==redisSetTimeout(ctx, redis->timeout_rw)){
    23             zlog_fatal(c_redis, "redis set rw timeout error: %s", ctx->errstr);
    24             redisFree(ctx);
    25             return -1;
    26         }
    27         
    28         redis->ctx = ctx;
    29         if(redis_auth(redis)){
    30             redisFree(ctx);
    31             redis->ctx = NULL;
    32             return -1;
    33         }
    34         
    35     }    else if(!(redis->ctx->flags & REDIS_CONNECTED)){
    36         int retry = redis->reconn_max, n = 0;
    37         do {
    38             if(REDIS_OK==redisReconnect(redis->ctx)){
    39                 return redis_auth(redis);
    40             }
    41             
    42             zlog_warn(c_redis, "redis reconnect %d error: %s", ++n, redis->ctx->errstr);    
    43             sleep(redis->reconn_interval); //reconn_interval default is 3 seconds
    44             
    45         }while(--retry > 0);
    46         
    47         zlog_error(c_redis, "redis reconnect exceed max num %d", redis->reconn_max);
    48         return -1;
    49     }
    50 
    51     return 0;
    52 }
     
     发送时检查错误码
     1 static int redis_bulk_get_reply(CBED_REDIS *redis)
     2 {
     3     redisReply *r;
     4     int i = 0;        
     5     int num = redis->cmd_num;
     6     redis->cmd_num = 0;
     7 
     8     for(; i<num; ++i){
     9         if(REDIS_OK==redisGetReply(redis->ctx, (void**)&r)){
    10             if(r->type == REDIS_REPLY_ERROR){
    11                 zlog_error(c_redis, "redis get reply error: %.*s", r->len, r->str);
    12                 freeReplyObject(r);
    13                 return -1;
    14             }
    15             freeReplyObject(r);
    16         
    17         }else{
    18             if(redis->ctx->err==REDIS_ERR_IO||redis->ctx->err==REDIS_ERR_EOF)
    19                 redis->ctx->flags &= ~REDIS_CONNECTED;
    20             zlog_fatal(c_redis, "redis get reply fail: %s", redis->ctx->errstr);
    21             return -1;
    22         }
    23     }
    24 
    25     return 0;
    26 }
    27 
    28 int redis_send(CBED_REDIS *redis, unsigned char *data, unsigned int size, int force)
    29 {
    30     if(redis_auto_connect(redis))
    31         return -1;
    32 
    33     int i;
    34     
    35     if(redis->max_cmd_num > 1){ //pipelining
    36         for(i=0; i<redis->queue_num; ++i){
    37             if(REDIS_ERR == redisAppendCommand(redis->ctx, "RPUSH %s %b", redis->queue[i], data, size)) {
    38                 zlog_fatal(c_redis, "redis append command rpush %s len %u fail: %s", redis->queue[i], size, redis->ctx->errstr);
    39                 return -1;
    40             }
    41             
    42             ++redis->cmd_num;
    43             if((!force && redis->cmd_num==redis->max_cmd_num) || force){
    44                 if(redis_bulk_get_reply(redis))
    45                     return -1;
    46             }
    47         }
    48         
    49     }else{
    50         for(i=0; i<redis->queue_num; ++i){
    51             redisReply *r = redisCommand(redis->ctx, "RPUSH %s %b", redis->queue[i], data, size);
    52             if(NULL==r){
    53                 if(redis->ctx->err==REDIS_ERR_IO||redis->ctx->err==REDIS_ERR_EOF)
    54                     redis->ctx->flags &= ~REDIS_CONNECTED;
    55                 zlog_fatal(c_redis, "redis command rpush %s len %u fail: %s", redis->queue[i], size, redis->ctx->errstr);
    56                 return -1;
    57             }
    58             
    59             if(r->type == REDIS_REPLY_ERROR){
    60                 zlog_error(c_redis, "redis reply rpush %s len %u error: %.*s", redis->queue[i], size, r->len, r->str);
    61                 freeReplyObject(r);
    62                 return -1;
    63             }
    64 
    65             freeReplyObject(r);
    66         }
    67     }
    68 
    69     return 0;
    70 }


    春秋十二月 2021-02-25 15:51 发表评论


沪ICP备19023445号-2号
友情链接