详解Python的Twisted框架中reactor事件管理器的用法

发表于 5年以前  | 总阅读数:1022 次

铺垫
在大量的实践中,似乎我们总是通过类似的方式来使用异步编程:

  • 监听事件
  • 事件发生执行对应的回调函数
  • 回调完成(可能产生新的事件添加进监听队列)
  • 回到1,监听事件

因此我们将这样的异步模式称为Reactor模式,例如在iOS开发中的Run Loop概念,实际上非常类似于Reactor loop,主线程的Run Loop监听屏幕UI事件,一旦发生UI事件则执行对应的事件处理代码,还可以通过GCD等方式产生事件至主线程执行。

2016525114543434.png \(524×364\)

上图是boost对Reactor模式的描绘,Twisted的设计就是基于这样的Reactor模式,Twisted程序就是在等待事件、处理事件的过程中不断循环。


    from twisted.internet import reactor
    reactor.run()

reactor是Twisted程序中的单例对象。

reactor
reactor是事件管理器,用于注册、注销事件,运行事件循环,当事件发生时调用回调函数处理。关于reactor有下面几个结论:

  • Twisted的reactor只有通过调用reactor.run()来启动。
  • reactor循环是在其开始的进程中运行,也就是运行在主进程中。
  • 一旦启动,就会一直运行下去。reactor就会在程序的控制下(或者具体在一个启动它的线程的控制下)。
  • reactor循环并不会消耗任何CPU的资源。
  • 并不需要显式的创建reactor,只需要引入就OK了。

最后一条需要解释清楚。在Twisted中,reactor是Singleton(也就是单例模式),即在一个程序中只能有一个reactor,并且只要你引入它就相应地创建一个。上面引入的方式这是twisted默认使用的方法,当然了,twisted还有其它可以引入reactor的方法。例如,可以使用twisted.internet.pollreactor中的系统调用来poll来代替select方法。

若使用其它的reactor,需要在引入twisted.internet.reactor前安装它。下面是安装pollreactor的方法:


    from twisted.internet import pollreactor
    pollreactor.install()

如果你没有安装其它特殊的reactor而引入了twisted.internet.reactor,那么Twisted会根据操作系统安装默认的reactor。正因为如此,习惯性做法不要在最顶层的模块内引入reactor以避免安装默认reactor,而是在你要使用reactor的区域内安装。
下面是使用 pollreactor重写上上面的程序:


    from twited.internet import pollreactor
    pollreactor.install()
    from twisted.internet import reactor
    reactor.run()

那么reactor是如何实现单例的?来看一下from twisted.internet import reactor做了哪些事情就并明白了。

下面是twisted/internet/reactor.py的部分代码:


    # twisted/internet/reactor.py
    import sys
    del sys.modules['twisted.internet.reactor']
    from twisted.internet import default
    default.install()

注:Python中所有加载到内存的模块都放在sys.modules,它是一个全局字典。当import一个模块时首先会在这个列表中查找是否已经加载了此模块,如果加载了则只是将模块的名字加入到正在调用import的模块的命名空间中。如果没有加载则从sys.path目录中按照模块名称查找模块文件,找到后将模块载入内存,并加入到sys.modules中,并将名称导入到当前的命名空间中。

假如我们是第一次运行from twisted.internet import reactor,因为sys.modules中还没有twisted.internet.reactor,所以会运行reactory.py中的代码,安装默认的reactor。之后,如果导入的话,因为sys.modules中已存在该模块,所以会直接将sys.modules中的twisted.internet.reactor导入到当前命名空间。

default中的install:


    # twisted/internet/default.py
    def _getInstallFunction(platform):
      """
      Return a function to install the reactor most suited for the given platform.

      @param platform: The platform for which to select a reactor.
      @type platform: L{twisted.python.runtime.Platform}

      @return: A zero-argument callable which will install the selected
        reactor.
      """
      try:
        if platform.isLinux():
          try:
            from twisted.internet.epollreactor import install
          except ImportError:
            from twisted.internet.pollreactor import install
        elif platform.getType() == 'posix' and not platform.isMacOSX():
          from twisted.internet.pollreactor import install
        else:
          from twisted.internet.selectreactor import install
      except ImportError:
        from twisted.internet.selectreactor import install
      return install


    install = _getInstallFunction(platform)

