Explicit meshroom node type in status file

Avoid an ambiguous LOCAL_ISOLATED, as a process can be extern and using
an isolated execution environement.
This commit is contained in:
Fabien Castan 2025-03-25 12:28:22 +01:00
parent 426855baa6
commit eb9df4c900
6 changed files with 76 additions and 33 deletions

View file

@ -83,9 +83,9 @@ if args.node:
chunks = node.chunks
for chunk in chunks:
if chunk.status.status in submittedStatuses:
# Particular case for the LOCAL_ISOLATED, the node status is set to RUNNING by the submitter directly.
# Particular case for the local isolated, the node status is set to RUNNING by the submitter directly.
# We ensure that no other instance has started to compute, by checking that the sessionUid is empty.
if chunk.status.execMode == ExecMode.LOCAL_ISOLATED and not chunk.status.sessionUid and chunk.status.submitterSessionUid:
if chunk.status.mrNodeType == meshroom.core.MrNodeType.NODE and not chunk.status.sessionUid and chunk.status.submitterSessionUid:
continue
print(f'Warning: Node is already submitted with status "{chunk.status.status.name}". See file: "{chunk.statusFile}". ExecMode: {chunk.status.execMode.name}, SessionUid: {chunk.status.sessionUid}, submitterSessionUid: {chunk.status.submitterSessionUid}')
# sys.exit(-1)

View file

@ -1,14 +1,14 @@
import hashlib
from contextlib import contextmanager
import hashlib
import importlib
import inspect
import os
import tempfile
import uuid
import logging
import os
import pkgutil
import sys
import tempfile
import traceback
import uuid
try:
# for cx_freeze
@ -21,7 +21,7 @@ except Exception:
from meshroom.core.submitter import BaseSubmitter
from meshroom.env import EnvVar, meshroomFolder
from . import desc
from .desc import MrNodeType
# Setup logging
logging.basicConfig(format='[%(asctime)s][%(levelname)s] %(message)s', level=logging.INFO)
@ -36,6 +36,7 @@ submitters = {}
pipelineTemplates = {}
def hashValue(value):
""" Hash 'value' using sha1. """
hashObject = hashlib.sha1(str(value).encode('utf-8'))

View file

