import glob
import logging
import os
import re
import subprocess
import sys
import time
from typing import Dict, List, Tuple, Iterable
from pathlib import Path
from PySide6 import QtWidgets
from cgl.core.config.query import AlchemyConfigManager, get_user_name
from cgl.core.path.constants import (
SEQ_REGEX,
)
from cgl.pathlib.sequence_model import Sequence
from cgl.core.utils.general import cgl_execute, load_json, save_json, get_file_type
# Shared configuration manager, instantiated once when this module is imported.
CFG = AlchemyConfigManager()
# Key identifying one image sequence. `_collect_sequence_keys` builds these as
# (directory, prefix, pad, suffix, ext).
SeqKey = Tuple[str, str, int, str, str]
# Rightmost run of 1-7 digits in a name: the text before it is `prefix`, the
# trailing non-digit text is `suffix`.
_RIGHTMOST_NUM_RE = re.compile(
    r"^(?P<prefix>.*?)(?P<digits>\d{1,7})(?P<suffix>[^0-9]*)$"
)
# Concrete frame file name: <head><frame digits><.ext>
_SEQ_NUM_RE = re.compile(r"^(?P<head>.*?)(?P<frame>\d+)(?P<tail>\.[^.\\/]+)$")
# Hash-padded pattern: <head><two or more '#'><.ext>
_HASH_RE = re.compile(r"^(?P<head>.*?)(?P<hash>\#{2,})(?P<tail>\.[^.\\/]+)$")
# printf-style pattern: <head>%Nd or %0Nd<.ext>; `pad` captures N.
_PRINTF_RE = re.compile(r"^(?P<head>.*?)(?:%0?(?P<pad>\d+)d)(?P<tail>\.[^.\\/]+)$")
[docs]
def path_delimit(path: str, to_windows=False) -> str:
    """
    Normalise the directory separators used in ``path``.

    Args:
        path: The path string to convert.
        to_windows: When False (the default) every backslash becomes a
            forward slash. When True every forward slash becomes a backslash.

    Returns:
        The path rewritten with a single separator style.
    """
    old_sep, new_sep = ("/", "\\") if to_windows else ("\\", "/")
    return path.replace(old_sep, new_sep)
[docs]
def open_file(filename: str) -> None:
    """
    Open a file in the default application for its type.

    On Windows the file is handed to ``os.startfile``. On macOS, Maya scenes
    (``.ma`` / ``.mb``) are launched through ``cgl_execute`` with
    ``open -a maya``; every other file on macOS goes through ``open`` and on
    the remaining platforms through ``xdg-open``.

    Args:
        filename: The path to the file to be opened.

    Returns:
        None. A missing file is reported through ``logging.error``; no
        exception is raised for it.
    """
    if not os.path.exists(filename):
        # logging.exception is only meaningful inside an ``except`` block;
        # there is no active exception here, so log a plain error.
        logging.error("File does not exist: {}".format(filename))
        return

    if sys.platform == "win32":
        os.startfile(filename)
        return

    _, ext_ = os.path.splitext(filename)
    if sys.platform == "darwin" and ext_ in (".ma", ".mb"):
        command = "open -a maya -file {}".format(filename)
        # f-prefix added: the message previously logged the literal "{command}".
        logging.info(f"excecuting: {command}")
        cgl_execute(command)
        return

    opener = "open" if sys.platform == "darwin" else "xdg-open"
    subprocess.call([opener, filename])
[docs]
def open_folder(file_path):
    """
    Reveal ``file_path`` in the platform file browser.

    Windows runs ``explorer /select`` and macOS runs ``open -R``, both of
    which highlight the file inside its folder. Other platforms pass the path
    straight to ``xdg-open``.

    The path is quoted before it is placed on the command line so that paths
    containing spaces or shell metacharacters are passed as one argument.

    Args:
        file_path: The path to reveal.

    Returns:
        None
    """
    import shlex

    target = str(file_path)
    if sys.platform == "win32":
        # cmd.exe does not follow POSIX quoting; double quotes are enough
        # because Windows paths cannot themselves contain a double quote.
        os.system('start explorer /select,"{}"'.format(target))
    elif sys.platform == "darwin":
        os.system("open -R {}".format(shlex.quote(target)))
    else:
        os.system("xdg-open {}".format(shlex.quote(target)))
