From c3cdba0fcf2cdb4c2cf713fb13d76893ef7b7e34 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Thu, 29 May 2025 15:28:40 -0400
Subject: [PATCH 01/21] set context for sqs-> lambda

---
 datadog_lambda/wrapper.py | 39 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 86bbf04d8..7a4b29f64 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -79,6 +79,7 @@
 DD_REQUESTS_SERVICE_NAME = "DD_REQUESTS_SERVICE_NAME"
 DD_SERVICE = "DD_SERVICE"
 DD_ENV = "DD_ENV"
+DD_DATA_STREAMS_ENABLED = "DD_DATA_STREAMS_ENABLED"


 def get_env_as_int(env_key, default_value: int) -> int:
@@ -190,6 +191,9 @@ def __init__(self, func):
         self.min_cold_start_trace_duration = get_env_as_int(
             DD_MIN_COLD_START_DURATION, 3
         )
+        self.data_streams_enabled = (
+            os.environ.get(DD_DATA_STREAMS_ENABLED, "false").lower() == "true"
+        )
         self.local_testing_mode = os.environ.get(
             DD_LOCAL_TEST, "false"
         ).lower() in ("true", "1")
@@ -287,6 +291,41 @@ def _inject_authorizer_span_headers(self, request_id):
             self.response["context"]["_datadog"] = datadog_data

     def _before(self, event, context):
+
+        from ddtrace.internal.datastreams.processor import (
+            DataStreamsProcessor as processor,
+            DsmPathwayCodec,
+        )
+        from ddtrace.internal.datastreams.botocore import (
+            get_datastreams_context,
+            calculate_sqs_payload_size,
+        )
+
+        def _dsm_set_sqs_context(record):
+            try:
+                queue_arn = record.get("eventSourceARN", "")
+
+                contextjson = get_datastreams_context(record)
+                payload_size = calculate_sqs_payload_size(record)
+
+                ctx = DsmPathwayCodec.decode(contextjson, processor())
+                ctx.set_checkpoint(
+                    ["direction:in", "queue:arn:" + queue_arn, "type:sqs"],
+                    payload_size=payload_size,
+                )
+
+            except Exception as e:
+                logger.error(format_err_with_traceback(e))
+
+        if self.data_streams_enabled:
+            if isinstance(event, dict) and "Records" in event and event["Records"]:
+                sqs_records = [
+                    r for r in event["Records"] if r.get("eventSource") == "aws:sqs"
+                ]
+                if sqs_records:
+                    for record in sqs_records:
+                        _dsm_set_sqs_context(record)
+
         try:
             self.response = None
             set_cold_start(init_timestamp_ns)

From 8a5a33f2f7f2d1132e9b16e57566968343b7d186 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 09:50:02 -0400
Subject: [PATCH 02/21] needed queue name for consistent hash

---
 datadog_lambda/wrapper.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 7a4b29f64..3a86c56fc 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -304,13 +304,14 @@ def _before(self, event, context):
         def _dsm_set_sqs_context(record):
             try:
                 queue_arn = record.get("eventSourceARN", "")
+                queue_name = queue_arn.split(":")[-1]

                 contextjson = get_datastreams_context(record)
                 payload_size = calculate_sqs_payload_size(record)

                 ctx = DsmPathwayCodec.decode(contextjson, processor())
                 ctx.set_checkpoint(
-                    ["direction:in", "queue:arn:" + queue_arn, "type:sqs"],
+                    ["direction:in", "topic:" + queue_name, "type:sqs"],
                     payload_size=payload_size,
                 )
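Note: the checkpoint decoded above only links two services when the incoming record actually carries pathway state. For SQS, ddtrace propagates that state in a "_datadog" message attribute under the key "dd-pathway-ctx-base64"; the test added in PATCH 03 below builds such a message by hand. A minimal producer-side sketch of the same idea (queue URL, tags and body are hypothetical; with ddtrace's botocore auto-instrumentation this injection happens for you):

    import json
    import boto3
    from ddtrace.internal.datastreams import data_streams_processor

    ctx = data_streams_processor().new_pathway()
    ctx.set_checkpoint(["direction:out", "topic:my-queue", "type:sqs"], payload_size=512)

    boto3.client("sqs").send_message(
        QueueUrl="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
        MessageBody="hello",
        MessageAttributes={
            "_datadog": {
                "DataType": "String",
                # Same key the consumer-side get_datastreams_context() looks for
                "StringValue": json.dumps({"dd-pathway-ctx-base64": ctx.encode_b64()}),
            }
        },
    )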
From a0ade0fad720cde679543433f5989c6c5ba29f3f Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 13:58:56 -0400
Subject: [PATCH 03/21] add context prop test

---
 tests/test_wrapper.py | 165 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 165 insertions(+)

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index f46b365ee..79367fa6b 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -563,6 +563,171 @@ def return_type_test(event, context):
         self.assertEqual(result, test_result)
         self.assertFalse(MockPrintExc.called)

+    @patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
+    def test_datadog_lambda_wrapper_dsm_sqs_context_pathway_verification(self):
+        with patch(
+            "ddtrace.internal.datastreams.processor.get_connection"
+        ) as mock_get_connection:
+
+            mock_conn = unittest.mock.MagicMock()
+            mock_response = unittest.mock.MagicMock()
+            mock_response.status = 200
+            mock_conn.getresponse.return_value = mock_response
+            mock_get_connection.return_value = mock_conn
+
+            def updated_get_datastreams_context(message):
+                """
+                Updated version that handles the correct message formats
+                """
+                import base64
+                import json
+
+                context_json = None
+                message_body = message
+                try:
+                    body = message.get("Body")
+                    if body:
+                        message_body = json.loads(body)
+                except (ValueError, TypeError):
+                    pass
+
+                message_attributes = message_body.get(
+                    "MessageAttributes"
+                ) or message_body.get("messageAttributes")
+                if not message_attributes:
+                    return None
+
+                if "_datadog" not in message_attributes:
+                    return None
+
+                datadog_attr = message_attributes["_datadog"]
+
+                if message_body.get("Type") == "Notification":
+                    if datadog_attr.get("Type") == "Binary":
+                        context_json = json.loads(
+                            base64.b64decode(datadog_attr["Value"]).decode()
+                        )
+                elif "StringValue" in datadog_attr:
+                    context_json = json.loads(datadog_attr["StringValue"])
+                elif "stringValue" in datadog_attr:
+                    context_json = json.loads(datadog_attr["stringValue"])
+                elif "BinaryValue" in datadog_attr:
+                    context_json = json.loads(datadog_attr["BinaryValue"].decode())
+                else:
+                    print(f"DEBUG: Unhandled datadog_attr format: {datadog_attr}")
+
+                return context_json
+
+            with patch(
+                "ddtrace.internal.datastreams.botocore.get_datastreams_context",
+                updated_get_datastreams_context,
+            ):
+
+                # Step 1: Create a message with some context in the message attributes
+
+                from ddtrace.internal.datastreams.processor import DataStreamsProcessor
+
+                processor_instance = DataStreamsProcessor()
+
+                with patch(
+                    "ddtrace.internal.datastreams.processor.DataStreamsProcessor",
+                    return_value=processor_instance,
+                ):
+
+                    parent_ctx = processor_instance.new_pathway()
+
+                    parent_ctx.set_checkpoint(
+                        ["direction:out", "topic:upstream-topic", "type:sqs"],
+                        now_sec=1640995200.0,
+                        payload_size=512,
+                    )
+                    parent_hash = parent_ctx.hash
+                    encoded_parent_context = parent_ctx.encode_b64()
+
+                    sqs_event = {
+                        "Records": [
+                            {
+                                "eventSource": "aws:sqs",
+                                "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test",
+                                "Body": "test message body",
+                                "messageAttributes": {
+                                    "_datadog": {
+                                        "stringValue": json.dumps(
+                                            {
+                                                "dd-pathway-ctx-base64": encoded_parent_context
+                                            }
+                                        )
+                                    }
+                                },
+                            }
+                        ]
+                    }
+
+                    # Step 2: Call the handler
+                    @wrapper.datadog_lambda_wrapper
+                    def lambda_handler(event, context):
+                        return {"statusCode": 200, "body": "processed"}
+
+                    result = lambda_handler(sqs_event, get_mock_context())
+                    self.assertEqual(result["statusCode"], 200)
+
+                    # New context set after handler call
+                    current_ctx = processor_instance._current_context.value
+                    self.assertIsNotNone(
+                        current_ctx,
+                        "Data streams context should be set after processing SQS message",
+                    )
+
+                    # Step 3: Check that hash in this context is the child of the hash you passed
+                    # Step 4: Check that the right checkpoint was produced during call to handler
+
+                    found_sqs_checkpoint = False
+                    for bucket_time, bucket in processor_instance._buckets.items():
+                        for aggr_key, stats in bucket.pathway_stats.items():
+                            edge_tags_str, hash_value, parent_hash_recorded = aggr_key
+                            edge_tags = edge_tags_str.split(",")
+
+                            if (
+                                "direction:in" in edge_tags
+                                and "topic:test" in edge_tags
+                                and "type:sqs" in edge_tags
+                            ):
+                                found_sqs_checkpoint = True
+
+                                # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
+                                self.assertEqual(
+                                    parent_hash_recorded,
+                                    parent_hash,
+                                    f"Parent hash must be preserved: "
+                                    f"expected {parent_hash}, got {parent_hash_recorded}",
+                                )
+                                self.assertEqual(
+                                    hash_value,
+                                    current_ctx.hash,
+                                    f"Child hash must match current context: "
+                                    f"expected {current_ctx.hash}, got {hash_value}",
+                                )
+                                self.assertNotEqual(
+                                    hash_value,
+                                    parent_hash_recorded,
+                                    f"Child hash ({hash_value}) must be different from "
+                                    f"parent hash ({parent_hash_recorded}) - proves parent-child",
+                                )
+                                self.assertGreaterEqual(
+                                    stats.payload_size.count,
+                                    1,
+                                    "Should have one payload size measurement",
+                                )
+
+                            break
+
+                    self.assertTrue(
+                        found_sqs_checkpoint,
+                        "Should have found SQS consumption checkpoint in processor stats",
+                    )
+
+                    processor_instance.shutdown(timeout=0.1)
+

 class TestLambdaDecoratorSettings(unittest.TestCase):
     def test_some_envs_should_depend_on_dd_tracing_enabled(self):

From 1f73d64490a5b20f9ff511dbfeb1af3a7b051e6d Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 14:08:53 -0400
Subject: [PATCH 04/21] comment

---
 tests/test_wrapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 79367fa6b..f0033c3c9 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -680,7 +680,7 @@ def lambda_handler(event, context):

                     # Step 3: Check that hash in this context is the child of the hash you passed
                     # Step 4: Check that the right checkpoint was produced during call to handler
-
+                    # The buckets hold the aggregated stats for all checkpoints
                     found_sqs_checkpoint = False
                     for bucket_time, bucket in processor_instance._buckets.items():
                         for aggr_key, stats in bucket.pathway_stats.items():

From 44390b1eae358da6f60a6ebe8b9229f5db96b61a Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 14:23:55 -0400
Subject: [PATCH 05/21] only import when DSM is enabled

---
 datadog_lambda/wrapper.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 3a86c56fc..37c305b22 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -292,15 +292,6 @@ def _inject_authorizer_span_headers(self, request_id):

     def _before(self, event, context):

-        from ddtrace.internal.datastreams.processor import (
-            DataStreamsProcessor as processor,
-            DsmPathwayCodec,
-        )
-        from ddtrace.internal.datastreams.botocore import (
-            get_datastreams_context,
-            calculate_sqs_payload_size,
-        )
-
         def _dsm_set_sqs_context(record):
             try:
                 queue_arn = record.get("eventSourceARN", "")
@@ -319,6 +310,15 @@ def _dsm_set_sqs_context(record):
                 logger.error(format_err_with_traceback(e))

         if self.data_streams_enabled:
+            from ddtrace.internal.datastreams.processor import (
+                DataStreamsProcessor as processor,
+                DsmPathwayCodec,
+            )
+            from ddtrace.internal.datastreams.botocore import (
+                get_datastreams_context,
+                calculate_sqs_payload_size,
+            )
+
             if isinstance(event, dict) and "Records" in event and event["Records"]:
                 sqs_records = [
                     r for r in event["Records"] if r.get("eventSource") == "aws:sqs"
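Note: what the PATCH 03 test asserts, stripped to its core, is the pathway hash chain: each set_checkpoint() folds the edge tags into the current hash, so decoding a parent context and checkpointing again yields a new child hash while the stats bucket records the parent hash alongside it. A rough standalone sketch, assuming ddtrace's DataStreamsProcessor.decode_pathway_b64() helper (the codec path its other integrations use):

    from ddtrace.internal.datastreams.processor import DataStreamsProcessor

    processor = DataStreamsProcessor()
    parent = processor.new_pathway()
    parent.set_checkpoint(["direction:out", "topic:upstream-topic", "type:sqs"])

    child = processor.decode_pathway_b64(parent.encode_b64())
    child.set_checkpoint(["direction:in", "topic:test", "type:sqs"])
    assert child.hash != parent.hash  # a new edge produces a new node hash
    processor.shutdown(timeout=0.1)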
From aec31366345772a6a62018c873b70d989b8442db Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 14:25:53 -0400
Subject: [PATCH 06/21] fix lint

---
 datadog_lambda/wrapper.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 37c305b22..76965b6ba 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -291,7 +291,6 @@ def _inject_authorizer_span_headers(self, request_id):
         self.response["context"]["_datadog"] = datadog_data

     def _before(self, event, context):
-
         def _dsm_set_sqs_context(record):
             try:
                 queue_arn = record.get("eventSourceARN", "")

From 5b2632061b1937399c6f8fd5b46261f80fb0c5a9 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 16:35:26 -0400
Subject: [PATCH 07/21] fix

---
 datadog_lambda/wrapper.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 76965b6ba..e567a2a3d 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -294,14 +294,13 @@ def _before(self, event, context):
         def _dsm_set_sqs_context(record):
             try:
                 queue_arn = record.get("eventSourceARN", "")
-                queue_name = queue_arn.split(":")[-1]

                 contextjson = get_datastreams_context(record)
                 payload_size = calculate_sqs_payload_size(record)

                 ctx = DsmPathwayCodec.decode(contextjson, processor())
                 ctx.set_checkpoint(
-                    ["direction:in", "topic:" + queue_name, "type:sqs"],
+                    ["direction:in", "topic:" + queue_arn, "type:sqs"],
                     payload_size=payload_size,
                 )

From 0987c7d9152470b015818f9085ca83dbd18d2f20 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 16:42:15 -0400
Subject: [PATCH 08/21] fix

---
 tests/test_wrapper.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index f0033c3c9..3a0dce61d 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -689,7 +689,8 @@ def lambda_handler(event, context):

                             if (
                                 "direction:in" in edge_tags
-                                and "topic:test" in edge_tags
+                                and "topic:arn:aws:sqs:us-east-1:123456789012:test"
+                                in edge_tags
                                 and "type:sqs" in edge_tags
                             ):
                                 found_sqs_checkpoint = True

From 0fe48343be02b6149f96016367dad16d66fb40f9 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 12:38:20 -0400
Subject: [PATCH 09/21] refactorings

---
 datadog_lambda/dsm.py     | 37 +++++++++++++++++++++++++++++++++++++
 datadog_lambda/wrapper.py | 36 +++---------------------------------
 tests/test_wrapper.py     | 32 ++++++++++++++++++++++++++++++++
 3 files changed, 72 insertions(+), 33 deletions(-)
 create mode 100644 datadog_lambda/dsm.py

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
new file mode 100644
index 000000000..d3c7ec54f
--- /dev/null
+++ b/datadog_lambda/dsm.py
@@ -0,0 +1,37 @@
+from datadog_lambda import logger
+from datadog_lambda.trigger import EventTypes
+
+
+def set_dsm_context(event, event_source):
+
+    if event_source.equals(EventTypes.SQS):
+        _dsm_set_sqs_context(event)
+
+
+def _dsm_set_sqs_context(event):
+    from datadog_lambda.wrapper import format_err_with_traceback
+
+    from ddtrace.internal.datastreams.processor import (
+        DataStreamsProcessor as processor,
+        DsmPathwayCodec,
+    )
+    from ddtrace.internal.datastreams.botocore import (
+        get_datastreams_context,
+        calculate_sqs_payload_size,
+    )
+
+    records = event.get("Records", [])
+    for record in records:
+        try:
+            queue_arn = record.get("eventSourceARN", "")
+
+            contextjson = get_datastreams_context(record)
+            payload_size = calculate_sqs_payload_size(record)
+
+            ctx = DsmPathwayCodec.decode(contextjson, processor())
+            ctx.set_checkpoint(
+                ["direction:in", "topic:" + queue_arn, "type:sqs"],
+                payload_size=payload_size,
+            )
+        except Exception as e:
+            logger.error(format_err_with_traceback(e))
diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index e567a2a3d..91d13a6c6 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -9,6 +9,7 @@
 from importlib import import_module
 from time import time_ns

+from datadog_lambda.dsm import set_dsm_context
 from datadog_lambda.extension import should_use_extension, flush_extension
 from datadog_lambda.cold_start import (
     set_cold_start,
@@ -291,39 +292,6 @@ def _inject_authorizer_span_headers(self, request_id):
         self.response["context"]["_datadog"] = datadog_data

     def _before(self, event, context):
-        def _dsm_set_sqs_context(record):
-            try:
-                queue_arn = record.get("eventSourceARN", "")
-
-                contextjson = get_datastreams_context(record)
-                payload_size = calculate_sqs_payload_size(record)
-
-                ctx = DsmPathwayCodec.decode(contextjson, processor())
-                ctx.set_checkpoint(
-                    ["direction:in", "topic:" + queue_arn, "type:sqs"],
-                    payload_size=payload_size,
-                )
-
-            except Exception as e:
-                logger.error(format_err_with_traceback(e))
-
-        if self.data_streams_enabled:
-            from ddtrace.internal.datastreams.processor import (
-                DataStreamsProcessor as processor,
-                DsmPathwayCodec,
-            )
-            from ddtrace.internal.datastreams.botocore import (
-                get_datastreams_context,
-                calculate_sqs_payload_size,
-            )
-
-            if isinstance(event, dict) and "Records" in event and event["Records"]:
-                sqs_records = [
-                    r for r in event["Records"] if r.get("eventSource") == "aws:sqs"
-                ]
-                if sqs_records:
-                    for record in sqs_records:
-                        _dsm_set_sqs_context(record)

         try:
             self.response = None
@@ -360,6 +328,8 @@ def _dsm_set_sqs_context(record):
             self.inferred_span = create_inferred_span(
                 event, context, event_source, self.decode_authorizer_context
             )
+            if self.data_streams_enabled:
+                set_dsm_context(event, event_source)
             self.span = create_function_execution_span(
                 context=context,
                 function_name=self.function_name,
diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 3a0dce61d..5369a567f 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -565,6 +565,11 @@ def return_type_test(event, context):

     @patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
     def test_datadog_lambda_wrapper_dsm_sqs_context_pathway_verification(self):
+        from datadog_lambda.trigger import _EventSource, EventTypes
+
+        sqs_event_source = _EventSource(EventTypes.SQS)
+        self.mock_extract_dd_trace_context.return_value = ({}, None, sqs_event_source)
+
         with patch(
             "ddtrace.internal.datastreams.processor.get_connection"
         ) as mock_get_connection:
@@ -729,6 +734,33 @@ def lambda_handler(event, context):

             processor_instance.shutdown(timeout=0.1)

+    @patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
+    @patch("datadog_lambda.wrapper.set_dsm_context")
+    def test_set_dsm_context_called_when_enabled(self, mock_set_dsm_context):
+        @wrapper.datadog_lambda_wrapper
+        def lambda_handler(event, context):
+            return {"statusCode": 200, "body": "processed"}
+
+        lambda_event = {}
+        lambda_handler(lambda_event, get_mock_context())
+
+        mock_set_dsm_context.assert_called_once()
+
+    @patch("datadog_lambda.wrapper.set_dsm_context")
+    def test_set_dsm_context_not_called_when_disabled(self, mock_set_dsm_context):
+        # Ensure DD_DATA_STREAMS_ENABLED is not in environment
+        if "DD_DATA_STREAMS_ENABLED" in os.environ:
+            del os.environ["DD_DATA_STREAMS_ENABLED"]
+
+        @wrapper.datadog_lambda_wrapper
+        def lambda_handler(event, context):
+            return {"statusCode": 200, "body": "processed"}
+
+        lambda_event = {}
+        lambda_handler(lambda_event, get_mock_context())
+
+        mock_set_dsm_context.assert_not_called()
+

 class TestLambdaDecoratorSettings(unittest.TestCase):
     def test_some_envs_should_depend_on_dd_tracing_enabled(self):
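Note: after this refactor the wrapper's _before() resolves the trigger type once and hands the whole event to set_dsm_context(event, event_source), instead of scanning records inline. From a user's perspective the surface is just the decorator plus the env var. A usage sketch (the handler body is hypothetical):

    from datadog_lambda.wrapper import datadog_lambda_wrapper

    @datadog_lambda_wrapper  # with DD_DATA_STREAMS_ENABLED=true in the environment
    def handler(event, context):
        # By the time this body runs, a "direction:in" DSM checkpoint has
        # already been recorded for each SQS record in the event.
        for record in event.get("Records", []):
            ...
        return {"statusCode": 200}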
From 02b60d499559d639519e382c50a5fd715502a066 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 12:41:07 -0400
Subject: [PATCH 10/21] fix

---
 datadog_lambda/wrapper.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 91d13a6c6..0e23b7218 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -292,7 +292,6 @@ def _inject_authorizer_span_headers(self, request_id):
         self.response["context"]["_datadog"] = datadog_data

     def _before(self, event, context):
-
         try:
             self.response = None
             set_cold_start(init_timestamp_ns)

From d5c239f7e47983f718aa9ada0b62a8e739454a38 Mon Sep 17 00:00:00 2001
From: michael-zhao459
Date: Tue, 3 Jun 2025 13:05:10 -0400
Subject: [PATCH 11/21] Update datadog_lambda/dsm.py

Co-authored-by: Rey Abolofia

---
 datadog_lambda/dsm.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
index d3c7ec54f..c3c23af0d 100644
--- a/datadog_lambda/dsm.py
+++ b/datadog_lambda/dsm.py
@@ -30,7 +30,7 @@ def _dsm_set_sqs_context(event):

             ctx = DsmPathwayCodec.decode(contextjson, processor())
             ctx.set_checkpoint(
-                ["direction:in", "topic:" + queue_arn, "type:sqs"],
+                ["direction:in", f"topic:{queue_arn}, "type:sqs"],
                 payload_size=payload_size,
             )
         except Exception as e:

From 50ec7ea8e8d3309bbb74a47b33eb2c20ff1ba675 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 13:12:01 -0400
Subject: [PATCH 12/21] fix

---
 datadog_lambda/dsm.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
index c3c23af0d..e8e90ea9b 100644
--- a/datadog_lambda/dsm.py
+++ b/datadog_lambda/dsm.py
@@ -30,7 +30,7 @@ def _dsm_set_sqs_context(event):

             ctx = DsmPathwayCodec.decode(contextjson, processor())
             ctx.set_checkpoint(
-                ["direction:in", f"topic:{queue_arn}, "type:sqs"],
+                ["direction:in", f"topic:{queue_arn}", "type:sqs"],
                 payload_size=payload_size,
             )
         except Exception as e:

From b00ae82bfabeffe31cbe0d66b9966381ab7d9ba5 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 13:45:40 -0400
Subject: [PATCH 13/21] fixes

---
 datadog_lambda/dsm.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
index e8e90ea9b..8dca6f197 100644
--- a/datadog_lambda/dsm.py
+++ b/datadog_lambda/dsm.py
@@ -12,7 +12,7 @@ def _dsm_set_sqs_context(event):
     from datadog_lambda.wrapper import format_err_with_traceback

     from ddtrace.internal.datastreams.processor import (
-        DataStreamsProcessor as processor,
+        DataStreamsProcessor,
         DsmPathwayCodec,
     )
     from ddtrace.internal.datastreams.botocore import (
         get_datastreams_context,
         calculate_sqs_payload_size,
     )

-    records = event.get("Records", [])
+    records = event.get("Records")
+    if records is None:
+        return
+    processor = DataStreamsProcessor()
+
     for record in records:
         try:
             queue_arn = record.get("eventSourceARN", "")

             contextjson = get_datastreams_context(record)
             payload_size = calculate_sqs_payload_size(record)

-            ctx = DsmPathwayCodec.decode(contextjson, processor())
+            ctx = DsmPathwayCodec.decode(contextjson, processor)
             ctx.set_checkpoint(
                 ["direction:in", f"topic:{queue_arn}", "type:sqs"],
                 payload_size=payload_size,
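Note: PATCH 13 still instantiates DataStreamsProcessor() directly, which builds a fresh processor whose stats buckets nothing ever flushes. The next patch swaps in ddtrace's data_streams_processor() accessor; assuming that accessor hands back the shared processor ddtrace itself reports from (which is what the swap relies on), checkpoints then land in buckets that actually get flushed:

    from ddtrace.internal.datastreams import data_streams_processor

    # Sketch: repeated calls should return the same shared instance,
    # unlike DataStreamsProcessor(), which constructs a new one each time.
    assert data_streams_processor() is data_streams_processor()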
From 7fbb06b0bb35a2ec70b2911a5b15b25fe904727a Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 14:25:14 -0400
Subject: [PATCH 14/21] fixes

---
 datadog_lambda/dsm.py |   9 +-
 tests/test_wrapper.py | 207 +++++++++++++++++++++---------------------
 2 files changed, 105 insertions(+), 111 deletions(-)

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
index 8dca6f197..427f5e479 100644
--- a/datadog_lambda/dsm.py
+++ b/datadog_lambda/dsm.py
@@ -10,11 +10,8 @@ def set_dsm_context(event, event_source):

 def _dsm_set_sqs_context(event):
     from datadog_lambda.wrapper import format_err_with_traceback
-
-    from ddtrace.internal.datastreams.processor import (
-        DataStreamsProcessor,
-        DsmPathwayCodec,
-    )
+    from ddtrace.internal.datastreams import data_streams_processor
+    from ddtrace.internal.datastreams.processor import DsmPathwayCodec
     from ddtrace.internal.datastreams.botocore import (
         get_datastreams_context,
         calculate_sqs_payload_size,
     )

     records = event.get("Records")
     if records is None:
         return
-    processor = DataStreamsProcessor()
+    processor = data_streams_processor()

     for record in records:
         try:
diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 5369a567f..0e0fefb29 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -623,116 +623,113 @@ def updated_get_datastreams_context(message):

                 return context_json

+            # Step 1: Create a message with some context in the message attributes
+            from ddtrace.internal.datastreams.processor import DataStreamsProcessor
+
+            processor_instance = DataStreamsProcessor()
+
             with patch(
                 "ddtrace.internal.datastreams.botocore.get_datastreams_context",
                 updated_get_datastreams_context,
+            ), patch(
+                "ddtrace.internal.datastreams.data_streams_processor",
+                return_value=processor_instance,
             ):
-
-                # Step 1: Create a message with some context in the message attributes
-
-                from ddtrace.internal.datastreams.processor import DataStreamsProcessor
-
-                processor_instance = DataStreamsProcessor()
-
-                with patch(
-                    "ddtrace.internal.datastreams.processor.DataStreamsProcessor",
-                    return_value=processor_instance,
-                ):
-
-                    parent_ctx = processor_instance.new_pathway()
-
-                    parent_ctx.set_checkpoint(
-                        ["direction:out", "topic:upstream-topic", "type:sqs"],
-                        now_sec=1640995200.0,
-                        payload_size=512,
-                    )
-                    parent_hash = parent_ctx.hash
-                    encoded_parent_context = parent_ctx.encode_b64()
-
-                    sqs_event = {
-                        "Records": [
-                            {
-                                "eventSource": "aws:sqs",
-                                "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test",
-                                "Body": "test message body",
-                                "messageAttributes": {
-                                    "_datadog": {
-                                        "stringValue": json.dumps(
-                                            {
-                                                "dd-pathway-ctx-base64": encoded_parent_context
-                                            }
-                                        )
-                                    }
-                                },
-                            }
-                        ]
-                    }
-
-                    # Step 2: Call the handler
-                    @wrapper.datadog_lambda_wrapper
-                    def lambda_handler(event, context):
-                        return {"statusCode": 200, "body": "processed"}
-
-                    result = lambda_handler(sqs_event, get_mock_context())
-                    self.assertEqual(result["statusCode"], 200)
-
-                    # New context set after handler call
-                    current_ctx = processor_instance._current_context.value
-                    self.assertIsNotNone(
-                        current_ctx,
-                        "Data streams context should be set after processing SQS message",
-                    )
-
-                    # Step 3: Check that hash in this context is the child of the hash you passed
-                    # Step 4: Check that the right checkpoint was produced during call to handler
-                    # The buckets hold the aggregated stats for all checkpoints
-                    found_sqs_checkpoint = False
-                    for bucket_time, bucket in processor_instance._buckets.items():
-                        for aggr_key, stats in bucket.pathway_stats.items():
-                            edge_tags_str, hash_value, parent_hash_recorded = aggr_key
-                            edge_tags = edge_tags_str.split(",")
-
-                            if (
-                                "direction:in" in edge_tags
-                                and "topic:arn:aws:sqs:us-east-1:123456789012:test"
-                                in edge_tags
-                                and "type:sqs" in edge_tags
-                            ):
-                                found_sqs_checkpoint = True
-
-                                # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
-                                self.assertEqual(
-                                    parent_hash_recorded,
-                                    parent_hash,
-                                    f"Parent hash must be preserved: "
-                                    f"expected {parent_hash}, got {parent_hash_recorded}",
-                                )
-                                self.assertEqual(
-                                    hash_value,
-                                    current_ctx.hash,
-                                    f"Child hash must match current context: "
-                                    f"expected {current_ctx.hash}, got {hash_value}",
-                                )
-                                self.assertNotEqual(
-                                    hash_value,
-                                    parent_hash_recorded,
-                                    f"Child hash ({hash_value}) must be different from "
-                                    f"parent hash ({parent_hash_recorded}) - proves parent-child",
-                                )
-                                self.assertGreaterEqual(
-                                    stats.payload_size.count,
-                                    1,
-                                    "Should have one payload size measurement",
-                                )
-
-                            break
-
-                    self.assertTrue(
-                        found_sqs_checkpoint,
-                        "Should have found SQS consumption checkpoint in processor stats",
-                    )
-
-                    processor_instance.shutdown(timeout=0.1)
+                parent_ctx = processor_instance.new_pathway()
+
+                parent_ctx.set_checkpoint(
+                    ["direction:out", "topic:upstream-topic", "type:sqs"],
+                    now_sec=1640995200.0,
+                    payload_size=512,
+                )
+                parent_hash = parent_ctx.hash
+                encoded_parent_context = parent_ctx.encode_b64()
+
+                sqs_event = {
+                    "Records": [
+                        {
+                            "eventSource": "aws:sqs",
+                            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test",
+                            "Body": "test message body",
+                            "messageAttributes": {
+                                "_datadog": {
+                                    "stringValue": json.dumps(
+                                        {
+                                            "dd-pathway-ctx-base64": encoded_parent_context
+                                        }
+                                    )
+                                }
+                            },
+                        }
+                    ]
+                }
+
+                # Step 2: Call the handler
+                @wrapper.datadog_lambda_wrapper
+                def lambda_handler(event, context):
+                    return {"statusCode": 200, "body": "processed"}
+
+                result = lambda_handler(sqs_event, get_mock_context())
+                self.assertEqual(result["statusCode"], 200)
+
+                # New context set after handler call
+                current_ctx = processor_instance._current_context.value
+                self.assertIsNotNone(
+                    current_ctx,
+                    "Data streams context should be set after processing SQS message",
+                )
+
+                # Step 3: Check that hash in this context is the child of the hash you passed
+                # Step 4: Check that the right checkpoint was produced during call to handler
+                # The buckets hold the aggregated stats for all checkpoints
+                found_sqs_checkpoint = False
+                for bucket_time, bucket in processor_instance._buckets.items():
+                    for aggr_key, stats in bucket.pathway_stats.items():
+                        edge_tags_str, hash_value, parent_hash_recorded = aggr_key
+                        edge_tags = edge_tags_str.split(",")
+
+                        if (
+                            "direction:in" in edge_tags
+                            and "topic:arn:aws:sqs:us-east-1:123456789012:test"
+                            in edge_tags
+                            and "type:sqs" in edge_tags
+                        ):
+                            found_sqs_checkpoint = True

+                            # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
+                            self.assertEqual(
+                                parent_hash_recorded,
+                                parent_hash,
+                                f"Parent hash must be preserved: "
+                                f"expected {parent_hash}, got {parent_hash_recorded}",
+                            )
+                            self.assertEqual(
+                                hash_value,
+                                current_ctx.hash,
+                                f"Child hash must match current context: "
+                                f"expected {current_ctx.hash}, got {hash_value}",
+                            )
+                            self.assertNotEqual(
+                                hash_value,
+                                parent_hash_recorded,
+                                f"Child hash ({hash_value}) must be different from "
+                                f"parent hash ({parent_hash_recorded}) - proves parent-child",
+                            )
+                            self.assertGreaterEqual(
+                                stats.payload_size.count,
+                                1,
+                                "Should have one payload size measurement",
+                            )
+
+                        break
+
+                self.assertTrue(
+                    found_sqs_checkpoint,
+                    "Should have found SQS consumption checkpoint in processor stats",
+                )
+
+                processor_instance.shutdown(timeout=0.1)
From eed0501dce5b7ea8f5b99281c8b75c1b0fd68b65 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 15:31:18 -0400
Subject: [PATCH 15/21] test fixes

---
 tests/test_dsm.py     | 113 +++++++++++++++++++
 tests/test_wrapper.py | 257 +++++++++++------------------------------
 2 files changed, 176 insertions(+), 194 deletions(-)
 create mode 100644 tests/test_dsm.py

diff --git a/tests/test_dsm.py b/tests/test_dsm.py
new file mode 100644
index 000000000..8cebb3c0f
--- /dev/null
+++ b/tests/test_dsm.py
@@ -0,0 +1,113 @@
+import unittest
+from unittest.mock import patch, MagicMock
+
+from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context
+from datadog_lambda.trigger import EventTypes
+
+
+class TestDsmContext(unittest.TestCase):
+    def test_non_sqs_event_source_does_nothing(self):
+        """Test that non-SQS event sources don't trigger DSM context setting"""
+        event = {"Records": [{"body": "test"}]}
+
+        mock_event_source = MagicMock()
+        mock_event_source.equals.return_value = False  # Not SQS
+
+        with patch("datadog_lambda.dsm._dsm_set_sqs_context") as mock_sqs_context:
+            set_dsm_context(event, mock_event_source)
+
+            mock_event_source.equals.assert_called_once_with(EventTypes.SQS)
+            mock_sqs_context.assert_not_called()
+
+    def test_event_with_no_records_does_nothing(self):
+        """Test that events where Records is None don't trigger DSM processing"""
+        events_with_no_records = [
+            {},
+            {"Records": None},
+            {"someOtherField": "value"},
+        ]
+
+        for event in events_with_no_records:
+            with patch(
+                "ddtrace.internal.datastreams.data_streams_processor"
+            ) as mock_processor:
+                _dsm_set_sqs_context(event)
+
+            mock_processor.assert_not_called()
+
+    def test_sqs_event_triggers_dsm_sqs_context(self):
+        """Test that SQS event sources trigger the SQS-specific DSM context function"""
+        sqs_event = {
+            "Records": [
+                {
+                    "eventSource": "aws:sqs",
+                    "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
+                    "body": "Hello from SQS!",
+                }
+            ]
+        }
+
+        mock_event_source = MagicMock()
+        mock_event_source.equals.return_value = True
+
+        with patch("datadog_lambda.dsm._dsm_set_sqs_context") as mock_sqs_context:
+            set_dsm_context(sqs_event, mock_event_source)
+
+            mock_sqs_context.assert_called_once_with(sqs_event)
+
+    def test_multiple_records_process_each_record(self):
+        """Test that each record in an SQS event gets processed individually"""
+        multi_record_event = {
+            "Records": [
+                {
+                    "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue1",
+                    "body": "Message 1",
+                },
+                {
+                    "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue2",
+                    "body": "Message 2",
+                },
+                {
+                    "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue3",
+                    "body": "Message 3",
+                },
+            ]
+        }
+
+        mock_processor = MagicMock()
+        mock_context = MagicMock()
+
+        with patch(
+            "ddtrace.internal.datastreams.data_streams_processor",
+            return_value=mock_processor,
+        ):
+            with patch(
+                "ddtrace.internal.datastreams.botocore.get_datastreams_context",
+                return_value={},
+            ):
+                with patch(
+                    "ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size",
+                    return_value=100,
+                ):
+                    with patch(
+                        "ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode",
+                        return_value=mock_context,
+                    ):
+                        _dsm_set_sqs_context(multi_record_event)
+
+        assert mock_context.set_checkpoint.call_count == 3
+
+        calls = mock_context.set_checkpoint.call_args_list
+        expected_arns = [
+            "arn:aws:sqs:us-east-1:123456789012:queue1",
+            "arn:aws:sqs:us-east-1:123456789012:queue2",
+            "arn:aws:sqs:us-east-1:123456789012:queue3",
+        ]
+
+        for i, call in enumerate(calls):
+            args, kwargs = call
+            tags = args[0]
+            self.assertIn("direction:in", tags)
+            self.assertIn(f"topic:{expected_arns[i]}", tags)
+            self.assertIn("type:sqs", tags)
+            self.assertEqual(kwargs["payload_size"], 100)
diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 0e0fefb29..29a6dfb6d 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -563,200 +563,69 @@ def return_type_test(event, context):
         self.assertEqual(result, test_result)
         self.assertFalse(MockPrintExc.called)

-    @patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
-    def test_datadog_lambda_wrapper_dsm_sqs_context_pathway_verification(self):
-        from datadog_lambda.trigger import _EventSource, EventTypes
-
-        sqs_event_source = _EventSource(EventTypes.SQS)
-        self.mock_extract_dd_trace_context.return_value = ({}, None, sqs_event_source)
-
-        with patch(
-            "ddtrace.internal.datastreams.processor.get_connection"
-        ) as mock_get_connection:
-
-            mock_conn = unittest.mock.MagicMock()
-            mock_response = unittest.mock.MagicMock()
-            mock_response.status = 200
-            mock_conn.getresponse.return_value = mock_response
-            mock_get_connection.return_value = mock_conn
-
-            def updated_get_datastreams_context(message):
-                """
-                Updated version that handles the correct message formats
-                """
-                import base64
-                import json
-
-                context_json = None
-                message_body = message
-                try:
-                    body = message.get("Body")
-                    if body:
-                        message_body = json.loads(body)
-                except (ValueError, TypeError):
-                    pass
-
-                message_attributes = message_body.get(
-                    "MessageAttributes"
-                ) or message_body.get("messageAttributes")
-                if not message_attributes:
-                    return None
-
-                if "_datadog" not in message_attributes:
-                    return None
-
-                datadog_attr = message_attributes["_datadog"]
-
-                if message_body.get("Type") == "Notification":
-                    if datadog_attr.get("Type") == "Binary":
-                        context_json = json.loads(
-                            base64.b64decode(datadog_attr["Value"]).decode()
-                        )
-                elif "StringValue" in datadog_attr:
-                    context_json = json.loads(datadog_attr["StringValue"])
-                elif "stringValue" in datadog_attr:
-                    context_json = json.loads(datadog_attr["stringValue"])
-                elif "BinaryValue" in datadog_attr:
-                    context_json = json.loads(datadog_attr["BinaryValue"].decode())
-                else:
-                    print(f"DEBUG: Unhandled datadog_attr format: {datadog_attr}")
-
-                return context_json
-
-            # Step 1: Create a message with some context in the message attributes
-            from ddtrace.internal.datastreams.processor import DataStreamsProcessor
-
-            processor_instance = DataStreamsProcessor()
-
-            with patch(
-                "ddtrace.internal.datastreams.botocore.get_datastreams_context",
-                updated_get_datastreams_context,
-            ), patch(
-                "ddtrace.internal.datastreams.data_streams_processor",
-                return_value=processor_instance,
-            ):
-                parent_ctx = processor_instance.new_pathway()
-
-                parent_ctx.set_checkpoint(
-                    ["direction:out", "topic:upstream-topic", "type:sqs"],
-                    now_sec=1640995200.0,
-                    payload_size=512,
-                )
-                parent_hash = parent_ctx.hash
-                encoded_parent_context = parent_ctx.encode_b64()
-
-                sqs_event = {
-                    "Records": [
-                        {
-                            "eventSource": "aws:sqs",
-                            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test",
-                            "Body": "test message body",
-                            "messageAttributes": {
-                                "_datadog": {
-                                    "stringValue": json.dumps(
-                                        {
-                                            "dd-pathway-ctx-base64": encoded_parent_context
-                                        }
-                                    )
-                                }
-                            },
-                        }
-                    ]
-                }
-
-                # Step 2: Call the handler
-                @wrapper.datadog_lambda_wrapper
-                def lambda_handler(event, context):
-                    return {"statusCode": 200, "body": "processed"}
-
-                result = lambda_handler(sqs_event, get_mock_context())
-                self.assertEqual(result["statusCode"], 200)
-
-                # New context set after handler call
-                current_ctx = processor_instance._current_context.value
-                self.assertIsNotNone(
-                    current_ctx,
-                    "Data streams context should be set after processing SQS message",
-                )
-
-                # Step 3: Check that hash in this context is the child of the hash you passed
-                # Step 4: Check that the right checkpoint was produced during call to handler
-                # The buckets hold the aggregated stats for all checkpoints
-                found_sqs_checkpoint = False
-                for bucket_time, bucket in processor_instance._buckets.items():
-                    for aggr_key, stats in bucket.pathway_stats.items():
-                        edge_tags_str, hash_value, parent_hash_recorded = aggr_key
-                        edge_tags = edge_tags_str.split(",")
-
-                        if (
-                            "direction:in" in edge_tags
-                            and "topic:arn:aws:sqs:us-east-1:123456789012:test"
-                            in edge_tags
-                            and "type:sqs" in edge_tags
-                        ):
-                            found_sqs_checkpoint = True
-
-                            # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
-                            self.assertEqual(
-                                parent_hash_recorded,
-                                parent_hash,
-                                f"Parent hash must be preserved: "
-                                f"expected {parent_hash}, got {parent_hash_recorded}",
-                            )
-                            self.assertEqual(
-                                hash_value,
-                                current_ctx.hash,
-                                f"Child hash must match current context: "
-                                f"expected {current_ctx.hash}, got {hash_value}",
-                            )
-                            self.assertNotEqual(
-                                hash_value,
-                                parent_hash_recorded,
-                                f"Child hash ({hash_value}) must be different from "
-                                f"parent hash ({parent_hash_recorded}) - proves parent-child",
-                            )
-                            self.assertGreaterEqual(
-                                stats.payload_size.count,
-                                1,
-                                "Should have one payload size measurement",
-                            )
-
-                        break
-
-                self.assertTrue(
-                    found_sqs_checkpoint,
-                    "Should have found SQS consumption checkpoint in processor stats",
-                )
-
-                processor_instance.shutdown(timeout=0.1)
-
-    @patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
-    @patch("datadog_lambda.wrapper.set_dsm_context")
-    def test_set_dsm_context_called_when_enabled(self, mock_set_dsm_context):
-        @wrapper.datadog_lambda_wrapper
-        def lambda_handler(event, context):
-            return {"statusCode": 200, "body": "processed"}
-
-        lambda_event = {}
-        lambda_handler(lambda_event, get_mock_context())
-
-        mock_set_dsm_context.assert_called_once()
-
-    @patch("datadog_lambda.wrapper.set_dsm_context")
-    def test_set_dsm_context_not_called_when_disabled(self, mock_set_dsm_context):
-        # Ensure DD_DATA_STREAMS_ENABLED is not in environment
-        if "DD_DATA_STREAMS_ENABLED" in os.environ:
-            del os.environ["DD_DATA_STREAMS_ENABLED"]
-
-        @wrapper.datadog_lambda_wrapper
-        def lambda_handler(event, context):
-            return {"statusCode": 200, "body": "processed"}
-
-        lambda_event = {}
-        lambda_handler(lambda_event, get_mock_context())
-
-        mock_set_dsm_context.assert_not_called()
+    def test_set_dsm_context_called_when_DSM_and_tracing_enabled(self):
+        env_vars = {"DD_DATA_STREAMS_ENABLED": "true"}
+        with patch.dict(os.environ, env_vars):
+            with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
+                with patch(
+                    "datadog_lambda.wrapper.set_dsm_context"
+                ) as set_dsm_context_patch:
+
+                    @wrapper.datadog_lambda_wrapper
+                    def lambda_handler(event, context):
+                        return "ok"
+
+                    result = lambda_handler({}, get_mock_context())
+                    assert result == "ok"
+                    assert set_dsm_context_patch.called_once()
+
+    def test_set_dsm_context_not_called_when_only_DSM_enabled(self):
+        env_vars = {"DD_DATA_STREAMS_ENABLED": "true"}
+        with patch.dict(os.environ, env_vars):
+            with patch("datadog_lambda.wrapper.dd_tracing_enabled", False):
+                with patch(
+                    "datadog_lambda.wrapper.set_dsm_context"
+                ) as set_dsm_context_patch:
+
+                    @wrapper.datadog_lambda_wrapper
+                    def lambda_handler(event, context):
+                        return "ok"
+
+                    result = lambda_handler({}, get_mock_context())
+                    assert result == "ok"
+                    assert set_dsm_context_patch.call_count == 0
+
+    def test_set_dsm_context_not_called_when_only_tracing_enabled(self):
+        env_vars = {"DD_DATA_STREAMS_ENABLED": "false"}
+        with patch.dict(os.environ, env_vars):
+            with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
+                with patch(
+                    "datadog_lambda.wrapper.set_dsm_context"
+                ) as set_dsm_context_patch:
+
+                    @wrapper.datadog_lambda_wrapper
+                    def lambda_handler(event, context):
+                        return "ok"
+
+                    result = lambda_handler({}, get_mock_context())
+                    assert result == "ok"
+                    assert set_dsm_context_patch.call_count == 0
+
+    def test_set_dsm_context_not_called_when_tracing_and_DSM_disabled(self):
+        env_vars = {"DD_DATA_STREAMS_ENABLED": "false"}
+        with patch.dict(os.environ, env_vars):
+            with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
+                with patch(
+                    "datadog_lambda.wrapper.set_dsm_context"
+                ) as set_dsm_context_patch:
+
+                    @wrapper.datadog_lambda_wrapper
+                    def lambda_handler(event, context):
+                        return "ok"
+
+                    result = lambda_handler({}, get_mock_context())
+                    assert result == "ok"
+                    assert set_dsm_context_patch.call_count == 0


 class TestLambdaDecoratorSettings(unittest.TestCase):
     def test_some_envs_should_depend_on_dd_tracing_enabled(self):

From 48413b09906dd0b994af445c923af779051e170d Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 15:37:07 -0400
Subject: [PATCH 16/21] fix

---
 tests/test_wrapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 29a6dfb6d..f5c933888 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -577,7 +577,7 @@ def lambda_handler(event, context):

                     result = lambda_handler({}, get_mock_context())
                     assert result == "ok"
-                    assert set_dsm_context_patch.called_once()
+                    assert set_dsm_context_patch.call_count == 1

     def test_set_dsm_context_not_called_when_only_DSM_enabled(self):

From 3f00a62e665299ee427372f9016e54c9a4e3b909 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 16:07:46 -0400
Subject: [PATCH 17/21] fix

---
 tests/test_wrapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index f5c933888..d6e71cc3d 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -614,7 +614,7 @@ def lambda_handler(event, context):
     def test_set_dsm_context_not_called_when_tracing_and_DSM_disabled(self):
         env_vars = {"DD_DATA_STREAMS_ENABLED": "false"}
         with patch.dict(os.environ, env_vars):
-            with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
+            with patch("datadog_lambda.wrapper.dd_tracing_enabled", False):
                 with patch(
                     "datadog_lambda.wrapper.set_dsm_context"
                 ) as set_dsm_context_patch:
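Note: PATCH 18 below reworks both test files around unittest's setUp/addCleanup pattern: patchers start once per test and are guaranteed to stop even when an assertion fails, replacing the nested with-blocks. The shape it adopts, in isolation (class name is illustrative, not from the patch):

    import unittest
    from unittest.mock import patch

    class ExampleTest(unittest.TestCase):
        def setUp(self):
            patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context")
            self.mock_sqs_context = patcher.start()
            self.addCleanup(patcher.stop)  # runs after every test, pass or fail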
From b035f0578ee887d201c5caab1e4dd45d46106e3d Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Wed, 4 Jun 2025 08:04:06 -0400
Subject: [PATCH 18/21] unit test fixes

---
 tests/test_dsm.py     | 99 ++++++++++++++++++++++---------------------
 tests/test_wrapper.py | 96 ++++++++++++++++++++--------------------
 2 files changed, 97 insertions(+), 98 deletions(-)

diff --git a/tests/test_dsm.py b/tests/test_dsm.py
index 8cebb3c0f..1fa28e97e 100644
--- a/tests/test_dsm.py
+++ b/tests/test_dsm.py
@@ -6,6 +6,31 @@


 class TestDsmContext(unittest.TestCase):
+    def setUp(self):
+        patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context")
+        self.mock_dsm_set_sqs_context = patcher.start()
+        self.addCleanup(patcher.stop)
+
+        patcher = patch("ddtrace.internal.datastreams.data_streams_processor")
+        self.mock_data_streams_processor = patcher.start()
+        self.addCleanup(patcher.stop)
+
+        patcher = patch("ddtrace.internal.datastreams.botocore.get_datastreams_context")
+        self.mock_get_datastreams_context = patcher.start()
+        self.mock_get_datastreams_context.return_value = {}
+        self.addCleanup(patcher.stop)
+
+        patcher = patch(
+            "ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size"
+        )
+        self.mock_calculate_sqs_payload_size = patcher.start()
+        self.mock_calculate_sqs_payload_size.return_value = 100
+        self.addCleanup(patcher.stop)
+
+        patcher = patch("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode")
+        self.mock_dsm_pathway_codec_decode = patcher.start()
+        self.addCleanup(patcher.stop)
+
     def test_non_sqs_event_source_does_nothing(self):
         """Test that non-SQS event sources don't trigger DSM context setting"""
         event = {"Records": [{"body": "test"}]}
@@ -13,11 +38,10 @@ def test_non_sqs_event_source_does_nothing(self):
         mock_event_source = MagicMock()
         mock_event_source.equals.return_value = False  # Not SQS

-        with patch("datadog_lambda.dsm._dsm_set_sqs_context") as mock_sqs_context:
-            set_dsm_context(event, mock_event_source)
+        set_dsm_context(event, mock_event_source)

-            mock_event_source.equals.assert_called_once_with(EventTypes.SQS)
-            mock_sqs_context.assert_not_called()
+        mock_event_source.equals.assert_called_once_with(EventTypes.SQS)
+        self.mock_dsm_set_sqs_context.assert_not_called()

     def test_event_with_no_records_does_nothing(self):
         """Test that events where Records is None don't trigger DSM processing"""
@@ -28,12 +52,8 @@ def test_event_with_no_records_does_nothing(self):
         ]

         for event in events_with_no_records:
-            with patch(
-                "ddtrace.internal.datastreams.data_streams_processor"
-            ) as mock_processor:
-                _dsm_set_sqs_context(event)
-
-            mock_processor.assert_not_called()
+            _dsm_set_sqs_context(event)
+            self.mock_data_streams_processor.assert_not_called()

     def test_sqs_event_triggers_dsm_sqs_context(self):
         """Test that SQS event sources trigger the SQS-specific DSM context function"""
@@ -50,10 +70,9 @@ def test_sqs_event_triggers_dsm_sqs_context(self):
         mock_event_source = MagicMock()
         mock_event_source.equals.return_value = True

-        with patch("datadog_lambda.dsm._dsm_set_sqs_context") as mock_sqs_context:
-            set_dsm_context(sqs_event, mock_event_source)
+        set_dsm_context(sqs_event, mock_event_source)

-            mock_sqs_context.assert_called_once_with(sqs_event)
+        self.mock_dsm_set_sqs_context.assert_called_once_with(sqs_event)

     def test_multiple_records_process_each_record(self):
         """Test that each record in an SQS event gets processed individually"""
@@ -74,40 +93,24 @@ def test_multiple_records_process_each_record(self):
             ]
         }

-        mock_processor = MagicMock()
         mock_context = MagicMock()
+        self.mock_dsm_pathway_codec_decode.return_value = mock_context
+
+        _dsm_set_sqs_context(multi_record_event)
+
+        self.assertEqual(mock_context.set_checkpoint.call_count, 3)
+
+        calls = mock_context.set_checkpoint.call_args_list
+        expected_arns = [
+            "arn:aws:sqs:us-east-1:123456789012:queue1",
+            "arn:aws:sqs:us-east-1:123456789012:queue2",
+            "arn:aws:sqs:us-east-1:123456789012:queue3",
+        ]

-        with patch(
-            "ddtrace.internal.datastreams.data_streams_processor",
-            return_value=mock_processor,
-        ):
-            with patch(
-                "ddtrace.internal.datastreams.botocore.get_datastreams_context",
-                return_value={},
-            ):
-                with patch(
-                    "ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size",
-                    return_value=100,
-                ):
-                    with patch(
-                        "ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode",
-                        return_value=mock_context,
-                    ):
-                        _dsm_set_sqs_context(multi_record_event)
-
-        assert mock_context.set_checkpoint.call_count == 3
-
-        calls = mock_context.set_checkpoint.call_args_list
-        expected_arns = [
-            "arn:aws:sqs:us-east-1:123456789012:queue1",
-            "arn:aws:sqs:us-east-1:123456789012:queue2",
-            "arn:aws:sqs:us-east-1:123456789012:queue3",
-        ]
-
-        for i, call in enumerate(calls):
-            args, kwargs = call
-            tags = args[0]
-            self.assertIn("direction:in", tags)
-            self.assertIn(f"topic:{expected_arns[i]}", tags)
-            self.assertIn("type:sqs", tags)
-            self.assertEqual(kwargs["payload_size"], 100)
+        for i, call in enumerate(calls):
+            args, kwargs = call
+            tags = args[0]
+            self.assertIn("direction:in", tags)
+            self.assertIn(f"topic:{expected_arns[i]}", tags)
+            self.assertIn("type:sqs", tags)
+            self.assertEqual(kwargs["payload_size"], 100)
diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index d6e71cc3d..f482fa3de 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -76,6 +76,10 @@ def setUp(self):
         self.mock_dd_lambda_layer_tag = patcher.start()
         self.addCleanup(patcher.stop)

+        patcher = patch("datadog_lambda.wrapper.set_dsm_context")
+        self.mock_set_dsm_context = patcher.start()
+        self.addCleanup(patcher.stop)
+
     def test_datadog_lambda_wrapper(self):
         wrapper.dd_tracing_enabled = False

@@ -564,68 +568,60 @@ def return_type_test(event, context):
         self.assertEqual(result, test_result)
         self.assertFalse(MockPrintExc.called)

     def test_set_dsm_context_called_when_DSM_and_tracing_enabled(self):
-        env_vars = {"DD_DATA_STREAMS_ENABLED": "true"}
-        with patch.dict(os.environ, env_vars):
-            with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
-                with patch(
-                    "datadog_lambda.wrapper.set_dsm_context"
-                ) as set_dsm_context_patch:
+        os.environ["DD_DATA_STREAMS_ENABLED"] = "true"
+        wrapper.dd_tracing_enabled = True
+
+        @wrapper.datadog_lambda_wrapper
+        def lambda_handler(event, context):
+            return "ok"

-                    @wrapper.datadog_lambda_wrapper
-                    def lambda_handler(event, context):
-                        return "ok"
+        result = lambda_handler({}, get_mock_context())
+        self.assertEqual(result, "ok")
+        self.mock_set_dsm_context.assert_called_once()

-                    result = lambda_handler({}, get_mock_context())
-                    assert result == "ok"
-                    assert set_dsm_context_patch.call_count == 1
+        del os.environ["DD_DATA_STREAMS_ENABLED"]

     def test_set_dsm_context_not_called_when_only_DSM_enabled(self):
-        env_vars = {"DD_DATA_STREAMS_ENABLED": "true"}
-        with patch.dict(os.environ, env_vars):
-            with patch("datadog_lambda.wrapper.dd_tracing_enabled", False):
-                with patch(
-                    "datadog_lambda.wrapper.set_dsm_context"
-                ) as set_dsm_context_patch:
+        os.environ["DD_DATA_STREAMS_ENABLED"] = "true"
+        wrapper.dd_tracing_enabled = False
+
+        @wrapper.datadog_lambda_wrapper
+        def lambda_handler(event, context):
+            return "ok"

-                    @wrapper.datadog_lambda_wrapper
-                    def lambda_handler(event, context):
-                        return "ok"
+        result = lambda_handler({}, get_mock_context())
+        self.assertEqual(result, "ok")
+        self.mock_set_dsm_context.assert_not_called()

-                    result = lambda_handler({}, get_mock_context())
-                    assert result == "ok"
-                    assert set_dsm_context_patch.call_count == 0
+        del os.environ["DD_DATA_STREAMS_ENABLED"]

     def test_set_dsm_context_not_called_when_only_tracing_enabled(self):
-        env_vars = {"DD_DATA_STREAMS_ENABLED": "false"}
-        with patch.dict(os.environ, env_vars):
-            with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
-                with patch(
-                    "datadog_lambda.wrapper.set_dsm_context"
-                ) as set_dsm_context_patch:
+        os.environ["DD_DATA_STREAMS_ENABLED"] = "false"
+        wrapper.dd_tracing_enabled = True
+
+        @wrapper.datadog_lambda_wrapper
+        def lambda_handler(event, context):
+            return "ok"

-                    @wrapper.datadog_lambda_wrapper
-                    def lambda_handler(event, context):
-                        return "ok"
+        result = lambda_handler({}, get_mock_context())
+        self.assertEqual(result, "ok")
+        self.mock_set_dsm_context.assert_not_called()

-                    result = lambda_handler({}, get_mock_context())
-                    assert result == "ok"
-                    assert set_dsm_context_patch.call_count == 0
+        del os.environ["DD_DATA_STREAMS_ENABLED"]

     def test_set_dsm_context_not_called_when_tracing_and_DSM_disabled(self):
-        env_vars = {"DD_DATA_STREAMS_ENABLED": "false"}
-        with patch.dict(os.environ, env_vars):
-            with patch("datadog_lambda.wrapper.dd_tracing_enabled", False):
-                with patch(
-                    "datadog_lambda.wrapper.set_dsm_context"
-                ) as set_dsm_context_patch:
+        os.environ["DD_DATA_STREAMS_ENABLED"] = "false"
+        wrapper.dd_tracing_enabled = False
+
+        @wrapper.datadog_lambda_wrapper
+        def lambda_handler(event, context):
+            return "ok"

-                    @wrapper.datadog_lambda_wrapper
-                    def lambda_handler(event, context):
-                        return "ok"
+        result = lambda_handler({}, get_mock_context())
+        self.assertEqual(result, "ok")
+        self.mock_set_dsm_context.assert_not_called()

-                    result = lambda_handler({}, get_mock_context())
-                    assert result == "ok"
-                    assert set_dsm_context_patch.call_count == 0
+        del os.environ["DD_DATA_STREAMS_ENABLED"]


 class TestLambdaDecoratorSettings(unittest.TestCase):
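Note: PATCH 19 below generalizes the helper over the event shape. Unlike SQS records, which expose the source ARN at the top level under eventSourceARN, SNS-triggered invocations nest the topic ARN inside the record, which is why the patch introduces per-service arn_extractor and payload_size_calculator callables. The relevant record shape (values illustrative):

    sns_record = {
        "EventSource": "aws:sns",
        "Sns": {
            "TopicArn": "arn:aws:sns:us-east-1:123456789012:my-topic",
            "Message": "hello",
            "MessageAttributes": {
                "_datadog": {"Type": "Binary", "Value": "<base64-encoded context>"}
            },
        },
    }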
From 32e0b83007194d87921792677c8f7e4a89f5c39d Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Wed, 4 Jun 2025 11:04:35 -0400
Subject: [PATCH 19/21] added sns support

---
 datadog_lambda/dsm.py | 57 ++++++++++++++++++++++++++++++++++---------
 1 file changed, 46 insertions(+), 11 deletions(-)

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
index 427f5e479..c55f42b1e 100644
--- a/datadog_lambda/dsm.py
+++ b/datadog_lambda/dsm.py
@@ -6,16 +6,24 @@ def set_dsm_context(event, event_source):

     if event_source.equals(EventTypes.SQS):
         _dsm_set_sqs_context(event)
+    elif event_source.equals(EventTypes.SNS):
+        _dsm_set_sns_context(event)


-def _dsm_set_sqs_context(event):
+def _dsm_set_context_helper(event, service_type, arn_extractor, payload_size_calculator):
+    """
+    Common helper function for setting DSM context for both SQS and SNS events.
+
+    Args:
+        event: The Lambda event containing records
+        service_type: The service type string ('sqs' or 'sns')
+        arn_extractor: Function to extract the ARN from the record
+        payload_size_calculator: Function to calculate payload size
+    """
     from datadog_lambda.wrapper import format_err_with_traceback
     from ddtrace.internal.datastreams import data_streams_processor
     from ddtrace.internal.datastreams.processor import DsmPathwayCodec
-    from ddtrace.internal.datastreams.botocore import (
-        get_datastreams_context,
-        calculate_sqs_payload_size,
-    )
+    from ddtrace.internal.datastreams.botocore import get_datastreams_context

     records = event.get("Records")
     if records is None:
         return
     processor = data_streams_processor()

     for record in records:
         try:
-            queue_arn = record.get("eventSourceARN", "")
-
-            contextjson = get_datastreams_context(record)
-            payload_size = calculate_sqs_payload_size(record)
+            arn = arn_extractor(record)
+            print(f"ARN: {arn}")
+            context_json = get_datastreams_context(record)
+            payload_size = payload_size_calculator(record, context_json)

-            ctx = DsmPathwayCodec.decode(contextjson, processor)
+            ctx = DsmPathwayCodec.decode(context_json, processor)
             ctx.set_checkpoint(
-                ["direction:in", f"topic:{queue_arn}", "type:sqs"],
+                ["direction:in", f"topic:{arn}", f"type:{service_type}"],
                 payload_size=payload_size,
             )
         except Exception as e:
             logger.error(format_err_with_traceback(e))
+
+
+def _dsm_set_sns_context(event):
+    from ddtrace.internal.datastreams.botocore import calculate_sns_payload_size
+
+    def sns_payload_calculator(record, context_json):
+        return calculate_sns_payload_size(record, context_json)
+
+    def sns_arn_extractor(record):
+        sns_data = record.get("Sns", {})
+        return sns_data.get("TopicArn", "")
+
+    _dsm_set_context_helper(
+        event, "sns", sns_arn_extractor, sns_payload_calculator
+    )
+
+
+def _dsm_set_sqs_context(event):
+    from ddtrace.internal.datastreams.botocore import calculate_sqs_payload_size
+
+    def sqs_payload_calculator(record, context_json):
+        return calculate_sqs_payload_size(record)
+
+    def sqs_arn_extractor(record):
+        return record.get("eventSourceARN", "")
+
+    _dsm_set_context_helper(event, "sqs", sqs_arn_extractor, sqs_payload_calculator)

From ff08143c99b7bf63c935b75ccf4c93db7bcf6e29 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Wed, 4 Jun 2025 13:57:20 -0400
Subject: [PATCH 20/21] sqs support

---
 datadog_lambda/dsm.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
index c55f42b1e..5a0d43630 100644
--- a/datadog_lambda/dsm.py
+++ b/datadog_lambda/dsm.py
@@ -53,7 +53,9 @@ def sns_payload_calculator(record, context_json):
         return calculate_sns_payload_size(record, context_json)

     def sns_arn_extractor(record):
-        sns_data = record.get("Sns", {})
+        sns_data = record.get("Sns")
+        if not sns_data:
+            return ""
         return sns_data.get("TopicArn", "")

     _dsm_set_context_helper(
         event, "sns", sns_arn_extractor, sns_payload_calculator
     )


 def _dsm_set_sqs_context(event):
     from ddtrace.internal.datastreams.botocore import calculate_sqs_payload_size

     def sqs_payload_calculator(record, context_json):
         return calculate_sqs_payload_size(record)

     def sqs_arn_extractor(record):
         return record.get("eventSourceARN", "")
-
     _dsm_set_context_helper(event, "sqs", sqs_arn_extractor, sqs_payload_calculator)

From 82306f8fb858aaee1cbb67d4890a3aadc38ab3f9 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Thu, 5 Jun 2025 09:55:37 -0400
Subject: [PATCH 21/21] remove prints

---
 datadog_lambda/dsm.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
index 5a0d43630..a14464adf 100644
--- a/datadog_lambda/dsm.py
+++ b/datadog_lambda/dsm.py
@@ -33,7 +33,6 @@ def _dsm_set_context_helper(event, service_type, arn_extractor, payload_size_calculator):
     for record in records:
         try:
             arn = arn_extractor(record)
-            print(f"ARN: {arn}")
             context_json = get_datastreams_context(record)
             payload_size = payload_size_calculator(record, context_json)
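Note: where the series ends up, datadog_lambda/dsm.py exposes set_dsm_context(event, event_source) with per-service extractors feeding one checkpointing helper, and the feature stays entirely behind DD_DATA_STREAMS_ENABLED. A direct invocation sketch of the final surface (event values illustrative):

    from datadog_lambda.dsm import set_dsm_context
    from datadog_lambda.trigger import _EventSource, EventTypes

    event = {
        "Records": [
            {
                "eventSource": "aws:sqs",
                "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
                "body": "hello",
            }
        ]
    }
    # Sets a ["direction:in", "topic:<queue ARN>", "type:sqs"] checkpoint for
    # every record; failures are logged per record, never raised into the handler.
    set_dsm_context(event, _EventSource(EventTypes.SQS))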