import sys
import re
import opentimelineio as otio
from cgl.plugins.otio.tools import extract_shots
from cgl.plugins.otio.tools.aaf import aaf_media_linker
SEQUENCE_NAME_FMT = r"(?P<sequence>[0-9]{3})"
SHOT_NUMBER_FMT = r"(?P<shot>[0-9]{4})"
SCENE_FMT = r"[^\S\r\n]?:[^\S\r\n]?(?P<scene>[0-9]{1,3}[a-zA-Z]{0,3})"
SHOT_NAME_FMT = SEQUENCE_NAME_FMT + "_" + SHOT_NUMBER_FMT
SHOT_CODE_SCENE_FMT = SHOT_NAME_FMT + SCENE_FMT
[docs]
def parse_shot_name(name):
m = re.fullmatch(SHOT_CODE_SCENE_FMT, name)
if m:
return m.groupdict()
m = re.fullmatch(SHOT_NAME_FMT, name)
if m:
d = m.groupdict()
d['scene'] = None
return None
[docs]
def clean_name(name):
d = parse_shot_name(name)
if not d:
return None
sequence = d.get('sequence', None)
shot = d.get('shot', None)
scene = d.get('scene', None)
sequence = int(sequence)
shot = int(shot)
if scene:
return f"{sequence:03d}_{shot:04d}:{scene}"
return f"{sequence:03d}_{shot:04d}"
[docs]
def is_shot(name):
shot_name = parse_shot_name(name)
if shot_name:
return True
return False
[docs]
def find_markers(item, markers = []):
if hasattr(item, 'markers') and item.markers:
markers.extend(item.markers)
if isinstance(item, otio.core.Composition):
for thing in item:
find_markers(thing, markers)
return markers
[docs]
def remove_transitions(thing):
if isinstance(thing, otio.schema.Stack):
for item in enumerate(thing):
remove_transitions(item)
if isinstance(thing, otio.schema.Track):
for c in reversed(range(len(thing))):
child = thing[c]
if isinstance(child, otio.schema.Transition):
del thing[c]
[docs]
def attach_markers(target_track, markers):
for marker, start_time in markers:
target_item = target_track.child_at_time(start_time)
marker.marked_range = otio.opentime.TimeRange(
start_time=start_time,
duration=marker.marked_range.duration
)
if target_item is None or not hasattr(target_item, 'markers'):
continue
marked_start_local = target_track.transformed_time(
start_time, target_item
)
marker.marked_range = otio.opentime.TimeRange(
start_time=marked_start_local,
duration=marker.marked_range.duration
)
target_item.markers.append(marker)
[docs]
def iter_flatten(thing, trim=True):
if isinstance(thing, otio.schema.Stack):
flatten_tracks = []
for item in thing:
if not isinstance(item, otio.schema.Track):
t = otio.schema.Track()
t.append(item.deepcopy())
item = t
flatten_tracks.append(item)
markers = extract_track_markers(flatten_tracks)
thing = otio.algorithms.flatten_stack(flatten_tracks)
attach_markers(thing, markers)
if isinstance(thing, otio.schema.Track):
if trim and thing.source_range:
thing = otio.algorithms.track_trimmed_to_range(thing, thing.source_range)
for item in thing:
yield from iter_flatten(item)
else:
yield thing.deepcopy()
[docs]
def flatten_item(item):
new_track = otio.schema.Track()
if isinstance(item, otio.core.Composition):
new_track.source_range = item.source_range
for new_item in iter_flatten(item, False):
new_track.append(new_item)
if not new_track.source_range:
new_track.source_range = otio.opentime.TimeRange(duration = item.source_range.duration)
available_duration = new_track.available_range().duration
# if for some reason the flatten does something wrong
# ensure the available_duration of the track is correct
if available_duration.to_frames() < new_track.source_range.duration.to_frames():
source_range = otio.opentime.TimeRange(duration = item.source_range.duration - available_duration)
gap = otio.schema.Gap(source_range=source_range)
new_track.append(gap)
return new_track
[docs]
def find_shot_name(item):
markers = find_markers(item, [])
# for marker in markers:
# print(f"*{marker.name}")
shot_name = None
for marker in markers:
if is_shot(marker.name):
shot_name = clean_name(marker.name)
break
if not shot_name:
return None
return shot_name
[docs]
def add_handles(item, handles):
if handles <= 0:
return
source_range = item.source_range
if not source_range:
source_range = item.available_range()
h = otio.opentime.RationalTime(handles, 24)
handle_gap = otio.schema.Gap()
handle_gap.source_range = otio.opentime.TimeRange(duration = h)
item.insert(0, handle_gap.deepcopy())
item.append(handle_gap.deepcopy())
item.source_range = otio.opentime.TimeRange(start_time = source_range.start_time + h, duration = source_range.duration)
[docs]
def get_item_timings(shot_track):
parent = shot_track.parent()
source_media = []
for item in shot_track:
if not isinstance(item, otio.schema.Clip):
continue
if not item.media_reference:
continue
if isinstance(item.media_reference, otio.schema.MissingReference):
print(item.media_reference)
continue
# range_in_parent = item.range_in_parent()
# print(range_in_parent.start_time)
target_url = item.media_reference.target_url
abs_start_time = item.transformed_time(
item.source_range.start_time, parent
)
start_time = item.transformed_time(
item.source_range.start_time, shot_track
)
source_media.append([target_url, start_time.value, abs_start_time.value])
# print(os.path.basename(target_url), start_time.value, abs_start_time.value)
return source_media
[docs]
def simplify_timeline(aaf_path):
aaf_media_linker.register_media_linker()
timeline = otio.adapters.read_from_file(aaf_path, media_linker_name="aaf_media_linker")
new_timeline = otio.schema.Timeline()
new_timeline.global_start_time = timeline.global_start_time
sequence_name = extract_shots.parse_sequence_name(timeline.name)
if not sequence_name:
raise ValueError(f"Unable to parse sequence/scene name for timeline: {timeline.name}")
new_timeline.name = sequence_name
shot_dict = {}
for track in timeline.video_tracks():
new_track = otio.schema.Track(kind=track.kind)
new_timeline.tracks.append(new_track)
for item in track:
shot_name = find_shot_name(item)
if shot_name:
remove_transitions(item)
new_item = flatten_item(item)
new_item.name = shot_name
new_item.kind = track.kind
add_handles(new_item, extract_shots.HANDLES)
new_track.append(new_item)
assert isinstance(new_item, otio.schema.Track)
assert new_item.available_range().duration.to_frames() >= new_item.source_range.duration.to_frames()
shot_dict[shot_name] = new_item
elif isinstance(item, otio.schema.Transition):
new_track.append(item.deepcopy())
else:
prev = None
# new_track.append(item.deepcopy())
if len(new_track) > 0:
prev = new_track[-1]
if isinstance(prev, otio.schema.Gap):
prev.source_range = prev.source_range.duration_extended_by(item.source_range.duration)
else:
gap = otio.schema.Gap()
gap.name = item.name
gap.source_range = item.source_range
new_track.append(gap)
return new_timeline, shot_dict
if __name__ == "__main__":
aaf_path = sys.argv[1]
timeline, shot_dict = simplify_timeline(sys.argv[1])
otio.adapters.write_to_file(timeline, aaf_path + ".otio")