summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java1
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java21
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java3
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/athenz/ZmsClientMock.java2
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Role.java5
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java3
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzFacade.java11
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/filter/AthenzRoleFilter.java37
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/filter/ControllerAuthorizationFilterTest.java13
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/Flags.java9
-rw-r--r--vespa-feed-client-api/abi-spec.json15
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java19
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Helper.java42
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/MultiFeedException.java38
-rw-r--r--vespa-feed-client-api/src/test/java/ai/vespa/feed/client/FeedClientTest.java105
15 files changed, 312 insertions, 12 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
index 6c70af8cbca..9fef9b4615d 100644
--- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
+++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
@@ -113,6 +113,7 @@ public interface ModelContext {
@ModelFeatureFlag(owners = {"baldersheim", "geirst", "toregge"}) default int maxCompactBuffers() { return 1; }
@ModelFeatureFlag(owners = {"hmusum"}) default boolean failDeploymentWithInvalidJvmOptions() { return false; }
@ModelFeatureFlag(owners = {"baldersheim"}) default double tlsSizeFraction() { throw new UnsupportedOperationException("TODO specify default value"); }
+ @ModelFeatureFlag(owners = {"bjorncs"}) default boolean enableServerOcspStapling() { return false; }
}
/** Warning: As elsewhere in this package, do not make backwards incompatible changes that will break old config models! */
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
index 9ad257fad04..7c386875d02 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
@@ -23,6 +23,7 @@ public final class ApplicationContainer extends Container implements
private static final String defaultHostedJVMArgs = "-XX:+SuppressFatalErrorMessage";
private final boolean isHostedVespa;
+ private final boolean enableServerOcspStapling;
public ApplicationContainer(AbstractConfigProducer<?> parent, String name, int index, DeployState deployState) {
this(parent, name, false, index, deployState);
@@ -31,6 +32,7 @@ public final class ApplicationContainer extends Container implements
public ApplicationContainer(AbstractConfigProducer<?> parent, String name, boolean retired, int index, DeployState deployState) {
super(parent, name, retired, index, deployState);
this.isHostedVespa = deployState.isHosted();
+ this.enableServerOcspStapling = deployState.featureFlags().enableServerOcspStapling();
addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.NetworkMultiplexerHolder"));
addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.NetworkMultiplexerProvider"));
@@ -64,10 +66,23 @@ public final class ApplicationContainer extends Container implements
/** Returns the jvm arguments this should start with */
@Override
public String getJvmOptions() {
+ StringBuilder b = new StringBuilder();
+ if (isHostedVespa) {
+ if (hasDocproc()) {
+ b.append(ApplicationContainer.defaultHostedJVMArgs).append(' ');
+ }
+ if (enableServerOcspStapling) {
+ b.append("-Djdk.tls.server.enableStatusRequestExtension=true ")
+ .append("-Djdk.tls.stapling.responseTimeout=2000 ")
+ .append("-Djdk.tls.stapling.cacheSize=256 ")
+ .append("-Djdk.tls.stapling.cacheLifetime=3600 ");
+ }
+ }
String jvmArgs = super.getJvmOptions();
- return isHostedVespa && hasDocproc()
- ? ("".equals(jvmArgs) ? defaultHostedJVMArgs : defaultHostedJVMArgs + " " + jvmArgs)
- : jvmArgs;
+ if (!jvmArgs.isBlank()) {
+ b.append(jvmArgs.trim());
+ }
+ return b.toString().trim();
}
private boolean hasDocproc() {
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
index 063603fe8a8..d69fe4fba89 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
@@ -205,6 +205,7 @@ public class ModelContextImpl implements ModelContext {
private final int maxCompactBuffers;
private final boolean failDeploymentWithInvalidJvmOptions;
private final double tlsSizeFraction;
+ private final boolean enableServerOcspStapling;
public FeatureFlags(FlagSource source, ApplicationId appId) {
this.defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT);
@@ -248,6 +249,7 @@ public class ModelContextImpl implements ModelContext {
this.maxCompactBuffers = flagValue(source, appId, Flags.MAX_COMPACT_BUFFERS);
this.failDeploymentWithInvalidJvmOptions = flagValue(source, appId, Flags.FAIL_DEPLOYMENT_WITH_INVALID_JVM_OPTIONS);
this.tlsSizeFraction = flagValue(source, appId, Flags.TLS_SIZE_FRACTION);
+ this.enableServerOcspStapling = flagValue(source, appId, Flags.ENABLE_SERVER_OCSP_STAPLING);
}
@Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; }
@@ -293,6 +295,7 @@ public class ModelContextImpl implements ModelContext {
@Override public boolean failDeploymentWithInvalidJvmOptions() { return failDeploymentWithInvalidJvmOptions; }
@Override public int maxCompactBuffers() { return maxCompactBuffers; }
@Override public double tlsSizeFraction() { return tlsSizeFraction; }
+ @Override public boolean enableServerOcspStapling() { return enableServerOcspStapling; }
private static <V> V flagValue(FlagSource source, ApplicationId appId, UnboundFlag<? extends V, ?, ?> flag) {
return flag.bindTo(source)
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/athenz/ZmsClientMock.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/athenz/ZmsClientMock.java
index 4679f660319..9fb6fa1501b 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/athenz/ZmsClientMock.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/athenz/ZmsClientMock.java
@@ -40,7 +40,7 @@ public class ZmsClientMock implements ZmsClient {
private final AthenzDbMock athenz;
private final AthenzIdentity controllerIdentity;
private static final Pattern TENANT_RESOURCE_PATTERN = Pattern.compile("service\\.hosting\\.tenant\\.(?<tenantDomain>[\\w\\-_]+)\\..*");
- private static final Pattern APPLICATION_RESOURCE_PATTERN = Pattern.compile("service\\.hosting\\.tenant\\.[\\w\\-_]+\\.res_group\\.(?<resourceGroup>[\\w\\-_]+)\\.wildcard");
+ private static final Pattern APPLICATION_RESOURCE_PATTERN = Pattern.compile("service\\.hosting\\.tenant\\.[\\w\\-_]+\\.res_group\\.(?<resourceGroup>[\\w\\-_]+)\\.(?<environment>[.*]+)");
public ZmsClientMock(AthenzDbMock athenz, AthenzIdentity controllerIdentity) {
this.athenz = athenz;
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Role.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Role.java
index 5cdd12ecb1c..c40c2d4db01 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Role.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Role.java
@@ -52,6 +52,11 @@ public abstract class Role {
return new TenantRole(RoleDefinition.developer, tenant);
}
+ /** Returns a {@link RoleDefinition#hostedDeveloper} for the current system and given tenant. */
+ public static TenantRole hostedDeveloper(TenantName tenant) {
+ return new TenantRole(RoleDefinition.hostedDeveloper, tenant);
+ }
+
/** Returns a {@link RoleDefinition#administrator} for the current system and given tenant. */
public static TenantRole administrator(TenantName tenant) {
return new TenantRole(RoleDefinition.administrator, tenant);
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java
index eeb3bae4431..aed5c08f0db 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java
@@ -60,6 +60,9 @@ public enum RoleDefinition {
Policy.billingInformationRead,
Policy.secretStoreOperations),
+ /** Developer for manual deployments for a tenant */
+ hostedDeveloper(Policy.developmentDeployment),
+
/** Admin — the administrative function for user management etc. */
administrator(Policy.tenantUpdate,
Policy.tenantManager,
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzFacade.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzFacade.java
index d116ef3333c..28cf132af90 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzFacade.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/AthenzFacade.java
@@ -5,7 +5,9 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.inject.Inject;
import com.yahoo.config.provision.ApplicationName;
+import com.yahoo.config.provision.Environment;
import com.yahoo.config.provision.TenantName;
+import com.yahoo.config.provision.Zone;
import com.yahoo.text.Text;
import com.yahoo.vespa.athenz.api.AthenzDomain;
import com.yahoo.vespa.athenz.api.AthenzIdentity;
@@ -248,9 +250,9 @@ public class AthenzFacade implements AccessControl {
}
public boolean hasApplicationAccess(
- AthenzIdentity identity, ApplicationAction action, AthenzDomain tenantDomain, ApplicationName applicationName) {
+ AthenzIdentity identity, ApplicationAction action, AthenzDomain tenantDomain, ApplicationName applicationName, Optional<Zone> zone) {
return hasAccess(
- action.name(), applicationResourceString(tenantDomain, applicationName), identity);
+ action.name(), applicationResourceString(tenantDomain, applicationName, zone), identity);
}
public boolean hasTenantAdminAccess(AthenzIdentity identity, AthenzDomain tenantDomain) {
@@ -325,8 +327,9 @@ public class AthenzFacade implements AccessControl {
return resourceStringPrefix(tenantDomain) + ".wildcard";
}
- private String applicationResourceString(AthenzDomain tenantDomain, ApplicationName applicationName) {
- return resourceStringPrefix(tenantDomain) + "." + "res_group" + "." + applicationName.value() + ".wildcard";
+ private String applicationResourceString(AthenzDomain tenantDomain, ApplicationName applicationName, Optional<Zone> zone) {
+ String environment = zone.map(Zone::environment).map(Environment::value).orElse("*");
+ return resourceStringPrefix(tenantDomain) + "." + "res_group" + "." + applicationName.value() + "." + environment;
}
private enum TenantAction {
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/filter/AthenzRoleFilter.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/filter/AthenzRoleFilter.java
index c685390c7ed..7ab3b75a758 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/filter/AthenzRoleFilter.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/filter/AthenzRoleFilter.java
@@ -4,7 +4,10 @@ package com.yahoo.vespa.hosted.controller.restapi.filter;
import com.auth0.jwt.JWT;
import com.google.inject.Inject;
import com.yahoo.config.provision.ApplicationName;
+import com.yahoo.config.provision.Environment;
+import com.yahoo.config.provision.RegionName;
import com.yahoo.config.provision.TenantName;
+import com.yahoo.config.provision.Zone;
import com.yahoo.jdisc.http.filter.DiscFilterRequest;
import com.yahoo.jdisc.http.filter.security.base.JsonSecurityRequestFilterBase;
@@ -16,6 +19,7 @@ import com.yahoo.restapi.Path;
import com.yahoo.vespa.athenz.api.AthenzDomain;
import com.yahoo.vespa.athenz.api.AthenzIdentity;
import com.yahoo.vespa.athenz.api.AthenzPrincipal;
+import com.yahoo.vespa.athenz.api.AthenzUser;
import com.yahoo.vespa.athenz.client.zms.ZmsClientException;
import com.yahoo.vespa.hosted.controller.Controller;
import com.yahoo.vespa.hosted.controller.TenantController;
@@ -94,6 +98,15 @@ public class AthenzRoleFilter extends JsonSecurityRequestFilterBase {
path.matches("/application/v4/tenant/{tenant}/application/{application}/{*}");
Optional<ApplicationName> application = Optional.ofNullable(path.get("application")).map(ApplicationName::from);
+ final Optional<Zone> zone;
+ if(path.matches("/application/v4/tenant/{tenant}/application/{application}/{*}/instance/{*}/environment/{environment}/region/{region}/{*}")) {
+ zone = Optional.of(new Zone(Environment.from(path.get("environment")), RegionName.from(path.get("region"))));
+ } else if(path.matches("/application/v4/tenant/{tenant}/application/{application}/{*}/environment/{environment}/region/{region}/{*}")) {
+ zone = Optional.of(new Zone(Environment.from(path.get("environment")), RegionName.from(path.get("region"))));
+ } else {
+ zone = Optional.empty();
+ }
+
AthenzIdentity identity = principal.getIdentity();
Set<Role> roleMemberships = new CopyOnWriteArraySet<>();
@@ -121,10 +134,18 @@ public class AthenzRoleFilter extends JsonSecurityRequestFilterBase {
&& ! tenant.get().name().value().equals("sandbox"))
futures.add(executor.submit(() -> {
if ( tenant.get().type() == Tenant.Type.athenz
- && hasDeployerAccess(identity, ((AthenzTenant) tenant.get()).domain(), application.get()))
+ && hasDeployerAccess(identity, ((AthenzTenant) tenant.get()).domain(), application.get(), zone))
roleMemberships.add(Role.buildService(tenant.get().name(), application.get()));
}));
+ if (identity instanceof AthenzUser && zone.isPresent()) {
+ Zone z = zone.get();
+ futures.add(executor.submit(() -> {
+ if (canDeployToManualZones(identity, ((AthenzTenant) tenant.get()).domain(), application.get(), z))
+ roleMemberships.add(Role.hostedDeveloper(tenant.get().name()));
+ }));
+ }
+
futures.add(executor.submit(() -> {
if (athenz.hasSystemFlagsAccess(identity, /*dryrun*/false))
roleMemberships.add(Role.systemFlagsDeployer());
@@ -167,12 +188,22 @@ public class AthenzRoleFilter extends JsonSecurityRequestFilterBase {
}
}
- private boolean hasDeployerAccess(AthenzIdentity identity, AthenzDomain tenantDomain, ApplicationName application) {
+ private boolean hasDeployerAccess(AthenzIdentity identity, AthenzDomain tenantDomain, ApplicationName application, Optional<Zone> zone) {
try {
return athenz.hasApplicationAccess(identity,
ApplicationAction.deploy,
tenantDomain,
- application);
+ application,
+ zone);
+ } catch (ZmsClientException e) {
+ throw new RuntimeException("Failed to authorize operation: (" + e.getMessage() + ")", e);
+ }
+ }
+
+ private boolean canDeployToManualZones(AthenzIdentity identity, AthenzDomain tenantDomain, ApplicationName application, Zone zone) {
+ if (! zone.environment().isManuallyDeployed()) return false;
+ try {
+ return athenz.hasApplicationAccess(identity, ApplicationAction.deploy, tenantDomain, application, Optional.of(zone));
} catch (ZmsClientException e) {
throw new RuntimeException("Failed to authorize operation: (" + e.getMessage() + ")", e);
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/filter/ControllerAuthorizationFilterTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/filter/ControllerAuthorizationFilterTest.java
index 9e17b44c9a6..eab3a37a9c3 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/filter/ControllerAuthorizationFilterTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/filter/ControllerAuthorizationFilterTest.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.hosted.controller.restapi.filter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yahoo.application.container.handler.Request;
import com.yahoo.config.provision.SystemName;
+import com.yahoo.config.provision.TenantName;
import com.yahoo.jdisc.http.HttpRequest.Method;
import com.yahoo.jdisc.http.filter.DiscFilterRequest;
import com.yahoo.vespa.hosted.controller.ControllerTester;
@@ -75,6 +76,18 @@ public class ControllerAuthorizationFilterTest {
assertIsAllowed(invokeFilter(filter, createRequest(Method.GET, "/zone/v1/path", securityContext)));
}
+ @Test
+ public void hostedDeveloper() {
+ ControllerTester tester = new ControllerTester();
+ TenantName tenantName = TenantName.defaultName();
+ SecurityContext securityContext = new SecurityContext(() -> "user", Set.of(Role.hostedDeveloper(tenantName)));
+
+ ControllerAuthorizationFilter filter = createFilter(tester);
+ assertIsAllowed(invokeFilter(filter, createRequest(Method.POST, "/application/v4/tenant/" + tenantName.value() + "/application/app/instance/default/environment/dev/region/region/deploy", securityContext)));
+ assertIsForbidden(invokeFilter(filter, createRequest(Method.POST, "/application/v4/tenant/" + tenantName.value() + "/application/app/instance/default/environment/prod/region/region/deploy", securityContext)));
+ assertIsForbidden(invokeFilter(filter, createRequest(Method.POST, "/application/v4/tenant/" + tenantName.value() + "/application/app/submit", securityContext)));
+ }
+
private static void assertIsAllowed(Optional<AuthorizationResponse> response) {
assertFalse("Expected no response from filter, but got \"" +
response.map(r -> r.message + "\" (" + r.statusCode + ")").orElse(""),
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index c3394c9dd76..5ef22580f28 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -303,7 +303,7 @@ public class Flags {
);
public static final UnboundBooleanFlag ENABLE_ROUTING_REUSE_PORT = defineFeatureFlag(
- "enable-routing-reuse-port", false,
+ "enable-routing-reuse-port", true,
List.of("mortent"), "2021-09-29", "2022-02-01",
"Enable reuse port in routing configuration",
"Takes effect on container restart",
@@ -400,6 +400,13 @@ public class Flags {
"Takes effect at redeployment",
ZONE_ID, APPLICATION_ID);
+ public static final UnboundBooleanFlag ENABLE_SERVER_OCSP_STAPLING = defineFeatureFlag(
+ "enable-server-ocsp-stapling", false,
+ List.of("bjorncs"), "2021-12-17", "2022-06-01",
+ "Enable server OCSP stapling for jdisc containers",
+ "Takes effect on redeployment",
+ ZONE_ID, APPLICATION_ID);
+
/** WARNING: public for testing: All flags should be defined in {@link Flags}. */
public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners,
String createdAt, String expiresAt, String description,
diff --git a/vespa-feed-client-api/abi-spec.json b/vespa-feed-client-api/abi-spec.json
index a9047365a7a..8af7798984f 100644
--- a/vespa-feed-client-api/abi-spec.json
+++ b/vespa-feed-client-api/abi-spec.json
@@ -103,6 +103,8 @@
"public abstract java.util.concurrent.CompletableFuture put(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)",
"public abstract java.util.concurrent.CompletableFuture update(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)",
"public abstract java.util.concurrent.CompletableFuture remove(ai.vespa.feed.client.DocumentId, ai.vespa.feed.client.OperationParameters)",
+ "public static java.util.List await(java.util.List)",
+ "public static varargs java.util.List await(java.util.concurrent.CompletableFuture[])",
"public abstract ai.vespa.feed.client.OperationStats stats()",
"public abstract ai.vespa.feed.client.FeedClient$CircuitBreaker$State circuitBreakerState()",
"public abstract void close(boolean)",
@@ -221,6 +223,19 @@
],
"fields": []
},
+ "ai.vespa.feed.client.MultiFeedException": {
+ "superClass": "java.lang.RuntimeException",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(java.util.Collection)",
+ "public java.util.Collection feedExceptions()",
+ "public java.util.Set documentIds()"
+ ],
+ "fields": []
+ },
"ai.vespa.feed.client.OperationParameters": {
"superClass": "java.lang.Object",
"interfaces": [],
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
index d463c611d6a..5e95990a078 100644
--- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -2,6 +2,7 @@
package ai.vespa.feed.client;
import java.io.Closeable;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
@@ -37,6 +38,24 @@ public interface FeedClient extends Closeable {
*/
CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params);
+ /**
+ * Waits for all feed operations to complete, either successfully or with exception.
+ * @throws MultiFeedException if any operation fails
+ * @return list of results with the same ordering as the {@code promises} parameter
+ * */
+ static List<Result> await(List<CompletableFuture<Result>> promises) throws MultiFeedException {
+ return Helper.await(promises);
+ }
+
+ /**
+ * Same as {@link #await(List)} except {@code promises} parameter is a vararg
+ * @see #await(List)
+ */
+ @SafeVarargs
+ static List<Result> await(CompletableFuture<Result>... promises) throws MultiFeedException {
+ return Helper.await(promises);
+ }
+
/** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */
OperationStats stats();
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Helper.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Helper.java
new file mode 100644
index 00000000000..59c12077bef
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Helper.java
@@ -0,0 +1,42 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+/**
+ * @author bjorncs
+ */
+class Helper {
+
+ private Helper() {}
+
+ @SafeVarargs
+ static List<Result> await(CompletableFuture<Result>... promises) throws MultiFeedException {
+ List<CompletableFuture<Result>> list = new ArrayList<>();
+ for (CompletableFuture<Result> p : promises) list.add(p);
+ return await(list);
+ }
+
+ static List<Result> await(List<CompletableFuture<Result>> promises) throws MultiFeedException {
+ try {
+ CompletableFuture.allOf(promises.toArray(new CompletableFuture<?>[0])).join();
+ return promises.stream()
+ .map(p -> Objects.requireNonNull(p.getNow(null)))
+ .collect(Collectors.toList());
+ } catch (CompletionException e) {
+ List<FeedException> exceptions = new ArrayList<>();
+ for (CompletableFuture<Result> promise : promises) {
+ if (promise.isCompletedExceptionally()) {
+ // Lambda is executed on this thread since the future is already completed
+ promise.whenComplete((__, error) -> exceptions.add((FeedException) error));
+ }
+ }
+ throw new MultiFeedException(exceptions);
+ }
+ }
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/MultiFeedException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/MultiFeedException.java
new file mode 100644
index 00000000000..5db687b49ff
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/MultiFeedException.java
@@ -0,0 +1,38 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Aggregates multiple instances of {@link FeedException}
+ *
+ * @author bjorncs
+ */
+public class MultiFeedException extends RuntimeException {
+
+ private final List<FeedException> exceptions;
+
+ public MultiFeedException(Collection<FeedException> exceptions) {
+ super(toMessage(exceptions));
+ this.exceptions = Collections.unmodifiableList(new ArrayList<>(exceptions));
+ }
+
+ public Collection<FeedException> feedExceptions() { return exceptions; }
+
+ public Set<DocumentId> documentIds() {
+ return exceptions.stream()
+ .filter(e -> e.documentId().isPresent())
+ .map(e -> e.documentId().get())
+ .collect(Collectors.toSet());
+ }
+
+ private static String toMessage(Collection<FeedException> exceptions) {
+ return String.format("%d feed operations failed", exceptions.size());
+ }
+
+}
diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/FeedClientTest.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/FeedClientTest.java
new file mode 100644
index 00000000000..688f311bb05
--- /dev/null
+++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/FeedClientTest.java
@@ -0,0 +1,105 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * @author bjorncs
+ */
+class FeedClientTest {
+
+ private ExecutorService executor;
+
+ @BeforeEach
+ void setUp() {
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @AfterEach
+ void tearDown() throws InterruptedException {
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(60, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void await_returns_list_of_result_on_success() {
+ MyResult r1 = new MyResult();
+ CompletableFuture<Result> f1 = CompletableFuture.completedFuture(r1);
+ MyResult r2 = new MyResult();
+ CompletableFuture<Result> f2 = CompletableFuture.completedFuture(r2);
+ MyResult r3 = new MyResult();
+ CompletableFuture<Result> f3 = CompletableFuture.completedFuture(r3);
+
+ List<Result> aggregated = FeedClient.await(f1, f2, f3);
+ assertEquals(3, aggregated.size());
+ assertEquals(r1, aggregated.get(0));
+ assertEquals(r2, aggregated.get(1));
+ assertEquals(r3, aggregated.get(2));
+ }
+
+ @Test
+ void await_handles_async_completion_with_success() throws ExecutionException, InterruptedException {
+ CompletableFuture<Result> f1 = new CompletableFuture<>();
+ CompletableFuture<Result> f2 = new CompletableFuture<>();
+ CompletableFuture<Result> f3 = new CompletableFuture<>();
+
+ CompletableFuture<List<Result>> awaitPromise = CompletableFuture.supplyAsync(() -> FeedClient.await(f1, f2, f3), executor);
+ // Completed in reverse order
+ MyResult r3 = new MyResult();
+ f3.complete(r3);
+ MyResult r2 = new MyResult();
+ f2.complete(r2);
+ MyResult r1 = new MyResult();
+ f1.complete(r1);
+
+ List<Result> aggregated = awaitPromise.get();
+ assertEquals(3, aggregated.size());
+ assertEquals(r1, aggregated.get(0));
+ assertEquals(r2, aggregated.get(1));
+ assertEquals(r3, aggregated.get(2));
+ }
+
+ @Test
+ void await_throws_when_some_results_completes_exceptionally() {
+ CompletableFuture<Result> f1 = new CompletableFuture<>();
+ DocumentId docId1 = DocumentId.of("music", "music", "doc1");
+ FeedException exceptionDoc1 = new FeedException(docId1, "Doc1 failed");
+ f1.completeExceptionally(exceptionDoc1);
+ CompletableFuture<Result> f2 = new CompletableFuture<>();
+ DocumentId docId2 = DocumentId.of("music", "music", "doc2");
+ FeedException exceptionDoc2 = new FeedException(docId2, "Doc2 failed");
+ f2.completeExceptionally(exceptionDoc2);
+ CompletableFuture<Result> f3 = CompletableFuture.completedFuture(new MyResult());
+
+ MultiFeedException multiException = assertThrows(MultiFeedException.class, () -> FeedClient.await(f1, f2, f3));
+ Set<DocumentId> expectedDocsIds = new HashSet<>(Arrays.asList(docId1, docId2));
+ assertEquals(expectedDocsIds, new HashSet<>(multiException.documentIds()));
+ Set<FeedException> expectedExceptions = new HashSet<>(Arrays.asList(exceptionDoc1, exceptionDoc2));
+ assertEquals(expectedExceptions, new HashSet<>(multiException.feedExceptions()));
+ assertEquals("2 feed operations failed", multiException.getMessage());
+ }
+
+ static class MyResult implements Result {
+ @Override public Type type() { return null; }
+ @Override public DocumentId documentId() { return null; }
+ @Override public Optional<String> resultMessage() { return Optional.empty(); }
+ @Override public Optional<String> traceMessage() { return Optional.empty(); }
+ }
+}