Snowflake is a cloud-hosted relational database used to build data warehouses on demand. Data can be loaded into a data warehouse as a full load or as an incremental load. A full load deletes all existing data and reloads it from scratch, which costs far more time and resources than an incremental load, which loads only the new or updated data. In Snowflake, we can achieve incremental loading by implementing change data capture (CDC) with a stream object and the MERGE command. A stream captures the inserts, updates, and deletes made to a table, along with metadata about each change, so that actions can be taken on the changed data. The changes captured by the stream are then merged into the target table using matched and not-matched conditions.
What are Stream and Merge?
Merge–
MERGE is a command that modifies a table based on the rows of another table: it can update existing records, delete old or inactive records, and add new rows.
Snowflake offers two clauses to perform Merge:
- Matched Clause – Performs an Update or Delete operation on the target table for rows that satisfy the match condition.
- Not Matched Clause – Performs the Insert operation: rows from the source table that have no match in the target table are inserted.
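As a rough illustration of the two clauses, here is a minimal sketch of a plain MERGE between two tables. The table names (`source_table`, `target_table`) and the columns `full_name` and `city` are assumptions made for this example; only `personal_id` is taken from the walkthrough later in this post.

```sql
-- Hypothetical tables: source_table and target_table share the same columns.
MERGE INTO target_table t
USING source_table s
    ON t.personal_id = s.personal_id
-- Matched clause: the key exists in both tables, so update the target row.
WHEN MATCHED THEN
    UPDATE SET t.full_name = s.full_name, t.city = s.city
-- Not matched clause: the key exists only in the source, so insert the row.
WHEN NOT MATCHED THEN
    INSERT (personal_id, full_name, city)
    VALUES (s.personal_id, s.full_name, s.city);
```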
Stream–
A stream is an object created on top of a source table to capture change data; it tracks the changes made to the source table's rows.
The stream itself holds only an offset, the point from which changes are tracked, so the data in the source table remains unaltered.
When queried, a stream returns the source table's columns plus 3 additional metadata columns-
| Column | Description |
|---|---|
| METADATA$ACTION | The recorded DML operation; it has only two possible values, INSERT or DELETE |
| METADATA$ISUPDATE | Flagged TRUE if the change is part of an update |
| METADATA$ROW_ID | A unique, immutable ID for the row, used to track changes to that row over time |
Now that we know what stream and merge are, let’s see how to use them to load data-
Step 1-
Connect to the Snowflake database and create sample source and target tables-
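The original table definitions are not shown here, so the following is only a minimal sketch. The table names and the `full_name` column are assumptions; `personal_id` and `city` follow the columns mentioned in the later steps.

```sql
-- Hypothetical source table that receives the incoming data.
CREATE OR REPLACE TABLE source_table (
    personal_id INTEGER,
    full_name   VARCHAR,
    city        VARCHAR
);

-- Hypothetical target table with the same structure.
CREATE OR REPLACE TABLE target_table (
    personal_id INTEGER,
    full_name   VARCHAR,
    city        VARCHAR
);
```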
Step 2-
Create a stream on the source table using the query below-
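A minimal sketch of the stream creation, assuming the source table is called `source_table` and the stream `source_stream` (both names are placeholders for this example):

```sql
-- Start tracking inserts, updates, and deletes on the source table.
CREATE OR REPLACE STREAM source_stream ON TABLE source_table;
```

The stream only records changes made after it is created, so it is empty at this point.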
Step 3-
Let’s insert some dummy data into the source table-
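For example, assuming a hypothetical `source_table` with the columns `personal_id`, `full_name`, and `city`, the dummy rows could look like this (the values are invented for illustration):

```sql
-- Invented sample rows; one of them has the city Nagpur, which is updated in a later step.
INSERT INTO source_table (personal_id, full_name, city) VALUES
    (1, 'Person A', 'Nagpur'),
    (2, 'Person B', 'Pune'),
    (3, 'Person C', 'Delhi');
```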
After inserting data into the source, let’s check the data captured in the stream-
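A stream can be queried like a table. A minimal sketch, assuming the stream is named `source_stream`:

```sql
-- Returns the changed rows together with the three METADATA$ columns.
SELECT * FROM source_stream;
```

A plain SELECT like this does not consume the stream; the offset advances only when the stream is used in a DML statement such as MERGE.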
Because this is the first time we have inserted data into the source, the newly inserted rows are flagged as INSERT in the METADATA$ACTION column and as FALSE in the METADATA$ISUPDATE column of the stream.
Step 4-
Insert data into the target using the stream and merge, with the query below-
Because this is the first load, no personal_id in the target table matches, and since the METADATA$ACTION flag is INSERT, the merge command inserts all the rows into the target table as they are.
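The original query is not shown here, so the following is one possible sketch rather than the exact statement used. It assumes the hypothetical names `source_stream` and `target_table` and the columns `personal_id`, `full_name`, and `city`. An update appears in the stream as a DELETE row plus an INSERT row for the same key, so this sketch filters out the DELETE half of each update pair and applies the INSERT half as an in-place update.

```sql
MERGE INTO target_table t
USING (
    -- Drop the DELETE half of update pairs so each key matches at most once.
    SELECT *
    FROM source_stream
    WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
) s
    ON t.personal_id = s.personal_id
-- Row was deleted in the source: remove it from the target.
WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN
    DELETE
-- Row was updated in the source: overwrite the target row.
WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    UPDATE SET t.full_name = s.full_name, t.city = s.city
-- Row is new in the source: insert it into the target.
WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
    INSERT (personal_id, full_name, city)
    VALUES (s.personal_id, s.full_name, s.city);
```

Because the MERGE reads the stream inside a DML statement, the stream's offset advances when the statement commits, and the stream is empty again until the source changes.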
Step 5-
Let’s update a few source rows and load them into the target again-
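As an illustration, assuming the hypothetical `source_table` contains a row whose city is Nagpur, the update could be:

```sql
-- Change the city of the matching source row(s) from Nagpur to Mumbai.
UPDATE source_table
SET city = 'Mumbai'
WHERE city = 'Nagpur';
```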
As soon as we update the source table, the stream captures these changes.
The updated row is marked as INSERT and its previous version is marked as DELETE in the METADATA$ACTION column, both with METADATA$ISUPDATE set to TRUE. So when we load the changes from source to target, the older row with City Nagpur is replaced by the updated row with City Mumbai.
Run the same stream and merge command we used earlier to load only the updated data into the target; the updated target data will look like this-
With this, you have successfully achieved incremental loading in Snowflake.
To automate this load process, we can create a task that runs at a specified time interval and loads data into the target if there are any source changes.
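A minimal sketch of such a task is shown below. The task name `load_target_task`, the warehouse `my_wh`, the hourly interval, and the table, stream, and column names are all assumptions for this example. The `WHEN` condition uses `SYSTEM$STREAM_HAS_DATA` so the task skips runs when the stream holds no changes.

```sql
CREATE OR REPLACE TASK load_target_task
    WAREHOUSE = my_wh                  -- hypothetical warehouse name
    SCHEDULE  = '60 MINUTE'            -- run once an hour
    WHEN SYSTEM$STREAM_HAS_DATA('SOURCE_STREAM')
AS
    MERGE INTO target_table t
    USING (
        SELECT *
        FROM source_stream
        WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
    ) s
        ON t.personal_id = s.personal_id
    WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN
        DELETE
    WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
        UPDATE SET t.full_name = s.full_name, t.city = s.city
    WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
        INSERT (personal_id, full_name, city)
        VALUES (s.personal_id, s.full_name, s.city);

-- Tasks are created in a suspended state and must be resumed to start running.
ALTER TASK load_target_task RESUME;
```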
Happy Reading!
I started with Snowflake last week, and this blog is very helpful to me for understanding the Merge statement.
Thank you so much for sharing.
Very informative.
Hey Praful,
I have implemented the incremental operations by creating a Snowflake stream and merge statement, and it works perfectly with a data volume of 950 million rows in the target table.
Incremental checks are quite fast and perform the upsert and delete operations within 2 minutes using a warehouse with Size = XSMALL.
Just one question: if we want to run this process on an hourly schedule, do we need to use a Snowflake task, or is there some other way to schedule it?
How can I find out which column has changed using a stream?
I have to capture changes in selected columns only and update them in log tables.
How do you implement incremental load if your source data is a file from a local server and the target table is in Snowflake?