summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--client/pom.xml13
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java17
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java4
-rw-r--r--http-utils/pom.xml12
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java20
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/process/TestProcessFactory.java6
-rw-r--r--parent/pom.xml1
-rw-r--r--security-utils/pom.xml12
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt1
-rw-r--r--storage/src/tests/distributor/blockingoperationstartertest.cpp3
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test.cpp17
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.cpp9
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.h2
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp62
-rw-r--r--storage/src/tests/distributor/mock_tickable_stripe.h4
-rw-r--r--storage/src/tests/distributor/node_supported_features_repo_test.cpp52
-rw-r--r--storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp53
-rw-r--r--storage/src/tests/distributor/top_level_distributor_test_util.cpp6
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp58
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h7
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def7
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/distributor_operation_context.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp14
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h4
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_component.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_component.h10
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_interface.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h2
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h2
-rw-r--r--storage/src/vespa/storage/distributor/node_supported_features.h19
-rw-r--r--storage/src/vespa/storage/distributor/node_supported_features_repo.cpp37
-rw-r--r--storage/src/vespa/storage/distributor/node_supported_features_repo.h37
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp35
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h1
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp35
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.h13
-rw-r--r--storage/src/vespa/storage/distributor/stripe_access_guard.h4
-rw-r--r--storage/src/vespa/storage/distributor/tickable_stripe.h4
-rw-r--r--storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h2
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp90
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h36
-rw-r--r--storageapi/src/tests/mbusprot/storageprotocoltest.cpp12
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto7
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp11
-rw-r--r--storageapi/src/vespa/storageapi/message/bucket.cpp6
-rw-r--r--storageapi/src/vespa/storageapi/message/bucket.h27
-rw-r--r--vespa-feed-client-cli/pom.xml11
-rw-r--r--vespa-feed-client/pom.xml11
-rw-r--r--vespa-hadoop/pom.xml7
-rw-r--r--vespa-http-client/pom.xml11
54 files changed, 642 insertions, 201 deletions
diff --git a/client/pom.xml b/client/pom.xml
index ba153aed8f8..4abcdf9ac6c 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -16,12 +16,6 @@
<packaging>jar</packaging>
<version>7-SNAPSHOT</version>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <!-- TODO: Remove when we no longer support JDK 8 clients -->
- <maven.compiler.release>8</maven.compiler.release>
- </properties>
-
<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
@@ -64,6 +58,13 @@
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <release>${vespaClients.jdk.releaseVersion}</release>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<finalName>${project.artifactId}-jar-with-dependencies</finalName>
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
index 8f9e5edc337..8b9d1f34154 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java
@@ -4,23 +4,20 @@ package com.yahoo.vespa.config.proxy.filedistribution;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.FileReference;
import com.yahoo.jrt.DoubleArray;
-import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.Method;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.StringArray;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
-import java.util.logging.Level;
import com.yahoo.vespa.filedistribution.FileDownloader;
-import com.yahoo.vespa.filedistribution.FileReferenceDownload;
import java.io.File;
-import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -67,10 +64,6 @@ class FileDistributionRpcServer {
.methodDesc("download status for file references")
.returnDesc(0, "file references", "array of file references")
.returnDesc(1, "download status", "percentage downloaded of each file reference in above array"));
- supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", this::setFileReferencesToDownload)
- .methodDesc("set which file references to download")
- .paramDesc(0, "file references", "file reference to download")
- .returnDesc(0, "ret", "0 if success, 1 otherwise"));
}
@@ -105,14 +98,6 @@ class FileDistributionRpcServer {
req.returnValues().add(new DoubleArray(downloadStatusArray));
}
- private void setFileReferencesToDownload(Request req) {
- log.log(Level.FINE, () -> "Received method call '" + req.methodName() + "' with parameters : " + req.parameters());
- Arrays.stream(req.parameters().get(0).asStringArray())
- .map(FileReference::new)
- .forEach(fileReference -> downloader.downloadIfNeeded(new FileReferenceDownload(fileReference)));
- req.returnValues().add(new Int32Value(0));
- }
-
private void downloadFile(Request req) {
FileReference fileReference = new FileReference(req.parameters().get(0).asString());
log.log(Level.FINE, () -> "getFile() called for file reference '" + fileReference.value() + "'");
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java
index a49e2ec76bb..605f5924e68 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionImpl.java
@@ -41,8 +41,8 @@ public class FileDistributionImpl implements FileDistribution, RequestWaiter {
return fileReferencesDir;
}
- // Notifies config proxy which file references it should start downloading. It's OK if the call does not succeed,
- // as downloading will then start synchronously when a service requests a file reference instead
+ // Notifies client which file references it should start downloading. It's OK if the call does not succeed,
+ // as this is just a hint to the client to start downloading. Currently the only client is the config server
private void startDownloadingFileReferences(String hostName, int port, Set<FileReference> fileReferences) {
Target target = supervisor.connect(new Spec(hostName, port));
Request request = new Request("filedistribution.setFileReferencesToDownload");
diff --git a/http-utils/pom.xml b/http-utils/pom.xml
index e387e2c59e1..be62b7adb35 100644
--- a/http-utils/pom.xml
+++ b/http-utils/pom.xml
@@ -12,12 +12,6 @@
<packaging>jar</packaging>
<version>7-SNAPSHOT</version>
- <properties>
- <!-- vespa-http-client targets jdk8 and uses this library -->
- <!-- TODO remove once vespa-http-client no longer builds against jdk8 -->
- <maven.compiler.release>8</maven.compiler.release>
- </properties>
-
<dependencies>
<!-- provided -->
<dependency>
@@ -73,11 +67,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <jdkToolchain>
- <version>${java.version}</version>
- </jdkToolchain>
- <source>${java.version}</source>
- <target>${java.version}</target>
+ <release>${vespaClients.jdk.releaseVersion}</release>
<showDeprecation>true</showDeprecation>
<compilerArgs>
<arg>-Xlint:all</arg>
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
index 92aacf8827b..f184deab375 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
@@ -438,19 +438,19 @@ public class NodeAgentImpl implements NodeAgent {
void doConverge(NodeAgentContext context) {
NodeSpec node = context.node();
Optional<Container> container = getContainer(context);
- if (!node.equals(lastNode)) {
- logChangesToNodeSpec(context, lastNode, node);
- // Current reboot generation uninitialized or incremented from outside to cancel reboot
- if (currentRebootGeneration < node.currentRebootGeneration())
- currentRebootGeneration = node.currentRebootGeneration();
+ // Current reboot generation uninitialized or incremented from outside to cancel reboot
+ if (currentRebootGeneration < node.currentRebootGeneration())
+ currentRebootGeneration = node.currentRebootGeneration();
- // Either we have changed allocation status (restart gen. only available to allocated nodes), or
- // restart generation has been incremented from outside to cancel restart
- if (currentRestartGeneration.isPresent() != node.currentRestartGeneration().isPresent() ||
- currentRestartGeneration.map(current -> current < node.currentRestartGeneration().get()).orElse(false))
- currentRestartGeneration = node.currentRestartGeneration();
+ // Either we have changed allocation status (restart gen. only available to allocated nodes), or
+ // restart generation has been incremented from outside to cancel restart
+ if (currentRestartGeneration.isPresent() != node.currentRestartGeneration().isPresent() ||
+ currentRestartGeneration.map(current -> current < node.currentRestartGeneration().get()).orElse(false))
+ currentRestartGeneration = node.currentRestartGeneration();
+ if (!node.equals(lastNode)) {
+ logChangesToNodeSpec(context, lastNode, node);
lastNode = node;
}
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/process/TestProcessFactory.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/process/TestProcessFactory.java
index 6bd9deb76ed..063dc7f1324 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/process/TestProcessFactory.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/process/TestProcessFactory.java
@@ -93,8 +93,10 @@ public class TestProcessFactory implements ProcessFactory {
String actualCommandLineString = commandLine.toString();
if (!Objects.equals(actualCommandLineString, expectedCommandLineString)) {
muteVerifyAllCommandsExecuted = true;
- throw new IllegalArgumentException("Expected command #" + commandSequenceNumber + " to be '" +
- expectedCommandLineString + "' but got '" + actualCommandLineString + "'");
+ throw new IllegalArgumentException("Expected command #" + commandSequenceNumber + " to be: \n" +
+ " \"" + expectedCommandLineString + "\"\n" +
+ "but got:\n" +
+ " \"" + actualCommandLineString + "\"");
}
return toReturn;
diff --git a/parent/pom.xml b/parent/pom.xml
index aea586f3949..c0ba936d5e0 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -922,6 +922,7 @@
<doclint>all</doclint>
<test.hide>true</test.hide>
+ <vespaClients.jdk.releaseVersion>8</vespaClients.jdk.releaseVersion>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
diff --git a/security-utils/pom.xml b/security-utils/pom.xml
index 4e33e31c8c4..b7c7c110ad8 100644
--- a/security-utils/pom.xml
+++ b/security-utils/pom.xml
@@ -12,12 +12,6 @@
<packaging>bundle</packaging>
<version>7-SNAPSHOT</version>
- <properties>
- <!-- vespa-http-client targets jdk8 and uses this library -->
- <!-- TODO remove once vespa-http-client no longer builds against jdk8 -->
- <maven.compiler.release>8</maven.compiler.release>
- </properties>
-
<dependencies>
<!-- provided -->
<dependency>
@@ -73,11 +67,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <jdkToolchain>
- <version>${java.version}</version>
- </jdkToolchain>
- <source>${java.version}</source>
- <target>${java.version}</target>
+ <release>${vespaClients.jdk.releaseVersion}</release>
<showDeprecation>true</showDeprecation>
<compilerArgs>
<arg>-Xlint:all</arg>
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 7348cfc328b..bee7650aebd 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -25,6 +25,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
mergelimitertest.cpp
mergeoperationtest.cpp
multi_thread_stripe_access_guard_test.cpp
+ node_supported_features_repo_test.cpp
nodeinfotest.cpp
nodemaintenancestatstrackertest.cpp
operation_sequencer_test.cpp
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp
index 15aada37c9b..7c97c962a97 100644
--- a/storage/src/tests/distributor/blockingoperationstartertest.cpp
+++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp
@@ -100,6 +100,9 @@ struct FakeDistributorStripeOperationContext : public DistributorStripeOperation
const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const override {
abort();
}
+ const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override {
+ abort();
+ }
};
struct BlockingOperationStarterTest : Test {
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp
index 902dd6454f1..8c2ebc983fa 100644
--- a/storage/src/tests/distributor/distributor_stripe_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test.cpp
@@ -185,6 +185,12 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
configure_stripe(builder);
}
+ void configure_use_unordered_merge_chaining(bool use_unordered) {
+ ConfigBuilder builder;
+ builder.useUnorderedMergeChaining = use_unordered;
+ configure_stripe(builder);
+ }
+
bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
return _stripe->_scheduler->implicitly_clear_priority_on_schedule();
}
@@ -982,4 +988,15 @@ TEST_F(DistributorStripeTest, closing_aborts_gets_started_outside_stripe_thread)
EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult());
}
+TEST_F(DistributorStripeTest, use_unordered_merge_chaining_config_is_propagated_to_internal_config)
+{
+ setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_use_unordered_merge_chaining(true);
+ EXPECT_TRUE(getConfig().use_unordered_merge_chaining());
+
+ configure_use_unordered_merge_chaining(false);
+ EXPECT_FALSE(getConfig().use_unordered_merge_chaining());
+}
+
}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
index c5c51e64e68..b96b2dda1cb 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
@@ -9,8 +9,10 @@
#include <vespa/storage/distributor/distributor_stripe_component.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/ideal_state_total_metrics.h>
+#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
using document::test::makeBucketSpace;
using document::test::makeDocumentBucket;
@@ -526,6 +528,13 @@ DistributorStripeTestUtil::db_memory_sample_interval() const noexcept {
return _stripe->db_memory_sample_interval();
}
+void
+DistributorStripeTestUtil::set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features) {
+ vespalib::hash_map<uint16_t, NodeSupportedFeatures> new_features;
+ new_features[node] = features;
+ _stripe->update_node_supported_features_repo(_stripe->node_supported_features_repo().make_union_of(new_features));
+}
+
const lib::Distribution&
DistributorStripeTestUtil::getDistribution() const {
return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution();
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index b1e90821e3b..3226c16aba3 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -26,6 +26,7 @@ class DocumentSelectionParser;
class ExternalOperationHandler;
class IdealStateManager;
class IdealStateMetricSet;
+class NodeSupportedFeatures;
class Operation;
class StripeBucketDBUpdater;
@@ -150,6 +151,7 @@ public:
[[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept;
[[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept;
[[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
+ void set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features);
const lib::Distribution& getDistribution() const;
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 65ee5254193..54bd06c98e0 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -1,5 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/dummystoragelink.h>
+
#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/test/make_document_bucket.h>
@@ -12,6 +12,7 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <charconv>
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
@@ -37,6 +38,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil {
}
std::shared_ptr<MergeOperation> setup_minimal_merge_op();
+ std::shared_ptr<MergeOperation> setup_simple_merge_op(const std::vector<uint16_t>& nodes);
std::shared_ptr<MergeOperation> setup_simple_merge_op();
void assert_simple_merge_bucket_command();
void assert_simple_delete_bucket_command();
@@ -47,13 +49,13 @@ std::shared_ptr<MergeOperation>
MergeOperationTest::setup_minimal_merge_op()
{
document::BucketId bucket_id(16, 1);
- auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2)));
+ auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), {0, 1, 2}));
op->setIdealStateManager(&getIdealStateManager());
return op;
}
std::shared_ptr<MergeOperation>
-MergeOperationTest::setup_simple_merge_op()
+MergeOperationTest::setup_simple_merge_op(const std::vector<uint16_t>& nodes)
{
getClock().setAbsoluteTimeInSeconds(10);
@@ -64,12 +66,18 @@ MergeOperationTest::setup_simple_merge_op()
enable_cluster_state("distributor:1 storage:3");
- auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
+ auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), nodes));
op->setIdealStateManager(&getIdealStateManager());
op->start(_sender, framework::MilliSecTime(0));
return op;
}
+std::shared_ptr<MergeOperation>
+MergeOperationTest::setup_simple_merge_op()
+{
+ return setup_simple_merge_op({0, 1, 2});
+}
+
void
MergeOperationTest::assert_simple_merge_bucket_command()
{
@@ -150,8 +158,10 @@ std::string getNodeList(std::string state, uint32_t redundancy, std::string exis
num.erase(pos);
trusted = true;
}
- bucketDB[i] = BucketCopy(0, atoi(num.c_str()),
- api::BucketInfo(1, 2, 3));
+ uint16_t node;
+ [[maybe_unused]] auto [ptr, ec] = std::from_chars(num.data(), num.data() + num.size(), node);
+ assert(ec == std::errc{});
+ bucketDB[i] = BucketCopy(0, node, api::BucketInfo(1, 2, 3));
bucketDB[i].setTrusted(trusted);
}
std::vector<MergeMetaData> nodes(st.size());
@@ -553,4 +563,44 @@ TEST_F(MergeOperationTest, on_throttled_updates_metrics)
EXPECT_EQ(1, metrics->throttled.getValue());
}
+TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all_nodes_support_feature) {
+ setup_stripe(Redundancy(4), NodeCount(4), "distributor:1 storage:4");
+ NodeSupportedFeatures with_unordered;
+ with_unordered.unordered_merge_chaining = true;
+
+ set_node_supported_features(1, with_unordered);
+ set_node_supported_features(2, with_unordered);
+
+ auto config = make_config();
+ config->set_use_unordered_merge_chaining(true);
+ configure_stripe(std::move(config));
+
+ // Only nodes {1, 2} support unordered merging; merges should be ordered (sent to lowest index node 1).
+ setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
+ "cluster state version: 0, nodes: [2, 1, 3], chain: [], "
+ "reasons to start: ) => 1",
+ _sender.getLastCommand(true));
+
+ // All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2)
+ setup_simple_merge_op({1, 2});
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, "
+ "cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), "
+ "reasons to start: ) => 2",
+ _sender.getLastCommand(true));
+
+ _sender.clear();
+
+ config = make_config();
+ config->set_use_unordered_merge_chaining(false);
+ configure_stripe(std::move(config));
+
+ // If config is not enabled, should send ordered even if nodes support the feature.
+ setup_simple_merge_op({2, 1});
+ ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, "
+ "cluster state version: 0, nodes: [2, 1], chain: [], "
+ "reasons to start: ) => 1",
+ _sender.getLastCommand(true));
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/mock_tickable_stripe.h b/storage/src/tests/distributor/mock_tickable_stripe.h
index 38fc0c599a2..ec2f978c029 100644
--- a/storage/src/tests/distributor/mock_tickable_stripe.h
+++ b/storage/src/tests/distributor/mock_tickable_stripe.h
@@ -33,6 +33,10 @@ struct MockTickableStripe : TickableStripe {
void update_read_snapshot_after_activation(const lib::ClusterStateBundle&) override { abort(); }
void clear_read_only_bucket_repo_databases() override { abort(); }
+ void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo>) override {
+ abort();
+ }
+
void report_bucket_db_status(document::BucketSpace, std::ostream&) const override { abort(); }
StripeAccessGuard::PendingOperationStats pending_operation_stats() const override { abort(); }
void report_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
diff --git a/storage/src/tests/distributor/node_supported_features_repo_test.cpp b/storage/src/tests/distributor/node_supported_features_repo_test.cpp
new file mode 100644
index 00000000000..990e0fc50a3
--- /dev/null
+++ b/storage/src/tests/distributor/node_supported_features_repo_test.cpp
@@ -0,0 +1,52 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/storage/distributor/node_supported_features_repo.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace ::testing;
+
+namespace storage::distributor {
+
+struct NodeSupportedFeaturesRepoTest : Test {
+ using FeatureMap = vespalib::hash_map<uint16_t, NodeSupportedFeatures>;
+ NodeSupportedFeaturesRepo _repo;
+
+ static NodeSupportedFeatures set_features() noexcept {
+ NodeSupportedFeatures f;
+ f.unordered_merge_chaining = true;
+ return f;
+ }
+
+ static NodeSupportedFeatures unset_features() noexcept {
+ return {};
+ }
+};
+
+TEST_F(NodeSupportedFeaturesRepoTest, feature_set_is_empty_by_default) {
+ EXPECT_EQ(_repo.node_supported_features(0), unset_features());
+ EXPECT_EQ(_repo.node_supported_features(12345), unset_features());
+}
+
+TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_can_add_new_feature_mapping) {
+ FeatureMap fm;
+ fm[1] = set_features();
+ fm[60] = set_features();
+ auto new_repo = _repo.make_union_of(fm);
+ EXPECT_EQ(new_repo->node_supported_features(0), unset_features());
+ EXPECT_EQ(new_repo->node_supported_features(1), set_features());
+ EXPECT_EQ(new_repo->node_supported_features(60), set_features());
+}
+
+TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_updates_existing_feature_mappings) {
+ FeatureMap fm;
+ fm[1] = set_features();
+ fm[60] = set_features();
+ auto new_repo = _repo.make_union_of(fm);
+ fm[1] = unset_features();
+ new_repo = new_repo->make_union_of(fm);
+ EXPECT_EQ(new_repo->node_supported_features(1), unset_features());
+ EXPECT_EQ(new_repo->node_supported_features(60), set_features());
+}
+
+}
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index fe8a607c9ae..3ed5e9f4a8d 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/top_level_bucket_db_updater.h>
#include <vespa/storage/distributor/bucket_space_distribution_context.h>
#include <vespa/storage/distributor/distributormetricsset.h>
+#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/storage/distributor/pending_bucket_space_db_transition.h>
#include <vespa/storage/distributor/outdated_nodes_map.h>
#include <vespa/storage/storageutil/distributorstatecache.h>
@@ -119,6 +120,21 @@ public:
invalid_bucket_count));
}
+ void fake_bucket_reply(const lib::ClusterState &state,
+ const api::StorageCommand &cmd,
+ uint32_t bucket_count,
+ const std::function<void(api::RequestBucketInfoReply&)>& reply_decorator)
+ {
+ ASSERT_EQ(cmd.getType(), MessageType::REQUESTBUCKETINFO);
+ const api::StorageMessageAddress& address(*cmd.getAddress());
+ auto reply = make_fake_bucket_reply(state,
+ dynamic_cast<const RequestBucketInfoCommand &>(cmd),
+ address.getIndex(),
+ bucket_count, 0);
+ reply_decorator(*reply);
+ bucket_db_updater().onRequestBucketInfoReply(reply);
+ }
+
void send_fake_reply_for_single_bucket_request(
const api::RequestBucketInfoCommand& rbi)
{
@@ -232,7 +248,7 @@ public:
}
}
- api::StorageMessageAddress storage_address(uint16_t node) {
+ static api::StorageMessageAddress storage_address(uint16_t node) {
static vespalib::string _storage("storage");
return api::StorageMessageAddress(&_storage, lib::NodeType::STORAGE, node);
}
@@ -1299,7 +1315,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
for (uint32_t i = 0; i < 3; ++i) {
- nodes.push_back(api::MergeBucketCommand::Node(i));
+ nodes.emplace_back(i);
}
api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
@@ -2662,4 +2678,37 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabl
EXPECT_FALSE(def_rs.is_routable());
}
+TEST_F(BucketDBUpdaterSnapshotTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) {
+ lib::ClusterState state("distributor:1 storage:3");
+ set_cluster_state(state);
+ uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1;
+
+ // Known feature sets are initially empty.
+ auto stripes = distributor_stripes();
+ for (auto* s : stripes) {
+ for (uint16_t i : {0, 1, 2}) {
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining);
+ }
+ }
+
+ ASSERT_EQ(expected_msgs, _sender.commands().size());
+ for (uint32_t i = 0; i < _sender.commands().size(); i++) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i),
+ dummy_buckets_to_return, [i](auto& reply) noexcept {
+ // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported.
+ // Node 0 does not support the fanciness.
+ if (i > 0) {
+ reply.supported_node_features().unordered_merge_chaining = true;
+ }
+ }));
+ }
+
+ // Node features should be propagated to all stripes
+ for (auto* s : stripes) {
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining);
+ }
+}
+
}
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
index 636a09d1f6e..2a61141865a 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
@@ -115,13 +115,13 @@ TopLevelDistributorTestUtil::handle_top_level_message(const std::shared_ptr<api:
void
TopLevelDistributorTestUtil::close()
{
- _component.reset(0);
- if (_distributor.get()) {
+ _component.reset();
+ if (_distributor) {
_stripe_pool->stop_and_join(); // Must be tagged as stopped prior to onClose
_distributor->onClose();
}
_sender.clear();
- _node.reset(0);
+ _node.reset();
_config = getStandardConfig(false);
}
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index e8f8e425af4..0f844ab6b4f 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -52,15 +52,18 @@ struct MergeBuilder {
~MergeBuilder();
MergeBuilder& nodes(uint16_t n0) {
+ _nodes.clear();
_nodes.push_back(n0);
return *this;
}
MergeBuilder& nodes(uint16_t n0, uint16_t n1) {
+ _nodes.clear();
_nodes.push_back(n0);
_nodes.push_back(n1);
return *this;
}
MergeBuilder& nodes(uint16_t n0, uint16_t n1, uint16_t n2) {
+ _nodes.clear();
_nodes.push_back(n0);
_nodes.push_back(n1);
_nodes.push_back(n2);
@@ -146,7 +149,8 @@ struct MergeThrottlerTest : Test {
api::ReturnCode::Result expectedResultCode);
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
- void receive_chained_merge_with_full_queue(bool disable_queue_limits);
+ void fill_up_throttler_active_window_and_queue(uint16_t node_idx);
+ void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false);
std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
auto& queue = _throttlers[throttler_idx]->getMergeQueue();
@@ -1197,7 +1201,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
}
void
-MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits)
+MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd)
{
// Note: uses node with index 1 to not be the first node in chain
_throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
@@ -1218,10 +1222,15 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
// Send down another merge with non-empty chain. It should _not_ be busy bounced
// (if limits disabled) as it has already been accepted into another node's merge window.
{
- std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
+ std::vector<MergeBucketCommand::Node> nodes({{2}, {1}, {0}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1);
- cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0
+ if (!unordered_fwd) {
+ cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0
+ } else {
+ cmd->setChain(std::vector<uint16_t>({2})); // Forwarded from node 2, i.e. _not_ the lowest index
+ }
+ cmd->set_use_unordered_forwarding(unordered_fwd);
_topLinks[1]->sendDown(cmd);
}
}
@@ -1249,11 +1258,34 @@ TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disa
EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge
}
+TEST_F(MergeThrottlerTest, forwarded_unordered_merge_is_directly_accepted_into_active_window) {
+ // Unordered forwarding is orthogonal to disabled chain limits config, so we implicitly test that too.
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true, true));
+
+ // Unordered merge is immediately forwarded to the next node
+ _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ auto fwd = std::dynamic_pointer_cast<api::MergeBucketCommand>(
+ _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET));
+ ASSERT_TRUE(fwd);
+ EXPECT_TRUE(fwd->use_unordered_forwarding());
+ EXPECT_EQ(fwd->getChain(), std::vector<uint16_t>({2, 1}));
+}
+
+TEST_F(MergeThrottlerTest, non_forwarded_unordered_merge_is_enqueued_if_active_window_full)
+{
+ fill_throttler_queue_with_n_commands(1, 0); // Fill active window entirely
+ {
+ std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {0}});
+ auto cmd = std::make_shared<MergeBucketCommand>(
+ makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1);
+ cmd->set_use_unordered_forwarding(true);
+ _topLinks[1]->sendDown(cmd);
+ }
+ waitUntilMergeQueueIs(*_throttlers[1], 1, _messageWaitTime); // Should be in queue, not active window
+}
+
TEST_F(MergeThrottlerTest, broken_cycle) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(1);
- nodes.push_back(0);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({1, 0, 2});
{
std::vector<uint16_t> chain;
chain.push_back(0);
@@ -1268,10 +1300,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) {
// Send cycled merge which will be executed
{
- std::vector<uint16_t> chain;
- chain.push_back(0);
- chain.push_back(1);
- chain.push_back(2);
+ std::vector<uint16_t> chain({0, 1, 2});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xfeef00)), nodes, 1234, 1, chain);
_topLinks[1]->sendDown(cmd);
@@ -1425,9 +1454,10 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure)
void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) {
size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount();
for (size_t i = 0; i < max_pending + queued_count; ++i) {
- _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)).create());
+ _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i))
+ .nodes(throttler_index, throttler_index + 1)
+ .create());
}
-
// Wait till we have max_pending merge forwards and queued_count enqueued.
_topLinks[throttler_index]->waitForMessages(max_pending, _messageWaitTime);
waitUntilMergeQueueIs(*_throttlers[throttler_index], queued_count, _messageWaitTime);
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index a23d00ee6a3..8a40899165f 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -50,6 +50,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_prioritize_global_bucket_merges(true),
_enable_revert(true),
_implicitly_clear_priority_on_schedule(false),
+ _use_unordered_merge_chaining(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
@@ -171,6 +172,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_max_activation_inhibited_out_of_sync_groups = config.maxActivationInhibitedOutOfSyncGroups;
_enable_revert = config.enableRevert;
_implicitly_clear_priority_on_schedule = config.implicitlyClearBucketPriorityOnSchedule;
+ _use_unordered_merge_chaining = config.useUnorderedMergeChaining;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 7b4e082d1ed..ea1aca17116 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -267,6 +267,12 @@ public:
[[nodiscard]] bool implicitly_clear_priority_on_schedule() const noexcept {
return _implicitly_clear_priority_on_schedule;
}
+ void set_use_unordered_merge_chaining(bool unordered) noexcept {
+ _use_unordered_merge_chaining = unordered;
+ }
+ [[nodiscard]] bool use_unordered_merge_chaining() const noexcept {
+ return _use_unordered_merge_chaining;
+ }
uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; }
@@ -324,6 +330,7 @@ private:
bool _prioritize_global_bucket_merges;
bool _enable_revert;
bool _implicitly_clear_priority_on_schedule;
+ bool _use_unordered_merge_chaining;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 8a9fdf74802..8021075faa3 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -286,3 +286,10 @@ num_distributor_stripes int default=0 restart
## bucket due to being blocked by concurrent operations. This avoids potential head-of-line
## blocking of later buckets in the priority database.
implicitly_clear_bucket_priority_on_schedule bool default=false
+
+## Enables sending merges that are forwarded between content nodes in ideal state node key
+## order, instead of strictly increasing node key order (which is the default).
+## Even if this config is set to true, unordered merges will only be sent if _all_ nodes
+## involved in a given merge have previously reported (as part of bucket info fetching)
+## that they support the unordered merge feature.
+use_unordered_merge_chaining bool default=false
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 52171406ebf..470bfb69abb 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -32,6 +32,7 @@ vespa_add_library(storage_distributor
messagetracker.cpp
min_replica_provider.cpp
multi_threaded_stripe_access_guard.cpp
+ node_supported_features_repo.cpp
nodeinfo.cpp
operation_routing_snapshot.cpp
operation_sequencer.cpp
diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h
index 934c5e364d8..bceb4ed1377 100644
--- a/storage/src/vespa/storage/distributor/distributor_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h
@@ -17,7 +17,7 @@ class DistributorBucketSpaceRepo;
*/
class DistributorOperationContext {
public:
- virtual ~DistributorOperationContext() {}
+ virtual ~DistributorOperationContext() = default;
virtual api::Timestamp generate_unique_timestamp() = 0;
virtual const BucketSpaceStateMap& bucket_space_states() const noexcept = 0;
virtual BucketSpaceStateMap& bucket_space_states() noexcept = 0;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 9f565686216..50c70306d92 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -6,6 +6,7 @@
#include "distributor_stripe.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
+#include "node_supported_features_repo.h"
#include "operation_sequencer.h"
#include "ownership_transfer_safe_time_point_calculator.h"
#include "storage_node_up_states.h"
@@ -68,6 +69,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_recoveryTimeStarted(_component.getClock()),
_tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
_bucketIdHasher(std::make_unique<BucketGcTimeCalculator::BucketIdIdentityHasher>()),
+ _node_supported_features_repo(std::make_shared<const NodeSupportedFeaturesRepo>()),
_metricLock(),
_maintenanceStats(),
_bucketSpacesStats(),
@@ -872,6 +874,12 @@ DistributorStripe::clear_read_only_bucket_repo_databases()
}
void
+DistributorStripe::update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo)
+{
+ _node_supported_features_repo = std::move(features_repo);
+}
+
+void
DistributorStripe::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const
{
ideal_state_manager().dump_bucket_space_db_status(bucket_space, out);
@@ -889,4 +897,10 @@ DistributorStripe::report_delayed_single_bucket_requests(vespalib::xml::XmlOutpu
bucket_db_updater().report_delayed_single_bucket_requests(xos);
}
+const NodeSupportedFeaturesRepo&
+DistributorStripe::node_supported_features_repo() const noexcept
+{
+ return *_node_supported_features_repo;
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 5ba682d46e3..ce6a2071efd 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -160,6 +160,8 @@ public:
return *_bucketIdHasher;
}
+ const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override;
+
StripeBucketDBUpdater& bucket_db_updater() { return _bucketDBUpdater; }
const StripeBucketDBUpdater& bucket_db_updater() const { return _bucketDBUpdater; }
IdealStateManager& ideal_state_manager() { return _idealStateManager; }
@@ -283,6 +285,7 @@ private:
void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override;
void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override;
void clear_read_only_bucket_repo_databases() override;
+ void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo) override;
void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const override;
void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override;
void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override;
@@ -338,6 +341,7 @@ private:
framework::ThreadWaitInfo _tickResult;
BucketDBMetricUpdater _bucketDBMetricUpdater;
std::unique_ptr<BucketGcTimeCalculator::BucketIdHasher> _bucketIdHasher;
+ std::shared_ptr<const NodeSupportedFeaturesRepo> _node_supported_features_repo;
mutable std::mutex _metricLock;
/**
* Maintenance stats for last completed database scan iteration.
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
index f2d2afb8fee..aa0a2289727 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
@@ -277,6 +277,12 @@ DistributorStripeComponent::storage_node_is_up(document::BucketSpace bucket_spac
return ns.getState().oneOf(storage_node_up_states());
}
+const NodeSupportedFeaturesRepo&
+DistributorStripeComponent::node_supported_features_repo() const noexcept
+{
+ return _distributor.node_supported_features_repo();
+}
+
std::unique_ptr<document::select::Node>
DistributorStripeComponent::parse_selection(const vespalib::string& selection) const
{
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.h b/storage/src/vespa/storage/distributor/distributor_stripe_component.h
index b274e21ac7c..5bcf9eec76d 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_component.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.h
@@ -70,7 +70,7 @@ public:
*/
void update_bucket_database(const document::Bucket& bucket,
const BucketCopy& changed_node,
- uint32_t update_flags = 0) override {
+ uint32_t update_flags) override {
update_bucket_database(bucket,
toVector<BucketCopy>(changed_node),
update_flags);
@@ -79,9 +79,9 @@ public:
/**
* Adds the given copies to the bucket database.
*/
- virtual void update_bucket_database(const document::Bucket& bucket,
- const std::vector<BucketCopy>& changed_nodes,
- uint32_t update_flags = 0) override;
+ void update_bucket_database(const document::Bucket& bucket,
+ const std::vector<BucketCopy>& changed_nodes,
+ uint32_t update_flags) override;
/**
* Removes a copy from the given bucket from the bucket database.
@@ -165,6 +165,8 @@ public:
return getDistributor().getBucketIdHasher();
}
+ const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override;
+
// Implements DocumentSelectionParser
std::unique_ptr<document::select::Node> parse_selection(const vespalib::string& selection) const override;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
index 4f39dd7e5bc..dfed59499c6 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
@@ -16,6 +16,7 @@ namespace storage {
namespace storage::distributor {
class DistributorMetricSet;
+class NodeSupportedFeaturesRepo;
class PendingMessageTracker;
/**
@@ -61,6 +62,7 @@ public:
virtual const DistributorConfiguration& getConfig() const = 0;
virtual ChainedMessageSender& getMessageSender() = 0;
virtual const BucketGcTimeCalculator::BucketIdHasher& getBucketIdHasher() const = 0;
+ virtual const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept = 0;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
index 5919261ab43..d6f4e5694f6 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
@@ -16,6 +16,7 @@ namespace storage::lib { class ClusterStateBundle; }
namespace storage::distributor {
class PendingMessageTracker;
+class NodeSupportedFeaturesRepo;
/**
* Interface with functionality that is used when handling distributor stripe operations.
@@ -57,6 +58,7 @@ public:
virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0;
virtual bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const = 0;
virtual const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const = 0;
+ virtual const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept = 0;
};
}
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
index 1a44b79ac3a..b00e4ce3cba 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -132,6 +132,14 @@ void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() {
});
}
+void MultiThreadedStripeAccessGuard::update_node_supported_features_repo(
+ std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo)
+{
+ for_each_stripe([&](TickableStripe& stripe) {
+ stripe.update_node_supported_features_repo(features_repo);
+ });
+}
+
void MultiThreadedStripeAccessGuard::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const {
for_each_stripe([&](TickableStripe& stripe) {
stripe.report_bucket_db_status(bucket_space, out);
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
index 53799fa338b..c52a01fdded 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
@@ -54,6 +54,8 @@ public:
void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override;
void clear_read_only_bucket_repo_databases() override;
+ void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo) override;
+
void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const override;
PendingOperationStats pending_operation_stats() const override;
void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override;
diff --git a/storage/src/vespa/storage/distributor/node_supported_features.h b/storage/src/vespa/storage/distributor/node_supported_features.h
new file mode 100644
index 00000000000..fb9cc68e970
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/node_supported_features.h
@@ -0,0 +1,19 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace storage::distributor {
+
+/**
+ * Collection of distinct features supported by a particular content node.
+ *
+ * Communicated to a distributor via bucket info exchanges. All features
+ * are initially expected to be unsupported.
+ */
+struct NodeSupportedFeatures {
+ bool unordered_merge_chaining = false;
+
+ bool operator==(const NodeSupportedFeatures&) const noexcept = default;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/node_supported_features_repo.cpp b/storage/src/vespa/storage/distributor/node_supported_features_repo.cpp
new file mode 100644
index 00000000000..e125f360cec
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/node_supported_features_repo.cpp
@@ -0,0 +1,37 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "node_supported_features_repo.h"
+#include <vespa/vespalib/stllike/hash_map.hpp>
+
+namespace storage::distributor {
+
+NodeSupportedFeaturesRepo::NodeSupportedFeaturesRepo() = default;
+
+NodeSupportedFeaturesRepo::NodeSupportedFeaturesRepo(
+ vespalib::hash_map<uint16_t, NodeSupportedFeatures> features,
+ PrivateCtorTag)
+ : _node_features(std::move(features))
+{}
+
+NodeSupportedFeaturesRepo::~NodeSupportedFeaturesRepo() = default;
+
+const NodeSupportedFeatures&
+NodeSupportedFeaturesRepo::node_supported_features(uint16_t node_idx) const noexcept
+{
+ static const NodeSupportedFeatures default_features;
+ const auto iter = _node_features.find(node_idx);
+ return (iter != _node_features.end() ? iter->second : default_features);
+}
+
+std::shared_ptr<const NodeSupportedFeaturesRepo>
+NodeSupportedFeaturesRepo::make_union_of(const vespalib::hash_map<uint16_t, NodeSupportedFeatures>& node_features) const
+{
+ auto new_features = _node_features; // Must be by copy.
+ // We always let the _new_ features update any existing mapping.
+ for (const auto& nf : node_features) {
+ new_features[nf.first] = nf.second;
+ }
+ return std::make_shared<NodeSupportedFeaturesRepo>(std::move(new_features), PrivateCtorTag{});
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/node_supported_features_repo.h b/storage/src/vespa/storage/distributor/node_supported_features_repo.h
new file mode 100644
index 00000000000..cc40c27b8e2
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/node_supported_features_repo.h
@@ -0,0 +1,37 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "node_supported_features.h"
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <memory>
+
+namespace storage::distributor {
+
+/**
+ * Repo of known mappings from node distribution key to feature set supported by
+ * the content node with the given distribution key.
+ *
+ * Entirely immutable; copy-on-write via make_union_of().
+ */
+class NodeSupportedFeaturesRepo {
+ const vespalib::hash_map<uint16_t, NodeSupportedFeatures> _node_features;
+ struct PrivateCtorTag {};
+public:
+ NodeSupportedFeaturesRepo();
+
+ NodeSupportedFeaturesRepo(vespalib::hash_map<uint16_t, NodeSupportedFeatures> features, PrivateCtorTag);
+ ~NodeSupportedFeaturesRepo();
+
+ // Returns supported node features for node with distribution key node_idx, or a default feature set
+ // with all features unset if node has no known mapping.
+ [[nodiscard]] const NodeSupportedFeatures& node_supported_features(uint16_t node_idx) const noexcept;
+
+ // Returns a new repo instance containing the union key->features set of self and node_features.
+ // If there is a duplicate mapping between the two, the features in node_features take precedence
+ // and will be stored in the new repo.
+ [[nodiscard]] std::shared_ptr<const NodeSupportedFeaturesRepo>
+ make_union_of(const vespalib::hash_map<uint16_t, NodeSupportedFeatures>& node_features) const;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index f951a880e5d..d220a71966f 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/distributor/idealstatemetricsset.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
@@ -137,9 +138,8 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
getBucketId(),
_limiter,
nodes);
- for (uint32_t i=0; i<nodes.size(); ++i) {
- _mnodes.push_back(api::MergeBucketCommand::Node(
- nodes[i]._nodeIndex, nodes[i]._sourceOnly));
+ for (const auto& node : nodes) {
+ _mnodes.emplace_back(node._nodeIndex, node._sourceOnly);
}
if (_mnodes.size() > 1) {
@@ -148,11 +148,16 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
_mnodes,
_manager->operation_context().generate_unique_timestamp(),
clusterState.getVersion());
-
- // Due to merge forwarding/chaining semantics, we must always send
- // the merge command to the lowest indexed storage node involved in
- // the merge in order to avoid deadlocks.
- std::sort(_mnodes.begin(), _mnodes.end(), NodeIndexComparator());
+ const bool may_send_unordered = (_manager->operation_context().distributor_config().use_unordered_merge_chaining()
+ && all_involved_nodes_support_unordered_merge_chaining());
+ if (!may_send_unordered) {
+ // Due to merge forwarding/chaining semantics, we must always send
+ // the merge command to the lowest indexed storage node involved in
+ // the merge in order to avoid deadlocks.
+ std::sort(_mnodes.begin(), _mnodes.end(), NodeIndexComparator());
+ } else {
+ msg->set_use_unordered_forwarding(true);
+ }
LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(),
_mnodes[0].index);
@@ -262,7 +267,7 @@ void
MergeOperation::onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply> & msg)
{
- if (_removeOperation.get()) {
+ if (_removeOperation) {
if (_removeOperation->onReceiveInternal(msg)) {
_ok = _removeOperation->ok();
if (!_ok) {
@@ -277,7 +282,7 @@ MergeOperation::onReceive(DistributorStripeMessageSender& sender,
return;
}
- api::MergeBucketReply& reply(dynamic_cast<api::MergeBucketReply&>(*msg));
+ auto& reply = dynamic_cast<api::MergeBucketReply&>(*msg);
LOG(debug,
"Merge operation for bucket %s finished",
getBucketId().toString().c_str());
@@ -367,6 +372,16 @@ bool MergeOperation::is_global_bucket_merge() const noexcept {
return getBucket().getBucketSpace() == document::FixedBucketSpaces::global_space();
}
+bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const noexcept {
+ const auto& features_repo = _manager->operation_context().node_supported_features_repo();
+ for (uint16_t node : getNodes()) {
+ if (!features_repo.node_supported_features(node).unordered_merge_chaining) {
+ return false;
+ }
+ }
+ return true;
+}
+
MergeBucketMetricSet*
MergeOperation::get_merge_metrics()
{
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index 832c0f99681..014bae842fa 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -64,6 +64,7 @@ private:
void deleteSourceOnlyNodes(const BucketDatabase::Entry& currentState,
DistributorStripeMessageSender& sender);
bool is_global_bucket_merge() const noexcept;
+ bool all_involved_nodes_support_unordered_merge_chaining() const noexcept;
MergeBucketMetricSet* get_merge_metrics();
};
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 1c1c9f4a431..8183b013668 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -9,6 +9,7 @@
#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/xmlstream.hpp>
+#include <vespa/vespalib/stllike/hash_map.hpp>
#include <climits>
#include <vespa/log/bufferedlogger.h>
@@ -44,7 +45,8 @@ PendingClusterState::PendingClusterState(
_clusterStateVersion(_cmd->getClusterStateBundle().getVersion()),
_isVersionedTransition(true),
_bucketOwnershipTransfer(false),
- _pendingTransitions()
+ _pendingTransitions(),
+ _node_features()
{
logConstructionInformation();
initializeBucketSpaceTransitions(false, outdatedNodesMap);
@@ -67,7 +69,8 @@ PendingClusterState::PendingClusterState(
_clusterStateVersion(0),
_isVersionedTransition(false),
_bucketOwnershipTransfer(true),
- _pendingTransitions()
+ _pendingTransitions(),
+ _node_features()
{
logConstructionInformation();
initializeBucketSpaceTransitions(true, OutdatedNodesMap());
@@ -287,6 +290,9 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request
auto transitionIter = _pendingTransitions.find(bucketSpaceAndNode.bucketSpace);
assert(transitionIter != _pendingTransitions.end());
transitionIter->second->onRequestBucketInfoReply(*reply, bucketSpaceAndNode.node);
+
+ update_node_supported_features_from_reply(iter->second.node, *reply);
+
_sentMessages.erase(iter);
return true;
@@ -304,21 +310,6 @@ PendingClusterState::resendDelayedMessages() {
}
}
-std::string
-PendingClusterState::requestNodesToString() const
-{
- std::ostringstream ost;
- for (uint32_t i = 0; i < _requestedNodes.size(); ++i) {
- if (_requestedNodes[i]) {
- if (ost.str().length() > 0) {
- ost << ",";
- }
- ost << i;
- }
- }
- return ost.str();
-}
-
void
PendingClusterState::merge_into_bucket_databases(StripeAccessGuard& guard)
{
@@ -366,4 +357,14 @@ PendingClusterState::getPrevClusterStateBundleString() const {
return _prevClusterStateBundle.getBaselineClusterState()->toString();
}
+void
+PendingClusterState::update_node_supported_features_from_reply(uint16_t node, const api::RequestBucketInfoReply& reply)
+{
+ const auto& src_feat = reply.supported_node_features();
+ NodeSupportedFeatures dest_feat;
+ dest_feat.unordered_merge_chaining = src_feat.unordered_merge_chaining;
+ // This will overwrite per bucket-space reply, but does not matter since it's independent of bucket space.
+ _node_features.insert(std::make_pair(node, dest_feat));
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index 0d07730d9ee..1a2f8901b47 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "node_supported_features.h"
#include "pending_bucket_space_db_transition_entry.h"
#include "clusterinformation.h"
#include <vespa/storage/common/storagelink.h>
@@ -9,6 +10,7 @@
#include <vespa/storageframework/generic/clock/clock.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vespalib/util/xmlserializable.h>
+#include <vespa/vespalib/stllike/hash_map.h>
#include "outdated_nodes_map.h"
#include <unordered_map>
#include <deque>
@@ -151,9 +153,14 @@ public:
// Get pending transition for a specific bucket space. Only used by unit test.
PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace);
+ // May be a subset of the nodes in the cluster, depending on how many nodes were consulted
+ // as part of the pending cluster state. Caller must take care to aggregate features.
+ const vespalib::hash_map<uint16_t, NodeSupportedFeatures>& gathered_node_supported_features() const noexcept {
+ return _node_features;
+ }
+
void printXml(vespalib::XmlOutputStream&) const override;
Summary getSummary() const;
- std::string requestNodesToString() const;
private:
// With 100ms resend timeout, this requires a particular node to have failed
@@ -170,7 +177,7 @@ private:
DistributorMessageSender& sender,
const BucketSpaceStateMap& bucket_space_states,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
- const OutdatedNodesMap &outdatedNodesMap,
+ const OutdatedNodesMap& outdatedNodesMap,
api::Timestamp creationTimestamp);
/**
@@ -213,6 +220,7 @@ private:
std::string getNewClusterStateBundleString() const;
std::string getPrevClusterStateBundleString() const;
void update_reply_failure_statistics(const api::ReturnCode& result, const BucketSpaceAndNode& source);
+ void update_node_supported_features_from_reply(uint16_t node, const api::RequestBucketInfoReply& reply);
std::shared_ptr<api::SetSystemStateCommand> _cmd;
@@ -233,6 +241,7 @@ private:
bool _isVersionedTransition;
bool _bucketOwnershipTransfer;
std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions;
+ vespalib::hash_map<uint16_t, NodeSupportedFeatures> _node_features;
};
}
diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h
index bfc53c0ed82..2ed40cfcf2e 100644
--- a/storage/src/vespa/storage/distributor/stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h
@@ -20,6 +20,8 @@ namespace vespalib::xml { class XmlOutputStream; }
namespace storage::distributor {
+class NodeSupportedFeaturesRepo;
+
/**
* A stripe access guard guarantees that the holder of a guard can access underlying
* stripes via it in a thread safe manner. In particular, while any access guard is
@@ -57,6 +59,8 @@ public:
virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0;
virtual void clear_read_only_bucket_repo_databases() = 0;
+ virtual void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo) = 0;
+
struct PendingOperationStats {
size_t external_load_operations;
size_t maintenance_operations;
diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h
index d58b1e2e6aa..e458043ac64 100644
--- a/storage/src/vespa/storage/distributor/tickable_stripe.h
+++ b/storage/src/vespa/storage/distributor/tickable_stripe.h
@@ -15,6 +15,8 @@ namespace vespalib::xml { class XmlOutputStream; }
namespace storage::distributor {
+class NodeSupportedFeaturesRepo;
+
/**
* A tickable stripe is the minimal binding glue between the stripe's worker thread and
* the actual implementation. Primarily allows for easier testing without having to
@@ -58,6 +60,8 @@ public:
virtual void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) = 0;
virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0;
virtual void clear_read_only_bucket_repo_databases() = 0;
+ virtual void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo> features_repo) = 0;
+
// Functions used for state reporting
virtual void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const = 0;
virtual StripeAccessGuard::PendingOperationStats pending_operation_stats() const = 0;
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
index 8fc6d7576c9..613f0f6ce09 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
@@ -7,6 +7,7 @@
#include "top_level_distributor.h"
#include "distributor_bucket_space.h"
#include "distributormetricsset.h"
+#include "node_supported_features_repo.h"
#include "simpleclusterinformation.h"
#include "stripe_access_guard.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
@@ -47,11 +48,12 @@ TopLevelBucketDBUpdater::TopLevelBucketDBUpdater(const DistributorNodeContext& n
_chained_sender(chained_sender),
_outdated_nodes_map(),
_transition_timer(_node_ctx.clock()),
+ _node_supported_features_repo(std::make_shared<const NodeSupportedFeaturesRepo>()),
_stale_reads_enabled(false)
{
// FIXME STRIPE top-level Distributor needs a proper way to track the current cluster state bundle!
propagate_active_state_bundle_internally(true); // We're just starting up so assume ownership transfer.
- bootstrap_distribution_config(bootstrap_distribution);
+ bootstrap_distribution_config(std::move(bootstrap_distribution));
}
TopLevelBucketDBUpdater::~TopLevelBucketDBUpdater() = default;
@@ -393,6 +395,10 @@ TopLevelBucketDBUpdater::activate_pending_cluster_state(StripeAccessGuard& guard
guard.notify_distribution_change_enabled();
}
+ _node_supported_features_repo = _node_supported_features_repo->make_union_of(
+ _pending_cluster_state->gathered_node_supported_features());
+ guard.update_node_supported_features_repo(_node_supported_features_repo);
+
guard.update_read_snapshot_after_activation(_pending_cluster_state->getNewClusterStateBundle());
_pending_cluster_state.reset();
_outdated_nodes_map.clear();
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
index f35991c20f3..b1065e708a4 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
@@ -30,6 +30,7 @@ struct BucketSpaceDistributionConfigs;
class BucketSpaceDistributionContext;
class ClusterStateBundleActivationListener;
class DistributorInterface;
+class NodeSupportedFeaturesRepo;
class StripeAccessor;
class StripeAccessGuard;
@@ -122,6 +123,7 @@ private:
ChainedMessageSender& _chained_sender;
OutdatedNodesMap _outdated_nodes_map;
framework::MilliSecTimer _transition_timer;
+ std::shared_ptr<const NodeSupportedFeaturesRepo> _node_supported_features_repo;
std::atomic<bool> _stale_reads_enabled;
};
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 05e50492206..bc2f54e5a50 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -113,30 +113,40 @@ MergeThrottler::MergeOperationMetrics::MergeOperationMetrics(const std::string&
}
MergeThrottler::MergeOperationMetrics::~MergeOperationMetrics() = default;
-MergeThrottler::MergeNodeSequence::MergeNodeSequence(
- const api::MergeBucketCommand& cmd,
- uint16_t thisIndex)
+MergeThrottler::MergeNodeSequence::MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex)
: _cmd(cmd),
_sortedNodes(cmd.getNodes()),
- _sortedIndex(std::numeric_limits<std::size_t>::max()),
- _thisIndex(thisIndex)
+ _sortedIndex(UINT16_MAX),
+ _unordered_index(UINT16_MAX),
+ _thisIndex(thisIndex),
+ _use_unordered_forwarding(cmd.use_unordered_forwarding())
{
// Sort the node vector so that we can find out if we're the
// last node in the chain or if we should forward the merge
std::sort(_sortedNodes.begin(), _sortedNodes.end(), NodeComparator());
- assert(!_sortedNodes.empty());
- for (std::size_t i = 0; i < _sortedNodes.size(); ++i) {
+ assert(!_sortedNodes.empty() && (_sortedNodes.size() < UINT16_MAX));
+ for (uint16_t i = 0; i < static_cast<uint16_t>(_sortedNodes.size()); ++i) {
if (_sortedNodes[i].index == _thisIndex) {
_sortedIndex = i;
break;
}
}
+ const auto& nodes = unordered_nodes();
+ for (uint16_t i = 0; i < static_cast<uint16_t>(nodes.size()); ++i) {
+ if (nodes[i].index == _thisIndex) {
+ _unordered_index = i;
+ break;
+ }
+ }
}
uint16_t
MergeThrottler::MergeNodeSequence::getNextNodeInChain() const
{
assert(_cmd.getChain().size() < _sortedNodes.size());
+ if (_use_unordered_forwarding) {
+ return unordered_nodes()[_cmd.getChain().size() + 1].index;
+ }
// assert(_sortedNodes[_cmd.getChain().size()].index == _thisIndex);
if (_sortedNodes[_cmd.getChain().size()].index != _thisIndex) {
// Some added paranoia output
@@ -153,7 +163,11 @@ MergeThrottler::MergeNodeSequence::isChainCompleted() const
{
if (_cmd.getChain().size() != _sortedNodes.size()) return false;
- for (std::size_t i = 0; i < _cmd.getChain().size(); ++i) {
+ if (_use_unordered_forwarding) {
+ return true; // Expect chain to be correct if size matches node sequence size. TODO can't we always do this?
+ }
+
+ for (size_t i = 0; i < _cmd.getChain().size(); ++i) {
if (_cmd.getChain()[i] != _sortedNodes[i].index) {
return false;
}
@@ -162,10 +176,10 @@ MergeThrottler::MergeNodeSequence::isChainCompleted() const
}
bool
-MergeThrottler::MergeNodeSequence::chainContainsIndex(uint16_t idx) const
+MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept
{
- for (std::size_t i = 0; i < _cmd.getChain().size(); ++i) {
- if (_cmd.getChain()[i] == idx) {
+ for (size_t i = 0; i < _cmd.getChain().size(); ++i) {
+ if (_cmd.getChain()[i] == _thisIndex) {
return true;
}
}
@@ -358,6 +372,7 @@ MergeThrottler::forwardCommandToNode(
fwdMerge->setSourceIndex(mergeCmd.getSourceIndex());
fwdMerge->setPriority(mergeCmd.getPriority());
fwdMerge->setTimeout(mergeCmd.getTimeout());
+ fwdMerge->set_use_unordered_forwarding(mergeCmd.use_unordered_forwarding());
msgGuard.sendUp(fwdMerge);
}
@@ -374,7 +389,7 @@ api::StorageMessage::SP
MergeThrottler::getNextQueuedMerge()
{
if (_queue.empty()) {
- return api::StorageMessage::SP();
+ return {};
}
auto iter = _queue.begin();
@@ -385,7 +400,7 @@ MergeThrottler::getNextQueuedMerge()
}
void
-MergeThrottler::enqueueMerge(
+MergeThrottler::enqueue_merge_for_later_processing(
const api::StorageMessage::SP& msg,
MessageGuard& msgGuard)
{
@@ -395,9 +410,10 @@ MergeThrottler::enqueueMerge(
if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) {
return;
}
- const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.getChain().empty();
+ // TODO remove once unordered merges are default, since forwarded unordered merges are never enqueued
+ const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.from_distributor();
_queue.emplace(msg, _queueSequence++, is_forwarded_merge);
- _metrics->queueSize.set(_queue.size());
+ _metrics->queueSize.set(static_cast<int64_t>(_queue.size()));
}
bool
@@ -682,11 +698,30 @@ bool MergeThrottler::backpressure_mode_active() const {
return backpressure_mode_active_no_lock();
}
-bool MergeThrottler::allow_merge_with_queue_full(const api::MergeBucketCommand& cmd) const noexcept {
- // We let any merge through that has already passed through at least one other node's merge
- // window, as that has already taken up a logical resource slot on all those nodes. Busy-bouncing
- // a merge at that point would undo a great amount of thumb-twiddling and waiting.
- return (_disable_queue_limits_for_chained_merges && !cmd.getChain().empty());
+bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept {
+ // We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock.
+ // See comment in may_allow_into_queue() for rationale.
+ return (cmd.use_unordered_forwarding() && !cmd.from_distributor());
+}
+
+bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept {
+ // We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock.
+ // Consider the following scenario, with two nodes C0 and C1, each with a low window size of 1 (low
+ // limit chosen for demonstration purposes, but is entirely generalizable):
+ // 1. Node 0 receives merge M_x for nodes [0, 1], places in active window, forwards to node 1
+ // 2. Node 1 receives merge M_y for nodes [1, 0], places in active window, forwards to node 0
+ // 3. Node 0 receives merge M_y from node 1. Active window is full, so places in queue
+ // 4. Node 1 receives merge M_x from node 0. Active window is full, so places in queue
+ // 5. Neither M_x nor M_y will ever complete since they're waiting for resources that cannot be
+ // freed up before they themselves complete. Classic deadlock(tm).
+ //
+ // We do, however, allow enqueueing unordered merges that come straight from the distributor, as
+ // those cannot cause a deadlock at that point in time.
+ if (cmd.use_unordered_forwarding()) {
+ return cmd.from_distributor();
+ }
+ return ((_queue.size() < _maxQueueSize)
+ || (_disable_queue_limits_for_chained_merges && !cmd.from_distributor()));
}
// Must be run from worker thread
@@ -716,10 +751,10 @@ MergeThrottler::handleMessageDown(
if (isMergeAlreadyKnown(msg)) {
processCycledMergeCommand(msg, msgGuard);
- } else if (canProcessNewMerge()) {
+ } else if (canProcessNewMerge() || allow_merge_despite_full_window(mergeCmd)) {
processNewMergeCommand(msg, msgGuard);
- } else if ((_queue.size() < _maxQueueSize) || allow_merge_with_queue_full(mergeCmd)) {
- enqueueMerge(msg, msgGuard); // Queue for later processing
+ } else if (may_allow_into_queue(mergeCmd)) {
+ enqueue_merge_for_later_processing(msg, msgGuard);
} else {
// No more room at the inn. Return BUSY so that the
// distributor will wait a bit before retrying
@@ -773,7 +808,7 @@ MergeThrottler::validateNewMerge(
<< _component.getIndex()
<< ", which is not in its forwarding chain";
LOG(error, "%s", oss.str().data());
- } else if (mergeCmd.getChain().size() >= nodeSeq.getSortedNodes().size()) {
+ } else if (mergeCmd.getChain().size() >= nodeSeq.unordered_nodes().size()) {
// Chain is full but we haven't seen the merge! This means
// the node has probably gone down with a merge it previously
// forwarded only now coming back to haunt it.
@@ -781,7 +816,7 @@ MergeThrottler::validateNewMerge(
<< " is not in node's internal state, but has a "
<< "full chain, meaning it cannot be forwarded.";
LOG(debug, "%s", oss.str().data());
- } else if (nodeSeq.chainContainsIndex(nodeSeq.getThisNodeIndex())) {
+ } else if (nodeSeq.chain_contains_this_node()) {
oss << mergeCmd.toString()
<< " is not in node's internal state, but contains "
<< "this node in its non-full chain. This should not happen!";
@@ -831,7 +866,9 @@ MergeThrottler::processNewMergeCommand(
// If chain is empty and this node is not the lowest
// index in the nodeset, immediately execute. Required for
// backwards compatibility with older distributor versions.
- if (mergeCmd.getChain().empty()
+ // TODO remove this
+ if (mergeCmd.from_distributor()
+ && !mergeCmd.use_unordered_forwarding()
&& (nodeSeq.getSortedNodes()[0].index != _component.getIndex()))
{
LOG(debug, "%s has empty chain and was sent to node that "
@@ -1039,7 +1076,6 @@ bool
MergeThrottler::onSetSystemState(
const std::shared_ptr<api::SetSystemStateCommand>& stateCmd)
{
-
LOG(debug,
"New cluster state arrived with version %u, flushing "
"all outdated queued merges",
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index da301172a3a..c115d36ad89 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -161,7 +161,7 @@ private:
ActiveMergeMap _merges;
MergePriorityQueue _queue;
- std::size_t _maxQueueSize;
+ size_t _maxQueueSize;
mbus::StaticThrottlePolicy::UP _throttlePolicy;
uint64_t _queueSequence; // TODO: move into a stable priority queue class
mutable std::mutex _messageLock;
@@ -220,7 +220,7 @@ public:
std::mutex& getStateLock() { return _stateLock; }
Metrics& getMetrics() { return *_metrics; }
- std::size_t getMaxQueueSize() const { return _maxQueueSize; }
+ size_t getMaxQueueSize() const { return _maxQueueSize; }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
private:
@@ -230,17 +230,18 @@ private:
struct MergeNodeSequence {
const api::MergeBucketCommand& _cmd;
std::vector<api::MergeBucketCommand::Node> _sortedNodes;
- std::size_t _sortedIndex; // Index of current storage node in the sorted node sequence
+ uint16_t _sortedIndex; // Index of current storage node in the sorted node sequence
+ uint16_t _unordered_index;
const uint16_t _thisIndex; // Index of the current storage node
+ bool _use_unordered_forwarding;
MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex);
- std::size_t getSortedIndex() const { return _sortedIndex; }
const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const {
return _sortedNodes;
}
bool isIndexUnknown() const {
- return (_sortedIndex == std::numeric_limits<std::size_t>::max());
+ return (_sortedIndex == UINT16_MAX);
}
/**
* This node is the merge executor if it's the first element in the
@@ -252,11 +253,17 @@ private:
uint16_t getExecutorNodeIndex() const{
return _cmd.getNodes()[0].index;
}
- bool isLastNode() const {
- return (_sortedIndex == _sortedNodes.size() - 1);
+ const std::vector<api::MergeBucketCommand::Node>& unordered_nodes() const noexcept {
+ return _cmd.getNodes();
}
- bool chainContainsIndex(uint16_t idx) const;
- uint16_t getThisNodeIndex() const { return _thisIndex; }
+ [[nodiscard]] bool isLastNode() const {
+ if (!_use_unordered_forwarding) {
+ return (_sortedIndex == _sortedNodes.size() - 1);
+ } else {
+ return (_unordered_index == (unordered_nodes().size() - 1));
+ }
+ }
+ [[nodiscard]] bool chain_contains_this_node() const noexcept;
/**
* Gets node to forward to in strictly increasing order.
*/
@@ -339,7 +346,7 @@ private:
* @return Highest priority waiting merge or null SP if queue is empty
*/
api::StorageMessage::SP getNextQueuedMerge();
- void enqueueMerge(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);
+ void enqueue_merge_for_later_processing(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);
/**
* @return true if throttle policy says at least one additional
@@ -347,12 +354,13 @@ private:
*/
bool canProcessNewMerge() const;
- bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const;
+ [[nodiscard]] bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const;
void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard);
- bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
- bool backpressure_mode_active_no_lock() const;
+ [[nodiscard]] bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
+ [[nodiscard]] bool backpressure_mode_active_no_lock() const;
void backpressure_bounce_all_queued_merges(MessageGuard& guard);
- bool allow_merge_with_queue_full(const api::MergeBucketCommand& cmd) const noexcept;
+ [[nodiscard]] static bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept;
+ [[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept;
void sendReply(const api::MergeBucketCommand& cmd,
const api::ReturnCode& result,
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index a6021a7cfd2..6a00ddc8d8e 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -410,6 +410,12 @@ TEST_P(StorageProtocolTest, request_bucket_info) {
// "Last modified" not counted by operator== for some reason. Testing
// separately until we can figure out if this is by design or not.
EXPECT_EQ(lastMod, entries[0]._info.getLastModified());
+
+ if (GetParam().getMajor() >= 7) {
+ EXPECT_TRUE(reply2->supported_node_features().unordered_merge_chaining);
+ } else {
+ EXPECT_FALSE(reply2->supported_node_features().unordered_merge_chaining);
+ }
}
}
@@ -471,12 +477,18 @@ TEST_P(StorageProtocolTest, merge_bucket) {
chain.push_back(14);
auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
+ cmd->set_use_unordered_forwarding(true);
auto cmd2 = copyCommand(cmd);
EXPECT_EQ(_bucket, cmd2->getBucket());
EXPECT_EQ(nodes, cmd2->getNodes());
EXPECT_EQ(Timestamp(1234), cmd2->getMaxTimestamp());
EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion());
EXPECT_EQ(chain, cmd2->getChain());
+ if (GetParam().getMajor() >= 7) {
+ EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding());
+ } else {
+ EXPECT_FALSE(cmd2->use_unordered_forwarding());
+ }
auto reply = std::make_shared<MergeBucketReply>(*cmd);
auto reply2 = copyReply(reply);
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
index 34d67fdc00c..7f7ab1d7c0b 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
@@ -38,6 +38,7 @@ message MergeBucketRequest {
uint64 max_timestamp = 3;
repeated MergeNode nodes = 4;
repeated uint32 node_chain = 5;
+ bool unordered_forwarding = 6;
}
message MergeBucketResponse {
@@ -108,8 +109,14 @@ message BucketAndBucketInfo {
BucketInfo bucket_info = 2;
}
+message SupportedNodeFeatures {
+ bool unordered_merge_chaining = 1;
+}
+
message RequestBucketInfoResponse {
repeated BucketAndBucketInfo bucket_infos = 1;
+ // Only present for full bucket info fetches (not for explicit buckets)
+ SupportedNodeFeatures supported_node_features = 2;
}
message NotifyBucketChangeRequest {
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index bb4cb6e24a3..8425294cbbd 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -766,6 +766,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand&
set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
req.set_max_timestamp(msg.getMaxTimestamp());
req.set_cluster_state_version(msg.getClusterStateVersion());
+ req.set_unordered_forwarding(msg.use_unordered_forwarding());
for (uint16_t chain_node : msg.getChain()) {
req.add_node_chain(chain_node);
}
@@ -787,6 +788,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf&
chain.emplace_back(node);
}
cmd->setChain(std::move(chain));
+ cmd->set_use_unordered_forwarding(req.unordered_forwarding());
return cmd;
});
}
@@ -999,6 +1001,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoRe
bucket_and_info->set_raw_bucket_id(entry._bucketId.getRawId());
set_bucket_info(*bucket_and_info->mutable_bucket_info(), entry._info);
}
+ // We mark features as available at protocol level. Only included for full bucket fetch responses.
+ if (msg.full_bucket_fetch()) {
+ res.mutable_supported_node_features()->set_unordered_merge_chaining(true);
+ }
});
}
@@ -1035,6 +1041,11 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con
dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id());
dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info());
}
+ if (res.has_supported_node_features()) {
+ const auto& src_features = res.supported_node_features();
+ auto& dest_features = reply->supported_node_features();
+ dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining();
+ }
return reply;
});
}
diff --git a/storageapi/src/vespa/storageapi/message/bucket.cpp b/storageapi/src/vespa/storageapi/message/bucket.cpp
index 360db5ea3d7..04a40fbc885 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.cpp
+++ b/storageapi/src/vespa/storageapi/message/bucket.cpp
@@ -107,7 +107,8 @@ MergeBucketCommand::MergeBucketCommand(
_nodes(nodes),
_maxTimestamp(maxTimestamp),
_clusterStateVersion(clusterStateVersion),
- _chain(chain)
+ _chain(chain),
+ _use_unordered_forwarding(false)
{}
MergeBucketCommand::~MergeBucketCommand() = default;
@@ -128,6 +129,9 @@ MergeBucketCommand::print(std::ostream& out, bool verbose, const std::string& in
out << _chain[i];
}
out << "]";
+ if (_use_unordered_forwarding) {
+ out << " (unordered forwarding)";
+ }
out << ", reasons to start: " << _reason;
out << ")";
if (verbose) {
diff --git a/storageapi/src/vespa/storageapi/message/bucket.h b/storageapi/src/vespa/storageapi/message/bucket.h
index c24ed55d7a8..5fd79ffffea 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.h
+++ b/storageapi/src/vespa/storageapi/message/bucket.h
@@ -118,6 +118,7 @@ private:
Timestamp _maxTimestamp;
uint32_t _clusterStateVersion;
std::vector<uint16_t> _chain;
+ bool _use_unordered_forwarding;
public:
MergeBucketCommand(const document::Bucket &bucket,
@@ -133,6 +134,11 @@ public:
uint32_t getClusterStateVersion() const { return _clusterStateVersion; }
void setClusterStateVersion(uint32_t version) { _clusterStateVersion = version; }
void setChain(const std::vector<uint16_t>& chain) { _chain = chain; }
+ void set_use_unordered_forwarding(bool unordered_forwarding) noexcept {
+ _use_unordered_forwarding = unordered_forwarding;
+ }
+ [[nodiscard]] bool use_unordered_forwarding() const noexcept { return _use_unordered_forwarding; }
+ [[nodiscard]] bool from_distributor() const noexcept { return _chain.empty(); }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGECOMMAND(MergeBucketCommand, onMergeBucket)
};
@@ -385,19 +391,30 @@ public:
: _bucketId(id), _info(info) {}
friend std::ostream& operator<<(std::ostream& os, const Entry&);
};
- typedef vespalib::Array<Entry> EntryVector;
+ struct SupportedNodeFeatures {
+ bool unordered_merge_chaining = false;
+ };
+ using EntryVector = vespalib::Array<Entry>;
private:
- EntryVector _buckets;
- bool _full_bucket_fetch;
- document::BucketId _super_bucket_id;
+ EntryVector _buckets;
+ bool _full_bucket_fetch;
+ document::BucketId _super_bucket_id;
+ SupportedNodeFeatures _supported_node_features;
public:
explicit RequestBucketInfoReply(const RequestBucketInfoCommand& cmd);
- ~RequestBucketInfoReply();
+ ~RequestBucketInfoReply() override;
const EntryVector & getBucketInfo() const { return _buckets; }
EntryVector & getBucketInfo() { return _buckets; }
[[nodiscard]] bool full_bucket_fetch() const noexcept { return _full_bucket_fetch; }
+ // Only contains useful information if full_bucket_fetch() == true
+ [[nodiscard]] const SupportedNodeFeatures& supported_node_features() const noexcept {
+ return _supported_node_features;
+ }
+ [[nodiscard]] SupportedNodeFeatures& supported_node_features() noexcept {
+ return _supported_node_features;
+ }
const document::BucketId& super_bucket_id() const { return _super_bucket_id; }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGEREPLY(RequestBucketInfoReply, onRequestBucketInfoReply)
diff --git a/vespa-feed-client-cli/pom.xml b/vespa-feed-client-cli/pom.xml
index 930d31beb6c..aff625fe3a4 100644
--- a/vespa-feed-client-cli/pom.xml
+++ b/vespa-feed-client-cli/pom.xml
@@ -12,11 +12,6 @@
<packaging>jar</packaging>
<version>7-SNAPSHOT</version>
- <properties>
- <!-- Used by internal properties that are still using JDK8-->
- <maven.compiler.release>8</maven.compiler.release>
- </properties>
-
<dependencies>
<!-- compile scope -->
<dependency>
@@ -56,11 +51,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <jdkToolchain>
- <version>${java.version}</version>
- </jdkToolchain>
- <source>${java.version}</source>
- <target>${java.version}</target>
+ <release>${vespaClients.jdk.releaseVersion}</release>
<showDeprecation>true</showDeprecation>
<compilerArgs>
<arg>-Xlint:all</arg>
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml
index cf2da78c4a9..68c9e4b4b7c 100644
--- a/vespa-feed-client/pom.xml
+++ b/vespa-feed-client/pom.xml
@@ -12,11 +12,6 @@
<packaging>jar</packaging>
<version>7-SNAPSHOT</version>
- <properties>
- <!-- Used by internal properties that are still using JDK8-->
- <maven.compiler.release>8</maven.compiler.release>
- </properties>
-
<dependencies>
<!-- compile scope -->
<dependency>
@@ -54,11 +49,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <jdkToolchain>
- <version>${java.version}</version>
- </jdkToolchain>
- <source>${java.version}</source>
- <target>${java.version}</target>
+ <release>${vespaClients.jdk.releaseVersion}</release>
<showDeprecation>true</showDeprecation>
<compilerArgs>
<arg>-Xlint:all</arg>
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml
index 60e3aff01cb..8c65470abea 100644
--- a/vespa-hadoop/pom.xml
+++ b/vespa-hadoop/pom.xml
@@ -19,8 +19,6 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.8.0</hadoop.version>
<pig.version>0.14.0</pig.version>
- <!-- This is a client jar and should be compilable with jdk8 -->
- <maven.compiler.release>8</maven.compiler.release>
</properties>
<dependencies>
@@ -186,10 +184,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <jdkToolchain>
- <version>${java.version}</version>
- </jdkToolchain>
- <release>${maven.compiler.release}</release>
+ <release>${vespaClients.jdk.releaseVersion}</release>
</configuration>
</plugin>
</plugins>
diff --git a/vespa-http-client/pom.xml b/vespa-http-client/pom.xml
index fa73dd1bd74..eefb07d4ece 100644
--- a/vespa-http-client/pom.xml
+++ b/vespa-http-client/pom.xml
@@ -14,11 +14,6 @@
<name>${project.artifactId}</name>
<description>Independent external feeding API towards Vespa.</description>
- <properties>
- <!-- This is a client jar and should be compilable with jdk8 -->
- <maven.compiler.release>8</maven.compiler.release>
- </properties>
-
<dependencies>
<!-- NOTE: Adding dependencies here may break clients because this is used outside an OSGi container with
@@ -155,11 +150,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <jdkToolchain>
- <version>${java.version}</version>
- </jdkToolchain>
- <source>${java.version}</source>
- <target>${java.version}</target>
+ <release>${vespaClients.jdk.releaseVersion}</release>
<showDeprecation>true</showDeprecation>
<compilerArgs>
<arg>-Xlint:all</arg>