diff --git a/bin/meshroom_compute b/bin/meshroom_compute index 77755204..59d8b647 100755 --- a/bin/meshroom_compute +++ b/bin/meshroom_compute @@ -85,7 +85,7 @@ if args.node: if chunk.status.status in submittedStatuses: # Particular case for the local isolated, the node status is set to RUNNING by the submitter directly. # We ensure that no other instance has started to compute, by checking that the sessionUid is empty. - if chunk.status.mrNodeType == meshroom.core.MrNodeType.NODE and not chunk.status.sessionUid and chunk.status.submitterSessionUid: + if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and not chunk.status.sessionUid and chunk.status.submitterSessionUid: continue print(f'Warning: Node is already submitted with status "{chunk.status.status.name}". See file: "{chunk.statusFile}". ExecMode: {chunk.status.execMode.name}, SessionUid: {chunk.status.sessionUid}, submitterSessionUid: {chunk.status.submitterSessionUid}') # sys.exit(-1) diff --git a/meshroom/core/node.py b/meshroom/core/node.py index 1f997e3c..a65ae3f4 100644 --- a/meshroom/core/node.py +++ b/meshroom/core/node.py @@ -16,9 +16,10 @@ from collections import namedtuple from enum import Enum, auto from typing import Callable, Optional + import meshroom from meshroom.common import Signal, Variant, Property, BaseObject, Slot, ListModel, DictModel -from meshroom.core import desc, stats, hashValue, nodeVersion, Version +from meshroom.core import desc, stats, hashValue, nodeVersion, Version, MrNodeType from meshroom.core.attribute import attributeFactory, ListAttribute, GroupAttribute, Attribute from meshroom.core.exception import NodeUpgradeError, UnknownNodeTypeError @@ -65,13 +66,14 @@ class StatusData(BaseObject): """ dateTimeFormatting = '%Y-%m-%d %H:%M:%S.%f' - def __init__(self, nodeName='', nodeType='', packageName='', packageVersion='', parent: BaseObject = None): + def __init__(self, nodeName='', nodeType='', packageName='', packageVersion='', mrNodeType: MrNodeType = MrNodeType.NONE, parent: BaseObject = None): super(StatusData, self).__init__(parent) self.reset() self.nodeName: str = nodeName self.nodeType: str = nodeType self.packageName: str = packageName self.packageVersion: str = packageVersion + self.mrNodeType = mrNodeType self.sessionUid: Optional[str] = meshroom.core.sessionUid self.submitterSessionUid: Optional[str] = None @@ -85,7 +87,7 @@ class StatusData(BaseObject): self.nodeType: str = "" self.packageName: str = "" self.packageVersion: str = "" - self.mrNodeType: desc.MrNodeType = desc.MrNodeType.NONE + self.mrNodeType: MrNodeType = MrNodeType.NONE self.resetDynamicValues() def resetDynamicValues(self): @@ -112,16 +114,27 @@ class StatusData(BaseObject): ''' When submitting a node, we reset the status information to ensure that we do not keep outdated information. ''' self.resetDynamicValues() - self.mrNodeType = desc.MrNodeType.NODE + # assert(self.mrNodeType == MrNodeType.NODE) self.sessionUid = None self.submitterSessionUid = meshroom.core.sessionUid - def initSubmit(self): + def initExternSubmit(self): ''' When submitting a node, we reset the status information to ensure that we do not keep outdated information. ''' self.resetDynamicValues() self.sessionUid = None self.submitterSessionUid = meshroom.core.sessionUid + self.status = Status.SUBMITTED + self.execMode = ExecMode.EXTERN + + def initLocalSubmit(self): + ''' When submitting a node, we reset the status information to ensure that we do not keep outdated information. + ''' + self.resetDynamicValues() + self.sessionUid = None + self.submitterSessionUid = meshroom.core.sessionUid + self.status = Status.SUBMITTED + self.execMode = ExecMode.LOCAL def initEndCompute(self): self.sessionUid = meshroom.core.sessionUid @@ -152,9 +165,9 @@ class StatusData(BaseObject): self.execMode = d.get('execMode', ExecMode.NONE) if not isinstance(self.execMode, ExecMode): self.execMode = ExecMode[self.execMode] - self.mrNodeType = d.get('mrNodeType', desc.MrNodeType.NONE) - if not isinstance(self.mrNodeType, desc.MrNodeType): - self.mrNodeType = desc.MrNodeType[self.mrNodeType] + self.mrNodeType = d.get('mrNodeType', MrNodeType.NONE) + if not isinstance(self.mrNodeType, MrNodeType): + self.mrNodeType = MrNodeType[self.mrNodeType] self.nodeName = d.get('nodeName', '') self.nodeType = d.get('nodeType', '') @@ -279,7 +292,7 @@ class NodeChunk(BaseObject): self.node = node self.range = range self.logManager: LogManager = LogManager(self) - self._status: StatusData = StatusData(node.name, node.nodeType, node.packageName, node.packageVersion) + self._status: StatusData = StatusData(node.name, node.nodeType, node.packageName, node.packageVersion, node.getMrNodeType()) self.statistics: stats.Statistics = stats.Statistics() self.statusFileLastModTime = -1 self.subprocess = None @@ -430,7 +443,7 @@ class NodeChunk(BaseObject): # Start the process environment for nodes running in isolation. # This only happens once, when the node has the SUBMITTED status. # The sub-process will go through this method again, but the node status will have been set to RUNNING. - if not inCurrentEnv and isinstance(self.node.nodeDesc, desc.Node): + if not inCurrentEnv and self.node.getMrNodeType() == MrNodeType.NODE: self._processInIsolatedEnvironment() return @@ -447,6 +460,7 @@ class NodeChunk(BaseObject): self.node.saveOutputAttr() executionStatus = Status.SUCCESS except Exception: + self.updateStatusFromCache() # check if the status has been updated by another process if self._status.status != Status.STOPPED: executionStatus = Status.ERROR raise @@ -474,7 +488,7 @@ class NodeChunk(BaseObject): except: # status should be already updated by meshroom_compute self.updateStatusFromCache() - if self._status.status != Status.ERROR: + if self._status.status not in (Status.ERROR, Status.STOPPED, Status.KILLED): # If meshroom_compute has crashed or been killed, the status may have not been set to ERROR. # In this particular case, we enforce it from here. self.upgradeStatusTo(Status.ERROR) @@ -485,26 +499,24 @@ class NodeChunk(BaseObject): self.node.updateOutputAttr() def stopProcess(self): - # if self.isExtern() or self._status.status != Status.RUNNING: - # return + if self.isExtern(): + raise ValueError("Cannot stop process: node is computed externally (another instance of Meshroom)") + if self._status.status != Status.RUNNING: + raise ValueError(f"Cannot stop process: node is not running (status is: {self._status.status}).") - self.upgradeStatusTo(Status.STOPPED) self.node.nodeDesc.stopProcess(self) + # Update the status to get latest information before changing it + self.updateStatusFromCache() + self.upgradeStatusTo(Status.STOPPED) + def isExtern(self): """ The computation is managed externally by another instance of Meshroom. In the ambiguous case of an isolated environment, it is considered as local as we can stop it. """ - uid = self._status.submitterSessionUid if self._status.mrNodeType == desc.MrNodeType.NODE else meshroom.core.sessionUid + uid = self._status.submitterSessionUid if self.node.getMrNodeType() == MrNodeType.NODE else meshroom.core.sessionUid return uid != meshroom.core.sessionUid - # def isIndependantProcess(self): - # if self._status.execMode == ExecMode.EXTERN: - # # Compute is managed by another instance of Meshroom - # return True - # # Compute is using a meshroom_compute subprocess - # return self._status.mrNodeType == desc.MrNodeType.NODE - statusChanged = Signal() status = Property(Variant, lambda self: self._status, notify=statusChanged) statusName = Property(str, statusName.fget, notify=statusChanged) @@ -588,6 +600,11 @@ class BaseNode(BaseObject): except KeyError: raise e + def getMrNodeType(self): + if self.isCompatibilityNode: + return MrNodeType.NONE + return self.nodeDesc.getMrNodeType() + def getName(self): return self._name @@ -1128,12 +1145,13 @@ class BaseNode(BaseObject): def submit(self, forceCompute=False): for chunk in self._chunks: if forceCompute or chunk.status.status != Status.SUCCESS: - chunk._status.initSubmit() + chunk._status.initExternSubmit() chunk.upgradeStatusTo(Status.SUBMITTED, ExecMode.EXTERN) def beginSequence(self, forceCompute=False): for chunk in self._chunks: if forceCompute or (chunk.status.status not in (Status.RUNNING, Status.SUCCESS)): + chunk._status.initLocalSubmit() chunk.upgradeStatusTo(Status.SUBMITTED, ExecMode.LOCAL) def processIteration(self, iteration): @@ -1239,6 +1257,8 @@ class BaseNode(BaseObject): return Status.INPUT if not self._chunks: return Status.NONE + if len( self._chunks) == 1: + return self._chunks[0].status.status chunksStatus = [chunk.status.status for chunk in self._chunks] @@ -1257,11 +1277,14 @@ class BaseNode(BaseObject): @Slot(result=StatusData) def getFusedStatus(self): + if not self._chunks: + return StatusData() + if len(self._chunks) == 1: + return self._chunks[0].status fusedStatus = StatusData() - if self._chunks: - fusedStatus.fromDict(self._chunks[0].status.toDict()) - for chunk in self._chunks[1:]: - fusedStatus.merge(chunk.status) + fusedStatus.fromDict(self._chunks[0].status.toDict()) + for chunk in self._chunks[1:]: + fusedStatus.merge(chunk.status) fusedStatus.status = self.getGlobalStatus() return fusedStatus @@ -1417,7 +1440,8 @@ class BaseNode(BaseObject): if not self._chunks: return False for chunk in self._chunks: - uid = chunk.status.submitterSessionUid if chunk.status.mrNodeType == desc.MrNodeType.NODE else chunk.status.sessionUid + mrNodeType = chunk.node.getMrNodeType() + uid = chunk.status.submitterSessionUid if mrNodeType == MrNodeType.NODE else chunk.status.sessionUid if uid != meshroom.core.sessionUid: return False return True @@ -1426,7 +1450,6 @@ class BaseNode(BaseObject): def canBeStopped(self): # Only locked nodes running in local with the same # sessionUid as the Meshroom instance can be stopped - # logging.warning(f"[{self.name}] canBeStopped: globalExecMode={self.globalExecMode} globalStatus={self.getGlobalStatus()} statusInThisSession={self.statusInThisSession()}, submitterStatusInThisSession={self.submitterStatusInThisSession()}") return (self.getGlobalStatus() == Status.RUNNING and self.globalExecMode == ExecMode.LOCAL.name and self.initFromThisSession()) @@ -1435,7 +1458,6 @@ class BaseNode(BaseObject): def canBeCanceled(self): # Only locked nodes submitted in local with the same # sessionUid as the Meshroom instance can be canceled - # logging.warning(f"[{self.name}] canBeCanceled: globalExecMode={self.globalExecMode} globalStatus={self.getGlobalStatus()} statusInThisSession={self.statusInThisSession()}, submitterStatusInThisSession={self.submitterStatusInThisSession()}") return (self.getGlobalStatus() == Status.SUBMITTED and self.globalExecMode == ExecMode.LOCAL.name and self.initFromThisSession()) diff --git a/meshroom/ui/graph.py b/meshroom/ui/graph.py index 85bb3c2a..a41430b2 100644 --- a/meshroom/ui/graph.py +++ b/meshroom/ui/graph.py @@ -187,7 +187,7 @@ class ChunksMonitor(QObject): # when run locally, status changes are already notified. # Chunks with an ERROR status may be re-submitted externally and should thus still be monitored if (c.isExtern() and c._status.status in (Status.SUBMITTED, Status.RUNNING, Status.ERROR)) or ( - (c._status.mrNodeType == MrNodeType.NODE) and (c._status.status in (Status.SUBMITTED, Status.RUNNING))): + (c.node.getMrNodeType() == MrNodeType.NODE) and (c._status.status in (Status.SUBMITTED, Status.RUNNING))): files.append(c.statusFile) chunks.append(c) return files, chunks @@ -584,7 +584,7 @@ class UIGraph(QObject): def updateGraphComputingStatus(self): # update graph computing status computingLocally = any([ - ((ch.status.submitterSessionUid if ch.status.mrNodeType == MrNodeType.NODE else ch.status.sessionUid) == sessionUid) and ( + ((ch.status.submitterSessionUid if ch.node.getMrNodeType() == MrNodeType.NODE else ch.status.sessionUid) == sessionUid) and ( ch.status.status in (Status.RUNNING, Status.SUBMITTED)) for ch in self._sortedDFSChunks]) submitted = any([ch.status.status == Status.SUBMITTED for ch in self._sortedDFSChunks])