Create TaskManager with a task overview

Create a seperate class to handle the logic of computing nodes
+ An UI overview with all submitted nodes
This commit is contained in:
Lee Geertsen 2019-08-12 18:34:48 +02:00 committed by Yann Lanthony
parent bc1eb83d92
commit 51d6c18840
No known key found for this signature in database
GPG key ID: 519FAE6DF7A70642
11 changed files with 579 additions and 76 deletions

View file

@ -1107,54 +1107,6 @@ def getAlreadySubmittedChunks(nodes):
out.append(chunk)
return out
def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False):
"""
"""
if forceCompute:
nodes, edges = graph.dfsOnFinish(startNodes=toNodes)
else:
nodes, edges = graph.dfsToProcess(startNodes=toNodes)
chunksInConflict = getAlreadySubmittedChunks(nodes)
if chunksInConflict:
chunksStatus = set([chunk.status.status.name for chunk in chunksInConflict])
chunksName = [node.name for node in chunksInConflict]
msg = 'WARNING: Some nodes are already submitted with status: {}\nNodes: {}'.format(
', '.join(chunksStatus),
', '.join(chunksName)
)
if forceStatus:
print(msg)
else:
raise RuntimeError(msg)
print('Nodes to execute: ', str([n.name for n in nodes]))
for node in nodes:
node.beginSequence(forceCompute)
for n, node in enumerate(nodes):
try:
multiChunks = len(node.chunks) > 1
for c, chunk in enumerate(node.chunks):
if multiChunks:
print('\n[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
node=n+1, nbNodes=len(nodes),
chunk=c+1, nbChunks=len(node.chunks), nodeName=node.nodeType))
else:
print('\n[{node}/{nbNodes}] {nodeName}'.format(
node=n + 1, nbNodes=len(nodes), nodeName=node.nodeType))
chunk.process(forceCompute)
except Exception as e:
logging.error("Error on node computation: {}".format(e))
graph.clearSubmittedNodes()
raise
for node in nodes:
node.endSequence()
def submitGraph(graph, submitter, toNodes=None):
nodesToProcess, edgesToProcess = graph.dfsToProcess(startNodes=toNodes)
flowEdges = graph.flowEdges(startNodes=toNodes)