共计 7056 个字符,预计需要花费 18 分钟才能阅读完成。
之前都是花工夫在 k8s 的搭建,根底概念学习以及具体利用在 k8s 上的搭建,始终想对 k8s 源码进行浏览学习,这次借着云原生社区的 Kubernetes 源码研习社我的项目机会对 k8s 源码进行摄入的学习,也能鞭策本人坚持下去把源码读完。
本文次要对第一周 kube-scheduler 的架构设计和启动流程的学习进行了一个总结。
1、kube-scheduler 架构设计
kube-scheduler 组件是 kubernetes 默认的调度器,kube-scheduler 组件内置的预选和优选算法,次要负责把创立的 Pod 调度到具体的 Work 工作节点上,实现 Pod 到 Node 的调度和预绑定工作(具体的绑定实现在 kube-apiserver 组件上实现)。
如 kube-scheduler 架构图上图所示,kube-scheduler 通过 informer 从 kube-apiserver 保护一个还未绑定到 Node 的待调度 Pod 队列,每个调度循环中,scheduler 会从 Pod queue 中获取一个 Pod,依据 scheduler 框架的 filter 阶段(预选阶段) 和Score 阶段(优选阶段)从待绑定的 Node 列表中选出最优 Node,最初通过 绑定阶段 把 Pod 绑定到最优的 Node 节点上。
- filter 阶段(预选阶段):依据 Pod 须要的 CPU、内存、端口等资源状况,间接过滤掉不合乎 Pod 资源条件的 Node 节点
- Score 阶段(优选阶段):依据 Pod 的节点亲和性、资源状况等进行归一化打分,选出分数最优的 Node 节点。
- 绑定阶段:该阶段次要把 Pod 预绑定到选出的 Node 上,并择机把绑定后果上传到 kube-apiserver,由 kube-apiserver 实现理论绑定操作。
2、kube-scheduler 启动流程
kube-scheduler 组件程序入口在 cmd/kube-scheduler/scheduler.go
文件的 main()
函数中,该函数次要实现了命令行解析和运行 scheduler 工作。cmd/kube-scheduler/scheduler.go:33
func main() {
...
// 1、创立 cobra.Command 对象,cobra 次要用来解析 scheduler 命令行参数
command := app.NewSchedulerCommand()
// 2、初始化 log
pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
logs.InitLogs()
defer logs.FlushLogs()
// 3、运行 command 对象的 Run 成员对应的函数,该函数 在 cmd/kube-scheduler/app/server.go:117
if err := command.Execute(); err != nil {os.Exit(1)
}
}
下面的 Execute()
办法理论执行的是 cmd/kube-scheduler/app/server.go:117
的 runCommand()
办法
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
......
//1、装载生成一个 scheduler.Scheduler 对象
cc, sched, err := Setup(ctx, args, opts, registryOptions...)
if err != nil {return err}
......
// 2、通过装载好的 Scheduler 对象,执行真正的 scheduler 流程
return Run(ctx, cc, sched)
}
Run()
函数次要进行以下操作:
- 筹备事件 (Event) 播送器
- 启动 healthz、metrics、scheduler server,次要监听 10251 和 10259 端口,前者为非平安端口(可拜访 healthz/metics),后者为平安端口,须要进行认证
- 启动所有 informer,并通过 informer 同步所有信息到本地 cache 中
- 当领导者选举设置时,运行领导者选举办法
- 执行
sched.Run(ctx)
办法,该办法执行 scheduler 主逻辑
cmd/kube-scheduler/app/server.go:141
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
......
// 1、筹备 event broadcaster.
if cc.Broadcaster != nil && cc.EventClient != nil {cc.Broadcaster.StartRecordingToSink(ctx.Done())
}
if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
}
// 2、筹备并启动 healthz 服务.
var checks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {checks = append(checks, cc.LeaderElection.WatchDog)
}
if cc.InsecureServing != nil {
separateMetrics := cc.InsecureMetricsServing != nil
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {return fmt.Errorf("failed to start healthz server: %v", err)
}
}
// 3、启动 metrics 服务
if cc.InsecureMetricsServing != nil {handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {return fmt.Errorf("failed to start metrics server: %v", err)
}
}
// 4、启动 scheduler 平安服务
if cc.SecureServing != nil {handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}
// 5、启动所有 informer.
go cc.PodInformer.Informer().Run(ctx.Done())
cc.InformerFactory.Start(ctx.Done())
// 6、期待所有 informer 获取信息到本地 cache 中.
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// 7、若领导者选举不为空,开始进行领导者选举.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: sched.Run,
OnStoppedLeading: func() {klog.Fatalf("leaderelection lost")
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// 8、执行 scheduler 主逻辑
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
sched.Run()
次要执行以下步骤
- 期待 Cache 同步结束
- 启动管制 Pod 队列的 goroutines
- 通过 wait.UntilWithContext() 办法,定时调用 sched.scheduleOne() 办法,sched.scheduleOne() 办法实现一轮调度
pkg/scheduler/scheduler.go:360
func (sched *Scheduler) Run(ctx context.Context) {
// 1、期待 cache 同步实现
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {return}
// 2、启动 Pod 队列 Goroutines
sched.SchedulingQueue.Run()
// 3、启动通过 sched.scheduleOne()办法定时调度的 goroutines
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()}
sched.scheduleOne()
办法调用一次就为一个 Pod 实现一次调度轮次,该办法次要逻辑如下
- 从 Pod 队列汇总获取一个 Pod
- 同步执行 Filter 和 Score 阶段,为 Pod 选取适合的 Node
- 假如 Pod 运行在选取好的 Node 上,运行 Reserve 插件,从该节点上保留下所需的资源
- 执行 Permit 插件,该插件次要是用作 Bind 前期待,可能场景是同一组 Pod 须要同时绑定和调度到节点上
- 启动一个额定 goroutine,执行 prebind 插件和 postbind 插件实现 Pod 到 Node 的绑定操作
pkg/scheduler/scheduler.go:520
func (sched *Scheduler) scheduleOne(ctx context.Context) {
// 1、从 pod queue 队列中获取一个 Pod
podInfo := sched.NextPod()
......
// 2、调用 sched.Algorithm.Schedule()执行 Filter 和 Score 阶段,尝试获取一个可行的 Node
start := time.Now()
state := framework.NewCycleState()
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
......
// 3、运行 "reserve" plugins.
if sts := prof.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
metrics.PodScheduleErrors.Inc()
return
}
// 4、假如 Pod 曾经调度到对应节点上
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
......
// 5、运行 "permit" plugins.
runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
......
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
// 6、启动一个 goroutines,异步执行 prebind 和 postbind
go func() {
......
// 6.1 启动 "prebind" plugins.
preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
......
// 6.2 执行 bind 操作
err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
if err != nil {......} else {
......
// 6.3 启动 "postbind" plugins.
prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}
}()}
为 Pod 选取 Node 的逻辑次要在 sched.Algorithm.Schedule()
,该办法次要执行以下操作
- 执行 prefilter 插件,过滤资源不适合节点
- 执行 preioritize 办法,为剩下的节点进行归一化打分
- 依据 priority 后果执行 selectHost 操作,抉择最优节点
pkg/scheduler/generic_scheduler.go:147
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
......
// 1、运行 "prefilter" plugins.
startPredicateEvalTime := time.Now()
filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
......
// 2、运行优选算法,map/reduce 每个 Node 的分数
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes)
if err != nil {return result, err}
......
// 3、依据 priority 计算好的列表,抉择最合适的 Node
host, err := g.selectHost(priorityList)
trace.Step("Prioritizing done")
......
}
本文次要介绍了 kube-scheduler 的架构设计和启动简要流程,scheduler 组件的各个对象如何初始化、Pod 队列如何实现、cache 机制如何实现、framwork 预选 / 优选阶段算法、领导者选举算法如何具体实现尚未进行深入研究,这一些将在接下来的学习总结中进行出现。
3、参考文献
kube-scheduler 源码剖析
云原生学习笔记 /1. 调度器外围数据结构与算法剖析
《kubernetes 源码分析》- 电子工业出版社 - 作者:郑东旭
kubernetes 源码