Python-正则表达式-信息提取-滑动窗口-数据分发-文件加载及分析器-浏览器分析-学习笔记

欠4前年的一份笔记 ,献给今后的自己。

正则表达式

概述

正则表达式,Regular Expression,缩写为regex、regexp、RE等。

正则表达式是文本处理极为重要的技术,用它可以对字符串按照某种规则进行检索、替换。

1970年代,Unix之父Ken Thompson将正则表达式引入到Unix中文本编辑器ed和grep命令中,由此正则表达式普及未来。

1980年后,perl语言对Henry Spencef编写的库,扩展了很多新的特性。1997年开始,Philip Hazel开发出了PCRE (Perl Compatible Regular Expressions),它被PHP和HTTPD等工具采用。

正则表达式应用极其广泛,shell中处理文本的命令、各种高级编程语言都支持正则表达式。

参考 https://www.w3cschool.cn/regex_rmjc/

分类

  1. BRE

基本正则表达式,grep、sed、vi等软件支持。vim有扩展。

  1. ERE

扩展正则表达式,egrep(grep-E)、sed-r等。

  1. PCRE

几乎所有高级语言都是PCRE的方言或者变种。Python从1.6开始使用SRE正则表达式引擎, 可以认为是PCRE的子集,见模块re。

基本语法

元字符 metacharacter

在这里插入图片描述

转义

凡是在正则表达式中有特殊意义的符号,如果想使用它的本意,请使用\转义。反斜杠自身,得使用\\
\r,\n还是转义后代表回车、换行

重复

在这里插入图片描述

练习:

1、匹配手机号码

字符串为“手机号码13851888188。”

2、匹配中国座机

字符串为“号码025-83105736、0543-5467328。”

  1. \d{11}
  2. \d{3,4)-1d{7,8)

在这里插入图片描述

注意:
断言会不会捕获呢?也就是断言占不占分组号呢?
断言不占分组号。断言如同条件,只是要求匹配必须满足断言的条件。
分组和捕获是同一个意思
使用正则表达式时,能用简单表达式,就不要复杂的表达式
贪婪与非贪婪
默认是贪婪模式,也就是说尽量多匹配更长的字符串。
非贪婪很简单,在重复的符号后面加上一个?问号,就尽量的少匹配了。

在这里插入图片描述

very very happy 使用v.*y和v.*?y

引擎选项

在这里插入图片描述
单行模式:
. 可以匹配所有字符,包括换行符。
^表示整个字符串的开头,$整个字符串的结尾
多行模式:
.可以匹配除了换行符之外的字符。
^ 表不行首,$行尾
^表示整个字符串的开始,$表示整个字符串的结尾。开始指的是\n后紧接着下一个字符,结束
指的是/n前的字符
可以认为,单行模式就如同看穿了换行符,所有文本就是一个长长的只有一行的字符串,所有^就是这一行字符串的行首,$就是这一行的行尾。
多行模式,无法穿透换行符,^和$还是行首行尾的意思,只不过限于每一行
注意:注意字符串中看不见的换行符,\r\n会影响e$的测试,e$只能匹配e\n

举例
very very happy
harry key
上面2行happy之后,有可能是\r\n结尾。
y$ 单行匹配key的y,多行匹配happy和key的y。

练习
1、匹配一个0~999之间的任意数字

1
12
995
9999
102
02
003
4d\d    							1位数
[1-9]?\d    					1-2位数
^([1-9]\d\d?|\d)				1-3位数
^([1-9]\d\d?|\d)$				1-3位数的行
^([1-9]\d\d?|\d)\r?$
^([1-9]\d\d?|\d)(?!\d) 			数字开头1-3位数且之后不能是数字

2、IP地址
匹配合法的IP地址

192.168.1.150
0.0.0.0
255.255.255.255
17.16.52.100
172.16.0.100
400.400.999.888
001.022.003.000
257.257.255.256

((\d{1,3}).)<3}(\d{1,3})
(?:(\d{1,3)). 3}(\d{1,3}) # 400.400.999.888

对于ip地址验证的问题

  • 可以把数据提出来后,交给IP地址解析库 处理,如果解析异常,就说明有问题,正则的验
    证只是一个初步的筛选,把明显错误过滤掉。
  • 可以使用复杂的正则表达式验证地址正确性
  • 前导0是可以的
import socketnw = socket.inet_aton('192.168.05.001')
print(nw, socket.inet_ntoa(nw))分析:
每一段上可以写的数字有10100100023023230100,也就说1位就是0-92位每一位
也是0-9,3位第一位只能0-2,其余2位都可以0-9
(?:([e-2]\d{2|\d{1,2})\.){3}([0-2]\d{2|\d{1,2})# 解决超出200的问题,但是256呢?
200是特殊的,要再单独分情况处理
250-5|20-4]\d|01]?\d\d?这就是每一段的逻辑
(?: (25[0-5]|2[0-4]\d|[01]?\d\d?)1.){3)(25[0-5]|2[0-4]\d|[01]?\d\d?)

3、选出含有ftp的链接,且文件类型是gz或者xz的文件名

ftp://ftp.astron.com/pub/file/file-5.14.tar.gz
ttp://ftp.gmplib.org/pub/gmp-5.1.2/gmp-5.1.2.tar.xz
ttp://ftp.vim.org/pub/vim/unix/vim-7.3.tar.bz2
http://anduin.linuxfromscratch.org/sources/LFS/1fs-packages/conglomeration//iana-etc/iana-etc-2.30.tar.bz2
http://anduin.linuxfromscratch.org/sources/other/udev-1fs-205-1.tar.bz2
http://download.savannah.gnu.org/releases/1ibpipeline/libpipeline-1.2.4.tar.gz
http://download.savannah.gnu.org/releases/man-db/man-db-2.6.5.tar.xz
http://download.savannah.gnu.org/releases/sysvinit/sysvinit-2.88dsf.tar.bz2
http://ftp.altlinux.org/pub/people/legion/kbd/kbd-1.15.5.tar.gz
http://mirror.hust.edu.cn/gnu/autoconf/autoconf-2.69.tar.xz
http://mirror.hust.edu.cn/gnu/automake/automake-1.14.tar.xz

.*ftp.*\.(?:gz|xz) #
ftp.*/(.*(?:gz|xz))
\.*ftp.*/([^/]*\.(?:gz|xz))# 捕获文件名分组
(?<=.*ftp.*/)[^/]*\.(?:gz|xz)# 断言文件名前一定还有ftp

Python的正则表达式

Python使用re模块提供了正则表达式处理的能力。

在这里插入图片描述

方法

编译

re.compile(pattern, flags=0)
设定flags,编译模式,返回正则表达式对象regex。
pattern就是正则表达式字符串,flags是选项。正则表达式需要被编译,为了提高效率,这些编译后的结果被保存,下次使用同样的pattern的时候,就不需要再次编译。
re的其它方法为了提高效率都调用了编译方法,就是为了提速。

单次匹配

re.match(pattern, string, flags=0)

