import json
import logging
import os
import sys
import traceback
import time
from cgl.msd.version import VersionMsd
from cgl.msd.services.event_service import EventService
from PySide6 import QtCore, QtGui, QtWidgets
from cgl.core.config.query import AlchemyConfigManager, get_image_path
from cgl.plugins.preflight.preflight_check import PreflightCheck
from cgl.ui.startup import do_gui_init
from cgl.ui.widgets.containers.table import LJTableWidget
from cgl.ui.widgets.dialog import InputDialog, CheckFailedDialog
from cgl.ui.widgets.widgets import GifWidget
# Configuration manager created once at import time; PreflightModel and
# Preflight both keep a reference to this same instance as ``self.cfg``.
CFG = AlchemyConfigManager()
[docs]
class PreflightModel(QtCore.QAbstractTableModel):
    """Table model that presents preflight check rows to a Qt view.

    ``data_list`` is a list of rows; the cells of each row line up with
    ``header_titles``.  The "status" and "required" columns are decorated
    for display, every other column is shown as ``str(value)``.

    Args:
        data_list: list of row sequences backing the table.
        header_titles: list of column titles, one per cell of a row.
        data_filter: stored on the instance as ``data_filter``; not used by
            this class itself.
    """

    # Display text for the known status values (compared lower-cased).
    _STATUS_LABELS = {
        "pass": "✅ Passed",
        "passed": "✅ Passed",
        "fail": "❌ Failed",
        "failed": "❌ Failed",
        "running": "🟡 Running",
        "untested": "⚪ Untested",
    }
    # Row background colours for the status column.
    _STATUS_COLORS = {
        "fail": "#290000",
        "failed": "#290000",
        "pass": "#002900",
        "passed": "#002900",
    }
    _TRUE_VALUES = frozenset({"true", "1", "yes"})
    _FALSE_VALUES = frozenset({"false", "0", "no"})

    def __init__(self, data_list, header_titles=None, data_filter=False):
        QtCore.QAbstractTableModel.__init__(self)
        self.cfg = CFG
        self.data_ = data_list
        self.headers = header_titles
        self.data_filter = data_filter

    def data(self, index, role=QtCore.Qt.DisplayRole):
        """Return the display text or background brush for ``index``.

        ``role`` defaults to ``DisplayRole`` to match the signature of
        ``QAbstractItemModel.data``.  Returns ``None`` for invalid indexes
        and for roles this model does not provide.
        """
        if not index.isValid():
            return None
        value = self.data_[index.row()][index.column()]
        header = self.headers[index.column()].lower()
        normalized = str(value).strip().lower()

        if role == QtCore.Qt.DisplayRole:
            if header == "status":
                # Unknown statuses fall back to a title-cased version.
                return self._STATUS_LABELS.get(normalized, normalized.title())
            if header == "required":
                if normalized in self._TRUE_VALUES:
                    return "🟢"
                if normalized in self._FALSE_VALUES:
                    return "🟡"
            return str(value)

        if role == QtCore.Qt.BackgroundRole and header == "status":
            color = self._STATUS_COLORS.get(normalized)
            if color is not None:
                return QtGui.QBrush(QtGui.QColor(color))
        return None

    def rowCount(self, index=None):
        """Return the number of rows; 0 (never ``None``) when there is no data."""
        return len(self.data_) if self.data_ else 0

    def columnCount(self, index=None):
        """Return the number of header titles; 0 when no headers were given."""
        return len(self.headers or [])
[docs]
class ItemTable(LJTableWidget):
    """Table view that reports the currently selected preflight rows.

    Emits ``item_selected`` with a list of row dictionaries when at least
    one row is selected, and ``nothing_selected`` otherwise.
    """

    item_selected = QtCore.Signal(object)
    nothing_selected = QtCore.Signal()

    def __init__(self, parent):
        LJTableWidget.__init__(self, parent)
        self.clicked.connect(self.row_selected)

    def mouseReleaseEvent(self, e):
        """Run the inherited release handling, then re-evaluate the selection."""
        super(ItemTable, self).mouseReleaseEvent(e)
        self.row_selected()

    def row_selected(self):
        """Collect the distinct selected rows and emit the matching signal."""
        keys = ("Check", "Status", "Path", "Order", "Required")
        unique_rows = []
        for proxy_index in self.selectedIndexes():
            # The view shows a proxy; translate to a row of the source data.
            source_row = self.model().mapToSource(proxy_index).row()
            cells = self.model().sourceModel().data_[source_row]
            record = {key: cells[position] for position, key in enumerate(keys)}
            # selectedIndexes() yields one index per cell, so the same row
            # shows up several times; keep the first occurrence only.
            if record not in unique_rows:
                unique_rows.append(record)
        if not unique_rows:
            self.nothing_selected.emit()
            return
        self.item_selected.emit(unique_rows)
