【网站内容安全检测】之2:从网站所有URL页面中提取所有外部及内部域名信息

还没写成Go的,用Python吧,稍微慢一点

依赖内容(安装命令pip install -r requirements.txt)

requirements.txt

aiohttp
beautifulsoup4==4.12.2
tqdm==4.66.1
redis==5.2.1
motor==3.3.1
pymongo==4.6.0
chardet

提取域名的程序
domain_extractor.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-import re
import sqlite3
import threading
import asyncio
import aiohttp
import chardet
import ssl
import os
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse, urlunparse, urljoin
from typing import Set, List, Dict, Tuple
import time
from tqdm import tqdm
from bs4 import BeautifulSoupclass DomainExtractor:def __init__(self, db_path: str = 'domains.db', max_connections: int = 100):self.db_path = db_pathself.domain_pattern = re.compile(r'[a-zA-Z0-9][a-zA-Z0-9-]{1,61}[a-zA-Z0-9](?:\.[a-zA-Z]{2,})+')self.max_connections = max_connectionsself.semaphore = asyncio.Semaphore(max_connections)self.session = Noneself.tasks = []# 定义需要过滤的文件扩展名self.file_extensions = {# 网页文件'.html', '.htm', '.shtml', '.jhtml', '.asp', '.aspx', '.php', '.jsp', '.cgi',# 文档文件'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.rtf',# 图片文件'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg',# 压缩文件'.zip', '.rar', '.7z', '.tar', '.gz',# 音频文件'.mp3', '.wav', '.ogg', '.m4a',# 视频文件'.mp4', '.avi', '.mov', '.wmv', '.flv',# 其他常见文件'.exe', '.dll', '.so', '.dylib', '.class', '.jar', '.war','.css', '.js', '.json', '.xml', '.rss', '.atom'}self.init_database()def init_database(self):"""初始化SQLite数据库"""# 如果数据库文件存在,删除它if os.path.exists(self.db_path):os.remove(self.db_path)conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute('''CREATE TABLE IF NOT EXISTS domains (domain TEXT PRIMARY KEY,first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,count INTEGER DEFAULT 1,source_url TEXT)''')conn.commit()conn.close()async def init_session(self):"""初始化HTTP会话"""if not self.session:# 创建SSL上下文ssl_context = ssl.create_default_context()ssl_context.check_hostname = Falsessl_context.verify_mode = ssl.CERT_NONE# 创建连接器connector = aiohttp.TCPConnector(ssl=ssl_context,force_close=True,enable_cleanup_closed=True,limit=self.max_connections)self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30),headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36','Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8','Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8','Accept-Encoding': 'gzip, deflate, br','Connection': 'keep-alive','Upgrade-Insecure-Requests': '1','Cache-Control': 'max-age=0'},connector=connector)async def close_session(self):"""关闭HTTP会话"""if self.session and not self.session.closed:await self.session.close()self.session = Nonedef is_valid_domain(self, domain: str) -> bool:"""验证域名是否有效"""# 移除所有干扰字符domain = re.sub(r'[()()【】\[\]《》<>,。、;:""''!?…—]', '', domain)domain = re.sub(r'[^\x00-\x7F]+', '', domain)  # 移除所有非ASCII字符# 检查是否是文件扩展名if any(domain.lower().endswith(ext) for ext in self.file_extensions):return False# 检查域名格式if not self.domain_pattern.match(domain):return False# 检查是否是IP地址if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', domain):return False# 检查是否是本地域名if domain in ('localhost', '127.0.0.1', '0.0.0.0'):return Falsereturn Truedef extract_domain(self, url: str) -> str:"""从URL中提取域名"""try:parsed = urlparse(url)domain = parsed.netloc.lower()# 移除端口号if ':' in domain:domain = domain.split(':')[0]# 移除所有干扰字符domain = re.sub(r'[()()【】\[\]《》<>,。、;:""''!?…—]', '', domain)domain = re.sub(r'[^\x00-\x7F]+', '', domain)  # 移除所有非ASCII字符return domainexcept:return ''def is_html_content(self, content_type: str) -> bool:"""检查内容类型是否为HTML"""html_types = ['text/html', 'application/xhtml+xml', 'application/xml']return any(html_type in content_type.lower() for html_type in html_types)def convert_to_https(self, url: str) -> str:"""将HTTP URL转换为HTTPS"""parsed = urlparse(url)if parsed.scheme == 'http':return urlunparse(('https', parsed.netloc, parsed.path, parsed.params, parsed.query, parsed.fragment))return urlasync def fetch_page(self, url: str) -> str:"""异步获取页面内容"""async with self.semaphore:try:if not self.session or self.session.closed:await self.init_session()# 添加重试机制max_retries = 3for retry in range(max_retries):try:# 首先尝试HTTPShttps_url = self.convert_to_https(url)try:async with self.session.get(https_url, allow_redirects=True) as response:if response.status == 200:return await self._process_response(response)except Exception as e:print(f"HTTPS请求失败 {https_url}: {e}")# 如果HTTPS失败,尝试HTTPasync with self.session.get(url, allow_redirects=True) as response:if response.status == 200:return await self._process_response(response)elif response.status == 301 or response.status == 302:# 处理重定向redirect_url = response.headers.get('Location')if redirect_url:if not redirect_url.startswith(('http://', 'https://')):redirect_url = urljoin(url, redirect_url)async with self.session.get(redirect_url, allow_redirects=True) as redirect_response:if redirect_response.status == 200:return await self._process_response(redirect_response)elif response.status == 404:print(f"页面不存在: {url}")return Noneelif response.status == 403:print(f"访问被拒绝: {url}")return Noneelse:print(f"HTTP错误 {response.status}: {url}")if retry < max_retries - 1:await asyncio.sleep(1)  # 等待1秒后重试continuereturn Noneexcept aiohttp.ClientSSLError as e:print(f"SSL证书错误 {url}: {e}")if retry < max_retries - 1:await asyncio.sleep(1)continuereturn Noneexcept aiohttp.ClientConnectorError as e:print(f"连接错误 {url}: {e}")if retry < max_retries - 1:await asyncio.sleep(1)continuereturn Noneexcept Exception as e:print(f"获取页面失败 {url}: {e}")if retry < max_retries - 1:await asyncio.sleep(1)continuereturn Nonereturn Noneexcept Exception as e:print(f"获取页面失败 {url}: {e}")return Noneasync def _process_response(self, response: aiohttp.ClientResponse) -> str:"""处理HTTP响应"""# 检查内容类型content_type = response.headers.get('Content-Type', '')if not self.is_html_content(content_type):return None# 获取原始字节内容content = await response.read()# 检测编码encoding = chardet.detect(content)['encoding']if not encoding:# 如果检测失败,尝试从响应头获取编码if 'charset=' in content_type:encoding = content_type.split('charset=')[-1].strip()else:# 默认使用utf-8encoding = 'utf-8'try:# 尝试使用检测到的编码解码return content.decode(encoding)except UnicodeDecodeError:# 如果解码失败,尝试其他常见编码for enc in ['utf-8', 'gbk', 'gb2312', 'gb18030', 'big5']:try:return content.decode(enc)except UnicodeDecodeError:continue# 如果所有编码都失败,使用utf-8并忽略错误return content.decode('utf-8', errors='ignore')def extract_domains_from_text(self, text: str, source_url: str) -> Set[str]:"""从文本中提取所有域名"""domains = set()# 提取所有URL中的域名urls = re.findall(r'https?://[^\s<>"]+|www\.[^\s<>"]+', text)for url in urls:domain = self.extract_domain(url)if domain and self.is_valid_domain(domain):domains.add(domain)# 提取纯文本中的域名text_domains = self.domain_pattern.findall(text)for domain in text_domains:domain = domain.lower()if self.is_valid_domain(domain):domains.add(domain)return domainsasync def process_url(self, url: str) -> Tuple[str, Set[str]]:"""处理单个URL并提取域名"""try:html = await self.fetch_page(url)if not html:return url, set()# 使用BeautifulSoup解析HTMLtry:soup = BeautifulSoup(html, 'html.parser')except Exception as e:print(f"解析HTML失败 {url}: {e}")return url, set()# 提取所有链接domains = set()for tag in soup.find_all(['a', 'link', 'script', 'img']):for attr in ['href', 'src']:if tag.get(attr):domain = self.extract_domain(tag[attr])if domain and self.is_valid_domain(domain):domains.add(domain)# 提取页面文本中的域名text_domains = self.extract_domains_from_text(soup.get_text(), url)domains.update(text_domains)return url, domainsexcept asyncio.CancelledError:print(f"任务被取消: {url}")return url, set()except Exception as e:print(f"处理URL时出错 {url}: {e}")return url, set()def save_domains(self, domain_data: Dict[str, Set[str]]):"""将域名保存到数据库"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()# 使用事务提高性能cursor.execute('BEGIN TRANSACTION')for source_url, domains in domain_data.items():for domain in domains:cursor.execute('''INSERT INTO domains (domain, last_seen, count, source_url)VALUES (?, CURRENT_TIMESTAMP, 1, ?)ON CONFLICT(domain) DO UPDATE SETlast_seen = CURRENT_TIMESTAMP,count = count + 1''', (domain, source_url))conn.commit()conn.close()async def process_file(self, input_file: str, num_threads: int = 4):"""处理输入文件并提取域名"""print(f"开始处理文件: {input_file}")start_time = time.time()# 读取所有URLwith open(input_file, 'r', encoding='utf-8') as f:urls = [line.strip() for line in f if line.strip()]total_urls = len(urls)print(f"共读取到 {total_urls} 个URL")# 初始化HTTP会话await self.init_session()try:# 创建任务列表self.tasks = []for url in urls:task = asyncio.create_task(self.process_url(url))self.tasks.append(task)# 使用tqdm显示进度domain_data = {}for completed_task in tqdm(asyncio.as_completed(self.tasks), total=len(self.tasks), desc="处理进度"):url, domains = await completed_taskif domains:domain_data[url] = domains# 保存域名到数据库print(f"\n保存域名到数据库...")self.save_domains(domain_data)elapsed_time = time.time() - start_timeprint(f"\n处理完成!")print(f"总用时: {elapsed_time:.2f} 秒")print(f"平均速度: {total_urls/elapsed_time:.2f} URL/秒")except Exception as e:print(f"处理过程中出错: {e}")finally:# 取消所有未完成的任务for task in self.tasks:if not task.done():task.cancel()# 等待所有任务完成await asyncio.gather(*self.tasks, return_exceptions=True)await self.close_session()def get_domain_stats(self):"""获取域名统计信息"""conn = sqlite3.connect(self.db_path)cursor = conn.cursor()cursor.execute('SELECT COUNT(*) FROM domains')total_domains = cursor.fetchone()[0]cursor.execute('''SELECT domain, count, first_seen, last_seen, source_url FROM domains ORDER BY count DESC LIMIT 10''')top_domains = cursor.fetchall()conn.close()print("\n域名统计信息:")print(f"总域名数: {total_domains}")print("\nTop 10 域名:")for domain, count, first_seen, last_seen, source_url in top_domains:print(f"域名: {domain}")print(f"出现次数: {count}")print(f"首次发现: {first_seen}")print(f"最后发现: {last_seen}")print(f"来源URL: {source_url}")print("-" * 50)async def main():import sysif len(sys.argv) < 2:print("用法: python domain_extractor.py <URL文件路径> [并发数]")print("例如: python domain_extractor.py example.com_links.txt 100")returninput_file = sys.argv[1]max_connections = int(sys.argv[2]) if len(sys.argv) > 2 else 100extractor = DomainExtractor(max_connections=max_connections)await extractor.process_file(input_file)extractor.get_domain_stats()if __name__ == "__main__":asyncio.run(main()) 

