Kafka代码模板

Kafka 服务器(Broker) 的配置

server.properties

# broker.id: 每个 Kafka Broker 的唯一标识符。broker.id 必须在整个 Kafka 集群中唯一。
broker.id=0# 配置 Kafka Broker 监听客户端请求的地址和端口。这个配置决定了 Kafka 服务将接受来自生产者、消费者以及其他客户端的连接。
listeners=PLAINTEXT://192.168.65.60:9092# Kafka 消息日志文件的存储目录
log.dir=/usr/local/data/kafka‐logs#  Kafka 连接到 Zookeeper 的地址
zookeeper.connect=192.168.65.60:2181

每个 Kafka 集群中的节点(Broker)都需要有一个 server.properties 配置文件,并且每个节点的配置可以有所不同。

生产者

生产者配置

Properties props = new Properties();// Kafka服务器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// 把发送的key和value从字符串序列化为字节数组
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); /* * 1. 发出消息持久化机制参数* acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息* acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入,就可以继续发送下一条消息*          如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失* acks=‐1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志。这是最强的数据保证。一般金融级别才会使用这种配置*/
props.put(ProducerConfig.ACKS_CONFIG, "1");// 2. 重试相关
//2.1 发送失败重试次数,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,需要接收者做好消息接收的幂等性处理
props.put(ProducerConfig.RETRIES_CONFIG, 3);// 2.2 重试间隔设置,默认重试间隔100ms
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);/ * 3. 本地缓冲区和延迟发送相关* 在设置本地缓冲区/延迟发送后,消息会先发送到本地缓冲区,当达到批量发送消息的大 * 小时,本地线程会从缓冲区取数据(一个batch),批量发送到broker。同时,需要设置 * batch最大的延迟发送时间,如果一条消息在本地缓冲区中等待的时间达到设置的时间后 * batch没满,那么也必须把消息发送出去* /// 3.1 设置本地缓冲区大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// 3.2 设置batch大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);/* * 3.3 batch最大的延迟发送时间* 默认值是0:意思就是消息必须立即被发送,但这样会影响性能* 一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去* 如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长* *  消息 -> 本地缓冲区(32M)-> batch(16k)-> 发送(10ms batch不满也发送)*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

生产者发送消息

// 创建 Kafka 生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 发送消息
String topic = "test-topic";  // 主题名称
String key = "order1";  // 消息的 key
String value = "Order details: 123";  // 消息的内容// 创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);try {// 发送消息,这里的lambda函数就是onCompletion()方法producer.send(record, (metadata, exception) -> {if (exception != null) {System.out.println("Error sending message: " + exception.getMessage());} else {System.out.println("Message sent successfully to topic " + metadata.topic() +" partition " + metadata.partition() + " with offset " + metadata.offset());}});} catch (Exception e) {e.printStackTrace();
} finally {// 关闭生产者producer.close();
}
// 指定发送分区
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0, key_json, value_json);// 也可以指定发送分区
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, key_json, value_json);// 等待消息发送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();// 异步回调方式发送消息
producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {// 处理异常}
});
// 关闭
producer.close();

此外,为了保证生产者的消息发送成功,可以通过添加回调函数的方式,在send成功后打印日志。

详细内容参考:Kafka如何保证消息不丢失

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));

消费者

消费者配置

Properties properties = new Properties();// Kafka服务器地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// 把发送的key和value从字符串序列化为字节数组
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);// 是否自动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/* * 当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费* latest(默认) :只消费自己启动之后发送到主题的消息* earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于 consumer.seekToBeginning(每次都从头开始消费)*/
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将rebalance方案下发给consumer,这个时间可以稍微短一点
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);// 服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组,对应的Partition也会被重新分配给其他consumer,默认是10秒
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);// 一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

消费者消费消息

