November 16, 2019

cmux 源码分析

cmux

cmuxsoheilhy实现的一个复用连接的Go库,可以在同一个TCP listener上面监听多种服务

源码分析

cmux 项目结构如下,整个代码量包含test代码总共2k行,核心代码几百行,很适合阅读

➜  cmux git:(master) ✗ tree -L 2
.
├── CONTRIBUTORS
├── LICENSE
├── README.md
├── bench_test.go
├── buffer.go
├── cmd
│   └── cmd.go
├── cmux.go
├── cmux_test.go
├── doc.go
├── example_recursive_test.go
├── example_test.go
├── example_tls_test.go
├── go.mod
├── go.sum
├── matchers.go
├── patricia.go
└── patricia_test.go

在看cmux实现之前,可以先从README中看怎么使用cmux

// Create the main listener.
l, err := net.Listen("tcp", ":23456")
if err != nil {
    log.Fatal(err)
}

// Create a cmux.
m := cmux.New(l)

// Match connections in order:
// First grpc, then HTTP, and otherwise Go RPC/TCP.
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
httpL := m.Match(cmux.HTTP1Fast())
trpcL := m.Match(cmux.Any()) // Any means anything that is not yet matched.

// Create your protocol servers.
grpcS := grpc.NewServer()
grpchello.RegisterGreeterServer(grpcS, &server{})

httpS := &http.Server{
    Handler: &helloHTTP1Handler{},
}

trpcS := rpc.NewServer()
trpcS.Register(&ExampleRPCRcvr{})

// Use the muxed listeners for your servers.
go grpcS.Serve(grpcL)
go httpS.Serve(httpL)
go trpcS.Accept(trpcL)

// Start serving!
m.Serve()

  1. 创建一个tcp listener, 然后根据这个listener实例化cmux
  2. 根据需求(grpc, http等)将实例化的m添加对应的match规则,得到对应的listener
  3. 对不同的需求,将得到的listener分别在不同的goroutine中运行启动服务的方法
  4. 在主线程中启动cmux来监听当前tcp listener过来的payload

cmux逻辑

首先分析创建, New方法很简单,创建了实现CMux接口的cMux实例,将listener赋值到root中,然后返回CMux:

func New(l net.Listener) CMux {
    return &cMux{
        root:        l,
        bufLen:      1024,
        errh:        func(_ error) bool { return true },
        donec:       make(chan struct{}),
        readTimeout: noTimeout,
    }
}

CMux接口定义如下:

type CMux interface {
    Match(...Matcher) net.Listener // 返回匹配当前Matcher的Listener,可以理解为向CMux注册一种Matcher,Matcher则是在复用的Listener中区分Conn的匹配器
    MatchWithWriters(...MatchWriter) net.Listener // 返回匹配当前MatchWriter的Listener
    Serve() error //CMux 启动方法,是一个阻塞的方法 
    HandleError(ErrorHandler) // 注册errorhander
    SetReadTimeout(time.Duration) // 设置读取matchers的超时时间
}

type Matcher func(io.Reader) bool

接着分析cmux添加Matcher的逻辑,首先需要定义一些Matcher,cmux已经实现了一些Matcher, 具体在matchers.go中 比如http1的Matcher,则是解析io.Reader中报文第一行是否符合HTTP1的规则,如果匹配,返回true。
有了Matcher之后,向cmux中使用Match方法将其注册:

func (m *cMux) Match(matchers ...Matcher) net.Listener {
    mws := matchersToMatchWriters(matchers)
    return m.MatchWithWriters(mws...)
}

func (m *cMux) MatchWithWriters(matchers ...MatchWriter) net.Listener {
    ml := muxListener{
        Listener: m.root,
        connc:    make(chan net.Conn, m.bufLen),
    }
    m.sls = append(m.sls, matchersListener{ss: matchers, l: ml})
    return ml
}

Match实现很简单,将Matcher封装成matchersListener后append到list中 matchersListener的作用是将匹配到的conn通过connc传递给外部调用Accept进行阻塞监听的具体的服务。

再然后,分析Serve逻辑:

func (m *cMux)Serve() error{
    var wg sync.WaitGroup

    ...

    for {
        c, err := m.root.Accept()
        if err != nil {
            if !m.handleErr(err) {
                return err
            }
            continue
        }
        wg.Add(1)
        go m.serve(c, m.donec, &wg)
    }
}

Serve中,在死循环中接收root过来的新的net.Conn,接收之后在一个新的goroutine中执行match和dispatch逻辑,具体如下:

func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()

    muc := newMuxConn(c)      //(1)
    if m.readTimeout > noTimeout {
        _ = c.SetReadDeadline(time.Now().Add(m.readTimeout))
    }
    for _, sl := range m.sls {
        for _, s := range sl.ss {
            matched := s(muc.Conn, muc.startSniffing())  //(2)
            if matched {
                muc.doneSniffing()                //(3)
                if m.readTimeout > noTimeout {
                    _ = c.SetReadDeadline(time.Time{})
                }
                select {
                case sl.l.connc <- muc: //(4)
                case <-donec:
                    _ = c.Close()
                }
                return
            }
        }
    }

    _ = c.Close()
    err := ErrNotMatched{c: c}
    if !m.handleErr(err) {
        _ = m.root.Close()
    }
}

(1) serve将新过来的conn包裹成MuxConn,MuxConn的作用使用bufferReader结构进行探测conn的payload,然后传递给Matcher进行match
(2) 针对每一个注册进来的Matcher调用匹配方法进行匹配, 匹配的io.Reader是muc的bufferedReader,在Read之前首先调用了reset方法,将bufferedReader的sniffing置为true表示重新开始从bufferedReader中读取数据,Read方法如下:

type bufferedReader struct {
    source     io.Reader
    buffer     bytes.Buffer
    bufferRead int
    bufferSize int
    sniffing   bool
    lastErr    error
}

func (s *bufferedReader) Read(p []byte) (int, error) {
    if s.bufferSize > s.bufferRead {
        // If we have already read something from the buffer before, we return the
        // same data and the last error if any. We need to immediately return,
        // otherwise we may block for ever, if we try to be smart and call
        // source.Read() seeking a little bit of more data.
        bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize])
        s.bufferRead += bn
        return bn, s.lastErr
    } else if !s.sniffing && s.buffer.Cap() != 0 {
        // We don't need the buffer anymore.
        // Reset it to release the internal slice.
        s.buffer = bytes.Buffer{}
    }

    // If there is nothing more to return in the sniffed buffer, read from the
    // source.
    sn, sErr := s.source.Read(p)
    if sn > 0 && s.sniffing {
        s.lastErr = sErr
        if wn, wErr := s.buffer.Write(p[:sn]); wErr != nil {
            return wn, wErr
        }
    }
    return sn, sErr
}

func (s *bufferedReader) reset(snif bool) {
    s.sniffing = snif
    s.bufferRead = 0
    s.bufferSize = s.buffer.Len()
}

Read方法中首先判断当前已经读取的数量和自己缓存数据的数量,如果有并且缓存数据大于读取的数据,将buffer中的数据返回;反之从net.Conn中读取

(3) 回到serve方法,当完成match之后,将bufferedReader的sniffing置为false
(4) 将MuxConn通过listener的connc传递出去,继续等待着新的conn到来
需要注意的是,一个conn只能被匹配一次,匹配的顺序和Matcher存入list的顺序一致。

TIP

可以将bufferedReader放置在sync.Pool中

© Wade Van 2019

Powered by Hugo & Kiss.