最近做新老版本zabbix监控主机迁移发现zabbix6.0后api安全有了效大升级,批量主机维护脚本出现认证兼容性问题,以下为脚本更新token支持:`在这里插入代码片:
# /usr/bin/env python3
# -*- coding:utf-8 -*-
import requests
import json
import os
import re
from typing import List, Dict
import ipaddress## 用户配置区域
ZABBIX_URL = "http://[IP]/api_jsonrpc.php" # Zabbix API 地址(根据不同版本api接口地址调整)
USE_API_TOKEN = True # 是否使用 API Token 认证(True/False,token与传统认证方式二选一)
API_TOKEN = "[token]" # zbx6创建固定Token时填写,zbx5.0以下用动态auth-token忽略
USERNAME = "[user]" # 传统认证用户名(zbx6以下兼容,使用api-token认证时忽略)
PASSWORD = "[password]" # 传统认证密码(zbx6以下兼容,使用api-token认证时忽略)
HOSTS_FILE = "serverlist.txt" # 主机列表文件路径class ZabbixClient:def __init__(self, url: str, use_api_token: bool, token: str = None, username: str = None, password: str = None):self.url = urlself.use_api_token = use_api_tokenself.headers = {"Content-Type": "application/json-rpc"}# 配置认证if use_api_token:self.headers["Authorization"] = f"Bearer {token}"else:self.auth = self._login(username, password)def _login(self, username: str, password: str) -> str:"""传统认证获取Token"""payload = {"jsonrpc": "2.0","method": "user.login","params": {"user": username, "password": password},"id": 1}return self._send_request(payload, require_auth=False)["result"]def _send_request(self, payload: Dict, require_auth: bool = True) -> Dict:"""统一请求处理"""if require_auth and not self.use_api_token:payload["auth"] = self.authtry:response = requests.post(self.url,headers=self.headers,data=json.dumps(payload),timeout=10)response.raise_for_status()result = response.json()if "error" in result:error_info = result["error"]raise Exception(f"API错误 ({error_info['code']}): {error_info['message']}\n数据: {error_info.get('data', '')}")return resultexcept requests.exceptions.RequestException as e:raise Exception(f"请求失败: {str(e)}")def check_host_exists(self, hostname: str) -> bool:"""检查主机是否存在"""payload = {"jsonrpc": "2.0","method": "host.get","params": {"filter": {"host": hostname}, "output": ["hostid"]},"id": 2}result = self._send_request(payload, require_auth=True)return len(result["result"]) > 0def get_hostgroup(self, group_name: str) -> List[Dict]:"""获取主机组ID"""payload = {"jsonrpc": "2.0","method": "hostgroup.get","params": {"filter": {"name": [group_name]},"output": ["groupid"]},"id": 3}return self._send_request(payload, require_auth=True).get("result", [])def create_hostgroup(self, group_name: str) -> str:"""创建主机组并返回ID"""payload = {"jsonrpc": "2.0","method": "hostgroup.create","params": {"name": group_name},"id": 4}result = self._send_request(payload, require_auth=True)return result["result"]["groupids"][0]def get_template(self, template_name: str) -> List[Dict]:"""获取模板ID"""payload = {"jsonrpc": "2.0","method": "template.get","params": {"filter": {"host": [template_name]},"output": ["templateid"]},"id": 5}return self._send_request(payload, require_auth=True).get("result", [])def create_host(self, hostname: str, ip: str, groups: List[str], templates: List[str]) -> None:"""创建主机(带完整参数校验)"""self._validate_base_params(hostname, ip, groups, templates)if self.check_host_exists(hostname):print(f"主机 {hostname} 已存在,跳过")returngroup_ids = self._process_hostgroups(groups)template_ids = self._process_templates(templates)interface = self._build_interface(ip)payload = self._build_payload(hostname, interface, group_ids, template_ids)self._send_request(payload, require_auth=True)print(f"主机 {hostname} ({ip}) 创建成功")def _validate_base_params(self, hostname: str, ip: str, groups: List[str], templates: List[str]):"""基础参数校验"""if not hostname.strip():raise ValueError("主机名不可为空或仅包含空格")if not re.match(r'^[\w\-_.]+$', hostname):raise ValueError(f"主机名包含非法字符: {hostname}")if not self._is_valid_ip(ip):raise ValueError(f"无效IP地址: {ip}")if not groups or not all(groups):raise ValueError("主机组不可为空或包含空值")if not templates or not all(templates):raise ValueError("模板不可为空或包含空值")def _process_hostgroups(self, groups: List[str]) -> List[Dict]:"""处理主机组(创建不存在的组)"""group_ids = []for group in groups:group = group.strip()group_info = self.get_hostgroup(group)if not group_info:print(f"创建主机组 '{group}'...")group_id = self.create_hostgroup(group)group_ids.append({"groupid": group_id})else:group_ids.append({"groupid": group_info[0]["groupid"]})return group_idsdef _process_templates(self, templates: List[str]) -> List[Dict]:"""处理模板(校验存在性)"""template_ids = []for template in templates:template = template.strip()template_info = self.get_template(template)if not template_info:raise Exception(f"模板 '{template}' 不存在,请检查名称(区分大小写)")template_ids.append({"templateid": template_info[0]["templateid"]})return template_idsdef _build_interface(self, ip: str) -> Dict:"""构建接口参数(完整字段)"""return {"type": 1, # Zabbix Agent接口"main": 1, # 主接口"useip": 1, # 使用IP地址"ip": ip,"port": "10050", # 默认端口"dns": "", # 可选字段,显式为空"ipmi_dns": "","username": "","password": ""}def _build_payload(self, hostname: str, interface: Dict, group_ids: List[Dict], template_ids: List[Dict]) -> Dict:"""构建完整请求参数(移除无效库存字段)"""return {"jsonrpc": "2.0","method": "host.create","params": {"host": hostname,"interfaces": [interface],"groups": group_ids,"templates": template_ids,"inventory_mode": 0 # 禁用自动发现,无需inventory字段},"id": 6}def _is_valid_ip(self, ip: str) -> bool:"""验证IP地址格式(支持IPv4/IPv6)"""try:ipaddress.ip_address(ip)return Trueexcept ValueError:print(f"无效IP格式: {ip}")return Falsedef main():"""主函数:批量导入主机"""try:# 初始化客户端if USE_API_TOKEN:zabbix = ZabbixClient(url=ZABBIX_URL,use_api_token=True,token=API_TOKEN)else:zabbix = ZabbixClient(url=ZABBIX_URL,use_api_token=False,username=USERNAME,password=PASSWORD)# 验证主机列表文件if not os.path.exists(HOSTS_FILE):raise FileNotFoundError(f"文件 {HOSTS_FILE} 不存在")# 处理主机列表with open(HOSTS_FILE, "r", encoding="utf-8") as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line or line.startswith("#"):continuetry:# 解析行数据parts = line.split("#", 3)if len(parts) != 4:raise ValueError("字段数量错误,必须为4个(主机名#IP#主机组#模板)")hostname, ip, groups_str, templates_str = partsgroups = groups_str.split(",") if groups_str else []templates = templates_str.split(",") if templates_str else []# 创建主机zabbix.create_host(hostname, ip, groups, templates)except ValueError as ve:print(f"行 {line_num} 格式错误: {str(ve)},跳过")except Exception as e:print(f"行 {line_num} 处理失败: {str(e)}")except Exception as e:print(f"\n程序终止: {str(e)}")exit(1)if __name__ == "__main__":main()print("\n批量导入完成!")
注:输入文件 serverlist.txt 格式:
每行 主机名#IP地址#主机组1,主机组2#模板1,模板,示例:
web-server#192.168.1.1#Web Servers,Linux#Template OS Linux
db-server#192.168.1.2#DB Servers#Template OS Linux,Template MySQL