标签归档:events

如何在Tkinter中处理窗口关闭事件?

问题:如何在Tkinter中处理窗口关闭事件?

如何在Python Tkinter程序中处理窗口关闭事件(用户单击“ X”按钮)?

How do I handle the window close event (user clicking the ‘X’ button) in a Python Tkinter program?


回答 0

Tkinter支持一种称为协议处理程序的机制。在这里,术语协议是指应用程序和窗口管理器之间的交互。最常用的协议称为WM_DELETE_WINDOW,用于定义当用户使用窗口管理器显式关闭窗口时发生的情况。

您可以使用该protocol方法为该协议安装处理程序(窗口小部件必须是TkToplevel窗口小部件):

这里有一个具体的例子:

import tkinter as tk
from tkinter import messagebox

root = tk.Tk()

def on_closing():
    if messagebox.askokcancel("Quit", "Do you want to quit?"):
        root.destroy()

root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()

Tkinter supports a mechanism called protocol handlers. Here, the term protocol refers to the interaction between the application and the window manager. The most commonly used protocol is called WM_DELETE_WINDOW, and is used to define what happens when the user explicitly closes a window using the window manager.

You can use the protocol method to install a handler for this protocol (the widget must be a Tk or Toplevel widget):

Here you have a concrete example:

import tkinter as tk
from tkinter import messagebox

root = tk.Tk()

def on_closing():
    if messagebox.askokcancel("Quit", "Do you want to quit?"):
        root.destroy()

root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()

回答 1

马特展示了关闭按钮的一种经典修改。
另一种是使关闭按钮最小化窗口。
您可以通过将iconify方法
作为协议方法的第二个参数来重现此行为。

这是一个在Windows 7和10上测试过的有效示例:

# Python 3
import tkinter
import tkinter.scrolledtext as scrolledtext

root = tkinter.Tk()
# make the top right close button minimize (iconify) the main window
root.protocol("WM_DELETE_WINDOW", root.iconify)
# make Esc exit the program
root.bind('<Escape>', lambda e: root.destroy())

# create a menu bar with an Exit command
menubar = tkinter.Menu(root)
filemenu = tkinter.Menu(menubar, tearoff=0)
filemenu.add_command(label="Exit", command=root.destroy)
menubar.add_cascade(label="File", menu=filemenu)
root.config(menu=menubar)

# create a Text widget with a Scrollbar attached
txt = scrolledtext.ScrolledText(root, undo=True)
txt['font'] = ('consolas', '12')
txt.pack(expand=True, fill='both')

root.mainloop()

在此示例中,我们为用户提供了两个新的退出选项:
经典的File→Exit,以及Esc按钮。

Matt has shown one classic modification of the close button.
The other is to have the close button minimize the window.
You can reproduced this behavior by having the iconify method
be the protocol method’s second argument.

Here’s a working example, tested on Windows 7 & 10:

# Python 3
import tkinter
import tkinter.scrolledtext as scrolledtext

root = tkinter.Tk()
# make the top right close button minimize (iconify) the main window
root.protocol("WM_DELETE_WINDOW", root.iconify)
# make Esc exit the program
root.bind('<Escape>', lambda e: root.destroy())

# create a menu bar with an Exit command
menubar = tkinter.Menu(root)
filemenu = tkinter.Menu(menubar, tearoff=0)
filemenu.add_command(label="Exit", command=root.destroy)
menubar.add_cascade(label="File", menu=filemenu)
root.config(menu=menubar)

# create a Text widget with a Scrollbar attached
txt = scrolledtext.ScrolledText(root, undo=True)
txt['font'] = ('consolas', '12')
txt.pack(expand=True, fill='both')

root.mainloop()

In this example we give the user two new exit options:
the classic File → Exit, and also the Esc button.


回答 2

