To open the editor window, click 'Flows' on the left-hand side -> 'Editor' tab -> 'Source and topology' as your file view option. Similar to the instructor's in the video, ours should look like:
Get used to using Kestra and setting things up. I recommend writing the flow out yourself, understanding what each task does, and then testing the run and its output:
The line url: jdbc:postgresql://kestra-metadata:5432/kestra was changed from the version in the video and on GitHub so that we can use the pgAdmin in our docker-compose instead of installing it locally.
In Kestra, save and execute the flow, then open pgAdmin and add a new server connection:
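For reference, the connection settings can be set once for every PostgreSQL task through pluginDefaults. This is a minimal sketch assuming the Postgres container is reachable as kestra-metadata on the docker-compose network; the username and password below are assumptions and must match the POSTGRES_USER and POSTGRES_PASSWORD values in your own docker-compose file.

```yaml
# Applies these values to every task whose type starts with io.kestra.plugin.jdbc.postgresql
pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      # Host is the container/service name, resolved inside the docker-compose network
      url: jdbc:postgresql://kestra-metadata:5432/kestra
      username: kestra   # assumption: must match your POSTGRES_USER
      password: k3str4   # assumption: must match your POSTGRES_PASSWORD
```

Since pgAdmin runs in the same docker-compose network, the same host (kestra-metadata), port (5432) and database (kestra) should work when registering the server in pgAdmin.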
✅ You should now see two empty green taxi tables in pgAdmin
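For reference, the green_create_staging_table task could look roughly like the sketch below. It is not the exact flow from the video: the column types are my assumption, and it relies on a vars.staging_table variable. green_create_table would be the same statement pointed at {{render(vars.table)}} instead.

```yaml
# Sketch: column names follow the MERGE statement further down; types are assumptions
- id: green_create_staging_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
      unique_row_id          text,
      filename               text,
      VendorID               text,
      lpep_pickup_datetime   timestamp,
      lpep_dropoff_datetime  timestamp,
      store_and_fwd_flag     text,
      RatecodeID             text,
      PULocationID           text,
      DOLocationID           text,
      passenger_count        integer,
      trip_distance          double precision,
      fare_amount            double precision,
      extra                  double precision,
      mta_tax                double precision,
      tip_amount             double precision,
      tolls_amount           double precision,
      ehail_fee              double precision,
      improvement_surcharge  double precision,
      total_amount           double precision,
      payment_type           integer,
      trip_type              integer,
      congestion_surcharge   double precision
    );
```

CREATE TABLE IF NOT EXISTS makes the task safe to re-run: on later executions it simply does nothing.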
2
Add green_copy_in_to_staging_table
Add green_copy_in_to_staging_table to tasks and re-run in Kestra
✅ You should now see data in the green taxi staging table in pgAdmin
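A green_copy_in_to_staging_table task could be sketched with Kestra's PostgreSQL CopyIn task, as below. This assumes an earlier extract task has already downloaded the CSV and that a vars.data variable points at that output file (both are assumptions, not shown here). unique_row_id and filename are deliberately left out of the column list because the next step fills them in.

```yaml
- id: green_copy_in_to_staging_table
  type: io.kestra.plugin.jdbc.postgresql.CopyIn
  format: CSV
  header: true                                # first CSV line holds column names
  from: "{{render(vars.data)}}"               # assumption: file produced by an extract task
  table: "{{render(vars.staging_table)}}"
  # CSV columns in file order; unique_row_id and filename are populated later
  columns: [VendorID, lpep_pickup_datetime, lpep_dropoff_datetime, store_and_fwd_flag,
            RatecodeID, PULocationID, DOLocationID, passenger_count, trip_distance,
            fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
            improvement_surcharge, total_amount, payment_type, trip_type,
            congestion_surcharge]
```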
3
Add green_add_unique_id_and_filename
Add green_add_unique_id_and_filename to tasks and re-run in Kestra
✅ You should now see the unique_row_id and filename columns populated in the green taxi staging table in pgAdmin
⚠️ Each re-run appends rows, so we end up with duplicates; we therefore need to truncate the staging table first:
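The green_add_unique_id_and_filename step can be sketched as a single UPDATE that hashes a handful of identifying columns with PostgreSQL's md5(). Which columns go into the hash, and the vars.file variable holding the CSV name, are assumptions here. COALESCE matters because in PostgreSQL NULL || 'x' is NULL, so one empty column would otherwise make the whole hash NULL.

```yaml
- id: green_add_unique_id_and_filename
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    UPDATE {{render(vars.staging_table)}}
    SET
      -- deterministic hash of the columns that identify a trip (column choice is an assumption)
      unique_row_id = md5(
        COALESCE(CAST(VendorID AS text), '') ||
        COALESCE(CAST(lpep_pickup_datetime AS text), '') ||
        COALESCE(CAST(lpep_dropoff_datetime AS text), '') ||
        COALESCE(CAST(PULocationID AS text), '') ||
        COALESCE(CAST(DOLocationID AS text), '') ||
        COALESCE(CAST(fare_amount AS text), '') ||
        COALESCE(CAST(trip_distance AS text), '')
      ),
      -- records which source file each row came from (vars.file is assumed)
      filename = '{{render(vars.file)}}';
```

Because the hash is deterministic, loading the same file again produces the same unique_row_id values, which is what later allows the MERGE to skip rows already present in the main table.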
4
Add green_truncate_staging_table
Add green_truncate_staging_table to tasks, right after green_create_staging_table
Re-run in Kestra
✅ You should now see that the data in the green taxi staging table is no longer duplicated on each run in pgAdmin
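The truncate step itself is a one-line query. A minimal sketch, assuming the same vars.staging_table variable as the other tasks:

```yaml
- id: green_truncate_staging_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    -- empty the staging table so each run starts fresh
    TRUNCATE TABLE {{render(vars.staging_table)}};
```

Since it runs after the staging table is created and before the copy, every execution starts from an empty staging table, while the main table only grows through the merge step.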
5
Add green_merge_data
Add green_merge_data to tasks and re-run in Kestra
✅ You should now see data in the main green taxi table in pgAdmin
6
Add purge_files
To keep Kestra clean and avoid running out of storage, we purge the execution's output files once the data has been loaded
7
Add yellow taxis using an 'If' task
❓ I feel like there should be a more efficient way to do this.
Add to tasks and re-run in Kestra
✅ You should now see the yellow and green taxi tables in pgAdmin
👀 Up next, we will add all the data using the cloud ☁️
- id: green_merge_data
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    MERGE INTO {{render(vars.table)}} AS T
    USING {{render(vars.staging_table)}} AS S
    ON T.unique_row_id = S.unique_row_id
    WHEN NOT MATCHED THEN
      INSERT (
        unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime,
        store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count,
        trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
        improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge
      )
      VALUES (
        S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime,
        S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count,
        S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee,
        S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge
      );
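The merge task above refers to vars.table and vars.staging_table. A minimal sketch of a matching variables block, assuming the flow also has year and month inputs alongside taxi (those two inputs are assumptions):

```yaml
variables:
  # CSV name for the selected taxi type and period (year/month inputs are assumed)
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  # Both table names switch between green and yellow via inputs.taxi
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
  table: "public.{{inputs.taxi}}_tripdata"
```

These variables themselves contain expressions. Recent Kestra versions do not render variables recursively by default, which is why the tasks wrap them in render(): without it, the inner {{inputs.taxi}} would not be evaluated.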
- id: purge_files
  type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
  description: This will remove output files. If you'd like to explore Kestra outputs, disable it.
- id: if_yellow_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'yellow'}}"
  then:
    - id: yellow_create_table
    ####
    # Then add all the taxi tasks from above, renamed with a 'yellow_' prefix
    # (yellow data uses tpep_* datetime columns instead of lpep_*)
    ####

# Then add the 'if green' task wrapping the green tasks
- id: if_green_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'green'}}"
  then:
    - id: green_create_table