kafka之操作示例

一、常用shell命令

#1、创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replications 1 --topic test#2、查看创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181#3、生产者发布消息命令
(执行完此命令后在控制台输入要发送的消息,回车即可)
bin/kafka-console-producer.sh --broker-list 192.168.91.231:9092,192.168.91.231:9093,192.168.91.231:9094 --topic test#4、消费者接受消息命令
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning#5、kafka启动
首先启动zookeeper zkServer.sh start(相当于一个server,kafka会连接这个server)
bin/kafka-server-start.sh config/server.properties # 启动kafka#6、查看kafka节点数目
在zookeeper中查看,登录客户端bin/zkCli.sh 执行ls /brokers/ids 查看节点数目及节点ID,[0,1,2]#7、kafka中的概念
生产者 Producer、代理Broker、消费者Consumer、主题Topic、分区 Partition、消费者组 Consumer Group#8、查看主颗信息
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 [加其他选项]eg:
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 --describe
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test#9、为主题创建分区
一共创建八个分区,编号分别为0~7
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 --alter -partitions 8 -topic test#10、查看kafka进程
ps -eflgrep server.properties
ps -eflgrep server-1.properties
ps -eflgrep server-2.properties#11、kafka宕机重启后,消息不会丢失#12、kafka其中一个broker宕机后,对消费者和生产者影响很小(命令行下测试)
消费者会尝试连接,连接不到,返回java.net.ConnectException:Connection refused异常 生产者可能会在发送消息的时候报异常,但会很快连接到其他broker,继续正常使用#13.查看kafka消息队列的积压情况
bin/kafka-consumer-groups.sh --zookeeper 192.168.91.231:2181 --describe --group console-consumer-37289#14.kafka 中查看所有的group列表信息
bin/kafka-consumer-groups.sh --zookeeper 192.168.91.231:2181 --list

二、python操作kafka

本地安装与启动(基于Docker)

#1、下载zookeeper镜像与kafka镜像:
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1

#2、本地启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6

#3、本地启动kafka(注意下述代码,将kafka启动在9092端口)
docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--enV KAFKA ZO0KEEPER CONNECT=zookeeper:2181 \
--enV KAFKA ADVERTISED HOST NAME=192.168.71.113 \
--enV KAFKA ADVERTISED PORT=9092 \
registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1

上面写的localhost没有影响,查看端口如下
# netstat -tuanlp | grep 9092
tcp 0 0 0.0.0.0:9092 0.0.0.0:*LISTEN 102483/docker-proxy
tcp6 00:::9092 :::* LISTEN 102487/docker-proxy

#4、进入kafka bash
docker exec it kafka bash
cd /opt/kafka/bin

#5、创建Topic,分区为2,Topic name为'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo

kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic egon

数据存在哪里
[root@web02 ~]# docker exec -it kafka bash
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# ls /kafka/
kafka-logs-f33383f9c414
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# 1s /kafka/kafka-logs-f33383f9c414/
kafka_demo-0 kafka_demo-1
egon-0 egon-1
.........
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# ls /kafka/kafka-logs-f33383f9c414/egon-0
00000000000000000000.index0000000000000000000.timeindex
00000000800000080000.1og leader-epoch-checkpoint

#6、查看当前所有topic
kafka-topics.sh --zookeeper zookeeper:2181 --list

#7、命令行操作
$docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-producer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic
然后一行行输入,回车即发送一条消息
>111
>222
>333

另外一个终端
$ docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-consumer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic --from-beginning可以收到消息
111
222
333

#8、安装kafka-python
pip install kafka-python

代码示例:

