[ui] NodeEditor: refactor ChunksList and add global stats

This commit is contained in:
Fabien Castan 2021-01-22 09:40:07 +01:00
parent bd5d447d12
commit 831443c29d
9 changed files with 415 additions and 307 deletions

View file

@ -58,12 +58,12 @@ class ExecMode(Enum):
EXTERN = 2
class StatusData:
class StatusData(BaseObject):
"""
"""
dateTimeFormatting = '%Y-%m-%d %H:%M:%S.%f'
def __init__(self, nodeName, nodeType, packageName, packageVersion):
def __init__(self, nodeName='', nodeType='', packageName='', packageVersion=''):
self.status = Status.NONE
self.execMode = ExecMode.NONE
self.nodeName = nodeName
@ -79,6 +79,11 @@ class StatusData:
self.hostname = ""
self.sessionUid = meshroom.core.sessionUid
def merge(self, other):
self.startDateTime = min(self.startDateTime, other.startDateTime)
self.endDateTime = max(self.endDateTime, other.endDateTime)
self.elapsedTime += other.elapsedTime
def reset(self):
self.status = Status.NONE
self.execMode = ExecMode.NONE
@ -112,8 +117,12 @@ class StatusData:
return d
def fromDict(self, d):
self.status = getattr(Status, d.get('status', ''), Status.NONE)
self.execMode = getattr(ExecMode, d.get('execMode', ''), ExecMode.NONE)
self.status = d.get('status', Status.NONE)
if not isinstance(self.status, Status):
self.status = Status[self.status]
self.execMode = d.get('execMode', ExecMode.NONE)
if not isinstance(self.execMode, ExecMode):
self.execMode = ExecMode[self.execMode]
self.nodeName = d.get('nodeName', '')
self.nodeType = d.get('nodeType', '')
self.packageName = d.get('packageName', '')
@ -236,7 +245,7 @@ class NodeChunk(BaseObject):
self.node = node
self.range = range
self.logManager = LogManager(self)
self.status = StatusData(node.name, node.nodeType, node.packageName, node.packageVersion)
self._status = StatusData(node.name, node.nodeType, node.packageName, node.packageVersion)
self.statistics = stats.Statistics()
self.statusFileLastModTime = -1
self._subprocess = None
@ -258,7 +267,7 @@ class NodeChunk(BaseObject):
@property
def statusName(self):
return self.status.status.name
return self._status.status.name
@property
def logger(self):
@ -266,24 +275,24 @@ class NodeChunk(BaseObject):
@property
def execModeName(self):
return self.status.execMode.name
return self._status.execMode.name
def updateStatusFromCache(self):
"""
Update node status based on status file content/existence.
"""
statusFile = self.statusFile
oldStatus = self.status.status
oldStatus = self._status.status
# No status file => reset status to Status.None
if not os.path.exists(statusFile):
self.statusFileLastModTime = -1
self.status.reset()
self._status.reset()
else:
with open(statusFile, 'r') as jsonFile:
statusData = json.load(jsonFile)
self.status.fromDict(statusData)
self._status.fromDict(statusData)
self.statusFileLastModTime = os.path.getmtime(statusFile)
if oldStatus != self.status.status:
if oldStatus != self._status.status:
self.statusChanged.emit()
@property
@ -311,7 +320,7 @@ class NodeChunk(BaseObject):
"""
Write node status on disk.
"""
data = self.status.toDict()
data = self._status.toDict()
statusFilepath = self.statusFile
folder = os.path.dirname(statusFilepath)
if not os.path.exists(folder):
@ -322,16 +331,16 @@ class NodeChunk(BaseObject):
renameWritingToFinalPath(statusFilepathWriting, statusFilepath)
def upgradeStatusTo(self, newStatus, execMode=None):
if newStatus.value <= self.status.status.value:
print('WARNING: downgrade status on node "{}" from {} to {}'.format(self.name, self.status.status,
newStatus))
if newStatus.value <= self._status.status.value:
logging.warning('Downgrade status on node "{}" from {} to {}'.format(self.name, self._status.status,
newStatus))
if newStatus == Status.SUBMITTED:
self.status = StatusData(self.node.name, self.node.nodeType, self.node.packageName, self.node.packageVersion)
self._status = StatusData(self.node.name, self.node.nodeType, self.node.packageName, self.node.packageVersion)
if execMode is not None:
self.status.execMode = execMode
self._status.execMode = execMode
self.execModeNameChanged.emit()
self.status.status = newStatus
self._status.status = newStatus
self.saveStatusFile()
self.statusChanged.emit()
@ -360,24 +369,24 @@ class NodeChunk(BaseObject):
renameWritingToFinalPath(statisticsFilepathWriting, statisticsFilepath)
def isAlreadySubmitted(self):
return self.status.status in (Status.SUBMITTED, Status.RUNNING)
return self._status.status in (Status.SUBMITTED, Status.RUNNING)
def isAlreadySubmittedOrFinished(self):
return self.status.status in (Status.SUBMITTED, Status.RUNNING, Status.SUCCESS)
return self._status.status in (Status.SUBMITTED, Status.RUNNING, Status.SUCCESS)
def isFinishedOrRunning(self):
return self.status.status in (Status.SUCCESS, Status.RUNNING)
return self._status.status in (Status.SUCCESS, Status.RUNNING)
def isStopped(self):
return self.status.status == Status.STOPPED
return self._status.status == Status.STOPPED
def process(self, forceCompute=False):
if not forceCompute and self.status.status == Status.SUCCESS:
print("Node chunk already computed:", self.name)
if not forceCompute and self._status.status == Status.SUCCESS:
logging.info("Node chunk already computed: {}".format(self.name))
return
global runningProcesses
runningProcesses[self.name] = self
self.status.initStartCompute()
self._status.initStartCompute()
startTime = time.time()
self.upgradeStatusTo(Status.RUNNING)
self.statThread = stats.StatisticsThread(self)
@ -385,16 +394,16 @@ class NodeChunk(BaseObject):
try:
self.node.nodeDesc.processChunk(self)
except Exception as e:
if self.status.status != Status.STOPPED:
if self._status.status != Status.STOPPED:
self.upgradeStatusTo(Status.ERROR)
raise
except (KeyboardInterrupt, SystemError, GeneratorExit) as e:
self.upgradeStatusTo(Status.STOPPED)
raise
finally:
self.status.initEndCompute()
self.status.elapsedTime = time.time() - startTime
print(' - elapsed time:', self.status.elapsedTimeStr)
self._status.initEndCompute()
self._status.elapsedTime = time.time() - startTime
logging.info(' - elapsed time: {}'.format(self._status.elapsedTimeStr))
# ask and wait for the stats thread to stop
self.statThread.stopRequest()
self.statThread.join()
@ -408,9 +417,10 @@ class NodeChunk(BaseObject):
self.node.nodeDesc.stopProcess(self)
def isExtern(self):
return self.status.execMode == ExecMode.EXTERN
return self._status.execMode == ExecMode.EXTERN
statusChanged = Signal()
status = Property(Variant, lambda self: self._status, notify=statusChanged)
statusName = Property(str, statusName.fget, notify=statusChanged)
execModeNameChanged = Signal()
execModeName = Property(str, execModeName.fget, notify=execModeNameChanged)
@ -422,7 +432,7 @@ class NodeChunk(BaseObject):
statisticsFile = Property(str, statisticsFile.fget, notify=nodeFolderChanged)
nodeName = Property(str, lambda self: self.node.name, constant=True)
statusNodeName = Property(str, lambda self: self.status.nodeName, constant=True)
statusNodeName = Property(str, lambda self: self._status.nodeName, constant=True)
# simple structure for storing node position
@ -837,6 +847,24 @@ class BaseNode(BaseObject):
return Status.NONE
@Slot(result=StatusData)
def getFusedStatus(self):
fusedStatus = StatusData()
if self._chunks:
fusedStatus.fromDict(self._chunks[0].status.toDict())
for chunk in self._chunks[1:]:
fusedStatus.merge(chunk.status)
fusedStatus.status = self.getGlobalStatus()
return fusedStatus
@Slot(result=StatusData)
def getRecursiveFusedStatus(self):
fusedStatus = self.getFusedStatus()
nodes = self.getInputNodes(recursive=True, dependenciesOnly=True)
for node in nodes:
fusedStatus.merge(node.fusedStatus)
return fusedStatus
@property
def globalExecMode(self):
return self._chunks.at(0).execModeName
@ -1000,6 +1028,10 @@ class BaseNode(BaseObject):
size = Property(int, getSize, notify=sizeChanged)
globalStatusChanged = Signal()
globalStatus = Property(str, lambda self: self.getGlobalStatus().name, notify=globalStatusChanged)
fusedStatus = Property(StatusData, getFusedStatus, notify=globalStatusChanged)
elapsedTime = Property(float, lambda self: self.getFusedStatus().elapsedTime, notify=globalStatusChanged)
recursiveElapsedTime = Property(float, lambda self: self.getRecursiveFusedStatus().elapsedTime, notify=globalStatusChanged)
globalExecModeChanged = Signal()
globalExecMode = Property(str, globalExecMode.fget, notify=globalExecModeChanged)
isComputed = Property(bool, _isComputed, notify=globalStatusChanged)