Source code for cgl.core.path.support

import glob
import logging
import os
import re
import subprocess
import sys
import time
from typing import Dict, List, Tuple, Iterable
from pathlib import Path
from PySide6 import QtWidgets

from cgl.core.config.query import AlchemyConfigManager, get_user_name
from cgl.core.path.constants import (
    SEQ_REGEX,
)
from cgl.pathlib.sequence_model import Sequence
from cgl.core.utils.general import cgl_execute, load_json, save_json, get_file_type

# Shared configuration manager, created once when this module is imported.
CFG = AlchemyConfigManager()
# Identity of one image sequence: (directory, prefix, pad, suffix, ext).
SeqKey = Tuple[str, str, int, str, str]

# Splits a name into <prefix><digits><suffix>; the suffix contains no digits
# and the digit group captures between 1 and 7 characters.
_RIGHTMOST_NUM_RE = re.compile(
    r"^(?P<prefix>.*?)(?P<digits>\d{1,7})(?P<suffix>[^0-9]*)$"
)

# <head><frame digits><.ext> - a concrete frame such as "name.0001.exr".
_SEQ_NUM_RE = re.compile(r"^(?P<head>.*?)(?P<frame>\d+)(?P<tail>\.[^.\\/]+)$")
# <head><two or more '#'><.ext> - hash-style pattern such as "name.####.exr".
_HASH_RE = re.compile(r"^(?P<head>.*?)(?P<hash>\#{2,})(?P<tail>\.[^.\\/]+)$")
# <head>%0Nd<.ext> - printf-style pattern such as "name.%04d.exr".
_PRINTF_RE = re.compile(r"^(?P<head>.*?)(?:%0?(?P<pad>\d+)d)(?P<tail>\.[^.\\/]+)$")


def path_delimit(path: str, to_windows=False) -> str:
    """
    Normalise the path delimiters of a path string.

    Args:
        path: The path whose delimiters should be unified.
        to_windows: When False (default) every "\\" becomes "/". When True
            every "/" becomes "\\".

    Returns:
        The path with a single, consistent delimiter style.
    """
    old, new = ("/", "\\") if to_windows else ("\\", "/")
    return path.replace(old, new)
def open_file(filename: str) -> None:
    """
    Open a file in the default application for its file type.

    On macOS, Maya scenes (.ma / .mb) are opened explicitly with Maya; every
    other file goes through the platform opener (``os.startfile`` on Windows,
    ``open`` on macOS, ``xdg-open`` elsewhere).

    Args:
        filename: The path to the file to be opened.

    Returns:
        None. A missing file is logged as an error, not raised.
    """
    if not os.path.exists(filename):
        # Not inside an ``except`` block, so ``logging.exception`` would only
        # append a meaningless "NoneType: None" traceback.
        logging.error("File does not exist: {}".format(filename))
        return

    if sys.platform == "win32":
        os.startfile(filename)
        return

    ext_ = os.path.splitext(filename)[1]
    if sys.platform == "darwin" and ext_ in (".ma", ".mb"):
        command = "open -a maya -file {}".format(filename)
        logging.info(f"executing: {command}")
        cgl_execute(command)
        return

    opener = "open" if sys.platform == "darwin" else "xdg-open"
    subprocess.call([opener, filename])
def open_folder(file_path):
    """
    Open the OS file browser at the folder containing ``file_path``.

    The command is passed to ``subprocess`` as an argument list instead of a
    formatted shell string, so paths containing spaces or shell
    metacharacters are handled correctly.

    Args:
        file_path: Path to the file (or folder) to reveal.

    Returns:
        None
    """
    if sys.platform == "win32":
        # Explorer expects backslash-delimited paths after "/select,".
        subprocess.call(["explorer", "/select,", os.path.normpath(file_path)])
    elif sys.platform == "darwin":
        subprocess.call(["open", "-R", file_path])
    else:
        # xdg-open has no "reveal" mode; open the containing folder so the
        # file itself is not launched in its default application.
        target = file_path if os.path.isdir(file_path) else os.path.dirname(file_path)
        subprocess.call(["xdg-open", target])
