Pipeline 引用外部数据源最佳实践

场景解析

在企业网络安全日志处理场景中,防火墙、入侵检测系统(IDS)等设备会持续产生大量日志,记录网络流量、访问请求、异常事件等基础信息,但这些原始日志仅能呈现表面现象,难以全面剖析安全威胁,需要在日志处理过程中引入外部数据增强安全分析能力。

例如,某天 IDS(入侵检测系统) 日志记录到多个来自同一 IP 地址的异常端口扫描行为,原始日志仅显示时间、源 IP、扫描端口等信息,无法判断该 IP 是否为恶意攻击源,也不清楚其背后的攻击意图。此时,就需要引用外部数据来丰富日志内容。可以从威胁情报平台获取该 IP 地址的历史攻击记录、所属的恶意组织标签等数据,从地理位置数据库获取其所在地区、网络服务提供商等信息,并将这些外部数据与原始日志进行关联整合。

经过数据融合后,原本孤立的日志事件就有了更丰富的背景信息。安全人员可以通过这些丰富的日志数据,快速判断该 IP 地址是否属于已知的恶意攻击源,是否存在特定的攻击目标偏好,进而采取更精准的安全防护措施,如封禁 IP、加强对应端口的防护策略等。同时,还能基于这些数据对攻击模式进行分析,预测潜在的安全风险,提前完善安全防御体系,提升企业整体的网络安全防护能力。

方案解析

观测云 Pipeline 是一个可编程数据处理器,使用观测云开源的 Platypus 语言作为运行时,能够在边缘节点进行大规模数据分析和特征提取。Datakit Pipeline 提供以下两个内置函数用于从外部表引用数据:

  • query_refer_table(),函数原型为 fn query_refer_table(table_name: str, key: str, value),它能够查询 table_name 表中 key 列为 value 的行,将返回的首行数据丰富至日志中;
  • mquery_refer_table(),函数原型为 fn mquery_refer_table(table_name: str, keys: list, values: list),相比 query_refer_table(),该函数能够使用多个列和值对 table_name 表进行查询。

在以上函数的支持下,Pipeline 能够使用外部表对安全日志进行丰富,以 Zeek conn.log 为例,丰富前的日志如下:

{"ts": 1591367999.3059881,"uid": "CMdzit1AMNsmfAIiQc","id.orig_h": "192.168.4.76", # 源 IP"id.orig_p": 36844,"id.resp_h": "192.168.4.1","id.resp_p": 53,"proto": "udp","service": "dns","duration": 0.06685185432434082,"orig_bytes": 62,"resp_bytes": 141,"conn_state": "SF","missed_bytes": 0,"history": "Dd","orig_pkts": 2,"orig_ip_bytes": 118,"resp_pkts": 2,"resp_ip_bytes": 197,"ip_proto": 17
}

假设在 Pipeline 中引用了风险情报表,此表中记录了危险 IP,包含 IP 列和信息列,当源 IP id.orig_h 字段的值能够匹配到风险表中 IP 列的值时,就会为此日志丰富风险信息字段,丰富后的日志如下:

{"ts": 1591367999.3059881,"uid": "CMdzit1AMNsmfAIiQc","id.orig_h": "192.168.4.76", # 源 IP"id.orig_p": 36844,"id.resp_h": "192.168.4.1","id.resp_p": 53,"proto": "udp","service": "dns","duration": 0.06685185432434082,"orig_bytes": 62,"resp_bytes": 141,"conn_state": "SF","missed_bytes": 0,"history": "Dd","orig_pkts": 2,"orig_ip_bytes": 118,"resp_pkts": 2,"resp_ip_bytes": 197,"ip_proto": 17,"risk_ip": "192.168.4.76", # 丰富风险信息字段"risk_info": "此 IP 近期发起大量攻击" # 丰富风险信息字段
}

在对日志完成丰富后就可以在观测云过滤存在 risk_info 字段的日志进行告警和特征分析,具体的分析方法和场景根据丰富的字段扩展,例如丰富了风险 IP 的地理位置和运营商信息后就能在观测云仪表盘中以地图的方式呈现攻击来源。

在外部表管理方面,Datakit 从 refer_table_url 中以指定间隔拉取外部表数据供 Pipeline 使用,外部表数据必须组织为以下格式:

[{"table_name": "table_abc","column_name": ["col", "col2", "col3", "col4"],"column_type": ["string", "float", "int", "bool"],"row_data": [["a", 123, "123", "false"],["ab", "1234.", "123", true],["ab", "1234.", "1235", "false"]]},{"table_name": "table_ijk","column_name": ["name", "id"],"column_type": ["string", "string"],"row_data": [["a", "12"],["a", "123"],["ab", "1234"]]}
]

