Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ def cf_synapse_spark_pool(cli_ctx, workspace_name):
return cf_synapse_client_artifacts_factory(cli_ctx, workspace_name).big_data_pools


def cf_synapse_library(cli_ctx, workspace_name):
    """Return the data-plane client for workspace library (package) operations."""
    artifacts_client = cf_synapse_client_artifacts_factory(cli_ctx, workspace_name)
    return artifacts_client.library


def cf_synapse_client_managedprivateendpoints_factory(cli_ctx, workspace_name):
from azure.synapse.managedprivateendpoints import ManagedPrivateEndpointsClient
from azure.cli.core._profile import Profile
Expand Down
57 changes: 57 additions & 0 deletions src/azure-cli/azure/cli/command_modules/synapse/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@
text: |-
az synapse spark pool update --name testpool --workspace-name testsynapseworkspace --resource-group rg \\
--enable-auto-scale --min-node-count 3 --max-node-count 100
- name: Update the Spark pool's custom libraries.
text: |-
az synapse spark pool update --name testpool --workspace-name testsynapseworkspace --resource-group rg \\
--package-action Add --package package1.jar package2.jar
"""

helps['synapse spark pool delete'] = """
Expand Down Expand Up @@ -1562,6 +1566,59 @@
az synapse notebook delete --workspace-name testsynapseworkspace \\
--name testnotebook
"""
helps['synapse workspace-package'] = """
type: group
short-summary: Manage Synapse's workspace packages.
"""

helps['synapse workspace-package upload'] = """
type: command
short-summary: Upload a local workspace package file to an Azure Synapse workspace.
examples:
- name: Upload a local workspace package file to an Azure Synapse workspace.
text: |-
az synapse workspace-package upload --workspace-name testsynapseworkspace \\
--package C:/package.jar
"""

helps['synapse workspace-package upload-batch'] = """
type: command
short-summary: Upload workspace package files from a local directory to an Azure Synapse workspace.
examples:
- name: Upload workspace package files from a local directory to an Azure Synapse workspace.
text: |-
az synapse workspace-package upload-batch --workspace-name testsynapseworkspace \\
--source C:/package
"""

helps['synapse workspace-package show'] = """
type: command
short-summary: Get a workspace package.
examples:
- name: Get a workspace package.
text: |-
az synapse workspace-package show --workspace-name testsynapseworkspace \\
--name testpackage.jar
"""

helps['synapse workspace-package list'] = """
type: command
short-summary: List workspace packages.
examples:
- name: List workspace packages.
text: |-
az synapse workspace-package list --workspace-name testsynapseworkspace
"""

helps['synapse workspace-package delete'] = """
type: command
short-summary: Delete a workspace package.
examples:
- name: Delete a workspace package.
text: |-
az synapse workspace-package delete --workspace-name testsynapseworkspace \\
--name testpackage.jar
"""

helps['synapse integration-runtime'] = """
type: group
Expand Down
29 changes: 27 additions & 2 deletions src/azure-cli/azure/cli/command_modules/synapse/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
from argcomplete import FilesCompleter
from azure.mgmt.synapse.models import TransparentDataEncryptionStatus, SecurityAlertPolicyState, BlobAuditingPolicyState
from azure.cli.core.commands.parameters import name_type, tags_type, get_three_state_flag, get_enum_type, \
get_resource_name_completion_list, get_location_type
get_resource_name_completion_list, get_location_type, file_type
from azure.cli.core.commands.validators import get_default_location_from_resource_group
from azure.cli.core.util import get_json_object, shell_safe_json_parse
from ._validators import validate_storage_account, validate_statement_language
from ._validators import validate_storage_account, validate_statement_language, add_progress_callback
from ._completers import get_role_definition_name_completion_list
from .constant import SparkBatchLanguage, SparkStatementLanguage, SqlPoolConnectionClientType, PrincipalType, \
SqlPoolConnectionClientAuthenticationType, ItemType
Expand All @@ -33,6 +33,8 @@
definition_file_arg_type = CLIArgumentType(options_list=['--file'], completer=FilesCompleter(),
type=shell_safe_json_parse,
help='Properties may be supplied from a JSON file using the `@{path}` syntax or a JSON string.')
progress_type = CLIArgumentType(help='Include this flag to disable progress reporting for the command.',
action='store_true', validator=add_progress_callback)
time_format_help = 'Time should be in following format: "YYYY-MM-DDTHH:MM:SS".'