def create_folders(path_object):
    """
    Create both the source and the render folder for ``path_object``.

    Args:
        path_object: A PathObject, or a path string. For a string that is not
            an existing directory, its parent directory is used. The sibling
            tree is derived by swapping "render" and "source" in the string.

    Returns:
        None

    Raises:
        ValueError: If ``path_object`` is neither a PathObject nor a string.
    """
    from cgl.core.path.object import PathObject

    if isinstance(path_object, PathObject):
        render_path = path_object.get_render_path(dirname=True)
        source_path = path_object.get_source_path(dirname=True)
    elif isinstance(path_object, str):
        if not os.path.isdir(path_object):
            path_object = os.path.dirname(path_object)
        if "render" in path_object:
            render_path = path_object
            source_path = path_object.replace("render", "source")
        else:
            source_path = path_object
            render_path = path_object.replace("source", "render")
    else:
        raise ValueError("path_object must be a PathObject or a string")

    for folder in (render_path, source_path):
        if not os.path.exists(folder):
            # exist_ok guards against another process creating the folder
            # between the check above and this call.
            os.makedirs(folder, exist_ok=True)
def replace_illegal_filename_characters(filename):
    """
    Replace every run of illegal filename characters with one underscore.

    Only ASCII letters, digits, "." and "#" are kept. A single trailing
    underscore is dropped from the result.

    Args:
        filename (str): The filename to be corrected.

    Returns:
        The corrected file name.
    """
    cleaned = re.sub(r"[^A-Za-z0-9\.#]+", "_", filename)
    return cleaned[:-1] if cleaned.endswith("_") else cleaned
def get_folder_size(folder):
    """
    Return the total size in bytes of all files below ``folder``.

    Files that cannot be stat'ed (for example a dangling symlink or a file
    removed during the walk) are skipped one at a time, so a single bad entry
    no longer discards the byte count of its whole directory.

    Args:
        folder: Directory to measure.

    Returns:
        total_bytes (int): Size of the folder including all children. 0 when
        ``folder`` is a file or does not exist.
    """
    if os.path.isfile(folder):
        logging.debug("this is a file numskull")
        return 0
    if not os.path.isdir(folder):
        return 0

    total_bytes = 0
    for root, _dirs, files in os.walk(folder):
        for name in files:
            file_path = os.path.join(root, name).replace("\\", "/")
            try:
                total_bytes += os.path.getsize(file_path)
            except OSError as err:
                logging.debug(
                    "ERROR: likely a problem with a file in %s (%s)", root, err
                )
    return total_bytes
def get_task_default_file(task=None) -> list:
    """
    Look up the default file(s) for a task inside the cookbook.

    Args:
        task: The task to get the default file of. When empty, the
            ``default_files`` folder itself is looked up instead.

    Returns:
        - no task: ``[default_files_folder]`` if that folder exists, else None
        - several ``default.*`` matches: a list of "/"-delimited paths
        - exactly one match: that path as a string
        - no match: an empty list
    """
    # TODO - move this to alc_query.
    if not task:
        root_folder = f"{CFG.cookbook_dir}/default_files".replace("\\", "/")
        return [root_folder] if os.path.exists(root_folder) else None

    task_folder = f"{CFG.cookbook_dir}/default_files/{task}"
    print("[Default Files] Searching task folder: {}".format(task_folder))
    matches = glob.glob("{}/default.*".format(task_folder))
    if len(matches) > 1:
        return [match.replace("\\", "/") for match in matches]
    if matches and matches[0]:
        return matches[0].replace("\\", "/")
    return []
def start(filepath):
    """
    Open a file with the OS default application for its type.

    Substance Painter projects (.spp) first get the user setup copied, and
    image sequences are turned into an ffplay command line before launching.

    Args:
        filepath: The path to the file to be opened.

    Returns:
        None. A FileNotFoundError raised while launching is printed and
        logged rather than propagated.
    """
    _, ext = os.path.splitext(filepath)
    if ext == ".spp":
        from cgl.plugins.substance.utils import (
            copy_user_setup,
        )

        copy_user_setup()

    if get_file_type(filepath) == "sequence":
        seq_object = Sequence.from_path(filepath.split(" ")[0])
        print("Alchemy is set to open sequences with ffplay")
        filepath = " ".join(seq_object.ffplay_cmd(fps=24))

    if sys.platform == "darwin":
        command = "open " + filepath
        print(command)
        cgl_execute(command)
        return

    # Python 3 reports "linux" (never "linux2"), so match by prefix; the old
    # equality test left ``cmd`` unbound on Linux.
    if sys.platform.startswith("linux"):
        cmd = "xdg-open " + filepath
    elif sys.platform == "win32":
        cmd = ["cmd", "/c", "start", str(Path(filepath))]
    else:
        logging.error("start() does not support platform: %s", sys.platform)
        return

    try:
        cgl_execute(cmd, new_window=False)
    except FileNotFoundError as err:
        print("File not found: {}".format(filepath))
        print("Command: {}".format(cmd))
        # Log the raised instance; the class object carries no detail.
        logging.error(err)
def start_url(url):
    """
    Open a URL in the default web browser.

    Args:
        url: The address to open.

    Returns:
        None
    """
    from webbrowser import open as open_in_browser

    open_in_browser(url)
def find_latest_publish_objects(folder, source=True, render=False):
    """
    Collect the latest published version of every task found for ``folder``.

    Args:
        folder: Path string used to build the glob (task="*", user="publish").
        source (bool): Include the latest "source" tree version (default True).
        render (bool): Include the latest "render" tree version (default False).

    Returns:
        A list of path objects, one per matching task and requested tree.
    """
    from cgl.core.path.object import PathObject

    path_object = PathObject.from_path_string(folder)
    path_object.set_attr(task="*", user="publish")

    wanted_trees = [
        tree for tree, wanted in (("source", source), ("render", render)) if wanted
    ]
    total_size = 0
    sync_objects = []
    for each in glob.glob(path_object.get_path()):
        f_object = PathObject.from_path_string(each)
        for tree in wanted_trees:
            # TODO - latest version should work without providing a base version.
            f_object.set_attr(version="000.000", tree=tree)
            l_object = f_object.copy(latest=True)
            sync_objects.append(l_object)
            total_size += get_folder_size(l_object.get_path())

    # The previous log call used print_file_size(), which is neither defined
    # nor imported in this module; report the raw byte count instead.
    logging.debug(
        "%s %s Total Size of Latest Publishes\n\t%d bytes",
        path_object.sequence,
        path_object.shot,
        total_size,
    )
    return sync_objects
def show_in_folder(path_string):
    """
    Open the folder that contains ``path_string`` in the OS file browser.

    Args:
        path_string: The path whose parent folder should be opened.

    Returns:
        None
    """
    print(path_string)
    full_path = Path(path_string).parent
    if sys.platform == "darwin":
        cmd = ["open", full_path.as_posix()]
    elif sys.platform.startswith("linux"):
        # Python 3 reports "linux", not "linux2"; the old equality test sent
        # Linux down the Windows branch below.
        cmd = ["xdg-open", full_path.as_posix()]
    else:
        full_path = str(full_path)
        print(full_path)
        cmd = ["cmd", "/c", "start", full_path]
    logging.debug("running command: %s" % cmd)
    # this command will only ever be run locally, it does not need render management support
    cgl_execute(cmd)
def seq_from_file(filename, ext_map):
    """
    Return ``filename`` as a hash sequence name when it qualifies as one.

    The name qualifies when SEQ_REGEX matches it and its extension maps to
    "image_plane" in ``ext_map``. The matched frame digits are then replaced
    by "####".

    Args:
        filename (str): The filename to be checked.
        ext_map: Mapping of file extension (with leading dot) to a type name.

    Returns:
        The hash-sequence form of the filename, or the original filename.
    """
    found = re.search(SEQ_REGEX, filename)
    if not found:
        return filename

    stem, extension = os.path.splitext(filename)
    frame_digits = found.group(0).replace(".", "")
    hashed_stem = stem.replace(frame_digits, "####")
    if extension in ext_map and ext_map[extension] == "image_plane":
        return f"{hashed_stem}{extension}"
    return filename
def is_file(path_string):
    """
    Decide from the text alone whether a path ends with a file extension.

    Nothing is checked on disk: a trailing "." followed by 2-7 alphanumeric
    characters counts as a file name.

    Args:
        path_string: The path to be checked.

    Returns:
        True if the path ends with a file extension, otherwise False.
    """
    return re.search(r"\.[a-zA-Z0-9]{2,7}$", path_string) is not None
def list_dir(path_object, variable):
    """
    List the values found on disk for one element of a path object.

    Convenience wrapper for the gui: "version" results are returned sorted in
    descending order, every other element is returned as listed.

    Args:
        path_object: The path object to be listed.
        variable: The element name to list (e.g. "version", "user").

    Returns:
        A list of values; an empty list when nothing is found.
    """
    entries = path_object.listdir_by_element(variable)
    if variable == "version":
        entries = sorted(entries, reverse=True)
    return entries or []
def lj_list_dir(
    directory: str,
    basename: bool = True,
    return_sequences: bool = False,
    full_range: bool = False,
    ignore_hidden: bool = True,
) -> List[str]:
    """
    List directory contents, grouping image-sequence frames into one entry.

    Frame discovery, padding and ranges are delegated to `Sequence`; this
    function scans the directory, finds the unique sequence heads, builds one
    `Sequence` per head and formats the display strings.

    Args:
        directory: Folder to list.
        basename: Show file names only (True) or the full path (False).
        return_sequences: Return only the sequence entries.
        full_range: Show "start-end" instead of the compact frame list.
        ignore_hidden: Skip names that start with ".".

    Returns:
        A sorted list of display strings. Empty when the directory is missing
        or cannot be read.
    """
    try:
        names = [entry.name for entry in os.scandir(directory)]
    except (FileNotFoundError, PermissionError):
        return []

    if ignore_hidden:
        names = [name for name in names if not name.startswith(".")]

    # optional external filter hook
    names = _maybe_clean_file_list(names)

    # 1) unique sequence heads, plus everything that is not a sequence frame
    seq_keys, non_seq_files = _collect_sequence_keys(directory, names)

    # 2) one Sequence object per unique head
    sequences = []
    for dirpath, head, pad, suffix, ext in seq_keys:
        # prefer hash pattern so callers get both .hash/.printf from Sequence
        pattern = os.path.join(dirpath, f"{head}.{'#' * pad}{suffix}.{ext}")
        try:
            sequences.append(Sequence.from_path(pattern))
        except Exception:
            # If Sequence rejects it (shouldn't happen often), skip gracefully
            continue

    def _shown(value):
        # Apply the caller's basename / full-path preference.
        return os.path.basename(value) if basename else str(value)

    # 3) display strings
    seq_display: List[str] = []
    for seq in sequences:
        if seq.count > 1:
            placeholder = seq.filepath_hash  # e.g. playblast.####.jpg
            if full_range:
                frames_text = f"{seq.start_frame}-{seq.end_frame}"
            else:
                frames_text = _compact_frame_list(list(seq.frames))
            seq_display.append(_shown(f"{placeholder} {frames_text}"))
        elif seq.frames:
            # Single frame: show the concrete filename
            seq_display.append(_shown(seq.file_for(seq.frames[0])))
        else:
            # no frames found; show pattern to hint intent
            seq_display.append(_shown(seq.filepath_hash))

    if return_sequences:
        return sorted(seq_display)

    non_seq_display = [_shown(single) for single in non_seq_files]
    return sorted(non_seq_display + seq_display)
# -----------------------------
# Helpers
# -----------------------------
def _maybe_clean_file_list(names: List[str]) -> List[str]:
    """
    Run ``names`` through an optional module-level ``clean_file_list`` hook.

    Returns the hook's result when a callable of that name exists in this
    module's globals and succeeds; otherwise returns ``names`` unchanged.
    """
    fn = globals().get("clean_file_list")
    if not callable(fn):
        logging.debug(f"[_maybe_clean_file_list] {names}")
        return names
    try:
        return fn(names)  # type: ignore[misc]
    except Exception as e:
        # Best effort: a failing hook must not break directory listing.
        logging.error(f"[_maybe_clean_file_list]{e}")
    return names


def _collect_sequence_keys(
    directory: str,
    names: Iterable[str],
    *,
    delimiters: str = ".",  # which characters count as a delimiter
    min_len: int = 1,
    max_len: int = 7,
) -> "Tuple[List[SeqKey], List[str]]":
    """
    Identify unique sequence heads among ``names``.

    A name is a sequence frame when it has an extension and its stem ends
    with <prefix><delimiter><digits>, where the digit count lies within
    min_len..max_len.

    Returns:
        (unique_keys, non_sequence_fullpaths)
        unique key format: (directory, prefix, pad, suffix, ext)
          - suffix is always '' (digits sit directly before the extension)
          - pad is the zero-pad width (length of the matched digits)
          - ext does NOT include the leading '.'
        Keys keep first-seen order.
    """
    # Stem must end with delimiter + digits, e.g. 'comp.0001'
    delim_class = re.escape(delimiters)
    tail_re = re.compile(
        rf"^(?P<prefix>.*?)(?P<delim>[{delim_class}])(?P<digits>\d{{{min_len},{max_len}}})$"
    )

    keys: Dict[SeqKey, None] = {}  # dict used as an insertion-ordered set
    singles: List[str] = []

    for name in names:
        base, ext = os.path.splitext(name)
        m = tail_re.match(base) if ext else None
        if not m:
            # no extension, or stem does not end with delimiter+digits
            singles.append(os.path.join(directory, name))
            continue

        pad = len(m.group("digits"))
        suffix = ""  # by the rule: digits must be immediately before the dot
        key: SeqKey = (directory, m.group("prefix"), pad, suffix, ext.lstrip("."))
        keys.setdefault(key, None)

    return list(keys), singles


def _compact_frame_list(frames_sorted: List[int]) -> str:
    """
    Compact a list of ints into a range string such as '1-5,7,9-12'.

    The input is sorted into a copy, so the caller's list is left untouched.
    An empty list gives an empty string.
    """
    if not frames_sorted:
        return ""
    ordered = sorted(frames_sorted)
    out: List[str] = []
    s = e = ordered[0]
    for f in ordered[1:]:
        if f == e + 1:
            e = f
        else:
            out.append(f"{s}-{e}" if s != e else f"{s}")
            s = e = f
    out.append(f"{s}-{e}" if s != e else f"{s}")
    return ",".join(out)


# Retired helpers, kept commented out for reference.
#
# def split_sequence_frange(sequence):
#     """
#     Takes the result of a lj_list_dir, and gives back the file path as well as the sequence
#
#     Args:
#         sequence: The sequence to be split
#
#     Returns:
#         The file path and the sequence of the given sequence or undefined if no sequence is found
#
#     """
#     frange = re.search(SPLIT_SEQ_REGEX, sequence)
#     if frange:
#         return sequence.split(frange.group(0))[0], frange.group(0).replace(" ", "")
#     else:
#         return
#
#
# def split_sequence(sequence: str):
#     """
#     Splits a sequence with a match for ######, as well as %0#d
#
#     Args:
#         sequence: The sequence to be split
#
#     Returns:
#         The sequence split by the match for ######, as well as %0#d or undefined if no sequence is found
#
#     """
#     frange = None
#     group = ""
#     if "#" in sequence:
#         frange = re.search(SEQ_SPLIT, sequence)
#         group = frange.group(0)
#     elif "%" in sequence:
#         frange = re.search(SEQ2_SPLIT, sequence)
#         group = frange.group(0)
#     elif ".*." in sequence:
#         frange = "this"
#         group = "*."
#     if frange:
#         return sequence.split(group)[0]
#     else:
#         return
def remove_root(filepath):
    """
    Strip the production root from ``filepath``.

    Both the path and the root are converted to "/" delimiters first, and one
    leading "/" is removed from the result.

    Args:
        filepath: The filepath to be processed.

    Returns:
        The filepath with the root removed, or "" for an empty filepath.
    """
    # TODO - move this function to PathObject - it really does belong there.
    if not filepath:
        return ""
    root = CFG.get_prod_root().replace("\\", "/")
    relative = filepath.replace("\\", "/").replace(root, "")
    # Raw string: "\/" in a normal string is an invalid escape sequence.
    return re.sub(r"^/", "", relative)
def remove_coobkook_root(filepath):
    """
    Strip the cookbook root from ``filepath``.

    Both the path and the root are converted to "/" delimiters first, and one
    leading "/" is removed from the result.

    Args:
        filepath: The filepath to be processed.

    Returns:
        The filepath with the cookbook root removed.
    """
    root = CFG.cookbook_root.replace("\\", "/")
    relative = filepath.replace("\\", "/").replace(root, "")
    # Raw string: "\/" in a normal string is an invalid escape sequence.
    return re.sub(r"^/", "", relative)
def add_cookbook_root(filepath):
    """
    Prefix ``filepath`` with the cookbook root.

    Args:
        filepath: The filepath to be processed.

    Returns:
        "<cookbook root>/<filepath>", both parts using "/" delimiters.
    """
    unix_root = CFG.cookbook_root.replace("\\", "/")
    unix_path = filepath.replace("\\", "/")
    return f"{unix_root}/{unix_path}"
def add_root(path_):
    """
    Prefix ``path_`` with the production root.

    Args:
        path_: Path relative to the production root.

    Returns:
        "<prod root>/<path_>" with "/" delimiters throughout.
    """
    joined = f"{CFG.get_prod_root()}/{path_}"
    return joined.replace("\\", "/")
def hd_proxy_exists(hd_proxy_path, frame_range):
    """
    Check whether a complete HD proxy jpg sequence exists on disk.

    Args:
        hd_proxy_path: Hash-style sequence path (e.g. ".../name.####.jpg").
        frame_range: Frame range as "start-end".

    Returns:
        True when the number of jpgs found equals the length of the range,
        False when jpgs are missing, and None when the path contains no "#".
    """
    if "#" not in hd_proxy_path:
        return None

    found = glob.glob("{}*.jpg".format(hd_proxy_path.split("##")[0]))
    if not found:
        logging.info(
            "HD Proxy Path not found at: {}, Proxy Must Exist before Creating mov".format(
                hd_proxy_path
            )
        )
        return False

    first, last = frame_range.split("-")
    expected = int(last) - int(first) + 1
    if len(found) == expected:
        logging.info(
            "HD Proxy Sequence Found at: {}, Creating Web Preview".format(
                hd_proxy_path
            )
        )
        return True

    logging.info(
        "Full HD Proxy Files not found at: {}, "
        "Proxy Must Exist before Creating mov".format(hd_proxy_path)
    )
    return False
def get_short_name(string):
    """
    Build a short name: the first three alphabetic characters, lower-cased.

    Args:
        string: Text to abbreviate.

    Returns:
        Up to three lower-case letters; fewer when the text has fewer letters.
    """
    letters = [char for char in string if char.isalpha()]
    return "".join(letters).lower()[:3]
def get_projects(company: str = ""):
    """
    List the projects found in the "source" tree of a company.

    Args:
        company: The company to list projects for. Falls back to the company
            from the configuration when empty (Default value = "").

    Returns:
        The projects listed for the given company.
    """
    from cgl.core.path.object import PathObject

    query = {"company": company or CFG.company, "tree": "source"}
    return PathObject.from_dict(query).listdir_by_element("project")
def edit_publish_msd(publish_path_object, wait=10):
    """
    Rewrite the paths stored in a published MSD so they point at "publish".

    Every entry in "render_files" and "source_files" is re-rooted and
    converted to the publish user and the version of
    ``publish_path_object``, then the MSD is saved back in place.

    Args:
        publish_path_object: The path object of the publish.
        wait (int): Seconds to sleep before reading the MSD (Default value = 10).

    Returns:
        True if the MSD was edited and saved, otherwise False.
    """
    from cgl.core import msd
    from cgl.core.path.object import PathObject

    version = publish_path_object.version
    publish_msd_path = msd.get_msd_path(publish_path_object)
    time.sleep(wait)

    # TODO - i'd like this to be smarter and just continue to try ever N seconds until it can load something.
    publish_msd = (
        load_json(publish_msd_path) if os.path.exists(publish_msd_path) else None
    )
    if not publish_msd:
        logging.error(
            f"Could not load: {publish_msd_path}, this is typically caused when the wait time before editing "
            f"is not long enough on a publish. Try increasing default wait time on edit_publish_msd()"
        )
        return False

    def _publish_object(user_path):
        # Re-root the stored user path and switch it to the publish version.
        po = PathObject.from_path_string(add_root(user_path))
        return po.copy(user="publish", version=version)

    render_files = publish_msd["render_files"]
    source_files = publish_msd["source_files"]
    if render_files:
        for rf in render_files:
            render_files[rf] = _publish_object(render_files[rf]).get_path()
    if source_files:
        for sf in source_files:
            abs_path = _publish_object(source_files[sf]).get_abs_path
            # NOTE(review): the original stored ``get_abs_path`` without
            # calling it. Call it when it is a method so a path string, not a
            # bound method, is written; confirm whether it is a property.
            source_files[sf] = abs_path() if callable(abs_path) else abs_path

    save_json(filepath=publish_msd_path, data=publish_msd)
    return True
def edit_project_msd(entity_object):
    """
    Register an entity in the MSD of the project it belongs to.

    Args:
        entity_object: Object with a ``project`` attribute; it is passed to
            ``ProjectMsd.add_entity``.

    Returns:
        None
    """
    from cgl.core.msd import ProjectMsd

    ProjectMsd(entity_object.project).add_entity(entity_object)