# pip3 install kafka-python  # 版本是2.0.2
from kafka import KafkaProducer, KafkaConsumer
import json
import threading
import time# Kafka broker address
bootstrap_servers = '192.168.71.113:9092'# Topic name
topic = 'test_topic'# Producer function
def kafka_producer():producer = KafkaProducer(bootstrap_servers=bootstrap_servers,value_serializer=lambda v: json.dumps(v).encode('utf-8'))try:for i in range(10):message = {'message': f'Hello Kafka! Message {i}'}producer.send(topic, value=message)print(f"Sent: {message}")time.sleep(1)else:print("发送完成")except Exception as ex:print(f"Exception occurred: {ex}")finally:producer.close()# Consumer function
def kafka_consumer():consumer = KafkaConsumer(topic,bootstrap_servers=bootstrap_servers,auto_offset_reset='earliest',consumer_timeout_ms=5000)  # 设置超时时间为1秒try:for message in consumer:print(f"Received: {message.value}")else:print("消费完毕,等5000毫秒超时即可结束,执行finally内的代码")except Exception as ex:print(f"Exception occurred: {ex}")finally:print("消费者结束")consumer.close()# Create threads for producer and consumer
producer_thread = threading.Thread(target=kafka_producer)
consumer_thread = threading.Thread(target=kafka_consumer)# Start both threads
producer_thread.start()
consumer_thread.start()# Wait for threads to complete
producer_thread.join()
consumer_thread.join()print("Kafka producer and consumer threads have finished.")

执行结果:

                  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/82174.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络安全基础--第七课

路由表 路由器的转发原理:当一个数据包进入路由器,路由器将基于数据包中的目标IP地址,查询本地 路由表,若表中存在记录,则将无条件按记录转发,若没有记录,路由器不能泛洪,因为路由器…

Java SpringBoot 扣子CozeAI SseEmitter流式对话完整实战 打字机效果

书接上回:springBoot 整合 扣子cozeAI 智能体 对话https://blog.csdn.net/weixin_44548582/article/details/147457236 上文实现的是一次性等待并得到完整的AI回复内容,但随着问题和AI的逻辑日趋复杂,会明显增加这个等待时间,这对…

《AVL树完全解析:平衡之道与C++实现》

目录 AVL树的核心概念数据结构与节点定义插入操作与平衡因子更新旋转操作:从理论到代码双旋场景深度剖析平衡检测与测试策略性能分析与工程实践总结 0.前置知识:BS树 代码实现部分对和BS树相似的部分会省略。 1. AVL树的核心概念 1.1 平衡二叉搜索树…

跨平台游戏引擎 Axmol-2.6.0 发布

Axmol 2.6.0 版本是一个以错误修复和功能改进为主的次要LTS长期支持版本 🙏感谢所有贡献者及财务赞助者:scorewarrior、peterkharitonov、duong、thienphuoc、bingsoo、asnagni、paulocoutinhox、DelinWorks 相对于2.5.0版本的重要变更: 通…

【Django Serializer】一篇文章详解 Django 序列化器

