From b883bd193b4b79d1fd219bf20160f99d969d666c Mon Sep 17 00:00:00 2001 From: Yann Lanthony Date: Fri, 24 Jan 2020 18:38:20 +0100 Subject: [PATCH] [core] taskManager: code cleanup --- meshroom/core/node.py | 8 +-- meshroom/core/taskManager.py | 134 ++++++++++++++++------------------- meshroom/ui/graph.py | 2 +- 3 files changed, 65 insertions(+), 79 deletions(-) diff --git a/meshroom/core/node.py b/meshroom/core/node.py index 6fc1ce92..308b3645 100644 --- a/meshroom/core/node.py +++ b/meshroom/core/node.py @@ -226,7 +226,7 @@ class NodeChunk(BaseObject): print('WARNING: downgrade status on node "{}" from {} to {}'.format(self.name, self.status.status, newStatus)) - if(newStatus == Status.SUBMITTED): + if newStatus == Status.SUBMITTED: self.status = StatusData(self.node.name, self.node.nodeType, self.node.packageName, self.node.packageVersion) if execMode is not None: self.status.execMode = execMode @@ -556,10 +556,8 @@ class BaseNode(BaseObject): return True def isFinishedOrRunning(self): - for chunk in self._chunks: - if not chunk.isFinishedOrRunning(): - return False - return True + """ Return True if all chunks of this Node is either finished or running, False otherwise. """ + return all(chunk.isFinishedOrRunning() for chunk in self._chunks) def alreadySubmittedChunks(self): return [ch for ch in self._chunks if ch.isAlreadySubmitted()] diff --git a/meshroom/core/taskManager.py b/meshroom/core/taskManager.py index a5eee0d7..58c3afac 100644 --- a/meshroom/core/taskManager.py +++ b/meshroom/core/taskManager.py @@ -5,6 +5,7 @@ from enum import Enum import meshroom from meshroom.common import BaseObject, DictModel, Property + class State(Enum): """ State of the Thread that is computing nodes @@ -15,6 +16,7 @@ class State(Enum): DEAD = 3 ERROR = 4 + class TaskThread(Thread): """ A thread with a pile of nodes to compute @@ -29,85 +31,76 @@ class TaskThread(Thread): return self._state == State.RUNNING def run(self): + """ Consume compute tasks. """ self._state = State.RUNNING - for n, node in enumerate(self._manager._nodesToProcess): - if not node.isFinishedOrRunning(): - multiChunks = len(node.chunks) > 1 - for c, chunk in enumerate(node.chunks): - if multiChunks: - print('\n[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format( - node=n + 1, nbNodes=len(self._manager._nodesToProcess), - chunk=c + 1, nbChunks=len(node.chunks), nodeName=node.nodeType)) + for nId, node in enumerate(self._manager._nodesToProcess): + + # skip already finished/running nodes + if node.isFinishedOrRunning(): + continue + + multiChunks = len(node.chunks) > 1 + for cId, chunk in enumerate(node.chunks): + if chunk.isFinishedOrRunning() or not self.isRunning(): + continue + + if multiChunks: + logging.info('[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format( + node=nId+1, nbNodes=len(self._manager._nodesToProcess), + chunk=cId+1, nbChunks=len(node.chunks), nodeName=node.nodeType)) + else: + logging.info('[{node}/{nbNodes}] {nodeName}'.format( + node=nId+1, nbNodes=len(self._manager._nodesToProcess), nodeName=node.nodeType)) + try: + chunk.process(self.forceCompute) + except Exception as e: + if chunk.isStopped(): + self._state = State.STOPPED + self._manager._graph.clearSubmittedNodes() + self._manager._nodesToProcess = [] else: - print('\n[{node}/{nbNodes}] {nodeName}'.format( - node=n + 1, nbNodes=len(self._manager._nodesToProcess), nodeName=node.nodeType)) - - if not chunk.isFinishedOrRunning() and self._state == State.RUNNING: - try: - chunk.process(self.forceCompute) - - except Exception as e: - if chunk.isStopped(): - self._state = State.STOPPED - self._manager._graph.clearSubmittedNodes() - self._manager._nodesToProcess = [] - - else: - logging.error("Error on node computation: {}".format(e)) - nodesToDelete, _ = self._manager._graph.nodesFromNode(node) - - for nodeD in nodesToDelete: - if nodeD != node: - try: - self._manager._nodesToProcess.remove(nodeD) - except: - # Node already removed (for instance a global clear of _nodesToProcess) - pass - nodeD.clearSubmittedChunks() + logging.error("Error on node computation: {}".format(e)) + nodesToRemove, _ = self._manager._graph.nodesFromNode(node) + # remove following nodes from the task queue + for n in nodesToRemove[1:]: # exclude current node + try: + self._manager._nodesToProcess.remove(n) + except ValueError: + # Node already removed (for instance a global clear of _nodesToProcess) + pass + n.clearSubmittedChunks() self._manager._nodesToProcess = [] self._state = State.DEAD - self._manager._uigraph.computeStatusChanged.emit() class TaskManager(BaseObject): """ - Manage the nodes that have to be computed locally or on a renderfarm + Manage graph - local and external - computation tasks. """ def __init__(self, parent=None): super(TaskManager, self).__init__(parent) + self._graph = None self._nodes = DictModel(keyAttrName='_name', parent=self) self._nodesToProcess = [] - self._nodesExtern = [] - self._graph = None + # internal thread in which local tasks are executed self._thread = TaskThread(self) - def compute(self, graph=None, toNodes=None, uigraph=None, forceCompute=False, forceStatus=False): + def compute(self, graph=None, toNodes=None, forceCompute=False): """ - Nodes are to be computed locally are sent to a thread - :param graph: - :param toNodes: - :param uigraph: - :param forceCompute: - :param forceStatus: - :return: + Start graph computation, from root nodes to leaves - or nodes in 'toNodes' if specified. + Computation tasks (NodeChunk) happen in a separate thread (see TaskThread). + + :param graph: the graph to consider. + :param toNodes: specific leaves, all graph leaves if None. + :param forceCompute: force the computation despite nodes status. """ self._graph = graph - self._uigraph = uigraph - - if self._thread._state in (State.IDLE, State.DEAD, State.ERROR, State.STOPPED): - try: - self._nodes.clear() - except: - print("Task Manager nodes already empty") - - externEmpty = True - for node in self._nodesExtern: - if node.isAlreadySubmitted(): - externEmpty = False - break + if self._thread._state != State.RUNNING: + self._nodes.clear() + externEmpty = any(node.isAlreadySubmitted() for node in self._nodesExtern) if not externEmpty: self._nodes.update(self._nodesExtern) else: @@ -117,17 +110,14 @@ class TaskManager(BaseObject): nodes, edges = graph.dfsOnFinish(startNodes=toNodes) else: allNodes, edges = graph.dfsToProcess(startNodes=toNodes) - nodes = [] - for node in allNodes: - if not node.isAlreadySubmittedOrFinished(): - nodes.append(node) + nodes = [node for node in allNodes if not node.isAlreadySubmittedOrFinished()] for node in nodes: node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name)) node.beginSequence(forceCompute) self._nodes.update(nodes) - self._nodesToProcess .extend(nodes) + self._nodesToProcess.extend(nodes) if self._thread._state == State.IDLE: self._thread.start() @@ -172,7 +162,13 @@ class TaskManager(BaseObject): :param toNodes: :return: """ - if self._thread._state in (State.IDLE, State.DEAD, State.ERROR, State.STOPPED): + + # ensure submitter is properly set + sub = meshroom.core.submitters.get(submitter, None) + if sub is None: + raise RuntimeError("Unknown Submitter : " + submitter) + + if self._thread._state != State.RUNNING: self._nodes.clear() externEmpty = True @@ -190,10 +186,6 @@ class TaskManager(BaseObject): flowEdges = graph.flowEdges(startNodes=toNodes) edgesToProcess = set(edgesToProcess).intersection(flowEdges) - sub = meshroom.core.submitters.get(submitter, None) - if sub is None: - raise RuntimeError("Unknown Submitter : " + submitter) - try: res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath) if res: @@ -207,6 +199,7 @@ class TaskManager(BaseObject): nodes = Property(BaseObject, lambda self: self._nodes, constant=True) + def getAlreadySubmittedChunks(nodes): """ Check if nodes already have been submitted @@ -219,8 +212,3 @@ def getAlreadySubmittedChunks(nodes): if chunk.isAlreadySubmitted(): out.append(chunk) return out - - - - - diff --git a/meshroom/ui/graph.py b/meshroom/ui/graph.py index b38c8b13..a27d604a 100644 --- a/meshroom/ui/graph.py +++ b/meshroom/ui/graph.py @@ -351,7 +351,7 @@ class UIGraph(QObject): @Slot(Node) def execute(self, node=None): nodes = [node] if node else None - self._taskManager.compute(self._graph, nodes, self) + self._taskManager.compute(self._graph, nodes) def _execute(self, nodes): self.computeStatusChanged.emit()