def read_publish_msd(user_render_dir):
    """
    Read the publish path recorded in a render directory's "publish.msd".

    Args:
        user_render_dir: Directory that contains the "publish.msd" file.

    Returns:
        The value stored under "publish_path".
    """
    msd_file = os.path.join(user_render_dir, "publish.msd")
    return load_json(msd_file)["publish_path"]
def get_latest_asset_publish(project, sequence="*", asset=None, task="mdl"):
    """
    Build the query dictionary describing an asset publish.

    Missing required fields are logged as errors but do not stop the call.

    Args:
        project: Project name (required).
        sequence: Sequence name; an empty value falls back to "*".
        asset: Asset name, stored under the "shot" key (required).
        task: Task name (Default value = "mdl").

    Returns:
        A dict with the keys "project", "sequence", "shot", "task" and "tree"
        ("tree" is always "assets").
    """
    # logging.error, not logging.exception: there is no active exception
    # here, so the latter would append a bogus "NoneType: None" traceback.
    if not project:
        logging.error("Project is a required field")
    if not asset:
        logging.error("asset is a required field")
    return {
        "project": project,
        "sequence": sequence or "*",
        "shot": asset,
        "task": task,
        "tree": "assets",
    }
def resolve_new_path(path_object, resolve_list):
    """
    Fill in the listed elements of a path with the best value found on disk.

    Works on a copy of ``path_object``. Each element is listed in order,
    entries starting with "." are dropped, and the best remaining entry is
    set on the copy.

    Args:
        path_object: The path object to start from (left unmodified).
        resolve_list: Element names to resolve, in order.

    Returns:
        The resolved copy of the path object.
    """
    resolved = path_object.copy()
    for element in resolve_list:
        found = resolved.listdir_by_element(element)
        if not isinstance(found, list):
            continue
        visible = [name for name in found if not name.startswith(".")]
        if not visible:
            continue
        choice = pick_best_default_item(visible, element)
        resolved.set_attrx(element, choice, dont_check=True)
    return resolved
def pick_best_default_item(items, variable):
    """
    Pick the preferred default entry for a path element.

    Args:
        items: Candidate values; the list is not modified.
        variable: Element name. "user" prefers the current user, then
            "publish", then the first entry. "version" picks the highest
            value. "resolution" prefers "high", then the first entry.

    Returns:
        The chosen item, or None for an empty list or any other element name.
    """
    if not items:
        return None
    if variable == "user":
        cu = get_user_name()  # change this to get the user from the user globals
        if cu in items:
            return cu
        return "publish" if "publish" in items else items[0]
    if variable == "version":
        # max() selects what sort()[-1] did without reordering the caller's list.
        return max(items)
    if variable == "resolution":
        return "high" if "high" in items else items[0]
    return None
def get_dcc_from_task_file(task_file):
    """
    Return the dcc folder name taken from a cookbook task file path.

    The path is split on "/" (after converting "\\" delimiters) and the
    fourth component from the end is returned, e.g. "maya" for
    ".../maya/tasks/mdl/file.py".

    NOTE(review): the earlier description mentioned the layout
    cookbook/$DCC/tasks/$TASK.py, for which this index would return the
    folder before $DCC - confirm which layout callers actually pass.

    Args:
        task_file: Path to a task file inside the cookbook.

    Returns:
        The fourth-from-last path component.
    """
    parts = task_file.replace("\\", "/").split("/")
    return parts[-4]
def split_after(path_object, attr):
    # TODO - this must be updated to match how glob_project_element works
    """
    Return the path truncated right after the value of ``attr``.

    For example, splitting at `project` gives the path up to and including
    the project folder::

        split_after(path_object, "project")

    Args:
        path_object: The path object to be split.
        attr: The attribute to split the path at.

    Returns:
        The truncated path with "/" delimiters, or the full path when the
        attribute has no value.
    """
    value = vars(path_object)[attr]
    if not value:
        logging.info("Attr: {} does not exist".format(attr))
        return path_object.get_path()

    # Everything before the first occurrence of the value, then the value.
    front = path_object.get_path().partition(value)[0]
    return os.path.join(front, value).replace("\\", "/")
if __name__ == "__main__":
    # Manual smoke test: list one hard-coded publish folder when this module
    # is run as a script.
    path = r"E:\Alchemy\CGL\jlt\PUBLISHES\render\assets\char\jack\mdl\default\tom\000.000\high"
    listing = lj_list_dir(path, return_sequences=False)
    print(listing)