Meshroom/meshroom/core/graph.py
2017-10-23 19:55:33 +02:00

1020 lines
34 KiB
Python

from __future__ import print_function

import collections
import hashlib
import json
import logging
import os
import re
import shutil
import time
import uuid
from collections import defaultdict
from pprint import pprint

try:
    from collections.abc import Sequence  # Python 3.3+
except ImportError:
    from collections import Sequence  # Python 2

import psutil
from enum import Enum # available by default in python3. For python2: "pip install enum34"

from . import stats
from . import desc
from meshroom import core as pg
from meshroom.common import BaseObject, DictModel, Slot, Signal, Property, Variant, ListModel
# Keep a handle on the stock encoder before it gets swapped out below
DefaultJSONEncoder = json.JSONEncoder


class MyJSONEncoder(DefaultJSONEncoder):
    """ JSON encoder with Enum support: Enum members are serialized as their name. """

    def default(self, obj):
        # Every non-Enum object is delegated to the stock encoder
        return obj.name if isinstance(obj, Enum) else DefaultJSONEncoder.default(self, obj)


# Install the Enum-aware encoder in place of the json module's default one
json.JSONEncoder = MyJSONEncoder
# Python 2/3 compatibility shim: make the names 'unicode', 'bytes' and 'basestring'
# available in this module whichever major version is running.
try:
    unicode = unicode
except NameError:
    # 'unicode' is undefined, must be Python 3
    str = str
    unicode = str
    bytes = bytes
    # tuple form so that it can be used directly as second argument of isinstance()
    basestring = (str, bytes)
else:
    # 'unicode' exists, must be Python 2
    str = str
    unicode = unicode
    bytes = str
    basestring = basestring
def hash(v):
    """
    Return the SHA-1 hexadecimal digest of the UTF-8 encoded string representation of 'v'.

    Note that this function shadows the builtin 'hash' inside this module.
    """
    return hashlib.sha1(str(v).encode('utf-8')).hexdigest()
def attribute_factory(description, name, value, node, parent=None):
    # type: (desc.Attribute, str, (), Node, Attribute) -> Attribute
    """
    Instantiate the Attribute class matching the type of the given description.

    :param description: the Attribute description
    :param name: name of the Attribute
    :param value: value of the Attribute. Only assigned when it is not None.
    :param node: node owning the Attribute. The created Attribute is NOT added to the Node's attributes
    :param parent: (optional) parent Attribute (must be ListAttribute or GroupAttribute)
    :return: the created Attribute (GroupAttribute, ListAttribute or plain Attribute)
    """
    # Group descriptions are tested before List ones; anything else maps to a plain Attribute
    attributeClass = Attribute
    for descriptionType, candidateClass in ((pg.desc.GroupAttribute, GroupAttribute),
                                            (pg.desc.ListAttribute, ListAttribute)):
        if isinstance(description, descriptionType):
            attributeClass = candidateClass
            break

    attribute = attributeClass(name, node, description, parent=parent)
    if value is not None:
        attribute.value = value
    return attribute
