[core] graph: use new dependenciesOnly option on graph operations

Input attribute to input attribute connection is not a real dependency between the nodes.
This commit is contained in:
Fabien Castan 2020-12-14 19:02:49 +01:00
parent 98b5039f8a
commit d291fcc39d
4 changed files with 60 additions and 43 deletions

View file

@ -85,9 +85,10 @@ class Visitor(object):
Base class for Graph Visitors that does nothing. Base class for Graph Visitors that does nothing.
Sub-classes can override any method to implement specific algorithms. Sub-classes can override any method to implement specific algorithms.
""" """
def __init__(self, reverse): def __init__(self, reverse, dependenciesOnly):
super(Visitor, self).__init__() super(Visitor, self).__init__()
self.reverse = reverse self.reverse = reverse
self.dependenciesOnly = dependenciesOnly
# def initializeVertex(self, s, g): # def initializeVertex(self, s, g):
# '''is invoked on every vertex of the graph before the start of the graph search.''' # '''is invoked on every vertex of the graph before the start of the graph search.'''
@ -383,7 +384,7 @@ class Graph(BaseObject):
Returns: Returns:
OrderedDict[Node, Node]: the source->duplicate map OrderedDict[Node, Node]: the source->duplicate map
""" """
srcNodes, srcEdges = self.dfsOnDiscover(startNodes=[fromNode], reverse=True) srcNodes, srcEdges = self.dfsOnDiscover(startNodes=[fromNode], reverse=True, dependenciesOnly=True)
# use OrderedDict to keep duplicated nodes creation order # use OrderedDict to keep duplicated nodes creation order
duplicates = OrderedDict() duplicates = OrderedDict()
@ -581,13 +582,13 @@ class Graph(BaseObject):
def edge(self, dstAttributeName): def edge(self, dstAttributeName):
return self._edges.get(dstAttributeName) return self._edges.get(dstAttributeName)
def getLeafNodes(self): def getLeafNodes(self, dependenciesOnly):
nodesWithOutput = set([edge.src.node for edge in self.edges]) nodesWithOutputLink = set([edge.src.node for edge in self.getEdges(dependenciesOnly)])
return set(self._nodes) - nodesWithOutput return set(self._nodes) - nodesWithOutputLink
def getRootNodes(self): def getRootNodes(self, dependenciesOnly):
nodesWithInput = set([edge.dst.node for edge in self.edges]) nodesWithInputLink = set([edge.dst.node for edge in self.getEdges(dependenciesOnly)])
return set(self._nodes) - nodesWithInput return set(self._nodes) - nodesWithInputLink
@changeTopology @changeTopology
def addEdge(self, srcAttr, dstAttr): def addEdge(self, srcAttr, dstAttr):
@ -635,21 +636,21 @@ class Graph(BaseObject):
minDepth, maxDepth = self._nodesMinMaxDepths[node] minDepth, maxDepth = self._nodesMinMaxDepths[node]
return minDepth if minimal else maxDepth return minDepth if minimal else maxDepth
def getInputEdges(self, node): def getInputEdges(self, node, dependenciesOnly):
return set([edge for edge in self.edges if edge.dst.node is node]) return set([edge for edge in self.getEdges(dependenciesOnly=dependenciesOnly) if edge.dst.node is node])
def _getInputEdgesPerNode(self): def _getInputEdgesPerNode(self, dependenciesOnly):
nodeEdges = defaultdict(set) nodeEdges = defaultdict(set)
for edge in self.edges: for edge in self.getEdges(dependenciesOnly=dependenciesOnly):
nodeEdges[edge.dst.node].add(edge.src.node) nodeEdges[edge.dst.node].add(edge.src.node)
return nodeEdges return nodeEdges
def _getOutputEdgesPerNode(self): def _getOutputEdgesPerNode(self, dependenciesOnly):
nodeEdges = defaultdict(set) nodeEdges = defaultdict(set)
for edge in self.edges: for edge in self.getEdges(dependenciesOnly=dependenciesOnly):
nodeEdges[edge.src.node].add(edge.dst.node) nodeEdges[edge.src.node].add(edge.dst.node)
return nodeEdges return nodeEdges
@ -657,7 +658,7 @@ class Graph(BaseObject):
def dfs(self, visitor, startNodes=None, longestPathFirst=False): def dfs(self, visitor, startNodes=None, longestPathFirst=False):
# Default direction (visitor.reverse=False): from node to root # Default direction (visitor.reverse=False): from node to root
# Reverse direction (visitor.reverse=True): from node to leaves # Reverse direction (visitor.reverse=True): from node to leaves
nodeChildren = self._getOutputEdgesPerNode() if visitor.reverse else self._getInputEdgesPerNode() nodeChildren = self._getOutputEdgesPerNode(visitor.dependenciesOnly) if visitor.reverse else self._getInputEdgesPerNode(visitor.dependenciesOnly)
# Initialize color map # Initialize color map
colors = {} colors = {}
for u in self._nodes: for u in self._nodes:
@ -668,7 +669,7 @@ class Graph(BaseObject):
# it is not possible to handle this case at the moment # it is not possible to handle this case at the moment
raise NotImplementedError("Graph.dfs(): longestPathFirst=True and visitor.reverse=True are not compatible yet.") raise NotImplementedError("Graph.dfs(): longestPathFirst=True and visitor.reverse=True are not compatible yet.")
nodes = startNodes or (self.getRootNodes() if visitor.reverse else self.getLeafNodes()) nodes = startNodes or (self.getRootNodes(visitor.dependenciesOnly) if visitor.reverse else self.getLeafNodes(visitor.dependenciesOnly))
if longestPathFirst: if longestPathFirst:
# Graph topology must be known and node depths up-to-date # Graph topology must be known and node depths up-to-date
@ -711,7 +712,7 @@ class Graph(BaseObject):
colors[u] = BLACK colors[u] = BLACK
visitor.finishVertex(u, self) visitor.finishVertex(u, self)
def dfsOnFinish(self, startNodes=None, longestPathFirst=False, reverse=False): def dfsOnFinish(self, startNodes=None, longestPathFirst=False, reverse=False, dependenciesOnly=False):
""" """
Return the node chain from startNodes to the graph roots/leaves. Return the node chain from startNodes to the graph roots/leaves.
Order is defined by the visit and finishVertex event. Order is defined by the visit and finishVertex event.
@ -728,13 +729,13 @@ class Graph(BaseObject):
""" """
nodes = [] nodes = []
edges = [] edges = []
visitor = Visitor(reverse=reverse) visitor = Visitor(reverse=reverse, dependenciesOnly=dependenciesOnly)
visitor.finishVertex = lambda vertex, graph: nodes.append(vertex) visitor.finishVertex = lambda vertex, graph: nodes.append(vertex)
visitor.finishEdge = lambda edge, graph: edges.append(edge) visitor.finishEdge = lambda edge, graph: edges.append(edge)
self.dfs(visitor=visitor, startNodes=startNodes, longestPathFirst=longestPathFirst) self.dfs(visitor=visitor, startNodes=startNodes, longestPathFirst=longestPathFirst)
return nodes, edges return nodes, edges
def dfsOnDiscover(self, startNodes=None, filterTypes=None, longestPathFirst=False, reverse=False): def dfsOnDiscover(self, startNodes=None, filterTypes=None, longestPathFirst=False, reverse=False, dependenciesOnly=False):
""" """
Return the node chain from startNodes to the graph roots/leaves. Return the node chain from startNodes to the graph roots/leaves.
Order is defined by the visit and discoverVertex event. Order is defined by the visit and discoverVertex event.
@ -753,7 +754,7 @@ class Graph(BaseObject):
""" """
nodes = [] nodes = []
edges = [] edges = []
visitor = Visitor(reverse=reverse) visitor = Visitor(reverse=reverse, dependenciesOnly=dependenciesOnly)
def discoverVertex(vertex, graph): def discoverVertex(vertex, graph):
if not filterTypes or vertex.nodeType in filterTypes: if not filterTypes or vertex.nodeType in filterTypes:
@ -777,7 +778,7 @@ class Graph(BaseObject):
""" """
nodes = [] nodes = []
edges = [] edges = []
visitor = Visitor(reverse=False) visitor = Visitor(reverse=False, dependenciesOnly=True)
def discoverVertex(vertex, graph): def discoverVertex(vertex, graph):
if vertex.hasStatus(Status.SUCCESS): if vertex.hasStatus(Status.SUCCESS):
@ -832,7 +833,7 @@ class Graph(BaseObject):
self._computationBlocked.clear() self._computationBlocked.clear()
compatNodes = [] compatNodes = []
visitor = Visitor(reverse=False) visitor = Visitor(reverse=False, dependenciesOnly=True)
def discoverVertex(vertex, graph): def discoverVertex(vertex, graph):
# initialize depths # initialize depths
@ -866,7 +867,7 @@ class Graph(BaseObject):
# propagate inputVertex computability # propagate inputVertex computability
self._computationBlocked[currentVertex] |= self._computationBlocked[inputVertex] self._computationBlocked[currentVertex] |= self._computationBlocked[inputVertex]
leaves = self.getLeafNodes() leaves = self.getLeafNodes(visitor.dependenciesOnly)
visitor.finishEdge = finishEdge visitor.finishEdge = finishEdge
visitor.discoverVertex = discoverVertex visitor.discoverVertex = discoverVertex
self.dfs(visitor=visitor, startNodes=leaves) self.dfs(visitor=visitor, startNodes=leaves)
@ -890,7 +891,7 @@ class Graph(BaseObject):
""" """
nodesStack = [] nodesStack = []
edgesScore = defaultdict(lambda: 0) edgesScore = defaultdict(lambda: 0)
visitor = Visitor(reverse=False) visitor = Visitor(reverse=False, dependenciesOnly=False)
def finishEdge(edge, graph): def finishEdge(edge, graph):
u, v = edge u, v = edge
@ -926,18 +927,34 @@ class Graph(BaseObject):
flowEdges.append(link) flowEdges.append(link)
return flowEdges return flowEdges
def getInputNodes(self, node, recursive=False): def getEdges(self, dependenciesOnly=False):
if not dependenciesOnly:
return self.edges
outEdges = []
for e in self.edges:
attr = e.src
if dependenciesOnly:
if attr.isLink:
attr = attr.getLinkParam(recursive=True)
if not attr.isOutput:
continue
newE = Edge(attr, e.dst)
outEdges.append(newE)
return outEdges
def getInputNodes(self, node, recursive, dependenciesOnly):
""" Return either the first level input nodes of a node or the whole chain. """ """ Return either the first level input nodes of a node or the whole chain. """
if not recursive: if not recursive:
return set([edge.src.node for edge in self.edges if edge.dst.node is node]) return set([edge.src.node for edge in self.getEdges(dependenciesOnly) if edge.dst.node is node])
inputNodes, edges = self.dfsOnDiscover(startNodes=[node], filterTypes=None, reverse=False) inputNodes, edges = self.dfsOnDiscover(startNodes=[node], filterTypes=None, reverse=False)
return inputNodes[1:] # exclude current node return inputNodes[1:] # exclude current node
def getOutputNodes(self, node, recursive=False): def getOutputNodes(self, node, recursive, dependenciesOnly):
""" Return either the first level output nodes of a node or the whole chain. """ """ Return either the first level output nodes of a node or the whole chain. """
if not recursive: if not recursive:
return set([edge.dst.node for edge in self.edges if edge.src.node is node]) return set([edge.dst.node for edge in self.getEdges(dependenciesOnly) if edge.src.node is node])
outputNodes, edges = self.dfsOnDiscover(startNodes=[node], filterTypes=None, reverse=True) outputNodes, edges = self.dfsOnDiscover(startNodes=[node], filterTypes=None, reverse=True)
return outputNodes[1:] # exclude current node return outputNodes[1:] # exclude current node
@ -957,8 +974,8 @@ class Graph(BaseObject):
return 0 return 0
class SCVisitor(Visitor): class SCVisitor(Visitor):
def __init__(self, reverse): def __init__(self, reverse, dependenciesOnly):
super(SCVisitor, self).__init__(reverse) super(SCVisitor, self).__init__(reverse, dependenciesOnly)
canCompute = True canCompute = True
canSubmit = True canSubmit = True
@ -969,7 +986,7 @@ class Graph(BaseObject):
if vertex.isExtern(): if vertex.isExtern():
self.canCompute = False self.canCompute = False
visitor = SCVisitor(reverse=False) visitor = SCVisitor(reverse=False, dependenciesOnly=True)
self.dfs(visitor=visitor, startNodes=[startNode]) self.dfs(visitor=visitor, startNodes=[startNode])
return visitor.canCompute + (2 * visitor.canSubmit) return visitor.canCompute + (2 * visitor.canSubmit)
@ -1131,7 +1148,7 @@ class Graph(BaseObject):
@Slot(Node) @Slot(Node)
def clearDataFrom(self, startNode): def clearDataFrom(self, startNode):
for node in self.dfsOnDiscover(startNodes=[startNode], reverse=True)[0]: for node in self.dfsOnDiscover(startNodes=[startNode], reverse=True, dependenciesOnly=True)[0]:
node.clearData() node.clearData()
def iterChunksByStatus(self, status): def iterChunksByStatus(self, status):