// 创建 Kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅主题
consumer.subscribe(Collections.singletonList("test-topic"));// 消费指定分区,这段代码指定了消费者从TOPIC_NAME的第一个分区(分区0)开始消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));/* 回溯消费(从头消费 - seekToBeginning)* seekToBeginning()方法使消费者回溯到该分区的最初位置,意味着从头开始消费该分    区的所有消息。* 这对于重新消费主题中的消息或重新同步时非常有用。* /
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));// 指定offset消费,即消费者将跳过之前的消息,从该offset开始消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);/* 从指定时间点开始消费 - 1小时前
* partitionsFor()方法获取指定主题(TOPIC_NAME)的所有分区信息。
* fetchDataTime 是一个时间戳,表示1小时前的时间,new Date().getTime() - 1000 * 60 * 60 用来计算这个时间戳。
* map 用于存储每个分区与其对应的时间戳(fetchDataTime)。这个时间戳将用于从Kafka中拉取时间戳较早的消息。
*/ 
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
long fetchDataTime = new Date().getTime() ‐ 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
}
// 消费消息
try {while (true) {consumer.poll(1000).forEach(record -> {// 可以修改为具体业务逻辑System.out.println("Consumed record with key: " + record.key() +", value: " + record.value() + ", from partition: " + record.partition());});}
} catch (Exception e) {e.printStackTrace();
} finally {// 关闭消费者consumer.close();
}

消费者提交offset

手动提交offset的意义

  1. 控制消费进度

手动提交offset能够让消费者在每个消息或消息批次消费后,明确地告诉Kafka“我已经消费到这个offset了”。这对于控制消息消费的精确性非常重要,尤其在需要精确控制消费位置的场景中。

  1. 避免消息丢失或重复消费

如果自动提交offset,可能会发生消费者在处理中出现异常(如程序崩溃),导致已消费的消息的offset提交失败,导致消息丢失或重复消费。手动提交则可以在处理完消息并确保成功时再提交offset,避免这种问题。比如在金融交易、日志收集系统等场景中,需要确保消息的处理不会丢失,并且不会重复处理。

  1. 灵活的错误处理与恢复

通过手动提交offset,消费者可以在消费过程中灵活地处理错误。如果在消费某条消息时发生异常,消费者可以选择不提交offset,这样在消费者重启或恢复时会重新消费该消息。它使得消费者在出错时能更好地控制重试策略。

代码实现

同步提交
consumer.commitSync();

当调用该方法时,消费者会将当前消费的偏移量提交到Kafka集群,并且当前线程会阻塞,直到该提交操作完成。

优势:

  1. 阻塞:会等待offset提交成功,不会继续执行后续代码,直到提交完成。
  2. 可靠性:如果提交失败,commitSync()会抛出异常,可以捕获并进行处理,确保提交正确。

缺点:会导致性能问题,因为它会阻塞当前线程,直到提交完成。

异步提交
consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {// 处理异常}
});

回调函数:异步提交会接受一个OffsetCommitCallback回调接口作为参数,该接口的onComplete()方法会在提交操作完成时被调用。这个方法会接收到两个参数:
offsets:包含提交的偏移量信息(TopicPartition和OffsetAndMetadata)。
ex:如果提交发生错误,该参数会包含异常信息。

优势:

  1. 非阻塞:不会等待提交完成,允许程序继续执行其他操作。
  2. 提高吞吐量:减少等待时间,尤其是在批量消费和提交的情况下,可以提高整体的吞吐量和性能。

缺点:可能会出现提交失败的情况,回调函数中的异常处理需要做好,以确保异常得到及时处理。

Spring boot集成

1. 添加依赖

在pom.xml 中添加 Kafka 的相关依赖

<dependencies><!-- Spring Boot Starter for Apache Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter Web (optional if you need a web app) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Starter for Actuator (optional for monitoring) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!-- Spring Boot Starter Test (optional for testing) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. 配置文件

application.yml

spring:kafka:# Kafka broker 地址bootstrap‐servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer:retries: 3batch‐size: 16384buffer‐memory: 33554432acks: 1key‐serializer: org.apache.kafka.common.serialization.StringSerializervalue‐serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group‐id: default‐groupenable‐auto‐commit: falseauto‐offset‐reset: earliestkey‐deserializer: xxx.StringDeserializervalue‐deserializer: xxx.StringDeserializerlistener:ack‐mode: manual_immediate

注意:
ack‐mode
RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
BATCH:当每一批poll()的数据被消费者监听器处理之后提交
TIME:当每一批poll()的数据被消费者监听器处理之后,距离上次提交时间大于TIME时提交
COUNT:当每一批poll()的数据被消费者监听器处理之后,被处理record数量大于等于COUNT时提交
TIME | COUNT:有一个条件满足时提交
MANUAL:当每一批poll()的数据被消费者监听器处理之后, 手动调用Acknowledgment.acknowledge()后提交
MANUAL_IMMEDIATE:手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种(一次提交一条消息)

3. 启动类

package com.example.kafka;import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.beans.factory.annotation.Autowired;@SpringBootApplication
public class KafkaApplication implements CommandLineRunner {@Autowiredprivate KafkaProducer kafkaProducer;public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 发送消息kafkaProducer.sendMessage("test-topic", "Hello, Kafka!");}
}

4. 生产者类

package com.example.kafka.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);System.out.println("Message sent: " + message);}
}

