[core] stop graph execution mechanism

This commit is contained in:
Yann Lanthony 2017-10-13 15:17:34 +02:00
parent 0a69e9f344
commit e494a63c9d

View file

@ -219,6 +219,7 @@ class Node(BaseObject):
self.attribute(k)._value = v
self.status = StatusData(self.name, self.nodeType())
self.statistics = stats.Statistics()
self._subprocess = None
def __getattr__(self, k):
try:
@ -407,6 +408,10 @@ class Node(BaseObject):
def beginSequence(self):
self.upgradeStatusTo(Status.SUBMITTED_LOCAL)
def stopProcess(self):
if self._subprocess:
self._subprocess.terminate()
def process(self):
self.upgradeStatusTo(Status.RUNNING)
statThread = stats.StatisticsThread(self)
@ -416,20 +421,20 @@ class Node(BaseObject):
cmd = self.commandLine()
print('\n =====> commandLine:\n', cmd, '\n')
print(' - logFile: ', self.logFile())
self.proc = psutil.Popen(cmd, stdout=logF, stderr=logF, shell=True)
self._subprocess = psutil.Popen(cmd, stdout=logF, stderr=logF, shell=True)
# store process static info into the status file
self.status.commandLine = cmd
# self.status.env = self.proc.environ()
# self.status.createTime = self.proc.create_time()
statThread.proc = self.proc
stdout, stderr = self.proc.communicate()
self.proc.wait()
statThread.proc = self._subprocess
stdout, stderr = self._subprocess.communicate()
self._subprocess.wait()
self.status.returnCode = self.proc.returncode
self.status.returnCode = self._subprocess.returncode
if self.proc.returncode != 0:
if self._subprocess.returncode != 0:
logContent = ''
with open(self.logFile(), 'r') as logF:
logContent = ''.join(logF.readlines())
@ -438,9 +443,11 @@ class Node(BaseObject):
except:
self.upgradeStatusTo(Status.ERROR)
raise
statThread.running = False
# Don't need to join, the thread will finish a bit later.
# statThread.join()
finally:
statThread.running = False
# Don't need to join, the thread will finish a bit later.
# statThread.join()
self._subprocess = None
self.upgradeStatusTo(Status.SUCCESS)
@ -548,6 +555,7 @@ class Graph(BaseObject):
node._name = self._createUniqueNodeName(node.nodeType())
node.graph = self
self._nodes.add(node)
self.stopExecutionRequested.connect(node.stopProcess)
# Trigger internal update when an attribute is modified
for attr in node.attributes: # type: Attribute
@ -732,6 +740,10 @@ class Graph(BaseObject):
self.updateInternals()
self.updateStatusFromCache()
def stopExecution(self):
""" Request graph execution to be stopped """
self.stopExecutionRequested.emit()
@property
def nodes(self):
return self._nodes
@ -743,6 +755,7 @@ class Graph(BaseObject):
nodes = Property(BaseObject, nodes.fget, constant=True)
edges = Property(BaseObject, edges.fget, constant=True)
stopExecutionRequested = Signal()
def loadGraph(filepath):
"""