[docs]
def create_folders(path_object):
    """
    Create the render and the source folders for the given path.

    Args:
        path_object: Either a ``PathObject`` (its render and source directory
            paths are used) or a path string. For a string that is not an
            existing directory, its parent directory is used. The sibling
            tree is then derived by swapping the text "render" and "source"
            in the path (every occurrence is replaced).

    Returns:
        None

    Raises:
        ValueError: If ``path_object`` is neither a ``PathObject`` nor a string.
    """
    from cgl.core.path.object import PathObject

    if isinstance(path_object, PathObject):
        render_path = path_object.get_render_path(dirname=True)
        source_path = path_object.get_source_path(dirname=True)
    elif isinstance(path_object, str):
        if not os.path.isdir(path_object):
            path_object = os.path.dirname(path_object)
        if "render" in path_object:
            render_path = path_object
            source_path = path_object.replace("render", "source")
        else:
            source_path = path_object
            render_path = path_object.replace("source", "render")
    else:
        raise ValueError("path_object must be a PathObject or a string")

    for folder in (render_path, source_path):
        if not os.path.exists(folder):
            # exist_ok guards against another process creating the folder
            # between the existence check and the makedirs call.
            os.makedirs(folder, exist_ok=True)
[docs]
def replace_illegal_filename_characters(filename):
    """
    Make a filename safe by collapsing illegal characters into underscores.

    Every run of characters other than ASCII letters, digits, "." and "#" is
    replaced by a single "_". If the result ends with "_", that one trailing
    underscore is dropped.

    Args:
        filename (str): The filename to be corrected

    Returns:
        The corrected file name
    """
    cleaned = re.sub(r"[^A-Za-z0-9\.#]+", "_", filename)
    return cleaned[:-1] if cleaned.endswith("_") else cleaned
[docs]
def get_folder_size(folder):
    """
    Return the total size in bytes of every file below ``folder``.

    Files that disappear or cannot be resolved while walking (for example
    dangling symlinks) are skipped individually, so one bad entry no longer
    discards the sizes of the other files in the same directory.

    Args:
        folder: Path of the directory to measure.

    Returns:
        total_bytes (int): Size of the folder including all children, in
        bytes. 0 is returned when ``folder`` is a file or does not exist.
    """
    if os.path.isfile(folder):
        logging.debug("this is a file numskull")
        return 0
    if not os.path.isdir(folder):
        return 0

    total_bytes = 0
    for root, _dirs, files in os.walk(folder):
        for name in files:
            file_path = os.path.join(root, name).replace("\\", "/")
            try:
                total_bytes += os.path.getsize(file_path)
            except FileNotFoundError as e:
                logging.debug(
                    "ERROR: likely a problem with a file in %s (%s)" % (root, e)
                )
    return total_bytes
[docs]
def get_task_default_file(task=None) -> list:
    """
    Look up the default file(s) registered for a task in the cookbook.

    Args:
        task: The task to get the default file of. When empty, the
            ``default_files`` folder itself is looked up instead.

    Returns:
        Without a task: a one-element list holding the ``default_files``
        folder, or None when that folder does not exist.
        With a task: a list of paths when several ``default.*`` files match,
        a single path string when exactly one matches, and an empty list
        when nothing matches. Returned paths use "/" separators.
    """
    # TODO - move this to alc_query.
    if not task:
        root_folder = f"{CFG.cookbook_dir}/default_files".replace("\\", "/")
        return [root_folder] if os.path.exists(root_folder) else None

    task_folder = f"{CFG.cookbook_dir}/default_files/{task}"
    print("[Default Files] Searching task folder: {}".format(task_folder))
    matches = glob.glob("{}/default.*".format(task_folder))
    if not matches:
        return []
    if len(matches) > 1:
        return [match.replace("\\", "/") for match in matches]
    only_match = matches[0]
    return only_match.replace("\\", "/") if only_match else []
