使用 Go 语言实现完整且轻量级高性能的 MQTT Broker

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议。但是目前虽然mqtt的客户端很多,但是服务端着实不多,常见的服务端如mosquitto或emqx。但是golang语言的实现几乎找不到。golang的轻量级部署和高并发高性能,很合适做mqtt Broker。本文将详细介绍如何使用 Go 语言实现一个简单轻量级且高性能的 MQTT Broker,并涵盖MQTT3.1.1协议的核心特性和完整功能。

1. 需求分析

本文选择golang语言实现一个完整的 MQTT 3.1.1 Broker,不涉及集群支持和协议版本检测。简单且轻量级,不但可以替代mosquitto,后续还可以灵活的做扩展,如增加webUI的管理界面。且部署也很简单,一个exe可执行文件。

完整项目开源地址:https://github.com/yangyongzhen/goang-mqtt-broker

gitee: https://gitee.com/yyz116/goang-mqtt-broker

可执行文件在release目录下。

1.1 实现效果截图

服务启动:
在这里插入图片描述
客户端发布:
在这里插入图片描述
客户端订阅:
在这里插入图片描述
使用mosquitto客户端测试效果:

在这里插入图片描述
在这里插入图片描述
优化增加基于redis的持久化存储:
可在etc/config.yaml文件中配置是否启用redis的持久化。默认基于内存。
在这里插入图片描述
windows下的可执行文件仅有7M左右大小,简单小巧。且代码开源方便定制。可以作为替代mosquitto的另外一种选择。
在这里插入图片描述

