Docker cluster notes
For global-mode services

Where DesiredState gets set to shutdown (read each chain as: function ---> its caller or triggering event):

global.go
(1) state.EventUpdateTask: if t.Status.State > api.TaskStateRunning, restartTask is triggered.
(2) removeTask ---> removeTasks ---> reconcileServices ---> state.EventUpdateService
                                                       ---> state.EventCreateService
                                ---> reconcileServicesOneNode ---> state.EventDeleteTask
                                ---> reconcileOneNode ---> state.EventUpdateNode (g.reconcileOneNode(ctx, v.Node))
               ---> removeTasksFromNode ---> EventUpdateNode
                                        ---> EventDeleteNode
                                        ---> reconcileOneNode (node.Spec.Availability == api.NodeAvailabilityDrain)
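A minimal, self-contained sketch of the dispatch pattern behind these chains, using hypothetical stand-in event types rather than SwarmKit's real state.Event* structs:

package main

import "fmt"

// Hypothetical stand-ins for SwarmKit's state.Event* types; illustration only.
type EventUpdateService struct{ ServiceID string }
type EventUpdateNode struct{ NodeID string }
type EventDeleteNode struct{ NodeID string }

// handleEvent mirrors the routing described above: each event kind is sent to
// the matching reconcile helper.
func handleEvent(event interface{}) {
	switch v := event.(type) {
	case EventUpdateService:
		fmt.Println("reconcileServices for", v.ServiceID)
	case EventUpdateNode:
		fmt.Println("reconcileOneNode for", v.NodeID)
	case EventDeleteNode:
		fmt.Println("removeTasksFromNode for", v.NodeID)
	}
}

func main() {
	handleEvent(EventUpdateService{ServiceID: "svc1"})
	handleEvent(EventUpdateNode{NodeID: "node1"})
}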
updater.go
removeOldTasks ---> useExistingTask ---> func (u *Updater) worker (different branches) ---> reconcileServices
               ---> updateTask ---> func (u *Updater) worker ---> reconcileServices
Analysis of reconcileServices:
(1) Iterate over serviceIDs; for each service:
    Find all tasks belonging to the service, regardless of state.
    Bucket the tasks into different maps depending on whether they are live (a minimal sketch of this bucketing follows step (3)):
    tasks with DesiredState <= api.TaskStateRunning go into nodeTasks[serviceID][t.NodeID];
    tasks with t.Status.State == api.TaskStateCompleted go into nodeCompleted[serviceID][t.NodeID].
(2) Iterate over serviceIDs again; for each service:
    If nodeTasks[serviceID] is empty, move on to the next service.
    Iterate over g.nodes, i.e. the nodes registered for global services:
    delete(nodeTasks[serviceID], nodeID) after handling each node, so only tasks on unregistered nodes remain afterwards;
    if nodeCompleted[serviceID][nodeID] exists, remove everything in nodeTasks[serviceID][nodeID] on that node;
    if nodeTasks[serviceID][nodeID] is empty, the global service has no task on this node and one must be created;
    if nodeTasks[serviceID][nodeID] is non-empty, append those tasks to updateTasks.
    Finally iterate over what is left in nodeTasks[serviceID] and call removeTasks on the remaining tasks.
(3) Hand each service's collected updateTasks to the updater:

for service, updateTasks := range updates {
	g.updater.Update(ctx, g.cluster, service, updateTasks)
}
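A self-contained sketch of the bucketing in step (1), with simplified stand-ins for api.Task and the state constants:

package main

import "fmt"

type TaskState int

const (
	TaskStateRunning   TaskState = 512
	TaskStateCompleted TaskState = 576
)

// Simplified stand-in for api.Task; illustration only.
type Task struct {
	ID           string
	ServiceID    string
	NodeID       string
	DesiredState TaskState
	State        TaskState
}

func main() {
	tasks := []Task{
		{ID: "t1", ServiceID: "svc", NodeID: "n1", DesiredState: TaskStateRunning, State: TaskStateRunning},
		{ID: "t2", ServiceID: "svc", NodeID: "n2", DesiredState: TaskStateRunning, State: TaskStateCompleted},
	}

	// Bucket tasks the way reconcileServices does: live tasks per node, and
	// completed tasks per node, keyed by service then node.
	nodeTasks := map[string]map[string][]Task{}
	nodeCompleted := map[string]map[string][]Task{}
	for _, t := range tasks {
		if t.DesiredState <= TaskStateRunning {
			if nodeTasks[t.ServiceID] == nil {
				nodeTasks[t.ServiceID] = map[string][]Task{}
			}
			nodeTasks[t.ServiceID][t.NodeID] = append(nodeTasks[t.ServiceID][t.NodeID], t)
		}
		if t.State == TaskStateCompleted {
			if nodeCompleted[t.ServiceID] == nil {
				nodeCompleted[t.ServiceID] = map[string][]Task{}
			}
			nodeCompleted[t.ServiceID][t.NodeID] = append(nodeCompleted[t.ServiceID][t.NodeID], t)
		}
	}
	fmt.Println(len(nodeTasks["svc"]), "nodes with live tasks;", len(nodeCompleted["svc"]), "nodes with completed tasks")
}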
reconcileServices ---> updater.Update
Update walks the current service's tasks slot by slot and tidies them up: it keeps only one task with desired state Running per slot. If no such task exists, it either creates a new task or promotes the last task whose desired state is below Running by setting its desired state to Running; all other tasks are removed.
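A self-contained sketch of that per-slot tidy-up, with simplified types; reconcileSlot is illustrative, not the Updater's actual method:

package main

import "fmt"

type TaskState int

const TaskStateRunning TaskState = 512

// Minimal stand-in for a task with a slot; illustration only.
type Task struct {
	ID           string
	Slot         int
	DesiredState TaskState
}

// reconcileSlot keeps at most one task with DesiredState == Running: the
// first such task is kept and later ones are marked for removal; if none is
// running, the last below-running task is promoted instead.
func reconcileSlot(tasks []Task) (keep *Task, remove []Task) {
	for i := range tasks {
		if tasks[i].DesiredState == TaskStateRunning && keep == nil {
			keep = &tasks[i]
		} else {
			remove = append(remove, tasks[i])
		}
	}
	if keep == nil && len(remove) > 0 {
		last := remove[len(remove)-1]
		last.DesiredState = TaskStateRunning // promote the last task
		remove = remove[:len(remove)-1]
		keep = &last
	}
	return keep, remove
}

func main() {
	keep, remove := reconcileSlot([]Task{{ID: "a", Slot: 1, DesiredState: 384}, {ID: "b", Slot: 1, DesiredState: 512}})
	fmt.Println("keep", keep.ID, "- remove", len(remove), "task(s)")
}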
manager/orchestrator: this directory implements the orchestration logic; it only writes to the store (the cluster database).
manager/dispatcher: mainly distributes tasks to agents. It talks to each agent through a session, and receives task status reports from agents over newly created grpc connections, writing the updates back to the store.
The dispatcher's Tasks function:
It watches all tasks in the store; whenever one changes, the nodeTasks channel fires.
It inspects each triggering event and handles it, mainly by updating the tasksMap.
The tasks in tasksMap are then pushed to the agent over grpc.
The Tasks function batches up to 200 nodeTasks events, or waits at most 50 milliseconds, before sending (a sketch of this batching pattern follows).
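A self-contained sketch of that batching pattern (the channel type and the limits are illustrative):

package main

import (
	"fmt"
	"time"
)

// batch drains up to maxEvents from the channel or waits at most
// flushInterval, whichever comes first, then flushes what it has.
func batch(events <-chan string) {
	const maxEvents = 200
	const flushInterval = 50 * time.Millisecond

	var pending []string
	timer := time.After(flushInterval)
	for {
		select {
		case ev := <-events:
			pending = append(pending, ev)
			if len(pending) < maxEvents {
				continue
			}
		case <-timer:
		}
		fmt.Println("flushing", len(pending), "event(s)")
		return
	}
}

func main() {
	ch := make(chan string, 1)
	ch <- "task-updated"
	batch(ch)
}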
The task list an agent receives through its session is the set of tasks that should be running on that node; any task running on the node but outside this set should be terminated (see the sketch after the struct below):
type TasksMessage struct {
// Tasks is the set of tasks that should be running on the node.
// Tasks outside of this set running on the node should be terminated.
Tasks []*Task `protobuf:"bytes,1,rep,name=tasks" json:"tasks,omitempty"`
}
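A self-contained sketch of that rule on the agent side (reconcileAssigned and the IDs are illustrative):

package main

import "fmt"

// reconcileAssigned applies the rule from the TasksMessage comment: any task
// running locally but absent from the assigned set should be terminated.
func reconcileAssigned(assigned []string, runningLocally []string) (terminate []string) {
	want := map[string]bool{}
	for _, id := range assigned {
		want[id] = true
	}
	for _, id := range runningLocally {
		if !want[id] {
			terminate = append(terminate, id)
		}
	}
	return terminate
}

func main() {
	fmt.Println(reconcileAssigned([]string{"t1"}, []string{"t1", "t2"})) // [t2]
}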
The dispatcher communicates over grpc in client-server mode: the agent is the client, and the leader node is guaranteed to have at least one client as well (managers run an agent too).
var _Dispatcher_serviceDesc = grpc.ServiceDesc{
ServiceName: "docker.swarmkit.v1.Dispatcher",
HandlerType: (*DispatcherServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Heartbeat",
Handler: _Dispatcher_Heartbeat_Handler,
},
{
MethodName: "UpdateTaskStatus",
Handler: _Dispatcher_UpdateTaskStatus_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Session",
Handler: _Dispatcher_Session_Handler,
ServerStreams: true,
},
{
StreamName: "Tasks",
Handler: _Dispatcher_Tasks_Handler,
ServerStreams: true,
},
},
}
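A minimal sketch of the client side implied by this ServiceDesc, using the generated api.NewDispatcherClient; the address and connection options are illustrative (a real agent uses mutual TLS and retry logic):

package main

import (
	"context"
	"log"

	"github.com/docker/swarmkit/api"
	"google.golang.org/grpc"
)

func main() {
	// Dial the leader; address and credentials here are placeholders.
	conn, err := grpc.Dial("leader:2377", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Session is the server-streaming RPC registered in the ServiceDesc above.
	client := api.NewDispatcherClient(conn)
	stream, err := client.Session(context.Background(), &api.SessionRequest{})
	if err != nil {
		log.Fatal(err)
	}
	for {
		msg, err := stream.Recv() // the leader pushes SessionMessages
		if err != nil {
			log.Fatal(err)
		}
		log.Println("session ID:", msg.SessionID)
	}
}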
The agent sends UpdateTaskStatus to the server; the server-side dispatcher flushes the task statuses into the cluster store either periodically or once enough updates have accumulated.
After the agent's session starts successfully, it spawns three goroutines:
go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)
watch requests the task list from the leader; the leader replies with the full task list at intervals.
heartbeat maintains the heartbeat with the leader.
listen listens for messages on the session itself.
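A simplified, self-contained version of this pattern (runctx here is a stand-in for the agent's helper, not its exact code):

package main

import (
	"context"
	"fmt"
	"sync"
)

// runctx runs fn until the context is cancelled and reports its first error
// on a shared channel, mimicking the "three goroutines per session" setup.
func runctx(ctx context.Context, wg *sync.WaitGroup, errs chan<- error, fn func(context.Context) error) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := fn(ctx); err != nil {
			select {
			case errs <- err:
			default: // keep only the first error
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	errs := make(chan error, 1)
	runctx(ctx, &wg, errs, func(ctx context.Context) error { fmt.Println("heartbeat"); return nil })
	runctx(ctx, &wg, errs, func(ctx context.Context) error { fmt.Println("watch"); return nil })
	runctx(ctx, &wg, errs, func(ctx context.Context) error { fmt.Println("listen"); return nil })
	wg.Wait()
}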
Task state transitions
The task states are:
TaskStateNew TaskState = 0
TaskStateAllocated TaskState = 64
TaskStatePending TaskState = 128
TaskStateAssigned TaskState = 192
TaskStateAccepted TaskState = 256
TaskStatePreparing TaskState = 320
TaskStateReady TaskState = 384
TaskStateStarting TaskState = 448
TaskStateRunning TaskState = 512
TaskStateCompleted TaskState = 576
TaskStateShutdown TaskState = 640
TaskStateFailed TaskState = 704
TaskStateRejected TaskState = 768
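The constants are spaced 64 apart, so code can compare states numerically (and new states could be inserted between existing ones without renumbering). The "terminated" check used by the orchestrators reduces to a comparison; a self-contained sketch:

package main

import "fmt"

type TaskState int

const (
	TaskStateNew       TaskState = 0
	TaskStateRunning   TaskState = 512
	TaskStateCompleted TaskState = 576
)

// isTerminal mirrors the t.Status.State > api.TaskStateRunning test: every
// state past Running (Completed, Shutdown, Failed, Rejected) is terminal.
func isTerminal(s TaskState) bool {
	return s > TaskStateRunning
}

func main() {
	fmt.Println(isTerminal(TaskStateNew))       // false
	fmt.Println(isTerminal(TaskStateCompleted)) // true
}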
1. TaskStateNew: tasks created on the manager by the newTask function all have task.Status.State = TaskStateNew, and their DesiredState is TaskStateRunning.
2. TaskStateAllocated: once the task has been allocated network resources, its state is set to TaskStateAllocated.
./manager/allocator/network.go: updateTaskStatus(storeT, api.TaskStateAllocated, "allocated")
3. The scheduler module checks whether a node has the resources to run the task, i.e. it schedules the task onto a node:
taskFitNode ./manager/scheduler/scheduler.go: State: api.TaskStateAssigned,
scheduleTask ./manager/scheduler/scheduler.go: State: api.TaskStateAssigned,
4. The agent is responsible for the following states:
./agent/exec/controller.go: status.State = api.TaskStateAccepted
./agent/exec/controller.go: return transition(api.TaskStatePreparing, "preparing")
./agent/exec/controller.go: return transition(api.TaskStateReady, "prepared")
./agent/exec/controller.go: return transition(api.TaskStateStarting, "starting")
./agent/exec/controller.go: return transition(api.TaskStateRunning, "started")
TaskStateCompleted is special: it is only set when the container exits cleanly, with no error. (It is not yet clear to me in which situations this actually happens.)
./agent/exec/controller.go: return transition(api.TaskStateCompleted, "finished")
When task.DesiredState == api.TaskStateShutdown, the task state becomes TaskStateShutdown:
./agent/exec/controller.go: return transition(api.TaskStateShutdown, "shutdown")
When the container exits abnormally or is killed, status.State is set to TaskStateFailed.
global.go: for global services, when the leader receives a state change:
if isTaskTerminated(v.Task) {
g.restartTask(ctx, v.Task.ID, v.Task.ServiceID)
}
In the controller's wait path, a clean exit transitions the task to Completed:

case api.TaskStateRunning:
	if err := ctlr.Wait(ctx); err != nil {
		return fatal(err)
	}
	return transition(api.TaskStateCompleted, "finished")

Inside the fatal helper, the failure state depends on how far the task got:

switch {
case status.State < api.TaskStateStarting:
	status.State = api.TaskStateRejected
case status.State >= api.TaskStateStarting:
	status.State = api.TaskStateFailed
}

The transition helper enforces that states only move forward:

transition := func(state api.TaskState, msg string) (*api.TaskStatus, error) {
	current := status.State
	status.State = state
	status.Message = msg
	if current > state {
		panic("invalid state transition")
	}
	return status, nil
}
A full Task object, as dumped from the store, looks like this:

&Task{ID:361undifpwfhou0nbifqd1pfv,Meta:Meta{Version:Version{Index:2472,},
CreatedAt:&docker_swarmkit_v1.Timestamp{Seconds:1515550683,Nanos:259408447,},
UpdatedAt:&docker_swarmkit_v1.Timestamp{Seconds:1515550687,Nanos:759699923,},},
Spec:TaskSpec{Runtime:&TaskSpec_Container{Container:&ContainerSpec{Image:10.62.107.171:62648/admin/cf-vnpm:v1.17.30.03.p06.1300069,Labels:map[string]string{},
Command:[],Args:[],Env:[],Dir:,User:,Mounts:[{BIND /root/cf-base/usr /usr false nil nil nil}
{BIND /root/cf-base/lib /lib false nil nil nil} {BIND /root/info/etcs/cloudframe /etc/cloudframe false nil nil nil}
{BIND /root/info/logs/cf-vnpm /var/log/cf-vnpm false nil nil nil} {BIND /root/info/etcs/cloudframe/supervisord-vnpm.conf /etc/supervisord.conf false nil nil nil} {BIND /root/zartcli /home/soft_repo_client/bin false nil nil nil} {BIND /root/cfy-volumes/pluginfile /home/vnpmfile//pluginfile false nil nil nil} {BIND /etc/localtime /etc/localtime:ro false nil nil nil}],StopGracePeriod:nil,PullOptions:nil,},},Resources:&ResourceRequirements{Limits:&Resources{NanoCPUs:0,MemoryBytes:0,},Reservations:&Resources{NanoCPUs:0,MemoryBytes:0,},},Restart:&RestartPolicy{Condition:ANY,Delay:nil,MaxAttempts:0,Window:nil,},Placement:&Placement{Constraints:[],},LogDriver:nil,},
ServiceID:93ecdtbq4wpeejxz0hdkfxn2w,Slot:0,NodeID:ewnpbrxk8bsiugczrrxqnmbcv,Annotations:Annotations{Name:,Labels:map[string]string{},},ServiceAnnotations:Annotations{Name:cf-vnpm-service,Labels:map[string]string{},},
Status:TaskStatus{Timestamp:&docker_swarmkit_v1.Timestamp{Seconds:1515550687,Nanos:393929692,},State:RUNNING,Message:started,Err:,
RuntimeStatus:&TaskStatus_Container{Container:&ContainerStatus{ContainerID:6adb24599c53be44c867a0d4100eef1306b3e4103885bd38d418b3efc8600c1c,PID:10762,ExitCode:0,},},
PortStatus:&PortStatus{Ports:[],},},
DesiredState:RUNNING,Networks:[],Endpoint:&Endpoint{Spec:&EndpointSpec{Mode:VIP,Ports:[],},Ports:[],VirtualIPs:[],},LogDriver:nil,}
When checking whether two tasks are the same, the following fields are excluded from the comparison:
Meta:Meta{Version:Version{Index:2472,},
CreatedAt:&docker_swarmkit_v1.Timestamp{Seconds:1515550683,Nanos:259408447,},
UpdatedAt:&docker_swarmkit_v1.Timestamp{Seconds:1515550687,Nanos:759699923,},},
Status:TaskStatus{Timestamp:&docker_swarmkit_v1.Timestamp{Seconds:1515550687,Nanos:393929692,},State:RUNNING,Message:started,Err:,
RuntimeStatus:&TaskStatus_Container{Container:&ContainerStatus{ContainerID:6adb24599c53be44c867a0d4100eef1306b3e4103885bd38d418b3efc8600c1c,PID:10762,ExitCode:0,},},
PortStatus:&PortStatus{Ports:[],},},
In the agent's task manager, updates that are stably equal to the current task are ignored:

case task := <-tm.updateq:
	if equality.TasksEqualStable(task, tm.task) {
		continue // ignore the update
	}
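A self-contained sketch of the idea behind equality.TasksEqualStable, using simplified stand-in types: compare copies with the volatile bookkeeping fields cleared.

package main

import (
	"fmt"
	"reflect"
)

// Minimal stand-ins for the fields involved; illustration only.
type Meta struct{ Version int }
type Status struct{ State string }
type Task struct {
	ID     string
	Meta   Meta
	Status Status
	Spec   string
}

// tasksEqualStable compares value copies with the volatile fields (Meta,
// Status) zeroed out, so bookkeeping churn does not count as a change.
func tasksEqualStable(a, b Task) bool {
	a.Meta, b.Meta = Meta{}, Meta{}
	a.Status, b.Status = Status{}, Status{}
	return reflect.DeepEqual(a, b)
}

func main() {
	a := Task{ID: "t1", Meta: Meta{Version: 1}, Status: Status{State: "starting"}, Spec: "img:v1"}
	b := Task{ID: "t1", Meta: Meta{Version: 2}, Status: Status{State: "running"}, Spec: "img:v1"}
	fmt.Println(tasksEqualStable(a, b)) // true: only volatile fields differ
}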
scheduler
The old Swarm (before 1.12) had three strategies:
spread: the default strategy; spread containers as evenly as possible by scheduling onto the node with the fewest containers
binpack: the opposite of spread; fill up one node before using the next
random: random placement
The old Swarm had no notion of replicas; every instance was an independent entity, so the scheduler never had to consider placing multiple replicas of the same service on different hosts. The new Swarm's scheduling algorithm is much like the old one, except that it no longer offers a choice of strategy: only spread is provided.
The new Swarm keeps node information in a heap (in the heapsort sense), built as a min-heap keyed on the number of containers currently on each node, which makes finding the least-loaded node very fast (a minimal sketch follows).
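A self-contained sketch of that min-heap using Go's container/heap (node and nodeHeap are illustrative types, not SwarmKit's):

package main

import (
	"container/heap"
	"fmt"
)

// node and nodeHeap sketch the spread strategy: a min-heap keyed on the task
// count per node, so the least-loaded node is always at the top.
type node struct {
	id    string
	tasks int
}

type nodeHeap []node

func (h nodeHeap) Len() int            { return len(h) }
func (h nodeHeap) Less(i, j int) bool  { return h[i].tasks < h[j].tasks }
func (h nodeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *nodeHeap) Push(x interface{}) { *h = append(*h, x.(node)) }
func (h *nodeHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	h := &nodeHeap{{"n1", 3}, {"n2", 1}, {"n3", 2}}
	heap.Init(h)

	// Schedule one task: take the least-loaded node, bump its count, push back.
	best := heap.Pop(h).(node)
	fmt.Println("schedule on", best.id) // n2
	best.tasks++
	heap.Push(h, best)
}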