[docs]
def start(filepath):
    """
    Open a file (or folder) with the operating system's default handler.

    Substance Painter projects (``.spp``) first get the user setup copied via
    ``copy_user_setup``. Paths that ``get_file_type`` reports as "sequence"
    are replaced by the ffplay command line produced by ``Sequence``.

    Args:
        filepath: The path to the file to be opened

    Returns:
        None

    Notes:
        On Windows a ``FileNotFoundError`` raised while launching is caught,
        printed and logged rather than propagated. On unsupported platforms
        an error is logged and nothing is launched.
    """
    _, ext = os.path.splitext(filepath)
    if ext == ".spp":
        from cgl.plugins.substance.utils import (
            copy_user_setup,
        )

        copy_user_setup()

    if get_file_type(filepath) == "sequence":
        # Anything after the first space is dropped before building the sequence.
        seq_object = Sequence.from_path(filepath.split(" ")[0])
        print("Alchemy is set to open sequences with ffplay")
        filepath = " ".join(seq_object.ffplay_cmd(fps=24))

    if sys.platform == "darwin":
        command = "open " + filepath
        print(command)
        cgl_execute(command)
    elif sys.platform.startswith("linux"):
        # Python 3 reports "linux" (never "linux2"); the command must also
        # carry the path and actually be executed.
        cgl_execute("xdg-open " + filepath)
    elif sys.platform == "win32":
        cmd = ["cmd", "/c", "start", str(Path(filepath))]
        try:
            cgl_execute(cmd, new_window=False)
        except FileNotFoundError as e:
            print("File not found: {}".format(filepath))
            print("Command: {}".format(cmd))
            # Log the raised instance, not the exception class.
            logging.error(e)
    else:
        logging.error("Unsupported platform for start(): {}".format(sys.platform))
[docs]
def start_url(url):
    """
    Open ``url`` with the standard-library ``webbrowser`` module.

    Args:
        url: The address to open.

    Returns:
        None
    """
    from webbrowser import open as open_in_browser

    open_in_browser(url)
[docs]
def print_file_size(total_bytes, do_print=True):
    """
    Format a byte count as megabytes plus the raw byte count.

    Args:
        total_bytes: The size in bytes
        do_print (bool): When True (default) the formatted string is also
            sent to ``logging.debug``.

    Returns:
        A string such as "1.00 Mb(1,048,576 bytes)"
    """
    megabytes = float(total_bytes) / 1024 / 1024
    size_string = f"{megabytes:.2f} Mb({total_bytes:,} bytes)"
    if do_print:
        logging.debug(size_string)
    return size_string
[docs]
def find_latest_publish_objects(folder, source=True, render=False):
    """
    Collect the latest published version of every task found for ``folder``.

    The path is widened to ``task="*"`` / ``user="publish"`` and globbed. For
    every match the latest version is looked up in the requested trees, and
    the combined folder size is written to the debug log.

    Args:
        folder: The folder to find the latest published versions of
        source (bool): Include the "source" tree (Default value = True)
        render (bool): Include the "render" tree (Default value = False)

    Returns:
        A list with one path object per match and requested tree
    """
    from cgl.core.path.object import PathObject

    path_object = PathObject().from_path_string(folder)
    path_object.set_attr(task="*", user="publish")

    # Trees are visited in the fixed order source, then render.
    wanted_trees = [
        tree for tree, enabled in (("source", source), ("render", render)) if enabled
    ]

    sync_objects = []
    total_size = 0
    for match in glob.glob(path_object.get_path()):
        f_object = PathObject.from_path_string(match)
        # TODO - latest version should work without providing a base version.
        f_object.set_attr(version="000.000")
        for tree in wanted_trees:
            f_object.set_attr(version="000.000", tree=tree)
            latest = f_object.copy(latest=True)
            sync_objects.append(latest)
            total_size += get_folder_size(latest.get_path())

    logging.debug(
        "%s %s Total Size of Latest Publishes\n\t%s"
        % (
            path_object.sequence,
            path_object.shot,
            print_file_size(total_size, do_print=False),
        )
    )
    return sync_objects