1.2 功能特性
1.2.1核心功能
  • 完整的 MQTT 3.1.1 协议支持
  • QoS 0, 1, 2 消息传递保证
  • 会话管理(持久会话和清理会话)
  • 保留消息(Retained Messages)
  • 遗嘱消息(Last Will and Testament)
  • 主题通配符(+ 和 # 通配符支持)
  • 客户端认证(用户名/密码)
  • 保活机制(Keep Alive)
  • 并发安全
1.2.2 架构特性
  • 🏗️ 模块化设计,易于扩展
  • 🔌 可插拔存储接口
  • 🔒 线程安全的并发处理
  • 📊 内置监控指标
  • 🐳 Docker 支持

2. 项目架构设计

架构设计

数据流

客户端连接 → TCP Server 接受连接
协议解析 → Client 解析 MQTT 数据包
认证验证 → Auth 模块验证用户凭据
会话管理 → Storage 加载/保存会话信息
消息路由 → Broker 根据订阅关系路由消息
主题匹配 → Topic Manager 处理通配符匹配

2.1 目录结构
mqtt-broker/
├── README.md
├── Makefile
├── Dockerfile
├── go.mod
├── go.sum
├── cmd/
│   ├── broker/
│   │   └── main.go
│   └── test-client/
│       └── main.go
├── internal/
│   ├── auth/
│   │   └── auth.go
│   ├── broker/
│   │   ├── broker.go
│   │   ├── client.go
│   │   └── topic.go
│   ├── protocol/
│   │   ├── common/
│   │   │   └── types.go
│   │   └── mqtt311/
│   │       └── packet.go
│   └── storage/
│       ├── interface.go
│       └── memory/
│           └── store.go
└── pkg/└── mqtt/└── packet.go
2.2 主要模块
  • cmd/broker/main.go:程序入口。
  • internal/broker/:Broker 核心逻辑,包括连接管理、消息路由等。
  • internal/storage/:存储接口和内存实现。
  • pkg/mqtt/packet.go:MQTT 数据包编码和解码。

3. 核心实现

3.1 存储接口

internal/storage/interface.go 文件中定义存储接口:

package storageimport ("github.com/yangyongzhen/mqtt-broker/internal/protocol/common"
)type Store interface {SaveSession(clientID string, session *Session) errorLoadSession(clientID string) (*Session, error)DeleteSession(clientID string) errorSaveMessage(clientID string, message *common.Message) errorLoadMessages(clientID string) ([]*common.Message, error)DeleteMessage(clientID string, packetID uint16) errorSaveRetainedMessage(topic string, message *common.Message) errorLoadRetainedMessage(topic string) (*common.Message, error)DeleteRetainedMessage(topic string) errorSaveSubscription(clientID string, subscription *common.Subscription) errorLoadSubscriptions(clientID string) ([]*common.Subscription, error)DeleteSubscription(clientID string, topic string) error
}type Session struct {ClientID      stringCleanSession  boolSubscriptions map[string]*common.SubscriptionPendingAcks   map[uint16]*common.MessageLastSeen      time.Time
}
3.2 内存存储实现

internal/storage/memory/store.go 文件中实现内存存储:

package memoryimport ("sync""github.com/yangyongzhen/mqtt-broker/internal/storage""github.com/yangyongzhen/mqtt-broker/internal/protocol/common"
)type MemoryStore struct {sessions        map[string]*storage.SessionretainedMsgs    map[string]*common.MessageclientMessages  map[string][]*common.Messagemu              sync.RWMutex
}func NewMemoryStore() *MemoryStore {return &MemoryStore{sessions:       make(map[string]*storage.Session),retainedMsgs:   make(map[string]*common.Message),clientMessages: make(map[string][]*common.Message),}
}func (m *MemoryStore) SaveSession(clientID string, session *storage.Session) error {m.mu.Lock()defer m.mu.Unlock()m.sessions[clientID] = sessionreturn nil
}func (m *MemoryStore) LoadSession(clientID string) (*storage.Session, error) {m.mu.RLock()defer m.mu.RUnlock()session, exists := m.sessions[clientID]if !exists {return nil, nil}return session, nil
}// 其他方法省略...
3.3 客户端连接管理

internal/broker/client.go 文件中实现客户端连接管理:

package brokerimport ("bufio""fmt""net""sync""time""github.com/yangyongzhen/mqtt-broker/internal/protocol/common""github.com/yangyongzhen/mqtt-broker/internal/protocol/mqtt311""github.com/yangyongzhen/mqtt-broker/internal/storage""github.com/yangyongzhen/mqtt-broker/pkg/mqtt"
)type Client struct {conn           net.ConnclientID       stringinfo           *common.ClientInfosession        *storage.Sessionbroker         *BrokerpacketReader   *mqtt.PacketReaderwriteChan      chan []bytecloseChan      chan struct{}keepAliveTimer *time.Timermu             sync.RWMutexconnected      boolnextPacketID   uint16pendingAcks    map[uint16]*PendingMessage
}type PendingMessage struct {Message   *common.MessageTimestamp time.TimeRetries   int
}func NewClient(conn net.Conn, broker *Broker) *Client {return &Client{conn:         conn,broker:       broker,packetReader: mqtt.NewPacketReader(conn),writeChan:    make(chan []byte, 1000),closeChan:    make(chan struct{}),pendingAcks:  make(map[uint16]*PendingMessage),nextPacketID: 1,}
}func (c *Client) Start() {go c.readLoop()go c.writeLoop()go c.retryLoop()
}func (c *Client) readLoop() {defer c.Close()for {select {case <-c.closeChan:returndefault:packet, err := c.packetReader.ReadPacket()if err != nil {fmt.Printf("Read packet error: %v\n", err)return}c.handlePacket(packet)}}
}func (c *Client) writeLoop() {defer c.Close()for {select {case data := <-c.writeChan:if _, err := c.conn.Write(data); err != nil {fmt.Printf("Write error: %v\n", err)return}case <-c.closeChan:return}}
}func (c *Client) retryLoop() {ticker := time.NewTicker(5 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:c.retryPendingMessages()case <-c.closeChan:return}}
}func (c *Client) handlePacket(packet common.Packet) {switch p := packet.(type) {case *mqtt311.ConnectPacket:c.handleConnect(p)case *mqtt311.PublishPacket:c.handlePublish(p)case *mqtt311.SubscribePacket:c.handleSubscribe(p)case *mqtt311.UnsubscribePacket:c.handleUnsubscribe(p)case *mqtt311.PingreqPacket:c.handlePingReq()case *mqtt311.DisconnectPacket:c.handleDisconnect()}
}// handleConnect, handlePublish 等其他方法省略...
3.4 主 Broker 实现

internal/broker/broker.go 文件中实现主 Broker 的逻辑:

package brokerimport ("fmt""net""sync""time""github.com/yangyongzhen/mqtt-broker/internal/auth""github.com/yangyongzhen/mqtt-broker/internal/protocol/common""github.com/yangyongzhen/mqtt-broker/internal/storage"
)type Broker struct {listener      net.Listenerclients       map[string]*ClienttopicManager  *TopicManagerstore         storage.Storeauth          auth.Authenticatormu            sync.RWMutexrunning       boolconfig        *Config
}type Config struct {MaxConnections   intMaxMessageSize   intRetainedMsgLimit intSessionExpiry    time.DurationMessageExpiry    time.Duration
}func NewBroker(store storage.Store, authenticator auth.Authenticator) *Broker {return &Broker{clients:      make(map[string]*Client),topicManager: NewTopicManager(),store:        store,auth:         authenticator,config: &Config{MaxConnections:   10000,MaxMessageSize:   1024 * 1024,RetainedMsgLimit: 10000,SessionExpiry:    24 * time.Hour,MessageExpiry:    24 * time.Hour,},}
}func (b *Broker) Start(address string) error {listener, err := net.Listen("tcp", address)if err != nil {return err}b.listener = listenerb.running = truefmt.Printf("MQTT Broker started on %s\n", address)for b.running {conn, err := listener.Accept()if err != nil {if b.running {fmt.Printf("Accept error: %v\n", err)}continue}client := NewClient(conn, b)go client.Start()}return nil
}func (b *Broker) Stop() {b.running = falseif b.listener != nil {b.listener.Close()}b.mu.Lock()defer b.mu.Unlock()for _, client := range b.clients {client.Close()}
}// AddClient, RemoveClient, PublishMessage 等其他方法省略...
3.5 主程序入口

cmd/broker/main.go 文件中定义主程序入口:

package mainimport ("flag""fmt""log""os""os/signal""syscall""github.com/yangyongzhen/mqtt-broker/internal/auth""github.com/yangyongzhen/mqtt-broker/internal/broker""github.com/yangyongzhen/mqtt-broker/internal/storage/memory"
)func main() {addr := flag.String("addr", ":1883", "MQTT broker address")flag.Parse()authenticator := auth.NewSimpleAuthenticator() // 示例认证器,需要自行实现store := memory.NewMemoryStore()b := broker.NewBroker(store, authenticator)go func() {if err := b.Start(*addr); err != nil {log.Fatalf("Failed to start MQTT broker: %v", err)}}()sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)<-sigChanb.Stop()fmt.Println("MQTT broker stopped")
}

以上代码是实现一个简单的 MQTT Broker 的基础框架,更多详细功能和性能优化可以根据实际需求进行扩展和改进。

安装和运行

  1. 克隆项目
git clone <your-repo-url>
cd mqtt-brokergo mod tidy

安装依赖

go mod tidy

构建项目

make build
或者
go build -o bin/mqtt-broker cmd/broker/main.go
运行 Broker
make run
或者
./bin/mqtt-broker -addr=:1883 -debug

使用 Docker

构建镜像
docker build -t mqtt-broker .
#### 运行容器
docker run -p 1883:1883 mqtt-broker
#### 使用示例**启动 Broker**
#### 默认端口 1883
go run cmd/broker/main.go
##### 自定义端口和调试模式
go run cmd/broker/main.go -addr=:1883 -debug

测试客户端

项目包含一个简单的测试客户端,可以用来测试 broker 功能:

订阅消息:

go run cmd/test-client/main.go -mode=sub -topic=test/hello -client=subscriber1

发布消息:

go run cmd/test-client/main.go -mode=pub -topic=test/hello -msg="Hello MQTT!" -client=publisher1

使用第三方客户端

你也可以使用任何标准的 MQTT 客户端连接到 broker:

使用 mosquitto 客户端:

订阅
mosquitto_sub -h localhost -p 1883 -t "test/topic"
发布
mosquitto_pub -h localhost -p 1883 -t "test/topic" -m "Hello World"

使用认证:

默认用户:

admin/password, test/test123
mosquitto_pub -h localhost -p 1883 -u admin -P password -t “test/topic” -m “Authenticated message”

配置说明

命令行参数
参数 默认值 说明
-addr :1883 Broker 监听地址
-debug false 启用调试日志

内置用户

Broker 默认创建了以下测试用户:

用户名 密码
admin password
test test123

项目开源地址:

https://github.com/yangyongzhen/goang-mqtt-broker

gitee: https://gitee.com/yyz116/goang-mqtt-broker