@ -20,6 +20,7 @@ from .computation import (
StaticNodeSize,
)
from .node import (
MrNodeType,
AVCommandLineNode,
BaseNode,
CommandLineNode,

View file

@ -1,3 +1,4 @@
import enum
from inspect import getfile
from pathlib import Path
import logging
@ -13,11 +14,17 @@ from .attribute import StringParam, ColorParam
import meshroom
from meshroom.core import cgroup
_MESHROOM_ROOT = Path(meshroom.__file__).parent.parent
_MESHROOM_COMPUTE = _MESHROOM_ROOT / "bin" / "meshroom_compute"
class MrNodeType(enum.Enum):
NONE = enum.auto()
BASENODE = enum.auto()
NODE = enum.auto()
COMMANDLINE = enum.auto()
INPUT = enum.auto()
def isNodeSaved(node):
"""Returns whether a node is identical to its serialized counterpart in the current graph file."""
filepath = node.graph.filepath
@ -87,6 +94,9 @@ class BaseNode(object):
self.hasDynamicOutputAttribute = any(output.isDynamicValue for output in self.outputs)
self.sourceCodeFolder = Path(getfile(self.__class__)).parent.resolve().as_posix()
def getMrNodeType(self):
return MrNodeType.BASENODE
def upgradeAttributeValues(self, attrValues, fromVersion):
return attrValues
@ -236,6 +246,9 @@ class InputNode(BaseNode):
def __init__(self):
super(InputNode, self).__init__()
def getMrNodeType(self):
return MrNodeType.INPUT
def processChunk(self, chunk):
pass
@ -245,6 +258,9 @@ class Node(BaseNode):
def __init__(self):
super(Node, self).__init__()
def getMrNodeType(self):
return MrNodeType.NODE
def processChunkInEnvironment(self, chunk):
if not isNodeSaved(chunk.node):
raise RuntimeError("File must be saved before computing in isolated environment.")
@ -267,6 +283,9 @@ class CommandLineNode(BaseNode):
def __init__(self):
super(CommandLineNode, self).__init__()
def getMrNodeType(self):
return MrNodeType.COMMANDLINE
def buildCommandLine(self, chunk):
cmdPrefix = ''

View file

@ -57,7 +57,6 @@ class Status(Enum):
class ExecMode(Enum):
NONE = auto()
LOCAL = auto()
LOCAL_ISOLATED = auto()
EXTERN = auto()
@ -86,6 +85,7 @@ class StatusData(BaseObject):
self.nodeType: str = ""
self.packageName: str = ""
self.packageVersion: str = ""
self.mrNodeType: desc.MrNodeType = desc.MrNodeType.NONE
self.resetDynamicValues()
def resetDynamicValues(self):
@ -108,6 +108,14 @@ class StatusData(BaseObject):
self.startDateTime = datetime.datetime.now().strftime(self.dateTimeFormatting)
# to get datetime obj: datetime.datetime.strptime(obj, self.dateTimeFormatting)
def initIsolatedCompute(self):
''' When submitting a node, we reset the status information to ensure that we do not keep outdated information.
'''
self.resetDynamicValues()
self.mrNodeType = desc.MrNodeType.NODE
self.sessionUid = None
self.submitterSessionUid = meshroom.core.sessionUid
def initSubmit(self):
''' When submitting a node, we reset the status information to ensure that we do not keep outdated information.
'''
@ -144,6 +152,10 @@ class StatusData(BaseObject):
self.execMode = d.get('execMode', ExecMode.NONE)
if not isinstance(self.execMode, ExecMode):
self.execMode = ExecMode[self.execMode]
self.mrNodeType = d.get('mrNodeType', desc.MrNodeType.NONE)
if not isinstance(self.mrNodeType, desc.MrNodeType):
self.mrNodeType = desc.MrNodeType[self.mrNodeType]
self.nodeName = d.get('nodeName', '')
self.nodeType = d.get('nodeType', '')
self.packageName = d.get('packageName', '')
@ -456,8 +468,8 @@ class NodeChunk(BaseObject):
def _processInIsolatedEnvironment(self):
"""Process this node chunk in the isolated environment defined in the environment configuration."""
try:
self._status.initSubmit()
self.upgradeStatusTo(Status.RUNNING, execMode=ExecMode.LOCAL_ISOLATED)
self._status.initIsolatedCompute()
self.upgradeStatusTo(Status.RUNNING, execMode=ExecMode.LOCAL)
self.node.nodeDesc.processChunkInEnvironment(self)
except:
# status should be already updated by meshroom_compute
@ -473,22 +485,25 @@ class NodeChunk(BaseObject):
self.node.updateOutputAttr()
def stopProcess(self):
if self.isExtern():
return
if self._status.status != Status.RUNNING:
return
# if self.isExtern() or self._status.status != Status.RUNNING:
# return
self.upgradeStatusTo(Status.STOPPED)
self.node.nodeDesc.stopProcess(self)
def isExtern(self):
""" The computation is managed externally by another instance of Meshroom, or by meshroom_compute on renderfarm).
""" The computation is managed externally by another instance of Meshroom.
In the ambiguous case of an isolated environment, it is considered as local as we can stop it.
"""
if self._status.execMode == ExecMode.LOCAL_ISOLATED:
# It is a local isolated node, check if it is submitted by our current session.
return self._status.submitterSessionUid != meshroom.core.sessionUid
return self._status.sessionUid != meshroom.core.sessionUid
uid = self._status.submitterSessionUid if self._status.mrNodeType == desc.MrNodeType.NODE else meshroom.core.sessionUid
return uid != meshroom.core.sessionUid
# def isIndependantProcess(self):
# if self._status.execMode == ExecMode.EXTERN:
# # Compute is managed by another instance of Meshroom
# return True
# # Compute is using a meshroom_compute subprocess
# return self._status.mrNodeType == desc.MrNodeType.NODE
statusChanged = Signal()
status = Property(Variant, lambda self: self._status, notify=statusChanged)
@ -1398,15 +1413,23 @@ class BaseNode(BaseObject):
return False
return True
def initFromThisSession(self):
if not self._chunks:
return False
for chunk in self._chunks:
uid = chunk.status.submitterSessionUid if chunk.status.mrNodeType == desc.MrNodeType.NODE else chunk.status.sessionUid
if uid != meshroom.core.sessionUid:
return False
return True
@Slot(result=bool)
def canBeStopped(self):
# Only locked nodes running in local with the same
# sessionUid as the Meshroom instance can be stopped
# logging.warning(f"[{self.name}] canBeStopped: globalExecMode={self.globalExecMode} globalStatus={self.getGlobalStatus()} statusInThisSession={self.statusInThisSession()}, submitterStatusInThisSession={self.submitterStatusInThisSession()}")
return (self.getGlobalStatus() == Status.RUNNING and
((self.globalExecMode == ExecMode.LOCAL.name and self.statusInThisSession()) or
(self.globalExecMode == ExecMode.LOCAL_ISOLATED.name and self.submitterStatusInThisSession())
))
self.globalExecMode == ExecMode.LOCAL.name and
self.initFromThisSession())
@Slot(result=bool)
def canBeCanceled(self):
@ -1414,9 +1437,8 @@ class BaseNode(BaseObject):
# sessionUid as the Meshroom instance can be canceled
# logging.warning(f"[{self.name}] canBeCanceled: globalExecMode={self.globalExecMode} globalStatus={self.getGlobalStatus()} statusInThisSession={self.statusInThisSession()}, submitterStatusInThisSession={self.submitterStatusInThisSession()}")
return (self.getGlobalStatus() == Status.SUBMITTED and
((self.globalExecMode == ExecMode.LOCAL.name and self.statusInThisSession()) or
(self.globalExecMode == ExecMode.LOCAL_ISOLATED.name and self.submitterStatusInThisSession())
))
self.globalExecMode == ExecMode.LOCAL.name and
self.initFromThisSession())
def hasImageOutputAttribute(self):
"""

View file

@ -30,7 +30,7 @@ from meshroom.core.graphIO import GraphIO
from meshroom.core.taskManager import TaskManager
from meshroom.core.node import NodeChunk, Node, Status, ExecMode, CompatibilityNode, Position
from meshroom.core import submitters
from meshroom.core import submitters, MrNodeType
from meshroom.ui import commands
from meshroom.ui.utils import makeProperty
@ -187,7 +187,7 @@ class ChunksMonitor(QObject):
# when run locally, status changes are already notified.
# Chunks with an ERROR status may be re-submitted externally and should thus still be monitored
if (c.isExtern() and c._status.status in (Status.SUBMITTED, Status.RUNNING, Status.ERROR)) or (
(c._status.execMode is ExecMode.LOCAL_ISOLATED) and (c._status.status in (Status.SUBMITTED, Status.RUNNING))):
(c._status.mrNodeType == MrNodeType.NODE) and (c._status.status in (Status.SUBMITTED, Status.RUNNING))):
files.append(c.statusFile)
chunks.append(c)
return files, chunks
@ -201,13 +201,14 @@ class ChunksMonitor(QObject):
times: the last modification times for currently monitored files.
"""
newRecords = dict(zip(self.monitoredChunks, times))
hasChanges = False
hasChangesAndSuccess = False
for chunk, fileModTime in newRecords.items():
# update chunk status if last modification time has changed since previous record
if fileModTime != chunk.statusFileLastModTime:
chunk.updateStatusFromCache()
hasChanges = True
if hasChanges:
if chunk.status.status == Status.SUCCESS:
hasChangesAndSuccess = True
if hasChangesAndSuccess:
chunk.node.loadOutputAttr()
def onFilePollerRefreshUpdated(self):
@ -583,8 +584,7 @@ class UIGraph(QObject):
def updateGraphComputingStatus(self):
# update graph computing status
computingLocally = any([
(((ch.status.execMode == ExecMode.LOCAL and ch.status.sessionUid == sessionUid) or
ch.status.execMode == ExecMode.LOCAL_ISOLATED) and
((ch.status.submitterSessionUid if ch.status.mrNodeType == MrNodeType.NODE else ch.status.sessionUid) == sessionUid) and (
ch.status.status in (Status.RUNNING, Status.SUBMITTED))
for ch in self._sortedDFSChunks])
submitted = any([ch.status.status == Status.SUBMITTED for ch in self._sortedDFSChunks])