@@ -1131,3 +1131,160 @@ func TestDockerExecAPI(t *testing.T) {
1131
1131
1132
1132
waitFinished (t , finished , testTimeout )
1133
1133
}
1134
+
1135
+ // This integ test checks for task queuing behavior in waitingTaskQueue which is dependent on hostResourceManager.
1136
+ // First two tasks totally consume the available memory resource on the host. So the third task queued up needs to wait
1137
+ // until resources gets freed up (i.e. any running tasks stops and frees enough resources) before it can start progressing.
1138
+ func TestHostResourceManagerTrickleQueue (t * testing.T ) {
1139
+ testTimeout := 1 * time .Minute
1140
+ taskEngine , done , _ := setupWithDefaultConfig (t )
1141
+ defer done ()
1142
+
1143
+ stateChangeEvents := taskEngine .StateChangeEvents ()
1144
+
1145
+ tasks := []* apitask.Task {}
1146
+ for i := 0 ; i < 3 ; i ++ {
1147
+ taskArn := fmt .Sprintf ("taskArn-%d" , i )
1148
+ testTask := createTestTask (taskArn )
1149
+
1150
+ // create container
1151
+ A := createTestContainerWithImageAndName (baseImageForOS , "A" )
1152
+ A .EntryPoint = & entryPointForOS
1153
+ A .Command = []string {"sleep 10" }
1154
+ A .Essential = true
1155
+ testTask .Containers = []* apicontainer.Container {
1156
+ A ,
1157
+ }
1158
+
1159
+ // task memory so that only 2 such tasks can run - 1024 total memory available on instance by getTestHostResources()
1160
+ testTask .Memory = int64 (512 )
1161
+
1162
+ tasks = append (tasks , testTask )
1163
+ }
1164
+
1165
+ // goroutine to trickle tasks to enforce queueing order
1166
+ go func () {
1167
+ taskEngine .AddTask (tasks [0 ])
1168
+ time .Sleep (2 * time .Second )
1169
+ taskEngine .AddTask (tasks [1 ])
1170
+ time .Sleep (2 * time .Second )
1171
+ taskEngine .AddTask (tasks [2 ])
1172
+ }()
1173
+
1174
+ finished := make (chan interface {})
1175
+
1176
+ // goroutine to verify task running order
1177
+ go func () {
1178
+ // Tasks go RUNNING in order
1179
+ verifyContainerRunningStateChange (t , taskEngine )
1180
+ verifyTaskIsRunning (stateChangeEvents , tasks [0 ])
1181
+
1182
+ verifyContainerRunningStateChange (t , taskEngine )
1183
+ verifyTaskIsRunning (stateChangeEvents , tasks [1 ])
1184
+
1185
+ // First task should stop before 3rd task goes RUNNING
1186
+ verifyContainerStoppedStateChange (t , taskEngine )
1187
+ verifyTaskIsStopped (stateChangeEvents , tasks [0 ])
1188
+
1189
+ verifyContainerRunningStateChange (t , taskEngine )
1190
+ verifyTaskIsRunning (stateChangeEvents , tasks [2 ])
1191
+
1192
+ verifyContainerStoppedStateChange (t , taskEngine )
1193
+ verifyTaskIsStopped (stateChangeEvents , tasks [1 ])
1194
+
1195
+ verifyContainerStoppedStateChange (t , taskEngine )
1196
+ verifyTaskIsStopped (stateChangeEvents , tasks [2 ])
1197
+ close (finished )
1198
+ }()
1199
+
1200
+ // goroutine to verify task accounting
1201
+ // After ~4s, 3rd task should be queued up and will not be dequeued until ~10s, i.e. until 1st task stops and gets dequeued
1202
+ go func () {
1203
+ time .Sleep (6 * time .Second )
1204
+ task , err := taskEngine .(* DockerTaskEngine ).topTask ()
1205
+ assert .NoError (t , err , "one task should be queued up after 6s" )
1206
+ assert .Equal (t , task .Arn , tasks [2 ].Arn , "wrong task at top of queue" )
1207
+
1208
+ time .Sleep (6 * time .Second )
1209
+ _ , err = taskEngine .(* DockerTaskEngine ).topTask ()
1210
+ assert .Error (t , err , "no task should be queued up after 12s" )
1211
+ }()
1212
+ waitFinished (t , finished , testTimeout )
1213
+ }
1214
+
1215
+ // This test verifies if a task which is STOPPING does not block other new tasks
1216
+ // from starting if resources for them are available
1217
+ func TestHostResourceManagerResourceUtilization (t * testing.T ) {
1218
+ testTimeout := 1 * time .Minute
1219
+ taskEngine , done , _ := setupWithDefaultConfig (t )
1220
+ defer done ()
1221
+
1222
+ stateChangeEvents := taskEngine .StateChangeEvents ()
1223
+
1224
+ tasks := []* apitask.Task {}
1225
+ for i := 0 ; i < 2 ; i ++ {
1226
+ taskArn := fmt .Sprintf ("IntegTaskArn-%d" , i )
1227
+ testTask := createTestTask (taskArn )
1228
+
1229
+ // create container
1230
+ A := createTestContainerWithImageAndName (baseImageForOS , "A" )
1231
+ A .EntryPoint = & entryPointForOS
1232
+ A .Command = []string {"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10" }
1233
+ A .Essential = true
1234
+ A .StopTimeout = uint (6 )
1235
+ testTask .Containers = []* apicontainer.Container {
1236
+ A ,
1237
+ }
1238
+
1239
+ tasks = append (tasks , testTask )
1240
+ }
1241
+
1242
+ // Stop task payload from ACS for 1st task
1243
+ stopTask := createTestTask ("IntegTaskArn-0" )
1244
+ stopTask .DesiredStatusUnsafe = apitaskstatus .TaskStopped
1245
+ stopTask .Containers = []* apicontainer.Container {}
1246
+
1247
+ go func () {
1248
+ taskEngine .AddTask (tasks [0 ])
1249
+ time .Sleep (2 * time .Second )
1250
+
1251
+ // single managedTask which should have started
1252
+ assert .Equal (t , 1 , len (taskEngine .(* DockerTaskEngine ).managedTasks ), "exactly one task should be running" )
1253
+
1254
+ // stopTask
1255
+ taskEngine .AddTask (stopTask )
1256
+ time .Sleep (2 * time .Second )
1257
+
1258
+ taskEngine .AddTask (tasks [1 ])
1259
+ }()
1260
+
1261
+ finished := make (chan interface {})
1262
+
1263
+ // goroutine to verify task running order
1264
+ go func () {
1265
+ // Tasks go RUNNING in order, 2nd task doesn't wait for 1st task
1266
+ // to transition to STOPPED as resources are available
1267
+ verifyContainerRunningStateChange (t , taskEngine )
1268
+ verifyTaskIsRunning (stateChangeEvents , tasks [0 ])
1269
+
1270
+ verifyContainerRunningStateChange (t , taskEngine )
1271
+ verifyTaskIsRunning (stateChangeEvents , tasks [1 ])
1272
+
1273
+ // At this time, task[0] stopTask is received, and SIGTERM sent to task
1274
+ // but the task[0] is still RUNNING due to trap handler
1275
+ assert .Equal (t , apitaskstatus .TaskRunning , tasks [0 ].GetKnownStatus (), "task 0 known status should be RUNNING" )
1276
+ assert .Equal (t , apitaskstatus .TaskStopped , tasks [0 ].GetDesiredStatus (), "task 0 status should be STOPPED" )
1277
+
1278
+ // task[0] stops after SIGTERM trap handler finishes
1279
+ verifyContainerStoppedStateChange (t , taskEngine )
1280
+ verifyTaskIsStopped (stateChangeEvents , tasks [0 ])
1281
+
1282
+ // task[1] stops after normal execution
1283
+ verifyContainerStoppedStateChange (t , taskEngine )
1284
+ verifyTaskIsStopped (stateChangeEvents , tasks [1 ])
1285
+
1286
+ close (finished )
1287
+ }()
1288
+
1289
+ waitFinished (t , finished , testTimeout )
1290
+ }
0 commit comments