作者

作者csdn猫哥,转载请注明出处: https://blog.csdn.net/yyz_1987

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/news/906758.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uv sync --frozen卡住不动

今天受邀帮同事调试uv卡住不动的问题&#xff0c;同样的代码已经在别的服务器跑起来了&#xff0c;换了一台服务器之后&#xff0c;执行uv sync --frozen没有按预期创建虚拟环境和安装依赖。 1. 镜像源是已经配置好的&#xff0c;pip install也能很快安装包。 2. 查看了uv.lo…

Spring Boot中如何对密码等敏感信息进行脱敏处理

以下是常见的脱敏方法及实现步骤&#xff0c;涵盖配置、日志和API响应等多个层面&#xff1a; ​1. 配置文件敏感信息脱敏​ (1) 使用加密库&#xff08;如Jasypt&#xff09; ​步骤​&#xff1a; 添加依赖&#xff1a; <dependency><groupId>com.github.ulise…

springboot中redis的事务的研究

redis的事务类似于队列操作&#xff0c;执行过程分为三步&#xff1a; 开启事务入队操作执行事务 使用到的几个命令如下&#xff1a; 命令说明multi开启一个事务exec事务提交discard事务回滚watch监听key(s)&#xff1a;当监听一个key(s)时&#xff0c;如果在本次事务提交之…

python打卡day35@浙大疏锦行

知识点回顾&#xff1a; 三种不同的模型可视化方法&#xff1a;推荐torchinfo打印summary权重分布可视化进度条功能&#xff1a;手动和自动写法&#xff0c;让打印结果更加美观推理的写法&#xff1a;评估模式 作业&#xff1a;调整模型定义时的超参数&#xff0c;对比下效果。…

Python爬虫实战:研究Crawley 框架相关技术

1. Crawley 框架相关定义 1.1 网络爬虫定义 网络爬虫是一种按照一定的规则,自动地抓取万维网信息的程序或者脚本。它通过 HTTP 协议与 Web 服务器进行交互,获取网页内容并进行解析处理,是数据采集和信息检索的重要工具。 1.2 Crawley 框架定义 Crawley 是一个基于 Pytho…

tvalid寄存器的理解

if(!out_axis_tvalid_reg || m_axis_tready ) beginend m_axis_tready 是上拍下一级给的ready信号 out_axis_tvalid_reg是上一拍&#xff0c;本级给下级的valid信号 一共有四种组合&#xff0c;然后可以通过这个if语句&#xff0c;在接下来的begin ... end中&#xff0c;用来…

【AI实战】从“苦AI”到“爽AI”:Magentic-UI 把“人类-多智能体协作”玩明白了!

Hello&#xff0c;亲爱的小伙伴们&#xff01;你是否曾经在深夜里&#xff0c;为了自动化点外卖、筛机票、抓网页数据焦头烂额&#xff1f;有没有幻想过哪天能出个“贴心AI管家”&#xff0c;一键点菜、搞定事务、自动操作网页&#xff0c;比你还懂你&#xff1f;更关键——还让…

【东枫科技】usrp rfnoc 开发环境搭建

作者 太原市东枫电子科技有限公司 &#xff0c;代理销售 USRP&#xff0c;Nvidia&#xff0c;等产品与技术支持&#xff0c;培训服务。 环境 Ubuntu 20.04 依赖包 sudo apt-get updatesudo apt-get install autoconf automake build-essential ccache cmake cpufrequtils …

Ntfs!ReadIndexBuffer函数分析之根目录读取索引缓冲区的一个例子

Ntfs!ReadIndexBuffer函数分析之根目录读取索引缓冲区的一个例子 第一部分&#xff1a; 0: kd> p Ntfs!ReadIndexBuffer0xdc: f7173962 e829f60300 call Ntfs!NtfsCheckIndexBuffer (f71b2f90) 0: kd> t Ntfs!NtfsCheckIndexBuffer: f71b2f90 55 p…

LumaDot (亮度可调的屏幕圆点)

应用名称 LumaDot &#xff08;源自 “Luminance”&#xff08;亮度&#xff09; “Dot”&#xff08;圆点&#xff09;&#xff0c;强调其核心功能&#xff1a;亮度可调的屏幕圆点&#xff09; 应用说明 LumaDot 是一款轻量级 Windows 桌面工具&#xff0c;专为需要屏幕标记…

