# Source code for cgl.plugins.otio.tools.fcp.fcp_convert_scenes_to_aaf

import glob
import aaf2
import argparse
import pathlib
import traceback
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

import opentimelineio as otio

from cgl.plugins.otio.tools import mp4_chopper, extract_shots
from cgl.plugins.otio.tools.aaf import aaf_embedded_media_tool, image_to_aaf
from cgl.plugins.otio.tools.fcp import fcp_extract_shots, fcp_media_linker


def scale_ranges(in_range, new_rate=24):
    """Return a TimeRange with both start_time and duration rescaled to *new_rate*."""
    assert in_range
    return otio.opentime.TimeRange(
        scale_rational(in_range.start_time, new_rate),
        scale_rational(in_range.duration, new_rate),
    )


def scale_rational(value, new_rate=24):
    """Return *value* rescaled to *new_rate*, rounded to a whole frame count.

    A value already at *new_rate* is returned unchanged (same object).
    """
    if value.rate == new_rate:
        return value
    rescaled = value.rescaled_to(new_rate)
    return otio.opentime.RationalTime(round(rescaled.value), rescaled.rate)
# NOTE(review): this duplicates scale_ranges defined earlier in the file;
# being the later definition, this one wins at import time.
def scale_ranges(in_range, new_rate=24):
    """Return a TimeRange whose start_time and duration are rescaled to *new_rate*."""
    assert in_range
    scaled_start = scale_rational(in_range.start_time, new_rate)
    scaled_duration = scale_rational(in_range.duration, new_rate)
    return otio.opentime.TimeRange(scaled_start, scaled_duration)
def add_transition_metadata(t):
    """Attach AAF video-dissolve metadata to an OTIO Transition in place.

    Rescales the transition's in/out offsets to 24 fps, then writes an
    "AAF" metadata dict describing a 2-input dissolve so the AAF adapter
    can round-trip the effect.

    :param t: otio.schema.Transition to mutate.
    """
    t.in_offset = scale_rational(t.in_offset)
    t.out_offset = scale_rational(t.out_offset)
    # (fix: removed a discarded `t.in_offset.to_frames()` call that had no effect)
    meta = {
        # Effect level ramps 0 -> 100 across normalized time [0, 1].
        "PointList": [{"Time": 0, "Value": 0.0}, {"Time": 1.0, "Value": 100.0}],
        "CutPoint": t.in_offset.to_frames(),
        "OperationGroup": {
            "Operation": {
                "DataDefinition": {"Name": "Picture"},
                # NOTE(review): UUID presumably identifies the standard AAF
                # video dissolve operation -- confirm against the AAF spec.
                "Identification": "89d9b67e-5584-302d-9abd-8bd330c46841",
                "IsTimeWarp": False,
                "OperationCategory": "OperationCategory_Effect",
                "NumberInputs": 2,
                "Description": "",
                "Name": "VideoDissolve_2",
                "ParametersDefined": {},
            }
        },
    }
    t.metadata["AAF"] = meta
def remove_transitions(track):
    """Return a copy of *track* (same kind) with every Transition item dropped."""
    stripped = otio.schema.Track(kind=track.kind)
    for item in track:
        if not isinstance(item, otio.schema.Transition):
            stripped.append(item.deepcopy())
    return stripped