用法

python domain_extractor.py <URL文件路径> [并发数]

结束后会在当前目录下生成一个domains.db的sqlite数据库文件,用数据库管理软件把域名信息导出出来,留后续识别内容安全使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/911853.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/911853.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【LLaMA-Factory 实战系列】四、API 篇 - 部署推理服务与批量调用实战

【LLaMA-Factory 实战系列】四、API 篇 - 部署推理服务与批量调用实战 1. 引言2. 推理后端的选择与对比3. 部署 API 推理服务3.1 创建 API 配置文件3.2 启动 API 服务3.3 探索交互式 API 文档 4. 编写 Python 脚本进行批量调用4.1 准备工作4.2 批量调用脚本4.3 运行脚本并查看结…

C++工厂模式的作用(工厂方法、Factory Method、Factory Pattern)

文章目录 代码示例工厂的作用1. 对象创建的封装 &#x1f3ed;2. 解耦客户端和具体类 &#x1f517;3. 统一的创建入口 &#x1f6aa;4. 隐藏实现细节 &#x1f3ad; 在这个项目中的具体体现总结 代码示例 https://gitee.com/arnold_s/my-learning-test/tree/master/20250610_…

9-C#修改任务管理的名称

C#修改任务管理的名称

Fisco Bcos学习 - 搭建第一个区块链网络

