Debezium日常分享系列之:在 Kubernetes 上部署 Debezium
- 先决条件
- 步骤
- 部署数据源 (MySQL)
- 登录 MySQL db
- 将数据插入其中
- 部署 Kafka
- 部署 kafdrop
- 部署 Debezium 连接器
- 创建 Debezium 连接器
Debezium 可以无缝部署在 Kubernetes(一个用于容器编排的开源平台)上。此部署利用了 Strimzi 项目,该项目通过自定义资源简化了 Kubernetes 上 Kafka Connect 和连接器的部署。
先决条件
- 一个正在运行的 Kubernetes 集群(本演示将使用 minikube)
- kubectl
- helm
步骤
使用 registry 启动 minikube 集群
minikube start --insecure-registry "10.0.0.0/24"
启用注册表插件
minikube addons enable registry
部署 OLM(操作员生命周期管理器)
git clone https://github.com/operator-framework/operator-lifecycle-manager.gitcd operator-lifecycle-manager/deploy/charthelm install olm .
检查operator-lifecycle-manager命名空间中的所有pod是否处于运行状态。
应用以下清单来部署 strimzi kafka 操作员
cat > strimzi-kafka-operator.yaml <<EOF
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:name: strimzi-kafka-operatornamespace: operators
spec:channel: stablename: strimzi-kafka-operatorsource: operatorhubio-catalogsourceNamespace: operator-lifecycle-manager # this should be same as the namespace in which olm is deployed
EOF
- kind: 指定资源类型,这里是
Subscription
。Subscription
资源用于订阅一个 Operator,以便在集群中自动更新和管理该 Operator。 - spec:
- channel: 订阅的频道,这里是
stable
。频道通常用于指定不同版本的 Operator。 - name: 要订阅的 Operator 的名称,这里是
strimzi-kafka-operator
。 - source: Operator Catalog 的名称,这里是
operatorhubio-catalog
。Catalog 是一个包含多个 Operator 的集合。 - sourceNamespace: Catalog 所在的命名空间,这里是
operator-lifecycle-manager
。这个命名空间通常是 Operator Lifecycle Manager (OLM) 安装的命名空间。
- channel: 订阅的频道,这里是
kubectl apply -f strimzi-kafka-operator.yaml
检查 Operator Pod 在 Operator 命名空间中是否处于运行状态。启动并运行 Pod 可能需要几分钟时间。
部署数据源 (MySQL)
helm repo add bitnami https://charts.bitnami.com/bitnamihelm repo update # required if above repo is already addedkubectl create ns dbcat > mysql-values.yaml <<EOF
auth:rootPassword: "root"database: "debezium_db"username: "mysql_usr"password: "mysql_pwd"
EOFhelm install -n db mysql bitnami/mysql --version 12.2.2 -f mysql-values.yaml
helm repo add bitnami https://charts.bitnami.com/bitnami
命令解释:这个命令将Bitnami Helm仓库添加到你的Helm客户端中。Bitnami是一个提供高质量、预构建的Kubernetes应用包的公司
注意:此图表中已启用 bin 日志,这是 Debezium 所必需的。
检查 mysql pod 是否在 db 命名空间中启动并运行。
登录 MySQL db
MYSQL_ROOT_PASSWORD=$(kubectl get secret --namespace db mysql -o jsonpath="{.data.mysql-root-password}" | base64 -d)kubectl run mysql-client --rm --tty -i --restart='Never' --image docker.io/bitnami/mysql:8.4.4-debian-12-r0 --namespace db --env MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD --command -- bashmysql -h mysql.db.svc.cluster.local -uroot -p"$MYSQL_ROOT_PASSWORD"# giving permissions to mysql_usr to allow reading the bin_logs properly
GRANT RELOAD, FLUSH_TABLES ON *.* TO 'mysql_usr'@'%';
FLUSH PRIVILEGES;GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'mysql_usr'@'%';
FLUSH PRIVILEGES;
kubectl get secret --namespace db mysql
:从Kubernetes的db
命名空间中获取名为mysql
的Secret对象。-o jsonpath="{.data.mysql-root-password}"
:使用JSONPath表达式从Secret对象中提取mysql-root-password
字段的数据。| base64 -d
:将提取出的Base64编码的字符串解码为明文。kubectl run mysql-client
:创建一个名为mysql-client
的Pod。--rm
:Pod运行结束后自动删除。--tty -i
:分配一个伪终端并保持输入交互。--restart='Never'
:设置Pod的重启策略为不重启。--image docker.io/bitnami/mysql:8.4.4-debian-12-r0
:使用指定的Docker镜像。--namespace db
:在db
命名空间中创建Pod。--env MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD
:将环境变量MYSQL_ROOT_PASSWORD
设置为之前获取的root密码。--command -- bash
:在Pod中启动一个Bash shell。
将数据插入其中
-- Create the database if it doesn't exist
CREATE DATABASE IF NOT EXISTS debezium_db;-- Use the database
USE debezium_db;-- Create a sample table
CREATE TABLE employees (id INT AUTO_INCREMENT PRIMARY KEY,first_name VARCHAR(50) NOT NULL,last_name VARCHAR(50) NOT NULL,email VARCHAR(100) UNIQUE NOT NULL,salary DECIMAL(10,2) NOT NULL,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- Insert sample data
INSERT INTO employees (first_name, last_name, email, salary) VALUES
('John', 'Doe', 'john.doe@example.com', 60000.00),
('Jane', 'Smith', 'jane.smith@example.com', 75000.00),
('Alice', 'Johnson', 'alice.johnson@example.com', 82000.00),
('Bob', 'Williams', 'bob.williams@example.com', 50000.00);
部署 Kafka
kubectl create ns kafkacat > kafka-values.yaml <<EOF
listeners:client:protocol: PLAINTEXTcontroller:protocol: PLAINTEXTinterbroker:protocol: PLAINTEXTexternal:protocol: PLAINTEXT
sasl:enabledMechanisms: PLAIN
broker:replicaCount: 1
controller:replicaCount: 1
EOFhelm install kafka -n kafka bitnami/kafka --version 31.3.1 -f kafka-values.yaml
注意:上述 Kafka 配置是为了简化演示,禁用了 Kafka Broker 和 Controller 的所有身份验证机制。不建议在生产环境中禁用身份验证运行 Kafka。
请检查 Kafka Broker 和 Controller Pod 是否在 Kafka 命名空间中启动并运行。
部署 kafdrop
安装 Kafka UI,用于查看 Kafka 主题和浏览消费者组。
helm repo add lsst-sqre https://lsst-sqre.github.io/charts/helm repo update # required if above repo is already addedcat > kafdrop-values.yaml <<EOF
kafka:brokerConnect: kafka:9092
EOFhelm install -n kafka kafdrop lsst-sqre/kafdrop --version 0.1.3 -f kafdrop-values.yaml# port forward kafdrop to localhost:9000
kubectl port-forward -n kafka svc/kafdrop 9000:9000
部署 Debezium 连接器
要部署 Debezium 连接器,您需要先部署一个包含所需连接器插件的 Kafka Connect 集群,然后再实例化实际的连接器本身。第一步,需要创建一个包含该插件的 Kafka Connect 容器镜像。
创建 Kafka Connect 集群
运行以下命令获取 minikube 注册表的 IP:kubectl -n kube-system get svc registry -o jsonpath=‘{.spec.clusterIP}’,并将其替换到下面的 kafkaconnect 清单中。
get svc registry
: 这部分命令用来获取名为registry
的服务的信息。svc
是service
的缩写,代表服务资源。-o jsonpath='{.spec.clusterIP}'
: 这个选项指定输出格式为 JSONPath 表达式的结果。JSONPath 是一种查询 JSON 数据的语法,类似于 XPath 用于 XML。这里的{.spec.clusterIP}
表示从返回的服务对象中提取spec.clusterIP
字段的值,即该服务的 ClusterIP 地址。
kubectl create ns debeziumcat > kafka-connect.yaml <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:name: debezium-connect-clusterannotations:strimzi.io/use-connector-resources: "true"namespace: debezium
spec:version: 3.8.0replicas: 1bootstrapServers: kafka.kafka.svc.cluster.local:9092 # kafka broken endpointconfig:config.providers: secretsconfig.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvidergroup.id: connect-clusteroffset.storage.topic: connect-cluster-offsetsconfig.storage.topic: connect-cluster-configsstatus.storage.topic: connect-cluster-status# -1 means it will use the default replication factor configured in the brokerconfig.storage.replication.factor: -1offset.storage.replication.factor: -1status.storage.replication.factor: -1build:output:type: dockerimage: <TO_BE_REPLACED_BY_ABOVE_CLUSTER_IP>/debezium-connect-mysql:latestplugins:- name: debezium-mysql-connectorartifacts:- type: tgzurl: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.0.7.Final/debezium-connector-mysql-3.0.7.Final-plugin.tar.gz
EOFkubectl apply -f kafka-connect.yaml
build
: 配置了构建过程:output.type
和output.image
: 指定了 Docker 镜像的输出类型和名称。plugins
: 定义了要安装的插件,这里是 Debezium MySQL 连接器,指定了插件的下载 URL。
注意:在上述配置中,如果您希望将镜像推送到 ECR/GCR 或任何其他镜像仓库,请将镜像端点替换为相应的端点,并且集群应具有将镜像推送到该仓库的权限。
检查 Kafka 连接是否已就绪。可能需要几分钟(4-5 分钟)才能进入就绪状态。
kubectl get kafkaconnect -n debezium
它应该返回状态 Ready: True
NAME DESIRED REPLICAS READY
debezium-connect-cluster 1 True
在 Kafdrop 中,您应该能够看到 3 个主题:
connect-cluster-configs
connect-cluster-offsets
connect-cluster-status
创建 Debezium 连接器
在创建连接器之前,我们需要创建一个 k8s secret(用于存储数据库凭据)和 rbac。
cat > mysql-creds.yaml <<EOF
apiVersion: v1
kind: Secret
metadata:name: mysql-credsnamespace: debezium
type: Opaque
stringData:username: mysql_usrpassword: mysql_pwd
EOFkubectl apply -f mysql-creds.yamlcat > role.yaml <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:name: connector-configuration-rolenamespace: debezium
rules:- apiGroups: [""]resources: ["secrets"]resourceNames: ["mysql-creds"]verbs: ["get"]
EOFkubectl apply -f role.yamlcat > role-binding.yaml <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:name: connector-configuration-role-bindingnamespace: debezium
subjects:- kind: ServiceAccountname: debezium-connect-cluster-connectnamespace: debezium
roleRef:kind: Rolename: connector-configuration-roleapiGroup: rbac.authorization.k8s.io
EOFkubectl apply -f role-binding.yaml
通过登录 MySQL 获取 MySQL server_id(按照与上述相同的步骤)
SELECT @@server_id; # it is expected to be 1
部署 kafka 连接器
cat > kafka-connector.yaml <<'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:name: debezium-connector-mysqlnamespace: debeziumlabels:strimzi.io/cluster: debezium-connect-cluster
spec:class: io.debezium.connector.mysql.MySqlConnectortasksMax: 1config:tasks.max: 1database.hostname: mysql.db.svc.cluster.local # mysql db hostnamedatabase.port: 3306database.user: ${secrets:debezium/mysql-creds:username}database.password: ${secrets:debezium/mysql-creds:password}database.server.id: 1 # SELECT @@server_id;topic.prefix: mysqldatabase.include.list: debezium_dbschema.history.internal.kafka.bootstrap.servers: kafka.kafka.svc.cluster.local:9092 # kafka broken endpointschema.history.internal.kafka.topic: schema-changes.debezium_db
EOFkubectl apply -f kafka-connector.yaml
检查 kafka 连接是否已就绪。可能需要几分钟才能进入就绪状态。
kubectl get kafkaconnector -n debezium
它应该返回状态 Ready: True
NAME CLUSTER CONNECTOR CLASS MAX TASKS READY
debezium-connector-mysql debezium-connect-cluster io.debezium.connector.mysql.MySqlConnector 1 True
现在,您应该能够在 kafdrop 中看到更多主题,例如 mysql.debezium_db.employees。
此主题将包含上面创建员工表时插入的所有数据。
为了测试 Debezium 连接器,请向表中添加更多数据。
-- Insert more sample data
INSERT INTO employees (first_name, last_name, email, salary) VALUES
('Charlie', 'Brown', 'charlie.brown@example.com', 72000.00),
('David', 'Miller', 'david.miller@example.com', 68000.00),
('Emma', 'Wilson', 'emma.wilson@example.com', 79000.00),
('Frank', 'Anderson', 'frank.anderson@example.com', 55000.00),
('Grace', 'Thomas', 'grace.thomas@example.com', 87000.00),
('Henry', 'Taylor', 'henry.taylor@example.com', 62000.00);
它应该反映在主题mysql.debezium_db.employees中。
这表明 MySQL 表员工中所做的更改已反映在 Kafka 中。