Also, eine Antwort zu finden war komplizierter als ich dachte. Wie @MTset in einem der Kommentare erwähnt, bietet Python keine Möglichkeit, die Ausführung eines Threads abzubrechen. Also, was ich getan habe, war eine 'threadHandler'-Klasse zu erstellen, die nun Thread behandelt. Es verfolgt den letzten Thread, der erstellt wurde, und bietet eine Möglichkeit, das Ergebnis der Ausführung des letzten Threads zu erhalten.
Ich poste eine modifizierte Version des Testcodes aus dem ursprünglichen Beitrag sowie den threadHandler-Code in vollem Umfang, falls jemand dafür verwendet hat.
Datei 1 hier
# tester.py, run this file
from PyQt4 import QtGui, QtCore
import random, sys, time
from threadHandler import MyThreadHandler
class MyModel(object):
def dataCruncher(self, val):
delay = random.randint(1,5)
print('{} sleeping for {}'.format(val, delay))
time.sleep(delay) # takes a long time to process data using val
print('{} done sleeping'.format(val))
return val
class MainWindow(QtGui.QMainWindow):
def __init__(self, threadHandler):
self.app = QtGui.QApplication(sys.argv)
super(MainWindow, self).__init__()
self.count = 0
self.initUI()
self.button_clicked_events = Event()
self.threadHandler = threadHandler
def initUI(self):
# Layouts
central = QtGui.QWidget()
layout = QtGui.QVBoxLayout()
self.button = QtGui.QPushButton('Press Me')
self.text = QtGui.QLabel('?')
layout.addWidget(self.button)
layout.addWidget(self.text)
central.setLayout(layout)
self.setCentralWidget(central)
self.button.clicked.connect(self._buttonClicked)
def _buttonClicked(self):
self.count += 1
self.button_clicked_events(self.count)
def setLabel(self, val):
self.text.setText(str(val))
def startup(self):
self.show()
result = self.app.exec_()
return result
class Event(list):
"""Event subscription.
A list of callable objects. Calling an instance of this will cause a
call to each item in the list in ascending order by index.
Example Usage:
>>> def f(x):
... print 'f(%s)' % x
>>> def g(x):
... print 'g(%s)' % x
>>> e = Event()
>>> e()
>>> e.append(f)
>>> e(123)
f(123)
>>> e.remove(f)
>>> e()
>>> e += (f, g)
>>> e(10)
f(10)
g(10)
>>> del e[0]
>>> e(2)
g(2)
"""
def __init__(self):
self.output = {}
def __call__(self, *args, **kwargs):
for f,key in self:
output = f(*args, **kwargs)
self.output[key] = output
return self.output
def __repr__(self):
return "Event({})".format(list.__repr__(self))
if __name__ == '__main__':
def checker(handler, window):
if handler.isLastDone():
val = handler.getLastResult()
window.setLabel(val)
else:
window.setLabel('calculating...')
random.seed()
model = MyModel()
threadHandler = MyThreadHandler()
myWindow = MainWindow(threadHandler)
threadHandler.createTimer(1, checker, threadHandler, myWindow)
def getData(count):
threadHandler.createOneShot(model.dataCruncher, count)
myWindow.button_clicked_events.append((getData, 'dt'))
result = myWindow.startup()
print('ending')
threadHandler.end()
print('ended')
sys.exit(result)
Datei 2 unter
#threadHandler.py, save this file in the same folder as tester.py
import threading, time
class MyThreadHandler(object):
def __init__(self):
self.oneShots = []
self.timers = []
self.oldOneShots = []
self.latest = None
self.cleaning = False
self._startCleaner()
def _startCleaner(self):
print('-'*20+'Starting cleaner'+'-'*20)
self.cleaner = self.createTimer(1, self._cleanupThreads)
def _stopCleaner(self):
print('-'*20+'Stopping cleaner'+'-'*20)
self.cleaner.stop()
def getNumThreads(self):
return len(self.oneShots)
def getNumOldThreads(self):
return len(self.oldOneShots)
def end(self):
for i,timer in enumerate(self.timers):
timer.stop()
self.timers.pop(i)
def createTimer(self, interval, func, *args, **kwargs):
timer = myTimer(interval, func, args, kwargs)
self.timers.append(timer)
return timer
def createOneShot(self, func, *args, **kwargs):
oneshot = myOneShot(func, args, kwargs)
self.oneShots.append(oneshot)
self.latest = oneshot
def isLastDone(self):
if not self.latest is None:
return not self.latest.running()
else:
return None
def getLastResult(self):
if self.latest is None:
raise ValueError('There have not been any oneshots created.')
while self.latest.running():
pass
result = self.latest.getResult()
if len(self.oneShots) > 0:
self.oldOneShots.append(myOneShot(self._cleanAll, (self.oneShots,)))
self.oneShots = []
return result
def _cleanAll(self, toClean):
# loop through toClean and pop up anything that's done. this DOES lock
while len(toClean) > 0:
toClean = self._cleanup(toClean)
def _cleanup(self, toCleanup):
while not self.cleaning:
self.cleaning = True
for i, thread in enumerate(toCleanup):
if not thread.running():
toCleanup.pop(i)
self.cleaning = False
return toCleanup
def _cleanupThreads(self):
# check each of these lists and pop out any threads that are done. This
# does not lock. This function should really only be called by the
# cleaner, which is set up in __init__
self.oneShots = self._cleanup(self.oneShots)
self.timers = self._cleanup(self.timers)
self.oldOneShots = self._cleanup(self.oldOneShots)
class myTimer(object):
def __init__(self, delay, func, args=tuple(), kwargs={}):
self.delay = delay
self.func = func
self.loop = True
self.args = args
self.kwargs = kwargs
self.thread = threading.Thread(target=self.run, daemon=True)
self.thread.start()
self.output = None
def run(self):
while self.loop:
self.output = self.func(*self.args, **self.kwargs)
if self.delay > 0.1:
count = 0
while count <= self.delay:
count += 0.1
time.sleep(0.1)
else:
time.sleep(self.delay)
def stop(self):
self.loop = False
def running(self):
return self.loop
def getResult(self):
return self.output
class myOneShot(object):
def __init__(self, func, args=tuple(), kwargs={}):
self.func = func
self.args = args
self.kwargs = kwargs
self.thread = threading.Thread(target=self.run, daemon=True)
self.thread.start()
self.output = None
def run(self):
self.output = self.func(*self.args, **self.kwargs)
def running(self):
return self.thread.is_alive()
def getResult(self):
return self.output
if __name__ == '__main__':
import random
random.seed()
def longFunc(num):
delay = random.randint(5,8)
if num in (3, 6):
delay = 2
print('-'*30+'func {} has sleep {}'.format(num, delay))
time.sleep(delay)
print('-'*30+'func {} is done'.format(num))
return num
def checker(handler):
if handler.isLastDone():
return handler.getLastResult()
else:
return None
myHandler = MyThreadHandler()
# The 'checker' function simulates something in my program that uses the
# data generated by the 'longFunc'. It waits until there are no more threads
# in the threadHandler, as that would indicate that the user is done
# switching back-and-forth between different values
checkTimer = myHandler.createTimer(1, checker, myHandler)
# create 10 one-shot threads that take a 'long' time. The delay is to keep
# them in order, as this loop is meant to simulate a user switching between
# items using a keyboard or mouse, which I imagine they couldn't do any
# faster than every 1/10th of a second
start = time.time()
for i in range(4):
myHandler.createOneShot(longFunc, i)
time.sleep(0.1)
# wait until there are no more threads executing
last = myHandler.getLastResult()
print('result from last = {}'.format(last))
for i in range(4, 7):
myHandler.createOneShot(longFunc, i)
time.sleep(0.1)
last = myHandler.getLastResult()
print('result from last = {}'.format(last))
while myHandler.getNumOldThreads() >0 or myHandler.getNumThreads() > 0:
pass
myHandler.end()
print('done ending')
vielleicht muss ich meine Frage umformulieren. Der Benutzer möchte die letzte Antwort. Wenn also die Schaltfläche deaktiviert ist, während eine Antwort verarbeitet wird, möchte der Benutzer diese Verarbeitung unterbrechen, um einen anderen Wert zu berechnen. – wesanyer
Soweit ich das aus https: //docs.python erkennen kann .org/3/library/threading.html '... Pythons Thread-Klasse unterstützt eine Teilmenge des Verhaltens der Java-Thread-Klasse; Derzeit gibt es keine Prioritäten, keine Thread-Gruppen und Threads können nicht zerstört, gestoppt, ausgesetzt, fortgesetzt oder unterbrochen werden. 'Möglicherweise brauchen Sie eine andere Lösung als Multithreading. – MTset
Sie können auch einen Blick darauf werfen, die speziell QThread verwendet: https://nikolak.com/pyqt-threading-tutorial/ – MTset