diff --git a/meshroom/core/desc.py b/meshroom/core/desc.py index 510df4c1..e5df6d54 100755 --- a/meshroom/core/desc.py +++ b/meshroom/core/desc.py @@ -2,6 +2,7 @@ from meshroom.common import BaseObject, Property, Variant from enum import Enum # available by default in python3. For python2: "pip install enum34" import collections import os +import psutil class Attribute(BaseObject): @@ -195,7 +196,52 @@ class Node(object): def __init__(self): pass + def stop(self, node): + pass + + def process(self, node): + raise NotImplementedError('No process implementation on this node') + class CommandLineNode(Node): """ """ + + def buildCommandLine(self, node): + cmdPrefix = '' + if 'REZ_ENV' in os.environ: + cmdPrefix = '{rez} {packageFullName} -- '.format(rez=os.environ.get('REZ_ENV'), packageFullName=node.packageFullName) + return cmdPrefix + node.nodeDesc.commandLine.format(**node._cmdVars) + + def stop(self, node): + if node.subprocess: + node.subprocess.terminate() + + def process(self, node): + try: + with open(node.logFile(), 'w') as logF: + cmd = self.buildCommandLine(node) + print(' - commandLine:', cmd) + print(' - logFile:', node.logFile()) + node.subprocess = psutil.Popen(cmd, stdout=logF, stderr=logF, shell=True) + + # store process static info into the status file + node.status.commandLine = cmd + # node.status.env = node.proc.environ() + # node.status.createTime = node.proc.create_time() + + node.statThread.proc = node.subprocess + stdout, stderr = node.subprocess.communicate() + node.subprocess.wait() + + node.status.returnCode = node.subprocess.returncode + + if node.subprocess.returncode != 0: + with open(node.logFile(), 'r') as logF: + logContent = ''.join(logF.readlines()) + raise RuntimeError('Error on node "{}":\nLog:\n{}'.format(node.name, logContent)) + except: + raise + finally: + node.subprocess = None + diff --git a/meshroom/core/graph.py b/meshroom/core/graph.py index 92e8fc30..6edd378d 100644 --- a/meshroom/core/graph.py +++ b/meshroom/core/graph.py @@ -51,6 +51,9 @@ else: stringIsLinkRe = re.compile('^\{.+\}$') +def isCollection(v): + return isinstance(v, collections.Iterable) and not isinstance(v, basestring) + @contextmanager def GraphModification(graph): """ @@ -510,6 +513,7 @@ class Node(BaseObject): def updateInternals(self): self._cmdVars = { 'cache': self.graph.cacheDir, + 'nodeType': self.nodeType, } for uidIndex, associatedAttributes in self.attributesPerUid.items(): assAttr = [(a.getName(), a.uid(uidIndex)) for a in associatedAttributes] @@ -541,10 +545,8 @@ class Node(BaseObject): if attr.isInput: continue # skip inputs attr.value = attr.attributeDesc.value.format( - nodeType=self.nodeType, **self._cmdVars) attr._invalidationValue = attr.attributeDesc.value.format( - nodeType=self.nodeType, **cmdVarsNoCache) v = attr.value @@ -559,13 +561,7 @@ class Node(BaseObject): @property def internalFolder(self): - return self.nodeDesc.internalFolder.format(nodeType=self.nodeType, **self._cmdVars) - - def commandLine(self): - cmdPrefix = '' - if 'REZ_ENV' in os.environ: - cmdPrefix = '{rez} {packageFullName} -- '.format(rez=os.environ.get('REZ_ENV'), packageFullName=self.packageFullName) - return cmdPrefix + self.nodeDesc.commandLine.format(nodeType=self.nodeType, **self._cmdVars) + return self.nodeDesc.internalFolder.format(**self._cmdVars) def statusFile(self): return os.path.join(self.graph.cacheDir, self.internalFolder, 'status') @@ -648,48 +644,26 @@ class Node(BaseObject): self.upgradeStatusTo(Status.SUBMITTED_LOCAL) def stopProcess(self): - if self._subprocess: - self._subprocess.terminate() + self.nodeDesc.stop(self) def process(self): global runningProcesses runningProcesses[self.name] = self self.upgradeStatusTo(Status.RUNNING) - statThread = stats.StatisticsThread(self) - statThread.start() + self.statThread = stats.StatisticsThread(self) + self.statThread.start() startTime = time.time() try: - with open(self.logFile(), 'w') as logF: - cmd = self.commandLine() - print(' - commandLine:', cmd) - print(' - logFile:', self.logFile()) - self._subprocess = psutil.Popen(cmd, stdout=logF, stderr=logF, shell=True) - - # store process static info into the status file - self.status.commandLine = cmd - # self.status.env = self.proc.environ() - # self.status.createTime = self.proc.create_time() - - statThread.proc = self._subprocess - stdout, stderr = self._subprocess.communicate() - self._subprocess.wait() - - self.status.returnCode = self._subprocess.returncode - - if self._subprocess.returncode != 0: - with open(self.logFile(), 'r') as logF: - logContent = ''.join(logF.readlines()) - raise RuntimeError('Error on node "{}":\nLog:\n{}'.format(self.name, logContent)) + self.nodeDesc.process(self) except BaseException: self.upgradeStatusTo(Status.ERROR) raise finally: elapsedTime = time.time() - startTime print(' - elapsed time:', elapsedTime) - self._subprocess = None # ask and wait for the stats thread to stop - statThread.stopRequest() - statThread.join() + self.statThread.stopRequest() + self.statThread.join() del runningProcesses[self.name] self.upgradeStatusTo(Status.SUCCESS) diff --git a/meshroom/core/stats.py b/meshroom/core/stats.py index 6d988cd2..d9ef5e38 100644 --- a/meshroom/core/stats.py +++ b/meshroom/core/stats.py @@ -190,7 +190,7 @@ class StatisticsThread(threading.Thread): def __init__(self, node): threading.Thread.__init__(self) self.node = node - self.proc = None + self.proc = psutil.Process() # by default current process pid self.statistics = self.node.statistics self._stopFlag = threading.Event()