author    Henning Baldersheim <balder@yahoo-inc.com>    2024-02-02 23:02:12 +0000
committer Henning Baldersheim <balder@yahoo-inc.com>    2024-02-03 11:19:35 +0000
commit    1e27c802c89362fd00009f2359730508a840e4af (patch)
tree      7a110154ada614696513ee84130851a454c5e10a /storage
parent    62bee4e38718e3fd670908f09cbf14ca0acc69cf (diff)
common_merge_chain_optimalization_minimum_size hardcoded at 64
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/persistence/mergehandlertest.cpp            |  4
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.cpp        | 97
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.h          | 44
-rw-r--r--  storage/src/vespa/storage/persistence/persistencehandler.cpp  |  4
4 files changed, 50 insertions(+), 99 deletions(-)
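
In essence, the commit removes the commonMergeChainOptimalizationMinimumSize constructor parameter and hardcodes the threshold as a file-local constant. A minimal before/after sketch of the constructor declaration, with the signatures copied from the hunks below:

// Before this commit: the threshold was injected per instance (default 64).
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
             const ClusterContext& cluster_context, const framework::Clock& clock,
             vespalib::ISequencedTaskExecutor& executor,
             uint32_t maxChunkSize = 4190208,
             uint32_t commonMergeChainOptimalizationMinimumSize = 64);

// After this commit: the threshold is a compile-time constant in mergehandler.cpp.
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
             const ClusterContext& cluster_context, const framework::Clock& clock,
             vespalib::ISequencedTaskExecutor& executor,
             uint32_t maxChunkSize = 4190208);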
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index f79235ae505..4bd0570efa8 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -170,11 +170,11 @@ struct MergeHandlerTest : PersistenceTestUtils {
MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
return MergeHandler(getEnv(), getPersistenceProvider(),
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64);
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize);
}
MergeHandler createHandler(spi::PersistenceProvider & spi) {
return MergeHandler(getEnv(), spi,
- getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64);
+ getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208);
}
std::shared_ptr<api::StorageMessage> get_queued_reply() {
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index b1e36147e30..7ee2d9f37bf 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -27,15 +27,13 @@ namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
vespalib::ISequencedTaskExecutor& executor,
- uint32_t maxChunkSize,
- uint32_t commonMergeChainOptimalizationMinimumSize)
+ uint32_t maxChunkSize)
: _clock(clock),
_cluster_context(cluster_context),
_env(env),
_spi(spi),
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
- _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
_executor(executor)
{
}
@@ -48,6 +46,8 @@ MergeHandler::~MergeHandler()
namespace {
+constexpr uint32_t COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE = 64u;
+
constexpr int getDeleteFlag() {
// Referred into old slotfile code before. Where should this number come from?
return 2;
@@ -175,7 +175,7 @@ MergeHandler::buildBucketInfoList(
std::vector<api::GetBucketDiffCommand::Entry>& output,
spi::Context& context) const
{
- assert(output.size() == 0);
+ assert(output.empty());
assert(myNodeIndex < 16);
uint32_t oldSize = output.size();
using DbBucketInfo = api::BucketInfo;
@@ -487,13 +487,12 @@ MergeHandler::fetchLocalData(
}
document::Document::UP
-MergeHandler::deserializeDiffDocument(
- const api::ApplyBucketDiffCommand::Entry& e,
- const document::DocumentTypeRepo& repo) const
+MergeHandler::deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e,
+ const document::DocumentTypeRepo& repo) const
{
auto doc = std::make_unique<document::Document>();
vespalib::nbostream hbuf(&e._headerBlob[0], e._headerBlob.size());
- if (e._bodyBlob.size() > 0) {
+ if (!e._bodyBlob.empty()) {
// TODO Remove this branch and add warning on error.
vespalib::nbostream bbuf(&e._bodyBlob[0], e._bodyBlob.size());
doc->deserialize(repo, hbuf, bbuf);
@@ -533,17 +532,13 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
* Apply the diffs needed locally.
*/
void
-MergeHandler::applyDiffLocally(
- const spi::Bucket& bucket,
- std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
- uint8_t nodeIndex,
- spi::Context& context,
- std::shared_ptr<ApplyBucketDiffState> async_results) const
+MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+ uint8_t nodeIndex, spi::Context& context,
+ const std::shared_ptr<ApplyBucketDiffState> & async_results) const
{
// Sort the data to apply by which file they should be added to
LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries",
- bucket.toString().c_str(),
- diff.size());
+ bucket.toString().c_str(), diff.size());
uint32_t nodeMask = 1 << nodeIndex;
uint32_t byteCount = 0;
uint32_t addedCount = 0;
@@ -563,9 +558,8 @@ MergeHandler::applyDiffLocally(
if (spi::Timestamp(e._entry._timestamp) > existing.getTimestamp()) {
++j;
- LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and "
- "already present in persistence", bucket.toString().c_str(),
- existing.toString().c_str());
+ LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and already present in persistence",
+ bucket.toString().c_str(), existing.toString().c_str());
continue;
}
if ((e._entry._hasMask & nodeMask) != 0) {
@@ -576,8 +570,7 @@ MergeHandler::applyDiffLocally(
}
if (!e.filled()) {
++i;
- LOG(debug, "Failed to apply unretrieved entry %s to diff "
- "locally on %s. Entry was probably compacted away.",
+ LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.",
e.toString().c_str(), bucket.toString().c_str());
continue;
}
@@ -596,19 +589,14 @@ MergeHandler::applyDiffLocally(
++i;
++j;
if ((e._entry._flags & DELETED) && !existing.isRemove()) {
- LOG(debug, "Slot in diff is remove for existing "
- "timestamp in %s. Diff slot: %s. Existing slot: %s",
- bucket.toString().c_str(), e.toString().c_str(),
- existing.toString().c_str());
+ LOG(debug, "Slot in diff is remove for existing timestamp in %s. Diff slot: %s. Existing slot: %s",
+ bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str());
applyDiffEntry(async_results, bucket, e, repo);
} else {
// Duplicate put, just ignore it.
- LOG(debug, "During diff apply, attempting to add slot "
- "whose timestamp already exists in %s, but assuming "
- "these are for the same entry--ignoring it. "
- "Diff slot: %s. Existing slot: %s",
- bucket.toString().c_str(), e.toString().c_str(),
- existing.toString().c_str());
+ LOG(debug, "During diff apply, attempting to add slot whose timestamp already exists in %s, "
+ " but assuming these are for the same entry--ignoring it. Diff slot: %s. Existing slot: %s",
+ bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str());
}
continue;
}
@@ -623,8 +611,7 @@ MergeHandler::applyDiffLocally(
continue;
}
if (!e.filled()) {
- LOG(debug, "Failed to apply unretrieved entry %s to diff "
- "locally on %s. Entry was probably compacted away.",
+ LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.",
e.toString().c_str(), bucket.toString().c_str());
continue;
}
@@ -650,19 +637,13 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
spi::BucketInfoResult infoResult(_spi.getBucketInfo(bucket));
if (infoResult.getErrorCode() != spi::Result::ErrorType::NONE) {
LOG(warning, "Failed to get bucket info for %s: %s",
- bucket.toString().c_str(),
- infoResult.getErrorMessage().c_str());
- throw std::runtime_error("Failed to invoke getBucketInfo on "
- "persistence provider");
+ bucket.toString().c_str(), infoResult.getErrorMessage().c_str());
+ throw std::runtime_error("Failed to invoke getBucketInfo on persistence provider");
}
const spi::BucketInfo& tmpInfo(infoResult.getBucketInfo());
- api::BucketInfo providerInfo(tmpInfo.getChecksum(),
- tmpInfo.getDocumentCount(),
- tmpInfo.getDocumentSize(),
- tmpInfo.getEntryCount(),
- tmpInfo.getUsedSize(),
- tmpInfo.isReady(),
- tmpInfo.isActive());
+ api::BucketInfo providerInfo(tmpInfo.getChecksum(), tmpInfo.getDocumentCount(), tmpInfo.getDocumentSize(),
+ tmpInfo.getEntryCount(), tmpInfo.getUsedSize(),
+ tmpInfo.isReady(), tmpInfo.isActive());
_env.updateBucketDatabase(bucket.getBucket(), providerInfo);
}
@@ -698,9 +679,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
// If last action failed, fail the whole merge
if (status.reply->getResult().failed()) {
LOG(warning, "Done with merge of %s (failed: %s) %s",
- bucket.toString().c_str(),
- status.reply->getResult().toString().c_str(),
- status.toString().c_str());
+ bucket.toString().c_str(), status.reply->getResult().toString().c_str(), status.toString().c_str());
return status.reply;
}
@@ -732,13 +711,9 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes);
cmd->setAddress(createAddress(_cluster_context.cluster_name_ptr(), nodes[1].index));
- findCandidates(status,
- active_nodes_mask,
- true,
- 1 << (status.nodeList.size() - 1),
- 1 << (nodes.size() - 1),
- *cmd);
- if (cmd->getDiff().size() != 0) {
+ findCandidates(status, active_nodes_mask, true, 1 << (status.nodeList.size() - 1),
+ 1 << (nodes.size() - 1), *cmd);
+ if (!cmd->getDiff().empty()) {
break;
}
cmd.reset();
@@ -748,8 +723,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
active_nodes_mask = (1u << status.nodeList.size()) - 1;
// If only one node left in the merge, return ok.
if (status.nodeList.size() == 1) {
- LOG(debug, "Done with merge of %s as there is only one node "
- "that is not source only left in the merge.",
+ LOG(debug, "Done with merge of %s as there is only one node that is not source only left in the merge.",
bucket.toString().c_str());
return status.reply;
}
@@ -777,11 +751,8 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
if (e.first == 0u) {
continue;
}
- if (e.second >= uint32_t(_commonMergeChainOptimalizationMinimumSize)
- || counts.size() == 1)
- {
- LOG(spam, "Sending separate apply bucket diff for path %x "
- "with size %u",
+ if ((e.second >= COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE) || (counts.size() == 1)) {
+ LOG(spam, "Sending separate apply bucket diff for path %x with size %u",
e.first, e.second);
std::vector<api::MergeBucketCommand::Node> nodes;
// This node always has to be first in chain.
@@ -837,7 +808,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
status.pendingId = cmd->getMsgId();
LOG(debug, "Sending %s", cmd->toString().c_str());
sender.sendCommand(cmd);
- return api::StorageReply::SP();
+ return {};
}
/** Ensures merge states are deleted if we fail operation */
@@ -1203,7 +1174,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
assert(reply.getNodes().size() >= 2);
// Get bucket diff should retrieve all info at once
- assert(s->diff.size() == 0);
+ assert(s->diff.empty());
s->diff.insert(s->diff.end(),
reply.getDiff().begin(),
reply.getDiff().end());
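
For context, the use site of the new constant shown in the hunks above is the gating in processBucketMerge. A self-contained sketch of that decision follows; the helper function and the counts map type (path mask to diff-entry count) are hypothetical, introduced only to isolate the logic:

#include <cstdint>
#include <map>

// Threshold hardcoded by this commit (previously the configurable
// commonMergeChainOptimalizationMinimumSize constructor parameter).
constexpr uint32_t COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE = 64u;

// Sketch of the post-patch check: a separate ApplyBucketDiffCommand is sent
// for a node path only when enough diff entries share it, or when it is the
// only remaining path.
bool shouldSendSeparateApplyDiff(const std::map<uint32_t, uint32_t>& counts,
                                 uint32_t pathMask, uint32_t entryCount) {
    if (pathMask == 0u) {
        return false; // entries without an owning path are skipped
    }
    return (entryCount >= COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE) || (counts.size() == 1);
}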
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 0f9f01b4cb1..f3bef802229 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -51,26 +51,17 @@ public:
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
vespalib::ISequencedTaskExecutor& executor,
- uint32_t maxChunkSize = 4190208,
- uint32_t commonMergeChainOptimalizationMinimumSize = 64);
+ uint32_t maxChunkSize = 4190208);
~MergeHandler() override;
- bool buildBucketInfoList(
- const spi::Bucket& bucket,
- Timestamp maxTimestamp,
- uint8_t myNodeIndex,
- std::vector<api::GetBucketDiffCommand::Entry>& output,
- spi::Context& context) const;
- void fetchLocalData(const spi::Bucket& bucket,
- std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
- uint8_t nodeIndex,
- spi::Context& context) const;
- void applyDiffLocally(const spi::Bucket& bucket,
- std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
- uint8_t nodeIndex,
- spi::Context& context,
- std::shared_ptr<ApplyBucketDiffState> async_results) const;
+ bool buildBucketInfoList(const spi::Bucket& bucket, Timestamp maxTimestamp, uint8_t myNodeIndex,
+ std::vector<api::GetBucketDiffCommand::Entry>& output, spi::Context& context) const;
+ void fetchLocalData(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+ uint8_t nodeIndex, spi::Context& context) const;
+ void applyDiffLocally(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+ uint8_t nodeIndex, spi::Context& context,
+ const std::shared_ptr<ApplyBucketDiffState> & async_results) const;
void sync_bucket_info(const spi::Bucket& bucket) const override;
void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override;
@@ -89,35 +80,26 @@ private:
spi::PersistenceProvider &_spi;
std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
- const uint32_t _commonMergeChainOptimalizationMinimumSize;
vespalib::ISequencedTaskExecutor& _executor;
MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
/** Returns a reply if merge is complete */
- api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
- MergeStatus& status,
- MessageSender& sender,
- spi::Context& context,
- std::shared_ptr<ApplyBucketDiffState>& async_results) const;
+ api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, MessageSender& sender,
+ spi::Context& context, std::shared_ptr<ApplyBucketDiffState>& async_results) const;
/**
* Invoke either put, remove or unrevertable remove on the SPI
* depending on the flags in the diff entry.
*/
- void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results,
- const spi::Bucket&,
- const api::ApplyBucketDiffCommand::Entry&,
- const document::DocumentTypeRepo& repo) const;
+ void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, const spi::Bucket&,
+ const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo) const;
/**
* Fill entries-vector with metadata for bucket up to maxTimestamp,
* sorted ascendingly on entry timestamp.
* Throws std::runtime_error upon iteration failure.
*/
- void populateMetaData(const spi::Bucket&,
- Timestamp maxTimestamp,
- DocEntryList & entries,
- spi::Context& context) const;
+ void populateMetaData(const spi::Bucket&, Timestamp maxTimestamp, DocEntryList & entries, spi::Context& context) const;
std::unique_ptr<document::Document>
deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e, const document::DocumentTypeRepo& repo) const;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 2f7fbb99290..87c1f83794e 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -19,9 +19,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
: _clock(component.getClock()),
_env(component, filestorHandler, metrics, provider),
_processAllHandler(_env, provider),
- _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor,
- cfg.bucketMergeChunkSize,
- cfg.commonMergeChainOptimalizationMinimumSize),
+ _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, cfg.bucketMergeChunkSize),
_asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
_simpleHandler(_env, provider, component.getBucketIdFactory())