取决于Tkinter活动,尤其是在使用Tkinter.after时,使用destroy()-即使通过使用protocol(),按钮等停止该活动也会干扰该活动(“执行时出错”),而不仅仅是终止它。在几乎每种情况下,最好的解决方案是使用标志。这是一个简单,愚蠢的用法示例(尽管我敢肯定,你们大多数人都不需要它!

from Tkinter import *

def close_window():
  global running
  running = False  # turn off while loop
  print( "Window closed")

root = Tk()
root.protocol("WM_DELETE_WINDOW", close_window)
cv = Canvas(root, width=200, height=200)
cv.pack()

running = True;
# This is an endless loop stopped only by setting 'running' to 'False'
while running: 
  for i in range(200): 
    if not running: 
        break
    cv.create_oval(i, i, i+1, i+1)
    root.update() 

这样可以很好地终止图形活动。您只需要running在正确的位置检查即可。

Depending on the Tkinter activity, and especially when using Tkinter.after, stopping this activity with destroy() — even by using protocol(), a button, etc. — will disturb this activity (“while executing” error) rather than just terminate it. The best solution in almost every case is to use a flag. Here is a simple, silly example of how to use it (although I am certain that most of you don’t need it! :)

from Tkinter import *

def close_window():
  global running
  running = False  # turn off while loop
  print( "Window closed")

root = Tk()
root.protocol("WM_DELETE_WINDOW", close_window)
cv = Canvas(root, width=200, height=200)
cv.pack()

running = True;
# This is an endless loop stopped only by setting 'running' to 'False'
while running: 
  for i in range(200): 
    if not running: 
        break
    cv.create_oval(i, i, i+1, i+1)
    root.update() 

This terminates graphics activity nicely. You only need to check running at the right place(s).


回答 3

我要感谢Apostolos的回答,这一点引起了我的注意。这是2019年Python 3的更加详细的示例,带有更清晰的描述和示例代码。


注意以下事实destroy():(或完全没有自定义窗口关闭处理程序)将在用户关闭窗口时立即销毁窗口及其所有正在运行的回调

这可能对您不利,具体取决于您当前的Tkinter活动,尤其是在使用tkinter.after(定期回调)时。您可能正在使用处理一些数据并将其写入磁盘的回调…在这种情况下,您显然希望数据写入完成而不会被突然终止。

最好的解决方案是使用标志。因此,当用户请求关闭窗口时,可以将其标记为一个标志,然后对其进行响应。

(注意:我通常将GUI设计为封装良好的类和单独的工作线程,并且我绝对不使用“全局”(我改用类实例变量),但这只是一个简单的示例,用于演示当用户关闭窗口时,Tk如何突然终止您的定期回调…)

from tkinter import *
import time

# Try setting this to False and look at the printed numbers (1 to 10)
# during the work-loop, if you close the window while the periodic_call
# worker is busy working (printing). It will abruptly end the numbers,
# and kill the periodic callback! That's why you should design most
# applications with a safe closing callback as described in this demo.
safe_closing = True

# ---------

busy_processing = False
close_requested = False

def close_window():
    global close_requested
    close_requested = True
    print("User requested close at:", time.time(), "Was busy processing:", busy_processing)

root = Tk()
if safe_closing:
    root.protocol("WM_DELETE_WINDOW", close_window)
lbl = Label(root)
lbl.pack()

def periodic_call():
    global busy_processing

    if not close_requested:
        busy_processing = True
        for i in range(10):
            print((i+1), "of 10")
            time.sleep(0.2)
            lbl["text"] = str(time.time()) # Will error if force-closed.
            root.update() # Force redrawing since we change label multiple times in a row.
        busy_processing = False
        root.after(500, periodic_call)
    else:
        print("Destroying GUI at:", time.time())
        try: # "destroy()" can throw, so you should wrap it like this.
            root.destroy()
        except:
            # NOTE: In most code, you'll wanna force a close here via
            # "exit" if the window failed to destroy. Just ensure that
            # you have no code after your `mainloop()` call (at the
            # bottom of this file), since the exit call will cause the
            # process to terminate immediately without running any more
            # code. Of course, you should NEVER have code after your
            # `mainloop()` call in well-designed code anyway...
            # exit(0)
            pass

root.after_idle(periodic_call)
root.mainloop()

此代码将向您显示WM_DELETE_WINDOW即使在periodic_call()工作/循环中间我们的自定义繁忙时,处理程序也会运行!

我们使用一些相当夸张的.after()值:500毫秒。这只是为了使它很容易让你看到关闭的区别,而周期性呼叫占线,或不…如果关闭,而数字更新,你会看到WM_DELETE_WINDOW发生了,而你的定期电话“是忙处理:是”。如果您在数字暂停时关闭(这意味着此时不处理定期回调),则会看到关闭发生在“不忙”期间。

在实际使用中,您.after()将使用30到100毫秒左右的时间来获得响应式GUI。这只是一个演示,可以帮助您了解如何保护自己免受Tk的默认“关闭时立即中断所有工作”的行为。

总结:让WM_DELETE_WINDOW处理程序设置一个标志,然后.destroy()在安全的情况下(当您的应用程序完成所有工作时)定期并手动检查该标志。

PS:您还可以使用WM_DELETE_WINDOW询问用户是否真的要关闭的窗口; 如果他们回答“否”,则无需设置标志。非常简单 您只需要在自己的信箱中显示一个消息框,WM_DELETE_WINDOW然后根据用户的答案设置标志即可。

I’d like to thank the answer by Apostolos for bringing this to my attention. Here’s a much more detailed example for Python 3 in the year 2019, with a clearer description and example code.


Beware of the fact that destroy() (or not having a custom window closing handler at all) will destroy the window and all of its running callbacks instantly when the user closes it.

This can be bad for you, depending on your current Tkinter activity, and especially when using tkinter.after (periodic callbacks). You might be using a callback which processes some data and writes to disk… in that case, you obviously want the data writing to finish without being abruptly killed.

The best solution for that is to use a flag. So when the user requests window closing, you mark that as a flag, and then react to it.

(Note: I normally design GUIs as nicely encapsulated classes and separate worker threads, and I definitely don’t use “global” (I use class instance variables instead), but this is meant to be a simple, stripped-down example to demonstrate how Tk abruptly kills your periodic callbacks when the user closes the window…)

from tkinter import *
import time

# Try setting this to False and look at the printed numbers (1 to 10)
# during the work-loop, if you close the window while the periodic_call
# worker is busy working (printing). It will abruptly end the numbers,
# and kill the periodic callback! That's why you should design most
# applications with a safe closing callback as described in this demo.
safe_closing = True

# ---------

busy_processing = False
close_requested = False

def close_window():
    global close_requested
    close_requested = True
    print("User requested close at:", time.time(), "Was busy processing:", busy_processing)

root = Tk()
if safe_closing:
    root.protocol("WM_DELETE_WINDOW", close_window)
lbl = Label(root)
lbl.pack()

def periodic_call():
    global busy_processing

    if not close_requested:
        busy_processing = True
        for i in range(10):
            print((i+1), "of 10")
            time.sleep(0.2)
            lbl["text"] = str(time.time()) # Will error if force-closed.
            root.update() # Force redrawing since we change label multiple times in a row.
        busy_processing = False
        root.after(500, periodic_call)
    else:
        print("Destroying GUI at:", time.time())
        try: # "destroy()" can throw, so you should wrap it like this.
            root.destroy()
        except:
            # NOTE: In most code, you'll wanna force a close here via
            # "exit" if the window failed to destroy. Just ensure that
            # you have no code after your `mainloop()` call (at the
            # bottom of this file), since the exit call will cause the
            # process to terminate immediately without running any more
            # code. Of course, you should NEVER have code after your
            # `mainloop()` call in well-designed code anyway...
            # exit(0)
            pass

root.after_idle(periodic_call)
root.mainloop()

This code will show you that the WM_DELETE_WINDOW handler runs even while our custom periodic_call() is busy in the middle of work/loops!

We use some pretty exaggerated .after() values: 500 milliseconds. This is just meant to make it very easy for you to see the difference between closing while the periodic call is busy, or not… If you close while the numbers are updating, you will see that the WM_DELETE_WINDOW happened while your periodic call “was busy processing: True”. If you close while the numbers are paused (meaning that the periodic callback isn’t processing at that moment), you see that the close happened while it’s “not busy”.

In real-world usage, your .after() would use something like 30-100 milliseconds, to have a responsive GUI. This is just a demonstration to help you understand how to protect yourself against Tk’s default “instantly interrupt all work when closing” behavior.

In summary: Make the WM_DELETE_WINDOW handler set a flag, and then check that flag periodically and manually .destroy() the window when it’s safe (when your app is done with all work).

PS: You can also use WM_DELETE_WINDOW to ask the user if they REALLY want to close the window; and if they answer no, you don’t set the flag. It’s very simple. You just show a messagebox in your WM_DELETE_WINDOW and set the flag based on the user’s answer.


回答 4

尝试简单版本:

import tkinter

window = Tk()

closebutton = Button(window, text='X', command=window.destroy)
closebutton.pack()

window.mainloop()

或者,如果您想添加更多命令:

import tkinter

window = Tk()


def close():
    window.destroy()
    #More Functions


closebutton = Button(window, text='X', command=close)
closebutton.pack()

window.mainloop()

Try The Simple Version:

import tkinter

window = Tk()

closebutton = Button(window, text='X', command=window.destroy)
closebutton.pack()

window.mainloop()

Or If You Want To Add More Commands:

import tkinter

window = Tk()


def close():
    window.destroy()
    #More Functions


closebutton = Button(window, text='X', command=close)
closebutton.pack()

window.mainloop()

回答 5

from tkinter import*
root=Tk()
exit_button=Button(root,text="X",command=root.quit)
root.mainloop()
from tkinter import*
root=Tk()
exit_button=Button(root,text="X",command=root.quit)
root.mainloop()

您如何在Tkinter的事件循环中运行自己的代码?

问题:您如何在Tkinter的事件循环中运行自己的代码?

我的弟弟刚刚开始​​编程,对于他的Science Fair项目,他正在模拟天空中一群鸟。他得到了大部分他写的代码,它工作得很好,但鸟儿需要移动的每一刻

但是,Tkinter浪费了自己的事件循环的时间,因此他的代码无法运行。这样root.mainloop()运行,运行,并保持运行,并且它运行的唯一事情是事件处理程序。

有没有一种方法可以让他的代码与mainloop一起运行(没有多线程,这很令人困惑,应该保持简单),如果这样,那是什么?

现在,他想出了一个丑陋的方法,将其move()功能绑定到<b1-motion>,这样,只要按住按钮并摇动鼠标,它就可以工作。但是必须有一个更好的方法。

My little brother is just getting into programming, and for his Science Fair project, he’s doing a simulation of a flock of birds in the sky. He’s gotten most of his code written, and it works nicely, but the birds need to move every moment.

Tkinter, however, hogs the time for its own event loop, and so his code won’t run. Doing root.mainloop() runs, runs, and keeps running, and the only thing it runs is the event handlers.

Is there a way to have his code run alongside the mainloop (without multithreading, it’s confusing and this should be kept simple), and if so, what is it?

Right now, he came up with an ugly hack, tying his move() function to <b1-motion>, so that as long as he holds the button down and wiggles the mouse, it works. But there’s got to be a better way.


回答 0

afterTk对象上使用方法:

from tkinter import *

root = Tk()

def task():
    print("hello")
    root.after(2000, task)  # reschedule event in 2 seconds

root.after(2000, task)
root.mainloop()

这是该after方法的声明和文档:

def after(self, ms, func=None, *args):
    """Call function once after given time.

    MS specifies the time in milliseconds. FUNC gives the
    function which shall be called. Additional parameters
    are given as parameters to the function call.  Return
    identifier to cancel scheduling with after_cancel."""

Use the after method on the Tk object:

from tkinter import *

root = Tk()

def task():
    print("hello")
    root.after(2000, task)  # reschedule event in 2 seconds

root.after(2000, task)
root.mainloop()

Here’s the declaration and documentation for the after method:

def after(self, ms, func=None, *args):
    """Call function once after given time.

    MS specifies the time in milliseconds. FUNC gives the
    function which shall be called. Additional parameters
    are given as parameters to the function call.  Return
    identifier to cancel scheduling with after_cancel."""

回答 1

Bjorn发布解决方案在我的计算机(RedHat Enterprise 5,python 2.6.1)上显示“ RuntimeError:从不同的公寓呼叫Tcl”消息。Bjorn可能没有得到此消息,因为据我检查过的一个地方,使用Tkinter处理线程是不可预测的且依赖于平台。

问题似乎是可以将其app.start()视为对Tk的引用,因为app包含Tk元素。我通过替换app.start()self.start()inside来解决此问题__init__。我也这样做了,以便所有Tk引用都在调用mainloop()函数内,或者在调用的函数所调用的函数内mainloop()(这对于避免“不同单元”错误很关键)。

最后,我添加了带有回调的协议处理程序,因为如果没有此程序,当用户关闭Tk窗口时,程序将退出并出现错误。

修改后的代码如下:

# Run tkinter code in another thread

import tkinter as tk
import threading

class App(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.start()

    def callback(self):
        self.root.quit()

    def run(self):
        self.root = tk.Tk()
        self.root.protocol("WM_DELETE_WINDOW", self.callback)

        label = tk.Label(self.root, text="Hello World")
        label.pack()

        self.root.mainloop()


app = App()
print('Now we can continue running code while mainloop runs!')

for i in range(100000):
    print(i)

The solution posted by Bjorn results in a “RuntimeError: Calling Tcl from different appartment” message on my computer (RedHat Enterprise 5, python 2.6.1). Bjorn might not have gotten this message, since, according to one place I checked, mishandling threading with Tkinter is unpredictable and platform-dependent.

The problem seems to be that app.start() counts as a reference to Tk, since app contains Tk elements. I fixed this by replacing app.start() with a self.start() inside __init__. I also made it so that all Tk references are either inside the function that calls mainloop() or are inside functions that are called by the function that calls mainloop() (this is apparently critical to avoid the “different apartment” error).

Finally, I added a protocol handler with a callback, since without this the program exits with an error when the Tk window is closed by the user.

The revised code is as follows:

# Run tkinter code in another thread

import tkinter as tk
import threading

class App(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.start()

    def callback(self):
        self.root.quit()

    def run(self):
        self.root = tk.Tk()
        self.root.protocol("WM_DELETE_WINDOW", self.callback)

        label = tk.Label(self.root, text="Hello World")
        label.pack()

        self.root.mainloop()


app = App()
print('Now we can continue running code while mainloop runs!')

for i in range(100000):
    print(i)

回答 2

在编写自己的循环时,就像在模拟中一样(我假设),您需要调用执行update功能的函数mainloop:使用所做的更改来更新窗口,但是您需要在循环中进行操作。

def task():
   # do something
   root.update()

while 1:
   task()  

When writing your own loop, as in the simulation (I assume), you need to call the update function which does what the mainloop does: updates the window with your changes, but you do it in your loop.

def task():
   # do something
   root.update()

while 1:
   task()  

回答 3

另一个选择是让tkinter在单独的线程上执行。一种方法是这样的:

import Tkinter
import threading

class MyTkApp(threading.Thread):
    def __init__(self):
        self.root=Tkinter.Tk()
        self.s = Tkinter.StringVar()
        self.s.set('Foo')
        l = Tkinter.Label(self.root,textvariable=self.s)
        l.pack()
        threading.Thread.__init__(self)

    def run(self):
        self.root.mainloop()


app = MyTkApp()
app.start()

# Now the app should be running and the value shown on the label
# can be changed by changing the member variable s.
# Like this:
# app.s.set('Bar')

但是要小心,多线程编程很难,而且很容易使自己陷入困境。例如,当您更改上面的示例类的成员变量时,请务必小心,以免打断Tkinter的事件循环。

Another option is to let tkinter execute on a separate thread. One way of doing it is like this:

import Tkinter
import threading

class MyTkApp(threading.Thread):
    def __init__(self):
        self.root=Tkinter.Tk()
        self.s = Tkinter.StringVar()
        self.s.set('Foo')
        l = Tkinter.Label(self.root,textvariable=self.s)
        l.pack()
        threading.Thread.__init__(self)

    def run(self):
        self.root.mainloop()


app = MyTkApp()
app.start()

# Now the app should be running and the value shown on the label
# can be changed by changing the member variable s.
# Like this:
# app.s.set('Bar')

Be careful though, multithreaded programming is hard and it is really easy to shoot your self in the foot. For example you have to be careful when you change member variables of the sample class above so you don’t interrupt with the event loop of Tkinter.


回答 4

这是GPS读取器和数据演示器的第一个工作版本。tkinter是一件非常脆弱的事情,错误消息很少。它不会塞满东西,也无法说明为什么要花费很多时间。一个好的WYSIWYG表单开发者非常困难。无论如何,这将运行一个小的例程,每秒10次,并在表格上显示信息。花了一段时间才能实现。当我尝试将计时器值设置为0时,表单就永远不会出现。我的头现在好痛!每秒10次或更多次对我来说足够了。我希望它可以帮助其他人。迈克·莫罗

import tkinter as tk
import time

def GetDateTime():
  # Get current date and time in ISO8601
  # https://en.wikipedia.org/wiki/ISO_8601 
  # https://xkcd.com/1179/
  return (time.strftime("%Y%m%d", time.gmtime()),
          time.strftime("%H%M%S", time.gmtime()),
          time.strftime("%Y%m%d", time.localtime()),
          time.strftime("%H%M%S", time.localtime()))

class Application(tk.Frame):

  def __init__(self, master):

    fontsize = 12
    textwidth = 9

    tk.Frame.__init__(self, master)
    self.pack()

    tk.Label(self, font=('Helvetica', fontsize), bg = '#be004e', fg = 'white', width = textwidth,
             text='Local Time').grid(row=0, column=0)
    self.LocalDate = tk.StringVar()
    self.LocalDate.set('waiting...')
    tk.Label(self, font=('Helvetica', fontsize), bg = '#be004e', fg = 'white', width = textwidth,
             textvariable=self.LocalDate).grid(row=0, column=1)

    tk.Label(self, font=('Helvetica', fontsize), bg = '#be004e', fg = 'white', width = textwidth,
             text='Local Date').grid(row=1, column=0)
    self.LocalTime = tk.StringVar()
    self.LocalTime.set('waiting...')
    tk.Label(self, font=('Helvetica', fontsize), bg = '#be004e', fg = 'white', width = textwidth,
             textvariable=self.LocalTime).grid(row=1, column=1)

    tk.Label(self, font=('Helvetica', fontsize), bg = '#40CCC0', fg = 'white', width = textwidth,
             text='GMT Time').grid(row=2, column=0)
    self.nowGdate = tk.StringVar()
    self.nowGdate.set('waiting...')
    tk.Label(self, font=('Helvetica', fontsize), bg = '#40CCC0', fg = 'white', width = textwidth,
             textvariable=self.nowGdate).grid(row=2, column=1)

    tk.Label(self, font=('Helvetica', fontsize), bg = '#40CCC0', fg = 'white', width = textwidth,
             text='GMT Date').grid(row=3, column=0)
    self.nowGtime = tk.StringVar()
    self.nowGtime.set('waiting...')
    tk.Label(self, font=('Helvetica', fontsize), bg = '#40CCC0', fg = 'white', width = textwidth,
             textvariable=self.nowGtime).grid(row=3, column=1)

    tk.Button(self, text='Exit', width = 10, bg = '#FF8080', command=root.destroy).grid(row=4, columnspan=2)

    self.gettime()
  pass

  def gettime(self):
    gdt, gtm, ldt, ltm = GetDateTime()
    gdt = gdt[0:4] + '/' + gdt[4:6] + '/' + gdt[6:8]
    gtm = gtm[0:2] + ':' + gtm[2:4] + ':' + gtm[4:6] + ' Z'  
    ldt = ldt[0:4] + '/' + ldt[4:6] + '/' + ldt[6:8]
    ltm = ltm[0:2] + ':' + ltm[2:4] + ':' + ltm[4:6]  
    self.nowGtime.set(gdt)
    self.nowGdate.set(gtm)
    self.LocalTime.set(ldt)
    self.LocalDate.set(ltm)

    self.after(100, self.gettime)
   #print (ltm)  # Prove it is running this and the external code, too.
  pass

root = tk.Tk()
root.wm_title('Temp Converter')
app = Application(master=root)

w = 200 # width for the Tk root
h = 125 # height for the Tk root

# get display screen width and height
ws = root.winfo_screenwidth()  # width of the screen
hs = root.winfo_screenheight() # height of the screen

# calculate x and y coordinates for positioning the Tk root window

#centered
#x = (ws/2) - (w/2)
#y = (hs/2) - (h/2)

#right bottom corner (misfires in Win10 putting it too low. OK in Ubuntu)
x = ws - w
y = hs - h - 35  # -35 fixes it, more or less, for Win10

#set the dimensions of the screen and where it is placed
root.geometry('%dx%d+%d+%d' % (w, h, x, y))

root.mainloop()

This is the first working version of what will be a GPS reader and data presenter. tkinter is a very fragile thing with way too few error messages. It does not put stuff up and does not tell why much of the time. Very difficult coming from a good WYSIWYG form developer. Anyway, this runs a small routine 10 times a second and presents the information on a form. Took a while to make it happen. When I tried a timer value of 0, the form never came up. My head now hurts! 10 or more times per second is good enough for me. I hope it helps someone else. Mike Morrow

import tkinter as tk
import time

def GetDateTime():
  # Get current date and time in ISO8601
  # https://en.wikipedia.org/wiki/ISO_8601 
  # https://xkcd.com/1179/
  return (time.strftime("%Y%m%d", time.gmtime()),
          time.strftime("%H%M%S", time.gmtime()),
          time.strftime("%Y%m%d", time.localtime()),
          time.strftime("%H%M%S", time.localtime()))

class Application(tk.Frame):

  def __init__(self, master):

    fontsize = 12
    textwidth = 9

    tk.Frame.__init__(self, master)
    self.pack()

    tk.Label(self, font=('Helvetica', fontsize), bg = '#be004e', fg = 'white', width = textwidth,
             text='Local Time').grid(row=0, column=0)
    self.LocalDate = tk.StringVar()
    self.LocalDate.set('waiting...')
    tk.Label(self, font=('Helvetica', fontsize), bg = '#be004e', fg = 'white', width = textwidth,
             textvariable=self.LocalDate).grid(row=0, column=1)

    tk.Label(self, font=('Helvetica', fontsize), bg = '#be004e', fg = 'white', width = textwidth,
             text='Local Date').grid(row=1, column=0)
    self.LocalTime = tk.StringVar()
    self.LocalTime.set('waiting...')
    tk.Label(self, font=('Helvetica', fontsize), bg = '#be004e', fg = 'white', width = textwidth,
             textvariable=self.LocalTime).grid(row=1, column=1)

    tk.Label(self, font=('Helvetica', fontsize), bg = '#40CCC0', fg = 'white', width = textwidth,
             text='GMT Time').grid(row=2, column=0)
    self.nowGdate = tk.StringVar()
    self.nowGdate.set('waiting...')
    tk.Label(self, font=('Helvetica', fontsize), bg = '#40CCC0', fg = 'white', width = textwidth,
             textvariable=self.nowGdate).grid(row=2, column=1)

    tk.Label(self, font=('Helvetica', fontsize), bg = '#40CCC0', fg = 'white', width = textwidth,
             text='GMT Date').grid(row=3, column=0)
    self.nowGtime = tk.StringVar()
    self.nowGtime.set('waiting...')
    tk.Label(self, font=('Helvetica', fontsize), bg = '#40CCC0', fg = 'white', width = textwidth,
             textvariable=self.nowGtime).grid(row=3, column=1)

    tk.Button(self, text='Exit', width = 10, bg = '#FF8080', command=root.destroy).grid(row=4, columnspan=2)

    self.gettime()
  pass

  def gettime(self):
    gdt, gtm, ldt, ltm = GetDateTime()
    gdt = gdt[0:4] + '/' + gdt[4:6] + '/' + gdt[6:8]
    gtm = gtm[0:2] + ':' + gtm[2:4] + ':' + gtm[4:6] + ' Z'  
    ldt = ldt[0:4] + '/' + ldt[4:6] + '/' + ldt[6:8]
    ltm = ltm[0:2] + ':' + ltm[2:4] + ':' + ltm[4:6]  
    self.nowGtime.set(gdt)
    self.nowGdate.set(gtm)
    self.LocalTime.set(ldt)
    self.LocalDate.set(ltm)

    self.after(100, self.gettime)
   #print (ltm)  # Prove it is running this and the external code, too.
  pass

root = tk.Tk()
root.wm_title('Temp Converter')
app = Application(master=root)

w = 200 # width for the Tk root
h = 125 # height for the Tk root

# get display screen width and height
ws = root.winfo_screenwidth()  # width of the screen
hs = root.winfo_screenheight() # height of the screen

# calculate x and y coordinates for positioning the Tk root window

#centered
#x = (ws/2) - (w/2)
#y = (hs/2) - (h/2)

#right bottom corner (misfires in Win10 putting it too low. OK in Ubuntu)
x = ws - w
y = hs - h - 35  # -35 fixes it, more or less, for Win10

#set the dimensions of the screen and where it is placed
root.geometry('%dx%d+%d+%d' % (w, h, x, y))

root.mainloop()

Python中的事件系统

问题:Python中的事件系统

您使用哪个Python事件系统?我已经知道pydispatcher,但是我想知道还能找到什么或常用的东西?

我对大型框架中的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型准系统解决方案。

What event system for Python do you use? I’m already aware of pydispatcher, but I was wondering what else can be found, or is commonly used?

I’m not interested in event managers that are part of large frameworks, I’d rather use a small bare-bones solution that I can easily extend.


回答 0

PyPI软件包

截至2020年6月,这些都是PyPI上与事件相关的软件包,按最新发布日期订购。

还有更多

使用非常不同的术语(事件,信号,处理程序,方法分派,hook等),可以选择很多库。

我试图对上述软件包以及此处答案中提到的技术进行概述。

首先,一些术语…

观察者模式

事件系统最基本的样式是“处理程序方法包”,它是Observer模式的简单实现

基本上,处理程序方法(可调用函数)存储在数组中,并在事件“触发”时分别调用。

发布-订阅

Observer事件系统的缺点是只能在实际的Event对象(或处理程序列表)上注册处理程序。因此,在注册时该事件已经需要存在。

这就是存在事件系统第二种样式的原因: 发布-订阅模式。在这里,处理程序不在事件对象(或处理程序列表)上注册,而是在中央调度程序上注册。通知者也仅与调度程序对话。侦听或发布的内容由“信号”确定,仅不过是名称(字符串)。

中介者模式

可能也很有趣:中介者模式

钩子

在应用程序插件的上下文中通常使用“挂钩”系统。该应用程序包含固定的集成点(挂钩),并且每个插件都可以连接到该挂钩并执行某些操作。

其他“事件”

注意:从上述意义上讲,threading.Event不是“事件系统”。这是一个线程同步系统,其中一个线程等待,直到另一个线程“标记” Event对象。

网络消息传递库也经常使用术语“事件”。有时这些在概念上是相似的;有时不是。它们当然可以遍历线程,进程和计算机的边界。参见例如 pyzmqpymqTwistedTornadogeventeventlet

参考不足

在Python中,持有对方法或对象的引用可确保不会被垃圾收集器删除。这可能是理想的,但它也可能导致内存泄漏:永远不会清除链接的处理程序。

一些事件系统使用弱引用而不是常规引用来解决此问题。

关于各种库的一些话

观察者式事件系统:

  • zope.event显示了其工作原理的基本内容(请参阅Lennart的答案)。注意:此示例甚至不支持处理程序参数。
  • LongPoke的“可调用列表”实现表明,可以通过子类化非常简单地实现这样的事件系统list
  • Felk的变体EventHook还可以确保被调用方法和调用方法的签名。
  • spassig的EventHook(Michael Foord的事件模式)是一个简单的实现。
  • Josip的Valued Lessons Event类基本上是相同的,但是使用a set而不是a list来存储包,并且实现__call__都是合理的补充。
  • PyNotify在概念上相似,还提供了变量和条件(“变量更改事件”)的其他概念。主页不起作用。
  • axel基本上是一个处理程序包,具有更多与线程,错误处理,…相关的功能。
  • python-dispatch需要偶数源类从派生pydispatch.Dispatcher
  • buslane是基于类的,支持单个或多个处理程序,并有助于扩展类型提示。
  • Pithikos的Observer / Event是轻巧的设计。

发布-订阅库:

  • 方向指示灯具有一些漂亮的功能,例如自动断开连接和基于发件人的过滤。
  • PyPubSub是一个稳定的软件包,并承诺“有助于调试和维护主题和消息的高级功能”。
  • pymitter是Node.js EventEmitter2的Python端口,并提供命名空间,通配符和TTL。
  • PyDispatcher似乎在多对多出版物等方面强调灵活性。支持弱引用。
  • 路易是一个经过改进的PyDispatcher,并应努力“在各种各样的环境中”。
  • pypydispatcher基于(您猜对了……)PyDispatcher,并且也可以在PyPy中使用。
  • django.dispatch是重写的PyDispatcher,“界面更有限,但性能更高”。
  • pyeventdispatcher基于PHP的Symfony框架的event-dispatcher。
  • 调度程序是从django.dispatch中提取的,但是已经相当老了。
  • Cristian Garcia的EventManger是一个非常短的实现。

其他:

  • Pluggy包含一个pytest插件使用的挂钩系统。
  • RxPy3实现了Observable模式,并允许合并事件,重试等。
  • Qt的信号和插槽可从PyQtPySide2获得。当在同一线程中使用时,它们可用作回调,或在两个不同线程之间用作事件(使用事件循环)。信号和插槽的局限性是它们仅在从派生的类的对象中工作QObject

PyPI packages

As of June 2020, these are the event-related packages available on PyPI, ordered by most recent release date.

There’s more

That’s a lot of libraries to choose from, using very different terminology (events, signals, handlers, method dispatch, hooks, …).

I’m trying to keep an overview of the above packages, plus the techniques mentioned in the answers here.

First, some terminology…

Observer pattern

The most basic style of event system is the ‘bag of handler methods’, which is a simple implementation of the Observer pattern.

Basically, the handler methods (callables) are stored in an array and are each called when the event ‘fires’.

Publish-Subscribe

The disadvantage of Observer event systems is that you can only register the handlers on the actual Event object (or handlers list). So at registration time the event already needs to exist.

That’s why the second style of event systems exists: the publish-subscribe pattern. Here, the handlers don’t register on an event object (or handler list), but on a central dispatcher. Also the notifiers only talk to the dispatcher. What to listen for, or what to publish is determined by ‘signal’, which is nothing more than a name (string).

Mediator pattern

Might be of interest as well: the Mediator pattern.

Hooks

A ‘hook’ system is usally used in the context of application plugins. The application contains fixed integration points (hooks), and each plugin may connect to that hook and perform certain actions.

Other ‘events’

Note: threading.Event is not an ‘event system’ in the above sense. It’s a thread synchronization system where one thread waits until another thread ‘signals’ the Event object.

Network messaging libraries often use the term ‘events’ too; sometimes these are similar in concept; sometimes not. They can of course traverse thread-, process- and computer boundaries. See e.g. pyzmq, pymq, Twisted, Tornado, gevent, eventlet.

Weak references

In Python, holding a reference to a method or object ensures that it won’t get deleted by the garbage collector. This can be desirable, but it can also lead to memory leaks: the linked handlers are never cleaned up.

Some event systems use weak references instead of regular ones to solve this.

Some words about the various libraries

Observer-style event systems:

  • zope.event shows the bare bones of how this works (see Lennart’s answer). Note: this example does not even support handler arguments.
  • LongPoke’s ‘callable list’ implementation shows that such an event system can be implemented very minimalistically by subclassing list.
  • Felk’s variation EventHook also ensures the signatures of callees and callers.
  • spassig’s EventHook (Michael Foord’s Event Pattern) is a straightforward implementation.
  • Josip’s Valued Lessons Event class is basically the same, but uses a set instead of a list to store the bag, and implements __call__ which are both reasonable additions.
  • PyNotify is similar in concept and also provides additional concepts of variables and conditions (‘variable changed event’). Homepage is not functional.
  • axel is basically a bag-of-handlers with more features related to threading, error handling, …
  • python-dispatch requires the even source classes to derive from pydispatch.Dispatcher.
  • buslane is class-based, supports single- or multiple handlers and facilitates extensive type hints.
  • Pithikos’ Observer/Event is a lightweight design.

Publish-subscribe libraries:

  • blinker has some nifty features such as automatic disconnection and filtering based on sender.
  • PyPubSub is a stable package, and promises “advanced features that facilitate debugging and maintaining topics and messages”.
  • pymitter is a Python port of Node.js EventEmitter2 and offers namespaces, wildcards and TTL.
  • PyDispatcher seems to emphasize flexibility with regards to many-to-many publication etc. Supports weak references.
  • louie is a reworked PyDispatcher and should work “in a wide variety of contexts”.
  • pypydispatcher is based on (you guessed it…) PyDispatcher and also works in PyPy.
  • django.dispatch is a rewritten PyDispatcher “with a more limited interface, but higher performance”.
  • pyeventdispatcher is based on PHP’s Symfony framework’s event-dispatcher.
  • dispatcher was extracted from django.dispatch but is getting fairly old.
  • Cristian Garcia’s EventManger is a really short implementation.

Others:

  • pluggy contains a hook system which is used by pytest plugins.
  • RxPy3 implements the Observable pattern and allows merging events, retry etc.
  • Qt’s Signals and Slots are available from PyQt or PySide2. They work as callback when used in the same thread, or as events (using an event loop) between two different threads. Signals and Slots have the limitation that they only work in objects of classes that derive from QObject.

回答 1

我一直这样:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print 'f(%s)' % x
    >>> def g(x):
    ...     print 'g(%s)' % x
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

但是,就像我看到的所有其他内容一样,没有为此自动生成的pydoc,也没有签名,这确实很糟糕。

I’ve been doing it this way:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print 'f(%s)' % x
    >>> def g(x):
    ...     print 'g(%s)' % x
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

However, like with everything else I’ve seen, there is no auto generated pydoc for this, and no signatures, which really sucks.


回答 2

我们使用Michael Foord在他的Event Pattern中建议的EventHook :

只需使用以下命令将EventHooks添加到您的类中:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

我们添加了将所有侦听器从一个对象删除到Michaels类的功能,并最终得到了以下结果:

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler

We use an EventHook as suggested from Michael Foord in his Event Pattern:

Just add EventHooks to your classes with:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

We add the functionality to remove all listener from an object to Michaels class and ended up with this:

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler

回答 3

我使用zope.event。这是您可以想象的最裸露的骨头。:-)实际上,这是完整的源代码:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

请注意,例如,您不能在进程之间发送消息。它不是消息系统,只是事件系统,仅此而已。

I use zope.event. It’s the most bare bones you can imagine. :-) In fact, here is the complete source code:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

Note that you can’t send messages between processes, for example. It’s not a messaging system, just an event system, nothing more, nothing less.


回答 4

我在“ 有价值的类”中找到了这个小脚本。我似乎拥有正确的简单性/功率比。Peter Thatcher是以下代码的作者(未提及许可)。

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print "%r changed." % (source_path,)

def log_file_change2(source_path):
    print "%r changed!" % (source_path,)

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()

I found this small script on Valued Lessons. It seems to have just the right simplicity/power ratio I’m after. Peter Thatcher is the author of following code (no licensing is mentioned).

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print "%r changed." % (source_path,)

def log_file_change2(source_path):
    print "%r changed!" % (source_path,)

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()

回答 5

这是一个应该可以正常工作的最小设计。您要做的就是简单地Observer在一个类中继承,然后使用observe(event_name, callback_fn)它侦听特定事件。每当该特定事件在代码中的任意位置(即Event('USB connected'))触发时,就会触发相应的回调。

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

例:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')

Here is a minimal design that should work fine. What you have to do is to simply inherit Observer in a class and afterwards use observe(event_name, callback_fn) to listen for a specific event. Whenever that specific event is fired anywhere in the code (ie. Event('USB connected')), the corresponding callback will fire.

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

Example:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')

回答 6

我创建了一个EventManager类(最后是代码)。语法如下:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

这是一个例子:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "\nInitial salute"
EventManager.salute('Oscar')

print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

输出:

最初的致敬
问候奥斯卡
你好奥斯卡

现在删除问候
你好奥斯卡

EventManger代码:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)