[docs]
def try_run_class(class_, msd=None, check_name=None, recipe_type=None):
    """Run a preflight check, turning any exception into a failed result.

    When ``class_.run()`` raises, ``class_.status`` is set to ``False`` and
    ``class_.feedback`` to the formatted traceback.  If ``msd`` is given the
    failure is additionally recorded through ``EventService``.

    Args:
        class_: check instance exposing ``run()``; ``status`` and
            ``feedback`` are written on failure.
        msd: optional object handed to ``EventService``; no event is
            recorded when it is falsy.
        check_name: label stored with the recorded event/bug.
        recipe_type: passed through to ``EventService.log_bug``.
    """
    try:
        class_.run()
    except Exception:
        tb = traceback.format_exc()
        class_.status = False
        class_.feedback = f"Job Failed:\n {tb}"
        if not msd:
            return
        # USERNAME is only set on Windows; fall back to USER elsewhere.
        user = os.getenv("USERNAME") or os.getenv("USER") or "unknown"
        try:
            es = EventService(msd)
            es.log_event(
                action="bug",
                user=user,
                app="alchemy",
                details={"check": check_name, "traceback": tb},
            )
            es.log_bug(
                bug_id=f"{check_name}",
                status="failed",
                details={"traceback": tb},
                recipe_type=recipe_type,
            )
        except Exception:
            # Recording the failure is best effort: the check result is
            # already stored on class_, and letting this escape would stop
            # the caller from reporting the check as complete.
            logging.exception(
                "Could not record the failure of check %r", check_name
            )
[docs]
class PreflightExecutor(QtCore.QObject):
    """Worker object that runs one check and announces when it is done.

    ``complete`` is emitted with ``(class_, index)`` after ``try_run_class``
    has returned for that check.
    """

    complete = QtCore.Signal(object, object)

    def __init__(self, parent=None):
        QtCore.QObject.__init__(self, parent)

    @QtCore.Slot(object, object)
    def run(self, class_, index):
        """Execute ``class_`` via ``try_run_class`` and emit ``complete``."""
        try_run_class(class_)
        self.complete.emit(class_, index)
