summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cloud-tenant-base-dependencies-enforcer/pom.xml2
-rw-r--r--config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java1
-rw-r--r--config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java7
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java12
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java13
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java3
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerDocumentApiBuilderTest.java4
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java55
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java6
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java3
-rw-r--r--container-dependency-versions/pom.xml4
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/Endpoint.java117
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageValidator.java2
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java1
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/routing/RoutingPolicy.java2
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/EndpointTest.java163
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/routing/RoutingPoliciesTest.java6
-rw-r--r--filedistribution/pom.xml6
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/Flags.java9
-rw-r--r--jdisc_core/src/main/java/com/yahoo/jdisc/core/FelixParams.java1
-rw-r--r--jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java3
-rw-r--r--persistence/src/vespa/persistence/spi/catchresult.cpp2
-rw-r--r--persistence/src/vespa/persistence/spi/catchresult.h2
-rw-r--r--persistence/src/vespa/persistence/spi/operationcomplete.h2
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp4
-rw-r--r--storage/src/tests/persistence/apply_bucket_diff_state_test.cpp5
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp119
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h2
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp36
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.h12
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp5
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp20
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/mergestatus.h5
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp70
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h11
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.h1
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java5
40 files changed, 516 insertions, 215 deletions
diff --git a/cloud-tenant-base-dependencies-enforcer/pom.xml b/cloud-tenant-base-dependencies-enforcer/pom.xml
index 636dbf91a57..bcff251f2c2 100644
--- a/cloud-tenant-base-dependencies-enforcer/pom.xml
+++ b/cloud-tenant-base-dependencies-enforcer/pom.xml
@@ -22,7 +22,7 @@
<aopalliance.version>1.0</aopalliance.version>
<athenz.version>1.10.14</athenz.version>
<bouncycastle.version>1.68</bouncycastle.version>
- <felix.version>6.0.3</felix.version>
+ <felix.version>7.0.1</felix.version>
<felix.log.version>1.0.1</felix.log.version>
<findbugs.version>1.3.9</findbugs.version>
<guava.version>20.0</guava.version>
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
index 2c51aa89c66..8c2957502a8 100644
--- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
+++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
@@ -103,6 +103,7 @@ public interface ModelContext {
@ModelFeatureFlag(owners = {"geirst", "vekterli"}) default int distributorMergeBusyWait() { return 10; }
@ModelFeatureFlag(owners = {"vekterli", "geirst"}) default boolean distributorEnhancedMaintenanceScheduling() { return false; }
@ModelFeatureFlag(owners = {"arnej"}) default boolean forwardIssuesAsErrors() { return true; }
+ @ModelFeatureFlag(owners = {"geirst", "vekterli"}) default boolean asyncApplyBucketDiff() { return false; }
}
/** Warning: As elsewhere in this package, do not make backwards incompatible changes that will break old config models! */
diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
index 7a09a36c1d7..b7506587102 100644
--- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
+++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
@@ -70,6 +70,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea
private int maxUnCommittedMemory = 123456;
private double diskBloatFactor = 0.2;
private boolean distributorEnhancedMaintenanceScheduling = false;
+ private boolean asyncApplyBucketDiff = false;
@Override public ModelContext.FeatureFlags featureFlags() { return this; }
@Override public boolean multitenant() { return multitenant; }
@@ -120,6 +121,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea
@Override public int docstoreCompressionLevel() { return docstoreCompressionLevel; }
@Override public boolean distributorEnhancedMaintenanceScheduling() { return distributorEnhancedMaintenanceScheduling; }
@Override public int maxUnCommittedMemory() { return maxUnCommittedMemory; }
+ @Override public boolean asyncApplyBucketDiff() { return asyncApplyBucketDiff; }
public TestProperties maxUnCommittedMemory(int maxUnCommittedMemory) {
this.maxUnCommittedMemory = maxUnCommittedMemory;
@@ -307,6 +309,11 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea
return this;
}
+ public TestProperties setAsyncApplyBucketDiff(boolean value) {
+ asyncApplyBucketDiff = value;
+ return this;
+ }
+
public static class Spec implements ConfigServerSpec {
private final String hostName;
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java b/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java
index 2eb8a6e319b..c86cda0bbd1 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/ConfigSentinel.java
@@ -19,7 +19,6 @@ public class ConfigSentinel extends AbstractService implements SentinelConfig.Pr
private final ApplicationId applicationId;
private final Zone zone;
- private final boolean requireConnectivityCheck;
/**
* Constructs a new ConfigSentinel for the given host.
@@ -32,7 +31,6 @@ public class ConfigSentinel extends AbstractService implements SentinelConfig.Pr
super(host, "sentinel");
this.applicationId = applicationId;
this.zone = zone;
- this.requireConnectivityCheck = featureFlags.requireConnectivityCheck();
portsMeta.on(0).tag("rpc").tag("admin");
portsMeta.on(1).tag("telnet").tag("interactive").tag("http").tag("state");
setProp("clustertype", "hosts");
@@ -80,16 +78,6 @@ public class ConfigSentinel extends AbstractService implements SentinelConfig.Pr
builder.service(getServiceConfig(s));
}
}
- builder.connectivity(getConnectivityConfig(requireConnectivityCheck));
- }
-
- private SentinelConfig.Connectivity.Builder getConnectivityConfig(boolean enable) {
- var builder = new SentinelConfig.Connectivity.Builder();
- if (! enable) {
- builder.minOkPercent(0);
- builder.maxBadCount(Integer.MAX_VALUE);
- }
- return builder;
}
private SentinelConfig.Application.Builder getApplicationConfig() {
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java b/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java
index 0177aa6b2e8..8534e1f65a7 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java
@@ -23,24 +23,23 @@ public class ContainerDocumentApi {
public static final String DOCUMENT_V1_PREFIX = "/document/v1";
public ContainerDocumentApi(ContainerCluster<?> cluster, Options options) {
- var executor = new Threadpool("feedapi-handler", cluster, options.feedApiThreadpoolOptions);
- cluster.addComponent(executor);
- addRestApiHandler(cluster, options, executor);
- addFeedHandler(cluster, options, executor);
+ addRestApiHandler(cluster, options);
+ addFeedHandler(cluster, options);
}
- private static void addFeedHandler(ContainerCluster<?> cluster, Options options, Threadpool executor) {
+ private static void addFeedHandler(ContainerCluster<?> cluster, Options options) {
String bindingSuffix = ContainerCluster.RESERVED_URI_PREFIX + "/feedapi";
var handler = newVespaClientHandler("com.yahoo.vespa.http.server.FeedHandler", bindingSuffix, options);
cluster.addComponent(handler);
+ var executor = new Threadpool("feedapi-handler", cluster, options.feedApiThreadpoolOptions);
handler.inject(executor);
+ handler.addComponent(executor);
}
- private static void addRestApiHandler(ContainerCluster<?> cluster, Options options, Threadpool executor) {
+ private static void addRestApiHandler(ContainerCluster<?> cluster, Options options) {
var handler = newVespaClientHandler("com.yahoo.document.restapi.resource.DocumentV1ApiHandler", DOCUMENT_V1_PREFIX + "/*", options);
cluster.addComponent(handler);
- handler.inject(executor);
// We need to include a dummy implementation of the previous restapi handler (using the same class name).
// The internal legacy test framework requires that the name of the old handler is listed in /ApplicationStatus.
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java
index 5c692e8ef6b..d7f6fb6c581 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java
@@ -47,6 +47,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer {
private final int reponseNumThreads;
private final StorFilestorConfig.Response_sequencer_type.Enum responseSequencerType;
private final boolean useAsyncMessageHandlingOnSchedule;
+ private final boolean asyncApplyBucketDiff;
private static StorFilestorConfig.Response_sequencer_type.Enum convertResponseSequencerType(String sequencerType) {
try {
@@ -62,6 +63,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer {
this.reponseNumThreads = featureFlags.defaultNumResponseThreads();
this.responseSequencerType = convertResponseSequencerType(featureFlags.responseSequencerType());
useAsyncMessageHandlingOnSchedule = featureFlags.useAsyncMessageHandlingOnSchedule();
+ asyncApplyBucketDiff = featureFlags.asyncApplyBucketDiff();
}
@Override
@@ -73,6 +75,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer {
builder.num_response_threads(reponseNumThreads);
builder.response_sequencer_type(responseSequencerType);
builder.use_async_message_handling_on_schedule(useAsyncMessageHandlingOnSchedule);
+ builder.async_apply_bucket_diff(asyncApplyBucketDiff);
}
}
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerDocumentApiBuilderTest.java b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerDocumentApiBuilderTest.java
index b52f797dae3..ed26dafe60c 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerDocumentApiBuilderTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerDocumentApiBuilderTest.java
@@ -105,7 +105,7 @@ public class ContainerDocumentApiBuilderTest extends ContainerModelBuilderTestBa
assertThat(injectedComponentIds, hasItem("threadpool@feedapi-handler"));
ContainerThreadpoolConfig config = root.getConfig(
- ContainerThreadpoolConfig.class, "cluster1/component/threadpool@feedapi-handler");
+ ContainerThreadpoolConfig.class, "cluster1/component/com.yahoo.vespa.http.server.FeedHandler/threadpool@feedapi-handler");
assertEquals(-4, config.maxThreads());
assertEquals(-4, config.minThreads());
}
@@ -128,7 +128,7 @@ public class ContainerDocumentApiBuilderTest extends ContainerModelBuilderTestBa
createModel(root, elem);
ContainerThreadpoolConfig feedThreadpoolConfig = root.getConfig(
- ContainerThreadpoolConfig.class, "cluster1/component/threadpool@feedapi-handler");
+ ContainerThreadpoolConfig.class, "cluster1/component/com.yahoo.vespa.http.server.FeedHandler/threadpool@feedapi-handler");
assertEquals(50, feedThreadpoolConfig.maxThreads());
assertEquals(25, feedThreadpoolConfig.minThreads());
assertEquals(1000, feedThreadpoolConfig.queueSize());
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
index 9d8d7509966..739f8b7fff2 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
@@ -142,6 +142,16 @@ public class StorageClusterTest {
return new StorServerConfig(builder);
}
+ private StorFilestorConfig filestorConfigFromProducer(StorFilestorConfig.Producer producer) {
+ var builder = new StorFilestorConfig.Builder();
+ producer.getConfig(builder);
+ return new StorFilestorConfig(builder);
+ }
+
+ private StorFilestorConfig filestorConfigFromProperties(TestProperties properties) {
+ return filestorConfigFromProducer(parse(cluster("foo", ""), properties));
+ }
+
@Test
public void testMergeFeatureFlags() {
var config = configFromProperties(new TestProperties().setMaxMergeQueueSize(1919).setMaxConcurrentMergesPerNode(37));
@@ -159,6 +169,15 @@ public class StorageClusterTest {
}
@Test
+ public void async_apply_bucket_diff_can_be_controlled_by_feature_flag() {
+ var config = filestorConfigFromProperties(new TestProperties());
+ assertFalse(config.async_apply_bucket_diff());
+
+ config = filestorConfigFromProperties(new TestProperties().setAsyncApplyBucketDiff(true));
+ assertTrue(config.async_apply_bucket_diff());
+ }
+
+ @Test
public void testVisitors() {
StorVisitorConfig.Builder builder = new StorVisitorConfig.Builder();
parse(cluster("bees",
@@ -188,9 +207,7 @@ public class StorageClusterTest {
);
{
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- stc.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(stc);
assertEquals(7, config.num_threads());
assertFalse(config.enable_multibit_split_optimalization());
@@ -199,9 +216,7 @@ public class StorageClusterTest {
{
assertEquals(1, stc.getChildren().size());
StorageNode sn = stc.getChildren().values().iterator().next();
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- sn.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(sn);
assertEquals(7, config.num_threads());
}
}
@@ -215,9 +230,7 @@ public class StorageClusterTest {
"</tuning>")),
new Flavor(new FlavorsConfig.Flavor.Builder().name("test-flavor").minCpuCores(9).build())
);
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- stc.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(stc);
assertEquals(2, config.num_response_threads());
assertEquals(StorFilestorConfig.Response_sequencer_type.ADAPTIVE, config.response_sequencer_type());
assertEquals(7, config.num_threads());
@@ -238,9 +251,7 @@ public class StorageClusterTest {
);
{
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- stc.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(stc);
assertEquals(4, config.num_threads());
assertFalse(config.enable_multibit_split_optimalization());
@@ -248,9 +259,7 @@ public class StorageClusterTest {
{
assertEquals(1, stc.getChildren().size());
StorageNode sn = stc.getChildren().values().iterator().next();
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- sn.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(sn);
assertEquals(4, config.num_threads());
}
}
@@ -262,17 +271,13 @@ public class StorageClusterTest {
);
{
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- stc.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(stc);
assertEquals(8, config.num_threads());
}
{
assertEquals(1, stc.getChildren().size());
StorageNode sn = stc.getChildren().values().iterator().next();
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- sn.getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(sn);
assertEquals(9, config.num_threads());
}
}
@@ -285,17 +290,13 @@ public class StorageClusterTest {
@Test
public void testFeatureFlagControlOfResponseSequencer() {
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- simpleCluster(new TestProperties().setResponseNumThreads(13).setResponseSequencerType("THROUGHPUT")).getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(simpleCluster(new TestProperties().setResponseNumThreads(13).setResponseSequencerType("THROUGHPUT")));
assertEquals(13, config.num_response_threads());
assertEquals(StorFilestorConfig.Response_sequencer_type.THROUGHPUT, config.response_sequencer_type());
}
private void verifyAsyncMessageHandlingOnSchedule(boolean expected, boolean value) {
- StorFilestorConfig.Builder builder = new StorFilestorConfig.Builder();
- simpleCluster(new TestProperties().setAsyncMessageHandlingOnSchedule(value)).getConfig(builder);
- StorFilestorConfig config = new StorFilestorConfig(builder);
+ var config = filestorConfigFromProducer(simpleCluster(new TestProperties().setAsyncMessageHandlingOnSchedule(value)));
assertEquals(expected, config.use_async_message_handling_on_schedule());
}
@Test
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
index 2752bda3c68..7fdc18827f4 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
@@ -179,7 +179,6 @@ public class ModelContextImpl implements ModelContext {
private final List<String> allowedAthenzProxyIdentities;
private final int maxActivationInhibitedOutOfSyncGroups;
private final ToIntFunction<ClusterSpec.Type> jvmOmitStackTraceInFastThrow;
- private final boolean requireConnectivityCheck;
private final int maxConcurrentMergesPerContentNode;
private final int maxMergeQueueSize;
private final boolean ignoreMergeQueueLimit;
@@ -198,6 +197,7 @@ public class ModelContextImpl implements ModelContext {
private final boolean distributorEnhancedMaintenanceScheduling;
private final int maxUnCommittedMemory;
private final boolean forwardIssuesAsErrors;
+ private final boolean asyncApplyBucketDiff;
public FeatureFlags(FlagSource source, ApplicationId appId) {
this.defaultTermwiseLimit = flagValue(source, appId, Flags.DEFAULT_TERM_WISE_LIMIT);
@@ -216,7 +216,6 @@ public class ModelContextImpl implements ModelContext {
this.maxActivationInhibitedOutOfSyncGroups = flagValue(source, appId, Flags.MAX_ACTIVATION_INHIBITED_OUT_OF_SYNC_GROUPS);
this.jvmOmitStackTraceInFastThrow = type -> flagValueAsInt(source, appId, type, PermanentFlags.JVM_OMIT_STACK_TRACE_IN_FAST_THROW);
this.largeRankExpressionLimit = flagValue(source, appId, Flags.LARGE_RANK_EXPRESSION_LIMIT);
- this.requireConnectivityCheck = flagValue(source, appId, Flags.REQUIRE_CONNECTIVITY_CHECK);
this.maxConcurrentMergesPerContentNode = flagValue(source, appId, Flags.MAX_CONCURRENT_MERGES_PER_NODE);
this.maxMergeQueueSize = flagValue(source, appId, Flags.MAX_MERGE_QUEUE_SIZE);
this.ignoreMergeQueueLimit = flagValue(source, appId, Flags.IGNORE_MERGE_QUEUE_LIMIT);
@@ -234,6 +233,7 @@ public class ModelContextImpl implements ModelContext {
this.distributorEnhancedMaintenanceScheduling = flagValue(source, appId, Flags.DISTRIBUTOR_ENHANCED_MAINTENANCE_SCHEDULING);
this.maxUnCommittedMemory = flagValue(source, appId, Flags.MAX_UNCOMMITTED_MEMORY);;
this.forwardIssuesAsErrors = flagValue(source, appId, PermanentFlags.FORWARD_ISSUES_AS_ERRORS);
+ this.asyncApplyBucketDiff = flagValue(source, appId, Flags.ASYNC_APPLY_BUCKET_DIFF);
}
@Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; }
@@ -254,7 +254,6 @@ public class ModelContextImpl implements ModelContext {
return translateJvmOmitStackTraceInFastThrowIntToString(jvmOmitStackTraceInFastThrow, type);
}
@Override public int largeRankExpressionLimit() { return largeRankExpressionLimit; }
- @Override public boolean requireConnectivityCheck() { return requireConnectivityCheck; }
@Override public int maxConcurrentMergesPerNode() { return maxConcurrentMergesPerContentNode; }
@Override public int maxMergeQueueSize() { return maxMergeQueueSize; }
@Override public boolean ignoreMergeQueueLimit() { return ignoreMergeQueueLimit; }
@@ -272,6 +271,7 @@ public class ModelContextImpl implements ModelContext {
@Override public boolean distributorEnhancedMaintenanceScheduling() { return distributorEnhancedMaintenanceScheduling; }
@Override public int maxUnCommittedMemory() { return maxUnCommittedMemory; }
@Override public boolean forwardIssuesAsErrors() { return forwardIssuesAsErrors; }
+ @Override public boolean asyncApplyBucketDiff() { return asyncApplyBucketDiff; }
private static <V> V flagValue(FlagSource source, ApplicationId appId, UnboundFlag<? extends V, ?, ?> flag) {
return flag.bindTo(source)
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index 083cb535bfa..0aeea5ce2d5 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -433,7 +433,8 @@ public class SessionRepository {
private Optional<RemoteSession> loadSessionIfActive(RemoteSession session) {
for (ApplicationId applicationId : applicationRepo.activeApplications()) {
- if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) {
+ Optional<Long> activeSession = applicationRepo.activeSessionOf(applicationId);
+ if (activeSession.isPresent() && activeSession.get() == session.getSessionId()) {
log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it");
applicationRepo.activateApplication(ensureApplicationLoaded(session), session.getSessionId());
log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")");
diff --git a/container-dependency-versions/pom.xml b/container-dependency-versions/pom.xml
index 27f7374b802..cc44ada80b4 100644
--- a/container-dependency-versions/pom.xml
+++ b/container-dependency-versions/pom.xml
@@ -403,8 +403,8 @@
<properties>
<aopalliance.version>1.0</aopalliance.version>
<bouncycastle.version>1.68</bouncycastle.version>
- <felix.version>6.0.3</felix.version>
- <felix.log.version>1.0.1</felix.log.version>
+ <felix.version>7.0.1</felix.version>
+ <felix.log.version>1.0.1</felix.log.version> <!-- TODO Vespa 8: upgrade! -->
<findbugs.version>1.3.9</findbugs.version>
<guava.version>20.0</guava.version>
<guice.version>3.0</guice.version>
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/Endpoint.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/Endpoint.java
index b441d40c1c3..3a35e3aec7a 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/Endpoint.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/Endpoint.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.hosted.controller.application;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.ClusterSpec;
+import com.yahoo.config.provision.InstanceName;
import com.yahoo.config.provision.RegionName;
import com.yahoo.config.provision.SystemName;
import com.yahoo.config.provision.zone.RoutingMethod;
@@ -13,13 +14,15 @@ import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId;
import java.net.URI;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
- * Represents an application's endpoint in hosted Vespa. This encapsulates all logic for building URLs and DNS names for
- * application endpoints.
+ * Represents an application or instance endpoint in hosted Vespa.
+ *
+ * This encapsulates the logic for building URLs and DNS names for applications in all hosted Vespa systems.
*
* @author mpolden
*/
@@ -50,7 +53,7 @@ public class Endpoint {
if (scope == Scope.global) {
if (id == null) throw new IllegalArgumentException("Endpoint ID must be set for global endpoints");
} else {
- if (id != null) throw new IllegalArgumentException("Endpoint ID cannot be set for " + scope + " endpoints");
+ if (scope == Scope.zone && id != null) throw new IllegalArgumentException("Endpoint ID cannot be set for " + scope + " endpoints");
if (zones.size() != 1) throw new IllegalArgumentException("A single zone must be given for " + scope + " endpoints");
}
this.id = id;
@@ -63,12 +66,14 @@ public class Endpoint {
this.tls = port.tls;
}
- private Endpoint(EndpointId id, ClusterSpec.Id cluster, ApplicationId application, List<ZoneId> zones, Scope scope, SystemName system,
- Port port, boolean legacy, RoutingMethod routingMethod) {
+ private Endpoint(EndpointId id, ClusterSpec.Id cluster, TenantAndApplicationId application,
+ Optional<InstanceName> instance, List<ZoneId> zones, Scope scope, SystemName system, Port port,
+ boolean legacy, RoutingMethod routingMethod) {
this(id,
cluster,
createUrl(endpointOrClusterAsString(id, cluster),
Objects.requireNonNull(application, "application must be non-null"),
+ Objects.requireNonNull(instance, "instance must be non-null"),
zones,
scope,
Objects.requireNonNull(system, "system must be non-null"),
@@ -164,20 +169,21 @@ public class Endpoint {
return id == null ? cluster.value() : id.id();
}
- private static URI createUrl(String name, ApplicationId application, List<ZoneId> zones, Scope scope,
- SystemName system, Port port, boolean legacy, RoutingMethod routingMethod) {
+ private static URI createUrl(String name, TenantAndApplicationId application, Optional<InstanceName> instance,
+ List<ZoneId> zones, Scope scope, SystemName system, Port port, boolean legacy,
+ RoutingMethod routingMethod) {
String scheme = port.tls ? "https" : "http";
String separator = separator(system, routingMethod, port.tls);
String portPart = port.isDefault() ? "" : ":" + port.port;
return URI.create(scheme + "://" +
sanitize(namePart(name, separator)) +
systemPart(system, separator, legacy) +
- sanitize(instancePart(application, separator)) +
+ sanitize(instancePart(instance, separator)) +
sanitize(application.application().value()) +
separator +
sanitize(application.tenant().value()) +
"." +
- scopePart(scope, zones, legacy, system) +
+ scopePart(scope, zones, system, legacy) +
dnsSuffix(system, legacy) +
portPart +
"/");
@@ -199,26 +205,42 @@ public class Endpoint {
return name + separator;
}
- private static String scopePart(Scope scope, List<ZoneId> zones, boolean legacy, SystemName system) {
+ private static String scopePart(Scope scope, List<ZoneId> zones, SystemName system, boolean legacy) {
+ String scopeSymbol = scopeSymbol(scope, system, legacy);
+ if (scope == Scope.global) return scopeSymbol;
+
+ ZoneId zone = zones.get(0);
+ String region = zone.region().value();
+ boolean skipEnvironment = zone.environment().isProduction() && (system.isPublic() || !legacy);
+ String environment = skipEnvironment ? "" : "." + zone.environment().value();
if (system.isPublic() && !legacy) {
- if (scope == Scope.global) return "g";
- var zone = zones.get(0);
- var region = zone.region().value();
- char scopeSymbol = scope == Scope.region ? 'r' : 'z';
- String environment = zone.environment().isProduction() ? "" : "." + zone.environment().value();
return region + environment + "." + scopeSymbol;
}
- if (scope == Scope.global) return "global";
- var zone = zones.get(0);
- var region = zone.region().value();
- if (scope == Scope.region) region += "-w";
- if ((system.isPublic() || !legacy) && zone.environment().isProduction()) return region;
- return region + "." + zone.environment().value();
+ return region + (scopeSymbol.isEmpty() ? "" : "-" + scopeSymbol) + environment;
}
- private static String instancePart(ApplicationId application, String separator) {
- if (application.instance().isDefault()) return ""; // Skip "default"
- return application.instance().value() + separator;
+ private static String scopeSymbol(Scope scope, SystemName system, boolean legacy) {
+ if (system.isPublic() && !legacy) {
+ switch (scope) {
+ case zone: return "z";
+ case regionSplit: return "w";
+ case region: return "r";
+ case global: return "g";
+ }
+ }
+ switch (scope) {
+ case zone: return "";
+ case regionSplit: return "w";
+ case region: return "r";
+ case global: return "global";
+ }
+ throw new IllegalArgumentException("No scope symbol defined for " + scope + " in " + system + " (legacy: " + legacy + ")");
+ }
+
+ private static String instancePart(Optional<InstanceName> instance, String separator) {
+ if (instance.isEmpty()) return "";
+ if (instance.get().isDefault()) return ""; // Skip "default"
+ return instance.get().value() + separator;
}
private static String systemPart(SystemName system, String separator, boolean legacy) {
@@ -246,7 +268,7 @@ public class Endpoint {
private static String upstreamIdOf(String name, ApplicationId application, ZoneId zone) {
return Stream.of(namePart(name, ""),
- instancePart(application, ""),
+ instancePart(Optional.of(application.instance()), ""),
application.application().value(),
application.tenant().value(),
zone.region().value(),
@@ -289,12 +311,21 @@ public class Endpoint {
/** Endpoint points to one or more zones. Traffic is routed to the zone closest to the client */
global,
- /** Endpoint points to one more zones in the same geographical region. Traffic is routed equally across zones */
+ /**
+ * Endpoint points to one more zones in the same geographical region. Traffic is weighted according to
+ * configured weights.
+ */
region,
/** Endpoint points to a single zone */
zone,
+ /**
+ * Endpoint points to one more zones in the same geographical region. Traffic is routed equally across zones.
+ *
+ * This is for internal use only. Endpoints with this scope are not exposed directly to tenants.
+ */
+ regionSplit,
}
/** Represents an endpoint's HTTP port */
@@ -338,9 +369,14 @@ public class Endpoint {
}
+ /** Build an endpoint for given instance */
+ public static EndpointBuilder of(ApplicationId instance) {
+ return new EndpointBuilder(TenantAndApplicationId.from(instance), Optional.of(instance.instance()));
+ }
+
/** Build an endpoint for given application */
- public static EndpointBuilder of(ApplicationId application) {
- return new EndpointBuilder(application);
+ public static EndpointBuilder of(TenantAndApplicationId application) {
+ return new EndpointBuilder(application, Optional.empty());
}
/** Create an endpoint for given system application */
@@ -353,7 +389,8 @@ public class Endpoint {
public static class EndpointBuilder {
- private final ApplicationId application;
+ private final TenantAndApplicationId application;
+ private final Optional<InstanceName> instance;
private Scope scope;
private List<ZoneId> zones;
@@ -363,8 +400,9 @@ public class Endpoint {
private RoutingMethod routingMethod = RoutingMethod.shared;
private boolean legacy = false;
- private EndpointBuilder(ApplicationId application) {
- this.application = application;
+ private EndpointBuilder(TenantAndApplicationId application, Optional<InstanceName> instance) {
+ this.application = Objects.requireNonNull(application);
+ this.instance = Objects.requireNonNull(instance);
}
/** Sets the zone target for this */
@@ -402,9 +440,19 @@ public class Endpoint {
}
/** Sets the region target for this, deduced from given zone */
- public EndpointBuilder targetRegion(ClusterSpec.Id cluster, ZoneId zone) {
+ public EndpointBuilder targetRegionSplit(ClusterSpec.Id cluster, ZoneId zone) {
checkScope();
this.cluster = cluster;
+ this.scope = Scope.regionSplit;
+ this.zones = List.of(effectiveZone(zone));
+ return this;
+ }
+
+ /** Sets the region target for this by endpointId, deduced from given zone */
+ public EndpointBuilder targetRegion(EndpointId endpointId, ClusterSpec.Id cluster, ZoneId zone) {
+ checkScope();
+ this.endpointId = endpointId;
+ this.cluster = cluster;
this.scope = Scope.region;
this.zones = List.of(effectiveZone(zone));
return this;
@@ -436,7 +484,10 @@ public class Endpoint {
if (routingMethod.isDirect() && !port.isDefault()) {
throw new IllegalArgumentException("Routing method " + routingMethod + " can only use default port");
}
- return new Endpoint(endpointId, cluster, application, zones, scope, system, port, legacy, routingMethod);
+ if (scope == Scope.region && instance.isPresent()) {
+ throw new IllegalArgumentException("Instance cannot be set for scope " + scope);
+ }
+ return new Endpoint(endpointId, cluster, application, instance, zones, scope, system, port, legacy, routingMethod);
}
private void checkScope() {
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageValidator.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageValidator.java
index 3dc95251eb8..bd6376e58aa 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageValidator.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageValidator.java
@@ -102,7 +102,7 @@ public class ApplicationPackageValidator {
instant));
}
- /** Verify that compactable endpoint parts (instance aname nd endpoint ID) do not clash */
+ /** Verify that compactable endpoint parts (instance name and endpoint ID) do not clash */
private void validateCompactedEndpoint(ApplicationPackage applicationPackage) {
Map<List<String>, InstanceEndpoint> instanceEndpoints = new HashMap<>();
for (var instanceSpec : applicationPackage.deploymentSpec().instances()) {
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
index 59d18fcf17b..51911d62ef4 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
@@ -2698,6 +2698,7 @@ public class ApplicationApiHandler extends AuditLoggingRequestHandler {
private static String endpointScopeString(Endpoint.Scope scope) {
switch (scope) {
case region: return "region";
+ case regionSplit: return "regionSplit";
case global: return "global";
case zone: return "zone";
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/routing/RoutingPolicy.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/routing/RoutingPolicy.java
index ec9cf4064c9..f93e24f969e 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/routing/RoutingPolicy.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/routing/RoutingPolicy.java
@@ -111,7 +111,7 @@ public class RoutingPolicy {
/** Returns the region endpoint of this */
public Endpoint regionEndpointIn(SystemName system, RoutingMethod routingMethod, boolean legacy) {
- Endpoint.EndpointBuilder endpoint = endpoint(routingMethod).targetRegion(id.cluster(), id.zone());
+ Endpoint.EndpointBuilder endpoint = endpoint(routingMethod).targetRegionSplit(id.cluster(), id.zone());
if (legacy) {
endpoint = endpoint.legacy();
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/EndpointTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/EndpointTest.java
index da641d17a8a..5d985518809 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/EndpointTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/EndpointTest.java
@@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals;
*/
public class EndpointTest {
- private static final ApplicationId app1 = ApplicationId.from("t1", "a1", "default");
- private static final ApplicationId app2 = ApplicationId.from("t2", "a2", "i2");
+ private static final ApplicationId instance1 = ApplicationId.from("t1", "a1", "default");
+ private static final ApplicationId instance2 = ApplicationId.from("t2", "a2", "i2");
+ private static final TenantAndApplicationId app1 = TenantAndApplicationId.from(instance1);
+ private static final TenantAndApplicationId app2 = TenantAndApplicationId.from(instance2);
@Test
public void global_endpoints() {
@@ -30,62 +32,62 @@ public class EndpointTest {
Map<String, Endpoint> tests = Map.of(
// Legacy endpoint
"http://a1.t1.global.vespa.yahooapis.com:4080/",
- Endpoint.of(app1).target(endpointId).on(Port.plain(4080)).legacy().in(SystemName.main),
+ Endpoint.of(instance1).target(endpointId).on(Port.plain(4080)).legacy().in(SystemName.main),
// Legacy endpoint with TLS
"https://a1--t1.global.vespa.yahooapis.com:4443/",
- Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).legacy().in(SystemName.main),
+ Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).legacy().in(SystemName.main),
// Main endpoint
"https://a1--t1.global.vespa.oath.cloud:4443/",
- Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).in(SystemName.main),
+ Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).in(SystemName.main),
// Main endpoint in CD
"https://cd--a1--t1.global.vespa.oath.cloud:4443/",
- Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).in(SystemName.cd),
+ Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).in(SystemName.cd),
// Main endpoint in CD
"https://cd--i2--a2--t2.global.vespa.oath.cloud:4443/",
- Endpoint.of(app2).target(endpointId).on(Port.tls(4443)).in(SystemName.cd),
+ Endpoint.of(instance2).target(endpointId).on(Port.tls(4443)).in(SystemName.cd),
// Main endpoint with direct routing and default TLS port
"https://a1.t1.global.vespa.oath.cloud/",
- Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
+ Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
// Main endpoint with custom rotation name
"https://r1.a1.t1.global.vespa.oath.cloud/",
- Endpoint.of(app1).target(EndpointId.of("r1")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
+ Endpoint.of(instance1).target(EndpointId.of("r1")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
// Main endpoint for custom instance in default rotation
"https://i2.a2.t2.global.vespa.oath.cloud/",
- Endpoint.of(app2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
+ Endpoint.of(instance2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
// Main endpoint for custom instance with custom rotation name
"https://r2.i2.a2.t2.global.vespa.oath.cloud/",
- Endpoint.of(app2).target(EndpointId.of("r2")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
+ Endpoint.of(instance2).target(EndpointId.of("r2")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
// Main endpoint in public system (legacy)
"https://a1.t1.global.public.vespa.oath.cloud/",
- Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public)
+ Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public)
);
tests.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString()));
Map<String, Endpoint> tests2 = Map.of(
// Main endpoint in public CD system (legacy)
"https://publiccd.a1.t1.global.public-cd.vespa.oath.cloud/",
- Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd),
+ Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd),
// Default endpoint in public system
"https://a1.t1.g.vespa-app.cloud/",
- Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public),
+ Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public),
// Default endpoint in public CD system
"https://a1.t1.g.cd.vespa-app.cloud/",
- Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.PublicCd),
+ Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.PublicCd),
// Custom instance in public system
"https://i2.a2.t2.g.vespa-app.cloud/",
- Endpoint.of(app2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public)
+ Endpoint.of(instance2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public)
);
tests2.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString()));
}
@@ -97,54 +99,54 @@ public class EndpointTest {
Map<String, Endpoint> tests = Map.of(
// Legacy endpoint
"http://a1.t1.global.vespa.yahooapis.com:4080/",
- Endpoint.of(app1).target(endpointId).on(Port.plain(4080)).legacy().in(SystemName.main),
+ Endpoint.of(instance1).target(endpointId).on(Port.plain(4080)).legacy().in(SystemName.main),
// Legacy endpoint with TLS
"https://a1--t1.global.vespa.yahooapis.com:4443/",
- Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).legacy().in(SystemName.main),
+ Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).legacy().in(SystemName.main),
// Main endpoint
"https://a1--t1.global.vespa.oath.cloud:4443/",
- Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).in(SystemName.main),
+ Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).in(SystemName.main),
// Main endpoint in CD
"https://cd--i2--a2--t2.global.vespa.oath.cloud:4443/",
- Endpoint.of(app2).target(endpointId).on(Port.tls(4443)).in(SystemName.cd),
+ Endpoint.of(instance2).target(endpointId).on(Port.tls(4443)).in(SystemName.cd),
// Main endpoint in CD
"https://cd--a1--t1.global.vespa.oath.cloud:4443/",
- Endpoint.of(app1).target(endpointId).on(Port.tls(4443)).in(SystemName.cd),
+ Endpoint.of(instance1).target(endpointId).on(Port.tls(4443)).in(SystemName.cd),
// Main endpoint with direct routing and default TLS port
"https://a1.t1.global.vespa.oath.cloud/",
- Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
+ Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
// Main endpoint with custom rotation name
"https://r1.a1.t1.global.vespa.oath.cloud/",
- Endpoint.of(app1).target(EndpointId.of("r1")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
+ Endpoint.of(instance1).target(EndpointId.of("r1")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
// Main endpoint for custom instance in default rotation
"https://i2.a2.t2.global.vespa.oath.cloud/",
- Endpoint.of(app2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
+ Endpoint.of(instance2).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
// Main endpoint for custom instance with custom rotation name
"https://r2.i2.a2.t2.global.vespa.oath.cloud/",
- Endpoint.of(app2).target(EndpointId.of("r2")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
+ Endpoint.of(instance2).target(EndpointId.of("r2")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.main),
// Main endpoint in public system (legacy)
"https://a1.t1.global.public.vespa.oath.cloud/",
- Endpoint.of(app1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public)
+ Endpoint.of(instance1).target(endpointId).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public)
);
tests.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString()));
Map<String, Endpoint> tests2 = Map.of(
// Custom endpoint and instance in public CD system (legacy)
"https://foo.publiccd.i2.a2.t2.global.public-cd.vespa.oath.cloud/",
- Endpoint.of(app2).target(EndpointId.of("foo")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd),
+ Endpoint.of(instance2).target(EndpointId.of("foo")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd),
// Custom endpoint and instance in public system
"https://foo.i2.a2.t2.g.vespa-app.cloud/",
- Endpoint.of(app2).target(EndpointId.of("foo")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public)
+ Endpoint.of(instance2).target(EndpointId.of("foo")).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public)
);
tests2.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString()));
}
@@ -158,62 +160,62 @@ public class EndpointTest {
Map<String, Endpoint> tests = Map.of(
// Legacy endpoint (always contains environment)
"http://a1.t1.us-north-1.prod.vespa.yahooapis.com:4080/",
- Endpoint.of(app1).target(cluster, prodZone).on(Port.plain(4080)).legacy().in(SystemName.main),
+ Endpoint.of(instance1).target(cluster, prodZone).on(Port.plain(4080)).legacy().in(SystemName.main),
// Secure legacy endpoint
"https://a1--t1.us-north-1.prod.vespa.yahooapis.com:4443/",
- Endpoint.of(app1).target(cluster, prodZone).on(Port.tls(4443)).legacy().in(SystemName.main),
+ Endpoint.of(instance1).target(cluster, prodZone).on(Port.tls(4443)).legacy().in(SystemName.main),
// Prod endpoint in main
"https://a1--t1.us-north-1.vespa.oath.cloud:4443/",
- Endpoint.of(app1).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.main),
+ Endpoint.of(instance1).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.main),
// Prod endpoint in CD
"https://cd--a1--t1.us-north-1.vespa.oath.cloud:4443/",
- Endpoint.of(app1).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.cd),
+ Endpoint.of(instance1).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.cd),
// Test endpoint in main
"https://a1--t1.us-north-2.test.vespa.oath.cloud:4443/",
- Endpoint.of(app1).target(cluster, testZone).on(Port.tls(4443)).in(SystemName.main),
+ Endpoint.of(instance1).target(cluster, testZone).on(Port.tls(4443)).in(SystemName.main),
// Non-default cluster in main
"https://c1--a1--t1.us-north-1.vespa.oath.cloud/",
- Endpoint.of(app1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).in(SystemName.main),
+ Endpoint.of(instance1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).in(SystemName.main),
// Non-default instance in main
"https://i2--a2--t2.us-north-1.vespa.oath.cloud:4443/",
- Endpoint.of(app2).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.main),
+ Endpoint.of(instance2).target(cluster, prodZone).on(Port.tls(4443)).in(SystemName.main),
// Non-default cluster in public (legacy)
"https://c1.a1.t1.us-north-1.public.vespa.oath.cloud/",
- Endpoint.of(app1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public),
+ Endpoint.of(instance1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public),
// Non-default cluster and instance in public (legacy)
"https://c2.i2.a2.t2.us-north-1.public.vespa.oath.cloud/",
- Endpoint.of(app2).target(ClusterSpec.Id.from("c2"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public),
+ Endpoint.of(instance2).target(ClusterSpec.Id.from("c2"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.Public),
// Endpoint in main using shared layer 4
"https://a1.t1.us-north-1.vespa.oath.cloud/",
- Endpoint.of(app1).target(cluster, prodZone).on(Port.tls()).routingMethod(RoutingMethod.sharedLayer4).in(SystemName.main)
+ Endpoint.of(instance1).target(cluster, prodZone).on(Port.tls()).routingMethod(RoutingMethod.sharedLayer4).in(SystemName.main)
);
tests.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString()));
Map<String, Endpoint> tests2 = Map.of(
// Non-default cluster and instance in public CD (legacy)
"https://c2.publiccd.i2.a2.t2.us-north-1.public-cd.vespa.oath.cloud/",
- Endpoint.of(app2).target(ClusterSpec.Id.from("c2"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd),
+ Endpoint.of(instance2).target(ClusterSpec.Id.from("c2"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).legacy().in(SystemName.PublicCd),
// Custom cluster name in public
"https://c1.a1.t1.us-north-1.z.vespa-app.cloud/",
- Endpoint.of(app1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public),
+ Endpoint.of(instance1).target(ClusterSpec.Id.from("c1"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public),
// Default cluster name in non-production zone in public
"https://a1.t1.us-north-2.test.z.vespa-app.cloud/",
- Endpoint.of(app1).target(ClusterSpec.Id.from("default"), testZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public),
+ Endpoint.of(instance1).target(ClusterSpec.Id.from("default"), testZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.Public),
// Default cluster name in public CD
"https://a1.t1.us-north-1.z.cd.vespa-app.cloud/",
- Endpoint.of(app1).target(ClusterSpec.Id.from("default"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.PublicCd)
+ Endpoint.of(instance1).target(ClusterSpec.Id.from("default"), prodZone).on(Port.tls()).routingMethod(RoutingMethod.exclusive).in(SystemName.PublicCd)
);
tests2.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString()));
}
@@ -227,7 +229,7 @@ public class EndpointTest {
var tests = Map.of(
// Default rotation
"https://a1.t1.global.public.vespa.oath.cloud/",
- Endpoint.of(app1)
+ Endpoint.of(instance1)
.target(EndpointId.defaultId())
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
@@ -236,7 +238,7 @@ public class EndpointTest {
// Wildcard to match other rotations
"https://*.a1.t1.global.public.vespa.oath.cloud/",
- Endpoint.of(app1)
+ Endpoint.of(instance1)
.wildcard()
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
@@ -245,7 +247,7 @@ public class EndpointTest {
// Default cluster in zone
"https://a1.t1.us-north-1.public.vespa.oath.cloud/",
- Endpoint.of(app1)
+ Endpoint.of(instance1)
.target(defaultCluster, prodZone)
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
@@ -254,7 +256,7 @@ public class EndpointTest {
// Wildcard to match other clusters in zone
"https://*.a1.t1.us-north-1.public.vespa.oath.cloud/",
- Endpoint.of(app1)
+ Endpoint.of(instance1)
.wildcard(prodZone)
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
@@ -263,7 +265,7 @@ public class EndpointTest {
// Default cluster in test zone
"https://a1.t1.us-north-2.test.public.vespa.oath.cloud/",
- Endpoint.of(app1)
+ Endpoint.of(instance1)
.target(defaultCluster, testZone)
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
@@ -272,7 +274,7 @@ public class EndpointTest {
// Wildcard to match other clusters in test zone
"https://*.a1.t1.us-north-2.test.public.vespa.oath.cloud/",
- Endpoint.of(app1)
+ Endpoint.of(instance1)
.wildcard(testZone)
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
@@ -281,7 +283,7 @@ public class EndpointTest {
// Wildcard to match other clusters in zone
"https://*.a1.t1.us-north-1.z.vespa-app.cloud/",
- Endpoint.of(app1)
+ Endpoint.of(instance1)
.wildcard(prodZone)
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
@@ -296,37 +298,66 @@ public class EndpointTest {
var cluster = ClusterSpec.Id.from("default");
var prodZone = ZoneId.from("prod", "us-north-2");
Map<String, Endpoint> tests = Map.of(
- "https://a1.t1.us-north-1-w.public.vespa.oath.cloud/",
+ "https://r0.a1.t1.us-north-2-r.vespa.oath.cloud/",
Endpoint.of(app1)
- .targetRegion(cluster, ZoneId.from("prod", "us-north-1a"))
+ .targetRegion(EndpointId.of("r0"), ClusterSpec.Id.from("c1"), prodZone)
+ .routingMethod(RoutingMethod.sharedLayer4)
+ .on(Port.tls())
+ .in(SystemName.main),
+ "https://r1.a2.t2.us-north-2.r.vespa-app.cloud/",
+ Endpoint.of(app2)
+ .targetRegion(EndpointId.of("r1"), ClusterSpec.Id.from("c1"), prodZone)
+ .routingMethod(RoutingMethod.exclusive)
+ .on(Port.tls())
+ .in(SystemName.Public)
+ );
+ tests.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString()));
+ Endpoint endpoint = Endpoint.of(instance1)
+ .targetRegionSplit(cluster, ZoneId.from("prod", "us-north-1a"))
+ .routingMethod(RoutingMethod.exclusive)
+ .on(Port.tls())
+ .in(SystemName.main);
+ assertEquals("Availability zone is removed from region",
+ "us-north-1",
+ endpoint.zones().get(0).region().value());
+ }
+
+ @Test
+ public void region_split_endpoints() {
+ var cluster = ClusterSpec.Id.from("default");
+ var prodZone = ZoneId.from("prod", "us-north-2");
+ Map<String, Endpoint> tests = Map.of(
+ "https://a1.t1.us-north-1-w.public.vespa.oath.cloud/",
+ Endpoint.of(instance1)
+ .targetRegionSplit(cluster, ZoneId.from("prod", "us-north-1a"))
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
.legacy()
.in(SystemName.Public),
"https://a1.t1.us-north-2-w.public.vespa.oath.cloud/",
- Endpoint.of(app1)
- .targetRegion(cluster, prodZone)
+ Endpoint.of(instance1)
+ .targetRegionSplit(cluster, prodZone)
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
.legacy()
.in(SystemName.Public),
"https://a1.t1.us-north-2-w.test.public.vespa.oath.cloud/",
- Endpoint.of(app1)
- .targetRegion(cluster, ZoneId.from("test", "us-north-2"))
+ Endpoint.of(instance1)
+ .targetRegionSplit(cluster, ZoneId.from("test", "us-north-2"))
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
.legacy()
.in(SystemName.Public),
- "https://c1.a1.t1.us-north-2.r.vespa-app.cloud/",
- Endpoint.of(app1)
- .targetRegion(ClusterSpec.Id.from("c1"), prodZone)
+ "https://c1.a1.t1.us-north-2.w.vespa-app.cloud/",
+ Endpoint.of(instance1)
+ .targetRegionSplit(ClusterSpec.Id.from("c1"), prodZone)
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
.in(SystemName.Public)
);
tests.forEach((expected, endpoint) -> assertEquals(expected, endpoint.url().toString()));
- Endpoint endpoint = Endpoint.of(app1)
- .targetRegion(cluster, ZoneId.from("prod", "us-north-1a"))
+ Endpoint endpoint = Endpoint.of(instance1)
+ .targetRegionSplit(cluster, ZoneId.from("prod", "us-north-1a"))
.routingMethod(RoutingMethod.exclusive)
.on(Port.tls())
.in(SystemName.main);
@@ -341,23 +372,23 @@ public class EndpointTest {
var tests1 = Map.of(
// With default cluster
"a1.t1.us-north-1.prod",
- Endpoint.of(app1).target(EndpointId.defaultId()).on(Port.tls(4443)).in(SystemName.main),
+ Endpoint.of(instance1).target(EndpointId.defaultId()).on(Port.tls(4443)).in(SystemName.main),
// With non-default cluster
"c1.a1.t1.us-north-1.prod",
- Endpoint.of(app1).target(EndpointId.of("ignored1"), ClusterSpec.Id.from("c1"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main)
+ Endpoint.of(instance1).target(EndpointId.of("ignored1"), ClusterSpec.Id.from("c1"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main)
);
var tests2 = Map.of(
// With non-default instance and default cluster
"i2.a2.t2.us-north-1.prod",
- Endpoint.of(app2).target(EndpointId.defaultId(), ClusterSpec.Id.from("default"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main),
+ Endpoint.of(instance2).target(EndpointId.defaultId(), ClusterSpec.Id.from("default"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main),
// With non-default instance and cluster
"c2.i2.a2.t2.us-north-1.prod",
- Endpoint.of(app2).target(EndpointId.of("ignored2"), ClusterSpec.Id.from("c2"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main)
+ Endpoint.of(instance2).target(EndpointId.of("ignored2"), ClusterSpec.Id.from("c2"), List.of(zone)).on(Port.tls(4443)).in(SystemName.main)
);
- tests1.forEach((expected, endpoint) -> assertEquals(expected, endpoint.upstreamIdOf(new DeploymentId(app1, zone))));
- tests2.forEach((expected, endpoint) -> assertEquals(expected, endpoint.upstreamIdOf(new DeploymentId(app2, zone))));
+ tests1.forEach((expected, endpoint) -> assertEquals(expected, endpoint.upstreamIdOf(new DeploymentId(instance1, zone))));
+ tests2.forEach((expected, endpoint) -> assertEquals(expected, endpoint.upstreamIdOf(new DeploymentId(instance2, zone))));
}
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/routing/RoutingPoliciesTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/routing/RoutingPoliciesTest.java
index 2002b59dc1e..c678a83bf2c 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/routing/RoutingPoliciesTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/routing/RoutingPoliciesTest.java
@@ -364,11 +364,11 @@ public class RoutingPoliciesTest {
Map.of(zone1, 1L, zone2, 1L), true);
assertEquals("Registers expected DNS names",
Set.of("app1.tenant1.aws-eu-west-1-w.public.vespa.oath.cloud",
- "app1.tenant1.aws-eu-west-1.r.vespa-app.cloud",
+ "app1.tenant1.aws-eu-west-1.w.vespa-app.cloud",
"app1.tenant1.aws-eu-west-1a.public.vespa.oath.cloud",
"app1.tenant1.aws-eu-west-1a.z.vespa-app.cloud",
"app1.tenant1.aws-us-east-1-w.public.vespa.oath.cloud",
- "app1.tenant1.aws-us-east-1.r.vespa-app.cloud",
+ "app1.tenant1.aws-us-east-1.w.vespa-app.cloud",
"app1.tenant1.aws-us-east-1c.public.vespa.oath.cloud",
"app1.tenant1.aws-us-east-1c.z.vespa-app.cloud",
"app1.tenant1.g.vespa-app.cloud",
@@ -837,7 +837,7 @@ public class RoutingPoliciesTest {
DeploymentId deployment = new DeploymentId(application, zone);
EndpointList regionEndpoints = tester.controller().routing().endpointsOf(deployment)
.cluster(cluster)
- .scope(Endpoint.Scope.region);
+ .scope(Endpoint.Scope.regionSplit);
if (!legacy) {
regionEndpoints = regionEndpoints.not().legacy();
}
diff --git a/filedistribution/pom.xml b/filedistribution/pom.xml
index b9fda41e7f3..e5c4e921e85 100644
--- a/filedistribution/pom.xml
+++ b/filedistribution/pom.xml
@@ -26,6 +26,12 @@
</dependency>
<dependency>
<groupId>com.yahoo.vespa</groupId>
+ <artifactId>http-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
<artifactId>vespajlib</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index 39193db9e60..4b7d05928c0 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -88,7 +88,7 @@ public class Flags {
public static final UnboundIntFlag MAX_UNCOMMITTED_MEMORY = defineIntFlag(
"max-uncommitted-memory", 130000,
List.of("geirst, baldersheim"), "2021-10-21", "2022-01-01",
- "The task limit used by the executors handling feed in proton",
+ "Max amount of memory holding updates to an attribute before we do a commit.",
"Takes effect at redeployment",
ZONE_ID, APPLICATION_ID);
@@ -369,6 +369,13 @@ public class Flags {
"Takes effect at redeploy",
ZONE_ID, APPLICATION_ID);
+ public static final UnboundBooleanFlag ASYNC_APPLY_BUCKET_DIFF = defineFeatureFlag(
+ "async-apply-bucket-diff", false,
+ List.of("geirst", "vekterli"), "2021-10-22", "2022-01-31",
+ "Whether portions of apply bucket diff handling will be performed asynchronously",
+ "Takes effect at redeploy",
+ ZONE_ID, APPLICATION_ID);
+
/** WARNING: public for testing: All flags should be defined in {@link Flags}. */
public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners,
String createdAt, String expiresAt, String description,
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/core/FelixParams.java b/jdisc_core/src/main/java/com/yahoo/jdisc/core/FelixParams.java
index 402c6c288f3..90f7d7aab01 100644
--- a/jdisc_core/src/main/java/com/yahoo/jdisc/core/FelixParams.java
+++ b/jdisc_core/src/main/java/com/yahoo/jdisc/core/FelixParams.java
@@ -54,7 +54,6 @@ public class FelixParams {
Map<String, String> ret = new HashMap<>();
ret.put(BundleCache.CACHE_ROOTDIR_PROP, cachePath);
ret.put(Constants.FRAMEWORK_SYSTEMPACKAGES, exportPackages.toString());
- ret.put(Constants.SUPPORTS_BOOTCLASSPATH_EXTENSION, "true");
ret.put(Constants.FRAMEWORK_BOOTDELEGATION, "com.yourkit.runtime,com.yourkit.probes,com.yourkit.probes.builtin,com.singularity.*");
ret.put(Constants.FRAMEWORK_BSNVERSION, Constants.FRAMEWORK_BSNVERSION_MANAGED);
return ret;
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java b/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java
index 50ac90b6181..b7993de5d82 100644
--- a/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java
+++ b/jdisc_core/src/main/java/com/yahoo/jdisc/core/OsgiLogHandler.java
@@ -168,5 +168,8 @@ class OsgiLogHandler extends Handler {
return new Hashtable<>();
}
+ @Override
+ public <A> A adapt(Class<A> aClass) { return null; }
+
}
}
diff --git a/persistence/src/vespa/persistence/spi/catchresult.cpp b/persistence/src/vespa/persistence/spi/catchresult.cpp
index 3dbe8cfdf7e..366e439cc2d 100644
--- a/persistence/src/vespa/persistence/spi/catchresult.cpp
+++ b/persistence/src/vespa/persistence/spi/catchresult.cpp
@@ -13,7 +13,7 @@ CatchResult::CatchResult()
CatchResult::~CatchResult() = default;
void
-CatchResult::onComplete(std::unique_ptr<Result> result) {
+CatchResult::onComplete(std::unique_ptr<Result> result) noexcept {
_promisedResult.set_value(std::move(result));
}
void
diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h
index 80d4f863971..02c626ea23e 100644
--- a/persistence/src/vespa/persistence/spi/catchresult.h
+++ b/persistence/src/vespa/persistence/spi/catchresult.h
@@ -12,7 +12,7 @@ public:
std::future<std::unique_ptr<Result>> future_result() {
return _promisedResult.get_future();
}
- void onComplete(std::unique_ptr<Result> result) override;
+ void onComplete(std::unique_ptr<Result> result) noexcept override;
void addResultHandler(const ResultHandler * resultHandler) override;
private:
std::promise<std::unique_ptr<Result>> _promisedResult;
diff --git a/persistence/src/vespa/persistence/spi/operationcomplete.h b/persistence/src/vespa/persistence/spi/operationcomplete.h
index fd6b846b2a4..fd4d5714cbc 100644
--- a/persistence/src/vespa/persistence/spi/operationcomplete.h
+++ b/persistence/src/vespa/persistence/spi/operationcomplete.h
@@ -23,7 +23,7 @@ class OperationComplete
public:
using UP = std::unique_ptr<OperationComplete>;
virtual ~OperationComplete() = default;
- virtual void onComplete(std::unique_ptr<Result> result) = 0;
+ virtual void onComplete(std::unique_ptr<Result> result) noexcept = 0;
virtual void addResultHandler(const ResultHandler * resultHandler) = 0;
};
diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
index 8ad120e7d05..eccae8fe8ab 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
@@ -39,7 +39,7 @@ class MyOperationComplete : public storage::spi::OperationComplete
public:
MyOperationComplete(PersistenceProvider* provider, std::atomic<uint32_t> &errors, const Bucket& bucket, PendingTracker& tracker);
~MyOperationComplete() override;
- void onComplete(std::unique_ptr<storage::spi::Result> result) override;
+ void onComplete(std::unique_ptr<storage::spi::Result> result) noexcept override;
void addResultHandler(const storage::spi::ResultHandler* resultHandler) override;
};
@@ -58,7 +58,7 @@ MyOperationComplete::~MyOperationComplete()
}
void
-MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result)
+MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result) noexcept
{
if (result->hasError()) {
++_errors;
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
index fe77477fa77..d51485df38d 100644
--- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
+++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
@@ -11,6 +11,8 @@
using document::DocumentId;
using document::test::makeDocumentBucket;
+using vespalib::MonitoredRefCount;
+using vespalib::RetainGuard;
namespace storage {
@@ -72,6 +74,7 @@ class ApplyBucketDiffStateTestBase : public ::testing::Test
public:
uint32_t sync_count;
DummyMergeBucketInfoSyncer syncer;
+ MonitoredRefCount monitored_ref_count;
ApplyBucketDiffStateTestBase()
: ::testing::Test(),
@@ -83,7 +86,7 @@ public:
~ApplyBucketDiffStateTestBase();
std::unique_ptr<ApplyBucketDiffState> make_state() {
- return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket));
+ return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
}
};
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 60030004594..017b8ce2b92 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -20,7 +20,12 @@ using namespace ::testing;
namespace storage {
-struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
+/*
+ * Class for testing merge handler taking async_apply_bucket_diff as
+ * parameter for the test.
+ */
+struct MergeHandlerTest : SingleDiskPersistenceTestUtils,
+ public testing::WithParamInterface<bool> {
uint32_t _location; // Location used for all merge tests
document::Bucket _bucket; // Bucket used for all merge tests
uint64_t _maxTimestamp;
@@ -149,8 +154,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
int _counter;
MessageSenderStub _stub;
std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd;
+ void convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler& handler);
};
+ void convert_delayed_error_to_exception(MergeHandler& handler);
+
std::string
doTestSPIException(MergeHandler& handler,
PersistenceProviderWrapper& providerWrapper,
@@ -159,11 +167,21 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
return MergeHandler(getEnv(), getPersistenceProvider(),
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize);
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize, 64, GetParam());
}
MergeHandler createHandler(spi::PersistenceProvider & spi) {
return MergeHandler(getEnv(), spi,
- getEnv()._component.cluster_context(), getEnv()._component.getClock());
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), 4190208, 64, GetParam());
+ }
+
+ std::shared_ptr<api::StorageMessage> get_queued_reply() {
+ std::shared_ptr<api::StorageMessage> msg;
+ if (_replySender.queue.getNext(msg, 0s)) {
+ return msg;
+ } else {
+ return {};
+ }
+
}
};
@@ -209,7 +227,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) {
// Test a regular merge bucket command fetching data, including
// puts, removes, unrevertable removes & duplicates.
-TEST_F(MergeHandlerTest, merge_bucket_command) {
+TEST_P(MergeHandlerTest, merge_bucket_command) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -270,11 +288,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain)
EXPECT_EQ(17, diff.size());
}
-TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) {
+TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) {
testGetBucketDiffChain(true);
}
-TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) {
+TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) {
testGetBucketDiffChain(false);
}
@@ -320,17 +338,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
EXPECT_EQ(0, diff.size());
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) {
testApplyBucketDiffChain(true);
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
testApplyBucketDiffChain(false);
}
// Test that a simplistic merge with one thing to actually merge,
// sends correct commands and finish.
-TEST_F(MergeHandlerTest, master_message_flow) {
+TEST_P(MergeHandlerTest, master_message_flow) {
MergeHandler handler = createHandler();
LOG(debug, "Handle a merge bucket command");
@@ -424,7 +442,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
}
-TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
+TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) {
uint32_t docSize = 1024;
uint32_t docCount = 10;
uint32_t maxChunkSize = docSize * 3;
@@ -488,7 +506,7 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
EXPECT_TRUE(reply->getResult().success());
}
-TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
+TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) {
setUpChain(FRONT);
uint32_t docSize = 1024;
@@ -524,7 +542,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize);
}
-TEST_F(MergeHandlerTest, max_timestamp) {
+TEST_P(MergeHandlerTest, max_timestamp) {
doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024);
MergeHandler handler = createHandler();
@@ -632,7 +650,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask
return getBucketDiffCmd;
}
-TEST_F(MergeHandlerTest, spi_flush_guard) {
+TEST_P(MergeHandlerTest, spi_flush_guard) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
@@ -647,13 +665,16 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
try {
auto cmd = createDummyApplyDiff(6000);
handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
+ if (GetParam()) {
+ convert_delayed_error_to_exception(handler);
+ }
FAIL() << "No exception thrown on failing in-place remove";
} catch (const std::runtime_error& e) {
EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos);
}
}
-TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
+TEST_P(MergeHandlerTest, bucket_not_found_in_db) {
MergeHandler handler = createHandler();
// Send merge for unknown bucket
auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
@@ -661,7 +682,7 @@ TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
EXPECT_TRUE(tracker->getResult().isBucketDisappearance());
}
-TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
+TEST_P(MergeHandlerTest, merge_progress_safe_guard) {
MergeHandler handler = createHandler();
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -684,7 +705,7 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
+TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
MergeHandler handler = createHandler();
_nodes.clear();
_nodes.emplace_back(0, false);
@@ -716,7 +737,7 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask);
}
-TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
+TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
MergeHandler handler = createHandler();
std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
{
@@ -741,6 +762,23 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
EXPECT_EQ(0x0, diff[0]._entry._hasMask);
}
+void
+MergeHandlerTest::convert_delayed_error_to_exception(MergeHandler& handler)
+{
+ handler.drain_async_writes();
+ if (getEnv()._fileStorHandler.isMerging(_bucket)) {
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ api::ReturnCode return_code;
+ s->check_delayed_error(return_code);
+ if (return_code.failed()) {
+ getEnv()._fileStorHandler.clearMergeStatus(_bucket, return_code);
+ fetchSingleMessage<api::ApplyBucketDiffReply>();
+ fetchSingleMessage<api::ApplyBucketDiffCommand>();
+ throw std::runtime_error(return_code.getMessage());
+ }
+ }
+}
+
std::string
MergeHandlerTest::doTestSPIException(MergeHandler& handler,
PersistenceProviderWrapper& providerWrapper,
@@ -755,6 +793,9 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler,
providerWrapper.setFailureMask(failureMask);
try {
invoker.invoke(*this, handler, *_context);
+ if (GetParam()) {
+ convert_delayed_error_to_exception(handler);
+ }
if (failureMask != 0) {
return (std::string("No exception was thrown during handler "
"invocation. Expected exception containing '")
@@ -823,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke(
handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
+TEST_P(MergeHandlerTest, merge_bucket_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -855,7 +896,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
+TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
@@ -888,7 +929,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(
handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -953,7 +994,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke(
api::ReturnCode::INTERNAL_FAILURE);
}
-TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
+TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler = createHandler(providerWrapper);
providerWrapper.setResult(
@@ -1006,6 +1047,18 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(
}
void
+MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler &handler)
+{
+ handler.drain_async_writes();
+ if (!_stub.replies.empty() && _stub.replies.back()->getResult().failed()) {
+ auto chained_reply = _stub.replies.back();
+ _stub.replies.pop_back();
+ test.messageKeeper().sendReply(chained_reply);
+ throw std::runtime_error(chained_reply->getResult().getMessage());
+ }
+}
+
+void
MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
MergeHandlerTest& test,
MergeHandler& handler,
@@ -1016,6 +1069,9 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
test.fillDummyApplyDiff(reply->getDiff());
_stub.clear();
handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket));
+ if (test.GetParam()) {
+ convert_delayed_error_to_exception(test, handler);
+ }
}
std::string
@@ -1039,7 +1095,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke(
}
}
-TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
+TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
HandleApplyBucketDiffReplyInvoker invoker;
for (int i = 0; i < 2; ++i) {
@@ -1066,7 +1122,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
}
}
-TEST_F(MergeHandlerTest, remove_from_diff) {
+TEST_P(MergeHandlerTest, remove_from_diff) {
framework::defaultimplementation::FakeClock clock;
MergeStatus status(clock, 0, 0);
@@ -1132,7 +1188,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) {
}
}
-TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
+TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) {
setUpChain(BACK);
document::TestDocMan docMan;
@@ -1156,8 +1212,15 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
- auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
- ASSERT_TRUE(applyBucketDiffReply.get());
+ if (GetParam()) {
+ ASSERT_FALSE(tracker);
+ handler.drain_async_writes();
+ auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply());
+ ASSERT_TRUE(applyBucketDiffReply.get());
+ } else {
+ auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
+ ASSERT_TRUE(applyBucketDiffReply.get());
+ }
auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
@@ -1246,7 +1309,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en
}
-TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
+TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
using NodeList = decltype(_nodes);
// Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2
@@ -1382,4 +1445,6 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
LOG(debug, "got mergebucket reply");
}
+VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true));
+
} // storage
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
index 1fbe155b16d..2dc5989e857 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
@@ -20,7 +20,7 @@ ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<Apply
ApplyBucketDiffEntryComplete::~ApplyBucketDiffEntryComplete() = default;
void
-ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result)
+ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result) noexcept
{
if (_result_handler != nullptr) {
_result_handler->handle(*result);
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
index dd2346d9dee..1037318aec6 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
@@ -27,7 +27,7 @@ class ApplyBucketDiffEntryComplete : public spi::OperationComplete
public:
ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric);
~ApplyBucketDiffEntryComplete();
- void onComplete(std::unique_ptr<spi::Result> result) override;
+ void onComplete(std::unique_ptr<spi::Result> result) noexcept override;
void addResultHandler(const spi::ResultHandler* resultHandler) override;
};
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
index eb7a5ef5bc6..ad153c41aef 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
@@ -2,21 +2,27 @@
#include "apply_bucket_diff_state.h"
#include "mergehandler.h"
+#include "persistenceutil.h"
#include <vespa/document/base/documentid.h>
#include <vespa/persistence/spi/result.h>
#include <vespa/vespalib/stllike/asciistream.h>
using storage::spi::Result;
+using vespalib::RetainGuard;
namespace storage {
-ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket)
+ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
: _merge_bucket_info_syncer(merge_bucket_info_syncer),
_bucket(bucket),
_fail_message(),
_failed_flag(),
_stale_bucket_info(false),
- _promise()
+ _promise(),
+ _tracker(),
+ _delayed_reply(),
+ _sender(nullptr),
+ _retain_guard(std::move(retain_guard))
{
}
@@ -32,6 +38,17 @@ ApplyBucketDiffState::~ApplyBucketDiffState()
if (_promise.has_value()) {
_promise.value().set_value(_fail_message);
}
+ if (_delayed_reply) {
+ if (!_delayed_reply->getResult().failed() && !_fail_message.empty()) {
+ _delayed_reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, _fail_message));
+ }
+ if (_sender) {
+ _sender->sendReply(std::move(_delayed_reply));
+ } else {
+ // _tracker->_reply and _delayed_reply points to the same reply.
+ _tracker->sendReply();
+ }
+ }
}
void
@@ -69,4 +86,19 @@ ApplyBucketDiffState::get_future()
return _promise.value().get_future();
}
+void
+ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply)
+{
+ _tracker = std::move(tracker);
+ _delayed_reply = std::move(delayed_reply);
+}
+
+void
+ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply)
+{
+ _tracker = std::move(tracker);
+ _sender = &sender;
+ _delayed_reply = std::move(delayed_reply);
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
index af4174b06d6..39f60156e66 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
@@ -3,16 +3,20 @@
#pragma once
#include <vespa/persistence/spi/bucket.h>
+#include <vespa/vespalib/util/retain_guard.h>
#include <future>
#include <memory>
#include <vector>
namespace document { class DocumentId; }
+namespace storage::api { class StorageReply; }
namespace storage::spi { class Result; }
namespace storage {
class ApplyBucketDiffEntryResult;
+class MessageSender;
+class MessageTracker;
class MergeBucketInfoSyncer;
/*
@@ -26,9 +30,13 @@ class ApplyBucketDiffState {
std::atomic_flag _failed_flag;
bool _stale_bucket_info;
std::optional<std::promise<vespalib::string>> _promise;
+ std::unique_ptr<MessageTracker> _tracker;
+ std::shared_ptr<api::StorageReply> _delayed_reply;
+ MessageSender* _sender;
+ vespalib::RetainGuard _retain_guard;
public:
- ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket);
+ ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
~ApplyBucketDiffState();
void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op);
void wait();
@@ -36,6 +44,8 @@ public:
void mark_stale_bucket_info();
void sync_bucket_info();
std::future<vespalib::string> get_future();
+ void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply);
+ void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply);
};
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index d150f5600e5..47b5e4f5f27 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -78,7 +78,7 @@ public:
_executorId(executor.getExecutorId(bucketId.getId()))
{
}
- void onComplete(spi::Result::UP result) override {
+ void onComplete(spi::Result::UP result) noexcept override {
_task->setResult(std::move(result));
_executor.executeTask(_executorId, std::move(_task));
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 2ffb827accf..c32efb6aa66 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -209,6 +209,11 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
*_filestorHandler, i % numStripes, _component));
}
_bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this));
+ } else {
+ std::lock_guard guard(_lock);
+ for (auto& handler : _persistenceHandlers) {
+ handler->configure(*config);
+ }
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
index febac5b87e5..1d9e0c6fae6 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp
@@ -14,6 +14,7 @@ MergeStatus::MergeStatus(const framework::Clock& clock,
uint32_t traceLevel)
: reply(), full_node_list(), nodeList(), maxTimestamp(0), diff(), pendingId(0),
pendingGetDiff(), pendingApplyDiff(), timeout(0), startTime(clock),
+ delayed_error(),
context(priority, traceLevel)
{}
@@ -122,4 +123,23 @@ MergeStatus::print(std::ostream& out, bool verbose,
}
}
+void
+MergeStatus::set_delayed_error(std::future<vespalib::string>&& delayed_error_in)
+{
+ delayed_error = std::move(delayed_error_in);
+}
+
+void
+MergeStatus::check_delayed_error(api::ReturnCode &return_code)
+{
+ if (!return_code.failed() && delayed_error.has_value()) {
+ // Wait for pending writes to local node to complete and check error
+ auto& future_error = delayed_error.value();
+ future_error.wait();
+ vespalib::string fail_message = future_error.get();
+ delayed_error.reset();
+ return_code = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, std::move(fail_message));
+ }
+}
+
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
index b28ca4e373a..05ffd1336a2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
+++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h
@@ -9,7 +9,9 @@
#include <vector>
#include <deque>
+#include <future>
#include <memory>
+#include <optional>
namespace storage {
@@ -25,6 +27,7 @@ public:
std::shared_ptr<api::ApplyBucketDiffReply> pendingApplyDiff;
vespalib::duration timeout;
framework::MilliSecTimer startTime;
+ std::optional<std::future<vespalib::string>> delayed_error;
spi::Context context;
MergeStatus(const framework::Clock&, api::StorageMessage::Priority, uint32_t traceLevel);
@@ -40,6 +43,8 @@ public:
bool removeFromDiff(const std::vector<api::ApplyBucketDiffCommand::Entry>& part, uint16_t hasMask, const std::vector<api::MergeBucketCommand::Node> &nodes);
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
bool isFirstNode() const { return static_cast<bool>(reply); }
+ void set_delayed_error(std::future<vespalib::string>&& delayed_error_in);
+ void check_delayed_error(api::ReturnCode &return_code);
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 17a16487ac4..77e7762ec9a 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -17,6 +17,9 @@
#include <vespa/log/log.h>
LOG_SETUP(".persistence.mergehandler");
+using vespalib::MonitoredRefCount;
+using vespalib::RetainGuard;
+
namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
@@ -28,12 +31,19 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_cluster_context(cluster_context),
_env(env),
_spi(spi),
+ _monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
_async_apply_bucket_diff(async_apply_bucket_diff)
{
}
+MergeHandler::~MergeHandler()
+{
+ drain_async_writes();
+}
+
+
namespace {
constexpr int getDeleteFlag() {
@@ -674,7 +684,8 @@ namespace {
api::StorageReply::SP
MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
- MessageSender& sender, spi::Context& context) const
+ MessageSender& sender, spi::Context& context,
+ std::shared_ptr<ApplyBucketDiffState>& async_results) const
{
// If last action failed, fail the whole merge
if (status.reply->getResult().failed()) {
@@ -806,6 +817,10 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
}
cmd->setPriority(status.context.getPriority());
cmd->setTimeout(status.timeout);
+ if (async_results) {
+ // Check currently pending writes to local node before sending new command.
+ check_apply_diff_sync(std::move(async_results));
+ }
if (applyDiffNeedLocalData(cmd->getDiff(), 0, true)) {
framework::MilliSecTimer startTime(_clock);
fetchLocalData(bucket, cmd->getDiff(), 0, context);
@@ -1171,7 +1186,8 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
reply.getDiff().begin(),
reply.getDiff().end());
- replyToSend = processBucketMerge(bucket, *s, sender, s->context);
+ std::shared_ptr<ApplyBucketDiffState> async_results;
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results);
if (!replyToSend.get()) {
// We have sent something on, and shouldn't reply now.
@@ -1211,7 +1227,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
tracker->setMetric(_env._metrics.applyBucketDiff);
spi::Bucket bucket(cmd.getBucket());
- auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket);
+ std::shared_ptr<ApplyBucketDiffState> async_results;
LOG(debug, "%s", cmd.toString().c_str());
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
@@ -1233,10 +1249,13 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
framework::MilliSecTimer startTime(_clock);
+ async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results);
+ if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
+ check_apply_diff_sync(std::move(async_results));
+ }
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(
startTime.getElapsedTimeAsDouble());
- check_apply_diff_sync(std::move(async_results));
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).",
bucket.toString().c_str(), _env._nodeIndex, index);
@@ -1260,10 +1279,14 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
}
- tracker->setReply(std::make_shared<api::ApplyBucketDiffReply>(cmd));
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd);
+ tracker->setReply(reply);
static_cast<api::ApplyBucketDiffReply&>(tracker->getReply()).getDiff().swap(cmd.getDiff());
LOG(spam, "Replying to ApplyBucketDiff for %s to node %d.",
bucket.toString().c_str(), cmd.getNodes()[index - 1].index);
+ if (async_results) {
+ async_results->set_delayed_reply(std::move(tracker), std::move(reply));
+ }
} else {
// When not the last node in merge chain, we must save reply, and
// send command on.
@@ -1280,6 +1303,10 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
cmd2->setPriority(cmd.getPriority());
cmd2->setTimeout(cmd.getTimeout());
s->pendingId = cmd2->getMsgId();
+ if (async_results) {
+ // Reply handler should check for delayed error.
+ s->set_delayed_error(async_results->get_future());
+ }
_env._fileStorHandler.sendCommand(cmd2);
// Everything went fine. Don't delete state but wait for reply
stateGuard.deactivate();
@@ -1290,12 +1317,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
void
-MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const
+MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, MessageSender& sender, MessageTracker::UP tracker) const
{
- (void) tracker;
_env._metrics.applyBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
- auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket);
+ std::shared_ptr<ApplyBucketDiffState> async_results;
std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff());
LOG(debug, "%s", reply.toString().c_str());
@@ -1316,6 +1342,8 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
api::StorageReply::SP replyToSend;
// Process apply bucket diff locally
api::ReturnCode returnCode = reply.getResult();
+ // Check for delayed error from handleApplyBucketDiff
+ s->check_delayed_error(returnCode);
try {
if (reply.getResult().failed()) {
LOG(debug, "Got failed apply bucket diff reply %s", reply.toString().c_str());
@@ -1329,9 +1357,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
}
if (applyDiffHasLocallyNeededData(diff, index)) {
framework::MilliSecTimer startTime(_clock);
+ async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, diff, index, s->context, async_results);
+ if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
+ check_apply_diff_sync(std::move(async_results));
+ }
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble());
- check_apply_diff_sync(std::move(async_results));
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)",
bucket.toString().c_str(),
@@ -1370,7 +1401,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
// Should reply now, since we failed.
replyToSend = s->reply;
} else {
- replyToSend = processBucketMerge(bucket, *s, sender, s->context);
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results);
if (!replyToSend.get()) {
// We have sent something on and shouldn't reply now.
@@ -1392,6 +1423,10 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
throw;
}
+ if (async_results && replyToSend) {
+ replyToSend->setResult(returnCode);
+ async_results->set_delayed_reply(std::move(tracker), sender, std::move(replyToSend));
+ }
if (clearState) {
_env._fileStorHandler.clearMergeStatus(bucket.getBucket());
}
@@ -1402,4 +1437,19 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
}
}
+void
+MergeHandler::drain_async_writes()
+{
+ if (_monitored_ref_count) {
+ // Wait for related ApplyBucketDiffState objects to be destroyed
+ _monitored_ref_count->waitForZeroRefCount();
+ }
+}
+
+void
+MergeHandler::configure(bool async_apply_bucket_diff) noexcept
+{
+ _async_apply_bucket_diff.store(async_apply_bucket_diff, std::memory_order_release);
+}
+
} // storage
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index f6e8ddcf306..17cfb847d2c 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -20,6 +20,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storage/common/cluster_context.h>
#include <vespa/storage/common/messagesender.h>
+#include <vespa/vespalib/util/monitored_refcount.h>
namespace storage {
@@ -48,6 +49,8 @@ public:
uint32_t commonMergeChainOptimalizationMinimumSize = 64,
bool async_apply_bucket_diff = false);
+ ~MergeHandler();
+
bool buildBucketInfoList(
const spi::Bucket& bucket,
Timestamp maxTimestamp,
@@ -70,21 +73,25 @@ public:
void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const;
MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const;
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
+ void drain_async_writes();
+ void configure(bool async_apply_bucket_diff) noexcept;
private:
const framework::Clock &_clock;
const ClusterContext &_cluster_context;
PersistenceUtil &_env;
spi::PersistenceProvider &_spi;
+ std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
- const bool _async_apply_bucket_diff;
+ std::atomic<bool> _async_apply_bucket_diff;
/** Returns a reply if merge is complete */
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
MergeStatus& status,
MessageSender& sender,
- spi::Context& context) const;
+ spi::Context& context,
+ std::shared_ptr<ApplyBucketDiffState>& async_results) const;
/**
* Invoke either put, remove or unrevertable remove on the SPI
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 1ef883fc810..aa1a9c136fd 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -150,4 +150,10 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co
}
}
+void
+PersistenceHandler::configure(vespa::config::content::StorFilestorConfig& config) noexcept
+{
+ _mergeHandler.configure(config.asyncApplyBucketDiff);
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index a92c2dc78ca..c60fb05e56e 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -35,6 +35,7 @@ public:
const AsyncHandler & asyncHandler() const { return _asyncHandler; }
const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; }
const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; }
+ void configure(vespa::config::content::StorFilestorConfig& config) noexcept;
private:
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index a3e3c512dcd..e5667c7b392 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -8,7 +8,6 @@ import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.container.core.HandlerMetricContextUtil;
import com.yahoo.container.core.documentapi.VespaDocumentAccess;
-import com.yahoo.container.handler.threadpool.ContainerThreadPool;
import com.yahoo.container.jdisc.ContentChannelOutputStream;
import com.yahoo.container.jdisc.MaxPendingContentChannelOutputStream;
import com.yahoo.document.Document;
@@ -184,7 +183,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@Inject
- public DocumentV1ApiHandler(ContainerThreadPool threadPool,
+ public DocumentV1ApiHandler(Executor defaultExecutor,
Metric metric,
MetricReceiver metricReceiver,
VespaDocumentAccess documentAccess,
@@ -193,7 +192,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AllClustersBucketSpacesConfig bucketSpacesConfig,
DocumentOperationExecutorConfig executorConfig) {
this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess,
- documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, threadPool.executor());
+ documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig, defaultExecutor);
}
DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access,