1.服务端
"""
@File: rabbitmq_server.py
@Date: 2025/6/26 10:42
@Author: xxx
@Description:
1. RabbitMQ服务端,支持多节点命令执行
2. 作为被控节点运行,可接收定向命令并返回结果
""" import ssl
import pika
import time
import json
import socket
import logging
import subprocess
import configparser
logger = logging. getLogger( )
logger. setLevel( logging. DEBUG)
file_handler = logging. FileHandler( 'rabbitmq_server.log' , encoding= 'utf-8' )
file_handler. setLevel( logging. DEBUG)
file_handler. setFormatter( logging. Formatter( '%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s' , datefmt= "%Y-%m-%d %H:%M:%S" ) )
logger. addHandler( file_handler)
console_handler = logging. StreamHandler( )
console_handler. setLevel( logging. INFO)
console_handler. setFormatter( logging. Formatter( '%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s' , datefmt= "%Y-%m-%d %H:%M:%S" ) )
logger. addHandler( console_handler) RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf" class RabbitMQServer : """RabbitMQ RPC服务器类功能:接收并执行来自客户端的定向命令""" def __init__ ( self, node_name= None , mq_user= "rabbitmq" , mq_password= "rabbitmq@123" , mq_virtual_host= "/" , mq_host= None , mq_port= 5671 , mq_ca= "/opt/ssl/ca_certificate.pem" ) : """初始化RabbitMQ服务端:param node_name: 节点名称标识(唯一):param mq_user: RabbitMQ用户名:param mq_password: RabbitMQ密码:param mq_virtual_host: 虚拟主机:param mq_host: RabbitMQ服务器IP:param mq_port: RabbitMQ服务端口:param mq_ca: SSL证书路径""" self. NODE_NAME = node_name if node_name else socket. gethostname( ) self. RABBITMQ_USER = mq_userself. RABBITMQ_UNLOCK_CODE = mq_passwordself. RABBITMQ_VIRTUAL_HOST = mq_virtual_hostself. RABBITMQ_HOST = mq_host if mq_host else self. get_option( RABBITMQ_HOST_CONF, "global" , "rabbitmq_host" ) self. RABBITMQ_PORT = mq_portself. SSL_CA_PATH = mq_caself. _setup_connection( ) def get_option ( self, file_path, section, option) : """获取 file_path 配置项值,若配置文件没有,返回空字符串:param section: section字符串,例如:'global':param option: key值,例如:'manage_nodes':return: 字符串类型数据""" parser = configparser. ConfigParser( ) parser. read( file_path) if not parser. has_option( section, option) : return "" else : return parser. get( section, option) def _get_ssl_options ( self) : """配置SSL安全连接选项""" context = ssl. SSLContext( ssl. PROTOCOL_TLSv1_2) context. load_verify_locations( self. SSL_CA_PATH) return pika. SSLOptions( context, "localhost" ) def _setup_connection ( self) : """建立RabbitMQ连接并设置队列""" credentials = pika. PlainCredentials( self. RABBITMQ_USER, self. RABBITMQ_UNLOCK_CODE ) connection_params = pika. ConnectionParameters( host= self. RABBITMQ_HOST, port= self. RABBITMQ_PORT, virtual_host= self. RABBITMQ_VIRTUAL_HOST, credentials= credentials, ssl_options= self. _get_ssl_options( ) , heartbeat= 600 ) self. connection = pika. BlockingConnection( connection_params) self. channel = self. connection. channel( ) self. channel. queue_declare( queue= self. NODE_NAME, durable= True ) self. channel. basic_qos( prefetch_count= 1 ) self. channel. basic_consume( queue= self. NODE_NAME, on_message_callback= self. _execute_command, auto_ack= False ) def _execute_command ( self, ch, method, props, body) : """执行接收到的命令并返回结果""" try : message = json. loads( body. decode( 'utf-8' ) ) command = message. get( 'command' , '' ) target = message. get( 'target' , '' ) logger. info( f" [x] 收到( { target} )命令: { command} " ) if target != self. NODE_NAME: logger. warning( f" [x] 收到非本节点( { self. NODE_NAME} )命令,已忽略" ) ch. basic_ack( delivery_tag= method. delivery_tag) return logger. info( f" [*] 执行命令 【 { command} 】..." ) try : output = subprocess. check_output( command, shell= True , stderr= subprocess. STDOUT, timeout= 60 ) response = output. decode( 'utf-8' ) except subprocess. TimeoutExpired: response = "Error: Command timed out" except subprocess. CalledProcessError as e: response = f"Error: { e. output. decode( 'utf-8' ) } " except Exception as e: response = f"System Error: { str ( e) } " ch. basic_publish( exchange= '' , routing_key= props. reply_to, properties= pika. BasicProperties( correlation_id= props. correlation_id, delivery_mode= 2 ) , body= response. encode( 'utf-8' ) ) logger. info( f" [*] 命令执行完成" ) ch. basic_ack( delivery_tag= method. delivery_tag) except Exception as e: logger. exception( f" [x] 消息处理异常: { str ( e) } " ) ch. basic_nack( delivery_tag= method. delivery_tag) def start ( self, max_retries= 5 , retry_delay= 10 ) : """启动RabbitMQ服务并持续监听消息功能:管理服务生命周期,处理连接异常和重试逻辑:param max_retries: 最大重试次数,默认5次:param retry_delay: 重试间隔时间(秒),默认10秒:return:""" retry_count = 0 while True : try : logger. info( f" [*] { self. NODE_NAME} 节点服务启动 (尝试 { retry_count + 1 } / { max_retries} )" ) logger. info( f" [*] 等待队列 { self. NODE_NAME} 中的请求..." ) if not hasattr ( self, 'connection' ) or self. connection. is_closed: self. _setup_connection( ) self. channel. start_consuming( ) except pika. exceptions. AMQPConnectionError as e: retry_count += 1 logger. exception( f"连接失败: { str ( e) } " ) if retry_count >= max_retries: logger. error( " [x] 达到最大重试次数,终止服务" ) self. close( ) break logger. warning( f" [*] { retry_delay} 秒后尝试重新连接..." ) time. sleep( retry_delay) except KeyboardInterrupt: logger. error( "\n [x] 接收到终止信号" ) self. close( ) logger. error( " [x] 服务已停止" ) break except Exception as e: logger. exception( f"服务异常: { str ( e) } " ) time. sleep( retry_delay) def close ( self) : """安全关闭RabbitMQ连接功能:清理资源,确保连接被正确关闭:return:""" if hasattr ( self, 'connection' ) and not self. connection. is_closed: self. connection. close( ) logger. info( " [x] 连接已安全关闭" ) if __name__ == '__main__' : server = RabbitMQServer( ) try : server. start( ) except KeyboardInterrupt: logger. error( "\n [x] 接收到终止信号" ) server. close( ) logger. error( " [x] 服务已停止" )
2.客户端
"""
@File: rabbitmq_client.py
@Date: 2025/6/26 10:43
@Author: xxx
@Description:
1. RabbitMQ客户端类,支持向指定节点发送SSH命令
2. 作为控制端运行,可定向发送命令并接收执行结果
""" import ssl
import pika
import time
import uuid
import json
import socket
import logging
import configparser
logger = logging. getLogger( )
logger. setLevel( logging. DEBUG)
file_handler = logging. FileHandler( 'rabbitmq_client.log' , encoding= 'utf-8' )
file_handler. setLevel( logging. DEBUG)
file_handler. setFormatter( logging. Formatter( '%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s' , datefmt= "%Y-%m-%d %H:%M:%S" ) )
logger. addHandler( file_handler)
console_handler = logging. StreamHandler( )
console_handler. setLevel( logging. INFO)
console_handler. setFormatter( logging. Formatter( '%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s' , datefmt= "%Y-%m-%d %H:%M:%S" ) )
logger. addHandler( console_handler) RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf" class RabbitMQClient : """RabbitMQ RPC客户端类功能:向指定节点发送命令并获取执行结果""" def __init__ ( self, mq_user= "rabbitmq" , mq_password= "rabbitmq@123" , mq_virtual_host= "/" , mq_host= None , mq_port= 5671 , mq_ca= "/opt/ssl/ca_certificate.pem" ) : """初始化RabbitMQ客户端:param mq_user: RabbitMQ用户名:param mq_password: RabbitMQ密码:param mq_virtual_host: 虚拟主机:param mq_host: RabbitMQ服务器IP:param mq_port: RabbitMQ服务端口:param mq_ca: SSL证书路径""" self. RABBITMQ_USER = mq_userself. RABBITMQ_UNLOCK_CODE = mq_passwordself. RABBITMQ_VIRTUAL_HOST = mq_virtual_hostself. RABBITMQ_HOST = mq_host if mq_host else self. get_option( RABBITMQ_HOST_CONF, "global" , "rabbitmq_host" ) self. RABBITMQ_PORT = mq_portself. SSL_CA_PATH = mq_caself. response = None self. corr_id = None logger. info( " [x] 正在建立连接 ..." ) self. _connect( ) logger. info( " [x] 连接建立成功" ) def get_option ( self, file_path, section, option) : """获取 file_path 配置项值,若配置文件没有,返回空字符串:param section: section字符串,例如:'global':param option: key值,例如:'manage_nodes':return: 字符串类型数据""" parser = configparser. ConfigParser( ) parser. read( file_path) if not parser. has_option( section, option) : return "" else : return parser. get( section, option) def _connect ( self) : """建立RabbitMQ连接并初始化回调队列功能:配置安全连接参数、创建通信信道、设置消息回调处理:return:""" ssl_context = ssl. SSLContext( ssl. PROTOCOL_TLSv1_2) ssl_context. load_verify_locations( self. SSL_CA_PATH) ssl_options = pika. SSLOptions( ssl_context, "localhost" ) credentials = pika. PlainCredentials( self. RABBITMQ_USER, self. RABBITMQ_UNLOCK_CODE ) connection_params = pika. ConnectionParameters( host= self. RABBITMQ_HOST, port= self. RABBITMQ_PORT, virtual_host= self. RABBITMQ_VIRTUAL_HOST, credentials= credentials, ssl_options= ssl_options, heartbeat= 60 ) self. connection = pika. BlockingConnection( connection_params) self. channel = self. connection. channel( ) result = self. channel. queue_declare( queue= '' , exclusive= True ) self. callback_queue = result. method. queueself. channel. basic_consume( queue= self. callback_queue, on_message_callback= self. _on_response, auto_ack= False ) def _on_response ( self, ch, method, props, body) : """RPC模式下的响应消息回调处理函数功能:匹配并接收服务端返回的命令执行结果处理逻辑:1.通过correlation_id匹配对应的请求2.将二进制消息体解码为字符串3.存储结果供execute_command方法获取:param ch: (pika.channel.Channel): 接收到消息的信道对象:param method: (pika.spec.Basic.Deliver): 包含投递信息(如delivery_tag):param props: (pika.spec.BasicProperties): 消息属性(含correlation_id等):param body: (bytes): 消息体内容(服务端返回的执行结果):return:""" try : if self. corr_id == props. correlation_id: self. response = body. decode( 'utf-8' ) except UnicodeDecodeError as e: self. response = f"解码失败: { str ( e) } " def execute_command ( self, command, target_node= None , timeout= 60 ) : """向指定RabbitMQ节点发送命令并获取执行结果(RPC模式):param command (str): 要执行的shell命令字符串(如"ls -l"):param target_node (str): 目标节点标识,对应服务端的队列名- 默认None表示发送到当前主机节点:param timeout (int): 等待响应的超时时间(秒),默认60秒:return str: 命令执行结果文本异常:TimeoutError: 超过指定时间未收到响应时抛出AMQP相关异常: 消息发送失败时抛出向指定节点执行远程命令""" self. response = None self. corr_id = str ( uuid. uuid4( ) ) if not target_node: target_node = socket. gethostname( ) message = { "command" : command, "target" : target_node, "timestamp" : time. time( ) } self. channel. basic_publish( exchange= '' , routing_key= target_node, properties= pika. BasicProperties( reply_to= self. callback_queue, correlation_id= self. corr_id, ) , body= json. dumps( message) . encode( 'utf-8' ) ) start_time = time. time( ) while self. response is None : self. connection. process_data_events( ) if time. time( ) - start_time > timeout: raise TimeoutError( f"等待节点 { target_node} 响应超时" ) time. sleep( 0.1 ) return self. response def close ( self) : """安全关闭RabbitMQ连接功能:1. 清理网络连接资源2. 自动删除临时队列(exclusive队列)3. 防止资源泄漏:return:""" if self. connection and not self. connection. is_closed: self. connection. close( ) logger. warning( " [x] 连接已关闭" ) if __name__ == '__main__' : client = RabbitMQClient( ) try : nodes = [ "node247" , "node248" , "node249" ] for node in nodes: try : logger. info( f"\n向节点 { node} 执行命令: hostname" ) logger. info( client. execute_command( command= "hostname" , target_node= node) ) except Exception as e: logger. exception( f"节点 { node} 执行失败: { str ( e) } " ) try : logger. info( f"\n向节点 { node} 执行命令: ls -l /opt/" ) logger. info( client. execute_command( command= "ls -l /opt/" , target_node= node) ) except Exception as e: logger. exception( f"节点 { node} 执行失败: { str ( e) } " ) try : logger. info( f"\n向节点 { node} 执行命令: date" ) logger. info( client. execute_command( command= "date" , target_node= node) ) except Exception as e: logger. exception( f"节点 { node} 执行失败: { str ( e) } " ) finally : client. close( )
3.调用结果
192.168 .120 .17 node17
192.168 .120 .18 node18
192.168 .120 .19 node19python3 rabbitmq_server. py
192.168 .120 .17 node17python3 rabbitmq_client. py