Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
479 changes: 479 additions & 0 deletions src/containerapp/azext_containerapp/containerapp_job_decorator.py

Large diffs are not rendered by default.

272 changes: 42 additions & 230 deletions src/containerapp/azext_containerapp/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from msrestazure.tools import parse_resource_id, is_valid_resource_id
from msrest.exceptions import DeserializationError

from .containerapp_job_decorator import ContainerAppJobDecorator, ContainerAppJobCreateDecorator
from .containerapp_auth_decorator import ContainerAppAuthDecorator
from .containerapp_decorator import ContainerAppCreateDecorator, BaseContainerAppDecorator
from ._client_factory import handle_raw_exception, handle_non_404_exception
Expand Down Expand Up @@ -1324,250 +1325,61 @@ def create_containerappsjob(cmd,
user_assigned=None,
registry_identity=None,
workload_profile_name=None):
register_provider_if_needed(cmd, CONTAINER_APPS_RP)
validate_container_app_name(name, AppType.ContainerAppJob.name)
validate_create(registry_identity, registry_pass, registry_user, registry_server, no_wait)

if registry_identity and not is_registry_msi_system(registry_identity):
logger.info("Creating an acrpull role assignment for the registry identity")
create_acrpull_role_assignment(cmd, registry_server, registry_identity, skip_error=True)

if yaml:
if image or managed_env or trigger_type or replica_timeout or replica_retry_limit or\
replica_completion_count or parallelism or cron_expression or cpu or memory or registry_server or\
registry_user or registry_pass or secrets or env_vars or\
startup_command or args or tags:
not disable_warnings and logger.warning('Additional flags were passed along with --yaml. These flags will be ignored, and the configuration defined in the yaml will be used instead')
return create_containerappsjob_yaml(cmd=cmd, name=name, resource_group_name=resource_group_name, file_name=yaml, no_wait=no_wait)

if replica_timeout is None:
raise RequiredArgumentMissingError('Usage error: --replica-timeout is required')

if replica_retry_limit is None:
raise RequiredArgumentMissingError('Usage error: --replica-retry-limit is required')

if not image:
image = HELLO_WORLD_IMAGE

if managed_env is None:
raise RequiredArgumentMissingError('Usage error: --environment is required if not using --yaml')

# Validate managed environment
parsed_managed_env = parse_resource_id(managed_env)
managed_env_name = parsed_managed_env['name']
managed_env_rg = parsed_managed_env['resource_group']
managed_env_info = None

try:
managed_env_info = ManagedEnvironmentClient.show(cmd=cmd, resource_group_name=managed_env_rg, name=managed_env_name)
except:
pass

if not managed_env_info:
raise ValidationError("The environment '{}' does not exist. Specify a valid environment".format(managed_env))

location = managed_env_info["location"]
_ensure_location_allowed(cmd, location, CONTAINER_APPS_RP, "jobs")

if not workload_profile_name and "workloadProfiles" in managed_env_info:
workload_profile_name = get_default_workload_profile_name_from_env(cmd, managed_env_info, managed_env_rg)

manualTriggerConfig_def = None
if trigger_type is not None and trigger_type.lower() == "manual":
manualTriggerConfig_def = ManualTriggerModel
manualTriggerConfig_def["replicaCompletionCount"] = 0 if replica_completion_count is None else replica_completion_count
manualTriggerConfig_def["parallelism"] = 0 if parallelism is None else parallelism

scheduleTriggerConfig_def = None
if trigger_type is not None and trigger_type.lower() == "schedule":
scheduleTriggerConfig_def = ScheduleTriggerModel
scheduleTriggerConfig_def["replicaCompletionCount"] = 0 if replica_completion_count is None else replica_completion_count
scheduleTriggerConfig_def["parallelism"] = 0 if parallelism is None else parallelism
scheduleTriggerConfig_def["cronExpression"] = cron_expression

eventTriggerConfig_def = None
if trigger_type is not None and trigger_type.lower() == "event":
scale_def = None
if min_executions is not None or max_executions is not None or polling_interval is not None:
scale_def = JobScaleModel
scale_def["pollingInterval"] = polling_interval
scale_def["minExecutions"] = min_executions
scale_def["maxExecutions"] = max_executions

