How to Sync Changes Between Two Tables Using MERGE INTO and Delta Lake CDF (Change Data Feed / CDC)

Amr Ali
6 min read · Mar 13, 2023


Quite often, as a data engineer, you are required to keep two tables in sync: any change in the source table must be reflected in the target table.

An obvious solution is to copy everything from the source table whenever a sync is due, but this can be a very expensive operation. The better approach is to sync only the changes.

With Delta Lake CDF, we can configure the source table to generate a Change Data Feed that records exactly what changed between table versions.

Needless to say, the easiest way to do this in Databricks is the Delta Live Tables APPLY CHANGES command. However, you can achieve the same effect on plain Delta Lake tables using the MERGE INTO syntax.
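For reference, a DLT pipeline would express this declaratively with APPLY CHANGES INTO. This is only a rough sketch with illustrative table names; check the Delta Live Tables documentation for the exact syntax your runtime supports:

```sql
-- Rough DLT sketch (illustrative names, assumes a DLT pipeline context).
CREATE OR REFRESH STREAMING TABLE silver_table1;

APPLY CHANGES INTO live.silver_table1
FROM stream(live.bronze_table1)
KEYS (id)                 -- primary key used to match rows
SEQUENCE BY _commit_version;  -- column that orders the changes
```

The rest of this article shows how to build the equivalent behaviour by hand with MERGE INTO.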

Let's walk through an example of how to do this.

First, let's create the tables. In this example, I will assume you want to sync two tables, both called table1, in two different databases: ‘bronze_db’ and ‘silver_db’.

create database bronze_db;
create table bronze_db.table1 (id string, name string, age int);
-- turn on CDF using delta.enableChangeDataFeed = true on the source table
ALTER TABLE bronze_db.table1 SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- insert some records
insert into bronze_db.table1(id, name, age) values("1", "Smith", 40);
insert into bronze_db.table1(id, name, age) values("2", "Ed", 35);
select * from bronze_db.table1;

If you run this code, the table will contain the two rows you just inserted: (1, Smith, 40) and (2, Ed, 35).

Let's create a silver database and initially, we will copy everything from the bronze database as it is (initial load):

drop database if exists silver_db cascade;
create database silver_db;
create table silver_db.table1 as select * from bronze_db.table1;

Now both bronze_db.table1 and silver_db.table1 are exactly the same and fully in sync.

Now, let's make some changes to the bronze table: we will delete row 1, update row 2 twice, and insert a new row.

delete from bronze_db.table1 where id='1';
update bronze_db.table1 set name = 'John' where id='2';
update bronze_db.table1 set age = 55 where id='2';
insert into bronze_db.table1(id, name, age) values("3", "Michael", 22);
select * from bronze_db.table1;

After these changes, the bronze table contains two rows: (2, John, 55) and (3, Michael, 22).

If you want to inspect what changes occurred to produce this result, this is where CDF comes into play. Note that the parameter ‘1’ passed to the table_changes function means "return all changes starting from version 1 of the table".

SELECT * FROM table_changes('bronze_db.table1', 1);
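As an aside, per the Databricks CDF documentation, table_changes also accepts an optional end version, and timestamp strings instead of version numbers:

```sql
-- Changes between versions 1 and 3 (inclusive)
SELECT * FROM table_changes('bronze_db.table1', 1, 3);

-- Changes starting from a timestamp
SELECT * FROM table_changes('bronze_db.table1', '2023-03-13 00:00:00');
```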

The CDF result lists every change with its row values plus three metadata columns: _change_type (insert, delete, update_preimage, or update_postimage), _commit_version, and _commit_timestamp.

Now we can try to merge these changes into the silver layer to bring the two tables back in sync. However, if you run the merge code below, it will fail:

merge into silver_db.table1 as target
using (SELECT * FROM table_changes('bronze_db.table1', 1)) as source
on target.id = source.id
when matched and source._change_type = 'delete' then delete
when matched and (source._change_type = 'insert' or source._change_type = 'update_postimage') then update set *
when not matched and (source._change_type = 'insert' or source._change_type = 'update_postimage') then insert *;

You will get this error message:

Cannot perform Merge as multiple source rows matched and attempted to modify the same
target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge,
when multiple source rows match on the same target row, the result may be ambiguous
as it is unclear which source row should be used to update or delete the matching
target row. You can preprocess the source table to eliminate the possibility of
multiple matches. Please refer to
https://docs.databricks.com/delta/delta-update.html#upsert-into-a-table-using-merge

Why this error? The merge operation expects each target row to match at most one source row on ‘ON target.id = source.id’; when multiple source rows match the same target row, the merge is ambiguous.

Let's analyse where this duplication comes from. There are two sources:
1. Every update produces two CDF records (update_preimage and update_postimage).
2. The same row may be updated multiple times across versions.

For the first source of duplication, we can exclude all update_preimage changes, as we are only interested in the row values after the change occurred, not before:

SELECT * FROM table_changes('bronze_db.table1',1) where _change_type != 'update_preimage';

The second source of duplication is a little trickier to handle. We need each id to appear only once in the change set, and we must pick the latest change so that the order in which the changes were applied is preserved from source to target. We can do that with the row_number() window function:

-- Filter out update_preimage rows *before* ranking: a preimage and its
-- postimage share the same _commit_version, so ranking them together
-- could let the preimage win the tie and drop the real change.
select * from (
  SELECT *, row_number() over (partition by id order by _commit_version desc) as rank
  FROM table_changes('bronze_db.table1', 1)
  where _change_type != 'update_preimage'
) order by id, _commit_version desc;

If you inspect the result, you will notice that row_number() always assigns rank 1 to the most recent change (look at id=2: the most recent change is the update_postimage, with rank 1).
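To make the dedup logic concrete outside of SQL, here is a small Python sketch (plain in-memory records that mirror the example's change feed, not the Spark API) reproducing the same "latest change per id" selection:

```python
# Simulate the CDF dedup: drop update_preimage rows, then keep only the
# latest change per id (highest _commit_version), like rank = 1 above.
# The records below mirror the example table's change feed (illustrative).
changes = [
    {"id": "1", "name": "Smith",   "age": 40, "_change_type": "insert",           "_commit_version": 2},
    {"id": "2", "name": "Ed",      "age": 35, "_change_type": "insert",           "_commit_version": 3},
    {"id": "1", "name": "Smith",   "age": 40, "_change_type": "delete",           "_commit_version": 4},
    {"id": "2", "name": "Ed",      "age": 35, "_change_type": "update_preimage",  "_commit_version": 5},
    {"id": "2", "name": "John",    "age": 35, "_change_type": "update_postimage", "_commit_version": 5},
    {"id": "2", "name": "John",    "age": 35, "_change_type": "update_preimage",  "_commit_version": 6},
    {"id": "2", "name": "John",    "age": 55, "_change_type": "update_postimage", "_commit_version": 6},
    {"id": "3", "name": "Michael", "age": 22, "_change_type": "insert",           "_commit_version": 7},
]

def latest_changes(changes):
    latest = {}
    for row in changes:
        if row["_change_type"] == "update_preimage":
            continue  # only the post-change values matter
        prev = latest.get(row["id"])
        if prev is None or row["_commit_version"] > prev["_commit_version"]:
            latest[row["id"]] = row
    return latest

result = latest_changes(changes)
# id 1 -> delete, id 2 -> update_postimage (age 55), id 3 -> insert
```

Each id survives exactly once, carrying its most recent change type, which is exactly what MERGE INTO needs as a source.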

So, to pick up the most recent changes, we filter on rank = 1. Note that this time we read the change feed starting from version 4, the first version after the initial load (in a real job you would keep track of the last version you synced). Let's put it all together:

merge into silver_db.table1 as target
using (
  select * from (
    SELECT *, row_number() over (partition by id order by _commit_version desc) as rank
    FROM table_changes('bronze_db.table1', 4)  -- first version after the initial load
    where _change_type != 'update_preimage'    -- filter before ranking (see above)
  ) where rank = 1
) as source
on target.id = source.id
when matched and source._change_type = 'delete' then delete
when matched and (source._change_type = 'insert' or source._change_type = 'update_postimage') then update set *
when not matched and (source._change_type = 'insert' or source._change_type = 'update_postimage') then insert *;

Once you run it, the merge completes successfully.

If you check the silver table and compare it with the bronze table, you will find them identical: both now contain (2, John, 55) and (3, Michael, 22).

**Special Case**

In some rare cases, you might get two different operations with the same version number; for example, the source system inserted a record and then immediately deleted it in the same transaction. This case is rare and should be inspected carefully to decide the proper order of the operations, since an insert followed by a delete is easily confused with a delete followed by an insert. To handle it, you can break the tie by also ordering on _change_type; note that ascending order means ‘delete’ (alphabetically first) deterministically wins over ‘insert’ for the same id and version. For example:

merge into silver_db.table1 as target
using (
  select * from (
    -- ties on _commit_version are broken by _change_type ascending,
    -- so 'delete' wins over 'insert' for the same id and version
    select *, row_number() over (partition by id order by _commit_version desc, _change_type asc) as rank
    FROM table_changes('bronze_db.table1', 4)
    where _change_type != 'update_preimage'
  ) where rank = 1
) as source
on target.id = source.id
when matched and source._change_type = 'delete' then delete
when matched and (source._change_type = 'insert' or source._change_type = 'update_postimage') then update set *
when not matched and (source._change_type = 'insert' or source._change_type = 'update_postimage') then insert *;
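To see the tie-break in isolation, here is a tiny Python sketch (hypothetical records, same idea as the SQL ordering above) showing that sorting by _change_type ascending makes ‘delete’ win over ‘insert’ for the same commit version:

```python
# Two conflicting operations in the same commit version (illustrative).
tied = [
    {"id": "9", "_change_type": "insert", "_commit_version": 8},
    {"id": "9", "_change_type": "delete", "_commit_version": 8},
]

# Sort ascending by _change_type: 'delete' < 'insert' alphabetically,
# so the delete is picked deterministically as the winner.
winner = sorted(tied, key=lambda r: r["_change_type"])[0]
```

Whether ‘delete-wins’ is the right policy depends on your source system; the point is only that the choice is deterministic rather than left to chance.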

Hope this was useful. And thanks to Chris Grant, from Databricks, for the feedback and insights.
