diff --git a/sdk/batch/azure-batch/tests/batch_preparers.py b/sdk/batch/azure-batch/tests/batch_preparers.py index fc31832eb071..1083aee951d8 100644 --- a/sdk/batch/azure-batch/tests/batch_preparers.py +++ b/sdk/batch/azure-batch/tests/batch_preparers.py @@ -101,10 +101,6 @@ def create_resource(self, name, **kwargs): keys.primary) if storage: self._add_app_package(group.name, name) - self.test_class_instance.scrubber.register_name_pair( - name, - self.resource_moniker - ) else: # If using pilotprod, need to prefix the region with the environment. # IE: myaccount.pilotprod1.eastus.batch.azure.com diff --git a/sdk/batch/azure-batch/tests/test_batch.py b/sdk/batch/azure-batch/tests/test_batch.py index 7a033b155987..0c4731259154 100644 --- a/sdk/batch/azure-batch/tests/test_batch.py +++ b/sdk/batch/azure-batch/tests/test_batch.py @@ -7,10 +7,7 @@ #-------------------------------------------------------------------------- import datetime import io -import logging import time -import unittest -import requests import azure.batch from azure.batch import models @@ -23,11 +20,10 @@ ) from devtools_testutils import ( - AzureMgmtTestCase, + AzureMgmtRecordedTestCase, ResourceGroupPreparer, StorageAccountPreparer, - CachedResourceGroupPreparer, - CachedStorageAccountPreparer + CachedResourceGroupPreparer ) from devtools_testutils.fake_credentials import BATCH_TEST_PASSWORD @@ -38,7 +34,7 @@ DEFAULT_VM_SIZE = 'standard_d2_v2' -class BatchTest(AzureMgmtTestCase): +class TestBatch(AzureMgmtRecordedTestCase): def _batch_url(self, batch): if batch.account_endpoint.startswith('https://'): @@ -65,42 +61,43 @@ def create_sharedkey_client(self, batch_account, credentials, **kwargs): def assertBatchError(self, code, func, *args, **kwargs): try: func(*args, **kwargs) - self.fail("BatchErrorException expected but not raised") + pytest.fail("BatchErrorException expected but not raised") except models.BatchErrorException as err: - self.assertEqual(err.error.code, code) + assert err.error.code == code except Exception as err: - self.fail("Expected BatchErrorExcption, instead got: {!r}".format(err)) + pytest.fail("Expected BatchErrorExcption, instead got: {!r}".format(err)) def assertCreateTasksError(self, code, func, *args, **kwargs): try: func(*args, **kwargs) - self.fail("CreateTasksError expected but not raised") + pytest.fail("CreateTasksError expected but not raised") except models.CreateTasksErrorException as err: try: batch_error = err.errors.pop() if code: - self.assertEqual(batch_error.error.code, code) + assert batch_error.error.code == code except IndexError: - self.fail("Inner BatchErrorException expected but not exist") + pytest.fail("Inner BatchErrorException expected but not exist") except Exception as err: - self.fail("Expected CreateTasksError, instead got: {!r}".format(err)) + pytest.fail("Expected CreateTasksError, instead got: {!r}".format(err)) @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @StorageAccountPreparer(name_prefix='batch1', location=AZURE_LOCATION) @AccountPreparer(location=AZURE_LOCATION, batch_environment=BATCH_ENVIRONMENT) @JobPreparer() - def test_batch_applications(self, batch_job, **kwargs): + def test_batch_applications(self, **kwargs): + batch_job = kwargs.pop("batch_job") client = self.create_sharedkey_client(**kwargs) # Test List Applications apps = list(client.application.list()) - self.assertEqual(len(apps), 1) + assert len(apps) == 1 # Test Get Application app = client.application.get('application_id') - self.assertIsInstance(app, models.ApplicationSummary) - self.assertEqual(app.id, 'application_id') - self.assertEqual(app.versions, ['v1.0']) + assert isinstance(app, models.ApplicationSummary) + assert app.id == 'application_id' + assert app.versions == ['v1.0'] # Test Create Task with Application Package task_id = 'python_task_with_app_package' @@ -110,12 +107,12 @@ def test_batch_applications(self, batch_job, **kwargs): application_package_references=[models.ApplicationPackageReference(application_id='application_id', version='v1.0')] ) response = client.task.add(batch_job.id, task) - self.assertIsNone(response) + assert response is None # Test Get Task with Application Package task = client.task.get(batch_job.id, task_id) - self.assertIsInstance(task, models.CloudTask) - self.assertEqual(task.application_package_references[0].application_id, 'application_id') + assert isinstance(task, models.CloudTask) + assert task.application_package_references[0].application_id == 'application_id' @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @@ -131,19 +128,19 @@ def test_batch_certificates(self, **kwargs): password='nodesdk') response = client.certificate.add(certificate) - self.assertIsNone(response) + assert response is None # Test List Certificates certs = client.certificate.list() test_cert = [c for c in certs if c.thumbprint == 'cff2ab63c8c955aaf71989efa641b906558d9fb7'] - self.assertEqual(len(test_cert), 1) + assert len(test_cert) == 1 # Test Get Certificate cert = client.certificate.get('sha1', 'cff2ab63c8c955aaf71989efa641b906558d9fb7') - self.assertIsInstance(cert, models.Certificate) - self.assertEqual(cert.thumbprint, 'cff2ab63c8c955aaf71989efa641b906558d9fb7') - self.assertEqual(cert.thumbprint_algorithm, 'sha1') - self.assertIsNone(cert.delete_certificate_error) + assert isinstance(cert, models.Certificate) + assert cert.thumbprint == 'cff2ab63c8c955aaf71989efa641b906558d9fb7' + assert cert.thumbprint_algorithm == 'sha1' + assert cert.delete_certificate_error is None # Test Cancel Certificate Delete self.assertBatchError('CertificateStateActive', @@ -155,7 +152,7 @@ def test_batch_certificates(self, **kwargs): response = client.certificate.delete( 'sha1', 'cff2ab63c8c955aaf71989efa641b906558d9fb7') - self.assertIsNone(response) + assert response is None @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @@ -165,8 +162,8 @@ def test_batch_create_pools(self, **kwargs): # Test List Node Agent SKUs response = client.account.list_supported_images() response = list(response) - self.assertTrue(len(response) > 1) - self.assertIsNotNone(response[-1].image_reference) + assert len(response) > 1 + assert response[-1].image_reference is not None # Test Create Iaas Pool users = [ @@ -188,17 +185,17 @@ def test_batch_create_pools(self, **kwargs): user_accounts=users ) response = client.pool.add(test_iaas_pool) - self.assertIsNone(response) + assert response is None # Test list pool node counnt counts = list(client.account.list_pool_node_counts()) - self.assertIsNotNone(counts) - self.assertEqual(len(counts), 1) - self.assertEqual(counts[0].pool_id, test_iaas_pool.id) - self.assertIsNotNone(counts[0].dedicated) - self.assertEqual(counts[0].dedicated.total, 0) - self.assertEqual(counts[0].dedicated.leaving_pool, 0) - self.assertEqual(counts[0].low_priority.total, 0) + assert counts is not None + assert len(counts) == 1 + assert counts[0].pool_id == test_iaas_pool.id + assert counts[0].dedicated is not None + assert counts[0].dedicated.total == 0 + assert counts[0].dedicated.leaving_pool == 0 + assert counts[0].low_priority.total == 0 # Test Create Pool with Network Configuration #TODO Public IP tests @@ -253,10 +250,10 @@ def test_batch_create_pools(self, **kwargs): data_disks=[data_disk]) ) response = client.pool.add(test_disk_pool) - self.assertIsNone(response) + assert response is None disk_pool = client.pool.get(test_disk_pool.id) - self.assertEqual(disk_pool.virtual_machine_configuration.data_disks[0].lun, 1) - self.assertEqual(disk_pool.virtual_machine_configuration.data_disks[0].disk_size_gb, 50) + assert disk_pool.virtual_machine_configuration.data_disks[0].lun == 1 + assert disk_pool.virtual_machine_configuration.data_disks[0].disk_size_gb == 50 # Test Create Pool with Application Licenses test_app_pool = models.PoolAddParameter( @@ -273,9 +270,9 @@ def test_batch_create_pools(self, **kwargs): data_disks=[data_disk]) ) response = client.pool.add(test_app_pool) - self.assertIsNone(response) + assert response is None app_pool = client.pool.get(test_app_pool.id) - self.assertEqual(app_pool.application_licenses[0], "maya") + assert app_pool.application_licenses[0] == "maya" # Test Create Pool with Azure Disk Encryption test_ade_pool = models.PoolAddParameter( @@ -293,10 +290,10 @@ def test_batch_create_pools(self, **kwargs): node_agent_sku_id='batch.node.ubuntu 18.04') ) response = client.pool.add(test_ade_pool) - self.assertIsNone(response) + assert response is None ade_pool = client.pool.get(test_ade_pool.id) - self.assertEqual(ade_pool.virtual_machine_configuration.disk_encryption_configuration.targets, - [models.DiskEncryptionTarget.temporary_disk]) + targets = ade_pool.virtual_machine_configuration.disk_encryption_configuration.targets + assert targets == [models.DiskEncryptionTarget.temporary_disk] # Test Create Pool with Virtual Machine Configuration With Extensions test_vmextension_pool = models.PoolAddParameter( @@ -318,19 +315,19 @@ def test_batch_create_pools(self, **kwargs): node_agent_sku_id='batch.node.windows amd64') ) response = client.pool.add(test_vmextension_pool) - self.assertIsNone(response) + assert response is None vmextension_pool = client.pool.get(test_vmextension_pool.id) - self.assertTrue(vmextension_pool.virtual_machine_configuration.extensions[0].enable_automatic_upgrade) + assert vmextension_pool.virtual_machine_configuration.extensions[0].enable_automatic_upgrade # Test List Pools without Filters pools = list(client.pool.list()) - self.assertTrue(len(pools) > 1) + assert len(pools) > 1 # Test List Pools with Maximum options = models.PoolListOptions(max_results=1) pools = client.pool.list(options) pools.next() - self.assertEqual(len(pools.current_page), 1) + assert len(pools.current_page) == 1 # Test List Pools with Filter options = models.PoolListOptions( @@ -338,7 +335,7 @@ def test_batch_create_pools(self, **kwargs): select='id,state', expand='stats') pools = list(client.pool.list(options)) - self.assertEqual(len(pools), 1) + assert len(pools) == 1 @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @@ -368,13 +365,13 @@ def test_batch_create_pool_with_blobfuse_mount(self, **kwargs): )] ) response = client.pool.add(test_iaas_pool) - self.assertIsNone(response) + assert response is None mount_pool = client.pool.get(test_iaas_pool.id) - self.assertIsNotNone(mount_pool.mount_configuration) - self.assertEqual(len(mount_pool.mount_configuration), 1) - self.assertIsNotNone(mount_pool.mount_configuration[0].azure_blob_file_system_configuration) - self.assertIsNone(mount_pool.mount_configuration[0].nfs_mount_configuration) + assert mount_pool.mount_configuration is not None + assert len(mount_pool.mount_configuration) == 1 + assert mount_pool.mount_configuration[0].azure_blob_file_system_configuration is not None + assert mount_pool.mount_configuration[0].nfs_mount_configuration is None @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @@ -400,7 +397,7 @@ def test_batch_update_pools(self, **kwargs): ) ) response = client.pool.add(test_paas_pool) - self.assertIsNone(response) + assert response is None # Test Update Pool Parameters params = models.PoolUpdatePropertiesParameter( @@ -408,47 +405,48 @@ def test_batch_update_pools(self, **kwargs): application_package_references=[], metadata=[models.MetadataItem(name='foo', value='bar')]) response = client.pool.update_properties(test_paas_pool.id, params) - self.assertIsNone(response) + assert response is None # Test Patch Pool Parameters params = models.PoolPatchParameter(metadata=[models.MetadataItem(name='foo2', value='bar2')]) response = client.pool.patch(test_paas_pool.id, params) - self.assertIsNone(response) + assert response is None # Test Pool Exists response = client.pool.exists(test_paas_pool.id) - self.assertTrue(response) + assert response # Test Get Pool pool = client.pool.get(test_paas_pool.id) - self.assertIsInstance(pool, models.CloudPool) - self.assertEqual(pool.id, test_paas_pool.id) - self.assertEqual(pool.state, models.PoolState.active) - self.assertEqual(pool.allocation_state, models.AllocationState.steady) - self.assertEqual(pool.cloud_service_configuration.os_family, '5') - self.assertEqual(pool.vm_size, DEFAULT_VM_SIZE) - self.assertIsNone(pool.start_task) - self.assertEqual(pool.metadata[0].name, 'foo2') - self.assertEqual(pool.metadata[0].value, 'bar2') + assert isinstance(pool, models.CloudPool) + assert pool.id == test_paas_pool.id + assert pool.state == models.PoolState.active + assert pool.allocation_state == models.AllocationState.steady + assert pool.cloud_service_configuration.os_family == '5' + assert pool.vm_size == DEFAULT_VM_SIZE + assert pool.start_task is None + assert pool.metadata[0].name == 'foo2' + assert pool.metadata[0].value == 'bar2' # Test Get Pool with OData Clauses options = models.PoolGetOptions(select='id,state', expand='stats') pool = client.pool.get(test_paas_pool.id, options) - self.assertIsInstance(pool, models.CloudPool) - self.assertEqual(pool.id, test_paas_pool.id) - self.assertEqual(pool.state, models.PoolState.active) - self.assertIsNone(pool.allocation_state) - self.assertIsNone(pool.vm_size) + assert isinstance(pool, models.CloudPool) + assert pool.id == test_paas_pool.id + assert pool.state == models.PoolState.active + assert pool.allocation_state is None + assert pool.vm_size is None # Test Delete Pool response = client.pool.delete(test_paas_pool.id) - self.assertIsNone(response) + assert response is None @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @AccountPreparer(location=AZURE_LOCATION, batch_environment=BATCH_ENVIRONMENT) @PoolPreparer(location=AZURE_LOCATION) - def test_batch_scale_pools(self, batch_pool, **kwargs): + def test_batch_scale_pools(self, **kwargs): + batch_pool = kwargs.pop("batch_pool") client = self.create_sharedkey_client(**kwargs) # Test Enable Autoscale interval = datetime.timedelta(minutes=6) @@ -457,14 +455,12 @@ def test_batch_scale_pools(self, batch_pool, **kwargs): auto_scale_formula='$TargetDedicatedNodes=2', auto_scale_evaluation_interval=interval) - self.assertIsNone(response) + assert response is None # Test Evaluate Autoscale result = client.pool.evaluate_auto_scale(batch_pool.name, '$TargetDedicatedNodes=3') - self.assertIsInstance(result, models.AutoScaleRun) - self.assertEqual( - result.results, - '$TargetDedicatedNodes=3;$TargetLowPriorityNodes=0;$NodeDeallocationOption=requeue') + assert isinstance(result, models.AutoScaleRun) + assert result.results == '$TargetDedicatedNodes=3;$TargetLowPriorityNodes=0;$NodeDeallocationOption=requeue' # Test Disable Autoscale pool = client.pool.get(batch_pool.name) @@ -472,7 +468,7 @@ def test_batch_scale_pools(self, batch_pool, **kwargs): time.sleep(5) pool = client.pool.get(batch_pool.name) response = client.pool.disable_auto_scale(batch_pool.name) - self.assertIsNone(response) + assert response is None # Test Pool Resize pool = client.pool.get(batch_pool.name) @@ -481,11 +477,11 @@ def test_batch_scale_pools(self, batch_pool, **kwargs): pool = client.pool.get(batch_pool.name) params = models.PoolResizeParameter(target_dedicated_nodes=0, target_low_priority_nodes=2) response = client.pool.resize(batch_pool.name, params) - self.assertIsNone(response) + assert response is None # Test Stop Pool Resize response = client.pool.stop_resize(batch_pool.name) - self.assertIsNone(response) + assert response is None pool = client.pool.get(batch_pool.name) while self.is_live and pool.allocation_state != models.AllocationState.steady: time.sleep(5) @@ -493,7 +489,7 @@ def test_batch_scale_pools(self, batch_pool, **kwargs): # Test Get Pool Usage Info info = list(client.pool.list_usage_metrics()) - self.assertEqual(info, []) + assert info == [] @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @@ -517,33 +513,33 @@ def test_batch_job_schedules(self, **kwargs): job_specification=job_spec ) response = client.job_schedule.add(params) - self.assertIsNone(response) + assert response is None # Test List Job Schedules schedules = list(client.job_schedule.list()) - self.assertTrue(len(schedules) > 0) + assert len(schedules) > 0 # Test Get Job Schedule schedule = client.job_schedule.get(schedule_id) - self.assertIsInstance(schedule, models.CloudJobSchedule) - self.assertEqual(schedule.id, schedule_id) - self.assertEqual(schedule.state, models.JobScheduleState.active) + assert isinstance(schedule, models.CloudJobSchedule) + assert schedule.id == schedule_id + assert schedule.state == models.JobScheduleState.active # Test Job Schedule Exists exists = client.job_schedule.exists(schedule_id) - self.assertTrue(exists) + assert exists # Test List Jobs from Schedule jobs = list(client.job.list_from_job_schedule(schedule_id)) - self.assertTrue(len(jobs) > 0) + assert len(jobs) > 0 # Test Disable Job Schedule response = client.job_schedule.disable(schedule_id) - self.assertIsNone(response) + assert response is None # Test Enable Job Schedule response = client.job_schedule.enable(schedule_id) - self.assertIsNone(response) + assert response is None # Test Update Job Schedule job_spec = models.JobSpecification( @@ -554,7 +550,7 @@ def test_batch_job_schedules(self, **kwargs): ) params = models.JobScheduleUpdateParameter(schedule=schedule, job_specification=job_spec) response = client.job_schedule.update(schedule_id, params) - self.assertIsNone(response) + assert response is None # Test Patch Job Schedule schedule = models.Schedule( @@ -562,15 +558,15 @@ def test_batch_job_schedules(self, **kwargs): ) params = models.JobSchedulePatchParameter(schedule=schedule) response = client.job_schedule.patch(schedule_id, params) - self.assertIsNone(response) + assert response is None # Test Terminate Job Schedule response = client.job_schedule.terminate(schedule_id) - self.assertIsNone(response) + assert response is None # Test Delete Job Schedule response = client.job_schedule.delete(schedule_id) - self.assertIsNone(response) + assert response is None @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @@ -621,11 +617,11 @@ def test_batch_network_configuration(self, **kwargs): # Test Compute Node Config nodes = list(client.compute_node.list(pool.id)) - self.assertEqual(len(nodes), 1) - self.assertIsInstance(nodes[0], models.ComputeNode) - self.assertEqual(len(nodes[0].endpoint_configuration.inbound_endpoints), 2) - self.assertEqual(nodes[0].endpoint_configuration.inbound_endpoints[0].name, 'TestEndpointConfig.0') - self.assertEqual(nodes[0].endpoint_configuration.inbound_endpoints[0].protocol.value, 'udp') + assert len(nodes) == 1 + assert isinstance(nodes[0], models.ComputeNode) + assert len(nodes[0].endpoint_configuration.inbound_endpoints) == 2 + assert nodes[0].endpoint_configuration.inbound_endpoints[0].name == 'TestEndpointConfig.0' + assert nodes[0].endpoint_configuration.inbound_endpoints[0].protocol.value == 'udp' @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @@ -656,51 +652,52 @@ def test_batch_network_configuration_acceleratednetworking(self, **kwargs): time.sleep(10) network_pool = client.pool.get(pool.id) - self.assertTrue(network_pool.network_configuration.enable_accelerated_networking) + assert network_pool.network_configuration.enable_accelerated_networking @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @AccountPreparer(location=AZURE_LOCATION, batch_environment=BATCH_ENVIRONMENT) @PoolPreparer(location=AZURE_LOCATION, size=2, config='iaas') - def test_batch_compute_nodes(self, batch_pool, **kwargs): + def test_batch_compute_nodes(self, **kwargs): + batch_pool = kwargs.pop("batch_pool") client = self.create_sharedkey_client(**kwargs) # Test List Compute Nodes nodes = list(client.compute_node.list(batch_pool.name)) - self.assertEqual(len(nodes), 2) + assert len(nodes) == 2 while self.is_live and any([n for n in nodes if n.state != models.ComputeNodeState.idle]): time.sleep(10) nodes = list(client.compute_node.list(batch_pool.name)) - self.assertEqual(len(nodes), 2) + assert len(nodes) == 2 # Test Get Compute Node node = client.compute_node.get(batch_pool.name, nodes[0].id) - self.assertIsInstance(node, models.ComputeNode) - self.assertEqual(node.scheduling_state, models.SchedulingState.enabled) - self.assertTrue(node.is_dedicated) - self.assertIsNotNone(node.node_agent_info) - self.assertIsNotNone(node.node_agent_info.version) + assert isinstance(node, models.ComputeNode) + assert node.scheduling_state == models.SchedulingState.enabled + assert node.is_dedicated + assert node.node_agent_info is not None + assert node.node_agent_info.version is not None # Test Upload Log config = models.UploadBatchServiceLogsConfiguration( container_url = "https://computecontainer.blob.core.windows.net/", start_time = datetime.datetime.utcnow() - datetime.timedelta(minutes=6)) result = client.compute_node.upload_batch_service_logs(batch_pool.name, nodes[0].id, config) - self.assertIsNotNone(result) - self.assertTrue(result.number_of_files_uploaded > 0) - self.assertIsNotNone(result.virtual_directory_name) + assert result is not None + assert result.number_of_files_uploaded > 0 + assert result.virtual_directory_name is not None # Test Disable Scheduling response = client.compute_node.disable_scheduling(batch_pool.name, nodes[0].id) - self.assertIsNone(response) + assert response is None # Test Enable Scheduling response = client.compute_node.enable_scheduling(batch_pool.name, nodes[0].id) - self.assertIsNone(response) + assert response is None # Test Reboot Node response = client.compute_node.reboot( batch_pool.name, nodes[0].id, node_reboot_option=models.ComputeNodeRebootOption.terminate) - self.assertIsNone(response) + assert response is None # Test Reimage Node self.assertBatchError('OperationNotValidOnNode', @@ -712,41 +709,42 @@ def test_batch_compute_nodes(self, batch_pool, **kwargs): # Test Remove Nodes options = models.NodeRemoveParameter(node_list=[n.id for n in nodes]) response = client.pool.remove_nodes(batch_pool.name, options) - self.assertIsNone(response) + assert response is None @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @CachedResourceGroupPreparer(location=AZURE_LOCATION) @AccountPreparer(location=AZURE_LOCATION, batch_environment=BATCH_ENVIRONMENT) @PoolPreparer(location=AZURE_LOCATION, size=1) - def test_batch_compute_node_user(self, batch_pool, **kwargs): + def test_batch_compute_node_user(self, **kwargs): + batch_pool = kwargs.pop("batch_pool") client = self.create_sharedkey_client(**kwargs) nodes = list(client.compute_node.list(batch_pool.name)) while self.is_live and any([n for n in nodes if n.state != models.ComputeNodeState.idle]): time.sleep(10) nodes = list(client.compute_node.list(batch_pool.name)) - self.assertEqual(len(nodes), 1) + assert len(nodes) == 1 # Test Add User user_name = 'BatchPythonSDKUser' nodes = list(client.compute_node.list(batch_pool.name)) user = models.ComputeNodeUser(name=user_name, password=BATCH_TEST_PASSWORD, is_admin=False) response = client.compute_node.add_user(batch_pool.name, nodes[0].id, user) - self.assertIsNone(response) + assert response is None # Test Update User user = models.NodeUpdateUserParameter(password='liilef#$DdRGSa_ewkjh') response = client.compute_node.update_user(batch_pool.name, nodes[0].id, user_name, user) - self.assertIsNone(response) + assert response is None # Test Get remote login settings remote_login_settings = client.compute_node.get_remote_login_settings(batch_pool.name, nodes[0].id) - self.assertIsInstance(remote_login_settings, models.ComputeNodeGetRemoteLoginSettingsResult) - self.assertIsNotNone(remote_login_settings.remote_login_ip_address) - self.assertIsNotNone(remote_login_settings.remote_login_port) + assert isinstance(remote_login_settings, models.ComputeNodeGetRemoteLoginSettingsResult) + assert remote_login_settings.remote_login_ip_address is not None + assert remote_login_settings.remote_login_port is not None # Test Delete User response = client.compute_node.delete_user(batch_pool.name, nodes[0].id, user_name) - self.assertIsNone(response) + assert response is None @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @@ -754,18 +752,20 @@ def test_batch_compute_node_user(self, batch_pool, **kwargs): @AccountPreparer(location=AZURE_LOCATION, batch_environment=BATCH_ENVIRONMENT, name_prefix='batch4') @PoolPreparer(size=1) @JobPreparer() - def test_batch_files(self, batch_pool, batch_job, **kwargs): + def test_batch_files(self, **kwargs): + batch_pool = kwargs.pop("batch_pool") + batch_job = kwargs.pop("batch_job") client = self.create_sharedkey_client(**kwargs) nodes = list(client.compute_node.list(batch_pool.name)) while self.is_live and any([n for n in nodes if n.state != models.ComputeNodeState.idle]): time.sleep(10) nodes = list(client.compute_node.list(batch_pool.name)) - self.assertEqual(len(nodes), 1) + assert len(nodes) == 1 node = nodes[0].id task_id = 'test_task' task_param = models.TaskAddParameter(id=task_id, command_line='cmd /c "echo hello world"') response = client.task.add(batch_job.id, task_param) - self.assertIsNone(response) + assert response is None task = client.task.get(batch_job.id, task_id) while self.is_live and task.state != models.TaskState.completed: time.sleep(5) @@ -774,13 +774,13 @@ def test_batch_files(self, batch_pool, batch_job, **kwargs): # Test List Files from Compute Node all_files = client.file.list_from_compute_node(batch_pool.name, node, recursive=True) only_files = [f for f in all_files if not f.is_directory] - self.assertTrue(len(only_files) >= 2) + assert len(only_files) >= 2 # Test File Properties from Compute Node props = client.file.get_properties_from_compute_node( batch_pool.name, node, only_files[0].name, raw=True) - self.assertTrue('Content-Length' in props.headers) - self.assertTrue('Content-Type' in props.headers) + assert 'Content-Length' in props.headers + assert 'Content-Type' in props.headers # Test Get File from Compute Node file_length = 0 @@ -788,22 +788,22 @@ def test_batch_files(self, batch_pool, batch_job, **kwargs): response = client.file.get_from_compute_node(batch_pool.name, node, only_files[0].name) for data in response: file_length += len(data) - self.assertEqual(file_length, props.headers['Content-Length']) + assert file_length == props.headers['Content-Length'] # Test Delete File from Compute Node response = client.file.delete_from_compute_node(batch_pool.name, node, only_files[0].name) - self.assertIsNone(response) + assert response is None # Test List Files from Task all_files = client.file.list_from_task(batch_job.id, task_id) only_files = [f for f in all_files if not f.is_directory] - self.assertTrue(len(only_files) >= 1) + assert len(only_files) >= 1 # Test File Properties from Task props = client.file.get_properties_from_task( batch_job.id, task_id, only_files[0].name, raw=True) - self.assertTrue('Content-Length' in props.headers) - self.assertTrue('Content-Type' in props.headers) + assert 'Content-Length' in props.headers + assert 'Content-Type' in props.headers # Test Get File from Task file_length = 0 @@ -811,17 +811,18 @@ def test_batch_files(self, batch_pool, batch_job, **kwargs): response = client.file.get_from_task(batch_job.id, task_id, only_files[0].name) for data in response: file_length += len(data) - self.assertEqual(file_length, props.headers['Content-Length']) + assert file_length == props.headers['Content-Length'] # Test Delete File from Task response = client.file.delete_from_task(batch_job.id, task_id, only_files[0].name) - self.assertIsNone(response) + assert response is None @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @AccountPreparer(location=AZURE_LOCATION, batch_environment=BATCH_ENVIRONMENT) @JobPreparer(on_task_failure=models.OnTaskFailure.perform_exit_options_job_action) - def test_batch_tasks(self, batch_job, **kwargs): + def test_batch_tasks(self, **kwargs): + batch_job = kwargs.pop("batch_job") client = self.create_sharedkey_client(**kwargs) # Test Create Task with Auto Complete @@ -842,10 +843,10 @@ def test_batch_tasks(self, batch_job, **kwargs): message += "\n{}: {}".format(v.key, v.value) raise Exception(message) task = client.task.get(batch_job.id, task_param.id) - self.assertIsInstance(task, models.CloudTask) - self.assertEqual(task.exit_conditions.default.job_action, models.JobAction.none) - self.assertEqual(task.exit_conditions.exit_codes[0].code, 1) - self.assertEqual(task.exit_conditions.exit_codes[0].exit_options.job_action, models.JobAction.terminate) + assert isinstance(task, models.CloudTask) + assert task.exit_conditions.default.job_action == models.JobAction.none + assert task.exit_conditions.exit_codes[0].code == 1 + assert task.exit_conditions.exit_codes[0].exit_options.job_action == models.JobAction.terminate # Test Create Task with Output Files container_url = "https://test.blob.core.windows.net:443/test-container" @@ -872,8 +873,8 @@ def test_batch_tasks(self, batch_job, **kwargs): ) client.task.add(batch_job.id, task_param) task = client.task.get(batch_job.id, task_param.id) - self.assertIsInstance(task, models.CloudTask) - self.assertEqual(len(task.output_files), 2) + assert isinstance(task, models.CloudTask) + assert len(task.output_files) == 2 # Test Create Task with Auto User auto_user = models.AutoUserSpecification( @@ -886,9 +887,9 @@ def test_batch_tasks(self, batch_job, **kwargs): ) client.task.add(batch_job.id, task_param) task = client.task.get(batch_job.id, task_param.id) - self.assertIsInstance(task, models.CloudTask) - self.assertEqual(task.user_identity.auto_user.scope, models.AutoUserScope.task) - self.assertEqual(task.user_identity.auto_user.elevation_level, models.ElevationLevel.admin) + assert isinstance(task, models.CloudTask) + assert task.user_identity.auto_user.scope == models.AutoUserScope.task + assert task.user_identity.auto_user.elevation_level == models.ElevationLevel.admin # Test Create Task with Token Settings task_param = models.TaskAddParameter( @@ -899,8 +900,8 @@ def test_batch_tasks(self, batch_job, **kwargs): ) client.task.add(batch_job.id, task_param) task = client.task.get(batch_job.id, task_param.id) - self.assertIsInstance(task, models.CloudTask) - self.assertEqual(task.authentication_token_settings.access[0], models.AccessScope.job) + assert isinstance(task, models.CloudTask) + assert task.authentication_token_settings.access[0] == models.AccessScope.job # Test Create Task with Container Settings task_param = models.TaskAddParameter( @@ -912,9 +913,9 @@ def test_batch_tasks(self, batch_job, **kwargs): ) client.task.add(batch_job.id, task_param) task = client.task.get(batch_job.id, task_param.id) - self.assertIsInstance(task, models.CloudTask) - self.assertEqual(task.container_settings.image_name, 'windows_container:latest') - self.assertEqual(task.container_settings.registry.user_name, 'username') + assert isinstance(task, models.CloudTask) + assert task.container_settings.image_name == 'windows_container:latest' + assert task.container_settings.registry.user_name == 'username' # Test Create Task with Run-As-User task_param = models.TaskAddParameter( @@ -924,8 +925,8 @@ def test_batch_tasks(self, batch_job, **kwargs): ) client.task.add(batch_job.id, task_param) task = client.task.get(batch_job.id, task_param.id) - self.assertIsInstance(task, models.CloudTask) - self.assertEqual(task.user_identity.user_name, 'task-user') + assert isinstance(task, models.CloudTask) + assert task.user_identity.user_name == 'task-user' # Test Add Task Collection tasks = [] @@ -934,47 +935,47 @@ def test_batch_tasks(self, batch_job, **kwargs): id=self.get_resource_name('batch_task{}_'.format(i)), command_line='cmd /c "echo hello world"')) result = client.task.add_collection(batch_job.id, tasks) - self.assertIsInstance(result, models.TaskAddCollectionResult) - self.assertEqual(len(result.value), 3) - self.assertEqual(result.value[0].status, models.TaskAddStatus.success) + assert isinstance(result, models.TaskAddCollectionResult) + assert len(result.value) == 3 + assert result.value[0].status == models.TaskAddStatus.success # Test List Tasks tasks = list(client.task.list(batch_job.id)) - self.assertEqual(len(tasks), 9) + assert len(tasks) == 9 # Test Count Tasks task_results = client.job.get_task_counts(batch_job.id) - self.assertIsInstance(task_results, models.TaskCountsResult) - self.assertEqual(task_results.task_counts.completed, 0) - self.assertEqual(task_results.task_counts.succeeded, 0) + assert isinstance(task_results, models.TaskCountsResult) + assert task_results.task_counts.completed == 0 + assert task_results.task_counts.succeeded == 0 # Test Terminate Task response = client.task.terminate(batch_job.id, task_param.id) - self.assertIsNone(response) + assert response is None task = client.task.get(batch_job.id, task_param.id) - self.assertEqual(task.state, models.TaskState.completed) + assert task.state == models.TaskState.completed # Test Reactivate Task response = client.task.reactivate(batch_job.id, task_param.id) - self.assertIsNone(response) + assert response is None task = client.task.get(batch_job.id, task_param.id) - self.assertEqual(task.state, models.TaskState.active) + assert task.state == models.TaskState.active # Test Update Task response = client.task.update( batch_job.id, task_param.id, constraints=models.TaskConstraints(max_task_retry_count=1)) - self.assertIsNone(response) + assert response is None # Test Get Subtasks # TODO: Test with actual subtasks subtasks = client.task.list_subtasks(batch_job.id, task_param.id) - self.assertIsInstance(subtasks, models.CloudTaskListSubtasksResult) - self.assertEqual(subtasks.value, []) + assert isinstance(subtasks, models.CloudTaskListSubtasksResult) + assert subtasks.value == [] # Test Delete Task response = client.task.delete(batch_job.id, task_param.id) - self.assertIsNone(response) + assert response is None # Test Bulk Add Task Failure task_id = "mytask" @@ -1018,11 +1019,10 @@ def test_batch_tasks(self, batch_job, **kwargs): resource_files=resource_files) tasks_to_add.append(task) result = client.task.add_collection(batch_job.id, tasks_to_add) - self.assertIsInstance(result, models.TaskAddCollectionResult) - self.assertEqual(len(result.value), 733) - self.assertEqual(result.value[0].status, models.TaskAddStatus.success) - self.assertTrue( - all(t.status == models.TaskAddStatus.success for t in result.value)) + assert isinstance(result, models.TaskAddCollectionResult) + assert len(result.value) == 733 + assert result.value[0].status == models.TaskAddStatus.success + assert all(t.status == models.TaskAddStatus.success for t in result.value) @pytest.mark.live_test_only("Can't use recordings until tests use the test proxy") @ResourceGroupPreparer(location=AZURE_LOCATION) @@ -1050,7 +1050,7 @@ def test_batch_jobs(self, **kwargs): job_release_task=job_release ) response = client.job.add(job_param) - self.assertIsNone(response) + assert response is None # Test Update Job constraints = models.JobConstraints(max_task_retry_count=3) @@ -1062,18 +1062,18 @@ def test_batch_jobs(self, **kwargs): ) ) response = client.job.update(job_param.id, options) - self.assertIsNone(response) + assert response is None # Test Patch Job options = models.JobPatchParameter(priority=900) response = client.job.patch(job_param.id, options) - self.assertIsNone(response) + assert response is None job = client.job.get(job_param.id) - self.assertIsInstance(job, models.CloudJob) - self.assertEqual(job.id, job_param.id) - self.assertEqual(job.constraints.max_task_retry_count, 3) - self.assertEqual(job.priority, 900) + assert isinstance(job, models.CloudJob) + assert job.id == job_param.id + assert job.constraints.max_task_retry_count == 3 + assert job.priority == 900 # Test Create Job with Auto Complete job_auto_param = models.JobAddParameter( @@ -1085,34 +1085,34 @@ def test_batch_jobs(self, **kwargs): ) ) response = client.job.add(job_auto_param) - self.assertIsNone(response) + assert response is None job = client.job.get(job_auto_param.id) - self.assertIsInstance(job, models.CloudJob) - self.assertEqual(job.on_all_tasks_complete, models.OnAllTasksComplete.terminate_job) - self.assertEqual(job.on_task_failure, models.OnTaskFailure.perform_exit_options_job_action) + assert isinstance(job, models.CloudJob) + assert job.on_all_tasks_complete == models.OnAllTasksComplete.terminate_job + assert job.on_task_failure == models.OnTaskFailure.perform_exit_options_job_action # Test List Jobs jobs = client.job.list() - self.assertIsInstance(jobs, models.CloudJobPaged) - self.assertEqual(len(list(jobs)), 2) + assert isinstance(jobs, models.CloudJobPaged) + assert len(list(jobs)) == 2 # Test Disable Job response = client.job.disable(job_param.id, models.DisableJobOption.requeue) - self.assertIsNone(response) + assert response is None # Test Enable Job response = client.job.enable(job_param.id) - self.assertIsNone(response) + assert response is None # Prep and release task status task_status = client.job.list_preparation_and_release_task_status(job_param.id) - self.assertIsInstance(task_status, models.JobPreparationAndReleaseTaskExecutionInformationPaged) - self.assertEqual(list(task_status), []) + assert isinstance(task_status, models.JobPreparationAndReleaseTaskExecutionInformationPaged) + assert list(task_status) == [] # Test Terminate Job response = client.job.terminate(job_param.id) - self.assertIsNone(response) + assert response is None # Test Delete Job response = client.job.delete(job_auto_param.id) - self.assertIsNone(response) + assert response is None diff --git a/tools/azure-sdk-tools/devtools_testutils/resource_testcase.py b/tools/azure-sdk-tools/devtools_testutils/resource_testcase.py index f6d2ef01826b..a56f92fe9e6f 100644 --- a/tools/azure-sdk-tools/devtools_testutils/resource_testcase.py +++ b/tools/azure-sdk-tools/devtools_testutils/resource_testcase.py @@ -17,7 +17,7 @@ pass from . import AzureMgmtPreparer -from .sanitizers import add_general_regex_sanitizer +from .sanitizers import add_general_string_sanitizer logging.getLogger().setLevel(logging.INFO) @@ -99,11 +99,7 @@ def create_resource(self, name, **kwargs): id="/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/" + name, ) if name != self.moniker: - try: - self.test_class_instance.scrubber.register_name_pair(name, self.moniker) - # tests using the test proxy don't have a scrubber instance - except AttributeError: - add_general_regex_sanitizer(regex=name, value=self.moniker) + add_general_string_sanitizer(target=name, value=self.moniker) return { self.parameter_name: self.resource, self.parameter_name_for_location: self.location, diff --git a/tools/azure-sdk-tools/devtools_testutils/storage_testcase.py b/tools/azure-sdk-tools/devtools_testutils/storage_testcase.py index f015cc48c88f..3d907ea182c5 100644 --- a/tools/azure-sdk-tools/devtools_testutils/storage_testcase.py +++ b/tools/azure-sdk-tools/devtools_testutils/storage_testcase.py @@ -30,6 +30,7 @@ from . import AzureMgmtPreparer, ResourceGroupPreparer, FakeResource from .resource_testcase import RESOURCE_GROUP_PARAM +from .sanitizers import add_general_string_sanitizer FakeStorageAccount = FakeResource @@ -80,7 +81,7 @@ def create_resource(self, name, **kwargs): storage_keys = {v.key_name: v.value for v in self.client.storage_accounts.list_keys(group.name, name).keys} self.storage_key = storage_keys["key1"] - self.test_class_instance.scrubber.register_name_pair(name, self.resource_moniker) + add_general_string_sanitizer(target=name, value=self.resource_moniker) else: self.resource = StorageAccount( location=self.location,