Debezium日常分享系列之:在 Kubernetes 上部署 Debezium

Debezium日常分享系列之:在 Kubernetes 上部署 Debezium

  • 先决条件
  • 步骤
  • 部署数据源 (MySQL)
  • 登录 MySQL db
  • 将数据插入其中
  • 部署 Kafka
  • 部署 kafdrop
  • 部署 Debezium 连接器
  • 创建 Debezium 连接器

Debezium 可以无缝部署在 Kubernetes(一个用于容器编排的开源平台)上。此部署利用了 Strimzi 项目,该项目通过自定义资源简化了 Kubernetes 上 Kafka Connect 和连接器的部署。

先决条件

  • 一个正在运行的 Kubernetes 集群(本演示将使用 minikube)
  • kubectl
  • helm

步骤

使用 registry 启动 minikube 集群

minikube start --insecure-registry "10.0.0.0/24"

启用注册表插件

minikube addons enable registry

部署 OLM(操作员生命周期管理器)

git clone https://github.com/operator-framework/operator-lifecycle-manager.gitcd operator-lifecycle-manager/deploy/charthelm install olm .

检查operator-lifecycle-manager命名空间中的所有pod是否处于运行状态。
在这里插入图片描述
应用以下清单来部署 strimzi kafka 操作员

cat > strimzi-kafka-operator.yaml <<EOF
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:name: strimzi-kafka-operatornamespace: operators
spec:channel: stablename: strimzi-kafka-operatorsource: operatorhubio-catalogsourceNamespace: operator-lifecycle-manager # this should be same as the namespace in which olm is deployed
EOF
  • kind: 指定资源类型,这里是 SubscriptionSubscription 资源用于订阅一个 Operator,以便在集群中自动更新和管理该 Operator。
  • spec:
    • channel: 订阅的频道,这里是 stable。频道通常用于指定不同版本的 Operator。
    • name: 要订阅的 Operator 的名称,这里是 strimzi-kafka-operator
    • source: Operator Catalog 的名称,这里是 operatorhubio-catalog。Catalog 是一个包含多个 Operator 的集合。
    • sourceNamespace: Catalog 所在的命名空间,这里是 operator-lifecycle-manager。这个命名空间通常是 Operator Lifecycle Manager (OLM) 安装的命名空间。
kubectl apply -f strimzi-kafka-operator.yaml

检查 Operator Pod 在 Operator 命名空间中是否处于运行状态。启动并运行 Pod 可能需要几分钟时间。

部署数据源 (MySQL)

helm repo add bitnami https://charts.bitnami.com/bitnamihelm repo update # required if above repo is already addedkubectl create ns dbcat > mysql-values.yaml <<EOF
auth:rootPassword: "root"database: "debezium_db"username: "mysql_usr"password: "mysql_pwd"
EOFhelm install -n db mysql bitnami/mysql --version 12.2.2 -f mysql-values.yaml
  • helm repo add bitnami https://charts.bitnami.com/bitnami
    命令解释:这个命令将Bitnami Helm仓库添加到你的Helm客户端中。Bitnami是一个提供高质量、预构建的Kubernetes应用包的公司

注意:此图表中已启用 bin 日志,这是 Debezium 所必需的。

检查 mysql pod 是否在 db 命名空间中启动并运行。

登录 MySQL db

MYSQL_ROOT_PASSWORD=$(kubectl get secret --namespace db mysql -o jsonpath="{.data.mysql-root-password}" | base64 -d)kubectl run mysql-client --rm --tty -i --restart='Never' --image  docker.io/bitnami/mysql:8.4.4-debian-12-r0 --namespace db --env MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD --command -- bashmysql -h mysql.db.svc.cluster.local -uroot -p"$MYSQL_ROOT_PASSWORD"# giving permissions to mysql_usr to allow reading the bin_logs properly
GRANT RELOAD, FLUSH_TABLES ON *.* TO 'mysql_usr'@'%';
FLUSH PRIVILEGES;GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'mysql_usr'@'%';
FLUSH PRIVILEGES;
  • kubectl get secret --namespace db mysql:从Kubernetes的db命名空间中获取名为mysql的Secret对象。
  • -o jsonpath="{.data.mysql-root-password}":使用JSONPath表达式从Secret对象中提取mysql-root-password字段的数据。
  • | base64 -d:将提取出的Base64编码的字符串解码为明文。
  • kubectl run mysql-client:创建一个名为mysql-client的Pod。
  • --rm:Pod运行结束后自动删除。
  • --tty -i:分配一个伪终端并保持输入交互。
  • --restart='Never':设置Pod的重启策略为不重启。
  • --image docker.io/bitnami/mysql:8.4.4-debian-12-r0:使用指定的Docker镜像。
  • --namespace db:在db命名空间中创建Pod。
  • --env MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD:将环境变量MYSQL_ROOT_PASSWORD设置为之前获取的root密码。
  • --command -- bash:在Pod中启动一个Bash shell。

