Source code for cgl.core.utils.general

from __future__ import annotations
import shlex
import getpass
import glob
import json
import logging
import os
import pathlib
import platform
import pprint
import shutil
import stat
import subprocess
import sys
import time
import traceback
import re
import random
from datetime import datetime
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional, Sequence, Mapping, Any

from PySide6 import QtCore, QtGui, QtWidgets
from cgl.apps.css_editor.style_inspector import enable_dev_tools
from cgl.core.config.query import AlchemyConfigManager

CFG = AlchemyConfigManager()


[docs] def pretty(obj): """ return a pretty printed representation of an object Args: obj: the object to pretty print Returns: str: """ pp = pprint.PrettyPrinter(indent=4) return pp.pformat(obj)
[docs] def pretty_print_error(e: CglExecuteError): """ Nicely format a CglExecuteError for logging or console output. """ border = "=" * 60 lines = [ border, f"❌ Command failed: {e.command}", f"Return code: {e.returncode}", ] if e.stdout: lines.append("\n--- STDOUT ---\n" + e.stdout.strip()) if e.stderr: lines.append("\n--- STDERR ---\n" + e.stderr.strip()) lines.append(border) return "\n".join(lines)
[docs] class CglExecuteError(RuntimeError): """Raised when cgl_execute is misused or the child process fails.""" def __init__( self, message, command=None, returncode=None, stdout=None, stderr=None ): super().__init__(message) self.command = command self.returncode = returncode self.stdout = stdout self.stderr = stderr
[docs] def app_name(str_=None, human=False): """ return the name of the application used in settings files Args: str_: the name to map from defaults sys.argv[0] human: a human-readable string Returns: str: name of application """ supported_apps = ["maya", "nuke", "houdini", "unreal", "unity"] if str_ is None: str_ = sys.argv[0] title = os.path.basename(str_) if human: title = title.replace("_", " ") title, _ = os.path.splitext(title) if title not in supported_apps: title = "magic_browser" return title
[docs] def current_user(): """ find the currently logged-in user Returns: str: username """ user = getpass.getuser().lower() if user: return user else: logging.info("Cant find user") return None
[docs] def clean_file_list(file_list): """ removes items we don't want to display in the GUI based off what's listed in the globals. Args: file_list: Returns: clean_list: list of files with items removed """ util_config = CFG.get_util_config() if "ignore" in util_config.keys(): ignore_matches = util_config["ignore"]["matches"] ignore_contains = util_config["ignore"]["contains"] ignore_endswith = util_config["ignore"]["endswith"] ignore_startswith = util_config["ignore"]["startswith"] else: # update_globals() logging.info("Found Missing Globals and updated them. Try Launching again") return clean_list = [] for f in file_list: if "\\" in f or "/" in f: folder, name = os.path.split(f) else: name = f # Validate the ignore rules if not ignore_name( name, ignore_matches, ignore_contains, ignore_endswith, ignore_startswith ): clean_list.append(f) return clean_list
[docs] def ignore_name( name, ignore_matches, ignore_contains, ignore_endswith, ignore_startswith ): """ Check if a file should be ignored based on the ignore rules """ if ignore_matches: # Check if the name matches exactly for match in ignore_matches: if match == name: return True if ignore_contains: # Check if the name contains the string for contain in ignore_contains: if contain in name: return True if ignore_endswith: # Check if the name ends with the string after the period for endswith in ignore_endswith: if name.endswith(endswith): return True if ignore_startswith: # Check if the name starts with the string for startswith in ignore_startswith: if name.startswith(startswith): return True return False
[docs] def clean_file_string(file_name): """ Removes characters we don't want from a file name """ import re # use a regex to see if this is a sequence: regex = r"\.[0-9]{4}\." frame = "" ext = "" match = re.search(regex, file_name) if match: frame = match.group() filename, ext = file_name.split(frame) elif "." in file_name: filename, ext = os.path.splitext(file_name) else: print("no extension") filename = file_name clean_filename = "".join(x for x in filename if x.isalnum()) clean_filename = clean_filename + frame + ext return clean_filename
[docs] def cgl_clean_dir(dir_path): """ Removes everything in a directory, including recursively removing all the contents of any subdirectories. Args: dir_path: Returns: """ if os.path.exists(dir_path): for root, dirs, files in os.walk(dir_path): for file in files: os.remove(os.path.join(root, file)) for d in dirs: shutil.rmtree(os.path.join(root, d))
[docs] def cgl_move(source, destination, verbose=False): """ Catch All Move Function. handles files and folders on all platforms """ # run_dict = {"start_time": time.time(), "function": "cgl_move()"} if get_file_type(source) == "folder": if platform.system() == "Windows": # Use robocopy on Windows command = r'robocopy.exe "{}" "{}" /S /Move /np /nfl /njh /njs /ndl /nc /ns'.format( source, destination ) cgl_execute(command=command, new_window=True) else: # Use shutil.move() on macOS and Linux shutil.move(source, destination) else: # Use shutil.move() for files on all platforms shutil.move(source, destination)
[docs] def compare_drives(source: str, destination: str): cmd = [ "robocopy", source, destination, "/MIR", "/XO", "/L", # List only "/NJH", "/NJS", "/NDL", "/NP", ] result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) if result.returncode >= 8: print("Robocopy reported an error:") print(result.stderr) else: print("Files that differ or are missing:") print(result.stdout)
[docs] def cgl_sync_file(source_file: str, dest_dir: str): """ Sync a single file from source to destination folder. Only updates the file if the source version is newer or if it does not exist in the destination. """ if not os.path.exists(source_file): raise ValueError(f"Source file '{source_file}' does not exist.") if not os.path.exists(dest_dir): os.makedirs(dest_dir) # dest_file = os.path.join(dest_dir, os.path.basename(source_file)) # /XO: exclude older files (i.e. skip destination files that are newer) # /FFT: assume FAT file times (2-second granularity) # /NDL: no directory listing # /NFL: no file listing # /NP: no progress # /NJH /NJS: no job header or summary cmd = [ "robocopy", os.path.dirname(source_file), # The source directory of the file dest_dir, os.path.basename(source_file), # Just copy this specific file "/XO", # Exclude older files "/FFT", # Assume FAT file times "/NDL", "/NFL", "/NP", "/NJH", "/NJS", ] print("Running command:") print(" ".join(cmd)) result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) if result.returncode >= 8: print("Robocopy encountered an error:") print(result.stderr) else: print(f"File {os.path.basename(source_file)} synced successfully.")
[docs] def cgl_sync(source_dir: str, dest_dir: str): """ Sync files from source to destination using robocopy. Only updates files if the source version is newer. Skips identical files. """ if not os.path.exists(source_dir): raise ValueError(f"Source folder '{source_dir}' does not exist.") if not os.path.exists(dest_dir): os.makedirs(dest_dir) # /MIR: mirror directory structure (includes /E) # /XO: exclude older files (i.e. skip destination files that are newer) # /FFT: assume FAT file times (2-second granularity, more forgiving) # /NDL: no directory listing # /NFL: no file listing # /NP: no progress # /NJH /NJS: no job header or summary cmd = [ "robocopy", source_dir, dest_dir, "/MIR", "/XO", "/FFT", "/NDL", "/NFL", "/NP", "/NJH", "/NJS", ] print("Running command:") print(" ".join(cmd)) result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) if result.returncode >= 8: print("Robocopy encountered an error:") print(result.stderr) else: print("Sync completed successfully.")
[docs] def cgl_copy( source, destination, methodology: str = "local", verbose: bool = False, dest_is_folder: bool = False, test: bool = False, job_name: str = "", wait_to_finish: bool = False, ): """ Catch-all for copy operations. Behavior: - If source is a *sequence* (get_file_type(source) == "sequence"), use the existing sequence path (robocopy on win / copy_file_list otherwise). - If source is a *list of paths*, use copy_file_list. - Otherwise, defer to cgl_copy_single() which already supports both file and directory sources. We ensure `dest_is_folder` is set whenever either source is a directory or destination is/should be a directory. Returns: run_dict (same shape as before) """ start_time = time.time() run_dict = {"start_time": start_time, "function": "cgl_copy()"} source_path = Path(source) if not isinstance(source, list) else None destination_path = Path(destination) print( "Copying files from source to destination folder...", source_path, destination_path, ) # --- Sequence handling stays exactly as you had it --- if isinstance(source, str) and get_file_type(source) == "sequence": dir_name, file_name = os.path.split(source) pattern = f"{file_name.split('###')[0]}*" run_dict["output"] = os.path.join(destination, file_name) if platform.system() == "Windows": command = ( f'robocopy "{dir_name}" "{destination}" "{pattern}" ' "/NFL /NDL /NJH /NJS /nc /ns /np /MT:8" ) temp_dict = cgl_execute( command=command, return_output=True, print_output=False, verbose=verbose, command_name=f"{job_name}:copy_sequence", ) else: source_list = glob.glob(source) temp_dict = copy_file_list( source_list, destination, methodology, verbose, dest_is_folder=True ) run_dict["job_id"] = temp_dict.get("job_id") run_dict["command"] = temp_dict.get("command") run_dict["artist_time"] = get_end_time(start_time) run_dict["end_time"] = time.time() return run_dict # --- List-of-files handling stays as-is --- if isinstance(source, list): temp_dict = copy_file_list( source, destination, methodology, verbose, dest_is_folder ) run_dict["command"] = temp_dict.get("command") run_dict["artist_time"] = get_end_time(start_time) run_dict["end_time"] = time.time() return run_dict # --- Single source: hand off to cgl_copy_single() which handles files *and* dirs --- # Decide if destination should be treated as a folder: # - If the source is a directory, we are copying a folder → treat destination as a folder # - If the destination exists and is a directory → treat as folder # - If the destination path ends with a separator (rare in Path) or clearly looks like a dir (heuristics), # you can add extra rules here if you need them later. if source_path is not None: if source_path.is_dir() or destination_path.is_dir(): dest_is_folder = True temp_dict = cgl_copy_single( str(source_path) if source_path is not None else source, str(destination_path), test=test, verbose=verbose, dest_is_folder=dest_is_folder, ) run_dict["command"] = temp_dict.get("command") run_dict["artist_time"] = get_end_time(start_time) run_dict["end_time"] = time.time() return run_dict
[docs] def copy_file_list(source_list, destination, methodology, verbose, dest_is_folder): if platform.system() == "Windows": # Use robocopy on Windows command = 'robocopy "%s" "%s" /NFL /NDL /NJH /NJS /nc /ns /np /MT:8' % ( " ".join(source_list), destination, ) temp_dict = cgl_execute( command=command, return_output=True, print_output=False, verbose=verbose, ) else: # Use shutil.copy2() on macOS and Linux for source in source_list: shutil.copy2(source, destination) temp_dict = {} # Placeholder for other functionality if needed return temp_dict
[docs] def create_file_dirs(file_path): """ given file_path checks to see if directories exist and creates them if they don't. Args: file_path: path to file you're about to create. """ dirname = os.path.dirname(file_path) if os.path.isdir(file_path): dirname = file_path logging.info(dirname) if not os.path.exists(dirname): os.makedirs(dirname)
[docs] def normpath(filepath): """ returns path with all '\' replaced with '/' Args: filepath: Returns: """ return filepath.replace("\\", "/")
[docs] def cgl_copy_single( source, destination, test=False, verbose=False, dest_is_folder=False, command_name="cgl_copy_single", ): """ Lumbermill Copy Function. Built to handle any kind of copy interaction. """ run_dict = {"function": "cgl_copy_single()"} if verbose: logging.info("copying %s to %s" % (source, destination)) command = None if sys.platform == "win32": source = source.replace("/", "\\") destination = destination.replace("/", "\\") # make sure the destination directories exist if dest_is_folder: if not os.path.exists(destination): os.makedirs(destination) else: directory = os.path.dirname(destination) if not os.path.exists(directory): os.makedirs(directory) if os.path.isdir(source): # what to do if we're copying a directory to another directory run_dict["command_type"] = "directory to directory" command = f'robocopy "{source}" "{destination}" /NFL /NDL /NJH /NJS /nc /ns /np /MT:8 /E' else: dir_, file_ = os.path.split(source) # We are dealing with a single file. if dest_is_folder: # Destination is a Folder run_dict["command_type"] = "single file to directory" command = ( 'robocopy "%s" "%s" "%s" /NFL /NDL /NJH /NJS /nc /ns /np /MT:8' % (dir_, destination, file_) ) else: # Destination is a file with a different name run_dict["command_type"] = "file to renamed file" # TODO - check to ensure the files have the same extension. command = f'copy "{source}" "{destination}" /Y' if command: if test: logging.info(command) else: run_dict["start_time"] = time.time() run_dict["command"] = command # FIX: Use shell=True for Windows commands cgl_execute( command=command, print_output=False, verbose=verbose, command_name=command_name, ) run_dict["artist_time"] = time.time() - run_dict["start_time"] run_dict["end_time"] = time.time() return run_dict elif sys.platform in ["darwin", "linux", "linux2"]: # ... existing code for Unix systems ... # For rsync commands, you might also need shell=True if command: if test: logging.info(command) else: run_dict["start_time"] = time.time() run_dict["command"] = command cgl_execute( command=command, print_output=False, verbose=verbose, command_name=command_name, shell=True, # ← Add this for consistency ) run_dict["artist_time"] = time.time() - run_dict["start_time"] run_dict["end_time"] = time.time() return run_dict else: logging.info("%s is not a supported platform" % sys.platform) return False
[docs] def split_all(path): """ It splits a path into all of its parts, and returns a list of those parts Args: path: The path to split Returns: A list of all the parts of the path. """ if isinstance(path, pathlib.Path): path = path.as_posix() if "\\" in path: path = path.replace("\\", "/") path = pathlib.Path(path) parts = list(path.parts) return parts
[docs] def atomic_replace(tmp: Path, target: Path, retries=5): for i in range(retries): try: os.replace(tmp, target) return except PermissionError: if i == retries - 1: raise time.sleep(0.05 + random.random() * 0.1)
[docs] def save_json(path: Path, data: dict): if not isinstance(path, Path): path = Path(path) tmp = path.with_suffix(".tmp") with tmp.open("w", encoding="utf-8") as f: json.dump(data, f, indent=2) atomic_replace(tmp, path)
[docs] def load_json(filepath): try: with open(filepath) as jsonfile: data = json.load(jsonfile) return data except IOError or FileNotFoundError: print("Error reading file %s" % filepath)
[docs] def fix_json(filepath): """ Function to fix json file if there's no newlines Args: filepath: Path to json file """ save_json(filepath, load_json(filepath))
[docs] def save_xml(filepath, data): import xmltodict with open(filepath, "w") as outfile: outfile.write(xmltodict.unparse(data))
[docs] def load_xml(filepath): import xmltodict with open(filepath) as xmlfile: docs = xmltodict.parse(xmlfile.read()) return docs
def __describe_object(obj, data): import inspect x = inspect.getsourcefile(obj.__class__) if not x: x = "no source :(" d = "%s:%s- %s" % (obj.__class__.__name__, obj.objectName(), x) data.append(d)
[docs] def display_debug_widget(parent): # This terrible and I shouldnt do it. def recurse_children(parent, pos, data): obj = parent.childAt(pos) if obj: while obj.parent() is not None: __describe_object(obj, data) obj = obj.parent() __describe_object(obj, data) def __custom_notify(receiver, event): qapp = QtWidgets.QApplication.instance() if ( event.type() == QtGui.QMouseEvent.MouseButtonPress and event.button() == QtCore.Qt.RightButton and event.modifiers() == QtCore.Qt.ControlModifier ): pos = event.pos() text = [] widget = app._parent recurse_children(widget, pos, text) app.text.setText("EMPTY") app.text.setText("\n".join(text)) return qapp.original_notify(receiver, event) class DebugWidget(QtWidgets.QDialog): def __init__(self, par): from cgl.ui.widgets.base import restore_size super().__init__(par) self.text = QtWidgets.QTextEdit(self) lay = QtWidgets.QHBoxLayout() self.text.setText("Ctrl-LeftClick widget to see information") self.text.setStyleSheet("color:white;") lay.addWidget(self.text) self.setLayout(lay) self.setWindowTitle("WIDGET DEBUG") self.setMinimumSize(300, 200) self.show() restore_size(self) def closeEvent(self, event): from cgl.ui.util import UISettings, widget_name geo = self.geometry() settings = UISettings.settings() settings.setValue(widget_name(self), geo) super().closeEvent(event) dev_mode = os.getenv("DEV_MODE") if dev_mode: dw = DebugWidget(parent) dw.show() app = QtWidgets.QApplication.instance() app.original_notify = app.notify app.notify = __custom_notify app.text = dw.text app._parent = parent
[docs] def load_fonts_from_directory(directory=None): if not directory: directory = os.path.join(CFG.get_code_root(), "resources", "fonts") for font_file in os.listdir(directory): if font_file.endswith(".ttf") or font_file.endswith(".otf"): font_path = os.path.join(directory, font_file) font_id = QtGui.QFontDatabase.addApplicationFont(font_path) print('Loaded font "{}" with ID: {}'.format(font_file, font_id))
[docs] def get_resource_path(relative_path): """Get absolute path to resource, works for PyInstaller and development.""" if hasattr(sys, "_MEIPASS"): # Running in a PyInstaller bundle base_path = sys._MEIPASS else: # Running in development mode base_path = os.path.abspath(os.path.dirname(__file__)) return os.path.join(base_path, relative_path)
[docs] def apply_theme(app, theme_name="alchemy_theme.qss", dev_mode=False): from pathlib import Path theme_path = f":/{theme_name}" f = QtCore.QFile(theme_path) print(theme_path) if f.exists() and f.open(QtCore.QIODevice.ReadOnly | QtCore.QFile.Text): print(f"Applying theme from {theme_path}") qss = QtCore.QTextStream(f).readAll() app.setStyleSheet(qss) return True if dev_mode: from cgl.ui.startup import ThemeFileWatcher theme_path = Path(CFG.get_code_root()) / "resources" / "alchemy_theme.qss" f = QtCore.QFile(theme_path.as_posix()) app.theme_watcher = ThemeFileWatcher(theme_path.as_posix()) # enable inspector with click-open features alc_code_root = CFG.get_code_root() enable_dev_tools( app, css_file=theme_path.as_posix(), project_root=alc_code_root )
[docs] def load_style_sheet(style_file="stylesheet.css", app=None, dev_mode=0): import logging import os from PySide6 import QtCore if app is None: app = QtWidgets.QApplication.instance() alc_code_root = CFG.get_code_root() if not QtCore.QDir.searchPaths("resources"): QtCore.QDir.addSearchPath("resources", os.path.join(alc_code_root, "resources")) file_ = os.path.join(alc_code_root, "resources", style_file) if dev_mode: # your existing watcher (if PySide6-safe) if "cglumberjack" in __file__: from cgl.ui.startup import ThemeFileWatcher app.theme_watcher = ThemeFileWatcher(file_) print("dev mode -> watching css file:", file_) # enable inspector with click-open features enable_dev_tools(app, css_file=file_, project_root=alc_code_root) logging.info(f"STYLE FILE {file_}") if os.path.exists(file_): with open(file_, "r", encoding="utf-8") as f: data = f.read() app.setStyleSheet(data)
def _clean_env(extra_env: Optional[Mapping[str, str]] = None) -> Mapping[str, str]: env = dict(os.environ.copy()) for k in ("QT_PLUGIN_PATH", "QT_QPA_PLATFORM_PLUGIN_PATH"): env.pop(k, None) if extra_env: env.update(extra_env) return env def _normalize_command(command: Sequence[str] | str, command_name: str) -> list[str]: """ Normalize the command input to ensure it is a list of strings. Args: command (Sequence[str] | str): The command to normalize, can be a string or a sequence of strings. command_name (str): A name for the command Returns: list[str]: A list of strings representing the command. Raises: CglExecuteError: If the command is a 'cd' command or a drive change """ if isinstance(command, str): command = shlex.split(command) command = list(command) if command and command[0].lower() == "cd": raise CglExecuteError( f"[{command_name}] 'cd' detected in command {command}. " f"Use the 'working_directory' argument instead.", command=command, ) if os.name == "nt" and command and re.match(r"^[A-Z]:$", command[0]): raise CglExecuteError( f"[{command_name}] Drive change '{command[0]}' detected. " f"Use the 'working_directory' argument instead. example: cgl_execute(command, working_directory='D:/')", command=command, ) return command def _platform_flags(new_window: bool, detach: bool): """ Determine platform-specific flags for process creation. Args: new_window (bool): If True, create a new console window for the command. detach (bool): If True, detach the process from the parent. """ creationflags, preexec_fn = 0, None if os.name == "nt": DETACHED_PROCESS = 0x00000008 CREATE_NEW_PROCESS_GROUP = 0x00000200 CREATE_NEW_CONSOLE = 0x00000010 if new_window: creationflags |= CREATE_NEW_CONSOLE | CREATE_NEW_PROCESS_GROUP elif detach: creationflags |= DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP else: if detach: preexec_fn = os.setsid return creationflags, preexec_fn def _run_blocking( command, cmd_display, command_name, proc_env, cwd, creationflags, preexec_fn, timeout, print_output, start, shell=False, ): """ Run a command in a blocking way, capturing stdout and stderr. If the command fails to start, it raises CglExecuteError with details. If the command is not found, it raises CglExecuteError with a 127 return code. If the command is found, it returns a dictionary with process info. Args: command (list[str]): The command to run, as a list of strings. cmd_display (str): A string representation of the command for logging. command_name (str): A name for the command, used in error messages. proc_env (Mapping[str, str]): Environment variables to use for the process. cwd (str | None): The working directory to run the command in. creationflags (int): Flags for process creation (Windows-specific). preexec_fn (callable | None): Function to run before executing the command (Unix-specific). timeout (float | None): Timeout for the command execution, in seconds. print_output (bool): Whether to print stdout and stderr to the console. start (float): The start time of the command execution, used for duration calculation. Raises: CglExecuteError: If the command fails to start, times out, or returns a non-zero exit code. Returns: dict: A dictionary containing the command, command name, process ID (if applicable), return code, stdout, stderr, start time, end time, and duration of the command execution. """ try: proc = subprocess.Popen( command, env=proc_env, cwd=cwd, creationflags=creationflags, preexec_fn=preexec_fn, shell=shell, # ← Add shell parameter ) except FileNotFoundError: try: proc = subprocess.run( command, env=proc_env, cwd=cwd, capture_output=True, text=True, creationflags=creationflags if os.name == "nt" else 0, preexec_fn=preexec_fn, timeout=timeout, ) except FileNotFoundError as e: raise CglExecuteError( f"[{command_name}] Executable not found: {e}", command=cmd_display, returncode=127, stderr=str(e), ) except subprocess.TimeoutExpired as e: raise CglExecuteError( f"[{command_name}] Timeout after {timeout}s: {cmd_display}", command=cmd_display, stdout=e.stdout, stderr=e.stderr, ) stdout, stderr = proc.stdout or "", proc.stderr or "" if print_output and stdout: print(stdout, end="") if print_output and stderr: print(stderr, end="") if proc.returncode != 0: # Special-case robocopy: exit codes < 8 are not errors if "robocopy" in cmd_display.lower() and proc.returncode < 8: return { "command": cmd_display, "command_name": command_name, "pid": None, "returncode": proc.returncode, "stdout": stdout, "stderr": stderr, "start_time": start, "end_time": time.time(), "duration": time.time() - start, } raise CglExecuteError( f"[{command_name}] Command failed (code {proc.returncode}): {cmd_display}", command=cmd_display, returncode=proc.returncode, stdout=stdout, stderr=stderr, ) return { "command": cmd_display, "command_name": command_name, "pid": None, "returncode": proc.returncode, "stdout": stdout, "stderr": stderr, "start_time": start, "end_time": time.time(), "duration": time.time() - start, } def _run_nonblocking( command, cmd_display, command_name, proc_env, cwd, creationflags, preexec_fn, start ): """ Run a command in a non-blocking way, returning immediately with process info. This is useful for fire-and-forget operations where you don't need to capture output. If the command fails to start, it raises CglExecuteError with details. If the command is not found, it raises CglExecuteError with a 127 return code. If the command is found, it returns a dictionary with process info. Args: command (list[str]): The command to run, as a list of strings. cmd_display (str): A string representation of the command for logging. command_name (str): A name for the command, used in error messages. proc_env (Mapping[str, str]): Environment variables to use for the process. cwd (str | None): The working directory to run the command in. creationflags (int): Flags for process creation (Windows-specific). preexec_fn (callable | None): Function to run before executing the command (Unix-specific). start (float): The start time of the command execution, used for duration calculation. """ try: proc = subprocess.Popen( command, env=proc_env, cwd=cwd, creationflags=creationflags, preexec_fn=preexec_fn, ) except FileNotFoundError as e: raise CglExecuteError( f"[{command_name}] Executable not found: {e}", command=cmd_display, returncode=127, stderr=str(e), ) return { "command": cmd_display, "command_name": command_name, "pid": proc.pid, "returncode": None, "stdout": "", "stderr": "", "start_time": start, "end_time": time.time(), "duration": time.time() - start, } def _prepare_command_for_subprocess( command: str | Sequence[str], ) -> tuple[str | list[str], bool]: """ Inspect and normalize the command. Returns (normalized_command, use_shell) """ if isinstance(command, (list, tuple)): head = str(command[0]).lower() if os.name == "nt" and head in { "cmd", "cmd.exe", "copy", "del", "move", "rmdir", "start", }: return " ".join(map(str, command)), True return list(map(str, command)), False # It's a string cmd_str = command.strip() lower = cmd_str.lower() # Detect shell operators or Windows builtins if any( x in lower for x in [ "&&", "||", "|", ">", "<", "cmd /c", "start ", "copy ", "del ", "move ", ] ): return cmd_str, True # Default: treat as normal executable command return shlex.split(cmd_str), False
[docs] def cgl_execute( command: Sequence[str] | str, *, return_output: bool = True, print_output: bool = True, verbose: bool = True, command_name: str = "cgl_execute", new_window: bool = False, detach: bool = False, env: Optional[Mapping[str, str]] = None, timeout: Optional[float] = None, working_directory: Optional[str | Path] = None, ) -> dict[str, Any]: """ Runs a local process with smart shell detection. Replaces the old _run_blocking/_run_nonblocking pattern. """ # Normalize cmd, use_shell = _prepare_command_for_subprocess(command) cmd_display = " ".join(map(str, cmd)) if isinstance(cmd, (list, tuple)) else cmd # Logging if verbose: msg = f"[{command_name}] Executing: {cmd_display}" if working_directory: msg += f" from {working_directory}" if use_shell: msg += " [SHELL MODE]" logging.info(msg) # Environment & flags proc_env = _clean_env(env) cwd = str(working_directory) if working_directory else None creationflags, preexec_fn = _platform_flags(new_window, detach) start = time.time() try: if return_output: proc = subprocess.run( cmd, env=proc_env, cwd=cwd, shell=use_shell, capture_output=True, text=True, creationflags=creationflags if os.name == "nt" else 0, preexec_fn=preexec_fn, timeout=timeout, ) stdout, stderr = proc.stdout or "", proc.stderr or "" else: proc = subprocess.Popen( cmd, env=proc_env, cwd=cwd, shell=use_shell, creationflags=creationflags, preexec_fn=preexec_fn, ) stdout, stderr = "", "" except FileNotFoundError as e: raise CglExecuteError( f"[{command_name}] Executable not found: {e}", command=cmd_display, returncode=127, ) except subprocess.TimeoutExpired: raise CglExecuteError( f"[{command_name}] Timeout after {timeout}s: {cmd_display}", command=cmd_display, ) # Evaluate result if return_output and proc.returncode not in (0, None): # Special-case robocopy success codes (<8) if "robocopy" in cmd_display.lower() and proc.returncode < 8: pass else: raise CglExecuteError( f"[{command_name}] Command failed (code {proc.returncode}): {cmd_display}", command=cmd_display, returncode=proc.returncode, stdout=stdout, stderr=stderr, ) return { "command": cmd_display, "command_name": command_name, "pid": getattr(proc, "pid", None), "returncode": getattr(proc, "returncode", None), "stdout": stdout, "stderr": stderr, "start_time": start, "end_time": time.time(), "duration": time.time() - start, }
[docs] def get_end_time(start_time): return time.time() - start_time
[docs] def get_job_id(): return str(time.time()).replace(".", "")
# # def write_to_cgl_data(path_object, process_info): # cfg = ProjectConfig(path_object) # job_id = None # if "job_id" in process_info.keys(): # if process_info["job_id"]: # job_id = process_info["job_id"] # else: # process_info["job_id"] = get_job_id() # user = current_user() # cgl_data = os.path.join(ProjectConfig().globals_root, "cgl_data.json") # if os.path.exists(cgl_data): # data = load_json(cgl_data) # else: # data = {} # if user not in data.keys(): # data[user] = {} # if job_id not in data[user].keys(): # data[user][process_info["job_id"]] = process_info # else: # logging.info("%s already exists in %s dict" % (process_info["job_id"], user)) # return # save_json(cgl_data, data)
[docs] def screen_grab(path_object): """ 1) takes a screen grab and saves it to the "preview_path" variable on PathObject() 2) creates a "thumbnail" at the "thumb_path" from PathObject() 3) updates ftrack or shotgrid with the new thumbnail. """ import cgl.core.review as review import cgl.core.screen_grab as sg ppath = sg.run(path_object=path_object) review.create_movie_thumbnail(path_object) print("Creating Preview at: {}".format(ppath)) print("Creating thumb at: {}".format(path_object.thumb_path))
# path_object.update_test_project_msd(attr='preview_file') # path_object.update_test_project_msd(attr='thumb_file')
[docs] def remove_read_only_attribute(file_path): """ Removes read only attribute from file Args: file_path: Absolute file path to file """ os.chmod(file_path, stat.S_IWRITE)
[docs] def get_branch_name(): cmd = "git symbolic-ref --short -q HEAD" branch_name = cgl_execute(command=cmd, return_output=True) return branch_name["stdout"].splitlines()[0]
[docs] def ext_exists(folder_path, ext, as_posix=True) -> list[Path] or list[str]: """ Args: folder_path: directory to search ext: extention you're looking for. expecting a ".ext" but "ext" will likely work too. Returns: list of Path objects that match the extension in the folder_path """ folder_path = Path(folder_path) if not folder_path.is_dir(): folder_path = folder_path.parent results = list(folder_path.glob("*" + ext)) if as_posix: results = [p.as_posix() for p in results] return results
[docs] class Logger: def __init__(self, name=None, log_file=None, log_level=logging.DEBUG): if name is None: self.logger = logging.getLogger() else: self.logger = logging.getLogger(name) self.logger.setLevel(log_level) formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) if self.logger.hasHandlers(): self.logger.handlers.clear() console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) self.logger.addHandler(console_handler) if log_file: file_handler = RotatingFileHandler( log_file, maxBytes=1048576 * 2, backupCount=5 ) file_handler.setFormatter(formatter) self.logger.addHandler(file_handler)
[docs] def set_log_level(self, level): """ The function sets the log level of a logger object. Args: level: The "level" parameter is used to set the logging level for the logger object. The logging level determines the severity of the messages that will be logged. The available logging levels are: DEBUG, INFO, WARN, ERROR """ self.logger.setLevel(level)
[docs] class DateTimeEncoder(json.JSONEncoder):
[docs] def default(self, obj): if isinstance(obj, datetime): return obj.strftime("%Y-%m-%dT%H:%M:%S") return super().default(obj)
[docs] def get_print_list(list_array): """ Returns a string of a list with commas and an "and" before the last item. Args: list_array: Returns: """ print_list = "" if list_array: for item in list_array: print_list += f"{item}, " # remove the last comma and space if print_list: print_list = print_list[:-2] return print_list
[docs] def get_file_type(filepath) -> str: """ Returns the file type of the given filepath Args: filepath: Returns: The file type of the given filepath Raises: KeyError: If the file type is not found in the config file """ ft = "file" if "." not in filepath: ft = "folder" if "####" in filepath: ft = "sequence" if "%0" in filepath: ft = "sequence" if ft == "file": file_, ext = os.path.splitext(filepath) util_config = CFG.get_util_config() try: ft = util_config["ext_map"][ext.lower()] except KeyError: logging.error("No file type found for %s" % ext.lower()) ft = "NA" return ft