QT聊天项目DAY18

1.文件传输

1.1 客户端

采用分块传输(20MB/块),以及MD5码校验并将读出的二进制数据采用Base64编码进行传输

1.1.0 通信协议

1.1.1 UI

采用垂直布局,该布局大小为570 * 160,间隔全是0,UI方面不详细介绍了

1.1.2 MainWindow

头文件

#ifndef MAINWINDOWS_H
#define MAINWINDOWS_H#include <QObject>
#include <QString>
#include <QtWidgets/QMainWindow>
#include "ui_mainWindows.h"class mainWindows : public QMainWindow
{Q_OBJECTpublic:mainWindows(QWidget *parent = nullptr);~mainWindows();private:void InitUI();void BindSlots();private slots:void OnFileBtnClicked();void OnConnectBtnClicked();void OnUploadBtnClicked();void OnTestBtnClicked();private:Ui::mainWindowsClass ui;QString _fileName;                                                                              // 点击选择文件时,保存选中的文件路径QString _fileMD5;                                                                               // 文件的MD5码QString _serverIP;int _serverPort;
};#endif // MAINWINDOWS_H

构造函数

设置按钮的可用状态

绑定按钮的槽函数,实时更新进度条,当连接成功时设置按钮的可用状态,以及网络错误给出提示

点击选择文件按钮时,保存文件的路径信息

连接按钮点击时,向服务器发起连接

测试通信

上传文件,分段发送

1.打开文件

2.计算MD5值

3.获取文件名以及文件大小

4.计算文件需要分几段去发送,目前我定义一段为20M

5. 循环发送文件之前的一些准备

6.循环发送文件内容

首先读取文件内容,第一次读取,这段内容的序号就是1,然后将数据转换成Base64,最后标记当前分段是否是要发送的文件末尾,然后TCP使用json序列化,将所有数据存储到Json中进行发送

7.最后关闭文件

1.1.3 TcpMgr

头文件

#ifndef TCPMGR_H
#define TCPMGR_H#include <QObject>
#include <QTcpSocket>
#include "Singletion.h"class TcpMgr  : public QObject, public Singletion<TcpMgr>
{Q_OBJECTfriend class Singletion<TcpMgr>;public:~TcpMgr();// 删除拷贝和复制TcpMgr(const TcpMgr&) = delete;TcpMgr& operator=(const TcpMgr&) = delete;public:void ConnectToHost(const QString& hostName, quint16 port);void DisconnectFromHost();void SendData(const quint16 MsgID, QByteArray& data);bool IsConnected() const;private:TcpMgr(QObject* parent = 0);void ProcessData();void BindSlots();signals:void sigSendMsg(const quint16 MsgID, QByteArray& data);void sigConnected(bool isConnected);void sigLogicProcess(int MsgID, QJsonObject& jsonObj);void sigNetError(QString errorMsg);private slots:void slotConnected();																						// 当连接成功建立时void slotReadyRead();																						// TCP缓存有数据可读时void slotDisconnected();																					// 当连接断开时void slotErrorOccured(QAbstractSocket::SocketError socketError);											// 当发生错误时void slotSendMsg(const quint16 MsgID, QByteArray& data);													// 发送消息private:QTcpSocket* m_tcpSocket;QByteArray _buffer;
};#endif // TCPMGR_H

构造函数

QT会自己为槽函数设置队列,不用自己在设置队列和互斥,直接用信号,最方便

解析服务器返回的数据

1.将服务器发来的数据读取出来并进行存储,然后循环取出每一个消息,ID+DATA_LEN+DATA有效的避免了粘包问题

2.取出消息头部(6字节)

3.取出消息ID(2字节)和消息长度(4字节)

4.判断缓冲区的内容是否大于该消息总长度

5.读取数据并将这段数据从缓冲区中移除,将二进制文件解析成Json

发送消息

ID + DATA_LEN + DATA

1.1.4 处理服务器响应报文 LogicSystem

TcpMgr将消息整合成Json格式,交给LogicSystem处理

构造函数

创建工作线程,该线程负责处理消息队列里的任务

工作线程,当有信息添加进来会立即调用槽函数,该函数会在线程中执行,然后创建消息副本,处理上次发送的消息

class Worker : public QObject
{Q_OBJECTpublic:~Worker();Worker(QObject* parent = 0);friend void operator<<(Worker* worker, Message& message);private:void RegisterFunc(quint16 id, Func func);void DisTestMessage(QJsonObject& json);void DisUploadMessage(QJsonObject& json);void AddTask(Message& message);void ProcessTask(Message& message);public slots:void ConsumMessage();signals:void sigTransFile(int transSize);private:QMap<quint16, Func> m_funcMap;QQueue<Message> m_messageQueue;QMutex m_mutex;
};

添加任务

消费任务

处理测试信息

处理文件上传响应,更新进度条

优雅退出

1.2 服务器

整个服务器框架,依旧采用事件循环池+监听连接+将连接转交给CSession(接受和发送数据全在线程中处理)+具体的处理放在LogicSystem中进行

LogicSystem处理消息

将消息添加到消息队列中