def flatten_stacks(timeline):
    """Return a copy of *timeline* with nested Stacks flattened to plain items.

    For each track: pad short tracks to the full timeline duration with a
    Gap, drop top-level Transitions, and replace each nested Stack with the
    single flat track produced by otio.algorithms.flatten_stack (keeping
    the stack's name, metadata and trimmed range).

    :param timeline: otio.schema.Timeline to flatten (tracks are mutated
        when gaps are appended).
    :return: a new otio.schema.Timeline.
    """
    new_timeline = otio.schema.Timeline()
    # Default to a zero start at 24 fps when no global start time is set.
    new_timeline.global_start_time = (
        timeline.global_start_time or otio.opentime.RationalTime(0, 24)
    )
    new_timeline.name = timeline.name
    timeline_duration = timeline.duration()
    for track in timeline.tracks:
        # add gap to tail if track is too short
        track_duration = track.duration()
        if track_duration < timeline_duration:
            gap = otio.schema.Gap(
                source_range=otio.opentime.TimeRange(
                    duration=timeline_duration - track_duration
                )
            )
            track.append(gap)
        new_track = otio.schema.Track(kind=track.kind)
        new_timeline.tracks.append(new_track)
        for item in track:
            if isinstance(item, otio.schema.Transition):
                continue
            if isinstance(item, otio.schema.Stack):
                # Flatten only the sub-tracks matching this track's kind;
                # insert(0, ...) reverses their order for flatten_stack.
                flatten_tracks = []
                for t in item:
                    if t.kind == track.kind:
                        flatten_tracks.insert(0, t)
                new_item = otio.algorithms.flatten_stack(flatten_tracks)
                new_item = remove_transitions(new_item)
                if item.source_range:
                    new_item = otio.algorithms.track_trimmed_to_range(
                        new_item, item.source_range
                    )
                new_item.source_range = new_item.available_range()
                # Carry the stack's metadata over onto the flattened item.
                for key, value in item.metadata.items():
                    new_item.metadata[key] = value
                # NOTE(review): assumes item.source_range is set here (it is
                # dereferenced unconditionally) -- confirm for stacks with no
                # source_range.
                new_track.source_range = otio.opentime.TimeRange(
                    otio.opentime.RationalTime(0, 24),
                    item.source_range.duration,
                )
                new_item.name = item.name or new_item.name
                new_track.append(new_item)
            else:
                new_track.append(item.deepcopy())
    return new_timeline
def simplify_timeline(original_timeline, media_dict):
    """Flatten *original_timeline* and prepare it for AAF export.

    Rescales every source range to 24 fps, collects shot markers from video
    tracks, normalizes clip media ranges (stills get a synthetic range;
    over-long source ranges extend the available range), indexes each
    clip's on-disk media by file extension into *media_dict* (mutated in
    place), and tags Transitions with AAF dissolve metadata.

    :param original_timeline: otio.schema.Timeline to simplify.
    :param media_dict: dict mapping ext -> {target_url: [clips]}; updated.
    :return: (timeline, markers) where markers is a list of
        [cut_in_frame, video_track_index, otio.schema.Marker].
    """
    timeline = flatten_stacks(original_timeline)
    markers = []
    video_track_index = -1
    for track in timeline.tracks:
        if track.kind == otio.schema.TrackKind.Video:
            video_track_index += 1
        for child in track.find_children():
            # Normalize all source ranges to 24 fps.
            if hasattr(child, "source_range") and child.source_range:
                child.source_range = scale_ranges(child.source_range)
            if (
                track.kind == otio.schema.TrackKind.Video
                and isinstance(child, otio.schema.Track)
                and fcp_extract_shots.is_shot(child)
            ):
                # Record a marker at the shot's cut-in frame on this video track.
                shot_name = extract_shots.clean_name(child.name)
                print(f"found shot: {shot_name}")
                marker = otio.schema.Marker(shot_name)
                child.markers.append(marker)
                range_in_parent = child.range_in_parent()
                cut_in = range_in_parent.start_time.to_frames()
                markers.append([cut_in, video_track_index, marker])
            if isinstance(child, otio.schema.Clip):
                clip = child
                if clip.media_reference:
                    available_duration = 0
                    if clip.media_reference.available_range:
                        available_duration = (
                            clip.media_reference.available_range.duration.to_seconds()
                        )
                    # zero duration is a still: synthesize a generous range
                    # with a 720-frame offset (presumably head/tail handles
                    # -- TODO confirm).
                    if available_duration == 0:
                        new_duration = max(
                            720 * 2, 720 + clip.source_range.duration.to_frames()
                        )
                        new_range = otio.opentime.TimeRange(
                            otio.opentime.RationalTime(0, 24),
                            otio.opentime.RationalTime(new_duration, 24),
                        )
                        clip.media_reference.available_range = new_range
                        clip.source_range = otio.opentime.TimeRange(
                            otio.opentime.RationalTime(720, 24),
                            clip.source_range.duration,
                        )
                    else:
                        available_range = scale_ranges(
                            clip.media_reference.available_range
                        )
                        start_offset = (
                            clip.source_range.start_time.to_frames()
                            - available_range.start_time.to_frames()
                        )
                        available_dur = available_range.duration.to_frames()
                        out_point = (
                            start_offset + clip.source_range.duration.to_frames()
                        )
                        # If the clip uses more media than is available,
                        # extend the available range rather than failing.
                        if out_point > available_dur:
                            print(
                                f"Warning: {track.kind} {clip.name} source range larger the available_range, extending {out_point} > {available_dur}"
                            )
                            available_range = available_range.extended_by(
                                clip.source_range
                            )
                        clip.media_reference.available_range = available_range
                    # Index existing on-disk media by extension for batch
                    # conversion; missing files are skipped entirely.
                    if (
                        hasattr(clip.media_reference, "target_url")
                        and clip.media_reference.target_url
                    ):
                        target_url = clip.media_reference.target_url
                        name, ext = os.path.splitext(target_url)
                        ext = ext.lower()
                        if not os.path.exists(target_url):
                            continue
                        if ext not in media_dict:
                            media_dict[ext] = {}
                        if target_url not in media_dict[ext]:
                            media_dict[ext][target_url] = [clip]
                        else:
                            media_dict[ext][target_url].append(clip)
            elif isinstance(child, otio.schema.Transition):
                add_transition_metadata(child)
    return timeline, markers
