Goodbye DBT: How to Orchestrate Your Database with Dagster Assets


7 min read

Managing table and view schemas in databases with traceability (version control) and reliability is a problem that many tools have tried to solve over the years. In the world of software development, the most common approach today is to use some kind of ORM (Object-Relational Mapping), which maps data structures in your code (usually classes) to tables in the database.

Some famous examples are: Entity Framework in C#, SQLAlchemy in Python, and Prisma in Node.js.
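To make that mapping concrete, here's a minimal SQLAlchemy sketch (the `Customer` model and the in-memory SQLite database are just for illustration): the class definition is all the ORM needs in order to emit the table's DDL.

```python
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

# A Python class mapped to a database table by the ORM.
class Customer(Base):
    __tablename__ = 'customers'
    customer_id = Column(Integer, primary_key=True)
    first_name = Column(String(40), nullable=False)

# The ORM generates the CREATE TABLE statement from the class.
engine = create_engine('sqlite://')  # in-memory database for illustration
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Customer(customer_id=1, first_name='Ada'))
    session.commit()
    print(session.query(Customer).count())  # 1
```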

However, in the data science world we don’t yet have a well-defined standard for approaching this problem. This is due to several factors, the main one being that a traditional ORM generally doesn’t make much sense here: it can be limiting in some cases, not giving “full freedom” when modifying data structures (views especially) or when applying optimizations (mainly in OLAP databases).

One of the tools that aims to solve this problem and has been gaining a lot of traction recently is our beloved DBT. It is truly an incredible tool with great potential, but as our friend Benn Stancil said in a recent article:

“Orchestration tools like Dagster outperform dbt with development environments explicitly built to create multi-source, multi-destination, polyglot data pipelines. They were able to copy the beloved parts of dbt Core faster than dbt Labs was able to add dynamic programming, robust Python support, and other elements people wanted in their transformation layer.” Benn Stancil, How dbt fails

I must say I agree 100% with everything Benn said, and that’s why I decided to show how I use Dagster in my day-to-day to manage these structures in my databases.

Preparing Our Environment

Creating a Postgres Database

We’ll use neon.tech for hosting a Postgres database. Neon offers an incredible Serverless hosting service with a great free plan.

After creating the database (I won’t go into details as it’s super intuitive and simple), we’ll load a product sales dataset to run our tests. You can find it at this link: relational.fit.cvut.cz/dataset/SalesDB.

Below is the table schema for this dataset.

schema tables

Note: I changed the original dataset field names from CamelCase to snake_case.
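A small hypothetical helper shows what that rename amounts to:

```python
import re

def to_snake_case(name: str) -> str:
    # Insert an underscore before each uppercase letter (except the
    # first character), then lowercase the whole string.
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()

print(to_snake_case('CustomerId'))  # customer_id
print(to_snake_case('FirstName'))   # first_name
```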

Installing and Configuring Dagster

To keep the article concise I’ll only add the main parts of the code here. The complete code is available on my GitHub at this link.

First let’s define our database connection resource. We can define a resource in Dagster as an object that abstracts some external resource, such as a database connection (in this case) or an API.

Here’s the code for our resource:

from sqlalchemy import create_engine
from contextlib import contextmanager
from dagster import resource
import os

@resource
@contextmanager
def postgres_connection(context):
    ''' Yield a Postgres connection inside a transaction. '''
    database_url = os.getenv('DATABASE_URL')
    assert database_url is not None, 'DATABASE_URL environment variable not set.'
    engine = create_engine(database_url)
    with engine.begin() as connection:
        yield connection

Now let’s define our repository. A repository is a collection of assets, jobs, schedules, and sensors, and it’s a convenient way to organize your different jobs and other definitions.

We’ll also create an asset reconciliation sensor, which is what will work the magic of automatically propagating our schema updates to dependent assets.

from dagster import repository, build_asset_reconciliation_sensor, AssetSelection
from repositories import *

assets_reconciliation_sensor = build_asset_reconciliation_sensor(
    asset_selection=AssetSelection.all(),
    name='assets_reconciliation_sensor'
)

@repository
def postgres():
    '''  This repository is responsible for all jobs and assets related to Postgres. '''
    return [
        postgres_assets,
        assets_reconciliation_sensor
    ]

The `from repositories import *` line imports our assets, and the `postgres_assets` entry in the returned list registers them in the repository; we’ll define those assets below.

Creating Our First Asset in Dagster

First, let’s create the assets containing the DDL for creating our tables. We can do this as follows:

from dagster import asset, Output, MetadataValue
import pathlib

@asset(
    required_resource_keys={'postgres'},
    name='customers',
    compute_kind='postgres',
)
def customers_asset(context):
    ''' Asset for table "customers" in the Postgres Database. '''
    with open(f'{pathlib.Path(__file__).parent}/sql/ddl.sql', 'r') as file:
        ddl = file.read()
    context.log.info('Creating table "customers".')
    context.resources.postgres.execute(ddl)
    return Output(
        None,
        metadata={
            'DDL': MetadataValue.md(f'```\n{ddl}\n```')
        }
    )

The content of the “sql/ddl.sql” file is:

CREATE TABLE IF NOT EXISTS public.customers (
 customer_id int8 PRIMARY KEY,
 first_name varchar(40) NOT NULL,
 middle_initial varchar(40) NULL,
 last_name varchar(40) NOT NULL
)

After repeating the process for the other tables (you can find the complete code on my GitHub), let’s now assign the necessary resources we defined when creating our assets — in this case our Postgres connection. We can do this as follows:

from resources import postgres
from dagster import with_resources
from .customers import customers_asset
from .employees import employees_asset
from .products  import products_asset
from .sales     import sales_asset

postgres_assets = with_resources(
    [
        customers_asset,
        employees_asset,
        products_asset,
        sales_asset
    ],
    resource_defs={
        'postgres': postgres.postgres_connection,
    },
)

So far, we have the following assets in Dagster:

image

But if we tried to create our assets now, we’d have to manually create the tables in the correct order to avoid errors due to foreign keys. Let’s fix this by defining the dependencies on tables that have foreign keys — in this case the “sales” table — by adding the following code snippet to the asset definition:

...
non_argument_deps={
    'products',
    'customers',
    'employees',
}
...

Now we can materialize (run) our assets without worrying about dependencies between tables.

image

Below is a GIF of the asset materialization process so far:

image

After that, I loaded the data into the tables manually, but nothing would stop us from creating a Dagster job to do it automatically.
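As a rough sketch of what that load step could look like (the helper and sample data here are hypothetical, and an in-memory SQLite database stands in for the Neon `DATABASE_URL`), pandas’ `to_sql` could be wrapped in a Dagster asset that depends on the table assets:

```python
import pandas as pd
from sqlalchemy import create_engine

def load_table(engine, table_name: str, df: pd.DataFrame) -> int:
    # 'append' keeps the schema created by our DDL assets intact
    df.to_sql(table_name, engine, if_exists='append', index=False)
    return len(df)

engine = create_engine('sqlite://')  # stand-in for the real Postgres engine
sample = pd.DataFrame({
    'customer_id': [1, 2],
    'first_name':  ['Ada', 'Grace'],
    'last_name':   ['Lovelace', 'Hopper'],
})
print(load_table(engine, 'customers', sample))  # 2
```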

Creating Assets for Views

Our tables were modeled using a Star Schema, where we have the dimension tables “employees”, “products”, and “customers”, and the fact table “sales”. However, let’s now create a view where we can get a denormalized view of the data. Here’s the query we’ll use to create that view.

CREATE OR REPLACE VIEW view_sales_full AS
SELECT
    sales.sales_id,
    employees.employee_id,
    concat(employees.first_name, ' ', employees.middle_initial, '. ', employees.last_name) AS employee_full_name,
    customers.customer_id,
    concat(customers.first_name, ' ', customers.middle_initial, '. ', customers.last_name) AS customer_full_name,
    products.product_id,
    products."name" AS product_name,
    products.price  AS product_price,
    sales.quantity  AS product_quantity
FROM
    sales INNER JOIN employees ON
        sales.salesperson_id = employees.employee_id
    INNER JOIN products ON
        sales.product_id = products.product_id
    INNER JOIN customers ON
        sales.customer_id = customers.customer_id

The query above returns:

image

Now let’s define the asset to manage this view:

from dagster import asset, Output, MetadataValue
import pathlib

@asset(
    required_resource_keys={'postgres'},
    name='view_sales_full',
    compute_kind='postgres',
    non_argument_deps={
        'products',
        'customers',
        'employees',
        'sales',
    }
)
def view_sales_full_asset(context):
    ''' Asset for view "view_sales_full" in the Postgres Database. '''
    with open(f'{pathlib.Path(__file__).parent}/sql/ddl.sql', 'r') as file:
        ddl = file.read()
    context.log.info('Dropping view "view_sales_full".')
    context.resources.postgres.execute('DROP VIEW IF EXISTS view_sales_full CASCADE')
    context.log.info('Creating view "view_sales_full".')
    context.resources.postgres.execute(ddl)
    return Output(
        None,
        metadata={
            'DDL': MetadataValue.md(f'```\n{ddl}\n```')
        }
    )

After defining this, here is our complete flow so far:

image

I believe you’ve understood the concept: there is no limit to how many views and dependent tables we can create from here.

Conclusion

Dagster is truly an incredible orchestration tool that brings sensational concepts like assets, and it goes far beyond what was shown here: partitioning, internal DAGs for more complex assets, freshness policies, and much more. For this and other reasons, I always choose it over more rigid tools such as Airflow.

Dagster vs. DBT?

It’s worth remembering that despite the sensationalist title, you don’t have to choose between one tool or the other. One of Dagster’s great advantages is that its assets can literally be anything, including DBT models. Dagster has been widely used by the community as an orchestrator for DBT.

You can find more details in this article published by Dagster itself.

img

But tell me your opinion: if you had to choose between DBT and Dagster, which would you choose — or would you use both together? Leave your thoughts in the comments below!