ETCD Database Source Code Analysis: Server-Side Interfaces of the Cluster Peer Network Layer

Reader submission, 2022-09-19


From the previous article, "ETCD Database Source Code Analysis: Cluster Communication Initialization", we know the following:

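- The peer-communication server calls configurePeerListeners to create one peerListener for each URL in the LPUrls field of Config. To initialize each peerListener struct, the function calls transport.NewListenerWithOpts to create the underlying net.Listener. configurePeerListeners does not set the serve function of the peerListener struct.
- The server then calls servePeers to start serving. servePeers starts a goroutine that runs cmux.New(p.Listener).Serve().
- servePeers first calls NewPeerHandler, which calls newPeerHandler(lg,s,s.RaftHandler()...). RaftHandler() returns s.r.transport.Handler(), and transport.Handler in turn returns a mux on which pipelineHandler, streamHandler and snapHandler are registered, so each request is dispatched to the matching handler.
- servePeers also starts a goroutine that serves the resulting handler ph through a call ending in .Serve(m.Match(cmux.Any())).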

In this article we look at the transport.NewListenerWithOpts function and at pipelineHandler, streamHandler and snapHandler.

The NewListenerWithOpts function

The Listener is initialized by the code below. transport.NewListenerWithOpts lives in client/pkg/transport/listener.go and is one of the functions that the transport package exports to outside callers.

    peers[i] = &peerListener{close: func(context.Context) error { return nil }}
    peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
        transport.WithTLSInfo(&cfg.PeerTLSInfo),
        transport.WithSocketOpts(&cfg.SocketOpts),
        transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
    )

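client/pkg/transport/listener.go exports two functions for other code to call. NewListener is the variant for callers that pass no options beyond the TLS info. Judging from the call chain, the two functions do not differ in any essential way: both end up calling the unexported newListener function in the same file.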

    // NewListener creates a new listener.
    func NewListener(addr, scheme string, tlsinfo *TLSInfo) (l net.Listener, err error) {
        return newListener(addr, scheme, WithTLSInfo(tlsinfo))
    }

    // NewListenerWithOpts creates a new listener which accepts listener options.
    func NewListenerWithOpts(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
        return newListener(addr, scheme, opts...)
    }
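The way both exported functions funnel into one inner function is the functional-options pattern. The following is a minimal sketch of that pattern only, not etcd's code: listenOpts, WithTLS, describe, Describe and DescribeWithOpts are hypothetical names, and the sketch returns a description string instead of opening a socket.

```go
package main

import (
	"fmt"
	"time"
)

// listenOpts is a hypothetical stand-in for the option set that an inner
// constructor assembles; it is not etcd's actual struct.
type listenOpts struct {
	readTimeout  time.Duration
	writeTimeout time.Duration
	tlsEnabled   bool
}

// ListenerOption mutates the option set (functional-options style).
type ListenerOption func(*listenOpts)

// WithTimeout records read and write timeouts.
func WithTimeout(read, write time.Duration) ListenerOption {
	return func(o *listenOpts) {
		o.readTimeout = read
		o.writeTimeout = write
	}
}

// WithTLS is a hypothetical option that toggles TLS.
func WithTLS(enabled bool) ListenerOption {
	return func(o *listenOpts) { o.tlsEnabled = enabled }
}

// describe is the single inner function that both exported wrappers call.
func describe(addr string, opts ...ListenerOption) string {
	o := &listenOpts{}
	for _, apply := range opts {
		apply(o)
	}
	return fmt.Sprintf("%s tls=%t read=%s write=%s", addr, o.tlsEnabled, o.readTimeout, o.writeTimeout)
}

// Describe is the fixed-signature wrapper: it converts its argument to an option.
func Describe(addr string, tls bool) string { return describe(addr, WithTLS(tls)) }

// DescribeWithOpts is the variadic wrapper: it forwards options unchanged.
func DescribeWithOpts(addr string, opts ...ListenerOption) string {
	return describe(addr, opts...)
}

func main() {
	fmt.Println(Describe("127.0.0.1:2380", true))
	fmt.Println(DescribeWithOpts("127.0.0.1:2380", WithTLS(true), WithTimeout(5*time.Second, 5*time.Second)))
}
```

The fixed-signature wrapper keeps older call sites compiling, while new options can be added without changing either signature.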

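The newListener function is fairly complex: depending on the scheme and the options, it calls different functions and produces different kinds of Listener. For the unix or unixs scheme it calls NewUnixListener (defined in client/pkg/transport/unix_listener.go). The listen options (code in client/pkg/transport/listener_opts.go) determine the listening configuration. rwTimeoutListener is defined in client/pkg/transport/timeout_listener.go, and NewTLSListener is defined in client/pkg/transport/listener_tls.go. Whatever the variant, every one of them is ultimately a net.Listener from Go's net package.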

    func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
        if scheme == "unix" || scheme == "unixs" {
            // unix sockets via unix://laddr
            return NewUnixListener(addr)
        }

        // the listen options are implemented in client/pkg/transport/listener_opts.go
        lnOpts := newListenOpts(opts...)

        switch {
        case lnOpts.IsSocketOpts():
            // new ListenConfig with socket options.
            config, err := newListenConfig(lnOpts.socketOpts)
            if err != nil {
                return nil, err
            }
            lnOpts.ListenConfig = config
            // check for timeout
            fallthrough
        case lnOpts.IsTimeout(), lnOpts.IsSocketOpts():
            // timeout listener with socket options.
            ln, err := lnOpts.ListenConfig.Listen(context.TODO(), "tcp", addr)
            if err != nil {
                return nil, err
            }
            lnOpts.Listener = &rwTimeoutListener{
                Listener:     ln,
                readTimeout:  lnOpts.readTimeout,
                writeTimeout: lnOpts.writeTimeout,
            }
        case lnOpts.IsTimeout():
            ln, err := net.Listen("tcp", addr)
            if err != nil {
                return nil, err
            }
            lnOpts.Listener = &rwTimeoutListener{
                Listener:     ln,
                readTimeout:  lnOpts.readTimeout,
                writeTimeout: lnOpts.writeTimeout,
            }
        default:
            ln, err := net.Listen("tcp", addr)
            if err != nil {
                return nil, err
            }
            lnOpts.Listener = ln
        }

        // only skip if not passing TLSInfo
        if lnOpts.skipTLSInfoCheck && !lnOpts.IsTLS() {
            return lnOpts.Listener, nil
        }
        return wrapTLS(scheme, lnOpts.tlsInfo, lnOpts.Listener)
    }

    func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
        if scheme != "https" && scheme != "unixs" {
            return l, nil
        }
        if tlsinfo != nil && tlsinfo.SkipClientSANVerify {
            return NewTLSListener(l, tlsinfo)
        }
        return newTLSListener(l, tlsinfo, checkSAN)
    }
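To make the idea of wrapping a net.Listener concrete, here is a minimal sketch of a listener that hands out connections with a read deadline. It only illustrates the wrapping technique. timeoutListener, timeoutConn and demoReadTimeout are hypothetical names, the sketch handles reads only, and it should not be read as the implementation of etcd's rwTimeoutListener.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// timeoutListener embeds a net.Listener and overrides Accept so that every
// accepted connection is wrapped (hypothetical, simplified type).
type timeoutListener struct {
	net.Listener
	readTimeout time.Duration
}

func (l *timeoutListener) Accept() (net.Conn, error) {
	c, err := l.Listener.Accept()
	if err != nil {
		return nil, err
	}
	return &timeoutConn{Conn: c, readTimeout: l.readTimeout}, nil
}

// timeoutConn refreshes the read deadline before each Read.
type timeoutConn struct {
	net.Conn
	readTimeout time.Duration
}

func (c *timeoutConn) Read(b []byte) (int, error) {
	if c.readTimeout > 0 {
		if err := c.Conn.SetReadDeadline(time.Now().Add(c.readTimeout)); err != nil {
			return 0, err
		}
	}
	return c.Conn.Read(b)
}

// demoReadTimeout connects a silent client over loopback and reports whether
// the server-side Read failed with a timeout error.
func demoReadTimeout() (bool, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, err
	}
	defer ln.Close()
	tl := &timeoutListener{Listener: ln, readTimeout: 50 * time.Millisecond}

	client, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return false, err
	}
	defer client.Close()

	conn, err := tl.Accept()
	if err != nil {
		return false, err
	}
	defer conn.Close()

	// The client never writes, so this Read can only end at the deadline.
	_, err = conn.Read(make([]byte, 1))
	ne, ok := err.(net.Error)
	return ok && ne.Timeout(), nil
}

func main() {
	timedOut, err := demoReadTimeout()
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	fmt.Println("read timed out:", timedOut)
}
```

Because the wrapper still satisfies net.Listener, it can be passed on to further wrappers (for example a TLS layer) or straight to an HTTP server.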

pipelineHandler, streamHandler and snapHandler

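As noted above, servePeers first calls NewPeerHandler, which calls newPeerHandler(lg,s,s.RaftHandler()...). RaftHandler() returns s.r.transport.Handler(), and transport.Handler returns a mux on which pipelineHandler, streamHandler and snapHandler are registered. Let us first look at the etcdserver.ServerPeerV2 interface. It embeds ServerPeer, ServerPeer embeds ServerV2, and ServerV2 in turn embeds Server. EtcdServer (server/etcdserver/server.go) implements the Server interface together with the handler methods required by the outer interfaces, so in the servePeers statement ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server) it is perfectly valid to pass an EtcdServer where the parameter type is etcdserver.ServerPeerV2.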

    type ServerPeerV2 interface {
        ServerPeer
        HashKVHandler() http.Handler
        DowngradeEnabledHandler() http.Handler
    }

    type ServerPeer interface {
        ServerV2
        RaftHandler() http.Handler
        LeaseHandler() http.Handler
    }

    type ServerV2 interface {
        Server
        Leader() types.ID
        // Do takes a V2 request and attempts to fulfill it, returning a Response.
        Do(ctx context.Context, r pb.Request) (Response, error)
        ClientCertAuthEnabled() bool
    }

    type Server interface {
        AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
        RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
        UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
        PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
        ClusterVersion() *semver.Version
        StorageVersion() *semver.Version
        Cluster() api.Cluster
        Alarms() []*pb.AlarmMember
        LeaderChangedNotify() <-chan struct{}
    }
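The embedding chain can be reproduced in miniature. In the sketch below every interface is reduced to a single hypothetical method (ClusterName, Leader, RaftPath, HashKVPath are illustrative names, not etcd methods). It shows that one concrete type satisfies the widest interface, and that a value of the widest interface can be passed where the narrowest one is expected, which is what happens when NewPeerHandler forwards its ServerPeerV2 argument to a function that accepts a Server.

```go
package main

import "fmt"

// Miniature, hypothetical versions of the four nested interfaces.
type Server interface{ ClusterName() string }

type ServerV2 interface {
	Server
	Leader() uint64
}

type ServerPeer interface {
	ServerV2
	RaftPath() string
}

type ServerPeerV2 interface {
	ServerPeer
	HashKVPath() string
}

// miniServer plays the role of the concrete server type.
type miniServer struct{}

func (*miniServer) ClusterName() string { return "demo" }
func (*miniServer) Leader() uint64      { return 1 }
func (*miniServer) RaftPath() string    { return "/raft" }
func (*miniServer) HashKVPath() string  { return "/members/hashkv" }

// Compile-time check: *miniServer satisfies the widest interface.
var _ ServerPeerV2 = (*miniServer)(nil)

// clusterOf only needs the narrowest interface.
func clusterOf(s Server) string { return s.ClusterName() }

// summary takes the widest interface and passes it on as a Server.
func summary(s ServerPeerV2) string {
	return fmt.Sprintf("%s %s %s", clusterOf(s), s.RaftPath(), s.HashKVPath())
}

func main() {
	fmt.Println(summary(&miniServer{}))
}
```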

NewPeerHandler calls newPeerHandler, which registers raftHandler, leaseHandler, hashKVHandler, downgradeEnabledHandler, versionHandler, peerMembersHandler and peerMemberPromoteHandler on the mux. The paths served by these handlers are listed below.

handler                     path
raftHandler                 /raft or /raft/
leaseHandler                /leases or /leases/internal
hashKVHandler               /members/hashkv
downgradeEnabledHandler     /downgrade/enabled
versionHandler              /version
peerMembersHandler          /members
peerMemberPromoteHandler    /members/promote/

    // NewPeerHandler generates an http.Handler to handle etcd peer requests.
    func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
        return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
    }

    func newPeerHandler(
        lg *zap.Logger,
        s etcdserver.Server,
        raftHandler http.Handler,
        leaseHandler http.Handler,
        hashKVHandler http.Handler,
        downgradeEnabledHandler http.Handler,
    ) http.Handler {
        if lg == nil {
            lg = zap.NewNop()
        }
        peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
        peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)

        mux := http.NewServeMux()
        mux.HandleFunc("/", http.NotFound)
        mux.Handle(rafthttp.RaftPrefix, raftHandler)
        mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
        mux.Handle(peerMembersPath, peerMembersHandler)
        mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
        if leaseHandler != nil {
            mux.Handle(leasehttp.LeasePrefix, leaseHandler)
            mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
        }
        if downgradeEnabledHandler != nil {
            mux.Handle(etcdserver.DowngradeEnabledPath, downgradeEnabledHandler)
        }
        if hashKVHandler != nil {
            mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
        }
        mux.HandleFunc(versionPath, versionHandler(s, serveVersion))
        return mux
    }
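The registration logic can be tried out with the standard library alone. The sketch below is an assumption-laden miniature: named, newDemoMux and route are hypothetical helpers, the handlers just write their own name, and only a few of the paths are registered. It shows why the Raft handler is registered twice (with and without the trailing slash) and what the nil guard on optional handlers achieves.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// named returns a handler that writes its own name (hypothetical helper).
func named(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, name)
	})
}

// newDemoMux mirrors the registration style: a catch-all 404, an exact path
// plus a subtree pattern for Raft, and an optional handler behind a nil check.
func newDemoMux(leaseHandler http.Handler) http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/", http.NotFound)
	mux.Handle("/raft", named("raft"))  // exact match only
	mux.Handle("/raft/", named("raft")) // everything below /raft/
	mux.Handle("/members", named("members"))
	if leaseHandler != nil {
		mux.Handle("/leases", leaseHandler)
	}
	return mux
}

// route sends a GET request through h and returns status code and body.
func route(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	withoutLease := newDemoMux(nil)
	withLease := newDemoMux(named("lease"))
	for _, p := range []string{"/raft", "/raft/stream/message/1", "/members", "/leases"} {
		code, body := route(withoutLease, p)
		fmt.Printf("no lease   %-24s %d %q\n", p, code, body)
	}
	code, body := route(withLease, "/leases")
	fmt.Printf("with lease %-24s %d %q\n", "/leases", code, body)
}
```

A pattern without a trailing slash matches that one path only, while a pattern ending in a slash matches the whole subtree, so both registrations are needed to cover /raft and everything beneath it.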

RaftHandler()

RaftHandler() is defined in server/etcdserver/server.go; it calls the transport's Handler function.

    func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }

The transport's Handler function is defined under server/etcdserver/api/raft…:

    func (t *Transport) Handler() http.Handler {
        pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
        streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
        snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
        mux := http.NewServeMux()
        mux.Handle(RaftPrefix, pipelineHandler)
        mux.Handle(RaftStreamPrefix+"/", streamHandler)
        mux.Handle(RaftSnapshotPrefix, snapHandler)
        mux.Handle(ProbingPrefix, probing.NewHandler())
        return mux
    }

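The paths served by these handlers are listed below. The first row is the outer registration in newPeerHandler; the remaining rows are the routes inside the mux returned by transport.Handler.

handler                 path
raftHandler             /raft or /raft/
pipelineHandler         /raft
streamHandler           /raft/stream/
snapHandler             /raft/snapshot
probing.NewHandler()    /raft/probing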
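The two levels of routing can be sketched with two nested ServeMux values. This is a minimal illustration under stated assumptions: the handlers only write their name, the literal paths are taken from the table above, and named, newInnerMux, newOuterMux and route are hypothetical helpers. No prefix stripping is involved, so the inner mux sees the full request path.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// named returns a handler that writes its own name (hypothetical helper).
func named(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, name)
	})
}

// newInnerMux plays the role of the mux returned by the transport.
func newInnerMux() http.Handler {
	mux := http.NewServeMux()
	mux.Handle("/raft", named("pipeline"))
	mux.Handle("/raft/stream/", named("stream"))
	mux.Handle("/raft/snapshot", named("snapshot"))
	mux.Handle("/raft/probing", named("probing"))
	return mux
}

// newOuterMux plays the role of the peer handler: it forwards /raft and the
// whole /raft/ subtree to the inner mux.
func newOuterMux() http.Handler {
	inner := newInnerMux()
	mux := http.NewServeMux()
	mux.HandleFunc("/", http.NotFound)
	mux.Handle("/raft", inner)
	mux.Handle("/raft/", inner)
	return mux
}

// route sends a GET request through h and returns status code and body.
func route(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	h := newOuterMux()
	for _, p := range []string{"/raft", "/raft/stream/msgappv2/1", "/raft/snapshot", "/raft/probing", "/raft/unknown"} {
		code, body := route(h, p)
		fmt.Printf("%-26s %d %q\n", p, code, body)
	}
}
```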

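LeaseHandler()

LeaseHandler() is defined in server/etcdserver/server.go. It returns nil when the server has no lessor; otherwise it builds the handler with leasehttp.NewHandler.

    func (s *EtcdServer) LeaseHandler() http.Handler {
        if s.lessor == nil {
            return nil
        }
        return leasehttp.NewHandler(s.lessor, s.ApplyWait)
    }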
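Returning a bare nil here matters because newPeerHandler decides whether to register the lease routes with a leaseHandler != nil check. The following sketch uses hypothetical names (demoHandler, plainNil, typedNil) to show the general Go behaviour behind that choice: an interface holding a nil pointer of a concrete type is itself not nil, so only the bare nil return makes such a guard work.

```go
package main

import (
	"fmt"
	"net/http"
)

// demoHandler is a hypothetical handler type used only for this illustration.
type demoHandler struct{}

func (*demoHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {}

// plainNil returns the untyped nil when the feature is disabled, in the same
// style as the early return in LeaseHandler.
func plainNil(enabled bool) http.Handler {
	if !enabled {
		return nil
	}
	return &demoHandler{}
}

// typedNil returns a nil *demoHandler wrapped in the interface; the resulting
// interface value carries a type and therefore compares unequal to nil.
func typedNil(enabled bool) http.Handler {
	var h *demoHandler
	if enabled {
		h = &demoHandler{}
	}
	return h
}

func main() {
	fmt.Println("plainNil(false) == nil:", plainNil(false) == nil)
	fmt.Println("typedNil(false) == nil:", typedNil(false) == nil)
}
```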

HashKVHandler()

HashKVHandler() is defined in server/etcdserver/corrupt.go; it returns a hashKVHandler struct.

    func (s *EtcdServer) HashKVHandler() http.Handler {
        return &hashKVHandler{lg: s.Logger(), server: s}
    }

    type hashKVHandler struct {
        lg     *zap.Logger
        server *EtcdServer
    }

DowngradeEnabledHandler()

DowngradeEnabledHandler() is defined in server/etcdserver/server.go; it returns a downgradeEnabledHandler struct.

    type downgradeEnabledHandler struct {
        lg      *zap.Logger
        cluster api.Cluster
        server  *EtcdServer
    }

    func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
        return &downgradeEnabledHandler{
            lg:      s.Logger(),
            cluster: s.cluster,
            server:  s,
        }
    }

versionHandler(s, serveVersion)

versionHandler() is defined under server/etcdserver/api/etcd…:

    func versionHandler(server etcdserver.Server, fn func(http.ResponseWriter, *http.Request, string, string)) http.HandlerFunc {
        return func(w http.ResponseWriter, r *http.Request) {
            clusterVersion := server.ClusterVersion()
            storageVersion := server.StorageVersion()
            clusterVersionStr, storageVersionStr := "not_decided", "unknown"
            if clusterVersion != nil {
                clusterVersionStr = clusterVersion.String()
            }
            if storageVersion != nil {
                storageVersionStr = storageVersion.String()
            }
            fn(w, r, clusterVersionStr, storageVersionStr)
        }
    }

    func serveVersion(w http.ResponseWriter, r *http.Request, clusterV, storageV string) {
        if !allowMethod(w, r, "GET") {
            return
        }
        vs := version.Versions{
            Server:  version.Version,
            Cluster: clusterV,
            Storage: storageV,
        }
        w.Header().Set("Content-Type", "application/json")
        b, err := json.Marshal(&vs)
        if err != nil {
            panic(fmt.Sprintf("cannot marshal versions to json (%v)", err))
        }
        w.Write(b)
    }
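The closure-based handler factory can be exercised in isolation. In the sketch below the version lookups are replaced by two hypothetical func() *string parameters, the versions struct and the fetch helper are illustrative, and the method check is written inline instead of using allowMethod, so it shows the shape of the pattern and is not etcd's code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// versions is a hypothetical, reduced response payload.
type versions struct {
	Cluster string `json:"cluster"`
	Storage string `json:"storage"`
}

// versionHandler resolves both versions on every request, substitutes
// defaults for nil values, and delegates the response writing to fn.
func versionHandler(cluster, storage func() *string, fn func(http.ResponseWriter, *http.Request, string, string)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		c, s := "not_decided", "unknown"
		if v := cluster(); v != nil {
			c = *v
		}
		if v := storage(); v != nil {
			s = *v
		}
		fn(w, r, c, s)
	}
}

// serveVersion rejects non-GET methods and writes the payload as JSON.
func serveVersion(w http.ResponseWriter, r *http.Request, c, s string) {
	if r.Method != http.MethodGet {
		w.Header().Set("Allow", http.MethodGet)
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}
	b, err := json.Marshal(versions{Cluster: c, Storage: s})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(b)
}

// fetch sends a request with the given method and returns status and body.
func fetch(h http.Handler, method string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, "/version", nil))
	return rec.Code, rec.Body.String()
}

func main() {
	decided := "3.5.0"
	h := versionHandler(
		func() *string { return &decided }, // cluster version is known
		func() *string { return nil },      // storage version is not
		serveVersion,
	)
	fmt.Println(fetch(h, "GET"))
	code, _ := fetch(h, "POST")
	fmt.Println(code)
}
```

Separating the lookup (versionHandler) from the rendering (serveVersion) lets the same lookup logic be reused with a different response writer.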

newPeerMembersHandler(lg, s.Cluster())

newPeerMembersHandler() is defined under server/etcdserver/api/etcd…:

    func newPeerMembersHandler(lg *zap.Logger, cluster api.Cluster) http.Handler {
        return &peerMembersHandler{
            lg:      lg,
            cluster: cluster,
        }
    }

    type peerMembersHandler struct {
        lg      *zap.Logger
        cluster api.Cluster
    }

newPeerMemberPromoteHandler(lg, s)

newPeerMemberPromoteHandler() is defined under server/etcdserver/api/etcd…:

    func newPeerMemberPromoteHandler(lg *zap.Logger, s etcdserver.Server) http.Handler {
        return &peerMemberPromoteHandler{
            lg:      lg,
            cluster: s.Cluster(),
            server:  s,
        }
    }

    type peerMemberPromoteHandler struct {
        lg      *zap.Logger
        cluster api.Cluster
        server  etcdserver.Server
    }
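The constructors above return structs as http.Handler values, which works once the struct has a ServeHTTP method. The text does not show those methods, so the following is only a sketch of the general shape, assuming a GET-only endpoint that returns a JSON list. memberLister, staticCluster, membersHandler and request are hypothetical names, and the behaviour is illustrative, not a description of etcd's handlers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// memberLister is a hypothetical narrow view of a cluster.
type memberLister interface{ Members() []string }

// staticCluster is a fixed member list used for the demonstration.
type staticCluster []string

func (c staticCluster) Members() []string { return c }

// membersHandler keeps its dependencies in struct fields and implements
// http.Handler through the ServeHTTP method.
type membersHandler struct {
	cluster memberLister
}

func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		w.Header().Set("Allow", http.MethodGet)
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}
	b, err := json.Marshal(h.cluster.Members())
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(b)
}

// request sends a request with the given method and returns status and body.
func request(h http.Handler, method string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, "/members", nil))
	return rec.Code, rec.Body.String()
}

func main() {
	h := &membersHandler{cluster: staticCluster{"infra1", "infra2"}}
	fmt.Println(request(h, "GET"))
	code, _ := request(h, "DELETE")
	fmt.Println(code)
}
```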
