2.2.1 - Create an ETL Pipeline with Postgres in Kestra

Last updated Jan 28, 2025


Youtube Video | ~28 min

I recommend going through all 3 videos in one sitting

To get pgAdmin to connect, I used a docker-compose.yml that Bruno posted in Slack, and then I needed to modify one line from the video code...

Build postgres_taxi.yaml Flow

First, we need to connect Kestra, pgAdmin, and Postgres using a docker-compose.yml:

  1. Open Docker Desktop

  2. Add this docker-compose.yml from Bruno to your module-2 directory (a minimal sketch of such a file is shown after this list): https://github.com/Tinker0425/de-zoomcamp-my-work/blob/master/module-02/video_3/docker-compose.yml

  3. In a terminal, head to the directory containing your docker-compose.yml and run docker compose up. Note that the first compose may take a bit of time. Check that Kestra (port 8080) and pgAdmin (port 9000) are running.
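
For reference, here is a minimal sketch of what such a docker-compose.yml can look like. This is not Bruno's exact file (use the link above for that) - the kestra-metadata service name and the database credentials are assumptions chosen to match the pluginDefaults used later in this flow, and the pgAdmin login is made up:

services:
  kestra-metadata:
    image: postgres:16
    environment:
      POSTGRES_DB: kestra            # holds both Kestra's metadata and our taxi tables
      POSTGRES_USER: kestra
      POSTGRES_PASSWORD: k3str4
    volumes:
      - postgres-data:/var/lib/postgresql/data

  kestra:
    image: kestra/kestra:latest
    command: server standalone
    ports:
      - "8080:8080"                  # Kestra UI at localhost:8080
    environment:
      KESTRA_CONFIGURATION: |
        datasources:
          postgres:
            url: jdbc:postgresql://kestra-metadata:5432/kestra
            driverClassName: org.postgresql.Driver
            username: kestra
            password: k3str4
        kestra:
          repository:
            type: postgres
          queue:
            type: postgres
    depends_on:
      - kestra-metadata

  pgadmin:
    image: dpage/pgadmin4
    ports:
      - "9000:80"                    # pgAdmin UI at localhost:9000
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@admin.com      # made-up login for the pgAdmin UI
      PGADMIN_DEFAULT_PASSWORD: admin
    depends_on:
      - kestra-metadata

volumes:
  postgres-data: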

Build postgres_taxi.yaml

Final YAMLs:

  • https://github.com/Tinker0425/de-zoomcamp-my-work/blob/master/module-02/video_3/02_postgres_taxi.yaml
  • https://github.com/Tinker0425/de-zoomcamp-my-work/blob/master/module-02/video_3/02_postgres_green_taxi.yaml

1 - Goal: View an Empty Table in pgAdmin

To get the editor window, click 'Flows' on the left-hand side -> the 'Editor' tab -> 'Source and topology' as your file view option. Similar to the setup in the video, ours should look like:

Get used to using Kestra and setting things up. I recommend writing this out yourself, understanding what it is doing, and then testing the run and output:

id: postgres_taxi
namespace: zoomcamp

inputs:
  - id: taxi
    type: SELECT
    displayName: Select taxi type
    values: ['yellow','green']
    defaults: 'yellow'

  - id: year
    type: SELECT
    displayName: Pick Year
    values: ["2019","2020"]
    defaults: "2019"

  - id: month
    type: SELECT
    displayName: Pick Month
    values: ["01","02","03","04","05","06","07","08","09","10","11","12"]
    defaults: "01"

variables:
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
  table: "public.{{inputs.taxi}}_tripdata"
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"


tasks:
  - id: set_label
    type: io.kestra.plugin.core.execution.Labels
    labels:
      file: "{{render(vars.file)}}"
      taxi: "{{inputs.taxi}}"

  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}

  - id: green_create_table
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
          unique_row_id          text,
          filename               text,
          VendorID               text,
          lpep_pickup_datetime   timestamp,
          lpep_dropoff_datetime  timestamp,
          store_and_fwd_flag     text,
          RatecodeID             text,
          PULocationID           text,
          DOLocationID           text,
          passenger_count        integer,
          trip_distance          double precision,
          fare_amount            double precision,
          extra                  double precision,
          mta_tax                double precision,
          tip_amount             double precision,
          tolls_amount           double precision,
          ehail_fee              double precision,
          improvement_surcharge  double precision,
          total_amount           double precision,
          payment_type           integer,
          trip_type              integer,
          congestion_surcharge   double precision
      );

  - id: green_create_staging_table
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
          unique_row_id          text,
          filename               text,
          VendorID               text,
          lpep_pickup_datetime   timestamp,
          lpep_dropoff_datetime  timestamp,
          store_and_fwd_flag     text,
          RatecodeID             text,
          PULocationID           text,
          DOLocationID           text,
          passenger_count        integer,
          trip_distance          double precision,
          fare_amount            double precision,
          extra                  double precision,
          mta_tax                double precision,
          tip_amount             double precision,
          tolls_amount           double precision,
          ehail_fee              double precision,
          improvement_surcharge  double precision,
          total_amount           double precision,
          payment_type           integer,
          trip_type              integer,
          congestion_surcharge   double precision
      );

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://kestra-metadata:5432/kestra
      username: kestra
      password: k3str4

The line url: jdbc:postgresql://kestra-metadata:5432/kestra was changed from the video and GitHub versions so that we can use the pgAdmin container from our docker-compose instead of having to download pgAdmin locally.

In Kestra, save and execute the flow - then open pgAdmin and add a new server connection. Once connected, you should see two empty green taxi tables in pgAdmin.
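
If you are unsure what to enter for the new server, the values can be read straight off the pluginDefaults above (this assumes the Postgres service in the compose file is named kestra-metadata, so pgAdmin can reach it by that hostname on the Docker network):

  • Host name/address: kestra-metadata
  • Port: 5432
  • Maintenance database: kestra
  • Username: kestra
  • Password: k3str4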

2 - Add green_copy_in_to_staging_table

- id: green_copy_in_to_staging_table
  type: io.kestra.plugin.jdbc.postgresql.CopyIn
  format: CSV
  from: "{{render(vars.data)}}"
  table: "{{render(vars.staging_table)}}"
  header: true
  columns: [VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge]

Add green_copy_in_to_staging_table to tasks and re-run in Kestra. You should now see data in the green taxi staging table in pgAdmin.

3 - Add green_add_unique_id_and_filename

- id: green_add_unique_id_and_filename
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    UPDATE {{render(vars.staging_table)}}
    SET 
      unique_row_id = md5(
        COALESCE(CAST(VendorID AS text), '') ||
        COALESCE(CAST(lpep_pickup_datetime AS text), '') || 
        COALESCE(CAST(lpep_dropoff_datetime AS text), '') || 
        COALESCE(PULocationID, '') || 
        COALESCE(DOLocationID, '') || 
        COALESCE(CAST(fare_amount AS text), '') || 
        COALESCE(CAST(trip_distance AS text), '')      
      ),
      filename = '{{render(vars.file)}}';

Add green_add_unique_id_and_filename to tasks and re-run in Kestra. You should now see the unique_row_id and filename columns populated in the green taxi staging table in pgAdmin.
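
If you want to sanity-check the update, a small query task like this can be dropped into tasks temporarily - it is my own addition, not part of the video flow, and it assumes your Kestra version supports fetchType on the JDBC tasks. The two counts should (almost) match, since the md5 over the COALESCEd columns should be unique per row:

- id: green_check_staging
  type: io.kestra.plugin.jdbc.postgresql.Queries
  fetchType: FETCH                   # return the result rows in the task output
  sql: |
    SELECT COUNT(*) AS row_count,
           COUNT(DISTINCT unique_row_id) AS distinct_ids
    FROM {{render(vars.staging_table)}};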

4 - Add green_truncate_staging_table

Each re-run copies rows into the staging table again, leaving duplicates, so we need to truncate the staging table first. Add green_truncate_staging_table to tasks after green_create_staging_table:

- id: green_truncate_staging_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    TRUNCATE TABLE {{render(vars.staging_table)}};

Re-run in Kestra. You should now see data that is no longer duplicating itself in the green taxi staging table in pgAdmin.

5 - Add green_merge_data

- id: green_merge_data
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    MERGE INTO {{render(vars.table)}} AS T
    USING {{render(vars.staging_table)}} AS S
    ON T.unique_row_id = S.unique_row_id
    WHEN NOT MATCHED THEN
      INSERT (
        unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime,
        store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count,
        trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
        improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge
      )
      VALUES (
        S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime,
        S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count,
        S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee,
        S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge
      );

Add green_merge_data to tasks and re-run in Kestra. You should now see data in the main green taxi table in pgAdmin.

6 - Add purge_files

To keep Kestra clean and avoid running out of storage, we want to purge these files after loading:

- id: purge_files
  type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
  description: This will remove output files. If you'd like to explore Kestra outputs, disable it.

7 - Add yellow taxis using an 'If' statement

- id: if_yellow_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'yellow'}}"
  then:
    - id: yellow_create_table
    # ... then nest all of the taxi tasks here, renamed with a 'yellow' prefix
    # and adjusted for the yellow schema (e.g. tpep_pickup_datetime instead of
    # lpep_pickup_datetime)

# Then wrap the existing green tasks in an If of their own
- id: if_green_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'green'}}"
  then:
    - id: green_create_table
    # ... nest all of the existing green tasks here

Add these to tasks and re-run in Kestra. You should now see both yellow and green taxi tables in pgAdmin. (I feel like there should be a more efficient way than duplicating every task?)

Up next, we will add all the data using the cloud.
