From d46c4fa37040ade26c4ef2d5db07eb9cb6da0fc6 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 6 Jan 2025 12:22:31 +0000 Subject: [PATCH 1/4] Update SNAPSHOT version to v2.12.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index f96204df0..a58c6f7df 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ org.gridsuite gridsuite-network-modification-server - 2.11.0-SNAPSHOT + 2.12.0-SNAPSHOT jar Network modification server From 5844631f0248831261819d925df854a35c5a2324 Mon Sep 17 00:00:00 2001 From: Slimane Amar <63394973+SlimaneAmar@users.noreply.github.com> Date: Tue, 7 Jan 2025 10:54:58 +0100 Subject: [PATCH 2/4] Add a metric for network modifications in the thread pool (#585) Signed-off-by: Slimane AMAR --- pom.xml | 9 ++-- .../NetworkModificationApplicator.java | 43 ++++++++----------- ...geNetworkModificationExecutionService.java | 42 ++++++++++++++++++ .../service/NetworkModificationObserver.java | 27 ++++++++++-- .../server/VoltageInitReportTest.java | 7 ++- 5 files changed, 93 insertions(+), 35 deletions(-) create mode 100644 src/main/java/org/gridsuite/modification/server/service/LargeNetworkModificationExecutionService.java diff --git a/pom.xml b/pom.xml index a58c6f7df..5525dff14 100644 --- a/pom.xml +++ b/pom.xml @@ -174,6 +174,10 @@ org.springframework.cloud spring-cloud-stream + + org.springframework.boot + spring-boot-starter-actuator + @@ -188,11 +192,6 @@ powsybl-config-classic runtime - - org.springframework.boot - spring-boot-starter-actuator - runtime - io.micrometer micrometer-registry-prometheus diff --git a/src/main/java/org/gridsuite/modification/server/modifications/NetworkModificationApplicator.java b/src/main/java/org/gridsuite/modification/server/modifications/NetworkModificationApplicator.java index cf4f840b9..2cefd6b43 100644 --- a/src/main/java/org/gridsuite/modification/server/modifications/NetworkModificationApplicator.java +++ b/src/main/java/org/gridsuite/modification/server/modifications/NetworkModificationApplicator.java @@ -14,8 +14,6 @@ import com.powsybl.iidm.network.Network; import com.powsybl.network.store.client.NetworkStoreService; import com.powsybl.network.store.client.PreloadingStrategy; - -import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.Setter; import org.apache.commons.lang3.tuple.Pair; @@ -30,6 +28,7 @@ import org.gridsuite.modification.server.elasticsearch.EquipmentInfosService; import org.gridsuite.modification.server.impacts.AbstractBaseImpact; import org.gridsuite.modification.server.service.FilterService; +import org.gridsuite.modification.server.service.LargeNetworkModificationExecutionService; import org.gridsuite.modification.server.service.NetworkModificationObserver; import org.gridsuite.modification.server.service.ReportService; import org.slf4j.Logger; @@ -39,9 +38,6 @@ import java.util.List; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * @author Slimane Amar @@ -58,7 +54,7 @@ public class NetworkModificationApplicator { @Getter private final FilterService filterService; - private final ExecutorService applicationExecutor; + private final LargeNetworkModificationExecutionService largeNetworkModificationExecutionService; private final NetworkModificationObserver networkModificationObserver; @@ -68,18 +64,18 @@ public class NetworkModificationApplicator { public NetworkModificationApplicator(NetworkStoreService networkStoreService, EquipmentInfosService equipmentInfosService, ReportService reportService, FilterService filterService, - @Value("${max-large-concurrent-applications}") int maxConcurrentApplications, - NetworkModificationObserver networkModificationObserver) { + NetworkModificationObserver networkModificationObserver, + LargeNetworkModificationExecutionService largeNetworkModificationExecutionService) { this.networkStoreService = networkStoreService; this.equipmentInfosService = equipmentInfosService; this.reportService = reportService; this.filterService = filterService; - this.applicationExecutor = Executors.newFixedThreadPool(maxConcurrentApplications); this.networkModificationObserver = networkModificationObserver; + this.largeNetworkModificationExecutionService = largeNetworkModificationExecutionService; } - /* This method is used when creating, inserting, moving or duplicating modifications + /* This method is used for incremental modifications * Since there is no queue for these operations and they can be memory consuming when the preloading strategy is large * (for example for VOLTAGE_INIT_MODIFICATION), * we limit the number of concurrent applications of these modifications to avoid out of memory issues. @@ -97,15 +93,16 @@ public NetworkModificationResult applyModifications(List modi .map(ModificationType::getStrategy) .orElse(PreloadingStrategy.NONE); if (preloadingStrategy == PreloadingStrategy.ALL_COLLECTIONS_NEEDED_FOR_BUS_VIEW) { - CompletableFuture future = CompletableFuture.supplyAsync(() -> processApplication(modificationInfosList, networkInfos, reportInfos), applicationExecutor); - return future.join(); + return largeNetworkModificationExecutionService + .supplyAsync(() -> apply(modificationInfosList, networkInfos, reportInfos)) + .join(); } else { - return processApplication(modificationInfosList, networkInfos, reportInfos); + return apply(modificationInfosList, networkInfos, reportInfos); } } - // used for creating, inserting, moving or duplicating modifications - private NetworkModificationResult processApplication(List modificationInfosList, NetworkInfos networkInfos, ReportInfos reportInfos) { + // This method is used for incremental modifications + private NetworkModificationResult apply(List modificationInfosList, NetworkInfos networkInfos, ReportInfos reportInfos) { NetworkStoreListener listener = NetworkStoreListener.create(networkInfos.getNetwork(), networkInfos.getNetworkUuuid(), networkStoreService, equipmentInfosService, collectionThreshold); ApplicationStatus groupApplicationStatus = apply(modificationInfosList, listener.getNetwork(), reportInfos); List networkImpacts = listener.flushNetworkModifications(); @@ -134,15 +131,16 @@ public NetworkModificationResult applyModifications(List future = CompletableFuture.supplyAsync(() -> processApplication(modificationInfosGroups, networkInfos), applicationExecutor); - return future.join(); + return largeNetworkModificationExecutionService + .supplyAsync(() -> apply(modificationInfosGroups, networkInfos)) + .join(); } else { - return processApplication(modificationInfosGroups, networkInfos); + return apply(modificationInfosGroups, networkInfos); } } - // used for building a variant - private NetworkModificationResult processApplication(List>> modificationInfosGroups, NetworkInfos networkInfos) { + // This method is used when building a variant + private NetworkModificationResult apply(List>> modificationInfosGroups, NetworkInfos networkInfos) { NetworkStoreListener listener = NetworkStoreListener.create(networkInfos.getNetwork(), networkInfos.getNetworkUuuid(), networkStoreService, equipmentInfosService, collectionThreshold); List groupsApplicationStatuses = modificationInfosGroups.stream() @@ -233,9 +231,4 @@ public static ApplicationStatus getApplicationStatus(ReportNode reportNode) { throw new IllegalArgumentException(String.format("Report severity '%s' unknown !", severity.getValue())); } } - - @PreDestroy - public void shutdown() { - applicationExecutor.shutdown(); - } } diff --git a/src/main/java/org/gridsuite/modification/server/service/LargeNetworkModificationExecutionService.java b/src/main/java/org/gridsuite/modification/server/service/LargeNetworkModificationExecutionService.java new file mode 100644 index 000000000..27211d2d4 --- /dev/null +++ b/src/main/java/org/gridsuite/modification/server/service/LargeNetworkModificationExecutionService.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2024, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package org.gridsuite.modification.server.service; + +import jakarta.annotation.PreDestroy; +import lombok.NonNull; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Supplier; + +/** + * @author Slimane Amar + */ +@Service +public class LargeNetworkModificationExecutionService { + + private ThreadPoolExecutor executorService; + + public LargeNetworkModificationExecutionService(@Value("${max-large-concurrent-applications}") int maxConcurrentLargeModifications, + @NonNull NetworkModificationObserver networkModificationObserver) { + executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxConcurrentLargeModifications); + networkModificationObserver.createThreadPoolMetric(executorService); + } + + @PreDestroy + private void preDestroy() { + executorService.shutdown(); + } + + public CompletableFuture supplyAsync(Supplier supplier) { + return CompletableFuture.supplyAsync(supplier, executorService); + } +} diff --git a/src/main/java/org/gridsuite/modification/server/service/NetworkModificationObserver.java b/src/main/java/org/gridsuite/modification/server/service/NetworkModificationObserver.java index acf418d13..8acb7eaa4 100644 --- a/src/main/java/org/gridsuite/modification/server/service/NetworkModificationObserver.java +++ b/src/main/java/org/gridsuite/modification/server/service/NetworkModificationObserver.java @@ -1,20 +1,31 @@ package org.gridsuite.modification.server.service; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; import lombok.NonNull; import org.gridsuite.modification.ModificationType; import org.springframework.stereotype.Service; +import java.util.concurrent.ThreadPoolExecutor; + @Service public class NetworkModificationObserver { - protected static final String OBSERVATION_PREFIX = "app.network-modification."; - protected static final String MODIFICATION_TYPE_TAG_NAME = "modification_type"; + private static final String OBSERVATION_PREFIX = "app.network-modification."; + private static final String MODIFICATION_TYPE_TAG_NAME = "modification_type"; + + private static final String TASK_TYPE_TAG_NAME = "type"; + private static final String TASK_TYPE_TAG_VALUE_CURRENT = "current"; + private static final String TASK_TYPE_TAG_VALUE_PENDING = "pending"; + private static final String TASK_POOL_METER_NAME_PREFIX = OBSERVATION_PREFIX + "tasks.pool."; private final ObservationRegistry observationRegistry; + private final MeterRegistry meterRegistry; - public NetworkModificationObserver(@NonNull ObservationRegistry observationRegistry) { + public NetworkModificationObserver(@NonNull ObservationRegistry observationRegistry, @NonNull MeterRegistry meterRegistry) { this.observationRegistry = observationRegistry; + this.meterRegistry = meterRegistry; } public void observe(String name, ModificationType modificationType, Observation.CheckedRunnable runnable) throws E { @@ -26,4 +37,14 @@ private Observation createObservation(String name, ModificationType modification .lowCardinalityKeyValue(MODIFICATION_TYPE_TAG_NAME, modificationType.name()); } + public void createThreadPoolMetric(ThreadPoolExecutor threadPoolExecutor) { + Gauge.builder(TASK_POOL_METER_NAME_PREFIX + TASK_TYPE_TAG_VALUE_CURRENT, threadPoolExecutor, ThreadPoolExecutor::getActiveCount) + .description("The number of active large network modification tasks in the thread pool") + .tag(TASK_TYPE_TAG_NAME, TASK_TYPE_TAG_VALUE_CURRENT) + .register(meterRegistry); + Gauge.builder(TASK_POOL_METER_NAME_PREFIX + TASK_TYPE_TAG_VALUE_PENDING, threadPoolExecutor, executor -> executor.getQueue().size()) + .description("The number of pending large network modification tasks in the thread pool") + .tag(TASK_TYPE_TAG_NAME, TASK_TYPE_TAG_VALUE_PENDING) + .register(meterRegistry); + } } diff --git a/src/test/java/org/gridsuite/modification/server/VoltageInitReportTest.java b/src/test/java/org/gridsuite/modification/server/VoltageInitReportTest.java index 5a10440e4..dbbec9625 100644 --- a/src/test/java/org/gridsuite/modification/server/VoltageInitReportTest.java +++ b/src/test/java/org/gridsuite/modification/server/VoltageInitReportTest.java @@ -19,6 +19,7 @@ import com.powsybl.network.store.client.RestClient; import com.powsybl.network.store.iidm.impl.CachedNetworkStoreClient; import com.powsybl.network.store.iidm.impl.OfflineNetworkStoreClient; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.observation.ObservationRegistry; import lombok.extern.slf4j.Slf4j; import org.gridsuite.modification.dto.*; @@ -28,6 +29,7 @@ import org.gridsuite.modification.server.dto.NetworkModificationResult.ApplicationStatus; import org.gridsuite.modification.server.elasticsearch.EquipmentInfosService; import org.gridsuite.modification.server.modifications.NetworkModificationApplicator; +import org.gridsuite.modification.server.service.LargeNetworkModificationExecutionService; import org.gridsuite.modification.server.service.NetworkModificationObserver; import org.gridsuite.modification.server.service.ReportService; import org.junit.jupiter.api.DisplayName; @@ -67,8 +69,9 @@ void testVoltageInitDuplicationLogs(final ApplicationStatus resultStatus, final final NetworkStoreService networkStoreService = new NetworkStoreServicePublic(restClient, PreloadingStrategy.NONE, (restClient_, preloadingStrategy, executorService) -> new CachedNetworkStoreClient(new OfflineNetworkStoreClient())); final EquipmentInfosService equipmentInfosService = Mockito.mock(EquipmentInfosService.class); - final NetworkModificationObserver networkModificationObserver = new NetworkModificationObserver(ObservationRegistry.NOOP); - final NetworkModificationApplicator networkModificationApplicator = new NetworkModificationApplicator(networkStoreService, equipmentInfosService, reportService, null, 2, networkModificationObserver); + final NetworkModificationObserver networkModificationObserver = new NetworkModificationObserver(ObservationRegistry.NOOP, new SimpleMeterRegistry()); + final LargeNetworkModificationExecutionService modificationExecutionService = new LargeNetworkModificationExecutionService(2, networkModificationObserver); + final NetworkModificationApplicator networkModificationApplicator = new NetworkModificationApplicator(networkStoreService, equipmentInfosService, reportService, null, networkModificationObserver, modificationExecutionService); networkModificationApplicator.setCollectionThreshold(5); final Network network = Network.read(Paths.get(this.getClass().getClassLoader().getResource("fourSubstations_testsOpenReac.xiidm").toURI())); From 26ccfb81b2bb720512a549b94e37e2daa47cf88d Mon Sep 17 00:00:00 2001 From: Ayoub LABIDI <117761394+ayolab@users.noreply.github.com> Date: Tue, 7 Jan 2025 17:44:36 +0100 Subject: [PATCH 3/4] Enhance test coverage for NetworkModificationApplicator (#586) Signed-off-by: Ayoub LABIDI --- .../NetworkModificationApplicatorTest.java | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/src/test/java/org/gridsuite/modification/server/service/NetworkModificationApplicatorTest.java b/src/test/java/org/gridsuite/modification/server/service/NetworkModificationApplicatorTest.java index 42708b1e3..7b114ba46 100644 --- a/src/test/java/org/gridsuite/modification/server/service/NetworkModificationApplicatorTest.java +++ b/src/test/java/org/gridsuite/modification/server/service/NetworkModificationApplicatorTest.java @@ -11,22 +11,104 @@ import com.powsybl.commons.report.ReportNode; import com.powsybl.commons.report.ReportNodeAdder; import com.powsybl.commons.report.TypedValue; +import com.powsybl.network.store.client.NetworkStoreService; +import com.powsybl.network.store.client.PreloadingStrategy; + +import org.apache.commons.lang3.tuple.Pair; +import org.gridsuite.modification.ModificationType; +import org.gridsuite.modification.dto.ModificationInfos; +import org.gridsuite.modification.server.dto.NetworkInfos; +import org.gridsuite.modification.server.dto.NetworkModificationResult; +import org.gridsuite.modification.server.dto.ReportInfos; import org.gridsuite.modification.server.dto.NetworkModificationResult.ApplicationStatus; +import org.gridsuite.modification.server.elasticsearch.EquipmentInfosService; import org.gridsuite.modification.server.modifications.NetworkModificationApplicator; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @Tag("UnitTest") class NetworkModificationApplicatorTest { + + @Mock + private NetworkStoreService networkStoreService; + + @Mock + private EquipmentInfosService equipmentInfosService; + + @Mock + private ReportService reportService; + + @Mock + private FilterService filterService; + + @Mock + private NetworkModificationObserver networkModificationObserver; + + @Mock + private LargeNetworkModificationExecutionService largeNetworkModificationExecutionService; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + } + + @Test + void testApplyModificationsWithAllCollectionsNeededForBusView() { + List modificationInfosList = List.of(mock(ModificationInfos.class)); + NetworkInfos networkInfos = mock(NetworkInfos.class); + ReportInfos reportInfos = mock(ReportInfos.class); + + NetworkModificationApplicator applicator = new NetworkModificationApplicator( + networkStoreService, equipmentInfosService, reportService, filterService, networkModificationObserver, largeNetworkModificationExecutionService); + + ModificationType mockModificationType = mock(ModificationType.class); + when(modificationInfosList.get(0).getType()).thenReturn(mockModificationType); + when(mockModificationType.getStrategy()).thenReturn(PreloadingStrategy.ALL_COLLECTIONS_NEEDED_FOR_BUS_VIEW); + when(largeNetworkModificationExecutionService.supplyAsync(any())).thenReturn(CompletableFuture.completedFuture(NetworkModificationResult.builder().build())); + + NetworkModificationResult result = applicator.applyModifications(modificationInfosList, networkInfos, reportInfos); + + assertNotNull(result); + verify(largeNetworkModificationExecutionService).supplyAsync(any()); + } + + @Test + void testApplyModificationsWithGroupsAndAllCollectionsNeededForBusView() { + List>> modificationInfosGroups = List.of(Pair.of(mock(ReportInfos.class), List.of(mock(ModificationInfos.class)))); + NetworkInfos networkInfos = mock(NetworkInfos.class); + + NetworkModificationApplicator applicator = new NetworkModificationApplicator( + networkStoreService, equipmentInfosService, reportService, filterService, networkModificationObserver, largeNetworkModificationExecutionService); + + ModificationType mockModificationType = mock(ModificationType.class); + when(modificationInfosGroups.get(0).getRight().get(0).getType()).thenReturn(mockModificationType); + when(mockModificationType.getStrategy()).thenReturn(PreloadingStrategy.ALL_COLLECTIONS_NEEDED_FOR_BUS_VIEW); + when(largeNetworkModificationExecutionService.supplyAsync(any())).thenReturn(CompletableFuture.completedFuture(NetworkModificationResult.builder().build())); + + NetworkModificationResult result = applicator.applyModifications(modificationInfosGroups, networkInfos); + + assertNotNull(result); + verify(largeNetworkModificationExecutionService).supplyAsync(any()); + } + @ParameterizedTest @MethodSource("provideArgumentsForComputeHigherSeverity") void computeHigherSeverity(List reports, ApplicationStatus expectedSeverity) { From 8acdae0f83226a8eecc20d865c660d581642a319 Mon Sep 17 00:00:00 2001 From: Joris Mancini <53527338+TheMaskedTurtle@users.noreply.github.com> Date: Wed, 8 Jan 2025 10:00:09 +0100 Subject: [PATCH 4/4] feat: rely on rabbitmq dlq for computation errors handling (#575) Signed-off-by: Joris Mancini --- .../modification/server/BuildException.java | 17 ++++++++ .../service/BuildFailedPublisherService.java | 41 ------------------- .../server/service/BuildWorkerService.java | 20 +++------ src/main/resources/config/application.yaml | 16 ++++++-- .../server/service/BuildTest.java | 11 +---- .../service/BuildWorkerServiceTest.java | 31 ++++++++++++++ 6 files changed, 68 insertions(+), 68 deletions(-) create mode 100644 src/main/java/org/gridsuite/modification/server/BuildException.java delete mode 100644 src/main/java/org/gridsuite/modification/server/service/BuildFailedPublisherService.java create mode 100644 src/test/java/org/gridsuite/modification/server/service/BuildWorkerServiceTest.java diff --git a/src/main/java/org/gridsuite/modification/server/BuildException.java b/src/main/java/org/gridsuite/modification/server/BuildException.java new file mode 100644 index 000000000..29ad16c59 --- /dev/null +++ b/src/main/java/org/gridsuite/modification/server/BuildException.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2024, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package org.gridsuite.modification.server; + +/** + * @author Joris Mancini + */ +public class BuildException extends RuntimeException { + public BuildException(String message, Throwable e) { + super(message, e); + } +} diff --git a/src/main/java/org/gridsuite/modification/server/service/BuildFailedPublisherService.java b/src/main/java/org/gridsuite/modification/server/service/BuildFailedPublisherService.java deleted file mode 100644 index 06289dcdb..000000000 --- a/src/main/java/org/gridsuite/modification/server/service/BuildFailedPublisherService.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (c) 2022, RTE (http://www.rte-france.com) - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -package org.gridsuite.modification.server.service; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.function.StreamBridge; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.stereotype.Service; - -@Service -public class BuildFailedPublisherService { - - private static final String CATEGORY_BROKER_OUTPUT = BuildFailedPublisherService.class.getName() + ".output-broker-messages"; - - private static final Logger LOGGER = LoggerFactory.getLogger(CATEGORY_BROKER_OUTPUT); - - @Autowired - private StreamBridge failedMessagePublisher; - - public void publishFail(String receiver, String failMessage) { - publish(receiver, failMessage); - } - - private void publish(String receiver, String failMessage) { - Message message = MessageBuilder - .withPayload("") - .setHeader("receiver", receiver) - .setHeader("message", failMessage) - .build(); - LOGGER.debug("Sending message : {}", message); - failedMessagePublisher.send("publishFailedBuild-out-0", message); - } -} diff --git a/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java b/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java index 157e06616..9ac2fbde4 100644 --- a/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java +++ b/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import lombok.NonNull; +import org.gridsuite.modification.server.BuildException; import org.gridsuite.modification.server.dto.BuildInfos; import org.gridsuite.modification.server.dto.NetworkModificationResult; import org.slf4j.Logger; @@ -46,8 +47,6 @@ public class BuildWorkerService { private final BuildStoppedPublisherService stoppedPublisherService; - private final BuildFailedPublisherService failedPublisherService; - private final Map> futures = new ConcurrentHashMap<>(); private final Map cancelBuildRequests = new ConcurrentHashMap<>(); @@ -61,12 +60,10 @@ public class BuildWorkerService { public BuildWorkerService(@NonNull NetworkModificationService networkModificationService, @NonNull ObjectMapper objectMapper, - @NonNull BuildStoppedPublisherService stoppedPublisherService, - @NonNull BuildFailedPublisherService failedPublisherService) { + @NonNull BuildStoppedPublisherService stoppedPublisherService) { this.networkModificationService = networkModificationService; this.objectMapper = objectMapper; this.stoppedPublisherService = stoppedPublisherService; - this.failedPublisherService = failedPublisherService; } private CompletableFuture execBuildVariant(BuildExecContext execContext, BuildInfos buildInfos) { @@ -98,11 +95,11 @@ private CompletableFuture execBuildVariant(BuildExecC @Bean public Consumer> consumeBuild() { return message -> { - BuildExecContext execContext = null; + BuildExecContext execContext; try { execContext = BuildExecContext.fromMessage(message, objectMapper); } catch (Exception e) { - LOGGER.error("Error retrieving message in consumeBuild", e); + throw new BuildException("Failed to read build message", e); } startBuild(Objects.requireNonNull(execContext)); }; @@ -113,7 +110,7 @@ private void startBuild(BuildExecContext execContext) { BuildInfos buildInfos = execContext.getBuildInfos(); CompletableFuture future = execBuildVariant(execContext, buildInfos); NetworkModificationResult result; - if (future != null && (result = future.get()) != null) { // result available + if (future != null && (result = future.join()) != null) { // result available notificationService.emitBuildResultMessage(result, execContext.getReceiver()); LOGGER.info("Build complete on node '{}'", execContext.getReceiver()); } else { // result not available : stop build request @@ -123,13 +120,8 @@ private void startBuild(BuildExecContext execContext) { } } catch (CancellationException e) { stoppedPublisherService.publishCancel(execContext.getReceiver(), CANCEL_MESSAGE); - } catch (InterruptedException e) { - LOGGER.error(FAIL_MESSAGE, e); - failedPublisherService.publishFail(execContext.getReceiver(), FAIL_MESSAGE + " : " + e.getMessage()); - Thread.currentThread().interrupt(); } catch (Exception e) { - LOGGER.error(FAIL_MESSAGE, e); - failedPublisherService.publishFail(execContext.getReceiver(), FAIL_MESSAGE + " : " + e.getMessage()); + throw new BuildException("Node build failed", e); } finally { futures.remove(execContext.getReceiver()); cancelBuildRequests.remove(execContext.getReceiver()); diff --git a/src/main/resources/config/application.yaml b/src/main/resources/config/application.yaml index 2e05b56e6..9fae09985 100644 --- a/src/main/resources/config/application.yaml +++ b/src/main/resources/config/application.yaml @@ -21,6 +21,7 @@ spring: group: buildGroup consumer: concurrency: 2 + max-attempts: 1 publishBuild-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.run publishResultBuild-out-0: @@ -31,9 +32,18 @@ spring: destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.cancel publishStoppedBuild-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.stopped - publishFailedBuild-out-0: - destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.failed - output-bindings: publishBuild-out-0;publishResultBuild-out-0;publishCancelBuild-out-0;publishStoppedBuild-out-0;publishFailedBuild-out-0 + output-bindings: publishBuild-out-0;publishResultBuild-out-0;publishCancelBuild-out-0;publishStoppedBuild-out-0 + rabbit: + bindings: + consumeBuild-in-0: + consumer: + auto-bind-dlq: true + dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlx + dead-letter-queue-name: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlx.dlq + dead-letter-exchange-type: topic + quorum: + enabled: true + delivery-limit: 2 powsybl-ws: database: diff --git a/src/test/java/org/gridsuite/modification/server/service/BuildTest.java b/src/test/java/org/gridsuite/modification/server/service/BuildTest.java index bd6b8150c..646fb3474 100644 --- a/src/test/java/org/gridsuite/modification/server/service/BuildTest.java +++ b/src/test/java/org/gridsuite/modification/server/service/BuildTest.java @@ -72,9 +72,6 @@ import static com.powsybl.iidm.network.ReactiveLimitsKind.MIN_MAX; import static org.gridsuite.modification.server.impacts.TestImpactUtils.*; import static org.gridsuite.modification.server.service.BuildWorkerService.CANCEL_MESSAGE; -import static org.gridsuite.modification.server.service.BuildWorkerService.FAIL_MESSAGE; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.startsWith; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -127,9 +124,6 @@ class BuildTest { @Value("${spring.cloud.stream.bindings.publishStoppedBuild-out-0.destination}") private String buildStoppedDestination; - @Value("${spring.cloud.stream.bindings.publishFailedBuild-out-0.destination}") - private String buildFailedDestination; - @Autowired private OutputDestination output; @@ -910,9 +904,6 @@ void runBuildWithReportErrorTest(final MockWebServer server) throws Exception { assertTrue(TestUtils.getRequestsDone(1, server).stream().anyMatch(r -> r.matches("/v1/reports/.*"))); assertNull(output.receive(TIMEOUT, buildResultDestination)); - Message message = output.receive(TIMEOUT * 3, buildFailedDestination); - assertEquals("me", message.getHeaders().get("receiver")); - assertThat((String) message.getHeaders().get("message"), startsWith(FAIL_MESSAGE)); Message buildMessage = output.receive(TIMEOUT, consumeBuildDestination); assertNotNull(buildMessage); assertEquals("me", buildMessage.getHeaders().get("receiver")); @@ -970,7 +961,7 @@ private void testNetworkModificationsCount(UUID groupUuid, int actualSize) { @AfterEach void tearDown(final MockWebServer server) { - List destinations = List.of(consumeBuildDestination, cancelBuildDestination, buildResultDestination, buildStoppedDestination, buildFailedDestination); + List destinations = List.of(consumeBuildDestination, cancelBuildDestination, buildResultDestination, buildStoppedDestination); TestUtils.assertQueuesEmptyThenClear(destinations, output); try { TestUtils.assertServerRequestsEmptyThenShutdown(server); diff --git a/src/test/java/org/gridsuite/modification/server/service/BuildWorkerServiceTest.java b/src/test/java/org/gridsuite/modification/server/service/BuildWorkerServiceTest.java new file mode 100644 index 000000000..024aa9c18 --- /dev/null +++ b/src/test/java/org/gridsuite/modification/server/service/BuildWorkerServiceTest.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package org.gridsuite.modification.server.service; + +import org.gridsuite.modification.server.BuildException; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.messaging.support.MessageBuilder; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +@SpringBootTest +class BuildWorkerServiceTest { + + @Autowired + private BuildWorkerService buildWorkerService; + + @Test + void testConsumeBuildWithMalformedInput() { + assertThrows( + BuildException.class, + () -> buildWorkerService.consumeBuild().accept(MessageBuilder.withPayload("wrong message").build()), + "Failed to read build message"); + } +}