def try_convert(target_url, output_aaf, mob_name, clips):
    """Best-effort conversion of *target_url* into an AAF with embedded media.

    :param target_url: source media file path.
    :param output_aaf: destination AAF file path.
    :param mob_name: name used for both the master and source mob.
    :param clips: clips referencing this media; their ranges size the audio pad.
    :return: the new MobID on success, else None (failure is logged, not raised).
    """
    # Pad audio to the longest available/source duration across all clips so
    # the embedded essence covers every use of this media.
    pad_audio = 0
    for clip in clips:
        pad_audio = max(
            clip.media_reference.available_range.duration.to_seconds(), pad_audio
        )
        pad_audio = max(clip.source_range.duration.to_seconds(), pad_audio)
    mob_id = None
    try:
        mob_id = aaf_embedded_media_tool.create_aaf_file(
            [target_url],
            output_aaf,
            mob_name,
            mob_name,
            frame_rate=24,
            ignore_alpha=True,
            use_embedded_timecode=True,
            copy_dnxhd_streams=True,
            pad_audio=pad_audio,
        )
        if mob_id:
            image_to_aaf.add_network_locators(output_aaf, target_url, mob_id)
    except Exception:
        # Fix: was a bare `except:` which also swallowed KeyboardInterrupt/
        # SystemExit. Log and fall through so the caller can assign a fresh
        # MobID instead of aborting the whole batch.
        print(traceback.format_exc())
    return mob_id
def find_aaf(aaf_name, aaf_search_paths):
    """Look for *aaf_name* in each search path and one subdirectory deep.

    :param aaf_name: file name (e.g. "clip.mov.aaf") to locate.
    :param aaf_search_paths: iterable of directory paths to search.
    :return: first existing full path found, else None.
    """
    for dir_path in aaf_search_paths:
        # Fix: skip search roots that don't exist (or aren't directories)
        # instead of letting os.listdir raise FileNotFoundError below.
        if not os.path.isdir(dir_path):
            continue
        candidate = os.path.join(dir_path, aaf_name)
        if os.path.exists(candidate):
            print(f"found: {candidate}")
            return candidate
        # Also check one level of subdirectories.
        for subdir in os.listdir(dir_path):
            candidate = os.path.join(dir_path, subdir, aaf_name)
            if os.path.exists(candidate):
                print(f"found: {candidate}")
                return candidate
    return None
