diff --git a/tests/integ/datasets.py b/tests/integ/datasets.py
new file mode 100644
index 0000000000..a68b1ee648
--- /dev/null
+++ b/tests/integ/datasets.py
@@ -0,0 +1,28 @@
+# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+
+import gzip
+import os
+import pickle
+
+from tests.integ import DATA_DIR
+
+
+def one_p_mnist():
+    data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
+    # Load the data into memory as numpy arrays
+    with gzip.open(data_path, "rb") as f:
+        training_set, _, _ = pickle.load(f, encoding="latin1")
+
+    return training_set
diff --git a/tests/integ/test_airflow_config.py b/tests/integ/test_airflow_config.py
index 7f4c5ecc9e..6b91811edc 100644
--- a/tests/integ/test_airflow_config.py
+++ b/tests/integ/test_airflow_config.py
@@ -12,15 +12,17 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
 import os
-import pickle
-import sys
 
-import pytest
-import tests.integ
+import airflow
+import pytest
 import numpy as np
+from airflow import DAG
+from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
+from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator
+from six.moves.urllib.parse import urlparse
 
+import tests.integ
 from sagemaker import (
     KMeans,
     FactorizationMachines,
@@ -40,21 +42,13 @@
 from sagemaker.pytorch.estimator import PyTorch
 from sagemaker.sklearn import SKLearn
 from sagemaker.tensorflow import TensorFlow
-from sagemaker.workflow import airflow as sm_airflow
 from sagemaker.utils import sagemaker_timestamp
-
-import airflow
-from airflow import DAG
-from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
-from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator
-
+from sagemaker.workflow import airflow as sm_airflow
 from sagemaker.xgboost import XGBoost
-from tests.integ import DATA_DIR, PYTHON_VERSION
+from tests.integ import datasets, DATA_DIR, PYTHON_VERSION
 from tests.integ.record_set import prepare_record_set_from_local_files
 from tests.integ.timeout import timeout
-from six.moves.urllib.parse import urlparse
-
 
 PYTORCH_MNIST_DIR = os.path.join(DATA_DIR, "pytorch_mnist")
 PYTORCH_MNIST_SCRIPT = os.path.join(PYTORCH_MNIST_DIR, "mnist.py")
 AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS = 10
@@ -101,13 +95,6 @@ def test_byo_airflow_config_uploads_data_source_to_s3_when_inputs_provided(
 @pytest.mark.canary_quick
 def test_kmeans_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
     with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
         kmeans = KMeans(
             role=ROLE,
             train_instance_count=SINGLE_INSTANCE_COUNT,
@@ -126,7 +113,7 @@ def test_kmeans_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_
         kmeans.center_factor = 1
         kmeans.eval_metrics = ["ssd", "msd"]
 
-        records = kmeans.record_set(train_set[0][:100])
+        records = kmeans.record_set(datasets.one_p_mnist()[0][:100])
 
         training_config = _build_airflow_workflow(
             estimator=kmeans, instance_type=cpu_instance_type, inputs=records
@@ -140,13 +127,6 @@ def test_kmeans_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_
 
 def test_fm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
     with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
         fm = FactorizationMachines(
             role=ROLE,
             train_instance_count=SINGLE_INSTANCE_COUNT,
@@ -160,7 +140,8 @@ def test_fm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_inst
             sagemaker_session=sagemaker_session,
         )
 
-        records = fm.record_set(train_set[0][:200], train_set[1][:200].astype("float32"))
+        training_set = datasets.one_p_mnist()
+        records = fm.record_set(training_set[0][:200], training_set[1][:200].astype("float32"))
 
         training_config = _build_airflow_workflow(
             estimator=fm, instance_type=cpu_instance_type, inputs=records
@@ -206,13 +187,6 @@ def test_ipinsights_airflow_config_uploads_data_source_to_s3(sagemaker_session,
 
 def test_knn_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
     with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
         knn = KNN(
             role=ROLE,
             train_instance_count=SINGLE_INSTANCE_COUNT,
@@ -223,7 +197,8 @@ def test_knn_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
             sagemaker_session=sagemaker_session,
         )
 
-        records = knn.record_set(train_set[0][:200], train_set[1][:200].astype("float32"))
+        training_set = datasets.one_p_mnist()
+        records = knn.record_set(training_set[0][:200], training_set[1][:200].astype("float32"))
 
         training_config = _build_airflow_workflow(
             estimator=knn, instance_type=cpu_instance_type, inputs=records
@@ -277,16 +252,10 @@ def test_linearlearner_airflow_config_uploads_data_source_to_s3(
     sagemaker_session, cpu_instance_type
 ):
     with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
-        train_set[1][:100] = 1
-        train_set[1][100:200] = 0
-        train_set = train_set[0], train_set[1].astype(np.dtype("float32"))
+        training_set = datasets.one_p_mnist()
+        training_set[1][:100] = 1
+        training_set[1][100:200] = 0
+        training_set = training_set[0], training_set[1].astype(np.dtype("float32"))
 
         ll = LinearLearner(
             ROLE,
@@ -331,7 +300,7 @@ def test_linearlearner_airflow_config_uploads_data_source_to_s3(
         ll.early_stopping_tolerance = 0.0001
         ll.early_stopping_patience = 3
 
-        records = ll.record_set(train_set[0][:200], train_set[1][:200])
+        records = ll.record_set(training_set[0][:200], training_set[1][:200])
 
         training_config = _build_airflow_workflow(
             estimator=ll, instance_type=cpu_instance_type, inputs=records
@@ -380,13 +349,6 @@ def test_ntm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
 @pytest.mark.canary_quick
 def test_pca_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
     with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
         pca = PCA(
             role=ROLE,
             train_instance_count=SINGLE_INSTANCE_COUNT,
@@ -399,7 +361,7 @@ def test_pca_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
         pca.subtract_mean = True
         pca.extra_components = 5
 
-        records = pca.record_set(train_set[0][:100])
+        records = pca.record_set(datasets.one_p_mnist()[0][:100])
 
         training_config = _build_airflow_workflow(
             estimator=pca, instance_type=cpu_instance_type, inputs=records
diff --git a/tests/integ/test_byo_estimator.py b/tests/integ/test_byo_estimator.py
index d84ef1cc10..738af4c57a 100644
--- a/tests/integ/test_byo_estimator.py
+++ b/tests/integ/test_byo_estimator.py
@@ -12,11 +12,8 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
 import json
 import os
-import pickle
-import sys
 
 import pytest
 
@@ -24,7 +21,7 @@
 from sagemaker.amazon.amazon_estimator import get_image_uri
 from sagemaker.estimator import Estimator
 from sagemaker.utils import unique_name_from_base
-from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES
+from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES, datasets
 from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
 
 
@@ -33,6 +30,11 @@ def region(sagemaker_session):
     return sagemaker_session.boto_session.region_name
 
 
+@pytest.fixture
+def training_set():
+    return datasets.one_p_mnist()
+
+
 def fm_serializer(data):
     js = {"instances": []}
     for row in data:
@@ -41,7 +43,7 @@ def fm_serializer(data):
 
 
 @pytest.mark.canary_quick
-def test_byo_estimator(sagemaker_session, region, cpu_instance_type):
+def test_byo_estimator(sagemaker_session, region, cpu_instance_type, training_set):
     """Use Factorization Machines algorithm as an example here.
 
     First we need to prepare data for training. We take standard data set, convert it to the
@@ -57,12 +59,6 @@ def test_byo_estimator(sagemaker_session, region, cpu_instance_type):
     job_name = unique_name_from_base("byo")
 
     with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
         prefix = "test_byo_estimator"
         key = "recordio-pb-data"
 
@@ -92,26 +88,20 @@ def test_byo_estimator(sagemaker_session, region, cpu_instance_type):
 
         predictor.content_type = "application/json"
         predictor.deserializer = sagemaker.predictor.json_deserializer
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result["predictions"]) == 10
         for prediction in result["predictions"]:
             assert prediction["score"] is not None
 
 
-def test_async_byo_estimator(sagemaker_session, region, cpu_instance_type):
+def test_async_byo_estimator(sagemaker_session, region, cpu_instance_type, training_set):
     image_name = get_image_uri(region, "factorization-machines")
     endpoint_name = unique_name_from_base("byo")
     training_data_path = os.path.join(DATA_DIR, "dummy_tensor")
     job_name = unique_name_from_base("byo")
 
     with timeout(minutes=5):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
         prefix = "test_byo_estimator"
         key = "recordio-pb-data"
 
@@ -144,7 +134,7 @@ def test_async_byo_estimator(sagemaker_session, region, cpu_instance_type):
 
         predictor.content_type = "application/json"
         predictor.deserializer = sagemaker.predictor.json_deserializer
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result["predictions"]) == 10
         for prediction in result["predictions"]:
diff --git a/tests/integ/test_factorization_machines.py b/tests/integ/test_factorization_machines.py
index 6aba98c5f6..5e4b37d1a0 100644
--- a/tests/integ/test_factorization_machines.py
+++ b/tests/integ/test_factorization_machines.py
@@ -12,29 +12,25 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
-import os
-import pickle
-import sys
 import time
 
+import pytest
+
 from sagemaker import FactorizationMachines, FactorizationMachinesModel
 from sagemaker.utils import unique_name_from_base
-from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES
+from tests.integ import datasets, TRAINING_DEFAULT_TIMEOUT_MINUTES
 from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
 
 
-def test_factorization_machines(sagemaker_session, cpu_instance_type):
-    job_name = unique_name_from_base("fm")
+@pytest.fixture
+def training_set():
+    return datasets.one_p_mnist()
 
-    with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
 
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
+def test_factorization_machines(sagemaker_session, cpu_instance_type, training_set):
+    job_name = unique_name_from_base("fm")
 
+    with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
         fm = FactorizationMachines(
             role="SageMakerRole",
             train_instance_count=1,
@@ -50,7 +46,7 @@ def test_factorization_machines(sagemaker_session, cpu_instance_type):
 
         # training labels must be 'float32'
         fm.fit(
-            fm.record_set(train_set[0][:200], train_set[1][:200].astype("float32")),
+            fm.record_set(training_set[0][:200], training_set[1][:200].astype("float32")),
             job_name=job_name,
         )
 
@@ -59,24 +55,17 @@ def test_factorization_machines(sagemaker_session, cpu_instance_type):
             fm.model_data, role="SageMakerRole", sagemaker_session=sagemaker_session
         )
         predictor = model.deploy(1, cpu_instance_type, endpoint_name=job_name)
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result) == 10
         for record in result:
             assert record.label["score"] is not None
 
 
-def test_async_factorization_machines(sagemaker_session, cpu_instance_type):
+def test_async_factorization_machines(sagemaker_session, cpu_instance_type, training_set):
     job_name = unique_name_from_base("fm")
 
     with timeout(minutes=5):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
         fm = FactorizationMachines(
             role="SageMakerRole",
             train_instance_count=1,
@@ -92,7 +81,7 @@ def test_async_factorization_machines(sagemaker_session, cpu_instance_type):
 
         # training labels must be 'float32'
         fm.fit(
-            fm.record_set(train_set[0][:200], train_set[1][:200].astype("float32")),
+            fm.record_set(training_set[0][:200], training_set[1][:200].astype("float32")),
             job_name=job_name,
             wait=False,
         )
@@ -109,7 +98,7 @@ def test_async_factorization_machines(sagemaker_session, cpu_instance_type):
             estimator.model_data, role="SageMakerRole", sagemaker_session=sagemaker_session
         )
         predictor = model.deploy(1, cpu_instance_type, endpoint_name=job_name)
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result) == 10
         for record in result:
diff --git a/tests/integ/test_kmeans.py b/tests/integ/test_kmeans.py
index c6dd990576..e4f071efd6 100644
--- a/tests/integ/test_kmeans.py
+++ b/tests/integ/test_kmeans.py
@@ -12,31 +12,25 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
 import json
-import os
-import pickle
-import sys
 import time
 
 import pytest
 
 from sagemaker import KMeans, KMeansModel
 from sagemaker.utils import unique_name_from_base
-from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES
+from tests.integ import datasets, TRAINING_DEFAULT_TIMEOUT_MINUTES
 from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
 
 
-def test_kmeans(sagemaker_session, cpu_instance_type):
-    job_name = unique_name_from_base("kmeans")
-    with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
+@pytest.fixture
+def training_set():
+    return datasets.one_p_mnist()
 
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
 
+def test_kmeans(sagemaker_session, cpu_instance_type, training_set):
+    job_name = unique_name_from_base("kmeans")
+    with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
         kmeans = KMeans(
             role="SageMakerRole",
             train_instance_count=1,
@@ -69,14 +63,14 @@ def test_kmeans(sagemaker_session, cpu_instance_type):
             force_dense="True",
         )
 
-        kmeans.fit(kmeans.record_set(train_set[0][:100]), job_name=job_name)
+        kmeans.fit(kmeans.record_set(training_set[0][:100]), job_name=job_name)
 
     with timeout_and_delete_endpoint_by_name(job_name, sagemaker_session):
         model = KMeansModel(
             kmeans.model_data, role="SageMakerRole", sagemaker_session=sagemaker_session
         )
         predictor = model.deploy(1, cpu_instance_type, endpoint_name=job_name)
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result) == 10
         for record in result:
@@ -89,17 +83,10 @@ def test_kmeans(sagemaker_session, cpu_instance_type):
     assert "Could not find model" in str(exception.value)
 
 
-def test_async_kmeans(sagemaker_session, cpu_instance_type):
+def test_async_kmeans(sagemaker_session, cpu_instance_type, training_set):
     job_name = unique_name_from_base("kmeans")
 
     with timeout(minutes=5):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
         kmeans = KMeans(
             role="SageMakerRole",
             train_instance_count=1,
@@ -130,7 +117,7 @@ def test_async_kmeans(sagemaker_session, cpu_instance_type):
             force_dense="True",
         )
 
-        kmeans.fit(kmeans.record_set(train_set[0][:100]), wait=False, job_name=job_name)
+        kmeans.fit(kmeans.record_set(training_set[0][:100]), wait=False, job_name=job_name)
 
     print("Detached from training job. Will re-attach in 20 seconds")
     time.sleep(20)
@@ -142,7 +129,7 @@ def test_async_kmeans(sagemaker_session, cpu_instance_type):
             estimator.model_data, role="SageMakerRole", sagemaker_session=sagemaker_session
         )
         predictor = model.deploy(1, cpu_instance_type, endpoint_name=job_name)
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result) == 10
         for record in result:
diff --git a/tests/integ/test_knn.py b/tests/integ/test_knn.py
index 52d2e2e3e0..7f9b9854e9 100644
--- a/tests/integ/test_knn.py
+++ b/tests/integ/test_knn.py
@@ -12,29 +12,25 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
-import os
-import pickle
-import sys
 import time
 
+import pytest
+
 from sagemaker import KNN, KNNModel
 from sagemaker.utils import unique_name_from_base
-from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES
+from tests.integ import datasets, TRAINING_DEFAULT_TIMEOUT_MINUTES
 from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
 
 
-def test_knn_regressor(sagemaker_session, cpu_instance_type):
-    job_name = unique_name_from_base("knn")
+@pytest.fixture
+def training_set():
+    return datasets.one_p_mnist()
 
-    with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
 
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
+def test_knn_regressor(sagemaker_session, cpu_instance_type, training_set):
+    job_name = unique_name_from_base("knn")
 
+    with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
         knn = KNN(
             role="SageMakerRole",
             train_instance_count=1,
@@ -47,31 +43,24 @@ def test_knn_regressor(sagemaker_session, cpu_instance_type):
 
         # training labels must be 'float32'
         knn.fit(
-            knn.record_set(train_set[0][:200], train_set[1][:200].astype("float32")),
+            knn.record_set(training_set[0][:200], training_set[1][:200].astype("float32")),
             job_name=job_name,
         )
 
     with timeout_and_delete_endpoint_by_name(job_name, sagemaker_session):
         model = KNNModel(knn.model_data, role="SageMakerRole", sagemaker_session=sagemaker_session)
         predictor = model.deploy(1, cpu_instance_type, endpoint_name=job_name)
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result) == 10
         for record in result:
             assert record.label["score"] is not None
 
 
-def test_async_knn_classifier(sagemaker_session, cpu_instance_type):
+def test_async_knn_classifier(sagemaker_session, cpu_instance_type, training_set):
     job_name = unique_name_from_base("knn")
 
     with timeout(minutes=5):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
         knn = KNN(
             role="SageMakerRole",
             train_instance_count=1,
@@ -86,7 +75,7 @@ def test_async_knn_classifier(sagemaker_session, cpu_instance_type):
 
         # training labels must be 'float32'
         knn.fit(
-            knn.record_set(train_set[0][:200], train_set[1][:200].astype("float32")),
+            knn.record_set(training_set[0][:200], training_set[1][:200].astype("float32")),
             wait=False,
             job_name=job_name,
         )
@@ -101,7 +90,7 @@ def test_async_knn_classifier(sagemaker_session, cpu_instance_type):
             estimator.model_data, role="SageMakerRole", sagemaker_session=sagemaker_session
         )
         predictor = model.deploy(1, cpu_instance_type, endpoint_name=job_name)
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result) == 10
         for record in result:
diff --git a/tests/integ/test_linear_learner.py b/tests/integ/test_linear_learner.py
index 62bd35b8c1..5d49f77306 100644
--- a/tests/integ/test_linear_learner.py
+++ b/tests/integ/test_linear_learner.py
@@ -12,10 +12,6 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
-import os
-import pickle
-import sys
 import time
 
 import numpy as np
@@ -23,25 +19,23 @@
 from sagemaker.amazon.linear_learner import LinearLearner, LinearLearnerModel
 from sagemaker.utils import unique_name_from_base
-from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES
+from tests.integ import datasets, TRAINING_DEFAULT_TIMEOUT_MINUTES
 from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
 
 
+@pytest.fixture
+def training_set():
+    return datasets.one_p_mnist()
+
+
 @pytest.mark.canary_quick
-def test_linear_learner(sagemaker_session, cpu_instance_type):
+def test_linear_learner(sagemaker_session, cpu_instance_type, training_set):
     job_name = unique_name_from_base("linear-learner")
 
     with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
-        train_set[1][:100] = 1
-        train_set[1][100:200] = 0
-        train_set = train_set[0], train_set[1].astype(np.dtype("float32"))
+        training_set[1][:100] = 1
+        training_set[1][100:200] = 0
+        training_set = training_set[0], training_set[1].astype(np.dtype("float32"))
 
         ll = LinearLearner(
             "SageMakerRole",
@@ -85,30 +79,23 @@ def test_linear_learner(sagemaker_session, cpu_instance_type):
         ll.huber_delta = 0.1
         ll.early_stopping_tolerance = 0.0001
         ll.early_stopping_patience = 3
-        ll.fit(ll.record_set(train_set[0][:200], train_set[1][:200]), job_name=job_name)
+        ll.fit(ll.record_set(training_set[0][:200], training_set[1][:200]), job_name=job_name)
 
     with timeout_and_delete_endpoint_by_name(job_name, sagemaker_session):
         predictor = ll.deploy(1, cpu_instance_type, endpoint_name=job_name)
-        result = predictor.predict(train_set[0][0:100])
+        result = predictor.predict(training_set[0][0:100])
 
         assert len(result) == 100
         for record in result:
             assert record.label["predicted_label"] is not None
             assert record.label["score"] is not None
 
 
-def test_linear_learner_multiclass(sagemaker_session, cpu_instance_type):
+def test_linear_learner_multiclass(sagemaker_session, cpu_instance_type, training_set):
     job_name = unique_name_from_base("linear-learner")
 
     with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
-        train_set = train_set[0], train_set[1].astype(np.dtype("float32"))
+        training_set = training_set[0], training_set[1].astype(np.dtype("float32"))
 
         ll = LinearLearner(
             "SageMakerRole",
@@ -120,32 +107,25 @@ def test_linear_learner_multiclass(sagemaker_session, cpu_instance_type):
         )
 
         ll.epochs = 1
-        ll.fit(ll.record_set(train_set[0][:200], train_set[1][:200]), job_name=job_name)
+        ll.fit(ll.record_set(training_set[0][:200], training_set[1][:200]), job_name=job_name)
 
     with timeout_and_delete_endpoint_by_name(job_name, sagemaker_session):
         predictor = ll.deploy(1, cpu_instance_type, endpoint_name=job_name)
-        result = predictor.predict(train_set[0][0:100])
+        result = predictor.predict(training_set[0][0:100])
 
         assert len(result) == 100
         for record in result:
             assert record.label["predicted_label"] is not None
             assert record.label["score"] is not None
 
 
-def test_async_linear_learner(sagemaker_session, cpu_instance_type):
+def test_async_linear_learner(sagemaker_session, cpu_instance_type, training_set):
     job_name = unique_name_from_base("linear-learner")
 
     with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
-        train_set[1][:100] = 1
-        train_set[1][100:200] = 0
-        train_set = train_set[0], train_set[1].astype(np.dtype("float32"))
+        training_set[1][:100] = 1
+        training_set[1][100:200] = 0
+        training_set = training_set[0], training_set[1].astype(np.dtype("float32"))
 
         ll = LinearLearner(
             "SageMakerRole",
@@ -189,7 +169,11 @@ def test_async_linear_learner(sagemaker_session, cpu_instance_type):
         ll.huber_delta = 0.1
         ll.early_stopping_tolerance = 0.0001
         ll.early_stopping_patience = 3
-        ll.fit(ll.record_set(train_set[0][:200], train_set[1][:200]), wait=False, job_name=job_name)
+        ll.fit(
+            ll.record_set(training_set[0][:200], training_set[1][:200]),
+            wait=False,
+            job_name=job_name,
+        )
 
         print("Waiting to re-attach to the training job: %s" % job_name)
         time.sleep(20)
@@ -203,7 +187,7 @@ def test_async_linear_learner(sagemaker_session, cpu_instance_type):
         )
         predictor = model.deploy(1, cpu_instance_type, endpoint_name=job_name)
 
-        result = predictor.predict(train_set[0][0:100])
+        result = predictor.predict(training_set[0][0:100])
         assert len(result) == 100
         for record in result:
             assert record.label["predicted_label"] is not None
diff --git a/tests/integ/test_pca.py b/tests/integ/test_pca.py
index 8babd0bcc7..0787b9f2cd 100644
--- a/tests/integ/test_pca.py
+++ b/tests/integ/test_pca.py
@@ -12,29 +12,25 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
-import os
-import pickle
-import sys
 import time
 
+import pytest
+
 import sagemaker.amazon.pca
 from sagemaker.utils import unique_name_from_base
-from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES
+from tests.integ import datasets, TRAINING_DEFAULT_TIMEOUT_MINUTES
 from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
 
 
-def test_pca(sagemaker_session, cpu_instance_type):
-    job_name = unique_name_from_base("pca")
+@pytest.fixture
+def training_set():
+    return datasets.one_p_mnist()
 
-    with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
 
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
+def test_pca(sagemaker_session, cpu_instance_type, training_set):
+    job_name = unique_name_from_base("pca")
 
+    with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
         pca = sagemaker.amazon.pca.PCA(
             role="SageMakerRole",
             train_instance_count=1,
@@ -47,7 +43,7 @@ def test_pca(sagemaker_session, cpu_instance_type):
         pca.algorithm_mode = "randomized"
         pca.subtract_mean = True
         pca.extra_components = 5
-        pca.fit(pca.record_set(train_set[0][:100]), job_name=job_name)
+        pca.fit(pca.record_set(training_set[0][:100]), job_name=job_name)
 
     with timeout_and_delete_endpoint_by_name(job_name, sagemaker_session):
         pca_model = sagemaker.amazon.pca.PCAModel(
@@ -60,24 +56,17 @@ def test_pca(sagemaker_session, cpu_instance_type):
             initial_instance_count=1, instance_type=cpu_instance_type, endpoint_name=job_name
         )
 
-        result = predictor.predict(train_set[0][:5])
+        result = predictor.predict(training_set[0][:5])
 
         assert len(result) == 5
         for record in result:
             assert record.label["projection"] is not None
 
 
-def test_async_pca(sagemaker_session, cpu_instance_type):
+def test_async_pca(sagemaker_session, cpu_instance_type, training_set):
     job_name = unique_name_from_base("pca")
 
     with timeout(minutes=5):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
         pca = sagemaker.amazon.pca.PCA(
             role="SageMakerRole",
             train_instance_count=1,
@@ -90,7 +79,7 @@ def test_async_pca(sagemaker_session, cpu_instance_type):
         pca.algorithm_mode = "randomized"
         pca.subtract_mean = True
         pca.extra_components = 5
-        pca.fit(pca.record_set(train_set[0][:100]), wait=False, job_name=job_name)
+        pca.fit(pca.record_set(training_set[0][:100]), wait=False, job_name=job_name)
 
     print("Detached from training job. Will re-attach in 20 seconds")
     time.sleep(20)
@@ -107,7 +96,7 @@ def test_async_pca(sagemaker_session, cpu_instance_type):
             initial_instance_count=1, instance_type=cpu_instance_type, endpoint_name=job_name
         )
 
-        result = predictor.predict(train_set[0][:5])
+        result = predictor.predict(training_set[0][:5])
 
         assert len(result) == 5
         for record in result:
diff --git a/tests/integ/test_record_set.py b/tests/integ/test_record_set.py
index c92ba9e016..4dfe59ed6a 100644
--- a/tests/integ/test_record_set.py
+++ b/tests/integ/test_record_set.py
@@ -12,15 +12,10 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
-import os
-import pickle
-import sys
-
 from six.moves.urllib.parse import urlparse
 
 from sagemaker import KMeans
-from tests.integ import DATA_DIR
+from tests.integ import datasets
 
 
 def test_record_set(sagemaker_session, cpu_instance_type):
@@ -28,10 +23,6 @@ def test_record_set(sagemaker_session, cpu_instance_type):
     In particular, test that the objects uploaded to the S3 bucket are encrypted.
     """
-    data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-    pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-    with gzip.open(data_path, "rb") as file_object:
-        train_set, _, _ = pickle.load(file_object, **pickle_args)
     kmeans = KMeans(
         role="SageMakerRole",
         train_instance_count=1,
@@ -39,7 +30,7 @@ def test_record_set(sagemaker_session, cpu_instance_type):
         k=10,
         sagemaker_session=sagemaker_session,
     )
-    record_set = kmeans.record_set(train_set[0][:100], encrypt=True)
+    record_set = kmeans.record_set(datasets.one_p_mnist()[0][:100], encrypt=True)
     parsed_url = urlparse(record_set.s3_data)
     s3_client = sagemaker_session.boto_session.client("s3")
     head = s3_client.head_object(Bucket=parsed_url.netloc, Key=parsed_url.path.lstrip("/"))
diff --git a/tests/integ/test_transformer.py b/tests/integ/test_transformer.py
index b87a6064b1..43ccfac164 100644
--- a/tests/integ/test_transformer.py
+++ b/tests/integ/test_transformer.py
@@ -12,11 +12,8 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
 import json
 import os
-import pickle
-import sys
 import time
 
 import pytest
@@ -29,6 +26,7 @@
 from sagemaker.estimator import Estimator
 from sagemaker.utils import unique_name_from_base
 from tests.integ import (
+    datasets,
     DATA_DIR,
     PYTHON_VERSION,
     TRAINING_DEFAULT_TIMEOUT_MINUTES,
@@ -108,14 +106,6 @@ def test_transform_mxnet(
 
 @pytest.mark.canary_quick
 def test_attach_transform_kmeans(sagemaker_session, cpu_instance_type):
-    data_path = os.path.join(DATA_DIR, "one_p_mnist")
-    pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-    # Load the data into memory as numpy arrays
-    train_set_path = os.path.join(data_path, "mnist.pkl.gz")
-    with gzip.open(train_set_path, "rb") as f:
-        train_set, _, _ = pickle.load(f, **pickle_args)
-
     kmeans = KMeans(
         role="SageMakerRole",
         train_instance_count=1,
@@ -134,14 +124,14 @@ def test_attach_transform_kmeans(sagemaker_session, cpu_instance_type):
     kmeans.half_life_time_size = 1
     kmeans.epochs = 1
 
-    records = kmeans.record_set(train_set[0][:100])
+    records = kmeans.record_set(datasets.one_p_mnist()[0][:100])
 
     job_name = unique_name_from_base("test-kmeans-attach")
 
     with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
         kmeans.fit(records, job_name=job_name)
 
-    transform_input_path = os.path.join(data_path, "transform_input.csv")
+    transform_input_path = os.path.join(DATA_DIR, "one_p_mnist", "transform_input.csv")
     transform_input_key_prefix = "integ-test-data/one_p_mnist/transform"
     transform_input = kmeans.sagemaker_session.upload_data(
         path=transform_input_path, key_prefix=transform_input_key_prefix
@@ -235,15 +225,8 @@ def test_transform_mxnet_tags(
 
 
 def test_transform_byo_estimator(sagemaker_session, cpu_instance_type):
-    data_path = os.path.join(DATA_DIR, "one_p_mnist")
-    pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
     tags = [{"Key": "some-tag", "Value": "value-for-tag"}]
 
-    # Load the data into memory as numpy arrays
-    train_set_path = os.path.join(data_path, "mnist.pkl.gz")
-    with gzip.open(train_set_path, "rb") as f:
-        train_set, _, _ = pickle.load(f, **pickle_args)
-
     kmeans = KMeans(
         role="SageMakerRole",
         train_instance_count=1,
@@ -262,7 +245,7 @@ def test_transform_byo_estimator(sagemaker_session, cpu_instance_type):
     kmeans.half_life_time_size = 1
     kmeans.epochs = 1
 
-    records = kmeans.record_set(train_set[0][:100])
+    records = kmeans.record_set(datasets.one_p_mnist()[0][:100])
 
     job_name = unique_name_from_base("test-kmeans-attach")
 
@@ -272,7 +255,7 @@ def test_transform_byo_estimator(sagemaker_session, cpu_instance_type):
     estimator = Estimator.attach(training_job_name=job_name, sagemaker_session=sagemaker_session)
     estimator._enable_network_isolation = True
 
-    transform_input_path = os.path.join(data_path, "transform_input.csv")
+    transform_input_path = os.path.join(DATA_DIR, "one_p_mnist", "transform_input.csv")
     transform_input_key_prefix = "integ-test-data/one_p_mnist/transform"
     transform_input = kmeans.sagemaker_session.upload_data(
         path=transform_input_path, key_prefix=transform_input_key_prefix
diff --git a/tests/integ/test_tuner.py b/tests/integ/test_tuner.py
index 38cef9c8df..b196baa952 100644
--- a/tests/integ/test_tuner.py
+++ b/tests/integ/test_tuner.py
@@ -12,22 +12,15 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
 import json
 import os
-import pickle
-import sys
 import time
 
 import numpy as np
 import pytest
-import tests.integ
 from botocore.exceptions import ClientError
 
-from tests.integ import DATA_DIR, PYTHON_VERSION, TUNING_DEFAULT_TIMEOUT_MINUTES
-from tests.integ.record_set import prepare_record_set_from_local_files
-from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
-from tests.integ import vpc_test_utils
+import tests.integ
 from sagemaker import KMeans, LDA, RandomCutForest
 from sagemaker.amazon.amazon_estimator import get_image_uri
 from sagemaker.amazon.common import read_records
@@ -48,19 +41,22 @@
     create_identical_dataset_and_algorithm_tuner,
 )
 from sagemaker.utils import unique_name_from_base
+from tests.integ import (
+    datasets,
+    vpc_test_utils,
+    DATA_DIR,
+    PYTHON_VERSION,
+    TUNING_DEFAULT_TIMEOUT_MINUTES,
+)
+from tests.integ.record_set import prepare_record_set_from_local_files
+from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
 
 DATA_PATH = os.path.join(DATA_DIR, "iris", "data")
 
 
 @pytest.fixture(scope="module")
 def kmeans_train_set(sagemaker_session):
-    data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-    pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-    # Load the data into memory as numpy arrays
-    with gzip.open(data_path, "rb") as f:
-        train_set, _, _ = pickle.load(f, **pickle_args)
-
-    return train_set
+    return datasets.one_p_mnist()
 
 
 @pytest.fixture(scope="module")
@@ -383,8 +379,10 @@ def test_create_transfer_learning_tuner(
 def test_tuning_kmeans_identical_dataset_algorithm_tuner_from_non_terminal_parent(
     sagemaker_session, kmeans_train_set, kmeans_estimator, hyperparameter_ranges
 ):
-    """Tests Identical dataset and algorithm use case with one non terminal parent and child job launched with
-    .identical_dataset_and_algorithm_tuner() """
+    """Tests Identical dataset and algorithm use case with
+    one non terminal parent and child job launched with
+    .identical_dataset_and_algorithm_tuner()
+    """
     parent_tuning_job_name = unique_name_from_base("km-non-term", max_length=32)
     child_tuning_job_name = unique_name_from_base("km-non-term-child", max_length=32)
 
@@ -848,12 +846,6 @@ def test_tuning_byo_estimator(sagemaker_session, cpu_instance_type):
     training_data_path = os.path.join(DATA_DIR, "dummy_tensor")
 
     with timeout(minutes=TUNING_DEFAULT_TIMEOUT_MINUTES):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-        pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, **pickle_args)
-
         prefix = "test_byo_estimator"
         key = "recordio-pb-data"
         s3_train_data = sagemaker_session.upload_data(
@@ -900,7 +892,7 @@ def test_tuning_byo_estimator(sagemaker_session, cpu_instance_type):
         predictor.content_type = "application/json"
         predictor.deserializer = json_deserializer
 
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(datasets.one_p_mnist()[0][:10])
 
         assert len(result["predictions"]) == 10
         for prediction in result["predictions"]:
diff --git a/tests/integ/test_tuner_multi_algo.py b/tests/integ/test_tuner_multi_algo.py
index 3f8a00a34a..ad933075e1 100644
--- a/tests/integ/test_tuner_multi_algo.py
+++ b/tests/integ/test_tuner_multi_algo.py
@@ -12,13 +12,11 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
 import json
 import os
-import pickle
-import sys
 
 import pytest
+
 from sagemaker import utils
 from sagemaker.amazon.amazon_estimator import get_image_uri
 from sagemaker.analytics import HyperparameterTuningJobAnalytics
@@ -26,14 +24,11 @@
 from sagemaker.estimator import Estimator
 from sagemaker.predictor import json_deserializer
 from sagemaker.tuner import ContinuousParameter, IntegerParameter, HyperparameterTuner
-
-from tests.integ import DATA_DIR, TUNING_DEFAULT_TIMEOUT_MINUTES
+from tests.integ import datasets, DATA_DIR, TUNING_DEFAULT_TIMEOUT_MINUTES
 from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
 
 BASE_TUNING_JOB_NAME = "multi-algo-pysdk"
 
-DATA_PATH = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-
 EXECUTION_ROLE = "SageMakerRole"
 
 STRATEGY = "Bayesian"
@@ -63,10 +58,7 @@
 
 @pytest.fixture(scope="module")
 def data_set():
-    pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
-    with gzip.open(DATA_PATH, "rb") as f:
-        data_set, _, _ = pickle.load(f, **pickle_args)
-    return data_set
+    return datasets.one_p_mnist()
 
 
 @pytest.fixture(scope="function")