Mirror of https://github.com/alicevision/Meshroom.git

commit b883bd193b (parent 991aca989b)
[core] taskManager: code cleanup

3 changed files with 65 additions and 79 deletions
meshroom/core/node.py

@@ -226,7 +226,7 @@ class NodeChunk(BaseObject):
             print('WARNING: downgrade status on node "{}" from {} to {}'.format(self.name, self.status.status,
                                                                                 newStatus))

-        if(newStatus == Status.SUBMITTED):
+        if newStatus == Status.SUBMITTED:
             self.status = StatusData(self.node.name, self.node.nodeType, self.node.packageName, self.node.packageVersion)
         if execMode is not None:
             self.status.execMode = execMode
@@ -556,10 +556,8 @@ class BaseNode(BaseObject):
         return True

     def isFinishedOrRunning(self):
-        for chunk in self._chunks:
-            if not chunk.isFinishedOrRunning():
-                return False
-        return True
+        """ Return True if all chunks of this Node is either finished or running, False otherwise. """
+        return all(chunk.isFinishedOrRunning() for chunk in self._chunks)

     def alreadySubmittedChunks(self):
         return [ch for ch in self._chunks if ch.isAlreadySubmitted()]
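The isFinishedOrRunning() change above is a common refactor: an explicit loop that returns False on the first failing element becomes a single all() over a generator expression, which short-circuits the same way. A minimal, self-contained sketch of the pattern; the Chunk class and its status values here are hypothetical stand-ins, not Meshroom's real API:

class Chunk:
    """Hypothetical stand-in for a node chunk."""
    def __init__(self, status):
        self.status = status

    def isFinishedOrRunning(self):
        # invented status values, for illustration only
        return self.status in ("RUNNING", "SUCCESS")


def isFinishedOrRunning(chunks):
    # all() stops at the first chunk that is neither finished nor running,
    # exactly like the removed 'return False' inside the old loop
    return all(chunk.isFinishedOrRunning() for chunk in chunks)


print(isFinishedOrRunning([Chunk("SUCCESS"), Chunk("RUNNING")]))  # True
print(isFinishedOrRunning([Chunk("SUCCESS"), Chunk("ERROR")]))    # False
print(isFinishedOrRunning([]))  # True: vacuously, same as the old loop

Note that both the old loop and the all() version return True for a node with no chunks, so behaviour is unchanged for chunk-less nodes.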
meshroom/core/taskManager.py

@@ -5,6 +5,7 @@ from enum import Enum
 import meshroom
 from meshroom.common import BaseObject, DictModel, Property

+
 class State(Enum):
     """
     State of the Thread that is computing nodes
@@ -15,6 +16,7 @@ class State(Enum):
     DEAD = 3
     ERROR = 4

+
 class TaskThread(Thread):
     """
     A thread with a pile of nodes to compute
@@ -29,85 +31,76 @@ class TaskThread(Thread):
         return self._state == State.RUNNING

     def run(self):
+        """ Consume compute tasks. """
         self._state = State.RUNNING

-        for n, node in enumerate(self._manager._nodesToProcess):
-            if not node.isFinishedOrRunning():
-                multiChunks = len(node.chunks) > 1
-                for c, chunk in enumerate(node.chunks):
-                    if multiChunks:
-                        print('\n[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
-                            node=n + 1, nbNodes=len(self._manager._nodesToProcess),
-                            chunk=c + 1, nbChunks=len(node.chunks), nodeName=node.nodeType))
-                    else:
-                        print('\n[{node}/{nbNodes}] {nodeName}'.format(
-                            node=n + 1, nbNodes=len(self._manager._nodesToProcess), nodeName=node.nodeType))
-
-                    if not chunk.isFinishedOrRunning() and self._state == State.RUNNING:
+        for nId, node in enumerate(self._manager._nodesToProcess):
+
+            # skip already finished/running nodes
+            if node.isFinishedOrRunning():
+                continue
+
+            multiChunks = len(node.chunks) > 1
+            for cId, chunk in enumerate(node.chunks):
+
+                if chunk.isFinishedOrRunning() or not self.isRunning():
+                    continue
+
+                if multiChunks:
+                    logging.info('[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
+                        node=nId+1, nbNodes=len(self._manager._nodesToProcess),
+                        chunk=cId+1, nbChunks=len(node.chunks), nodeName=node.nodeType))
+                else:
+                    logging.info('[{node}/{nbNodes}] {nodeName}'.format(
+                        node=nId+1, nbNodes=len(self._manager._nodesToProcess), nodeName=node.nodeType))
+
                 try:
                     chunk.process(self.forceCompute)
                 except Exception as e:
                     if chunk.isStopped():
                         self._state = State.STOPPED
                         self._manager._graph.clearSubmittedNodes()
                         self._manager._nodesToProcess = []
                     else:
                         logging.error("Error on node computation: {}".format(e))
-                        nodesToDelete, _ = self._manager._graph.nodesFromNode(node)
-                        for nodeD in nodesToDelete:
-                            if nodeD != node:
+                        nodesToRemove, _ = self._manager._graph.nodesFromNode(node)
+                        # remove following nodes from the task queue
+                        for n in nodesToRemove[1:]:  # exclude current node
                             try:
-                                self._manager._nodesToProcess.remove(nodeD)
-                            except:
+                                self._manager._nodesToProcess.remove(n)
+                            except ValueError:
                                 # Node already removed (for instance a global clear of _nodesToProcess)
                                 pass
-                            nodeD.clearSubmittedChunks()
+                            n.clearSubmittedChunks()

         self._manager._nodesToProcess = []
         self._state = State.DEAD
-        self._manager._uigraph.computeStatusChanged.emit()


 class TaskManager(BaseObject):
     """
-    Manage the nodes that have to be computed locally or on a renderfarm
+    Manage graph - local and external - computation tasks.
     """
     def __init__(self, parent=None):
         super(TaskManager, self).__init__(parent)
+        self._graph = None
         self._nodes = DictModel(keyAttrName='_name', parent=self)
         self._nodesToProcess = []
         self._nodesExtern = []
-        self._graph = None
+        # internal thread in which local tasks are executed
         self._thread = TaskThread(self)

-    def compute(self, graph=None, toNodes=None, uigraph=None, forceCompute=False, forceStatus=False):
+    def compute(self, graph=None, toNodes=None, forceCompute=False):
         """
-        Nodes are to be computed locally are sent to a thread
-        :param graph:
-        :param toNodes:
-        :param uigraph:
-        :param forceCompute:
-        :param forceStatus:
-        :return:
+        Start graph computation, from root nodes to leaves - or nodes in 'toNodes' if specified.
+        Computation tasks (NodeChunk) happen in a separate thread (see TaskThread).
+
+        :param graph: the graph to consider.
+        :param toNodes: specific leaves, all graph leaves if None.
+        :param forceCompute: force the computation despite nodes status.
         """
         self._graph = graph
-        self._uigraph = uigraph

-        if self._thread._state in (State.IDLE, State.DEAD, State.ERROR, State.STOPPED):
-            try:
+        if self._thread._state != State.RUNNING:
             self._nodes.clear()
-            except:
-                print("Task Manager nodes already empty")
-
-            externEmpty = True
-            for node in self._nodesExtern:
-                if node.isAlreadySubmitted():
-                    externEmpty = False
-                    break
+            externEmpty = any(node.isAlreadySubmitted() for node in self._nodesExtern)

             if not externEmpty:
                 self._nodes.update(self._nodesExtern)
             else:
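The run() rewrite above flattens two levels of nesting into early "continue" guard clauses and routes progress messages through logging.info instead of print. The same hunk narrows a bare "except:" around list.remove() to "except ValueError:", so only the expected "item not in list" failure is silenced. A small self-contained sketch of the guard-clause and logging pattern; the task list below is invented for illustration and stands in for self._manager._nodesToProcess:

import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")

# hypothetical tasks, standing in for the manager's node queue
tasks = [
    {"name": "CameraInit", "done": True},
    {"name": "FeatureExtraction", "done": False},
    {"name": "StructureFromMotion", "done": False},
]

def process(task):
    logging.info("processing %s", task["name"])

for tId, task in enumerate(tasks):
    # guard clause: skip finished work up front instead of indenting
    # the whole loop body under "if not task['done']:"
    if task["done"]:
        continue

    logging.info("[%d/%d] %s", tId + 1, len(tasks), task["name"])
    process(task)

Using logging also lets the application configure verbosity and output destination, which a bare print() does not.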
@@ -117,10 +110,7 @@ class TaskManager(BaseObject):
             nodes, edges = graph.dfsOnFinish(startNodes=toNodes)
         else:
             allNodes, edges = graph.dfsToProcess(startNodes=toNodes)
-            nodes = []
-            for node in allNodes:
-                if not node.isAlreadySubmittedOrFinished():
-                    nodes.append(node)
+            nodes = [node for node in allNodes if not node.isAlreadySubmittedOrFinished()]

         for node in nodes:
             node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name))
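The last two hunks replace hand-written accumulation loops with builtins: a flag-and-break loop becomes any() in compute(), and an append-in-a-loop filter becomes a list comprehension. A standalone sketch of both idioms, using invented sample data rather than Meshroom objects:

# invented sample data for illustration
nodes = [
    {"name": "n1", "submitted": False, "finished": True},
    {"name": "n2", "submitted": True,  "finished": False},
    {"name": "n3", "submitted": False, "finished": False},
]

# any() replaces a flag variable plus a break:
#     found = False
#     for node in nodes:
#         if node["submitted"]:
#             found = True
#             break
anySubmitted = any(node["submitted"] for node in nodes)

# a comprehension replaces append-in-a-loop filtering:
#     toProcess = []
#     for node in nodes:
#         if not (node["submitted"] or node["finished"]):
#             toProcess.append(node)
toProcess = [node for node in nodes if not (node["submitted"] or node["finished"])]

print(anySubmitted)                          # True
print([node["name"] for node in toProcess])  # ['n3']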
@@ -172,7 +162,13 @@ class TaskManager(BaseObject):
         :param toNodes:
         :return:
         """
-        if self._thread._state in (State.IDLE, State.DEAD, State.ERROR, State.STOPPED):
+
+        # ensure submitter is properly set
+        sub = meshroom.core.submitters.get(submitter, None)
+        if sub is None:
+            raise RuntimeError("Unknown Submitter : " + submitter)
+
+        if self._thread._state != State.RUNNING:
             self._nodes.clear()

             externEmpty = True
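Moving the submitter lookup to the top of submit() means an unknown submitter name raises before the manager clears its node model, so no internal state is lost on a bad call. A generic sketch of this fail-fast pattern; the registry and class below are made up for illustration and are not Meshroom's API:

# hypothetical submitter registry, standing in for meshroom.core.submitters
SUBMITTERS = {"farm": object()}

class Manager:
    def __init__(self):
        self.nodes = ["a", "b"]

    def submit(self, submitter):
        # validate the input first: an unknown name raises here,
        # before any internal state has been touched
        sub = SUBMITTERS.get(submitter, None)
        if sub is None:
            raise RuntimeError("Unknown Submitter : " + submitter)

        # only now is it safe to mutate state
        self.nodes.clear()
        return sub

manager = Manager()
try:
    manager.submit("unknown")
except RuntimeError as exc:
    print(exc)          # Unknown Submitter : unknown
print(manager.nodes)    # ['a', 'b'] -- untouched by the failed call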
@@ -190,10 +186,6 @@ class TaskManager(BaseObject):
             flowEdges = graph.flowEdges(startNodes=toNodes)
             edgesToProcess = set(edgesToProcess).intersection(flowEdges)

-            sub = meshroom.core.submitters.get(submitter, None)
-            if sub is None:
-                raise RuntimeError("Unknown Submitter : " + submitter)
-
             try:
                 res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath)
                 if res:
@@ -207,6 +199,7 @@ class TaskManager(BaseObject):

     nodes = Property(BaseObject, lambda self: self._nodes, constant=True)

+
 def getAlreadySubmittedChunks(nodes):
     """
     Check if nodes already have been submitted
@@ -219,8 +212,3 @@ def getAlreadySubmittedChunks(nodes):
         if chunk.isAlreadySubmitted():
             out.append(chunk)
     return out
-
-
-
-
-
meshroom/ui/graph.py

@@ -351,7 +351,7 @@ class UIGraph(QObject):
     @Slot(Node)
     def execute(self, node=None):
         nodes = [node] if node else None
-        self._taskManager.compute(self._graph, nodes, self)
+        self._taskManager.compute(self._graph, nodes)

     def _execute(self, nodes):
         self.computeStatusChanged.emit()
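The UIGraph.execute() call site has to change together with the compute() signature: with the uigraph parameter gone, a leftover third positional argument would now bind to forceCompute. A minimal sketch of why; the stub function and stand-in object below are hypothetical:

# new signature as shown in the diff (body reduced to a stub here)
def compute(graph=None, toNodes=None, forceCompute=False):
    return forceCompute

class FakeUIGraph:
    """Hypothetical stand-in for the QObject-based UIGraph."""

ui = FakeUIGraph()

# old-style call: the UI object would silently become forceCompute (truthy)
print(compute("graph", ["node"], ui))   # <__main__.FakeUIGraph object ...>

# updated call matching the new signature
print(compute("graph", ["node"]))       # False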