阿里云和腾讯云RocketMQ 发消息和消费消息客户端JAVA接口

一、RocketMQ 概述

RocketMQ 是阿里巴巴开源的一款分布式消息中间件,后捐赠给 Apache 基金会成为顶级项目。它具有低延迟、高并发、高可用、高可靠等特点,广泛应用于订单交易、消息推送、流计算、日志收集等场景。

核心特点

  1. 分布式架构:支持集群部署,可水平扩展

  2. 高吞吐量:单机可支持10万级TPS

  3. 低延迟:毫秒级消息投递

  4. 高可用性:支持主从复制,自动故障转移

  5. 消息可靠性:支持消息持久化,确保不丢失

  6. 丰富的消息模式:支持普通消息、顺序消息、事务消息、定时消息等

二、核心概念

1. 基本组件

组件说明
NameServer轻量级注册中心,负责Broker的注册与发现
Broker消息存储与转发服务器,负责消息存储、投递和查询
Producer消息生产者,负责发送消息
Consumer消息消费者,负责消费消息
Topic消息主题,用于消息分类
Message Queue消息队列,Topic的分区单位
Tag消息标签,用于消息二级分类
Group生产者组/消费者组,用于集群管理

一、阿里云rocketMQ

使用阿里云 ONS SDK
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>2.0.5.Final</version> <!-- 推荐最新版本 -->
</dependency>

获取阿里云 RocketMQ 配置

  • Endpointhttp://{YourInstanceId}.mq-internet.aliyuncs.com:80

  • AccessKey:阿里云账号的 AccessKey ID 和 AccessKey Secret

  • Topic:消息主题(需在阿里云控制台创建)

  • Group ID:消费者组(需在控制台创建)