第一章 Django 序列化器概述 1.1 序列化器的定义 1.1.1 序列化与反序列化的概念 1. 序列化 想象你有一个装满各种物品(数据对象)的大箱子(数据库),但是你要把这些物品通过一个狭窄的管道(网络&#xff…

关于spring @Bean里调用其他产生bean的方法

背景 常常见到如下代码 Bean public TestBean testBean() {TestBean t new TestBean();System.out.println("testBean:" t);return t; }Bean public FooBean fooBean() {TestBean t testBean();System.out.println("这里看似是自己new的,但因为…

Level1.7列表

1.7_1列表(索引切片) #1.列表 students[Bob,Alice,Jim,Mike,Judy] print(students)#2.在列表(添加不同数据类型,查看列表是否可以运行?是否为列表类型?) students[Bob,Alice,Jim,Mike,Judy,123…

Python爬虫实战:研究Cola框架相关技术

一、Cola 框架概述 Cola 是一款基于 Python 的异步爬虫框架,专为高效抓取和处理大规模数据设计。它结合了 Scrapy 的强大功能和 asyncio 的异步性能优势,特别适合需要高并发处理的爬虫任务。 1.1 核心特性 异步 IO 支持:基于 asyncio 实现非阻塞 IO,大幅提高并发性能模块…

vue2中el-table 实现前端分页

一些接口不分页的数据列表,一次性返回大量数据会导致前端渲染卡顿,接口不做分页的情况下前端可以截取数据来做分页 以下是一个例子,被截取的列表和全量数据在同一个栈内存空间,所以如果有表格内的表单编辑,新的值也会事…

Python + moviepy:根据图片或数据高效生成视频全流程详解

前言 在数据可视化、自媒体内容生产、学术汇报等领域,我们常常需要将一组图片或一段变动的数据,自动合成为视频文件。这样不仅能提升内容表现力,也极大节省了人工操作时间。Python作为数据处理和自动化领域的王者,其`moviepy`库为我们提供了灵活高效的视频生成方案。本文将…

科技赋能,开启现代健康养生新潮流

在科技与生活深度融合的当下,健康养生也迎来了全新的打开方式。无需传统医学的介入,借助现代科学与智能设备,我们能以更高效、精准的方式守护健康。​ 饮食管理步入精准化时代。利用手机上的营养计算 APP,录入每日饮食&#xff0…

Ubuntu24.04 LTS安装java8、mysql8.0

在 Ubuntu 24.04 上安装 OpenJDK OpenJDK 包在 Ubuntu 24.04 的默认存储库中随时可用。 打开终端并运行以下 apt 命令: sudo apt update查看是否已经安装java java --version如果未安装会有提示,直接复制命令安装即可,默认版本: sudo apt in…

深度学习框架显存泄漏诊断手册(基于PyTorch的Memory Snapshot对比分析方法)

点击 “AladdinEdu,同学们用得起的【H卡】算力平台”,H卡级别算力,按量计费,灵活弹性,顶级配置,学生专属优惠。 一、显存泄漏:深度学习开发者的"隐形杀手" 在深度学习模型的训练与推…

Pytorch分布式训练,数据并行,单机多卡,多机多卡

分布式训练 所有代码可以见我github 仓库:https://github.com/xiejialong/ddp_learning.git 数据并行(Data Parallelism,DP) 跨多个gpu训练模型的最简单方法是使用 torch.nn.DataParallel. 在这种方法中,模型被复制…

【论文阅读】——D^3-Human: Dynamic Disentangled Digital Human from Monocular Vi

文章目录 摘要1 引言2 相关工作3 方法3.1 HmSDF 表示3.2 区域聚合3.3. 变形场3.4. 遮挡感知可微分渲染3.5 训练3.5.1 训练策略3.5.2 重建损失3.5.3 正则化限制 4. 实验4.1 定量评估4.2 定性评价4.3 消融研究4.4 应用程序 5 结论 摘要 我们介绍 D 3 D^{3} D3人,一种…

docker commit除了提交容器成镜像,还能搞什么之修改cmd命令

要让新镜像默认启动时执行 /usr/sbin/sshd -D,需在提交镜像时 ​​显式指定新的启动命令​​。 方法一:提交时通过 --change 覆盖 CMD docker commit --changeCMD ["/usr/sbin/sshd", "-D"] v2 project:v2 方法二:重…

为什么我输入对了密码,还是不能用 su 切换到 root?

“为什么我输入对了密码,还是不能用 su 切换到 root?” 其实这背后可能不是“密码错了”,而是系统不允许你用 su 切 root,即使密码对了。 👇 以下是最常见的几个真正原因: ❌ 1. Root 用户没有设置密码&…

转移dp简单数学数论

1.转移dp问题 昨天的练习赛上有一个很好玩的起终点问题,第一时间给出bfs的写法。 但是写到后面发现不行,还得是的dp转移的写法才能完美的解决这道题目。 每个格子可以经过可以不经过,因此它的状态空间是2^(n*m)&…

IP查询基础介绍

IP 查询原理 IP 地址是网络设备唯一标识,IP 查询通过解析 IP 地址获取地理位置、运营商等信息。目前主流的 IPv4(32 位)与 IPv6(128 位)协议,前者理论提供约 43 亿地址,后者地址空间近乎无限。…

Linux命令简介

1 Linux系统的命令概述 在 Linux 操作系统中,凡是在字符操作界面中输入能够完成特定操作和任务的字符串都可以称为命令。严格来说,命令通常只代表实现某一类功能的指令或程序的名称。 1.1 Shell Linux 命令的执行必须依赖于 Shell 命令解释器。Shell …