[core] Add support for isolated process environments

Initial commit that adds support for executing nodes in isolated process
environments:
* Python virtual environments
* Conda environments
* Rez environments

Note: this commit is mostly for sharing current progress and should be reworked/split into more atomic commits.
This commit is contained in:
Yann Lanthony 2025-03-03 16:28:23 +01:00
parent 2f05261564
commit 691cae4d79
8 changed files with 322 additions and 12 deletions

View file

@ -337,6 +337,8 @@ def initNodes():
for f in nodesFolders:
loadAllNodes(folder=f)
desc.configProcessEnvironmentsInit()
def initSubmitters():
meshroomFolder = os.path.dirname(os.path.dirname(__file__))

View file

@ -24,9 +24,14 @@ from .node import (
CommandLineNode,
InitNode,
InputNode,
IsolatedEnvNode,
Node,
)
from .config import (
configProcessEnvironmentsInit
)
__all__ = [
# attribute
"Attribute",
@ -52,5 +57,8 @@ __all__ = [
"CommandLineNode",
"InitNode",
"InputNode",
"IsolatedEnvNode",
"Node",
# envs
"configProcessEnvironmentsInit",
]

View file

@ -0,0 +1,76 @@
import json
import logging
from dataclasses import dataclass, field
from pathlib import Path
from meshroom.env import EnvVar
from .process import (
DefaultEnvironment,
ProcessEnvironment,
processEnvironmentFactory,
)
@dataclass
class _RegisteredProcessEnvironment:
    """A registered ProcessEnvironment, loaded from a source config file."""
    # The environment description instance created by `processEnvironmentFactory`.
    desc: ProcessEnvironment
    # POSIX-style path of the config file this environment was loaded from.
    source: str
@dataclass
class _ConfigProcessEnvironments:
    """Container for registered process environments and node-type mappings."""
    # Registered environments, keyed by environment name.
    envs: dict[str, _RegisteredProcessEnvironment] = field(default_factory=dict)
    # Maps a node type name to the name of the environment it should run in.
    nodeTypeEnvMapping: dict[str, str] = field(default_factory=dict)

# Module-level singleton holding the process environments configuration.
CONFIG_PROCESS_ENVIRONMENTS = _ConfigProcessEnvironments()
def configProcessEnvironmentsInit():
    """Load the process environment config files listed in MESHROOM_CONFIG_PROCESS_ENVS.

    The environment variable holds a colon-separated list of JSON config filepaths.
    Entries that do not exist on disk are silently skipped; files that cannot be
    read, parsed or registered are logged and skipped, so a single broken file
    does not abort the initialization of the remaining ones.
    """
    envConfigs = EnvVar.get(EnvVar.MESHROOM_CONFIG_PROCESS_ENVS)
    if not envConfigs:
        return
    # NOTE(review): ":" matches the documented format but clashes with drive
    # letters in Windows paths; consider os.pathsep -- TODO confirm intent.
    envFiles = envConfigs.split(":")
    for envFile in envFiles:
        if not (envFilepath := Path(envFile)).exists():
            continue
        try:
            _registerEnvironments(envFilepath)
        # Fix: also catch OSError (unreadable file) and KeyError (bad/missing
        # "type" in an env entry), which previously aborted the whole loop.
        except (OSError, json.JSONDecodeError, KeyError) as e:
            logging.warning(f"Failed to parse environment config file: {e}")
def getProcessEnvironment(nodeType: str) -> ProcessEnvironment:
    """Return the ProcessEnvironment registered for the given node type.

    Falls back to a DefaultEnvironment when no mapping exists for `nodeType`.
    """
    envName = CONFIG_PROCESS_ENVIRONMENTS.nodeTypeEnvMapping.get(nodeType)
    if not envName:
        return DefaultEnvironment("default", "default")
    return CONFIG_PROCESS_ENVIRONMENTS.envs[envName].desc
def _registerEnvironments(envFile: Path):
    """Register all environments and node-type mappings declared in `envFile`.

    Both the "envs" and "mapping" sections are optional, so a config file may
    declare only environments or only mappings.

    Raises:
        OSError: If the file cannot be read.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    with open(envFile, "r") as f:
        content = json.load(f)
    # Fix: default both sections to {} -- previously a missing "envs" raised
    # AttributeError (None.items()) and a missing "mapping" raised KeyError.
    for envName, value in content.get("envs", {}).items():
        _registerEnvironment(envName, value, envFile)
    for nodeType, envName in content.get("mapping", {}).items():
        _registerNodeTypeEnvironmentMapping(nodeType, envName)
def _registerEnvironment(name: str, fields: dict, source: Path):
    """Create a ProcessEnvironment from config `fields` and register it under `name`.

    The first definition of a given name wins: a re-definition is skipped with
    a warning. `fields` must contain a "type" key (consumed here); remaining
    entries are forwarded to the environment constructor.
    """
    if env := CONFIG_PROCESS_ENVIRONMENTS.envs.get(name, None):
        logging.warning(f"Skipping already defined env: {env}")
        # Fix: actually skip -- previously the warning was logged but the
        # existing entry was silently overwritten anyway.
        return
    envDesc = processEnvironmentFactory(name, fields.pop("type"), **fields)
    CONFIG_PROCESS_ENVIRONMENTS.envs[name] = _RegisteredProcessEnvironment(envDesc, source.as_posix())
def _registerNodeTypeEnvironmentMapping(nodeType: str, envName: str):
    """Map `nodeType` to the environment named `envName` (a later mapping overwrites an earlier one)."""
    CONFIG_PROCESS_ENVIRONMENTS.nodeTypeEnvMapping[nodeType] = envName

View file

@ -1,9 +1,13 @@
import logging
import os
import psutil
import shlex
import sys
from .computation import Level, StaticNodeSize
from .attribute import StringParam, ColorParam
from .process import ProcessEnvironment
from .config import getProcessEnvironment
from meshroom.core import cgroup
@ -280,3 +284,89 @@ class InitNode(object):
for attr in attributesDict:
if node.hasAttribute(attr):
node.attribute(attr).value = attributesDict[attr]
class IsolatedEnvNode(CommandLineNode):
    """A command-line node whose chunks are computed in an isolated process environment.

    The environment (conda, venv, rez, ...) is resolved from the process
    environments configuration, keyed by the node type name.
    """

    @classmethod
    def getRuntimeEnv(cls) -> ProcessEnvironment:
        """Return the ProcessEnvironment configured for this node type."""
        return getProcessEnvironment(cls.__name__)

    def processInEnvironment(self, chunk):
        """Run `chunk` through meshroom_compute inside this node's isolated environment.

        Raises:
            RuntimeError: If the graph is not saved, or if the subprocess exits
                with a non-zero return code.
        """
        try:
            IsolatedEnvNode._ensureGraphIsSaved(chunk.node)
        except RuntimeError as e:
            with open(chunk.logFile, "w") as logFile:
                logFile.write(str(e))
            raise

        # Expose meshroom to the subprocess interpreter.
        env = os.environ.copy()
        env["PYTHONPATH"] = ProcessEnvironment.pythonPath()

        runtimeEnv = self.getRuntimeEnv()
        clArgs = f"{chunk.node.graph.filepath} --node {chunk.node.name} -i {chunk.range.iteration}"
        fullCL = runtimeEnv.commandLine(clArgs)
        logging.info(f"Starting env for '{chunk.node.name}' ({runtimeEnv}): {fullCL}")

        # Change process group to avoid meshroom main process being killed if the
        # subprocess gets terminated by the user.
        if sys.platform == "win32":
            platformArgs = {"creationflags": psutil.CREATE_NEW_PROCESS_GROUP}
        else:
            platformArgs = {"preexec_fn": os.setsid}

        with open(chunk.logFile, "w") as logF:
            chunk.status.commandLine = fullCL
            chunk.saveStatusFile()
            chunk.subprocess = psutil.Popen(
                shlex.split(fullCL),
                stdout=logF,
                stderr=logF,
                cwd=chunk.node.internalFolder,
                env=env,
                **platformArgs,
            )
            # communicate() already waits for termination; the extra wait()
            # call in the original was redundant.
            chunk.subprocess.communicate()
            chunk.status.returnCode = chunk.subprocess.returncode

        if chunk.subprocess.returncode != 0:
            with open(chunk.logFile, "r") as logF:
                logContent = logF.read()
            raise RuntimeError(
                'Error on node "{}":\nLog:\n{}'.format(chunk.name, logContent)
            )

    @staticmethod
    def _ensureGraphIsSaved(node):
        """Raise a RuntimeError if the current node is not saved."""
        if not IsolatedEnvNode._nodeSaved(node):
            raise RuntimeError("File must be saved before computing in isolated environment.")

    @staticmethod
    def _nodeSaved(node):
        """Returns whether a node is identical to its serialized counterpart in the current graph file."""
        if not (filepath := node.graph.filepath):
            return False
        from meshroom.core.graph import loadGraph
        g = loadGraph(filepath)
        if (savedNode := g.node(node.name)) is None:
            return False
        # Fix: compare the live node's uid to the reloaded node's uid. The
        # original walrus rebound `node`, making it compare a node to itself
        # (always True), so the saved-check never detected unsaved changes.
        return node._uid == savedNode._uid

    def stopProcess(self, chunk):
        # The same node could exist several times in the graph and only one
        # would have the running subprocess; ignore all others.
        if not hasattr(chunk, "subprocess"):
            return
        if chunk.subprocess:
            # Kill the whole process tree. The subprocess may already have
            # exited, so children() itself can raise NoSuchProcess too.
            try:
                processes = chunk.subprocess.children(recursive=True) + [chunk.subprocess]
                for process in processes:
                    process.terminate()
            except psutil.NoSuchProcess:
                pass

View file

@ -0,0 +1,112 @@
import os
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
from dataclasses import dataclass
import meshroom
# Root of the meshroom checkout (the directory containing the "meshroom"
# package), exposed on the subprocess PYTHONPATH.
_MESHROOM_ROOT = Path(meshroom.__file__).parent.parent
# Compute entry-point script launched inside the isolated environment.
_MESHROOM_COMPUTE = _MESHROOM_ROOT / "bin" / "meshroom_compute"
@dataclass
class ProcessEnvironment(ABC):
    """Describes an isolated meshroom compute process environment.

    Attributes:
        name: User-readable name of the environment.
        uri: Unique resource identifier to activate the environment.
    """

    name: str
    uri: str

    @abstractmethod
    def commandLine(self, args: str) -> str:
        """Build Meshroom compute command line from given args to run in this environment."""
        ...

    @staticmethod
    def pythonPath() -> str:
        """Return a PYTHONPATH value exposing meshroom to the subprocess.

        Fix: join with os.pathsep so the value is valid on Windows (";") as
        well as POSIX (":"); win32 is explicitly supported by callers.
        """
        return f"{_MESHROOM_ROOT}{os.pathsep}{os.getenv('PYTHONPATH', '')}"
@dataclass
class DefaultEnvironment(ProcessEnvironment):
    """Default environment similar to the main process."""

    def commandLine(self, args: str) -> str:
        """Run meshroom_compute directly, with no environment activation step."""
        return " ".join((str(_MESHROOM_COMPUTE), args))
@dataclass
class CondaEnvironment(ProcessEnvironment):
    """Conda environment where uri defines the name of the environment."""

    def commandLine(self, args: str) -> str:
        """Wrap meshroom_compute in `conda run` for the environment named by uri."""
        activation = f"conda run -n {self.uri}"
        return f"{activation} {_MESHROOM_COMPUTE} {args}"
@dataclass
class VirtualEnvironment(ProcessEnvironment):
    """Python virtual environment where uri defines the root path of the virtual environment."""

    def commandLine(self, args: str) -> str:
        """Run meshroom_compute with the venv's own python interpreter."""
        # Fix: venv layout is platform-dependent -- "Scripts" on Windows,
        # "bin" elsewhere; win32 is explicitly supported by callers.
        binDir = "Scripts" if os.name == "nt" else "bin"
        return f"{self.uri}/{binDir}/python {_MESHROOM_COMPUTE} {args}"
@dataclass
class RezEnvironment(ProcessEnvironment):
    """Rez environment where uri defines either a list of requirements or a .rxt file.

    Attributes:
        pyexe: The python executable to use for starting meshroom compute.
    """

    # Interpreter invoked inside the resolved rez environment.
    pyexe: str = "python"

    def commandLine(self, args: str) -> str:
        """Wrap meshroom_compute in a `rez env` invocation."""
        # Expose meshroom to the interpreter running inside the rez env
        # (POSIX shell syntax -- matches the original implementation).
        pythonPathSetup = f"export PYTHONPATH={_MESHROOM_ROOT}:$PYTHONPATH;"
        cmd = f"{pythonPathSetup} {self.pyexe} {_MESHROOM_COMPUTE} {args}"
        uriPath = Path(self.uri)
        # An existing .rxt file is a baked resolve: load it as-is with -i.
        isRxt = uriPath.exists() and uriPath.suffix == ".rxt"
        option = "-i " if isRxt else ""
        return f"rez env {option}{self.uri} -c '{cmd}'"
class ProcessEnvironmentType(Enum):
    """Supported process environment types.

    Values match the "type" field expected in process environment config
    entries, as consumed by `processEnvironmentFactory`.
    """
    DEFAULT = "default"
    CONDA = "conda"
    VIRTUALENV = "venv"
    REZ = "rez"
# Maps each environment type to its ProcessEnvironment implementation class;
# used by `processEnvironmentFactory` to instantiate environments.
_ENV_BY_TYPE = {
    ProcessEnvironmentType.DEFAULT: DefaultEnvironment,
    ProcessEnvironmentType.CONDA: CondaEnvironment,
    ProcessEnvironmentType.VIRTUALENV: VirtualEnvironment,
    ProcessEnvironmentType.REZ: RezEnvironment,
}
def processEnvironmentFactory(name: str, type: str, **kwargs) -> ProcessEnvironment:
    """Creates a ProcessEnvironment instance of the given `type`.

    Args:
        name: The name of the environment.
        type: The ProcessEnvironment type.
        **kwargs: Keyword arguments to pass to the ProcessEnvironment constructor.

    Returns:
        The created ProcessEnvironment instance.

    Raises:
        KeyError: If `type` does not match a known environment type.
    """
    try:
        # Keep the try-block narrow: the original also wrapped the constructor
        # call, so a KeyError raised while constructing the environment would
        # have been misreported as an unknown type.
        envClass = _ENV_BY_TYPE[ProcessEnvironmentType(type)]
    except (KeyError, ValueError) as e:
        # Fix: "Unvalid" -> "Invalid"; chain the original error for context.
        raise KeyError(f"Invalid environment type: {type}") from e
    return envClass(name, **kwargs)

View file

@ -13,7 +13,7 @@ import time
import types
import uuid
from collections import namedtuple
from enum import Enum
from enum import Enum, auto
from typing import Callable, Optional
import meshroom
@ -55,9 +55,10 @@ class Status(Enum):
class ExecMode(Enum):
NONE = 0
LOCAL = 1
EXTERN = 2
NONE = auto()
LOCAL = auto()
LOCAL_ISOLATED = auto()
EXTERN = auto()
class StatusData(BaseObject):
@ -405,6 +406,14 @@ class NodeChunk(BaseObject):
if not forceCompute and self._status.status == Status.SUCCESS:
logging.info("Node chunk already computed: {}".format(self.name))
return
# Start the process environment for nodes running in isolation.
# This only happens once, when the node has the SUBMITTED status.
# The sub-process will go through this method again, but the node status will have been set to RUNNING.
if isinstance(self.node.nodeDesc, desc.IsolatedEnvNode) and self._status.status is Status.SUBMITTED:
self._processInIsolatedEnvironment()
return
global runningProcesses
runningProcesses[self.name] = self
self._status.initStartCompute()
@ -415,6 +424,8 @@ class NodeChunk(BaseObject):
self.statThread.start()
try:
self.node.nodeDesc.processChunk(self)
# NOTE: this assumes saving the output attributes for each chunk
self.node.saveOutputAttr()
except Exception:
if self._status.status != Status.STOPPED:
exceptionStatus = Status.ERROR
@ -436,6 +447,15 @@ class NodeChunk(BaseObject):
self.upgradeStatusTo(Status.SUCCESS)
def _processInIsolatedEnvironment(self):
    """Process this node chunk in the isolated environment defined in the environment configuration.

    Marks the chunk as RUNNING with the LOCAL_ISOLATED exec mode, delegates to
    the node description, and flags the chunk as ERROR on any failure before
    re-raising.
    """
    try:
        self.upgradeStatusTo(Status.RUNNING, execMode=ExecMode.LOCAL_ISOLATED)
        self.node.nodeDesc.processInEnvironment(self)
    # Explicit BaseException keeps the original bare-except behavior
    # (interrupts also mark the chunk as errored) while satisfying linting.
    except BaseException:
        self.upgradeStatusTo(Status.ERROR)
        raise
def stopProcess(self):
if not self.isExtern():
if self._status.status == Status.RUNNING:
@ -1073,7 +1093,6 @@ class BaseNode(BaseObject):
def postprocess(self):
# Invoke the post process on Client Node to execute after the processing on the node is completed
self.nodeDesc.postprocess(self)
self.saveOutputAttr()
def updateOutputAttr(self):
if not self.nodeDesc:
@ -1331,14 +1350,14 @@ class BaseNode(BaseObject):
# Only locked nodes running in local with the same
# sessionUid as the Meshroom instance can be stopped
return (self.locked and self.getGlobalStatus() == Status.RUNNING and
self.globalExecMode == "LOCAL" and self.statusInThisSession())
self.globalExecMode in {"LOCAL", "LOCAL_ISOLATED"} and self.statusInThisSession())
@Slot(result=bool)
def canBeCanceled(self):
# Only locked nodes submitted in local with the same
# sessionUid as the Meshroom instance can be canceled
return (self.locked and self.getGlobalStatus() == Status.SUBMITTED and
self.globalExecMode == "LOCAL" and self.statusInThisSession())
self.globalExecMode in {"LOCAL", "LOCAL_ISOLATED"} and self.statusInThisSession())
def hasImageOutputAttribute(self):
"""

View file

@ -39,6 +39,9 @@ class EnvVar(Enum):
str, "port:3768", "QML debugging params as expected by -qmljsdebugger"
)
# Core
MESHROOM_CONFIG_PROCESS_ENVS = VarDefinition(str, "", "Process environments config files (colon separated)")
@staticmethod
def get(envVar: "EnvVar") -> Any:
"""Get the value of `envVar`, cast to the variable type."""

View file

@ -184,9 +184,9 @@ class ChunksMonitor(QObject):
elif self.filePollerRefresh is PollerRefreshStatus.MINIMAL_ENABLED.value:
for c in self.monitorableChunks:
# Only chunks that are run externally should be monitored; when run locally, status changes are already notified
if c.isExtern():
if c.isExtern() or c._status.execMode is ExecMode.LOCAL_ISOLATED:
# Chunks with an ERROR status may be re-submitted externally and should thus still be monitored
if c._status.status is Status.SUBMITTED or c._status.status is Status.RUNNING or c._status.status is Status.ERROR:
if c._status.status in {Status.SUBMITTED, Status.RUNNING, Status.ERROR}:
files.append(c.statusFile)
chunks.append(c)
return files, chunks
@ -204,7 +204,7 @@ class ChunksMonitor(QObject):
# update chunk status if last modification time has changed since previous record
if fileModTime != chunk.statusFileLastModTime:
chunk.updateStatusFromCache()
chunk.node.updateOutputAttr()
chunk.node.loadOutputAttr()
def onFilePollerRefreshUpdated(self):
"""
@ -579,7 +579,7 @@ class UIGraph(QObject):
def updateGraphComputingStatus(self):
# update graph computing status
computingLocally = any([
(ch.status.execMode == ExecMode.LOCAL and
(ch.status.execMode in {ExecMode.LOCAL, ExecMode.LOCAL_ISOLATED} and
ch.status.sessionUid == sessionUid and
ch.status.status in (Status.RUNNING, Status.SUBMITTED))
for ch in self._sortedDFSChunks])