Skip to content
152 changes: 78 additions & 74 deletions src/azure-cli/azure/cli/command_modules/eventgrid/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -1333,136 +1333,119 @@ def _update_event_subscription_internal( # pylint: disable=too-many-locals,too-
deadletter_identity,
deadletter_endpoint)

event_subscription_destination_with_resource_identity = None
if instance.delivery_with_resource_identity is not None:
event_subscription_destination_with_resource_identity = instance.delivery_with_resource_identity.destination

deadletter_destination = None
instance_delivery_with_resource_identity = instance.delivery_with_resource_identity
event_subscription_destination = instance.destination
event_subscription_labels = instance.labels
event_subscription_filter = instance.filter
event_delivery_schema = instance.event_delivery_schema
retry_policy = instance.retry_policy

if endpoint_type.lower() != WEBHOOK_DESTINATION.lower() and endpoint is None:
raise CLIError('Invalid usage: Since --endpoint-type is specified, a valid endpoint must also be specified.')

tennant_id = None
application_id = None

# for the update path, endpoint_type can be None but it does not mean that this is webhook,
# as it can be other types too.
if event_subscription_destination is not None and \
hasattr(event_subscription_destination, 'azure_active_directory_tenant_id'):
tennant_id = event_subscription_destination.azure_active_directory_tenant_id

if event_subscription_destination is not None and \
hasattr(event_subscription_destination, 'azure_active_directory_application_id_or_uri'):
application_id = event_subscription_destination.azure_active_directory_application_id_or_uri

if event_subscription_destination_with_resource_identity is not None and \
hasattr(event_subscription_destination_with_resource_identity, 'azure_active_directory_tenant_id'):
tennant_id = event_subscription_destination_with_resource_identity.azure_active_directory_tenant_id
current_destination = instance.destination
current_filter = instance.filter
current_event_delivery_schema = instance.event_delivery_schema
current_retry_policy = instance.retry_policy
current_destination_with_resource_identity = None

if event_subscription_destination_with_resource_identity is not None and \
hasattr(event_subscription_destination_with_resource_identity, 'azure_active_directory_application_id_or_uri'):
application_id = \
event_subscription_destination_with_resource_identity.azure_active_directory_application_id_or_uri
if instance.delivery_with_resource_identity is not None:
current_destination_with_resource_identity = instance.delivery_with_resource_identity.destination

max_events_per_batch = 0
preferred_batch_size_in_kilobytes = 0
tenant_id = _get_tenant_id(current_destination, current_destination_with_resource_identity)
application_id = _get_application_id(current_destination, current_destination_with_resource_identity)

if event_subscription_destination is not None and \
event_subscription_destination.endpoint_type is not None and (event_subscription_destination.endpoint_type.lower() == WEBHOOK_DESTINATION.lower() or event_subscription_destination.endpoint_type.lower() == AZUREFUNCTION_DESTINATION.lower()): # pylint: disable=line-too-long
max_events_per_batch = event_subscription_destination.max_events_per_batch
preferred_batch_size_in_kilobytes = event_subscription_destination.preferred_batch_size_in_kilobytes
# for the update path, endpoint_type can be None but it does not mean that this is webhook,
# as it can be other types too.
current_max_events_per_batch = 0
current_preferred_batch_size_in_kilobytes = 0

if event_subscription_destination_with_resource_identity is not None and \
event_subscription_destination_with_resource_identity.endpoint_type is not None and (event_subscription_destination_with_resource_identity.endpoint_type.lower() == WEBHOOK_DESTINATION.lower() or event_subscription_destination_with_resource_identity.endpoint_type.lower() == AZUREFUNCTION_DESTINATION.lower()): # pylint: disable=line-too-long
max_events_per_batch = event_subscription_destination_with_resource_identity.max_events_per_batch
preferred_batch_size_in_kilobytes = event_subscription_destination_with_resource_identity.preferred_batch_size_in_kilobytes # pylint: disable=line-too-long
if current_destination is not None and (current_destination.endpoint_type.lower() == WEBHOOK_DESTINATION.lower() or current_destination.endpoint_type.lower() == AZUREFUNCTION_DESTINATION.lower()): # pylint: disable=line-too-long
current_max_events_per_batch = current_destination.max_events_per_batch
current_preferred_batch_size_in_kilobytes = current_destination.preferred_batch_size_in_kilobytes
elif current_destination_with_resource_identity is not None and (current_destination_with_resource_identity.endpoint_type.lower() == WEBHOOK_DESTINATION.lower() or current_destination_with_resource_identity.endpoint_type.lower() == AZUREFUNCTION_DESTINATION.lower()): # pylint: disable=line-too-long
current_max_events_per_batch = current_destination_with_resource_identity.max_events_per_batch
current_preferred_batch_size_in_kilobytes = current_destination_with_resource_identity.preferred_batch_size_in_kilobytes # pylint: disable=line-too-long

delivery_with_resource_identity = None
updated_destination = None
updated_delivery_with_resource_identity = None

