Real-Time Data Streaming with Debezium and CDC (Change Data Capture)

Sefik Can Kanber
Aug 9, 2024


In modern systems, Change Data Capture (CDC) allows us to capture database changes in real-time. These changes can then be used for various operations. Debezium is a powerful open-source tool that uses CDC to monitor database changes and turn them into event streams. In this article, we will look at what Debezium and CDC are, how they work, and their use cases.

What is Change Data Capture (CDC)?

Change Data Capture (CDC) is the process of detecting and recording changes in a database, such as inserts, updates, and deletes, and transmitting those changes to other systems or applications. CDC is a key building block for data integration, synchronization, and analytics. Unlike traditional batch approaches such as nightly ETL jobs, CDC captures each change as it happens, which makes real-time data streams possible.
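To make the idea concrete, a single captured change can be thought of as a small, self-describing event. The Go type below is an illustrative sketch of such an event; the field names are mine, chosen to mirror the Debezium envelope shown later in this article, and are not a fixed standard:

// ChangeEvent is an illustrative model of one captured database change.
// Debezium's actual message envelope (shown later) follows the same idea.
type ChangeEvent struct {
	Op     string         `json:"op"`     // "c" insert, "u" update, "d" delete
	Before map[string]any `json:"before"` // row state before the change; nil for inserts
	After  map[string]any `json:"after"`  // row state after the change; nil for deletes
	TsMs   int64          `json:"ts_ms"`  // millisecond timestamp of the change
}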

Advantages of CDC:

  • Real-Time Data Updates: Captures and processes data changes instantly, keeping data up-to-date.
  • Minimal Performance Impact: Has little effect on database performance and adds minimal overhead.
  • Flexibility and Scalability: Facilitates data integration between systems and can handle large data sets effectively.

Disadvantages of CDC:

  • Complexity: Implementing CDC can be complex, especially with multiple databases and diverse data sources. It requires significant effort and expertise.
  • Resource Consumption: While it aims to minimize performance impact, CDC still uses system resources like CPU, memory, and disk I/O for monitoring and processing changes.
  • Latency: There might be a slight delay between the actual data change and its capture and propagation, which can be an issue for systems needing ultra-low latency.
  • Initial Setup Time: Setting up CDC involves configuring both source databases and target systems, and ensuring the infrastructure can handle the data load, which can be time-consuming.
  • Handling Schema Changes: Managing changes in the database schema (like adding or modifying tables and columns) can be challenging and may lead to data inconsistencies or interruptions.
  • Data Consistency: Ensuring consistent data across different systems can be difficult, especially in distributed environments. Issues like out-of-order events or partial updates need careful handling.
  • Security Concerns: CDC involves accessing and monitoring database logs, which can introduce security risks. Proper access controls and encryption are needed to protect sensitive data.
  • Cost: Implementing and maintaining CDC can be costly, including licensing fees, additional hardware or cloud services, and operational costs for monitoring and managing the system.

What is Debezium?

Debezium is an open-source tool that detects changes in databases and sends these changes to event streaming systems like Kafka, creating real-time data streams. It is built on Apache Kafka and works with Kafka Connect. Debezium has connectors for various databases, including PostgreSQL, MySQL, and MongoDB.

Main Features of Debezium:

  • Real-Time Data Capture: Monitors database logs to instantly detect changes.
  • Reliability: Ensures operations continue without data loss, even if the system crashes.
  • Easy Integration: Easily integrates with various databases and systems.
  • Transparency: Offers full visibility into how data changes are monitored and recorded.

Debezium Use Cases:

  • Real-Time Analytics: Creates real-time analytics applications by processing data changes instantly.
  • Event-Driven Architectures: Helps build reactive applications with event-driven systems based on data changes.
  • Data Replication: Simplifies data backup and scaling by syncing data across different stores.
  • Audit Logging: Supports auditing by maintaining detailed records of data changes.

Adding a Connector to Debezium

To add a new connector to Debezium using the REST API, follow these steps:

  1. Ensure Debezium is Running: Make sure Debezium, along with Kafka and Kafka Connect, is up and running.
  2. Prepare the Configuration: Create a JSON configuration file for the connector, including details like the database connection URL, user credentials, and other necessary settings.
  3. Use the REST API: Send a POST request to the Kafka Connect REST API to add the new connector. You can use tools like curl, Postman, or any HTTP client library to do this.

Here’s an example using curl:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
  "name": "your-connector-name",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "your-database-host",
    "database.port": "5432",
    "database.user": "your-database-user",
    "database.password": "your-database-password",
    "database.dbname": "your-database-name",
    "topic.prefix": "your-topic-prefix",
    "table.include.list": "public.your_table",
    "plugin.name": "pgoutput"
  }
}'

Steps in Detail

  1. Configuration Details:
  • connector.class: The Debezium connector class to use. For PostgreSQL, it is io.debezium.connector.postgresql.PostgresConnector.
  • tasks.max: Maximum number of tasks for this connector. The PostgreSQL connector always uses a single task, so this is set to 1.
  • database.hostname: Hostname of your PostgreSQL database.
  • database.port: Port of your PostgreSQL database, usually 5432.
  • database.user: Username to connect to the database.
  • database.password: Password for the database user.
  • database.dbname: Name of the database to connect to.
  • topic.prefix: Logical name for this database server, used as the prefix (namespace) for all Kafka topic names the connector produces. In Debezium 2.x this replaces the older database.server.name property.
  • table.include.list: A comma-separated list of fully-qualified tables to capture changes for.
  • plugin.name: The PostgreSQL logical decoding plug-in. pgoutput ships with PostgreSQL 10+ and needs no extra installation.

Unlike the MySQL connector, the PostgreSQL connector reads schema information directly from the database, so no database.history.* settings are needed.

2. Sending the Request:

  • Use the curl command (or other HTTP tools) to send the configuration to the Kafka Connect REST API.
  • Replace placeholders with your actual database and Kafka configuration details.

3. Verify Connector Status:

  • After sending the request, you can check the status of your connector by sending a GET request to the /connectors/{connector-name}/status endpoint:

curl -X GET http://localhost:8083/connectors/your-connector-name/status

This returns whether the connector is running or has failed, along with stack traces for any errors. By following these steps, you can add a new connector to Debezium and start capturing changes from your PostgreSQL database. The same two calls can also be made from application code, as in the Go sketch below.
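Kafka Connect's API is plain HTTP with JSON, so no special client is needed. Here is a minimal, hedged Go sketch that registers the connector and then checks its status; it assumes the same placeholder configuration as the curl example and Kafka Connect listening on localhost:8083:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Same placeholder configuration as the curl example above.
	config := []byte(`{
		"name": "your-connector-name",
		"config": {
			"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
			"tasks.max": "1",
			"database.hostname": "your-database-host",
			"database.port": "5432",
			"database.user": "your-database-user",
			"database.password": "your-database-password",
			"database.dbname": "your-database-name",
			"topic.prefix": "your-topic-prefix",
			"table.include.list": "public.your_table",
			"plugin.name": "pgoutput"
		}
	}`)

	// Register the connector via the Kafka Connect REST API.
	resp, err := http.Post("http://localhost:8083/connectors/", "application/json", bytes.NewReader(config))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("create:", resp.Status) // 201 Created on success

	// Check the connector's status.
	status, err := http.Get("http://localhost:8083/connectors/your-connector-name/status")
	if err != nil {
		panic(err)
	}
	defer status.Body.Close()

	body, _ := io.ReadAll(status.Body)
	fmt.Println("status:", string(body))
}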

Sample Project

In this section, I will explain Debezium with a simple Go application. Since the application only performs a basic creation process, I won’t go into detail on that part.

First, let’s start a Go module and add the necessary dependencies.

mkdir GoDebezium
cd GoDebezium
go mod init github.com/sefikcan/godebezium
go get github.com/go-playground/validator/v10@v10.22.0
go get github.com/labstack/echo/v4@v4.12.0
go get github.com/pkg/errors@v0.9.1
go get github.com/spf13/viper@v1.19.0
go get go.uber.org/zap@v1.27.0
go get gorm.io/driver/postgres@v1.5.9
go get gorm.io/gorm@v1.25.11

We have just one entity here, Product, which will be used to store product information. Let’s create this Product entity as follows.

type Product struct {
	Id                int     `gorm:"primary_key" json:"id"`
	Name              string  `gorm:"index:idx_name,unique" json:"name"`
	BaseCost          float64 `json:"baseCost"`
	ProductType       string  `json:"productType"`
	AdditionalKwhCost float64 `json:"additionalKwhCost"`
	IncludedKwh       float64 `json:"includedKwh"`
}
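For completeness, here is a hedged sketch of how the table behind this entity can be created with GORM's auto-migration. It assumes the Product struct above is in the same package and that the connection values match the docker-compose file shown later; the sample project may wire this up differently:

package main

import (
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

func main() {
	// Credentials and port taken from the tariffdb service in the compose file below.
	dsn := "host=localhost port=5432 user=tariff_user password=password dbname=tariff sslmode=disable"
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	// Creates (or updates) the table from the struct definition.
	if err := db.AutoMigrate(&Product{}); err != nil {
		panic(err)
	}
}

GORM pluralizes struct names by default, so this creates the public.products table, which is exactly the table the connector configuration later in this article captures.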

Next, we add the handler for creating a product.

func (p productHandlers) Create() echo.HandlerFunc {
	return func(e echo.Context) error {
		productRequest := product.CreateProductRequest{}
		if err := e.Bind(&productRequest); err != nil {
			return e.JSON(http.StatusBadRequest, util.NewHttpResponse(http.StatusBadRequest, strings.ToLower(err.Error()), nil))
		}

		createdProduct, err := p.productUseCase.Create(context.Background(), productRequest)
		if err != nil {
			return e.JSON(http.StatusInternalServerError, util.NewHttpResponse(http.StatusInternalServerError, strings.ToLower(err.Error()), nil))
		}

		return e.JSON(http.StatusCreated, createdProduct)
	}
}

Under the deployments folder, you'll find the dependencies for the Debezium setup and the docker-compose configuration required to run the project. One important detail: PostgreSQL's wal_level must be set to logical, because Debezium's connector reads changes through logical decoding. In the compose file below it is passed as a server flag when the container starts; it can also be changed later in postgresql.conf, but the setting only takes effect after a server restart.

version: '3.4'

services:
  tariffdb:
    image: postgres
    container_name: tariffdb
    environment:
      - POSTGRES_DB=tariff
      - POSTGRES_USER=tariff_user
      - POSTGRES_PASSWORD=password
    ports:
      - "5432:5432"
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"

  zookeeper:
    image: debezium/zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"

  kafka:
    image: debezium/kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8090:8090"
    restart: always
    environment:
      - SERVER_PORT=8090
      - KAFKA_CLUSTERS_0_NAME=localhost
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092

  connect:
    image: debezium/connect
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - zookeeper
      - kafka

volumes:
  postgres-data:

You can run the compose file with the following command:

docker-compose up -d
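Once the containers are up, it's worth confirming that logical decoding is actually enabled. Here is a minimal check from Go, reusing the GORM dependency already in the project; the connection values come from the compose file, and the snippet itself is not part of the sample project:

package main

import (
	"fmt"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

func main() {
	// Credentials and port come from the tariffdb service above.
	dsn := "host=localhost port=5432 user=tariff_user password=password dbname=tariff sslmode=disable"
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	// Debezium's pgoutput plug-in only works with wal_level=logical.
	var walLevel string
	db.Raw("SHOW wal_level").Scan(&walLevel)
	fmt.Println("wal_level =", walLevel) // expected: logical
}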

After the containers are up, we can configure and run the connector that links PostgreSQL and Debezium. I'll register it with a plain HTTP request; the request below uses the .http file syntax supported by IDE HTTP clients, and the same call can be made with curl. If you need to re-create the connector later, remove the existing one first with curl -X DELETE http://localhost:8083/connectors/my-tariff-connector. Note the Debezium 2.x property names: topic.prefix namespaces the Kafka topics, table.include.list selects the captured tables, decimal.handling.mode: double emits numeric columns as doubles, and plugin.name: pgoutput selects PostgreSQL's built-in logical decoding plug-in.

###
POST http://localhost:8083/connectors/
Accept: application/json
Content-Type: application/json

{
  "name": "my-tariff-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "tariffdb",
    "database.port": "5432",
    "database.user": "tariff_user",
    "database.password": "password",
    "database.dbname": "tariff",
    "topic.prefix": "pg_tariff",
    "table.include.list": "public.products",
    "decimal.handling.mode": "double",
    "plugin.name": "pgoutput",
    "include.schema.changes": "false"
  }
}
###

Everything is set up now. We can send our request and verify that Debezium successfully transfers information from PostgreSQL to the Kafka topic we specified.

curl -X POST -H "Content-Type: application/json" http://localhost:5000/api/v1/products -d '{
  "additionalKwhCost": 10,
  "baseCost": 10,
  "includedKwh": 10,
  "name": "efsaddases",
  "productType": "string"
}'

After creating a product, the following message appears on the Kafka topic pg_tariff.public.products:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int64", "optional": false, "default": 0, "field": "id" },
          { "type": "string", "optional": true, "field": "name" },
          { "type": "double", "optional": true, "field": "base_cost" },
          { "type": "string", "optional": true, "field": "product_type" },
          { "type": "double", "optional": true, "field": "additional_kwh_cost" },
          { "type": "double", "optional": true, "field": "included_kwh" }
        ],
        "optional": true,
        "name": "pg_tariff.public.products.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int64", "optional": false, "default": 0, "field": "id" },
          { "type": "string", "optional": true, "field": "name" },
          { "type": "double", "optional": true, "field": "base_cost" },
          { "type": "string", "optional": true, "field": "product_type" },
          { "type": "double", "optional": true, "field": "additional_kwh_cost" },
          { "type": "double", "optional": true, "field": "included_kwh" }
        ],
        "optional": true,
        "name": "pg_tariff.public.products.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false,incremental" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": true, "field": "sequence" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "pg_tariff.public.products.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "name": "efsaddases",
      "base_cost": 10.0,
      "product_type": "string",
      "additional_kwh_cost": 10.0,
      "included_kwh": 10.0
    },
    "source": {
      "version": "2.5.0.Final",
      "connector": "postgresql",
      "name": "pg_tariff",
      "ts_ms": 1723183786404,
      "snapshot": "false",
      "db": "tariff",
      "sequence": "[null,\"26767192\"]",
      "schema": "public",
      "table": "products",
      "txId": 749,
      "lsn": 26767192,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1723183786697,
    "transaction": null
  }
}

Everything went as expected: the payload carries the new row in after, before is null, and op is "c" for create. Now we can take this data and process it as needed with a consumer; a minimal sketch of one follows.
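The sketch uses segmentio/kafka-go, which is an assumption on my part; the article's project does not prescribe a consumer library. It connects through the external listener the compose file exposes on localhost:29092, subscribes to pg_tariff.public.products, and decodes only the payload part of the Debezium envelope:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/segmentio/kafka-go"
)

// debeziumEnvelope maps just the parts of the message we care about.
type debeziumEnvelope struct {
	Payload struct {
		Op     string          `json:"op"`
		Before json.RawMessage `json:"before"`
		After  json.RawMessage `json:"after"`
		TsMs   int64           `json:"ts_ms"`
	} `json:"payload"`
}

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:29092"}, // external listener from the compose file
		Topic:   "pg_tariff.public.products", // topic.prefix + schema + table
		GroupID: "product-consumer",          // hypothetical consumer group name
	})
	defer r.Close()

	for {
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			panic(err)
		}

		var event debeziumEnvelope
		if err := json.Unmarshal(msg.Value, &event); err != nil {
			// Delete tombstones have an empty value; skip anything unparsable.
			fmt.Println("skipping message:", err)
			continue
		}

		// op is "c" (create), "u" (update), "d" (delete) or "r" (snapshot read).
		fmt.Printf("op=%s after=%s\n", event.Payload.Op, event.Payload.After)
	}
}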

Summary

Debezium is a powerful tool for monitoring database changes in real-time and reliably transmitting these changes to other systems. By creating real-time data streams using Change Data Capture (CDC), Debezium enables businesses to manage and analyze their data more efficiently. Systems built with Debezium offer high reliability, flexibility, and scalability, meeting the demands of modern data management.
