【搭建Node-RED + MQTT Broker实现AI大模型交互】

搭建Node-RED + MQTT Broker实现AI大模型交互

  • 搭建Node-RED + MQTT Broker实现AI大模型交互
    • 一、系统架构
    • 二、环境准备与安装
      • 1. 安装Node.js
      • 2. 安装Mosquitto MQTT Broker
      • 3. 配置Mosquitto
      • 4. 安装Node-RED
      • 5. 配置Node-RED监听所有网络接口
      • 6. 启动Node-RED
    • 三、Node-RED流程配置
      • 1. 创建新流程
      • 2. 添加并配置MQTT In节点
      • 3. 添加并配置处理数据Function节点
      • 4. 添加并配置HTTP Request节点
      • 5. 添加并配置处理响应Function节点
      • 6. 添加并配置MQTT Out节点
      • 7. 添加错误处理
      • 8. 连接节点
      • 9. 部署流程
    • 四、测试系统
      • 1. 创建测试脚本
      • 2. 安装MQTT客户端库
      • 3. 运行测试
    • 五、在其他Linux客户端使用MQTT与系统交互
      • 1. 安装MQTT客户端工具
      • 2. 订阅消息(接收响应)
      • 3. 发送消息(请求AI处理)
      • 4. 使用Python进行交互(可选)
    • 六、系统安全与扩展
      • 安全配置
      • 系统扩展
    • 七、故障排除
    • 八、总结

搭建Node-RED + MQTT Broker实现AI大模型交互

本文档详细记录了使用Node-RED和MQTT Broker构建一个可与DeepSeek AI大模型交互的物联网平台的完整过程。

一、系统架构

[接收设备消息] --> [处理数据] --> [调用DeepSeek API] --> [处理API响应] --> [MQTT Out]
| ^
| |
v |
[错误消息] --> [错误消息接收] --> [MQTT err]

系统主要组件:

  1. 设备/客户端:向MQTT Broker发送请求消息
  2. MQTT Broker:消息代理,处理发布/订阅
  3. Node-RED:流程编排引擎,处理逻辑和API交互
  4. DeepSeek API:AI大模型服务

数据流向:

  1. 设备发布消息到device/data主题
  2. Node-RED订阅并处理消息
  3. Node-RED调用DeepSeek API
  4. Node-RED将响应发布到device/response主题
  5. 设备接收响应

二、环境准备与安装

1. 安装Node.js

首先确保安装了Node.js v18或更高版本:

curl -fsSL https://deb.nodesource.com/setup_18.x | bash -
apt-get install -y nodejs

验证安装:

node -v

2. 安装Mosquitto MQTT Broker

apt-get update
apt-get install -y mosquitto mosquitto-clients

3. 配置Mosquitto

创建配置文件,允许匿名连接:

cat > /etc/mosquitto/conf.d/default.conf << EOF
listener 1883
allow_anonymous true
EOF

重启Mosquitto服务:

systemctl restart mosquitto

4. 安装Node-RED

npm install -g --unsafe-perm node-red

5. 配置Node-RED监听所有网络接口

修改配置文件:

cat > /root/.node-red/settings.js << EOF
module.exports = {uiPort: process.env.PORT || 1880,uiHost: "0.0.0.0",
}
EOF

6. 启动Node-RED

nohup node-red > node-red.log 2>&1 &

三、Node-RED流程配置

访问Node-RED界面:http://服务器IP:1880/

1. 创建新流程

  1. 点击"+"按钮创建新的流程
  2. 将流程命名为"DeepSeek AI交互"

2. 添加并配置MQTT In节点

  1. 从节点面板中拖动"mqtt in"节点到工作区
  2. 双击节点进行配置:
    • 服务器:点击编辑按钮添加新的MQTT Broker
      • 名称:本地MQTT Broker
      • 服务器:localhost
      • 端口:1883
    • 主题:device/data
    • QoS:2
    • 输出:自动检测(JSON对象、字符串或buffer)
    • 名称:接收设备消息

3. 添加并配置处理数据Function节点

  1. 从节点面板中拖动"function"节点到工作区
  2. 双击节点进行配置:
    • 名称:处理数据
    • 函数代码:
// 处理接收到的设备数据
const deviceData = msg.payload;// 构建发送给DeepSeek API的请求
msg.payload = {"model": "deepseek-chat","messages": [{"role": "system", "content": "你是一个助手。"},{"role": "user", "content": deviceData.message}]
};// 设置请求头
msg.headers = {"Content-Type": "application/json"
};// 设置超时时间为60秒
msg.requestTimeout = 60000;return msg;

4. 添加并配置HTTP Request节点

  1. 从节点面板中拖动"http request"节点到工作区
  2. 双击节点进行配置:
    • 名称:调用DeepSeek API
    • 方法:POST
    • URL:https://api.deepseek.com/chat/completions
    • 返回:解析为JSON对象
    • 在认证选项卡中:
      • 使用:Bearer Authentication
      • Token:您的DeepSeek API Token (例如: sk-b30b58c4056e4149872d87eb9228ed54)
    • 添加请求头:
      • Content-Type: application/json

5. 添加并配置处理响应Function节点

  1. 从节点面板中拖动"function"节点到工作区
  2. 双击节点进行配置:
    • 名称:处理API响应
    • 函数代码:
// 处理DeepSeek API的响应
const response = msg.payload;// 提取AI回复内容
let aiResponse = "";
if (response && response.choices && response.choices.length > 0) {aiResponse = response.choices[0].message.content;
} else {aiResponse = "无法获取有效回复";node.warn("API响应格式不符合预期: " + JSON.stringify(response));
}// 构建回复消息
msg.payload = {"status": "success","response": aiResponse,"timestamp": new Date().toISOString()
};return msg;

6. 添加并配置MQTT Out节点

  1. 从节点面板中拖动"mqtt out"节点到工作区
  2. 双击节点进行配置:
    • 服务器:选择之前创建的本地MQTT Broker
    • 主题:device/response
    • QoS:1
    • 保留:否
    • 名称:MQTT Out

7. 添加错误处理

  1. 从节点面板中拖动"catch"节点到工作区

  2. 双击节点进行配置:

    • 名称:错误消息
  3. 添加处理错误的Function节点:

    • 名称:错误消息接收
    • 函数代码:
// 记录错误
node.error("处理错误: " + JSON.stringify(msg.error));// 构建错误响应
msg.payload = {"status": "error","message": msg.error ? (msg.error.message || "未知错误") : "处理请求时发生错误","code": msg.statusCode || 500,"timestamp": new Date().toISOString()
};// 设置主题(确保错误消息发送到正确的主题)
msg.topic = "device/error";return msg;
  1. 添加用于错误的MQTT Out节点:
    • 服务器:选择之前创建的本地MQTT Broker
    • 主题:device/error
    • QoS:1
    • 保留:否
    • 名称:MQTT err

8. 连接节点

按照以下顺序连接节点:

  1. 接收设备消息 → 处理数据
  2. 处理数据 → 调用DeepSeek API
  3. 调用DeepSeek API → 处理API响应
  4. 处理API响应 → MQTT Out
  5. 错误消息 → 错误消息接收
  6. 错误消息接收 → MQTT err

9. 部署流程

点击右上角的"部署"按钮使配置生效。

四、测试系统

1. 创建测试脚本

创建一个简单的Node.js脚本来测试系统:

cat > /root/test-mqtt.js << EOF
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883');client.on('connect', function () {console.log('已连接到MQTT Broker');client.subscribe('device/response');client.subscribe('device/error');const testMessage = { deviceId: 'test-001', message: '介绍一下Node-RED的基本功能', timestamp: new Date().toISOString() };console.log('发送测试消息:', testMessage);client.publish('device/data', JSON.stringify(testMessage));
});client.on('message', function (topic, message) {console.log('收到消息,主题:', topic);try { console.log(JSON.parse(message.toString())); } catch(e) { console.log(message.toString()); }
});setTimeout(function() { client.end(); console.log('测试完成,已断开连接'); 
}, 120000);
EOF

2. 安装MQTT客户端库

npm install mqtt

3. 运行测试

node /root/test-mqtt.js

输出结果示例:

已连接到MQTT Broker
发送测试消息: { deviceId: 'test-001',message: '介绍一下Node-RED的基本功能',timestamp: '2025-05-15T06:51:24.069Z' }
收到消息,主题: device/response
{status: 'success',response: 'Node-RED 是一个基于 Node.js 开发的低代码/可视化编程工具,主要用于连接硬件设备、API 和在线服务,构建物联网(IoT)应用或自动化工作流。其核心特点是通过拖放节点(Nodes)和连线(Flows)快速实现数据流处理,无需深入编码。以下是它的基本功能:\n\n1. 可视化流程编排\n   - 节点(Nodes):预置了大量功能模块\n   - 连线(Flows):用连线将节点按逻辑顺序连接\n\n2. 丰富的节点类型\n   - 输入节点:如 HTTP 请求、MQTT 订阅等\n   - 处理节点:函数、延迟、切换等\n   - 输出节点:数据库、API 调用、邮件通知等\n\n3. 易于集成和扩展\n   - 支持各种协议和服务的集成\n   - 可通过npm安装扩展节点',timestamp: '2025-05-15T06:51:42.361Z'
}
测试完成,已断开连接

五、在其他Linux客户端使用MQTT与系统交互

1. 安装MQTT客户端工具

# Debian/Ubuntu系统
sudo apt-get install mosquitto-clients# RHEL/CentOS系统
sudo yum install mosquitto-clients# Arch系统
sudo pacman -S mosquitto

2. 订阅消息(接收响应)

# 订阅响应主题
mosquitto_sub -h 服务器IP -t "device/response" -v# 订阅错误主题
mosquitto_sub -h 服务器IP -t "device/error" -v

3. 发送消息(请求AI处理)

# 发送消息
mosquitto_pub -h 服务器IP -t "device/data" -m '{"deviceId":"linux-001","message":"什么是物联网?","timestamp":"'$(date -Iseconds)'"}'

4. 使用Python进行交互(可选)

import paho.mqtt.client as mqtt
import json
import time
from datetime import datetime# MQTT服务器信息
broker_address = "服务器IP"
port = 1883
pub_topic = "device/data"
sub_topics = ["device/response", "device/error"]# 回调函数 - 连接成功
def on_connect(client, userdata, flags, rc):print("已连接到MQTT Broker")# 订阅主题for topic in sub_topics:client.subscribe(topic)print(f"已订阅主题: {topic}")# 回调函数 - 接收消息
def on_message(client, userdata, msg):print(f"\n收到消息 主题: {msg.topic}")try:payload = json.loads(msg.payload.decode())print(json.dumps(payload, indent=2, ensure_ascii=False))except:print(msg.payload.decode())# 创建客户端
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message# 连接到Broker
client.connect(broker_address, port, 60)# 启动网络循环
client.loop_start()# 发送测试消息
def send_message(message_text):msg = {"deviceId": "python-device-001","message": message_text,"timestamp": datetime.now().isoformat()}print(f"发送消息: {json.dumps(msg, ensure_ascii=False)}")client.publish(pub_topic, json.dumps(msg))# 等待连接建立
time.sleep(1)# 示例查询
send_message("请解释什么是物联网?")# 保持脚本运行以接收响应
try:while True:user_input = input("\n输入问题(输入'exit'退出): ")if user_input.lower() == 'exit':breaksend_message(user_input)time.sleep(1)
except KeyboardInterrupt:print("程序被用户中断")# 断开连接
client.loop_stop()
client.disconnect()

六、系统安全与扩展

安全配置

  1. MQTT安全加强

    • 添加用户名密码认证:修改/etc/mosquitto/conf.d/default.conf
    • 启用TLS加密:配置证书
  2. Node-RED安全加强

    • 添加登录认证:修改settings.js
    • 部署HTTPS:配置证书

系统扩展

  1. 添加Dashboard

    • 安装Node-RED Dashboard节点
    • 创建可视化界面监控系统
  2. 添加数据存储

    • 连接数据库(MySQL/MongoDB)存储交互历史
  3. 支持多种AI模型

    • 扩展Function节点支持其他AI模型API
  4. 设备认证与管理

    • 开发设备注册和认证机制