[docs]
# Dialog that lists the checks of a cookbook preflight recipe in a table and
# runs them one after another (see check_request / check_complete).
class Preflight(QtWidgets.QDialog):
# Emitted with (check instance, index in self.checks); __init__ connects it
# to PreflightExecutor.run.
check_submit = QtCore.Signal(object, object)
def __init__(
self,
parent=None,
software="alchemy",
preflight="",
model=None,
path_object=None,
current_selection=None,
auto_show=False,
pf_type="publish",
force_top_level=False,
use_threads=True,
extra_shared_data=None,
manifest_path=None,
**kwargs,
):
# Build the dialog: resolve the recipe location, assemble the shared data
# handed to the checks, create the worker thread and widgets, then load
# the recipe and fill the table.
#
# Args:
#   parent: Qt parent; also stored as shared_data["parent"] when ``model``
#       is given.
#   software: sub-directory of cfg.cookbook_dir and top-level key of the
#       recipe json.
#   preflight: recipe name (lower-cased here).
#   model: stored as shared_data["mdl"] when truthy.
#   path_object: wrapped in VersionMsd; its ``tree`` attribute and ``copy``
#       method are used to derive the source/render objects.
#   current_selection: stored as shared_data["current_selection"].
#   auto_show: call show() at the end of construction.
#   pf_type: sub-directory of the software dir and stem of "<pf_type>.cgl".
#   force_top_level: add WindowStaysOnTopHint to the window flags.
#   use_threads: move the executor to the background thread and start it.
#   extra_shared_data: dict merged into shared_data.
#   manifest_path: stored as shared_data["manifest_path"].
#   **kwargs: every item is copied into shared_data.
#
# NOTE(review): the original indentation of this method is not visible
# here; comments below describe statements, not their nesting.
# if parent is None:
# import maya.OpenMayaUI as omui
# from shiboken2 import wrapInstance
#
# ptr = omui.MQtUtil.mainWindow()
# parent = wrapInstance(int(ptr), QtWidgets.QWidget)
super(Preflight, self).__init__(parent)
self.setMinimumWidth(400)
self.setMinimumHeight(600)
self.software = software
if force_top_level:
self.setWindowFlags(self.windowFlags() | QtCore.Qt.WindowStaysOnTopHint)
self.preflight = preflight.lower()
self.pf_type = pf_type
self.cfg = CFG
# <cookbook_dir>/<software>/<pf_type> is where recipes are looked up.
self.software_dir = os.path.join(self.cfg.cookbook_dir, software)
self.preflight_dir = os.path.join(self.software_dir, pf_type)
# set shared data for the PreflightCheck
self.shared_data = {}
self.shared_data["path_object"] = path_object
self.shared_data["manifest_path"] = manifest_path
self.setWindowModality(QtCore.Qt.NonModal)
# Build a version MSD wrapper (non-publish)
# NOTE(review): self.msd / self.event_service are only created when
# path_object is truthy, yet path_object.tree is read unconditionally
# further down - confirm path_object is effectively required.
if path_object:
self.msd = VersionMsd(path=path_object)
self.event_service = EventService(self.msd)
existing_context = self.msd.get_shared_data()
# Debug output of the shared data previously stored on the MSD.
for key, info in existing_context.items():
print(key, info)
if existing_context:
self.shared_data.update(existing_context)
# Recipe lookup: when the requested recipe is not listed in preflight_dir
# the constructor may return early (leaving the dialog without widgets)
# or fall back to the "default" recipe.
if os.path.isdir(self.preflight_dir):
if self.preflight not in os.listdir(self.preflight_dir):
if self.pf_type == "start":
return None
if software == "maya":
from cgl.ui.widgets.dialog import InputDialog
logging.info(
f"{self.preflight_dir}, looking for : {self.preflight}"
)
message = (
"no {} preflight found, create one in the Production Cookbook:\n"
"Software: {}\n"
"Menu Type: {}\n"
"Create new Preflight: {}".format(
self.preflight, self.software, self.pf_type, self.preflight
)
)
dialog = InputDialog(
title="Preflight Not Found",
message=message,
force_top_level=True,
)
dialog.exec()
return None
print(f"{self.preflight} not found")
self.preflight = "default"
# self.setAttribute(QtCore.Qt.WA_DeleteOnClose, True)
# Recipe definitions live in <software_dir>/<pf_type>.cgl (read as json by
# _load_json); backslashes are normalised to forward slashes.
self.json_file = os.path.join(
self.software_dir, "{}.cgl".format(self.pf_type)
).replace("\\", "/")
self.modules = {}
self.ignore = ["__init__.py"]
self.selected_checks = []
self.function_d = {}
self.table_data = []
self.setWindowTitle("%s %s" % (self.preflight.title(), pf_type))
# Provide both a "source" and a "render" flavour of the path object,
# deriving the missing one with path_object.copy(tree=...).
if path_object.tree == "source":
self.shared_data["source_object"] = path_object
self.shared_data["render_object"] = path_object.copy(
tree="render", filename=None, ext=None
)
else:
self.shared_data["source_object"] = path_object.copy(
tree="source", filename=None, ext=None
)
self.shared_data["render_object"] = path_object
self.shared_data["current_selection"] = current_selection
self.shared_data["preflight_dialog"] = self
if extra_shared_data:
self.shared_data.update(extra_shared_data)
# dict.iteritems() only exists on Python 2; on Python 3 the AttributeError
# branch copies the keyword arguments instead.
try:
for key, value in kwargs.iteritems():
self.shared_data[key] = value
except AttributeError:
for key, value in kwargs.items():
self.shared_data[key] = value
if model:
self.shared_data["parent"] = parent
self.shared_data["mdl"] = model
# create the layouts
v_layout = QtWidgets.QVBoxLayout()
combo_layout = QtWidgets.QHBoxLayout()
# Executor object plus the bookkeeping used while a run is in progress.
self.executor = PreflightExecutor()
self.close_when_done = False
self.checks = []
self.check_index = None
self.bg_thread = QtCore.QThread(self)
# Stop the background thread when the application is about to quit.
app = QtWidgets.QApplication.instance()
app.aboutToQuit.connect(self.close_threads)
# disable this to turn off threading
if use_threads:
self.executor.moveToThread(self.bg_thread)
self.bg_thread.start()
# ------
# check_submit -> executor.run -> executor.complete -> check_complete
self.executor.complete.connect(self.check_complete)
self.check_submit.connect(self.executor.run)
# create the widgets
self.publish = ItemTable(self)
# select all row when clicked
self.publish.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectRows)
# NOTE(review): this re-applies the current window flags unchanged; the
# stay-on-top hint is only added above when force_top_level is set.
self.setWindowFlags(self.windowFlags())
self.publish.setMinimumWidth(300)
self.publish.setMinimumHeight(250)
self.software_label = QtWidgets.QLabel("Preflight Checks")
self.software_label.setProperty("class", "ultra_title")
# Animated gif and progress bar start hidden.
self.image_plane = GifWidget(gif_path=get_image_path("mana.gif"))
# keep aspect ratio
# self.image_plane.setScaledContents(True)
self.image_plane.hide()
self.progress_bar = QtWidgets.QProgressBar()
self.progress_bar.hide()
self.run_all = QtWidgets.QPushButton("Run All")
self.run_all.setProperty("class", "add_button")
self.run_selected = QtWidgets.QPushButton("Run Selected")
self.run_selected.setProperty("class", "basic")
# construct the GUI
combo_layout.addWidget(self.software_label)
combo_layout.addStretch(1)
# The expanding spacer pushes both buttons to the right edge.
button_bar = QtWidgets.QHBoxLayout()
button_bar.addItem(
QtWidgets.QSpacerItem(
0, 0, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum
)
)
button_bar.addWidget(self.run_selected)
button_bar.addWidget(self.run_all)
v_layout.addLayout(combo_layout)
v_layout.addWidget(self.publish)
v_layout.addWidget(self.image_plane)
v_layout.addWidget(self.progress_bar)
v_layout.addLayout(button_bar)
self.setLayout(v_layout)
# load the GUI
self._load_json()
self.populate_table()
# Column setup is deferred to the next event-loop iteration.
QtCore.QTimer.singleShot(0, self._configure_table_columns)
self.restore_incomplete_preflight()
self.publish.item_selected.connect(self.update_selection)
self.run_selected.clicked.connect(self.run_selected_clicked)
self.run_all.clicked.connect(self.run_all_clicked)
if auto_show:
self.show()
def _load_json(self):
    """Load the recipe's button list from ``self.json_file`` into ``self.modules``.

    The entry of ``self.software`` whose ``name`` equals ``self.preflight``
    wins.  Otherwise the entry named "default" is used and
    ``self.preflight`` is switched to "default".  When neither exists
    ``self.modules`` is left untouched.
    """
    with open(self.json_file, "r") as stream:
        recipes = json.load(stream)[self.software]

    exact = next(
        (entry for entry in recipes if entry["name"] == self.preflight), None
    )
    if exact is not None:
        self.modules = exact["buttons"]
        return

    fallback = next(
        (entry for entry in recipes if entry["name"] == "default"), None
    )
    if fallback is None:
        return
    self.modules = fallback["buttons"]
    logging.info(
        "No preflight for %s found in %s, using default"
        % (self.preflight, self.json_file)
    )
    self.preflight = "default"
