Skip to content

Commit 83f8f28

Browse files
authored
Respect survey schedules before sending SMS (#2380)
1 parent 1404b09 commit 83f8f28

File tree

5 files changed

+126
-53
lines changed

5 files changed

+126
-53
lines changed

lib/ask/runtime/channel_broker.ex

+32-21
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ defmodule Ask.Runtime.ChannelBroker do
3636
call(channel_id, {:has_delivery_confirmation?})
3737
end
3838

39-
def ask(channel_id, channel_type, respondent, token, reply) do
40-
cast(channel_id, {:ask, channel_type, respondent, token, reply})
39+
def ask(channel_id, channel_type, respondent, token, reply, not_before \\ nil, not_after \\ nil) do
40+
cast(channel_id, {:ask, channel_type, respondent, token, reply, not_before, not_after})
4141
end
4242

4343
def has_queued_message?(channel_id, respondent_id) do
@@ -145,16 +145,18 @@ defmodule Ask.Runtime.ChannelBroker do
145145
end
146146

147147
@impl true
148-
def handle_cast({:ask, "sms", respondent, token, reply}, state) do
148+
def handle_cast({:ask, "sms", respondent, token, reply, not_before, not_after}, state) do
149149
debug("handle_cast[ask]",
150150
channel_type: "sms",
151151
channel_id: state.channel_id,
152152
respondent_id: respondent.id,
153153
token: token,
154-
reply: reply
154+
reply: reply,
155+
not_before: not_before,
156+
not_after: not_after
155157
)
156158

157-
contact = {respondent, token, reply}
159+
contact = {respondent, token, reply, not_before, not_after}
158160
refreshed_state = refresh_runtime_channel(state)
159161
size = messages_count(refreshed_state, respondent, reply)
160162

@@ -410,32 +412,41 @@ defmodule Ask.Runtime.ChannelBroker do
410412
# Activates the next queued contact. There must be at least one contact in
411413
# queue. It doesn't verify if the channel capacity has been reached!
412414
defp activate_next_queued_contact(state) do
413-
{new_state, unqueued_item} =
415+
{new_broker_state, unqueued_item} =
414416
state
415417
|> refresh_runtime_channel()
416418
|> State.activate_next_in_queue()
417419

418420
case unqueued_item do
419421
{respondent_id, token, not_before, not_after} ->
420-
Respondent.with_lock(respondent_id, fn respondent ->
421-
cond do
422-
expired_call?(not_after) ->
423-
new_state = State.deactivate_contact(new_state, respondent.id)
424-
Ask.Runtime.Survey.contact_attempt_expired(respondent)
425-
new_state
426-
427-
true ->
428-
ivr_call(new_state, respondent, token, not_before, not_after)
429-
end
422+
contact_or_mark_as_expired(respondent_id, not_after, new_broker_state, fn (respondent) ->
423+
ivr_call(new_broker_state, respondent, token, not_before, not_after)
430424
end)
431-
{respondent_id, token, reply} ->
432-
Respondent.with_lock(respondent_id, fn respondent ->
433-
channel_ask(new_state, respondent, token, reply)
425+
{respondent_id, token, reply, _not_before, not_after} ->
426+
contact_or_mark_as_expired(respondent_id, not_after, new_broker_state, fn (respondent) ->
427+
channel_ask(new_broker_state, respondent, token, reply)
434428
end)
435-
end
429+
end
436430
end
437431

438-
defp expired_call?(not_after) do
432+
defp contact_or_mark_as_expired(respondent_id, not_after, new_broker_state, do_contact) do
433+
Respondent.with_lock(respondent_id, fn respondent ->
434+
cond do
435+
expired_contact_attempt?(not_after) ->
436+
new_state = State.deactivate_contact(new_broker_state, respondent.id)
437+
Ask.Runtime.Survey.contact_attempt_expired(respondent)
438+
new_state
439+
440+
true ->
441+
do_contact.(respondent)
442+
end
443+
end)
444+
end
445+
446+
defp expired_contact_attempt?(nil) do
447+
false
448+
end
449+
defp expired_contact_attempt?(not_after) do
439450
DateTime.compare(not_after, Ask.SystemTime.time().now) != :gt
440451
end
441452

lib/ask/runtime/channel_broker_state.ex

+4-4
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,16 @@ defmodule Ask.Runtime.ChannelBrokerState do
9494
end
9595

9696
# Adds an SMS contact to the queue with given priority (`:high`, `:normal`).
97-
def queue_contact(state, {respondent, token, reply}, size, priority) do
97+
def queue_contact(state, {respondent, token, reply, not_before, not_after}, size, priority) do
9898
Queue.upsert!(%{
9999
channel_id: state.channel_id,
100100
respondent_id: respondent.id,
101101
queued_at: SystemTime.time().now,
102102
priority: priority,
103103
size: size,
104104
token: token,
105-
not_before: nil,
106-
not_after: nil,
105+
not_before: not_before,
106+
not_after: not_after,
107107
reply: reply
108108
})
109109

@@ -173,7 +173,7 @@ defmodule Ask.Runtime.ChannelBrokerState do
173173
end
174174

175175
defp to_item("sms", contact) do
176-
{contact.respondent_id, contact.token, contact.reply}
176+
{contact.respondent_id, contact.token, contact.reply, contact.not_before, contact.not_after}
177177
end
178178

179179
# Increments the number of contacts for the respondent. Activates the contact

lib/ask/runtime/session.ex

+30-22
Original file line numberDiff line numberDiff line change
@@ -245,30 +245,27 @@ defmodule Ask.Runtime.Session do
245245
end
246246
end
247247

248-
def contact_respondent(%{current_mode: %SMSMode{}} = session) do
248+
def contact_respondent(%{schedule: schedule, current_mode: %SMSMode{}} = session) do
249249
token = Ecto.UUID.generate()
250250

251251
respondent = session.respondent
252252
{:ok, _flow, reply} = Flow.retry(session.flow, TextVisitor.new("sms"), respondent.disposition)
253253
channel = session.current_mode.channel
254254
log_prompts(reply, channel, session.flow.mode, respondent)
255255

256-
ChannelBroker.ask(channel.id, channel.type, session.respondent, token, reply)
256+
{not_before, not_after} = acceptable_contact_time_window(schedule)
257+
258+
ChannelBroker.ask(channel.id, channel.type, session.respondent, token, reply, not_before, not_after)
257259

260+
# TODO: what happens with this when contact attempt falls outside acceptable window
258261
respondent = Respondent.update_stats(respondent.id, reply)
259262
%{session | token: token, respondent: respondent}
260263
end
261264

262265
def contact_respondent(%{schedule: schedule, current_mode: %IVRMode{}} = session) do
263266
token = Ecto.UUID.generate()
264267

265-
next_available_date_time =
266-
schedule
267-
|> Schedule.next_available_date_time()
268-
269-
today_end_time =
270-
schedule
271-
|> Schedule.at_end_time(next_available_date_time)
268+
{not_before, not_after} = acceptable_contact_time_window(schedule)
272269

273270
channel = session.current_mode.channel
274271

@@ -277,8 +274,8 @@ defmodule Ask.Runtime.Session do
277274
channel.type,
278275
session.respondent,
279276
token,
280-
next_available_date_time,
281-
today_end_time
277+
not_before,
278+
not_after
282279
)
283280

284281
%{session | token: token}
@@ -388,9 +385,13 @@ defmodule Ask.Runtime.Session do
388385
flow: flow,
389386
respondent: respondent,
390387
token: token,
391-
current_mode: %SMSMode{channel: channel}
388+
current_mode: %SMSMode{channel: channel},
389+
schedule: schedule
392390
} = session
393391
) do
392+
393+
{not_before, not_after} = acceptable_contact_time_window(schedule)
394+
394395
case flow
395396
|> Flow.step(
396397
session.current_mode |> SessionMode.visitor(),
@@ -401,7 +402,7 @@ defmodule Ask.Runtime.Session do
401402
if Reply.prompts(reply) != [] do
402403
log_prompts(reply, channel, flow.mode, respondent, true)
403404

404-
ChannelBroker.ask(channel.id, channel.type, respondent, token, reply)
405+
ChannelBroker.ask(channel.id, channel.type, respondent, token, reply, not_before, not_after)
405406

406407
respondent = Respondent.update_stats(respondent.id, reply)
407408
{:end, reply, respondent}
@@ -422,7 +423,7 @@ defmodule Ask.Runtime.Session do
422423

423424
log_prompts(reply, channel, flow.mode, respondent)
424425

425-
ChannelBroker.ask(channel.id, channel.type, respondent, token, reply)
426+
ChannelBroker.ask(channel.id, channel.type, respondent, token, reply, not_before, not_after)
426427

427428
respondent = Respondent.update_stats(respondent.id, reply)
428429
{:ok, %{session | flow: flow, respondent: respondent}, reply, current_timeout(session)}
@@ -472,21 +473,16 @@ defmodule Ask.Runtime.Session do
472473
schedule: schedule
473474
} = session
474475
) do
475-
next_available_date_time =
476-
schedule
477-
|> Schedule.next_available_date_time()
478476

