aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/vespa')
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h6
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def7
-rw-r--r--storage/src/vespa/storage/config/stor-server.def4
-rw-r--r--storage/src/vespa/storage/distributor/activecopy.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp79
-rw-r--r--storage/src/vespa/storage/distributor/operationtargetresolverimpl.h22
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h4
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp53
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h20
-rw-r--r--storage/src/vespa/storage/storageserver/documentapiconverter.cpp6
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp6
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp17
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto17
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp7
-rw-r--r--storage/src/vespa/storageapi/message/persistence.cpp12
-rw-r--r--storage/src/vespa/storageapi/message/persistence.h21
-rw-r--r--storage/src/vespa/storageapi/message/state.cpp24
-rw-r--r--storage/src/vespa/storageapi/message/state.h18
23 files changed, 267 insertions, 76 deletions
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 6957e541d6b..cfbc4caf82d 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -46,6 +46,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_use_weak_internal_read_consistency_for_client_gets(false),
_enable_metadata_only_fetch_phase_for_inconsistent_updates(true),
_enable_operation_cancellation(false),
+ _symmetric_put_and_activate_replica_selection(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
@@ -150,6 +151,7 @@ DistributorConfiguration::configure(const DistributorManagerConfig & config)
_max_activation_inhibited_out_of_sync_groups = config.maxActivationInhibitedOutOfSyncGroups;
_enable_operation_cancellation = config.enableOperationCancellation;
_minimumReplicaCountingMode = deriveReplicaCountingMode(config.minimumReplicaCountingMode);
+ _symmetric_put_and_activate_replica_selection = config.symmetricPutAndActivateReplicaSelection;
if (config.maxClusterClockSkewSec >= 0) {
_maxClusterClockSkew = std::chrono::seconds(config.maxClusterClockSkewSec);
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 38fac13150c..2b73fdc0fa1 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -234,8 +234,11 @@ public:
[[nodiscard]] bool enable_operation_cancellation() const noexcept {
return _enable_operation_cancellation;
}
+ [[nodiscard]] bool symmetric_put_and_activate_replica_selection() const noexcept {
+ return _symmetric_put_and_activate_replica_selection; // Assigned from config.symmetricPutAndActivateReplicaSelection in configure().
+ }
- bool containsTimeStatement(const std::string& documentSelection) const;
+ [[nodiscard]] bool containsTimeStatement(const std::string& documentSelection) const;
private:
StorageComponent& _component;
@@ -276,6 +279,7 @@ private:
bool _use_weak_internal_read_consistency_for_client_gets;
bool _enable_metadata_only_fetch_phase_for_inconsistent_updates; //TODO Rewrite tests and GC
bool _enable_operation_cancellation;
+ bool _symmetric_put_and_activate_replica_selection;
ReplicaCountingMode _minimumReplicaCountingMode;
};
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 3f6028d7fa1..a4d4461ba68 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -159,6 +159,13 @@ num_distributor_stripes int default=0 restart
## requests partially or fully "invalidated" by such a change.
enable_operation_cancellation bool default=false
+## Iff true there will be a 1-1 symmetry between the replicas chosen as feed targets
+## for Put operations and the replica selection logic for bucket activation. In particular,
+## the most preferred replica for feed will be the most preferred bucket for activation.
+## This helps ensure that new versions of documents are routed to replicas that are most
+## likely to reflect these changes as part of visible search results.
+symmetric_put_and_activate_replica_selection bool default=false
+
## TODO GC very soon, it has no effect.
priority_merge_out_of_sync_copies int default=120
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index 8cd204bcf9f..44e4b14eafc 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -114,3 +114,7 @@ simulated_bucket_request_latency_msec int default=0
## a disjoint subset of the node's buckets, in order to reduce locking contention.
## Max value is unspecified, but will be clamped internally.
content_node_bucket_db_stripe_bits int default=4 restart
+
+## Iff set, a special `pidfile` file is written under the node's root directory upon
+## startup containing the PID of the running process.
+write_pid_file_on_startup bool default=true
diff --git a/storage/src/vespa/storage/distributor/activecopy.cpp b/storage/src/vespa/storage/distributor/activecopy.cpp
index 35070bcee3b..b823978a0cc 100644
--- a/storage/src/vespa/storage/distributor/activecopy.cpp
+++ b/storage/src/vespa/storage/distributor/activecopy.cpp
@@ -108,6 +108,7 @@ buildNodeList(const BucketDatabase::Entry& e,vespalib::ConstArrayRef<uint16_t> n
struct ActiveStateOrder {
bool operator()(const ActiveCopy & e1, const ActiveCopy & e2) noexcept {
+ // Replica selection order should be kept in sync with OperationTargetResolverImpl's ActiveReplicaSymmetricInstanceOrder.
if (e1._ready != e2._ready) {
return e1._ready;
}
@@ -120,7 +121,9 @@ struct ActiveStateOrder {
if (e1._active != e2._active) {
return e1._active;
}
- return e1.nodeIndex() < e2.nodeIndex();
+ // Use _entry_ order instead of node index, as it is in ideal state order (even for retired
+ // nodes), which avoids unintentional affinities towards lower node indexes.
+ return e1.entryIndex() < e2.entryIndex();
}
};
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 54087850e1b..a92896279b0 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -180,6 +180,8 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen
_op_ctx.distributor_config().getMinimalBucketSplit(),
_bucket_space.getDistribution().getRedundancy(),
_msg->getBucket().getBucketSpace());
+ targetResolver.use_symmetric_replica_selection(
+ _op_ctx.distributor_config().symmetric_put_and_activate_replica_selection());
OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, _doc_id_bucket_id));
for (const auto& target : targets) {
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 84e9ab71bcb..849746416d6 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -668,7 +668,7 @@ TwoPhaseUpdateOperation::applyUpdateToDocument(document::Document& doc) const
bool
TwoPhaseUpdateOperation::shouldCreateIfNonExistent() const
{
- return _updateCmd->getUpdate()->getCreateIfNonExistent();
+ return _updateCmd->create_if_missing();
}
bool
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index 7b6833cc299..2b47d53363f 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -29,7 +29,7 @@ UpdateOperation::UpdateOperation(const DistributorNodeContext& node_ctx,
_msg(msg),
_entries(std::move(entries)),
_new_timestamp(_msg->getTimestamp()),
- _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()),
+ _is_auto_create_update(_msg->create_if_missing()),
_node_ctx(node_ctx),
_op_ctx(op_ctx),
_bucketSpace(bucketSpace),
@@ -112,6 +112,9 @@ UpdateOperation::onStart(DistributorStripeMessageSender& sender)
copyMessageSettings(*_msg, *command);
command->setOldTimestamp(_msg->getOldTimestamp());
command->setCondition(_msg->getCondition());
+ if (_msg->has_cached_create_if_missing()) {
+ command->set_cached_create_if_missing(_msg->create_if_missing());
+ }
messages.emplace_back(std::move(command), node);
}
diff --git a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp
index 394c13c2bad..618cfb56359 100644
--- a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp
+++ b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp
@@ -10,9 +10,12 @@
namespace storage::distributor {
BucketInstance::BucketInstance(const document::BucketId& id, const api::BucketInfo& info, lib::Node node,
- uint16_t idealLocationPriority, bool trusted, bool exist) noexcept
+ uint16_t ideal_location_priority, uint16_t db_entry_order,
+ bool trusted, bool exist) noexcept
: _bucket(id), _info(info), _node(node),
- _idealLocationPriority(idealLocationPriority), _trusted(trusted), _exist(exist)
+ _ideal_location_priority(ideal_location_priority),
+ _db_entry_order(db_entry_order),
+ _trusted(trusted), _exists(exist)
{
}
@@ -24,8 +27,8 @@ BucketInstance::print(vespalib::asciistream& out, const PrintProperties&) const
std::ostringstream ost;
ost << std::hex << _bucket.getId();
- out << "(" << ost.str() << ", " << infoString << ", node " << _node.getIndex() << ", ideal " << _idealLocationPriority
- << (_trusted ? ", trusted" : "") << (_exist ? "" : ", new copy") << ")";
+ out << "(" << ost.str() << ", " << infoString << ", node " << _node.getIndex() << ", ideal " << _ideal_location_priority
+ << (_trusted ? ", trusted" : "") << (_exists ? "" : ", new copy") << ")";
}
bool
@@ -42,7 +45,7 @@ BucketInstanceList::add(const BucketDatabase::Entry& e, const IdealServiceLayerN
for (uint32_t i = 0; i < e.getBucketInfo().getNodeCount(); ++i) {
const BucketCopy& copy(e.getBucketInfo().getNodeRef(i));
lib::Node node(lib::NodeType::STORAGE, copy.getNode());
- _instances.emplace_back(e.getBucketId(), copy.getBucketInfo(), node, idealState.lookup(copy.getNode()), copy.trusted(), true);
+ _instances.emplace_back(e.getBucketId(), copy.getBucketInfo(), node, idealState.lookup(copy.getNode()), i, copy.trusted(), true);
}
}
@@ -106,7 +109,8 @@ BucketInstanceList::extendToEnoughCopies(const DistributorBucketSpace& distribut
for (uint32_t i=0; i<idealNodes.size(); ++i) {
lib::Node node(lib::NodeType::STORAGE, idealNodes[i]);
if (!contains(node)) {
- _instances.emplace_back(newTarget, api::BucketInfo(), node, i, false, false);
+ // We don't sort `_instances` after extending, so just reuse `i` as dummy DB entry order.
+ _instances.emplace_back(newTarget, api::BucketInfo(), node, i, i, false, false);
}
}
}
@@ -116,7 +120,7 @@ BucketInstanceList::createTargets(document::BucketSpace bucketSpace)
{
OperationTargetList result;
for (const auto& bi : _instances) {
- result.emplace_back(document::Bucket(bucketSpace, bi._bucket), bi._node, !bi._exist);
+ result.emplace_back(document::Bucket(bucketSpace, bi._bucket), bi._node, !bi._exists);
}
return result;
}
@@ -129,6 +133,49 @@ BucketInstanceList::print(vespalib::asciistream& out, const PrintProperties& p)
namespace {
/**
+ * To maintain a symmetry between which replicas receive Puts and which versions are
+ * preferred for activation, use an identical ordering predicate for both (for the case
+ * where replicas are for the same concrete bucket).
+ *
+ * Must only be used with BucketInstances that have a distinct _db_entry_order set per instance.
+ */
+struct ActiveReplicaSymmetricInstanceOrder { // Passed to BucketInstanceList::sort by getAllInstances() when symmetric replica selection is enabled.
+ bool operator()(const BucketInstance& a, const BucketInstance& b) noexcept { // true => `a` is ordered before `b`
+ if (a._bucket == b._bucket) {
+ if (a._info.isReady() != b._info.isReady()) { // 1) a ready replica goes before a non-ready one
+ return a._info.isReady();
+ }
+ if (a._info.getDocumentCount() != b._info.getDocumentCount()) { // 2) then the replica with the higher document count
+ return a._info.getDocumentCount() > b._info.getDocumentCount();
+ }
+ if (a._ideal_location_priority != b._ideal_location_priority) { // 3) then the lower ideal location priority value
+ return a._ideal_location_priority < b._ideal_location_priority;
+ }
+ if (a._info.isActive() != b._info.isActive()) { // 4) then a replica that is already active
+ return a._info.isActive();
+ }
+ // If all else is equal, this implies both A and B are on retired nodes, which is unlikely
+ // but possible. Fall back to the existing DB _entry order_, which is equal to an ideal
+ // state order where retired nodes are considered part of the ideal state (which is not the
+ // case for most ideal state operations). Since the DB entry order is in ideal state order,
+ // using this instead of node _index_ avoids affinities to lower indexes in such edge cases.
+ return a._db_entry_order < b._db_entry_order;
+ } else {
+ // TODO this inconsistent split case is equal to the legacy logic (aside from the tie-breaking),
+ // but is considered to be extremely unlikely in practice, so not worth optimizing for.
+ if ((a._info.getMetaCount() == 0) ^ (b._info.getMetaCount() == 0)) { // exactly one side has a zero meta count: that side goes first
+ return (a._info.getMetaCount() == 0);
+ }
+ if (a._bucket.getUsedBits() != b._bucket.getUsedBits()) { // otherwise the bucket with more used bits goes first
+ return (a._bucket.getUsedBits() > b._bucket.getUsedBits());
+ }
+ return a._db_entry_order < b._db_entry_order;
+ }
+ return false; // NOTE(review): unreachable, both branches of the if/else above always return
+ }
+};
+
+/**
* - Trusted copies should be preferred over non-trusted copies for the same bucket.
* - Buckets in ideal locations should be preferred over non-ideal locations for the
* same bucket across several nodes.
@@ -137,14 +184,14 @@ namespace {
* - Right after split/join, bucket is often not in ideal location, but should be
* preferred instead of source anyhow.
*/
-struct InstanceOrder {
- bool operator()(const BucketInstance& a, const BucketInstance& b) {
+struct LegacyInstanceOrder {
+ bool operator()(const BucketInstance& a, const BucketInstance& b) noexcept {
if (a._bucket == b._bucket) {
- // Trusted only makes sense within same bucket
- // Prefer trusted buckets over non-trusted ones.
+ // Trusted only makes sense within same bucket
+ // Prefer trusted buckets over non-trusted ones.
if (a._trusted != b._trusted) return a._trusted;
- if (a._idealLocationPriority != b._idealLocationPriority) {
- return a._idealLocationPriority < b._idealLocationPriority;
+ if (a._ideal_location_priority != b._ideal_location_priority) {
+ return a._ideal_location_priority < b._ideal_location_priority;
}
} else {
if ((a._info.getMetaCount() == 0) ^ (b._info.getMetaCount() == 0)) {
@@ -164,7 +211,11 @@ OperationTargetResolverImpl::getAllInstances(OperationType type, const document:
BucketInstanceList instances;
if (type == PUT) {
instances.populate(id, _distributor_bucket_space, _bucketDatabase);
- instances.sort(InstanceOrder());
+ if (_symmetric_replica_selection) {
+ instances.sort(ActiveReplicaSymmetricInstanceOrder());
+ } else {
+ instances.sort(LegacyInstanceOrder());
+ }
instances.removeNodeDuplicates();
instances.extendToEnoughCopies(_distributor_bucket_space, _bucketDatabase,
_bucketDatabase.getAppropriateBucket(_minUsedBucketBits, id), id);
diff --git a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h
index 9f367a89cba..6ab38928200 100644
--- a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h
+++ b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h
@@ -15,15 +15,17 @@ struct BucketInstance : public vespalib::AsciiPrintable {
document::BucketId _bucket;
api::BucketInfo _info;
lib::Node _node;
- uint16_t _idealLocationPriority;
- bool _trusted;
- bool _exist;
+ uint16_t _ideal_location_priority;
+ uint16_t _db_entry_order;
+ bool _trusted; // TODO remove
+ bool _exists;
BucketInstance() noexcept
- : _idealLocationPriority(0xffff), _trusted(false), _exist(false) {}
+ : _ideal_location_priority(0xffff), _db_entry_order(0xffff), _trusted(false), _exists(false)
+ {}
BucketInstance(const document::BucketId& id, const api::BucketInfo& info,
- lib::Node node, uint16_t idealLocationPriority, bool trusted,
- bool exist) noexcept;
+ lib::Node node, uint16_t ideal_location_priority,
+ uint16_t db_entry_order, bool trusted, bool exist) noexcept;
void print(vespalib::asciistream& out, const PrintProperties&) const override;
};
@@ -83,6 +85,7 @@ class OperationTargetResolverImpl : public OperationTargetResolver {
uint32_t _minUsedBucketBits;
uint16_t _redundancy;
document::BucketSpace _bucketSpace;
+ bool _symmetric_replica_selection;
public:
OperationTargetResolverImpl(const DistributorBucketSpace& distributor_bucket_space,
@@ -94,9 +97,14 @@ public:
_bucketDatabase(bucketDatabase),
_minUsedBucketBits(minUsedBucketBits),
_redundancy(redundancy),
- _bucketSpace(bucketSpace)
+ _bucketSpace(bucketSpace),
+ _symmetric_replica_selection(true)
{}
+ void use_symmetric_replica_selection(bool symmetry) noexcept {
+ _symmetric_replica_selection = symmetry; // Overrides the constructor default (true); selects the sort predicate getAllInstances() uses for PUT.
+ }
+
BucketInstanceList getAllInstances(OperationType type, const document::BucketId& id);
BucketInstanceList getInstances(OperationType type, const document::BucketId& id) {
BucketInstanceList result(getAllInstances(type, id));
diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp
index c3cb38bd7ac..582c69e943f 100644
--- a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp
@@ -20,8 +20,8 @@ MergeHandlerMetrics::MergeHandlerMetrics(metrics::MetricSet* owner)
"current node.", owner),
mergeAverageDataReceivedNeeded("mergeavgdatareceivedneeded", {}, "Amount of data transferred from previous node "
"in chain that we needed to apply locally.", owner),
- put_latency("put_latency", {}, "Latency of individual puts that are part of merge operations", owner),
- remove_latency("remove_latency", {}, "Latency of individual removes that are part of merge operations", owner)
+ merge_put_latency("merge_put_latency", {}, "Latency of individual puts that are part of merge operations", owner),
+ merge_remove_latency("merge_remove_latency", {}, "Latency of individual removes that are part of merge operations", owner)
{}
MergeHandlerMetrics::~MergeHandlerMetrics() = default;
diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h
index 44b85570357..a2d68011695 100644
--- a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h
+++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h
@@ -21,8 +21,8 @@ struct MergeHandlerMetrics {
metrics::DoubleAverageMetric mergeAverageDataReceivedNeeded;
// Individual operation metrics. These capture both count and latency sum, so
// no need for explicit count metric on the side.
- metrics::DoubleAverageMetric put_latency;
- metrics::DoubleAverageMetric remove_latency;
+ metrics::DoubleAverageMetric merge_put_latency;
+ metrics::DoubleAverageMetric merge_remove_latency;
// Iteration over metadata and document payload data is already covered by
// the merge[Meta]Data(Read|Write)Latency metrics, so not repeated here. Can be
// explicitly added if deemed required.
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 7ee2d9f37bf..b3207428f5f 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -4,13 +4,14 @@
#include "persistenceutil.h"
#include "apply_bucket_diff_entry_complete.h"
#include "apply_bucket_diff_state.h"
-#include <vespa/storage/persistence/filestorage/mergestatus.h>
-#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/persistence/spi/docentry.h>
-#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/document/fieldvalue/document.h>
+#include <vespa/persistence/spi/docentry.h>
+#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/storage/persistence/filestorage/mergestatus.h>
+#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/objects/nbostream.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <algorithm>
@@ -506,8 +507,18 @@ void
MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results,
const spi::Bucket& bucket,
const api::ApplyBucketDiffCommand::Entry& e,
- const document::DocumentTypeRepo& repo) const
+ const document::DocumentTypeRepo& repo,
+ const NewestDocumentVersionMapping& newest_per_doc) const
{
+ if (!e._docName.empty()) {
+ auto version_iter = newest_per_doc.find(e._docName);
+ assert(version_iter != newest_per_doc.end());
+ if (e._entry._timestamp != version_iter->second) {
+ LOG(spam, "ApplyBucketDiff(%s): skipping diff entry %s since it is subsumed by a newer timestamp %" PRIu64,
+ bucket.toString().c_str(), e.toString().c_str(), version_iter->second);
+ return;
+ }
+ }
auto throttle_token = _env._fileStorHandler.operation_throttler().blocking_acquire_one();
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
@@ -516,14 +527,14 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
document::DocumentId docId = doc->getId();
auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId),
std::move(throttle_token), "put",
- _clock, _env._metrics.merge_handler_metrics.put_latency);
+ _clock, _env._metrics.merge_handler_metrics.merge_put_latency);
_spi.putAsync(bucket, timestamp, std::move(doc), std::move(complete));
} else {
std::vector<spi::IdAndTimestamp> ids;
ids.emplace_back(document::DocumentId(e._docName), timestamp);
auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].id,
std::move(throttle_token), "remove",
- _clock, _env._metrics.merge_handler_metrics.remove_latency);
+ _clock, _env._metrics.merge_handler_metrics.merge_remove_latency);
_spi.removeAsync(bucket, std::move(ids), std::move(complete));
}
}
@@ -548,6 +559,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply
DocEntryList entries;
populateMetaData(bucket, Timestamp::max(), entries, context);
+ const auto newest_versions = enumerate_newest_document_versions(diff);
const document::DocumentTypeRepo & repo = _env.getDocumentTypeRepo();
uint32_t existingCount = entries.size();
@@ -580,7 +592,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply
++i;
LOG(spam, "ApplyBucketDiff(%s): Adding slot %s",
bucket.toString().c_str(), e.toString().c_str());
- applyDiffEntry(async_results, bucket, e, repo);
+ applyDiffEntry(async_results, bucket, e, repo, newest_versions);
} else {
assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp());
// Diffing for existing timestamp; should either both be put
@@ -591,7 +603,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply
if ((e._entry._flags & DELETED) && !existing.isRemove()) {
LOG(debug, "Slot in diff is remove for existing timestamp in %s. Diff slot: %s. Existing slot: %s",
bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str());
- applyDiffEntry(async_results, bucket, e, repo);
+ applyDiffEntry(async_results, bucket, e, repo, newest_versions);
} else {
// Duplicate put, just ignore it.
LOG(debug, "During diff apply, attempting to add slot whose timestamp already exists in %s, "
@@ -619,7 +631,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply
LOG(spam, "ApplyBucketDiff(%s): Adding slot %s",
bucket.toString().c_str(), e.toString().c_str());
- applyDiffEntry(async_results, bucket, e, repo);
+ applyDiffEntry(async_results, bucket, e, repo, newest_versions);
byteCount += e._headerBlob.size() + e._bodyBlob.size();
}
if (byteCount + notNeededByteCount != 0) {
@@ -631,6 +643,27 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply
bucket.toString().c_str(), addedCount);
}
+MergeHandler::NewestDocumentVersionMapping
+MergeHandler::enumerate_newest_document_versions(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
+{
+ NewestDocumentVersionMapping newest_per_doc; // doc name -> highest timestamp seen for that name in `diff`
+ for (const auto& e : diff) {
+ // We expect the doc name to always be filled out, both for remove operations and for puts.
+ // But since the latter is technically redundant (ID is also found within the document), we
+ // guard on this to be forwards compatible in case this changes (e.g. to populate and use
+ // the GID field instead). Fall back to the legacy behavior if so.
+ if (e._docName.empty()) {
+ continue; // Entry gets no mapping; applyDiffEntry() skips its newest-version check when the doc name is empty.
+ }
+ auto [existing_iter, inserted] = newest_per_doc.insert(std::make_pair(vespalib::stringref(e._docName), e._entry._timestamp)); // key is a stringref built from e._docName
+ if (!inserted) {
+ assert(existing_iter != newest_per_doc.end());
+ existing_iter->second = std::max(existing_iter->second, e._entry._timestamp); // keep the larger of the stored and current timestamps
+ }
+ }
+ return newest_per_doc;
+}
+
void
MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
{
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index f3bef802229..2be45e7bc8b 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -18,6 +18,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storage/common/cluster_context.h>
#include <vespa/storage/common/messagesender.h>
+#include <vespa/vespalib/stllike/hash_map.h>
#include <vespa/vespalib/util/monitored_refcount.h>
#include <vespa/storageframework/generic/clock/time.h>
@@ -42,6 +43,8 @@ private:
using MessageTrackerUP = std::unique_ptr<MessageTracker>;
using Timestamp = framework::MicroSecTime;
public:
+ using NewestDocumentVersionMapping = vespalib::hash_map<vespalib::stringref, api::Timestamp>;
+
enum StateFlag {
IN_USE = 0x01,
DELETED = 0x02,
@@ -72,6 +75,17 @@ public:
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
void drain_async_writes();
+ /**
+ * Returns a mapping that, for each document ID, contains the newest version of that document that
+ * is present in the diff.
+ *
+ * The returned hash_map keys point directly into the `ApplyBucketDiffCommand::Entry::_docName` memory
+ * owned by `diff`, so this memory must remain unchanged and stable for the duration of the returned
+ * mapping's lifetime.
+ */
+ static NewestDocumentVersionMapping enumerate_newest_document_versions(
+ const std::vector<api::ApplyBucketDiffCommand::Entry>& diff);
+
private:
using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>;
const framework::Clock &_clock;
@@ -90,9 +104,13 @@ private:
/**
* Invoke either put, remove or unrevertable remove on the SPI
* depending on the flags in the diff entry.
+ *
+ * If `newest_per_doc` indicates that the entry is not the newest version present in the
+ * diff, the entry is silently ignored and is _not_ invoked on the SPI.
*/
void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, const spi::Bucket&,
- const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo) const;
+ const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo,
+ const NewestDocumentVersionMapping& newest_per_doc) const;
/**
* Fill entries-vector with metadata for bucket up to maxTimestamp,
diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
index ca46e87285b..5b8052a05f8 100644
--- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
+++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
@@ -54,6 +54,9 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg)
auto to = std::make_unique<api::UpdateCommand>(bucket, from.stealDocumentUpdate(), from.getNewTimestamp());
to->setOldTimestamp(from.getOldTimestamp());
to->setCondition(from.getCondition());
+ if (from.has_cached_create_if_missing()) {
+ to->set_cached_create_if_missing(from.create_if_missing());
+ }
toMsg = std::move(to);
break;
}
@@ -217,6 +220,9 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg)
to->setOldTimestamp(from.getOldTimestamp());
to->setNewTimestamp(from.getTimestamp());
to->setCondition(from.getCondition());
+ if (from.has_cached_create_if_missing()) {
+ to->set_cached_create_if_missing(from.create_if_missing());
+ }
toMsg = std::move(to);
break;
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp
index 4d74eb1974b..3b53c0c9584 100644
--- a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp
@@ -124,7 +124,8 @@ void ClusterControllerApiRpcService::RPC_setSystemState2(FRT_RPCRequest* req) {
req->GetParams()->GetValue(0)._string._len);
lib::ClusterState systemState(systemStateStr);
- auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterStateBundle(systemState));
+ auto bundle = std::make_shared<const lib::ClusterStateBundle>(systemState);
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(std::move(bundle));
cmd->setPriority(api::StorageMessage::VERYHIGH);
detach_and_forward_to_enqueuer(std::move(cmd), req);
@@ -167,8 +168,7 @@ void ClusterControllerApiRpcService::RPC_setDistributionStates(FRT_RPCRequest* r
}
LOG(debug, "Got state bundle %s", state_bundle->toString().c_str());
- // TODO add constructor taking in shared_ptr directly instead?
- auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle);
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(std::move(state_bundle));
cmd->setPriority(api::StorageMessage::VERYHIGH);
detach_and_forward_to_enqueuer(std::move(cmd), req);
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index f7a426a0527..35b70dd853c 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -140,7 +140,7 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter)
// Initializing state manager early, as others use it init time to
// update node state according min used bits etc.
// Needs node type to be set right away. Needs thread pool, index and
- // dead lock detector too, but not before open()
+ // deadlock detector too, but not before open()
_stateManager = std::make_unique<StateManager>(
_context.getComponentRegister(),
std::move(_hostInfo),
@@ -148,10 +148,10 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter)
_singleThreadedDebugMode);
_context.getComponentRegister().setNodeStateUpdater(*_stateManager);
- // Create VDS root folder, in case it doesn't already exist.
- // Maybe better to rather fail if it doesn't exist, but tests
- // might break if we do that. Might alter later.
- std::filesystem::create_directories(std::filesystem::path(_rootFolder));
+ // Create storage root folder, in case it doesn't already exist.
+ if (!_rootFolder.empty()) {
+ std::filesystem::create_directories(std::filesystem::path(_rootFolder));
+ } // else: running as part of unit tests
initializeNodeSpecific();
@@ -192,13 +192,16 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter)
initializeStatusWebServer();
+ if (server_config().writePidFileOnStartup) {
+ assert(!_rootFolder.empty());
// Write pid file as the last thing we do. If we fail initialization
// due to an exception we won't run shutdown. Thus we won't remove the
// pid file if something throws after writing it in initialization.
// Initialize _pidfile here, such that we can know that we didn't create
// it in shutdown code for shutdown during init.
- _pidFile = _rootFolder + "/pidfile";
- writePidFile(_pidFile);
+ _pidFile = _rootFolder + "/pidfile";
+ writePidFile(_pidFile);
+ }
}
void
diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
index 55d516a017b..403752b0c84 100644
--- a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
+++ b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
@@ -31,11 +31,18 @@ message Update {
}
message UpdateRequest {
- Bucket bucket = 1;
- Update update = 2;
- uint64 new_timestamp = 3;
- uint64 expected_old_timestamp = 4; // If zero; no expectation.
- TestAndSetCondition condition = 5;
+ enum CreateIfMissing {
+ UNSPECIFIED = 0; // Legacy fallback: must deserialize `update` to find flag value
+ TRUE = 1;
+ FALSE = 2;
+ }
+
+ Bucket bucket = 1;
+ Update update = 2;
+ uint64 new_timestamp = 3;
+ uint64 expected_old_timestamp = 4; // If zero; no expectation.
+ TestAndSetCondition condition = 5;
+ CreateIfMissing create_if_missing = 6;
}
message UpdateResponse {
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index 57047be6037..0f4a34cc775 100644
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -465,6 +465,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg)
if (msg.getCondition().isPresent()) {
set_tas_condition(*req.mutable_condition(), msg.getCondition());
}
+ if (msg.has_cached_create_if_missing()) {
+ req.set_create_if_missing(msg.create_if_missing() ? protobuf::UpdateRequest_CreateIfMissing_TRUE
+ : protobuf::UpdateRequest_CreateIfMissing_FALSE);
+ }
});
}
@@ -482,6 +486,9 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf)
if (req.has_condition()) {
cmd->setCondition(get_tas_condition(req.condition()));
}
+ if (req.create_if_missing() != protobuf::UpdateRequest_CreateIfMissing_UNSPECIFIED) {
+ cmd->set_cached_create_if_missing(req.create_if_missing() == protobuf::UpdateRequest_CreateIfMissing_TRUE);
+ }
return cmd;
});
}
diff --git a/storage/src/vespa/storageapi/message/persistence.cpp b/storage/src/vespa/storageapi/message/persistence.cpp
index 4c24bb74faf..af054855bbe 100644
--- a/storage/src/vespa/storageapi/message/persistence.cpp
+++ b/storage/src/vespa/storageapi/message/persistence.cpp
@@ -105,13 +105,23 @@ UpdateCommand::UpdateCommand(const document::Bucket &bucket, const document::Doc
: TestAndSetCommand(MessageType::UPDATE, bucket),
_update(update),
_timestamp(time),
- _oldTimestamp(0)
+ _oldTimestamp(0),
+ _create_if_missing()
{
if ( ! _update) {
throw vespalib::IllegalArgumentException("Cannot update a null update", VESPA_STRLOC);
}
}
+bool
+UpdateCommand::create_if_missing() const
+{
+ if (_create_if_missing.has_value()) {
+ return *_create_if_missing;
+ }
+ return _update->getCreateIfNonExistent();
+}
+
const document::DocumentType *
UpdateCommand::getDocumentType() const {
return &_update->getType();
diff --git a/storage/src/vespa/storageapi/message/persistence.h b/storage/src/vespa/storageapi/message/persistence.h
index f44ab4e8280..0676e1d0f44 100644
--- a/storage/src/vespa/storageapi/message/persistence.h
+++ b/storage/src/vespa/storageapi/message/persistence.h
@@ -1,8 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @file persistence.h
- *
- * Persistence related commands, like put, get & remove
+ * Persistence related commands, like put, get & remove
*/
#pragma once
@@ -10,6 +8,7 @@
#include <vespa/storageapi/defs.h>
#include <vespa/document/base/documentid.h>
#include <vespa/documentapi/messagebus/messages/testandsetcondition.h>
+#include <optional>
namespace document {
class DocumentUpdate;
@@ -117,20 +116,32 @@ class UpdateCommand : public TestAndSetCommand {
std::shared_ptr<document::DocumentUpdate> _update;
Timestamp _timestamp;
Timestamp _oldTimestamp;
+ std::optional<bool> _create_if_missing; // caches the value held (possibly lazily deserialized) in _update
public:
UpdateCommand(const document::Bucket &bucket,
const std::shared_ptr<document::DocumentUpdate>&, Timestamp);
~UpdateCommand() override;
- void setTimestamp(Timestamp ts) { _timestamp = ts; }
- void setOldTimestamp(Timestamp ts) { _oldTimestamp = ts; }
+ void setTimestamp(Timestamp ts) noexcept { _timestamp = ts; }
+ void setOldTimestamp(Timestamp ts) noexcept { _oldTimestamp = ts; }
+
+ [[nodiscard]] bool has_cached_create_if_missing() const noexcept {
+ return _create_if_missing.has_value();
+ }
+ // It is the caller's responsibility to ensure this value matches that of _update->getCreateIfNonExistent()
+ void set_cached_create_if_missing(bool create) noexcept {
+ _create_if_missing = create;
+ }
const std::shared_ptr<document::DocumentUpdate>& getUpdate() const { return _update; }
const document::DocumentId& getDocumentId() const override;
Timestamp getTimestamp() const { return _timestamp; }
Timestamp getOldTimestamp() const { return _oldTimestamp; }
+ // May throw iff has_cached_create_if_missing() == false, otherwise noexcept.
+ [[nodiscard]] bool create_if_missing() const;
+
const document::DocumentType * getDocumentType() const override;
vespalib::string getSummary() const override;
diff --git a/storage/src/vespa/storageapi/message/state.cpp b/storage/src/vespa/storageapi/message/state.cpp
index 5a50167f584..b4e8655d783 100644
--- a/storage/src/vespa/storageapi/message/state.cpp
+++ b/storage/src/vespa/storageapi/message/state.cpp
@@ -5,8 +5,7 @@
#include <vespa/vdslib/state/clusterstate.h>
#include <ostream>
-namespace storage {
-namespace api {
+namespace storage::api {
IMPLEMENT_COMMAND(GetNodeStateCommand, GetNodeStateReply)
IMPLEMENT_REPLY(GetNodeStateReply)
@@ -45,7 +44,7 @@ GetNodeStateReply::GetNodeStateReply(const GetNodeStateCommand& cmd)
GetNodeStateReply::GetNodeStateReply(const GetNodeStateCommand& cmd,
const lib::NodeState& state)
: StorageReply(cmd),
- _state(new lib::NodeState(state))
+ _state(std::make_unique<lib::NodeState>(state))
{
}
@@ -64,23 +63,31 @@ GetNodeStateReply::print(std::ostream& out, bool verbose,
}
}
+SetSystemStateCommand::SetSystemStateCommand(std::shared_ptr<const lib::ClusterStateBundle> state)
+ : StorageCommand(MessageType::SETSYSTEMSTATE),
+ _state(std::move(state))
+{
+}
+
SetSystemStateCommand::SetSystemStateCommand(const lib::ClusterStateBundle& state)
: StorageCommand(MessageType::SETSYSTEMSTATE),
- _state(state)
+ _state(std::make_shared<const lib::ClusterStateBundle>(state))
{
}
SetSystemStateCommand::SetSystemStateCommand(const lib::ClusterState& state)
: StorageCommand(MessageType::SETSYSTEMSTATE),
- _state(state)
+ _state(std::make_shared<const lib::ClusterStateBundle>(state))
{
}
+SetSystemStateCommand::~SetSystemStateCommand() = default;
+
void
SetSystemStateCommand::print(std::ostream& out, bool verbose,
const std::string& indent) const
{
- out << "SetSystemStateCommand(" << *_state.getBaselineClusterState() << ")";
+ out << "SetSystemStateCommand(" << *_state->getBaselineClusterState() << ")";
if (verbose) {
out << " : ";
StorageCommand::print(out, verbose, indent);
@@ -89,7 +96,7 @@ SetSystemStateCommand::print(std::ostream& out, bool verbose,
SetSystemStateReply::SetSystemStateReply(const SetSystemStateCommand& cmd)
: StorageReply(cmd),
- _state(cmd.getClusterStateBundle())
+ _state(cmd.cluster_state_bundle_ptr())
{
}
@@ -138,5 +145,4 @@ void ActivateClusterStateVersionReply::print(std::ostream& out, bool verbose,
}
}
-} // api
-} // storage
+} // storage::api
diff --git a/storage/src/vespa/storageapi/message/state.h b/storage/src/vespa/storageapi/message/state.h
index 900355b12a2..4aa4c8a8f31 100644
--- a/storage/src/vespa/storageapi/message/state.h
+++ b/storage/src/vespa/storageapi/message/state.h
@@ -61,13 +61,19 @@ public:
* put/get/remove etx)
*/
class SetSystemStateCommand : public StorageCommand {
- lib::ClusterStateBundle _state;
+ std::shared_ptr<const lib::ClusterStateBundle> _state;
public:
+ explicit SetSystemStateCommand(std::shared_ptr<const lib::ClusterStateBundle> state);
explicit SetSystemStateCommand(const lib::ClusterStateBundle &state);
explicit SetSystemStateCommand(const lib::ClusterState &state);
- const lib::ClusterState& getSystemState() const { return *_state.getBaselineClusterState(); }
- const lib::ClusterStateBundle& getClusterStateBundle() const { return _state; }
+ ~SetSystemStateCommand() override;
+
+ [[nodiscard]] const lib::ClusterState& getSystemState() const { return *_state->getBaselineClusterState(); }
+ [[nodiscard]] const lib::ClusterStateBundle& getClusterStateBundle() const { return *_state; }
+ [[nodiscard]] std::shared_ptr<const lib::ClusterStateBundle> cluster_state_bundle_ptr() const noexcept {
+ return _state;
+ }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGECOMMAND(SetSystemStateCommand, onSetSystemState)
@@ -80,14 +86,14 @@ public:
* @brief Reply received after a SetSystemStateCommand.
*/
class SetSystemStateReply : public StorageReply {
- lib::ClusterStateBundle _state;
+ std::shared_ptr<const lib::ClusterStateBundle> _state;
public:
explicit SetSystemStateReply(const SetSystemStateCommand& cmd);
// Not serialized. Available locally
- const lib::ClusterState& getSystemState() const { return *_state.getBaselineClusterState(); }
- const lib::ClusterStateBundle& getClusterStateBundle() const { return _state; }
+ const lib::ClusterState& getSystemState() const { return *_state->getBaselineClusterState(); }
+ const lib::ClusterStateBundle& getClusterStateBundle() const { return *_state; }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGEREPLY(SetSystemStateReply, onSetSystemStateReply)