Dagster: Pensando em Dados como Assets

Dagster: Pensando em Dados como Assets

10 min de leitura

Se você trabalha com engenharia de dados, provavelmente já está cansado de pensar em pipelines como uma sequência de tarefas: Extract → Transform → Load. Mas e se eu te disser que essa não é a melhor forma de modelar seus dados? O Dagster revolucionou a forma como pensamos sobre orquestração de dados com um conceito simples, porém poderoso: Software-Defined Assets.

O Problema com ETL Tradicional

No modelo tradicional de ETL, pensamos em processos:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# Abordagem tradicional - focada em TAREFAS
def extract_data():
    """Extrai dados da API"""
    data = requests.get('https://api.example.com/sales')
    return data.json()

def transform_data(raw_data):
    """Transforma os dados"""
    df = pd.DataFrame(raw_data)
    df['total'] = df['quantity'] * df['price']
    return df

def load_data(df):
    """Carrega no banco de dados"""
    df.to_sql('sales', engine, if_exists='replace')

# Pipeline focado em EXECUTAR tarefas
def etl_pipeline():
    raw = extract_data()
    transformed = transform_data(raw)
    load_data(transformed)

Qual o problema com isso?

  1. Você não sabe ONDE seus dados estão - apenas que executou tarefas
  2. Não há linhagem clara - difícil rastrear de onde vem cada dado
  3. Re-execuções são complicadas - precisa rodar tudo de novo
  4. Testes são difíceis - você testa processos, não dados
  5. Falta visibilidade - você vê que uma tarefa rodou, mas não sabe o estado do dado

A Revolução: Pensando em Assets, Não em Tasks

O Dagster propõe uma mudança fundamental: pense no QUE você quer (os dados) e não no COMO fazer (as tarefas).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import dagster as dg
import pandas as pd
import requests

@dg.asset  # https://docs.dagster.io/concepts/assets/software-defined-assets
def raw_sales_data() -> pd.DataFrame:
    """Dados brutos de vendas vindos da API"""
    response = requests.get('https://api.example.com/sales')
    return pd.DataFrame(response.json())

@dg.asset
def processed_sales(raw_sales_data: pd.DataFrame) -> pd.DataFrame:
    """Vendas processadas com cálculo de total"""
    df = raw_sales_data.copy()
    df['total'] = df['quantity'] * df['price']
    df['processed_at'] = pd.Timestamp.now()
    return df

@dg.asset
def sales_summary(processed_sales: pd.DataFrame) -> pd.DataFrame:
    """Resumo diário de vendas"""
    return processed_sales.groupby(
        pd.Grouper(key='date', freq='D')
    ).agg({
        'total': 'sum',
        'quantity': 'sum',
        'order_id': 'count'
    }).reset_index()

Veja a diferença! Agora:

  • ✅ Cada função define UM DADO (asset)
  • ✅ As dependências são explícitas (através dos parâmetros)
  • ✅ O Dagster constrói o grafo automaticamente
  • ✅ Você pode materializar assets individualmente
  • ✅ A linhagem é clara e rastreável

Como Funciona na Prática

1. Definindo um Asset Simples

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import dagster as dg

@dg.asset
def customers_data(context: dg.AssetExecutionContext):
    """Lista de clientes ativos"""
    context.log.info("Carregando dados de clientes...")
    
    # Simula busca de dados
    customers = [
        {"id": 1, "name": "João Silva", "status": "active"},
        {"id": 2, "name": "Maria Santos", "status": "active"},
        {"id": 3, "name": "Pedro Costa", "status": "inactive"},
    ]
    
    active_customers = [c for c in customers if c['status'] == 'active']
    context.log.info(f"Encontrados {len(active_customers)} clientes ativos")
    
    return active_customers

2. Criando Dependências Entre Assets

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import dagster as dg
import pandas as pd

@dg.asset
def orders_raw() -> pd.DataFrame:
    """Pedidos brutos do banco de dados"""
    return pd.DataFrame({
        'order_id': [1, 2, 3, 4],
        'customer_id': [1, 1, 2, 3],
        'amount': [100.0, 150.0, 200.0, 75.0],
        'date': pd.date_range('2026-04-01', periods=4)
    })