regex.match(string[, pos[, endpos]])

match匹配从字符串的开头匹配,regex对象match方法可以重设定开始位置和结束位置。返回match对象
re.search(pattern, string, flags=0)
regex. search(string[, pos[, endpos]])

从头搜索直到第一个匹配,regex对象search方法可以重设定开始位置和结束位置,返回match对象

re.fullmatch(pattern, string, flags=0)

regex.fullmatch(string[, pos[, endpos]])

整个字符串和正则表达式匹配

import res = '''bottle\nbag\nbig\napple'''
for i, c in enumerate(s, 1):print((i - 1, c), end='\n' if i % 8 == 0 else ' ')print()# (0, 'b') (1, 'o') (2, 't') (3, 't') (4, 'l') (5, 'e') (6, '\n') (7, 'b')
# (8, 'a') (9, 'g') (10, '\n') (11, 'b') (12, 'i') (13, 'g') (14, '\n') (15, 'a')
# (16, 'p') (17, 'p') (18, 'l') (19, 'e')# match方法
print('--match--')
result = re.match('b', s)  # 找到一个就不找了
print(1, result)  # <re.Match object; span=(0, 1), match='b'> 
result = re.match('a', s)  # 没找到,返回None
print(2, result)     # None
result = re.match('^a', s, re.M)  # 依然从头开始找,多行模式没有用
print(3, result)        # None
result = re.match('^a', s, re.S)  # 依然从头开始找print(4, result)        # None
# 先编译,然后使用正则表达式对象
regex = re.compile('a')
result = regex.match(s)  # 依然从头开始找
print(5, result)        # None
result = regex.match(s, 15)  # 把索引15作为开始找
print(6, result)  # <re.Match object; span=(15, 16), match='a'>
print()
# search方法
print('--search--')
result = re.search('a', s)  # 扫描找到匹配的第一个位置
print(7, result)  # apple       # <re.Match object; span=(8, 9), match='a'>
regex = re.compile('b')
result = regex.search(s, 1)     print(8, result)  # bag     # <re.Match object; span=(7, 8), match='b'>
regex = re.compile('^b', re.M)result = regex.search(s)  # 不管是不是多行,找到就返回
print(8.5, result)  # bottle    # <re.Match object; span=(0, 1), match='b'>
result = regex.search(s, 8)
print(9, result)  # big     # <re.Match object; span=(11, 12), match='b'>
# fullmatch/jŸZ
result = re.fullmatch('bag', s)
print(10, result)       # None
regex = re.compile('bag')
result = regex.fullmatch(s)     
print(11, result)       # None
result = regex.fullmatch(s, 7)
print(12, result)       # None
result = regex.fullmatch(s, 7, 10)
print(13, result)  # 要完全匹配,多了少了都不行,[7,10)   <re.Match object; span=(7, 10), match='bag'>

全文搜索

re.findall (pattern, string, flags=0)
regex.findall(string[, pos[, endpos]])
对整个字符串,从左至右匹配,返回所有匹配项的列表
re.finditer(pattern, string, flags=0)
regex.finditer(string[, pos[, endpos]])
对整个字符串,从左至右匹配,返回所有匹配项,返回迭代器。
注意每次迭代返回的是match对象。

import res = '''bottle\nbag\nbig\nable'''
for i, c in enumerate(s, 1):print((i - 1, c), end='\n' if i % 8 == 0 else ' ')
# (0, 'b') (1, 'o') (2, 't') (3, 't') (4, 'l') (5, 'e') (6, '\n') (7, 'b')
# (8, 'a') (9, 'g') (10, '\n') (11, 'b') (12, 'i') (13, 'g') (14, '\n') (15, 'a')
# (16, 'b') (17, 'l') (18, 'e') 
print()
# findal1方法result = re.findall('b', s)
print(1, result)        # ['b', 'b', 'b', 'b']
regex = re.compile('^b')
result = regex.findall(s)
print(2, result)        # ['b']
regex = re.compile('^b', re.M)
result = regex.findall(s, 7)
print(3, result)  # bag big     ['b', 'b']regex = re.compile('^b', re.S)result = regex.findall(s)
print(4, result)  # bottle      ['b']
regex = re.compile('^b', re.M)                  
result = regex.findall(s, 7, 10)
print(5, result)  # bag   ['b']
# finditer E
result = regex.finditer(s)
print(type(result))             # <class 'callable_iterator'>
print(next(result))             # <re.Match object; span=(0, 1), match='b'>
print(next(result))             # <re.Match object; span=(7, 8), match='b'>

匹配替换

re.sub(pattern, replacement, string, count=0, flags=0)

regex.sub(replacement, string, count=0)

使用pattern对字符串string进行匹配,对匹配项使用repl替换。

replacement可以是 string, bytes, function.

re.subn(pattern, replacement, string, count=0, flags=0)

regex.subn(replacement, string, count=0)

同sub返回一个元组(new_string,number_of_subs_made)

import re
s = '''bottle\nbag\nbig\napple'''
for i, c in enumerate(s, 1):print((i - 1, c), end='\n' if i % 8 == 0 else ' ')
print()# 替换方法
regex = re.compile('b\wg')
result = regex.sub('magedu', s)
print(1, result)  # 被替换后的字符串
result = regex.sub('magedu', s, 1)  # 替换1次
print(2, result)  # 被替换后的字符串
regex = re.compile('\s+')
result = regex.subn('\t', s)
print(3, result)  # 被替换后的字符串及替换次数的元组

分割字符串

字符串的分割函数,太难用,不能指定多个字符进行分割。

re.split(pattern, string, maxsplit=0, flags=0)

re.split分割字符串

import res = '''01 bottle
02 bag
03 big1
100 able'''for i,c in enumerate(s, 1):print((i-1, c), end='\n' if i%8==0 else ' ')print()# 把每行单词提取出来print(s.split()) # 做不到 ['01', 'bottle', '02', 'bag', '03', 'big1', '100', 'able']result = re.split('[\s\d]+',s)print(1, result) # ['', 'bottle', 'bag', 'big', 'able']regex = re.compile('^[\s\d]+') # 字符串首result = regex.split(s)print(2, result) # '', 'bottle\n02 bag\n03 big1\n100 able']regex = re.compile('^[\s\d]+', re.M) # 行首result = regex.split(s)print(3, result) # ['', 'bottle\n', 'bag\n', 'big1\n', 'able']regex = re.compile('\s+\d+\s+')result = regex.split(' ' + s)print(4, result)

分组

使用小括号的pattern捕获的数据被放到了组group中。

match、search函数可以返回match对象;findall这回字符串列表;finditer返回一个个match对象

如果pattern中使用了分组,如果有匹配的结果,会在match对象中

1、使用group(N)方式返回对应分组,1-N是对应的分组,0返回整个匹配的字符串
2、如果使用了命名分组,可以使用group(‘name’) 的方式取分组
3、也可以使用groups() 返回所有组
4、使用groupdict() 返回所有命名的分组