storage_arg_group = "Storage"
Expand Down Expand Up @@ -171,6 +173,11 @@ def load_arguments(self, _):
help='The library requirements file.')
c.argument('force', arg_type=get_three_state_flag(), help='The flag of force operation.')

# Custom Libraries
c.argument('package_action', arg_group='Custom Libraries', arg_type=get_enum_type(['Add', 'Remove']),
help='Package action must be specified when you add or remove a workspace package from an Apache Spark pool.')
c.argument('package', arg_group='Custom Libraries', nargs='+', help='List of workspace packages name.')

# synapse sql pool
with self.argument_context('synapse sql pool') as c:
c.argument('workspace_name', id_part='name', help='The workspace name.')
Expand Down Expand Up @@ -817,6 +824,24 @@ def load_arguments(self, _):
c.argument('workspace_name', arg_type=workspace_name_arg_type)
c.argument('notebook_name', arg_type=name_type, help='The notebook name.')

# synapse artifacts library
with self.argument_context('synapse workspace-package') as c:
c.argument('workspace_name', arg_type=workspace_name_arg_type)
c.ignore('progress_callback')

for scope in ['show', 'delete']:
with self.argument_context('synapse workspace-package ' + scope) as c:
c.argument('package_name', arg_type=name_type, options_list=['--package-name', '--package', '--name', '-n'], help='The workspace package name.')

with self.argument_context('synapse workspace-package upload') as c:
c.argument('package', options_list=('--package', '--file', '-f'), type=file_type, completer=FilesCompleter(),
help='Specifies a local file path for a file to upload as workspace package.')
c.extra('no_progress', progress_type)

with self.argument_context('synapse workspace-package upload-batch') as c:
c.argument('source', options_list=('--source', '-s'), help='The directory where the files to be uploaded are located.')
c.extra('no_progress', progress_type)

# synapse integration runtime
with self.argument_context('synapse integration-runtime') as c:
c.argument('workspace_name', arg_type=workspace_name_arg_type, id_part='name')
Expand Down
18 changes: 18 additions & 0 deletions src/azure-cli/azure/cli/command_modules/synapse/_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,21 @@ def validate_audit_policy_arguments(namespace):
namespace.retention_days])
if not namespace.state and not blob_storage_arguments_provided:
raise CLIError('Either state or blob storage arguments are missing')


def add_progress_callback(cmd, namespace):
    """Validator that swaps the ``--no-progress`` flag for a progress callback.

    Unless the user suppressed progress reporting, ``namespace.progress_callback``
    is set to a ``callback(current, total)`` that drives the CLI progress
    controller.  Callers may customize the reporter by assigning attributes on
    the callback function itself: ``message`` (label to display), ``reuse``
    (keep the hook open after one transfer completes); the controller is also
    exposed as the ``hook`` attribute so callers can end it explicitly.
    The temporary ``no_progress`` attribute is always removed from *namespace*.
    """
    controller = cmd.cli_ctx.get_progress_controller(det=True)

    def _report(current, total):
        # Attributes set on the function by callers tune the reporting.
        label = getattr(_report, 'message', 'Alive')
        keep_open = getattr(_report, 'reuse', False)

        if total:
            controller.add(message=label, value=current, total_val=total)
            if current == total and not keep_open:
                controller.end()

    _report.hook = controller

    if not namespace.no_progress:
        namespace.progress_callback = _report
    del namespace.no_progress
13 changes: 13 additions & 0 deletions src/azure-cli/azure/cli/command_modules/synapse/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ def get_custom_sdk(custom_module, client_factory):
operation_tmpl='azure.synapse.artifacts.operations#NotebookOperations.{}',
client_factory=None)

# NOTE(review): CliCommandType reads the keyword 'operations_tmpl' (as used by
# synapse_managed_private_endpoints_sdk below); the misspelled 'operation_tmpl'
# was silently ignored by the command loader.
synapse_library_sdk = CliCommandType(
    operations_tmpl='azure.synapse.artifacts.operations#LibraryOperations.{}',
    client_factory=None)