def convert_otio_fcp_scenes(src_files, output_dir, aaf_search_paths):
    """Convert FCP-sourced .otio scene files into AAF sequences plus media.

    Pipeline: read each timeline with the fcp media linker, simplify it
    (collecting media references by extension), convert stills via a
    process pool and audio/video serially (reusing any AAFs found in
    *aaf_search_paths*), then write one sequence AAF (with markers) and one
    .otio per input file under *output_dir*.

    :param src_files: list of .otio file paths.
    :param output_dir: root output directory; subdirs are created as needed.
    :param aaf_search_paths: directories searched for pre-existing media AAFs.
    """
    start = time.time()
    fcp_media_linker.register_media_linker()
    media_dict = {}
    timelines = []
    # Output layout under output_dir.
    aaf_sequences_dirname = os.path.join(output_dir, "aaf_sequences")
    if not os.path.exists(aaf_sequences_dirname):
        os.makedirs(aaf_sequences_dirname)
    otio_sequences_dirname = os.path.join(output_dir, "otio_sequences")
    if not os.path.exists(otio_sequences_dirname):
        os.makedirs(otio_sequences_dirname)
    aaf_media_dirname = os.path.join(output_dir, "aaf_media")
    if not os.path.exists(aaf_media_dirname):
        os.makedirs(aaf_media_dirname)
    work_dirname = os.path.join(output_dir, "work")
    if not os.path.exists(work_dirname):
        os.makedirs(work_dirname)
    # Pass 1: read and simplify every timeline, filling media_dict.
    for path in src_files:
        timeline = otio.adapters.read_from_file(
            path, media_linker_name="fcp_media_linker"
        )
        new_timeline, markers = simplify_timeline(timeline, media_dict)
        timelines.append([new_timeline, markers])
    aaf_elements = []
    # Pass 2: convert media grouped by extension.
    for ext, files in media_dict.items():
        # test error handling
        # for i in range(10):
        #     files[f"badd file {i:04d} {ext}"] = []
        if ext in (".psd", ".psb", ".jpg", ".png"):
            # Still images: encode to DNxHR then wrap in AAF, in parallel.
            if ext in (".psd", ".psb"):
                media_dirname = os.path.join(aaf_media_dirname, "psd")
            else:
                media_dirname = os.path.join(aaf_media_dirname, "images")
            if not os.path.exists(media_dirname):
                os.makedirs(media_dirname)
            encode_items = []
            existing_aafs = {}
            for target_url, clips in files.items():
                # skip urls that contain single quotes, handle in slow method
                if target_url.count("'"):
                    continue
                basename = f"{os.path.basename(target_url)}.aaf"
                output_aaf = find_aaf(basename, aaf_search_paths)
                if output_aaf:
                    existing_aafs[target_url] = output_aaf
                else:
                    encode_items.append(target_url)
            dnxhr_mapping = {}
            if encode_items:
                dnxhr_mapping = mp4_chopper.still_dnxhr_encoder(
                    encode_items, work_dirname
                )
            executor = ProcessPoolExecutor(4)
            try:
                # process_psd_media(target_url, output_aaf, clips)
                futures = []
                for target_url, clips in files.items():
                    existing_aaf = existing_aafs.get(target_url, None)
                    basename = os.path.basename(target_url)
                    output_aaf = os.path.join(media_dirname, f"{basename}.aaf")
                    dnxhr_file = dnxhr_mapping.get(target_url, None)
                    f = executor.submit(
                        mp4_chopper.process_psd_media,
                        target_url,
                        output_aaf,
                        dnxhr_file,
                        existing_aaf,
                    )
                    futures.append(f)
                i = 0
                # Tag every clip with the resulting mob id as results arrive.
                for f in as_completed(futures):
                    target_url, output_aaf, mob_id = f.result()
                    print(f"{i:05d}/{len(files):05d} {ext} {target_url}")
                    clips = media_dict[ext][target_url]
                    for clip in clips:
                        clip.metadata["AAF"] = {
                            "SourceID": str(mob_id or aaf2.mobid.MobID.new())
                        }
                    if output_aaf:
                        aaf_elements.append(output_aaf)
                    i += 1
            finally:
                # Cancel any pending conversions if something failed.
                executor.shutdown(False, cancel_futures=True)
        elif ext in (".wav", ".aif", ".aiff", ".m4a", ".mp3", ".mov", ".mp4"):
            # Audio and video: convert serially, reusing existing AAFs.
            if ext in (".mov", ".mp4"):
                media_dirname = os.path.join(aaf_media_dirname, "video")
            else:
                media_dirname = os.path.join(aaf_media_dirname, "audio")
            if not os.path.exists(media_dirname):
                os.makedirs(media_dirname)
            for i, (target_url, clips) in enumerate(files.items()):
                print(f"{i:05d}/{len(files):05d} {ext} {target_url}")
                basename = f"{os.path.basename(target_url)}.aaf"
                mob_id = None
                output_aaf = find_aaf(basename, aaf_search_paths)
                if output_aaf:
                    mob_id = mp4_chopper.get_master_mob_id(output_aaf)
                if not mob_id:
                    output_aaf = os.path.join(media_dirname, basename)
                    mob_id = mp4_chopper.get_master_mob_id(output_aaf)
                if not mob_id:
                    mob_id = try_convert(target_url, output_aaf, basename, clips)
                for clip in clips:
                    clip.metadata["AAF"] = {
                        "SourceID": str(mob_id or aaf2.mobid.MobID.new())
                    }
                aaf_elements.append(output_aaf)
    # Pass 3: write one sequence AAF (plus markers) and one .otio per input.
    for path, (timeline, markers) in zip(src_files, timelines):
        basename = os.path.basename(path)
        out_aaf = os.path.join(aaf_sequences_dirname, basename + ".aaf")
        print(f"writing {out_aaf}")
        otio.adapters.write_to_file(timeline, out_aaf, use_empty_mob_ids=True)
        mp4_chopper.add_markers(out_aaf, markers)
        out_otio = os.path.join(otio_sequences_dirname, basename + ".aaf.otio")
        otio.adapters.write_to_file(timeline, out_otio)
    dur = time.time() - start
    print(f"timelines converted in {dur/60.0} minutes")
