Source code for cgl.core.slow_task

import time
import sys
from PySide6 import QtCore, QtWidgets


class BackgroundExecutor(QtCore.QObject):
    """Worker object that invokes a callable and reports the outcome by signal.

    Signals:
        complete: emitted with the value returned by the callable.
        error: emitted with the exception instance caught while running.
    """

    complete = QtCore.Signal(object)
    error = QtCore.Signal(object)

    def __init__(self, parent=None):
        super().__init__(parent)

    @QtCore.Slot(object, object, object)
    def run(self, func, args, kwargs):
        """Call ``func(*args, **kwargs)`` and emit ``complete`` or ``error``.

        Args:
            func: the callable to invoke.
            args: positional arguments, unpacked into the call.
            kwargs: keyword arguments, unpacked into the call.
        """
        # The emit stays inside the try so that any Exception raised on the
        # success path is also routed through the ``error`` signal.
        try:
            outcome = func(*args, **kwargs)
            self.complete.emit(outcome)
        except Exception as exc:
            self.error.emit(exc)
class ProgressDialog(QtWidgets.QDialog):
    """Dialog showing a message and a progress bar while a worker thread runs.

    A ``BackgroundExecutor`` is moved onto a ``QThread`` owned by the dialog.
    Work is handed to it through the ``submit`` signal, and the dialog closes
    itself as soon as the executor reports a result or an exception.

    Attributes set in ``__init__``:
        result: value delivered by the executor's ``complete`` signal
            (``None`` until then).
        error: exception delivered by the executor's ``error`` signal
            (``None`` until then).
    """

    submit = QtCore.Signal(object, object, object)

    def __init__(self, message="Running...", parent=None):
        super().__init__(parent)

        # Outcome holders, filled in by on_complete / on_error.
        # NOTE(review): the ``result`` attribute (and ``thread`` below) shadow
        # the Qt methods QDialog.result() / QObject.thread() on this instance.
        # They are kept because callers read these names.
        self.result = None
        self.error = None

        # A range of (0, 0) puts QProgressBar into its busy-indicator mode.
        self.progress = QtWidgets.QProgressBar()
        self.progress.setRange(0, 0)

        self.executor = BackgroundExecutor()
        self.thread = QtCore.QThread()

        # Message label on top, progress bar underneath.
        box = QtWidgets.QVBoxLayout()
        for widget in (QtWidgets.QLabel(message), self.progress):
            box.addWidget(widget)
        self.setLayout(box)

        # Start the background thread with the executor living on it, and
        # make sure the thread is shut down when the application quits.
        worker = self.executor
        worker_thread = self.thread
        worker.moveToThread(worker_thread)
        QtWidgets.QApplication.instance().aboutToQuit.connect(self.close_threads)
        worker_thread.start()

        # Requests flow dialog -> executor; outcomes flow executor -> dialog.
        self.submit.connect(worker.run)
        worker.complete.connect(self.on_complete)
        worker.error.connect(self.on_error)

    def close_threads(self):
        """Ask the worker thread to quit and wait for it, if it is running."""
        worker_thread = self.thread
        if not worker_thread.isRunning():
            return
        worker_thread.quit()
        worker_thread.wait()

    def run_func(self, func, args, kwargs):
        """Hand ``func`` with its ``args`` and ``kwargs`` to the executor."""
        request = (func, args, kwargs)
        self.submit.emit(*request)

    @QtCore.Slot(object)
    def on_complete(self, result):
        """Store the value reported by the executor, then close the dialog."""
        self.result = result
        self.close()

    @QtCore.Slot(object)
    def on_error(self, error):
        """Store the exception reported by the executor, then close the dialog."""
        self.error = error
        self.close()

    def closeEvent(self, event):
        """Shut the worker thread down when the dialog receives a close event.

        The event itself is not modified and the base-class handler is not
        invoked.
        """
        self.close_threads()
def run_slow_background_task(message, func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` behind a ``ProgressDialog``.

    The call is submitted to the dialog's background executor, and this
    function then blocks in ``exec()`` until the dialog closes.

    Args:
        message: text passed to ``ProgressDialog`` for display.
        func: the callable to run in the background.
        *args: positional arguments forwarded to ``func``.
        **kwargs: keyword arguments forwarded to ``func``.

    Returns:
        The dialog's ``result`` attribute once the dialog has closed.

    Raises:
        Exception: the dialog's ``error`` attribute is re-raised when it has
            been set.
    """
    dialog = ProgressDialog(message)
    dialog.setWindowTitle("Running Slow Task")
    dialog.resize(300, 50)
    dialog.run_func(func, args, kwargs)
    dialog.exec()

    # Compare against None instead of relying on truthiness: an exception
    # whose class defines a falsy __bool__ / __len__ would otherwise be
    # silently dropped and ``None`` returned in its place.
    if dialog.error is not None:
        raise dialog.error
    return dialog.result
if __name__ == "__main__":

    def sleep_task(amount=5):
        """Sleep for ``amount`` seconds and return ``amount`` unchanged."""
        time.sleep(amount)
        return amount

    # Manual smoke test: the application object is created before the dialog
    # is built inside run_slow_background_task, and is kept referenced here.
    app = QtWidgets.QApplication(sys.argv)
    outcome = run_slow_background_task("sleep task", sleep_task, 5)
    assert outcome == 5