From 30cd8001fdbd41a43aaf0834bcab32ecaf70fed3 Mon Sep 17 00:00:00 2001 From: Yann Lanthony Date: Wed, 18 Jul 2018 16:28:33 +0200 Subject: [PATCH] [core][graph] update nodes computability on topology change CompatibilityNodes introduce a new notion of computability per node. An uncomputed (and therefore uncomputable) CompatibilityNode blocks the computation of all its successor. * update nodes computability on topology change (in addition to min/max depth) * evaluate leaves computability to determine if the whole graph can be processed --- meshroom/core/graph.py | 73 +++++++++++++++++++++++++++++++++--------- tests/test_graph.py | 5 ++- 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/meshroom/core/graph.py b/meshroom/core/graph.py index f63e27b8..00a08168 100644 --- a/meshroom/core/graph.py +++ b/meshroom/core/graph.py @@ -176,6 +176,8 @@ class Graph(BaseObject): self._updateRequested = False self.dirtyTopology = False self._nodesMinMaxDepths = {} + self._computationBlocked = {} + self._canComputeLeaves = True self._nodes = DictModel(keyAttrName='name', parent=self) self._edges = DictModel(keyAttrName='dst', parent=self) # use dst attribute as unique key since it can only have one input connection self.cacheDir = meshroom.core.defaultCacheFolder @@ -654,6 +656,8 @@ class Graph(BaseObject): if vertex.hasStatus(Status.SUCCESS): # stop branch visit if discovering a node already computed raise StopBranchVisit() + if self._computationBlocked[vertex]: + raise RuntimeError("Can't compute node '{}'".format(vertex.name)) def finishVertex(vertex, graph): chunksToProcess = [] @@ -679,36 +683,73 @@ class Graph(BaseObject): self.dfs(visitor=visitor, startNodes=startNodes) return nodes, edges - def minMaxDepthPerNode(self, startNodes=None): + @Slot(Node, result=bool) + def canCompute(self, node): """ - Compute the min and max depth for each node. + Return the computability of a node based on itself and its dependency chain. + Computation can't happen for: + - CompatibilityNodes + - nodes having a non-computed CompatibilityNode in its dependency chain Args: - startNodes: list of starting nodes. Use all leaves if empty. + node (Node): the node to evaluate Returns: - dict: {node: (minDepth, maxDepth)} + bool: whether the node can be computed """ - depthPerNode = {} - for node in self.nodes: - depthPerNode[node] = (0, 0) + if isinstance(node, CompatibilityNode): + return False + return not self._computationBlocked[node] + + def updateNodesTopologicalData(self): + """ + Compute and cache nodes topological data: + - min and max depth + - computability + """ + + self._nodesMinMaxDepths.clear() + self._computationBlocked.clear() visitor = Visitor() + def discoverVertex(vertex, graph): + # initialize depths + self._nodesMinMaxDepths[vertex] = (0, 0) + # initialize computability + self._computationBlocked[vertex] = isinstance(vertex, CompatibilityNode) and not vertex.hasStatus(Status.SUCCESS) + def finishEdge(edge, graph): - u, v = edge - du = depthPerNode[u] - dv = depthPerNode[v] + currentVertex, inputVertex = edge + + # update depths + du = self._nodesMinMaxDepths[currentVertex] + dv = self._nodesMinMaxDepths[inputVertex] if du[0] == 0: # if not initialized, set the depth of the first child depthMin = dv[0] + 1 else: depthMin = min(du[0], dv[0] + 1) - depthPerNode[u] = (depthMin, max(du[1], dv[1] + 1)) + self._nodesMinMaxDepths[currentVertex] = (depthMin, max(du[1], dv[1] + 1)) + # update computability + if currentVertex.hasStatus(Status.SUCCESS): + # output is already computed and available, + # does not depend on input connections computability + return + # propagate inputVertex computability + self._computationBlocked[currentVertex] |= self._computationBlocked[inputVertex] + + leaves = self.getLeaves() visitor.finishEdge = finishEdge - self.dfs(visitor=visitor, startNodes=startNodes) - return depthPerNode + visitor.discoverVertex = discoverVertex + self.dfs(visitor=visitor, startNodes=leaves) + + # update graph computability status + canComputeLeaves = all([self.canCompute(node) for node in leaves]) + if self._canComputeLeaves != canComputeLeaves: + self._canComputeLeaves = canComputeLeaves + self.canComputeLeavesChanged.emit() def dfsMaxEdgeLength(self, startNodes=None): """ @@ -858,8 +899,8 @@ class Graph(BaseObject): # Graph topology has changed if self.dirtyTopology: - # Update nodes depths cache - self._nodesMinMaxDepths = self.minMaxDepthPerNode() + # update nodes topological data cache + self.updateNodesTopologicalData() self.dirtyTopology = False self.updateInternals() @@ -955,6 +996,8 @@ class Graph(BaseObject): cacheDirChanged = Signal() cacheDir = Property(str, cacheDir.fget, cacheDir.fset, notify=cacheDirChanged) updated = Signal() + canComputeLeavesChanged = Signal() + canComputeLeaves = Property(bool, lambda self: self._canComputeLeaves, notify=canComputeLeavesChanged) def loadGraph(filepath): diff --git a/tests/test_graph.py b/tests/test_graph.py index 8c0c5e41..c7fa6e4d 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -153,9 +153,8 @@ def test_transitive_reduction(): ] assert set(flowEdgesRes) == set(flowEdges) - depthPerNode = graph.minMaxDepthPerNode() - assert len(depthPerNode) == len(graph.nodes) - for node, (minDepth, maxDepth) in depthPerNode.items(): + assert len(graph._nodesMinMaxDepths) == len(graph.nodes) + for node, (minDepth, maxDepth) in graph._nodesMinMaxDepths.items(): assert node.depth == maxDepth