class Attribute(BaseObject):
    """
    Value holder owned by a Node and configured by an attribute description.

    An input Attribute may be the destination of a Graph edge: it is then a "link" and
    its value and uid are the ones of the source Attribute of that edge.
    """

    def __init__(self, name, node, attributeDesc, parent=None):
        """
        :param name: name of the Attribute
        :param node: Node owning the Attribute
        :param attributeDesc: description this Attribute is based on
        :param parent: (optional) parent object (ListAttribute or GroupAttribute for child attributes)
        """
        super(Attribute, self).__init__(parent)
        self._name = name
        self.node = node  # type: Node
        self.attributeDesc = attributeDesc
        # Initial state is read from the description when it provides these fields
        self._value = getattr(attributeDesc, 'value', None)
        self._label = getattr(attributeDesc, 'label', None)
        self._isOutput = getattr(attributeDesc, 'isOutput', False)

    def absoluteName(self):
        """ Name including the graph: graphName.nodeName.name """
        return '{}.{}.{}'.format(self.node.graph.name, self.node.name, self._name)

    def fullName(self):
        """ Name inside the Graph: nodeName.name (parentFullName[index] / parentFullName.name for children) """
        parent = self.parent()
        if isinstance(parent, ListAttribute):
            return '{}[{}]'.format(parent.fullName(), parent.index(self))
        if isinstance(parent, GroupAttribute):
            return '{}.{}'.format(parent.fullName(), self._name)
        return '{}.{}'.format(self.node.name, self._name)

    def getName(self):
        """ Attribute name """
        return self._name

    def getLabel(self):
        """ Attribute label, as read from the description (None if it has none) """
        return self._label

    def _get_value(self):
        """ Return the value of the linked attribute if this attribute is a link, its own value otherwise. """
        return self.getLinkParam().value if self.isLink else self._value

    def _set_value(self, value):
        """ Store 'value' and emit valueChanged, unless it equals the current stored value. """
        if self._value == value:
            return
        self._value = value
        self.valueChanged.emit()

    @property
    def isOutput(self):
        return self._isOutput

    def uid(self):
        """
        Return the uid of this attribute:
         - output attribute: uid of the owning node
         - linked attribute: uid of the source attribute
         - otherwise: hash of the stored value
        """
        if self.attributeDesc.isOutput:
            # only dependent of the linked node uid, so it is independent
            # from the cache folder which may be used in the filepath.
            return self.node.uid()
        if self.isLink:
            return self.getLinkParam().uid()
        # hash() already works on the string representation of its argument,
        # so string and non-string values are handled the same way
        return hash(self._value)

    @property
    def isLink(self):
        """
        If the attribute is a link to another attribute.
        """
        if self.attributeDesc.isOutput:
            return False
        return self in self.node.graph.edges.keys()

    def getLinkParam(self):
        """ Return the source attribute this attribute is linked to, or None if it is not a link. """
        if not self.isLink:
            return None
        return self.node.graph.edge(self).src

    def _applyExpr(self):
        """
        For string parameters with an expression (when loaded from file),
        this function convert the expression into a real edge in the graph
        and clear the string value.

        The value may also be an Attribute instance, which is directly used as the edge source.
        """
        v = self._value
        if isinstance(v, Attribute):
            g = self.node.graph
            g.addEdge(v, self)
            self._value = ""
        elif isinstance(v, basestring) and len(v) > 2 and v[0] == '{' and v[-1] == '}':
            # value is a link to another attribute
            g = self.node.graph
            link = v[1:-1]
            # Split on the first dot only: what follows the node name may itself contain dots
            # or indices (e.g. "node.group.attr", "node.list[0]"), as produced by fullName()
            linkNode, linkAttr = link.split('.', 1)
            g.addEdge(g.node(linkNode).attribute(linkAttr), self)
            self._value = ""

    def getExportValue(self):
        """ Return the value to serialize: '{sourceFullName}' for a link, the stored value otherwise. """
        if self.isLink:
            return '{' + self.getLinkParam().fullName() + '}'
        return self._value

    name = Property(str, getName, constant=True)
    label = Property(str, getLabel, constant=True)
    desc = Property(desc.Attribute, lambda self: self.attributeDesc, constant=True)
    valueChanged = Signal()
    value = Property(Variant, _get_value, _set_value, notify=valueChanged)
    isOutput = Property(bool, isOutput.fget, constant=True)
    isLinkChanged = Signal()
    isLink = Property(bool, isLink.fget, notify=isLinkChanged)
class ListAttribute(Attribute):
    """ Attribute whose value is a ListModel of child Attributes built from the description's 'elementDesc'. """

    def __init__(self, name, node, attributeDesc, parent=None):
        super(ListAttribute, self).__init__(name, node, attributeDesc, parent)
        # Replace the value set by the base class with an empty model of child attributes
        self._value = ListModel(parent=self)

    def __getitem__(self, item):
        return self._value.at(item)

    def __len__(self):
        return len(self._value)

    def _set_value(self, value):
        """ Replace the whole content with child attributes created from the given values. """
        self._value.clear()
        self.extend(value)

    def append(self, value):
        """ Add one child attribute holding 'value' at the end of the list. """
        self.extend([value])

    def insert(self, value, index):
        """ Create a child attribute holding 'value' and insert it at position 'index'. """
        child = attribute_factory(self.attributeDesc.elementDesc, "", value, self.node, self)
        self._value.insert(index, [child])

    def index(self, item):
        """ Return the position of the child attribute 'item', as given by the underlying model. """
        return self._value.indexOf(item)

    def extend(self, values):
        """ Create one child attribute per given value and add them all to the list. """
        self._value.extend([
            attribute_factory(self.attributeDesc.elementDesc, "", childValue, self.node, parent=self)
            for childValue in values
        ])

    def remove(self, index):
        """ Remove the child attribute at position 'index'. """
        self._value.removeAt(index)

    def getExportValue(self):
        """ Return the list of the export values of the child attributes. """
        return [child.getExportValue() for child in self._value]

    # Override value property setter
    value = Property(Variant, Attribute._get_value, _set_value, notify=Attribute.valueChanged)
class GroupAttribute(Attribute):
    """ Attribute whose value is a DictModel of named child Attributes built from the description's 'groupDesc'. """

    def __init__(self, name, node, attributeDesc, parent=None):
        super(GroupAttribute, self).__init__(name, node, attributeDesc, parent)
        self._value = DictModel(keyAttrName='name', parent=self)
        # One child attribute per entry of the group description, created without initial value
        children = [
            attribute_factory(childDesc, childName, None, self.node, parent=self)
            for childName, childDesc in self.attributeDesc.groupDesc.items()
        ]
        self._value.reset(children)

    def __getattr__(self, key):
        """ Fall back on child attributes: 'group.childName' returns the child attribute named 'childName'. """
        try:
            return super(GroupAttribute, self).__getattr__(key)
        except AttributeError:
            pass
        try:
            return self._value.get(key)
        except KeyError:
            raise AttributeError(key)

    def _set_value(self, exportedValue):
        """ Assign each value of the 'exportedValue' dict to the child attribute of the same key. """
        for childName, childValue in exportedValue.items():
            self._value.get(childName).value = childValue

    def getExportValue(self):
        """ Return a dict of the export values of the child attributes. """
        return dict((key, child.getExportValue()) for key, child in self._value.objects.items())

    # Override value property
    value = Property(Variant, Attribute._get_value, _set_value, notify=Attribute.valueChanged)
