Meshroom/meshroom/core/taskManager.py

374 lines
No EOL
14 KiB
Python

import logging
from threading import Thread
from enum import Enum
import meshroom
from meshroom.common import BaseObject, DictModel, Property, Signal, Slot
from meshroom.core.node import Status
import meshroom.core.graph
class State(Enum):
    """
    Lifecycle states of the TaskThread that computes nodes.
    """

    IDLE = 0  # freshly constructed thread, not started yet
    RUNNING = 1  # run() is consuming the queue of nodes to process
    STOPPED = 2  # a chunk was stopped; run() requested a restart from the manager
    DEAD = 3  # run() completed normally, or computing was blocked by the manager
    ERROR = 4  # treated like DEAD by TaskManager.compute (a new thread is spawned)
class TaskThread(Thread):
    """
    Worker thread consuming the task manager's queue of nodes to compute.

    Nodes are taken from ``manager._nodesToProcess`` and their chunks are
    processed one after the other. If a chunk reports it was stopped, the
    thread ends in the STOPPED state and asks the manager to restart;
    otherwise it empties the queue and ends in the DEAD state.
    """

    def __init__(self, manager):
        Thread.__init__(self, target=self.run)
        self._state = State.IDLE
        self._manager = manager
        # Forwarded as-is to chunk.process(); this class never changes it.
        self.forceCompute = False

    def isRunning(self):
        """Return True when the thread state is RUNNING (set at the start of run())."""
        return self._state == State.RUNNING

    def run(self):
        """
        Process the queued nodes chunk by chunk.

        Finished or running nodes/chunks are skipped. When processing a chunk
        raises, either the chunk was stopped (the whole run is interrupted and
        a restart is requested from the manager) or it is a computation error
        (the error is logged and the nodes downstream of the failing one are
        dropped from the queue).
        """
        self._state = State.RUNNING
        interrupted = False

        # The loop walks the list object captured here, while the progress
        # count and the removal of downstream nodes go through
        # self._manager._nodesToProcess again: the manager may extend it
        # (compute) or rebind it (clear) while this thread runs.
        for nodeIndex, node in enumerate(self._manager._nodesToProcess):
            if node.isFinishedOrRunning():
                continue

            # A node that no longer exists exposes ``chunks`` as a PySide
            # property, in which case len() raises TypeError.
            try:
                hasSeveralChunks = len(node.chunks) > 1
            except TypeError:
                continue

            for chunkIndex, chunk in enumerate(node.chunks):
                if chunk.isFinishedOrRunning() or not self.isRunning():
                    continue

                self._logProgress(node, nodeIndex, chunkIndex, hasSeveralChunks)

                try:
                    chunk.process(self.forceCompute)
                except Exception as e:
                    if chunk.isStopped():
                        interrupted = True
                        break
                    logging.error("Error on node computation: {}".format(e))
                    self._dropDownstreamNodes(node)

            if interrupted:
                break

        if interrupted:
            self._state = State.STOPPED
            self._manager.restartRequested.emit()
            return

        self._manager._nodesToProcess = []
        self._state = State.DEAD

    def _logProgress(self, node, nodeIndex, chunkIndex, hasSeveralChunks):
        """Log a '[node/total]' line, suffixed with '(chunk/total)' for multi-chunk nodes."""
        position = nodeIndex + 1
        total = len(self._manager._nodesToProcess)
        if not hasSeveralChunks:
            logging.info('[{node}/{nbNodes}] {nodeName}'.format(
                node=position, nbNodes=total, nodeName=node.nodeType))
            return
        logging.info('[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
            node=position, nbNodes=total,
            chunk=chunkIndex + 1, nbChunks=len(node.chunks), nodeName=node.nodeType))

    def _dropDownstreamNodes(self, failedNode):
        """
        Remove from the manager queue the nodes reached by a reverse DFS from
        ``failedNode`` (its downstream nodes) and clear their submitted chunks.
        """
        discovered, _ = self._manager._graph.dfsOnDiscover(startNodes=[failedNode], reverse=True)
        # The first discovered node is failedNode itself: it is kept.
        for dependent in discovered[1:]:
            try:
                self._manager._nodesToProcess.remove(dependent)
            except ValueError:
                # Already gone, e.g. after a global clear of the queue
                pass
            dependent.clearSubmittedChunks()
