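Introduction
Motivation
Specification Summary
Specification: Sending Values into Generators
Specification: Exceptions and Cleanup
try-finally
__del__()
Optional Extensions
Open Issues
Examples
Reference Implementation
Acknowledgements
References
Copyright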
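A simple coroutine scheduler, or trampoline function, would let coroutines call one another without blocking: a tremendous boon for asynchronous applications. Such applications could write coroutines that do non-blocking socket I/O by yielding control to an I/O scheduler until data has been sent or becomes available. Meanwhile, code that performs the I/O would simply do something like the following to pause execution until the nonblocking_read() coroutine produces a value:

```python
data = (yield nonblocking_read(my_socket, nbytes))
```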
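Here is a minimal sketch of that pause-and-resume pattern using only the standard library. The coroutine yields a request object and a toy driver answers it with send(). ReadRequest, handle_connection and drive are hypothetical names invented for this illustration, and the "socket" is simulated by an in-memory list of byte strings.

```python
from collections import deque


class ReadRequest:
    """Hypothetical request object a coroutine yields to ask for data."""

    def __init__(self, nbytes):
        self.nbytes = nbytes


def handle_connection(log):
    """Hypothetical coroutine: each yield suspends it until data arrives."""
    while True:
        data = yield ReadRequest(4)
        if not data:  # empty bytes stands in for "connection closed"
            return
        log.append(data.upper())


def drive(coro, incoming):
    """Hypothetical toy scheduler: satisfy each ReadRequest from a queue."""
    pending = deque(incoming)
    request = next(coro)  # advance to the first yield
    try:
        while True:
            chunk = pending.popleft()[: request.nbytes] if pending else b""
            request = coro.send(chunk)  # resume; get the next request
    except StopIteration:
        pass  # the coroutine finished
```

Calling drive(handle_connection(log), [b"ping", b"pong"]) leaves log equal to [b"PING", b"PONG"]. The empty chunk sent after the input runs out ends the coroutine.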
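send() resumes the generator and returns the next value it yields, or raises StopIteration if the generator exits without yielding another value.

throw() raises an exception at the point where the generator is paused. Like send(), it returns the next value the generator yields, or raises StopIteration if the generator exits without yielding another value. (If the generator does not catch the passed-in exception, or raises a different exception, that exception propagates to the caller.)

close() raises GeneratorExit at the point where the generator is paused. If the generator then raises StopIteration (by exiting normally, or because it has already been closed) or GeneratorExit (by not catching the exception), close() returns to its caller. If the generator yields a value, RuntimeError is raised. If the generator raises any other exception, it is propagated to the caller. If the generator has already exited, whether normally or due to an exception, close() does nothing.

A just-started generator is not yet paused at a yield expression, so calling send() on it with a non-None argument raises TypeError (presumably the result of a logic error of some kind). Thus, before communicating with a coroutine you must first call next() or send(None) to advance its execution to the first yield expression.

Like next(), send() returns the next value yielded by the generator, or raises StopIteration when the generator exits normally or has already exited. If the generator raises an uncaught exception, it is propagated to the caller.

The following uses of yield are all legal:

```python
x = yield 42
x = yield
x = 12 + (yield 42)
x = 12 + (yield)
foo(yield 42)
foo(yield)
```

whereas the following are illegal (some of these edge cases are motivated by the fact that yield 12, 42 is legal):

```python
x = 12 + yield 42
x = 12 + yield
foo(yield 42, 12)
foo(yield, 12)
```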
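The send() rules and the expression forms above can be checked with a short Python 3 script. averager is a hypothetical coroutine written for this illustration. compiles() simply asks the running compiler whether a statement is valid inside a function body, and only the assignment forms are checked.

```python
def averager():
    """Hypothetical coroutine: receives numbers, yields the running average."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average  # RHS of an assignment: no parentheses needed
        if value is None:      # sending None (or calling next()) ends it
            return
        total += value
        count += 1
        average = total / count


# Statements that should (LEGAL) or should not (ILLEGAL) compile when
# placed inside a function body.
LEGAL = ["x = yield 42", "x = yield", "x = 12 + (yield 42)", "x = 12 + (yield)"]
ILLEGAL = ["x = 12 + yield 42", "x = 12 + yield"]


def compiles(statement):
    """Return True if `statement` is valid inside a function body."""
    source = "def g():\n    " + statement + "\n"
    try:
        compile(source, "<yield-check>", "exec")
    except SyntaxError:
        return False
    return True
```

A fresh averager() rejects send(10) with TypeError. After next(gen) primes it, gen.send(10) returns 10.0, gen.send(20) returns 15.0, and gen.send(None) raises StopIteration.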
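g.throw(type, value, traceback) causes the specified exception to be raised at the point where generator g is currently suspended. This is the yield expression where it is paused, or the start of its function body if next() has not been called yet. If the generator catches the exception and yields another value, that value is the return value of g.throw(). If the generator does not catch the exception, throw() raises the same exception it was passed (the exception "falls through"). If the generator raises a different exception (including the StopIteration produced when it returns), that exception is raised by the throw() call. In short, throw() behaves like next() or send(), except that it raises an exception at the suspension point. If the generator is already closed, throw() just raises the exception it was passed, without executing any of the generator's code.

Raising the exception has the same effect as executing the following statement (Python 2 syntax) at the suspension point:

```python
raise type, value, traceback
```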
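The throw() outcomes described above can be seen with two small generators, resilient and stops_on_error, both hypothetical. The sketch passes an exception instance, which is the single-argument form Python 3 accepts. The three-argument signature still works but has been deprecated since Python 3.12.

```python
def resilient():
    """Hypothetical generator that recovers from ValueError."""
    while True:
        try:
            yield "ok"
        except ValueError:
            # Caught: the next yielded value becomes throw()'s return value.
            yield "recovered"


def stops_on_error():
    """Hypothetical generator that returns when it sees ValueError."""
    try:
        yield 1
    except ValueError:
        return  # returning makes throw() raise StopIteration
```

With g = resilient(), after next(g), g.throw(ValueError("bad")) returns "recovered". Throwing a KeyError falls through to the caller and finishes the generator, after which any further throw() simply re-raises the exception it was given. For stops_on_error(), the throw() call raises StopIteration.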
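Other names considered for this method were resolve(), signal(), genraise(), raiseinto() and flush(). None of them seemed as appropriate as throw().

g.close() is defined by the following pseudocode:

```python
def close(self):
    try:
        self.throw(GeneratorExit)
    except (GeneratorExit, StopIteration):
        pass
    else:
        raise RuntimeError("generator ignored GeneratorExit")
    # Other exceptions are not caught
```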
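Here is a small illustration of close(). It is a sketch, and writer is a hypothetical coroutine. The finally clause runs when close() raises GeneratorExit at the paused yield, and a second close() on the exited generator does nothing.

```python
def writer(log):
    """Hypothetical coroutine that records lines and cleans up on close()."""
    try:
        while True:
            line = yield
            log.append(line)
    finally:
        # Runs when close() raises GeneratorExit at the paused yield.
        log.append("closed")
```

After next(w) and w.send("a"), calling w.close() leaves log equal to ["a", "closed"], and calling it again changes nothing. In CPython, dropping the last reference to a paused writer has the same effect, because finalizing a generator calls close().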
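__del__()

g.__del__() is a wrapper for g.close(). It is called when the generator object is garbage-collected (in CPython, when its reference count drops to zero). If close() raises an exception, a traceback for the exception is printed to sys.stderr and otherwise ignored. It is not propagated back to the place that triggered the garbage collection. This is consistent with the handling of exceptions raised in the __del__() methods of class instances.

If the generator object participates in a reference cycle, g.__del__() may not be called. This is the behavior of CPython's garbage collector at the time of writing. The reason for the restriction is that the GC code needs to break a cycle at an arbitrary point in order to collect it. From then on, no Python code may be allowed to "see" the objects that formed the cycle, because they may be in an invalid state. Objects "hanging off" a cycle are not subject to this restriction.

What exception should close() raise when the generator yields another value in response to the GeneratorExit exception?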
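I originally chose TypeError, because it represents gross misbehavior of the generator function, which should be fixed by changing the code. But the with_template decorator class in PEP 343 uses RuntimeError for similar offenses. Arguably they should all use the same exception. I'd rather not introduce a new exception class just for this purpose, since it is not an exception I want people to catch. I want it to turn into a traceback that the programmer sees, so that they then fix the code. So I now believe both should raise RuntimeError. There are precedents for that. The core Python code raises it when endless recursion is detected, for uninitialized objects, and in a variety of other miscellaneous conditions.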
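The chosen resolution can be observed in Python 3: a generator that yields in response to GeneratorExit makes close() raise RuntimeError. stubborn and close_error_message are hypothetical helpers written for this illustration. In CPython the message is "generator ignored GeneratorExit", matching the pseudocode for close().

```python
def stubborn():
    """Hypothetical, deliberately broken generator: yields after GeneratorExit."""
    while True:
        try:
            yield "working"
        except GeneratorExit:
            # Wrong: once closed, a generator should finish, not yield.
            yield "still here"


def close_error_message(gen):
    """Hypothetical helper: advance gen, close it, return the RuntimeError text."""
    next(gen)
    try:
        gen.close()
    except RuntimeError as error:
        return str(error)
    return None
```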
Oren Tirosh proposed renaming the send() method to feed(), for compatibility with the consumer interface (see http://effbot.org/zone/consumer.htm for the specification).
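A simple consumer decorator that makes a generator function automatically advance to its first yield point when it is first called:

```python
import functools


def consumer(func):
    # functools.wraps copies __name__, __doc__ and __dict__ from func
    @functools.wraps(func)
    def wrapper(*args, **kw):
        gen = func(*args, **kw)
        next(gen)  # advance to the first yield so send() works at once
        return gen
    return wrapper
```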
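Here is a short usage sketch. With a Python 3 version of the decorator, a decorated generator can receive send() immediately. functools.wraps copies the wrapped function's name, docstring and attributes. recorder is a hypothetical consumer for illustration.

```python
import functools


def consumer(func):
    """Python 3 version of the decorator above: prime the generator."""
    @functools.wraps(func)
    def wrapper(*args, **kw):
        gen = func(*args, **kw)
        next(gen)  # pause at the first yield before handing it out
        return gen
    return wrapper


@consumer
def recorder(received):
    """Hypothetical consumer that appends every value it is sent."""
    while True:
        received.append((yield))
```

Without the decorator, the first r.send("hello") on r = recorder(received) would raise TypeError, because the generator would not yet be paused at a yield.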
The following example uses the consumer decorator to build a pipeline. thumbnail_pager receives images, lays them out as thumbnail pages, and sends each finished page on to another consumer, jpeg_writer:

```python
import os

# new_image(), create_thumbnail() and write_jpeg() are image helpers this
# example assumes exist; they are not defined here.


@consumer
def thumbnail_pager(pagesize, thumbsize, destination):
    while True:
        page = new_image(pagesize)
        # Assumes a size type whose division gives (rows, columns).
        rows, columns = pagesize / thumbsize
        pending = False
        try:
            for row in range(rows):
                for column in range(columns):
                    thumb = create_thumbnail((yield), thumbsize)
                    page.write(
                        thumb, column * thumbsize.x, row * thumbsize.y
                    )
                    pending = True
        except GeneratorExit:
            # close() was called, so flush any pending output
            if pending:
                destination.send(page)
            # then close the downstream consumer, and exit
            destination.close()
            return
        else:
            # we finished a page full of thumbnails, so send it
            # downstream and keep on looping
            destination.send(page)


@consumer
def jpeg_writer(dirname):
    fileno = 1
    while True:
        filename = os.path.join(dirname, "page%04d.jpg" % fileno)
        write_jpeg((yield), filename)
        fileno += 1


# Put them together to make a function that makes thumbnail
# pages from a list of images and other parameters.
#
def write_thumbnails(pagesize, thumbsize, images, output_dir):
    pipeline = thumbnail_pager(
        pagesize, thumbsize, jpeg_writer(output_dir)
    )
    for image in images:
        pipeline.send(image)
    pipeline.close()
```
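Because new_image, create_thumbnail and write_jpeg are not defined in this document, here is a self-contained sketch of the same pipeline shape using plain Python values. pager (hypothetical) plays the role of thumbnail_pager: it groups items into fixed-size pages and flushes a partial page on close(). collector (hypothetical) stands in for jpeg_writer.

```python
import functools


def consumer(func):
    """Prime a generator so it is ready to receive values via send()."""
    @functools.wraps(func)
    def wrapper(*args, **kw):
        gen = func(*args, **kw)
        next(gen)
        return gen
    return wrapper


@consumer
def pager(pagesize, destination):
    """Hypothetical: group incoming items into lists of pagesize items."""
    while True:
        page = []
        try:
            while len(page) < pagesize:
                page.append((yield))
        except GeneratorExit:
            if page:                  # flush the partial page
                destination.send(page)
            destination.close()       # propagate close() down the pipeline
            return
        destination.send(page)        # a full page is ready


@consumer
def collector(pages):
    """Hypothetical sink that records each page, plus a closing marker."""
    try:
        while True:
            pages.append((yield))
    finally:
        pages.append("closed")
```

Sending 0 through 6 into pager(3, collector(pages)) and then closing the pipeline leaves pages as [[0, 1, 2], [3, 4, 5], [6], "closed"]. The partial page is flushed and close() propagates downstream, as in thumbnail_pager.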
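A simple coroutine scheduler, or trampoline, lets coroutines "call" other coroutines by yielding the coroutine they wish to invoke. A non-generator value yielded by a coroutine is returned to the coroutine that called it, and an exception raised by a coroutine is propagated to its caller:

```python
import collections
import types


class Trampoline:
    """Manage communications between coroutines"""

    running = False

    def __init__(self):
        self.queue = collections.deque()

    def add(self, coroutine):
        """Request that a coroutine be executed"""
        self.schedule(coroutine)

    def run(self):
        result = None
        self.running = True
        try:
            while self.running and self.queue:
                func = self.queue.popleft()
                result = func()
            return result
        finally:
            self.running = False

    def stop(self):
        self.running = False

    def schedule(self, coroutine, stack=(), val=None, exc=None):
        # stack is a linked list of (caller, caller's stack) pairs;
        # exc, if given, is an exception instance to throw into coroutine.
        def resume():
            try:
                if exc is not None:
                    value = coroutine.throw(exc)
                else:
                    value = coroutine.send(val)
            except StopIteration as stop:
                # The coroutine returned: pass its return value (None if
                # it returned nothing) back like a yielded result.
                value = stop.value
            except Exception as error:
                if stack:
                    # send the error back to the "caller", and do not
                    # also send it a value below
                    self.schedule(stack[0], stack[1], exc=error)
                    return
                # Nothing left in this pseudothread to handle it,
                # let it propagate to the run loop
                raise
            if isinstance(value, types.GeneratorType):
                # Yielded to a specific coroutine, push the current
                # one on the stack, and call the new one with no args
                self.schedule(value, (coroutine, stack))
            elif stack:
                # Yielded a result, pop the stack and send the
                # value to the caller
                self.schedule(stack[0], stack[1], value)
            # else: this pseudothread has ended
        self.queue.append(resume)
```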
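The call, return and error paths of the trampoline can be exercised with a self-contained sketch. The Trampoline here is a condensed Python 3 variant of the class above, without stop() or the running flag. It treats a coroutine's return value as a result for its caller. add_slowly, double, fails and main are hypothetical coroutines written for this illustration.

```python
import collections
import types


class Trampoline:
    """Condensed Python 3 variant of the Trampoline class above."""

    def __init__(self):
        self.queue = collections.deque()

    def add(self, coroutine):
        self.schedule(coroutine)

    def run(self):
        while self.queue:
            self.queue.popleft()()

    def schedule(self, coroutine, stack=(), val=None, exc=None):
        def resume():
            try:
                if exc is not None:
                    value = coroutine.throw(exc)
                else:
                    value = coroutine.send(val)
            except StopIteration as stop:
                value = stop.value          # a return counts as a result
            except Exception as error:
                if not stack:
                    raise                   # no caller left to handle it
                self.schedule(stack[0], stack[1], exc=error)
                return
            if isinstance(value, types.GeneratorType):
                self.schedule(value, (coroutine, stack))   # "call"
            elif stack:
                self.schedule(stack[0], stack[1], value)   # "return"
        self.queue.append(resume)


def add_slowly(a, b):
    # Hypothetical callee that "returns" by yielding a plain value.
    yield a + b


def double(n):
    # Hypothetical callee that returns instead of yielding.
    return n * 2
    yield  # unreachable, but makes this a generator function


def fails():
    # Hypothetical callee whose exception should reach its caller.
    raise ValueError("boom")
    yield


def main(results):
    # Hypothetical top-level coroutine that "calls" the three callees.
    results.append((yield add_slowly(2, 3)))
    results.append((yield double(4)))
    try:
        yield fails()
    except ValueError as error:
        results.append(str(error))
```

Running main(results) on the trampoline leaves results equal to [5, 8, "boom"]. That is two values returned from "called" coroutines and one exception propagated back to the caller.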
The following echo server runs on the trampoline. It presumes the existence of nonblocking_read, nonblocking_write and other I/O coroutines that, for example, raise ConnectionLost when the connection is closed:

```python
# nonblocking_read/nonblocking_write/nonblocking_accept, listening_socket
# and ConnectionLost are assumed to exist; they are not defined here.


# coroutine function that echoes data back on a connected
# socket
#
def echo_handler(sock):
    while True:
        try:
            data = yield nonblocking_read(sock)
            yield nonblocking_write(sock, data)
        except ConnectionLost:
            return  # exit normally if connection lost


# coroutine function that listens for connections on a
# socket, and then launches a service "handler" coroutine
# to service the connection
#
def listen_on(trampoline, sock, handler):
    while True:
        # get the next incoming connection
        connected_socket = yield nonblocking_accept(sock)
        # start another coroutine to handle the connection
        trampoline.add(handler(connected_socket))


# Create a scheduler to manage all our coroutines
t = Trampoline()

# Create a coroutine instance to run the echo_handler on
# incoming connections
#
server = listen_on(
    t, listening_socket("localhost", "echo"), echo_handler
)

# Add the coroutine to the scheduler
t.add(server)

# loop forever, accepting connections and servicing them
# "in parallel"
#
t.run()
```