Make Task Manager with Nodes submitted to a render farm

This commit is contained in:
Lee Geertsen 2019-08-27 17:46:12 +02:00 committed by Yann Lanthony
parent c00db25c23
commit 01974c23ec
No known key found for this signature in database
GPG key ID: 519FAE6DF7A70642
5 changed files with 108 additions and 57 deletions

View file

@ -1029,7 +1029,8 @@ class Graph(BaseObject):
def stopExecution(self): def stopExecution(self):
""" Request graph execution to be stopped by terminating running chunks""" """ Request graph execution to be stopped by terminating running chunks"""
for chunk in self.iterChunksByStatus(Status.RUNNING): for chunk in self.iterChunksByStatus(Status.RUNNING):
chunk.stopProcess() if not chunk.isExtern():
chunk.stopProcess()
@Slot() @Slot()
def clearSubmittedNodes(self): def clearSubmittedNodes(self):

View file

@ -307,6 +307,9 @@ class NodeChunk(BaseObject):
self.upgradeStatusTo(Status.STOPPED) self.upgradeStatusTo(Status.STOPPED)
self.node.nodeDesc.stopProcess(self) self.node.nodeDesc.stopProcess(self)
def isExtern(self):
return self.status.execMode == ExecMode.EXTERN
statusChanged = Signal() statusChanged = Signal()
statusName = Property(str, statusName.fget, notify=statusChanged) statusName = Property(str, statusName.fget, notify=statusChanged)
execModeNameChanged = Signal() execModeNameChanged = Signal()
@ -558,7 +561,8 @@ class BaseNode(BaseObject):
if the graph is still being computed. if the graph is still being computed.
""" """
for chunk in self.alreadySubmittedChunks(): for chunk in self.alreadySubmittedChunks():
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE) if not chunk.isExtern():
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
def upgradeStatusTo(self, newStatus): def upgradeStatusTo(self, newStatus):
""" """

View file