也就是说必须提供一个 HTTP/HTTPS 端点暴露表数据,可以使用 Nginx 托管 JSON 文件的方式,但是考虑到更灵活的集成能力,推荐内网部署观测云 DataFlux Func,Func 是一个函数开发、管理、执行平台,可将集成了威胁平台的 Python 函数暴露为拉取表数据的端点,也就是说,当 Datakit 从此端点同步数据时会触发脚本运行,脚本将从一个或者多个平台获取数据并组装为指定的格式。整体架构如下:

注意:该功能内存消耗较高,以 150 万行(refer_table 行数)、磁盘占用约 200MB(JSON 文件)的不重复数据(string 类型两列;int,float,bool 各一列)为例,内存占用维持在 950MB ~ 1.2GB,更新时的峰值内存 2.2GB ~ 2.7GB,可通过配置 use_sqlite = true,将数据保存到磁盘上(即使用 SQLite 存储数据,而不是内存)。

演示用例

前置条件

假设用户具备以下条件:

  • 部署了 DataKit 的 Linux 主机;
  • 部署了 DataFlux Func。

配置 Func

在 Func 中新建脚本集 “Pipeline 外部表 Demo”,新建 main 文件,写入以下函数后发布:

@DFF.API('外部表', cache_result=3000, timeout=10)
def refer_table():'''返回符合 Pipeline query_refer_table() 和 mquery_refer_table() 函数格式要求的表数据。'''data = [{"table_name": "risk_ip","column_name": ["risk_ip", "risk_info"],"column_type": ["string", "string"],"row_data": [["180.173.79.213", "属于 xxx 组织的恶意 IP,存在端口和漏洞扫描行为"],["180.173.79.214", "属于 xxx 组织的恶意 IP,存在病毒传播行为"],]}]print('执行同步请求')return data

在 Func 管理页面中新建同步 API,将此函数暴露为接口,为 DataKit 提供外部数据,点击“示例”即可查看接口 URL:

在 Shell 中请求此 URL 即可获得数据:

配置 DataKit 拉取外部表

编辑 DataKit 配置文件:

vim /usr/local/datakit/conf.d/datakit.conf

修改以下配置:

[pipeline]# 将 <YOUR-FUNC-API-URL> 替换为 Func 同步 API 的 URLrefer_table_url = <YOUR-FUNC-API-URL>refer_table_pull_interval = "5m"use_sqlite = truesqlite_mem_mode = false

重启 DataKit 使配置生效:

datakit service -R

配置示例日志

执行以下命令,使用脚本生成测试日志:

# 创建测试目录
mkdir -p ~/workspace/log_demo && cd $_# 创建脚本
cat > gen_log.sh << 'EOF'
#!/bin/bashwhile true; dotimestamp=$(date +%s.%N | cut -c1-17)ips=("180.173.79.213" "180.173.79.214")log="{\"ts\":${timestamp},\"uid\":\"CMdzit1AMNsmfAIiQc\",\"id.orig_h\":\"${ips[$((RANDOM % 2))]}\",\"id.orig_p\":36844,\"id.resp_h\":\"192.168.4.1\",\"id.resp_p\":53,\"proto\":\"udp\",\"service\":\"dns\",\"duration\":0.06685185432434082,\"orig_bytes\":62,\"resp_bytes\":141,\"conn_state\":\"SF\",\"missed_bytes\":0,\"history\":\"Dd\",\"orig_pkts\":2,\"orig_ip_bytes\":118,\"resp_pkts\":2,\"resp_ip_bytes\":197,\"ip_proto\":17}"echo "${log}" >> ./demo.logecho "++ gen log: ${log}"sleep 1
done
EOF# 运行脚本,将在当前目录下生成日志文件 demo.log
bash gen_log.sh

配置 DataKit 采集示例日志

配置 DataKit 日志采集插件:

cd /usr/local/datakit/conf.d/log
cp logging.conf.sample demo.conf
vim demo.conf

修改以下配置:

[[inputs.logging]]# 日志文件路径logfiles = ["/root/workspace/log_demo/demo.log",]# 日志来源source = "demo"

重启 DataKit 使配置生效:

datakit service -R

登录观测云点击【日志】,可见 demo.log 已经被采集。

配置 Pipeline

点击【Pipelines】-【新建 Pipeline】,运行模式选择“本地 Pipeline”,索引选择 “default”,日志来源选择 “demo”,Pipeline 名称填写 “demo”,在“定义解析规则”中输入以下脚本后点击保存:

# 将 JSON 字符串转换为对象
obj = load_json(_)# 从 JSON 对象中提取字段并处理
# 原始日志使用时间戳标记日志时间,可读性差,提取此字段并转换为人类易读的格式
pt_kvs_set("ts_date", obj["ts"]*1000000)
datetime(ts_date, "us", "RFC3339Nano", "Asia/Shanghai")# 从日志中提取源 IP,并根据源 IP 从外部表中丰富字段
pt_kvs_set("src_ip", obj["id.orig_h"])
# query_refer_table 函数的参数分别为外部表名、列名、列值
query_refer_table("risk_ip", "risk_ip", src_ip)

效果确认

Pipeline 保存后约 1 分钟后生效,生效后新增的字段需等待服务端刷新字段后才可查询,刷新完成后查看日志列表,已经丰富了相关字段。

可以快速分析风险的趋势和占比。

配置告警后,可在风险发生时及时告警。

如果在告警中关联创建异常追踪,可闭环对安全风险的处理过程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913673.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913673.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UI + MCP Client + MCP Server(并且链接多个Server)

项目结构前端项目--------->MCP Client----------->MCP Serverserver就不过多赘述了&#xff0c;他只是相当于添加了多个的tools 链接前后端 http.createServer创建一个服务器// ---------------------------------------------------------------- // server.js import …

香港站群服务器与普通香港服务器对比

在选择香港服务器时&#xff0c;用户常常会遇到"站群服务器"和"普通服务器"两种选项&#xff0c;虽然它们都基于香港数据中心的基础设施&#xff0c;但在 IP 地址配置、功能定位和管理复杂度、成本上存在显著差异&#xff0c;理解这些差异有助于用户根据实…

4.B树和B+树的区别?为什么MySQL选择B+树作为索引?

区别&#xff1a;1.数据存储位置B树每个节点都存储了索引和数据B树只有叶子节点存储数据&#xff0c;非叶子节点仅存储索引2.叶子节点的链接B树的所有叶子节点通过指针连接成一个双向链表&#xff0c;可以高效地进行范围查询或者顺序遍历B树则没有这样的连接关系&#xff0c;查…

转换狂魔,Modbus TCP转Profinet网关打通视觉传感线连接之路

在汽车零部件冲压生产线的世界中&#xff0c;液压机的压力稳定性是确保产品质量的秘密武器。然而&#xff0c;旧时代的人工巡检和传统监测方式却好似拖累现代化进程的沉重枷锁&#xff1a;效率低、成本高&#xff0c;还总是赶不上实时反馈的快车。这时&#xff0c;工厂决心大刀…

C++进阶—二叉树进阶

第一章&#xff1a;内容安排说明 map和set特性需要先铺垫二叉搜索树&#xff0c;而二叉搜索树也是一种树形结构二叉搜索树的特性了解&#xff0c;有助于更好的理解map和set的特性二叉树中部分面试题稍微有点难度&#xff0c;在前面讲解大家不容易接受&#xff0c;且时间长容易…

驱动下一代E/E架构的神经脉络进化—10BASE-T1S

汽车电子电气架构的演进正经历一场深刻的变革&#xff0c;“中央计算单元区域控制器”的架构模式已成为当前主流车型平台发展的明确方向。这种从传统的“功能域”&#xff08;Domain&#xff09;架构向“区域”&#xff08;Zonal&#xff09;架构的转型升级&#xff0c;旨在实现…

某学校系统中挖矿病毒应急排查

本篇文章主要记录某学校长期未运营维护的程序&#xff0c;被黑客发现了漏洞&#xff0c;但好在学校有全流量设备&#xff0c;抓取到了过程中的流量包 需要你进行上机以及结合流量分析&#xff0c;排查攻击者利用的漏洞以及上传利用成功的木马 文章目录靶机介绍1.使用工具分析共…

vue 、react前端页面支持缩放,echarts、地图点击左边不准的原因和解决办法

原因 由于以上都是通过canvas画布生成的&#xff0c;一旦初始化&#xff0c;就会按照比例进行缩放&#xff0c;但与此同时&#xff0c;比例尺并没有变化&#xff0c;导致坐标偏移 解决办法 设置一个zoomVal产量&#xff0c;在页面加载时计算缩放比例&#xff0c;然后在canvas容…

(LeetCode 每日一题) 1353. 最多可以参加的会议数目 (优先队列、小顶堆)

题目&#xff1a;1353. 最多可以参加的会议数目 思路&#xff1a;优先队列实现小顶堆&#xff0c;0(mx*logn) 在第i天&#xff0c;优先选endDay最小的那一个活动进行。那么遍历每一天&#xff0c;用小顶堆来维护每个活动的最后一天即可&#xff0c;细节看注释。 C版本&#xf…

