Event system in Python

Question: Event system in Python

What event system for Python do you use? I’m already aware of pydispatcher, but I was wondering what else can be found, or is commonly used?

I’m not interested in event managers that are part of large frameworks; I’d rather use a small bare-bones solution that I can easily extend.


Answer 0

PyPI packages

As of June 2020, these are the event-related packages available on PyPI, ordered by most recent release date.

There’s more

That’s a lot of libraries to choose from, using very different terminology (events, signals, handlers, method dispatch, hooks, …).

I’m trying to keep an overview of the above packages, plus the techniques mentioned in the answers here.

First, some terminology…

Observer pattern

The most basic style of event system is the ‘bag of handler methods’, which is a simple implementation of the Observer pattern.

Basically, the handler methods (callables) are stored in an array and are each called when the event ‘fires’.

Publish-Subscribe

The disadvantage of Observer event systems is that you can only register the handlers on the actual Event object (or handlers list). So at registration time the event already needs to exist.

That’s why the second style of event system exists: the publish-subscribe pattern. Here, the handlers don’t register on an event object (or handler list), but on a central dispatcher. The notifiers also talk only to the dispatcher. What to listen for, or what to publish, is determined by a ‘signal’, which is nothing more than a name (a string).
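
As a rough illustration of the publish-subscribe idea, here is a minimal sketch. The `Dispatcher` class and its `subscribe`/`unsubscribe`/`publish` methods are hypothetical names chosen for this example and are not the API of any library listed below.

```python
from collections import defaultdict


class Dispatcher:
    """Hypothetical central dispatcher; handlers are keyed by signal name."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, signal, handler):
        # The signal does not need to exist beforehand: it is just a string.
        self._handlers[signal].append(handler)

    def unsubscribe(self, signal, handler):
        self._handlers[signal].remove(handler)

    def publish(self, signal, *args, **kwargs):
        # Iterate over a copy so handlers may unsubscribe while being called.
        for handler in list(self._handlers.get(signal, ())):
            handler(*args, **kwargs)


dispatcher = Dispatcher()
received = []
dispatcher.subscribe("user.created", received.append)
dispatcher.publish("user.created", "alice")
dispatcher.publish("user.deleted", "bob")  # nobody listens; nothing happens
print(received)  # ['alice']
```

Publisher and subscriber only share the dispatcher and the signal name, so neither needs a reference to the other.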

Mediator pattern

Might be of interest as well: the Mediator pattern.

Hooks

A ‘hook’ system is usually used in the context of application plugins. The application contains fixed integration points (hooks), and each plugin may connect to that hook and perform certain actions.
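
A hook system can be sketched in a few lines. `HookRegistry`, `register` and `call` are made-up names for illustration only; real hook libraries such as pluggy have their own, richer API.

```python
class HookRegistry:
    """Hypothetical registry with a fixed set of integration points."""

    def __init__(self, *hook_names):
        # The application decides up front which hooks exist.
        self._hooks = {name: [] for name in hook_names}

    def register(self, hook_name, func):
        if hook_name not in self._hooks:
            raise KeyError("unknown hook: %s" % hook_name)
        self._hooks[hook_name].append(func)

    def call(self, hook_name, *args, **kwargs):
        # Unlike a fire-and-forget event, a hook call collects the results.
        return [func(*args, **kwargs) for func in self._hooks[hook_name]]


hooks = HookRegistry("render_title")

# Two "plugins" attach to the same hook.
hooks.register("render_title", str.upper)
hooks.register("render_title", lambda title: title + "!")

print(hooks.call("render_title", "hello"))  # ['HELLO', 'hello!']
```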

Other ‘events’

Note: threading.Event is not an ‘event system’ in the above sense. It’s a thread synchronization system where one thread waits until another thread ‘signals’ the Event object.
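
To make the difference concrete, this small sketch shows what `threading.Event` is for: one thread blocks in `wait()` until another calls `set()`. No handlers are registered or called. The `worker` function and the `results` list are just illustration.

```python
import threading

ready = threading.Event()
results = []


def worker():
    # Blocks until another thread calls ready.set().
    ready.wait()
    results.append("worker ran")


thread = threading.Thread(target=worker)
thread.start()
ready.set()      # "signal" the Event object; the waiting thread wakes up
thread.join()
print(results)   # ['worker ran']
```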

Network messaging libraries often use the term ‘events’ too; sometimes these are similar in concept; sometimes not. They can of course traverse thread-, process- and computer boundaries. See e.g. pyzmq, pymq, Twisted, Tornado, gevent, eventlet.

Weak references

In Python, holding a reference to a method or object ensures that it won’t get deleted by the garbage collector. This can be desirable, but it can also lead to memory leaks: the linked handlers are never cleaned up.

Some event systems use weak references instead of regular ones to solve this.
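
A minimal sketch of the weak-reference approach, assuming handlers are always bound methods (plain functions would need `weakref.ref` instead). `WeakEvent` and `Listener` are hypothetical names, not taken from any of the libraries below.

```python
import gc
import weakref


class WeakEvent:
    """Hypothetical event that holds bound methods via weak references."""

    def __init__(self):
        self._refs = []

    def connect(self, method):
        # WeakMethod is needed for bound methods; a plain weakref.ref to a
        # bound method would be dead immediately.
        self._refs.append(weakref.WeakMethod(method))

    def fire(self, *args):
        alive = []
        for ref in self._refs:
            method = ref()          # None once the owner has been collected
            if method is not None:
                method(*args)
                alive.append(ref)
        self._refs = alive          # drop dead references


class Listener:
    def __init__(self):
        self.seen = []

    def on_event(self, value):
        self.seen.append(value)


event = WeakEvent()
listener = Listener()
event.connect(listener.on_event)
event.fire(1)
seen = listener.seen                # keep the list, not the listener
del listener
gc.collect()                        # make collection explicit
event.fire(2)                       # the handler is gone; nothing is called
print(seen)                         # [1]
```

The event never keeps the listener alive, so forgetting to disconnect does not leak the listener object.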

Some words about the various libraries

Observer-style event systems:

  • zope.event shows the bare bones of how this works (see Lennart’s answer). Note: this example does not even support handler arguments.
  • LongPoke’s ‘callable list’ implementation shows that such an event system can be implemented very minimalistically by subclassing list.
  • Felk’s variation EventHook also ensures the signatures of callees and callers.
  • spassig’s EventHook (Michael Foord’s Event Pattern) is a straightforward implementation.
  • Josip’s Valued Lessons Event class is basically the same, but uses a set instead of a list to store the bag, and implements __call__ which are both reasonable additions.
  • PyNotify is similar in concept and also provides additional concepts of variables and conditions (‘variable changed event’). Homepage is not functional.
  • axel is basically a bag-of-handlers with more features related to threading, error handling, …
  • python-dispatch requires the event source classes to derive from pydispatch.Dispatcher.
  • buslane is class-based, supports single- or multiple handlers and facilitates extensive type hints.
  • Pithikos’ Observer/Event is a lightweight design.

Publish-subscribe libraries:

  • blinker has some nifty features such as automatic disconnection and filtering based on sender.
  • PyPubSub is a stable package, and promises “advanced features that facilitate debugging and maintaining topics and messages”.
  • pymitter is a Python port of Node.js EventEmitter2 and offers namespaces, wildcards and TTL.
  • PyDispatcher seems to emphasize flexibility with regards to many-to-many publication etc. Supports weak references.
  • louie is a reworked PyDispatcher and should work “in a wide variety of contexts”.
  • pypydispatcher is based on (you guessed it…) PyDispatcher and also works in PyPy.
  • django.dispatch is a rewritten PyDispatcher “with a more limited interface, but higher performance”.
  • pyeventdispatcher is based on PHP’s Symfony framework’s event-dispatcher.
  • dispatcher was extracted from django.dispatch but is getting fairly old.
  • Cristian Garcia’s EventManager is a really short implementation.

Others:

  • pluggy contains a hook system which is used by pytest plugins.
  • RxPy3 implements the Observable pattern and allows merging events, retry etc.
  • Qt’s Signals and Slots are available from PyQt or PySide2. They work as callbacks when used in the same thread, or as events (using an event loop) between two different threads. Signals and Slots have the limitation that they only work in objects of classes that derive from QObject.

Answer 1

I’ve been doing it this way:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print('f(%s)' % x)
    >>> def g(x):
    ...     print('g(%s)' % x)
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

However, like with everything else I’ve seen, there is no auto generated pydoc for this, and no signatures, which really sucks.


Answer 2

We use an EventHook as suggested by Michael Foord in his Event Pattern:

Just add EventHooks to your classes with:

class MyBroadcaster:
    def __init__(self):
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

We added the ability to remove all listeners that belong to a given object to Michael’s class and ended up with this:

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

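    def clearObjectHandlers(self, inObject):
        # Iterate over a copy so handlers can be removed while looping.
        # Plain functions have no __self__ (im_self in Python 2), hence getattr.
        for theHandler in self.__handlers[:]:
            if getattr(theHandler, '__self__', None) == inObject:
                self -= theHandler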

Answer 3

I use zope.event. It’s the most bare bones you can imagine. :-) In fact, here is the complete source code:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

Note that you can’t send messages between processes, for example. It’s not a messaging system, just an event system, nothing more, nothing less.
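
For illustration, here is how such a subscriber list is typically used. The snippet repeats the two definitions above so that it runs on its own; with the real package you would append to `zope.event.subscribers` and call `zope.event.notify` instead. `UserCreated` and `on_user_created` are made-up names.

```python
subscribers = []


def notify(event):
    for subscriber in subscribers:
        subscriber(event)


class UserCreated:
    def __init__(self, name):
        self.name = name


log = []


def on_user_created(event):
    # Every subscriber sees every event, so filter by type here.
    if isinstance(event, UserCreated):
        log.append(event.name)


subscribers.append(on_user_created)
notify(UserCreated("alice"))
notify("some unrelated event")   # ignored by the subscriber
print(log)                        # ['alice']
```

Because there is a single global list, any event object can be passed to `notify`, and each subscriber decides for itself what it cares about.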


Answer 4

I found this small script on Valued Lessons. It seems to have just the right simplicity/power ratio I’m after. Peter Thatcher is the author of the following code (no licensing is mentioned).

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except KeyError:  # set.remove raises KeyError for a missing item
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print("%r changed." % (source_path,))

def log_file_change2(source_path):
    print("%r changed!" % (source_path,))

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()

Answer 5

Here is a minimal design that should work fine. All you have to do is inherit from Observer in a class and then use observe(event_name, callback_fn) to listen for a specific event. Whenever that specific event is fired anywhere in the code (i.e. Event('USB connected')), the corresponding callback will fire.

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

Example:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')

Answer 6

I created an EventManager class (code at the end). The syntax is the following:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number of events with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

Here is an Example:

def hello(name):
    print("Hello {}".format(name))

def greetings(name):
    print("Greetings {}".format(name))

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print("\nInitial salute")
EventManager.salute('Oscar')

print("\nNow remove greetings")
EventManager.salute -= greetings
EventManager.salute('Oscar')

Output:

Initial salute
Greetings Oscar
Hello Oscar

Now remove greetings
Hello Oscar

EventManager code:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event receives a list of functions,
        where every function in the list receives the same parameters.
        
        Example:
        
        def hello(): print("Hello", end=" ")
        def world(): print("World")
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key, functions in kvargs.items():
            if type(functions) is not list:
                raise ValueError("value has to be a list")
            # setattr also works on Python 3, where cls.__dict__ is a
            # read-only mapping and has no update() method.
            setattr(cls, key, cls.Event(functions))

Answer 7

You may have a look at pymitter (pypi). It’s a small single-file (~250 loc) approach “providing namespaces, wildcards and TTL”.

Here’s a basic example:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
    print("handler1 called with", arg)

# callback usage
def handler2(arg):
    print("handler2 called with", arg)
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"

Answer 8

I made a variation of Longpoke’s minimalistic approach that also ensures the signatures for both callees and callers:

import inspect


class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon initialization,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these keyword arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()

Answer 9

If I write code in PyQt, I use Qt’s signals/slots paradigm; the same goes for Django.

If I’m doing async I/O, I use the native select module.

If I’m using a SAX Python parser, I use the event API provided by SAX. So it looks like I’m a victim of the underlying API :-)

Maybe you should ask yourself what you expect from an event framework/module. My personal preference is the signals/slots paradigm from Qt. More info about that can be found here


Answer 10

Here’s another module for consideration. It seems a viable choice for more demanding applications.

Py-notify is a Python package providing tools for implementing Observer programming pattern. These tools include signals, conditions and variables.

Signals are lists of handlers that are called when signal is emitted. Conditions are basically boolean variables coupled with a signal that is emitted when condition state changes. They can be combined using standard logical operators (not, and, etc.) into compound conditions. Variables, unlike conditions, can hold any Python object, not just booleans, but they cannot be combined.
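
To give a feel for the signal/condition idea described above, here is a self-contained sketch. It is not Py-notify’s actual API: the `Signal` and `Condition` classes, the `changed` attribute and the use of `&` for combining conditions are all assumptions made for this illustration.

```python
class Signal:
    """A list of handlers that are called when the signal is emitted."""

    def __init__(self):
        self._handlers = []

    def connect(self, handler):
        self._handlers.append(handler)

    def emit(self, *args):
        for handler in list(self._handlers):
            handler(*args)


class Condition:
    """A boolean state plus a signal emitted whenever the state changes."""

    def __init__(self, state=False):
        self._state = bool(state)
        self.changed = Signal()

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        value = bool(value)
        if value != self._state:
            self._state = value
            self.changed.emit(value)

    def __and__(self, other):
        # Compound condition: re-evaluated when either operand changes.
        compound = Condition(self.state and other.state)

        def update(_value):
            compound.state = self.state and other.state

        self.changed.connect(update)
        other.changed.connect(update)
        return compound


a, b = Condition(), Condition()
both = a & b
history = []
both.changed.connect(history.append)
a.state = True    # both is still False: no signal
b.state = True    # both becomes True
a.state = False   # both becomes False
print(history)    # [True, False]
```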


Answer 11

If you want to do more complicated things like merging events or retrying, you can use the Observable pattern and a mature library that implements it: https://github.com/ReactiveX/RxPY . Observables are very common in JavaScript and Java and are very convenient for some async tasks.

# Note: this uses the RxPY 1.x API; later major versions reorganized these names.
from rx import Observable, Observer


def push_five_strings(observer):
    observer.on_next("Alpha")
    observer.on_next("Beta")
    observer.on_next("Gamma")
    observer.on_next("Delta")
    observer.on_next("Epsilon")
    observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

OUTPUT:

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!

Answer 12

If you need an event bus that works across process or network boundaries, you can try PyMQ. It currently supports pub/sub, message queues and synchronous RPC. The default version works on top of a Redis backend, so you need a running Redis server. There is also an in-memory backend for testing. You can also write your own backend.

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

To initialize the system:

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

Disclaimer: I am the author of this library


Answer 13

You can try the buslane module.

This library makes implementing a message-based system easier. It supports both the command (single handler) and the event (zero or more handlers) approach. Buslane uses Python type annotations to register handlers properly.

Simple example:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='john@lennon.com',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='john@lennon.com',
    password='secret',
))

To install buslane, simply use pip:

$ pip install buslane

Answer 14

Some time ago I wrote a library that might be useful for you. It allows you to have local and global listeners, multiple ways of registering them, execution priority and so on.

from pyeventdispatcher import dispatch, register, Event

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

Have a look at pyeventdispatcher.