七、故障排除

  1. 无法连接MQTT Broker

    • 检查防火墙是否开放1883端口
    • 检查Mosquitto服务状态
  2. 无法访问Node-RED

    • 检查防火墙是否开放1880端口
    • 检查Node-RED进程是否运行
  3. API调用失败

    • 检查API Token是否正确
    • 检查网络连接和API地址
    • 检查HTTP Request节点配置
    • 增加请求超时时间
  4. 消息格式错误

    • 确保发送JSON格式消息
    • 确保包含message字段

八、总结

通过本文档,我们就完成了一个基于Node-RED和MQTT Broker的AI交互系统搭建,实现了:

  1. 接收来自设备的MQTT消息
  2. 处理消息并调用DeepSeek AI API
  3. 将AI响应通过MQTT返回给设备
  4. 错误处理和日志记录

这个系统可以作为物联网设备与AI大模型交互的基础平台,可根据实际需求进行扩展和优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/80475.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法第21天 | 第77题. 组合、216. 组合总和 III、17. 电话号码的字母组合

回溯基础概念 什么是回溯&#xff1f; 如何实现回溯&#xff1f; 第77题. 组合 题目 思路与解法 carl的讲解&#xff1a; 回溯搜索法 class Solution:def combine(self, n: int, k: int) -> List[List[int]]:self.path []self.res []self.backtracking(n, k, 1)retu…

嵌入式硬件篇---拓展板

文章目录 前言 前言 本文简单介绍了拓展板的原理以及使用。

【深度学习基础】从感知机到多层神经网络:模型原理、结构与计算过程全解析

【深度学习基础】从感知机到多层神经网络&#xff1a;模型原理、结构与计算过程全解析 1. 引言 神经网络的重要性&#xff1a; 作为人工智能的核心技术之一&#xff0c;神经网络通过模拟人脑神经元的工作机制&#xff0c;成为解决复杂模式识别、预测和决策任务的利器。从图像分…

sparkSQL读入csv文件写入mysql(2)

&#xff08;二&#xff09;创建数据库和表 接下来&#xff0c;我们去创建一个新的数据库&#xff0c;数据表&#xff0c;并插入一条数据。 -- 创建数据库 CREATE DATABASE spark; -- 使用数据库 USE spark;-- 创建表 create table person(id int, name char(20), age int);-- …

JVM如何处理多线程内存抢占问题

目录 1、堆内存结构 2、运行时数据 3、内存分配机制 3.1、堆内存结构 3.2、内存分配方式 1、指针碰撞 2、空闲列表 4、jvm内存抢占方案 4.1、TLAB 4.2、CAS 4.3、锁优化 4.4、逃逸分析与栈上分配 5、问题 5.1、内存分配竞争导致性能下降 5.2、伪共享&#xff08…

Ubuntu---omg又出bug了

自用遇到问题的合集 250518——桌面文件突然消失 ANS&#xff1a;参考博文

正则表达式与文本处理的艺术

引言 在前端开发领域&#xff0c;文本处理是一项核心技能。正则表达式作为一种强大的模式匹配工具&#xff0c;能够帮助我们高效地处理各种复杂的文本操作任务。 正则表达式基础 什么是正则表达式&#xff1f; 正则表达式是一种用于匹配字符串中字符组合的模式。它由一系列…

初学c语言15(字符和字符串函数)

一.字符串分类函数 头文件&#xff1a;ctype.h 作用&#xff1a;判断是什么类型的字符 函数举例&#xff1a; 函数 符合条件就为真 islower判断是否为小写字符&#xff08;a~z&#xff09;isupper判断是否为大写字符&#xff08;A~Z&#xff09;isdigit十进制数字&#xf…

12-串口外设

一、串口外设的基本概述 1、基本定义 串口通信&#xff0c;通过在通信双方之间以比特位&#xff08;bit&#xff09;的形式逐一发送或接收数据&#xff0c;实现了信息的有效传递。其通信方式不仅简单可靠&#xff0c;而且成本很低。 2、stm32的串口 下面是两个MCU的数据交互&…

NE555双音门铃实验

1脚为地。通常被连接到电路共同接地。 2脚为触发输入端。 3脚为输出端&#xff0c;输出的电平状态受触发器的控制&#xff0c;而触发器受上比较器6脚和下比较器2脚的控制。当触发器接受上比较器A1从R脚输入的高电平时&#xff0c;触发器被置于复位状态&#xff0c;3脚输出低电…