[docs]
def show_in_folder(path_string):
    """
    Open the folder that contains ``path_string`` in the OS file browser.

    Args:
        path_string: Path whose parent directory should be opened

    Returns:
        None
    """
    print(path_string)
    full_path = Path(path_string).parent
    if sys.platform == "darwin":
        cmd = ["open", full_path.as_posix()]
    elif sys.platform.startswith("linux"):
        # Python 3 reports "linux", never "linux2"; the old comparison sent
        # Linux into the Windows branch below.
        cmd = ["xdg-open", full_path.as_posix()]
    else:
        full_path = str(full_path)
        print(full_path)
        cmd = ["cmd", "/c", "start", full_path]
    logging.debug("running command: %s" % cmd)
    # this command will only ever be run locally, it does not need render management support
    cgl_execute(cmd)
[docs]
def seq_from_file(filename, ext_map):
    """
    Return ``filename`` as a "####" hash sequence when it qualifies.

    A filename qualifies when it matches ``SEQ_REGEX`` and its extension maps
    to "image_plane" in ``ext_map``. The matched text, with dots removed, is
    treated as the frame number and replaced by "####" in the name.

    Args:
        filename (str): The filename to be checked
        ext_map: Mapping of file extension (including the dot) to a type name

    Returns:
        The hash-sequence form of the filename, or the original filename
        when it does not qualify
    """
    found = re.search(SEQ_REGEX, filename)
    if not found:
        return filename

    stem, extension = os.path.splitext(filename)
    frame_digits = found.group(0).replace(".", "")
    hashed_stem = stem.replace(frame_digits, "####")
    if extension in ext_map.keys() and ext_map[extension] == "image_plane":
        return f"{hashed_stem}{extension}"
    return filename
[docs]
def is_file(path_string):
    """
    Guess whether a path string names a file, purely from its text.

    The string counts as a file path when it ends with a dot followed by
    2 to 7 ASCII letters or digits. The filesystem is never consulted.

    Args:
        path_string: The path to be checked

    Returns:
        True if the path ends with such an extension, otherwise False
    """
    return re.search(r"\.[a-zA-Z0-9]{2,7}$", path_string) is not None
[docs]
def list_dir(path_object, variable):
    """
    List the values available for one path element; a GUI convenience.

    Args:
        path_object: Object exposing ``listdir_by_element(name)``
        variable: Name of the path element to list (e.g. "version", "user")

    Returns:
        The values reported by ``listdir_by_element``. "version" results are
        sorted in descending order. A falsy listing (None or empty) yields
        an empty list for every variable, including "version", which
        previously raised a TypeError on None.
    """
    found = path_object.listdir_by_element(variable) or []
    if variable == "version":
        return sorted(found, reverse=True)
    return found
[docs]
def lj_list_dir(
    directory: str,
    basename: bool = True,
    return_sequences: bool = False,
    full_range: bool = False,
    ignore_hidden: bool = True,
) -> List[str]:
    """
    List a directory, collapsing image sequences into one entry each.

    Frame discovery, padding and ranges are delegated to ``Sequence``; this
    function scans the directory, groups names into sequence keys, builds one
    ``Sequence`` per key and formats the display strings.

    Args:
        directory: Folder to scan.
        basename: Show bare file names (True) or full paths (False).
        return_sequences: When True only sequence entries are returned.
        full_range: Show multi-frame sequences as "start-end" instead of a
            compact list of the frames that exist.
        ignore_hidden: Skip names that start with ".".

    Returns:
        A sorted list of display strings. An empty list is returned when the
        directory is missing or not readable.
    """
    try:
        entries = [entry.name for entry in os.scandir(directory)]
    except (FileNotFoundError, PermissionError):
        return []

    def _shown(value) -> str:
        # Apply the caller's basename / full-path preference.
        return os.path.basename(value) if basename else str(value)

    if ignore_hidden:
        entries = [entry for entry in entries if not entry.startswith(".")]
    # optional external filter hook
    entries = _maybe_clean_file_list(entries)

    seq_keys, loose_files = _collect_sequence_keys(directory, entries)

    # One Sequence per unique key; keys that Sequence rejects are skipped.
    sequences = []
    for dirpath, head, pad, suffix, ext in seq_keys:
        hash_pattern = os.path.join(dirpath, f"{head}.{'#' * pad}{suffix}.{ext}")
        try:
            built = Sequence.from_path(hash_pattern)
        except Exception:
            continue
        sequences.append(built)

    seq_display: List[str] = []
    for seq in sequences:
        if seq.count > 1:
            placeholder = seq.filepath_hash  # e.g. playblast.####.jpg
            if full_range:
                frames_text = f"{seq.start_frame}-{seq.end_frame}"
            else:
                frames_text = _compact_frame_list(list(seq.frames))
            seq_display.append(_shown(f"{placeholder} {frames_text}"))
        elif seq.frames:
            # Single frame: show the concrete file name.
            seq_display.append(_shown(seq.file_for(seq.frames[0])))
        else:
            # No frames found: show the pattern itself.
            seq_display.append(_shown(seq.filepath_hash))

    if return_sequences:
        return sorted(seq_display)

    if basename:
        loose_display = [os.path.basename(path_) for path_ in loose_files]
    else:
        loose_display = list(loose_files)
    return sorted(loose_display + seq_display)