View file

@ -589,11 +589,11 @@ class BaseNode(BaseObject):
def minDepth(self): def minDepth(self):
return self.graph.getDepth(self, minimal=True) return self.graph.getDepth(self, minimal=True)
def getInputNodes(self, recursive=False): def getInputNodes(self, recursive, dependenciesOnly):
return self.graph.getInputNodes(self, recursive=recursive) return self.graph.getInputNodes(self, recursive=recursive, dependenciesOnly=dependenciesOnly)
def getOutputNodes(self, recursive=False): def getOutputNodes(self, recursive, dependenciesOnly):
return self.graph.getOutputNodes(self, recursive=recursive) return self.graph.getOutputNodes(self, recursive=recursive, dependenciesOnly=dependenciesOnly)
def toDict(self): def toDict(self):
pass pass
@ -883,7 +883,7 @@ class BaseNode(BaseObject):
# Warning: we must handle some specific cases for global start/stop # Warning: we must handle some specific cases for global start/stop
if self._locked and currentStatus in (Status.ERROR, Status.STOPPED, Status.NONE): if self._locked and currentStatus in (Status.ERROR, Status.STOPPED, Status.NONE):
self.setLocked(False) self.setLocked(False)
inputNodes = self.getInputNodes(recursive=True) inputNodes = self.getInputNodes(recursive=True, dependenciesOnly=True)
for node in inputNodes: for node in inputNodes:
if node.getGlobalStatus() == Status.RUNNING: if node.getGlobalStatus() == Status.RUNNING:
@ -901,8 +901,8 @@ class BaseNode(BaseObject):
if currentStatus == Status.SUCCESS: if currentStatus == Status.SUCCESS:
# At this moment, the node is necessarily locked because of previous if statement # At this moment, the node is necessarily locked because of previous if statement
inputNodes = self.getInputNodes(recursive=True) inputNodes = self.getInputNodes(recursive=True, dependenciesOnly=True)
outputNodes = self.getOutputNodes(recursive=True) outputNodes = self.getOutputNodes(recursive=True, dependenciesOnly=True)
stayLocked = None stayLocked = None
# Check if at least one dependentNode is submitted or currently running # Check if at least one dependentNode is submitted or currently running

