kube-controller-manager
Basic flow: 1. construct, 2. configure, 3. initialize, 4. run
Entry point: /cmd/kube-controller-manager/controller-manager.go
func main() {
    rand.Seed(time.Now().UnixNano())
    // construct, configure, and initialize the command
    command := app.NewControllerManagerCommand()
    logs.InitLogs()
    defer logs.FlushLogs()
    // run
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}
Building the command: /cmd/kube-controller-manager/app/controllermanager.go
func NewControllerManagerCommand() *cobra.Command {
    // initialize the controller-manager options struct
    s, err := options.NewKubeControllerManagerOptions()
    …
    // build the cobra command
    cmd := &cobra.Command{
        Use:  "kube-controller-manager",
        Long: `The Kubernetes controller manager is a daemon that embeds…`,
        Run: func(cmd *cobra.Command, args []string) {
            // resolve the configuration for all known controllers
            c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
            …
        },
    }
    // return the command
    return cmd
}
Entering execution: /cmd/kube-controller-manager/app/controllermanager.go
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
    …
    // initialize the controller manager's HTTP server
    var unsecuredMux *mux.PathRecorderMux
    if c.SecureServing != nil {
    …
    // build the run closure
    run := func(stop <-chan struct{}) {
    …
    // with leader election disabled (single instance), call run directly
    if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
        run(wait.NeverStop)
        panic("unreachable")
    }
    // otherwise run leader election, executing run once this node is elected leader
    leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
        …
        // invoked after this instance becomes the leader
        OnStartedLeading: run,
        …
}
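The elided fields above carry the lock object and the election timing. A sketch of what the full call looks like, with the lock built by resourcelock.New (shown in the leader-election section below) and the durations written as illustrative literals, whereas the real code reads them from c.ComponentConfig:

leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
    Lock:          rl,               // resourcelock.Interface from resourcelock.New
    LeaseDuration: 15 * time.Second, // how long a lease lasts before it can be taken over
    RenewDeadline: 10 * time.Second, // leader must renew its lease within this window
    RetryPeriod:   2 * time.Second,  // delay between acquire/renew attempts
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: run,
        OnStoppedLeading: func() {
            glog.Fatalf("leaderelection lost")
        },
    },
})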
The run closure: /cmd/kube-controller-manager/app/controllermanager.go >> run()
run := func(stop <-chan struct{}) {
    // create the controller context
    ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop)
    if err != nil {
        glog.Fatalf("error building controller context: %v", err)
    }
    saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
    // start all controllers
    if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
        glog.Fatalf("error starting controllers: %v", err)
    }
    // start the shared informers to begin watching resource events
    ctx.InformerFactory.Start(ctx.Stop)
    close(ctx.InformersStarted)
    // block forever
    select {}
}
Leader election: /staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go
// Leader election is implemented by client-go. A configmap or endpoints resource serves as the lock: whichever instance creates the resource first acquires the lock, and the leader record is stored in that configmap/endpoints object. The lock is constructed as follows:
func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) {
    switch lockType {
    case EndpointsResourceLock:
        return &EndpointsLock{
            EndpointsMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    case ConfigMapsResourceLock:
        return &ConfigMapLock{
            ConfigMapMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}
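The leader record itself is serialized into the control-plane.alpha.kubernetes.io/leader annotation of the lock object. As a sketch of how the controller-manager wires this up (identity derivation simplified; the real code appends a uuid to the hostname and reads the lock type from its configuration):

rl, err := resourcelock.New(
    "endpoints",               // EndpointsResourceLock
    "kube-system",             // namespace of the lock object
    "kube-controller-manager", // name of the lock object
    leaderElectionClient.CoreV1(),
    resourcelock.ResourceLockConfig{
        Identity:      id,       // unique per instance, e.g. hostname + uuid
        EventRecorder: recorder, // emits events on leader transitions
    })
if err != nil {
    glog.Fatalf("error creating lock: %v", err)
}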
Starting all controllers: /cmd/kube-controller-manager/app/controllermanager.go
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
    …
    // iterate over the controller list
    for controllerName, initFn := range controllers {
        if !ctx.IsControllerEnabled(controllerName) {
            glog.Warningf("%q is disabled", controllerName)
            continue
        }
        time.Sleep(wait.Jitter(ctx.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
        glog.V(1).Infof("Starting %q", controllerName)
        // invoke each controller's init function
        started, err := initFn(ctx)
        …
    }
    return nil
}
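Every value in the controllers map is an InitFunc: func(ctx ControllerContext) (bool, error), where the bool reports whether the controller was actually started. A sketch of the pattern, modeled on the deployment controller's starter in this package (treat exact fields as approximate for your version):

func startDeploymentController(ctx ControllerContext) (bool, error) {
    // skip when the apiserver does not serve the resource this controller manages
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
        return false, nil
    }
    dc, err := deployment.NewDeploymentController(
        ctx.InformerFactory.Apps().V1().Deployments(),
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    if err != nil {
        return false, err
    }
    // run worker goroutines until ctx.Stop is closed
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    return true, nil
}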
Creating the controller context: /cmd/kube-controller-manager/app/controllermanager.go
func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
    // get a client handle for operating on apiserver resources
    versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
    sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
    // wait for the apiserver to become healthy (at most 10s) before proceeding
    if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
        return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
    }
    // build and return the ControllerContext
    ctx := ControllerContext{
        ClientBuilder:   clientBuilder,
        InformerFactory: sharedInformers,
        …
    }
    return ctx, nil
}
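The ResyncPeriod(s)() call above is worth a note: it jitters the shared informers' resync interval so that the controllers do not all resync in lockstep. Its shape in this file is roughly:

func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {
    return func() time.Duration {
        // random factor in [1, 2) applied to the configured minimum resync period
        factor := rand.Float64() + 1
        return time.Duration(float64(c.ComponentConfig.GenericComponent.MinResyncPeriod.Nanoseconds()) * factor)
    }
}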
kube-scheduler
Basic flow: 1. initialize configuration, 2. construct, 3. fetch a pod from the queue, 4. bind
Entry point: /cmd/kube-scheduler/scheduler.go
func main() {
    rand.Seed(time.Now().UnixNano())
    // construct
    command := app.NewSchedulerCommand()
    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()
    // run
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}
Registering scheduling algorithms: pkg/scheduler/algorithmprovider/defaults/defaults.go
func registerAlgorithmProvider(predSet, priSet sets.String) {
    // Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
    // by specifying flag.
    factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
    // Cluster autoscaler friendly scheduling algorithm.
    factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
        copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}
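The ClusterAutoscalerProvider is simply the default priority set with LeastRequestedPriority swapped for MostRequestedPriority, which makes the scheduler pack pods onto fewer nodes so the autoscaler can drain idle ones. A sketch of the copyAndReplace helper, assumed from its usage here:

func copyAndReplace(set sets.String, replaceWhat, replaceWith string) sets.String {
    // copy the set, then swap one entry for another if present
    result := sets.NewString(set.List()...)
    if result.Has(replaceWhat) {
        result.Delete(replaceWhat)
        result.Insert(replaceWith)
    }
    return result
}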
Component entry: /cmd/kube-scheduler/app/server.go
func NewSchedulerCommand() *cobra.Command {
    // initialize the default options
    opts, err := options.NewOptions()
    // build the cobra command
    cmd := &cobra.Command{
        Use:  "kube-scheduler",
        Long: `The Kubernetes …`,
        Run: func(cmd *cobra.Command, args []string) {
            …
        },
    }
    // register the command-line flags
    opts.AddFlags(cmd.Flags())
    cmd.MarkFlagFilename("config", "yaml", "yml", "json")
    return cmd
}
Startup: /cmd/kube-scheduler/app/server.go
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // set up the scheduling algorithm provider per the feature gates
    algorithmprovider.ApplyFeatureGates()
    …
    // initialize the scheduler config
    schedulerConfig, err := NewSchedulerConfig(c)
    // create the Scheduler object
    sched := scheduler.NewFromConfig(schedulerConfig)
    // serve health checks
    if c.InsecureServing != nil {
    …
    // run leader election if required
    if c.LeaderElection != nil {
    …
    // run the scheduling loop
    run(stopCh)
}
Execution: /pkg/scheduler/scheduler.go
// begin the scheduling loop
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
        go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
    }
    // run the scheduling loop serially, one pod at a time
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
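Note the period of 0 passed to wait.Until: scheduleOne is re-invoked as soon as the previous call returns, so pods are scheduled strictly one at a time until StopEverything closes. A minimal standalone illustration of that semantics:

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stop := make(chan struct{})
    // with period 0 the function runs back to back, mirroring the
    // scheduler's serial one-pod-at-a-time loop
    go wait.Until(func() {
        fmt.Println("scheduling one pod")
        time.Sleep(100 * time.Millisecond) // stand-in for scheduleOne's work
    }, 0, stop)
    time.Sleep(time.Second)
    close(stop)
}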
Scheduling a pod: /pkg/scheduler/scheduler.go
func (sched *Scheduler) scheduleOne() {
    // fetch the next pod from the scheduling queue
    pod := sched.config.NextPod()
    …
    // pick a suitable host for the pod
    suggestedHost, err := sched.schedule(pod)
    …
    // assume the binding in the scheduler cache first, so scheduling does not
    // have to wait out the apiserver call latency
    assumedPod := pod.DeepCopy()
    …
    // perform the actual binding asynchronously via the apiserver client
    go func() {
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
    …
}
Finding a suitable node: /pkg/scheduler/core/generic_scheduler.go
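sched.schedule(pod) bottoms out in genericScheduler.Schedule, which filters nodes with the registered predicates, scores the survivors with the priority functions, and picks the best host. An abridged outline of that flow (helper names follow this file in the source tree of this era; treat exact signatures as approximate):

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    nodes, err := nodeLister.List()
    if err != nil {
        return "", err
    }
    // predicates: filter out nodes that cannot run the pod
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return "", err
    }
    if len(filteredNodes) == 0 {
        return "", &FitError{Pod: pod, FailedPredicates: failedPredicateMap}
    }
    // priorities: score each remaining node
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return "", err
    }
    // pick the highest-scoring node (ties broken round-robin)
    return g.selectHost(priorityList)
}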