Autodesk ShotGrid / ShotGun / Flow Python API

https://developers.shotgridsoftware.com/python-api/

# Example:

sg = shotgun_api3.Shotgun("https://my-site.shotgrid.autodesk.com",
                          login="rhendriks",
                          password="c0mPre$Hi0n")

sg.find("Shot", filters=[["sg_status_list", "is", "ip"]], fields=["code", "sg_status_list"])


# Output:

[{'code': 'bunny_020_0170',
  'id': 896,
  'sg_sequence': {'id': 5, 'name': 'bunny_020', 'type': 'Sequence'},
  'sg_status_list': 'ip',
  'type': 'Shot'},
 {'code': 'bunny_020_0200',
  'id': 899,
  'sg_sequence': {'id': 5, 'name': 'bunny_020', 'type': 'Sequence'},
  'sg_status_list': 'ip',
  'type': 'Shot'},
 {'code': 'bunny_030_0080',
  'id': 907,
  'sg_sequence': {'id': 6, 'name': 'bunny_030', 'type': 'Sequence'},
  'sg_status_list': 'ip',
  'type': 'Shot'}]
# Find all fields for a given schema (ie: Project, 

# Get the schema for Project entity
project_schema = sg.schema_field_read('Project')

# Print all available fields
print("Available Project fields:")
for field_name, field_info in sorted(project_schema.items()):
    print(f"  {field_name}: {field_info['name']} ({field_info['data_type']['value']})")

# or

for field_name, field_info in sorted(project_schema.items()):
    print(f"\nField: {field_name}")
    print(f"  Display Name: {field_info['name']}")
    print(f"  Type: {field_info['data_type']['value']}")
    if 'valid_values' in field_info['properties']:
        print(f"  Valid Values: {field_info['properties']['valid_values']['value']}")

Example of importing an EDL into Flow

Below is an overview of how you can programmatically replicate what the RV Import Cut Tool does—i.e. load a CMX 3600 EDL into ShotGrid—using the ShotGrid (formerly Shotgun) APIs. First, the official tool is built into RV and Flow Production Tracking, but its core functionality (parsing an EDL, creating Cut and CutItem records, uploading the EDL file, etc.) is exposed via the ShotGrid API layers:

EDL example:

TITLE: Pool Shark
FCM: NON-DROP FRAME

001  cutX_1  V     C        10:59:23:01 10:59:28:16 01:00:00:00 01:00:05:15
* FROM CLIP NAME: clip01.mov

002  cutX_1  V     C        11:39:48:15 11:39:51:13 01:00:05:15 01:00:08:13
* FROM CLIP NAME: clip02.mov

003  cutX_1  V     C        13:16:30:21 13:16:34:19 01:00:08:13 01:00:12:11
* FROM CLIP NAME: clip03.mov

004  cutX_1  V     C        14:09:43:16 14:09:44:20 01:00:12:11 01:00:13:15
* FROM CLIP NAME: clip04.mov

import getpass
import shotgun_api3
from shotgun_api3 import Shotgun
import pprint
import edl
#from edl import Parser
import pycmx
from pycmx.parse_cmx_events import parse_cmx3600
#import opentimelineio as otio
import itertools

'''
- Import CMX 3600-format EDL
pip install shotgun_api3
pip install timecode
pip install edl # requires timecode
pip install pycmx #  currently maintained

https://help.autodesk.com/view/SGSUB/ENU/?guid=SG_Editorial_ed_edl_events_html

'''
debug = False
FPS = 24

def timecode_to_frames(tc, fps):
    h, m, s, f = map(int, tc.split(":"))
    return ((h * 3600 + m * 60 + s) * fps) + f

# Application key
sg = Shotgun(
    "https://YOUR.shotgrid.autodesk.com",
    script_name="YOUR_SCRIPT_NAME",
    api_key="YOUR_API_KEY"
)  

'''
# manual login
codeVersion = getpass.getpass("Please enter your shotgun password \nIt will be used only for this cycle and not be stored : ")
sg = shotgun_api3.Shotgun(
    "https://evpg.shotgrid.autodesk.com", 
    login="daniele.tosti@scanlinevfx.com", 
    password=codeVersion, 
    convert_datetimes_to_utc=False 
)
'''

#pprint.pprint(dir(sg))
#print()

project_schema = sg.schema_field_read('Project')    
all_field_names = list(project_schema.keys())