synapse_managed_private_endpoints_sdk = CliCommandType(
operations_tmpl='azure.synapse.managedprivateendpoints.operations#ManagedPrivateEndpoints.{}',
client_factory=None)
Expand Down Expand Up @@ -480,6 +484,15 @@ def get_custom_sdk(custom_module, client_factory):
g.custom_command('export', 'export_notebook')
g.custom_command('delete', 'delete_notebook', confirmation=True, supports_no_wait=True)

# Data Plane Commands --Artifacts library operations
with self.command_group('synapse workspace-package', synapse_library_sdk,
custom_command_type=get_custom_sdk('artifacts', None)) as g:
g.custom_command('upload', 'upload_workspace_package')
g.custom_command('upload-batch', 'workspace_package_upload_batch')
g.custom_command('list', 'list_workspace_package')
g.custom_show_command('show', 'get_workspace_package')
g.custom_command('delete', 'delete_workspace_package', confirmation=True, supports_no_wait=True)

# Data Plane Commands --Managed private endpoints operations
with self.command_group('synapse managed-private-endpoints', synapse_managed_private_endpoints_sdk,
custom_command_type=get_custom_sdk('managedprivateendpoints', None)) as g:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
Trigger, DataFlow, BigDataPoolReference, NotebookSessionProperties,
NotebookResource)
from azure.cli.core.util import sdk_no_wait, CLIError
from azure.core.exceptions import ResourceNotFoundError
from .._client_factory import (cf_synapse_linked_service, cf_synapse_dataset, cf_synapse_pipeline,
cf_synapse_pipeline_run, cf_synapse_trigger, cf_synapse_trigger_run,
cf_synapse_data_flow, cf_synapse_notebook, cf_synapse_spark_pool)
cf_synapse_data_flow, cf_synapse_notebook, cf_synapse_spark_pool,
cf_synapse_library)
from ..constant import EXECUTOR_SIZE, SPARK_SERVICE_ENDPOINT_API_VERSION


Expand Down Expand Up @@ -344,3 +346,89 @@ def write_to_file(notebook, path):
def delete_notebook(cmd, workspace_name, notebook_name, no_wait=False):
    """Delete a notebook from the workspace, optionally without waiting for the LRO."""
    artifacts_client = cf_synapse_notebook(cmd.cli_ctx, workspace_name)
    return sdk_no_wait(no_wait, artifacts_client.begin_delete_notebook,
                       notebook_name, polling=True)


# Workspace package
# Workspace package
def list_workspace_package(cmd, workspace_name):
    """List all packages uploaded to the given Synapse workspace."""
    library_client = cf_synapse_library(cmd.cli_ctx, workspace_name)
    return library_client.list()


def get_workspace_package(cmd, workspace_name, package_name):
    """Fetch a single workspace package by its name."""
    library_client = cf_synapse_library(cmd.cli_ctx, workspace_name)
    return library_client.get(package_name)


def upload_workspace_package(cmd, workspace_name, package, progress_callback=None):
    """Upload a single local file as a workspace package.

    :param package: local path of the file to upload; its base name becomes
        the package name in the workspace.
    :param progress_callback: optional ``callback(current, total)`` invoked
        with the number of bytes uploaded so far.
    :raises CLIError: if a package with the same name already exists.
    :return: the uploaded package resource.
    """
    client = cf_synapse_library(cmd.cli_ctx, workspace_name)
    package_name = os.path.basename(package)

    # Refuse to clobber an existing package of the same name.
    if test_workspace_package(client, package_name):
        raise CLIError("A workspace package with name '{0}' already exists.".format(
            package_name))

    # Create the (empty) package resource first, then append its content.
    client.begin_create(package_name).result()

    package_size = os.path.getsize(package)
    chunk_size = 4 * 1024 * 1024  # append content in 4 MiB chunks
    if progress_callback is not None:
        progress_callback(0, package_size)

    uploaded = 0
    with open(package, 'rb') as stream:
        while True:
            # read() at EOF returns b'', so no manual size clamping is needed.
            data = stream.read(chunk_size)
            if data == b'':
                break
            client.append(package_name, data)
            uploaded += len(data)
            if progress_callback is not None:
                progress_callback(uploaded, package_size)

    # Flush signals the service that the upload is complete.
    client.begin_flush(package_name).result()

    return client.get(package_name)


