xxx

熊云港 · · 1989 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

kube-controller-manager

基本流程:
1、构造
2、配置
3、初始化
4、执行

image.png

入口函数:/cmd/kube-controller-manager/controller-manager.go

func main() {

    rand.Seed(time.Now().UnixNano())
    
    //构造,配置,初始化command
    command := app.NewControllerManagerCommand()

    logs.InitLogs()
    defer logs.FlushLogs()

    //执行
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

}

构造执行器: /cmd/kube-controller-manager/app/controllermanager.go

func NewControllerManagerCommand() *cobra.Command {

    //初始化Controller-manager的配置选项结构
    s, err := options.NewKubeControllerManagerOptions()

    ...

    //创建执行命令结构
    cmd := &cobra.Command{
        Use: "kube-controller-manager",
        Long: `The Kubernetes controller manager is a daemon that embeds...'
        //获取所有控制器
        c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
        ...
    }


    //返回执行器
    return cmd;
}

进入执行:/cmd/kube-controller-manager/app/controllermanager.go

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
    ...
    //初始化controller manager 的HTTP服务
    var unsecuredMux *mux.PathRecorderMux
    if c.SecureServing != nil {
    ...
    //构造run的执行体
    run := func(stop <-chan struct{}) {
    ...
     //如果只是单节点,直接运行run    if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
        run(wait.NeverStop)
        panic("unreachable")
    }
    //非单点,选主后执行run
    //进行选主,并在选为主节点后执行run
    leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
    ...
    //选主完成后执行
    OnStartedLeading: run,
    ...
}

run的执行体:/cmd/kube-controller-manager/app/controllermanager.go >> run()

 run := func(stop <-chan struct{}) {
     
    //创建控制器上下文
    ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop)
    if err != nil {
    glog.Fatalf("error building controller context: %v", err)
            }
    saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder:     rootClientBuilder}.startServiceAccountTokenController

    //初始化所有控制器
    if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
    glog.Fatalf("error starting controllers: %v", err)
    }

    //启动监听资源的事件
            ctx.InformerFactory.Start(ctx.Stop)
            close(ctx.InformersStarted)

            select {}
    }

选主流程:/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go

//选主主要有client-go工具类完成,选择configmap/endpoint来创建资源,哪个执行单元创建成功了此资源便可获得锁,锁信息存储在此configmap/endpoint中,选主代码如下
func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) {
    switch lockType {
    case EndpointsResourceLock:
        return &EndpointsLock{
            EndpointsMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    case ConfigMapsResourceLock:
        return &ConfigMapLock{
            ConfigMapMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}

初始化所有控制器:/cmd/kube-controller-manager/app/controllermanager.go

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
    ···
    //遍历所有的controller list
    for controllerName, initFn := range controllers {
        if !ctx.IsControllerEnabled(controllerName) {
        glog.Warningf("%q is disabled", controllerName)
        continue
    }
    time.Sleep(wait.Jitter(ctx.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
    glog.V(1).Infof("Starting %q", controllerName)
    //执行每个controller的初始化函数
    started, err := initFn(ctx)
    ···
    }

    return nil
}

创建控制器上下文:/cmd/kube-controller-manager/app/controllermanager.go

func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
    //拿到对apiServer资源的操作的句柄
    versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
    sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

    //确认api Server的健康(最多等待的时间为10s),再获取连接
    if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
    return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
    }
    //创建并返回controllerContext
    ctx := ControllerContext{
        ClientBuilder:      clientBuilder,
        InformerFactory:    sharedInformers,
        ...
    }

    
    return ctx,nil
}

kube-scheduler

基本流程
1、初始化配置
2、构造
3、从队列中获取pod
4、进行绑定

clipboard.png

入口函数:/cmd/kube-scheduler/scheduler.go

func main() {
    rand.Seed(time.Now().UnixNano())
    //构造
    command := app.NewSchedulerCommand()
    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()
    //执行
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

注册调度策略:pkg/scheduler/algorithmprovider/defaults/defaults.go

func registerAlgorithmProvider(predSet, priSet sets.String) {
    // Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
    // by specifying flag.
    factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
    // Cluster autoscaler friendly scheduling algorithm.
    factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
        copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}

从组件入口:/cmd/kube-scheduler/app/server.go

func NewSchedulerCommand() *cobra.Command {
    //初始化默认的参数
    opts, err := options.NewOptions()
    
    //构造执行命令对象
    cmd := &cobra.Command{
    Use: "kube-scheduler",
    Long: `The Kubernetes ······`,
    Run: func(cmd *cobra.Command, args []string) {
    ...
    }
    //读取配置参数
    opts.AddFlags(cmd.Flags())
    cmd.MarkFlagFilename("config", "yaml", "yml", "json")

    return cmd
}

启动:/cmd/kube-scheduler/app/server.go

func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    //设置调度算法
    algorithmprovider.ApplyFeatureGates()
    ...
    //初始化schedulerConfig
    schedulerConfig, err := NewSchedulerConfig(c)

    //创建Scheduler对象
    sched := scheduler.NewFromConfig(schedulerConfig)
    
    // 进行健康检查
    if c.InsecureServing != nil {    
    ...
    //是否需要选主
    if c.LeaderElection != nil {
    ...
    //执行调度任务
    run(stopCh)

}

执行:/cmd/kube-scheduler/app/server.go

//开始执行调度任务
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
    return
    }

    if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
        go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
    }

    //串行执行调度任务
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

调度pod逻辑:/cmd/kube-scheduler/scheduler.go

func (sched *Scheduler) scheduleOne() {
    //从队列中获取pod
    pod := sched.config.NextPod()
    ...
    //给获取到的pod调度到合适的位置
     suggestedHost, err := sched.schedule(pod)
    ...
    //在缓存中预先绑定主机(调用apiserver的延时问题)
     assumedPod := pod.DeepCopy()
    ...
    //通过apiserver的client进行绑定
    go func() {
    err := sched.bind(assumedPod, &v1.Binding{
                   ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name:     assumedPod.Name, UID: assumedPod.UID}
    ...
}

寻找合适的节点:/pkg/scheduler/core/generic_scheduler.go

func (sched *Scheduler) scheduleOne() {
    //从队列中获取pod
    pod := sched.config.NextPod()
    ...
    //给获取到的pod调度到合适的位置
     suggestedHost, err := sched.schedule(pod)
    ...
    //在缓存中预先绑定主机(调用apiserver的延时问题)
     assumedPod := pod.DeepCopy()
    ...
    //通过apiserver的client进行绑定
    go func() {
    err := sched.bind(assumedPod, &v1.Binding{
                   ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name:     assumedPod.Name, UID: assumedPod.UID}
    ...
}

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:熊云港

查看原文:xxx

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1989 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传