文章目录 一、前言二、环境准备三、安装依赖在 macOS 上安装依赖在 Ubuntu 上安装依赖在 CentOS 上安装依赖 四、创建操作目录并下载安装脚本五、搭建单群组 4 节点联盟链六、启动 FISCO BCOS 链七、检查进程八、检查日志输出 在数字化时代&#xff0c;区块链技术正逐渐成为推动…

可视化图解算法53:表达式求值

牛客网 面试笔试 TOP 101 1. 题目 描述 请写一个整数计算器&#xff0c;支持加减乘三种运算和括号。 数据范围&#xff1a;0≤∣s∣≤100&#xff0c;保证计算结果始终在整型范围内 要求&#xff1a;空间复杂度&#xff1a; O(n)&#xff0c;时间复杂度 O(n) 示例1 输入…

小白成长之路-Nginx配置(二)

文章目录 一、localtion配置1.匹配规则2.匹配优先级3.配置案例 二、rewrite1、 语法2、 可写入字段3 配置案例4 if 指令5.sutoindex6. nginx配置中的常用变量 三、配置Nginx状态统计1.下载vts模块2.编译nginx 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参…

Qt的第一个程序

Qt的第一个程序 1.hello world2.使用图形化拖拽方式3.使用C代码的方式3.1.头文件3.2.setText3.3.对象树 4.设计MyLabel5.乱码问题 &#x1f31f;&#x1f31f;hello&#xff0c;各位读者大大们你们好呀&#x1f31f;&#x1f31f; &#x1f680;&#x1f680;系列专栏&#xff…

