目录 | | 目录 | |
---|---|---|---|
迭代器与生成器(一) | 1.手动遍历迭代器 2.代理迭代 3.使用生成器创建新的迭代模式 4.实现迭代器协议 | 迭代器与生成器(三) | 9.排列组合的迭代 10.序列上索引值迭代 11.同时迭代多个序列 12.不同集合上元素的迭代 |
迭代器与生成器(二) | 5.反向迭代 6.带有外部状态的生成器函数 7.迭代器切片 8.跳过可迭代对象的开始部分 | 迭代器与生成器(四) | 13.创建数据处理管道 14.展开嵌套的序列 15.顺序迭代合并后的排序迭代对象 16.迭代器代替 while 无限循环 |
迭代器与生成器(四)
- 13.创建数据处理管道
- 14.展开嵌套的序列
- 15.顺序迭代合并后的排序迭代对象
- 16.迭代器代替 while 无限循环
13.创建数据处理管道
你想以数据管道(类似 Unix 管道)的方式迭代处理数据。比如,你有个大量的数据需要处理,但是不能将它们一次性放入内存中。
生成器函数是一个实现管道机制的好办法。为了演示,假定你要处理一个非常大的日志文件目录:
foo/access-log-012007.gzaccess-log-022007.gzaccess-log-032007.gz...access-log-012008
bar/access-log-092007.bz2...access-log-022008
假设每个日志文件包含这样的数据:
124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...
为了处理这些文件,你可以定义一个由多个执行特定任务独立任务的简单生成器函数组成的容器。就像这样:
import os
import fnmatch
import gzip
import bz2
import redef gen_find(filepat, top):'''在目录树中查找所有匹配指定通配符模式的文件名。'''for path, dirlist, filelist in os.walk(top): # 使用 os.walk(top) 递归遍历 top 目录及其子目录。for name in fnmatch.filter(filelist, filepat): # 对于每个目录,筛选出匹配 filepat 的文件名。yield os.path.join(path, name) # 使用 yield 生成每个匹配文件的完整路径(拼接目录路径和文件名)。def gen_opener(filenames):'''按顺序打开一系列文件,每次生成一个文件对象。文件会在下一次迭代前关闭。'''for filename in filenames:if filename.endswith('.gz'):f = gzip.open(filename, 'rt')elif filename.endswith('.bz2'):f = bz2.open(filename, 'rt')else:f = open(filename, 'rt')yield ff.close() # 由于生成器会在 yield 后暂停,文件对象的实际关闭是在调用代码继续下一次迭代时发生的。def gen_concatenate(iterators):'''将多个迭代器连接成一个单一的序列。'''for it in iterators:yield from itdef gen_grep(pattern, lines):'''在行序列中搜索匹配正则表达式的行。'''pat = re.compile(pattern)for line in lines:if pat.search(line):yield line
现在你可以很容易的将这些函数连起来创建一个处理管道。比如,为了查找包含单词 python
的所有日志行,你可以这样做:
lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines) # (?i)是正则表达式的忽略大小写标志,因此会匹配 python、Python、PYTHON 等。
for line in pylines:print(line)
如果将来的时候你想扩展管道,你甚至可以在生成器表达式中包装数据。比如,下面这个版本计算出传输的字节数并计算其总和。
lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
bytes = (int(x) for x in bytecolumn if x != '-')
print('Total', sum(bytes))
line.rsplit(None, 1)
:None
表示按任意空白字符(空格、制表符等)分割。1
表示最多分割一次,从右侧开始分割。- 例如:
"1234 python 512".rsplit(None, 1)
→['1234 python', '512']
。
[1]
取分割后的最后一列(如'512'
)。bytecolumn
是一个生成器表达式,生成所有行的最后一列值。
示例输出:
# 输入pylines:
# ["1234 python 512", "42 PYTHON 1024", "python - 2048"]
bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
# 生成的bytecolumn内容:
# ['512', '1024', '2048']
# 输入bytecolumn:
# ['512', '1024', '2048']
bytes = (int(x) for x in bytecolumn if x != '-')
# 生成的bytes内容:
# 512, 1024, 2048
以管道方式处理数据可以用来解决各类其他问题,包括解析,读取实时数据,定时轮询等。
为了理解上述代码,重点是要明白 yield
语句作为数据的 生产者,而 for
循环语句作为数据的 消费者。当这些生成器被连在一起后,每个 yield
会将一个单独的数据元素传递给迭代处理管道的下一阶段。在例子最后部分,sum()
函数是最终的程序驱动者,每次从生成器管道中提取出一个元素。
这种方式一个非常好的特点是每个生成器函数很小并且都是独立的。这样的话就很容易编写和维护它们了。很多时候,这些函数如果比较通用的话可以在其他场景重复使用。并且最终将这些组件组合起来的代码看上去非常简单,也很容易理解。
使用这种方式的内存效率也不得不提。上述代码即便是在一个超大型文件目录中也能工作的很好。事实上,由于使用了迭代方式处理,代码运行过程中只需要很小很小的内存。
在调用 gen_concatenate()
函数的时候你可能会有些不太明白。这个函数的目的是将输入序列拼接成一个很长的行序列。itertools.chain()
函数同样有类似的功能,但是它需要将所有可迭代对象作为参数传入。
在上面这个例子中,你可能会写类似这样的语句 lines = itertools.chain(*files)
,这将导致 gen_opener()
生成器被提前全部消费掉。但由于 gen_opener()
生成器每次生成一个打开过的文件,等到下一个迭代步骤时文件就关闭了,因此 chain()
在这里不能这样使用。上面的方案可以避免这种情况。
files = gen_opener(['a.txt', 'b.gz']) # 生成器,每次 yield 一个文件对象
lines = itertools.chain(*files) # 错误!files 会被全部消费,文件可能已关闭
for line in lines: # 实际迭代时文件已关闭,可能报错print(line)
files = gen_opener(['a.txt', 'b.gz']) # 生成器,每次 yield 一个文件对象
lines = gen_concatenate(files) # 惰性拼接,按需打开文件
for line in lines: # 安全迭代print(line)
gen_concatenate()
函数中出现过 yield from
语句,它将 yield
操作代理到父生成器上去。语句 yield from it
简单的返回生成器 it
所产生的所有值。
🚀 对
yield from
仍有疑问的,可以参考博主的这篇博客《yield from 功能解析》。
最后还有一点需要注意的是,管道方式并不是万能的。有时候你想立即处理所有数据。然而,即便是这种情况,使用生成器管道也可以将这类问题从逻辑上变为工作流的处理方式。
14.展开嵌套的序列
你想将一个多层嵌套的序列展开成一个单层列表。
可以写一个包含 yield from
语句的递归生成器来轻松解决这个问题。比如:
from collections import Iterabledef flatten(items, ignore_types=(str, bytes)):for x in items:if isinstance(x, Iterable) and not isinstance(x, ignore_types):yield from flatten(x)else:yield xitems = [1, 2, [3, 4, [5, 6], 7], 8]
# Produces 1 2 3 4 5 6 7 8
for x in flatten(items):print(x)
在上面代码中, isinstance(x, Iterable)
检查某个元素是否是可迭代的。如果是的话, yield from
就会返回所有子例程的值。最终返回结果就是一个没有嵌套的简单序列了。
额外的参数 ignore_types
和检测语句 isinstance(x, ignore_types)
用来将字符串和字节排除在可迭代对象外,防止将它们再展开成单个的字符。这样的话字符串数组就能最终返回我们所期望的结果了。比如:
>>> items = ['Dave', 'Paula', ['Thomas', 'Lewis']]
>>> for x in flatten(items):
... print(x)
...
Dave
Paula
Thomas
Lewis
>>>
语句 yield from
在你想在生成器中调用其他生成器作为子例程的时候非常有用。如果你不使用它的话,那么就必须写额外的 for
循环了。比如:
def flatten(items, ignore_types=(str, bytes)):for x in items:if isinstance(x, Iterable) and not isinstance(x, ignore_types):for i in flatten(x):yield ielse:yield x
尽管只改了一点点,但是 yield from
语句看上去感觉更好,并且也使得代码更简洁清爽。
之前提到的对于字符串和字节的额外检查是为了防止将它们再展开成单个字符。如果还有其他你不想展开的类型,修改参数 ignore_types
即可。
最后要注意的一点是, yield from
在涉及到基于协程和生成器的并发编程中扮演着更加重要的角色。
15.顺序迭代合并后的排序迭代对象
你有一系列排序序列,想将它们合并后得到一个排序序列并在上面迭代遍历。
heapq.merge()
函数可以帮你解决这个问题。比如:
>>> import heapq
>>> a = [1, 4, 7, 10]
>>> b = [2, 5, 6, 11]
>>> for c in heapq.merge(a, b):
... print(c)
...
1
2
4
5
6
7
10
11
heapq.merge
可迭代特性意味着它不会立马读取所有序列。这就意味着你可以在非常长的序列中使用它,而不会有太大的开销。比如,下面是一个例子来演示如何合并两个排序文件:
with open('sorted_file_1', 'rt') as file1, \open('sorted_file_2', 'rt') as file2, \open('merged_file', 'wt') as outf:for line in heapq.merge(file1, file2):outf.write(line)
有一点要强调的是 heapq.merge()
需要 所有输入序列必须是排过序的。特别的,它并不会预先读取所有数据到堆栈中或者预先排序,也不会对输入做任何的排序检测。它仅仅是检查所有序列的开始部分并返回最小的那个,这个过程一直会持续直到所有输入序列中的元素都被遍历完。
16.迭代器代替 while 无限循环
你在代码中使用 while
循环来迭代处理数据,因为它需要调用某个函数或者和一般迭代模式不同的测试条件。能不能用迭代器来重写这个循环呢?
一个常见的 IO 操作程序可能会像下面这样:
CHUNKSIZE = 8192def reader(s):while True:data = s.recv(CHUNKSIZE)if data == b'':breakprocess_data(data)
这种代码通常可以使用 iter()
来代替,如下所示:
def reader2(s):for chunk in iter(lambda: s.recv(CHUNKSIZE), b''):pass# process_data(data)
如果你怀疑它到底能不能正常工作,可以试验下一个简单的例子。比如:
>>> import sys
>>> f = open('/etc/passwd')
>>> for chunk in iter(lambda: f.read(10), ''):
... n = sys.stdout.write(chunk)
...
nobody:*:-2:-2:Unprivileged User:/var/empty:/usr/bin/false
root:*:0:0:System Administrator:/var/root:/bin/sh
daemon:*:1:1:System Services:/var/root:/usr/bin/false
_uucp:*:4:4:Unix to Unix Copy Protocol:/var/spool/uucp:/usr/sbin/uucico
...
>>>
iter
函数一个鲜为人知的特性是它接受一个可选的 callable
对象和一个标记(结尾)值作为输入参数。当以这种方式使用的时候,它会创建一个迭代器,这个迭代器会不断调用 callable
对象直到返回值和标记值相等为止。
iter(callable, sentinel)
的用法
- 功能:
创建一个迭代器,重复调用callable
(可调用对象)直到它返回sentinel
(哨兵值)。- 在此代码中:
callable
是lambda: f.read(10)
:每次调用时从文件f
读取 10 字节。sentinel
是''
:当f.read(10)
返回空字符串时(表示文件结束),迭代停止。- 效果:
将文件分块读取,每次 10 字节,避免一次性加载大文件到内存。
这种特殊的方法对于一些特定的会被重复调用的函数很有效果,比如涉及到 I/O 调用的函数。举例来讲,如果你想从套接字或文件中以数据块的方式读取数据,通常你得要不断重复的执行 read()
或 recv()
,并在后面紧跟一个文件结尾测试来决定是否终止。这节中的方案使用一个简单的 iter()
调用就可以将两者结合起来了。其中 lambda
函数参数是为了创建一个无参的 callable
对象,并为 recv
或 read()
方法提供了 size
参数。