import logging from threading import Thread from enum import Enum import meshroom from meshroom.common import BaseObject, DictModel, Property, Signal, Slot from meshroom.core.node import Status class State(Enum): """ State of the Thread that is computing nodes """ IDLE = 0 RUNNING = 1 STOPPED = 2 DEAD = 3 ERROR = 4 class TaskThread(Thread): """ A thread with a pile of nodes to compute """ def __init__(self, manager): Thread.__init__(self, target=self.run) self._state = State.IDLE self._manager = manager self.forceCompute = False def isRunning(self): return self._state == State.RUNNING def run(self): """ Consume compute tasks. """ self._state = State.RUNNING stopAndRestart = False for nId, node in enumerate(self._manager._nodesToProcess): # skip already finished/running nodes if node.isFinishedOrRunning(): continue # if a node does not exist anymore, node.chunks becomes a PySide property try: multiChunks = len(node.chunks) > 1 except TypeError: continue for cId, chunk in enumerate(node.chunks): if chunk.isFinishedOrRunning() or not self.isRunning(): continue if multiChunks: logging.info('[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format( node=nId+1, nbNodes=len(self._manager._nodesToProcess), chunk=cId+1, nbChunks=len(node.chunks), nodeName=node.nodeType)) else: logging.info('[{node}/{nbNodes}] {nodeName}'.format( node=nId+1, nbNodes=len(self._manager._nodesToProcess), nodeName=node.nodeType)) try: chunk.process(self.forceCompute) except Exception as e: if chunk.isStopped(): stopAndRestart = True break else: logging.error("Error on node computation: {}".format(e)) nodesToRemove, _ = self._manager._graph.nodesFromNode(node) # remove following nodes from the task queue for n in nodesToRemove[1:]: # exclude current node try: self._manager._nodesToProcess.remove(n) except ValueError: # Node already removed (for instance a global clear of _nodesToProcess) pass n.clearSubmittedChunks() if stopAndRestart: break if stopAndRestart: self._state = State.STOPPED self._manager.restartRequested.emit() else: self._manager._nodesToProcess = [] self._state = State.DEAD class TaskManager(BaseObject): """ Manage graph - local and external - computation tasks. """ def __init__(self, parent=None): super(TaskManager, self).__init__(parent) self._graph = None self._nodes = DictModel(keyAttrName='_name', parent=self) self._nodesToProcess = [] self._nodesExtern = [] # internal thread in which local tasks are executed self._thread = TaskThread(self) self._blockRestart = False self.restartRequested.connect(self.restart) def requestBlockRestart(self): """ Block computing. Note: should only be used to completely stop computing. """ self._blockRestart = True def blockRestart(self): """ Avoid the automatic restart of computing. """ for node in self._nodesToProcess: if node.getGlobalStatus() in (Status.SUBMITTED, Status.ERROR): node.upgradeStatusTo(Status.NONE) self.removeNode(node, displayList=True) self._blockRestart = False self._nodesToProcess = [] self._thread._state = State.DEAD @Slot() def restart(self): """ Restart computing when thread has been stopped. Note: this is done like this to avoid app freezing. """ # Make sure to wait the end of the current thread self._thread.join() # Avoid restart if thread was globally stopped if self._blockRestart: self.blockRestart() return if self._thread._state != State.STOPPED: return for node in self._nodesToProcess: if node.getGlobalStatus() == Status.STOPPED: # Remove node from the computing list self.removeNode(node, displayList=False, processList=True) # Remove output nodes from display and computing lists outputNodes = node.getOutputNodes(recursive=True) for n in outputNodes: if n.getGlobalStatus() in (Status.ERROR, Status.SUBMITTED): n.upgradeStatusTo(Status.NONE) self.removeNode(n, displayList=True, processList=True) # Start a new thread with the remaining nodes to compute self._thread = TaskThread(self) self._thread.start() def compute(self, graph=None, toNodes=None, forceCompute=False, forceStatus=False): """ Start graph computation, from root nodes to leaves - or nodes in 'toNodes' if specified. Computation tasks (NodeChunk) happen in a separate thread (see TaskThread). :param graph: the graph to consider. :param toNodes: specific leaves, all graph leaves if None. :param forceCompute: force the computation despite nodes status. :param forceStatus: force the computation even if some nodes are submitted externally. """ self._graph = graph if self._thread._state != State.RUNNING: self._nodes.clear() externEmpty = any(node.isAlreadySubmitted() for node in self._nodesExtern) if not externEmpty: self._nodes.update(self._nodesExtern) else: self._nodesExtern = [] if forceCompute: nodes, edges = graph.dfsOnFinish(startNodes=toNodes) else: nodes, edges = graph.dfsToProcess(startNodes=toNodes) nodes = [node for node in nodes if not self.contains(node)] # be sure to avoid non-real conflicts chunksInConflict = self.getAlreadySubmittedChunks(nodes) if chunksInConflict: chunksStatus = set([chunk.status.status.name for chunk in chunksInConflict]) chunksName = [node.name for node in chunksInConflict] # Syntax and terms are used on QML side to recognize the error msg = '[COMPUTATION] Already Submitted:\n' \ 'WARNING - Some nodes are already submitted with status: {}\nNodes: {}'.format( ', '.join(chunksStatus), ', '.join(chunksName) ) if forceStatus: logging.warning(msg) else: raise RuntimeError(msg) for node in nodes: node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name)) node.beginSequence(forceCompute) self._nodes.update(nodes) self._nodesToProcess.extend(nodes) if self._thread._state == State.IDLE: self._thread.start() elif self._thread._state in (State.DEAD, State.ERROR): self._thread = TaskThread(self) self._thread.start() def onNodeDestroyed(self, obj, name): """ Remove node from the taskmanager when it's destroyed in the graph :param obj: :param name: :return: """ if name in self._nodes.keys(): self._nodes.pop(name) def contains(self, node): return node in self._nodes.values() def containsNodeName(self, name): """ Check if a node with the argument name belongs to the display list. """ if name in self._nodes.keys(): return True return False def removeNode(self, node, displayList=True, processList=False, externList=False): """ Remove node from the Task Manager. Args: node (Node): node to remove. displayList (bool): remove from the display list. processList (bool): remove from the nodesToProcess list. externList (bool): remove from the nodesExtern list. """ if displayList and self._nodes.contains(node): self._nodes.pop(node.name) if processList and node in self._nodesToProcess: self._nodesToProcess.remove(node) if externList and node in self._nodesExtern: self._nodesExtern.remove(node) def clear(self): """ Remove all the nodes from the taskmanager :return: """ self._nodes.clear() self._nodesExtern = [] self._nodesToProcess = [] def update(self, graph): """ Add all the nodes that are being rendered in a renderfarm to the taskmanager when new graph is loaded :param graph: :return: """ for node in graph._nodes: if node.isAlreadySubmitted() and node._chunks.size() > 0 and node.isExtern(): self._nodes.add(node) self._nodesExtern.append(node) def submit(self, graph=None, submitter=None, toNodes=None): """ Nodes are send to the renderfarm :param graph: :param submitter: :param toNodes: :return: """ # ensure submitter is properly set sub = meshroom.core.submitters.get(submitter, None) if sub is None: raise RuntimeError("Unknown Submitter : " + submitter) if self._thread._state != State.RUNNING: self._nodes.clear() externEmpty = True for node in self._nodesExtern: if node.isAlreadySubmitted(): externEmpty = False break if not externEmpty: self._nodes.update(self._nodesExtern) else: self._nodesExtern = [] nodesToProcess, edgesToProcess = graph.dfsToProcess(startNodes=toNodes) flowEdges = graph.flowEdges(startNodes=toNodes) edgesToProcess = set(edgesToProcess).intersection(flowEdges) try: res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath) if res: for node in nodesToProcess: node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name)) node.submit() # update node status self._nodes.update(nodesToProcess) self._nodesExtern.extend(nodesToProcess) except Exception as e: logging.error("Error on submit : {}".format(e)) def getAlreadySubmittedChunks(self, nodes): """ Check if nodes have already been submitted in another Meshroom instance. :param nodes: :return: """ out = [] for node in nodes: for chunk in node.chunks: # Already submitted/running chunks in another task manager if chunk.isAlreadySubmitted() and not self.containsNodeName(chunk.statusNodeName): out.append(chunk) return out nodes = Property(BaseObject, lambda self: self._nodes, constant=True) restartRequested = Signal()