很明显,default中会根据平台获取相应的install。Linux下会首先使用epollreactor,如果内核还不支持,就只能使用pollreactor。Mac平台使用pollreactor,windows使用selectreactor。每种install的实现差不多,这里我们抽取selectreactor中的install来看看。


    # twisted/internet/selectreactor.py:
    def install():
      """Configure the twisted mainloop to be run using the select() reactor.
      """
      # 单例
      reactor = SelectReactor()
      from twisted.internet.main import installReactor
      installReactor(reactor)

    # twisted/internet/main.py:
    def installReactor(reactor):
      """
      Install reactor C{reactor}.

      @param reactor: An object that provides one or more IReactor* interfaces.
      """
      # this stuff should be common to all reactors.
      import twisted.internet
      import sys
      if 'twisted.internet.reactor' in sys.modules:
        raise error.ReactorAlreadyInstalledError("reactor already installed")
      twisted.internet.reactor = reactor
      sys.modules['twisted.internet.reactor'] = reactor

在installReactor中,向sys.modules添加twisted.internet.reactor键,值就是再install中创建的单例reactor。以后要使用reactor,就会导入这个单例了。


    SelectReactor
    # twisted/internet/selectreactor.py
    @implementer(IReactorFDSet)
    class SelectReactor(posixbase.PosixReactorBase, _extraBase)

implementer表示SelectReactor实现了IReactorFDSet接口的方法,这里用到了zope.interface,它是python中的接口实现,有兴趣的同学可以去看下。

IReactorFDSet接口主要对描述符的获取、添加、删除等操作的方法。这些方法看名字就能知道意思,所以我就没有加注释。


    # twisted/internet/interfaces.py
    class IReactorFDSet(Interface):

      def addReader(reader):

      def addWriter(writer):

      def removeReader(reader):

      def removeWriter(writer):

      def removeAll():

      def getReaders():

      def getWriters():
    reactor.listenTCP()

示例中的reactor.listenTCP()注册了一个监听事件,它是父类PosixReactorBase中方法。


    # twisted/internet/posixbase.py
    @implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
    class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
                ReactorBase):

      def listenTCP(self, port, factory, backlog=50, interface=''):
        p = tcp.Port(port, factory, backlog, interface, self)
        p.startListening()
        return p

    # twisted/internet/tcp.py
    @implementer(interfaces.IListeningPort)
    class Port(base.BasePort, _SocketCloser):
      def __init__(self, port, factory, backlog=50, interface='', reactor=None):
        """Initialize with a numeric port to listen on.
        """
        base.BasePort.__init__(self, reactor=reactor)
        self.port = port
        self.factory = factory
        self.backlog = backlog
        if abstract.isIPv6Address(interface):
          self.addressFamily = socket.AF_INET6
          self._addressType = address.IPv6Address
        self.interface = interface
      ...

      def startListening(self):
        """Create and bind my socket, and begin listening on it.
         创建并绑定套接字,开始监听。

        This is called on unserialization, and must be called after creating a
        server to begin listening on the specified port.
        """
        if self._preexistingSocket is None:
          # Create a new socket and make it listen
          try:
            # 创建套接字
            skt = self.createInternetSocket()
            if self.addressFamily == socket.AF_INET6:
              addr = _resolveIPv6(self.interface, self.port)
            else:
              addr = (self.interface, self.port)
            # 绑定
            skt.bind(addr)
          except socket.error as le:
            raise CannotListenError(self.interface, self.port, le)
          # 监听
          skt.listen(self.backlog)
        else:
          # Re-use the externally specified socket
          skt = self._preexistingSocket
          self._preexistingSocket = None
          # Avoid shutting it down at the end.
          self._shouldShutdown = False

        # Make sure that if we listened on port 0, we update that to
        # reflect what the OS actually assigned us.
        self._realPortNumber = skt.getsockname()[1]

        log.msg("%s starting on %s" % (
            self._getLogPrefix(self.factory), self._realPortNumber))

        # The order of the next 5 lines is kind of bizarre. If no one
        # can explain it, perhaps we should re-arrange them.
        self.factory.doStart()
        self.connected = True
        self.socket = skt
        self.fileno = self.socket.fileno
        self.numberAccepts = 100

        # startReading调用reactor的addReader方法将Port加入读集合
        self.startReading()

整个逻辑很简单,和正常的server端一样,创建套接字、绑定、监听。不同的是将套接字的描述符添加到了reactor的读集合。那么假如有了client连接过来的话,reactor会监控到,然后触发事件处理程序。