class Edge(BaseObject):
    """ Connection between a source attribute ('src') and a destination attribute ('dst'). """

    def __init__(self, src, dst, parent=None):
        super(Edge, self).__init__(parent)
        self._src, self._dst = src, dst

    @property
    def src(self):
        """ Source attribute of the edge """
        return self._src

    @property
    def dst(self):
        """ Destination attribute of the edge """
        return self._dst

    # Re-expose the getters above as constant Properties
    src = Property(Attribute, src.fget, constant=True)
    dst = Property(Attribute, dst.fget, constant=True)
class Status(Enum):
    """
    Computation status of a Node.

    The numeric values are compared by Node.upgradeStatusTo, which prints a warning
    when the new status value is not greater than the current one.
    """
    NONE = 1
    SUBMITTED_EXTERN = 2
    SUBMITTED_LOCAL = 3
    RUNNING = 4
    ERROR = 5
    SUCCESS = 6
class StatusData:
    """
    Status information of a Node, convertible to and from a dict
    (see Node.saveStatusFile and Node.updateStatusFromCache).
    """

    def __init__(self, nodeName, nodeType):
        self.status = Status.NONE
        self.nodeName = nodeName
        self.nodeType = nodeType
        self.graph = ''
        self.commandLine = None
        self.env = None

    def toDict(self):
        """ Return the attribute dictionary of this instance (the dictionary itself, not a copy). """
        return vars(self)

    def fromDict(self, d):
        """
        Update this instance from the dict 'd'.
        The 'status' key is mandatory and holds a Status member name; any other missing key defaults to ''.
        """
        self.status = Status[d['status']]
        for key in ('nodeName', 'nodeType', 'graph', 'commandLine', 'env'):
            setattr(self, key, d.get(key, ''))
