[tests] Test attribute callback behavior with dynamic outputs

New test suite that focuses on testing the behavior of the attribute
changes callback system when such an attribute is connected to
an upstream dynamic output value.
This commit is contained in:
Yann Lanthony 2024-10-26 23:39:11 +02:00
parent 0231fdc1df
commit 7c21595da9

View file

@ -1,6 +1,6 @@
# coding:utf-8
from meshroom.core.graph import Graph, loadGraph
from meshroom.core.graph import Graph, loadGraph, executeGraph
from meshroom.core import desc, registerNodeType, unregisterNodeType
from meshroom.core.node import Node
@ -309,3 +309,81 @@ class TestAttributeCallbackBehaviorWithUpstreamCompoundAttributes:
assert nodeB.affectedInput.value == 20
class NodeWithDynamicOutputValue(desc.Node):
"""
A Node containing an output attribute which value is computed dynamically during graph execution.
"""
inputs = [
desc.IntParam(
name="input",
label="Input",
description="Input used in the computation of 'output'",
value=0,
),
]
outputs = [
desc.IntParam(
name="output",
label="Output",
description="Dynamically computed output (input * 2)",
# Setting value to None makes the attribute dynamic.
value=None,
),
]
def processChunk(self, chunk):
chunk.node.output.value = chunk.node.input.value * 2
class TestAttributeCallbackBehaviorWithUpstreamDynamicOutputs:
@classmethod
def setup_class(cls):
registerNodeType(NodeWithAttributeChangedCallback)
registerNodeType(NodeWithDynamicOutputValue)
@classmethod
def teardown_class(cls):
unregisterNodeType(NodeWithAttributeChangedCallback)
unregisterNodeType(NodeWithDynamicOutputValue)
def test_connectingUncomputedDynamicOutputDoesNotTriggerDownstreamAttributeChangedCallback(
self,
):
graph = Graph("")
nodeA = graph.addNewNode(NodeWithDynamicOutputValue.__name__)
nodeB = graph.addNewNode(NodeWithAttributeChangedCallback.__name__)
nodeA.input.value = 10
graph.addEdge(nodeA.output, nodeB.input)
assert nodeB.affectedInput.value == 0
def test_connectingComputedDynamicOutputTriggersDownstreamAttributeChangedCallback(
self, graphWithIsolatedCache
):
graph: Graph = graphWithIsolatedCache
nodeA = graph.addNewNode(NodeWithDynamicOutputValue.__name__)
nodeB = graph.addNewNode(NodeWithAttributeChangedCallback.__name__)
nodeA.input.value = 10
executeGraph(graph)
graph.addEdge(nodeA.output, nodeB.input)
assert nodeA.output.value == nodeB.input.value == 20
assert nodeB.affectedInput.value == 40
def test_dynamicOutputValueComputeDoesNotTriggerDownstreamAttributeChangedCallback(
self, graphWithIsolatedCache
):
graph: Graph = graphWithIsolatedCache
nodeA = graph.addNewNode(NodeWithDynamicOutputValue.__name__)
nodeB = graph.addNewNode(NodeWithAttributeChangedCallback.__name__)
graph.addEdge(nodeA.output, nodeB.input)
nodeA.input.value = 10
executeGraph(graph)
assert nodeB.input.value == 20
assert nodeB.affectedInput.value == 0