IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原]豆瓣Redis解决方案Codis源码剖析:Proxy代理

    dc_726发表于 2015-07-03 21:33:18
    love 0
    豆瓣Redis解决方案Codis源码剖析:Proxy代理1.预备知识1.1 CodisCodis就不详细说了,摘抄一下GitHub上的一些项目描述:Codis is a proxy based high performance Redis cluster solution written in Go/C, an alternative to Twemproxy. It supports multiple stateless proxy with multiple redis instances and is engineered to elastically scale, Easily add or remove redis or proxy instances on-demand/dynamicly.Auto rebalanceSupport both redis or rocksdb transparentlyGUI dashboard & admin toolsSupports most of Redis commands, Fully compatible with twemproxyNative Redis clients are supportedSafe and transparent data migration, Easily add or remove nodes on-demandCommand-line interface is also providedRESTful APIs安装步骤官方网站上也写的很清楚了:// Golang环境安装配置[root@vmroot]$ tar -C /usr/local -zxf go1.4.2.linux-amd64.tar.gz [root@vmroot]$ vim /etc/profileexportGOROOT=/usr/local/goexportPATH=$GOROOT/bin:$PATHexportGOPATH=/home/user/go [root@vmroot]$ source /etc/profile [root@vmroot]$ goversion// 下载Codis依赖,编译Codis[root@vmroot]$ cd codis-1.92[root@vmroot]$ ./bootstrap.sh1.2 GolangCodis的核心代码都是用Golang开发的,所以在一头扎进源代码里之前,先了解Golang的语法特性是必不可少的!好在Golang除了少数一些“古怪之处”外,还算容易上手。具体请参考笔者的另一篇文章《 Java程序员的Golang入门指南(上)》。1.3 Redis通信协议Redis通信协议简称为RESP,在分析网络通信时需要这一部分的知识。RESP本身的设计非常简单,所以还是快速过一下吧。具体请参考笔者的另一篇文章《用Netty解析Redis网络协议》以及官网上的协议具体规范。1.4 ZookeeperCodis以及现今很多后端中间件都使用Zookeeper来协调分布式通信,所以在阅读源码前我们至少要知道Zookeeper是干什么的,有哪些基本操作和监听器。具体请参考笔者的另一篇文章《Apache Curator入门实战》。2.Proxy源码剖析Codis可以分为客户端Jodis、代理中间件Codis Proxy、Zookeeper协调、监控界面、Redis定制版Codis Server等组件。这里第一部分主要关注最核心的Proxy部分的源码。2.1 程序入口main.gocodis-1.92/cmd/proxy/main.go是Proxy组件的main函数入口,完成的主要工作就是设置日志级别、解析命令行参数(CPU核数、绑定地址等)、加载配置文件、Golang环境(runtime.GOMAXPROCS并发数)、启动Socket监听等常规任务。顺藤摸瓜,我们要分析的关键应该就在router中。funcmain() {// 1.打印banner,设置日志级别fmt.Print(banner) log.SetLevelByString("info")// 2.解析命令行参数args, err := docopt.Parse(usage,nil,true,"codis proxy v0.1",true)iferr !=nil{ log.Error(err) }ifargs["-c"] !=nil{ configFile = args["-c"].(string) } ... dumppath := utils.GetExecutorPath() log.Info("dump file path:", dumppath) log.CrashLog(path.Join(dumppath,"codis-proxy.dump"))// 3.设置Golang并发数等router.CheckUlimit(1024) runtime.GOMAXPROCS(cpus)// 4.启动Http监听http.HandleFunc("/setloglevel", handleSetLogLevel)gohttp.ListenAndServe(httpAddr,nil) log.Info("running on ", addr) conf, err := router.LoadConf(configFile)iferr !=nil{ log.Fatal(err) }// 5.创建Server,启动Socket监听s := router.NewServer(addr, httpAddr, conf) s.Run() log.Warning("exit") }2.2 核心类Server打开codis-1.92/pkg/proxy/router/router.go,在分析请求接收和分发前,先来看一个最核心的类Server,它就是在main.go中调用router.NewServer()时创建的。说一下比较重要的几个字段:reqCh:Pipeline请求的Channel。pools:Slot与cachepool的map。evtbus/top:处理Zookeeper消息,更新拓扑结构。bufferedReq:Slot处于migrate期间被缓冲的请求。pipeConns:Slot对应的taskrunner。注意interface{},它表示空interface,按照Golang的Duck Type继承方式,任何类都是空接口的子类。所以interface{}有点像C语言中的void*/char*。因为Codis是先启动监听再开始接收Socket请求,所以对go s.handleTopoEvent()的分析放到后面。在下一节我们先看一下Codis是如何启动对Socket端口监听并将接收到的请求放入到Server的reqCh管道中的。typeServerstruct{ slots [models.DEFAULT_SLOT_NUM]*Slot top *topo.Topology evtbuschaninterface{} reqChchan*PipelineRequest lastActionSeqintpi models.ProxyInfo startAt time.Time addrstringmoper *MultiOperator pools *cachepool.CachePool counter *stats.Counters OnSuicide OnSuicideFun bufferedReq *list.List conf *Conf pipeConnsmap[string]*taskRunner//redis->taskrunner}funcNewServer(addrstring, debugVarAddrstring, conf *Conf) *Server { log.Infof("start with configuration: %+v", conf)// 1.创建Server类s := &Server;{ conf: conf, evtbus:make(chaninterface{},1000), top: topo.NewTopo(conf.productName, conf.zkAddr, conf.f, conf.provider), counter: stats.NewCounters("router"), lastActionSeq:-1, startAt: time.Now(), addr: addr, moper: NewMultiOperator(addr), reqCh:make(chan*PipelineRequest,1000), pools: cachepool.NewCachePool(), pipeConns:make(map[string]*taskRunner), bufferedReq: list.New(), } ...// 2.启动Zookeeper监听器s.RegisterAndWait() _, err = s.top.WatchChildren(models.GetWatchActionPath(conf.productName), s.evtbus)iferr !=nil{ log.Fatal(errors.ErrorStack(err)) }// 3.初始化所有Slot的信息s.FillSlots()// 4.启动对reqCh和evtbus中事件的监听gos.handleTopoEvent()returns }2.2.1 Zookeeper通信topology.goNewServer()中调用的RegisterAndWait()和WatchChildren()都是处理Zookeeper的。一部分代码在codis-1.92/pkg/proxy/router/topology/topology.go中,一部分底层实现在codis-1.92/pkg/models包下。这里就不具体分析models包是如何与Zookeeper通信的了,以免偏离了主题。现阶段,我们只需知道Zookeeper中结点关系(Proxy拓扑结构)的变化都会反映到evtbus管道中就行了。func(s *Server) RegisterAndWait() { _, err := s.top.CreateProxyInfo(&s.pi;)iferr !=nil{ log.Fatal(errors.ErrorStack(err)) } _, err = s.top.CreateProxyFenceNode(&s.pi;)iferr !=nil{ log.Warning(errors.ErrorStack(err)) } s.registerSignal() s.waitOnline() }func(top *Topology) WatchChildren(pathstring, evtbuschaninterface{}) ([]string, error) { content, _, evtch, err := top.zkConn.ChildrenW(path)iferr !=nil{returnnil, errors.Trace(err) }// 启动监听器,监听Zookeeper事件gotop.doWatch(evtch, evtbus)returncontent,nil}func(top *Topology) doWatch(evtch <-chantopo.Event, evtbuschaninterface{}) { e := <-evtchife.State == topo.StateExpired || e.Type == topo.EventNotWatching { log.Fatalf("session expired: %+v", e) } log.Warningf("topo event %+v", e)switche.Type {//case topo.EventNodeCreated://case topo.EventNodeDataChanged:casetopo.EventNodeChildrenChanged://only care children changed//todo:get changed node and decode eventdefault: log.Warningf("%+v", e) }// 将Zookeeper结点变化的事件放入Server的evtbus管道中evtbus <- e }2.2.2 初始化槽信息fillSlots()Codis将Redis服务器按照Group划分,每个Group就是一个Master以及至少一个Slave。也就是说每个Group都对应哈希散列的一个Slot。fillSlots()从ZooKeeper中取出注册的Redis后端信息,初始化每个Slot(默认1024个):包括Slot状态、Group信息等。func(s *Server) FillSlots() {// 为所有默认1024个Slot初始化信息fori :=0; i < models.DEFAULT_SLOT_NUM; i++ { s.fillSlot(i,false) } }func(s *Server) fillSlot(iint, forcebool) { s.clearSlot(i)// 1.获得当前Slot的信息和Group信息slotInfo, groupInfo, err := s.top.GetSlotByIndex(i) slot := &Slot;{ slotInfo: slotInfo, dst: group.NewGroup(*groupInfo), groupInfo: groupInfo, }// 2.创建Slot对应的cachepools.pools.AddPool(slot.dst.Master())ifslot.slotInfo.State.Status == models.SLOT_STATUS_MIGRATE {//get migrate src group and fill itfrom, err := s.top.GetGroup(slot.slotInfo.State.MigrateStatus.From)iferr !=nil{//todo: retry ?log.Fatal(err) } slot.migrateFrom = group.NewGroup(*from) s.pools.AddPool(slot.migrateFrom.Master()) } s.slots[i] = slot s.counter.Add("FillSlot",1) }codis-1.92/pkg/proxy/cachepool/cachepool.go和codis-1.92/pkg/proxy/redispool/redispool.go中负责创建与Redis通信的连接池。typeLivePoolstruct{ pool redispool.IPool }typeCachePoolstruct{ mu sync.RWMutex poolsmap[string]*LivePool }func(cp *CachePool) AddPool(keystring) error {// 1.锁住cachepoolcp.mu.Lock()defercp.mu.Unlock()// 2.查找当前Slot的连接池pool, ok := cp.pools[key]ifok {returnnil}// 3.若不存在则新建LivePoolpool = &LivePool;{//pool: redispool.NewConnectionPool("redis conn pool", 50, 120*time.Second),pool: NewSimpleConnectionPool(), }// 4.打开连接pool.pool.Open(redispool.ConnectionCreator(key))// 5.保存新建好的连接池cp.pools[key] = poolreturnnil}2.3 请求接收router.go(1)下面继续跟踪主流程,main()方法在调用NewServer()创建出Server实例后,调用了其Run()方法。Run()是标准的服务端代码,首先net.Listen()绑定到端口上监听,然后进入死循环Accept(),每接收到一个连接就启动一个goroutine进行处理。func(s *Server) Run() { log.Infof("listening %s on %s", s.conf.proto, s.addr) listener, err := net.Listen(s.conf.proto, s.addr) ...for{ conn, err := listener.Accept()iferr !=nil{ log.Warning(errors.ErrorStack(err))continue}gos.handleConn(conn) } }handleConn()接收到客户端的连接,完成三件事儿:创建session对象:保存当前客户端的Socket连接、读写缓冲区、响应Channel等。启动响应goroutine:client.WritingLoop()中处理backQ中的响应数据。建立Redis连接:server.redisTunnel()中打开连接,读取客户端请求并转发给Redis处理。func(s *Server) handleConn(c net.Conn) { log.Info("new connection", c.RemoteAddr()) s.counter.Add("connections",1)// 1.创建当前客户端的Session实例client := &session;{ Conn: c, r: bufio.NewReaderSize(c,32*1024), w: bufio.NewWriterSize(c,32*1024), CreateAt: time.Now(), backQ:make(chan*PipelineResponse,1000), closeSignal: &sync.WaitGroup;{}, } client.closeSignal.Add(1)// 2.启动监视backQ写回响应的子routinegoclient.WritingLoop() ...// 3.循环读取该客户端的请求并处理for{ err = s.redisTunnel(client)iferr !=nil{close(client.backQ)return} client.Ops++ } }redisTunnel可以说是Proxy服务端的“代码中枢”了,最核心的代码都是在这里共同协作完成任务的,它调用三个最为关键的函数:getRespOpKeys()解析请求:在helper.go中,委托parser.go解析客户端请求。此处对多参数的请求例如hmset进行特殊处理,因为key可能对应多个后端Redis实例。如果是单参数,则可以Pipeline化发送给后端。mapKey2Slot()哈希映射:在mapper.go中,计算key应该分配到哪台Redis服务器的Slot中。PipelineRequest()创建Pipeline请求:根据前面得到的数据新建PipelineRequest,并发送到当前客户端Session中的Channel中。之后调用pr.wg.Wait(),当前go s.handleConn()创建的goroutine休眠等待响应。func(s *Server) redisTunnel(c *session) error { resp, op, keys, err := getRespOpKeys(c) k := keys[0] ...ifisMulOp(opstr) {iflen(keys) >1{//can not send to redis directlyvarresult []byteerr := s.moper.handleMultiOp(opstr, keys, &result;)iferr !=nil{returnerrors.Trace(err) } s.sendBack(c, op, keys, resp, result)returnnil} } i := mapKey2Slot(k)//pipelinec.pipelineSeq++ pr := &PipelineRequest;{ slotIdx: i, op: op, keys: keys, seq: c.pipelineSeq, backQ: c.backQ, req: resp, wg: &sync.WaitGroup;{}, } pr.wg.Add(1) s.reqCh <- pr pr.wg.Wait()returnnil}2.3.1 RESP协议解析parser.goredisTunnel()调用了helper.go中的getRespOpKeys(),后者使用parser.go解析RESP协议请求,从Parse()函数的代码中能清晰地看到对RESP五种通信格式’-‘,’+’,’:’,’$’,’*’。因为要根据请求中的命令和key做路由,以及特殊处理(例如多参数命令),所以Codis不能简单地透传,而是解析协议获得所需的信息。注意parser.Parse()的用法,这里parser是包名不是一个对象实例,而Parse是parser包中的一个public函数。所以乍看之下有点困惑了,这也是Golang支持既像C一样面向过程编程,又有高级语言的面向对象甚至Duck Type的缘故。Parse()读取网络流,并递归处理整个请求。例如”GET ab”命令:*2\r\n$3\r\nGET\r\n$2\r\nab\r\n最终Parse()返回时得到:Resp{Raw: "*2\r\n", Multi{Resp{Raw: "$3\r\nGET\r\n"}, Resp{Raw: "$2\r\nab\r\n"}}}如果细致分析的话,readLine()中使用readSlice()读取缓冲区的切片,节约了内存。这种设计上的小细节还是很值得关注和学习的,毕竟“天下大事,必作于细”。funcgetRespOpKeys(c *session) (*parser.Resp, []byte, [][]byte, error) { resp, err := parser.Parse(c.r)// read client requestop, keys, err := resp.GetOpKeys() ...returnresp, op, keys,nil}typeRespstruct{ TypeintRaw []byteMulti []*Resp }funcParse(r *bufio.Reader) (*Resp, error) { line, err := readLine(r)iferr !=nil{returnnil, errors.Trace(err) } resp := &Resp;{}ifline[0] =='$'|| line[0] =='*'{ resp.Raw =make([]byte,0,len(line)+64) }else{ resp.Raw =make([]byte,0,len(line)) } resp.Raw =append(resp.Raw, line...)switchline[0] {case'-': resp.Type = ErrorRespreturnresp,nilcase'+': resp.Type = SimpleStringreturnresp,nilcase':': resp.Type = IntegerRespreturnresp,nilcase'$': resp.Type = BulkResp ...case'*': resp.Type = MultiResp ... }2.3.2 哈希映射mapper.gomapKey2Slot()处理HashTag,并使用CRC32计算哈希值。const( HASHTAG_START ='{'HASHTAG_END ='}')funcmapKey2Slot(key []byte)int{ hashKey := key//hash tag supporthtagStart := bytes.IndexByte(key, HASHTAG_START)ifhtagStart >=0{ htagEnd := bytes.IndexByte(key[htagStart:], HASHTAG_END)ifhtagEnd >=0{ hashKey = key[htagStart+1: htagStart+htagEnd] } }returnint(crc32.ChecksumIEEE(hashKey) % models.DEFAULT_SLOT_NUM) }2.4 请求分发router.go(2)NewServer()中执行go s.handleTopoEvent()启动goroutine,对Server数据结构中的reqCh和evtbus两个Channel进行事件监听处理。这里重点看拿到reqCh的事件后是如何dispatch()的。reqCh的事件也就是PipelineRequest,会经dispath()函数放入对应Slot的taskrunner的in管道中。也就是说,reqCh中的请求会被分发到各个Slot自己的Channel中。另外注意:此处会检查PipelineRequest对应Slot的状态,如果正在migrate,则暂时将请求缓冲到Server类的bufferedReq链表中。func(s *Server) handleTopoEvent() {for{select{// 1.处理Server.reqCh中事件caser := <-s.reqCh:// 1.1 如果正在migrate,则将请求r暂时缓冲起来ifs.slots[r.slotIdx].slotInfo.State.Status == models.SLOT_STATUS_PRE_MIGRATE { s.bufferedReq.PushBack(r)continue}// 1.2 处理缓冲中的请求efore := s.bufferedReq.Front(); e !=nil; { next := e.Next() s.dispatch(e.Value.(*PipelineRequest)) s.bufferedReq.Remove(e) e = next }// 1.3 处理当前请求rs.dispatch(r)// 2.处理Server.evtbus中请求casee := <-s.evtbus:switche.(type) {case*killEvent: s.handleMarkOffline() e.(*killEvent).done <-nildefault: evtPath := GetEventPath(e) ... s.processAction(e) } } } }func(s *Server) dispatch(r *PipelineRequest) { s.handleMigrateState(r.slotIdx, r.keys[0])// 1.查找Slot对应的taskrunnertr, ok := s.pipeConns[s.slots[r.slotIdx].dst.Master()]// 2.若没有,则新建一个taskrunnerif!ok {// 2.1 新建tr时出错,则向r.backQ放入一个空响应iferr := s.createTaskRunner(s.slots[r.slotIdx]); err !=nil{ r.backQ <- &PipelineResponse;{ctx: r, resp:nil, err: err}return}// 2.2 拿到taskrunnertr = s.pipeConns[s.slots[r.slotIdx].dst.Master()] }// 3.将请求r放入in管道tr.in <- r }taskrunner.go的createTaskRunner()调用NewTaskRunner()创建当前Slot对应的taskrunner。每个taskrunner都拥有一对in和out管道。之前的PipelineRequest就是放到in管道中。然后启动了两个goroutine,分别调用writeloop()和readloop()函数监听in和out管道,处理其中的请求。func(s *Server) createTaskRunner(slot *Slot) error { dst := slot.dst.Master()if_, ok := s.pipeConns[dst]; !ok { tr, err := NewTaskRunner(dst, s.conf.netTimeout)iferr !=nil{returnerrors.Errorf("create task runner failed, %v, %+v, %+v", err, slot.dst, slot.slotInfo) }else{ s.pipeConns[dst] = tr } }returnnil}funcNewTaskRunner(addrstring, netTimeoutint) (*taskRunner, error) {// 1.创建TaskRunner实例tr := &taskRunner;{ in:make(chaninterface{},1000), out:make(chaninterface{},1000), redisAddr: addr, tasks: list.New(), netTimeout: netTimeout, }// 2.创建Redis连接,并绑定到trc, err := redisconn.NewConnection(addr, netTimeout) tr.c = c// 3.开始监听读写管道in和outgotr.writeloop()gotr.readloop()returntr,nil}func(tr *taskRunner) writeloop() {varerr error tick := time.Tick(2* time.Second)for{ ...select{// 1.处理in管道中来自客户端的请求caset := <-tr.in: tr.processTask(t)// 2.处理out管道中来自Redis的响应caseresp := <-tr.out: err = tr.handleResponse(resp)// 设置select间隔case<-tick:iftr.tasks.Len() >0&∫(time.Since(tr.latest).Seconds()) > tr.netTimeout { tr.c.Close() } } } }2.5 请求发送taskrunner.go终于到了请求的生命周期的最后一个环节了!writeloop()会不断调用processTask()处理in管道中的请求,通过dowrite()函数发送到Redis服务端。当in管道中没有其他请求时,会强制刷新一下缓冲区。func(tr *taskRunner) processTask(tinterface{}) {varerr errorswitcht.(type) {case*PipelineRequest: r := t.(*PipelineRequest)varflushbooliflen(tr.in) ==0{//force flushflush =true} err = tr.handleTask(r, flush)case*sync.WaitGroup://close taskrunnererr = tr.handleTask(nil,true)//flush... } ... }func(tr *taskRunner) handleTask(r *PipelineRequest, flushbool) error {ifr ==nil&& flush {//just flushreturntr.c.Flush() }// 1.将请求保存到链表,接收到响应时再移除tr.tasks.PushBack(r) tr.latest = time.Now()// 2.发送请求到Redisreturnerrors.Trace(tr.dowrite(r, flush)) }typeRespstruct{ TypeintRaw []byteMulti []*Resp }func(tr *taskRunner) dowrite(r *PipelineRequest, flushbool) error {// 1.通过Bytes()函数取出Resp中的原始字节Rawb, err := r.req.Bytes() ...// 2.将原始请求发送到Redis服务端_, err = tr.c.Write(b) ...// 3.如果需要,强制刷新缓冲区ifflush {returnerrors.Trace(tr.c.Flush()) }returnnil}Codis使用Golang的bufio库处理底层的IO流读写操作。在NewConnection()中,用net包创建到Redis的Socket连接,并分别创建大小为512K的读写缓冲流。//not thread-safetypeConnstruct{ addrstringnet.Conn closedboolr *bufio.Reader w *bufio.Writer netTimeoutint//second}funcNewConnection(addrstring, netTimeoutint) (*Conn, error) {// 1.打开到Redis服务端的TCP连接conn, err := net.DialTimeout("tcp", addr, time.Duration(netTimeout)*time.Second) ...// 2.创建Conn实例,及读写缓冲区return&Conn;{ addr: addr, Conn: conn, r: bufio.NewReaderSize(conn,512*1024), w: bufio.NewWriterSize(deadline.NewDeadlineWriter(conn, time.Duration(netTimeout)*time.Second),512*1024), netTimeout: netTimeout, },nil}func(c *Conn) Flush() error {returnc.w.Flush() }func(c *Conn) Write(p []byte) (int, error) {returnc.w.Write(p) }2.6 返回响应session.go当writeloop()在“如火如荼”地向Redis发送请求时,readloop()也没有闲着。它不断地从Redis读取响应。发送每个请求时都不会等待Redis的响应,也就是说发送请求和读取响应完全是异步进行的,所以就充分利用了Pipeline的性能优势。func(tr *taskRunner) readloop() {for{// 1.从Redis连接中读取响应resp, err := parser.Parse(tr.c.BufioReader())iferr !=nil{ tr.out <- errreturn}// 2.将解析好的响应放入out管道中tr.out <- resp } }func(tr *taskRunner) handleResponse(einterface{}) error {switche.(type) { ...case*parser.Resp:// 1.取到out管道中的PipelineResponseresp := e.(*parser.Resp)// 2.取出对应的PipelineRequeste := tr.tasks.Front() req := e.Value.(*PipelineRequest)// 3.将响应放入到backQ管道中(req.backQ也就是session中的backQ)req.backQ <- &PipelineResponse;{ctx: req, resp: resp, err:nil}// 4.从任务列表中移除已拿到响应的请求tr.tasks.Remove(e)returnnil}returnnil}因为writeloop()不仅监视in管道,也监视out管道。所以writeloop()会将readloop()放入的响应交给handleResponse()处理。最终PipelineResponse被放入Session对象的backQ管道中。还记得它吗?在最开始NewServer时为当前客户端创建的Session实例。最后,接收到的PipleResponse会转成RESP协议的字节序列,发送回客户端。func(s *session) WritingLoop() { s.lastUnsentResponseSeq =1for{select{caseresp, ok := <-s.backQ:if!ok { s.Close() s.closeSignal.Done()return} flush, err := s.handleResponse(resp) ... } } }func(s *session) handleResponse(resp *PipelineResponse) (flushbool, err error) { ...if!s.closed {iferr := s.writeResp(resp); err !=nil{returnfalse, errors.Trace(err) } flush =true}return}func(s *session) writeResp(resp *PipelineResponse) error {// 1.取出Resp中的原始字节buf, err := resp.resp.Bytes()iferr !=nil{returnerrors.Trace(err) }// 2.写回到客户端_, err = s.Write(buf)returnerrors.Trace(err) }//write without bufiofunc(s *session) Write(p []byte) (int, error) {returns.w.Write(p) }2.7 Proxy源码流程总结最后以一张Proxy的流程图作结束。经过我们的分析能够看出,关于并发安全方面,Codis唯一需要并发控制的地方就是从reqCh分发到各个Slot的Channel,为了避免竞争,这一部分是由一个goroutine完成的。


沪ICP备19023445号-2号
友情链接