summaryrefslogtreecommitdiffstats
path: root/storageapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-12 14:29:42 +0200
committerGitHub <noreply@github.com>2019-04-12 14:29:42 +0200
commitf73b5003a0c44c558adc445c40d2b6e4aecf1b8f (patch)
treee067a85ead86e9b64a592e726610f7b5c903a4df /storageapi
parent11bedaf1b2b9abd3b5ac4cf8c54a365e0871cfd4 (diff)
Revert "Use protocol buffers for internal StorageAPI wire encoding"
Diffstat (limited to 'storageapi')
-rw-r--r--storageapi/src/tests/CMakeLists.txt3
-rw-r--r--storageapi/src/tests/mbusprot/CMakeLists.txt2
-rw-r--r--storageapi/src/tests/mbusprot/storageprotocoltest.cpp1093
-rw-r--r--storageapi/src/vespa/storageapi/CMakeLists.txt4
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/.gitignore3
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt17
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h31
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto68
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto91
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto160
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto66
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h13
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp5
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h34
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp2
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h6
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h3
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp1268
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h146
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp12
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h2
21 files changed, 725 insertions, 2304 deletions
diff --git a/storageapi/src/tests/CMakeLists.txt b/storageapi/src/tests/CMakeLists.txt
index ddc43c70004..ebbf3b8357a 100644
--- a/storageapi/src/tests/CMakeLists.txt
+++ b/storageapi/src/tests/CMakeLists.txt
@@ -7,7 +7,6 @@ vespa_add_executable(storageapi_gtest_runner_app TEST
gtest_runner.cpp
DEPENDS
storageapi_testbuckets
- storageapi_testmbusprot
storageapi
gtest
)
@@ -23,8 +22,8 @@ vespa_add_executable(storageapi_testrunner_app TEST
testrunner.cpp
DEPENDS
storageapi_testmessageapi
+ storageapi_testmbusprot
storageapi
- vdstestlib
)
vespa_add_test(
diff --git a/storageapi/src/tests/mbusprot/CMakeLists.txt b/storageapi/src/tests/mbusprot/CMakeLists.txt
index 2801c9a91dd..16ced76155c 100644
--- a/storageapi/src/tests/mbusprot/CMakeLists.txt
+++ b/storageapi/src/tests/mbusprot/CMakeLists.txt
@@ -4,5 +4,5 @@ vespa_add_library(storageapi_testmbusprot
storageprotocoltest.cpp
DEPENDS
storageapi
- gtest
+ vdstestlib
)
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index 8690d89e12b..f634667afd5 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -14,16 +14,12 @@
#include <vespa/document/update/fieldpathupdates.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/vdstestlib/cppunit/macros.h>
#include <vespa/vespalib/util/growablebytebuffer.h>
#include <vespa/vespalib/objects/nbostream.h>
-
#include <iomanip>
#include <sstream>
-#include <gtest/gtest.h>
-
-using namespace ::testing;
-
using std::shared_ptr;
using document::BucketSpace;
using document::ByteBuffer;
@@ -36,103 +32,183 @@ using document::test::makeBucketSpace;
using storage::lib::ClusterState;
using vespalib::string;
-namespace vespalib {
-
-// Needed for GTest to properly understand how to print Version values.
-// If not present, it will print the byte values of the presumed memory area
-// (which will be overrun for some reason, causing Valgrind to scream at us).
-void PrintTo(const vespalib::Version& v, std::ostream* os) {
- *os << v.toString();
-}
-
-}
-
-namespace storage::api {
+namespace storage {
+namespace api {
-struct StorageProtocolTest : TestWithParam<vespalib::Version> {
+struct StorageProtocolTest : public CppUnit::TestFixture {
document::TestDocMan _docMan;
document::Document::SP _testDoc;
document::DocumentId _testDocId;
- document::BucketId _bucket_id;
document::Bucket _bucket;
- document::BucketId _dummy_remap_bucket{17, 12345};
- BucketInfo _dummy_bucket_info{1,2,3,4,5, true, false, 48};
+ vespalib::Version _version5_0{5, 0, 12};
+ vespalib::Version _version5_1{5, 1, 0};
+ vespalib::Version _version5_2{5, 93, 30};
+ vespalib::Version _version6_0{6, 240, 0};
documentapi::LoadTypeSet _loadTypes;
mbusprot::StorageProtocol _protocol;
+ static std::vector<std::string> _nonVerboseMessageStrings;
+ static std::vector<std::string> _verboseMessageStrings;
+ static std::vector<char> _serialization50;
static auto constexpr CONDITION_STRING = "There's just one condition";
StorageProtocolTest()
: _docMan(),
_testDoc(_docMan.createDocument()),
_testDocId(_testDoc->getId()),
- _bucket_id(16, 0x51),
- _bucket(makeDocumentBucket(_bucket_id)),
+ _bucket(makeDocumentBucket(document::BucketId(16, 0x51))),
_protocol(_docMan.getTypeRepoSP(), _loadTypes)
{
_loadTypes.addLoadType(34, "foo", documentapi::Priority::PRI_NORMAL_2);
}
- ~StorageProtocolTest();
-
- void set_dummy_bucket_info_reply_fields(BucketInfoReply& reply) {
- reply.setBucketInfo(_dummy_bucket_info);
- reply.remapBucketId(_dummy_remap_bucket);
- }
-
- void assert_bucket_info_reply_fields_propagated(const BucketInfoReply& reply) {
- EXPECT_EQ(_dummy_bucket_info, reply.getBucketInfo());
- EXPECT_TRUE(reply.hasBeenRemapped());
- EXPECT_EQ(_dummy_remap_bucket, reply.getBucketId());
- EXPECT_EQ(_bucket_id, reply.getOriginalBucketId());
- }
template<typename Command>
- std::shared_ptr<Command> copyCommand(const std::shared_ptr<Command>&);
+ std::shared_ptr<Command> copyCommand(const std::shared_ptr<Command>&, vespalib::Version);
template<typename Reply>
std::shared_ptr<Reply> copyReply(const std::shared_ptr<Reply>&);
+ void recordOutput(const api::StorageMessage& msg);
+
+ void recordSerialization50();
+
+ void testWriteSerialization50();
+ void testAddress50();
+ void testStringOutputs();
+
+ void testPut51();
+ void testUpdate51();
+ void testGet51();
+ void testRemove51();
+ void testRevert51();
+ void testRequestBucketInfo51();
+ void testNotifyBucketChange51();
+ void testCreateBucket51();
+ void testDeleteBucket51();
+ void testMergeBucket51();
+ void testGetBucketDiff51();
+ void testApplyBucketDiff51();
+ void testSplitBucket51();
+ void testSplitBucketChain51();
+ void testJoinBuckets51();
+ void testCreateVisitor51();
+ void testDestroyVisitor51();
+ void testRemoveLocation51();
+ void testInternalMessage();
+ void testSetBucketState51();
+
+ void testPutCommand52();
+ void testUpdateCommand52();
+ void testRemoveCommand52();
+
+ void testPutCommandWithBucketSpace6_0();
+ void testCreateVisitorWithBucketSpace6_0();
+ void testRequestBucketInfoWithBucketSpace6_0();
+
+ void serialized_size_is_used_to_set_approx_size_of_storage_message();
+
+ CPPUNIT_TEST_SUITE(StorageProtocolTest);
+
+ // Enable to see string outputs of messages
+ // CPPUNIT_TEST_DISABLED(testStringOutputs);
+
+ // Enable this to write 5.0 serialization to disk
+ // CPPUNIT_TEST_DISABLED(testWriteSerialization50);
+ // CPPUNIT_TEST_DISABLED(testAddress50);
+
+ // 5.1 tests
+ CPPUNIT_TEST(testPut51);
+ CPPUNIT_TEST(testUpdate51);
+ CPPUNIT_TEST(testGet51);
+ CPPUNIT_TEST(testRemove51);
+ CPPUNIT_TEST(testRevert51);
+ CPPUNIT_TEST(testRequestBucketInfo51);
+ CPPUNIT_TEST(testNotifyBucketChange51);
+ CPPUNIT_TEST(testCreateBucket51);
+ CPPUNIT_TEST(testDeleteBucket51);
+ CPPUNIT_TEST(testMergeBucket51);
+ CPPUNIT_TEST(testGetBucketDiff51);
+ CPPUNIT_TEST(testApplyBucketDiff51);
+ CPPUNIT_TEST(testSplitBucket51);
+ CPPUNIT_TEST(testJoinBuckets51);
+ CPPUNIT_TEST(testCreateVisitor51);
+ CPPUNIT_TEST(testDestroyVisitor51);
+ CPPUNIT_TEST(testRemoveLocation51);
+ CPPUNIT_TEST(testInternalMessage);
+ CPPUNIT_TEST(testSetBucketState51);
+
+ // 5.2 tests
+ CPPUNIT_TEST(testPutCommand52);
+ CPPUNIT_TEST(testUpdateCommand52);
+ CPPUNIT_TEST(testRemoveCommand52);
+
+ // 6.0 tests
+ CPPUNIT_TEST(testPutCommandWithBucketSpace6_0);
+ CPPUNIT_TEST(testCreateVisitorWithBucketSpace6_0);
+ CPPUNIT_TEST(testRequestBucketInfoWithBucketSpace6_0);
+
+ CPPUNIT_TEST(serialized_size_is_used_to_set_approx_size_of_storage_message);
+
+ CPPUNIT_TEST_SUITE_END();
};
-StorageProtocolTest::~StorageProtocolTest() = default;
+CPPUNIT_TEST_SUITE_REGISTRATION(StorageProtocolTest);
-namespace {
+std::vector<std::string> StorageProtocolTest::_nonVerboseMessageStrings;
+std::vector<std::string> StorageProtocolTest::_verboseMessageStrings;
+std::vector<char> StorageProtocolTest::_serialization50;
-std::string version_as_gtest_string(TestParamInfo<vespalib::Version> info) {
- std::ostringstream ss;
- auto& p = info.param;
- // Dots are not allowed in test names, so convert to underscores.
- ss << p.getMajor() << '_' << p.getMinor() << '_' << p.getMicro();
- return ss.str();
+void
+StorageProtocolTest::recordOutput(const api::StorageMessage& msg)
+{
+ std::ostringstream ost;
+ ost << " ";
+ msg.print(ost, false, " ");
+ _nonVerboseMessageStrings.push_back(ost.str());
+ ost.str("");
+ ost << " ";
+ msg.print(ost, true, " ");
+ _verboseMessageStrings.push_back(ost.str());
}
-}
+namespace {
+
+ bool debug = false;
+
+ struct ScopedName {
+ std::string _name;
+
+ ScopedName(const std::string& s) : _name(s) {
+ if (debug) std::cerr << "Starting test " << _name << "\n";
+ }
+ ~ScopedName() {
+ if (debug) std::cerr << "Finished test " << _name << "\n";
+ }
+ };
-// TODO replace with INSTANTIATE_TEST_SUITE_P on newer gtest versions
-INSTANTIATE_TEST_CASE_P(MultiVersionTest, StorageProtocolTest,
- Values(vespalib::Version(6, 240, 0),
- vespalib::Version(7, 40, 5)),
- version_as_gtest_string);
+} // Anonymous namespace
namespace {
mbus::Message::UP lastCommand;
mbus::Reply::UP lastReply;
}
-TEST_F(StorageProtocolTest, testAddress50) {
+void
+StorageProtocolTest::testAddress50()
+{
StorageMessageAddress address("foo", lib::NodeType::STORAGE, 3);
- EXPECT_EQ(vespalib::string("storage/cluster.foo/storage/3/default"),
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("storage/cluster.foo/storage/3/default"),
address.getRoute().toString());
}
template<typename Command> std::shared_ptr<Command>
-StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m)
+StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m, vespalib::Version version)
{
- auto mbusMessage = std::make_unique<mbusprot::StorageCommand>(m);
- auto version = GetParam();
+ mbus::Message::UP mbusMessage(new mbusprot::StorageCommand(m));
mbus::Blob blob = _protocol.encode(version, *mbusMessage);
mbus::Routable::UP copy(_protocol.decode(version, blob));
- assert(copy.get());
- auto* copy2 = dynamic_cast<mbusprot::StorageCommand*>(copy.get());
- assert(copy2 != nullptr);
+ CPPUNIT_ASSERT(copy.get());
+
+ mbusprot::StorageCommand* copy2(dynamic_cast<mbusprot::StorageCommand*>(copy.get()));
+ CPPUNIT_ASSERT(copy2 != 0);
StorageCommand::SP internalMessage(copy2->getCommand());
lastCommand = std::move(mbusMessage);
@@ -143,61 +219,80 @@ StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m)
template<typename Reply> std::shared_ptr<Reply>
StorageProtocolTest::copyReply(const std::shared_ptr<Reply>& m)
{
- auto mbusMessage = std::make_unique<mbusprot::StorageReply>(m);
- auto version = GetParam();
- mbus::Blob blob = _protocol.encode(version, *mbusMessage);
- mbus::Routable::UP copy(_protocol.decode(version, blob));
- assert(copy.get());
-
- auto* copy2 = dynamic_cast<mbusprot::StorageReply*>(copy.get());
- assert(copy2 != nullptr);
-
+ mbus::Reply::UP mbusMessage(new mbusprot::StorageReply(m));
+ mbus::Blob blob = _protocol.encode(_version5_1, *mbusMessage);
+ mbus::Routable::UP copy(_protocol.decode(_version5_1, blob));
+ CPPUNIT_ASSERT(copy.get());
+ mbusprot::StorageReply* copy2(
+ dynamic_cast<mbusprot::StorageReply*>(copy.get()));
+ CPPUNIT_ASSERT(copy2 != 0);
copy2->setMessage(std::move(lastCommand));
- auto internalMessage = copy2->getReply();
+ StorageReply::SP internalMessage(copy2->getReply());
lastReply = std::move(mbusMessage);
lastCommand = copy2->getMessage();
return std::dynamic_pointer_cast<Reply>(internalMessage);
}
-TEST_P(StorageProtocolTest, put) {
- auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14);
+void
+StorageProtocolTest::recordSerialization50()
+{
+ assert(lastCommand.get());
+ assert(lastReply.get());
+ for (uint32_t j=0; j<2; ++j) {
+ mbusprot::StorageMessage& msg(j == 0
+ ? dynamic_cast<mbusprot::StorageMessage&>(*lastCommand)
+ : dynamic_cast<mbusprot::StorageMessage&>(*lastReply));
+ msg.getInternalMessage()->forceMsgId(0);
+ mbus::Routable& routable(j == 0
+ ? dynamic_cast<mbus::Routable&>(*lastCommand)
+ : dynamic_cast<mbus::Routable&>(*lastReply));
+ mbus::Blob blob = _protocol.encode(_version5_0, routable);
+ _serialization50.push_back('\n');
+ std::string type(msg.getInternalMessage()->getType().toString());
+ for (uint32_t i=0, n=type.size(); i<n; ++i) {
+ _serialization50.push_back(type[i]);
+ }
+ _serialization50.push_back('\n');
+
+ for (uint32_t i=0, n=blob.size(); i<n; ++i) {
+ _serialization50.push_back(blob.data()[i]);
+ }
+ }
+}
+
+void
+StorageProtocolTest::testPut51()
+{
+ ScopedName test("testPut51");
+ PutCommand::SP cmd(new PutCommand(_bucket, _testDoc, 14));
cmd->setUpdateTimestamp(Timestamp(13));
cmd->setLoadType(_loadTypes["foo"]);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
- EXPECT_EQ(*_testDoc, *cmd2->getDocument());
- EXPECT_EQ(vespalib::string("foo"), cmd2->getLoadType().getName());
- EXPECT_EQ(Timestamp(14), cmd2->getTimestamp());
- EXPECT_EQ(Timestamp(13), cmd2->getUpdateTimestamp());
-
- auto reply = std::make_shared<PutReply>(*cmd2);
- ASSERT_TRUE(reply->hasDocument());
- EXPECT_EQ(*_testDoc, *reply->getDocument());
- set_dummy_bucket_info_reply_fields(*reply);
- auto reply2 = copyReply(reply);
- ASSERT_TRUE(reply2->hasDocument());
- EXPECT_EQ(*_testDoc, *reply->getDocument());
- EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId());
- EXPECT_EQ(Timestamp(14), reply2->getTimestamp());
- EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
-}
-
-TEST_P(StorageProtocolTest, response_without_remapped_bucket_preserves_original_bucket) {
- auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14);
- auto cmd2 = copyCommand(cmd);
- auto reply = std::make_shared<PutReply>(*cmd2);
- auto reply2 = copyReply(reply);
-
- EXPECT_FALSE(reply2->hasBeenRemapped());
- EXPECT_EQ(_bucket_id, reply2->getBucketId());
- EXPECT_EQ(document::BucketId(), reply2->getOriginalBucketId());
-
-}
-
-TEST_P(StorageProtocolTest, update) {
- auto update = std::make_shared<document::DocumentUpdate>(
- _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId());
- auto assignUpdate = std::make_shared<document::AssignValueUpdate>(document::IntFieldValue(17));
+ PutCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(*_testDoc, *cmd2->getDocument());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("foo"), cmd2->getLoadType().getName());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(14), cmd2->getTimestamp());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(13), cmd2->getUpdateTimestamp());
+
+ PutReply::SP reply(new PutReply(*cmd2));
+ CPPUNIT_ASSERT(reply->hasDocument());
+ CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply->getDocument());
+ PutReply::SP reply2(copyReply(reply));
+ CPPUNIT_ASSERT(reply2->hasDocument());
+ CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply->getDocument());
+ CPPUNIT_ASSERT_EQUAL(_testDoc->getId(), reply2->getDocumentId());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(14), reply2->getTimestamp());
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
+}
+
+void
+StorageProtocolTest::testUpdate51()
+{
+ ScopedName test("testUpdate51");
+ document::DocumentUpdate::SP update(new document::DocumentUpdate(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()));
+ std::shared_ptr<document::AssignValueUpdate> assignUpdate(new document::AssignValueUpdate(document::IntFieldValue(17)));
document::FieldUpdate fieldUpdate(_testDoc->getField("headerval"));
fieldUpdate.addUpdate(*assignUpdate);
update->addUpdate(fieldUpdate);
@@ -205,157 +300,217 @@ TEST_P(StorageProtocolTest, update) {
update->addFieldPathUpdate(document::FieldPathUpdate::CP(
new document::RemoveFieldPathUpdate("headerval", "testdoctype1.headerval > 0")));
- auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14);
- EXPECT_EQ(Timestamp(0), cmd->getOldTimestamp());
+ UpdateCommand::SP cmd(new UpdateCommand(_bucket, update, 14));
+ CPPUNIT_ASSERT_EQUAL(Timestamp(0), cmd->getOldTimestamp());
cmd->setOldTimestamp(10);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
- EXPECT_EQ(_testDocId, cmd2->getDocumentId());
- EXPECT_EQ(Timestamp(14), cmd2->getTimestamp());
- EXPECT_EQ(Timestamp(10), cmd2->getOldTimestamp());
- EXPECT_EQ(*update, *cmd2->getUpdate());
-
- auto reply = std::make_shared<UpdateReply>(*cmd2, 8);
- set_dummy_bucket_info_reply_fields(*reply);
- auto reply2 = copyReply(reply);
- EXPECT_EQ(_testDocId, reply2->getDocumentId());
- EXPECT_EQ(Timestamp(14), reply2->getTimestamp());
- EXPECT_EQ(Timestamp(8), reply->getOldTimestamp());
- EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
-}
-
-TEST_P(StorageProtocolTest, get) {
- auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
- EXPECT_EQ(_testDocId, cmd2->getDocumentId());
- EXPECT_EQ(Timestamp(123), cmd2->getBeforeTimestamp());
- EXPECT_EQ(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet());
-
- auto reply = std::make_shared<GetReply>(*cmd2, _testDoc, 100);
- set_dummy_bucket_info_reply_fields(*reply);
- auto reply2 = copyReply(reply);
- ASSERT_TRUE(reply2.get() != nullptr);
- ASSERT_TRUE(reply2->getDocument().get() != nullptr);
- EXPECT_EQ(*_testDoc, *reply2->getDocument());
- EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId());
- EXPECT_EQ(Timestamp(123), reply2->getBeforeTimestamp());
- EXPECT_EQ(Timestamp(100), reply2->getLastModifiedTimestamp());
- EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
-}
-
-TEST_P(StorageProtocolTest, remove) {
- auto cmd = std::make_shared<RemoveCommand>(_bucket, _testDocId, 159);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
- EXPECT_EQ(_testDocId, cmd2->getDocumentId());
- EXPECT_EQ(Timestamp(159), cmd2->getTimestamp());
-
- auto reply = std::make_shared<RemoveReply>(*cmd2, 48);
- set_dummy_bucket_info_reply_fields(*reply);
-
- auto reply2 = copyReply(reply);
- EXPECT_EQ(_testDocId, reply2->getDocumentId());
- EXPECT_EQ(Timestamp(159), reply2->getTimestamp());
- EXPECT_EQ(Timestamp(48), reply2->getOldTimestamp());
- EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
-}
-
-TEST_P(StorageProtocolTest, revert) {
+ UpdateCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(14), cmd2->getTimestamp());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(10), cmd2->getOldTimestamp());
+ CPPUNIT_ASSERT_EQUAL(*update, *cmd2->getUpdate());
+
+ UpdateReply::SP reply(new UpdateReply(*cmd2, 8));
+ UpdateReply::SP reply2(copyReply(reply));
+ CPPUNIT_ASSERT_EQUAL(_testDocId, reply2->getDocumentId());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(14), reply2->getTimestamp());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(8), reply->getOldTimestamp());
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
+}
+
+void
+StorageProtocolTest::testGet51()
+{
+ ScopedName test("testGet51");
+ GetCommand::SP cmd(new GetCommand(_bucket, _testDocId, "foo,bar,vekterli", 123));
+ GetCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(123), cmd2->getBeforeTimestamp());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet());
+
+ GetReply::SP reply(new GetReply(*cmd2, _testDoc, 100));
+ GetReply::SP reply2(copyReply(reply));
+ CPPUNIT_ASSERT(reply2.get());
+ CPPUNIT_ASSERT(reply2->getDocument().get());
+ CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply2->getDocument());
+ CPPUNIT_ASSERT_EQUAL(_testDoc->getId(), reply2->getDocumentId());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(123), reply2->getBeforeTimestamp());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(100), reply2->getLastModifiedTimestamp());
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
+}
+
+void
+StorageProtocolTest::testRemove51()
+{
+ ScopedName test("testRemove51");
+ RemoveCommand::SP cmd(new RemoveCommand(_bucket, _testDocId, 159));
+ RemoveCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(159), cmd2->getTimestamp());
+
+ RemoveReply::SP reply(new RemoveReply(*cmd2, 48));
+ reply->setBucketInfo(BucketInfo(1,2,3,4,5, true, false, 48));
+
+ RemoveReply::SP reply2(copyReply(reply));
+ CPPUNIT_ASSERT_EQUAL(_testDocId, reply2->getDocumentId());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(159), reply2->getTimestamp());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(48), reply2->getOldTimestamp());
+ CPPUNIT_ASSERT_EQUAL(BucketInfo(1,2,3,4,5, true, false, 48),
+ reply2->getBucketInfo());
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
+}
+
+void
+StorageProtocolTest::testRevert51()
+{
+ ScopedName test("testRevertCommand51");
std::vector<Timestamp> tokens;
tokens.push_back(59);
- auto cmd = std::make_shared<RevertCommand>(_bucket, tokens);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
- EXPECT_EQ(tokens, cmd2->getRevertTokens());
+ RevertCommand::SP cmd(new RevertCommand(_bucket, tokens));
+ RevertCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(tokens, cmd2->getRevertTokens());
+
+ RevertReply::SP reply(new RevertReply(*cmd2));
+ BucketInfo info(0x12345432, 101, 520);
+ reply->setBucketInfo(info);
+ RevertReply::SP reply2(copyReply(reply));
- auto reply = std::make_shared<RevertReply>(*cmd2);
- set_dummy_bucket_info_reply_fields(*reply);
- auto reply2 = copyReply(reply);
- EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
+ CPPUNIT_ASSERT_EQUAL(info, reply2->getBucketInfo());
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
}
-TEST_P(StorageProtocolTest, request_bucket_info) {
+void
+StorageProtocolTest::testRequestBucketInfo51()
+{
+ ScopedName test("testRequestBucketInfo51");
{
std::vector<document::BucketId> ids;
ids.push_back(document::BucketId(3));
ids.push_back(document::BucketId(7));
- auto cmd = std::make_shared<RequestBucketInfoCommand>(makeBucketSpace(), ids);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(ids, cmd2->getBuckets());
- EXPECT_FALSE(cmd2->hasSystemState());
+ RequestBucketInfoCommand::SP cmd(new RequestBucketInfoCommand(makeBucketSpace(), ids));
+ RequestBucketInfoCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(ids, cmd2->getBuckets());
+ CPPUNIT_ASSERT(!cmd2->hasSystemState());
+
+ recordOutput(*cmd2);
}
{
ClusterState state("distributor:3 .1.s:d");
- auto cmd = std::make_shared<RequestBucketInfoCommand>(makeBucketSpace(), 3, state, "14");
- auto cmd2 = copyCommand(cmd);
- ASSERT_TRUE(cmd2->hasSystemState());
- EXPECT_EQ(uint16_t(3), cmd2->getDistributor());
- EXPECT_EQ(state, cmd2->getSystemState());
- EXPECT_EQ(size_t(0), cmd2->getBuckets().size());
-
- auto reply = std::make_shared<RequestBucketInfoReply>(*cmd);
+ RequestBucketInfoCommand::SP cmd(new RequestBucketInfoCommand(
+ makeBucketSpace(),
+ 3, state, "14"));
+ RequestBucketInfoCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT(cmd2->hasSystemState());
+ CPPUNIT_ASSERT_EQUAL(uint16_t(3), cmd2->getDistributor());
+ CPPUNIT_ASSERT_EQUAL(state, cmd2->getSystemState());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), cmd2->getBuckets().size());
+
+ RequestBucketInfoReply::SP reply(new RequestBucketInfoReply(*cmd));
RequestBucketInfoReply::Entry e;
e._bucketId = document::BucketId(4);
const uint64_t lastMod = 0x1337cafe98765432ULL;
e._info = BucketInfo(43, 24, 123, 44, 124, false, true, lastMod);
reply->getBucketInfo().push_back(e);
- auto reply2 = copyReply(reply);
- EXPECT_EQ(size_t(1), reply2->getBucketInfo().size());
+ RequestBucketInfoReply::SP reply2(copyReply(reply));
+ CPPUNIT_ASSERT_EQUAL(size_t(1), reply2->getBucketInfo().size());
auto& entries(reply2->getBucketInfo());
- EXPECT_EQ(e, entries[0]);
+ CPPUNIT_ASSERT_EQUAL(e, entries[0]);
// "Last modified" not counted by operator== for some reason. Testing
// separately until we can figure out if this is by design or not.
- EXPECT_EQ(lastMod, entries[0]._info.getLastModified());
+ CPPUNIT_ASSERT_EQUAL(lastMod, entries[0]._info.getLastModified());
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
}
}
-TEST_P(StorageProtocolTest, notify_bucket_change) {
- auto cmd = std::make_shared<NotifyBucketChangeCommand>(_bucket, _dummy_bucket_info);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
- EXPECT_EQ(_dummy_bucket_info, cmd2->getBucketInfo());
-
- auto reply = std::make_shared<NotifyBucketChangeReply>(*cmd);
- auto reply2 = copyReply(reply);
-}
+void
+StorageProtocolTest::testNotifyBucketChange51()
+{
+ ScopedName test("testNotifyBucketChange51");
+ BucketInfo info(2, 3, 4);
+ document::BucketId modifiedBucketId(20, 1000);
+ document::Bucket modifiedBucket(makeDocumentBucket(modifiedBucketId));
+ NotifyBucketChangeCommand::SP cmd(new NotifyBucketChangeCommand(
+ modifiedBucket, info));
+ NotifyBucketChangeCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(20, 1000),
+ cmd2->getBucketId());
+ CPPUNIT_ASSERT_EQUAL(info, cmd2->getBucketInfo());
+
+ NotifyBucketChangeReply::SP reply(new NotifyBucketChangeReply(*cmd));
+ NotifyBucketChangeReply::SP reply2(copyReply(reply));
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
+}
+
+void
+StorageProtocolTest::testCreateBucket51()
+{
+ ScopedName test("testCreateBucket51");
+ document::BucketId bucketId(623);
+ document::Bucket bucket(makeDocumentBucket(bucketId));
-TEST_P(StorageProtocolTest, create_bucket_without_activation) {
- auto cmd = std::make_shared<CreateBucketCommand>(_bucket);
- EXPECT_FALSE(cmd->getActive());
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
- EXPECT_FALSE(cmd2->getActive());
+ CreateBucketCommand::SP cmd(new CreateBucketCommand(bucket));
+ CreateBucketCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId());
- auto reply = std::make_shared<CreateBucketReply>(*cmd);
- set_dummy_bucket_info_reply_fields(*reply);
- auto reply2 = copyReply(reply);
- EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
-}
+ CreateBucketReply::SP reply(new CreateBucketReply(*cmd));
+ CreateBucketReply::SP reply2(copyReply(reply));
+ CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
-TEST_P(StorageProtocolTest, create_bucket_propagates_activation_flag) {
- auto cmd = std::make_shared<CreateBucketCommand>(_bucket);
- cmd->setActive(true);
- auto cmd2 = copyCommand(cmd);
- EXPECT_TRUE(cmd2->getActive());
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
}
-TEST_P(StorageProtocolTest, delete_bucket) {
- auto cmd = std::make_shared<DeleteBucketCommand>(_bucket);
- cmd->setBucketInfo(_dummy_bucket_info);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
- EXPECT_EQ(_dummy_bucket_info, cmd2->getBucketInfo());
-
- auto reply = std::make_shared<DeleteBucketReply>(*cmd);
+void
+StorageProtocolTest::testDeleteBucket51()
+{
+ ScopedName test("testDeleteBucket51");
+ document::BucketId bucketId(623);
+ document::Bucket bucket(makeDocumentBucket(bucketId));
+
+ DeleteBucketCommand::SP cmd(new DeleteBucketCommand(bucket));
+ BucketInfo info(0x100, 200, 300);
+ cmd->setBucketInfo(info);
+ DeleteBucketCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId());
+ CPPUNIT_ASSERT_EQUAL(info, cmd2->getBucketInfo());
+
+ DeleteBucketReply::SP reply(new DeleteBucketReply(*cmd));
// Not set automatically by constructor
reply->setBucketInfo(cmd2->getBucketInfo());
- auto reply2 = copyReply(reply);
- EXPECT_EQ(_bucket_id, reply2->getBucketId());
- EXPECT_EQ(_dummy_bucket_info, reply2->getBucketInfo());
+ DeleteBucketReply::SP reply2(copyReply(reply));
+ CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
+ CPPUNIT_ASSERT_EQUAL(info, reply2->getBucketInfo());
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
}
-TEST_P(StorageProtocolTest, merge_bucket) {
+void
+StorageProtocolTest::testMergeBucket51()
+{
+ ScopedName test("testMergeBucket51");
+ document::BucketId bucketId(623);
+ document::Bucket bucket(makeDocumentBucket(bucketId));
+
typedef api::MergeBucketCommand::Node Node;
std::vector<Node> nodes;
nodes.push_back(Node(4, false));
@@ -367,98 +522,152 @@ TEST_P(StorageProtocolTest, merge_bucket) {
chain.push_back(7);
chain.push_back(14);
- auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
- EXPECT_EQ(nodes, cmd2->getNodes());
- EXPECT_EQ(Timestamp(1234), cmd2->getMaxTimestamp());
- EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion());
- EXPECT_EQ(chain, cmd2->getChain());
-
- auto reply = std::make_shared<MergeBucketReply>(*cmd);
- auto reply2 = copyReply(reply);
- EXPECT_EQ(_bucket_id, reply2->getBucketId());
- EXPECT_EQ(nodes, reply2->getNodes());
- EXPECT_EQ(Timestamp(1234), reply2->getMaxTimestamp());
- EXPECT_EQ(uint32_t(567), reply2->getClusterStateVersion());
- EXPECT_EQ(chain, reply2->getChain());
-}
-
-TEST_P(StorageProtocolTest, split_bucket) {
- auto cmd = std::make_shared<SplitBucketCommand>(_bucket);
- EXPECT_EQ(0u, cmd->getMinSplitBits());
- EXPECT_EQ(58u, cmd->getMaxSplitBits());
- EXPECT_EQ(std::numeric_limits<uint32_t>().max(), cmd->getMinByteSize());
- EXPECT_EQ(std::numeric_limits<uint32_t>().max(), cmd->getMinDocCount());
+ MergeBucketCommand::SP cmd(
+ new MergeBucketCommand(bucket, nodes, Timestamp(1234), 567, chain));
+ MergeBucketCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId());
+ CPPUNIT_ASSERT_EQUAL(nodes, cmd2->getNodes());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(1234), cmd2->getMaxTimestamp());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(567), cmd2->getClusterStateVersion());
+ CPPUNIT_ASSERT_EQUAL(chain, cmd2->getChain());
+
+ MergeBucketReply::SP reply(new MergeBucketReply(*cmd));
+ MergeBucketReply::SP reply2(copyReply(reply));
+ CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
+ CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(1234), reply2->getMaxTimestamp());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(567), reply2->getClusterStateVersion());
+ CPPUNIT_ASSERT_EQUAL(chain, reply2->getChain());
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
+}
+
+void
+StorageProtocolTest::testSplitBucket51()
+{
+ ScopedName test("testSplitBucket51");
+
+ document::BucketId bucketId(16, 0);
+ document::Bucket bucket(makeDocumentBucket(bucketId));
+ SplitBucketCommand::SP cmd(new SplitBucketCommand(bucket));
+ CPPUNIT_ASSERT_EQUAL(0u, (uint32_t) cmd->getMinSplitBits());
+ CPPUNIT_ASSERT_EQUAL(58u, (uint32_t) cmd->getMaxSplitBits());
+ CPPUNIT_ASSERT_EQUAL(std::numeric_limits<uint32_t>().max(),
+ cmd->getMinByteSize());
+ CPPUNIT_ASSERT_EQUAL(std::numeric_limits<uint32_t>().max(),
+ cmd->getMinDocCount());
cmd->setMinByteSize(1000);
cmd->setMinDocCount(5);
cmd->setMaxSplitBits(40);
cmd->setMinSplitBits(20);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
- EXPECT_EQ(20u, cmd2->getMinSplitBits());
- EXPECT_EQ(40u, cmd2->getMaxSplitBits());
- EXPECT_EQ(1000u, cmd2->getMinByteSize());
- EXPECT_EQ(5u, cmd2->getMinDocCount());
-
- auto reply = std::make_shared<SplitBucketReply>(*cmd2);
- reply->getSplitInfo().emplace_back(document::BucketId(17, 0), BucketInfo(100, 1000, 10000, true, true));
- reply->getSplitInfo().emplace_back(document::BucketId(17, 1), BucketInfo(101, 1001, 10001, true, true));
- auto reply2 = copyReply(reply);
-
- EXPECT_EQ(_bucket, reply2->getBucket());
- EXPECT_EQ(size_t(2), reply2->getSplitInfo().size());
- EXPECT_EQ(document::BucketId(17, 0), reply2->getSplitInfo()[0].first);
- EXPECT_EQ(document::BucketId(17, 1), reply2->getSplitInfo()[1].first);
- EXPECT_EQ(BucketInfo(100, 1000, 10000, true, true), reply2->getSplitInfo()[0].second);
- EXPECT_EQ(BucketInfo(101, 1001, 10001, true, true), reply2->getSplitInfo()[1].second);
-}
-
-TEST_P(StorageProtocolTest, join_buckets) {
+ SplitBucketCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(20u, (uint32_t) cmd2->getMinSplitBits());
+ CPPUNIT_ASSERT_EQUAL(40u, (uint32_t) cmd2->getMaxSplitBits());
+ CPPUNIT_ASSERT_EQUAL(1000u, cmd2->getMinByteSize());
+ CPPUNIT_ASSERT_EQUAL(5u, cmd2->getMinDocCount());
+
+ SplitBucketReply::SP reply(new SplitBucketReply(*cmd2));
+ reply->getSplitInfo().push_back(SplitBucketReply::Entry(
+ document::BucketId(17, 0), BucketInfo(100, 1000, 10000, true, true)));
+ reply->getSplitInfo().push_back(SplitBucketReply::Entry(
+ document::BucketId(17, 1), BucketInfo(101, 1001, 10001, true, true)));
+ SplitBucketReply::SP reply2(copyReply(reply));
+
+ CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
+ CPPUNIT_ASSERT_EQUAL(size_t(2), reply2->getSplitInfo().size());
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(17, 0),
+ reply2->getSplitInfo()[0].first);
+ CPPUNIT_ASSERT_EQUAL(document::BucketId(17, 1),
+ reply2->getSplitInfo()[1].first);
+ CPPUNIT_ASSERT_EQUAL(BucketInfo(100, 1000, 10000, true, true),
+ reply2->getSplitInfo()[0].second);
+ CPPUNIT_ASSERT_EQUAL(BucketInfo(101, 1001, 10001, true, true),
+ reply2->getSplitInfo()[1].second);
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
+}
+
+void
+StorageProtocolTest::testJoinBuckets51()
+{
+ ScopedName test("testJoinBuckets51");
+ document::BucketId bucketId(16, 0);
+ document::Bucket bucket(makeDocumentBucket(bucketId));
std::vector<document::BucketId> sources;
sources.push_back(document::BucketId(17, 0));
sources.push_back(document::BucketId(17, 1));
- auto cmd = std::make_shared<JoinBucketsCommand>(_bucket);
+ JoinBucketsCommand::SP cmd(new JoinBucketsCommand(bucket));
cmd->getSourceBuckets() = sources;
cmd->setMinJoinBits(3);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
+ JoinBucketsCommand::SP cmd2(copyCommand(cmd, _version5_1));
- auto reply = std::make_shared<JoinBucketsReply>(*cmd2);
+ JoinBucketsReply::SP reply(new JoinBucketsReply(*cmd2));
reply->setBucketInfo(BucketInfo(3,4,5));
- auto reply2 = copyReply(reply);
+ JoinBucketsReply::SP reply2(copyReply(reply));
- EXPECT_EQ(sources, reply2->getSourceBuckets());
- EXPECT_EQ(3, cmd2->getMinJoinBits());
- EXPECT_EQ(BucketInfo(3,4,5), reply2->getBucketInfo());
- EXPECT_EQ(_bucket, reply2->getBucket());
+ CPPUNIT_ASSERT_EQUAL(sources, reply2->getSourceBuckets());
+ CPPUNIT_ASSERT_EQUAL(3, (int)cmd2->getMinJoinBits());
+ CPPUNIT_ASSERT_EQUAL(BucketInfo(3,4,5), reply2->getBucketInfo());
+ CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
}
-TEST_P(StorageProtocolTest, destroy_visitor) {
- auto cmd = std::make_shared<DestroyVisitorCommand>("instance");
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ("instance", cmd2->getInstanceId());
+void
+StorageProtocolTest::testDestroyVisitor51()
+{
+ ScopedName test("testDestroyVisitor51");
- auto reply = std::make_shared<DestroyVisitorReply>(*cmd2);
- auto reply2 = copyReply(reply);
+ DestroyVisitorCommand::SP cmd(
+ new DestroyVisitorCommand("instance"));
+ DestroyVisitorCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(string("instance"), cmd2->getInstanceId());
+
+ DestroyVisitorReply::SP reply(new DestroyVisitorReply(*cmd2));
+ DestroyVisitorReply::SP reply2(copyReply(reply));
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
}
-TEST_P(StorageProtocolTest, remove_location) {
- auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection());
- EXPECT_EQ(_bucket, cmd2->getBucket());
+void
+StorageProtocolTest::testRemoveLocation51()
+{
+ ScopedName test("testRemoveLocation51");
+ document::BucketId bucketId(16, 1234);
+ document::Bucket bucket(makeDocumentBucket(bucketId));
+
+ RemoveLocationCommand::SP cmd(
+ new RemoveLocationCommand("id.group == \"mygroup\"", bucket));
+ RemoveLocationCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("id.group == \"mygroup\""), cmd2->getDocumentSelection());
+ CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId());
+
+ RemoveLocationReply::SP reply(new RemoveLocationReply(*cmd2));
+ RemoveLocationReply::SP reply2(copyReply(reply));
- auto reply = std::make_shared<RemoveLocationReply>(*cmd2);
- auto reply2 = copyReply(reply);
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
}
-TEST_P(StorageProtocolTest, create_visitor) {
+void
+StorageProtocolTest::testCreateVisitor51()
+{
+ ScopedName test("testCreateVisitor51");
+
std::vector<document::BucketId> buckets;
buckets.push_back(document::BucketId(16, 1));
buckets.push_back(document::BucketId(16, 2));
- auto cmd = std::make_shared<CreateVisitorCommand>(makeBucketSpace(), "library", "id", "doc selection");
+ CreateVisitorCommand::SP cmd(
+ new CreateVisitorCommand(makeBucketSpace(), "library", "id", "doc selection"));
cmd->setControlDestination("controldest");
cmd->setDataDestination("datadest");
cmd->setVisitorCmdId(1);
@@ -472,26 +681,40 @@ TEST_P(StorageProtocolTest, create_visitor) {
cmd->setFieldSet("foo,bar,vekterli");
cmd->setVisitInconsistentBuckets();
cmd->setQueueTimeout(100);
+ cmd->setVisitorOrdering(document::OrderingSpecification::DESCENDING);
cmd->setPriority(149);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ("library", cmd2->getLibraryName());
- EXPECT_EQ("id", cmd2->getInstanceId());
- EXPECT_EQ("doc selection", cmd2->getDocumentSelection());
- EXPECT_EQ("controldest", cmd2->getControlDestination());
- EXPECT_EQ("datadest", cmd2->getDataDestination());
- EXPECT_EQ(api::Timestamp(123), cmd2->getFromTime());
- EXPECT_EQ(api::Timestamp(456), cmd2->getToTime());
- EXPECT_EQ(2u, cmd2->getMaximumPendingReplyCount());
- EXPECT_EQ(buckets, cmd2->getBuckets());
- EXPECT_EQ("foo,bar,vekterli", cmd2->getFieldSet());
- EXPECT_TRUE(cmd2->visitInconsistentBuckets());
- EXPECT_EQ(149, cmd2->getPriority());
-
- auto reply = std::make_shared<CreateVisitorReply>(*cmd2);
- auto reply2 = copyReply(reply);
-}
-
-TEST_P(StorageProtocolTest, get_bucket_diff) {
+ CreateVisitorCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ CPPUNIT_ASSERT_EQUAL(string("library"), cmd2->getLibraryName());
+ CPPUNIT_ASSERT_EQUAL(string("id"), cmd2->getInstanceId());
+ CPPUNIT_ASSERT_EQUAL(string("doc selection"),
+ cmd2->getDocumentSelection());
+ CPPUNIT_ASSERT_EQUAL(string("controldest"),
+ cmd2->getControlDestination());
+ CPPUNIT_ASSERT_EQUAL(string("datadest"), cmd2->getDataDestination());
+ CPPUNIT_ASSERT_EQUAL(api::Timestamp(123), cmd2->getFromTime());
+ CPPUNIT_ASSERT_EQUAL(api::Timestamp(456), cmd2->getToTime());
+ CPPUNIT_ASSERT_EQUAL(2u, cmd2->getMaximumPendingReplyCount());
+ CPPUNIT_ASSERT_EQUAL(buckets, cmd2->getBuckets());
+ CPPUNIT_ASSERT_EQUAL(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet());
+ CPPUNIT_ASSERT(cmd2->visitInconsistentBuckets());
+ CPPUNIT_ASSERT_EQUAL(document::OrderingSpecification::DESCENDING, cmd2->getVisitorOrdering());
+ CPPUNIT_ASSERT_EQUAL(149, (int)cmd2->getPriority());
+
+ CreateVisitorReply::SP reply(new CreateVisitorReply(*cmd2));
+ CreateVisitorReply::SP reply2(copyReply(reply));
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
+}
+
+void
+StorageProtocolTest::testGetBucketDiff51()
+{
+ ScopedName test("testGetBucketDiff51");
+ document::BucketId bucketId(623);
+ document::Bucket bucket(makeDocumentBucket(bucketId));
+
std::vector<api::MergeBucketCommand::Node> nodes;
nodes.push_back(4);
nodes.push_back(13);
@@ -504,68 +727,56 @@ TEST_P(StorageProtocolTest, get_bucket_diff) {
entries.back()._flags = 1;
entries.back()._hasMask = 3;
- EXPECT_EQ("Entry(timestamp: 123456, gid(0x313233343536373839306162), hasMask: 0x3,\n"
- " header size: 100, body size: 65536, flags 0x1)",
- entries.back().toString(true));
+ CPPUNIT_ASSERT_EQUAL(std::string(
+ "Entry(timestamp: 123456, gid(0x313233343536373839306162), "
+ "hasMask: 0x3,\n"
+ " header size: 100, body size: 65536, flags 0x1)"),
+ entries.back().toString(true));
- auto cmd = std::make_shared<GetBucketDiffCommand>(_bucket, nodes, 1056);
+ GetBucketDiffCommand::SP cmd(new GetBucketDiffCommand(bucket, nodes, 1056));
cmd->getDiff() = entries;
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
-
- auto reply = std::make_shared<GetBucketDiffReply>(*cmd2);
- EXPECT_EQ(entries, reply->getDiff());
- auto reply2 = copyReply(reply);
+ GetBucketDiffCommand::SP cmd2(copyCommand(cmd, _version5_1));
- EXPECT_EQ(nodes, reply2->getNodes());
- EXPECT_EQ(entries, reply2->getDiff());
- EXPECT_EQ(Timestamp(1056), reply2->getMaxTimestamp());
-}
-
-namespace {
-
-ApplyBucketDiffCommand::Entry dummy_apply_entry() {
- ApplyBucketDiffCommand::Entry e;
- e._docName = "my cool id";
- vespalib::string header_data = "fancy header";
- e._headerBlob.resize(header_data.size());
- memcpy(&e._headerBlob[0], header_data.data(), header_data.size());
+ GetBucketDiffReply::SP reply(new GetBucketDiffReply(*cmd2));
+ CPPUNIT_ASSERT_EQUAL(entries, reply->getDiff());
+ GetBucketDiffReply::SP reply2(copyReply(reply));
- vespalib::string body_data = "fancier body!";
- e._bodyBlob.resize(body_data.size());
- memcpy(&e._bodyBlob[0], body_data.data(), body_data.size());
+ CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes());
+ CPPUNIT_ASSERT_EQUAL(entries, reply2->getDiff());
+ CPPUNIT_ASSERT_EQUAL(Timestamp(1056), reply2->getMaxTimestamp());
- GetBucketDiffCommand::Entry meta;
- meta._timestamp = 567890;
- meta._hasMask = 0x3;
- meta._flags = 0x1;
- meta._headerSize = 12345;
- meta._headerSize = header_data.size();
- meta._bodySize = body_data.size();
-
- e._entry = meta;
- return e;
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
}
-}
+void
+StorageProtocolTest::testApplyBucketDiff51()
+{
+ ScopedName test("testApplyBucketDiff51");
+ document::BucketId bucketId(16, 623);
+ document::Bucket bucket(makeDocumentBucket(bucketId));
-TEST_P(StorageProtocolTest, apply_bucket_diff) {
std::vector<api::MergeBucketCommand::Node> nodes;
nodes.push_back(4);
nodes.push_back(13);
- std::vector<ApplyBucketDiffCommand::Entry> entries = {dummy_apply_entry()};
+ std::vector<ApplyBucketDiffCommand::Entry> entries;
+ entries.push_back(ApplyBucketDiffCommand::Entry());
- auto cmd = std::make_shared<ApplyBucketDiffCommand>(_bucket, nodes, 1234);
+ ApplyBucketDiffCommand::SP cmd(new ApplyBucketDiffCommand(bucket, nodes, 1234));
cmd->getDiff() = entries;
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
+ ApplyBucketDiffCommand::SP cmd2(copyCommand(cmd, _version5_1));
- auto reply = std::make_shared<ApplyBucketDiffReply>(*cmd2);
- auto reply2 = copyReply(reply);
+ ApplyBucketDiffReply::SP reply(new ApplyBucketDiffReply(*cmd2));
+ ApplyBucketDiffReply::SP reply2(copyReply(reply));
- EXPECT_EQ(nodes, reply2->getNodes());
- EXPECT_EQ(entries, reply2->getDiff());
- EXPECT_EQ(1234u, reply2->getMaxBufferSize());
+ CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes());
+ CPPUNIT_ASSERT_EQUAL(entries, reply2->getDiff());
+ CPPUNIT_ASSERT_EQUAL(1234u, reply2->getMaxBufferSize());
+
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
+ recordSerialization50();
}
namespace {
@@ -596,97 +807,161 @@ namespace {
};
api::StorageReply::UP MyCommand::makeReply() {
- return std::make_unique<MyReply>(*this);
+ return api::StorageReply::UP(new MyReply(*this));
}
}
-TEST_P(StorageProtocolTest, internal_message) {
+void
+StorageProtocolTest::testInternalMessage()
+{
+ ScopedName test("testInternal51");
MyCommand cmd;
MyReply reply(cmd);
- // TODO what's this even intended to test?
+
+ recordOutput(cmd);
+ recordOutput(reply);
}
-TEST_P(StorageProtocolTest, set_bucket_state_with_inactive_state) {
- auto cmd = std::make_shared<SetBucketStateCommand>(_bucket, SetBucketStateCommand::INACTIVE);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(_bucket, cmd2->getBucket());
+void
+StorageProtocolTest::testSetBucketState51()
+{
+ ScopedName test("testSetBucketState51");
+ document::BucketId bucketId(16, 0);
+ document::Bucket bucket(makeDocumentBucket(bucketId));
+ SetBucketStateCommand::SP cmd(
+ new SetBucketStateCommand(bucket, SetBucketStateCommand::ACTIVE));
+ SetBucketStateCommand::SP cmd2(copyCommand(cmd, _version5_1));
- auto reply = std::make_shared<SetBucketStateReply>(*cmd2);
- auto reply2 = copyReply(reply);
+ SetBucketStateReply::SP reply(new SetBucketStateReply(*cmd2));
+ SetBucketStateReply::SP reply2(copyReply(reply));
- EXPECT_EQ(SetBucketStateCommand::INACTIVE, cmd2->getState());
- EXPECT_EQ(_bucket, reply2->getBucket());
-}
+ CPPUNIT_ASSERT_EQUAL(SetBucketStateCommand::ACTIVE, cmd2->getState());
+ CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId());
+ CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
-TEST_P(StorageProtocolTest, set_bucket_state_with_active_state) {
- auto cmd = std::make_shared<SetBucketStateCommand>(_bucket, SetBucketStateCommand::ACTIVE);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(SetBucketStateCommand::ACTIVE, cmd2->getState());
+ recordOutput(*cmd2);
+ recordOutput(*reply2);
}
-TEST_P(StorageProtocolTest, put_command_with_condition) {
- auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14);
+void
+StorageProtocolTest::testPutCommand52()
+{
+ ScopedName test("testPutCommand52");
+
+ PutCommand::SP cmd(new PutCommand(_bucket, _testDoc, 14));
cmd->setCondition(TestAndSetCondition(CONDITION_STRING));
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
+ PutCommand::SP cmd2(copyCommand(cmd, _version5_2));
+ CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
}
-TEST_P(StorageProtocolTest, update_command_with_condition) {
- auto update = std::make_shared<document::DocumentUpdate>(
- _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId());
- auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14);
+void
+StorageProtocolTest::testUpdateCommand52()
+{
+ ScopedName test("testUpdateCommand52");
+
+ document::DocumentUpdate::SP update(new document::DocumentUpdate(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()));
+ UpdateCommand::SP cmd(new UpdateCommand(_bucket, update, 14));
cmd->setCondition(TestAndSetCondition(CONDITION_STRING));
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
+ UpdateCommand::SP cmd2(copyCommand(cmd, _version5_2));
+ CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
}
-TEST_P(StorageProtocolTest, remove_command_with_condition) {
- auto cmd = std::make_shared<RemoveCommand>(_bucket, _testDocId, 159);
+void
+StorageProtocolTest::testRemoveCommand52()
+{
+ ScopedName test("testRemoveCommand52");
+
+ RemoveCommand::SP cmd(new RemoveCommand(_bucket, _testDocId, 159));
cmd->setCondition(TestAndSetCondition(CONDITION_STRING));
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
+ RemoveCommand::SP cmd2(copyCommand(cmd, _version5_2));
+ CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
}
-TEST_P(StorageProtocolTest, put_command_with_bucket_space) {
- document::Bucket bucket(document::BucketSpace(5), _bucket_id);
+void
+StorageProtocolTest::testPutCommandWithBucketSpace6_0()
+{
+ ScopedName test("testPutCommandWithBucketSpace6_0");
+
+ document::Bucket bucket(document::BucketSpace(5), _bucket.getBucketId());
auto cmd = std::make_shared<PutCommand>(bucket, _testDoc, 14);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(bucket, cmd2->getBucket());
+ auto cmd2 = copyCommand(cmd, _version6_0);
+ CPPUNIT_ASSERT_EQUAL(bucket, cmd2->getBucket());
}
-TEST_P(StorageProtocolTest, create_visitor_with_bucket_space) {
+void
+StorageProtocolTest::testCreateVisitorWithBucketSpace6_0()
+{
+ ScopedName test("testCreateVisitorWithBucketSpace6_0");
+
document::BucketSpace bucketSpace(5);
auto cmd = std::make_shared<CreateVisitorCommand>(bucketSpace, "library", "id", "doc selection");
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(bucketSpace, cmd2->getBucketSpace());
+ auto cmd2 = copyCommand(cmd, _version6_0);
+ CPPUNIT_ASSERT_EQUAL(bucketSpace, cmd2->getBucketSpace());
}
-TEST_P(StorageProtocolTest, request_bucket_info_with_bucket_space) {
+void
+StorageProtocolTest::testRequestBucketInfoWithBucketSpace6_0()
+{
+ ScopedName test("testRequestBucketInfoWithBucketSpace6_0");
+
document::BucketSpace bucketSpace(5);
std::vector<document::BucketId> ids = {document::BucketId(3)};
auto cmd = std::make_shared<RequestBucketInfoCommand>(bucketSpace, ids);
- auto cmd2 = copyCommand(cmd);
- EXPECT_EQ(bucketSpace, cmd2->getBucketSpace());
- EXPECT_EQ(ids, cmd2->getBuckets());
+ auto cmd2 = copyCommand(cmd, _version6_0);
+ CPPUNIT_ASSERT_EQUAL(bucketSpace, cmd2->getBucketSpace());
+ CPPUNIT_ASSERT_EQUAL(ids, cmd2->getBuckets());
+}
+
+void
+StorageProtocolTest::serialized_size_is_used_to_set_approx_size_of_storage_message()
+{
+ ScopedName test("serialized_size_is_used_to_set_approx_size_of_storage_message");
+
+ PutCommand::SP cmd(new PutCommand(_bucket, _testDoc, 14));
+ CPPUNIT_ASSERT_EQUAL(50u, cmd->getApproxByteSize());
+
+ PutCommand::SP cmd2(copyCommand(cmd, _version6_0));
+ CPPUNIT_ASSERT_EQUAL(181u, cmd2->getApproxByteSize());
}
-TEST_P(StorageProtocolTest, serialized_size_is_used_to_set_approx_size_of_storage_message) {
- auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14);
- EXPECT_EQ(50u, cmd->getApproxByteSize());
+void
+StorageProtocolTest::testStringOutputs()
+{
+ std::cerr << "\nNon verbose output:\n";
+ for (uint32_t i=0, n=_nonVerboseMessageStrings.size(); i<n; ++i) {
+ std::cerr << _nonVerboseMessageStrings[i] << "\n";
+ }
+ std::cerr << "\nVerbose output:\n";
+ for (uint32_t i=0, n=_verboseMessageStrings.size(); i<n; ++i) {
+ std::cerr << _verboseMessageStrings[i] << "\n";
+ }
+}
- auto cmd2 = copyCommand(cmd);
- auto version = GetParam();
- if (version.getMajor() == 7) { // Protobuf-based encoding
- EXPECT_EQ(158u, cmd2->getApproxByteSize());
- } else { // Legacy encoding
- EXPECT_EQ(181u, cmd2->getApproxByteSize());
+void
+StorageProtocolTest::testWriteSerialization50()
+{
+ std::ofstream of("mbusprot/mbusprot.5.0.serialization.5.1");
+ of << std::hex << std::setfill('0');
+ for (uint32_t i=0, n=_serialization50.size(); i<n; ++i) {
+ char c = _serialization50[i];
+ if (c > 126 || (c < 32 && c != 10)) {
+ int32_t num = static_cast<int32_t>(c);
+ if (num < 0) num += 256;
+ of << '\\' << std::setw(2) << num;
+ } else if (c == '\\') {
+ of << "\\\\";
+ } else {
+ of << c;
+ }
}
+ of.close();
}
-} // storage::api
+} // mbusprot
+} // storage
diff --git a/storageapi/src/vespa/storageapi/CMakeLists.txt b/storageapi/src/vespa/storageapi/CMakeLists.txt
index 90eb6dd9eca..c08dcbc2419 100644
--- a/storageapi/src/vespa/storageapi/CMakeLists.txt
+++ b/storageapi/src/vespa/storageapi/CMakeLists.txt
@@ -1,5 +1,4 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
vespa_add_library(storageapi
SOURCES
$<TARGET_OBJECTS:storageapi_message>
@@ -9,6 +8,3 @@ vespa_add_library(storageapi
INSTALL lib64
DEPENDS
)
-
-vespa_add_target_package_dependency(storageapi Protobuf)
-
diff --git a/storageapi/src/vespa/storageapi/mbusprot/.gitignore b/storageapi/src/vespa/storageapi/mbusprot/.gitignore
index 8e91fe9cab0..526f91c6668 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/.gitignore
+++ b/storageapi/src/vespa/storageapi/mbusprot/.gitignore
@@ -5,6 +5,3 @@
.deps
.libs
Makefile
-*.pb.h
-*.pb.cc
-
diff --git a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
index dc4e3897e49..d5952d7cb91 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
+++ b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
@@ -1,19 +1,4 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-find_package(Protobuf REQUIRED)
-PROTOBUF_GENERATE_CPP(storageapi_PROTOBUF_SRCS storageapi_PROTOBUF_HDRS
- protobuf/common.proto
- protobuf/feed.proto
- protobuf/visiting.proto
- protobuf/maintenance.proto)
-
-# protoc-generated files emit compiler warnings that we normally treat as errors.
-# Instead of rolling our own compiler plugin we'll pragmatically disable the noise.
-set_source_files_properties(${storageapi_PROTOBUF_SRCS} PROPERTIES COMPILE_FLAGS "-Wno-suggest-override -Wno-inline")
-# protoc explicitly annotates methods with inline, which triggers -Werror=inline when
-# the header file grows over a certain size.
-set_source_files_properties(protocolserialization7.cpp PROPERTIES COMPILE_FLAGS "-Wno-inline")
-
vespa_add_library(storageapi_mbusprot OBJECT
SOURCES
storagemessage.cpp
@@ -26,7 +11,5 @@ vespa_add_library(storageapi_mbusprot OBJECT
protocolserialization5_1.cpp
protocolserialization5_2.cpp
protocolserialization6_0.cpp
- protocolserialization7.cpp
- ${storageapi_PROTOBUF_SRCS}
DEPENDS
)
diff --git a/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h
deleted file mode 100644
index ef4a6b28749..00000000000
--- a/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h
+++ /dev/null
@@ -1,31 +0,0 @@
-// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "protocolserialization.h"
-
-namespace storage::mbusprot {
-
-/*
- * Utility base class for pre-v7 (protobuf) serialization implementations.
- *
- * TODO remove on Vespa 8 alongside legacy serialization formats.
- */
-class LegacyProtocolSerialization : public ProtocolSerialization {
- const std::shared_ptr<const document::DocumentTypeRepo> _repo;
-public:
- explicit LegacyProtocolSerialization(const std::shared_ptr<const document::DocumentTypeRepo>& repo)
- : _repo(repo)
- {}
-
- const document::DocumentTypeRepo& getTypeRepo() const { return *_repo; }
- const std::shared_ptr<const document::DocumentTypeRepo> getTypeRepoSp() const { return _repo; }
-
- virtual document::Bucket getBucket(document::ByteBuffer& buf) const = 0;
- virtual void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const = 0;
- virtual document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const = 0;
- virtual void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const = 0;
- virtual api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const = 0;
- virtual void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const = 0;
-};
-
-} // storage::mbusprot
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto
deleted file mode 100644
index d641449995d..00000000000
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-syntax = "proto3";
-
-option cc_enable_arenas = true;
-
-package storage.mbusprot.protobuf;
-
-// Note: we use a *Request/*Response naming convention rather than *Command/*Reply,
-// as the former is the gRPC convention and that's where we intend to move.
-
-message BucketSpace {
- uint64 space_id = 1;
-}
-
-message BucketId {
- fixed64 raw_id = 1;
-}
-
-message Bucket {
- uint64 space_id = 1;
- fixed64 raw_bucket_id = 2;
-}
-
-// Next tag to use: 3
-message BucketInfo {
- uint64 last_modified_timestamp = 1;
- fixed32 legacy_checksum = 2;
- // TODO v2 checksum
- uint32 doc_count = 3;
- uint32 total_doc_size = 4;
- uint32 meta_count = 5;
- uint32 used_file_size = 6;
- bool ready = 7;
- bool active = 8;
-}
-
-message GlobalId {
- // 96 bits of GID data in _little_ endian. High entropy, so fixed encoding is better than varint.
- // Low 64 bits as if memcpy()ed from bytes [0, 8) of the GID buffer
- fixed64 lo_64 = 1;
- // High 32 bits as if memcpy()ed from bytes [8, 12) of the GID buffer
- fixed32 hi_32 = 2;
-}
-
-// TODO these should ideally be gRPC headers..
-message RequestHeader {
- uint64 message_id = 1;
- uint32 priority = 2; // Always in range [0, 255]
- uint32 source_index = 3; // Always in range [0, 65535]
- fixed32 loadtype_id = 4; // It's a hash with high entropy, so fixed encoding is better than varint
-}
-
-// TODO these should ideally be gRPC headers..
-message ResponseHeader {
- // TODO this should ideally be gRPC Status...
- uint32 return_code_id = 1;
- bytes return_code_message = 2; // FIXME it's `bytes` since `string` will check for UTF-8... might not hold...
- uint64 message_id = 3;
- uint32 priority = 4; // Always in range [0, 255]
-}
-
-message Document {
- bytes payload = 1;
-}
-
-message DocumentId {
- bytes id = 1;
-}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto
deleted file mode 100644
index 58da24df836..00000000000
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto
+++ /dev/null
@@ -1,91 +0,0 @@
-// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-syntax = "proto3";
-
-option cc_enable_arenas = true;
-
-package storage.mbusprot.protobuf;
-
-import "common.proto";
-
-message TestAndSetCondition {
- bytes selection = 1;
-}
-
-message PutRequest {
- Bucket bucket = 1;
- Document document = 2;
- uint64 new_timestamp = 3;
- uint64 expected_old_timestamp = 4; // If zero; no expectation.
- TestAndSetCondition condition = 5;
-}
-
-message PutResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
- bool was_found = 3;
-}
-
-message Update {
- bytes payload = 1;
-}
-
-message UpdateRequest {
- Bucket bucket = 1;
- Update update = 2;
- uint64 new_timestamp = 3;
- uint64 expected_old_timestamp = 4; // If zero; no expectation.
- TestAndSetCondition condition = 5;
-}
-
-message UpdateResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
- uint64 updated_timestamp = 3;
-}
-
-message RemoveRequest {
- Bucket bucket = 1;
- bytes document_id = 2;
- uint64 new_timestamp = 3;
- TestAndSetCondition condition = 4;
-}
-
-message RemoveResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
- uint64 removed_timestamp = 3;
-}
-
-message GetRequest {
- Bucket bucket = 1;
- bytes document_id = 2;
- bytes field_set = 3;
- uint64 before_timestamp = 4;
-}
-
-message GetResponse {
- Document document = 1;
- uint64 last_modified_timestamp = 2;
- BucketInfo bucket_info = 3;
- BucketId remapped_bucket_id = 4;
-}
-
-message RevertRequest {
- Bucket bucket = 1;
- repeated uint64 revert_tokens = 2;
-}
-
-message RevertResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
-}
-
-message RemoveLocationRequest {
- Bucket bucket = 1;
- bytes document_selection = 2;
-}
-
-message RemoveLocationResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
-}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
deleted file mode 100644
index c4766d2900a..00000000000
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
+++ /dev/null
@@ -1,160 +0,0 @@
-// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-syntax = "proto3";
-
-option cc_enable_arenas = true;
-
-package storage.mbusprot.protobuf;
-
-import "common.proto";
-
-message DeleteBucketRequest {
- Bucket bucket = 1;
- BucketInfo expected_bucket_info = 2;
-}
-
-message DeleteBucketResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
-}
-
-message CreateBucketRequest {
- Bucket bucket = 1;
- bool create_as_active = 2;
-}
-
-message CreateBucketResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
-}
-
-message MergeNode {
- uint32 index = 1;
- bool source_only = 2;
-}
-
-message MergeBucketRequest {
- Bucket bucket = 1;
- uint32 cluster_state_version = 2;
- uint64 max_timestamp = 3;
- repeated MergeNode nodes = 4;
- repeated uint32 node_chain = 5;
-}
-
-message MergeBucketResponse {
- BucketId remapped_bucket_id = 1;
-}
-
-message MetaDiffEntry {
- uint64 timestamp = 1;
- GlobalId gid = 2;
- uint32 header_size = 3;
- uint32 body_size = 4;
- uint32 flags = 5;
- uint32 presence_mask = 6;
-}
-
-message GetBucketDiffRequest {
- Bucket bucket = 1;
- uint64 max_timestamp = 2;
- repeated MergeNode nodes = 3;
- repeated MetaDiffEntry diff = 4;
-}
-
-message GetBucketDiffResponse {
- BucketId remapped_bucket_id = 1;
- repeated MetaDiffEntry diff = 2;
-}
-
-message ApplyDiffEntry {
- MetaDiffEntry entry_meta = 1;
- bytes document_id = 2;
- bytes header_blob = 3;
- bytes body_blob = 4;
-}
-
-message ApplyBucketDiffRequest {
- Bucket bucket = 1;
- repeated MergeNode nodes = 2;
- uint32 max_buffer_size = 3;
- repeated ApplyDiffEntry entries = 4;
-}
-
-message ApplyBucketDiffResponse {
- BucketId remapped_bucket_id = 1;
- repeated ApplyDiffEntry entries = 4;
-}
-
-message ExplicitBucketSet {
- // `Bucket` is not needed, as the space is inferred from the owning message.
- repeated BucketId bucket_ids = 2;
-}
-
-message AllBuckets {
- uint32 distributor_index = 1;
- bytes cluster_state = 2;
- bytes distribution_hash = 3;
-}
-
-message RequestBucketInfoRequest {
- BucketSpace bucket_space = 1;
- oneof request_for {
- ExplicitBucketSet explicit_bucket_set = 2;
- AllBuckets all_buckets = 3;
- }
-}
-
-message BucketAndBucketInfo {
- fixed64 raw_bucket_id = 1;
- BucketInfo bucket_info = 2;
-}
-
-message RequestBucketInfoResponse {
- repeated BucketAndBucketInfo bucket_infos = 1;
-}
-
-message NotifyBucketChangeRequest {
- Bucket bucket = 1;
- BucketInfo bucket_info = 2;
-}
-
-message NotifyBucketChangeResponse {
- // Currently empty
-}
-
-message SplitBucketRequest {
- Bucket bucket = 1;
- uint32 min_split_bits = 2;
- uint32 max_split_bits = 3;
- uint32 min_byte_size = 4;
- uint32 min_doc_count = 5;
-}
-
-message SplitBucketResponse {
- BucketId remapped_bucket_id = 1;
- repeated BucketAndBucketInfo split_info = 2;
-}
-
-message JoinBucketsRequest {
- Bucket bucket = 1;
- repeated BucketId source_buckets = 2;
- uint32 min_join_bits = 3;
-}
-
-message JoinBucketsResponse {
- BucketInfo bucket_info = 1;
- BucketId remapped_bucket_id = 2;
-}
-
-message SetBucketStateRequest {
- enum BucketState {
- Inactive = 0;
- Active = 1;
- }
-
- Bucket bucket = 1;
- BucketState state = 2;
-}
-
-message SetBucketStateResponse {
- BucketId remapped_bucket_id = 1;
-}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto
deleted file mode 100644
index 89ce39e52a0..00000000000
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto
+++ /dev/null
@@ -1,66 +0,0 @@
-// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-syntax = "proto3";
-
-option cc_enable_arenas = true;
-
-package storage.mbusprot.protobuf;
-
-import "common.proto";
-
-message ClientVisitorParameter {
- bytes key = 1;
- bytes value = 2;
-}
-
-message VisitorConstraints {
- bytes document_selection = 1;
- uint64 from_time_usec = 2;
- uint64 to_time_usec = 3;
- bool visit_removes = 4;
- bytes field_set = 5;
- bool visit_inconsistent_buckets = 6;
-}
-
-message VisitorControlMeta {
- bytes instance_id = 1;
- bytes library_name = 2;
- uint32 visitor_command_id = 3;
- bytes control_destination = 4;
- bytes data_destination = 5;
-
- // TODO move?
- uint32 max_pending_reply_count = 6;
- uint32 queue_timeout = 7;
- uint32 max_buckets_per_visitor = 8;
-}
-
-message CreateVisitorRequest {
- BucketSpace bucket_space = 1;
- repeated BucketId buckets = 2;
-
- VisitorConstraints constraints = 3;
- VisitorControlMeta control_meta = 4;
- repeated ClientVisitorParameter client_parameters = 5;
-}
-
-message VisitorStatistics {
- uint32 buckets_visited = 1;
- uint64 documents_visited = 2;
- uint64 bytes_visited = 3;
- uint64 documents_returned = 4;
- uint64 bytes_returned = 5;
- uint64 second_pass_documents_returned = 6; // TODO don't include? orderdoc only
- uint64 second_pass_bytes_returned = 7; // TODO don't include? orderdoc only
-}
-
-message CreateVisitorResponse {
- VisitorStatistics visitor_statistics = 1;
-}
-
-message DestroyVisitorRequest {
- bytes instance_id = 1;
-}
-
-message DestroyVisitorResponse {
- // Currently empty
-}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h b/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h
deleted file mode 100644
index 8e878cf0560..00000000000
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h
+++ /dev/null
@@ -1,13 +0,0 @@
-// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-// Disable warnings emitted by protoc generated files
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsuggest-override"
-
-#include "feed.pb.h"
-#include "visiting.pb.h"
-#include "maintenance.pb.h"
-
-#pragma GCC diagnostic pop
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp
index 917b60c50c3..172cd6c8de5 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp
@@ -17,6 +17,11 @@ LOG_SETUP(".storage.api.mbusprot.serialization.base");
namespace storage::mbusprot {
+ProtocolSerialization::ProtocolSerialization(const std::shared_ptr<const document::DocumentTypeRepo>& repo)
+ : _repo(repo)
+{
+}
+
mbus::Blob
ProtocolSerialization::encode(const api::StorageMessage& msg) const
{
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h
index a57627b9ba9..9c3ddb88bdf 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h
@@ -59,14 +59,21 @@ class StorageCommand;
class StorageReply;
class ProtocolSerialization {
+ const std::shared_ptr<const document::DocumentTypeRepo> _repo;
+
public:
virtual mbus::Blob encode(const api::StorageMessage&) const;
virtual std::unique_ptr<StorageCommand> decodeCommand(mbus::BlobRef) const;
virtual std::unique_ptr<StorageReply> decodeReply(
mbus::BlobRef, const api::StorageCommand&) const;
+
protected:
- ProtocolSerialization() = default;
- virtual ~ProtocolSerialization() = default;
+ const document::DocumentTypeRepo& getTypeRepo() const { return *_repo; }
+ const std::shared_ptr<const document::DocumentTypeRepo> getTypeRepoSp() const
+ { return _repo; }
+
+ ProtocolSerialization(const std::shared_ptr<const document::DocumentTypeRepo> &repo);
+ virtual ~ProtocolSerialization() {}
typedef api::StorageCommand SCmd;
typedef api::StorageReply SRep;
@@ -95,10 +102,13 @@ protected:
virtual void onEncode(GBBuf&, const api::GetBucketDiffReply&) const = 0;
virtual void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const = 0;
virtual void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const = 0;
- virtual void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const = 0;
+ virtual void onEncode(GBBuf&,
+ const api::RequestBucketInfoCommand&) const = 0;
virtual void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const = 0;
- virtual void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const = 0;
- virtual void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const = 0;
+ virtual void onEncode(GBBuf&,
+ const api::NotifyBucketChangeCommand&) const = 0;
+ virtual void onEncode(GBBuf&,
+ const api::NotifyBucketChangeReply&) const = 0;
virtual void onEncode(GBBuf&, const api::SplitBucketCommand&) const = 0;
virtual void onEncode(GBBuf&, const api::SplitBucketReply&) const = 0;
virtual void onEncode(GBBuf&, const api::JoinBucketsCommand&) const = 0;
@@ -133,9 +143,11 @@ protected:
virtual SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const = 0;
virtual SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const = 0;
- virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const = 0;
+ virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&,
+ BBuf&) const = 0;
virtual SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const = 0;
- virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const = 0;
+ virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&,
+ BBuf&) const = 0;
virtual SCmd::UP onDecodeSplitBucketCommand(BBuf&) const = 0;
virtual SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const = 0;
@@ -148,6 +160,14 @@ protected:
virtual SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const = 0;
virtual SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const = 0;
+
+ virtual document::Bucket getBucket(document::ByteBuffer& buf) const = 0;
+ virtual void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const = 0;
+ virtual document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const = 0;
+ virtual void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const = 0;
+ virtual api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const = 0;
+ virtual void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const = 0;
+
};
}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
index 466ff85f398..74a0c964d19 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
@@ -20,7 +20,7 @@ namespace storage::mbusprot {
ProtocolSerialization4_2::ProtocolSerialization4_2(
const std::shared_ptr<const document::DocumentTypeRepo>& repo)
- : LegacyProtocolSerialization(repo)
+ : ProtocolSerialization(repo)
{
}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h
index e4ab36dc989..56aa3d4ed30 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h
@@ -1,13 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "legacyprotocolserialization.h"
+#include "protocolserialization.h"
namespace storage::mbusprot {
-class ProtocolSerialization4_2 : public LegacyProtocolSerialization {
+class ProtocolSerialization4_2 : public ProtocolSerialization {
public:
- explicit ProtocolSerialization4_2(const std::shared_ptr<const document::DocumentTypeRepo>&);
+ ProtocolSerialization4_2(const std::shared_ptr<const document::DocumentTypeRepo>&);
protected:
void onEncode(GBBuf&, const api::GetCommand&) const override;
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h
index 67f02aa2d2a..042ec7850ef 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h
@@ -73,9 +73,6 @@ public:
SRep::UP onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const override;
void onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const override;
void onDecodeReply(BBuf&, api::StorageReply&) const override;
-
-protected:
- const documentapi::LoadTypeSet& loadTypes() const noexcept { return _loadTypes; };
};
}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
deleted file mode 100644
index ca77977046c..00000000000
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ /dev/null
@@ -1,1268 +0,0 @@
-// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "protocolserialization7.h"
-#include "serializationhelper.h"
-#include "protobuf_includes.h"
-
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/util/bufferexceptions.h>
-#include <vespa/storageapi/message/bucketsplitting.h>
-#include <vespa/storageapi/message/persistence.h>
-#include <vespa/storageapi/message/removelocation.h>
-#include <vespa/storageapi/message/visitor.h>
-
-namespace storage::mbusprot {
-
-ProtocolSerialization7::ProtocolSerialization7(std::shared_ptr<const document::DocumentTypeRepo> repo,
- const documentapi::LoadTypeSet& load_types)
- : ProtocolSerialization(),
- _repo(std::move(repo)),
- _load_types(load_types)
-{
-}
-
-namespace {
-
-void set_bucket(protobuf::Bucket& dest, const document::Bucket& src) {
- dest.set_raw_bucket_id(src.getBucketId().getRawId());
- dest.set_space_id(src.getBucketSpace().getId());
-}
-
-void set_bucket_id(protobuf::BucketId& dest, const document::BucketId& src) {
- dest.set_raw_id(src.getRawId());
-}
-
-document::BucketId get_bucket_id(const protobuf::BucketId& src) {
- return document::BucketId(src.raw_id());
-}
-
-void set_bucket_space(protobuf::BucketSpace& dest, const document::BucketSpace& src) {
- dest.set_space_id(src.getId());
-}
-
-document::BucketSpace get_bucket_space(const protobuf::BucketSpace& src) {
- return document::BucketSpace(src.space_id());
-}
-
-void set_bucket_info(protobuf::BucketInfo& dest, const api::BucketInfo& src) {
- dest.set_last_modified_timestamp(src.getLastModified());
- dest.set_legacy_checksum(src.getChecksum());
- dest.set_doc_count(src.getDocumentCount());
- dest.set_total_doc_size(src.getTotalDocumentSize());
- dest.set_meta_count(src.getMetaCount());
- dest.set_used_file_size(src.getUsedFileSize());
- dest.set_active(src.isActive());
- dest.set_ready(src.isReady());
-}
-
-document::Bucket get_bucket(const protobuf::Bucket& src) {
- return document::Bucket(document::BucketSpace(src.space_id()),
- document::BucketId(src.raw_bucket_id()));
-}
-
-api::BucketInfo get_bucket_info(const protobuf::BucketInfo& src) {
- api::BucketInfo info;
- info.setLastModified(src.last_modified_timestamp());
- info.setChecksum(src.legacy_checksum());
- info.setDocumentCount(src.doc_count());
- info.setTotalDocumentSize(src.total_doc_size());
- info.setMetaCount(src.meta_count());
- info.setUsedFileSize(src.used_file_size());
- info.setActive(src.active());
- info.setReady(src.ready());
- return info;
-}
-
-documentapi::TestAndSetCondition get_tas_condition(const protobuf::TestAndSetCondition& src) {
- return documentapi::TestAndSetCondition(src.selection());
-}
-
-void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::TestAndSetCondition& src) {
- dest.set_selection(src.getSelection().data(), src.getSelection().size());
-}
-
-std::shared_ptr<document::Document> get_document(const protobuf::Document& src_doc,
- const document::DocumentTypeRepo& type_repo)
-{
- if (!src_doc.payload().empty()) {
- document::ByteBuffer doc_buf(src_doc.payload().data(), src_doc.payload().size());
- return std::make_shared<document::Document>(type_repo, doc_buf);
- }
- return std::shared_ptr<document::Document>();
-}
-
-void set_update(protobuf::Update& dest, const document::DocumentUpdate& src) {
- vespalib::nbostream stream;
- src.serializeHEAD(stream);
- dest.set_payload(stream.peek(), stream.size());
-}
-
-std::shared_ptr<document::DocumentUpdate> get_update(const protobuf::Update& src,
- const document::DocumentTypeRepo& type_repo)
-{
- if (!src.payload().empty()) {
- return document::DocumentUpdate::createHEAD(
- type_repo, vespalib::nbostream(src.payload().data(), src.payload().size()));
- }
- return std::shared_ptr<document::DocumentUpdate>();
-}
-
-void write_request_header(vespalib::GrowableByteBuffer& buf, const api::StorageCommand& cmd) {
- protobuf::RequestHeader hdr; // Arena alloc not needed since there are no nested messages
- hdr.set_message_id(cmd.getMsgId());
- hdr.set_priority(cmd.getPriority());
- hdr.set_source_index(cmd.getSourceIndex());
- hdr.set_loadtype_id(cmd.getLoadType().getId());
-
- uint8_t dest[128]; // Only primitive fields, should be plenty large enough.
- auto encoded_size = static_cast<uint32_t>(hdr.ByteSizeLong());
- assert(encoded_size <= sizeof(dest));
- [[maybe_unused]] bool ok = hdr.SerializeWithCachedSizesToArray(dest);
- assert(ok);
- buf.putInt(encoded_size);
- buf.putBytes(reinterpret_cast<const char*>(dest), encoded_size);
-}
-
-void write_response_header(vespalib::GrowableByteBuffer& buf, const api::StorageReply& reply) {
- protobuf::ResponseHeader hdr; // Arena alloc not needed since there are no nested messages
- const auto& result = reply.getResult();
- hdr.set_return_code_id(static_cast<uint32_t>(result.getResult()));
- if (!result.getMessage().empty()) {
- hdr.set_return_code_message(result.getMessage().data(), result.getMessage().size());
- }
- hdr.set_message_id(reply.getMsgId());
- hdr.set_priority(reply.getPriority());
-
- const auto header_size = hdr.ByteSizeLong();
- assert(header_size <= UINT32_MAX);
- buf.putInt(static_cast<uint32_t>(header_size));
-
- auto* dest_buf = reinterpret_cast<uint8_t*>(buf.allocate(header_size));
- [[maybe_unused]] bool ok = hdr.SerializeWithCachedSizesToArray(dest_buf);
- assert(ok);
-}
-
-void decode_request_header(document::ByteBuffer& buf, protobuf::RequestHeader& hdr) {
- auto hdr_len = static_cast<uint32_t>(SerializationHelper::getInt(buf));
- if (hdr_len > buf.getRemaining()) {
- throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len);
- }
- bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len);
- if (!ok) {
- throw vespalib::IllegalArgumentException("Malformed protobuf request header");
- }
- buf.incPos(hdr_len);
-}
-
-void decode_response_header(document::ByteBuffer& buf, protobuf::ResponseHeader& hdr) {
- auto hdr_len = static_cast<uint32_t>(SerializationHelper::getInt(buf));
- if (hdr_len > buf.getRemaining()) {
- throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len);
- }
- bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len);
- if (!ok) {
- throw vespalib::IllegalArgumentException("Malformed protobuf response header");
- }
- buf.incPos(hdr_len);
-}
-
-} // anonymous namespace
-
-template <typename ProtobufType>
-class BaseEncoder {
- vespalib::GrowableByteBuffer& _out_buf;
- ::google::protobuf::Arena _arena;
- ProtobufType* _proto_obj;
-public:
- explicit BaseEncoder(vespalib::GrowableByteBuffer& out_buf)
- : _out_buf(out_buf),
- _arena(),
- _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena))
- {
- }
-
- void encode() {
- assert(_proto_obj != nullptr);
- const auto sz = _proto_obj->ByteSizeLong();
- assert(sz <= UINT32_MAX);
- auto* buf = reinterpret_cast<uint8_t*>(_out_buf.allocate(sz));
- [[maybe_unused]] bool ok = _proto_obj->SerializeWithCachedSizesToArray(buf);
- assert(ok);
- _proto_obj = nullptr;
- }
-protected:
- vespalib::GrowableByteBuffer& buffer() noexcept { return _out_buf; }
-
- // Precondition: encode() is not called
- ProtobufType& proto_obj() noexcept { return *_proto_obj; }
- const ProtobufType& proto_obj() const noexcept { return *_proto_obj; }
-};
-
-template <typename ProtobufType>
-class RequestEncoder : public BaseEncoder<ProtobufType> {
-public:
- RequestEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& cmd)
- : BaseEncoder<ProtobufType>(out_buf)
- {
- write_request_header(out_buf, cmd);
- }
-
- // Precondition: encode() is not called
- ProtobufType& request() noexcept { return this->proto_obj(); }
- const ProtobufType& request() const noexcept { return this->proto_obj(); }
-};
-
-template <typename ProtobufType>
-class ResponseEncoder : public BaseEncoder<ProtobufType> {
-public:
- ResponseEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply)
- : BaseEncoder<ProtobufType>(out_buf)
- {
- write_response_header(out_buf, reply);
- }
-
- // Precondition: encode() is not called
- ProtobufType& response() noexcept { return this->proto_obj(); }
- const ProtobufType& response() const noexcept { return this->proto_obj(); }
-};
-
-template <typename ProtobufType>
-class RequestDecoder {
- protobuf::RequestHeader _hdr;
- ::google::protobuf::Arena _arena;
- ProtobufType* _proto_obj;
- const documentapi::LoadTypeSet& _load_types;
-public:
- RequestDecoder(document::ByteBuffer& in_buf, const documentapi::LoadTypeSet& load_types)
- : _arena(),
- _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena)),
- _load_types(load_types)
- {
- decode_request_header(in_buf, _hdr);
- assert(in_buf.getRemaining() <= INT_MAX);
- bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining());
- if (!ok) {
- throw vespalib::IllegalArgumentException(
- vespalib::make_string("Malformed protobuf request payload for %s",
- ProtobufType::descriptor()->full_name().c_str()));
- }
- }
-
- void transfer_meta_information_to(api::StorageCommand& dest) {
- dest.forceMsgId(_hdr.message_id());
- dest.setPriority(static_cast<uint8_t>(_hdr.priority()));
- dest.setSourceIndex(static_cast<uint16_t>(_hdr.source_index()));
- dest.setLoadType(_load_types[_hdr.loadtype_id()]);
- }
-
- ProtobufType& request() noexcept { return *_proto_obj; }
- const ProtobufType& request() const noexcept { return *_proto_obj; }
-};
-
-template <typename ProtobufType>
-class ResponseDecoder {
- protobuf::ResponseHeader _hdr;
- ::google::protobuf::Arena _arena;
- ProtobufType* _proto_obj;
-public:
- explicit ResponseDecoder(document::ByteBuffer& in_buf)
- : _arena(),
- _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena))
- {
- decode_response_header(in_buf, _hdr);
- assert(in_buf.getRemaining() <= INT_MAX);
- bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining());
- if (!ok) {
- throw vespalib::IllegalArgumentException(
- vespalib::make_string("Malformed protobuf response payload for %s",
- ProtobufType::descriptor()->full_name().c_str()));
- }
- }
-
- ProtobufType& response() noexcept { return *_proto_obj; }
- const ProtobufType& response() const noexcept { return *_proto_obj; }
-};
-
-template <typename ProtobufType, typename Func>
-void encode_request(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& msg, Func&& f) {
- RequestEncoder<ProtobufType> enc(out_buf, msg);
- f(enc.request());
- enc.encode();
-}
-
-template <typename ProtobufType, typename Func>
-void encode_response(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply, Func&& f) {
- ResponseEncoder<ProtobufType> enc(out_buf, reply);
- auto& res = enc.response();
- f(res);
- enc.encode();
-}
-
-template <typename ProtobufType, typename Func>
-std::unique_ptr<api::StorageCommand>
-ProtocolSerialization7::decode_request(document::ByteBuffer& in_buf, Func&& f) const {
- RequestDecoder<ProtobufType> dec(in_buf, _load_types);
- const auto& req = dec.request();
- auto cmd = f(req);
- dec.transfer_meta_information_to(*cmd);
- return cmd;
-}
-
-template <typename ProtobufType, typename Func>
-std::unique_ptr<api::StorageReply>
-ProtocolSerialization7::decode_response(document::ByteBuffer& in_buf, Func&& f) const {
- ResponseDecoder<ProtobufType> dec(in_buf);
- const auto& res = dec.response();
- auto reply = f(res);
- return reply;
-}
-
-template <typename ProtobufType, typename Func>
-void encode_bucket_request(vespalib::GrowableByteBuffer& out_buf, const api::BucketCommand& msg, Func&& f) {
- encode_request<ProtobufType>(out_buf, msg, [&](ProtobufType& req) {
- set_bucket(*req.mutable_bucket(), msg.getBucket());
- f(req);
- });
-}
-
-template <typename ProtobufType, typename Func>
-std::unique_ptr<api::StorageCommand>
-ProtocolSerialization7::decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const {
- return decode_request<ProtobufType>(in_buf, [&](const ProtobufType& req) {
- if (!req.has_bucket()) {
- throw vespalib::IllegalArgumentException(
- vespalib::make_string("Malformed protocol buffer request for %s; no bucket",
- ProtobufType::descriptor()->full_name().c_str()));
- }
- const auto bucket = get_bucket(req.bucket());
- return f(req, bucket);
- });
-}
-
-template <typename ProtobufType, typename Func>
-void encode_bucket_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketReply& reply, Func&& f) {
- encode_response<ProtobufType>(out_buf, reply, [&](ProtobufType& res) {
- if (reply.hasBeenRemapped()) {
- set_bucket_id(*res.mutable_remapped_bucket_id(), reply.getBucketId());
- }
- f(res);
- });
-}
-
-template <typename ProtobufType, typename Func>
-std::unique_ptr<api::StorageReply>
-ProtocolSerialization7::decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const {
- return decode_response<ProtobufType>(in_buf, [&](const ProtobufType& res) {
- auto reply = f(res);
- if (res.has_remapped_bucket_id()) {
- reply->remapBucketId(get_bucket_id(res.remapped_bucket_id()));
- }
- return reply;
- });
-}
-
-template <typename ProtobufType, typename Func>
-void encode_bucket_info_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketInfoReply& reply, Func&& f) {
- encode_bucket_response<ProtobufType>(out_buf, reply, [&](ProtobufType& res) {
- set_bucket_info(*res.mutable_bucket_info(), reply.getBucketInfo());
- f(res);
- });
-}
-
-template <typename ProtobufType, typename Func>
-std::unique_ptr<api::StorageReply>
-ProtocolSerialization7::decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const {
- return decode_bucket_response<ProtobufType>(in_buf, [&](const ProtobufType& res) {
- auto reply = f(res);
- if (res.has_bucket_info()) {
- reply->setBucketInfo(get_bucket_info(res.bucket_info()));
- }
- return reply;
- });
-}
-
-// TODO document protobuf ducktyping assumptions
-
-namespace {
-// Inherit from known base class just to avoid having to template this. We don't care about its subtype anyway.
-void no_op_encode([[maybe_unused]] ::google::protobuf::Message&) {
- // nothing to do here.
-}
-
-void set_document(protobuf::Document& target_doc, const document::Document& src_doc) {
- vespalib::nbostream stream;
- src_doc.serialize(stream);
- target_doc.set_payload(stream.peek(), stream.size());
-}
-
-}
-
-// -----------------------------------------------------------------
-// Put
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) const {
- encode_bucket_request<protobuf::PutRequest>(buf, msg, [&](auto& req) {
- req.set_new_timestamp(msg.getTimestamp());
- req.set_expected_old_timestamp(msg.getUpdateTimestamp());
- if (msg.getCondition().isPresent()) {
- set_tas_condition(*req.mutable_condition(), msg.getCondition());
- }
- if (msg.getDocument()) {
- set_document(*req.mutable_document(), *msg.getDocument());
- }
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutReply& msg) const {
- encode_bucket_info_response<protobuf::PutResponse>(buf, msg, [&](auto& res) {
- res.set_was_found(msg.wasFound());
- });
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodePutCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::PutRequest>(buf, [&](auto& req, auto& bucket) {
- auto document = get_document(req.document(), type_repo());
- auto cmd = std::make_unique<api::PutCommand>(bucket, std::move(document), req.new_timestamp());
- cmd->setUpdateTimestamp(req.expected_old_timestamp());
- if (req.has_condition()) {
- cmd->setCondition(get_tas_condition(req.condition()));
- }
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodePutReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_info_response<protobuf::PutResponse>(buf, [&](auto& res) {
- return std::make_unique<api::PutReply>(static_cast<const api::PutCommand&>(cmd), res.was_found());
- });
-}
-
-// -----------------------------------------------------------------
-// Update
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) const {
- encode_bucket_request<protobuf::UpdateRequest>(buf, msg, [&](auto& req) {
- auto* update = msg.getUpdate().get();
- if (update) {
- set_update(*req.mutable_update(), *update);
- }
- req.set_new_timestamp(msg.getTimestamp());
- req.set_expected_old_timestamp(msg.getOldTimestamp());
- if (msg.getCondition().isPresent()) {
- set_tas_condition(*req.mutable_condition(), msg.getCondition());
- }
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateReply& msg) const {
- encode_bucket_info_response<protobuf::UpdateResponse>(buf, msg, [&](auto& res) {
- res.set_updated_timestamp(msg.getOldTimestamp());
- });
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::UpdateRequest>(buf, [&](auto& req, auto& bucket) {
- auto update = get_update(req.update(), type_repo());
- auto cmd = std::make_unique<api::UpdateCommand>(bucket, std::move(update), req.new_timestamp());
- cmd->setOldTimestamp(req.expected_old_timestamp());
- if (req.has_condition()) {
- cmd->setCondition(get_tas_condition(req.condition()));
- }
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeUpdateReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_info_response<protobuf::UpdateResponse>(buf, [&](auto& res) {
- return std::make_unique<api::UpdateReply>(static_cast<const api::UpdateCommand&>(cmd),
- res.updated_timestamp());
- });
-}
-
-// -----------------------------------------------------------------
-// Remove
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveCommand& msg) const {
- encode_bucket_request<protobuf::RemoveRequest>(buf, msg, [&](auto& req) {
- auto doc_id_str = msg.getDocumentId().toString();
- req.set_document_id(doc_id_str.data(), doc_id_str.size());
- req.set_new_timestamp(msg.getTimestamp());
- if (msg.getCondition().isPresent()) {
- set_tas_condition(*req.mutable_condition(), msg.getCondition());
- }
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveReply& msg) const {
- encode_bucket_info_response<protobuf::RemoveResponse>(buf, msg, [&](auto& res) {
- res.set_removed_timestamp(msg.getOldTimestamp());
- });
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::RemoveRequest>(buf, [&](auto& req, auto& bucket) {
- document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size()));
- auto cmd = std::make_unique<api::RemoveCommand>(bucket, doc_id, req.new_timestamp());
- if (req.has_condition()) {
- cmd->setCondition(get_tas_condition(req.condition()));
- }
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_info_response<protobuf::RemoveResponse>(buf, [&](auto& res) {
- return std::make_unique<api::RemoveReply>(static_cast<const api::RemoveCommand&>(cmd),
- res.removed_timestamp());
- });
-}
-
-// -----------------------------------------------------------------
-// Get
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) const {
- encode_bucket_request<protobuf::GetRequest>(buf, msg, [&](auto& req) {
- auto doc_id = msg.getDocumentId().toString();
- req.set_document_id(doc_id.data(), doc_id.size());
- req.set_before_timestamp(msg.getBeforeTimestamp());
- if (!msg.getFieldSet().empty()) {
- req.set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size());
- }
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) const {
- encode_bucket_info_response<protobuf::GetResponse>(buf, msg, [&](auto& res) {
- if (msg.getDocument()) {
- set_document(*res.mutable_document(), *msg.getDocument());
- }
- res.set_last_modified_timestamp(msg.getLastModifiedTimestamp());
- });
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeGetCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::GetRequest>(buf, [&](auto& req, auto& bucket) {
- document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size()));
- return std::make_unique<api::GetCommand>(bucket, std::move(doc_id),
- req.field_set(), req.before_timestamp());
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_info_response<protobuf::GetResponse>(buf, [&](auto& res) {
- try {
- auto document = get_document(res.document(), type_repo());
- return std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd),
- std::move(document), res.last_modified_timestamp());
- } catch (std::exception& e) {
- auto reply = std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd),
- std::shared_ptr<document::Document>(), 0u);
- reply->setResult(api::ReturnCode(api::ReturnCode::UNPARSEABLE, e.what()));
- return reply;
- }
- });
-}
-
-// -----------------------------------------------------------------
-// Revert
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertCommand& msg) const {
- encode_bucket_request<protobuf::RevertRequest>(buf, msg, [&](auto& req) {
- auto* tokens = req.mutable_revert_tokens();
- assert(msg.getRevertTokens().size() <= INT_MAX);
- tokens->Reserve(static_cast<int>(msg.getRevertTokens().size()));
- for (auto token : msg.getRevertTokens()) {
- tokens->Add(token);
- }
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertReply& msg) const {
- encode_bucket_info_response<protobuf::RevertResponse>(buf, msg, no_op_encode);
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeRevertCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::RevertRequest>(buf, [&](auto& req, auto& bucket) {
- std::vector<api::Timestamp> tokens;
- tokens.reserve(req.revert_tokens_size());
- for (auto token : req.revert_tokens()) {
- tokens.emplace_back(api::Timestamp(token));
- }
- return std::make_unique<api::RevertCommand>(bucket, std::move(tokens));
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeRevertReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_info_response<protobuf::RevertResponse>(buf, [&]([[maybe_unused]] auto& res) {
- return std::make_unique<api::RevertReply>(static_cast<const api::RevertCommand&>(cmd));
- });
-}
-
-// -----------------------------------------------------------------
-// RemoveLocation
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const {
- encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) {
- req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size());
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const {
- encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, no_op_encode);
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) {
- return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket);
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&]([[maybe_unused]] auto& res) {
- return std::make_unique<api::RemoveLocationReply>(static_cast<const api::RemoveLocationCommand&>(cmd));
- });
-}
-
-// -----------------------------------------------------------------
-// DeleteBucket
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const {
- encode_bucket_request<protobuf::DeleteBucketRequest>(buf, msg, [&](auto& req) {
- set_bucket_info(*req.mutable_expected_bucket_info(), msg.getBucketInfo());
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketReply& msg) const {
- encode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, msg, no_op_encode);
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeDeleteBucketCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::DeleteBucketRequest>(buf, [&](auto& req, auto& bucket) {
- auto cmd = std::make_unique<api::DeleteBucketCommand>(bucket);
- if (req.has_expected_bucket_info()) {
- cmd->setBucketInfo(get_bucket_info(req.expected_bucket_info()));
- }
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeDeleteBucketReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, [&]([[maybe_unused]] auto& res) {
- return std::make_unique<api::DeleteBucketReply>(static_cast<const api::DeleteBucketCommand&>(cmd));
- });
-}
-
-// -----------------------------------------------------------------
-// CreateBucket
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const {
- encode_bucket_request<protobuf::CreateBucketRequest>(buf, msg, [&](auto& req) {
- req.set_create_as_active(msg.getActive());
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketReply& msg) const {
- encode_bucket_info_response<protobuf::CreateBucketResponse>(buf, msg, no_op_encode);
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateBucketCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::CreateBucketRequest>(buf, [&](auto& req, auto& bucket) {
- auto cmd = std::make_unique<api::CreateBucketCommand>(bucket);
- cmd->setActive(req.create_as_active());
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeCreateBucketReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_info_response<protobuf::CreateBucketResponse>(buf, [&]([[maybe_unused]] auto& res) {
- return std::make_unique<api::CreateBucketReply>(static_cast<const api::CreateBucketCommand&>(cmd));
- });
-}
-
-// -----------------------------------------------------------------
-// MergeBucket
-// -----------------------------------------------------------------
-
-namespace {
-
-void set_merge_nodes(::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& dest,
- const std::vector<api::MergeBucketCommand::Node>& src)
-{
- dest.Reserve(src.size());
- for (const auto& src_node : src) {
- auto* dest_node = dest.Add();
- dest_node->set_index(src_node.index);
- dest_node->set_source_only(src_node.sourceOnly);
- }
-}
-
-std::vector<api::MergeBucketCommand::Node> get_merge_nodes(
- const ::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& src)
-{
- std::vector<api::MergeBucketCommand::Node> nodes;
- nodes.reserve(src.size());
- for (const auto& node : src) {
- nodes.emplace_back(node.index(), node.source_only());
- }
- return nodes;
-}
-
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const {
- encode_bucket_request<protobuf::MergeBucketRequest>(buf, msg, [&](auto& req) {
- set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
- req.set_max_timestamp(msg.getMaxTimestamp());
- req.set_cluster_state_version(msg.getClusterStateVersion());
- for (uint16_t chain_node : msg.getChain()) {
- req.add_node_chain(chain_node);
- }
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketReply& msg) const {
- encode_bucket_response<protobuf::MergeBucketResponse>(buf, msg, no_op_encode);
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::MergeBucketRequest>(buf, [&](auto& req, auto& bucket) {
- auto nodes = get_merge_nodes(req.nodes());
- auto cmd = std::make_unique<api::MergeBucketCommand>(bucket, std::move(nodes), req.max_timestamp());
- cmd->setClusterStateVersion(req.cluster_state_version());
- std::vector<uint16_t> chain;
- chain.reserve(req.node_chain_size());
- for (uint16_t node : req.node_chain()) {
- chain.emplace_back(node);
- }
- cmd->setChain(std::move(chain));
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeMergeBucketReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_response<protobuf::MergeBucketResponse>(buf, [&]([[maybe_unused]] auto& res) {
- return std::make_unique<api::MergeBucketReply>(static_cast<const api::MergeBucketCommand&>(cmd));
- });
-}
-
-// -----------------------------------------------------------------
-// GetBucketDiff
-// -----------------------------------------------------------------
-
-namespace {
-
-void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) {
- static_assert(document::GlobalId::LENGTH == 12);
- uint64_t lo64;
- uint32_t hi32;
- memcpy(&lo64, src.get(), sizeof(uint64_t));
- memcpy(&hi32, src.get() + sizeof(uint64_t), sizeof(uint32_t));
- dest.set_lo_64(lo64);
- dest.set_hi_32(hi32);
-}
-
-document::GlobalId get_global_id(const protobuf::GlobalId& src) {
- static_assert(document::GlobalId::LENGTH == 12);
- const uint64_t lo64 = src.lo_64();
- const uint32_t hi32 = src.hi_32();
-
- char buf[document::GlobalId::LENGTH];
- memcpy(buf, &lo64, sizeof(uint64_t));
- memcpy(buf + sizeof(uint64_t), &hi32, sizeof(uint32_t));
- return document::GlobalId(buf);
-}
-
-void set_diff_entry(protobuf::MetaDiffEntry& dest, const api::GetBucketDiffCommand::Entry& src) {
- dest.set_timestamp(src._timestamp);
- set_global_id(*dest.mutable_gid(), src._gid);
- dest.set_header_size(src._headerSize);
- dest.set_body_size(src._bodySize);
- dest.set_flags(src._flags);
- dest.set_presence_mask(src._hasMask);
-}
-
-api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& src) {
- api::GetBucketDiffCommand::Entry e;
- e._timestamp = src.timestamp();
- e._gid = get_global_id(src.gid());
- e._headerSize = src.header_size();
- e._bodySize = src.body_size();
- e._flags = src.flags();
- e._hasMask = src.presence_mask();
- return e;
-}
-
-void fill_proto_meta_diff(::google::protobuf::RepeatedPtrField<protobuf::MetaDiffEntry>& dest,
- const std::vector<api::GetBucketDiffCommand::Entry>& src) {
- for (const auto& diff_entry : src) {
- set_diff_entry(*dest.Add(), diff_entry);
- }
-}
-
-void fill_api_meta_diff(std::vector<api::GetBucketDiffCommand::Entry>& dest,
- const ::google::protobuf::RepeatedPtrField<protobuf::MetaDiffEntry>& src) {
- // FIXME GetBucketDiffReply ctor copies the diff from the request for some reason
- // TODO verify this isn't actually used anywhere and remove this "feature".
- dest.clear();
- dest.reserve(src.size());
- for (const auto& diff_entry : src) {
- dest.emplace_back(get_diff_entry(diff_entry));
- }
-}
-
-} // anonymous namespace
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffCommand& msg) const {
- encode_bucket_request<protobuf::GetBucketDiffRequest>(buf, msg, [&](auto& req) {
- set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
- req.set_max_timestamp(msg.getMaxTimestamp());
- fill_proto_meta_diff(*req.mutable_diff(), msg.getDiff());
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffReply& msg) const {
- encode_bucket_response<protobuf::GetBucketDiffResponse>(buf, msg, [&](auto& res) {
- fill_proto_meta_diff(*res.mutable_diff(), msg.getDiff());
- });
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeGetBucketDiffCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::GetBucketDiffRequest>(buf, [&](auto& req, auto& bucket) {
- auto nodes = get_merge_nodes(req.nodes());
- auto cmd = std::make_unique<api::GetBucketDiffCommand>(bucket, std::move(nodes), req.max_timestamp());
- fill_api_meta_diff(cmd->getDiff(), req.diff());
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeGetBucketDiffReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_response<protobuf::GetBucketDiffResponse>(buf, [&](auto& res) {
- auto reply = std::make_unique<api::GetBucketDiffReply>(static_cast<const api::GetBucketDiffCommand&>(cmd));
- fill_api_meta_diff(reply->getDiff(), res.diff());
- return reply;
- });
-}
-
-// -----------------------------------------------------------------
-// ApplyBucketDiff
-// -----------------------------------------------------------------
-
-namespace {
-
-void fill_api_apply_diff_vector(std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
- const ::google::protobuf::RepeatedPtrField<protobuf::ApplyDiffEntry>& src)
-{
- // We use the same approach as the legacy protocols here in that we pre-reserve and
- // directly write into the vector. This avoids having to ensure all buffer management is movable.
- size_t n_entries = src.size();
- diff.resize(n_entries);
- for (size_t i = 0; i < n_entries; ++i) {
- auto& proto_entry = src.Get(i);
- auto& dest = diff[i];
- dest._entry = get_diff_entry(proto_entry.entry_meta());
- dest._docName = proto_entry.document_id();
- // TODO consider making buffers std::strings instead to avoid explicit zeroing-on-resize overhead
- dest._headerBlob.resize(proto_entry.header_blob().size());
- memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size());
- dest._bodyBlob.resize(proto_entry.body_blob().size());
- memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size());
- }
-}
-
-void fill_proto_apply_diff_vector(::google::protobuf::RepeatedPtrField<protobuf::ApplyDiffEntry>& dest,
- const std::vector<api::ApplyBucketDiffCommand::Entry>& src)
-{
- dest.Reserve(src.size());
- for (const auto& entry : src) {
- auto* proto_entry = dest.Add();
- set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry);
- proto_entry->set_document_id(entry._docName.data(), entry._docName.size());
- proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size());
- proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size());
- }
-}
-
-} // anonymous namespace
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const {
- encode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, msg, [&](auto& req) {
- set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
- req.set_max_buffer_size(msg.getMaxBufferSize());
- fill_proto_apply_diff_vector(*req.mutable_entries(), msg.getDiff());
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffReply& msg) const {
- encode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, msg, [&](auto& res) {
- fill_proto_apply_diff_vector(*res.mutable_entries(), msg.getDiff());
- });
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, [&](auto& req, auto& bucket) {
- auto nodes = get_merge_nodes(req.nodes());
- auto cmd = std::make_unique<api::ApplyBucketDiffCommand>(bucket, std::move(nodes), req.max_buffer_size());
- fill_api_apply_diff_vector(cmd->getDiff(), req.entries());
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeApplyBucketDiffReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, [&](auto& res) {
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(static_cast<const api::ApplyBucketDiffCommand&>(cmd));
- fill_api_apply_diff_vector(reply->getDiff(), res.entries());
- return reply;
- });
-}
-
-// -----------------------------------------------------------------
-// RequestBucketInfo
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const {
- encode_request<protobuf::RequestBucketInfoRequest>(buf, msg, [&](auto& req) {
- set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace());
- auto& buckets = msg.getBuckets();
- if (!buckets.empty()) {
- auto* proto_buckets = req.mutable_explicit_bucket_set();
- for (const auto& b : buckets) {
- set_bucket_id(*proto_buckets->add_bucket_ids(), b);
- }
- } else {
- auto* all_buckets = req.mutable_all_buckets();
- auto cluster_state = msg.getSystemState().toString();
- all_buckets->set_distributor_index(msg.getDistributor());
- all_buckets->set_cluster_state(cluster_state.data(), cluster_state.size());
- all_buckets->set_distribution_hash(msg.getDistributionHash().data(), msg.getDistributionHash().size());
- }
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoReply& msg) const {
- encode_response<protobuf::RequestBucketInfoResponse>(buf, msg, [&](auto& res) {
- auto* proto_info = res.mutable_bucket_infos();
- proto_info->Reserve(msg.getBucketInfo().size());
- for (const auto& entry : msg.getBucketInfo()) {
- auto* bucket_and_info = proto_info->Add();
- bucket_and_info->set_raw_bucket_id(entry._bucketId.getRawId());
- set_bucket_info(*bucket_and_info->mutable_bucket_info(), entry._info);
- }
- });
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeRequestBucketInfoCommand(BBuf& buf) const {
- return decode_request<protobuf::RequestBucketInfoRequest>(buf, [&](auto& req) {
- auto bucket_space = get_bucket_space(req.bucket_space());
- if (req.has_explicit_bucket_set()) {
- const uint32_t n_buckets = req.explicit_bucket_set().bucket_ids_size();
- std::vector<document::BucketId> buckets(n_buckets);
- const auto& proto_buckets = req.explicit_bucket_set().bucket_ids();
- for (uint32_t i = 0; i < n_buckets; ++i) {
- buckets[i] = get_bucket_id(proto_buckets.Get(i));
- }
- return std::make_unique<api::RequestBucketInfoCommand>(bucket_space, std::move(buckets));
- } else if (req.has_all_buckets()) {
- const auto& all_req = req.all_buckets();
- return std::make_unique<api::RequestBucketInfoCommand>(
- bucket_space, all_req.distributor_index(),
- lib::ClusterState(all_req.cluster_state()), all_req.distribution_hash());
- } else {
- throw vespalib::IllegalArgumentException("RequestBucketInfo does not have any applicable fields set");
- }
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(const SCmd& cmd, BBuf& buf) const {
- return decode_response<protobuf::RequestBucketInfoResponse>(buf, [&](auto& res) {
- auto reply = std::make_unique<api::RequestBucketInfoReply>(static_cast<const api::RequestBucketInfoCommand&>(cmd));
- auto& dest_entries = reply->getBucketInfo();
- uint32_t n_entries = res.bucket_infos_size();
- dest_entries.resize(n_entries);
- for (uint32_t i = 0; i < n_entries; ++i) {
- const auto& proto_entry = res.bucket_infos(i);
- dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id());
- dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info());
- }
- return reply;
- });
-}
-
-// -----------------------------------------------------------------
-// NotifyBucketChange
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeCommand& msg) const {
- encode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, msg, [&](auto& req) {
- set_bucket_info(*req.mutable_bucket_info(), msg.getBucketInfo());
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeReply& msg) const {
- encode_response<protobuf::NotifyBucketChangeResponse>(buf, msg, no_op_encode);
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeNotifyBucketChangeCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, [&](auto& req, auto& bucket) {
- auto bucket_info = get_bucket_info(req.bucket_info());
- return std::make_unique<api::NotifyBucketChangeCommand>(bucket, bucket_info);
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeNotifyBucketChangeReply(const SCmd& cmd, BBuf& buf) const {
- return decode_response<protobuf::NotifyBucketChangeResponse>(buf, [&]([[maybe_unused]] auto& res) {
- return std::make_unique<api::NotifyBucketChangeReply>(static_cast<const api::NotifyBucketChangeCommand&>(cmd));
- });
-}
-
-// -----------------------------------------------------------------
-// SplitBucket
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketCommand& msg) const {
- encode_bucket_request<protobuf::SplitBucketRequest>(buf, msg, [&](auto& req) {
- req.set_min_split_bits(msg.getMinSplitBits());
- req.set_max_split_bits(msg.getMaxSplitBits());
- req.set_min_byte_size(msg.getMinByteSize());
- req.set_min_doc_count(msg.getMinDocCount());
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketReply& msg) const {
- encode_bucket_response<protobuf::SplitBucketResponse>(buf, msg, [&](auto& res) {
- for (const auto& split_info : msg.getSplitInfo()) {
- auto* proto_info = res.add_split_info();
- proto_info->set_raw_bucket_id(split_info.first.getRawId());
- set_bucket_info(*proto_info->mutable_bucket_info(), split_info.second);
- }
- });
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeSplitBucketCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::SplitBucketRequest>(buf, [&](auto& req, auto& bucket) {
- auto cmd = std::make_unique<api::SplitBucketCommand>(bucket);
- cmd->setMinSplitBits(static_cast<uint8_t>(req.min_split_bits()));
- cmd->setMaxSplitBits(static_cast<uint8_t>(req.max_split_bits()));
- cmd->setMinByteSize(req.min_byte_size());
- cmd->setMinDocCount(req.min_doc_count());
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_response<protobuf::SplitBucketResponse>(buf, [&](auto& res) {
- auto reply = std::make_unique<api::SplitBucketReply>(static_cast<const api::SplitBucketCommand&>(cmd));
- auto& dest_info = reply->getSplitInfo();
- dest_info.reserve(res.split_info_size());
- for (const auto& proto_info : res.split_info()) {
- dest_info.emplace_back(document::BucketId(proto_info.raw_bucket_id()),
- get_bucket_info(proto_info.bucket_info()));
- }
- return reply;
- });
-}
-
-// -----------------------------------------------------------------
-// JoinBuckets
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const {
- encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](auto& req) {
- for (const auto& source : msg.getSourceBuckets()) {
- set_bucket_id(*req.add_source_buckets(), source);
- }
- req.set_min_join_bits(msg.getMinJoinBits());
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsReply& msg) const {
- encode_bucket_info_response<protobuf::JoinBucketsResponse>(buf, msg, no_op_encode);
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeJoinBucketsCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::JoinBucketsRequest>(buf, [&](auto& req, auto& bucket) {
- auto cmd = std::make_unique<api::JoinBucketsCommand>(bucket);
- auto& entries = cmd->getSourceBuckets();
- for (const auto& proto_bucket : req.source_buckets()) {
- entries.emplace_back(get_bucket_id(proto_bucket));
- }
- cmd->setMinJoinBits(static_cast<uint8_t>(req.min_join_bits()));
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeJoinBucketsReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_info_response<protobuf::JoinBucketsResponse>(buf, [&]([[maybe_unused]] auto& res) {
- return std::make_unique<api::JoinBucketsReply>(static_cast<const api::JoinBucketsCommand&>(cmd));
- });
-}
-
-// -----------------------------------------------------------------
-// SetBucketState
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateCommand& msg) const {
- encode_bucket_request<protobuf::SetBucketStateRequest>(buf, msg, [&](auto& req) {
- auto state = (msg.getState() == api::SetBucketStateCommand::BUCKET_STATE::ACTIVE
- ? protobuf::SetBucketStateRequest_BucketState_Active
- : protobuf::SetBucketStateRequest_BucketState_Inactive);
- req.set_state(state);
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateReply& msg) const {
- // SetBucketStateReply is _technically_ a BucketInfoReply, but the legacy protocol impls
- // do _not_ encode bucket info as part of the wire format (and it's not used on the distributor),
- // so we follow that here and only encode remapping information.
- encode_bucket_response<protobuf::SetBucketStateResponse>(buf, msg, no_op_encode);
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeSetBucketStateCommand(BBuf& buf) const {
- return decode_bucket_request<protobuf::SetBucketStateRequest>(buf, [&](auto& req, auto& bucket) {
- auto state = (req.state() == protobuf::SetBucketStateRequest_BucketState_Active
- ? api::SetBucketStateCommand::BUCKET_STATE::ACTIVE
- : api::SetBucketStateCommand::BUCKET_STATE::INACTIVE);
- return std::make_unique<api::SetBucketStateCommand>(bucket, state);
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeSetBucketStateReply(const SCmd& cmd, BBuf& buf) const {
- return decode_bucket_response<protobuf::SetBucketStateResponse>(buf, [&]([[maybe_unused]] auto& res) {
- return std::make_unique<api::SetBucketStateReply>(static_cast<const api::SetBucketStateCommand&>(cmd));
- });
-}
-
-// -----------------------------------------------------------------
-// CreateVisitor
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const {
- encode_request<protobuf::CreateVisitorRequest>(buf, msg, [&](auto& req) {
- set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace());
- for (const auto& bucket : msg.getBuckets()) {
- set_bucket_id(*req.add_buckets(), bucket);
- }
-
- auto* ctrl_meta = req.mutable_control_meta();
- ctrl_meta->set_library_name(msg.getLibraryName().data(), msg.getLibraryName().size());
- ctrl_meta->set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size());
- ctrl_meta->set_visitor_command_id(msg.getVisitorCmdId());
- ctrl_meta->set_control_destination(msg.getControlDestination().data(), msg.getControlDestination().size());
- ctrl_meta->set_data_destination(msg.getDataDestination().data(), msg.getDataDestination().size());
- ctrl_meta->set_queue_timeout(msg.getQueueTimeout());
- ctrl_meta->set_max_pending_reply_count(msg.getMaximumPendingReplyCount());
- ctrl_meta->set_max_buckets_per_visitor(msg.getMaxBucketsPerVisitor());
-
- auto* constraints = req.mutable_constraints();
- constraints->set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size());
- constraints->set_from_time_usec(msg.getFromTime());
- constraints->set_to_time_usec(msg.getToTime());
- constraints->set_visit_inconsistent_buckets(msg.visitInconsistentBuckets());
- constraints->set_visit_removes(msg.visitRemoves());
- constraints->set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size());
-
- for (const auto& param : msg.getParameters()) {
- auto* proto_param = req.add_client_parameters();
- proto_param->set_key(param.first.data(), param.first.size());
- proto_param->set_value(param.second.data(), param.second.size());
- }
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorReply& msg) const {
- encode_response<protobuf::CreateVisitorResponse>(buf, msg, [&](auto& res) {
- auto& stats = msg.getVisitorStatistics();
- auto* proto_stats = res.mutable_visitor_statistics();
- proto_stats->set_buckets_visited(stats.getBucketsVisited());
- proto_stats->set_documents_visited(stats.getDocumentsVisited());
- proto_stats->set_bytes_visited(stats.getBytesVisited());
- proto_stats->set_documents_returned(stats.getDocumentsReturned());
- proto_stats->set_bytes_returned(stats.getBytesReturned());
- proto_stats->set_second_pass_documents_returned(stats.getSecondPassDocumentsReturned());
- proto_stats->set_second_pass_bytes_returned(stats.getSecondPassBytesReturned());
- });
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBuf& buf) const {
- return decode_request<protobuf::CreateVisitorRequest>(buf, [&](auto& req) {
- auto bucket_space = get_bucket_space(req.bucket_space());
- auto& ctrl_meta = req.control_meta();
- auto& constraints = req.constraints();
- auto cmd = std::make_unique<api::CreateVisitorCommand>(bucket_space, ctrl_meta.library_name(),
- ctrl_meta.instance_id(), constraints.document_selection());
- for (const auto& proto_bucket : req.buckets()) {
- cmd->getBuckets().emplace_back(get_bucket_id(proto_bucket));
- }
-
- cmd->setVisitorCmdId(ctrl_meta.visitor_command_id());
- cmd->setControlDestination(ctrl_meta.control_destination());
- cmd->setDataDestination(ctrl_meta.data_destination());
- cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count());
- cmd->setQueueTimeout(ctrl_meta.queue_timeout());
- cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor());
- cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol impl
-
- for (const auto& proto_param : req.client_parameters()) {
- cmd->getParameters().set(proto_param.key(), proto_param.value());
- }
-
- cmd->setFromTime(constraints.from_time_usec());
- cmd->setToTime(constraints.to_time_usec());
- cmd->setVisitRemoves(constraints.visit_removes());
- cmd->setFieldSet(constraints.field_set());
- cmd->setVisitInconsistentBuckets(constraints.visit_inconsistent_buckets());
- return cmd;
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const {
- return decode_response<protobuf::CreateVisitorResponse>(buf, [&](auto& res) {
- auto reply = std::make_unique<api::CreateVisitorReply>(static_cast<const api::CreateVisitorCommand&>(cmd));
- vdslib::VisitorStatistics vs;
- const auto& proto_stats = res.visitor_statistics();
- vs.setBucketsVisited(proto_stats.buckets_visited());
- vs.setDocumentsVisited(proto_stats.documents_visited());
- vs.setBytesVisited(proto_stats.bytes_visited());
- vs.setDocumentsReturned(proto_stats.documents_returned());
- vs.setBytesReturned(proto_stats.bytes_returned());
- vs.setSecondPassDocumentsReturned(proto_stats.second_pass_documents_returned());
- vs.setSecondPassBytesReturned(proto_stats.second_pass_bytes_returned());
- reply->setVisitorStatistics(vs);
- return reply;
- });
-}
-
-// -----------------------------------------------------------------
-// DestroyVisitor
-// -----------------------------------------------------------------
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorCommand& msg) const {
- encode_request<protobuf::DestroyVisitorRequest>(buf, msg, [&](auto& req) {
- req.set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size());
- });
-}
-
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorReply& msg) const {
- encode_response<protobuf::DestroyVisitorResponse>(buf, msg, no_op_encode);
-}
-
-api::StorageCommand::UP ProtocolSerialization7::onDecodeDestroyVisitorCommand(BBuf& buf) const {
- return decode_request<protobuf::DestroyVisitorRequest>(buf, [&](auto& req) {
- return std::make_unique<api::DestroyVisitorCommand>(req.instance_id());
- });
-}
-
-api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf) const {
- return decode_response<protobuf::DestroyVisitorResponse>(buf, [&]([[maybe_unused]] auto& res) {
- return std::make_unique<api::DestroyVisitorReply>(static_cast<const api::DestroyVisitorCommand&>(cmd));
- });
-}
-
-} // storage::mbusprot
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
deleted file mode 100644
index f3499150278..00000000000
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
+++ /dev/null
@@ -1,146 +0,0 @@
-// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "protocolserialization.h"
-#include <vespa/documentapi/loadtypes/loadtypeset.h>
-
-namespace storage {
-namespace mbusprot {
-
-/**
- * Protocol serialization version that uses Protocol Buffers for all its binary
- * encoding and decoding.
- */
-class ProtocolSerialization7 : public ProtocolSerialization {
- const std::shared_ptr<const document::DocumentTypeRepo> _repo;
- const documentapi::LoadTypeSet& _load_types;
-public:
- ProtocolSerialization7(std::shared_ptr<const document::DocumentTypeRepo> repo,
- const documentapi::LoadTypeSet& load_types);
-
- const document::DocumentTypeRepo& type_repo() const { return *_repo; }
-
- // Put
- void onEncode(GBBuf&, const api::PutCommand&) const override;
- void onEncode(GBBuf&, const api::PutReply&) const override;
- SCmd::UP onDecodePutCommand(BBuf&) const override;
- SRep::UP onDecodePutReply(const SCmd&, BBuf&) const override;
-
- // Update
- void onEncode(GBBuf&, const api::UpdateCommand&) const override;
- void onEncode(GBBuf&, const api::UpdateReply&) const override;
- SCmd::UP onDecodeUpdateCommand(BBuf&) const override;
- SRep::UP onDecodeUpdateReply(const SCmd&, BBuf&) const override;
-
- // Remove
- void onEncode(GBBuf&, const api::RemoveCommand&) const override;
- void onEncode(GBBuf&, const api::RemoveReply&) const override;
- SCmd::UP onDecodeRemoveCommand(BBuf&) const override;
- SRep::UP onDecodeRemoveReply(const SCmd&, BBuf&) const override;
-
- // Get
- void onEncode(GBBuf&, const api::GetCommand&) const override;
- void onEncode(GBBuf&, const api::GetReply&) const override;
- SCmd::UP onDecodeGetCommand(BBuf&) const override;
- SRep::UP onDecodeGetReply(const SCmd&, BBuf&) const override;
-
- // Revert - TODO this is deprecated, no?
- void onEncode(GBBuf&, const api::RevertCommand&) const override;
- void onEncode(GBBuf&, const api::RevertReply&) const override;
- SCmd::UP onDecodeRevertCommand(BBuf&) const override;
- SRep::UP onDecodeRevertReply(const SCmd&, BBuf&) const override;
-
- // DeleteBucket
- void onEncode(GBBuf&, const api::DeleteBucketCommand&) const override;
- void onEncode(GBBuf&, const api::DeleteBucketReply&) const override;
- SCmd::UP onDecodeDeleteBucketCommand(BBuf&) const override;
- SRep::UP onDecodeDeleteBucketReply(const SCmd&, BBuf&) const override;
-
- // CreateBucket
- void onEncode(GBBuf&, const api::CreateBucketCommand&) const override;
- void onEncode(GBBuf&, const api::CreateBucketReply&) const override;
- SCmd::UP onDecodeCreateBucketCommand(BBuf&) const override;
- SRep::UP onDecodeCreateBucketReply(const SCmd&, BBuf&) const override;
-
- // MergeBucket
- void onEncode(GBBuf&, const api::MergeBucketCommand&) const override;
- void onEncode(GBBuf&, const api::MergeBucketReply&) const override;
- SCmd::UP onDecodeMergeBucketCommand(BBuf&) const override;
- SRep::UP onDecodeMergeBucketReply(const SCmd&, BBuf&) const override;
-
- // GetBucketDiff
- void onEncode(GBBuf&, const api::GetBucketDiffCommand&) const override;
- void onEncode(GBBuf&, const api::GetBucketDiffReply&) const override;
- SCmd::UP onDecodeGetBucketDiffCommand(BBuf&) const override;
- SRep::UP onDecodeGetBucketDiffReply(const SCmd&, BBuf&) const override;
-
- // ApplyBucketDiff
- void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const override;
- void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const override;
- SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const override;
- SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const override;
-
- // RequestBucketInfo
- void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const override;
- void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const override;
- SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const override;
- SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const override;
-
- // NotifyBucketChange
- void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const override;
- void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const override;
- SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const override;
- SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const override;
-
- // SplitBucket
- void onEncode(GBBuf&, const api::SplitBucketCommand&) const override;
- void onEncode(GBBuf&, const api::SplitBucketReply&) const override;
- SCmd::UP onDecodeSplitBucketCommand(BBuf&) const override;
- SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const override;
-
- // JoinBuckets
- void onEncode(GBBuf&, const api::JoinBucketsCommand&) const override;
- void onEncode(GBBuf&, const api::JoinBucketsReply&) const override;
- SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const override;
- SRep::UP onDecodeJoinBucketsReply(const SCmd&, BBuf&) const override;
-
- // SetBucketState
- void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override;
- void onEncode(GBBuf&, const api::SetBucketStateReply&) const override;
- SCmd::UP onDecodeSetBucketStateCommand(BBuf&) const override;
- SRep::UP onDecodeSetBucketStateReply(const SCmd&, BBuf&) const override;
-
- // CreateVisitor
- void onEncode(GBBuf&, const api::CreateVisitorCommand&) const override;
- void onEncode(GBBuf&, const api::CreateVisitorReply&) const override;
- SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override;
- SRep::UP onDecodeCreateVisitorReply(const SCmd&, BBuf&) const override;
-
- // DestroyVisitor
- void onEncode(GBBuf&, const api::DestroyVisitorCommand&) const override;
- void onEncode(GBBuf&, const api::DestroyVisitorReply&) const override;
- SCmd::UP onDecodeDestroyVisitorCommand(BBuf&) const override;
- SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const override;
-
- // RemoveLocation
- void onEncode(GBBuf&, const api::RemoveLocationCommand&) const override;
- void onEncode(GBBuf&, const api::RemoveLocationReply&) const override;
- SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override;
- SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override;
-
-private:
- template <typename ProtobufType, typename Func>
- std::unique_ptr<api::StorageCommand> decode_request(document::ByteBuffer& in_buf, Func&& f) const;
- template <typename ProtobufType, typename Func>
- std::unique_ptr<api::StorageReply> decode_response(document::ByteBuffer& in_buf, Func&& f) const;
- template <typename ProtobufType, typename Func>
- std::unique_ptr<api::StorageCommand> decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const;
- template <typename ProtobufType, typename Func>
- std::unique_ptr<api::StorageReply> decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const;
- template <typename ProtobufType, typename Func>
- std::unique_ptr<api::StorageReply> decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const;
-};
-
-}
-}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp
index 7bc6333762b..7e6be0a84f5 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp
@@ -20,8 +20,7 @@ StorageProtocol::StorageProtocol(const std::shared_ptr<const document::DocumentT
: _serializer5_0(repo, loadTypes),
_serializer5_1(repo, loadTypes),
_serializer5_2(repo, loadTypes),
- _serializer6_0(repo, loadTypes),
- _serializer7_0(repo, loadTypes)
+ _serializer6_0(repo, loadTypes)
{
}
@@ -34,7 +33,6 @@ StorageProtocol::createPolicy(const mbus::string&, const mbus::string&) const
}
namespace {
- vespalib::Version version7_0(7, 40, 5);
vespalib::Version version6_0(6, 240, 0);
vespalib::Version version5_2(5, 93, 30);
vespalib::Version version5_1(5, 1, 0);
@@ -108,10 +106,8 @@ StorageProtocol::encode(const vespalib::Version& version,
} else {
if (version < version6_0) {
return encodeMessage(_serializer5_2, routable, message, version5_2, version);
- } else if (version < version7_0) {
- return encodeMessage(_serializer6_0, routable, message, version6_0, version);
} else {
- return encodeMessage(_serializer7_0, routable, message, version7_0, version);
+ return encodeMessage(_serializer6_0, routable, message, version6_0, version);
}
}
@@ -184,10 +180,8 @@ StorageProtocol::decode(const vespalib::Version & version,
} else {
if (version < version6_0) {
return decodeMessage(_serializer5_2, data, type, version5_2, version);
- } else if (version < version7_0) {
- return decodeMessage(_serializer6_0, data, type, version6_0, version);
} else {
- return decodeMessage(_serializer7_0, data, type, version7_0, version);
+ return decodeMessage(_serializer6_0, data, type, version6_0, version);
}
}
} catch (std::exception & e) {
diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
index 67ea121c340..1acd7c9675f 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
@@ -3,7 +3,6 @@
#include "protocolserialization5_2.h"
#include "protocolserialization6_0.h"
-#include "protocolserialization7.h"
#include <vespa/messagebus/iprotocol.h>
namespace storage::mbusprot {
@@ -29,7 +28,6 @@ private:
ProtocolSerialization5_1 _serializer5_1;
ProtocolSerialization5_2 _serializer5_2;
ProtocolSerialization6_0 _serializer6_0;
- ProtocolSerialization7 _serializer7_0;
};
}