# Source code for cgl.plugins.otio.tools.aaf.aaf_extract_shots

import sys
import re

import opentimelineio as otio

from cgl.plugins.otio.tools import extract_shots
from cgl.plugins.otio.tools.aaf import aaf_media_linker


# Regex fragments used to recognise shot names.
# Sequence: exactly three digits, captured as "sequence".
SEQUENCE_NAME_FMT = r"(?P<sequence>[0-9]{3})"
# Shot: exactly four digits, captured as "shot".
SHOT_NUMBER_FMT = r"(?P<shot>[0-9]{4})"
# Scene suffix: a colon with at most one non-newline whitespace character on
# either side, then 1-3 digits and up to three letters, captured as "scene".
SCENE_FMT = r"[^\S\r\n]?:[^\S\r\n]?(?P<scene>[0-9]{1,3}[a-zA-Z]{0,3})"

# "<sequence>_<shot>", and the same followed by the scene suffix.
SHOT_NAME_FMT = f"{SEQUENCE_NAME_FMT}_{SHOT_NUMBER_FMT}"
SHOT_CODE_SCENE_FMT = f"{SHOT_NAME_FMT}{SCENE_FMT}"

def parse_shot_name(name):
    """Split a shot name into its sequence, shot and scene components.

    ``name`` must match in full either ``SHOT_CODE_SCENE_FMT``
    (``<sequence>_<shot>`` followed by a ``:<scene>`` suffix) or
    ``SHOT_NAME_FMT`` (``<sequence>_<shot>`` only).

    Args:
        name: The string to parse.

    Returns:
        A dict with the keys ``sequence``, ``shot`` and ``scene``. The values
        are the matched strings; ``scene`` is ``None`` when the name carries
        no scene suffix. ``None`` is returned when ``name`` matches neither
        pattern.
    """
    m = re.fullmatch(SHOT_CODE_SCENE_FMT, name)
    if m:
        return m.groupdict()

    m = re.fullmatch(SHOT_NAME_FMT, name)
    if m:
        d = m.groupdict()
        # SHOT_NAME_FMT has no "scene" group, so add the key explicitly to
        # keep the result shape identical to the scene-qualified match.
        d['scene'] = None
        return d

    return None
def clean_name(name):
    """Return the canonical spelling of a shot name.

    The name is parsed with ``parse_shot_name``. The result is the sequence
    zero-padded to three digits, an underscore, and the shot zero-padded to
    four digits, with ``:<scene>`` appended when the parsed name has a scene.

    Returns:
        The normalised name, or ``None`` when ``name`` cannot be parsed.
    """
    parts = parse_shot_name(name)
    if not parts:
        return None

    sequence_number = int(parts.get('sequence'))
    shot_number = int(parts.get('shot'))
    base = f"{sequence_number:03d}_{shot_number:04d}"

    scene = parts.get('scene')
    return f"{base}:{scene}" if scene else base
def is_shot(name):
    """Return True when ``parse_shot_name`` accepts ``name``, else False."""
    return bool(parse_shot_name(name))
def find_markers(item, markers=None):
    """Collect the markers on ``item`` and on everything nested inside it.

    Args:
        item: Object to inspect. Its markers are collected when it has a
            non-empty ``markers`` attribute; when it is an
            ``otio.core.Composition`` its children are searched recursively.
        markers: Optional list to extend in place. A new list is created
            when it is omitted.

    Returns:
        The list holding the collected markers (``markers`` itself when one
        was passed in).
    """
    # A mutable default ([]) would be shared between calls and keep
    # accumulating markers from earlier invocations.
    if markers is None:
        markers = []

    if hasattr(item, 'markers') and item.markers:
        markers.extend(item.markers)

    if isinstance(item, otio.core.Composition):
        for thing in item:
            find_markers(thing, markers)

    return markers
def remove_transitions(thing):
    """Delete Transition children from tracks, in place.

    When ``thing`` is an ``otio.schema.Stack`` each of its children is
    processed in turn; when it is an ``otio.schema.Track`` every direct
    child that is an ``otio.schema.Transition`` is deleted. Any other
    object is left untouched.
    """
    if isinstance(thing, otio.schema.Stack):
        # Iterate the children themselves. Wrapping the stack in
        # enumerate() would hand (index, child) tuples to the recursive
        # call, which match neither branch and leave nested tracks as-is.
        for item in thing:
            remove_transitions(item)

    if isinstance(thing, otio.schema.Track):
        # Walk the indices backwards so a deletion never shifts the
        # position of a child that has not been visited yet.
        for index in reversed(range(len(thing))):
            if isinstance(thing[index], otio.schema.Transition):
                del thing[index]
