diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-07-01 15:18:32 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-01 15:18:32 +0200 |
commit | 59b00e6d03a6317e8f4a19486b8c26a64abf39b9 (patch) | |
tree | e7f4b570ca8cd2daa0591f7848290773b922dce7 /storage | |
parent | 43d65164a52041f8edfaa08028747a6fa69c6b81 (diff) | |
parent | 331dfa1c316e1663e84a792aabba75fce284356d (diff) |
Merge pull request #23316 from vespa-engine/vekterli/remove-legacy-storage-protocol-versions
GC all legacy storage protocol serialization versions
Diffstat (limited to 'storage')
18 files changed, 31 insertions, 1952 deletions
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp index c130e433285..d4d22d6a36c 100644 --- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp +++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp @@ -112,8 +112,7 @@ std::string version_as_gtest_string(TestParamInfo<vespalib::Version> info) { } VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(MultiVersionTest, StorageProtocolTest, - Values(vespalib::Version(6, 240, 0), - vespalib::Version(7, 41, 19)), + Values(vespalib::Version(7, 41, 19)), version_as_gtest_string); namespace { @@ -301,10 +300,6 @@ TEST_P(StorageProtocolTest, get_internal_read_consistency_is_strong_by_default) } TEST_P(StorageProtocolTest, can_set_internal_read_consistency_on_get_commands) { - // Only supported on protocol version 7+. Will default to Strong on older versions, which is what we want. - if (GetParam().getMajor() < 7) { - return; - } auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123); cmd->set_internal_read_consistency(InternalReadConsistency::Weak); auto cmd2 = copyCommand(cmd); @@ -316,10 +311,6 @@ TEST_P(StorageProtocolTest, can_set_internal_read_consistency_on_get_commands) { } TEST_P(StorageProtocolTest, tombstones_propagated_for_gets) { - // Only supported on protocol version 7+. - if (GetParam().getMajor() < 7) { - return; - } auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar", 123); auto reply = std::make_shared<GetReply>(*cmd, std::shared_ptr<Document>(), 100, false, true); set_dummy_bucket_info_reply_fields(*reply); @@ -332,23 +323,6 @@ TEST_P(StorageProtocolTest, tombstones_propagated_for_gets) { EXPECT_TRUE(reply2->is_tombstone()); } -// TODO remove this once pre-protobuf serialization is removed -TEST_P(StorageProtocolTest, old_serialization_format_treats_tombstone_get_replies_as_not_found) { - if (GetParam().getMajor() >= 7) { - return; - } - auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar", 123); - auto reply = std::make_shared<GetReply>(*cmd, std::shared_ptr<Document>(), 100, false, true); - set_dummy_bucket_info_reply_fields(*reply); - auto reply2 = copyReply(reply); - - EXPECT_TRUE(reply2->getDocument().get() == nullptr); - EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId()); - EXPECT_EQ(Timestamp(123), reply2->getBeforeTimestamp()); - EXPECT_EQ(Timestamp(0), reply2->getLastModifiedTimestamp()); - EXPECT_FALSE(reply2->is_tombstone()); // Protocol version doesn't understand explicit tombstones. -} - TEST_P(StorageProtocolTest, remove) { auto cmd = std::make_shared<RemoveCommand>(_bucket, _testDocId, 159); auto cmd2 = copyCommand(cmd); @@ -486,11 +460,7 @@ TEST_P(StorageProtocolTest, merge_bucket) { EXPECT_EQ(Timestamp(1234), cmd2->getMaxTimestamp()); EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion()); EXPECT_EQ(chain, cmd2->getChain()); - if (GetParam().getMajor() >= 7) { - EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding()); - } else { - EXPECT_FALSE(cmd2->use_unordered_forwarding()); - } + EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding()); auto reply = std::make_shared<MergeBucketReply>(*cmd); auto reply2 = copyReply(reply); @@ -569,18 +539,10 @@ TEST_P(StorageProtocolTest, remove_location) { uint32_t n_docs_removed = 12345; auto reply = std::make_shared<RemoveLocationReply>(*cmd2, n_docs_removed); auto reply2 = copyReply(reply); - if (GetParam().getMajor() == 7) { - // Statistics are only available for protobuf-enabled version. - EXPECT_EQ(n_docs_removed, reply2->documents_removed()); - } else { - EXPECT_EQ(0, reply2->documents_removed()); - } + EXPECT_EQ(n_docs_removed, reply2->documents_removed()); } TEST_P(StorageProtocolTest, stat_bucket) { - if (GetParam().getMajor() < 7) { - return; // Only available for protobuf-backed protocol version. - } auto cmd = std::make_shared<StatBucketCommand>(_bucket, "id.group == 'mygroup'"); auto cmd2 = copyCommand(cmd); EXPECT_EQ("id.group == 'mygroup'", cmd2->getDocumentSelection()); @@ -823,11 +785,7 @@ TEST_P(StorageProtocolTest, serialized_size_is_used_to_set_approx_size_of_storag auto cmd2 = copyCommand(cmd); auto version = GetParam(); - if (version.getMajor() == 7) { // Protobuf-based encoding - EXPECT_EQ(158u, cmd2->getApproxByteSize()); - } else { // Legacy encoding - EXPECT_EQ(181u, cmd2->getApproxByteSize()); - } + EXPECT_EQ(158u, cmd2->getApproxByteSize()); } TEST_P(StorageProtocolTest, track_memory_footprint_for_some_messages) { diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 4c375a30867..ab1f72e85d2 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -169,10 +169,6 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply) sar->setTrace(reply->steal_trace()); receiveStorageReply(sar); } - } else if (protocolName == mbusprot::StorageProtocol::NAME) { - mbusprot::StorageReply* sr(static_cast<mbusprot::StorageReply*>(reply.get())); - sr->getReply()->setTrace(reply->steal_trace()); - receiveStorageReply(sr->getReply()); } else { LOGBM(warning, "Received unsupported reply type %d for protocol '%s'.", reply->getType(), reply->getProtocol().c_str()); diff --git a/storage/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storage/src/vespa/storageapi/mbusprot/CMakeLists.txt index 43c1da32502..4999201fbeb 100644 --- a/storage/src/vespa/storageapi/mbusprot/CMakeLists.txt +++ b/storage/src/vespa/storageapi/mbusprot/CMakeLists.txt @@ -23,11 +23,6 @@ vespa_add_library(storageapi_mbusprot OBJECT storagereply.cpp protocolserialization.cpp storageprotocol.cpp - protocolserialization4_2.cpp - protocolserialization5_0.cpp - protocolserialization5_1.cpp - protocolserialization5_2.cpp - protocolserialization6_0.cpp protocolserialization7.cpp ${storageapi_PROTOBUF_SRCS} DEPENDS diff --git a/storage/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h b/storage/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h deleted file mode 100644 index f3c6a6f6856..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "protocolserialization.h" - -namespace storage::mbusprot { - -/* - * Utility base class for pre-v7 (protobuf) serialization implementations. - * - * TODO remove on Vespa 8 alongside legacy serialization formats. - */ -class LegacyProtocolSerialization : public ProtocolSerialization { - const std::shared_ptr<const document::DocumentTypeRepo> _repo; -public: - explicit LegacyProtocolSerialization(const std::shared_ptr<const document::DocumentTypeRepo>& repo) - : _repo(repo) - {} - - const document::DocumentTypeRepo& getTypeRepo() const { return *_repo; } - const std::shared_ptr<const document::DocumentTypeRepo> getTypeRepoSp() const { return _repo; } - - virtual document::Bucket getBucket(document::ByteBuffer& buf) const = 0; - virtual void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const = 0; - virtual document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const = 0; - virtual void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const = 0; - virtual api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const = 0; - virtual void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const = 0; -}; - -} // storage::mbusprot diff --git a/storage/src/vespa/storageapi/mbusprot/oldreturncodemapper.h b/storage/src/vespa/storageapi/mbusprot/oldreturncodemapper.h deleted file mode 100644 index 40e09f23697..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/oldreturncodemapper.h +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * \brief Maps return code values used between 4.2 and 5.0 - */ -#pragma once - -namespace storage { -namespace mbusprot { - -int getOldErrorCode(api::ReturnCode::Result newErrorCode) { - switch (newErrorCode) { - case api::ReturnCode::OK: return 1; - case api::ReturnCode::EXISTS: return 1001; - case api::ReturnCode::NOT_READY: return 2000; - case api::ReturnCode::WRONG_DISTRIBUTION: return 2001; - case api::ReturnCode::REJECTED: return 2002; - case api::ReturnCode::ABORTED: return 2003; - case api::ReturnCode::BUCKET_NOT_FOUND: return 2004; - case api::ReturnCode::BUCKET_DELETED: return 2004; - case api::ReturnCode::TIMESTAMP_EXIST: return 2005; - case api::ReturnCode::UNKNOWN_COMMAND: return 3000; - case api::ReturnCode::NOT_IMPLEMENTED: return 3001; - case api::ReturnCode::ILLEGAL_PARAMETERS: return 3002; - case api::ReturnCode::IGNORED: return 3003; - case api::ReturnCode::UNPARSEABLE: return 3004; - case api::ReturnCode::NOT_CONNECTED: return 4000; - case api::ReturnCode::TIMEOUT: return 4003; - case api::ReturnCode::BUSY: return 4004; - case api::ReturnCode::NO_SPACE: return 5000; - case api::ReturnCode::DISK_FAILURE: return 5001; - case api::ReturnCode::IO_FAILURE: return 5003; - case api::ReturnCode::INTERNAL_FAILURE: return 6000; - default: return 6001; - } -} - -api::ReturnCode::Result getNewErrorCode(int oldErrorCode) { - switch (oldErrorCode) { - case 1: return api::ReturnCode::OK; - case 1000: return api::ReturnCode::OK; // NOT_FOUND - case 1001: return api::ReturnCode::EXISTS; - case 2000: return api::ReturnCode::NOT_READY; - case 2001: return api::ReturnCode::WRONG_DISTRIBUTION; - case 2002: return api::ReturnCode::REJECTED; - case 2003: return api::ReturnCode::ABORTED; - case 2004: return api::ReturnCode::BUCKET_NOT_FOUND; - case 2005: return api::ReturnCode::TIMESTAMP_EXIST; - case 3000: return api::ReturnCode::UNKNOWN_COMMAND; - case 3001: return api::ReturnCode::NOT_IMPLEMENTED; - case 3002: return api::ReturnCode::ILLEGAL_PARAMETERS; - case 3003: return api::ReturnCode::IGNORED; - case 3004: return api::ReturnCode::UNPARSEABLE; - case 4000: return api::ReturnCode::NOT_CONNECTED; - case 4001: return api::ReturnCode::BUSY; // OVERLOAD; - case 4002: return api::ReturnCode::NOT_READY; // REMOTE_DISABLED; - case 4003: return api::ReturnCode::TIMEOUT; - case 4004: return api::ReturnCode::BUSY; - case 5000: return api::ReturnCode::NO_SPACE; - case 5001: return api::ReturnCode::DISK_FAILURE; - case 5003: return api::ReturnCode::IO_FAILURE; - case 6000: return api::ReturnCode::INTERNAL_FAILURE; - default: return api::ReturnCode::INTERNAL_FAILURE; - } -} - -} // mbusprot -} // storage - diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization.cpp index 4614659c458..59ea60e6e0d 100644 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization.cpp +++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization.cpp @@ -1,16 +1,16 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "protocolserialization4_2.h" +#include "protocolserialization7.h" #include "serializationhelper.h" #include "storagecommand.h" #include "storagereply.h" #include <vespa/storageapi/message/bucketsplitting.h> -#include <vespa/storageapi/message/visitor.h> +#include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> #include <vespa/storageapi/message/stat.h> +#include <vespa/storageapi/message/visitor.h> #include <vespa/vespalib/util/exceptions.h> - #include <sstream> #include <vespa/log/log.h> diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp deleted file mode 100644 index 1bea0ea74c9..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp +++ /dev/null @@ -1,553 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "protocolserialization4_2.h" -#include "oldreturncodemapper.h" -#include "serializationhelper.h" -#include "storagecommand.h" -#include "storagereply.h" - -#include <vespa/storageapi/message/bucketsplitting.h> -#include <vespa/storageapi/message/visitor.h> -#include <vespa/storageapi/message/removelocation.h> -#include <vespa/vespalib/util/exceptions.h> -#include <vespa/document/fieldset/fieldsets.h> - -#include <vespa/log/log.h> -LOG_SETUP(".storage.api.mbusprot.serialization.4_2"); - -using document::BucketSpace; -using document::AllFields; - -namespace storage::mbusprot { - -ProtocolSerialization4_2::ProtocolSerialization4_2( - const std::shared_ptr<const document::DocumentTypeRepo>& repo) - : LegacyProtocolSerialization(repo) -{ -} - -void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::GetCommand& msg) const -{ - buf.putString(msg.getDocumentId().toString()); - putBucket(msg.getBucket(), buf); - buf.putLong(msg.getBeforeTimestamp()); - buf.putBoolean(false); - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeGetCommand(BBuf& buf) const -{ - document::DocumentId did(SH::getString(buf)); - document::Bucket bucket = getBucket(buf); - api::Timestamp beforeTimestamp(SH::getLong(buf)); - bool headerOnly(SH::getBoolean(buf)); // Ignored header only flag - (void) headerOnly; - auto msg = std::make_unique<api::GetCommand>(bucket, did, AllFields::NAME, beforeTimestamp); - onDecodeCommand(buf, *msg); - return msg; -} - -void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::RemoveCommand& msg) const -{ - buf.putString(msg.getDocumentId().toString()); - putBucket(msg.getBucket(), buf); - buf.putLong(msg.getTimestamp()); - onEncodeBucketInfoCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeRemoveCommand(BBuf& buf) const -{ - document::DocumentId did(SH::getString(buf)); - document::Bucket bucket = getBucket(buf); - api::Timestamp timestamp(SH::getLong(buf)); - auto msg = std::make_unique<api::RemoveCommand>(bucket, did, timestamp); - onDecodeBucketInfoCommand(buf, *msg); - return msg; -} - -void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::RevertCommand& msg) const -{ - putBucket(msg.getBucket(), buf); - buf.putInt(msg.getRevertTokens().size()); - for (uint32_t i=0, n=msg.getRevertTokens().size(); i<n; ++i) { - buf.putLong(msg.getRevertTokens()[i]); - } - onEncodeBucketInfoCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeRevertCommand(BBuf& buf) const -{ - document::Bucket bucket = getBucket(buf); - std::vector<api::Timestamp> tokens(SH::getInt(buf)); - for (uint32_t i=0, n=tokens.size(); i<n; ++i) { - tokens[i] = SH::getLong(buf); - } - auto msg = std::make_unique<api::RevertCommand>(bucket, tokens); - onDecodeBucketInfoCommand(buf, *msg); - return msg; -} - -void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const -{ - putBucket(msg.getBucket(), buf); - onEncodeBucketInfoCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeCreateBucketCommand(BBuf& buf) const -{ - document::Bucket bucket = getBucket(buf); - auto msg = std::make_unique<api::CreateBucketCommand>(bucket); - onDecodeBucketInfoCommand(buf, *msg); - return msg; -} - -void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const -{ - putBucket(msg.getBucket(), buf); - const std::vector<api::MergeBucketCommand::Node>& nodes(msg.getNodes()); - buf.putShort(nodes.size()); - for (uint32_t i=0; i<nodes.size(); ++i) { - buf.putShort(nodes[i].index); - buf.putBoolean(nodes[i].sourceOnly); - } - buf.putLong(msg.getMaxTimestamp()); - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeMergeBucketCommand(BBuf& buf) const -{ - typedef api::MergeBucketCommand::Node Node; - document::Bucket bucket = getBucket(buf); - uint16_t nodeCount = SH::getShort(buf); - std::vector<Node> nodes; - nodes.reserve(nodeCount); - for (uint32_t i=0; i<nodeCount; ++i) { - uint16_t index(SH::getShort(buf)); - bool sourceOnly = SH::getBoolean(buf); - nodes.push_back(Node(index, sourceOnly)); - } - api::Timestamp timestamp(SH::getLong(buf)); - auto msg = std::make_unique<api::MergeBucketCommand>(bucket, nodes, timestamp); - onDecodeCommand(buf, *msg); - return msg; -} - -void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::GetBucketDiffCommand& msg) const -{ - putBucket(msg.getBucket(), buf); - const std::vector<api::MergeBucketCommand::Node>& nodes(msg.getNodes()); - buf.putShort(nodes.size()); - for (uint32_t i=0; i<nodes.size(); ++i) { - buf.putShort(nodes[i].index); - buf.putBoolean(nodes[i].sourceOnly); - } - buf.putLong(msg.getMaxTimestamp()); - const std::vector<api::GetBucketDiffCommand::Entry>& entries(msg.getDiff()); - buf.putInt(entries.size()); - for (uint32_t i=0; i<entries.size(); ++i) { - onEncodeDiffEntry(buf, entries[i]); - } - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeGetBucketDiffCommand(BBuf& buf) const -{ - typedef api::MergeBucketCommand::Node Node; - document::Bucket bucket = getBucket(buf); - uint16_t nodeCount = SH::getShort(buf); - std::vector<Node> nodes; - nodes.reserve(nodeCount); - for (uint32_t i=0; i<nodeCount; ++i) { - uint16_t index(SH::getShort(buf)); - bool sourceOnly = SH::getBoolean(buf); - nodes.push_back(Node(index, sourceOnly)); - } - api::Timestamp timestamp = SH::getLong(buf); - auto msg = std::make_unique<api::GetBucketDiffCommand>(bucket, nodes, timestamp); - std::vector<api::GetBucketDiffCommand::Entry>& entries(msg->getDiff()); - uint32_t entryCount = SH::getInt(buf); - if (entryCount > buf.getRemaining()) { - // Trigger out of bounds exception rather than out of memory error - buf.incPos(entryCount); - } - entries.resize(entryCount); - for (uint32_t i=0; i<entries.size(); ++i) { - onDecodeDiffEntry(buf, entries[i]); - } - onDecodeCommand(buf, *msg); - return msg; -} - -void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const -{ - putBucket(msg.getBucket(), buf); - const std::vector<api::MergeBucketCommand::Node>& nodes(msg.getNodes()); - buf.putShort(nodes.size()); - for (uint32_t i=0; i<nodes.size(); ++i) { - buf.putShort(nodes[i].index); - buf.putBoolean(nodes[i].sourceOnly); - } - buf.putInt(0x400000); - const std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg.getDiff()); - buf.putInt(entries.size()); - for (uint32_t i=0; i<entries.size(); ++i) { - onEncodeDiffEntry(buf, entries[i]._entry); - buf.putString(entries[i]._docName); - buf.putInt(entries[i]._headerBlob.size()); - buf.putBytes(&entries[i]._headerBlob[0], entries[i]._headerBlob.size()); - buf.putInt(entries[i]._bodyBlob.size()); - buf.putBytes(&entries[i]._bodyBlob[0], entries[i]._bodyBlob.size()); - } - onEncodeBucketInfoCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeApplyBucketDiffCommand(BBuf& buf) const -{ - typedef api::MergeBucketCommand::Node Node; - document::Bucket bucket = getBucket(buf); - uint16_t nodeCount = SH::getShort(buf); - std::vector<Node> nodes; - nodes.reserve(nodeCount); - for (uint32_t i=0; i<nodeCount; ++i) { - uint16_t index(SH::getShort(buf)); - bool sourceOnly = SH::getBoolean(buf); - nodes.push_back(Node(index, sourceOnly)); - } - (void) SH::getInt(buf); // Unused field - auto msg = std::make_unique<api::ApplyBucketDiffCommand>(bucket, nodes); - std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg->getDiff()); - uint32_t entryCount = SH::getInt(buf); - if (entryCount > buf.getRemaining()) { - // Trigger out of bounds exception rather than out of memory error - buf.incPos(entryCount); - } - entries.resize(entryCount); - for (uint32_t i=0; i<entries.size(); ++i) { - onDecodeDiffEntry(buf, entries[i]._entry); - entries[i]._docName = SH::getString(buf); - uint32_t headerSize = SH::getInt(buf); - if (headerSize > buf.getRemaining()) { - buf.incPos(headerSize); - } - entries[i]._headerBlob.resize(headerSize); - buf.getBytes(&entries[i]._headerBlob[0], entries[i]._headerBlob.size()); - uint32_t bodySize = SH::getInt(buf); - if (bodySize > buf.getRemaining()) { - buf.incPos(bodySize); - } - entries[i]._bodyBlob.resize(bodySize); - buf.getBytes(&entries[i]._bodyBlob[0], entries[i]._bodyBlob.size()); - } - onDecodeBucketInfoCommand(buf, *msg); - return msg; -} - -void -ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::RequestBucketInfoReply& msg) const -{ - buf.putInt(msg.getBucketInfo().size()); - for (const auto & entry : msg.getBucketInfo()) { - buf.putLong(entry._bucketId.getRawId()); - putBucketInfo(entry._info, buf); - } - onEncodeReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization4_2::onDecodeRequestBucketInfoReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::RequestBucketInfoReply>(static_cast<const api::RequestBucketInfoCommand&>(cmd)); - api::RequestBucketInfoReply::EntryVector & entries(msg->getBucketInfo()); - uint32_t entryCount = SH::getInt(buf); - if (entryCount > buf.getRemaining()) { - // Trigger out of bounds exception rather than out of memory error - buf.incPos(entryCount); - } - entries.resize(entryCount); - for (auto & entry : entries) { - entry._bucketId = document::BucketId(SH::getLong(buf)); - entry._info = getBucketInfo(buf); - } - onDecodeReply(buf, *msg); - return msg; -} - -void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::NotifyBucketChangeCommand& msg) const -{ - putBucket(msg.getBucket(), buf); - putBucketInfo(msg.getBucketInfo(), buf); - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeNotifyBucketChangeCommand(BBuf& buf) const -{ - document::Bucket bucket = getBucket(buf); - api::BucketInfo info(getBucketInfo(buf)); - auto msg = std::make_unique<api::NotifyBucketChangeCommand>(bucket, info); - onDecodeCommand(buf, *msg); - return msg; -} - -void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::NotifyBucketChangeReply& msg) const -{ - onEncodeReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization4_2::onDecodeNotifyBucketChangeReply(const SCmd& cmd,BBuf& buf) const -{ - auto msg = std::make_unique<api::NotifyBucketChangeReply>(static_cast<const api::NotifyBucketChangeCommand&>(cmd)); - onDecodeReply(buf, *msg); - return msg; -} - -void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::SplitBucketCommand& msg) const -{ - putBucket(msg.getBucket(), buf); - buf.putByte(msg.getMinSplitBits()); - buf.putByte(msg.getMaxSplitBits()); - buf.putInt(msg.getMinByteSize()); - buf.putInt(msg.getMinDocCount()); - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeSplitBucketCommand(BBuf& buf) const -{ - document::Bucket bucket = getBucket(buf); - auto msg = std::make_unique<api::SplitBucketCommand>(bucket); - msg->setMinSplitBits(SH::getByte(buf)); - msg->setMaxSplitBits(SH::getByte(buf)); - msg->setMinByteSize(SH::getInt(buf)); - msg->setMinDocCount(SH::getInt(buf)); - onDecodeCommand(buf, *msg); - return msg; -} - -void ProtocolSerialization4_2::onEncode(GBBuf&, const api::SetBucketStateCommand&) const -{ - throw vespalib::IllegalStateException("Unsupported serialization", VESPA_STRLOC); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeSetBucketStateCommand(BBuf&) const -{ - throw vespalib::IllegalStateException("Unsupported deserialization", VESPA_STRLOC); -} - -void ProtocolSerialization4_2::onEncode(GBBuf&, const api::SetBucketStateReply&) const -{ - throw vespalib::IllegalStateException("Unsupported serialization", VESPA_STRLOC); -} - -api::StorageReply::UP -ProtocolSerialization4_2::onDecodeSetBucketStateReply(const SCmd&, BBuf&) const -{ - throw vespalib::IllegalStateException("Unsupported deserialization", VESPA_STRLOC); -} - -void -ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const -{ - putBucketSpace(msg.getBucketSpace(), buf); - buf.putString(msg.getLibraryName()); - buf.putString(msg.getInstanceId()); - buf.putString(msg.getDocumentSelection()); - buf.putInt(msg.getVisitorCmdId()); - buf.putString(msg.getControlDestination()); - buf.putString(msg.getDataDestination()); - buf.putInt(msg.getMaximumPendingReplyCount()); - buf.putLong(msg.getFromTime()); - buf.putLong(msg.getToTime()); - - buf.putInt(msg.getBuckets().size()); - for (uint32_t i = 0; i < msg.getBuckets().size(); i++) { - buf.putLong(msg.getBuckets()[i].getRawId()); - } - - buf.putBoolean(msg.visitRemoves()); - buf.putBoolean(false); - buf.putBoolean(msg.visitInconsistentBuckets()); - buf.putInt(vespalib::count_ms(msg.getQueueTimeout())); - msg.getParameters().serialize(buf); - - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeCreateVisitorCommand(BBuf& buf) const -{ - BucketSpace bucketSpace = getBucketSpace(buf); - vespalib::stringref libraryName = SH::getString(buf); - vespalib::stringref instanceId = SH::getString(buf); - vespalib::stringref selection = SH::getString(buf); - auto msg = std::make_unique<api::CreateVisitorCommand>(bucketSpace, libraryName, instanceId, selection); - msg->setVisitorCmdId(SH::getInt(buf)); - msg->setControlDestination(SH::getString(buf)); - msg->setDataDestination(SH::getString(buf)); - msg->setMaximumPendingReplyCount(SH::getInt(buf)); - - msg->setFromTime(SH::getLong(buf)); - msg->setToTime(SH::getLong(buf)); - uint32_t count = SH::getInt(buf); - - if (count > buf.getRemaining()) { - // Trigger out of bounds exception rather than out of memory error - buf.incPos(count); - } - - for (uint32_t i = 0; i < count; i++) { - msg->getBuckets().push_back(document::BucketId(SH::getLong(buf))); - } - - if (SH::getBoolean(buf)) { - msg->setVisitRemoves(); - } - if (SH::getBoolean(buf)) { - msg->setFieldSet(AllFields::NAME); - } - if (SH::getBoolean(buf)) { - msg->setVisitInconsistentBuckets(); - } - msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf))); - msg->getParameters().deserialize(buf); - - onDecodeCommand(buf, *msg); - msg->setVisitorDispatcherVersion(42); - return msg; -} - -void -ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::DestroyVisitorCommand& msg) const -{ - buf.putString(msg.getInstanceId()); - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeDestroyVisitorCommand(BBuf& buf) const -{ - vespalib::stringref instanceId = SH::getString(buf); - auto msg = std::make_unique<api::DestroyVisitorCommand>(instanceId); - onDecodeCommand(buf, *msg); - return msg; -} - -void -ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::DestroyVisitorReply& msg) const -{ - onEncodeReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization4_2::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::DestroyVisitorReply>(static_cast<const api::DestroyVisitorCommand&>(cmd)); - onDecodeReply(buf, *msg); - return msg; -} - -void -ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const -{ - buf.putString(msg.getDocumentSelection()); - putBucket(msg.getBucket(), buf); - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeRemoveLocationCommand(BBuf& buf) const -{ - vespalib::stringref documentSelection = SH::getString(buf); - document::Bucket bucket = getBucket(buf); - - auto msg = std::make_unique<api::RemoveLocationCommand>(documentSelection, bucket); - onDecodeCommand(buf, *msg); - return msg; -} - -void -ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const -{ - onEncodeBucketInfoReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization4_2::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::RemoveLocationReply>(static_cast<const api::RemoveLocationCommand&>(cmd)); - onDecodeBucketInfoReply(buf, *msg); - return msg; -} - -void ProtocolSerialization4_2::onEncode(GBBuf&, const api::StatBucketCommand&) const { - throw vespalib::IllegalStateException("StatBucketCommand not expected for legacy protocol version", VESPA_STRLOC); -} - -api::StorageCommand::UP -ProtocolSerialization4_2::onDecodeStatBucketCommand(BBuf&) const { - throw vespalib::IllegalStateException("StatBucketCommand not expected for legacy protocol version", VESPA_STRLOC); -} - -void ProtocolSerialization4_2::onEncode(GBBuf&, const api::StatBucketReply&) const { - throw vespalib::IllegalStateException("StatBucketReply not expected for legacy protocol version", VESPA_STRLOC); -} - -api::StorageReply::UP -ProtocolSerialization4_2::onDecodeStatBucketReply(const SCmd&, BBuf&) const { - throw vespalib::IllegalStateException("StatBucketReply not expected for legacy protocol version", VESPA_STRLOC); -} - -// Utility functions for serialization - -void -ProtocolSerialization4_2::onEncodeBucketInfoCommand(GBBuf& buf, const api::BucketInfoCommand& msg) const -{ - onEncodeCommand(buf, msg); -} - -void -ProtocolSerialization4_2::onDecodeBucketInfoCommand(BBuf& buf, api::BucketInfoCommand& msg) const -{ - onDecodeCommand(buf, msg); -} - -void -ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::ReturnCode& rc) const -{ - // Convert error code to codes used in 4.2 - buf.putInt(getOldErrorCode(rc.getResult())); - buf.putString(rc.getMessage()); -} - -void -ProtocolSerialization4_2::onEncodeDiffEntry(GBBuf& buf, const api::GetBucketDiffCommand::Entry& entry) const -{ - buf.putLong(entry._timestamp); - SH::putGlobalId(entry._gid, buf); - buf.putInt(entry._headerSize); - buf.putInt(entry._bodySize); - buf.putShort(entry._flags); - buf.putShort(entry._hasMask); -} - -void -ProtocolSerialization4_2::onDecodeDiffEntry(BBuf& buf, api::GetBucketDiffCommand::Entry& entry) const -{ - entry._timestamp = SH::getLong(buf); - entry._gid = SH::getGlobalId(buf); - entry._headerSize = SH::getInt(buf); - entry._bodySize = SH::getInt(buf); - entry._flags = SH::getShort(buf); - entry._hasMask = SH::getShort(buf); -} - -} diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.h b/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.h deleted file mode 100644 index b8a20e9e401..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.h +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "legacyprotocolserialization.h" - -namespace storage::mbusprot { - -class ProtocolSerialization4_2 : public LegacyProtocolSerialization { -public: - explicit ProtocolSerialization4_2(const std::shared_ptr<const document::DocumentTypeRepo>&); - -protected: - void onEncode(GBBuf&, const api::GetCommand&) const override; - void onEncode(GBBuf&, const api::RemoveCommand&) const override; - void onEncode(GBBuf&, const api::RevertCommand&) const override; - void onEncode(GBBuf&, const api::CreateBucketCommand&) const override; - void onEncode(GBBuf&, const api::MergeBucketCommand&) const override; - void onEncode(GBBuf&, const api::GetBucketDiffCommand&) const override; - void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const override; - void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const override; - void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const override; - void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const override; - void onEncode(GBBuf&, const api::SplitBucketCommand&) const override; - void onEncode(GBBuf&, const api::CreateVisitorCommand&) const override; - void onEncode(GBBuf&, const api::DestroyVisitorCommand&) const override; - void onEncode(GBBuf&, const api::DestroyVisitorReply&) const override; - void onEncode(GBBuf&, const api::RemoveLocationCommand&) const override; - void onEncode(GBBuf&, const api::RemoveLocationReply&) const override; - void onEncode(GBBuf&, const api::StatBucketCommand&) const override; - void onEncode(GBBuf&, const api::StatBucketReply&) const override; - - // Not supported on 4.2, but implemented here for simplicity. - void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override; - void onEncode(GBBuf&, const api::SetBucketStateReply&) const override; - - virtual void onEncodeBucketInfoCommand(GBBuf&, const api::BucketInfoCommand&) const; - virtual void onEncodeBucketInfoReply(GBBuf&, const api::BucketInfoReply&) const = 0; - virtual void onEncodeCommand(GBBuf&, const api::StorageCommand&) const = 0; - virtual void onEncodeReply(GBBuf&, const api::StorageReply&) const = 0; - - virtual void onEncodeDiffEntry(GBBuf&, const api::GetBucketDiffCommand::Entry&) const; - virtual void onEncode(GBBuf&, const api::ReturnCode&) const; - SCmd::UP onDecodeGetCommand(BBuf&) const override; - SCmd::UP onDecodeRemoveCommand(BBuf&) const override; - SCmd::UP onDecodeRevertCommand(BBuf&) const override; - SCmd::UP onDecodeCreateBucketCommand(BBuf&) const override; - SCmd::UP onDecodeMergeBucketCommand(BBuf&) const override; - SCmd::UP onDecodeGetBucketDiffCommand(BBuf&) const override; - SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const override; - SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const override; - SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const override; - SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const override; - SCmd::UP onDecodeSplitBucketCommand(BBuf&) const override; - SCmd::UP onDecodeSetBucketStateCommand(BBuf&) const override; - SRep::UP onDecodeSetBucketStateReply(const SCmd&, BBuf&) const override; - SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override; - SCmd::UP onDecodeDestroyVisitorCommand(BBuf&) const override; - SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const override; - SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override; - SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override; - SCmd::UP onDecodeStatBucketCommand(BBuf&) const override; - SRep::UP onDecodeStatBucketReply(const SCmd&, BBuf&) const override; - - virtual void onDecodeBucketInfoCommand(BBuf&, api::BucketInfoCommand&) const; - virtual void onDecodeBucketInfoReply(BBuf&, api::BucketInfoReply&) const = 0; - virtual void onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const = 0; - virtual void onDecodeReply(BBuf&, api::StorageReply&) const = 0; - - virtual void onDecodeDiffEntry(BBuf&, api::GetBucketDiffCommand::Entry&) const; -}; - -} diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.cpp deleted file mode 100644 index e49eb8842ff..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.cpp +++ /dev/null @@ -1,622 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "protocolserialization5_0.h" -#include "serializationhelper.h" -#include "storagecommand.h" -#include "storagereply.h" -#include <vespa/storageapi/message/bucketsplitting.h> -#include <vespa/storageapi/message/visitor.h> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <sstream> - -using document::BucketSpace; -using document::FixedBucketSpaces; - -namespace storage::mbusprot { - -document::Bucket -ProtocolSerialization5_0::getBucket(document::ByteBuffer& buf) const -{ - document::BucketId bucketId(SH::getLong(buf)); - return document::Bucket(FixedBucketSpaces::default_space(), bucketId); -} - -void -ProtocolSerialization5_0::putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const -{ - buf.putLong(bucket.getBucketId().getRawId()); - if (bucket.getBucketSpace() != FixedBucketSpaces::default_space()) { - std::ostringstream ost; - ost << "Bucket with bucket space " << bucket.getBucketSpace() << " cannot be serialized on old storageapi protocol."; - throw vespalib::IllegalArgumentException(ost.str(), VESPA_STRLOC); - } -} - -document::BucketSpace -ProtocolSerialization5_0::getBucketSpace(document::ByteBuffer&) const -{ - return FixedBucketSpaces::default_space(); -} - -void -ProtocolSerialization5_0::putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer&) const -{ - if (bucketSpace != FixedBucketSpaces::default_space()) { - std::ostringstream ost; - ost << "Bucket space " << bucketSpace << " cannot be serialized on old storageapi protocol."; - throw vespalib::IllegalArgumentException(ost.str(), VESPA_STRLOC); - } -} - -api::BucketInfo -ProtocolSerialization5_0::getBucketInfo(document::ByteBuffer& buf) const -{ - uint32_t crc(SH::getInt(buf)); - uint32_t doccount(SH::getInt(buf)); - uint32_t docsize(SH::getInt(buf)); - uint32_t metacount(SH::getInt(buf)); - uint32_t usedsize(SH::getInt(buf)); - return api::BucketInfo(crc, doccount, docsize, metacount, usedsize); -} - -void -ProtocolSerialization5_0::putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const -{ - buf.putInt(info.getChecksum()); - buf.putInt(info.getDocumentCount()); - buf.putInt(info.getTotalDocumentSize()); - buf.putInt(info.getMetaCount()); - buf.putInt(info.getUsedFileSize()); -} - -void -ProtocolSerialization5_0::onEncodeReply(GBBuf& buf, const api::StorageReply& msg) const -{ - SH::putReturnCode(msg.getResult(), buf); - buf.putLong(msg.getMsgId()); - buf.putByte(msg.getPriority()); -} - -void -ProtocolSerialization5_0::onDecodeReply(BBuf& buf, api::StorageReply& msg) const -{ - msg.setResult(SH::getReturnCode(buf)); - msg.forceMsgId(SH::getLong(buf)); - msg.setPriority(SH::getByte(buf)); -} - -void -ProtocolSerialization5_0::onEncodeCommand(GBBuf& buf, const api::StorageCommand& msg) const -{ - buf.putLong(msg.getMsgId()); - buf.putByte(msg.getPriority()); - buf.putShort(msg.getSourceIndex()); - buf.putInt(0); // LoadType 'default' -} - -void -ProtocolSerialization5_0::onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const -{ - msg.forceMsgId(SH::getLong(buf)); - uint8_t priority = SH::getByte(buf); - msg.setPriority(priority); - msg.setSourceIndex(SH::getShort(buf)); - (void)SH::getInt(buf); // LoadType -} - - -ProtocolSerialization5_0::ProtocolSerialization5_0(const std::shared_ptr<const document::DocumentTypeRepo>& repo) - : ProtocolSerialization4_2(repo) -{ -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::PutReply& msg) const -{ - buf.putBoolean(msg.wasFound()); - onEncodeBucketInfoReply(buf, msg); -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::PutCommand& msg) const -{ - SH::putDocument(msg.getDocument().get(), buf); - putBucket(msg.getBucket(), buf); - buf.putLong(msg.getTimestamp()); - buf.putLong(msg.getUpdateTimestamp()); - onEncodeBucketInfoCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization5_0::onDecodePutCommand(BBuf& buf) const -{ - document::Document::SP doc(SH::getDocument(buf, getTypeRepo())); - document::Bucket bucket = getBucket(buf); - api::Timestamp ts(SH::getLong(buf)); - auto msg = std::make_unique<api::PutCommand>(bucket, doc, ts); - msg->setUpdateTimestamp(SH::getLong(buf)); - onDecodeBucketInfoCommand(buf, *msg); - return msg; -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodePutReply(const SCmd& cmd, BBuf& buf) const -{ - bool wasFound = SH::getBoolean(buf); - auto msg = std::make_unique<api::PutReply>(static_cast<const api::PutCommand&>(cmd), wasFound); - onDecodeBucketInfoReply(buf, *msg); - return msg; -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::UpdateReply& msg) const -{ - buf.putLong(msg.getOldTimestamp()); - onEncodeBucketInfoReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeUpdateReply(const SCmd& cmd, BBuf& buf) const -{ - api::Timestamp oldTimestamp(SH::getLong(buf)); - auto msg = std::make_unique<api::UpdateReply>(static_cast<const api::UpdateCommand&>(cmd), oldTimestamp); - onDecodeBucketInfoReply(buf, *msg); - return msg; -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::GetReply& msg) const -{ - SH::putDocument(msg.getDocument().get(), buf); - // Old protocol version doesn't understand tombstones. Make it appear as Not Found. - buf.putLong(msg.is_tombstone() ? api::Timestamp(0) : msg.getLastModifiedTimestamp()); - onEncodeBucketInfoReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeGetReply(const SCmd& cmd, BBuf& buf) const -{ - try { - document::Document::SP doc(SH::getDocument(buf, getTypeRepo())); - api::Timestamp lastModified(SH::getLong(buf)); - auto msg = std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd), doc,lastModified); - onDecodeBucketInfoReply(buf, *msg); - return msg; - } catch (std::exception& e) { - auto msg = std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd), document::Document::SP(),0); - msg->setResult(api::ReturnCode(api::ReturnCode::UNPARSEABLE, e.what())); - return msg; - } -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::RemoveReply& msg) const -{ - buf.putLong(msg.getOldTimestamp()); - onEncodeBucketInfoReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeRemoveReply(const SCmd& cmd, BBuf& buf) const -{ - api::Timestamp oldTimestamp(SH::getLong(buf)); - auto msg = std::make_unique<api::RemoveReply>(static_cast<const api::RemoveCommand&>(cmd), oldTimestamp); - onDecodeBucketInfoReply(buf, *msg); - return msg; -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::UpdateCommand& msg) const -{ - document::DocumentUpdate* update = msg.getUpdate().get(); - if (update) { - vespalib::nbostream stream; - update->serializeHEAD(stream); - buf.putInt(stream.size()); - buf.putBytes(stream.peek(), stream.size()); - } else { - buf.putInt(0); - } - - putBucket(msg.getBucket(), buf); - buf.putLong(msg.getTimestamp()); - buf.putLong(msg.getOldTimestamp()); - onEncodeBucketInfoCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization5_0::onDecodeUpdateCommand(BBuf& buf) const -{ - document::DocumentUpdate::SP update; - - uint32_t size = SH::getInt(buf); - if (size != 0) { - update = document::DocumentUpdate::createHEAD(getTypeRepo(), vespalib::nbostream(buf.getBufferAtPos(), size)); - buf.incPos(size); - } - - document::Bucket bucket = getBucket(buf); - api::Timestamp timestamp(SH::getLong(buf)); - api::UpdateCommand::UP msg = std::make_unique<api::UpdateCommand>(bucket, update, timestamp); - msg->setOldTimestamp(SH::getLong(buf)); - onDecodeBucketInfoCommand(buf, *msg); - return msg; -} - - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::RevertReply& msg) const -{ - onEncodeBucketInfoReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeRevertReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::RevertReply>(static_cast<const api::RevertCommand&>(cmd)); - onDecodeBucketInfoReply(buf, *msg); - return msg; -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::CreateBucketReply& msg) const -{ - onEncodeBucketInfoReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeCreateBucketReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::CreateBucketReply>(static_cast<const api::CreateBucketCommand&>(cmd)); - onDecodeBucketInfoReply(buf, *msg); - return msg; -} - -void -ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const -{ - putBucket(msg.getBucket(), buf); - onEncodeBucketInfoCommand(buf, msg); - putBucketInfo(msg.getBucketInfo(), buf); -} - -api::StorageCommand::UP -ProtocolSerialization5_0::onDecodeDeleteBucketCommand(BBuf& buf) const -{ - document::Bucket bucket = getBucket(buf); - auto msg = std::make_unique<api::DeleteBucketCommand>(bucket); - onDecodeBucketInfoCommand(buf, *msg); - if (buf.getRemaining() >= SH::BUCKET_INFO_SERIALIZED_SIZE) { - msg->setBucketInfo(getBucketInfo(buf)); - } - return msg; -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::DeleteBucketReply& msg) const -{ - onEncodeBucketInfoReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeDeleteBucketReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::DeleteBucketReply>(static_cast<const api::DeleteBucketCommand&>(cmd)); - onDecodeBucketInfoReply(buf, *msg); - return msg; -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const -{ - ProtocolSerialization4_2::onEncode(buf, msg); - - buf.putInt(msg.getClusterStateVersion()); - const std::vector<uint16_t>& chain(msg.getChain()); - buf.putShort(chain.size()); - for (std::size_t i = 0; i < chain.size(); ++i) { - buf.putShort(chain[i]); - } -} - -api::StorageCommand::UP -ProtocolSerialization5_0::onDecodeMergeBucketCommand(BBuf& buf) const -{ - api::StorageCommand::UP cmd = ProtocolSerialization4_2::onDecodeMergeBucketCommand(buf); - uint32_t clusterStateVersion = SH::getInt(buf); - uint16_t chainSize = SH::getShort(buf); - std::vector<uint16_t> chain; - chain.reserve(chainSize); - for (std::size_t i = 0; i < chainSize; ++i) { - uint16_t index = SH::getShort(buf); - chain.push_back(index); - } - api::MergeBucketCommand& mergeCmd = static_cast<api::MergeBucketCommand&>(*cmd); - mergeCmd.setChain(chain); - mergeCmd.setClusterStateVersion(clusterStateVersion); - return cmd; -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::MergeBucketReply& msg) const -{ - onEncodeBucketReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeMergeBucketReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::MergeBucketReply>(static_cast<const api::MergeBucketCommand&>(cmd)); - onDecodeBucketReply(buf, *msg); - return msg; -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::GetBucketDiffReply& msg) const -{ - const std::vector<api::GetBucketDiffCommand::Entry>& entries(msg.getDiff()); - buf.putInt(entries.size()); - for (uint32_t i=0; i<entries.size(); ++i) { - onEncodeDiffEntry(buf, entries[i]); - } - onEncodeBucketReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeGetBucketDiffReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::GetBucketDiffReply>(static_cast<const api::GetBucketDiffCommand&>(cmd)); - std::vector<api::GetBucketDiffCommand::Entry>& entries(msg->getDiff()); - uint32_t entryCount = SH::getInt(buf); - if (entryCount > buf.getRemaining()) { - // Trigger out of bounds exception rather than out of memory error - buf.incPos(entryCount); - } - entries.resize(entryCount); - for (uint32_t i=0; i<entries.size(); ++i) { - onDecodeDiffEntry(buf, entries[i]); - } - onDecodeBucketReply(buf, *msg); - return msg; -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::ApplyBucketDiffReply& msg) const -{ - const std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg.getDiff()); - buf.putInt(entries.size()); - for (uint32_t i=0; i<entries.size(); ++i) { - onEncodeDiffEntry(buf, entries[i]._entry); - buf.putString(entries[i]._docName); - buf.putInt(entries[i]._headerBlob.size()); - buf.putBytes(&entries[i]._headerBlob[0], entries[i]._headerBlob.size()); - buf.putInt(entries[i]._bodyBlob.size()); - buf.putBytes(&entries[i]._bodyBlob[0], entries[i]._bodyBlob.size()); - } - onEncodeBucketInfoReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeApplyBucketDiffReply(const SCmd& cmd, - BBuf& buf) const -{ - auto msg = std::make_unique<api::ApplyBucketDiffReply>(static_cast<const api::ApplyBucketDiffCommand&>(cmd)); - std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg->getDiff()); - uint32_t entryCount = SH::getInt(buf); - if (entryCount > buf.getRemaining()) { - // Trigger out of bounds exception rather than out of memory error - buf.incPos(entryCount); - } - entries.resize(entryCount); - for (uint32_t i=0; i<entries.size(); ++i) { - onDecodeDiffEntry(buf, entries[i]._entry); - entries[i]._docName = SH::getString(buf); - uint32_t headerSize = SH::getInt(buf); - if (headerSize > buf.getRemaining()) { - buf.incPos(headerSize); - } - entries[i]._headerBlob.resize(headerSize); - buf.getBytes(&entries[i]._headerBlob[0], entries[i]._headerBlob.size()); - uint32_t bodySize = SH::getInt(buf); - if (bodySize > buf.getRemaining()) { - buf.incPos(bodySize); - } - entries[i]._bodyBlob.resize(bodySize); - buf.getBytes(&entries[i]._bodyBlob[0], entries[i]._bodyBlob.size()); - } - onDecodeBucketInfoReply(buf, *msg); - return msg; -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::SplitBucketReply& msg) const -{ - const std::vector<api::SplitBucketReply::Entry>& entries(msg.getSplitInfo()); - buf.putInt(entries.size()); - for (std::vector<api::SplitBucketReply::Entry>::const_iterator it - = entries.begin(); it != entries.end(); ++it) - { - buf.putLong(it->first.getRawId()); - putBucketInfo(it->second, buf); - } - onEncodeBucketReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeSplitBucketReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::SplitBucketReply>(static_cast<const api::SplitBucketCommand&>(cmd)); - std::vector<api::SplitBucketReply::Entry>& entries(msg->getSplitInfo()); - uint32_t targetCount = SH::getInt(buf); - if (targetCount > buf.getRemaining()) { - // Trigger out of bounds exception rather than out of memory error - buf.incPos(targetCount); - } - entries.resize(targetCount); - for (std::vector<api::SplitBucketReply::Entry>::iterator it - = entries.begin(); it != entries.end(); ++it) - { - it->first = document::BucketId(SH::getLong(buf)); - it->second = getBucketInfo(buf); - } - onDecodeBucketReply(buf, *msg); - return msg; -} - -void -ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const -{ - putBucket(msg.getBucket(), buf); - buf.putInt(msg.getSourceBuckets().size()); - for (uint32_t i=0, n=msg.getSourceBuckets().size(); i<n; ++i) { - buf.putLong(msg.getSourceBuckets()[i].getRawId()); - } - buf.putByte(msg.getMinJoinBits()); - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization5_0::onDecodeJoinBucketsCommand(BBuf& buf) const -{ - document::Bucket bucket = getBucket(buf); - auto msg = std::make_unique<api::JoinBucketsCommand>(bucket); - uint32_t size = SH::getInt(buf); - if (size > buf.getRemaining()) { - // Trigger out of bounds exception rather than out of memory error - buf.incPos(size); - } - std::vector<document::BucketId>& entries(msg->getSourceBuckets()); - for (uint32_t i=0; i<size; ++i) { - entries.push_back(document::BucketId(SH::getLong(buf))); - } - msg->setMinJoinBits(SH::getByte(buf)); - onDecodeCommand(buf, *msg); - return msg; -} - -void -ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::JoinBucketsReply& msg) const -{ - putBucketInfo(msg.getBucketInfo(), buf); - onEncodeBucketReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeJoinBucketsReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::JoinBucketsReply>(static_cast<const api::JoinBucketsCommand&>(cmd)); - msg->setBucketInfo(getBucketInfo(buf)); - onDecodeBucketReply(buf, *msg); - return msg; -} - -void -ProtocolSerialization5_0::onEncodeBucketInfoReply(GBBuf& buf, const api::BucketInfoReply& msg) const -{ - onEncodeBucketReply(buf, msg); - putBucketInfo(msg.getBucketInfo(), buf); -} - -void -ProtocolSerialization5_0::onDecodeBucketInfoReply(BBuf& buf, api::BucketInfoReply& msg) const -{ - onDecodeBucketReply(buf, msg); - msg.setBucketInfo(getBucketInfo(buf)); -} - -void -ProtocolSerialization5_0::onEncodeBucketReply(GBBuf& buf, const api::BucketReply& msg) const -{ - onEncodeReply(buf, msg); - buf.putLong(msg.hasBeenRemapped() ? msg.getBucketId().getRawId() : 0); -} - -void -ProtocolSerialization5_0::onDecodeBucketReply(BBuf& buf, api::BucketReply& msg) const -{ - onDecodeReply(buf, msg); - document::BucketId bucket(SH::getLong(buf)); - if (bucket.getRawId() != 0) { - msg.remapBucketId(bucket); - } -} - -void -ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::CreateVisitorReply& msg) const -{ - onEncodeReply(buf, msg); - buf.putInt(msg.getVisitorStatistics().getBucketsVisited()); - buf.putLong(msg.getVisitorStatistics().getDocumentsVisited()); - buf.putLong(msg.getVisitorStatistics().getBytesVisited()); - buf.putLong(msg.getVisitorStatistics().getDocumentsReturned()); - buf.putLong(msg.getVisitorStatistics().getBytesReturned()); - buf.putLong(0); - buf.putLong(0); -} - -api::StorageReply::UP -ProtocolSerialization5_0::onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::CreateVisitorReply>(static_cast<const api::CreateVisitorCommand&>(cmd)); - onDecodeReply(buf, *msg); - - vdslib::VisitorStatistics vs; - vs.setBucketsVisited(SH::getInt(buf)); - vs.setDocumentsVisited(SH::getLong(buf)); - vs.setBytesVisited(SH::getLong(buf)); - vs.setDocumentsReturned(SH::getLong(buf)); - vs.setBytesReturned(SH::getLong(buf)); - SH::getLong(buf); - SH::getLong(buf); - msg->setVisitorStatistics(vs); - - return msg; -} - -void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const -{ - const std::vector<document::BucketId>& buckets(msg.getBuckets()); - buf.putInt(buckets.size()); - for (uint32_t i=0; i<buckets.size(); ++i) { - buf.putLong(buckets[i].getRawId()); - } - putBucketSpace(msg.getBucketSpace(), buf); - if (buckets.size() == 0) { - buf.putShort(msg.getDistributor()); - buf.putString(msg.getSystemState().toString()); - buf.putString(msg.getDistributionHash()); - } - - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization5_0::onDecodeRequestBucketInfoCommand(BBuf& buf) const -{ - std::vector<document::BucketId> buckets(SH::getInt(buf)); - for (uint32_t i=0; i<buckets.size(); ++i) { - buckets[i] = document::BucketId(SH::getLong(buf)); - } - api::RequestBucketInfoCommand::UP msg; - BucketSpace bucketSpace = getBucketSpace(buf); - if (buckets.size() != 0) { - msg.reset(new api::RequestBucketInfoCommand(bucketSpace, buckets)); - } else { - int distributor = SH::getShort(buf); - lib::ClusterState state(SH::getString(buf)); - msg.reset(new api::RequestBucketInfoCommand(bucketSpace, distributor, state, SH::getString(buf))); - } - onDecodeCommand(buf, *msg); - return msg; -} - -void -ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::CreateVisitorCommand& cmd) const -{ - ProtocolSerialization4_2::onEncode(buf, cmd); - - buf.putInt(0); // Unused - buf.putInt(cmd.getMaxBucketsPerVisitor()); -} - -api::StorageCommand::UP -ProtocolSerialization5_0::onDecodeCreateVisitorCommand(BBuf& buf) const -{ - api::StorageCommand::UP cvc = ProtocolSerialization4_2::onDecodeCreateVisitorCommand(buf); - SH::getInt(buf); // Unused - - static_cast<api::CreateVisitorCommand*>(cvc.get())->setMaxBucketsPerVisitor(SH::getInt(buf)); - - static_cast<api::CreateVisitorCommand*>(cvc.get())->setVisitorDispatcherVersion(50); - return cvc; -} - -} diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.h b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.h deleted file mode 100644 index 8de2edab17f..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.h +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "protocolserialization4_2.h" - -namespace storage::mbusprot { - -class ProtocolSerialization5_0 : public ProtocolSerialization4_2 { -public: - ProtocolSerialization5_0(const std::shared_ptr<const document::DocumentTypeRepo>&); - - document::Bucket getBucket(document::ByteBuffer& buf) const override; - void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const override; - document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const override; - void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const override; - api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const override; - void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const override; - - void onEncode(GBBuf&, const api::PutCommand&) const override; - void onEncode(GBBuf&, const api::PutReply&) const override; - void onEncode(GBBuf&, const api::UpdateCommand&) const override; - void onEncode(GBBuf&, const api::UpdateReply&) const override; - void onEncode(GBBuf&, const api::GetReply&) const override; - void onEncode(GBBuf&, const api::RemoveReply&) const override; - void onEncode(GBBuf&, const api::RevertReply&) const override; - void onEncode(GBBuf&, const api::CreateBucketReply&) const override; - void onEncode(GBBuf&, const api::DeleteBucketCommand&) const override; - void onEncode(GBBuf&, const api::DeleteBucketReply&) const override; - void onEncode(GBBuf&, const api::MergeBucketCommand&) const override; - void onEncode(GBBuf&, const api::MergeBucketReply&) const override; - void onEncode(GBBuf&, const api::GetBucketDiffReply&) const override; - void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const override; - void onEncode(GBBuf&, const api::SplitBucketReply&) const override; - void onEncode(GBBuf&, const api::JoinBucketsCommand&) const override; - void onEncode(GBBuf&, const api::JoinBucketsReply&) const override; - void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const override; - - void onEncodeBucketInfoReply(GBBuf&, const api::BucketInfoReply&) const override; - virtual void onEncodeBucketReply(GBBuf&, const api::BucketReply&) const; - - void onEncode(GBBuf&, const api::CreateVisitorCommand& msg) const override; - void onEncode(GBBuf&, const api::CreateVisitorReply& msg) const override; - void onEncodeCommand(GBBuf&, const api::StorageCommand&) const override; - void onEncodeReply(GBBuf&, const api::StorageReply&) const override; - - SCmd::UP onDecodePutCommand(BBuf&) const override; - SRep::UP onDecodePutReply(const SCmd&, BBuf&) const override; - SCmd::UP onDecodeUpdateCommand(BBuf&) const override; - SRep::UP onDecodeUpdateReply(const SCmd&, BBuf&) const override; - SRep::UP onDecodeGetReply(const SCmd&, BBuf&) const override; - SRep::UP onDecodeRemoveReply(const SCmd&, BBuf&) const override; - SRep::UP onDecodeRevertReply(const SCmd&, BBuf&) const override; - SRep::UP onDecodeCreateBucketReply(const SCmd&, BBuf&) const override; - SCmd::UP onDecodeDeleteBucketCommand(BBuf&) const override; - SRep::UP onDecodeDeleteBucketReply(const SCmd&, BBuf&) const override; - SCmd::UP onDecodeMergeBucketCommand(BBuf&) const override; - SRep::UP onDecodeMergeBucketReply(const SCmd&, BBuf&) const override; - SRep::UP onDecodeGetBucketDiffReply(const SCmd&, BBuf&) const override; - SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const override; - SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const override; - SCmd::UP onDecodeJoinBucketsCommand(BBuf& buf) const override; - SRep::UP onDecodeJoinBucketsReply(const SCmd& cmd, BBuf& buf) const override; - SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override; - SCmd::UP onDecodeRequestBucketInfoCommand(BBuf& buf) const override; - - void onDecodeBucketInfoReply(BBuf&, api::BucketInfoReply&) const override; - virtual void onDecodeBucketReply(BBuf&, api::BucketReply&) const; - SRep::UP onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const override; - void onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const override; - void onDecodeReply(BBuf&, api::StorageReply&) const override; -}; - -} diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp deleted file mode 100644 index 97ceceac33d..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "protocolserialization5_1.h" -#include "serializationhelper.h" -#include "storagecommand.h" -#include "storagereply.h" -#include <vespa/storageapi/message/bucketsplitting.h> -#include <vespa/storageapi/message/visitor.h> - -using document::BucketSpace; - -namespace storage::mbusprot { - -api::BucketInfo -ProtocolSerialization5_1::getBucketInfo(document::ByteBuffer& buf) const -{ - uint64_t lastModified(SH::getLong(buf)); - uint32_t crc(SH::getInt(buf)); - uint32_t doccount(SH::getInt(buf)); - uint32_t docsize(SH::getInt(buf)); - uint32_t metacount(SH::getInt(buf)); - uint32_t usedsize(SH::getInt(buf)); - uint8_t flags(SH::getByte(buf)); - bool ready = (flags & BUCKET_READY) != 0; - bool active = (flags & BUCKET_ACTIVE) != 0; - return api::BucketInfo(crc, doccount, docsize, - metacount, usedsize, - ready, active, lastModified); -} - -void -ProtocolSerialization5_1::putBucketInfo( - const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const -{ - buf.putLong(info.getLastModified()); - buf.putInt(info.getChecksum()); - buf.putInt(info.getDocumentCount()); - buf.putInt(info.getTotalDocumentSize()); - buf.putInt(info.getMetaCount()); - buf.putInt(info.getUsedFileSize()); - uint8_t flags = (info.isReady() ? BUCKET_READY : 0) | - (info.isActive() ? BUCKET_ACTIVE : 0); - buf.putByte(flags); -} - -ProtocolSerialization5_1::ProtocolSerialization5_1( - const std::shared_ptr<const document::DocumentTypeRepo>& repo) - : ProtocolSerialization5_0(repo) -{ -} - -void ProtocolSerialization5_1::onEncode(GBBuf& buf, const api::SetBucketStateCommand& msg) const -{ - putBucket(msg.getBucket(), buf); - buf.putByte(static_cast<uint8_t>(msg.getState())); - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization5_1::onDecodeSetBucketStateCommand(BBuf& buf) const -{ - document::Bucket bucket = getBucket(buf); - api::SetBucketStateCommand::BUCKET_STATE state( - static_cast<api::SetBucketStateCommand::BUCKET_STATE>(SH::getByte(buf))); - auto msg = std::make_unique<api::SetBucketStateCommand>(bucket, state); - onDecodeCommand(buf, *msg); - return msg; -} - -void ProtocolSerialization5_1::onEncode(GBBuf& buf, const api::SetBucketStateReply& msg) const -{ - onEncodeBucketReply(buf, msg); -} - -api::StorageReply::UP -ProtocolSerialization5_1::onDecodeSetBucketStateReply(const SCmd& cmd, BBuf& buf) const -{ - auto msg = std::make_unique<api::SetBucketStateReply>(static_cast<const api::SetBucketStateCommand&>(cmd)); - onDecodeBucketReply(buf, *msg); - return msg; -} - -void ProtocolSerialization5_1::onEncode(GBBuf& buf, const api::GetCommand& msg) const -{ - buf.putString(msg.getDocumentId().toString()); - putBucket(msg.getBucket(), buf); - buf.putLong(msg.getBeforeTimestamp()); - buf.putString(msg.getFieldSet()); - onEncodeCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization5_1::onDecodeGetCommand(BBuf& buf) const -{ - document::DocumentId did(SH::getString(buf)); - document::Bucket bucket = getBucket(buf); - api::Timestamp beforeTimestamp(SH::getLong(buf)); - std::string fieldSet(SH::getString(buf)); - auto msg = std::make_unique<api::GetCommand>(bucket, did, fieldSet, beforeTimestamp); - onDecodeCommand(buf, *msg); - return msg; -} - -void -ProtocolSerialization5_1::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const -{ - putBucketSpace(msg.getBucketSpace(), buf); - buf.putString(msg.getLibraryName()); - buf.putString(msg.getInstanceId()); - buf.putString(msg.getDocumentSelection()); - buf.putInt(msg.getVisitorCmdId()); - buf.putString(msg.getControlDestination()); - buf.putString(msg.getDataDestination()); - buf.putInt(msg.getMaximumPendingReplyCount()); - buf.putLong(msg.getFromTime()); - buf.putLong(msg.getToTime()); - - buf.putInt(msg.getBuckets().size()); - for (uint32_t i = 0; i < msg.getBuckets().size(); i++) { - buf.putLong(msg.getBuckets()[i].getRawId()); - } - - buf.putBoolean(msg.visitRemoves()); - buf.putString(msg.getFieldSet()); - buf.putBoolean(msg.visitInconsistentBuckets()); - buf.putInt(vespalib::count_ms(msg.getQueueTimeout())); - msg.getParameters().serialize(buf); - - onEncodeCommand(buf, msg); - - buf.putInt(0); // Unused - buf.putInt(msg.getMaxBucketsPerVisitor()); -} - -api::StorageCommand::UP -ProtocolSerialization5_1::onDecodeCreateVisitorCommand(BBuf& buf) const -{ - BucketSpace bucketSpace = getBucketSpace(buf); - vespalib::stringref libraryName = SH::getString(buf); - vespalib::stringref instanceId = SH::getString(buf); - vespalib::stringref selection = SH::getString(buf); - auto msg = std::make_unique<api::CreateVisitorCommand>(bucketSpace, libraryName, instanceId, selection); - msg->setVisitorCmdId(SH::getInt(buf)); - msg->setControlDestination(SH::getString(buf)); - msg->setDataDestination(SH::getString(buf)); - msg->setMaximumPendingReplyCount(SH::getInt(buf)); - - msg->setFromTime(SH::getLong(buf)); - msg->setToTime(SH::getLong(buf)); - uint32_t count = SH::getInt(buf); - - if (count > buf.getRemaining()) { - // Trigger out of bounds exception rather than out of memory error - buf.incPos(count); - } - - for (uint32_t i = 0; i < count; i++) { - msg->getBuckets().push_back(document::BucketId(SH::getLong(buf))); - } - - if (SH::getBoolean(buf)) { - msg->setVisitRemoves(); - } - - msg->setFieldSet(SH::getString(buf)); - - if (SH::getBoolean(buf)) { - msg->setVisitInconsistentBuckets(); - } - msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf))); - msg->getParameters().deserialize(buf); - - onDecodeCommand(buf, *msg); - SH::getInt(buf); // Unused - msg->setMaxBucketsPerVisitor(SH::getInt(buf)); - msg->setVisitorDispatcherVersion(50); - return msg; -} - -void ProtocolSerialization5_1::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const -{ - putBucket(msg.getBucket(), buf); - buf.putBoolean(msg.getActive()); - onEncodeBucketInfoCommand(buf, msg); -} - -api::StorageCommand::UP -ProtocolSerialization5_1::onDecodeCreateBucketCommand(BBuf& buf) const -{ - document::Bucket bucket = getBucket(buf); - bool setActive = SH::getBoolean(buf); - auto msg = std::make_unique<api::CreateBucketCommand>(bucket); - msg->setActive(setActive); - onDecodeBucketInfoCommand(buf, *msg); - return msg; -} - -} diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.h b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.h deleted file mode 100644 index bfad492e653..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.h +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "protocolserialization5_0.h" - -namespace storage::mbusprot { - -class ProtocolSerialization5_1 : public ProtocolSerialization5_0 -{ - enum BucketState { - BUCKET_READY = 0x1, - BUCKET_ACTIVE = 0x2, - }; -public: - ProtocolSerialization5_1(const std::shared_ptr<const document::DocumentTypeRepo>&); - - api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const override; - void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const override; - -protected: - void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override; - void onEncode(GBBuf&, const api::SetBucketStateReply&) const override; - void onEncode(GBBuf&, const api::GetCommand&) const override; - void onEncode(GBBuf&, const api::CreateVisitorCommand&) const override; - void onEncode(GBBuf&, const api::CreateBucketCommand&) const override; - - SCmd::UP onDecodeSetBucketStateCommand(BBuf&) const override; - SRep::UP onDecodeSetBucketStateReply(const SCmd&, BBuf&) const override; - SCmd::UP onDecodeGetCommand(BBuf&) const override; - SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override; - SCmd::UP onDecodeCreateBucketCommand(BBuf&) const override; -}; - -} diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.cpp deleted file mode 100644 index 10aa9d69fbc..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.cpp +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// @author Vegard Sjonfjell - -#include "protocolserialization5_2.h" -#include "storagecommand.h" -#include "serializationhelper.h" - -namespace storage::mbusprot { - -using documentapi::TestAndSetCondition; - -void ProtocolSerialization5_2::onEncode(GBBuf & buf, const api::PutCommand & cmd) const -{ - ProtocolSerialization5_0::onEncode(buf, cmd); - encodeTasCondition(buf, cmd); -} - -api::StorageCommand::UP -ProtocolSerialization5_2::onDecodePutCommand(BBuf & buf) const -{ - auto cmd = ProtocolSerialization5_0::onDecodePutCommand(buf); - decodeTasCondition(*cmd, buf); - return cmd; -} - -void ProtocolSerialization5_2::onEncode(GBBuf & buf, const api::RemoveCommand & cmd) const -{ - ProtocolSerialization4_2::onEncode(buf, cmd); - encodeTasCondition(buf, cmd); -} - -api::StorageCommand::UP -ProtocolSerialization5_2::onDecodeRemoveCommand(BBuf & buf) const -{ - auto cmd = ProtocolSerialization4_2::onDecodeRemoveCommand(buf); - decodeTasCondition(*cmd, buf); - return cmd; -} - -void ProtocolSerialization5_2::onEncode(GBBuf & buf, const api::UpdateCommand & cmd) const -{ - ProtocolSerialization5_0::onEncode(buf, cmd); - encodeTasCondition(buf, cmd); -} - -api::StorageCommand::UP -ProtocolSerialization5_2::onDecodeUpdateCommand(BBuf & buf) const -{ - auto cmd = ProtocolSerialization5_0::onDecodeUpdateCommand(buf); - decodeTasCondition(*cmd, buf); - return cmd; -} - -void ProtocolSerialization5_2::decodeTasCondition(api::StorageCommand & storageCmd, BBuf & buf) { - auto & cmd = static_cast<api::TestAndSetCommand &>(storageCmd); - cmd.setCondition(TestAndSetCondition(SH::getString(buf))); -} - -void ProtocolSerialization5_2::encodeTasCondition(GBBuf & buf, const api::StorageCommand & storageCmd) { - auto & cmd = static_cast<const api::TestAndSetCommand &>(storageCmd); - buf.putString(cmd.getCondition().getSelection()); -} - -} diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.h b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.h deleted file mode 100644 index e56b9942fa5..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.h +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// @author Vegard Sjonfjell - -#pragma once - -#include "protocolserialization5_1.h" -#include <vespa/vespalib/util/growablebytebuffer.h> -#include <vespa/storageapi/message/persistence.h> - -namespace storage::mbusprot { - -class ProtocolSerialization5_2 : public ProtocolSerialization5_1 -{ -public: - ProtocolSerialization5_2(const std::shared_ptr<const document::DocumentTypeRepo>& repo) - : ProtocolSerialization5_1(repo) - {} - -protected: - void onEncode(GBBuf &, const api::PutCommand &) const override; - void onEncode(GBBuf &, const api::RemoveCommand &) const override; - void onEncode(GBBuf &, const api::UpdateCommand &) const override; - - SCmd::UP onDecodePutCommand(BBuf &) const override; - SCmd::UP onDecodeRemoveCommand(BBuf &) const override; - SCmd::UP onDecodeUpdateCommand(BBuf &) const override; - - static void decodeTasCondition(api::StorageCommand & cmd, BBuf & buf); - static void encodeTasCondition(GBBuf & buf, const api::StorageCommand & cmd); -}; - -} diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.cpp deleted file mode 100644 index 799aff8f8b3..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.cpp +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "protocolserialization6_0.h" -#include "serializationhelper.h" - -namespace storage::mbusprot { - -ProtocolSerialization6_0::ProtocolSerialization6_0(const std::shared_ptr<const document::DocumentTypeRepo> &repo) - : ProtocolSerialization5_2(repo) -{ -} - -document::Bucket -ProtocolSerialization6_0::getBucket(document::ByteBuffer &buf) const -{ - document::BucketSpace bucketSpace(SH::getLong(buf)); - document::BucketId bucketId(SH::getLong(buf)); - return document::Bucket(bucketSpace, bucketId); -} - -void -ProtocolSerialization6_0::putBucket(const document::Bucket &bucket, vespalib::GrowableByteBuffer &buf) const -{ - buf.putLong(bucket.getBucketSpace().getId()); - buf.putLong(bucket.getBucketId().getRawId()); -} - -document::BucketSpace -ProtocolSerialization6_0::getBucketSpace(document::ByteBuffer &buf) const -{ - return document::BucketSpace(SH::getLong(buf)); -} - -void -ProtocolSerialization6_0::putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer &buf) const -{ - buf.putLong(bucketSpace.getId()); -} - -} diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.h b/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.h deleted file mode 100644 index 5467cf6c5d2..00000000000 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.h +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "protocolserialization5_2.h" - -namespace storage::mbusprot { - -/** - * Protocol serialization version adding decoding and encoding - * of bucket space to almost all commands. - */ -class ProtocolSerialization6_0 : public ProtocolSerialization5_2 -{ -public: - ProtocolSerialization6_0(const std::shared_ptr<const document::DocumentTypeRepo> &repo); - - document::Bucket getBucket(document::ByteBuffer &buf) const override; - void putBucket(const document::Bucket &bucket, vespalib::GrowableByteBuffer &buf) const override; - document::BucketSpace getBucketSpace(document::ByteBuffer &buf) const override; - void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer &buf) const override; -}; - -} diff --git a/storage/src/vespa/storageapi/mbusprot/storageprotocol.cpp b/storage/src/vespa/storageapi/mbusprot/storageprotocol.cpp index 3cf54860f7c..a59e5827523 100644 --- a/storage/src/vespa/storageapi/mbusprot/storageprotocol.cpp +++ b/storage/src/vespa/storageapi/mbusprot/storageprotocol.cpp @@ -6,6 +6,7 @@ #include <vespa/vespalib/util/exceptions.h> #include <vespa/document/util/stringutil.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <cassert> #include <sstream> #include <vespa/log/bufferedlogger.h> @@ -16,11 +17,7 @@ namespace storage::mbusprot { mbus::string StorageProtocol::NAME = "StorageProtocol"; StorageProtocol::StorageProtocol(const std::shared_ptr<const document::DocumentTypeRepo> repo) - : _serializer5_0(repo), - _serializer5_1(repo), - _serializer5_2(repo), - _serializer6_0(repo), - _serializer7_0(repo) + : _serializer7_0(repo) { } @@ -33,20 +30,7 @@ StorageProtocol::createPolicy(const mbus::string&, const mbus::string&) const } namespace { - vespalib::Version version7_0(7, 41, 19); - vespalib::Version version6_0(6, 240, 0); - vespalib::Version version5_2(5, 93, 30); - vespalib::Version version5_1(5, 1, 0); - vespalib::Version version5_0(5, 0, 12); - vespalib::Version version5_0beta(4, 3, 0); -} - - -static bool -suppressEncodeWarning(const api::StorageMessage *msg) -{ - const auto *req = dynamic_cast<const api::RequestBucketInfoCommand *>(msg); - return ((req != nullptr) && (req->getBucketSpace() != document::FixedBucketSpaces::default_space())); +vespalib::Version version7_0(7, 41, 19); } static mbus::Blob @@ -82,46 +66,23 @@ StorageProtocol::encode(const vespalib::Version& version, const StorageMessage & message(dynamic_cast<const StorageMessage &>(routable)); try { - if (message.getInternalMessage().get() == 0) { - throw vespalib::IllegalArgumentException( - "Given storage message wrapper does not contain a " - "storage message.", - VESPA_STRLOC); - } + assert(message.getInternalMessage()); + if (version < version7_0) { + LOGBP(error, + "Cannot encode message on version %s." + "Minimum version is %s. Cannot serialize %s.", + version.toString().c_str(), + version7_0.toString().c_str(), + message.getInternalMessage()->toString().c_str()); - if (version < version5_1) { - if (version < version5_0beta) { - LOGBP(warning, - "No support for using messagebus for version %s." - "Minimum version is %s. Thus we cannot serialize %s.", - version.toString().c_str(), - version5_0beta.toString().c_str(), - message.getInternalMessage()->toString().c_str()); - - return mbus::Blob(0); - } else { - return encodeMessage(_serializer5_0, routable, message, version5_0, version); - } - } else if (version < version5_2) { - return encodeMessage(_serializer5_1, routable, message, version5_1, version); - } else { - if (version < version6_0) { - return encodeMessage(_serializer5_2, routable, message, version5_2, version); - } else if (version < version7_0) { - return encodeMessage(_serializer6_0, routable, message, version6_0, version); - } else { - return encodeMessage(_serializer7_0, routable, message, version7_0, version); - } + return mbus::Blob(0); } - + return encodeMessage(_serializer7_0, routable, message, version7_0, version); } catch (std::exception & e) { - if (!(version < version6_0 && - suppressEncodeWarning(message.getInternalMessage().get()))) { - LOGBP(warning, "Failed to encode %s storage protocol message %s: %s", - version.toString().c_str(), - message.getInternalMessage()->toString().c_str(), - e.what()); - } + LOGBP(warning, "Failed to encode %s storage protocol message %s: %s", + version.toString().c_str(), + message.getInternalMessage()->toString().c_str(), + e.what()); } return mbus::Blob(0); @@ -168,27 +129,14 @@ StorageProtocol::decode(const vespalib::Version & version, static_cast<api::MessageType::Id>(SerializationHelper::getInt(buf))); StorageMessage::UP message; - if (version < version5_1) { - if (version < version5_0beta) { - LOGBP(error, - "No support for using messagebus for version %s." - "Minimum version is %s.", - version.toString().c_str(), - version5_0beta.toString().c_str()); - } else { - return decodeMessage(_serializer5_0, data, type, version5_0, version); - } - } else if (version < version5_2) { - return decodeMessage(_serializer5_1, data, type, version5_1, version); - } else { - if (version < version6_0) { - return decodeMessage(_serializer5_2, data, type, version5_2, version); - } else if (version < version7_0) { - return decodeMessage(_serializer6_0, data, type, version6_0, version); - } else { - return decodeMessage(_serializer7_0, data, type, version7_0, version); - } + if (version < version7_0) { + LOGBP(error, + "Cannot decode message on version %s. Minimum version is %s.", + version.toString().c_str(), + version7_0.toString().c_str()); + return mbus::Routable::UP(); } + return decodeMessage(_serializer7_0, data, type, version7_0, version); } catch (std::exception & e) { std::ostringstream ost; ost << "Failed to decode " << version.toString() << " messagebus " @@ -196,7 +144,6 @@ StorageProtocol::decode(const vespalib::Version & version, document::StringUtil::printAsHex(ost, data.data(), data.size()); LOGBP(warning, "%s", ost.str().c_str()); } - return mbus::Routable::UP(); } diff --git a/storage/src/vespa/storageapi/mbusprot/storageprotocol.h b/storage/src/vespa/storageapi/mbusprot/storageprotocol.h index 591009c9832..dbea21a4a9f 100644 --- a/storage/src/vespa/storageapi/mbusprot/storageprotocol.h +++ b/storage/src/vespa/storageapi/mbusprot/storageprotocol.h @@ -1,8 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "protocolserialization5_2.h" -#include "protocolserialization6_0.h" #include "protocolserialization7.h" #include <vespa/messagebus/iprotocol.h> @@ -23,10 +21,6 @@ public: mbus::Blob encode(const vespalib::Version&, const mbus::Routable&) const override; mbus::Routable::UP decode(const vespalib::Version&, mbus::BlobRef) const override; private: - ProtocolSerialization5_0 _serializer5_0; - ProtocolSerialization5_1 _serializer5_1; - ProtocolSerialization5_2 _serializer5_2; - ProtocolSerialization6_0 _serializer6_0; ProtocolSerialization7 _serializer7_0; }; |