cmux
cmux 是soheilhy实现的一个复用连接的Go库,可以在同一个TCP listener上面监听多种服务
源码分析
cmux 项目结构如下,整个代码量包含test代码总共2k行,核心代码几百行,很适合阅读
➜ cmux git:(master) ✗ tree -L 2
.
├── CONTRIBUTORS
├── LICENSE
├── README.md
├── bench_test.go
├── buffer.go
├── cmd
│ └── cmd.go
├── cmux.go
├── cmux_test.go
├── doc.go
├── example_recursive_test.go
├── example_test.go
├── example_tls_test.go
├── go.mod
├── go.sum
├── matchers.go
├── patricia.go
└── patricia_test.go
在看cmux实现之前,可以先从README中看怎么使用cmux
// Create the main listener.
l, err := net.Listen("tcp", ":23456")
if err != nil {
log.Fatal(err)
}
// Create a cmux.
m := cmux.New(l)
// Match connections in order:
// First grpc, then HTTP, and otherwise Go RPC/TCP.
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
httpL := m.Match(cmux.HTTP1Fast())
trpcL := m.Match(cmux.Any()) // Any means anything that is not yet matched.
// Create your protocol servers.
grpcS := grpc.NewServer()
grpchello.RegisterGreeterServer(grpcS, &server{})
httpS := &http.Server{
Handler: &helloHTTP1Handler{},
}
trpcS := rpc.NewServer()
trpcS.Register(&ExampleRPCRcvr{})
// Use the muxed listeners for your servers.
go grpcS.Serve(grpcL)
go httpS.Serve(httpL)
go trpcS.Accept(trpcL)
// Start serving!
m.Serve()
- 创建一个tcp listener, 然后根据这个listener实例化cmux
- 根据需求(grpc, http等)将实例化的m添加对应的match规则,得到对应的listener
- 对不同的需求,将得到的listener分别在不同的goroutine中运行启动服务的方法
- 在主线程中启动cmux来监听当前tcp listener过来的payload
cmux逻辑
首先分析创建, New
方法很简单,创建了实现CMux接口的cMux实例,将listener赋值到root中,然后返回CMux:
func New(l net.Listener) CMux {
return &cMux{
root: l,
bufLen: 1024,
errh: func(_ error) bool { return true },
donec: make(chan struct{}),
readTimeout: noTimeout,
}
}
CMux接口定义如下:
type CMux interface {
Match(...Matcher) net.Listener // 返回匹配当前Matcher的Listener,可以理解为向CMux注册一种Matcher,Matcher则是在复用的Listener中区分Conn的匹配器
MatchWithWriters(...MatchWriter) net.Listener // 返回匹配当前MatchWriter的Listener
Serve() error //CMux 启动方法,是一个阻塞的方法
HandleError(ErrorHandler) // 注册errorhander
SetReadTimeout(time.Duration) // 设置读取matchers的超时时间
}
type Matcher func(io.Reader) bool
接着分析cmux添加Matcher的逻辑,首先需要定义一些Matcher,cmux已经实现了一些Matcher, 具体在matchers.go中
比如http1的Matcher,则是解析io.Reader中报文第一行是否符合HTTP1的规则,如果匹配,返回true。
有了Matcher之后,向cmux中使用Match方法将其注册:
func (m *cMux) Match(matchers ...Matcher) net.Listener {
mws := matchersToMatchWriters(matchers)
return m.MatchWithWriters(mws...)
}
func (m *cMux) MatchWithWriters(matchers ...MatchWriter) net.Listener {
ml := muxListener{
Listener: m.root,
connc: make(chan net.Conn, m.bufLen),
}
m.sls = append(m.sls, matchersListener{ss: matchers, l: ml})
return ml
}
Match实现很简单,将Matcher封装成matchersListener后append到list中 matchersListener的作用是将匹配到的conn通过connc传递给外部调用Accept进行阻塞监听的具体的服务。
再然后,分析Serve
逻辑:
func (m *cMux)Serve() error{
var wg sync.WaitGroup
...
for {
c, err := m.root.Accept()
if err != nil {
if !m.handleErr(err) {
return err
}
continue
}
wg.Add(1)
go m.serve(c, m.donec, &wg)
}
}
Serve中,在死循环中接收root过来的新的net.Conn,接收之后在一个新的goroutine中执行match和dispatch逻辑,具体如下:
func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
muc := newMuxConn(c) //(1)
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Now().Add(m.readTimeout))
}
for _, sl := range m.sls {
for _, s := range sl.ss {
matched := s(muc.Conn, muc.startSniffing()) //(2)
if matched {
muc.doneSniffing() //(3)
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Time{})
}
select {
case sl.l.connc <- muc: //(4)
case <-donec:
_ = c.Close()
}
return
}
}
}
_ = c.Close()
err := ErrNotMatched{c: c}
if !m.handleErr(err) {
_ = m.root.Close()
}
}
(1) serve将新过来的conn包裹成MuxConn,MuxConn的作用使用bufferReader结构进行探测conn的payload,然后传递给Matcher进行match
(2) 针对每一个注册进来的Matcher调用匹配方法进行匹配, 匹配的io.Reader是muc的bufferedReader,在Read之前首先调用了reset方法,将bufferedReader的sniffing置为true表示重新开始从bufferedReader中读取数据,Read方法如下:
type bufferedReader struct {
source io.Reader
buffer bytes.Buffer
bufferRead int
bufferSize int
sniffing bool
lastErr error
}
func (s *bufferedReader) Read(p []byte) (int, error) {
if s.bufferSize > s.bufferRead {
// If we have already read something from the buffer before, we return the
// same data and the last error if any. We need to immediately return,
// otherwise we may block for ever, if we try to be smart and call
// source.Read() seeking a little bit of more data.
bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize])
s.bufferRead += bn
return bn, s.lastErr
} else if !s.sniffing && s.buffer.Cap() != 0 {
// We don't need the buffer anymore.
// Reset it to release the internal slice.
s.buffer = bytes.Buffer{}
}
// If there is nothing more to return in the sniffed buffer, read from the
// source.
sn, sErr := s.source.Read(p)
if sn > 0 && s.sniffing {
s.lastErr = sErr
if wn, wErr := s.buffer.Write(p[:sn]); wErr != nil {
return wn, wErr
}
}
return sn, sErr
}
func (s *bufferedReader) reset(snif bool) {
s.sniffing = snif
s.bufferRead = 0
s.bufferSize = s.buffer.Len()
}
Read方法中首先判断当前已经读取的数量和自己缓存数据的数量,如果有并且缓存数据大于读取的数据,将buffer中的数据返回;反之从net.Conn中读取
(3) 回到serve方法,当完成match之后,将bufferedReader的sniffing置为false
(4) 将MuxConn通过listener的connc传递出去,继续等待着新的conn到来
需要注意的是,一个conn只能被匹配一次,匹配的顺序和Matcher存入list的顺序一致。
TIP
可以将bufferedReader放置在sync.Pool中