479-
today_end_time =
480-
schedule
481-
|> Schedule.at_end_time(next_available_date_time)
477+
{not_before, not_after} = acceptable_contact_time_window(schedule)
482478

483479
ChannelBroker.setup(
484480
channel.id,
485481
channel.type,
486482
respondent,
487483
token,
488-
next_available_date_time,
489-
today_end_time
484+
not_before,
485+
not_after
490486
)
491487

492488
{:ok, %{session | respondent: respondent}, %Reply{}, current_timeout(session)}
@@ -1043,4 +1039,16 @@ defmodule Ask.Runtime.Session do
10431039
until = Schedule.next_available_date_time(session.schedule)
10441040
Interval.new(from: from, until: until) |> Interval.duration(:minutes)
10451041
end
1042+
1043+
defp acceptable_contact_time_window(schedule) do
1044+
not_before =
1045+
schedule
1046+
|> Schedule.next_available_date_time()
1047+
1048+
not_after =
1049+
schedule
1050+
|> Schedule.at_end_time(not_before)
1051+
1052+
{not_before, not_after}
1053+
end
10461054
end

test/ask/runtime/channel_broker_state_test.exs

+7-4
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,13 @@ defmodule Ask.Runtime.ChannelBrokerStateTest do
6868
now = DateTime.utc_now()
6969
mock_time(now)
7070