void LogicSystem::PostMsgToQueue(LogicNode* logicNode)
{unique_lock<mutex> lock(_mutex);_msgQueue.push(logicNode);if (_msgQueue.size() >= 1){lock.unlock();																									// 解锁_consumer.notify_one();}
}

1.2.1 单线程处理客户端请求

LogicSystem构造函数

在该线程中循环处理消息,消息队列为空就阻塞等待被唤醒

LogicSystem::LogicSystem():bStop(false)
{RegisterCallBack();_outPath = get<string>(ServerStatic::ParseConfig("FileOutPath", "Path"));											// 获取文件存储路径_workThread = thread(&LogicSystem::DealMsg, this);																	// 后台线程不停的处理消息
}

循环处理消息,没有消息就阻塞,被唤醒就处理消息,如果析构了,就处理完剩余的消息

调用客户端请求对应的处理函数

void LogicSystem::RegisterCallBack()
{_funcMap[MSG_ID_TEST_REQUEST] = bind(&LogicSystem::DisTestMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);_funcMap[MSG_ID_UPLOAD_FILE_REQUEST] = bind(&LogicSystem::DisUploadFileMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}

处理测试消息

处理文件上传消息

1.Json解析

2.解码

3.取出客户端发送的内容

4.判断是否是第一个包,来创建文件打开方式

5.文件打开与写入

6.返回上传文件响应

单线程LogicSystem

#ifndef LOGICSYSTEM_H
#define LOGICSYSTEM_H
#include "GlobalHead.h"
#include "Singletion.h"
#include "Struct.h"// 把函数地址当作变量类型来使用
class CSession;
class LogicNode;
using FunCallBack = function<void(CSession* session, const short& msg_id, const string& msg_data)>;class LogicSystem : public std::enable_shared_from_this<LogicSystem>, public Singletion<LogicSystem>
{friend class Singletion<LogicSystem>;																// 为了访问该类的构造函数public:~LogicSystem();																						void PostMsgToQueue(LogicNode* logicNode);private:LogicSystem();void DealMsg();																						// 处理信息void RegisterCallBack();																			// 注册回调函数void ConsumerFunc();private:void DisTestMessage(CSession* session, const short& msg_id, const string& msg_data);				// 处理测试消息void DisUploadFileMessage(CSession* session, const short& msg_id, const string& msg_data);			// 处理上传文件消息private:thread _workThread;																					// 工作线程处理消息queue<LogicNode*> _msgQueue;mutex _mutex;condition_variable _consumer;																		// 消费者,条件变量bool bStop;map<short, FunCallBack> _funcMap;																	// 回调函数映射表unordered_map<int, UserInfo*> _userMaps;string _outPath;																					// 文件存储路径
};#endif // LOGICSYSTEM_H#include "LogicSystem.h"
#include <fstream>
#include "CSession.h"
#include "MsgNode.h"
#include "Enum.h"
#include "ServerStatic.h"
#include "UserMgr.h"
#include "base64.h"using namespace std;LogicSystem::LogicSystem():bStop(false)
{RegisterCallBack();_outPath = get<string>(ServerStatic::ParseConfig("FileOutPath", "Path"));											// 获取文件存储路径_workThread = thread(&LogicSystem::DealMsg, this);																	// 后台线程不停的处理消息
}LogicSystem::~LogicSystem()
{bStop = true;_consumer.notify_all();_workThread.join();
}void LogicSystem::PostMsgToQueue(LogicNode* logicNode)
{unique_lock<mutex> lock(_mutex);_msgQueue.push(logicNode);if (_msgQueue.size() >= 1){lock.unlock();																									// 解锁_consumer.notify_one();}
}void LogicSystem::DealMsg()
{while (true){unique_lock<mutex> lock(_mutex);_consumer.wait(lock, [this] {return!_msgQueue.empty(); });														// 等待队列不为空if (bStop){while (!_msgQueue.empty()){ConsumerFunc();}break;}ConsumerFunc();}
}void LogicSystem::ConsumerFunc()
{LogicNode* logicNode = _msgQueue.front();short id = logicNode->_rn->_msg_id;auto it = _funcMap.find(id);if (it != _funcMap.end()){it->second(logicNode->_cs.get(), id, string(logicNode->_rn->_data, logicNode->_rn->_curLen));}_msgQueue.pop();delete logicNode;
}void LogicSystem::RegisterCallBack()
{_funcMap[MSG_ID_TEST_REQUEST] = bind(&LogicSystem::DisTestMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);_funcMap[MSG_ID_UPLOAD_FILE_REQUEST] = bind(&LogicSystem::DisUploadFileMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}void LogicSystem::DisTestMessage(CSession* session, const short& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();cout << "receive test message: " << data << endl;Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_TEST_RESPONSE);});response["code"] = ErrorCodes::SUCCESS;response["data"] = "recv test message success";
}void LogicSystem::DisUploadFileMessage(CSession* session, const short& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_UPLOAD_FILE_RESPONSE);});// 解码string decoded = base64_decode(data);int seq = root["seq"].asInt();string name = root["name"].asString();int totalSize = root["totalSize"].asInt();int transSize = root["transSize"].asInt();cout << "receive upload file message: " << name << " seq: " << seq << " totalSize: " << totalSize << " transSize: " << transSize << endl;string filePath = _outPath + "/" + name;cout << "outPath: " << filePath << endl;ofstream ofs;// 第一个包if (seq == 1){ofs.open(filePath, ios::binary | ios::trunc);}else{ofs.open(filePath, ios::binary | ios::app);}// 检查文件是否打开成功if (!ofs.is_open()){cerr << "open file error" << endl;response["code"] = ErrorCodes::FILE_OPEN_ERROR;return;}// 写入文件ofs.write(decoded.data(), decoded.size());if (!ofs){cerr << "write file error" << endl;response["code"] = ErrorCodes::FILE_WRITE_ERROR;return;}// 关闭文件ofs.close();cout << "write file success" << endl;// 响应response["code"] = ErrorCodes::SUCCESS;response["seq"] = seq;response["name"] = name;response["transSize"] = transSize;response["totalSize"] = totalSize;
}

1.2.2 多线程处理客户端请求

1.2.2.1 自己尝试使用多线程来接受文件

创建线程池

class ThreadPool
{
public:~ThreadPool();ThreadPool(int threadNum);void Stop() { bStop = true; }friend void operator<<(ThreadPool& pool, shared_ptr<FileInfo> fileInfo);private:void PostFileToBuffer(shared_ptr<FileInfo> fileInfo);												// 将文件添加到缓存区void WriteFile(vector<shared_ptr<FileInfo>> fileInfos);												// 从缓冲区中读取文件并写入到内存中void DealFileBuffer();																				// 处理文件缓冲区private:vector<thread> _threads;int _threadNum;bool bStop = false;																					// 线程池停止标志mutex _waitFileBuffer_mutex;																		// 等待文件缓冲区互斥锁condition_variable _waitFileBuffer_cv;																// 等待文件缓冲区非空vector<shared_ptr<FileInfo>> _fileBuffer;															// 文件缓冲区
};

