diff options
Diffstat (limited to 'storage/src/vespa/storage')
17 files changed, 194 insertions, 50 deletions
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index 6957e541d6b..cfbc4caf82d 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -46,6 +46,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _use_weak_internal_read_consistency_for_client_gets(false), _enable_metadata_only_fetch_phase_for_inconsistent_updates(true), _enable_operation_cancellation(false), + _symmetric_put_and_activate_replica_selection(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -150,6 +151,7 @@ DistributorConfiguration::configure(const DistributorManagerConfig & config) _max_activation_inhibited_out_of_sync_groups = config.maxActivationInhibitedOutOfSyncGroups; _enable_operation_cancellation = config.enableOperationCancellation; _minimumReplicaCountingMode = deriveReplicaCountingMode(config.minimumReplicaCountingMode); + _symmetric_put_and_activate_replica_selection = config.symmetricPutAndActivateReplicaSelection; if (config.maxClusterClockSkewSec >= 0) { _maxClusterClockSkew = std::chrono::seconds(config.maxClusterClockSkewSec); diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 38fac13150c..2b73fdc0fa1 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -234,8 +234,11 @@ public: [[nodiscard]] bool enable_operation_cancellation() const noexcept { return _enable_operation_cancellation; } + [[nodiscard]] bool symmetric_put_and_activate_replica_selection() const noexcept { + return _symmetric_put_and_activate_replica_selection; + } - bool containsTimeStatement(const std::string& documentSelection) const; + [[nodiscard]] bool containsTimeStatement(const std::string& documentSelection) const; private: StorageComponent& _component; @@ -276,6 +279,7 @@ private: bool _use_weak_internal_read_consistency_for_client_gets; bool _enable_metadata_only_fetch_phase_for_inconsistent_updates; //TODO Rewrite tests and GC bool _enable_operation_cancellation; + bool _symmetric_put_and_activate_replica_selection; ReplicaCountingMode _minimumReplicaCountingMode; }; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 3f6028d7fa1..a4d4461ba68 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -159,6 +159,13 @@ num_distributor_stripes int default=0 restart ## requests partially or fully "invalidated" by such a change. enable_operation_cancellation bool default=false +## Iff true there will be an 1-1 symmetry between the replicas chosen as feed targets +## for Put operations and the replica selection logic for bucket activation. In particular, +## the most preferred replica for feed will be the most preferred bucket for activation. +## This helps ensure that new versions of documents are routed to replicas that are most +## likely to reflect these changes as part of visible search results. +symmetric_put_and_activate_replica_selection bool default=false + ## TODO GC very soon, it has no effect. priority_merge_out_of_sync_copies int default=120 diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index 8cd204bcf9f..44e4b14eafc 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -114,3 +114,7 @@ simulated_bucket_request_latency_msec int default=0 ## a disjoint subset of the node's buckets, in order to reduce locking contention. ## Max value is unspecified, but will be clamped internally. content_node_bucket_db_stripe_bits int default=4 restart + +## Iff set, a special `pidfile` file is written under the node's root directory upon +## startup containing the PID of the running process. +write_pid_file_on_startup bool default=true diff --git a/storage/src/vespa/storage/distributor/activecopy.cpp b/storage/src/vespa/storage/distributor/activecopy.cpp index 35070bcee3b..b823978a0cc 100644 --- a/storage/src/vespa/storage/distributor/activecopy.cpp +++ b/storage/src/vespa/storage/distributor/activecopy.cpp @@ -108,6 +108,7 @@ buildNodeList(const BucketDatabase::Entry& e,vespalib::ConstArrayRef<uint16_t> n struct ActiveStateOrder { bool operator()(const ActiveCopy & e1, const ActiveCopy & e2) noexcept { + // Replica selection order should be kept in sync with OperationTargetResolverImpl's InstanceOrder. if (e1._ready != e2._ready) { return e1._ready; } @@ -120,7 +121,9 @@ struct ActiveStateOrder { if (e1._active != e2._active) { return e1._active; } - return e1.nodeIndex() < e2.nodeIndex(); + // Use _entry_ order instead of node index, as it is in ideal state order (even for retired + // nodes), which avoids unintentional affinities towards lower node indexes. + return e1.entryIndex() < e2.entryIndex(); } }; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 54087850e1b..a92896279b0 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -180,6 +180,8 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen _op_ctx.distributor_config().getMinimalBucketSplit(), _bucket_space.getDistribution().getRedundancy(), _msg->getBucket().getBucketSpace()); + targetResolver.use_symmetric_replica_selection( + _op_ctx.distributor_config().symmetric_put_and_activate_replica_selection()); OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, _doc_id_bucket_id)); for (const auto& target : targets) { diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 84e9ab71bcb..849746416d6 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -668,7 +668,7 @@ TwoPhaseUpdateOperation::applyUpdateToDocument(document::Document& doc) const bool TwoPhaseUpdateOperation::shouldCreateIfNonExistent() const { - return _updateCmd->getUpdate()->getCreateIfNonExistent(); + return _updateCmd->create_if_missing(); } bool diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 7b6833cc299..2b47d53363f 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -29,7 +29,7 @@ UpdateOperation::UpdateOperation(const DistributorNodeContext& node_ctx, _msg(msg), _entries(std::move(entries)), _new_timestamp(_msg->getTimestamp()), - _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()), + _is_auto_create_update(_msg->create_if_missing()), _node_ctx(node_ctx), _op_ctx(op_ctx), _bucketSpace(bucketSpace), @@ -112,6 +112,9 @@ UpdateOperation::onStart(DistributorStripeMessageSender& sender) copyMessageSettings(*_msg, *command); command->setOldTimestamp(_msg->getOldTimestamp()); command->setCondition(_msg->getCondition()); + if (_msg->has_cached_create_if_missing()) { + command->set_cached_create_if_missing(_msg->create_if_missing()); + } messages.emplace_back(std::move(command), node); } diff --git a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp index 394c13c2bad..618cfb56359 100644 --- a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp +++ b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp @@ -10,9 +10,12 @@ namespace storage::distributor { BucketInstance::BucketInstance(const document::BucketId& id, const api::BucketInfo& info, lib::Node node, - uint16_t idealLocationPriority, bool trusted, bool exist) noexcept + uint16_t ideal_location_priority, uint16_t db_entry_order, + bool trusted, bool exist) noexcept : _bucket(id), _info(info), _node(node), - _idealLocationPriority(idealLocationPriority), _trusted(trusted), _exist(exist) + _ideal_location_priority(ideal_location_priority), + _db_entry_order(db_entry_order), + _trusted(trusted), _exists(exist) { } @@ -24,8 +27,8 @@ BucketInstance::print(vespalib::asciistream& out, const PrintProperties&) const std::ostringstream ost; ost << std::hex << _bucket.getId(); - out << "(" << ost.str() << ", " << infoString << ", node " << _node.getIndex() << ", ideal " << _idealLocationPriority - << (_trusted ? ", trusted" : "") << (_exist ? "" : ", new copy") << ")"; + out << "(" << ost.str() << ", " << infoString << ", node " << _node.getIndex() << ", ideal " << _ideal_location_priority + << (_trusted ? ", trusted" : "") << (_exists ? "" : ", new copy") << ")"; } bool @@ -42,7 +45,7 @@ BucketInstanceList::add(const BucketDatabase::Entry& e, const IdealServiceLayerN for (uint32_t i = 0; i < e.getBucketInfo().getNodeCount(); ++i) { const BucketCopy& copy(e.getBucketInfo().getNodeRef(i)); lib::Node node(lib::NodeType::STORAGE, copy.getNode()); - _instances.emplace_back(e.getBucketId(), copy.getBucketInfo(), node, idealState.lookup(copy.getNode()), copy.trusted(), true); + _instances.emplace_back(e.getBucketId(), copy.getBucketInfo(), node, idealState.lookup(copy.getNode()), i, copy.trusted(), true); } } @@ -106,7 +109,8 @@ BucketInstanceList::extendToEnoughCopies(const DistributorBucketSpace& distribut for (uint32_t i=0; i<idealNodes.size(); ++i) { lib::Node node(lib::NodeType::STORAGE, idealNodes[i]); if (!contains(node)) { - _instances.emplace_back(newTarget, api::BucketInfo(), node, i, false, false); + // We don't sort `_instances` after extending, so just reuse `i` as dummy DB entry order. + _instances.emplace_back(newTarget, api::BucketInfo(), node, i, i, false, false); } } } @@ -116,7 +120,7 @@ BucketInstanceList::createTargets(document::BucketSpace bucketSpace) { OperationTargetList result; for (const auto& bi : _instances) { - result.emplace_back(document::Bucket(bucketSpace, bi._bucket), bi._node, !bi._exist); + result.emplace_back(document::Bucket(bucketSpace, bi._bucket), bi._node, !bi._exists); } return result; } @@ -129,6 +133,49 @@ BucketInstanceList::print(vespalib::asciistream& out, const PrintProperties& p) namespace { /** + * To maintain a symmetry between which replicas receive Puts and which versions are + * preferred for activation, use an identical ordering predicate for both (for the case + * where replicas are for the same concrete bucket). + * + * Must only be used with BucketInstances that have a distinct _db_entry_order set per instance. + */ +struct ActiveReplicaSymmetricInstanceOrder { + bool operator()(const BucketInstance& a, const BucketInstance& b) noexcept { + if (a._bucket == b._bucket) { + if (a._info.isReady() != b._info.isReady()) { + return a._info.isReady(); + } + if (a._info.getDocumentCount() != b._info.getDocumentCount()) { + return a._info.getDocumentCount() > b._info.getDocumentCount(); + } + if (a._ideal_location_priority != b._ideal_location_priority) { + return a._ideal_location_priority < b._ideal_location_priority; + } + if (a._info.isActive() != b._info.isActive()) { + return a._info.isActive(); + } + // If all else is equal, this implies both A and B are on retired nodes, which is unlikely + // but possible. Fall back to the existing DB _entry order_, which is equal to an ideal + // state order where retired nodes are considered part of the ideal state (which is not the + // case for most ideal state operations). Since the DB entry order is in ideal state order, + // using this instead of node _index_ avoids affinities to lower indexes in such edge cases. + return a._db_entry_order < b._db_entry_order; + } else { + // TODO this inconsistent split case is equal to the legacy logic (aside from the tie-breaking), + // but is considered to be extremely unlikely in practice, so not worth optimizing for. + if ((a._info.getMetaCount() == 0) ^ (b._info.getMetaCount() == 0)) { + return (a._info.getMetaCount() == 0); + } + if (a._bucket.getUsedBits() != b._bucket.getUsedBits()) { + return (a._bucket.getUsedBits() > b._bucket.getUsedBits()); + } + return a._db_entry_order < b._db_entry_order; + } + return false; + } +}; + +/** * - Trusted copies should be preferred over non-trusted copies for the same bucket. * - Buckets in ideal locations should be preferred over non-ideal locations for the * same bucket across several nodes. @@ -137,14 +184,14 @@ namespace { * - Right after split/join, bucket is often not in ideal location, but should be * preferred instead of source anyhow. */ -struct InstanceOrder { - bool operator()(const BucketInstance& a, const BucketInstance& b) { +struct LegacyInstanceOrder { + bool operator()(const BucketInstance& a, const BucketInstance& b) noexcept { if (a._bucket == b._bucket) { - // Trusted only makes sense within same bucket - // Prefer trusted buckets over non-trusted ones. + // Trusted only makes sense within same bucket + // Prefer trusted buckets over non-trusted ones. if (a._trusted != b._trusted) return a._trusted; - if (a._idealLocationPriority != b._idealLocationPriority) { - return a._idealLocationPriority < b._idealLocationPriority; + if (a._ideal_location_priority != b._ideal_location_priority) { + return a._ideal_location_priority < b._ideal_location_priority; } } else { if ((a._info.getMetaCount() == 0) ^ (b._info.getMetaCount() == 0)) { @@ -164,7 +211,11 @@ OperationTargetResolverImpl::getAllInstances(OperationType type, const document: BucketInstanceList instances; if (type == PUT) { instances.populate(id, _distributor_bucket_space, _bucketDatabase); - instances.sort(InstanceOrder()); + if (_symmetric_replica_selection) { + instances.sort(ActiveReplicaSymmetricInstanceOrder()); + } else { + instances.sort(LegacyInstanceOrder()); + } instances.removeNodeDuplicates(); instances.extendToEnoughCopies(_distributor_bucket_space, _bucketDatabase, _bucketDatabase.getAppropriateBucket(_minUsedBucketBits, id), id); diff --git a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h index 9f367a89cba..6ab38928200 100644 --- a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h +++ b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h @@ -15,15 +15,17 @@ struct BucketInstance : public vespalib::AsciiPrintable { document::BucketId _bucket; api::BucketInfo _info; lib::Node _node; - uint16_t _idealLocationPriority; - bool _trusted; - bool _exist; + uint16_t _ideal_location_priority; + uint16_t _db_entry_order; + bool _trusted; // TODO remove + bool _exists; BucketInstance() noexcept - : _idealLocationPriority(0xffff), _trusted(false), _exist(false) {} + : _ideal_location_priority(0xffff), _db_entry_order(0xffff), _trusted(false), _exists(false) + {} BucketInstance(const document::BucketId& id, const api::BucketInfo& info, - lib::Node node, uint16_t idealLocationPriority, bool trusted, - bool exist) noexcept; + lib::Node node, uint16_t ideal_location_priority, + uint16_t db_entry_order, bool trusted, bool exist) noexcept; void print(vespalib::asciistream& out, const PrintProperties&) const override; }; @@ -83,6 +85,7 @@ class OperationTargetResolverImpl : public OperationTargetResolver { uint32_t _minUsedBucketBits; uint16_t _redundancy; document::BucketSpace _bucketSpace; + bool _symmetric_replica_selection; public: OperationTargetResolverImpl(const DistributorBucketSpace& distributor_bucket_space, @@ -94,9 +97,14 @@ public: _bucketDatabase(bucketDatabase), _minUsedBucketBits(minUsedBucketBits), _redundancy(redundancy), - _bucketSpace(bucketSpace) + _bucketSpace(bucketSpace), + _symmetric_replica_selection(true) {} + void use_symmetric_replica_selection(bool symmetry) noexcept { + _symmetric_replica_selection = symmetry; + } + BucketInstanceList getAllInstances(OperationType type, const document::BucketId& id); BucketInstanceList getInstances(OperationType type, const document::BucketId& id) { BucketInstanceList result(getAllInstances(type, id)); diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp index c3cb38bd7ac..582c69e943f 100644 --- a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp @@ -20,8 +20,8 @@ MergeHandlerMetrics::MergeHandlerMetrics(metrics::MetricSet* owner) "current node.", owner), mergeAverageDataReceivedNeeded("mergeavgdatareceivedneeded", {}, "Amount of data transferred from previous node " "in chain that we needed to apply locally.", owner), - put_latency("put_latency", {}, "Latency of individual puts that are part of merge operations", owner), - remove_latency("remove_latency", {}, "Latency of individual removes that are part of merge operations", owner) + merge_put_latency("merge_put_latency", {}, "Latency of individual puts that are part of merge operations", owner), + merge_remove_latency("merge_remove_latency", {}, "Latency of individual removes that are part of merge operations", owner) {} MergeHandlerMetrics::~MergeHandlerMetrics() = default; diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h index 44b85570357..a2d68011695 100644 --- a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h +++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h @@ -21,8 +21,8 @@ struct MergeHandlerMetrics { metrics::DoubleAverageMetric mergeAverageDataReceivedNeeded; // Individual operation metrics. These capture both count and latency sum, so // no need for explicit count metric on the side. - metrics::DoubleAverageMetric put_latency; - metrics::DoubleAverageMetric remove_latency; + metrics::DoubleAverageMetric merge_put_latency; + metrics::DoubleAverageMetric merge_remove_latency; // Iteration over metadata and document payload data is already covered by // the merge[Meta]Data(Read|Write)Latency metrics, so not repeated here. Can be // explicitly added if deemed required. diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7ee2d9f37bf..b3207428f5f 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -4,13 +4,14 @@ #include "persistenceutil.h" #include "apply_bucket_diff_entry_complete.h" #include "apply_bucket_diff_state.h" -#include <vespa/storage/persistence/filestorage/mergestatus.h> -#include <vespa/persistence/spi/persistenceprovider.h> -#include <vespa/persistence/spi/docentry.h> -#include <vespa/vdslib/distribution/distribution.h> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/document/fieldvalue/document.h> +#include <vespa/persistence/spi/docentry.h> +#include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/storage/persistence/filestorage/mergestatus.h> +#include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/objects/nbostream.h> +#include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <algorithm> @@ -506,8 +507,18 @@ void MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, const spi::Bucket& bucket, const api::ApplyBucketDiffCommand::Entry& e, - const document::DocumentTypeRepo& repo) const + const document::DocumentTypeRepo& repo, + const NewestDocumentVersionMapping& newest_per_doc) const { + if (!e._docName.empty()) { + auto version_iter = newest_per_doc.find(e._docName); + assert(version_iter != newest_per_doc.end()); + if (e._entry._timestamp != version_iter->second) { + LOG(spam, "ApplyBucketDiff(%s): skipping diff entry %s since it is subsumed by a newer timestamp %" PRIu64, + bucket.toString().c_str(), e.toString().c_str(), version_iter->second); + return; + } + } auto throttle_token = _env._fileStorHandler.operation_throttler().blocking_acquire_one(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { @@ -516,14 +527,14 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results document::DocumentId docId = doc->getId(); auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), std::move(throttle_token), "put", - _clock, _env._metrics.merge_handler_metrics.put_latency); + _clock, _env._metrics.merge_handler_metrics.merge_put_latency); _spi.putAsync(bucket, timestamp, std::move(doc), std::move(complete)); } else { std::vector<spi::IdAndTimestamp> ids; ids.emplace_back(document::DocumentId(e._docName), timestamp); auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].id, std::move(throttle_token), "remove", - _clock, _env._metrics.merge_handler_metrics.remove_latency); + _clock, _env._metrics.merge_handler_metrics.merge_remove_latency); _spi.removeAsync(bucket, std::move(ids), std::move(complete)); } } @@ -548,6 +559,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply DocEntryList entries; populateMetaData(bucket, Timestamp::max(), entries, context); + const auto newest_versions = enumerate_newest_document_versions(diff); const document::DocumentTypeRepo & repo = _env.getDocumentTypeRepo(); uint32_t existingCount = entries.size(); @@ -580,7 +592,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply ++i; LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - applyDiffEntry(async_results, bucket, e, repo); + applyDiffEntry(async_results, bucket, e, repo, newest_versions); } else { assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp()); // Diffing for existing timestamp; should either both be put @@ -591,7 +603,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply if ((e._entry._flags & DELETED) && !existing.isRemove()) { LOG(debug, "Slot in diff is remove for existing timestamp in %s. Diff slot: %s. Existing slot: %s", bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); - applyDiffEntry(async_results, bucket, e, repo); + applyDiffEntry(async_results, bucket, e, repo, newest_versions); } else { // Duplicate put, just ignore it. LOG(debug, "During diff apply, attempting to add slot whose timestamp already exists in %s, " @@ -619,7 +631,7 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - applyDiffEntry(async_results, bucket, e, repo); + applyDiffEntry(async_results, bucket, e, repo, newest_versions); byteCount += e._headerBlob.size() + e._bodyBlob.size(); } if (byteCount + notNeededByteCount != 0) { @@ -631,6 +643,27 @@ MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::Apply bucket.toString().c_str(), addedCount); } +MergeHandler::NewestDocumentVersionMapping +MergeHandler::enumerate_newest_document_versions(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff) +{ + NewestDocumentVersionMapping newest_per_doc; + for (const auto& e : diff) { + // We expect the doc name to always be filled out, both for remove operations and for puts. + // But since the latter is technically redundant (ID is also found within the document), we + // guard on this to be forwards compatible in case this changes (e.g. to populate and use + // the GID field instead). Fallback to the legacy behavior if so. + if (e._docName.empty()) { + continue; + } + auto [existing_iter, inserted] = newest_per_doc.insert(std::make_pair(vespalib::stringref(e._docName), e._entry._timestamp)); + if (!inserted) { + assert(existing_iter != newest_per_doc.end()); + existing_iter->second = std::max(existing_iter->second, e._entry._timestamp); + } + } + return newest_per_doc; +} + void MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const { diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index f3bef802229..2be45e7bc8b 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -18,6 +18,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storage/common/cluster_context.h> #include <vespa/storage/common/messagesender.h> +#include <vespa/vespalib/stllike/hash_map.h> #include <vespa/vespalib/util/monitored_refcount.h> #include <vespa/storageframework/generic/clock/time.h> @@ -42,6 +43,8 @@ private: using MessageTrackerUP = std::unique_ptr<MessageTracker>; using Timestamp = framework::MicroSecTime; public: + using NewestDocumentVersionMapping = vespalib::hash_map<vespalib::stringref, api::Timestamp>; + enum StateFlag { IN_USE = 0x01, DELETED = 0x02, @@ -72,6 +75,17 @@ public: void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; void drain_async_writes(); + /** + * Returns a mapping that, for each document ID, contains the newest version of that document that + * is present in the diff. + * + * The returned hash_map keys point directly into the `ApplyBucketDiffCommand::Entry::_docName` memory + * owned by `diff`, so this memory must remain unchanged and stable for the duration of the returned + * mapping's lifetime. + */ + static NewestDocumentVersionMapping enumerate_newest_document_versions( + const std::vector<api::ApplyBucketDiffCommand::Entry>& diff); + private: using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>; const framework::Clock &_clock; @@ -90,9 +104,13 @@ private: /** * Invoke either put, remove or unrevertable remove on the SPI * depending on the flags in the diff entry. + * + * If `newest_doc_version` indicates that the entry is not the newest version present in the + * diff, the entry is silently ignored and is _not_ invoked on the SPI. */ void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, const spi::Bucket&, - const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo) const; + const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo, + const NewestDocumentVersionMapping& newest_per_doc) const; /** * Fill entries-vector with metadata for bucket up to maxTimestamp, diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp index ca46e87285b..5b8052a05f8 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp @@ -54,6 +54,9 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg) auto to = std::make_unique<api::UpdateCommand>(bucket, from.stealDocumentUpdate(), from.getNewTimestamp()); to->setOldTimestamp(from.getOldTimestamp()); to->setCondition(from.getCondition()); + if (from.has_cached_create_if_missing()) { + to->set_cached_create_if_missing(from.create_if_missing()); + } toMsg = std::move(to); break; } @@ -217,6 +220,9 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg) to->setOldTimestamp(from.getOldTimestamp()); to->setNewTimestamp(from.getTimestamp()); to->setCondition(from.getCondition()); + if (from.has_cached_create_if_missing()) { + to->set_cached_create_if_missing(from.create_if_missing()); + } toMsg = std::move(to); break; } diff --git a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp index 4d74eb1974b..3b53c0c9584 100644 --- a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp @@ -124,7 +124,8 @@ void ClusterControllerApiRpcService::RPC_setSystemState2(FRT_RPCRequest* req) { req->GetParams()->GetValue(0)._string._len); lib::ClusterState systemState(systemStateStr); - auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterStateBundle(systemState)); + auto bundle = std::make_shared<const lib::ClusterStateBundle>(systemState); + auto cmd = std::make_shared<api::SetSystemStateCommand>(std::move(bundle)); cmd->setPriority(api::StorageMessage::VERYHIGH); detach_and_forward_to_enqueuer(std::move(cmd), req); @@ -167,8 +168,7 @@ void ClusterControllerApiRpcService::RPC_setDistributionStates(FRT_RPCRequest* r } LOG(debug, "Got state bundle %s", state_bundle->toString().c_str()); - // TODO add constructor taking in shared_ptr directly instead? - auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle); + auto cmd = std::make_shared<api::SetSystemStateCommand>(std::move(state_bundle)); cmd->setPriority(api::StorageMessage::VERYHIGH); detach_and_forward_to_enqueuer(std::move(cmd), req); diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index f7a426a0527..35b70dd853c 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -140,7 +140,7 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter) // Initializing state manager early, as others use it init time to // update node state according min used bits etc. // Needs node type to be set right away. Needs thread pool, index and - // dead lock detector too, but not before open() + // deadlock detector too, but not before open() _stateManager = std::make_unique<StateManager>( _context.getComponentRegister(), std::move(_hostInfo), @@ -148,10 +148,10 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter) _singleThreadedDebugMode); _context.getComponentRegister().setNodeStateUpdater(*_stateManager); - // Create VDS root folder, in case it doesn't already exist. - // Maybe better to rather fail if it doesn't exist, but tests - // might break if we do that. Might alter later. - std::filesystem::create_directories(std::filesystem::path(_rootFolder)); + // Create storage root folder, in case it doesn't already exist. + if (!_rootFolder.empty()) { + std::filesystem::create_directories(std::filesystem::path(_rootFolder)); + } // else: running as part of unit tests initializeNodeSpecific(); @@ -192,13 +192,16 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter) initializeStatusWebServer(); + if (server_config().writePidFileOnStartup) { + assert(!_rootFolder.empty()); // Write pid file as the last thing we do. If we fail initialization // due to an exception we won't run shutdown. Thus we won't remove the // pid file if something throws after writing it in initialization. // Initialize _pidfile here, such that we can know that we didn't create // it in shutdown code for shutdown during init. - _pidFile = _rootFolder + "/pidfile"; - writePidFile(_pidFile); + _pidFile = _rootFolder + "/pidfile"; + writePidFile(_pidFile); + } } void |