[core] improve the update of the node status

This commit is contained in:
Fabien Castan 2025-03-28 20:23:01 +01:00
parent 8be90ce362
commit ea3f87b041

View file

@ -80,6 +80,20 @@ class StatusData(BaseObject):
self.resetDynamicValues() self.resetDynamicValues()
def setNode(self, node):
""" Set the node information from one node instance."""
self.nodeName = node.name
self.setNodeType(node)
def setNodeType(self, node):
""" Set the node type and package information from the given node.
We do not set the name in this method as it may vary if there are duplicates.
"""
self.nodeType = node.nodeType
self.packageName = node.packageName
self.packageVersion = node.packageVersion
self.mrNodeType = node.getMrNodeType()
def merge(self, other): def merge(self, other):
self.startDateTime = min(self.startDateTime, other.startDateTime) self.startDateTime = min(self.startDateTime, other.startDateTime)
self.endDateTime = max(self.endDateTime, other.endDateTime) self.endDateTime = max(self.endDateTime, other.endDateTime)
@ -112,12 +126,15 @@ class StatusData(BaseObject):
self._startTime = time.time() self._startTime = time.time()
self.startDateTime = datetime.datetime.now().strftime(self.dateTimeFormatting) self.startDateTime = datetime.datetime.now().strftime(self.dateTimeFormatting)
# to get datetime obj: datetime.datetime.strptime(obj, self.dateTimeFormatting) # to get datetime obj: datetime.datetime.strptime(obj, self.dateTimeFormatting)
self.status = Status.RUNNING
self.execMode = ExecMode.LOCAL
def initIsolatedCompute(self): def initIsolatedCompute(self):
''' When submitting a node, we reset the status information to ensure that we do not keep outdated information. ''' When submitting a node, we reset the status information to ensure that we do not keep outdated information.
''' '''
self.resetDynamicValues() self.resetDynamicValues()
# assert(self.mrNodeType == MrNodeType.NODE) self.initStartCompute()
assert(self.mrNodeType == MrNodeType.NODE)
self.sessionUid = None self.sessionUid = None
self.submitterSessionUid = meshroom.core.sessionUid self.submitterSessionUid = meshroom.core.sessionUid
@ -337,16 +354,19 @@ class NodeChunk(BaseObject):
if not os.path.exists(statusFile): if not os.path.exists(statusFile):
self.statusFileLastModTime = -1 self.statusFileLastModTime = -1
self._status.reset() self._status.reset()
self._status.setNodeType(self.node)
else: else:
try: try:
with open(statusFile, 'r') as jsonFile: with open(statusFile, 'r') as jsonFile:
statusData = json.load(jsonFile) statusData = json.load(jsonFile)
# logging.debug(f"updateStatusFromCache({self.node.name}): From status {self.status.status} to {statusData['status']}") # logging.debug(f"updateStatusFromCache({self.node.name}): From status {self.status.status} to {statusData['status']}")
self.status.fromDict(statusData) self._status.fromDict(statusData)
self.statusFileLastModTime = os.path.getmtime(statusFile) self.statusFileLastModTime = os.path.getmtime(statusFile)
except Exception: except Exception as e:
logging.debug(f"updateStatusFromCache({self.node.name}): Error while loading status file {statusFile}: {e}")
self.statusFileLastModTime = -1 self.statusFileLastModTime = -1
self.status.reset() self._status.reset()
self._status.setNodeType(self.node)
if oldStatus != self.status.status: if oldStatus != self.status.status:
self.statusChanged.emit() self.statusChanged.emit()
@ -386,6 +406,12 @@ class NodeChunk(BaseObject):
json.dump(data, jsonFile, indent=4) json.dump(data, jsonFile, indent=4)
renameWritingToFinalPath(statusFilepathWriting, statusFilepath) renameWritingToFinalPath(statusFilepathWriting, statusFilepath)
def upgradeStatusFile(self):
""" Upgrade node status file based on the current status.
"""
self.saveStatusFile()
self.statusChanged.emit()
def upgradeStatusTo(self, newStatus, execMode=None): def upgradeStatusTo(self, newStatus, execMode=None):
if newStatus.value < self._status.status.value: if newStatus.value < self._status.status.value:
logging.warning(f"Downgrade status on node '{self.name}' from {self._status.status} to {newStatus}") logging.warning(f"Downgrade status on node '{self.name}' from {self._status.status} to {newStatus}")
@ -394,8 +420,7 @@ class NodeChunk(BaseObject):
self._status.execMode = execMode self._status.execMode = execMode
self.execModeNameChanged.emit() self.execModeNameChanged.emit()
self._status.status = newStatus self._status.status = newStatus
self.saveStatusFile() self.upgradeStatusFile()
self.statusChanged.emit()
def updateStatisticsFromCache(self): def updateStatisticsFromCache(self):
""" """
@ -452,9 +477,10 @@ class NodeChunk(BaseObject):
global runningProcesses global runningProcesses
runningProcesses[self.name] = self runningProcesses[self.name] = self
self._status.setNode(self.node)
self._status.initStartCompute() self._status.initStartCompute()
self.upgradeStatusFile()
executionStatus = None executionStatus = None
self.upgradeStatusTo(Status.RUNNING)
self.statThread = stats.StatisticsThread(self) self.statThread = stats.StatisticsThread(self)
self.statThread.start() self.statThread.start()
try: try:
@ -471,7 +497,10 @@ class NodeChunk(BaseObject):
executionStatus = Status.STOPPED executionStatus = Status.STOPPED
raise raise
finally: finally:
self._status.setNode(self.node)
self._status.initEndCompute() self._status.initEndCompute()
self.upgradeStatusFile()
if executionStatus: if executionStatus:
self.upgradeStatusTo(executionStatus) self.upgradeStatusTo(executionStatus)
logging.info(" - elapsed time: {}".format(self._status.elapsedTimeStr)) logging.info(" - elapsed time: {}".format(self._status.elapsedTimeStr))
@ -485,8 +514,10 @@ class NodeChunk(BaseObject):
def _processInIsolatedEnvironment(self): def _processInIsolatedEnvironment(self):
"""Process this node chunk in the isolated environment defined in the environment configuration.""" """Process this node chunk in the isolated environment defined in the environment configuration."""
try: try:
self._status.setNode(self.node)
self._status.initIsolatedCompute() self._status.initIsolatedCompute()
self.upgradeStatusTo(Status.RUNNING, execMode=ExecMode.LOCAL) self.upgradeStatusFile()
self.node.nodeDesc.processChunkInEnvironment(self) self.node.nodeDesc.processChunkInEnvironment(self)
except: except:
# status should be already updated by meshroom_compute # status should be already updated by meshroom_compute
@ -1151,14 +1182,16 @@ class BaseNode(BaseObject):
def submit(self, forceCompute=False): def submit(self, forceCompute=False):
for chunk in self._chunks: for chunk in self._chunks:
if forceCompute or chunk.status.status != Status.SUCCESS: if forceCompute or chunk.status.status != Status.SUCCESS:
chunk._status.setNode(self)
chunk._status.initExternSubmit() chunk._status.initExternSubmit()
chunk.upgradeStatusTo(Status.SUBMITTED, ExecMode.EXTERN) chunk.upgradeStatusFile()
def beginSequence(self, forceCompute=False): def beginSequence(self, forceCompute=False):
for chunk in self._chunks: for chunk in self._chunks:
if forceCompute or (chunk.status.status not in (Status.RUNNING, Status.SUCCESS)): if forceCompute or (chunk.status.status not in (Status.RUNNING, Status.SUCCESS)):
chunk._status.setNode(self)
chunk._status.initLocalSubmit() chunk._status.initLocalSubmit()
chunk.upgradeStatusTo(Status.SUBMITTED, ExecMode.LOCAL) chunk.upgradeStatusFile()
def processIteration(self, iteration): def processIteration(self, iteration):
self._chunks[iteration].process() self._chunks[iteration].process()