if debug:
    # Get the schemas
    pprint.pprint(sg.schema_field_read("CutItem"))
    pprint.pprint(sg.schema_field_read("Shot"))

#######################
filters = [['name','is','PIPELINE-TEST'],]
fields = ['id','project','code','content']
#project = sg.find("Project", filters=filters,  fields=all_field_names)
project = sg.find_one('Project', filters=filters, fields=all_field_names)

#pprint.pprint(project)
print()

PROJECT_ID = project['id']

#####################
edl_path = "YOUR.edl"


# this accepts only a perfectly formatted edl
# Parse the EDL
with open(edl_path) as f:
    edl = parse_cmx3600(f) #parser.parse(f)

# The actual event is class pycmx.parse_cmx_statements.Event
events = list(edl.events)



# inspect events
first_evt = events[0]

print("Event #1 attributes:", vars(first_evt))
# Event #1 attributes: 
'''
{
    'statements': [
        FCM(drop=False, line_number=1),
        Unrecognized(content='', line_number=2),
        Event(   # class pycmx.parse_cmx_statements.Event
            event='001',
            source='TOSTI_1',
            channels='V',
            trans='C',
            trans_op='',
            source_in='10:59:23:01',
            source_out='10:59:28:16',
            record_in='01:00:00:00',
            record_out='01:00:05:15',
            format=8,
            line_number=3
        ),
        ClipName(
            name='clip01.mov',
            affect='from',
            line_number=4
        ),
        Unrecognized(content='', line_number=5)
    ]
}
'''

for event in edl.events:
    #print(dir(event))  # This will show all attributes
    #print(vars(event))  # This 
    stmt = event.statements
    #print(stmt)
    print(stmt[1].source)
    print(type(stmt[1]))
    print(dir(stmt[1]))
    break




# debug
print("Title:", edl.title)
print("Events number:", len(list(edl.events)))
print()

with open(edl_path) as f:
    for line in f:
        parts = line.strip().split()
        if len(parts) == 8 and parts[3] in ("C", "D"):
            num, reel, tracks, edit_type, src_in, src_out, rec_in, rec_out = parts
            print(
                f"Event {num}: "
                f"Reel {reel} "
                f"Track {tracks} Edit_type {edit_type} "
                f"{src_in}-{src_out} on record {rec_in}-{rec_out}"
            )

  


# cut
cut_code = "Tosti EDL Import – “%s”" % edl_path.split("/")[-1]
existing_cuts = sg.find(
    "Cut",
    [
        ["code",    "is",    cut_code],
        ["project", "is",    {"type":"Project","id":PROJECT_ID}]
    ],
    ["id"]
)
if existing_cuts:
    cut_ref = {"type": "Cut", "id": existing_cuts[0]["id"]}
else:
    new_cut = sg.create(
        "Cut",
        {
            "project": {"type":"Project","id":PROJECT_ID},
            "code":    cut_code
        }
    )
    cut_ref = {"type": "Cut", "id": new_cut["id"]}

#cut = sg.create("Cut", {
#    "project": {"type": "Project", "id": PROJECT_ID},
#    "code": "Tosti EDL Import – “%s”" % edl_path.split("/")[-1]
#})

if cut_ref:
    cut_id = cut_ref["id"]
    print("Cut:", cut_ref['id'], cut_ref) 
else:
    raise ValueError("Missing cut ref.")
# Cut: 103 {'id': 103, 'project': {'id': 551, 'name': 'PIPELINE-TEST', 'type': 'Project'}, 'code': 'Tosti EDL Import – “M:\\Tosti\\SOFTWARE\\RnD\\shotgun\\EDL Import\\example.edl”', 'type': 'Cut'}
# Created under 
# https://evpg.shotgrid.autodesk.com/page/13428
# https://evpg.shotgrid.autodesk.com/page/13428#Cut_103
# or 
# https://evpg.shotgrid.autodesk.com/detail/Cut/103


