[core] graph: add the notion of flowEdges and minMaxDepthPerNode

- flowEdges: compute a transitive reduction of the graph to provide the
minimal node-level dependencies needed to create an execution workflow
for a render farm.
- minMaxDepthPerNode: compute the min and max depth per node
This commit is contained in:
Fabien Castan 2017-10-24 20:18:35 +02:00
parent 2bf95d200f
commit e533c588dd
2 changed files with 136 additions and 7 deletions

View file

@ -7,6 +7,7 @@ import os
import psutil import psutil
import re import re
import shutil import shutil
import sys
import time import time
import uuid import uuid
from collections import defaultdict from collections import defaultdict
@ -594,6 +595,9 @@ class Node(BaseObject):
def statusName(self): def statusName(self):
return self.status.status.name return self.status.status.name
# Represent the node by its `name`, so that nodes printed on their own or
# inside containers (lists of nodes, (node, node) edge tuples) show that name.
def __repr__(self):
return self.name
name = Property(str, getName, constant=True) name = Property(str, getName, constant=True)
nodeType = Property(str, nodeType.fget, constant=True) nodeType = Property(str, nodeType.fget, constant=True)
attributes = Property(BaseObject, getAttributes, constant=True) attributes = Property(BaseObject, getAttributes, constant=True)
@ -840,31 +844,38 @@ class Graph(BaseObject):
return nodeEdges return nodeEdges
# Depth-first traversal entry point: every node is coloured WHITE, then
# dfsVisit is called on each start node.
#   visitor          -- object handed through to dfsVisit, which invokes its callbacks.
#   startNodes       -- nodes to start from; when empty or None, the nodes returned
#                       by getLeaves() are used, skipping any that are no longer WHITE.
#   longestPathFirst -- when true, the start nodes (or leaves) are visited in ascending
#                       `depth` order, and minMaxDepthPerNode() is computed up front and
#                       passed to dfsVisit.
def dfs(self, visitor, startNodes=None): def dfs(self, visitor, startNodes=None, longestPathFirst=False):
nodeChildren = self._getInputEdgesPerNode() nodeChildren = self._getInputEdgesPerNode()
# The depth table is only computed when the ordering actually needs it.
minMaxDepthPerNode = self.minMaxDepthPerNode() if longestPathFirst else None
colors = {} colors = {}
for u in self._nodes: for u in self._nodes:
colors[u] = WHITE colors[u] = WHITE
time = 0
if startNodes: if startNodes:
if longestPathFirst:
startNodes = sorted(startNodes, key=lambda item: item.depth)
# Explicit start nodes are visited without checking their colour first.
for startNode in startNodes: for startNode in startNodes:
self.dfsVisit(startNode, visitor, colors, nodeChildren) self.dfsVisit(startNode, visitor, colors, nodeChildren, longestPathFirst, minMaxDepthPerNode)
else: else:
leaves = self.getLeaves() leaves = self.getLeaves()
if longestPathFirst:
leaves = sorted(leaves, key=lambda item: item.depth)
for u in leaves: for u in leaves:
# Skip leaves already coloured by an earlier dfsVisit call.
if colors[u] == WHITE: if colors[u] == WHITE:
self.dfsVisit(u, visitor, colors, nodeChildren) self.dfsVisit(u, visitor, colors, nodeChildren, longestPathFirst, minMaxDepthPerNode)
def dfsVisit(self, u, visitor, colors, nodeChildren): def dfsVisit(self, u, visitor, colors, nodeChildren, longestPathFirst, minMaxDepthPerNode):
colors[u] = GRAY colors[u] = GRAY
visitor.discoverVertex(u, self) visitor.discoverVertex(u, self)
# d_time[u] = time = time + 1 # d_time[u] = time = time + 1
for v in nodeChildren[u]: children = nodeChildren[u]
if longestPathFirst:
children = sorted(children, reverse=True, key=lambda item: minMaxDepthPerNode[item][1])
for v in children:
visitor.examineEdge((u, v), self) visitor.examineEdge((u, v), self)
if colors[v] == WHITE: if colors[v] == WHITE:
visitor.treeEdge((u, v), self) visitor.treeEdge((u, v), self)
# (u,v) is a tree edge # (u,v) is a tree edge
self.dfsVisit(v, visitor, colors, nodeChildren) # TODO: avoid recursion self.dfsVisit(v, visitor, colors, nodeChildren, longestPathFirst, minMaxDepthPerNode) # TODO: avoid recursion
elif colors[v] == GRAY: elif colors[v] == GRAY:
visitor.backEdge((u, v), self) visitor.backEdge((u, v), self)
pass # (u,v) is a back edge pass # (u,v) is a back edge
@ -916,6 +927,79 @@ class Graph(BaseObject):
self.dfs(visitor=visitor, startNodes=startNodes) self.dfs(visitor=visitor, startNodes=startNodes)
return (nodes, edges) return (nodes, edges)
def minMaxDepthPerNode(self, startNodes=None):
    """
    Compute the minimal and maximal depth of every node of the graph.

    Every node starts at (0, 0). Each edge (u, v) reported through the
    visitor's ``finishEdge`` callback then updates the depths of ``u`` from
    the depths currently stored for ``v``.

    :param startNodes: list of starting nodes. Use all leaves if empty.
    :return: {node: (minDepth, maxDepth)}
    """
    depths = {n: (0, 0) for n in self.nodes}

    def updateFromChild(edge, graph):
        parent, child = edge
        parentMin, parentMax = depths[parent]
        childMin, childMax = depths[child]
        viaChildMin = childMin + 1
        # A min depth of 0 means no child has been taken into account yet,
        # so the first child seen defines the initial value.
        newMin = viaChildMin if parentMin == 0 else min(parentMin, viaChildMin)
        depths[parent] = (newMin, max(parentMax, childMax + 1))

    visitor = Visitor()
    visitor.finishEdge = updateFromChild
    self.dfs(visitor=visitor, startNodes=startNodes)
    return depths
