mirror of
https://github.com/alicevision/Meshroom.git
synced 2025-04-29 10:17:27 +02:00
Merge branches 'dev/split/descAttribute', 'dev/split/descNode', 'dev/split/descComputation' and 'dev/split/descModule' into dev/split/coreDescModule
This commit is contained in:
commit
c36cf99f77
4 changed files with 475 additions and 411 deletions
56
meshroom/core/desc/__init__.py
Normal file
56
meshroom/core/desc/__init__.py
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
from .attribute import (
|
||||||
|
Attribute,
|
||||||
|
BoolParam,
|
||||||
|
ChoiceParam,
|
||||||
|
ColorParam,
|
||||||
|
File,
|
||||||
|
FloatParam,
|
||||||
|
GroupAttribute,
|
||||||
|
IntParam,
|
||||||
|
ListAttribute,
|
||||||
|
PushButtonParam,
|
||||||
|
StringParam,
|
||||||
|
)
|
||||||
|
from .computation import (
|
||||||
|
DynamicNodeSize,
|
||||||
|
Level,
|
||||||
|
MultiDynamicNodeSize,
|
||||||
|
Parallelization,
|
||||||
|
Range,
|
||||||
|
StaticNodeSize,
|
||||||
|
)
|
||||||
|
from .node import (
|
||||||
|
AVCommandLineNode,
|
||||||
|
CommandLineNode,
|
||||||
|
InitNode,
|
||||||
|
InputNode,
|
||||||
|
Node,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
# attribute
|
||||||
|
"Attribute",
|
||||||
|
"BoolParam",
|
||||||
|
"ChoiceParam",
|
||||||
|
"ColorParam",
|
||||||
|
"File",
|
||||||
|
"FloatParam",
|
||||||
|
"GroupAttribute",
|
||||||
|
"IntParam",
|
||||||
|
"ListAttribute",
|
||||||
|
"PushButtonParam",
|
||||||
|
"StringParam",
|
||||||
|
# computation
|
||||||
|
"DynamicNodeSize",
|
||||||
|
"Level",
|
||||||
|
"MultiDynamicNodeSize",
|
||||||
|
"Parallelization",
|
||||||
|
"Range",
|
||||||
|
"StaticNodeSize",
|
||||||
|
# node
|
||||||
|
"AVCommandLineNode",
|
||||||
|
"CommandLineNode",
|
||||||
|
"InitNode",
|
||||||
|
"InputNode",
|
||||||
|
"Node",
|
||||||
|
]
|
|
@ -1,15 +1,10 @@
|
||||||
from meshroom.common import BaseObject, Property, Variant, VariantList, JSValue
|
|
||||||
from meshroom.core import cgroup
|
|
||||||
|
|
||||||
from collections.abc import Iterable
|
|
||||||
from enum import Enum
|
|
||||||
import math
|
|
||||||
import os
|
|
||||||
import psutil
|
|
||||||
import types
|
|
||||||
import ast
|
import ast
|
||||||
import distutils.util
|
import distutils.util
|
||||||
import shlex
|
import os
|
||||||
|
import types
|
||||||
|
from collections.abc import Iterable
|
||||||
|
|
||||||
|
from meshroom.common import BaseObject, JSValue, Property, Variant, VariantList
|
||||||
|
|
||||||
|
|
||||||
class Attribute(BaseObject):
|
class Attribute(BaseObject):
|
||||||
|
@ -531,404 +526,3 @@ class ColorParam(Param):
|
||||||
'color code (param: {}, value: {}, type: {})'.format(self.name, value, type(value)))
|
'color code (param: {}, value: {}, type: {})'.format(self.name, value, type(value)))
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
class Level(Enum):
|
|
||||||
NONE = 0
|
|
||||||
NORMAL = 1
|
|
||||||
INTENSIVE = 2
|
|
||||||
|
|
||||||
|
|
||||||
class Range:
|
|
||||||
def __init__(self, iteration=0, blockSize=0, fullSize=0):
|
|
||||||
self.iteration = iteration
|
|
||||||
self.blockSize = blockSize
|
|
||||||
self.fullSize = fullSize
|
|
||||||
|
|
||||||
@property
|
|
||||||
def start(self):
|
|
||||||
return self.iteration * self.blockSize
|
|
||||||
|
|
||||||
@property
|
|
||||||
def effectiveBlockSize(self):
|
|
||||||
remaining = (self.fullSize - self.start) + 1
|
|
||||||
return self.blockSize if remaining >= self.blockSize else remaining
|
|
||||||
|
|
||||||
@property
|
|
||||||
def end(self):
|
|
||||||
return self.start + self.effectiveBlockSize
|
|
||||||
|
|
||||||
@property
|
|
||||||
def last(self):
|
|
||||||
return self.end - 1
|
|
||||||
|
|
||||||
def toDict(self):
|
|
||||||
return {
|
|
||||||
"rangeIteration": self.iteration,
|
|
||||||
"rangeStart": self.start,
|
|
||||||
"rangeEnd": self.end,
|
|
||||||
"rangeLast": self.last,
|
|
||||||
"rangeBlockSize": self.blockSize,
|
|
||||||
"rangeEffectiveBlockSize": self.effectiveBlockSize,
|
|
||||||
"rangeFullSize": self.fullSize,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class Parallelization:
|
|
||||||
def __init__(self, staticNbBlocks=0, blockSize=0):
|
|
||||||
self.staticNbBlocks = staticNbBlocks
|
|
||||||
self.blockSize = blockSize
|
|
||||||
|
|
||||||
def getSizes(self, node):
|
|
||||||
"""
|
|
||||||
Args:
|
|
||||||
node:
|
|
||||||
Returns: (blockSize, fullSize, nbBlocks)
|
|
||||||
"""
|
|
||||||
size = node.size
|
|
||||||
if self.blockSize:
|
|
||||||
nbBlocks = int(math.ceil(float(size) / float(self.blockSize)))
|
|
||||||
return self.blockSize, size, nbBlocks
|
|
||||||
if self.staticNbBlocks:
|
|
||||||
return 1, self.staticNbBlocks, self.staticNbBlocks
|
|
||||||
return None
|
|
||||||
|
|
||||||
def getRange(self, node, iteration):
|
|
||||||
blockSize, fullSize, nbBlocks = self.getSizes(node)
|
|
||||||
return Range(iteration=iteration, blockSize=blockSize, fullSize=fullSize)
|
|
||||||
|
|
||||||
def getRanges(self, node):
|
|
||||||
blockSize, fullSize, nbBlocks = self.getSizes(node)
|
|
||||||
ranges = []
|
|
||||||
for i in range(nbBlocks):
|
|
||||||
ranges.append(Range(iteration=i, blockSize=blockSize, fullSize=fullSize))
|
|
||||||
return ranges
|
|
||||||
|
|
||||||
|
|
||||||
class DynamicNodeSize(object):
|
|
||||||
"""
|
|
||||||
DynamicNodeSize expresses a dependency to an input attribute to define
|
|
||||||
the size of a Node in terms of individual tasks for parallelization.
|
|
||||||
If the attribute is a link to another node, Node's size will be the same as this connected node.
|
|
||||||
If the attribute is a ListAttribute, Node's size will be the size of this list.
|
|
||||||
"""
|
|
||||||
def __init__(self, param):
|
|
||||||
self._param = param
|
|
||||||
|
|
||||||
def computeSize(self, node):
|
|
||||||
param = node.attribute(self._param)
|
|
||||||
# Link: use linked node's size
|
|
||||||
if param.isLink:
|
|
||||||
return param.getLinkParam().node.size
|
|
||||||
# ListAttribute: use list size
|
|
||||||
if isinstance(param.desc, ListAttribute):
|
|
||||||
return len(param)
|
|
||||||
if isinstance(param.desc, IntParam):
|
|
||||||
return param.value
|
|
||||||
return 1
|
|
||||||
|
|
||||||
|
|
||||||
class MultiDynamicNodeSize(object):
|
|
||||||
"""
|
|
||||||
MultiDynamicNodeSize expresses dependencies to multiple input attributes to
|
|
||||||
define the size of a node in terms of individual tasks for parallelization.
|
|
||||||
Works as DynamicNodeSize and sum the sizes of each dependency.
|
|
||||||
"""
|
|
||||||
def __init__(self, params):
|
|
||||||
"""
|
|
||||||
Args:
|
|
||||||
params (list): list of input attributes names
|
|
||||||
"""
|
|
||||||
assert isinstance(params, (list, tuple))
|
|
||||||
self._params = params
|
|
||||||
|
|
||||||
def computeSize(self, node):
|
|
||||||
size = 0
|
|
||||||
for param in self._params:
|
|
||||||
param = node.attribute(param)
|
|
||||||
if param.isLink:
|
|
||||||
size += param.getLinkParam().node.size
|
|
||||||
elif isinstance(param.desc, ListAttribute):
|
|
||||||
size += len(param)
|
|
||||||
else:
|
|
||||||
size += 1
|
|
||||||
return size
|
|
||||||
|
|
||||||
|
|
||||||
class StaticNodeSize(object):
|
|
||||||
"""
|
|
||||||
StaticNodeSize expresses a static Node size in terms of individual tasks for parallelization.
|
|
||||||
"""
|
|
||||||
def __init__(self, size):
|
|
||||||
self._size = size
|
|
||||||
|
|
||||||
def computeSize(self, node):
|
|
||||||
return self._size
|
|
||||||
|
|
||||||
|
|
||||||
class Node(object):
|
|
||||||
"""
|
|
||||||
"""
|
|
||||||
internalFolder = '{cache}/{nodeType}/{uid}/'
|
|
||||||
cpu = Level.NORMAL
|
|
||||||
gpu = Level.NONE
|
|
||||||
ram = Level.NORMAL
|
|
||||||
packageName = ''
|
|
||||||
packageVersion = ''
|
|
||||||
internalInputs = [
|
|
||||||
StringParam(
|
|
||||||
name="invalidation",
|
|
||||||
label="Invalidation Message",
|
|
||||||
description="A message that will invalidate the node's output folder.\n"
|
|
||||||
"This is useful for development, we can invalidate the output of the node when we modify the code.\n"
|
|
||||||
"It is displayed in bold font in the invalidation/comment messages tooltip.",
|
|
||||||
value="",
|
|
||||||
semantic="multiline",
|
|
||||||
advanced=True,
|
|
||||||
uidIgnoreValue="", # If the invalidation string is empty, it does not participate to the node's UID
|
|
||||||
),
|
|
||||||
StringParam(
|
|
||||||
name="comment",
|
|
||||||
label="Comments",
|
|
||||||
description="User comments describing this specific node instance.\n"
|
|
||||||
"It is displayed in regular font in the invalidation/comment messages tooltip.",
|
|
||||||
value="",
|
|
||||||
semantic="multiline",
|
|
||||||
invalidate=False,
|
|
||||||
),
|
|
||||||
StringParam(
|
|
||||||
name="label",
|
|
||||||
label="Node's Label",
|
|
||||||
description="Customize the default label (to replace the technical name of the node instance).",
|
|
||||||
value="",
|
|
||||||
invalidate=False,
|
|
||||||
),
|
|
||||||
ColorParam(
|
|
||||||
name="color",
|
|
||||||
label="Color",
|
|
||||||
description="Custom color for the node (SVG name or hexadecimal code).",
|
|
||||||
value="",
|
|
||||||
invalidate=False,
|
|
||||||
)
|
|
||||||
]
|
|
||||||
inputs = []
|
|
||||||
outputs = []
|
|
||||||
size = StaticNodeSize(1)
|
|
||||||
parallelization = None
|
|
||||||
documentation = ''
|
|
||||||
category = 'Other'
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super(Node, self).__init__()
|
|
||||||
self.hasDynamicOutputAttribute = any(output.isDynamicValue for output in self.outputs)
|
|
||||||
|
|
||||||
def upgradeAttributeValues(self, attrValues, fromVersion):
|
|
||||||
return attrValues
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def update(cls, node):
|
|
||||||
""" Method call before node's internal update on invalidation.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
node: the BaseNode instance being updated
|
|
||||||
See Also:
|
|
||||||
BaseNode.updateInternals
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def postUpdate(cls, node):
|
|
||||||
""" Method call after node's internal update on invalidation.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
node: the BaseNode instance being updated
|
|
||||||
See Also:
|
|
||||||
NodeBase.updateInternals
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
def preprocess(self, node):
|
|
||||||
""" Gets invoked just before the processChunk method for the node.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
node: The BaseNode instance about to be processed.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
def postprocess(self, node):
|
|
||||||
""" Gets invoked after the processChunk method for the node.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
node: The BaseNode instance which is processed.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
def stopProcess(self, chunk):
|
|
||||||
raise NotImplementedError('No stopProcess implementation on node: {}'.format(chunk.node.name))
|
|
||||||
|
|
||||||
def processChunk(self, chunk):
|
|
||||||
raise NotImplementedError('No processChunk implementation on node: "{}"'.format(chunk.node.name))
|
|
||||||
|
|
||||||
|
|
||||||
class InputNode(Node):
|
|
||||||
"""
|
|
||||||
Node that does not need to be processed, it is just a placeholder for inputs.
|
|
||||||
"""
|
|
||||||
def __init__(self):
|
|
||||||
super(InputNode, self).__init__()
|
|
||||||
|
|
||||||
def processChunk(self, chunk):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class CommandLineNode(Node):
|
|
||||||
"""
|
|
||||||
"""
|
|
||||||
commandLine = '' # need to be defined on the node
|
|
||||||
parallelization = None
|
|
||||||
commandLineRange = ''
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super(CommandLineNode, self).__init__()
|
|
||||||
|
|
||||||
def buildCommandLine(self, chunk):
|
|
||||||
|
|
||||||
cmdPrefix = ''
|
|
||||||
# If rez available in env, we use it
|
|
||||||
if "REZ_ENV" in os.environ and chunk.node.packageVersion:
|
|
||||||
# If the node package is already in the environment, we don't need a new dedicated rez environment
|
|
||||||
alreadyInEnv = os.environ.get("REZ_{}_VERSION".format(chunk.node.packageName.upper()),
|
|
||||||
"").startswith(chunk.node.packageVersion)
|
|
||||||
if not alreadyInEnv:
|
|
||||||
cmdPrefix = '{rez} {packageFullName} -- '.format(rez=os.environ.get("REZ_ENV"),
|
|
||||||
packageFullName=chunk.node.packageFullName)
|
|
||||||
|
|
||||||
cmdSuffix = ''
|
|
||||||
if chunk.node.isParallelized and chunk.node.size > 1:
|
|
||||||
cmdSuffix = ' ' + self.commandLineRange.format(**chunk.range.toDict())
|
|
||||||
|
|
||||||
return cmdPrefix + chunk.node.nodeDesc.commandLine.format(**chunk.node._cmdVars) + cmdSuffix
|
|
||||||
|
|
||||||
def stopProcess(self, chunk):
|
|
||||||
# The same node could exists several times in the graph and
|
|
||||||
# only one would have the running subprocess; ignore all others
|
|
||||||
if not hasattr(chunk, "subprocess"):
|
|
||||||
return
|
|
||||||
if chunk.subprocess:
|
|
||||||
# Kill process tree
|
|
||||||
processes = chunk.subprocess.children(recursive=True) + [chunk.subprocess]
|
|
||||||
try:
|
|
||||||
for process in processes:
|
|
||||||
process.terminate()
|
|
||||||
except psutil.NoSuchProcess:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def processChunk(self, chunk):
|
|
||||||
try:
|
|
||||||
with open(chunk.logFile, 'w') as logF:
|
|
||||||
cmd = self.buildCommandLine(chunk)
|
|
||||||
chunk.status.commandLine = cmd
|
|
||||||
chunk.saveStatusFile()
|
|
||||||
print(' - commandLine: {}'.format(cmd))
|
|
||||||
print(' - logFile: {}'.format(chunk.logFile))
|
|
||||||
chunk.subprocess = psutil.Popen(shlex.split(cmd), stdout=logF, stderr=logF, cwd=chunk.node.internalFolder)
|
|
||||||
|
|
||||||
# Store process static info into the status file
|
|
||||||
# chunk.status.env = node.proc.environ()
|
|
||||||
# chunk.status.createTime = node.proc.create_time()
|
|
||||||
|
|
||||||
chunk.statThread.proc = chunk.subprocess
|
|
||||||
stdout, stderr = chunk.subprocess.communicate()
|
|
||||||
chunk.subprocess.wait()
|
|
||||||
|
|
||||||
chunk.status.returnCode = chunk.subprocess.returncode
|
|
||||||
|
|
||||||
if chunk.subprocess.returncode != 0:
|
|
||||||
with open(chunk.logFile, 'r') as logF:
|
|
||||||
logContent = ''.join(logF.readlines())
|
|
||||||
raise RuntimeError('Error on node "{}":\nLog:\n{}'.format(chunk.name, logContent))
|
|
||||||
except Exception:
|
|
||||||
raise
|
|
||||||
finally:
|
|
||||||
chunk.subprocess = None
|
|
||||||
|
|
||||||
|
|
||||||
# Specific command line node for AliceVision apps
|
|
||||||
class AVCommandLineNode(CommandLineNode):
|
|
||||||
|
|
||||||
cgroupParsed = False
|
|
||||||
cmdMem = ''
|
|
||||||
cmdCore = ''
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super(AVCommandLineNode, self).__init__()
|
|
||||||
|
|
||||||
if AVCommandLineNode.cgroupParsed is False:
|
|
||||||
|
|
||||||
AVCommandLineNode.cmdMem = ''
|
|
||||||
memSize = cgroup.getCgroupMemorySize()
|
|
||||||
if memSize > 0:
|
|
||||||
AVCommandLineNode.cmdMem = ' --maxMemory={memSize}'.format(memSize=memSize)
|
|
||||||
|
|
||||||
AVCommandLineNode.cmdCore = ''
|
|
||||||
coresCount = cgroup.getCgroupCpuCount()
|
|
||||||
if coresCount > 0:
|
|
||||||
AVCommandLineNode.cmdCore = ' --maxCores={coresCount}'.format(coresCount=coresCount)
|
|
||||||
|
|
||||||
AVCommandLineNode.cgroupParsed = True
|
|
||||||
|
|
||||||
def buildCommandLine(self, chunk):
|
|
||||||
commandLineString = super(AVCommandLineNode, self).buildCommandLine(chunk)
|
|
||||||
|
|
||||||
return commandLineString + AVCommandLineNode.cmdMem + AVCommandLineNode.cmdCore
|
|
||||||
|
|
||||||
|
|
||||||
class InitNode(object):
|
|
||||||
def __init__(self):
|
|
||||||
super(InitNode, self).__init__()
|
|
||||||
|
|
||||||
def initialize(self, node, inputs, recursiveInputs):
|
|
||||||
"""
|
|
||||||
Initialize the attributes that are needed for a node to start running.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
node (Node): the node whose attributes must be initialized
|
|
||||||
inputs (list): the user-provided list of input files/directories
|
|
||||||
recursiveInputs (list): the user-provided list of input directories to search recursively for images
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
def resetAttributes(self, node, attributeNames):
|
|
||||||
"""
|
|
||||||
Reset the values of the provided attributes for a node.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
node (Node): the node whose attributes are to be reset
|
|
||||||
attributeNames (list): the list containing the names of the attributes to reset
|
|
||||||
"""
|
|
||||||
for attrName in attributeNames:
|
|
||||||
if node.hasAttribute(attrName):
|
|
||||||
node.attribute(attrName).resetToDefaultValue()
|
|
||||||
|
|
||||||
def extendAttributes(self, node, attributesDict):
|
|
||||||
"""
|
|
||||||
Extend the values of the provided attributes for a node.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
node (Node): the node whose attributes are to be extended
|
|
||||||
attributesDict (dict): the dictionary containing the attributes' names (as keys) and the values to extend with
|
|
||||||
"""
|
|
||||||
for attr in attributesDict.keys():
|
|
||||||
if node.hasAttribute(attr):
|
|
||||||
node.attribute(attr).extend(attributesDict[attr])
|
|
||||||
|
|
||||||
def setAttributes(self, node, attributesDict):
|
|
||||||
"""
|
|
||||||
Set the values of the provided attributes for a node.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
node (Node): the node whose attributes are to be extended
|
|
||||||
attributesDict (dict): the dictionary containing the attributes' names (as keys) and the values to set
|
|
||||||
"""
|
|
||||||
for attr in attributesDict:
|
|
||||||
if node.hasAttribute(attr):
|
|
||||||
node.attribute(attr).value = attributesDict[attr]
|
|
137
meshroom/core/desc/computation.py
Normal file
137
meshroom/core/desc/computation.py
Normal file
|
@ -0,0 +1,137 @@
|
||||||
|
import math
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
from .attribute import ListAttribute, IntParam
|
||||||
|
|
||||||
|
|
||||||
|
class Level(Enum):
|
||||||
|
NONE = 0
|
||||||
|
NORMAL = 1
|
||||||
|
INTENSIVE = 2
|
||||||
|
|
||||||
|
|
||||||
|
class Range:
|
||||||
|
def __init__(self, iteration=0, blockSize=0, fullSize=0):
|
||||||
|
self.iteration = iteration
|
||||||
|
self.blockSize = blockSize
|
||||||
|
self.fullSize = fullSize
|
||||||
|
|
||||||
|
@property
|
||||||
|
def start(self):
|
||||||
|
return self.iteration * self.blockSize
|
||||||
|
|
||||||
|
@property
|
||||||
|
def effectiveBlockSize(self):
|
||||||
|
remaining = (self.fullSize - self.start) + 1
|
||||||
|
return self.blockSize if remaining >= self.blockSize else remaining
|
||||||
|
|
||||||
|
@property
|
||||||
|
def end(self):
|
||||||
|
return self.start + self.effectiveBlockSize
|
||||||
|
|
||||||
|
@property
|
||||||
|
def last(self):
|
||||||
|
return self.end - 1
|
||||||
|
|
||||||
|
def toDict(self):
|
||||||
|
return {
|
||||||
|
"rangeIteration": self.iteration,
|
||||||
|
"rangeStart": self.start,
|
||||||
|
"rangeEnd": self.end,
|
||||||
|
"rangeLast": self.last,
|
||||||
|
"rangeBlockSize": self.blockSize,
|
||||||
|
"rangeEffectiveBlockSize": self.effectiveBlockSize,
|
||||||
|
"rangeFullSize": self.fullSize,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class Parallelization:
|
||||||
|
def __init__(self, staticNbBlocks=0, blockSize=0):
|
||||||
|
self.staticNbBlocks = staticNbBlocks
|
||||||
|
self.blockSize = blockSize
|
||||||
|
|
||||||
|
def getSizes(self, node):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
node:
|
||||||
|
Returns: (blockSize, fullSize, nbBlocks)
|
||||||
|
"""
|
||||||
|
size = node.size
|
||||||
|
if self.blockSize:
|
||||||
|
nbBlocks = int(math.ceil(float(size) / float(self.blockSize)))
|
||||||
|
return self.blockSize, size, nbBlocks
|
||||||
|
if self.staticNbBlocks:
|
||||||
|
return 1, self.staticNbBlocks, self.staticNbBlocks
|
||||||
|
return None
|
||||||
|
|
||||||
|
def getRange(self, node, iteration):
|
||||||
|
blockSize, fullSize, nbBlocks = self.getSizes(node)
|
||||||
|
return Range(iteration=iteration, blockSize=blockSize, fullSize=fullSize)
|
||||||
|
|
||||||
|
def getRanges(self, node):
|
||||||
|
blockSize, fullSize, nbBlocks = self.getSizes(node)
|
||||||
|
ranges = []
|
||||||
|
for i in range(nbBlocks):
|
||||||
|
ranges.append(Range(iteration=i, blockSize=blockSize, fullSize=fullSize))
|
||||||
|
return ranges
|
||||||
|
|
||||||
|
|
||||||
|
class DynamicNodeSize(object):
|
||||||
|
"""
|
||||||
|
DynamicNodeSize expresses a dependency to an input attribute to define
|
||||||
|
the size of a Node in terms of individual tasks for parallelization.
|
||||||
|
If the attribute is a link to another node, Node's size will be the same as this connected node.
|
||||||
|
If the attribute is a ListAttribute, Node's size will be the size of this list.
|
||||||
|
"""
|
||||||
|
def __init__(self, param):
|
||||||
|
self._param = param
|
||||||
|
|
||||||
|
def computeSize(self, node):
|
||||||
|
param = node.attribute(self._param)
|
||||||
|
# Link: use linked node's size
|
||||||
|
if param.isLink:
|
||||||
|
return param.getLinkParam().node.size
|
||||||
|
# ListAttribute: use list size
|
||||||
|
if isinstance(param.desc, ListAttribute):
|
||||||
|
return len(param)
|
||||||
|
if isinstance(param.desc, IntParam):
|
||||||
|
return param.value
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
class MultiDynamicNodeSize(object):
|
||||||
|
"""
|
||||||
|
MultiDynamicNodeSize expresses dependencies to multiple input attributes to
|
||||||
|
define the size of a node in terms of individual tasks for parallelization.
|
||||||
|
Works as DynamicNodeSize and sum the sizes of each dependency.
|
||||||
|
"""
|
||||||
|
def __init__(self, params):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
params (list): list of input attributes names
|
||||||
|
"""
|
||||||
|
assert isinstance(params, (list, tuple))
|
||||||
|
self._params = params
|
||||||
|
|
||||||
|
def computeSize(self, node):
|
||||||
|
size = 0
|
||||||
|
for param in self._params:
|
||||||
|
param = node.attribute(param)
|
||||||
|
if param.isLink:
|
||||||
|
size += param.getLinkParam().node.size
|
||||||
|
elif isinstance(param.desc, ListAttribute):
|
||||||
|
size += len(param)
|
||||||
|
else:
|
||||||
|
size += 1
|
||||||
|
return size
|
||||||
|
|
||||||
|
|
||||||
|
class StaticNodeSize(object):
|
||||||
|
"""
|
||||||
|
StaticNodeSize expresses a static Node size in terms of individual tasks for parallelization.
|
||||||
|
"""
|
||||||
|
def __init__(self, size):
|
||||||
|
self._size = size
|
||||||
|
|
||||||
|
def computeSize(self, node):
|
||||||
|
return self._size
|
277
meshroom/core/desc/node.py
Normal file
277
meshroom/core/desc/node.py
Normal file
|
@ -0,0 +1,277 @@
|
||||||
|
import os
|
||||||
|
import psutil
|
||||||
|
import shlex
|
||||||
|
|
||||||
|
from .computation import Level, StaticNodeSize
|
||||||
|
from .attribute import StringParam, ColorParam
|
||||||
|
|
||||||
|
from meshroom.core import cgroup
|
||||||
|
|
||||||
|
|
||||||
|
class Node(object):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
internalFolder = '{cache}/{nodeType}/{uid}/'
|
||||||
|
cpu = Level.NORMAL
|
||||||
|
gpu = Level.NONE
|
||||||
|
ram = Level.NORMAL
|
||||||
|
packageName = ''
|
||||||
|
packageVersion = ''
|
||||||
|
internalInputs = [
|
||||||
|
StringParam(
|
||||||
|
name="invalidation",
|
||||||
|
label="Invalidation Message",
|
||||||
|
description="A message that will invalidate the node's output folder.\n"
|
||||||
|
"This is useful for development, we can invalidate the output of the node when we modify the code.\n"
|
||||||
|
"It is displayed in bold font in the invalidation/comment messages tooltip.",
|
||||||
|
value="",
|
||||||
|
semantic="multiline",
|
||||||
|
advanced=True,
|
||||||
|
uidIgnoreValue="", # If the invalidation string is empty, it does not participate to the node's UID
|
||||||
|
),
|
||||||
|
StringParam(
|
||||||
|
name="comment",
|
||||||
|
label="Comments",
|
||||||
|
description="User comments describing this specific node instance.\n"
|
||||||
|
"It is displayed in regular font in the invalidation/comment messages tooltip.",
|
||||||
|
value="",
|
||||||
|
semantic="multiline",
|
||||||
|
invalidate=False,
|
||||||
|
),
|
||||||
|
StringParam(
|
||||||
|
name="label",
|
||||||
|
label="Node's Label",
|
||||||
|
description="Customize the default label (to replace the technical name of the node instance).",
|
||||||
|
value="",
|
||||||
|
invalidate=False,
|
||||||
|
),
|
||||||
|
ColorParam(
|
||||||
|
name="color",
|
||||||
|
label="Color",
|
||||||
|
description="Custom color for the node (SVG name or hexadecimal code).",
|
||||||
|
value="",
|
||||||
|
invalidate=False,
|
||||||
|
)
|
||||||
|
]
|
||||||
|
inputs = []
|
||||||
|
outputs = []
|
||||||
|
size = StaticNodeSize(1)
|
||||||
|
parallelization = None
|
||||||
|
documentation = ''
|
||||||
|
category = 'Other'
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super(Node, self).__init__()
|
||||||
|
self.hasDynamicOutputAttribute = any(output.isDynamicValue for output in self.outputs)
|
||||||
|
|
||||||
|
def upgradeAttributeValues(self, attrValues, fromVersion):
|
||||||
|
return attrValues
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def update(cls, node):
|
||||||
|
""" Method call before node's internal update on invalidation.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
node: the BaseNode instance being updated
|
||||||
|
See Also:
|
||||||
|
BaseNode.updateInternals
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def postUpdate(cls, node):
|
||||||
|
""" Method call after node's internal update on invalidation.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
node: the BaseNode instance being updated
|
||||||
|
See Also:
|
||||||
|
NodeBase.updateInternals
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def preprocess(self, node):
|
||||||
|
""" Gets invoked just before the processChunk method for the node.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
node: The BaseNode instance about to be processed.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def postprocess(self, node):
|
||||||
|
""" Gets invoked after the processChunk method for the node.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
node: The BaseNode instance which is processed.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def stopProcess(self, chunk):
|
||||||
|
raise NotImplementedError('No stopProcess implementation on node: {}'.format(chunk.node.name))
|
||||||
|
|
||||||
|
def processChunk(self, chunk):
|
||||||
|
raise NotImplementedError('No processChunk implementation on node: "{}"'.format(chunk.node.name))
|
||||||
|
|
||||||
|
|
||||||
|
class InputNode(Node):
|
||||||
|
"""
|
||||||
|
Node that does not need to be processed, it is just a placeholder for inputs.
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
super(InputNode, self).__init__()
|
||||||
|
|
||||||
|
def processChunk(self, chunk):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CommandLineNode(Node):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
commandLine = '' # need to be defined on the node
|
||||||
|
parallelization = None
|
||||||
|
commandLineRange = ''
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super(CommandLineNode, self).__init__()
|
||||||
|
|
||||||
|
def buildCommandLine(self, chunk):
|
||||||
|
|
||||||
|
cmdPrefix = ''
|
||||||
|
# If rez available in env, we use it
|
||||||
|
if "REZ_ENV" in os.environ and chunk.node.packageVersion:
|
||||||
|
# If the node package is already in the environment, we don't need a new dedicated rez environment
|
||||||
|
alreadyInEnv = os.environ.get("REZ_{}_VERSION".format(chunk.node.packageName.upper()),
|
||||||
|
"").startswith(chunk.node.packageVersion)
|
||||||
|
if not alreadyInEnv:
|
||||||
|
cmdPrefix = '{rez} {packageFullName} -- '.format(rez=os.environ.get("REZ_ENV"),
|
||||||
|
packageFullName=chunk.node.packageFullName)
|
||||||
|
|
||||||
|
cmdSuffix = ''
|
||||||
|
if chunk.node.isParallelized and chunk.node.size > 1:
|
||||||
|
cmdSuffix = ' ' + self.commandLineRange.format(**chunk.range.toDict())
|
||||||
|
|
||||||
|
return cmdPrefix + chunk.node.nodeDesc.commandLine.format(**chunk.node._cmdVars) + cmdSuffix
|
||||||
|
|
||||||
|
def stopProcess(self, chunk):
|
||||||
|
# The same node could exists several times in the graph and
|
||||||
|
# only one would have the running subprocess; ignore all others
|
||||||
|
if not hasattr(chunk, "subprocess"):
|
||||||
|
return
|
||||||
|
if chunk.subprocess:
|
||||||
|
# Kill process tree
|
||||||
|
processes = chunk.subprocess.children(recursive=True) + [chunk.subprocess]
|
||||||
|
try:
|
||||||
|
for process in processes:
|
||||||
|
process.terminate()
|
||||||
|
except psutil.NoSuchProcess:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def processChunk(self, chunk):
|
||||||
|
try:
|
||||||
|
with open(chunk.logFile, 'w') as logF:
|
||||||
|
cmd = self.buildCommandLine(chunk)
|
||||||
|
chunk.status.commandLine = cmd
|
||||||
|
chunk.saveStatusFile()
|
||||||
|
print(' - commandLine: {}'.format(cmd))
|
||||||
|
print(' - logFile: {}'.format(chunk.logFile))
|
||||||
|
chunk.subprocess = psutil.Popen(shlex.split(cmd), stdout=logF, stderr=logF, cwd=chunk.node.internalFolder)
|
||||||
|
|
||||||
|
# Store process static info into the status file
|
||||||
|
# chunk.status.env = node.proc.environ()
|
||||||
|
# chunk.status.createTime = node.proc.create_time()
|
||||||
|
|
||||||
|
chunk.statThread.proc = chunk.subprocess
|
||||||
|
stdout, stderr = chunk.subprocess.communicate()
|
||||||
|
chunk.subprocess.wait()
|
||||||
|
|
||||||
|
chunk.status.returnCode = chunk.subprocess.returncode
|
||||||
|
|
||||||
|
if chunk.subprocess.returncode != 0:
|
||||||
|
with open(chunk.logFile, 'r') as logF:
|
||||||
|
logContent = ''.join(logF.readlines())
|
||||||
|
raise RuntimeError('Error on node "{}":\nLog:\n{}'.format(chunk.name, logContent))
|
||||||
|
except Exception:
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
chunk.subprocess = None
|
||||||
|
|
||||||
|
|
||||||
|
# Specific command line node for AliceVision apps
|
||||||
|
class AVCommandLineNode(CommandLineNode):
|
||||||
|
|
||||||
|
cgroupParsed = False
|
||||||
|
cmdMem = ''
|
||||||
|
cmdCore = ''
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super(AVCommandLineNode, self).__init__()
|
||||||
|
|
||||||
|
if AVCommandLineNode.cgroupParsed is False:
|
||||||
|
|
||||||
|
AVCommandLineNode.cmdMem = ''
|
||||||
|
memSize = cgroup.getCgroupMemorySize()
|
||||||
|
if memSize > 0:
|
||||||
|
AVCommandLineNode.cmdMem = ' --maxMemory={memSize}'.format(memSize=memSize)
|
||||||
|
|
||||||
|
AVCommandLineNode.cmdCore = ''
|
||||||
|
coresCount = cgroup.getCgroupCpuCount()
|
||||||
|
if coresCount > 0:
|
||||||
|
AVCommandLineNode.cmdCore = ' --maxCores={coresCount}'.format(coresCount=coresCount)
|
||||||
|
|
||||||
|
AVCommandLineNode.cgroupParsed = True
|
||||||
|
|
||||||
|
def buildCommandLine(self, chunk):
|
||||||
|
commandLineString = super(AVCommandLineNode, self).buildCommandLine(chunk)
|
||||||
|
|
||||||
|
return commandLineString + AVCommandLineNode.cmdMem + AVCommandLineNode.cmdCore
|
||||||
|
|
||||||
|
|
||||||
|
class InitNode(object):
|
||||||
|
def __init__(self):
|
||||||
|
super(InitNode, self).__init__()
|
||||||
|
|
||||||
|
def initialize(self, node, inputs, recursiveInputs):
|
||||||
|
"""
|
||||||
|
Initialize the attributes that are needed for a node to start running.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
node (Node): the node whose attributes must be initialized
|
||||||
|
inputs (list): the user-provided list of input files/directories
|
||||||
|
recursiveInputs (list): the user-provided list of input directories to search recursively for images
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def resetAttributes(self, node, attributeNames):
|
||||||
|
"""
|
||||||
|
Reset the values of the provided attributes for a node.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
node (Node): the node whose attributes are to be reset
|
||||||
|
attributeNames (list): the list containing the names of the attributes to reset
|
||||||
|
"""
|
||||||
|
for attrName in attributeNames:
|
||||||
|
if node.hasAttribute(attrName):
|
||||||
|
node.attribute(attrName).resetToDefaultValue()
|
||||||
|
|
||||||
|
def extendAttributes(self, node, attributesDict):
|
||||||
|
"""
|
||||||
|
Extend the values of the provided attributes for a node.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
node (Node): the node whose attributes are to be extended
|
||||||
|
attributesDict (dict): the dictionary containing the attributes' names (as keys) and the values to extend with
|
||||||
|
"""
|
||||||
|
for attr in attributesDict.keys():
|
||||||
|
if node.hasAttribute(attr):
|
||||||
|
node.attribute(attr).extend(attributesDict[attr])
|
||||||
|
|
||||||
|
def setAttributes(self, node, attributesDict):
|
||||||
|
"""
|
||||||
|
Set the values of the provided attributes for a node.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
node (Node): the node whose attributes are to be extended
|
||||||
|
attributesDict (dict): the dictionary containing the attributes' names (as keys) and the values to set
|
||||||
|
"""
|
||||||
|
for attr in attributesDict:
|
||||||
|
if node.hasAttribute(attr):
|
||||||
|
node.attribute(attr).value = attributesDict[attr]
|
Loading…
Add table
Reference in a new issue