Commit

Merge branch 'main' into dana/pre-process-upload
dana-yaish committed Aug 31, 2023
2 parents 6d48fe9 + 36b9432 commit 4941ed0
Showing 34 changed files with 3,401 additions and 346 deletions.
6 changes: 3 additions & 3 deletions celery_config.py
@@ -124,15 +124,15 @@ def _beat_schedule():
},
"github_app_webhooks_task": {
"task": gh_app_webhook_check_task_name,
"schedule": crontab(hour="0,6,12,18"),
"schedule": crontab(minute="0", hour="0,6,12,18"),
"kwargs": {
"cron_task_generation_time_iso": BeatLazyFunc(get_utc_now_as_iso_format)
},
},
"trial_expiration_cron": {
"task": trial_expiration_cron_task_name,
# 4 UTC is 12am EDT
"schedule": crontab(hour="4"),
"schedule": crontab(minute="0", hour="4"),
"kwargs": {
"cron_task_generation_time_iso": BeatLazyFunc(get_utc_now_as_iso_format)
},
@@ -142,7 +142,7 @@ def _beat_schedule():
if get_config("setup", "telemetry", "enabled", default=True):
beat_schedule["brolly_stats_rollup"] = {
"task": brolly_stats_rollup_task_name,
"schedule": crontab(hour="2"),
"schedule": crontab(minute="0", hour="2"),
"kwargs": {
"cron_task_generation_time_iso": BeatLazyFunc(get_utc_now_as_iso_format)
},
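
Celery's crontab defaults every unspecified field to "*", so crontab(hour="4") matches every minute during hour 4 rather than once a day; pinning minute="0" makes each of these beat entries fire exactly once at the top of the hour. A minimal sketch of the difference (illustrative only, not part of this diff):

from celery.schedules import crontab

every_minute_of_hour_4 = crontab(hour="4")               # minute defaults to "*": 60 runs per day
once_daily_at_0400_utc = crontab(minute="0", hour="4")   # a single run at 04:00 UTC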
24 changes: 24 additions & 0 deletions conftest.py
@@ -256,3 +256,27 @@ def with_sql_functions(dbsession):
$$ language sql stable strict;"""
)
dbsession.flush()


# We don't want any tests submitting checkpoint logs to Sentry for real
@pytest.fixture(autouse=True)
def mock_checkpoint_submit(mocker, request):
# We mock sentry differently in the tests for CheckpointLogger
if request.node.get_closest_marker("real_checkpoint_logger"):
return

def mock_submit_fn(metric, start, end):
pass

mock_submit = mocker.Mock()
mock_submit.side_effect = mock_submit_fn

import helpers

return mocker.patch.object(
helpers.checkpoint_logger.CheckpointLogger,
"submit_subflow",
mock_submit,
)

return mock_submit
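
Because the fixture is autouse, every test gets the mocked submit_subflow unless it carries the real_checkpoint_logger marker checked above. A hypothetical example of opting back into the real behaviour (the marker would normally also be registered in the pytest config to avoid an unknown-marker warning):

import pytest

from helpers.checkpoint_logger import CheckpointLogger, UploadFlow


@pytest.mark.real_checkpoint_logger
def test_submit_subflow_sets_measurement(mocker):
    # With the real submit_subflow in place, stub out sentry_sdk instead
    mock_sentry = mocker.patch("helpers.checkpoint_logger.sentry_sdk")

    checkpoints = CheckpointLogger(UploadFlow)
    checkpoints.log(UploadFlow.UPLOAD_TASK_BEGIN)
    checkpoints.log(UploadFlow.PROCESSING_BEGIN)
    checkpoints.submit_subflow(
        "hypothetical_metric", UploadFlow.UPLOAD_TASK_BEGIN, UploadFlow.PROCESSING_BEGIN
    )

    mock_sentry.set_measurement.assert_called_once()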
4 changes: 2 additions & 2 deletions database/models/core.py
@@ -238,7 +238,7 @@ def should_write_to_storage(self) -> bool:
_report_json_storage_path = Column("report_storage_path", types.Text, nullable=True)
report_json = ArchiveField(
should_write_to_storage_fn=should_write_to_storage,
default_value={},
default_value_class=dict,
)


@@ -358,7 +358,7 @@ def should_write_to_storage(self) -> bool:
_flare = Column("flare", postgresql.JSON)
_flare_storage_path = Column("flare_storage_path", types.Text, nullable=True)
flare = ArchiveField(
should_write_to_storage_fn=should_write_to_storage, default_value={}
should_write_to_storage_fn=should_write_to_storage, default_value_class=dict
)


2 changes: 1 addition & 1 deletion database/models/reports.py
@@ -170,7 +170,7 @@ def _should_write_to_storage(self) -> bool:
files_array = ArchiveField(
should_write_to_storage_fn=_should_write_to_storage,
rehydrate_fn=rehydrate_encoded_data,
default_value=[],
default_value_class=list,
)


4 changes: 1 addition & 3 deletions database/tests/unit/test_model_utils.py
@@ -36,9 +36,7 @@ def __init__(
self._archive_field_storage_path = archive_value
self.should_write_to_gcs = should_write_to_gcs

archive_field = ArchiveField(
should_write_to_storage_fn=should_write_to_storage, default_value=None
)
archive_field = ArchiveField(should_write_to_storage_fn=should_write_to_storage)

class ClassWithArchiveFieldMissingMethods:
commit: Commit
8 changes: 4 additions & 4 deletions database/utils.py
@@ -68,9 +68,9 @@ def __init__(
should_write_to_storage_fn: Callable[[object], bool],
rehydrate_fn: Callable[[object, object], Any] = lambda self, x: x,
json_encoder=ReportEncoder,
default_value=None,
default_value_class=lambda: None,
):
self.default_value = default_value
self.default_value_class = default_value_class
self.rehydrate_fn = rehydrate_fn
self.should_write_to_storage_fn = should_write_to_storage_fn
self.json_encoder = json_encoder
@@ -103,14 +103,14 @@ def _get_value_from_archive(self, obj):
),
)
else:
log.info(
log.debug(
"Both db_field and archive_field are None",
extra=dict(
object_id=obj.id,
commit=obj.get_commitid(),
),
)
return self.default_value
return self.default_value_class()

def __get__(self, obj, objtype=None):
cached_value = getattr(obj, self.cached_value_property_name, None)
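
The switch from default_value to default_value_class replaces a single shared mutable default ({} or [] created once when the model class is defined) with a factory that builds a fresh instance every time the lookup falls through to the default. A standalone sketch of the pitfall being avoided (illustrative, not this repo's classes):

# Old style: one dict object shared by every caller that hits the default
shared_default = {}

def get_report_json_shared():
    return shared_default

# New style: a factory produces an independent object per call
def get_report_json_fresh(default_value_class=dict):
    return default_value_class()

a = get_report_json_shared()
a["files"] = ["foo.py"]
assert get_report_json_shared() == {"files": ["foo.py"]}   # mutation leaked into the "default"

b = get_report_json_fresh()
b["files"] = ["foo.py"]
assert get_report_json_fresh() == {}                        # each call starts clean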
98 changes: 98 additions & 0 deletions helpers/checkpoint_logger.py
@@ -0,0 +1,98 @@
import logging
import time
from enum import Enum, auto

import sentry_sdk
from shared.metrics import metrics

logger = logging.getLogger(__name__)


class UploadFlow(str, Enum):
UPLOAD_TASK_BEGIN = auto()
PROCESSING_BEGIN = auto()
INITIAL_PROCESSING_COMPLETE = auto()
BATCH_PROCESSING_COMPLETE = auto()
PROCESSING_COMPLETE = auto()
NOTIFIED = auto()


def _get_milli_timestamp():
return time.time_ns() // 1000000


def _kwargs_key(cls):
return f"checkpoints_{cls.__name__}"


def from_kwargs(cls, kwargs, strict=False):
data = kwargs.get(_kwargs_key(cls), {})

# Make sure these checkpoints were made with the same flow
for key in data.keys():
if key not in iter(cls):
raise ValueError(f"Checkpoint {key} not part of flow `{cls.__name__}`")

return CheckpointLogger(cls, data, strict)


class CheckpointLogger:
def __init__(self, cls, data=None, strict=False):
self.cls = cls
self.data = data if data else {}
self.kwargs_key = _kwargs_key(self.cls)
self.strict = strict

def _error(self, msg):
# When a new version of worker rolls out, it will pick up tasks that
# may have been enqueued by the old worker and be missing checkpoints
# data. At least for that reason, we want to allow failing softly.
metrics.incr("worker.checkpoint_logger.error")
if self.strict:
raise ValueError(msg)
else:
logger.warning(msg)

def _validate_checkpoint(self, checkpoint):
if checkpoint.__class__ != self.cls:
# This error is not ignored when `self.strict==False` because it's definitely
# a code mistake
raise ValueError(
f"Checkpoint {checkpoint} not part of flow `{self.cls.__name__}`"
)

def _subflow_duration(self, start, end):
self._validate_checkpoint(start)
self._validate_checkpoint(end)
if start not in self.data:
return self._error(
f"Cannot compute duration; missing start checkpoint {start}"
)
elif end not in self.data:
return self._error(f"Cannot compute duration; missing end checkpoint {end}")
elif end.value <= start.value:
# This error is not ignored when `self.strict==False` because it's definitely
# a code mistake
raise ValueError(
f"Cannot compute duration; end {end} is not after start {start}"
)

return self.data[end] - self.data[start]

def log(self, checkpoint, ignore_repeat=False, kwargs=None):
if checkpoint not in self.data:
self._validate_checkpoint(checkpoint)
self.data[checkpoint] = _get_milli_timestamp()
elif not ignore_repeat:
self._error(f"Already recorded checkpoint {checkpoint}")

if kwargs is not None:
kwargs[self.kwargs_key] = self.data

return self

def submit_subflow(self, metric, start, end):
duration = self._subflow_duration(start, end)
sentry_sdk.set_measurement(metric, duration, "milliseconds")

return self
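
A hypothetical end-to-end sketch of the new helper (the metric name and task boundaries are illustrative): checkpoints are logged in one task, serialized into the Celery kwargs, then picked back up in a later task to report a subflow duration to Sentry.

from helpers.checkpoint_logger import CheckpointLogger, UploadFlow, from_kwargs

kwargs = {}

# In the upload task: record the first checkpoint and stash the data in kwargs
CheckpointLogger(UploadFlow).log(UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs)

# In a downstream task that received those kwargs:
checkpoints = from_kwargs(UploadFlow, kwargs)
checkpoints.log(UploadFlow.PROCESSING_BEGIN, kwargs=kwargs)
checkpoints.submit_subflow(
    "time_before_processing",  # hypothetical metric name
    UploadFlow.UPLOAD_TASK_BEGIN,
    UploadFlow.PROCESSING_BEGIN,
)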
2 changes: 1 addition & 1 deletion helpers/sentry.py
@@ -34,7 +34,7 @@ def initialize_sentry() -> None:
environment=os.getenv("DD_ENV", "production"),
traces_sample_rate=float(os.environ.get("SERVICES__SENTRY__SAMPLE_RATE", 1)),
integrations=[
CeleryIntegration(),
CeleryIntegration(monitor_beat_tasks=True),
SqlalchemyIntegration(),
RedisIntegration(),
HttpxIntegration(),
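
Passing monitor_beat_tasks=True asks the Celery integration to emit Sentry Crons check-ins for scheduled beat tasks, so missed or failing schedules become visible in Sentry alongside errors and traces. A minimal sketch of the setting in isolation (placeholder DSN, not this repo's init call):

import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    integrations=[CeleryIntegration(monitor_beat_tasks=True)],
)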