HarmonyOS 鸿蒙应用开发基础:EventHub,优雅解决跨组件通信难题

EventHub是鸿蒙开发中用于线程内通信的事件中心模块&#xff0c;基于发布订阅模式实现组件间的高效通信。它完美解决了传统回调方式在多层嵌套场景下的痛点&#xff0c;使得组件间的通信更加灵活和易于管理。 核心特性 事件中心机制&#xff1a;通过事件名进行通信&#xff0c…

前端框架token相关bug,前后端本地联调

今天我搭建框架的时候&#xff0c;我想请求我自己的本地&#xff01;然后我自己想链接我自己的本地后端&#xff0c;我之前用的前端项目&#xff0c;都是链别人的后端&#xff0c;基本上很少情况会链接自己的后端&#xff01;所以我当时想的是&#xff0c;我前后端接口一样&…

【数据结构初阶】顺序表专题

文章目录 顺序表1.数据结构相关概念1、什么是数据结构2、为什么需要数据结构&#xff1f; 2.顺序表1、顺序表的概念及结构2、顺序表分类3、动态顺序表的实现1.定义一个动态顺序表2.顺序表的初始化3.顺序表的销毁4.顺序表达的尾插5.顺序表的头插6.空间大小检查函数7.顺序表的尾删…

从神经生物学到社会心理学:游戏沉迷机制的深度解构

你是否曾在深夜放下手机时惊觉&#xff1a;"明明只想玩10分钟&#xff0c;怎么天都亮了&#xff1f;"这不是意志力薄弱的表现&#xff0c;而是价值数十亿美元的游戏产业用神经科学精心设计的认知陷阱。 当《王者荣耀》的Victory音效让你心跳加速&#xff0c;当《原神…

15.集合框架的学习

一、简介 集合框架&#xff08;Collection Framework&#xff09; 是 Java 提供的一套用于存储、操作和处理数据集合的标准化架构。它主要位于 java.util 包中&#xff0c;提供了一组 接口 和 实现类&#xff0c;用于操作不同类型的数据集合&#xff0c;如列表&#xff08;List…

【方案分享】展厅智能讲解:基于BLE蓝牙Beacon的自动讲解触发技术实现

【方案分享】展厅智能讲解&#xff1a;基于BLE蓝牙Beacon的自动讲解触发技术实现 让观众靠近展品即可自动弹出讲解页面&#xff0c;是智能展厅的核心功能之一。本文将从软硬件技术、BLE Beacon原理、微信小程序实现、优劣对比与拓展方案五个维度&#xff0c;系统讲解“靠近展台…

微前端架构:从单体到模块化的前端新革命

在信息技术&#xff08;IT&#xff09;的迅猛发展中&#xff0c;前端开发领域正迎来一场颠覆性的变革 —— 微前端架构&#xff08;Micro - Frontends&#xff09;。2025 年&#xff0c;随着 Web 应用的复杂性激增、团队协作需求的增长以及用户对无缝体验的期待&#xff0c;微前…

React中常用的钩子函数:

一. 基础钩子 (1)useState 用于在函数组件中添加局部状态。useState可以传递一个参数&#xff0c;做为状态的初始值&#xff0c;返回一个数组&#xff0c;数组的第一个元素是返回的状态变量&#xff0c;第二个是修改状态变量的函数。 const [state, setState] useState(ini…

如何在 Windows 11 或 10 上通过 PowerShell 安装 Docker Desktop

了解如何使用 PowerShell 或命令提示符在 Windows 11 或 10 上安装 Docker CLI 和 Docker Desktop GUI,以创建容器运行虚拟机。无需手动访问网站下载安装程序,所有操作都将在命令终端完成。 Docker 是一个强大的容器化平台,允许开发人员将应用程序及其依赖项打包为轻量级容…

Python实例题:人机对战初体验Python基于Pygame实现四子棋游戏

目录 Python实例题 题目 代码实现 实现原理 游戏逻辑&#xff1a; AI 算法&#xff1a; 界面渲染&#xff1a; 关键代码解析 游戏棋盘渲染 AI 决策算法 胜利条件检查 使用说明 安装依赖&#xff1a; 运行游戏&#xff1a; 游戏操作&#xff1a; 扩展建议 增强…