共计 13651 个字符,预计需要花费 35 分钟才能阅读完成。
作者
李腾飞,腾讯容器技术研发工程师,腾讯云 TKE 后盾研发,SuperEdge 外围开发成员。
杜杨浩,腾讯云高级工程师,热衷于开源、容器和 Kubernetes。目前次要从事镜像仓库,Kubernetes 集群高可用 & 备份还原,以及边缘计算相干研发工作。
SuperEdge 介绍
SuperEdge 是 Kubernetes 原生的边缘容器计划,它将 Kubernetes 弱小的容器治理能力扩大到边缘计算场景中,针对边缘计算场景中常见的技术挑战提供了解决方案,如:单集群节点跨地区、云边网络不牢靠、边缘节点位于 NAT 网络等。这些能力能够让利用很容易地部署到边缘计算节点上,并且牢靠地运行,能够帮忙您很不便地把散布在各处的计算资源放到一个 Kubernetes 集群中治理,包含但不限于:边缘云计算资源、公有云资源、现场设施,打造属于您的边缘 PaaS 平台。SuperEdge 反对所有 Kubernetes 资源类型、API 接口、应用形式、运维工具,无额定的学习老本,也兼容其余云原生我的项目,如:Promethues,使用者能够联合其余所需的云原生我的项目一起应用。我的项目由以下公司独特发动:腾讯、Intel、VMware、虎牙直播、寒武纪、首都在线和美团。
云边隧道的架构与原理
在边缘场景中,很多时候都是单向网络,即只有边缘节点能被动拜访云端。云边隧道次要用于代理云端拜访边缘节点组件的申请,解决云端无奈间接拜访边缘节点的问题。
架构图如下所示:
实现原理为:
- 边缘节点上 tunnel-edge 被动连贯 tunnel-cloud service,tunnel-cloud service 依据负载平衡策略将申请转到 tunnel-cloud pod
- tunnel-edge 与 tunnel-cloud 建设 gRPC 连贯后,tunnel-cloud 会把本身的 podIp 和 tunnel-edge 所在节点的 nodeName 的映射写入 tunnel-dns。gRPC 连贯断开之后,tunnel-cloud 会删除相干 podIp 和节点名的映射
而整个申请的代理转发流程如下:
- apiserver 或者其它云端的利用拜访边缘节点上的 kubelet 或者其它利用时,tunnel-dns 通过 DNS 劫持 (将 HTTP Request 中的 host 中的节点名解析为 tunnel-cloud 的 podIp) 把申请转发到 tunnel-cloud 的 pod 上
- tunnel-cloud 依据节点名把申请信息转发到节点名对应的与 tunnel-edge 建设的 gRPC 连贯上
- tunnel-edge 依据接管的申请信息申请边缘节点上的利用
Tunnel 外部模块数据交互
在介绍完 Tunnel 的配置后,上面介绍 Tunnel 的外部数据流转:
上图标记出了 HTTPS 代理的数据流转,TCP 代理数据流转和 HTTPS 的相似,其中的关键步骤:
- HTTPS Server -> StreamServer:HTTPS Server 通过 Channel 将 StreamMsg 发送给 Stream Server,其中的 Channel 是依据 StreamMsg.Node 字段从 nodeContext 获取 node.Channel
- StreamServer -> StreamClient: 每个云边隧道都会调配一个 node 对象,将 StreamClient 发送到 node 中的 Channel 即可把数据发往 StreamClient
- StreamServer -> HTTPS Server: StreamServer 通过 Channel 将 StreamMsg 发送给 HTTPS Server,其中的 Channel 是依据 StreamMsg.Node 从 nodeContext 获取 node,通过 StreamMsg.Topic 与 conn.uid 匹配获取 HTTPS 模块的 conn.Channel
nodeContext 和 connContext 都是做连贯的治理,然而 nodeContext 治理 gRPC 长连贯的和 connContext 治理的下层转发申请的连贯 (TCP 和 HTTPS) 的生命周期是不雷同的,因而须要离开治理
Tunnel 的连贯治理
Tunnel 治理的连贯能够分为底层连贯 (云端隧道的 gRPC 连贯) 和下层利用连贯(HTTPS 连贯和 TCP 连贯),连贯异样的治理的能够分为以下几种场景:
gRPC 连贯失常,下层连贯异样
以 HTTPS 连贯为例,tunnel-edge 的 HTTPS Client 与边缘节点 Server 连贯异样断开,会发送 StreamMsg (StreamMsg.Type=CLOSE) 音讯,tunnel-cloud 在接管到 StreamMsg 音讯之后会被动敞开 HTTPS Server 与 HTTPS Client 的连贯。
gRPC 连贯异样
gRPC 连贯异样,Stream 模块会依据与 gPRC 连贯绑定的 node.connContext,向 HTTPS 和 TCP 模块发送 StreamMsg(StreamMsg.Type=CLOSE),HTTPS 或 TCP 模块接管音讯之后被动断开连接。
Stream (gRPC 云边隧道)
func (stream *Stream) Start(mode string) {context.GetContext().RegisterHandler(util.STREAM_HEART_BEAT, util.STREAM, streammsg.HeartbeatHandler)
if mode == util.CLOUD {
...
// 启动 gRPC server
go connect.StartServer()
...
// 同步 coredns 的 hosts 插件的配置文件
go connect.SynCorefile()} else {
// 启动 gRPC client
go connect.StartSendClient()
...
}
...
}
tunnel-cloud 首先调用 RegisterHandler 注册心跳音讯处理函数 HeartbeatHandler
SynCorefile 执行同步 tunnel-coredns 的 hosts 插件的配置文件,每隔一分钟 (思考到 configmap 同步 tunnel-cloud 的 pod 挂载文件的工夫) 执行一次 checkHosts,如下:
func SynCorefile() {
for {
...
err := coreDns.checkHosts()
...
time.Sleep(60 * time.Second)
}
}
而 checkHosts 负责 configmap 具体的刷新操作:
func (dns *CoreDns) checkHosts() error {nodes, flag := parseHosts()
if !flag {return nil}
...
_, err = dns.ClientSet.CoreV1().ConfigMaps(dns.Namespace).Update(cctx.TODO(), cm, metav1.UpdateOptions{})
...
}
checkHosts 首先调用 parseHosts 获取本地 hosts 文件中边缘节点名称以及对应 tunnel-cloud podIp 映射列表,比照 podIp 的对应节点名和内存中节点名,如果有变动则将这个内容笼罩写入 configmap 并更新:
另外,这里 tunnel-cloud 引入 configmap 本地挂载文件的目标是:优化托管模式下泛滥集群同时同步 tunnel-coredns 时的性能
tunnel-edge 首先调用 StartClient 与 tunnel-edge 建设 gRPC 连贯,返回 grpc.ClientConn
func StartClient() (*grpc.ClientConn, ctx.Context, ctx.CancelFunc, error) {
...
opts := []grpc.DialOption{grpc.WithKeepaliveParams(kacp),
grpc.WithStreamInterceptor(ClientStreamInterceptor),
grpc.WithTransportCredentials(creds)}
conn, err := grpc.Dial(conf.TunnelConf.TunnlMode.EDGE.StreamEdge.Client.ServerName, opts...)
...
}
在调用 grpc.Dial 时会传递grpc.WithStreamInterceptor(ClientStreamInterceptor)
DialOption,将 ClientStreamInterceptor 作为 StreamClientInterceptor 传递给 grpc.ClientConn, 期待 gRPC 连贯状态变为 Ready,而后执行 Send 函数。streamClient.TunnelStreaming 调用 StreamClientInterceptor 返回 wrappedClientStream 对象
func ClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
...
opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{AccessToken: clientToken})))
...
return newClientWrappedStream(s), nil
}
ClientStreamInterceptor 会将边缘节点名称以及 token 结构成 oauth2.Token.AccessToken 进行认证传递,并构建 wrappedClientStream
stream.Send 会并发调用 wrappedClientStream.SendMsg 以及 wrappedClientStream.RecvMsg 别离用于 tunnel-edge 发送以及承受,并阻塞期待
留神:tunnel-edge 向 tunnel-cloud 注册节点信息是在建设 gRPC Stream 时,而不是创立 grpc.connClient 的时候
整个过程如下图所示:
相应的,在初始化 tunnel-cloud 时,会将 grpc.StreamInterceptor(ServerStreamInterceptor)
构建成 gRPC ServerOption,并将 ServerStreamInterceptor 作为 StreamServerInterceptor 传递给 grpc.Server:
func StartServer() {
...
opts := []grpc.ServerOption{grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.StreamInterceptor(ServerStreamInterceptor), grpc.Creds(creds)}
s := grpc.NewServer(opts...)
proto.RegisterStreamServer(s, &stream.Server{})
...
}
云端 gRPC 服务在承受到 tunnel-edge 申请 (建设 Stream 流) 时,会调用 ServerStreamInterceptor,而 ServerStreamInterceptor 会从 gRPC metadata 中解析出此 gRPC 连贯对应的边缘节点名和 token,并对该 token 进行校验,而后依据节点名构建 wrappedServerStream 作为与该边缘节点通信的解决对象(每个边缘节点对应一个解决对象),handler 函数会调用 stream.TunnelStreaming,并将 wrappedServerStream 传递给它(wrappedServerStream 实现了 proto.Stream_TunnelStreamingServer 接口)
func ServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {md, ok := metadata.FromIncomingContext(ss.Context())
...
tk := strings.TrimPrefix(md["authorization"][0], "Bearer")
auth, err := token.ParseToken(tk)
...
if auth.Token != token.GetTokenFromCache(auth.NodeName) {klog.Errorf("invalid token node = %s", auth.NodeName)
return ErrInvalidToken
}
err = handler(srv, newServerWrappedStream(ss, auth.NodeName))
if err != nil {ctx.GetContext().RemoveNode(auth.NodeName)
klog.Errorf("node disconnected node = %s err = %v", auth.NodeName, err)
}
return err
}
而当 TunnelStreaming 办法退出时,就会执 ServerStreamInterceptor 移除节点的逻辑ctx.GetContext().RemoveNode
TunnelStreaming 会并发调用 wrappedServerStream.SendMsg 以及 wrappedServerStream.RecvMsg 别离用于 tunnel-cloud 发送以及承受,并阻塞期待:
func (s *Server) TunnelStreaming(stream proto.Stream_TunnelStreamingServer) error {errChan := make(chan error, 2)
go func(sendStream proto.Stream_TunnelStreamingServer, sendChan chan error) {sendErr := sendStream.SendMsg(nil)
...
sendChan <- sendErr
}(stream, errChan)
go func(recvStream proto.Stream_TunnelStreamingServer, recvChan chan error) {recvErr := stream.RecvMsg(nil)
...
recvChan <- recvErr
}(stream, errChan)
e := <-errChan
return e
}
SendMsg 会从 wrappedServerStream 对应边缘节点 node 中承受 StreamMsg,并调用 ServerStream.SendMsg 发送该音讯给 tunnel-edge
func (w *wrappedServerStream) SendMsg(m interface{}) error {if m != nil { return w.ServerStream.SendMsg(m) } node := ctx.GetContext().AddNode(w.node) ... for {msg := <-node.NodeRecv() ... err := w.ServerStream.SendMsg(msg) ... }}
而 RecvMsg 会一直承受来自 tunnel-edge 的 StreamMsg,并调用 StreamMsg. 对应的处理函数进行操作
小结:
- Stream 模块负责建设 gRPC连贯以及通信(云边隧道)
- 边缘节点上 tunnel-edge 被动连贯云端 tunnel-cloud service,tunnel-cloud service 依据负载平衡策略将申请转到tunnel-cloud pod
- tunnel-edge 与 tunnel-cloud 建设 gRPC 连贯后,tunnel-cloud 会把本身的 podIp 和 tunnel-edge 所在节点的 nodeName 的映射写入tunnel-coredns。gRPC 连贯断开之后,tunnel-cloud 会删除相干 podIp 和节点名的映射
- tunnel-edge 会利用边缘节点名以及 token 构建 gRPC 连贯,而 tunnel-cloud 会通过认证信息解析 gRPC 连贯对应的边缘节点,并对每个边缘节点别离构建一个 wrappedServerStream 进行解决(同一个 tunnel-cloud 能够解决多个 tunnel-edge 的连贯)
- tunnel-cloud 每隔一分钟 (思考到 configmap 同步 tunnel-cloud 的 pod 挂载文件的工夫) 向 tunnel-coredns 的 hosts 插件的配置文件对应 configmap 同步一次边缘节点名以及 tunnel-cloud podIp 的映射;另外,引入 configmap 本地挂载文件优化了托管模式下泛滥集群同时同步 tunnel-coredns 时的性能
- tunnel-edge 每隔一分钟会向 tunnel-cloud 发送代表该节点失常的心跳 StreamMsg,而 tunnel-cloud 在承受到该心跳后会进行回应(心跳是为了探测 gRPC Stream 流是否失常)
- StreamMsg 包含心跳,TCP 代理以及 HTTPS 申请等不同类型音讯;同时 tunnel-cloud 通过 context.node 辨别与不同边缘节点 gRPC 连贯隧道
HTTPS 代理
HTTPS 模块负责建设云边的 HTTPS 代理,将云端组件 (例如:kube-apiserver) 的 https 申请转发给边端服务(例如:kubelet)
func (https *Https) Start(mode string) {context.GetContext().RegisterHandler(util.CONNECTING, util.HTTPS, httpsmsg.ConnectingHandler) context.GetContext().RegisterHandler(util.CONNECTED, util.HTTPS, httpsmsg.ConnectedAndTransmission) context.GetContext().RegisterHandler(util.CLOSED, util.HTTPS, httpsmsg.ConnectedAndTransmission) context.GetContext().RegisterHandler(util.TRANSNMISSION, util.HTTPS, httpsmsg.ConnectedAndTransmission) if mode == util.CLOUD {go httpsmng.StartServer() }}
Start 函数首先注册了 StreamMsg 的处理函数,其中 CLOSED 处理函数次要解决敞开连贯的音讯,并启动 HTTPS Server。
当云端组件向 tunnel-cloud 发送 HTTPS 申请时,serverHandler 会首先从 request.Host 字段解析节点名,若先建设 TLS 连贯,而后在连贯中写入 HTTP 的 request 对象,此时的 request.Host 能够不设置,则须要从 request.TLS.ServerName 解析节点名。HTTPS Server 读取 request.Body 以及 request.Header 构建 HttpsMsg 构造体,并序列化后封装成 StreamMsg,通过 Send2Node 发送 StreamMsg 放入 StreamMsg.node 对应的 node 的 Channel 中,由 Stream 模块发送到 tunnel-edge
func (serverHandler *ServerHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {var nodeName string nodeinfo := strings.Split(request.Host, ":") if context.GetContext().NodeIsExist(nodeinfo[0]) {nodeName = nodeinfo[0] } else {nodeName = request.TLS.ServerName} ... node.Send2Node(StreamMsg)}
tunnel-edge 承受到 StreamMsg,并调用 ConnectingHandler 函数进行解决:
func ConnectingHandler(msg *proto.StreamMsg) error {go httpsmng.Request(msg) return nil}func Request(msg *proto.StreamMsg) {httpConn, err := getHttpConn(msg) ... rawResponse := bytes.NewBuffer(make([]byte, 0, util.MaxResponseSize)) rawResponse.Reset() respReader := bufio.NewReader(io.TeeReader(httpConn, rawResponse)) resp, err := http.ReadResponse(respReader, nil) ... node.BindNode(msg.Topic) ... if resp.StatusCode != http.StatusSwitchingProtocols {handleClientHttp(resp, rawResponse, httpConn, msg, node, conn) } else {handleClientSwitchingProtocols(httpConn, rawResponse, msg, node, conn) }}
ConnectingHandler 会调用 Request 对该 StreamMsg 进行解决。Reqeust 首先通过 getHttpConn 与边缘节点 Server 建设的 TLS 连贯。解析 TLS 连贯中返回的数据获取 HTTP Response,Status Code 为 200,将 Response 的内容发送到 tunnel-cloud,Status Code 为 101,将从TLS 连贯读取 Response 的二进制数据发送到 tunnel-cloud,其中 StreamMsg.Type 为 CONNECTED。
tunnel-cloud 在承受到该 StreamMsg 后,会调用 ConnectedAndTransmission 进行解决:
func ConnectedAndTransmission(msg *proto.StreamMsg) error {conn := context.GetContext().GetConn(msg.Topic) ... conn.Send2Conn(msg) return nil}
通过 msg.Topic(conn uid) 获取 conn,并通过 Send2Conn 将音讯塞到该 conn 对应的管道中
云端 HTTPS Server 在承受到云端的 CONNECTED 音讯之后,认为HTTPS 代理胜利建设。并继续执行 handleClientHttp or handleClientSwitchingProtocols 进行数据传输,这里只剖析 handleClientHttp 非协定晋升下的数据传输过程,HTTPS Client 端的解决逻辑如下:
func handleClientHttp(resp *http.Response, rawResponse *bytes.Buffer, httpConn net.Conn, msg *proto.StreamMsg, node context.Node, conn context.Conn) {... go func(read chan *proto.StreamMsg, response *http.Response, buf *bytes.Buffer, stopRead chan struct{}) {rrunning := true for rrunning { bbody := make([]byte, util.MaxResponseSize) n, err := response.Body.Read(bbody) respMsg := &proto.StreamMsg{Node: msg.Node, Category: msg.Category, Type: util.CONNECTED, Topic: msg.Topic, Data: bbody[:n], } ... read <- respMsg } ... }(readCh, resp, rawResponse, stop) running := true for running {select { case cloudMsg := <-conn.ConnRecv(): ... case respMsg := <-readCh: ... node.Send2Node(respMsg) ... } } ...}
这里 handleClientHttp 会始终尝试读取来自边端组件的数据包,并构建成 TRANSNMISSION 类型的 StreamMsg 发送给 tunnel-cloud,tunnel-cloud 在承受到 StreamMsg 后调用 ConnectedAndTransmission 函数,将 StreamMsg 放入 StreamMsg.Type 对应的 HTTPS 模块的 conn.Channel 中
func handleServerHttp(rmsg *HttpsMsg, writer http.ResponseWriter, request *http.Request, node context.Node, conn context.Conn) {for k, v := range rmsg.Header { writer.Header().Add(k, v) } flusher, ok := writer.(http.Flusher) if ok {running := true for running { select { case <-request.Context().Done(): ... case msg := <-conn.ConnRecv(): ... _, err := writer.Write(msg.Data) flusher.Flush() ...} } ...}
handleServerHttp 在承受到 StreamMsg 后,会将 msg.Data,也即边端组件的数据包,发送给云端组件。整个数据流是单向的由边端向云端传送,如下所示:
而对于相似 kubectl exec
的申请,数据流是双向的,此时边端组件 (kubelet) 会返回 StatusCode 为 101 的回包,标示协定晋升,之后 tunnel-cloud 以及 tunnel-edge 会别离切到 handleServerSwitchingProtocols 以及 handleClientSwitchingProtocols 对 HTTPS 底层连贯进行读取和写入,实现数据流的双向传输。
架构如下所示:
总结 HTTPS 模块如下:
小结
- HTTPS:负责建设云边 HTTPS 代理(eg:云端 kube-apiserver <-> 边端 kubelet),并传输数据
- 作用与 TCP 代理相似,不同的是 tunnel-cloud 会读取云端组件 HTTPS 申请中携带的边缘节点名,并尝试建设与该边缘节点的 HTTPS 代理;而不是像 TCP 代理一样随机抉择一个云边隧道进行转发
- 云端 apiserver 或者其它云端的利用拜访边缘节点上的 kubelet 或者其它利用时,tunnel-dns 通过 DNS 劫持 (将 Request host 中的节点名解析为 tunnel-cloud 的 podIp) 把申请转发到 tunnel-cloud 的 pod 上,tunnel-cloud 把申请信息封装成 StreamMsg 通过与节点名对应的云边隧道发送到 tunnel-edge,tunnel-edge 通过接管到的 StreamMsg 的 Addr 字段和配置文件中的证书与边缘端 Server 建设 TLS 连贯,并将 StreamMsg 中的申请信息写入 TLS 连贯。tunnel-edge 从 TLS 连贯中读取到边缘端 Server 的返回数据,将其封装成 StreamMsg 发送到 tunnel-cloud,tunnel-cloud 将接管到数据写入云端组件与 tunnel-cloud 建设的连贯中。
TCP
TCP 模块负责在多集群治理中建设云端管控集群与边缘独立集群的一条 TCP 代理隧道:
func (tcp *TcpProxy) Start(mode string) {context.GetContext().RegisterHandler(util.TCP_BACKEND, tcp.Name(), tcpmsg.BackendHandler) context.GetContext().RegisterHandler(util.TCP_FRONTEND, tcp.Name(), tcpmsg.FrontendHandler) context.GetContext().RegisterHandler(util.CLOSED, tcp.Name(), tcpmsg.ControlHandler) if mode == util.CLOUD {... for front, backend := range Tcp.Addr { go func(front, backend string) {ln, err := net.Listen("tcp", front) ... for {rawConn, err := ln.Accept() .... fp := tcpmng.NewTcpConn(uuid, backend, node) fp.Conn = rawConn fp.Type = util.TCP_FRONTEND go fp.Write() go fp.Read() } }(front, backend) } }
Start 函数首先注册了 StreamMsg 的处理函数,其中 CLOSED 处理函数次要解决敞开连贯的音讯,之后在云端启动 TCP Server。
在承受到云端组件的申请后,TCP Server 会将申请封装成 StremMsg 发送给 StreamServer,由 StreamServer 发送到 tunnel-edge, 其中 StreamMsg.Type=FrontendHandler,StreamMsg.Node 从已建设的云边隧道的节点中随机抉择一个。
tunnel-edge 在承受到该 StreamMsg 后,会调用 FrontendHandler 函数解决
func FrontendHandler(msg *proto.StreamMsg) error {c := context.GetContext().GetConn(msg.Topic) if c != nil {c.Send2Conn(msg) return nil } tp := tcpmng.NewTcpConn(msg.Topic, msg.Addr, msg.Node) tp.Type = util.TCP_BACKEND tp.C.Send2Conn(msg) tcpAddr, err := net.ResolveTCPAddr("tcp", tp.Addr) if err != nil {... conn, err := net.DialTCP("tcp", nil, tcpAddr) ... tp.Conn = conn go tp.Read() go tp.Write() return nil}
FrontendHandler 首先应用 StreamMsg.Addr 与 Edge Server 建设 TCP 连贯,启动协程异步对 TCP 连贯 Read 和 Write,同时新建 conn 对象(conn.uid=StreamMsg.Topic),并 eamMsg.Data 写入 TCP 连贯。tunnel-edge 在接管到 Edge Server 的返回数据将其封装为 StreamMsg(StreamMsg.Topic=BackendHandler) 发送到 tunnel-cloud
整个过程如图所示:
小结
- TCP:负责在多集群治理中建设云端与边端的 TCP 代理
- 云端组件通过 TCP 模块拜访边缘端的 Server,云端的 TCP Server 在接管到申请会将申请封装成 StreamMsg 通过云边隧道 (在已连贯的隧道中随机抉择一个,因而举荐在只有一个 tunnel-edge 的场景下应用 TCP 代理) 发送到 tunnel-edge,*tunnel-edge 通过接管到 StreamMag 的 Addr 字段与边缘端 Server 建设TCP 连贯,并将申请写入 TCP 连贯。tunnel-edge 从 TCP 连贯中读取边缘端 Server 的返回音讯,通过云边缘隧道发送到tunnel-cloud,tunnel-cloud 接管到音讯之后将其写入云端组件与 TCP Server 建设的连贯
瞻望
- 反对更多的网络协议(已反对 HTTPS 和 TCP)
- 反对云端拜访边缘节点业务 pod server
- 多个边缘节点同时退出集群时,多正本 tunnel-cloud pod 在更新 tunnel-coredns 的 hosts 插件配置文件对应 configmap 时没有加锁,尽管概率较低,但实践上仍然存在写抵触的可能性
Refs
-
kubernetes-reading-notes
【腾讯云原生】云说新品、云研新术、云游新活、云赏资讯,扫码关注同名公众号,及时获取更多干货!!