
Experiment/fast api #23

Closed · wants to merge 8 commits into from

Conversation

drewbanin

This PR contains proof-of-concept code -- it should not be merged in its current state.

What am I looking at here? Check out the Loom summary.

Overview

The rpc server, in its current state, is bad. It's bad in the following ways:

  • It is finicky and unreliable
  • It is slow
  • It is stateful
  • It is hard to maintain
  • It is tightly coupled to the filesystem where dbt code lives
  • It lacks the hooks required to create stellar user experiences on top of it

This draft PR presents a proof-of-concept for a new dbt server, built on top of FastAPI (BIG +1 to whoever put this on our radar), that answers some questions about what a better dbt server could look like, and poses some new ones.

About this PR

Everything in this PR is an experiment. Very little of this code is ready for primetime as-is, but I wanted to push the boundaries of both FastAPI and my own thinking to see what I could learn.

The relevant code in this PR is all net-new, and it does not rely on any of the existing code in the dbt server. Relatedly, only a small subset of the capabilities of the existing RPC server are supported, but there is a path to building support for these features within the framework implemented here.

Stop talking, I want to play with it:

From the dbt-rpc directory with this branch checked out:

poetry install
poetry shell
uvicorn dbt_rpc.server:app --reload --host=127.0.0.1 --port 8580

From a dbt project directory (somewhere else), run this code.

The server assumes that you have a profiles.yml file in your $HOME directory with a profile called debug:

debug:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      user: drew
      password: password
      port: 5432
      schema: dbt_dbanin
      database: drew

What's implemented

The FastAPI server implements the following endpoints (a rough client-side sketch of the flow follows this list):

  • /push - used to push an unparsed representation of a dbt project to the server. Basically a big json-encoded blob of the filetree from the root of a dbt project
  • /parse - used to parse an unparsed representation of a dbt project to a representation of a manifest. This manifest can be used later to run dbt operations like dbt run or invoke compile/execute commands against arbitrary dbt-sql statements
  • All manifests are persisted on disk and identified by a manifest hash, so re-parses are no-ops and repeated dbt invocations are fast!
  • /run - used to invoke a synchronous dbt run on the server. This is a blocking request, and results from the run are returned to the client at the end of the run. This is a proof-of-concept, and not something we'd want to support in practice (I don't think)
  • /run-async - where things start getting cool! This endpoint kicks off a new background task and returns a pointer to the run to the client
    • The client can then connect over a websocket (at /ws) to stream execution logs from the server
    • Task state is stored in a sqlite database. This is used to support cross-worker coordination (eg. if the server is run with multiple workers) instead of managing shared memory across Python threads/processes (please, please, let's not do this). I used sqlite because it's easy to use in Python, but it's totally possible for this to be redis/postgres instead
  • /compile and /preview - these endpoints are used to compile/execute SQL in the context of an already-parsed manifest. They're pretty fast - I clocked a couple of milliseconds for queries against postgres, vs. thousands of milliseconds on the existing rpc server
    • Note: these endpoints aren't perfect, and we'd probably want to build support for long-polling or generally async query patterns. We could possibly use websockets here... TBD
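
To make that concrete, here is a rough client-side sketch of the flow. The JSON payload shapes and the "state_id"/"task_id" field names are illustrative assumptions, not the actual request schemas in this branch:

import requests

SERVER = "http://127.0.0.1:8580"
STATE_ID = "abc123"  # hypothetical hash of the pushed project code

# Parse the previously pushed project into a manifest (a no-op if this
# state has already been parsed)
requests.post(f"{SERVER}/parse", json={"state_id": STATE_ID}).raise_for_status()

# Kick off a background run; the response points at the new task
task = requests.post(f"{SERVER}/run-async", json={"state_id": STATE_ID}).json()
print(task)  # the task id can then be used to tail logs over the websocket at /ws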

What else did you do?

I played around with building a facade that mimics the existing rpc server. I got as far as supporting the status rpc method, which tested out the idea of proxying the existing interface to a new, better implementation. I think this approach is kind of fraught, but maybe it could be viable with a little bit more care and attention. Either way, I do like the idea of:

  1. Mimicking the existing API with a shim
  2. Creating a migration path to consumption of a new server API

Big takeaways

I think that synchronizing dbt project code state is going to be a big challenge. The most minimum of MVPs (shown here) seems tractable, but I didn't even try to touch diffing files locally, or dealing with parsing non-localities, or anything like that. Here be dragons, I think, and my hope is that the overhead of sending a full serialized representation of a dbt project over the network won't make this a non-starter.

Adding sqlite to the mix felt like a real unlock. Suddenly, a lot of process/memory coordination that Python makes very challenging became easy-peasy. I liked that by storing task state in sqlite, I could pretty readily support websocket-based log tailing. I didn't let myself think about what database migrations would look like in this world... I bet that would be a real challenge... but the idea of managing task/query/project state in files and databases felt VERY compelling in practice. Let's definitely do this.

There are some big wins we can achieve by creating better interfaces into dbt Core. I had to do some gross patching to:

  • Extract logs from running dbt invocations, and
  • Re-use a serialized manifest for dbt runs / query compilation / query execution

I think that by upgrading dbt Core's APIs, we can create more robust patterns for serving up access to dbt via web API.

I didn't touch credential logic at all - this code still just points to a profiles.yml file. This remains a pretty big question mark, but there is only one place where we need to capture those creds... so maybe there's a path to making it better with a little more thought and creativity.

I didn't think too hard about state persistence -- this thing just writes files to a path called working-dir in the root of the repo. We'll need to do something smarter there, but I imagine that there are sensible ways for us to configure this via an env var or similar.
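
For example, something as simple as this would probably do (the DBT_WORKING_DIR variable name is made up):

import os

# Hypothetical: allow the working directory to be overridden by an env var,
# falling back to the current hard-coded path
WORKING_DIR = os.getenv("DBT_WORKING_DIR", "./working-dir")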

Last thoughts

I'll drop some comments inline explaining my thinking, what worked, what didn't, etc. All comments, whether condemnations, eviscerations, or explications, are very welcome! What do you think -- you buy it?

@@ -0,0 +1,36 @@

from sqlalchemy.orm import Session
Author:

this is the primary interface into database operations. Right now, "tasks" (like a dbt run) are stored in a sqlite database. This task state is used to side-step inter-thread/process communication in Python and preserve state across server invocations. Nothing on the server is stored in memory


Nothing on the server is stored in memory

🌈 stateless 🌈
you love to see it

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
Author:

I followed this tutorial. No, I don't know what declarative_base does either :)

https://fastapi.tiangolo.com/tutorial/sql-databases/
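
For anyone else following along, the tutorial's setup boils down to roughly this (declarative_base() just builds the base class that ORM models inherit from so their tables get registered; the Task model here is illustrative, not the actual schema in this branch):

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"

# check_same_thread=False lets the sqlite connection be used across
# FastAPI's request threads
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# declarative_base() returns a base class; subclassing it registers each
# model's table with SQLAlchemy's metadata
Base = declarative_base()

class Task(Base):
    __tablename__ = "tasks"
    id = Column(Integer, primary_key=True, index=True)
    state = Column(String)     # e.g. pending / running / finished
    log_path = Column(String)  # where logs.stdout lives for this task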


from .services import filesystem_service

GLOBAL_LOGGER = logging.getLogger(__name__)
Author:

lots and lots and lots of hacks in here. It was a big struggle to tap into the logger, but ultimately very possible. If we pursue this further, let's expose some more readily consumable APIs in core directly


)

# Big hack?
logs_redirect_handler.formatter = json_formatter
Author:

For some weird reason, this formatter can't be set via the constructor! Strange! We want json logs back because:

  1. they give us info about log level (eg. DEBUG vs INFO)
  2. we can always extract the message parameter to get human-style logs (we do this elsewhere in the server)
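
As a rough stdlib analogue of what this is doing (dbt's real logger plumbing differs, so treat this purely as illustration):

import json
import logging

class JsonFormatter(logging.Formatter):
    # Emit each record as a JSON object so consumers can read the level
    # and the human-readable message separately
    def format(self, record):
        return json.dumps({
            "levelname": record.levelname,
            "message": record.getMessage(),
        })

logs_redirect_handler = logging.FileHandler("logs.stdout")
# Set after construction -- stdlib handlers take a formatter via
# setFormatter() rather than the constructor, too
logs_redirect_handler.formatter = JsonFormatter()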

"""
return record.channel.split(".")[0] == "dbt"

def setup_handlers(self):
Author:

this code was inspired by dbt Cloud - see setup_handlers() in clients/dbt.py for an example

import io
import time

def run_dbt(task_id, args, db):
Author:

lots of helpers in here intended to help with async tasks / websocket logging, things like that. I tried to keep the magic to a minimum overall.

Logs from run_dbt() are written to logs.stdout in a folder determined by the task_id (stored in sqlite). tail_logs_for_path() below reads from this file and yields back log lines as they're emitted. These logs are then pushed back to clients connected via a websocket. Works pretty well in practice!
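
A minimal version of that tailing pattern, assuming a plain newline-delimited log file (the function name mirrors the helper described above, but the body here is just a sketch):

import time

def tail_logs_for_path(log_path):
    # Follow a newline-delimited log file and yield lines as the running
    # dbt task appends them; a real implementation would also stop once
    # the task is marked done in sqlite
    with open(log_path) as fh:
        while True:
            line = fh.readline()
            if line:
                yield line.rstrip("\n")
            else:
                time.sleep(0.1)  # wait for the writer to emit more logs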

Contributor:

in a folder determined by the task_id (stored in sqlite)

yes yes yes

path = filesystem_service.get_root_path(state_id)
reuse = True

# Stupid example of reusing an existing manifest
Author:

this could be WAY smarter, but I wanted to just try this out as a proof of concept. A call to /push is a no-op if state_id already exists on the server filesystem. For this to work, the state_id should be calculated as a hash of all project code. Pretty naive, but neat to see it work in practice
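
For illustration, a client could derive such a state_id with something like this (purely a sketch; this PR doesn't implement the hashing):

import hashlib
from pathlib import Path

def calculate_state_id(project_root):
    # Hash every file in the project tree so identical code always maps to
    # the same state_id and a re-push becomes a no-op on the server
    digest = hashlib.sha256()
    for path in sorted(Path(project_root).rglob("*")):
        if path.is_file():
            digest.update(str(path.relative_to(project_root)).encode())
            digest.update(path.read_bytes())
    return digest.hexdigest()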

path = filesystem_service.get_root_path(args.state_id)
serialize_path = filesystem_service.get_path(args.state_id, 'manifest.msgpack')

# TODO: We... don't use this!
Author:

this comment is out-of-date -- we do use it, but I needed to patch a dbt function or two :)

response_model=schemas.Task,
db: Session = Depends(crud.get_db)
):
return task_service.run_async(background_tasks, db, args)
Author:

tell me this isn't 🔥 - you can't!

websocket: WebSocket,
db: Session = Depends(crud.get_db),
):
await websocket.accept()
Author:

FastAPI makes websockets really easy! It's not clear to me what would be involved in putting this behind something like nginx though -- that part is still an open question in my mind!
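
For anyone who hasn't used it, the FastAPI side is roughly this shape (a sketch only -- the task lookup and log-path layout here are assumptions):

import asyncio
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    task_id = await websocket.receive_text()  # sketch: client says which task to tail
    log_path = f"./working-dir/{task_id}/logs.stdout"  # path layout is an assumption
    with open(log_path) as fh:
        while True:
            line = fh.readline()
            if line:
                await websocket.send_text(line.rstrip("\n"))
            else:
                await asyncio.sleep(0.1)  # wait for the task to emit more logs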

Contributor:

socket to me, socket to me, a little respect

drewbanin closed this Sep 27, 2021
davidharting left a comment

Whoa this PR made me want to write some Python. 🚀



DBT_PROJECT_DIR = os.getenv('DBT_PROJECT_DIR')

class CompatibilityShim(object):


ask "what's the status of the dbt project?"

I wonder if dealing with this statelessly becomes easier if we forget about all the other complexities of rpc server compatibility that you may have had to deal with in this file.

crud.set_task_done(db, db_task)


def run_async(background_tasks, db, args):


This is using FastAPI's first-party background task abstraction?

Author:

yeah!! Works pretty well... open question around if it's viable for this use-case though!


Right on! Until we separate the worker and the web server into separate processes, this seems as good as any other "monolithic" option.
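
For reference, the abstraction in question is just FastAPI's BackgroundTasks dependency, roughly like this (the names here are illustrative):

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def run_dbt_in_background(task_id: str):
    ...  # placeholder for the real run_dbt() work

@app.post("/run-async")
async def run_async_endpoint(background_tasks: BackgroundTasks):
    task_id = "abc123"  # hypothetical: the real code creates a task row in sqlite first
    # FastAPI runs this after the response has been sent, in the same process
    background_tasks.add_task(run_dbt_in_background, task_id)
    return {"task_id": task_id}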

drewbanin deleted the experiment/fast-api branch October 4, 2021 13:45