# -----------------------------
# Helpers
# -----------------------------
def _maybe_clean_file_list(names: List[str]) -> List[str]:
"""Call external clean_file_list(names) if defined by caller; else pass-through."""
fn = globals().get("clean_file_list")
if callable(fn):
try:
return fn(names) # type: ignore[misc]
except Exception as e:
logging.error(f"[_maybe_clean_file_list]{e}")
pass
else:
logging.debug(f"[_maybe_clean_file_list] {names}")
return names
def _collect_sequence_keys(
    directory: str,
    names: Iterable[str],
    *,
    delimiters: str = ".",  # which characters count as a delimiter
    min_len: int = 1,
    max_len: int = 7,
) -> Tuple[List[SeqKey], List[str]]:
    """
    Split directory entries into unique sequence keys and plain files.

    A name is a sequence frame when it has an extension and the part before
    the extension ends with ``<delimiter><digits>``, where the digit run is
    ``min_len`` to ``max_len`` characters long. With the default "."
    delimiter, "playblast.0001.jpg" qualifies while "shotA_prv_0001.jpg"
    does not (unless "_" is added to ``delimiters``).

    Returns:
        (unique_keys, non_sequence_fullpaths)
        unique key format: (directory, prefix, pad, suffix, ext)
        - prefix is the text before the delimiter
        - pad is the number of matched digits
        - suffix is always '' (digits sit directly before the extension)
        - ext does NOT include the leading '.'
        Keys keep the order in which they were first seen.
    """
    delim_class = re.escape(delimiters)
    tail_re = re.compile(
        rf"^(?P<prefix>.*?)(?P<delim>[{delim_class}])(?P<digits>\d{{{min_len},{max_len}}})$"
    )

    unique: Dict[SeqKey, None] = {}
    loose: List[str] = []
    for name in names:
        stem, ext = os.path.splitext(name)
        # No extension, or no delimiter+digits tail -> an ordinary file.
        found = tail_re.match(stem) if ext else None
        if found is None:
            loose.append(os.path.join(directory, name))
            continue
        key: SeqKey = (
            directory,
            found.group("prefix"),
            len(found.group("digits")),
            "",
            ext.lstrip("."),
        )
        # A dict is used as an insertion-ordered set.
        unique[key] = None
    return list(unique), loose
