# Source code for cgl.plugins.otio.tools.shotgrid_cut_upload

import os
import argparse
import pathlib
from pprint import pprint

from cgl.core.path.object import PathObject
from cgl.plugins.otio.tools import extract_shots

# from cgl.plugins.shotgrid.main import ProjectManagementData
# from cgl.plugins.shotgrid.tracking_internal.shotgun_specific import ShotgunQuery as sg


def shotgrid_cut_upload(edit_path: str) -> None:
    """Upload an edit timeline to ShotGrid as a Cut with one CutItem per shot.

    The edit file is turned into a per-shot dictionary by
    ``extract_shots.extract_shot_edit_dict``. The ShotGrid Shots named in it
    are looked up in a single query, any existing Cut with the same code in
    the same project is deleted, and a fresh Cut is created with the edit
    file attached and one CutItem for every shot that exists in ShotGrid.
    The result of the CutItem batch request is pretty-printed to stdout.

    Args:
        edit_path: Path of the edit file. It is parsed with
            ``PathObject.from_path_string`` to obtain the project, sequence,
            shot and version used to name the Cut.

    Raises:
        AssertionError: If the Shot query returns nothing, so the ShotGrid
            project cannot be resolved.
    """
    # NOTE(review): the module-level imports of these two names are commented
    # out in this file, which leaves them unbound when the function runs.
    # They are imported here, at call time, using the paths from those
    # commented-out lines -- confirm the paths are still current.
    from cgl.plugins.shotgrid.main import ProjectManagementData
    from cgl.plugins.shotgrid.tracking_internal.shotgun_specific import (
        ShotgunQuery as sg,
    )

    po = PathObject.from_path_string(edit_path)
    edit_dict = extract_shots.extract_shot_edit_dict(edit_path)

    sequence_shot_name = f"{po.sequence}_{po.shot}"
    cut_name = f"{sequence_shot_name}_{po.version}"
    basename = os.path.basename(edit_path)

    # The Version found for the preview path is linked to the Cut further down
    # (only when one is found).
    preview_po = PathObject.from_path_string(po.get_preview_path())
    pm = ProjectManagementData(preview_po).get_project_management_data()
    version = pm.find_version()

    # Batch-find every ShotGrid Shot used in the cut, plus the Shot the edit
    # itself belongs to, with a single "any of these codes" query.
    any_filter = [["code", "is", sequence_shot_name]]
    any_filter.extend(["code", "is", key] for key in edit_dict)
    filters = [
        ["project.Project.name", "is", po.project],
        {"filter_operator": "any", "filters": any_filter},
    ]
    fields = ["code", "project"]
    shots = sg.find("Shot", filters, fields)

    # Map shot code -> ShotGrid id; the project link is taken from the first
    # Shot returned.
    shots_id_name = {}
    project_data = None
    for shot in shots:
        shots_id_name[shot["code"]] = shot["id"]
        if not project_data:
            project_data = {"type": "Project", "id": shot["project"]["id"]}

    # Raised explicitly rather than with an `assert` statement so the check
    # still runs under `python -O`; the exception type is kept as
    # AssertionError for existing callers.
    if not project_data:
        raise AssertionError(
            f"no ShotGrid Shot found in project {po.project!r} for cut "
            f"{cut_name!r}; cannot resolve the ShotGrid project"
        )

    # Remove the Cut if it already exists so that it is replaced, not
    # duplicated.
    existing = (
        sg.find("Cut", [["code", "is", cut_name], ["project", "is", project_data]])
        or []
    )
    for old_cut in existing:
        sg.delete("Cut", old_cut["id"])

    # Create the Cut, linking the edit's own Shot and the Version when known.
    cut_data = {"project": project_data, "code": cut_name}
    if sequence_shot_name in shots_id_name:
        cut_data["entity"] = {"type": "Shot", "id": shots_id_name[sequence_shot_name]}
    if version:
        cut_data["version"] = {"type": "Version", "id": version["id"]}
    cut = sg.create("Cut", cut_data)

    # Upload the timeline file itself to the Cut as an attachment.
    sg.upload(
        "Cut", cut["id"], edit_path, field_name="attachments", display_name=basename
    )

    # Build one CutItem create request per shot that exists in ShotGrid.
    # Each edit_dict value is read through its "shotgrid" mapping, using the
    # sg_* keys referenced below.
    batch_data = []
    for key, meta in edit_dict.items():
        sg_meta = meta["shotgrid"]
        shot_id = shots_id_name.get(key)
        if not shot_id:
            # The shot is in the edit but was not returned by the Shot query.
            continue

        sg_cut_order = sg_meta["sg_cut_order"]
        item_data = {
            "project": project_data,
            "cut": {"type": "Cut", "id": cut["id"]},
            "shot": {"type": "Shot", "id": shot_id},
            "code": f"{key}_{sg_cut_order:08d}",
            "cut_item_duration": sg_meta["sg_cut_duration"],
            "cut_item_in": sg_meta["sg_head_in"],
            "cut_item_out": sg_meta["sg_tail_out"],
            "cut_order": sg_cut_order,
            "edit_in": sg_meta["sg_cut_in"],
            "edit_out": sg_meta["sg_cut_out"],
        }
        batch_data.append(
            {"request_type": "create", "entity_type": "CutItem", "data": item_data}
        )

    result = sg.batch(batch_data)
    pprint(result)
def run_cli() -> None:
    """Command-line entry point: upload the given edit file to ShotGrid.

    Parses a single positional ``edit_path`` argument from the command line
    and passes it, converted to a string, to ``shotgrid_cut_upload``.
    """
    parser = argparse.ArgumentParser(
        # `prog` is left at its default (the script name) so the usage line
        # shows a real program name; explanatory text belongs in
        # `description`.
        description="Upload an edit timeline to ShotGrid as a Cut with its CutItems.",
    )
    parser.add_argument(
        "edit_path",
        type=pathlib.Path,
        help="path of the edit file to upload",
    )
    args = parser.parse_args()

    shotgrid_cut_upload(str(args.edit_path))
if __name__ == "__main__":
    # Run the command-line interface only when this file is executed as a
    # script, not when it is imported.
    run_cli()