构造函数

将待写入文件的信息,先解析到数组里,然后多线程去写入

写入到文件中

添加待写入字符串到数组中

存储文件的结构体

LogicSystem构造函数

将文件写入缓冲区

完整代码

头文件

#ifndef LOGICSYSTEM_H
#define LOGICSYSTEM_H
#include "GlobalHead.h"
#include "Singletion.h"
#include "Struct.h"// 把函数地址当作变量类型来使用
class CSession;
class LogicNode;
using FunCallBack = function<void(CSession* session, const short& msg_id, const string& msg_data)>;class ThreadPool
{
public:~ThreadPool();ThreadPool(int threadNum);void Stop() { bStop = true; }friend void operator<<(ThreadPool& pool, shared_ptr<FileInfo> fileInfo);private:void PostFileToBuffer(shared_ptr<FileInfo> fileInfo);												// 将文件添加到缓存区void WriteFile(vector<shared_ptr<FileInfo>> fileInfos);												// 从缓冲区中读取文件并写入到内存中void DealFileBuffer();																				// 处理文件缓冲区private:vector<thread> _threads;int _threadNum;bool bStop = false;																					// 线程池停止标志mutex _waitFileBuffer_mutex;																		// 等待文件缓冲区互斥锁condition_variable _waitFileBuffer_cv;																// 等待文件缓冲区非空vector<shared_ptr<FileInfo>> _fileBuffer;															// 文件缓冲区
};class LogicSystem : public std::enable_shared_from_this<LogicSystem>, public Singletion<LogicSystem>
{friend class Singletion<LogicSystem>;																// 为了访问该类的构造函数friend class ThreadPool;public:~LogicSystem();																						void PostMsgToQueue(LogicNode* logicNode);LogicSystem(const LogicSystem&) = delete;															// 禁止拷贝构造函数LogicSystem& operator=(const LogicSystem&) = delete;												// 禁止拷贝赋值运算符private:LogicSystem();void DealMsg();																						// 处理信息void RegisterCallBack();																			// 注册回调函数void ConsumerFunc();private:void DisTestMessage(CSession* session, const short& msg_id, const string& msg_data);				// 处理测试消息void DisUploadFileMessage(CSession* session, const short& msg_id, const string& msg_data);			// 处理上传文件消息private:thread _workThread;																					// 工作线程处理消息queue<LogicNode*> _msgQueue;mutex _mutex;condition_variable _consumer;																		// 消费者,条件变量bool bStop;map<short, FunCallBack> _funcMap;																	// 回调函数映射表unordered_map<int, UserInfo*> _userMaps;string _outPath;																					// 文件存储路径ThreadPool* _threadPool;																			// 线程池
};#endif // LOGICSYSTEM_H

实现文件

#include "LogicSystem.h"
#include <fstream>
#include "CSession.h"
#include "MsgNode.h"
#include "Enum.h"
#include "ServerStatic.h"
#include "UserMgr.h"
#include "base64.h"using namespace std;#pragma region /* ThreadPool */ThreadPool::~ThreadPool()
{bStop = true;_waitFileBuffer_cv.notify_all();for (auto& t : _threads){t.join();}
}ThreadPool::ThreadPool(int threadNum): _threadNum(threadNum)
{for (int i = 0; i < threadNum; i++){_threads.push_back(thread(&ThreadPool::DealFileBuffer, this));}
}void operator<<(ThreadPool& pool, shared_ptr<FileInfo> fileInfo)
{pool.PostFileToBuffer(fileInfo);
}void ThreadPool::PostFileToBuffer(shared_ptr<FileInfo> fileInfo)
{unique_lock<mutex> lock(_waitFileBuffer_mutex);_fileBuffer.push_back(fileInfo);sort(_fileBuffer.begin(), _fileBuffer.end());											// 排序_fileBuffer.push_back(fileInfo);if (_fileBuffer.size() >= 1){lock.unlock();_waitFileBuffer_cv.notify_one();}
}void ThreadPool::DealFileBuffer()
{while (true){unique_lock<mutex> lock(_waitFileBuffer_mutex);_waitFileBuffer_cv.wait(lock, [this] {return!_fileBuffer.empty(); });vector<shared_ptr<FileInfo>> fileInfos = std::move(_fileBuffer);_fileBuffer.clear();lock.unlock();if (bStop){WriteFile(std::move(fileInfos));break;}WriteFile(std::move(fileInfos));}
}void ThreadPool::WriteFile(vector<shared_ptr<FileInfo>> fileInfos)
{for (shared_ptr<FileInfo> fileInfo : fileInfos){cout << "write file: " << fileInfo->_name << " seq: " << fileInfo->_curSeq << "writePath: " << fileInfo->_writePath << endl;ofstream ofs;// 第一个包if (fileInfo->_curSeq == 1){ofs.open(fileInfo->_writePath, ios::binary | ios::trunc);}else{ofs.open(fileInfo->_writePath, ios::binary | ios::app);}// 检查文件是否打开成功if (!ofs.is_open()){cerr << "open file error" << endl;return;}// 写入文件ofs.write(fileInfo->_data.data(), fileInfo->_data.size());if (!ofs){cerr << "write file error" << endl;return;}// 关闭文件ofs.close();cout << "write file success" << endl;}
}#pragma endregion#pragma region /* LogicSystem */
LogicSystem::LogicSystem():bStop(false)
{RegisterCallBack();_outPath = get<string>(ServerStatic::ParseConfig("FileOutPath", "Path"));											// 获取文件存储路径_workThread = thread(&LogicSystem::DealMsg, this);																	// 后台线程不停的处理消息_threadPool = new ThreadPool(5);																					// 创建线程池
}LogicSystem::~LogicSystem()
{bStop = true;_consumer.notify_all();_workThread.join();delete _threadPool;
}void LogicSystem::PostMsgToQueue(LogicNode* logicNode)
{unique_lock<mutex> lock(_mutex);_msgQueue.push(logicNode);if (_msgQueue.size() >= 1){lock.unlock();																									// 解锁_consumer.notify_one();}
}void LogicSystem::DealMsg()
{while (true){unique_lock<mutex> lock(_mutex);_consumer.wait(lock, [this] {return!_msgQueue.empty(); });														// 等待队列不为空if (bStop){while (!_msgQueue.empty()){ConsumerFunc();}break;}ConsumerFunc();}
}void LogicSystem::ConsumerFunc()
{LogicNode* logicNode = _msgQueue.front();short id = logicNode->_rn->_msg_id;auto it = _funcMap.find(id);if (it != _funcMap.end()){it->second(logicNode->_cs.get(), id, string(logicNode->_rn->_data, logicNode->_rn->_curLen));}_msgQueue.pop();delete logicNode;
}void LogicSystem::RegisterCallBack()
{_funcMap[MSG_ID_TEST_REQUEST] = bind(&LogicSystem::DisTestMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);_funcMap[MSG_ID_UPLOAD_FILE_REQUEST] = bind(&LogicSystem::DisUploadFileMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}void LogicSystem::DisTestMessage(CSession* session, const short& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();cout << "receive test message: " << data << endl;Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_TEST_RESPONSE);});response["code"] = ErrorCodes::SUCCESS;response["data"] = "recv test message success";
}void LogicSystem::DisUploadFileMessage(CSession* session, const short& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_UPLOAD_FILE_RESPONSE);});// 解码string decoded = base64_decode(data);int seq = root["seq"].asInt();string name = root["name"].asString();int totalSize = root["totalSize"].asInt();int transSize = root["transSize"].asInt();cout << "receive upload file message: " << name << " seq: " << seq << " totalSize: " << totalSize << " transSize: " << transSize << endl;string filePath = _outPath + "/" + name;cout << "outPath: " << filePath << endl;// 将文件写入缓冲区shared_ptr<FileInfo> fileInfo_ptr = make_shared<FileInfo>(name, filePath, seq, decoded);*_threadPool << fileInfo_ptr;// 响应response["code"] = ErrorCodes::SUCCESS;response["seq"] = seq;response["name"] = name;response["transSize"] = transSize;response["totalSize"] = totalSize;
}#pragma endregion