def extract_track_markers(tracks):
    """Detach every marker found under ``tracks`` and record where it sat.

    Each marker is removed from the child that owns it, and the start of its
    ``marked_range`` is converted with the owner's ``transformed_time`` into
    the time space of the track being walked.

    Args:
        tracks: Iterable of tracks; each must provide ``find_children()``.

    Returns:
        A list of ``[marker, start_time_in_track]`` pairs, in traversal
        order.
    """
    extracted = []
    for track in tracks:
        for child in track.find_children():
            # Not every child necessarily exposes ``markers``; guard the
            # lookup the same way find_markers and attach_markers do
            # instead of raising AttributeError.
            if not getattr(child, 'markers', None):
                continue
            # Iterate over a snapshot because the live list is mutated.
            for marker in list(child.markers):
                child.markers.remove(marker)
                start_in_track = child.transformed_time(
                    marker.marked_range.start_time, track
                )
                extracted.append([marker, start_in_track])
    return extracted
def attach_markers(target_track, markers):
    """Re-attach extracted markers to the children of ``target_track``.

    Args:
        target_track: Track whose children receive the markers.
        markers: Iterable of ``(marker, start_time)`` pairs, where
            ``start_time`` is passed to ``target_track.child_at_time``.

    Every marker's ``marked_range`` is first rewritten to start at
    ``start_time`` (duration unchanged). When no child is found at that time,
    or the child has no ``markers`` attribute, the marker is not attached to
    anything. Otherwise the start is converted into the child's time space
    and the marker is appended to the child's markers.
    """
    for marker, track_time in markers:
        recipient = target_track.child_at_time(track_time)
        duration = marker.marked_range.duration
        marker.marked_range = otio.opentime.TimeRange(
            start_time=track_time, duration=duration
        )

        attachable = recipient is not None and hasattr(recipient, 'markers')
        if not attachable:
            continue

        local_time = target_track.transformed_time(track_time, recipient)
        marker.marked_range = otio.opentime.TimeRange(
            start_time=local_time, duration=duration
        )
        recipient.markers.append(marker)
def iter_flatten(thing, trim=True):
    """Yield deep copies of the non-track items found by flattening ``thing``.

    * A Stack is collapsed to a single track with
      ``otio.algorithms.flatten_stack``. Children that are not tracks are
      deep-copied into a track of their own first; track children are used
      as they are, so the marker extraction below removes markers from the
      original objects. Markers are pulled off before flattening and
      re-attached to the flattened track afterwards.
    * A Track is iterated and each child is flattened recursively.
    * Anything else is yielded as a deep copy.

    Args:
        thing: The object to flatten.
        trim: When true and the track has a ``source_range``, the track is
            trimmed to that range before iterating. The recursive calls do
            not forward this flag, so it only affects this level.
    """
    if isinstance(thing, otio.schema.Stack):
        layers = []
        for child in thing:
            if isinstance(child, otio.schema.Track):
                layers.append(child)
                continue
            wrapper = otio.schema.Track()
            wrapper.append(child.deepcopy())
            layers.append(wrapper)

        detached = extract_track_markers(layers)
        thing = otio.algorithms.flatten_stack(layers)
        attach_markers(thing, detached)

    if not isinstance(thing, otio.schema.Track):
        yield thing.deepcopy()
        return

    if trim and thing.source_range:
        thing = otio.algorithms.track_trimmed_to_range(thing, thing.source_range)
    for child in thing:
        yield from iter_flatten(child)
def flatten_item(item):
    """Flatten ``item`` into a new single Track.

    The flattened pieces come from ``iter_flatten(item, False)``. When
    ``item`` is a Composition the new track inherits its ``source_range``;
    if the track still has no ``source_range`` afterwards, it gets one that
    starts at the default start time and lasts as long as ``item``.

    Args:
        item: The item or composition to flatten.

    Returns:
        The new ``otio.schema.Track``.
    """
    new_track = otio.schema.Track()
    if isinstance(item, otio.core.Composition):
        new_track.source_range = item.source_range

    for new_item in iter_flatten(item, False):
        new_track.append(new_item)

    if not new_track.source_range:
        # item.source_range may be unset; fall back to the available range
        # (as add_handles does) instead of failing on None.
        item_range = item.source_range
        if not item_range:
            item_range = item.available_range()
        new_track.source_range = otio.opentime.TimeRange(
            duration=item_range.duration
        )

    wanted_duration = new_track.source_range.duration
    available_duration = new_track.available_range().duration

    # if for some reason the flatten does something wrong
    # ensure the available_duration of the track is correct
    if available_duration.to_frames() < wanted_duration.to_frames():
        padding = otio.opentime.TimeRange(
            duration=wanted_duration - available_duration
        )
        new_track.append(otio.schema.Gap(source_range=padding))

    return new_track
def find_shot_name(item):
    """Return the cleaned shot name from the first shot-named marker.

    The markers on ``item`` and its descendants are gathered with
    ``find_markers``; the first one whose name passes ``is_shot`` decides
    the result.

    Returns:
        The name produced by ``clean_name``, or ``None`` when no marker name
        is a shot name.
    """
    for marker in find_markers(item, []):
        if is_shot(marker.name):
            # Only the first matching marker is considered.
            return clean_name(marker.name) or None
    return None