import res = '''bottle\nbag\nbig\napple'''for i, c in enumerate(s, 1):print((i - 1, c), end='\n' if i % 8 == 0 else ' ')print()# 分组
regex = re.compile('(b\w+)')
result = regex.match(s)
print(type(result))
print(1, 'match', result.groups())
result = regex.search(s, 1)
print(2, 'search', result.groups())  ## 命名分组regex = re.compile('(b\w+)\n(?P<name2>b\w+)\n(?P<name3>b\w+)')result = regex.match(s)print(3, 'match', result)print(4, result.group(3), result.group(2), result.group(1))print(5, result.group(0).encode())  # 0 返回整个匹配字符串print(6, result.group('name2'), result.group('name3'))print(6, result.groups())print(7, result.groupdict())result = regex.findall(s)for x in result:  # 字符串列表print(type(x), x)regex = re.compile('(?P<head>b\w+)')result = regex.finditers()for x in result:print(type(x), x, x.group(), x.group('head'))

练习

匹配邮箱地址

test@hot-mail.com
v-ipamagedu.com
web.manager@magedu. com.cn
super.user@google.com
a@w-a-com

匹 html标记内的内容

<a href= http://www.magedu.com/index.html' target='_blank'>马哥教育</a>

匹配URL

http://www.magedu.com/index.html
https://login.magedu.com
file:///ect/sysconfig/network

匹配二代中国身份证ID

321105700101003
321105197001010030
11210020170101054X
17位数字+1位校验码组成

前6位地址码,8位出生年月,3位数字,1位校验位(0-9或X)

判断密码强弱

要求密码必须由 10-15位 指定字符组成:

十进制数字
大写字母
小写字母
下划线
要求四种类型的字符都要出现才算合法的强密码

例如:Aatb32_67mq,其中包含大写字母、小写字母、数字和下划线,是合格的强密码

单词统计 word count

对sample文件进行单词统计,要求使用正则表达式

参考

邮箱


\w+[-.\w]*@[\w-]+(\.[\w-]+)+html提取<[^<>]+>(.*)<[^<>]+>如果要匹配标记a<(\w+)\s+[^<>]+>(.*)(</\1>)URL提取
(wt)://([^\s]+)身份证验证
身份证验证需要使用公式计算,最严格的应该实名验证。\d{17)[0-9xX]|\d{15}强密码Aatb32_67mnq
Aatb32_67m.nq
中国是一个伟大的国家aA_810-15位,其中包含大写字母、小写字母、数字和下划线
^\w{10,15}$如果测试有不可见字符干扰使用^w{10,15}\r?$看似正确,但是,如果密码有中文呢?^[a-zA-Z0-9_]{10,15}$但是还是没有解决类似于 111111111112 这种密码的问题,如何解决?
需要用到一些非正则表达式的手段。利用判断来解决,思路如下:
1、可以判断当前密码字符串中是否有\W,如果出现就说明一定不是合法的,如果不出现说明合法
2、对合法继续判断,如果出现过_下划线,说明有可能是强密码,但是没有下划线说明一定不是强密码。
3、对包含下划线的合法密码字符串继续判断,如果出现过\\d的,说明有可能是强密码,没有出现\\d的一定不是强密码
4、对上一次的包含下划线、数字的合法的密码字符串继续判断,如果出现了[A-Z]说明有可能是 强密码,没有出现[A-Z]说明一定不是强密码
5、对上一次包含下划线、数字、大写字母的合法密码字符串继续判断,如果出现了[a-z]说明就是强密码,找到了,没有出现小写字母就一定不是强密码。请注意上面的判断的顺序,应该是概率上最可能不出在密码字符申的先判断。

单词统计

from collections import defaultdictimport redef makekey2(line:str, chars=set("""!'"#./\()[],*- \r\n""")):start = 0for i, c in enumerate(line):if c in chars:if start == i:  # 如果紧挨着还是特字符,start一定等丁istart += 1  # 加1并continuecontinueyield line[start:i]start = i + 1  # 加1是跳过这个不需要的特珠字符celse:if start < len(line):  # 小于,说明还有有效的字符,而且一直到末尾yield line[start:]# ''"'I\host\mount splitext('.cshrc splitdrive("//host/computer/dir . abc path s\r\n
regex = re.compile('[^\w-]+')def makekey3(line: str):for word in regex.split(line):if len(word):yield worddef wordcount(filename, encoding='utf8', ignore=set()):d = defaultdict(lambda: 0)with open(filename, encoding=encoding) as f:for line in f:for word in map(str.lower, makekey2(line)):if word not in ignore:d[word] += 1return ddef top(d: dict, n=10):for i, (k, v) in enumerate(sorted(d.items(), key=lambda item: item[1], reverse=True)):if i > n:breakprint(k, v)# 单词统计前儿名
top(wordcount('sample', ignore={'the', 'a'}))

滑动窗口

数据载入

对于本项目来说,数据就是日志的一行行记录,载入数据就是文件10的读取。将获取数据的方法封装成函数。

def load(path):""""装载日志文件"""with open(path) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失败就抛弃,或者打印日志

时间窗口分析

概念

很多数据,例如日志,都是和时间相关的,都是按照时间顺序产生的。产生的数据分析的时候,要按照时间求值
interval 表示每一次求值的时间间隔
width 时间窗口宽度,指的一次求值的时间窗口宽度

当 width > interval
在这里插入图片描述

数据求值时会有重叠

width = interval

在这里插入图片描述

数据求值没有重叠
当 width < interval
一般不采纳这种方案,会有数据缺失

时序数据

运维环境中,日志、监控等产生的数据都是与时间相关的数据,按照时间先后产生并记录下来的
数据,所以一般按照时间对数据进行分析。

数据分析基本程序结构

无限的生成随机数函数,产生时间相关的数据,返回时间和随机数的字典
每次取3个数据,求平均值。

import random
import datetime
import timedef source():while True:yield {'value': random.randint(1, 100), 'datetime': datetime.datetime.now()}time.sleep(1)# 获取数据
s = source()
items = [next(s) for _ in range(3)]# 处理函数
def handler(iterable):return sum(map(lambda item: item['value'], iterable)) / len(iterable)print(items)
print("{:.2f}".format(handler(items)))# 输出:
# [{'value': 52, 'datetime': datetime.datetime(2025, 7, 10, 23, 3, 57, 876429)}, {'value': 5, 'datetime': datetime.datetime(2025, 7, 10, 23, 3, 58, 880602)}, {'value': 91, 'datetime': datetime.datetime(2025, 7, 10, 23, 3, 59, 885730)}]
# 49.33

上面代码模拟了,一段时间内产生了数据,等了一段固定的时间取数据来计算平均值。

窗口函数实现

将上面的获取数据的程序扩展为window函数。使用重叠的方案。

