From 948713a05c0c877220e405b186f7a21bd2016bc2 Mon Sep 17 00:00:00 2001 From: Marcio Dos Santos Date: Fri, 21 Jun 2019 15:56:42 -0700 Subject: [PATCH 1/2] change: improving Chainer integ tests --- tests/data/chainer_mnist/distributed_mnist.py | 9 +- tests/data/chainer_mnist/mnist.py | 4 +- tests/integ/test_chainer_train.py | 142 +++++++----------- 3 files changed, 62 insertions(+), 93 deletions(-) diff --git a/tests/data/chainer_mnist/distributed_mnist.py b/tests/data/chainer_mnist/distributed_mnist.py index e43b613725..ce5cc60885 100644 --- a/tests/data/chainer_mnist/distributed_mnist.py +++ b/tests/data/chainer_mnist/distributed_mnist.py @@ -46,7 +46,7 @@ def __call__(self, x): def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb_format): - images = raw['x'] + images = raw['x'][-100:] if ndim == 2: images = images.reshape(-1, 28, 28) elif ndim == 3: @@ -59,7 +59,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb images *= scale / 255. if withlabel: - labels = raw['y'].astype(label_dtype) + labels = raw['y'][-100:].astype(label_dtype) return tuple_dataset.TupleDataset(images, labels) return images @@ -112,9 +112,6 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb chainer.optimizers.Adam(), comm) optimizer.setup(model) - train_file = np.load(os.path.join(args.train, 'train.npz')) - test_file = np.load(os.path.join(args.test, 'test.npz')) - preprocess_mnist_options = { 'withlabel': True, 'ndim': 1, @@ -166,7 +163,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb trainer.run() # only save the model in the master node - if args.host == 'algo-1': + if args.host == env.hosts[0]: serializers.save_npz(os.path.join(env.model_dir, 'model.npz'), model) diff --git a/tests/data/chainer_mnist/mnist.py b/tests/data/chainer_mnist/mnist.py index f52d982966..24a50db6eb 100644 --- a/tests/data/chainer_mnist/mnist.py +++ b/tests/data/chainer_mnist/mnist.py @@ -42,7 +42,7 @@ def __call__(self, x): def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb_format): - images = raw['x'] + images = raw['x'][-100:] if ndim == 2: images = images.reshape(-1, 28, 28) elif ndim == 3: @@ -56,7 +56,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb images *= scale / 255. if withlabel: - labels = raw['y'].astype(label_dtype) + labels = raw['y'][-100:].astype(label_dtype) return tuple_dataset.TupleDataset(images, labels) else: return images diff --git a/tests/integ/test_chainer_train.py b/tests/integ/test_chainer_train.py index 515b7cc05a..7cfdb695f0 100644 --- a/tests/integ/test_chainer_train.py +++ b/tests/integ/test_chainer_train.py @@ -13,135 +13,107 @@ from __future__ import absolute_import import os -import time -import pytest import numpy +import pytest -from sagemaker.chainer.defaults import CHAINER_VERSION from sagemaker.chainer.estimator import Chainer from sagemaker.chainer.model import ChainerModel from sagemaker.utils import unique_name_from_base -import tests.integ from tests.integ import DATA_DIR, PYTHON_VERSION, TRAINING_DEFAULT_TIMEOUT_MINUTES from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name @pytest.fixture(scope='module') -def chainer_training_job(sagemaker_session, chainer_full_version): - return _run_mnist_training_job(sagemaker_session, "ml.c4.xlarge", 1, chainer_full_version) +def chainer_local_training_job(sagemaker_local_session, chainer_full_version): + return _run_mnist_training_job(sagemaker_local_session, "local", 1, chainer_full_version) + + +def test_distributed_cpu_training(sagemaker_local_session, chainer_full_version): + _run_mnist_training_job(sagemaker_local_session, "local", 2, chainer_full_version) + +def test_training_with_additional_hyperparameters(sagemaker_local_session, chainer_full_version): + script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'mnist.py') + data_path = os.path.join(DATA_DIR, 'chainer_mnist') -def test_distributed_cpu_training(sagemaker_session, chainer_full_version): - _run_mnist_training_job(sagemaker_session, "ml.c4.xlarge", 2, chainer_full_version) + chainer = Chainer(entry_point=script_path, role='SageMakerRole', + train_instance_count=1, train_instance_type="local", + framework_version=chainer_full_version, + py_version=PYTHON_VERSION, + sagemaker_session=sagemaker_local_session, hyperparameters={'epochs': 1}, + use_mpi=True, + num_processes=2, + process_slots_per_host=2, + additional_mpi_options="-x NCCL_DEBUG=INFO") + train_input = 'file://' + os.path.join(data_path, 'train') + test_input = 'file://' + os.path.join(data_path, 'test') -@pytest.mark.skipif(tests.integ.test_region() in tests.integ.HOSTING_NO_P2_REGIONS - or tests.integ.test_region() in tests.integ.TRAINING_NO_P2_REGIONS, - reason='no ml.p2 instances in these regions') -def test_distributed_gpu_training(sagemaker_session, chainer_full_version): - _run_mnist_training_job(sagemaker_session, "ml.p2.xlarge", 2, chainer_full_version) + chainer.fit({'train': train_input, 'test': test_input}) -def test_training_with_additional_hyperparameters(sagemaker_session, chainer_full_version): +@pytest.mark.canary_quick +@pytest.mark.regional_testing +def test_attach_deploy(sagemaker_session, chainer_full_version): with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES): script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'mnist.py') data_path = os.path.join(DATA_DIR, 'chainer_mnist') chainer = Chainer(entry_point=script_path, role='SageMakerRole', + framework_version=chainer_full_version, py_version=PYTHON_VERSION, train_instance_count=1, train_instance_type="ml.c4.xlarge", - framework_version=chainer_full_version, - py_version=PYTHON_VERSION, - sagemaker_session=sagemaker_session, hyperparameters={'epochs': 1}, - use_mpi=True, - num_processes=2, - process_slots_per_host=2, - additional_mpi_options="-x NCCL_DEBUG=INFO") - - train_input = chainer.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'), - key_prefix='integ-test-data/chainer_mnist/train') - test_input = chainer.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'), - key_prefix='integ-test-data/chainer_mnist/test') + sagemaker_session=sagemaker_session, hyperparameters={'epochs': 1}) - job_name = unique_name_from_base('test-chainer-training') - chainer.fit({'train': train_input, 'test': test_input}, job_name=job_name) - return chainer.latest_training_job.name + train_input = sagemaker_session.upload_data(path=os.path.join(data_path, 'train'), + key_prefix='integ-test-data/chainer_mnist/train') + test_input = sagemaker_session.upload_data(path=os.path.join(data_path, 'test'), + key_prefix='integ-test-data/chainer_mnist/test') + + job_name = unique_name_from_base('test-chainer-training') + chainer.fit({'train': train_input, 'test': test_input}, wait=False, job_name=job_name) -@pytest.mark.canary_quick -@pytest.mark.regional_testing -def test_attach_deploy(chainer_training_job, sagemaker_session): endpoint_name = unique_name_from_base('test-chainer-attach-deploy') with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session): - estimator = Chainer.attach(chainer_training_job, sagemaker_session=sagemaker_session) + estimator = Chainer.attach(chainer.latest_training_job.name, + sagemaker_session=sagemaker_session) predictor = estimator.deploy(1, 'ml.m4.xlarge', endpoint_name=endpoint_name) _predict_and_assert(predictor) -def test_deploy_model(chainer_training_job, sagemaker_session): - endpoint_name = unique_name_from_base('test-chainer-deploy-model') - with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session): - desc = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=chainer_training_job) - model_data = desc['ModelArtifacts']['S3ModelArtifacts'] - script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'mnist.py') - model = ChainerModel(model_data, 'SageMakerRole', entry_point=script_path, sagemaker_session=sagemaker_session) - predictor = model.deploy(1, "ml.m4.xlarge", endpoint_name=endpoint_name) - _predict_and_assert(predictor) - - -def test_async_fit(sagemaker_session): - with timeout(minutes=5): - training_job_name = _run_mnist_training_job(sagemaker_session, "ml.c4.xlarge", 1, - chainer_full_version=CHAINER_VERSION, wait=False) +def test_deploy_model(chainer_local_training_job, sagemaker_local_session): + script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'mnist.py') - print("Waiting to re-attach to the training job: %s" % training_job_name) - time.sleep(20) + model = ChainerModel(chainer_local_training_job.model_data, 'SageMakerRole', + entry_point=script_path, sagemaker_session=sagemaker_local_session) - endpoint_name = unique_name_from_base('test-chainer-async-fit') - with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session): - print("Re-attaching now to: %s" % training_job_name) - estimator = Chainer.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) - predictor = estimator.deploy(1, "ml.c4.xlarge", endpoint_name=endpoint_name) + predictor = model.deploy(1, "local") + try: _predict_and_assert(predictor) - - -def test_failed_training_job(sagemaker_session, chainer_full_version): - with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES): - script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'failure_script.py') - - chainer = Chainer(entry_point=script_path, role='SageMakerRole', - framework_version=chainer_full_version, py_version=PYTHON_VERSION, - train_instance_count=1, train_instance_type='ml.c4.xlarge', - sagemaker_session=sagemaker_session) - - with pytest.raises(ValueError) as e: - chainer.fit(job_name=unique_name_from_base('test-chainer-training')) - assert 'ExecuteUserScriptError' in str(e.value) + finally: + predictor.delete_endpoint() def _run_mnist_training_job(sagemaker_session, instance_type, instance_count, chainer_full_version, wait=True): - with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES): + script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'mnist.py') if instance_type == 1 else \ + os.path.join(DATA_DIR, 'chainer_mnist', 'distributed_mnist.py') - script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'mnist.py') if instance_type == 1 else \ - os.path.join(DATA_DIR, 'chainer_mnist', 'distributed_mnist.py') - - data_path = os.path.join(DATA_DIR, 'chainer_mnist') + data_path = os.path.join(DATA_DIR, 'chainer_mnist') - chainer = Chainer(entry_point=script_path, role='SageMakerRole', - framework_version=chainer_full_version, py_version=PYTHON_VERSION, - train_instance_count=instance_count, train_instance_type=instance_type, - sagemaker_session=sagemaker_session, hyperparameters={'epochs': 1}) + chainer = Chainer(entry_point=script_path, role='SageMakerRole', + framework_version=chainer_full_version, py_version=PYTHON_VERSION, + train_instance_count=instance_count, train_instance_type=instance_type, + sagemaker_session=sagemaker_session, hyperparameters={'epochs': 1}) - train_input = chainer.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'), - key_prefix='integ-test-data/chainer_mnist/train') - test_input = chainer.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'), - key_prefix='integ-test-data/chainer_mnist/test') + train_input = 'file://' + os.path.join(data_path, 'train') + test_input = 'file://' + os.path.join(data_path, 'test') - job_name = unique_name_from_base('test-chainer-training') - chainer.fit({'train': train_input, 'test': test_input}, wait=wait, job_name=job_name) - return chainer.latest_training_job.name + job_name = unique_name_from_base('test-chainer-training') + chainer.fit({'train': train_input, 'test': test_input}, wait=wait, job_name=job_name) + return chainer def _predict_and_assert(predictor): From 7708c1d2666c599ba765e4c5d51186320d1b6343 Mon Sep 17 00:00:00 2001 From: Marcio Dos Santos Date: Wed, 26 Jun 2019 15:51:45 -0700 Subject: [PATCH 2/2] black format --- tests/data/chainer_mnist/distributed_mnist.py | 6 +- tests/data/chainer_mnist/mnist.py | 4 +- tests/integ/test_chainer_train.py | 136 +++++++++++------- 3 files changed, 87 insertions(+), 59 deletions(-) diff --git a/tests/data/chainer_mnist/distributed_mnist.py b/tests/data/chainer_mnist/distributed_mnist.py index 37b1c1e468..2d8f0319bd 100644 --- a/tests/data/chainer_mnist/distributed_mnist.py +++ b/tests/data/chainer_mnist/distributed_mnist.py @@ -46,7 +46,7 @@ def __call__(self, x): def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb_format): - images = raw['x'][-100:] + images = raw["x"][-100:] if ndim == 2: images = images.reshape(-1, 28, 28) elif ndim == 3: @@ -59,7 +59,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb images *= scale / 255.0 if withlabel: - labels = raw['y'][-100:].astype(label_dtype) + labels = raw["y"][-100:].astype(label_dtype) return tuple_dataset.TupleDataset(images, labels) return images @@ -171,7 +171,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb # only save the model in the master node if args.host == env.hosts[0]: - serializers.save_npz(os.path.join(env.model_dir, 'model.npz'), model) + serializers.save_npz(os.path.join(env.model_dir, "model.npz"), model) def model_fn(model_dir): diff --git a/tests/data/chainer_mnist/mnist.py b/tests/data/chainer_mnist/mnist.py index 860d0a2220..dd98f504bc 100644 --- a/tests/data/chainer_mnist/mnist.py +++ b/tests/data/chainer_mnist/mnist.py @@ -42,7 +42,7 @@ def __call__(self, x): def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb_format): - images = raw['x'][-100:] + images = raw["x"][-100:] if ndim == 2: images = images.reshape(-1, 28, 28) elif ndim == 3: @@ -55,7 +55,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb images *= scale / 255.0 if withlabel: - labels = raw['y'][-100:].astype(label_dtype) + labels = raw["y"][-100:].astype(label_dtype) return tuple_dataset.TupleDataset(images, labels) else: return images diff --git a/tests/integ/test_chainer_train.py b/tests/integ/test_chainer_train.py index b949d2b925..5b036cd432 100644 --- a/tests/integ/test_chainer_train.py +++ b/tests/integ/test_chainer_train.py @@ -24,7 +24,7 @@ from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def chainer_local_training_job(sagemaker_local_session, chainer_full_version): return _run_mnist_training_job(sagemaker_local_session, "local", 1, chainer_full_version) @@ -36,61 +36,79 @@ def test_distributed_cpu_training(sagemaker_local_session, chainer_full_version) @pytest.mark.local_mode def test_training_with_additional_hyperparameters(sagemaker_local_session, chainer_full_version): - script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'mnist.py') - data_path = os.path.join(DATA_DIR, 'chainer_mnist') - - chainer = Chainer(entry_point=script_path, role='SageMakerRole', - train_instance_count=1, train_instance_type="local", - framework_version=chainer_full_version, - py_version=PYTHON_VERSION, - sagemaker_session=sagemaker_local_session, hyperparameters={'epochs': 1}, - use_mpi=True, - num_processes=2, - process_slots_per_host=2, - additional_mpi_options="-x NCCL_DEBUG=INFO") - - train_input = 'file://' + os.path.join(data_path, 'train') - test_input = 'file://' + os.path.join(data_path, 'test') - - chainer.fit({'train': train_input, 'test': test_input}) + script_path = os.path.join(DATA_DIR, "chainer_mnist", "mnist.py") + data_path = os.path.join(DATA_DIR, "chainer_mnist") + + chainer = Chainer( + entry_point=script_path, + role="SageMakerRole", + train_instance_count=1, + train_instance_type="local", + framework_version=chainer_full_version, + py_version=PYTHON_VERSION, + sagemaker_session=sagemaker_local_session, + hyperparameters={"epochs": 1}, + use_mpi=True, + num_processes=2, + process_slots_per_host=2, + additional_mpi_options="-x NCCL_DEBUG=INFO", + ) + + train_input = "file://" + os.path.join(data_path, "train") + test_input = "file://" + os.path.join(data_path, "test") + + chainer.fit({"train": train_input, "test": test_input}) @pytest.mark.canary_quick @pytest.mark.regional_testing def test_attach_deploy(sagemaker_session, chainer_full_version): with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES): - script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'mnist.py') - data_path = os.path.join(DATA_DIR, 'chainer_mnist') + script_path = os.path.join(DATA_DIR, "chainer_mnist", "mnist.py") + data_path = os.path.join(DATA_DIR, "chainer_mnist") - chainer = Chainer(entry_point=script_path, role='SageMakerRole', - framework_version=chainer_full_version, py_version=PYTHON_VERSION, - train_instance_count=1, train_instance_type="ml.c4.xlarge", - sagemaker_session=sagemaker_session, hyperparameters={'epochs': 1}) + chainer = Chainer( + entry_point=script_path, + role="SageMakerRole", + framework_version=chainer_full_version, + py_version=PYTHON_VERSION, + train_instance_count=1, + train_instance_type="ml.c4.xlarge", + sagemaker_session=sagemaker_session, + hyperparameters={"epochs": 1}, + ) - train_input = sagemaker_session.upload_data(path=os.path.join(data_path, 'train'), - key_prefix='integ-test-data/chainer_mnist/train') + train_input = sagemaker_session.upload_data( + path=os.path.join(data_path, "train"), key_prefix="integ-test-data/chainer_mnist/train" + ) - test_input = sagemaker_session.upload_data(path=os.path.join(data_path, 'test'), - key_prefix='integ-test-data/chainer_mnist/test') + test_input = sagemaker_session.upload_data( + path=os.path.join(data_path, "test"), key_prefix="integ-test-data/chainer_mnist/test" + ) - job_name = unique_name_from_base('test-chainer-training') - chainer.fit({'train': train_input, 'test': test_input}, wait=False, job_name=job_name) + job_name = unique_name_from_base("test-chainer-training") + chainer.fit({"train": train_input, "test": test_input}, wait=False, job_name=job_name) - endpoint_name = unique_name_from_base('test-chainer-attach-deploy') + endpoint_name = unique_name_from_base("test-chainer-attach-deploy") with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session): - estimator = Chainer.attach(chainer.latest_training_job.name, - sagemaker_session=sagemaker_session) - predictor = estimator.deploy(1, 'ml.m4.xlarge', endpoint_name=endpoint_name) + estimator = Chainer.attach( + chainer.latest_training_job.name, sagemaker_session=sagemaker_session + ) + predictor = estimator.deploy(1, "ml.m4.xlarge", endpoint_name=endpoint_name) _predict_and_assert(predictor) @pytest.mark.local_mode def test_deploy_model(chainer_local_training_job, sagemaker_local_session): - script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'mnist.py') + script_path = os.path.join(DATA_DIR, "chainer_mnist", "mnist.py") - model = ChainerModel(chainer_local_training_job.model_data, 'SageMakerRole', - entry_point=script_path, sagemaker_session=sagemaker_local_session) + model = ChainerModel( + chainer_local_training_job.model_data, + "SageMakerRole", + entry_point=script_path, + sagemaker_session=sagemaker_local_session, + ) predictor = model.deploy(1, "local") try: @@ -99,23 +117,33 @@ def test_deploy_model(chainer_local_training_job, sagemaker_local_session): predictor.delete_endpoint() -def _run_mnist_training_job(sagemaker_session, instance_type, instance_count, - chainer_full_version, wait=True): - script_path = os.path.join(DATA_DIR, 'chainer_mnist', 'mnist.py') if instance_type == 1 else \ - os.path.join(DATA_DIR, 'chainer_mnist', 'distributed_mnist.py') - - data_path = os.path.join(DATA_DIR, 'chainer_mnist') - - chainer = Chainer(entry_point=script_path, role='SageMakerRole', - framework_version=chainer_full_version, py_version=PYTHON_VERSION, - train_instance_count=instance_count, train_instance_type=instance_type, - sagemaker_session=sagemaker_session, hyperparameters={'epochs': 1}) - - train_input = 'file://' + os.path.join(data_path, 'train') - test_input = 'file://' + os.path.join(data_path, 'test') - - job_name = unique_name_from_base('test-chainer-training') - chainer.fit({'train': train_input, 'test': test_input}, wait=wait, job_name=job_name) +def _run_mnist_training_job( + sagemaker_session, instance_type, instance_count, chainer_full_version, wait=True +): + script_path = ( + os.path.join(DATA_DIR, "chainer_mnist", "mnist.py") + if instance_type == 1 + else os.path.join(DATA_DIR, "chainer_mnist", "distributed_mnist.py") + ) + + data_path = os.path.join(DATA_DIR, "chainer_mnist") + + chainer = Chainer( + entry_point=script_path, + role="SageMakerRole", + framework_version=chainer_full_version, + py_version=PYTHON_VERSION, + train_instance_count=instance_count, + train_instance_type=instance_type, + sagemaker_session=sagemaker_session, + hyperparameters={"epochs": 1}, + ) + + train_input = "file://" + os.path.join(data_path, "train") + test_input = "file://" + os.path.join(data_path, "test") + + job_name = unique_name_from_base("test-chainer-training") + chainer.fit({"train": train_input, "test": test_input}, wait=wait, job_name=job_name) return chainer