import glob
import aaf2
import argparse
import pathlib
import traceback
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
import opentimelineio as otio
from cgl.plugins.otio.tools import mp4_chopper, extract_shots
from cgl.plugins.otio.tools.aaf import aaf_embedded_media_tool, image_to_aaf
from cgl.plugins.otio.tools.fcp import fcp_extract_shots, fcp_media_linker
def scale_ranges(in_range, new_rate=24):
assert in_range
start_time = scale_rational(in_range.start_time, new_rate)
duration = scale_rational(in_range.duration, new_rate)
return otio.opentime.TimeRange(start_time, duration)
[docs]
def scale_rational(value, new_rate=24):
if value.rate != new_rate:
new_value = value.rescaled_to(new_rate)
new_value = otio.opentime.RationalTime(round(new_value.value), new_value.rate)
return new_value
return value
[docs]
def scale_ranges(in_range, new_rate=24):
assert in_range
start_time = scale_rational(in_range.start_time, new_rate)
duration = scale_rational(in_range.duration, new_rate)
return otio.opentime.TimeRange(start_time, duration)
[docs]
def remove_transitions(track):
new_track = otio.schema.Track(kind=track.kind)
for item in track:
if isinstance(item, otio.schema.Transition):
continue
new_track.append(item.deepcopy())
return new_track
[docs]
def flatten_stacks(timeline):
new_timeline = otio.schema.Timeline()
new_timeline.global_start_time = (
timeline.global_start_time or otio.opentime.RationalTime(0, 24)
)
new_timeline.name = timeline.name
timeline_duration = timeline.duration()
for track in timeline.tracks:
# add gap to tail if track is too
track_duration = track.duration()
if track_duration < timeline_duration:
gap = otio.schema.Gap(
source_range=otio.opentime.TimeRange(
duration=timeline_duration - track_duration
)
)
track.append(gap)
new_track = otio.schema.Track(kind=track.kind)
new_timeline.tracks.append(new_track)
for item in track:
if isinstance(item, otio.schema.Transition):
continue
if isinstance(item, otio.schema.Stack):
flatten_tracks = []
for t in item:
if t.kind == track.kind:
flatten_tracks.insert(0, t)
new_item = otio.algorithms.flatten_stack(flatten_tracks)
new_item = remove_transitions(new_item)
if item.source_range:
new_item = otio.algorithms.track_trimmed_to_range(
new_item, item.source_range
)
# print(new_item)
new_item.source_range = new_item.available_range()
# print(item.metadata)
for key, value in item.metadata.items():
new_item.metadata[key] = value
# new_item.source_range = track.
new_track.source_range = otio.opentime.TimeRange(
otio.opentime.RationalTime(0, 24), item.source_range.duration
)
new_item.name = item.name or new_item.name
new_track.append(new_item)
else:
new_track.append(item.deepcopy())
return new_timeline
[docs]
def simplify_timeline(original_timeline, media_dict):
timeline = flatten_stacks(original_timeline)
markers = []
video_track_index = -1
for track in timeline.tracks:
if track.kind == otio.schema.TrackKind.Video:
video_track_index += 1
for child in track.find_children():
if hasattr(child, "source_range") and child.source_range:
child.source_range = scale_ranges(child.source_range)
if (
track.kind == otio.schema.TrackKind.Video
and isinstance(child, otio.schema.Track)
and fcp_extract_shots.is_shot(child)
):
shot_name = extract_shots.clean_name(child.name)
print(f"found shot: {shot_name}")
marker = otio.schema.Marker(shot_name)
child.markers.append(marker)
range_in_parent = child.range_in_parent()
cut_in = range_in_parent.start_time.to_frames()
markers.append([cut_in, video_track_index, marker])
if isinstance(child, otio.schema.Clip):
clip = child
if clip.media_reference:
available_duration = 0
if clip.media_reference.available_range:
available_duration = (
clip.media_reference.available_range.duration.to_seconds()
)
# zero duration is a still
if available_duration == 0:
new_duration = max(
720 * 2, 720 + clip.source_range.duration.to_frames()
)
new_range = otio.opentime.TimeRange(
otio.opentime.RationalTime(0, 24),
otio.opentime.RationalTime(new_duration, 24),
)
clip.media_reference.available_range = new_range
clip.source_range = otio.opentime.TimeRange(
otio.opentime.RationalTime(720, 24),
clip.source_range.duration,
)
else:
available_range = scale_ranges(
clip.media_reference.available_range
)
start_offset = (
clip.source_range.start_time.to_frames()
- available_range.start_time.to_frames()
)
available_dur = available_range.duration.to_frames()
out_point = (
start_offset + clip.source_range.duration.to_frames()
)
if out_point > available_dur:
print(
f"Warning: {track.kind} {clip.name} source range larger the available_range, extending {out_point} > {available_dur}"
)
available_range = available_range.extended_by(
clip.source_range
)
clip.media_reference.available_range = available_range
if (
hasattr(clip.media_reference, "target_url")
and clip.media_reference.target_url
):
target_url = clip.media_reference.target_url
name, ext = os.path.splitext(target_url)
ext = ext.lower()
if not os.path.exists(target_url):
continue
if ext not in media_dict:
media_dict[ext] = {}
if target_url not in media_dict[ext]:
media_dict[ext][target_url] = [clip]
else:
media_dict[ext][target_url].append(clip)
elif isinstance(child, otio.schema.Transition):
add_transition_metadata(child)
return timeline, markers
[docs]
def try_convert(target_url, output_aaf, mob_name, clips):
pad_audio = 0
mob_id = None
for clip in clips:
pad_audio = max(
clip.media_reference.available_range.duration.to_seconds(), pad_audio
)
pad_audio = max(clip.source_range.duration.to_seconds(), pad_audio)
try:
mob_id = aaf_embedded_media_tool.create_aaf_file(
[target_url],
output_aaf,
mob_name,
mob_name,
frame_rate=24,
ignore_alpha=True,
use_embedded_timecode=True,
copy_dnxhd_streams=True,
pad_audio=pad_audio,
)
if mob_id:
image_to_aaf.add_network_locators(output_aaf, target_url, mob_id)
except:
print(traceback.format_exc())
return mob_id
[docs]
def find_aaf(aaf_name, aaf_search_paths):
for dir_path in aaf_search_paths:
path = os.path.join(dir_path, aaf_name)
if os.path.exists(path):
print(f"found: {path}")
return path
for subdir in os.listdir(dir_path):
path = os.path.join(dir_path, subdir, aaf_name)
if os.path.exists(path):
print(f"found: {path}")
return path
return None
[docs]
def convert_otio_fcp_scenes(src_files, output_dir, aaf_search_paths):
start = time.time()
fcp_media_linker.register_media_linker()
media_dict = {}
timelines = []
aaf_sequences_dirname = os.path.join(output_dir, "aaf_sequences")
if not os.path.exists(aaf_sequences_dirname):
os.makedirs(aaf_sequences_dirname)
otio_sequences_dirname = os.path.join(output_dir, "otio_sequences")
if not os.path.exists(otio_sequences_dirname):
os.makedirs(otio_sequences_dirname)
aaf_media_dirname = os.path.join(output_dir, "aaf_media")
if not os.path.exists(aaf_media_dirname):
os.makedirs(aaf_media_dirname)
work_dirname = os.path.join(output_dir, "work")
if not os.path.exists(work_dirname):
os.makedirs(work_dirname)
for path in src_files:
timeline = otio.adapters.read_from_file(
path, media_linker_name="fcp_media_linker"
)
new_timeline, markers = simplify_timeline(timeline, media_dict)
timelines.append([new_timeline, markers])
aaf_elements = []
for ext, files in media_dict.items():
# test errror handling
# for i in range(10):
# files[f"badd file {i:04d} {ext}"] = []
if ext in (".psd", ".psb", ".jpg", ".png"):
if ext in (".psd", ".psb"):
media_dirname = os.path.join(aaf_media_dirname, "psd")
else:
media_dirname = os.path.join(aaf_media_dirname, "images")
if not os.path.exists(media_dirname):
os.makedirs(media_dirname)
encode_items = []
existing_aafs = {}
for target_url, clips in files.items():
# skip urls that contain single quotes, handle in slow method
if target_url.count("'"):
continue
basename = f"{os.path.basename(target_url)}.aaf"
output_aaf = find_aaf(basename, aaf_search_paths)
if output_aaf:
existing_aafs[target_url] = output_aaf
else:
encode_items.append(target_url)
dnxhr_mapping = {}
if encode_items:
dnxhr_mapping = mp4_chopper.still_dnxhr_encoder(
encode_items, work_dirname
)
executor = ProcessPoolExecutor(4)
try:
# process_psd_media(target_url, output_aaf, clips)
futures = []
for target_url, clips in files.items():
existing_aaf = existing_aafs.get(target_url, None)
basename = os.path.basename(target_url)
output_aaf = os.path.join(media_dirname, f"{basename}.aaf")
dnxhr_file = dnxhr_mapping.get(target_url, None)
f = executor.submit(
mp4_chopper.process_psd_media,
target_url,
output_aaf,
dnxhr_file,
existing_aaf,
)
futures.append(f)
i = 0
for f in as_completed(futures):
target_url, output_aaf, mob_id = f.result()
print(f"{i:05d}/{len(files):05d} {ext} {target_url}")
clips = media_dict[ext][target_url]
for clip in clips:
clip.metadata["AAF"] = {
"SourceID": str(mob_id or aaf2.mobid.MobID.new())
}
if output_aaf:
aaf_elements.append(output_aaf)
i += 1
finally:
executor.shutdown(False, cancel_futures=True)
elif ext in (".wav", ".aif", ".aiff", ".m4a", ".mp3", ".mov", ".mp4"):
if ext in (".mov", ".mp4"):
media_dirname = os.path.join(aaf_media_dirname, "video")
else:
media_dirname = os.path.join(aaf_media_dirname, "audio")
if not os.path.exists(media_dirname):
os.makedirs(media_dirname)
for i, (target_url, clips) in enumerate(files.items()):
print(f"{i:05d}/{len(files):05d} {ext} {target_url}")
basename = f"{os.path.basename(target_url)}.aaf"
mob_id = None
output_aaf = find_aaf(basename, aaf_search_paths)
if output_aaf:
mob_id = mp4_chopper.get_master_mob_id(output_aaf)
if not mob_id:
output_aaf = os.path.join(media_dirname, basename)
mob_id = mp4_chopper.get_master_mob_id(output_aaf)
if not mob_id:
mob_id = try_convert(target_url, output_aaf, basename, clips)
for clip in clips:
clip.metadata["AAF"] = {
"SourceID": str(mob_id or aaf2.mobid.MobID.new())
}
aaf_elements.append(output_aaf)
for path, (timeline, markers) in zip(src_files, timelines):
basename = os.path.basename(path)
out_aaf = os.path.join(aaf_sequences_dirname, basename + ".aaf")
print(f"writing {out_aaf}")
otio.adapters.write_to_file(timeline, out_aaf, use_empty_mob_ids=True)
mp4_chopper.add_markers(out_aaf, markers)
out_otio = os.path.join(otio_sequences_dirname, basename + ".aaf.otio")
otio.adapters.write_to_file(timeline, out_otio)
dur = time.time() - start
print(f"timelines converted in {dur/60.0} minutes")
[docs]
def run_cli():
parser = argparse.ArgumentParser(prog="convert")
parser.add_argument("source_dir", type=pathlib.Path)
parser.add_argument("output_dir", type=pathlib.Path)
parser.add_argument(
"-s",
"--search",
type=pathlib.Path,
action="append",
dest="search_paths",
help="aaf search paths for existing media",
)
args = parser.parse_args()
assert os.path.isdir(args.source_dir)
assert os.path.isdir(args.output_dir)
files = glob.glob(os.path.join(args.source_dir, "*.otio"))
aaf_search_paths = [str(item) for item in args.search_paths]
convert_otio_fcp_scenes(files, str(args.output_dir), aaf_search_paths)
if __name__ == "__main__":
#
run_cli()
# files = glob.glob(".\\tests\\*.otio")
# convert_media(files, 'output')