5. 消费者类

package com.example.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "test-topic", groupId = "test-group")public void consume(String message) {System.out.println("Consumed message: " + message);}@KafkaListener(topics = "test-topic",groupId = "test-group")public void consume1(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = record.value();ack.acknowledge();  //手动提交offset}// 配置多个topic,concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数@KafkaListener(groupId = "testGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}), // 从topic1的分区0和1读取消息@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) // 从topic2的分区0读取消息,并设置分区1的初始偏移量为100}, concurrency = "6")public void listenToMultipleTopics(String message) {// 消费消息的逻辑System.out.println("Group: testGroup, Message: " + message);}
}

Kafka事务

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my‐transactional‐id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());// 初始化事务
producer.initTransactions();
try {// 开启事务producer.beginTransaction();// 发到不同的主题的不同分区producer.send(/*...*/);// 提交事务producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();
} catch (KafkaException e) {// 回滚事务producer.abortTransaction();
}
// 关闭
producer.close();

spring框架下Kafka事务

可以通过**@Transactional**实现

配置

可以通过在application.yml文件或KafkaConfig配置类中添加配置的方式,提供事务支持。

1. application.yml
spring:kafka:bootstrap-servers: localhost:9092  # Kafka 集群地址producer:acks: all  # 确保消息被所有副本确认transactional-id-prefix: tx-  # 事务前缀,Kafka 事务需要一个事务 ID 前缀consumer:group-id: test-group  # 消费者组 IDenable-auto-commit: false  # 手动提交 offsetlistener:ack-mode: manual  # 设置为手动提交确认
2. 配置类
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {// 设置 Kafka 生产者的事务管理器KafkaTransactionManager<String, String> transactionManager =new KafkaTransactionManager<>(producerFactory());KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());kafkaTemplate.setTransactionManager(transactionManager);return kafkaTemplate;}@Beanpublic DefaultKafkaProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-");  // 事务 IDreturn new DefaultKafkaProducerFactory<>(configProps);}
}

