summaryrefslogtreecommitdiffstats
path: root/storageapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-12 14:17:56 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-23 12:16:45 +0000
commit183ce7f7475a361e64913cd60685392d635c8b1a (patch)
tree2da75df5b39ef5f7c258ac24c200523a2da51172 /storageapi
parent979a2980aeaf89cc111f9dec74fa46cf191a8d8f (diff)
Reapply protocol buffers for internal StorageAPI wire encoding
Diffstat (limited to 'storageapi')
-rw-r--r--storageapi/src/tests/CMakeLists.txt3
-rw-r--r--storageapi/src/tests/mbusprot/CMakeLists.txt2
-rw-r--r--storageapi/src/tests/mbusprot/storageprotocoltest.cpp1093
-rw-r--r--storageapi/src/vespa/storageapi/CMakeLists.txt4
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/.gitignore3
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt17
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h31
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto68
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto91
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto160
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto66
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h13
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp5
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h34
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp2
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h6
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h3
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp1268
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h146
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp12
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h2
21 files changed, 2304 insertions, 725 deletions
diff --git a/storageapi/src/tests/CMakeLists.txt b/storageapi/src/tests/CMakeLists.txt
index ebbf3b8357a..ddc43c70004 100644
--- a/storageapi/src/tests/CMakeLists.txt
+++ b/storageapi/src/tests/CMakeLists.txt
@@ -7,6 +7,7 @@ vespa_add_executable(storageapi_gtest_runner_app TEST
gtest_runner.cpp
DEPENDS
storageapi_testbuckets
+ storageapi_testmbusprot
storageapi
gtest
)
@@ -22,8 +23,8 @@ vespa_add_executable(storageapi_testrunner_app TEST
testrunner.cpp
DEPENDS
storageapi_testmessageapi
- storageapi_testmbusprot
storageapi
+ vdstestlib
)
vespa_add_test(
diff --git a/storageapi/src/tests/mbusprot/CMakeLists.txt b/storageapi/src/tests/mbusprot/CMakeLists.txt
index 16ced76155c..2801c9a91dd 100644
--- a/storageapi/src/tests/mbusprot/CMakeLists.txt
+++ b/storageapi/src/tests/mbusprot/CMakeLists.txt
@@ -4,5 +4,5 @@ vespa_add_library(storageapi_testmbusprot
storageprotocoltest.cpp
DEPENDS
storageapi
- vdstestlib
+ gtest
)
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index f634667afd5..8690d89e12b 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -14,12 +14,16 @@
#include <vespa/document/update/fieldpathupdates.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/test/make_bucket_space.h>
-#include <vespa/vdstestlib/cppunit/macros.h>
#include <vespa/vespalib/util/growablebytebuffer.h>
#include <vespa/vespalib/objects/nbostream.h>
+
#include <iomanip>
#include <sstream>
+#include <gtest/gtest.h>
+
+using namespace ::testing;
+
using std::shared_ptr;
using document::BucketSpace;
using document::ByteBuffer;
@@ -32,183 +36,103 @@ using document::test::makeBucketSpace;
using storage::lib::ClusterState;
using vespalib::string;
-namespace storage {
-namespace api {
+namespace vespalib {
+
+// Needed for GTest to properly understand how to print Version values.
+// If not present, it will print the byte values of the presumed memory area
+// (which will be overrun for some reason, causing Valgrind to scream at us).
+void PrintTo(const vespalib::Version& v, std::ostream* os) {
+ *os << v.toString();
+}
+
+}
+
+namespace storage::api {
-struct StorageProtocolTest : public CppUnit::TestFixture {
+struct StorageProtocolTest : TestWithParam<vespalib::Version> {
document::TestDocMan _docMan;
document::Document::SP _testDoc;
document::DocumentId _testDocId;
+ document::BucketId _bucket_id;
document::Bucket _bucket;
- vespalib::Version _version5_0{5, 0, 12};
- vespalib::Version _version5_1{5, 1, 0};
- vespalib::Version _version5_2{5, 93, 30};
- vespalib::Version _version6_0{6, 240, 0};
+ document::BucketId _dummy_remap_bucket{17, 12345};
+ BucketInfo _dummy_bucket_info{1,2,3,4,5, true, false, 48};
documentapi::LoadTypeSet _loadTypes;
mbusprot::StorageProtocol _protocol;
- static std::vector<std::string> _nonVerboseMessageStrings;
- static std::vector<std::string> _verboseMessageStrings;
- static std::vector<char> _serialization50;
static auto constexpr CONDITION_STRING = "There's just one condition";
StorageProtocolTest()
: _docMan(),
_testDoc(_docMan.createDocument()),
_testDocId(_testDoc->getId()),
- _bucket(makeDocumentBucket(document::BucketId(16, 0x51))),
+ _bucket_id(16, 0x51),
+ _bucket(makeDocumentBucket(_bucket_id)),
_protocol(_docMan.getTypeRepoSP(), _loadTypes)
{
_loadTypes.addLoadType(34, "foo", documentapi::Priority::PRI_NORMAL_2);
}
+ ~StorageProtocolTest();
+
+ void set_dummy_bucket_info_reply_fields(BucketInfoReply& reply) {
+ reply.setBucketInfo(_dummy_bucket_info);
+ reply.remapBucketId(_dummy_remap_bucket);
+ }
+
+ void assert_bucket_info_reply_fields_propagated(const BucketInfoReply& reply) {
+ EXPECT_EQ(_dummy_bucket_info, reply.getBucketInfo());
+ EXPECT_TRUE(reply.hasBeenRemapped());
+ EXPECT_EQ(_dummy_remap_bucket, reply.getBucketId());
+ EXPECT_EQ(_bucket_id, reply.getOriginalBucketId());
+ }
template<typename Command>
- std::shared_ptr<Command> copyCommand(const std::shared_ptr<Command>&, vespalib::Version);
+ std::shared_ptr<Command> copyCommand(const std::shared_ptr<Command>&);
template<typename Reply>
std::shared_ptr<Reply> copyReply(const std::shared_ptr<Reply>&);
- void recordOutput(const api::StorageMessage& msg);
-
- void recordSerialization50();
-
- void testWriteSerialization50();
- void testAddress50();
- void testStringOutputs();
-
- void testPut51();
- void testUpdate51();
- void testGet51();
- void testRemove51();
- void testRevert51();
- void testRequestBucketInfo51();
- void testNotifyBucketChange51();
- void testCreateBucket51();
- void testDeleteBucket51();
- void testMergeBucket51();
- void testGetBucketDiff51();
- void testApplyBucketDiff51();
- void testSplitBucket51();
- void testSplitBucketChain51();
- void testJoinBuckets51();
- void testCreateVisitor51();
- void testDestroyVisitor51();
- void testRemoveLocation51();
- void testInternalMessage();
- void testSetBucketState51();
-
- void testPutCommand52();
- void testUpdateCommand52();
- void testRemoveCommand52();
-
- void testPutCommandWithBucketSpace6_0();
- void testCreateVisitorWithBucketSpace6_0();
- void testRequestBucketInfoWithBucketSpace6_0();
-
- void serialized_size_is_used_to_set_approx_size_of_storage_message();
-
- CPPUNIT_TEST_SUITE(StorageProtocolTest);
-
- // Enable to see string outputs of messages
- // CPPUNIT_TEST_DISABLED(testStringOutputs);
-
- // Enable this to write 5.0 serialization to disk
- // CPPUNIT_TEST_DISABLED(testWriteSerialization50);
- // CPPUNIT_TEST_DISABLED(testAddress50);
-
- // 5.1 tests
- CPPUNIT_TEST(testPut51);
- CPPUNIT_TEST(testUpdate51);
- CPPUNIT_TEST(testGet51);
- CPPUNIT_TEST(testRemove51);
- CPPUNIT_TEST(testRevert51);
- CPPUNIT_TEST(testRequestBucketInfo51);
- CPPUNIT_TEST(testNotifyBucketChange51);
- CPPUNIT_TEST(testCreateBucket51);
- CPPUNIT_TEST(testDeleteBucket51);
- CPPUNIT_TEST(testMergeBucket51);
- CPPUNIT_TEST(testGetBucketDiff51);
- CPPUNIT_TEST(testApplyBucketDiff51);
- CPPUNIT_TEST(testSplitBucket51);
- CPPUNIT_TEST(testJoinBuckets51);
- CPPUNIT_TEST(testCreateVisitor51);
- CPPUNIT_TEST(testDestroyVisitor51);
- CPPUNIT_TEST(testRemoveLocation51);
- CPPUNIT_TEST(testInternalMessage);
- CPPUNIT_TEST(testSetBucketState51);
-
- // 5.2 tests
- CPPUNIT_TEST(testPutCommand52);
- CPPUNIT_TEST(testUpdateCommand52);
- CPPUNIT_TEST(testRemoveCommand52);
-
- // 6.0 tests
- CPPUNIT_TEST(testPutCommandWithBucketSpace6_0);
- CPPUNIT_TEST(testCreateVisitorWithBucketSpace6_0);
- CPPUNIT_TEST(testRequestBucketInfoWithBucketSpace6_0);
-
- CPPUNIT_TEST(serialized_size_is_used_to_set_approx_size_of_storage_message);
-
- CPPUNIT_TEST_SUITE_END();
};
-CPPUNIT_TEST_SUITE_REGISTRATION(StorageProtocolTest);
-
-std::vector<std::string> StorageProtocolTest::_nonVerboseMessageStrings;
-std::vector<std::string> StorageProtocolTest::_verboseMessageStrings;
-std::vector<char> StorageProtocolTest::_serialization50;
-
-void
-StorageProtocolTest::recordOutput(const api::StorageMessage& msg)
-{
- std::ostringstream ost;
- ost << " ";
- msg.print(ost, false, " ");
- _nonVerboseMessageStrings.push_back(ost.str());
- ost.str("");
- ost << " ";
- msg.print(ost, true, " ");
- _verboseMessageStrings.push_back(ost.str());
-}
+StorageProtocolTest::~StorageProtocolTest() = default;
namespace {
- bool debug = false;
-
- struct ScopedName {
- std::string _name;
+std::string version_as_gtest_string(TestParamInfo<vespalib::Version> info) {
+ std::ostringstream ss;
+ auto& p = info.param;
+ // Dots are not allowed in test names, so convert to underscores.
+ ss << p.getMajor() << '_' << p.getMinor() << '_' << p.getMicro();
+ return ss.str();
+}
- ScopedName(const std::string& s) : _name(s) {
- if (debug) std::cerr << "Starting test " << _name << "\n";
- }
- ~ScopedName() {
- if (debug) std::cerr << "Finished test " << _name << "\n";
- }
- };
+}
-} // Anonymous namespace
+// TODO replace with INSTANTIATE_TEST_SUITE_P on newer gtest versions
+INSTANTIATE_TEST_CASE_P(MultiVersionTest, StorageProtocolTest,
+ Values(vespalib::Version(6, 240, 0),
+ vespalib::Version(7, 40, 5)),
+ version_as_gtest_string);
namespace {
mbus::Message::UP lastCommand;
mbus::Reply::UP lastReply;
}
-void
-StorageProtocolTest::testAddress50()
-{
+TEST_F(StorageProtocolTest, testAddress50) {
StorageMessageAddress address("foo", lib::NodeType::STORAGE, 3);
- CPPUNIT_ASSERT_EQUAL(vespalib::string("storage/cluster.foo/storage/3/default"),
+ EXPECT_EQ(vespalib::string("storage/cluster.foo/storage/3/default"),
address.getRoute().toString());
}
template<typename Command> std::shared_ptr<Command>
-StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m, vespalib::Version version)
+StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m)
{
- mbus::Message::UP mbusMessage(new mbusprot::StorageCommand(m));
+ auto mbusMessage = std::make_unique<mbusprot::StorageCommand>(m);
+ auto version = GetParam();
mbus::Blob blob = _protocol.encode(version, *mbusMessage);
mbus::Routable::UP copy(_protocol.decode(version, blob));
+ assert(copy.get());
- CPPUNIT_ASSERT(copy.get());
-
- mbusprot::StorageCommand* copy2(dynamic_cast<mbusprot::StorageCommand*>(copy.get()));
- CPPUNIT_ASSERT(copy2 != 0);
+ auto* copy2 = dynamic_cast<mbusprot::StorageCommand*>(copy.get());
+ assert(copy2 != nullptr);
StorageCommand::SP internalMessage(copy2->getCommand());
lastCommand = std::move(mbusMessage);
@@ -219,80 +143,61 @@ StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m, vespalib::Ve
template<typename Reply> std::shared_ptr<Reply>
StorageProtocolTest::copyReply(const std::shared_ptr<Reply>& m)
{
- mbus::Reply::UP mbusMessage(new mbusprot::StorageReply(m));
- mbus::Blob blob = _protocol.encode(_version5_1, *mbusMessage);
- mbus::Routable::UP copy(_protocol.decode(_version5_1, blob));
- CPPUNIT_ASSERT(copy.get());
- mbusprot::StorageReply* copy2(
- dynamic_cast<mbusprot::StorageReply*>(copy.get()));
- CPPUNIT_ASSERT(copy2 != 0);
+ auto mbusMessage = std::make_unique<mbusprot::StorageReply>(m);
+ auto version = GetParam();
+ mbus::Blob blob = _protocol.encode(version, *mbusMessage);
+ mbus::Routable::UP copy(_protocol.decode(version, blob));
+ assert(copy.get());
+
+ auto* copy2 = dynamic_cast<mbusprot::StorageReply*>(copy.get());
+ assert(copy2 != nullptr);
+
copy2->setMessage(std::move(lastCommand));
- StorageReply::SP internalMessage(copy2->getReply());
+ auto internalMessage = copy2->getReply();
lastReply = std::move(mbusMessage);
lastCommand = copy2->getMessage();
return std::dynamic_pointer_cast<Reply>(internalMessage);
}
-void
-StorageProtocolTest::recordSerialization50()
-{
- assert(lastCommand.get());
- assert(lastReply.get());
- for (uint32_t j=0; j<2; ++j) {
- mbusprot::StorageMessage& msg(j == 0
- ? dynamic_cast<mbusprot::StorageMessage&>(*lastCommand)
- : dynamic_cast<mbusprot::StorageMessage&>(*lastReply));
- msg.getInternalMessage()->forceMsgId(0);
- mbus::Routable& routable(j == 0
- ? dynamic_cast<mbus::Routable&>(*lastCommand)
- : dynamic_cast<mbus::Routable&>(*lastReply));
- mbus::Blob blob = _protocol.encode(_version5_0, routable);
- _serialization50.push_back('\n');
- std::string type(msg.getInternalMessage()->getType().toString());
- for (uint32_t i=0, n=type.size(); i<n; ++i) {
- _serialization50.push_back(type[i]);
- }
- _serialization50.push_back('\n');
-
- for (uint32_t i=0, n=blob.size(); i<n; ++i) {
- _serialization50.push_back(blob.data()[i]);
- }
- }
-}
-
-void
-StorageProtocolTest::testPut51()
-{
- ScopedName test("testPut51");
- PutCommand::SP cmd(new PutCommand(_bucket, _testDoc, 14));
+TEST_P(StorageProtocolTest, put) {
+ auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14);
cmd->setUpdateTimestamp(Timestamp(13));
cmd->setLoadType(_loadTypes["foo"]);
- PutCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(*_testDoc, *cmd2->getDocument());
- CPPUNIT_ASSERT_EQUAL(vespalib::string("foo"), cmd2->getLoadType().getName());
- CPPUNIT_ASSERT_EQUAL(Timestamp(14), cmd2->getTimestamp());
- CPPUNIT_ASSERT_EQUAL(Timestamp(13), cmd2->getUpdateTimestamp());
-
- PutReply::SP reply(new PutReply(*cmd2));
- CPPUNIT_ASSERT(reply->hasDocument());
- CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply->getDocument());
- PutReply::SP reply2(copyReply(reply));
- CPPUNIT_ASSERT(reply2->hasDocument());
- CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply->getDocument());
- CPPUNIT_ASSERT_EQUAL(_testDoc->getId(), reply2->getDocumentId());
- CPPUNIT_ASSERT_EQUAL(Timestamp(14), reply2->getTimestamp());
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
-}
-
-void
-StorageProtocolTest::testUpdate51()
-{
- ScopedName test("testUpdate51");
- document::DocumentUpdate::SP update(new document::DocumentUpdate(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()));
- std::shared_ptr<document::AssignValueUpdate> assignUpdate(new document::AssignValueUpdate(document::IntFieldValue(17)));
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_EQ(*_testDoc, *cmd2->getDocument());
+ EXPECT_EQ(vespalib::string("foo"), cmd2->getLoadType().getName());
+ EXPECT_EQ(Timestamp(14), cmd2->getTimestamp());
+ EXPECT_EQ(Timestamp(13), cmd2->getUpdateTimestamp());
+
+ auto reply = std::make_shared<PutReply>(*cmd2);
+ ASSERT_TRUE(reply->hasDocument());
+ EXPECT_EQ(*_testDoc, *reply->getDocument());
+ set_dummy_bucket_info_reply_fields(*reply);
+ auto reply2 = copyReply(reply);
+ ASSERT_TRUE(reply2->hasDocument());
+ EXPECT_EQ(*_testDoc, *reply->getDocument());
+ EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId());
+ EXPECT_EQ(Timestamp(14), reply2->getTimestamp());
+ EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
+}
+
+TEST_P(StorageProtocolTest, response_without_remapped_bucket_preserves_original_bucket) {
+ auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14);
+ auto cmd2 = copyCommand(cmd);
+ auto reply = std::make_shared<PutReply>(*cmd2);
+ auto reply2 = copyReply(reply);
+
+ EXPECT_FALSE(reply2->hasBeenRemapped());
+ EXPECT_EQ(_bucket_id, reply2->getBucketId());
+ EXPECT_EQ(document::BucketId(), reply2->getOriginalBucketId());
+
+}
+
+TEST_P(StorageProtocolTest, update) {
+ auto update = std::make_shared<document::DocumentUpdate>(
+ _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId());
+ auto assignUpdate = std::make_shared<document::AssignValueUpdate>(document::IntFieldValue(17));
document::FieldUpdate fieldUpdate(_testDoc->getField("headerval"));
fieldUpdate.addUpdate(*assignUpdate);
update->addUpdate(fieldUpdate);
@@ -300,217 +205,157 @@ StorageProtocolTest::testUpdate51()
update->addFieldPathUpdate(document::FieldPathUpdate::CP(
new document::RemoveFieldPathUpdate("headerval", "testdoctype1.headerval > 0")));
- UpdateCommand::SP cmd(new UpdateCommand(_bucket, update, 14));
- CPPUNIT_ASSERT_EQUAL(Timestamp(0), cmd->getOldTimestamp());
+ auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14);
+ EXPECT_EQ(Timestamp(0), cmd->getOldTimestamp());
cmd->setOldTimestamp(10);
- UpdateCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId());
- CPPUNIT_ASSERT_EQUAL(Timestamp(14), cmd2->getTimestamp());
- CPPUNIT_ASSERT_EQUAL(Timestamp(10), cmd2->getOldTimestamp());
- CPPUNIT_ASSERT_EQUAL(*update, *cmd2->getUpdate());
-
- UpdateReply::SP reply(new UpdateReply(*cmd2, 8));
- UpdateReply::SP reply2(copyReply(reply));
- CPPUNIT_ASSERT_EQUAL(_testDocId, reply2->getDocumentId());
- CPPUNIT_ASSERT_EQUAL(Timestamp(14), reply2->getTimestamp());
- CPPUNIT_ASSERT_EQUAL(Timestamp(8), reply->getOldTimestamp());
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
-}
-
-void
-StorageProtocolTest::testGet51()
-{
- ScopedName test("testGet51");
- GetCommand::SP cmd(new GetCommand(_bucket, _testDocId, "foo,bar,vekterli", 123));
- GetCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId());
- CPPUNIT_ASSERT_EQUAL(Timestamp(123), cmd2->getBeforeTimestamp());
- CPPUNIT_ASSERT_EQUAL(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet());
-
- GetReply::SP reply(new GetReply(*cmd2, _testDoc, 100));
- GetReply::SP reply2(copyReply(reply));
- CPPUNIT_ASSERT(reply2.get());
- CPPUNIT_ASSERT(reply2->getDocument().get());
- CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply2->getDocument());
- CPPUNIT_ASSERT_EQUAL(_testDoc->getId(), reply2->getDocumentId());
- CPPUNIT_ASSERT_EQUAL(Timestamp(123), reply2->getBeforeTimestamp());
- CPPUNIT_ASSERT_EQUAL(Timestamp(100), reply2->getLastModifiedTimestamp());
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
-}
-
-void
-StorageProtocolTest::testRemove51()
-{
- ScopedName test("testRemove51");
- RemoveCommand::SP cmd(new RemoveCommand(_bucket, _testDocId, 159));
- RemoveCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId());
- CPPUNIT_ASSERT_EQUAL(Timestamp(159), cmd2->getTimestamp());
-
- RemoveReply::SP reply(new RemoveReply(*cmd2, 48));
- reply->setBucketInfo(BucketInfo(1,2,3,4,5, true, false, 48));
-
- RemoveReply::SP reply2(copyReply(reply));
- CPPUNIT_ASSERT_EQUAL(_testDocId, reply2->getDocumentId());
- CPPUNIT_ASSERT_EQUAL(Timestamp(159), reply2->getTimestamp());
- CPPUNIT_ASSERT_EQUAL(Timestamp(48), reply2->getOldTimestamp());
- CPPUNIT_ASSERT_EQUAL(BucketInfo(1,2,3,4,5, true, false, 48),
- reply2->getBucketInfo());
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
-}
-
-void
-StorageProtocolTest::testRevert51()
-{
- ScopedName test("testRevertCommand51");
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_EQ(_testDocId, cmd2->getDocumentId());
+ EXPECT_EQ(Timestamp(14), cmd2->getTimestamp());
+ EXPECT_EQ(Timestamp(10), cmd2->getOldTimestamp());
+ EXPECT_EQ(*update, *cmd2->getUpdate());
+
+ auto reply = std::make_shared<UpdateReply>(*cmd2, 8);
+ set_dummy_bucket_info_reply_fields(*reply);
+ auto reply2 = copyReply(reply);
+ EXPECT_EQ(_testDocId, reply2->getDocumentId());
+ EXPECT_EQ(Timestamp(14), reply2->getTimestamp());
+ EXPECT_EQ(Timestamp(8), reply->getOldTimestamp());
+ EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
+}
+
+TEST_P(StorageProtocolTest, get) {
+ auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_EQ(_testDocId, cmd2->getDocumentId());
+ EXPECT_EQ(Timestamp(123), cmd2->getBeforeTimestamp());
+ EXPECT_EQ(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet());
+
+ auto reply = std::make_shared<GetReply>(*cmd2, _testDoc, 100);
+ set_dummy_bucket_info_reply_fields(*reply);
+ auto reply2 = copyReply(reply);
+ ASSERT_TRUE(reply2.get() != nullptr);
+ ASSERT_TRUE(reply2->getDocument().get() != nullptr);
+ EXPECT_EQ(*_testDoc, *reply2->getDocument());
+ EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId());
+ EXPECT_EQ(Timestamp(123), reply2->getBeforeTimestamp());
+ EXPECT_EQ(Timestamp(100), reply2->getLastModifiedTimestamp());
+ EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
+}
+
+TEST_P(StorageProtocolTest, remove) {
+ auto cmd = std::make_shared<RemoveCommand>(_bucket, _testDocId, 159);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_EQ(_testDocId, cmd2->getDocumentId());
+ EXPECT_EQ(Timestamp(159), cmd2->getTimestamp());
+
+ auto reply = std::make_shared<RemoveReply>(*cmd2, 48);
+ set_dummy_bucket_info_reply_fields(*reply);
+
+ auto reply2 = copyReply(reply);
+ EXPECT_EQ(_testDocId, reply2->getDocumentId());
+ EXPECT_EQ(Timestamp(159), reply2->getTimestamp());
+ EXPECT_EQ(Timestamp(48), reply2->getOldTimestamp());
+ EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
+}
+
+TEST_P(StorageProtocolTest, revert) {
std::vector<Timestamp> tokens;
tokens.push_back(59);
- RevertCommand::SP cmd(new RevertCommand(_bucket, tokens));
- RevertCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(tokens, cmd2->getRevertTokens());
-
- RevertReply::SP reply(new RevertReply(*cmd2));
- BucketInfo info(0x12345432, 101, 520);
- reply->setBucketInfo(info);
- RevertReply::SP reply2(copyReply(reply));
+ auto cmd = std::make_shared<RevertCommand>(_bucket, tokens);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_EQ(tokens, cmd2->getRevertTokens());
- CPPUNIT_ASSERT_EQUAL(info, reply2->getBucketInfo());
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
+ auto reply = std::make_shared<RevertReply>(*cmd2);
+ set_dummy_bucket_info_reply_fields(*reply);
+ auto reply2 = copyReply(reply);
+ EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
}
-void
-StorageProtocolTest::testRequestBucketInfo51()
-{
- ScopedName test("testRequestBucketInfo51");
+TEST_P(StorageProtocolTest, request_bucket_info) {
{
std::vector<document::BucketId> ids;
ids.push_back(document::BucketId(3));
ids.push_back(document::BucketId(7));
- RequestBucketInfoCommand::SP cmd(new RequestBucketInfoCommand(makeBucketSpace(), ids));
- RequestBucketInfoCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(ids, cmd2->getBuckets());
- CPPUNIT_ASSERT(!cmd2->hasSystemState());
-
- recordOutput(*cmd2);
+ auto cmd = std::make_shared<RequestBucketInfoCommand>(makeBucketSpace(), ids);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(ids, cmd2->getBuckets());
+ EXPECT_FALSE(cmd2->hasSystemState());
}
{
ClusterState state("distributor:3 .1.s:d");
- RequestBucketInfoCommand::SP cmd(new RequestBucketInfoCommand(
- makeBucketSpace(),
- 3, state, "14"));
- RequestBucketInfoCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT(cmd2->hasSystemState());
- CPPUNIT_ASSERT_EQUAL(uint16_t(3), cmd2->getDistributor());
- CPPUNIT_ASSERT_EQUAL(state, cmd2->getSystemState());
- CPPUNIT_ASSERT_EQUAL(size_t(0), cmd2->getBuckets().size());
-
- RequestBucketInfoReply::SP reply(new RequestBucketInfoReply(*cmd));
+ auto cmd = std::make_shared<RequestBucketInfoCommand>(makeBucketSpace(), 3, state, "14");
+ auto cmd2 = copyCommand(cmd);
+ ASSERT_TRUE(cmd2->hasSystemState());
+ EXPECT_EQ(uint16_t(3), cmd2->getDistributor());
+ EXPECT_EQ(state, cmd2->getSystemState());
+ EXPECT_EQ(size_t(0), cmd2->getBuckets().size());
+
+ auto reply = std::make_shared<RequestBucketInfoReply>(*cmd);
RequestBucketInfoReply::Entry e;
e._bucketId = document::BucketId(4);
const uint64_t lastMod = 0x1337cafe98765432ULL;
e._info = BucketInfo(43, 24, 123, 44, 124, false, true, lastMod);
reply->getBucketInfo().push_back(e);
- RequestBucketInfoReply::SP reply2(copyReply(reply));
- CPPUNIT_ASSERT_EQUAL(size_t(1), reply2->getBucketInfo().size());
+ auto reply2 = copyReply(reply);
+ EXPECT_EQ(size_t(1), reply2->getBucketInfo().size());
auto& entries(reply2->getBucketInfo());
- CPPUNIT_ASSERT_EQUAL(e, entries[0]);
+ EXPECT_EQ(e, entries[0]);
// "Last modified" not counted by operator== for some reason. Testing
// separately until we can figure out if this is by design or not.
- CPPUNIT_ASSERT_EQUAL(lastMod, entries[0]._info.getLastModified());
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
+ EXPECT_EQ(lastMod, entries[0]._info.getLastModified());
}
}
-void
-StorageProtocolTest::testNotifyBucketChange51()
-{
- ScopedName test("testNotifyBucketChange51");
- BucketInfo info(2, 3, 4);
- document::BucketId modifiedBucketId(20, 1000);
- document::Bucket modifiedBucket(makeDocumentBucket(modifiedBucketId));
- NotifyBucketChangeCommand::SP cmd(new NotifyBucketChangeCommand(
- modifiedBucket, info));
- NotifyBucketChangeCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(document::BucketId(20, 1000),
- cmd2->getBucketId());
- CPPUNIT_ASSERT_EQUAL(info, cmd2->getBucketInfo());
-
- NotifyBucketChangeReply::SP reply(new NotifyBucketChangeReply(*cmd));
- NotifyBucketChangeReply::SP reply2(copyReply(reply));
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
-}
-
-void
-StorageProtocolTest::testCreateBucket51()
-{
- ScopedName test("testCreateBucket51");
- document::BucketId bucketId(623);
- document::Bucket bucket(makeDocumentBucket(bucketId));
+TEST_P(StorageProtocolTest, notify_bucket_change) {
+ auto cmd = std::make_shared<NotifyBucketChangeCommand>(_bucket, _dummy_bucket_info);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_EQ(_dummy_bucket_info, cmd2->getBucketInfo());
- CreateBucketCommand::SP cmd(new CreateBucketCommand(bucket));
- CreateBucketCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId());
+ auto reply = std::make_shared<NotifyBucketChangeReply>(*cmd);
+ auto reply2 = copyReply(reply);
+}
- CreateBucketReply::SP reply(new CreateBucketReply(*cmd));
- CreateBucketReply::SP reply2(copyReply(reply));
- CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
+TEST_P(StorageProtocolTest, create_bucket_without_activation) {
+ auto cmd = std::make_shared<CreateBucketCommand>(_bucket);
+ EXPECT_FALSE(cmd->getActive());
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_FALSE(cmd2->getActive());
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
+ auto reply = std::make_shared<CreateBucketReply>(*cmd);
+ set_dummy_bucket_info_reply_fields(*reply);
+ auto reply2 = copyReply(reply);
+ EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
}
-void
-StorageProtocolTest::testDeleteBucket51()
-{
- ScopedName test("testDeleteBucket51");
- document::BucketId bucketId(623);
- document::Bucket bucket(makeDocumentBucket(bucketId));
-
- DeleteBucketCommand::SP cmd(new DeleteBucketCommand(bucket));
- BucketInfo info(0x100, 200, 300);
- cmd->setBucketInfo(info);
- DeleteBucketCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId());
- CPPUNIT_ASSERT_EQUAL(info, cmd2->getBucketInfo());
-
- DeleteBucketReply::SP reply(new DeleteBucketReply(*cmd));
+TEST_P(StorageProtocolTest, create_bucket_propagates_activation_flag) {
+ auto cmd = std::make_shared<CreateBucketCommand>(_bucket);
+ cmd->setActive(true);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_TRUE(cmd2->getActive());
+}
+
+TEST_P(StorageProtocolTest, delete_bucket) {
+ auto cmd = std::make_shared<DeleteBucketCommand>(_bucket);
+ cmd->setBucketInfo(_dummy_bucket_info);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_EQ(_dummy_bucket_info, cmd2->getBucketInfo());
+
+ auto reply = std::make_shared<DeleteBucketReply>(*cmd);
// Not set automatically by constructor
reply->setBucketInfo(cmd2->getBucketInfo());
- DeleteBucketReply::SP reply2(copyReply(reply));
- CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
- CPPUNIT_ASSERT_EQUAL(info, reply2->getBucketInfo());
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
+ auto reply2 = copyReply(reply);
+ EXPECT_EQ(_bucket_id, reply2->getBucketId());
+ EXPECT_EQ(_dummy_bucket_info, reply2->getBucketInfo());
}
-void
-StorageProtocolTest::testMergeBucket51()
-{
- ScopedName test("testMergeBucket51");
- document::BucketId bucketId(623);
- document::Bucket bucket(makeDocumentBucket(bucketId));
-
+TEST_P(StorageProtocolTest, merge_bucket) {
typedef api::MergeBucketCommand::Node Node;
std::vector<Node> nodes;
nodes.push_back(Node(4, false));
@@ -522,152 +367,98 @@ StorageProtocolTest::testMergeBucket51()
chain.push_back(7);
chain.push_back(14);
- MergeBucketCommand::SP cmd(
- new MergeBucketCommand(bucket, nodes, Timestamp(1234), 567, chain));
- MergeBucketCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId());
- CPPUNIT_ASSERT_EQUAL(nodes, cmd2->getNodes());
- CPPUNIT_ASSERT_EQUAL(Timestamp(1234), cmd2->getMaxTimestamp());
- CPPUNIT_ASSERT_EQUAL(uint32_t(567), cmd2->getClusterStateVersion());
- CPPUNIT_ASSERT_EQUAL(chain, cmd2->getChain());
-
- MergeBucketReply::SP reply(new MergeBucketReply(*cmd));
- MergeBucketReply::SP reply2(copyReply(reply));
- CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
- CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes());
- CPPUNIT_ASSERT_EQUAL(Timestamp(1234), reply2->getMaxTimestamp());
- CPPUNIT_ASSERT_EQUAL(uint32_t(567), reply2->getClusterStateVersion());
- CPPUNIT_ASSERT_EQUAL(chain, reply2->getChain());
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
-}
-
-void
-StorageProtocolTest::testSplitBucket51()
-{
- ScopedName test("testSplitBucket51");
-
- document::BucketId bucketId(16, 0);
- document::Bucket bucket(makeDocumentBucket(bucketId));
- SplitBucketCommand::SP cmd(new SplitBucketCommand(bucket));
- CPPUNIT_ASSERT_EQUAL(0u, (uint32_t) cmd->getMinSplitBits());
- CPPUNIT_ASSERT_EQUAL(58u, (uint32_t) cmd->getMaxSplitBits());
- CPPUNIT_ASSERT_EQUAL(std::numeric_limits<uint32_t>().max(),
- cmd->getMinByteSize());
- CPPUNIT_ASSERT_EQUAL(std::numeric_limits<uint32_t>().max(),
- cmd->getMinDocCount());
+ auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_EQ(nodes, cmd2->getNodes());
+ EXPECT_EQ(Timestamp(1234), cmd2->getMaxTimestamp());
+ EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion());
+ EXPECT_EQ(chain, cmd2->getChain());
+
+ auto reply = std::make_shared<MergeBucketReply>(*cmd);
+ auto reply2 = copyReply(reply);
+ EXPECT_EQ(_bucket_id, reply2->getBucketId());
+ EXPECT_EQ(nodes, reply2->getNodes());
+ EXPECT_EQ(Timestamp(1234), reply2->getMaxTimestamp());
+ EXPECT_EQ(uint32_t(567), reply2->getClusterStateVersion());
+ EXPECT_EQ(chain, reply2->getChain());
+}
+
+TEST_P(StorageProtocolTest, split_bucket) {
+ auto cmd = std::make_shared<SplitBucketCommand>(_bucket);
+ EXPECT_EQ(0u, cmd->getMinSplitBits());
+ EXPECT_EQ(58u, cmd->getMaxSplitBits());
+ EXPECT_EQ(std::numeric_limits<uint32_t>().max(), cmd->getMinByteSize());
+ EXPECT_EQ(std::numeric_limits<uint32_t>().max(), cmd->getMinDocCount());
cmd->setMinByteSize(1000);
cmd->setMinDocCount(5);
cmd->setMaxSplitBits(40);
cmd->setMinSplitBits(20);
- SplitBucketCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(20u, (uint32_t) cmd2->getMinSplitBits());
- CPPUNIT_ASSERT_EQUAL(40u, (uint32_t) cmd2->getMaxSplitBits());
- CPPUNIT_ASSERT_EQUAL(1000u, cmd2->getMinByteSize());
- CPPUNIT_ASSERT_EQUAL(5u, cmd2->getMinDocCount());
-
- SplitBucketReply::SP reply(new SplitBucketReply(*cmd2));
- reply->getSplitInfo().push_back(SplitBucketReply::Entry(
- document::BucketId(17, 0), BucketInfo(100, 1000, 10000, true, true)));
- reply->getSplitInfo().push_back(SplitBucketReply::Entry(
- document::BucketId(17, 1), BucketInfo(101, 1001, 10001, true, true)));
- SplitBucketReply::SP reply2(copyReply(reply));
-
- CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
- CPPUNIT_ASSERT_EQUAL(size_t(2), reply2->getSplitInfo().size());
- CPPUNIT_ASSERT_EQUAL(document::BucketId(17, 0),
- reply2->getSplitInfo()[0].first);
- CPPUNIT_ASSERT_EQUAL(document::BucketId(17, 1),
- reply2->getSplitInfo()[1].first);
- CPPUNIT_ASSERT_EQUAL(BucketInfo(100, 1000, 10000, true, true),
- reply2->getSplitInfo()[0].second);
- CPPUNIT_ASSERT_EQUAL(BucketInfo(101, 1001, 10001, true, true),
- reply2->getSplitInfo()[1].second);
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
-}
-
-void
-StorageProtocolTest::testJoinBuckets51()
-{
- ScopedName test("testJoinBuckets51");
- document::BucketId bucketId(16, 0);
- document::Bucket bucket(makeDocumentBucket(bucketId));
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+ EXPECT_EQ(20u, cmd2->getMinSplitBits());
+ EXPECT_EQ(40u, cmd2->getMaxSplitBits());
+ EXPECT_EQ(1000u, cmd2->getMinByteSize());
+ EXPECT_EQ(5u, cmd2->getMinDocCount());
+
+ auto reply = std::make_shared<SplitBucketReply>(*cmd2);
+ reply->getSplitInfo().emplace_back(document::BucketId(17, 0), BucketInfo(100, 1000, 10000, true, true));
+ reply->getSplitInfo().emplace_back(document::BucketId(17, 1), BucketInfo(101, 1001, 10001, true, true));
+ auto reply2 = copyReply(reply);
+
+ EXPECT_EQ(_bucket, reply2->getBucket());
+ EXPECT_EQ(size_t(2), reply2->getSplitInfo().size());
+ EXPECT_EQ(document::BucketId(17, 0), reply2->getSplitInfo()[0].first);
+ EXPECT_EQ(document::BucketId(17, 1), reply2->getSplitInfo()[1].first);
+ EXPECT_EQ(BucketInfo(100, 1000, 10000, true, true), reply2->getSplitInfo()[0].second);
+ EXPECT_EQ(BucketInfo(101, 1001, 10001, true, true), reply2->getSplitInfo()[1].second);
+}
+
+TEST_P(StorageProtocolTest, join_buckets) {
std::vector<document::BucketId> sources;
sources.push_back(document::BucketId(17, 0));
sources.push_back(document::BucketId(17, 1));
- JoinBucketsCommand::SP cmd(new JoinBucketsCommand(bucket));
+ auto cmd = std::make_shared<JoinBucketsCommand>(_bucket);
cmd->getSourceBuckets() = sources;
cmd->setMinJoinBits(3);
- JoinBucketsCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
- JoinBucketsReply::SP reply(new JoinBucketsReply(*cmd2));
+ auto reply = std::make_shared<JoinBucketsReply>(*cmd2);
reply->setBucketInfo(BucketInfo(3,4,5));
- JoinBucketsReply::SP reply2(copyReply(reply));
+ auto reply2 = copyReply(reply);
- CPPUNIT_ASSERT_EQUAL(sources, reply2->getSourceBuckets());
- CPPUNIT_ASSERT_EQUAL(3, (int)cmd2->getMinJoinBits());
- CPPUNIT_ASSERT_EQUAL(BucketInfo(3,4,5), reply2->getBucketInfo());
- CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
+ EXPECT_EQ(sources, reply2->getSourceBuckets());
+ EXPECT_EQ(3, cmd2->getMinJoinBits());
+ EXPECT_EQ(BucketInfo(3,4,5), reply2->getBucketInfo());
+ EXPECT_EQ(_bucket, reply2->getBucket());
}
-void
-StorageProtocolTest::testDestroyVisitor51()
-{
- ScopedName test("testDestroyVisitor51");
+TEST_P(StorageProtocolTest, destroy_visitor) {
+ auto cmd = std::make_shared<DestroyVisitorCommand>("instance");
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ("instance", cmd2->getInstanceId());
- DestroyVisitorCommand::SP cmd(
- new DestroyVisitorCommand("instance"));
- DestroyVisitorCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(string("instance"), cmd2->getInstanceId());
-
- DestroyVisitorReply::SP reply(new DestroyVisitorReply(*cmd2));
- DestroyVisitorReply::SP reply2(copyReply(reply));
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
+ auto reply = std::make_shared<DestroyVisitorReply>(*cmd2);
+ auto reply2 = copyReply(reply);
}
-void
-StorageProtocolTest::testRemoveLocation51()
-{
- ScopedName test("testRemoveLocation51");
- document::BucketId bucketId(16, 1234);
- document::Bucket bucket(makeDocumentBucket(bucketId));
-
- RemoveLocationCommand::SP cmd(
- new RemoveLocationCommand("id.group == \"mygroup\"", bucket));
- RemoveLocationCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(vespalib::string("id.group == \"mygroup\""), cmd2->getDocumentSelection());
- CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId());
-
- RemoveLocationReply::SP reply(new RemoveLocationReply(*cmd2));
- RemoveLocationReply::SP reply2(copyReply(reply));
+TEST_P(StorageProtocolTest, remove_location) {
+ auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection());
+ EXPECT_EQ(_bucket, cmd2->getBucket());
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
+ auto reply = std::make_shared<RemoveLocationReply>(*cmd2);
+ auto reply2 = copyReply(reply);
}
-void
-StorageProtocolTest::testCreateVisitor51()
-{
- ScopedName test("testCreateVisitor51");
-
+TEST_P(StorageProtocolTest, create_visitor) {
std::vector<document::BucketId> buckets;
buckets.push_back(document::BucketId(16, 1));
buckets.push_back(document::BucketId(16, 2));
- CreateVisitorCommand::SP cmd(
- new CreateVisitorCommand(makeBucketSpace(), "library", "id", "doc selection"));
+ auto cmd = std::make_shared<CreateVisitorCommand>(makeBucketSpace(), "library", "id", "doc selection");
cmd->setControlDestination("controldest");
cmd->setDataDestination("datadest");
cmd->setVisitorCmdId(1);
@@ -681,40 +472,26 @@ StorageProtocolTest::testCreateVisitor51()
cmd->setFieldSet("foo,bar,vekterli");
cmd->setVisitInconsistentBuckets();
cmd->setQueueTimeout(100);
- cmd->setVisitorOrdering(document::OrderingSpecification::DESCENDING);
cmd->setPriority(149);
- CreateVisitorCommand::SP cmd2(copyCommand(cmd, _version5_1));
- CPPUNIT_ASSERT_EQUAL(string("library"), cmd2->getLibraryName());
- CPPUNIT_ASSERT_EQUAL(string("id"), cmd2->getInstanceId());
- CPPUNIT_ASSERT_EQUAL(string("doc selection"),
- cmd2->getDocumentSelection());
- CPPUNIT_ASSERT_EQUAL(string("controldest"),
- cmd2->getControlDestination());
- CPPUNIT_ASSERT_EQUAL(string("datadest"), cmd2->getDataDestination());
- CPPUNIT_ASSERT_EQUAL(api::Timestamp(123), cmd2->getFromTime());
- CPPUNIT_ASSERT_EQUAL(api::Timestamp(456), cmd2->getToTime());
- CPPUNIT_ASSERT_EQUAL(2u, cmd2->getMaximumPendingReplyCount());
- CPPUNIT_ASSERT_EQUAL(buckets, cmd2->getBuckets());
- CPPUNIT_ASSERT_EQUAL(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet());
- CPPUNIT_ASSERT(cmd2->visitInconsistentBuckets());
- CPPUNIT_ASSERT_EQUAL(document::OrderingSpecification::DESCENDING, cmd2->getVisitorOrdering());
- CPPUNIT_ASSERT_EQUAL(149, (int)cmd2->getPriority());
-
- CreateVisitorReply::SP reply(new CreateVisitorReply(*cmd2));
- CreateVisitorReply::SP reply2(copyReply(reply));
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
-}
-
-void
-StorageProtocolTest::testGetBucketDiff51()
-{
- ScopedName test("testGetBucketDiff51");
- document::BucketId bucketId(623);
- document::Bucket bucket(makeDocumentBucket(bucketId));
-
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ("library", cmd2->getLibraryName());
+ EXPECT_EQ("id", cmd2->getInstanceId());
+ EXPECT_EQ("doc selection", cmd2->getDocumentSelection());
+ EXPECT_EQ("controldest", cmd2->getControlDestination());
+ EXPECT_EQ("datadest", cmd2->getDataDestination());
+ EXPECT_EQ(api::Timestamp(123), cmd2->getFromTime());
+ EXPECT_EQ(api::Timestamp(456), cmd2->getToTime());
+ EXPECT_EQ(2u, cmd2->getMaximumPendingReplyCount());
+ EXPECT_EQ(buckets, cmd2->getBuckets());
+ EXPECT_EQ("foo,bar,vekterli", cmd2->getFieldSet());
+ EXPECT_TRUE(cmd2->visitInconsistentBuckets());
+ EXPECT_EQ(149, cmd2->getPriority());
+
+ auto reply = std::make_shared<CreateVisitorReply>(*cmd2);
+ auto reply2 = copyReply(reply);
+}
+
+TEST_P(StorageProtocolTest, get_bucket_diff) {
std::vector<api::MergeBucketCommand::Node> nodes;
nodes.push_back(4);
nodes.push_back(13);
@@ -727,56 +504,68 @@ StorageProtocolTest::testGetBucketDiff51()
entries.back()._flags = 1;
entries.back()._hasMask = 3;
- CPPUNIT_ASSERT_EQUAL(std::string(
- "Entry(timestamp: 123456, gid(0x313233343536373839306162), "
- "hasMask: 0x3,\n"
- " header size: 100, body size: 65536, flags 0x1)"),
- entries.back().toString(true));
+ EXPECT_EQ("Entry(timestamp: 123456, gid(0x313233343536373839306162), hasMask: 0x3,\n"
+ " header size: 100, body size: 65536, flags 0x1)",
+ entries.back().toString(true));
- GetBucketDiffCommand::SP cmd(new GetBucketDiffCommand(bucket, nodes, 1056));
+ auto cmd = std::make_shared<GetBucketDiffCommand>(_bucket, nodes, 1056);
cmd->getDiff() = entries;
- GetBucketDiffCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
+
+ auto reply = std::make_shared<GetBucketDiffReply>(*cmd2);
+ EXPECT_EQ(entries, reply->getDiff());
+ auto reply2 = copyReply(reply);
- GetBucketDiffReply::SP reply(new GetBucketDiffReply(*cmd2));
- CPPUNIT_ASSERT_EQUAL(entries, reply->getDiff());
- GetBucketDiffReply::SP reply2(copyReply(reply));
+ EXPECT_EQ(nodes, reply2->getNodes());
+ EXPECT_EQ(entries, reply2->getDiff());
+ EXPECT_EQ(Timestamp(1056), reply2->getMaxTimestamp());
+}
+
+namespace {
+
+ApplyBucketDiffCommand::Entry dummy_apply_entry() {
+ ApplyBucketDiffCommand::Entry e;
+ e._docName = "my cool id";
+ vespalib::string header_data = "fancy header";
+ e._headerBlob.resize(header_data.size());
+ memcpy(&e._headerBlob[0], header_data.data(), header_data.size());
- CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes());
- CPPUNIT_ASSERT_EQUAL(entries, reply2->getDiff());
- CPPUNIT_ASSERT_EQUAL(Timestamp(1056), reply2->getMaxTimestamp());
+ vespalib::string body_data = "fancier body!";
+ e._bodyBlob.resize(body_data.size());
+ memcpy(&e._bodyBlob[0], body_data.data(), body_data.size());
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
+ GetBucketDiffCommand::Entry meta;
+ meta._timestamp = 567890;
+ meta._hasMask = 0x3;
+ meta._flags = 0x1;
+ meta._headerSize = 12345;
+ meta._headerSize = header_data.size();
+ meta._bodySize = body_data.size();
+
+ e._entry = meta;
+ return e;
}
-void
-StorageProtocolTest::testApplyBucketDiff51()
-{
- ScopedName test("testApplyBucketDiff51");
- document::BucketId bucketId(16, 623);
- document::Bucket bucket(makeDocumentBucket(bucketId));
+}
+TEST_P(StorageProtocolTest, apply_bucket_diff) {
std::vector<api::MergeBucketCommand::Node> nodes;
nodes.push_back(4);
nodes.push_back(13);
- std::vector<ApplyBucketDiffCommand::Entry> entries;
- entries.push_back(ApplyBucketDiffCommand::Entry());
+ std::vector<ApplyBucketDiffCommand::Entry> entries = {dummy_apply_entry()};
- ApplyBucketDiffCommand::SP cmd(new ApplyBucketDiffCommand(bucket, nodes, 1234));
+ auto cmd = std::make_shared<ApplyBucketDiffCommand>(_bucket, nodes, 1234);
cmd->getDiff() = entries;
- ApplyBucketDiffCommand::SP cmd2(copyCommand(cmd, _version5_1));
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
- ApplyBucketDiffReply::SP reply(new ApplyBucketDiffReply(*cmd2));
- ApplyBucketDiffReply::SP reply2(copyReply(reply));
+ auto reply = std::make_shared<ApplyBucketDiffReply>(*cmd2);
+ auto reply2 = copyReply(reply);
- CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes());
- CPPUNIT_ASSERT_EQUAL(entries, reply2->getDiff());
- CPPUNIT_ASSERT_EQUAL(1234u, reply2->getMaxBufferSize());
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
- recordSerialization50();
+ EXPECT_EQ(nodes, reply2->getNodes());
+ EXPECT_EQ(entries, reply2->getDiff());
+ EXPECT_EQ(1234u, reply2->getMaxBufferSize());
}
namespace {
@@ -807,161 +596,97 @@ namespace {
};
api::StorageReply::UP MyCommand::makeReply() {
- return api::StorageReply::UP(new MyReply(*this));
+ return std::make_unique<MyReply>(*this);
}
}
-void
-StorageProtocolTest::testInternalMessage()
-{
- ScopedName test("testInternal51");
+TEST_P(StorageProtocolTest, internal_message) {
MyCommand cmd;
MyReply reply(cmd);
-
- recordOutput(cmd);
- recordOutput(reply);
+ // TODO what's this even intended to test?
}
-void
-StorageProtocolTest::testSetBucketState51()
-{
- ScopedName test("testSetBucketState51");
- document::BucketId bucketId(16, 0);
- document::Bucket bucket(makeDocumentBucket(bucketId));
- SetBucketStateCommand::SP cmd(
- new SetBucketStateCommand(bucket, SetBucketStateCommand::ACTIVE));
- SetBucketStateCommand::SP cmd2(copyCommand(cmd, _version5_1));
+TEST_P(StorageProtocolTest, set_bucket_state_with_inactive_state) {
+ auto cmd = std::make_shared<SetBucketStateCommand>(_bucket, SetBucketStateCommand::INACTIVE);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(_bucket, cmd2->getBucket());
- SetBucketStateReply::SP reply(new SetBucketStateReply(*cmd2));
- SetBucketStateReply::SP reply2(copyReply(reply));
+ auto reply = std::make_shared<SetBucketStateReply>(*cmd2);
+ auto reply2 = copyReply(reply);
- CPPUNIT_ASSERT_EQUAL(SetBucketStateCommand::ACTIVE, cmd2->getState());
- CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId());
- CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId());
-
- recordOutput(*cmd2);
- recordOutput(*reply2);
+ EXPECT_EQ(SetBucketStateCommand::INACTIVE, cmd2->getState());
+ EXPECT_EQ(_bucket, reply2->getBucket());
}
-void
-StorageProtocolTest::testPutCommand52()
-{
- ScopedName test("testPutCommand52");
+TEST_P(StorageProtocolTest, set_bucket_state_with_active_state) {
+ auto cmd = std::make_shared<SetBucketStateCommand>(_bucket, SetBucketStateCommand::ACTIVE);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(SetBucketStateCommand::ACTIVE, cmd2->getState());
+}
- PutCommand::SP cmd(new PutCommand(_bucket, _testDoc, 14));
+TEST_P(StorageProtocolTest, put_command_with_condition) {
+ auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14);
cmd->setCondition(TestAndSetCondition(CONDITION_STRING));
- PutCommand::SP cmd2(copyCommand(cmd, _version5_2));
- CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
}
-void
-StorageProtocolTest::testUpdateCommand52()
-{
- ScopedName test("testUpdateCommand52");
-
- document::DocumentUpdate::SP update(new document::DocumentUpdate(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()));
- UpdateCommand::SP cmd(new UpdateCommand(_bucket, update, 14));
+TEST_P(StorageProtocolTest, update_command_with_condition) {
+ auto update = std::make_shared<document::DocumentUpdate>(
+ _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId());
+ auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14);
cmd->setCondition(TestAndSetCondition(CONDITION_STRING));
- UpdateCommand::SP cmd2(copyCommand(cmd, _version5_2));
- CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
}
-void
-StorageProtocolTest::testRemoveCommand52()
-{
- ScopedName test("testRemoveCommand52");
-
- RemoveCommand::SP cmd(new RemoveCommand(_bucket, _testDocId, 159));
+TEST_P(StorageProtocolTest, remove_command_with_condition) {
+ auto cmd = std::make_shared<RemoveCommand>(_bucket, _testDocId, 159);
cmd->setCondition(TestAndSetCondition(CONDITION_STRING));
- RemoveCommand::SP cmd2(copyCommand(cmd, _version5_2));
- CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection());
}
-void
-StorageProtocolTest::testPutCommandWithBucketSpace6_0()
-{
- ScopedName test("testPutCommandWithBucketSpace6_0");
-
- document::Bucket bucket(document::BucketSpace(5), _bucket.getBucketId());
+TEST_P(StorageProtocolTest, put_command_with_bucket_space) {
+ document::Bucket bucket(document::BucketSpace(5), _bucket_id);
auto cmd = std::make_shared<PutCommand>(bucket, _testDoc, 14);
- auto cmd2 = copyCommand(cmd, _version6_0);
- CPPUNIT_ASSERT_EQUAL(bucket, cmd2->getBucket());
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(bucket, cmd2->getBucket());
}
-void
-StorageProtocolTest::testCreateVisitorWithBucketSpace6_0()
-{
- ScopedName test("testCreateVisitorWithBucketSpace6_0");
-
+TEST_P(StorageProtocolTest, create_visitor_with_bucket_space) {
document::BucketSpace bucketSpace(5);
auto cmd = std::make_shared<CreateVisitorCommand>(bucketSpace, "library", "id", "doc selection");
- auto cmd2 = copyCommand(cmd, _version6_0);
- CPPUNIT_ASSERT_EQUAL(bucketSpace, cmd2->getBucketSpace());
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(bucketSpace, cmd2->getBucketSpace());
}
-void
-StorageProtocolTest::testRequestBucketInfoWithBucketSpace6_0()
-{
- ScopedName test("testRequestBucketInfoWithBucketSpace6_0");
-
+TEST_P(StorageProtocolTest, request_bucket_info_with_bucket_space) {
document::BucketSpace bucketSpace(5);
std::vector<document::BucketId> ids = {document::BucketId(3)};
auto cmd = std::make_shared<RequestBucketInfoCommand>(bucketSpace, ids);
- auto cmd2 = copyCommand(cmd, _version6_0);
- CPPUNIT_ASSERT_EQUAL(bucketSpace, cmd2->getBucketSpace());
- CPPUNIT_ASSERT_EQUAL(ids, cmd2->getBuckets());
-}
-
-void
-StorageProtocolTest::serialized_size_is_used_to_set_approx_size_of_storage_message()
-{
- ScopedName test("serialized_size_is_used_to_set_approx_size_of_storage_message");
-
- PutCommand::SP cmd(new PutCommand(_bucket, _testDoc, 14));
- CPPUNIT_ASSERT_EQUAL(50u, cmd->getApproxByteSize());
-
- PutCommand::SP cmd2(copyCommand(cmd, _version6_0));
- CPPUNIT_ASSERT_EQUAL(181u, cmd2->getApproxByteSize());
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(bucketSpace, cmd2->getBucketSpace());
+ EXPECT_EQ(ids, cmd2->getBuckets());
}
-void
-StorageProtocolTest::testStringOutputs()
-{
- std::cerr << "\nNon verbose output:\n";
- for (uint32_t i=0, n=_nonVerboseMessageStrings.size(); i<n; ++i) {
- std::cerr << _nonVerboseMessageStrings[i] << "\n";
- }
- std::cerr << "\nVerbose output:\n";
- for (uint32_t i=0, n=_verboseMessageStrings.size(); i<n; ++i) {
- std::cerr << _verboseMessageStrings[i] << "\n";
- }
-}
+TEST_P(StorageProtocolTest, serialized_size_is_used_to_set_approx_size_of_storage_message) {
+ auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14);
+ EXPECT_EQ(50u, cmd->getApproxByteSize());
-void
-StorageProtocolTest::testWriteSerialization50()
-{
- std::ofstream of("mbusprot/mbusprot.5.0.serialization.5.1");
- of << std::hex << std::setfill('0');
- for (uint32_t i=0, n=_serialization50.size(); i<n; ++i) {
- char c = _serialization50[i];
- if (c > 126 || (c < 32 && c != 10)) {
- int32_t num = static_cast<int32_t>(c);
- if (num < 0) num += 256;
- of << '\\' << std::setw(2) << num;
- } else if (c == '\\') {
- of << "\\\\";
- } else {
- of << c;
- }
+ auto cmd2 = copyCommand(cmd);
+ auto version = GetParam();
+ if (version.getMajor() == 7) { // Protobuf-based encoding
+ EXPECT_EQ(158u, cmd2->getApproxByteSize());
+ } else { // Legacy encoding
+ EXPECT_EQ(181u, cmd2->getApproxByteSize());
}
- of.close();
}
-} // mbusprot
-} // storage
+} // storage::api
diff --git a/storageapi/src/vespa/storageapi/CMakeLists.txt b/storageapi/src/vespa/storageapi/CMakeLists.txt
index c08dcbc2419..90eb6dd9eca 100644
--- a/storageapi/src/vespa/storageapi/CMakeLists.txt
+++ b/storageapi/src/vespa/storageapi/CMakeLists.txt
@@ -1,4 +1,5 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
vespa_add_library(storageapi
SOURCES
$<TARGET_OBJECTS:storageapi_message>
@@ -8,3 +9,6 @@ vespa_add_library(storageapi
INSTALL lib64
DEPENDS
)
+
+vespa_add_target_package_dependency(storageapi Protobuf)
+
diff --git a/storageapi/src/vespa/storageapi/mbusprot/.gitignore b/storageapi/src/vespa/storageapi/mbusprot/.gitignore
index 526f91c6668..8e91fe9cab0 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/.gitignore
+++ b/storageapi/src/vespa/storageapi/mbusprot/.gitignore
@@ -5,3 +5,6 @@
.deps
.libs
Makefile
+*.pb.h
+*.pb.cc
+
diff --git a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
index d5952d7cb91..dc4e3897e49 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
+++ b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt
@@ -1,4 +1,19 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+find_package(Protobuf REQUIRED)
+PROTOBUF_GENERATE_CPP(storageapi_PROTOBUF_SRCS storageapi_PROTOBUF_HDRS
+ protobuf/common.proto
+ protobuf/feed.proto
+ protobuf/visiting.proto
+ protobuf/maintenance.proto)
+
+# protoc-generated files emit compiler warnings that we normally treat as errors.
+# Instead of rolling our own compiler plugin we'll pragmatically disable the noise.
+set_source_files_properties(${storageapi_PROTOBUF_SRCS} PROPERTIES COMPILE_FLAGS "-Wno-suggest-override -Wno-inline")
+# protoc explicitly annotates methods with inline, which triggers -Werror=inline when
+# the header file grows over a certain size.
+set_source_files_properties(protocolserialization7.cpp PROPERTIES COMPILE_FLAGS "-Wno-inline")
+
vespa_add_library(storageapi_mbusprot OBJECT
SOURCES
storagemessage.cpp
@@ -11,5 +26,7 @@ vespa_add_library(storageapi_mbusprot OBJECT
protocolserialization5_1.cpp
protocolserialization5_2.cpp
protocolserialization6_0.cpp
+ protocolserialization7.cpp
+ ${storageapi_PROTOBUF_SRCS}
DEPENDS
)
diff --git a/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h
new file mode 100644
index 00000000000..ef4a6b28749
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h
@@ -0,0 +1,31 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "protocolserialization.h"
+
+namespace storage::mbusprot {
+
+/*
+ * Utility base class for pre-v7 (protobuf) serialization implementations.
+ *
+ * TODO remove on Vespa 8 alongside legacy serialization formats.
+ */
+class LegacyProtocolSerialization : public ProtocolSerialization {
+ const std::shared_ptr<const document::DocumentTypeRepo> _repo;
+public:
+ explicit LegacyProtocolSerialization(const std::shared_ptr<const document::DocumentTypeRepo>& repo)
+ : _repo(repo)
+ {}
+
+ const document::DocumentTypeRepo& getTypeRepo() const { return *_repo; }
+ const std::shared_ptr<const document::DocumentTypeRepo> getTypeRepoSp() const { return _repo; }
+
+ virtual document::Bucket getBucket(document::ByteBuffer& buf) const = 0;
+ virtual void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const = 0;
+ virtual document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const = 0;
+ virtual void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const = 0;
+ virtual api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const = 0;
+ virtual void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const = 0;
+};
+
+} // storage::mbusprot
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto
new file mode 100644
index 00000000000..d641449995d
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto
@@ -0,0 +1,68 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+option cc_enable_arenas = true;
+
+package storage.mbusprot.protobuf;
+
+// Note: we use a *Request/*Response naming convention rather than *Command/*Reply,
+// as the former is the gRPC convention and that's where we intend to move.
+
+message BucketSpace {
+ uint64 space_id = 1;
+}
+
+message BucketId {
+ fixed64 raw_id = 1;
+}
+
+message Bucket {
+ uint64 space_id = 1;
+ fixed64 raw_bucket_id = 2;
+}
+
+// Next tag to use: 3
+message BucketInfo {
+ uint64 last_modified_timestamp = 1;
+ fixed32 legacy_checksum = 2;
+ // TODO v2 checksum
+ uint32 doc_count = 3;
+ uint32 total_doc_size = 4;
+ uint32 meta_count = 5;
+ uint32 used_file_size = 6;
+ bool ready = 7;
+ bool active = 8;
+}
+
+message GlobalId {
+ // 96 bits of GID data in _little_ endian. High entropy, so fixed encoding is better than varint.
+ // Low 64 bits as if memcpy()ed from bytes [0, 8) of the GID buffer
+ fixed64 lo_64 = 1;
+ // High 32 bits as if memcpy()ed from bytes [8, 12) of the GID buffer
+ fixed32 hi_32 = 2;
+}
+
+// TODO these should ideally be gRPC headers..
+message RequestHeader {
+ uint64 message_id = 1;
+ uint32 priority = 2; // Always in range [0, 255]
+ uint32 source_index = 3; // Always in range [0, 65535]
+ fixed32 loadtype_id = 4; // It's a hash with high entropy, so fixed encoding is better than varint
+}
+
+// TODO these should ideally be gRPC headers..
+message ResponseHeader {
+ // TODO this should ideally be gRPC Status...
+ uint32 return_code_id = 1;
+ bytes return_code_message = 2; // FIXME it's `bytes` since `string` will check for UTF-8... might not hold...
+ uint64 message_id = 3;
+ uint32 priority = 4; // Always in range [0, 255]
+}
+
+message Document {
+ bytes payload = 1;
+}
+
+message DocumentId {
+ bytes id = 1;
+}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto
new file mode 100644
index 00000000000..58da24df836
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto
@@ -0,0 +1,91 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+option cc_enable_arenas = true;
+
+package storage.mbusprot.protobuf;
+
+import "common.proto";
+
+message TestAndSetCondition {
+ bytes selection = 1;
+}
+
+message PutRequest {
+ Bucket bucket = 1;
+ Document document = 2;
+ uint64 new_timestamp = 3;
+ uint64 expected_old_timestamp = 4; // If zero; no expectation.
+ TestAndSetCondition condition = 5;
+}
+
+message PutResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+ bool was_found = 3;
+}
+
+message Update {
+ bytes payload = 1;
+}
+
+message UpdateRequest {
+ Bucket bucket = 1;
+ Update update = 2;
+ uint64 new_timestamp = 3;
+ uint64 expected_old_timestamp = 4; // If zero; no expectation.
+ TestAndSetCondition condition = 5;
+}
+
+message UpdateResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+ uint64 updated_timestamp = 3;
+}
+
+message RemoveRequest {
+ Bucket bucket = 1;
+ bytes document_id = 2;
+ uint64 new_timestamp = 3;
+ TestAndSetCondition condition = 4;
+}
+
+message RemoveResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+ uint64 removed_timestamp = 3;
+}
+
+message GetRequest {
+ Bucket bucket = 1;
+ bytes document_id = 2;
+ bytes field_set = 3;
+ uint64 before_timestamp = 4;
+}
+
+message GetResponse {
+ Document document = 1;
+ uint64 last_modified_timestamp = 2;
+ BucketInfo bucket_info = 3;
+ BucketId remapped_bucket_id = 4;
+}
+
+message RevertRequest {
+ Bucket bucket = 1;
+ repeated uint64 revert_tokens = 2;
+}
+
+message RevertResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+}
+
+message RemoveLocationRequest {
+ Bucket bucket = 1;
+ bytes document_selection = 2;
+}
+
+message RemoveLocationResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
new file mode 100644
index 00000000000..c4766d2900a
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
@@ -0,0 +1,160 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+option cc_enable_arenas = true;
+
+package storage.mbusprot.protobuf;
+
+import "common.proto";
+
+message DeleteBucketRequest {
+ Bucket bucket = 1;
+ BucketInfo expected_bucket_info = 2;
+}
+
+message DeleteBucketResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+}
+
+message CreateBucketRequest {
+ Bucket bucket = 1;
+ bool create_as_active = 2;
+}
+
+message CreateBucketResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+}
+
+message MergeNode {
+ uint32 index = 1;
+ bool source_only = 2;
+}
+
+message MergeBucketRequest {
+ Bucket bucket = 1;
+ uint32 cluster_state_version = 2;
+ uint64 max_timestamp = 3;
+ repeated MergeNode nodes = 4;
+ repeated uint32 node_chain = 5;
+}
+
+message MergeBucketResponse {
+ BucketId remapped_bucket_id = 1;
+}
+
+message MetaDiffEntry {
+ uint64 timestamp = 1;
+ GlobalId gid = 2;
+ uint32 header_size = 3;
+ uint32 body_size = 4;
+ uint32 flags = 5;
+ uint32 presence_mask = 6;
+}
+
+message GetBucketDiffRequest {
+ Bucket bucket = 1;
+ uint64 max_timestamp = 2;
+ repeated MergeNode nodes = 3;
+ repeated MetaDiffEntry diff = 4;
+}
+
+message GetBucketDiffResponse {
+ BucketId remapped_bucket_id = 1;
+ repeated MetaDiffEntry diff = 2;
+}
+
+message ApplyDiffEntry {
+ MetaDiffEntry entry_meta = 1;
+ bytes document_id = 2;
+ bytes header_blob = 3;
+ bytes body_blob = 4;
+}
+
+message ApplyBucketDiffRequest {
+ Bucket bucket = 1;
+ repeated MergeNode nodes = 2;
+ uint32 max_buffer_size = 3;
+ repeated ApplyDiffEntry entries = 4;
+}
+
+message ApplyBucketDiffResponse {
+ BucketId remapped_bucket_id = 1;
+ repeated ApplyDiffEntry entries = 4;
+}
+
+message ExplicitBucketSet {
+ // `Bucket` is not needed, as the space is inferred from the owning message.
+ repeated BucketId bucket_ids = 2;
+}
+
+message AllBuckets {
+ uint32 distributor_index = 1;
+ bytes cluster_state = 2;
+ bytes distribution_hash = 3;
+}
+
+message RequestBucketInfoRequest {
+ BucketSpace bucket_space = 1;
+ oneof request_for {
+ ExplicitBucketSet explicit_bucket_set = 2;
+ AllBuckets all_buckets = 3;
+ }
+}
+
+message BucketAndBucketInfo {
+ fixed64 raw_bucket_id = 1;
+ BucketInfo bucket_info = 2;
+}
+
+message RequestBucketInfoResponse {
+ repeated BucketAndBucketInfo bucket_infos = 1;
+}
+
+message NotifyBucketChangeRequest {
+ Bucket bucket = 1;
+ BucketInfo bucket_info = 2;
+}
+
+message NotifyBucketChangeResponse {
+ // Currently empty
+}
+
+message SplitBucketRequest {
+ Bucket bucket = 1;
+ uint32 min_split_bits = 2;
+ uint32 max_split_bits = 3;
+ uint32 min_byte_size = 4;
+ uint32 min_doc_count = 5;
+}
+
+message SplitBucketResponse {
+ BucketId remapped_bucket_id = 1;
+ repeated BucketAndBucketInfo split_info = 2;
+}
+
+message JoinBucketsRequest {
+ Bucket bucket = 1;
+ repeated BucketId source_buckets = 2;
+ uint32 min_join_bits = 3;
+}
+
+message JoinBucketsResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+}
+
+message SetBucketStateRequest {
+ enum BucketState {
+ Inactive = 0;
+ Active = 1;
+ }
+
+ Bucket bucket = 1;
+ BucketState state = 2;
+}
+
+message SetBucketStateResponse {
+ BucketId remapped_bucket_id = 1;
+}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto
new file mode 100644
index 00000000000..89ce39e52a0
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto
@@ -0,0 +1,66 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+syntax = "proto3";
+
+option cc_enable_arenas = true;
+
+package storage.mbusprot.protobuf;
+
+import "common.proto";
+
+message ClientVisitorParameter {
+ bytes key = 1;
+ bytes value = 2;
+}
+
+message VisitorConstraints {
+ bytes document_selection = 1;
+ uint64 from_time_usec = 2;
+ uint64 to_time_usec = 3;
+ bool visit_removes = 4;
+ bytes field_set = 5;
+ bool visit_inconsistent_buckets = 6;
+}
+
+message VisitorControlMeta {
+ bytes instance_id = 1;
+ bytes library_name = 2;
+ uint32 visitor_command_id = 3;
+ bytes control_destination = 4;
+ bytes data_destination = 5;
+
+ // TODO move?
+ uint32 max_pending_reply_count = 6;
+ uint32 queue_timeout = 7;
+ uint32 max_buckets_per_visitor = 8;
+}
+
+message CreateVisitorRequest {
+ BucketSpace bucket_space = 1;
+ repeated BucketId buckets = 2;
+
+ VisitorConstraints constraints = 3;
+ VisitorControlMeta control_meta = 4;
+ repeated ClientVisitorParameter client_parameters = 5;
+}
+
+message VisitorStatistics {
+ uint32 buckets_visited = 1;
+ uint64 documents_visited = 2;
+ uint64 bytes_visited = 3;
+ uint64 documents_returned = 4;
+ uint64 bytes_returned = 5;
+ uint64 second_pass_documents_returned = 6; // TODO don't include? orderdoc only
+ uint64 second_pass_bytes_returned = 7; // TODO don't include? orderdoc only
+}
+
+message CreateVisitorResponse {
+ VisitorStatistics visitor_statistics = 1;
+}
+
+message DestroyVisitorRequest {
+ bytes instance_id = 1;
+}
+
+message DestroyVisitorResponse {
+ // Currently empty
+}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h b/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h
new file mode 100644
index 00000000000..8e878cf0560
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h
@@ -0,0 +1,13 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+// Disable warnings emitted by protoc generated files
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wsuggest-override"
+
+#include "feed.pb.h"
+#include "visiting.pb.h"
+#include "maintenance.pb.h"
+
+#pragma GCC diagnostic pop
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp
index 172cd6c8de5..917b60c50c3 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp
@@ -17,11 +17,6 @@ LOG_SETUP(".storage.api.mbusprot.serialization.base");
namespace storage::mbusprot {
-ProtocolSerialization::ProtocolSerialization(const std::shared_ptr<const document::DocumentTypeRepo>& repo)
- : _repo(repo)
-{
-}
-
mbus::Blob
ProtocolSerialization::encode(const api::StorageMessage& msg) const
{
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h
index 9c3ddb88bdf..a57627b9ba9 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h
@@ -59,21 +59,14 @@ class StorageCommand;
class StorageReply;
class ProtocolSerialization {
- const std::shared_ptr<const document::DocumentTypeRepo> _repo;
-
public:
virtual mbus::Blob encode(const api::StorageMessage&) const;
virtual std::unique_ptr<StorageCommand> decodeCommand(mbus::BlobRef) const;
virtual std::unique_ptr<StorageReply> decodeReply(
mbus::BlobRef, const api::StorageCommand&) const;
-
protected:
- const document::DocumentTypeRepo& getTypeRepo() const { return *_repo; }
- const std::shared_ptr<const document::DocumentTypeRepo> getTypeRepoSp() const
- { return _repo; }
-
- ProtocolSerialization(const std::shared_ptr<const document::DocumentTypeRepo> &repo);
- virtual ~ProtocolSerialization() {}
+ ProtocolSerialization() = default;
+ virtual ~ProtocolSerialization() = default;
typedef api::StorageCommand SCmd;
typedef api::StorageReply SRep;
@@ -102,13 +95,10 @@ protected:
virtual void onEncode(GBBuf&, const api::GetBucketDiffReply&) const = 0;
virtual void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const = 0;
virtual void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const = 0;
- virtual void onEncode(GBBuf&,
- const api::RequestBucketInfoCommand&) const = 0;
+ virtual void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const = 0;
virtual void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const = 0;
- virtual void onEncode(GBBuf&,
- const api::NotifyBucketChangeCommand&) const = 0;
- virtual void onEncode(GBBuf&,
- const api::NotifyBucketChangeReply&) const = 0;
+ virtual void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const = 0;
+ virtual void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const = 0;
virtual void onEncode(GBBuf&, const api::SplitBucketCommand&) const = 0;
virtual void onEncode(GBBuf&, const api::SplitBucketReply&) const = 0;
virtual void onEncode(GBBuf&, const api::JoinBucketsCommand&) const = 0;
@@ -143,11 +133,9 @@ protected:
virtual SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const = 0;
virtual SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const = 0;
- virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&,
- BBuf&) const = 0;
+ virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const = 0;
- virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&,
- BBuf&) const = 0;
+ virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeSplitBucketCommand(BBuf&) const = 0;
virtual SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const = 0;
@@ -160,14 +148,6 @@ protected:
virtual SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const = 0;
virtual SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const = 0;
-
- virtual document::Bucket getBucket(document::ByteBuffer& buf) const = 0;
- virtual void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const = 0;
- virtual document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const = 0;
- virtual void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const = 0;
- virtual api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const = 0;
- virtual void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const = 0;
-
};
}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
index 74a0c964d19..466ff85f398 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp
@@ -20,7 +20,7 @@ namespace storage::mbusprot {
ProtocolSerialization4_2::ProtocolSerialization4_2(
const std::shared_ptr<const document::DocumentTypeRepo>& repo)
- : ProtocolSerialization(repo)
+ : LegacyProtocolSerialization(repo)
{
}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h
index 56aa3d4ed30..e4ab36dc989 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h
@@ -1,13 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "protocolserialization.h"
+#include "legacyprotocolserialization.h"
namespace storage::mbusprot {
-class ProtocolSerialization4_2 : public ProtocolSerialization {
+class ProtocolSerialization4_2 : public LegacyProtocolSerialization {
public:
- ProtocolSerialization4_2(const std::shared_ptr<const document::DocumentTypeRepo>&);
+ explicit ProtocolSerialization4_2(const std::shared_ptr<const document::DocumentTypeRepo>&);
protected:
void onEncode(GBBuf&, const api::GetCommand&) const override;
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h
index 042ec7850ef..67f02aa2d2a 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h
@@ -73,6 +73,9 @@ public:
SRep::UP onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const override;
void onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const override;
void onDecodeReply(BBuf&, api::StorageReply&) const override;
+
+protected:
+ const documentapi::LoadTypeSet& loadTypes() const noexcept { return _loadTypes; };
};
}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
new file mode 100644
index 00000000000..ca77977046c
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -0,0 +1,1268 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "protocolserialization7.h"
+#include "serializationhelper.h"
+#include "protobuf_includes.h"
+
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/document/util/bufferexceptions.h>
+#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/message/removelocation.h>
+#include <vespa/storageapi/message/visitor.h>
+
+namespace storage::mbusprot {
+
+ProtocolSerialization7::ProtocolSerialization7(std::shared_ptr<const document::DocumentTypeRepo> repo,
+ const documentapi::LoadTypeSet& load_types)
+ : ProtocolSerialization(),
+ _repo(std::move(repo)),
+ _load_types(load_types)
+{
+}
+
+namespace {
+
+void set_bucket(protobuf::Bucket& dest, const document::Bucket& src) {
+ dest.set_raw_bucket_id(src.getBucketId().getRawId());
+ dest.set_space_id(src.getBucketSpace().getId());
+}
+
+void set_bucket_id(protobuf::BucketId& dest, const document::BucketId& src) {
+ dest.set_raw_id(src.getRawId());
+}
+
+document::BucketId get_bucket_id(const protobuf::BucketId& src) {
+ return document::BucketId(src.raw_id());
+}
+
+void set_bucket_space(protobuf::BucketSpace& dest, const document::BucketSpace& src) {
+ dest.set_space_id(src.getId());
+}
+
+document::BucketSpace get_bucket_space(const protobuf::BucketSpace& src) {
+ return document::BucketSpace(src.space_id());
+}
+
+void set_bucket_info(protobuf::BucketInfo& dest, const api::BucketInfo& src) {
+ dest.set_last_modified_timestamp(src.getLastModified());
+ dest.set_legacy_checksum(src.getChecksum());
+ dest.set_doc_count(src.getDocumentCount());
+ dest.set_total_doc_size(src.getTotalDocumentSize());
+ dest.set_meta_count(src.getMetaCount());
+ dest.set_used_file_size(src.getUsedFileSize());
+ dest.set_active(src.isActive());
+ dest.set_ready(src.isReady());
+}
+
+document::Bucket get_bucket(const protobuf::Bucket& src) {
+ return document::Bucket(document::BucketSpace(src.space_id()),
+ document::BucketId(src.raw_bucket_id()));
+}
+
+api::BucketInfo get_bucket_info(const protobuf::BucketInfo& src) {
+ api::BucketInfo info;
+ info.setLastModified(src.last_modified_timestamp());
+ info.setChecksum(src.legacy_checksum());
+ info.setDocumentCount(src.doc_count());
+ info.setTotalDocumentSize(src.total_doc_size());
+ info.setMetaCount(src.meta_count());
+ info.setUsedFileSize(src.used_file_size());
+ info.setActive(src.active());
+ info.setReady(src.ready());
+ return info;
+}
+
+documentapi::TestAndSetCondition get_tas_condition(const protobuf::TestAndSetCondition& src) {
+ return documentapi::TestAndSetCondition(src.selection());
+}
+
+void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::TestAndSetCondition& src) {
+ dest.set_selection(src.getSelection().data(), src.getSelection().size());
+}
+
+std::shared_ptr<document::Document> get_document(const protobuf::Document& src_doc,
+ const document::DocumentTypeRepo& type_repo)
+{
+ if (!src_doc.payload().empty()) {
+ document::ByteBuffer doc_buf(src_doc.payload().data(), src_doc.payload().size());
+ return std::make_shared<document::Document>(type_repo, doc_buf);
+ }
+ return std::shared_ptr<document::Document>();
+}
+
+void set_update(protobuf::Update& dest, const document::DocumentUpdate& src) {
+ vespalib::nbostream stream;
+ src.serializeHEAD(stream);
+ dest.set_payload(stream.peek(), stream.size());
+}
+
+std::shared_ptr<document::DocumentUpdate> get_update(const protobuf::Update& src,
+ const document::DocumentTypeRepo& type_repo)
+{
+ if (!src.payload().empty()) {
+ return document::DocumentUpdate::createHEAD(
+ type_repo, vespalib::nbostream(src.payload().data(), src.payload().size()));
+ }
+ return std::shared_ptr<document::DocumentUpdate>();
+}
+
+void write_request_header(vespalib::GrowableByteBuffer& buf, const api::StorageCommand& cmd) {
+ protobuf::RequestHeader hdr; // Arena alloc not needed since there are no nested messages
+ hdr.set_message_id(cmd.getMsgId());
+ hdr.set_priority(cmd.getPriority());
+ hdr.set_source_index(cmd.getSourceIndex());
+ hdr.set_loadtype_id(cmd.getLoadType().getId());
+
+ uint8_t dest[128]; // Only primitive fields, should be plenty large enough.
+ auto encoded_size = static_cast<uint32_t>(hdr.ByteSizeLong());
+ assert(encoded_size <= sizeof(dest));
+ [[maybe_unused]] bool ok = hdr.SerializeWithCachedSizesToArray(dest);
+ assert(ok);
+ buf.putInt(encoded_size);
+ buf.putBytes(reinterpret_cast<const char*>(dest), encoded_size);
+}
+
+void write_response_header(vespalib::GrowableByteBuffer& buf, const api::StorageReply& reply) {
+ protobuf::ResponseHeader hdr; // Arena alloc not needed since there are no nested messages
+ const auto& result = reply.getResult();
+ hdr.set_return_code_id(static_cast<uint32_t>(result.getResult()));
+ if (!result.getMessage().empty()) {
+ hdr.set_return_code_message(result.getMessage().data(), result.getMessage().size());
+ }
+ hdr.set_message_id(reply.getMsgId());
+ hdr.set_priority(reply.getPriority());
+
+ const auto header_size = hdr.ByteSizeLong();
+ assert(header_size <= UINT32_MAX);
+ buf.putInt(static_cast<uint32_t>(header_size));
+
+ auto* dest_buf = reinterpret_cast<uint8_t*>(buf.allocate(header_size));
+ [[maybe_unused]] bool ok = hdr.SerializeWithCachedSizesToArray(dest_buf);
+ assert(ok);
+}
+
+void decode_request_header(document::ByteBuffer& buf, protobuf::RequestHeader& hdr) {
+ auto hdr_len = static_cast<uint32_t>(SerializationHelper::getInt(buf));
+ if (hdr_len > buf.getRemaining()) {
+ throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len);
+ }
+ bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len);
+ if (!ok) {
+ throw vespalib::IllegalArgumentException("Malformed protobuf request header");
+ }
+ buf.incPos(hdr_len);
+}
+
+void decode_response_header(document::ByteBuffer& buf, protobuf::ResponseHeader& hdr) {
+ auto hdr_len = static_cast<uint32_t>(SerializationHelper::getInt(buf));
+ if (hdr_len > buf.getRemaining()) {
+ throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len);
+ }
+ bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len);
+ if (!ok) {
+ throw vespalib::IllegalArgumentException("Malformed protobuf response header");
+ }
+ buf.incPos(hdr_len);
+}
+
+} // anonymous namespace
+
+template <typename ProtobufType>
+class BaseEncoder {
+ vespalib::GrowableByteBuffer& _out_buf;
+ ::google::protobuf::Arena _arena;
+ ProtobufType* _proto_obj;
+public:
+ explicit BaseEncoder(vespalib::GrowableByteBuffer& out_buf)
+ : _out_buf(out_buf),
+ _arena(),
+ _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena))
+ {
+ }
+
+ void encode() {
+ assert(_proto_obj != nullptr);
+ const auto sz = _proto_obj->ByteSizeLong();
+ assert(sz <= UINT32_MAX);
+ auto* buf = reinterpret_cast<uint8_t*>(_out_buf.allocate(sz));
+ [[maybe_unused]] bool ok = _proto_obj->SerializeWithCachedSizesToArray(buf);
+ assert(ok);
+ _proto_obj = nullptr;
+ }
+protected:
+ vespalib::GrowableByteBuffer& buffer() noexcept { return _out_buf; }
+
+ // Precondition: encode() is not called
+ ProtobufType& proto_obj() noexcept { return *_proto_obj; }
+ const ProtobufType& proto_obj() const noexcept { return *_proto_obj; }
+};
+
+template <typename ProtobufType>
+class RequestEncoder : public BaseEncoder<ProtobufType> {
+public:
+ RequestEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& cmd)
+ : BaseEncoder<ProtobufType>(out_buf)
+ {
+ write_request_header(out_buf, cmd);
+ }
+
+ // Precondition: encode() is not called
+ ProtobufType& request() noexcept { return this->proto_obj(); }
+ const ProtobufType& request() const noexcept { return this->proto_obj(); }
+};
+
+template <typename ProtobufType>
+class ResponseEncoder : public BaseEncoder<ProtobufType> {
+public:
+ ResponseEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply)
+ : BaseEncoder<ProtobufType>(out_buf)
+ {
+ write_response_header(out_buf, reply);
+ }
+
+ // Precondition: encode() is not called
+ ProtobufType& response() noexcept { return this->proto_obj(); }
+ const ProtobufType& response() const noexcept { return this->proto_obj(); }
+};
+
+template <typename ProtobufType>
+class RequestDecoder {
+ protobuf::RequestHeader _hdr;
+ ::google::protobuf::Arena _arena;
+ ProtobufType* _proto_obj;
+ const documentapi::LoadTypeSet& _load_types;
+public:
+ RequestDecoder(document::ByteBuffer& in_buf, const documentapi::LoadTypeSet& load_types)
+ : _arena(),
+ _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena)),
+ _load_types(load_types)
+ {
+ decode_request_header(in_buf, _hdr);
+ assert(in_buf.getRemaining() <= INT_MAX);
+ bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining());
+ if (!ok) {
+ throw vespalib::IllegalArgumentException(
+ vespalib::make_string("Malformed protobuf request payload for %s",
+ ProtobufType::descriptor()->full_name().c_str()));
+ }
+ }
+
+ void transfer_meta_information_to(api::StorageCommand& dest) {
+ dest.forceMsgId(_hdr.message_id());
+ dest.setPriority(static_cast<uint8_t>(_hdr.priority()));
+ dest.setSourceIndex(static_cast<uint16_t>(_hdr.source_index()));
+ dest.setLoadType(_load_types[_hdr.loadtype_id()]);
+ }
+
+ ProtobufType& request() noexcept { return *_proto_obj; }
+ const ProtobufType& request() const noexcept { return *_proto_obj; }
+};
+
+template <typename ProtobufType>
+class ResponseDecoder {
+ protobuf::ResponseHeader _hdr;
+ ::google::protobuf::Arena _arena;
+ ProtobufType* _proto_obj;
+public:
+ explicit ResponseDecoder(document::ByteBuffer& in_buf)
+ : _arena(),
+ _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena))
+ {
+ decode_response_header(in_buf, _hdr);
+ assert(in_buf.getRemaining() <= INT_MAX);
+ bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining());
+ if (!ok) {
+ throw vespalib::IllegalArgumentException(
+ vespalib::make_string("Malformed protobuf response payload for %s",
+ ProtobufType::descriptor()->full_name().c_str()));
+ }
+ }
+
+ ProtobufType& response() noexcept { return *_proto_obj; }
+ const ProtobufType& response() const noexcept { return *_proto_obj; }
+};
+
+template <typename ProtobufType, typename Func>
+void encode_request(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& msg, Func&& f) {
+ RequestEncoder<ProtobufType> enc(out_buf, msg);
+ f(enc.request());
+ enc.encode();
+}
+
+template <typename ProtobufType, typename Func>
+void encode_response(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply, Func&& f) {
+ ResponseEncoder<ProtobufType> enc(out_buf, reply);
+ auto& res = enc.response();
+ f(res);
+ enc.encode();
+}
+
+template <typename ProtobufType, typename Func>
+std::unique_ptr<api::StorageCommand>
+ProtocolSerialization7::decode_request(document::ByteBuffer& in_buf, Func&& f) const {
+ RequestDecoder<ProtobufType> dec(in_buf, _load_types);
+ const auto& req = dec.request();
+ auto cmd = f(req);
+ dec.transfer_meta_information_to(*cmd);
+ return cmd;
+}
+
+template <typename ProtobufType, typename Func>
+std::unique_ptr<api::StorageReply>
+ProtocolSerialization7::decode_response(document::ByteBuffer& in_buf, Func&& f) const {
+ ResponseDecoder<ProtobufType> dec(in_buf);
+ const auto& res = dec.response();
+ auto reply = f(res);
+ return reply;
+}
+
+template <typename ProtobufType, typename Func>
+void encode_bucket_request(vespalib::GrowableByteBuffer& out_buf, const api::BucketCommand& msg, Func&& f) {
+ encode_request<ProtobufType>(out_buf, msg, [&](ProtobufType& req) {
+ set_bucket(*req.mutable_bucket(), msg.getBucket());
+ f(req);
+ });
+}
+
+template <typename ProtobufType, typename Func>
+std::unique_ptr<api::StorageCommand>
+ProtocolSerialization7::decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const {
+ return decode_request<ProtobufType>(in_buf, [&](const ProtobufType& req) {
+ if (!req.has_bucket()) {
+ throw vespalib::IllegalArgumentException(
+ vespalib::make_string("Malformed protocol buffer request for %s; no bucket",
+ ProtobufType::descriptor()->full_name().c_str()));
+ }
+ const auto bucket = get_bucket(req.bucket());
+ return f(req, bucket);
+ });
+}
+
+template <typename ProtobufType, typename Func>
+void encode_bucket_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketReply& reply, Func&& f) {
+ encode_response<ProtobufType>(out_buf, reply, [&](ProtobufType& res) {
+ if (reply.hasBeenRemapped()) {
+ set_bucket_id(*res.mutable_remapped_bucket_id(), reply.getBucketId());
+ }
+ f(res);
+ });
+}
+
+template <typename ProtobufType, typename Func>
+std::unique_ptr<api::StorageReply>
+ProtocolSerialization7::decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const {
+ return decode_response<ProtobufType>(in_buf, [&](const ProtobufType& res) {
+ auto reply = f(res);
+ if (res.has_remapped_bucket_id()) {
+ reply->remapBucketId(get_bucket_id(res.remapped_bucket_id()));
+ }
+ return reply;
+ });
+}
+
+template <typename ProtobufType, typename Func>
+void encode_bucket_info_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketInfoReply& reply, Func&& f) {
+ encode_bucket_response<ProtobufType>(out_buf, reply, [&](ProtobufType& res) {
+ set_bucket_info(*res.mutable_bucket_info(), reply.getBucketInfo());
+ f(res);
+ });
+}
+
+template <typename ProtobufType, typename Func>
+std::unique_ptr<api::StorageReply>
+ProtocolSerialization7::decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const {
+ return decode_bucket_response<ProtobufType>(in_buf, [&](const ProtobufType& res) {
+ auto reply = f(res);
+ if (res.has_bucket_info()) {
+ reply->setBucketInfo(get_bucket_info(res.bucket_info()));
+ }
+ return reply;
+ });
+}
+
+// TODO document protobuf ducktyping assumptions
+
+namespace {
+// Inherit from known base class just to avoid having to template this. We don't care about its subtype anyway.
+void no_op_encode([[maybe_unused]] ::google::protobuf::Message&) {
+ // nothing to do here.
+}
+
+void set_document(protobuf::Document& target_doc, const document::Document& src_doc) {
+ vespalib::nbostream stream;
+ src_doc.serialize(stream);
+ target_doc.set_payload(stream.peek(), stream.size());
+}
+
+}
+
+// -----------------------------------------------------------------
+// Put
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) const {
+ encode_bucket_request<protobuf::PutRequest>(buf, msg, [&](auto& req) {
+ req.set_new_timestamp(msg.getTimestamp());
+ req.set_expected_old_timestamp(msg.getUpdateTimestamp());
+ if (msg.getCondition().isPresent()) {
+ set_tas_condition(*req.mutable_condition(), msg.getCondition());
+ }
+ if (msg.getDocument()) {
+ set_document(*req.mutable_document(), *msg.getDocument());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutReply& msg) const {
+ encode_bucket_info_response<protobuf::PutResponse>(buf, msg, [&](auto& res) {
+ res.set_was_found(msg.wasFound());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodePutCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::PutRequest>(buf, [&](auto& req, auto& bucket) {
+ auto document = get_document(req.document(), type_repo());
+ auto cmd = std::make_unique<api::PutCommand>(bucket, std::move(document), req.new_timestamp());
+ cmd->setUpdateTimestamp(req.expected_old_timestamp());
+ if (req.has_condition()) {
+ cmd->setCondition(get_tas_condition(req.condition()));
+ }
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodePutReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::PutResponse>(buf, [&](auto& res) {
+ return std::make_unique<api::PutReply>(static_cast<const api::PutCommand&>(cmd), res.was_found());
+ });
+}
+
+// -----------------------------------------------------------------
+// Update
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) const {
+ encode_bucket_request<protobuf::UpdateRequest>(buf, msg, [&](auto& req) {
+ auto* update = msg.getUpdate().get();
+ if (update) {
+ set_update(*req.mutable_update(), *update);
+ }
+ req.set_new_timestamp(msg.getTimestamp());
+ req.set_expected_old_timestamp(msg.getOldTimestamp());
+ if (msg.getCondition().isPresent()) {
+ set_tas_condition(*req.mutable_condition(), msg.getCondition());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateReply& msg) const {
+ encode_bucket_info_response<protobuf::UpdateResponse>(buf, msg, [&](auto& res) {
+ res.set_updated_timestamp(msg.getOldTimestamp());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::UpdateRequest>(buf, [&](auto& req, auto& bucket) {
+ auto update = get_update(req.update(), type_repo());
+ auto cmd = std::make_unique<api::UpdateCommand>(bucket, std::move(update), req.new_timestamp());
+ cmd->setOldTimestamp(req.expected_old_timestamp());
+ if (req.has_condition()) {
+ cmd->setCondition(get_tas_condition(req.condition()));
+ }
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeUpdateReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::UpdateResponse>(buf, [&](auto& res) {
+ return std::make_unique<api::UpdateReply>(static_cast<const api::UpdateCommand&>(cmd),
+ res.updated_timestamp());
+ });
+}
+
+// -----------------------------------------------------------------
+// Remove
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveCommand& msg) const {
+ encode_bucket_request<protobuf::RemoveRequest>(buf, msg, [&](auto& req) {
+ auto doc_id_str = msg.getDocumentId().toString();
+ req.set_document_id(doc_id_str.data(), doc_id_str.size());
+ req.set_new_timestamp(msg.getTimestamp());
+ if (msg.getCondition().isPresent()) {
+ set_tas_condition(*req.mutable_condition(), msg.getCondition());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveReply& msg) const {
+ encode_bucket_info_response<protobuf::RemoveResponse>(buf, msg, [&](auto& res) {
+ res.set_removed_timestamp(msg.getOldTimestamp());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::RemoveRequest>(buf, [&](auto& req, auto& bucket) {
+ document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size()));
+ auto cmd = std::make_unique<api::RemoveCommand>(bucket, doc_id, req.new_timestamp());
+ if (req.has_condition()) {
+ cmd->setCondition(get_tas_condition(req.condition()));
+ }
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::RemoveResponse>(buf, [&](auto& res) {
+ return std::make_unique<api::RemoveReply>(static_cast<const api::RemoveCommand&>(cmd),
+ res.removed_timestamp());
+ });
+}
+
+// -----------------------------------------------------------------
+// Get
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) const {
+ encode_bucket_request<protobuf::GetRequest>(buf, msg, [&](auto& req) {
+ auto doc_id = msg.getDocumentId().toString();
+ req.set_document_id(doc_id.data(), doc_id.size());
+ req.set_before_timestamp(msg.getBeforeTimestamp());
+ if (!msg.getFieldSet().empty()) {
+ req.set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) const {
+ encode_bucket_info_response<protobuf::GetResponse>(buf, msg, [&](auto& res) {
+ if (msg.getDocument()) {
+ set_document(*res.mutable_document(), *msg.getDocument());
+ }
+ res.set_last_modified_timestamp(msg.getLastModifiedTimestamp());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeGetCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::GetRequest>(buf, [&](auto& req, auto& bucket) {
+ document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size()));
+ return std::make_unique<api::GetCommand>(bucket, std::move(doc_id),
+ req.field_set(), req.before_timestamp());
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::GetResponse>(buf, [&](auto& res) {
+ try {
+ auto document = get_document(res.document(), type_repo());
+ return std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd),
+ std::move(document), res.last_modified_timestamp());
+ } catch (std::exception& e) {
+ auto reply = std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd),
+ std::shared_ptr<document::Document>(), 0u);
+ reply->setResult(api::ReturnCode(api::ReturnCode::UNPARSEABLE, e.what()));
+ return reply;
+ }
+ });
+}
+
+// -----------------------------------------------------------------
+// Revert
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertCommand& msg) const {
+ encode_bucket_request<protobuf::RevertRequest>(buf, msg, [&](auto& req) {
+ auto* tokens = req.mutable_revert_tokens();
+ assert(msg.getRevertTokens().size() <= INT_MAX);
+ tokens->Reserve(static_cast<int>(msg.getRevertTokens().size()));
+ for (auto token : msg.getRevertTokens()) {
+ tokens->Add(token);
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertReply& msg) const {
+ encode_bucket_info_response<protobuf::RevertResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeRevertCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::RevertRequest>(buf, [&](auto& req, auto& bucket) {
+ std::vector<api::Timestamp> tokens;
+ tokens.reserve(req.revert_tokens_size());
+ for (auto token : req.revert_tokens()) {
+ tokens.emplace_back(api::Timestamp(token));
+ }
+ return std::make_unique<api::RevertCommand>(bucket, std::move(tokens));
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeRevertReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::RevertResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::RevertReply>(static_cast<const api::RevertCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+// RemoveLocation
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const {
+ encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) {
+ req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const {
+ encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) {
+ return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket);
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::RemoveLocationReply>(static_cast<const api::RemoveLocationCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+// DeleteBucket
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const {
+ encode_bucket_request<protobuf::DeleteBucketRequest>(buf, msg, [&](auto& req) {
+ set_bucket_info(*req.mutable_expected_bucket_info(), msg.getBucketInfo());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketReply& msg) const {
+ encode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeDeleteBucketCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::DeleteBucketRequest>(buf, [&](auto& req, auto& bucket) {
+ auto cmd = std::make_unique<api::DeleteBucketCommand>(bucket);
+ if (req.has_expected_bucket_info()) {
+ cmd->setBucketInfo(get_bucket_info(req.expected_bucket_info()));
+ }
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeDeleteBucketReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::DeleteBucketReply>(static_cast<const api::DeleteBucketCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+// CreateBucket
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const {
+ encode_bucket_request<protobuf::CreateBucketRequest>(buf, msg, [&](auto& req) {
+ req.set_create_as_active(msg.getActive());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketReply& msg) const {
+ encode_bucket_info_response<protobuf::CreateBucketResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateBucketCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::CreateBucketRequest>(buf, [&](auto& req, auto& bucket) {
+ auto cmd = std::make_unique<api::CreateBucketCommand>(bucket);
+ cmd->setActive(req.create_as_active());
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeCreateBucketReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::CreateBucketResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::CreateBucketReply>(static_cast<const api::CreateBucketCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+// MergeBucket
+// -----------------------------------------------------------------
+
+namespace {
+
+void set_merge_nodes(::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& dest,
+ const std::vector<api::MergeBucketCommand::Node>& src)
+{
+ dest.Reserve(src.size());
+ for (const auto& src_node : src) {
+ auto* dest_node = dest.Add();
+ dest_node->set_index(src_node.index);
+ dest_node->set_source_only(src_node.sourceOnly);
+ }
+}
+
+std::vector<api::MergeBucketCommand::Node> get_merge_nodes(
+ const ::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& src)
+{
+ std::vector<api::MergeBucketCommand::Node> nodes;
+ nodes.reserve(src.size());
+ for (const auto& node : src) {
+ nodes.emplace_back(node.index(), node.source_only());
+ }
+ return nodes;
+}
+
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const {
+ encode_bucket_request<protobuf::MergeBucketRequest>(buf, msg, [&](auto& req) {
+ set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
+ req.set_max_timestamp(msg.getMaxTimestamp());
+ req.set_cluster_state_version(msg.getClusterStateVersion());
+ for (uint16_t chain_node : msg.getChain()) {
+ req.add_node_chain(chain_node);
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketReply& msg) const {
+ encode_bucket_response<protobuf::MergeBucketResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::MergeBucketRequest>(buf, [&](auto& req, auto& bucket) {
+ auto nodes = get_merge_nodes(req.nodes());
+ auto cmd = std::make_unique<api::MergeBucketCommand>(bucket, std::move(nodes), req.max_timestamp());
+ cmd->setClusterStateVersion(req.cluster_state_version());
+ std::vector<uint16_t> chain;
+ chain.reserve(req.node_chain_size());
+ for (uint16_t node : req.node_chain()) {
+ chain.emplace_back(node);
+ }
+ cmd->setChain(std::move(chain));
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeMergeBucketReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_response<protobuf::MergeBucketResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::MergeBucketReply>(static_cast<const api::MergeBucketCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+// GetBucketDiff
+// -----------------------------------------------------------------
+
+namespace {
+
+void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) {
+ static_assert(document::GlobalId::LENGTH == 12);
+ uint64_t lo64;
+ uint32_t hi32;
+ memcpy(&lo64, src.get(), sizeof(uint64_t));
+ memcpy(&hi32, src.get() + sizeof(uint64_t), sizeof(uint32_t));
+ dest.set_lo_64(lo64);
+ dest.set_hi_32(hi32);
+}
+
+document::GlobalId get_global_id(const protobuf::GlobalId& src) {
+ static_assert(document::GlobalId::LENGTH == 12);
+ const uint64_t lo64 = src.lo_64();
+ const uint32_t hi32 = src.hi_32();
+
+ char buf[document::GlobalId::LENGTH];
+ memcpy(buf, &lo64, sizeof(uint64_t));
+ memcpy(buf + sizeof(uint64_t), &hi32, sizeof(uint32_t));
+ return document::GlobalId(buf);
+}
+
+void set_diff_entry(protobuf::MetaDiffEntry& dest, const api::GetBucketDiffCommand::Entry& src) {
+ dest.set_timestamp(src._timestamp);
+ set_global_id(*dest.mutable_gid(), src._gid);
+ dest.set_header_size(src._headerSize);
+ dest.set_body_size(src._bodySize);
+ dest.set_flags(src._flags);
+ dest.set_presence_mask(src._hasMask);
+}
+
+api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& src) {
+ api::GetBucketDiffCommand::Entry e;
+ e._timestamp = src.timestamp();
+ e._gid = get_global_id(src.gid());
+ e._headerSize = src.header_size();
+ e._bodySize = src.body_size();
+ e._flags = src.flags();
+ e._hasMask = src.presence_mask();
+ return e;
+}
+
+void fill_proto_meta_diff(::google::protobuf::RepeatedPtrField<protobuf::MetaDiffEntry>& dest,
+ const std::vector<api::GetBucketDiffCommand::Entry>& src) {
+ for (const auto& diff_entry : src) {
+ set_diff_entry(*dest.Add(), diff_entry);
+ }
+}
+
+void fill_api_meta_diff(std::vector<api::GetBucketDiffCommand::Entry>& dest,
+ const ::google::protobuf::RepeatedPtrField<protobuf::MetaDiffEntry>& src) {
+ // FIXME GetBucketDiffReply ctor copies the diff from the request for some reason
+ // TODO verify this isn't actually used anywhere and remove this "feature".
+ dest.clear();
+ dest.reserve(src.size());
+ for (const auto& diff_entry : src) {
+ dest.emplace_back(get_diff_entry(diff_entry));
+ }
+}
+
+} // anonymous namespace
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffCommand& msg) const {
+ encode_bucket_request<protobuf::GetBucketDiffRequest>(buf, msg, [&](auto& req) {
+ set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
+ req.set_max_timestamp(msg.getMaxTimestamp());
+ fill_proto_meta_diff(*req.mutable_diff(), msg.getDiff());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffReply& msg) const {
+ encode_bucket_response<protobuf::GetBucketDiffResponse>(buf, msg, [&](auto& res) {
+ fill_proto_meta_diff(*res.mutable_diff(), msg.getDiff());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeGetBucketDiffCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::GetBucketDiffRequest>(buf, [&](auto& req, auto& bucket) {
+ auto nodes = get_merge_nodes(req.nodes());
+ auto cmd = std::make_unique<api::GetBucketDiffCommand>(bucket, std::move(nodes), req.max_timestamp());
+ fill_api_meta_diff(cmd->getDiff(), req.diff());
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeGetBucketDiffReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_response<protobuf::GetBucketDiffResponse>(buf, [&](auto& res) {
+ auto reply = std::make_unique<api::GetBucketDiffReply>(static_cast<const api::GetBucketDiffCommand&>(cmd));
+ fill_api_meta_diff(reply->getDiff(), res.diff());
+ return reply;
+ });
+}
+
+// -----------------------------------------------------------------
+// ApplyBucketDiff
+// -----------------------------------------------------------------
+
+namespace {
+
+void fill_api_apply_diff_vector(std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+ const ::google::protobuf::RepeatedPtrField<protobuf::ApplyDiffEntry>& src)
+{
+ // We use the same approach as the legacy protocols here in that we pre-reserve and
+ // directly write into the vector. This avoids having to ensure all buffer management is movable.
+ size_t n_entries = src.size();
+ diff.resize(n_entries);
+ for (size_t i = 0; i < n_entries; ++i) {
+ auto& proto_entry = src.Get(i);
+ auto& dest = diff[i];
+ dest._entry = get_diff_entry(proto_entry.entry_meta());
+ dest._docName = proto_entry.document_id();
+ // TODO consider making buffers std::strings instead to avoid explicit zeroing-on-resize overhead
+ dest._headerBlob.resize(proto_entry.header_blob().size());
+ memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size());
+ dest._bodyBlob.resize(proto_entry.body_blob().size());
+ memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size());
+ }
+}
+
+void fill_proto_apply_diff_vector(::google::protobuf::RepeatedPtrField<protobuf::ApplyDiffEntry>& dest,
+ const std::vector<api::ApplyBucketDiffCommand::Entry>& src)
+{
+ dest.Reserve(src.size());
+ for (const auto& entry : src) {
+ auto* proto_entry = dest.Add();
+ set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry);
+ proto_entry->set_document_id(entry._docName.data(), entry._docName.size());
+ proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size());
+ proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size());
+ }
+}
+
+} // anonymous namespace
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const {
+ encode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, msg, [&](auto& req) {
+ set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
+ req.set_max_buffer_size(msg.getMaxBufferSize());
+ fill_proto_apply_diff_vector(*req.mutable_entries(), msg.getDiff());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffReply& msg) const {
+ encode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, msg, [&](auto& res) {
+ fill_proto_apply_diff_vector(*res.mutable_entries(), msg.getDiff());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, [&](auto& req, auto& bucket) {
+ auto nodes = get_merge_nodes(req.nodes());
+ auto cmd = std::make_unique<api::ApplyBucketDiffCommand>(bucket, std::move(nodes), req.max_buffer_size());
+ fill_api_apply_diff_vector(cmd->getDiff(), req.entries());
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeApplyBucketDiffReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, [&](auto& res) {
+ auto reply = std::make_unique<api::ApplyBucketDiffReply>(static_cast<const api::ApplyBucketDiffCommand&>(cmd));
+ fill_api_apply_diff_vector(reply->getDiff(), res.entries());
+ return reply;
+ });
+}
+
+// -----------------------------------------------------------------
+// RequestBucketInfo
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const {
+ encode_request<protobuf::RequestBucketInfoRequest>(buf, msg, [&](auto& req) {
+ set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace());
+ auto& buckets = msg.getBuckets();
+ if (!buckets.empty()) {
+ auto* proto_buckets = req.mutable_explicit_bucket_set();
+ for (const auto& b : buckets) {
+ set_bucket_id(*proto_buckets->add_bucket_ids(), b);
+ }
+ } else {
+ auto* all_buckets = req.mutable_all_buckets();
+ auto cluster_state = msg.getSystemState().toString();
+ all_buckets->set_distributor_index(msg.getDistributor());
+ all_buckets->set_cluster_state(cluster_state.data(), cluster_state.size());
+ all_buckets->set_distribution_hash(msg.getDistributionHash().data(), msg.getDistributionHash().size());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoReply& msg) const {
+ encode_response<protobuf::RequestBucketInfoResponse>(buf, msg, [&](auto& res) {
+ auto* proto_info = res.mutable_bucket_infos();
+ proto_info->Reserve(msg.getBucketInfo().size());
+ for (const auto& entry : msg.getBucketInfo()) {
+ auto* bucket_and_info = proto_info->Add();
+ bucket_and_info->set_raw_bucket_id(entry._bucketId.getRawId());
+ set_bucket_info(*bucket_and_info->mutable_bucket_info(), entry._info);
+ }
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeRequestBucketInfoCommand(BBuf& buf) const {
+ return decode_request<protobuf::RequestBucketInfoRequest>(buf, [&](auto& req) {
+ auto bucket_space = get_bucket_space(req.bucket_space());
+ if (req.has_explicit_bucket_set()) {
+ const uint32_t n_buckets = req.explicit_bucket_set().bucket_ids_size();
+ std::vector<document::BucketId> buckets(n_buckets);
+ const auto& proto_buckets = req.explicit_bucket_set().bucket_ids();
+ for (uint32_t i = 0; i < n_buckets; ++i) {
+ buckets[i] = get_bucket_id(proto_buckets.Get(i));
+ }
+ return std::make_unique<api::RequestBucketInfoCommand>(bucket_space, std::move(buckets));
+ } else if (req.has_all_buckets()) {
+ const auto& all_req = req.all_buckets();
+ return std::make_unique<api::RequestBucketInfoCommand>(
+ bucket_space, all_req.distributor_index(),
+ lib::ClusterState(all_req.cluster_state()), all_req.distribution_hash());
+ } else {
+ throw vespalib::IllegalArgumentException("RequestBucketInfo does not have any applicable fields set");
+ }
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_response<protobuf::RequestBucketInfoResponse>(buf, [&](auto& res) {
+ auto reply = std::make_unique<api::RequestBucketInfoReply>(static_cast<const api::RequestBucketInfoCommand&>(cmd));
+ auto& dest_entries = reply->getBucketInfo();
+ uint32_t n_entries = res.bucket_infos_size();
+ dest_entries.resize(n_entries);
+ for (uint32_t i = 0; i < n_entries; ++i) {
+ const auto& proto_entry = res.bucket_infos(i);
+ dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id());
+ dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info());
+ }
+ return reply;
+ });
+}
+
+// -----------------------------------------------------------------
+// NotifyBucketChange
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeCommand& msg) const {
+ encode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, msg, [&](auto& req) {
+ set_bucket_info(*req.mutable_bucket_info(), msg.getBucketInfo());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeReply& msg) const {
+ encode_response<protobuf::NotifyBucketChangeResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeNotifyBucketChangeCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, [&](auto& req, auto& bucket) {
+ auto bucket_info = get_bucket_info(req.bucket_info());
+ return std::make_unique<api::NotifyBucketChangeCommand>(bucket, bucket_info);
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeNotifyBucketChangeReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_response<protobuf::NotifyBucketChangeResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::NotifyBucketChangeReply>(static_cast<const api::NotifyBucketChangeCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+// SplitBucket
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketCommand& msg) const {
+ encode_bucket_request<protobuf::SplitBucketRequest>(buf, msg, [&](auto& req) {
+ req.set_min_split_bits(msg.getMinSplitBits());
+ req.set_max_split_bits(msg.getMaxSplitBits());
+ req.set_min_byte_size(msg.getMinByteSize());
+ req.set_min_doc_count(msg.getMinDocCount());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketReply& msg) const {
+ encode_bucket_response<protobuf::SplitBucketResponse>(buf, msg, [&](auto& res) {
+ for (const auto& split_info : msg.getSplitInfo()) {
+ auto* proto_info = res.add_split_info();
+ proto_info->set_raw_bucket_id(split_info.first.getRawId());
+ set_bucket_info(*proto_info->mutable_bucket_info(), split_info.second);
+ }
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeSplitBucketCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::SplitBucketRequest>(buf, [&](auto& req, auto& bucket) {
+ auto cmd = std::make_unique<api::SplitBucketCommand>(bucket);
+ cmd->setMinSplitBits(static_cast<uint8_t>(req.min_split_bits()));
+ cmd->setMaxSplitBits(static_cast<uint8_t>(req.max_split_bits()));
+ cmd->setMinByteSize(req.min_byte_size());
+ cmd->setMinDocCount(req.min_doc_count());
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_response<protobuf::SplitBucketResponse>(buf, [&](auto& res) {
+ auto reply = std::make_unique<api::SplitBucketReply>(static_cast<const api::SplitBucketCommand&>(cmd));
+ auto& dest_info = reply->getSplitInfo();
+ dest_info.reserve(res.split_info_size());
+ for (const auto& proto_info : res.split_info()) {
+ dest_info.emplace_back(document::BucketId(proto_info.raw_bucket_id()),
+ get_bucket_info(proto_info.bucket_info()));
+ }
+ return reply;
+ });
+}
+
+// -----------------------------------------------------------------
+// JoinBuckets
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const {
+ encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](auto& req) {
+ for (const auto& source : msg.getSourceBuckets()) {
+ set_bucket_id(*req.add_source_buckets(), source);
+ }
+ req.set_min_join_bits(msg.getMinJoinBits());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsReply& msg) const {
+ encode_bucket_info_response<protobuf::JoinBucketsResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeJoinBucketsCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::JoinBucketsRequest>(buf, [&](auto& req, auto& bucket) {
+ auto cmd = std::make_unique<api::JoinBucketsCommand>(bucket);
+ auto& entries = cmd->getSourceBuckets();
+ for (const auto& proto_bucket : req.source_buckets()) {
+ entries.emplace_back(get_bucket_id(proto_bucket));
+ }
+ cmd->setMinJoinBits(static_cast<uint8_t>(req.min_join_bits()));
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeJoinBucketsReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::JoinBucketsResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::JoinBucketsReply>(static_cast<const api::JoinBucketsCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+// SetBucketState
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateCommand& msg) const {
+ encode_bucket_request<protobuf::SetBucketStateRequest>(buf, msg, [&](auto& req) {
+ auto state = (msg.getState() == api::SetBucketStateCommand::BUCKET_STATE::ACTIVE
+ ? protobuf::SetBucketStateRequest_BucketState_Active
+ : protobuf::SetBucketStateRequest_BucketState_Inactive);
+ req.set_state(state);
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateReply& msg) const {
+ // SetBucketStateReply is _technically_ a BucketInfoReply, but the legacy protocol impls
+ // do _not_ encode bucket info as part of the wire format (and it's not used on the distributor),
+ // so we follow that here and only encode remapping information.
+ encode_bucket_response<protobuf::SetBucketStateResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeSetBucketStateCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::SetBucketStateRequest>(buf, [&](auto& req, auto& bucket) {
+ auto state = (req.state() == protobuf::SetBucketStateRequest_BucketState_Active
+ ? api::SetBucketStateCommand::BUCKET_STATE::ACTIVE
+ : api::SetBucketStateCommand::BUCKET_STATE::INACTIVE);
+ return std::make_unique<api::SetBucketStateCommand>(bucket, state);
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeSetBucketStateReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_response<protobuf::SetBucketStateResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::SetBucketStateReply>(static_cast<const api::SetBucketStateCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+// CreateVisitor
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const {
+ encode_request<protobuf::CreateVisitorRequest>(buf, msg, [&](auto& req) {
+ set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace());
+ for (const auto& bucket : msg.getBuckets()) {
+ set_bucket_id(*req.add_buckets(), bucket);
+ }
+
+ auto* ctrl_meta = req.mutable_control_meta();
+ ctrl_meta->set_library_name(msg.getLibraryName().data(), msg.getLibraryName().size());
+ ctrl_meta->set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size());
+ ctrl_meta->set_visitor_command_id(msg.getVisitorCmdId());
+ ctrl_meta->set_control_destination(msg.getControlDestination().data(), msg.getControlDestination().size());
+ ctrl_meta->set_data_destination(msg.getDataDestination().data(), msg.getDataDestination().size());
+ ctrl_meta->set_queue_timeout(msg.getQueueTimeout());
+ ctrl_meta->set_max_pending_reply_count(msg.getMaximumPendingReplyCount());
+ ctrl_meta->set_max_buckets_per_visitor(msg.getMaxBucketsPerVisitor());
+
+ auto* constraints = req.mutable_constraints();
+ constraints->set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size());
+ constraints->set_from_time_usec(msg.getFromTime());
+ constraints->set_to_time_usec(msg.getToTime());
+ constraints->set_visit_inconsistent_buckets(msg.visitInconsistentBuckets());
+ constraints->set_visit_removes(msg.visitRemoves());
+ constraints->set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size());
+
+ for (const auto& param : msg.getParameters()) {
+ auto* proto_param = req.add_client_parameters();
+ proto_param->set_key(param.first.data(), param.first.size());
+ proto_param->set_value(param.second.data(), param.second.size());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorReply& msg) const {
+ encode_response<protobuf::CreateVisitorResponse>(buf, msg, [&](auto& res) {
+ auto& stats = msg.getVisitorStatistics();
+ auto* proto_stats = res.mutable_visitor_statistics();
+ proto_stats->set_buckets_visited(stats.getBucketsVisited());
+ proto_stats->set_documents_visited(stats.getDocumentsVisited());
+ proto_stats->set_bytes_visited(stats.getBytesVisited());
+ proto_stats->set_documents_returned(stats.getDocumentsReturned());
+ proto_stats->set_bytes_returned(stats.getBytesReturned());
+ proto_stats->set_second_pass_documents_returned(stats.getSecondPassDocumentsReturned());
+ proto_stats->set_second_pass_bytes_returned(stats.getSecondPassBytesReturned());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBuf& buf) const {
+ return decode_request<protobuf::CreateVisitorRequest>(buf, [&](auto& req) {
+ auto bucket_space = get_bucket_space(req.bucket_space());
+ auto& ctrl_meta = req.control_meta();
+ auto& constraints = req.constraints();
+ auto cmd = std::make_unique<api::CreateVisitorCommand>(bucket_space, ctrl_meta.library_name(),
+ ctrl_meta.instance_id(), constraints.document_selection());
+ for (const auto& proto_bucket : req.buckets()) {
+ cmd->getBuckets().emplace_back(get_bucket_id(proto_bucket));
+ }
+
+ cmd->setVisitorCmdId(ctrl_meta.visitor_command_id());
+ cmd->setControlDestination(ctrl_meta.control_destination());
+ cmd->setDataDestination(ctrl_meta.data_destination());
+ cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count());
+ cmd->setQueueTimeout(ctrl_meta.queue_timeout());
+ cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor());
+ cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol impl
+
+ for (const auto& proto_param : req.client_parameters()) {
+ cmd->getParameters().set(proto_param.key(), proto_param.value());
+ }
+
+ cmd->setFromTime(constraints.from_time_usec());
+ cmd->setToTime(constraints.to_time_usec());
+ cmd->setVisitRemoves(constraints.visit_removes());
+ cmd->setFieldSet(constraints.field_set());
+ cmd->setVisitInconsistentBuckets(constraints.visit_inconsistent_buckets());
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_response<protobuf::CreateVisitorResponse>(buf, [&](auto& res) {
+ auto reply = std::make_unique<api::CreateVisitorReply>(static_cast<const api::CreateVisitorCommand&>(cmd));
+ vdslib::VisitorStatistics vs;
+ const auto& proto_stats = res.visitor_statistics();
+ vs.setBucketsVisited(proto_stats.buckets_visited());
+ vs.setDocumentsVisited(proto_stats.documents_visited());
+ vs.setBytesVisited(proto_stats.bytes_visited());
+ vs.setDocumentsReturned(proto_stats.documents_returned());
+ vs.setBytesReturned(proto_stats.bytes_returned());
+ vs.setSecondPassDocumentsReturned(proto_stats.second_pass_documents_returned());
+ vs.setSecondPassBytesReturned(proto_stats.second_pass_bytes_returned());
+ reply->setVisitorStatistics(vs);
+ return reply;
+ });
+}
+
+// -----------------------------------------------------------------
+// DestroyVisitor
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorCommand& msg) const {
+ encode_request<protobuf::DestroyVisitorRequest>(buf, msg, [&](auto& req) {
+ req.set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorReply& msg) const {
+ encode_response<protobuf::DestroyVisitorResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeDestroyVisitorCommand(BBuf& buf) const {
+ return decode_request<protobuf::DestroyVisitorRequest>(buf, [&](auto& req) {
+ return std::make_unique<api::DestroyVisitorCommand>(req.instance_id());
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_response<protobuf::DestroyVisitorResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::DestroyVisitorReply>(static_cast<const api::DestroyVisitorCommand&>(cmd));
+ });
+}
+
+} // storage::mbusprot
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
new file mode 100644
index 00000000000..f3499150278
--- /dev/null
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
@@ -0,0 +1,146 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "protocolserialization.h"
+#include <vespa/documentapi/loadtypes/loadtypeset.h>
+
+namespace storage {
+namespace mbusprot {
+
+/**
+ * Protocol serialization version that uses Protocol Buffers for all its binary
+ * encoding and decoding.
+ */
+class ProtocolSerialization7 : public ProtocolSerialization {
+ const std::shared_ptr<const document::DocumentTypeRepo> _repo;
+ const documentapi::LoadTypeSet& _load_types;
+public:
+ ProtocolSerialization7(std::shared_ptr<const document::DocumentTypeRepo> repo,
+ const documentapi::LoadTypeSet& load_types);
+
+ const document::DocumentTypeRepo& type_repo() const { return *_repo; }
+
+ // Put
+ void onEncode(GBBuf&, const api::PutCommand&) const override;
+ void onEncode(GBBuf&, const api::PutReply&) const override;
+ SCmd::UP onDecodePutCommand(BBuf&) const override;
+ SRep::UP onDecodePutReply(const SCmd&, BBuf&) const override;
+
+ // Update
+ void onEncode(GBBuf&, const api::UpdateCommand&) const override;
+ void onEncode(GBBuf&, const api::UpdateReply&) const override;
+ SCmd::UP onDecodeUpdateCommand(BBuf&) const override;
+ SRep::UP onDecodeUpdateReply(const SCmd&, BBuf&) const override;
+
+ // Remove
+ void onEncode(GBBuf&, const api::RemoveCommand&) const override;
+ void onEncode(GBBuf&, const api::RemoveReply&) const override;
+ SCmd::UP onDecodeRemoveCommand(BBuf&) const override;
+ SRep::UP onDecodeRemoveReply(const SCmd&, BBuf&) const override;
+
+ // Get
+ void onEncode(GBBuf&, const api::GetCommand&) const override;
+ void onEncode(GBBuf&, const api::GetReply&) const override;
+ SCmd::UP onDecodeGetCommand(BBuf&) const override;
+ SRep::UP onDecodeGetReply(const SCmd&, BBuf&) const override;
+
+ // Revert - TODO this is deprecated, no?
+ void onEncode(GBBuf&, const api::RevertCommand&) const override;
+ void onEncode(GBBuf&, const api::RevertReply&) const override;
+ SCmd::UP onDecodeRevertCommand(BBuf&) const override;
+ SRep::UP onDecodeRevertReply(const SCmd&, BBuf&) const override;
+
+ // DeleteBucket
+ void onEncode(GBBuf&, const api::DeleteBucketCommand&) const override;
+ void onEncode(GBBuf&, const api::DeleteBucketReply&) const override;
+ SCmd::UP onDecodeDeleteBucketCommand(BBuf&) const override;
+ SRep::UP onDecodeDeleteBucketReply(const SCmd&, BBuf&) const override;
+
+ // CreateBucket
+ void onEncode(GBBuf&, const api::CreateBucketCommand&) const override;
+ void onEncode(GBBuf&, const api::CreateBucketReply&) const override;
+ SCmd::UP onDecodeCreateBucketCommand(BBuf&) const override;
+ SRep::UP onDecodeCreateBucketReply(const SCmd&, BBuf&) const override;
+
+ // MergeBucket
+ void onEncode(GBBuf&, const api::MergeBucketCommand&) const override;
+ void onEncode(GBBuf&, const api::MergeBucketReply&) const override;
+ SCmd::UP onDecodeMergeBucketCommand(BBuf&) const override;
+ SRep::UP onDecodeMergeBucketReply(const SCmd&, BBuf&) const override;
+
+ // GetBucketDiff
+ void onEncode(GBBuf&, const api::GetBucketDiffCommand&) const override;
+ void onEncode(GBBuf&, const api::GetBucketDiffReply&) const override;
+ SCmd::UP onDecodeGetBucketDiffCommand(BBuf&) const override;
+ SRep::UP onDecodeGetBucketDiffReply(const SCmd&, BBuf&) const override;
+
+ // ApplyBucketDiff
+ void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const override;
+ void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const override;
+ SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const override;
+ SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const override;
+
+ // RequestBucketInfo
+ void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const override;
+ void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const override;
+ SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const override;
+ SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const override;
+
+ // NotifyBucketChange
+ void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const override;
+ void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const override;
+ SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const override;
+ SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const override;
+
+ // SplitBucket
+ void onEncode(GBBuf&, const api::SplitBucketCommand&) const override;
+ void onEncode(GBBuf&, const api::SplitBucketReply&) const override;
+ SCmd::UP onDecodeSplitBucketCommand(BBuf&) const override;
+ SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const override;
+
+ // JoinBuckets
+ void onEncode(GBBuf&, const api::JoinBucketsCommand&) const override;
+ void onEncode(GBBuf&, const api::JoinBucketsReply&) const override;
+ SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const override;
+ SRep::UP onDecodeJoinBucketsReply(const SCmd&, BBuf&) const override;
+
+ // SetBucketState
+ void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override;
+ void onEncode(GBBuf&, const api::SetBucketStateReply&) const override;
+ SCmd::UP onDecodeSetBucketStateCommand(BBuf&) const override;
+ SRep::UP onDecodeSetBucketStateReply(const SCmd&, BBuf&) const override;
+
+ // CreateVisitor
+ void onEncode(GBBuf&, const api::CreateVisitorCommand&) const override;
+ void onEncode(GBBuf&, const api::CreateVisitorReply&) const override;
+ SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override;
+ SRep::UP onDecodeCreateVisitorReply(const SCmd&, BBuf&) const override;
+
+ // DestroyVisitor
+ void onEncode(GBBuf&, const api::DestroyVisitorCommand&) const override;
+ void onEncode(GBBuf&, const api::DestroyVisitorReply&) const override;
+ SCmd::UP onDecodeDestroyVisitorCommand(BBuf&) const override;
+ SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const override;
+
+ // RemoveLocation
+ void onEncode(GBBuf&, const api::RemoveLocationCommand&) const override;
+ void onEncode(GBBuf&, const api::RemoveLocationReply&) const override;
+ SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override;
+ SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override;
+
+private:
+ template <typename ProtobufType, typename Func>
+ std::unique_ptr<api::StorageCommand> decode_request(document::ByteBuffer& in_buf, Func&& f) const;
+ template <typename ProtobufType, typename Func>
+ std::unique_ptr<api::StorageReply> decode_response(document::ByteBuffer& in_buf, Func&& f) const;
+ template <typename ProtobufType, typename Func>
+ std::unique_ptr<api::StorageCommand> decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const;
+ template <typename ProtobufType, typename Func>
+ std::unique_ptr<api::StorageReply> decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const;
+ template <typename ProtobufType, typename Func>
+ std::unique_ptr<api::StorageReply> decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const;
+};
+
+}
+}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp
index 7e6be0a84f5..7bc6333762b 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp
@@ -20,7 +20,8 @@ StorageProtocol::StorageProtocol(const std::shared_ptr<const document::DocumentT
: _serializer5_0(repo, loadTypes),
_serializer5_1(repo, loadTypes),
_serializer5_2(repo, loadTypes),
- _serializer6_0(repo, loadTypes)
+ _serializer6_0(repo, loadTypes),
+ _serializer7_0(repo, loadTypes)
{
}
@@ -33,6 +34,7 @@ StorageProtocol::createPolicy(const mbus::string&, const mbus::string&) const
}
namespace {
+ vespalib::Version version7_0(7, 40, 5);
vespalib::Version version6_0(6, 240, 0);
vespalib::Version version5_2(5, 93, 30);
vespalib::Version version5_1(5, 1, 0);
@@ -106,8 +108,10 @@ StorageProtocol::encode(const vespalib::Version& version,
} else {
if (version < version6_0) {
return encodeMessage(_serializer5_2, routable, message, version5_2, version);
- } else {
+ } else if (version < version7_0) {
return encodeMessage(_serializer6_0, routable, message, version6_0, version);
+ } else {
+ return encodeMessage(_serializer7_0, routable, message, version7_0, version);
}
}
@@ -180,8 +184,10 @@ StorageProtocol::decode(const vespalib::Version & version,
} else {
if (version < version6_0) {
return decodeMessage(_serializer5_2, data, type, version5_2, version);
- } else {
+ } else if (version < version7_0) {
return decodeMessage(_serializer6_0, data, type, version6_0, version);
+ } else {
+ return decodeMessage(_serializer7_0, data, type, version7_0, version);
}
}
} catch (std::exception & e) {
diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
index 1acd7c9675f..67ea121c340 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h
@@ -3,6 +3,7 @@
#include "protocolserialization5_2.h"
#include "protocolserialization6_0.h"
+#include "protocolserialization7.h"
#include <vespa/messagebus/iprotocol.h>
namespace storage::mbusprot {
@@ -28,6 +29,7 @@ private:
ProtocolSerialization5_1 _serializer5_1;
ProtocolSerialization5_2 _serializer5_2;
ProtocolSerialization6_0 _serializer6_0;
+ ProtocolSerialization7 _serializer7_0;
};
}