# Meshroom/meshroom/submitters/rippleSubmitter.py
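"""Meshroom submitter plugin for Ripple, MPC's render-farm job system.

Translates a Meshroom dependency graph into Ripple processes and groups:
each node's CPU/RAM/GPU requirement level is mapped to host constraints and
slot counts, and the resulting tasks are submitted through DefaultDispatcher.
"""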

import os
import json

# Meshroom modules
from meshroom.core.desc import Level
from meshroom.core.submitter import BaseSubmitter
# MPC logging
import mpc.logging
# Ripple farm API
from mpc.ripple.rippleConfig import RippleConfig as _RippleConfig
from mpc.ripple.rippleProcess import RippleProcess
from mpc.ripple.dispatcher import DefaultDispatcher
from mpc.ripple.rippleStorage import RippleStorage
from mpc.ripple.rippleUtilities import RippleGroup
from mpc.ripple.rippleAttribute import RippleAttribute
# Validator for the integer slot attributes declared below
from mpc.pyCore.validators import IntValidator

_log = mpc.logging.getLogger()

currentDir = os.path.dirname(os.path.realpath(__file__))
binDir = os.path.dirname(os.path.dirname(os.path.dirname(currentDir)))


# Give access to min/maxProcessors, which are aliases for Ripple slots
class RippleProcessWithSlots(RippleProcess):
    minProcessors = RippleAttribute('', IntValidator(), 1, True)
    maxProcessors = RippleAttribute('', IntValidator(), 1, True)
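
# Declaring these RippleAttributes lets createTask() pass minProcessors and
# maxProcessors as keyword arguments, to request extra slots for RAM- or
# CPU-intensive nodes.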


class RippleSubmitter(BaseSubmitter):
    def __init__(self, parent=None):
        super(RippleSubmitter, self).__init__(name='Ripple', parent=parent)

    def createTask(self, meshroomFile, node, parents):
        nbBlocks = 1
        # Map Meshroom GPU modes to MPC farm services
        gpudict = {
            "NONE": "",
            "NORMAL": ",cuda8G",
            "INTENSIVE": ",cuda16G",
        }
        # Host constraints: exclude "rs*" hosts, require @.mem > 25, and
        # optionally request a CUDA service for GPU nodes
        requirements = '!"rs*",@.mem>25{gpu}'.format(gpu=gpudict[node.nodeDesc.gpu.name])
        # Decide whether the task needs multiple slots
        minProcessors = 1
        maxProcessors = 1
        if Level.INTENSIVE in (node.nodeDesc.ram, node.nodeDesc.cpu):
            # At least 2 slots; use 3 or 4 if they are available without waiting
            minProcessors = 2
            maxProcessors = 4
            requirements += ',!"rr*"'
        elif Level.NORMAL in (node.nodeDesc.ram, node.nodeDesc.cpu):
            # 2 slots if available, otherwise 1
            maxProcessors = 2
            requirements += ',!"rr*"'
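        # Resulting slot policy by ram/cpu level:
        #   INTENSIVE -> min 2 / max 4, NORMAL -> min 1 / max 2, else 1 / 1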
        # Names of the parent nodes to wait for before launching this one
        waitsFor = []
        for parent in parents:
            waitsFor.append(parent.name)
        # Base command line for this node
        command = 'meshroom_compute --node {nodeName} "{meshroomFile}" --extern'.format(nodeName=node.name, meshroomFile=meshroomFile)
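        # e.g. (illustrative node name and project path):
        #   meshroom_compute --node Meshing_1 "/path/to/project.mg" --extern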
        if node.isParallelized:
            _, _, nbBlocks = node.nodeDesc.parallelization.getSizes(node)
            # Create one process per iteration (chunk) and group them together
            rippleprocs = []
            for iteration in range(0, nbBlocks):
                # Append the iteration number to the base command line
                commandext = '{cmd} --iteration {iter}'.format(cmd=command, iter=iteration)
                # Create the process task with its parameters
                rippleproc = RippleProcessWithSlots(
                    name='{name} iteration {iter}'.format(name=node.name, iter=iteration),
                    discipline='ripple', appendKeys=True, keys=requirements, label=node.name,
                    cmdList=[commandext], waitsFor=waitsFor,
                    minProcessors=minProcessors, maxProcessors=maxProcessors)
                rippleprocs.append(rippleproc)
            rippleObj = RippleGroup(label="{name} Group".format(name=node.name),
                                    tasks=rippleprocs, name=node.name, waitsFor=waitsFor)
        else:
            rippleObj = RippleProcessWithSlots(
                name=node.name, discipline='ripple', appendKeys=True, keys=requirements,
                label=node.name, cmdList=[command], waitsFor=waitsFor,
                minProcessors=minProcessors, maxProcessors=maxProcessors)
        return rippleObj

    def submit(self, nodes, edges, filepath, submitLabel="{projectName}"):
        projectName = os.path.splitext(os.path.basename(filepath))[0]
        label = submitLabel.format(projectName=projectName)
        # Build a dependency tree mapping each node to its parent nodes
        tree = {}
        for node in nodes:
            tree[node] = []
        for end, start in edges:
            tree[end].append(start)
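        # e.g. with edges [(B, A), (C, B)], tree is {A: [], B: [A], C: [B]}:
        # B waits for A, and C waits for B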
        nodesDone = set()
        hasChange = True
        tasks = []
        # Keep sweeping the graph while the previous pass scheduled something;
        # this emits tasks in topological order (parents before children)
        while hasChange:
            hasChange = False
            toRemove = []
            # Loop over all nodes in the graph
            for node in tree:
                # Skip nodes that have already been processed
                if node.name in nodesDone:
                    continue
                # Schedule a node only once all of its parents are done
                if not all(parent.name in nodesDone for parent in tree[node]):
                    continue
                tasks.append(self.createTask(filepath, node, tree[node]))
                toRemove.append(node.name)
                hasChange = True
            for itemRemove in toRemove:
                nodesDone.add(itemRemove)
        if not tasks:
            return True
        DefaultDispatcher(label=label, tasks=tasks, jobType='release', paused=False)()
        return True
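

# A minimal usage sketch (names and paths are illustrative; in practice
# Meshroom calls this submitter itself once the plugin is registered):
#
#   submitter = RippleSubmitter()
#   # 'nodes' is the list of graph nodes to schedule; 'edges' an iterable of
#   # (destination, source) node pairs describing their dependencies
#   submitter.submit(nodes, edges, "/path/to/project.mg",
#                    submitLabel="{projectName} [Ripple]")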