class Node(BaseObject):
    """
    Element of a Graph, instantiated from a node description registered in 'pg.nodesDesc'.

    A Node owns the Attributes declared by its description, computes its uid from its
    input attributes and runs the command line of its description in a subprocess.
    """

    # Regexp handling complex attribute names with recursive understanding of Lists and Groups
    # i.e: a.b, a[0], a[0].b.c[1]
    attributeRE = re.compile(r'\.?(?P<name>\w+)(?:\[(?P<index>\d+)\])?')

    def __init__(self, nodeDesc, parent=None, **kwargs):
        """
        :param nodeDesc: key of the node description class in 'pg.nodesDesc'
        :param parent: (optional) parent object
        :param kwargs: initial attribute values, indexed by attribute name
        """
        super(Node, self).__init__(parent)
        self._name = None  # type: str
        self.graph = None  # type: Graph
        self.nodeDesc = pg.nodesDesc[nodeDesc]()
        self._cmdVars = {}
        self._attributes = DictModel(keyAttrName='name', parent=self)
        self.attributesPerUid = defaultdict(set)
        self._initFromDesc()
        for k, v in kwargs.items():
            self.attribute(k).value = v
        self.status = StatusData(self.name, self.nodeType)
        self.statistics = stats.Statistics()
        self._subprocess = None

    def __getattr__(self, k):
        """
        Expose attributes as members: 'node.attrName' returns the Attribute named 'attrName'.
        Python only calls this method when the regular lookup of 'k' has failed.
        """
        if k == '_attributes':
            # the attributes model itself is missing (instance not initialized):
            # looking it up through self.attribute() would recurse endlessly
            raise AttributeError(k)
        try:
            return self.attribute(k)
        except KeyError:
            raise AttributeError(k)

    def getName(self):
        """ Node name, assigned when the node is added to a Graph (None before that) """
        return self._name

    @Slot(str, result=Attribute)
    def attribute(self, name):
        """
        Return the attribute identified by 'name', which may be a complex name
        referring to a child of a Group or List attribute (a.b, a[0], a[0].b.c[1]).
        """
        att = None
        # Complex name indicating group or list attribute
        if '[' in name or '.' in name:
            for n, idx in self.attributeRE.findall(name):
                if att is None:
                    # first step: get root attribute
                    att = self._attributes.get(n)
                else:
                    # get child Attribute in Group
                    assert isinstance(att, GroupAttribute)
                    att = att.value.get(n)
                if idx != '':
                    # get child Attribute in List
                    assert isinstance(att, ListAttribute)
                    att = att.value.at(int(idx))
        else:
            att = self._attributes.get(name)
        return att

    def getAttributes(self):
        """ Model holding the root attributes of the node """
        return self._attributes

    def _initFromDesc(self):
        """ Create the attributes declared by the node description and index them per uid. """
        # Init from class and instance members.
        # items() are converted to lists: dict views cannot be concatenated on Python 3.
        members = list(vars(self.nodeDesc.__class__).items()) + list(vars(self.nodeDesc).items())
        for name, attrDesc in members:
            if isinstance(attrDesc, pg.desc.Attribute):
                self._attributes.add(attribute_factory(attrDesc, name, None, self))
        # List attributes per uid
        for attr in self._attributes:
            for uidIndex in attr.attributeDesc.uid:
                self.attributesPerUid[uidIndex].add(attr)

    def _applyExpr(self):
        """ Resolve the expressions of all the attributes (see Attribute._applyExpr). """
        for attr in self._attributes:
            attr._applyExpr()

    @property
    def nodeType(self):
        return self.nodeDesc.__class__.__name__

    def uid(self):
        """ Return the node uid, as computed by the last call to _updateUid. """
        return self.nodeUid

    def _updateUid(self):
        """ Compute, store and return the node uid: hash of the uids of the input attributes sorted by name. """
        hashInputParams = [(attr.getName(), attr.uid()) for attr in self._attributes if
                           not attr.attributeDesc.isOutput]
        hashInputParams.sort()
        self.nodeUid = hash(tuple([b for a, b in hashInputParams]))
        return self.nodeUid

    @property
    def depth(self):
        return self.graph.getDepth(self)

    def toDict(self):
        """ Return the serializable description of the node: its type and its attribute export values. """
        attributes = {k: v.getExportValue() for k, v in self._attributes.objects.items()}
        return {
            'nodeType': self.nodeType,
            'attributes': {k: v for k, v in attributes.items() if v is not None},  # filter empty values
        }

    def _registerCmdVars(self, name, attr, value):
        """ Add the command line variables of one attribute: 'name', 'nameValue' and its group entry. """
        self._cmdVars[name] = '--{name} {value}'.format(name=name, value=value)
        self._cmdVars[name + 'Value'] = str(value)
        # Unset or empty values are not added to the group of parameters.
        # Equality (not identity) is used to compare with the empty string.
        if value is not None and value != '':
            group = attr.attributeDesc.group
            self._cmdVars[group] = self._cmdVars.get(group, '') + ' ' + self._cmdVars[name]

    def updateInternals(self):
        """
        Update the node uid and rebuild the variables used to format the command line,
        the internal folder and the value of the output attributes.
        """
        self._updateUid()
        self._cmdVars = {
            'cache': pg.cacheFolder,
        }
        for uidIndex, associatedAttributes in self.attributesPerUid.items():
            assAttr = [(a.getName(), a.uid()) for a in associatedAttributes]
            assAttr.sort()
            self._cmdVars['uid{}'.format(uidIndex)] = hash(tuple([b for a, b in assAttr]))

        # Evaluate input params
        for name, attr in self._attributes.objects.items():
            if attr.attributeDesc.isOutput:
                continue  # skip outputs
            linkAttr = attr.getLinkParam()
            if linkAttr:
                attr._value = linkAttr._value
            v = attr._value
            if isinstance(attr.attributeDesc, desc.ChoiceParam) and not attr.attributeDesc.exclusive:
                # non-exclusive choices hold several values, joined into a single string
                assert(isinstance(v, Sequence) and not isinstance(v, basestring))
                v = attr.attributeDesc.joinChar.join(v)
            self._registerCmdVars(name, attr, v)

        # Evaluate output params
        for name, attr in self._attributes.objects.items():
            if not attr.attributeDesc.isOutput:
                continue  # skip inputs
            attr.value = attr.attributeDesc.value.format(
                nodeType=self.nodeType,
                **self._cmdVars)
            self._registerCmdVars(name, attr, attr._value)

        self.internalFolderChanged.emit()

    @property
    def internalFolder(self):
        return self.nodeDesc.internalFolder.format(nodeType=self.nodeType, **self._cmdVars)

    def commandLine(self):
        """ Return the command line of the node description formatted with the current variables. """
        return self.nodeDesc.commandLine.format(nodeType=self.nodeType, **self._cmdVars)

    def statusFile(self):
        return os.path.join(pg.cacheFolder, self.internalFolder, 'status')

    def statisticsFile(self):
        return os.path.join(pg.cacheFolder, self.internalFolder, 'statistics')

    def logFile(self):
        return os.path.join(pg.cacheFolder, self.internalFolder, 'log')

    @staticmethod
    def _writeJsonFile(data, filepath):
        """ Dump 'data' as JSON into a uniquely named temporary file, then move it to 'filepath'. """
        folder = os.path.dirname(filepath)
        if not os.path.exists(folder):
            os.makedirs(folder)
        filepathWriting = filepath + '.writing.' + str(uuid.uuid4())
        with open(filepathWriting, 'w') as jsonFile:
            json.dump(data, jsonFile, indent=4)
        shutil.move(filepathWriting, filepath)

    def updateStatusFromCache(self):
        """
        Reload the status from the status file; reset it to Status.NONE if the file does not exist.
        Need up-to-date UIDs.
        """
        statusFile = self.statusFile()
        if not os.path.exists(statusFile):
            self.upgradeStatusTo(Status.NONE)
            return
        with open(statusFile, 'r') as jsonFile:
            statusData = json.load(jsonFile)
        self.status.fromDict(statusData)
        self.statusChanged.emit()

    def saveStatusFile(self):
        """
        Write the current status into the status file.
        Need up-to-date UIDs.
        """
        self._writeJsonFile(self.status.toDict(), self.statusFile())

    def updateStatisticsFromCache(self):
        """
        Reload the statistics from the statistics file, if it exists.
        """
        statisticsFile = self.statisticsFile()
        if not os.path.exists(statisticsFile):
            return
        with open(statisticsFile, 'r') as jsonFile:
            statisticsData = json.load(jsonFile)
        self.statistics.fromDict(statisticsData)

    def saveStatistics(self):
        """ Write the current statistics into the statistics file. """
        self._writeJsonFile(self.statistics.toDict(), self.statisticsFile())

    def upgradeStatusTo(self, newStatus):
        """ Set the status to 'newStatus', notify the change and save the status file. """
        if int(newStatus.value) <= int(self.status.status.value):
            print('WARNING: downgrade status on node "{}" from {} to {}'.format(self._name, self.status.status.name,
                                                                                newStatus))
        self.status.status = newStatus
        self.statusChanged.emit()
        self.saveStatusFile()

    def isAlreadySubmitted(self):
        return self.status.status in (Status.SUBMITTED_EXTERN, Status.SUBMITTED_LOCAL, Status.RUNNING)

    def submit(self):
        self.upgradeStatusTo(Status.SUBMITTED_EXTERN)

    def beginSequence(self):
        self.upgradeStatusTo(Status.SUBMITTED_LOCAL)

    def stopProcess(self):
        """ Terminate the subprocess of the node, if there is one running. """
        if self._subprocess:
            self._subprocess.terminate()

    def process(self):
        """
        Run the command line of the node in a subprocess and wait for its completion.
        Standard and error outputs of the subprocess are redirected to the log file.

        :raises RuntimeError: if the subprocess exits with a non-zero return code
        """
        self.upgradeStatusTo(Status.RUNNING)
        statThread = stats.StatisticsThread(self)
        statThread.start()
        startTime = time.time()
        try:
            with open(self.logFile(), 'w') as logF:
                cmd = self.commandLine()
                print(' - commandLine:', cmd)
                print(' - logFile:', self.logFile())
                # NOTE(review): the command is a string interpreted by the shell (shell=True);
                # it is built from the node description and the attribute values.
                self._subprocess = psutil.Popen(cmd, stdout=logF, stderr=logF, shell=True)
                # store process static info into the status file
                self.status.commandLine = cmd
                # self.status.env = self.proc.environ()
                # self.status.createTime = self.proc.create_time()
                statThread.proc = self._subprocess
                self._subprocess.communicate()
                self._subprocess.wait()
                self.status.returnCode = self._subprocess.returncode

            if self._subprocess.returncode != 0:
                with open(self.logFile(), 'r') as logF:
                    logContent = logF.read()
                # the status is set to ERROR by the 'except' clause below
                raise RuntimeError('Error on node "{}":\nLog:\n{}'.format(self.name, logContent))
        except Exception:
            self.upgradeStatusTo(Status.ERROR)
            raise
        finally:
            elapsedTime = time.time() - startTime
            print(' - elapsed time:', elapsedTime)
            self._subprocess = None
            # ask and wait for the stats thread to stop
            statThread.stopRequest()
            statThread.join()

        # only reached when no exception has been raised
        self.upgradeStatusTo(Status.SUCCESS)

    def endSequence(self):
        pass

    def getStatus(self):
        return self.status

    @property
    def statusName(self):
        return self.status.status.name

    name = Property(str, getName, constant=True)
    nodeType = Property(str, nodeType.fget, constant=True)
    attributes = Property(BaseObject, getAttributes, constant=True)
    internalFolderChanged = Signal()
    internalFolder = Property(str, internalFolder.fget, notify=internalFolderChanged)
    depthChanged = Signal()
    depth = Property(int, depth.fget, notify=depthChanged)
    statusChanged = Signal()
    statusName = Property(str, statusName.fget, notify=statusChanged)
