用途限制声明,本文仅用于网络安全技术研究、教育与知识分享。文中涉及的渗透测试方法与工具,严禁用于未经授权的网络攻击、数据窃取或任何违法活动。任何因不当使用本文内容导致的法律后果,作者及发布平台不承担任何责任。渗透测试涉及复杂技术操作,可能对目标系统造成数据损坏、服务中断等风险。读者需充分评估技术能力与潜在后果,在合法合规前提下谨慎实践。
这次主要介绍子域名爆破脚本,关于这个脚本大家应该不会陌生一个工具,oneforall子域名爆破,接下来我们就从代码的角度看看是如何进行子域名爆破的,先提供源码,再进行逐个模块分析
import dns.resolver
import socket
import argparse
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from typing import List, Optional# 配置日志输出
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler("dns_exploration.log"),logging.StreamHandler()]
)
logger = logging.getLogger(__name__)def reverse_dns(ip: str) -> Optional[List[str]]:"""反向DNS查询,获取IP对应的域名信息"""try:result = socket.gethostbyaddr(ip)return [result[0]] + result[1]except (socket.herror, socket.timeout):return Nonedef dns_request(domain: str, record_type: str = 'A') -> List[str]:"""DNS查询函数,支持多种记录类型:param domain: 待查询域名:param record_type: DNS记录类型(A, AAAA, CNAME, MX, NS等):return: 查询到的记录列表"""ips = []try:resolver = dns.resolver.Resolver()resolver.timeout = 5 # 超时时间resolver.lifetime = 10 # 总生命周期result = resolver.resolve(domain, record_type)for answer in result:record_value = answer.to_text()ips.append(record_value)logger.info(f"域名: {domain} | 类型: {record_type} | 记录值: {record_value}")# 对A/AAAA记录进行反向查询if record_type in ['A', 'AAAA']:reverse_domains = reverse_dns(record_value)if reverse_domains:logger.info(f" 反向解析: {', '.join(reverse_domains)}")except dns.resolver.NXDOMAIN:logger.debug(f"域名不存在: {domain} (类型: {record_type})")except dns.resolver.NoAnswer:logger.debug(f"无{record_type}记录: {domain}")except (dns.exception.Timeout, dns.resolver.NoNameservers):logger.warning(f"查询超时/无可用DNS服务器: {domain} (类型: {record_type})")except Exception as e:logger.error(f"查询错误 {domain}: {str(e)}")return ipsdef check_subdomain(word: str, domain: str, record_types: List[str], nums: bool) -> List[str]:"""检查单个子域名(含数字后缀变种)"""successes = []# 基础子域名subdomain = f"{word}.{domain}"for rtype in record_types:if dns_request(subdomain, rtype):successes.append(subdomain)# 带数字后缀的子域名if nums:for i in range(10):num_subdomain = f"{word}{i}.{domain}"for rtype in record_types:if dns_request(num_subdomain, rtype):successes.append(num_subdomain)return successesdef subdomain_search(domain: str, dictionary: List[str], nums: bool = False,record_types: List[str] = ['A'], threads: int = 5) -> List[str]:"""子域名枚举主函数:param domain: 主域名:param dictionary: 子域名字典列表:param nums: 是否检查数字后缀:param record_types: 需要查询的DNS记录类型:param threads: 线程数,控制并发:return: 所有存在的子域名列表"""successes = []total = len(dictionary)start_time = time.time()with ThreadPoolExecutor(max_workers=threads) as executor:# 提交所有任务futures = {executor.submit(check_subdomain, word, domain, record_types, nums): wordfor word in dictionary}# 跟踪进度completed = 0for future in as_completed(futures):completed += 1progress = (completed / total) * 100logger.info(f"进度: {progress:.2f}% ({completed}/{total})")successes.extend(future.result())logger.info(f"枚举完成,耗时: {time.time() - start_time:.2f}秒")return list(set(successes)) # 去重def save_results(results: List[str], domain: str) -> None:"""保存结果到文件"""timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")filename = f"subdomains_{domain}_{timestamp}.txt"with open(filename, "w") as f:for subdomain in sorted(results):f.write(f"{subdomain}\n")logger.info(f"结果已保存到: {filename}")def main():# 命令行参数解析parser = argparse.ArgumentParser(description="高级DNS子域名枚举工具")parser.add_argument("-d", "--domain", required=True, help="目标主域名 (例如: google.com)")parser.add_argument("-f", "--file", default="subdomains.txt", help="子域名字典文件路径")parser.add_argument("-n", "--nums", action="store_true", help="检查带数字(0-9)后缀的子域名")parser.add_argument("-t", "--types", default="A", help="DNS记录类型,多个用逗号分隔 (例如: A,AAAA,CNAME)")parser.add_argument("-th", "--threads", type=int, default=5, help="并发线程数 (默认: 5)")args = parser.parse_args()# 解析记录类型record_types = [t.strip().upper() for t in args.types.split(',')]# 读取字典文件try:with open(args.file, "r", encoding="utf-8") as f:dictionary = [line.strip() for line in f if line.strip()]logger.info(f"已加载字典文件: {args.file} (共{len(dictionary)}个条目)")except FileNotFoundError:logger.error(f"字典文件不存在: {args.file}")return# 执行子域名枚举logger.info(f"开始枚举 {args.domain} 的子域名...")results = subdomain_search(domain=args.domain,dictionary=dictionary,nums=args.nums,record_types=record_types,threads=args.threads)# 输出结果if results:logger.info(f"共发现 {len(results)} 个有效子域名:")for sub in sorted(results):logger.info(f" - {sub}")save_results(results, args.domain)else:logger.info("未发现有效子域名")if __name__ == "__main__":main()
1. 模块导入与日志配置
import dns.resolver
import socket
import argparse
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from typing import List, Optional# 配置日志输出
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler("dns_exploration.log"),logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
解析:
-
模块选择逻辑:
dns.resolver
:核心 DNS 查询工具,用于解析域名的各类 DNS 记录(替代原始版本的基础查询)。socket
:仅保留反向 DNS 查询功能(IP→域名映射)。argparse
:实现命令行参数解析,让工具支持灵活配置(如指定域名、字典文件等)。concurrent.futures
:提供线程池支持,实现并发查询以提升效率(解决原始版本单线程慢的问题)。logging
:替代原始的print
输出,支持多级别日志(INFO/WARNING/ERROR)和双目标输出(控制台 + 文件)。typing
:添加类型注解,提升代码可读性和 IDE 支持(如List[str]
表示字符串列表)。
-
日志配置细节:
level=logging.INFO
:默认输出 INFO 及以上级别日志(DEBUG 信息需手动开启)。format
:包含时间戳、日志级别和消息内容,便于追溯执行过程。handlers
:同时输出到文件(dns_exploration.log
)和控制台,既保留记录又方便实时查看。
2. 反向 DNS 查询函数 reverse_dns
def reverse_dns(ip: str) -> Optional[List[str]]:"""反向DNS查询,获取IP对应的域名信息"""try:result = socket.gethostbyaddr(ip)return [result[0]] + result[1]except (socket.herror, socket.timeout):return None
解析:
- 功能定位:通过 IP 地址反向解析对应的域名(IP→域名映射),补充正向 DNS 查询的信息。
- 参数与返回值:
- 输入
ip: str
:待查询的 IP 地址(IPv4 或 IPv6)。 - 返回
Optional[List[str]]
:可能为None
(解析失败)或域名列表(主域名 + 别名)。
- 输入
- 异常处理:
socket.herror
:主机不存在或无反向记录时触发。socket.timeout
:查询超时(新增处理,解决原始版本未处理超时的问题)。
- 实现细节:
socket.gethostbyaddr(ip)
返回元组(主机名, 别名列表, IP列表)
,此处提取主机名和别名合并为列表返回。
3. DNS 查询函数 dns_request
def dns_request(domain: str, record_type: str = 'A') -> List[str]:"""DNS查询函数,支持多种记录类型:param domain: 待查询域名:param record_type: DNS记录类型(A, AAAA, CNAME, MX, NS等):return: 查询到的记录列表"""ips = []try:resolver = dns.resolver.Resolver()resolver.timeout = 5 # 超时时间resolver.lifetime = 10 # 总生命周期result = resolver.resolve(domain, record_type)for answer in result:record_value = answer.to_text()ips.append(record_value)logger.info(f"域名: {domain} | 类型: {record_type} | 记录值: {record_value}")# 对A/AAAA记录进行反向查询if record_type in ['A', 'AAAA']:reverse_domains = reverse_dns(record_value)if reverse_domains:logger.info(f" 反向解析: {', '.join(reverse_domains)}")except dns.resolver.NXDOMAIN:logger.debug(f"域名不存在: {domain} (类型: {record_type})")except dns.resolver.NoAnswer:logger.debug(f"无{record_type}记录: {domain}")except (dns.exception.Timeout, dns.resolver.NoNameservers):logger.warning(f"查询超时/无可用DNS服务器: {domain} (类型: {record_type})")except Exception as e:logger.error(f"查询错误 {domain}: {str(e)}")return ips
解析:
- 核心改进:从原始版本仅支持 A 记录查询,扩展为支持任意 DNS 记录类型(A/AAAA/CNAME/MX 等)。
- ** resolver 配置 **:
timeout=5
:单次查询超时时间(5 秒),避免长时间阻塞。lifetime=10
:总查询生命周期(10 秒),包含重试时间,防止无限等待。
- 记录处理逻辑:
- 对查询结果
result
遍历,将每条记录转为文本(如 A 记录的 IP、CNAME 的域名)并存储。 - 仅对 A/AAAA 记录(IP 地址记录)触发反向查询,因为其他类型(如 MX)的记录值不是 IP,无需反向解析。
- 对查询结果
- 异常处理增强:
NXDOMAIN
:域名不存在(DEBUG 级别,避免日志冗余)。NoAnswer
:域名存在但无指定类型记录(如查询 AAAA 但域名仅支持 IPv4)。Timeout/NoNameservers
:警告级别,提示网络或 DNS 服务器问题。- 通用异常捕获:防止未预料的错误导致程序崩溃。
4. 子域名检查函数 check_subdomain
def check_subdomain(word: str, domain: str, record_types: List[str], nums: bool) -> List[str]:"""检查单个子域名(含数字后缀变种)"""successes = []# 基础子域名subdomain = f"{word}.{domain}"for rtype in record_types:if dns_request(subdomain, rtype):successes.append(subdomain)# 带数字后缀的子域名if nums:for i in range(10):num_subdomain = f"{word}{i}.{domain}"for rtype in record_types:if dns_request(num_subdomain, rtype):successes.append(num_subdomain)return successes
解析:
- 功能定位:处理单个字典词的子域名生成与查询,是并发任务的最小执行单元。
- 逻辑拆分:
- 基础子域名:将字典词与主域名拼接(如
word + .google.com
),查询所有指定记录类型。 - 数字后缀子域名:若
nums=True
,在字典词后拼接 0-9 数字(如word0.google.com
),再查询记录类型。
- 基础子域名:将字典词与主域名拼接(如
- 去重预备:返回的
successes
列表可能包含重复子域名(如同一子域名同时有 A 和 CNAME 记录),后续在主函数中通过list(set(successes))
去重。
5. 子域名枚举主函数 subdomain_search
def subdomain_search(domain: str, dictionary: List[str], nums: bool = False,record_types: List[str] = ['A'], threads: int = 5) -> List[str]:"""子域名枚举主函数:param domain: 主域名:param dictionary: 子域名字典列表:param nums: 是否检查数字后缀:param record_types: 需要查询的DNS记录类型:param threads: 线程数,控制并发:return: 所有存在的子域名列表"""successes = []total = len(dictionary)start_time = time.time()with ThreadPoolExecutor(max_workers=threads) as executor:# 提交所有任务futures = {executor.submit(check_subdomain, word, domain, record_types, nums): wordfor word in dictionary}# 跟踪进度completed = 0for future in as_completed(futures):completed += 1progress = (completed / total) * 100logger.info(f"进度: {progress:.2f}% ({completed}/{total})")successes.extend(future.result())logger.info(f"枚举完成,耗时: {time.time() - start_time:.2f}秒")return list(set(successes)) # 去重
解析:
- 并发核心:使用
ThreadPoolExecutor
实现多线程查询,解决原始版本单线程效率低下的问题。max_workers=threads
控制并发数(默认 5,可通过命令行调整)。 - 任务管理:
executor.submit
:为每个字典词提交一个check_subdomain
任务,返回Future
对象(代表异步任务)。as_completed(futures)
:迭代已完成的任务,实时获取结果并更新进度。
- 进度跟踪:通过
completed/total
计算进度百分比,让用户了解枚举进展(原始版本无进度提示)。 - 性能优化:
- 并发查询大幅缩短总耗时(尤其字典较大时)。
- 最终通过
list(set(successes))
去重,避免同一子域名被多次记录。
6. 结果保存与主函数 main
def save_results(results: List[str], domain: str) -> None:"""保存结果到文件"""timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")filename = f"subdomains_{domain}_{timestamp}.txt"with open(filename, "w") as f:for subdomain in sorted(results):f.write(f"{subdomain}\n")logger.info(f"结果已保存到: {filename}")def main():# 命令行参数解析parser = argparse.ArgumentParser(description="高级DNS子域名枚举工具")parser.add_argument("-d", "--domain", required=True, help="目标主域名 (例如: google.com)")parser.add_argument("-f", "--file", default="subdomains.txt", help="子域名字典文件路径")parser.add_argument("-n", "--nums", action="store_true", help="检查带数字(0-9)后缀的子域名")parser.add_argument("-t", "--types", default="A", help="DNS记录类型,多个用逗号分隔 (例如: A,AAAA,CNAME)")parser.add_argument("-th", "--threads", type=int, default=5, help="并发线程数 (默认: 5)")args = parser.parse_args()# 解析记录类型record_types = [t.strip().upper() for t in args.types.split(',')]# 读取字典文件try:with open(args.file, "r", encoding="utf-8") as f:dictionary = [line.strip() for line in f if line.strip()]logger.info(f"已加载字典文件: {args.file} (共{len(dictionary)}个条目)")except FileNotFoundError:logger.error(f"字典文件不存在: {args.file}")return# 执行子域名枚举logger.info(f"开始枚举 {args.domain} 的子域名...")results = subdomain_search(domain=args.domain,dictionary=dictionary,nums=args.nums,record_types=record_types,threads=args.threads)# 输出结果if results:logger.info(f"共发现 {len(results)} 个有效子域名:")for sub in sorted(results):logger.info(f" - {sub}")save_results(results, args.domain)else:logger.info("未发现有效子域名")
解析:
-
save_results
函数:- 生成带时间戳的文件名(如
subdomains_google.com_20240520_153045.txt
),避免结果文件覆盖。 - 对结果排序后写入,方便阅读和后续处理。
- 生成带时间戳的文件名(如
-
main
函数(程序入口):- 参数解析:通过
argparse
定义命令行参数,支持用户灵活配置目标域名、字典文件、记录类型等(原始版本需硬编码修改参数)。 - 字典加载:读取子域名字典文件,过滤空行,统计条目数(便于进度计算)。
- 流程调度:调用
subdomain_search
执行枚举,最终输出结果并保存到文件。
- 参数解析:通过
通过此代码,我们能够了解子域名爆破的内在逻辑,以及通过附加的功能以及不断优化从而能够在大多数的情况下进行子域名爆破。以上源代码能够进行基本的使用,这里主要作用是能够通过代码了解工具的内在原理,代码可自行进行完善等操作。还有,爆破就重要的一点,就是需要一个强大的字典。