class TaskManager(BaseObject):
    """
    Manage graph - local and external - computation tasks.

    Bookkeeping:
      - ``_nodes``: display list (DictModel keyed on ``_name``), exposed as ``nodes``.
      - ``_nodesToProcess``: queue consumed by the local TaskThread.
      - ``_nodesExtern``: nodes handed over to an external submitter.
    """

    def __init__(self, parent=None):
        super(TaskManager, self).__init__(parent)
        self._graph = None
        self._nodes = DictModel(keyAttrName='_name', parent=self)
        self._nodesToProcess = []
        self._nodesExtern = []
        # internal thread in which local tasks are executed
        self._thread = TaskThread(self)

        self._blockRestart = False
        # TaskThread.run() emits restartRequested when a chunk has been stopped
        self.restartRequested.connect(self.restart)

    def requestBlockRestart(self):
        """
        Block computing.

        Sets a flag checked by restart(), which then calls blockRestart()
        instead of spawning a new thread.
        Note: should only be used to completely stop computing.
        """
        self._blockRestart = True

    def blockRestart(self):
        """
        Avoid the automatic restart of computing.

        Queued nodes that are SUBMITTED or in ERROR are reset to NONE and
        removed from the display list; the queue is then emptied and the
        current thread is marked as DEAD.
        """
        for node in self._nodesToProcess:
            if node.getGlobalStatus() in (Status.SUBMITTED, Status.ERROR):
                node.upgradeStatusTo(Status.NONE)
                self.removeNode(node, displayList=True)

        self._blockRestart = False
        self._nodesToProcess = []
        self._thread._state = State.DEAD

    @Slot()
    def restart(self):
        """
        Restart computing when thread has been stopped.

        Stopped nodes are removed from the queue, their ERROR/SUBMITTED output
        nodes are reset to NONE and removed from both lists, then a new
        TaskThread is started on what remains.
        Note: this is done like this to avoid app freezing.
        """
        # Make sure to wait the end of the current thread
        self._thread.join()

        # Avoid restart if thread was globally stopped
        if self._blockRestart:
            self.blockRestart()
            return

        if self._thread._state != State.STOPPED:
            return

        # Iterate over a snapshot: removeNode(..., processList=True) mutates
        # self._nodesToProcess, and removing items from a list while iterating
        # over it skips the element that follows each removed one.
        for node in list(self._nodesToProcess):
            if node.getGlobalStatus() != Status.STOPPED:
                continue
            # Remove node from the computing list
            self.removeNode(node, displayList=False, processList=True)

            # Remove output nodes from display and computing lists
            for n in node.getOutputNodes(recursive=True):
                if n.getGlobalStatus() in (Status.ERROR, Status.SUBMITTED):
                    n.upgradeStatusTo(Status.NONE)
                    self.removeNode(n, displayList=True, processList=True)

        # Start a new thread with the remaining nodes to compute
        self._thread = TaskThread(self)
        self._thread.start()

    def _refreshDisplayList(self):
        """
        Reset the display list before a new computation or submission.

        Does nothing while the local thread is RUNNING. Otherwise, the display
        list is cleared. The extern nodes are displayed again if at least one
        of them is still submitted. If none is, the extern list is dropped.
        """
        if self._thread._state == State.RUNNING:
            return
        self._nodes.clear()
        if any(node.isAlreadySubmitted() for node in self._nodesExtern):
            self._nodes.update(self._nodesExtern)
        else:
            self._nodesExtern = []

    def _watchNodeDestruction(self, node):
        """Remove ``node`` from the display list once its ``destroyed`` signal fires."""
        # The name is captured now (default argument) rather than read from the node when the signal fires.
        node.destroyed.connect(lambda obj=None, name=node.name: self.onNodeDestroyed(obj, name))

    def compute(self, graph=None, toNodes=None, forceCompute=False, forceStatus=False):
        """
        Start graph computation, from root nodes to leaves - or nodes in 'toNodes' if specified.
        Computation tasks (NodeChunk) happen in a separate thread (see TaskThread).

        :param graph: the graph to consider.
        :param toNodes: specific leaves, all graph leaves if None.
        :param forceCompute: force the computation despite nodes status.
        :param forceStatus: force the computation even if some nodes are submitted externally.
        :raises RuntimeError: if some nodes are compatibility nodes, or if some chunks are
            already submitted elsewhere and ``forceStatus`` is False.
        """
        self._graph = graph

        self._refreshDisplayList()

        if forceCompute:
            nodes, _ = graph.dfsOnFinish(startNodes=toNodes)
        else:
            nodes, _ = graph.dfsToProcess(startNodes=toNodes)
        if not nodes:
            logging.warning('Nothing to compute')
            return

        self.checkCompatibilityNodes(graph, nodes, "COMPUTATION")  # name of the context is important for QML

        nodes = [node for node in nodes if not self.contains(node)]  # be sure to avoid non-real conflicts
        chunksInConflict = self.getAlreadySubmittedChunks(nodes)

        if chunksInConflict:
            chunksStatus = {chunk.status.status.name for chunk in chunksInConflict}
            chunksName = [chunk.name for chunk in chunksInConflict]
            # Warning: Syntax and terms are parsed on QML side to recognize the error
            # Syntax : [Context] ErrorType: ErrorMessage
            msg = ('[COMPUTATION] Already Submitted:\n'
                   'WARNING - Some nodes are already submitted with status: {}\nNodes: {}').format(
                ', '.join(chunksStatus),
                ', '.join(chunksName)
            )
            if forceStatus:
                logging.warning(msg)
            else:
                raise RuntimeError(msg)

        for node in nodes:
            self._watchNodeDestruction(node)
            node.beginSequence(forceCompute)

        self._nodes.update(nodes)
        self._nodesToProcess.extend(nodes)

        if self._thread._state == State.IDLE:
            self._thread.start()
        elif self._thread._state in (State.DEAD, State.ERROR):
            # A finished thread cannot be started again: spawn a new one
            self._thread = TaskThread(self)
            self._thread.start()

    def onNodeDestroyed(self, obj, name):
        """
        Remove node from the taskmanager when it's destroyed in the graph.

        :param obj: object passed by the ``destroyed`` signal (unused).
        :param name: name of the destroyed node, captured when the signal was connected.
        """
        if name in self._nodes.keys():
            self._nodes.pop(name)

    def contains(self, node):
        """ Check if the node object belongs to the display list. """
        return node in self._nodes.values()

    def containsNodeName(self, name):
        """ Check if a node with the argument name belongs to the display list. """
        return name in self._nodes.keys()

    def removeNode(self, node, displayList=True, processList=False, externList=False):
        """ Remove node from the Task Manager.

        Args:
            node (Node): node to remove.
            displayList (bool): remove from the display list.
            processList (bool): remove from the nodesToProcess list.
            externList (bool): remove from the nodesExtern list.
        """
        if displayList and self._nodes.contains(node):
            self._nodes.pop(node.name)
        if processList and node in self._nodesToProcess:
            self._nodesToProcess.remove(node)
        if externList and node in self._nodesExtern:
            self._nodesExtern.remove(node)

    def clear(self):
        """
        Remove all the nodes from the taskmanager (display, extern and process lists).
        """
        self._nodes.clear()
        self._nodesExtern = []
        self._nodesToProcess = []

    def update(self, graph):
        """
        Add all the nodes that are being rendered in a renderfarm to the taskmanager when new graph is loaded.

        A node is added to the display and extern lists when it is already
        submitted, has at least one chunk and is extern.

        :param graph: the newly loaded graph.
        """
        for node in graph._nodes:
            if node.isAlreadySubmitted() and node._chunks.size() > 0 and node.isExtern():
                self._nodes.add(node)
                self._nodesExtern.append(node)

    def checkCompatibilityNodes(self, graph, nodes, context):
        """
        Refuse to go on if some of ``nodes`` are compatibility nodes of ``graph``.

        :param graph: the graph owning the nodes.
        :param nodes: nodes about to be computed or submitted.
        :param context: error context label ("COMPUTATION", "SUBMITTING"), parsed on the QML side.
        :raises RuntimeError: listing the sorted labels of the incompatible nodes.
        """
        compatNodes = []
        for node in nodes:
            if node in graph._compatibilityNodes.values():
                compatNodes.append(node.nameToLabel(node.name))
        if compatNodes:
            # Warning: Syntax and terms are parsed on QML side to recognize the error
            # Syntax : [Context] ErrorType: ErrorMessage
            raise RuntimeError("[{}] Compatibility Issue:\n"
                               "Cannot compute because of these incompatible nodes:\n"
                               "{}".format(context, sorted(compatNodes)))

    def submit(self, graph=None, submitter=None, toNodes=None):
        """
        Send nodes to the renderfarm.

        :param graph: the graph to consider.
        :param submitter: name of the submitter to use; may be omitted when exactly one is registered.
        :param toNodes: specific leaves, all graph leaves if None.
        :raises RuntimeError: if no submitter can be resolved, or if some nodes are compatibility nodes.
        """
        # Ensure submitter is properly set
        sub = None
        if submitter:
            sub = meshroom.core.submitters.get(submitter, None)
        elif len(meshroom.core.submitters) == 1:
            # if only one submitter available use it
            sub = list(meshroom.core.submitters.values())[0]
        if sub is None:
            # Warning: Syntax and terms are parsed on QML side to recognize the error
            # Syntax : [Context] ErrorType: ErrorMessage
            raise RuntimeError("[SUBMITTING] Unknown Submitter:\n"
                               "Unknown Submitter called '{submitter}'. Available submitters are: '{allSubmitters}'.".format(
                                   submitter=submitter,
                                   allSubmitters=str(meshroom.core.submitters.keys())
                               ))

        # Update task manager's lists
        self._refreshDisplayList()

        nodesToProcess, edgesToProcess = graph.dfsToProcess(startNodes=toNodes)
        if not nodesToProcess:
            logging.warning('Nothing to compute')
            return

        self.checkCompatibilityNodes(graph, nodesToProcess, "SUBMITTING")  # name of the context is important for QML

        flowEdges = graph.flowEdges(startNodes=toNodes)
        edgesToProcess = set(edgesToProcess).intersection(flowEdges)

        logging.info("Nodes to process: {}".format(nodesToProcess))
        logging.info("Edges to process: {}".format(edgesToProcess))

        # Best effort: a submitter failure is logged, not propagated
        try:
            res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath)
            if res:
                for node in nodesToProcess:
                    self._watchNodeDestruction(node)
                    node.submit()  # update node status
                self._nodes.update(nodesToProcess)
                self._nodesExtern.extend(nodesToProcess)
        except Exception as e:
            logging.error("Error on submit : {}".format(e))

    def submitFromFile(self, graphFile, submitter, toNode=None):
        """
        Submit the given graph via the given submitter.

        :param graphFile: path of the graph file to load.
        :param submitter: name of the submitter to use.
        :param toNode: optional name of the leaf node to submit up to.
        """
        graph = meshroom.core.graph.loadGraph(graphFile)
        toNodes = graph.findNodes([toNode]) if toNode else None
        self.submit(graph, submitter, toNodes)

    def getAlreadySubmittedChunks(self, nodes):
        """
        Check if nodes have already been submitted in another Meshroom instance.

        :param nodes: nodes whose chunks are inspected.
        :return: chunks already submitted/running whose status node name is not in the display list.
        """
        out = []
        for node in nodes:
            for chunk in node.chunks:
                # Already submitted/running chunks in another task manager
                if chunk.isAlreadySubmitted() and not self.containsNodeName(chunk.statusNodeName):
                    out.append(chunk)
        return out

    nodes = Property(BaseObject, lambda self: self._nodes, constant=True)
    restartRequested = Signal()