def _compact_frame_list(frames_sorted: List[int]) -> str:
"""Compact a sorted list of ints into '1-5,7,9-12'."""
if not frames_sorted:
return ""
frames_sorted.sort()
out: List[str] = []
s = e = frames_sorted[0]
for f in frames_sorted[1:]:
if f == e + 1:
e = f
else:
out.append(f"{s}-{e}" if s != e else f"{s}")
s = e = f
out.append(f"{s}-{e}" if s != e else f"{s}")
return ",".join(out)
#
# def split_sequence_frange(sequence):
# """
# Takes the result of a lj_list_dir, and gives back the file path as well as the sequence
#
# Args:
# sequence: The sequence to be split
#
# Returns:
# The file path and the sequence of the given sequence or undefined if no sequence is found
#
# """
# frange = re.search(SPLIT_SEQ_REGEX, sequence)
# if frange:
# return sequence.split(frange.group(0))[0], frange.group(0).replace(" ", "")
# else:
# return
#
#
# def split_sequence(sequence: str):
# """
# Splits a sequence with a match for ######, as well as %0#d
#
# Args:
# sequence: The sequence to be split
#
# Returns:
# The sequence split by the match for ######, as well as %0#d or undefined if no sequence is found
#
# """
# frange = None
#
# group = ""
#
# if "#" in sequence:
# frange = re.search(SEQ_SPLIT, sequence)
# group = frange.group(0)
# elif "%" in sequence:
# frange = re.search(SEQ2_SPLIT, sequence)
# group = frange.group(0)
# elif ".*." in sequence:
# frange = "this"
# group = "*."
# if frange:
# return sequence.split(group)[0]
# else:
# return
[docs]
def remove_root(filepath):
    """
    Strip the production root from ``filepath``.

    Both the path and the root are normalised to "/" separators, every
    occurrence of the root is removed, and one leading "/" is dropped.

    Args:
        filepath: The filepath to be processed

    Returns:
        The filepath with the root removed, or "" for an empty filepath
    """
    # TODO - move this function to PathObject - it really does belong there.
    if not filepath:
        return ""
    root = CFG.get_prod_root()
    filepath = filepath.replace("\\", "/")
    root = root.replace("\\", "/")
    relative = filepath.replace(root, "")
    # Raw string: "\/" in a normal string literal is an invalid escape sequence.
    return re.sub(r"^/", "", relative)
[docs]
def remove_coobkook_root(filepath):
    """
    Strip the cookbook root from ``filepath``.

    Both the path and the root are normalised to "/" separators, every
    occurrence of the root is removed, and one leading "/" is dropped.

    Args:
        filepath: The filepath to be processed

    Returns:
        The filepath with the cookbook root removed
    """
    root = CFG.cookbook_root
    filepath = filepath.replace("\\", "/")
    root = root.replace("\\", "/")
    relative = filepath.replace(root, "")
    # Raw string: "\/" in a normal string literal is an invalid escape sequence.
    return re.sub(r"^/", "", relative)
[docs]
def add_cookbook_root(filepath):
    """
    Prefix ``filepath`` with the cookbook root.

    Both parts are normalised to "/" separators and joined with a single "/".

    Args:
        filepath: The filepath to be processed

    Returns:
        The filepath with the cookbook root added
    """
    cookbook_root = CFG.cookbook_root
    relative = filepath.replace("\\", "/")
    cookbook_root = cookbook_root.replace("\\", "/")
    return f"{cookbook_root}/{relative}"
[docs]
def add_root(path_):
    """
    Prefix ``path_`` with the production root.

    Args:
        path_: The path to be placed under the production root

    Returns:
        "<root>/<path_>" with every backslash converted to "/"
    """
    prod_root = CFG.get_prod_root()
    return f"{prod_root}/{path_}".replace("\\", "/")
[docs]
def hd_proxy_exists(hd_proxy_path, frame_range):
    """
    Check whether a complete HD proxy jpg sequence exists on disk.

    Args:
        hd_proxy_path: Hash-padded sequence path (e.g. ".../shot.####.jpg").
            The text before the first "##" is used as the glob prefix.
        frame_range: Inclusive frame range written as "start-end"

    Returns:
        True when the number of matching jpg files equals the length of the
        frame range, False when files are missing or none are found, and
        None when ``hd_proxy_path`` contains no "#".
    """
    if "#" not in hd_proxy_path:
        return None

    prefix = hd_proxy_path.split("##")[0]
    found = glob.glob("{}*.jpg".format(prefix))
    if not found:
        logging.info(
            "HD Proxy Path not found at: {}, Proxy Must Exist before Creating mov".format(
                hd_proxy_path
            )
        )
        return False

    first, last = frame_range.split("-")
    expected = int(last) - int(first) + 1
    if len(found) == expected:
        logging.info(
            "HD Proxy Sequence Found at: {}, Creating Web Preview".format(
                hd_proxy_path
            )
        )
        return True

    logging.info(
        "Full HD Proxy Files not found at: {}, "
        "Proxy Must Exist before Creating mov".format(hd_proxy_path)
    )
    return False
[docs]
def get_short_name(string):
    """
    Build a short code from a name.

    Args:
        string: The text to abbreviate

    Returns:
        The first three characters of the lower-cased alphabetic characters
        of ``string`` (fewer when there are not enough letters)
    """
    letters = [char for char in string if char.isalpha()]
    return "".join(letters).lower()[:3]
[docs]
def get_projects(company: str = ""):
    """
    List the projects found in the "source" tree of a company.

    Args:
        company: The company to get the projects for. When empty, the
            configured ``CFG.company`` is used.

    Returns:
        The result of listing the "project" element for that company
    """
    from cgl.core.path.object import PathObject

    lookup = {"company": company or CFG.company, "tree": "source"}
    return PathObject.from_dict(lookup).listdir_by_element("project")
[docs]
def edit_publish_msd(publish_path_object, wait=10):
    """
    Rewrite the paths stored in a published MSD so they point at "publish".

    Every entry under "render_files" and "source_files" is resolved against
    the production root and replaced by the path of its
    ``user="publish"`` copy at the version of ``publish_path_object``.

    Args:
        publish_path_object: The published path object whose MSD is edited
        wait (int): Seconds to sleep before reading the MSD file
            (Default value = 10)

    Returns:
        True if the MSD was rewritten and saved, otherwise False (the MSD
        file does not exist or could not be loaded)
    """
    from cgl.core import msd
    from cgl.core.path.object import PathObject

    version = publish_path_object.version
    publish_msd_path = msd.get_msd_path(publish_path_object)
    time.sleep(wait)
    if not os.path.exists(publish_msd_path):
        return False

    # TODO - i'd like this to be smarter and just continue to try ever N seconds until it can load something.
    publish_msd = load_json(publish_msd_path)
    if not publish_msd:
        logging.error(
            f"Could not load: {publish_msd_path}, this is typically caused when the wait time before editing "
            f"is not long enough on a publish. Try increasing default wait time on edit_publish_msd()"
        )
        return False

    def _publish_copy(user_path):
        # Stored paths are passed through add_root() before being parsed.
        po = PathObject.from_path_string(add_root(user_path))
        return po.copy(user="publish", version=version)

    render_files = publish_msd["render_files"]
    source_files = publish_msd["source_files"]
    if render_files:
        for key in render_files:
            render_files[key] = _publish_copy(render_files[key]).get_path()
    if source_files:
        for key in source_files:
            abs_path = _publish_copy(source_files[key]).get_abs_path
            # NOTE(review): the original stored `get_abs_path` without calling
            # it. If it is a method that would put a bound method into the
            # JSON data, so call it when callable - confirm against PathObject.
            source_files[key] = abs_path() if callable(abs_path) else abs_path

    save_json(filepath=publish_msd_path, data=publish_msd)
    return True
[docs]
def edit_project_msd(entity_object):
    """
    Add ``entity_object`` to the MSD of its project.

    Args:
        entity_object: Entity whose ``project`` attribute selects the
            ``ProjectMsd`` it is added to

    Returns:
        None
    """
    from cgl.core.msd import ProjectMsd

    ProjectMsd(entity_object.project).add_entity(entity_object)
[docs]
def read_publish_msd(user_render_dir):
    """
    Read the publish path recorded in a render directory's ``publish.msd``.

    Args:
        user_render_dir: Directory that contains the ``publish.msd`` file

    Returns:
        The value stored under "publish_path" in that file
    """
    msd_file = os.path.join(user_render_dir, "publish.msd")
    return load_json(msd_file)["publish_path"]
[docs]
def get_latest_asset_publish(project, sequence="*", asset=None, task="mdl"):
    """
    Build the lookup dictionary describing an asset publish.

    No filesystem or database query happens here; only the dictionary of
    path elements is assembled.

    Args:
        project: Project name (required; a missing value is logged as error)
        sequence: Sequence name; any falsy value becomes "*"
        asset: Asset name, stored under "shot" (required; a missing value is
            logged as error)
        task: Task name (Default value = "mdl")

    Returns:
        A dict with the keys "project", "sequence", "shot", "task" and
        "tree" (always "assets")
    """
    # logging.exception is only meaningful inside an ``except`` block; there
    # is no active exception here, so report with logging.error.
    if not project:
        logging.error("Project is a required field")
    if not sequence:
        sequence = "*"
    if not asset:
        logging.error("asset is a required field")
    return {
        "project": project,
        "sequence": sequence,
        "shot": asset,
        "task": task,
        "tree": "assets",
    }
