最近通过 Antigravity + Claude Opus 很快设计 + 搭建了一个套壳基于 Volcano 的 k8s 调度器。为了防止抽象泄露,学习记录一下服务中用到的一些关键概念和实现。
本文信息源大部分来自(尽可能贵的) AI,文字输入经过本人神经元后重新输出产生。
Controller
What is Controller
在 K8s 中,用户通过声明式 API 定义资源的“预期状态”,Controller 则负责监视资源的实际状态,当资源的实际状态和“预期状态”不一致时,Controller 则对系统进行必要的更改,以确保两者一致,这个过程被称之为调谐(Reconcile)。
Controller 的核心是一个无限循环,不断执行:
- 观察:获取当前状态
- 对比:对比期望状态和当前状态
- 行动:执行操作消除差异
这也意味着我们需要实现的作业同步服务本质上也是一个 Controller:
- 观察:Informer 监听 VcJob 变更
- 对比:业务逻辑判断(是否需要入库/处理作业资源)
- 行动:更新 MySQL + 归档日志 + 清理资源
How to Reconcile
1 | func (c *Controller) syncJob(key string) error { |
- Finalizer 是 k8s 中标记删除资源的一种方式,设计中用来给作业添加“锁”,保证作业同步数据不丢失。
Informer
What is Informer
Informer 是 Kubernetes client-go 库提供的一种高效监听 K8s 资源变更的机制。它的核心优势是:
| 特性 | 说明 |
|---|---|
| 本地缓存 | 将 K8s 资源缓存到本地内存,读操作直接访问缓存 |
| Watch 机制 | 使用长连接监听资源变更,增量更新而非轮询 |
| 事件驱动 | 资源变更时触发回调函数,而非主动查询 |
| 自动重连 | 连接断开后自动 re-list + re-watch |
How to Implement a (Volcano) Informer
- 初始化 Informer
1 | factory := vcinformers.NewSharedInformerFactory(vcClient, time.Minute*10) |
- 使用 Volcano 的 Informer Factory 监听 Volcano Job (VcJob) 资源
- K8s 在 client go 中基于 Informer 之上再做了一层封装,提供了 SharedInformer 机制。采用 SharedInformer 后,客户端对同一种资源对象只会有一个对 API Server 的 ListAndWatch 调用,多个 Informer 也会共用同一份缓存,减少了对 API Server 的请求,提高了性能。
- Resync Period = 10 分钟:即使没有事件,也会每 10 分钟强制执行一次全量同步
- 注册事件处理器
1 | jobInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ |
Work Queue
What is Work Queue
Work Queue(工作队列)是 Kubernetes client-go 库中用于解耦事件生产者和消费者的核心组件,在 Controller 模式中扮演关键角色。
client-go 提供了三种队列:
- 基础队列 (workqueue.Interface):去重、并发安全、支持多 Worker消费。
- 延迟队列 (DelayingInterface):继承基础队列所有功能,支持延迟入队。
- 限速队列 (RateLimitingInterface) :继承延迟队列所有功能,失败重试时按指数退避。
Why Work Queue
直接在 Informer 事件回调中处理业务逻辑会有问题:
- 阻塞 Informer:事件处理慢会阻塞后续事件分发
- 重复处理:短时间内同一资源多次变更,每次都要处理
- 失败无重试:处理失败后没有重试机制
- 无速率控制:大量事件涌入可能压垮系统
Work Queue 使 Informer 的事件回调操作只负责入队,后续在 Worker Goroutine 中实际处理逻辑。
How to Use Work Queue
DefaultControllerRateLimiter() 组合了两种策略:
1 | // Source: k8s.io/client-go/util/workqueue/default_rate_limiters.go |
- 指数退避:失败后等待时间翻倍
- 令牌桶:全局每秒最多 10 个,突发最多 100 个
入队逻辑:
1 | func (c *Controller) handleAdd(obj interface{}) { |
- 使用
namespace/name作为 key(默认策略)
消费逻辑:
1 | func (c *Controller) runWorker() { |
Worker 注册:
1 | wait.Until(c.runWorker, time.Second, ctx.Done()) |
Worker Queue 的设计是并发安全的,所以也可以启动多个:
1 | for i := 0; i < 4; i++ { |