# Node colors used by Graph.dfs / Graph.dfsVisit to track the state of the traversal
WHITE = 0  # color given to every node of the graph before the search starts
GRAY = 1  # set by dfsVisit when it starts visiting a node
BLACK = 2  # set by dfsVisit once the children of a node have been handled
class Visitor:
    """
    Visitor used by Graph.dfs: its methods are hooks called during the traversal.
    The default implementations do nothing.
    """

    # The commented-out hooks below are not called by Graph.dfs / Graph.dfsVisit.

    # def initializeVertex(self, s, g):
    #     '''is invoked on every vertex of the graph before the start of the graph search.'''
    #     pass
    # def startVertex(self, s, g):
    #     '''is invoked on the source vertex once before the start of the search.'''
    #     pass

    def discoverVertex(self, u, g):
        """ is invoked when a vertex is encountered for the first time. """

    # def examineEdge(self, e, g):
    #     '''is invoked on every out-edge of each vertex after it is discovered.'''
    #     pass
    # def treeEdge(self, e, g):
    #     '''is invoked on each edge as it becomes a member of the edges that form the search tree. If you wish to record predecessors, do so at this event point.'''
    #     pass
    # def backEdge(self, e, g):
    #     '''is invoked on the back edges in the graph.'''
    #     pass
    # def forwardOrCrossEdge(self, e, g):
    #     '''is invoked on forward or cross edges in the graph. In an undirected graph this method is never called.'''
    #     pass
    # def finishEdge(self, e, g):
    #     '''is invoked on the non-tree edges in the graph as well as on each tree edge after its target vertex is finished.'''
    #     pass

    def finishVertex(self, u, g):
        """ is invoked on a vertex after all of its out edges have been added to the search tree and all of the
            adjacent vertices have been discovered (but before their out-edges have been examined). """