def dfsMaxEdgeLength(self, startNodes=None):
    """
    Score pairs of nodes with the longest stack distance observed during a
    longest-path-first depth-first traversal.

    Nodes are pushed on a stack when discovered and popped when finished.
    Each time an edge (u, v) is finished, every node ``n`` currently on the
    stack gets the pair (n, v) scored with its distance from the top of the
    stack (1 for the top element). Only the largest score per pair is kept.

    :param startNodes: list of starting nodes. Use all leaves if empty.
    :return: defaultdict {(n, v): score}; pairs that were never scored default to 0.
    """
    nodesStack = []
    edgesScore = defaultdict(int)
    visitor = Visitor()

    def finishEdge(edge, graph):
        # Only the target of the edge is needed: sources come from the stack.
        _, v = edge
        for distance, n in enumerate(reversed(nodesStack), start=1):
            if distance > edgesScore[(n, v)]:
                edgesScore[(n, v)] = distance

    def finishVertex(vertex, graph):
        v = nodesStack.pop()
        # Internal invariant: discover/finish notifications must be balanced.
        assert v == vertex

    visitor.discoverVertex = lambda vertex, graph: nodesStack.append(vertex)
    visitor.finishVertex = finishVertex
    visitor.finishEdge = finishEdge
    self.dfs(visitor=visitor, startNodes=startNodes, longestPathFirst=True)
    return edgesScore
def flowEdges(self, startNodes=None):
    """
    Return as few edges as possible, such that if there is a directed path from one vertex to another in the
    original graph, there is also such a path in the reduction.

    An edge is kept when its score from ``dfsMaxEdgeLength`` is exactly 1.
    Edges that were not scored by the traversal (for instance because
    ``startNodes`` restricts it to part of the graph) are skipped.

    :param startNodes: list of starting nodes, forwarded to ``dfsMaxEdgeLength``.
    :return: the remaining edges after a transitive reduction of the graph,
             as a list of (edge.dst.node, edge.src.node) tuples.
    """
    edgesScore = self.dfsMaxEdgeLength(startNodes)
    flowEdges = []
    for e in self.edges.objects.values():
        ee = (e.dst.node, e.src.node)
        # .get() rather than indexing: the score table may be a defaultdict,
        # and unscored edges must neither raise nor be inserted into it.
        if edgesScore.get(ee) == 1:
            flowEdges.append(ee)
    return flowEdges
def _applyExpr(self): def _applyExpr(self):
for node in self._nodes: for node in self._nodes:
node._applyExpr() node._applyExpr()

View file

@ -115,6 +115,51 @@ def test_depth_diamond_graph2():
assert len(edges) == 1 assert len(edges) == 1
def test_transitive_reduction():
    """
    Check flowEdges() and minMaxDepthPerNode() on a small graph that contains
    redundant (transitive) edges.
    """
    graph = Graph('Tests tasks depth')

    tA = graph.addNewNode('Ls', input='/tmp')
    tB = graph.addNewNode('AppendText', inputText='echo B')
    tC = graph.addNewNode('AppendText', inputText='echo C')
    tD = graph.addNewNode('AppendText', inputText='echo D')
    tE = graph.addNewNode('AppendFiles')
    #          C
    #        /   \
    #  /----/------>\
    # A -> B ------> E
    #       \       /
    #        \     /
    #          D
    graph.addEdges(
        (tA.output, tE.input),
        (tA.output, tB.input),
        (tB.output, tC.input),
        (tB.output, tD.input),
        (tB.output, tE.input4),
        (tC.output, tE.input3),
        (tD.output, tE.input2),
    )

    # A->E and B->E are implied by longer paths, so they must be dropped.
    # Edges are expressed as (destination node, source node).
    flowEdges = graph.flowEdges()
    flowEdgesRes = [
        (tB, tA),
        (tD, tB),
        (tC, tB),
        (tE, tD),
        (tE, tC),
    ]
    assert set(flowEdgesRes) == set(flowEdges)

    depthPerNode = graph.minMaxDepthPerNode()
    assert len(depthPerNode) == len(graph.nodes)
    # items() is available on both Python 2 and Python 3 (iteritems() is Python 2 only).
    for node, (minDepth, maxDepth) in depthPerNode.items():
        assert node.depth == maxDepth
if __name__ == '__main__': if __name__ == '__main__':
test_depth() test_depth()
test_depth_diamond_graph() test_depth_diamond_graph()
test_transitive_reduction()