import random
import datetime
import timedef source(second=1):"""生成数据"""while True:yield {'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value': random.randint(1, 100)}time.sleep(second)def window(iterator, handler, width: int, interval: int):""":param iterator:数据源,生成器,用来拿数据:param handler:数据处理函数:param width:时间窗口宽度,秒:param interval:处理时间间隔,秒"""start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')buffer = []  # 窗口中的待计算数据delta = datetime.timedelta(seconds=width - interval)while True:# 从数据源获取数据data = next(iterator)if data:buffer.append(data)  # 存入临时缓冲等待计算current = data['datetime']# 每隔interval计算buffer中的数据一次if (current - start).total_seconds() >= interval:ret = handler(buffer)print('{:.2f}'.format(ret))start = current# 清除超出width的数据buffer = [x for x in buffer if x['datetime'] > current - delta]def handler(iterable):return sum(map(lambda x: x['value'], iterable)) / len(iterable)window(source(), handler, 10, 5)

时间的计算

在这里插入图片描述

数据分发

分发
生产者消费者模型
对于一个监控系统,需要处理很多数据,包括日志。对其中已有数据的采集、分析。
被监控对象就是数据的生产者producer,数据的处理程序就是数据的消费者consumer。
生产者消费者传统模型

在这里插入图片描述

传统的生产者消费者模型,生产者生产,消费者消费。但这种模型有些问题
开发的代码耦合太高,如果生成规模扩大,不易扩展,生产和消费的速度很难匹配等。
思考一下,生产者和消费者的问题是什么?
举例:
卖包子的,如果包子卖不完,还要继续蒸包子,会怎么样?门可罗雀,包子成山。
如果把包子先蒸一些,卖着,快卖完了,赶紧包,再蒸一些。不会有等包子的队伍。
如果包子供不应求,还没有和面呢,包子都被预定了,出现排队等包子的情况。
上面这些情况,最核心的问题,就是生产者和消费者速度要匹配的问题。
但是,往往速度不能够很好的匹配。
解决的办法–队列queue。
作用–解耦、缓冲。

在这里插入图片描述

日志生产者往往会部署好几个程序,日志产生的也很多,而消费者也会有多个程序,去提取日志

分析处理。
数据的生产是不稳定的!会造成短时间数据的“潮涌”,需要缓冲。
消费者消费能力不一样,有快有慢,消费者可以自己决定消费缓冲区中的数据。
单机可以使用queue内建的模块构建进程内的队列,满足多个线程间的生产消费需要。
大型系统可以使用第三方消息中间件:RabbitMQ、RocketMQ、Kafka

queue模块–队列

queue模块提供了一个先进先出的队列Queue。
queue. Queue(maxsize=0)
创建FIFO队列,返回Queue对象。
maxsize 小于等于0,队列长度没有限制。
Queue.get(block=True, timeout=None)
从队列中移除元素并返回这个元素。
block 为阻塞,timeout为超时。
如果block为True,是阻塞,timeout为None就是一直阻塞。
如果block为True但是timeout有值,就阻塞到一定秒数抛出Empty异常。
block为False,是非阻塞,timeout将被忽略,要么成功返回一个元素,要么抛出empty异常。
Queue.get_nowait()
等价于 get(False),也就是说要么成功返回一个元素,要么抛出empty异常。
但是queue的这种阻塞效果,需要多线程的时候演示。
Queue.put(item, block=True, timeout=None)
把一个元素加入到队列中去。
block=True,timeout=None,一直阻塞直至有空位放元素。
block=True, timeout=5,阻塞5秒就抛出Full异常。
block=False, timeout失效,立即返回,能塞进去就塞,不能则返回抛出Full异常。
Queue.put_nowait(item)
等价于 put(item, False),也就是能塞进去就塞,不能则返回抛出Full异常。

# Queue测试
from queue import Queue
import randomq = Queue()q.put(random.randint(1, 100))
q.put(random.randint(1, 100))print(q.get())
print(q.get())# print(q.get())# 阻塞
# Traceback (most recent call last):
#   File "/Users/quyixiao/pp/python_lesson/function1/function15.py", line 14, in <module>
#     print(q.get(timeout=3))  # 阻塞,但超时抛异常
#           ~~~~~^^^^^^^^^^^
#   File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/queue.py", line 212, in get
#     raise Empty
# _queue.Empty
print(q.get(timeout=3))  # 阻塞,但超时抛异常

分发器的实现

生产者(数据源)生产数据,缓冲到消息队列中
数据处理流程:
数据加载 -》提取 -》分析(滑动窗口函数)
处理大量数据的时候,对于一个数据源来说,需要多个消费者处理。但是如何分配数据就是个问题了。
需要一个分发器(调度器),把数据分发给不同的消费者处理。
每一个消费者拿到数据后,有自己的处理函数。所以要有一种注册机制

数据加载–》提取–》分发–》分析函数1
                                          |----》分析函数2

分析1和分析2是不同的handler,不同的窗口宽度、间隔时间
如何分发?
这里就简单一点,轮询策略。
一对多的副本发送,一个数据通过分发器,发送到n个消费者。
消息队列
在生产者和消费者之间使用消息队列,那么所有消费者共用一个消息队列,还是各自拥有一个消息队列呢?
共用一个消息队列也可以,但是需要解决争抢的问题。相对来说每一个消费者自己拥有一个队列,较为容易。

如何注册?
在调度器内部记录有哪些消费者,每一个消费者拥有自己的队列。
线程

由于一条数据会被多个不同的注册过的handler处理,所以最好的方式是多线程。

import threading# 定义线程
# target线程中运行的函数;args这个函数运行时需要的实参的元组
t = threading.Thread(target=window, args=(src, handler, width, interval))# 启动线程
t.start()

分发器代码实现

import random
import datetime
import time
from queue import Queue
import threadingdef source(second=1):"""生成数据"""while True:yield {'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value': random.randint(1, 100)}time.sleep(second)def window(src: Queue, handler, width: int, interval: int):"""窗口函数:param src:数据源,缓存队列,用来拿数据:param handler:数据处理函数:param width:时间窗口宽度,秒:param interval:处理时间间隔,秒"""start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')buffer = []  # 窗口中的待计算数据delta = datetime.timedelta(seconds=width - interval)while True:# 从数据源获取数据data = src.get()if data:buffer.append(data)  # 存入临时缓冲等待计算current = data['datetime']# 每隔interval计算buffer中的数据一次if (current - start).total_seconds() >= interval:ret = handler(buffer)print('{:.2f}'.format(ret))start = current# 清除超出width的数据buffer = [x for x in buffer if x['datetime'] > current - delta]def handler(iterable):return sum(map(lambda x: x['value'], iterable)) / len(iterable)def dispatcher(src):# 分发器中记录handler,同时保存各自的队列handlers = []queues = []def reg(handler, width: int, interval: int):"""注册 窗口处理函数:param handler:注册的数据处理函数:param width:时间窗口宽度:param interval:时间间隔"""q = Queue()queues.append(q)h = threading.Thread(target=window, args=(q, handler, width, interval))handlers.append(h)def run():for t in handlers:t.start()  # 启动线程处理数据for item in src:  # 将数据源取到的数据分发到所有队列中for q in queues:q.put(item)return reg, runreg, run = dispatcher(source())
reg(handler, 10, 5)  # 注册
run()  # 运行

