from __future__ import annotations
import shlex
import getpass
import glob
import json
import logging
import os
import pathlib
import platform
import pprint
import shutil
import stat
import subprocess
import sys
import time
import traceback
import re
import random
from datetime import datetime
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional, Sequence, Mapping, Any
from PySide6 import QtCore, QtGui, QtWidgets
from cgl.apps.css_editor.style_inspector import enable_dev_tools
from cgl.core.config.query import AlchemyConfigManager
CFG = AlchemyConfigManager()
[docs]
def pretty(obj):
"""
return a pretty printed representation of an object
Args:
obj: the object to pretty print
Returns:
str:
"""
pp = pprint.PrettyPrinter(indent=4)
return pp.pformat(obj)
[docs]
def pretty_print_error(e: CglExecuteError):
"""
Nicely format a CglExecuteError for logging or console output.
"""
border = "=" * 60
lines = [
border,
f"❌ Command failed: {e.command}",
f"Return code: {e.returncode}",
]
if e.stdout:
lines.append("\n--- STDOUT ---\n" + e.stdout.strip())
if e.stderr:
lines.append("\n--- STDERR ---\n" + e.stderr.strip())
lines.append(border)
return "\n".join(lines)
[docs]
class CglExecuteError(RuntimeError):
"""Raised when cgl_execute is misused or the child process fails."""
def __init__(
self, message, command=None, returncode=None, stdout=None, stderr=None
):
super().__init__(message)
self.command = command
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
[docs]
def app_name(str_=None, human=False):
"""
return the name of the application used in settings files
Args:
str_: the name to map from defaults sys.argv[0]
human: a human-readable string
Returns:
str: name of application
"""
supported_apps = ["maya", "nuke", "houdini", "unreal", "unity"]
if str_ is None:
str_ = sys.argv[0]
title = os.path.basename(str_)
if human:
title = title.replace("_", " ")
title, _ = os.path.splitext(title)
if title not in supported_apps:
title = "magic_browser"
return title
[docs]
def current_user():
"""
find the currently logged-in user
Returns:
str: username
"""
user = getpass.getuser().lower()
if user:
return user
else:
logging.info("Cant find user")
return None
[docs]
def clean_file_list(file_list):
"""
removes items we don't want to display in the GUI based off what's listed in the globals.
Args:
file_list:
Returns:
clean_list: list of files with items removed
"""
util_config = CFG.get_util_config()
if "ignore" in util_config.keys():
ignore_matches = util_config["ignore"]["matches"]
ignore_contains = util_config["ignore"]["contains"]
ignore_endswith = util_config["ignore"]["endswith"]
ignore_startswith = util_config["ignore"]["startswith"]
else:
# update_globals()
logging.info("Found Missing Globals and updated them. Try Launching again")
return
clean_list = []
for f in file_list:
if "\\" in f or "/" in f:
folder, name = os.path.split(f)
else:
name = f
# Validate the ignore rules
if not ignore_name(
name, ignore_matches, ignore_contains, ignore_endswith, ignore_startswith
):
clean_list.append(f)
return clean_list
[docs]
def ignore_name(
name, ignore_matches, ignore_contains, ignore_endswith, ignore_startswith
):
"""
Check if a file should be ignored based on the ignore rules
"""
if ignore_matches:
# Check if the name matches exactly
for match in ignore_matches:
if match == name:
return True
if ignore_contains:
# Check if the name contains the string
for contain in ignore_contains:
if contain in name:
return True
if ignore_endswith:
# Check if the name ends with the string after the period
for endswith in ignore_endswith:
if name.endswith(endswith):
return True
if ignore_startswith:
# Check if the name starts with the string
for startswith in ignore_startswith:
if name.startswith(startswith):
return True
return False
[docs]
def clean_file_string(file_name):
"""
Removes characters we don't want from a file name
"""
import re
# use a regex to see if this is a sequence:
regex = r"\.[0-9]{4}\."
frame = ""
ext = ""
match = re.search(regex, file_name)
if match:
frame = match.group()
filename, ext = file_name.split(frame)
elif "." in file_name:
filename, ext = os.path.splitext(file_name)
else:
print("no extension")
filename = file_name
clean_filename = "".join(x for x in filename if x.isalnum())
clean_filename = clean_filename + frame + ext
return clean_filename
[docs]
def cgl_clean_dir(dir_path):
"""
Removes everything in a directory, including recursively removing all the contents of any subdirectories.
Args:
dir_path:
Returns:
"""
if os.path.exists(dir_path):
for root, dirs, files in os.walk(dir_path):
for file in files:
os.remove(os.path.join(root, file))
for d in dirs:
shutil.rmtree(os.path.join(root, d))
[docs]
def cgl_move(source, destination, verbose=False):
"""
Catch All Move Function. handles files and folders on all platforms
"""
# run_dict = {"start_time": time.time(), "function": "cgl_move()"}
if get_file_type(source) == "folder":
if platform.system() == "Windows":
# Use robocopy on Windows
command = r'robocopy.exe "{}" "{}" /S /Move /np /nfl /njh /njs /ndl /nc /ns'.format(
source, destination
)
cgl_execute(command=command, new_window=True)
else:
# Use shutil.move() on macOS and Linux
shutil.move(source, destination)
else:
# Use shutil.move() for files on all platforms
shutil.move(source, destination)
[docs]
def compare_drives(source: str, destination: str):
cmd = [
"robocopy",
source,
destination,
"/MIR",
"/XO",
"/L", # List only
"/NJH",
"/NJS",
"/NDL",
"/NP",
]
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
if result.returncode >= 8:
print("Robocopy reported an error:")
print(result.stderr)
else:
print("Files that differ or are missing:")
print(result.stdout)
[docs]
def cgl_sync_file(source_file: str, dest_dir: str):
"""
Sync a single file from source to destination folder.
Only updates the file if the source version is newer or if it does not exist in the destination.
"""
if not os.path.exists(source_file):
raise ValueError(f"Source file '{source_file}' does not exist.")
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
# dest_file = os.path.join(dest_dir, os.path.basename(source_file))
# /XO: exclude older files (i.e. skip destination files that are newer)
# /FFT: assume FAT file times (2-second granularity)
# /NDL: no directory listing
# /NFL: no file listing
# /NP: no progress
# /NJH /NJS: no job header or summary
cmd = [
"robocopy",
os.path.dirname(source_file), # The source directory of the file
dest_dir,
os.path.basename(source_file), # Just copy this specific file
"/XO", # Exclude older files
"/FFT", # Assume FAT file times
"/NDL",
"/NFL",
"/NP",
"/NJH",
"/NJS",
]
print("Running command:")
print(" ".join(cmd))
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
if result.returncode >= 8:
print("Robocopy encountered an error:")
print(result.stderr)
else:
print(f"File {os.path.basename(source_file)} synced successfully.")
[docs]
def cgl_sync(source_dir: str, dest_dir: str):
"""
Sync files from source to destination using robocopy.
Only updates files if the source version is newer.
Skips identical files.
"""
if not os.path.exists(source_dir):
raise ValueError(f"Source folder '{source_dir}' does not exist.")
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
# /MIR: mirror directory structure (includes /E)
# /XO: exclude older files (i.e. skip destination files that are newer)
# /FFT: assume FAT file times (2-second granularity, more forgiving)
# /NDL: no directory listing
# /NFL: no file listing
# /NP: no progress
# /NJH /NJS: no job header or summary
cmd = [
"robocopy",
source_dir,
dest_dir,
"/MIR",
"/XO",
"/FFT",
"/NDL",
"/NFL",
"/NP",
"/NJH",
"/NJS",
]
print("Running command:")
print(" ".join(cmd))
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
if result.returncode >= 8:
print("Robocopy encountered an error:")
print(result.stderr)
else:
print("Sync completed successfully.")
[docs]
def cgl_copy(
source,
destination,
methodology: str = "local",
verbose: bool = False,
dest_is_folder: bool = False,
test: bool = False,
job_name: str = "",
wait_to_finish: bool = False,
):
"""
Catch-all for copy operations.
Behavior:
- If source is a *sequence* (get_file_type(source) == "sequence"),
use the existing sequence path (robocopy on win / copy_file_list otherwise).
- If source is a *list of paths*, use copy_file_list.
- Otherwise, defer to cgl_copy_single() which already supports both
file and directory sources. We ensure `dest_is_folder` is set whenever
either source is a directory or destination is/should be a directory.
Returns:
run_dict (same shape as before)
"""
start_time = time.time()
run_dict = {"start_time": start_time, "function": "cgl_copy()"}
source_path = Path(source) if not isinstance(source, list) else None
destination_path = Path(destination)
print(
"Copying files from source to destination folder...",
source_path,
destination_path,
)
# --- Sequence handling stays exactly as you had it ---
if isinstance(source, str) and get_file_type(source) == "sequence":
dir_name, file_name = os.path.split(source)
pattern = f"{file_name.split('###')[0]}*"
run_dict["output"] = os.path.join(destination, file_name)
if platform.system() == "Windows":
command = (
f'robocopy "{dir_name}" "{destination}" "{pattern}" '
"/NFL /NDL /NJH /NJS /nc /ns /np /MT:8"
)
temp_dict = cgl_execute(
command=command,
return_output=True,
print_output=False,
verbose=verbose,
command_name=f"{job_name}:copy_sequence",
)
else:
source_list = glob.glob(source)
temp_dict = copy_file_list(
source_list, destination, methodology, verbose, dest_is_folder=True
)
run_dict["job_id"] = temp_dict.get("job_id")
run_dict["command"] = temp_dict.get("command")
run_dict["artist_time"] = get_end_time(start_time)
run_dict["end_time"] = time.time()
return run_dict
# --- List-of-files handling stays as-is ---
if isinstance(source, list):
temp_dict = copy_file_list(
source, destination, methodology, verbose, dest_is_folder
)
run_dict["command"] = temp_dict.get("command")
run_dict["artist_time"] = get_end_time(start_time)
run_dict["end_time"] = time.time()
return run_dict
# --- Single source: hand off to cgl_copy_single() which handles files *and* dirs ---
# Decide if destination should be treated as a folder:
# - If the source is a directory, we are copying a folder → treat destination as a folder
# - If the destination exists and is a directory → treat as folder
# - If the destination path ends with a separator (rare in Path) or clearly looks like a dir (heuristics),
# you can add extra rules here if you need them later.
if source_path is not None:
if source_path.is_dir() or destination_path.is_dir():
dest_is_folder = True
temp_dict = cgl_copy_single(
str(source_path) if source_path is not None else source,
str(destination_path),
test=test,
verbose=verbose,
dest_is_folder=dest_is_folder,
)
run_dict["command"] = temp_dict.get("command")
run_dict["artist_time"] = get_end_time(start_time)
run_dict["end_time"] = time.time()
return run_dict
[docs]
def copy_file_list(source_list, destination, methodology, verbose, dest_is_folder):
if platform.system() == "Windows":
# Use robocopy on Windows
command = 'robocopy "%s" "%s" /NFL /NDL /NJH /NJS /nc /ns /np /MT:8' % (
" ".join(source_list),
destination,
)
temp_dict = cgl_execute(
command=command,
return_output=True,
print_output=False,
verbose=verbose,
)
else:
# Use shutil.copy2() on macOS and Linux
for source in source_list:
shutil.copy2(source, destination)
temp_dict = {} # Placeholder for other functionality if needed
return temp_dict
[docs]
def create_file_dirs(file_path):
"""
given file_path checks to see if directories exist and creates them if they don't.
Args:
file_path: path to file you're about to create.
"""
dirname = os.path.dirname(file_path)
if os.path.isdir(file_path):
dirname = file_path
logging.info(dirname)
if not os.path.exists(dirname):
os.makedirs(dirname)
[docs]
def normpath(filepath):
"""
returns path with all '\' replaced with '/'
Args:
filepath:
Returns:
"""
return filepath.replace("\\", "/")
[docs]
def cgl_copy_single(
source,
destination,
test=False,
verbose=False,
dest_is_folder=False,
command_name="cgl_copy_single",
):
"""
Lumbermill Copy Function. Built to handle any kind of copy interaction.
"""
run_dict = {"function": "cgl_copy_single()"}
if verbose:
logging.info("copying %s to %s" % (source, destination))
command = None
if sys.platform == "win32":
source = source.replace("/", "\\")
destination = destination.replace("/", "\\")
# make sure the destination directories exist
if dest_is_folder:
if not os.path.exists(destination):
os.makedirs(destination)
else:
directory = os.path.dirname(destination)
if not os.path.exists(directory):
os.makedirs(directory)
if os.path.isdir(source):
# what to do if we're copying a directory to another directory
run_dict["command_type"] = "directory to directory"
command = f'robocopy "{source}" "{destination}" /NFL /NDL /NJH /NJS /nc /ns /np /MT:8 /E'
else:
dir_, file_ = os.path.split(source)
# We are dealing with a single file.
if dest_is_folder:
# Destination is a Folder
run_dict["command_type"] = "single file to directory"
command = (
'robocopy "%s" "%s" "%s" /NFL /NDL /NJH /NJS /nc /ns /np /MT:8'
% (dir_, destination, file_)
)
else:
# Destination is a file with a different name
run_dict["command_type"] = "file to renamed file"
# TODO - check to ensure the files have the same extension.
command = f'copy "{source}" "{destination}" /Y'
if command:
if test:
logging.info(command)
else:
run_dict["start_time"] = time.time()
run_dict["command"] = command
# FIX: Use shell=True for Windows commands
cgl_execute(
command=command,
print_output=False,
verbose=verbose,
command_name=command_name,
)
run_dict["artist_time"] = time.time() - run_dict["start_time"]
run_dict["end_time"] = time.time()
return run_dict
elif sys.platform in ["darwin", "linux", "linux2"]:
# ... existing code for Unix systems ...
# For rsync commands, you might also need shell=True
if command:
if test:
logging.info(command)
else:
run_dict["start_time"] = time.time()
run_dict["command"] = command
cgl_execute(
command=command,
print_output=False,
verbose=verbose,
command_name=command_name,
shell=True, # ← Add this for consistency
)
run_dict["artist_time"] = time.time() - run_dict["start_time"]
run_dict["end_time"] = time.time()
return run_dict
else:
logging.info("%s is not a supported platform" % sys.platform)
return False
[docs]
def split_all(path):
"""
It splits a path into all of its parts, and returns a list of those parts
Args:
path: The path to split
Returns:
A list of all the parts of the path.
"""
if isinstance(path, pathlib.Path):
path = path.as_posix()
if "\\" in path:
path = path.replace("\\", "/")
path = pathlib.Path(path)
parts = list(path.parts)
return parts
[docs]
def atomic_replace(tmp: Path, target: Path, retries=5):
for i in range(retries):
try:
os.replace(tmp, target)
return
except PermissionError:
if i == retries - 1:
raise
time.sleep(0.05 + random.random() * 0.1)
[docs]
def save_json(path: Path, data: dict):
if not isinstance(path, Path):
path = Path(path)
tmp = path.with_suffix(".tmp")
with tmp.open("w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
atomic_replace(tmp, path)
[docs]
def load_json(filepath):
try:
with open(filepath) as jsonfile:
data = json.load(jsonfile)
return data
except IOError or FileNotFoundError:
print("Error reading file %s" % filepath)
[docs]
def fix_json(filepath):
"""
Function to fix json file if there's no newlines
Args:
filepath: Path to json file
"""
save_json(filepath, load_json(filepath))
[docs]
def save_xml(filepath, data):
import xmltodict
with open(filepath, "w") as outfile:
outfile.write(xmltodict.unparse(data))
[docs]
def load_xml(filepath):
import xmltodict
with open(filepath) as xmlfile:
docs = xmltodict.parse(xmlfile.read())
return docs
def __describe_object(obj, data):
import inspect
x = inspect.getsourcefile(obj.__class__)
if not x:
x = "no source :("
d = "%s:%s- %s" % (obj.__class__.__name__, obj.objectName(), x)
data.append(d)
[docs]
def print_call_stack():
traceback.print_stack()
[docs]
def load_fonts_from_directory(directory=None):
if not directory:
directory = os.path.join(CFG.get_code_root(), "resources", "fonts")
for font_file in os.listdir(directory):
if font_file.endswith(".ttf") or font_file.endswith(".otf"):
font_path = os.path.join(directory, font_file)
font_id = QtGui.QFontDatabase.addApplicationFont(font_path)
print('Loaded font "{}" with ID: {}'.format(font_file, font_id))
[docs]
def get_resource_path(relative_path):
"""Get absolute path to resource, works for PyInstaller and development."""
if hasattr(sys, "_MEIPASS"):
# Running in a PyInstaller bundle
base_path = sys._MEIPASS
else:
# Running in development mode
base_path = os.path.abspath(os.path.dirname(__file__))
return os.path.join(base_path, relative_path)
[docs]
def apply_theme(app, theme_name="alchemy_theme.qss", dev_mode=False):
from pathlib import Path
theme_path = f":/{theme_name}"
f = QtCore.QFile(theme_path)
print(theme_path)
if f.exists() and f.open(QtCore.QIODevice.ReadOnly | QtCore.QFile.Text):
print(f"Applying theme from {theme_path}")
qss = QtCore.QTextStream(f).readAll()
app.setStyleSheet(qss)
return True
if dev_mode:
from cgl.ui.startup import ThemeFileWatcher
theme_path = Path(CFG.get_code_root()) / "resources" / "alchemy_theme.qss"
f = QtCore.QFile(theme_path.as_posix())
app.theme_watcher = ThemeFileWatcher(theme_path.as_posix())
# enable inspector with click-open features
alc_code_root = CFG.get_code_root()
enable_dev_tools(
app, css_file=theme_path.as_posix(), project_root=alc_code_root
)
[docs]
def load_style_sheet(style_file="stylesheet.css", app=None, dev_mode=0):
import logging
import os
from PySide6 import QtCore
if app is None:
app = QtWidgets.QApplication.instance()
alc_code_root = CFG.get_code_root()
if not QtCore.QDir.searchPaths("resources"):
QtCore.QDir.addSearchPath("resources", os.path.join(alc_code_root, "resources"))
file_ = os.path.join(alc_code_root, "resources", style_file)
if dev_mode:
# your existing watcher (if PySide6-safe)
if "cglumberjack" in __file__:
from cgl.ui.startup import ThemeFileWatcher
app.theme_watcher = ThemeFileWatcher(file_)
print("dev mode -> watching css file:", file_)
# enable inspector with click-open features
enable_dev_tools(app, css_file=file_, project_root=alc_code_root)
logging.info(f"STYLE FILE {file_}")
if os.path.exists(file_):
with open(file_, "r", encoding="utf-8") as f:
data = f.read()
app.setStyleSheet(data)
def _clean_env(extra_env: Optional[Mapping[str, str]] = None) -> Mapping[str, str]:
env = dict(os.environ.copy())
for k in ("QT_PLUGIN_PATH", "QT_QPA_PLATFORM_PLUGIN_PATH"):
env.pop(k, None)
if extra_env:
env.update(extra_env)
return env
def _normalize_command(command: Sequence[str] | str, command_name: str) -> list[str]:
"""
Normalize the command input to ensure it is a list of strings.
Args:
command (Sequence[str] | str): The command to normalize, can be a string or a sequence of strings.
command_name (str): A name for the command
Returns:
list[str]: A list of strings representing the command.
Raises:
CglExecuteError: If the command is a 'cd' command or a drive change
"""
if isinstance(command, str):
command = shlex.split(command)
command = list(command)
if command and command[0].lower() == "cd":
raise CglExecuteError(
f"[{command_name}] 'cd' detected in command {command}. "
f"Use the 'working_directory' argument instead.",
command=command,
)
if os.name == "nt" and command and re.match(r"^[A-Z]:$", command[0]):
raise CglExecuteError(
f"[{command_name}] Drive change '{command[0]}' detected. "
f"Use the 'working_directory' argument instead. example: cgl_execute(command, working_directory='D:/')",
command=command,
)
return command
def _platform_flags(new_window: bool, detach: bool):
"""
Determine platform-specific flags for process creation.
Args:
new_window (bool): If True, create a new console window for the command.
detach (bool): If True, detach the process from the parent.
"""
creationflags, preexec_fn = 0, None
if os.name == "nt":
DETACHED_PROCESS = 0x00000008
CREATE_NEW_PROCESS_GROUP = 0x00000200
CREATE_NEW_CONSOLE = 0x00000010
if new_window:
creationflags |= CREATE_NEW_CONSOLE | CREATE_NEW_PROCESS_GROUP
elif detach:
creationflags |= DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
else:
if detach:
preexec_fn = os.setsid
return creationflags, preexec_fn
def _run_blocking(
command,
cmd_display,
command_name,
proc_env,
cwd,
creationflags,
preexec_fn,
timeout,
print_output,
start,
shell=False,
):
"""
Run a command in a blocking way, capturing stdout and stderr.
If the command fails to start, it raises CglExecuteError with details.
If the command is not found, it raises CglExecuteError with a 127 return code.
If the command is found, it returns a dictionary with process info.
Args:
command (list[str]): The command to run, as a list of strings.
cmd_display (str): A string representation of the command for logging.
command_name (str): A name for the command, used in error messages.
proc_env (Mapping[str, str]): Environment variables to use for the process.
cwd (str | None): The working directory to run the command in.
creationflags (int): Flags for process creation (Windows-specific).
preexec_fn (callable | None): Function to run before executing the command (Unix-specific).
timeout (float | None): Timeout for the command execution, in seconds.
print_output (bool): Whether to print stdout and stderr to the console.
start (float): The start time of the command execution, used for duration calculation.
Raises:
CglExecuteError: If the command fails to start, times out, or returns a non-zero exit code.
Returns:
dict: A dictionary containing the command, command name, process ID (if applicable),
return code, stdout, stderr, start time, end time, and duration of the command
execution.
"""
try:
proc = subprocess.Popen(
command,
env=proc_env,
cwd=cwd,
creationflags=creationflags,
preexec_fn=preexec_fn,
shell=shell, # ← Add shell parameter
)
except FileNotFoundError:
try:
proc = subprocess.run(
command,
env=proc_env,
cwd=cwd,
capture_output=True,
text=True,
creationflags=creationflags if os.name == "nt" else 0,
preexec_fn=preexec_fn,
timeout=timeout,
)
except FileNotFoundError as e:
raise CglExecuteError(
f"[{command_name}] Executable not found: {e}",
command=cmd_display,
returncode=127,
stderr=str(e),
)
except subprocess.TimeoutExpired as e:
raise CglExecuteError(
f"[{command_name}] Timeout after {timeout}s: {cmd_display}",
command=cmd_display,
stdout=e.stdout,
stderr=e.stderr,
)
stdout, stderr = proc.stdout or "", proc.stderr or ""
if print_output and stdout:
print(stdout, end="")
if print_output and stderr:
print(stderr, end="")
if proc.returncode != 0:
# Special-case robocopy: exit codes < 8 are not errors
if "robocopy" in cmd_display.lower() and proc.returncode < 8:
return {
"command": cmd_display,
"command_name": command_name,
"pid": None,
"returncode": proc.returncode,
"stdout": stdout,
"stderr": stderr,
"start_time": start,
"end_time": time.time(),
"duration": time.time() - start,
}
raise CglExecuteError(
f"[{command_name}] Command failed (code {proc.returncode}): {cmd_display}",
command=cmd_display,
returncode=proc.returncode,
stdout=stdout,
stderr=stderr,
)
return {
"command": cmd_display,
"command_name": command_name,
"pid": None,
"returncode": proc.returncode,
"stdout": stdout,
"stderr": stderr,
"start_time": start,
"end_time": time.time(),
"duration": time.time() - start,
}
def _run_nonblocking(
command, cmd_display, command_name, proc_env, cwd, creationflags, preexec_fn, start
):
"""
Run a command in a non-blocking way, returning immediately with process info.
This is useful for fire-and-forget operations where you don't need to capture output.
If the command fails to start, it raises CglExecuteError with details.
If the command is not found, it raises CglExecuteError with a 127 return code.
If the command is found, it returns a dictionary with process info.
Args:
command (list[str]): The command to run, as a list of strings.
cmd_display (str): A string representation of the command for logging.
command_name (str): A name for the command, used in error messages.
proc_env (Mapping[str, str]): Environment variables to use for the process.
cwd (str | None): The working directory to run the command in.
creationflags (int): Flags for process creation (Windows-specific).
preexec_fn (callable | None): Function to run before executing the command (Unix-specific).
start (float): The start time of the command execution, used for duration calculation.
"""
try:
proc = subprocess.Popen(
command,
env=proc_env,
cwd=cwd,
creationflags=creationflags,
preexec_fn=preexec_fn,
)
except FileNotFoundError as e:
raise CglExecuteError(
f"[{command_name}] Executable not found: {e}",
command=cmd_display,
returncode=127,
stderr=str(e),
)
return {
"command": cmd_display,
"command_name": command_name,
"pid": proc.pid,
"returncode": None,
"stdout": "",
"stderr": "",
"start_time": start,
"end_time": time.time(),
"duration": time.time() - start,
}
def _prepare_command_for_subprocess(
command: str | Sequence[str],
) -> tuple[str | list[str], bool]:
"""
Inspect and normalize the command.
Returns (normalized_command, use_shell)
"""
if isinstance(command, (list, tuple)):
head = str(command[0]).lower()
if os.name == "nt" and head in {
"cmd",
"cmd.exe",
"copy",
"del",
"move",
"rmdir",
"start",
}:
return " ".join(map(str, command)), True
return list(map(str, command)), False
# It's a string
cmd_str = command.strip()
lower = cmd_str.lower()
# Detect shell operators or Windows builtins
if any(
x in lower
for x in [
"&&",
"||",
"|",
">",
"<",
"cmd /c",
"start ",
"copy ",
"del ",
"move ",
]
):
return cmd_str, True
# Default: treat as normal executable command
return shlex.split(cmd_str), False
[docs]
def cgl_execute(
command: Sequence[str] | str,
*,
return_output: bool = True,
print_output: bool = True,
verbose: bool = True,
command_name: str = "cgl_execute",
new_window: bool = False,
detach: bool = False,
env: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
working_directory: Optional[str | Path] = None,
) -> dict[str, Any]:
"""
Runs a local process with smart shell detection.
Replaces the old _run_blocking/_run_nonblocking pattern.
"""
# Normalize
cmd, use_shell = _prepare_command_for_subprocess(command)
cmd_display = " ".join(map(str, cmd)) if isinstance(cmd, (list, tuple)) else cmd
# Logging
if verbose:
msg = f"[{command_name}] Executing: {cmd_display}"
if working_directory:
msg += f" from {working_directory}"
if use_shell:
msg += " [SHELL MODE]"
logging.info(msg)
# Environment & flags
proc_env = _clean_env(env)
cwd = str(working_directory) if working_directory else None
creationflags, preexec_fn = _platform_flags(new_window, detach)
start = time.time()
try:
if return_output:
proc = subprocess.run(
cmd,
env=proc_env,
cwd=cwd,
shell=use_shell,
capture_output=True,
text=True,
creationflags=creationflags if os.name == "nt" else 0,
preexec_fn=preexec_fn,
timeout=timeout,
)
stdout, stderr = proc.stdout or "", proc.stderr or ""
else:
proc = subprocess.Popen(
cmd,
env=proc_env,
cwd=cwd,
shell=use_shell,
creationflags=creationflags,
preexec_fn=preexec_fn,
)
stdout, stderr = "", ""
except FileNotFoundError as e:
raise CglExecuteError(
f"[{command_name}] Executable not found: {e}",
command=cmd_display,
returncode=127,
)
except subprocess.TimeoutExpired:
raise CglExecuteError(
f"[{command_name}] Timeout after {timeout}s: {cmd_display}",
command=cmd_display,
)
# Evaluate result
if return_output and proc.returncode not in (0, None):
# Special-case robocopy success codes (<8)
if "robocopy" in cmd_display.lower() and proc.returncode < 8:
pass
else:
raise CglExecuteError(
f"[{command_name}] Command failed (code {proc.returncode}): {cmd_display}",
command=cmd_display,
returncode=proc.returncode,
stdout=stdout,
stderr=stderr,
)
return {
"command": cmd_display,
"command_name": command_name,
"pid": getattr(proc, "pid", None),
"returncode": getattr(proc, "returncode", None),
"stdout": stdout,
"stderr": stderr,
"start_time": start,
"end_time": time.time(),
"duration": time.time() - start,
}
[docs]
def get_end_time(start_time):
return time.time() - start_time
[docs]
def get_job_id():
return str(time.time()).replace(".", "")
#
# def write_to_cgl_data(path_object, process_info):
# cfg = ProjectConfig(path_object)
# job_id = None
# if "job_id" in process_info.keys():
# if process_info["job_id"]:
# job_id = process_info["job_id"]
# else:
# process_info["job_id"] = get_job_id()
# user = current_user()
# cgl_data = os.path.join(ProjectConfig().globals_root, "cgl_data.json")
# if os.path.exists(cgl_data):
# data = load_json(cgl_data)
# else:
# data = {}
# if user not in data.keys():
# data[user] = {}
# if job_id not in data[user].keys():
# data[user][process_info["job_id"]] = process_info
# else:
# logging.info("%s already exists in %s dict" % (process_info["job_id"], user))
# return
# save_json(cgl_data, data)
[docs]
def screen_grab(path_object):
"""
1) takes a screen grab and saves it to the "preview_path" variable on PathObject()
2) creates a "thumbnail" at the "thumb_path" from PathObject()
3) updates ftrack or shotgrid with the new thumbnail.
"""
import cgl.core.review as review
import cgl.core.screen_grab as sg
ppath = sg.run(path_object=path_object)
review.create_movie_thumbnail(path_object)
print("Creating Preview at: {}".format(ppath))
print("Creating thumb at: {}".format(path_object.thumb_path))
# path_object.update_test_project_msd(attr='preview_file')
# path_object.update_test_project_msd(attr='thumb_file')
[docs]
def remove_read_only_attribute(file_path):
"""
Removes read only attribute from file
Args:
file_path: Absolute file path to file
"""
os.chmod(file_path, stat.S_IWRITE)
[docs]
def get_branch_name():
cmd = "git symbolic-ref --short -q HEAD"
branch_name = cgl_execute(command=cmd, return_output=True)
return branch_name["stdout"].splitlines()[0]
[docs]
def ext_exists(folder_path, ext, as_posix=True) -> list[Path] or list[str]:
"""
Args:
folder_path: directory to search
ext: extention you're looking for. expecting a ".ext" but "ext" will likely work too.
Returns:
list of Path objects that match the extension in the folder_path
"""
folder_path = Path(folder_path)
if not folder_path.is_dir():
folder_path = folder_path.parent
results = list(folder_path.glob("*" + ext))
if as_posix:
results = [p.as_posix() for p in results]
return results
[docs]
class Logger:
def __init__(self, name=None, log_file=None, log_level=logging.DEBUG):
if name is None:
self.logger = logging.getLogger()
else:
self.logger = logging.getLogger(name)
self.logger.setLevel(log_level)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
if self.logger.hasHandlers():
self.logger.handlers.clear()
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
if log_file:
file_handler = RotatingFileHandler(
log_file, maxBytes=1048576 * 2, backupCount=5
)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
[docs]
def set_log_level(self, level):
"""
The function sets the log level of a logger object.
Args:
level: The "level" parameter is used to set the logging level for the logger object. The
logging level determines the severity of the messages that will be logged. The available logging
levels are:
DEBUG, INFO, WARN, ERROR
"""
self.logger.setLevel(level)
[docs]
class DateTimeEncoder(json.JSONEncoder):
[docs]
def default(self, obj):
if isinstance(obj, datetime):
return obj.strftime("%Y-%m-%dT%H:%M:%S")
return super().default(obj)
[docs]
def get_print_list(list_array):
"""
Returns a string of a list with commas and an "and" before the last item.
Args:
list_array:
Returns:
"""
print_list = ""
if list_array:
for item in list_array:
print_list += f"{item}, "
# remove the last comma and space
if print_list:
print_list = print_list[:-2]
return print_list
[docs]
def get_file_type(filepath) -> str:
"""
Returns the file type of the given filepath
Args:
filepath:
Returns:
The file type of the given filepath
Raises:
KeyError: If the file type is not found in the config file
"""
ft = "file"
if "." not in filepath:
ft = "folder"
if "####" in filepath:
ft = "sequence"
if "%0" in filepath:
ft = "sequence"
if ft == "file":
file_, ext = os.path.splitext(filepath)
util_config = CFG.get_util_config()
try:
ft = util_config["ext_map"][ext.lower()]
except KeyError:
logging.error("No file type found for %s" % ext.lower())
ft = "NA"
return ft