一个棘手的问题摆在面前:核心业务系统由一个稳健的 Java 单体应用承载,数据持久化层深度依赖 MyBatis 操作 MySQL。与此同时,新成立的数据科学团队需要近乎实时地获取业务数据的变更,用于模型训练和实时看板。直接开放生产数据库的读权限是绝对无法接受的,这不仅会冲击主库性能,也带来了安全隐患。传统的定时批处理ETL任务,延迟太高,无法满足业务对“实时”的要求。
我们最初的构想是通过应用层发消息,即在 Java 代码中,每次数据库事务成功后,异步发送一条 Kafka 消息。这个方案很快被否决了。主要原因在于其侵入性太强,需要在几十个业务逻辑点修改代码,容易遗漏,且双写逻辑会引入数据不一致的风险。我们需要一个对现有 Java 应用零侵入的方案。
最终,我们把目光投向了基于数据库日志的变更数据捕获(Change Data Capture, CDC)。技术选型锁定在 Debezium + Kafka Connect 这一组合上。它直接读取 MySQL 的 binlog,将数据变更(INSERT, UPDATE, DELETE)转化为事件流推送到 Kafka 中。下游的任何系统,包括我们 Python 技术栈的数据平台,都可以订阅这个流来消费数据。
这个架构看起来很完美,但一个在真实项目中无法回避的问题浮出水面:Schema 演进。当上游的 Java 应用迭代,需要修改数据库表结构(比如增加一个字段、修改字段类型)时,这个数据管道会如何反应?下游的 Python 消费端会崩溃吗?如何构建一个能够优雅处理上游结构变更,具备韧性的跨语言数据管道?这才是本次实践的核心挑战。
架构概览
我们的目标是搭建一个从 Java/MyBatis 应用的 MySQL 数据库到 Python 消费者的单向实时数据流。
graph TD
subgraph "Java Application Stack"
A[Java App with MyBatis] --CRUD--> B[MySQL Database];
end
subgraph "CDC Infrastructure"
B --Reads binlog--> C[Debezium MySQL Connector];
C --Runs on--> D[Kafka Connect];
D --Publishes Avro records--> E[Apache Kafka];
D --Registers/Fetches schema--> F[Schema Registry];
end
subgraph "Python Consumer Stack"
G[Python Data Service] --Subscribes to--> E;
G --Fetches schema for deserialization--> F;
end
style A fill:#f9f,stroke:#333,stroke-width:2px
style B fill:#f9f,stroke:#333,stroke-width:2px
style G fill:#ccf,stroke:#333,stroke-width:2px
这里的关键组件是 Schema Registry。我们选择 Avro 作为消息序列化格式,而不是简单的 JSON。在真实项目中,这是一个至关重要的决策。JSON 缺乏强制的 Schema 约束,当上游数据结构变更时,下游消费者在运行时才会发现错误,极易导致生产故障。而 Avro 消息本身携带 Schema ID,消费者在反序列化时,会从 Schema Registry 获取对应的 Schema,从而实现安全的、向前或向后兼容的解析。
第一步:模拟源业务系统 (Java & MyBatis)
我们先构建一个简单的 Spring Boot + MyBatis 应用来模拟业务源头。它操作一张 customers 表。
Maven 依赖 (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cdc-source-app</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>cdc-source-app</name>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
数据库表结构 (V1)
CREATE DATABASE cdc_demo;
USE cdc_demo;
CREATE TABLE customers (
id INT AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(50) NOT NULL,
last_name VARCHAR(50) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB;
-- 确保MySQL开启binlog
-- 在 my.cnf 中配置:
-- server-id = 1
-- log_bin = /var/log/mysql/mysql-bin.log
-- binlog_format = ROW
-- binlog_row_image = FULL
MyBatis实体与Mapper
Customer.java:
package com.example.cdcsourceapp.model;
import lombok.Data;
import java.sql.Timestamp;
@Data
public class Customer {
private Integer id;
private String firstName;
private String lastName;
private Timestamp createdAt;
}
CustomerMapper.java:
package com.example.cdcsourceapp.mapper;
import com.example.cdcsourceapp.model.Customer;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Update;
@Mapper
public interface CustomerMapper {
@Insert("INSERT INTO customers(first_name, last_name) VALUES(#{firstName}, #{lastName})")
@Options(useGeneratedKeys = true, keyProperty = "id")
void insert(Customer customer);
@Update("UPDATE customers SET first_name = #{firstName}, last_name = #{lastName} WHERE id = #{id}")
int update(Customer customer);
}
这个应用非常标准,代表了大量线上在跑的Java服务。
第二步:部署CDC基础设施
我们使用 docker-compose 来一键启动整个数据管道的后端服务,这在开发和测试阶段非常高效。
docker-compose.yml:
version: '3.7'
services:
mysql:
image: debezium/example-mysql:1.9
container_name: cdc-mysql
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
- MYSQL_DATABASE=cdc_demo
zookeeper:
image: confluentinc/cp-zookeeper:7.2.1
container_name: cdc-zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.2.1
container_name: cdc-kafka
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:7.2.1
container_name: cdc-schema-registry
ports:
- "8081:8081"
depends_on:
- kafka
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
image: debezium/connect:1.9
container_name: cdc-connect
ports:
- "8083:8083"
depends_on:
- kafka
- schema-registry
environment:
BOOTSTRAP_SERVERS: kafka:29092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# 这里的坑:要确保Kafka Connect容器可以解析到MySQL的主机名
# 使用 `extra_hosts` 在Docker网络中添加主机映射
extra_hosts:
- "host.docker.internal:host-gateway"
基础设施启动后,我们需要向 Kafka Connect 的 REST API 注册 Debezium MySQL 连接器。
register-mysql-connector.json:
{
"name": "customer-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "debezium",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "cdc_demo",
"table.include.list": "cdc_demo.customers",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "dbhistory.customers",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"decimal.handling.mode": "double"
}
}
通过curl命令部署连接器:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d @register-mysql-connector.json
此时,数据管道已准备就绪。dbserver1.cdc_demo.customers 这个 Kafka topic 将会接收所有 customers 表的变更事件。
第三步:初版Python消费者
现在,我们来编写第一个版本的 Python 消费者。它的任务是订阅 topic,解码 Avro 消息并打印出来。
consumer_v1.py:
import logging
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def consume_customer_events():
"""
一个健壮的 Kafka Avro 消费者
"""
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'python-customer-consumer-group-1',
'schema.registry.url': 'http://localhost:8081',
# 每次从头消费,便于调试
'auto.offset.reset': 'earliest'
}
consumer = AvroConsumer(conf)
topic = 'dbserver1.cdc_demo.customers'
consumer.subscribe([topic])
logging.info(f"Subscribed to topic: {topic}")
try:
while True:
# poll的超时设置很重要,避免CPU空转
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
logging.error(f"Consumer error: {msg.error()}")
# 对可恢复的错误可以添加重试逻辑
continue
# Debezium的事件是嵌套的
# 我们关心的是'after'字段中的数据
event_value = msg.value()
if event_value and 'after' in event_value:
customer_data = event_value['after']
# 假设我们只关心 first_name 和 last_name
# 这是一个常见的错误:代码与特定 schema 版本强耦合
first_name = customer_data.get('first_name', 'N/A')
last_name = customer_data.get('last_name', 'N/A')
customer_id = customer_data.get('id', -1)
logging.info(f"Received customer event [ID={customer_id}]: Name={first_name} {last_name}")
else:
# 这可能是DELETE事件,'after'为null
logging.warning(f"Received a non-update/create event or malformed message: {event_value}")
except SerializerError as e:
# Schema不匹配或无法解析时会抛出此异常
logging.critical(f"Message deserialization failed: {e}")
except KeyboardInterrupt:
logging.info("Consumer is shutting down...")
finally:
# 确保消费者被正确关闭
consumer.close()
if __name__ == '__main__':
consume_customer_events()
现在,我们在 Java 应用中插入一条数据:
// 在某个Service或Controller中
Customer c = new Customer();
c.setFirstName("John");
c.setLastName("Doe");
customerMapper.insert(c);
Python 消费端会立刻打印出日志:INFO: Received customer event [ID=1]: Name=John Doe
第四步:模拟Schema演进的危机
业务发展了,我们需要在 customers 表中增加一个 email 字段。这是一个极其常见的生产场景。
首先,在数据库中执行 DDL:
ALTER TABLE customers ADD COLUMN email VARCHAR(100) NULL;
然后,修改 Java 实体类和 MyBatis Mapper:
Customer.java (V2):
// ...
@Data
public class Customer {
// ...
private String email;
}
CustomerMapper.java (V2):
// ...
@Mapper
public interface CustomerMapper {
@Insert("INSERT INTO customers(first_name, last_name, email) VALUES(#{firstName}, #{lastName}, #{email})")
// ...
@Update("UPDATE customers SET first_name = #{firstName}, last_name = #{lastName}, email = #{email} WHERE id = #{id}")
// ...
}
重启 Java 应用后,我们插入一条带 email 的新数据:
Customer c = new Customer();
c.setFirstName("Jane");
c.setLastName("Smith");
c.setEmail("[email protected]");
customerMapper.insert(c);
发生了什么?
- Debezium 检测到了
ALTER TABLE操作。它将这个 DDL 更新到dbhistory.customerstopic 中。 - 当新的
INSERT发生时,Debezium 发现customers表的结构变了。 - 它会生成一个新的 Avro Schema,包含
id,first_name,last_name,created_at和新增的email字段。 - 然后,它将这个新 Schema 注册到 Schema Registry。Schema Registry 会给它分配一个新的版本号(比如 Version 2)。
- Debezium 使用这个新的 Schema(Version 2)序列化新数据,并将消息发送到 Kafka。消息中包含了指向 Schema Registry 中 Version 2 Schema 的ID。
我们的 consumer_v1.py 还能正常工作吗?会的。因为它使用的是 .get() 方法来安全地访问字典键。当新消息到达时,它会打印:INFO: Received customer event [ID=2]: Name=Jane Smith
它没有崩溃,但它静默地丢失了 email 这个新字段的信息。在很多业务场景下,这和系统崩溃一样致命。
第五步:构建具备Schema演进适应能力的消费者
一个生产级的消费者不能对 Schema 做出强假设。它应该能够动态地处理收到的数据。
consumer_v2.py:
import logging
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def process_record(record: dict):
"""
一个通用的记录处理函数,它不关心具体的字段。
在真实项目中,这里可能是写入另一个数据库、调用API或进行数据转换。
"""
if not record or not isinstance(record, dict):
logging.warning("Received an empty or invalid record.")
return
# 动态地处理所有字段,而不是硬编码
customer_id = record.get('id', 'UNKNOWN_ID')
logging.info(f"--- Processing Customer Record [ID={customer_id}] ---")
for key, value in record.items():
logging.info(f" -> {key}: {value}")
logging.info("--- End of Record ---")
def consume_events_resiliently():
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'python-customer-consumer-group-2', # Use a new group id to re-process
'schema.registry.url': 'http://localhost:8081',
'auto.offset.reset': 'earliest'
}
consumer = AvroConsumer(conf)
topic = 'dbserver1.cdc_demo.customers'
consumer.subscribe([topic])
logging.info(f"Resilient consumer subscribed to topic: {topic}")
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
logging.error(f"Consumer error: {msg.error()}")
continue
event_value = msg.value()
if event_value and 'after' in event_value and event_value['after'] is not None:
# 核心改进:将整个'after'块传递给处理函数
# 不再在消费循环中做任何关于字段的假设
process_record(event_value['after'])
else:
# 处理DELETE事件,'before'字段会包含被删除前的数据
if event_value and 'before' in event_value and event_value['before'] is not None:
deleted_id = event_value['before'].get('id', 'UNKNOWN_ID')
logging.warning(f"Detected DELETE event for customer ID: {deleted_id}")
else:
logging.warning(f"Received a non-data event: op={event_value.get('op', 'N/A')}")
except SerializerError as e:
logging.critical(f"Message deserialization failed. Schema evolution issue?: {e}")
except KeyboardInterrupt:
logging.info("Consumer is shutting down...")
finally:
consumer.close()
if __name__ == '__main__':
consume_events_resiliently()
当我们运行 consumer_v2.py 时,它会重新消费 Topic 中的所有消息。
对于第一条老数据,它会打印:
--- Processing Customer Record [ID=1] ---
-> id: 1
-> first_name: John
-> last_name: Doe
-> created_at: 1667814800000
--- End of Record ---
对于第二条包含新字段的数据,它会打印:
--- Processing Customer Record [ID=2] ---
-> id: 2
-> first_name: Jane
-> last_name: Smith
-> created_at: 1667814900000
-> email: [email protected]
--- End of Record ---
这个消费者版本成功地处理了两种不同版本的 Schema,没有任何代码修改。它的核心思想是将数据消费逻辑与数据处理逻辑解耦。消费者的责任是安全地、完整地接收数据,而下游的处理函数 process_record 则负责理解和使用这些数据。这种设计使得消费端对上游的非破坏性 Schema 变更(如添加可选字段)完全免疫。
方案的局限性与未来展望
这个方案优雅地解决了跨语言数据管道中由字段增加引起的 Schema 演进问题,但在生产环境中,还有更多需要考虑的边界情况。
首先,关于破坏性变更(Breaking Changes)。如果上游删除了一个字段,或者修改了字段类型(比如 INT 改为 STRING),下游消费者如果没有相应的兼容性策略,依然可能会在业务逻辑层面出错。Schema Registry 支持配置兼容性级别(如 BACKWARD, FORWARD, FULL),这可以在注册新 Schema 时强制执行规则,防止破坏性变更进入系统,但这也要求团队之间有良好的沟通和流程。
其次,是数据回填(Backfill)问题。当一个全新的消费服务上线时,它需要获取 customers 表的全量历史数据,然后再开始消费增量事件。Debezium 提供了快照(Snapshot)机制来解决这个问题,但这需要在连接器配置中精细调整,并考虑快照期间对源数据库的性能影响。
最后,是运维复杂性。我们引入了 Kafka, ZooKeeper, Schema Registry, Kafka Connect 等多个组件,这无疑增加了系统的运维成本和监控的复杂度。需要建立完善的监控告警体系,比如监控 Kafka Connect 的任务状态、Debezium 的复制延迟、Schema Registry 的可用性等。这些都是将方案从“可行”推向“生产级可靠”所必须跨越的鸿沟。