From e533c588dd5b71f4e1dfac3c7f5bfa0ba9db935a Mon Sep 17 00:00:00 2001 From: Fabien Castan Date: Tue, 24 Oct 2017 20:18:35 +0200 Subject: [PATCH] [core] graph: add the notion of flowEdges and minMaxDepthPerNode - flowEdges: Compute a transitive reduction of the graph to provide node-level minimal dependencies to create an execution workflow for renderfarm. - minMaxDepthPerNode: compute the min and max depth per node --- meshroom/core/graph.py | 98 +++++++++++++++++++++++++++++++++++++++--- tests/test_graph.py | 45 +++++++++++++++++++ 2 files changed, 136 insertions(+), 7 deletions(-) diff --git a/meshroom/core/graph.py b/meshroom/core/graph.py index e940b23b..d28791a9 100644 --- a/meshroom/core/graph.py +++ b/meshroom/core/graph.py @@ -7,6 +7,7 @@ import os import psutil import re import shutil +import sys import time import uuid from collections import defaultdict @@ -594,6 +595,9 @@ class Node(BaseObject): def statusName(self): return self.status.status.name + def __repr__(self): + return self.name + name = Property(str, getName, constant=True) nodeType = Property(str, nodeType.fget, constant=True) attributes = Property(BaseObject, getAttributes, constant=True) @@ -840,31 +844,38 @@ class Graph(BaseObject): return nodeEdges - def dfs(self, visitor, startNodes=None): + def dfs(self, visitor, startNodes=None, longestPathFirst=False): nodeChildren = self._getInputEdgesPerNode() + minMaxDepthPerNode = self.minMaxDepthPerNode() if longestPathFirst else None colors = {} for u in self._nodes: colors[u] = WHITE - time = 0 if startNodes: + if longestPathFirst: + startNodes = sorted(startNodes, key=lambda item: item.depth) for startNode in startNodes: - self.dfsVisit(startNode, visitor, colors, nodeChildren) + self.dfsVisit(startNode, visitor, colors, nodeChildren, longestPathFirst, minMaxDepthPerNode) else: leaves = self.getLeaves() + if longestPathFirst: + leaves = sorted(leaves, key=lambda item: item.depth) for u in leaves: if colors[u] == WHITE: - self.dfsVisit(u, visitor, colors, nodeChildren) + self.dfsVisit(u, visitor, colors, nodeChildren, longestPathFirst, minMaxDepthPerNode) - def dfsVisit(self, u, visitor, colors, nodeChildren): + def dfsVisit(self, u, visitor, colors, nodeChildren, longestPathFirst, minMaxDepthPerNode): colors[u] = GRAY visitor.discoverVertex(u, self) # d_time[u] = time = time + 1 - for v in nodeChildren[u]: + children = nodeChildren[u] + if longestPathFirst: + children = sorted(children, reverse=True, key=lambda item: minMaxDepthPerNode[item][1]) + for v in children: visitor.examineEdge((u, v), self) if colors[v] == WHITE: visitor.treeEdge((u, v), self) # (u,v) is a tree edge - self.dfsVisit(v, visitor, colors, nodeChildren) # TODO: avoid recursion + self.dfsVisit(v, visitor, colors, nodeChildren, longestPathFirst, minMaxDepthPerNode) # TODO: avoid recursion elif colors[v] == GRAY: visitor.backEdge((u, v), self) pass # (u,v) is a back edge @@ -916,6 +927,79 @@ class Graph(BaseObject): self.dfs(visitor=visitor, startNodes=startNodes) return (nodes, edges) + def minMaxDepthPerNode(self, startNodes=None): + """ + Compute the min and max depth for each node. + + :param startNodes: list of starting nodes. Use all leaves if empty. + :return: {node: (minDepth, maxDepth)} + """ + depthPerNode = {} + for node in self.nodes: + depthPerNode[node] = (0, 0) + + visitor = Visitor() + + def finishEdge(edge, graph): + u, v = edge + du = depthPerNode[u] + dv = depthPerNode[v] + if du[0] == 0: + # if not initialized, set the depth of the first child + depthMin = dv[0] + 1 + else: + depthMin = min(du[0], dv[0] + 1) + depthPerNode[u] = (depthMin, max(du[1], dv[1] + 1)) + + visitor.finishEdge = finishEdge + self.dfs(visitor=visitor, startNodes=startNodes) + return depthPerNode + + def dfsMaxEdgeLength(self, startNodes=None): + """ + :param startNodes: list of starting nodes. Use all leaves if empty. + :return: + """ + nodesStack = [] + edgesScore = defaultdict(lambda: 0) + visitor = Visitor() + + def finishEdge(edge, graph): + u, v = edge + for i, n in enumerate(reversed(nodesStack)): + index = i + 1 + if index > edgesScore[(n, v)]: + edgesScore[(n, v)] = index + + def finishVertex(vertex, graph): + v = nodesStack.pop() + assert v == vertex + + visitor.discoverVertex = lambda vertex, graph: nodesStack.append(vertex) + visitor.finishVertex = finishVertex + visitor.finishEdge = finishEdge + self.dfs(visitor=visitor, startNodes=startNodes, longestPathFirst=True) + return edgesScore + + def flowEdges(self, startNodes=None): + """ + Return as few edges as possible, such that if there is a directed path from one vertex to another in the + original graph, there is also such a path in the reduction. + + :param startNodes: + :return: the remaining edges after a transitive reduction of the graph. + """ + flowEdges = [] + edgesScore = self.dfsMaxEdgeLength(startNodes) + + for e in self.edges.objects.values(): + ee = (e.dst.node, e.src.node) + assert ee in edgesScore + assert edgesScore[ee] != 0 + if edgesScore[ee] == 1: + flowEdges.append(ee) + return flowEdges + def _applyExpr(self): for node in self._nodes: node._applyExpr() diff --git a/tests/test_graph.py b/tests/test_graph.py index 3ba12182..4df9545e 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -115,6 +115,51 @@ def test_depth_diamond_graph2(): assert len(edges) == 1 +def test_transitive_reduction(): + + graph = Graph('Tests tasks depth') + + tA = graph.addNewNode('Ls', input='/tmp') + tB = graph.addNewNode('AppendText', inputText='echo B') + tC = graph.addNewNode('AppendText', inputText='echo C') + tD = graph.addNewNode('AppendText', inputText='echo D') + tE = graph.addNewNode('AppendFiles') + # C + # / \ + # /---/---->\ + # A -> B ---> E + # \ / + # \ / + # D + graph.addEdges( + (tA.output, tE.input), + + (tA.output, tB.input), + (tB.output, tC.input), + (tB.output, tD.input), + + (tB.output, tE.input4), + (tC.output, tE.input3), + (tD.output, tE.input2), + ) + edgesScore = graph.dfsMaxEdgeLength() + + flowEdges = graph.flowEdges() + flowEdgesRes = [(tB, tA), + (tD, tB), + (tC, tB), + (tE, tD), + (tE, tC), + ] + assert set(flowEdgesRes) == set(flowEdges) + + depthPerNode = graph.minMaxDepthPerNode() + assert len(depthPerNode) == len(graph.nodes) + for node, (minDepth, maxDepth) in depthPerNode.iteritems(): + assert node.depth == maxDepth + + if __name__ == '__main__': test_depth() test_depth_diamond_graph() + test_transitive_reduction()