[docs]
def populate_table(self):
    """Build ``self.table_data`` from ``self.modules`` and show it in the table.

    Every module entry becomes one row
    ``[label, "Untested", module, zero-padded order, required]``; entries
    without a "required" key default to ``True``.  The literal item
    "order" is skipped.
    """
    # Same guard as begin_checks: repeated calls must not keep stacking
    # the cookbook root onto sys.path.
    if self.cfg.cookbook_root not in sys.path:
        sys.path.insert(0, self.cfg.cookbook_root)

    data = []
    for item in self.modules:
        if item == "order":
            continue
        item.setdefault("required", True)
        # int() lets an order stored as a numeric string be zero-padded
        # too; the "03" format spec raises ValueError for str values.
        order = f"{int(item['order']):03}"
        data.append(
            [
                item["label"],
                "Untested",
                item["module"],
                order,
                item["required"],
            ]
        )
    self.table_data = data
    self.publish.set_item_model(
        PreflightModel(data, ["Check", "Status", "Path", "Order", "Required"])
    )
# self._configure_table_columns()
# self.publish.resizeColumnsToContents()
[docs]
def update_selection(self, data):
    """Remember ``data`` as the current selection (``self.selected_checks``)."""
    self.selected_checks = data
[docs]
def begin_checks(self, checks):
# Import (and reload) the module of every entry in self.modules,
# instantiate its check class into self.function_d keyed by label, then
# switch the dialog into its "running" state for ``checks``.
#
# ``checks`` is a list of row dicts; checks[0]["Check"] is read, so the
# list must not be empty.
# On an import error a dialog is shown, end_checks() is called and the
# method returns without setting self.checks.
#
# NOTE(review): the original indentation is not visible here; comments
# describe statements, not their nesting.
self.function_d = {}
# load / reload modules
first_item = True
copy_shared = False
for item in self.modules:
if item == "order":
continue
module = item["module"]
module = self.clean_module(module)
# The check class is looked up under the last dotted component of the
# module path (see getattr below).
module_name = module.split(".")[-1]
# only copy shared data if we are running the first item
if first_item and item["label"] == checks[0]["Check"]:
copy_shared = True
first_item = False
try:
# Python 3+
if sys.version_info[0] >= 3:
import importlib
# make sure the cookbook path is in the path
if self.cfg.cookbook_root not in sys.path:
print(
f"inserting {self.cfg.cookbook_root} to path",
)
sys.path.insert(0, self.cfg.cookbook_root)
print("\tModule", module, "\n\tModule Name", module_name)
loaded_module = importlib.import_module(module, module_name)
print(1)
# Reload so edits made to the check module since the last import are used.
loaded_module = importlib.reload(loaded_module)
print(2)
else:
# Python 2 fallback.
print(3, module, module_name)
loaded_module = __import__(
module, globals(), locals(), module_name, -1
)
except Exception as e:
print(e)
dialog = InputDialog(
title="Error Reading cookbook",
message=e,
force_top_level=True,
)
dialog.exec()
self.close_when_done = False
self.end_checks()
return
print(666, loaded_module, module_name)
class_ = getattr(loaded_module, module_name)
c = class_(parent=self)
self.function_d.update({item["label"]: c})
# Reset the class-level shared data of PreflightCheck from this dialog.
if copy_shared:
print("resetting shared data", self.shared_data.keys())
PreflightCheck.shared_data = self.shared_data.copy()
self.event_service.update_shared_data(self.shared_data)
print(55)
# Enter the running state: remember the checks, show progress widgets and
# disable both run buttons until end_checks() re-enables them.
self.checks = checks
self.check_index = None
self.image_plane.start()
self.progress_bar.show()
print(66)
self.run_all.setEnabled(False)
self.run_selected.setEnabled(False)
[docs]
def clean_module(self, module):
    """Reduce an import statement to the bare dotted module path.

    Text up to and including the ``import`` keyword and any trailing
    ``as <alias>`` are removed, and surrounding whitespace is stripped.
    A plain dotted path is returned unchanged apart from stripping.

    Args:
        module: e.g. ``"import pkg.checks.my_check as my_check"`` or
            ``"pkg.checks.my_check"``.

    Returns:
        str: the cleaned module string.
    """
    import re

    # Match ``import`` only as a whole word, so dotted paths that merely
    # contain it (``pkg.import_textures``, ``pkg.importer.check``) are not
    # cut apart.
    parts = re.split(r"\bimport\b", module, maxsplit=1)
    if len(parts) > 1:
        module = parts[1]
    # Drop a trailing alias.
    if " as " in module:
        module = module.split(" as ")[0]
    return module.strip()
