summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--configdefinitions/src/vespa/stor-filestor.def1
-rw-r--r--storage/src/tests/persistence/filestorage/mergeblockingtest.cpp2
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp13
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp3
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp45
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h6
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp1
-rw-r--r--storageapi/src/tests/mbusprot/storageprotocoltest.cpp2
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp8
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp4
-rw-r--r--storageapi/src/vespa/storageapi/message/bucket.cpp9
-rw-r--r--storageapi/src/vespa/storageapi/message/bucket.h9
12 files changed, 31 insertions, 72 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index 1cec77832a7..b498eea3ff2 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -57,6 +57,7 @@ bucket_merge_chunk_size int default=4190208 restart
## consumption might increase in a 4.1 to 4.2 upgrade due to this, as 4.1
## dont support to only fill in part of the metadata provided and will always
## fill all.
+## NB: this flag is no longer read by the code and will be removed shortly.
enable_merge_local_node_choose_docs_optimalization bool default=true restart
## Whether or not to enable the multibit split optimalization. This is useful
diff --git a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
index 58f6587e087..35aee60d30f 100644
--- a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
@@ -72,7 +72,7 @@ createGetDiff(const document::BucketId& bucket,
std::shared_ptr<api::ApplyBucketDiffCommand>
createApplyDiff(const document::BucketId& bucket,
const std::vector<api::MergeBucketCommand::Node>& nodes) {
- auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes, 1024*1024);
+ auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes);
assignCommandMeta(*cmd);
return cmd;
}
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 8aacb6027e7..cda570a4396 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -285,7 +285,7 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
MergeHandler handler = createHandler();
LOG(debug, "Verifying that apply bucket diff is sent on");
- auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
+ auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP();
@@ -510,7 +510,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
}
setUpChain(MIDDLE);
- auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize);
+ auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
applyBucketDiffCmd->getDiff() = applyDiff;
MergeHandler handler = createHandler(maxChunkSize);
@@ -591,15 +591,14 @@ MergeHandlerTest::createDummyApplyDiff(int timestampOffset,
fillDummyApplyDiff(applyDiff);
}
- auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
+ auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
applyBucketDiffCmd->getDiff() = applyDiff;
return applyBucketDiffCmd;
}
// Must match up with diff used in createDummyApplyDiff
std::shared_ptr<api::GetBucketDiffCommand>
-MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset,
- uint16_t hasMask)
+MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask)
{
std::vector<api::GetBucketDiffCommand::Entry> diff;
{
@@ -726,7 +725,7 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
applyDiff.push_back(e);
}
setUpChain(BACK);
- auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
+ auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
applyBucketDiffCmd->getDiff() = applyDiff;
auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
@@ -1148,7 +1147,7 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
applyDiff.push_back(e);
}
- auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
+ auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
applyBucketDiffCmd->getDiff() = applyDiff;
auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index e6067fa561c..0271af85fef 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1367,8 +1367,7 @@ TEST_F(MergeThrottlerTest, get_bucket_diff_command_not_in_active_set_is_rejected
TEST_F(MergeThrottlerTest, apply_bucket_diff_command_not_in_active_set_is_rejected) {
document::BucketId bucket(16, 1234);
std::vector<api::GetBucketDiffCommand::Node> nodes;
- auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(
- makeDocumentBucket(bucket), nodes, api::Timestamp(1234));
+ auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes);
ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(applyDiffCmd,
api::MessageType::APPLYBUCKETDIFF_REPLY,
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index c7c681a838b..cabd74f6777 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -17,14 +17,12 @@ namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const vespalib::string & clusterName, const framework::Clock & clock,
uint32_t maxChunkSize,
- bool enableMergeLocalNodeChooseDocsOptimalization,
uint32_t commonMergeChainOptimalizationMinimumSize)
: _clock(clock),
_clusterName(clusterName),
_env(env),
_spi(spi),
_maxChunkSize(maxChunkSize),
- _enableMergeLocalNodeChooseDocsOptimalization(enableMergeLocalNodeChooseDocsOptimalization),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize)
{
}
@@ -653,26 +651,14 @@ MergeHandler::applyDiffLocally(
}
namespace {
- void findCandidates(const document::BucketId& id, MergeStatus& status,
- bool constrictHasMask, uint16_t hasMask,
- uint16_t newHasMask,
- uint32_t maxSize, api::ApplyBucketDiffCommand& cmd)
+ void findCandidates(MergeStatus& status, bool constrictHasMask, uint16_t hasMask,
+ uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
{
uint32_t chunkSize = 0;
for (const auto& entry : status.diff) {
if (constrictHasMask && entry._hasMask != hasMask) {
continue;
}
- if (chunkSize != 0 &&
- chunkSize + entry._bodySize + entry._headerSize > maxSize)
- {
- LOG(spam, "Merge of %s used %d bytes, max is %d. Will "
- "fetch in next merge round.",
- id.toString().c_str(),
- chunkSize + entry._bodySize + entry._headerSize,
- maxSize);
- break;
- }
chunkSize += entry._bodySize + entry._headerSize;
cmd.getDiff().emplace_back(entry);
if (constrictHasMask) {
@@ -717,21 +703,12 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
nodes.push_back(status.nodeList.back());
assert(nodes.size() > 1);
- // Add all the metadata, and thus use big limit. Max
- // data to fetch parameter will control amount added.
- uint32_t maxSize =
- (_enableMergeLocalNodeChooseDocsOptimalization
- ? std::numeric_limits<uint32_t>::max()
- : _maxChunkSize);
-
- cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes, maxSize);
+ cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes);
cmd->setAddress(createAddress(_clusterName, nodes[1].index));
- findCandidates(bucket.getBucketId(),
- status,
+ findCandidates(status,
true,
1 << (status.nodeList.size() - 1),
1 << (nodes.size() - 1),
- maxSize,
*cmd);
if (cmd->getDiff().size() != 0) break;
cmd.reset();
@@ -788,15 +765,11 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
newMask = 1 << (nodes.size() - 1);
}
assert(nodes.size() > 1);
- uint32_t maxSize =
- (_enableMergeLocalNodeChooseDocsOptimalization
- ? std::numeric_limits<uint32_t>::max()
- : _maxChunkSize);
- cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes, maxSize);
+ cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes);
cmd->setAddress(createAddress(_clusterName, nodes[1].index));
// Add all the metadata, and thus use big limit. Max
// data to fetch parameter will control amount added.
- findCandidates(bucket.getBucketId(), status, true, e.first, newMask, maxSize, *cmd);
+ findCandidates(status, true, e.first, newMask, *cmd);
break;
}
}
@@ -805,9 +778,9 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
// If we found no group big enough to handle on its own, do a common
// merge to merge the remaining data.
if ( ! cmd ) {
- cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), status.nodeList, _maxChunkSize);
+ cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), status.nodeList);
cmd->setAddress(createAddress(_clusterName, status.nodeList[1].index));
- findCandidates(bucket.getBucketId(), status, false, 0, 0, _maxChunkSize, *cmd);
+ findCandidates(status, false, 0, 0, *cmd);
}
cmd->setPriority(status.context.getPriority());
cmd->setTimeout(status.timeout);
@@ -1312,7 +1285,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
LOG(spam, "Sending ApplyBucketDiff for %s on to node %d",
bucket.toString().c_str(), cmd.getNodes()[index + 1].index);
- auto cmd2 = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), cmd.getNodes(), cmd.getMaxBufferSize());
+ auto cmd2 = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), cmd.getNodes());
cmd2->setAddress(createAddress(_clusterName, cmd.getNodes()[index + 1].index));
cmd2->getDiff().swap(cmd.getDiff());
cmd2->setPriority(cmd.getPriority());
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 070f72c44df..5e65e1a39ec 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -37,7 +37,6 @@ public:
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const vespalib::string & clusterName, const framework::Clock & clock,
uint32_t maxChunkSize = 4190208,
- bool enableMergeLocalNodeChooseDocsOptimalization = true,
uint32_t commonMergeChainOptimalizationMinimumSize = 64);
bool buildBucketInfoList(
@@ -70,9 +69,8 @@ private:
const vespalib::string &_clusterName;
PersistenceUtil &_env;
spi::PersistenceProvider &_spi;
- uint32_t _maxChunkSize;
- bool _enableMergeLocalNodeChooseDocsOptimalization;
- uint32_t _commonMergeChainOptimalizationMinimumSize;
+ const uint32_t _maxChunkSize;
+ const uint32_t _commonMergeChainOptimalizationMinimumSize;
/** Returns a reply if merge is complete */
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index b9d5d9affa0..c3751abb7d9 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -19,7 +19,6 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
_processAllHandler(_env, provider),
_mergeHandler(_env, provider, component.getClusterName(), _clock,
cfg.bucketMergeChunkSize,
- cfg.enableMergeLocalNodeChooseDocsOptimalization,
cfg.commonMergeChainOptimalizationMinimumSize),
_asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index d1de3bf5afb..9fd2c96f27e 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -688,7 +688,7 @@ TEST_P(StorageProtocolTest, apply_bucket_diff) {
nodes.push_back(13);
std::vector<ApplyBucketDiffCommand::Entry> entries = {dummy_apply_entry()};
- auto cmd = std::make_shared<ApplyBucketDiffCommand>(_bucket, nodes, 1234);
+ auto cmd = std::make_shared<ApplyBucketDiffCommand>(_bucket, nodes);
cmd->getDiff() = entries;
auto cmd2 = copyCommand(cmd);
EXPECT_EQ(_bucket, cmd2->getBucket());
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
index 13fba8b8508..c57b991eb36 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
@@ -193,7 +193,7 @@ void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::ApplyBucketDiffCo
buf.putShort(nodes[i].index);
buf.putBoolean(nodes[i].sourceOnly);
}
- buf.putInt(msg.getMaxBufferSize());
+ buf.putInt(0x400000);
const std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg.getDiff());
buf.putInt(entries.size());
for (uint32_t i=0; i<entries.size(); ++i) {
@@ -220,12 +220,12 @@ ProtocolSerialization4_2::onDecodeApplyBucketDiffCommand(BBuf& buf) const
bool sourceOnly = SH::getBoolean(buf);
nodes.push_back(Node(index, sourceOnly));
}
- uint32_t maxBufferSize(SH::getInt(buf));
- auto msg = std::make_unique<api::ApplyBucketDiffCommand>(bucket, nodes, maxBufferSize);
+ (void) SH::getInt(buf); // Unused field
+ auto msg = std::make_unique<api::ApplyBucketDiffCommand>(bucket, nodes);
std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg->getDiff());
uint32_t entryCount = SH::getInt(buf);
if (entryCount > buf.getRemaining()) {
- // Trigger out of bounds exception rather than out of memory error
+ // Trigger out of bounds exception rather than out of memory error
buf.incPos(entryCount);
}
entries.resize(entryCount);
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index 166925382bd..5d66d86036c 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -944,7 +944,7 @@ void fill_proto_apply_diff_vector(::google::protobuf::RepeatedPtrField<protobuf:
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const {
encode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, msg, [&](auto& req) {
set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
- req.set_max_buffer_size(msg.getMaxBufferSize());
+ req.set_max_buffer_size(0x400000); // Unused, GC soon.
fill_proto_apply_diff_vector(*req.mutable_entries(), msg.getDiff());
});
}
@@ -958,7 +958,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffRepl
api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(BBuf& buf) const {
return decode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, [&](auto& req, auto& bucket) {
auto nodes = get_merge_nodes(req.nodes());
- auto cmd = std::make_unique<api::ApplyBucketDiffCommand>(bucket, std::move(nodes), req.max_buffer_size());
+ auto cmd = std::make_unique<api::ApplyBucketDiffCommand>(bucket, std::move(nodes));
fill_api_apply_diff_vector(cmd->getDiff(), req.entries());
return cmd;
});
diff --git a/storageapi/src/vespa/storageapi/message/bucket.cpp b/storageapi/src/vespa/storageapi/message/bucket.cpp
index ced71f5326b..1262c5db632 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.cpp
+++ b/storageapi/src/vespa/storageapi/message/bucket.cpp
@@ -341,12 +341,10 @@ ApplyBucketDiffCommand::Entry::operator==(const Entry& e) const
}
ApplyBucketDiffCommand::ApplyBucketDiffCommand(
- const document::Bucket &bucket, const std::vector<Node>& nodes,
- uint32_t maxBufferSize)
+ const document::Bucket &bucket, const std::vector<Node>& nodes)
: BucketInfoCommand(MessageType::APPLYBUCKETDIFF, bucket),
_nodes(nodes),
- _diff(),
- _maxBufferSize(maxBufferSize)
+ _diff()
{}
ApplyBucketDiffCommand::~ApplyBucketDiffCommand() = default;
@@ -369,8 +367,7 @@ ApplyBucketDiffCommand::print(std::ostream& out, bool verbose,
if (i != 0) out << ", ";
out << _nodes[i];
}
- out << ", max buffer size " << _maxBufferSize << " bytes"
- << ", " << _diff.size() << " entries of " << totalSize << " bytes, "
+ out << ", " << _diff.size() << " entries of " << totalSize << " bytes, "
<< (100.0 * filled / _diff.size()) << " \% filled)";
if (_diff.empty()) {
out << ", no entries";
diff --git a/storageapi/src/vespa/storageapi/message/bucket.h b/storageapi/src/vespa/storageapi/message/bucket.h
index eabe847c7e0..e5ec7698329 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.h
+++ b/storageapi/src/vespa/storageapi/message/bucket.h
@@ -274,22 +274,15 @@ public:
private:
std::vector<Node> _nodes;
std::vector<Entry> _diff;
- // We may send more metadata entries that should fit in one apply bucket
- // diff command, to let node pick which ones it wants to fill in first.
- // Nodes should verify that they don't fill a command up with more than
- // this number of bytes.
- uint32_t _maxBufferSize;
public:
ApplyBucketDiffCommand(const document::Bucket &bucket,
- const std::vector<Node>& nodes,
- uint32_t maxBufferSize);
+ const std::vector<Node>& nodes);
~ApplyBucketDiffCommand() override;
const std::vector<Node>& getNodes() const { return _nodes; }
const std::vector<Entry>& getDiff() const { return _diff; }
std::vector<Entry>& getDiff() { return _diff; }
- uint32_t getMaxBufferSize() const { return _maxBufferSize; }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGECOMMAND(ApplyBucketDiffCommand, onApplyBucketDiff)