AWS strands agents 当智能体作为独立服务/容器部署时,它们无法共享进程内状态

当智能体作为独立服务/容器部署时,它们无法共享进程内状态。 以下是针对分布式部署中动态内存库的生产就绪解决方案:

1. 基于外部存储的内存库

基于 DynamoDB 的共享内存

import boto3
from strands import Agent, tool
from typing import Dict, Any
import jsonclass DynamoDBMemoryBank:def __init__(self, table_name: str = "agent-shared-memory"):self.dynamodb = boto3.resource('dynamodb')self.table = self.dynamodb.Table(table_name)def store(self, memory_key: str, data: Dict[str, Any], agent_id: str):"""在共享内存中存储数据"""self.table.put_item(Item={'memory_key': memory_key,'data': json.dumps(data),'agent_id': agent_id,'timestamp': int(time.time()),'ttl': int(time.time()) + 86400  # 24 小时 TTL})def retrieve(self, memory_key: str) -> Dict[str, Any]:"""从共享内存检索数据"""try:response = self.table.get_item(Key={'memory_key': memory_key})if 'Item' in response:return json.loads(response['Item']['data'])except Exception as e:print(f"检索内存错误: {e}")return {}def list_keys(self, prefix: str = "") -> list:"""列出所有内存键,可选前缀过滤"""response = self.table.scan()keys = [item['memory_key'] for item in response['Items']]if prefix:keys = [k for k in keys if k.startswith(prefix)]return keys# 创建共享内存工具
memory_bank = DynamoDBMemoryBank()@tool
def store_shared_data(key: str, data: str, description: str = "") -> str:"""在所有智能体可访问的共享内存库中存储数据"""memory_bank.store(key, {'content': data,'description': description}, agent_id=os.getenv('AGENT_ID', 'unknown'))return f"存储数据,键为: {key}"@tool
def retrieve_shared_data(key: str) -> str:"""从共享内存库检索数据"""data = memory_bank.retrieve(key)if data:return f"{data.get('description', '')}: {data.get('content', '')}"return f"未找到键对应的数据: {key}"@tool
def list_memory_keys(prefix: str = "") -> str:"""列出可用的内存键"""keys = memory_bank.list_keys(prefix)return f"可用键: {', '.join(keys)}"

基于 Redis 的高性能内存

import redis
import json
from strands import Agent, toolclass RedisMemoryBank:def __init__(self, host: str = "redis-cluster.company.com", port: int = 6379):self.redis_client = redis.Redis(host=host, port=port, decode_responses=True,health_check_interval=30)def store(self, key: str, data: dict, ttl: int = 3600):"""存储数据并设置 TTL"""self.redis_client.setex(key, ttl, json.dumps(data))def retrieve(self, key: str) -> dict:"""检索数据"""data = self.redis_client.get(key)return json.loads(data) if data else {}def publish_event(self, channel: str, message: dict):"""向其他智能体发布事件"""self.redis_client.publish(channel, json.dumps(message))def subscribe_to_events(self, channels: list):"""订阅来自其他智能体的事件"""pubsub = self.redis_client.pubsub()pubsub.subscribe(channels)return pubsub# 基于 Redis 的内存工具
redis_memory = RedisMemoryBank()@tool
def store_fast_memory(key: str, data: str, ttl_minutes: int = 60) -> str:"""在高速 Redis 内存库中存储数据"""redis_memory.store(key, {'content': data}, ttl_minutes * 60)return f"存储在快速内存中: {key}"@tool
def get_fast_memory(key: str) -> str:"""从高速内存检索"""data = redis_memory.retrieve(key)return data.get('content', '未找到')

2. 基于 S3 的已部署智能体会话管理

python

from strands import Agent
from strands.session.s3_session_manager import S3SessionManager# 每个已部署的智能体使用 S3 进行持久化会话
class DeployedAgent:def __init__(self, agent_id: str, user_id: str = None):self.agent_id = agent_id# 创建带有 S3 后端的会话管理器session_id = f"{agent_id}_{user_id}" if user_id else agent_idself.session_manager = S3SessionManager(session_id=session_id,bucket=os.getenv('AGENT_MEMORY_BUCKET', 'agent-memory-bucket'),prefix=f"agents/{agent_id}/sessions")# 创建具有持久化会话的智能体self.agent = Agent(name=f"已部署智能体 {agent_id}",session_manager=self.session_manager,tools=[store_shared_data, retrieve_shared_data, *other_tools])async def process_request(self, message: str, context: dict = None):"""使用持久化内存处理请求"""if context:# 在共享内存中存储上下文供其他智能体使用await store_shared_data(f"context_{self.agent_id}_{int(time.time())}", json.dumps(context))return await self.agent.run(message)# 部署配置
deployed_orchestrator = DeployedAgent("orchestrator", "global")
deployed_specialist = DeployedAgent("data_specialist", "global")

3. 带有共享内存的容器部署

带有共享服务的 Docker Compose

# docker-compose.yml
version: '3.8'
services:# 共享内存服务redis:image: redis:7-alpineports:- "6379:6379"volumes:- redis_data:/datadynamodb-local:image: amazon/dynamodb-localports:- "8000:8000"command: ["-jar", "DynamoDBLocal.jar", "-sharedDb", "-inMemory"]# 智能体服务orchestrator:build: ./orchestratorenvironment:- AGENT_ID=orchestrator- REDIS_HOST=redis- DYNAMODB_ENDPOINT=http://dynamodb-local:8000- AGENT_MEMORY_BUCKET=shared-agent-memoryports:- "9000:9000"depends_on:- redis- dynamodb-localdata-specialist:build: ./data-specialistenvironment:- AGENT_ID=data_specialist- REDIS_HOST=redis- DYNAMODB_ENDPOINT=http://dynamodb-local:8000- AGENT_MEMORY_BUCKET=shared-agent-memoryports:- "9001:9001"depends_on:- redis- dynamodb-localresearch-agent:build: ./research-agentenvironment:- AGENT_ID=research_agent- REDIS_HOST=redis- DYNAMODB_ENDPOINT=http://dynamodb-local:8000- AGENT_MEMORY_BUCKET=shared-agent-memoryports:- "9002:9002"depends_on:- redis- dynamodb-localvolumes:redis_data:

4. 带有共享内存的 AWS ECS/Fargate 部署

# agent_service.py - 所有已部署智能体的基础服务
import os
import asyncio
from strands import Agent
from strands.multiagent.a2a import A2AServerclass ProductionAgentService:def __init__(self):self.agent_id = os.getenv('AGENT_ID')self.memory_bank = DynamoDBMemoryBank()self.redis_memory = RedisMemoryBank(host=os.getenv('REDIS_HOST', 'redis-cluster.company.com'))# 创建带有共享内存工具的智能体self.agent = Agent(name=f"生产环境智能体 {self.agent_id}",session_manager=S3SessionManager(session_id=f"prod_{self.agent_id}",bucket=os.getenv('AGENT_MEMORY_BUCKET'),prefix=f"production/{self.agent_id}"),tools=[store_shared_data,retrieve_shared_data,store_fast_memory,get_fast_memory,*self.get_specialized_tools()])def get_specialized_tools(self):"""在子类中重写以提供专用工具"""return []async def start_service(self):"""启动 A2A 服务"""port = int(os.getenv('PORT', 9000))server = A2AServer(agent=self.agent, port=port)# 启动后台任务asyncio.create_task(self.memory_sync_task())# 启动 A2A 服务器server.serve()async def memory_sync_task(self):"""同步内存和处理事件的后台任务"""pubsub = self.redis_memory.subscribe_to_events(['agent_events'])while True:try:message = pubsub.get_message(timeout=1.0)if message and message['type'] == 'message':await self.handle_agent_event(json.loads(message['data']))except Exception as e:print(f"内存同步错误: {e}")await asyncio.sleep(1)async def handle_agent_event(self, event: dict):"""处理来自其他智能体的事件"""if event.get('type') == 'memory_update':# 刷新本地缓存或采取行动print(f"内存被 {event.get('agent_id')} 更新: {event.get('key')}")# 专用智能体实现
class OrchestratorService(ProductionAgentService):def get_specialized_tools(self):return [orchestration_tools]class DataSpecialistService(ProductionAgentService):def get_specialized_tools(self):return [data_analysis_tools]# 主入口点
if __name__ == "__main__":agent_type = os.getenv('AGENT_TYPE', 'orchestrator')if agent_type == 'orchestrator':service = OrchestratorService()elif agent_type == 'data_specialist':service = DataSpecialistService()else:service = ProductionAgentService()asyncio.run(service.start_service())

5. 事件驱动的内存更新

# 事件驱动的内存同步
class EventDrivenMemoryBank:def __init__(self):self.dynamodb_memory = DynamoDBMemoryBank()self.redis_memory = RedisMemoryBank()self.sns = boto3.client('sns')self.topic_arn = os.getenv('AGENT_EVENTS_TOPIC_ARN')async def store_with_notification(self, key: str, data: dict, agent_id: str):"""存储数据并通知其他智能体"""# 在持久化存储中存储self.dynamodb_memory.store(key, data, agent_id)# 在快速缓存中存储self.redis_memory.store(key, data, ttl=3600)# 通知其他智能体event = {'type': 'memory_update','key': key,'agent_id': agent_id,'timestamp': time.time()}# 发布到 SNS 进行跨区域/服务通知await self.sns.publish(TopicArn=self.topic_arn,Message=json.dumps(event))# 发布到 Redis 进行实时更新self.redis_memory.publish_event('agent_events', event)@tool
def store_with_broadcast(key: str, data: str, description: str = "") -> str:"""存储数据并广播给所有智能体"""event_memory = EventDrivenMemoryBank()asyncio.create_task(event_memory.store_with_notification(key, {'content': data, 'description': description},os.getenv('AGENT_ID')))return f"已存储并广播: {key}"

此架构的主要优势

  • 可扩展性:智能体可以独立部署并根据需求扩展

  • 持久性:内存可以在容器重启和部署后保留

  • 实时同步:Redis 提供快速内存访问和事件通知

  • 耐用性:DynamoDB/S3 提供持久化存储

  • 事件驱动:智能体可以实时响应内存变化

  • 多区域:可以在 AWS 区域和可用区之间工作

生产部署模式

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   协调者        │    │  数据专家       │    │  研究智能体     │
│   (ECS 任务)    │    │  (ECS 任务)     │    │  (ECS 任务)     │
└─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘│                      │                      │└──────────────────────┼──────────────────────┘│┌─────────────┴─────────────┐│     共享内存层            ││                           ││  ┌─────────┐ ┌─────────┐  ││  │ Redis   │ │DynamoDB │  ││  │(快速)   │ │(持久)   │  ││  └─────────┘ └─────────┘  ││                           ││  ┌─────────┐ ┌─────────┐  ││  │   S3    │ │   SNS   │  ││  │(会话)   │ │(事件)   │  ││  └─────────┘ └─────────┘  │└───────────────────────────┘

这种架构确保您的已部署智能体可以共享动态内存库,同时保持分布式、可扩展部署的优势。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/96404.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/96404.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第五节 JavaScript——引用类型、DOM/BOM 与异步编程

JavaScript 的第五节课通常会深入探讨 ​​引用类型、DOM 操作、BOM 操作、事件处理以及异步编程​​ 等核心概念。这些知识能让你创建动态交互丰富的网页。下面我将详细讲解这些内容并提供示例。 🚀 JavaScript 第五节:引用类型、DOM/BOM 与异步编程 ⚡ 一、引用类型 引…

使用Pycharm进行远程ssh(以Featurize为例)

使用Pycharm进行远程ssh(以Featurize为例)文章目录介绍应用背景远程连接Python连接Jupyter介绍应用背景 在使用Pycharm 专业版的时候进行远程ssh连接服务器(Featurize)的Python解释器和Jupyter 远程连接Python 打开Pycharm点击…

深入研究:ClickHouse中arrayExists与hasAny在ORDER BY场景下的性能差异

最近公司大数据情况下ClickHouse查询性能极差,后来发现在大数据量ORDER BY场景下,arrayExists(x -> x in ...)比hasAny性能快10倍!!!! 一、问题重述与研究背景 在大数据量 ORDER BY场景下,…

Spring AI (二)结合Mysql做聊天信息存储

上文讲了&#xff0c;用Spring ai做简单的聊天功能&#xff0c;没看过的可以查看下 Spring AI结合豆包模型 这里简单结合下Jdbc做下聊天记录的存储和查询&#xff0c;让对话变的更智能。 首先是Pom的支持 <dependency><groupId>org.springframework.ai</grou…

【docker】data-root 数据迁移(防止无法加载镜像和容器问题)

操作系统&#xff1a;ubuntu 24.04 docker版本&#xff1a;docker-ce 28.1.1 目标&#xff1a;将/var/lib/docker 的数据迁移到/data/docker停止docker sudo systemctl stop docker.socket sudo systemctl stop docker这个步骤一定要做&#xff0c;否则容易导致数据不一致。 rs…

二、网页的“化妆师”:从零学习 CSS

一、CSS 是什么 1.1 CSS 的定义 CSS&#xff08;Cascading Style Sheets&#xff0c;层叠样式表&#xff09; 是一种用来给 HTML 页面 添加样式的语言。 简单来说&#xff1a; HTML 负责结构 —— 决定网页上有什么内容。 CSS 负责样式 —— 决定这些内容“长什么样”。 如果…

传统项目管理与敏捷的核心差异

在项目管理领域&#xff0c;传统方法与敏捷方法代表了两种不同的管理思维与实践路径。传统项目管理强调计划性、规范性和阶段性推进&#xff0c;而敏捷则注重灵活性、快速迭代和价值交付。 正如彼得德鲁克所说&#xff1a;“没有完美的计划&#xff0c;只有不断调整的行动。”理…

axios+ts封装

http.ts import axios from axios import type { AxiosInstance, AxiosRequestConfig, AxiosResponse } from axios import qs from qs/*** 扩展AxiosRequestConfig&#xff0c;增加一些自定义的属性* isAuth: 自定义的参数中&#xff0c;用来判断是否携带token 因为AxiosReq…

2026新选题:基于K-Means实现学生求职意向聚类推荐职位

作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验&#xff0c;被多个学校常年聘为校外企业导师&#xff0c;指导学生毕业设计并参与学生毕业答辩指导&#xff0c;…

SpringCloud gateway配置predicates的匹配规则

需求 通过gateway的route规则&#xff0c;实现分组流量配置 资源 一个nacos&#xff0c;一个gateway &#xff0c;一个服务app&#xff08;部署双实例group-1&#xff0c;group-2&#xff09;&#xff0c;实现特定条件下往分组一和分组二流量切换。 方案 1 配置文件 nacos…

android14 硬键盘ESC改BACK按键返回无效问题

在之前的android版本中修改外接键盘ESC为BACK按键做返回键使用&#xff0c;直接修改如下代码即可&#xff1a;--- a/frameworks/base/data/keyboards/Generic.kcmb/frameworks/base/data/keyboards/Generic.kcm-499,7 499,7 key PLUS {### Non-printing keys ###key ESCAPE { …

【开题答辩全过程】以 asp高校外卖订单系统的设计与实现为例,包含答辩的问题和答案

个人简介一名14年经验的资深毕设内行人&#xff0c;语言擅长Java、php、微信小程序、Python、Golang、安卓Android等开发项目包括大数据、深度学习、网站、小程序、安卓、算法。平常会做一些项目定制化开发、代码讲解、答辩教学、文档编写、也懂一些降重方面的技巧。感谢大家的…

UVa1063/LA3807 The Rotation Game

UVa1063/LA3807 The Rotation Game题目链接题意输入格式输出格式分析AC 代码IDA*分3次BFS题目链接 本题是2004年icpc亚洲区域赛上海赛区的H题 题意 如下图所示形状的棋盘上分别有8个1、2、3&#xff0c;要往A&#xff5e;H方向旋转棋盘&#xff0c;使中间8个方格数字相同。图&…

用pywin32连接autocad 写一个利用遗传算法从选择的闭合图形内进行最优利用率的排版 ai草稿

好的&#xff0c;我们来深入细说遗传算法&#xff08;Genetic Algorithm, GA&#xff09;在钣金自动排版中的应用。遗传算法 (GA) 在钣金排版中的详细解析遗传算法是一种受达尔文生物进化论启发的元启发式优化算法。它不追求一次性找到数学上的绝对最优解&#xff0c;而是通过模…

Go语言io.Copy深度解析:高效数据复制的终极指南

在日常开发中&#xff0c;我们经常需要在不同的数据源之间复制数据。无论是文件操作、网络传输还是进程通信&#xff0c;数据复制都是不可或缺的基础操作。Go语言的标准库提供了一个强大而高效的工具来简化这一过程&#xff1a;io.Copy。 什么是io.Copy&#xff1f; io.Copy是G…

【Vue3】07-利用setup编写vue(2)-setup的语法糖

其它篇章&#xff1a; 1.【Vue3】01-创建Vue3工程 2.【Vue3】02-Vue3工程目录分析 3.【Vue3】03-编写app组件——src 4.【Vue3】04-编写vue实现一个简单效果 5.【Vue3】05-Options API和Composition API的区别 6.【Vue3】06-利用setup编写vue&#xff08;1&#xff09; 7.【Vue…

Firefox自定义备忘

1.设置firefox右键点击标签直接关闭&#xff0c;由于目前没有插件能实现这个功能&#xff0c;只能手动设置了&#xff08;目前已知支持142和之前的版本&#xff09; firefox117右键关闭macWin 117版本应该可以了&#xff0c;大家可试下&#xff0c;配置方法参考之前的帖子&…

跨屏互联KuapingCMS建站系统发布更新 增加数据看板

跨屏互联KuapingCMS建站系统发布更新&#xff0c;增加了文章统计、产品统计、软文统计、流量统计、pv统计、ip统计、os访问者设备统计等等&#xff0c;整个体验会更好&#xff0c;数据显示更加直观&#xff0c;可以清晰看到最近的网站数据&#xff0c;特别是对于老板&#xff0…

WebSocket连接状态监控与自动重连实现

WebSocket连接状态监控与自动重连实现 下面我将实现一个具有连接状态监控和自动重连功能的WebSocket聊天室界面。 设计思路 创建直观的连接状态指示器实现自动重连机制&#xff0c;包括&#xff1a; 指数退避策略&#xff08;重连间隔逐渐增加&#xff09;最大重连次数限制手动…

【Vue2手录05】响应式原理与双向绑定 v-model

一、Vue2响应式原理&#xff08;底层基础&#xff09; Vue2的“响应式”核心是数据变化自动触发视图更新&#xff0c;其实现依赖Object.defineProperty API&#xff0c;但受JavaScript语言机制限制&#xff0c;存在“数组/对象修改盲区”&#xff0c;这是理解后续内容的关键。 …