[docs]
def end_checks(self):
    """Clear the run bookkeeping and return the widgets to their idle state."""
    # Forget the instantiated checks and the queue of the finished run.
    self.function_d, self.checks, self.check_index = {}, [], None
    self.image_plane.stop()
    self.progress_bar.hide()
    for button in (self.run_all, self.run_selected):
        button.setEnabled(True)
[docs]
def get_checks_from_selection(self):
    """Return the selected rows as a list of check dictionaries.

    The selection is always read from the current model, which avoids
    stale selections after the model has been replaced.

    The view's model is a proxy, so each selected index is translated with
    ``mapToSource`` before it is used to index the source rows (the same
    way ``ItemTable.row_selected`` does); proxy and source row numbers
    differ whenever the view is sorted.

    Returns:
        list[dict]: one dict per selected row with the keys "Check",
        "Status", "Path", "Order" and "Required".
    """
    keys = ("Check", "Status", "Path", "Order", "Required")
    proxy = self.publish.model()
    source_rows = proxy.sourceModel().data_
    checks = []
    for proxy_index in self.publish.selectionModel().selectedRows():
        cells = source_rows[proxy.mapToSource(proxy_index).row()]
        checks.append({key: cells[column] for column, key in enumerate(keys)})
    return checks
[docs]
def run_selected_clicked(self, checks=None):
    """Run the given (or currently selected) checks and all checks before them.

    The run is expanded to every row whose order is at or below the
    highest requested order, minus the rows whose status is already
    "Passed".

    Args:
        checks: optional list of check dicts (the "Order" key is read).
            Any falsy value - including the ``bool`` a button's ``clicked``
            signal passes - means "use the current table selection".
    """
    if not checks:
        checks = self.get_checks_from_selection()
    if not checks:
        dialog = InputDialog(
            title="Check Failed", message="No checks selected", force_top_level=True
        )
        dialog.exec()
        return

    max_order = max(int(c["Order"]) for c in checks)
    keys = ("Check", "Status", "Path", "Order", "Required")
    pending = [
        {key: row[column] for column, key in enumerate(keys)}
        for row in self.publish.model().sourceModel().data_
        if int(row[3]) <= max_order and row[1] != "Passed"
    ]
    logging.debug("[run_selected_clicked] begin checks %s", checks)

    if not pending:
        # Everything up to the requested check has already passed.
        # begin_checks() reads checks[0] and would raise IndexError on an
        # empty list, so stop here and restore the idle UI instead.
        self.close_when_done = False
        self.end_checks()
        dialog = InputDialog(
            title="Nothing To Run",
            message="All selected checks have already passed",
            force_top_level=True,
        )
        dialog.exec()
        return

    self.event_service.log_event(
        action=self.pf_type,
        user=CFG.alc_user,
        app=self.software,
        details={"cookbook_recipe": self.preflight, "steps": {}},
    )
    self.begin_checks(pending)
    if not self.checks:
        # begin_checks() aborted (a check module failed to load) and has
        # already reset the run state; there is nothing to request.
        return
    self.check_index = 0
    self.check_request(0)
    self.update_progress(0, len(self.checks))
