注:以下内容为与 GPT-4O 共同创作完成
以共享内存的方式实现多程序之间的数据通信,尤其适合在一台机器上的多程序之间进行高频数据交换。
以下示例展示了 sender.py 向 receiver.py 发送数据并接收经 receiver.py 处理后的数据,以及如何通过事件驱动机制实现 receiver.py 等待 sender.py 的通知,避免了轮询共享内存,降低 CPU 使用率。
manager_server.py
import multiprocessing
multiprocessing.set_start_method("spawn", force=True)from multiprocessing.managers import BaseManager
from multiprocessing import Event# 定义共享事件对象
event_send = Event()
event_receive = Event()# 定义普通函数来返回事件对象
def get_event_send():return event_senddef get_event_receive():return event_receiveclass SharedManager(BaseManager):passdef main():# 注册共享事件对象SharedManager.register("get_event_send", callable=get_event_send)SharedManager.register("get_event_receive", callable=get_event_receive)# 启动 Manager 服务器manager = SharedManager(address=("localhost", 50000), authkey=b"password")print("Manager server is running...")manager.start()try:input("Press Enter to stop the server...\n")finally:manager.shutdown()if __name__ == "__main__":main()
sender.py
import time
import numpy as np
from multiprocessing import shared_memory
from multiprocessing.managers import BaseManagerclass SharedManager(BaseManager):passdef main():# 注册共享事件对象SharedManager.register("get_event_send")SharedManager.register("get_event_receive")# 连接到 Manager 服务器manager = SharedManager(address=("localhost", 50000), authkey=b"password")manager.connect()# 获取共享事件对象event_send = manager.get_event_send()event_receive = manager.get_event_receive()# 创建共享内存shm_send = shared_memory.SharedMemory(name="shared_data_send", create=True, size=1024)shm_receive = shared_memory.SharedMemory(name="shared_data_receive", create=True, size=1024)# 创建 NumPy 数组,绑定到共享内存data_send = np.ndarray((256,), dtype=np.float32, buffer=shm_send.buf)data_receive = np.ndarray((256,), dtype=np.float32, buffer=shm_receive.buf)try:while True:# 向共享内存写入数据data_send[:] = np.random.rand(256) # 模拟发送的数据print("sender.py: Sent data to shared memory")# 通知 receiver.py 数据已写入event_send.set()# 等待 receiver.py 处理完成event_receive.wait()event_receive.clear() # 重置事件# 读取处理后的数据print("sender.py: Received processed data:", data_receive[:5]) # 打印前 5 个数据# 等待 0.1 秒(10Hz)time.sleep(5)except KeyboardInterrupt:print("sender.py: Exiting...")finally:# 释放共享内存shm_send.close()shm_send.unlink()shm_receive.close()shm_receive.unlink()if __name__ == "__main__":main()
receiver.py
import time
import numpy as np
from multiprocessing import shared_memory
from multiprocessing.managers import BaseManagerclass SharedManager(BaseManager):passdef main():# 注册共享事件对象SharedManager.register("get_event_send")SharedManager.register("get_event_receive")# 连接到 Manager 服务器manager = SharedManager(address=("localhost", 50000), authkey=b"password")manager.connect()# 获取共享事件对象event_send = manager.get_event_send()event_receive = manager.get_event_receive()# 连接到共享内存shm_send = shared_memory.SharedMemory(name="shared_data_send")shm_receive = shared_memory.SharedMemory(name="shared_data_receive")# 创建 NumPy 数组,绑定到共享内存data_send = np.ndarray((256,), dtype=np.float32, buffer=shm_send.buf)data_receive = np.ndarray((256,), dtype=np.float32, buffer=shm_receive.buf)try:while True:# 等待 sender.py 的数据写入通知event_send.wait()event_send.clear() # 重置事件# 从共享内存读取数据received_data = data_send.copy() # 复制数据以避免直接操作共享内存print("receiver.py: Received data:", received_data[:5]) # 打印前 5 个数据# 模拟处理数据并写回共享内存data_receive[:] = received_data * 2print("receiver.py: Processed data written back to shared memory")# 通知 sender.py 数据已处理完成event_receive.set()except KeyboardInterrupt:print("receiver.py: Exiting...")finally:# 释放共享内存shm_send.close()shm_receive.close()if __name__ == "__main__":main()
以上3个脚本的运行步骤为:
1. 启动 manager_server.py,保持运行状态
2. 运行 sender.py
3. 运行 receiver.py