ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
首先要做的是大致浏览下几个目录与大致的文件!! 基本的流程可以简述为: 1 创建pod 2 schedulor notice到有新的pod待分配。 3 scedulor为pod分配一个node 大致的一个结构就像 ~~~ while True: pods = get_all_pods() for pod in pods: if pod.node == nil: assignNode(pod) ~~~ 确实整个scheduler的核心就是这个,在此基础上的就是对于整个的实现, 根据官方文件描述:程序的main入口在schedulor.go下。 通过这个方法点击进入,然后就被各种不认识的结构体方法愣住了(哈哈)。没关系,我们要做的不是仔细看代码,要做的是获取其主要的代码结构。 通过跳转就到了server.go这个文件下,首先看下包注释:~~~ Package app implements a Server object for running the scheduler. ~~~ 实现一个server对象用于run scheduler。 然后大致浏览下方法, ~~~ ~~~ // NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions func NewSchedulerCommand(registryOptions ...Option) *cobra.Command { 注释写明了创建具有默认参数的cobra对象。 ~~~ 接着往下 ~~~ // runCommand runs the scheduler. func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error { 这就找到了schedulor的run函数。easy 对不对! ~~~ ~~~ // Run executes the scheduler based on the given configuration. It only returns on error or when context is done. func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error { 在给定的配置上run scheduler。(恩,一个是runscheduler,一个是在给定的配置基础上run scheduler。那你肯定会问点什么。) ~~~ ~~~ // Setup creates a completed config and a scheduler based on the command args and options func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) { if errs := opts.Validate(); len(errs) > 0 { 接着往下:Step函数基于命令行参数与完整的配置run一个schedulor,通过注释就可以知道,这三者的关系应该很清晰啦! ~~~ server这个文件里面主要的模块就是这样。不用看代码就知道, (如何能够找到Run函数,或许是通过调试运行) 仔细看run函数, ~~~ // Leader election is disabled, so runCommand inline until done. close(waitingForLeader) sched.Run(ctx) return fmt.Errorf("finished without leader elect") return的是finished without leader elect。说明scheduler已经在里面run起来了,那么在哪里呢?你找找看。 找到之后,就是下面这样,跟前面的while循环一个道理。 ~~~ ~~~ // Run begins watching and scheduling. It starts scheduling and blocked until the context is done. sched.SchedulingQueue.Run() wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() 注释就说的很清楚啦,大意就是跟那个while循环一样。这里为了便于理解:放一个更早的版本。 ~~~ ~~~ func (sched *Scheduler) scheduleOne() { pod := sched.config.NextPod() // do all the scheduler stuff for `pod` } 其他的方法暂时也看不懂,不管啦,哈哈。 可以找到scheduler的结构体定义。以及一个schelorOne。进入。 ~~~ ~~~ // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. func (sched *Scheduler) scheduleOne(ctx context.Context) 通过注释就可以看到,这是方法是处理整个调度器的核心流程啦。 ~~~ 阅读scheduler方法,碰到这行, ~~~ scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod) ~~~ 点击进去 ~~~ // ScheduleAlgorithm is an interface implemented by things that know how to schedule pods // onto machines. // TODO: Rename this type. type ScheduleAlgorithm interface { Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) // Extenders returns a slice of extender config. This is exposed for // testing. Extenders() []framework.Extender } 是一个结构,(编译器搜索不到这个接口的方法。) ~~~ 继续往下,就能看到这个接口方法的实现 ~~~ // Schedule tries to schedule the given pod to one of the nodes in the node list. // If it succeeds, it will return the name of the node. // If it fails, it will return a FitError error with reasons. func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) 调度器尝试将给定的pod调度到node list的node里面 成功则返回名字,失败则返回error信息。 ~~~ ~~~ feasibleNodes, filteredNodesStatuses, err :=g.findNodesThatFitPod(ctx, fwk, state, pod) 点击方法 // Filters the nodes to find the ones that fit the pod based on the framework // filter plugins and filter extenders. func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) 找到了有一个主要的方法,基于 framework filter plugins and filter extenders 去为pod筛选node,返回可行节点,这就是node筛选算法 ~~~ 继续往下 ~~~ priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes) 查看方法注释 // prioritizeNodes prioritizes the nodes by running the score plugins, // which return a score for each node from the call to RunScorePlugins(). // The scores from each plugin are added together to make the score for that node, then // any extenders are run as well. // All scores are finally combined (added) to get the total weighted scores of all nodes 大致翻译一下:通过score plugins为node划分优先级,然后从RunScorePlugins方法里面为每一个node返回一个评分,待仔细翻译 这就是node优先级算法。 ~~~ 我们已经找到了优先级算法,筛选算法。 (这里如何理解func (g *genericScheduler) ) 点击genericScheduler,查看到结构体,搜索哪个地方创建了这个结构体对象? ~~~ 然后就能找到 // NewGenericScheduler creates a genericScheduler object. func NewGenericScheduler( ~~~ 继续搜索,查看那个地方调用了该方法。 ~~~ algo := core.NewGenericScheduler( c.schedulerCache, c.nodeInfoSnapshot, extenders, c.percentageOfNodesToScore, ) 这段代码是在create方法中, // create a scheduler from a set of registered plugins. func (c *Configurator) create() (*Scheduler, error) { 从注册的plug中创建scheduler。 继续搜索哪些地方调用了create方法, // createFromProvider creates a scheduler from the name of a registered algorithm provider. func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) { 一个是createFromProvider,一个是createFromConfig // createFromConfig creates a scheduler from the configuration file // Only reachable when using v1alpha1 component config func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, error) { ~~~ 这里貌似还有一些地方没有找到,待补充。 前面进行了一些列的准备动作,return 前面的一行sched.RUN方法。 点进去看看。 我写这个出发点是以一个完全不依赖于实现对其有所了解的思维去写。也即是如何以及自己的思维去阅读这个东西。我认为这是怎么样能够以自己的理解去学习。 如果从一个完全陌生的角度去,我认为就是我上面这个思路,但不好意思的是明显我也参考了别人的资料,哈哈。 核心就是,把握大致的设计思路,尝试在代码中找到他。然后再去理解每一个分支。不依靠其他的资料,自己去发现与探索,enjoy it。