[docs]
def resolve_new_path(path_object, resolve_list):
    """
    Fill in path elements with the best default found on disk.

    Works on a copy of ``path_object``. For each element name in
    ``resolve_list`` the available values are listed, entries starting with
    "." are ignored, and the value chosen by ``pick_best_default_item`` is
    set on the copy. Elements whose listing is not a list, or is empty after
    filtering, are left untouched.

    Args:
        path_object: The path object to start from
        resolve_list: Element names to resolve, in order

    Returns:
        The resolved copy of ``path_object``
    """
    resolved = path_object.copy()
    for element in resolve_list:
        found = resolved.listdir_by_element(element)
        if not isinstance(found, list):
            continue
        visible = [entry for entry in found if not entry.startswith(".")]
        if not visible:
            continue
        best = pick_best_default_item(visible, element)
        resolved.set_attrx(element, best, dont_check=True)
    return resolved
[docs]
def pick_best_default_item(items, variable):
    """
    Choose the most sensible default from ``items`` for a path element.

    Rules by ``variable``:
        - "user": the current user if present, else "publish" if present,
          else the first item
        - "version": the highest item
        - "resolution": "high" if present, else the first item
        - anything else: the first item

    The branches are mutually exclusive, so a user or version pick is never
    overwritten by the generic first-item fallback. ``items`` is not modified.

    Args:
        items: Candidate values
        variable: Name of the path element being resolved

    Returns:
        The chosen item, or None when ``items`` is empty
    """
    if not items:
        return None
    if variable == "user":
        current_user = get_user_name()  # change this to get the user from the user globals
        if current_user in items:
            return current_user
        if "publish" in items:
            return "publish"
        return items[0]
    if variable == "version":
        # max() gives the same result as sort()[-1] without reordering the
        # caller's list.
        return max(items)
    if variable == "resolution" and "high" in items:
        return "high"
    return items[0]
[docs]
def get_dcc_from_task_file(task_file):
    """
    Extract the DCC name from the path of a cookbook task file.

    The path is split on "/" (backslashes are converted first) and the
    fourth component from the end is returned.
    NOTE(review): that index matches a layout such as
    ``.../$DCC/tasks/$TASK/<file>.py`` - confirm against the cookbook layout.

    Args:
        task_file: Path to the task file

    Returns:
        The fourth-from-last path component

    Raises:
        IndexError: If the path has fewer than four components
    """
    components = task_file.replace("\\", "/").split("/")
    return components[-4]
[docs]
def split_after(path_object, attr):
    # TODO - this must be updated to match how glob_project_element works
    """
    Return the path of ``path_object`` truncated right after one attribute.

    For example, splitting a path at `project` returns the path up to and
    including the project folder::

        split_after(path_object, "project")

    The attribute value is matched against whole path segments first, so a
    value that also occurs inside an earlier folder name (e.g. "CG" inside
    "CGL") no longer cuts the path in the wrong place. When no whole segment
    matches, the path is split at the first textual occurrence of the value.

    Args:
        path_object: The path object to be split
        attr: The attribute to split the path at

    Returns:
        The truncated path with "/" separators, or the full path when the
        attribute has no value.

    Raises:
        KeyError: If ``attr`` is not an instance attribute of ``path_object``
    """
    value = path_object.__dict__[attr]
    if not value:
        logging.info("Attr: {} does not exist".format(attr))
        return path_object.get_path()

    path_ = path_object.get_path()
    segments = path_.replace("\\", "/").split("/")
    if value in segments:
        return "/".join(segments[: segments.index(value) + 1])

    # Fallback: no whole segment equals the value, split on the raw text.
    front = path_.split(value)[0]
    return os.path.join(front, value).replace("\\", "/")
# Manual smoke test: when this module is run directly as a script, list a
# hard-coded, machine-specific render folder and print the result.
if __name__ == "__main__":
    path = r"E:\Alchemy\CGL\jlt\PUBLISHES\render\assets\char\jack\mdl\default\tom\000.000\high"
    print(lj_list_dir(path, return_sequences=False))