[docs]
def check_request(self, index):
    """Start the check at ``index`` of ``self.checks``.

    When ``previous_checks_passed`` rejects the check, the run is ended
    and a dialog is shown.  Otherwise the check is submitted to the
    background executor (if it is ``thread_safe`` and the thread runs) or
    scheduled on the GUI thread, and its row is marked "Running".
    """
    check = self.checks[index]
    if not self.previous_checks_passed(check):
        self.end_checks()
        dialog = InputDialog(
            title="Check Failed",
            message="Can't run a check when previous required checks have not passed",
            force_top_level=True,
        )
        dialog.exec()
        return

    class_ = self.function_d[check["Check"]]
    if class_.thread_safe and self.bg_thread.isRunning():
        self.check_submit.emit(class_, index)
    else:

        def run_in_gui_thread():
            try_run_class(
                class_,
                msd=self.msd,
                check_name=check["Check"],
                recipe_type=self.pf_type,
            )
            self.check_complete(class_, index)

        # delay the running the check a bit to let event loop run
        QtCore.QTimer.singleShot(100, run_in_gui_thread)

    check["Status"] = "Running"
    self.update_status(check=check["Check"], status=check["Status"])
[docs]
@QtCore.Slot(object, object)
def check_complete(self, class_, index):
    """Record the result of a finished check and continue or finish the run.

    The row is set to "Passed" or "Failed" from the check's ``status``.
    The run ends when this was the last check or the check failed: a
    failure shows its feedback, a fully successful run closes the dialog
    if ``close_when_done`` is set.  Otherwise the next check is requested.
    """
    logging.info(f"completed check {index} {self.close_when_done}")
    check = self.checks[index]
    check["Status"] = (
        "Passed" if self.function_d[check["Check"]].status else "Failed"
    )
    self.update_status(check=check["Check"], status=check["Status"])

    next_index = index + 1
    failed = not class_.status
    if not failed and next_index < len(self.checks):
        self.check_request(next_index)
        self.update_progress(next_index, len(self.checks))
        return

    # finish if complete or a check fails
    self.end_checks()
    if failed:
        dialog = CheckFailedDialog(
            title="Check Failed", message=class_.feedback, force_top_level=True
        )
        dialog.exec()
        return
    if not self.close_when_done:
        return
    self.close_when_done = False
    try:
        self.accept()  # if this is a QDialog
    except AttributeError:
        logging.info("Attempting to Close this QWidget")
        self.close()