注意,以上代码也只是现阶段所学知识的一种实现,项目中建议使用消息队列服务的“订阅”模式,消费者各自消费自己的队列的数据。

日志分析

概述
生成中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载、健康状况,可以分析客户的分布情况、客户的行为,甚至基于这些分析可以做出预测。
一般采集流程

日志产出 采集(Logstash、Flume、Scribe) >存储>分析 >存储(数据库、NoSQL) ->可视化
开源实时日志分析ELK平台
Logstash收集日志,并存放到ElasticSearch集群中,Kibana则从ES集群中查询数据生成图表,返回浏览器端

分析的前提

半结构化数据日志是半结构化数据,是有组织的,有格式的数据。可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据。

文本分析

日志是文本文件,需要依赖文件10、字符串操作、正则表达式等技术。
通过这些技术就能够把日志中需要的数据提取出来。

183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"

这是最常见的日志,nginx、tomcat等WEB Server都会产生这样的日志。如何提取出数据?这里面每一段有效的数据对后期的分析都是必须的。

提取数据
一、空格分割
with open('xxx.log') as f:for line in f:for field in line.split():print(field)

缺点:
数据并没有按照业务分割好,比如时间就被分开了,URL相关的也被分开了,User Agent的空格多,被分割了。

所以,定义的时候不选用这种在filed中出现的字符就可以省很多事,例如使用’\x01’这个不可见的

ASCII,print(‘\x01’)试一试
能否依旧是空格分割,但是遇到双引号、中括号特殊处理一下?
思路:

先按照空格切分,然后一个个字符迭代,但如果发现是[或者”,则就不判断是否空格,直到]或

者”结尾,这个区间获取的就是时间等数据。

line = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''CHARS = set("\t")def makekey(line: str):start = 0skip = Falsefor i, c in enumerate(line):if not skip and c in '"[':  # [或第一个引号start = i + 1skip = Trueelif skip and c in '"]':  # 第二个引号 或]skip = Falseyield line[start: i]start = i + 1continueif skip:  # 如果遇到[或 第一个引号就跳过continueif c in CHARS:if start == i:start = i + 1continueyield line[start:i]start = i + 1else:if start < len(line):yield line[start:]print(list(makekey(line)))输出:
['19/Feb/2013:10:23:29 +0800', 'GET /o2o/media.html?menu=3 НТТР/1.1', '-', 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)']

类型转换
fields中的数据是有类型的,例如时间、状态码等。对不同的field要做不同的类型转换,甚至是自
定义的转换
时间转换
19/Feb/2013:10:23:29 +0800 对应格式是
%d/%b/%Y:%H:%M:%S %z

import datetimedef convert_time(timestr):return datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')

可以得到
lambda timestr: datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’)
状态码和字节数
都是整型,使用int函数转换
请求信息的解析

GET /020/media.html?menu=3 HTTP/1.1
method url protocol 三部分都非常重要

def get_request(request:str):return dict(zip(['method', 'url', 'protocol'], request.split()))

lambda request: dict(zip(‘method’, ‘url’, protocol’),request.split0))

映射
对每一个字段命名,然后与值和类型转换的方法对应。解析每一行是有顺序的。

import datetimeline = '''66.249.69.131 - - [19/Feb/2013:10:23:29 +0800] "GET /robots.txt HTTP/1.1" 404 162 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)"'''CHARS = set(" \t")def makekey(line: str):start = 0skip = Falsefor i, c in enumerate(line):if not skip and c in '"[':start = i + 1skip = Trueelif skip and c in '"]':skip = Falseyield line[start:i]start = i + 1continueif skip:continueif c in CHARS:if start == i:start = i + 1continueyield line[start:i]start = i + 1else:if start < len(line):yield line[start:]names = ('remote', '', '', 'datetime', 'request', 'status', 'length', '', 'useragent')ops = (None, None, None, lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),lambda request: dict(zip(['method', 'url', 'protocol'], request.split())),int, int, None, None)def extract(line: str):return dict(map(lambda item:(item[0], item[2](item[1]) if item[2] is not None else item[1]), zip(names, makekey(line), ops)))print(extract(line))输出:
{'remote': '66.249.69.131', '': '-', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': {'method': 'GET', 'url': '/robots.txt', 'protocol': 'HTTP/1.1'}, 'status': 404, 'length': 162, 'useragent': 'Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)'}

二、正则表达式提取
构造一个正则表达式提取需要的字段,改造extract函数、names和ops

import  datetimenames = ('remote', 'datetime', 'method', 'url', 'protocol', 'status', 'length', 'useragent')ops = (None, lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
None, None, None, int, int, None)pattern = r'''([\d.]{7,}) - - \[([/\w +:]+)\] "(\w+) (\S+) ([\w/\d.]+)" (\d+) (\d+).+ "(.+)"'''

能够使用命名分组呢?进一步改造pattern为命名分组,ops也就可以和名词对应了,names就没有必要存在了


PATTERN = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''regex = re.compile(PATTERN)ops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int
}

改造后的代码

import datetimeimport reline = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''pattern = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''ops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int
}regex = re.compile(pattern)def extract(line: str) -> dict:matcher = regex.match(line)return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}print(extract(line))输出:{'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}

异常处理
日志中不免会出现一些不匹配的行,需要处理。
这里使用re.match方法,有可能匹配不上。所以要增加一个判断
采用抛出异常的方式,让调用者获得异常并自行处理。

import re
import datetimeline = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''pattern = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''ops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int
}regex = re.compile(pattern)def extract(logline: str) -> dict:""""返回字段的字典,抛出异常说明匹配失败"""matcher = regex.match(line)if matcher:return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}else:raise Exception('No match')

但是,也可以采用返回一个特殊值的方式,告知调用者没有匹配。

def extract(logline: str) -> dict:"""返回字段的字典,如果返回None说明匹配失败"""matcher = regex.match(line)if matcher:return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}else:return None

通过返回值,在函数外部获取了None,同样也可以采取一些措施。本次采用返回None的实现。

文件加载及分析器

整合代码

load函数就是从日志中提取合格的数据的生成器函数。
它可以作为dispatcher函数的数据源。
原来写的handler函数处理一个字典的’datetime’字段,不能处理日志抽取函数extract返回的字典,
提供一个新的函数。