if scale_rule_name:
scale_rule_type = scale_rule_type.lower()
scale_rule_def = ScaleRuleModel
curr_metadata = {}
metadata_def = parse_metadata_flags(scale_rule_metadata, curr_metadata)
auth_def = parse_auth_flags(scale_rule_auth)
scale_rule_def["name"] = scale_rule_name
scale_rule_def["type"] = scale_rule_type
scale_rule_def["metadata"] = metadata_def
scale_rule_def["auth"] = auth_def

if not scale_def:
scale_def = JobScaleModel
scale_def["rules"] = [scale_rule_def]

eventTriggerConfig_def = EventTriggerModel
eventTriggerConfig_def["replicaCompletionCount"] = replica_completion_count
eventTriggerConfig_def["parallelism"] = parallelism
eventTriggerConfig_def["scale"] = scale_def

secrets_def = None
if secrets is not None:
secrets_def = parse_secret_flags(secrets)

registries_def = None
if registry_server is not None and not is_registry_msi_system(registry_identity):
registries_def = RegistryCredentialsModel
registries_def["server"] = registry_server

# Infer credentials if not supplied and its azurecr
if (registry_user is None or registry_pass is None) and registry_identity is None:
registry_user, registry_pass = _infer_acr_credentials(cmd, registry_server, disable_warnings)

if not registry_identity:
registries_def["username"] = registry_user

if secrets_def is None:
secrets_def = []
registries_def["passwordSecretRef"] = store_as_secret_and_return_secret_ref(secrets_def, registry_user, registry_server, registry_pass, disable_warnings=disable_warnings)
else:
registries_def["identity"] = registry_identity

config_def = JobConfigurationModel
config_def["secrets"] = secrets_def
config_def["triggerType"] = trigger_type
config_def["replicaTimeout"] = replica_timeout
config_def["replicaRetryLimit"] = replica_retry_limit
config_def["manualTriggerConfig"] = manualTriggerConfig_def if manualTriggerConfig_def is not None else None
config_def["scheduleTriggerConfig"] = scheduleTriggerConfig_def if scheduleTriggerConfig_def is not None else None
config_def["eventTriggerConfig"] = eventTriggerConfig_def if eventTriggerConfig_def is not None else None
config_def["registries"] = [registries_def] if registries_def is not None else None

# Identity actions
identity_def = ManagedServiceIdentityModel
identity_def["type"] = "None"

assign_system_identity = system_assigned
if user_assigned:
assign_user_identities = [x.lower() for x in user_assigned]
else:
assign_user_identities = []

if assign_system_identity and assign_user_identities:
identity_def["type"] = "SystemAssigned, UserAssigned"
elif assign_system_identity:
identity_def["type"] = "SystemAssigned"
elif assign_user_identities:
identity_def["type"] = "UserAssigned"

if assign_user_identities:
identity_def["userAssignedIdentities"] = {}
subscription_id = get_subscription_id(cmd.cli_ctx)

for r in assign_user_identities:
r = _ensure_identity_resource_id(subscription_id, resource_group_name, r)
identity_def["userAssignedIdentities"][r] = {} # pylint: disable=unsupported-assignment-operation

resources_def = None
if cpu is not None or memory is not None:
resources_def = ContainerResourcesModel
resources_def["cpu"] = cpu
resources_def["memory"] = memory

container_def = ContainerModel
container_def["name"] = container_name if container_name else name
container_def["image"] = image if not is_registry_msi_system(registry_identity) else HELLO_WORLD_IMAGE
if env_vars is not None:
container_def["env"] = parse_env_var_flags(env_vars)
if startup_command is not None:
container_def["command"] = startup_command
if args is not None:
container_def["args"] = args
if resources_def is not None:
container_def["resources"] = resources_def

template_def = JobTemplateModel
template_def["containers"] = [container_def]

containerappjob_def = ContainerAppsJobModel
containerappjob_def["location"] = location
containerappjob_def["identity"] = identity_def
containerappjob_def["properties"]["environmentId"] = managed_env
containerappjob_def["properties"]["configuration"] = config_def
containerappjob_def["properties"]["template"] = template_def
containerappjob_def["tags"] = tags

if workload_profile_name:
containerappjob_def["properties"]["workloadProfileName"] = workload_profile_name
ensure_workload_profile_supported(cmd, managed_env_name, managed_env_rg, workload_profile_name, managed_env_info)

