mirror of
https://github.com/alicevision/Meshroom.git
synced 2025-04-29 18:27:23 +02:00
461 lines
18 KiB
Python
461 lines
18 KiB
Python
import logging
|
|
from threading import Thread
|
|
from enum import Enum
|
|
|
|
import meshroom
|
|
from meshroom.common import BaseObject, DictModel, Property, Signal, Slot
|
|
from meshroom.core.node import Status
|
|
import meshroom.core.graph
|
|
|
|
|
|
class State(Enum):
|
|
"""
|
|
State of the Thread that is computing nodes
|
|
"""
|
|
IDLE = 0
|
|
RUNNING = 1
|
|
STOPPED = 2
|
|
DEAD = 3
|
|
ERROR = 4
|
|
|
|
|
|
class TaskThread(Thread):
|
|
"""
|
|
A thread with a pile of nodes to compute
|
|
"""
|
|
def __init__(self, manager):
|
|
Thread.__init__(self, target=self.run)
|
|
self._state = State.IDLE
|
|
self._manager = manager
|
|
self.forceCompute = False
|
|
|
|
def isRunning(self):
|
|
return self._state == State.RUNNING
|
|
|
|
def run(self):
|
|
""" Consume compute tasks. """
|
|
self._state = State.RUNNING
|
|
|
|
stopAndRestart = False
|
|
|
|
for nId, node in enumerate(self._manager._nodesToProcess):
|
|
|
|
# skip already finished/running nodes
|
|
if node.isFinishedOrRunning():
|
|
continue
|
|
|
|
# if a node does not exist anymore, node.chunks becomes a PySide property
|
|
try:
|
|
multiChunks = len(node.chunks) > 1
|
|
except TypeError:
|
|
continue
|
|
|
|
for cId, chunk in enumerate(node.chunks):
|
|
if chunk.isFinishedOrRunning() or not self.isRunning():
|
|
continue
|
|
|
|
if multiChunks:
|
|
logging.info('[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
|
|
node=nId+1, nbNodes=len(self._manager._nodesToProcess),
|
|
chunk=cId+1, nbChunks=len(node.chunks), nodeName=node.nodeType))
|
|
else:
|
|
logging.info('[{node}/{nbNodes}] {nodeName}'.format(
|
|
node=nId+1, nbNodes=len(self._manager._nodesToProcess), nodeName=node.nodeType))
|
|
try:
|
|
chunk.process(self.forceCompute)
|
|
except Exception as e:
|
|
if chunk.isStopped():
|
|
stopAndRestart = True
|
|
break
|
|
else:
|
|
logging.error("Error on node computation: {}".format(e))
|
|
nodesToRemove, _ = self._manager._graph.dfsOnDiscover(startNodes=[node], reverse=True)
|
|
# remove following nodes from the task queue
|
|
for n in nodesToRemove[1:]: # exclude current node
|
|
try:
|
|
self._manager._nodesToProcess.remove(n)
|
|
except ValueError:
|
|
# Node already removed (for instance a global clear of _nodesToProcess)
|
|
pass
|
|
n.clearSubmittedChunks()
|
|
|
|
if stopAndRestart:
|
|
break
|
|
|
|
if stopAndRestart:
|
|
self._state = State.STOPPED
|
|
self._manager.restartRequested.emit()
|
|
else:
|
|
self._manager._nodesToProcess = []
|
|
self._state = State.DEAD
|
|
|
|
|
|
class TaskManager(BaseObject):
|
|
"""
|
|
Manage graph - local and external - computation tasks.
|
|
"""
|
|
def __init__(self, parent=None):
|
|
super(TaskManager, self).__init__(parent)
|
|
self._graph = None
|
|
self._nodes = DictModel(keyAttrName='_name', parent=self)
|
|
self._nodesToProcess = []
|
|
self._nodesExtern = []
|
|
# internal thread in which local tasks are executed
|
|
self._thread = TaskThread(self)
|
|
|
|
self._blockRestart = False
|
|
self.restartRequested.connect(self.restart)
|
|
|
|
def requestBlockRestart(self):
|
|
"""
|
|
Block computing.
|
|
Note: should only be used to completely stop computing.
|
|
"""
|
|
self._blockRestart = True
|
|
|
|
def blockRestart(self):
|
|
""" Avoid the automatic restart of computing. """
|
|
for node in self._nodesToProcess:
|
|
chunkCount = 0
|
|
for chunk in node.chunks:
|
|
if chunk.status.status in (Status.SUBMITTED, Status.ERROR):
|
|
chunk.upgradeStatusTo(Status.NONE)
|
|
chunkCount += 1
|
|
if chunkCount == len(node.chunks):
|
|
self.removeNode(node, displayList=True)
|
|
|
|
self._blockRestart = False
|
|
self._nodesToProcess = []
|
|
self._thread._state = State.DEAD
|
|
|
|
@Slot()
|
|
def restart(self):
|
|
"""
|
|
Restart computing when thread has been stopped.
|
|
Note: this is done like this to avoid app freezing.
|
|
"""
|
|
# Make sure to wait the end of the current thread
|
|
self._thread.join()
|
|
|
|
# Avoid restart if thread was globally stopped
|
|
if self._blockRestart:
|
|
self.blockRestart()
|
|
return
|
|
|
|
if self._thread._state != State.STOPPED:
|
|
return
|
|
|
|
for node in self._nodesToProcess:
|
|
if node.getGlobalStatus() == Status.STOPPED:
|
|
# Remove node from the computing list
|
|
self.removeNode(node, displayList=False, processList=True)
|
|
|
|
# Remove output nodes from display and computing lists
|
|
outputNodes = node.getOutputNodes(recursive=True, dependenciesOnly=True)
|
|
for n in outputNodes:
|
|
if n.getGlobalStatus() in (Status.ERROR, Status.SUBMITTED):
|
|
n.upgradeStatusTo(Status.NONE)
|
|
self.removeNode(n, displayList=True, processList=True)
|
|
|
|
# Start a new thread with the remaining nodes to compute
|
|
self._thread = TaskThread(self)
|
|
self._thread.start()
|
|
|
|
def compute(self, graph=None, toNodes=None, forceCompute=False, forceStatus=False):
|
|
"""
|
|
Start graph computation, from root nodes to leaves - or nodes in 'toNodes' if specified.
|
|
Computation tasks (NodeChunk) happen in a separate thread (see TaskThread).
|
|
|
|
:param graph: the graph to consider.
|
|
:param toNodes: specific leaves, all graph leaves if None.
|
|
:param forceCompute: force the computation despite nodes status.
|
|
:param forceStatus: force the computation even if some nodes are submitted externally.
|
|
"""
|
|
self._graph = graph
|
|
|
|
self.updateNodes()
|
|
|
|
if forceCompute:
|
|
nodes, edges = graph.dfsOnFinish(startNodes=toNodes)
|
|
self.checkCompatibilityNodes(graph, nodes, "COMPUTATION") # name of the context is important for QML
|
|
self.checkDuplicates(nodes, "COMPUTATION") # name of the context is important for QML
|
|
else:
|
|
# Check dependencies of toNodes
|
|
if not toNodes:
|
|
toNodes = graph.getLeafNodes(dependenciesOnly=True)
|
|
toNodes = list(toNodes)
|
|
allReady = self.checkNodesDependencies(graph, toNodes, "COMPUTATION")
|
|
|
|
# At this point, toNodes is a list
|
|
# If it is empty, we raise an error to avoid passing through dfsToProcess
|
|
if not toNodes:
|
|
self.raiseImpossibleProcess("COMPUTATION")
|
|
|
|
nodes, edges = graph.dfsToProcess(startNodes=toNodes)
|
|
if not nodes:
|
|
logging.warning('Nothing to compute')
|
|
return
|
|
self.checkCompatibilityNodes(graph, nodes, "COMPUTATION") # name of the context is important for QML
|
|
self.checkDuplicates(nodes, "COMPUTATION") # name of the context is important for QML
|
|
|
|
nodes = [node for node in nodes if not self.contains(node)] # be sure to avoid non-real conflicts
|
|
chunksInConflict = self.getAlreadySubmittedChunks(nodes)
|
|
|
|
if chunksInConflict:
|
|
chunksStatus = set([chunk.status.status.name for chunk in chunksInConflict])
|
|
chunksName = [node.name for node in chunksInConflict]
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
msg = '[COMPUTATION] Already Submitted:\n' \
|
|
'WARNING - Some nodes are already submitted with status: {}\nNodes: {}'.format(
|
|
', '.join(chunksStatus),
|
|
', '.join(chunksName)
|
|
)
|
|
|
|
if forceStatus:
|
|
logging.warning(msg)
|
|
else:
|
|
raise RuntimeError(msg)
|
|
|
|
for node in nodes:
|
|
node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name))
|
|
node.beginSequence(forceCompute)
|
|
|
|
self._nodes.update(nodes)
|
|
self._nodesToProcess.extend(nodes)
|
|
|
|
if self._thread._state == State.IDLE:
|
|
self._thread.start()
|
|
elif self._thread._state in (State.DEAD, State.ERROR):
|
|
self._thread = TaskThread(self)
|
|
self._thread.start()
|
|
|
|
# At the end because it raises a WarningError but should not stop processing
|
|
if not allReady:
|
|
self.raiseDependenciesMessage("COMPUTATION")
|
|
|
|
def onNodeDestroyed(self, obj, name):
|
|
"""
|
|
Remove node from the taskmanager when it's destroyed in the graph
|
|
:param obj:
|
|
:param name:
|
|
:return:
|
|
"""
|
|
if name in self._nodes.keys():
|
|
self._nodes.pop(name)
|
|
|
|
def contains(self, node):
|
|
return node in self._nodes.values()
|
|
|
|
def containsNodeName(self, name):
|
|
""" Check if a node with the argument name belongs to the display list. """
|
|
if name in self._nodes.keys():
|
|
return True
|
|
return False
|
|
|
|
def removeNode(self, node, displayList=True, processList=False, externList=False):
|
|
""" Remove node from the Task Manager.
|
|
|
|
Args:
|
|
node (Node): node to remove.
|
|
displayList (bool): remove from the display list.
|
|
processList (bool): remove from the nodesToProcess list.
|
|
externList (bool): remove from the nodesExtern list.
|
|
"""
|
|
if displayList and self._nodes.contains(node):
|
|
self._nodes.pop(node.name)
|
|
if processList and node in self._nodesToProcess:
|
|
self._nodesToProcess.remove(node)
|
|
if externList and node in self._nodesExtern:
|
|
self._nodesExtern.remove(node)
|
|
|
|
def clear(self):
|
|
"""
|
|
Remove all the nodes from the taskmanager
|
|
:return:
|
|
"""
|
|
self._nodes.clear()
|
|
self._nodesExtern = []
|
|
self._nodesToProcess = []
|
|
|
|
def updateNodes(self):
|
|
"""
|
|
Update task manager nodes lists by checking the nodes status.
|
|
"""
|
|
self._nodesExtern = [node for node in self._nodesExtern if node.isExtern() and node.isAlreadySubmitted()]
|
|
newNodes = [node for node in self._nodes if node.isAlreadySubmitted()]
|
|
if len(newNodes) != len(self._nodes):
|
|
self._nodes.clear()
|
|
self._nodes.update(newNodes)
|
|
|
|
def update(self, graph):
|
|
"""
|
|
Add all the nodes that are being rendered in a renderfarm to the taskmanager when new graph is loaded
|
|
:param graph:
|
|
:return:
|
|
"""
|
|
for node in graph._nodes:
|
|
if node.isAlreadySubmitted() and node._chunks.size() > 0 and node.isExtern():
|
|
self._nodes.add(node)
|
|
self._nodesExtern.append(node)
|
|
|
|
def checkCompatibilityNodes(self, graph, nodes, context):
|
|
compatNodes = []
|
|
for node in nodes:
|
|
if node in graph._compatibilityNodes.values():
|
|
compatNodes.append(node.nameToLabel(node.name))
|
|
if compatNodes:
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
raise RuntimeError("[{}] Compatibility Issue:\n"
|
|
"Cannot compute because of these incompatible nodes:\n"
|
|
"{}".format(context, sorted(compatNodes)))
|
|
|
|
def checkDuplicates(self, nodesToProcess, context):
|
|
for node in nodesToProcess:
|
|
for duplicate in node.duplicates:
|
|
if duplicate in nodesToProcess:
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
raise RuntimeError("[{}] Duplicates Issue:\n"
|
|
"Cannot compute because there are some duplicate nodes to process:\n\n"
|
|
"First match: '{}' and '{}'\n\n"
|
|
"There can be other duplicate nodes in the list. Please, check the graph and try again.".format(
|
|
context, node.nameToLabel(node.name), node.nameToLabel(duplicate.name)))
|
|
|
|
def checkNodesDependencies(self, graph, toNodes, context):
|
|
"""
|
|
Check dependencies of nodes to process.
|
|
Update toNodes with computable/submittable nodes only.
|
|
|
|
Returns:
|
|
bool: True if all the nodes can be processed. False otherwise.
|
|
"""
|
|
ready = []
|
|
computed = []
|
|
for node in toNodes:
|
|
if context == "COMPUTATION":
|
|
if graph.canCompute(node) and graph.canSubmitOrCompute(node) % 2 == 1:
|
|
ready.append(node)
|
|
elif node.isComputed:
|
|
computed.append(node)
|
|
elif context == "SUBMITTING":
|
|
if graph.canCompute(node) and graph.canSubmitOrCompute(node) > 1:
|
|
ready.append(node)
|
|
elif node.isComputed:
|
|
computed.append(node)
|
|
else:
|
|
raise ValueError("Argument 'context' must be: 'COMPUTATION' or 'SUBMITTING'")
|
|
|
|
if len(ready) + len(computed) != len(toNodes):
|
|
del toNodes[:] # for python 2 compatibility, else use: toNodes.clear()
|
|
toNodes.extend(ready)
|
|
return False
|
|
|
|
return True
|
|
|
|
def raiseDependenciesMessage(self, context):
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
raise RuntimeWarning("[{}] Unresolved dependencies:\n"
|
|
"Some nodes cannot be computed in LOCAL/submitted in EXTERN because of unresolved dependencies.\n\n"
|
|
"Nodes which are ready will be processed.".format(context))
|
|
|
|
def raiseImpossibleProcess(self, context):
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
raise RuntimeError("[{}] Impossible Process:\n"
|
|
"There is no node able to be processed.".format(context))
|
|
|
|
def submit(self, graph, submitter=None, toNodes=None):
|
|
"""
|
|
Nodes are send to the renderfarm
|
|
:param graph:
|
|
:param submitter:
|
|
:param toNodes:
|
|
:return:
|
|
"""
|
|
|
|
# Ensure submitter is properly set
|
|
sub = None
|
|
if submitter:
|
|
sub = meshroom.core.submitters.get(submitter, None)
|
|
elif len(meshroom.core.submitters) == 1:
|
|
# if only one submitter available use it
|
|
allSubmitters = meshroom.core.submitters.values()
|
|
sub = next(iter(allSubmitters)) # retrieve the first element
|
|
if sub is None:
|
|
# Warning: Syntax and terms are parsed on QML side to recognize the error
|
|
# Syntax : [Context] ErrorType: ErrorMessage
|
|
raise RuntimeError("[SUBMITTING] Unknown Submitter:\n"
|
|
"Unknown Submitter called '{submitter}'. Available submitters are: '{allSubmitters}'.".format(
|
|
submitter=submitter,
|
|
allSubmitters=str(meshroom.core.submitters.keys())
|
|
))
|
|
|
|
# Update task manager's lists
|
|
self.updateNodes()
|
|
graph.update()
|
|
|
|
# Check dependencies of toNodes
|
|
if not toNodes:
|
|
toNodes = graph.getLeafNodes(dependenciesOnly=True)
|
|
toNodes = list(toNodes)
|
|
allReady = self.checkNodesDependencies(graph, toNodes, "SUBMITTING")
|
|
|
|
# At this point, toNodes is a list
|
|
# If it is empty, we raise an error to avoid passing through dfsToProcess
|
|
if not toNodes:
|
|
self.raiseImpossibleProcess("SUBMITTING")
|
|
|
|
nodesToProcess, edgesToProcess = graph.dfsToProcess(startNodes=toNodes)
|
|
if not nodesToProcess:
|
|
logging.warning('Nothing to compute')
|
|
return
|
|
self.checkCompatibilityNodes(graph, nodesToProcess, "SUBMITTING") # name of the context is important for QML
|
|
self.checkDuplicates(nodesToProcess, "SUBMITTING") # name of the context is important for QML
|
|
|
|
flowEdges = graph.flowEdges(startNodes=toNodes)
|
|
edgesToProcess = set(edgesToProcess).intersection(flowEdges)
|
|
|
|
logging.info("Nodes to process: {}".format(nodesToProcess))
|
|
logging.info("Edges to process: {}".format(edgesToProcess))
|
|
|
|
try:
|
|
res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath)
|
|
if res:
|
|
for node in nodesToProcess:
|
|
node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name))
|
|
node.submit() # update node status
|
|
self._nodes.update(nodesToProcess)
|
|
self._nodesExtern.extend(nodesToProcess)
|
|
|
|
# At the end because it raises a WarningError but should not stop processing
|
|
if not allReady:
|
|
self.raiseDependenciesMessage("SUBMITTING")
|
|
except Exception as e:
|
|
logging.error("Error on submit : {}".format(e))
|
|
|
|
def submitFromFile(self, graphFile, submitter, toNode=None):
|
|
"""
|
|
Submit the given graph via the given submitter.
|
|
"""
|
|
graph = meshroom.core.graph.loadGraph(graphFile)
|
|
self.submit(graph, submitter, toNode)
|
|
|
|
def getAlreadySubmittedChunks(self, nodes):
|
|
"""
|
|
Check if nodes have already been submitted in another Meshroom instance.
|
|
:param nodes:
|
|
:return:
|
|
"""
|
|
out = []
|
|
for node in nodes:
|
|
for chunk in node.chunks:
|
|
# Already submitted/running chunks in another task manager
|
|
if chunk.isAlreadySubmitted() and not self.containsNodeName(chunk.statusNodeName):
|
|
out.append(chunk)
|
|
return out
|
|
|
|
nodes = Property(BaseObject, lambda self: self._nodes, constant=True)
|
|
restartRequested = Signal()
|