View file

@ -147,7 +147,7 @@ class TaskManager(BaseObject):
self.removeNode(node, displayList=False, processList=True) self.removeNode(node, displayList=False, processList=True)
# Remove output nodes from display and computing lists # Remove output nodes from display and computing lists
outputNodes = node.getOutputNodes(recursive=True) outputNodes = node.getOutputNodes(recursive=True, dependenciesOnly=True)
for n in outputNodes: for n in outputNodes:
if n.getGlobalStatus() in (Status.ERROR, Status.SUBMITTED): if n.getGlobalStatus() in (Status.ERROR, Status.SUBMITTED):
n.upgradeStatusTo(Status.NONE) n.upgradeStatusTo(Status.NONE)
@ -184,7 +184,7 @@ class TaskManager(BaseObject):
else: else:
# Check dependencies of toNodes # Check dependencies of toNodes
if not toNodes: if not toNodes:
toNodes = graph.getLeafNodes() toNodes = graph.getLeafNodes(dependenciesOnly=True)
toNodes = list(toNodes) toNodes = list(toNodes)
allReady = self.checkNodesDependencies(graph, toNodes, "COMPUTATION") allReady = self.checkNodesDependencies(graph, toNodes, "COMPUTATION")
@ -402,7 +402,7 @@ class TaskManager(BaseObject):
# Check dependencies of toNodes # Check dependencies of toNodes
if not toNodes: if not toNodes:
toNodes = graph.getLeafNodes() toNodes = graph.getLeafNodes(dependenciesOnly=True)
toNodes = list(toNodes) toNodes = list(toNodes)
allReady = self.checkNodesDependencies(graph, toNodes, "SUBMITTING") allReady = self.checkNodesDependencies(graph, toNodes, "SUBMITTING")

View file

@ -526,7 +526,7 @@ class UIGraph(QObject):
with self.groupedGraphModification("Remove Nodes from {}".format(startNode.name)): with self.groupedGraphModification("Remove Nodes from {}".format(startNode.name)):
# Perform nodes removal from leaves to start node so that edges # Perform nodes removal from leaves to start node so that edges
# can be re-created in correct order on redo. # can be re-created in correct order on redo.
[self.removeNode(node) for node in reversed(self._graph.dfsOnDiscover(startNodes=[startNode], reverse=True)[0])] [self.removeNode(node) for node in reversed(self._graph.dfsOnDiscover(startNodes=[startNode], reverse=True, dependenciesOnly=True)[0])]
@Slot(Attribute, Attribute) @Slot(Attribute, Attribute)
def addEdge(self, src, dst): def addEdge(self, src, dst):