Source code for cgl.plugins.preflight.main

import json
import logging
import os
import sys
import traceback
import time
from cgl.msd.version import VersionMsd
from cgl.msd.services.event_service import EventService


from PySide6 import QtCore, QtGui, QtWidgets

from cgl.core.config.query import AlchemyConfigManager, get_image_path
from cgl.plugins.preflight.preflight_check import PreflightCheck

from cgl.ui.startup import do_gui_init
from cgl.ui.widgets.containers.table import LJTableWidget
from cgl.ui.widgets.dialog import InputDialog, CheckFailedDialog
from cgl.ui.widgets.widgets import GifWidget

CFG = AlchemyConfigManager()


[docs] class PreflightModel(QtCore.QAbstractTableModel): def __init__(self, data_list, header_titles=None, data_filter=False): QtCore.QAbstractTableModel.__init__(self) # self.setHeaderData(Qt.Horizontal, Qt.AlignLeft, Qt.TextAlignmentRole) self.cfg = CFG self.data_ = data_list self.headers = header_titles self.data_filter = data_filter
[docs] def data(self, index, role): row = index.row() col = index.column() value = self.data_[row][col] header = self.headers[col].lower() if role == QtCore.Qt.DisplayRole: # status column if header == "status": s = str(value).strip().lower() if s in {"pass", "passed"}: return "✅ Passed" elif s in {"fail", "failed"}: return "❌ Failed" elif s == "running": return "🟡 Running" elif s == "untested": return "⚪ Untested" return s.title() # required column elif header == "required": val = str(value).strip().lower() if val in {"true", "1", "yes"}: return "🟢" elif val in {"false", "0", "no"}: return "🟡" return str(value) return str(value) if role == QtCore.Qt.BackgroundRole and header == "status": v = str(value).strip().lower() if v in {"fail", "failed"}: return QtGui.QBrush(QtGui.QColor("#290000")) elif v in {"pass", "passed"}: return QtGui.QBrush(QtGui.QColor("#002900")) return None
[docs] def headerData(self, section, orientation, role): if role == QtCore.Qt.DisplayRole and orientation == QtCore.Qt.Horizontal: return self.headers[section]
[docs] def rowCount(self, index): if self.data_: return len(self.data_) else: return None
[docs] def columnCount(self, index): return len(self.headers)
[docs] class ItemTable(LJTableWidget): item_selected = QtCore.Signal(object) nothing_selected = QtCore.Signal() def __init__(self, parent): LJTableWidget.__init__(self, parent) # self.setMinimumWidth(400) self.clicked.connect(self.row_selected)
[docs] def mouseReleaseEvent(self, e): super(ItemTable, self).mouseReleaseEvent(e) self.row_selected()
[docs] def row_selected(self): selected = [] for index in self.selectedIndexes(): mdl_index = self.model().mapToSource(index) mdl = self.model().sourceModel() row = mdl_index.row() sel = mdl.data_[row] data = { "Check": sel[0], "Status": sel[1], "Path": sel[2], "Order": sel[3], "Required": sel[4], } if data in selected: continue selected.append(data) if selected: self.item_selected.emit(selected) else: self.nothing_selected.emit()
[docs] def try_run_class(class_, msd=None, check_name=None, recipe_type=None): try: class_.run() except Exception: tb = traceback.format_exc() class_.status = False class_.feedback = f"Job Failed:\n {tb}" if msd: es = EventService(msd) es.log_event( action="bug", user=os.getenv("USERNAME", "unknown"), app="alchemy", details={"check": check_name, "traceback": tb}, ) es.log_bug( bug_id=f"{check_name}", status="failed", details={"traceback": tb}, recipe_type=recipe_type, )
[docs] class PreflightExecutor(QtCore.QObject): complete = QtCore.Signal(object, object) def __init__(self, parent=None): super().__init__(parent)
[docs] @QtCore.Slot(object, object) def run(self, class_, index): try_run_class(class_) self.complete.emit(class_, index)
[docs] class Preflight(QtWidgets.QDialog): check_submit = QtCore.Signal(object, object) def __init__( self, parent=None, software="alchemy", preflight="", model=None, path_object=None, current_selection=None, auto_show=False, pf_type="publish", force_top_level=False, use_threads=True, extra_shared_data=None, manifest_path=None, **kwargs, ): # if parent is None: # import maya.OpenMayaUI as omui # from shiboken2 import wrapInstance # # ptr = omui.MQtUtil.mainWindow() # parent = wrapInstance(int(ptr), QtWidgets.QWidget) super(Preflight, self).__init__(parent) self.setMinimumWidth(400) self.setMinimumHeight(600) self.software = software if force_top_level: self.setWindowFlags(self.windowFlags() | QtCore.Qt.WindowStaysOnTopHint) self.preflight = preflight.lower() self.pf_type = pf_type self.cfg = CFG self.software_dir = os.path.join(self.cfg.cookbook_dir, software) self.preflight_dir = os.path.join(self.software_dir, pf_type) # set shared data for the PreflightCheck self.shared_data = {} self.shared_data["path_object"] = path_object self.shared_data["manifest_path"] = manifest_path self.setWindowModality(QtCore.Qt.NonModal) # Build a version MSD wrapper (non-publish) if path_object: self.msd = VersionMsd(path=path_object) self.event_service = EventService(self.msd) existing_context = self.msd.get_shared_data() for key, info in existing_context.items(): print(key, info) if existing_context: self.shared_data.update(existing_context) if os.path.isdir(self.preflight_dir): if self.preflight not in os.listdir(self.preflight_dir): if self.pf_type == "start": return None if software == "maya": from cgl.ui.widgets.dialog import InputDialog logging.info( f"{self.preflight_dir}, looking for : {self.preflight}" ) message = ( "no {} preflight found, create one in the Production Cookbook:\n" "Software: {}\n" "Menu Type: {}\n" "Create new Preflight: {}".format( self.preflight, self.software, self.pf_type, self.preflight ) ) dialog = InputDialog( title="Preflight Not Found", message=message, force_top_level=True, ) dialog.exec() return None print(f"{self.preflight} not found") self.preflight = "default" # self.setAttribute(QtCore.Qt.WA_DeleteOnClose, True) self.json_file = os.path.join( self.software_dir, "{}.cgl".format(self.pf_type) ).replace("\\", "/") self.modules = {} self.ignore = ["__init__.py"] self.selected_checks = [] self.function_d = {} self.table_data = [] self.setWindowTitle("%s %s" % (self.preflight.title(), pf_type)) if path_object.tree == "source": self.shared_data["source_object"] = path_object self.shared_data["render_object"] = path_object.copy( tree="render", filename=None, ext=None ) else: self.shared_data["source_object"] = path_object.copy( tree="source", filename=None, ext=None ) self.shared_data["render_object"] = path_object self.shared_data["current_selection"] = current_selection self.shared_data["preflight_dialog"] = self if extra_shared_data: self.shared_data.update(extra_shared_data) try: for key, value in kwargs.iteritems(): self.shared_data[key] = value except AttributeError: for key, value in kwargs.items(): self.shared_data[key] = value if model: self.shared_data["parent"] = parent self.shared_data["mdl"] = model # create the lay v_layout = QtWidgets.QVBoxLayout() combo_layout = QtWidgets.QHBoxLayout() self.executor = PreflightExecutor() self.close_when_done = False self.checks = [] self.check_index = None self.bg_thread = QtCore.QThread(self) app = QtWidgets.QApplication.instance() app.aboutToQuit.connect(self.close_threads) # disable this to turn off threading if use_threads: self.executor.moveToThread(self.bg_thread) self.bg_thread.start() # ------ self.executor.complete.connect(self.check_complete) self.check_submit.connect(self.executor.run) # create the widgets self.publish = ItemTable(self) # select all row when clicked self.publish.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectRows) # keep this window on top self.setWindowFlags(self.windowFlags()) self.publish.setMinimumWidth(300) self.publish.setMinimumHeight(250) self.software_label = QtWidgets.QLabel("Preflight Checks") self.software_label.setProperty("class", "ultra_title") self.image_plane = GifWidget(gif_path=get_image_path("mana.gif")) # keep aspect ratio # self.image_plane.setScaledContents(True) self.image_plane.hide() self.progress_bar = QtWidgets.QProgressBar() self.progress_bar.hide() self.run_all = QtWidgets.QPushButton("Run All") self.run_all.setProperty("class", "add_button") self.run_selected = QtWidgets.QPushButton("Run Selected") self.run_selected.setProperty("class", "basic") # construct the GUI combo_layout.addWidget(self.software_label) combo_layout.addStretch(1) button_bar = QtWidgets.QHBoxLayout() button_bar.addItem( QtWidgets.QSpacerItem( 0, 0, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum ) ) button_bar.addWidget(self.run_selected) button_bar.addWidget(self.run_all) v_layout.addLayout(combo_layout) v_layout.addWidget(self.publish) v_layout.addWidget(self.image_plane) v_layout.addWidget(self.progress_bar) v_layout.addLayout(button_bar) self.setLayout(v_layout) # load the GUI self._load_json() self.populate_table() QtCore.QTimer.singleShot(0, self._configure_table_columns) self.restore_incomplete_preflight() self.publish.item_selected.connect(self.update_selection) self.run_selected.clicked.connect(self.run_selected_clicked) self.run_all.clicked.connect(self.run_all_clicked) if auto_show: self.show() def _load_json(self): with open(self.json_file, "r") as stream: d = json.load(stream) for button in d[self.software]: if button["name"] == self.preflight: self.modules = button["buttons"] return for button in d[self.software]: if button["name"] == "default": self.modules = button["buttons"] logging.info( "No preflight for %s found in %s, using default" % (self.preflight, self.json_file) ) self.preflight = "default" return
[docs] def populate_table(self): sys.path.insert(0, self.cfg.cookbook_root) data = [] for item in self.modules: if item != "order": if "required" not in item: item["required"] = True order = item["order"] order = f"{order:03}" data.append( [ item["label"], "Untested", item["module"], order, item["required"], ] ) self.table_data = data self.publish.set_item_model( PreflightModel(data, ["Check", "Status", "Path", "Order", "Required"]) )
# self._configure_table_columns() # self.publish.resizeColumnsToContents()
[docs] def update_selection(self, data): self.selected_checks = data
[docs] def begin_checks(self, checks): self.function_d = {} # load / reload modules first_item = True copy_shared = False for item in self.modules: if item == "order": continue module = item["module"] module = self.clean_module(module) module_name = module.split(".")[-1] # only copy shared data if we are running the first item if first_item and item["label"] == checks[0]["Check"]: copy_shared = True first_item = False try: # Python 3+ if sys.version_info[0] >= 3: import importlib # make sure the cookbook path is in the path if self.cfg.cookbook_root not in sys.path: print( f"inserting {self.cfg.cookbook_root} to path", ) sys.path.insert(0, self.cfg.cookbook_root) print("\tModule", module, "\n\tModule Name", module_name) loaded_module = importlib.import_module(module, module_name) print(1) loaded_module = importlib.reload(loaded_module) print(2) else: print(3, module, module_name) loaded_module = __import__( module, globals(), locals(), module_name, -1 ) except Exception as e: print(e) dialog = InputDialog( title="Error Reading cookbook", message=e, force_top_level=True, ) dialog.exec() self.close_when_done = False self.end_checks() return print(666, loaded_module, module_name) class_ = getattr(loaded_module, module_name) c = class_(parent=self) self.function_d.update({item["label"]: c}) if copy_shared: print("resetting shared data", self.shared_data.keys()) PreflightCheck.shared_data = self.shared_data.copy() self.event_service.update_shared_data(self.shared_data) print(55) self.checks = checks self.check_index = None self.image_plane.start() self.progress_bar.show() print(66) self.run_all.setEnabled(False) self.run_selected.setEnabled(False)
[docs] def clean_module(self, module): """ cleans up the module string for loading if it has "import" and other information Args: module: Returns: """ if "import" in module: module = module.split("import")[1] if " as " in module: module = module.split(" as ")[0] module = module.strip() return module
[docs] def end_checks(self): self.function_d = {} self.checks = [] self.check_index = None self.image_plane.stop() self.progress_bar.hide() self.run_all.setEnabled(True) self.run_selected.setEnabled(True)
[docs] def get_checks_from_selection(self): """ Always compute selected rows directly from the current model. Avoids stale selections after model refresh. """ model = self.publish.model().sourceModel() sel = self.publish.selectionModel().selectedRows() checks = [] for idx in sel: row = idx.row() checks.append( { "Check": model.data_[row][0], "Status": model.data_[row][1], "Path": model.data_[row][2], "Order": model.data_[row][3], "Required": model.data_[row][4], } ) return checks
[docs] def run_selected_clicked(self, checks=None): if not checks: checks = self.get_checks_from_selection() if not checks: dialog = InputDialog( title="Check Failed", message="No checks selected", force_top_level=True ) dialog.exec() return self.event_service.log_event( action=self.pf_type, user=CFG.alc_user, app=self.software, details={"cookbook_recipe": self.preflight, "steps": {}}, ) max_order = max(int(c["Order"]) for c in checks) model = self.publish.model().sourceModel() expanded = [] for row in model.data_: order = int(row[3]) if order <= max_order: expanded.append( { "Check": row[0], "Status": row[1], "Path": row[2], "Order": row[3], "Required": row[4], } ) print("[run_selected_clicked] begin checks", checks) checks = [c for c in expanded if c["Status"] != "Passed"] self.begin_checks(checks) print(2) self.check_index = 0 self.check_request(0) self.update_progress(0, len(self.checks))
[docs] def check_request(self, index): check = self.checks[index] if self.previous_checks_passed(check): class_ = self.function_d[check["Check"]] if class_.thread_safe and self.bg_thread.isRunning(): self.check_submit.emit(class_, index) else: # delay the running the check a bit to let event loop run QtCore.QTimer.singleShot( 100, lambda: ( try_run_class( class_, msd=self.msd, check_name=check["Check"], recipe_type=self.pf_type, ), self.check_complete(class_, index), ), ) check["Status"] = "Running" self.update_status(check=check["Check"], status=check["Status"]) else: self.end_checks() dialog = InputDialog( title="Check Failed", message="Can't run a check when previous required checks have not passed", force_top_level=True, ) dialog.exec()
[docs] @QtCore.Slot(object, object) def check_complete(self, class_, index): logging.info(f"completed check {index} {self.close_when_done}") check = self.checks[index] if self.function_d[check["Check"]].status: check["Status"] = "Passed" else: check["Status"] = "Failed" self.update_status(check=check["Check"], status=check["Status"]) next_index = index + 1 # finish if complete or a check fails if next_index >= len(self.checks) or not class_.status: self.end_checks() if not class_.status: dialog = CheckFailedDialog( title="Check Failed", message=class_.feedback, force_top_level=True ) dialog.exec() elif self.close_when_done: self.close_when_done = False try: self.accept() # if this is a QDialog except AttributeError: logging.info("Attempting to Close this QWidget") self.close() else: self.check_request(next_index) self.update_progress(next_index, len(self.checks))
[docs] def update_event_step(self, status, check): end_time = time.time() duration = None if "start_time" in check: duration = end_time - check["start_time"] step_data = self.event_service.update_step( step_key=check, status=status, feedback=getattr(self, "feedback", ""), duration=duration, ) # Keep memory in sync for UI self.msd.payload["events"][-1]["details"]["steps"][check] = step_data
def _configure_table_columns(self): header = self.publish.horizontalHeader() header.setStretchLastSection(False) header.setSectionResizeMode(0, QtWidgets.QHeaderView.Stretch) header.setSectionResizeMode(1, QtWidgets.QHeaderView.ResizeToContents) header.setSectionResizeMode(4, QtWidgets.QHeaderView.ResizeToContents) self.publish.sortByColumn(3, QtCore.Qt.SortOrder(0)) self.publish.hideColumn(2) self.publish.hideColumn(3) self.publish.setSortingEnabled(False)
[docs] def restore_incomplete_preflight(self): """ Restores the last incomplete preflight (where any step != 'Passed'). Pre-fills the table and selects failed/incomplete checks for rerun. """ if not hasattr(self, "event_service"): from cgl.msd.services.event_service import EventService self.event_service = EventService(self.msd) latest_event = self.event_service.get_latest_event_by_action( action=self.pf_type.lower() ) if not latest_event: return steps = latest_event.get("details", {}).get("steps", {}) if not steps: print("No Steps Found") return # Determine if preflight was incomplete failed_or_incomplete = { name: data for name, data in steps.items() if data.get("status") != "Passed" } if not failed_or_incomplete: print("✅ Last preflight was fully successful — no restore needed.") return print( f"⚠️ Restoring incomplete preflight ({len(failed_or_incomplete)} incomplete steps)" ) # Pre-fill the table with the last known statuses for row in self.table_data: check_label = row[0] if check_label in steps: status = steps[check_label].get("status", "Untested") if status != "Passed": status = "Untested" row[1] = status # Refresh the model widths = [ self.publish.columnWidth(i) for i in range(self.publish.model().columnCount()) ] self.publish.set_item_model( PreflightModel( self.table_data, ["Check", "Status", "Path", "Order", "Required"], ) ) self._configure_table_columns() for i, w in enumerate(widths): self.publish.setColumnWidth(i, w) # --- Optionally select the failed/incomplete checks --- failed_labels = set(failed_or_incomplete.keys()) # model = self.publish.model().sourceModel() for row_index, row in enumerate(self.table_data): if row[0] in failed_labels: # Select this row qt_index = self.publish.model().index(row_index, 0) self.publish.selectionModel().select( qt_index, QtCore.QItemSelectionModel.Select | QtCore.QItemSelectionModel.Rows, ) print("Auto-selected failed/incomplete checks for re-run.")
[docs] def update_progress(self, value, max_value): self.progress_bar.setRange(0, max_value) self.progress_bar.setValue(value)
[docs] def close_threads(self): try: if self.bg_thread.isRunning(): print("closing threads") self.bg_thread.quit() self.bg_thread.wait() print("threads closed") except AttributeError: logging.info("{} has No threads to close".format(self))
[docs] def closeEvent(self, event): self.close_threads()
[docs] def previous_checks_passed(self, check): mdl = self.publish.model() if int(check["Order"]) == 1: return True for irow in range(int(check["Order"]) - 1): name = mdl.index(irow, 0) passed = mdl.index(irow, 1) required = mdl.index(irow, 4) if str(mdl.data(passed)) != str("Passed"): logging.info("%s didnt pass" % str(mdl.data(name))) if str(mdl.data(required)) == str(True): logging.info( "Required Check Doesn't Pass: ", str(mdl.data(name)), str(mdl.data(passed)), str(mdl.data(required)), ) return False else: logging.info("Check Failed, Not Required, Next Check Enabled") return True return True
# get a list of all the checks before me # if any of them have "Required" as True and "Failed" send a popup
[docs] def update_status(self, check, status): row = -1 for each in self.table_data: row += 1 if each[0] == check: self.table_data[row][1] = status # refresh the table with self.table_data widths = [ self.publish.columnWidth(i) for i in range(self.publish.model().columnCount()) ] self.publish.set_item_model( PreflightModel( self.table_data, ["Check", "Status", "Path", "Order", "Required"], ) ) self._configure_table_columns() for i, w in enumerate(widths): self.publish.setColumnWidth(i, w) self.update_event_step(status, check)
[docs] def run_all_clicked(self): # To Do: load waiting gif while function operates self.event_service.log_event( action=self.pf_type, user=CFG.alc_user, app=self.software, details={"cookbook_recipe": self.preflight, "steps": {}}, ) self.image_plane.start() model = self.publish.model() all_rows = [] for irow in range(model.rowCount()): data = { "Check": str(model.data(model.index(irow, 0))), "Status": str(model.data(model.index(irow, 1))), "Path": str(model.data(model.index(irow, 2))), "Order": str(model.data(model.index(irow, 3))), "Required": str(model.data(model.index(irow, 4))), } all_rows.append(data) self.close_when_done = True self.run_selected_clicked(checks=all_rows)
[docs] def main(path_object): app = do_gui_init() task = "mdl" mw = Preflight(software="maya", preflight=task, path_object=path_object) mw.setWindowTitle("%s Publish" % task) mw.show() mw.raise_() app.exec()
if __name__ == "__main__": from cgl.core.path.object import PathObject # execute only if run as a script path_ = r"E:\Alchemy\CGL\JLT\VERSIONS\source\assets\char\jack\mdl\default\tom" path_object = PathObject().from_path_string(path_) main(path_object)