Skip to content

Commit 8317668

Browse files
committed
Merge branch 'fix_forking'
2 parents 9063af4 + 1136487 commit 8317668

File tree

3 files changed

+87
-75
lines changed

3 files changed

+87
-75
lines changed

aws-flow/lib/aws/decider/task_poller.rb

+10-2
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def get_decision_task
5252
@domain.decision_tasks.poll_for_single_task(@task_list)
5353
end
5454

55-
def poll_and_process_single_task
55+
def poll_and_process_single_task(opts={})
5656
# TODO waitIfSuspended
5757
begin
5858
@logger.debug "Polling for a new decision task of type #{@handler.workflow_definition_map.keys.map{ |x| "#{x.name} #{x.version}"} } on task_list: #{@task_list}"
@@ -96,6 +96,9 @@ def poll_and_process_single_task
9696
@logger.info Utilities.workflow_task_to_debug_string("Finished executing task", task, @task_list)
9797
rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault => e
9898
@logger.error "Error in the poller, #{e.inspect}"
99+
rescue Interrupt => e
100+
@logger.error "Error in the poller, #{e.inspect}"
101+
raise Interrupt
99102
rescue Exception => e
100103
@logger.error "Error in the poller, #{e.inspect}"
101104
end
@@ -356,7 +359,8 @@ def process_single_task(task)
356359
# *Optional*. Whether to use forking to execute the task. On Windows,
357360
# you should set this to `false`.
358361
#
359-
def poll_and_process_single_task(use_forking = true)
362+
def poll_and_process_single_task(opts = {})
363+
use_forking = opts[:use_forking] || true
360364
@poll_semaphore ||= SuspendableSemaphore.new
361365
@poll_semaphore.acquire
362366
semaphore_needs_release = true
@@ -369,6 +373,10 @@ def poll_and_process_single_task(use_forking = true)
369373
if task
370374
@logger.info Utilities.activity_task_to_debug_string("Got activity task", task)
371375
end
376+
rescue Interrupt => e
377+
@poll_semaphore.release
378+
@logger.error "Error in the poller, #{e.inspect}"
379+
raise Interrupt
372380
rescue Exception => e
373381
@logger.error "Error in the poller, #{e.inspect}"
374382
@poll_semaphore.release

aws-flow/lib/aws/decider/worker.rb

+63-36
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,37 @@ def initialize(service, domain, task_list_to_poll, *args, &block)
4343
if args
4444
args.each { |klass_or_instance| add_implementation(klass_or_instance) }
4545
end
46+
@aws_flow_signals = []
4647
@shutting_down = false
4748
%w{ TERM INT }.each do |signal|
4849
Signal.trap(signal) do
49-
if @shutting_down
50-
@executor.shutdown(0) if @executor
51-
Kernel.exit! 1
52-
else
53-
@shutting_down = true
54-
@shutdown_first_time_function.call if @shutdown_first_time_function
55-
end
50+
@aws_flow_signals << signal
51+
raise Interrupt
52+
end
53+
end
54+
end
55+
56+
def run_once(should_register = false, poller = nil)
57+
register if should_register
58+
poller ||= generate_poller
59+
Kernel.exit if @shutting_down
60+
poller.poll_and_process_single_task(@options)
61+
end
62+
63+
def handle_signals
64+
# This function itself needs to be able to handle interrupts, in case we get them in close succession
65+
begin
66+
return if @aws_flow_signals.empty?
67+
if @shutting_down
68+
@executor.shutdown(0) if @executor
69+
Kernel.exit! 1
70+
else
71+
@shutting_down = true
72+
@shutdown_first_time_function.call if @shutdown_first_time_function
5673
end
74+
rescue Interrupt
75+
@executor.shutdown(0) if @executor
76+
Kernel.exit! 1
5777
end
5878
end
5979