图书数据接口

基本说明&#xff1a; 接口地址&#xff1a;http://data.isbn.work/openApi/getInfoByIsbn?isbn{isbn}&appKey{appkey}返回格式&#xff1a;json请求方式&#xff1a;get请求示例&#xff1a;http://data.isbn.work/openApi/getInfoByIsbn?isbn9787513159074&appKey…

MongoDB原理

目录 一、概念 二、架构 2.1 逻辑结构 2.2 数据模型 2.3 存储引擎&#xff1a;WiredTiger 三、事务 一、概念 MongoDB是文档数据库&#xff0c;基本存储单元是 文档&#xff08;Document&#xff09;&#xff0c;以BSON格式&#xff08;一种类json的二进制形式&#xff…

《解码音频:从基础到未来的听觉探索》

音频&#xff1a;开启声音世界的大门 在生活的每一个角落&#xff0c;音频如影随形&#xff0c;编织出丰富多彩的听觉体验。清晨&#xff0c;第一缕阳光尚未完全照进房间&#xff0c;手机里温柔的闹钟铃声&#xff0c;将我们从睡梦中轻轻唤醒&#xff0c;开启活力满满的一天。通…

web安全之h2注入系统学习

起初是在N1 Junior 2025 上面碰到一题&#xff0c;考点是h2的sql注入。由于之前没有见过&#xff0c;趁此机会系统学习一番 实验代码 public class H2Inject {public static void main(String[] args) throws Exception{JdbcDataSource dataSource new JdbcDataSource();dataS…