I created an EventManager class (code at the end). The syntax is the following:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

Here is an Example:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "\nInitial salute"
EventManager.salute('Oscar')

print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

Output:

Initial salute
Greetings Oscar
Hello Oscar

Now remove greetings
Hello Oscar

EventManger Code:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)

回答 7

您可以看看pymitterpypi)。它是一种小的单文件(〜250 loc)方法,“提供命名空间,通配符和TTL”。

这是一个基本示例:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg

# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"

You may have a look at pymitter (pypi). Its a small single-file (~250 loc) approach “providing namespaces, wildcards and TTL”.

Here’s a basic example:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg

# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"

回答 8

我对Longpoke的简约方法进行了改进,该方法还确保了被叫者和调用方法的签名:

class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()

I made a variation of Longpoke’s minimalistic approach that also ensures the signatures for both callees and callers:

class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()

回答 9

如果我在pyQt中编写代码,则使用QT套接字/信号范例,对于Django也是一样

如果我正在执行异步I / O,请使用本机选择模块

如果我使用SAX python解析器,则使用的是SAX提供的事件API。所以看起来我是底层API的受害者:-)

也许您应该问自己,您对事件框架/模块有什么期望。我个人的喜好是使用QT的套接字/信号范例。有关更多信息,请参见此处

If I do code in pyQt I use QT sockets/signals paradigm, same is for django