@dg.asset
def customers() -> pd.DataFrame:
    """Dados de clientes"""
    return pd.DataFrame({
        'customer_id': [1, 2, 3],
        'name': ['João Silva', 'Maria Santos', 'Pedro Costa'],
        'segment': ['Premium', 'Standard', 'Premium']
    })

@dg.asset
def enriched_orders(orders_raw: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    """Pedidos enriquecidos com dados de clientes"""
    return orders_raw.merge(
        customers,
        on='customer_id',
        how='left'
    )

@dg.asset
def revenue_by_segment(enriched_orders: pd.DataFrame) -> pd.DataFrame:
    """Receita total por segmento de cliente"""
    return enriched_orders.groupby('segment')['amount'].sum().reset_index()

O Dagster automaticamente entende que:

  • enriched_orders depende de orders_raw E customers
  • revenue_by_segment depende de enriched_orders
  • Se você atualizar customers, enriched_orders e revenue_by_segment ficam desatualizados

dagster-asset-basic-example

3. Multi-Assets: Vários Assets de Uma Única Operação

Às vezes você faz UMA chamada que retorna VÁRIOS dados. Use @multi_asset:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import dagster as dg

@dg.multi_asset(
    specs=[
        dg.AssetSpec("users"),
        dg.AssetSpec("posts"),
        dg.AssetSpec("comments")
    ]
)
def extract_from_api(context: dg.AssetExecutionContext):
    """Busca dados de múltiplas entidades em uma única chamada à API"""
    context.log.info("Fazendo chamada única para API...")
    
    # Simula API que retorna tudo de uma vez
    api_response = {
        'users': [{'id': 1, 'name': 'João'}],
        'posts': [{'id': 1, 'user_id': 1, 'title': 'Hello'}],
        'comments': [{'id': 1, 'post_id': 1, 'text': 'Nice!'}]
    }
    
    # Retorna cada asset
    yield dg.MaterializeResult(
        asset_key="users",
        metadata={"count": len(api_response['users'])}
    )
    yield dg.MaterializeResult(
        asset_key="posts",
        metadata={"count": len(api_response['posts'])}
    )
    yield dg.MaterializeResult(
        asset_key="comments",
        metadata={"count": len(api_response['comments'])}
    )

4. Graph Assets: Múltiplas Operações para Um Asset

Quando você precisa de várias etapas intermediárias mas só quer expor o resultado final (veja mais em Graph-backed assets):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import dagster as dg

@dg.op
def download_csv(context: dg.OpExecutionContext) -> str:
    """Baixa CSV da S3"""
    context.log.info("Baixando arquivo...")
    return "s3://bucket/data.csv"

@dg.op
def parse_csv(context: dg.OpExecutionContext, file_path: str) -> list:
    """Parse do CSV"""
    context.log.info(f"Fazendo parse de {file_path}")
    return [{"col1": "val1"}, {"col1": "val2"}]

@dg.op
def validate_data(context: dg.OpExecutionContext, data: list) -> list:
    """Valida os dados"""
    context.log.info(f"Validando {len(data)} registros")
    # Validações aqui
    return data

@dg.graph_asset
def validated_sales_data():
    """Asset final que encapsula todo o processo"""
    file = download_csv()
    parsed = parse_csv(file)
    return validate_data(parsed)

Isso cria UM asset (validated_sales_data) mas com três operações internas que você pode monitorar separadamente.

dagster-example-graph-asset

Definindo Jobs Modernos com Assets

Agora que você tem assets, como executá-los? Com asset jobs:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import dagster as dg

# Define seus assets
@dg.asset
def bronze_sales():
    return {"status": "raw"}

@dg.asset
def silver_sales(bronze_sales):
    return {"status": "cleaned"}

@dg.asset
def gold_sales(silver_sales):
    return {"status": "aggregated"}

# Cria um job que materializa TODOS os assets
# https://docs.dagster.io/concepts/assets/asset-jobs
all_assets_job = dg.define_asset_job(
    name="materialize_all_sales",
    selection=dg.AssetSelection.all()
)

# Cria um job que materializa APENAS camada gold
gold_only_job = dg.define_asset_job(
    name="materialize_gold",
    selection=dg.AssetSelection.assets(gold_sales)
)

# Job com tags personalizadas
tagged_job = dg.define_asset_job(
    name="daily_sales_job",
    selection=dg.AssetSelection.all(),
    tags={"team": "data-platform", "priority": "high"}
)

# Agrupa tudo em Definitions
defs = dg.Definitions(
    assets=[bronze_sales, silver_sales, gold_sales],
    jobs=[all_assets_job, gold_only_job, tagged_job]
)

Exemplo Real: Pipeline de E-commerce

Vamos criar um pipeline completo de análise de e-commerce:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import dagster as dg
import pandas as pd
from datetime import datetime

# Camada Bronze - Dados Brutos
@dg.asset(group_name="bronze")
def bronze_orders(context: dg.AssetExecutionContext) -> pd.DataFrame:
    """Pedidos brutos da API de e-commerce"""
    context.log.info("Extraindo pedidos...")
    orders = pd.DataFrame({
        'order_id': range(1, 101),
        'customer_id': [i % 20 for i in range(1, 101)],
        'product_id': [i % 10 for i in range(1, 101)],
        'quantity': [1, 2, 1, 3] * 25,
        'price': [99.90, 149.90, 199.90, 299.90] * 25,
        'created_at': pd.date_range('2026-03-01', periods=100, freq='h')
    })
    context.add_output_metadata({"num_orders": len(orders)})
    return orders

@dg.asset(group_name="bronze")
def bronze_products() -> pd.DataFrame:
    """Catálogo de produtos"""
    return pd.DataFrame({
        'product_id': range(10),
        'name': [f'Produto {i}' for i in range(10)],
        'category': ['Eletrônicos', 'Roupas', 'Livros'] * 3 + ['Alimentos']
    })

@dg.asset(group_name="bronze")
def bronze_customers() -> pd.DataFrame:
    """Base de clientes"""
    return pd.DataFrame({
        'customer_id': range(20),
        'name': [f'Cliente {i}' for i in range(20)],
        'city': ['São Paulo', 'Rio de Janeiro', 'Belo Horizonte', 'Brasília'] * 5,
        'segment': ['Premium', 'Standard'] * 10
    })

# Camada Silver - Dados Limpos e Enriquecidos
@dg.asset(group_name="silver")
def silver_orders_enriched(
    bronze_orders: pd.DataFrame,
    bronze_products: pd.DataFrame,
    bronze_customers: pd.DataFrame
) -> pd.DataFrame:
    """Pedidos enriquecidos com dados de produtos e clientes"""
    enriched = bronze_orders.copy()
    
    # Calcula total
    enriched['total'] = enriched['quantity'] * enriched['price']
    
    # Enriquece com produtos
    enriched = enriched.merge(
        bronze_products[['product_id', 'name', 'category']],
        on='product_id',
        how='left'
    )
    
    # Enriquece com clientes
    enriched = enriched.merge(
        bronze_customers[['customer_id', 'city', 'segment']],
        on='customer_id',
        how='left'
    )
    
    return enriched

# Camada Gold - Métricas de Negócio
@dg.asset(group_name="gold")
def gold_daily_revenue(silver_orders_enriched: pd.DataFrame) -> pd.DataFrame:
    """Receita diária agregada"""
    return silver_orders_enriched.groupby(
        silver_orders_enriched['created_at'].dt.date
    ).agg({
        'total': 'sum',
        'order_id': 'count'
    }).rename(columns={
        'order_id': 'num_orders'
    }).reset_index()

@dg.asset(group_name="gold")
def gold_revenue_by_category(silver_orders_enriched: pd.DataFrame) -> pd.DataFrame:
    """Receita por categoria de produto"""
    return silver_orders_enriched.groupby('category')['total'].sum().reset_index()

@dg.asset(group_name="gold")
def gold_revenue_by_city(silver_orders_enriched: pd.DataFrame) -> pd.DataFrame:
    """Receita por cidade"""
    return silver_orders_enriched.groupby('city')['total'].sum().reset_index()

@dg.asset(group_name="gold")
def gold_customer_ltv(silver_orders_enriched: pd.DataFrame) -> pd.DataFrame:
    """Lifetime Value por cliente"""
    return silver_orders_enriched.groupby(['customer_id', 'name', 'segment']).agg({
        'total': 'sum',
        'order_id': 'count'
    }).rename(columns={
        'total': 'lifetime_value',
        'order_id': 'num_orders'
    }).reset_index()

# Configurando Jobs
bronze_job = dg.define_asset_job(
    name="load_bronze_layer",
    selection=dg.AssetSelection.groups("bronze")
)

gold_job = dg.define_asset_job(
    name="compute_gold_metrics",
    selection=dg.AssetSelection.groups("gold")
)

full_pipeline = dg.define_asset_job(
    name="full_ecommerce_pipeline",
    selection=dg.AssetSelection.all()
)

# Definitions
defs = dg.Definitions(
    assets=[
        bronze_orders, bronze_products, bronze_customers,
        silver_orders_enriched,
        gold_daily_revenue, gold_revenue_by_category,
        gold_revenue_by_city, gold_customer_ltv
    ],
    jobs=[bronze_job, gold_job, full_pipeline]
)

Visualizando a Linhagem

O Dagster automaticamente cria um grafo visual mostrando a linhagem completa dos seus dados:

Grafo de linhagem do pipeline de e-commerce

Neste grafo você pode:

  • Ver todas as dependências entre assets
  • Identificar qual camada cada asset pertence
  • Materializar assets individuais ou por grupo
  • Rastrear o impacto de mudanças upstream

Benefícios da Abordagem Asset-First

1. Linhagem Automática

O Dagster cria automaticamente um grafo de dependências mostrando de onde cada dado vem.

2. Materialização Seletiva

Precisa atualizar só um asset? Basta materializá-lo:

1
2
# No UI ou via código
dg.materialize([gold_revenue_by_category])

3. Versionamento de Código

1
2
3
@dg.asset(code_version="v2.0")  # https://docs.dagster.io/concepts/assets/software-defined-assets#asset-code-versions
def my_asset():
    return {"version": 2}

O Dagster rastreia quando seu código muda e quais assets precisam ser re-materializados.

4. Metadata Rica

Adicione metadata aos seus assets:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@dg.asset
def sales_report(context: dg.AssetExecutionContext):
    df = compute_sales()
    
    context.add_output_metadata({
        "num_rows": len(df),
        "revenue_total": df['total'].sum(),
        "avg_order_value": df['total'].mean(),
        "preview": dg.MetadataValue.md(df.head().to_markdown())
    })
    
    return df

5. Testes Mais Fáceis

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def test_revenue_calculation():
    # Cria dados falsos
    fake_orders = pd.DataFrame({
        'quantity': [2, 3],
        'price': [10.0, 20.0]
    })
    
    # Testa a função diretamente
    result = silver_orders_enriched(
        bronze_orders=fake_orders,
        bronze_products=pd.DataFrame(),
        bronze_customers=pd.DataFrame()
    )
    
    assert result['total'].tolist() == [20.0, 60.0]

Comparação: Antes vs Depois

❌ Abordagem Tradicional (Task-Based):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@task
def extract(): ...

@task
def transform(): ...

@task
def load(): ...

# Você executa PROCESSOS
dag = [extract() >> transform() >> load()]

✅ Abordagem Moderna (Asset-Based):

1
2
3
4
5
6
7
8
@dg.asset
def customer_data(): ...

@dg.asset
def enriched_data(customer_data): ...

# Você MATERIALIZA DADOS
# O Dagster descobre a ordem automaticamente

Conclusão

A mudança de paradigma de ETL tradicional para Software-Defined Assets não é apenas uma mudança de sintaxe - é uma mudança fundamental na forma de pensar sobre dados:

  • De: “Eu executo tarefas que movem dados”
  • Para: “Eu defino dados e o Dagster cuida da execução”

Isso traz:

  • Maior clareza: você sabe exatamente quais dados existem
  • Melhor rastreabilidade: linhagem automática
  • Flexibilidade: materialize só o que precisa
  • Testabilidade: funções puras são fáceis de testar
  • Observabilidade: metadata rica e grafo visual

Se você ainda está pensando em pipelines como sequências de tarefas, está na hora de fazer a mudança. O futuro da engenharia de dados é centrado em ativos, não em tarefas.

Para se aprofundar mais, explore a documentação oficial do Dagster e o repositório no GitHub.


Gostou do conteúdo? Compartilhe com outros engenheiros de dados que ainda estão presos no modelo ETL tradicional! 🚀