@@ -1131,3 +1131,162 @@ func TestDockerExecAPI(t *testing.T) {
1131
1131
1132
1132
waitFinished (t , finished , testTimeout )
1133
1133
}
1134
+
1135
+ // This integ test checks for task queuing behavior in waitingTaskQueue which is dependent on hostResourceManager.
1136
+ // First two tasks totally consume the available memory resource on the host. So the third task queued up needs to wait
1137
+ // until resources gets freed up (i.e. any running tasks stops and frees enough resources) before it can start progressing.
1138
+ func TestHostResourceManagerTrickleQueue (t * testing.T ) {
1139
+ testTimeout := 1 * time .Minute
1140
+ taskEngine , done , _ := setupWithDefaultConfig (t )
1141
+ defer done ()
1142
+
1143
+ stateChangeEvents := taskEngine .StateChangeEvents ()
1144
+
1145
+ tasks := []* apitask.Task {}
1146
+ for i := 0 ; i < 3 ; i ++ {
1147
+ taskArn := fmt .Sprintf ("taskArn-%d" , i )
1148
+ testTask := createTestTask (taskArn )
1149
+
1150
+ // create container
1151
+ A := createTestContainerWithImageAndName (baseImageForOS , "A" )
1152
+ A .EntryPoint = & entryPointForOS
1153
+ A .Command = []string {"sleep 10" }
1154
+ A .Essential = true
1155
+ testTask .Containers = []* apicontainer.Container {
1156
+ A ,
1157
+ }
1158
+
1159
+ // task memory so that only 2 such tasks can run - 1024 total memory available on instance by getTestHostResources()
1160
+ testTask .Memory = int64 (512 )
1161
+
1162
+ tasks = append (tasks , testTask )
1163
+ }
1164
+
1165
+ // goroutine to trickle tasks to enforce queueing order
1166
+ go func () {
1167
+ taskEngine .AddTask (tasks [0 ])
1168
+ time .Sleep (2 * time .Second )
1169
+ taskEngine .AddTask (tasks [1 ])
1170
+ time .Sleep (2 * time .Second )
1171
+ taskEngine .AddTask (tasks [2 ])
1172
+ }()
1173
+
1174
+ finished := make (chan interface {})
1175
+
1176
+ // goroutine to verify task running order
1177
+ go func () {
1178
+ // Tasks go RUNNING in order
1179
+ verifyContainerRunningStateChange (t , taskEngine )
1180
+ verifyTaskIsRunning (stateChangeEvents , tasks [0 ])
1181
+
1182
+ verifyContainerRunningStateChange (t , taskEngine )
1183
+ verifyTaskIsRunning (stateChangeEvents , tasks [1 ])
1184
+
1185
+ // First task should stop before 3rd task goes RUNNING
1186
+ verifyContainerStoppedStateChange (t , taskEngine )
1187
+ verifyTaskIsStopped (stateChangeEvents , tasks [0 ])
1188
+
1189
+ verifyContainerRunningStateChange (t , taskEngine )
1190
+ verifyTaskIsRunning (stateChangeEvents , tasks [2 ])
1191
+
1192
+ verifyContainerStoppedStateChange (t , taskEngine )
1193
+ verifyTaskIsStopped (stateChangeEvents , tasks [1 ])
1194
+
1195
+ verifyContainerStoppedStateChange (t , taskEngine )
1196
+ verifyTaskIsStopped (stateChangeEvents , tasks [2 ])
1197
+ close (finished )
1198
+ }()
1199
+
1200
+ // goroutine to verify task accounting
1201
+ // After ~4s, 3rd task should be queued up and will not be dequeued until ~10s, i.e. until 1st task stops and gets dequeued
1202
+ go func () {
1203
+ time .Sleep (6 * time .Second )
1204
+ task , err := taskEngine .(* DockerTaskEngine ).topTask ()
1205
+ assert .NoError (t , err , "one task should be queued up after 6s" )
1206
+ assert .Equal (t , task .Arn , tasks [2 ].Arn , "wrong task at top of queue" )
1207
+
1208
+ time .Sleep (6 * time .Second )
1209
+ _ , err = taskEngine .(* DockerTaskEngine ).topTask ()
1210
+ assert .Error (t , err , "no task should be queued up after 12s" )
1211
+ }()
1212
+ waitFinished (t , finished , testTimeout )
1213
+ }
1214
+
1215
+ // This test verifies if a task which is STOPPING does not block other new tasks
1216
+ // from starting if resources for them are available
1217
+ func TestHostResourceManagerResourceUtilization (t * testing.T ) {
1218
+ testTimeout := 1 * time .Minute
1219
+ taskEngine , done , _ := setupWithDefaultConfig (t )
1220
+ defer done ()
1221
+
1222
+ stateChangeEvents := taskEngine .StateChangeEvents ()
1223
+
1224
+ tasks := []* apitask.Task {}
1225
+ for i := 0 ; i < 2 ; i ++ {
1226
+ taskArn := fmt .Sprintf ("IntegTaskArn-%d" , i )
1227
+ testTask := createTestTask (taskArn )
1228
+
1229
+ // create container
1230
+ A := createTestContainerWithImageAndName (baseImageForOS , "A" )
1231
+ A .EntryPoint = & entryPointForOS
1232
+ A .Command = []string {"trap shortsleep SIGTERM; shortsleep() { sleep 10; exit 1; }; sleep 30" }
1233
+ A .Essential = true
1234
+ testTask .Containers = []* apicontainer.Container {
1235
+ A ,
1236
+ }
1237
+
1238
+ // task memory so that only 2 such tasks can run - 1024 total memory available on instance by getTestHostResources()
1239
+ testTask .Memory = int64 (512 )
1240
+
1241
+ tasks = append (tasks , testTask )
1242
+ }
1243
+
1244
+ // Stop task payload from ACS for 1st task
1245
+ stopTask := createTestTask ("IntegTaskArn-0" )
1246
+ stopTask .DesiredStatusUnsafe = apitaskstatus .TaskStopped
1247
+ stopTask .Containers = []* apicontainer.Container {}
1248
+
1249
+ go func () {
1250
+ taskEngine .AddTask (tasks [0 ])
1251
+ time .Sleep (2 * time .Second )
1252
+
1253
+ // single managedTask which should have started
1254
+ assert .Equal (t , 1 , len (taskEngine .(* DockerTaskEngine ).managedTasks ), "exactly one task should be running" )
1255
+
1256
+ // stopTask
1257
+ taskEngine .AddTask (stopTask )
1258
+ time .Sleep (2 * time .Second )
1259
+
1260
+ taskEngine .AddTask (tasks [1 ])
1261
+ }()
1262
+
1263
+ finished := make (chan interface {})
1264
+
1265
+ // goroutine to verify task running order
1266
+ go func () {
1267
+ // Tasks go RUNNING in order, 2nd task doesn't wait for 1st task
1268
+ // to transition to STOPPED as resources are available
1269
+ verifyContainerRunningStateChange (t , taskEngine )
1270
+ verifyTaskIsRunning (stateChangeEvents , tasks [0 ])
1271
+
1272
+ verifyContainerRunningStateChange (t , taskEngine )
1273
+ verifyTaskIsRunning (stateChangeEvents , tasks [1 ])
1274
+
1275
+ // At this time, task[0] stopTask is received, and SIGTERM sent to task
1276
+ // but the task[0] is still RUNNING due to trap handler
1277
+ assert .Equal (t , apitaskstatus .TaskRunning , tasks [0 ].GetKnownStatus (), "task 0 known status should be RUNNING" )
1278
+ assert .Equal (t , apitaskstatus .TaskStopped , tasks [0 ].GetDesiredStatus (), "task 0 status should be STOPPED" )
1279
+
1280
+ // task[0] stops after SIGTERM trap handler finishes
1281
+ verifyContainerStoppedStateChange (t , taskEngine )
1282
+ verifyTaskIsStopped (stateChangeEvents , tasks [0 ])
1283
+
1284
+ // task[1] stops after normal execution
1285
+ verifyContainerStoppedStateChange (t , taskEngine )
1286
+ verifyTaskIsStopped (stateChangeEvents , tasks [1 ])
1287
+
1288
+ close (finished )
1289
+ }()
1290
+
1291
+ waitFinished (t , finished , testTimeout )
1292
+ }
0 commit comments