71+
five_seconds_ago = DateTime.add(now, -5, :second)
72+
in_one_minute = DateTime.add(now, 60, :second)
73+
7174
State.new(0, "sms", %{})
72-
|> State.queue_contact({%{id: 2, disposition: :queued}, "secret", []}, 5)
75+
|> State.queue_contact({%{id: 2, disposition: :queued}, "secret", [], five_seconds_ago, in_one_minute}, 5)
7376

74-
assert [%{respondent_id: 2, size: 5, queued_at: now, reply: []}] = Queue.queued_contacts(0)
77+
assert [%{respondent_id: 2, size: 5, queued_at: now, reply: [], not_before: five_seconds_ago, not_after: in_one_minute}] = Queue.queued_contacts(0)
7578
assert [] = Queue.active_contacts(0)
7679
end
7780

@@ -214,10 +217,10 @@ defmodule Ask.Runtime.ChannelBrokerStateTest do
214217

215218
{_, contact} =
216219
State.new(0, "sms", %{})
217-
|> State.queue_contact({respondent, "secret", []}, 2)
220+
|> State.queue_contact({respondent, "secret", [], nil, nil}, 2)
218221
|> State.activate_next_in_queue()
219222

220-
assert {^respondent_id, "secret", []} = contact
223+
assert {^respondent_id, "secret", [], nil, nil} = contact
221224
end
222225