def add_handles(item, handles, rate=24):
    """Pad ``item`` with a gap at each end and shift its source range.

    A gap of ``handles`` units at ``rate`` is inserted at the start and
    appended at the end of ``item``. ``item.source_range`` is then set to
    the previous range (or the available range when none was set) moved
    later by the handle duration, with its duration unchanged.

    Args:
        item: The container to modify in place; must support ``insert`` and
            ``append``.
        handles: Handle length; nothing happens when it is ``<= 0``.
        rate: Rate used to build the handle ``RationalTime``. Defaults to
            24, the value that was previously hard-coded.
    """
    if handles <= 0:
        return

    # Capture the range before the gaps are added.
    source_range = item.source_range
    if not source_range:
        source_range = item.available_range()

    handle_duration = otio.opentime.RationalTime(handles, rate)
    handle_gap = otio.schema.Gap()
    handle_gap.source_range = otio.opentime.TimeRange(duration=handle_duration)

    # Each end gets its own copy of the gap.
    item.insert(0, handle_gap.deepcopy())
    item.append(handle_gap.deepcopy())

    item.source_range = otio.opentime.TimeRange(
        start_time=source_range.start_time + handle_duration,
        duration=source_range.duration,
    )
def get_item_timings(shot_track):
    """List the media URL and start times of each linked clip in a track.

    Children that are not clips, or that have no media reference, are
    skipped. Clips whose reference is a ``MissingReference`` are skipped as
    well, after printing the reference.

    Returns:
        A list of ``[target_url, start, abs_start]`` entries. ``start`` and
        ``abs_start`` are the ``.value`` of the clip's source-range start
        transformed into ``shot_track`` and into ``shot_track.parent()``
        respectively.
    """
    parent = shot_track.parent()
    timings = []

    for item in shot_track:
        if not isinstance(item, otio.schema.Clip) or not item.media_reference:
            continue

        reference = item.media_reference
        if isinstance(reference, otio.schema.MissingReference):
            print(reference)
            continue

        target_url = reference.target_url
        clip_start = item.source_range.start_time
        abs_start_time = item.transformed_time(clip_start, parent)
        start_time = item.transformed_time(clip_start, shot_track)
        timings.append([target_url, start_time.value, abs_start_time.value])

    return timings
def simplify_timeline(aaf_path):
    """Read an AAF and rebuild its video tracks as flattened, named shots.

    For every item on every video track:

    * an item for which ``find_shot_name`` returns a name has its
      transitions removed, is flattened into a track named after the shot,
      and is padded with ``extract_shots.HANDLES`` handles;
    * a Transition is copied across unchanged;
    * anything else becomes a Gap, or extends the preceding Gap when the
      output track already ends with one.

    Args:
        aaf_path: Path of the AAF file to read.

    Returns:
        A ``(timeline, shots)`` tuple: the rebuilt timeline and a dict
        mapping each shot name to its flattened track. A shot name that
        occurs more than once keeps the last track.

    Raises:
        ValueError: When ``extract_shots.parse_sequence_name`` cannot derive
            a name from the timeline's name.
    """
    aaf_media_linker.register_media_linker()
    timeline = otio.adapters.read_from_file(
        aaf_path, media_linker_name="aaf_media_linker"
    )

    simplified = otio.schema.Timeline()
    simplified.global_start_time = timeline.global_start_time

    sequence_name = extract_shots.parse_sequence_name(timeline.name)
    if not sequence_name:
        raise ValueError(f"Unable to parse sequence/scene name for timeline: {timeline.name}")
    simplified.name = sequence_name

    shots_by_name = {}
    for source_track in timeline.video_tracks():
        out_track = otio.schema.Track(kind=source_track.kind)
        simplified.tracks.append(out_track)

        for item in source_track:
            shot_name = find_shot_name(item)

            if shot_name:
                remove_transitions(item)
                shot = flatten_item(item)
                shot.name = shot_name
                shot.kind = source_track.kind
                add_handles(shot, extract_shots.HANDLES)
                out_track.append(shot)
                # Sanity checks on the flattened result.
                assert isinstance(shot, otio.schema.Track)
                assert shot.available_range().duration.to_frames() >= shot.source_range.duration.to_frames()
                shots_by_name[shot_name] = shot
                continue

            if isinstance(item, otio.schema.Transition):
                out_track.append(item.deepcopy())
                continue

            # Not a shot: stand in with a gap, merging into a trailing gap
            # so consecutive non-shot items collapse into one.
            last = out_track[-1] if len(out_track) > 0 else None
            if isinstance(last, otio.schema.Gap):
                last.source_range = last.source_range.duration_extended_by(
                    item.source_range.duration
                )
                continue

            filler = otio.schema.Gap()
            filler.name = item.name
            filler.source_range = item.source_range
            out_track.append(filler)

    return simplified, shots_by_name
if __name__ == "__main__":
    # Usage: pass the AAF path as the first argument; the simplified
    # timeline is written next to it with ".otio" appended to the path.
    aaf_path = sys.argv[1]
    output_path = aaf_path + ".otio"
    timeline, shot_dict = simplify_timeline(aaf_path)
    otio.adapters.write_to_file(timeline, output_path)