mirror of
https://github.com/alicevision/Meshroom.git
synced 2025-04-29 18:27:23 +02:00
Avoid to get trailing "submitted" chunks on the current node when we stop the computation.
469 lines
No EOL
18 KiB
Python
469 lines
No EOL
18 KiB
Python
import logging
|
|
from threading import Thread
|
|
from enum import Enum
|
|
|
|
import meshroom
|
|
from meshroom.common import BaseObject, DictModel, Property, Signal, Slot
|
|
from meshroom.core.node import Status
|
|
import meshroom.core.graph
|
|
|
|
|
|
class State(Enum):
|
|
"""
|
|
State of the Thread that is computing nodes
|
|
"""
|
|
IDLE = 0
|
|
RUNNING = 1
|
|
STOPPED = 2
|
|
DEAD = 3
|
|
ERROR = 4
|
|
|
|
|
|
class TaskThread(Thread):
|
|
"""
|
|
A thread with a pile of nodes to compute
|
|
"""
|
|
def __init__(self, manager):
|
|
Thread.__init__(self, target=self.run)
|
|
self._state = State.IDLE
|
|
self._manager = manager
|
|
self.forceCompute = False
|
|
|
|
def isRunning(self):
|
|
return self._state == State.RUNNING
|
|
|
|
def run(self):
|
|
""" Consume compute tasks. """
|
|
self._state = State.RUNNING
|
|
|
|
stopAndRestart = False
|
|
|
|
for nId, node in enumerate(self._manager._nodesToProcess):
|
|
|
|
# skip already finished/running nodes
|
|
if node.isFinishedOrRunning():
|
|
continue
|
|
|
|
# if a node does not exist anymore, node.chunks becomes a PySide property
|
|
try:
|
|
multiChunks = len(node.chunks) > 1
|
|
except TypeError:
|
|
continue
|
|
|
|
for cId, chunk in enumerate(node.chunks):
|
|
if chunk.isFinishedOrRunning() or not self.isRunning():
|
|
continue
|
|
|
|
if multiChunks:
|
|
logging.info('[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
|
|
node=nId+1, nbNodes=len(self._manager._nodesToProcess),
|
|
chunk=cId+1, nbChunks=len(node.chunks), nodeName=node.nodeType))
|
|
else:
|
|
logging.info('[{node}/{nbNodes}] {nodeName}'.format(
|
|
node=nId+1, nbNodes=len(self._manager._nodesToProcess), nodeName=node.nodeType))
|
|
try:
|
|
chunk.process(self.forceCompute)
|
|
except Exception as e:
|
|
if chunk.isStopped():
|
|
stopAndRestart = True
|
|
break
|
|
else:
|
|
logging.error("Error on node computation: {}".format(e))
|
|
nodesToRemove, _ = self._manager._graph.dfsOnDiscover(startNodes=[node], reverse=True)
|
|
# remove following nodes from the task queue
|
|
for n in nodesToRemove[1:]: # exclude current node
|
|
try:
|
|
self._manager._nodesToProcess.remove(n)
|
|
except ValueError:
|
|
# Node already removed (for instance a global clear of _nodesToProcess)
|
|
pass
|
|
n.clearSubmittedChunks()
|
|
|
|
if stopAndRestart:
|
|
break
|
|
|
|
if stopAndRestart:
|
|
self._state = State.STOPPED
|
|
self._manager.restartRequested.emit()
|
|
else:
|
|
self._manager._nodesToProcess = []
|
|
self._state = State.DEAD
|
|
|
|
|
|
class TaskManager(BaseObject):
|
|
"""
|
|
Manage graph - local and external - computation tasks.
|
|
"""
|
|
def __init__(self, parent=None):
|
|
super(TaskManager, self).__init__(parent)
|
|
self._graph = None
|
|
self._nodes = DictModel(keyAttrName='_name', parent=self)
|
|
self._nodesToProcess = []
|
|
self._nodesExtern = []
|
|
# internal thread in which local tasks are executed
|
|
self._thread = TaskThread(self)
|
|
|
|
self._blockRestart = False
|
|
self.restartRequested.connect(self.restart)
|
|
|
|
def requestBlockRestart(self):
|
|
"""
|
|
Block computing.
|
|
Note: should only be used to completely stop computing.
|
|
"""
|
|
self._blockRestart = True
|
|
|
|
def blockRestart(self):
|
|
""" Avoid the automatic restart of computing. """
|
|
for node in self._nodesToProcess:
|
|
chunkCount = 0
|
|
for chunk in node.chunks:
|
|
if chunk.status.status in (Status.SUBMITTED, Status.ERROR):
|
|
chunk.upgradeStatusTo(Status.NONE)
|
|
chunkCount += 1
|
|
if chunkCount == len(node.chunks):
|
|
self.removeNode(node, displayList=True)
|
|
|
|
self._blockRestart = False
|
|
self._nodesToProcess = []
|
|
self._thread._state = State.DEAD
|
|
|
|
@Slot()
|
|
def restart(self):
|
|
"""
|
|
Restart computing when thread has been stopped.
|
|
Note: this is done like this to avoid app freezing.
|
|
"""
|
|
# Make sure to wait the end of the current thread
|
|
self._thread.join()
|
|
|
|
# Avoid restart if thread was globally stopped
|
|
if self._blockRestart:
|
|
self.blockRestart()
|
|
return
|
|
|
|
if self._thread._state != State.STOPPED:
|
|
return
|
|
|
|
for node in self._nodesToProcess:
|
|
if node.getGlobalStatus() == Status.STOPPED:
|
|
# Remove node from the computing list
|
|
self.removeNode(node, displayList=False, processList=True)
|
|
|
|
# Remove output nodes from display and computing lists
|
|
outputNodes = node.getOutputNodes(recursive=True, dependenciesOnly=True)
|
|
for n in outputNodes:
|
|
if n.getGlobalStatus() in (Status.ERROR, Status.SUBMITTED):
|
|
n.upgradeStatusTo(Status.NONE)
|
|
self.removeNode(n, displayList=True, processList=True)
|
|
|
|
# Start a new thread with the remaining nodes to compute
|
|
self._thread = TaskThread(self)
|
|
self._thread.start()
|
|
|
|
def compute(self, graph=None, toNodes=None, forceCompute=False, forceStatus=False):
|
|
"""
|
|
Start graph computation, from root nodes to leaves - or nodes in 'toNodes' if specified.
|
|
Computation tasks (NodeChunk) happen in a separate thread (see TaskThread).
|
|
|
|
:param graph: the graph to consider.
|
|
:param toNodes: specific leaves, all graph leaves if None.
|
|
:param forceCompute: force the computation despite nodes status.
|
|
:param forceStatus: force the computation even if some nodes are submitted externally.
|
|
"""
|
|
self._graph = graph
|
|
|
|
if self._thread._state != State.RUNNING:
|
|
self._nodes.clear()
|
|
externEmpty = any(node.isAlreadySubmitted() for node in self._nodesExtern)
|
|
if not externEmpty:
|
|
self._nodes.update(self._nodesExtern)
|
|
else:
|
|
self._nodesExtern = []
|
|
|
|
if forceCompute:
|
|
nodes, edges = graph.dfsOnFinish(startNodes=toNodes)
|
|
self.checkCompatibilityNodes(graph, nodes, "COMPUTATION") # name of the context is important for QML
|
|
self.checkDuplicates(nodes, "COMPUTATION") # name of the context is important for QML
|
|
else:
|
|
# Check dependencies of toNodes
|
|
if not toNodes:
|
|
toNodes = graph.getLeafNodes(dependenciesOnly=True)
|
|
toNodes = list(toNodes)
|
|
allReady = self.checkNodesDependencies(graph, toNodes, "COMPUTATION")
|
|
|
|
# At this point, toNodes is a list
|
|
# If it is empty, we raise an error to avoid passing through dfsToProcess
|
|
if not toNodes:
|
|
self.raiseImpossibleProcess("COMPUTATION")
|
|
|
|
nodes, edges = graph.dfsToProcess(startNodes=toNodes)
|
|
if not nodes:
|
|
logging.warning('Nothing to compute')
|
|
return
|
|
self.checkCompatibilityNodes(graph, nodes, "COMPUTATION") # name of the context is important for QML
|
|
self.checkDuplicates(nodes, "COMPUTATION") # name of the context is important for QML
|
|
|
|
nodes = [node for node in nodes if not self.contains(node)] # be sure to avoid non-real conflicts
|
|
chunksInConflict = self.getAlreadySubmittedChunks(nodes)
|
|
|
|
if chunksInConflict:
|
|
chunksStatus = set([chunk.status.status.name for chunk in chunksInConflict])
|
|
chunksName = [node.name for node in chunksInConflict]
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
msg = '[COMPUTATION] Already Submitted:\n' \
|
|
'WARNING - Some nodes are already submitted with status: {}\nNodes: {}'.format(
|
|
', '.join(chunksStatus),
|
|
', '.join(chunksName)
|
|
)
|
|
|
|
if forceStatus:
|
|
logging.warning(msg)
|
|
else:
|
|
raise RuntimeError(msg)
|
|
|
|
for node in nodes:
|
|
node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name))
|
|
node.beginSequence(forceCompute)
|
|
|
|
self._nodes.update(nodes)
|
|
self._nodesToProcess.extend(nodes)
|
|
|
|
if self._thread._state == State.IDLE:
|
|
self._thread.start()
|
|
elif self._thread._state in (State.DEAD, State.ERROR):
|
|
self._thread = TaskThread(self)
|
|
self._thread.start()
|
|
|
|
# At the end because it raises a WarningError but should not stop processing
|
|
if not allReady:
|
|
self.raiseDependenciesMessage("COMPUTATION")
|
|
|
|
def onNodeDestroyed(self, obj, name):
|
|
"""
|
|
Remove node from the taskmanager when it's destroyed in the graph
|
|
:param obj:
|
|
:param name:
|
|
:return:
|
|
"""
|
|
if name in self._nodes.keys():
|
|
self._nodes.pop(name)
|
|
|
|
def contains(self, node):
|
|
return node in self._nodes.values()
|
|
|
|
def containsNodeName(self, name):
|
|
""" Check if a node with the argument name belongs to the display list. """
|
|
if name in self._nodes.keys():
|
|
return True
|
|
return False
|
|
|
|
def removeNode(self, node, displayList=True, processList=False, externList=False):
|
|
""" Remove node from the Task Manager.
|
|
|
|
Args:
|
|
node (Node): node to remove.
|
|
displayList (bool): remove from the display list.
|
|
processList (bool): remove from the nodesToProcess list.
|
|
externList (bool): remove from the nodesExtern list.
|
|
"""
|
|
if displayList and self._nodes.contains(node):
|
|
self._nodes.pop(node.name)
|
|
if processList and node in self._nodesToProcess:
|
|
self._nodesToProcess.remove(node)
|
|
if externList and node in self._nodesExtern:
|
|
self._nodesExtern.remove(node)
|
|
|
|
def clear(self):
|
|
"""
|
|
Remove all the nodes from the taskmanager
|
|
:return:
|
|
"""
|
|
self._nodes.clear()
|
|
self._nodesExtern = []
|
|
self._nodesToProcess = []
|
|
|
|
def update(self, graph):
|
|
"""
|
|
Add all the nodes that are being rendered in a renderfarm to the taskmanager when new graph is loaded
|
|
:param graph:
|
|
:return:
|
|
"""
|
|
for node in graph._nodes:
|
|
if node.isAlreadySubmitted() and node._chunks.size() > 0 and node.isExtern():
|
|
self._nodes.add(node)
|
|
self._nodesExtern.append(node)
|
|
|
|
def checkCompatibilityNodes(self, graph, nodes, context):
|
|
compatNodes = []
|
|
for node in nodes:
|
|
if node in graph._compatibilityNodes.values():
|
|
compatNodes.append(node.nameToLabel(node.name))
|
|
if compatNodes:
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
raise RuntimeError("[{}] Compatibility Issue:\n"
|
|
"Cannot compute because of these incompatible nodes:\n"
|
|
"{}".format(context, sorted(compatNodes)))
|
|
|
|
def checkDuplicates(self, nodesToProcess, context):
|
|
for node in nodesToProcess:
|
|
for duplicate in node.duplicates:
|
|
if duplicate in nodesToProcess:
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
raise RuntimeError("[{}] Duplicates Issue:\n"
|
|
"Cannot compute because there are some duplicate nodes to process:\n\n"
|
|
"First match: '{}' and '{}'\n\n"
|
|
"There can be other duplicate nodes in the list. Please, check the graph and try again.".format(
|
|
context, node.nameToLabel(node.name), node.nameToLabel(duplicate.name)))
|
|
|
|
def checkNodesDependencies(self, graph, toNodes, context):
|
|
"""
|
|
Check dependencies of nodes to process.
|
|
Update toNodes with computable/submittable nodes only.
|
|
|
|
Returns:
|
|
bool: True if all the nodes can be processed. False otherwise.
|
|
"""
|
|
ready = []
|
|
computed = []
|
|
for node in toNodes:
|
|
if context == "COMPUTATION":
|
|
if graph.canCompute(node) and graph.canSubmitOrCompute(node) % 2 == 1:
|
|
ready.append(node)
|
|
elif node.isComputed:
|
|
computed.append(node)
|
|
elif context == "SUBMITTING":
|
|
if graph.canCompute(node) and graph.canSubmitOrCompute(node) > 1:
|
|
ready.append(node)
|
|
elif node.isComputed:
|
|
computed.append(node)
|
|
else:
|
|
raise ValueError("Argument 'context' must be: 'COMPUTATION' or 'SUBMITTING'")
|
|
|
|
if len(ready) + len(computed) != len(toNodes):
|
|
toNodes.clear()
|
|
toNodes.extend(ready)
|
|
return False
|
|
|
|
return True
|
|
|
|
def raiseDependenciesMessage(self, context):
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
raise RuntimeWarning("[{}] Unresolved dependencies:\n"
|
|
"Some nodes cannot be computed in LOCAL/submitted in EXTERN because of unresolved dependencies.\n\n"
|
|
"Nodes which are ready will be processed.".format(context))
|
|
|
|
def raiseImpossibleProcess(self, context):
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
raise RuntimeError("[{}] Impossible Process:\n"
|
|
"There is no node able to be processed.".format(context))
|
|
|
|
def submit(self, graph=None, submitter=None, toNodes=None):
|
|
"""
|
|
Nodes are send to the renderfarm
|
|
:param graph:
|
|
:param submitter:
|
|
:param toNodes:
|
|
:return:
|
|
"""
|
|
|
|
# Ensure submitter is properly set
|
|
sub = None
|
|
if submitter:
|
|
sub = meshroom.core.submitters.get(submitter, None)
|
|
elif len(meshroom.core.submitters) == 1:
|
|
# if only one submitter available use it
|
|
allSubmitters = meshroom.core.submitters.values()
|
|
sub = next(iter(allSubmitters)) # retrieve the first element
|
|
if sub is None:
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
raise RuntimeError("[SUBMITTING] Unknown Submitter:\n"
|
|
"Unknown Submitter called '{submitter}'. Available submitters are: '{allSubmitters}'.".format(
|
|
submitter=submitter,
|
|
allSubmitters=str(meshroom.core.submitters.keys())
|
|
))
|
|
|
|
# Update task manager's lists
|
|
if self._thread._state != State.RUNNING:
|
|
self._nodes.clear()
|
|
|
|
externEmpty = True
|
|
for node in self._nodesExtern:
|
|
if node.isAlreadySubmitted():
|
|
externEmpty = False
|
|
break
|
|
|
|
if not externEmpty:
|
|
self._nodes.update(self._nodesExtern)
|
|
else:
|
|
self._nodesExtern = []
|
|
|
|
# Check dependencies of toNodes
|
|
if not toNodes:
|
|
toNodes = graph.getLeafNodes(dependenciesOnly=True)
|
|
toNodes = list(toNodes)
|
|
allReady = self.checkNodesDependencies(graph, toNodes, "SUBMITTING")
|
|
|
|
# At this point, toNodes is a list
|
|
# If it is empty, we raise an error to avoid passing through dfsToProcess
|
|
if not toNodes:
|
|
self.raiseImpossibleProcess("SUBMITTING")
|
|
|
|
nodesToProcess, edgesToProcess = graph.dfsToProcess(startNodes=toNodes)
|
|
if not nodesToProcess:
|
|
logging.warning('Nothing to compute')
|
|
return
|
|
self.checkCompatibilityNodes(graph, nodesToProcess, "SUBMITTING") # name of the context is important for QML
|
|
self.checkDuplicates(nodesToProcess, "SUBMITTING") # name of the context is important for QML
|
|
|
|
flowEdges = graph.flowEdges(startNodes=toNodes)
|
|
edgesToProcess = set(edgesToProcess).intersection(flowEdges)
|
|
|
|
logging.info("Nodes to process: {}".format(nodesToProcess))
|
|
logging.info("Edges to process: {}".format(edgesToProcess))
|
|
|
|
try:
|
|
res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath)
|
|
if res:
|
|
for node in nodesToProcess:
|
|
node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name))
|
|
node.submit() # update node status
|
|
self._nodes.update(nodesToProcess)
|
|
self._nodesExtern.extend(nodesToProcess)
|
|
|
|
# At the end because it raises a WarningError but should not stop processing
|
|
if not allReady:
|
|
self.raiseDependenciesMessage("SUBMITTING")
|
|
except Exception as e:
|
|
logging.error("Error on submit : {}".format(e))
|
|
|
|
def submitFromFile(self, graphFile, submitter, toNode=None):
|
|
"""
|
|
Submit the given graph via the given submitter.
|
|
"""
|
|
graph = meshroom.core.graph.loadGraph(graphFile)
|
|
toNodes = graph.findNodes([toNode]) if toNode else None
|
|
self.submit(graph, submitter, toNodes)
|
|
|
|
def getAlreadySubmittedChunks(self, nodes):
|
|
"""
|
|
Check if nodes have already been submitted in another Meshroom instance.
|
|
:param nodes:
|
|
:return:
|
|
"""
|
|
out = []
|
|
for node in nodes:
|
|
for chunk in node.chunks:
|
|
# Already submitted/running chunks in another task manager
|
|
if chunk.isAlreadySubmitted() and not self.containsNodeName(chunk.statusNodeName):
|
|
out.append(chunk)
|
|
return out
|
|
|
|
nodes = Property(BaseObject, lambda self: self._nodes, constant=True)
|
|
restartRequested = Signal() |