2.2.1 - Create an ETL Pipeline with Postgres in Kestra
Last updated Jan 28, 2025
YouTube Video | ~28 min
⛔ I recommend going through all 3 videos in one sitting
Build postgres_taxi.yaml Flow
First, we need to connect Kestra, pgAdmin, and Postgres using docker-compose.yml:
▪️ Open Docker Desktop
▪️ Add this docker-compose.yml from Bruno to your module-2 directory
▪️ In a terminal, go to the directory containing docker-compose.yml and run:
docker compose up
⚠️ Note that the first docker compose up may take a while to load
✅ Check that Kestra (port 8080) and pgAdmin (port 9000) are running
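If you prefer to check the two ports from a script rather than the browser, here is a minimal Python sketch. It assumes both services are published on localhost; the helper name is_port_open is made up for this example. An open port only shows that something is listening, not that Kestra or pgAdmin has finished starting.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Ports taken from the notes above; localhost is an assumption.
    for name, port in (("Kestra", 8080), ("pgAdmin", 9000)):
        state = "up" if is_port_open("localhost", port) else "not reachable"
        print(f"{name} on port {port}: {state}")
```

If a port shows as not reachable, give docker compose up a little longer and try again.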
Build postgres_taxi.yaml

Final YAMLs
Goal - View Empty Table in pgAdmin
To open the editor window, click 'Flows' on the left-hand side -> 'Editor' tab -> 'Source and topology' as the file view option, so that our view matches the one in the video.

Get used to using Kestra and setting things up. I recommend writing this out, understanding what it does, and then testing the run and its output:
id: postgres_taxi
namespace: zoomcamp

inputs:
  - id: taxi
    type: SELECT
    displayName: Select taxi type
    values: ['yellow','green']
    defaults: 'yellow'
  - id: year
    type: SELECT
    displayName: Pick Year
    values: ["2019","2020"]
    defaults: "2019"
  - id: month
    type: SELECT
    displayName: Pick Month
    values: ["01","02","03","04","05","06","07","08","09","10","11","12"]
    defaults: "01"

variables:
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
  table: "public.{{inputs.taxi}}_tripdata"
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"

tasks:
  - id: set_label
    type: io.kestra.plugin.core.execution.Labels
    labels:
      file: "{{render(vars.file)}}"
      taxi: "{{inputs.taxi}}"
  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
  - id: green_create_table
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
          unique_row_id          text,
          filename               text,
          VendorID               text,
          lpep_pickup_datetime   timestamp,
          lpep_dropoff_datetime  timestamp,
          store_and_fwd_flag     text,
          RatecodeID             text,
          PULocationID           text,
          DOLocationID           text,
          passenger_count        integer,
          trip_distance          double precision,
          fare_amount            double precision,
          extra                  double precision,
          mta_tax                double precision,
          tip_amount             double precision,
          tolls_amount           double precision,
          ehail_fee              double precision,
          improvement_surcharge  double precision,
          total_amount           double precision,
          payment_type           integer,
          trip_type              integer,
          congestion_surcharge   double precision
      );
  - id: green_create_staging_table
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
          unique_row_id          text,
          filename               text,
          VendorID               text,
          lpep_pickup_datetime   timestamp,
          lpep_dropoff_datetime  timestamp,
          store_and_fwd_flag     text,
          RatecodeID             text,
          PULocationID           text,
          DOLocationID           text,
          passenger_count        integer,
          trip_distance          double precision,
          fare_amount            double precision,
          extra                  double precision,
          mta_tax                double precision,
          tip_amount             double precision,
          tolls_amount           double precision,
          ehail_fee              double precision,
          improvement_surcharge  double precision,
          total_amount           double precision,
          payment_type           integer,
          trip_type              integer,
          congestion_surcharge   double precision
      );

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://kestra-metadata:5432/kestra
      username: kestra
      password: k3str4
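In Kestra, save and execute the flow, then open pgAdmin and add a new server connection. If pgAdmin runs in the same Docker Compose network, the values from pluginDefaults should work (host kestra-metadata, port 5432, database kestra, username kestra, password k3str4):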
✅ You should now see two empty green taxi tables in pgAdmin
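To make the templated variables in the flow less abstract, here is a small Python sketch of what they resolve to for one set of inputs. It only mimics the templating with f-strings and does not call Kestra; the function names render_file, render_url, and render_tables are made up for illustration.

```python
def render_file(taxi: str, year: str, month: str) -> str:
    """Mirror of vars.file: {{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"""
    return f"{taxi}_tripdata_{year}-{month}.csv"


def render_url(taxi: str, year: str, month: str) -> str:
    """Mirror of the wget URL in the extract task (the release asset is the gzipped CSV)."""
    base = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download"
    return f"{base}/{taxi}/{render_file(taxi, year, month)}.gz"


def render_tables(taxi: str) -> tuple[str, str]:
    """Mirror of vars.table and vars.staging_table, in that order."""
    return f"public.{taxi}_tripdata", f"public.{taxi}_tripdata_staging"


if __name__ == "__main__":
    print(render_file("green", "2019", "01"))
    # green_tripdata_2019-01.csv
    print(render_url("green", "2019", "01"))
    # https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-01.csv.gz
    print(render_tables("green"))
    # ('public.green_tripdata', 'public.green_tripdata_staging')
```

This is why a single flow can serve both taxi types and every month: only the inputs change, and the file name, URL, and table names follow from them.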
Add green_copy_in_to_staging_table
  - id: green_copy_in_to_staging_table
    type: io.kestra.plugin.jdbc.postgresql.CopyIn
    format: CSV
    from: "{{render(vars.data)}}"
    table: "{{render(vars.staging_table)}}"
    header: true
    columns: [VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge]
Add green_copy_in_to_staging_table to tasks and re-run in Kestra
✅ You should now see data in the green taxi staging table in pgAdmin
Add green_add_unique_id_and_filename
  - id: green_add_unique_id_and_filename
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      UPDATE {{render(vars.staging_table)}}
      SET
        unique_row_id = md5(
          COALESCE(CAST(VendorID AS text), '') ||
          COALESCE(CAST(lpep_pickup_datetime AS text), '') ||
          COALESCE(CAST(lpep_dropoff_datetime AS text), '') ||
          COALESCE(PULocationID, '') ||
          COALESCE(DOLocationID, '') ||
          COALESCE(CAST(fare_amount AS text), '') ||
          COALESCE(CAST(trip_distance AS text), '')
        ),
        filename = '{{render(vars.file)}}';
Add green_add_unique_id_and_filename to tasks and re-run in Kestra
✅ You should now see the unique_row_id and filename columns populated in the green taxi staging table in pgAdmin
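To see what the UPDATE computes, here is a rough Python equivalent of the unique_row_id expression: NULLs become empty strings (the COALESCE), the seven key columns are concatenated in order, and the result is hashed with MD5. This is only a sketch. It takes values that are already text, so the hashes will match Postgres only if the strings match what its casts produce. The function name unique_row_id and the sample row are made up for this example.

```python
import hashlib
from typing import Optional

# Same columns, in the same order, as the md5(...) expression in the UPDATE.
KEY_COLUMNS = (
    "VendorID",
    "lpep_pickup_datetime",
    "lpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "fare_amount",
    "trip_distance",
)


def unique_row_id(row: dict[str, Optional[str]]) -> str:
    """Concatenate the key columns (None -> '') and return the MD5 hex digest."""
    joined = "".join(
        "" if row.get(col) is None else str(row[col]) for col in KEY_COLUMNS
    )
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    # Illustrative values only, not taken from the real dataset.
    sample = {
        "VendorID": "2",
        "lpep_pickup_datetime": "2019-01-01 00:10:16",
        "lpep_dropoff_datetime": "2019-01-01 00:16:32",
        "PULocationID": "7",
        "DOLocationID": None,
        "fare_amount": "6",
        "trip_distance": "0.86",
    }
    print(unique_row_id(sample))
```

Because the id depends only on the row's own values, loading the same trip twice yields the same id, which is what the later merge step relies on.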
⚠️ Each run appends the file's rows to the staging table again, so re-running creates duplicates. We need to truncate the staging table before loading:
Add green_truncate_staging_table
Add green_truncate_staging_table to tasks, after green_create_staging_table:
  - id: green_truncate_staging_table
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      TRUNCATE TABLE {{render(vars.staging_table)}};
Re-run in Kestra
✅ You should now see that the data in the green taxi staging table in pgAdmin is no longer appended on each run
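The effect of truncating before each load can be reproduced in a few lines of Python with an in-memory SQLite database. This is a sketch rather than the real pipeline: SQLite has no TRUNCATE, so DELETE without a WHERE clause stands in for it, and the table, rows, and function names are made up for the example.

```python
import sqlite3

# Three illustrative rows standing in for one month's CSV file.
ROWS = [("1", 5.5), ("2", 12.0), ("2", 7.25)]


def load(conn: sqlite3.Connection, truncate_first: bool) -> int:
    """Load ROWS into staging, optionally emptying it first; return the row count."""
    if truncate_first:
        # SQLite stand-in for TRUNCATE TABLE.
        conn.execute("DELETE FROM staging")
    conn.executemany(
        "INSERT INTO staging (VendorID, fare_amount) VALUES (?, ?)", ROWS
    )
    return conn.execute("SELECT COUNT(*) FROM staging").fetchone()[0]


def run_twice(truncate_first: bool) -> int:
    """Simulate executing the flow twice for the same file."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE staging (VendorID TEXT, fare_amount REAL)")
    load(conn, truncate_first)
    count = load(conn, truncate_first)
    conn.close()
    return count


if __name__ == "__main__":
    print("without truncate:", run_twice(False))  # without truncate: 6
    print("with truncate:", run_twice(True))      # with truncate: 3
```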
Add green_merge_data
  - id: green_merge_data
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      MERGE INTO {{render(vars.table)}} AS T
      USING {{render(vars.staging_table)}} AS S
      ON T.unique_row_id = S.unique_row_id
      WHEN NOT MATCHED THEN
        INSERT (
          unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime,
          store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count,
          trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
          improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge
        )
        VALUES (
          S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime,
          S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count,
          S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee,
          S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge
        );
Add green_merge_data to tasks and re-run in Kestra
✅ You should now see data in the main green taxi table in pgAdmin
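The merge only inserts staging rows whose unique_row_id is not already in the main table, so re-running the flow for the same file should not add rows. The Python sketch below shows that behaviour with an in-memory SQLite database. SQLite has no MERGE (in Postgres it is available from version 15), so an INSERT ... SELECT ... WHERE NOT EXISTS is swapped in as an equivalent for this insert-only case. The table name main_table, the trimmed column list, and the sample rows are made up for the example.

```python
import sqlite3


def merge_new_rows(conn: sqlite3.Connection) -> None:
    """Insert staging rows that have no matching unique_row_id in main_table."""
    conn.execute(
        """
        INSERT INTO main_table (unique_row_id, filename, fare_amount)
        SELECT s.unique_row_id, s.filename, s.fare_amount
        FROM staging AS s
        WHERE NOT EXISTS (
            SELECT 1 FROM main_table AS t
            WHERE t.unique_row_id = s.unique_row_id
        )
        """
    )


def demo() -> list[int]:
    """Run the merge twice and return the main table's row count after each run."""
    conn = sqlite3.connect(":memory:")
    columns = "(unique_row_id TEXT, filename TEXT, fare_amount REAL)"
    conn.execute(f"CREATE TABLE staging {columns}")
    conn.execute(f"CREATE TABLE main_table {columns}")
    conn.executemany(
        "INSERT INTO staging VALUES (?, ?, ?)",
        [
            ("a1", "green_tripdata_2019-01.csv", 5.5),
            ("b2", "green_tripdata_2019-01.csv", 12.0),
        ],
    )
    counts = []
    for _ in range(2):
        merge_new_rows(conn)
        counts.append(conn.execute("SELECT COUNT(*) FROM main_table").fetchone()[0])
    conn.close()
    return counts


if __name__ == "__main__":
    print(demo())  # [2, 2]
```

The second run finds every id already present, so the count stays at 2.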
Add yellow taxis using an 'If' task
❓ I feel like there should be a more efficient way?
  - id: if_yellow_taxi
    type: io.kestra.plugin.core.flow.If
    condition: "{{inputs.taxi == 'yellow'}}"
    then:
      - id: yellow_create_table
        ####
        # Then add all taxi tasks renamed as 'yellow'
        # (yellow files use tpep_pickup_datetime / tpep_dropoff_datetime
        # and have no ehail_fee or trip_type columns)
        ####
  # Then add if green and the green tasks
  - id: if_green_taxi
    type: io.kestra.plugin.core.flow.If
    condition: "{{inputs.taxi == 'green'}}"
    then:
      - id: green_create_table
Add both 'If' tasks to tasks and re-run in Kestra
✅ You should now see yellow and green taxi tables in pgAdmin
👀 Up next, we will add all the data using the cloud ☁️