Customizing the Kubernetes Scheduler: Modification and Practice

Original source:
改造 Kubernetes 自定义调度器 (Modifying a Custom Kubernetes Scheduler) | Jayden's Blog (jaydenchang.top)

Overview

The default Kubernetes scheduler pays no attention to special resources such as disk or GPU when placing Pods. To address this, we set out to modify the scheduler ourselves, drawing on the official scheduling framework [1], the scheduler configuration reference [2], and related articles [3].

Environment Setup

Software versions:

  • Kubernetes: v1.19.0
  • Docker: v26.1.2
  • Prometheus: v2.49
  • Node Exporter: v1.7.0

The cluster consists of 1 master and 3 worker nodes.

Experiments

Project Overview

The project structure is as follows:


.
├── Dockerfile
├── deployment.yaml
├── go.mod
├── go.sum
├── main.go
├── pkg
│   ├── cpu
│   │   └── cputraffic.go
│   ├── disk
│   │   └── disktraffic.go
│   ├── diskspace
│   │   └── diskspacetraffic.go
│   ├── memory
│   │   └── memorytraffic.go
│   ├── network
│   │   └── networktraffic.go
│   └── prometheus.go
├── scheduler
├── scheduler.conf
└── scheduler.yaml

Plugin Implementation

The memory plugin below serves as an example of how each plugin is built.

Define the plugin name, variables, and struct


const MemoryPlugin = "MemoryTraffic"

// Compile-time check that *MemoryTraffic satisfies framework.ScorePlugin.
var _ framework.ScorePlugin = &MemoryTraffic{}
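
The MemoryTraffic struct itself does not appear in the listing; a minimal definition consistent with the New constructor and the Score method below might look like this (the field names are inferred from their usage, so treat it as a sketch):


// Sketch only: fields inferred from how New and Score use the struct.
type MemoryTraffic struct {
    handle     framework.FrameworkHandle
    prometheus *pkg.PrometheusHandle
}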

Next, implement the framework's scoring extension points; the constructor below receives a framework.FrameworkHandle from the scheduler.

First, define the plugin's initialization entry point:


func New(plArgs runtime.Object, h framework.FrameworkHandle) (framework.Plugin, error) {
    args := &MemoryTrafficArgs{}
    if err := fruntime.DecodeInto(plArgs, args); err != nil {
        return nil, err
    }

    klog.Infof("[MemoryTraffic] args received. Device: %s; TimeRange: %d, Address: %s", args.DeviceName, args.TimeRange, args.IP)
    return &MemoryTraffic{
        handle:     h,
        prometheus: pkg.NewProme(args.IP, args.DeviceName, time.Minute*time.Duration(args.TimeRange)),
    }, nil
}

Implement the Score interface; Score produces the initial, unnormalized score:


func (n *MemoryTraffic) Score(ctx context.Context, state *framework.CycleState, p *corev1.Pod, nodeName string) (int64, *framework.Status) {
    nodeBandwidth, err := n.prometheus.MemoryGetGauge(nodeName)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("error getting node bandwidth measure: %s", err))
    }
    bandWidth := int64(nodeBandwidth.Value)
    klog.Infof("[MemoryTraffic] node '%s' bandwidth: %v", nodeName, bandWidth)
    return bandWidth, nil
}

Implement NormalizeScore, which rescales the raw values from Score into the framework's valid range of [0, 100] (framework.MaxNodeScore):


func (n *MemoryTraffic) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, scores framework.NodeScoreList) *framework.Status {
    var higherScore int64
    for _, node := range scores {
        if higherScore < node.Score {
            higherScore = node.Score
        }
    }
    // Guard against dividing by zero when every raw score is 0.
    if higherScore == 0 {
        return nil
    }
    // Scale each node's raw score to [0, 100] relative to the highest one.
    // The raw metric is the memory-availability percentage, so nodes with
    // more available memory end up with higher final scores.
    for i, node := range scores {
        scores[i].Score = node.Score * 100 / higherScore
        klog.Infof("[MemoryTraffic] Nodes final score: %v", scores[i].Score)
    }

    klog.Infof("[MemoryTraffic] Nodes final score: %v", scores)
    return nil
}
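
To make the scaling concrete, here is a standalone sketch (hypothetical numbers, not part of the plugin) showing how two raw scores would be normalized:


package main

import "fmt"

// Standalone sketch of the normalization above: each raw score is scaled
// relative to the highest raw score, so the best node lands on 100.
func main() {
    raw := []int64{200, 50} // hypothetical raw scores for two nodes
    var highest int64
    for _, v := range raw {
        if v > highest {
            highest = v
        }
    }
    for i, v := range raw {
        fmt.Printf("node%d: %d\n", i+1, v*100/highest) // node1: 100, node2: 25
    }
}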

Set the plugin name and return the ScoreExtensions:


func (n *MemoryTraffic) Name() string {
    return MemoryPlugin
}

// Returning a non-nil framework.ScoreExtensions means the plugin must also
// implement that interface, i.e. provide NormalizeScore.
func (n *MemoryTraffic) ScoreExtensions() framework.ScoreExtensions {
    return n
}

Prometheus Integration

First, write the PromQL that queries the memory-availability percentage: it divides the 30-minute average of node_memory_MemAvailable_bytes by that of node_memory_MemTotal_bytes, then joins on node_uname_info to select the target node by nodename.


const memoryMeasureQueryTemplate = ` (avg_over_time(node_memory_MemAvailable_bytes[30m]) / avg_over_time(node_memory_MemTotal_bytes[30m])) * 100 * on(instance) group_left(nodename) (node_uname_info{nodename="%s"})`
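
Before wiring the template into the plugin, it can be sanity-checked against the Prometheus HTTP API; for example, for node1 (the address is the one assumed in the configuration section below):


curl -s 'http://172.19.32.140:9090/api/v1/query' \
  --data-urlencode 'query=(avg_over_time(node_memory_MemAvailable_bytes[30m]) / avg_over_time(node_memory_MemTotal_bytes[30m])) * 100 * on(instance) group_left(nodename) (node_uname_info{nodename="node1"})'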

Then declare the PrometheusHandle:


type PrometheusHandle struct {
    deviceName string
    timeRange  time.Duration
    ip         string
    client     v1.API
}

The plugin side also needs an args struct for the Prometheus query parameters; it is populated from the pluginConfig section of the scheduler configuration shown below:


type MemoryTrafficArgs struct {
    IP         string `json:"ip"`
    DeviceName string `json:"deviceName"`
    TimeRange  int    `json:"timeRange"`
}

Write the constructor that initializes the Prometheus handle:


func NewProme(ip, deviceName string, timeRange time.Duration) *PrometheusHandle {
    client, err := api.NewClient(api.Config{Address: ip})
    if err != nil {
        klog.Fatalf("[Prometheus Plugin] FatalError creating prometheus client: %s", err.Error())
    }
    return &PrometheusHandle{
        deviceName: deviceName,
        ip:         ip,
        timeRange:  timeRange,
        client:     v1.NewAPI(client),
    }
}

Write a generic query method that the other resource plugins can reuse:


func (p *PrometheusHandle) query(promQL string) (model.Value, error) {
    results, warnings, err := p.client.Query(context.Background(), promQL, time.Now())
    if len(warnings) > 0 {
        klog.Warningf("[Prometheus Query Plugin] Warnings: %v\n", warnings)
    }

    return results, err
}
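
Note that the query runs with context.Background() and therefore no deadline; since it sits on the scheduling hot path, a bounded variant may be worth considering. A sketch, not part of the original code:


func (p *PrometheusHandle) queryWithTimeout(promQL string, timeout time.Duration) (model.Value, error) {
    // Bound the Prometheus round-trip so a slow query cannot stall scheduling.
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    results, warnings, err := p.client.Query(ctx, promQL, time.Now())
    if len(warnings) > 0 {
        klog.Warningf("[Prometheus Query Plugin] Warnings: %v\n", warnings)
    }
    return results, err
}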

And the accessor that returns the memory-availability sample for a node:


func (p *PrometheusHandle) MemoryGetGauge(node string) (*model.Sample, error) {
    promQL := fmt.Sprintf(memoryMeasureQueryTemplate, node)
    klog.V(6).Infof("[MemoryTraffic Plugin] promQL: %s", promQL)
    value, err := p.query(promQL)
    if err != nil {
        return nil, fmt.Errorf("[MemoryTraffic Plugin] Error querying prometheus: %w", err)
    }

    nodeMeasure, ok := value.(model.Vector)
    if !ok {
        return nil, fmt.Errorf("[MemoryTraffic Plugin] Unexpected query result type %T", value)
    }
    if len(nodeMeasure) != 1 {
        return nil, fmt.Errorf("[MemoryTraffic Plugin] Invalid response, expected 1 value, got %d", len(nodeMeasure))
    }
    return nodeMeasure[0], nil
}

Finally, register the plugins in the program entry point and run the scheduler:


func main() {
    rand.Seed(time.Now().UnixNano())
    command := app.NewSchedulerCommand(
        app.WithPlugin(network.NetworkPlugin, network.New),
        app.WithPlugin(disk.DiskPlugin, disk.New),
        app.WithPlugin(diskspace.DiskSpacePlugin, diskspace.New),
        app.WithPlugin(cpu.CPUPlugin, cpu.New),
        app.WithPlugin(memory.MemoryPlugin, memory.New),
    )
    // To register a single out-of-tree plugin:
    // command := app.NewSchedulerCommand(
    //     app.WithPlugin("example-plugin1", ExamplePlugin1.New))

    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}
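
The scheduler binary shown in the project tree can then be produced with a standard Go build (the exact invocation is an assumption; any equivalent build works):


go build -o scheduler .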

Configuration

For easier observation we run the scheduler as a plain binary, so first prepare its runtime configuration file:


apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
clientConnection:
  kubeconfig: /etc/kubernetes/scheduler.conf
profiles:
- schedulerName: custom-scheduler
  plugins:
    score:
      enabled:
      - name: "CPUTraffic"
        weight: 3
      - name: "MemoryTraffic"
        weight: 4
      - name: "DiskSpaceTraffic"
        weight: 3
      - name: "NetworkTraffic"
        weight: 2
      disabled:
      - name: "*"
  pluginConfig:
  - name: "NetworkTraffic"
    args:
      ip: "http://172.19.32.140:9090"
      deviceName: "eth0"
      timeRange: 60
  - name: "CPUTraffic"
    args:
      ip: "http://172.19.32.140:9090"
      deviceName: "eth0"
      timeRange: 0
  - name: "MemoryTraffic"
    args:
      ip: "http://172.19.32.140:9090"
      deviceName: "eth0"
      timeRange: 0
  - name: "DiskSpaceTraffic"
    args:
      ip: "http://172.19.32.140:9090"
      deviceName: "eth0"
      timeRange: 0

The kubeconfig entry points to the master node's scheduler.conf (adjust to your actual path), which contains the cluster's certificate data. The ip is the address of the node where Prometheus is deployed, and the port is the one Prometheus exposes in its configuration.

Place the binary and scheduler.yaml in the same directory on the master and run:


./scheduler --logtostderr=true \
  --address=127.0.0.1 \
  --v=6 \
  --config=`pwd`/scheduler.yaml \
  --kubeconfig="/etc/kubernetes/scheduler.conf"

Verifying the Results

Prepare a Deployment whose Pods request the custom scheduler by name:


apiVersion: apps/v1
kind: Deployment
metadata:
  name: gin
  namespace: default
  labels:
    app: gin
spec:
  replicas: 2
  selector:
    matchLabels:
      app: gin
  template:
    metadata:
      labels:
        app: gin
    spec:
      schedulerName: custom-scheduler # must match the schedulerName in the scheduler profile
      containers:
      - name: gin
        image: jaydenchang/k8s_test:latest
        imagePullPolicy: Always
        command: ["./app"]
        ports:
        - containerPort: 9999
          protocol: TCP
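
Apply the manifest and check where the Pods land (standard kubectl commands; the file name deployment.yaml matches the project tree):


kubectl apply -f deployment.yaml
kubectl get pods -o wide   # shows which node each Pod was bound to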

Finally, inspect the scheduler logs; a partial excerpt follows:


I0808 17:32:35.138289 27131 memorytraffic.go:83] [MemoryTraffic] node 'node1' bandwidth: %!s(int64=2680340)
I0808 17:32:35.138763 27131 memorytraffic.go:70] [MemoryTraffic] Nodes final score: [{node1 2680340} {node2 0}]
I0808 17:32:35.138851 27131 memorytraffic.go:70] [MemoryTraffic] Nodes final score: [{node1 71} {node2 0}]
I0808 17:32:35.138911 27131 memorytraffic.go:73] [MemoryTraffic] Nodes final score: [{node1 71} {node2 0}]
I0808 17:32:35.139565 27131 default_binder.go:51] Attempting to bind default/go-deployment-66878c4885-b4b7k to node1
I0808 17:32:35.141114 27131 eventhandlers.go:225] add event for scheduled pod default/go-deployment-66878c4885-b4b7k
I0808 17:32:35.141714 27131 eventhandlers.go:205] delete event for unscheduled pod default/go-deployment-66878c4885-b4b7k
I0808 17:32:35.143504 27131 scheduler.go:609] "Successfully bound pod to node" pod="default/go-deployment-66878c4885-b4b7k" node="node1" evaluatedNodes=2 feasibleNodes=2

References

  1. Scheduling Framework | Kubernetes

  2. Scheduler Configuration | Kubernetes

  3. 基于Prometheus的Kubernetes网络调度器 (A Prometheus-based Kubernetes network scheduler) | Cylon's Collection (oomkill.com)