# if endpoint and delivery_identity_endpoint is not specified then use the instance value
if endpoint is None and delivery_identity_endpoint is None:
if event_subscription_destination is not None:
if current_destination is not None:
_validate_and_update_destination(
event_subscription_destination.endpoint_type,
event_subscription_destination,
current_destination.endpoint_type,
current_destination,
storage_queue_msg_ttl,
delivery_attribute_mapping)
elif event_subscription_destination_with_resource_identity is not None:
updated_destination = current_destination
elif current_destination_with_resource_identity is not None:
_validate_and_update_destination(
event_subscription_destination_with_resource_identity.endpoint_type,
instance_delivery_with_resource_identity.destination,
current_destination_with_resource_identity.endpoint_type,
current_destination_with_resource_identity.destination,
storage_queue_msg_ttl,
delivery_attribute_mapping)
delivery_with_resource_identity = instance_delivery_with_resource_identity
updated_delivery_with_resource_identity = current_destination_with_resource_identity
elif endpoint is not None:
_validate_destination_attribute(
endpoint_type,
storage_queue_msg_ttl,
delivery_attribute_mapping)
event_subscription_destination = _get_endpoint_destination(
updated_destination = _get_endpoint_destination(
endpoint_type,
endpoint,
max_events_per_batch,
preferred_batch_size_in_kilobytes,
tennant_id,
current_max_events_per_batch,
current_preferred_batch_size_in_kilobytes,
tenant_id,
application_id,
storage_queue_msg_ttl,
delivery_attribute_mapping)
elif delivery_identity_endpoint is not None:
identity_type_name = _get_event_subscription_identity_type(delivery_identity)
delivery_identity_info = EventSubscriptionIdentity(type=identity_type_name)
destination_with_identity = _get_endpoint_destination(
delivery_identity_endpoint_type,
delivery_identity_endpoint,
0,
0,
tennant_id,
tenant_id,
application_id,
storage_queue_msg_ttl,
delivery_attribute_mapping)
delivery_with_resource_identity = DeliveryWithResourceIdentity(

identity_type_name = _get_event_subscription_identity_type(delivery_identity)
delivery_identity_info = EventSubscriptionIdentity(type=identity_type_name)

updated_delivery_with_resource_identity = DeliveryWithResourceIdentity(
identity=delivery_identity_info,
destination=destination_with_identity)

updated_deadletter_destination = None
if deadletter_endpoint is not None:
deadletter_destination = _get_deadletter_destination(deadletter_endpoint)

deadletter_with_resource_identity = None
updated_deadletter_destination = _get_deadletter_destination(deadletter_endpoint)

updated_deadletter_with_resource_identity = None
if deadletter_identity_endpoint is not None:
deadletter_destination_with_identity = _get_deadletter_destination(deadletter_identity_endpoint)
deadletter_identity_type_name = _get_event_subscription_identity_type(deadletter_identity)
deadletter_delivery_identity_info = EventSubscriptionIdentity(type=deadletter_identity_type_name)
deadletter_with_resource_identity = DeadLetterWithResourceIdentity(
updated_deadletter_with_resource_identity = DeadLetterWithResourceIdentity(
identity=deadletter_delivery_identity_info,
dead_letter_destination=deadletter_destination_with_identity)

_set_event_subscription_filter(
event_subscription_filter,
_update_event_subscription_filter(
current_filter,
subject_begins_with,
subject_ends_with,
included_event_types,
enable_advanced_filtering_on_arrays,
advanced_filter)
updated_filter = current_filter

updated_labels = None
if labels is not None:
event_subscription_labels = labels
updated_labels = labels

params = EventSubscriptionUpdateParameters(
destination=event_subscription_destination,
filter=event_subscription_filter,
labels=event_subscription_labels,
retry_policy=retry_policy,
dead_letter_destination=deadletter_destination,
event_delivery_schema=event_delivery_schema,
delivery_with_resource_identity=delivery_with_resource_identity,
dead_letter_with_resource_identity=deadletter_with_resource_identity)
destination=updated_destination,
filter=updated_filter,
labels=updated_labels,
retry_policy=current_retry_policy,
dead_letter_destination=updated_deadletter_destination,
event_delivery_schema=current_event_delivery_schema,
delivery_with_resource_identity=updated_delivery_with_resource_identity,
dead_letter_with_resource_identity=updated_deadletter_with_resource_identity)

return params

Expand Down Expand Up @@ -1655,6 +1638,27 @@ def _get_event_subscription_identity_type(identity_type_name):
return result


def _get_tenant_id(destination, destination_with_resource_identity):
tenant_id = None

if destination is not None and hasattr(destination, 'azure_active_directory_tenant_id'):
tenant_id = destination.azure_active_directory_tenant_id
elif destination_with_resource_identity is not None and hasattr(destination_with_resource_identity, 'azure_active_directory_tenant_id'): # pylint: disable=line-too-long
tenant_id = destination_with_resource_identity.azure_active_directory_tenant_id

return tenant_id


def _get_application_id(destination, destination_with_resource_identity):
application_id = None

if destination is not None and hasattr(destination, 'azure_active_directory_application_id_or_uri'):
application_id = destination.azure_active_directory_application_id_or_uri
elif destination_with_resource_identity is not None and hasattr(destination_with_resource_identity, 'azure_active_directory_application_id_or_uri'): # pylint: disable=line-too-long
application_id = destination_with_resource_identity.azure_active_directory_application_id_or_uri
return application_id


def _get_input_schema_and_mapping(
input_schema=EVENTGRID_SCHEMA,
input_mapping_fields=None,
Expand Down Expand Up @@ -1812,7 +1816,7 @@ def _get_identity_info_only_if_not_none(identity=None):
return identity_info


def _set_event_subscription_filter(
def _update_event_subscription_filter(
event_subscription_filter,
subject_begins_with=None,
subject_ends_with=None,
Expand Down
Loading