import random
import datetime
import time
from queue import Queue
import threading
import re# 数据源PATTERN = r'(?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) .* .* \[(?P<datetime>.*)\] "(?P<method>\w+) (?P<url>[^\s]*) (?P<version>[\w|/\.\d]*)" (?P<status>\d{3}) (?P<length>\d+) "(?P<refer>[^\s]*)" "(?P<userAgent>.*)"'regex = re.compile(PATTERN)  # 40i7
ops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int
}def extract(line: str) -> dict:matcher = regex.match(line)if matcher:return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}def load(path):""""装载日志文件"""with open(path) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失败则抛弃或者记录日志# 数据处理
def source(second=1):"""生成数据"""while True:yield {'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value': random.randint(1, 100)}time.sleep(second)# 滑动窗口函数
def window(src: Queue, handler, width: int, interval: int):"""窗口函数:param src:数据源,缓存队列,用来拿数据:param handler:数据处理函数:param width:时间窗口宽度,秒:param interval:处理时间间隔,秒"""start = datetime.datetime.strptime('20170101 080000 +0800', '%Y%m%d %H%M%S %z')current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')buffer = []  # 窗口中的待计算数据delta = datetime.timedelta(seconds=width - interval)while True:# 从数据源获取数据data = src.get()if data:buffer.append(data)  # 存入临时缓冲等待计算current = data['datetime']# 每隔interval计算buffer中的数据一次if (current - start).total_seconds() >= interval:ret = handler(buffer)print('(}'.format(ret))start = current# 清除超出width的数据buffer = [x for x in buffer if x['datetime'] > current - delta]# 随机数平均数测试函数def handler(iterable):return sum(map(lambda x: x['value'], iterable)) / len(iterable)# 测试函数
def donothing_handler(iterable):return iterable# 分发器
def dispatcher(src):# 分发器中记录handler,同时保存各自的队列handlers = []queues = []def reg(handler, width: int, interval: int):"""注册 窗口处理函数:param handler:注册的数据处理函数:param width:时间窗口宽度:param interval:时间间隔"""q = Queue()queues.append(q)h = threading.Thread(target=window, args=(q, handler, width, interval))handlers.append(h)def run():for t in handlers:t.start()  # 启动线程处理数据for item in src:  # 将数据源取到的数据分发到所有队列中for q in queues:print('item: {}'.format(item))q.put(item)return reg, runif __name__ == '__main__':import sys# path = sys.argv[1]path = 'test.log'reg, run = dispatcher(load(path))reg(donothing_handler, 10, 5)  # 注册处理函数run()  # 运行test.log 文件内容为66.249.69.131 - - [19/Feb/2013:10:23:29 +0800] "GET /robots.txt HTTP/1.1" 404 162 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)"
完成分析功能

分析日志很重要,通过海量数据分析就能够知道是否遭受了攻击,是否被爬取及爬取高峰期,是否有盗链等。
百度(Baidu) 爬虫名称(Baiduspider)
谷歌(Google)爬虫名称(Googlebot)

状态码分析

状态码中包含了很多信息。例如
304,服务器收到客户端提交的请求参数,发现资源未变化,要求浏览器使用静态资源的缓存
404,服务器找不大请求的资源
304占比大,说明静态缓存效果明显。404占比大,说明网站出现了错误链接,或者尝试嗅探网站
资源。
如果400、500占比突然开始增大,网站一定出问题了。

# 状态码占比
def status_handler(iterable):# 时间窗口内的一批数据status = {}for item in iterable:key = item['status']status[key] = status.get(key, 0) + 1# total = sum(status.values())total = len(iterable)return {k: status[k] / total for k, v in status.items()}

如果还需要什么分析,增加分析函数handler注册就行了

日志文件的加载

目前实现的代码中,只能接受一个路径,修改为接受一批路径。

可以约定一下路径下文件的存放方式:

如果送来的是一批路径,就迭代其中路径。

如果路径是一个普通文件,就按照行读取内容。

如果路径是一个目录,就遍历路径下所有普通文件,每一个文件按照行处理。不递归处理子目录。

from pathlib import Pathdef load(*paths):for item in paths:p = Path(item)if not p.exists():continueif p.is_dir():for file in p.iterdir():if file.is_file():pass  # 和下面处理一样elif p.is_file():with open(str(p)) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失败则抛弃或者记录日志

写的过程中发现重复的地方,把文件处理部分提出来写成函数。

from pathlib import Pathdef openfile(path: str):with open(path) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失败则抛弃或者记录日志def load(*paths):for item in paths:p = Path(item)if not p.exists():continueif p.is_dir():for file in p.iterdir():if file.is_file():yield from openfile(str(file))elif p.is_file():yield from openfile(str(p))
完整代码
import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path# 数据源PATTERN = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''regex = re.compile(PATTERN)ops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int
}def extract(line: str) -> dict:matcher = regex.match(line)if matcher:return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}# 装载文件
def openfile(path: str):with open(path) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失败则抛弃或者记录日志def load(*paths):for item in paths:p = Path(item)if not p.exists():continueif p.is_dir():for file in p.iterdir():if file.is_file():yield from openfile(str(file))elif p.is_file():yield from openfile(str(p))# 数据处理
def source(second=1):"""生成数据"""while True:yield {'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value': random.randint(1, 100)}print('sleep second:',second)time.sleep(second)# 滑动窗口函数
def window(src: Queue, handler, width: int, interval: int):"""窗口函数:param src:数据源,缓存队列,用来拿数据:param handler:数据处理函数:param width:时间窗口宽度,秒:param interval:处理时间间隔,秒"""start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')buffer = []  # 窗口中的待计算数据delta = datetime.timedelta(seconds=width - interval)while True:# 从数据源获取数据data = src.get()if data:buffer.append(data)  # 存入临时缓冲等待计算current = data['datetime']# 每隔interval计算buffer中的数据一次if (current - start).total_seconds() >= interval:ret = handler(buffer)print('{}'.format(ret))start = current# 清除超出width的数据buffer = [x for x in buffer if x['datetime'] > current - delta]# 随机数平均数测试函数def handler(iterable):return sum(map(lambda x: x['value'], iterable)) / len(iterable)# 测试凼数
def donothing_handler(iterable):return iterable# 状态码占比
def status_handler(iterable):# 时间窗口内的一批数据status = {}for item in iterable:key = item['status']status[key] = status.get(key, 0) + 1# total = sum(status.values())total = len(iterable)return {k: status[k] / total for k, v in status.items()}# 分发器
def dispatcher(src):# 分发器中记录handler,同时保存各自的队列handlers = []queues = []def reg(handler, width: int, interval: int):"""注册 窗口处理函数:param handler:注册的数据处理函数:param width:时间窗口宽度:param interval:时间间隔"""q = Queue()queues.append(q)h = threading.Thread(target=window, args=(q, handler, width, interval))handlers.append(h)def run():for t in handlers:t.start()  # 启动线程处理数据for item in src:  # 将数据源取到的数据分发到所有队列中for q in queues:print('queue put ', item)q.put(item)return reg, runif __name__ == '__main__':import sys# path = sys.argv[1]path = 'test1.log'reg, run = dispatcher(load(path))reg(status_handler, 10, 5)  # 注册run()  # 1Z1J 妁高薪职业学院# test1.log 
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"输出:queue put  {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put  {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put  {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put  {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put  {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}

到这里,一个离线日志分析项目基本完成。
1、可以指定文件或目录,对日志进行数据分析
2、分析函数可以动态注册
3、数据可以分发给不同的分析处理程序处理

