# 2.2.1 - Create an ETL Pipeline with Postgres in Kestra

YouTube Video | \~28 min

:no\_entry: I recommend going through all 3 videos in one sitting

{% embed url="<https://youtu.be/OkfLX28Ecjg?si=vKbIyWo1TtjpNnvt>" %}

{% hint style="info" %}
To get pgAdmin to connect, I used a `docker-compose.yml` that Bruno shared in Slack, and then needed to modify one line of the flow code from the video (the `pluginDefaults` URL, noted further down this page).
{% endhint %}

## Build postgres\_taxi.yaml Flow

**First**, we need to connect Kestra, pgAdmin, and Postgres using `docker-compose.yml`

1. Open <mark style="background-color:blue;">Docker</mark> Desktop&#x20;
2. Add this `docker-compose.yml` from Bruno to your module-2 directory:
   1. <https://github.com/Tinker0425/de-zoomcamp-my-work/blob/master/module-02/video_3/docker-compose.yml>
3. :black\_small\_square: In a terminal, go to the directory containing your `docker-compose.yml` and run:
   1. ```bash
      docker compose up
      ```
   2. :warning: Note that the first `docker compose up` may take a while, because the images have to be downloaded
4. :white\_check\_mark: Check that <mark style="background-color:purple;">Kestra</mark> (port 8080) and <mark style="background-color:yellow;">pgAdmin</mark> (port 9000) are running
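
As an optional alternative to opening both pages in the browser, here is a minimal sketch that probes the two ports from Python. It assumes both services are published on `localhost` (that host name and the helper `is_port_open` are my own, not from the video), and it only checks that something is listening, not that the service is healthy.

```python
import socket

def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host not resolvable
        return False

# Ports come from the checklist above; "localhost" is an assumption.
for name, port in [("Kestra", 8080), ("pgAdmin", 9000)]:
    status = "up" if is_port_open("localhost", port) else "not reachable"
    print(f"{name} on port {port}: {status}")
```

If a port shows as not reachable, check the `docker compose up` output in the terminal before moving on.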

***

### Build postgres\_taxi.yaml&#x20;

<figure><img src="/files/MZk729y8yr03hcHSBmWS" alt=""><figcaption></figcaption></figure>

Final YAMLs

{% tabs %}
{% tab title="both taxis" %}
<https://github.com/Tinker0425/de-zoomcamp-my-work/blob/master/module-02/video_3/02_postgres_taxi.yaml>
{% endtab %}

{% tab title="green taxi only" %}
<https://github.com/Tinker0425/de-zoomcamp-my-work/blob/master/module-02/video_3/02_postgres_green_taxi.yaml>
{% endtab %}
{% endtabs %}

{% stepper %}
{% step %}

### Goal - View Empty Table in pgAdmin

To get the editor window, click 'Flows' on the left-hand side -> 'Editor' tab -> 'Source and topology' as your file view option. Similar to his in the video, ours should look like:

<figure><img src="/files/kbHofASBjnG7RwtKbBj7" alt=""><figcaption></figcaption></figure>

Get used to using <mark style="background-color:purple;">Kestra</mark> and setting things up. I recommend writing this out, understanding what it is doing, and then testing the run and output:

````yaml
id: postgres_taxi
namespace: zoomcamp

inputs:
  - id: taxi
    type: SELECT
    displayName: Select taxi type
    values: ['yellow','green']
    defaults: 'yellow'

  - id: year
    type: SELECT
    displayName: Pick Year
    values: ["2019","2020"]
    defaults: "2019"

  - id: month
    type: SELECT
    displayName: Pick Month
    values: ["01","02","03","04","05","06","07","08","09","10","11","12"]
    defaults: "01"

variables:
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
  table: "public.{{inputs.taxi}}_tripdata"
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"


tasks:
  - id: set_label
    type: io.kestra.plugin.core.execution.Labels
    labels:
      file: "{{render(vars.file)}}"
      taxi: "{{inputs.taxi}}"

  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}

  - id: green_create_table
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
          unique_row_id          text,
          filename               text,
          VendorID               text,
          lpep_pickup_datetime   timestamp,
          lpep_dropoff_datetime  timestamp,
          store_and_fwd_flag     text,
          RatecodeID             text,
          PULocationID           text,
          DOLocationID           text,
          passenger_count        integer,
          trip_distance          double precision,
          fare_amount            double precision,
          extra                  double precision,
          mta_tax                double precision,
          tip_amount             double precision,
          tolls_amount           double precision,
          ehail_fee              double precision,
          improvement_surcharge  double precision,
          total_amount           double precision,
          payment_type           integer,
          trip_type              integer,
          congestion_surcharge   double precision
      );

  - id: green_create_staging_table
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
          unique_row_id          text,
          filename               text,
          VendorID               text,
          lpep_pickup_datetime   timestamp,
          lpep_dropoff_datetime  timestamp,
          store_and_fwd_flag     text,
          RatecodeID             text,
          PULocationID           text,
          DOLocationID           text,
          passenger_count        integer,
          trip_distance          double precision,
          fare_amount            double precision,
          extra                  double precision,
          mta_tax                double precision,
          tip_amount             double precision,
          tolls_amount           double precision,
          ehail_fee              double precision,
          improvement_surcharge  double precision,
          total_amount           double precision,
          payment_type           integer,
          trip_type              integer,
          congestion_surcharge   double precision
      );

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://kestra-metadata:5432/kestra
      username: kestra
      password: k3str4
````

{% hint style="info" %}
The line `url: jdbc:postgresql://kestra-metadata:5432/kestra` was changed from the video and GitHub versions so that we can use the pgAdmin from our docker-compose setup and not have to install it locally.
{% endhint %}
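
To make the templating in `variables` and in the `extract` command more concrete, here is a small Python sketch of what those expressions resolve to for one set of inputs. The function names are hypothetical and only mirror the string patterns in the flow; Kestra renders these with its own template engine, not with Python.

```python
def render_file(taxi, year, month):
    """Mirror of vars.file: <taxi>_tripdata_<year>-<month>.csv"""
    return f"{taxi}_tripdata_{year}-{month}.csv"

def render_tables(taxi):
    """Mirror of vars.table and vars.staging_table."""
    table = f"public.{taxi}_tripdata"
    return table, f"{table}_staging"

def render_download_url(taxi, year, month):
    """Mirror of the URL that the extract task passes to wget."""
    base = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download"
    return f"{base}/{taxi}/{render_file(taxi, year, month)}.gz"

print(render_file("green", "2019", "01"))
# green_tripdata_2019-01.csv
print(render_tables("green"))
# ('public.green_tripdata', 'public.green_tripdata_staging')
print(render_download_url("green", "2019", "01"))
# https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-01.csv.gz
```

This also shows why the selected taxi type matters: the table names are derived from the `taxi` input.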

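In <mark style="background-color:purple;">Kestra</mark>, save and execute the flow (select `green` as the taxi type, since the table names come from that input and the tasks so far use the green schema). Then open <mark style="background-color:yellow;">pgAdmin</mark> and add a new server connection: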

![](/files/hEpkxYgtotXNfC9bCaX2)

:white\_check\_mark: You should now see two empty green taxi tables in <mark style="background-color:yellow;">pgAdmin</mark>
{% endstep %}

{% step %}

### Add `green_copy_in_to_staging_table`&#x20;

```yaml
- id: green_copy_in_to_staging_table
  type: io.kestra.plugin.jdbc.postgresql.CopyIn
  format: CSV
  from: "{{render(vars.data)}}"
  table: "{{render(vars.staging_table)}}"
  header: true
  columns: [VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge]
```

Add `green_copy_in_to_staging_table` to `tasks` and re-run in <mark style="background-color:purple;">Kestra</mark>

:white\_check\_mark: You should now see data in the green taxi staging table in <mark style="background-color:yellow;">pgAdmin</mark>
{% endstep %}

{% step %}

### Add `green_add_unique_id_and_filename`

```yaml
- id: green_add_unique_id_and_filename
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    UPDATE {{render(vars.staging_table)}}
    SET 
      unique_row_id = md5(
        COALESCE(CAST(VendorID AS text), '') ||
        COALESCE(CAST(lpep_pickup_datetime AS text), '') || 
        COALESCE(CAST(lpep_dropoff_datetime AS text), '') || 
        COALESCE(PULocationID, '') || 
        COALESCE(DOLocationID, '') || 
        COALESCE(CAST(fare_amount AS text), '') || 
        COALESCE(CAST(trip_distance AS text), '')      
      ),
      filename = '{{render(vars.file)}}';
```

Add `green_add_unique_id_and_filename` to `tasks` and re-run in <mark style="background-color:purple;">Kestra</mark>

:white\_check\_mark: You should now see the unique ID and filename columns populated in the green taxi staging table in <mark style="background-color:yellow;">pgAdmin</mark>
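
To see what the `md5(COALESCE(...) || ...)` expression is doing, here is a rough Python sketch of the same idea: concatenate the key columns, treat NULL as an empty string, and hash the result. The function name is hypothetical, and the values are passed as strings because Postgres may format timestamps and doubles differently from Python when casting to text, so do not expect these hashes to match the ones in your table.

```python
import hashlib

def unique_row_id(vendor_id, pickup, dropoff, pu_location, do_location,
                  fare_amount, trip_distance):
    """Hash the key columns; None plays the role of NULL and becomes ''."""
    parts = [vendor_id, pickup, dropoff, pu_location, do_location,
             fare_amount, trip_distance]
    joined = "".join("" if p is None else str(p) for p in parts)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

# Illustrative values only, not taken from the real dataset
row = ("2", "2019-01-01 00:10:16", "2019-01-01 00:16:32", "264", "264", "4.5", "0.86")
other_row = row[:5] + ("5.0", "0.86")  # same trip, different fare

print(unique_row_id(*row) == unique_row_id(*row))        # True
print(unique_row_id(*row) == unique_row_id(*other_row))  # False
```

The same row always produces the same ID, which is what lets a later step recognise rows that were already loaded.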

:warning: Each run appends rows to the staging table, so re-running the flow creates duplicates. We therefore need to truncate the staging table before loading:
{% endstep %}

{% step %}

### Add `green_truncate_staging_table`

Add to tasks `green_truncate_staging_table` after `green_create_staging_table`

```yaml
- id: green_truncate_staging_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    TRUNCATE TABLE {{render(vars.staging_table)}};
```

Re-run in <mark style="background-color:purple;">Kestra</mark>

:white\_check\_mark: You should now see that the green taxi staging table in <mark style="background-color:yellow;">pgAdmin</mark> no longer accumulates rows on each run
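
The effect of truncating first can be sketched in plain Python, with a list standing in for the staging table. This is only an illustration of the append-versus-reset behaviour; `load_file` is a hypothetical helper and the rows are made up.

```python
def load_file(staging, file_rows, truncate_first):
    """Simulate one flow run: optionally empty the staging table, then append the file's rows."""
    if truncate_first:
        staging.clear()        # stands in for TRUNCATE TABLE
    staging.extend(file_rows)  # stands in for the CopyIn task, which appends
    return len(staging)

file_rows = [{"VendorID": "2"}, {"VendorID": "1"}, {"VendorID": "2"}]

without_truncate = []
load_file(without_truncate, file_rows, truncate_first=False)
print(load_file(without_truncate, file_rows, truncate_first=False))  # 6

with_truncate = []
load_file(with_truncate, file_rows, truncate_first=True)
print(load_file(with_truncate, file_rows, truncate_first=True))      # 3
```

Without the truncate, the second run doubles the row count; with it, the staging table always holds exactly one file's worth of rows.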
{% endstep %}

{% step %}

### Add `green_merge_data`

```yaml
- id: green_merge_data
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    MERGE INTO {{render(vars.table)}} AS T
    USING {{render(vars.staging_table)}} AS S
    ON T.unique_row_id = S.unique_row_id
    WHEN NOT MATCHED THEN
      INSERT (
        unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime,
        store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count,
        trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
        improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge
      )
      VALUES (
        S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime,
        S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count,
        S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee,
        S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge
      );
```

Add `green_merge_data` to `tasks` and re-run in <mark style="background-color:purple;">Kestra</mark>

:white\_check\_mark: You should now see data in the main green taxi table in <mark style="background-color:yellow;">pgAdmin</mark>
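
The `WHEN NOT MATCHED THEN INSERT` logic can be sketched in Python as follows: only staging rows whose `unique_row_id` is not yet in the main table get inserted. This is a simplified model with lists standing in for tables, `merge_not_matched` is a hypothetical helper, and the IDs are placeholders rather than real hashes.

```python
def merge_not_matched(target, staging):
    """Insert staging rows whose unique_row_id is absent from target; return how many were inserted."""
    # IDs present in the target before this merge starts
    existing_ids = {row["unique_row_id"] for row in target}
    new_rows = [row for row in staging if row["unique_row_id"] not in existing_ids]
    target.extend(new_rows)
    return len(new_rows)

staging = [
    {"unique_row_id": "a1", "filename": "green_tripdata_2019-01.csv"},
    {"unique_row_id": "b2", "filename": "green_tripdata_2019-01.csv"},
]
target = []

print(merge_not_matched(target, staging))  # 2
print(merge_not_matched(target, staging))  # 0
print(len(target))                         # 2
```

Re-running the flow for the same file therefore leaves the main table unchanged, which is the point of hashing a `unique_row_id` in the earlier step.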
{% endstep %}

{% step %}

### Add `purge_files`

To keep Kestra clean and avoid running out of storage, we purge the downloaded files after loading:

```yaml
- id: purge_files
  type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
  description: This will remove output files. If you'd like to explore Kestra outputs, disable it.
```

{% endstep %}

{% step %}

### Add yellow taxis using an `If` task

:question: I feel like there should be a more efficient way?

```yaml
- id: if_yellow_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'yellow'}}"
  then:
    - id: yellow_create_table
    # ... then add all taxi tasks, renamed as 'yellow'

# Then add the green If task and the green tasks
- id: if_green_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'green'}}"
  then:
    - id: green_create_table
    # ... followed by the remaining green tasks
```

Add to tasks and re-run in <mark style="background-color:purple;">Kestra</mark>

:white\_check\_mark: You should now see yellow and green taxi tables in <mark style="background-color:yellow;">pgAdmin</mark>
{% endstep %}
{% endstepper %}

:eyes: Up next, we will add all the data using the cloud :cloud:


