Source code for cgl.core.review

from __future__ import annotations

import glob
import logging
import re
import shutil

try:
    import aspose.pydrawing as drawing
    import aspose.slides as slides

except ImportError:
    pass
from pathlib import Path
from cgl.core.path.object import PathObject
from cgl.core.path.support import create_folders
from cgl.pathlib.sequence_model import Sequence
from cgl.core.path.support import get_file_type
from cgl.core.utils.general import cgl_execute, clean_file_list
from cgl.core.config.query import AlchemyConfigManager
import cgl.plugins.alchemy.query as alc_query
from PIL import Image
import os
import sys

# Shared Alchemy configuration manager; create_jpg_proxies() reads its
# ``util_config["ext_map"]`` table to classify files by extension.
CFG = AlchemyConfigManager()


# ffmpeg executable as reported by alc_query.ffmpeg_path(); interpolated into
# the ffmpeg command strings built in this module.
FFMPEG_PATH = alc_query.ffmpeg_path()


def create_jpg_proxies_from_path_object(path_object: PathObject, all=True):
    """
    Build jpg proxies for the source folder behind a PathObject.

    Args:
        path_object: PathObject whose source folder is converted and whose
            hd proxy path supplies the destination folder and file name.
        all: accepted for interface compatibility; not read by this function.

    Returns:
        True once create_jpg_proxies() has been called.
    """
    source_folder = path_object.get_source_path(dirname=True)
    proxy_path = path_object.get_hd_proxy_path()
    proxy_folder = os.path.dirname(proxy_path)
    proxy_file = os.path.basename(proxy_path)

    create_folders(proxy_path)
    create_jpg_proxies(source_folder, proxy_folder, proxy_file)
    return True
def create_jpg_proxies(source_folder, dest_folder=None, filename=None):
    """
    Convert the files of a source folder into numbered jpg proxies.

    Each entry of the cleaned folder listing is classified through
    ``CFG.util_config["ext_map"]`` and handed to the matching converter.
    Numbering continues from whatever get_highest_number() reports for the
    proxy base name. Processing stops at the first movie, mesh, or
    unsupported file encountered.

    Args:
        source_folder: folder whose files are converted.
        dest_folder: folder receiving the proxies.
        filename: proxy file name; its extension is dropped to form the
            proxy base name.

    Returns:
        The mp4 path when a movie that is the only file in its folder was
        transcoded, False when an extension or file type cannot be handled,
        otherwise None.
    """
    extension_types = CFG.util_config["ext_map"]
    proxy_name = os.path.splitext(filename)[0]
    counter = get_highest_number(dest_folder, proxy_name)
    print(source_folder)
    entries = clean_file_list(os.listdir(source_folder))
    print(entries)

    for entry in entries:
        ext = os.path.splitext(entry)[1].lower()
        source_file = os.path.join(source_folder, entry)

        if ext not in extension_types:
            print("{} not found in valid extensions".format(ext))
            return False

        file_type = extension_types[ext]

        if file_type in ("image_plane", "image"):
            counter += 1
            # Still images drop the ".####" frame placeholder from the name.
            stripped_name = proxy_name.replace(".####", "")
            dest_image = os.path.join(
                dest_folder, f"{stripped_name}.{counter:04d}.jpg"
            )
            if file_type == "image":
                print(2, source_file, dest_image)
            convert_image_to_jpg(source_file, dest_image)
        elif file_type == "movie":
            if movie_is_only_file(source_file):
                dest_movie = os.path.join(dest_folder, f"{proxy_name}.mp4")
                convert_movie_to_mp4(source_file, dest_movie)
                print("comverting movie to mp4 for shotgrid")
                return dest_movie
            counter = convert_mov_to_jpg(source_file, dest_folder, proxy_name)
            print("converting movie to jpgs as part of larger sequence")
            return None
        elif file_type in ("doc", "pdf"):
            # The converter continues the numbering itself and reports the
            # last number it used.
            counter = convert_doc_to_jpg(source_file, dest_folder, proxy_name)
        elif file_type == "ppt":
            counter = convert_ppt_to_jpg(source_file, dest_folder, proxy_name)
        elif ext in (".gif", ".webp"):
            counter += 1
            dest_image = os.path.join(
                dest_folder, f"{proxy_name}.{counter:04d}.jpg"
            )
            convert_NA_to_jpg(source_file, dest_image)
        elif file_type == "mesh":
            return None
        else:
            print(
                f"can't convert {ext} yet of filetype: {file_type}\n\t{source_file}"
            )
            return False

    return None
# UTILITY FUNCTIONS:
def timestamp_from_frame_number(frame_number, fps):
    """
    Convert a frame number into a timestamp.

    Args:
        frame_number: frame index to convert.
        fps: frame rate used for the conversion; must be non-zero.

    Returns:
        frame_number divided by fps.
    """
    timestamp = frame_number / fps
    return timestamp
def frame_from_timestamp(timestamp, fps):
    """
    Convert a timestamp into a frame number.

    Args:
        timestamp: timestamp to convert.
        fps: frame rate used for the conversion.

    Returns:
        timestamp multiplied by fps, truncated to an int.
    """
    frame_position = timestamp * fps
    return int(frame_position)
# def get_frame_rate(source_mov): # """ # gets the frame rate of the movie # :param source_mov: # :return: # """ # return Sequence(source_mov).get_fps()
def renumber_sequence(sequence, dest_folder, new_start_frame=1):
    """
    Placeholder for renumbering a sequence; not implemented yet.

    Args:
        sequence: sequence to renumber; must have #### or %04d or another
            sequence delimiter in the name.
        dest_folder: currently unused.
        new_start_frame: the new start frame.

    Returns:
        None; the function does nothing at the moment.
    """
    return None
def get_highest_number(dest_folder, dest_name, ext=".jpg"):
    """
    Return the highest frame number among ``<dest_name>*<ext>`` files.

    Every matching file is inspected and the largest number wins, so the
    result does not depend on the (arbitrary) order glob returns files in.
    Files whose name does not end in at least four digits followed by
    ``ext`` are ignored.

    Args:
        dest_folder: folder to search.
        dest_name: file-name prefix the candidates must start with.
        ext: extension, including the dot, of the numbered files.

    Returns:
        The largest frame number found, or 0 when no numbered file matches.
    """
    # Escape glob metacharacters so folders/names containing "[" or "?"
    # are matched literally.
    pattern = (
        f"{glob.escape(str(dest_folder))}/"
        f"{glob.escape(str(dest_name))}*{glob.escape(ext)}"
    )
    # Frame numbers are written with ":04d", which grows past four digits
    # for frames above 9999, hence "{4,}".
    frame_re = re.compile(r"(\d{4,})" + re.escape(ext) + r"$")

    numbers = []
    for path in glob.glob(pattern):
        match = frame_re.search(os.path.basename(path))
        if match:
            numbers.append(int(match.group(1)))
    return max(numbers, default=0)
def get_start_frame(dest_sequence):
    """
    Return the start frame of the jpg sequence living next to a path.

    The folder containing ``dest_sequence`` is listed, cleaned with
    clean_file_list(), and sorted; the first entry that carries a four-digit
    frame number followed by ``.jpg`` supplies the result. Entries without
    such a number (other files, sub-folders) are skipped instead of raising.

    Args:
        dest_sequence: path of the sequence; only its folder is used.

    Returns:
        The frame number as an int, or None when the folder holds no
        numbered jpg.
    """
    folder = os.path.split(dest_sequence)[0]
    frame_re = re.compile(r"(\d{4})\.jpg")
    for name in sorted(clean_file_list(os.listdir(folder))):
        match = frame_re.search(name)
        if match:
            return int(match.group(1))
    return None
def movie_is_only_file(source_mov):
    """
    Tell whether the folder holding a movie contains exactly one entry.

    Args:
        source_mov: path of the movie; only its folder is inspected.

    Returns:
        True when the folder lists exactly one entry, otherwise False.
    """
    parent_folder = os.path.dirname(source_mov)
    return len(os.listdir(parent_folder)) == 1
# FUNCTIONS FOR CONVERTING TO jpg
def convert_NA_to_jpg(source_gif, dest_image, width=1920, height=1080):
    """
    Convert an image to jpg through ImageMagick's ``magick convert``.

    Used by create_jpg_proxies() for gif and webp files. Forward slashes in
    both paths are rewritten to backslashes before the command is built.

    Args:
        source_gif: path of the image to convert.
        dest_image: path of the jpg to write.
        width: width passed to ``-resize``.
        height: height passed to ``-resize``.
    """
    windows_source = source_gif.replace("/", "\\")
    windows_dest = dest_image.replace("/", "\\")
    size = f"{width}x{height}"
    cgl_execute(f'magick convert "{windows_source}" -resize {size} "{windows_dest}"')
def convert_ppt_to_jpg(source_ppt, dest_folder, dest_name):
    """
    Render every slide of a presentation to a numbered jpg.

    Relies on the module-level ``slides`` (aspose.slides) and ``drawing``
    (aspose.pydrawing) imports. Each slide thumbnail is first saved into a
    "docs" sub-folder of ``dest_folder`` and then passed through
    convert_image_to_jpg(); the "docs" folder is removed afterwards.

    Args:
        source_ppt: path of the presentation.
        dest_folder: folder receiving the jpgs.
        dest_name: base name of the jpgs; numbering continues after the
            highest existing ``<dest_name>.NNNN.jpg``.

    Returns:
        The highest frame number in use after the conversion. A presentation
        without slides leaves the number unchanged.
    """
    print("loading {}".format(source_ppt))
    prs = slides.Presentation(source_ppt)
    print("finished loading:")
    source_stem = os.path.splitext(os.path.basename(source_ppt))[0]
    docs_folder = os.path.join(dest_folder, "docs")

    number = get_highest_number(dest_folder, dest_name)
    staged = False
    for slide in prs.slides:
        number += 1
        print(f"\tconverting slide {number}")
        dest_image = os.path.join(dest_folder, f"{dest_name}.{number:04d}.jpg")
        staged_image = os.path.join(docs_folder, f"{source_stem}.{number:04d}.jpg")
        os.makedirs(docs_folder, exist_ok=True)
        staged = True
        # NOTE(review): 2, 2 look like the x/y thumbnail scale factors -
        # confirm against the aspose.slides documentation.
        slide.get_thumbnail(2, 2).save(
            staged_image, drawing.imaging.ImageFormat.jpeg
        )
        convert_image_to_jpg(staged_image, dest_image)

    # Only clean up when something was staged: an empty deck must neither
    # crash nor delete an unrelated, pre-existing "docs" folder.
    if staged:
        shutil.rmtree(docs_folder)
    return number
def convert_doc_to_jpg(source_doc, dest_folder, dest_name, width=1920, height=1080):
    """
    Render every page of a document to a numbered jpg (Windows only).

    Pages are extracted with aspose.words, saved into a "docs" sub-folder of
    ``dest_folder`` and then passed through convert_image_to_jpg(); the
    "docs" folder is removed afterwards.

    Args:
        source_doc: path of the document.
        dest_folder: folder receiving the jpgs.
        dest_name: base name of the jpgs; a ".####" placeholder is removed.
        width: currently unused.
        height: currently unused.

    Returns:
        The highest frame number in use after the conversion. On non-Windows
        platforms, or for a document without pages, nothing is converted and
        the current highest number is returned so callers can keep counting.
    """
    if ".####" in dest_name:
        dest_name = dest_name.replace(".####", "")
    number = get_highest_number(dest_folder, dest_name)

    if sys.platform != "win32":
        print("This function only works on windows")
        # Callers assign the result to their running frame counter, so hand
        # back a usable number rather than None.
        return number

    import aspose.words as aw

    print("loading {}".format(source_doc))
    doc = aw.Document(source_doc)
    print("finished loading:")
    source_stem = os.path.splitext(os.path.basename(source_doc))[0]
    docs_folder = os.path.join(dest_folder, "docs")

    staged = False
    for page in range(0, doc.page_count):
        print(f"\tconverting page {page}")
        number += 1
        extracted_page = doc.extract_pages(page, 1)
        dest_image = os.path.join(dest_folder, f"{dest_name}.{number:04d}.jpg")
        staged_image = os.path.join(docs_folder, f"{source_stem}.{number:04d}.jpg")
        # Same staging-folder handling as convert_ppt_to_jpg().
        os.makedirs(docs_folder, exist_ok=True)
        staged = True
        extracted_page.save(staged_image)
        convert_image_to_jpg(staged_image, dest_image)

    if staged:
        shutil.rmtree(docs_folder)
    return number
def convert_image_to_jpg(source_image, dest_image=None, width=1920, height=1080):
    """
    Convert an image to JPG and resize it using Pillow (PIL).

    The image is shrunk with ``Image.thumbnail`` (aspect ratio preserved) and
    converted to RGB whenever its mode cannot be written as JPEG, e.g. images
    with an alpha channel or a palette.

    Args:
        source_image (str): Path to the source image.
        dest_image (str, optional): Output path. If None, the preview path of
            the PathObject built from ``source_image`` is used.
        width (int): Target width (default: 1920).
        height (int): Target height (default: 1080).

    Returns:
        str: Path to the converted JPG file, or None when the conversion
        failed (the error is printed).
    """
    # Modes Pillow's JPEG writer accepts as-is; anything else (RGBA, P, LA,
    # PA, I, F, ...) has to be converted first.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")

    source_image = os.path.normpath(source_image)

    if not dest_image:
        po = PathObject().from_path_string(source_image)
        dest_image = po.get_preview_path(ext=".jpg")
    dest_image = os.path.normpath(dest_image)

    try:
        # Make sure the output folder exists for caller-supplied paths too.
        dest_folder = os.path.dirname(dest_image)
        if dest_folder:
            os.makedirs(dest_folder, exist_ok=True)

        with Image.open(source_image) as img:
            img.thumbnail((width, height), Image.Resampling.LANCZOS)
            output = img if img.mode in jpeg_modes else img.convert("RGB")
            output.save(dest_image, "JPEG", quality=95)

        print(f"Converted: {source_image} -> {dest_image}")
        return dest_image
    except Exception as e:
        # Deliberate best effort: callers convert whole folders and rely on
        # a None result instead of an exception for a single bad image.
        print(f"Error converting {source_image}: {str(e)}")
        return None
def convert_exr_to_hdr(source_exr):
    """
    Convert an exr file to an hdr file using oiiotool.

    Forward slashes in the path are rewritten to backslashes, and every
    ".exr" in the path is replaced by ".hdr" to form the output path.

    Args:
        source_exr: path of the exr file.

    Returns:
        Path of the hdr file handed to oiiotool.
    """
    exr_path = source_exr.replace("/", "\\")
    hdr_path = exr_path.replace(".exr", ".hdr")
    cgl_execute(f'oiiotool "{exr_path}" -o "{hdr_path}"')
    return hdr_path
def convert_mov_to_jpg(
    source_movie, dest_folder, dest_name, fps=24, width=1920, height=1080
):
    """
    Explode a movie into numbered jpgs with ffmpeg.

    Output frames are named ``<dest_name>.NNNN.jpg`` and start one past the
    highest number already present in ``dest_folder``. Forward slashes in
    both paths are rewritten to backslashes.

    Args:
        source_movie: path of the movie.
        dest_folder: folder receiving the jpgs.
        dest_name: base name of the jpgs.
        fps: currently unused.
        width: output width passed to the scale filter.
        height: output height passed to the scale filter.

    Returns:
        The highest frame number found in ``dest_folder`` after ffmpeg ran.
    """
    movie_path = source_movie.replace("/", "\\")
    frames_folder = dest_folder.replace("/", "\\")
    first_new_frame = get_highest_number(frames_folder, dest_name) + 1
    frame_pattern = os.path.join(frames_folder, f"{dest_name}.%04d.jpg")

    ffmpeg_input = f'{FFMPEG_PATH} -i "{movie_path}"'
    ffmpeg_output = (
        f"-start_number {first_new_frame} "
        f'-vf scale={width}x{height} "{frame_pattern}"'
    )
    cgl_execute(f"{ffmpeg_input} {ffmpeg_output}")
    return get_highest_number(frames_folder, dest_name)
# FUNCTIONS FOR CONVERTING TO MP4
def create_proxy_mp4(path_object, fps=24, width=1920, height=1080):
    """
    Create a proxy mp4 for a path object that points at a sequence.

    Args:
        path_object: path object to build the proxy for.
        fps: frame rate forwarded to convert_jpg_sequence_to_mp4().
        width: output width forwarded to convert_jpg_sequence_to_mp4().
        height: output height forwarded to convert_jpg_sequence_to_mp4().

    Returns:
        True when ``path_object.file_type`` is "sequence" and the conversion
        was started, otherwise False.
    """
    if path_object.file_type != "sequence":
        return False

    sequence_path = path_object.get_path()
    first_frame = path_object.layer_frame_range.split("-")[0]
    preview_path = path_object.get_preview_path()
    audio_track = get_wave_file(os.path.dirname(sequence_path))
    convert_jpg_sequence_to_mp4(
        sequence_path,
        first_frame,
        preview_path,
        wav_file=audio_track,
        fps=fps,
        width=width,
        height=height,
    )
    return True
def get_wave_file(path):
    """
    Look for a wav file directly inside a folder.

    Args:
        path: folder to search.

    Returns:
        The first ``*.wav`` match reported by glob, with backslashes replaced
        by forward slashes, or None when the folder holds no wav file.
    """
    matches = glob.glob(os.path.join(path, "*.wav"))
    if not matches:
        return None
    return matches[0].replace("\\", "/")
def convert_exr_sequence_to_mp4(path_object, fps=24):
    """
    Convert an exr sequence into an mp4 file suitable for web previews.

    The sequence path may carry a trailing, space-separated token (the frame
    range); it is dropped before encoding. Only the last space is treated as
    the separator, so folder or file names containing spaces no longer make
    the split fail.

    Args:
        path_object: path object describing the sequence.
        fps: frame rate passed to the encoder.

    Returns:
        The preview path reported by ``path_object.get_preview_path()``.
    """
    from cgl.plugins.otio.tools import media_encoder

    source_sequence = path_object.get_path()
    if " " in source_sequence:
        # NOTE(review): assumes the token after the last space is the frame
        # range, as the original two-way split did - confirm with callers.
        path_ = source_sequence.rsplit(" ", 1)[0]
    else:
        path_ = source_sequence
    print(f"SOURCE SEQUENCE: {path_}")
    path_ = path_.replace("####", "%04d")
    print(path_)
    media_encoder.encode_video(
        path_,
        os.path.dirname(path_),
        framerate=fps,
        cleanup=True,
        use_gui=True,
    )
    return path_object.get_preview_path()
def find_frame_number(pattern_path):
    """
    Replace %04d in a path with the lowest frame number found on disk.

    The frame number is read from the position of the ``%04d`` placeholder,
    so other four-digit runs in the file name (a year, a shot number) are
    not mistaken for it.

    Example:
        Input: "path/to/sequence.%04d.png"
        Output: "path/to/sequence.0001.png" (if 0001 is the lowest frame)

    Args:
        pattern_path: path whose file name contains ``%04d``.

    Returns:
        The path as a string with ``%04d`` replaced by the lowest frame.

    Raises:
        FileNotFoundError: if no file on disk matches the pattern.
    """
    path = Path(pattern_path)
    dir_path = path.parent
    pattern = path.name

    if "%04d" in pattern:
        # Literal parts are escaped and the placeholder becomes the capture
        # group; IGNORECASE keeps parity with case-insensitive file systems,
        # where the glob below also matches regardless of case.
        literal_parts = (re.escape(part) for part in pattern.split("%04d"))
        find_frame = re.compile("([0-9]{4})".join(literal_parts), re.IGNORECASE).fullmatch
    else:
        # No placeholder: keep the historical "first four digits" lookup.
        find_frame = re.compile(r"(\d{4})").search

    frame_numbers = [
        int(match.group(1))
        for candidate in dir_path.glob(pattern.replace("%04d", "[0-9]" * 4))
        if (match := find_frame(candidate.name))
    ]

    if not frame_numbers:
        raise FileNotFoundError(f"No frames found matching: {pattern_path}")

    first_frame = min(frame_numbers)
    return str(path.with_name(pattern.replace("%04d", f"{first_frame:04d}")))
def convert_single_jpg_to_mp4(
    source_jpg, start_frame=1, dest_mp4=None, fps=24, width=1920, height=1080
):
    """
    Convert a single jpg to an mp4 file suitable for web previews.

    An existing destination file is removed first and the destination folder
    is created when missing. Input and output paths are quoted in the ffmpeg
    command so paths containing spaces work, matching the other commands in
    this module.

    Args:
        source_jpg: source jpg file; a "####" placeholder is replaced by
            "0001".
        start_frame: value passed to ffmpeg's ``-start_number``.
        dest_mp4: destination mp4 file; defaults to the preview path of the
            PathObject built from ``source_jpg``.
        fps: value passed to ffmpeg's ``-framerate``.
        width: width of the output.
        height: height of the output.

    Returns:
        Path to the mp4 file.
    """
    if not dest_mp4:
        dest_mp4 = PathObject().from_path_string(source_jpg).get_preview_path()
    if os.path.exists(dest_mp4):
        os.remove(dest_mp4)

    dest_folder = os.path.dirname(dest_mp4)
    if dest_folder:
        os.makedirs(dest_folder, exist_ok=True)

    filein = source_jpg.replace("####", "0001")
    ffmpeg_cmd = (
        f'{FFMPEG_PATH} -start_number {start_frame} -framerate {fps} -i "{filein}" '
        f'-s:v {width}x{height} -c:v libx264 -pix_fmt yuv420p "{dest_mp4}"'
    )
    cgl_execute(ffmpeg_cmd)
    return dest_mp4
def convert_jpg_sequence_to_mp4(
    source_sequence,
    start_frame=None,
    dest_mp4=None,
    wav_file=None,
    fps=24,
    width=1920,
    height=1080,
    force=True,
):
    """
    Convert a jpg sequence into an mp4 file suitable for web previews.

    The ffmpeg command comes from ``Sequence.ffmpeg_encode_cmd()``; the
    ``fps``, ``width`` and ``height`` arguments are not used to build it, and
    the ``start_frame`` argument is replaced by the start frame the Sequence
    reports.

    Args:
        source_sequence: source jpg sequence.
        start_frame: ignored; overwritten with ``Sequence.start_frame``.
        dest_mp4: destination mp4 file; defaults to the preview path of the
            PathObject built from ``source_sequence``.
        wav_file: optional wav file; audio is not supported yet and only
            triggers a notice.
        fps: currently unused.
        width: currently unused.
        height: currently unused.
        force: when True an existing ``dest_mp4`` is replaced; when False an
            existing file is kept and no encoding takes place.

    Returns:
        Path to the mp4 file, or None when the sequence has no start frame.
    """
    print("Source Sequence Path", source_sequence)
    src_seq = Sequence.from_path(source_sequence)
    start_frame = src_seq.start_frame
    print(start_frame)
    # NOTE(review): a sequence that genuinely starts at frame 0 is also
    # treated as "no start frame" here - confirm what Sequence reports.
    if not start_frame:
        print("no start frame found")
        return None

    if not dest_mp4:
        dest_mp4 = PathObject().from_path_string(source_sequence).get_preview_path()

    if os.path.exists(dest_mp4):
        if not force:
            # Actually skip, as the message promises, instead of encoding
            # over the existing movie.
            logging.info(f"movie file exists at: {dest_mp4}, skipping creation")
            return dest_mp4
        os.remove(dest_mp4)

    ffmpeg_cmd = " ".join(src_seq.ffmpeg_encode_cmd(dest_mp4))
    if wav_file:
        # TODO: mux wav_file into the encode (second "-i" input plus an aac
        # audio codec) once ffmpeg_encode_cmd supports it.
        print("Talk to CGL about adding wav_file support to ffmpeg_cmd")

    dest_folder = os.path.dirname(dest_mp4)
    if dest_folder and not os.path.exists(dest_folder):
        # TODO replace this with stuff from core.path.version create_folders,
        # although this is easier for now.
        os.makedirs(dest_folder, exist_ok=True)
        # The mirrored "source" folder may already exist (or be the same
        # folder when the path has no "render" part); that is not an error.
        os.makedirs(dest_folder.replace("render", "source"), exist_ok=True)

    print(ffmpeg_cmd)
    print(f"Creating {dest_mp4}")
    cgl_execute(ffmpeg_cmd, new_window=True)
    return dest_mp4
def convert_movie_to_mp4(
    source_movie, dest_mp4=None, fps=24, source_audio=None, width="1920", height="1080"
):
    """
    Convert a movie into an mp4 file suitable for web previews.

    The movie is re-encoded with libx264 (high profile, crf 24, yuv420p) and
    scaled/padded to ``width`` x ``height``. An existing destination file is
    removed first and the destination folder is created when missing.

    Args:
        source_movie: path of the movie to convert.
        dest_mp4: destination mp4 file; defaults to the preview path of the
            PathObject built from ``source_movie``.
        fps: output frame rate (ffmpeg ``-r``).
        source_audio: optional audio file added as a second ffmpeg input.
        width: width of the output.
        height: height of the output.

    Returns:
        Path to the mp4 file.
    """
    if dest_mp4 is None:
        dest_mp4 = PathObject.from_path_string(source_movie).get_preview_path()
    if os.path.exists(dest_mp4):
        os.remove(dest_mp4)

    dest_folder = os.path.dirname(dest_mp4)
    if dest_folder:
        os.makedirs(dest_folder, exist_ok=True)

    output_frame_rate = fps
    encoder = "libx264"
    profile = "high"
    # TODO: test this with material that was not created at 24fps.
    constant_rate_factor = "24"
    pixel_format = "yuv420p"
    # Scale to fit inside width x height, then pad to the exact size.
    filter_arg = (
        r' -filter:v "scale=iw*min($width/iw\,$height/ih):ih*min($width/iw\,$height/ih),'
        r" pad=$width:$height:($width-iw*min($width/iw\,$height/ih))/2:"
        r'($height-ih*min($width/iw\,$height/ih))/2" '.replace(
            "$width", str(width)
        ).replace("$height", str(height))
    )

    # The optional audio input is a separate, quoted "-i" argument; it used
    # to be fused with the following "-s:v" option.
    audio_input = f' -i "{source_audio}"' if source_audio else ""
    ffmpeg_cmd = (
        f'{FFMPEG_PATH} -i "{source_movie}"{audio_input} -s:v {width}x{height} -b:v 50M'
        f" -c:v {encoder} -profile:v {profile}"
        f" -crf {constant_rate_factor} -pix_fmt {pixel_format} -r {output_frame_rate}"
        f' {filter_arg} "{dest_mp4}"'
    )
    cgl_execute(ffmpeg_cmd)
    return dest_mp4
# FUNCTIONS FOR DEALING WITH AUDIO # def extract_wav_from_movie( # filein, fileout=None, processing_method="local", dependent_job=None # ): # """ # extracts audio from a video file. # Args: # filein: location of video file to extract from. # fileout: location of .wav file to be created # processing_method: local, smedge, or deadline processing. # dependent_job: # """ # # check if the input is an approved video format. # cfg = ProjectConfig.getLastConfig() # paths = cfg.util_config["paths"] # ext_map = cfg.util_config["ext_map"] # file_, ext = os.path.splitext(filein) # if ext in ext_map.keys(): # if ext_map[ext] == "movie": # if not fileout: # fileout = "%s.wav" % file_ # if not fileout.endswith(".wav"): # logging.info("%s is not a .wav file, aborting wav extraction") # return # command = "%s -i %s -acodec pcm_s16le -ac 2 %s" % ( # paths[f"{FFMPEG_PATH}"], # filein, # fileout, # ) # cgl_execute( # command, # command_name="Audio Extraction", # methodology=processing_method, # WaitForJobID=dependent_job, # new_window=True, # ) # return fileout # else: # logging.info( # "Extension %s not cataloged in globals, please add it to the ext_map dictionary" # % ext # )
def convert_to_wav(source_audio, dest_wav):
    """
    Placeholder for an ffmpeg based audio-to-wav conversion; not implemented.

    Args:
        source_audio: audio file to convert.
        dest_wav: wav file to create.

    Returns:
        None; the function does nothing at the moment.
    """
    return None
def create_review_jpgs(path_object):
    """
    Create review jpgs for a path object according to its file type.

    Movies need no jpgs and report success immediately; sequences, images,
    image planes, docs, ppts and pdfs go through
    create_jpg_proxies_from_path_object().

    Args:
        path_object: path object to create review jpgs for.

    Returns:
        True for movies, the result of create_jpg_proxies_from_path_object()
        for the supported types, and False for anything else.
    """
    print(path_object.get_abs_path(dirname=True))
    source_path = path_object.get_path()
    kind = get_file_type(source_path)
    print("Creating {} review jpgs for: {}".format(kind, path_object.get_path()))

    if kind == "movie":
        return True

    if kind == "sequence":
        # The extension may carry a trailing frame range after a space; the
        # stripped value is not used further at the moment.
        extension = path_object.ext
        if " " in extension:
            extension = extension.split(" ")[0]
        return create_jpg_proxies_from_path_object(path_object)

    if kind in ("image", "image_plane", "doc", "ppt", "pdf"):
        return create_jpg_proxies_from_path_object(path_object)

    return False
def create_review_mp4(path_object: str | PathObject):
    """
    Create a review mp4 for a path object.

    Images, sequences and image planes are encoded from their hd proxy jpgs,
    which are assumed to exist already; movies are transcoded directly.

    Args:
        path_object: path object to create the review movie for.

    Returns:
        The result of the converter that was used, or None when the file
        type is not handled.
    """
    print("Creating review mp4 for: {}".format(path_object.get_path()))
    source_path = path_object.get_path()
    kind = get_file_type(path_object.get_path())
    proxy_path = path_object.get_hd_proxy_path(ext=path_object.ext)
    print("Proxy Path:", proxy_path)

    if kind == "image":
        jpg_proxy = path_object.get_hd_proxy_path(ext=".jpg")
        return convert_single_jpg_to_mp4(source_jpg=jpg_proxy)

    if kind == "sequence":
        print("Converting sequence to mp4 -->")
        return convert_jpg_sequence_to_mp4(source_sequence=proxy_path)

    if kind == "image_plane":
        return convert_jpg_sequence_to_mp4(source_sequence=proxy_path)

    if kind == "movie":
        return convert_movie_to_mp4(source_movie=source_path)

    return None
