Rely on the nodeDesc MrNodeType

This commit is contained in:
Fabien Castan 2025-03-26 11:36:24 +00:00
parent 1ca83fc6a9
commit 2f08448310
3 changed files with 55 additions and 33 deletions

View file

@ -85,7 +85,7 @@ if args.node:
if chunk.status.status in submittedStatuses: if chunk.status.status in submittedStatuses:
# Particular case for the local isolated, the node status is set to RUNNING by the submitter directly. # Particular case for the local isolated, the node status is set to RUNNING by the submitter directly.
# We ensure that no other instance has started to compute, by checking that the sessionUid is empty. # We ensure that no other instance has started to compute, by checking that the sessionUid is empty.
if chunk.status.mrNodeType == meshroom.core.MrNodeType.NODE and not chunk.status.sessionUid and chunk.status.submitterSessionUid: if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and not chunk.status.sessionUid and chunk.status.submitterSessionUid:
continue continue
print(f'Warning: Node is already submitted with status "{chunk.status.status.name}". See file: "{chunk.statusFile}". ExecMode: {chunk.status.execMode.name}, SessionUid: {chunk.status.sessionUid}, submitterSessionUid: {chunk.status.submitterSessionUid}') print(f'Warning: Node is already submitted with status "{chunk.status.status.name}". See file: "{chunk.statusFile}". ExecMode: {chunk.status.execMode.name}, SessionUid: {chunk.status.sessionUid}, submitterSessionUid: {chunk.status.submitterSessionUid}')
# sys.exit(-1) # sys.exit(-1)

View file

