ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
# Sidekiq Style Guide > 原文:[https://docs.gitlab.com/ee/development/sidekiq_style_guide.html](https://docs.gitlab.com/ee/development/sidekiq_style_guide.html) * [ApplicationWorker](#applicationworker) * [Dedicated Queues](#dedicated-queues) * [Queue Namespaces](#queue-namespaces) * [Idempotent Jobs](#idempotent-jobs) * [Ensuring a worker is idempotent](#ensuring-a-worker-is-idempotent) * [Declaring a worker as idempotent](#declaring-a-worker-as-idempotent) * [Deduplication](#deduplication) * [Job urgency](#job-urgency) * [Latency sensitive jobs](#latency-sensitive-jobs) * [Changing a queue’s urgency](#changing-a-queues-urgency) * [Jobs with External Dependencies](#jobs-with-external-dependencies) * [CPU-bound and Memory-bound Workers](#cpu-bound-and-memory-bound-workers) * [Declaring a Job as CPU-bound](#declaring-a-job-as-cpu-bound) * [Determining whether a worker is CPU-bound](#determining-whether-a-worker-is-cpu-bound) * [Feature category](#feature-category) * [Job weights](#job-weights) * [Worker context](#worker-context) * [Cron workers](#cron-workers) * [Jobs scheduled in bulk](#jobs-scheduled-in-bulk) * [Arguments logging](#arguments-logging) * [Tests](#tests) * [Sidekiq Compatibility across Updates](#sidekiq-compatibility-across-updates) * [Changing the arguments for a worker](#changing-the-arguments-for-a-worker) * [Remove an argument](#remove-an-argument) * [Add an argument](#add-an-argument) * [Multi-step deployment](#multi-step-deployment) * [Parameter hash](#parameter-hash) * [Removing workers](#removing-workers) * [Renaming queues](#renaming-queues) # Sidekiq Style Guide[](#sidekiq-style-guide "Permalink") 本文档概述了添加或修改 Sidekiq 工作程序时应遵循的各种准则. ## ApplicationWorker[](#applicationworker "Permalink") All workers should include `ApplicationWorker` instead of `Sidekiq::Worker`, which adds some convenience methods and automatically sets the queue based on the worker’s name. ## Dedicated Queues[](#dedicated-queues "Permalink") 所有工作程序都应使用自己的队列,该队列将根据工作程序类名称自动设置. 对于名为`ProcessSomethingWorker`的工作程序,队列名称将为`process_something` . 如果不确定工人使用什么队列,可以使用`SomeWorker.queue`找到它. 几乎没有理由使用`sidekiq_options queue: :some_queue`手动覆盖队列名称. 添加新队列后,运行`bin/rake gitlab:sidekiq:all_queues_yml:generate`来重新生成`app/workers/all_queues.yml`或`ee/app/workers/all_queues.yml`以便可以由[`sidekiq-cluster`](../administration/operations/extra_sidekiq_processes.html)拾取. ## Queue Namespaces[](#queue-namespaces "Permalink") 虽然不同的工作人员无法共享队列,但是他们可以共享队列名称空间. 为工作程序定义队列名称空间可以启动 Sidekiq 进程,该进程自动为该工作空间中的所有工作程序处理作业,而无需显式列出其所有队列名称. 例如,如果由`sidekiq-cron`管理的所有工作人员都使用`cronjob`队列名称空间,那么我们可以专门针对此类计划的作业启动 Sidekiq 进程. 如果稍后添加使用`cronjob`命名空间的新工作程序,则 Sidekiq 进程也将自动为该工作程序选择作业(重新启动后),而无需更改任何配置. 可以使用`queue_namespace` DSL 类方法设置队列名称空间: ``` class SomeScheduledTaskWorker include ApplicationWorker queue_namespace :cronjob # ... end ``` 在后台,这会将`SomeScheduledTaskWorker.queue`设置为`cronjob:some_scheduled_task` . 常用的名称空间将具有自己的关注模块,可以轻松地将其包含在 worker 类中,并且可以设置队列名称空间以外的其他 Sidekiq 选项. 例如, `CronjobQueue`设置名称空间,但也禁用重试. `bundle exec sidekiq`是可感知名称空间的,当提供名称空间而不是`--queue` ( `-q` )选项中的简单队列名称时,它将自动侦听名称空间中的所有队列(技术上:所有以名称空间名称为前缀的队列) ,或`config/sidekiq_queues.yml`中的`:queues:`部分. 请注意,应谨慎执行将工作程序添加到现有命名空间的操作,因为如果没有适当调整可用于处理命名空间的 Sidekiq 进程可用的资源,则额外的作业将占用已经存在的工作程序的资源. ## Idempotent Jobs[](#idempotent-jobs "Permalink") 众所周知,一项作业可能由于多种原因而失败. 例如,网络中断或错误. 为了解决此问题,Sidekiq 具有内置的重试机制,GitLab 中的大多数工作人员默认使用该机制. 期望作业在失败后可以再次运行,而不会给应用程序或用户带来重大副作用,这就是 Sidekiq 鼓励作业具有[幂等性和事务性的原因](https://github.com/mperham/sidekiq/wiki/Best-Practices#2-make-your-job-idempotent-and-transactional) . 通常,在以下情况下,可以将工人视为等幂的: * 它可以使用相同的参数安全地运行多次. * 预期应用程序副作用仅发生一次(或第二次运行的副作用无效). 一个很好的例子是缓存过期工作器. **注意:**如果队列中已经存在具有相同参数的未启动作业,则为等幂工作器调度的作业将自动进行[重复数据删除](#deduplication) . ### Ensuring a worker is idempotent[](#ensuring-a-worker-is-idempotent "Permalink") 确保使用以下共享示例通过工作程序测试: ``` include_examples 'an idempotent worker' do it 'marks the MR as merged' do # Using subject inside this block will process the job multiple times subject expect(merge_request.state).to eq('merged') end end ``` 直接使用`perform_multiple`方法而不是`job.perform` (此辅助方法将自动包含在 worker 中). ### Declaring a worker as idempotent[](#declaring-a-worker-as-idempotent "Permalink") ``` class IdempotentWorker include ApplicationWorker # Declares a worker is idempotent and can # safely run multiple times. idempotent! # ... end ``` 鼓励只具有`idempotent!` 即使在另一个类或模块中定义了`perform`方法,也要在最顶层的 worker 类中调用. **注意:**如果工人阶级没有被标记为幂等,那么警察将失败. 如果您不确定自己的工作可以安全地多次运行,请考虑跳过警察. ### Deduplication[](#deduplication "Permalink") 当队列中有另一个幂函数的作业入队而另一个未启动的作业时,GitLab 会删除第二个作业. 之所以跳过该工作,是因为首先安排的工作将完成相同的工作; 到第二个作业执行时,第一个作业什么也做不了. 例如, `AuthorizedProjectsWorker`需要一个用户 ID. 当工作程序运行时,它将重新计算用户的授权. 每当操作有可能更改用户的授权时,GitLab 都会计划此作业. 如果将同一用户同时添加到两个项目,则如果第一个作业尚未开始,则可以跳过第二个作业,因为当第一个作业运行时,它将为两个项目创建授权. GitLab 不会跳过将来计划的作业,因为我们假设在计划执行作业时状态将已更改. [已经提出了](https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/195)更多的[重复数据删除策略](https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/195) . 如果您正在部署的员工可能会从其他策略中受益,请在问题中发表评论. 如果自动重复数据删除会导致某些队列出现问题. 可以通过启用名为`disable_<queue name>_deduplication`的功能标志来暂时禁用此功能. 例如,要禁用`AuthorizedProjectsWorker`重复数据删除,我们将启用功能标记`disable_authorized_projects_deduplication` . 从 ChatOps: ``` /chatops run feature set disable_authorized_projects_deduplication true ``` 从 rails 控制台: ``` Feature.enable!(:disable_authorized_projects_deduplication) ``` ## Job urgency[](#job-urgency "Permalink") 作业可以设置一个`urgency`属性,可以是`:high` , `:low`或`:throttled` . 这些目标如下: | **Urgency** | **队列调度目标** | **执行延迟要求** | | --- | --- | --- | | `:high` | 10 秒 | 1 秒的 p50、10 秒的 p99 | | `:low` | 1 分钟 | 最长运行时间为 5 分钟 | | `:throttled` | None | 最长运行时间为 5 分钟 | 要设置作业的紧急程度,请使用`urgency`类方法: ``` class HighUrgencyWorker include ApplicationWorker urgency :high # ... end ``` ### Latency sensitive jobs[](#latency-sensitive-jobs "Permalink") 如果立即安排大量后台作业,则在作业等待辅助节点可用时,可能会出现作业排队. 这是正常现象,它可以通过系统地处理流量高峰来赋予系统弹性. 但是,某些作业比其他作业对延迟更敏感. 这些工作的示例包括: 1. 在推送到分支之后更新合并请求的作业. 2. 在推送到分支之后,该任务会使项目的已知分支的缓存无效. 3. 更改权限后,用户可以看到重新计算组和项目的作业. 4. 在状态更改为管道中的作业之后更新 CI 管道状态的作业. 当这些作业被延迟时,用户可能会将延迟视为错误:例如,他们可以推送分支,然后尝试为该分支创建合并请求,但在 UI 中被告知该分支不存在. 我们认为这些工作很`urgency :high` 做出额外的努力以确保这些作业在计划后的很短时间内启动. 但是,为了确保吞吐量,这些作业还具有非常严格的执行持续时间要求: 1. 中位作业执行时间应少于 1 秒. 2. 99%的工作应在 10 秒内完成. 如果一个工作人员不能满足这些期望,那么就不能将其视为`urgency :high`工作人员:考虑重新设计该工作人员,或在两个不同的工作人员之间拆分工作,其中一个工作`urgency :high`执行快速的`urgency :high`代码,另一个工作`urgency :low` ,它没有执行延迟要求(但也有较低的调度目标). ### Changing a queue’s urgency[](#changing-a-queues-urgency "Permalink") 在 GitLab.com,我们几个跑 Sidekiq [碎片](https://dashboards.gitlab.net/d/sidekiq-shard-detail/sidekiq-shard-detail) ,其中每一个代表一个特定类型的工作负载. 更改队列的紧急性或添加新队列时,我们需要考虑新分片上的预期工作量. 请注意,如果我们要更改现有队列,那么也会对旧分片产生影响,但这始终会减少工作量. 为此,我们要计算新分片的总执行时间和 RPS(吞吐量)的预期增长. 我们可以从以下获得这些值: * " [队列详细信息"仪表板](https://dashboards.gitlab.net/d/sidekiq-queue-detail/sidekiq-queue-detail)具有队列本身的值. 对于新队列,我们​​可以查找具有类似模式或在类似情况下安排的队列. * [碎片详细信息仪表板](https://dashboards.gitlab.net/d/sidekiq-shard-detail/sidekiq-shard-detail)具有总执行时间和吞吐量(RPS). "分片利用率"面板将显示该分片当前是否有多余的容量. 然后,我们可以计算我们要更改的队列的 RPS *平均运行时间(针对新作业的估算值),以查看新分片期望的 RPS 和执行时间的相对增加: ``` new_queue_consumption = queue_rps * queue_duration_avg shard_consumption = shard_rps * shard_duration_avg (new_queue_consumption / shard_consumption) * 100 ``` 如果我们预期增加**幅度小于 5%** ,则无需采取进一步措施. 否则,请对合并请求 ping `@gitlab-org/scalability`并要求进行审查. ## Jobs with External Dependencies[](#jobs-with-external-dependencies "Permalink") GitLab 应用程序中的大多数后台作业都与其他 GitLab 服务进行通信. 例如,PostgreSQL,Redis,Gitaly 和对象存储. 这些被视为作业的"内部"依赖性. 但是,某些作业将依赖于外部服务才能成功完成. 一些示例包括: 1. 调用用户配置的 Web 钩子的作业. 2. 将应用程序部署到用户配置的 k8s 集群的作业. 这些作业具有"外部依赖性". 这对于后台处理群集的运行有多种重要的作用: 1. 大多数外部依赖项(例如 Web 钩子)都不提供 SLO,因此我们不能保证这些作业的执行延迟. 由于我们无法保证执行延迟,因此无法确保吞吐量,因此,在高流量环境中,我们需要确保将具有外部依赖关系的作业与高紧急性作业分开,以确保这些队列上的吞吐量. 2. Errors in jobs with external dependencies have higher alerting thresholds as there is a likelihood that the cause of the error is external. ``` class ExternalDependencyWorker include ApplicationWorker # Declares that this worker depends on # third-party, external services in order # to complete successfully worker_has_external_dependencies! # ... end ``` **注意:**请注意,一项工作既不能具有很高的紧迫性,又不能具有外部依赖性. ## CPU-bound and Memory-bound Workers[](#cpu-bound-and-memory-bound-workers "Permalink") 受 CPU 或内存资源限制约束的工作程序应使用`worker_resource_boundary`方法进行注释. 大多数工作人员倾向于将大部分时间都花在阻止时间上,等待来自 Redis,PostgreSQL 和 Gitaly 等其他服务的网络响应. 由于 Sidekiq 是多线程环境,因此可以高并发地调度这些作业. 但是,有些工人在 Ruby 中花费大量时间*在 CPU*运行逻辑上. Ruby MRI 不支持真正的多线程-它依赖[GIL](https://thoughtbot.com/blog/untangling-ruby-threads#the-global-interpreter-lock)来极大简化应用程序开发,无论托管该进程的计算机有多少核,一次仅允许一个进程中的一部分 Ruby 代码运行一次. 对于受 IO 约束的工作人员,这不是问题,因为大多数线程在基础库(位于 GIL 之外)中被阻止. 如果许多线程试图同时运行 Ruby 代码,则将导致 GIL 争用,这将减慢所有进程的速度. 在高流量的环境中,知道一个工作人员受 CPU 限制,可以使我们在具有较低并发性的其他队列中运行它. 这样可以确保最佳性能. 同样,如果工作人员使用大量内存,则可以在定制的低并发,高内存队列上运行这些内存. 请注意,受内存限制的工作程序会创建大量的 GC 工作负载,暂停时间为 10-50ms. 这将对工作人员的延迟要求产生影响. 因此, `memory`限制, `urgency :high`作业是不允许的,并且将使 CI 失败. 通常,不鼓励受`memory`限制的工作人员,应考虑处理工作的替代方法. 如果工作程序需要大量的内存和 CPU 时间,则由于上述对高紧急性的内存绑定工作程序的限制,应将其标记为内存绑定. ## Declaring a Job as CPU-bound[](#declaring-a-job-as-cpu-bound "Permalink") 本示例说明如何将作业声明为受 CPU 约束. ``` class CPUIntensiveWorker include ApplicationWorker # Declares that this worker will perform a lot of # calculations on-CPU. worker_resource_boundary :cpu # ... end ``` ## Determining whether a worker is CPU-bound[](#determining-whether-a-worker-is-cpu-bound "Permalink") 我们使用以下方法来确定工作程序是否受 CPU 限制: * 在 Sidekiq 结构化 JSON 日志中,汇总工作`duration`和`cpu_s`字段. * `duration` refers to the total job execution duration, in seconds * `cpu_s`是从[`Process::CLOCK_THREAD_CPUTIME_ID`](https://www.rubydoc.info/stdlib/core/Process:clock_gettime)计数器派生的,它是作业在 CPU 上花费的时间的度量. * 将`cpu_s`除以`duration`即可得到在 CPU 上花费的`duration`百分比. * 如果该比例超过 33%,则认为该工作线程受 CPU 限制,因此应进行注释. * 请注意,这些值不应用于较小的样本量,而应用于相当大的汇总. ## Feature category[](#feature-category "Permalink") 所有 Sidekiq 工作人员都必须定义一个已知的[特征类别](feature_categorization/index.html#sidekiq-workers) . ## Job weights[](#job-weights "Permalink") 某些作业的重量已声明. 仅在默认执行模式下运行 Sidekiq 时才使用此选项-使用[`sidekiq-cluster`](../administration/operations/extra_sidekiq_processes.html)不能计算权重. 随着我们[朝在 Core 中使用`sidekiq-cluster`迈进](https://gitlab.com/gitlab-org/gitlab/-/issues/34396) ,新增加的工作人员无需指定权重. 他们可以简单地使用默认权重 1. ## Worker context[](#worker-context "Permalink") 版本历史 * 在 GitLab 12.8 中[引入](https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/9) . 为了在日志中获得有关工作程序的更多信息,我们[以`ApplicationContext`的形式向工作](logging.html#logging-context-metadata-through-rails-or-grape-requests)添加[元数据](logging.html#logging-context-metadata-through-rails-or-grape-requests) . 在大多数情况下,从请求计划作业时,该上下文已经从请求中扣除并添加到计划的作业中. 运行作业时,将还原计划时处于活动状态的上下文. 这会使上下文传播到正在运行的作业中计划的任何作业. 所有这些意味着在大多数情况下,要将上下文添加到作业中,我们无需执行任何操作. 但是,在某些情况下,计划作业时将不存在任何上下文,或者存在的上下文很可能不正确. 对于这些实例,我们添加了 Rubocop 规则以引起注意并避免日志中的元数据不正确. 与大多数警察一样,有完全正当的理由禁用它们. 在这种情况下,可能来自请求的上下文是正确的. 或者,您可能已经以警察无法接受的方式指定了上下文. 无论如何,请在禁用警察时留下指向将使用哪个上下文的代码注释. 当确实为上下文提供对象时,请确保已预先加载名称空间和项目的路由. 这可以通过使用来完成`.with_route`上所有定义范围`Routable`秒. ### Cron workers[](#cron-workers "Permalink") 对于 Cronjob 队列( `include CronjobQueue` )中的工作人员,将自动清除上下文,即使从请求中安排工作人员时也是如此. 我们这样做是为了避免从 cron worker 安排其他作业时出现不正确的元数据. Cron 工作人员自己在实例范围内运行,因此它们的作用域不限于应添加到上下文中的用户,名称空间,项目或其他资源. 然而,他们往往安排*确实*需要方面的其他工作. 这就是为什么需要在工作人员中某处显示上下文的原因. 可以通过在工作器内的某些位置使用以下方法之一来完成此操作: 1. 在`with_context`帮助器中包装用于调度作业的代码: ``` def perform deletion_cutoff = Gitlab::CurrentSettings .deletion_adjourned_period.days.ago.to_date projects = Project.with_route.with_namespace .aimed_for_deletion(deletion_cutoff) projects.find_each(batch_size: 100).with_index do |project, index| delay = index * INTERVAL with_context(project: project) do AdjournedProjectDeletionWorker.perform_in(delay, project.id) end end end ``` 2. 使用提供上下文的批处理调度方法: ``` def schedule_projects_in_batch(projects) ProjectImportScheduleWorker.bulk_perform_async_with_contexts( projects, arguments_proc: -> (project) { project.id }, context_proc: -> (project) { { project: project } } ) end ``` 或者,在延迟调度时: ``` diffs.each_batch(of: BATCH_SIZE) do |diffs, index| DeleteDiffFilesWorker .bulk_perform_in_with_contexts(index * 5.minutes, diffs, arguments_proc: -> (diff) { diff.id }, context_proc: -> (diff) { { project: diff.merge_request.target_project } }) end ``` ### Jobs scheduled in bulk[](#jobs-scheduled-in-bulk "Permalink") 通常,在批量调度作业时,这些作业应具有单独的上下文而不是总体上下文. 如果是这样的话, `bulk_perform_async`可以通过更换`bulk_perform_async_with_context`帮手,而不是`bulk_perform_in`使用`bulk_perform_in_with_context` . 例如: ``` ProjectImportScheduleWorker.bulk_perform_async_with_contexts( projects, arguments_proc: -> (project) { project.id }, context_proc: -> (project) { { project: project } } ) ``` 第一个参数中可枚举的每个对象分为两个块: * `arguments_proc` ,它需要返回作业需要调度的参数列表. * 需要返回带有作业上下文信息的哈希值的`context_proc` . ## Arguments logging[](#arguments-logging "Permalink") 当[`SIDEKIQ_LOG_ARGUMENTS`](../administration/troubleshooting/sidekiq.html#log-arguments-to-sidekiq-jobs)启用,Sidekiq 作业参数将被记录. 默认情况下,记录的唯一参数是数字参数,因为其他类型的参数可能包含敏感信息. 要覆盖此参数,请在工作程序内部使用`loggable_arguments`并记录要记录的参数的索引. (此处不需要指定数字参数.) 例如: ``` class MyWorker include ApplicationWorker loggable_arguments 1, 3 # object_id will be logged as it's numeric # string_a will be logged due to the loggable_arguments call # string_b will be filtered from logs # string_c will be logged due to the loggable_arguments call def perform(object_id, string_a, string_b, string_c) end end ``` ## Tests[](#tests "Permalink") 与其他任何类一样,每个 Sidekiq 工作者都必须使用 RSpec 进行测试. 这些测试应放在`spec/workers` . ## Sidekiq Compatibility across Updates[](#sidekiq-compatibility-across-updates "Permalink") 请记住,Sidekiq 作业的参数在计划执行时存储在队列中. 在线更新期间,这可能会导致几种可能的情况: 1. 该应用程序的较旧版本发布作业,该作业由升级的 Sidekiq 节点执行. 2. 作业在升级之前排队,但在升级之后执行. 3. 作业由运行较新版本应用程序的节点排队,但在运行较旧版本应用程序的节点上执行. ### Changing the arguments for a worker[](#changing-the-arguments-for-a-worker "Permalink") 作业需要在应用程序的连续版本之间向后和向前兼容. 在所有 Rails 和 Sidekiq 节点都具有更新的代码之前,添加或删除参数可能会在部署期间引起问题. #### Remove an argument[](#remove-an-argument "Permalink") **不要从`perform`函数中删除参数.** . 而是,使用以下方法: 1. 提供默认值(通常为`nil` )并使用注释将参数标记为已弃用 2. 停止在`perform_async`使用该参数. 3. 忽略 worker 类中的值,但是直到下一个主要版本才将其删除. 在以下示例中,如果要删除`arg2` ,请首先设置`nil`默认值,然后更新调用`ExampleWorker.perform_async`位置. ``` class ExampleWorker def perform(object_id, arg1, arg2 = nil) # ... end end ``` #### Add an argument[](#add-an-argument "Permalink") 有两种方法可以安全地向 Sidekiq 工作者添加新参数: 1. Set up a [multi-step deployment](#multi-step-deployment) in which the new argument is first added to the worker 2. 将[参数哈希](#parameter-hash)用于其他参数. 这也许是最灵活的选择. ##### Multi-step deployment[](#multi-step-deployment "Permalink") 这种方法需要多个合并请求,并且在合并其他更改之前,要合并和部署第一个合并请求. 1. 在初始合并请求中,使用默认值将参数添加到 worker 中: ``` class ExampleWorker def perform(object_id, new_arg = nil) # ... end end ``` 2. 使用新参数合并和部署工作程序. 3. 在另一个合并请求中,更新`ExampleWorker.perform_async`调用以使用新参数. ##### Parameter hash[](#parameter-hash "Permalink") 如果现有工作人员已经利用参数哈希,则此方法将不需要多次部署. 1. 在 worker 中使用参数散列以实现将来的灵活性: ``` class ExampleWorker def perform(object_id, params = {}) # ... end end ``` ### Removing workers[](#removing-workers "Permalink") 尽量避免在次要版本和修补程序版本中删除工作人员及其队列. 在联机更新期间,实例可能有待处理的作业,而删除队列可能导致这些作业永远卡住. 如果您无法为这些 Sidekiq 作业编写迁移,请考虑仅在主要版本中删除该工作程序. ### Renaming queues[](#renaming-queues "Permalink") 出于同样的原因,遣散工人也很危险,因此在重命名队列时应格外小心. 重命名队列时,请使用`sidekiq_queue_migrate`帮助程序迁移方法,如本示例所示: ``` class MigrateTheRenamedSidekiqQueue < ActiveRecord::Migration[5.0] include Gitlab::Database::MigrationHelpers DOWNTIME = false def up sidekiq_queue_migrate 'old_queue_name', to: 'new_queue_name' end def down sidekiq_queue_migrate 'new_queue_name', to: 'old_queue_name' end end ```