@ -2,6 +2,7 @@ import logging
from threading import Thread from threading import Thread
from enum import Enum from enum import Enum
import meshroom
from meshroom.common import BaseObject, DictModel, Property from meshroom.common import BaseObject, DictModel, Property
class State(Enum): class State(Enum):
@ -18,44 +19,50 @@ class TaskThread(Thread):
self._manager = manager self._manager = manager
self.forceCompute = False self.forceCompute = False
def isRunning(self):
return self._state == State.RUNNING
def run(self): def run(self):
self._state = State.RUNNING self._state = State.RUNNING
for n, node in enumerate(self._manager._nodesToProcess): for n, node in enumerate(self._manager._nodesToProcess):
if not node.isFinishedOrRunning(): if not node.isFinishedOrRunning():
#try: multiChunks = len(node.chunks) > 1
multiChunks = len(node.chunks) > 1 for c, chunk in enumerate(node.chunks):
for c, chunk in enumerate(node.chunks): if multiChunks:
if multiChunks: print('\n[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
print('\n[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format( node=n + 1, nbNodes=len(self._manager._nodesToProcess),
node=n + 1, nbNodes=len(self._manager._nodesToProcess), chunk=c + 1, nbChunks=len(node.chunks), nodeName=node.nodeType))
chunk=c + 1, nbChunks=len(node.chunks), nodeName=node.nodeType)) else:
else: print('\n[{node}/{nbNodes}] {nodeName}'.format(
print('\n[{node}/{nbNodes}] {nodeName}'.format( node=n + 1, nbNodes=len(self._manager._nodesToProcess), nodeName=node.nodeType))
node=n + 1, nbNodes=len(self._manager._nodesToProcess), nodeName=node.nodeType))
if not chunk.isFinishedOrRunning() and self._state == State.RUNNING: if not chunk.isFinishedOrRunning() and self._state == State.RUNNING:
try: try:
chunk.process(self.forceCompute) chunk.process(self.forceCompute)
except Exception as e: except Exception as e:
if chunk.isStopped(): if chunk.isStopped():
self._state = State.STOPPED self._state = State.STOPPED
self._manager._graph.clearSubmittedNodes() self._manager._graph.clearSubmittedNodes()
self._manager._nodesToProcess.clear() self._manager._nodesToProcess.clear()
else: else:
logging.error("Error on node computation: {}".format(e)) logging.error("Error on node computation: {}".format(e))
nodesToDelete, _ = self._manager._graph.nodesFromNode(node) nodesToDelete, _ = self._manager._graph.nodesFromNode(node)
for nodeD in nodesToDelete: for nodeD in nodesToDelete:
if nodeD != node: if nodeD != node:
try:
self._manager._nodesToProcess.remove(nodeD) self._manager._nodesToProcess.remove(nodeD)
nodeD.clearSubmittedChunks() except:
# Node already removed (for instance a global clear of _nodesToProcess)
self._state = State.DEAD pass
nodeD.clearSubmittedChunks()
self._manager._nodesToProcess.clear() self._manager._nodesToProcess.clear()
self._state = State.DEAD
self._manager._uigraph.computeStatusChanged.emit()
class TaskManager(BaseObject): class TaskManager(BaseObject):
@ -63,16 +70,30 @@ class TaskManager(BaseObject):
super(TaskManager, self).__init__(parent) super(TaskManager, self).__init__(parent)
self._nodes = DictModel(keyAttrName='name', parent=self) self._nodes = DictModel(keyAttrName='name', parent=self)
self._nodesToProcess = DictModel(keyAttrName='name', parent=self) self._nodesToProcess = DictModel(keyAttrName='name', parent=self)
self._nodesExtern = DictModel(keyAttrName='name', parent=self)
self._graph = None self._graph = None
self._thread = TaskThread(self) self._thread = TaskThread(self)
def addNodes(self, graph=None, toNodes=None, forceCompute=False, forceStatus=False): def compute(self, graph=None, toNodes=None, uigraph=None, forceCompute=False, forceStatus=False):
self._graph = graph self._graph = graph
self._uigraph = uigraph
logging.info(self._thread._state)
if self._thread._state in (State.IDLE, State.DEAD, State.ERROR, State.STOPPED): if self._thread._state in (State.IDLE, State.DEAD, State.ERROR, State.STOPPED):
self._nodes.clear() try:
self._nodes.clear()
except:
print("Task Manager nodes already empty")
externEmpty = True
for node in self._nodesExtern:
if node.isAlreadySubmitted():
externEmpty = False
break
if not externEmpty:
self._nodes.update(self._nodesExtern)
else:
self._nodesExtern.clear()
if forceCompute: if forceCompute:
nodes, edges = graph.dfsOnFinish(startNodes=toNodes) nodes, edges = graph.dfsOnFinish(startNodes=toNodes)
@ -88,7 +109,10 @@ class TaskManager(BaseObject):
for node in nodes: for node in nodes:
node.beginSequence(forceCompute) node.beginSequence(forceCompute)
self._nodes.update(nodes) try:
self._nodes.update(nodes)
except:
print("nodes already added to Task Manager")
self._nodesToProcess.update(nodes) self._nodesToProcess.update(nodes)
if self._thread._state == State.IDLE: if self._thread._state == State.IDLE:
@ -97,6 +121,39 @@ class TaskManager(BaseObject):
self._thread = TaskThread(self) self._thread = TaskThread(self)
self._thread.start() self._thread.start()
def submit(self, graph=None, submitter=None, toNodes=None):
if self._thread._state in (State.IDLE, State.DEAD, State.ERROR, State.STOPPED):
self._nodes.clear()
externEmpty = True
for node in self._nodesExtern:
if node.isAlreadySubmitted():
externEmpty = False
break
if not externEmpty:
self._nodes.update(self._nodesExtern)
else:
self._nodesExtern.clear()
nodesToProcess, edgesToProcess = graph.dfsToProcess(startNodes=toNodes)
flowEdges = graph.flowEdges(startNodes=toNodes)
edgesToProcess = set(edgesToProcess).intersection(flowEdges)
sub = meshroom.core.submitters.get(submitter, None)
if sub is None:
raise RuntimeError("Unknown Submitter : " + submitter)
try:
res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath)
if res:
for node in nodesToProcess:
node.submit() # update node status
self._nodes.update(nodesToProcess)
self._nodesExtern.update(nodesToProcess)
except Exception as e:
logging.error("Error on submit : {}".format(e))
nodes = Property(BaseObject, lambda self: self._nodes, constant=True) nodes = Property(BaseObject, lambda self: self._nodes, constant=True)
def getAlreadySubmittedChunks(nodes): def getAlreadySubmittedChunks(nodes):

View file