@ -16,9 +16,10 @@ from collections import namedtuple
from enum import Enum, auto from enum import Enum, auto
from typing import Callable, Optional from typing import Callable, Optional
import meshroom import meshroom
from meshroom.common import Signal, Variant, Property, BaseObject, Slot, ListModel, DictModel from meshroom.common import Signal, Variant, Property, BaseObject, Slot, ListModel, DictModel
from meshroom.core import desc, stats, hashValue, nodeVersion, Version from meshroom.core import desc, stats, hashValue, nodeVersion, Version, MrNodeType
from meshroom.core.attribute import attributeFactory, ListAttribute, GroupAttribute, Attribute from meshroom.core.attribute import attributeFactory, ListAttribute, GroupAttribute, Attribute
from meshroom.core.exception import NodeUpgradeError, UnknownNodeTypeError from meshroom.core.exception import NodeUpgradeError, UnknownNodeTypeError
@ -65,13 +66,14 @@ class StatusData(BaseObject):
""" """
dateTimeFormatting = '%Y-%m-%d %H:%M:%S.%f' dateTimeFormatting = '%Y-%m-%d %H:%M:%S.%f'
def __init__(self, nodeName='', nodeType='', packageName='', packageVersion='', parent: BaseObject = None): def __init__(self, nodeName='', nodeType='', packageName='', packageVersion='', mrNodeType: MrNodeType = MrNodeType.NONE, parent: BaseObject = None):
super(StatusData, self).__init__(parent) super(StatusData, self).__init__(parent)
self.reset() self.reset()
self.nodeName: str = nodeName self.nodeName: str = nodeName
self.nodeType: str = nodeType self.nodeType: str = nodeType
self.packageName: str = packageName self.packageName: str = packageName
self.packageVersion: str = packageVersion self.packageVersion: str = packageVersion
self.mrNodeType = mrNodeType
self.sessionUid: Optional[str] = meshroom.core.sessionUid self.sessionUid: Optional[str] = meshroom.core.sessionUid
self.submitterSessionUid: Optional[str] = None self.submitterSessionUid: Optional[str] = None
@ -85,7 +87,7 @@ class StatusData(BaseObject):
self.nodeType: str = "" self.nodeType: str = ""
self.packageName: str = "" self.packageName: str = ""
self.packageVersion: str = "" self.packageVersion: str = ""
self.mrNodeType: desc.MrNodeType = desc.MrNodeType.NONE self.mrNodeType: MrNodeType = MrNodeType.NONE
self.resetDynamicValues() self.resetDynamicValues()
def resetDynamicValues(self): def resetDynamicValues(self):
@ -112,16 +114,27 @@ class StatusData(BaseObject):
''' When submitting a node, we reset the status information to ensure that we do not keep outdated information. ''' When submitting a node, we reset the status information to ensure that we do not keep outdated information.
''' '''
self.resetDynamicValues() self.resetDynamicValues()
self.mrNodeType = desc.MrNodeType.NODE # assert(self.mrNodeType == MrNodeType.NODE)
self.sessionUid = None self.sessionUid = None
self.submitterSessionUid = meshroom.core.sessionUid self.submitterSessionUid = meshroom.core.sessionUid
def initSubmit(self): def initExternSubmit(self):
''' When submitting a node, we reset the status information to ensure that we do not keep outdated information. ''' When submitting a node, we reset the status information to ensure that we do not keep outdated information.
''' '''
self.resetDynamicValues() self.resetDynamicValues()
self.sessionUid = None self.sessionUid = None
self.submitterSessionUid = meshroom.core.sessionUid self.submitterSessionUid = meshroom.core.sessionUid
self.status = Status.SUBMITTED
self.execMode = ExecMode.EXTERN
def initLocalSubmit(self):
''' When submitting a node, we reset the status information to ensure that we do not keep outdated information.
'''
self.resetDynamicValues()
self.sessionUid = None
self.submitterSessionUid = meshroom.core.sessionUid
self.status = Status.SUBMITTED
self.execMode = ExecMode.LOCAL
def initEndCompute(self): def initEndCompute(self):
self.sessionUid = meshroom.core.sessionUid self.sessionUid = meshroom.core.sessionUid
@ -152,9 +165,9 @@ class StatusData(BaseObject):
self.execMode = d.get('execMode', ExecMode.NONE) self.execMode = d.get('execMode', ExecMode.NONE)
if not isinstance(self.execMode, ExecMode): if not isinstance(self.execMode, ExecMode):
self.execMode = ExecMode[self.execMode] self.execMode = ExecMode[self.execMode]
self.mrNodeType = d.get('mrNodeType', desc.MrNodeType.NONE) self.mrNodeType = d.get('mrNodeType', MrNodeType.NONE)
if not isinstance(self.mrNodeType, desc.MrNodeType): if not isinstance(self.mrNodeType, MrNodeType):
self.mrNodeType = desc.MrNodeType[self.mrNodeType] self.mrNodeType = MrNodeType[self.mrNodeType]
self.nodeName = d.get('nodeName', '') self.nodeName = d.get('nodeName', '')
self.nodeType = d.get('nodeType', '') self.nodeType = d.get('nodeType', '')
@ -279,7 +292,7 @@ class NodeChunk(BaseObject):
self.node = node self.node = node
self.range = range self.range = range
self.logManager: LogManager = LogManager(self) self.logManager: LogManager = LogManager(self)
self._status: StatusData = StatusData(node.name, node.nodeType, node.packageName, node.packageVersion) self._status: StatusData = StatusData(node.name, node.nodeType, node.packageName, node.packageVersion, node.getMrNodeType())
self.statistics: stats.Statistics = stats.Statistics() self.statistics: stats.Statistics = stats.Statistics()
self.statusFileLastModTime = -1 self.statusFileLastModTime = -1
self.subprocess = None self.subprocess = None
@ -430,7 +443,7 @@ class NodeChunk(BaseObject):
# Start the process environment for nodes running in isolation. # Start the process environment for nodes running in isolation.
# This only happens once, when the node has the SUBMITTED status. # This only happens once, when the node has the SUBMITTED status.
# The sub-process will go through this method again, but the node status will have been set to RUNNING. # The sub-process will go through this method again, but the node status will have been set to RUNNING.
if not inCurrentEnv and isinstance(self.node.nodeDesc, desc.Node): if not inCurrentEnv and self.node.getMrNodeType() == MrNodeType.NODE:
self._processInIsolatedEnvironment() self._processInIsolatedEnvironment()
return return
@ -447,6 +460,7 @@ class NodeChunk(BaseObject):
self.node.saveOutputAttr() self.node.saveOutputAttr()
executionStatus = Status.SUCCESS executionStatus = Status.SUCCESS
except Exception: except Exception:
self.updateStatusFromCache() # check if the status has been updated by another process
if self._status.status != Status.STOPPED: if self._status.status != Status.STOPPED:
executionStatus = Status.ERROR executionStatus = Status.ERROR
raise raise
@ -474,7 +488,7 @@ class NodeChunk(BaseObject):
except: except:
# status should be already updated by meshroom_compute # status should be already updated by meshroom_compute
self.updateStatusFromCache() self.updateStatusFromCache()
if self._status.status != Status.ERROR: if self._status.status not in (Status.ERROR, Status.STOPPED, Status.KILLED):
# If meshroom_compute has crashed or been killed, the status may have not been set to ERROR. # If meshroom_compute has crashed or been killed, the status may have not been set to ERROR.
# In this particular case, we enforce it from here. # In this particular case, we enforce it from here.
self.upgradeStatusTo(Status.ERROR) self.upgradeStatusTo(Status.ERROR)
@ -485,26 +499,24 @@ class NodeChunk(BaseObject):
self.node.updateOutputAttr() self.node.updateOutputAttr()
def stopProcess(self): def stopProcess(self):
# if self.isExtern() or self._status.status != Status.RUNNING: if self.isExtern():
# return raise ValueError("Cannot stop process: node is computed externally (another instance of Meshroom)")
if self._status.status != Status.RUNNING:
raise ValueError(f"Cannot stop process: node is not running (status is: {self._status.status}).")
self.upgradeStatusTo(Status.STOPPED)
self.node.nodeDesc.stopProcess(self) self.node.nodeDesc.stopProcess(self)
# Update the status to get latest information before changing it
self.updateStatusFromCache()
self.upgradeStatusTo(Status.STOPPED)
def isExtern(self): def isExtern(self):
""" The computation is managed externally by another instance of Meshroom. """ The computation is managed externally by another instance of Meshroom.
In the ambiguous case of an isolated environment, it is considered as local as we can stop it. In the ambiguous case of an isolated environment, it is considered as local as we can stop it.
""" """
uid = self._status.submitterSessionUid if self._status.mrNodeType == desc.MrNodeType.NODE else meshroom.core.sessionUid uid = self._status.submitterSessionUid if self.node.getMrNodeType() == MrNodeType.NODE else meshroom.core.sessionUid
return uid != meshroom.core.sessionUid return uid != meshroom.core.sessionUid
# def isIndependantProcess(self):
# if self._status.execMode == ExecMode.EXTERN:
# # Compute is managed by another instance of Meshroom
# return True
# # Compute is using a meshroom_compute subprocess
# return self._status.mrNodeType == desc.MrNodeType.NODE
statusChanged = Signal() statusChanged = Signal()
status = Property(Variant, lambda self: self._status, notify=statusChanged) status = Property(Variant, lambda self: self._status, notify=statusChanged)
statusName = Property(str, statusName.fget, notify=statusChanged) statusName = Property(str, statusName.fget, notify=statusChanged)
@ -588,6 +600,11 @@ class BaseNode(BaseObject):
except KeyError: except KeyError:
raise e raise e
def getMrNodeType(self):
if self.isCompatibilityNode:
return MrNodeType.NONE
return self.nodeDesc.getMrNodeType()
def getName(self): def getName(self):
return self._name return self._name
@ -1128,12 +1145,13 @@ class BaseNode(BaseObject):
def submit(self, forceCompute=False): def submit(self, forceCompute=False):
for chunk in self._chunks: for chunk in self._chunks:
if forceCompute or chunk.status.status != Status.SUCCESS: if forceCompute or chunk.status.status != Status.SUCCESS:
chunk._status.initSubmit() chunk._status.initExternSubmit()
chunk.upgradeStatusTo(Status.SUBMITTED, ExecMode.EXTERN) chunk.upgradeStatusTo(Status.SUBMITTED, ExecMode.EXTERN)
def beginSequence(self, forceCompute=False): def beginSequence(self, forceCompute=False):
for chunk in self._chunks: for chunk in self._chunks:
if forceCompute or (chunk.status.status not in (Status.RUNNING, Status.SUCCESS)): if forceCompute or (chunk.status.status not in (Status.RUNNING, Status.SUCCESS)):
chunk._status.initLocalSubmit()
chunk.upgradeStatusTo(Status.SUBMITTED, ExecMode.LOCAL) chunk.upgradeStatusTo(Status.SUBMITTED, ExecMode.LOCAL)
def processIteration(self, iteration): def processIteration(self, iteration):
@ -1239,6 +1257,8 @@ class BaseNode(BaseObject):
return Status.INPUT return Status.INPUT
if not self._chunks: if not self._chunks:
return Status.NONE return Status.NONE
if len( self._chunks) == 1:
return self._chunks[0].status.status
chunksStatus = [chunk.status.status for chunk in self._chunks] chunksStatus = [chunk.status.status for chunk in self._chunks]
@ -1257,8 +1277,11 @@ class BaseNode(BaseObject):
@Slot(result=StatusData) @Slot(result=StatusData)
def getFusedStatus(self): def getFusedStatus(self):
if not self._chunks:
return StatusData()
if len(self._chunks) == 1:
return self._chunks[0].status
fusedStatus = StatusData() fusedStatus = StatusData()
if self._chunks:
fusedStatus.fromDict(self._chunks[0].status.toDict()) fusedStatus.fromDict(self._chunks[0].status.toDict())
for chunk in self._chunks[1:]: for chunk in self._chunks[1:]:
fusedStatus.merge(chunk.status) fusedStatus.merge(chunk.status)
@ -1417,7 +1440,8 @@ class BaseNode(BaseObject):
if not self._chunks: if not self._chunks:
return False return False
for chunk in self._chunks: for chunk in self._chunks:
uid = chunk.status.submitterSessionUid if chunk.status.mrNodeType == desc.MrNodeType.NODE else chunk.status.sessionUid mrNodeType = chunk.node.getMrNodeType()
uid = chunk.status.submitterSessionUid if mrNodeType == MrNodeType.NODE else chunk.status.sessionUid
if uid != meshroom.core.sessionUid: if uid != meshroom.core.sessionUid:
return False return False
return True return True
@ -1426,7 +1450,6 @@ class BaseNode(BaseObject):
def canBeStopped(self): def canBeStopped(self):
# Only locked nodes running in local with the same # Only locked nodes running in local with the same
# sessionUid as the Meshroom instance can be stopped # sessionUid as the Meshroom instance can be stopped
# logging.warning(f"[{self.name}] canBeStopped: globalExecMode={self.globalExecMode} globalStatus={self.getGlobalStatus()} statusInThisSession={self.statusInThisSession()}, submitterStatusInThisSession={self.submitterStatusInThisSession()}")
return (self.getGlobalStatus() == Status.RUNNING and return (self.getGlobalStatus() == Status.RUNNING and
self.globalExecMode == ExecMode.LOCAL.name and self.globalExecMode == ExecMode.LOCAL.name and
self.initFromThisSession()) self.initFromThisSession())
@ -1435,7 +1458,6 @@ class BaseNode(BaseObject):
def canBeCanceled(self): def canBeCanceled(self):
# Only locked nodes submitted in local with the same # Only locked nodes submitted in local with the same
# sessionUid as the Meshroom instance can be canceled # sessionUid as the Meshroom instance can be canceled
# logging.warning(f"[{self.name}] canBeCanceled: globalExecMode={self.globalExecMode} globalStatus={self.getGlobalStatus()} statusInThisSession={self.statusInThisSession()}, submitterStatusInThisSession={self.submitterStatusInThisSession()}")
return (self.getGlobalStatus() == Status.SUBMITTED and return (self.getGlobalStatus() == Status.SUBMITTED and
self.globalExecMode == ExecMode.LOCAL.name and self.globalExecMode == ExecMode.LOCAL.name and
self.initFromThisSession()) self.initFromThisSession())

