Debezium日常分享系列之:在 Kubernetes 中使用 Debezium 的 CDC
- 架构
- 源数据库
- 创建数据库凭证密钥
- Debezium 自定义镜像
- 构建并推送镜像
- Kafka Connect 集群
- Debezium Postgres 连接器
- Debezium 创建的 Kafka 主题
Debezium 是一个开源的分布式变更数据捕获 (CDC) 平台。Debezium 充当实时 CDC 引擎,实时捕获插入、更新和删除操作。
Debezium 提供多种部署选项,但使用 Kafka Connect 尤其有利,尤其是在数据复制方面。本文介绍此方法,因为它可以将现有数据和变更数据捕获 (CDC) 从源数据库无缝传输到 Kafka 主题中。从那里,数据可以高效地复制到目标数据库。通过将 Kafka Connect 与 Debezium 结合使用,组织可以实现强大且可扩展的数据复制解决方案,从而有效地跨不同系统同步数据。
本演示探讨了如何在 Kubernetes 环境中部署 Debezium,以从本地 Postgres 数据库捕获 CDC 数据,并将更改传输到 Kafka 主题。
使用 Strimzi 在 Kubernetes 中部署 Kafka。此 Kafka 集群将用于部署 Kafka Connect 并使用 Debezium。
架构
Kafka Connect 是一个框架和运行时环境,用于促进 Kafka 生态系统内的数据移动。它支持两项关键功能:
- 源连接器:这些连接器(例如 Debezium)支持将来自各种源的记录传输到 Kafka 主题中。
- 接收器连接器:相反,接收器连接器将记录从 Kafka 主题传输到其他目标系统。
Kafka Connect 作为 Kafka Broker 的独立服务运行。
默认情况下,数据库表的更改会被发送到以表本身命名的 Kafka 主题。但是,Debezium 允许通过配置调整实现灵活的主题路由。例如,用户可以:
- 将记录路由到名称与其来源表不同的主题。
- 将来自多个表的更改事件记录合并到一个主题中。
一旦更改事件记录驻留在 Apache Kafka 中,Kafka Connect 生态系统中的各种连接器就可以将它们流式传输到各种系统和数据库,包括 Elasticsearch、数据仓库、分析平台或像 Infinispan 这样的缓存解决方案。根据所选的接收器连接器,可能需要配置 Debezium 新的记录状态提取转换功能。
源数据库
有关所有支持的 Debezium 连接器,请参阅此官方文档 - https://debezium.io/documentation/reference/stable/connectors/index.html
本指南展示了如何使用托管在本地计算机上的 Postgres 数据库进行数据捕获。虽然 Postgres 也可以部署在 Kubernetes 内部,但您可以参考《Kubernetes (K8s) 上的 Postgres》指南。要访问 Kubernetes 集群外部(由 Docker Desktop 托管)的数据库,您可以将 localhost 替换为数据库主机名 host.docker.internal。通常,要从 Kubernetes 集群外部访问数据库,可以使用 ExternalName 等服务发现机制。
创建数据库凭证密钥
db-secret.yaml
apiVersion: v1
kind: Secret
metadata:name: debezium-secretnamespace: kafka
type: Opaque
stringData:username: dbzuserpassword: dbzpass
apiVersion: v1
: 这指定了使用的 API 版本。对于Secret
对象,v1
是一个常用版本。kind: Secret
: 指明这是一个Secret
类型的资源。metadata:
: 这个部分包含了描述Secret
的元数据。name: debezium-secret
: 指定Secret
的名称为debezium-secret
。namespace: kafka
: 指定Secret
所属的命名空间为kafka
。Kubernetes 中的资源可以通过命名空间进行隔离,以实现多租户管理。type: Opaque
: 指定Secret
的类型为Opaque
。Opaque
类型的Secret
通常用于存储任意的数据,如文本或二进制数据。stringData:
: 这是一个特殊的字段,允许你直接以字符串的形式提供数据,而不需要先对其进行 base64 编码。Kubernetes 会自动处理编码过程。username: dbzuser
: 指定用户名为dbzuser
。password: dbzpass
: 指定密码为dbzpass
。
创建一个 Role 来引用上述 secret,并创建一个 RoleBinding 将此角色绑定到 Kafka Connect 集群服务帐户,以便 Kafka Connect 可以访问该 secret。
db-rbac.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:name: connector-configuration-rolenamespace: kafka
rules:- apiGroups: [""]resources: ["secrets"]resourceNames: ["debezium-secret"]verbs: ["get"]
在Kubernetes中,Role
是一种资源,用于定义特定命名空间内的权限。你提供的 YAML 文件定义了一个名为 connector-configuration-role
的角色,该角色具有对特定资源的访问权限。下面是对这个 YAML 文件的逐行解释:
- apiVersion 指定了 Kubernetes API 的版本。
rbac.authorization.k8s.io/v1
表示这是 RBAC(Role-Based Access Control,基于角色的访问控制)API 的 v1 版本。 -
kind: Role
kind 指定了资源的类型。在这里,Role
表示这是一个角色资源。
-
metadata:
metadata 是一个包含元数据的字段,通常包括名称、标签等信息。name: connector-configuration-role
: 角色的名称为connector-configuration-role
。
namespace: kafka
: 角色所属的命名空间为kafka
。
-
rules:
rules 是一个列表,定义了该角色可以执行的操作和可以访问的资源。
-
- apiGroups: [""]resources: ["secrets"]resourceNames: ["debezium-secret"]verbs: ["get"]
这是一个规则条目,定义了具体的权限:
- apiGroups: 指定了 API 组。这里使用空字符串
""
表示核心 API 组。 - resources: 指定了资源类型。这里指定的是
secrets
,即 Kubernetes 中的秘密资源。 - resourceNames: 指定了具体资源的名称。这里指定的是
debezium-secret
,表示该角色只能访问名为debezium-secret
的秘密资源。 - verbs: 指定了允许的操作。这里指定的是
get
,表示该角色可以获取指定的秘密资源。
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:name: connector-configuration-role-bindingnamespace: kafka
subjects:- kind: ServiceAccountname: debezium-connect-cluster-connectnamespace: kafka
roleRef:kind: Rolename: connector-configuration-roleapiGroup: rbac.authorization.k8s.io
在Kubernetes中,RoleBinding
是一个资源对象,用于将特定的 Role
(角色)与一个或多个 Subject
(主体,如用户、组或服务账户)绑定在一起。这使得主体能够执行由该角色定义的操作。
- kind: RoleBinding这指定了资源的类型为
RoleBinding
,即角色绑定。 - 主体(Subjects)
-
- kind: ServiceAccount 指定了主体的类型为
ServiceAccount
(服务账户)。
- kind: ServiceAccount 指定了主体的类型为
- name: debezium-connect-cluster-connect这是服务账户的名称。
-
- 角色引用(RoleRef)
- kind: Role指定了要绑定的角色类型为
Role
- name: connector-configuration-role这是要绑定的角色的名称。
- apiGroup: rbac.authorization.k8s.io:这指定了角色所属的API组。
rbac.authorization.k8s.io
是RBAC API的组。
总结:
这个 RoleBinding
配置文件的作用是将名为 connector-configuration-role
的角色绑定到名为 debezium-connect-cluster-connect
的服务账户上,绑定操作发生在 kafka
命名空间中。这意味着 debezium-connect-cluster-connect
服务账户将具有 connector-configuration-role
角色所定义的所有权限。
Strimzi 在 Kafka Connect 部署期间会自动创建一个服务帐户。此服务帐户名称遵循特定格式:$KafkaConnectName-connect。由于我们将部署一个名为 debezium-connect-cluster 的 Kafka Connect 集群,因此相应的服务帐户名称为 debezium-connect-cluster-connect。
kubectl apply -f debezium/db-secret.yaml -n kafkakubectl apply -f debezium/db-rbac.yaml -n kafka
Debezium 自定义镜像
要部署 Debezium 连接器,首先必须设置一个包含所需连接器插件的 Kafka Connect 集群。此过程包括为 Kafka Connect 创建包含所需插件的 Strimzi 容器镜像,然后再实例化连接器本身。
请参阅官方文档下载连接器 - https://debezium.io/documentation/reference/stable/install.html
有关预安装的连接器镜像,请参阅 Debezium 官方镜像 - https://quay.io/organization/debezium
在本演示中,我将 Postgres 连接器下载到 debezium/plugins
注意:使用 TimestampConverter jar(来自 https://github.com/oryanmoshe/debezium-timestamp-converter)将所有时间数据类型(所有数据库中的)转换为您选择的指定格式。
默认值:
"timestampConverter.format.time": "HH:mm:ss.SSS",
"timestampConverter.format.date": "YYYY-MM-dd",
"timestampConverter.format.datetime": "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'",
"timestampConverter.debug": "false"
当使用多个连接器时,必须将此 jar 单独添加到所有 Debezium 连接器中。
DockerfileDebezium
#DockerfileFROM quay.io/strimzi/kafka:0.40.0-kafka-3.7.0
USER root:root
COPY debezium/plugins /opt/kafka/plugins/
USER 1001
Dockerfile 是一个用于构建 Docker 镜像的脚本文件,其中包含了构建镜像所需的一系列命令。每个命令都会在镜像中创建一个新的层,使得最终的镜像可以被构建、分发和运行。
FROM
指令指定了基础镜像,这里是使用了 Strimzi 提供的 Kafka 镜像,版本号为0.40.0-kafka-3.7.0
。这意味着新的镜像是基于这个现有的 Kafka 镜像构建的,继承了它所有的配置和内容。USER
指令用来设置执行后续指令的用户。这里将用户设置为root:root
,意味着接下来的操作将以超级用户的权限进行。这通常是为了确保有足够权限来执行某些特定的任务,比如安装软件或修改系统文件。COPY
指令从本地文件系统复制文件到镜像中的指定路径。这里的命令表示将本地目录debezium/plugins
下的所有内容复制到镜像内的/opt/kafka/plugins/
目录下。这通常是用于添加应用程序所需的额外插件或依赖项。- USER 1001最后一条
USER
指令将用户切换回1001
。这通常是一个非特权用户,用于提高安全性,防止镜像以 root 用户运行时可能带来的安全风险。在生产环境中,尽量避免以 root 用户运行容器是一个好的实践。
总结来说,这段 Dockerfile 的主要目的是在基于 Strimzi Kafka 镜像的基础上,添加 Debezium 插件,并且确保这些操作是以 root 权限完成的,但在最终运行时切换到一个较低权限的用户以增加安全性。
构建并推送镜像
docker build -t osds-debezium -f debezium/DockerfileDebezium .
docker login
docker tag osds-debezium howdytech01/osds:osds-debezium
docker push howdytech01/osds:osds-debezium
docker build
: 这是用于构建 Docker 镜像的命令。-t osds-debezium
:-t
参数用于给构建的镜像打标签。这里的标签是osds-debezium
,这意味着构建完成后,镜像的名称将为osds-debezium
。-f debezium/DockerfileDebezium
:-f
参数指定要使用的Dockerfile
文件。在这个例子中,Dockerfile
文件位于debezium/
目录下,并且文件名为DockerfileDebezium
。.
: 这个点表示构建上下文目录,即 Docker 构建过程中会访问的文件和目录。在这里,当前目录 (.
) 是构建上下文。docker push
: 这个命令用于将镜像推送到 Docker 注册表(通常是 Docker Hub)。howdytech01/osds:osds-debezium
: 这是你希望推送的镜像的完整名称。推送成功后,其他用户可以通过这个名称从 Docker Hub 拉取该镜像。
Kafka Connect 集群
使用 Strimzi Operator 创建 Kafka Connect 集群。
debezium-connect-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:name: debezium-connect-clusterannotations:strimzi.io/use-connector-resources: "true"
spec:version: 3.7.0image: howdytech01/osds:osds-debeziumreplicas: 1bootstrapServers: osds-cluster-kafka-bootstrap:9092config:config.providers: secretsconfig.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvidergroup.id: connect-clusteroffset.storage.topic: connect-cluster-offsetsconfig.storage.topic: connect-cluster-configsstatus.storage.topic: connect-cluster-status# -1 means it will use the default replication factor configured in the brokerconfig.storage.replication.factor: -1offset.storage.replication.factor: -1status.storage.replication.factor: -1
- kind 指定了资源的类型。在这里,
KafkaConnect
表示这是一个Kafka Connect集群的资源。 - image:
howdytech01/osds:osds-debezium
指定Kafka Connect使用的Docker镜像。 - replicas:
1
指定Kafka Connect集群的副本数。这里设置为1,表示只有一个Kafka Connect实例。 - config.providers:
secrets
配置提供者,这里使用了secrets
。 - config.providers.secrets.class:
io.strimzi.kafka.KubernetesSecretConfigProvider
配置提供者的类,这里指定了一个Kubernetes Secret配置提供者,用于从Kubernetes Secret中读取配置信息。 - group.id:
connect-cluster
指定Kafka Connect集群的组ID。
此配置设置了一个名为 debezium-connect-cluster 的 Kafka Connect 集群,具有特定的配置,并指向 Kafka 引导服务器进行连接。
kubectl apply -f debezium/debezium-connect-cluster.yaml -n kafka
注意:
配置指定将 bootstrapServers 设置为 osds-cluster-kafka-bootstrap:9092。这表明此处使用的是本演示中在 Kubernetes 中创建的 Strimzi Kafka 集群。
需要注意的是,我们已经配置了 Strimzi Secret 提供程序。该提供程序会自动为 Kafka Connect 集群生成一个服务帐号,该帐号已与必要的角色关联。此设置使 Kafka Connect 能够安全地访问包含敏感信息(例如数据库凭据)的 Secret 对象。
Debezium Postgres 连接器
请参阅此页面以在 Postgres 上设置并启用 CDC 日志记录 - https://debezium.io/documentation/reference/stable/connectors/postgresql.html
使用以下配置创建一个 KafkaConnector
postgres-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:name: debezium-connector-postgreslabels:strimzi.io/cluster: debezium-connect-cluster
spec:class: io.debezium.connector.postgresql.PostgresConnectortasksMax: 1config:tasks.max: 1database.hostname: host.docker.internaldatabase.port: 5432database.user: ${secrets:kafka/debezium-secret:username}database.password: ${secrets:kafka/debezium-secret:password}database.dbname: movie_rental_dbtopic.prefix: movie_rental_dbplugin.name: pgoutputpublication.autocreate.mode: filteredtable.include.list: public.actorkey.converter.schemas.enable: falsevalue.converter.schemas.enable: falsesnapshot.mode: alwaysmessage.key.columns: public.actor:actor_idtransforms: unwraptransforms.unwrap.type: io.debezium.transforms.ExtractNewRecordStatetransforms.unwrap.add.fields: op:_meta_op,table:_meta_table,lsn:_meta_lsn,source.ts_ms:_meta_event_ts,schema:_meta_schematransforms.unwrap.add.headers: dbtransforms.unwrap.delete.handling.mode: rewritedelete.tombstone.handling.mode: rewriteconverters: timestampConverter,timestampConverter.type: oryanmoshe.kafka.connect.util.TimestampConverterkey.converter: org.apache.kafka.connect.json.JsonConvertervalue.converter: org.apache.kafka.connect.json.JsonConverter
kind: KafkaConnector
:kind 指定了资源的类型,这里是KafkaConnector
,表示这是一个Kafka连接器。- class: 连接器的类名,这里是
io.debezium.connector.postgresql.PostgresConnector
,表示这是一个用于PostgreSQL的Debezium连接器。 - tasksMax: 连接器的最大任务数,这里设置为
1
。 - tasks.max: 连接器的任务数,与
tasksMax
相同,设置为1
。 - database.hostname: PostgreSQL数据库的主机名,这里是
host.docker.internal
,通常表示Docker内部网络中的主机。 - database.port: PostgreSQL数据库的端口,这里是
5432
。 - database.user: 数据库用户名,使用了环境变量
${secrets:kafka/debezium-secret:username}
,表示从Kubernetes Secrets中获取用户名。 - database.password: 数据库密码,使用了环境变量
${secrets:kafka/debezium-secret:password}
,表示从Kubernetes Secrets中获取密码。 - database.dbname: 要连接的数据库名称,这里是
movie_rental_db
。 - topic.prefix: 生成的Kafka主题的前缀,这里是
movie_rental_db
。 - plugin.name: PostgreSQL的复制插件名称,这里是
pgoutput
。 - publication.autocreate.mode: 自动创建发布表的模式,这里是
filtered
,表示只包含指定的表。 - table.include.list: 需要捕获变化的表列表,这里是
public.actor
。 - key.converter.schemas.enable: 是否启用键的Schema转换,这里是
false
。 - value.converter.schemas.enable: 是否启用值的Schema转换,这里是
false
。 - snapshot.mode: 快照模式,这里是
always
,表示每次启动连接器时都进行全量快照。 - message.key.columns: 消息键的列,这里是
public.actor:actor_id
。 - transforms: 定义了一系列的转换操作。
- unwrap: 使用
io.debezium.transforms.ExtractNewRecordState
变换器来提取新记录的状态。 - add.fields: 添加额外的字段,例如
_meta_op
,_meta_table
,_meta_lsn
,_meta_event_ts
,_meta_schema
。 - add.headers: 添加额外的头信息,例如
db
。 - delete.handling.mode: 删除处理模式,这里是
rewrite
。 - delete.tombstone.handling.mode: 删除墓碑的处理模式,这里是
rewrite
。 - converters: 定义了自定义的转换器,这里是
timestampConverter
。 - timestampConverter.type: 自定义转换器的类名,这里是
oryanmoshe.kafka.connect.util.TimestampConverter
。 - key.converter: 键的转换器,这里是
org.apache.kafka.connect.json.JsonConverter
。 - value.converter: 值的转换器,这里是
org.apache.kafka.connect.json.JsonConverter
。
此配置设置了一个名为 debezium-connector-postgres 的 KafkaConnector,并具有用于连接 PostgreSQL 数据库和捕获 public.actor 表的更改的特定配置。
Debezium 创建的 Kafka 主题
从 Kafka 集群终端验证
kubectl exec -n kafka -i -t osds-cluster-kafka-0 -- /bin/bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092