Logical deletes are a common practice when loading dimensions in a data warehouse. If a dimensional record is deleted in the source database, it may still need to exist in your data warehouse. On a recent project of mine, this was one of the requirements set during the design phase. Since ODI was being used as the ETL (or ELT) tool, I started researching whether there was an existing knowledge module (KM) to handle this requirement. I couldn’t find one, so I decided to customize an out-of-the-box KM, IKM MSSQL Incremental Update.
In the example described below, I have set up Simple CDC journalizing to keep track of changes in the source data. The database is MS SQL Server.
Before making any changes, create a copy of the original out-of-the-box KM:
Open the copy of the KM created, and give it a new name in the Definition tab. Then switch to the Details tab.
Modify the step that deletes from the target table the records that are marked as deleted in the journal table, i.e. Step 132 – Synchronize deletions from journal. In the original KM, the command executed on this step is:
delete from <%=odiRef.getTable("L","TARG_NAME","A")%>
where exists (
    select 'X'
    from <%=odiRef.getTable("L","INT_NAME","A")%> <%=odiRef.getInfo("DEST_TAB_ALIAS_WORD")%> I
    where <%=odiRef.getColList("", odiRef.getTable("L","TARG_NAME","A") + ".[COL_NAME]\t= I.[COL_NAME]", "\n\t\tand\t", "", "UK")%>
    and IND_UPDATE = 'D'
)
Change this to an update statement in the new KM, so that every column flagged UD1 is set to 'Y' for the rows marked as deleted, instead of the rows being removed:
update <%=odiRef.getTable("L","TARG_NAME","A")%>
set <%=odiRef.getColList("", "[COL_NAME] = 'Y'", ",\n\t", "", "UD1")%>
where exists (
    select 'X'
    from <%=odiRef.getTable("L","INT_NAME","A")%> <%=odiRef.getInfo("DEST_TAB_ALIAS_WORD")%> I
    where <%=odiRef.getColList("", odiRef.getTable("L","TARG_NAME","A") + ".[COL_NAME]\t= I.[COL_NAME]", "\n\t\tand\t", "", "UK")%>
    and IND_UPDATE = 'D'
)
After the change, the step will look like this:
Save and close the customized KM.
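To make the template more concrete, here is a sketch of roughly what the modified step would render to for the test tables used later in this post. It assumes the target is DIM_TEST, the update key is SRC_TID, DEL_FLG is the only column flagged UD1, and the integration table follows ODI's usual I$_ naming (I$_DIM_TEST is an assumed name). Catalog and schema prefixes are omitted, and the exact text ODI generates in your environment may differ.

```sql
-- Sketch of the rendered statement; table and alias names are assumptions.
-- Rows flagged 'D' in the integration table are marked as deleted in the
-- target instead of being physically removed.
update DIM_TEST
set    DEL_FLG = 'Y'
where  exists (
           select 'X'
           from   I$_DIM_TEST as I
           where  DIM_TEST.SRC_TID = I.SRC_TID
           and    I.IND_UPDATE = 'D'
       );
```

Comparing the SQL in the Operator log after a run against this sketch is a quick way to confirm that the customized step is the one being executed.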
In order to test this KM, I created a new dimension table in my target database.
CREATE TABLE DIM_TEST (
    ROW_TID NUMERIC IDENTITY,
    CODE    VARCHAR(50),
    NAME    VARCHAR(50),
    UPD_DT  DATETIME,
    UPD_BY  VARCHAR(50),
    DEL_FLG CHAR(1),
    SRC_TID VARCHAR(10)
)
And a similar table in my source database:
CREATE TABLE TBL_TEST (
    [TID]    [int] NOT NULL PRIMARY KEY,
    [CODE]   [varchar](50) NULL,
    [NAME]   [varchar](50) NULL,
    [UPD_DT] [datetime] NULL,
    [UPD_BY] [varchar](50) NULL
)
Import these two tables into the target and source models in ODI, respectively. Then set up Simple CDC on the source table.
This will create a journal table and a journal view:
select * from J$TBL_TEST
select * from JV$DTBL_TEST
Then, insert some records into the source table:
INSERT INTO [TBL_TEST] VALUES (1001, 'A', 'Aaaaa', GETDATE(), 'TestUser')
GO
INSERT INTO [TBL_TEST] VALUES (1002, 'B', 'Bbbbb', GETDATE(), 'TestUser')
GO
INSERT INTO [TBL_TEST] VALUES (1003, 'C', 'Ccccc', GETDATE(), 'TestUser')
GO
INSERT INTO [TBL_TEST] VALUES (1004, 'D', 'Ddddd', GETDATE(), 'TestUser')
GO
INSERT INTO [TBL_TEST] VALUES (1005, 'E', 'Eeeee', GETDATE(), 'TestUser')
GO
INSERT INTO [TBL_TEST] VALUES (1006, 'F', 'Fffff', GETDATE(), 'TestUser')
GO
INSERT INTO [TBL_TEST] VALUES (1007, 'G', 'Ggggg', GETDATE(), 'TestUser')
GO
The journal table and view will now have data in them:
Create a simple interface to load this journal data into the target dimension table:
Make sure to do the following in the Mapping tab:
- Click on Target Datastore, and make sure that the Update Key is set to <Undefined>
- Click on DEL_FLG, and make sure:
  - Insert box is checked
  - Update box is unchecked
  - Key box is unchecked
  - UD1 box is checked
- Click on SRC_TID, and make sure:
  - Insert box is checked
  - Update box is unchecked
  - Key box is checked
In the Flow tab, select the custom IKM created earlier, keep the default options, and execute the interface. All the records are now loaded from the source table into the target table:
As you will notice, all the deleted flag (DEL_FLG) values are ‘N’. Now we will delete one of the records in the source table, and just for fun, also update a few values:
DELETE FROM TBL_TEST WHERE CODE = 'D'
GO
UPDATE TBL_TEST SET NAME = 'New value 1' WHERE CODE = 'B'
GO
UPDATE TBL_TEST SET NAME = 'New value 2' WHERE CODE = 'G'
GO
UPDATE TBL_TEST SET UPD_BY = 'TestUser2' WHERE CODE = 'F'
GO
Now, you will see that the journal table and view will show the changes:
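If you prefer to check this with a query rather than by browsing the data, something along these lines should list the pending changes. This is only a sketch: the JRN_FLAG and JRN_DATE column names are assumed from the usual layout of ODI journal tables, so verify them against the J$TBL_TEST table that ODI actually created.

```sql
-- Assumed journal columns: JRN_FLAG (change type) and JRN_DATE (change time).
-- The deleted row (TID 1004, CODE 'D') is expected to carry the delete flag 'D'.
select TID, JRN_FLAG, JRN_DATE
from   J$TBL_TEST
order by JRN_DATE;
```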
Execute the interface once again to load these changes into the dimension table:
Voila! The deleted record from source table is still retained in the target table, but is now flagged as deleted, so it can be filtered out in reports.
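As a minimal sketch of that filtering, a report query against the DIM_TEST table from this walkthrough might look like the first statement below, while the second lists the rows that were deleted in the source but retained in the warehouse. It assumes, as in the test run above, that active rows carry DEL_FLG = 'N'.

```sql
-- Active dimension rows only: logically deleted rows are hidden from reports.
select ROW_TID, CODE, NAME, SRC_TID
from   DIM_TEST
where  DEL_FLG = 'N';

-- Audit check: rows flagged as deleted by the customized KM.
-- In this walkthrough that should be the row with CODE = 'D'.
select ROW_TID, CODE, NAME, SRC_TID
from   DIM_TEST
where  DEL_FLG = 'Y';
```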