铺垫
在大量的实践中,似乎我们总是通过类似的方式来使用异步编程:
- 监听事件
- 事件发生执行对应的回调函数
- 回调完成(可能产生新的事件添加进监听队列)
- 回到1,监听事件
因此我们将这样的异步模式称为Reactor模式,例如在iOS开发中的Run Loop概念,实际上非常类似于Reactor loop,主线程的Run Loop监听屏幕UI事件,一旦发生UI事件则执行对应的事件处理代码,还可以通过GCD等方式产生事件至主线程执行。
上图是boost对Reactor模式的描绘,Twisted的设计就是基于这样的Reactor模式,Twisted程序就是在等待事件、处理事件的过程中不断循环。
from twisted.internet import reactor reactor.run()
reactor是Twisted程序中的单例对象。
reactor
reactor是事件管理器,用于注册、注销事件,运行事件循环,当事件发生时调用回调函数处理。关于reactor有下面几个结论:
- Twisted的reactor只有通过调用reactor.run()来启动。
- reactor循环是在其开始的进程中运行,也就是运行在主进程中。
- 一旦启动,就会一直运行下去。reactor就会在程序的控制下(或者具体在一个启动它的线程的控制下)。
- reactor循环并不会消耗任何CPU的资源。
- 并不需要显式的创建reactor,只需要引入就OK了。
最后一条需要解释清楚。在Twisted中,reactor是Singleton(也就是单例模式),即在一个程序中只能有一个reactor,并且只要你引入它就相应地创建一个。上面引入的方式这是twisted默认使用的方法,当然了,twisted还有其它可以引入reactor的方法。例如,可以使用twisted.internet.pollreactor中的系统调用来poll来代替select方法。
若使用其它的reactor,需要在引入twisted.internet.reactor前安装它。下面是安装pollreactor的方法:
from twisted.internet import pollreactor pollreactor.install()
如果你没有安装其它特殊的reactor而引入了twisted.internet.reactor,那么Twisted会根据操作系统安装默认的reactor。正因为如此,习惯性做法不要在最顶层的模块内引入reactor以避免安装默认reactor,而是在你要使用reactor的区域内安装。
下面是使用 pollreactor重写上上面的程序:
from twited.internet import pollreactor pollreactor.install() from twisted.internet import reactor reactor.run()
那么reactor是如何实现单例的?来看一下from twisted.internet import reactor做了哪些事情就并明白了。
下面是twisted/internet/reactor.py的部分代码:
# twisted/internet/reactor.py import sys del sys.modules['twisted.internet.reactor'] from twisted.internet import default default.install()
注:Python中所有加载到内存的模块都放在sys.modules,它是一个全局字典。当import一个模块时首先会在这个列表中查找是否已经加载了此模块,如果加载了则只是将模块的名字加入到正在调用import的模块的命名空间中。如果没有加载则从sys.path目录中按照模块名称查找模块文件,找到后将模块载入内存,并加入到sys.modules中,并将名称导入到当前的命名空间中。
假如我们是第一次运行from twisted.internet import reactor,因为sys.modules中还没有twisted.internet.reactor,所以会运行reactory.py中的代码,安装默认的reactor。之后,如果导入的话,因为sys.modules中已存在该模块,所以会直接将sys.modules中的twisted.internet.reactor导入到当前命名空间。
default中的install:
# twisted/internet/default.py def _getInstallFunction(platform): """ Return a function to install the reactor most suited for the given platform. @param platform: The platform for which to select a reactor. @type platform: L{twisted.python.runtime.Platform} @return: A zero-argument callable which will install the selected reactor. """ try: if platform.isLinux(): try: from twisted.internet.epollreactor import install except ImportError: from twisted.internet.pollreactor import install elif platform.getType() == 'posix' and not platform.isMacOSX(): from twisted.internet.pollreactor import install else: from twisted.internet.selectreactor import install except ImportError: from twisted.internet.selectreactor import install return install install = _getInstallFunction(platform)
很明显,default中会根据平台获取相应的install。Linux下会首先使用epollreactor,如果内核还不支持,就只能使用pollreactor。Mac平台使用pollreactor,windows使用selectreactor。每种install的实现差不多,这里我们抽取selectreactor中的install来看看。
# twisted/internet/selectreactor.py: def install(): """Configure the twisted mainloop to be run using the select() reactor. """ # 单例 reactor = SelectReactor() from twisted.internet.main import installReactor installReactor(reactor) # twisted/internet/main.py: def installReactor(reactor): """ Install reactor C{reactor}. @param reactor: An object that provides one or more IReactor* interfaces. """ # this stuff should be common to all reactors. import twisted.internet import sys if 'twisted.internet.reactor' in sys.modules: raise error.ReactorAlreadyInstalledError("reactor already installed") twisted.internet.reactor = reactor sys.modules['twisted.internet.reactor'] = reactor
在installReactor中,向sys.modules添加twisted.internet.reactor键,值就是再install中创建的单例reactor。以后要使用reactor,就会导入这个单例了。
SelectReactor # twisted/internet/selectreactor.py @implementer(IReactorFDSet) class SelectReactor(posixbase.PosixReactorBase, _extraBase)
IReactorFDSet接口主要对描述符的获取、添加、删除等操作的方法。这些方法看名字就能知道意思,所以我就没有加注释。
# twisted/internet/interfaces.py class IReactorFDSet(Interface): def addReader(reader): def addWriter(writer): def removeReader(reader): def removeWriter(writer): def removeAll(): def getReaders(): def getWriters(): reactor.listenTCP()
示例中的reactor.listenTCP()注册了一个监听事件,它是父类PosixReactorBase中方法。
# twisted/internet/posixbase.py @implementer(IReactorTCP, IReactorUDP, IReactorMulticast) class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin, ReactorBase): def listenTCP(self, port, factory, backlog=50, interface=''): p = tcp.Port(port, factory, backlog, interface, self) p.startListening() return p # twisted/internet/tcp.py @implementer(interfaces.IListeningPort) class Port(base.BasePort, _SocketCloser): def __init__(self, port, factory, backlog=50, interface='', reactor=None): """Initialize with a numeric port to listen on. """ base.BasePort.__init__(self, reactor=reactor) self.port = port self.factory = factory self.backlog = backlog if abstract.isIPv6Address(interface): self.addressFamily = socket.AF_INET6 self._addressType = address.IPv6Address self.interface = interface ... def startListening(self): """Create and bind my socket, and begin listening on it. 创建并绑定套接字,开始监听。 This is called on unserialization, and must be called after creating a server to begin listening on the specified port. """ if self._preexistingSocket is None: # Create a new socket and make it listen try: # 创建套接字 skt = self.createInternetSocket() if self.addressFamily == socket.AF_INET6: addr = _resolveIPv6(self.interface, self.port) else: addr = (self.interface, self.port) # 绑定 skt.bind(addr) except socket.error as le: raise CannotListenError(self.interface, self.port, le) # 监听 skt.listen(self.backlog) else: # Re-use the externally specified socket skt = self._preexistingSocket self._preexistingSocket = None # Avoid shutting it down at the end. self._shouldShutdown = False # Make sure that if we listened on port 0, we update that to # reflect what the OS actually assigned us. self._realPortNumber = skt.getsockname()[1] log.msg("%s starting on %s" % ( self._getLogPrefix(self.factory), self._realPortNumber)) # The order of the next 5 lines is kind of bizarre. If no one # can explain it, perhaps we should re-arrange them. self.factory.doStart() self.connected = True self.socket = skt self.fileno = self.socket.fileno self.numberAccepts = 100 # startReading调用reactor的addReader方法将Port加入读集合 self.startReading()
整个逻辑很简单,和正常的server端一样,创建套接字、绑定、监听。不同的是将套接字的描述符添加到了reactor的读集合。那么假如有了client连接过来的话,reactor会监控到,然后触发事件处理程序。
reacotr.run()事件主循环
# twisted/internet/posixbase.py @implementer(IReactorTCP, IReactorUDP, IReactorMulticast) class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin, ReactorBase) # twisted/internet/base.py class _SignalReactorMixin(object): def startRunning(self, installSignalHandlers=True): """ PosixReactorBase的父类_SignalReactorMixin和ReactorBase都有该函数,但是 _SignalReactorMixin在前,安装mro顺序的话,会先调用_SignalReactorMixin中的。 """ self._installSignalHandlers = installSignalHandlers ReactorBase.startRunning(self) def run(self, installSignalHandlers=True): self.startRunning(installSignalHandlers=installSignalHandlers) self.mainLoop() def mainLoop(self): while self._started: try: while self._started: # Advance simulation time in delayed event # processors. self.runUntilCurrent() t2 = self.timeout() t = self.running and t2 # doIteration是关键,select,poll,epool实现各有不同 self.doIteration(t) except: log.msg("Unexpected error in main loop.") log.err() else: log.msg('Main loop terminated.')
mianLoop就是最终的主循环了,在循环中,调用doIteration方法监控读写描述符的集合,一旦发现有描述符准备好读写,就会调用相应的事件处理程序。
# twisted/internet/selectreactor.py @implementer(IReactorFDSet) class SelectReactor(posixbase.PosixReactorBase, _extraBase): def __init__(self): """ Initialize file descriptor tracking dictionaries and the base class. """ self._reads = set() self._writes = set() posixbase.PosixReactorBase.__init__(self) def doSelect(self, timeout): """ Run one iteration of the I/O monitor loop. This will run all selectables who had input or output readiness waiting for them. """ try: # 调用select方法监控读写集合,返回准备好读写的描述符 r, w, ignored = _select(self._reads, self._writes, [], timeout) except ValueError: # Possibly a file descriptor has gone negative"doRead", self._reads), (w,"doWrite", self._writes)): for selectable in selectables: # if this was disconnected in another thread, kill it. # ^^^^ --- what the !@#*"doRead")
那么假如客户端有连接请求了,就会调用读集合中tcp.Port的doRead方法。
# twisted/internet/tcp.py @implementer(interfaces.IListeningPort) class Port(base.BasePort, _SocketCloser): def doRead(self): """Called when my socket is ready for reading. 当套接字准备好读的时候调用 This accepts a connection and calls self.protocol() to handle the wire-level protocol. """ try: if platformType == "posix": numAccepts = self.numberAccepts else: numAccepts = 1 for i in range(numAccepts): if self.disconnecting: return try: # 调用accept skt, addr = self.socket.accept() except socket.error as e: if e.args[0] in (EWOULDBLOCK, EAGAIN): self.numberAccepts = i break elif e.args[0] == EPERM: continue elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED): log.msg("Could not accept new connection (%s)" % ( errorcode[e.args[0]],)) break raise fdesc._setCloseOnExec(skt.fileno()) protocol = self.factory.buildProtocol(self._buildAddr(addr)) if protocol is None: skt.close() continue s = self.sessionno self.sessionno = s+1 # transport初始化的过程中,会将自身假如到reactor的读集合中,那么当它准备 # 好读的时候,就可以调用它的doRead方法读取客户端发过来的数据了 transport = self.transport(skt, protocol, addr, self, s, self.reactor) protocol.makeConnection(transport) else: self.numberAccepts = self.numberAccepts+20 except: log.deferr()
doRead方法中,调用accept产生了用于接收客户端数据的套接字,将套接字与transport绑定,然后把transport加入到reactor的读集合。当客户端有数据到来时,就会调用transport的doRead方法进行数据读取了。
Connection是Server(transport实例的类)的父类,它实现了doRead方法。
# twisted/internet/tcp.py @implementer(interfaces.ITCPTransport, interfaces.ISystemHandle) class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser, _AbortingMixin): def doRead(self): try: # 接收数据 data = self.socket.recv(self.bufferSize) except socket.error as se: if se.args[0] == EWOULDBLOCK: return else: return main.CONNECTION_LOST return self._dataReceived(data) def _dataReceived(self, data): if not data: return main.CONNECTION_DONE # 调用我们自定义protocol的dataReceived方法处理数据 rval = self.protocol.dataReceived(data) if rval is not None: offender = self.protocol.dataReceived warningFormat = ( 'Returning a value other than None from %(fqpn)s is ' 'deprecated since %(version)s.') warningString = deprecate.getDeprecationWarningString( offender, versions.Version('Twisted', 11, 0, 0), format=warningFormat) deprecate.warnAboutFunction(offender, warningString) return rval
_dataReceived中调用了示例中我们自定义的EchoProtocol的dataReceived方法处理数据。
至此,一个简单的流程,从创建监听事件,到接收客户端数据就此结束了。
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新日志
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]