From aba9d163d5fbe52c1b6c18d4d37a5e1d2a091612 Mon Sep 17 00:00:00 2001
From: pojurer <56473157+vapavlo@users.noreply.github.com>
Date: Sat, 21 Dec 2024 16:04:43 +0000
Subject: [PATCH 1/3] Add sccnn tests

---
 test/nn/backbones/simplicial/test_sccnn.py | 196 +++++++++++++++++++++
 1 file changed, 196 insertions(+)

diff --git a/test/nn/backbones/simplicial/test_sccnn.py b/test/nn/backbones/simplicial/test_sccnn.py
index 19e2b774..4f4c5f68 100644
--- a/test/nn/backbones/simplicial/test_sccnn.py
+++ b/test/nn/backbones/simplicial/test_sccnn.py
@@ -1,5 +1,6 @@
 """Unit tests for SCCNN."""
 
+import pytest
 import torch
 from torch_geometric.utils import get_laplacian
 from ...._utils.nn_module_auto_test import NNModuleAutoTest
@@ -36,3 +37,198 @@ def test_SCCNNCustom(simple_graph_1):
         },
     ])
     auto_test.run()
+
+
+@pytest.fixture
+def create_sample_data():
+    """Create a small sample simplicial complex for testing."""
+    num_nodes = 5
+    x = torch.randn(num_nodes, 3)  # 3 node features
+    x_1 = torch.randn(8, 4)  # 8 edges with 4 features
+    x_2 = torch.randn(6, 5)  # 6 faces with 5 features
+
+    # Create sample Laplacians and incidence matrices
+    hodge_laplacian_0 = torch.sparse_coo_tensor(size=(num_nodes, num_nodes))
+    down_laplacian_1 = torch.sparse_coo_tensor(size=(8, 8))
+    up_laplacian_1 = torch.sparse_coo_tensor(size=(8, 8))
+    down_laplacian_2 = torch.sparse_coo_tensor(size=(6, 6))
+    up_laplacian_2 = torch.sparse_coo_tensor(size=(6, 6))
+
+    incidence_1 = torch.sparse_coo_tensor(size=(num_nodes, 8))
+    incidence_2 = torch.sparse_coo_tensor(size=(8, 6))
+
+    return {
+        'x': x,
+        'x_1': x_1,
+        'x_2': x_2,
+        'laplacian_all': (hodge_laplacian_0, down_laplacian_1, up_laplacian_1, down_laplacian_2, up_laplacian_2),
+        'incidence_all': (incidence_1, incidence_2)
+    }
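+
+# A minimal sketch (illustrative only; not used by the tests below) of how a
+# non-empty sparse Laplacian could be built, should denser test data ever be
+# needed. The toy edge list, the helper name, and the L0 = B1 @ B1.T
+# construction are assumptions of this sketch, not part of the SCCNN API.
+def _build_toy_laplacian(num_nodes=5):
+    """Build a sparse 0-Laplacian L0 = B1 @ B1.T from a 4-edge path graph."""
+    edges = [(0, 1), (1, 2), (2, 3), (3, 4)]
+    rows, cols, values = [], [], []
+    for e, (u, v) in enumerate(edges):
+        # Signed node-to-edge incidence entries: B1[u, e] = -1, B1[v, e] = +1
+        rows += [u, v]
+        cols += [e, e]
+        values += [-1.0, 1.0]
+    b1 = torch.sparse_coo_tensor([rows, cols], values, size=(num_nodes, len(edges)))
+    return (b1.to_dense() @ b1.to_dense().T).to_sparse()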
+
+def test_sccnn_basic_initialization():
+    """Test basic initialization of SCCNNCustom."""
+    in_channels = (3, 4, 5)
+    hidden_channels = (6, 6, 6)
+
+    # Test basic initialization
+    model = SCCNNCustom(
+        in_channels_all=in_channels,
+        hidden_channels_all=hidden_channels,
+        conv_order=2,
+        sc_order=3
+    )
+    assert model is not None
+
+    # Verify layer structure
+    assert len(model.layers) == 2  # Default n_layers is 2
+    assert hasattr(model, 'in_linear_0')
+    assert hasattr(model, 'in_linear_1')
+    assert hasattr(model, 'in_linear_2')
+
+def test_update_functions():
+    """Test different update functions in the SCCNN."""
+    in_channels = (3, 4, 5)
+    hidden_channels = (6, 6, 6)
+
+    # Test sigmoid update function
+    model = SCCNNCustom(
+        in_channels_all=in_channels,
+        hidden_channels_all=hidden_channels,
+        conv_order=2,
+        sc_order=3,
+        update_func="sigmoid"
+    )
+    assert model is not None
+
+    # Test ReLU update function
+    model = SCCNNCustom(
+        in_channels_all=in_channels,
+        hidden_channels_all=hidden_channels,
+        conv_order=2,
+        sc_order=3,
+        update_func="relu"
+    )
+    assert model is not None
+
+def test_aggr_norm(create_sample_data):
+    """Test aggregation normalization functionality."""
+    data = create_sample_data
+
+    model = SCCNNCustom(
+        in_channels_all=(3, 4, 5),
+        hidden_channels_all=(6, 6, 6),
+        conv_order=2,
+        sc_order=3,
+        aggr_norm=True
+    )
+
+    # Forward pass with aggregation normalization
+    output = model(
+        (data['x'], data['x_1'], data['x_2']),
+        data['laplacian_all'],
+        data['incidence_all']
+    )
+
+    assert len(output) == 3
+    assert all(torch.isfinite(out).all() for out in output)
+
+def test_different_conv_orders():
+    """Test SCCNN with different convolution orders."""
+    in_channels = (3, 4, 5)
+    hidden_channels = (6, 6, 6)
+
+    # Test with conv_order = 1
+    model1 = SCCNNCustom(
+        in_channels_all=in_channels,
+        hidden_channels_all=hidden_channels,
+        conv_order=1,
+        sc_order=3
+    )
+    assert model1 is not None
+
+    # Test with conv_order = 3
+    model2 = SCCNNCustom(
+        in_channels_all=in_channels,
+        hidden_channels_all=hidden_channels,
+        conv_order=3,
+        sc_order=3
+    )
+    assert model2 is not None
+
+    # Test invalid conv_order
+    with pytest.raises(AssertionError):
+        model = SCCNNCustom(
+            in_channels_all=in_channels,
+            hidden_channels_all=hidden_channels,
+            conv_order=0,
+            sc_order=3
+        )
+
+def test_different_sc_orders():
+    """Test SCCNN with different simplicial complex orders."""
+    in_channels = (3, 4, 5)
+    hidden_channels = (6, 6, 6)
+
+    # Test with sc_order = 2
+    model1 = SCCNNCustom(
+        in_channels_all=in_channels,
+        hidden_channels_all=hidden_channels,
+        conv_order=2,
+        sc_order=2
+    )
+    assert model1 is not None
+
+    # Test with sc_order > 2
+    model2 = SCCNNCustom(
+        in_channels_all=in_channels,
+        hidden_channels_all=hidden_channels,
+        conv_order=2,
+        sc_order=3
+    )
+    assert model2 is not None
+
+def test_forward_shapes(create_sample_data):
+    """Test output shapes for different input configurations."""
+    data = create_sample_data
+
+    model = SCCNNCustom(
+        in_channels_all=(3, 4, 5),
+        hidden_channels_all=(6, 6, 6),
+        conv_order=2,
+        sc_order=3
+    )
+
+    output = model(
+        (data['x'], data['x_1'], data['x_2']),
+        data['laplacian_all'],
+        data['incidence_all']
+    )
+
+    assert output[0].shape == (data['x'].shape[0], 6)
+    assert output[1].shape == (data['x_1'].shape[0], 6)
+    assert output[2].shape == (data['x_2'].shape[0], 6)
+
+def test_n_layers():
+    """Test SCCNN with different numbers of layers."""
+    in_channels = (3, 4, 5)
+    hidden_channels = (6, 6, 6)
+
+    # Test with 1 layer
+    model1 = SCCNNCustom(
+        in_channels_all=in_channels,
+        hidden_channels_all=hidden_channels,
+        conv_order=2,
+        sc_order=3,
+        n_layers=1
+    )
+    assert len(model1.layers) == 1
+
+    # Test with 3 layers
+    model2 = SCCNNCustom(
+        in_channels_all=in_channels,
+        hidden_channels_all=hidden_channels,
+        conv_order=2,
+        sc_order=3,
+        n_layers=3
+    )
+    assert len(model2.layers) == 3
\ No newline at end of file

From e1e8c8c600db58d1812a450322e61712dae59fd4 Mon Sep 17 00:00:00 2001
From: pojurer <56473157+vapavlo@users.noreply.github.com>
Date: Sat, 21 Dec 2024 16:05:31 +0000
Subject: [PATCH 2/3] Add readouts tests

---
 test/nn/readouts/__init__.py                  |   0
 test/nn/readouts/test_identical.py            | 107 ++++++++++++++
 .../nn/readouts/test_propagate_signal_down.py | 131 ++++++++++++++++++
 3 files changed, 238 insertions(+)
 create mode 100644 test/nn/readouts/__init__.py
 create mode 100644 test/nn/readouts/test_identical.py
 create mode 100644 test/nn/readouts/test_propagate_signal_down.py

diff --git a/test/nn/readouts/__init__.py b/test/nn/readouts/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/test/nn/readouts/test_identical.py b/test/nn/readouts/test_identical.py
new file mode 100644
index 00000000..51fc53e5
--- /dev/null
+++ b/test/nn/readouts/test_identical.py
@@ -0,0 +1,107 @@
+import pytest
+import torch
+import torch_geometric.data as tg_data
+from topobenchmark.nn.readouts.base import AbstractZeroCellReadOut
+from topobenchmark.nn.readouts.identical import NoReadOut
+
+
+class TestNoReadOut:
+    @pytest.fixture
+    def base_kwargs(self):
+        """Fixture providing the required base parameters."""
+        return {
+            'hidden_dim': 64,
+            'out_channels': 32,
+            'task_level': 'graph'
+        }
+
+    @pytest.fixture
+    def readout_layer(self, base_kwargs):
+        """Fixture to create a NoReadOut instance for testing."""
+        return NoReadOut(**base_kwargs)
+
+    @pytest.fixture
+    def sample_model_output(self):
+        """Fixture to create a sample model output dictionary."""
+        return {
+            'x_0': torch.randn(10, 64),  # Required key for model output
+            'edge_indices': torch.randint(0, 10, (2, 15)),
+            'other_data': torch.randn(10, 32)
+        }
+
+    @pytest.fixture
+    def sample_batch(self):
+        """Fixture to create a sample batch of graph data."""
+        return tg_data.Data(
+            x=torch.randn(10, 32),
+            edge_index=torch.randint(0, 10, (2, 15)),
+            batch_0=torch.zeros(10, dtype=torch.long)  # Required key for batch data
+        )
+
+    def test_initialization(self, base_kwargs):
+        """Test that NoReadOut initializes correctly with required parameters."""
+        readout = NoReadOut(**base_kwargs)
+        assert isinstance(readout, NoReadOut)
+        assert isinstance(readout, AbstractZeroCellReadOut)
+
+    def test_forward_pass_returns_unchanged_output(self, readout_layer, sample_model_output, sample_batch):
+        """Test that forward pass returns the model output without modifications."""
+        original_output = sample_model_output.copy()
+        output = readout_layer(sample_model_output, sample_batch)
+
+        # The output should contain the original data plus the computed logits
+        for key in original_output:
+            assert key in output
+            assert torch.equal(output[key], original_output[key])
+        assert 'logits' in output
+
+    def test_invalid_task_level(self, base_kwargs):
+        """Test that initialization fails with invalid task_level."""
+        invalid_kwargs = base_kwargs.copy()
+        invalid_kwargs['task_level'] = 'invalid_level'
+        with pytest.raises(AssertionError, match="Invalid task_level"):
+            NoReadOut(**invalid_kwargs)
+
+    def test_repr(self, readout_layer):
+        """Test the string representation of the NoReadOut layer."""
+        assert str(readout_layer) == "NoReadOut()"
+        assert repr(readout_layer) == "NoReadOut()"
+
+    def test_forward_pass_with_different_batch_sizes(self, readout_layer):
+        """Test that forward pass works with different batch sizes."""
+        # Test with single graph
+        single_batch = tg_data.Data(
+            x=torch.randn(5, 32),
+            edge_index=torch.randint(0, 5, (2, 8)),
+            batch_0=torch.zeros(5, dtype=torch.long)
+        )
+        single_output = {
+            'x_0': torch.randn(5, 64),
+            'embeddings': torch.randn(5, 64)
+        }
+        result = readout_layer(single_output, single_batch)
+        assert 'logits' in result
+
+        # Test with multiple graphs
+        multi_batch = tg_data.Data(
+            x=torch.randn(15, 32),
+            edge_index=torch.randint(0, 15, (2, 25)),
+            batch_0=torch.cat([torch.zeros(5), torch.ones(5), torch.ones(5) * 2]).long()
+        )
+        multi_output = {
+            'x_0': torch.randn(15, 64),
+            'embeddings': torch.randn(15, 64)
+        }
+        result = readout_layer(multi_output, multi_batch)
+        assert 'logits' in result
+
+    def test_kwargs_handling(self, base_kwargs):
+        """Test that the layer correctly handles both required and additional keyword arguments."""
+        additional_kwargs = {
+            'random_param': 42,
+            'another_param': 'test',
+            'pooling_type': 'mean'  # Valid additional parameter
+        }
+        kwargs = {**base_kwargs, **additional_kwargs}
+        readout = NoReadOut(**kwargs)
+        assert isinstance(readout, NoReadOut)
\ No newline at end of file
diff --git a/test/nn/readouts/test_propagate_signal_down.py b/test/nn/readouts/test_propagate_signal_down.py
new file mode 100644
index 00000000..1f21de58
--- /dev/null
+++ b/test/nn/readouts/test_propagate_signal_down.py
@@ -0,0 +1,131 @@
+import pytest
+import torch
+import torch_geometric.data as tg_data
+import topomodelx
+from topobenchmark.nn.readouts.propagate_signal_down import PropagateSignalDown
+
+
+class TestPropagateSignalDown:
+    @pytest.fixture
+    def base_kwargs(self):
+        """Fixture providing the required base parameters."""
+        return {
+            'hidden_dim': 64,
+            'out_channels': 32,
+            'task_level': 'graph',
+            'num_cell_dimensions': 2,  # Need at least 2 dimensions for signal propagation
+            'readout_name': 'test_readout'
+        }
+
+    @pytest.fixture
+    def readout_layer(self, base_kwargs):
+        """Fixture to create a PropagateSignalDown instance for testing."""
+        layer = PropagateSignalDown(**base_kwargs)
+        layer.hidden_dim = base_kwargs['hidden_dim']
+        return layer
+
+    @pytest.fixture
+    def create_sparse_incidence_matrix(self):
+        """Helper fixture to create sparse incidence matrices."""
+        def _create_matrix(num_source, num_target, sparsity=0.3):
+            num_entries = int(num_source * num_target * sparsity)
+            indices = torch.zeros((2, num_entries), dtype=torch.long)
+            values = torch.ones(num_entries)
+
+            for i in range(num_entries):
+                source = torch.randint(0, num_source, (1,))
+                target = torch.randint(0, num_target, (1,))
+                indices[0, i] = source
+                indices[1, i] = target
+                values[i] = torch.randint(0, 2, (1,)) * 2 - 1  # {-1, 1} values
+
+            sparse_matrix = torch.sparse_coo_tensor(
+                indices=torch.stack([indices[1], indices[0]]),
+                values=values,
+                size=(num_target, num_source)
+            ).coalesce()
+
+            return sparse_matrix
+        return _create_matrix
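+
+    # Note on the helper above: coalesce() sums duplicate (target, source)
+    # pairs drawn at random, so occasional entries may end up as 0 or +/-2
+    # rather than staying in {-1, 1}; the tests below are assumed to need
+    # only some signed sparse structure, not a strictly valid incidence.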
+
+    @pytest.fixture
+    def sample_batch(self, create_sparse_incidence_matrix):
+        """Fixture to create a sample batch with required incidence matrices."""
+        num_nodes = 10
+        num_edges = 15
+
+        return tg_data.Data(
+            x=torch.randn(num_nodes, 64),
+            edge_index=torch.randint(0, num_nodes, (2, num_edges)),
+            batch_0=torch.zeros(num_nodes, dtype=torch.long),
+            incidence_1=create_sparse_incidence_matrix(num_edges, num_nodes)
+        )
+
+    @pytest.fixture
+    def sample_model_output(self, sample_batch):
+        """Fixture to create a sample model output with cell embeddings."""
+        hidden_dim = 64
+
+        num_nodes = sample_batch.x.size(0)
+        num_edges = sample_batch.edge_index.size(1)
+
+        return {
+            'logits': torch.randn(num_nodes, hidden_dim),
+            'x_0': torch.randn(num_nodes, hidden_dim),
+            'x_1': torch.randn(num_edges, hidden_dim),
+        }
+
+    def test_forward_propagation(self, readout_layer, sample_model_output, sample_batch):
+        """Test the forward pass with detailed assertions."""
+        initial_output = {k: v.clone() for k, v in sample_model_output.items()}
+        sample_model_output['x_0'] = sample_model_output['logits']
+
+        output = readout_layer(sample_model_output, sample_batch)
+
+        assert 'x_0' in output
+        assert output['x_0'].shape == initial_output['logits'].shape
+        assert output['x_0'].dtype == torch.float32
+
+        assert 'x_1' in output
+        assert output['x_1'].shape == initial_output['x_1'].shape
+        assert output['x_1'].dtype == torch.float32
+
+    @pytest.mark.parametrize('missing_key', ['incidence_1'])
+    def test_missing_incidence_matrix(self, readout_layer, sample_model_output, sample_batch, missing_key):
+        """Test handling of missing incidence matrices."""
+        invalid_batch = tg_data.Data(**{k: v for k, v in sample_batch.items() if k != missing_key})
+        sample_model_output['x_0'] = sample_model_output['logits']
+
+        with pytest.raises(KeyError):
+            readout_layer(sample_model_output, invalid_batch)
+
+    @pytest.mark.parametrize('missing_key', ['x_1'])  # Changed to only test x_1
+    def test_missing_cell_features(self, readout_layer, sample_model_output, sample_batch, missing_key):
+        """Test handling of missing cell features."""
+        invalid_output = {k: v for k, v in sample_model_output.items() if k != missing_key}
+        invalid_output['x_0'] = invalid_output['logits']  # Always map logits to x_0
+
+        with pytest.raises(KeyError):
+            readout_layer(invalid_output, sample_batch)
+
+    def test_gradient_flow(self, readout_layer, sample_model_output, sample_batch):
+        """Test gradient flow through the network."""
+        # Create a copy of logits tensor to track gradients properly
+        logits = sample_model_output['logits'].clone().detach().requires_grad_(True)
+        x_1 = sample_model_output['x_1'].clone().detach().requires_grad_(True)
+
+        model_output = {
+            'logits': logits,
+            'x_0': logits,  # Share the same tensor
+            'x_1': x_1
+        }
+
+        output = readout_layer(model_output, sample_batch)
+        loss = output['x_0'].sum()
+        loss.backward()
+
+        # Check gradient flow
+        assert logits.grad is not None
+        assert not torch.allclose(logits.grad, torch.zeros_like(logits.grad))
+        assert x_1.grad is not None
+        assert not torch.allclose(x_1.grad, torch.zeros_like(x_1.grad))
\ No newline at end of file

From 38fbf3272aadfcd2fce05a4d04fe7ed0ccb72d0f Mon Sep 17 00:00:00 2001
From: pojurer <56473157+vapavlo@users.noreply.github.com>
Date: Sat, 21 Dec 2024 20:03:54 +0000
Subject: [PATCH 3/3] Add gccn tests

---
 test/nn/backbones/combinatorial/test_gccn.py | 148 +++++++-
 .../combinatorial/test_gccn_onehasse.py      | 346 ++++++++++++++++++
 2 files changed, 493 insertions(+), 1 deletion(-)

diff --git a/test/nn/backbones/combinatorial/test_gccn.py b/test/nn/backbones/combinatorial/test_gccn.py
index 8b382e21..22e73bd3 100644
--- a/test/nn/backbones/combinatorial/test_gccn.py
+++ b/test/nn/backbones/combinatorial/test_gccn.py
@@ -230,4 +230,150 @@ def test_get_activation():
     assert issubclass(relu_module, torch.nn.Module)
 
     with pytest.raises(NotImplementedError):
-        get_activation("invalid_activation")
\ No newline at end of file
+        get_activation("invalid_activation")
+
+
+@pytest.mark.parametrize("activation", ["relu", "elu", "tanh", "id"])
+def test_topotune_different_activations(activation):
+    """
+    Test TopoTune with multiple activations to improve coverage of get_activation.
+
+    Parameters
+    ----------
+    activation : str
+        Activation function.
+    """
+    batch = create_mock_complex_batch()
+    gnn = MockGNN(16, 32, 16)
+
+    neighborhoods = OmegaConf.create(["up_adjacency-0", "down_incidence-1"])
+    model = TopoTune(
+        GNN=gnn,
+        neighborhoods=neighborhoods,
+        layers=1,  # single layer to keep the test simple
+        use_edge_attr=False,
+        activation=activation,
+    )
+
+    output = model(batch)
+    # We expect a dict of updated features for each rank in the batch
+    assert isinstance(output, dict)
+    for rank, feat in output.items():
+        assert isinstance(feat, torch.Tensor)
+        # The shape should match the original x_rank shape
+        original_feat = getattr(batch, f"x_{rank}")
+        assert feat.shape == original_feat.shape
+
+
+def test_topotune_use_edge_attr_true():
+    """
+    Test TopoTune with use_edge_attr=True to ensure that edge attributes flow through properly.
+    """
+    batch = create_mock_complex_batch()
+    gnn = MockGNN(16, 32, 16)
+
+    # Add more complex neighborhoods to ensure both interrank and intrarank expansions
+    neighborhoods = OmegaConf.create([
+        "up_adjacency-0",    # intrarank route rank=0->0
+        "up_adjacency-1",    # intrarank route rank=1->1
+        "down_incidence-1",  # interrank route rank=1->0
+        "down_incidence-2",  # interrank route rank=2->1
+    ])
+    model = TopoTune(
+        GNN=gnn,
+        neighborhoods=neighborhoods,
+        layers=2,
+        use_edge_attr=True,
+        activation="relu",
+    )
+
+    output = model(batch)
+    assert isinstance(output, dict)
+    # Check that each rank in [0,1,2] got updated
+    for rank in range(3):
+        assert rank in output
+        assert isinstance(output[rank], torch.Tensor)
+        # The shape should match the original x_rank shape
+        original_feat = getattr(batch, f"x_{rank}")
+        assert output[rank].shape == original_feat.shape
+
+
+def test_topotune_single_node_per_rank():
+    """
+    Test corner case: each rank has only 1 cell, ensuring the path that returns early in intrarank_gnn_forward (x.shape[0] < 2).
+    """
+    # Create a batch with just 1 node, 1 edge, 1 face
+    batch = create_mock_complex_batch()
+    gnn = MockGNN(16, 32, 16)
+
+    neighborhoods = OmegaConf.create(["up_adjacency-0", "down_incidence-1"])
+    model = TopoTune(
+        GNN=gnn,
+        neighborhoods=neighborhoods,
+        layers=1,
+        use_edge_attr=False,
+        activation="relu",
+    )
+    output = model(batch)
+    # Since we have exactly 1 cell in each rank, intrarank_gnn_forward
+    # should skip the GNN pass and return the original features
+    assert isinstance(output, dict)
+    for rank, feat in output.items():
+        # Should remain the same as the input
+        assert torch.allclose(feat, getattr(batch, f"x_{rank}"), atol=1e-6)
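+
+
+# Sketch of the early-return contract exercised above (paraphrased from the
+# test's docstring, not copied from the TopoTune implementation):
+#
+#     if x.shape[0] < 2:  # a lone cell has no intrarank neighbors
+#         return x        # so the GNN pass is skipped entirely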
+ """ + batch = create_mock_complex_batch() + gnn = MockGNN(16, 32, 16) + + # Add more complex neighborhoods to ensure both interrank and intrarank expansions + neighborhoods = OmegaConf.create([ + "up_adjacency-0", # intrarank route rank=0->0 + "up_adjacency-1", # intrarank route rank=1->1 + "down_incidence-1", # interrank route rank=1->0 + "down_incidence-2", # interrank route rank=2->1 + ]) + model = TopoTune( + GNN=gnn, + neighborhoods=neighborhoods, + layers=2, + use_edge_attr=True, + activation="relu", + ) + + output = model(batch) + assert isinstance(output, dict) + # Check that each rank in [0,1,2] got updated + for rank in range(3): + assert rank in output + assert isinstance(output[rank], torch.Tensor) + # The shape should match the original x_rank shape + original_feat = getattr(batch, f"x_{rank}") + assert output[rank].shape == original_feat.shape + + +def test_topotune_single_node_per_rank(): + """ + Test corner case: each rank has only 1 cell, ensuring the path that returns early in intrarank_gnn_forward (x.shape[0] < 2). + """ + # Create a batch with just 1 node, 1 edge, 1 face + batch = create_mock_complex_batch() + gnn = MockGNN(16, 32, 16) + + neighborhoods = OmegaConf.create(["up_adjacency-0", "down_incidence-1"]) + model = TopoTune( + GNN=gnn, + neighborhoods=neighborhoods, + layers=1, + use_edge_attr=False, + activation="relu", + ) + output = model(batch) + # Since we have exactly 1 cell in each rank, intrarank_gnn_forward + # should skip the GNN pass and return the original features + assert isinstance(output, dict) + for rank, feat in output.items(): + # Should remain the same as the input + assert torch.allclose(feat, getattr(batch, f"x_{rank}"), atol=1e-6) + + +def test_topotune_multiple_layers(): + """ + Test TopoTune with multiple layers > 2 to ensure repeated forward passes. + """ + batch = create_mock_complex_batch() + gnn = MockGNN(16, 32, 16) + + neighborhoods = OmegaConf.create(["up_adjacency-0", "down_incidence-1"]) + model = TopoTune( + GNN=gnn, + neighborhoods=neighborhoods, + layers=3, # more than 2 + use_edge_attr=False, + activation="relu", + ) + + output = model(batch) + assert isinstance(output, dict) + # By default, the final shape should still be (N, 16) per rank + for rank, feat in output.items(): + original_feat = getattr(batch, f"x_{rank}") + assert feat.shape == original_feat.shape + + +def test_topotune_src_rank_larger_than_dst_rank(): + """ + Test a scenario where src_rank > dst_rank for an interrank route. + """ + batch = create_mock_complex_batch() + gnn = MockGNN(16, 32, 16) + # Force a route from rank=2 -> rank=0, for instance + neighborhoods = OmegaConf.create(["down_incidence-1", "down_incidence-2"]) + # topotune will interpret these strings as routes: + # (1->0) from down_incidence-1 + # (2->1) from down_incidence-2 + # Let's force an additional route from 2->0 by customizing the route logic if you want + # but as is, 2->0 won't happen automatically unless your `get_routes_from_neighborhoods` + # is coded that way. We'll just rely on existing logic for (2->1). 
+ + model = TopoTune( + GNN=gnn, + neighborhoods=neighborhoods, + layers=1, + use_edge_attr=False, + activation="relu", + ) + + output = model(batch) + assert isinstance(output, dict) + # Ranks 0, 1, 2 should exist in the final output dictionary + for rank in [0, 1, 2]: + assert rank in output + assert output[rank].shape == getattr(batch, f"x_{rank}").shape + diff --git a/test/nn/backbones/combinatorial/test_gccn_onehasse.py b/test/nn/backbones/combinatorial/test_gccn_onehasse.py index fa898927..b7403a13 100644 --- a/test/nn/backbones/combinatorial/test_gccn_onehasse.py +++ b/test/nn/backbones/combinatorial/test_gccn_onehasse.py @@ -43,6 +43,42 @@ def forward(self, x, edge_index): Output of the GCN layer. """ return self.conv(x, edge_index) + + +class MockGNNWithLinear(MockGNN): + """ + Mock GNN with Linear layer (ignoring edge_index). + + Parameters + ---------- + in_channels : int + Number of input channels. + hidden_channels : int + Number of hidden channels. + out_channels : int + Number of output channels. + """ + def __init__(self, in_channels, hidden_channels, out_channels): + super().__init__(in_channels, hidden_channels, out_channels) + self.linear = torch.nn.Linear(in_channels, out_channels) + + def forward(self, x, edge_index=None): + """Forward pass of the MockGNN. + + Parameters + ---------- + x : torch.Tensor + Input node features. + edge_index : torch.Tensor + Edge indices. + + Returns + ------- + torch.Tensor + Output of the GCN layer. + """ + return self.linear(x) + def create_mock_complex_batch(): """Create a mock complex batch for testing. @@ -217,3 +253,313 @@ def test_get_activation(): with pytest.raises(NotImplementedError): get_activation("invalid_activation") + + +def test_topotune_onehasse_early_return_x2_zero(): + """ + Test the early return path in forward() when batch.x_2.shape[0] == 0. + """ + batch = create_mock_complex_batch() + batch.x_2 = torch.zeros((0, 16)) # Force x_2 to have 0 faces + gnn = MockGNN(16, 32, 16) + + # Define any neighborhoods; they won't matter since x_2=0 triggers early return + neighborhoods = OmegaConf.create(["up_adjacency-0", "down_incidence-2"]) + + model = TopoTune_OneHasse( + GNN=gnn, + neighborhoods=neighborhoods, + layers=2, + use_edge_attr=False, + activation="relu", + ) + out = model(batch) + # Model should skip expansions and return {0: x_0, 1: x_1, 2: x_2}. + assert 0 in out and 1 in out and 2 in out + assert out[0].shape == batch.x_0.shape + assert out[1].shape == batch.x_1.shape + assert out[2].shape == batch.x_2.shape + # Verify no changes were made to the features + assert torch.allclose(out[0], batch.x_0, atol=1e-6) + assert torch.allclose(out[1], batch.x_1, atol=1e-6) + assert out[2].numel() == 0 # zero faces indeed + + +def test_topotune_onehasse_fallback_rank_not_updated(): + """ + Test the fallback in forward() for a rank that is never updated. + """ + batch = create_mock_complex_batch() + gnn = MockGNN(16, 32, 16) + + # Suppose we only define neighborhoods that produce a route for rank=0->0, + # ignoring rank=2 entirely. This means rank=2 won't show up in x_out_per_rank, + # triggering the fallback assignment in forward(). + neighborhoods = OmegaConf.create(["up_adjacency-0"]) + + model = TopoTune_OneHasse( + GNN=gnn, + neighborhoods=neighborhoods, + layers=1, + use_edge_attr=False, + activation="relu", + ) + out = model(batch) + # Ranks 0,1,2 should be in final output, even though only rank=0 was updated. 
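+
+
+# Quick sanity sketch for MockGNNWithLinear (illustrative, not part of the
+# suite): forward() ignores edge_index, so any cell count is accepted:
+#
+#     gnn = MockGNNWithLinear(8, 16, 8)
+#     assert gnn(torch.randn(4, 8)).shape == (4, 8)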
+
 
 def create_mock_complex_batch():
     """Create a mock complex batch for testing.
@@ -217,3 +253,313 @@ def test_get_activation():
 
     with pytest.raises(NotImplementedError):
         get_activation("invalid_activation")
+
+
+def test_topotune_onehasse_early_return_x2_zero():
+    """
+    Test the early return path in forward() when batch.x_2.shape[0] == 0.
+    """
+    batch = create_mock_complex_batch()
+    batch.x_2 = torch.zeros((0, 16))  # Force x_2 to have 0 faces
+    gnn = MockGNN(16, 32, 16)
+
+    # Define any neighborhoods; they won't matter since x_2=0 triggers early return
+    neighborhoods = OmegaConf.create(["up_adjacency-0", "down_incidence-2"])
+
+    model = TopoTune_OneHasse(
+        GNN=gnn,
+        neighborhoods=neighborhoods,
+        layers=2,
+        use_edge_attr=False,
+        activation="relu",
+    )
+    out = model(batch)
+    # Model should skip expansions and return {0: x_0, 1: x_1, 2: x_2}.
+    assert 0 in out and 1 in out and 2 in out
+    assert out[0].shape == batch.x_0.shape
+    assert out[1].shape == batch.x_1.shape
+    assert out[2].shape == batch.x_2.shape
+    # Verify no changes were made to the features
+    assert torch.allclose(out[0], batch.x_0, atol=1e-6)
+    assert torch.allclose(out[1], batch.x_1, atol=1e-6)
+    assert out[2].numel() == 0  # zero faces indeed
+
+
+def test_topotune_onehasse_fallback_rank_not_updated():
+    """
+    Test the fallback in forward() for a rank that is never updated.
+    """
+    batch = create_mock_complex_batch()
+    gnn = MockGNN(16, 32, 16)
+
+    # Only define neighborhoods that produce a route for rank=0->0, ignoring
+    # rank=2 entirely. Rank=2 then never appears in x_out_per_rank, which
+    # triggers the fallback assignment in forward().
+    neighborhoods = OmegaConf.create(["up_adjacency-0"])
+
+    model = TopoTune_OneHasse(
+        GNN=gnn,
+        neighborhoods=neighborhoods,
+        layers=1,
+        use_edge_attr=False,
+        activation="relu",
+    )
+    out = model(batch)
+    # Ranks 0, 1, 2 should all be in the final output, even though only rank=0 was updated.
+    assert 0 in out and 1 in out and 2 in out
+    # rank=2 should remain the same as the input
+    assert torch.allclose(out[2], batch.x_2, atol=1e-6)
+
+
+@pytest.mark.parametrize(
+    "bad_neighborhood,expected_errmsg",
+    [
+        ("up_adjacency-2", "Unsupported src_rank for 'up' neighborhood: 2"),
+        ("down_adjacency-0", "Unsupported src_rank for 'down' neighborhood: 0"),
+        ("down_incidence-0", "Unsupported src_rank for 'down_incidence' neighborhood: 0"),
+        ("up_incidence-2", "Unsupported src_rank for 'up_incidence' neighborhood: 2"),
+    ]
+)
+def test_topotune_onehasse_unsupported_src_rank_raises(bad_neighborhood, expected_errmsg):
+    """
+    Test that a ValueError is raised if a neighborhood implies an unsupported src_rank.
+
+    Parameters
+    ----------
+    bad_neighborhood : str
+        Unsupported neighborhood name.
+    expected_errmsg : str
+        Expected error message.
+    """
+    batch = create_mock_complex_batch()
+    gnn = MockGNN(16, 32, 16)
+
+    neighborhoods = OmegaConf.create([bad_neighborhood])
+    model = TopoTune_OneHasse(
+        GNN=gnn,
+        neighborhoods=neighborhoods,
+        layers=1,
+        use_edge_attr=False,
+        activation="relu",
+    )
+
+    with pytest.raises(ValueError, match=expected_errmsg):
+        model(batch)
+
+
+def test_topotune_onehasse_indexerror_in_aggregate_inter_nbhd(mocker):
+    """
+    Force an IndexError in aggregate_inter_nbhd to cover that branch.
+
+    Parameters
+    ----------
+    mocker : pytest_mock.plugin.MockerFixture
+        Mocker object.
+    """
+    batch = create_mock_complex_batch()
+    gnn = MockGNN(16, 32, 16)
+    neighborhoods = OmegaConf.create(["up_adjacency-0", "down_incidence-1"])
+
+    model = TopoTune_OneHasse(
+        GNN=gnn,
+        neighborhoods=neighborhoods,
+        layers=1,
+        use_edge_attr=False,
+        activation="relu",
+    )
+
+    # Patch model.generate_membership_vectors so the membership claims rank=0
+    # has more elements than it does. If batch.x_0 has shape [3, 16],
+    # membership[0].shape is (3,); forcing membership[0] to 10 elements pushes
+    # end_idx out of bounds.
+    original_generate_membership_vectors = model.generate_membership_vectors
+
+    def fake_generate_membership_vectors(b):
+        """
+        Fake membership vector generation that inflates membership for rank=0.
+
+        Parameters
+        ----------
+        b : torch_geometric.data.Data
+            The input batch data.
+
+        Returns
+        -------
+        dict of {int: torch.Tensor}
+            The artificially inflated membership dictionary.
+        """
+        membership = original_generate_membership_vectors(b)
+        membership[0] = torch.arange(10)  # artificially claim 10 'nodes' at rank=0
+        return membership
+
+    mocker.patch.object(model, 'generate_membership_vectors', side_effect=fake_generate_membership_vectors)
+
+    with pytest.raises(IndexError, match="out of bounds"):
+        model(batch)
+
+
+def create_special_batch():
+    """
+    Create a batch with shapes adjusted to trigger certain corner cases.
+
+    For instance:
+    - 2 faces (x_2 of size [2, *])
+    - Non-square adjacency or incidence matrices, to exercise certain
+      expansions and error-handling paths in all_nbhds_expand.
+
+    Returns
+    -------
+    Data
+        Batched complex data.
+    """
+    x_0 = torch.randn(4, 8)  # rank 0: 4 nodes
+    x_1 = torch.randn(2, 8)  # rank 1: 2 edges
+    x_2 = torch.randn(2, 8)  # rank 2: 2 faces
+    batch = Data(x_0=x_0, x_1=x_1, x_2=x_2)
+
+    # Minimal adjacency/incidence, deliberately non-square in places, to
+    # ensure the relevant expansions and indexing paths are exercised
+    batch["up_adjacency-0"] = torch.sparse_coo_tensor(
+        indices=torch.tensor([[0, 1], [1, 2]]),  # slightly "irregular" pattern
+        values=torch.ones(2),
+        size=(4, 4)
+    ).coalesce()
+    batch["down_incidence-1"] = torch.sparse_coo_tensor(
+        indices=torch.tensor([[0, 1], [0, 0]]),
+        values=torch.ones(2),
+        size=(4, 2)  # node->edge incidence with an atypical shape
+    ).coalesce()
+    # No adjacency is provided for rank 2, leaving the complex partial on purpose
+
+    batch["cell_statistics"] = torch.tensor([[4, 2, 2]])
+    return batch
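+
+
+# Note: string keys such as "up_adjacency-0" land in the Data object's storage
+# and are presumably looked up with attribute-style access inside the model,
+# which is why the missing-neighborhood test below expects an AttributeError
+# from torch_geometric's GlobalStorage rather than a KeyError.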
+ """ + x_0 = torch.randn(4, 8) # rank 0: 4 nodes + x_1 = torch.randn(2, 8) # rank 1: 2 edges + x_2 = torch.randn(2, 8) # rank 2: 2 faces + batch = Data(x_0=x_0, x_1=x_1, x_2=x_2) + + # Minimal adjacency/incidence, possibly non-square + # to ensure certain expansions or indexing happen + batch["up_adjacency-0"] = torch.sparse_coo_tensor( + indices=torch.tensor([[0, 1], [1, 2]]), # slightly "irregular" + values=torch.ones(2), + size=(4, 4) + ).coalesce() + batch["down_incidence-1"] = torch.sparse_coo_tensor( + indices=torch.tensor([[0, 1], [0, 0]]), + values=torch.ones(2), + size=(4, 2) # node->edge shape or something that might not match typical + ).coalesce() + # Possibly no adjacency for rank=2 or something partial + + batch["cell_statistics"] = torch.tensor([[4, 2, 2]]) + return batch + + +def test_valueerror_in_all_nbhds_expand_missing_neighborhood_key(): + """ + Trigger a ValueError by passing a neighborhood type that leads to an unsupported condition. + """ + batch = create_special_batch() + gnn = MockGNNWithLinear(8, 16, 8) + # Suppose 'down_laplacian-2' is not recognized by the code, or leads to a raise. + neighborhoods = OmegaConf.create(["down_laplacian-2"]) + + model = TopoTune_OneHasse( + GNN=gnn, + neighborhoods=neighborhoods, + layers=1, + use_edge_attr=True, + activation="relu", + ) + + with pytest.raises(AttributeError, match="GlobalStorage' object has no attribute 'down_laplacian-2"): + model(batch) + + +def test_aggregate_inter_nbhd_index_error(mocker): + """ + Force an IndexError in aggregate_inter_nbhd by artificially inflating membership for one of the ranks so end_idx exceeds x_out.shape[0]. + + Parameters + ---------- + mocker : pytest_mock.plugin.MockerFixture + Mocker object used for patching. + """ + batch = create_special_batch() + gnn = MockGNNWithLinear(8, 16, 8) + neighborhoods = OmegaConf.create(["up_adjacency-0", "down_incidence-1"]) + + model = TopoTune_OneHasse( + GNN=gnn, + neighborhoods=neighborhoods, + layers=1, + use_edge_attr=False, + activation="relu", + ) + + # We'll run forward once to set up membership, then patch membership[0]. + model.membership = model.generate_membership_vectors(batch) + # Suppose membership[0] is an array of length 4, let's artificially set it to 10 + # so the aggregator tries to slice out-of-bounds. + model.membership[0] = torch.arange(10) + + # We'll call aggregate_inter_nbhd directly, simulating a post-GNN output of size 8 + # but membership says rank 0 alone has 10 elements -> triggers IndexError. + fake_x_out = torch.randn(8, 8) # only 8 total features + with pytest.raises(IndexError, match="out of bounds"): + model.aggregate_inter_nbhd(fake_x_out) + + +def test_fallback_for_unupdated_rank(): + """ + Test the scenario where a rank never gets updated because no neighborhoods exist for it. + """ + batch = create_special_batch() + # Suppose we define a neighborhood that only touches rank=0 and rank=1 + # but never rank=2. This ensures rank=2 is not updated. + neighborhoods = OmegaConf.create(["up_adjacency-0"]) # e.g., node->node + + gnn = MockGNNWithLinear(8, 16, 8) + model = TopoTune_OneHasse( + GNN=gnn, + neighborhoods=neighborhoods, + layers=2, + use_edge_attr=False, + activation="relu", + ) + + out = model(batch) + # The code's final loop ensures rank=2 is still present even if never updated. 
+ assert 2 in out + # rank=2 should remain exactly as input + assert torch.allclose(out[2], batch.x_2, atol=1e-6) + + +def test_partial_layer_execution_x2_nonzero(): + """ + Cover the scenario where batch.x_2.shape[0] > 0 but we still have partial execution. + """ + batch = create_special_batch() + # We have 2 faces, so x_2.shape[0] != 0 => no early return + # Let's define neighborhoods that do a partial coverage across layers + neighborhoods = OmegaConf.create(["up_adjacency-0", "down_incidence-1"]) + + gnn = MockGNNWithLinear(8, 16, 8) + model = TopoTune_OneHasse( + GNN=gnn, + neighborhoods=neighborhoods, + layers=3, # multiple layers + use_edge_attr=False, + activation="relu", + ) + + out = model(batch) + # Check that the final dictionary has ranks 0,1,2 + for rank_id in [0, 1, 2]: + assert rank_id in out + # Ensure they have the correct shape + original_feats = getattr(batch, f"x_{rank_id}") + assert out[rank_id].shape == original_feats.shape + # This helps cover code inside the for-loop for multiple layers. + + +def test_activation_id(): + """ + Ensure coverage of the 'id' activation path. + """ + batch = create_special_batch() + neighborhoods = OmegaConf.create(["up_adjacency-0"]) + gnn = MockGNNWithLinear(8, 16, 8) + + model = TopoTune_OneHasse( + GNN=gnn, + neighborhoods=neighborhoods, + layers=1, + use_edge_attr=False, + activation="id", # identity activation + ) + + out = model(batch) + # The identity activation should result in no nonlinearity being applied + # beyond the raw linear transform. + for rank_id in [0, 1, 2]: + assert rank_id in out \ No newline at end of file