View file

@ -187,7 +187,7 @@ class ChunksMonitor(QObject):
# when run locally, status changes are already notified. # when run locally, status changes are already notified.
# Chunks with an ERROR status may be re-submitted externally and should thus still be monitored # Chunks with an ERROR status may be re-submitted externally and should thus still be monitored
if (c.isExtern() and c._status.status in (Status.SUBMITTED, Status.RUNNING, Status.ERROR)) or ( if (c.isExtern() and c._status.status in (Status.SUBMITTED, Status.RUNNING, Status.ERROR)) or (
(c._status.mrNodeType == MrNodeType.NODE) and (c._status.status in (Status.SUBMITTED, Status.RUNNING))): (c.node.getMrNodeType() == MrNodeType.NODE) and (c._status.status in (Status.SUBMITTED, Status.RUNNING))):
files.append(c.statusFile) files.append(c.statusFile)
chunks.append(c) chunks.append(c)
return files, chunks return files, chunks
@ -584,7 +584,7 @@ class UIGraph(QObject):
def updateGraphComputingStatus(self): def updateGraphComputingStatus(self):
# update graph computing status # update graph computing status
computingLocally = any([ computingLocally = any([
((ch.status.submitterSessionUid if ch.status.mrNodeType == MrNodeType.NODE else ch.status.sessionUid) == sessionUid) and ( ((ch.status.submitterSessionUid if ch.node.getMrNodeType() == MrNodeType.NODE else ch.status.sessionUid) == sessionUid) and (
ch.status.status in (Status.RUNNING, Status.SUBMITTED)) ch.status.status in (Status.RUNNING, Status.SUBMITTED))
for ch in self._sortedDFSChunks]) for ch in self._sortedDFSChunks])
submitted = any([ch.status.status == Status.SUBMITTED for ch in self._sortedDFSChunks]) submitted = any([ch.status.status == Status.SUBMITTED for ch in self._sortedDFSChunks])