如何实现一个基于 Volcano 的作业同步控制器

最近通过 Antigravity + Claude Opus 很快设计 + 搭建了一个套壳基于 Volcano 的 k8s 调度器。为了防止抽象泄露,学习记录一下服务中用到的一些关键概念和实现。

本文信息源大部分来自(尽可能贵的) AI,文字输入经过本人神经元后重新输出产生。

Controller

What is Controller

在 K8s 中,用户通过声明式 API 定义资源的“预期状态”,Controller 则负责监视资源的实际状态,当资源的实际状态和“预期状态”不一致时,Controller 则对系统进行必要的更改,以确保两者一致,这个过程被称之为调谐(Reconcile)。

Controller 的核心是一个无限循环,不断执行:

  1. 观察:获取当前状态
  2. 对比:对比期望状态和当前状态
  3. 行动:执行操作消除差异

这也意味着我们需要实现的作业同步服务本质上也是一个 Controller:

  1. 观察:Informer 监听 VcJob 变更
  2. 对比:业务逻辑判断(是否需要入库/处理作业资源)
  3. 行动:更新 MySQL + 归档日志 + 清理资源

How to Reconcile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (c *Controller) syncJob(key string) error {
// 1. 获取当前状态
job, err := c.vcClient.BatchV1alpha1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})

// 2. 读取状态
phase := string(job.Status.State.Phase)

// 3. 同步到 MySQL (期望状态:数据库和 K8s 状态一致)
c.repo.UpdateStatus(ctx, name, namespace, phase)

// 4. 终态处理
if isTerminalPhase(phase) {
c.archiveLogs(ctx, job) // 归档日志
c.removeFinalizer(ctx, job) // 移除 Finalizer
c.vcClient.Jobs(...).Delete() // 清理资源
}
}
  • Finalizer 是 k8s 中标记删除资源的一种方式,设计中用来给作业添加“锁”,保证作业同步数据不丢失。

Informer

What is Informer

Informer 是 Kubernetes client-go 库提供的一种高效监听 K8s 资源变更的机制。它的核心优势是:

特性 说明
本地缓存 将 K8s 资源缓存到本地内存,读操作直接访问缓存
Watch 机制 使用长连接监听资源变更,增量更新而非轮询
事件驱动 资源变更时触发回调函数,而非主动查询
自动重连 连接断开后自动 re-list + re-watch

How to Implement a (Volcano) Informer

  1. 初始化 Informer
1
2
factory := vcinformers.NewSharedInformerFactory(vcClient, time.Minute*10)
jobInformer := factory.Batch().V1alpha1().Jobs().Informer()
  • 使用 Volcano 的 Informer Factory 监听 Volcano Job (VcJob) 资源
  • K8s 在 client go 中基于 Informer 之上再做了一层封装,提供了 SharedInformer 机制。采用 SharedInformer 后,客户端对同一种资源对象只会有一个对 API Server 的 ListAndWatch 调用,多个 Informer 也会共用同一份缓存,减少了对 API Server 的请求,提高了性能。
  • Resync Period = 10 分钟:即使没有事件,也会每 10 分钟强制执行一次全量同步
  1. 注册事件处理器
1
2
3
4
5
jobInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleAdd, // Job 新建时触发
UpdateFunc: c.handleUpdate, // Job 更新时触发
DeleteFunc: c.handleDelete, // Job 删除时触发
})

Work Queue

What is Work Queue

Work Queue(工作队列)是 Kubernetes client-go 库中用于解耦事件生产者和消费者的核心组件,在 Controller 模式中扮演关键角色。

client-go 提供了三种队列:

  • 基础队列 (workqueue.Interface):去重、并发安全、支持多 Worker消费。
  • 延迟队列 (DelayingInterface):继承基础队列所有功能,支持延迟入队。
  • 限速队列 (RateLimitingInterface) :继承延迟队列所有功能,失败重试时按指数退避。

Why Work Queue

直接在 Informer 事件回调中处理业务逻辑会有问题:

  • 阻塞 Informer:事件处理慢会阻塞后续事件分发
  • 重复处理:短时间内同一资源多次变更,每次都要处理
  • 失败无重试:处理失败后没有重试机制
  • 无速率控制:大量事件涌入可能压垮系统

Work Queue 使 Informer 的事件回调操作只负责入队,后续在 Worker Goroutine 中实际处理逻辑。

How to Use Work Queue

DefaultControllerRateLimiter() 组合了两种策略:

1
2
3
4
5
6
7
// Source: k8s.io/client-go/util/workqueue/default_rate_limiters.go
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
  • 指数退避:失败后等待时间翻倍
  • 令牌桶:全局每秒最多 10 个,突发最多 100 个

入队逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
func (c *Controller) handleAdd(obj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(obj) // 生成 "namespace/name"
c.workqueue.Add(key) // 入队
}
func (c *Controller) handleUpdate(oldObj, newObj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(newObj)
c.workqueue.Add(key)
}
func (c *Controller) handleDelete(obj interface{}) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
c.workqueue.Add(key)
}
  • 使用 namespace/name 作为 key(默认策略)

消费逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *Controller) processNextWorkItem() bool {
// 1. 阻塞获取 item
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}

// 2. 标记处理完成(允许相同 key 再次入队)
defer c.workqueue.Done(obj)

key, ok := obj.(string)
if !ok {
c.workqueue.Forget(obj) // 无效 key,丢弃
return true
}

// 3. 实际处理逻辑
if err := c.syncJob(key); err != nil {
c.logger.Errorf("Error syncing job %s: %v", key, err)
c.workqueue.AddRateLimited(obj) // ❌ 失败:限速重试
return true
}

c.workqueue.Forget(obj) // ✅ 成功:清除重试计数
return true
}

Worker 注册:

1
wait.Until(c.runWorker, time.Second, ctx.Done())

Worker Queue 的设计是并发安全的,所以也可以启动多个:

1
2
3
for i := 0; i < 4; i++ {
go wait.Until(c.runWorker, time.Second, ctx.Done())
}