Seamless pipelines for AMI water data, now as easy to access as turning on a tap.
A core function of this project is to provide a set of adapters for various AMI data sources. Each adapter connects to its AMI data source with minimal setup, extracts data, transforms it into our standard format, and stores the standardized data in one or more storage sinks, such as Snowflake.
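Conceptually, every adapter follows the same extract, transform, and load flow. The toy sketch below only illustrates that flow; every name in it is hypothetical and is not this project's actual API.

```python
# A toy illustration of the extract -> transform -> load flow that every adapter
# follows. All names here are hypothetical; they are not this project's real API.
from dataclasses import dataclass


@dataclass
class MeterRead:
    """Stand-in for the project's standardized data model."""
    meter_id: str
    flow_gallons: float


class ToyAdapter:
    def extract(self) -> list[dict]:
        # A real adapter would call the vendor's API or read an SFTP drop here.
        return [{"meter": "abc-123", "usage": 41.5}]

    def transform(self, raw: list[dict]) -> list[MeterRead]:
        # Map vendor-specific fields onto the standardized model.
        return [MeterRead(meter_id=r["meter"], flow_gallons=r["usage"]) for r in raw]


class ToySink:
    def store(self, reads: list[MeterRead]) -> None:
        # A real sink would write to Snowflake or another warehouse.
        for read in reads:
            print(f"stored {read}")


if __name__ == "__main__":
    adapter, sink = ToyAdapter(), ToySink()
    sink.store(adapter.transform(adapter.extract()))
```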
Here are the adapters in this project:
| Adapter | Implementation Status | Compatible Sinks | Documentation |
| --- | --- | --- | --- |
| Beacon360 | Complete | Snowflake | Link |
| Aclara | Complete | Snowflake | Link |
| Metersense | Complete | Snowflake | Link |
| Sentryx | Complete | Snowflake | Link |
| Xylem for Moulton Niguel | Complete | Snowflake | Link |
| Subeca | Complete | Snowflake | Link |
| Neptune 360 | Complete | Snowflake | Link |
| Harmony | Planned | n/a | n/a |
- amiadapters - Standalone Python library that adapts AMI data sources into our standard data format
- amicontrol - The control plane for our AMI data pipeline. This houses Airflow DAGs and other code to operate the pipeline.
- amideploy - IaC code to stand up an AMI Connect pipeline in the cloud. See README for instructions.
- docs - End-user and maintainer documentation.
- sql - Dumping ground for SQL used to manage storage sinks, e.g. Snowflake.
- test - Unit tests for all Python code in the project.
Contributions should be reviewed and approved via a Pull Request to the main
branch. Please squash commits when you merge your PR.
This is a Python 3.12 project. To set up your local Python environment, create a virtual environment and install dependencies with:
python3.12 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
You may need to install Rust in order for Airflow to install. Follow the instructions in the Rust error message.
We use black to format Python files. Before making a commit, format files with:
black .
Run unit tests from the project's root directory with
python -m unittest
You can use the CLI, cli.py, to interact with AMI Connect. Activate the virtual environment and run:
python cli.py --help
Use it to:
- Configure your production pipeline
- Run the pipeline locally
Production AMI Connect pipelines are configured using the Snowflake database where the pipeline stores AMI data.
To see your pipeline's configuration, run:
python cli.py config get
There are a variety of configuration changes you can make with the CLI. In general, use the --help flag or look at the code to see how they work.
One common task is to add a new source (a.k.a. utility or agency) to the system. Here's an example that adds the my_utility source, which uses the aclara adapter:
python cli.py config add-source my_utility aclara America/Los_Angeles --sftp-host my-sftp-host --sftp-remote-data-directory ./data --sftp-local-download-directory ./output --sftp-local-known-hosts-file ./known-hosts --sinks my_snowflake
Each adapter type requires its own configuration. You'll need to use the correct CLI options for a given adapter. Check ./docs/adapters for that info.
For now, we use a YAML file, secrets.yaml, to configure secrets. You should get this file from a colleague, or create one based on ./secrets.yaml.example.
The ./cli.py CLI will run the pipeline on your laptop. It will extract data from the sources in your local config, transform it, then load the data into your configured sinks. Run with:
python cli.py run
It's common to comment out or modify lines in this script while testing.
Use the deploy.sh script to deploy new code to your AMI Connect pipeline. As of this writing, the script simply copies new code to the Airflow server and updates Python dependencies.
You'll need to tell the script the hostname of your Airflow server. You can do this by setting the AMI_CONNECT__AIRFLOW_SERVER_HOSTNAME environment variable to your hostname. The script assumes you've stored a key pair at ./amideploy/configuration/airflow-key.pem.
For configuration, it expects a secrets.prod.yaml file, which will be used to configure your production pipeline alongside the config in the Snowflake database.
Example deploy:
export AMI_CONNECT__AIRFLOW_SERVER_HOSTNAME=<my EC2 hostname> && sh deploy.sh
We use Apache Airflow to orchestrate our data pipeline. The Airflow code is in the amicontrol package.
With the dependencies from requirements.txt installed, you should be able to run Airflow locally. First, set the AIRFLOW_HOME variable and your PYTHONPATH:
mkdir ./amicontrol/airflow
export AIRFLOW_HOME=`pwd`/amicontrol/airflow
export PYTHONPATH="${PYTHONPATH}:./amiadapters"
If it's your first time running the application on your machine, initialize the local Airflow app in AIRFLOW_HOME with:
airflow standalone
You'll want to modify the configuration to pick up our DAGs. In $AIRFLOW_HOME/airflow.cfg, set these values:
dags_folder = {/your path to repo}/ami-connect/amicontrol/dags
load_examples = False
Before you run airflow standalone, set this environment variable:
# This fixes hanging HTTP requests
# See: https://stackoverflow.com/questions/75980623/why-is-my-airflow-hanging-up-if-i-send-a-http-request-inside-a-task
export NO_PROXY="*"
Re-run the command to start the application:
airflow standalone
Watch for the admin user and its password in stdout. Use those credentials to log in. You should see our DAGs!
Adapters integrate an AMI data source with our pipeline. In general, when you create one, you'll need to define how it extracts data from the AMI data source, how it transforms that data into our generalized format, and (optionally) how it stores raw extracted data into storage sinks.
Steps (a sketch putting them together appears below):
- In a new Python file within amiadapters, create a new implementation of the BaseAMIAdapter abstract class.
- Define an extract function that retrieves AMI data from a source and outputs it using an output controller. This function shouldn't alter the data much: its output should stay as close to the source data as possible to reduce risk of error and to give us a historical record of the source data via our stored intermediate outputs.
- Define a transform function that takes the data from extract and transforms it into our generic data models, then outputs it using an output controller.
- If you're loading the raw data into a storage sink, define that step using something like RawSnowflakeLoader.
- For the pipeline to run your adapter, the AMIAdapterConfiguration.adapters() function will need to be able to instantiate your adapter. This will require some code in config.py. You'll need to figure out what specific config your adapter requires in the configuration table's configuration_sources.sources column, then parse that block, e.g. within AMIAdapterConfiguration.from_database() and the CLI that is used to add/update sources.
Add any documentation that's specific to this adapter to ./docs/adapters.
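Putting those steps together, here's a minimal sketch of what a new adapter might look like. Treat everything beyond the BaseAMIAdapter and output-controller concepts named above as assumptions: the import path, constructor arguments, method signatures, and output-controller calls are illustrative only, so mirror the real abstract class in amiadapters when you implement one.

```python
# Hypothetical sketch only. The import path, constructor arguments, method names,
# and output-controller calls below are assumptions; follow the real BaseAMIAdapter
# interface in amiadapters when implementing an actual adapter.
import requests

from amiadapters.base import BaseAMIAdapter  # assumed module path


class MyUtilityAdapter(BaseAMIAdapter):
    """Adapter for a hypothetical my_utility AMI data source."""

    def __init__(self, api_url: str, api_token: str):
        self.api_url = api_url
        self.api_token = api_token

    def extract(self, output_controller):
        # Pull raw data from the source and hand it off with as little modification
        # as possible, so the stored intermediate output is a faithful historical
        # record of the source data.
        response = requests.get(
            f"{self.api_url}/meter-reads",
            headers={"Authorization": f"Bearer {self.api_token}"},
            timeout=60,
        )
        response.raise_for_status()
        output_controller.write_extract_output(response.text)  # assumed method

    def transform(self, output_controller):
        # Read the stored extract output, map it onto the generic data models,
        # then write the transformed result for the load step.
        raw_text = output_controller.read_extract_output()  # assumed method
        generic_records = self._parse(raw_text)
        output_controller.write_transformed_output(generic_records)  # assumed method

    def _parse(self, raw_text: str):
        # Placeholder for vendor-specific parsing into the generic data models.
        raise NotImplementedError
```

If you also load raw extracted data into a sink, wire in something like RawSnowflakeLoader for that step, and remember to teach AMIAdapterConfiguration.adapters() and the CLI how to build your adapter from its block in the configuration_sources.sources column.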