If I’m doing async I/O I use native select module

If I’m usign a SAX python parser I’m using event API provided by SAX. So it looks like I’m victim of underlying API :-)

Maybe you should ask yourself what do you expect from event framework/module. My personal preference is to use Socket/Signal paradigm from QT. more info about that can be found here


回答 10

这是另一个需要考虑的模块。对于要求更高的应用程序,这似乎是一个可行的选择。

Py-notify是一个Python软件包,提供用于实现Observer编程模式的工具。这些工具包括信号,条件和变量。

信号是发出信号时调用的处理程序列表。条件基本上是布尔变量,加上条件状态更改时发出的信号。可以使用标准逻辑运算符(非和等)将它们组合为复合条件。与条件不同,变量可以保存任何Python对象,不仅是布尔值,而且不能将它们组合在一起。

Here’s another module for consideration. It seems a viable choice for more demanding applications.

Py-notify is a Python package providing tools for implementing Observer programming pattern. These tools include signals, conditions and variables.

Signals are lists of handlers that are called when signal is emitted. Conditions are basically boolean variables coupled with a signal that is emitted when condition state changes. They can be combined using standard logical operators (not, and, etc.) into compound conditions. Variables, unlike conditions, can hold any Python object, not just booleans, but they cannot be combined.


回答 11

