关于并发、并行、同步阻塞、异步非阻塞、线程、进程、协程等这些概念,单纯通过文字恐怕很难有比较深刻的理解,本文就通过代码一步步实现这些并发和异步编程,并进行比较。解释器方面本文选择python3,毕竟python3才是python的未来,并且python3用原生的库实现协程已经非常方便了。
1、准备阶段
下面为所有测试代码所需要的包
#! python3 # coding:utf-8 import socket from concurrent import futures from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ import asyncio import aiohttp import time from time import ctime
在进行不同实现方式的比较时,实现场景就是在进行爬虫开发的时候通过向对方网站发起一系列的http请求访问,统计耗时来判断实现方式的优劣,具体地,通过建立通信套接字,访问新浪主页,返回源码,作为一次请求。先实现一个装饰器用来统计函数的执行时间:
def tsfunc(func): def wrappedFunc(*args,**kargs): start = time.clock() action = func(*args,**kargs) time_delta = time.clock() - start print ('[{0}] {1}() called, time delta: {2}'.format(ctime(),func.__name__,time_delta)) return action return wrappedFunc
输出的格式为:当前时间,调用的函数,函数的执行时间。
2、阻塞/非阻塞和同步/异步
这两对概念不是很好区分,从定义上理解:
阻塞:在进行socket通信过程中,一个线程发起请求,如果当前请求没有返回结果,则进入sleep状态,期间线程挂起不能做其他操作,直到有返回结果,或者超时(如果设置超时的话)。
非阻塞:与阻塞相似,只不过在等待请求结果时,线程并不挂起而是进行其他操作,即在不能立刻得到结果之前,该函数不会阻挂起当前线程,而会立刻返回。
同步:同步和阻塞比较相似,但是二者并不是同一个概念,同步是指完成事件的逻辑,是指一件事完成之后,再完成第二件事,以此类推…
异步:异步和非阻塞比较类似,异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者,实现异步的方式通俗讲就是“等会再告诉你”。
1)阻塞方式
回到代码上,首先实现阻塞方式的请求函数:
def blocking_way(): sock = socket.socket() sock.connect(('www.sina.com',80)) request = 'GET / HTTP/1.0\r\nHOST:www.sina.com\r\n\r\n' sock.send(request.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) return response
测试线程、多进程和多线程
# 阻塞无并发 @tsfunc def sync_way(): res = [] for i in range(10): res.append(blocking_way()) return len(res) @tsfunc # 阻塞、多进程 def process_way(): worker = 10 with futures.ProcessPoolExecutor(worker) as executor: futs = {executor.submit(blocking_way) for i in range(10)} return len([fut.result() for fut in futs]) # 阻塞、多线程 @tsfunc def thread_way(): worker = 10 with futures.ThreadPoolExecutor(worker) as executor: futs = {executor.submit(blocking_way) for i in range(10)} return len([fut.result() for fut in futs])
运行结果:
[Wed Dec 13 16:52:25 2017] sync_way() called, time delta: 0.06371647809425328 [Wed Dec 13 16:52:28 2017] process_way() called, time delta: 2.31437644946734 [Wed Dec 13 16:52:28 2017] thread_way() called, time delta: 0.010172946070299727
可见与非并发的方式相比,启动10个进程完成10次请求访问耗费的时间最长,进程确实需要很大的系统开销,相比多线程则效果好得多,启动10个线程并发请求,比顺序请求速度快了6倍左右。
2)非阻塞方式
实现非阻塞的请求代码,与阻塞方式的区别在于等待请求时并不挂起而是直接返回,为了确保能正确读取消息,最原始的方式就是循环读取,知道读取完成为跳出循环,代码如下:
def nonblocking_way(): sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) sock.setblocking(False) try: sock.connect(('www.sina.com', 80)) except BlockingIOError: pass request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n' data = request.encode('ascii') while True: try: sock.send(data) break except OSError: pass response = b'' while True: try: chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) break except OSError: pass return response
测试单线程异步非阻塞方式:
@tsfunc def async_way(): res = [] for i in range(10): res.append(nonblocking_way()) return len(res)
测试结果与单线程同步阻塞方式相比:
[Wed Dec 13 17:18:30 2017] sync_way() called, time delta: 0.07342884475822574 [Wed Dec 13 17:18:30 2017] async_way() called, time delta: 0.06509009095694886
非阻塞方式起到了一定的效果,但是并不明显,原因肯定是读取消息的时候虽然不是在线程挂起的时候而是在循环读取消息的时候浪费了时间,如果大部分时间读浪费了并没有发挥异步编程的威力,解决的办法就是后面要说的【事件驱动】
3、回调、生成器和协程
a、回调
class Crawler(): def __init__(self,url): self.url = url self.sock = None self.response = b'' def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('www.sina.com',80)) except BlockingIOError: pass selector.register(self.sock.fileno(),EVENT_WRITE,self.connected) def connected(self,key,mask): selector.unregister(key.fd) get = 'GET {0} HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url) self.sock.send(get.encode('ascii')) selector.register(key.fd,EVENT_READ,self.read_response) def read_response(self,key,mask): global stopped while True: try: chunk = self.sock.recv(4096) if chunk: self.response += chunk chunk = self.sock.recv(4096) else: selector.unregister(key.fd) urls_todo.remove(self.url) if not urls_todo: stopped = True break except: pass def loop(): while not stopped: events = selector.select() for event_key,event_mask in events: callback = event_key.data callback(event_key,event_mask) @tsfunc def callback_way(): for url in urls_todo: crawler = Crawler(url) crawler.fetch() loop1()
这是通过传统回调方式实现的异步编程,结果如下:
[Tue Mar 27 17:52:49 2018] callback_way() called, time delta: 0.054735804048789374
b、生成器
class Crawler2: def __init__(self, url): self.url = url self.response = b'' def fetch(self): global stopped sock = socket.socket() yield from connect(sock, ('www.sina.com', 80)) get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) urls_todo.remove(self.url) if not urls_todo: stopped = True class Task: def __init__(self, coro): self.coro = coro f = Future1() f.set_result(None) self.step(f) def step(self, future): try: # send会进入到coro执行, 即fetch, 直到下次yield # next_future 为yield返回的对象 next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) def loop1(): while not stopped: events = selector.select() for event_key,event_mask in events: callback = event_key.data callback()
运行结果如下:
[Tue Mar 27 17:54:27 2018] generate_way() called, time delta: 0.2914336347673473
c、协程
def nonblocking_way(): sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) sock.setblocking(False) try: sock.connect(('www.sina.com', 80)) except BlockingIOError: pass request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n' data = request.encode('ascii') while True: try: sock.send(data) break except OSError: pass response = b'' while True: try: chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) break except OSError: pass return response @tsfunc def asyncio_way(): tasks = [fetch(host+url) for url in urls_todo] loop.run_until_complete(asyncio.gather(*tasks)) return (len(tasks))
运行结果:
[Tue Mar 27 17:56:17 2018] asyncio_way() called, time delta: 0.43688060698484166
到此终于把并发和异步编程实例代码测试完,下边贴出全部代码,共读者自行测试,在任务量加大时,相信结果会大不一样。
#! python3 # coding:utf-8 import socket from concurrent import futures from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ import asyncio import aiohttp import time from time import ctime def tsfunc(func): def wrappedFunc(*args,**kargs): start = time.clock() action = func(*args,**kargs) time_delta = time.clock() - start print ('[{0}] {1}() called, time delta: {2}'.format(ctime(),func.__name__,time_delta)) return action return wrappedFunc def blocking_way(): sock = socket.socket() sock.connect(('www.sina.com',80)) request = 'GET / HTTP/1.0\r\nHOST:www.sina.com\r\n\r\n' sock.send(request.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) return response def nonblocking_way(): sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) sock.setblocking(False) try: sock.connect(('www.sina.com', 80)) except BlockingIOError: pass request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n' data = request.encode('ascii') while True: try: sock.send(data) break except OSError: pass response = b'' while True: try: chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) break except OSError: pass return response selector = DefaultSelector() stopped = False urls_todo = ['/','/1','/2','/3','/4','/5','/6','/7','/8','/9'] class Crawler(): def __init__(self,url): self.url = url self.sock = None self.response = b'' def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('www.sina.com',80)) except BlockingIOError: pass selector.register(self.sock.fileno(),EVENT_WRITE,self.connected) def connected(self,key,mask): selector.unregister(key.fd) get = 'GET {0} HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url) self.sock.send(get.encode('ascii')) selector.register(key.fd,EVENT_READ,self.read_response) def read_response(self,key,mask): global stopped while True: try: chunk = self.sock.recv(4096) if chunk: self.response += chunk chunk = self.sock.recv(4096) else: selector.unregister(key.fd) urls_todo.remove(self.url) if not urls_todo: stopped = True break except: pass def loop(): while not stopped: events = selector.select() for event_key,event_mask in events: callback = event_key.data callback(event_key,event_mask) # 基于生成器的协程 class Future: def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self,fn): self._callbacks.append(fn) def set_result(self,result): self.result = result for fn in self._callbacks: fn(self) class Crawler1(): def __init__(self,url): self.url = url self.response = b'' def fetch(self): sock = socket.socket() sock.setblocking(False) try: sock.connect(('www.sina.com',80)) except BlockingIOError: pass f = Future() def on_connected(): f.set_result(None) selector.register(sock.fileno(),EVENT_WRITE,on_connected) yield f selector.unregister(sock.fileno()) get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) global stopped while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(),EVENT_READ,on_readable) chunk = yield f selector.unregister(sock.fileno()) if chunk: self.response += chunk else: urls_todo.remove(self.url) if not urls_todo: stopped = True break # yield from 改进的生成器协程 class Future1: def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self,fn): self._callbacks.append(fn) def set_result(self,result): self.result = result for fn in self._callbacks: fn(self) def __iter__(self): yield self return self.result def connect(sock, address): f = Future1() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno()) def read(sock): f = Future1() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield from f selector.unregister(sock.fileno()) return chunk def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response) class Crawler2: def __init__(self, url): self.url = url self.response = b'' def fetch(self): global stopped sock = socket.socket() yield from connect(sock, ('www.sina.com', 80)) get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) urls_todo.remove(self.url) if not urls_todo: stopped = True class Task: def __init__(self, coro): self.coro = coro f = Future1() f.set_result(None) self.step(f) def step(self, future): try: # send会进入到coro执行, 即fetch, 直到下次yield # next_future 为yield返回的对象 next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) def loop1(): while not stopped: events = selector.select() for event_key,event_mask in events: callback = event_key.data callback() # asyncio 协程 host = 'http://www.sina.com' loop = asyncio.get_event_loop() async def fetch(url): async with aiohttp.ClientSession(loop=loop) as session: async with session.get(url) as response: response = await response.read() return response @tsfunc def asyncio_way(): tasks = [fetch(host+url) for url in urls_todo] loop.run_until_complete(asyncio.gather(*tasks)) return (len(tasks)) @tsfunc def sync_way(): res = [] for i in range(10): res.append(blocking_way()) return len(res) @tsfunc def process_way(): worker = 10 with futures.ProcessPoolExecutor(worker) as executor: futs = {executor.submit(blocking_way) for i in range(10)} return len([fut.result() for fut in futs]) @tsfunc def thread_way(): worker = 10 with futures.ThreadPoolExecutor(worker) as executor: futs = {executor.submit(blocking_way) for i in range(10)} return len([fut.result() for fut in futs]) @tsfunc def async_way(): res = [] for i in range(10): res.append(nonblocking_way()) return len(res) @tsfunc def callback_way(): for url in urls_todo: crawler = Crawler(url) crawler.fetch() loop1() @tsfunc def generate_way(): for url in urls_todo: crawler = Crawler2(url) Task(crawler.fetch()) loop1() if __name__ == '__main__': #sync_way() #process_way() #thread_way() #async_way() #callback_way() #generate_way() asyncio_way()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
python,并发编程,异步编程
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新日志
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]