Skip to content

Commit 712749f

Browse files
committed
fix: coderabbit comments on the pr
1 parent 52c8da5 commit 712749f

File tree

6 files changed

+31
-15
lines changed

6 files changed

+31
-15
lines changed

examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from nat.runtime.loader import load_workflow
1717

1818

19-
async def create_simple_config():
19+
async def create_simple_config() -> str:
2020
"""Create a simple workflow config using built-in NAT functions."""
2121

2222
config_content = """
@@ -39,7 +39,7 @@ async def create_simple_config():
3939
return f.name
4040

4141

42-
async def main():
42+
async def main() -> None:
4343
"""Demonstrate real NAT workflow with cross-workflow observability."""
4444

4545
print("=" * 50)

examples/observability/cross_workflow_tracking/integrated_example.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ async def create_sample_workflow_configs():
174174
temp_dir = tempfile.mkdtemp()
175175

176176
# Data validation workflow config
177-
validation_config = f"""
177+
validation_config = """
178178
llms:
179179
mock_llm:
180180
_type: mock
@@ -193,7 +193,7 @@ async def create_sample_workflow_configs():
193193
f.write(validation_config.strip())
194194

195195
# Data processing workflow config
196-
processing_config = f"""
196+
processing_config = """
197197
llms:
198198
mock_llm:
199199
_type: mock
@@ -212,7 +212,7 @@ async def create_sample_workflow_configs():
212212
f.write(processing_config.strip())
213213

214214
# Analysis workflow config
215-
analysis_config = f"""
215+
analysis_config = """
216216
llms:
217217
mock_llm:
218218
_type: mock

examples/observability/cross_workflow_tracking/observability_config.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ general:
3030

3131
llms:
3232
nim_llm:
33-
_type: nim
34-
model_name: meta/llama-3.1-8b-instruct
35-
temperature: 0.7
36-
max_tokens: 1024
33+
_type: nat_test_llm
34+
response_seq:
35+
- "This is a test response for observability demonstration."
36+
delay_ms: 100
3737

3838
workflow:
3939
_type: chat_completion

src/nat/observability/processor/cross_workflow_processor.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,14 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
import logging
17+
1618
from nat.data_models.span import Span
1719
from nat.observability.processor.processor import Processor
1820
from nat.utils.type_utils import override
1921

22+
logger = logging.getLogger(__name__)
23+
2024

2125
class CrossWorkflowProcessor(Processor[Span, Span]):
2226
"""
@@ -98,8 +102,9 @@ async def process(self, item: Span) -> Span:
98102
for key, value in obs_context.custom_attributes.items():
99103
item.set_attribute(f"observability.custom.{key}", value)
100104

101-
except Exception as e:
105+
except (AttributeError, KeyError, TypeError, ValueError) as e:
102106
# If there's any error in processing, log it but don't fail the span
107+
logger.warning(f"Error processing cross-workflow observability data: {e}", exc_info=True)
103108
item.set_attribute("observability.processing_error", str(e))
104109

105110
return item
@@ -158,8 +163,9 @@ async def process(self, item: Span) -> Span:
158163
item.set_attribute("relationship.type", "root_workflow")
159164
item.set_attribute("relationship.nesting_level", 0)
160165

161-
except Exception as e:
166+
except (AttributeError, IndexError, TypeError) as e:
162167
# If there's any error in processing, log it but don't fail the span
168+
logger.warning(f"Error processing workflow relationship data: {e}", exc_info=True)
163169
item.set_attribute("relationship.processing_error", str(e))
164170

165171
return item

src/nat/observability/workflow_utils.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
import logging
17+
import sys
1618
import time
1719
from typing import TYPE_CHECKING
1820
from typing import Any
@@ -22,6 +24,8 @@
2224

2325
from nat.observability.context import ObservabilityContext
2426

27+
logger = logging.getLogger(__name__)
28+
2529
if TYPE_CHECKING:
2630
from nat.builder.workflow import Workflow
2731

@@ -95,13 +99,15 @@ async def invoke_workflow_with_context(workflow: "Workflow[InputT, StreamingOutp
9599
return result
96100

97101
except Exception as e:
98-
# Update workflow metadata on failure
102+
# Update workflow metadata on failure and log error
99103
if obs_context:
100104
current_workflow = obs_context.get_current_workflow()
101105
if current_workflow:
102106
current_workflow.end_time = time.time()
103107
current_workflow.status = "failed"
104108
current_workflow.tags["error"] = str(e)
109+
110+
logger.error(f"Workflow '{workflow_name}' failed with error: {e}", exc_info=True)
105111
raise
106112

107113
@staticmethod
@@ -161,13 +167,15 @@ async def invoke_workflow_stream_with_context(
161167
current_workflow.status = "completed"
162168

163169
except Exception as e:
164-
# Update workflow metadata on failure
170+
# Update workflow metadata on failure and log error
165171
if obs_context:
166172
current_workflow = obs_context.get_current_workflow()
167173
if current_workflow:
168174
current_workflow.end_time = time.time()
169175
current_workflow.status = "failed"
170176
current_workflow.tags["error"] = str(e)
177+
178+
logger.error(f"Streaming workflow '{workflow_name}' failed with error: {e}", exc_info=True)
171179
raise
172180

173181
@staticmethod
@@ -271,11 +279,13 @@ async def invoke_with_steps_and_context(workflow: "Workflow[InputT, StreamingOut
271279
return result, steps
272280

273281
except Exception as e:
274-
# Update workflow metadata on failure
282+
# Update workflow metadata on failure and log error
275283
if obs_context:
276284
current_workflow = obs_context.get_current_workflow()
277285
if current_workflow:
278286
current_workflow.end_time = time.time()
279287
current_workflow.status = "failed"
280288
current_workflow.tags["error"] = str(e)
289+
290+
logger.error(f"Workflow with steps '{workflow_name}' failed with error: {e}", exc_info=True)
281291
raise

0 commit comments

Comments
 (0)