From 691cae4d799a926b019941158e2e92e8fb1b050c Mon Sep 17 00:00:00 2001 From: Yann Lanthony Date: Mon, 3 Mar 2025 16:28:23 +0100 Subject: [PATCH] [core] Add support for isolated process environments Initial commit that adds support for executing nodes in isolated process environements: * Python virtual environments * Conda environments * Rez environments Note: this commit is mostly for sharing current progress and should be re-worked/split in more atomic ones. --- meshroom/core/__init__.py | 2 + meshroom/core/desc/__init__.py | 8 +++ meshroom/core/desc/config.py | 76 ++++++++++++++++++++++ meshroom/core/desc/node.py | 90 ++++++++++++++++++++++++++ meshroom/core/desc/process.py | 112 +++++++++++++++++++++++++++++++++ meshroom/core/node.py | 35 ++++++++--- meshroom/env.py | 3 + meshroom/ui/graph.py | 8 +-- 8 files changed, 322 insertions(+), 12 deletions(-) create mode 100644 meshroom/core/desc/config.py create mode 100644 meshroom/core/desc/process.py diff --git a/meshroom/core/__init__.py b/meshroom/core/__init__.py index 3bd30d5b..51a00ca8 100644 --- a/meshroom/core/__init__.py +++ b/meshroom/core/__init__.py @@ -337,6 +337,8 @@ def initNodes(): for f in nodesFolders: loadAllNodes(folder=f) + desc.configProcessEnvironmentsInit() + def initSubmitters(): meshroomFolder = os.path.dirname(os.path.dirname(__file__)) diff --git a/meshroom/core/desc/__init__.py b/meshroom/core/desc/__init__.py index b60a9d51..631eba07 100644 --- a/meshroom/core/desc/__init__.py +++ b/meshroom/core/desc/__init__.py @@ -24,9 +24,14 @@ from .node import ( CommandLineNode, InitNode, InputNode, + IsolatedEnvNode, Node, ) +from .config import ( + configProcessEnvironmentsInit +) + __all__ = [ # attribute "Attribute", @@ -52,5 +57,8 @@ __all__ = [ "CommandLineNode", "InitNode", "InputNode", + "IsolatedEnvNode", "Node", + # envs + "configProcessEnvironmentsInit", ] diff --git a/meshroom/core/desc/config.py b/meshroom/core/desc/config.py new file mode 100644 index 00000000..f82e07a8 --- /dev/null +++ b/meshroom/core/desc/config.py @@ -0,0 +1,76 @@ +import json +import logging +from dataclasses import dataclass, field +from pathlib import Path + +from meshroom.env import EnvVar + +from .process import ( + DefaultEnvironment, + ProcessEnvironment, + processEnvironmentFactory, +) + + +@dataclass +class _RegisteredProcessEnvironment: + """A registered ProcessEnvironment, loaded from a source config file.""" + desc: ProcessEnvironment + source: str + + +@dataclass +class _ConfigProcessEnvironments: + envs: dict[str, _RegisteredProcessEnvironment] = field(default_factory=dict) + nodeTypeEnvMapping: dict[str, str] = field(default_factory=dict) + + +CONFIG_PROCESS_ENVIRONMENTS = _ConfigProcessEnvironments() + + +def configProcessEnvironmentsInit(): + envConfigs = EnvVar.get(EnvVar.MESHROOM_CONFIG_PROCESS_ENVS) + if not envConfigs: + return + + envFiles = envConfigs.split(":") + + for envFile in envFiles: + if not (envFilepath := Path(envFile)).exists(): + continue + try: + _registerEnvironments(envFilepath) + except json.JSONDecodeError as e: + logging.warning(f"Failed to parse environment config file: {e}") + + +def getProcessEnvironment(nodeType: str) -> ProcessEnvironment: + if not (env_name := CONFIG_PROCESS_ENVIRONMENTS.nodeTypeEnvMapping.get(nodeType)): + return DefaultEnvironment("default", "default") + + return CONFIG_PROCESS_ENVIRONMENTS.envs[env_name].desc + + +def _registerEnvironments(envFile: Path): + with open(envFile, "r") as f: + content = json.load(f) + envs = content.get("envs") + + for env_name, value in envs.items(): + _registerEnvironment(env_name, value, envFile) + + for nodeType, envName in content["mapping"].items(): + _registerNodeTypeEnvironmentMapping(nodeType, envName) + + +def _registerEnvironment(name: str, fields: dict, source: Path): + if env := CONFIG_PROCESS_ENVIRONMENTS.envs.get(name, None): + logging.warning(f"Skipping already defined env: {env}") + + envDesc = processEnvironmentFactory(name, fields.pop("type"), **fields) + + CONFIG_PROCESS_ENVIRONMENTS.envs[name] = _RegisteredProcessEnvironment(envDesc, source.as_posix()) + + +def _registerNodeTypeEnvironmentMapping(nodeType: str, envName: str): + CONFIG_PROCESS_ENVIRONMENTS.nodeTypeEnvMapping[nodeType] = envName diff --git a/meshroom/core/desc/node.py b/meshroom/core/desc/node.py index 2272c0ba..97710e56 100644 --- a/meshroom/core/desc/node.py +++ b/meshroom/core/desc/node.py @@ -1,9 +1,13 @@ +import logging import os import psutil import shlex +import sys from .computation import Level, StaticNodeSize from .attribute import StringParam, ColorParam +from .process import ProcessEnvironment +from .config import getProcessEnvironment from meshroom.core import cgroup @@ -280,3 +284,89 @@ class InitNode(object): for attr in attributesDict: if node.hasAttribute(attr): node.attribute(attr).value = attributesDict[attr] + + +class IsolatedEnvNode(CommandLineNode): + @classmethod + def getRuntimeEnv(cls) -> ProcessEnvironment: + return getProcessEnvironment(cls.__name__) + + def processInEnvironment(self, chunk): + try: + IsolatedEnvNode._ensureGraphIsSaved(chunk.node) + except RuntimeError as e: + with open(chunk.logFile, "w") as logFile: + logFile.write(str(e)) + raise + + env = os.environ.copy() + env["PYTHONPATH"] = ProcessEnvironment.pythonPath() + + runtimeEnv = self.getRuntimeEnv() + clArgs = f"{chunk.node.graph.filepath} --node {chunk.node.name} -i {chunk.range.iteration}" + fullCL = runtimeEnv.commandLine(clArgs) + + logging.info(f"Starting env for '{chunk.node.name}' ({runtimeEnv}): {fullCL}") + + # Change process process group to avoid meshroom main process being killed if the subprocess + # gets terminated by the user. + if sys.platform == "win32": + platformArgs = {"creationflags": psutil.CREATE_NEW_PROCESS_GROUP} + else: + platformArgs = {"preexec_fn": os.setsid} + + with open(chunk.logFile, "w") as logF: + chunk.status.commandLine = fullCL + chunk.saveStatusFile() + chunk.subprocess = psutil.Popen( + shlex.split(fullCL), + stdout=logF, + stderr=logF, + cwd=chunk.node.internalFolder, + env=env, + **platformArgs, + ) + + chunk.subprocess.communicate() + chunk.subprocess.wait() + + chunk.status.returnCode = chunk.subprocess.returncode + + if chunk.subprocess.returncode != 0: + with open(chunk.logFile, "r") as logF: + logContent = "".join(logF.readlines()) + raise RuntimeError( + 'Error on node "{}":\nLog:\n{}'.format(chunk.name, logContent) + ) + + @staticmethod + def _ensureGraphIsSaved(node): + """Raise a RuntimeError if the current node is not saved.""" + if not IsolatedEnvNode._nodeSaved(node): + raise RuntimeError("File must be saved before computing in isolated environment.") + + @staticmethod + def _nodeSaved(node): + """Returns whether a node is identical to its serialized counterpart in the current graph file.""" + if not (filepath := node.graph.filepath): + return False + + from meshroom.core.graph import loadGraph + g = loadGraph(filepath) + if (node := g.node(node.name)) is None: + return False + return node._uid == node._uid + + def stopProcess(self, chunk): + # The same node could exists several times in the graph and + # only one would have the running subprocess; ignore all others + if not hasattr(chunk, "subprocess"): + return + if chunk.subprocess: + # Kill process tree + processes = chunk.subprocess.children(recursive=True) + [chunk.subprocess] + try: + for process in processes: + process.terminate() + except psutil.NoSuchProcess: + pass \ No newline at end of file diff --git a/meshroom/core/desc/process.py b/meshroom/core/desc/process.py new file mode 100644 index 00000000..c39bf843 --- /dev/null +++ b/meshroom/core/desc/process.py @@ -0,0 +1,112 @@ +import os + +from abc import ABC, abstractmethod +from enum import Enum +from pathlib import Path + +from dataclasses import dataclass + +import meshroom + + +_MESHROOM_ROOT = Path(meshroom.__file__).parent.parent +_MESHROOM_COMPUTE = _MESHROOM_ROOT / "bin" / "meshroom_compute" + + +@dataclass +class ProcessEnvironment(ABC): + """Describes an isolated meshroom compute process environment. + + Attributes: + name: User-readable name of the environment. + uri: Unique resource identifier to activate the environment. + """ + + name: str + uri: str + + @abstractmethod + def commandLine(self, args: str) -> str: + """Build Meshroom compute command line from given args to run in this environment.""" + ... + + @staticmethod + def pythonPath(): + return f"{_MESHROOM_ROOT}:{os.getenv('PYTHONPATH', '')}" + + +@dataclass +class DefaultEnvironment(ProcessEnvironment): + """Default environment similar to the main process.""" + + def commandLine(self, args: str) -> str: + return f"{_MESHROOM_COMPUTE} {args}" + + +@dataclass +class CondaEnvironment(ProcessEnvironment): + """Conda environment where uri defines the name of the environment.""" + + def commandLine(self, args: str) -> str: + return f"conda run -n {self.uri} {_MESHROOM_COMPUTE} {args}" + + +@dataclass +class VirtualEnvironment(ProcessEnvironment): + """Python virtual environment where uri defines the root path of the virtual environment.""" + + def commandLine(self, args: str) -> str: + return f"{self.uri}/bin/python {_MESHROOM_COMPUTE} {args}" + + +@dataclass +class RezEnvironment(ProcessEnvironment): + """Rez environment where uri defined either a list of requirements or a .rxt file. + + Attributes: + pyexe: The python executable to use for starting meshroom compute. + """ + + pyexe: str = "python" + + def commandLine(self, args: str) -> str: + pythonPathSetup = f"export PYTHONPATH={_MESHROOM_ROOT}:$PYTHONPATH;" + + cmd = f"{pythonPathSetup} {self.pyexe} {_MESHROOM_COMPUTE} {args}" + + if (path := Path(self.uri)).exists() and path.suffix == ".rxt": + return f"rez env -i {self.uri} -c '{cmd}'" + + return f"rez env {self.uri} -c '{cmd}'" + + +class ProcessEnvironmentType(Enum): + DEFAULT = "default" + CONDA = "conda" + VIRTUALENV = "venv" + REZ = "rez" + + +_ENV_BY_TYPE = { + ProcessEnvironmentType.DEFAULT: DefaultEnvironment, + ProcessEnvironmentType.CONDA: CondaEnvironment, + ProcessEnvironmentType.VIRTUALENV: VirtualEnvironment, + ProcessEnvironmentType.REZ: RezEnvironment, +} + + +def processEnvironmentFactory(name: str, type: str, **kwargs) -> ProcessEnvironment: + """Creates a ProcessEnvironment instance of the given `type`. + + Args: + name: The name of the environment. + type: The ProcessEnvironment type. + **kwargs: Keyword arguments to pass to the ProcessEnvironment constructor. + + Returns: + The created ProcessEnvironment instance. + """ + try: + return _ENV_BY_TYPE[ProcessEnvironmentType(type)](name, **kwargs) + except (KeyError, ValueError): + raise KeyError(f"Unvalid environment type: {type}") diff --git a/meshroom/core/node.py b/meshroom/core/node.py index bdf36976..e6c0ed54 100644 --- a/meshroom/core/node.py +++ b/meshroom/core/node.py @@ -13,7 +13,7 @@ import time import types import uuid from collections import namedtuple -from enum import Enum +from enum import Enum, auto from typing import Callable, Optional import meshroom @@ -55,9 +55,10 @@ class Status(Enum): class ExecMode(Enum): - NONE = 0 - LOCAL = 1 - EXTERN = 2 + NONE = auto() + LOCAL = auto() + LOCAL_ISOLATED = auto() + EXTERN = auto() class StatusData(BaseObject): @@ -405,6 +406,14 @@ class NodeChunk(BaseObject): if not forceCompute and self._status.status == Status.SUCCESS: logging.info("Node chunk already computed: {}".format(self.name)) return + + # Start the process environment for nodes running in isolation. + # This only happens once, when the node has the SUBMITTED status. + # The sub-process will go through this method again, but the node status will have been set to RUNNING. + if isinstance(self.node.nodeDesc, desc.IsolatedEnvNode) and self._status.status is Status.SUBMITTED: + self._processInIsolatedEnvironment() + return + global runningProcesses runningProcesses[self.name] = self self._status.initStartCompute() @@ -415,6 +424,8 @@ class NodeChunk(BaseObject): self.statThread.start() try: self.node.nodeDesc.processChunk(self) + # NOTE: this assumes saving the output attributes for each chunk + self.node.saveOutputAttr() except Exception: if self._status.status != Status.STOPPED: exceptionStatus = Status.ERROR @@ -436,6 +447,15 @@ class NodeChunk(BaseObject): self.upgradeStatusTo(Status.SUCCESS) + def _processInIsolatedEnvironment(self): + """Process this node chunk in the isolated environment defined in the environment configuration.""" + try: + self.upgradeStatusTo(Status.RUNNING, execMode=ExecMode.LOCAL_ISOLATED) + self.node.nodeDesc.processInEnvironment(self) + except: + self.upgradeStatusTo(Status.ERROR) + raise + def stopProcess(self): if not self.isExtern(): if self._status.status == Status.RUNNING: @@ -1072,8 +1092,7 @@ class BaseNode(BaseObject): def postprocess(self): # Invoke the post process on Client Node to execute after the processing on the node is completed - self.nodeDesc.postprocess(self) - self.saveOutputAttr() + self.nodeDesc.postprocess(self) def updateOutputAttr(self): if not self.nodeDesc: @@ -1331,14 +1350,14 @@ class BaseNode(BaseObject): # Only locked nodes running in local with the same # sessionUid as the Meshroom instance can be stopped return (self.locked and self.getGlobalStatus() == Status.RUNNING and - self.globalExecMode == "LOCAL" and self.statusInThisSession()) + self.globalExecMode in {"LOCAL", "LOCAL_ISOLATED"} and self.statusInThisSession()) @Slot(result=bool) def canBeCanceled(self): # Only locked nodes submitted in local with the same # sessionUid as the Meshroom instance can be canceled return (self.locked and self.getGlobalStatus() == Status.SUBMITTED and - self.globalExecMode == "LOCAL" and self.statusInThisSession()) + self.globalExecMode in {"LOCAL", "LOCAL_ISOLATED"} and self.statusInThisSession()) def hasImageOutputAttribute(self): """ diff --git a/meshroom/env.py b/meshroom/env.py index 3036db20..b4665024 100644 --- a/meshroom/env.py +++ b/meshroom/env.py @@ -39,6 +39,9 @@ class EnvVar(Enum): str, "port:3768", "QML debugging params as expected by -qmljsdebugger" ) + # Core + MESHROOM_CONFIG_PROCESS_ENVS = VarDefinition(str, "", "Process environments config files (colon separated)") + @staticmethod def get(envVar: "EnvVar") -> Any: """Get the value of `envVar`, cast to the variable type.""" diff --git a/meshroom/ui/graph.py b/meshroom/ui/graph.py index 5a0d1b27..8dc13e8f 100644 --- a/meshroom/ui/graph.py +++ b/meshroom/ui/graph.py @@ -184,9 +184,9 @@ class ChunksMonitor(QObject): elif self.filePollerRefresh is PollerRefreshStatus.MINIMAL_ENABLED.value: for c in self.monitorableChunks: # Only chunks that are run externally should be monitored; when run locally, status changes are already notified - if c.isExtern(): + if c.isExtern() or c._status.execMode is ExecMode.LOCAL_ISOLATED: # Chunks with an ERROR status may be re-submitted externally and should thus still be monitored - if c._status.status is Status.SUBMITTED or c._status.status is Status.RUNNING or c._status.status is Status.ERROR: + if c._status.status in {Status.SUBMITTED, Status.RUNNING, Status.ERROR}: files.append(c.statusFile) chunks.append(c) return files, chunks @@ -204,7 +204,7 @@ class ChunksMonitor(QObject): # update chunk status if last modification time has changed since previous record if fileModTime != chunk.statusFileLastModTime: chunk.updateStatusFromCache() - chunk.node.updateOutputAttr() + chunk.node.loadOutputAttr() def onFilePollerRefreshUpdated(self): """ @@ -579,7 +579,7 @@ class UIGraph(QObject): def updateGraphComputingStatus(self): # update graph computing status computingLocally = any([ - (ch.status.execMode == ExecMode.LOCAL and + (ch.status.execMode in {ExecMode.LOCAL, ExecMode.LOCAL_ISOLATED} and ch.status.sessionUid == sessionUid and ch.status.status in (Status.RUNNING, Status.SUBMITTED)) for ch in self._sortedDFSChunks])