diff options
18 files changed, 485 insertions, 57 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java index 6cc05a0f69e..1ffefd1a6b7 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java @@ -32,8 +32,11 @@ import com.yahoo.transaction.Transaction; import com.yahoo.vespa.config.server.application.Application; import com.yahoo.vespa.config.server.application.ApplicationReindexing; import com.yahoo.vespa.config.server.application.ApplicationSet; +import com.yahoo.vespa.config.server.application.ClusterReindexing; +import com.yahoo.vespa.config.server.application.ClusterReindexingStatusClient; import com.yahoo.vespa.config.server.application.CompressedApplicationInputStream; import com.yahoo.vespa.config.server.application.ConfigConvergenceChecker; +import com.yahoo.vespa.config.server.application.DefaultClusterReindexingStatusClient; import com.yahoo.vespa.config.server.application.FileDistributionStatus; import com.yahoo.vespa.config.server.application.HttpProxy; import com.yahoo.vespa.config.server.application.TenantApplications; @@ -134,6 +137,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye private final LogRetriever logRetriever; private final TesterClient testerClient; private final Metric metric; + private final ClusterReindexingStatusClient clusterReindexingStatusClient; @Inject public ApplicationRepository(TenantRepository tenantRepository, @@ -157,7 +161,8 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye Clock.systemUTC(), testerClient, metric, - flagSource); + flagSource, + new DefaultClusterReindexingStatusClient()); } private ApplicationRepository(TenantRepository tenantRepository, @@ -171,7 +176,8 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye Clock clock, TesterClient testerClient, Metric metric, - FlagSource flagSource) { + FlagSource flagSource, + ClusterReindexingStatusClient clusterReindexingStatusClient) { this.tenantRepository = Objects.requireNonNull(tenantRepository); this.hostProvisioner = Objects.requireNonNull(hostProvisioner); this.infraDeployer = Objects.requireNonNull(infraDeployer); @@ -183,6 +189,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye this.clock = Objects.requireNonNull(clock); this.testerClient = Objects.requireNonNull(testerClient); this.metric = Objects.requireNonNull(metric); + this.clusterReindexingStatusClient = clusterReindexingStatusClient; } public static class Builder { @@ -266,7 +273,8 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye clock, testerClient, metric, - flagSource); + flagSource, + ClusterReindexingStatusClient.DUMMY_INSTANCE); } } @@ -528,6 +536,10 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye CLUSTERCONTROLLER_CONTAINER.serviceName, relativePath); } + public Map<String, ClusterReindexing> getClusterReindexingStatus(ApplicationId applicationId) { + return uncheck(() -> clusterReindexingStatusClient.getReindexingStatus(getApplication(applicationId))); + } + public Long getApplicationGeneration(ApplicationId applicationId) { return getApplication(applicationId).getApplicationGeneration(); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java index ca9aa01ea56..7f0671820d3 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexing.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.config.server.application; import java.time.Instant; +import java.util.Arrays; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -10,6 +11,7 @@ import java.util.Optional; * Reindexing status for each document type in a content cluster. * * @author jonmv + * @author bjorncs */ public class ClusterReindexing { @@ -51,8 +53,19 @@ public class ClusterReindexing { public enum State { + PENDING("pending"), RUNNING("running"), FAILED("failed"), SUCCESSFUL("successful"); - PENDING, RUNNING, FAILED, SUCCESSFUL; + private final String stringValue; + State(String stringValue) { this.stringValue = stringValue; } + + public static State fromString(String value) { + return Arrays.stream(values()) + .filter(v -> v.stringValue.equals(value)) + .findAny() + .orElseThrow(() -> new IllegalArgumentException("Unknown value: " + value)); + } + + public String asString() { return stringValue; } } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexingStatusClient.java new file mode 100644 index 00000000000..1201bbd4814 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ClusterReindexingStatusClient.java @@ -0,0 +1,25 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import com.yahoo.vespa.config.server.modelfactory.ModelResult; + +import java.io.IOException; +import java.util.Map; + +/** + * Retrieves reindexing status from content clusters + * + * @author bjorncs + */ +public interface ClusterReindexingStatusClient extends AutoCloseable { + + Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException; + + void close(); + + ClusterReindexingStatusClient DUMMY_INSTANCE = new ClusterReindexingStatusClient() { + @Override public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) { return Map.of(); } + @Override public void close() {} + }; + +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java new file mode 100644 index 00000000000..96873f147f6 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClient.java @@ -0,0 +1,150 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.application; + +import ai.vespa.util.http.VespaAsyncHttpClientBuilder; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yahoo.concurrent.CompletableFutures; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.config.model.api.PortInfo; +import com.yahoo.config.model.api.ServiceInfo; +import com.yahoo.vespa.applicationmodel.ClusterId; +import com.yahoo.vespa.config.server.modelfactory.ModelResult; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; + +import java.io.IOException; +import java.net.URI; +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER; +import static com.yahoo.yolean.Exceptions.throwUnchecked; +import static com.yahoo.yolean.Exceptions.uncheck; + +/** + * Retrieves reindexing status from cluster controllers over HTTP + * + * @author bjorncs + */ +public class DefaultClusterReindexingStatusClient implements ClusterReindexingStatusClient { + + private static final ObjectMapper mapper = new ObjectMapper(); + + private final Executor executor = + Executors.newSingleThreadExecutor(new DaemonThreadFactory("cluster-controller-reindexing-client-")); + private final CloseableHttpAsyncClient httpClient = createHttpClient(); + + public DefaultClusterReindexingStatusClient() { + httpClient.start(); + } + + @Override + public Map<String, ClusterReindexing> getReindexingStatus(ModelResult application) throws IOException { + Map<ClusterId, List<ServiceInfo>> clusters = clusterControllerClusters(application); + Map<ClusterId, CompletableFuture<ClusterReindexing>> futureStatusPerCluster = new HashMap<>(); + clusters.forEach((clusterId, clusterNodes) -> { + var parallelRequests = clusterNodes.stream() + .map(this::getReindexingStatus) + .collect(Collectors.toList()); + CompletableFuture<ClusterReindexing> combinedRequest = CompletableFutures.firstOf(parallelRequests); + futureStatusPerCluster.put(clusterId, combinedRequest); + }); + + try { + Map<String, ClusterReindexing> statusPerCluster = new HashMap<>(); + futureStatusPerCluster.forEach((clusterId, futureStatus) -> { + statusPerCluster.put(clusterId.s(), futureStatus.join()); + }); + return statusPerCluster; + } catch (Exception e) { + throw new IOException("Failed to get reindexing status from cluster controllers: " + e.getMessage(), e); + } + } + + @Override public void close() { uncheck(() -> httpClient.close()); } + + private CompletableFuture<ClusterReindexing> getReindexingStatus(ServiceInfo service) { + URI uri = URI.create(String.format("http://%s:%d/reindexing/v1/status", service.getHostName(), getStatePort(service))); + CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>(); + httpClient.execute(SimpleHttpRequests.get(uri), new FutureCallback<>() { + @Override public void completed(SimpleHttpResponse result) { responsePromise.complete(result); } + @Override public void failed(Exception ex) { responsePromise.completeExceptionally(ex); } + @Override public void cancelled() { responsePromise.cancel(false); } + }); + return responsePromise.handleAsync((response, error) -> { + if (response != null) { + return uncheck(() -> toClusterReindexing(response)); + } else { + throw throwUnchecked(new IOException(String.format("For '%s': %s", uri, error.getMessage()), error)); + } + }, executor); + } + + private static ClusterReindexing toClusterReindexing(SimpleHttpResponse response) throws IOException { + if (response.getCode() != HttpStatus.SC_OK) throw new IOException("Expected status code 200, got " + response.getCode()); + if (response.getBody() == null) throw new IOException("Response has no content"); + return toClusterReindexing(response.getBodyBytes()); + } + + private static ClusterReindexing toClusterReindexing(byte[] requestBody) throws IOException { + JsonNode jsonNode = mapper.readTree(requestBody); + Map<String, ClusterReindexing.Status> documentStatuses = new HashMap<>(); + for (JsonNode statusJson : jsonNode.get("status")) { + String type = statusJson.get("type").textValue(); + Instant startedMillis = Instant.ofEpochMilli(statusJson.get("startedMillis").longValue()); + Instant endedMillis = Instant.ofEpochMilli(statusJson.get("endedMillis").longValue()); + String progressToken = statusJson.get("progress").textValue(); + ClusterReindexing.State state = ClusterReindexing.State.fromString(statusJson.get("state").textValue()); + String message = statusJson.get("message").textValue(); + documentStatuses.put(type, new ClusterReindexing.Status(startedMillis, endedMillis, state, message, progressToken)); + } + return new ClusterReindexing(documentStatuses); + } + + private static int getStatePort(ServiceInfo service) { + return service.getPorts().stream() + .filter(port -> port.getTags().contains("state")) + .map(PortInfo::getPort) + .findAny() + .orElseThrow(() -> new IllegalStateException("Cluster controller container has no container port")); + } + + private static Map<ClusterId, List<ServiceInfo>> clusterControllerClusters(ModelResult application) { + return application.getModel().getHosts().stream() + .flatMap(host -> host.getServices().stream()) + .filter(service -> service.getServiceType().equals(CLUSTERCONTROLLER_CONTAINER.serviceName)) + .collect(Collectors.groupingBy(service -> new ClusterId(service.getProperty("clustername").get()))); + + } + + private static CloseableHttpAsyncClient createHttpClient() { + return VespaAsyncHttpClientBuilder + .create() + .setIOReactorConfig(IOReactorConfig.custom() + .setSoTimeout(Timeout.ofSeconds(2)) + .build()) + .setDefaultRequestConfig( + RequestConfig.custom() + .setConnectTimeout(Timeout.ofSeconds(2)) + .setConnectionRequestTimeout(Timeout.ofSeconds(2)) + .setResponseTimeout(Timeout.ofSeconds(4)) + .build()) + .setUserAgent("cluster-controller-reindexing-client") + .build(); + + } + +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java index b924e07ff47..0c7d60c5313 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java @@ -255,7 +255,7 @@ public class ApplicationHandler extends HttpHandler { return new ReindexingResponse(tenant.getApplicationRepo().database() .readReindexingStatus(applicationId) .orElseThrow(() -> new NotFoundException("Reindexing status not found for " + applicationId)), - Map.of()); // TODO jonmv/bjorncs: Get status of each cluster and fill in here. + applicationRepository.getClusterReindexingStatus(applicationId)); } private HttpResponse restart(HttpRequest request, ApplicationId applicationId) { @@ -481,21 +481,11 @@ public class ApplicationHandler extends HttpHandler { private static void setStatus(Cursor object, ClusterReindexing.Status status) { object.setLong("startedMillis", status.startedAt().toEpochMilli()); status.endedAt().ifPresent(endedAt -> object.setLong("endedMillis", endedAt.toEpochMilli())); - status.state().map(ReindexingResponse::toString).ifPresent(state -> object.setString("state", state)); + status.state().map(ClusterReindexing.State::asString).ifPresent(state -> object.setString("state", state)); status.message().ifPresent(message -> object.setString("message", message)); status.progress().ifPresent(progress -> object.setString("progress", progress)); } - static String toString(ClusterReindexing.State state) { - switch (state) { - case PENDING: return "pending"; - case RUNNING: return "running"; - case FAILED: return "failed"; - case SUCCESSFUL: return "successful"; - default: throw new IllegalArgumentException("Unexpected state '" + state + "'"); - } - } - } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/security/MultiTenantRpcAuthorizer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/security/MultiTenantRpcAuthorizer.java index 54da80a0299..49a8df3d0e4 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/security/MultiTenantRpcAuthorizer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/security/MultiTenantRpcAuthorizer.java @@ -11,7 +11,6 @@ import com.yahoo.config.provision.security.NodeIdentifierException; import com.yahoo.config.provision.security.NodeIdentity; import com.yahoo.jrt.Request; import com.yahoo.jrt.SecurityContext; -import java.util.logging.Level; import com.yahoo.security.tls.MixedMode; import com.yahoo.security.tls.TransportSecurityUtils; import com.yahoo.vespa.config.ConfigKey; @@ -29,9 +28,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.function.BiConsumer; +import java.util.logging.Level; import java.util.logging.Logger; -import static com.yahoo.vespa.config.server.rpc.security.AuthorizationException.*; +import static com.yahoo.vespa.config.server.rpc.security.AuthorizationException.Type; +import static com.yahoo.yolean.Exceptions.throwUnchecked; /** @@ -206,11 +207,6 @@ public class MultiTenantRpcAuthorizer implements RpcAuthorizer { .orElseThrow(() -> new AuthorizationException(String.format("No handler exists for tenant '%s'", tenantName.value()))); } - @SuppressWarnings("unchecked") - private static <T extends Throwable> void throwUnchecked(Throwable t) throws T { - throw (T)t; - } - private enum JrtErrorCode { UNAUTHORIZED(1), AUTHORIZATION_FAILED(2); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java new file mode 100644 index 00000000000..21894e4a756 --- /dev/null +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/DefaultClusterReindexingStatusClientTest.java @@ -0,0 +1,101 @@ +package com.yahoo.vespa.config.server.application;// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.yahoo.config.model.api.HostInfo; +import com.yahoo.config.model.api.Model; +import com.yahoo.config.model.api.PortInfo; +import com.yahoo.config.model.api.ServiceInfo; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.vespa.config.server.modelfactory.ModelResult; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.okJson; +import static com.github.tomakehurst.wiremock.client.WireMock.serverError; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; +import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * @author bjorncs + */ +public class DefaultClusterReindexingStatusClientTest { + + @Rule public final WireMockRule server1 = new WireMockRule(options().dynamicPort(), true); + @Rule public final WireMockRule server2 = new WireMockRule(options().dynamicPort(), true); + @Rule public final WireMockRule server3 = new WireMockRule(options().dynamicPort(), true); + + @Test + public void combines_result_from_multiple_cluster_controller_clusters() throws IOException { + var client = new DefaultClusterReindexingStatusClient(); + MockApplication app = new MockApplication(); + String uriPath = "/reindexing/v1/status"; + server1.stubFor(get(urlEqualTo(uriPath)).willReturn(serverError())); + server2.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson( + "{\"status\":[{" + + "\"type\":\"music\"," + + "\"startedMillis\":0," + + "\"endedMillis\":123," + + "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + + "\"state\": \"" + ClusterReindexing.State.FAILED.asString() + "\"," + + "\"message\":\"something went wrong\"}" + + "]}"))); + server3.stubFor(get(urlEqualTo(uriPath)).willReturn(okJson( + "{\"status\":[{" + + "\"type\":\"artist\"," + + "\"startedMillis\":10," + + "\"endedMillis\":150," + + "\"progress\":\"" + new ProgressToken().serializeToString() + "\"," + + "\"state\": \"" + ClusterReindexing.State.SUCCESSFUL.asString() + "\"," + + "\"message\":\"successs\"}" + + "]}"))); + Map<String, ClusterReindexing> result = client.getReindexingStatus(app); + System.out.println(result); + } + + + private class MockApplication implements ModelResult { + private final Collection<HostInfo> hosts; + + MockApplication() { + this.hosts = createHosts(); + } + + @Override + public Model getModel() { + Model model = mock(Model.class); + when(model.getHosts()).thenReturn(hosts); + return model; + } + + private Collection<HostInfo> createHosts() { + return List.of( + createHostInfo(server1.port(), "cc1.1", "cluster1"), + createHostInfo(server2.port(), "cc1.2", "cluster1"), + createHostInfo(server3.port(), "cc2.1", "cluster2")); + } + + private HostInfo createHostInfo(int serverPort, String serviceName, String clusterId) { + return new HostInfo( + "localhost", + List.of(new ServiceInfo( + serviceName, + CLUSTERCONTROLLER_CONTAINER.serviceName, + List.of(new PortInfo(serverPort, List.of("state"))), + Map.of("clustername", clusterId), + "myconfigId", + "localhost"))); + } + + } + + +}
\ No newline at end of file diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java index 910c4b069e3..3ca21a46d12 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java @@ -440,7 +440,7 @@ public class ApplicationHandlerTest { @Test public void testClusterReindexingStateSerialization() { - Stream.of(ClusterReindexing.State.values()).forEach(ReindexingResponse::toString); + Stream.of(ClusterReindexing.State.values()).forEach(ClusterReindexing.State::toString); } @Test diff --git a/jdisc_http_service/pom.xml b/jdisc_http_service/pom.xml index 7333db96b91..094ca7baa25 100644 --- a/jdisc_http_service/pom.xml +++ b/jdisc_http_service/pom.xml @@ -78,6 +78,12 @@ <version>${project.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>yolean</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> <!-- TEST SCOPE --> <dependency> diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/Exceptions.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/Exceptions.java deleted file mode 100644 index 0806f352ae9..00000000000 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/Exceptions.java +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.jdisc.http.server.jetty; - -/** - * Utility methods for exceptions - * - * @author Tony Vaagenes - */ -public class Exceptions { - - /** - * Allows treating checked exceptions as unchecked. - * Usage: - * throw throwUnchecked(e); - * The reason for the return type is to allow writing throw at the call site - * instead of just calling throwUnchecked. Just calling throwUnchecked - * means that the java compiler won't know that the statement will throw an exception, - * and will therefore complain on things such e.g. missing return value. - */ - public static RuntimeException throwUnchecked(Throwable e) { - throwUncheckedImpl(e); - return null; - } - - @SuppressWarnings("unchecked") - private static <T extends Throwable> void throwUncheckedImpl(Throwable t) throws T { - throw (T)t; - } - -} diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java index 940009e7520..84c47f5a342 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/HttpRequestDispatch.java @@ -35,8 +35,8 @@ import java.util.logging.Logger; import static com.yahoo.jdisc.http.HttpHeaders.Values.APPLICATION_X_WWW_FORM_URLENCODED; import static com.yahoo.jdisc.http.core.HttpServletRequestUtils.getConnection; -import static com.yahoo.jdisc.http.server.jetty.Exceptions.throwUnchecked; import static com.yahoo.jdisc.http.server.jetty.JDiscHttpServlet.getConnector; +import static com.yahoo.yolean.Exceptions.throwUnchecked; /** * @author Simon Thoresen Hult diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscFilterInvokerFilter.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscFilterInvokerFilter.java index e4dbccf1bcb..a89c115a1c2 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscFilterInvokerFilter.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscFilterInvokerFilter.java @@ -26,8 +26,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; -import static com.yahoo.jdisc.http.server.jetty.Exceptions.throwUnchecked; import static com.yahoo.jdisc.http.server.jetty.JDiscHttpServlet.getConnector; +import static com.yahoo.yolean.Exceptions.throwUnchecked; /** * Runs JDisc security filters for Servlets diff --git a/parent/pom.xml b/parent/pom.xml index 9f975e06d45..6a8942230a1 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -701,6 +701,11 @@ </dependency> <dependency> <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <version>${junit.version}</version> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>${junit.version}</version> </dependency> diff --git a/vespajlib/pom.xml b/vespajlib/pom.xml index 302a3c6f5bf..68639d30ab2 100644 --- a/vespajlib/pom.xml +++ b/vespajlib/pom.xml @@ -78,6 +78,11 @@ <artifactId>jackson-databind</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/CompletableFutures.java b/vespajlib/src/main/java/com/yahoo/concurrent/CompletableFutures.java new file mode 100644 index 00000000000..b1fa6a9438d --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/CompletableFutures.java @@ -0,0 +1,67 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Helper for {@link java.util.concurrent.CompletableFuture} / {@link java.util.concurrent.CompletionStage}. + * + * @author bjorncs + */ +public class CompletableFutures { + + private CompletableFutures() {} + + /** + * Returns a new completable future that is either + * - completed when any of the provided futures complete without exception + * - completed exceptionally once all provided futures complete exceptionally + */ + public static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures) { + class Combiner { + final Object monitor = new Object(); + final CompletableFuture<T> combined = new CompletableFuture<>(); + final int futuresCount; + + Throwable error = null; + int exceptionCount = 0; + + Combiner(int futuresCount) { this.futuresCount = futuresCount; } + + void onCompletion(T value, Throwable error) { + if (combined.isDone()) return; + T valueToComplete = null; + Throwable exceptionToComplete = null; + + synchronized (monitor) { + if (value != null) { + valueToComplete = value; + } else { + if (this.error == null) { + this.error = error; + } else { + this.error.addSuppressed(error); + } + if (++exceptionCount == futuresCount) { + exceptionToComplete = this.error; + } + } + } + if (valueToComplete != null) { + combined.complete(value); + } else if (exceptionToComplete != null) { + combined.completeExceptionally(exceptionToComplete); + } + } + } + + int size = futures.size(); + if (size == 0) throw new IllegalArgumentException(); + if (size == 1) return futures.get(0); + Combiner combiner = new Combiner(size); + futures.forEach(future -> future.whenComplete(combiner::onCompletion)); + return combiner.combined; + } + +} diff --git a/vespajlib/src/test/java/com/yahoo/concurrent/CompletableFuturesTest.java b/vespajlib/src/test/java/com/yahoo/concurrent/CompletableFuturesTest.java new file mode 100644 index 00000000000..cf9c36537d9 --- /dev/null +++ b/vespajlib/src/test/java/com/yahoo/concurrent/CompletableFuturesTest.java @@ -0,0 +1,66 @@ +package com.yahoo.concurrent;// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * @author bjorncs + */ +class CompletableFuturesTest { + + @Test + public void firstof_completes_when_first_futures_has_completed() { + CompletableFuture<String> f1 = new CompletableFuture<>(); + CompletableFuture<String> f2 = new CompletableFuture<>(); + CompletableFuture<String> f3 = new CompletableFuture<>(); + CompletableFuture<String> result = CompletableFutures.firstOf(List.of(f1, f2, f3)); + f1.complete("success"); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + assertEquals("success", result.join()); + } + + @Test + public void firstof_completes_if_any_futures_completes() { + CompletableFuture<String> f1 = new CompletableFuture<>(); + CompletableFuture<String> f2 = new CompletableFuture<>(); + CompletableFuture<String> f3 = new CompletableFuture<>(); + CompletableFuture<String> result = CompletableFutures.firstOf(List.of(f1, f2, f3)); + f1.completeExceptionally(new Throwable("t1")); + f2.completeExceptionally(new Throwable("t2")); + f3.complete("success"); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + assertEquals("success", result.join()); + } + + @Test + public void firstof_completes_exceptionally_when_all_futures_have_complete_exceptionally() { + CompletableFuture<String> f1 = new CompletableFuture<>(); + CompletableFuture<String> f2 = new CompletableFuture<>(); + CompletableFuture<String> f3 = new CompletableFuture<>(); + CompletableFuture<String> result = CompletableFutures.firstOf(List.of(f1, f2, f3)); + f1.completeExceptionally(new Throwable("t1")); + f2.completeExceptionally(new Throwable("t2")); + f3.completeExceptionally(new Throwable("t3")); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + try { + result.join(); + fail("Exception expected"); + } catch (CompletionException e) { + Throwable cause = e.getCause(); + assertEquals("t1", cause.getMessage()); + assertEquals(2, cause.getSuppressed().length); + } + } + +}
\ No newline at end of file diff --git a/yolean/abi-spec.json b/yolean/abi-spec.json index 4b68b2527b8..82bf59ebf87 100644 --- a/yolean/abi-spec.json +++ b/yolean/abi-spec.json @@ -40,7 +40,8 @@ "public static void uncheckAndIgnore(com.yahoo.yolean.Exceptions$RunnableThrowingIOException, java.lang.Class)", "public static java.lang.Object uncheck(com.yahoo.yolean.Exceptions$SupplierThrowingIOException)", "public static varargs java.lang.Object uncheck(com.yahoo.yolean.Exceptions$SupplierThrowingIOException, java.lang.String, java.lang.String[])", - "public static java.lang.Object uncheckAndIgnore(com.yahoo.yolean.Exceptions$SupplierThrowingIOException, java.lang.Class)" + "public static java.lang.Object uncheckAndIgnore(com.yahoo.yolean.Exceptions$SupplierThrowingIOException, java.lang.Class)", + "public static java.lang.RuntimeException throwUnchecked(java.lang.Throwable)" ], "fields": [] }, diff --git a/yolean/src/main/java/com/yahoo/yolean/Exceptions.java b/yolean/src/main/java/com/yahoo/yolean/Exceptions.java index 063ba70c75d..c377ee3ac37 100644 --- a/yolean/src/main/java/com/yahoo/yolean/Exceptions.java +++ b/yolean/src/main/java/com/yahoo/yolean/Exceptions.java @@ -160,4 +160,25 @@ public class Exceptions { public interface SupplierThrowingIOException<T> { T get() throws IOException; } + + /** + * Allows treating checked exceptions as unchecked. + * Usage: + * throw throwUnchecked(e); + * The reason for the return type is to allow writing throw at the call site + * instead of just calling throwUnchecked. Just calling throwUnchecked + * means that the java compiler won't know that the statement will throw an exception, + * and will therefore complain on things such e.g. missing return value. + */ + public static RuntimeException throwUnchecked(Throwable e) { + throwUncheckedImpl(e); + return new RuntimeException(); // Non-null return value to stop tooling from complaining about potential NPE + } + + @SuppressWarnings("unchecked") + private static <T extends Throwable> void throwUncheckedImpl(Throwable t) throws T { + throw (T)t; + } + + } |