kube-controller-manager
Basic flow:
1. Construct
2. Configure
3. Initialize
4. Execute
Entry function: /cmd/kube-controller-manager/controller-manager.go
func main() {
    rand.Seed(time.Now().UnixNano())
    // Construct, configure, and initialize the command
    command := app.NewControllerManagerCommand()
    logs.InitLogs()
    defer logs.FlushLogs()
    // Execute
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}
Constructing the command: /cmd/kube-controller-manager/app/controllermanager.go
func NewControllerManagerCommand() *cobra.Command {
    // Initialize the controller-manager options struct
    s, err := options.NewKubeControllerManagerOptions()
    ...
    // Create the command object
    cmd := &cobra.Command{
        Use:  "kube-controller-manager",
        Long: `The Kubernetes controller manager is a daemon that embeds...`,
        Run: func(cmd *cobra.Command, args []string) {
            ...
            // Build the completed config from the list of all known controllers
            c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
            ...
        },
    }
    // Return the command
    return cmd
}
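Both components wire their CLI the same way. As a point of reference, here is a minimal, runnable cobra skeleton showing the construct / configure / execute pattern; the command name and flag are illustrative, not the controller-manager's real options:
package main

import (
    "fmt"
    "os"

    "github.com/spf13/cobra"
)

func main() {
    var configFile string

    cmd := &cobra.Command{
        Use:  "demo-manager",
        Long: `A toy daemon showing the construct/configure/execute pattern.`,
        Run: func(cmd *cobra.Command, args []string) {
            // This is where the real component would build its completed
            // config (cf. s.Config(...)) and call Run.
            fmt.Println("running with config:", configFile)
        },
    }
    cmd.Flags().StringVar(&configFile, "config", "", "path to the config file")

    // Mirrors command.Execute() in the entry function above.
    if err := cmd.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}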
Entering execution: /cmd/kube-controller-manager/app/controllermanager.go
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
    ...
    // Set up the controller manager's HTTP server
    var unsecuredMux *mux.PathRecorderMux
    if c.SecureServing != nil {
        ...
    }
    // Build the run closure
    run := func(stop <-chan struct{}) {
        ...
    }
    // Without leader election (single instance), invoke run directly
    if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
        run(wait.NeverStop)
        panic("unreachable")
    }
    // Otherwise campaign for leadership, and execute run only after
    // this instance is elected leader
    leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
        ...
        // Invoked once leadership is acquired
        OnStartedLeading: run,
        ...
    })
}
The body of run: /cmd/kube-controller-manager/app/controllermanager.go >> run()
run := func(stop <-chan struct{}) {
    // Create the controller context
    ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop)
    if err != nil {
        glog.Fatalf("error building controller context: %v", err)
    }
    saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
    // Start all controllers
    if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
        glog.Fatalf("error starting controllers: %v", err)
    }
    // Start the informers that watch for resource events
    ctx.InformerFactory.Start(ctx.Stop)
    close(ctx.InformersStarted)
    select {}
}
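Two details at the end are worth noting: close(ctx.InformersStarted) uses a channel close as a one-to-many broadcast (every goroutine blocked on that channel is released at once), and the empty select {} blocks forever so run never returns. A minimal standalone sketch of the close-to-broadcast idiom (names are illustrative):
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    started := make(chan struct{}) // closed once to signal "ready" to all waiters
    var wg sync.WaitGroup

    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            <-started // every goroutine unblocks on the same close
            fmt.Printf("worker %d running\n", id)
        }(i)
    }

    time.Sleep(100 * time.Millisecond) // stand-in for setup work
    close(started)                     // broadcast: all three workers proceed
    wg.Wait()
}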
Leader election: /staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go
//Leader election is handled by the client-go library. A configmap/endpoint resource serves as the lock: whichever instance succeeds in creating this resource acquires the lock, and the lock record is stored inside that configmap/endpoint. The lock constructor is shown below.
func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) {
    switch lockType {
    case EndpointsResourceLock:
        return &EndpointsLock{
            EndpointsMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    case ConfigMapsResourceLock:
        return &ConfigMapLock{
            ConfigMapMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}
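For completeness, here is a minimal standalone sketch of driving such a lock through leaderelection.RunOrDie. It is written against the current client-go API, which takes a context and prefers coordination/v1 Lease locks over the ConfigMap/Endpoints locks shown above; the namespace, lock name, and durations are illustrative:
package main

import (
    "context"
    "log"
    "os"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
    if err != nil {
        log.Fatal(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    id, _ := os.Hostname() // identity recorded in the lock
    lock := &resourcelock.LeaseLock{
        LeaseMeta:  metav1.ObjectMeta{Namespace: "default", Name: "demo-lock"},
        Client:     client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{Identity: id},
    }

    leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
        Lock:            lock,
        LeaseDuration:   15 * time.Second, // how long an acquired lease is valid
        RenewDeadline:   10 * time.Second, // leader must renew within this window
        RetryPeriod:     2 * time.Second,  // non-leaders retry at this interval
        ReleaseOnCancel: true,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                log.Println("became leader; starting controllers")
                <-ctx.Done()
            },
            OnStoppedLeading: func() {
                log.Println("lost leadership; exiting")
            },
        },
    })
}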
Starting all controllers: /cmd/kube-controller-manager/app/controllermanager.go
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
    ...
    // Iterate over the controller list
    for controllerName, initFn := range controllers {
        if !ctx.IsControllerEnabled(controllerName) {
            glog.Warningf("%q is disabled", controllerName)
            continue
        }
        time.Sleep(wait.Jitter(ctx.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))
        glog.V(1).Infof("Starting %q", controllerName)
        // Invoke each controller's init function
        started, err := initFn(ctx)
        ...
    }
    return nil
}
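Every controller is exposed through the same InitFunc signature, so StartControllers only needs to walk a name-to-InitFunc map, sleeping a jittered interval between starts so the controllers do not all hammer the API server at once. A stdlib-only toy of this registry pattern (the names and the jitter arithmetic are illustrative, not the Kubernetes implementation):
package main

import (
    "fmt"
    "math/rand"
    "time"
)

// initFunc mirrors the shape of the controller-manager's InitFunc:
// it starts one controller and reports whether it was enabled.
type initFunc func(stop <-chan struct{}) (started bool, err error)

func startAll(controllers map[string]initFunc, interval time.Duration, stop <-chan struct{}) error {
    for name, initFn := range controllers {
        // Jittered sleep: interval plus a random extra in [0, interval].
        time.Sleep(interval + time.Duration(rand.Int63n(int64(interval)+1)))
        started, err := initFn(stop)
        if err != nil {
            return fmt.Errorf("error starting %q: %v", name, err)
        }
        if !started {
            fmt.Printf("skipping disabled controller %q\n", name)
        }
    }
    return nil
}

func main() {
    stop := make(chan struct{})
    controllers := map[string]initFunc{
        "demo-a": func(stop <-chan struct{}) (bool, error) { fmt.Println("demo-a started"); return true, nil },
        "demo-b": func(stop <-chan struct{}) (bool, error) { return false, nil }, // disabled
    }
    if err := startAll(controllers, 50*time.Millisecond, stop); err != nil {
        fmt.Println(err)
    }
}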
Creating the controller context: /cmd/kube-controller-manager/app/controllermanager.go
func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
    // Obtain a client handle for operating on API server resources
    versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
    sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
    // Confirm the API server is healthy (waiting at most 10s) before using the connection
    if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
        return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
    }
    // Create and return the ControllerContext
    ctx := ControllerContext{
        ClientBuilder:   clientBuilder,
        InformerFactory: sharedInformers,
        ...
    }
    return ctx, nil
}
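The shared informer factory created here is what lets every controller watch resources without issuing its own LIST/WATCH calls. A minimal standalone sketch of the factory lifecycle, assuming the process runs inside a cluster; the handler body is illustrative:
package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
)

func main() {
    cfg, err := rest.InClusterConfig() // assumes in-cluster credentials
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(cfg)

    // One factory, shared caches; the 30s resync plays the role of ResyncPeriod(s)().
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)

    podInformer := factory.Core().V1().Pods().Informer()
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            fmt.Println("pod added:", obj.(*v1.Pod).Name)
        },
    })

    stop := make(chan struct{})
    factory.Start(stop)                                 // like ctx.InformerFactory.Start(ctx.Stop)
    cache.WaitForCacheSync(stop, podInformer.HasSynced) // wait for the initial LIST
    select {}                                           // block forever, as run does
}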
kube-scheduler
Basic flow:
1. Initialize the configuration
2. Construct
3. Fetch a pod from the queue
4. Bind it to a node
Entry function: /cmd/kube-scheduler/scheduler.go
func main() {
    rand.Seed(time.Now().UnixNano())
    // Construct
    command := app.NewSchedulerCommand()
    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()
    // Execute
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}
Registering scheduling algorithm providers: pkg/scheduler/algorithmprovider/defaults/defaults.go
func registerAlgorithmProvider(predSet, priSet sets.String) {
    // Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
    // by specifying flag.
    factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
    // Cluster autoscaler friendly scheduling algorithm.
    factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
        copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}
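An algorithm provider is essentially a named bundle of predicate names and priority names that the factory resolves into functions later; the autoscaler-friendly provider is the default set with LeastRequestedPriority swapped for MostRequestedPriority. A stdlib-only toy of this registry pattern (illustrative, not the actual factory API):
package main

import "fmt"

// provider bundles the names of the predicates and priorities it enables,
// loosely mirroring what RegisterAlgorithmProvider records.
type provider struct {
    predicates, priorities map[string]bool
}

var providers = map[string]provider{}

func registerProvider(name string, preds, pris map[string]bool) {
    providers[name] = provider{predicates: preds, priorities: pris}
}

func main() {
    set := func(names ...string) map[string]bool {
        m := map[string]bool{}
        for _, n := range names {
            m[n] = true
        }
        return m
    }
    registerProvider("DefaultProvider",
        set("PodFitsResources", "PodFitsHostPorts"),
        set("LeastRequestedPriority"))
    // Autoscaler-friendly variant: same predicates, MostRequested instead of LeastRequested.
    registerProvider("ClusterAutoscalerProvider",
        set("PodFitsResources", "PodFitsHostPorts"),
        set("MostRequestedPriority"))
    fmt.Println(len(providers), "providers registered")
}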
Building the command: /cmd/kube-scheduler/app/server.go
func NewSchedulerCommand() *cobra.Command {
    // Initialize the default options
    opts, err := options.NewOptions()
    // Construct the command object
    cmd := &cobra.Command{
        Use:  "kube-scheduler",
        Long: `The Kubernetes ...`,
        Run: func(cmd *cobra.Command, args []string) {
            ...
        },
    }
    // Register the configuration flags
    opts.AddFlags(cmd.Flags())
    cmd.MarkFlagFilename("config", "yaml", "yml", "json")
    return cmd
}
Startup: /cmd/kube-scheduler/app/server.go
func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // Set up the scheduling algorithms according to enabled feature gates
    algorithmprovider.ApplyFeatureGates()
    ...
    // Initialize the scheduler config
    schedulerConfig, err := NewSchedulerConfig(c)
    // Create the Scheduler object
    sched := scheduler.NewFromConfig(schedulerConfig)
    // Serve health checks
    if c.InsecureServing != nil {
        ...
    }
    // Run leader election if configured
    if c.LeaderElection != nil {
        ...
    }
    // Execute the scheduling loop
    run(stopCh)
}
Execution: /pkg/scheduler/scheduler.go
// Begin running scheduling tasks
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
        go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
    }
    // Schedule pods serially, one at a time
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
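Passing a period of 0 to wait.Until means scheduleOne is re-invoked as soon as the previous call returns, until the stop channel closes, so pods are scheduled strictly one at a time even though the loop runs in a goroutine. A minimal standalone usage of wait.Until (the counter logic is illustrative):
package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stop := make(chan struct{})
    n := 0
    go wait.Until(func() {
        // Runs repeatedly; period 0 means "again immediately after returning",
        // just like the scheduleOne loop.
        n++
        if n == 3 {
            close(stop) // closing the channel ends the loop
        }
        time.Sleep(10 * time.Millisecond)
    }, 0, stop)

    <-stop
    fmt.Println("loop iterations:", n)
}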
Pod scheduling logic: /pkg/scheduler/scheduler.go
func (sched *Scheduler) scheduleOne() {
    // Pop the next pod from the scheduling queue
    pod := sched.config.NextPod()
    ...
    // Pick a suitable host for the pod
    suggestedHost, err := sched.schedule(pod)
    ...
    // Assume the binding in the local cache first (masks the latency of the apiserver call)
    assumedPod := pod.DeepCopy()
    ...
    // Perform the actual binding through the apiserver client
    go func() {
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            ...
        })
    }()
}
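The assume-then-bind split is an optimistic-concurrency trick: the pod is recorded as placed in the scheduler's local cache right away, so the next scheduleOne iteration sees up-to-date resource usage while the slow apiserver call runs in a goroutine; if the bind fails, the assumption has to be rolled back. A stdlib-only toy of that pattern (all names are illustrative):
package main

import (
    "errors"
    "fmt"
    "sync"
)

// assumeCache is a toy stand-in for the scheduler cache's assume/forget calls.
type assumeCache struct {
    mu      sync.Mutex
    assumed map[string]string // pod -> node
}

func (c *assumeCache) assume(pod, node string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.assumed[pod] = node // visible to the next scheduling cycle immediately
}

func (c *assumeCache) forget(pod string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.assumed, pod) // roll back a failed bind
}

func main() {
    cache := &assumeCache{assumed: map[string]string{}}
    bind := func(pod, node string) error { return errors.New("apiserver timeout") } // simulated failure

    pod, node := "nginx-1", "node-a"
    cache.assume(pod, node) // optimistic: assume success before the API call

    var wg sync.WaitGroup
    wg.Add(1)
    go func() { // the real bind happens asynchronously
        defer wg.Done()
        if err := bind(pod, node); err != nil {
            cache.forget(pod) // undo the assumption on failure
            fmt.Println("bind failed, assumption rolled back:", err)
        }
    }()
    wg.Wait()
}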
Finding a suitable node: /pkg/scheduler/core/generic_scheduler.go
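The sched.schedule call above resolves to genericScheduler.Schedule, which works in two phases: predicates filter out infeasible nodes, then priority functions score the survivors and the highest-scoring host is selected. A condensed sketch of its shape, reconstructed from the source layout of that era (elided bodies marked with ...):
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    nodes, err := nodeLister.List()
    ...
    // Phase 1: run every registered predicate to filter out infeasible nodes
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    ...
    // Phase 2: score the feasible nodes with the priority functions
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    ...
    // Pick the node with the highest total score
    return g.selectHost(priorityList)
}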