def run_cli():
    """Command-line entry point: convert all .otio files in source_dir.

    Usage: convert SOURCE_DIR OUTPUT_DIR [-s SEARCH_PATH ...]
    """
    parser = argparse.ArgumentParser(prog="convert")
    parser.add_argument("source_dir", type=pathlib.Path)
    parser.add_argument("output_dir", type=pathlib.Path)
    parser.add_argument(
        "-s",
        "--search",
        type=pathlib.Path,
        action="append",
        dest="search_paths",
        help="aaf search paths for existing media",
    )
    args = parser.parse_args()
    # Validate explicitly (asserts are stripped under -O) and report via the
    # parser so the user gets a usage message instead of a traceback.
    if not os.path.isdir(args.source_dir):
        parser.error(f"source_dir is not a directory: {args.source_dir}")
    if not os.path.isdir(args.output_dir):
        parser.error(f"output_dir is not a directory: {args.output_dir}")
    files = glob.glob(os.path.join(args.source_dir, "*.otio"))
    # Fix: search_paths is None when -s/--search is never supplied
    # (argparse append default); iterating None raised TypeError.
    aaf_search_paths = [str(item) for item in (args.search_paths or [])]
    convert_otio_fcp_scenes(files, str(args.output_dir), aaf_search_paths)
if __name__ == "__main__":
    # Fix: an `if` block containing only comments is a SyntaxError; restore
    # the CLI entry call so the module is runnable as a script.
    run_cli()
    # files = glob.glob(".\\tests\\*.otio")
    # convert_media(files, 'output')