Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-357: Add internal channel TypeStore #70

Open
wants to merge 15 commits into
base: main
Choose a base branch
from

Conversation

lwolczynski
Copy link
Contributor

@lwolczynski lwolczynski commented Jan 3, 2025

Closes #41

import inspect
import time
import unittest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test added to make sure the currently existing issue is fixed. Issue description by @longquanzheng

The MVP solution works but not ideal – it just blend/mix the prefix and non-prefix channel names without differentiation. This could cause some confusion/unexpected behavior.

For example:

  • User can define a channel name “ABC” (not by prefix) and try to publish with name “ABCD” will also be allowed – but it should be disallowed. Because “ABC” is not by prefix.

Comment on lines +9 to +12
INTERNAL_CHANNEL = 1
# TODO: extend to other types
# DATA_ATTRIBUTE = 2
# SIGNAL_CHANNEL = 3
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#42

@lwolczynski lwolczynski marked this pull request as ready for review January 3, 2025 21:25
self._name_to_type_store = dict()
self._prefix_to_type_store = dict()

def is_valid_name_or_prefix(self, name: str) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to be used now. It was not used before, good catch

iwf/command_results.py Outdated Show resolved Hide resolved
t = self._do_get_type(name)

if t is None:
raise ValueError(f"{self._class_type} not registered: {name}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a WorkflowDefinitionError since the type has not been registered in the store?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about it as well. I think we have two options here:

  1. Change the exception type from ValueError to WorkflowDefinitionError
  2. Let get_type return None and let the caller do the exception handling

I noticed that publish_to_internal_channel in communication.py and from_idl_command_results in command_results.py will never get to their exception handling if it's done in type_store.py. Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mentioned it here as well

Copy link
Contributor

@ktrops ktrops Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this error is fatal, I think we should still raise something here, instead of returning None. In the caller we should wrap the get_type call in a try-except block. Maybe here we should create our own specific exception like NotRegisteredError and in the caller we except it and raise a WorkflowDefinitionError. We want to chain the exceptions, so that the first exception is preserved in the stack trace.

example here in type_store.py in get_type():

if t is None:
      raise NotRegisteredError(f"{self._class_type} not registered: {name}")

example in the caller (i.e. communication.py):

try:
    registered_type = self._internal_channel_type_store.get_type(channel_name)
except NotRegisteredError as exception:
    raise WorkflowDefinitionError(f"InternalChannel channel_name is not defined {channel_name}") from exception

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I think that's a great idea. Thanks for the help! Added it.


def add_internal_channel_def(self, obj: CommunicationMethod):
if self._class_type != Type.INTERNAL_CHANNEL:
raise WorkflowDefinitionError(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might have this switched, but if a workflow is adding an internal channel definition and we are filtering them in the SDK in line 94 of registry.py by checking CommunicationMethodType.InternalChannel then it's not a WorkflowDefinitionError, but an SDK error. The user didn't define something incorrectly, but the SDK is filtering incorrectly, so maybe this should be a ValueError .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point 👍 changed it to ValueError

@lwolczynski lwolczynski requested a review from ktrops January 6, 2025 16:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Signal /internal channel by prefix
2 participants