class Graph(BaseObject):
    """
     _________________      _________________      _________________
    |                 |    |                 |    |                 |
    |     Node A      |    |     Node B      |    |     Node C      |
    |                 |edge|                 |edge|                 |
    |input      output|>-->|input      output|>-->|input      output|
    |_________________|    |_________________|    |_________________|

    Data structures:
        nodes = {'A': <nodeA>, 'B': <nodeB>, 'C': <nodeC>}
        edges = {B.input: A.output, C.input: B.output,}
    """

    def __init__(self, name, parent=None):
        super(Graph, self).__init__(parent)
        self.name = name
        self._nodes = DictModel(keyAttrName='name', parent=self)
        # use dst attribute as unique key since it can only have one input connection
        self._edges = DictModel(keyAttrName='dst', parent=self)

    def clear(self):
        """ Remove all the nodes and edges of the graph. """
        self._nodes.clear()
        self._edges.clear()

    @Slot(str)
    def load(self, filepath):
        """
        Replace the content of the graph with the one described in the JSON file 'filepath'.
        The graph is named after the file (basename without extension).

        :raises RuntimeError: if the file content or one of its node entries is not a dict
        """
        self.clear()
        with open(filepath) as jsonFile:
            graphData = json.load(jsonFile)
        if not isinstance(graphData, dict):
            raise RuntimeError('loadGraph error: Graph is not a dict. File: {}'.format(filepath))

        self.name = os.path.splitext(os.path.basename(filepath))[0]
        for nodeName, nodeData in graphData.items():
            if not isinstance(nodeData, dict):
                raise RuntimeError('loadGraph error: Node is not a dict. File: {}'.format(filepath))
            n = Node(nodeData['nodeType'], parent=self, **nodeData['attributes'])
            # Add node to the graph with raw attributes values
            self._addNode(n, nodeName)
        # Create graph edges by resolving attributes expressions
        self._applyExpr()

    def _addNode(self, node, uniqueName):
        """
        Internal method to add the given node to this Graph, with the given name (must be unique).
        Attribute expressions are not resolved.

        :raises RuntimeError: if the node already belongs to another graph
        """
        if node.graph is not None and node.graph != self:
            raise RuntimeError(
                'Node "{}" cannot be part of the Graph "{}", as it is already part of the other graph "{}".'.format(
                    node.nodeType, self.name, node.graph.name))

        assert uniqueName not in self._nodes.keys()
        node._name = uniqueName
        node.graph = self
        self._nodes.add(node)
        self.stopExecutionRequested.connect(node.stopProcess)

        # Trigger internal update when an attribute is modified
        for attr in node.attributes:  # type: Attribute
            attr.valueChanged.connect(self.updateInternals)

    def addNode(self, node, uniqueName=None):
        """
        Add the given node to this Graph with an optional unique name,
        and resolve attributes expressions.
        A unique name is generated from the node type when none is given.
        """
        self._addNode(node, uniqueName if uniqueName else self._createUniqueNodeName(node.nodeType))
        # Resolve attribute expressions
        node._applyExpr()
        return node

    def outEdges(self, attribute):
        # type: (Attribute,) -> [Edge]
        """ Return the list of edges starting from the given attribute """
        return [edge for edge in self.edges if edge.src == attribute]

    def removeNode(self, nodeName):
        """
        Remove the node identified by 'nodeName' from the graph
        and return in and out edges removed by this operation in two dicts {dstAttr.fullName(), srcAttr.fullName()}
        """
        node = self.node(nodeName)
        self._nodes.pop(nodeName)

        inEdges = {}
        outEdges = {}
        for attr in node._attributes:
            # edges starting from this attribute
            for edge in self.outEdges(attr):
                self.edges.remove(edge)
                outEdges[edge.dst.fullName()] = edge.src.fullName()
            # edge arriving on this attribute
            if attr in self.edges.keys():
                edge = self.edges.pop(attr)
                inEdges[edge.dst.fullName()] = edge.src.fullName()

        self.updateInternals()
        return inEdges, outEdges

    @Slot(str, result=Node)
    def addNewNode(self, nodeType, **kwargs):
        """
        Create a node of the given type and add it to the graph.

        :param nodeType: key of the node description (see Node)
        :param kwargs: initial attribute values of the node
        :return: the created node
        :rtype: Node
        """
        node = self.addNode(Node(nodeDesc=nodeType, parent=self, **kwargs))
        return node

    def _createUniqueNodeName(self, inputName):
        """ Return the first name of the form 'inputName_<index>' (index starting at 1) not used in the graph. """
        i = 1
        while True:
            newName = "{name}_{index}".format(name=inputName, index=i)
            if newName not in self._nodes.objects:
                return newName
            i += 1

    def node(self, nodeName):
        """ Return the node named 'nodeName'. """
        return self._nodes.get(nodeName)

    def attribute(self, fullName):
        # type: (str) -> Attribute
        """
        Return the attribute identified by the unique name 'fullName'.
        """
        node, attribute = fullName.split('.', 1)
        return self.node(node).attribute(attribute)

    def findNodeCandidates(self, nodeNameExpr):
        """ Return the nodes whose name matches the regular expression 'nodeNameExpr' (re.match semantics). """
        pattern = re.compile(nodeNameExpr)
        return [v for k, v in self._nodes.objects.items() if pattern.match(k)]

    def findNode(self, nodeExpr):
        """
        Return the single node whose name matches 'nodeExpr'.

        :raises KeyError: if no node or several nodes match
        """
        candidates = self.findNodeCandidates('^' + nodeExpr)
        if not candidates:
            raise KeyError('No node candidate for "{}"'.format(nodeExpr))
        elif len(candidates) > 1:
            raise KeyError('Multiple node candidates for "{}": {}'.format(nodeExpr, str([c.name for c in candidates])))
        return candidates[0]

    def findNodes(self, nodesExpr):
        """ Return the list of nodes found with findNode for each expression of 'nodesExpr'. """
        return [self.findNode(nodeName) for nodeName in nodesExpr]

    def edge(self, dstAttributeName):
        """ Return the edge stored under the given key of the edges model (edges are keyed by 'dst'). """
        return self._edges.get(dstAttributeName)

    def getLeaves(self):
        """ Return the set of nodes that are not the source of any edge. """
        nodesWithOutput = set([edge.src.node for edge in self.edges])
        return set(self._nodes) - nodesWithOutput

    def addEdge(self, srcAttr, dstAttr):
        """
        Connect 'srcAttr' to 'dstAttr' and return the created Edge.

        :raises RuntimeError: if the attributes are not part of this graph or if 'dstAttr' is already connected
        """
        assert isinstance(srcAttr, Attribute)
        assert isinstance(dstAttr, Attribute)
        if srcAttr.node.graph != self or dstAttr.node.graph != self:
            raise RuntimeError('The attributes of the edge should be part of a common graph.')
        if dstAttr in self.edges.keys():
            raise RuntimeError('Destination attribute "{}" is already connected.'.format(dstAttr.fullName()))
        edge = Edge(srcAttr, dstAttr)
        self.edges.add(edge)
        dstAttr.valueChanged.emit()
        dstAttr.isLinkChanged.emit()
        return edge

    def addEdges(self, *edges):
        """ Add several edges, each one given as a (srcAttr, dstAttr) sequence. """
        for edge in edges:
            self.addEdge(*edge)

    def removeEdge(self, dstAttr):
        """
        Remove and return the edge arriving on 'dstAttr'.

        :raises RuntimeError: if 'dstAttr' is not connected
        """
        if dstAttr not in self.edges.keys():
            raise RuntimeError('Attribute "{}" is not connected'.format(dstAttr.fullName()))
        edge = self.edges.pop(dstAttr)
        dstAttr.valueChanged.emit()
        dstAttr.isLinkChanged.emit()
        return edge

    def getDepth(self, node):
        """ Return the depth of 'node': 0 without input edge, else 1 + the maximum depth of its input nodes. """
        # TODO: would be better to use bfs instead of recursive function
        inputEdges = self.getInputEdges(node)
        if not inputEdges:
            return 0
        inputDepths = [e.src.node.depth for e in inputEdges]
        return max(inputDepths) + 1

    def getInputEdges(self, node):
        """ Return the set of edges arriving on an attribute of 'node'. """
        return set([edge for edge in self.edges if edge.dst.node is node])

    def _getInputEdgesPerNode(self):
        """ Return a dict associating each node having input edges with the set of its source nodes. """
        nodeEdges = defaultdict(set)
        for edge in self.edges:
            nodeEdges[edge.dst.node].add(edge.src.node)
        return nodeEdges

    def dfs(self, visitor, startNodes=None):
        """
        Depth-first search following the input edges of the nodes.

        :param visitor: object whose discoverVertex / finishVertex hooks are called during the search
        :param startNodes: (optional) nodes to start the search from. Defaults to the leaves of the graph.
        """
        nodeChildren = self._getInputEdgesPerNode()
        colors = {}
        for u in self._nodes:
            colors[u] = WHITE

        roots = startNodes if startNodes else self.getLeaves()
        for u in roots:
            # Skip the nodes already reached from a previous root, so that each node is visited only once
            if colors.get(u, WHITE) == WHITE:
                self.dfsVisit(u, visitor, colors, nodeChildren)

    def dfsVisit(self, u, visitor, colors, nodeChildren):
        """ Visit the node 'u', then recursively the not yet visited nodes it depends on. """
        colors[u] = GRAY
        visitor.discoverVertex(u, self)
        # d_time[u] = time = time + 1
        for v in nodeChildren[u]:
            if colors[v] == WHITE:
                # (u,v) is a tree edge
                self.dfsVisit(v, visitor, colors, nodeChildren)  # TODO: avoid recursion
            elif colors[v] == GRAY:
                pass  # (u,v) is a back edge
            elif colors[v] == BLACK:
                pass  # (u,v) is a cross or forward edge
        colors[u] = BLACK
        visitor.finishVertex(u, self)

    def dfsNodesOnFinish(self, startNodes=None):
        """ Return the nodes in the order they are finished by the depth-first search. """
        nodes = []
        visitor = Visitor()
        visitor.finishVertex = lambda vertex, graph: nodes.append(vertex)
        self.dfs(visitor=visitor, startNodes=startNodes)
        return nodes

    def dfsNodesToProcess(self, startNodes=None):
        """
        Return the nodes whose status is not SUCCESS, in the order they are finished by the depth-first search.
        A warning is printed for the nodes already submitted or running.
        """
        nodes = []
        visitor = Visitor()

        def finishVertex(vertex, graph):
            if vertex.status.status in (Status.SUBMITTED_EXTERN,
                                        Status.SUBMITTED_LOCAL):
                print('WARNING: node "{}" is already submitted.'.format(vertex.name))
            if vertex.status.status is Status.RUNNING:
                print('WARNING: node "{}" is already running.'.format(vertex.name))
            if vertex.status.status is not Status.SUCCESS:
                nodes.append(vertex)

        visitor.finishVertex = finishVertex
        self.dfs(visitor=visitor, startNodes=startNodes)
        return nodes

    def _applyExpr(self):
        """ Resolve the attribute expressions of all the nodes. """
        for node in self._nodes:
            node._applyExpr()

    def toDict(self):
        """ Return the serializable description of the graph: {nodeName: node description}. """
        return {k: node.toDict() for k, node in self._nodes.objects.items()}

    @Slot(result=str)
    def asString(self):
        return str(self.toDict())

    @Slot(str)
    def save(self, filepath):
        """
        Print the description of the graph and write it as JSON into the file 'filepath'.
        """
        data = self.toDict()
        pprint(data)
        with open(filepath, 'w') as jsonFile:
            json.dump(data, jsonFile, indent=4)

    def updateInternals(self, startNodes=None):
        """ Update the internals of the nodes, in the order they are finished by the depth-first search. """
        nodes = self.dfsNodesOnFinish(startNodes=startNodes)
        for node in nodes:
            node.updateInternals()

    def updateStatusFromCache(self):
        for node in self._nodes:
            node.updateStatusFromCache()

    def updateStatisticsFromCache(self):
        for node in self._nodes:
            node.updateStatisticsFromCache()

    def update(self):
        """ Update the internals of the nodes, then reload their status from the cache. """
        self.updateInternals()
        self.updateStatusFromCache()

    def stopExecution(self):
        """ Request graph execution to be stopped """
        self.stopExecutionRequested.emit()

    def submittedNodes(self):
        """ Return the list of submitted nodes inside this Graph """
        return [node for node in self.nodes if node.isAlreadySubmitted()]

    def clearSubmittedNodes(self):
        """ Reset the status of already submitted nodes to Status.NONE """
        for node in self.submittedNodes():
            node.upgradeStatusTo(Status.NONE)

    @property
    def nodes(self):
        return self._nodes

    @property
    def edges(self):
        return self._edges

    nodes = Property(BaseObject, nodes.fget, constant=True)
    edges = Property(BaseObject, edges.fget, constant=True)
    stopExecutionRequested = Signal()