Java结构型模式---代理模式

代理模式基础概念代理模式是一种结构型设计模式&#xff0c;其核心思想是通过创建一个代理对象来控制对另一个真实对象的访问。代理对象在客户端和真实对象之间起到中介作用&#xff0c;允许在不改变真实对象的前提下&#xff0c;对其进行增强或控制。代理模式的核心组件主题接…

MySQL流程控制函数全解析

MySQL 中的流程控制函数&#xff08;也称为条件函数&#xff09;允许你在 SQL 语句中进行逻辑判断&#xff0c;根据不同的条件返回不同的值或执行不同的操作。它们极大地增强了 SQL 的灵活性和表达能力&#xff0c;尤其在进行数据转换、结果格式化、条件聚合和复杂业务逻辑实现…

【7】PostgreSQL 事务

【7】PostgreSQL 事务前言使用事务事务内错误处理事务保存点DDL 事务前言 在 PostgreSQL 中&#xff0c;每一个操作都是一个事务。即使一个简单的查询(select)&#xff0c;这也是一个事务。 例如&#xff1a; postgres# select now();now --------------------…

Linux:多线程---深入互斥浅谈同步

文章目录1. 互斥1.1 为什么需要互斥1.2 互斥锁1.3 初谈互斥与同步1.4 锁的原理1.5 可重入VS线程安全1.6 死锁1.7 避免死锁的算法&#xff08;扩展&#xff09;序&#xff1a;在上一章中我们知道了线程控制的三个角度&#xff1a;线程创建、线程等待和线程终止&#xff0c;分别从…

适用于 vue2、vue3 的自定义指定:v-int(正整数)

在项目中&#xff0c;我们经常会遇到输入框只允许输入数字的情况&#xff0c;下面是一段自定义指定 代码&#xff0c;复制到项目中&#xff0c;注册指定即可使用用法如下&#xff1a; 创建一个IntInput.js 文件&#xff0c;将下面代码复制到文件中保存在项目中的 main.js 文件中…

学习基于springboot秒杀系统-环境配置(接口封装,mybatis,mysql,redis(Linux))

文章目录前言创建springboot项目封装controller层输入输出rest api 的json输出返回页面集成mybatis集成redis下载虚拟机和centos下载redis.tar.gz上传redis.tar.gz 到虚拟机前言 今天开始记录学习秒杀系统-课程是基于慕课上的搜索秒杀系统的课程&#xff0c;老师讲解非常好。这…

stm32达到什么程度叫精通?

STM32达到什么程度叫精通&#xff1f;一个十年老兵的深度反思 前言&#xff1a;精通二字&#xff0c;重如泰山 每次有人问我"STM32达到什么程度叫精通"这个问题&#xff0c;我都会沉默很久。 不是因为这个问题难回答&#xff0c;而是因为"精通"这两个字太重…

微软上线Deep Research:OpenAI同款智能体,o3+必应双王炸

今天凌晨&#xff0c;微软在官网宣布&#xff0c;Azure AI Foundry中上线Deep Research公开预览版。这是支持API和SDK的OpenAI 高级智能体研究能力产品&#xff0c;并且Azure 的企业级智能体平台完全集成。Deep Research是OpenAI在今年4月25日发布的最新产品&#xff0c;能够像…

Spring Batch终极指南:原理、实战与性能优化

&#x1f31f; Spring Batch终极指南&#xff1a;原理、实战与性能优化单机日处理10亿数据&#xff1f;揭秘企业级批处理架构的核心引擎&#xff01;一、Spring Batch 究竟是什么&#xff1f;Spring batch是用于创建批处理应用程序&#xff08;执行一系列作业&#xff09;的开源…

【Part 3 Unity VR眼镜端播放器开发与优化】第四节|高分辨率VR全景视频播放性能优化

文章目录《VR 360全景视频开发》专栏Part 3&#xff5c;Unity VR眼镜端播放器开发与优化第一节&#xff5c;基于Unity的360全景视频播放实现方案第二节&#xff5c;VR眼镜端的开发适配与交互设计第三节&#xff5c;Unity VR手势交互开发与深度优化第四节&#xff5c;高分辨率V…

TCP/IP协议基础

TCPIP协议基础 网络模型 -OSI参考模型 -OSI参考模型各层功能 -TCP/IP网络模型 -TCP/IP协议栈OSI参考模型 – 为了解决网络设备之间的兼容性问题&#xff0c;国际标准化组织ISO于1984年提出了OSI RM&#xff08;开放系统互连参考模型&#xff09;。 OSI参考模型一共有七层&#…