[core] do not use the task manager in batch command line tools

Avoid usage of multi-threading to launch the computation in command line
tools without interactive needs
This commit is contained in:
Fabien Castan 2021-10-19 15:48:07 +02:00
parent 1c66c6d9ab
commit ae0c65f563
4 changed files with 112 additions and 12 deletions

View file

@ -1185,6 +1185,15 @@ class Graph(BaseObject):
self.updateStatusFromCache(force=True)
self.cacheDirChanged.emit()
def setVerbose(self, v):
with GraphModification(self):
for node in self._nodes:
if node.hasAttribute('verbose'):
try:
node.verbose.value = v
except:
pass
nodes = Property(BaseObject, nodes.fget, constant=True)
edges = Property(BaseObject, edges.fget, constant=True)
filepathChanged = Signal()
@ -1204,3 +1213,100 @@ def loadGraph(filepath):
graph.load(filepath)
graph.update()
return graph
def getAlreadySubmittedChunks(nodes):
out = []
for node in nodes:
for chunk in node.chunks:
if chunk.isAlreadySubmitted():
out.append(chunk)
return out
def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False):
"""
"""
if forceCompute:
nodes, edges = graph.dfsOnFinish(startNodes=toNodes)
else:
nodes, edges = graph.dfsToProcess(startNodes=toNodes)
chunksInConflict = getAlreadySubmittedChunks(nodes)
if chunksInConflict:
chunksStatus = set([chunk.status.status.name for chunk in chunksInConflict])
chunksName = [node.name for node in chunksInConflict]
msg = 'WARNING: Some nodes are already submitted with status: {}\nNodes: {}'.format(
', '.join(chunksStatus),
', '.join(chunksName)
)
if forceStatus:
print(msg)
else:
raise RuntimeError(msg)
print('Nodes to execute: ', str([n.name for n in nodes]))
for node in nodes:
node.beginSequence(forceCompute)
for n, node in enumerate(nodes):
try:
multiChunks = len(node.chunks) > 1
for c, chunk in enumerate(node.chunks):
if multiChunks:
print('\n[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
node=n+1, nbNodes=len(nodes),
chunk=c+1, nbChunks=len(node.chunks), nodeName=node.nodeType))
else:
print('\n[{node}/{nbNodes}] {nodeName}'.format(
node=n + 1, nbNodes=len(nodes), nodeName=node.nodeType))
chunk.process(forceCompute)
except Exception as e:
logging.error("Error on node computation: {}".format(e))
graph.clearSubmittedNodes()
raise
for node in nodes:
node.endSequence()
def submitGraph(graph, submitter, toNodes=None):
nodesToProcess, edgesToProcess = graph.dfsToProcess(startNodes=toNodes)
flowEdges = graph.flowEdges(startNodes=toNodes)
edgesToProcess = set(edgesToProcess).intersection(flowEdges)
if not nodesToProcess:
logging.warning('Nothing to compute')
return
logging.info("Nodes to process: {}".format(edgesToProcess))
logging.info("Edges to process: {}".format(edgesToProcess))
sub = None
if submitter:
sub = meshroom.core.submitters.get(submitter, None)
elif len(meshroom.core.submitters) == 1:
# if only one submitter available use it
sub = meshroom.core.submitters.values()[0]
if sub is None:
raise RuntimeError("Unknown Submitter: '{submitter}'. Available submitters are: '{allSubmitters}'.".format(
submitter=submitter, allSubmitters=str(meshroom.core.submitters.keys())))
try:
res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath)
if res:
for node in nodesToProcess:
node.submit() # update node status
except Exception as e:
logging.error("Error on submit : {}".format(e))
def submit(graphFile, submitter, toNode=None):
"""
Submit the given graph via the given submitter.
"""
graph = loadGraph(graphFile)
toNodes = graph.findNodes([toNode]) if toNode else None
submitGraph(graph, submitter, toNodes)