[core][graph] update nodes computability on topology change

CompatibilityNodes introduce a new notion of computability per node.
An uncomputed (and therefore uncomputable) CompatibilityNode blocks the computation of all its successor.

* update nodes computability on topology change (in addition to min/max depth)
* evaluate leaves computability to determine if the whole graph can be processed
This commit is contained in:
Yann Lanthony 2018-07-18 16:28:33 +02:00
parent 89dd55f43b
commit 30cd8001fd
2 changed files with 60 additions and 18 deletions

View file

@ -176,6 +176,8 @@ class Graph(BaseObject):
self._updateRequested = False self._updateRequested = False
self.dirtyTopology = False self.dirtyTopology = False
self._nodesMinMaxDepths = {} self._nodesMinMaxDepths = {}
self._computationBlocked = {}
self._canComputeLeaves = True
self._nodes = DictModel(keyAttrName='name', parent=self) self._nodes = DictModel(keyAttrName='name', parent=self)
self._edges = DictModel(keyAttrName='dst', parent=self) # use dst attribute as unique key since it can only have one input connection self._edges = DictModel(keyAttrName='dst', parent=self) # use dst attribute as unique key since it can only have one input connection
self.cacheDir = meshroom.core.defaultCacheFolder self.cacheDir = meshroom.core.defaultCacheFolder
@ -654,6 +656,8 @@ class Graph(BaseObject):
if vertex.hasStatus(Status.SUCCESS): if vertex.hasStatus(Status.SUCCESS):
# stop branch visit if discovering a node already computed # stop branch visit if discovering a node already computed
raise StopBranchVisit() raise StopBranchVisit()
if self._computationBlocked[vertex]:
raise RuntimeError("Can't compute node '{}'".format(vertex.name))
def finishVertex(vertex, graph): def finishVertex(vertex, graph):
chunksToProcess = [] chunksToProcess = []
@ -679,36 +683,73 @@ class Graph(BaseObject):
self.dfs(visitor=visitor, startNodes=startNodes) self.dfs(visitor=visitor, startNodes=startNodes)
return nodes, edges return nodes, edges
def minMaxDepthPerNode(self, startNodes=None): @Slot(Node, result=bool)
def canCompute(self, node):
""" """
Compute the min and max depth for each node. Return the computability of a node based on itself and its dependency chain.
Computation can't happen for:
- CompatibilityNodes
- nodes having a non-computed CompatibilityNode in its dependency chain
Args: Args:
startNodes: list of starting nodes. Use all leaves if empty. node (Node): the node to evaluate
Returns: Returns:
dict: {node: (minDepth, maxDepth)} bool: whether the node can be computed
""" """
depthPerNode = {} if isinstance(node, CompatibilityNode):
for node in self.nodes: return False
depthPerNode[node] = (0, 0) return not self._computationBlocked[node]
def updateNodesTopologicalData(self):
"""
Compute and cache nodes topological data:
- min and max depth
- computability
"""
self._nodesMinMaxDepths.clear()
self._computationBlocked.clear()
visitor = Visitor() visitor = Visitor()
def discoverVertex(vertex, graph):
# initialize depths
self._nodesMinMaxDepths[vertex] = (0, 0)
# initialize computability
self._computationBlocked[vertex] = isinstance(vertex, CompatibilityNode) and not vertex.hasStatus(Status.SUCCESS)
def finishEdge(edge, graph): def finishEdge(edge, graph):
u, v = edge currentVertex, inputVertex = edge
du = depthPerNode[u]
dv = depthPerNode[v] # update depths
du = self._nodesMinMaxDepths[currentVertex]
dv = self._nodesMinMaxDepths[inputVertex]
if du[0] == 0: if du[0] == 0:
# if not initialized, set the depth of the first child # if not initialized, set the depth of the first child
depthMin = dv[0] + 1 depthMin = dv[0] + 1
else: else:
depthMin = min(du[0], dv[0] + 1) depthMin = min(du[0], dv[0] + 1)
depthPerNode[u] = (depthMin, max(du[1], dv[1] + 1)) self._nodesMinMaxDepths[currentVertex] = (depthMin, max(du[1], dv[1] + 1))
# update computability
if currentVertex.hasStatus(Status.SUCCESS):
# output is already computed and available,
# does not depend on input connections computability
return
# propagate inputVertex computability
self._computationBlocked[currentVertex] |= self._computationBlocked[inputVertex]
leaves = self.getLeaves()
visitor.finishEdge = finishEdge visitor.finishEdge = finishEdge
self.dfs(visitor=visitor, startNodes=startNodes) visitor.discoverVertex = discoverVertex
return depthPerNode self.dfs(visitor=visitor, startNodes=leaves)
# update graph computability status
canComputeLeaves = all([self.canCompute(node) for node in leaves])
if self._canComputeLeaves != canComputeLeaves:
self._canComputeLeaves = canComputeLeaves
self.canComputeLeavesChanged.emit()
def dfsMaxEdgeLength(self, startNodes=None): def dfsMaxEdgeLength(self, startNodes=None):
""" """
@ -858,8 +899,8 @@ class Graph(BaseObject):
# Graph topology has changed # Graph topology has changed
if self.dirtyTopology: if self.dirtyTopology:
# Update nodes depths cache # update nodes topological data cache
self._nodesMinMaxDepths = self.minMaxDepthPerNode() self.updateNodesTopologicalData()
self.dirtyTopology = False self.dirtyTopology = False
self.updateInternals() self.updateInternals()
@ -955,6 +996,8 @@ class Graph(BaseObject):
cacheDirChanged = Signal() cacheDirChanged = Signal()
cacheDir = Property(str, cacheDir.fget, cacheDir.fset, notify=cacheDirChanged) cacheDir = Property(str, cacheDir.fget, cacheDir.fset, notify=cacheDirChanged)
updated = Signal() updated = Signal()
canComputeLeavesChanged = Signal()
canComputeLeaves = Property(bool, lambda self: self._canComputeLeaves, notify=canComputeLeavesChanged)
def loadGraph(filepath): def loadGraph(filepath):

View file

@ -153,9 +153,8 @@ def test_transitive_reduction():
] ]
assert set(flowEdgesRes) == set(flowEdges) assert set(flowEdgesRes) == set(flowEdges)
depthPerNode = graph.minMaxDepthPerNode() assert len(graph._nodesMinMaxDepths) == len(graph.nodes)
assert len(depthPerNode) == len(graph.nodes) for node, (minDepth, maxDepth) in graph._nodesMinMaxDepths.items():
for node, (minDepth, maxDepth) in depthPerNode.items():
assert node.depth == maxDepth assert node.depth == maxDepth