[core] support killed job

* add KILLED status
* register atexit function to update the status of running nodes to
KILLED
This commit is contained in:
Fabien Castan 2017-10-26 19:56:58 +02:00
parent 99ec7ab386
commit 6f2ea4c8ef

View file

@ -1,5 +1,6 @@
from __future__ import print_function
import atexit
import collections
import hashlib
import json
@ -283,7 +284,8 @@ class Status(Enum):
SUBMITTED_LOCAL = 3
RUNNING = 4
ERROR = 5
SUCCESS = 6
KILLED = 6
SUCCESS = 7
class StatusData:
@ -310,6 +312,15 @@ class StatusData:
self.env = d.get('env', '')
runningProcesses = {}
@atexit.register
def clearProcessesStatus():
global runningProcesses
for k, v in runningProcesses.iteritems():
v.upgradeStatusTo(Status.KILLED)
class Node(BaseObject):
"""
"""
@ -557,6 +568,8 @@ class Node(BaseObject):
self._subprocess.terminate()
def process(self):
global runningProcesses
runningProcesses[self.uid()] = self
self.upgradeStatusTo(Status.RUNNING)
statThread = stats.StatisticsThread(self)
statThread.start()
@ -582,10 +595,10 @@ class Node(BaseObject):
if self._subprocess.returncode != 0:
with open(self.logFile(), 'r') as logF:
logContent = ''.join(logF.readlines())
self.upgradeStatusTo(Status.ERROR)
raise RuntimeError('Error on node "{}":\nLog:\n{}'.format(self.name, logContent))
except Exception:
except BaseException:
self.upgradeStatusTo(Status.ERROR)
del runningProcesses[self.uid()]
raise
finally:
elapsedTime = time.time() - startTime
@ -596,6 +609,7 @@ class Node(BaseObject):
statThread.join()
self.upgradeStatusTo(Status.SUCCESS)
del runningProcesses[self.uid()]
def endSequence(self):
pass