reacotr.run()事件主循环


    # twisted/internet/posixbase.py
    @implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
    class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
                ReactorBase)

    # twisted/internet/base.py
    class _SignalReactorMixin(object):

      def startRunning(self, installSignalHandlers=True):
        """
        PosixReactorBase的父类_SignalReactorMixin和ReactorBase都有该函数,但是
        _SignalReactorMixin在前,安装mro顺序的话,会先调用_SignalReactorMixin中的。
        """
        self._installSignalHandlers = installSignalHandlers
        ReactorBase.startRunning(self)

      def run(self, installSignalHandlers=True):
        self.startRunning(installSignalHandlers=installSignalHandlers)
        self.mainLoop()

      def mainLoop(self):
        while self._started:
          try:
            while self._started:
              # Advance simulation time in delayed event
              # processors.
              self.runUntilCurrent()
              t2 = self.timeout()
              t = self.running and t2
              # doIteration是关键,select,poll,epool实现各有不同
              self.doIteration(t)
          except:
            log.msg("Unexpected error in main loop.")
            log.err()
          else:
            log.msg('Main loop terminated.')

mianLoop就是最终的主循环了,在循环中,调用doIteration方法监控读写描述符的集合,一旦发现有描述符准备好读写,就会调用相应的事件处理程序。


    # twisted/internet/selectreactor.py
    @implementer(IReactorFDSet)
    class SelectReactor(posixbase.PosixReactorBase, _extraBase):

      def __init__(self):
        """
        Initialize file descriptor tracking dictionaries and the base class.
        """
        self._reads = set()
        self._writes = set()
        posixbase.PosixReactorBase.__init__(self)

      def doSelect(self, timeout):
        """
        Run one iteration of the I/O monitor loop.

        This will run all selectables who had input or output readiness
        waiting for them.
        """
        try:
          # 调用select方法监控读写集合,返回准备好读写的描述符
          r, w, ignored = _select(self._reads,
                      self._writes,
                      [], timeout)
        except ValueError:
          # Possibly a file descriptor has gone negative?
          self._preenDescriptors()
          return
        except TypeError:
          # Something *totally* invalid (object w/o fileno, non-integral
          # result) was passed
          log.err()
          self._preenDescriptors()
          return
        except (select.error, socket.error, IOError) as se:
          # select(2) encountered an error, perhaps while calling the fileno()
          # method of a socket. (Python 2.6 socket.error is an IOError
          # subclass, but on Python 2.5 and earlier it is not.)
          if se.args[0] in (0, 2):
            # windows does this if it got an empty list
            if (not self._reads) and (not self._writes):
              return
            else:
              raise
          elif se.args[0] == EINTR:
            return
          elif se.args[0] == EBADF:
            self._preenDescriptors()
            return
          else:
            # OK, I really don't know what's going on. Blow up.
            raise

        _drdw = self._doReadOrWrite
        _logrun = log.callWithLogger
        for selectables, method, fdset in ((r, "doRead", self._reads),
                          (w,"doWrite", self._writes)):
          for selectable in selectables:
            # if this was disconnected in another thread, kill it.
            # ^^^^ --- what the !@#*? serious! -exarkun
            if selectable not in fdset:
              continue
            # This for pausing input when we're not ready for more.

            # 调用_doReadOrWrite方法
            _logrun(selectable, _drdw, selectable, method)

      doIteration = doSelect

      def _doReadOrWrite(self, selectable, method):
        try:
          # 调用method,doRead或者是doWrite,
          # 这里的selectable可能是我们监听的tcp.Port
          why = getattr(selectable, method)()
        except:
          why = sys.exc_info()[1]
          log.err()
        if why:
          self._disconnectSelectable(selectable, why, method=="doRead")

那么假如客户端有连接请求了,就会调用读集合中tcp.Port的doRead方法。


    # twisted/internet/tcp.py

    @implementer(interfaces.IListeningPort)
    class Port(base.BasePort, _SocketCloser):

      def doRead(self):
        """Called when my socket is ready for reading.
        当套接字准备好读的时候调用

        This accepts a connection and calls self.protocol() to handle the
        wire-level protocol.
        """
        try:
          if platformType == "posix":
            numAccepts = self.numberAccepts
          else:
            numAccepts = 1
          for i in range(numAccepts):
            if self.disconnecting:
              return
            try:
              # 调用accept
              skt, addr = self.socket.accept()
            except socket.error as e:
              if e.args[0] in (EWOULDBLOCK, EAGAIN):
                self.numberAccepts = i
                break
              elif e.args[0] == EPERM:
                continue
              elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
                log.msg("Could not accept new connection (%s)" % (
                  errorcode[e.args[0]],))
                break
              raise

            fdesc._setCloseOnExec(skt.fileno())
            protocol = self.factory.buildProtocol(self._buildAddr(addr))
            if protocol is None:
              skt.close()
              continue
            s = self.sessionno
            self.sessionno = s+1
            # transport初始化的过程中,会将自身假如到reactor的读集合中,那么当它准备
            # 好读的时候,就可以调用它的doRead方法读取客户端发过来的数据了
            transport = self.transport(skt, protocol, addr, self, s, self.reactor)
            protocol.makeConnection(transport)
          else:
            self.numberAccepts = self.numberAccepts+20
        except:
          log.deferr()

