2.2.1 - Create an ETL Pipeline with Postgres in Kestra

Last updated Jan 28, 2025

YouTube Video | ~28 min

I recommend going through all 3 videos in one sitting

To get pgAdmin to connect, I used a docker-compose.yml that Bruno shared in Slack and then had to modify one line of the code from the video.

Build postgres_taxi.yaml Flow

First, we need to connect Kestra, pgAdmin, and Postgres using docker-compose.yml:

  1. Open Docker Desktop

  2. Add this docker-compose.yml from Bruno to your module-2 directory:

  3. In a terminal, go to the directory containing your docker-compose.yml and run:

    1. docker compose up
    2. ⚠️ Note that the first compose up may take a while to load

  4. Check that Kestra (port 8080) and pgAdmin (port 9000) are running


Build postgres_taxi.yaml

Final YAMLs

1

Goal - View Empty Tables in pgAdmin

To open the editor window, click 'Flows' on the left-hand side -> 'Editor' tab -> 'Source and topology' as your file view option, similar to the one in the video. Ours should look like:

Get used to using Kestra and setting things up. I recommend writing this out yourself, understanding what each part does, and then testing the run and checking the output:

id: postgres_taxi
namespace: zoomcamp

inputs:
  - id: taxi
    type: SELECT
    displayName: Select taxi type
    values: ['yellow','green']
    defaults: 'yellow'

  - id: year
    type: SELECT
    displayName: Pick Year
    values: ["2019","2020"]
    defaults: "2019"

  - id: month
    type: SELECT
    displayName: Pick Month
    values: ["01","02","03","04","05","06","07","08","09","10","11","12"]
    defaults: "01"

variables:
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
  table: "public.{{inputs.taxi}}_tripdata"
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"


tasks:
  - id: set_label
    type: io.kestra.plugin.core.execution.Labels
    labels:
      file: "{{render(vars.file)}}"
      taxi: "{{inputs.taxi}}"

  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}

  - id: green_create_table
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
          unique_row_id          text,
          filename               text,
          VendorID               text,
          lpep_pickup_datetime   timestamp,
          lpep_dropoff_datetime  timestamp,
          store_and_fwd_flag     text,
          RatecodeID             text,
          PULocationID           text,
          DOLocationID           text,
          passenger_count        integer,
          trip_distance          double precision,
          fare_amount            double precision,
          extra                  double precision,
          mta_tax                double precision,
          tip_amount             double precision,
          tolls_amount           double precision,
          ehail_fee              double precision,
          improvement_surcharge  double precision,
          total_amount           double precision,
          payment_type           integer,
          trip_type              integer,
          congestion_surcharge   double precision
      );

  - id: green_create_staging_table
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
          unique_row_id          text,
          filename               text,
          VendorID               text,
          lpep_pickup_datetime   timestamp,
          lpep_dropoff_datetime  timestamp,
          store_and_fwd_flag     text,
          RatecodeID             text,
          PULocationID           text,
          DOLocationID           text,
          passenger_count        integer,
          trip_distance          double precision,
          fare_amount            double precision,
          extra                  double precision,
          mta_tax                double precision,
          tip_amount             double precision,
          tolls_amount           double precision,
          ehail_fee              double precision,
          improvement_surcharge  double precision,
          total_amount           double precision,
          payment_type           integer,
          trip_type              integer,
          congestion_surcharge   double precision
      );

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://kestra-metadata:5432/kestra
      username: kestra
      password: k3str4

The line url: jdbc:postgresql://kestra-metadata:5432/kestra differs from the video and GitHub versions so that we can use the pgAdmin from our docker-compose and do not have to install it locally.
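For context, pluginDefaults applies the listed values to every task whose type starts with io.kestra.plugin.jdbc.postgresql, which is why the create-table tasks above carry no connection settings of their own. As a rough sketch of what that saves, a task written without pluginDefaults would need the same three properties itself. The task id check_connection and the SELECT 1 query are only illustrative and are not part of the original flow:

```yaml
# Illustrative only: the same connection settings set directly on one task.
# With pluginDefaults in place, url/username/password can be omitted here.
- id: check_connection   # hypothetical task id, not in the original flow
  type: io.kestra.plugin.jdbc.postgresql.Queries
  url: jdbc:postgresql://kestra-metadata:5432/kestra
  username: kestra
  password: k3str4
  sql: |
    SELECT 1;
```

Keeping the settings in pluginDefaults means the URL only has to be changed in one place, which is exactly the one-line change described above.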

In Kestra, save and execute the flow, then open pgAdmin and add a new server connection:

You should now see two empty green taxi tables in pgAdmin (choose green as the taxi type when executing, since the input defaults to yellow)
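If the tables show up under unexpected names, it can help to check what the flow variables render to. A minimal sketch, assuming the core Log task is available in your Kestra version, is to add a temporary task like the one below anywhere under tasks. The id log_rendered_vars is a made-up name:

```yaml
# Hypothetical debugging task: prints what the flow variables render to.
# render() is used because the variables themselves contain {{ ... }} expressions.
- id: log_rendered_vars   # illustrative id, not in the original flow
  type: io.kestra.plugin.core.log.Log
  message: "file={{render(vars.file)}} table={{render(vars.table)}} staging={{render(vars.staging_table)}}"
```

With green, 2019, and 01 selected, the log line should contain green_tripdata_2019-01.csv and public.green_tripdata. Remove the task again once the names look right.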

2

Add green_copy_in_to_staging_table

- id: green_copy_in_to_staging_table
  type: io.kestra.plugin.jdbc.postgresql.CopyIn
  format: CSV
  from: "{{render(vars.data)}}"
  table: "{{render(vars.staging_table)}}"
  header: true
  columns: [VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge]

Add green_copy_in_to_staging_table to tasks and re-run in Kestra

You should now see data in the green taxi staging table in pgAdmin

3

Add green_add_unique_id_and_filename

- id: green_add_unique_id_and_filename
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    UPDATE {{render(vars.staging_table)}}
    SET 
      unique_row_id = md5(
        COALESCE(CAST(VendorID AS text), '') ||
        COALESCE(CAST(lpep_pickup_datetime AS text), '') || 
        COALESCE(CAST(lpep_dropoff_datetime AS text), '') || 
        COALESCE(PULocationID, '') || 
        COALESCE(DOLocationID, '') || 
        COALESCE(CAST(fare_amount AS text), '') || 
        COALESCE(CAST(trip_distance AS text), '')      
      ),
      filename = '{{render(vars.file)}}';

Add green_add_unique_id_and_filename to tasks and re-run in Kestra

You should now see the unique_row_id and filename columns populated in the green taxi staging table in pgAdmin

⚠️ Each run appends rows to the staging table, so re-running creates duplicates. We therefore need to truncate the staging table first:

4

Add green_truncate_staging_table

Add to tasks green_truncate_staging_table after green_create_staging_table

- id: green_truncate_staging_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    TRUNCATE TABLE {{render(vars.staging_table)}};

Re-run in Kestra

The data in the green taxi staging table in pgAdmin should now no longer grow with each run
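Besides looking at the table in pgAdmin, the row counts can be checked from Kestra itself. The following is only a sketch: it assumes the PostgreSQL plugin's Query task with fetchType: FETCH_ONE is available in your Kestra version (older versions used a different fetch property), and the id check_staging_counts is hypothetical. It would go after green_add_unique_id_and_filename:

```yaml
# Hypothetical check: total rows vs. distinct unique_row_id values in staging.
# Connection settings come from pluginDefaults, as for the other tasks.
- id: check_staging_counts   # illustrative id, not in the original flow
  type: io.kestra.plugin.jdbc.postgresql.Query
  fetchType: FETCH_ONE       # assumption: return the single result row as an output
  sql: |
    SELECT
      COUNT(*) AS total_rows,
      COUNT(DISTINCT unique_row_id) AS distinct_ids
    FROM {{render(vars.staging_table)}};
```

With the truncate step in place, total_rows should stay the same when the same file is loaded again instead of increasing on every run.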

5

Add green_merge_data

- id: green_merge_data
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    MERGE INTO {{render(vars.table)}} AS T
    USING {{render(vars.staging_table)}} AS S
    ON T.unique_row_id = S.unique_row_id
    WHEN NOT MATCHED THEN
      INSERT (
        unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime,
        store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count,
        trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
        improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge
      )
      VALUES (
        S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime,
        S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count,
        S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee,
        S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge
      );

Add green_merge_data to tasks and re-run in Kestra

You should now see data in the main green taxi table in pgAdmin
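MERGE is only available from PostgreSQL 15 onward. If your Postgres image is older, one possible substitute (not what the video uses) is an INSERT ... SELECT with a NOT EXISTS filter, which likewise inserts only staging rows whose unique_row_id is not yet in the main table. This is a sketch under the assumption that both tables have the same columns in the same order, as in the CREATE TABLE statements above. The id green_insert_new_rows is a made-up name:

```yaml
# Alternative to green_merge_data for PostgreSQL versions without MERGE.
- id: green_insert_new_rows   # hypothetical id, not in the original flow
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    -- S.* relies on staging and main tables sharing the same column order
    INSERT INTO {{render(vars.table)}}
    SELECT S.*
    FROM {{render(vars.staging_table)}} AS S
    WHERE NOT EXISTS (
      SELECT 1
      FROM {{render(vars.table)}} AS T
      WHERE T.unique_row_id = S.unique_row_id
    );
```

Either way, running the flow twice for the same file should leave the row count of the main table unchanged after the second run.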

6

Add purge_files

To keep Kestra's storage clean and avoid running out of space, we purge the execution's files after loading

- id: purge_files
  type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
  description: This will remove output files. If you'd like to explore Kestra outputs, disable it.

7

Add yellow taxis using an 'If' task

I feel like there should be a more efficient way?

- id: if_yellow_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'yellow'}}"
  then:
    - id: yellow_create_table

####
# Then add all taxi tasks renamed as 'yellow'
####

# Then add if green and the green tasks
- id: if_green_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'green'}}"
  then:
    - id: green_create_table

Add to tasks and re-run in Kestra

You should now see yellow and green taxi tables in pgAdmin
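One thing to watch when copying the green tasks: renaming them is probably not enough, because the yellow files use tpep_ instead of lpep_ for the datetime columns and have no ehail_fee or trip_type column. Below is a sketch of how the first yellow task might look inside the If. The column list is my assumption based on the header of the public 2019 yellow CSV files, so check it against the file you download before relying on it:

```yaml
- id: if_yellow_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'yellow'}}"
  then:
    - id: yellow_create_table
      type: io.kestra.plugin.jdbc.postgresql.Queries
      sql: |
        -- Assumed yellow schema: tpep_ datetimes, no ehail_fee, no trip_type
        CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
            unique_row_id          text,
            filename               text,
            VendorID               text,
            tpep_pickup_datetime   timestamp,
            tpep_dropoff_datetime  timestamp,
            passenger_count        integer,
            trip_distance          double precision,
            RatecodeID             text,
            store_and_fwd_flag     text,
            PULocationID           text,
            DOLocationID           text,
            payment_type           integer,
            fare_amount            double precision,
            extra                  double precision,
            mta_tax                double precision,
            tip_amount             double precision,
            tolls_amount           double precision,
            improvement_surcharge  double precision,
            total_amount           double precision,
            congestion_surcharge   double precision
        );
```

The same column changes would then carry over to the yellow staging table, the CopyIn columns list, the unique id hash, and the merge statement.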

👀 Up next, we will add all the data using the cloud ☁️
