Alex Merced || 2024-10-04
Introduction
Overview of CDC
Change Data Capture (CDC) is the process of identifying and capturing changes made to data within a database. It’s a critical technique in modern data architectures, enabling systems to stay synchronized, whether for analytical purposes, replication, or near-real-time data streaming. CDC helps minimize the need to reprocess entire datasets by focusing on only the incremental changes—new inserts, updates, and deletions.
Challenges with Systems Lacking Native CDC
Many databases offer native CDC features, such as change logs or triggers, that automatically track data modifications. However, when working with systems that don’t provide these built-in features, implementing CDC becomes more challenging. You need to design tables and processes that allow you to track changes manually while minimizing performance impact. Without proper design, you may face issues like slow updates, data inconsistencies, and complex synchronization logic.
Goals of This Blog
In this blog, we will explore:
- How to design tables that enable efficient incremental updates, even in systems without CDC.
- How to write SQL for applying changes to other tables effectively, reducing the overhead of full table scans or complete reloads.
Designing Tables for Incremental Updates
Use of Timestamps and Version Columns
One of the simplest and most effective ways to track changes in systems without CDC is by adding updated_at and created_at columns to your tables. These timestamp columns provide a clear audit trail of when rows were inserted or modified. Whenever a row is updated, updated_at must be refreshed, either by the application or by a trigger (a column DEFAULT only applies on insert), so that you can easily query for records that have changed since the last sync.
Additionally, incorporating a version column helps track the number of times a record has been modified. Each time a row is updated, the version increases, making it easier to detect whether a record needs to be synchronized elsewhere. For soft deletes, adding a deleted_at column signals when a row has been marked for deletion without physically removing it. This matters because a physically deleted row simply disappears, and no query on updated_at or version can find it afterward.
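The following minimal sketch shows these columns at work, using Python's built-in sqlite3 module as a stand-in database. The `items` table and the helper functions are hypothetical, and timestamps are passed in explicitly as ISO-8601 strings (which sort correctly as text) so the result is repeatable; a real application or trigger would use the current time.

```python
import sqlite3

def create_items(conn):
    # created_at / updated_at record when a row appeared and last changed,
    # version counts modifications, deleted_at marks soft deletes
    conn.execute("""
        CREATE TABLE items (
            id         INTEGER PRIMARY KEY,
            name       TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            version    INTEGER NOT NULL DEFAULT 1,
            deleted_at TEXT
        )
    """)

def insert_item(conn, item_id, name, now):
    conn.execute(
        "INSERT INTO items (id, name, created_at, updated_at) VALUES (?, ?, ?, ?)",
        (item_id, name, now, now),
    )

def update_item(conn, item_id, name, now):
    # Every write refreshes updated_at and bumps version in the same statement
    conn.execute(
        "UPDATE items SET name = ?, updated_at = ?, version = version + 1 WHERE id = ?",
        (name, now, item_id),
    )

def soft_delete_item(conn, item_id, now):
    # The row stays in place, so incremental readers can still see the deletion
    conn.execute(
        "UPDATE items SET deleted_at = ?, updated_at = ?, version = version + 1 WHERE id = ?",
        (now, now, item_id),
    )

conn = sqlite3.connect(":memory:")
create_items(conn)
insert_item(conn, 1, "widget", "2024-10-01T09:00:00")
update_item(conn, 1, "gadget", "2024-10-02T09:00:00")
soft_delete_item(conn, 1, "2024-10-03T09:00:00")
print(conn.execute("SELECT name, version, updated_at, deleted_at FROM items").fetchone())
# ('gadget', 3, '2024-10-03T09:00:00', '2024-10-03T09:00:00')
```

Because the soft-deleted row is still present with a fresh updated_at, a query for rows changed since the last sync will pick up the deletion.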
Storing Historical Data
Maintaining historical data is another technique for simulating CDC. You can create history or audit tables that store every change made to a record, preserving its previous versions. This enables you to recreate the full history of changes while still maintaining a “live” table with only the current state.
When capturing history:
- Use a combination of INSERT, UPDATE, and DELETE triggers, or manually insert new versions into the history table from your application.
- Each entry can include metadata like modified_by and operation_type (insert, update, delete) to clarify how the change occurred.
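A sketch of the application-managed option, again using sqlite3: each write changes the live table and appends the new version of the row (NULL after a delete) to a history table in the same transaction. The `items_history` layout and the `write_item` helper are illustrative assumptions, not a prescribed schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE items_history (
        history_id     INTEGER PRIMARY KEY AUTOINCREMENT,
        item_id        INTEGER NOT NULL,
        name           TEXT,           -- the new version of the row; NULL after a delete
        operation_type TEXT NOT NULL,  -- 'insert', 'update' or 'delete'
        modified_by    TEXT NOT NULL,
        changed_at     TEXT NOT NULL
    );
""")

def write_item(conn, item_id, name, user, now):
    """Upsert a row (or delete it when name is None) and record a history entry."""
    with conn:  # the live table and the history table change in one transaction
        exists = conn.execute("SELECT 1 FROM items WHERE id = ?", (item_id,)).fetchone()
        if name is None:
            if not exists:
                return  # nothing to delete
            op = "delete"
            conn.execute("DELETE FROM items WHERE id = ?", (item_id,))
        elif exists:
            op = "update"
            conn.execute("UPDATE items SET name = ? WHERE id = ?", (name, item_id))
        else:
            op = "insert"
            conn.execute("INSERT INTO items (id, name) VALUES (?, ?)", (item_id, name))
        conn.execute(
            "INSERT INTO items_history (item_id, name, operation_type, modified_by, changed_at)"
            " VALUES (?, ?, ?, ?, ?)",
            (item_id, name, op, user, now),
        )

write_item(conn, 1, "widget", "alice", "2024-10-01T09:00:00")
write_item(conn, 1, "gadget", "bob", "2024-10-02T09:00:00")
write_item(conn, 1, None, "alice", "2024-10-03T09:00:00")
for row in conn.execute(
    "SELECT item_id, name, operation_type, modified_by FROM items_history ORDER BY history_id"
):
    print(row)
# (1, 'widget', 'insert', 'alice')
# (1, 'gadget', 'update', 'bob')
# (1, None, 'delete', 'alice')
```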
Partitioning and Indexing Strategies
For larger tables, partitioning by time or version can help optimize incremental updates. By partitioning on columns like updated_at, you can narrow down the scope of queries to the most recent partitions, avoiding costly full table scans.
Similarly, indexing on these columns can accelerate queries by making it faster to retrieve the most recently modified rows. Keep in mind that partitioning and indexing come with trade-offs, such as increased write overhead, so it’s important to balance these optimizations based on the use case.
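One way to confirm that an index on updated_at is actually used is to inspect the query plan. This sketch uses SQLite's EXPLAIN QUERY PLAN through sqlite3 (other engines have their own EXPLAIN output); the table and index names are hypothetical. SQLite has no declarative partitioning, so only the indexing half is shown, and the exact plan wording varies between SQLite versions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT NOT NULL)")
# Index the change-tracking column so "what changed since X?" becomes a range search
conn.execute("CREATE INDEX idx_items_updated_at ON items (updated_at)")

plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT id, updated_at FROM items WHERE updated_at > ?",
    ("2024-10-01T00:00:00",),
).fetchall()
# Each plan row ends with a human-readable "detail" field naming the index used
for row in plan:
    print(row[-1])
```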
Writing SQL for Efficient Incremental Updates
Identifying the Changes
Once your tables are designed to track changes, the next step is to efficiently query for only the modified rows. This can be achieved by leveraging the updated_at or version columns.
For example, to select all records updated in the last hour, you can use a query like:
SELECT *
FROM your_table
WHERE updated_at > NOW() - INTERVAL '1 hour';
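Alternatively, if each change stamps the row with a value drawn from a single, ever-increasing sequence, you can select all records with a version greater than the last processed version. (A per-row modification counter cannot serve this purpose, because different rows reuse the same small numbers.)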
SELECT *
FROM your_table
WHERE version > :last_processed_version;
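These queries identify incremental changes without scanning the entire table, provided updated_at or version is indexed or used as a partition key. Comparing against a stored "last processed" value is also more robust than a fixed window such as the last hour: if a run is delayed or fails, a fixed window silently skips rows, while a stored watermark resumes where the previous run stopped.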
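A stored watermark can be sketched as follows with sqlite3: each run remembers the largest updated_at it has read and asks only for rows beyond it. `fetch_changes` and the `items` table are hypothetical names used for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO items VALUES (?, ?, ?)",
    [(1, "a", "2024-10-01T09:00:00"), (2, "b", "2024-10-01T10:00:00")],
)

def fetch_changes(conn, watermark):
    """Return rows modified after `watermark` and the watermark to store next."""
    rows = conn.execute(
        "SELECT id, name, updated_at FROM items WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),
    ).fetchall()
    # Advance only as far as the data actually read, never to "now"
    return rows, (rows[-1][2] if rows else watermark)

changes, wm = fetch_changes(conn, "")  # first run: empty watermark sorts before everything
print(len(changes), wm)  # 2 2024-10-01T10:00:00
conn.execute("UPDATE items SET name = 'a2', updated_at = '2024-10-01T11:00:00' WHERE id = 1")
changes, wm = fetch_changes(conn, wm)  # next run: only the edited row
print(changes)  # [(1, 'a2', '2024-10-01T11:00:00')]
```

Advancing the watermark to the largest value actually read, rather than to the wall-clock time of the run, means a delayed or failed run does not skip rows.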
Merging Changes into Target Tables
After identifying the changes, you’ll want to apply them to the target tables (such as a reporting or analytics table). Depending on the database you’re using, you might employ different strategies. If the database supports the MERGE statement (for example, PostgreSQL 15 and later, SQL Server, or Snowflake), it can handle inserts, updates, and deletions in a single statement.
Here’s an example of how you can merge changes from a source table into a target table:
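MERGE INTO target_table AS t
USING source_table AS s
ON t.id = s.id
-- Remove rows that were soft-deleted in the source (assumes a deleted_at column)
WHEN MATCHED AND s.deleted_at IS NOT NULL THEN
  DELETE
WHEN MATCHED AND s.updated_at > t.updated_at THEN
  UPDATE SET column1 = s.column1, updated_at = s.updated_at
WHEN NOT MATCHED AND s.deleted_at IS NULL THEN
  INSERT (id, column1, updated_at)
  VALUES (s.id, s.column1, s.updated_at);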
In systems where MERGE isn’t supported (for example, PostgreSQL before version 15), you can use a combination of UPDATE and INSERT statements. This example uses PostgreSQL’s UPDATE ... FROM syntax; running both statements in one transaction ensures readers never see a half-applied batch:
BEGIN;
-- Update existing records
UPDATE target_table AS t
SET column1 = s.column1, updated_at = s.updated_at
FROM source_table AS s
WHERE t.id = s.id AND s.updated_at > t.updated_at;
-- Insert new records
INSERT INTO target_table (id, column1, updated_at)
SELECT s.id, s.column1, s.updated_at
FROM source_table AS s
WHERE NOT EXISTS (
  SELECT 1 FROM target_table t WHERE t.id = s.id
);
COMMIT;
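If your engine lacks both MERGE and UPDATE ... FROM, the same update-then-insert logic can be written with correlated subqueries, which most SQL engines accept. This sketch runs it through Python's sqlite3 inside a single transaction; the table layout mirrors the example above, and `apply_changes` is a hypothetical helper.

```python
import sqlite3

def apply_changes(conn):
    with conn:  # commit both statements together, or roll both back on error
        # 1. Update target rows whose source copy is newer
        conn.execute("""
            UPDATE target_table
            SET column1 = (SELECT s.column1 FROM source_table s WHERE s.id = target_table.id),
                updated_at = (SELECT s.updated_at FROM source_table s WHERE s.id = target_table.id)
            WHERE EXISTS (
                SELECT 1 FROM source_table s
                WHERE s.id = target_table.id AND s.updated_at > target_table.updated_at
            )
        """)
        # 2. Insert source rows the target has never seen
        conn.execute("""
            INSERT INTO target_table (id, column1, updated_at)
            SELECT s.id, s.column1, s.updated_at
            FROM source_table s
            WHERE NOT EXISTS (SELECT 1 FROM target_table t WHERE t.id = s.id)
        """)

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE source_table (id INTEGER PRIMARY KEY, column1 TEXT, updated_at TEXT);
    CREATE TABLE target_table (id INTEGER PRIMARY KEY, column1 TEXT, updated_at TEXT);
    INSERT INTO source_table VALUES (1, 'new', '2024-10-02'), (2, 'fresh', '2024-10-02');
    INSERT INTO target_table VALUES (1, 'old', '2024-10-01');
""")
apply_changes(conn)
print(conn.execute("SELECT * FROM target_table ORDER BY id").fetchall())
# [(1, 'new', '2024-10-02'), (2, 'fresh', '2024-10-02')]
```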
Handling Conflicts
When applying incremental changes, conflicts can arise, especially if multiple systems or users are updating the same data. One common conflict occurs when duplicate records are inserted or when simultaneous updates lead to inconsistencies.
To manage this, databases often provide clauses like ON CONFLICT (in PostgreSQL) or similar approaches in other systems, such as MySQL’s ON DUPLICATE KEY UPDATE. For example, to handle a duplicate key during inserts, you can specify the action to take:
INSERT INTO target_table (id, column1, updated_at)
VALUES (:id, :column1, :updated_at)
ON CONFLICT (id) DO UPDATE
SET column1 = EXCLUDED.column1, updated_at = EXCLUDED.updated_at;
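This ensures that if the row already exists, it is updated rather than raising an error. Note that DO UPDATE overwrites the existing row unconditionally; adding a condition such as WHERE target_table.updated_at < EXCLUDED.updated_at after the SET clause keeps a late-arriving, older change from overwriting newer data.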
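SQLite (3.24 and later) accepts the same ON CONFLICT ... DO UPDATE syntax as PostgreSQL, so the upsert can be tried locally with sqlite3. This sketch adds a WHERE clause to the DO UPDATE so that an older change arriving late cannot overwrite newer data (a last-writer-wins rule by timestamp); `upsert` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target_table (id INTEGER PRIMARY KEY, column1 TEXT, updated_at TEXT)")

def upsert(conn, row_id, column1, updated_at):
    with conn:
        conn.execute(
            """
            INSERT INTO target_table (id, column1, updated_at)
            VALUES (?, ?, ?)
            ON CONFLICT (id) DO UPDATE
            SET column1 = excluded.column1, updated_at = excluded.updated_at
            -- Last writer wins by timestamp: ignore changes older than the stored row
            WHERE excluded.updated_at > target_table.updated_at
            """,
            (row_id, column1, updated_at),
        )

upsert(conn, 1, "v1", "2024-10-01T09:00:00")
upsert(conn, 1, "v3", "2024-10-01T11:00:00")
upsert(conn, 1, "v2", "2024-10-01T10:00:00")  # arrives late, so it is skipped
print(conn.execute("SELECT column1, updated_at FROM target_table").fetchone())
# ('v3', '2024-10-01T11:00:00')
```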
Batch Processing and Scheduling Incremental Updates
Batching Updates
To avoid overwhelming your system or locking your database during large updates, batching incremental updates can be an effective strategy. Instead of applying all changes at once, process them in smaller, manageable chunks. For example, you can process updates in batches of 1,000 rows at a time.
Here’s how you could implement batch processing in SQL:
-- Assume a batch size of 1000 rows. The cursor is the (updated_at, id) of the
-- last row applied, so rows sharing a timestamp at a batch boundary are not skipped.
WITH batch AS (
  SELECT * FROM source_table
  WHERE (updated_at, id) > (:last_processed_time, :last_processed_id)
  ORDER BY updated_at, id
  LIMIT 1000
)
-- Apply the batch to the target table
MERGE INTO target_table AS t
USING batch AS b
ON t.id = b.id
WHEN MATCHED AND b.updated_at > t.updated_at THEN
  UPDATE SET column1 = b.column1, updated_at = b.updated_at
WHEN NOT MATCHED THEN
  INSERT (id, column1, updated_at)
  VALUES (b.id, b.column1, b.updated_at);
After processing each batch, you can update your tracking mechanism (e.g., the last processed timestamp or version) and continue with the next batch.
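The batch loop might look like the following sqlite3 sketch. It pages with a keyset cursor on (updated_at, id) rather than updated_at alone, so rows that share a timestamp at a batch boundary are neither skipped nor reprocessed. SQLite's ON CONFLICT upsert stands in for MERGE, and `sync_in_batches`, the starting cursor, and the batch size are assumptions for illustration.

```python
import sqlite3

def sync_in_batches(conn, cursor, batch_size=1000):
    """Copy changed rows in (updated_at, id) order; return the final cursor."""
    while True:
        batch = conn.execute(
            """
            SELECT id, column1, updated_at FROM source_table
            WHERE (updated_at, id) > (?, ?)
            ORDER BY updated_at, id
            LIMIT ?
            """,
            (*cursor, batch_size),
        ).fetchall()
        if not batch:
            return cursor
        with conn:  # each batch is its own short transaction
            conn.executemany(
                """
                INSERT INTO target_table (id, column1, updated_at) VALUES (?, ?, ?)
                ON CONFLICT (id) DO UPDATE
                SET column1 = excluded.column1, updated_at = excluded.updated_at
                WHERE excluded.updated_at > target_table.updated_at
                """,
                batch,
            )
        last_id, _, last_ts = batch[-1]
        cursor = (last_ts, last_id)  # in practice, persist this in a control table

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE source_table (id INTEGER PRIMARY KEY, column1 TEXT, updated_at TEXT);
    CREATE TABLE target_table (id INTEGER PRIMARY KEY, column1 TEXT, updated_at TEXT);
""")
# Five rows, three of which share a timestamp
conn.executemany("INSERT INTO source_table VALUES (?, ?, ?)", [
    (1, "a", "2024-10-01T09:00"), (2, "b", "2024-10-01T09:00"),
    (3, "c", "2024-10-01T09:00"), (4, "d", "2024-10-01T10:00"),
    (5, "e", "2024-10-01T11:00"),
])
conn.commit()
final = sync_in_batches(conn, ("", 0), batch_size=2)
print(conn.execute("SELECT COUNT(*) FROM target_table").fetchone()[0], final)
# 5 ('2024-10-01T11:00', 5)
```

Committing each batch separately keeps locks short, and because the upsert is idempotent, restarting from a saved cursor after a crash at worst repeats one batch harmlessly.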
Efficient Scheduling of Updates
To ensure that your incremental updates happen regularly, you need an efficient scheduling mechanism. For databases without native job scheduling, you can rely on external tools such as cron jobs, Airflow, or other orchestration systems.
Here’s an example of how you might schedule your updates using a cron job:
# Run every 10 minutes
*/10 * * * * /path/to/script/incremental_update.sh
In environments using more complex workflows, you can configure tools like Apache Airflow to orchestrate these updates, handling dependencies and retries in case of failure.
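# Airflow DAG for incremental updates (Airflow 2.4+ syntax)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
    dag_id="incremental_update",
    start_date=datetime(2023, 1, 1),
    schedule="*/10 * * * *",  # every 10 minutes, like the cron example
    catchup=False,  # don't backfill every interval since start_date
    # Retry a failed run twice, five minutes apart
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:
    run_incremental_update = BashOperator(
        task_id="run_incremental_update",
        bash_command="python /path/to/incremental_update.py",
    )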
By automating these updates, you can ensure your target systems stay synchronized with minimal manual intervention.
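Whatever the scheduler, the job it triggers (such as the incremental_update.py script called from the Airflow example above) should follow one rule: save the new watermark in the same transaction as the data it describes, so a crash can never advance one without the other. A sqlite3 sketch, where the `sync_state` control table and the `run_once` function are illustrative assumptions:

```python
import sqlite3

def run_once(conn, job="target_table"):
    """One scheduled cycle: read the watermark, upsert changes, save the new watermark."""
    row = conn.execute(
        "SELECT last_updated_at FROM sync_state WHERE job = ?", (job,)
    ).fetchone()
    watermark = row[0] if row else ""
    changes = conn.execute(
        "SELECT id, column1, updated_at FROM source_table "
        "WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),
    ).fetchall()
    if not changes:
        return 0
    with conn:  # the upserts and the new watermark commit (or roll back) together
        conn.executemany(
            "INSERT INTO target_table (id, column1, updated_at) VALUES (?, ?, ?) "
            "ON CONFLICT (id) DO UPDATE SET column1 = excluded.column1, "
            "updated_at = excluded.updated_at",
            changes,
        )
        conn.execute(
            "INSERT INTO sync_state (job, last_updated_at) VALUES (?, ?) "
            "ON CONFLICT (job) DO UPDATE SET last_updated_at = excluded.last_updated_at",
            (job, changes[-1][2]),
        )
    return len(changes)

conn = sqlite3.connect(":memory:")  # a real job would open the actual database
conn.executescript("""
    CREATE TABLE source_table (id INTEGER PRIMARY KEY, column1 TEXT, updated_at TEXT);
    CREATE TABLE target_table (id INTEGER PRIMARY KEY, column1 TEXT, updated_at TEXT);
    CREATE TABLE sync_state (job TEXT PRIMARY KEY, last_updated_at TEXT NOT NULL);
    INSERT INTO source_table VALUES (1, 'a', '2024-10-01T09:00'), (2, 'b', '2024-10-01T09:05');
""")
print(run_once(conn))  # 2 (first run copies both rows)
print(run_once(conn))  # 0 (nothing changed since the stored watermark)
```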
Monitoring and Validating Incremental Updates
Tracking Changes Applied
To ensure that your incremental updates are working as expected, it’s essential to track and log the changes applied to your target tables. This can be done by maintaining control tables or logs that record the status of each update operation. These logs can store details such as the number of rows processed, the time the update occurred, and any errors that were encountered.
For example, you can create a simple audit table to track update operations:
CREATE TABLE update_log (
update_id SERIAL PRIMARY KEY,
table_name TEXT,
rows_updated INT,
update_time TIMESTAMP DEFAULT NOW(),
status TEXT,
error_message TEXT
);
After each incremental update, you can insert a record into this log to track the operation:
INSERT INTO update_log (table_name, rows_updated, status)
VALUES ('target_table', :rows_updated, 'success');
If an error occurs, you can capture it and log the details:
INSERT INTO update_log (table_name, status, error_message)
VALUES ('target_table', 'failed', 'Error details...');
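A wrapper that always leaves a row in update_log, whether the update succeeds or fails, might look like this sqlite3 sketch. SQLite has no SERIAL type, so the log table uses INTEGER PRIMARY KEY AUTOINCREMENT and CURRENT_TIMESTAMP instead; `logged_update` and the sample statements are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE update_log (
        update_id     INTEGER PRIMARY KEY AUTOINCREMENT,
        table_name    TEXT,
        rows_updated  INT,
        update_time   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        status        TEXT,
        error_message TEXT
    );
    CREATE TABLE target_table (id INTEGER PRIMARY KEY, column1 TEXT);
    INSERT INTO target_table VALUES (1, 'a'), (2, 'b');
""")

def logged_update(conn, table_name, sql, params=()):
    """Run one update statement and always leave a row in update_log."""
    try:
        with conn:  # the update and its 'success' entry commit together
            rows = conn.execute(sql, params).rowcount
            conn.execute(
                "INSERT INTO update_log (table_name, rows_updated, status) "
                "VALUES (?, ?, 'success')",
                (table_name, rows),
            )
        return rows
    except sqlite3.Error as exc:
        with conn:  # the failed work was rolled back; record why in a new transaction
            conn.execute(
                "INSERT INTO update_log (table_name, status, error_message) "
                "VALUES (?, 'failed', ?)",
                (table_name, str(exc)),
            )
        return None

logged_update(conn, "target_table", "UPDATE target_table SET column1 = upper(column1)")
logged_update(conn, "target_table", "UPDATE missing_table SET x = 1")  # fails and is logged
for row in conn.execute("SELECT table_name, rows_updated, status FROM update_log ORDER BY update_id"):
    print(row)
# ('target_table', 2, 'success')
# ('target_table', None, 'failed')
```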
Data Validation and Consistency Checks
Beyond simply logging updates, you should also perform regular validation checks to ensure the correctness of the data. One approach is to compare record counts between the source and target tables to ensure they are in sync:
-- Count of records in the source table since last update
SELECT COUNT(*) FROM source_table WHERE updated_at > :last_update_time;
-- Count of records updated in the target table
SELECT COUNT(*) FROM target_table WHERE updated_at > :last_update_time;
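If the counts don’t match, it could indicate a data inconsistency or a problem with the update process. This comparison assumes the target copies the source’s updated_at value (as the merge examples above do) and keeps one row per source row.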
You can also compute checksums or hash values of critical columns to validate that the data was transferred without corruption:
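-- Compute checksum on the source (PostgreSQL). Ordering by id makes the result
-- deterministic, and ROW(...)::text keeps field boundaries and NULLs distinct.
SELECT MD5(STRING_AGG(ROW(id, column1, column2)::text, ',' ORDER BY id)) AS checksum
FROM source_table WHERE updated_at > :last_update_time;
-- Compute checksum on the target
SELECT MD5(STRING_AGG(ROW(id, column1, column2)::text, ',' ORDER BY id)) AS checksum
FROM target_table WHERE updated_at > :last_update_time;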
If discrepancies are found, you can investigate and reprocess the affected batches. Regular consistency checks ensure that the target table accurately reflects the latest state of the source data.
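Both checks can be combined in one Python function, sketched here with sqlite3 and hashlib. Rows are hashed in primary-key order with an unambiguous encoding of each row, so the checksum is deterministic and ('ab', 'c') does not collide with ('a', 'bc'). `table_fingerprint` and the column names are hypothetical; in practice you would call it once on the source connection and once on the target.

```python
import hashlib
import sqlite3

def table_fingerprint(conn, table, since):
    """Return (row count, MD5 hex digest) of rows changed after `since`."""
    # `table` must come from trusted code: identifiers cannot be bound as parameters
    rows = conn.execute(
        f"SELECT id, column1, column2 FROM {table} WHERE updated_at > ? ORDER BY id",
        (since,),
    ).fetchall()
    digest = hashlib.md5()
    for row in rows:
        # repr() quotes strings and keeps None distinct from '', so field
        # boundaries survive: (1, 'ab', 'c') and (1, 'a', 'bc') hash differently
        digest.update(repr(row).encode("utf-8"))
        digest.update(b"\n")
    return len(rows), digest.hexdigest()

conn = sqlite3.connect(":memory:")
for t in ("source_table", "target_table"):
    conn.execute(
        f"CREATE TABLE {t} (id INTEGER PRIMARY KEY, column1 TEXT, column2 TEXT, updated_at TEXT)"
    )
    conn.executemany(
        f"INSERT INTO {t} VALUES (?, ?, ?, ?)",
        [(1, "ab", "c", "2024-10-02"), (2, "x", None, "2024-10-02")],
    )

since = "2024-10-01"
before = table_fingerprint(conn, "source_table", since) == table_fingerprint(conn, "target_table", since)
conn.execute("UPDATE target_table SET column1 = 'a', column2 = 'bc' WHERE id = 1")
after = table_fingerprint(conn, "source_table", since) == table_fingerprint(conn, "target_table", since)
print(before, after)  # True False
```

Here the row counts still match after the corruption; only the checksum catches it, which is why the two checks complement each other.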
Real-world Example: Applying Incremental Changes
Scenario Setup
Let’s take a practical example where you are managing sales transactions in a source table and need to keep an analytics table in sync for reporting purposes. The source table contains new and updated transactions, and the target table is an aggregated summary of sales per product.
- Source Table (sales_transactions): Contains individual transaction records with transaction_id, product_id, amount, and updated_at.
- Target Table (product_sales): Aggregates total sales by product with product_id, total_sales, and last_updated.
Step-by-step Guide
Design the source table:
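Ensure the source table includes an updated_at column to track when each transaction was last modified.
CREATE TABLE sales_transactions (
  transaction_id SERIAL PRIMARY KEY,
  product_id INT,
  amount DECIMAL(10, 2),
  -- DEFAULT only fills this on INSERT; updates must set it explicitly or via a trigger
  updated_at TIMESTAMP DEFAULT NOW()
);
-- Lets incremental queries find recent changes without a full table scan
CREATE INDEX idx_sales_transactions_updated_at ON sales_transactions (updated_at);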
Write SQL for incremental updates:
Use SQL to identify which products have new or modified transactions since the last run, where :last_update_time is the watermark saved by the previous run:
SELECT DISTINCT product_id
FROM sales_transactions
WHERE updated_at > :last_update_time;
Merge changes into the target table:
Use a MERGE (or an upsert) to update the product_sales table. Rather than adding the newly changed amounts to the existing total, which double-counts a transaction whenever it is modified again or picked up by two overlapping runs, recompute the full total for each affected product. If the product exists, replace its total_sales; otherwise, insert a new record:
MERGE INTO product_sales AS p
USING (
  -- Full totals, but only for products touched since the last run
  SELECT product_id, SUM(amount) AS total_sales
  FROM sales_transactions
  WHERE product_id IN (
    SELECT product_id FROM sales_transactions
    WHERE updated_at > :last_update_time
  )
  GROUP BY product_id
) AS s
ON p.product_id = s.product_id
WHEN MATCHED THEN
  UPDATE SET total_sales = s.total_sales, last_updated = NOW()
WHEN NOT MATCHED THEN
  INSERT (product_id, total_sales, last_updated)
  VALUES (s.product_id, s.total_sales, NOW());
Handle batching and scheduling:
If the transaction volume is large, you can batch these updates by processing a limited number of changed products per statement, using the last product_id of each batch as the starting point for the next. For example:
-- Next batch of up to 1000 changed products (keyset on product_id)
SELECT DISTINCT product_id
FROM sales_transactions
WHERE updated_at > :last_update_time
  AND product_id > :last_product_id
ORDER BY product_id
LIMIT 1000;
Each batch of product IDs then determines which products the merge step processes. Schedule the job to run periodically using a job scheduler like cron or an orchestration tool like Airflow to ensure the data stays up to date.
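The recompute-affected-products approach can be exercised end to end with sqlite3. Because each affected product's total is rebuilt from all of its transactions, editing an amount or re-running the job cannot double-count. SQLite's ON CONFLICT upsert stands in for MERGE, amounts are stored as REAL only for brevity, and `refresh_product_sales` with its batching on product_id is an assumption for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sales_transactions (
        transaction_id INTEGER PRIMARY KEY,
        product_id     INTEGER NOT NULL,
        amount         REAL NOT NULL,  -- REAL for brevity; prefer DECIMAL for money
        updated_at     TEXT NOT NULL
    );
    CREATE TABLE product_sales (
        product_id   INTEGER PRIMARY KEY,
        total_sales  REAL NOT NULL,
        last_updated TEXT NOT NULL
    );
""")

def refresh_product_sales(conn, since, now, batch_size=1000):
    """Rebuild totals for products whose transactions changed after `since`."""
    last_product_id = 0  # keyset cursor; assumes product IDs are positive
    while True:
        product_ids = [r[0] for r in conn.execute(
            "SELECT DISTINCT product_id FROM sales_transactions "
            "WHERE updated_at > ? AND product_id > ? ORDER BY product_id LIMIT ?",
            (since, last_product_id, batch_size),
        )]
        if not product_ids:
            return
        placeholders = ",".join("?" * len(product_ids))
        with conn:  # one transaction per batch of products
            conn.execute(
                f"""
                INSERT INTO product_sales (product_id, total_sales, last_updated)
                SELECT product_id, SUM(amount), ? FROM sales_transactions
                WHERE product_id IN ({placeholders})
                GROUP BY product_id
                ON CONFLICT (product_id) DO UPDATE
                SET total_sales = excluded.total_sales, last_updated = excluded.last_updated
                """,
                (now, *product_ids),
            )
        last_product_id = product_ids[-1]

conn.executemany("INSERT INTO sales_transactions VALUES (?, ?, ?, ?)", [
    (1, 10, 5.0, "2024-10-01T09:00"),
    (2, 10, 7.0, "2024-10-01T09:30"),
    (3, 20, 3.0, "2024-10-01T09:45"),
])
conn.commit()
refresh_product_sales(conn, since="", now="2024-10-01T10:00", batch_size=1)
# Correct transaction 2 from 7.0 to 4.0, then run twice from the previous watermark
conn.execute(
    "UPDATE sales_transactions SET amount = 4.0, updated_at = '2024-10-01T10:15' "
    "WHERE transaction_id = 2"
)
conn.commit()
refresh_product_sales(conn, since="2024-10-01T09:45", now="2024-10-01T10:30")
refresh_product_sales(conn, since="2024-10-01T09:45", now="2024-10-01T10:30")  # harmless re-run
print(conn.execute("SELECT product_id, total_sales FROM product_sales ORDER BY product_id").fetchall())
# [(10, 9.0), (20, 3.0)]
```

By contrast, adding the changed amounts to the stored total would report 16.0 for product 10 after the correction (12.0 + 4.0), and another 4.0 on every re-run.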
Real-World CDC Alternatives for Non-CDC Systems
Leveraging Triggers for Change Capture
In databases that support triggers but lack full CDC features, you can use triggers to manually implement a change tracking system. Triggers allow you to capture row-level changes on insert, update, or delete operations and store these changes in a separate table.
For example, in PostgreSQL (version 11 or later, which introduced the EXECUTE FUNCTION syntax used below), you can create a trigger to capture changes:
CREATE TABLE change_log (
id SERIAL PRIMARY KEY,
table_name TEXT,
operation_type TEXT,
record_id INT,
old_data JSONB,
new_data JSONB,
change_time TIMESTAMP DEFAULT NOW()
);
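CREATE OR REPLACE FUNCTION log_changes()
RETURNS TRIGGER AS $$
BEGIN
  -- OLD is NULL for INSERT and NEW is NULL for DELETE, so pick the row image
  -- that exists; assumes the audited table has an integer id column
  INSERT INTO change_log (table_name, operation_type, record_id, old_data, new_data)
  VALUES (
    TG_TABLE_NAME,
    TG_OP,
    CASE WHEN TG_OP = 'DELETE' THEN OLD.id ELSE NEW.id END,
    CASE WHEN TG_OP = 'INSERT' THEN NULL ELSE to_jsonb(OLD) END,
    CASE WHEN TG_OP = 'DELETE' THEN NULL ELSE to_jsonb(NEW) END
  );
  RETURN NULL;  -- the return value of an AFTER row trigger is ignored
END;
$$ LANGUAGE plpgsql;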
CREATE TRIGGER capture_changes
AFTER INSERT OR UPDATE OR DELETE
ON your_table
FOR EACH ROW
EXECUTE FUNCTION log_changes();
This approach simulates CDC by writing changes to a log table. You can then process this change log periodically to apply updates to your target systems.
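Processing the change log can be sketched with SQLite triggers standing in for the PL/pgSQL one: each insert, update, and delete on `items` appends to `change_log`, and a consumer replays entries in log_id order, starting after the last id it processed. For brevity the log stores only the new `name` value instead of full JSONB row images; the table names and the `replay` function are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE items_copy (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE change_log (
        log_id         INTEGER PRIMARY KEY AUTOINCREMENT,
        operation_type TEXT NOT NULL,
        record_id      INTEGER NOT NULL,
        new_name       TEXT
    );
    CREATE TRIGGER items_ins AFTER INSERT ON items BEGIN
        INSERT INTO change_log (operation_type, record_id, new_name)
        VALUES ('INSERT', NEW.id, NEW.name);
    END;
    CREATE TRIGGER items_upd AFTER UPDATE ON items BEGIN
        INSERT INTO change_log (operation_type, record_id, new_name)
        VALUES ('UPDATE', NEW.id, NEW.name);
    END;
    CREATE TRIGGER items_del AFTER DELETE ON items BEGIN
        INSERT INTO change_log (operation_type, record_id, new_name)
        VALUES ('DELETE', OLD.id, NULL);
    END;
""")

def replay(conn, last_log_id):
    """Apply change_log entries after last_log_id to items_copy; return the new position."""
    entries = conn.execute(
        "SELECT log_id, operation_type, record_id, new_name FROM change_log "
        "WHERE log_id > ? ORDER BY log_id",
        (last_log_id,),
    ).fetchall()
    with conn:
        for log_id, op, record_id, new_name in entries:
            if op == "DELETE":
                conn.execute("DELETE FROM items_copy WHERE id = ?", (record_id,))
            else:  # INSERT and UPDATE both become an upsert of the new value
                conn.execute(
                    "INSERT INTO items_copy (id, name) VALUES (?, ?) "
                    "ON CONFLICT (id) DO UPDATE SET name = excluded.name",
                    (record_id, new_name),
                )
            last_log_id = log_id
    return last_log_id

with conn:
    conn.execute("INSERT INTO items VALUES (1, 'a'), (2, 'b')")
    conn.execute("UPDATE items SET name = 'a2' WHERE id = 1")
    conn.execute("DELETE FROM items WHERE id = 2")
position = replay(conn, 0)
print(conn.execute("SELECT * FROM items_copy").fetchall(), position)  # [(1, 'a2')] 4
```

Storing the returned position durably (for example, in a control table) lets the next run resume exactly where this one stopped.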
Using External CDC Tools
In cases where implementing your own CDC system becomes too complex or resource-intensive, third-party CDC tools like Debezium can provide a reliable solution. Debezium, for instance, is an open-source platform that captures database changes and publishes them as events in Kafka, allowing you to stream changes to other systems in near-real-time.
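Debezium supports databases like MySQL, PostgreSQL, and MongoDB and tracks insert, update, and delete operations by reading the database’s transaction log (the binlog in MySQL, logical decoding of the write-ahead log in PostgreSQL, change streams in MongoDB). Because it depends on that log, Debezium fits databases that keep such a log but offer no convenient way to consume it; it cannot help with a store that exposes no change log at all. This tool can be particularly useful when scaling up your CDC needs across multiple systems.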
Batch Processing with ETL Pipelines
Another alternative is to simulate CDC through traditional ETL (Extract, Transform, Load) pipelines. Many ETL tools allow you to set up incremental data loads where only the changes since the last load are processed. This approach might not provide real-time changes, but it can work well for batch processing use cases.
Tools like Apache NiFi, Airflow, and Talend allow you to build robust ETL workflows that can efficiently handle incremental updates. You can configure them to read from source tables based on timestamps or other tracking columns and apply those changes to target systems.
This approach is often more suitable for less frequent updates or larger datasets where near-real-time processing is not necessary.
Conclusion
Summary of Key Takeaways
In systems that don’t have native CDC (Change Data Capture) capabilities, it is still possible to design a process for capturing and applying incremental updates efficiently. By carefully structuring your tables with timestamps, version columns, or history tables, you can track changes without requiring full table scans. Writing efficient SQL for merging, batching, and scheduling updates ensures that the changes are applied in a scalable and reliable manner.
Key points to remember:
- Design your tables to support change tracking by using columns like updated_at, created_at, and version.
- Use SQL strategies like MERGE or UPSERT to apply changes to your target tables, minimizing resource consumption.
- Batch your updates to avoid overwhelming your database and schedule these operations for optimal performance.
- Monitor and validate changes to ensure your target data remains consistent and correct.
Further Improvements
While the approaches covered in this blog offer practical solutions for implementing CDC in systems without built-in features, there is always room for further optimization:
- Adopting middleware or third-party tools: Tools like Debezium, typically paired with Apache Kafka to transport the change events, can provide change capture for databases that expose a transaction log but lack a built-in way to publish changes.
- Moving towards CDC-enabled databases: As data needs grow, switching to databases with native CDC features can offer better scalability and reliability.
- Implementing more advanced validation mechanisms: Consider using more sophisticated data quality tools or building in redundancy checks for mission-critical data.
With these principles, you can handle incremental updates in a variety of systems, helping to synchronize your data with efficiency and reliability.