doRead方法中,调用accept产生了用于接收客户端数据的套接字,将套接字与transport绑定,然后把transport加入到reactor的读集合。当客户端有数据到来时,就会调用transport的doRead方法进行数据读取了。

Connection是Server(transport实例的类)的父类,它实现了doRead方法。


    # twisted/internet/tcp.py
    @implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
    class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
             _AbortingMixin):

      def doRead(self):
        try:
          # 接收数据
          data = self.socket.recv(self.bufferSize)
        except socket.error as se:
          if se.args[0] == EWOULDBLOCK:
            return
          else:
            return main.CONNECTION_LOST

        return self._dataReceived(data)

      def _dataReceived(self, data):
        if not data:
          return main.CONNECTION_DONE
        # 调用我们自定义protocol的dataReceived方法处理数据
        rval = self.protocol.dataReceived(data)
        if rval is not None:
          offender = self.protocol.dataReceived
          warningFormat = (
            'Returning a value other than None from %(fqpn)s is '
            'deprecated since %(version)s.')
          warningString = deprecate.getDeprecationWarningString(
            offender, versions.Version('Twisted', 11, 0, 0),
            format=warningFormat)
          deprecate.warnAboutFunction(offender, warningString)
        return rval

_dataReceived中调用了示例中我们自定义的EchoProtocol的dataReceived方法处理数据。

至此,一个简单的流程,从创建监听事件,到接收客户端数据就此结束了。

 相关推荐

刘强东夫妇:“移民美国”传言被驳斥

京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。

发布于:1年以前  |  808次阅读  |  详细内容 »

博主曝三大运营商,将集体采购百万台华为Mate60系列

日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。

发布于:1年以前  |  770次阅读  |  详细内容 »

ASML CEO警告:出口管制不是可行做法,不要“逼迫中国大陆创新”

据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。

发布于:1年以前  |  756次阅读  |  详细内容 »

抖音中长视频App青桃更名抖音精选,字节再发力对抗B站

今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。

发布于:1年以前  |  648次阅读  |  详细内容 »

威马CDO:中国每百户家庭仅17户有车

日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。

发布于:1年以前  |  589次阅读  |  详细内容 »

研究发现维生素 C 等抗氧化剂会刺激癌症生长和转移

近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。

发布于:1年以前  |  449次阅读  |  详细内容 »

苹果据称正引入3D打印技术,用以生产智能手表的钢质底盘

据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。

发布于:1年以前  |  446次阅读  |  详细内容 »

千万级抖音网红秀才账号被封禁

9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...

发布于:1年以前  |  445次阅读  |  详细内容 »

亚马逊股东起诉公司和贝索斯,称其在购买卫星发射服务时忽视了 SpaceX

9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。

发布于:1年以前  |  444次阅读  |  详细内容 »

苹果上线AppsbyApple网站,以推广自家应用程序

据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。

发布于:1年以前  |  442次阅读  |  详细内容 »

特斯拉美国降价引发投资者不满:“这是短期麻醉剂”

特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。

发布于:1年以前  |  441次阅读  |  详细内容 »

光刻机巨头阿斯麦:拿到许可,继续对华出口

据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。

发布于:1年以前  |  437次阅读  |  详细内容 »

马斯克与库克首次隔空合作:为苹果提供卫星服务

近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。

发布于:1年以前  |  430次阅读  |  详细内容 »

𝕏(推特)调整隐私政策,可拿用户发布的信息训练 AI 模型

据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。

发布于:1年以前  |  428次阅读  |  详细内容 »

荣耀CEO谈华为手机回归:替老同事们高兴,对行业也是好事

9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI操控无人机能力超越人类冠军

《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI生成的蘑菇科普书存在可致命错误

近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。

发布于:1年以前  |  420次阅读  |  详细内容 »

社交媒体平台𝕏计划收集用户生物识别数据与工作教育经历

社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”

发布于:1年以前  |  411次阅读  |  详细内容 »

国产扫地机器人热销欧洲,国产割草机器人抢占欧洲草坪

2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。

发布于:1年以前  |  406次阅读  |  详细内容 »

罗永浩吐槽iPhone15和14不会有区别,除了序列号变了

罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。

发布于:1年以前  |  398次阅读  |  详细内容 »
 相关文章
Android插件化方案 5年以前  |  237269次阅读
vscode超好用的代码书签插件Bookmarks 2年以前  |  8108次阅读
 目录