Source code for cgl.plugins.shotgrid.tracking_internal.shotgun_specific

import logging

# try:
#     from Queue import Empty
# except ModuleNotFoundError:
#     logging.info('Python3 - Skipping Queue import')
import shotgun_api3 as sg_api

from cgl.core.config.query import AlchemyConfigManager

CFG = AlchemyConfigManager()


# TODO: This is a temporary fix until i can sort out the maya multi-threading issues
# TODO: This disables multi-threading for the Shotgun Queries


[docs] class ShotgunQueueEnd(object): pass
[docs] class ShotgunQuery(object): _connection = None def __init__(self, type_, *args, **kwargs): self.result = None self.condition = None self.type = type_ self.company = None self.project = "default" self.args = args self.kwargs = kwargs def __str__(self): return """== Shotgun Query == %s %s %s ==================""" % ( self.type, self.args, self.kwargs, )
[docs] @classmethod def get_connection(cls): if cls._connection is None: config = CFG.shotgrid_config["api"] cls._connection = sg_api.Shotgun( base_url=str(config["server_url"]), script_name=str(config["api_script"]), api_key=str(config["api_key"]), ) return cls._connection
[docs] @classmethod def base_shotgun_query(cls, type_, *args, **kwargs): connection = cls.get_connection() return getattr(connection, type_)(*args, **kwargs)
[docs] @classmethod def find(cls, *args, **kwargs): return cls.base_shotgun_query("find", *args, **kwargs)
[docs] @classmethod def find_one(cls, *args, **kwargs): return cls.base_shotgun_query("find_one", *args, **kwargs)
[docs] @classmethod def schema_entity_read(cls, *args, **kwargs): return cls.base_shotgun_query("schema_entity_read", *args, **kwargs)
[docs] @classmethod def schema_field_read(cls, *args, **kwargs): return cls.base_shotgun_query("schema_field_read", *args, **kwargs)
[docs] @classmethod def create(cls, *args, **kwargs): return cls.base_shotgun_query("create", *args, **kwargs)
[docs] @classmethod def upload_thumbnail(cls, *args, **kwargs): return cls.base_shotgun_query("upload_thumbnail", *args, **kwargs)
[docs] @classmethod def upload(cls, *args, **kwargs): return cls.base_shotgun_query("upload", *args, **kwargs)
[docs] @classmethod def update(cls, *args, **kwargs): return cls.base_shotgun_query("update", *args, **kwargs)
[docs] @classmethod def delete(cls, *args, **kwargs): return cls.base_shotgun_query("delete", *args, **kwargs)
[docs] @classmethod def note_thread_read(cls, *args, **kwargs): return cls.base_shotgun_query("note_thread_read", *args, **kwargs)
[docs] @classmethod def batch(cls, *args, **kwargs): return cls.base_shotgun_query("batch", *args, **kwargs)
[docs] class ShotgunProcess(object): STATES = { 0: "init", 1: "not connected", 2: "connected", 3: "query", } def __init__(self, id_, queue): ShotgunProcess.__init__(self, id_=id_, queue=queue) self.id_ = id_ self.state = 0 self.queue = queue self.connection = None def __str__(self): return "Shotgun processor(%s:%s) " % (self.id_, id(self.queue))
[docs] def run(self): logging.debug("%s run" % self.__str__()) task = self.queue.get(True) if not self.connection: import cgl.plugins.project_management.shotgun.shotgun_api3 as sg_api config = CFG.shotgrid_config["api"] self.connection = sg_api.Shotgun( base_url=config["server_url"], script_name=config["api_script"], api_key=config["api_key"], ) result = getattr(self.connection, task.type)(*task.args, **task.kwargs) task.result["result"] = result