def workspace_package_upload_batch(cmd, workspace_name, source, progress_callback=None):
    """Upload every file found (recursively) under *source* as a workspace package.

    Returns the list of uploaded package resources, in upload order.
    """
    # Tell the progress reporter to reuse one hook across all uploads.
    if progress_callback:
        progress_callback.reuse = True

    paths = [os.path.join(root, name)
             for root, _, names in os.walk(source)
             for name in names]

    uploaded = []
    total = len(paths)
    for position, path in enumerate(paths, start=1):
        # Prefix the progress message with "<n>/<total>: <file>".
        if progress_callback:
            progress_callback.message = '{}/{}: "{}"'.format(
                position, total, os.path.basename(path))

        uploaded.append(upload_workspace_package(cmd, workspace_name, path, progress_callback))

    # All transfers done: close the shared progress hook.
    if progress_callback:
        progress_callback.hook.end()

    return uploaded


def test_workspace_package(client, package_name):
    """Return True if a workspace package named *package_name* exists, else False."""
    try:
        client.get(package_name)
    except ResourceNotFoundError:
        return False
    return True


def delete_workspace_package(cmd, workspace_name, package_name, no_wait=False):
    """Delete a workspace package, optionally without waiting for the LRO.

    :param package_name: name of the package to delete.
    :param no_wait: when True, return immediately without polling the
        long-running delete operation.
    """
    client = cf_synapse_library(cmd.cli_ctx, workspace_name)
    # polling=True matches the other data-plane deletes in this module
    # (e.g. delete_notebook) so --no-wait behaves consistently.
    return sdk_no_wait(no_wait, client.begin_delete, package_name, polling=True)
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
# --------------------------------------------------------------------------------------------
# pylint: disable=unused-argument, line-too-long
from azure.cli.core.util import sdk_no_wait, read_file_content
from azure.mgmt.synapse.models import BigDataPoolResourceInfo, AutoScaleProperties, AutoPauseProperties, LibraryRequirements, NodeSizeFamily
from azure.mgmt.synapse.models import BigDataPoolResourceInfo, AutoScaleProperties, AutoPauseProperties, LibraryRequirements, NodeSizeFamily, LibraryInfo
from .._client_factory import cf_synapse_client_workspace_factory
from .artifacts import get_workspace_package


# Synapse sparkpool
Expand Down Expand Up @@ -43,7 +44,9 @@ def update_spark_pool(cmd, client, resource_group_name, workspace_name, spark_po
node_size=None, node_count=None, enable_auto_scale=None,
min_node_count=None, max_node_count=None,
enable_auto_pause=None, delay=None,
library_requirements=None, tags=None, force=False, no_wait=False):
library_requirements=None,
package_action=None, package=None,
tags=None, force=False, no_wait=False):
existing_spark_pool = client.get(resource_group_name, workspace_name, spark_pool_name)

if node_size:
Expand Down Expand Up @@ -78,6 +81,17 @@ def update_spark_pool(cmd, client, resource_group_name, workspace_name, spark_po
existing_spark_pool.auto_pause = AutoPauseProperties(enabled=enable_auto_pause,
delay_in_minutes=delay)

if package_action and package:
if package_action == "Add":
if existing_spark_pool.custom_libraries is None:
existing_spark_pool.custom_libraries = []
for item in package:
package_get = get_workspace_package(cmd, workspace_name, item)
library = LibraryInfo(name=package_get.name, type=package_get.properties.type, path=package_get.properties.path, container_name=package_get.properties.container_name)
existing_spark_pool.custom_libraries.append(library)
if package_action == "Remove":
existing_spark_pool.custom_libraries = [library for library in existing_spark_pool.custom_libraries if library.name not in package]

return sdk_no_wait(no_wait, client.begin_create_or_update, resource_group_name, workspace_name, spark_pool_name,
existing_spark_pool, force=force)

Expand Down
Binary file not shown.

This file was deleted.

Loading