diff --git a/.env.example b/.env.example index 127c3523..c8333fe7 100644 --- a/.env.example +++ b/.env.example @@ -14,6 +14,20 @@ SUBSCRIPTION_ID = '' LOCATION = '' BASE_NAME = '' RESOURCE_GROUP = '' + +# Observability related +APPLICATIONINSIGHTS_CONNECTION_STRING = '' +LOG_TO_CONSOLE = 'false' +# DEBUG, INFO, WARNING, ERROR, CRITICAL +LOG_LEVEL = 'DEBUG' +# Probability 0.0 -> 1.0 +LOG_SAMPLING_RATE = '1.0' +# Probability 0.0 -> 1.0 +TRACE_SAMPLING_RATE = '1.0' +# Seconds +METRICS_EXPORT_INTERVAL = '15' + +# Azure ML Workspace Variables WORKSPACE_NAME = '' ACI_DEPLOYMENT_NAME = '' @@ -21,7 +35,7 @@ ACI_DEPLOYMENT_NAME = '' # Variables that are defined in variables-template.yml # they determine _how_ the project runs #################################################### -SOURCES_DIR_TRAIN = 'ml_model' +SOURCES_DIR_TRAIN = '.' EXPERIMENT_NAME = 'flower_classification' DATASET_NAME = 'flower_dataset' # Optional. Set it if you have configured non default datastore to point to your data diff --git a/.pipelines/02-processing-data.yml b/.pipelines/02-processing-data.yml index 1b74c56b..1d1ea91a 100644 --- a/.pipelines/02-processing-data.yml +++ b/.pipelines/02-processing-data.yml @@ -51,6 +51,8 @@ stages: # Invoke the Python building and publishing a data preprocessing pipeline python -m ml_service.pipelines.build_data_processing_pipeline displayName: 'Publish Data Preprocessing Pipeline' + env: + APPLICATIONINSIGHTS_CONNECTION_STRING: $(APPLICATIONINSIGHTS_CONNECTION_STRING) # Trigger_Preprocessing_Pipeline - template: trigger-preprocessing-pipeline.yml diff --git a/.pipelines/03-train-evaluate-register-model.yml b/.pipelines/03-train-evaluate-register-model.yml index daa85317..148751bc 100644 --- a/.pipelines/03-train-evaluate-register-model.yml +++ b/.pipelines/03-train-evaluate-register-model.yml @@ -62,6 +62,8 @@ stages: # Invoke the Python building and publishing a training pipeline python -m ml_service.pipelines.build_training_pipeline displayName: 'Publish Azure Machine Learning Pipeline' + env: + APPLICATIONINSIGHTS_CONNECTION_STRING: $(APPLICATIONINSIGHTS_CONNECTION_STRING) - stage: 'Trigger_Training_Pipeline' displayName: 'Train and evaluate model' @@ -75,6 +77,7 @@ stages: container: mlops timeoutInMinutes: 0 steps: + - template: update-ci-dependencies.yml - task: AzureCLI@1 inputs: azureSubscription: '$(WORKSPACE_SVC_CONNECTION)' @@ -89,6 +92,9 @@ stages: echo "##vso[task.setvariable variable=AMLPIPELINEID;isOutput=true]$AMLPIPELINEID" name: 'getpipelineid' displayName: 'Get Pipeline ID' + env: + APPLICATIONINSIGHTS_CONNECTION_STRING: $(APPLICATIONINSIGHTS_CONNECTION_STRING) + - job: "Run_ML_Pipeline" dependsOn: "Get_Pipeline_ID" displayName: "Trigger ML Training Pipeline" diff --git a/.pipelines/04-deploy-model-aci.yml b/.pipelines/04-deploy-model-aci.yml index acbc1e5e..3c2aff6d 100644 --- a/.pipelines/04-deploy-model-aci.yml +++ b/.pipelines/04-deploy-model-aci.yml @@ -67,3 +67,5 @@ stages: set -e # fail on error export SUBSCRIPTION_ID=$(az account show --query id -o tsv) python -m ml_service.util.smoke_test_scoring_service --service "$(ACI_DEPLOYMENT_NAME)" + env: + APPLICATIONINSIGHTS_CONNECTION_STRING: $(APPLICATIONINSIGHTS_CONNECTION_STRING) diff --git a/.pipelines/07-processing-data-os-cmd.yml b/.pipelines/07-processing-data-os-cmd.yml index c7da1ded..50c85ea7 100644 --- a/.pipelines/07-processing-data-os-cmd.yml +++ b/.pipelines/07-processing-data-os-cmd.yml @@ -51,6 +51,8 @@ stages: # Invoke the Python building and publishing a data preprocessing pipeline python -m 
ml_service.pipelines.build_data_processing_os_cmd_pipeline displayName: 'Publish Data Preprocessing OS cmd Pipeline' + env: + APPLICATIONINSIGHTS_CONNECTION_STRING: $(APPLICATIONINSIGHTS_CONNECTION_STRING) # Trigger_Preprocessing_Pipeline - template: trigger-preprocessing-pipeline.yml diff --git a/.pipelines/code-quality-template.yml b/.pipelines/code-quality-template.yml index 7e769658..c83de34b 100644 --- a/.pipelines/code-quality-template.yml +++ b/.pipelines/code-quality-template.yml @@ -1,5 +1,7 @@ # Pipeline template to run linting, unit tests with code coverage, and publish the results. steps: +- template: update-ci-dependencies.yml + - script: | flake8 --output-file=lint-testresults.xml --format junit-xml displayName: 'Run lint tests' diff --git a/.pipelines/trigger-preprocessing-pipeline.yml b/.pipelines/trigger-preprocessing-pipeline.yml index a7c8e80b..7ff5698c 100644 --- a/.pipelines/trigger-preprocessing-pipeline.yml +++ b/.pipelines/trigger-preprocessing-pipeline.yml @@ -16,6 +16,7 @@ stages: container: mlops timeoutInMinutes: 0 steps: + - template: update-ci-dependencies.yml - task: AzureCLI@1 inputs: azureSubscription: '$(WORKSPACE_SVC_CONNECTION)' @@ -30,6 +31,9 @@ stages: echo "##vso[task.setvariable variable=PREPROCESSPIPELINEID;isOutput=true]$PREPROCESSPIPELINEID" name: 'getpreprocessingpipelineid' displayName: 'Get Preprocessing Pipeline ID of ${{ parameters.aml_pipeline_name }}' + env: + APPLICATIONINSIGHTS_CONNECTION_STRING: $(APPLICATIONINSIGHTS_CONNECTION_STRING) + - job: "Run_Data_Processing_Pipeline" dependsOn: "Get_Preprocessing_Pipeline_ID" displayName: "Trigger Preprocessing Pipeline ${{ parameters.aml_pipeline_name }}" diff --git a/.pipelines/update-ci-dependencies.yml b/.pipelines/update-ci-dependencies.yml new file mode 100644 index 00000000..ff790ed6 --- /dev/null +++ b/.pipelines/update-ci-dependencies.yml @@ -0,0 +1,5 @@ +steps: +# This step ensures that the latest ci dependencies are applied to the build agent +- script: | + conda env update -f ml_model/ci_dependencies.yml -n ci + displayName: 'Update missing dependencies for current branch on build agent' \ No newline at end of file diff --git a/.pipelines/variables-template.yml b/.pipelines/variables-template.yml index 3e0c2241..d87679b2 100644 --- a/.pipelines/variables-template.yml +++ b/.pipelines/variables-template.yml @@ -4,7 +4,7 @@ variables: # The directory containing the scripts for training, evaluating, and registering the model - name: SOURCES_DIR_TRAIN - value: ml_model + value: '.' # Azure ML Variables - name: EXPERIMENT_NAME @@ -32,8 +32,8 @@ variables: - name: ALLOW_RUN_CANCEL value: "false" # Flag to allow rebuilding the AML Environment after it was built for the first time. This enables dependency updates from conda_dependencies.yaml. 
- # - name: AML_REBUILD_ENVIRONMENT - # value: "false" + - name: AML_REBUILD_ENVIRONMENT + value: "true" # AML Environment Config - name: AML_ENV_NAME @@ -42,6 +42,8 @@ variables: value: flower_custom_preprocess_env # AML Compute Cluster Config + - name: AML_ENV_TRAIN_CONDA_DEP_FILE + value: "ml_model/conda_dependencies.yml" - name: AML_COMPUTE_CLUSTER_CPU_SKU value: STANDARD_DS2_V2 - name: AML_COMPUTE_CLUSTER_NAME @@ -52,6 +54,24 @@ variables: value: 0 - name: AML_CLUSTER_MAX_NODES value: 4 + - name: AML_CLUSTER_PRIORITY + value: lowpriority + + # Observability related + - name: LOG_TO_CONSOLE + value: 'false' + - name: LOG_LEVEL + value: 'INFO' # DEBUG, INFO, WARNING, ERROR, CRITICAL + - name: LOG_SAMPLING_RATE + value: '1.0' # Probability 0.0 -> 1.0 + - name: TRACE_SAMPLING_RATE + value: '1.0' # Probability 0.0 -> 1.0 + - name: METRICS_EXPORT_INTERVAL + value: '15' # Seconds + + # The name for the (docker/webapp) scoring image + - name: IMAGE_NAME + value: "flowerclassifier" # AML pipelines can run outside of Azure DevOps, these parameters control AML pipeline behaviors - name: PREPROCESSING_PARAM diff --git a/ml_model/ci_dependencies.yml b/ml_model/ci_dependencies.yml index 987a0f34..38699966 100644 --- a/ml_model/ci_dependencies.yml +++ b/ml_model/ci_dependencies.yml @@ -26,4 +26,11 @@ dependencies: - tensorflow==2.3.* - keras==2.4.* + # Observability + - dataclasses==0.6 + - opencensus==0.7.11 + - opencensus-ext-httplib==0.7.3 + - opencensus-ext-logging==0.1.0 + - opencensus-context==0.1.2 + - opencensus-ext-azure==1.0.5 diff --git a/ml_model/conda_dependencies.yml b/ml_model/conda_dependencies.yml index dd6a1eb0..0c954d8b 100644 --- a/ml_model/conda_dependencies.yml +++ b/ml_model/conda_dependencies.yml @@ -23,3 +23,12 @@ dependencies: # Training deps - tensorflow==2.3.* - keras==2.4.* + + # Observability + - python-dotenv==0.12.* + - dataclasses==0.6 + - opencensus==0.7.11 + - opencensus-ext-httplib==0.7.3 + - opencensus-ext-logging==0.1.0 + - opencensus-context==0.1.2 + - opencensus-ext-azure==1.0.5 diff --git a/ml_model/dev_dependencies.yml b/ml_model/dev_dependencies.yml index 43f42db9..a8e4cac6 100644 --- a/ml_model/dev_dependencies.yml +++ b/ml_model/dev_dependencies.yml @@ -29,4 +29,10 @@ dependencies: - keras==2.4.* - debugpy - + # Observability + - dataclasses==0.6 + - opencensus==0.7.11 + - opencensus-ext-httplib==0.7.3 + - opencensus-ext-logging==0.1.0 + - opencensus-context==0.1.2 + - opencensus-ext-azure==1.0.5 \ No newline at end of file diff --git a/ml_model/evaluate/evaluate_model.py b/ml_model/evaluate/evaluate_model.py index 2776b20a..03884e69 100644 --- a/ml_model/evaluate/evaluate_model.py +++ b/ml_model/evaluate/evaluate_model.py @@ -25,7 +25,8 @@ """ from azureml.core import Run import argparse -from util.model_helper import get_model +from ml_model.util.model_helper import get_model +from ml_service.util.logger.observability import observability def evaluate_model_performs_better(model, run): @@ -37,13 +38,14 @@ def evaluate_model_performs_better(model, run): if (production_model_accuracy is None or new_model_accuracy is None): raise Exception(f"Unable to find {metric_eval} metrics, exiting evaluation") # NOQA: E501 else: - print(f"Current model accuracy: {production_model_accuracy}, new model accuracy: {new_model_accuracy}") # NOQA: E501 + observability.log(f"Current model accuracy: {production_model_accuracy}, new model accuracy: {new_model_accuracy}") # NOQA: E501 if (new_model_accuracy > production_model_accuracy): - print("New model performs better, 
register it") + observability.log("New model performs better, register it") return True else: - print("New model doesn't perform better, skip registration") + observability.log("New model doesn't perform better," + " skip registration") return False @@ -91,8 +93,15 @@ def main(): if(not should_register and (allow_run_cancel).lower() == 'true'): run.parent.cancel() else: - print("This is the first model, register it") + observability.log("This is the first model, register it") if __name__ == '__main__': - main() + observability.start_span('evaluate_model') + try: + main() + except Exception as exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_model/preprocessing/Dockerfile b/ml_model/preprocessing/Dockerfile index e28cb0d7..4ea98a1c 100644 --- a/ml_model/preprocessing/Dockerfile +++ b/ml_model/preprocessing/Dockerfile @@ -25,8 +25,18 @@ RUN wget --quiet https://repo.anaconda.com/miniconda/Miniconda3-${CONDA_VERSION} ~/miniconda/bin/conda clean -tipsy ENV PATH="/home/dockeruser/miniconda/bin/:${PATH}" +USER root + +RUN apt-get update --fix-missing && \ + apt-get install -y build-essential + +# Create conda environment for dockeruser user +USER dockeruser + RUN conda install -y conda=${CONDA_VERSION} python=${PYTHON_VERSION} && \ pip install azureml-defaults==${AZUREML_SDK_VERSION} inference-schema==${INFERENCE_SCHEMA_VERSION} &&\ + pip install python-dotenv==0.12.* dataclasses==0.6 opencensus==0.7.11 opencensus-ext-httplib==0.7.3 \ + opencensus-ext-azure==1.0.5 opencensus-ext-logging==0.1.0 opencensus-context==0.1.2 && \ conda clean -aqy && \ rm -rf ~/miniconda/pkgs && \ find ~/miniconda/ -type d -name __pycache__ -prune -exec rm -rf {} \; diff --git a/ml_model/preprocessing/preprocess_aml.py b/ml_model/preprocessing/preprocess_aml.py index 45ec3bf6..909631b2 100644 --- a/ml_model/preprocessing/preprocess_aml.py +++ b/ml_model/preprocessing/preprocess_aml.py @@ -26,12 +26,13 @@ from azureml.core.run import Run import argparse import json -from preprocess_images import resize_images -from util.model_helper import get_or_register_dataset, get_aml_context +from ml_model.preprocessing.preprocess_images import resize_images +from ml_model.util.model_helper import get_or_register_dataset, get_aml_context +from ml_service.util.logger.observability import observability def main(): - print("Running preprocess.py") + observability.log("Running preprocess.py") parser = argparse.ArgumentParser("preprocess") parser.add_argument( @@ -69,11 +70,12 @@ def main(): args = parser.parse_args() - print("Argument [dataset_name]: %s" % args.dataset_name) - print("Argument [datastore_name]: %s" % args.datastore_name) - print("Argument [data_file_path]: %s" % args.data_file_path) - print("Argument [output_dataset]: %s" % args.output_dataset) - print("Argument [preprocessing_param]: %s" % args.preprocessing_param) + observability.log("Argument [dataset_name]: %s" % args.dataset_name) + observability.log("Argument [datastore_name]: %s" % args.datastore_name) + observability.log("Argument [data_file_path]: %s" % args.data_file_path) + observability.log("Argument [output_dataset]: %s" % args.output_dataset) + observability.log("Argument [preprocessing_param]: %s" + % args.preprocessing_param) data_file_path = args.data_file_path dataset_name = args.dataset_name @@ -85,12 +87,12 @@ def main(): aml_workspace, *_ = get_aml_context(run) if preprocessing_param is None or preprocessing_param == "": - with open("parameters.json") as f: + with 
open("ml_model/parameters.json") as f: pars = json.load(f) preprocessing_args = pars["preprocessing"] else: preprocessing_args = json.loads(preprocessing_param) - print(f"preprocessing parameters {preprocessing_args}") + observability.log(f"preprocessing parameters {preprocessing_args}") for (k, v) in preprocessing_args.items(): run.log(k, v) run.parent.log(k, v) @@ -107,15 +109,22 @@ def main(): # Process data mount_context = dataset.mount() mount_context.start() - print(f"mount_point is: {mount_context.mount_point}") + observability.log(f"mount_point is: {mount_context.mount_point}") resize_images(mount_context.mount_point, output_dataset, preprocessing_args) # NOQA: E501 mount_context.stop() run.tag("run_type", value="preprocess") - print(f"tags now present for run: {run.tags}") + observability.log(f"tags now present for run: {run.tags}") run.complete() if __name__ == '__main__': - main() + observability.start_span('preprocess_aml') + try: + main() + except Exception as exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_model/preprocessing/preprocess_images.py b/ml_model/preprocessing/preprocess_images.py index 43608fd0..87799153 100644 --- a/ml_model/preprocessing/preprocess_images.py +++ b/ml_model/preprocessing/preprocess_images.py @@ -2,6 +2,7 @@ import shutil import numpy as np from PIL import Image +from ml_service.util.logger.observability import observability def resize_image(img, size): @@ -33,15 +34,15 @@ def resize_image(img, size): def resize_images(indir, outdir, preprocessing_args): size = (preprocessing_args['image_size']['x'], preprocessing_args['image_size']['y']) - print(f"indir: {indir}") - print(f"outdir: {outdir}") + observability.log(f"indir: {indir}") + observability.log(f"outdir: {outdir}") if (os.path.exists(indir)): - print("indir exists") + observability.log("indir exists") else: - print("indir doesn't exit") + observability.log("indir doesn't exit") if os.path.exists(outdir): - print("outdir exists, delete all files") + observability.log("outdir exists, delete all files") for filename in os.listdir(outdir): file_path = os.path.join(outdir, filename) if os.path.isfile(file_path): @@ -52,7 +53,7 @@ def resize_images(indir, outdir, preprocessing_args): # Loop through each subfolder in the input dir for root, dirs, filenames in os.walk(indir): for d in dirs: - print('processing folder ' + d) + observability.log('processing folder ' + d) # Create a matching subfolder in the output dir saveFolder = os.path.join(outdir, d) if not os.path.exists(saveFolder): @@ -62,12 +63,12 @@ def resize_images(indir, outdir, preprocessing_args): for f in files: # Open the file imgFile = os.path.join(root, d, f) - print("reading " + imgFile) + observability.log("reading " + imgFile) img = Image.open(imgFile) # Create a resized version and save it proc_img = resize_image(img, size) saveAs = os.path.join(saveFolder, f) - print("writing " + saveAs) + observability.log("writing " + saveAs) proc_img.save(saveAs) @@ -82,4 +83,11 @@ def main(): if __name__ == '__main__': - main() + observability.start_span('preprocess_images') + try: + main() + except Exception as exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_model/preprocessing/preprocess_os_cmd.py b/ml_model/preprocessing/preprocess_os_cmd.py index 328eabf9..f09daba4 100644 --- a/ml_model/preprocessing/preprocess_os_cmd.py +++ b/ml_model/preprocessing/preprocess_os_cmd.py @@ -4,11 +4,12 @@ from 
azureml.core.run import Run import argparse import subprocess -from util.model_helper import get_or_register_dataset, get_aml_context +from ml_model.util.model_helper import get_or_register_dataset, get_aml_context +from ml_service.util.logger.observability import observability def main(): - print("Running preprocess_os_cmd.py") + observability.log("Running preprocess_os_cmd.py") parser = argparse.ArgumentParser("preprocess_os_cmd") parser.add_argument( @@ -40,10 +41,10 @@ def main(): args = parser.parse_args() - print("Argument [dataset_name]: %s" % args.dataset_name) - print("Argument [datastore_name]: %s" % args.datastore_name) - print("Argument [data_file_path]: %s" % args.data_file_path) - print("Argument [output_dataset]: %s" % args.output_dataset) + observability.log("Argument [dataset_name]: %s" % args.dataset_name) + observability.log("Argument [datastore_name]: %s" % args.datastore_name) + observability.log("Argument [data_file_path]: %s" % args.data_file_path) + observability.log("Argument [output_dataset]: %s" % args.output_dataset) data_file_path = args.data_file_path dataset_name = args.dataset_name @@ -68,7 +69,7 @@ def main(): # Process data mount_context = dataset.mount() mount_context.start() - print(f"mount_point is: {mount_context.mount_point}") + observability.log(f"mount_point is: {mount_context.mount_point}") #### # Execute something here just 'cp' from input to output folder @@ -85,23 +86,30 @@ def main(): # Check output while True: output = process.stdout.readline() - print(output.strip()) + observability.log(output.strip()) # Do something else return_code = process.poll() if return_code is not None: - print('RETURN CODE', return_code) + observability.log(f'RETURN CODE {return_code}') # Process has finished, read rest of the output for output in process.stdout.readlines(): - print(output.strip()) + observability.log(output.strip()) break mount_context.stop() run.tag("run_type", value="preprocess_os_cmd") - print(f"tags now present for run: {run.tags}") + observability.log(f"tags now present for run: {run.tags}") run.complete() if __name__ == '__main__': - main() + observability.start_span('preprocess_os_cmd') + try: + main() + except Exception as exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_model/register/register_model.py b/ml_model/register/register_model.py index 3d550441..7cfe23bb 100644 --- a/ml_model/register/register_model.py +++ b/ml_model/register/register_model.py @@ -30,7 +30,8 @@ import traceback from azureml.core import Run from azureml.core.model import Model as AMLModel -from util.model_helper import get_aml_context +from ml_model.util.model_helper import get_aml_context +from ml_service.util.logger.observability import observability def find_child_run(parent_run, child_run_id): @@ -54,6 +55,7 @@ def find_run(experiment, run_id): def main(): + run = Run.get_context() ws, exp, run_id = get_aml_context(run) @@ -84,19 +86,19 @@ def main(): if (run_id == 'amlcompute'): run_id = run.parent.id run = run.parent - print(f"parent run_id is {run_id}") + observability.log(f"parent run_id is {run_id}") model_name = args.model_name model_path = args.step_input - print("Getting registration parameters") + observability.log("Getting registration parameters") # Load the registration parameters from the parameters file - with open("parameters.json") as f: + with open("ml_model/parameters.json") as f: pars = json.load(f) try: register_args = pars["registration"] except KeyError: - print("Could 
not load registration values from file") + observability.log("Could not load registration values from file") register_args = {"tags": []} model_tags = {} @@ -105,23 +107,23 @@ def main(): mtag = run.get_metrics()[tag] model_tags[tag] = mtag except KeyError: - print(f"Could not find {tag} metric on parent run.") + observability.log(f"Could not find {tag} metric on parent run.") parent_tags = run.get_tags() try: build_id = parent_tags["BuildId"] except KeyError: build_id = None - print("BuildId tag not found on parent run.") - print(f"Tags present: {parent_tags}") + observability.log("BuildId tag not found on parent run.") + observability.log(f"Tags present: {parent_tags}") try: build_uri = parent_tags["BuildUri"] except KeyError: build_uri = None - print("BuildUri tag not found on parent run.") - print(f"Tags present: {parent_tags}") + observability.log("BuildUri tag not found on parent run.") + observability.log(f"Tags present: {parent_tags}") - print(f"Loading training run_id from {model_path}") + observability.log(f"Loading training run_id from {model_path}") run_id_file = os.path.join(model_path, "run_id.txt") with open(run_id_file, "r") as text_file: training_run_id = text_file.read().replace('\n', '') @@ -152,7 +154,8 @@ def main(): build_id, build_uri) else: - print("Training run not found. Skipping model registration.") + observability.log("Training run not found." + "Skipping model registration.") sys.exit(0) @@ -161,7 +164,7 @@ def model_already_registered(model_name, exp, run_id): if len(model_list) >= 1: raise Exception(f"Model name: {model_name} in workspace {exp.workspace} with run_id {run_id} is already registered.") # NOQA: E501 else: - print("Model is not registered for this run.") + observability.log("Model is not registered for this run.") def register_aml_model( @@ -185,7 +188,7 @@ def register_aml_model( model_name=model_name, model_path=os.path.join("outputs", model_name), tags=tagsValue) - print( + observability.log( "Model registered: {} \nModel Description: {} " "\nModel Version: {}".format( model.name, model.description, model.version @@ -193,9 +196,16 @@ def register_aml_model( ) except Exception: traceback.print_exc(limit=None, file=None, chain=True) - print("Model registration failed") + observability.log("Model registration failed") raise if __name__ == '__main__': - main() + observability.start_span('register_model') + try: + main() + except Exception as exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_model/scoring/score.py b/ml_model/scoring/score.py index a8904714..d8b916cc 100644 --- a/ml_model/scoring/score.py +++ b/ml_model/scoring/score.py @@ -93,7 +93,7 @@ def predict_image(classifier, image_array): if len(image_urls) != len(image_classes): raise "number of urls is not same as number of classes" - with open("parameters.json") as f: + with open("ml_model/parameters.json") as f: pars = json.load(f) image_size = pars["preprocessing"]["image_size"] size = (image_size["x"], image_size["y"]) diff --git a/ml_model/training/train.py b/ml_model/training/train.py index ab4bb3a1..8172c139 100644 --- a/ml_model/training/train.py +++ b/ml_model/training/train.py @@ -29,6 +29,7 @@ from keras.layers import Dropout, Flatten, Dense from keras import optimizers from keras.preprocessing.image import ImageDataGenerator +from ml_service.util.logger.observability import observability # Split the dataframe into test and train data @@ -38,12 +39,12 @@ def split_data(data_folder, preprocessing_args): 
preprocessing_args['image_size']['y']) batch_size = preprocessing_args['batch_size'] - print("Getting Data...") + observability.log("Getting Data...") datagen = ImageDataGenerator( rescale=1./255, # normalize pixel values validation_split=0.3) # hold back 30% of the images for validation - print("Preparing training dataset...") + observability.log("Preparing training dataset...") train_generator = datagen.flow_from_directory( data_folder, target_size=img_size, @@ -51,7 +52,7 @@ def split_data(data_folder, preprocessing_args): class_mode='categorical', subset='training') # set as training data - print("Preparing validation dataset...") + observability.log("Preparing validation dataset...") validation_generator = datagen.flow_from_directory( data_folder, target_size=img_size, @@ -60,7 +61,7 @@ def split_data(data_folder, preprocessing_args): subset='validation') # set as validation data classes = sorted(train_generator.class_indices.keys()) - print("class names: ", classes) + observability.log(f"class names: {classes}") data = {"train": train_generator, "test": validation_generator, @@ -138,7 +139,7 @@ def get_model_metrics(history): def main(): - print("Running train.py") + observability.log("Running train.py") train_args = {"num_epochs": 10} preprocessing_args = { @@ -151,8 +152,15 @@ def main(): metrics = get_model_metrics(history) for (k, v) in metrics.items(): - print(f"{k}: {v}") + observability.log(f"{k}: {v}") if __name__ == '__main__': - main() + observability.start_span('train') + try: + main() + except Exception as exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_model/training/train_aml.py b/ml_model/training/train_aml.py index 46f3bbac..b9b32283 100644 --- a/ml_model/training/train_aml.py +++ b/ml_model/training/train_aml.py @@ -27,12 +27,13 @@ import os import argparse import json -from train import split_data, train_model, get_model_metrics -from util.model_helper import get_or_register_dataset +from ml_model.training.train import split_data, train_model, get_model_metrics +from ml_model.util.model_helper import get_or_register_dataset +from ml_service.util.logger.observability import observability def main(): - print("Running train_aml.py") + observability.log("Running train_aml.py") parser = argparse.ArgumentParser("train") parser.add_argument( @@ -69,11 +70,11 @@ def main(): ) args = parser.parse_args() - print("Argument [model_name]: %s" % args.model_name) - print("Argument [step_output]: %s" % args.step_output) - print("Argument [data_file_path]: %s" % args.data_file_path) - print("Argument [dataset_name]: %s" % args.dataset_name) - print("Argument [datastore_name]: %s" % args.datastore_name) + observability.log("Argument [model_name]: %s" % args.model_name) + observability.log("Argument [step_output]: %s" % args.step_output) + observability.log("Argument [data_file_path]: %s" % args.data_file_path) + observability.log("Argument [dataset_name]: %s" % args.dataset_name) + observability.log("Argument [datastore_name]: %s" % args.datastore_name) model_name = args.model_name step_output_path = args.step_output @@ -83,25 +84,26 @@ def main(): run = Run.get_context() - print("Getting training parameters") + observability.log("Getting training parameters") # Load the training parameters from the parameters file - with open("parameters.json") as f: + with open("ml_model/parameters.json") as f: pars = json.load(f) try: preprocessing_args = pars["preprocessing"] train_args = pars["training"] except KeyError: - 
print("Could not load preprocessing or training values from file") + observability.log("Could not load preprocessing or training values " + "from file") train_args = {} preprocessing_args = {} # Log the training parameters - print(f"Parameters: {preprocessing_args}") + observability.log(f"Parameters: {preprocessing_args}") for (k, v) in preprocessing_args.items(): run.log(k, v) run.parent.log(k, v) - print(f"Parameters: {train_args}") + observability.log(f"Parameters: {train_args}") for (k, v) in train_args.items(): run.log(k, v) run.parent.log(k, v) @@ -122,7 +124,7 @@ def main(): # mount the dynamic version of the dataset, which can't be determined at pipeline publish time # NOQA: E501 mount_context = dataset.mount() mount_context.start() - print(f"mount_point is: {mount_context.mount_point}") + observability.log(f"mount_point is: {mount_context.mount_point}") data = split_data(mount_context.mount_point, preprocessing_args) model, history = train_model(data, train_args, preprocessing_args) mount_context.stop() @@ -146,10 +148,17 @@ def main(): model.save(output_path) run.tag("run_type", value="train") - print(f"tags now present for run: {run.tags}") + observability.log(f"tags now present for run: {run.tags}") run.complete() if __name__ == '__main__': - main() + observability.start_span('train_aml') + try: + main() + except Exception as exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_model/util/model_helper.py b/ml_model/util/model_helper.py index 95154ffd..57bd15e8 100644 --- a/ml_model/util/model_helper.py +++ b/ml_model/util/model_helper.py @@ -1,6 +1,7 @@ """ model_helper.py """ +from ml_service.util.logger.observability import observability from azureml.core import Run from azureml.core import Workspace, Dataset, Datastore from azureml.core.model import Model as AMLModel @@ -53,7 +54,8 @@ def get_model( None. 
""" if aml_workspace is None: - print("No workspace defined - using current experiment workspace.") + observability.log("No workspace defined - " + "using current experiment workspace.") aml_workspace, *_ = get_aml_context(Run.get_context(allow_offline=False)) # NOQA: E501 tags = None @@ -116,14 +118,15 @@ def get_or_register_dataset( raise Exception("Datset name can't be null") if aml_workspace is None: - print("No workspace defined - using current experiment workspace.") + observability.log("No workspace defined - " + "using current experiment workspace.") aml_workspace, *_ = get_aml_context(Run.get_context()) if data_file_path == "nopath": - print(f"get latest version of dataset: {dataset_name}") + observability.log(f"get latest version of dataset: {dataset_name}") dataset = Dataset.get_by_name(aml_workspace, dataset_name) else: - print(f"register a new dataset or new version: {dataset_name}, {datastore_name}, {data_file_path}") # NOQA: E501 + observability.log(f"register a new dataset or new version: {dataset_name}, {datastore_name}, {data_file_path}") # NOQA: E501 dataset = register_dataset( aml_workspace, dataset_name, diff --git a/ml_service/pipelines/build_data_processing_os_cmd_pipeline.py b/ml_service/pipelines/build_data_processing_os_cmd_pipeline.py index b3e44900..60310d96 100644 --- a/ml_service/pipelines/build_data_processing_os_cmd_pipeline.py +++ b/ml_service/pipelines/build_data_processing_os_cmd_pipeline.py @@ -7,6 +7,7 @@ from ml_service.util.attach_compute import get_compute from ml_service.util.env_variables import Env from ml_service.util.manage_environment import get_environment +from ml_service.util.logger.observability import observability def main(): @@ -17,12 +18,12 @@ def main(): subscription_id=e.subscription_id, resource_group=e.resource_group, ) - print(f"get_workspace:{aml_workspace}") + observability.log(f"get_workspace:{aml_workspace}") # Get Azure machine learning cluster aml_compute = get_compute(aml_workspace, e.compute_name, e.vm_size) if aml_compute is not None: - print(f"aml_compute:{aml_compute}") + observability.log(f"aml_compute:{aml_compute}") # Create a reusable Azure ML environment environment = get_environment( @@ -35,6 +36,23 @@ def main(): run_config = RunConfiguration() run_config.environment = environment + # Activate AppInsights in Pipeline run: + # https://docs.microsoft.com/en-us/azure/machine-learning/how-to-log-pipelines-application-insights + # Add environment variable with Application Insights Connection String + # Replace the value with your own connection string + run_config.environment.environment_variables = { + "APPLICATIONINSIGHTS_CONNECTION_STRING": + e.app_insights_connection_string, + "LOG_LEVEL": + e.log_level, + "LOG_SAMPLING_RATE": + e.log_sampling_rate, + "TRACE_SAMPLING_RATE": + e.trace_sampling_rate, + "METRICS_EXPORT_INTERVAL": + e.metrics_export_interval + } + if e.datastore_name: datastore_name = e.datastore_name else: @@ -61,7 +79,7 @@ def main(): preprocess_step = PythonScriptStep( name="Preprocess Data with OS cmd", - script_name='preprocessing/preprocess_os_cmd.py', + script_name='ml_model/preprocessing/preprocess_os_cmd.py', compute_target=aml_compute, source_directory=e.sources_directory_train, arguments=[ @@ -73,7 +91,7 @@ def main(): runconfig=run_config, allow_reuse=False, ) - print("Step Preprocess OS cmd created") + observability.log("Step Preprocess OS cmd created") steps = [preprocess_step] preprocess_pipeline = Pipeline(workspace=aml_workspace, steps=steps) @@ -84,9 +102,16 @@ def main(): 
description="Data preprocessing OS cmd pipeline", version=e.build_id, ) - print(f"Published pipeline: {published_pipeline.name}") - print(f"for build {published_pipeline.version}") + observability.log(f"Published pipeline: {published_pipeline.name}") + observability.log(f"for build {published_pipeline.version}") if __name__ == "__main__": - main() + observability.start_span('build_data_processing_os_cmd_pipeline') + try: + main() + except Exception as exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_service/pipelines/build_data_processing_pipeline.py b/ml_service/pipelines/build_data_processing_pipeline.py index c84af6d8..a34843e8 100644 --- a/ml_service/pipelines/build_data_processing_pipeline.py +++ b/ml_service/pipelines/build_data_processing_pipeline.py @@ -7,6 +7,7 @@ from ml_service.util.attach_compute import get_compute from ml_service.util.env_variables import Env from ml_service.util.manage_environment import get_environment +from ml_service.util.logger.observability import observability def main(): @@ -17,12 +18,11 @@ def main(): subscription_id=e.subscription_id, resource_group=e.resource_group, ) - print(f"get_workspace:{aml_workspace}") - + observability.log(f"get_workspace:{aml_workspace}") # Get Azure machine learning cluster aml_compute = get_compute(aml_workspace, e.compute_name, e.vm_size) if aml_compute is not None: - print(f"aml_compute:{aml_compute}") + observability.log(f"aml_compute:{aml_compute}") # Create a reusable Azure ML environment environment = get_environment( @@ -34,6 +34,23 @@ def main(): run_config = RunConfiguration() run_config.environment = environment + # Activate AppInsights in Pipeline run: + # https://docs.microsoft.com/en-us/azure/machine-learning/how-to-log-pipelines-application-insights + # Add environment variable with Application Insights Connection String + # Replace the value with your own connection string + run_config.environment.environment_variables = { + "APPLICATIONINSIGHTS_CONNECTION_STRING": + e.app_insights_connection_string, + "LOG_LEVEL": + e.log_level, + "LOG_SAMPLING_RATE": + e.log_sampling_rate, + "TRACE_SAMPLING_RATE": + e.trace_sampling_rate, + "METRICS_EXPORT_INTERVAL": + e.metrics_export_interval + } + if e.datastore_name: datastore_name = e.datastore_name else: @@ -61,7 +78,7 @@ def main(): preprocess_step = PythonScriptStep( name="Preprocess Data", - script_name="preprocessing/preprocess_aml.py", + script_name="ml_model/preprocessing/preprocess_aml.py", compute_target=aml_compute, source_directory=e.sources_directory_train, arguments=[ @@ -74,7 +91,7 @@ def main(): runconfig=run_config, allow_reuse=False, ) - print("Step Preprocess created") + observability.log("Step Preprocess created") steps = [preprocess_step] preprocess_pipeline = Pipeline(workspace=aml_workspace, steps=steps) @@ -85,9 +102,16 @@ def main(): description="Data preprocessing pipeline", version=e.build_id, ) - print(f"Published pipeline: {published_pipeline.name}") - print(f"for build {published_pipeline.version}") + observability.log(f"Published pipeline: {published_pipeline.name}") + observability.log(f"for build {published_pipeline.version}") if __name__ == "__main__": - main() + observability.start_span('build_data_processing_pipeline') + try: + main() + except Exception as exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_service/pipelines/build_training_pipeline.py 
b/ml_service/pipelines/build_training_pipeline.py index c7c2fe56..a9afed5a 100644 --- a/ml_service/pipelines/build_training_pipeline.py +++ b/ml_service/pipelines/build_training_pipeline.py @@ -6,6 +6,7 @@ from ml_service.util.attach_compute import get_compute from ml_service.util.env_variables import Env from ml_service.util.manage_environment import get_environment +from ml_service.util.logger.observability import observability def main(): @@ -16,12 +17,12 @@ def main(): subscription_id=e.subscription_id, resource_group=e.resource_group, ) - print(f"get_workspace:{aml_workspace}") + observability.log(f"get_workspace:{aml_workspace}") # Get Azure machine learning cluster aml_compute = get_compute(aml_workspace, e.compute_name, e.vm_size) if aml_compute is not None: - print(f"aml_compute:{aml_compute}") + observability.log(f"aml_compute:{aml_compute}") # Create a reusable Azure ML environment environment = get_environment( @@ -33,6 +34,23 @@ def main(): run_config = RunConfiguration() run_config.environment = environment + # Activate AppInsights in Pipeline run: + # https://docs.microsoft.com/en-us/azure/machine-learning/how-to-log-pipelines-application-insights + # Add environment variable with Application Insights Connection String + # Replace the value with your own connection string + run_config.environment.environment_variables = { + "APPLICATIONINSIGHTS_CONNECTION_STRING": + e.app_insights_connection_string, + "LOG_LEVEL": + e.log_level, + "LOG_SAMPLING_RATE": + e.log_sampling_rate, + "TRACE_SAMPLING_RATE": + e.trace_sampling_rate, + "METRICS_EXPORT_INTERVAL": + e.metrics_export_interval + } + if e.datastore_name: datastore_name = e.datastore_name else: @@ -51,7 +69,7 @@ def main(): train_step = PythonScriptStep( name="Train Model", - script_name="training/train_aml.py", + script_name="ml_model/training/train_aml.py", compute_target=aml_compute, source_directory=e.sources_directory_train, outputs=[pipeline_data], @@ -65,11 +83,11 @@ def main(): runconfig=run_config, allow_reuse=True, ) - print("Step Train created") + observability.log("Step Train created") evaluate_step = PythonScriptStep( name="Evaluate Model ", - script_name="evaluate/evaluate_model.py", + script_name="ml_model/evaluate/evaluate_model.py", compute_target=aml_compute, source_directory=e.sources_directory_train, arguments=[ @@ -79,11 +97,11 @@ def main(): runconfig=run_config, allow_reuse=False, ) - print("Step Evaluate created") + observability.log("Step Evaluate created") register_step = PythonScriptStep( name="Register Model ", - script_name="register/register_model.py", + script_name="ml_model/register/register_model.py", compute_target=aml_compute, source_directory=e.sources_directory_train, inputs=[pipeline_data], @@ -94,15 +112,16 @@ def main(): runconfig=run_config, allow_reuse=False, ) - print("Step Register created") + observability.log("Step Register created") # Check run_evaluation flag to include or exclude evaluation step. 
if (e.run_evaluation).lower() == "true": - print("Include evaluation step before register step.") + observability.log("Include evaluation step before register step.") evaluate_step.run_after(train_step) register_step.run_after(evaluate_step) steps = [train_step, evaluate_step, register_step] else: - print("Exclude evaluation step and directly run register step.") + observability.log("Exclude evaluation step " + "and directly run register step.") register_step.run_after(train_step) steps = [train_step, register_step] @@ -114,9 +133,16 @@ def main(): description="Model training/retraining pipeline", version=e.build_id, ) - print(f"Published pipeline: {published_pipeline.name}") - print(f"for build {published_pipeline.version}") + observability.log(f"Published pipeline: {published_pipeline.name}") + observability.log(f"for build {published_pipeline.version}") if __name__ == "__main__": - main() + observability.start_span('build_training_pipeline') + try: + main() + except Exception as exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_service/pipelines/run_data_processing_pipeline.py b/ml_service/pipelines/run_data_processing_pipeline.py index 7df06e2f..c526be80 100644 --- a/ml_service/pipelines/run_data_processing_pipeline.py +++ b/ml_service/pipelines/run_data_processing_pipeline.py @@ -2,6 +2,7 @@ from azureml.core import Experiment, Workspace from ml_service.util.env_variables import Env import argparse +from ml_service.util.logger.observability import observability def main(): @@ -59,7 +60,7 @@ def main(): raise KeyError(f"Unable to find a published pipeline for this build {e.build_id}") # NOQA: E501 else: published_pipeline = matched_pipes[0] - print("published pipeline id is", published_pipeline.id) + observability.log(f"published pipeline id is {published_pipeline.id}") # Save the Pipeline ID for other AzDO jobs after script is complete if args.output_pipeline_id_file is not None: @@ -77,8 +78,15 @@ def main(): published_pipeline, tags=tags) - print("Pipeline run initiated ", run.id) + observability.log(f"Pipeline run initiated {run.id}") if __name__ == "__main__": - main() + observability.start_span('run_data_processing_pipeline') + try: + main() + except Exception as exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_service/pipelines/run_training_pipeline.py b/ml_service/pipelines/run_training_pipeline.py index 83eb4472..e3a4e7a0 100644 --- a/ml_service/pipelines/run_training_pipeline.py +++ b/ml_service/pipelines/run_training_pipeline.py @@ -2,6 +2,7 @@ from azureml.core import Experiment, Workspace import argparse from ml_service.util.env_variables import Env +from ml_service.util.logger.observability import observability def main(): @@ -45,7 +46,7 @@ def main(): raise KeyError(f"Unable to find a published pipeline for this build {e.build_id}") # NOQA: E501 else: published_pipeline = matched_pipes[0] - print("published pipeline id is", published_pipeline.id) + observability.log(f"published pipeline id is {published_pipeline.id}") # Save the Pipeline ID for other AzDO jobs after script is complete if args.output_pipeline_id_file is not None: @@ -65,8 +66,15 @@ def main(): tags=tags, pipeline_parameters=pipeline_parameters) - print("Pipeline run initiated ", run.id) + observability.log(f"Pipeline run initiated {run.id}") if __name__ == "__main__": - main() + observability.start_span('run_training_pipeline') + try: + main() + except Exception as 
exception: + observability.exception(exception) + raise exception + finally: + observability.end_span() diff --git a/ml_service/tests/__init__.py b/ml_service/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ml_service/tests/util/__init__.py b/ml_service/tests/util/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ml_service/tests/util/test_app_insights_logger.py b/ml_service/tests/util/test_app_insights_logger.py new file mode 100644 index 00000000..ec40891c --- /dev/null +++ b/ml_service/tests/util/test_app_insights_logger.py @@ -0,0 +1,76 @@ +import logging +import unittest +from unittest.mock import MagicMock, patch + +from ml_service.util.logger.app_insights_logger import AppInsightsLogger + + +class RealAppInsightsLogger(AppInsightsLogger): + def __init__(self): + self.logger = logging.getLogger(__name__) + self.env = MockEnv("") + + +class MockRun: + def __init__(self, run_id): + self.id = run_id + self.parent = MagicMock() + self.name = run_id + self.experiment = MagicMock() + + +class MockEnv: + def __init__(self, run_id): + self.build_id = run_id + + +class TestObservability(unittest.TestCase): + @patch("ml_service.util.logger.app_insights_logger.AppInsightsLogger") + def setUp(cls, mock_app_insights_logger): + cls.concert_app_insights_logger = RealAppInsightsLogger() + cls.mock_app_insights_logger = mock_app_insights_logger + + def test_get_run_id_having_online_context(self): + expected = "FOO" + + response = self.concert_app_insights_logger.\ + get_run_id_and_set_context(MockRun("FOO")) + + self.assertEqual(expected, response) + + def test_get_run_id_having_online_context_using_build_id(self): + self.concert_app_insights_logger.env.build_id = expected = "FOO" + + response = self.concert_app_insights_logger.\ + get_run_id_and_set_context(MockRun("OfflineRun")) + + self.assertEqual(expected, response) + + def test_get_run_id_having_online_context_using_uuid(self): + self.concert_app_insights_logger.env.build_id = "" + + response = self.concert_app_insights_logger.\ + get_run_id_and_set_context(MockRun("OfflineRun")) + + self.assertIsNotNone(response) + + def test_log_called_with_parameters(self): + self.mock_app_insights_logger.log("FOO", "BAZ") + + self.mock_app_insights_logger.log.assert_called_with("FOO", "BAZ") + + def test_log_metric_called_with_parameters(self): + self.mock_app_insights_logger.log_metric("FOO", "BAZ", "BAR", False) + + self.mock_app_insights_logger.log_metric.assert_called_with( + "FOO", "BAZ", "BAR", False + ) + + def test_set_view_is_called_with_parameters(self): + self.mock_app_insights_logger.set_view("FOO", "BAR", "BAZ") + self.mock_app_insights_logger.set_view.\ + assert_called_with("FOO", "BAR", "BAZ") + + +if __name__ == "__main__": + unittest.main() diff --git a/ml_service/tests/util/test_azure_ml_logger.py b/ml_service/tests/util/test_azure_ml_logger.py new file mode 100644 index 00000000..992e0743 --- /dev/null +++ b/ml_service/tests/util/test_azure_ml_logger.py @@ -0,0 +1,32 @@ +import unittest +from unittest.mock import patch + +from ml_service.util.logger.azure_ml_logger import AzureMlLogger + + +class TestObservability(unittest.TestCase): + @patch("ml_service.util.logger.azure_ml_logger.AzureMlLogger") + def setUp(cls, mock_azure_ml_logger): + cls.azure_ml_logger = mock_azure_ml_logger + + def test_log_called_with_parameters(self): + self.azure_ml_logger.log("FOO", "BAZ") + + self.azure_ml_logger.log.assert_called_with("FOO", "BAZ") + + def test_log_metric_called_with_parameters(self): + 
self.azure_ml_logger.log_metric("FOO", "BAZ", "BAR") + + self.azure_ml_logger.log_metric.assert_called_with("FOO", "BAZ", "BAR") + + def test_get_callee_returns_callee_file_with_line_number(self): + azure_ml_logger = AzureMlLogger() + expected = "test_azure_ml_logger.py:26" + + response = azure_ml_logger.get_callee(0) + + self.assertEqual(expected, response) + + +if __name__ == "__main__": + unittest.main() diff --git a/ml_service/tests/util/test_observability.py b/ml_service/tests/util/test_observability.py new file mode 100644 index 00000000..c95db2e0 --- /dev/null +++ b/ml_service/tests/util/test_observability.py @@ -0,0 +1,67 @@ +import unittest +from unittest.mock import patch + +from ml_service.util.logger.observability import Observability +from ml_service.util.logger.logger_interface import Severity + + +class ObservabilityMock(Observability): + @patch("ml_service.util.logger.app_insights_logger.AppInsightsLogger") + @patch("ml_service.util.logger.azure_ml_logger.AzureMlLogger") + @patch("ml_service.util.logger.console_logger.ConsoleLogger") + @patch("ml_service.util.logger.observability.Loggers") + def __init__(self, mock_loggers, mock_console_logger, mock_aml_logger, + mock_app_insight_logger): + mock_loggers.loggers = [mock_console_logger, mock_aml_logger, + mock_app_insight_logger] + self._loggers = mock_loggers + + +class TestObservability(unittest.TestCase): + @patch("ml_service.util.logger.observability.Observability") + def setUp(cls, mock_observability): + cls.observability = mock_observability + + def test_log_metric_called_with_parameters(self): + self.observability.log_metric("FOO", "BAZ", "BAR") + + self.observability.log_metric.assert_called_with("FOO", "BAZ", "BAR") + + def test_log_called_with_parameters(self): + self.observability.log("FOO", Severity.CRITICAL) + + self.observability.log.assert_called_with("FOO", Severity.CRITICAL) + + def test_log_metric_is_being_called_by_all_loggers(self): + # Force creating a new singleton on base class + Observability._instance = None + self.observability = ObservabilityMock() + self.observability.log_metric("FOO", "BAZ", "BAR") + + self.observability._loggers.loggers[0].log_metric.assert_called_with( + "FOO", "BAZ", "BAR", False + ) + self.observability._loggers.loggers[1].log_metric.assert_called_with( + "FOO", "BAZ", "BAR", False + ) + self.observability._loggers.loggers[2].log_metric.assert_called_with( + "FOO", "BAZ", "BAR", False + ) + + def test_log_is_being_called_by_all_loggers(self): + # Force creating a new singleton on base class + Observability._instance = None + self.observability = ObservabilityMock() + + self.observability.log("FOO", Severity.CRITICAL) + + self.observability._loggers.loggers[0].\ + log.assert_called_with("FOO", Severity.CRITICAL) + self.observability._loggers.loggers[1].\ + log.assert_called_with("FOO", Severity.CRITICAL) + self.observability._loggers.loggers[2].\ + log.assert_called_with("FOO", Severity.CRITICAL) + + +if __name__ == "__main__": + unittest.main() diff --git a/ml_service/util/attach_compute.py b/ml_service/util/attach_compute.py index 8756610e..57b25163 100644 --- a/ml_service/util/attach_compute.py +++ b/ml_service/util/attach_compute.py @@ -4,6 +4,7 @@ from azureml.core.compute import ComputeTarget from azureml.exceptions import ComputeTargetException from ml_service.util.env_variables import Env +from ml_service.util.logger.observability import observability def get_compute(workspace: Workspace, compute_name: str, vm_size: str): # NOQA E501 @@ -11,7 +12,7 @@ def 
get_compute(workspace: Workspace, compute_name: str, vm_size: str): # NOQA if compute_name in workspace.compute_targets: compute_target = workspace.compute_targets[compute_name] if compute_target and type(compute_target) is AmlCompute: - print("Found existing compute target " + compute_name + " so using it.") # NOQA + observability.log("Found existing compute target " + compute_name + " so using it.") # NOQA else: e = Env() compute_config = AmlCompute.provisioning_configuration( @@ -33,6 +34,6 @@ def get_compute(workspace: Workspace, compute_name: str, vm_size: str): # NOQA ) return compute_target except ComputeTargetException as ex: - print(ex) + observability.exception(ex) print("An error occurred trying to provision compute.") exit(1) diff --git a/ml_service/util/env_variables.py b/ml_service/util/env_variables.py index 14ef086b..c9eaae18 100644 --- a/ml_service/util/env_variables.py +++ b/ml_service/util/env_variables.py @@ -3,7 +3,6 @@ from dataclasses import dataclass import os from typing import Optional - from dotenv import load_dotenv @@ -40,6 +39,12 @@ class Env: max_nodes: int = int(os.environ.get("AML_CLUSTER_MAX_NODES", 4)) aml_preprocessing_custom_docker_env_name: Optional[str] = os.environ.get("AML_PREPROCESSING_CUSTOM_DOCKER_ENV_NAME") # NOQA: E501 preprocessing_os_cmd_pipeline_name: Optional[str] = os.environ.get("PREPROCESSING_OS_CMD_PIPELINE_NAME") # NOQA: E501 + app_insights_connection_string: Optional[str] = os.environ.get("APPLICATIONINSIGHTS_CONNECTION_STRING") # NOQA: E501 + log_to_console: Optional[bool] = os.environ.get("LOG_TO_CONSOLE", "false").lower().strip() == "true" # NOQA: E501 + log_level: Optional[str] = os.environ.get("LOG_LEVEL", "WARNING") # NOQA: E501 + log_sampling_rate: float = float(os.environ.get("LOG_SAMPLING_RATE", 1.0)) # NOQA: E501 + trace_sampling_rate: float = float(os.environ.get("TRACE_SAMPLING_RATE", 1.0)) # NOQA: E501 + metrics_export_interval: int = int(os.environ.get("METRICS_EXPORT_INTERVAL", 15)) # NOQA: E501 # derived variables processed_dataset_name: Optional[str] = f"{dataset_name}_processed" # NOQA: E501 diff --git a/ml_service/util/logger/__init__.py b/ml_service/util/logger/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ml_service/util/logger/app_insights_logger.py b/ml_service/util/logger/app_insights_logger.py new file mode 100644 index 00000000..2d49f0c5 --- /dev/null +++ b/ml_service/util/logger/app_insights_logger.py @@ -0,0 +1,188 @@ +import logging + +from opencensus.ext.azure import metrics_exporter +from opencensus.ext.azure.log_exporter import AzureLogHandler +from opencensus.ext.azure.trace_exporter import AzureExporter +from opencensus.trace import config_integration +from opencensus.trace.samplers import ProbabilitySampler +from opencensus.trace.tracer import Tracer + +from opencensus.trace.span import SpanKind +from opencensus.trace.status import Status +from opencensus.stats import aggregation as aggregation_module +from opencensus.stats import measure as measure_module +from opencensus.stats import stats as stats_module +from opencensus.stats import view as view_module +from opencensus.tags import tag_map as tag_map_module + +from ml_service.util.env_variables import Env +from ml_service.util.logger.logger_interface import ( + LoggerInterface, + ObservabilityAbstract, + Severity, +) + + +class AppInsightsLogger(LoggerInterface, ObservabilityAbstract): + def __init__(self, run): + print('Initializing the AppInsightsLogger') + self.env = Env() + self.run_id = 
self.get_run_id_and_set_context(run)
+
+        # Prepare integrations and initialize tracer
+        config_integration.trace_integrations(['httplib', 'logging'])
+        texporter = AzureExporter(connection_string=self.
+                                  env.app_insights_connection_string)
+        texporter.add_telemetry_processor(self.callback_function)
+        self.tracer = Tracer(
+            exporter=texporter,
+            sampler=ProbabilitySampler(self.env.trace_sampling_rate)
+        )
+
+        # Create AppInsights Handler and set log format
+        self.logger = logging.getLogger(__name__)
+        self.logger.setLevel(
+            getattr(logging, self.env.log_level.upper(), logging.WARNING))
+        handler = AzureLogHandler(
+            connection_string=self.env.app_insights_connection_string,
+            logging_sampling_rate=self.env.log_sampling_rate,
+        )
+        handler.add_telemetry_processor(self.callback_function)
+        self.logger.addHandler(handler)
+
+        # Initialize metric exporter
+        mexporter = metrics_exporter.new_metrics_exporter(
+            enable_standard_metrics=True,
+            export_interval=self.env.metrics_export_interval,
+            connection_string=self.env.app_insights_connection_string,
+        )
+        mexporter.add_telemetry_processor(self.callback_function)
+        stats_module.stats.view_manager.register_exporter(mexporter)
+
+    def log_metric(
+        self, name="", value="", description="", log_parent=False,
+    ):
+        """
+        Sends a custom metric to App Insights
+        :param name: name of the metric
+        :param value: value of the metric
+        :param description: description of the metric
+        :param log_parent: not being used for this logger
+        :return:
+        """
+        measurement_map = \
+            stats_module.stats.stats_recorder.new_measurement_map()
+        tag_map = tag_map_module.TagMap()
+
+        measure = measure_module.MeasureFloat(name, description)
+        self.set_view(name, description, measure)
+        measurement_map.measure_float_put(measure, value)
+        measurement_map.record(tag_map)
+
+    def log(self, description="", severity=Severity.INFO):
+        """
+        Sends the logs to App Insights
+        :param description: log description
+        :param severity: log severity
+        :return:
+        """
+        # Overwrite custom dimensions with caller data
+        modulename, filename, lineno = self.get_callee_details(2)
+        self.custom_dimensions[self.CUSTOM_DIMENSIONS][self.FILENAME] =\
+            filename
+        self.custom_dimensions[self.CUSTOM_DIMENSIONS][self.LINENO] =\
+            lineno
+        self.custom_dimensions[self.CUSTOM_DIMENSIONS][self.MODULE] =\
+            modulename
+        if self.current_span() is not None:
+            self.custom_dimensions[self.CUSTOM_DIMENSIONS][self.PROCESS] =\
+                self.current_span().name
+
+        if severity == self.severity.DEBUG:
+            self.logger.debug(description, extra=self.custom_dimensions)
+        elif severity == self.severity.INFO:
+            self.logger.info(description, extra=self.custom_dimensions)
+        elif severity == self.severity.WARNING:
+            self.logger.warning(description, extra=self.custom_dimensions)
+        elif severity == self.severity.ERROR:
+            self.logger.error(description, extra=self.custom_dimensions)
+        elif severity == self.severity.CRITICAL:
+            self.logger.critical(description, extra=self.custom_dimensions)
+
+    def exception(self, exception: Exception):
+        """
+        Sends the exception to App Insights
+        :param exception: Actual exception to be sent
+        :return:
+        """
+        self.logger.exception(exception, extra=self.custom_dimensions)
+        # Mark current span/operation with internal error
+        self.current_span().status = Status(2, exception)
+        self.current_span().attributes['http.status_code'] = 500
+
+    @staticmethod
+    def set_view(metric, description, measure):
+        """
+        Sets the view for the custom metric
+        """
+        prompt_view = view_module.View(
+            metric,
+            description,
+            [],
+            measure,
+            aggregation_module.LastValueAggregation()
+        )
+        stats_module.stats.view_manager.register_view(prompt_view)
+
+    def callback_function(self, envelope):
+        """
+        Attaches a correlation_id as a custom
+        dimension to the exporter just before
+        sending the logs/metrics
+        :param envelope:
+        :return: Always return True
+        (if False, it does not export metrics/logs)
+        """
+        envelope.data.baseData.properties[self.CORRELATION_ID] = self.run_id
+        return True
+
+    def span(self, name='span'):
+        """Create a new span with the trace using the context information.
+        :type name: str
+        :param name: The name of the span.
+        :rtype: :class:`~opencensus.trace.span.Span`
+        :returns: The Span object.
+        """
+        return self.start_span(name)
+
+    def start_span(self, name='span'):
+        """Start a span.
+        :type name: str
+        :param name: The name of the span.
+        :rtype: :class:`~opencensus.trace.span.Span`
+        :returns: The Span object.
+        """
+        span = self.tracer.start_span(name)
+        span.span_kind = SpanKind.SERVER
+        span.attributes['http.method'] = 'START'
+        span.attributes['http.route'] = name
+        return span
+
+    def end_span(self):
+        """End a span. Remove the span from the span stack, and update the
+        span_id in TraceContext as the current span_id which is the peek
+        element in the span stack.
+        """
+        self.tracer.end_span()
+
+    def current_span(self):
+        """Return the current span."""
+        return self.tracer.current_span()
+
+    def add_attribute_to_current_span(self, attribute_key, attribute_value):
+        self.tracer.add_attribute_to_current_span(attribute_key,
+                                                  attribute_value)
+
+    def list_collected_spans(self):
+        """List collected spans."""
+        self.tracer.list_collected_spans()
diff --git a/ml_service/util/logger/azure_ml_logger.py b/ml_service/util/logger/azure_ml_logger.py
new file mode 100644
index 00000000..10f9a207
--- /dev/null
+++ b/ml_service/util/logger/azure_ml_logger.py
@@ -0,0 +1,57 @@
+import datetime
+import time
+
+from ml_service.util.logger.logger_interface import (
+    LoggerInterface,
+    ObservabilityAbstract,
+    Severity,
+)
+
+
+class AzureMlLogger(LoggerInterface, ObservabilityAbstract):
+    def __init__(self, run=None):
+        self.run = run
+
+    def log_metric(self, name, value, description, log_parent):
+        """Log a metric value to the run with the given name.
+        :param log_parent: mark True if you want to log to the parent Run
+        :param name: The name of the metric.
+        :type name: str
+        :param value: The value to be posted to the service.
+        :type value:
+        :param description: An optional metric description.
+        :type description: str
+        """
+        if name != "" and value != "":
+            self.run.log(
+                name, value, description
+            ) if log_parent is False \
+                else self.run.parent.log(name, value, description)
+
+    def log(self, description="", severity=Severity.INFO):
+        """
+        Sends the logs to AML (experiments -> logs/outputs)
+        :param description: log description
+        :param severity: log severity
+        :return:
+        """
+
+        time_stamp = datetime.datetime.fromtimestamp(time.time()).strftime(
+            "%Y-%m-%d %H:%M:%S"
+        )
+        callee = self.get_callee(
+            2
+        )  # to get the script that is calling Observability
+        print(
+            "{}, [{}], {}:{}".format(
+                time_stamp, self.severity_map[severity], callee, description
+            )
+        )
+
+    def exception(self, exception: Exception):
+        """
+        Logs the exception with CRITICAL severity
+        :param exception: Actual exception to be sent
+        :return:
+        """
+        self.log(exception, Severity.CRITICAL)
diff --git a/ml_service/util/logger/console_logger.py b/ml_service/util/logger/console_logger.py
new file mode 100644
index 00000000..fc0ebdb8
--- /dev/null
+++ b/ml_service/util/logger/console_logger.py
@@ -0,0 +1,42 @@
+import logging
+
+from ml_service.util.env_variables import Env
+from ml_service.util.logger.logger_interface import (
+    LoggerInterface,
+    ObservabilityAbstract,
+    Severity,
+)
+
+
+class ConsoleLogger(LoggerInterface, ObservabilityAbstract):
+    def __init__(self, run):
+        self.env = Env()
+        # initializes log exporter
+        self.run_id = self.get_run_id_and_set_context(run)
+        self.level = getattr(logging, self.env.log_level.upper(),
+                             logging.WARNING)
+
+    def log_metric(
+        self, name="", value="", description="", log_parent=False,
+    ):
+        self.log(f"Logging Metric for runId={self.run_id}: "
+                 f"name={name} value={value} "
+                 f"description={description} log_parent={log_parent}")
+
+    def log(self, description="", severity=Severity.INFO):
+        """
+        Prints the logs to console
+        :param description: log description
+        :param severity: log severity
+        :return:
+        """
+        if self.level <= severity:
+            print(f"{description} - custom dimensions:"
+                  f" {self.custom_dimensions}")
+
+    def exception(self, exception: Exception):
+        """
+        Prints the exception to console
+        :param exception: Actual exception to be sent
+        :return:
+        """
+        print(exception)
diff --git a/ml_service/util/logger/logger_interface.py b/ml_service/util/logger/logger_interface.py
new file mode 100644
index 00000000..637d9047
--- /dev/null
+++ b/ml_service/util/logger/logger_interface.py
@@ -0,0 +1,152 @@
+import inspect
+import uuid
+from opencensus.trace.tracer import Tracer
+
+
+class Severity:
+    DEBUG = 10
+    INFO = 20
+    WARNING = 30
+    ERROR = 40
+    CRITICAL = 50
+
+
+class LoggerInterface(Tracer):
+
+    def log_metric(self, name, value, description, log_parent):
+        pass
+
+    def log(self, description, severity):
+        pass
+
+    def exception(self, exception):
+        pass
+
+    def finish(self):
+        """End the spans and send to reporters."""
+        pass
+
+    def span(self, name='span'):
+        """Create a new span with the trace using the context information.
+        :type name: str
+        :param name: The name of the span.
+        :rtype: :class:`~opencensus.trace.span.Span`
+        :returns: The Span object.
+        """
+        pass
+
+    def start_span(self, name='span'):
+        """Start a span.
+        :type name: str
+        :param name: The name of the span.
+        :rtype: :class:`~opencensus.trace.span.Span`
+        :returns: The Span object.
+        """
+        pass
+
+    def end_span(self):
+        """End a span. Remove the span from the span stack, and update the
+        span_id in TraceContext as the current span_id which is the peek
+        element in the span stack.
+        """
+        pass
+
+    def current_span(self):
+        """Return the current span."""
+        pass
+
+    def add_attribute_to_current_span(self, attribute_key, attribute_value):
+        pass
+
+    def list_collected_spans(self):
+        """List collected spans."""
+        pass
+
+
+class ObservabilityAbstract:
+    OFFLINE_RUN = "OfflineRun"
+    CUSTOM_DIMENSIONS = "custom_dimensions"
+    CORRELATION_ID = "correlation_id"
+    FILENAME = "fileName"
+    MODULE = "module"
+    PROCESS = "process"
+    LINENO = "lineNumber"
+    severity = Severity()
+    severity_map = {10: "DEBUG", 20: "INFO",
+                    30: "WARNING", 40: "ERROR", 50: "CRITICAL"}
+
+    def get_run_id_and_set_context(self, run):
+        """
+        Gets the correlation ID in the following order:
+        - If the script is running in an online run context of AML --> run_id
+        - If the script is running where a build_id
+          environment variable is set --> build_id
+        - Else --> generate a unique id
+
+        Also sets the custom context dimensions based on online or offline run
+        :param run:
+        :return: correlation_id
+        """
+        run_id = str(uuid.uuid1())
+        if not run.id.startswith(self.OFFLINE_RUN):
+            run_id = run.id
+            self.custom_dimensions = {
+                'custom_dimensions': {
+                    "parent_run_id": run.parent.id,
+                    "step_id": run.id,
+                    "step_name": run.name,
+                    "experiment_name": run.experiment.name,
+                    "run_url": run.parent.get_portal_url(),
+                    "offline_run": False
+                }
+            }
+        elif self.env.build_id:
+            run_id = self.env.build_id
+            self.custom_dimensions = {
+                'custom_dimensions': {
+                    "run_id": self.env.build_id,
+                    "offline_run": True
+                }
+            }
+        else:
+            self.custom_dimensions = {
+                'custom_dimensions': {
+                    "run_id": run_id,
+                    "offline_run": True
+                }
+            }
+        return run_id
+
+    @staticmethod
+    def get_callee(stack_level):
+        """
+        This method gets the callee location in [file_name:line_number] format
+        :param stack_level:
+        :return: string of [file_name:line_number]
+        """
+        try:
+            stack = inspect.stack()
+            file_name = stack[stack_level + 1].filename.split("/")[-1]
+            line_number = stack[stack_level + 1].lineno
+            return "{}:{}".format(file_name, line_number)
+        except IndexError:
+            print("Index error, failed to log to AzureML")
+            return ""
+
+    @staticmethod
+    def get_callee_details(stack_level):
+        """
+        This method returns the callee details as a tuple;
+        tuple values are all strings.
+        :param stack_level:
+        :return: (module_name, file_name, line_number)
+        """
+        try:
+            stack = inspect.stack()
+            file_name = stack[stack_level + 1].filename
+            line_number = stack[stack_level + 1].lineno
+            module_name = inspect.getmodulename(file_name)
+            return module_name, file_name, line_number
+        except IndexError:
+            print("Index error, failed to log to AzureML")
+            return "", "", ""
diff --git a/ml_service/util/logger/observability.py b/ml_service/util/logger/observability.py
new file mode 100644
index 00000000..8736c17a
--- /dev/null
+++ b/ml_service/util/logger/observability.py
@@ -0,0 +1,162 @@
+from azureml.core import Run
+
+from ml_service.util.env_variables import Env
+from ml_service.util.logger.app_insights_logger import AppInsightsLogger
+from ml_service.util.logger.azure_ml_logger import AzureMlLogger
+from ml_service.util.logger.console_logger import ConsoleLogger
+from ml_service.util.logger.logger_interface import (
+    ObservabilityAbstract,
+    LoggerInterface,
+    Severity,
+)
+
+
+class Loggers(ObservabilityAbstract):
+    def __init__(self) -> None:
+        print('Initializing the Loggers')
+        self.loggers = []  # list of LoggerInterface instances
+        self.register_loggers()
+
+    def add(self, logger) -> None:
+        self.loggers.append(logger)
+
+    def get_loggers_string(self) -> str:
+        return ", ".join([type(x).__name__ for x in self.loggers])
+
+    def register_loggers(self):
+        """
+        This method is responsible for creating loggers/tracers
+        and adding them to the list of loggers
+        Notes:
+        - If the context of the Run object is offline,
+          we do not create an AzureMlLogger instance
+        - If APPLICATIONINSIGHTS_CONNECTION_STRING is not set
+          as an ENV variable, we do not create an AppInsightsLogger
+          instance
+        """
+        run = Run.get_context()
+        e = Env()
+        if not run.id.startswith(self.OFFLINE_RUN):
+            self.loggers.append(AzureMlLogger(run))
+        if e.app_insights_connection_string:
+            if "InstrumentationKey" in e.app_insights_connection_string:
+                self.loggers.append(AppInsightsLogger(run))
+        if e.log_to_console:
+            self.loggers.append(ConsoleLogger(run))
+
+
+class Observability(LoggerInterface):
+    _instance = None
+
+    # Straightforward Singleton Pattern from
+    # https://python-patterns.guide/gang-of-four/singleton/
+    def __new__(cls):
+        if cls._instance is None:
+            print('Creating the Observability Singleton')
+            cls._instance = super(Observability, cls).__new__(cls)
+            cls._instance.__initialized = False
+        return cls._instance
+
+    def __init__(self) -> None:
+        if not self.__initialized:
+            print('Initializing the Observability Singleton')
+            self.__initialized = True
+            self._loggers = Loggers()
+
+    def log_metric(
+        self, name="", value="", description="", log_parent=False,
+    ):
+        """
+        This method sends the metric to all registered loggers
+        :param name: metric name
+        :param value: metric value
+        :param description: description of the metric
+        :param log_parent: (only for AML), send the metric to the run.parent
+        :return:
+        """
+        for logger in self._loggers.loggers:
+            logger.log_metric(name, value, description, log_parent)
+
+    def log(self, description="", severity=Severity.INFO):
+        """
+        This method sends the log to all registered loggers
+        :param description: Actual log description to be sent
+        :param severity: log Severity
+        :return:
+        """
+        for logger in self._loggers.loggers:
+            logger.log(description, severity)
+
+    def exception(self, exception: Exception):
+        """
+        This method sends the exception to all registered loggers
+        :param exception: Actual exception to be sent
+        :return:
+        """
+        for logger in self._loggers.loggers:
+            logger.exception(exception)
+
+    def get_logger(self, logger_class):
+        """
+        This method iterates over the loggers and
+        returns the logger with the same type as the provided one.
+        This is a reference that can be used in case
+        any of the built-in functions of the loggers is required
+        :param logger_class:
+        :return: a logger instance
+        """
+        for logger in self._loggers.loggers:
+            if type(logger) is type(logger_class):
+                return logger
+
+    def span(self, name='span'):
+        """Create a new span with the trace using the context information
+        for all registered loggers.
+        :type name: str
+        :param name: The name of the span.
+        :rtype: :class:`~opencensus.trace.span.Span`
+        :returns: The Span object.
+        """
+        for logger in self._loggers.loggers:
+            logger.span(name)
+        return self.current_span()
+
+    def start_span(self, name='span'):
+        """Start a span for all registered loggers.
+        :type name: str
+        :param name: The name of the span.
+        :rtype: :class:`~opencensus.trace.span.Span`
+        :returns: The Span object.
+        """
+        for logger in self._loggers.loggers:
+            logger.start_span(name)
+        return self.current_span()
+
+    def end_span(self):
+        """End a span for all registered loggers.
+        Remove the span from the span stack, and update the
+        span_id in TraceContext as the current span_id which is the peek
+        element in the span stack.
+        """
+        for logger in self._loggers.loggers:
+            logger.end_span()
+
+    def current_span(self):
+        """Return the current span from the first logger."""
+        if len(self._loggers.loggers) > 0:
+            return self._loggers.loggers[0].current_span()
+
+    def add_attribute_to_current_span(self, attribute_key, attribute_value):
+        """Add an attribute to the current span for all registered loggers.
+        """
+        for logger in self._loggers.loggers:
+            logger.add_attribute_to_current_span(attribute_key,
+                                                 attribute_value)
+
+    def list_collected_spans(self):
+        """List collected spans from the first logger."""
+        if len(self._loggers.loggers) > 0:
+            return self._loggers.loggers[0].list_collected_spans()
+
+
+observability = Observability()
diff --git a/ml_service/util/manage_environment.py b/ml_service/util/manage_environment.py
index 0ff2c8de..fae6731d 100644
--- a/ml_service/util/manage_environment.py
+++ b/ml_service/util/manage_environment.py
@@ -3,6 +3,7 @@
 from azureml.core import Workspace, Environment
 from ml_service.util.env_variables import Env
 from azureml.core.runconfig import DEFAULT_CPU_IMAGE, DEFAULT_GPU_IMAGE
+from ml_service.util.logger.observability import observability
 
 
 def get_environment(
@@ -61,8 +62,8 @@ def get_environment(
             restored_environment.register(workspace)
 
         if restored_environment is not None:
-            print(restored_environment)
+            observability.log(restored_environment)
         return restored_environment
     except Exception as e:
-        print(e)
+        observability.exception(e)
        exit(1)
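
Usage sketch (illustrative, not part of the patch): the snippet below shows how a pipeline step could route its telemetry through the observability singleton added in this diff. The step name, metric name, and values are hypothetical; the imports and method signatures come from the modules above.

    from ml_service.util.logger.logger_interface import Severity
    from ml_service.util.logger.observability import observability


    def main():
        # Open a span so App Insights can correlate everything in this step
        observability.start_span("data_preprocessing")
        try:
            observability.log("Starting preprocessing", severity=Severity.INFO)
            rows_processed = 1000  # hypothetical value produced by the step
            observability.log_metric(name="rows_processed",
                                     value=rows_processed,
                                     description="Rows written to the dataset")
        except Exception as e:
            # Forward the exception to all registered loggers, then fail the step
            observability.exception(e)
            raise
        finally:
            observability.end_span()


    if __name__ == "__main__":
        main()

With LOG_TO_CONSOLE='true' and no connection string this only prints to stdout; once APPLICATIONINSIGHTS_CONNECTION_STRING is set (as wired into the pipelines above), the same calls also emit traces, logs, and metrics to Application Insights.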