Redis分布式锁实现

概述 为什么要要分布式锁 在并发编程中&#xff0c;我们通过锁&#xff0c;来避免由于竞争而造成的数据不一致问题。 通常&#xff0c;我们以synchronized 、Lock来使用它。Java中的锁&#xff0c;只能保证在同一个JVM进程内中执行 如果需要在分布式集群环境下的话&#xff0…

软件设计师-错题笔记-网络基础知识

1. 解析&#xff1a; 1.子网划分相关知识&#xff1a; 在IPv4地址中&#xff0c;/27表示子网掩码为255.255.255.224&#xff0c;它将一个C类网络&#xff08;默认子网掩码255.255.255.0&#xff09;进一步划分 对于子网掩码255.255.255.224&#xff0c;其对应的二进制为111…

Fine-Tuning Llama2 with LoRA

Fine-Tuning Llama2 with LoRA 1. What is LoRA?2. How does LoRA work?3. Applying LoRA to Llama2 models4. LoRA finetuning recipe in torchtune5. Trading off memory and model performance with LoRAModel ArgumentsReferences https://docs.pytorch.org/torchtune/ma…

python打卡day29

类的装饰器 知识点回顾 类的装饰器装饰器思想的进一步理解&#xff1a;外部修改、动态类方法的定义&#xff1a;内部定义和外部定义 回顾一下&#xff0c;函数的装饰器是 &#xff1a;接收一个函数&#xff0c;返回一个修改后的函数。类也有修饰器&#xff0c;类装饰器本质上确…

十一、STM32入门学习之FREERTOS移植

目录 一、FreeRTOS1、源码下载&#xff1a;2、解压源码 二、移植步骤一&#xff1a;在需要移植的项目中新建myFreeRTOS的文件夹&#xff0c;用于存放FREERTOS的相关源码步骤二&#xff1a;keil中包含相关文件夹和文件引用路径步骤三&#xff1a;修改FreeRTOSConfig.h文件的相关…

2025 年十大网络安全预测

随着我们逐步迈向 2026 年&#xff0c;网络安全领域正处于一个关键的转折点&#xff0c;技术创新与数字威胁以前所未有的复杂态势交织在一起。 地缘政治环境进一步加剧了这些网络安全挑战&#xff0c;国际犯罪组织利用先进的技术能力来追求战略目标。 人工智能在这一不断演变…

Mac 环境下 JDK 版本切换全指南

概要 在 macOS 上安装了多个 JDK 后&#xff0c;可以通过系统自带的 /usr/libexec/java_home 工具来查询并切换不同版本的 Java。只需在终端中执行 /usr/libexec/java_home -V 列出所有已安装的 JDK&#xff0c;然后将你想使用的版本路径赋值给环境变量 JAVA_HOME&#xff0c;…

中级网络工程师知识点6

1.堆叠方式可以共享使用交换机背板带宽&#xff1b;级联方式可以使用双绞线将交换机连接在一起 2.光功率计是专门测量光功率大小的仪器&#xff0c;在对光缆进行检测时&#xff0c;通过在光缆的发送端和接收端分别测量光功率&#xff0c;进而计算出光衰情况。 3.光时域反射计…

动态规划——乌龟棋

题目描述 解题思路 首先这是一个很明显的线性dp的题目&#xff0c;很容易发现规律 数据输入 我们用 h[ N ] 数组存储每一个格子的分数 用 cnt [ ]&#xff0c;数组表示每一中卡片的数目 1&#xff0c;状态表示 因为这里一个有4种跳跃方式可以选择 f[ i ][ a ][ b ][ c ][ d…

C#自定义控件-实现了一个支持平移、缩放、双击重置的图像显示控件

1. 控件概述 这是一个继承自 Control 的自定义控件&#xff0c;主要用于图像的显示和交互操作&#xff0c;具有以下核心功能&#xff1a; 图像显示与缩放&#xff08;支持鼠标滚轮缩放&#xff09;图像平移&#xff08;支持鼠标拖拽&#xff09;视图重置&#xff08;双击重置…