如果您想做更复杂的事情,例如合并事件或重试,则可以使用Observable模式和一个成熟的库来实现。https://github.com/ReactiveX/RxPY。可观察变量在Javascript和Java中非常常见,对于某些异步任务非常方便使用。

from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

输出

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!

If you wanted to do more complicated things like merging events or retry you can use the Observable pattern and a mature library that implements that. https://github.com/ReactiveX/RxPY . Observables are very common in Javascript and Java and very convenient to use for some async tasks.

from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

OUTPUT:

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!

回答 12

如果您需要跨流程或网络边界运行的事件总线,则可以尝试PyMQ。它目前支持发布/订阅,消息队列和同步RPC。默认版本在Redis后端上运行,因此您需要运行的Redis服务器。还有一个用于测试的内存后端。您也可以编写自己的后端。

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

要初始化系统:

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

免责声明:我是这个图书馆的作者

If you need an eventbus that works across process or network boundaries you can try PyMQ. It currently supports pub/sub, message queues and synchronous RPC. The default version works on top of a Redis backend, so you need a running Redis server. There is also an in-memory backend for testing. You can also write your own backend.

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

To initialize the system:

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

Disclaimer: I am the author of this library


回答 13

您可以尝试buslane模块。

该库使基于消息的系统的实现更加容易。它支持命令(单个处理程序)和事件(0或多个处理程序)方法。Buslane使用Python类型注释正确注册处理程序。

简单的例子:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='john@lennon.com',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='john@lennon.com',
    password='secret',
))

要安装buslane,只需使用pip:

$ pip install buslane

You can try buslane module.

This library makes implementation of message-based system easier. It supports commands (single handler) and events (0 or multiple handlers) approach. Buslane uses Python type annotations to properly register handler.

Simple example:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='john@lennon.com',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='john@lennon.com',
    password='secret',
))

To install buslane, simply use pip:

$ pip install buslane

回答 14

前段时间,我编写了可能对您有用的库。它允许您拥有本地和全局侦听器,多种不同的注册方式,执行优先级等。

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

看看pyeventdispatcher

Some time ago I’ve wrote library that might be useful for you. It allows you to have local and global listeners, multiple different ways of registering them, execution priority and so on.

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

Have a look pyeventdispatcher