New notion of local isolated computation for python nodes using meshroom_compute

Reoganization
- BaseNode: is the base class for all nodes
- Node: is now dedicated to python nodes, with the implentation directly
in the process function
- CommandLineNode: dedicated to generate and run external command lines
This commit is contained in:
Fabien Castan 2025-03-24 00:03:45 +01:00
parent faece7efca
commit 727a4d129b
6 changed files with 288 additions and 152 deletions

View file

@ -57,6 +57,7 @@ class Status(Enum):
class ExecMode(Enum):
NONE = auto()
LOCAL = auto()
LOCAL_ISOLATED = auto()
EXTERN = auto()
@ -67,20 +68,13 @@ class StatusData(BaseObject):
def __init__(self, nodeName='', nodeType='', packageName='', packageVersion='', parent: BaseObject = None):
super(StatusData, self).__init__(parent)
self.status = Status.NONE
self.execMode = ExecMode.NONE
self.nodeName = nodeName
self.nodeType = nodeType
self.packageName = packageName
self.packageVersion = packageVersion
self.graph = ''
self.commandLine = None
self.env = None
self.startDateTime = ""
self.endDateTime = ""
self.elapsedTime = 0
self.hostname = ""
self.sessionUid = meshroom.core.sessionUid
self.reset()
self.nodeName: str = nodeName
self.nodeType: str = nodeType
self.packageName: str = packageName
self.packageVersion: str = packageVersion
self.sessionUid: Optional[str] = meshroom.core.sessionUid
self.submitterSessionUid: Optional[str] = None
def merge(self, other):
self.startDateTime = min(self.startDateTime, other.startDateTime)
@ -88,27 +82,44 @@ class StatusData(BaseObject):
self.elapsedTime += other.elapsedTime
def reset(self):
self.status = Status.NONE
self.execMode = ExecMode.NONE
self.graph = ''
self.commandLine = None
self.env = None
self.startDateTime = ""
self.endDateTime = ""
self.elapsedTime = 0
self.hostname = ""
self.sessionUid = meshroom.core.sessionUid
self.nodeName: str = ""
self.nodeType: str = ""
self.packageName: str = ""
self.packageVersion: str = ""
self.resetDynamicValues()
def resetDynamicValues(self):
self.status: Status = Status.NONE
self.execMode: ExecMode = ExecMode.NONE
self.graph = ""
self.commandLine: str = ""
self.env: str = ""
self._startTime: Optional[datetime.datetime] = None
self.startDateTime: str = ""
self.endDateTime: str = ""
self.elapsedTime: float = 0.0
self.hostname: str = ""
def initStartCompute(self):
import platform
self.sessionUid = meshroom.core.sessionUid
self.hostname = platform.node()
self._startTime = time.time()
self.startDateTime = datetime.datetime.now().strftime(self.dateTimeFormatting)
# to get datetime obj: datetime.datetime.strptime(obj, self.dateTimeFormatting)
def initSubmit(self):
''' When submitting a node, we reset the status information to ensure that we do not keep outdated information.
'''
self.resetDynamicValues()
self.sessionUid = None
self.submitterSessionUid = meshroom.core.sessionUid
def initEndCompute(self):
self.sessionUid = meshroom.core.sessionUid
self.endDateTime = datetime.datetime.now().strftime(self.dateTimeFormatting)
if self._startTime != None:
self.elapsedTime = time.time() - self._startTime
@property
def elapsedTimeStr(self):
@ -118,9 +129,12 @@ class StatusData(BaseObject):
d = self.__dict__.copy()
d["elapsedTimeStr"] = self.elapsedTimeStr
# Skip non data attributes from BaseObject
# Skip some attributes (some are from BaseObject)
d.pop("destroyed", None)
d.pop("objectNameChanged", None)
d.pop("_parent", None)
d.pop("_startTime", None)
return d
def fromDict(self, d):
@ -142,8 +156,9 @@ class StatusData(BaseObject):
self.elapsedTime = d.get('elapsedTime', 0)
self.hostname = d.get('hostname', '')
self.sessionUid = d.get('sessionUid', '')
self.submitterSessionUid = d.get('submitterSessionUid', '')
class LogManager:
dateTimeFormatting = '%H:%M:%S'
@ -251,9 +266,9 @@ class NodeChunk(BaseObject):
super(NodeChunk, self).__init__(parent)
self.node = node
self.range = range
self.logManager = LogManager(self)
self._status = StatusData(node.name, node.nodeType, node.packageName, node.packageVersion)
self.statistics = stats.Statistics()
self.logManager: LogManager = LogManager(self)
self._status: StatusData = StatusData(node.name, node.nodeType, node.packageName, node.packageVersion)
self.statistics: stats.Statistics = stats.Statistics()
self.statusFileLastModTime = -1
self.subprocess = None
# Notify update in filepaths when node's internal folder changes
@ -298,6 +313,7 @@ class NodeChunk(BaseObject):
try:
with open(statusFile, 'r') as jsonFile:
statusData = json.load(jsonFile)
# logging.debug(f"updateStatusFromCache({self.node.name}): From status {self.status.status} to {statusData['status']}")
self.status.fromDict(statusData)
self.statusFileLastModTime = os.path.getmtime(statusFile)
except Exception:
@ -343,12 +359,9 @@ class NodeChunk(BaseObject):
renameWritingToFinalPath(statusFilepathWriting, statusFilepath)
def upgradeStatusTo(self, newStatus, execMode=None):
if newStatus.value <= self._status.status.value:
logging.warning("Downgrade status on node '{}' from {} to {}".
format(self.name, self._status.status, newStatus))
if newStatus.value < self._status.status.value:
logging.warning(f"Downgrade status on node '{self.name}' from {self._status.status} to {newStatus}")
if newStatus == Status.SUBMITTED:
self._status = StatusData(self.node.name, self.node.nodeType, self.node.packageName, self.node.packageVersion)
if execMode is not None:
self._status.execMode = execMode
self.execModeNameChanged.emit()
@ -397,15 +410,22 @@ class NodeChunk(BaseObject):
def isFinished(self):
return self._status.status == Status.SUCCESS
def process(self, forceCompute=False):
def process(self, forceCompute=False, inCurrentEnv=False):
if not forceCompute and self._status.status == Status.SUCCESS:
logging.info("Node chunk already computed: {}".format(self.name))
return
# Start the process environment for nodes running in isolation.
# This only happens once, when the node has the SUBMITTED status.
# The sub-process will go through this method again, but the node status will have been set to RUNNING.
if not inCurrentEnv and isinstance(self.node.nodeDesc, desc.Node):
self._processInIsolatedEnvironment()
return
global runningProcesses
runningProcesses[self.name] = self
self._status.initStartCompute()
exceptionStatus = None
startTime = time.time()
executionStatus = None
self.upgradeStatusTo(Status.RUNNING)
self.statThread = stats.StatisticsThread(self)
self.statThread.start()
@ -413,18 +433,18 @@ class NodeChunk(BaseObject):
self.node.nodeDesc.processChunk(self)
# NOTE: this assumes saving the output attributes for each chunk
self.node.saveOutputAttr()
executionStatus = Status.SUCCESS
except Exception:
if self._status.status != Status.STOPPED:
exceptionStatus = Status.ERROR
executionStatus = Status.ERROR
raise
except (KeyboardInterrupt, SystemError, GeneratorExit):
exceptionStatus = Status.STOPPED
executionStatus = Status.STOPPED
raise
finally:
self._status.initEndCompute()
self._status.elapsedTime = time.time() - startTime
if exceptionStatus is not None:
self.upgradeStatusTo(exceptionStatus)
if executionStatus:
self.upgradeStatusTo(executionStatus)
logging.info(" - elapsed time: {}".format(self._status.elapsedTimeStr))
# Ask and wait for the stats thread to stop
self.statThread.stopRequest()
@ -432,19 +452,43 @@ class NodeChunk(BaseObject):
self.statistics = stats.Statistics()
del runningProcesses[self.name]
self.upgradeStatusTo(Status.SUCCESS)
def _processInIsolatedEnvironment(self):
"""Process this node chunk in the isolated environment defined in the environment configuration."""
try:
self._status.initSubmit()
self.upgradeStatusTo(Status.RUNNING, execMode=ExecMode.LOCAL_ISOLATED)
self.node.nodeDesc.processChunkInEnvironment(self)
except:
# status should be already updated by meshroom_compute
self.updateStatusFromCache()
if self._status.status != Status.ERROR:
# If meshroom_compute has crashed or been killed, the status may have not been set to ERROR.
# In this particular case, we enforce it from here.
self.upgradeStatusTo(Status.ERROR)
raise
# Update the chunk status.
self.updateStatusFromCache()
# Update the output attributes, as any chunk may have modified them.
self.node.updateOutputAttr()
def stopProcess(self):
if not self.isExtern():
if self._status.status == Status.RUNNING:
self.upgradeStatusTo(Status.STOPPED)
elif self._status.status == Status.SUBMITTED:
self.upgradeStatusTo(Status.NONE)
if self.isExtern():
return
if self._status.status != Status.RUNNING:
return
self.upgradeStatusTo(Status.STOPPED)
self.node.nodeDesc.stopProcess(self)
def isExtern(self):
return self._status.execMode == ExecMode.EXTERN or (
self._status.execMode == ExecMode.LOCAL and self._status.sessionUid != meshroom.core.sessionUid)
""" The computation is managed externally by another instance of Meshroom, or by meshroom_compute on renderfarm).
In the ambiguous case of an isolated environment, it is considered as local as we can stop it.
"""
if self._status.execMode == ExecMode.LOCAL_ISOLATED:
# It is a local isolated node, check if it is submitted by our current session.
return self._status.submitterSessionUid != meshroom.core.sessionUid
return self._status.sessionUid != meshroom.core.sessionUid
statusChanged = Signal()
status = Property(Variant, lambda self: self._status, notify=statusChanged)
@ -845,7 +889,13 @@ class BaseNode(BaseObject):
Status will be reset to Status.NONE
"""
if self.internalFolder and os.path.exists(self.internalFolder):
shutil.rmtree(self.internalFolder)
try:
shutil.rmtree(self.internalFolder)
except Exception as e:
# We could get some "Device or resource busy" on .nfs file while removing the folder on linux network.
# On windows, some output files may be open for visualization and the removal will fail.
# On both cases, we can ignore it.
logging.warning(f"Failed to remove internal folder: '{self.internalFolder}'. Error: {e}.")
self.updateStatusFromCache()
@Slot(result=str)
@ -1063,6 +1113,7 @@ class BaseNode(BaseObject):
def submit(self, forceCompute=False):
for chunk in self._chunks:
if forceCompute or chunk.status.status != Status.SUCCESS:
chunk._status.initSubmit()
chunk.upgradeStatusTo(Status.SUBMITTED, ExecMode.EXTERN)
def beginSequence(self, forceCompute=False):
@ -1077,9 +1128,9 @@ class BaseNode(BaseObject):
# Invoke the Node Description's pre-process for the Client Node to prepare its processing
self.nodeDesc.preprocess(self)
def process(self, forceCompute=False):
def process(self, forceCompute=False, inCurrentEnv=False):
for chunk in self._chunks:
chunk.process(forceCompute)
chunk.process(forceCompute, inCurrentEnv)
def postprocess(self):
# Invoke the post process on Client Node to execute after the processing on the node is completed
@ -1090,8 +1141,8 @@ class BaseNode(BaseObject):
return
if not self.nodeDesc.hasDynamicOutputAttribute:
return
# logging.warning("updateOutputAttr: {}, status: {}".format(self.name, self.globalStatus))
if self.getGlobalStatus() == Status.SUCCESS:
# logging.warning(f"updateOutputAttr: {self.name}, status: {self.globalStatus}")
if Status.SUCCESS in [c._status.status for c in self.getChunks()]:
self.loadOutputAttr()
else:
self.resetOutputAttr()
@ -1339,19 +1390,33 @@ class BaseNode(BaseObject):
return False
return True
def submitterStatusInThisSession(self):
if not self._chunks:
return False
for chunk in self._chunks:
if chunk.status.submitterSessionUid != meshroom.core.sessionUid:
return False
return True
@Slot(result=bool)
def canBeStopped(self):
# Only locked nodes running in local with the same
# sessionUid as the Meshroom instance can be stopped
return (self.locked and self.getGlobalStatus() == Status.RUNNING and
self.globalExecMode == "LOCAL" and self.statusInThisSession())
# logging.warning(f"[{self.name}] canBeStopped: globalExecMode={self.globalExecMode} globalStatus={self.getGlobalStatus()} statusInThisSession={self.statusInThisSession()}, submitterStatusInThisSession={self.submitterStatusInThisSession()}")
return (self.getGlobalStatus() == Status.RUNNING and
((self.globalExecMode == ExecMode.LOCAL.name and self.statusInThisSession()) or
(self.globalExecMode == ExecMode.LOCAL_ISOLATED.name and self.submitterStatusInThisSession())
))
@Slot(result=bool)
def canBeCanceled(self):
# Only locked nodes submitted in local with the same
# sessionUid as the Meshroom instance can be canceled
return (self.locked and self.getGlobalStatus() == Status.SUBMITTED and
self.globalExecMode == "LOCAL" and self.statusInThisSession())
# logging.warning(f"[{self.name}] canBeCanceled: globalExecMode={self.globalExecMode} globalStatus={self.getGlobalStatus()} statusInThisSession={self.statusInThisSession()}, submitterStatusInThisSession={self.submitterStatusInThisSession()}")
return (self.getGlobalStatus() == Status.SUBMITTED and
((self.globalExecMode == ExecMode.LOCAL.name and self.statusInThisSession()) or
(self.globalExecMode == ExecMode.LOCAL_ISOLATED.name and self.submitterStatusInThisSession())
))
def hasImageOutputAttribute(self):
"""