@@ -207,15 +227,15 @@ def register
207227
# first. If {#register} was already called
208228
# for this workflow worker, specify `false`.
209229
#
210-
def start(should_register = true)
230+
def start(should_register = true, poller = nil)
211231
# TODO check to make sure that the correct properties are set
212232
# TODO Register the domain if not already registered
213233
# TODO register types to poll
214234
# TODO Set up throttler
215235
# TODO Set up a timeout on the throttler correctly,
216236
# TODO Make this a generic poller, go to the right kind correctly
217237

218-
poller = WorkflowTaskPoller.new(
238+
poller ||= WorkflowTaskPoller.new(
219239
@service,
220240
@domain,
221241
DecisionTaskHandler.new(@workflow_definition_map, @options),
@@ -226,10 +246,24 @@ def start(should_register = true)
226246
register if should_register
227247
@logger.debug "Starting an infinite loop to poll and process workflow tasks."
228248
loop do
229-
run_once(false, poller)
249+
begin
250+
run_once(false, poller)
251+
rescue Interrupt
252+
handle_signals
253+
end
230254
end
231255
end
232256

257+
258+
def generate_poller
259+
WorkflowTaskPoller.new(
260+
@service,
261+
@domain,
262+
DecisionTaskHandler.new(@workflow_definition_map, @options),
263+
@task_list,
264+
@options
265+
)
266+
end
233267
# Starts the workflow and runs it once, with an optional
234268
# {WorkflowTaskPoller}.
235269
#
@@ -239,18 +273,7 @@ def start(should_register = true)
239273
# An optional {WorkflowTaskPoller} to use.
240274
#
241275
def run_once(should_register = false, poller = nil)
242-
register if should_register
243-
244-
poller = WorkflowTaskPoller.new(
245-
@service,
246-
@domain,
247-
DecisionTaskHandler.new(@workflow_definition_map, @options),
248-
@task_list,
249-
@options
250-
) if poller.nil?
251-
252-
Kernel.exit if @shutting_down
253-
poller.poll_and_process_single_task
276+
super
254277
end
255278
end
256279

@@ -391,7 +414,6 @@ def add_activities_implementation(class_or_instance)
391414
end
392415
end
393416

394-
395417
# Starts the activity that was added to the `ActivityWorker`.
396418
#
397419
# @param [true, false] should_register
@@ -412,10 +434,26 @@ def start(should_register = true)
412434

413435
@logger.debug "Starting an infinite loop to poll and process activity tasks."
414436
loop do
415-
run_once(false, poller)
437+
begin
438+
run_once(false, poller)
439+
rescue Interrupt
440+
handle_signals
441+
end
442+
416443
end
417444
end
418445

446+
def generate_poller
447+
ActivityTaskPoller.new(
448+
@service,
449+
@domain,
450+
@task_list,
451+
@activity_definition_map,
452+
@executor,
453+
@options
454+
)
455+
end
456+
419457
# Starts the activity that was added to the `ActivityWorker` and,
420458
# optionally, sets the {ActivityTaskPoller}.
421459
#
@@ -428,18 +466,7 @@ def start(should_register = true)
428466
# {ActivityTaskPoller} will be created.
429467
#
430468
def run_once(should_register = true, poller = nil)
431-
register if should_register
432-
poller = ActivityTaskPoller.new(
433-
@service,
434-
@domain,
435-
@task_list,
436-
@activity_definition_map,
437-
@executor,
438-
@options
439-
) if poller.nil?
440-
441-
Kernel.exit if @shutting_down
442-
poller.poll_and_process_single_task(@options.use_forking)
469+
super
443470
end
444471
end
445472

aws-flow/spec/aws/decider/unit/worker_spec.rb

+14-37
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ def initialize(service, domain, task_list, forking_executor, *args, &block)
3939
end
4040

4141
class FakeTaskPoller < WorkflowTaskPoller
42+
def poll_and_process_single_task(opts={})
43+
dumb_fib(5000)
44+
end
4245
def get_decision_task
4346
nil
4447
end
@@ -134,48 +137,20 @@ class DefaultTasklistTestWorkflow
134137
workflow_worker = WorkflowWorker.new(service, domain, task_list)
135138
workflow_worker.add_workflow_implementation(TestWorkflow)
136139
pid = fork do
137-
loop do
138-
workflow_worker.run_once(true, FakeTaskPoller.new(service, domain, nil, task_list, nil))
139-
end
140+
workflow_worker.start(true, FakeTaskPoller.new(service, domain, nil, task_list, nil))
140141
end
142+
sleep 1
141143
# Send an interrupt to the child process
142144
Process.kill("INT", pid)
143145
# Adding a sleep to let things get setup correctly (not ideal but going with
144146
# this for now)
145-
sleep 5
147+
sleep 2
146148
return_pid, status = Process.wait2(pid, Process::WNOHANG)
147149
Process.kill("KILL", pid) if return_pid.nil?
148150
return_pid.should_not be nil
149151
status.success?.should be_true
150152
end
151153

152-
it "will test whether WorkflowWorker dies cleanly when two interrupts are received" do
153-
class FakeTaskPoller
154-
def poll_and_process_single_task
155-
dumb_fib(5000)
156-
end
157-
end
158-
task_list = "TestWorkflow_tasklist"
159-
service = FakeServiceClient.new
160-
workflow_type_object = double("workflow_type", :name => "TestWorkflow.start", :start_execution => "" )
161-
domain = FakeDomain.new(workflow_type_object)
162-
workflow_worker = WorkflowWorker.new(service, domain, task_list)
163-
workflow_worker.add_workflow_implementation(TestWorkflow)
164-
pid = fork do
165-
loop do
166-
workflow_worker.run_once(true, FakeTaskPoller.new(service, domain, nil, task_list, nil))
167-
end
168-
end
169-
# Send an interrupt to the child process
170-
sleep 3
171-
2.times { Process.kill("INT", pid); sleep 2 }
172-
return_pid, status = Process.wait2(pid, Process::WNOHANG)
173-
174-
Process.kill("KILL", pid) if return_pid.nil?
175-
return_pid.should_not be nil
176-
status.success?.should be_false
177-
end
178-
179154
end
180155

181156
describe ActivityWorker do
@@ -306,6 +281,7 @@ def activity_tasks
306281
sleep 30
307282
end
308283
end
284+
309285
it "will test whether the ActivityWorker shuts down cleanly when an interrupt is received" do
310286

311287
task_list = "TestWorkflow_tasklist"
@@ -319,20 +295,17 @@ def activity_tasks
319295
# handler to the process. When the process exits, the handler checks whether
320296
# the executor's internal is_shutdown variable is set correctly or not.
321297
pid = fork do
322-
at_exit {
323-
activity_worker.executor.is_shutdown.should == true
324-
}
325298
activity_worker.start true
326299
end
327300
# Send an interrupt to the child process
301+
sleep 1
328302
Process.kill("INT", pid)
329303
# Adding a sleep to let things get setup correctly (not ideal but going with
330304
# this for now)
331-
sleep 5
305+
sleep 2
332306
return_pid, status = Process.wait2(pid, Process::WNOHANG)
333307
Process.kill("KILL", pid) if return_pid.nil?
334308
return_pid.should_not be nil
335-
336309
status.success?.should be_true
337310
end
338311

@@ -354,15 +327,19 @@ def activity_tasks
354327
# create a child process to run that task. The task (dumb_fib) is
355328
# purposefully designed to be long running so that we can test our shutdown
356329
# scenario.
330+
357331
pid = fork do
332+
at_exit {
333+
activity_worker.executor.is_shutdown.should == true
334+
}
358335
activity_worker.executor.execute {
359336
dumb_fib(1000)
360337
}
361338
activity_worker.start true
362339
end
363340
# Adding a sleep to let things get setup correctly (not idea but going with
364341
# this for now)
365-
sleep 3
342+
sleep 2
366343
# Send 2 interrupts to the child process
367344
2.times { Process.kill("INT", pid); sleep 3 }
368345
status = Process.waitall

0 commit comments

Comments
 (0)