#cut_id = 103
if 1 == 1:
    for evt in events:

        reel_name = None
        event_index = None
        source_in = None
        source_out = None
        record_in = None
        record_out = None

        stmt = evt.statements
        #print("statement ", stmt)
        for entry in stmt:
            #print(type(entry))
            if type(entry).__name__ == "Event": # cant use if isinstance(entry, pycmx.parse_cmx_statements.Event):
                #print(dir(entry))
                #print(entry.source)
                reel_name = entry.source
                event_index = entry.event
                source_in = entry.source_in
                source_out = entry.source_out
                record_in = entry.record_in
                record_out = entry.record_out            

        if debug:
            print(reel_name)   # e.g. "TOSTI_1"
            print(event_index) # e.g. 001 
            print(source_in)
            print(source_out)
            print(record_in)
            print(record_out)

        if not all(x is not None for x in [reel_name, event_index, source_in, source_out, record_in, record_out]):
            raise ValueError("One or more required variables are None")

        seq_code = f"{reel_name}"
        shot_code = f"{reel_name}__{event_index}" # this is Sequence evt.reel shot evt.number

        if debug:
            print(reel_name)   # e.g. "TOSTI_1"
            print(event_index) # e.g. 001 
            print(seq_code)        
            print(shot_code)
            print(source_in)
            print(source_out)
            print(record_in)
            print(record_out)

        ######## create the sequence if it does not exist
        sequences = sg.find("Sequence",[
                            ["code", "is", seq_code], 
                            ["project", "is", {"type":"Project","id":PROJECT_ID}]
                        ],
                        ["id"]
                        )

        if sequences:
            seq_ref = {"type": "Sequence", "id": sequences[0]["id"]}
        else:        
            seq = sg.create("Sequence", {
                "project": {"type":"Project", "id":PROJECT_ID},
                "code":    seq_code, 
            })
            seq_ref = {"type":"Sequence","id": seq["id"]}        


        ######## Same for the shot
        shots = sg.find("Shot",[
                            ["code", "is", shot_code], 
                            ["project", "is", {"type":"Project","id":PROJECT_ID}], 
                            ["sg_sequence","is", seq_ref] 
                        ],
                        ["id"]
                        )
        
        if shots:
            shot_ref = {"type": "Shot", "id": shots[0]["id"]}
        else:
            # Optionally create the Shot if it doesn't exist:
            shot_ref = sg.create("Shot", {
                "project": {"type":"Project","id":PROJECT_ID},
                "code": shot_code, 
                "sg_sequence": seq_ref
            })
            shot_ref = {"type": shot_ref["type"], "id": shot_ref["id"]}

        ######## Build the CutItem payload
        ci_data = {
            "project": {"type": "Project", "id": PROJECT_ID},
            "cut": {"type": "Cut", "id": cut_id},
            "shot": shot_ref,
            "cut_item_in": timecode_to_frames(source_in, FPS), #source_in,      # e.g. "00:01:23:12"
            "cut_item_out": timecode_to_frames(source_out, FPS), #source_out,    # e.g. "00:01:28:05"
            "edit_in": timecode_to_frames(record_in, FPS), #source_in,      # e.g. "00:01:23:12"
            "edit_out": timecode_to_frames(record_out, FPS), #source_out,    # e.g. "00:01:28:05"
            "timecode_cut_item_in_text": record_in,
            "timecode_cut_item_out_text": record_out,
            # you can also set sg_cut_track, sg_cut_item_type, etc.
        }
        sg.create("CutItem", ci_data)


        cut_in_frame = timecode_to_frames(source_in, FPS)
        cut_out_frame = timecode_to_frames(source_out, FPS)
        cut_duration = (cut_out_frame - cut_in_frame) + 1  # inclusive

        sg.update("Shot", shot_ref["id"], {
            "sg_cut_in": cut_in_frame,
            "sg_cut_out": cut_out_frame,
            "sg_cut_duration": cut_duration
        })


def cleanup_cut(sg, cut_id):
    # 1. Fetch all CutItems associated with the Cut
    cut_items = sg.find(
        "CutItem",
        [["cut", "is", {"type": "Cut", "id": cut_id}]],
        ["id"]
    )

    # 2. Delete each CutItem
    for item in cut_items:
        sg.delete("CutItem", item["id"])

    # 3. Delete the Cut itself
    sg.delete("Cut", cut_id)



#cleanup_cut(sg, cut_id)


def cleanup_cut_batch(sg, cut_id):
    # Gather delete ops
    ops = []
    items = sg.find("CutItem", [["cut", "is", {"type": "Cut", "id": cut_id}]], ["id"])
    for it in items:
        ops.append({
            "request_type": "delete",
            "entity_type": "CutItem",
            "entity_id": it["id"]
        })
    ops.append({
        "request_type": "delete",
        "entity_type": "Cut",
        "entity_id": cut_id
    })

    # Execute in one batch
    sg.batch(ops)