生产者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.Transactional;
import org.springframework.stereotype.Service;@Service
public class KafkaTransactionProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Transactionalpublic void sendTransactionalMessages() {try {// 发送事务消息kafkaTemplate.send("topic1", "key1", "message1");kafkaTemplate.send("topic2", "key2", "message2");// 你可以在此处加入其他业务逻辑,如果出现异常,会回滚事务if (someConditionFails()) {throw new RuntimeException("Simulating failure to trigger rollback");}// 如果没有异常,事务提交,消息将被正常发送} catch (Exception e) {// 事务回滚System.out.println("Transaction failed, rolling back...");throw e;}}private boolean someConditionFails() {// 模拟某些条件下事务失败return true;}
}

消费者

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
@EnableKafka
public class KafkaTransactionConsumer {@KafkaListener(topics = "topic1", groupId = "test-group")public void listenTopic1(String message) {System.out.println("Received message from topic1: " + message);}@KafkaListener(topics = "topic2", groupId = "test-group")public void listenTopic2(String message) {System.out.println("Received message from topic2: " + message);}
}

在生产者的配置中启用事务,配置 transactional.id,并设置事务管理器 KafkaTransactionManager,它会自动管理 Kafka 事务的开始、提交和回滚。

事务管理:@Transactional 注解用于标识在发送消息的过程是一个事务操作。如果其中任何消息发送失败,Spring Kafka 会自动回滚事务。

回滚机制:在 sendTransactionalMessages() 中模拟了一个失败的条件,确保事务在遇到异常时会被回滚。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/909797.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/909797.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

最大子数组和C++

给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组是数组中的一个连续部分。 示例 1&#xff1a; 输入&#xff1a;nums [-2,1,-3,4,-1,2,1,-5,4] 输出&#xff1a;…

centos 7单机安装ceph并创建rbd块设备

1. 安装依赖包 新增阿里云源ceph下载地址 vim /etc/yum.repos.d/ceph.repo [ceph] nameceph baseurlhttp://mirrors.aliyun.com/ceph/rpm-jewel/el7/x86_64/ gpgcheck0 [ceph-noarch] namecephnoarch baseurlhttp://mirrors.aliyun.com/ceph/rpm-jewel/el7/noarch/ gpgcheck…

Jenkins搭建、权限管理、参数化、流水线等详细教程!

部署Jenkins 一、jenkins 安装 官网&#xff1a; https://jenkins.io yum 安装 jenkins *jenkins 依赖 java 环境 #注意2.346之后的版本不再支持jdk8 卸载旧jenkins #查询以前是否安装jenkins rpm -qa |grep jenkins #卸载 jenkins yum -y remove jenkins rpm -e jenkins…

百度飞桨(PaddlePaddle)案例分享:基于 PaddleOCR 的图像文字提取系统

一、案例背景 在实际教学、办公及政务系统中&#xff0c;纸质材料&#xff08;如手写作文、表格、试卷等&#xff09;仍广泛存在。为提升信息处理效率&#xff0c;采用 OCR&#xff08;Optical Character Recognition&#xff09;技术将图像中的文字提取为可编辑文本已成为刚需…

python操控鼠标

在已知屏幕坐标的情况下&#xff0c;可以通过 Python 的 pyautogui 或 pynput 等库实现网页上的鼠标点击操作。以下是具体步骤和代码示例&#xff1a; 1. 使用 PyAutoGUI&#xff08;推荐&#xff09; pyautogui 是一个简单易用的库&#xff0c;可以直接通过屏幕坐标控制鼠标点…

UV 与 Bun 深度解析

UV 与 Bun 深度解析&#xff1a;现代开发工具的安装与使用指南 什么是 UV&#xff1f; UV&#xff08;Ultra-Velocity&#xff09;是由 Astral 公司&#xff08;Ruff 的创建者&#xff09;开发的超高速 Python 包管理工具&#xff1a; 用 Rust 编写&#xff0c;速度极快&…

【算力网络】多样化算力感知

一、算力网络 ​ 算力网络&#xff08;Computing Power Network&#xff09;是我国率先提出的原创性技术理念&#xff0c;其核心是通过高速网络整合分散的算力资源&#xff08;如云端、边缘、终端等&#xff09;&#xff0c;实现算力的动态感知、智能调度和一体化服务&#x…

Greenplum/PostgreSQL pg_hba.conf 认证方法详解

Greenplum/PostgreSQL pg_hba.conf 认证方法详解 pg_hba.conf 文件中的 METHOD 字段指定了客户端认证方式&#xff0c;以下是各种认证方法的详细说明和配置示例。 常用认证方法 1. trust - 无条件允许连接 说明&#xff1a;不需要密码&#xff0c;完全信任连接 适用场景&am…

分布式数据库中间件-Sharding-JDBC

前言 学习视频&#xff1a;深入Sharding-JDBC分库分表从入门到精通【黑马程序员】本内容仅用于个人学习笔记&#xff0c;如有侵扰&#xff0c;联系删除 1、概述 1.1、分库分表是什么 小明是一家初创电商平台的开发人员&#xff0c;他负责卖家模块的功能开发&#xff0c;其中…

pycharm2020.2版本给项目选择了虚拟环境解释器,项目文件都运行正常,为什么terminal文件路径的前面没有虚拟解释器的名称

解决问题&#xff1a; 1.打开 Anaconda Prompt输入 conda init cmd.exe 或者 pycharm终端直接 conda init cmd.exe 重启动 CMD和pycharm&#xff0c;使配置生效。

2025商旅平台排行:国内主流商旅平台解析

在数字化转型加速2025年&#xff0c;企业商旅管理正从“成本中心”向“智能管控枢纽”升级。如何通过技术赋能实现商旅成本精准优化与管理效率跃升&#xff1f;本文聚焦国内五大主流商旅平台&#xff0c;以“综合型头部平台创新型平台”双维度解析&#xff0c;结合数据实证与场…

CNS无线电信号覆盖分析系统v0.1

#系统终端有的版本号了# 开发一套类EMACS的专业软件任重道远&#xff0c;经过慢吞吞的开发&#xff0c;我们已经将目标定位大幅下调了&#xff0c;不再对标EMACS系统了&#xff0c;改为瞄行业老二WRAP软件了。当然WRAP软件在电磁信号仿真分析领域也是神一样的存在&#xff0c;其…

单视频二维码生成与列表二维码生成(完整版)

视频二维码有有两种情况&#xff1a;一种是单个视频的生成一个二维码&#xff1b;另一种是一组视频&#xff08;多个视频&#xff09;生成一个列表二维码。用户按自己的实际需求生成&#xff0c;即可&#xff0c;很方便。 STEP1 注册帐号 使用视频二维码&#xff0c;您需要注…

关于linux:1. Linux 基础运维

一、Linux 安装与发行版选择 关于操作系统种类&#xff1a; 1&#xff09;基于 Linux 内核的操作系统 Ubuntu、Debian、Kali、CentOS、RHEL、Arch、Android、Alpine、OpenWRT 等 特点&#xff1a;开源、稳定、安全、广泛使用于服务器与开发领域 2&#xff09;基于 Windows…

(LeetCode 每日一题) 2016. 增量元素之间的最大差值 (数组)

题目&#xff1a;2016. 增量元素之间的最大差值 思路&#xff1a;维护已遍历过的最小值&#xff0c;时间复杂度0(n)。 C版本&#xff1a; class Solution { public:int maximumDifference(vector<int>& nums) {int mnnums[0];int ans0;for(int i1;i<nums.size()…

MySQL基础与常用数据类型浅析

一.MySQL数据类型分类 二.数值类型 2.1int类型 我们使用TINYINT作为例子进行实验验证: 越界插入会直接报错,跟我们当时学习语言的时候不太一样,语言会进行隐式类型转换或截断.一般不会直接报错.其他的int类型也是同理. 说明: 在MySQL中&#xff0c;整型可以指定是有符号的…

Ubuntu 20.04离线安装Nvidia-docker

服务器因系统故障重装&#xff0c;安装docker容器时发现几年前的在线安装步骤不好使了&#xff0c;只好尝试离线安装。为了下次不卡壳&#xff0c;记录一下安装步骤。 先确定自己的操作系统&#xff0c;并确保已经安装了nvidia driver。我的操作系统是Ubuntu 20.04。 1. 下载…

6,TCP客户端

1,创建一个新的项目 2,界面设计

【dify更新问题】如何更新dify且低成本解决git pull 443问题

我的dify部署在mini server上&#xff0c;挂不了TZ&#xff0c;所以采用了如下办法 更新origin (.git/config) 地址为&#xff1a;https://gitee.com/dify_ai/dify.git 顺序执行 &#xff08;https://docs.dify.ai/en/getting-started/install-self-hosted/docker-compose#upg…

即时通讯消息推送技术深度解析:从底层原理到行业实践-优雅草卓伊凡|片翼|搁浅

即时通讯消息推送技术深度解析&#xff1a;从底层原理到行业实践-优雅草卓伊凡|片翼|搁浅 引言&#xff1a;重新启程的即时通讯项目 优雅草科技的卓伊凡最近重启了即时通讯项目的二次开发工作&#xff0c;在这个万物互联的时代&#xff0c;消息推送通知作为IM系统的核心功能之…