浏览器分析

useragent
这里指的是,软件按照一定的格式向远端的服务器提供一个标识自己的字符串。在HTTP协议中,使用user-agent字段传送这个字符串。
注意:这个值可以被修改
格式

现在浏览器的user-agent值格式一般如下:Mozilla/[version] ([system and browser information]) [platform] ([platform details]
) [extensions]例如  ChromeMozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chr
ome/57.0.2987.133 Safari/537.36FirefoxMozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0
Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0IE
Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2; .NET
CLR 2.0.50727; .NET CLR 3.5.30729; •NET CLR 3.0.30729; Media Center PC 6.0; .NET4.
eC; .NET4.0E)

信息提取
pyyaml, ua-parser. user-agents模块。
安装
pip3 install pyyaml ua-parser user-agents
使用

from user_agents import parseuseragents = ["Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36","Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0""Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0","Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2;.NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; -NET4.0E)"
]for uastring in useragents:ua = parse(uastring)print(ua.browser, ua.browser.family, ua.browser.version, ua.browser.version_string)输出:
Browser(family='Chrome', version=(57, 0, 2987), version_string='57.0.2987') Chrome (57, 0, 2987) 57.0.2987
Browser(family='Firefox', version=(52, 0), version_string='52.0') Firefox (52, 0) 52.0
Browser(family='IE', version=(10, 0), version_string='10.0') IE (10, 0) 10.0

ua.browser. family和ua.browser.version_string分别返回浏览器名称、版本号。

数据分析

from user_agents import parse
import datetimeops = {'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'length': int,'request': lambda request: dict(zip(('method', 'url', 'protocol'), request.split())),'useragent': lambda useragent: parse(useragent)
}from user_agents import parseops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int,'useragent': lambda ua: parse(ua)
}

增加浏览器分析函数

# # 浏览器分析
def browser_handler(iterable):browsers = {}for item in iterable:ua = item['useragent']key = (ua.browser.family, ua.browser.version_string)browsers[key] = browsers.get(key, 0) + 1return browsers

注册handler,注意时间窗口宽度

reg(browser_handler, 5, 5)

问题
如果想知道所有浏览器的统计,怎么办?

allbrowsers = 1# 浏览器分析
def browser_handler(iterable):browsers = {}for item in iterable:ua = item['useragent']key = (ua.browser.family, ua.browser.version_string)browsers[key] = browsers.get(key, 0) + 1allbrowsers[key] = allbrowsers.get(key, 0) + 1print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])return browsers

完整代码

