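Coroutines sound simple at first. The more you look, the less simple they turn out to be, and only by thinking them through do their subtleties emerge. Usage differs in Python 3.7 and later, so keep track of which version you are running.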

Coroutines

Concepts

def match(pattern):
    print('looking for ' + pattern)
    try:
        while True:
            s = (yield)
            if pattern in s:
                print(s)
    except GeneratorExit:
        print("=== Done ===")

def read(text, next_coroutine):
    for line in text.split():
        next_coroutine.send(line)

def match_filter(pattern, next_coroutine):
    print('Looking for ' + pattern)
    try:
        while True:
            s = (yield)
            if pattern in s:
                next_coroutine.send(s)
    except GeneratorExit:
        next_coroutine.close()

def print_consumer():
    print("Preparing to print")
    try:
        while True:
            line = (yield)
            print(line)
    except GeneratorExit:
        print("==Done==")

# m = match("Jabberwock")
# m.__next__()
# m.send("the Jabberwock with eyes of flame")
# m.send("go go")
# m.send("the Jabberwock with eyes of flame")
# m.close()
text = 'Commending spending is offending to people pending lending!'
# matcher = match("ending")
# matcher.__next__()
# read(text,matcher)

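printer = print_consumer()
printer.__next__()  # prime the consumer: run it up to its first yield
matcher = match_filter('pend', printer)
matcher.__next__()  # prime the filter the same way
read(text, matcher)
matcher.close()  # GeneratorExit in match_filter closes printer, which prints ==Done==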

Output:

Preparing to print
Looking for pend
spending
pending
==Done==

Multitasking
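Every consumer in these examples has to be primed with __next__() before its first send(). A widely used convenience is a decorator that does the priming for you. The sketch below assumes that pattern; `primed`, `collect` and `broadcast` are hypothetical names, and `broadcast` mirrors the read_to_many fan-out used in this section.

```python
import functools

def primed(func):
    """Hypothetical helper: build the generator and advance it to its first yield."""
    @functools.wraps(func)
    def start(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # run up to the first (yield) so send() can be used right away
        return gen
    return start

@primed
def collect(pattern, sink):
    # Append every received word that contains `pattern` to the list `sink`
    while True:
        word = (yield)
        if pattern in word:
            sink.append(word)

def broadcast(text, coroutines):
    # Same fan-out as read_to_many: every word goes to every coroutine, then all are closed
    for word in text.split():
        for co in coroutines:
            co.send(word)
    for co in coroutines:
        co.close()

mend_hits, pe_hits = [], []
broadcast("Commending spending people pending",
          [collect("mend", mend_hits), collect("pe", pe_hits)])
print(mend_hits)  # ['Commending']
print(pe_hits)    # ['spending', 'people', 'pending']
```

Collecting into lists instead of printing makes the result easy to inspect; the matches are the same ones the printing version reports.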

def match(pattern):
    print('looking for ' + pattern)
    try:
        while True:
            s = (yield)
            if pattern in s:
                print(s)
    except GeneratorExit:
        print("=== Done ===")

def read_to_many(text, coroutines):
    for word in text.split():
        for coroutine in coroutines:
            coroutine.send(word)
    for coroutine in coroutines:
        coroutine.close()

m = match("mend")
m.__next__()
p = match("pe")
p.__next__()
text = "Commending spending people pending"
read_to_many(text,[m,p])

Output:

looking for mend
looking for pe
Commending
spending
people
pending
=== Done ===
=== Done ===

grequests

import grequests

urls = [
    'http://www.heroku.com',
    'http://python-tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://fakedomain/',
    'http://kennethreitz.com'
]
rs = (grequests.get(u) for u in urls)
res = grequests.map(rs)
for item in res:
    print(item)
    # if item:
    #     print(item.text)

Output (grequests.map returns None for requests that failed):

None
None

Understanding Python coroutines: from yield/send to yield from to async/await

https://blog.csdn.net/soonfly/article/details/78361819

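    Python coroutines have gone through roughly three stages:
  • the original generator-based form, yield/send
  • @asyncio.coroutine together with yield from (the decorator was deprecated in Python 3.8 and removed in 3.11)
  • the async/await keywords, introduced in Python 3.5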

Generator-based form: yield/send

If an ordinary function contains the yield keyword, it is no longer an ordinary function but a generator function: calling it returns a generator.

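from random import randint

def mygen(alist):
    # Remove and yield a randomly chosen element until the list is empty
    while len(alist) > 0:
        c = randint(0, len(alist)-1)
        yield alist.pop(c)

a = ["aa", "bb", "cc"]
c = mygen(a)
print(c)  # prints a generator object, not the list items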
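Printing c only shows a generator object; the values come out when something iterates over it, for example a for loop. A small sketch reusing mygen (the list name `picked` is ours):

```python
from random import randint

def mygen(alist):
    # Remove and yield a randomly chosen element until the list is empty
    while len(alist) > 0:
        c = randint(0, len(alist) - 1)
        yield alist.pop(c)

a = ["aa", "bb", "cc"]
picked = []
for item in mygen(a):  # the for loop calls next() until StopIteration
    picked.append(item)

print(sorted(picked))  # ['aa', 'bb', 'cc'] (the order in picked itself is random)
print(a)               # [] because the generator popped every element
```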


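The c above is a generator. A generator is a kind of iterator, so it can be iterated with for. What sets generator functions apart is that they can also receive a value from outside, compute a result from it, and hand that result back. All of this is done through the generator's send() method.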

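def gen():
    value = 0
    while True:
        receive = yield value
        if receive == 'e':
            break
        value = 'got:%s' % receive

g = gen()
# print(g.send("guo"))  # would raise TypeError: can't send non-None value to a just-started generator
print(g.send(None))
print(g.send('hello'))
print(g.send(123456))
# print(g.send('e'))    # the loop breaks and the generator returns, so this raises StopIteration

Output:

0
got:hello
got:123456
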
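    The key line in the generator above, and the easiest one to misread, is receive = yield value. Get the execution order of the loop body wrong and everything after it goes wrong too. receive = yield value actually consists of three steps:
  • yield (hand back) value to the caller
  • pause, waiting for next() or send() to resume it
  • assign receive = MockGetValue(), where MockGetValue() is an imaginary function standing for the value passed in by send()
    Execution flow:
  • g.send(None) or next(g) starts the generator, which runs up to the end of the first yield expression.
  • This is where many people get confused. Splitting receive = yield value as described above, only steps 1 and 2 run: the generator hands back value and pauses, without performing step 3 (assigning receive). So yield value produces the initial value 0. Note that a freshly started generator only accepts send(None); sending any other value raises a TypeError.
  • g.send("hello") passes in "hello" and resumes from where the generator paused, i.e. performs step 3 and assigns it to receive. It then computes the new value, returns to the top of the while loop, reaches yield value, and again performs steps 1 and 2: it hands back value and pauses. This time yield value produces "got:hello" and waits for the next send().
  • g.send(123456) repeats the same process and produces "got:123456".
  • g.send('e') makes the generator execute break and leave the loop; the function then finishes, so a StopIteration exception is raised.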
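Both edge cases in the flow above, sending a non-None value to a fresh generator and finishing with 'e', can be observed directly. A small sketch that records what happens at each step (the `log` list is ours):

```python
def gen():
    value = 0
    while True:
        receive = yield value
        if receive == 'e':
            break
        value = 'got:%s' % receive

log = []
g = gen()
try:
    g.send('hello')          # not started yet: the first send must be None
except TypeError:
    log.append('TypeError')
log.append(g.send(None))     # steps 1-2 only: hands back 0 and pauses
log.append(g.send('hello'))  # step 3 (receive = 'hello'), then steps 1-2 again
try:
    g.send('e')              # step 3 assigns 'e', break ends the function
except StopIteration:
    log.append('StopIteration')
print(log)  # ['TypeError', 0, 'got:hello', 'StopIteration']
```

The rejected first send leaves the generator unstarted, which is why send(None) still returns 0 afterwards.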

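As seen above, after the first send(None) starts the generator (running steps 1 -> 2; the value returned this first time is usually of no use), each later send() from outside makes the generator run its loop in the order 3 -> 1 -> 2: first receive the value, then do something with it, then hand a value back and pause again.

yield from

asyncio.coroutine and yield from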
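yield from (added in Python 3.3 by PEP 380) lets one generator delegate to another: whatever the sub-generator yields goes straight out to the caller, values passed in with send() are forwarded to it, and its return value becomes the value of the yield from expression. A minimal sketch with plain generators, before asyncio enters the picture; averager and delegator are illustrative names, not part of any library:

```python
def averager():
    # Sub-generator: receives numbers through send() and yields the running average
    total, count, average = 0.0, 0, None
    while True:
        term = yield average
        if term is None:       # sending None tells the sub-generator to finish
            return average     # becomes the value of the `yield from` expression
        total += term
        count += 1
        average = total / count

def delegator(results):
    # yield from forwards send() to averager() and passes its yields back to the caller
    while True:
        result = yield from averager()
        results.append(result)

results = []
d = delegator(results)
next(d)            # prime: runs into averager() up to its first yield
print(d.send(10))  # 10.0
print(d.send(20))  # 15.0
d.send(None)       # averager returns 15.0; delegator stores it and starts a fresh averager
print(results)     # [15.0]
```

The caller only ever talks to d, yet every send() lands in averager(); generator-based asyncio coroutines build on exactly this forwarding.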

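# Generator-based coroutines: needs Python 3.4-3.10 (@asyncio.coroutine was removed in 3.11)
import asyncio
import random

@asyncio.coroutine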
def smart_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_secs = random.uniform(0, 0.2)
        yield from asyncio.sleep(sleep_secs)  # yield from is usually followed by a time-consuming operation
        print('Smart one think {} secs to get {}'.format(sleep_secs, b))
        a, b = b, a + b
        index += 1

@asyncio.coroutine
def stupid_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_secs = random.uniform(0, 0.4)
        yield from asyncio.sleep(sleep_secs)  # yield from is usually followed by a time-consuming operation
        print('Stupid one think {} secs to get {}'.format(sleep_secs, b))
        a, b = b, a + b
        index += 1

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [
        smart_fib(10),
        stupid_fib(10),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    print('All fib finished.')
    loop.close()

Output:

Smart one think 0.11031614424326 secs to get 1
Stupid one think 0.12950662293074858 secs to get 1
Smart one think 0.10591317213352734 secs to get 1
Smart one think 0.11354689044765778 secs to get 2
Smart one think 0.08559092760479364 secs to get 3
Stupid one think 0.33101713290895557 secs to get 1
Stupid one think 0.06224618319358406 secs to get 2
Smart one think 0.11242062800947546 secs to get 5
Smart one think 0.09282076351300741 secs to get 8
Smart one think 0.0011066966499061915 secs to get 13
Smart one think 0.0360142029337974 secs to get 21
Stupid one think 0.25309904734263905 secs to get 3
Smart one think 0.10073901635917995 secs to get 34
Stupid one think 0.038835009996781446 secs to get 5
Smart one think 0.06387797324568914 secs to get 55
Stupid one think 0.16375259650237514 secs to get 8
Stupid one think 0.03817679401995511 secs to get 13
Stupid one think 0.15001471992463522 secs to get 21
Stupid one think 0.07427191480649885 secs to get 34
Stupid one think 0.05027838783129171 secs to get 55
All fib finished.
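Because @asyncio.coroutine no longer exists in Python 3.11+, the same race has to be written with async def and await there. A sketch of an equivalent program, assuming Python 3.7+ for asyncio.run(); merging smart_fib and stupid_fib into one fib(name, n, max_sleep) is our own refactoring, and asyncio.gather is used instead of asyncio.wait so the results come back in order:

```python
import asyncio
import random

async def fib(name, n, max_sleep):
    # Compute n Fibonacci numbers, sleeping a random time before each one
    a, b = 0, 1
    got = []
    for _ in range(n):
        sleep_secs = random.uniform(0, max_sleep)
        await asyncio.sleep(sleep_secs)  # suspension point: the other task runs meanwhile
        print('{} one think {} secs to get {}'.format(name, sleep_secs, b))
        got.append(b)
        a, b = b, a + b
    return got

async def main():
    # gather wraps both coroutines in tasks and waits until both finish
    return await asyncio.gather(fib('Smart', 10, 0.2), fib('Stupid', 10, 0.4))

smart, stupid = asyncio.run(main())
print('All fib finished.')
```

The interleaving of the printed lines varies from run to run, just as in the output above; only the computed sequences are fixed.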

Python black magic --- asynchronous IO (asyncio) coroutines

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task ret: ', task.result())

print('TIME: ', now() - start)

Output:

Waiting:  1
Waiting:  2
Waiting:  4
Task ret:  Done after 1s
Task ret:  Done after 2s
Task ret:  Done after 4s
TIME:  4.005348205566406

Python black magic --- nested coroutines

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)


start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

async def do_some_deep_work(x):
    print('Waiting: ', x)

    await do_some_work(1)
    return 'Done after {}s'.format(x)
coroutine4 = do_some_deep_work(1)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
    asyncio.ensure_future(coroutine4)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task ret: ', task.result())

print('TIME: ', now() - start)

Output:

Waiting:  1
Waiting:  2
Waiting:  4
Waiting:  1
Waiting:  1
Task ret:  Done after 1s
Task ret:  Done after 2s
Task ret:  Done after 4s
Task ret:  Done after 1s
TIME:  4.000974893569946
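Both programs above create tasks with asyncio.ensure_future() before any loop is running and then call asyncio.get_event_loop(); recent Python versions deprecate calling get_event_loop() when no event loop is running or set. A sketch of the same two examples for Python 3.7+, assuming a main() coroutine of our own that uses asyncio.create_task() under asyncio.run():

```python
import asyncio
import time

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def do_some_deep_work(x):
    # Nested coroutine: awaiting do_some_work runs it to completion inside this one
    print('Waiting: ', x)
    await do_some_work(1)
    return 'Done after {}s'.format(x)

async def main():
    # create_task needs a running event loop, which asyncio.run provides
    tasks = [asyncio.create_task(do_some_work(x)) for x in (1, 2, 4)]
    tasks.append(asyncio.create_task(do_some_deep_work(1)))
    return await asyncio.gather(*tasks)  # results in the same order as tasks

start = time.time()
results = asyncio.run(main())
for ret in results:
    print('Task ret: ', ret)
print('TIME: ', time.time() - start)  # about 4 s: the sleeps overlap, so the longest dominates
```

asyncio.run() creates the loop, runs main(), and closes the loop afterwards, so no explicit loop handling is needed.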

The correct implementation

  • When to use and when not to use Python 3.5 `await`? (https://stackoverflow.com/questions/33357233/when-to-use-and-when-not-to-use-python-3-5-await/33399896#33399896)
  • python-asyncio TypeError: object dict can't be used in 'await' expression (https://stackoverflow.com/questions/49822552/python-asyncio-typeerror-object-dict-cant-be-used-in-await-expression)
  • Python Async/Await introductory guide (https://zhuanlan.zhihu.com/p/27258289)
  • demo

import asyncio
from demo.dm_demo import Predict
from collections import defaultdict
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor
import time
import numpy as np

def get_predict():
    predict = Predict("1.1")
    predict.demo_common()
    print("="*100)
    print("version", "1.1")
    return predict

predict = get_predict()
executor = ThreadPoolExecutor(4)
loop = asyncio.get_event_loop()
now = lambda : time.time()

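async def sent(sents):
    # Caution: predict is shared by all worker threads; if demo_common reads
    # self.context, concurrent calls can overwrite each other's input
    predict.context = sents
    # run the blocking prediction in the thread pool so the event loop stays free
    res = await loop.run_in_executor(executor, predict.demo_common)
    return res[0]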

async def process(line):
    try:
        label_zh, label_index, sents, feas = line.split("\t")
        sents = sents.split("<sssss>")
        label_predict = await sent(sents)
        return label_predict
    except Exception as e:
        print("error", e)
        return None

async def p(line):
    res = await process(line)
    return res


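def eval_predict(name):
    res = []
    t = []

    def run_batch(batch):
        # run one batch of tasks to completion and record how long it took
        start = now()
        loop.run_until_complete(asyncio.wait(batch))
        for task in batch:
            print(task.result())
            res.append(task.result())
        t.append(now() - start)
        print("mean", np.mean(t))

    with open(name) as ff:
        # note: sent() uses the module-level predict, not this local one
        predict = get_predict()
        tasks = []
        for line in ff:
            tasks.append(asyncio.ensure_future(p(line)))
            if len(tasks) == 4:
                run_batch(tasks)
                tasks = []
        if tasks:  # the last batch may hold fewer than 4 lines
            run_batch(tasks)
    return res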

if __name__ == '__main__':
    eval_predict("offline_evaluate/different_model_compare/res.txt")
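The demo depends on the author's private demo.dm_demo.Predict class and data file, so it cannot be run as shown. The sketch below keeps its shape (a pool of 4 threads, batches of 4 lines, sentences separated by <sssss>) under stated assumptions: fake_predict is a hypothetical blocking function standing in for predict.demo_common, and each call receives its input as an argument instead of through a shared attribute. It needs Python 3.7+.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def fake_predict(sents):
    """Hypothetical stand-in for predict.demo_common: a blocking call."""
    time.sleep(0.1)  # simulate model inference time
    return ['label-for-' + sents[0]]

async def predict_line(loop, executor, line):
    sents = line.split("<sssss>")
    # Pass the input as an argument rather than via shared state such as predict.context,
    # so concurrent calls cannot overwrite each other's input
    res = await loop.run_in_executor(executor, fake_predict, sents)
    return res[0]

async def eval_lines(lines, batch_size=4):
    loop = asyncio.get_running_loop()
    results = []
    with ThreadPoolExecutor(batch_size) as executor:
        for i in range(0, len(lines), batch_size):
            batch = lines[i:i + batch_size]
            # the blocking calls of one batch run on worker threads at the same time
            results += await asyncio.gather(*(predict_line(loop, executor, ln) for ln in batch))
    return results

labels = asyncio.run(eval_lines(['a<sssss>x', 'b<sssss>y', 'c', 'd', 'e']))
print(labels)  # ['label-for-a', 'label-for-b', 'label-for-c', 'label-for-d', 'label-for-e']
```

Slicing by batch_size also covers a final batch with fewer than 4 lines, and asyncio.gather keeps the labels in input order.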