[core] TaskManager: restart/block thread when stop computing

- If global stop: block restarting thread.
- If local stop: restart thread with the remaining nodes to compute.

This implementation permits to avoid app freezing when a local stop occurs.
This commit is contained in:
Julien-Haudegond 2020-08-31 13:13:59 +02:00
parent f502625c0b
commit 58790e3bce
2 changed files with 87 additions and 29 deletions

View file

@ -3,7 +3,8 @@ from threading import Thread
from enum import Enum from enum import Enum
import meshroom import meshroom
from meshroom.common import BaseObject, DictModel, Property from meshroom.common import BaseObject, DictModel, Property, Signal, Slot
from meshroom.core.node import Status
class State(Enum): class State(Enum):
@ -34,6 +35,8 @@ class TaskThread(Thread):
""" Consume compute tasks. """ """ Consume compute tasks. """
self._state = State.RUNNING self._state = State.RUNNING
stopAndRestart = False
for nId, node in enumerate(self._manager._nodesToProcess): for nId, node in enumerate(self._manager._nodesToProcess):
# skip already finished/running nodes # skip already finished/running nodes
@ -61,7 +64,8 @@ class TaskThread(Thread):
chunk.process(self.forceCompute) chunk.process(self.forceCompute)
except Exception as e: except Exception as e:
if chunk.isStopped(): if chunk.isStopped():
pass stopAndRestart = True
break
else: else:
logging.error("Error on node computation: {}".format(e)) logging.error("Error on node computation: {}".format(e))
nodesToRemove, _ = self._manager._graph.nodesFromNode(node) nodesToRemove, _ = self._manager._graph.nodesFromNode(node)
@ -74,6 +78,13 @@ class TaskThread(Thread):
pass pass
n.clearSubmittedChunks() n.clearSubmittedChunks()
if stopAndRestart:
break
if stopAndRestart:
self._state = State.STOPPED
self._manager.restartRequested.emit()
else:
self._manager._nodesToProcess = [] self._manager._nodesToProcess = []
self._state = State.DEAD self._state = State.DEAD
@ -91,6 +102,60 @@ class TaskManager(BaseObject):
# internal thread in which local tasks are executed # internal thread in which local tasks are executed
self._thread = TaskThread(self) self._thread = TaskThread(self)
self._blockRestart = False
self.restartRequested.connect(self.restart)
def requestBlockRestart(self):
"""
Block computing.
Note: should only be used to completely stop computing.
"""
self._blockRestart = True
def blockRestart(self):
""" Avoid the automatic restart of computing. """
for node in self._nodesToProcess:
if node.getGlobalStatus() in (Status.SUBMITTED, Status.ERROR):
node.upgradeStatusTo(Status.NONE)
self.removeNode(node, displayList=True)
self._blockRestart = False
self._nodesToProcess = []
self._thread._state = State.DEAD
@Slot()
def restart(self):
"""
Restart computing when thread has been stopped.
Note: this is done like this to avoid app freezing.
"""
# Make sure to wait the end of the current thread
self._thread.join()
# Avoid restart if thread was globally stopped
if self._blockRestart:
self.blockRestart()
return
if self._thread._state != State.STOPPED:
return
for node in self._nodesToProcess:
if node.getGlobalStatus() == Status.STOPPED:
# Remove node from the computing list
self.removeNode(node, displayList=False, processList=True)
# Remove output nodes from display and computing lists
outputNodes = node.getOutputNodes(recursive=True)
for n in outputNodes:
if n.getGlobalStatus() in (Status.ERROR, Status.SUBMITTED):
n.upgradeStatusTo(Status.NONE)
self.removeNode(n, displayList=True, processList=True)
# Start a new thread with the remaining nodes to compute
self._thread = TaskThread(self)
self._thread.start()
def compute(self, graph=None, toNodes=None, forceCompute=False, forceStatus=False): def compute(self, graph=None, toNodes=None, forceCompute=False, forceStatus=False):
""" """
Start graph computation, from root nodes to leaves - or nodes in 'toNodes' if specified. Start graph computation, from root nodes to leaves - or nodes in 'toNodes' if specified.
@ -116,7 +181,7 @@ class TaskManager(BaseObject):
else: else:
nodes, edges = graph.dfsToProcess(startNodes=toNodes) nodes, edges = graph.dfsToProcess(startNodes=toNodes)
nodes = [node for node in nodes if not self.contains(node)] # be sure to avoid non-real conflicts nodes = [node for node in nodes if not self.contains(node)] # be sure to avoid non-real conflicts
chunksInConflict = getAlreadySubmittedChunks(nodes) chunksInConflict = self.getAlreadySubmittedChunks(nodes)
if chunksInConflict: if chunksInConflict:
chunksStatus = set([chunk.status.status.name for chunk in chunksInConflict]) chunksStatus = set([chunk.status.status.name for chunk in chunksInConflict])
@ -242,18 +307,19 @@ class TaskManager(BaseObject):
except Exception as e: except Exception as e:
logging.error("Error on submit : {}".format(e)) logging.error("Error on submit : {}".format(e))
nodes = Property(BaseObject, lambda self: self._nodes, constant=True) def getAlreadySubmittedChunks(self, nodes):
def getAlreadySubmittedChunks(nodes):
""" """
Check if nodes already have been submitted Check if nodes have already been submitted in another Meshroom instance.
:param nodes: :param nodes:
:return: :return:
""" """
out = [] out = []
for node in nodes: for node in nodes:
for chunk in node.chunks: for chunk in node.chunks:
if chunk.isAlreadySubmitted(): # Already submitted/running chunks in another task manager
if chunk.isAlreadySubmitted() and not self.containsNodeName(chunk.statusNodeName):
out.append(chunk) out.append(chunk)
return out return out
nodes = Property(BaseObject, lambda self: self._nodes, constant=True)
restartRequested = Signal()

View file

@ -379,8 +379,10 @@ class UIGraph(QObject):
def stopExecution(self): def stopExecution(self):
if not self.isComputingLocally(): if not self.isComputingLocally():
return return
self._taskManager.requestBlockRestart()
self._graph.stopExecution() self._graph.stopExecution()
self._taskManager._thread.join() self._taskManager._thread.join()
self.computeStatusChanged.emit() self.computeStatusChanged.emit()
@Slot(Node) @Slot(Node)
@ -393,16 +395,6 @@ class UIGraph(QObject):
node.stopComputation() node.stopComputation()
self._taskManager._thread.join() self._taskManager._thread.join()
# If some dependent nodes are submitted, the first one will be in error
# Make sure to remove those nodes from the Task Manager list
outputNodes = node.getOutputNodes(recursive=True)
for n in outputNodes:
if n.getGlobalStatus() == Status.ERROR:
n.upgradeStatusTo(Status.NONE)
self._taskManager.removeNode(n)
self.computeStatusChanged.emit()
@Slot(Node) @Slot(Node)
def cancelNodeComputation(self, node): def cancelNodeComputation(self, node):
""" Cancel the computation of the node and all the nodes depending on it. """ """ Cancel the computation of the node and all the nodes depending on it. """