import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import PathPATTERN = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''regex = re.compile(PATTERN)from user_agents import parseops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int,'useragent': lambda ua: parse(ua)
}def extract(line: str) -> dict:matcher = regex.match(line)if matcher:return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}# 装载文件
def openfile(path: str):with open(path) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失败则抛弃或者记录日志def load(*paths):for item in paths:p = Path(item)if not p.exists():continueif p.is_dir():for file in p.iterdir():if file.is_file():yield from openfile(str(file))elif p.is_file():yield from openfile(str(p))# 数据处理
def source(second=1):"""生成数据"""while True:yield {'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value': random.randint(1, 100)}time.sleep(second)# 滑动窗口函数
def window(src: Queue, handler, width: int, interval: int):"""窗口函数:paramsrc:数据源,缓存队列,用来拿数据:paramhandler:数据处理函数:paramwidth:时间窗口宽度,秒:paraminterval:处理时间间隔,秒"""start = datetime.datetime.strptime('20170101 000800 +0800', '%Y%m%d %H%M%S %z')current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')buffer = []  # 窗口中的待计算数据delta = datetime.timedelta(seconds=width - interval)while True:# 从数据源获取数据data = src.get()if data:buffer.append(data)  # 存入临时缓冲等待计算学院。current = data['datetime']# 每隔interval计算buffer中的数据一次if (current - start).total_seconds() >= interval:ret = handler(buffer)print('{}'.format(ret))start = current# 清除超出width的数据buffer = [x for x in buffer if x['datetime'] > current - delta]# 随机数平均数测试函数def handler(iterable):return sum(map(lambda x: x['value'], iterable)) / len(iterable)# 测试函数def donothing_handler(iterable):return iterable# 状态码占比def status_handler(iterable):# 时间窗口内的一批数据status = {}for item in iterable:key = item['status']status[key] = status.get(key, 0) + 1# total = sum(status.values())total = len(iterable)return {k: status[k] / total for k, v in status.items()}allbrowsers = {}# 浏览器分析
def browser_handler(iterable):browsers = {}for item in iterable:ua = item['useragent']key = (ua.browser.family, ua.browser.version_string)browsers[key] = browsers.get(key, 0) + 1allbrowsers[key] = allbrowsers.get(key, 0) + 1print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])return browsers# 分发器
def dispatcher(src):# 分发器中记录handler,同时保存各自的队列handlers = []queues = []def reg(handler, width: int, interval: int):"""注册 窗口处理函数:param handler:注册的数据处理函数:param width:时间窗口宽度:param interval:时间间隔"""q = Queue()queues.append(q)h = threading.Thread(target=window, args=(q, handler, width, interval))handlers.append(h)def run():for t in handlers:t.start()  # 启动线程处理数据for item in src:  # 将数据源取到的数据分发到所有队列中for q in queues:print('q put item :',item)q.put(item)return reg, runif __name__ == '__main__':import sys# path = sys.argv[1]path = 'test1.log'reg, run = dispatcher(load(path))reg(status_handler, 10, 5)  # VEreg(browser_handler, 5, 5)run()  # 运行test.log文件内容如下66.249.69.131 - - [19/Feb/2013:10:23:29 +0800] "GET /robots.txt HTTP/1.1" 404 162 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)"输出:
q put item : {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': <user_agents.parsers.UserAgent object at 0x10308f0e0>}
q put item : {'remote': '183.60.212.153', 'datetime': 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/88487.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/88487.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一文入门神经网络:神经网络概念初识

神经网络的世界远比你想象得更丰富多元。从基础架构到前沿融合模型&#xff0c;我为你梳理了当前最值得关注的神经网络类型&#xff0c;不仅包括那些“教科书级”的经典模型&#xff0c;也覆盖了正在改变行业格局的新兴架构。以下是系统分类与核心特点总结&#xff1a;一、基础…

线上事故处理记录

线上事故处理记录 一、MySQL 导致的服务器 CPU 飙升 有一天&#xff0c;突然收到了服务器 CPU 飙升的告警信息&#xff0c;打开普罗米修斯查看 CPU 的使用情况&#xff0c;发现 CPU 确实飙升了&#xff0c;下面开始去进行问题定位了。 1. 首先连接到对应的服务器&#xff0c;然…

ParaCAD 笔记 png 图纸标注数据集

ParaCAD-Dataset git lfs install git clone https://www.modelscope.cn/datasets/yuwenbonnie/ParaCAD-Dataset.git https://github.com/ParaCAD/ 不止100g 下个最小的 没有三视图

C#使用Semantic Kernel实现Embedding功能

1、背景 C#开发中&#xff0c;可以通过Semantic Kernel实现本地模型的调用和实现。 本地的Ollama的版本如下&#xff1a;安装的Package如下&#xff1a;2、代码实现 // See https://aka.ms/new-console-template for more information using Microsoft.Extensions.AI; using Mi…

转转APP逆向

APP版本 11.15.0 接口分析 # URL https://app.zhuanzhuan.com/zz/transfer/search# header cookie xxx x-zz-monitoring-metrics feMetricAntiCheatLevelV1 zztk user-agent Zhuan/11.15.0 (11015000) Dalvik/2.1.0 (Linux; U; Android 10; Pixel 3 Build/QQ3A.200805.001) z…

注解与反射的完美配合:Java中的声明式编程实践

注解与反射的完美配合&#xff1a;Java中的声明式编程实践 目录 引言 核心概念 工作机制 实战示例 传统方式的痛点 注解反射的优势 实际应用场景 最佳实践 总结 引言 在现代Java开发中&#xff0c;我们经常看到这样的代码&#xff1a; Range(min 1, max 50)priva…

开源入侵防御系统——CrowdSec

1、简介 CrowdSec 是一款现代化、开源、基于行为的入侵防御系统&#xff08;IDS/IPS&#xff09;&#xff0c;专为保护服务器、服务、容器、云原生应用而设计。它通过分析日志检测可疑行为&#xff0c;并可基于社区协作共享恶意 IP 黑名单&#xff0c;从而实现分布式防御。 其…

imx6ull-裸机学习实验13——串口格式化函数移植实验

目录 前言 格式化函数 实验程序编写 stdio文件夹 main.c Makefile修改 编译下载 前言 在学习实验12&#xff1a;imx6ull串口通信实验&#xff0c;我们实现了 UART1 基本的数据收发功能&#xff0c;虽然可以用来调试程序&#xff0c;但是功能太单一了&#xff0c;只能输出…

CCF-GESP 等级考试 2025年6月认证C++三级真题解析

1 单选题&#xff08;每题 2 分&#xff0c;共 30 分&#xff09;第1题 8位二进制原码能表示的最小整数是&#xff1a;&#xff08; &#xff09;A. -127 B. -128 C. -255 …

【网络安全】服务间身份认证与授权模式

未经许可,不得转载。 文章目录 问题背景用户到服务的身份认证与授权系统对系统的通信服务与服务之间的通信需求分析Basic Auth(基本身份认证)优点缺点mTLS 证书认证优点缺点OAuth 2.0优点缺点JWS(JSON Web Signature)优点缺点结合 Open Policy Agent 的 JWS 方案优点缺点结…

【EGSR2025】材质+扩散模型+神经网络相关论文整理随笔(四)

An evaluation of SVBRDF Prediction from Generative Image Models for Appearance Modeling of 3D Scenes输入3D场景的几何和一张参考图像&#xff0c;通过扩散模型和SVBRDF预测器获取多视角的材质maps&#xff0c;这些maps最终合并成场景的纹理地图集&#xff0c;并支持在任…

Grid网格布局完整功能介绍和示例演示

CSS Grid布局是一种强大的二维布局系统&#xff0c;可以将页面划分为行和列&#xff0c;精确控制元素的位置和大小。以下是其完整功能介绍和示例演示&#xff1a; 基本概念 网格容器&#xff08;Grid Container&#xff09;&#xff1a;应用display: grid的元素。网格项&#x…

学习C++、QT---21(QT中QFile库的QFile读取文件、写入文件的讲解)

每日一言把大目标拆成小步&#xff0c;每天前进一点点&#xff0c;终会抵达终点。QFile读取文件我们记事本要进行读取文件、写入文件、等等的操作&#xff0c;那么这个时候我们的QT有一个QT类叫做QFile这个类的话是专门对于文件操作的&#xff0c;所以我们来学习我们在QT的帮助…

AD736ARZ-R7精密真有效值转换器 高精度测量的首选方案

AD736ARZ-R7精密转换器产品概述AD736ARZ-R7是ADI&#xff08;Analog Devices Inc.&#xff09;推出的一款低功耗、高精度的真有效值&#xff08;RMS&#xff09;转直流&#xff08;DC&#xff09;转换器&#xff0c;采用SOIC-8封装&#xff0c;适用于需要精确测量交流或复杂波形…

【web应用】若依框架前端报表制作与导出全攻略(ECharts + html2canvas + jsPDF)

文章目录前言一、ECharts准备工作1. 检查ECharts安装2. 导入ECharts3. 创建饼图组件4. 模板部分二、报表导出功能实现1. 安装依赖2. 导入依赖3. 完整导出函数实现4. 样式优化三、完整组件实现四、常见问题与解决方案1. 图表截图不完整或模糊2. 图表背景透明3. 导出PDF中文乱码4…

vue3+express联调接口时报“\“username\“ is required“问题

我用node .js的express框架写的登录接口&#xff0c;发现postman可以调通&#xff0c;但是vue3前端报错vue3我发现是我后端node.js的app.js入口文件中配置的解析前端参数的解析中间件和前端请求头中的Content-Type配置不一致的原因 解决方案 因为我后端配置解析表单数据的中间件…

《月亮与六便士》:天才的背叛与凡人救赎的残酷辩证法

当满地六便士成了庸人的火葬场​​毛姆笔下的斯特里克兰德&#xff0c;是一把捅穿中产幻梦的利刃。这个抛妻弃子、背叛友人的证券经纪人&#xff0c;在伦敦客厅的茶香与银勺碰撞声中&#xff0c;突然听见了远方的惊雷——“我必须画画”。如书中所言&#xff1a;​​“在满地都…

vue2往vue3升级需要注意的点(个人建议非必要别直接升级)

将 Vue 2 项目升级到 Vue 3 的过程中&#xff0c;需要重点关注以下几个难点和关键点&#xff1a; 建议小项目直接用vue3重写更快&#xff0c;bug更少 文章目录1. **Composition API 的学习与应用**2. **全局 API 的变更**3. **模板语法的兼容性变化**4. **组件选项和生命周期的…

聚焦数据资源建设与应用,浙江省质科院赴景联文科技调研交流

7月10日上午&#xff0c;浙江省质科院标准化中心副主任蒋建平、应珊婷等一行领导带队莅临景联文科技调研指导工作。双方围绕工业数据展开深度交流。座谈会上&#xff0c;景联文科技详细汇报了数据资源建设与应用方面的成果与规划&#xff0c;介绍了公共数据授权运营与对外合作的…

【Linux】系统引导修复

目录 开机引导过程 一.通电 二.BIOS环境检测 三.磁盘引导阶段 四.文件引导阶段 自动引导配置文件丢失修复 内核参数文件丢失修复 内核镜像文件丢失修复 内核初始化文件丢失修复 boot目录误删丢失修复 开机引导过程 磁盘引导阶段 /boot/grub2/grub.cfg #读取自动引…