import os
import argparse
import pathlib
from pprint import pprint
from cgl.core.path.object import PathObject
from cgl.plugins.otio.tools import extract_shots
# from cgl.plugins.shotgrid.main import ProjectManagementData
# from cgl.plugins.shotgrid.tracking_internal.shotgun_specific import ShotgunQuery as sg
# ("[docs]" Sphinx source-link marker from the documentation scrape; not part of the program.)
def shotgrid_cut_upload(edit_path):
    """Publish an edit file to ShotGrid as a ``Cut`` plus one ``CutItem`` per shot.

    Steps, in order:

    1. Parse ``edit_path`` into a ``PathObject`` and extract the per-shot edit
       dictionary with ``extract_shots.extract_shot_edit_dict``.
    2. Look up the ShotGrid ``Version`` for the edit's preview path.
    3. Batch-find the ShotGrid ``Shot`` entities referenced by the edit (plus
       the ``<sequence>_<shot>`` shot the cut itself belongs to).
    4. Delete any existing ``Cut`` with the same code in that project, create
       a new one, and upload the edit file to its ``attachments`` field.
    5. Batch-create a ``CutItem`` for every edit entry whose shot was found,
       then pretty-print the batch result.

    Args:
        edit_path: Path string of the edit file to upload.

    Raises:
        ValueError: If none of the referenced shots exist in ShotGrid, so the
            project entity cannot be resolved.
    """
    # Deferred imports: the module-level imports of these two names are
    # commented out, which left them undefined when this function ran.
    # NOTE(review): presumably they were disabled so the module can be imported
    # without a ShotGrid setup (e.g. by the docs build) - confirm.
    from cgl.plugins.shotgrid.main import ProjectManagementData
    from cgl.plugins.shotgrid.tracking_internal.shotgun_specific import (
        ShotgunQuery as sg,
    )

    po = PathObject.from_path_string(edit_path)
    edit_dict = extract_shots.extract_shot_edit_dict(edit_path)

    sequence_shot_name = f"{po.sequence}_{po.shot}"
    cut_name = f"{sequence_shot_name}_{po.version}"
    basename = os.path.basename(edit_path)

    preview_po = PathObject.from_path_string(po.get_preview_path())
    pm = ProjectManagementData(preview_po).get_project_management_data()
    version = pm.find_version()

    # Batch-find all the ShotGrid shot ids used in the cut, including the shot
    # the cut itself is attached to.
    any_filter = [["code", "is", sequence_shot_name]]
    any_filter.extend(["code", "is", shot_code] for shot_code in edit_dict)
    filters = [
        ["project.Project.name", "is", po.project],
        {"filter_operator": "any", "filters": any_filter},
    ]
    shots = sg.find("Shot", filters, ["code", "project"])

    # Map shot code -> shot id; the project entity is taken from the first
    # shot returned, since every match is filtered to the same project name.
    shots_id_name = {}
    project_data = None
    for shot in shots:
        shots_id_name[shot["code"]] = shot["id"]
        if not project_data:
            project_data = {"type": "Project", "id": shot["project"]["id"]}

    # Explicit check instead of ``assert``: asserts are stripped under ``-O``,
    # which would let a ``None`` project flow into the queries below.
    if not project_data:
        raise ValueError(
            f"No ShotGrid shots found in project {po.project!r} for cut "
            f"{cut_name!r}; cannot resolve the project entity."
        )

    # Remove the cut if it already exists so the upload replaces it.
    existing = (
        sg.find("Cut", [["code", "is", cut_name], ["project", "is", project_data]])
        or []
    )
    for old_cut in existing:
        sg.delete("Cut", old_cut["id"])

    # Create the cut, linking it to its shot and version when they are known.
    cut_data = {"project": project_data, "code": cut_name}
    if sequence_shot_name in shots_id_name:
        cut_data["entity"] = {"type": "Shot", "id": shots_id_name[sequence_shot_name]}
    if version:
        cut_data["version"] = {"type": "Version", "id": version["id"]}
    cut = sg.create("Cut", cut_data)

    # Upload the timeline file itself to ShotGrid too.
    sg.upload(
        "Cut", cut["id"], edit_path, field_name="attachments", display_name=basename
    )

    # Create the cut items in a single batch request.
    batch_data = _cut_item_requests(edit_dict, shots_id_name, project_data, cut["id"])
    result = sg.batch(batch_data)
    pprint(result)


def _cut_item_requests(edit_dict, shots_id_name, project_data, cut_id):
    """Build the batch "create CutItem" requests for every edit entry with a known shot.

    Args:
        edit_dict: Mapping of shot code to metadata; each value's ``"shotgrid"``
            entry supplies the ``sg_*`` cut fields read below.
        shots_id_name: Mapping of shot code to ShotGrid shot id.
        project_data: Project entity dict attached to each item.
        cut_id: Id of the parent ``Cut``.

    Returns:
        A list of batch request dicts, in ``edit_dict`` iteration order.
    """
    requests = []
    for shot_code, meta in edit_dict.items():
        shot_id = shots_id_name.get(shot_code)
        if not shot_id:
            # Shot is not in ShotGrid; no CutItem can be linked to it.
            continue
        sg_fields = meta["shotgrid"]
        cut_order = sg_fields["sg_cut_order"]
        item_data = {
            "project": project_data,
            "cut": {"type": "Cut", "id": cut_id},
            "shot": {"type": "Shot", "id": shot_id},
            # Zero-padded cut order keeps item codes unique per shot occurrence.
            "code": f"{shot_code}_{cut_order:08d}",
            "cut_item_duration": sg_fields["sg_cut_duration"],
            "cut_item_in": sg_fields["sg_head_in"],
            "cut_item_out": sg_fields["sg_tail_out"],
            "cut_order": cut_order,
            "edit_in": sg_fields["sg_cut_in"],
            "edit_out": sg_fields["sg_cut_out"],
        }
        requests.append(
            {"request_type": "create", "entity_type": "CutItem", "data": item_data}
        )
    return requests
# ("[docs]" Sphinx source-link marker from the documentation scrape; not part of the program.)
def run_cli(argv=None):
    """Parse the command line and upload the given edit file to ShotGrid.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``.
    """
    # The text belongs in ``description``: ``prog`` is the program *name* shown
    # in the usage line, and the previous sentence described a different tool.
    parser = argparse.ArgumentParser(
        description="Upload an edit timeline to ShotGrid as a Cut with its CutItems.",
    )
    parser.add_argument(
        "edit_path",
        type=pathlib.Path,
        help="path to the edit file to upload",
    )
    args = parser.parse_args(argv)
    shotgrid_cut_upload(str(args.edit_path))
if __name__ == "__main__":
    # Run the CLI only when this file is executed as a script, not on import.
    run_cli()