Anomstack


Painless open source anomaly detection for your metrics! 📈📉🚀

Check out this Data Engineering Podcast where we discussed Anomstack and anomaly detection in general.

Note: If you are already using Airflow then also check out the airflow-provider-anomaly-detection package.

Supported sources and databases for your metrics to live in and be queried from:

Python BigQuery Snowflake DuckDB SQLite Redshift
🚧

Supported storage for your trained models:

Local GCS S3 Azure Blob
🚧

Supported ways to receive alerts:

Email Slack

Supported ways to run this project:

Python Env Docker Dagster Cloud GitHub Codespaces

What is Anomstack?


Anomstack is a lightweight (README buzzword bingo alert!) data app built on top of dagster (for orchestration) that lets you easily get great anomaly detection (using pyod for the ML stuff) for your metrics (whatever data platform you use) with as little pain as physically possible.

It's similar in scope and goal to this Airflow Anomaly Detection provider I also made, but easier to get going since it does not require Airflow, and so it is easier to set up and run yourself or via Dagster Cloud in a serverless manner.

Quick 10 Minute Video Project Intro

Watch the video

GitHub Star History - Lets Gooooo! :)

Star History Chart

How it works

  1. Define your metrics (part of a "metric batch") in a .sql file and corresponding config in a .yaml file. You can also define your own custom Python ingest function instead of just SQL; check out the python_ingest_simple example.
  2. Run Anomstack and it will automatically ingest, train, score, and alert ("jobs") on your metrics and detect anomalies (alerts via email/slack etc.).
  3. Get alerts when metrics look anomalous.
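
As a rough sketch of step 1, a custom Python ingest function could look something like the following. The function name `ingest` and the metric names are hypothetical, chosen only for illustration; the one thing taken from this README is that the result is a Pandas DataFrame with `metric_timestamp`, `metric_name` and `metric_value` columns. The python_ingest_simple example shows the real thing.

```python
from datetime import datetime, timezone
import random

import pandas as pd


def ingest() -> pd.DataFrame:
    """Return one row per metric for the current point in time.

    Hypothetical sketch: the function name and the metrics are made up.
    """
    now = datetime.now(timezone.utc)
    # Pretend these values came from an API or some other system.
    values = {
        "example_metric_a": random.uniform(0, 100),
        "example_metric_b": random.uniform(0, 1),
    }
    rows = [
        {"metric_timestamp": now, "metric_name": name, "metric_value": value}
        for name, value in values.items()
    ]
    return pd.DataFrame(
        rows, columns=["metric_timestamp", "metric_name", "metric_value"]
    )


df = ingest()
print(df)
```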

Why?

It's still too hard and messy to get decent out of the box anomaly detection on your metrics with minimal fuss. You either have to build some custom solution yourself or buy some sexy modern data stack tool that does it for you. This project aims to make it as easy as possible to get anomaly detection on your metrics without having to buy anything or build anything from scratch yourself.

Features


Here is a list of features of Anomstack (emoji alert warning!)

  1. 🌟 - You bring your metrics, Anomstack will do the ML (❤️PyOD).
  2. 🚀 - Easy to run yourself or via Dagster Cloud.
  3. ⚙️ - Very flexible config, you can see all params in defaults.yaml and override them in each metric batch config.
  4. 🧠 - Ability to define your own custom python ingest function instead of just SQL, check out the python_ingest_simple example.
  5. 🛠️ - Ability to define your own custom python preprocess function instead of the default at /metrics/defaults/python/preprocess.py.
  6. 📧 - Email alerting with fancy(ish) ascii art plots of your metrics and anomaly scores.
  7. 💬 - Slack alerts too (want to make these nicer).
  8. 🤖 - LLM based alerts (ChatGPT) - see LLM Alerts. p.s. they don't work great yet - experimental :)
  9. 🕒 - Ability to ingest at whatever frequency you want and then agg to a different level for training/scoring, see freq example.
  10. 📊 - Plot jobs so you can just eyeball your metrics in Dagster job logs, see Dagster UI Plots.
  11. 🏗️ - Minimal infrastructure requirements, Anomstack just reads from and writes to whatever database you use.
  12. 📈 - A nice little local Streamlit dashboard to visualize your metrics and anomaly scores, see Streamlit.
  13. 📦 - Dockerized for easy deployment.
  14. 🔔 - Scores & Alerts saved to database so you can query them and do whatever you want with them.
  15. 🏷️ - Add custom metric tags for more complex alert routing e.g. priority or subject area based.
  16. 🔄 - Change detection jobs out of the box.
  17. 😴 - Ability to snooze alerts for a period of time to reduce repeated and duplicate alerts.
  18. 🗞️ - Daily summary emails.
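
To illustrate feature 9 (ingest at one frequency, then aggregate to another level before training and scoring), here is a minimal standard-library sketch that rolls 10-minute observations up to hourly averages. The helper name `aggregate_hourly` and the choice of the mean as the aggregation are assumptions for illustration, not Anomstack's actual config options; the freq example covers those.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from statistics import mean


def aggregate_hourly(rows):
    """Average (timestamp, value) pairs into one value per hour."""
    buckets = defaultdict(list)
    for ts, value in rows:
        # Truncate each timestamp to the start of its hour.
        hour = ts.replace(minute=0, second=0, microsecond=0)
        buckets[hour].append(value)
    return {hour: mean(values) for hour, values in sorted(buckets.items())}


start = datetime(2023, 11, 12, 0, 0)
# Twelve observations ingested every 10 minutes: values 0, 1, ..., 11.
raw = [(start + timedelta(minutes=10 * i), float(i)) for i in range(12)]
hourly = aggregate_hourly(raw)
for hour, value in hourly.items():
    print(hour, value)
```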

Architecture

The Moving Parts

flowchart LR;

    metric_batch_config[".yaml"]
    metric_batch_sql[".sql"]
    metric_batch_ingest_py["ingest.py"]
    metric_batch_preprocess_py["preprocess.py"]
    ingest[[ingest]]
    train[[train]]
    score[[score]]
    alert[[alert]]
    change[[change]]
    llmalert[[llmalert]]
    plot[[plot]]
    dashboardpy["dashboard.py"]

    subgraph metric_batch
    metric_batch_config
    metric_batch_sql
    metric_batch_ingest_py
    metric_batch_preprocess_py
    end

    subgraph dagster_jobs
    ingest
    train
    score
    alert
    change
    llmalert
    plot
    end

    subgraph alerts
    email
    slack
    end

    subgraph datasources
    duckdb
    bigquery
    snowflake
    python
    end

    subgraph user_inputs
    metric_batch
    end

    subgraph anomstack
    dagster_jobs
    datasources
    model_store
    alerts
    llmalert
    dashboard
    end

    subgraph model_store
    local
    gcs
    s3
    end

    subgraph dashboard
    dashboardpy
    end

    ingest --> train
    train --> score
    score --> alert
    score --> llmalert
    score --> plot
    ingest --> change
    change --> alert

    metric_batch --> dagster_jobs

    alert --> email
    alert --> slack
    llmalert --> email
    llmalert --> slack

    datasources <--> dagster_jobs
    train --> model_store
    model_store --> score
    datasources --> dashboard


Metrics Table

Core to what Anomstack is doing is reading from and appending to a "Metrics" table for each metric batch. This is a "long" format table where new metrics are appended to the table as they come in or are defined and configured as you add new metric batches.

Here are the columns in the metrics table:

  • metric_timestamp: Timestamp of the metric (Defined in ingest_sql or ingest_fn).
  • metric_batch: Name of the metric batch (Defined from metric_batch in the yaml config for the batch).
  • metric_name: Name of the metric (Defined in ingest_sql or ingest_fn).
  • metric_type: Type of the metric the row relates to.
    • metric for the raw metric value.
    • score for the anomaly score (a float from 0-1).
    • alert for an alert (a 1 when an alert was raised).
  • metric_value: Value of the metric, coming from the ingest, score, or alert jobs (see concepts for more details).
SELECT
  metric_timestamp,
  metric_batch,
  metric_name,
  metric_type,
  metric_value,
FROM
  `metrics.metrics`
WHERE
  metric_batch = 'gsod'
  and
  metric_name = 'gsod_us_temp_avg'
ORDER BY metric_timestamp DESC
limit 10
/*
+--------------------------+------------+----------------+-----------+------------+
|metric_timestamp          |metric_batch|metric_name     |metric_type|metric_value|
+--------------------------+------------+----------------+-----------+------------+
|2023-11-12 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-12 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-12 00:00:00.000000|gsod        |gsod_us_temp_avg|alert      |1           |
|2023-11-12 00:00:00.000000|gsod        |gsod_us_temp_avg|metric     |44.4758     |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|metric     |46.3212     |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|score      |1           |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|metric     |46.3212     |
+--------------------------+------------+----------------+-----------+------------+
*/

Of course you can easily pivot this table to get a more "wide" format table if you prefer, which can be easier to work with in your analytics tools etc.

SELECT
  metric_timestamp,
  metric_batch,
  metric_name,
  avg(if(metric_type='metric', metric_value, null)) as metric_value,
  avg(if(metric_type='score', metric_value, null)) as metric_score,
  max(if(metric_type='alert', metric_value, 0)) as metric_alert,
FROM
  `metrics.metrics`
WHERE
  metric_batch = 'gsod'
  and
  metric_name = 'gsod_us_temp_avg'
GROUP BY 1,2,3
ORDER BY metric_timestamp DESC
limit 10
/*
+--------------------------+------------+----------------+------------------+------------+------------+
|metric_timestamp          |metric_batch|metric_name     |metric_value      |metric_score|metric_alert|
+--------------------------+------------+----------------+------------------+------------+------------+
|2023-11-12 00:00:00.000000|gsod        |gsod_us_temp_avg|44.4758           |1           |1           |
|2023-11-11 00:00:00.000000|gsod        |gsod_us_temp_avg|46.3212           |1           |1           |
|2023-11-10 00:00:00.000000|gsod        |gsod_us_temp_avg|47.51435          |1           |0           |
|2023-11-08 00:00:00.000000|gsod        |gsod_us_temp_avg|51.7557           |1           |0           |
|2023-11-07 00:00:00.000000|gsod        |gsod_us_temp_avg|54.1946           |1           |0           |
|2023-11-06 00:00:00.000000|gsod        |gsod_us_temp_avg|53.8131           |1           |0           |
|2023-11-05 00:00:00.000000|gsod        |gsod_us_temp_avg|52.0883           |1           |0           |
|2023-11-04 00:00:00.000000|gsod        |gsod_us_temp_avg|47.8              |1           |0           |
|2023-11-03 00:00:00.000000|gsod        |gsod_us_temp_avg|48.752422407267225|1           |0           |
|2023-11-02 00:00:00.000000|gsod        |gsod_us_temp_avg|38.999010833725855|1           |0           |
+--------------------------+------------+----------------+------------------+------------+------------+
*/
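
The same long-to-wide pivot can be tried locally without BigQuery. The sketch below uses Python's built-in `sqlite3` module with an in-memory table; the table name `metrics` and the sample rows are made up for illustration, and `CASE WHEN` is used in place of the `if()` calls above because it is portable across SQL dialects.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE metrics (
        metric_timestamp TEXT,
        metric_batch TEXT,
        metric_name TEXT,
        metric_type TEXT,
        metric_value REAL
    )
    """
)
# Made-up rows in the "long" format: one row per metric, score, or alert.
rows = [
    ("2023-11-11", "gsod", "gsod_us_temp_avg", "metric", 46.3),
    ("2023-11-11", "gsod", "gsod_us_temp_avg", "score", 0.4),
    ("2023-11-12", "gsod", "gsod_us_temp_avg", "metric", 44.5),
    ("2023-11-12", "gsod", "gsod_us_temp_avg", "score", 0.9),
    ("2023-11-12", "gsod", "gsod_us_temp_avg", "alert", 1.0),
]
conn.executemany("INSERT INTO metrics VALUES (?, ?, ?, ?, ?)", rows)

# Pivot to one row per timestamp and metric, with a column per metric_type.
wide = conn.execute(
    """
    SELECT
      metric_timestamp,
      metric_name,
      AVG(CASE WHEN metric_type = 'metric' THEN metric_value END) AS metric_value,
      AVG(CASE WHEN metric_type = 'score' THEN metric_value END) AS metric_score,
      MAX(CASE WHEN metric_type = 'alert' THEN metric_value ELSE 0 END) AS metric_alert
    FROM metrics
    WHERE metric_batch = 'gsod'
    GROUP BY metric_timestamp, metric_name
    ORDER BY metric_timestamp DESC
    """
).fetchall()

for row in wide:
    print(row)
```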

Examples


Here are some specific examples; there are lots more in the ./metrics/examples/ folder.

HackerNews

Derive metrics from current top stories

In ./metrics/examples/hackernews/ you will find an example of using a custom Python function (hn_top_stories_scores.py) to pull the current top 10 stories from the HackerNews API and derive some metrics based on their score. This is all defined in the hn_top_stories_scores.yaml configuration file for this metric batch.

GSOD

Derive metrics from public GSOD data in BigQuery

In ./metrics/examples/gsod/ you will find an example of just defining some SQL to derive a metric batch on data already in BigQuery (gsod.sql) and ingest it into a table called metrics in a metrics dataset in a Google BigQuery project. This is all defined in the gsod.yaml configuration file for this metric batch.

Weather

Use a custom python function to pull some weather metrics from the Open Meteo API

In ./metrics/examples/weather/ you will find an example of using a custom Python function (ingest_weather.py) to pull current temperature data for some cities from the Open Meteo API and ingest it into a table called metrics in a metrics dataset in a Google BigQuery project. This is all defined in the weather.yaml configuration file for this metric batch.

Yahoo Finance

Use a custom python function to pull some Yahoo Finance data

In ./metrics/examples/yfinance/ you will find an example of using a custom Python function (yfinance.py) to pull current price data for some stocks and ingest it into a table called metrics in a metrics dataset in a Google BigQuery project. This is all defined in the yfinance.yaml configuration file for this metric batch.

Project structure


Quickstart


Below are some quick start instructions for getting up and running with Anomstack and a local db using duckdb and some example metrics.

For proper use you would need to set up all your metrics and environment variables etc, but this should get you started.

By default Anomstack will run on port 3000, so you can go to http://localhost:3000 to see the dagster UI. You can then enable the jobs you want to run and see them run in the UI.

Note: you will need to wait for it to run a dozen or so ingest jobs before there is enough data for train, score and alert jobs to run successfully.

There are some more detailed instructions (WIP) in /docs/deployment/.

GitHub Codespaces


You can run Anomstack using docker in a GitHub Codespace. This is a great way to get started and familiar with Anomstack without having to install or run anything locally.

You can see the .devcontainer folder for the config used to run Anomstack in a codespace and the post create script post_create_command.sh for the commands the devcontainer will run to get Anomstack up and running.

Dagster Cloud

You can run this project in Dagster Cloud. Fork the repo (or make a completely new repo using the andrewm4894/anomstack GitHub template) and then follow the instructions here to deploy to Dagster Cloud from your forked repo.

You can then manage your metrics via PRs in your GitHub repo (here is a PR to add Google Trends metrics) and run them in Dagster Cloud, which will just sync with your repo.

Docker

To get started with Anomstack, you can run it via docker compose.

# clone repo
git clone https://github.com/andrewm4894/anomstack.git
# clone repo at specific release tag
# git clone -b v0.0.1 https://github.com/andrewm4894/anomstack.git
# cd into project
cd anomstack
# generate your .env file based on example
cp .example.env .env
# run docker compose up to start anomstack
docker compose up -d
# anomstack should now be running on port 3000

To update and rebuild after adding metrics or changing code, you can run:

# rebuild docker compose
docker compose build
# run docker compose up to re-start anomstack
docker compose up -d

Local Python env

You can also run Anomstack locally via a python virtual env.

# git clone
git clone https://github.com/andrewm4894/anomstack.git
# clone repo at specific release tag
# git clone -b v0.0.1 https://github.com/andrewm4894/anomstack.git
# cd into project
cd anomstack
# make virtual env
python3 -m venv .venv
# activate virtual env
source .venv/bin/activate
# install deps
pip3 install -r requirements.txt
# cp example env file
cp .example.env .env
# run locally
dagster dev -f anomstack/main.py
# anomstack should now be running on port 3000

Adding your metrics


To add metrics, you can add them to the metrics folder. You can see some examples in the metrics/examples folder.

For example, here is the PR that added Google Trends metrics to the examples.

You can customize the default params for your metrics in the metrics/defaults folder.
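
Conceptually, each metric batch config is layered on top of the defaults. The sketch below shows that idea with plain dictionaries. Apart from `metric_batch` and `disable_llmalert`, which are mentioned in this README, the parameter names and all values are hypothetical, and this is not Anomstack's actual config loading code.

```python
# Hypothetical defaults, standing in for what lives in metrics/defaults.
defaults = {
    "disable_llmalert": True,
    "example_alert_threshold": 0.8,  # made-up parameter name
    "example_train_max_n": 1000,  # made-up parameter name
}

# Hypothetical metric batch config, standing in for a batch .yaml file.
batch_config = {
    "metric_batch": "my_batch",
    "example_alert_threshold": 0.95,
}


def resolve_config(defaults: dict, batch_config: dict) -> dict:
    """Return the defaults with any batch-level values taking precedence."""
    return {**defaults, **batch_config}


config = resolve_config(defaults, batch_config)
print(config)
```

Anything the batch does not set falls through to the default, so a batch config only needs to list what differs.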

Environment variables for your metrics can be set in the .env file (see .example.env for examples and comments) or in the docker-compose.yml file.

Visualization


Visualization of the metrics and anomaly scores is a bit outside the scope of this project, but we do provide a couple of ways to visualize your metrics and anomaly scores.

Dagster UI Plots

Within Dagster there is the plot.py job to generate some plots of your metrics and anomaly scores for quick eyeballing within the dagster UI.

Click to see some screenshots

plot1

plot2

Streamlit

You can also use the little streamlit app in ./dashboard.py to visualize your metrics and anomaly scores.

# run streamlit app
streamlit run ./dashboard.py

...Or you can run it via make dashboard.

Click to see some screenshots

streamlit1

streamlit2

Concepts


  • "Metric Batch": You configure metric batches in Anomstack. A metric batch is a collection of metrics that you want to run together and with its own separate set of parameters. Of course a metric batch can contain just one metric if you want but typically it makes more sense to group metrics in ways that make sense for you. A metric batch is just some SQL or custom Python that results in a Pandas DataFrame with metric_timestamp, metric_name and metric_value columns.
  • "Jobs": At the core Anomstack runs a few jobs (Dagster Jobs) for each metric batch. These jobs are:
    • "Ingest" (ingest.py): This job runs the sql query (or Python function) for the metric batch and ingests the data into the database.
    • "Train" (train.py): This job trains a model for each metric.
    • "Score" (score.py): This job scores metrics using the latest trained model for each metric.
    • "Alert" (alert.py): This job alerts you when the metric looks anomalous.
    • "LLM Alert" (llmalert.py): This job alerts you when the metric looks anomalous as decided by an LLM (ChatGPT).
    • "Plot" (plot.py): This job plots metric values and scores for a batch at regular intervals so you can see some charts from within the Dagster UI.
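
To make the train, score and alert steps concrete, here is a deliberately tiny stand-in that uses a z-score in place of a PyOD model. It is not how Anomstack implements these jobs; the function names, the squashing of the z-score into a 0-1 score, and the 0.8 threshold are all assumptions for illustration.

```python
from statistics import mean, stdev


def train(history):
    """'Train' a toy model: just remember the mean and standard deviation."""
    return {"mean": mean(history), "std": stdev(history)}


def score(model, value, max_z=4.0):
    """Map the absolute z-score of a new value onto a 0-1 anomaly score."""
    if model["std"] == 0:
        # Flat history: any deviation at all is treated as fully anomalous.
        return 0.0 if value == model["mean"] else 1.0
    z = abs(value - model["mean"]) / model["std"]
    return min(z / max_z, 1.0)


def alert(anomaly_score, threshold=0.8):
    """Return 1 when the score reaches the threshold, else 0."""
    return 1 if anomaly_score >= threshold else 0


history = [50.0, 51.0, 49.0, 50.5, 49.5, 50.0, 51.0, 49.0]
model = train(history)

for value in (50.2, 58.0):
    s = score(model, value)
    print(value, round(s, 2), alert(s))
```

The shape mirrors the jobs above: train on past values, score each new value between 0 and 1, and raise an alert (a 1) only when the score is high enough.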

Alerts


Check out more example alerts in the anomaly gallery.

Anomstack supports alerts via email and slack. You can configure these in the .env file (see .example.env for examples and comments).

Below is an example of an alert via email. It has some ascii art plotting recent metric values and which observations were anomalous. Attached is a png plot with more details.

email alert

And the attached plot will look something like this:

plot
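
For a feel of what an ASCII plot of recent values with anomalies flagged might involve, here is a small sketch. It is not the plotting code Anomstack uses; the function name `ascii_plot`, the bar layout and the `*` marker are invented for illustration.

```python
def ascii_plot(values, alerts, width=20):
    """Render one horizontal bar per observation, flagging alerts with '*'."""
    lo, hi = min(values), max(values)
    span = (hi - lo) or 1.0  # avoid dividing by zero on a flat series
    lines = []
    for value, is_alert in zip(values, alerts):
        # Scale each value to a bar between 1 and `width` characters long.
        bar_len = 1 + round((value - lo) / span * (width - 1))
        marker = " *" if is_alert else ""
        lines.append(f"{value:8.2f} |{'#' * bar_len}{marker}")
    return "\n".join(lines)


values = [46.3, 47.5, 51.8, 54.2, 53.8, 44.5]
alerts = [0, 0, 0, 0, 0, 1]
plot = ascii_plot(values, alerts)
print(plot)
```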

LLM Alerts


Yes! I have managed to find a way to ram a large language model (LLM) into this project. But you know what, it might just work...

Update: It works horribly, but it works! 🤣. Still need to do a lot more prompt engineering to get this to work well, but it's a start.

The idea here is to just send the metric data and a prompt to an LLM (ChatGPT) and ask it if it thinks the metric looks anomalous. If it does, we alert.

Note: If you don't want to send your metric data to OpenAI then just set disable_llmalert to True in your metric batch config.
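
A rough sketch of that flow is below. The actual LLM call is deliberately left as a pluggable function (`ask_llm`, a hypothetical name) so that no particular client library or API is assumed; the prompt wording and the yes/no parsing are likewise illustrative rather than what Anomstack actually sends.

```python
def build_prompt(metric_name, values):
    """Format recent metric values into a plain-text question for an LLM."""
    series = ", ".join(f"{v:g}" for v in values)
    return (
        f"Here are recent values of the metric '{metric_name}': {series}.\n"
        "Does the most recent value look anomalous? "
        "Answer 'yes' or 'no', then explain briefly."
    )


def llm_alert(metric_name, values, ask_llm, disable_llmalert=False):
    """Return (alert, explanation); skip the LLM entirely when disabled."""
    if disable_llmalert:
        return 0, "llmalert disabled"
    answer = ask_llm(build_prompt(metric_name, values))
    is_anomalous = answer.strip().lower().startswith("yes")
    return (1 if is_anomalous else 0), answer


def fake_llm(prompt):
    """Stand-in for a real LLM client so the sketch runs offline."""
    return "Yes, the last value is far above the earlier ones."


result = llm_alert("example_metric", [10, 11, 10, 12, 95], fake_llm)
print(result)
```

With `disable_llmalert=True` the function returns before `ask_llm` is ever called, so no metric data leaves your environment.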

Click to see some LLM Alert screenshots

Below you see an example of an LLM alert via email. In this case we add a description of the reasoning from the LLM around why it thinks the metric looks anomalous.

llmalert1

llmalert2

Contributing

Read the contributing guide to learn about our development process, how to propose bugfixes and improvements, and how to build and test your changes to Anomstack.