# if file_type == "sequence": # if file_string.ext == ".exr": # convert_exr_sequence_to_mp4(file_string) # return True # elif file_string.ext == ".jpg": # convert_jpg_sequence_to_mp4(source_sequence=proxy_path) # else: # print( # "Unsupported/Untested sequence type {} - " # "talk to devs about adding it, should be easy".format(file_string.ext) # ) # return False # elif file_type == "video": # convert_movie_to_mp4(source_movie=file_string) # elif file_type == "image_plane": # convert_jpg_sequence_to_mp4(source_sequence=proxy_path)
def create_thumbnail(path_object, thumb_type="task"):
    """
    Create a thumbnail from the preview file of a path object.

    The file type of the preview path picks the implementation:
    create_movie_thumbnail() for movies and create_image_thumbnail() for
    image planes.

    Args:
        path_object: path object to create the thumbnail for.
        thumb_type: thumbnail type forwarded to the implementation.

    Returns:
        The result of the implementation that was used, or False when the
        preview's file type is neither "movie" nor "image_plane".
    """
    preview_path = path_object.get_preview_path()
    kind = get_file_type(preview_path)
    print(kind)

    thumbnail_makers = {
        "movie": create_movie_thumbnail,
        "image_plane": create_image_thumbnail,
    }
    make_thumbnail = thumbnail_makers.get(kind)
    if make_thumbnail is None:
        return False
    return make_thumbnail(path_object, thumb_type=thumb_type)
def create_movie_thumbnail(path_object, thumb_type="task", preview_ext=".mp4"):
    """
    Grab a 177x100 thumbnail frame from the preview movie of a path object.

    An existing thumbnail is replaced and the thumbnail folder is created
    when missing. Both paths are quoted in the ffmpeg command so paths
    containing spaces work, matching the other commands in this module.

    Args:
        path_object: path object whose preview movie is used.
        thumb_type: thumbnail type passed to ``get_thumb_path()``.
        preview_ext: extension of the preview movie.

    Raises:
        FileNotFoundError: if the preview movie does not exist.
    """
    preview_path = path_object.get_preview_path(ext=preview_ext)
    if not os.path.exists(preview_path):
        raise FileNotFoundError(
            f"Thubnail Creation Error: Preview File not found: {preview_path}"
        )

    task_thumb_path = path_object.get_thumb_path(thumb_type=thumb_type)
    thumb_dir = os.path.dirname(task_thumb_path)
    if thumb_dir:
        os.makedirs(thumb_dir, exist_ok=True)
    if os.path.exists(task_thumb_path):
        os.remove(task_thumb_path)

    # "thumbnail" lets ffmpeg pick a representative frame before scaling.
    command = (
        f'{FFMPEG_PATH} -i "{preview_path}" -vf "thumbnail,scale=177:100" '
        f'-frames:v 1 "{task_thumb_path}"'
    )
    cgl_execute(command)
def create_image_thumbnail(path_object, thumb_type="task"):
    """
    Create a thumbnail (at most 300x300) from the jpg preview of a path object.

    An existing thumbnail is removed before the new one is written.

    Args:
        path_object: path object whose jpg preview is used.
        thumb_type: thumbnail type passed to ``get_thumb_path()``.

    Raises:
        FileNotFoundError: if the jpg preview does not exist.
    """
    preview_jpg = path_object.get_preview_path(ext=".jpg")
    if not os.path.exists(preview_jpg):
        raise FileNotFoundError(
            f"Thubnail Creation Error: Preview File not found: {preview_jpg}"
        )

    thumb_path = path_object.get_thumb_path(thumb_type=thumb_type)
    if os.path.exists(thumb_path):
        os.remove(thumb_path)
    convert_image_to_jpg(
        source_image=preview_jpg, dest_image=thumb_path, width=300, height=300
    )
if __name__ == "__main__":
    # Manual smoke test: builds a PathObject from a hard-coded local path and
    # creates review jpgs for it. Only runs when the module is executed
    # directly, never on import.
    po = PathObject.from_path_string(
        "E:/Alchemy/JHCS/TTA/VERSIONS/render/assets/set/clbh/design/default/tom/000.001/high/2025-08-07-1031 set.jpg"
    )
    create_review_jpgs(po)