if registry_identity:
if is_registry_msi_system(registry_identity):
set_managed_identity(cmd, resource_group_name, containerappjob_def, system_assigned=True)
else:
set_managed_identity(cmd, resource_group_name, containerappjob_def, user_assigned=[registry_identity])
try:
r = ContainerAppsJobClient.create_or_update(
cmd=cmd, resource_group_name=resource_group_name, name=name, containerapp_job_envelope=containerappjob_def, no_wait=no_wait)

if is_registry_msi_system(registry_identity):
while r["properties"]["provisioningState"] == "InProgress":
r = ContainerAppClient.show(cmd, resource_group_name, name)
time.sleep(10)
logger.info("Creating an acrpull role assignment for the system identity")
system_sp = r["identity"]["principalId"]
create_acrpull_role_assignment(cmd, registry_server, registry_identity=None, service_principal=system_sp)
container_def["image"] = image

registries_def = RegistryCredentialsModel
registries_def["server"] = registry_server
registries_def["identity"] = registry_identity
config_def["registries"] = [registries_def]

r = ContainerAppsJobClient.create_or_update(cmd=cmd, resource_group_name=resource_group_name, name=name, containerapp_job_envelope=containerappjob_def, no_wait=no_wait)
raw_parameters = locals()
containerapp_job_create_decorator = ContainerAppJobCreateDecorator(
cmd=cmd,
client=ContainerAppsJobClient,
raw_parameters=raw_parameters,
models="azext_containerapp._sdk_models"
)
containerapp_job_create_decorator.register_provider(CONTAINER_APPS_RP)
containerapp_job_create_decorator.validate_arguments()

if "properties" in r and "provisioningState" in r["properties"] and r["properties"]["provisioningState"].lower() == "waiting" and not no_wait:
not disable_warnings and logger.warning('Containerapp job creation in progress. Please monitor the creation using `az containerapp job show -n {} -g {}`'.format(name, resource_group_name))
containerapp_job_create_decorator.construct_payload()
r = containerapp_job_create_decorator.create()
containerapp_job_create_decorator.construct_for_post_process(r)
r = containerapp_job_create_decorator.post_process(r)

return r
except Exception as e:
handle_raw_exception(e)
return r


def show_containerappsjob(cmd, name, resource_group_name):
_validate_subscription_registered(cmd, CONTAINER_APPS_RP)
raw_parameters = locals()
containerapp_job_decorator = ContainerAppJobDecorator(
cmd=cmd,
client=ContainerAppsJobClient,
raw_parameters=raw_parameters,
models="azext_containerapp._sdk_models"
)
containerapp_job_decorator.validate_subscription_registered(CONTAINER_APPS_RP)

try:
return ContainerAppsJobClient.show(cmd=cmd, resource_group_name=resource_group_name, name=name)
except CLIError as e:
handle_raw_exception(e)
return containerapp_job_decorator.show()


def list_containerappsjob(cmd, resource_group_name=None):
_validate_subscription_registered(cmd, CONTAINER_APPS_RP)
raw_parameters = locals()
containerapp_job_decorator = ContainerAppJobDecorator(
cmd=cmd,
client=ContainerAppsJobClient,
raw_parameters=raw_parameters,
models="azext_containerapp._sdk_models"
)
containerapp_job_decorator.validate_subscription_registered(CONTAINER_APPS_RP)

try:
containerappsjobs = []
if resource_group_name is None:
containerappsjobs = ContainerAppsJobClient.list_by_subscription(cmd=cmd)
else:
containerappsjobs = ContainerAppsJobClient.list_by_resource_group(cmd=cmd, resource_group_name=resource_group_name)

return containerappsjobs
except CLIError as e:
handle_raw_exception(e)
return containerapp_job_decorator.list()


def delete_containerappsjob(cmd, name, resource_group_name, no_wait=False):
_validate_subscription_registered(cmd, CONTAINER_APPS_RP)
raw_parameters = locals()
containerapp_job_decorator = ContainerAppJobDecorator(
cmd=cmd,
client=ContainerAppsJobClient,
raw_parameters=raw_parameters,
models="azext_containerapp._sdk_models"
)
containerapp_job_decorator.validate_subscription_registered(CONTAINER_APPS_RP)

try:
return ContainerAppsJobClient.delete(cmd=cmd, name=name, resource_group_name=resource_group_name, no_wait=no_wait)
except CLIError as e:
handle_raw_exception(e)
return containerapp_job_decorator.delete()


def update_containerappsjob(cmd,
Expand Down
Loading