def loadGraph(filepath):
    """
    Create a Graph, load the content of the file 'filepath' into it and return it.
    The graph is created with an empty name before loading.
    """
    loadedGraph = Graph("")
    loadedGraph.load(filepath)
    return loadedGraph
def getAlreadySubmittedNodes(nodes):
    """ Return the list of the given nodes for which isAlreadySubmitted() is true, in the same order. """
    return [candidate for candidate in nodes if candidate.isAlreadySubmitted()]
def execute(graph, toNodes=None, force=False):
    """
    Process the nodes of 'graph' one after the other, in depth-first search finish order.

    :param graph: the graph to execute
    :param toNodes: (optional) nodes given as start nodes to the depth-first search
    :param force: if True, all the visited nodes are processed, otherwise only the ones
                  returned by Graph.dfsNodesToProcess

    If the processing of a node raises an exception, the error is logged, the status of the
    submitted nodes is cleared and the execution stops.
    """
    if not force:
        nodes = graph.dfsNodesToProcess(startNodes=toNodes)
        nodesInConflict = getAlreadySubmittedNodes(nodes)
        if nodesInConflict:
            conflictStatuses = set([node.status.status.name for node in nodesInConflict])
            conflictNames = [node.name for node in nodesInConflict]
            # only a warning is printed here (raising a RuntimeError is disabled)
            print(
                'WARNING: Some nodes are already submitted with status: {}\n'
                'Nodes: {}'.format(
                    ', '.join(conflictStatuses),
                    ', '.join(conflictNames),
                ))
    else:
        nodes = graph.dfsNodesOnFinish(startNodes=toNodes)

    print('Nodes to execute: ', str([n.name for n in nodes]))

    # Flag every node before starting the first computation
    for node in nodes:
        node.beginSequence()

    nbNodes = len(nodes)
    for position, node in enumerate(nodes, 1):
        try:
            print('\n[{i}/{N}] {nodeName}'.format(i=position, N=nbNodes, nodeName=node.nodeType))
            node.process()
        except Exception as e:
            logging.error("Error on node computation: {}".format(e))
            graph.clearSubmittedNodes()
            return

    for node in nodes:
        node.endSequence()