[docs]
def update_event_step(self, status, check):
    """Store the status of one step on the latest event.

    Args:
        status: status string of the step, e.g. "Running", "Passed" or
            "Failed".
        check: label of the check; used as the step key.

    The start time of a step is remembered when it is reported as
    "Running"; the next report for the same label carries the elapsed
    seconds as ``duration`` (``None`` when no start was recorded).
    ``check`` is a plain label string, so timing cannot be stored on it.
    """
    now = time.time()
    start_times = getattr(self, "_step_start_times", None)
    if start_times is None:
        start_times = self._step_start_times = {}

    duration = None
    if status == "Running":
        start_times[check] = now
    elif check in start_times:
        duration = now - start_times.pop(check)

    # Prefer the feedback of the check instance; keep the dialog-level
    # attribute as a fallback for code that sets it there.
    feedback = getattr(self.function_d.get(check), "feedback", None)
    if feedback is None:
        feedback = getattr(self, "feedback", "")

    step_data = self.event_service.update_step(
        step_key=check,
        status=status,
        feedback=feedback,
        duration=duration,
    )
    # Keep memory in sync for UI
    self.msd.payload["events"][-1]["details"]["steps"][check] = step_data
def _configure_table_columns(self):
    """Apply column sizing, ordering and visibility to the checks table.

    Column 0 stretches, columns 1 and 4 size to their contents, the rows
    are sorted once by column 3 and columns 2 and 3 are hidden.
    """
    view = self.publish
    header = view.horizontalHeader()
    header.setStretchLastSection(False)
    resize_modes = (
        (0, QtWidgets.QHeaderView.Stretch),
        (1, QtWidgets.QHeaderView.ResizeToContents),
        (4, QtWidgets.QHeaderView.ResizeToContents),
    )
    for column, mode in resize_modes:
        header.setSectionResizeMode(column, mode)
    view.sortByColumn(3, QtCore.Qt.SortOrder(0))
    for hidden_column in (2, 3):
        view.hideColumn(hidden_column)
    # Interactive (header-click) sorting stays off.
    view.setSortingEnabled(False)
[docs]
def restore_incomplete_preflight(self):
    """Restore the last incomplete preflight run, if there is one.

    Looks up the latest event for ``self.pf_type``.  When any of its steps
    has a status other than "Passed", the table is pre-filled (passed
    steps stay "Passed", everything else becomes "Untested") and the rows
    of the failed or incomplete steps are selected for a re-run.
    """
    if not hasattr(self, "event_service"):
        self.event_service = EventService(self.msd)

    latest_event = self.event_service.get_latest_event_by_action(
        action=self.pf_type.lower()
    )
    if not latest_event:
        return
    steps = latest_event.get("details", {}).get("steps", {})
    if not steps:
        logging.info("No Steps Found")
        return

    # Determine if preflight was incomplete
    failed_or_incomplete = {
        name: data for name, data in steps.items() if data.get("status") != "Passed"
    }
    # Messages go through logging rather than print: they contain emoji,
    # which print() cannot encode on consoles with a non-UTF-8 encoding.
    if not failed_or_incomplete:
        logging.info("✅ Last preflight was fully successful — no restore needed.")
        return
    logging.info(
        f"⚠️ Restoring incomplete preflight ({len(failed_or_incomplete)} incomplete steps)"
    )

    # Pre-fill the table with the last known statuses
    for row in self.table_data:
        check_label = row[0]
        if check_label in steps:
            status = steps[check_label].get("status", "Untested")
            if status != "Passed":
                status = "Untested"
            row[1] = status

    # Refresh the model, keeping the current column widths
    widths = [
        self.publish.columnWidth(i)
        for i in range(self.publish.model().columnCount())
    ]
    self.publish.set_item_model(
        PreflightModel(
            self.table_data,
            ["Check", "Status", "Path", "Order", "Required"],
        )
    )
    self._configure_table_columns()
    for i, w in enumerate(widths):
        self.publish.setColumnWidth(i, w)

    # Select the failed/incomplete checks.  row_index counts rows of
    # self.table_data (the source model); the view shows a proxy whose
    # row order can differ once sorted, so map each index first.
    proxy = self.publish.model()
    source = proxy.sourceModel()
    selection = self.publish.selectionModel()
    select_row = QtCore.QItemSelectionModel.Select | QtCore.QItemSelectionModel.Rows
    for row_index, row in enumerate(self.table_data):
        if row[0] in failed_or_incomplete:
            selection.select(
                proxy.mapFromSource(source.index(row_index, 0)), select_row
            )
    logging.info("Auto-selected failed/incomplete checks for re-run.")