将数据插入其中

-- Create the database if it doesn't exist
CREATE DATABASE IF NOT EXISTS debezium_db;-- Use the database
USE debezium_db;-- Create a sample table
CREATE TABLE employees (id INT AUTO_INCREMENT PRIMARY KEY,first_name VARCHAR(50) NOT NULL,last_name VARCHAR(50) NOT NULL,email VARCHAR(100) UNIQUE NOT NULL,salary DECIMAL(10,2) NOT NULL,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- Insert sample data
INSERT INTO employees (first_name, last_name, email, salary) VALUES
('John', 'Doe', 'john.doe@example.com', 60000.00),
('Jane', 'Smith', 'jane.smith@example.com', 75000.00),
('Alice', 'Johnson', 'alice.johnson@example.com', 82000.00),
('Bob', 'Williams', 'bob.williams@example.com', 50000.00);

部署 Kafka

kubectl create ns kafkacat > kafka-values.yaml <<EOF
listeners:client:protocol: PLAINTEXTcontroller:protocol: PLAINTEXTinterbroker:protocol: PLAINTEXTexternal:protocol: PLAINTEXT
sasl:enabledMechanisms: PLAIN
broker:replicaCount: 1
controller:replicaCount: 1
EOFhelm install kafka -n kafka bitnami/kafka --version 31.3.1 -f kafka-values.yaml

注意:上述 Kafka 配置是为了简化演示,禁用了 Kafka Broker 和 Controller 的所有身份验证机制。不建议在生产环境中禁用身份验证运行 Kafka。

请检查 Kafka Broker 和 Controller Pod 是否在 Kafka 命名空间中启动并运行。

部署 kafdrop

安装 Kafka UI,用于查看 Kafka 主题和浏览消费者组。

helm repo add lsst-sqre https://lsst-sqre.github.io/charts/helm repo update # required if above repo is already addedcat > kafdrop-values.yaml <<EOF
kafka:brokerConnect: kafka:9092
EOFhelm install -n kafka kafdrop lsst-sqre/kafdrop --version 0.1.3 -f kafdrop-values.yaml# port forward kafdrop to localhost:9000
kubectl port-forward -n kafka svc/kafdrop 9000:9000

部署 Debezium 连接器

要部署 Debezium 连接器,您需要先部署一个包含所需连接器插件的 Kafka Connect 集群,然后再实例化实际的连接器本身。第一步,需要创建一个包含该插件的 Kafka Connect 容器镜像。

创建 Kafka Connect 集群

运行以下命令获取 minikube 注册表的 IP:kubectl -n kube-system get svc registry -o jsonpath=‘{.spec.clusterIP}’,并将其替换到下面的 kafkaconnect 清单中。

  • get svc registry: 这部分命令用来获取名为 registry 的服务的信息。svcservice 的缩写,代表服务资源。
  • -o jsonpath='{.spec.clusterIP}': 这个选项指定输出格式为 JSONPath 表达式的结果。JSONPath 是一种查询 JSON 数据的语法,类似于 XPath 用于 XML。这里的 {.spec.clusterIP} 表示从返回的服务对象中提取 spec.clusterIP 字段的值,即该服务的 ClusterIP 地址。
kubectl create ns debeziumcat > kafka-connect.yaml <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:name: debezium-connect-clusterannotations:strimzi.io/use-connector-resources: "true"namespace: debezium
spec:version: 3.8.0replicas: 1bootstrapServers: kafka.kafka.svc.cluster.local:9092 # kafka broken endpointconfig:config.providers: secretsconfig.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvidergroup.id: connect-clusteroffset.storage.topic: connect-cluster-offsetsconfig.storage.topic: connect-cluster-configsstatus.storage.topic: connect-cluster-status# -1 means it will use the default replication factor configured in the brokerconfig.storage.replication.factor: -1offset.storage.replication.factor: -1status.storage.replication.factor: -1build:output:type: dockerimage: <TO_BE_REPLACED_BY_ABOVE_CLUSTER_IP>/debezium-connect-mysql:latestplugins:- name: debezium-mysql-connectorartifacts:- type: tgzurl: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.0.7.Final/debezium-connector-mysql-3.0.7.Final-plugin.tar.gz
EOFkubectl apply -f kafka-connect.yaml
  • build: 配置了构建过程:
  • output.typeoutput.image: 指定了 Docker 镜像的输出类型和名称。
  • plugins: 定义了要安装的插件,这里是 Debezium MySQL 连接器,指定了插件的下载 URL。

注意:在上述配置中,如果您希望将镜像推送到 ECR/GCR 或任何其他镜像仓库,请将镜像端点替换为相应的端点,并且集群应具有将镜像推送到该仓库的权限。

检查 Kafka 连接是否已就绪。可能需要几分钟(4-5 分钟)才能进入就绪状态。

kubectl get kafkaconnect -n debezium

它应该返回状态 Ready: True

NAME                       DESIRED REPLICAS   READY
debezium-connect-cluster   1                  True

在 Kafdrop 中,您应该能够看到 3 个主题:

connect-cluster-configs
connect-cluster-offsets
connect-cluster-status

创建 Debezium 连接器

在创建连接器之前,我们需要创建一个 k8s secret(用于存储数据库凭据)和 rbac。

cat > mysql-creds.yaml <<EOF
apiVersion: v1
kind: Secret
metadata:name: mysql-credsnamespace: debezium
type: Opaque
stringData:username: mysql_usrpassword: mysql_pwd
EOFkubectl apply -f mysql-creds.yamlcat > role.yaml <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:name: connector-configuration-rolenamespace: debezium
rules:- apiGroups: [""]resources: ["secrets"]resourceNames: ["mysql-creds"]verbs: ["get"]
EOFkubectl apply -f role.yamlcat > role-binding.yaml <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:name: connector-configuration-role-bindingnamespace: debezium
subjects:- kind: ServiceAccountname: debezium-connect-cluster-connectnamespace: debezium
roleRef:kind: Rolename: connector-configuration-roleapiGroup: rbac.authorization.k8s.io
EOFkubectl apply -f role-binding.yaml

通过登录 MySQL 获取 MySQL server_id(按照与上述相同的步骤)

SELECT @@server_id; # it is expected to be 1

部署 kafka 连接器

cat > kafka-connector.yaml <<'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:name: debezium-connector-mysqlnamespace: debeziumlabels:strimzi.io/cluster: debezium-connect-cluster
spec:class: io.debezium.connector.mysql.MySqlConnectortasksMax: 1config:tasks.max: 1database.hostname: mysql.db.svc.cluster.local # mysql db hostnamedatabase.port: 3306database.user: ${secrets:debezium/mysql-creds:username}database.password: ${secrets:debezium/mysql-creds:password}database.server.id: 1 # SELECT @@server_id;topic.prefix: mysqldatabase.include.list: debezium_dbschema.history.internal.kafka.bootstrap.servers: kafka.kafka.svc.cluster.local:9092 # kafka broken endpointschema.history.internal.kafka.topic: schema-changes.debezium_db
EOFkubectl apply -f kafka-connector.yaml

检查 kafka 连接是否已就绪。可能需要几分钟才能进入就绪状态。

kubectl get kafkaconnector -n debezium

它应该返回状态 Ready: True

NAME                       CLUSTER                    CONNECTOR CLASS                              MAX TASKS   READY
debezium-connector-mysql   debezium-connect-cluster   io.debezium.connector.mysql.MySqlConnector   1           True

现在,您应该能够在 kafdrop 中看到更多主题,例如 mysql.debezium_db.employees。

此主题将包含上面创建员工表时插入的所有数据。

为了测试 Debezium 连接器,请向表中添加更多数据。

-- Insert more sample data
INSERT INTO employees (first_name, last_name, email, salary) VALUES
('Charlie', 'Brown', 'charlie.brown@example.com', 72000.00),
('David', 'Miller', 'david.miller@example.com', 68000.00),
('Emma', 'Wilson', 'emma.wilson@example.com', 79000.00),
('Frank', 'Anderson', 'frank.anderson@example.com', 55000.00),
('Grace', 'Thomas', 'grace.thomas@example.com', 87000.00),
('Henry', 'Taylor', 'henry.taylor@example.com', 62000.00);

它应该反映在主题mysql.debezium_db.employees中。
在这里插入图片描述
这表明 MySQL 表员工中所做的更改已反映在 Kafka 中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/86997.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/86997.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

利润才是机器视觉企业的的“稳定器”,机器视觉企业的利润 = (规模经济 + 技术差异化 × 场景价值) - 竞争强度

影响机器视觉企业盈利能力的关键因素。这个公式本质上反映了行业的核心动态:利润来自成本控制(规模化效应)和差异化优势(技术壁垒与场景稀缺性的协同),但被市场竞争(内卷程度)所侵蚀。下面我将一步步拆解这个公式,结合机器视觉行业的特点(如工业自动化、质检、安防、…

EPLAN 中定制 自己的- A3 图框的详细指南(一)

EPLAN 中定制 BIEM - A3 图框的详细指南 在智能电气设计领域&#xff0c;图框作为图纸的重要组成部分&#xff0c;其定制的规范性和准确性至关重要。本文将以北京经济管理职业学院人工智能学院的相关任务为例&#xff0c;详细介绍在 EPLAN 软件中定制 BIEM - A3 图框的全过程…

macbook开发环境的配置记录

前言&#xff1a;好多东西不记录就会忘记 git ssh配置 当我们的没有配置git ssh的时候&#xff0c;使用ssh下载的时候会显示报错“make sure you have the correct access rights and respository exits" 如何解决&#xff0c;我们先在命令行检查检查一下用户名和邮箱是…

GitLab 18.1 高级 SAST 已支持 PHP,可升级体验!

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署极狐GitLab。 学习极狐GitLab 的相关资料&#xff1a; 极狐GitLab 官网极狐…

[学习]M-QAM的数学原理与调制解调原理详解(仿真示例)

M-QAM的数学原理与调制解调原理详解 QAM&#xff08;正交幅度调制&#xff09;作为现代数字通信的核心技术&#xff0c;其数学原理和实现方法值得深入探讨。本文将分为数学原理、调制解调原理和实现要点三个部分进行系统阐述。 文章目录 M-QAM的数学原理与调制解调原理详解一、…

图书管理系统练习项目源码-前后端分离-使用node.js来做后端开发

前端学习了这么久了&#xff0c;node.js 也有了一定的了解&#xff0c;知道使用node也可以来开发后端&#xff0c;今天给大家分享 使用node 来做后端&#xff0c;vue来写前端&#xff0c;做一个简单的图书管理系统。我们在刚开始学习编程的时候&#xff0c;需要自己写大量的项目…

【甲方安全视角】企业建设下的安全运营

文章目录 一、安全运营的概念与起源二、安全运营的职责与定位三、安全运营工程师的核心能力要求四、安全运营的典型场景与应对技巧1. 明确责任划分,避免“医生做保姆”2. 推动机制:自下而上 vs. 自上而下3. 宣传与内部影响力建设五、安全运营的战略意义六、为何需要安全原因在…

03认证原理自定义认证添加认证验证码

目录 大纲 一、自定义资源权限规则 二、自定义登录界面 三、自定义登录成功处理 四、显示登录失败信息 五、自定义登录失败处理 六、注销登录 七、登录用户数据获取 1. SecurityContextHolder 2. SecurityContextHolderStrategy 3. 代码中获取认证之后用户数据 4. 多…

IPLOOK 2025上半年足迹回顾:连接全球,步履不停

2025年上半年&#xff0c;IPLOOK积极活跃于全球通信舞台&#xff0c;足迹横跨亚洲、欧洲、非洲与北美洲&#xff0c;我们围绕5G核心网、私有网络、云化架构等方向&#xff0c;向来自不同地区的客户与合作伙伴展示了领先的端到端解决方案&#xff0c;深入了解各地市场需求与技术…

【Kafka】docker 中配置带 Kerberos 认证的 Kafka 环境(全过程)

1. 准备 docker 下载镜像 docker pull centos/systemd&#xff0c;该镜像是基于 centos7 增加了 systemd 的功能&#xff0c;可以更方便启动后台服务 启动镜像 使用 systemd 功能需要权限&#xff0c;如果是模拟 gitlab services 就不要使用 systemd 的方式启动 如果不使用 s…

用Python构建一个可扩展的多网盘聚合管理工具 (以阿里云盘为例)

摘要 本文旨在从开发者视角&#xff0c;探讨并实践如何构建一个命令行界面的、支持多网盘聚合管理的工具。我们将以阿里云盘为例&#xff0c;深入解析其API认证与核心操作&#xff0c;并用Python从零开始实现文件列表、重命名、分享等功能。更重要的是&#xff0c;本文将重点讨…

筑牢网络安全屏障

在数字化浪潮席卷全球的今天&#xff0c;网络空间已成为继陆、海、空、天之后的 “第五疆域”&#xff0c;深刻影响着国家政治、经济、军事等各个领域。“没有网络安全就没有国家安全”&#xff0c;这句论断精准道出了网络安全与国家安全之间密不可分的关系。​ 网络安全关乎国…

计算机网络(一)层

一、分层 分层的意义&#xff1a;简化复杂性、提高灵活性、促进标准化 &#xff08;1&#xff09;法律上国际标准——OSI体系结构 &#xff08;2&#xff09;事实上的网络标准——TCP/IP体系结构 TCP&#xff1a;运输层的协议 IP&#xff1a;网际层的一个协议 网络接口层&…

STM32 rs485实现中断DMA模式收发不定长数据

在STM32F103上使用TD301D485H模块通过USB转485/422串口线与电脑通信 TXD (TD301D485H) -> PA2 (STM32F103)RXD (TD301D485H) -> PA3 (STM32F103)CON (TD301D485H) -> PA1 (STM32F103) 由于485是半双工通信&#xff0c;需要在发送和接收时控制方向引脚&#xff08;CO…

DDL-8-小结

DDL 小结 DDL 小结 DDL 小结DDL - 数据库操作DDL - 表操作 DDL - 数据库操作 查看当前有哪些数据库 SHOW DATABASES;新建数据库 CREATE DATABASE 数据库名;使用数据库 USE 数据库名;查询当前数据库 SELECT DATABASE();删除数据库 DROP DATABASE 数据库名;DDL - 表操作 查看当前…

Redis 安装使用教程

一、Redis 简介 Redis 是一个开源&#xff08;BSD 许可&#xff09;、内存数据结构存储系统&#xff0c;可以用作数据库、缓存和消息中间件。支持字符串、哈希、列表、集合、有序集合等数据类型&#xff0c;广泛应用于分布式缓存、排行榜、实时数据分析等场景。 二、下载安装…

Go语言测试与调试:单元测试与基准测试

以下是《Go语言实战指南》中关于 测试与调试&#xff1a;单元测试与基准测试 的详细内容&#xff0c;涵盖测试编写、运行、覆盖率分析与性能测试&#xff0c;适用于实际项目开发与性能优化阶段。 一、Go 的测试体系概览 Go 提供原生的测试工具包 testing&#xff0c;无需第三方…

数字FIR-I型滤波器设计(窗函数法)

目录 一、实验目的 二、实验原理 2.1 概念辨析 2.2 代码实现逻辑与工具函数 三、实验内容 3.1 设计带通滤波器&#xff08;电平组合法&#xff0c;&#xff08;理想宽带低通-理想窄带低通&#xff09;x窗函数&#xff09; 3.2 高通滤波器&#xff08;…

RHCSA认证题目练习一(配置网络设置)

一. 题目 配置网络设置 解题过程&#xff1a; 注意&#xff1a;不可以在xshell中完成&#xff0c;否则会直接断联 这里用图形化解题&#xff0c;更加简单防止命令记错 1. 打开图形化视图 命令&#xff1a;nmtui 按回车确认 按回车确认 2.首先把IPv4配置 <自动> 改成 …

STL简介+string模拟实现

STL简介string模拟实现 1. 什么是STL2. STL的版本3. STL的六大组件4.STL的缺陷5. string5.1 C语言中的字符串5.2 1个OJ题 6.标准库中的string类6.1 string类(了解)6.2 string类的常用接口说明1.string类对象的常见构造函数2.析构函数(~string())3.赋值函数 (operator) 6.3 stri…