4 YAML Files Instead of PySpark: How We Let Analysts Build Data Pipelines Without Developers

It used to take us three weeks to ship a single data pipeline. Today, an analyst with zero Python knowledge does it in a day. Here's how we got there.

I'm Kiril Kazlou, a data engineer at Mindbox. Our team is constantly recalculating business metrics for clients, which means we're constantly building data marts for analytics, pulling from many different sources.

For a long time, we relied on PySpark for all our data processing. The problem? You can't really work with PySpark without Python experience. Every new pipeline required an engineer. And that meant waiting – sometimes for weeks.

In this post, I'll show you how we're building an internal data platform where an analyst or product manager can put together a regularly updated pipeline by writing just four YAML files.

Why PySpark Was Slowing Us Down

Let me illustrate the pain with a textbook example — calculating MAU (Monthly Active Users).

On the surface, this sounds like a simple SQL query: COUNT(DISTINCT customerId) across several tables for a given time window. But because of all the infrastructure overhead – PySpark, Airflow DAG setup, Spark resource allocation, testing – we had to hand it to developers. The result? A full week just to ship an MAU counter.
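
To make the "simple SQL query" concrete, here is a minimal sketch of the MAU idea using Python's built-in sqlite3 module. The two tables, their columns, and the sample rows are hypothetical stand-ins, not our production schema, and SQLite stands in for the real engine.

```python
import sqlite3

# Hypothetical mini-dataset: two event tables with (customer_id, event_date).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE visits (customer_id INTEGER, event_date TEXT);
    CREATE TABLE orders (customer_id INTEGER, event_date TEXT);
    INSERT INTO visits VALUES (1, '2024-05-03'), (2, '2024-05-10'), (1, '2024-05-20');
    INSERT INTO orders VALUES (2, '2024-05-11'), (3, '2024-05-15'), (4, '2024-04-30');
""")


def monthly_active_users(conn, start, end):
    """Count distinct customers seen in any event table within [start, end)."""
    query = """
        SELECT COUNT(DISTINCT customer_id)
        FROM (
            SELECT customer_id, event_date FROM visits
            UNION ALL
            SELECT customer_id, event_date FROM orders
        )
        WHERE event_date >= ? AND event_date < ?
    """
    return conn.execute(query, (start, end)).fetchone()[0]


mau_may = monthly_active_users(conn, "2024-05-01", "2024-06-01")
print(mau_may)
```

The logic itself fits in a dozen lines of SQL, which is why a week-long delivery cycle felt so out of proportion.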

Every new metric took one to three weeks to deliver. And every time, the process looked the same:

  1. The analyst defined the business requirements, found an available developer, and handed over the context.
  2. The developer clarified the details, wrote the PySpark code, went through code review, prepared the DAG, and deployed.

What we really wanted was for analysts and product managers – the people who understand the business logic best and are comfortable with SQL and YAML – to handle this themselves. No Python. No PySpark.

How pipelines are built with PySpark

What Replaced PySpark: YAML and SQL Are All You Need

To take a declarative approach, we split our data layer into three parts and chose the appropriate tool for each:

  • dlt (data load tool) – ingests data from external APIs and databases into our storage. It is configured entirely through a YAML file. No code is required.
  • dbt (data build tool) on Trino – transforms data using pure SQL. It connects models via ref(), automatically builds a dependency graph, and handles incremental updates.
  • Airflow + Cosmos – orchestrates the pipelines. The Airflow DAG is generated automatically from dag.yaml and the dbt project.
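
To illustrate how ref() calls can turn into a dependency graph, here is a rough sketch in plain Python. It is not how dbt is implemented internally: the regex, the model bodies, and the `build_order` helper are assumptions made purely for illustration.

```python
import re
from graphlib import TopologicalSorter

# Hypothetical models, loosely named after the ones in this post.
MODELS = {
    "int_merged_customers":
        "SELECT * FROM {{ source('final', 'cdp_mergedcustomers_v2') }}",
    "int_mau_events_visits":
        "SELECT * FROM {{ source('final', 'customerstracking_visits') }} "
        "LEFT JOIN {{ ref('int_merged_customers') }} USING (id)",
    "int_mau_events":
        "SELECT * FROM {{ ref('int_mau_events_visits') }}",
    "mau_period_datamart":
        "SELECT * FROM {{ ref('int_mau_events') }}",
}

# Matches {{ ref('model_name') }} and captures the model name.
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")


def build_order(models):
    """Return model names in an order where every ref() target runs first."""
    graph = {name: set(REF_PATTERN.findall(sql)) for name, sql in models.items()}
    return list(TopologicalSorter(graph).static_order())


run_order = build_order(MODELS)
print(run_order)
```

The point is that the analyst never declares the order explicitly. It falls out of which models reference which.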

We were already using Trino as a query engine for ad-hoc queries and had connected it to Superset for BI. It had proven itself: for typical queries, it processed large datasets faster and with fewer resources than Spark. In addition, Trino natively supports federated access to multiple data stores from a single SQL query. For about 90% of our pipelines, Trino was a perfect fit.

Workflow diagram of the new pipeline: the analyst writes the YAML configuration and SQL models directly. dbt, Trino, and Airflow handle the automation. No developer involvement is required. The whole process takes one day.
After: analyst-managed pipelines with dbt + Trino

How We Load Data: dlt.yaml

The first YAML file defines where and how to load data for downstream processing. Here's a real-world example – loading billing data from an internal API:

product: sg-team
feature: billing
schema: billing_tarification

dag:
  dag_id: dlt_billing_tarification
  schedule: "0 4 * * *"
  description: "Daily refresh of tarification data"
  tags:
    - billing

alerts:
  enabled: true
  severity: warning

source:
  type: rest_api
  client:
    base_url: "
    auth:
      type: bearer
      token: dlt-billing.token
  resources:
    - name: tarification_data
      endpoint:
        path: /tarificationData
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
          pricingPlanLine: CurrentPlan
      write_disposition: replace
      processing_steps:
        - map: dlt_custom.billing_tarification_data.map

    - name: charges_raw
      columns:
        staffUserName:
          data_type: text
          nullable: true
      endpoint:
        path: /data-feed/charges
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace

    - name: discounts_raw
      endpoint:
        path: /data-feed/discounts
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace

This configuration defines three resources from a single API. For each one, we specify the endpoint, the request parameters, and the write strategy – for us, replace means “overwrite everything on each run.” You can also add processing steps, define column types, and configure alerts.

The whole config is a few dozen lines of YAML. Without dlt, each connector would be a Python script handling requests, pagination, retries, serialization to Delta Table format, and uploading to storage.
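
The `{{ previous_month_date }}` placeholder in the request bodies is resolved by the platform at run time. The sketch below shows one way such a placeholder could be rendered. The date format (first day of the previous month, ISO style) and both helper functions are assumptions for illustration, not the platform's actual code.

```python
from datetime import date


def previous_month_date(today: date) -> str:
    """First day of the month before `today` as YYYY-MM-DD (format is an assumption)."""
    if today.month == 1:
        year, month = today.year - 1, 12
    else:
        year, month = today.year, today.month - 1
    return date(year, month, 1).isoformat()


def render_body(template: dict, today: date) -> dict:
    """Replace the {{ previous_month_date }} placeholder in a request body."""
    value = previous_month_date(today)
    return {
        key: val.replace("{{ previous_month_date }}", value) if isinstance(val, str) else val
        for key, val in template.items()
    }


body_template = {
    "firstPeriod": "{{ previous_month_date }}",
    "lastPeriod": "{{ previous_month_date }}",
    "pricingPlanLine": "CurrentPlan",
}
rendered = render_body(body_template, date(2024, 1, 15))
print(rendered)
```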

How We Transform Data with SQL: dbt_project.yaml and sources.yaml

The next step is to write the dbt models. On Trino, those are plain SQL queries.

Here's how we set up the MAU calculation. This is what preparing events from a single source looks like:

-- int_mau_events_visits.sql (simplified)
{{ config(materialized='table') }}

WITH period AS (
    -- Rolling window: last 5 months to current
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '5' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '5' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),

events AS (
    -- Pull visit events within the period window
    SELECT src._tenant, src.unmergedCustomerId,
           'visits' AS src_type, src.endpoint
    FROM {{ source('final', 'customerstracking_visits') }} src
    CROSS JOIN period p
    WHERE src.unmergedCustomerId IS NOT NULL
      AND /* ...timestamp filtering by year/month bounds... */
),

events_with_customer AS (
    -- Resolve merged customer IDs
    SELECT e._tenant,
           COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId,
           e.src_type, e.endpoint
    FROM events e
    LEFT JOIN {{ ref('int_merged_customers') }} mc
      ON e._tenant = mc._tenant
      AND e.unmergedCustomerId = mc.unmergedCustomerId
)

-- Keep only actual (non-deleted) customers
SELECT ewc._tenant, ewc.customerId, ewc.src_type, ewc.endpoint
FROM events_with_customer ewc
WHERE EXISTS (
    SELECT 1 FROM {{ ref('int_actual_customers') }} ac
    WHERE ewc._tenant = ac._tenant
      AND ewc.customerId = ac.customerId
)

All 10 event sources follow the exact same pattern. The only differences are the source table and the filters. The models are then unioned into a single stream:

-- int_mau_events.sql (union of all sources)
SELECT * FROM {{ ref('int_mau_events_inapps_targetings') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_inapps_clicks') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_visits') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_orders') }}
-- ...plus 6 more sources

And finally, the data mart where everything comes together:

-- mau_period_datamart.sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=['_tenant', 'start_year', 'start_month', 'end_year', 'end_month']
) }}

{%- set months_back = var('months_back', 5) | int -%}

WITH period AS (
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),
events_resolved AS (
    SELECT * FROM {{ ref('int_mau_events') }}
),
metrics_by_tenant AS (
    SELECT
        er._tenant,
        COUNT(DISTINCT CASE WHEN src_type = 'visits'
              THEN customerId END) AS CustomersTracking_Visits,
        COUNT(DISTINCT CASE WHEN src_type = 'orders'
              THEN customerId END) AS ProcessingOrders_Orders,
        COUNT(DISTINCT CASE WHEN src_type = 'mailings'
              THEN customerId END) AS Mailings_MessageStatuses,
        -- ...other metrics
        COUNT(DISTINCT customerId) AS MAU
    FROM events_resolved er
    GROUP BY er._tenant
)
SELECT m.*, p.start_year, p.start_month, p.end_year, p.end_month
FROM metrics_by_tenant m
CROSS JOIN period p

For the data mart, we use incremental_strategy='merge'. dbt automatically generates a MERGE statement keyed on unique_key, so there is no need to implement incremental loading by hand.
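
As a rough mental model of what a merge keyed on unique_key does, here is a small pure-Python sketch. It only mimics the update-or-insert semantics on in-memory rows. The `merge_rows` helper and the sample values are hypothetical and have nothing to do with how dbt or Trino execute the statement.

```python
def merge_rows(target, new_rows, unique_key):
    """Upsert new_rows into target: update on key match, insert otherwise."""
    index = {tuple(row[k] for k in unique_key): i for i, row in enumerate(target)}
    result = [dict(row) for row in target]  # copy so the input stays untouched
    for row in new_rows:
        key = tuple(row[k] for k in unique_key)
        if key in index:
            result[index[key]] = dict(row)   # matched: overwrite the existing row
        else:
            index[key] = len(result)         # not matched: append a new row
            result.append(dict(row))
    return result


UNIQUE_KEY = ["_tenant", "start_year", "start_month", "end_year", "end_month"]

existing = [
    {"_tenant": "acme", "start_year": 2024, "start_month": 1,
     "end_year": 2024, "end_month": 6, "MAU": 100},
]
fresh = [
    # Same key as the existing row, so it is updated in place.
    {"_tenant": "acme", "start_year": 2024, "start_month": 1,
     "end_year": 2024, "end_month": 6, "MAU": 120},
    # New period, so it is inserted.
    {"_tenant": "acme", "start_year": 2024, "start_month": 2,
     "end_year": 2024, "end_month": 7, "MAU": 90},
]

merged = merge_rows(existing, fresh, UNIQUE_KEY)
print(merged)
```

Rerunning the same period therefore refreshes the row instead of duplicating it, which is the behavior we want from a daily data mart job.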

To tie the models into a single project, we add dbt_project.yaml:

name: mau_period
version: '1.0.0'

models:
  mau_period:
    +on_table_exists: replace
    +on_schema_change: append_new_columns

And sources.yaml, which defines the input tables:

sources:
  - name: final
    database: data_platform
    schema: final
    tables:
      - name: inapps_targetings_v2
      - name: inapps_clicks_v2
      - name: customerstracking_visits
      - name: processingorders_orders
      - name: cdp_mergedcustomers_v2
      # ...

The result is the same business logic we had in PySpark, but in pure SQL: sources.yaml replaces typedspark schemas, {{ ref() }} and {{ source() }} replace .get_table(), and automatic execution ordering via the dependency graph replaces Spark resource tuning.

How We Configure Airflow: dag.yaml

The fourth configuration file defines when and how Airflow runs the pipeline:

product: sg-team
feature: billing
schema: mau
schedule: "15 21 * * *"  # every day at 00:15 MSK

params:
  - name: start_date
    description: "Start date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: end_date
    description: "End date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: months_back
    description: "Months to look back (default: 5)"
    default: 5

alerts:
  enabled: true
  severity: warning
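
A quick sanity check on the schedule comment: assuming the cron expression is evaluated in UTC (Airflow's default unless a timezone is configured), 21:15 UTC does land at 00:15 Moscow time. The helper below is a hypothetical illustration that only handles the minute and hour fields.

```python
from datetime import datetime, timedelta, timezone

# Moscow time is UTC+3 with no daylight saving time.
MSK = timezone(timedelta(hours=3), name="MSK")


def cron_utc_to_msk(cron: str) -> str:
    """Convert the minute/hour fields of a daily cron expression from UTC to MSK."""
    minute, hour, *_ = cron.split()
    utc_time = datetime(2024, 1, 1, int(hour), int(minute), tzinfo=timezone.utc)
    return utc_time.astimezone(MSK).strftime("%H:%M")


local_time = cron_utc_to_msk("15 21 * * *")
print(local_time)
```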

Our Python script then reads dag.yaml and dbt_project.yaml and uses the Cosmos library to generate a fully functional Airflow DAG. This is the only Python code in the whole setup. It is written once and works for every dbt project. Here is the key part:

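from pathlib import Path

import yaml
from airflow.models.param import Param
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.profiles import TrinoLDAPProfileMapping


def _build_dbt_project_dags(project_path: Path, environ: dict) -> list[DbtDag]:
    # Excerpt: dag_config_path, project_name, profile_database, and profile_schema
    # are set up in code omitted here; DagConfig is our pydantic model for dag.yaml.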
    config_dict = yaml.safe_load(dag_config_path.read_text())
    config = DagConfig.model_validate(config_dict)

    # YAML params → Airflow Params
    params = {}
    operator_vars = {}
    for param in config.params:
        params[param.name] = Param(
            default=param.default if param.default is not None else "",
            description=param.description,
        )
        operator_vars[param.name] = f"{{{{ params.{param.name} }}}}"

    # Cosmos creates the DAG from the dbt project
    with DbtDag(
        dag_id=f"dbt_{project_path.name}",
        schedule=config.schedule,
        params=params,
        project_config=ProjectConfig(dbt_project_path=project_path),
        profile_config=ProfileConfig(
            profile_name="default",
            target_name=project_name,
            profile_mapping=TrinoLDAPProfileMapping(
                conn_id="trino_default",
                profile_args={
                    "database": profile_database,
                    "schema": profile_schema,
                },
            ),
        ),
        operator_args={"vars": operator_vars},
    ) as dag:
        # Create schema before running models
        create_schema = SQLExecuteQueryOperator(
            task_id="create_schema",
            conn_id="trino_default",
            sql=f"CREATE SCHEMA IF NOT EXISTS {profile_database}.{profile_schema} ...",
        )
        # Attach to root tasks
        for unique_id, _ in dag.dbt_graph.filtered_nodes.items():
            task = dag.tasks_map[unique_id]
            if not task.upstream_task_ids:
                create_schema >> task

Cosmos reads manifest.json from the dbt project, parses the model dependency graph, and creates a separate Airflow task for each model. Task dependencies are derived automatically from the ref() calls in the SQL.
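
One detail in the DAG builder that is easy to misread is the quadruple braces in the f-string. The sketch below isolates that step: each parameter from dag.yaml becomes a Jinja expression that Airflow renders at run time and passes to dbt as a var. The `build_operator_vars` helper and the plain-dict input are simplifications for illustration, not the real DagConfig model.

```python
def build_operator_vars(params):
    """Map each dag.yaml param to a Jinja expression Airflow renders at run time."""
    # In an f-string, "{{{{" produces a literal "{{" and "}}}}" a literal "}}".
    return {p["name"]: f"{{{{ params.{p['name']} }}}}" for p in params}


# The params section of dag.yaml, as it might look after parsing.
dag_params = [
    {"name": "start_date", "default": ""},
    {"name": "end_date", "default": ""},
    {"name": "months_back", "default": 5},
]

operator_vars = build_operator_vars(dag_params)
print(operator_vars)
```

When someone triggers the DAG manually with months_back set to 12, the rendered value is what var('months_back', 5) picks up in the data mart model.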

How Analysts Build Pipelines Without Engineers

Now, when an analyst needs a new pipeline, they can assemble it in a few steps:

Step 1. Create a folder in the repo: dbt-projects/my_new_pipeline/.

Step 2. If external data needs to be ingested, write a YAML config for dlt.

Step 3. Write the SQL models in the models/ folder and define the sources in sources.yaml.

Step 4. Create dbt_project.yaml and dag.yaml.

Step 5. Push to Git, go through review, and merge.
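
The checklist above can be sketched as a small pre-push check. This is a hypothetical helper, not part of our platform: the exact location of sources.yaml inside the project and the `missing_pieces` function are assumptions.

```python
import tempfile
from pathlib import Path

REQUIRED_FILES = ("dbt_project.yaml", "dag.yaml")


def missing_pieces(project_dir: Path) -> list[str]:
    """List what a pipeline folder still lacks before it can be pushed."""
    missing = [name for name in REQUIRED_FILES if not (project_dir / name).is_file()]
    models_dir = project_dir / "models"
    if not (models_dir.is_dir() and any(models_dir.glob("*.sql"))):
        missing.append("models/*.sql")
    # Assumption: sources.yaml may live anywhere inside the project folder.
    if not list(project_dir.rglob("sources.yaml")):
        missing.append("sources.yaml")
    return missing


# Demo on a throwaway folder with only one model and dag.yaml in place.
with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp) / "dbt-projects" / "my_new_pipeline"
    (project / "models").mkdir(parents=True)
    (project / "models" / "my_model.sql").write_text("SELECT 1")
    (project / "dag.yaml").write_text("schedule: '0 4 * * *'\n")
    report = missing_pieces(project)

print(report)
```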

CI/CD builds the dbt project and uploads the artifacts to S3. Airflow reads the DAG files from there, and Cosmos parses the dbt project and generates the task graph. On schedule, dbt runs the models on Trino in the correct order. The result is an up-to-date data mart in storage, accessible through Superset.

What Changed After the Migration

A before-and-after comparison showing pipeline delivery time drops from one to three weeks under PySpark to one day with a YAML-based stack, with pipeline ownership moving from developers to analysts.
Changed: from weeks to one day, from engineers to analysts

For analysts to build pipelines themselves, they need to understand the ref() and source() concepts, the difference between table and incremental materialization, and Git basics. We ran several in-house workshops and put together step-by-step guides for each type of task.

Why the New Stack Doesn't Completely Replace PySpark

For about 10% of our pipelines, PySpark is still the only option – when the transformation doesn't fit into SQL. dbt supports Jinja macros, but that's not a full-blown Python replacement. And it would be dishonest to gloss over the limitations of the new tools.

dlt + Delta: experimental merge support. We use the Delta format in our storage layer. dlt's Delta connector is marked as experimental, so the merge strategy didn't work out of the box. We had to find workarounds: in some cases we used replace instead of merge (sacrificing incrementality), and in others we wrote custom processing_steps.

Trino's limited fault tolerance. Trino has a fault tolerance mechanism, but it works by writing intermediate results to S3. With our terabyte-scale data volumes, this isn't feasible: the sheer number of S3 operations makes it too expensive. Without fault tolerance enabled, if a Trino worker goes down, the entire query fails. Spark, in contrast, just reruns the failed task. We mitigated this with DAG-level retries and by decomposing heavy models into chains of intermediate models.
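
The retry behavior can be sketched in a few lines of plain Python. In Airflow itself this is configured through the standard `retries` and `retry_delay` task arguments. The `run_with_retries` helper and the `FlakyQuery` class below are hypothetical stand-ins that only show why smaller models make retries cheaper: every retry starts the whole query from scratch.

```python
def run_with_retries(run_query, retries):
    """Rerun the whole query from scratch on failure, up to `retries` extra attempts."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return run_query(), attempts
        except RuntimeError:
            if attempts > retries:
                raise  # out of retries: surface the failure


class FlakyQuery:
    """Hypothetical stand-in for a Trino query that fails when a worker dies."""

    def __init__(self, failures):
        self.failures = failures

    def __call__(self):
        if self.failures > 0:
            self.failures -= 1
            raise RuntimeError("worker lost, query failed")
        return "ok"


result, attempts = run_with_retries(FlakyQuery(failures=2), retries=3)
print(result, attempts)
```

Because nothing is checkpointed, a retry of one huge model repeats all of its work, while a retry inside a chain of intermediate models repeats only the step that failed.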

UDFs and custom logic. In Spark, you can write custom logic in Python right inside the pipeline, and it's very easy. With the new stack, this is much harder. dbt on Trino doesn't help: Jinja only generates SQL, and dbt's Python models only work with Snowflake, Databricks, and BigQuery. You can write UDFs in Trino, but only in Java – with all the overhead that entails: a separate repo, a build pipeline, and deploying JARs to all workers. So if a transformation doesn't fit into SQL, you end up with either an unmaintainable SQL monster or a standalone script that breaks the lineage.

What's Next: Tests, Model Templates, and Training

Better testing. We had a rigorous testing pipeline for PySpark, but for the new stack it is still a work in progress. Recent versions of dbt have introduced unit tests: you can now validate a model's SQL logic against mock data without running the full pipeline. We want to add dbt tests both at the model level and as a separate monitoring layer.
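
To show the idea of checking SQL logic against mock data, here is a sketch that uses Python's sqlite3 module instead of dbt's own unit test syntax. It exercises the merged-customer resolution from the MAU model on three fake events. The table layout is simplified and the `resolve_customers` helper is hypothetical.

```python
import sqlite3

# Same COALESCE + LEFT JOIN logic as in the events_with_customer step.
RESOLVE_SQL = """
    SELECT COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId
    FROM events e
    LEFT JOIN merged_customers mc
      ON e._tenant = mc._tenant
     AND e.unmergedCustomerId = mc.unmergedCustomerId
    ORDER BY customerId
"""


def resolve_customers(events, merged):
    """Load mock rows into an in-memory database and run the resolution query."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (_tenant TEXT, unmergedCustomerId INTEGER)")
    conn.execute(
        "CREATE TABLE merged_customers "
        "(_tenant TEXT, unmergedCustomerId INTEGER, mergedCustomerId INTEGER)"
    )
    conn.executemany("INSERT INTO events VALUES (?, ?)", events)
    conn.executemany("INSERT INTO merged_customers VALUES (?, ?, ?)", merged)
    return [row[0] for row in conn.execute(RESOLVE_SQL)]


resolved = resolve_customers(
    events=[("acme", 1), ("acme", 2), ("acme", 3)],
    merged=[("acme", 2, 1)],  # customer 2 was merged into customer 1
)
print(resolved)
```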

Reusable templates for common patterns. Many of our dbt models look alike. A single project can contain a dozen models that follow the same pattern, where only the source table and filters differ. We plan to extract the shared logic into dbt macros.
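
The templating idea can be sketched with Python's string.Template. This is not a dbt macro (those are written in Jinja); it just shows how one shared pattern plus a list of sources could produce the near-identical models. The template body is a trimmed-down, hypothetical version of the real model.

```python
from string import Template

# Shared pattern: only the source table and the src_type label vary.
MODEL_TEMPLATE = Template(
    "SELECT src._tenant, src.unmergedCustomerId, '$src_type' AS src_type\n"
    "FROM {{ source('final', '$table') }} src\n"
    "WHERE src.unmergedCustomerId IS NOT NULL"
)

# src_type -> source table, using table names from sources.yaml.
SOURCES = {
    "visits": "customerstracking_visits",
    "orders": "processingorders_orders",
}


def render_models(sources):
    """Render one model file per source from the shared template."""
    return {
        f"int_mau_events_{src_type}.sql": MODEL_TEMPLATE.substitute(
            src_type=src_type, table=table
        )
        for src_type, table in sources.items()
    }


models = render_models(SOURCES)
print(models["int_mau_events_visits.sql"])
```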

Expanding the platform's user base. We want more developers and analysts to work with data independently. We're putting together regular internal training sessions, documentation, and onboarding guides so that new users can get up to speed quickly and start building their own models.

If your team is stuck in the same “analysts wait for developers” cycle, I'd love to hear how you solve it. Connect with me on LinkedIn and let's compare notes.


All images in this article belong to the author unless otherwise indicated.
