Kafka Deployment and Basic Operations

I. Deploying Kafka

Extract the archive (the two forms below are equivalent; use either one):

tar xzvf kafka_2.12-3.9.1.tgz
tar -zxf kafka_2.12-3.9.1.tgz
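The tarball unpacks into a directory named after the Scala and Kafka versions; the rest of this guide assumes commands are run from inside it:

cd kafka_2.12-3.9.1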

1. Edit config/server.properties

# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=/data3/kafka_new_0617/kafka_log_dir

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
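Once the broker is up (section 3 below), you can confirm the settings it actually picked up. A minimal sketch using kafka-configs.sh, assuming broker.id 0 and the default port as configured above (--all includes defaults, not just dynamically set values):

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe --all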

2. Edit config/zookeeper.properties

# the directory where the snapshot is stored.
dataDir=/data3/kafka_new_0617/zook_data
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
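Both services will normally create their data directories on first start, but creating them up front (with the right ownership) surfaces permission problems early. The paths below are the ones configured above:

mkdir -p /data3/kafka_new_0617/zook_data /data3/kafka_new_0617/kafka_log_dir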

3. Start Kafka

Start ZooKeeper first, then the broker (the -daemon flag runs each process in the background):

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
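To verify that both processes came up, a minimal sketch (assumes a JDK with jps on the PATH and Kafka's default log location under logs/):

jps | grep -E 'Kafka|QuorumPeerMain'                   # one line each for the broker and ZooKeeper
tail -n 20 logs/server.log                             # should end with a "started" message from KafkaServer
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids  # should list the registered broker id, e.g. [0]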

4. Common Kafka Commands

a. kafka-acls.sh #configure and inspect cluster authorization (ACLs)
b. kafka-configs.sh #view and modify Kafka configuration
c. kafka-console-consumer.sh #console consumer
d. kafka-console-producer.sh #console producer
e. kafka-consumer-groups.sh #inspect consumer groups, reset offsets, etc. (see the sketch after this list)
f. kafka-consumer-perf-test.sh #built-in consumer performance test
g. kafka-mirror-maker.sh #replicate data between Kafka clusters (legacy; MirrorMaker 2 via connect-mirror-maker.sh is preferred)
h. kafka-preferred-replica-election.sh #re-elect preferred partition leaders (removed in Kafka 3.x; use kafka-leader-election.sh)
i. kafka-producer-perf-test.sh #built-in producer performance test
j. kafka-reassign-partitions.sh #reassign partitions / rebalance data across brokers
k. kafka-run-class.sh #run an arbitrary Kafka class
l. kafka-server-start.sh #start the broker process
m. kafka-server-stop.sh #stop the broker process
n. kafka-topics.sh #query topic status; create, delete, and expand topics
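As an illustration of (e), a few common kafka-consumer-groups.sh invocations; the group name my-group here is a placeholder:

# List all consumer groups known to the cluster
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Show per-partition committed offsets and lag for one group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

# Reset the group's offsets to the earliest position
# (drop --execute for a dry run; the group must have no active members)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic merchant-topic-test --reset-offsets --to-earliest --execute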

Create a topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic merchant-topic-test
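To confirm the topic was created with the expected partition count and replication factor:

bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic merchant-topic-test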

List topics

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Delete a topic (requires delete.topic.enable=true on the brokers, which is the default)

bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic tidb_btb_merchant

Consume a topic (view incoming messages in real time)

./bin/kafka-console-consumer.sh --bootstrap-server 10.126.106.158:9092 --from-beginning --topic tidb_btb_merchant
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic jasonhu-test
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jasonhu-test --partition 0 --offset latest
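By default the console consumer joins a throwaway auto-generated group (console-consumer-XXXXX, as seen in the troubleshooting log below); passing an explicit --group makes the consumption trackable with kafka-consumer-groups.sh. The group name here is a placeholder:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jasonhu-test --group console-check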

Produce messages (the console producer sends each line of input as a separate message; you can watch them arrive in real time with kafka-console-consumer.sh)

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic jasonhu-test
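If you need keyed messages (for example, to test partitioning), the console producer can split each input line into key and value. A sketch using the standard parse.key/key.separator properties:

# Each input line is now "key:value"; records with the same key land in the same partition
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic jasonhu-test --property parse.key=true --property key.separator=: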

5. Troubleshooting
(1) Member console-consumer-19248bcd-b405-4367-979a-6a4baa37e8f2 in group console-consumer-71214 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
This turned out to be a false alarm. While consuming a topic with kafka-console-consumer.sh, no data ever appeared, and checking the Kafka logs turned up the message above.
On investigation, the cause was simply that no new messages were being written to the topic, so there was nothing to consume.
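A quick way to rule out "no new data" is to check whether the topic's end offsets advance between two runs; if they do not, nothing is being produced. A minimal sketch using kafka-get-offsets.sh (shipped with Kafka 3.x; --time -1 requests the latest offset of each partition):

bin/kafka-get-offsets.sh --bootstrap-server localhost:9092 --topic jasonhu-test --time -1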

