kube-controller-manager基本流程:1、构造2、配置3、初始化4、执行 入口函数:/cmd/kube-controller-manager/controller-manager.go func main() { rand.Seed(time.Now().UnixNano()) //构造,配置,初始化command command := app.NewControllerManagerCommand() logs.InitLogs() defer logs.FlushLogs() //执行 if err := command.Execute(); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) }}构造执行器: /cmd/kube-controller-manager/app/controllermanager.go func NewControllerManagerCommand() *cobra.Command { //初始化Controller-manager的配置选项结构 s, err := options.NewKubeControllerManagerOptions() … //创建执行命令结构 cmd := &cobra.Command{ Use: "kube-controller-manager", Long: `The Kubernetes controller manager is a daemon that embeds...`, //获取所有控制器 c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List()) ... } //返回执行器 return cmd}进入执行:/cmd/kube-controller-manager/app/controllermanager.go func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { ... //初始化controller manager 的HTTP服务 var unsecuredMux *mux.PathRecorderMux if c.SecureServing != nil { ... //构造run的执行体 run := func(stop <-chan struct{}) { ... //如果只是单节点,直接运行run if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect { run(wait.NeverStop) panic("unreachable") } //非单点,选主后执行run //进行选主,并在选为主节点后执行run leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ ... 
//选主完成后执行 OnStartedLeading: run, ...}run的执行体:/cmd/kube-controller-manager/app/controllermanager.go >> run() run := func(stop <-chan struct{}) { //创建控制器上下文 ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop) if err != nil { glog.Fatalf("error building controller context: %v", err) } saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController //初始化所有控制器 if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil { glog.Fatalf("error starting controllers: %v", err) } //启动监听资源的事件 ctx.InformerFactory.Start(ctx.Stop) close(ctx.InformersStarted) select {} }选主流程:/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go //选主主要由client-go工具类完成,选择configmap/endpoint来创建资源,哪个执行单元创建成功了此资源便可获得锁,锁信息存储在此configmap/endpoint中,选主代码如下: func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) { switch lockType { case EndpointsResourceLock: return &EndpointsLock{ EndpointsMeta: metav1.ObjectMeta{ Namespace: ns, Name: name, }, Client: client, LockConfig: rlc, }, nil case ConfigMapsResourceLock: return &ConfigMapLock{ ConfigMapMeta: metav1.ObjectMeta{ Namespace: ns, Name: name, }, Client: client, LockConfig: rlc, }, nil default: return nil, fmt.Errorf("Invalid lock-type %s", lockType) }}初始化所有控制器:/cmd/kube-controller-manager/app/controllermanager.go func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error { ··· //遍历所有的controller list for controllerName, initFn := range controllers { if !ctx.IsControllerEnabled(controllerName) { glog.Warningf("%q is disabled", controllerName) continue } time.Sleep(wait.Jitter(ctx.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter)) glog.V(1).Infof("Starting %q", controllerName) //执行每个controller的初始化函数 started, err := initFn(ctx) 
··· } return nil}创建控制器上下文:/cmd/kube-controller-manager/app/controllermanager.go func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) { //拿到对apiServer资源的操作的句柄 versionedClient := rootClientBuilder.ClientOrDie("shared-informers") sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)()) //确认api Server的健康(最多等待的时间为10s),再获取连接 if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil { return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err) } //创建并返回controllerContext ctx := ControllerContext{ ClientBuilder: clientBuilder, InformerFactory: sharedInformers, ... } return ctx, nil}kube-scheduler基本流程:1、初始化配置2、构造3、从队列中获取pod4、进行绑定 入口函数:/cmd/kube-scheduler/scheduler.go func main() { rand.Seed(time.Now().UnixNano()) //构造 command := app.NewSchedulerCommand() pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc) // utilflag.InitFlags() logs.InitLogs() defer logs.FlushLogs() //执行 if err := command.Execute(); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) }}注册调度策略:/pkg/scheduler/algorithmprovider/defaults/defaults.go func registerAlgorithmProvider(predSet, priSet sets.String) { // Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used // by specifying flag. factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet) // Cluster autoscaler friendly scheduling algorithm. factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet, copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))}从组件入口:/cmd/kube-scheduler/app/server.go func NewSchedulerCommand() *cobra.Command { //初始化默认的参数 opts, err := options.NewOptions() //构造执行命令对象 cmd := &cobra.Command{ Use: "kube-scheduler", Long:
`The Kubernetes ······`, Run: func(cmd *cobra.Command, args []string) { … } //读取配置参数 opts.AddFlags(cmd.Flags()) cmd.MarkFlagFilename("config", "yaml", "yml", "json") return cmd}启动:/cmd/kube-scheduler/app/server.go func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error { //设置调度算法 algorithmprovider.ApplyFeatureGates() … //初始化schedulerConfig schedulerConfig, err := NewSchedulerConfig(c) //创建Scheduler对象 sched := scheduler.NewFromConfig(schedulerConfig) // 进行健康检查 if c.InsecureServing != nil { … //是否需要选主 if c.LeaderElection != nil { … //执行调度任务 run(stopCh)}执行:/pkg/scheduler/scheduler.go //开始执行调度任务 func (sched *Scheduler) Run() { if !sched.config.WaitForCacheSync() { return } if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything) } //串行执行调度任务 go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)}调度pod逻辑:/pkg/scheduler/scheduler.go func (sched *Scheduler) scheduleOne() { //从队列中获取pod pod := sched.config.NextPod() … //给获取到的pod调度到合适的位置 suggestedHost, err := sched.schedule(pod) … //在缓存中预先绑定主机(调用apiserver的延时问题) assumedPod := pod.DeepCopy() … //通过apiserver的client进行绑定 go func() { err := sched.bind(assumedPod, &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID} …}寻找合适的节点:/pkg/scheduler/core/generic_scheduler.go(注:下面贴出的代码与上一节相同,仍是scheduleOne;实际的节点筛选与打分是由其中的sched.schedule(pod)调用到该文件中的genericScheduler.Schedule完成的) func (sched *Scheduler) scheduleOne() { //从队列中获取pod pod := sched.config.NextPod() … //给获取到的pod调度到合适的位置 suggestedHost, err := sched.schedule(pod) … //在缓存中预先绑定主机(调用apiserver的延时问题) assumedPod := pod.DeepCopy() … //通过apiserver的client进行绑定 go func() { err := sched.bind(assumedPod, &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID} …}