Skip to content

Commit 9c52334

Browse files
authored
Fixes an potential race condition when multiple signals were sent at the same time. (#3239)
Fixes race condition where if multiple signals are ingested, no case may be created.
1 parent 5bc1320 commit 9c52334

File tree

8 files changed

+149
-22
lines changed

8 files changed

+149
-22
lines changed

src/dispatch/entity/models.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class Entity(Base, TimeStampMixin, ProjectMixin):
2121
__table_args__ = (UniqueConstraint("name", "project_id"),)
2222

2323
# Columns
24-
id = Column(Integer, primary_key=True)
24+
id = Column(Integer, primary_key=True, autoincrement=True)
2525
name = Column(String)
2626
description = Column(String)
2727
value = Column(String)

src/dispatch/signal/flows.py

+1-6
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,4 @@ def create_signal_instance(
107107
signal_instance = signal_service.create_instance(
108108
db_session=db_session, signal_instance_in=signal_instance_in
109109
)
110-
111-
return signal_instance_create_flow(
112-
db_session=db_session,
113-
signal_instance_id=signal_instance.id,
114-
organization_slug=None,
115-
)
110+
return signal_instance

src/dispatch/signal/models.py

+1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class SignalFilterMode(DispatchEnum):
116116
class SignalFilterAction(DispatchEnum):
117117
deduplicate = "deduplicate"
118118
snooze = "snooze"
119+
none = "none"
119120

120121

121122
class Signal(Base, TimeStampMixin, ProjectMixin):

src/dispatch/signal/scheduled.py

+23-1
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@
66
"""
77
import logging
88
from schedule import every
9-
109
from dispatch.database.core import SessionLocal
1110
from dispatch.scheduler import scheduler
1211
from dispatch.project.models import Project
1312
from dispatch.plugin import service as plugin_service
1413
from dispatch.signal import flows as signal_flows
1514
from dispatch.decorators import scheduled_project_task
15+
from dispatch.signal.models import SignalInstance
1616

1717
log = logging.getLogger(__name__)
1818

@@ -49,3 +49,25 @@ def consume_signals(db_session: SessionLocal, project: Project):
4949

5050
if signal_instances:
5151
plugin.instance.delete()
52+
53+
54+
@scheduler.add(every(1).minutes, name="signal-process")
55+
@scheduled_project_task
56+
def process_signals(db_session: SessionLocal, project: Project):
57+
"""Processes signals and create cases if appropriate."""
58+
signal_instances = (
59+
db_session.query(SignalInstance)
60+
.filter(SignalInstance.project_id == project.id)
61+
.filter(SignalInstance.filter_action == None) # noqa
62+
)
63+
for signal_instance in signal_instances:
64+
log.info(f"Attempting to process the following signal: {signal_instance.id}")
65+
try:
66+
signal_flows.signal_instance_create_flow(
67+
db_session=db_session,
68+
project=project,
69+
signal_instance_id=signal_instance.id,
70+
)
71+
except Exception as e:
72+
log.debug(signal_instance)
73+
log.exception(e)

src/dispatch/signal/service.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from dispatch.service import service as service_service
1818
from dispatch.tag import service as tag_service
1919
from dispatch.workflow import service as workflow_service
20+
from dispatch.entity.models import Entity
2021

2122
from .models import (
2223
Signal,
@@ -378,8 +379,12 @@ def filter_signal(*, db_session: Session, signal_instance: SignalInstance) -> bo
378379
break
379380

380381
elif f.action == SignalFilterAction.deduplicate:
381-
window = datetime.now(timezone.utc) - timedelta(seconds=f.window)
382+
window = datetime.now(timezone.utc) - timedelta(minutes=f.window)
382383
query = query.filter(SignalInstance.created_at >= window)
384+
query = query.join(SignalInstance.entities).filter(
385+
Entity.id.in_([e.id for e in signal_instance.entities])
386+
)
387+
query = query.filter(SignalInstance.id != signal_instance.id)
383388

384389
# get the earliest instance
385390
query = query.order_by(asc(SignalInstance.created_at))
@@ -392,5 +397,8 @@ def filter_signal(*, db_session: Session, signal_instance: SignalInstance) -> bo
392397
filtered = True
393398
break
394399

400+
if not filtered:
401+
signal_instance.filter_action = SignalFilterAction.none
402+
395403
db_session.commit()
396404
return filtered

src/dispatch/signal/views.py

-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from dispatch.models import OrganizationSlug, PrimaryKey
1212
from dispatch.project import service as project_service
1313
from dispatch.signal import service as signal_service
14-
from dispatch.signal.flows import signal_instance_create_flow
1514

1615
from .models import (
1716
SignalCreate,
@@ -94,11 +93,6 @@ def create_signal_instance(
9493
)
9594
signal_instance.signal = signal
9695
db_session.commit()
97-
98-
background_tasks.add_task(
99-
signal_instance_create_flow, signal_instance.id, organization_slug=organization
100-
)
101-
10296
return signal_instance
10397

10498

tests/factories.py

-1
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,6 @@ class Meta:
801801

802802

803803
class EntityFactory(BaseFactory):
804-
id = Sequence(lambda n: f"1{n}")
805804
name = FuzzyText()
806805
description = FuzzyText()
807806
value = FuzzyText()

tests/signal/test_signal_service.py

+114-6
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,117 @@ def test_delete(session, signal):
5050
assert not get(db_session=session, signal_id=signal.id)
5151

5252

53+
def test_filter_actions_deduplicate_different_entities(session, signal, project):
54+
from dispatch.signal.models import (
55+
SignalFilter,
56+
SignalInstance,
57+
SignalFilterAction,
58+
)
59+
from dispatch.signal.service import filter_signal
60+
from dispatch.entity_type.models import EntityType
61+
from dispatch.entity.models import Entity
62+
63+
entity_type_0 = EntityType(
64+
name="dedupe2-0",
65+
field="id",
66+
regular_expression=None,
67+
project=project,
68+
)
69+
session.add(entity_type_0)
70+
71+
entity_0 = Entity(name="dedupe2", description="test", value="foo", entity_type=entity_type_0)
72+
session.add(entity_0)
73+
74+
entity_1 = Entity(name="dedupe2-1", description="test", value="foo", entity_type=entity_type_0)
75+
session.add(entity_1)
76+
77+
signal_instance_0 = SignalInstance(
78+
raw=json.dumps({"id": "foo"}), project=project, signal=signal, entities=[entity_0]
79+
)
80+
session.add(signal_instance_0)
81+
82+
signal_instance_1 = SignalInstance(
83+
raw=json.dumps({"id": "foo"}), project=project, signal=signal, entities=[entity_1]
84+
)
85+
session.add(signal_instance_1)
86+
session.commit()
87+
88+
# create deduplicate signal filter
89+
signal_filter = SignalFilter(
90+
name="test",
91+
description="dedupe2",
92+
expression=[
93+
{"or": [{"model": "EntityType", "field": "id", "op": "==", "value": entity_type_0.id}]}
94+
],
95+
action=SignalFilterAction.deduplicate,
96+
window=5,
97+
project=project,
98+
)
99+
signal.filters.append(signal_filter)
100+
101+
session.commit()
102+
assert not filter_signal(db_session=session, signal_instance=signal_instance_1)
103+
assert signal_instance_1.filter_action == SignalFilterAction.none
104+
105+
106+
def test_filter_actions_deduplicate_different_entities_types(session, signal, project):
107+
from dispatch.signal.models import (
108+
SignalFilter,
109+
SignalInstance,
110+
SignalFilterAction,
111+
)
112+
from dispatch.signal.service import filter_signal
113+
from dispatch.entity_type.models import EntityType
114+
from dispatch.entity.models import Entity
115+
116+
entity_type_0 = EntityType(
117+
name="dedupe0-0",
118+
field="id",
119+
regular_expression=None,
120+
project=project,
121+
)
122+
session.add(entity_type_0)
123+
entity_0 = Entity(name="dedupe0", description="test", value="foo", entity_type=entity_type_0)
124+
session.add(entity_0)
125+
signal_instance_0 = SignalInstance(
126+
raw=json.dumps({"id": "foo"}), project=project, signal=signal, entities=[entity_0]
127+
)
128+
session.add(signal_instance_0)
129+
130+
entity_type_1 = EntityType(
131+
name="dedupe0-1",
132+
field="id",
133+
regular_expression=None,
134+
project=project,
135+
)
136+
session.add(entity_type_1)
137+
entity_1 = Entity(name="dedupe0-1", description="test", value="foo", entity_type=entity_type_1)
138+
session.add(entity_1)
139+
140+
signal_instance_1 = SignalInstance(
141+
raw=json.dumps({"id": "foo"}), project=project, signal=signal, entities=[entity_1]
142+
)
143+
session.add(signal_instance_1)
144+
session.commit()
145+
146+
# create deduplicate signal filter
147+
signal_filter = SignalFilter(
148+
name="test",
149+
description="dedupe0",
150+
expression=[
151+
{"or": [{"model": "EntityType", "field": "id", "op": "==", "value": entity_type_1.id}]}
152+
],
153+
action=SignalFilterAction.deduplicate,
154+
window=5,
155+
project=project,
156+
)
157+
signal.filters.append(signal_filter)
158+
159+
session.commit()
160+
assert not filter_signal(db_session=session, signal_instance=signal_instance_1)
161+
assert signal_instance_1.filter_action == SignalFilterAction.none
162+
163+
53164
def test_filter_actions_deduplicate(session, signal, project):
54165
from dispatch.signal.models import (
55166
SignalFilter,
@@ -61,14 +172,14 @@ def test_filter_actions_deduplicate(session, signal, project):
61172
from dispatch.entity.models import Entity
62173

63174
entity_type = EntityType(
64-
name="test",
175+
name="dedupe1",
65176
field="id",
66177
regular_expression=None,
67178
project=project,
68179
)
69180
session.add(entity_type)
70181

71-
entity = Entity(name="test", description="test", value="foo", entity_type=entity_type)
182+
entity = Entity(name="dedupe1", description="test", value="foo", entity_type=entity_type)
72183
session.add(entity)
73184

74185
# create instance
@@ -81,13 +192,10 @@ def test_filter_actions_deduplicate(session, signal, project):
81192
raw=json.dumps({"id": "foo"}), project=project, signal=signal, entities=[entity]
82193
)
83194
session.add(signal_instance_2)
84-
signal.entity_types.append(entity_type)
85-
86195
session.commit()
87-
88196
# create deduplicate signal filter
89197
signal_filter = SignalFilter(
90-
name="test",
198+
name="dedupe1",
91199
description="test",
92200
expression=[
93201
{"or": [{"model": "EntityType", "field": "id", "op": "==", "value": entity_type.id}]}

0 commit comments

Comments
 (0)