[core] Node: Clean-up code

This commit is contained in:
Candice Bentéjac 2025-04-28 18:24:08 +02:00
parent 081d38f78d
commit 8300626ef5

View file

@ -56,6 +56,8 @@ class Status(Enum):
class ExecMode(Enum):
"""
"""
NONE = auto()
LOCAL = auto()
EXTERN = auto()
@ -66,7 +68,8 @@ class StatusData(BaseObject):
"""
dateTimeFormatting = '%Y-%m-%d %H:%M:%S.%f'
def __init__(self, nodeName='', nodeType='', packageName='', packageVersion='', mrNodeType: MrNodeType = MrNodeType.NONE, parent: BaseObject = None):
def __init__(self, nodeName='', nodeType='', packageName='', packageVersion='',
mrNodeType: MrNodeType = MrNodeType.NONE, parent: BaseObject = None):
super(StatusData, self).__init__(parent)
self.nodeName: str = nodeName
@ -81,12 +84,13 @@ class StatusData(BaseObject):
self.resetDynamicValues()
def setNode(self, node):
""" Set the node information from one node instance."""
""" Set the node information from one node instance. """
self.nodeName = node.name
self.setNodeType(node)
def setNodeType(self, node):
""" Set the node type and package information from the given node.
"""
Set the node type and package information from the given node.
We do not set the name in this method as it may vary if there are duplicates.
"""
self.nodeType = node.nodeType
@ -132,17 +136,21 @@ class StatusData(BaseObject):
# we don't want to modify the execMode set from the submit.
def initIsolatedCompute(self):
''' When submitting a node, we reset the status information to ensure that we do not keep outdated information.
'''
"""
When submitting a node, we reset the status information to ensure that we do not keep
outdated information.
"""
self.resetDynamicValues()
self.initStartCompute()
assert(self.mrNodeType == MrNodeType.NODE)
assert self.mrNodeType == MrNodeType.NODE
self.sessionUid = None
self.submitterSessionUid = meshroom.core.sessionUid
def initExternSubmit(self):
''' When submitting a node, we reset the status information to ensure that we do not keep outdated information.
'''
"""
When submitting a node, we reset the status information to ensure that we do not keep
outdated information.
"""
self.resetDynamicValues()
self.sessionUid = None
self.submitterSessionUid = meshroom.core.sessionUid
@ -150,8 +158,10 @@ class StatusData(BaseObject):
self.execMode = ExecMode.EXTERN
def initLocalSubmit(self):
''' When submitting a node, we reset the status information to ensure that we do not keep outdated information.
'''
"""
When submitting a node, we reset the status information to ensure that we do not keep
outdated information.
"""
self.resetDynamicValues()
self.sessionUid = None
self.submitterSessionUid = meshroom.core.sessionUid
@ -181,31 +191,31 @@ class StatusData(BaseObject):
return d
def fromDict(self, d):
self.status = d.get('status', Status.NONE)
self.status = d.get("status", Status.NONE)
if not isinstance(self.status, Status):
self.status = Status[self.status]
self.execMode = d.get('execMode', ExecMode.NONE)
self.execMode = d.get("execMode", ExecMode.NONE)
if not isinstance(self.execMode, ExecMode):
self.execMode = ExecMode[self.execMode]
self.mrNodeType = d.get('mrNodeType', MrNodeType.NONE)
self.mrNodeType = d.get("mrNodeType", MrNodeType.NONE)
if not isinstance(self.mrNodeType, MrNodeType):
self.mrNodeType = MrNodeType[self.mrNodeType]
self.nodeName = d.get('nodeName', '')
self.nodeType = d.get('nodeType', '')
self.packageName = d.get('packageName', '')
self.packageVersion = d.get('packageVersion', '')
self.graph = d.get('graph', '')
self.commandLine = d.get('commandLine', '')
self.env = d.get('env', '')
self.startDateTime = d.get('startDateTime', '')
self.endDateTime = d.get('endDateTime', '')
self.elapsedTime = d.get('elapsedTime', 0)
self.hostname = d.get('hostname', '')
self.sessionUid = d.get('sessionUid', '')
self.submitterSessionUid = d.get('submitterSessionUid', '')
self.nodeName = d.get("nodeName", "")
self.nodeType = d.get("nodeType", "")
self.packageName = d.get("packageName", "")
self.packageVersion = d.get("packageVersion", "")
self.graph = d.get("graph", "")
self.commandLine = d.get("commandLine", "")
self.env = d.get("env", "")
self.startDateTime = d.get("startDateTime", "")
self.endDateTime = d.get("endDateTime", "")
self.elapsedTime = d.get("elapsedTime", 0)
self.hostname = d.get("hostname", "")
self.sessionUid = d.get("sessionUid", "")
self.submitterSessionUid = d.get("submitterSessionUid", "")
class LogManager:
dateTimeFormatting = '%H:%M:%S'
@ -223,7 +233,8 @@ class LogManager:
for handler in self.logger.handlers[:]:
self.logger.removeHandler(handler)
handler = logging.FileHandler(self.chunk.logFile)
formatter = self.Formatter('[%(asctime)s.%(msecs)03d][%(levelname)s] %(message)s', self.dateTimeFormatting)
formatter = self.Formatter('[%(asctime)s.%(msecs)03d][%(levelname)s] %(message)s',
self.dateTimeFormatting)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
@ -284,15 +295,15 @@ class LogManager:
self.progressBar = False
def textToLevel(self, text):
if text == 'critical':
if text == "critical":
return logging.CRITICAL
elif text == 'error':
elif text == "error":
return logging.ERROR
elif text == 'warning':
elif text == "warning":
return logging.WARNING
elif text == 'info':
elif text == "info":
return logging.INFO
elif text == 'debug':
elif text == "debug":
return logging.DEBUG
else:
return logging.NOTSET
@ -313,7 +324,8 @@ class NodeChunk(BaseObject):
self.node = node
self.range = range
self.logManager: LogManager = LogManager(self)
self._status: StatusData = StatusData(node.name, node.nodeType, node.packageName, node.packageVersion, node.getMrNodeType())
self._status: StatusData = StatusData(node.name, node.nodeType, node.packageName,
node.packageVersion, node.getMrNodeType())
self.statistics: stats.Statistics = stats.Statistics()
self.statusFileLastModTime = -1
self.subprocess = None
@ -375,21 +387,24 @@ class NodeChunk(BaseObject):
if self.range.blockSize == 0:
return os.path.join(self.node.graph.cacheDir, self.node.internalFolder, "status")
else:
return os.path.join(self.node.graph.cacheDir, self.node.internalFolder, str(self.index) + ".status")
return os.path.join(self.node.graph.cacheDir, self.node.internalFolder,
str(self.index) + ".status")
@property
def statisticsFile(self):
if self.range.blockSize == 0:
return os.path.join(self.node.graph.cacheDir, self.node.internalFolder, "statistics")
else:
return os.path.join(self.node.graph.cacheDir, self.node.internalFolder, str(self.index) + ".statistics")
return os.path.join(self.node.graph.cacheDir, self.node.internalFolder,
str(self.index) + ".statistics")
@property
def logFile(self):
if self.range.blockSize == 0:
return os.path.join(self.node.graph.cacheDir, self.node.internalFolder, "log")
else:
return os.path.join(self.node.graph.cacheDir, self.node.internalFolder, str(self.index) + ".log")
return os.path.join(self.node.graph.cacheDir, self.node.internalFolder,
str(self.index) + ".log")
def saveStatusFile(self):
"""
@ -406,7 +421,8 @@ class NodeChunk(BaseObject):
renameWritingToFinalPath(statusFilepathWriting, statusFilepath)
def upgradeStatusFile(self):
""" Upgrade node status file based on the current status.
"""
Upgrade node status file based on the current status.
"""
self.saveStatusFile()
self.statusChanged.emit()
@ -465,10 +481,11 @@ class NodeChunk(BaseObject):
if not forceCompute and self._status.status == Status.SUCCESS:
logging.info("Node chunk already computed: {}".format(self.name))
return
# Start the process environment for nodes running in isolation.
# This only happens once, when the node has the SUBMITTED status.
# The sub-process will go through this method again, but the node status will have been set to RUNNING.
# The sub-process will go through this method again, but the node status will
# have been set to RUNNING.
if not inCurrentEnv and self.node.getMrNodeType() == MrNodeType.NODE:
self._processInIsolatedEnvironment()
return
@ -541,12 +558,14 @@ class NodeChunk(BaseObject):
self.updateStatusFromCache()
if self._status.status != Status.RUNNING:
# When we stop the process of a node with multiple chunks, the Node function will call the stop function of each chunk.
# So, the chunck status could be SUBMITTED, RUNNING or ERROR.
# When we stop the process of a node with multiple chunks, the Node function will call
# the stop function of each chunk.
# So, the chunk status could be SUBMITTED, RUNNING or ERROR.
if self._status.status is Status.SUBMITTED:
self.upgradeStatusTo(Status.NONE)
elif self._status.status in (Status.ERROR, Status.STOPPED, Status.KILLED, Status.SUCCESS, Status.NONE):
elif self._status.status in (Status.ERROR, Status.STOPPED, Status.KILLED,
Status.SUCCESS, Status.NONE):
# Nothing to do, the computation is already stopped.
pass
else:
@ -560,8 +579,10 @@ class NodeChunk(BaseObject):
self.upgradeStatusTo(Status.STOPPED)
def isExtern(self):
""" The computation is managed externally by another instance of Meshroom.
In the ambiguous case of an isolated environment, it is considered as local as we can stop it (if it is run from the current Meshroom instance).
"""
The computation is managed externally by another instance of Meshroom.
In the ambiguous case of an isolated environment, it is considered as local as we can stop
it (if it is run from the current Meshroom instance).
"""
if self._status.execMode == ExecMode.EXTERN:
return True
@ -603,7 +624,8 @@ class BaseNode(BaseObject):
# i.e: a.b, a[0], a[0].b.c[1]
attributeRE = re.compile(r'\.?(?P<name>\w+)(?:\[(?P<index>\d+)\])?')
def __init__(self, nodeType: str, position: Position = None, parent: BaseObject = None, uid: str = None, **kwargs):
def __init__(self, nodeType: str, position: Position = None, parent: BaseObject = None,
uid: str = None, **kwargs):
"""
Create a new Node instance based on the given node description.
Any other keyword argument will be used to initialize this node's attributes.
@ -656,7 +678,8 @@ class BaseNode(BaseObject):
raise e
def getMrNodeType(self):
# In compatibility mode, we may or may not have access to the nodeDesc and its information about the node type.
# In compatibility mode, we may or may not have access to the nodeDesc and its information
# about the node type.
if self.nodeDesc is None:
return MrNodeType.NONE
return self.nodeDesc.getMrNodeType()
@ -826,22 +849,25 @@ class BaseNode(BaseObject):
return os.path.join(self.graph.cacheDir, self.internalFolder, 'values')
def getInputNodes(self, recursive, dependenciesOnly):
return self.graph.getInputNodes(self, recursive=recursive, dependenciesOnly=dependenciesOnly)
return self.graph.getInputNodes(self, recursive=recursive,
dependenciesOnly=dependenciesOnly)
def getOutputNodes(self, recursive, dependenciesOnly):
return self.graph.getOutputNodes(self, recursive=recursive, dependenciesOnly=dependenciesOnly)
return self.graph.getOutputNodes(self, recursive=recursive,
dependenciesOnly=dependenciesOnly)
def toDict(self):
pass
def _computeUid(self):
""" Compute node UID by combining associated attributes' UIDs. """
# If there is no invalidating attribute, then the computation of the UID should not go through as
# it will only include the node type
# If there is no invalidating attribute, then the computation of the UID should not
# go through as it will only include the node type
if not self.invalidatingAttributes:
return
# UID is computed by hashing the sorted list of tuple (name, value) of all attributes impacting this UID
# UID is computed by hashing the sorted list of tuple (name, value) of all attributes
# impacting this UID
uidAttributes = []
for attr in self.invalidatingAttributes:
if not attr.enabled:
@ -862,6 +888,10 @@ class BaseNode(BaseObject):
self._uid = hashValue(uidAttributes)
def _buildCmdVars(self):
"""
Generate command variables using input attributes and resolved output attributes
names and values.
"""
def _buildAttributeCmdVars(cmdVars, name, attr):
if attr.enabled:
group = attr.attributeDesc.group(attr.node) \
@ -886,7 +916,6 @@ class BaseNode(BaseObject):
for v in attr._value:
_buildAttributeCmdVars(cmdVars, v.name, v)
""" Generate command variables using input attributes and resolved output attributes names and values. """
self._cmdVars["uid"] = self._uid
self._cmdVars["nodeCacheFolder"] = self.internalFolder
self._cmdVars["nodeSourceCodeFolder"] = self.sourceCodeFolder
@ -901,8 +930,9 @@ class BaseNode(BaseObject):
cmdVarsNoCache = self._cmdVars.copy()
cmdVarsNoCache["cache"] = ""
# Use "self._internalFolder" instead of "self.internalFolder" because we do not want it to be
# resolved with the {cache} information ("self.internalFolder" resolves "self._internalFolder")
# Use "self._internalFolder" instead of "self.internalFolder" because we do not want it to
# be resolved with the {cache} information ("self.internalFolder" resolves
# "self._internalFolder")
cmdVarsNoCache["nodeCacheFolder"] = self._internalFolder.format(**cmdVarsNoCache)
# Evaluate output params
@ -919,8 +949,8 @@ class BaseNode(BaseObject):
try:
defaultValue = attr.defaultValue()
except AttributeError:
# If we load an old scene, the lambda associated to the 'value' could try to access other
# params that could not exist yet
# If we load an old scene, the lambda associated to the 'value' could try to
# access other params that could not exist yet
logging.warning('Invalid lambda evaluation for "{nodeName}.{attrName}"'.
format(nodeName=self.name, attrName=attr.name))
if defaultValue is not None:
@ -945,8 +975,8 @@ class BaseNode(BaseObject):
self._cmdVars[name + 'Value'] = attr.getValueStr(withQuotes=False)
if v:
self._cmdVars[attr.attributeDesc.group] = self._cmdVars.get(attr.attributeDesc.group, '') + \
' ' + self._cmdVars[name]
self._cmdVars[attr.attributeDesc.group] = \
self._cmdVars.get(attr.attributeDesc.group, '') + ' ' + self._cmdVars[name]
@property
def isParallelized(self):
@ -958,7 +988,7 @@ class BaseNode(BaseObject):
def hasStatus(self, status: Status):
if not self._chunks:
return (status == Status.INPUT)
return status == Status.INPUT
for chunk in self._chunks:
if chunk.status.status != status:
return False
@ -973,7 +1003,8 @@ class BaseNode(BaseObject):
""" Return True if this node type is computable, False otherwise.
A computable node type can be in a context that does not allow computation.
"""
# Ambiguous case for NONE, which could be used for compatibility nodes if we don't have any information about the node descriptor.
# Ambiguous case for NONE, which could be used for compatibility nodes if we don't have
# any information about the node descriptor.
return self.getMrNodeType() != MrNodeType.INPUT
def clearData(self):
@ -984,9 +1015,11 @@ class BaseNode(BaseObject):
try:
shutil.rmtree(self.internalFolder)
except Exception as e:
# We could get some "Device or resource busy" on .nfs file while removing the folder on linux network.
# On windows, some output files may be open for visualization and the removal will fail.
# On both cases, we can ignore it.
# We could get some "Device or resource busy" on .nfs file while removing the folder
# on Linux network.
# On Windows, some output files may be open for visualization and the removal will
# fail.
# In both cases, we can ignore it.
logging.warning(f"Failed to remove internal folder: '{self.internalFolder}'. Error: {e}.")
self.updateStatusFromCache()
@ -1011,7 +1044,10 @@ class BaseNode(BaseObject):
@Slot(result=bool)
def isSubmittedOrRunning(self):
""" Return True if all chunks are at least submitted and there is one running chunk, False otherwise. """
"""
Return True if all chunks are at least submitted and there is one running chunk,
False otherwise.
"""
if not self.isAlreadySubmittedOrFinished():
return False
for chunk in self._chunks:
@ -1026,7 +1062,10 @@ class BaseNode(BaseObject):
@Slot(result=bool)
def isFinishedOrRunning(self):
""" Return True if all chunks of this Node is either finished or running, False otherwise. """
"""
Return True if all chunks of this Node is either finished or running, False
otherwise.
"""
return all(chunk.isFinishedOrRunning() for chunk in self._chunks)
@Slot(result=bool)
@ -1038,12 +1077,15 @@ class BaseNode(BaseObject):
return [ch for ch in self._chunks if ch.isAlreadySubmitted()]
def isExtern(self):
""" Return True if at least one chunk of this Node has an external execution mode, False otherwise.
"""
Return True if at least one chunk of this Node has an external execution mode,
False otherwise.
It is not enough to check whether the first chunk's execution mode is external, because computations
may have been started locally, interrupted, and restarted externally. In that case, if the first
chunk has completed locally before the computations were interrupted, its execution mode will always
be local, even if computations resume externally.
It is not enough to check whether the first chunk's execution mode is external,
because computations may have been started locally, interrupted, and restarted externally.
In that case, if the first chunk has completed locally before the computations were
interrupted, its execution mode will always be local, even if computations resume
externally.
"""
if len(self._chunks) == 0:
return False
@ -1051,8 +1093,9 @@ class BaseNode(BaseObject):
@Slot()
def clearSubmittedChunks(self):
""" Reset all submitted chunks to Status.NONE. This method should be used to clear inconsistent status
if a computation failed without informing the graph.
"""
Reset all submitted chunks to Status.NONE. This method should be used to clear
inconsistent status if a computation failed without informing the graph.
Warnings:
This must be used with caution. This could lead to inconsistent node status
@ -1069,9 +1112,7 @@ class BaseNode(BaseObject):
chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE)
def upgradeStatusTo(self, newStatus):
"""
Upgrade node to the given status and save it on disk.
"""
""" Upgrade node to the given status and save it on disk. """
for chunk in self._chunks:
chunk.upgradeStatusTo(newStatus)
@ -1083,7 +1124,7 @@ class BaseNode(BaseObject):
pass
def _getAttributeChangedCallback(self, attr: Attribute) -> Optional[Callable]:
"""Get the node descriptor-defined value changed callback associated to `attr` if any."""
""" Get the node descriptor-defined value changed callback associated to `attr` if any. """
# Callbacks cannot be defined on nested attributes.
if attr.root is not None:
@ -1097,7 +1138,8 @@ class BaseNode(BaseObject):
def _onAttributeChanged(self, attr: Attribute):
"""
When an attribute value has changed, a specific function can be defined in the descriptor and be called.
When an attribute value has changed, a specific function can be defined in the descriptor
and be called.
Args:
attr: The Attribute that has changed.
@ -1116,7 +1158,7 @@ class BaseNode(BaseObject):
if attr.value is None:
# Discard dynamic values depending on the graph processing.
return
if self.graph and self.graph.isLoading:
# Do not trigger attribute callbacks during the graph loading.
return
@ -1132,7 +1174,9 @@ class BaseNode(BaseObject):
edge.dst.valueChanged.emit()
def onAttributeClicked(self, attr):
""" When an attribute is clicked, a specific function can be defined in the descriptor and be called.
"""
When an attribute is clicked, a specific function can be defined in the descriptor
and be called.
Args:
attr (Attribute): attribute that has been clicked
@ -1230,7 +1274,8 @@ class BaseNode(BaseObject):
chunk.process(forceCompute, inCurrentEnv)
def postprocess(self):
# Invoke the post process on Client Node to execute after the processing on the node is completed
# Invoke the post process on Client Node to execute after the processing on the
# node is completed
self.nodeDesc.postprocess(self)
def updateOutputAttr(self):
@ -1499,7 +1544,7 @@ class BaseNode(BaseObject):
if chunk.status.submitterSessionUid != meshroom.core.sessionUid:
return False
return True
def initFromThisSession(self) -> bool:
if len(self._chunks) == 0:
return False
@ -1507,7 +1552,7 @@ class BaseNode(BaseObject):
if meshroom.core.sessionUid not in (chunk.status.sessionUid, chunk.status.submitterSessionUid):
return False
return True
def isMainNode(self) -> bool:
""" In case of a node with duplicates, we check that the node is the one driving the computation. """
if len(self._chunks) == 0:
@ -1546,8 +1591,8 @@ class BaseNode(BaseObject):
def hasImageOutputAttribute(self) -> bool:
"""
Return True if at least one attribute has the 'image' semantic (and can thus be loaded in the 2D Viewer),
False otherwise.
Return True if at least one attribute has the 'image' semantic (and can thus be loaded in
the 2D Viewer), False otherwise.
"""
for attr in self._attributes:
if not attr.enabled or not attr.isOutput:
@ -1558,8 +1603,8 @@ class BaseNode(BaseObject):
def hasSequenceOutputAttribute(self) -> bool:
"""
Return True if at least one attribute has the 'sequence' semantic (and can thus be loaded in the 2D Viewer),
False otherwise.
Return True if at least one attribute has the 'sequence' semantic (and can thus be loaded in
the 2D Viewer), False otherwise.
"""
for attr in self._attributes:
if not attr.enabled or not attr.isOutput:
@ -1570,7 +1615,8 @@ class BaseNode(BaseObject):
def has3DOutputAttribute(self):
"""
Return True if at least one attribute is a File that can be loaded in the 3D Viewer, False otherwise.
Return True if at least one attribute is a File that can be loaded in the 3D Viewer,
False otherwise.
"""
# List of supported extensions, taken from Viewer3DSettings
supportedExts = ['.obj', '.stl', '.fbx', '.gltf', '.abc', '.ply']
@ -1654,14 +1700,16 @@ class Node(BaseNode):
self._sourceCodeFolder = self.nodeDesc.sourceCodeFolder
for attrDesc in self.nodeDesc.inputs:
self._attributes.add(attributeFactory(attrDesc, kwargs.get(attrDesc.name, None), isOutput=False, node=self))
self._attributes.add(attributeFactory(attrDesc, kwargs.get(attrDesc.name, None),
isOutput=False, node=self))
for attrDesc in self.nodeDesc.outputs:
self._attributes.add(attributeFactory(attrDesc, kwargs.get(attrDesc.name, None), isOutput=True, node=self))
self._attributes.add(attributeFactory(attrDesc, kwargs.get(attrDesc.name, None),
isOutput=True, node=self))
for attrDesc in self.nodeDesc.internalInputs:
self._internalAttributes.add(attributeFactory(attrDesc, kwargs.get(attrDesc.name, None), isOutput=False,
node=self))
self._internalAttributes.add(attributeFactory(attrDesc, kwargs.get(attrDesc.name, None),
isOutput=False, node=self))
# Declare events for specific output attributes
for attr in self._attributes:
@ -1891,7 +1939,8 @@ class CompatibilityNode(BaseNode):
@staticmethod
def attributeDescFromName(refAttributes, name, value, strict=True):
"""
Try to find a matching attribute description in refAttributes for given attribute 'name' and 'value'.
Try to find a matching attribute description in refAttributes for given attribute
'name' and 'value'.
Args:
refAttributes ([desc.Attribute]): reference Attributes to look for a description