diff options
40 files changed, 516 insertions, 215 deletions
diff --git a/cloud-tenant-base-dependencies-enforcer/pom.xml b/cloud-tenant-base-dependencies-enforcer/pom.xml index 636dbf91a57..bcff251f2c2 100644 --- a/cloud-tenant-base-dependencies-enforcer/pom.xml +++ b/cloud-tenant-base-dependencies-enforcer/pom.xml @@ -22,7 +22,7 @@ <aopalliance.version>1.0</aopalliance.version> <athenz.version>1.10.14</athenz.version> <bouncycastle.version>1.68</bouncycastle.version> - <felix.version>6.0.3</felix.version> + <felix.version>7.0.1</felix.version> <felix.log.version>1.0.1</felix.log.version> <findbugs.version>1.3.9</findbugs.version> <guava.version>20.0</guava.version> diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java index 2c51aa89c66..8c2957502a8 100644 --- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java @@ -103,6 +103,7 @@ public interface ModelContext { @ModelFeatureFlag(owners = {"geirst", "vekterli"}) default int distributorMergeBusyWait() { return 10; } @ModelFeatureFlag(owners = {"vekterli", "geirst"}) default boolean distributorEnhancedMaintenanceScheduling() { return false; } @ModelFeatureFlag(owners = {"arnej"}) default boolean forwardIssuesAsErrors() { return true; } + @ModelFeatureFlag(owners = {"geirst", "vekterli"}) default boolean asyncApplyBucketDiff() { return false; } } /** Warning: As elsewhere in this package, do not make backwards incompatible changes that will break old config models! */ diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java index 7a09a36c1d7..b7506587102 100644 --- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java +++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java @@ -70,6 +70,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea private int maxUnCommittedMemory = 123456; private double diskBloatFactor = 0.2; private boolean distributorEnhancedMaintenanceScheduling = false; + private boolean asyncApplyBucketDiff = false; @Override public ModelContext.FeatureFlags featureFlags() { return this; } @Override public boolean multitenant() { return multitenant; } @@ -120,6 +121,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea @Override public int docstoreCompressionLevel() { return docstoreCompressionLevel; } @Override public boolean distributorEnhancedMaintenanceScheduling() { return distributorEnhancedMaintenanceScheduling; } @Override public int maxUnCommittedMemory() { return maxUnCommittedMemory; } + @Override public boolean asyncApplyBucketDiff() { return asyncApplyBucketDiff; } public TestProperties maxUnCommittedMemory(int maxUnCommittedMemory) { this.maxUnCommittedMemory = maxUnCommittedMemory; @@ -307,6 +309,11 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea return this; } + public TestProperties setAsyncApplyBucketDiff(boolean value) { + asyncApplyBucketDiff = value; + return this; + } + public static class Spec implements ConfigServerSpec { private final String hostName; diff --git a/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java b/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java index 2eb8a6e319b..c86cda0bbd1 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java @@ -19,7 +19,6 @@ public class ConfigSentinel extends AbstractService implements SentinelConfig.Pr private final ApplicationId applicationId; private final Zone zone; - private final boolean requireConnectivityCheck; /** * Constructs a new ConfigSentinel for the given host. @@ -32,7 +31,6 @@ public class ConfigSentinel extends AbstractService implements SentinelConfig.Pr super(host, "sentinel"); this.applicationId = applicationId; this.zone = zone; - this.requireConnectivityCheck = featureFlags.requireConnectivityCheck(); portsMeta.on(0).tag("rpc").tag("admin"); portsMeta.on(1).tag("telnet").tag("interactive").tag("http").tag("state"); setProp("clustertype", "hosts"); @@ -80,16 +78,6 @@ public class ConfigSentinel extends AbstractService implements SentinelConfig.Pr builder.service(getServiceConfig(s)); } } - builder.connectivity(getConnectivityConfig(requireConnectivityCheck)); - } - - private SentinelConfig.Connectivity.Builder getConnectivityConfig(boolean enable) { - var builder = new SentinelConfig.Connectivity.Builder(); - if (! enable) { - builder.minOkPercent(0); - builder.maxBadCount(Integer.MAX_VALUE); - } - return builder; } private SentinelConfig.Application.Builder getApplicationConfig() { diff --git a/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java b/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java index 0177aa6b2e8..8534e1f65a7 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java @@ -23,24 +23,23 @@ public class ContainerDocumentApi { public static final String DOCUMENT_V1_PREFIX = "/document/v1"; public ContainerDocumentApi(ContainerCluster<?> cluster, Options options) { - var executor = new Threadpool("feedapi-handler", cluster, options.feedApiThreadpoolOptions); - cluster.addComponent(executor); - addRestApiHandler(cluster, options, executor); - addFeedHandler(cluster, options, executor); + addRestApiHandler(cluster, options); + addFeedHandler(cluster, options); } - private static void addFeedHandler(ContainerCluster<?> cluster, Options options, Threadpool executor) { + private static void addFeedHandler(ContainerCluster<?> cluster, Options options) { String bindingSuffix = ContainerCluster.RESERVED_URI_PREFIX + "/feedapi"; var handler = newVespaClientHandler("com.yahoo.vespa.http.server.FeedHandler", bindingSuffix, options); cluster.addComponent(handler); + var executor = new Threadpool("feedapi-handler", cluster, options.feedApiThreadpoolOptions); handler.inject(executor); + handler.addComponent(executor); } - private static void addRestApiHandler(ContainerCluster<?> cluster, Options options, Threadpool executor) { + private static void addRestApiHandler(ContainerCluster<?> cluster, Options options) { var handler = newVespaClientHandler("com.yahoo.document.restapi.resource.DocumentV1ApiHandler", DOCUMENT_V1_PREFIX + "/*", options); cluster.addComponent(handler); - handler.inject(executor); // We need to include a dummy implementation of the previous restapi handler (using the same class name). // The internal legacy test framework requires that the name of the old handler is listed in /ApplicationStatus. diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java index 5c692e8ef6b..d7f6fb6c581 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java @@ -47,6 +47,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer { private final int reponseNumThreads; private final StorFilestorConfig.Response_sequencer_type.Enum responseSequencerType; private final boolean useAsyncMessageHandlingOnSchedule; + private final boolean asyncApplyBucketDiff; private static StorFilestorConfig.Response_sequencer_type.Enum convertResponseSequencerType(String sequencerType) { try { @@ -62,6 +63,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer { this.reponseNumThreads = featureFlags.defaultNumResponseThreads(); this.responseSequencerType = convertResponseSequencerType(featureFlags.responseSequencerType()); useAsyncMessageHandlingOnSchedule = featureFlags.useAsyncMessageHandlingOnSchedule(); + asyncApplyBucketDiff = featureFlags.asyncApplyBucketDiff(); } @Override @@ -73,6 +75,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer { builder.num_response_threads(reponseNumThreads); builder.response_sequencer_type(responseSequencerType); builder.use_async_message_handling_on_schedule(useAsyncMessageHandlingOnSchedule); + builder.async_apply_bucket_diff(asyncApplyBucketDiff); } } diff --git a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerDocumentApiBuilderTest.java b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerDocumentApiBuilderTest.java index b52f797dae3..ed26dafe60c 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerDocumentApiBuilderTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerDocumentApiBuilderTest.java @@ -105,7 +105,7 @@ public class ContainerDocumentApiBuilderTest extends ContainerModelBuilderTestBa assertThat(injectedComponentIds, hasItem("threadpool@feedapi-handler")); ContainerThreadpoolConfig config = root.getConfig( - ContainerThreadpoolConfig.class, "cluster1/component/threadpool@feedapi-handler"); + ContainerThreadpoolConfig.class, "cluster1/component/com.yahoo.vespa.http.server.FeedHandler/threadpool@feedapi-handler"); assertEquals(-4, config.maxThreads()); assertEquals(-4, config.minThreads()); } @@ -128,7 +128,7 @@ public class ContainerDocumentApiBuilderTest extends ContainerModelBuilderTestBa createModel(root, elem); ContainerThreadpoolConfig feedThreadpoolConfig = root.getConfig( - ContainerThreadpoolConfig.class, "cluster1/component/threadpool@feedapi-handler"); + ContainerThreadpoolConfig.class, "cluster1/component/com.yahoo.vespa.http.server.FeedHandler/threadpool@feedapi-handler"); assertEquals(50, feedThreadpoolConfig.maxThreads()); assertEquals(25, feedThreadpoolConfig.minThreads()); assertEquals(1000, feedThreadpoolConfig.queueSize()); diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java index 9d8d7509966..739f8b7fff2 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java @@ -142,6 +142,16 @@ public class StorageClusterTest { return new StorServerConfig(builder); } + private StorFilestorConfig filestorConfigFromProducer(StorFilestorConfig.Producer producer) { + var builder = new StorFilestorConfig.Builder(); + producer.getConfig(builder); + return new StorFilestorConfig(builder); + } + + private StorFilestorConfig filestorConfigFromProperties(TestProperties properties) { + return filestorConfigFromProducer(parse(cluster("foo", ""), properties)); + } + @Test public void testMergeFeatureFlags() { var config = configFromProperties(new TestProperties().setMaxMergeQueueSize(1919).setMaxConcurrentMergesPerNode(37)); @@ -159,6 +169,15 @@ public class StorageClusterTest { } @Test + public void async_apply_bucket_diff_can_be_controlled_by_feature_flag() { + var config = filestorConfigFromProperties(new TestProperties()); + assertFalse(config.async_apply_bucket_diff()); + + config = filestorConfigFromProperties(new TestProperties().setAsyncApplyBucketDiff(true)); + assertTrue(config.async_apply_bucket_diff()); + } + + @Test public void testVisitors() { StorVisitorConfig.Builder builder = new StorVisitorConfig.Builder(); parse(cluster("bees", @@ -188,9 +207,7 @@ public class StorageClusterTest { ); { - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - stc.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(stc); assertEquals(7, config.num_threads()); assertFalse(config.enable_multibit_split_optimalization()); @@ -199,9 +216,7 @@ public class StorageClusterTest { { assertEquals(1, stc.getChildren().size()); StorageNode sn = stc.getChildren().values().iterator().next(); - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - sn.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(sn); assertEquals(7, config.num_threads()); } } @@ -215,9 +230,7 @@ public class StorageClusterTest { "</tuning>")), new Flavor(new FlavorsConfig.Flavor.Builder().name("test-flavor").minCpuCores(9).build()) ); - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - stc.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(stc); assertEquals(2, config.num_response_threads()); assertEquals(StorFilestorConfig.Response_sequencer_type.ADAPTIVE, config.response_sequencer_type()); assertEquals(7, config.num_threads()); @@ -238,9 +251,7 @@ public class StorageClusterTest { ); { - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - stc.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(stc); assertEquals(4, config.num_threads()); assertFalse(config.enable_multibit_split_optimalization()); @@ -248,9 +259,7 @@ public class StorageClusterTest { { assertEquals(1, stc.getChildren().size()); StorageNode sn = stc.getChildren().values().iterator().next(); - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - sn.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(sn); assertEquals(4, config.num_threads()); } } @@ -262,17 +271,13 @@ public class StorageClusterTest { ); { - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - stc.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(stc); assertEquals(8, config.num_threads()); } { assertEquals(1, stc.getChildren().size()); StorageNode sn = stc.getChildren().values().iterator().next(); - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - sn.getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(sn); assertEquals(9, config.num_threads()); } } @@ -285,17 +290,13 @@ public class StorageClusterTest { @Test public void testFeatureFlagControlOfResponseSequencer() { - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - simpleCluster(new TestProperties().setResponseNumThreads(13).setResponseSequencerType("THROUGHPUT")).getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(simpleCluster(new TestProperties().setResponseNumThreads(13).setResponseSequencerType("THROUGHPUT"))); assertEquals(13, config.num_response_threads()); assertEquals(StorFilestorConfig.Response_sequencer_type.THROUGHPUT, config.response_sequencer_type()); } private void verifyAsyncMessageHandlingOnSchedule(boolean expected, boolean value) { - StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder(); - simpleCluster(new TestProperties().setAsyncMessageHandlingOnSchedule(value)).getConfig(builder); - StorFilestorConfig config = new StorFilestorConfig(builder); + var config = filestorConfigFromProducer(simpleCluster(new TestProperties().setAsyncMessageHandlingOnSchedule(value))); assertEquals(expected, config.use_async_message_handling_on_schedule()); } @Test diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java index 2752bda3c68..7fdc18827f4 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java @@ -179,7 +179,6 @@ public class ModelContextImpl implements ModelContext { private final List<String> allowedAthenzProxyIdentities; private final int maxActivationInhibitedOutOfSyncGroups; private final ToIntFunction<ClusterSpec.Type> jvmOmitStackTraceInFastThrow; - private final boolean requireConnectivityCheck; private final int maxConcurrentMergesPerContentNode; private final int maxMergeQueueSize; private final boolean ignoreMergeQueueLimit; @@ -198,6 +197,7 @@ public class ModelContextImpl implements ModelContext { private final boolean distributorEnhancedMaintenanceScheduling; private final int maxUnCommittedMemory; private final boolean forwardIssuesAsErrors; + private final boolean asyncApplyBucketDiff; public FeatureFlags(FlagSource source, ApplicationId appId) { this.defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT); @@ -216,7 +216,6 @@ public class ModelContextImpl implements ModelContext { this.maxActivationInhibitedOutOfSyncGroups = flagValue(source, appId, Flags.MAX_ACTIVATION_INHIBITED_OUT_OF_SYNC_GROUPS); this.jvmOmitStackTraceInFastThrow = type -> flagValueAsInt(source, appId, type, PermanentFlags.JVM_OMIT_STACK_TRACE_IN_FAST_THROW); this.largeRankExpressionLimit = flagValue(source, appId, Flags.LARGE_RANK_EXPRESSION_LIMIT); - this.requireConnectivityCheck = flagValue(source, appId, Flags.REQUIRE_CONNECTIVITY_CHECK); this.maxConcurrentMergesPerContentNode = flagValue(source, appId, Flags.MAX_CONCURRENT_MERGES_PER_NODE); this.maxMergeQueueSize = flagValue(source, appId, Flags.MAX_MERGE_QUEUE_SIZE); this.ignoreMergeQueueLimit = flagValue(source, appId, Flags.IGNORE_MERGE_QUEUE_LIMIT); @@ -234,6 +233,7 @@ public class ModelContextImpl implements ModelContext { this.distributorEnhancedMaintenanceScheduling = flagValue(source, appId, Flags.DISTRIBUTOR_ENHANCED_MAINTENANCE_SCHEDULING); this.maxUnCommittedMemory = flagValue(source, appId, Flags.MAX_UNCOMMITTED_MEMORY);; this.forwardIssuesAsErrors = flagValue(source, appId, PermanentFlags.FORWARD_ISSUES_AS_ERRORS); + this.asyncApplyBucketDiff = flagValue(source, appId, Flags.ASYNC_APPLY_BUCKET_DIFF); } @Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; } @@ -254,7 +254,6 @@ public class ModelContextImpl implements ModelContext { return translateJvmOmitStackTraceInFastThrowIntToString(jvmOmitStackTraceInFastThrow, type); } @Override public int largeRankExpressionLimit() { return largeRankExpressionLimit; } - @Override public boolean requireConnectivityCheck() { return requireConnectivityCheck; } @Override public int maxConcurrentMergesPerNode() { return maxConcurrentMergesPerContentNode; } @Override public int maxMergeQueueSize() { return maxMergeQueueSize; } @Override public boolean ignoreMergeQueueLimit() { return ignoreMergeQueueLimit; } @@ -272,6 +271,7 @@ public class ModelContextImpl implements ModelContext { @Override public boolean distributorEnhancedMaintenanceScheduling() { return distributorEnhancedMaintenanceScheduling; } @Override public int maxUnCommittedMemory() { return maxUnCommittedMemory; } @Override public boolean forwardIssuesAsErrors() { return forwardIssuesAsErrors; } + @Override public boolean asyncApplyBucketDiff() { return asyncApplyBucketDiff; } private static <V> V flagValue(FlagSource source, ApplicationId appId, UnboundFlag<? extends V, ?, ?> flag) { return flag.bindTo(source) diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index 083cb535bfa..0aeea5ce2d5 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -433,7 +433,8 @@ public class SessionRepository { private Optional<RemoteSession> loadSessionIfActive(RemoteSession session) { for (ApplicationId applicationId : applicationRepo.activeApplications()) { - if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) { + Optional<Long> activeSession = applicationRepo.activeSessionOf(applicationId); + if (activeSession.isPresent() && activeSession.get() == session.getSessionId()) { log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it"); applicationRepo.activateApplication(ensureApplicationLoaded(session), session.getSessionId()); log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")"); diff --git a/container-dependency-versions/pom.xml b/container-dependency-versions/pom.xml index 27f7374b802..cc44ada80b4 100644 --- a/container-dependency-versions/pom.xml +++ b/container-dependency-versions/pom.xml @@ -403,8 +403,8 @@ <properties> <aopalliance.version>1.0</aopalliance.version> <bouncycastle.version>1.68</bouncycastle.version> - <felix.version>6.0.3</felix.version> - <felix.log.version>1.0.1</felix.log.version> + <felix.version>7.0.1</felix.version> + <felix.log.version>1.0.1</felix.log.version> <!-- TODO Vespa 8: upgrade! --> <findbugs.version>1.3.9</findbugs.version> <guava.version>20.0</guava.version> <guice.version>3.0</guice.version> diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/Endpoint.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/Endpoint.java index b441d40c1c3..3a35e3aec7a 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/Endpoint.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/Endpoint.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.hosted.controller.application; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ClusterSpec; +import com.yahoo.config.provision.InstanceName; import com.yahoo.config.provision.RegionName; import com.yahoo.config.provision.SystemName; import com.yahoo.config.provision.zone.RoutingMethod; @@ -13,13 +14,15 @@ import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId; import java.net.URI; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; /** - * Represents an application's endpoint in hosted Vespa. This encapsulates all logic for building URLs and DNS names for - * application endpoints. + * Represents an application or instance endpoint in hosted Vespa. + * + * This encapsulates the logic for building URLs and DNS names for applications in all hosted Vespa systems. * * @author mpolden */ @@ -50,7 +53,7 @@ public class Endpoint { if (scope == Scope.global) { if (id == null) throw new IllegalArgumentException("Endpoint ID must be set for global endpoints"); } else { - if (id != null) throw new IllegalArgumentException("Endpoint ID cannot be set for " + scope + " endpoints"); + if (scope == Scope.zone && id != null) throw new IllegalArgumentException("Endpoint ID cannot be set for " + scope + " endpoints"); if (zones.size() != 1) throw new IllegalArgumentException("A single zone must be given for " + scope + " endpoints"); } this.id = id; @@ -63,12 +66,14 @@ public class Endpoint { this.tls = port.tls; } - private Endpoint(EndpointId id, ClusterSpec.Id cluster, ApplicationId application, List<ZoneId> zones, Scope scope, SystemName system, - Port port, boolean legacy, RoutingMethod routingMethod) { + private Endpoint(EndpointId id, ClusterSpec.Id cluster, TenantAndApplicationId application, + Optional<InstanceName> instance, List<ZoneId> zones, Scope scope, SystemName system, Port port, + boolean legacy, RoutingMethod routingMethod) { this(id, cluster, createUrl(endpointOrClusterAsString(id, cluster), Objects.requireNonNull(application, "application must be non-null"), + Objects.requireNonNull(instance, "instance must be non-null"), zones, scope, Objects.requireNonNull(system, "system must be non-null"), @@ -164,20 +169,21 @@ public class Endpoint { return id == null ? cluster.value() : id.id(); } - private static URI createUrl(String name, ApplicationId application, List<ZoneId> zones, Scope scope, - SystemName system, Port port, boolean legacy, RoutingMethod routingMethod) { + private static URI createUrl(String name, TenantAndApplicationId application, Optional<InstanceName> instance, + List<ZoneId> zones, Scope scope, SystemName system, Port port, boolean legacy, + RoutingMethod routingMethod) { String scheme = port.tls ? "https" : "http"; String separator = separator(system, routingMethod, port.tls); String portPart = port.isDefault() ? "" : ":" + port.port; return URI.create(scheme + "://" + sanitize(namePart(name, separator)) + systemPart(system, separator, legacy) + - sanitize(instancePart(application, separator)) + + sanitize(instancePart(instance, separator)) + sanitize(application.application().value()) + separator + sanitize(application.tenant().value()) + "." + - scopePart(scope, zones, legacy, system) + + scopePart(scope, zones, system, legacy) + dnsSuffix(system, legacy) + portPart + "/"); @@ -199,26 +205,42 @@ public class Endpoint { return name + separator; } - private static String scopePart(Scope scope, List<ZoneId> zones, boolean legacy, SystemName system) { + private static String scopePart(Scope scope, List<ZoneId> zones, SystemName system, boolean legacy) { + String scopeSymbol = scopeSymbol(scope, system, legacy); + if (scope == Scope.global) return scopeSymbol; + + ZoneId zone = zones.get(0); + String region = zone.region().value(); + boolean skipEnvironment = zone.environment().isProduction() && (system.isPublic() || !legacy); + String environment = skipEnvironment ? "" : "." + zone.environment().value(); if (system.isPublic() && !legacy) { - if (scope == Scope.global) return "g"; - var zone = zones.get(0); - var region = zone.region().value(); - char scopeSymbol = scope == Scope.region ? 'r' : 'z'; - String environment = zone.environment().isProduction() ? "" : "." + zone.environment().value(); return region + environment + "." + scopeSymbol; } - if (scope == Scope.global) return "global"; - var zone = zones.get(0); - var region = zone.region().value(); - if (scope == Scope.region) region += "-w"; - if ((system.isPublic() || !legacy) && zone.environment().isProduction()) return region; - return region + "." + zone.environment().value(); + return region + (scopeSymbol.isEmpty() ? "" : "-" + scopeSymbol) + environment; } - private static String instancePart(ApplicationId application, String separator) { - if (application.instance().isDefault()) return ""; // Skip "default" - return application.instance().value() + separator; + private static String scopeSymbol(Scope scope, SystemName system, boolean legacy) { + if (system.isPublic() && !legacy) { + switch (scope) { + case zone: return "z"; + case regionSplit: return "w"; + case region: return "r"; + case global: return "g"; + } + } + switch (scope) { + case zone: return ""; + case regionSplit: return "w"; + case region: return "r"; + case global: return "global"; + } + throw new IllegalArgumentException("No scope symbol defined for " + scope + " in " + system + " (legacy: " + legacy + ")"); + } + + private static String instancePart(Optional<InstanceName> instance, String separator) { + if (instance.isEmpty()) return ""; + if (instance.get().isDefault()) return ""; // Skip "default" + return instance.get().value() + separator; } private static String systemPart(SystemName system, String separator, boolean legacy) { @@ -246,7 +268,7 @@ public class Endpoint { private static String upstreamIdOf(String name, ApplicationId application, ZoneId zone) { return Stream.of(namePart(name, ""), - instancePart(application, ""), + instancePart(Optional.of(application.instance()), ""), application.application().value(), application.tenant().value(), zone.region().value(), @@ -289,12 +311,21 @@ public class Endpoint { /** Endpoint points to one or more zones. Traffic is routed to the zone closest to the client */ global, - /** Endpoint points to one more zones in the same geographical region. Traffic is routed equally across zones */ + /** + * Endpoint points to one more zones in the same geographical region. Traffic is weighted according to + * configured weights. + */ region, /** Endpoint points to a single zone */ zone, + /** + * Endpoint points to one more zones in the same geographical region. Traffic is routed equally across zones. + * + * This is for internal use only. Endpoints with this scope are not exposed directly to tenants. + */ + regionSplit, } /** Represents an endpoint's HTTP port */ @@ -338,9 +369,14 @@ public class Endpoint { } + /** Build an endpoint for given instance */ + public static EndpointBuilder of(ApplicationId instance) { + return new EndpointBuilder(TenantAndApplicationId.from(instance), Optional.of(instance.instance())); + } + /** Build an endpoint for given application */ - public static EndpointBuilder of(ApplicationId application) { - return new EndpointBuilder(application); + public static EndpointBuilder of(TenantAndApplicationId application) { + return new EndpointBuilder(application, Optional.empty()); } /** Create an endpoint for given system application */ @@ -353,7 +389,8 @@ public class Endpoint { public static class EndpointBuilder { - private final ApplicationId application; + private final TenantAndApplicationId application; + private final Optional<InstanceName> instance; private Scope scope; private List<ZoneId> zones; @@ -363,8 +400,9 @@ public class Endpoint { private RoutingMethod routingMethod = RoutingMethod.shared; private boolean legacy = false; - private EndpointBuilder(ApplicationId application) { - this.application = application; + private EndpointBuilder(TenantAndApplicationId application, Optional<InstanceName> instance) { + this.application = Objects.requireNonNull(application); + this.instance = Objects.requireNonNull(instance); } /** Sets the zone target for this */ @@ -402,9 +440,19 @@ public class Endpoint { } /** Sets the region target for this, deduced from given zone */ - public EndpointBuilder targetRegion(ClusterSpec.Id cluster, ZoneId zone) { + public EndpointBuilder targetRegionSplit(ClusterSpec.Id cluster, ZoneId zone) { checkScope(); this.cluster = cluster; + this.scope = Scope.regionSplit; + this.zones = List.of(effectiveZone(zone)); + return this; + } + + /** Sets the region target for this by endpointId, deduced from given zone */ + public EndpointBuilder targetRegion(EndpointId endpointId, ClusterSpec.Id cluster, ZoneId zone) { + checkScope(); + this.endpointId = endpointId; + this.cluster = cluster; this.scope = Scope.region; this.zones = List.of(effectiveZone(zone)); return this; @@ -436,7 +484,10 @@ public class Endpoint { if (routingMethod.isDirect() && !port.isDefault()) { throw new IllegalArgumentException("Routing method " + routingMethod + " can only use default port"); } - return new Endpoint(endpointId, cluster, application, zones, scope, system, port, legacy, routingMethod); + if (scope == Scope.region && instance.isPresent()) { + throw new IllegalArgumentException("Instance cannot be set for scope " + scope); + } + return new Endpoint(endpointId, cluster, application, instance, zones, scope, system, port, legacy, routingMethod); } private void checkScope() { diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageValidator.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageValidator.java index 3dc95251eb8..bd6376e58aa 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageValidator.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageValidator.java @@ -102,7 +102,7 @@ public class ApplicationPackageValidator { instant)); } - /** Verify that compactable endpoint parts (instance aname nd endpoint ID) do not clash */ + /** Verify that compactable endpoint parts (instance name and endpoint ID) do not clash */ private void validateCompactedEndpoint(ApplicationPackage applicationPackage) { Map<List<String>, InstanceEndpoint> instanceEndpoints = new HashMap<>(); for (var instanceSpec : applicationPackage.deploymentSpec().instances()) { diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java index 59d18fcf17b..51911d62ef4 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java @@ -2698,6 +2698,7 @@ public class ApplicationApiHandler extends AuditLoggingRequestHandler { private static String endpointScopeString(Endpoint.Scope scope) { switch (scope) { case region: return "region"; + case regionSplit: return "regionSplit"; case global: return "global"; case zone: return "zone"; } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/routing/RoutingPolicy.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/routing/RoutingPolicy.java index ec9cf4064c9..f93e24f969e 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/routing/RoutingPolicy.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/routing/RoutingPolicy.java @@ -111,7 +111,7 @@ public class RoutingPolicy { /** Returns the region endpoint of this */ public Endpoint regionEndpointIn(SystemName system, RoutingMethod routingMethod, boolean legacy) { - Endpoint.EndpointBuilder endpoint = endpoint(routingMethod).targetRegion(id.cluster(), id.zone()); + Endpoint.EndpointBuilder endpoint = endpoint(routingMethod).targetRegionSplit(id.cluster(), id.zone()); if (legacy) { endpoint = endpoint.legacy(); } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/EndpointTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/EndpointTest.java index da641d17a8a..5d985518809 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/EndpointTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/EndpointTest.java @@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals; */ public class EndpointTest { - private static final ApplicationId app1 = ApplicationId.from("t1", "a1", "default"); - private static final ApplicationId app2 = ApplicationId.from("t2", "a2", "i2"); + private static final ApplicationId instance1 = ApplicationId.from("t1", "a1", "default"); + private static final ApplicationId instance2 = ApplicationId.from("t2", "a2", "i2"); + private static final TenantAndApplicationId app1 = TenantAndApplicationId.from(instance1); + private static final TenantAndApplicationId app2 = TenantAndApplicationId.from(instance2); @Test public void global_endpoints() { @@ -30,62 +32,62 @@ public class EndpointTest { Map<String, Endpoint> tests = Map.of( // Legacy endpoint "http://a1.t1.global.vespa.yahooapis.com:4080/", - Endpoint.of(app1).target(endpointId).on(Port.plain(4080)).legacy().in(SystemName.main), + Endpoint.of(instance1).target(endpointId).on(Port.plain(4080)).legacy().in(SystemName.main), // Legacy endpoint with TLS "https://a1--t1.global.vespa.yahooapis.com:4443/", - Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).legacy().in(SystemName.main), + Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).legacy().in(SystemName.main), // Main endpoint "https://a1--t1.global.vespa.oath.cloud:4443/", - Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).in(SystemName.main), + Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).in(SystemName.main), // Main endpoint in CD "https://cd--a1--t1.global.vespa.oath.cloud:4443/", - Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).in(SystemName.cd), + Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).in(SystemName.cd), // Main endpoint in CD "https://cd--i2--a2--t2.global.vespa.oath.cloud:4443/", - Endpoint.of(app2).target(endpointId).on(Port.tls(4443)).in(SystemName.cd), + Endpoint.of(instance2).target(endpointId).on(Port.tls(4443)).in(SystemName.cd), // Main endpoint with direct routing and default TLS port "https://a1.t1.global.vespa.oath.cloud/", - Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), + Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), // Main endpoint with custom rotation name "https://r1.a1.t1.global.vespa.oath.cloud/", - Endpoint.of(app1).target(EndpointId.of("r1")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), + Endpoint.of(instance1).target(EndpointId.of("r1")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), // Main endpoint for custom instance in default rotation "https://i2.a2.t2.global.vespa.oath.cloud/", - Endpoint.of(app2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), + Endpoint.of(instance2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), // Main endpoint for custom instance with custom rotation name "https://r2.i2.a2.t2.global.vespa.oath.cloud/", - Endpoint.of(app2).target(EndpointId.of("r2")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), + Endpoint.of(instance2).target(EndpointId.of("r2")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), // Main endpoint in public system (legacy) "https://a1.t1.global.public.vespa.oath.cloud/", - Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public) + Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public) ); tests.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString())); Map<String, Endpoint> tests2 = Map.of( // Main endpoint in public CD system (legacy) "https://publiccd.a1.t1.global.public-cd.vespa.oath.cloud/", - Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd), + Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd), // Default endpoint in public system "https://a1.t1.g.vespa-app.cloud/", - Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public), + Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public), // Default endpoint in public CD system "https://a1.t1.g.cd.vespa-app.cloud/", - Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.PublicCd), + Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.PublicCd), // Custom instance in public system "https://i2.a2.t2.g.vespa-app.cloud/", - Endpoint.of(app2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public) + Endpoint.of(instance2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public) ); tests2.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString())); } @@ -97,54 +99,54 @@ public class EndpointTest { Map<String, Endpoint> tests = Map.of( // Legacy endpoint "http://a1.t1.global.vespa.yahooapis.com:4080/", - Endpoint.of(app1).target(endpointId).on(Port.plain(4080)).legacy().in(SystemName.main), + Endpoint.of(instance1).target(endpointId).on(Port.plain(4080)).legacy().in(SystemName.main), // Legacy endpoint with TLS "https://a1--t1.global.vespa.yahooapis.com:4443/", - Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).legacy().in(SystemName.main), + Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).legacy().in(SystemName.main), // Main endpoint "https://a1--t1.global.vespa.oath.cloud:4443/", - Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).in(SystemName.main), + Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).in(SystemName.main), // Main endpoint in CD "https://cd--i2--a2--t2.global.vespa.oath.cloud:4443/", - Endpoint.of(app2).target(endpointId).on(Port.tls(4443)).in(SystemName.cd), + Endpoint.of(instance2).target(endpointId).on(Port.tls(4443)).in(SystemName.cd), // Main endpoint in CD "https://cd--a1--t1.global.vespa.oath.cloud:4443/", - Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).in(SystemName.cd), + Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).in(SystemName.cd), // Main endpoint with direct routing and default TLS port "https://a1.t1.global.vespa.oath.cloud/", - Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), + Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), // Main endpoint with custom rotation name "https://r1.a1.t1.global.vespa.oath.cloud/", - Endpoint.of(app1).target(EndpointId.of("r1")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), + Endpoint.of(instance1).target(EndpointId.of("r1")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), // Main endpoint for custom instance in default rotation "https://i2.a2.t2.global.vespa.oath.cloud/", - Endpoint.of(app2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), + Endpoint.of(instance2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), // Main endpoint for custom instance with custom rotation name "https://r2.i2.a2.t2.global.vespa.oath.cloud/", - Endpoint.of(app2).target(EndpointId.of("r2")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), + Endpoint.of(instance2).target(EndpointId.of("r2")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main), // Main endpoint in public system (legacy) "https://a1.t1.global.public.vespa.oath.cloud/", - Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public) + Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public) ); tests.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString())); Map<String, Endpoint> tests2 = Map.of( // Custom endpoint and instance in public CD system (legacy) "https://foo.publiccd.i2.a2.t2.global.public-cd.vespa.oath.cloud/", - Endpoint.of(app2).target(EndpointId.of("foo")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd), + Endpoint.of(instance2).target(EndpointId.of("foo")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd), // Custom endpoint and instance in public system "https://foo.i2.a2.t2.g.vespa-app.cloud/", - Endpoint.of(app2).target(EndpointId.of("foo")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public) + Endpoint.of(instance2).target(EndpointId.of("foo")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public) ); tests2.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString())); } @@ -158,62 +160,62 @@ public class EndpointTest { Map<String, Endpoint> tests = Map.of( // Legacy endpoint (always contains environment) "http://a1.t1.us-north-1.prod.vespa.yahooapis.com:4080/", - Endpoint.of(app1).target(cluster, prodZone).on(Port.plain(4080)).legacy().in(SystemName.main), + Endpoint.of(instance1).target(cluster, prodZone).on(Port.plain(4080)).legacy().in(SystemName.main), // Secure legacy endpoint "https://a1--t1.us-north-1.prod.vespa.yahooapis.com:4443/", - Endpoint.of(app1).target(cluster, prodZone).on(Port.tls(4443)).legacy().in(SystemName.main), + Endpoint.of(instance1).target(cluster, prodZone).on(Port.tls(4443)).legacy().in(SystemName.main), // Prod endpoint in main "https://a1--t1.us-north-1.vespa.oath.cloud:4443/", - Endpoint.of(app1).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.main), + Endpoint.of(instance1).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.main), // Prod endpoint in CD "https://cd--a1--t1.us-north-1.vespa.oath.cloud:4443/", - Endpoint.of(app1).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.cd), + Endpoint.of(instance1).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.cd), // Test endpoint in main "https://a1--t1.us-north-2.test.vespa.oath.cloud:4443/", - Endpoint.of(app1).target(cluster, testZone).on(Port.tls(4443)).in(SystemName.main), + Endpoint.of(instance1).target(cluster, testZone).on(Port.tls(4443)).in(SystemName.main), // Non-default cluster in main "https://c1--a1--t1.us-north-1.vespa.oath.cloud/", - Endpoint.of(app1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).in(SystemName.main), + Endpoint.of(instance1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).in(SystemName.main), // Non-default instance in main "https://i2--a2--t2.us-north-1.vespa.oath.cloud:4443/", - Endpoint.of(app2).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.main), + Endpoint.of(instance2).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.main), // Non-default cluster in public (legacy) "https://c1.a1.t1.us-north-1.public.vespa.oath.cloud/", - Endpoint.of(app1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public), + Endpoint.of(instance1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public), // Non-default cluster and instance in public (legacy) "https://c2.i2.a2.t2.us-north-1.public.vespa.oath.cloud/", - Endpoint.of(app2).target(ClusterSpec.Id.from("c2"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public), + Endpoint.of(instance2).target(ClusterSpec.Id.from("c2"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public), // Endpoint in main using shared layer 4 "https://a1.t1.us-north-1.vespa.oath.cloud/", - Endpoint.of(app1).target(cluster, prodZone).on(Port.tls()).routingMethod(RoutingMethod.sharedLayer4).in(SystemName.main) + Endpoint.of(instance1).target(cluster, prodZone).on(Port.tls()).routingMethod(RoutingMethod.sharedLayer4).in(SystemName.main) ); tests.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString())); Map<String, Endpoint> tests2 = Map.of( // Non-default cluster and instance in public CD (legacy) "https://c2.publiccd.i2.a2.t2.us-north-1.public-cd.vespa.oath.cloud/", - Endpoint.of(app2).target(ClusterSpec.Id.from("c2"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd), + Endpoint.of(instance2).target(ClusterSpec.Id.from("c2"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd), // Custom cluster name in public "https://c1.a1.t1.us-north-1.z.vespa-app.cloud/", - Endpoint.of(app1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public), + Endpoint.of(instance1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public), // Default cluster name in non-production zone in public "https://a1.t1.us-north-2.test.z.vespa-app.cloud/", - Endpoint.of(app1).target(ClusterSpec.Id.from("default"), testZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public), + Endpoint.of(instance1).target(ClusterSpec.Id.from("default"), testZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public), // Default cluster name in public CD "https://a1.t1.us-north-1.z.cd.vespa-app.cloud/", - Endpoint.of(app1).target(ClusterSpec.Id.from("default"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.PublicCd) + Endpoint.of(instance1).target(ClusterSpec.Id.from("default"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.PublicCd) ); tests2.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString())); } @@ -227,7 +229,7 @@ public class EndpointTest { var tests = Map.of( // Default rotation "https://a1.t1.global.public.vespa.oath.cloud/", - Endpoint.of(app1) + Endpoint.of(instance1) .target(EndpointId.defaultId()) .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) @@ -236,7 +238,7 @@ public class EndpointTest { // Wildcard to match other rotations "https://*.a1.t1.global.public.vespa.oath.cloud/", - Endpoint.of(app1) + Endpoint.of(instance1) .wildcard() .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) @@ -245,7 +247,7 @@ public class EndpointTest { // Default cluster in zone "https://a1.t1.us-north-1.public.vespa.oath.cloud/", - Endpoint.of(app1) + Endpoint.of(instance1) .target(defaultCluster, prodZone) .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) @@ -254,7 +256,7 @@ public class EndpointTest { // Wildcard to match other clusters in zone "https://*.a1.t1.us-north-1.public.vespa.oath.cloud/", - Endpoint.of(app1) + Endpoint.of(instance1) .wildcard(prodZone) .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) @@ -263,7 +265,7 @@ public class EndpointTest { // Default cluster in test zone "https://a1.t1.us-north-2.test.public.vespa.oath.cloud/", - Endpoint.of(app1) + Endpoint.of(instance1) .target(defaultCluster, testZone) .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) @@ -272,7 +274,7 @@ public class EndpointTest { // Wildcard to match other clusters in test zone "https://*.a1.t1.us-north-2.test.public.vespa.oath.cloud/", - Endpoint.of(app1) + Endpoint.of(instance1) .wildcard(testZone) .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) @@ -281,7 +283,7 @@ public class EndpointTest { // Wildcard to match other clusters in zone "https://*.a1.t1.us-north-1.z.vespa-app.cloud/", - Endpoint.of(app1) + Endpoint.of(instance1) .wildcard(prodZone) .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) @@ -296,37 +298,66 @@ public class EndpointTest { var cluster = ClusterSpec.Id.from("default"); var prodZone = ZoneId.from("prod", "us-north-2"); Map<String, Endpoint> tests = Map.of( - "https://a1.t1.us-north-1-w.public.vespa.oath.cloud/", + "https://r0.a1.t1.us-north-2-r.vespa.oath.cloud/", Endpoint.of(app1) - .targetRegion(cluster, ZoneId.from("prod", "us-north-1a")) + .targetRegion(EndpointId.of("r0"), ClusterSpec.Id.from("c1"), prodZone) + .routingMethod(RoutingMethod.sharedLayer4) + .on(Port.tls()) + .in(SystemName.main), + "https://r1.a2.t2.us-north-2.r.vespa-app.cloud/", + Endpoint.of(app2) + .targetRegion(EndpointId.of("r1"), ClusterSpec.Id.from("c1"), prodZone) + .routingMethod(RoutingMethod.exclusive) + .on(Port.tls()) + .in(SystemName.Public) + ); + tests.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString())); + Endpoint endpoint = Endpoint.of(instance1) + .targetRegionSplit(cluster, ZoneId.from("prod", "us-north-1a")) + .routingMethod(RoutingMethod.exclusive) + .on(Port.tls()) + .in(SystemName.main); + assertEquals("Availability zone is removed from region", + "us-north-1", + endpoint.zones().get(0).region().value()); + } + + @Test + public void region_split_endpoints() { + var cluster = ClusterSpec.Id.from("default"); + var prodZone = ZoneId.from("prod", "us-north-2"); + Map<String, Endpoint> tests = Map.of( + "https://a1.t1.us-north-1-w.public.vespa.oath.cloud/", + Endpoint.of(instance1) + .targetRegionSplit(cluster, ZoneId.from("prod", "us-north-1a")) .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) .legacy() .in(SystemName.Public), "https://a1.t1.us-north-2-w.public.vespa.oath.cloud/", - Endpoint.of(app1) - .targetRegion(cluster, prodZone) + Endpoint.of(instance1) + .targetRegionSplit(cluster, prodZone) .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) .legacy() .in(SystemName.Public), "https://a1.t1.us-north-2-w.test.public.vespa.oath.cloud/", - Endpoint.of(app1) - .targetRegion(cluster, ZoneId.from("test", "us-north-2")) + Endpoint.of(instance1) + .targetRegionSplit(cluster, ZoneId.from("test", "us-north-2")) .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) .legacy() .in(SystemName.Public), - "https://c1.a1.t1.us-north-2.r.vespa-app.cloud/", - Endpoint.of(app1) - .targetRegion(ClusterSpec.Id.from("c1"), prodZone) + "https://c1.a1.t1.us-north-2.w.vespa-app.cloud/", + Endpoint.of(instance1) + .targetRegionSplit(ClusterSpec.Id.from("c1"), prodZone) .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) .in(SystemName.Public) ); tests.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString())); - Endpoint endpoint = Endpoint.of(app1) - .targetRegion(cluster, ZoneId.from("prod", "us-north-1a")) + Endpoint endpoint = Endpoint.of(instance1) + .targetRegionSplit(cluster, ZoneId.from("prod", "us-north-1a")) .routingMethod(RoutingMethod.exclusive) .on(Port.tls()) .in(SystemName.main); @@ -341,23 +372,23 @@ public class EndpointTest { var tests1 = Map.of( // With default cluster "a1.t1.us-north-1.prod", - Endpoint.of(app1).target(EndpointId.defaultId()).on(Port.tls(4443)).in(SystemName.main), + Endpoint.of(instance1).target(EndpointId.defaultId()).on(Port.tls(4443)).in(SystemName.main), // With non-default cluster "c1.a1.t1.us-north-1.prod", - Endpoint.of(app1).target(EndpointId.of("ignored1"), ClusterSpec.Id.from("c1"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main) + Endpoint.of(instance1).target(EndpointId.of("ignored1"), ClusterSpec.Id.from("c1"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main) ); var tests2 = Map.of( // With non-default instance and default cluster "i2.a2.t2.us-north-1.prod", - Endpoint.of(app2).target(EndpointId.defaultId(), ClusterSpec.Id.from("default"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main), + Endpoint.of(instance2).target(EndpointId.defaultId(), ClusterSpec.Id.from("default"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main), // With non-default instance and cluster "c2.i2.a2.t2.us-north-1.prod", - Endpoint.of(app2).target(EndpointId.of("ignored2"), ClusterSpec.Id.from("c2"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main) + Endpoint.of(instance2).target(EndpointId.of("ignored2"), ClusterSpec.Id.from("c2"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main) ); - tests1.forEach((expected, endpoint) -> assertEquals(expected, endpoint.upstreamIdOf(new DeploymentId(app1, zone)))); - tests2.forEach((expected, endpoint) -> assertEquals(expected, endpoint.upstreamIdOf(new DeploymentId(app2, zone)))); + tests1.forEach((expected, endpoint) -> assertEquals(expected, endpoint.upstreamIdOf(new DeploymentId(instance1, zone)))); + tests2.forEach((expected, endpoint) -> assertEquals(expected, endpoint.upstreamIdOf(new DeploymentId(instance2, zone)))); } } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/routing/RoutingPoliciesTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/routing/RoutingPoliciesTest.java index 2002b59dc1e..c678a83bf2c 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/routing/RoutingPoliciesTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/routing/RoutingPoliciesTest.java @@ -364,11 +364,11 @@ public class RoutingPoliciesTest { Map.of(zone1, 1L, zone2, 1L), true); assertEquals("Registers expected DNS names", Set.of("app1.tenant1.aws-eu-west-1-w.public.vespa.oath.cloud", - "app1.tenant1.aws-eu-west-1.r.vespa-app.cloud", + "app1.tenant1.aws-eu-west-1.w.vespa-app.cloud", "app1.tenant1.aws-eu-west-1a.public.vespa.oath.cloud", "app1.tenant1.aws-eu-west-1a.z.vespa-app.cloud", "app1.tenant1.aws-us-east-1-w.public.vespa.oath.cloud", - "app1.tenant1.aws-us-east-1.r.vespa-app.cloud", + "app1.tenant1.aws-us-east-1.w.vespa-app.cloud", "app1.tenant1.aws-us-east-1c.public.vespa.oath.cloud", "app1.tenant1.aws-us-east-1c.z.vespa-app.cloud", "app1.tenant1.g.vespa-app.cloud", @@ -837,7 +837,7 @@ public class RoutingPoliciesTest { DeploymentId deployment = new DeploymentId(application, zone); EndpointList regionEndpoints = tester.controller().routing().endpointsOf(deployment) .cluster(cluster) - .scope(Endpoint.Scope.region); + .scope(Endpoint.Scope.regionSplit); if (!legacy) { regionEndpoints = regionEndpoints.not().legacy(); } diff --git a/filedistribution/pom.xml b/filedistribution/pom.xml index b9fda41e7f3..e5c4e921e85 100644 --- a/filedistribution/pom.xml +++ b/filedistribution/pom.xml @@ -26,6 +26,12 @@ </dependency> <dependency> <groupId>com.yahoo.vespa</groupId> + <artifactId>http-utils</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> <artifactId>vespajlib</artifactId> <version>${project.version}</version> </dependency> diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index 39193db9e60..4b7d05928c0 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -88,7 +88,7 @@ public class Flags { public static final UnboundIntFlag MAX_UNCOMMITTED_MEMORY = defineIntFlag( "max-uncommitted-memory", 130000, List.of("geirst, baldersheim"), "2021-10-21", "2022-01-01", - "The task limit used by the executors handling feed in proton", + "Max amount of memory holding updates to an attribute before we do a commit.", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); @@ -369,6 +369,13 @@ public class Flags { "Takes effect at redeploy", ZONE_ID, APPLICATION_ID); + public static final UnboundBooleanFlag ASYNC_APPLY_BUCKET_DIFF = defineFeatureFlag( + "async-apply-bucket-diff", false, + List.of("geirst", "vekterli"), "2021-10-22", "2022-01-31", + "Whether portions of apply bucket diff handling will be performed asynchronously", + "Takes effect at redeploy", + ZONE_ID, APPLICATION_ID); + /** WARNING: public for testing: All flags should be defined in {@link Flags}. */ public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners, String createdAt, String expiresAt, String description, diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/core/FelixParams.java b/jdisc_core/src/main/java/com/yahoo/jdisc/core/FelixParams.java index 402c6c288f3..90f7d7aab01 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/core/FelixParams.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/core/FelixParams.java @@ -54,7 +54,6 @@ public class FelixParams { Map<String, String> ret = new HashMap<>(); ret.put(BundleCache.CACHE_ROOTDIR_PROP, cachePath); ret.put(Constants.FRAMEWORK_SYSTEMPACKAGES, exportPackages.toString()); - ret.put(Constants.SUPPORTS_BOOTCLASSPATH_EXTENSION, "true"); ret.put(Constants.FRAMEWORK_BOOTDELEGATION, "com.yourkit.runtime,com.yourkit.probes,com.yourkit.probes.builtin,com.singularity.*"); ret.put(Constants.FRAMEWORK_BSNVERSION, Constants.FRAMEWORK_BSNVERSION_MANAGED); return ret; diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java b/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java index 50ac90b6181..b7993de5d82 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java @@ -168,5 +168,8 @@ class OsgiLogHandler extends Handler { return new Hashtable<>(); } + @Override + public <A> A adapt(Class<A> aClass) { return null; } + } } diff --git a/persistence/src/vespa/persistence/spi/catchresult.cpp b/persistence/src/vespa/persistence/spi/catchresult.cpp index 3dbe8cfdf7e..366e439cc2d 100644 --- a/persistence/src/vespa/persistence/spi/catchresult.cpp +++ b/persistence/src/vespa/persistence/spi/catchresult.cpp @@ -13,7 +13,7 @@ CatchResult::CatchResult() CatchResult::~CatchResult() = default; void -CatchResult::onComplete(std::unique_ptr<Result> result) { +CatchResult::onComplete(std::unique_ptr<Result> result) noexcept { _promisedResult.set_value(std::move(result)); } void diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h index 80d4f863971..02c626ea23e 100644 --- a/persistence/src/vespa/persistence/spi/catchresult.h +++ b/persistence/src/vespa/persistence/spi/catchresult.h @@ -12,7 +12,7 @@ public: std::future<std::unique_ptr<Result>> future_result() { return _promisedResult.get_future(); } - void onComplete(std::unique_ptr<Result> result) override; + void onComplete(std::unique_ptr<Result> result) noexcept override; void addResultHandler(const ResultHandler * resultHandler) override; private: std::promise<std::unique_ptr<Result>> _promisedResult; diff --git a/persistence/src/vespa/persistence/spi/operationcomplete.h b/persistence/src/vespa/persistence/spi/operationcomplete.h index fd6b846b2a4..fd4d5714cbc 100644 --- a/persistence/src/vespa/persistence/spi/operationcomplete.h +++ b/persistence/src/vespa/persistence/spi/operationcomplete.h @@ -23,7 +23,7 @@ class OperationComplete public: using UP = std::unique_ptr<OperationComplete>; virtual ~OperationComplete() = default; - virtual void onComplete(std::unique_ptr<Result> result) = 0; + virtual void onComplete(std::unique_ptr<Result> result) noexcept = 0; virtual void addResultHandler(const ResultHandler * resultHandler) = 0; }; diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp index 8ad120e7d05..eccae8fe8ab 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp @@ -39,7 +39,7 @@ class MyOperationComplete : public storage::spi::OperationComplete public: MyOperationComplete(PersistenceProvider* provider, std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker); ~MyOperationComplete() override; - void onComplete(std::unique_ptr<storage::spi::Result> result) override; + void onComplete(std::unique_ptr<storage::spi::Result> result) noexcept override; void addResultHandler(const storage::spi::ResultHandler* resultHandler) override; }; @@ -58,7 +58,7 @@ MyOperationComplete::~MyOperationComplete() } void -MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result) +MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result) noexcept { if (result->hasError()) { ++_errors; diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp index fe77477fa77..d51485df38d 100644 --- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp +++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp @@ -11,6 +11,8 @@ using document::DocumentId; using document::test::makeDocumentBucket; +using vespalib::MonitoredRefCount; +using vespalib::RetainGuard; namespace storage { @@ -72,6 +74,7 @@ class ApplyBucketDiffStateTestBase : public ::testing::Test public: uint32_t sync_count; DummyMergeBucketInfoSyncer syncer; + MonitoredRefCount monitored_ref_count; ApplyBucketDiffStateTestBase() : ::testing::Test(), @@ -83,7 +86,7 @@ public: ~ApplyBucketDiffStateTestBase(); std::unique_ptr<ApplyBucketDiffState> make_state() { - return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket)); + return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); } }; diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 60030004594..017b8ce2b92 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -20,7 +20,12 @@ using namespace ::testing; namespace storage { -struct MergeHandlerTest : SingleDiskPersistenceTestUtils { +/* + * Class for testing merge handler taking async_apply_bucket_diff as + * parameter for the test. + */ +struct MergeHandlerTest : SingleDiskPersistenceTestUtils, + public testing::WithParamInterface<bool> { uint32_t _location; // Location used for all merge tests document::Bucket _bucket; // Bucket used for all merge tests uint64_t _maxTimestamp; @@ -149,8 +154,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { int _counter; MessageSenderStub _stub; std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd; + void convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler& handler); }; + void convert_delayed_error_to_exception(MergeHandler& handler); + std::string doTestSPIException(MergeHandler& handler, PersistenceProviderWrapper& providerWrapper, @@ -159,11 +167,21 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { MergeHandler createHandler(size_t maxChunkSize = 0x400000) { return MergeHandler(getEnv(), getPersistenceProvider(), - getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize, 64, GetParam()); } MergeHandler createHandler(spi::PersistenceProvider & spi) { return MergeHandler(getEnv(), spi, - getEnv()._component.cluster_context(), getEnv()._component.getClock()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), 4190208, 64, GetParam()); + } + + std::shared_ptr<api::StorageMessage> get_queued_reply() { + std::shared_ptr<api::StorageMessage> msg; + if (_replySender.queue.getNext(msg, 0s)) { + return msg; + } else { + return {}; + } + } }; @@ -209,7 +227,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) { // Test a regular merge bucket command fetching data, including // puts, removes, unrevertable removes & duplicates. -TEST_F(MergeHandlerTest, merge_bucket_command) { +TEST_P(MergeHandlerTest, merge_bucket_command) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -270,11 +288,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain) EXPECT_EQ(17, diff.size()); } -TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) { +TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) { testGetBucketDiffChain(true); } -TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) { +TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) { testGetBucketDiffChain(false); } @@ -320,17 +338,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) EXPECT_EQ(0, diff.size()); } -TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) { +TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) { testApplyBucketDiffChain(true); } -TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) { +TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) { testApplyBucketDiffChain(false); } // Test that a simplistic merge with one thing to actually merge, // sends correct commands and finish. -TEST_F(MergeHandlerTest, master_message_flow) { +TEST_P(MergeHandlerTest, master_message_flow) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -424,7 +442,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff) } -TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { +TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) { uint32_t docSize = 1024; uint32_t docCount = 10; uint32_t maxChunkSize = docSize * 3; @@ -488,7 +506,7 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { EXPECT_TRUE(reply->getResult().success()); } -TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { +TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) { setUpChain(FRONT); uint32_t docSize = 1024; @@ -524,7 +542,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize); } -TEST_F(MergeHandlerTest, max_timestamp) { +TEST_P(MergeHandlerTest, max_timestamp) { doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024); MergeHandler handler = createHandler(); @@ -632,7 +650,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask return getBucketDiffCmd; } -TEST_F(MergeHandlerTest, spi_flush_guard) { +TEST_P(MergeHandlerTest, spi_flush_guard) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); @@ -647,13 +665,16 @@ TEST_F(MergeHandlerTest, spi_flush_guard) { try { auto cmd = createDummyApplyDiff(6000); handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket)); + if (GetParam()) { + convert_delayed_error_to_exception(handler); + } FAIL() << "No exception thrown on failing in-place remove"; } catch (const std::runtime_error& e) { EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos); } } -TEST_F(MergeHandlerTest, bucket_not_found_in_db) { +TEST_P(MergeHandlerTest, bucket_not_found_in_db) { MergeHandler handler = createHandler(); // Send merge for unknown bucket auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); @@ -661,7 +682,7 @@ TEST_F(MergeHandlerTest, bucket_not_found_in_db) { EXPECT_TRUE(tracker->getResult().isBucketDisappearance()); } -TEST_F(MergeHandlerTest, merge_progress_safe_guard) { +TEST_P(MergeHandlerTest, merge_progress_safe_guard) { MergeHandler handler = createHandler(); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -684,7 +705,7 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) { EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE); } -TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { +TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { MergeHandler handler = createHandler(); _nodes.clear(); _nodes.emplace_back(0, false); @@ -716,7 +737,7 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask); } -TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { +TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) { MergeHandler handler = createHandler(); std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff; { @@ -741,6 +762,23 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { EXPECT_EQ(0x0, diff[0]._entry._hasMask); } +void +MergeHandlerTest::convert_delayed_error_to_exception(MergeHandler& handler) +{ + handler.drain_async_writes(); + if (getEnv()._fileStorHandler.isMerging(_bucket)) { + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + api::ReturnCode return_code; + s->check_delayed_error(return_code); + if (return_code.failed()) { + getEnv()._fileStorHandler.clearMergeStatus(_bucket, return_code); + fetchSingleMessage<api::ApplyBucketDiffReply>(); + fetchSingleMessage<api::ApplyBucketDiffCommand>(); + throw std::runtime_error(return_code.getMessage()); + } + } +} + std::string MergeHandlerTest::doTestSPIException(MergeHandler& handler, PersistenceProviderWrapper& providerWrapper, @@ -755,6 +793,9 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler, providerWrapper.setFailureMask(failureMask); try { invoker.invoke(*this, handler, *_context); + if (GetParam()) { + convert_delayed_error_to_exception(handler); + } if (failureMask != 0) { return (std::string("No exception was thrown during handler " "invocation. Expected exception containing '") @@ -823,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke( handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_F(MergeHandlerTest, merge_bucket_spi_failures) { +TEST_P(MergeHandlerTest, merge_bucket_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -855,7 +896,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) { +TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); @@ -888,7 +929,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke( handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { +TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -953,7 +994,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke( api::ReturnCode::INTERNAL_FAILURE); } -TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { +TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -1006,6 +1047,18 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke( } void +MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler &handler) +{ + handler.drain_async_writes(); + if (!_stub.replies.empty() && _stub.replies.back()->getResult().failed()) { + auto chained_reply = _stub.replies.back(); + _stub.replies.pop_back(); + test.messageKeeper().sendReply(chained_reply); + throw std::runtime_error(chained_reply->getResult().getMessage()); + } +} + +void MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( MergeHandlerTest& test, MergeHandler& handler, @@ -1016,6 +1069,9 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( test.fillDummyApplyDiff(reply->getDiff()); _stub.clear(); handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket)); + if (test.GetParam()) { + convert_delayed_error_to_exception(test, handler); + } } std::string @@ -1039,7 +1095,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke( } } -TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { +TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); HandleApplyBucketDiffReplyInvoker invoker; for (int i = 0; i < 2; ++i) { @@ -1066,7 +1122,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { } } -TEST_F(MergeHandlerTest, remove_from_diff) { +TEST_P(MergeHandlerTest, remove_from_diff) { framework::defaultimplementation::FakeClock clock; MergeStatus status(clock, 0, 0); @@ -1132,7 +1188,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) { } } -TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { +TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) { setUpChain(BACK); document::TestDocMan docMan; @@ -1156,8 +1212,15 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); - auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); - ASSERT_TRUE(applyBucketDiffReply.get()); + if (GetParam()) { + ASSERT_FALSE(tracker); + handler.drain_async_writes(); + auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply()); + ASSERT_TRUE(applyBucketDiffReply.get()); + } else { + auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); + ASSERT_TRUE(applyBucketDiffReply.get()); + } auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -1246,7 +1309,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en } -TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) +TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { using NodeList = decltype(_nodes); // Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2 @@ -1382,4 +1445,6 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) LOG(debug, "got mergebucket reply"); } +VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true)); + } // storage diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp index 1fbe155b16d..2dc5989e857 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp @@ -20,7 +20,7 @@ ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<Apply ApplyBucketDiffEntryComplete::~ApplyBucketDiffEntryComplete() = default; void -ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result) +ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result) noexcept { if (_result_handler != nullptr) { _result_handler->handle(*result); diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h index dd2346d9dee..1037318aec6 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h @@ -27,7 +27,7 @@ class ApplyBucketDiffEntryComplete : public spi::OperationComplete public: ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric); ~ApplyBucketDiffEntryComplete(); - void onComplete(std::unique_ptr<spi::Result> result) override; + void onComplete(std::unique_ptr<spi::Result> result) noexcept override; void addResultHandler(const spi::ResultHandler* resultHandler) override; }; diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp index eb7a5ef5bc6..ad153c41aef 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -2,21 +2,27 @@ #include "apply_bucket_diff_state.h" #include "mergehandler.h" +#include "persistenceutil.h" #include <vespa/document/base/documentid.h> #include <vespa/persistence/spi/result.h> #include <vespa/vespalib/stllike/asciistream.h> using storage::spi::Result; +using vespalib::RetainGuard; namespace storage { -ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket) +ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) : _merge_bucket_info_syncer(merge_bucket_info_syncer), _bucket(bucket), _fail_message(), _failed_flag(), _stale_bucket_info(false), - _promise() + _promise(), + _tracker(), + _delayed_reply(), + _sender(nullptr), + _retain_guard(std::move(retain_guard)) { } @@ -32,6 +38,17 @@ ApplyBucketDiffState::~ApplyBucketDiffState() if (_promise.has_value()) { _promise.value().set_value(_fail_message); } + if (_delayed_reply) { + if (!_delayed_reply->getResult().failed() && !_fail_message.empty()) { + _delayed_reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, _fail_message)); + } + if (_sender) { + _sender->sendReply(std::move(_delayed_reply)); + } else { + // _tracker->_reply and _delayed_reply points to the same reply. + _tracker->sendReply(); + } + } } void @@ -69,4 +86,19 @@ ApplyBucketDiffState::get_future() return _promise.value().get_future(); } +void +ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply) +{ + _tracker = std::move(tracker); + _delayed_reply = std::move(delayed_reply); +} + +void +ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply) +{ + _tracker = std::move(tracker); + _sender = &sender; + _delayed_reply = std::move(delayed_reply); +} + } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h index af4174b06d6..39f60156e66 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -3,16 +3,20 @@ #pragma once #include <vespa/persistence/spi/bucket.h> +#include <vespa/vespalib/util/retain_guard.h> #include <future> #include <memory> #include <vector> namespace document { class DocumentId; } +namespace storage::api { class StorageReply; } namespace storage::spi { class Result; } namespace storage { class ApplyBucketDiffEntryResult; +class MessageSender; +class MessageTracker; class MergeBucketInfoSyncer; /* @@ -26,9 +30,13 @@ class ApplyBucketDiffState { std::atomic_flag _failed_flag; bool _stale_bucket_info; std::optional<std::promise<vespalib::string>> _promise; + std::unique_ptr<MessageTracker> _tracker; + std::shared_ptr<api::StorageReply> _delayed_reply; + MessageSender* _sender; + vespalib::RetainGuard _retain_guard; public: - ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket); + ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); ~ApplyBucketDiffState(); void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op); void wait(); @@ -36,6 +44,8 @@ public: void mark_stale_bucket_info(); void sync_bucket_info(); std::future<vespalib::string> get_future(); + void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply); + void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply); }; } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index d150f5600e5..47b5e4f5f27 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -78,7 +78,7 @@ public: _executorId(executor.getExecutorId(bucketId.getId())) { } - void onComplete(spi::Result::UP result) override { + void onComplete(spi::Result::UP result) noexcept override { _task->setResult(std::move(result)); _executor.executeTask(_executorId, std::move(_task)); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 2ffb827accf..c32efb6aa66 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -209,6 +209,11 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC *_filestorHandler, i % numStripes, _component)); } _bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this)); + } else { + std::lock_guard guard(_lock); + for (auto& handler : _persistenceHandlers) { + handler->configure(*config); + } } } diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp index febac5b87e5..1d9e0c6fae6 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp @@ -14,6 +14,7 @@ MergeStatus::MergeStatus(const framework::Clock& clock, uint32_t traceLevel) : reply(), full_node_list(), nodeList(), maxTimestamp(0), diff(), pendingId(0), pendingGetDiff(), pendingApplyDiff(), timeout(0), startTime(clock), + delayed_error(), context(priority, traceLevel) {} @@ -122,4 +123,23 @@ MergeStatus::print(std::ostream& out, bool verbose, } } +void +MergeStatus::set_delayed_error(std::future<vespalib::string>&& delayed_error_in) +{ + delayed_error = std::move(delayed_error_in); +} + +void +MergeStatus::check_delayed_error(api::ReturnCode &return_code) +{ + if (!return_code.failed() && delayed_error.has_value()) { + // Wait for pending writes to local node to complete and check error + auto& future_error = delayed_error.value(); + future_error.wait(); + vespalib::string fail_message = future_error.get(); + delayed_error.reset(); + return_code = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, std::move(fail_message)); + } +} + }; diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h index b28ca4e373a..05ffd1336a2 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h @@ -9,7 +9,9 @@ #include <vector> #include <deque> +#include <future> #include <memory> +#include <optional> namespace storage { @@ -25,6 +27,7 @@ public: std::shared_ptr<api::ApplyBucketDiffReply> pendingApplyDiff; vespalib::duration timeout; framework::MilliSecTimer startTime; + std::optional<std::future<vespalib::string>> delayed_error; spi::Context context; MergeStatus(const framework::Clock&, api::StorageMessage::Priority, uint32_t traceLevel); @@ -40,6 +43,8 @@ public: bool removeFromDiff(const std::vector<api::ApplyBucketDiffCommand::Entry>& part, uint16_t hasMask, const std::vector<api::MergeBucketCommand::Node> &nodes); void print(std::ostream& out, bool verbose, const std::string& indent) const override; bool isFirstNode() const { return static_cast<bool>(reply); } + void set_delayed_error(std::future<vespalib::string>&& delayed_error_in); + void check_delayed_error(api::ReturnCode &return_code); }; } // storage diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 17a16487ac4..77e7762ec9a 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -17,6 +17,9 @@ #include <vespa/log/log.h> LOG_SETUP(".persistence.mergehandler"); +using vespalib::MonitoredRefCount; +using vespalib::RetainGuard; + namespace storage { MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, @@ -28,12 +31,19 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _cluster_context(cluster_context), _env(env), _spi(spi), + _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), _async_apply_bucket_diff(async_apply_bucket_diff) { } +MergeHandler::~MergeHandler() +{ + drain_async_writes(); +} + + namespace { constexpr int getDeleteFlag() { @@ -674,7 +684,8 @@ namespace { api::StorageReply::SP MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, - MessageSender& sender, spi::Context& context) const + MessageSender& sender, spi::Context& context, + std::shared_ptr<ApplyBucketDiffState>& async_results) const { // If last action failed, fail the whole merge if (status.reply->getResult().failed()) { @@ -806,6 +817,10 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, } cmd->setPriority(status.context.getPriority()); cmd->setTimeout(status.timeout); + if (async_results) { + // Check currently pending writes to local node before sending new command. + check_apply_diff_sync(std::move(async_results)); + } if (applyDiffNeedLocalData(cmd->getDiff(), 0, true)) { framework::MilliSecTimer startTime(_clock); fetchLocalData(bucket, cmd->getDiff(), 0, context); @@ -1171,7 +1186,8 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe reply.getDiff().begin(), reply.getDiff().end()); - replyToSend = processBucketMerge(bucket, *s, sender, s->context); + std::shared_ptr<ApplyBucketDiffState> async_results; + replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results); if (!replyToSend.get()) { // We have sent something on, and shouldn't reply now. @@ -1211,7 +1227,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra tracker->setMetric(_env._metrics.applyBucketDiff); spi::Bucket bucket(cmd.getBucket()); - auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket); + std::shared_ptr<ApplyBucketDiffState> async_results; LOG(debug, "%s", cmd.toString().c_str()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { @@ -1233,10 +1249,13 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { framework::MilliSecTimer startTime(_clock); + async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); + if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { + check_apply_diff_sync(std::move(async_results)); + } _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); - check_apply_diff_sync(std::move(async_results)); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).", bucket.toString().c_str(), _env._nodeIndex, index); @@ -1260,10 +1279,14 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } } - tracker->setReply(std::make_shared<api::ApplyBucketDiffReply>(cmd)); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd); + tracker->setReply(reply); static_cast<api::ApplyBucketDiffReply&>(tracker->getReply()).getDiff().swap(cmd.getDiff()); LOG(spam, "Replying to ApplyBucketDiff for %s to node %d.", bucket.toString().c_str(), cmd.getNodes()[index - 1].index); + if (async_results) { + async_results->set_delayed_reply(std::move(tracker), std::move(reply)); + } } else { // When not the last node in merge chain, we must save reply, and // send command on. @@ -1280,6 +1303,10 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra cmd2->setPriority(cmd.getPriority()); cmd2->setTimeout(cmd.getTimeout()); s->pendingId = cmd2->getMsgId(); + if (async_results) { + // Reply handler should check for delayed error. + s->set_delayed_error(async_results->get_future()); + } _env._fileStorHandler.sendCommand(cmd2); // Everything went fine. Don't delete state but wait for reply stateGuard.deactivate(); @@ -1290,12 +1317,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } void -MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const +MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, MessageSender& sender, MessageTracker::UP tracker) const { - (void) tracker; _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); - auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket); + std::shared_ptr<ApplyBucketDiffState> async_results; std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff()); LOG(debug, "%s", reply.toString().c_str()); @@ -1316,6 +1342,8 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag api::StorageReply::SP replyToSend; // Process apply bucket diff locally api::ReturnCode returnCode = reply.getResult(); + // Check for delayed error from handleApplyBucketDiff + s->check_delayed_error(returnCode); try { if (reply.getResult().failed()) { LOG(debug, "Got failed apply bucket diff reply %s", reply.toString().c_str()); @@ -1329,9 +1357,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); + async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, diff, index, s->context, async_results); + if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { + check_apply_diff_sync(std::move(async_results)); + } _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); - check_apply_diff_sync(std::move(async_results)); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)", bucket.toString().c_str(), @@ -1370,7 +1401,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag // Should reply now, since we failed. replyToSend = s->reply; } else { - replyToSend = processBucketMerge(bucket, *s, sender, s->context); + replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results); if (!replyToSend.get()) { // We have sent something on and shouldn't reply now. @@ -1392,6 +1423,10 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag throw; } + if (async_results && replyToSend) { + replyToSend->setResult(returnCode); + async_results->set_delayed_reply(std::move(tracker), sender, std::move(replyToSend)); + } if (clearState) { _env._fileStorHandler.clearMergeStatus(bucket.getBucket()); } @@ -1402,4 +1437,19 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } } +void +MergeHandler::drain_async_writes() +{ + if (_monitored_ref_count) { + // Wait for related ApplyBucketDiffState objects to be destroyed + _monitored_ref_count->waitForZeroRefCount(); + } +} + +void +MergeHandler::configure(bool async_apply_bucket_diff) noexcept +{ + _async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release); +} + } // storage diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index f6e8ddcf306..17cfb847d2c 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -20,6 +20,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storage/common/cluster_context.h> #include <vespa/storage/common/messagesender.h> +#include <vespa/vespalib/util/monitored_refcount.h> namespace storage { @@ -48,6 +49,8 @@ public: uint32_t commonMergeChainOptimalizationMinimumSize = 64, bool async_apply_bucket_diff = false); + ~MergeHandler(); + bool buildBucketInfoList( const spi::Bucket& bucket, Timestamp maxTimestamp, @@ -70,21 +73,25 @@ public: void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const; MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const; void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; + void drain_async_writes(); + void configure(bool async_apply_bucket_diff) noexcept; private: const framework::Clock &_clock; const ClusterContext &_cluster_context; PersistenceUtil &_env; spi::PersistenceProvider &_spi; + std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count; const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; - const bool _async_apply_bucket_diff; + std::atomic<bool> _async_apply_bucket_diff; /** Returns a reply if merge is complete */ api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, MessageSender& sender, - spi::Context& context) const; + spi::Context& context, + std::shared_ptr<ApplyBucketDiffState>& async_results) const; /** * Invoke either put, remove or unrevertable remove on the SPI diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 1ef883fc810..aa1a9c136fd 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -150,4 +150,10 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co } } +void +PersistenceHandler::configure(vespa::config::content::StorFilestorConfig& config) noexcept +{ + _mergeHandler.configure(config.asyncApplyBucketDiff); +} + } diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index a92c2dc78ca..c60fb05e56e 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -35,6 +35,7 @@ public: const AsyncHandler & asyncHandler() const { return _asyncHandler; } const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; } const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; } + void configure(vespa::config::content::StorFilestorConfig& config) noexcept; private: // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index a3e3c512dcd..e5667c7b392 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -8,7 +8,6 @@ import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.container.core.HandlerMetricContextUtil; import com.yahoo.container.core.documentapi.VespaDocumentAccess; -import com.yahoo.container.handler.threadpool.ContainerThreadPool; import com.yahoo.container.jdisc.ContentChannelOutputStream; import com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream; import com.yahoo.document.Document; @@ -184,7 +183,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final Map<String, Map<Method, Handler>> handlers = defineApi(); @Inject - public DocumentV1ApiHandler(ContainerThreadPool threadPool, + public DocumentV1ApiHandler(Executor defaultExecutor, Metric metric, MetricReceiver metricReceiver, VespaDocumentAccess documentAccess, @@ -193,7 +192,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AllClustersBucketSpacesConfig bucketSpacesConfig, DocumentOperationExecutorConfig executorConfig) { this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess, - documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, threadPool.executor()); + documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, defaultExecutor); } DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access, |