12 files changed, 31 insertions, 72 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index 1cec77832a7..b498eea3ff2 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -57,6 +57,7 @@ bucket_merge_chunk_size int default=4190208 restart
 ## consumption might increase in a 4.1 to 4.2 upgrade due to this, as 4.1
 ## dont support to only fill in part of the metadata provided and will always
 ## fill all.
+## NB unused and will be removed shortly.
 enable_merge_local_node_choose_docs_optimalization bool default=true restart
 
 ## Whether or not to enable the multibit split optimalization. This is useful
diff --git a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
index 58f6587e087..35aee60d30f 100644
--- a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
@@ -72,7 +72,7 @@ createGetDiff(const document::BucketId& bucket,
 std::shared_ptr<api::ApplyBucketDiffCommand>
 createApplyDiff(const document::BucketId& bucket,
                 const std::vector<api::MergeBucketCommand::Node>& nodes) {
-    auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes, 1024*1024);
+    auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes);
     assignCommandMeta(*cmd);
     return cmd;
 }
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 8aacb6027e7..cda570a4396 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -285,7 +285,7 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
     MergeHandler handler = createHandler();
 
     LOG(debug, "Verifying that apply bucket diff is sent on");
-    auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
+    auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
     MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
     api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP();
 
@@ -510,7 +510,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
     }
     setUpChain(MIDDLE);
 
-    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize);
+    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
     applyBucketDiffCmd->getDiff() = applyDiff;
 
     MergeHandler handler = createHandler(maxChunkSize);
@@ -591,15 +591,14 @@ MergeHandlerTest::createDummyApplyDiff(int timestampOffset,
         fillDummyApplyDiff(applyDiff);
     }
 
-    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
+    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
     applyBucketDiffCmd->getDiff() = applyDiff;
     return applyBucketDiffCmd;
 }
 
 // Must match up with diff used in createDummyApplyDiff
 std::shared_ptr<api::GetBucketDiffCommand>
-MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset,
-                                           uint16_t hasMask)
+MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask)
 {
     std::vector<api::GetBucketDiffCommand::Entry> diff;
     {
@@ -726,7 +725,7 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
         applyDiff.push_back(e);
     }
     setUpChain(BACK);
-    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
+    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
     applyBucketDiffCmd->getDiff() = applyDiff;
 
     auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
@@ -1148,7 +1147,7 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
         applyDiff.push_back(e);
     }
 
-    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
+    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes);
     applyBucketDiffCmd->getDiff() = applyDiff;
 
     auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
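Every call site in the tests above now builds the command from just a bucket and a node list, moving all chunk-size policy into the merge handler itself. A minimal sketch, with stand-in Bucket, Node and Entry types instead of the real storageapi classes, of what the slimmed-down command and its call sites look like:

    // Minimal sketch (not Vespa code): models the constructor change above.
    // Bucket, Node and Entry are stand-in types; the real classes live in
    // storageapi/src/vespa/storageapi/message/bucket.h.
    #include <cstdint>
    #include <memory>
    #include <vector>

    struct Bucket { uint64_t raw; };
    struct Node   { uint16_t index; bool sourceOnly; };
    struct Entry  { uint32_t headerSize; uint32_t bodySize; };

    class ApplyBucketDiffCommand {
    public:
        // After the change the command only carries the bucket, the merge
        // chain and the diff entries; the chunk limit is no longer part of
        // the message, it is handled locally by the merge handler.
        ApplyBucketDiffCommand(Bucket bucket, std::vector<Node> nodes)
            : _bucket(bucket), _nodes(std::move(nodes)) {}
        std::vector<Entry>& getDiff() { return _diff; }
    private:
        Bucket _bucket;
        std::vector<Node> _nodes;
        std::vector<Entry> _diff;
    };

    int main() {
        auto cmd = std::make_shared<ApplyBucketDiffCommand>(
                Bucket{0x42}, std::vector<Node>{{0, false}, {1, false}});
        cmd->getDiff().push_back(Entry{128, 1024}); // as the tests do via applyDiff
    }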
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index e6067fa561c..0271af85fef 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1367,8 +1367,7 @@ TEST_F(MergeThrottlerTest, get_bucket_diff_command_not_in_active_set_is_rejected
 TEST_F(MergeThrottlerTest, apply_bucket_diff_command_not_in_active_set_is_rejected) {
     document::BucketId bucket(16, 1234);
     std::vector<api::GetBucketDiffCommand::Node> nodes;
-    auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(
-            makeDocumentBucket(bucket), nodes, api::Timestamp(1234));
+    auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes);
 
     ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(applyDiffCmd,
                                                api::MessageType::APPLYBUCKETDIFF_REPLY,
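One detail the removal cleans up: the old test passed api::Timestamp(1234) where a maximum buffer size was expected, a hint that the argument carried no real meaning at call sites. The behaviour under test is unchanged. Below is a minimal sketch, with stand-in types rather than the real MergeThrottler, of the pattern the test exercises: an ApplyBucketDiff for a merge outside the active window is rejected with a reply.

    // Hedged sketch (stand-in types, not the real MergeThrottler): an
    // ApplyBucketDiff for a merge that is not in the throttler's active
    // window is bounced with a reply instead of being applied.
    #include <cstdint>
    #include <set>

    using BucketKey = uint64_t;

    struct Reply { bool rejected; };

    class ThrottlerModel {
    public:
        void activate(BucketKey key) { _active.insert(key); }
        // Mirrors the test expectation: unknown merges are rejected up front.
        Reply onApplyBucketDiff(BucketKey key) const {
            return Reply{_active.count(key) == 0};
        }
    private:
        std::set<BucketKey> _active;
    };

    int main() {
        ThrottlerModel throttler;
        Reply r = throttler.onApplyBucketDiff(1234); // never activated
        return r.rejected ? 0 : 1;                   // expect rejection
    }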
Will " - "fetch in next merge round.", - id.toString().c_str(), - chunkSize + entry._bodySize + entry._headerSize, - maxSize); - break; - } chunkSize += entry._bodySize + entry._headerSize; cmd.getDiff().emplace_back(entry); if (constrictHasMask) { @@ -717,21 +703,12 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, nodes.push_back(status.nodeList.back()); assert(nodes.size() > 1); - // Add all the metadata, and thus use big limit. Max - // data to fetch parameter will control amount added. - uint32_t maxSize = - (_enableMergeLocalNodeChooseDocsOptimalization - ? std::numeric_limits<uint32_t>::max() - : _maxChunkSize); - - cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes, maxSize); + cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes); cmd->setAddress(createAddress(_clusterName, nodes[1].index)); - findCandidates(bucket.getBucketId(), - status, + findCandidates(status, true, 1 << (status.nodeList.size() - 1), 1 << (nodes.size() - 1), - maxSize, *cmd); if (cmd->getDiff().size() != 0) break; cmd.reset(); @@ -788,15 +765,11 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, newMask = 1 << (nodes.size() - 1); } assert(nodes.size() > 1); - uint32_t maxSize = - (_enableMergeLocalNodeChooseDocsOptimalization - ? std::numeric_limits<uint32_t>::max() - : _maxChunkSize); - cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes, maxSize); + cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes); cmd->setAddress(createAddress(_clusterName, nodes[1].index)); // Add all the metadata, and thus use big limit. Max // data to fetch parameter will control amount added. - findCandidates(bucket.getBucketId(), status, true, e.first, newMask, maxSize, *cmd); + findCandidates(status, true, e.first, newMask, *cmd); break; } } @@ -805,9 +778,9 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, // If we found no group big enough to handle on its own, do a common // merge to merge the remaining data. if ( ! 
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 070f72c44df..5e65e1a39ec 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -37,7 +37,6 @@ public:
     MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
                  const vespalib::string & clusterName, const framework::Clock & clock,
                  uint32_t maxChunkSize = 4190208,
-                 bool enableMergeLocalNodeChooseDocsOptimalization = true,
                  uint32_t commonMergeChainOptimalizationMinimumSize = 64);
 
     bool buildBucketInfoList(
@@ -70,9 +69,8 @@ private:
     const vespalib::string &_clusterName;
     PersistenceUtil &_env;
     spi::PersistenceProvider &_spi;
-    uint32_t _maxChunkSize;
-    bool _enableMergeLocalNodeChooseDocsOptimalization;
-    uint32_t _commonMergeChainOptimalizationMinimumSize;
+    const uint32_t _maxChunkSize;
+    const uint32_t _commonMergeChainOptimalizationMinimumSize;
 
     /** Returns a reply if merge is complete */
     api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index b9d5d9affa0..c3751abb7d9 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -19,7 +19,6 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
     _processAllHandler(_env, provider),
     _mergeHandler(_env, provider, component.getClusterName(), _clock,
                   cfg.bucketMergeChunkSize,
-                  cfg.enableMergeLocalNodeChooseDocsOptimalization,
                   cfg.commonMergeChainOptimalizationMinimumSize),
     _asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()),
     _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
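With the flag gone, the handler's two remaining tunables become const members, wired once from config by PersistenceHandler, so the compiler enforces that nothing mutates them after construction. A minimal sketch of that pattern; the class body is a stand-in, while the member names and defaults come from the diff above:

    // Sketch (stand-in class): config-derived tunables as const members.
    #include <cstdint>

    class MergeHandlerModel {
    public:
        explicit MergeHandlerModel(uint32_t maxChunkSize = 4190208,
                                   uint32_t commonMergeChainOptimalizationMinimumSize = 64)
            : _maxChunkSize(maxChunkSize),
              _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize)
        {}
    private:
        const uint32_t _maxChunkSize;                                  // set once from config
        const uint32_t _commonMergeChainOptimalizationMinimumSize;     // set once from config
    };

    int main() {
        MergeHandlerModel handler;  // defaults mirror the header's
        (void) handler;
    }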
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index d1de3bf5afb..9fd2c96f27e 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -688,7 +688,7 @@ TEST_P(StorageProtocolTest, apply_bucket_diff) {
     nodes.push_back(13);
 
     std::vector<ApplyBucketDiffCommand::Entry> entries = {dummy_apply_entry()};
-    auto cmd = std::make_shared<ApplyBucketDiffCommand>(_bucket, nodes, 1234);
+    auto cmd = std::make_shared<ApplyBucketDiffCommand>(_bucket, nodes);
     cmd->getDiff() = entries;
     auto cmd2 = copyCommand(cmd);
     EXPECT_EQ(_bucket, cmd2->getBucket());
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
index 13fba8b8508..c57b991eb36 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
@@ -193,7 +193,7 @@ void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::ApplyBucketDiffCo
         buf.putShort(nodes[i].index);
         buf.putBoolean(nodes[i].sourceOnly);
     }
-    buf.putInt(msg.getMaxBufferSize());
+    buf.putInt(0x400000);
     const std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg.getDiff());
     buf.putInt(entries.size());
     for (uint32_t i=0; i<entries.size(); ++i) {
@@ -220,12 +220,12 @@ ProtocolSerialization4_2::onDecodeApplyBucketDiffCommand(BBuf& buf) const
         bool sourceOnly = SH::getBoolean(buf);
         nodes.push_back(Node(index, sourceOnly));
     }
-    uint32_t maxBufferSize(SH::getInt(buf));
-    auto msg = std::make_unique<api::ApplyBucketDiffCommand>(bucket, nodes, maxBufferSize);
+    (void) SH::getInt(buf); // Unused field
+    auto msg = std::make_unique<api::ApplyBucketDiffCommand>(bucket, nodes);
     std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg->getDiff());
     uint32_t entryCount = SH::getInt(buf);
     if (entryCount > buf.getRemaining()) {
-        // Trigger out of bounds exception rather than out of memory error 
+        // Trigger out of bounds exception rather than out of memory error
        buf.incPos(entryCount);
     }
     entries.resize(entryCount);
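The 4.2 serialization keeps the retired field on the wire for compatibility: the encoder now writes the constant 0x400000 and the decoder reads and discards the slot. The field can only disappear from the wire format once no deployed version reads it. A self-contained sketch of the same pattern over a plain byte vector, assuming big-endian framing as a stand-in for the real GBBuf/BBuf helpers:

    // Sketch of the wire-compatibility trick above (plain std::vector
    // buffer, not the real GBBuf/BBuf): write a constant placeholder for
    // the retired 32-bit field, read and ignore it on decode.
    #include <cstdint>
    #include <vector>

    void putInt(std::vector<uint8_t>& buf, uint32_t v) {
        for (int shift = 24; shift >= 0; shift -= 8) {
            buf.push_back(static_cast<uint8_t>(v >> shift)); // big-endian
        }
    }

    uint32_t getInt(const std::vector<uint8_t>& buf, size_t& pos) {
        uint32_t v = 0;
        for (int i = 0; i < 4; ++i) {
            v = (v << 8) | buf[pos++];
        }
        return v;
    }

    int main() {
        std::vector<uint8_t> buf;
        putInt(buf, 0x400000);     // encode: placeholder for the retired field
        size_t pos = 0;
        (void) getInt(buf, pos);   // decode: read and discard, as in the diff
    }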
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index 166925382bd..5d66d86036c 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -944,7 +944,7 @@ void fill_proto_apply_diff_vector(::google::protobuf::RepeatedPtrField<protobuf:
 void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const {
     encode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, msg, [&](auto& req) {
         set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
-        req.set_max_buffer_size(msg.getMaxBufferSize());
+        req.set_max_buffer_size(0x400000); // Unused, GC soon.
         fill_proto_apply_diff_vector(*req.mutable_entries(), msg.getDiff());
     });
 }
@@ -958,7 +958,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffRepl
 api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(BBuf& buf) const {
     return decode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, [&](auto& req, auto& bucket) {
         auto nodes = get_merge_nodes(req.nodes());
-        auto cmd = std::make_unique<api::ApplyBucketDiffCommand>(bucket, std::move(nodes), req.max_buffer_size());
+        auto cmd = std::make_unique<api::ApplyBucketDiffCommand>(bucket, std::move(nodes));
         fill_api_apply_diff_vector(cmd->getDiff(), req.entries());
         return cmd;
     });
diff --git a/storageapi/src/vespa/storageapi/message/bucket.cpp b/storageapi/src/vespa/storageapi/message/bucket.cpp
index ced71f5326b..1262c5db632 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.cpp
+++ b/storageapi/src/vespa/storageapi/message/bucket.cpp
@@ -341,12 +341,10 @@ ApplyBucketDiffCommand::Entry::operator==(const Entry& e) const
 }
 
 ApplyBucketDiffCommand::ApplyBucketDiffCommand(
-        const document::Bucket &bucket, const std::vector<Node>& nodes,
-        uint32_t maxBufferSize)
+        const document::Bucket &bucket, const std::vector<Node>& nodes)
     : BucketInfoCommand(MessageType::APPLYBUCKETDIFF, bucket),
       _nodes(nodes),
-      _diff(),
-      _maxBufferSize(maxBufferSize)
+      _diff()
 {}
 
 ApplyBucketDiffCommand::~ApplyBucketDiffCommand() = default;
@@ -369,8 +367,7 @@ ApplyBucketDiffCommand::print(std::ostream& out, bool verbose,
         if (i != 0) out << ", ";
         out << _nodes[i];
     }
-    out << ", max buffer size " << _maxBufferSize << " bytes"
-        << ", " << _diff.size() << " entries of " << totalSize << " bytes, "
+    out << _diff.size() << " entries of " << totalSize << " bytes, "
         << (100.0 * filled / _diff.size()) << " \% filled)";
     if (_diff.empty()) {
         out << ", no entries";
diff --git a/storageapi/src/vespa/storageapi/message/bucket.h b/storageapi/src/vespa/storageapi/message/bucket.h
index eabe847c7e0..e5ec7698329 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.h
+++ b/storageapi/src/vespa/storageapi/message/bucket.h
@@ -274,22 +274,15 @@ public:
 private:
     std::vector<Node> _nodes;
     std::vector<Entry> _diff;
-    // We may send more metadata entries that should fit in one apply bucket
-    // diff command, to let node pick which ones it wants to fill in first.
-    // Nodes should verify that they don't fill a command up with more than
-    // this number of bytes.
-    uint32_t _maxBufferSize;
 
 public:
     ApplyBucketDiffCommand(const document::Bucket &bucket,
-                           const std::vector<Node>& nodes,
-                           uint32_t maxBufferSize);
+                           const std::vector<Node>& nodes);
     ~ApplyBucketDiffCommand() override;
 
     const std::vector<Node>& getNodes() const { return _nodes; }
     const std::vector<Entry>& getDiff() const { return _diff; }
     std::vector<Entry>& getDiff() { return _diff; }
-    uint32_t getMaxBufferSize() const { return _maxBufferSize; }
 
     void print(std::ostream& out, bool verbose, const std::string& indent) const override;
     DECLARE_STORAGECOMMAND(ApplyBucketDiffCommand, onApplyBucketDiff)
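After the GC the command carries only the node list and the diff entries, and print() reports just the entry count, total size and fill rate. A small sketch of that summary computation with stand-in types (it guards the empty case before dividing, which the real print() handles via its _diff.empty() branch):

    // Sketch (stand-in types) of the trimmed print() above: with
    // _maxBufferSize gone, the summary only reports entry count, total
    // size and fill percentage.
    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct Entry { uint32_t headerSize; uint32_t bodySize; bool filled; };

    void printSummary(std::ostream& out, const std::vector<Entry>& diff) {
        uint64_t totalSize = 0;
        size_t filled = 0;
        for (const auto& e : diff) {
            totalSize += e.headerSize + e.bodySize;
            if (e.filled) ++filled;
        }
        if (diff.empty()) {
            out << "no entries\n";
            return;
        }
        out << diff.size() << " entries of " << totalSize << " bytes, "
            << (100.0 * filled / diff.size()) << " % filled\n";
    }

    int main() {
        printSummary(std::cout, {{64, 512, true}, {64, 0, false}});
    }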