[docs]
def update_progress(self, value, max_value):
    """Show ``value`` out of ``max_value`` on the progress bar (range starts at 0)."""
    bar = self.progress_bar
    bar.setRange(0, max_value)
    bar.setValue(value)
[docs]
def close_threads(self):
    """Quit the background thread and wait for it, if it is running.

    An ``AttributeError`` (for example when no ``bg_thread`` attribute
    exists) is logged instead of raised.
    """
    try:
        worker = self.bg_thread
        if not worker.isRunning():
            return
        print("closing threads")
        worker.quit()
        worker.wait()
        print("threads closed")
    except AttributeError:
        logging.info("{} has No threads to close".format(self))
[docs]
def closeEvent(self, event):
    """Shut down the background thread when the dialog receives a close event."""
    self.close_threads()
[docs]
def previous_checks_passed(self, check):
    """Tell whether every required check ordered before ``check`` has passed.

    Args:
        check: check dict; only its "Order" value is read.

    Returns:
        bool: ``False`` as soon as a required check with a lower order has
        a status other than "Passed", ``True`` otherwise.  Earlier checks
        that are not required never block.

    The raw rows of ``self.table_data`` are inspected rather than the
    view's model, because the model's display text is decorated
    ("✅ Passed", "🟢") and never equals the plain "Passed" / "True"
    values.
    """
    current_order = int(check["Order"])
    for row in self.table_data:
        label, status, order, required = row[0], row[1], row[3], row[4]
        if int(order) >= current_order or str(status) == "Passed":
            continue
        logging.info("%s didnt pass", label)
        # Accept the same spellings of "true" the table model displays
        # as required.
        if str(required).strip().lower() in {"true", "1", "yes"}:
            logging.info(
                "Required Check Doesn't Pass: %s %s %s", label, status, required
            )
            return False
        logging.info("Check Failed, Not Required, Next Check Enabled")
    return True
[docs]
def update_status(self, check, status):
    """Set the status of ``check`` in the table and on the event log.

    Every row whose label equals ``check`` gets ``status``; the table is
    then rebuilt from ``self.table_data`` with the previous column widths
    and the change is forwarded to ``update_event_step``.
    """
    for entry in self.table_data:
        if entry[0] == check:
            entry[1] = status

    # refresh the table with self.table_data
    column_total = self.publish.model().columnCount()
    saved_widths = [self.publish.columnWidth(column) for column in range(column_total)]
    headers = ["Check", "Status", "Path", "Order", "Required"]
    self.publish.set_item_model(PreflightModel(self.table_data, headers))
    self._configure_table_columns()
    for column, width in enumerate(saved_widths):
        self.publish.setColumnWidth(column, width)

    self.update_event_step(status, check)
[docs]
def run_all_clicked(self):
    """Run every check in the table and close the dialog when all pass.

    The rows are read from the view's model and handed to
    ``run_selected_clicked``, which logs the run event itself; logging it
    here as well would record each "Run All" twice.
    """
    model = self.publish.model()
    keys = ("Check", "Status", "Path", "Order", "Required")
    # These are display strings; run_selected_clicked only reads "Order"
    # from them and rebuilds the rows from the source data.
    all_rows = [
        {
            key: str(model.data(model.index(irow, column)))
            for column, key in enumerate(keys)
        }
        for irow in range(model.rowCount())
    ]
    if all_rows:
        # Only show the busy animation and arm the auto-close when there
        # is something to run; with an empty table run_selected_clicked
        # returns early and would leave both switched on.
        self.image_plane.start()
        self.close_when_done = True
    self.run_selected_clicked(checks=all_rows)
[docs]
def main(path_object):
    """Open the "mdl" preflight for Maya on ``path_object`` and run the event loop."""
    app = do_gui_init()
    task = "mdl"
    dialog = Preflight(software="maya", preflight=task, path_object=path_object)
    dialog.setWindowTitle(f"{task} Publish")
    dialog.show()
    dialog.raise_()
    app.exec()
if __name__ == "__main__":
    # Manual launch: only runs when this file is executed as a script.
    from cgl.core.path.object import PathObject

    # Hard-coded sample location used for local testing.
    path_ = r"E:\Alchemy\CGL\JLT\VERSIONS\source\assets\char\jack\mdl\default\tom"
    path_object = PathObject().from_path_string(path_)
    main(path_object)