summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-07-01 12:15:49 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-07-01 12:47:47 +0000
commit331dfa1c316e1663e84a792aabba75fce284356d (patch)
treed452d7624e5652d25ba85d3be657d7c02af6a2ad /storage
parente72d4b1c5b850783cfaa5c297351c330c4c034f8 (diff)
GC all legacy storage protocol serialization versions
Legacy version negotiation only happens over MessageBus transport, which is now removed. Current StorageAPI RPC transport always uses the newest version directly since it's built around Protobuf.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp50
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp4
-rw-r--r--storage/src/vespa/storageapi/mbusprot/CMakeLists.txt5
-rw-r--r--storage/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h31
-rw-r--r--storage/src/vespa/storageapi/mbusprot/oldreturncodemapper.h68
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization.cpp6
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp553
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.h72
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.cpp622
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.h73
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp198
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.h34
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.cpp64
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.h32
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.cpp40
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.h24
-rw-r--r--storage/src/vespa/storageapi/mbusprot/storageprotocol.cpp101
-rw-r--r--storage/src/vespa/storageapi/mbusprot/storageprotocol.h6
18 files changed, 31 insertions, 1952 deletions
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
index c130e433285..d4d22d6a36c 100644
--- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
+++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
@@ -112,8 +112,7 @@ std::string version_as_gtest_string(TestParamInfo<vespalib::Version> info) {
}
VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(MultiVersionTest, StorageProtocolTest,
- Values(vespalib::Version(6, 240, 0),
- vespalib::Version(7, 41, 19)),
+ Values(vespalib::Version(7, 41, 19)),
version_as_gtest_string);
namespace {
@@ -301,10 +300,6 @@ TEST_P(StorageProtocolTest, get_internal_read_consistency_is_strong_by_default)
}
TEST_P(StorageProtocolTest, can_set_internal_read_consistency_on_get_commands) {
- // Only supported on protocol version 7+. Will default to Strong on older versions, which is what we want.
- if (GetParam().getMajor() < 7) {
- return;
- }
auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123);
cmd->set_internal_read_consistency(InternalReadConsistency::Weak);
auto cmd2 = copyCommand(cmd);
@@ -316,10 +311,6 @@ TEST_P(StorageProtocolTest, can_set_internal_read_consistency_on_get_commands) {
}
TEST_P(StorageProtocolTest, tombstones_propagated_for_gets) {
- // Only supported on protocol version 7+.
- if (GetParam().getMajor() < 7) {
- return;
- }
auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar", 123);
auto reply = std::make_shared<GetReply>(*cmd, std::shared_ptr<Document>(), 100, false, true);
set_dummy_bucket_info_reply_fields(*reply);
@@ -332,23 +323,6 @@ TEST_P(StorageProtocolTest, tombstones_propagated_for_gets) {
EXPECT_TRUE(reply2->is_tombstone());
}
-// TODO remove this once pre-protobuf serialization is removed
-TEST_P(StorageProtocolTest, old_serialization_format_treats_tombstone_get_replies_as_not_found) {
- if (GetParam().getMajor() >= 7) {
- return;
- }
- auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar", 123);
- auto reply = std::make_shared<GetReply>(*cmd, std::shared_ptr<Document>(), 100, false, true);
- set_dummy_bucket_info_reply_fields(*reply);
- auto reply2 = copyReply(reply);
-
- EXPECT_TRUE(reply2->getDocument().get() == nullptr);
- EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId());
- EXPECT_EQ(Timestamp(123), reply2->getBeforeTimestamp());
- EXPECT_EQ(Timestamp(0), reply2->getLastModifiedTimestamp());
- EXPECT_FALSE(reply2->is_tombstone()); // Protocol version doesn't understand explicit tombstones.
-}
-
TEST_P(StorageProtocolTest, remove) {
auto cmd = std::make_shared<RemoveCommand>(_bucket, _testDocId, 159);
auto cmd2 = copyCommand(cmd);
@@ -486,11 +460,7 @@ TEST_P(StorageProtocolTest, merge_bucket) {
EXPECT_EQ(Timestamp(1234), cmd2->getMaxTimestamp());
EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion());
EXPECT_EQ(chain, cmd2->getChain());
- if (GetParam().getMajor() >= 7) {
- EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding());
- } else {
- EXPECT_FALSE(cmd2->use_unordered_forwarding());
- }
+ EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding());
auto reply = std::make_shared<MergeBucketReply>(*cmd);
auto reply2 = copyReply(reply);
@@ -569,18 +539,10 @@ TEST_P(StorageProtocolTest, remove_location) {
uint32_t n_docs_removed = 12345;
auto reply = std::make_shared<RemoveLocationReply>(*cmd2, n_docs_removed);
auto reply2 = copyReply(reply);
- if (GetParam().getMajor() == 7) {
- // Statistics are only available for protobuf-enabled version.
- EXPECT_EQ(n_docs_removed, reply2->documents_removed());
- } else {
- EXPECT_EQ(0, reply2->documents_removed());
- }
+ EXPECT_EQ(n_docs_removed, reply2->documents_removed());
}
TEST_P(StorageProtocolTest, stat_bucket) {
- if (GetParam().getMajor() < 7) {
- return; // Only available for protobuf-backed protocol version.
- }
auto cmd = std::make_shared<StatBucketCommand>(_bucket, "id.group == 'mygroup'");
auto cmd2 = copyCommand(cmd);
EXPECT_EQ("id.group == 'mygroup'", cmd2->getDocumentSelection());
@@ -823,11 +785,7 @@ TEST_P(StorageProtocolTest, serialized_size_is_used_to_set_approx_size_of_storag
auto cmd2 = copyCommand(cmd);
auto version = GetParam();
- if (version.getMajor() == 7) { // Protobuf-based encoding
- EXPECT_EQ(158u, cmd2->getApproxByteSize());
- } else { // Legacy encoding
- EXPECT_EQ(181u, cmd2->getApproxByteSize());
- }
+ EXPECT_EQ(158u, cmd2->getApproxByteSize());
}
TEST_P(StorageProtocolTest, track_memory_footprint_for_some_messages) {
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 4c375a30867..ab1f72e85d2 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -169,10 +169,6 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply)
sar->setTrace(reply->steal_trace());
receiveStorageReply(sar);
}
- } else if (protocolName == mbusprot::StorageProtocol::NAME) {
- mbusprot::StorageReply* sr(static_cast<mbusprot::StorageReply*>(reply.get()));
- sr->getReply()->setTrace(reply->steal_trace());
- receiveStorageReply(sr->getReply());
} else {
LOGBM(warning, "Received unsupported reply type %d for protocol '%s'.",
reply->getType(), reply->getProtocol().c_str());
diff --git a/storage/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storage/src/vespa/storageapi/mbusprot/CMakeLists.txt
index 43c1da32502..4999201fbeb 100644
--- a/storage/src/vespa/storageapi/mbusprot/CMakeLists.txt
+++ b/storage/src/vespa/storageapi/mbusprot/CMakeLists.txt
@@ -23,11 +23,6 @@ vespa_add_library(storageapi_mbusprot OBJECT
storagereply.cpp
protocolserialization.cpp
storageprotocol.cpp
- protocolserialization4_2.cpp
- protocolserialization5_0.cpp
- protocolserialization5_1.cpp
- protocolserialization5_2.cpp
- protocolserialization6_0.cpp
protocolserialization7.cpp
${storageapi_PROTOBUF_SRCS}
DEPENDS
diff --git a/storage/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h b/storage/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h
deleted file mode 100644
index f3c6a6f6856..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h
+++ /dev/null
@@ -1,31 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "protocolserialization.h"
-
-namespace storage::mbusprot {
-
-/*
- * Utility base class for pre-v7 (protobuf) serialization implementations.
- *
- * TODO remove on Vespa 8 alongside legacy serialization formats.
- */
-class LegacyProtocolSerialization : public ProtocolSerialization {
- const std::shared_ptr<const document::DocumentTypeRepo> _repo;
-public:
- explicit LegacyProtocolSerialization(const std::shared_ptr<const document::DocumentTypeRepo>& repo)
- : _repo(repo)
- {}
-
- const document::DocumentTypeRepo& getTypeRepo() const { return *_repo; }
- const std::shared_ptr<const document::DocumentTypeRepo> getTypeRepoSp() const { return _repo; }
-
- virtual document::Bucket getBucket(document::ByteBuffer& buf) const = 0;
- virtual void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const = 0;
- virtual document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const = 0;
- virtual void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const = 0;
- virtual api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const = 0;
- virtual void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const = 0;
-};
-
-} // storage::mbusprot
diff --git a/storage/src/vespa/storageapi/mbusprot/oldreturncodemapper.h b/storage/src/vespa/storageapi/mbusprot/oldreturncodemapper.h
deleted file mode 100644
index 40e09f23697..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/oldreturncodemapper.h
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * \brief Maps return code values used between 4.2 and 5.0
- */
-#pragma once
-
-namespace storage {
-namespace mbusprot {
-
-int getOldErrorCode(api::ReturnCode::Result newErrorCode) {
- switch (newErrorCode) {
- case api::ReturnCode::OK: return 1;
- case api::ReturnCode::EXISTS: return 1001;
- case api::ReturnCode::NOT_READY: return 2000;
- case api::ReturnCode::WRONG_DISTRIBUTION: return 2001;
- case api::ReturnCode::REJECTED: return 2002;
- case api::ReturnCode::ABORTED: return 2003;
- case api::ReturnCode::BUCKET_NOT_FOUND: return 2004;
- case api::ReturnCode::BUCKET_DELETED: return 2004;
- case api::ReturnCode::TIMESTAMP_EXIST: return 2005;
- case api::ReturnCode::UNKNOWN_COMMAND: return 3000;
- case api::ReturnCode::NOT_IMPLEMENTED: return 3001;
- case api::ReturnCode::ILLEGAL_PARAMETERS: return 3002;
- case api::ReturnCode::IGNORED: return 3003;
- case api::ReturnCode::UNPARSEABLE: return 3004;
- case api::ReturnCode::NOT_CONNECTED: return 4000;
- case api::ReturnCode::TIMEOUT: return 4003;
- case api::ReturnCode::BUSY: return 4004;
- case api::ReturnCode::NO_SPACE: return 5000;
- case api::ReturnCode::DISK_FAILURE: return 5001;
- case api::ReturnCode::IO_FAILURE: return 5003;
- case api::ReturnCode::INTERNAL_FAILURE: return 6000;
- default: return 6001;
- }
-}
-
-api::ReturnCode::Result getNewErrorCode(int oldErrorCode) {
- switch (oldErrorCode) {
- case 1: return api::ReturnCode::OK;
- case 1000: return api::ReturnCode::OK; // NOT_FOUND
- case 1001: return api::ReturnCode::EXISTS;
- case 2000: return api::ReturnCode::NOT_READY;
- case 2001: return api::ReturnCode::WRONG_DISTRIBUTION;
- case 2002: return api::ReturnCode::REJECTED;
- case 2003: return api::ReturnCode::ABORTED;
- case 2004: return api::ReturnCode::BUCKET_NOT_FOUND;
- case 2005: return api::ReturnCode::TIMESTAMP_EXIST;
- case 3000: return api::ReturnCode::UNKNOWN_COMMAND;
- case 3001: return api::ReturnCode::NOT_IMPLEMENTED;
- case 3002: return api::ReturnCode::ILLEGAL_PARAMETERS;
- case 3003: return api::ReturnCode::IGNORED;
- case 3004: return api::ReturnCode::UNPARSEABLE;
- case 4000: return api::ReturnCode::NOT_CONNECTED;
- case 4001: return api::ReturnCode::BUSY; // OVERLOAD;
- case 4002: return api::ReturnCode::NOT_READY; // REMOTE_DISABLED;
- case 4003: return api::ReturnCode::TIMEOUT;
- case 4004: return api::ReturnCode::BUSY;
- case 5000: return api::ReturnCode::NO_SPACE;
- case 5001: return api::ReturnCode::DISK_FAILURE;
- case 5003: return api::ReturnCode::IO_FAILURE;
- case 6000: return api::ReturnCode::INTERNAL_FAILURE;
- default: return api::ReturnCode::INTERNAL_FAILURE;
- }
-}
-
-} // mbusprot
-} // storage
-
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization.cpp
index 4614659c458..59ea60e6e0d 100644
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization.cpp
+++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization.cpp
@@ -1,16 +1,16 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "protocolserialization4_2.h"
+#include "protocolserialization7.h"
#include "serializationhelper.h"
#include "storagecommand.h"
#include "storagereply.h"
#include <vespa/storageapi/message/bucketsplitting.h>
-#include <vespa/storageapi/message/visitor.h>
+#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storageapi/message/stat.h>
+#include <vespa/storageapi/message/visitor.h>
#include <vespa/vespalib/util/exceptions.h>
-
#include <sstream>
#include <vespa/log/log.h>
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
deleted file mode 100644
index 1bea0ea74c9..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
+++ /dev/null
@@ -1,553 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "protocolserialization4_2.h"
-#include "oldreturncodemapper.h"
-#include "serializationhelper.h"
-#include "storagecommand.h"
-#include "storagereply.h"
-
-#include <vespa/storageapi/message/bucketsplitting.h>
-#include <vespa/storageapi/message/visitor.h>
-#include <vespa/storageapi/message/removelocation.h>
-#include <vespa/vespalib/util/exceptions.h>
-#include <vespa/document/fieldset/fieldsets.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".storage.api.mbusprot.serialization.4_2");
-
-using document::BucketSpace;
-using document::AllFields;
-
-namespace storage::mbusprot {
-
-ProtocolSerialization4_2::ProtocolSerialization4_2(
- const std::shared_ptr<const document::DocumentTypeRepo>& repo)
- : LegacyProtocolSerialization(repo)
-{
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::GetCommand& msg) const
-{
- buf.putString(msg.getDocumentId().toString());
- putBucket(msg.getBucket(), buf);
- buf.putLong(msg.getBeforeTimestamp());
- buf.putBoolean(false);
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeGetCommand(BBuf& buf) const
-{
- document::DocumentId did(SH::getString(buf));
- document::Bucket bucket = getBucket(buf);
- api::Timestamp beforeTimestamp(SH::getLong(buf));
- bool headerOnly(SH::getBoolean(buf)); // Ignored header only flag
- (void) headerOnly;
- auto msg = std::make_unique<api::GetCommand>(bucket, did, AllFields::NAME, beforeTimestamp);
- onDecodeCommand(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::RemoveCommand& msg) const
-{
- buf.putString(msg.getDocumentId().toString());
- putBucket(msg.getBucket(), buf);
- buf.putLong(msg.getTimestamp());
- onEncodeBucketInfoCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeRemoveCommand(BBuf& buf) const
-{
- document::DocumentId did(SH::getString(buf));
- document::Bucket bucket = getBucket(buf);
- api::Timestamp timestamp(SH::getLong(buf));
- auto msg = std::make_unique<api::RemoveCommand>(bucket, did, timestamp);
- onDecodeBucketInfoCommand(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::RevertCommand& msg) const
-{
- putBucket(msg.getBucket(), buf);
- buf.putInt(msg.getRevertTokens().size());
- for (uint32_t i=0, n=msg.getRevertTokens().size(); i<n; ++i) {
- buf.putLong(msg.getRevertTokens()[i]);
- }
- onEncodeBucketInfoCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeRevertCommand(BBuf& buf) const
-{
- document::Bucket bucket = getBucket(buf);
- std::vector<api::Timestamp> tokens(SH::getInt(buf));
- for (uint32_t i=0, n=tokens.size(); i<n; ++i) {
- tokens[i] = SH::getLong(buf);
- }
- auto msg = std::make_unique<api::RevertCommand>(bucket, tokens);
- onDecodeBucketInfoCommand(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const
-{
- putBucket(msg.getBucket(), buf);
- onEncodeBucketInfoCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeCreateBucketCommand(BBuf& buf) const
-{
- document::Bucket bucket = getBucket(buf);
- auto msg = std::make_unique<api::CreateBucketCommand>(bucket);
- onDecodeBucketInfoCommand(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const
-{
- putBucket(msg.getBucket(), buf);
- const std::vector<api::MergeBucketCommand::Node>& nodes(msg.getNodes());
- buf.putShort(nodes.size());
- for (uint32_t i=0; i<nodes.size(); ++i) {
- buf.putShort(nodes[i].index);
- buf.putBoolean(nodes[i].sourceOnly);
- }
- buf.putLong(msg.getMaxTimestamp());
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeMergeBucketCommand(BBuf& buf) const
-{
- typedef api::MergeBucketCommand::Node Node;
- document::Bucket bucket = getBucket(buf);
- uint16_t nodeCount = SH::getShort(buf);
- std::vector<Node> nodes;
- nodes.reserve(nodeCount);
- for (uint32_t i=0; i<nodeCount; ++i) {
- uint16_t index(SH::getShort(buf));
- bool sourceOnly = SH::getBoolean(buf);
- nodes.push_back(Node(index, sourceOnly));
- }
- api::Timestamp timestamp(SH::getLong(buf));
- auto msg = std::make_unique<api::MergeBucketCommand>(bucket, nodes, timestamp);
- onDecodeCommand(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::GetBucketDiffCommand& msg) const
-{
- putBucket(msg.getBucket(), buf);
- const std::vector<api::MergeBucketCommand::Node>& nodes(msg.getNodes());
- buf.putShort(nodes.size());
- for (uint32_t i=0; i<nodes.size(); ++i) {
- buf.putShort(nodes[i].index);
- buf.putBoolean(nodes[i].sourceOnly);
- }
- buf.putLong(msg.getMaxTimestamp());
- const std::vector<api::GetBucketDiffCommand::Entry>& entries(msg.getDiff());
- buf.putInt(entries.size());
- for (uint32_t i=0; i<entries.size(); ++i) {
- onEncodeDiffEntry(buf, entries[i]);
- }
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeGetBucketDiffCommand(BBuf& buf) const
-{
- typedef api::MergeBucketCommand::Node Node;
- document::Bucket bucket = getBucket(buf);
- uint16_t nodeCount = SH::getShort(buf);
- std::vector<Node> nodes;
- nodes.reserve(nodeCount);
- for (uint32_t i=0; i<nodeCount; ++i) {
- uint16_t index(SH::getShort(buf));
- bool sourceOnly = SH::getBoolean(buf);
- nodes.push_back(Node(index, sourceOnly));
- }
- api::Timestamp timestamp = SH::getLong(buf);
- auto msg = std::make_unique<api::GetBucketDiffCommand>(bucket, nodes, timestamp);
- std::vector<api::GetBucketDiffCommand::Entry>& entries(msg->getDiff());
- uint32_t entryCount = SH::getInt(buf);
- if (entryCount > buf.getRemaining()) {
- // Trigger out of bounds exception rather than out of memory error
- buf.incPos(entryCount);
- }
- entries.resize(entryCount);
- for (uint32_t i=0; i<entries.size(); ++i) {
- onDecodeDiffEntry(buf, entries[i]);
- }
- onDecodeCommand(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const
-{
- putBucket(msg.getBucket(), buf);
- const std::vector<api::MergeBucketCommand::Node>& nodes(msg.getNodes());
- buf.putShort(nodes.size());
- for (uint32_t i=0; i<nodes.size(); ++i) {
- buf.putShort(nodes[i].index);
- buf.putBoolean(nodes[i].sourceOnly);
- }
- buf.putInt(0x400000);
- const std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg.getDiff());
- buf.putInt(entries.size());
- for (uint32_t i=0; i<entries.size(); ++i) {
- onEncodeDiffEntry(buf, entries[i]._entry);
- buf.putString(entries[i]._docName);
- buf.putInt(entries[i]._headerBlob.size());
- buf.putBytes(&entries[i]._headerBlob[0], entries[i]._headerBlob.size());
- buf.putInt(entries[i]._bodyBlob.size());
- buf.putBytes(&entries[i]._bodyBlob[0], entries[i]._bodyBlob.size());
- }
- onEncodeBucketInfoCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeApplyBucketDiffCommand(BBuf& buf) const
-{
- typedef api::MergeBucketCommand::Node Node;
- document::Bucket bucket = getBucket(buf);
- uint16_t nodeCount = SH::getShort(buf);
- std::vector<Node> nodes;
- nodes.reserve(nodeCount);
- for (uint32_t i=0; i<nodeCount; ++i) {
- uint16_t index(SH::getShort(buf));
- bool sourceOnly = SH::getBoolean(buf);
- nodes.push_back(Node(index, sourceOnly));
- }
- (void) SH::getInt(buf); // Unused field
- auto msg = std::make_unique<api::ApplyBucketDiffCommand>(bucket, nodes);
- std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg->getDiff());
- uint32_t entryCount = SH::getInt(buf);
- if (entryCount > buf.getRemaining()) {
- // Trigger out of bounds exception rather than out of memory error
- buf.incPos(entryCount);
- }
- entries.resize(entryCount);
- for (uint32_t i=0; i<entries.size(); ++i) {
- onDecodeDiffEntry(buf, entries[i]._entry);
- entries[i]._docName = SH::getString(buf);
- uint32_t headerSize = SH::getInt(buf);
- if (headerSize > buf.getRemaining()) {
- buf.incPos(headerSize);
- }
- entries[i]._headerBlob.resize(headerSize);
- buf.getBytes(&entries[i]._headerBlob[0], entries[i]._headerBlob.size());
- uint32_t bodySize = SH::getInt(buf);
- if (bodySize > buf.getRemaining()) {
- buf.incPos(bodySize);
- }
- entries[i]._bodyBlob.resize(bodySize);
- buf.getBytes(&entries[i]._bodyBlob[0], entries[i]._bodyBlob.size());
- }
- onDecodeBucketInfoCommand(buf, *msg);
- return msg;
-}
-
-void
-ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::RequestBucketInfoReply& msg) const
-{
- buf.putInt(msg.getBucketInfo().size());
- for (const auto & entry : msg.getBucketInfo()) {
- buf.putLong(entry._bucketId.getRawId());
- putBucketInfo(entry._info, buf);
- }
- onEncodeReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization4_2::onDecodeRequestBucketInfoReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::RequestBucketInfoReply>(static_cast<const api::RequestBucketInfoCommand&>(cmd));
- api::RequestBucketInfoReply::EntryVector & entries(msg->getBucketInfo());
- uint32_t entryCount = SH::getInt(buf);
- if (entryCount > buf.getRemaining()) {
- // Trigger out of bounds exception rather than out of memory error
- buf.incPos(entryCount);
- }
- entries.resize(entryCount);
- for (auto & entry : entries) {
- entry._bucketId = document::BucketId(SH::getLong(buf));
- entry._info = getBucketInfo(buf);
- }
- onDecodeReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::NotifyBucketChangeCommand& msg) const
-{
- putBucket(msg.getBucket(), buf);
- putBucketInfo(msg.getBucketInfo(), buf);
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeNotifyBucketChangeCommand(BBuf& buf) const
-{
- document::Bucket bucket = getBucket(buf);
- api::BucketInfo info(getBucketInfo(buf));
- auto msg = std::make_unique<api::NotifyBucketChangeCommand>(bucket, info);
- onDecodeCommand(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::NotifyBucketChangeReply& msg) const
-{
- onEncodeReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization4_2::onDecodeNotifyBucketChangeReply(const SCmd& cmd,BBuf& buf) const
-{
- auto msg = std::make_unique<api::NotifyBucketChangeReply>(static_cast<const api::NotifyBucketChangeCommand&>(cmd));
- onDecodeReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::SplitBucketCommand& msg) const
-{
- putBucket(msg.getBucket(), buf);
- buf.putByte(msg.getMinSplitBits());
- buf.putByte(msg.getMaxSplitBits());
- buf.putInt(msg.getMinByteSize());
- buf.putInt(msg.getMinDocCount());
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeSplitBucketCommand(BBuf& buf) const
-{
- document::Bucket bucket = getBucket(buf);
- auto msg = std::make_unique<api::SplitBucketCommand>(bucket);
- msg->setMinSplitBits(SH::getByte(buf));
- msg->setMaxSplitBits(SH::getByte(buf));
- msg->setMinByteSize(SH::getInt(buf));
- msg->setMinDocCount(SH::getInt(buf));
- onDecodeCommand(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf&, const api::SetBucketStateCommand&) const
-{
- throw vespalib::IllegalStateException("Unsupported serialization", VESPA_STRLOC);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeSetBucketStateCommand(BBuf&) const
-{
- throw vespalib::IllegalStateException("Unsupported deserialization", VESPA_STRLOC);
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf&, const api::SetBucketStateReply&) const
-{
- throw vespalib::IllegalStateException("Unsupported serialization", VESPA_STRLOC);
-}
-
-api::StorageReply::UP
-ProtocolSerialization4_2::onDecodeSetBucketStateReply(const SCmd&, BBuf&) const
-{
- throw vespalib::IllegalStateException("Unsupported deserialization", VESPA_STRLOC);
-}
-
-void
-ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const
-{
- putBucketSpace(msg.getBucketSpace(), buf);
- buf.putString(msg.getLibraryName());
- buf.putString(msg.getInstanceId());
- buf.putString(msg.getDocumentSelection());
- buf.putInt(msg.getVisitorCmdId());
- buf.putString(msg.getControlDestination());
- buf.putString(msg.getDataDestination());
- buf.putInt(msg.getMaximumPendingReplyCount());
- buf.putLong(msg.getFromTime());
- buf.putLong(msg.getToTime());
-
- buf.putInt(msg.getBuckets().size());
- for (uint32_t i = 0; i < msg.getBuckets().size(); i++) {
- buf.putLong(msg.getBuckets()[i].getRawId());
- }
-
- buf.putBoolean(msg.visitRemoves());
- buf.putBoolean(false);
- buf.putBoolean(msg.visitInconsistentBuckets());
- buf.putInt(vespalib::count_ms(msg.getQueueTimeout()));
- msg.getParameters().serialize(buf);
-
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeCreateVisitorCommand(BBuf& buf) const
-{
- BucketSpace bucketSpace = getBucketSpace(buf);
- vespalib::stringref libraryName = SH::getString(buf);
- vespalib::stringref instanceId = SH::getString(buf);
- vespalib::stringref selection = SH::getString(buf);
- auto msg = std::make_unique<api::CreateVisitorCommand>(bucketSpace, libraryName, instanceId, selection);
- msg->setVisitorCmdId(SH::getInt(buf));
- msg->setControlDestination(SH::getString(buf));
- msg->setDataDestination(SH::getString(buf));
- msg->setMaximumPendingReplyCount(SH::getInt(buf));
-
- msg->setFromTime(SH::getLong(buf));
- msg->setToTime(SH::getLong(buf));
- uint32_t count = SH::getInt(buf);
-
- if (count > buf.getRemaining()) {
- // Trigger out of bounds exception rather than out of memory error
- buf.incPos(count);
- }
-
- for (uint32_t i = 0; i < count; i++) {
- msg->getBuckets().push_back(document::BucketId(SH::getLong(buf)));
- }
-
- if (SH::getBoolean(buf)) {
- msg->setVisitRemoves();
- }
- if (SH::getBoolean(buf)) {
- msg->setFieldSet(AllFields::NAME);
- }
- if (SH::getBoolean(buf)) {
- msg->setVisitInconsistentBuckets();
- }
- msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf)));
- msg->getParameters().deserialize(buf);
-
- onDecodeCommand(buf, *msg);
- msg->setVisitorDispatcherVersion(42);
- return msg;
-}
-
-void
-ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::DestroyVisitorCommand& msg) const
-{
- buf.putString(msg.getInstanceId());
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeDestroyVisitorCommand(BBuf& buf) const
-{
- vespalib::stringref instanceId = SH::getString(buf);
- auto msg = std::make_unique<api::DestroyVisitorCommand>(instanceId);
- onDecodeCommand(buf, *msg);
- return msg;
-}
-
-void
-ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::DestroyVisitorReply& msg) const
-{
- onEncodeReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization4_2::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::DestroyVisitorReply>(static_cast<const api::DestroyVisitorCommand&>(cmd));
- onDecodeReply(buf, *msg);
- return msg;
-}
-
-void
-ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const
-{
- buf.putString(msg.getDocumentSelection());
- putBucket(msg.getBucket(), buf);
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeRemoveLocationCommand(BBuf& buf) const
-{
- vespalib::stringref documentSelection = SH::getString(buf);
- document::Bucket bucket = getBucket(buf);
-
- auto msg = std::make_unique<api::RemoveLocationCommand>(documentSelection, bucket);
- onDecodeCommand(buf, *msg);
- return msg;
-}
-
-void
-ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const
-{
- onEncodeBucketInfoReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization4_2::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::RemoveLocationReply>(static_cast<const api::RemoveLocationCommand&>(cmd));
- onDecodeBucketInfoReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf&, const api::StatBucketCommand&) const {
- throw vespalib::IllegalStateException("StatBucketCommand not expected for legacy protocol version", VESPA_STRLOC);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization4_2::onDecodeStatBucketCommand(BBuf&) const {
- throw vespalib::IllegalStateException("StatBucketCommand not expected for legacy protocol version", VESPA_STRLOC);
-}
-
-void ProtocolSerialization4_2::onEncode(GBBuf&, const api::StatBucketReply&) const {
- throw vespalib::IllegalStateException("StatBucketReply not expected for legacy protocol version", VESPA_STRLOC);
-}
-
-api::StorageReply::UP
-ProtocolSerialization4_2::onDecodeStatBucketReply(const SCmd&, BBuf&) const {
- throw vespalib::IllegalStateException("StatBucketReply not expected for legacy protocol version", VESPA_STRLOC);
-}
-
-// Utility functions for serialization
-
-void
-ProtocolSerialization4_2::onEncodeBucketInfoCommand(GBBuf& buf, const api::BucketInfoCommand& msg) const
-{
- onEncodeCommand(buf, msg);
-}
-
-void
-ProtocolSerialization4_2::onDecodeBucketInfoCommand(BBuf& buf, api::BucketInfoCommand& msg) const
-{
- onDecodeCommand(buf, msg);
-}
-
-void
-ProtocolSerialization4_2::onEncode(GBBuf& buf, const api::ReturnCode& rc) const
-{
- // Convert error code to codes used in 4.2
- buf.putInt(getOldErrorCode(rc.getResult()));
- buf.putString(rc.getMessage());
-}
-
-void
-ProtocolSerialization4_2::onEncodeDiffEntry(GBBuf& buf, const api::GetBucketDiffCommand::Entry& entry) const
-{
- buf.putLong(entry._timestamp);
- SH::putGlobalId(entry._gid, buf);
- buf.putInt(entry._headerSize);
- buf.putInt(entry._bodySize);
- buf.putShort(entry._flags);
- buf.putShort(entry._hasMask);
-}
-
-void
-ProtocolSerialization4_2::onDecodeDiffEntry(BBuf& buf, api::GetBucketDiffCommand::Entry& entry) const
-{
- entry._timestamp = SH::getLong(buf);
- entry._gid = SH::getGlobalId(buf);
- entry._headerSize = SH::getInt(buf);
- entry._bodySize = SH::getInt(buf);
- entry._flags = SH::getShort(buf);
- entry._hasMask = SH::getShort(buf);
-}
-
-}
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.h b/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.h
deleted file mode 100644
index b8a20e9e401..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization4_2.h
+++ /dev/null
@@ -1,72 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "legacyprotocolserialization.h"
-
-namespace storage::mbusprot {
-
-class ProtocolSerialization4_2 : public LegacyProtocolSerialization {
-public:
- explicit ProtocolSerialization4_2(const std::shared_ptr<const document::DocumentTypeRepo>&);
-
-protected:
- void onEncode(GBBuf&, const api::GetCommand&) const override;
- void onEncode(GBBuf&, const api::RemoveCommand&) const override;
- void onEncode(GBBuf&, const api::RevertCommand&) const override;
- void onEncode(GBBuf&, const api::CreateBucketCommand&) const override;
- void onEncode(GBBuf&, const api::MergeBucketCommand&) const override;
- void onEncode(GBBuf&, const api::GetBucketDiffCommand&) const override;
- void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const override;
- void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const override;
- void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const override;
- void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const override;
- void onEncode(GBBuf&, const api::SplitBucketCommand&) const override;
- void onEncode(GBBuf&, const api::CreateVisitorCommand&) const override;
- void onEncode(GBBuf&, const api::DestroyVisitorCommand&) const override;
- void onEncode(GBBuf&, const api::DestroyVisitorReply&) const override;
- void onEncode(GBBuf&, const api::RemoveLocationCommand&) const override;
- void onEncode(GBBuf&, const api::RemoveLocationReply&) const override;
- void onEncode(GBBuf&, const api::StatBucketCommand&) const override;
- void onEncode(GBBuf&, const api::StatBucketReply&) const override;
-
- // Not supported on 4.2, but implemented here for simplicity.
- void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override;
- void onEncode(GBBuf&, const api::SetBucketStateReply&) const override;
-
- virtual void onEncodeBucketInfoCommand(GBBuf&, const api::BucketInfoCommand&) const;
- virtual void onEncodeBucketInfoReply(GBBuf&, const api::BucketInfoReply&) const = 0;
- virtual void onEncodeCommand(GBBuf&, const api::StorageCommand&) const = 0;
- virtual void onEncodeReply(GBBuf&, const api::StorageReply&) const = 0;
-
- virtual void onEncodeDiffEntry(GBBuf&, const api::GetBucketDiffCommand::Entry&) const;
- virtual void onEncode(GBBuf&, const api::ReturnCode&) const;
- SCmd::UP onDecodeGetCommand(BBuf&) const override;
- SCmd::UP onDecodeRemoveCommand(BBuf&) const override;
- SCmd::UP onDecodeRevertCommand(BBuf&) const override;
- SCmd::UP onDecodeCreateBucketCommand(BBuf&) const override;
- SCmd::UP onDecodeMergeBucketCommand(BBuf&) const override;
- SCmd::UP onDecodeGetBucketDiffCommand(BBuf&) const override;
- SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const override;
- SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const override;
- SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const override;
- SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const override;
- SCmd::UP onDecodeSplitBucketCommand(BBuf&) const override;
- SCmd::UP onDecodeSetBucketStateCommand(BBuf&) const override;
- SRep::UP onDecodeSetBucketStateReply(const SCmd&, BBuf&) const override;
- SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override;
- SCmd::UP onDecodeDestroyVisitorCommand(BBuf&) const override;
- SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const override;
- SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override;
- SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override;
- SCmd::UP onDecodeStatBucketCommand(BBuf&) const override;
- SRep::UP onDecodeStatBucketReply(const SCmd&, BBuf&) const override;
-
- virtual void onDecodeBucketInfoCommand(BBuf&, api::BucketInfoCommand&) const;
- virtual void onDecodeBucketInfoReply(BBuf&, api::BucketInfoReply&) const = 0;
- virtual void onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const = 0;
- virtual void onDecodeReply(BBuf&, api::StorageReply&) const = 0;
-
- virtual void onDecodeDiffEntry(BBuf&, api::GetBucketDiffCommand::Entry&) const;
-};
-
-}
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.cpp
deleted file mode 100644
index e49eb8842ff..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.cpp
+++ /dev/null
@@ -1,622 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "protocolserialization5_0.h"
-#include "serializationhelper.h"
-#include "storagecommand.h"
-#include "storagereply.h"
-#include <vespa/storageapi/message/bucketsplitting.h>
-#include <vespa/storageapi/message/visitor.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <sstream>
-
-using document::BucketSpace;
-using document::FixedBucketSpaces;
-
-namespace storage::mbusprot {
-
-document::Bucket
-ProtocolSerialization5_0::getBucket(document::ByteBuffer& buf) const
-{
- document::BucketId bucketId(SH::getLong(buf));
- return document::Bucket(FixedBucketSpaces::default_space(), bucketId);
-}
-
-void
-ProtocolSerialization5_0::putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const
-{
- buf.putLong(bucket.getBucketId().getRawId());
- if (bucket.getBucketSpace() != FixedBucketSpaces::default_space()) {
- std::ostringstream ost;
- ost << "Bucket with bucket space " << bucket.getBucketSpace() << " cannot be serialized on old storageapi protocol.";
- throw vespalib::IllegalArgumentException(ost.str(), VESPA_STRLOC);
- }
-}
-
-document::BucketSpace
-ProtocolSerialization5_0::getBucketSpace(document::ByteBuffer&) const
-{
- return FixedBucketSpaces::default_space();
-}
-
-void
-ProtocolSerialization5_0::putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer&) const
-{
- if (bucketSpace != FixedBucketSpaces::default_space()) {
- std::ostringstream ost;
- ost << "Bucket space " << bucketSpace << " cannot be serialized on old storageapi protocol.";
- throw vespalib::IllegalArgumentException(ost.str(), VESPA_STRLOC);
- }
-}
-
-api::BucketInfo
-ProtocolSerialization5_0::getBucketInfo(document::ByteBuffer& buf) const
-{
- uint32_t crc(SH::getInt(buf));
- uint32_t doccount(SH::getInt(buf));
- uint32_t docsize(SH::getInt(buf));
- uint32_t metacount(SH::getInt(buf));
- uint32_t usedsize(SH::getInt(buf));
- return api::BucketInfo(crc, doccount, docsize, metacount, usedsize);
-}
-
-void
-ProtocolSerialization5_0::putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const
-{
- buf.putInt(info.getChecksum());
- buf.putInt(info.getDocumentCount());
- buf.putInt(info.getTotalDocumentSize());
- buf.putInt(info.getMetaCount());
- buf.putInt(info.getUsedFileSize());
-}
-
-void
-ProtocolSerialization5_0::onEncodeReply(GBBuf& buf, const api::StorageReply& msg) const
-{
- SH::putReturnCode(msg.getResult(), buf);
- buf.putLong(msg.getMsgId());
- buf.putByte(msg.getPriority());
-}
-
-void
-ProtocolSerialization5_0::onDecodeReply(BBuf& buf, api::StorageReply& msg) const
-{
- msg.setResult(SH::getReturnCode(buf));
- msg.forceMsgId(SH::getLong(buf));
- msg.setPriority(SH::getByte(buf));
-}
-
-void
-ProtocolSerialization5_0::onEncodeCommand(GBBuf& buf, const api::StorageCommand& msg) const
-{
- buf.putLong(msg.getMsgId());
- buf.putByte(msg.getPriority());
- buf.putShort(msg.getSourceIndex());
- buf.putInt(0); // LoadType 'default'
-}
-
-void
-ProtocolSerialization5_0::onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const
-{
- msg.forceMsgId(SH::getLong(buf));
- uint8_t priority = SH::getByte(buf);
- msg.setPriority(priority);
- msg.setSourceIndex(SH::getShort(buf));
- (void)SH::getInt(buf); // LoadType
-}
-
-
-ProtocolSerialization5_0::ProtocolSerialization5_0(const std::shared_ptr<const document::DocumentTypeRepo>& repo)
- : ProtocolSerialization4_2(repo)
-{
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::PutReply& msg) const
-{
- buf.putBoolean(msg.wasFound());
- onEncodeBucketInfoReply(buf, msg);
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::PutCommand& msg) const
-{
- SH::putDocument(msg.getDocument().get(), buf);
- putBucket(msg.getBucket(), buf);
- buf.putLong(msg.getTimestamp());
- buf.putLong(msg.getUpdateTimestamp());
- onEncodeBucketInfoCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_0::onDecodePutCommand(BBuf& buf) const
-{
- document::Document::SP doc(SH::getDocument(buf, getTypeRepo()));
- document::Bucket bucket = getBucket(buf);
- api::Timestamp ts(SH::getLong(buf));
- auto msg = std::make_unique<api::PutCommand>(bucket, doc, ts);
- msg->setUpdateTimestamp(SH::getLong(buf));
- onDecodeBucketInfoCommand(buf, *msg);
- return msg;
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodePutReply(const SCmd& cmd, BBuf& buf) const
-{
- bool wasFound = SH::getBoolean(buf);
- auto msg = std::make_unique<api::PutReply>(static_cast<const api::PutCommand&>(cmd), wasFound);
- onDecodeBucketInfoReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::UpdateReply& msg) const
-{
- buf.putLong(msg.getOldTimestamp());
- onEncodeBucketInfoReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeUpdateReply(const SCmd& cmd, BBuf& buf) const
-{
- api::Timestamp oldTimestamp(SH::getLong(buf));
- auto msg = std::make_unique<api::UpdateReply>(static_cast<const api::UpdateCommand&>(cmd), oldTimestamp);
- onDecodeBucketInfoReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::GetReply& msg) const
-{
- SH::putDocument(msg.getDocument().get(), buf);
- // Old protocol version doesn't understand tombstones. Make it appear as Not Found.
- buf.putLong(msg.is_tombstone() ? api::Timestamp(0) : msg.getLastModifiedTimestamp());
- onEncodeBucketInfoReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeGetReply(const SCmd& cmd, BBuf& buf) const
-{
- try {
- document::Document::SP doc(SH::getDocument(buf, getTypeRepo()));
- api::Timestamp lastModified(SH::getLong(buf));
- auto msg = std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd), doc,lastModified);
- onDecodeBucketInfoReply(buf, *msg);
- return msg;
- } catch (std::exception& e) {
- auto msg = std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd), document::Document::SP(),0);
- msg->setResult(api::ReturnCode(api::ReturnCode::UNPARSEABLE, e.what()));
- return msg;
- }
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::RemoveReply& msg) const
-{
- buf.putLong(msg.getOldTimestamp());
- onEncodeBucketInfoReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeRemoveReply(const SCmd& cmd, BBuf& buf) const
-{
- api::Timestamp oldTimestamp(SH::getLong(buf));
- auto msg = std::make_unique<api::RemoveReply>(static_cast<const api::RemoveCommand&>(cmd), oldTimestamp);
- onDecodeBucketInfoReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::UpdateCommand& msg) const
-{
- document::DocumentUpdate* update = msg.getUpdate().get();
- if (update) {
- vespalib::nbostream stream;
- update->serializeHEAD(stream);
- buf.putInt(stream.size());
- buf.putBytes(stream.peek(), stream.size());
- } else {
- buf.putInt(0);
- }
-
- putBucket(msg.getBucket(), buf);
- buf.putLong(msg.getTimestamp());
- buf.putLong(msg.getOldTimestamp());
- onEncodeBucketInfoCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_0::onDecodeUpdateCommand(BBuf& buf) const
-{
- document::DocumentUpdate::SP update;
-
- uint32_t size = SH::getInt(buf);
- if (size != 0) {
- update = document::DocumentUpdate::createHEAD(getTypeRepo(), vespalib::nbostream(buf.getBufferAtPos(), size));
- buf.incPos(size);
- }
-
- document::Bucket bucket = getBucket(buf);
- api::Timestamp timestamp(SH::getLong(buf));
- api::UpdateCommand::UP msg = std::make_unique<api::UpdateCommand>(bucket, update, timestamp);
- msg->setOldTimestamp(SH::getLong(buf));
- onDecodeBucketInfoCommand(buf, *msg);
- return msg;
-}
-
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::RevertReply& msg) const
-{
- onEncodeBucketInfoReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeRevertReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::RevertReply>(static_cast<const api::RevertCommand&>(cmd));
- onDecodeBucketInfoReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::CreateBucketReply& msg) const
-{
- onEncodeBucketInfoReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeCreateBucketReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::CreateBucketReply>(static_cast<const api::CreateBucketCommand&>(cmd));
- onDecodeBucketInfoReply(buf, *msg);
- return msg;
-}
-
-void
-ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const
-{
- putBucket(msg.getBucket(), buf);
- onEncodeBucketInfoCommand(buf, msg);
- putBucketInfo(msg.getBucketInfo(), buf);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_0::onDecodeDeleteBucketCommand(BBuf& buf) const
-{
- document::Bucket bucket = getBucket(buf);
- auto msg = std::make_unique<api::DeleteBucketCommand>(bucket);
- onDecodeBucketInfoCommand(buf, *msg);
- if (buf.getRemaining() >= SH::BUCKET_INFO_SERIALIZED_SIZE) {
- msg->setBucketInfo(getBucketInfo(buf));
- }
- return msg;
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::DeleteBucketReply& msg) const
-{
- onEncodeBucketInfoReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeDeleteBucketReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::DeleteBucketReply>(static_cast<const api::DeleteBucketCommand&>(cmd));
- onDecodeBucketInfoReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const
-{
- ProtocolSerialization4_2::onEncode(buf, msg);
-
- buf.putInt(msg.getClusterStateVersion());
- const std::vector<uint16_t>& chain(msg.getChain());
- buf.putShort(chain.size());
- for (std::size_t i = 0; i < chain.size(); ++i) {
- buf.putShort(chain[i]);
- }
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_0::onDecodeMergeBucketCommand(BBuf& buf) const
-{
- api::StorageCommand::UP cmd = ProtocolSerialization4_2::onDecodeMergeBucketCommand(buf);
- uint32_t clusterStateVersion = SH::getInt(buf);
- uint16_t chainSize = SH::getShort(buf);
- std::vector<uint16_t> chain;
- chain.reserve(chainSize);
- for (std::size_t i = 0; i < chainSize; ++i) {
- uint16_t index = SH::getShort(buf);
- chain.push_back(index);
- }
- api::MergeBucketCommand& mergeCmd = static_cast<api::MergeBucketCommand&>(*cmd);
- mergeCmd.setChain(chain);
- mergeCmd.setClusterStateVersion(clusterStateVersion);
- return cmd;
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::MergeBucketReply& msg) const
-{
- onEncodeBucketReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeMergeBucketReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::MergeBucketReply>(static_cast<const api::MergeBucketCommand&>(cmd));
- onDecodeBucketReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::GetBucketDiffReply& msg) const
-{
- const std::vector<api::GetBucketDiffCommand::Entry>& entries(msg.getDiff());
- buf.putInt(entries.size());
- for (uint32_t i=0; i<entries.size(); ++i) {
- onEncodeDiffEntry(buf, entries[i]);
- }
- onEncodeBucketReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeGetBucketDiffReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::GetBucketDiffReply>(static_cast<const api::GetBucketDiffCommand&>(cmd));
- std::vector<api::GetBucketDiffCommand::Entry>& entries(msg->getDiff());
- uint32_t entryCount = SH::getInt(buf);
- if (entryCount > buf.getRemaining()) {
- // Trigger out of bounds exception rather than out of memory error
- buf.incPos(entryCount);
- }
- entries.resize(entryCount);
- for (uint32_t i=0; i<entries.size(); ++i) {
- onDecodeDiffEntry(buf, entries[i]);
- }
- onDecodeBucketReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::ApplyBucketDiffReply& msg) const
-{
- const std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg.getDiff());
- buf.putInt(entries.size());
- for (uint32_t i=0; i<entries.size(); ++i) {
- onEncodeDiffEntry(buf, entries[i]._entry);
- buf.putString(entries[i]._docName);
- buf.putInt(entries[i]._headerBlob.size());
- buf.putBytes(&entries[i]._headerBlob[0], entries[i]._headerBlob.size());
- buf.putInt(entries[i]._bodyBlob.size());
- buf.putBytes(&entries[i]._bodyBlob[0], entries[i]._bodyBlob.size());
- }
- onEncodeBucketInfoReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeApplyBucketDiffReply(const SCmd& cmd,
- BBuf& buf) const
-{
- auto msg = std::make_unique<api::ApplyBucketDiffReply>(static_cast<const api::ApplyBucketDiffCommand&>(cmd));
- std::vector<api::ApplyBucketDiffCommand::Entry>& entries(msg->getDiff());
- uint32_t entryCount = SH::getInt(buf);
- if (entryCount > buf.getRemaining()) {
- // Trigger out of bounds exception rather than out of memory error
- buf.incPos(entryCount);
- }
- entries.resize(entryCount);
- for (uint32_t i=0; i<entries.size(); ++i) {
- onDecodeDiffEntry(buf, entries[i]._entry);
- entries[i]._docName = SH::getString(buf);
- uint32_t headerSize = SH::getInt(buf);
- if (headerSize > buf.getRemaining()) {
- buf.incPos(headerSize);
- }
- entries[i]._headerBlob.resize(headerSize);
- buf.getBytes(&entries[i]._headerBlob[0], entries[i]._headerBlob.size());
- uint32_t bodySize = SH::getInt(buf);
- if (bodySize > buf.getRemaining()) {
- buf.incPos(bodySize);
- }
- entries[i]._bodyBlob.resize(bodySize);
- buf.getBytes(&entries[i]._bodyBlob[0], entries[i]._bodyBlob.size());
- }
- onDecodeBucketInfoReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::SplitBucketReply& msg) const
-{
- const std::vector<api::SplitBucketReply::Entry>& entries(msg.getSplitInfo());
- buf.putInt(entries.size());
- for (std::vector<api::SplitBucketReply::Entry>::const_iterator it
- = entries.begin(); it != entries.end(); ++it)
- {
- buf.putLong(it->first.getRawId());
- putBucketInfo(it->second, buf);
- }
- onEncodeBucketReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeSplitBucketReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::SplitBucketReply>(static_cast<const api::SplitBucketCommand&>(cmd));
- std::vector<api::SplitBucketReply::Entry>& entries(msg->getSplitInfo());
- uint32_t targetCount = SH::getInt(buf);
- if (targetCount > buf.getRemaining()) {
- // Trigger out of bounds exception rather than out of memory error
- buf.incPos(targetCount);
- }
- entries.resize(targetCount);
- for (std::vector<api::SplitBucketReply::Entry>::iterator it
- = entries.begin(); it != entries.end(); ++it)
- {
- it->first = document::BucketId(SH::getLong(buf));
- it->second = getBucketInfo(buf);
- }
- onDecodeBucketReply(buf, *msg);
- return msg;
-}
-
-void
-ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const
-{
- putBucket(msg.getBucket(), buf);
- buf.putInt(msg.getSourceBuckets().size());
- for (uint32_t i=0, n=msg.getSourceBuckets().size(); i<n; ++i) {
- buf.putLong(msg.getSourceBuckets()[i].getRawId());
- }
- buf.putByte(msg.getMinJoinBits());
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_0::onDecodeJoinBucketsCommand(BBuf& buf) const
-{
- document::Bucket bucket = getBucket(buf);
- auto msg = std::make_unique<api::JoinBucketsCommand>(bucket);
- uint32_t size = SH::getInt(buf);
- if (size > buf.getRemaining()) {
- // Trigger out of bounds exception rather than out of memory error
- buf.incPos(size);
- }
- std::vector<document::BucketId>& entries(msg->getSourceBuckets());
- for (uint32_t i=0; i<size; ++i) {
- entries.push_back(document::BucketId(SH::getLong(buf)));
- }
- msg->setMinJoinBits(SH::getByte(buf));
- onDecodeCommand(buf, *msg);
- return msg;
-}
-
-void
-ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::JoinBucketsReply& msg) const
-{
- putBucketInfo(msg.getBucketInfo(), buf);
- onEncodeBucketReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeJoinBucketsReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::JoinBucketsReply>(static_cast<const api::JoinBucketsCommand&>(cmd));
- msg->setBucketInfo(getBucketInfo(buf));
- onDecodeBucketReply(buf, *msg);
- return msg;
-}
-
-void
-ProtocolSerialization5_0::onEncodeBucketInfoReply(GBBuf& buf, const api::BucketInfoReply& msg) const
-{
- onEncodeBucketReply(buf, msg);
- putBucketInfo(msg.getBucketInfo(), buf);
-}
-
-void
-ProtocolSerialization5_0::onDecodeBucketInfoReply(BBuf& buf, api::BucketInfoReply& msg) const
-{
- onDecodeBucketReply(buf, msg);
- msg.setBucketInfo(getBucketInfo(buf));
-}
-
-void
-ProtocolSerialization5_0::onEncodeBucketReply(GBBuf& buf, const api::BucketReply& msg) const
-{
- onEncodeReply(buf, msg);
- buf.putLong(msg.hasBeenRemapped() ? msg.getBucketId().getRawId() : 0);
-}
-
-void
-ProtocolSerialization5_0::onDecodeBucketReply(BBuf& buf, api::BucketReply& msg) const
-{
- onDecodeReply(buf, msg);
- document::BucketId bucket(SH::getLong(buf));
- if (bucket.getRawId() != 0) {
- msg.remapBucketId(bucket);
- }
-}
-
-void
-ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::CreateVisitorReply& msg) const
-{
- onEncodeReply(buf, msg);
- buf.putInt(msg.getVisitorStatistics().getBucketsVisited());
- buf.putLong(msg.getVisitorStatistics().getDocumentsVisited());
- buf.putLong(msg.getVisitorStatistics().getBytesVisited());
- buf.putLong(msg.getVisitorStatistics().getDocumentsReturned());
- buf.putLong(msg.getVisitorStatistics().getBytesReturned());
- buf.putLong(0);
- buf.putLong(0);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_0::onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::CreateVisitorReply>(static_cast<const api::CreateVisitorCommand&>(cmd));
- onDecodeReply(buf, *msg);
-
- vdslib::VisitorStatistics vs;
- vs.setBucketsVisited(SH::getInt(buf));
- vs.setDocumentsVisited(SH::getLong(buf));
- vs.setBytesVisited(SH::getLong(buf));
- vs.setDocumentsReturned(SH::getLong(buf));
- vs.setBytesReturned(SH::getLong(buf));
- SH::getLong(buf);
- SH::getLong(buf);
- msg->setVisitorStatistics(vs);
-
- return msg;
-}
-
-void ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const
-{
- const std::vector<document::BucketId>& buckets(msg.getBuckets());
- buf.putInt(buckets.size());
- for (uint32_t i=0; i<buckets.size(); ++i) {
- buf.putLong(buckets[i].getRawId());
- }
- putBucketSpace(msg.getBucketSpace(), buf);
- if (buckets.size() == 0) {
- buf.putShort(msg.getDistributor());
- buf.putString(msg.getSystemState().toString());
- buf.putString(msg.getDistributionHash());
- }
-
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_0::onDecodeRequestBucketInfoCommand(BBuf& buf) const
-{
- std::vector<document::BucketId> buckets(SH::getInt(buf));
- for (uint32_t i=0; i<buckets.size(); ++i) {
- buckets[i] = document::BucketId(SH::getLong(buf));
- }
- api::RequestBucketInfoCommand::UP msg;
- BucketSpace bucketSpace = getBucketSpace(buf);
- if (buckets.size() != 0) {
- msg.reset(new api::RequestBucketInfoCommand(bucketSpace, buckets));
- } else {
- int distributor = SH::getShort(buf);
- lib::ClusterState state(SH::getString(buf));
- msg.reset(new api::RequestBucketInfoCommand(bucketSpace, distributor, state, SH::getString(buf)));
- }
- onDecodeCommand(buf, *msg);
- return msg;
-}
-
-void
-ProtocolSerialization5_0::onEncode(GBBuf& buf, const api::CreateVisitorCommand& cmd) const
-{
- ProtocolSerialization4_2::onEncode(buf, cmd);
-
- buf.putInt(0); // Unused
- buf.putInt(cmd.getMaxBucketsPerVisitor());
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_0::onDecodeCreateVisitorCommand(BBuf& buf) const
-{
- api::StorageCommand::UP cvc = ProtocolSerialization4_2::onDecodeCreateVisitorCommand(buf);
- SH::getInt(buf); // Unused
-
- static_cast<api::CreateVisitorCommand*>(cvc.get())->setMaxBucketsPerVisitor(SH::getInt(buf));
-
- static_cast<api::CreateVisitorCommand*>(cvc.get())->setVisitorDispatcherVersion(50);
- return cvc;
-}
-
-}
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.h b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.h
deleted file mode 100644
index 8de2edab17f..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_0.h
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "protocolserialization4_2.h"
-
-namespace storage::mbusprot {
-
-class ProtocolSerialization5_0 : public ProtocolSerialization4_2 {
-public:
- ProtocolSerialization5_0(const std::shared_ptr<const document::DocumentTypeRepo>&);
-
- document::Bucket getBucket(document::ByteBuffer& buf) const override;
- void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const override;
- document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const override;
- void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const override;
- api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const override;
- void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const override;
-
- void onEncode(GBBuf&, const api::PutCommand&) const override;
- void onEncode(GBBuf&, const api::PutReply&) const override;
- void onEncode(GBBuf&, const api::UpdateCommand&) const override;
- void onEncode(GBBuf&, const api::UpdateReply&) const override;
- void onEncode(GBBuf&, const api::GetReply&) const override;
- void onEncode(GBBuf&, const api::RemoveReply&) const override;
- void onEncode(GBBuf&, const api::RevertReply&) const override;
- void onEncode(GBBuf&, const api::CreateBucketReply&) const override;
- void onEncode(GBBuf&, const api::DeleteBucketCommand&) const override;
- void onEncode(GBBuf&, const api::DeleteBucketReply&) const override;
- void onEncode(GBBuf&, const api::MergeBucketCommand&) const override;
- void onEncode(GBBuf&, const api::MergeBucketReply&) const override;
- void onEncode(GBBuf&, const api::GetBucketDiffReply&) const override;
- void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const override;
- void onEncode(GBBuf&, const api::SplitBucketReply&) const override;
- void onEncode(GBBuf&, const api::JoinBucketsCommand&) const override;
- void onEncode(GBBuf&, const api::JoinBucketsReply&) const override;
- void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const override;
-
- void onEncodeBucketInfoReply(GBBuf&, const api::BucketInfoReply&) const override;
- virtual void onEncodeBucketReply(GBBuf&, const api::BucketReply&) const;
-
- void onEncode(GBBuf&, const api::CreateVisitorCommand& msg) const override;
- void onEncode(GBBuf&, const api::CreateVisitorReply& msg) const override;
- void onEncodeCommand(GBBuf&, const api::StorageCommand&) const override;
- void onEncodeReply(GBBuf&, const api::StorageReply&) const override;
-
- SCmd::UP onDecodePutCommand(BBuf&) const override;
- SRep::UP onDecodePutReply(const SCmd&, BBuf&) const override;
- SCmd::UP onDecodeUpdateCommand(BBuf&) const override;
- SRep::UP onDecodeUpdateReply(const SCmd&, BBuf&) const override;
- SRep::UP onDecodeGetReply(const SCmd&, BBuf&) const override;
- SRep::UP onDecodeRemoveReply(const SCmd&, BBuf&) const override;
- SRep::UP onDecodeRevertReply(const SCmd&, BBuf&) const override;
- SRep::UP onDecodeCreateBucketReply(const SCmd&, BBuf&) const override;
- SCmd::UP onDecodeDeleteBucketCommand(BBuf&) const override;
- SRep::UP onDecodeDeleteBucketReply(const SCmd&, BBuf&) const override;
- SCmd::UP onDecodeMergeBucketCommand(BBuf&) const override;
- SRep::UP onDecodeMergeBucketReply(const SCmd&, BBuf&) const override;
- SRep::UP onDecodeGetBucketDiffReply(const SCmd&, BBuf&) const override;
- SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const override;
- SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const override;
- SCmd::UP onDecodeJoinBucketsCommand(BBuf& buf) const override;
- SRep::UP onDecodeJoinBucketsReply(const SCmd& cmd, BBuf& buf) const override;
- SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override;
- SCmd::UP onDecodeRequestBucketInfoCommand(BBuf& buf) const override;
-
- void onDecodeBucketInfoReply(BBuf&, api::BucketInfoReply&) const override;
- virtual void onDecodeBucketReply(BBuf&, api::BucketReply&) const;
- SRep::UP onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const override;
- void onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const override;
- void onDecodeReply(BBuf&, api::StorageReply&) const override;
-};
-
-}
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp
deleted file mode 100644
index 97ceceac33d..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.cpp
+++ /dev/null
@@ -1,198 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "protocolserialization5_1.h"
-#include "serializationhelper.h"
-#include "storagecommand.h"
-#include "storagereply.h"
-#include <vespa/storageapi/message/bucketsplitting.h>
-#include <vespa/storageapi/message/visitor.h>
-
-using document::BucketSpace;
-
-namespace storage::mbusprot {
-
-api::BucketInfo
-ProtocolSerialization5_1::getBucketInfo(document::ByteBuffer& buf) const
-{
- uint64_t lastModified(SH::getLong(buf));
- uint32_t crc(SH::getInt(buf));
- uint32_t doccount(SH::getInt(buf));
- uint32_t docsize(SH::getInt(buf));
- uint32_t metacount(SH::getInt(buf));
- uint32_t usedsize(SH::getInt(buf));
- uint8_t flags(SH::getByte(buf));
- bool ready = (flags & BUCKET_READY) != 0;
- bool active = (flags & BUCKET_ACTIVE) != 0;
- return api::BucketInfo(crc, doccount, docsize,
- metacount, usedsize,
- ready, active, lastModified);
-}
-
-void
-ProtocolSerialization5_1::putBucketInfo(
- const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const
-{
- buf.putLong(info.getLastModified());
- buf.putInt(info.getChecksum());
- buf.putInt(info.getDocumentCount());
- buf.putInt(info.getTotalDocumentSize());
- buf.putInt(info.getMetaCount());
- buf.putInt(info.getUsedFileSize());
- uint8_t flags = (info.isReady() ? BUCKET_READY : 0) |
- (info.isActive() ? BUCKET_ACTIVE : 0);
- buf.putByte(flags);
-}
-
-ProtocolSerialization5_1::ProtocolSerialization5_1(
- const std::shared_ptr<const document::DocumentTypeRepo>& repo)
- : ProtocolSerialization5_0(repo)
-{
-}
-
-void ProtocolSerialization5_1::onEncode(GBBuf& buf, const api::SetBucketStateCommand& msg) const
-{
- putBucket(msg.getBucket(), buf);
- buf.putByte(static_cast<uint8_t>(msg.getState()));
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_1::onDecodeSetBucketStateCommand(BBuf& buf) const
-{
- document::Bucket bucket = getBucket(buf);
- api::SetBucketStateCommand::BUCKET_STATE state(
- static_cast<api::SetBucketStateCommand::BUCKET_STATE>(SH::getByte(buf)));
- auto msg = std::make_unique<api::SetBucketStateCommand>(bucket, state);
- onDecodeCommand(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization5_1::onEncode(GBBuf& buf, const api::SetBucketStateReply& msg) const
-{
- onEncodeBucketReply(buf, msg);
-}
-
-api::StorageReply::UP
-ProtocolSerialization5_1::onDecodeSetBucketStateReply(const SCmd& cmd, BBuf& buf) const
-{
- auto msg = std::make_unique<api::SetBucketStateReply>(static_cast<const api::SetBucketStateCommand&>(cmd));
- onDecodeBucketReply(buf, *msg);
- return msg;
-}
-
-void ProtocolSerialization5_1::onEncode(GBBuf& buf, const api::GetCommand& msg) const
-{
- buf.putString(msg.getDocumentId().toString());
- putBucket(msg.getBucket(), buf);
- buf.putLong(msg.getBeforeTimestamp());
- buf.putString(msg.getFieldSet());
- onEncodeCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_1::onDecodeGetCommand(BBuf& buf) const
-{
- document::DocumentId did(SH::getString(buf));
- document::Bucket bucket = getBucket(buf);
- api::Timestamp beforeTimestamp(SH::getLong(buf));
- std::string fieldSet(SH::getString(buf));
- auto msg = std::make_unique<api::GetCommand>(bucket, did, fieldSet, beforeTimestamp);
- onDecodeCommand(buf, *msg);
- return msg;
-}
-
-void
-ProtocolSerialization5_1::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const
-{
- putBucketSpace(msg.getBucketSpace(), buf);
- buf.putString(msg.getLibraryName());
- buf.putString(msg.getInstanceId());
- buf.putString(msg.getDocumentSelection());
- buf.putInt(msg.getVisitorCmdId());
- buf.putString(msg.getControlDestination());
- buf.putString(msg.getDataDestination());
- buf.putInt(msg.getMaximumPendingReplyCount());
- buf.putLong(msg.getFromTime());
- buf.putLong(msg.getToTime());
-
- buf.putInt(msg.getBuckets().size());
- for (uint32_t i = 0; i < msg.getBuckets().size(); i++) {
- buf.putLong(msg.getBuckets()[i].getRawId());
- }
-
- buf.putBoolean(msg.visitRemoves());
- buf.putString(msg.getFieldSet());
- buf.putBoolean(msg.visitInconsistentBuckets());
- buf.putInt(vespalib::count_ms(msg.getQueueTimeout()));
- msg.getParameters().serialize(buf);
-
- onEncodeCommand(buf, msg);
-
- buf.putInt(0); // Unused
- buf.putInt(msg.getMaxBucketsPerVisitor());
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_1::onDecodeCreateVisitorCommand(BBuf& buf) const
-{
- BucketSpace bucketSpace = getBucketSpace(buf);
- vespalib::stringref libraryName = SH::getString(buf);
- vespalib::stringref instanceId = SH::getString(buf);
- vespalib::stringref selection = SH::getString(buf);
- auto msg = std::make_unique<api::CreateVisitorCommand>(bucketSpace, libraryName, instanceId, selection);
- msg->setVisitorCmdId(SH::getInt(buf));
- msg->setControlDestination(SH::getString(buf));
- msg->setDataDestination(SH::getString(buf));
- msg->setMaximumPendingReplyCount(SH::getInt(buf));
-
- msg->setFromTime(SH::getLong(buf));
- msg->setToTime(SH::getLong(buf));
- uint32_t count = SH::getInt(buf);
-
- if (count > buf.getRemaining()) {
- // Trigger out of bounds exception rather than out of memory error
- buf.incPos(count);
- }
-
- for (uint32_t i = 0; i < count; i++) {
- msg->getBuckets().push_back(document::BucketId(SH::getLong(buf)));
- }
-
- if (SH::getBoolean(buf)) {
- msg->setVisitRemoves();
- }
-
- msg->setFieldSet(SH::getString(buf));
-
- if (SH::getBoolean(buf)) {
- msg->setVisitInconsistentBuckets();
- }
- msg->setQueueTimeout(std::chrono::milliseconds(SH::getInt(buf)));
- msg->getParameters().deserialize(buf);
-
- onDecodeCommand(buf, *msg);
- SH::getInt(buf); // Unused
- msg->setMaxBucketsPerVisitor(SH::getInt(buf));
- msg->setVisitorDispatcherVersion(50);
- return msg;
-}
-
-void ProtocolSerialization5_1::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const
-{
- putBucket(msg.getBucket(), buf);
- buf.putBoolean(msg.getActive());
- onEncodeBucketInfoCommand(buf, msg);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_1::onDecodeCreateBucketCommand(BBuf& buf) const
-{
- document::Bucket bucket = getBucket(buf);
- bool setActive = SH::getBoolean(buf);
- auto msg = std::make_unique<api::CreateBucketCommand>(bucket);
- msg->setActive(setActive);
- onDecodeBucketInfoCommand(buf, *msg);
- return msg;
-}
-
-}
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.h b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.h
deleted file mode 100644
index bfad492e653..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_1.h
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "protocolserialization5_0.h"
-
-namespace storage::mbusprot {
-
-class ProtocolSerialization5_1 : public ProtocolSerialization5_0
-{
- enum BucketState {
- BUCKET_READY = 0x1,
- BUCKET_ACTIVE = 0x2,
- };
-public:
- ProtocolSerialization5_1(const std::shared_ptr<const document::DocumentTypeRepo>&);
-
- api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const override;
- void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const override;
-
-protected:
- void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override;
- void onEncode(GBBuf&, const api::SetBucketStateReply&) const override;
- void onEncode(GBBuf&, const api::GetCommand&) const override;
- void onEncode(GBBuf&, const api::CreateVisitorCommand&) const override;
- void onEncode(GBBuf&, const api::CreateBucketCommand&) const override;
-
- SCmd::UP onDecodeSetBucketStateCommand(BBuf&) const override;
- SRep::UP onDecodeSetBucketStateReply(const SCmd&, BBuf&) const override;
- SCmd::UP onDecodeGetCommand(BBuf&) const override;
- SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override;
- SCmd::UP onDecodeCreateBucketCommand(BBuf&) const override;
-};
-
-}
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.cpp
deleted file mode 100644
index 10aa9d69fbc..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.cpp
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-// @author Vegard Sjonfjell
-
-#include "protocolserialization5_2.h"
-#include "storagecommand.h"
-#include "serializationhelper.h"
-
-namespace storage::mbusprot {
-
-using documentapi::TestAndSetCondition;
-
-void ProtocolSerialization5_2::onEncode(GBBuf & buf, const api::PutCommand & cmd) const
-{
- ProtocolSerialization5_0::onEncode(buf, cmd);
- encodeTasCondition(buf, cmd);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_2::onDecodePutCommand(BBuf & buf) const
-{
- auto cmd = ProtocolSerialization5_0::onDecodePutCommand(buf);
- decodeTasCondition(*cmd, buf);
- return cmd;
-}
-
-void ProtocolSerialization5_2::onEncode(GBBuf & buf, const api::RemoveCommand & cmd) const
-{
- ProtocolSerialization4_2::onEncode(buf, cmd);
- encodeTasCondition(buf, cmd);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_2::onDecodeRemoveCommand(BBuf & buf) const
-{
- auto cmd = ProtocolSerialization4_2::onDecodeRemoveCommand(buf);
- decodeTasCondition(*cmd, buf);
- return cmd;
-}
-
-void ProtocolSerialization5_2::onEncode(GBBuf & buf, const api::UpdateCommand & cmd) const
-{
- ProtocolSerialization5_0::onEncode(buf, cmd);
- encodeTasCondition(buf, cmd);
-}
-
-api::StorageCommand::UP
-ProtocolSerialization5_2::onDecodeUpdateCommand(BBuf & buf) const
-{
- auto cmd = ProtocolSerialization5_0::onDecodeUpdateCommand(buf);
- decodeTasCondition(*cmd, buf);
- return cmd;
-}
-
-void ProtocolSerialization5_2::decodeTasCondition(api::StorageCommand & storageCmd, BBuf & buf) {
- auto & cmd = static_cast<api::TestAndSetCommand &>(storageCmd);
- cmd.setCondition(TestAndSetCondition(SH::getString(buf)));
-}
-
-void ProtocolSerialization5_2::encodeTasCondition(GBBuf & buf, const api::StorageCommand & storageCmd) {
- auto & cmd = static_cast<const api::TestAndSetCommand &>(storageCmd);
- buf.putString(cmd.getCondition().getSelection());
-}
-
-}
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.h b/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.h
deleted file mode 100644
index e56b9942fa5..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization5_2.h
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-// @author Vegard Sjonfjell
-
-#pragma once
-
-#include "protocolserialization5_1.h"
-#include <vespa/vespalib/util/growablebytebuffer.h>
-#include <vespa/storageapi/message/persistence.h>
-
-namespace storage::mbusprot {
-
-class ProtocolSerialization5_2 : public ProtocolSerialization5_1
-{
-public:
- ProtocolSerialization5_2(const std::shared_ptr<const document::DocumentTypeRepo>& repo)
- : ProtocolSerialization5_1(repo)
- {}
-
-protected:
- void onEncode(GBBuf &, const api::PutCommand &) const override;
- void onEncode(GBBuf &, const api::RemoveCommand &) const override;
- void onEncode(GBBuf &, const api::UpdateCommand &) const override;
-
- SCmd::UP onDecodePutCommand(BBuf &) const override;
- SCmd::UP onDecodeRemoveCommand(BBuf &) const override;
- SCmd::UP onDecodeUpdateCommand(BBuf &) const override;
-
- static void decodeTasCondition(api::StorageCommand & cmd, BBuf & buf);
- static void encodeTasCondition(GBBuf & buf, const api::StorageCommand & cmd);
-};
-
-}
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.cpp
deleted file mode 100644
index 799aff8f8b3..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.cpp
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "protocolserialization6_0.h"
-#include "serializationhelper.h"
-
-namespace storage::mbusprot {
-
-ProtocolSerialization6_0::ProtocolSerialization6_0(const std::shared_ptr<const document::DocumentTypeRepo> &repo)
- : ProtocolSerialization5_2(repo)
-{
-}
-
-document::Bucket
-ProtocolSerialization6_0::getBucket(document::ByteBuffer &buf) const
-{
- document::BucketSpace bucketSpace(SH::getLong(buf));
- document::BucketId bucketId(SH::getLong(buf));
- return document::Bucket(bucketSpace, bucketId);
-}
-
-void
-ProtocolSerialization6_0::putBucket(const document::Bucket &bucket, vespalib::GrowableByteBuffer &buf) const
-{
- buf.putLong(bucket.getBucketSpace().getId());
- buf.putLong(bucket.getBucketId().getRawId());
-}
-
-document::BucketSpace
-ProtocolSerialization6_0::getBucketSpace(document::ByteBuffer &buf) const
-{
- return document::BucketSpace(SH::getLong(buf));
-}
-
-void
-ProtocolSerialization6_0::putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer &buf) const
-{
- buf.putLong(bucketSpace.getId());
-}
-
-}
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.h b/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.h
deleted file mode 100644
index 5467cf6c5d2..00000000000
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization6_0.h
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "protocolserialization5_2.h"
-
-namespace storage::mbusprot {
-
-/**
- * Protocol serialization version adding decoding and encoding
- * of bucket space to almost all commands.
- */
-class ProtocolSerialization6_0 : public ProtocolSerialization5_2
-{
-public:
- ProtocolSerialization6_0(const std::shared_ptr<const document::DocumentTypeRepo> &repo);
-
- document::Bucket getBucket(document::ByteBuffer &buf) const override;
- void putBucket(const document::Bucket &bucket, vespalib::GrowableByteBuffer &buf) const override;
- document::BucketSpace getBucketSpace(document::ByteBuffer &buf) const override;
- void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer &buf) const override;
-};
-
-}
diff --git a/storage/src/vespa/storageapi/mbusprot/storageprotocol.cpp b/storage/src/vespa/storageapi/mbusprot/storageprotocol.cpp
index 3cf54860f7c..a59e5827523 100644
--- a/storage/src/vespa/storageapi/mbusprot/storageprotocol.cpp
+++ b/storage/src/vespa/storageapi/mbusprot/storageprotocol.cpp
@@ -6,6 +6,7 @@
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/document/util/stringutil.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <cassert>
#include <sstream>
#include <vespa/log/bufferedlogger.h>
@@ -16,11 +17,7 @@ namespace storage::mbusprot {
mbus::string StorageProtocol::NAME = "StorageProtocol";
StorageProtocol::StorageProtocol(const std::shared_ptr<const document::DocumentTypeRepo> repo)
- : _serializer5_0(repo),
- _serializer5_1(repo),
- _serializer5_2(repo),
- _serializer6_0(repo),
- _serializer7_0(repo)
+ : _serializer7_0(repo)
{
}
@@ -33,20 +30,7 @@ StorageProtocol::createPolicy(const mbus::string&, const mbus::string&) const
}
namespace {
- vespalib::Version version7_0(7, 41, 19);
- vespalib::Version version6_0(6, 240, 0);
- vespalib::Version version5_2(5, 93, 30);
- vespalib::Version version5_1(5, 1, 0);
- vespalib::Version version5_0(5, 0, 12);
- vespalib::Version version5_0beta(4, 3, 0);
-}
-
-
-static bool
-suppressEncodeWarning(const api::StorageMessage *msg)
-{
- const auto *req = dynamic_cast<const api::RequestBucketInfoCommand *>(msg);
- return ((req != nullptr) && (req->getBucketSpace() != document::FixedBucketSpaces::default_space()));
+vespalib::Version version7_0(7, 41, 19);
}
static mbus::Blob
@@ -82,46 +66,23 @@ StorageProtocol::encode(const vespalib::Version& version,
const StorageMessage & message(dynamic_cast<const StorageMessage &>(routable));
try {
- if (message.getInternalMessage().get() == 0) {
- throw vespalib::IllegalArgumentException(
- "Given storage message wrapper does not contain a "
- "storage message.",
- VESPA_STRLOC);
- }
+ assert(message.getInternalMessage());
+ if (version < version7_0) {
+ LOGBP(error,
+ "Cannot encode message on version %s."
+ "Minimum version is %s. Cannot serialize %s.",
+ version.toString().c_str(),
+ version7_0.toString().c_str(),
+ message.getInternalMessage()->toString().c_str());
- if (version < version5_1) {
- if (version < version5_0beta) {
- LOGBP(warning,
- "No support for using messagebus for version %s."
- "Minimum version is %s. Thus we cannot serialize %s.",
- version.toString().c_str(),
- version5_0beta.toString().c_str(),
- message.getInternalMessage()->toString().c_str());
-
- return mbus::Blob(0);
- } else {
- return encodeMessage(_serializer5_0, routable, message, version5_0, version);
- }
- } else if (version < version5_2) {
- return encodeMessage(_serializer5_1, routable, message, version5_1, version);
- } else {
- if (version < version6_0) {
- return encodeMessage(_serializer5_2, routable, message, version5_2, version);
- } else if (version < version7_0) {
- return encodeMessage(_serializer6_0, routable, message, version6_0, version);
- } else {
- return encodeMessage(_serializer7_0, routable, message, version7_0, version);
- }
+ return mbus::Blob(0);
}
-
+ return encodeMessage(_serializer7_0, routable, message, version7_0, version);
} catch (std::exception & e) {
- if (!(version < version6_0 &&
- suppressEncodeWarning(message.getInternalMessage().get()))) {
- LOGBP(warning, "Failed to encode %s storage protocol message %s: %s",
- version.toString().c_str(),
- message.getInternalMessage()->toString().c_str(),
- e.what());
- }
+ LOGBP(warning, "Failed to encode %s storage protocol message %s: %s",
+ version.toString().c_str(),
+ message.getInternalMessage()->toString().c_str(),
+ e.what());
}
return mbus::Blob(0);
@@ -168,27 +129,14 @@ StorageProtocol::decode(const vespalib::Version & version,
static_cast<api::MessageType::Id>(SerializationHelper::getInt(buf)));
StorageMessage::UP message;
- if (version < version5_1) {
- if (version < version5_0beta) {
- LOGBP(error,
- "No support for using messagebus for version %s."
- "Minimum version is %s.",
- version.toString().c_str(),
- version5_0beta.toString().c_str());
- } else {
- return decodeMessage(_serializer5_0, data, type, version5_0, version);
- }
- } else if (version < version5_2) {
- return decodeMessage(_serializer5_1, data, type, version5_1, version);
- } else {
- if (version < version6_0) {
- return decodeMessage(_serializer5_2, data, type, version5_2, version);
- } else if (version < version7_0) {
- return decodeMessage(_serializer6_0, data, type, version6_0, version);
- } else {
- return decodeMessage(_serializer7_0, data, type, version7_0, version);
- }
+ if (version < version7_0) {
+ LOGBP(error,
+ "Cannot decode message on version %s. Minimum version is %s.",
+ version.toString().c_str(),
+ version7_0.toString().c_str());
+ return mbus::Routable::UP();
}
+ return decodeMessage(_serializer7_0, data, type, version7_0, version);
} catch (std::exception & e) {
std::ostringstream ost;
ost << "Failed to decode " << version.toString() << " messagebus "
@@ -196,7 +144,6 @@ StorageProtocol::decode(const vespalib::Version & version,
document::StringUtil::printAsHex(ost, data.data(), data.size());
LOGBP(warning, "%s", ost.str().c_str());
}
-
return mbus::Routable::UP();
}
diff --git a/storage/src/vespa/storageapi/mbusprot/storageprotocol.h b/storage/src/vespa/storageapi/mbusprot/storageprotocol.h
index 591009c9832..dbea21a4a9f 100644
--- a/storage/src/vespa/storageapi/mbusprot/storageprotocol.h
+++ b/storage/src/vespa/storageapi/mbusprot/storageprotocol.h
@@ -1,8 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "protocolserialization5_2.h"
-#include "protocolserialization6_0.h"
#include "protocolserialization7.h"
#include <vespa/messagebus/iprotocol.h>
@@ -23,10 +21,6 @@ public:
mbus::Blob encode(const vespalib::Version&, const mbus::Routable&) const override;
mbus::Routable::UP decode(const vespalib::Version&, mbus::BlobRef) const override;
private:
- ProtocolSerialization5_0 _serializer5_0;
- ProtocolSerialization5_1 _serializer5_1;
- ProtocolSerialization5_2 _serializer5_2;
- ProtocolSerialization6_0 _serializer6_0;
ProtocolSerialization7 _serializer7_0;
};