1、发消息

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;import java.util.Properties;public class AliyunMQProducer {public static void main(String[] args) {// 1. 配置 ProducerProperties properties = new Properties();properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://YourInstanceId.mq-internet.aliyuncs.com:80");properties.put(PropertyKeyConst.AccessKey, "YourAccessKey");properties.put(PropertyKeyConst.SecretKey, "YourSecretKey");properties.put(PropertyKeyConst.GROUP_ID, "YourGroupId"); // Producer Group ID// 2. 创建 ProducerProducer producer = ONSFactory.createProducer(properties);producer.start();// 3. 创建消息Message msg = new Message("YourTopic",  // Topic"YourTag",    // Tag"Hello Aliyun RocketMQ!".getBytes()  // Body);// 4. 发送消息producer.send(msg);System.out.println("消息发送成功!");// 5. 关闭 Producerproducer.shutdown();}
}

2、消费MQ

import com.aliyun.openservices.ons.api.*;
import java.util.Properties;public class AliyunMQConsumer {public static void main(String[] args) {// 1. 配置 ConsumerProperties properties = new Properties();properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://YourInstanceId.mq-internet.aliyuncs.com:80");properties.put(PropertyKeyConst.AccessKey, "YourAccessKey");properties.put(PropertyKeyConst.SecretKey, "YourSecretKey");properties.put(PropertyKeyConst.GROUP_ID, "YourGroupId"); // Consumer Group ID// 2. 创建 ConsumerConsumer consumer = ONSFactory.createConsumer(properties);// 3. 订阅 Topic 和 Tag(* 表示所有 Tag)consumer.subscribe("YourTopic", "*", new MessageListener() {@Overridepublic Action consume(Message message, ConsumeContext context) {System.out.println("收到消息: " + new String(message.getBody()));return Action.CommitMessage; // 消费成功}});// 4. 启动 Consumerconsumer.start();System.out.println("消费者已启动,等待消息...");}
}

 

  1. 阿里云 ONS SDK 更稳定,推荐使用(比 Apache RocketMQ 客户端更适配阿里云环境)。

  2. Topic 和 Group ID 需先在阿里云控制台创建,否则会报错。

  3. 生产环境建议配置重试机制和日志监控,避免消息丢失。

  4. 消费模式

    • 集群消费(CLUSTERING):同 Group ID 的多个 Consumer 分摊消息(默认)。

    • 广播消费(BROADCASTING):同 Group ID 的每个 Consumer 都收到所有消息。

二、腾讯云RocketMQ

import java.io.UnsupportedEncodingException;
import java.util.List;import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import lombok.extern.slf4j.Slf4j;/*** 腾讯云rocketMQ服务类*/
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class RocketTXMqService {@Value("${rocketmq.namespace:-1}")private String namespace;@Value("${rocketmq.producer.group:-1}")private String groupName;@Value("${rocketmq.producer.access-key:-1}")private String accessKey;@Value("${rocketmq.producer.secret-key:-1}")private String secretKey;@Value("${rocketmq.name-server:-1}")private String nameserver;// MQ生产者private DefaultMQProducer producer;// MQ实例化消费者pushprivate DefaultMQPushConsumer pushConsumer;// MQ实例化消费者pullprivate DefaultLitePullConsumer pullConsumer;/*** 创建生产者* * @return*/public DefaultMQProducer getProducer() {if (null == producer) {// 实例化消息生产者Producerproducer = new DefaultMQProducer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限);// 设置NameServer的地址producer.setNamesrvAddr(nameserver);try {// 启动Producer实例producer.start();} catch (MQClientException e) {e.printStackTrace();}}return producer;}/*** 同步发送 发送消息*/public void syncSend(String topic, String tag, String data) {producer = getProducer();// 发送消息SendResult sendResult = null;try {// 创建消息实例,设置topic和消息内容Message msg = new Message(topic, tag, data.getBytes(RemotingHelper.DEFAULT_CHARSET));sendResult = producer.send(msg);log.info("埋点信息发送腾讯云MQ:" + data);log.info("发送腾讯云MQ接口返回状态sendResult:" + sendResult);} catch (UnsupportedEncodingException e) {log.error("UnsupportedEncodingException:" + e.getMessage());} catch (MQClientException e) {log.error("MQClientException:" + e.getMessage());} catch (RemotingException e) {log.error("RemotingException:" + e.getMessage());} catch (MQBrokerException e) {log.error("MQBrokerException:" + e.getMessage());} catch (InterruptedException e) {log.error("InterruptedException:" + e.getMessage());}}/*** 创建push消费者* * @return*/public DefaultMQPushConsumer getPushConsumer() {if (null == pushConsumer) {// 实例化消费者pushConsumer = new DefaultMQPushConsumer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); // ACL权限// 设置NameServer的地址pushConsumer.setNamesrvAddr(nameserver);}return pushConsumer;}/*** 创建pull 消费者* * @return*/public DefaultLitePullConsumer getPullConsumer() {if (null == pullConsumer) {// 实例化消费者// 实例化消费者pullConsumer = new DefaultLitePullConsumer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));// 设置NameServer的地址pullConsumer.setNamesrvAddr(nameserver);// 设置从第一个偏移量开始消费pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);}return pullConsumer;}/*** push方式订阅消费* * @param topicName*/public void pushConsumer(String topicName) {pushConsumer = this.getPushConsumer();if (null != pushConsumer) {try {pushConsumer.subscribe(topicName, "*");// 注册回调实现类来处理从broker拉取回来的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑log.info("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者实例pushConsumer.start();} catch (MQClientException e) {log.error("push MQClientException:" + e.getMessage());}}}/*** pull方式订阅消费* * @param topicName*/public void pullConsumer(String topicName) {pullConsumer = this.getPullConsumer();if (null != pullConsumer) {try {// 订阅topicpullConsumer.subscribe(topicName, "*");// 启动消费者实例pullConsumer.start();} catch (MQClientException e) {log.error(" pull MQClientException:" + e.getMessage());}try {log.info("Consumer Started.%n");while (true) {// 拉取消息List<MessageExt> messageExts = pullConsumer.poll();log.info("%s%n", messageExts);}} finally {pullConsumer.shutdown();}}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/88357.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/88357.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue响应式原理六:Vue3响应式原理

1. 多个对象响应式当前存在的问题&#xff1a;当前实现仅针对某个固定对象&#xff08;obj&#xff09;进行依赖收集&#xff0c;实际开发中需要处理多个不同对象将对象响应式处理逻辑抽取为通用函数&#xff0c;支持任意对象代码如下&#xff1a; // 方案一&#xff1a;Obje…

【算法笔记 day three】滑动窗口(其他类型)

hello大家好&#xff01;这份笔记包含的题目类型主要包括求子数组已经一些比较‘小众’的题目。和之前一样&#xff0c;笔记中的代码和思路要么是我手搓要么是我借鉴一些大佬的想法转化成自己的话复现。所以方法不一定是最好的&#xff0c;但一定是经过我理解的产物&#xff0c…

docker-镜像管理指南

在本节中&#xff0c;我们将详细介绍 Docker 镜像的常用命令&#xff0c;帮助您更好地管理和操作镜像。以下是核心命令及其功能说明&#xff1a;1.使用"ls"查看镜像列表#查看现有的镜像列表[rootdocker01 ~]# docker images [rootdocker01 ~]# docker image ls#仅查看…

Mac 电脑无法读取硬盘的解决方案

引言近年来&#xff0c;选择使用 Mac 电脑的用户越来越多&#xff0c;尤其是在设计、开发、剪辑、文档处理等领域&#xff0c;macOS 凭借其优秀的系统生态与硬件体验吸引了大量拥趸。与此同时&#xff0c;对于摄影师、剪辑师、程序员、学生等用户来说&#xff0c;一块移动硬盘往…

25春期末考

web 疯狂星期四 先来看一下源码 分析代码的黑名单后得知 我们可以用的字符就只剩下 字母a-z(大小写均可) 数字2 空格 这里的限制太多了 这里比较常用的getallheaders被ban掉了 这里就是用session来做 session_start()开启session session_id()获取session 这里我们要构造一…

时间显示 蓝桥云课Java

目录 题目链接 题目 解题思路 代码 题目链接 竞赛中心 - 蓝桥云课 题目 解题思路 通过%天数,得到一天内的时间,然后/小时单位(换算成毫秒的)得到小时,然后总数减去该小时,得到分钟数,秒数同理 代码 import java.util.Scanner; // 1:无需package // 2: 类名必须Main, 不…

STM32F1控制步进电机

一、基础知识1. 步进电机控制方式脉冲方向控制&#xff08;最常见&#xff09;控制信号&#xff1a;DIR方向&#xff1a;高低电平决定正转或反转&#xff1b;STEP脉冲&#xff1a;每个脉冲电机前进一步&#xff08;可通过端口拉高拉低来模拟脉冲&#xff0c;或使用pwm来生成脉冲…

Docker 容器部署脚本

#!/bin/bash# # Author: ldj # Date: 2025-07-08 15:37:11 # Description: 首先删除旧的容器和镜像&#xff0c;然后登录到 Harbor 并拉取最新的镜像进行部署 # # 显示每条命令执行情况&#xff0c;便于调试 set -x harbor_addr$1 harbor_repo$2 project_name$3 version$4 po…

OpenCV 4.10.0 移植 - Android

前文: Ubuntu 编译 OpenCV SDK for Android Linux OpenCV 4.10.0 移植 概述 在移动应用开发领域&#xff0c;Android平台与OpenCV库的结合为开发者提供了强大的图像处理和计算机视觉能力。OpenCV(Open Source Computer Vision Library)是一个开源的计算机视觉和机器学习软件…

go go go 出发咯 - go web开发入门系列(二) Gin 框架实战指南

go go go 出发咯 - go web开发入门系列&#xff08;二&#xff09; Gin 框架实战指南 往期回顾 go go go 出发咯 - go web开发入门系列&#xff08;一&#xff09; helloworld 前言 前一节我们使用了go语言简单的通过net/http搭建了go web服务&#xff0c;但是仅使用 Go 的标…

编译OpenHarmony-4.0-Release RK3566 报错

编译OpenHarmony-4.0-Release RK3566 报错1. 报错问题2.问题解决3.解决方案4.​调试技巧​subsystem name config incorrect in ‘/home/openharmony/OpenHarmony/vendor/kaihong/khdvk_356b/bundle.json’, build file subsystem name is kaihong_products,configured subsy1.…

【PTA数据结构 | C语言版】线性表循环右移

本专栏持续输出数据结构题目集&#xff0c;欢迎订阅。 文章目录题目代码题目 给定顺序表 A(a1​,a2​,⋯,an​)&#xff0c;请设计一个时间和空间上尽可能高效的算法将该线性表循环右移指定的 m 位。例如&#xff0c;(1,2,5,7,3,4,6,8) 循环右移 3 位&#xff08;m3) 后的结果…

c++-内部类

概念如果一个类定义在另一个类的内部&#xff0c;这个内部类就叫做内部类。内部类是一个独立的类&#xff0c; 它不属于外部类。特性1.不能通过外部类的对象去访问内部类的成员。外部类对内部类没有任何优越的访问权限。 2.内部类就是外部类的友元类&#xff0c;参见友元类的定…

.golangci.yml文件配置

version: “2” run: timeout: 5m concurrency: 10 modules-download-mode: readonly linters: default: standard enable: - revive - cyclop settings: staticcheck: initialisms: [ “ACL”, “API”, “ASCII”, “CPU”, “CSS”, “DNS”, “EOF”, “GUID”, “HTML”, …

YOLO模型魔改指南:从原理到实战,替换Backbone、Neck和Head(战损版)

前言 Hello&#xff0c;大家好&#xff0c;我是GISer Liu&#x1f601;&#xff0c;一名热爱AI技术的GIS开发者。本系列是作者参加DataWhale 2025年6月份Yolo原理组队学习的技术笔记文档&#xff0c;这里整理为博客&#xff0c;希望能帮助Yolo的开发者少走弯路&#xff01; &am…

Swift 图论实战:DFS 算法解锁 LeetCode 323 连通分量个数

文章目录摘要描述示例题解答案DFS 遍历每个连通区域Union-Find&#xff08;并查集&#xff09;题解代码分析&#xff08;Swift 实现&#xff1a;DFS&#xff09;题解代码详解构建邻接表DFS 深度优先搜索遍历所有节点示例测试及结果示例 1示例 2示例 3时间复杂度分析空间复杂度分…

【剑指offer】栈 队列

&#x1f4c1; JZ9 用两个栈实现队列一个栈in用作进元素&#xff0c;一个栈out用于出元素。当栈out没有元素时&#xff0c;从in栈获取数据&#xff0c;根据栈的特性&#xff0c;栈out的top元素一定是先进入的元素&#xff0c;因此当栈out使用pop操作时&#xff0c;一定时满足队…

GoView 低代码数据可视化

纯前端 分支&#xff1a; master &#x1f47b; 携带 后端 请求分支: master-fetch &#x1f4da; GoView 文档 地址&#xff1a;https://www.mtruning.club/ 项目纯前端-Demo 地址&#xff1a;https://vue.mtruning.club/ 项目带后端-Demo 地址&#xff1a;https://demo.mtrun…

Spring Boot返回前端Long型丢失精度 后两位 变成00

文章目录一、前言二、问题描述2.1、问题背景2.2、问题示例三、解决方法3.1、将ID转换为字符串3.2、使用JsonSerialize注解3.3、使用JsonFormat注解一、前言 在后端开发中&#xff0c;我们经常会遇到需要将ID作为标识符传递给前端的情况。当ID为long类型时&#xff0c;如果该ID…

计算机网络实验——无线局域网安全实验

实验1. WEP和WPA2-PSK实验一、实验目的验证AP和终端与实现WEP安全机制相关的参数的配置过程。验证AP和终端与实现WPA2-PSK安全机制相关的参数的配置过程。验证终端与AP之间建立关联的过程。验证关闭端口的重新开启过程。验证属于不同BSS的终端之间的数据传输过程。二、实验任务…