序
欠4前年的一份笔记 ,献给今后的自己。
正则表达式
概述
正则表达式,Regular Expression,缩写为regex、regexp、RE等。
正则表达式是文本处理极为重要的技术,用它可以对字符串按照某种规则进行检索、替换。
1970年代,Unix之父Ken Thompson将正则表达式引入到Unix中文本编辑器ed和grep命令中,由此正则表达式普及未来。
1980年后,perl语言对Henry Spencef编写的库,扩展了很多新的特性。1997年开始,Philip Hazel开发出了PCRE (Perl Compatible Regular Expressions),它被PHP和HTTPD等工具采用。
正则表达式应用极其广泛,shell中处理文本的命令、各种高级编程语言都支持正则表达式。
参考 https://www.w3cschool.cn/regex_rmjc/
分类
- BRE
基本正则表达式,grep、sed、vi等软件支持。vim有扩展。
- ERE
扩展正则表达式,egrep(grep-E)、sed-r等。
- PCRE
几乎所有高级语言都是PCRE的方言或者变种。Python从1.6开始使用SRE正则表达式引擎, 可以认为是PCRE的子集,见模块re。
基本语法
元字符 metacharacter
转义
凡是在正则表达式中有特殊意义的符号,如果想使用它的本意,请使用\转义。反斜杠自身,得使用\\
\r,\n还是转义后代表回车、换行
重复
练习:
1、匹配手机号码
字符串为“手机号码13851888188。”
2、匹配中国座机
字符串为“号码025-83105736、0543-5467328。”
- \d{11}
- \d{3,4)-1d{7,8)
注意:
断言会不会捕获呢?也就是断言占不占分组号呢?
断言不占分组号。断言如同条件,只是要求匹配必须满足断言的条件。
分组和捕获是同一个意思
使用正则表达式时,能用简单表达式,就不要复杂的表达式
贪婪与非贪婪
默认是贪婪模式,也就是说尽量多匹配更长的字符串。
非贪婪很简单,在重复的符号后面加上一个?问号,就尽量的少匹配了。
very very happy 使用v.*y和v.*?y
引擎选项
单行模式:
. 可以匹配所有字符,包括换行符。
^表示整个字符串的开头,$整个字符串的结尾
多行模式:
.可以匹配除了换行符之外的字符。
^ 表不行首,$行尾
^表示整个字符串的开始,$表示整个字符串的结尾。开始指的是\n后紧接着下一个字符,结束
指的是/n前的字符
可以认为,单行模式就如同看穿了换行符,所有文本就是一个长长的只有一行的字符串,所有^就是这一行字符串的行首,$就是这一行的行尾。
多行模式,无法穿透换行符,^和$还是行首行尾的意思,只不过限于每一行
注意:注意字符串中看不见的换行符,\r\n会影响e$的测试,e$只能匹配e\n
举例
very very happy
harry key
上面2行happy之后,有可能是\r\n结尾。
y$ 单行匹配key的y,多行匹配happy和key的y。
练习
1、匹配一个0~999之间的任意数字
1
12
995
9999
102
02
003
4d
\d 1位数
[1-9]?\d 1-2位数
^([1-9]\d\d?|\d) 1-3位数
^([1-9]\d\d?|\d)$ 1-3位数的行
^([1-9]\d\d?|\d)\r?$
^([1-9]\d\d?|\d)(?!\d) 数字开头1-3位数且之后不能是数字
2、IP地址
匹配合法的IP地址
192.168.1.150
0.0.0.0
255.255.255.255
17.16.52.100
172.16.0.100
400.400.999.888
001.022.003.000
257.257.255.256
((\d{1,3}).)<3}(\d{1,3})
(?:(\d{1,3)). 3}(\d{1,3}) # 400.400.999.888
对于ip地址验证的问题
- 可以把数据提出来后,交给IP地址解析库 处理,如果解析异常,就说明有问题,正则的验
证只是一个初步的筛选,把明显错误过滤掉。 - 可以使用复杂的正则表达式验证地址正确性
- 前导0是可以的
import socket
nw = socket.inet_aton('192.168.05.001')
print(nw, socket.inet_ntoa(nw))
分析:
每一段上可以写的数字有1、01、001、000、23、023、230、100,也就说1位就是0-9,2位每一位
也是0-9,3位第一位只能0-2,其余2位都可以0-9
(?:([e-2]\d{2}|\d{1,2})\.){3}([0-2]\d{2}|\d{1,2})# 解决超出200的问题,但是256呢?
200是特殊的,要再单独分情况处理
25[0-5]|2[0-4]\d|[01]?\d\d?这就是每一段的逻辑
(?: (25[0-5]|2[0-4]\d|[01]?\d\d?)1.){3)(25[0-5]|2[0-4]\d|[01]?\d\d?)
3、选出含有ftp的链接,且文件类型是gz或者xz的文件名
ftp://ftp.astron.com/pub/file/file-5.14.tar.gz
ttp://ftp.gmplib.org/pub/gmp-5.1.2/gmp-5.1.2.tar.xz
ttp://ftp.vim.org/pub/vim/unix/vim-7.3.tar.bz2
http://anduin.linuxfromscratch.org/sources/LFS/1fs-packages/conglomeration//iana-etc/iana-etc-2.30.tar.bz2
http://anduin.linuxfromscratch.org/sources/other/udev-1fs-205-1.tar.bz2
http://download.savannah.gnu.org/releases/1ibpipeline/libpipeline-1.2.4.tar.gz
http://download.savannah.gnu.org/releases/man-db/man-db-2.6.5.tar.xz
http://download.savannah.gnu.org/releases/sysvinit/sysvinit-2.88dsf.tar.bz2
http://ftp.altlinux.org/pub/people/legion/kbd/kbd-1.15.5.tar.gz
http://mirror.hust.edu.cn/gnu/autoconf/autoconf-2.69.tar.xz
http://mirror.hust.edu.cn/gnu/automake/automake-1.14.tar.xz
.*ftp.*\.(?:gz|xz) #
ftp.*/(.*(?:gz|xz))
\.*ftp.*/([^/]*\.(?:gz|xz))# 捕获文件名分组
(?<=.*ftp.*/)[^/]*\.(?:gz|xz)# 断言文件名前一定还有ftp
Python的正则表达式
Python使用re模块提供了正则表达式处理的能力。
方法
编译
re.compile(pattern, flags=0)
设定flags,编译模式,返回正则表达式对象regex。
pattern就是正则表达式字符串,flags是选项。正则表达式需要被编译,为了提高效率,这些编译后的结果被保存,下次使用同样的pattern的时候,就不需要再次编译。
re的其它方法为了提高效率都调用了编译方法,就是为了提速。
单次匹配
re.match(pattern, string, flags=0)
regex.match(string[, pos[, endpos]])
match匹配从字符串的开头匹配,regex对象match方法可以重设定开始位置和结束位置。返回match对象
re.search(pattern, string, flags=0)
regex. search(string[, pos[, endpos]])
从头搜索直到第一个匹配,regex对象search方法可以重设定开始位置和结束位置,返回match对象
re.fullmatch(pattern, string, flags=0)
regex.fullmatch(string[, pos[, endpos]])
整个字符串和正则表达式匹配
import re
s = '''bottle\nbag\nbig\napple'''
for i, c in enumerate(s, 1):
print((i - 1, c), end='\n' if i % 8 == 0 else ' ')
print()
# (0, 'b') (1, 'o') (2, 't') (3, 't') (4, 'l') (5, 'e') (6, '\n') (7, 'b')
# (8, 'a') (9, 'g') (10, '\n') (11, 'b') (12, 'i') (13, 'g') (14, '\n') (15, 'a')
# (16, 'p') (17, 'p') (18, 'l') (19, 'e')
# match方法
print('--match--')
result = re.match('b', s) # 找到一个就不找了
print(1, result) # <re.Match object; span=(0, 1), match='b'>
result = re.match('a', s) # 没找到,返回None
print(2, result) # None
result = re.match('^a', s, re.M) # 依然从头开始找,多行模式没有用
print(3, result) # None
result = re.match('^a', s, re.S) # 依然从头开始找
print(4, result) # None
# 先编译,然后使用正则表达式对象
regex = re.compile('a')
result = regex.match(s) # 依然从头开始找
print(5, result) # None
result = regex.match(s, 15) # 把索引15作为开始找
print(6, result) # <re.Match object; span=(15, 16), match='a'>
print()
# search方法
print('--search--')
result = re.search('a', s) # 扫描找到匹配的第一个位置
print(7, result) # apple # <re.Match object; span=(8, 9), match='a'>
regex = re.compile('b')
result = regex.search(s, 1)
print(8, result) # bag # <re.Match object; span=(7, 8), match='b'>
regex = re.compile('^b', re.M)
result = regex.search(s) # 不管是不是多行,找到就返回
print(8.5, result) # bottle # <re.Match object; span=(0, 1), match='b'>
result = regex.search(s, 8)
print(9, result) # big # <re.Match object; span=(11, 12), match='b'>
# fullmatch/jŸZ
result = re.fullmatch('bag', s)
print(10, result) # None
regex = re.compile('bag')
result = regex.fullmatch(s)
print(11, result) # None
result = regex.fullmatch(s, 7)
print(12, result) # None
result = regex.fullmatch(s, 7, 10)
print(13, result) # 要完全匹配,多了少了都不行,[7,10) <re.Match object; span=(7, 10), match='bag'>
全文搜索
re.findall (pattern, string, flags=0)
regex.findall(string[, pos[, endpos]])
对整个字符串,从左至右匹配,返回所有匹配项的列表
re.finditer(pattern, string, flags=0)
regex.finditer(string[, pos[, endpos]])
对整个字符串,从左至右匹配,返回所有匹配项,返回迭代器。
注意每次迭代返回的是match对象。
import re
s = '''bottle\nbag\nbig\nable'''
for i, c in enumerate(s, 1):
print((i - 1, c), end='\n' if i % 8 == 0 else ' ')
# (0, 'b') (1, 'o') (2, 't') (3, 't') (4, 'l') (5, 'e') (6, '\n') (7, 'b')
# (8, 'a') (9, 'g') (10, '\n') (11, 'b') (12, 'i') (13, 'g') (14, '\n') (15, 'a')
# (16, 'b') (17, 'l') (18, 'e')
print()
# findal1方法
result = re.findall('b', s)
print(1, result) # ['b', 'b', 'b', 'b']
regex = re.compile('^b')
result = regex.findall(s)
print(2, result) # ['b']
regex = re.compile('^b', re.M)
result = regex.findall(s, 7)
print(3, result) # bag big ['b', 'b']
regex = re.compile('^b', re.S)
result = regex.findall(s)
print(4, result) # bottle ['b']
regex = re.compile('^b', re.M)
result = regex.findall(s, 7, 10)
print(5, result) # bag ['b']
# finditer E
result = regex.finditer(s)
print(type(result)) # <class 'callable_iterator'>
print(next(result)) # <re.Match object; span=(0, 1), match='b'>
print(next(result)) # <re.Match object; span=(7, 8), match='b'>
匹配替换
re.sub(pattern, replacement, string, count=0, flags=0)
regex.sub(replacement, string, count=0)
使用pattern对字符串string进行匹配,对匹配项使用repl替换。
replacement可以是 string, bytes, function.
re.subn(pattern, replacement, string, count=0, flags=0)
regex.subn(replacement, string, count=0)
同sub返回一个元组(new_string,number_of_subs_made)
import re
s = '''bottle\nbag\nbig\napple'''
for i, c in enumerate(s, 1):
print((i - 1, c), end='\n' if i % 8 == 0 else ' ')
print()
# 替换方法
regex = re.compile('b\wg')
result = regex.sub('magedu', s)
print(1, result) # 被替换后的字符串
result = regex.sub('magedu', s, 1) # 替换1次
print(2, result) # 被替换后的字符串
regex = re.compile('\s+')
result = regex.subn('\t', s)
print(3, result) # 被替换后的字符串及替换次数的元组
分割字符串
字符串的分割函数,太难用,不能指定多个字符进行分割。
re.split(pattern, string, maxsplit=0, flags=0)
re.split分割字符串
import re
s = '''01 bottle
02 bag
03 big1
100 able'''
for i,c in enumerate(s, 1):
print((i-1, c), end='\n' if i%8==0 else ' ')
print()
# 把每行单词提取出来
print(s.split()) # 做不到 ['01', 'bottle', '02', 'bag', '03', 'big1', '100', 'able']
result = re.split('[\s\d]+',s)
print(1, result) # ['', 'bottle', 'bag', 'big', 'able']
regex = re.compile('^[\s\d]+') # 字符串首
result = regex.split(s)
print(2, result) # '', 'bottle\n02 bag\n03 big1\n100 able']
regex = re.compile('^[\s\d]+', re.M) # 行首
result = regex.split(s)
print(3, result) # ['', 'bottle\n', 'bag\n', 'big1\n', 'able']
regex = re.compile('\s+\d+\s+')
result = regex.split(' ' + s)
print(4, result)
分组
使用小括号的pattern捕获的数据被放到了组group中。
match、search函数可以返回match对象;findall这回字符串列表;finditer返回一个个match对象
如果pattern中使用了分组,如果有匹配的结果,会在match对象中
1、使用group(N)方式返回对应分组,1-N是对应的分组,0返回整个匹配的字符串
2、如果使用了命名分组,可以使用group(‘name’) 的方式取分组
3、也可以使用groups() 返回所有组
4、使用groupdict() 返回所有命名的分组
import re
s = '''bottle\nbag\nbig\napple'''
for i, c in enumerate(s, 1):
print((i - 1, c), end='\n' if i % 8 == 0 else ' ')
print()
# 分组
regex = re.compile('(b\w+)')
result = regex.match(s)
print(type(result))
print(1, 'match', result.groups())
result = regex.search(s, 1)
print(2, 'search', result.groups()) #
# 命名分组
regex = re.compile('(b\w+)\n(?P<name2>b\w+)\n(?P<name3>b\w+)')
result = regex.match(s)
print(3, 'match', result)
print(4, result.group(3), result.group(2), result.group(1))
print(5, result.group(0).encode()) # 0 返回整个匹配字符串
print(6, result.group('name2'), result.group('name3'))
print(6, result.groups())
print(7, result.groupdict())
result = regex.findall(s)
for x in result: # 字符串列表
print(type(x), x)
regex = re.compile('(?P<head>b\w+)')
result = regex.finditers()
for x in result:
print(type(x), x, x.group(), x.group('head'))
练习
匹配邮箱地址
test@hot-mail.com
v-ipamagedu.com
web.manager@magedu. com.cn
super.user@google.com
a@w-a-com
匹 html标记内的内容
<a href= http://www.magedu.com/index.html' target='_blank'>马哥教育</a>
匹配URL
http://www.magedu.com/index.html
https://login.magedu.com
file:///ect/sysconfig/network
匹配二代中国身份证ID
321105700101003
321105197001010030
11210020170101054X
17位数字+1位校验码组成
前6位地址码,8位出生年月,3位数字,1位校验位(0-9或X)
判断密码强弱
要求密码必须由 10-15位 指定字符组成:
十进制数字
大写字母
小写字母
下划线
要求四种类型的字符都要出现才算合法的强密码
例如:Aatb32_67mq,其中包含大写字母、小写字母、数字和下划线,是合格的强密码
单词统计 word count
对sample文件进行单词统计,要求使用正则表达式
参考
邮箱
\w+[-.\w]*@[\w-]+(\.[\w-]+)+
html提取
<[^<>]+>(.*)<[^<>]+>
如果要匹配标记a
<(\w+)\s+[^<>]+>(.*)(</\1>)
URL提取
(wt)://([^\s]+)
身份证验证
身份证验证需要使用公式计算,最严格的应该实名验证。
\d{17)[0-9xX]|\d{15}
强密码
Aatb32_67mnq
Aatb32_67m.nq
中国是一个伟大的国家aA_8
10-15位,其中包含大写字母、小写字母、数字和下划线
^\w{10,15}$
如果测试有不可见字符干扰使用^w{10,15}\r?$
看似正确,但是,如果密码有中文呢?
^[a-zA-Z0-9_]{10,15}$
但是还是没有解决类似于 111111111112 这种密码的问题,如何解决?
需要用到一些非正则表达式的手段。
利用判断来解决,思路如下:
1、可以判断当前密码字符串中是否有\W,如果出现就说明一定不是合法的,如果不出现说明合法
2、对合法继续判断,如果出现过_下划线,说明有可能是强密码,但是没有下划线说明一定不是强密码。
3、对包含下划线的合法密码字符串继续判断,如果出现过\\d的,说明有可能是强密码,没有出现\\d的一定不是强密码
4、对上一次的包含下划线、数字的合法的密码字符串继续判断,如果出现了[A-Z]说明有可能是 强密码,没有出现[A-Z]说明一定不是强密码
5、对上一次包含下划线、数字、大写字母的合法密码字符串继续判断,如果出现了[a-z]说明就是强密码,找到了,没有出现小写字母就一定不是强密码。请注意上面的判断的顺序,应该是概率上最可能不出在密码字符申的先判断。
单词统计
from collections import defaultdict
import re
def makekey2(line:str, chars=set("""!'"#./\()[],*- \r\n""")):
start = 0
for i, c in enumerate(line):
if c in chars:
if start == i: # 如果紧挨着还是特字符,start一定等丁i
start += 1 # 加1并continue
continue
yield line[start:i]
start = i + 1 # 加1是跳过这个不需要的特珠字符c
else:
if start < len(line): # 小于,说明还有有效的字符,而且一直到末尾
yield line[start:]
# ''"'I\host\mount splitext('.cshrc splitdrive("//host/computer/dir . abc path s\r\n
regex = re.compile('[^\w-]+')
def makekey3(line: str):
for word in regex.split(line):
if len(word):
yield word
def wordcount(filename, encoding='utf8', ignore=set()):
d = defaultdict(lambda: 0)
with open(filename, encoding=encoding) as f:
for line in f:
for word in map(str.lower, makekey2(line)):
if word not in ignore:
d[word] += 1
return d
def top(d: dict, n=10):
for i, (k, v) in enumerate(sorted(d.items(), key=lambda item: item[1], reverse=True)):
if i > n:
break
print(k, v)
# 单词统计前儿名
top(wordcount('sample', ignore={'the', 'a'}))
滑动窗口
数据载入
对于本项目来说,数据就是日志的一行行记录,载入数据就是文件10的读取。将获取数据的方法封装成函数。
def load(path):
""""装载日志文件"""
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败就抛弃,或者打印日志
时间窗口分析
概念
很多数据,例如日志,都是和时间相关的,都是按照时间顺序产生的。产生的数据分析的时候,要按照时间求值
interval 表示每一次求值的时间间隔
width 时间窗口宽度,指的一次求值的时间窗口宽度
当 width > interval
数据求值时会有重叠
width = interval
数据求值没有重叠
当 width < interval
一般不采纳这种方案,会有数据缺失
时序数据
运维环境中,日志、监控等产生的数据都是与时间相关的数据,按照时间先后产生并记录下来的
数据,所以一般按照时间对数据进行分析。
数据分析基本程序结构
无限的生成随机数函数,产生时间相关的数据,返回时间和随机数的字典
每次取3个数据,求平均值。
import random
import datetime
import time
def source():
while True:
yield {'value': random.randint(1, 100), 'datetime': datetime.datetime.now()}
time.sleep(1)
# 获取数据
s = source()
items = [next(s) for _ in range(3)]
# 处理函数
def handler(iterable):
return sum(map(lambda item: item['value'], iterable)) / len(iterable)
print(items)
print("{:.2f}".format(handler(items)))
# 输出:
# [{'value': 52, 'datetime': datetime.datetime(2025, 7, 10, 23, 3, 57, 876429)}, {'value': 5, 'datetime': datetime.datetime(2025, 7, 10, 23, 3, 58, 880602)}, {'value': 91, 'datetime': datetime.datetime(2025, 7, 10, 23, 3, 59, 885730)}]
# 49.33
上面代码模拟了,一段时间内产生了数据,等了一段固定的时间取数据来计算平均值。
窗口函数实现
将上面的获取数据的程序扩展为window函数。使用重叠的方案。
import random
import datetime
import time
def source(second=1):
"""生成数据"""
while True:
yield {
'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
'value': random.randint(1, 100)
}
time.sleep(second)
def window(iterator, handler, width: int, interval: int):
"""
:param iterator:数据源,生成器,用来拿数据
:param handler:数据处理函数
:param width:时间窗口宽度,秒
:param interval:处理时间间隔,秒
"""
start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
buffer = [] # 窗口中的待计算数据
delta = datetime.timedelta(seconds=width - interval)
while True:
# 从数据源获取数据
data = next(iterator)
if data:
buffer.append(data) # 存入临时缓冲等待计算
current = data['datetime']
# 每隔interval计算buffer中的数据一次
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print('{:.2f}'.format(ret))
start = current
# 清除超出width的数据
buffer = [x for x in buffer if x['datetime'] > current - delta]
def handler(iterable):
return sum(map(lambda x: x['value'], iterable)) / len(iterable)
window(source(), handler, 10, 5)
时间的计算
数据分发
分发
生产者消费者模型
对于一个监控系统,需要处理很多数据,包括日志。对其中已有数据的采集、分析。
被监控对象就是数据的生产者producer,数据的处理程序就是数据的消费者consumer。
生产者消费者传统模型
传统的生产者消费者模型,生产者生产,消费者消费。但这种模型有些问题
开发的代码耦合太高,如果生成规模扩大,不易扩展,生产和消费的速度很难匹配等。
思考一下,生产者和消费者的问题是什么?
举例:
卖包子的,如果包子卖不完,还要继续蒸包子,会怎么样?门可罗雀,包子成山。
如果把包子先蒸一些,卖着,快卖完了,赶紧包,再蒸一些。不会有等包子的队伍。
如果包子供不应求,还没有和面呢,包子都被预定了,出现排队等包子的情况。
上面这些情况,最核心的问题,就是生产者和消费者速度要匹配的问题。
但是,往往速度不能够很好的匹配。
解决的办法–队列queue。
作用–解耦、缓冲。
日志生产者往往会部署好几个程序,日志产生的也很多,而消费者也会有多个程序,去提取日志
分析处理。
数据的生产是不稳定的!会造成短时间数据的“潮涌”,需要缓冲。
消费者消费能力不一样,有快有慢,消费者可以自己决定消费缓冲区中的数据。
单机可以使用queue内建的模块构建进程内的队列,满足多个线程间的生产消费需要。
大型系统可以使用第三方消息中间件:RabbitMQ、RocketMQ、Kafka
queue模块–队列
queue模块提供了一个先进先出的队列Queue。
queue. Queue(maxsize=0)
创建FIFO队列,返回Queue对象。
maxsize 小于等于0,队列长度没有限制。
Queue.get(block=True, timeout=None)
从队列中移除元素并返回这个元素。
block 为阻塞,timeout为超时。
如果block为True,是阻塞,timeout为None就是一直阻塞。
如果block为True但是timeout有值,就阻塞到一定秒数抛出Empty异常。
block为False,是非阻塞,timeout将被忽略,要么成功返回一个元素,要么抛出empty异常。
Queue.get_nowait()
等价于 get(False),也就是说要么成功返回一个元素,要么抛出empty异常。
但是queue的这种阻塞效果,需要多线程的时候演示。
Queue.put(item, block=True, timeout=None)
把一个元素加入到队列中去。
block=True,timeout=None,一直阻塞直至有空位放元素。
block=True, timeout=5,阻塞5秒就抛出Full异常。
block=False, timeout失效,立即返回,能塞进去就塞,不能则返回抛出Full异常。
Queue.put_nowait(item)
等价于 put(item, False),也就是能塞进去就塞,不能则返回抛出Full异常。
# Queue测试
from queue import Queue
import random
q = Queue()
q.put(random.randint(1, 100))
q.put(random.randint(1, 100))
print(q.get())
print(q.get())
# print(q.get())# 阻塞
# Traceback (most recent call last):
# File "/Users/quyixiao/pp/python_lesson/function1/function15.py", line 14, in <module>
# print(q.get(timeout=3)) # 阻塞,但超时抛异常
# ~~~~~^^^^^^^^^^^
# File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/queue.py", line 212, in get
# raise Empty
# _queue.Empty
print(q.get(timeout=3)) # 阻塞,但超时抛异常
分发器的实现
生产者(数据源)生产数据,缓冲到消息队列中
数据处理流程:
数据加载 -》提取 -》分析(滑动窗口函数)
处理大量数据的时候,对于一个数据源来说,需要多个消费者处理。但是如何分配数据就是个问题了。
需要一个分发器(调度器),把数据分发给不同的消费者处理。
每一个消费者拿到数据后,有自己的处理函数。所以要有一种注册机制
数据加载–》提取–》分发–》分析函数1
|----》分析函数2
分析1和分析2是不同的handler,不同的窗口宽度、间隔时间
如何分发?
这里就简单一点,轮询策略。
一对多的副本发送,一个数据通过分发器,发送到n个消费者。
消息队列
在生产者和消费者之间使用消息队列,那么所有消费者共用一个消息队列,还是各自拥有一个消息队列呢?
共用一个消息队列也可以,但是需要解决争抢的问题。相对来说每一个消费者自己拥有一个队列,较为容易。
如何注册?
在调度器内部记录有哪些消费者,每一个消费者拥有自己的队列。
线程
由于一条数据会被多个不同的注册过的handler处理,所以最好的方式是多线程。
import threading
# 定义线程
# target线程中运行的函数;args这个函数运行时需要的实参的元组
t = threading.Thread(target=window, args=(src, handler, width, interval))
# 启动线程
t.start()
分发器代码实现
import random
import datetime
import time
from queue import Queue
import threading
def source(second=1):
"""生成数据"""
while True:
yield {
'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
'value': random.randint(1, 100)
}
time.sleep(second)
def window(src: Queue, handler, width: int, interval: int):
"""窗口函数
:param src:数据源,缓存队列,用来拿数据
:param handler:数据处理函数
:param width:时间窗口宽度,秒
:param interval:处理时间间隔,秒
"""
start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
buffer = [] # 窗口中的待计算数据
delta = datetime.timedelta(seconds=width - interval)
while True:
# 从数据源获取数据
data = src.get()
if data:
buffer.append(data) # 存入临时缓冲等待计算
current = data['datetime']
# 每隔interval计算buffer中的数据一次
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print('{:.2f}'.format(ret))
start = current
# 清除超出width的数据
buffer = [x for x in buffer if x['datetime'] > current - delta]
def handler(iterable):
return sum(map(lambda x: x['value'], iterable)) / len(iterable)
def dispatcher(src):
# 分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler, width: int, interval: int):
"""注册 窗口处理函数
:param handler:注册的数据处理函数
:param width:时间窗口宽度
:param interval:时间间隔
"""
q = Queue()
queues.append(q)
h = threading.Thread(target=window, args=(q, handler, width, interval))
handlers.append(h)
def run():
for t in handlers:
t.start() # 启动线程处理数据
for item in src: # 将数据源取到的数据分发到所有队列中
for q in queues:
q.put(item)
return reg, run
reg, run = dispatcher(source())
reg(handler, 10, 5) # 注册
run() # 运行
注意,以上代码也只是现阶段所学知识的一种实现,项目中建议使用消息队列服务的“订阅”模式,消费者各自消费自己的队列的数据。
日志分析
概述
生成中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载、健康状况,可以分析客户的分布情况、客户的行为,甚至基于这些分析可以做出预测。
一般采集流程
日志产出 采集(Logstash、Flume、Scribe) >存储>分析 >存储(数据库、NoSQL) ->可视化
开源实时日志分析ELK平台
Logstash收集日志,并存放到ElasticSearch集群中,Kibana则从ES集群中查询数据生成图表,返回浏览器端
分析的前提
半结构化数据日志是半结构化数据,是有组织的,有格式的数据。可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据。
文本分析
日志是文本文件,需要依赖文件10、字符串操作、正则表达式等技术。
通过这些技术就能够把日志中需要的数据提取出来。
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
这是最常见的日志,nginx、tomcat等WEB Server都会产生这样的日志。如何提取出数据?这里面每一段有效的数据对后期的分析都是必须的。
提取数据
一、空格分割
with open('xxx.log') as f:
for line in f:
for field in line.split():
print(field)
缺点:
数据并没有按照业务分割好,比如时间就被分开了,URL相关的也被分开了,User Agent的空格多,被分割了。
所以,定义的时候不选用这种在filed中出现的字符就可以省很多事,例如使用’\x01’这个不可见的
ASCII,print(‘\x01’)试一试
能否依旧是空格分割,但是遇到双引号、中括号特殊处理一下?
思路:
先按照空格切分,然后一个个字符迭代,但如果发现是[或者”,则就不判断是否空格,直到]或
者”结尾,这个区间获取的就是时间等数据。
line = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''
CHARS = set("\t")
def makekey(line: str):
start = 0
skip = False
for i, c in enumerate(line):
if not skip and c in '"[': # [或第一个引号
start = i + 1
skip = True
elif skip and c in '"]': # 第二个引号 或]
skip = False
yield line[start: i]
start = i + 1
continue
if skip: # 如果遇到[或 第一个引号就跳过
continue
if c in CHARS:
if start == i:
start = i + 1
continue
yield line[start:i]
start = i + 1
else:
if start < len(line):
yield line[start:]
print(list(makekey(line)))
输出:
['19/Feb/2013:10:23:29 +0800', 'GET /o2o/media.html?menu=3 НТТР/1.1', '-', 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)']
类型转换
fields中的数据是有类型的,例如时间、状态码等。对不同的field要做不同的类型转换,甚至是自
定义的转换
时间转换
19/Feb/2013:10:23:29 +0800 对应格式是
%d/%b/%Y:%H:%M:%S %z
import datetime
def convert_time(timestr):
return datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')
可以得到
lambda timestr: datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’)
状态码和字节数
都是整型,使用int函数转换
请求信息的解析
GET /020/media.html?menu=3 HTTP/1.1
method url protocol 三部分都非常重要
def get_request(request:str):
return dict(zip(['method', 'url', 'protocol'], request.split()))
lambda request: dict(zip(‘method’, ‘url’, protocol’),request.split0))
映射
对每一个字段命名,然后与值和类型转换的方法对应。解析每一行是有顺序的。
import datetime
line = '''66.249.69.131 - - [19/Feb/2013:10:23:29 +0800] "GET /robots.txt HTTP/1.1" 404 162 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)"'''
CHARS = set(" \t")
def makekey(line: str):
start = 0
skip = False
for i, c in enumerate(line):
if not skip and c in '"[':
start = i + 1
skip = True
elif skip and c in '"]':
skip = False
yield line[start:i]
start = i + 1
continue
if skip:
continue
if c in CHARS:
if start == i:
start = i + 1
continue
yield line[start:i]
start = i + 1
else:
if start < len(line):
yield line[start:]
names = ('remote', '', '', 'datetime', 'request', 'status', 'length', '', 'useragent')
ops = (None, None, None, lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
lambda request: dict(zip(['method', 'url', 'protocol'], request.split())),
int, int, None, None)
def extract(line: str):
return dict(map(lambda item:(item[0], item[2](item[1]) if item[2] is not None else item[1]), zip(names, makekey(line), ops)))
print(extract(line))
输出:
{'remote': '66.249.69.131', '': '-', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': {'method': 'GET', 'url': '/robots.txt', 'protocol': 'HTTP/1.1'}, 'status': 404, 'length': 162, 'useragent': 'Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)'}
二、正则表达式提取
构造一个正则表达式提取需要的字段,改造extract函数、names和ops
import datetime
names = ('remote', 'datetime', 'method', 'url', 'protocol', 'status', 'length', 'useragent')
ops = (None, lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
None, None, None, int, int, None)
pattern = r'''([\d.]{7,}) - - \[([/\w +:]+)\] "(\w+) (\S+) ([\w/\d.]+)" (\d+) (\d+).+ "(.+)"'''
能够使用命名分组呢?进一步改造pattern为命名分组,ops也就可以和名词对应了,names就没有必要存在了
PATTERN = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''
regex = re.compile(PATTERN)
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int
}
改造后的代码
import datetime
import re
line = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''
pattern = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int
}
regex = re.compile(pattern)
def extract(line: str) -> dict:
matcher = regex.match(line)
return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
print(extract(line))
输出:
{'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
异常处理
日志中不免会出现一些不匹配的行,需要处理。
这里使用re.match方法,有可能匹配不上。所以要增加一个判断
采用抛出异常的方式,让调用者获得异常并自行处理。
import re
import datetime
line = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''
pattern = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int
}
regex = re.compile(pattern)
def extract(logline: str) -> dict:
""""返回字段的字典,抛出异常说明匹配失败"""
matcher = regex.match(line)
if matcher:
return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
else:
raise Exception('No match')
但是,也可以采用返回一个特殊值的方式,告知调用者没有匹配。
def extract(logline: str) -> dict:
"""返回字段的字典,如果返回None说明匹配失败"""
matcher = regex.match(line)
if matcher:
return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
else:
return None
通过返回值,在函数外部获取了None,同样也可以采取一些措施。本次采用返回None的实现。
文件加载及分析器
整合代码
load函数就是从日志中提取合格的数据的生成器函数。
它可以作为dispatcher函数的数据源。
原来写的handler函数处理一个字典的’datetime’字段,不能处理日志抽取函数extract返回的字典,
提供一个新的函数。
import random
import datetime
import time
from queue import Queue
import threading
import re
# 数据源
PATTERN = r'(?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) .* .* \[(?P<datetime>.*)\] "(?P<method>\w+) (?P<url>[^\s]*) (?P<version>[\w|/\.\d]*)" (?P<status>\d{3}) (?P<length>\d+) "(?P<refer>[^\s]*)" "(?P<userAgent>.*)"'
regex = re.compile(PATTERN) # 40i7
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int
}
def extract(line: str) -> dict:
matcher = regex.match(line)
if matcher:
return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}
def load(path):
""""装载日志文件"""
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败则抛弃或者记录日志
# 数据处理
def source(second=1):
"""生成数据"""
while True:
yield {
'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
'value': random.randint(1, 100)
}
time.sleep(second)
# 滑动窗口函数
def window(src: Queue, handler, width: int, interval: int):
"""窗口函数
:param src:数据源,缓存队列,用来拿数据
:param handler:数据处理函数
:param width:时间窗口宽度,秒
:param interval:处理时间间隔,秒"""
start = datetime.datetime.strptime('20170101 080000 +0800', '%Y%m%d %H%M%S %z')
current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
buffer = [] # 窗口中的待计算数据
delta = datetime.timedelta(seconds=width - interval)
while True:
# 从数据源获取数据
data = src.get()
if data:
buffer.append(data) # 存入临时缓冲等待计算
current = data['datetime']
# 每隔interval计算buffer中的数据一次
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print('(}'.format(ret))
start = current
# 清除超出width的数据
buffer = [x for x in buffer if x['datetime'] > current - delta]
# 随机数平均数测试函数
def handler(iterable):
return sum(map(lambda x: x['value'], iterable)) / len(iterable)
# 测试函数
def donothing_handler(iterable):
return iterable
# 分发器
def dispatcher(src):
# 分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler, width: int, interval: int):
"""
注册 窗口处理函数
:param handler:注册的数据处理函数
:param width:时间窗口宽度
:param interval:时间间隔
"""
q = Queue()
queues.append(q)
h = threading.Thread(target=window, args=(q, handler, width, interval))
handlers.append(h)
def run():
for t in handlers:
t.start() # 启动线程处理数据
for item in src: # 将数据源取到的数据分发到所有队列中
for q in queues:
print('item: {}'.format(item))
q.put(item)
return reg, run
if __name__ == '__main__':
import sys
# path = sys.argv[1]
path = 'test.log'
reg, run = dispatcher(load(path))
reg(donothing_handler, 10, 5) # 注册处理函数
run() # 运行
test.log 文件内容为
66.249.69.131 - - [19/Feb/2013:10:23:29 +0800] "GET /robots.txt HTTP/1.1" 404 162 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)"
完成分析功能
分析日志很重要,通过海量数据分析就能够知道是否遭受了攻击,是否被爬取及爬取高峰期,是否有盗链等。
百度(Baidu) 爬虫名称(Baiduspider)
谷歌(Google)爬虫名称(Googlebot)
状态码分析
状态码中包含了很多信息。例如
304,服务器收到客户端提交的请求参数,发现资源未变化,要求浏览器使用静态资源的缓存
404,服务器找不大请求的资源
304占比大,说明静态缓存效果明显。404占比大,说明网站出现了错误链接,或者尝试嗅探网站
资源。
如果400、500占比突然开始增大,网站一定出问题了。
# 状态码占比
def status_handler(iterable):
# 时间窗口内的一批数据
status = {}
for item in iterable:
key = item['status']
status[key] = status.get(key, 0) + 1
# total = sum(status.values())
total = len(iterable)
return {k: status[k] / total for k, v in status.items()}
如果还需要什么分析,增加分析函数handler注册就行了
日志文件的加载
目前实现的代码中,只能接受一个路径,修改为接受一批路径。
可以约定一下路径下文件的存放方式:
如果送来的是一批路径,就迭代其中路径。
如果路径是一个普通文件,就按照行读取内容。
如果路径是一个目录,就遍历路径下所有普通文件,每一个文件按照行处理。不递归处理子目录。
from pathlib import Path
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass # 和下面处理一样
elif p.is_file():
with open(str(p)) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败则抛弃或者记录日志
写的过程中发现重复的地方,把文件处理部分提出来写成函数。
from pathlib import Path
def openfile(path: str):
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败则抛弃或者记录日志
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
yield from openfile(str(file))
elif p.is_file():
yield from openfile(str(p))
完整代码
import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path
# 数据源
PATTERN = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''
regex = re.compile(PATTERN)
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int
}
def extract(line: str) -> dict:
matcher = regex.match(line)
if matcher:
return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}
# 装载文件
def openfile(path: str):
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败则抛弃或者记录日志
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
yield from openfile(str(file))
elif p.is_file():
yield from openfile(str(p))
# 数据处理
def source(second=1):
"""生成数据"""
while True:
yield {
'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
'value': random.randint(1, 100)
}
print('sleep second:',second)
time.sleep(second)
# 滑动窗口函数
def window(src: Queue, handler, width: int, interval: int):
"""
窗口函数
:param src:数据源,缓存队列,用来拿数据
:param handler:数据处理函数
:param width:时间窗口宽度,秒
:param interval:处理时间间隔,秒
"""
start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
buffer = [] # 窗口中的待计算数据
delta = datetime.timedelta(seconds=width - interval)
while True:
# 从数据源获取数据
data = src.get()
if data:
buffer.append(data) # 存入临时缓冲等待计算
current = data['datetime']
# 每隔interval计算buffer中的数据一次
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print('{}'.format(ret))
start = current
# 清除超出width的数据
buffer = [x for x in buffer if x['datetime'] > current - delta]
# 随机数平均数测试函数
def handler(iterable):
return sum(map(lambda x: x['value'], iterable)) / len(iterable)
# 测试凼数
def donothing_handler(iterable):
return iterable
# 状态码占比
def status_handler(iterable):
# 时间窗口内的一批数据
status = {}
for item in iterable:
key = item['status']
status[key] = status.get(key, 0) + 1
# total = sum(status.values())
total = len(iterable)
return {k: status[k] / total for k, v in status.items()}
# 分发器
def dispatcher(src):
# 分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler, width: int, interval: int):
"""
注册 窗口处理函数
:param handler:注册的数据处理函数
:param width:时间窗口宽度
:param interval:时间间隔
"""
q = Queue()
queues.append(q)
h = threading.Thread(target=window, args=(q, handler, width, interval))
handlers.append(h)
def run():
for t in handlers:
t.start() # 启动线程处理数据
for item in src: # 将数据源取到的数据分发到所有队列中
for q in queues:
print('queue put ', item)
q.put(item)
return reg, run
if __name__ == '__main__':
import sys
# path = sys.argv[1]
path = 'test1.log'
reg, run = dispatcher(load(path))
reg(status_handler, 10, 5) # 注册
run() # 1Z1J 妁高薪职业学院
# test1.log
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
输出:
queue put {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
到这里,一个离线日志分析项目基本完成。
1、可以指定文件或目录,对日志进行数据分析
2、分析函数可以动态注册
3、数据可以分发给不同的分析处理程序处理
浏览器分析
useragent
这里指的是,软件按照一定的格式向远端的服务器提供一个标识自己的字符串。在HTTP协议中,使用user-agent字段传送这个字符串。
注意:这个值可以被修改
格式
现在浏览器的user-agent值格式一般如下:
Mozilla/[version] ([system and browser information]) [platform] ([platform details]
) [extensions]
例如
Chrome
Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chr
ome/57.0.2987.133 Safari/537.36
Firefox
Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0
Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0
IE
Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2; .NET
CLR 2.0.50727; .NET CLR 3.5.30729; •NET CLR 3.0.30729; Media Center PC 6.0; .NET4.
eC; .NET4.0E)
信息提取
pyyaml, ua-parser. user-agents模块。
安装
pip3 install pyyaml ua-parser user-agents
使用
from user_agents import parse
useragents = [
"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36",
"Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0"
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0",
"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2;.NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; -NET4.0E)"
]
for uastring in useragents:
ua = parse(uastring)
print(ua.browser, ua.browser.family, ua.browser.version, ua.browser.version_string)
输出:
Browser(family='Chrome', version=(57, 0, 2987), version_string='57.0.2987') Chrome (57, 0, 2987) 57.0.2987
Browser(family='Firefox', version=(52, 0), version_string='52.0') Firefox (52, 0) 52.0
Browser(family='IE', version=(10, 0), version_string='10.0') IE (10, 0) 10.0
ua.browser. family和ua.browser.version_string分别返回浏览器名称、版本号。
数据分析
from user_agents import parse
import datetime
ops = {
'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'length': int,
'request': lambda request: dict(zip(('method', 'url', 'protocol'), request.split())),
'useragent': lambda useragent: parse(useragent)
}
from user_agents import parse
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int,
'useragent': lambda ua: parse(ua)
}
增加浏览器分析函数
# # 浏览器分析
def browser_handler(iterable):
browsers = {}
for item in iterable:
ua = item['useragent']
key = (ua.browser.family, ua.browser.version_string)
browsers[key] = browsers.get(key, 0) + 1
return browsers
注册handler,注意时间窗口宽度
reg(browser_handler, 5, 5)
问题
如果想知道所有浏览器的统计,怎么办?
allbrowsers = 1
# 浏览器分析
def browser_handler(iterable):
browsers = {}
for item in iterable:
ua = item['useragent']
key = (ua.browser.family, ua.browser.version_string)
browsers[key] = browsers.get(key, 0) + 1
allbrowsers[key] = allbrowsers.get(key, 0) + 1
print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])
return browsers
完整代码
import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path
PATTERN = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''
regex = re.compile(PATTERN)
from user_agents import parse
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int,
'useragent': lambda ua: parse(ua)
}
def extract(line: str) -> dict:
matcher = regex.match(line)
if matcher:
return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}
# 装载文件
def openfile(path: str):
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败则抛弃或者记录日志
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
yield from openfile(str(file))
elif p.is_file():
yield from openfile(str(p))
# 数据处理
def source(second=1):
"""生成数据"""
while True:
yield {
'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
'value': random.randint(1, 100)
}
time.sleep(second)
# 滑动窗口函数
def window(src: Queue, handler, width: int, interval: int):
"""
窗口函数
:param
src:数据源,缓存队列,用来拿数据
:param
handler:数据处理函数
:param
width:时间窗口宽度,秒
:param
interval:处理时间间隔,秒
"""
start = datetime.datetime.strptime('20170101 000800 +0800', '%Y%m%d %H%M%S %z')
current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
buffer = [] # 窗口中的待计算数据
delta = datetime.timedelta(seconds=width - interval)
while True:
# 从数据源获取数据
data = src.get()
if data:
buffer.append(data) # 存入临时缓冲等待计算学院。
current = data['datetime']
# 每隔interval计算buffer中的数据一次
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print('{}'.format(ret))
start = current
# 清除超出width的数据
buffer = [x for x in buffer if x['datetime'] > current - delta]
# 随机数平均数测试函数
def handler(iterable):
return sum(map(lambda x: x['value'], iterable)) / len(iterable)
# 测试函数
def donothing_handler(iterable):
return iterable
# 状态码占比
def status_handler(iterable):
# 时间窗口内的一批数据
status = {}
for item in iterable:
key = item['status']
status[key] = status.get(key, 0) + 1
# total = sum(status.values())
total = len(iterable)
return {k: status[k] / total for k, v in status.items()}
allbrowsers = {}
# 浏览器分析
def browser_handler(iterable):
browsers = {}
for item in iterable:
ua = item['useragent']
key = (ua.browser.family, ua.browser.version_string)
browsers[key] = browsers.get(key, 0) + 1
allbrowsers[key] = allbrowsers.get(key, 0) + 1
print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])
return browsers
# 分发器
def dispatcher(src):
# 分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler, width: int, interval: int):
"""注册 窗口处理函数
:param handler:注册的数据处理函数
:param width:时间窗口宽度
:param interval:时间间隔
"""
q = Queue()
queues.append(q)
h = threading.Thread(target=window, args=(q, handler, width, interval))
handlers.append(h)
def run():
for t in handlers:
t.start() # 启动线程处理数据
for item in src: # 将数据源取到的数据分发到所有队列中
for q in queues:
print('q put item :',item)
q.put(item)
return reg, run
if __name__ == '__main__':
import sys
# path = sys.argv[1]
path = 'test1.log'
reg, run = dispatcher(load(path))
reg(status_handler, 10, 5) # VE
reg(browser_handler, 5, 5)
run() # 运行
test.log文件内容如下
66.249.69.131 - - [19/Feb/2013:10:23:29 +0800] "GET /robots.txt HTTP/1.1" 404 162 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)"
输出:
q put item : {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': <user_agents.parsers.UserAgent object at 0x10308f0e0>}
q put item : {'remote': '183.60.212.153', 'datetime':