@ -352,7 +352,7 @@ class UIGraph(QObject):
@Slot(Node) @Slot(Node)
def execute(self, node=None): def execute(self, node=None):
nodes = [node] if node else None nodes = [node] if node else None
self._taskManager.addNodes(self._graph, nodes) self._taskManager.compute(self._graph, nodes, self)
def _execute(self, nodes): def _execute(self, nodes):
self.computeStatusChanged.emit() self.computeStatusChanged.emit()
@ -382,7 +382,7 @@ class UIGraph(QObject):
""" """
self.save() # graph must be saved before being submitted self.save() # graph must be saved before being submitted
node = [node] if node else None node = [node] if node else None
submitGraph(self._graph, os.environ.get('MESHROOM_DEFAULT_SUBMITTER', ''), node) self._taskManager.submit(self._graph, os.environ.get('MESHROOM_DEFAULT_SUBMITTER', ''), node)
def updateGraphComputingStatus(self): def updateGraphComputingStatus(self):
# update graph computing status # update graph computing status
@ -399,11 +399,11 @@ class UIGraph(QObject):
def isComputingExternally(self): def isComputingExternally(self):
""" Whether this graph is being computed externally. """ """ Whether this graph is being computed externally. """
return (self._running or self._submitted) and not self.isComputingLocally() return self._submitted
def isComputingLocally(self): def isComputingLocally(self):
""" Whether this graph is being computed locally (i.e computation can be stopped). """ """ Whether this graph is being computed locally (i.e computation can be stopped). """
return self._taskManager._thread.is_alive() return self._taskManager._thread.isRunning()
def push(self, command): def push(self, command):
""" Try and push the given command to the undo stack. """ Try and push the given command to the undo stack.

View file

@ -462,7 +462,6 @@ ApplicationWindow {
Row { Row {
// disable controls if graph is executed externally // disable controls if graph is executed externally
enabled: !_reconstruction.computingExternally
Layout.alignment: Qt.AlignHCenter Layout.alignment: Qt.AlignHCenter
Button { Button {
@ -471,7 +470,6 @@ ApplicationWindow {
palette.button: enabled ? buttonColor : disabledPalette.button palette.button: enabled ? buttonColor : disabledPalette.button
palette.window: enabled ? buttonColor : disabledPalette.window palette.window: enabled ? buttonColor : disabledPalette.window
palette.buttonText: enabled ? "white" : disabledPalette.buttonText palette.buttonText: enabled ? "white" : disabledPalette.buttonText
enabled: computeManager.canStartComputation
onClicked: computeManager.compute(null) onClicked: computeManager.compute(null)
} }
Button { Button {
@ -482,7 +480,6 @@ ApplicationWindow {
Item { width: 20; height: 1 } Item { width: 20; height: 1 }
Button { Button {
visible: _reconstruction.canSubmit visible: _reconstruction.canSubmit
enabled: computeManager.canSubmit
text: "Submit" text: "Submit"
onClicked: computeManager.submit(null) onClicked: computeManager.submit(null)
} }
@ -502,13 +499,6 @@ ApplicationWindow {
} }
} }
Label {
text: "Graph is being computed externally"
font.italic: true
Layout.alignment: Qt.AlignHCenter
visible: _reconstruction.computingExternally
}
// "ProgressBar" reflecting status of all the chunks in the graph, in their process order // "ProgressBar" reflecting status of all the chunks in the graph, in their process order
NodeChunks { NodeChunks {
id: chunksListView id: chunksListView
@ -636,23 +626,22 @@ ApplicationWindow {
width: Math.round(parent.width * 0.3) width: Math.round(parent.width * 0.3)
node: _reconstruction.selectedNode node: _reconstruction.selectedNode
// Make NodeEditor readOnly when computing // Make NodeEditor readOnly when computing
// readOnly: graphLocked
readOnly: { readOnly: {
if(! _reconstruction.computing) { if(! _reconstruction.computing) {
return false; return false;
} }
if(_reconstruction.taskManager.nodes.contains(_reconstruction.selectedNode)) { if(_reconstruction.selectedNode.globalStatus == "SUCCESS") {
return true; var nodes = uigraph.graph.onlyNodesFromNode(_reconstruction.selectedNode);
} else { for(var i = 0; i < nodes.length; i++) {
if(_reconstruction.selectedNode.globalStatus == "SUCCESS") { if(["SUBMITTED", "RUNNING"].includes(nodes[i].globalStatus) && nodes[i].chunks.at(0).statusNodeName == nodes[i].name) {
var nodes = _reconstruction.graph.onlyNodesFromNode(_reconstruction.selectedNode); return true;
for(var i = 0; i < nodes.length; i++) {
if(["SUBMITTED", "RUNNING"].includes(nodes[i].globalStatus) && nodes[i].chunks.at(0).statusNodeName == nodes[i].name) {
return true;
}
} }
} }
} else if(["SUBMITTED", "RUNNING"].includes(_reconstruction.selectedNode.globalStatus)) {
return true;
} else {
return false;
} }
return false; return false;