但是速率并没有增加多少,有问题

不采用多线程写入文件,而采用多线程解析文件并存入缓冲区

线程池中包含一个互斥变量,逻辑处理类中包含一个互斥变量

线程池给消息队列上锁

唤醒阻塞的线程执行Tcp消息任务

将上传文件任务按照序列seq解析到缓冲区中,此时会唤醒因为文件缓冲区为空而等待的线程

LogicSystem给文件缓冲区上锁

唤醒因为文件缓冲区为空而阻塞等待执行写入的线程

线程写入内容到文件

void LogicSystem::DealFileBuffer()
{while (true){unique_lock<mutex> lock(_waitFileBuffer_mutex);_waitFileBuffer_cv.wait(lock, [this] {return!_fileBuffer.empty(); });vector<shared_ptr<FileInfo>> fileInfos = std::move(_fileBuffer);_fileBuffer.clear();lock.unlock();if (bStop){WriteFile(std::move(fileInfos));break;}WriteFile(std::move(fileInfos));}
}void LogicSystem::WriteFile(vector<shared_ptr<FileInfo>> fileInfos)
{ofstream ofs;for (shared_ptr<FileInfo> fileInfo : fileInfos){// 获取响应Json::Value &response = fileInfo->_response;// 返回响应ConnectionRAII conn([&response, &fileInfo](){string response_str = response.toStyledString();fileInfo->_session->Send(response_str, MSG_ID_UPLOAD_FILE_RESPONSE);});cout << "write file: " << fileInfo->_name << " seq: " << fileInfo->_curSeq << "writePath: " << fileInfo->_writePath << endl;// 打开文件if (!ofs.is_open()){// 第一个包if (fileInfo->_curSeq == 1){ofs.open(fileInfo->_writePath, ios::binary | ios::trunc);ConnectionRAII conn([&ofs](){ofs.close();});}else{ofs.open(fileInfo->_writePath, ios::binary | ios::app);}}// 检查文件是否打开成功if (!ofs.is_open()){cerr << "open file error" << endl;response["code"] = ErrorCodes::FILE_OPEN_ERROR;return;}// 写入文件ofs.write(fileInfo->_data.data(), fileInfo->_data.size());if (!ofs){cerr << "write file error" << endl;response["code"] = ErrorCodes::FILE_WRITE_ERROR;return;}cout << "write file success" << endl;}
}

接收文件速率并没有什么提高

头文件

#ifndef LOGICSYSTEM_H
#define LOGICSYSTEM_H
#include "GlobalHead.h"
#include "Singletion.h"
#include "Struct.h"// 把函数地址当作变量类型来使用
class CSession;
class LogicNode;
using FunCallBack = function<void(CSession* session, const short& msg_id, const string& msg_data)>;class ThreadPool : public Singletion<ThreadPool>
{friend class Singletion<ThreadPool>;																// 为了访问该类的构造函数
public:~ThreadPool();void Stop() { bStop = true; }friend void operator<<(ThreadPool& pool, shared_ptr<LogicNode> logicNode);protected:ThreadPool(int threadNum);private:void DealMsg();																						// 处理信息void ConsumerFunc();void PraseFileInfo(shared_ptr<LogicNode> logicNode);												// 解析文件信息private:vector<thread> _threads;int _threadNum;bool bStop = false;																					// 线程池停止标志mutex _waitTcpMsg_mutex;																			// 等待tcp消息互斥锁condition_variable _waitTcpMsg_cv;																	// 等待tcp消息非空queue<shared_ptr<LogicNode>> _msgQueue;																// TCP消息队列
};class LogicSystem : public std::enable_shared_from_this<LogicSystem>, public Singletion<LogicSystem>
{friend class Singletion<LogicSystem>;																// 为了访问该类的构造函数friend class ThreadPool;public:~LogicSystem();																						void PostMsgToQueue(shared_ptr<LogicNode> logicNode);LogicSystem(const LogicSystem&) = delete;															// 禁止拷贝构造函数LogicSystem& operator=(const LogicSystem&) = delete;												// 禁止拷贝赋值运算符private:LogicSystem();private:void RegisterCallBack();																			// 注册回调函数void DisTestMessage(CSession* session, const short& msg_id, const string& msg_data);				// 处理测试消息void PostFileToBuffer(shared_ptr<FileInfo> fileInfo);												// 将文件添加到缓存区void WriteFile(vector<shared_ptr<FileInfo>> fileInfos);												// 从缓冲区中读取文件并写入到内存中void DealFileBuffer();																				// 处理文件缓冲区private:map<short, FunCallBack> _funcMap;																	// 回调函数映射表string _outPath;																					// 文件存储路径thread _workThread;																					// 工作线程shared_ptr<ThreadPool> _threadPool;																	// 线程池bool bStop = false;																					// 线程池停止标志mutex _waitFileBuffer_mutex;																		// 等待文件缓冲区互斥锁condition_variable _waitFileBuffer_cv;																// 等待文件缓冲区非空vector<shared_ptr<FileInfo>> _fileBuffer;															// 文件缓冲区
};#endif // LOGICSYSTEM_H

实现文件

#include "LogicSystem.h"
#include <fstream>
#include "CSession.h"
#include "MsgNode.h"
#include "Enum.h"
#include "ServerStatic.h"
#include "UserMgr.h"
#include "base64.h"using namespace std;#pragma region /* ThreadPool */ThreadPool::~ThreadPool()
{bStop = true;_waitTcpMsg_cv.notify_all();for (auto& t : _threads){t.join();}
}ThreadPool::ThreadPool(int threadNum): _threadNum(threadNum)
{for (int i = 0; i < threadNum; i++){_threads.push_back(thread(&ThreadPool::DealMsg, this));}
}void operator<<(ThreadPool& pool, shared_ptr<LogicNode> Message)
{unique_lock<mutex> lock(pool._waitTcpMsg_mutex);pool._msgQueue.push(Message);if (pool._msgQueue.size() >= 1){lock.unlock();																									// 解锁pool._waitTcpMsg_cv.notify_all();}
}void ThreadPool::DealMsg()
{while (true){unique_lock<mutex> lock(_waitTcpMsg_mutex);_waitTcpMsg_cv.wait(lock, [this] {return!_msgQueue.empty(); });													// 等待队列不为空cout << "_msgQueue size: " << _msgQueue.size() << endl;if (bStop){while (!_msgQueue.empty()){ConsumerFunc();}break;}ConsumerFunc();}
}void ThreadPool::ConsumerFunc()
{shared_ptr<LogicNode> logicNode = _msgQueue.front();_msgQueue.pop();uint16_t id = logicNode->_rn->_msg_id;if (id == MSG_ID_UPLOAD_FILE_REQUEST){PraseFileInfo(logicNode);}else if (id == MSG_ID_TEST_REQUEST){LogicSystem::GetInstance()->DisTestMessage(logicNode->_cs.get(), id, string(logicNode->_rn->_data, logicNode->_rn->_curLen));}
}void ThreadPool::PraseFileInfo(shared_ptr<LogicNode> logicNode)
{const string& msg_data = string(logicNode->_rn->_data, logicNode->_rn->_curLen);Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();// 解码string decoded = base64_decode(data);int seq = root["seq"].asInt();string name = root["name"].asString();int totalSize = root["totalSize"].asInt();int transSize = root["transSize"].asInt();cout << "receive upload file message: " << name << " seq: " << seq << " totalSize: " << totalSize << " transSize: " << transSize << endl;string filePath = LogicSystem::GetInstance()->_outPath + "/" + name;// 响应Json::Value response;response["code"] = ErrorCodes::SUCCESS;response["seq"] = seq;response["name"] = name;response["transSize"] = transSize;response["totalSize"] = totalSize;// 将文件写入缓冲区shared_ptr<FileInfo> fileInfo_ptr = make_shared<FileInfo>(name, filePath, seq, decoded, response, logicNode->_cs);LogicSystem::GetInstance()->PostFileToBuffer(fileInfo_ptr);
}#pragma endregion#pragma region /* LogicSystem */
LogicSystem::LogicSystem():bStop(false)
{RegisterCallBack();_outPath = get<string>(ServerStatic::ParseConfig("FileOutPath", "Path"));											// 获取文件存储路径_workThread = thread(&LogicSystem::DealFileBuffer, this);															// 后台线程不停的处理消息_threadPool = ThreadPool::GetInstance(5);																			// 获取线程池
}LogicSystem::~LogicSystem()
{bStop = true;_waitFileBuffer_cv.notify_all();_workThread.join();
}void LogicSystem::PostMsgToQueue(shared_ptr<LogicNode> logicNode)
{*_threadPool << logicNode;cout << "post message to queue" << endl;
}void LogicSystem::RegisterCallBack()
{_funcMap[MSG_ID_TEST_REQUEST] = bind(&LogicSystem::DisTestMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}void LogicSystem::DisTestMessage(CSession* session, const short& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();cout << "receive test message: " << data << endl;Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_TEST_RESPONSE);});response["code"] = ErrorCodes::SUCCESS;response["data"] = "recv test message success";
}void LogicSystem::PostFileToBuffer(shared_ptr<FileInfo> fileInfo)
{unique_lock<mutex> lock(_waitFileBuffer_mutex);_fileBuffer.push_back(fileInfo);sort(_fileBuffer.begin(), _fileBuffer.end());											// 排序if (_fileBuffer.size() >= 1){lock.unlock();_waitFileBuffer_cv.notify_one();}
}void LogicSystem::DealFileBuffer()
{while (true){unique_lock<mutex> lock(_waitFileBuffer_mutex);_waitFileBuffer_cv.wait(lock, [this] {return!_fileBuffer.empty(); });vector<shared_ptr<FileInfo>> fileInfos = std::move(_fileBuffer);_fileBuffer.clear();lock.unlock();if (bStop){WriteFile(std::move(fileInfos));break;}WriteFile(std::move(fileInfos));}
}void LogicSystem::WriteFile(vector<shared_ptr<FileInfo>> fileInfos)
{ofstream ofs;for (shared_ptr<FileInfo> fileInfo : fileInfos){// 获取响应Json::Value &response = fileInfo->_response;// 返回响应ConnectionRAII conn([&response, &fileInfo](){string response_str = response.toStyledString();fileInfo->_session->Send(response_str, MSG_ID_UPLOAD_FILE_RESPONSE);});cout << "write file: " << fileInfo->_name << " seq: " << fileInfo->_curSeq << "writePath: " << fileInfo->_writePath << endl;// 打开文件if (!ofs.is_open()){// 第一个包if (fileInfo->_curSeq == 1){ofs.open(fileInfo->_writePath, ios::binary | ios::trunc);ConnectionRAII conn([&ofs](){ofs.close();});}else{ofs.open(fileInfo->_writePath, ios::binary | ios::app);}}// 检查文件是否打开成功if (!ofs.is_open()){cerr << "open file error" << endl;response["code"] = ErrorCodes::FILE_OPEN_ERROR;return;}// 写入文件ofs.write(fileInfo->_data.data(), fileInfo->_data.size());if (!ofs){cerr << "write file error" << endl;response["code"] = ErrorCodes::FILE_WRITE_ERROR;return;}cout << "write file success" << endl;}
}#pragma endregion

1.2.2.2 老师的多线程思路

将处理客户端消息包的具体逻辑挪移到LogicWork类中,该类是一个线程类

头文件

#include <string>
#include <queue>
#include <map>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <functional>
#include <atomic>using namespace std;
class CSession;
class LogicNode;using FunCallBack = function<void(shared_ptr<CSession> session, const short& msg_id, const string& msg_data)>;class LogicWorker
{
public:LogicWorker();~LogicWorker();friend void operator<<(shared_ptr<LogicWorker> worker , const shared_ptr<LogicNode> node);			// 向消息队列中添加消息private:void RegisterCallBack();																			// 注册回调函数void DisTestMessage(shared_ptr<CSession> session, const uint16_t& msg_id, const string& msg_data);	// 处理测试消息void DisUplaodFile(shared_ptr<CSession> session, const uint16_t& msg_id, const string& msg_data);	// 处理文件上传消息void DealMsg();																						// 处理信息void ConsumerFunc(shared_ptr<LogicNode> logicNode);private:map<short, FunCallBack> _funcMap;																	// 回调函数映射表atomic<bool> _b_stop;																				// 线程池停止标志thread _woker_thread;																				// 工作线程mutex _wait_tcpMsg_mutex;																			// 等待tcp消息互斥锁condition_variable _wait_tcpMsg_cv;																	// 等待tcp消息非空queue<shared_ptr<LogicNode>> _msgQueue;																// TCP消息队列
};#endif // LOGICWORKER_H

实现文件

#include "LogicWorker.h"
#include "CSession.h"
#include "MsgNode.h"
#include "Enum.h"
#include "Struct.h"
#include "FileSystem.h"LogicWorker::LogicWorker():_b_stop(false)
{RegisterCallBack();_woker_thread = thread(&LogicWorker::DealMsg, this);
}LogicWorker::~LogicWorker()
{_b_stop = true;_wait_tcpMsg_cv.notify_one();_woker_thread.join();
}void LogicWorker::RegisterCallBack()
{_funcMap[MSG_ID_TEST_REQUEST] = bind(&LogicWorker::DisTestMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);_funcMap[MSG_ID_UPLOAD_FILE_REQUEST] = bind(&LogicWorker::DisUplaodFile, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}void LogicWorker::DisTestMessage(shared_ptr<CSession> session, const uint16_t& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();cout << "receive test message: " << data << endl;Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_TEST_RESPONSE);});response["code"] = ErrorCodes::SUCCESS;response["data"] = "recv test message success";
}void LogicWorker::DisUplaodFile(shared_ptr<CSession> session, const uint16_t& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();int seq = root["seq"].asInt();string name = root["name"].asString();int totalSize = root["totalSize"].asInt();int transSize = root["transSize"].asInt();int totalSeq = root["totalSeq"].asInt();hash<string> hashFunc;size_t fileHash = hashFunc(name);int index = fileHash % THREAD_COUNT;shared_ptr<FileTask> fileTask = make_shared<FileTask>(session, name, seq, totalSize, transSize, totalSeq, data);FileSystem::GetInstance()->PostMsgToQueue(fileTask, index);
}void LogicWorker::DealMsg()
{while (!_b_stop){unique_lock<mutex> lock(_wait_tcpMsg_mutex);_wait_tcpMsg_cv.wait(lock, [this] {return!_msgQueue.empty(); });													// 等待队列不为空cout << "_msgQueue size: " << _msgQueue.size() << endl;if(_b_stop)break;queue<shared_ptr<LogicNode>> msgQueue = move(_msgQueue);															// 移动队列lock.unlock();while (!msgQueue.empty()){ConsumerFunc(msgQueue.front());msgQueue.pop();}}
}void LogicWorker::ConsumerFunc(shared_ptr<LogicNode> logicNode)
{uint16_t id = logicNode->_rn->_msg_id;auto it = _funcMap.find(id);if (it != _funcMap.end()){it->second(logicNode->_cs, id, string(logicNode->_rn->_data, logicNode->_rn->_msgLen));}
}void operator<<(shared_ptr<LogicWorker> worker, const shared_ptr<LogicNode> node)
{{unique_lock<mutex> lock(worker->_wait_tcpMsg_mutex);worker->_msgQueue.push(node);}worker->_wait_tcpMsg_cv.notify_one();
}

执行文件写入的线程类FileWorker

头文件

#ifndef FILEWORKER_H
#define FILEWORKER_H#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <string>
using namespace std;class CSession;struct 	FileTask
{FileTask(shared_ptr<CSession> session, string file_name, int seq, int total_size, int trans_size, int last_seq, string file_data):_session(session), _file_name(file_name), _seq(seq), _total_size(total_size), _trans_size(trans_size), _last_seq(last_seq), _file_data(file_data){}~FileTask() {}shared_ptr<CSession> _session;string _file_name;int _seq;int _total_size;int _trans_size;int _last_seq;string _file_data;
};class FileWorker
{
public:FileWorker();~FileWorker();friend void operator<<(shared_ptr<FileWorker> worker, shared_ptr<FileTask> task);private:void ExecuteTask(shared_ptr<FileTask> task);void Run();private:thread _worker_thread;queue<shared_ptr<FileTask>> _task_queue;atomic<bool> _b_stop;mutex _wait_file_task_mutex;condition_variable _wait_file_task_cv;
};#endif // FILEWORKER_H

实现文件

#include "FileWorker.h"
#include <fstream>
#include "base64.h"
#include "ServerStatic.h"
#include "CSession.h"
#include "Enum.h"FileWorker::FileWorker():_b_stop(false)
{_worker_thread = thread(&FileWorker::Run, this);
}FileWorker::~FileWorker()
{while (!_task_queue.empty()){_task_queue.pop();}_b_stop = true;_wait_file_task_cv.notify_one();_worker_thread.join();
}void FileWorker::Run()
{while (!_b_stop){unique_lock<mutex> lock(_wait_file_task_mutex);_wait_file_task_cv.wait(lock, [this]() { return !_task_queue.empty() || _b_stop; });if (_b_stop)break;queue<shared_ptr<FileTask>> task_queue = move(_task_queue);lock.unlock();while (!task_queue.empty()){shared_ptr<FileTask> task = task_queue.front();task_queue.pop();ExecuteTask(task);}}
}void operator<<(shared_ptr<FileWorker> worker, shared_ptr<FileTask> task)
{{unique_lock<mutex> lock(worker->_wait_file_task_mutex);worker->_task_queue.push(task);}worker->_wait_file_task_cv.notify_one();
}void FileWorker::ExecuteTask(shared_ptr<FileTask> task)
{// 响应Json::Value response;response["seq"] = task->_seq;response["name"] = task->_file_name;response["transSize"] = task->_trans_size;response["totalSize"] = task->_total_size;ConnectionRAII RAII([&response, task](){string response_str = response.toStyledString();task->_session->Send(response_str, MSG_IDS::MSG_ID_UPLOAD_FILE_RESPONSE);});// 解码string decode = base64_decode(task->_file_data);string filePath = get<string>(ServerStatic::ParseConfig("FileOutPath", "Path")) + "/" + task->_file_name;int last_seq = task->_last_seq;// 写入文件ofstream ofs;if (task->_seq == 1){ofs.open(filePath, ios::binary | ios::trunc);}else{ofs.open(filePath, ios::binary | ios::app);}if (!ofs.is_open()){response["code"] = ErrorCodes::FILE_OPEN_ERROR;return;}ofs.write(decode.c_str(), decode.size());if (!ofs){response["code"] = ErrorCodes::FILE_WRITE_ERROR;return;}ofs.close();response["code"] = ErrorCodes::SUCCESS;cout << "FileWorker::ExecuteTask: " << filePath << " write curr_seq : " << task->_seq << "	success!!!" << endl;
}

该多线程是基于多个客户端的基础上来实现的,为每一个会话单独的分配线程去负责对应的网络传输,而不是我理解的多个线程去处理某一个文件传输任务,此外处理Tcp请求信息的是一个专属线程,专门处理文件传输的又是一个线程,在添加Tcp消息和任务时都是添加到对应的线程类所在的队列中

针对不同的会话ID来创建对应的hash_value,但是还是有很大概率会分配到同一个处理TCP消息的线程中;

如下

针对不同的文件名创建对应的hash_value,来分配对应的线程处理文件传输

同样的也会有这种问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/918442.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/918442.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

centos系统sglang单节点本地部署大模型

前置工作 本地部署大模型的基本概念和前置工作-CSDN博客 模型部署 这里通过docker容器进行部署。我这里是h20*8,部署deepseek-v3-0324,这个配置和模型都比较大,大家根据自己的硬件对应调整 步骤一 我们要通过sglang部署模型,先拉取sglang的docker镜像,这里下载失败的…

【dij算法/最短路/分层图】P4568 [JLOI2011] 飞行路线

题目描述 Alice 和 Bob 现在要乘飞机旅行&#xff0c;他们选择了一家相对便宜的航空公司。该航空公司一共在 nnn 个城市设有业务&#xff0c;设这些城市分别标记为 000 到 n−1n-1n−1&#xff0c;一共有 mmm 种航线&#xff0c;每种航线连接两个城市&#xff0c;并且航线有一定…

告别传统,CVPR三论文用GNN动态图重塑视觉AI

本文选自gongzhonghao【图灵学术SCI论文辅导】关注我们&#xff0c;掌握更多顶会顶刊发文资讯今天&#xff0c;为大家推荐一个极具前沿价值与实用潜力的研究方向&#xff1a;图神经网络&#xff08;GNN&#xff09;。作为深度学习领域的新兴力量&#xff0c;图神经网络在近年顶…

HTTP/HTTPS代理,支持RSA和SM2算法

在日常工作和学习中&#xff0c;我们经常遇到HTTP和HTTPS的相关问题&#xff0c;要解决这些问题&#xff0c;有时就需要搭建各种实验环境&#xff0c;重现业务场景&#xff0c;比如&#xff1a; 将HTTP转为HTTPS。本地只能发送HTTP请求&#xff0c;但是远程服务器却只能接收HT…

如何提高AI写作论文的查重率?推荐七个AI写作论文工具

随着AI技术在学术领域的广泛应用&#xff0c;越来越多的学生和研究人员开始使用AI写作工具来提高写作效率&#xff0c;帮助完成毕业论文、科研论文等。然而&#xff0c;AI生成的内容是否会提高论文的查重率&#xff1f;是否能有效避免重复和提高通过率&#xff1f;这些问题成为…

跨平台、低延迟、可嵌入:实时音视频技术在 AI 控制系统中的进化之路

引言&#xff1a;面向未来的实时音视频基座 在万物互联与智能化加速落地的时代&#xff0c;实时音视频技术早已不再只是社交娱乐的附属功能&#xff0c;而是智慧城市、应急指挥、远程操控、工业智造、教育培训、安防监控等系统的“神经中枢”。一条高性能、可控、低延迟的视频…

Spring WebFlux开发指导

Spring WebFlux是一个响应式的web服务器端应用开发框架&#xff0c;响应式是指&#xff0c;当前端组件的状态发生变化&#xff0c;则生成事件通知&#xff0c;根据需求可异步或者同步地向服务器端接口发送请求&#xff0c;当服务器端网络IO组件的状态发生变化&#xff0c;则生成…

09-docker镜像手动制作

文章目录一.手动制作单服务的nginx镜像1.启动一个基础容器&#xff0c;此处我使用的是centos7镜像。2.修改容器中的软件源3.安装nginx服务并启动nginx服务4.修复nginx的首页文件5.退出容器6.将退出的容器提交为镜像7.测试镜像的可用性二.手动制作多服务的nginx sshd镜像1.启用…

Android.mk教程

语法 Android.mk 的必备三行 LOCAL_PATH : $(call my-dir) # Android.mk的目录&#xff0c;call调用函数include $(CLEAR_VARS) # 除了LOCAL_PATH清除所有LOCAL_XXXinclude $(BUILD_SHARED_LIBRARY) # BUILD_XXX, 指定构建类型 # BUILD_SHARED_LIBRARY → .so动态库 # BUILD…

稠密检索:基于神经嵌入的高效语义搜索范式

本文由「大千AI助手」原创发布&#xff0c;专注用真话讲AI&#xff0c;回归技术本质。拒绝神话或妖魔化。搜索「大千AI助手」关注我&#xff0c;一起撕掉过度包装&#xff0c;学习真实的AI技术&#xff01; 1. 背景与定义 稠密检索&#xff08;Dense Retrieval&#xff09;是一…

AI日报0807 | GPT-5或今晚1点来袭:四大版本全曝光

关注&#xff1a;未来世界2099每日分享&#xff1a;全球最新AI资讯【应用商业技术其他】服务&#xff1a;【学习Q】【资源Q】【学习资料】【行业报告】&#xff08;无限免费下载&#xff09;应用 1、讯飞星火代码画布震撼上线&#xff1a;动嘴就能开发&#xff0c;工作效率翻倍…

认识爬虫 —— 正则表达式提取

本质是对字符串的处理&#xff0c;正则表达式描述的是一种字符串匹配的模式。简而言之&#xff0c;用具备一定特征意义的表达式对字符串进行检查&#xff0c;将符合条件的子字符串提取出来。导入模块import re一、单字符匹配match(表达式&#xff0c;匹配对象)&#xff1a;匹配…

单链表专题---暴力算法美学(1)(有视频演示)

1.1 移除链表元素 题目要求&#xff1a;给你一个链表的头节点head 和一个整数val,请你删除链表中所有满足Node.val val 的节点&#xff0c;并返回新的头节点。 思路一&#xff1a;遍历链表&#xff0c;遇到val就删除&#xff0c;pcur指向val的下一个节点&#xff0c;最后只剩…

机器学习-决策树(DecisionTree)

0 回归决策树展示 import pandas as pd import numpy as np from sklearn.tree import DecisionTreeRegressor from sklearn.metrics import root_mean_squared_error, r2_score from sklearn.model_selection import GridSearchCV,KFold from sklearn.model_selection import…

【Java Web】JDBC 连接 MySQL 实现数据库 CRUD(增删改查)详解

在 Java Web 开发中&#xff0c;与数据库交互是不可避免的&#xff0c;而 JDBC&#xff08;Java Database Connectivity&#xff09; 是 Java 官方提供的标准数据库连接接口&#xff0c;几乎所有 Java 项目中都用过它。 本文通过一个完整示例&#xff0c;带你从零实现 增&#…

HTTP 请求返回状态码和具体含义?200、400、403、404、502、503、504等

HTTP 状态码是服务器对客户端请求的响应状态标识&#xff0c;分为五大类&#xff08;以第一位数字区分&#xff09;&#xff0c;常用状态码如下&#xff1a; 1. 信息类&#xff08;1xx&#xff09;&#xff1a;请求已接收&#xff0c;继续处理 100 Continue&#xff1a;服务器已…

13-netty基础-手写rpc-消费方生成代理-05

netty系列文章&#xff1a; 01-netty基础-socket02-netty基础-java四种IO模型03-netty基础-多路复用select、poll、epoll04-netty基础-Reactor三种模型05-netty基础-ByteBuf数据结构06-netty基础-编码解码07-netty基础-自定义编解码器08-netty基础-自定义序列化和反序列化09-n…

ThreadLocal有哪些内存泄露问题,如何避免?

每个Thread都有一个ThreadLocal.ThreadLocalMap的map&#xff0c;该map的key为ThreadLocal实例&#xff0c;它为一个弱引 用&#xff0c;我们知道弱引用有利于GC回收。当ThreadLocal的key null时&#xff0c;GC就会回收这部分空间&#xff0c;但是value却不一 定能够被回收&am…

从0到1学LangChain之Agent代理:解锁大模型应用新姿势

从0到1学LangChain之Agent代理&#xff1a;解锁大模型应用新姿势 本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型开发 学习视频/籽料/面试题 都在这>>Github<< 什么是 LangChain Agent 代理 如果把大模型比作一个超级大脑&#xff0c;那么…

Spring Boot 2.6.0+ 循环依赖问题及解决方案

Spring Boot 2.6.0 循环依赖问题及解决方案 目录 背景解决方案 1. 配置文件开启循环依赖&#xff08;侵入性最低&#xff0c;临时方案&#xff09;2. Lazy 延迟注入&#xff08;侵入性低&#xff0c;推荐优先尝试&#xff09;3. 手动从容器获取&#xff08;ApplicationContex…