AWS认证系列:考点解析 - cloud trail,cloud watch,aws config

&#x1f3af;一句话总览&#xff1a; 服务名类比/角色主要功能CloudTrail监控摄像头录像回放记录“谁在什么时候做了什么操作”CloudWatch护士测体温 护士喊医生实时监控系统状态&#xff0c;并能报警/自动应对AWS Config保安巡逻 记录资产变更历史记录 AWS 资源的“配置状…

Java八股文——数据结构「数据结构篇」

了解哪些数据结构&#xff1f; 面试官您好&#xff0c;我了解并使用过多种数据结构。在我的理解中&#xff0c;数据结构可以分为几个大的类别&#xff0c;每一类都有其独特的优势和适用场景。 1. 线性结构 (Linear Structures) 这类结构的特点是数据元素之间存在一对一的线性…

C#测试调用EPPlus根据批注设置excel单元格内容

EPPlus也是常用的Excel文件操作库&#xff0c;但不同于ClosedXML&#xff0c;使用EPPlus前需要设置授权信息&#xff0c;商业应用需要设置商业授权&#xff0c;个人使用或非商业应用也需要设置授权&#xff08;测试的时候只需设置全名&#xff0c;保存excel文件时会保存到文件详…

windows本地搭建skywalking, 线程池中traceId不丢失

1.从官网下载9.0.0版本 Downloads | Apache SkyWalking 其它历史版本的 下载地址 Index of /dist/skywalking 这个页面 可以下载 apm服务: apache-skywalking-apm-9.0.0.tar.gz agent的包: apache-skywalking-java-agent-9.0.0.tgz 2.解压后, (看情况去config路径下 appli…

多模态大语言模型arxiv论文略读(135)

Agent S: An Open Agentic Framework that Uses Computers Like a Human ➡️ 论文标题&#xff1a;Agent S: An Open Agentic Framework that Uses Computers Like a Human ➡️ 论文作者&#xff1a;Saaket Agashe, Jiuzhou Han, Shuyu Gan, Jiachen Yang, Ang Li, Xin Eric…

wpa_supplicant连接到了路由,但是 udhcpc会分配到不同网段的ip,路由器ip为192.168.0网段,板子分配ip为192.168.1的网段

wpa_supplicant连接到了路由&#xff0c;但是 udhcpc会分配到不同网段的ip,路由器ip为192.168.0网段&#xff0c;板子分配ip为192.168.1的网段 你提到的情况&#xff1a; 使用 wpa_supplicant 成功连接到路由器&#xff1b; 然后通过 udhcpc&#xff08;DHCP客户端&#xff09…

[Hestia]开源网络服务器控制面板,快速、可靠、开源

测评介绍 本期测评试用一下Hestia这款面板。Hestia是一个由国际社区支持开发的开源项目&#xff0c;2019年正式发布&#xff0c;目前已积累1.1万余次代码提交&#xff0c;几乎每周都有十多次的代码提交&#xff0c;更新热度很高。仅支持比较新的debian和ubuntu&#xff0c;对于…

Windows 安装 Redis8.0.2

1.下载 Releases redis-windows/redis-windowshttps://github.com/redis-windows/redis-windows/releases 下载后直接解压到想要的安装目录就行了&#xff0c;启动Redis直接双击 redis-server.exe 文件就行了&#xff0c;Redis启动后双击 redis-cli.exe 就可以直接连接到Redi…

GitHub中openmmlab和Detectron2的区别

MMDetection 和 Detectron2 都是计算机视觉领域中流行的开源目标检测框架&#xff0c;它们有许多相似之处&#xff0c;但也存在一些关键差异。以下是两者的主要区别&#xff1a; 1. 开发团队与社区 MMDetection 由中国开源组织 OpenMMLab 开发维护&#xff0c;社区以中文用户为…