223226
@tag :time_mock

test/ask/runtime/channel_broker_test.exs

+53-2
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,49 @@ defmodule Ask.Runtime.ChannelBrokerTest do
241241
end
242242

243243
describe "Nuntium" do
244+
defp activate_respondent(state, "sms", respondent, not_before, not_after) do
245+
{_, state, _} =
246+
ChannelBroker.handle_cast(
247+
{:ask, "sms", respondent, "token", "foo", not_before, not_after},
248+
state
249+
)
250+
251+
state
252+
end
253+
254+
test "Respect schedules for SMS" do
255+
%{state: state, respondents: respondents, test_channel: test_channel} = build_survey("sms")
256+
257+
now = SystemTime.time().now
258+
not_before = DateTime.add(now, 5, :second)
259+
not_after = DateTime.add(now, 60, :second)
260+
expired = DateTime.add(now, -5, :second)
261+
262+
with_mock Ask.Runtime.Survey, [contact_attempt_expired: fn _ -> :ok end] do
263+
state =
264+
state
265+
|> activate_respondent("sms", Enum.at(respondents, 0), now, not_after)
266+
|> activate_respondent("sms", Enum.at(respondents, 1), not_before, expired)
267+
|> activate_respondent("sms", Enum.at(respondents, 2), not_before, expired)
268+
|> activate_respondent("sms", Enum.at(respondents, 3), not_before, not_after)
269+
|> activate_respondent("sms", Enum.at(respondents, 4), not_before, not_after)
270+
271+
expected_active_respondents = [
272+
Enum.at(respondents, 0),
273+
Enum.at(respondents, 3),
274+
Enum.at(respondents, 4)
275+
]
276+
277+
assert Enum.map(expected_active_respondents, fn r -> r.id end) == active_respondent_ids(state)
278+
279+
assert_sent_smss(expected_active_respondents, test_channel)
280+
assert_not_sent_smss([
281+
Enum.at(respondents, 1),
282+
Enum.at(respondents, 2),
283+
], test_channel)
284+
end
285+
end
286+
244287
test "Every SMS is sent when capacity isn't set", %{} do
245288
channel_capacity = nil
246289
[test_channel, respondents, _channel] = initialize_survey("sms", channel_capacity)
@@ -466,6 +509,12 @@ defmodule Ask.Runtime.ChannelBrokerTest do
466509
refute_received [:ask, ^test_channel, _respondent, _token, _reply, _channel_id]
467510
end
468511

512+
defp assert_not_sent_smss(respondents, test_channel) do
513+
Enum.each(respondents, fn %{id: id} ->
514+
refute_received [:ask, ^test_channel, %{id: ^id}, _token, _reply, _channel_id]
515+
end)
516+
end
517+
469518
defp assert_some_sent_smss(amount, respondents, test_channel) do
470519
respondent_ids = Enum.map(respondents, & &1.id)
471520

@@ -511,7 +560,7 @@ defmodule Ask.Runtime.ChannelBrokerTest do
511560
config: Config.channel_broker_config()
512561
}
513562

514-
%{state: state, respondents: respondents, channel: channel}
563+
%{state: state, respondents: respondents, channel: channel, test_channel: test_channel}
515564
end
516565

517566
defp start_survey(channel_type) do
@@ -547,7 +596,9 @@ defmodule Ask.Runtime.ChannelBrokerTest do
547596
end
548597

549598
defp respondent_to_contact("sms", respondent) do
550-
{respondent, "secret", []}
599+
not_before = SystemTime.time().now |> DateTime.add(-3600, :second)
600+
not_after = SystemTime.time().now |> DateTime.add(3600, :second)
601+
{respondent, "secret", [], not_before, not_after}
551602
end
552603

553604
defp channel_state("ivr", respondent) do

0 commit comments

Comments
 (0)