diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-12 14:17:56 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-23 12:16:45 +0000 |
commit | 183ce7f7475a361e64913cd60685392d635c8b1a (patch) | |
tree | 2da75df5b39ef5f7c258ac24c200523a2da51172 /storageapi | |
parent | 979a2980aeaf89cc111f9dec74fa46cf191a8d8f (diff) |
Reapply protocol buffers for internal StorageAPI wire encoding
Diffstat (limited to 'storageapi')
21 files changed, 2304 insertions, 725 deletions
diff --git a/storageapi/src/tests/CMakeLists.txt b/storageapi/src/tests/CMakeLists.txt index ebbf3b8357a..ddc43c70004 100644 --- a/storageapi/src/tests/CMakeLists.txt +++ b/storageapi/src/tests/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_executable(storageapi_gtest_runner_app TEST gtest_runner.cpp DEPENDS storageapi_testbuckets + storageapi_testmbusprot storageapi gtest ) @@ -22,8 +23,8 @@ vespa_add_executable(storageapi_testrunner_app TEST testrunner.cpp DEPENDS storageapi_testmessageapi - storageapi_testmbusprot storageapi + vdstestlib ) vespa_add_test( diff --git a/storageapi/src/tests/mbusprot/CMakeLists.txt b/storageapi/src/tests/mbusprot/CMakeLists.txt index 16ced76155c..2801c9a91dd 100644 --- a/storageapi/src/tests/mbusprot/CMakeLists.txt +++ b/storageapi/src/tests/mbusprot/CMakeLists.txt @@ -4,5 +4,5 @@ vespa_add_library(storageapi_testmbusprot storageprotocoltest.cpp DEPENDS storageapi - vdstestlib + gtest ) diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index f634667afd5..8690d89e12b 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -14,12 +14,16 @@ #include <vespa/document/update/fieldpathupdates.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/document/test/make_bucket_space.h> -#include <vespa/vdstestlib/cppunit/macros.h> #include <vespa/vespalib/util/growablebytebuffer.h> #include <vespa/vespalib/objects/nbostream.h> + #include <iomanip> #include <sstream> +#include <gtest/gtest.h> + +using namespace ::testing; + using std::shared_ptr; using document::BucketSpace; using document::ByteBuffer; @@ -32,183 +36,103 @@ using document::test::makeBucketSpace; using storage::lib::ClusterState; using vespalib::string; -namespace storage { -namespace api { +namespace vespalib { + +// Needed for GTest to properly understand how to print Version values. +// If not present, it will print the byte values of the presumed memory area +// (which will be overrun for some reason, causing Valgrind to scream at us). +void PrintTo(const vespalib::Version& v, std::ostream* os) { + *os << v.toString(); +} + +} + +namespace storage::api { -struct StorageProtocolTest : public CppUnit::TestFixture { +struct StorageProtocolTest : TestWithParam<vespalib::Version> { document::TestDocMan _docMan; document::Document::SP _testDoc; document::DocumentId _testDocId; + document::BucketId _bucket_id; document::Bucket _bucket; - vespalib::Version _version5_0{5, 0, 12}; - vespalib::Version _version5_1{5, 1, 0}; - vespalib::Version _version5_2{5, 93, 30}; - vespalib::Version _version6_0{6, 240, 0}; + document::BucketId _dummy_remap_bucket{17, 12345}; + BucketInfo _dummy_bucket_info{1,2,3,4,5, true, false, 48}; documentapi::LoadTypeSet _loadTypes; mbusprot::StorageProtocol _protocol; - static std::vector<std::string> _nonVerboseMessageStrings; - static std::vector<std::string> _verboseMessageStrings; - static std::vector<char> _serialization50; static auto constexpr CONDITION_STRING = "There's just one condition"; StorageProtocolTest() : _docMan(), _testDoc(_docMan.createDocument()), _testDocId(_testDoc->getId()), - _bucket(makeDocumentBucket(document::BucketId(16, 0x51))), + _bucket_id(16, 0x51), + _bucket(makeDocumentBucket(_bucket_id)), _protocol(_docMan.getTypeRepoSP(), _loadTypes) { _loadTypes.addLoadType(34, "foo", documentapi::Priority::PRI_NORMAL_2); } + ~StorageProtocolTest(); + + void set_dummy_bucket_info_reply_fields(BucketInfoReply& reply) { + reply.setBucketInfo(_dummy_bucket_info); + reply.remapBucketId(_dummy_remap_bucket); + } + + void assert_bucket_info_reply_fields_propagated(const BucketInfoReply& reply) { + EXPECT_EQ(_dummy_bucket_info, reply.getBucketInfo()); + EXPECT_TRUE(reply.hasBeenRemapped()); + EXPECT_EQ(_dummy_remap_bucket, reply.getBucketId()); + EXPECT_EQ(_bucket_id, reply.getOriginalBucketId()); + } template<typename Command> - std::shared_ptr<Command> copyCommand(const std::shared_ptr<Command>&, vespalib::Version); + std::shared_ptr<Command> copyCommand(const std::shared_ptr<Command>&); template<typename Reply> std::shared_ptr<Reply> copyReply(const std::shared_ptr<Reply>&); - void recordOutput(const api::StorageMessage& msg); - - void recordSerialization50(); - - void testWriteSerialization50(); - void testAddress50(); - void testStringOutputs(); - - void testPut51(); - void testUpdate51(); - void testGet51(); - void testRemove51(); - void testRevert51(); - void testRequestBucketInfo51(); - void testNotifyBucketChange51(); - void testCreateBucket51(); - void testDeleteBucket51(); - void testMergeBucket51(); - void testGetBucketDiff51(); - void testApplyBucketDiff51(); - void testSplitBucket51(); - void testSplitBucketChain51(); - void testJoinBuckets51(); - void testCreateVisitor51(); - void testDestroyVisitor51(); - void testRemoveLocation51(); - void testInternalMessage(); - void testSetBucketState51(); - - void testPutCommand52(); - void testUpdateCommand52(); - void testRemoveCommand52(); - - void testPutCommandWithBucketSpace6_0(); - void testCreateVisitorWithBucketSpace6_0(); - void testRequestBucketInfoWithBucketSpace6_0(); - - void serialized_size_is_used_to_set_approx_size_of_storage_message(); - - CPPUNIT_TEST_SUITE(StorageProtocolTest); - - // Enable to see string outputs of messages - // CPPUNIT_TEST_DISABLED(testStringOutputs); - - // Enable this to write 5.0 serialization to disk - // CPPUNIT_TEST_DISABLED(testWriteSerialization50); - // CPPUNIT_TEST_DISABLED(testAddress50); - - // 5.1 tests - CPPUNIT_TEST(testPut51); - CPPUNIT_TEST(testUpdate51); - CPPUNIT_TEST(testGet51); - CPPUNIT_TEST(testRemove51); - CPPUNIT_TEST(testRevert51); - CPPUNIT_TEST(testRequestBucketInfo51); - CPPUNIT_TEST(testNotifyBucketChange51); - CPPUNIT_TEST(testCreateBucket51); - CPPUNIT_TEST(testDeleteBucket51); - CPPUNIT_TEST(testMergeBucket51); - CPPUNIT_TEST(testGetBucketDiff51); - CPPUNIT_TEST(testApplyBucketDiff51); - CPPUNIT_TEST(testSplitBucket51); - CPPUNIT_TEST(testJoinBuckets51); - CPPUNIT_TEST(testCreateVisitor51); - CPPUNIT_TEST(testDestroyVisitor51); - CPPUNIT_TEST(testRemoveLocation51); - CPPUNIT_TEST(testInternalMessage); - CPPUNIT_TEST(testSetBucketState51); - - // 5.2 tests - CPPUNIT_TEST(testPutCommand52); - CPPUNIT_TEST(testUpdateCommand52); - CPPUNIT_TEST(testRemoveCommand52); - - // 6.0 tests - CPPUNIT_TEST(testPutCommandWithBucketSpace6_0); - CPPUNIT_TEST(testCreateVisitorWithBucketSpace6_0); - CPPUNIT_TEST(testRequestBucketInfoWithBucketSpace6_0); - - CPPUNIT_TEST(serialized_size_is_used_to_set_approx_size_of_storage_message); - - CPPUNIT_TEST_SUITE_END(); }; -CPPUNIT_TEST_SUITE_REGISTRATION(StorageProtocolTest); - -std::vector<std::string> StorageProtocolTest::_nonVerboseMessageStrings; -std::vector<std::string> StorageProtocolTest::_verboseMessageStrings; -std::vector<char> StorageProtocolTest::_serialization50; - -void -StorageProtocolTest::recordOutput(const api::StorageMessage& msg) -{ - std::ostringstream ost; - ost << " "; - msg.print(ost, false, " "); - _nonVerboseMessageStrings.push_back(ost.str()); - ost.str(""); - ost << " "; - msg.print(ost, true, " "); - _verboseMessageStrings.push_back(ost.str()); -} +StorageProtocolTest::~StorageProtocolTest() = default; namespace { - bool debug = false; - - struct ScopedName { - std::string _name; +std::string version_as_gtest_string(TestParamInfo<vespalib::Version> info) { + std::ostringstream ss; + auto& p = info.param; + // Dots are not allowed in test names, so convert to underscores. + ss << p.getMajor() << '_' << p.getMinor() << '_' << p.getMicro(); + return ss.str(); +} - ScopedName(const std::string& s) : _name(s) { - if (debug) std::cerr << "Starting test " << _name << "\n"; - } - ~ScopedName() { - if (debug) std::cerr << "Finished test " << _name << "\n"; - } - }; +} -} // Anonymous namespace +// TODO replace with INSTANTIATE_TEST_SUITE_P on newer gtest versions +INSTANTIATE_TEST_CASE_P(MultiVersionTest, StorageProtocolTest, + Values(vespalib::Version(6, 240, 0), + vespalib::Version(7, 40, 5)), + version_as_gtest_string); namespace { mbus::Message::UP lastCommand; mbus::Reply::UP lastReply; } -void -StorageProtocolTest::testAddress50() -{ +TEST_F(StorageProtocolTest, testAddress50) { StorageMessageAddress address("foo", lib::NodeType::STORAGE, 3); - CPPUNIT_ASSERT_EQUAL(vespalib::string("storage/cluster.foo/storage/3/default"), + EXPECT_EQ(vespalib::string("storage/cluster.foo/storage/3/default"), address.getRoute().toString()); } template<typename Command> std::shared_ptr<Command> -StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m, vespalib::Version version) +StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m) { - mbus::Message::UP mbusMessage(new mbusprot::StorageCommand(m)); + auto mbusMessage = std::make_unique<mbusprot::StorageCommand>(m); + auto version = GetParam(); mbus::Blob blob = _protocol.encode(version, *mbusMessage); mbus::Routable::UP copy(_protocol.decode(version, blob)); + assert(copy.get()); - CPPUNIT_ASSERT(copy.get()); - - mbusprot::StorageCommand* copy2(dynamic_cast<mbusprot::StorageCommand*>(copy.get())); - CPPUNIT_ASSERT(copy2 != 0); + auto* copy2 = dynamic_cast<mbusprot::StorageCommand*>(copy.get()); + assert(copy2 != nullptr); StorageCommand::SP internalMessage(copy2->getCommand()); lastCommand = std::move(mbusMessage); @@ -219,80 +143,61 @@ StorageProtocolTest::copyCommand(const std::shared_ptr<Command>& m, vespalib::Ve template<typename Reply> std::shared_ptr<Reply> StorageProtocolTest::copyReply(const std::shared_ptr<Reply>& m) { - mbus::Reply::UP mbusMessage(new mbusprot::StorageReply(m)); - mbus::Blob blob = _protocol.encode(_version5_1, *mbusMessage); - mbus::Routable::UP copy(_protocol.decode(_version5_1, blob)); - CPPUNIT_ASSERT(copy.get()); - mbusprot::StorageReply* copy2( - dynamic_cast<mbusprot::StorageReply*>(copy.get())); - CPPUNIT_ASSERT(copy2 != 0); + auto mbusMessage = std::make_unique<mbusprot::StorageReply>(m); + auto version = GetParam(); + mbus::Blob blob = _protocol.encode(version, *mbusMessage); + mbus::Routable::UP copy(_protocol.decode(version, blob)); + assert(copy.get()); + + auto* copy2 = dynamic_cast<mbusprot::StorageReply*>(copy.get()); + assert(copy2 != nullptr); + copy2->setMessage(std::move(lastCommand)); - StorageReply::SP internalMessage(copy2->getReply()); + auto internalMessage = copy2->getReply(); lastReply = std::move(mbusMessage); lastCommand = copy2->getMessage(); return std::dynamic_pointer_cast<Reply>(internalMessage); } -void -StorageProtocolTest::recordSerialization50() -{ - assert(lastCommand.get()); - assert(lastReply.get()); - for (uint32_t j=0; j<2; ++j) { - mbusprot::StorageMessage& msg(j == 0 - ? dynamic_cast<mbusprot::StorageMessage&>(*lastCommand) - : dynamic_cast<mbusprot::StorageMessage&>(*lastReply)); - msg.getInternalMessage()->forceMsgId(0); - mbus::Routable& routable(j == 0 - ? dynamic_cast<mbus::Routable&>(*lastCommand) - : dynamic_cast<mbus::Routable&>(*lastReply)); - mbus::Blob blob = _protocol.encode(_version5_0, routable); - _serialization50.push_back('\n'); - std::string type(msg.getInternalMessage()->getType().toString()); - for (uint32_t i=0, n=type.size(); i<n; ++i) { - _serialization50.push_back(type[i]); - } - _serialization50.push_back('\n'); - - for (uint32_t i=0, n=blob.size(); i<n; ++i) { - _serialization50.push_back(blob.data()[i]); - } - } -} - -void -StorageProtocolTest::testPut51() -{ - ScopedName test("testPut51"); - PutCommand::SP cmd(new PutCommand(_bucket, _testDoc, 14)); +TEST_P(StorageProtocolTest, put) { + auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14); cmd->setUpdateTimestamp(Timestamp(13)); cmd->setLoadType(_loadTypes["foo"]); - PutCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(*_testDoc, *cmd2->getDocument()); - CPPUNIT_ASSERT_EQUAL(vespalib::string("foo"), cmd2->getLoadType().getName()); - CPPUNIT_ASSERT_EQUAL(Timestamp(14), cmd2->getTimestamp()); - CPPUNIT_ASSERT_EQUAL(Timestamp(13), cmd2->getUpdateTimestamp()); - - PutReply::SP reply(new PutReply(*cmd2)); - CPPUNIT_ASSERT(reply->hasDocument()); - CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply->getDocument()); - PutReply::SP reply2(copyReply(reply)); - CPPUNIT_ASSERT(reply2->hasDocument()); - CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply->getDocument()); - CPPUNIT_ASSERT_EQUAL(_testDoc->getId(), reply2->getDocumentId()); - CPPUNIT_ASSERT_EQUAL(Timestamp(14), reply2->getTimestamp()); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); -} - -void -StorageProtocolTest::testUpdate51() -{ - ScopedName test("testUpdate51"); - document::DocumentUpdate::SP update(new document::DocumentUpdate(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId())); - std::shared_ptr<document::AssignValueUpdate> assignUpdate(new document::AssignValueUpdate(document::IntFieldValue(17))); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_EQ(*_testDoc, *cmd2->getDocument()); + EXPECT_EQ(vespalib::string("foo"), cmd2->getLoadType().getName()); + EXPECT_EQ(Timestamp(14), cmd2->getTimestamp()); + EXPECT_EQ(Timestamp(13), cmd2->getUpdateTimestamp()); + + auto reply = std::make_shared<PutReply>(*cmd2); + ASSERT_TRUE(reply->hasDocument()); + EXPECT_EQ(*_testDoc, *reply->getDocument()); + set_dummy_bucket_info_reply_fields(*reply); + auto reply2 = copyReply(reply); + ASSERT_TRUE(reply2->hasDocument()); + EXPECT_EQ(*_testDoc, *reply->getDocument()); + EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId()); + EXPECT_EQ(Timestamp(14), reply2->getTimestamp()); + EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); +} + +TEST_P(StorageProtocolTest, response_without_remapped_bucket_preserves_original_bucket) { + auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14); + auto cmd2 = copyCommand(cmd); + auto reply = std::make_shared<PutReply>(*cmd2); + auto reply2 = copyReply(reply); + + EXPECT_FALSE(reply2->hasBeenRemapped()); + EXPECT_EQ(_bucket_id, reply2->getBucketId()); + EXPECT_EQ(document::BucketId(), reply2->getOriginalBucketId()); + +} + +TEST_P(StorageProtocolTest, update) { + auto update = std::make_shared<document::DocumentUpdate>( + _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()); + auto assignUpdate = std::make_shared<document::AssignValueUpdate>(document::IntFieldValue(17)); document::FieldUpdate fieldUpdate(_testDoc->getField("headerval")); fieldUpdate.addUpdate(*assignUpdate); update->addUpdate(fieldUpdate); @@ -300,217 +205,157 @@ StorageProtocolTest::testUpdate51() update->addFieldPathUpdate(document::FieldPathUpdate::CP( new document::RemoveFieldPathUpdate("headerval", "testdoctype1.headerval > 0"))); - UpdateCommand::SP cmd(new UpdateCommand(_bucket, update, 14)); - CPPUNIT_ASSERT_EQUAL(Timestamp(0), cmd->getOldTimestamp()); + auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14); + EXPECT_EQ(Timestamp(0), cmd->getOldTimestamp()); cmd->setOldTimestamp(10); - UpdateCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId()); - CPPUNIT_ASSERT_EQUAL(Timestamp(14), cmd2->getTimestamp()); - CPPUNIT_ASSERT_EQUAL(Timestamp(10), cmd2->getOldTimestamp()); - CPPUNIT_ASSERT_EQUAL(*update, *cmd2->getUpdate()); - - UpdateReply::SP reply(new UpdateReply(*cmd2, 8)); - UpdateReply::SP reply2(copyReply(reply)); - CPPUNIT_ASSERT_EQUAL(_testDocId, reply2->getDocumentId()); - CPPUNIT_ASSERT_EQUAL(Timestamp(14), reply2->getTimestamp()); - CPPUNIT_ASSERT_EQUAL(Timestamp(8), reply->getOldTimestamp()); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); -} - -void -StorageProtocolTest::testGet51() -{ - ScopedName test("testGet51"); - GetCommand::SP cmd(new GetCommand(_bucket, _testDocId, "foo,bar,vekterli", 123)); - GetCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId()); - CPPUNIT_ASSERT_EQUAL(Timestamp(123), cmd2->getBeforeTimestamp()); - CPPUNIT_ASSERT_EQUAL(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet()); - - GetReply::SP reply(new GetReply(*cmd2, _testDoc, 100)); - GetReply::SP reply2(copyReply(reply)); - CPPUNIT_ASSERT(reply2.get()); - CPPUNIT_ASSERT(reply2->getDocument().get()); - CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply2->getDocument()); - CPPUNIT_ASSERT_EQUAL(_testDoc->getId(), reply2->getDocumentId()); - CPPUNIT_ASSERT_EQUAL(Timestamp(123), reply2->getBeforeTimestamp()); - CPPUNIT_ASSERT_EQUAL(Timestamp(100), reply2->getLastModifiedTimestamp()); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); -} - -void -StorageProtocolTest::testRemove51() -{ - ScopedName test("testRemove51"); - RemoveCommand::SP cmd(new RemoveCommand(_bucket, _testDocId, 159)); - RemoveCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId()); - CPPUNIT_ASSERT_EQUAL(Timestamp(159), cmd2->getTimestamp()); - - RemoveReply::SP reply(new RemoveReply(*cmd2, 48)); - reply->setBucketInfo(BucketInfo(1,2,3,4,5, true, false, 48)); - - RemoveReply::SP reply2(copyReply(reply)); - CPPUNIT_ASSERT_EQUAL(_testDocId, reply2->getDocumentId()); - CPPUNIT_ASSERT_EQUAL(Timestamp(159), reply2->getTimestamp()); - CPPUNIT_ASSERT_EQUAL(Timestamp(48), reply2->getOldTimestamp()); - CPPUNIT_ASSERT_EQUAL(BucketInfo(1,2,3,4,5, true, false, 48), - reply2->getBucketInfo()); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); -} - -void -StorageProtocolTest::testRevert51() -{ - ScopedName test("testRevertCommand51"); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_EQ(_testDocId, cmd2->getDocumentId()); + EXPECT_EQ(Timestamp(14), cmd2->getTimestamp()); + EXPECT_EQ(Timestamp(10), cmd2->getOldTimestamp()); + EXPECT_EQ(*update, *cmd2->getUpdate()); + + auto reply = std::make_shared<UpdateReply>(*cmd2, 8); + set_dummy_bucket_info_reply_fields(*reply); + auto reply2 = copyReply(reply); + EXPECT_EQ(_testDocId, reply2->getDocumentId()); + EXPECT_EQ(Timestamp(14), reply2->getTimestamp()); + EXPECT_EQ(Timestamp(8), reply->getOldTimestamp()); + EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); +} + +TEST_P(StorageProtocolTest, get) { + auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_EQ(_testDocId, cmd2->getDocumentId()); + EXPECT_EQ(Timestamp(123), cmd2->getBeforeTimestamp()); + EXPECT_EQ(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet()); + + auto reply = std::make_shared<GetReply>(*cmd2, _testDoc, 100); + set_dummy_bucket_info_reply_fields(*reply); + auto reply2 = copyReply(reply); + ASSERT_TRUE(reply2.get() != nullptr); + ASSERT_TRUE(reply2->getDocument().get() != nullptr); + EXPECT_EQ(*_testDoc, *reply2->getDocument()); + EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId()); + EXPECT_EQ(Timestamp(123), reply2->getBeforeTimestamp()); + EXPECT_EQ(Timestamp(100), reply2->getLastModifiedTimestamp()); + EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); +} + +TEST_P(StorageProtocolTest, remove) { + auto cmd = std::make_shared<RemoveCommand>(_bucket, _testDocId, 159); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_EQ(_testDocId, cmd2->getDocumentId()); + EXPECT_EQ(Timestamp(159), cmd2->getTimestamp()); + + auto reply = std::make_shared<RemoveReply>(*cmd2, 48); + set_dummy_bucket_info_reply_fields(*reply); + + auto reply2 = copyReply(reply); + EXPECT_EQ(_testDocId, reply2->getDocumentId()); + EXPECT_EQ(Timestamp(159), reply2->getTimestamp()); + EXPECT_EQ(Timestamp(48), reply2->getOldTimestamp()); + EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); +} + +TEST_P(StorageProtocolTest, revert) { std::vector<Timestamp> tokens; tokens.push_back(59); - RevertCommand::SP cmd(new RevertCommand(_bucket, tokens)); - RevertCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(tokens, cmd2->getRevertTokens()); - - RevertReply::SP reply(new RevertReply(*cmd2)); - BucketInfo info(0x12345432, 101, 520); - reply->setBucketInfo(info); - RevertReply::SP reply2(copyReply(reply)); + auto cmd = std::make_shared<RevertCommand>(_bucket, tokens); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_EQ(tokens, cmd2->getRevertTokens()); - CPPUNIT_ASSERT_EQUAL(info, reply2->getBucketInfo()); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); + auto reply = std::make_shared<RevertReply>(*cmd2); + set_dummy_bucket_info_reply_fields(*reply); + auto reply2 = copyReply(reply); + EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); } -void -StorageProtocolTest::testRequestBucketInfo51() -{ - ScopedName test("testRequestBucketInfo51"); +TEST_P(StorageProtocolTest, request_bucket_info) { { std::vector<document::BucketId> ids; ids.push_back(document::BucketId(3)); ids.push_back(document::BucketId(7)); - RequestBucketInfoCommand::SP cmd(new RequestBucketInfoCommand(makeBucketSpace(), ids)); - RequestBucketInfoCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(ids, cmd2->getBuckets()); - CPPUNIT_ASSERT(!cmd2->hasSystemState()); - - recordOutput(*cmd2); + auto cmd = std::make_shared<RequestBucketInfoCommand>(makeBucketSpace(), ids); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(ids, cmd2->getBuckets()); + EXPECT_FALSE(cmd2->hasSystemState()); } { ClusterState state("distributor:3 .1.s:d"); - RequestBucketInfoCommand::SP cmd(new RequestBucketInfoCommand( - makeBucketSpace(), - 3, state, "14")); - RequestBucketInfoCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT(cmd2->hasSystemState()); - CPPUNIT_ASSERT_EQUAL(uint16_t(3), cmd2->getDistributor()); - CPPUNIT_ASSERT_EQUAL(state, cmd2->getSystemState()); - CPPUNIT_ASSERT_EQUAL(size_t(0), cmd2->getBuckets().size()); - - RequestBucketInfoReply::SP reply(new RequestBucketInfoReply(*cmd)); + auto cmd = std::make_shared<RequestBucketInfoCommand>(makeBucketSpace(), 3, state, "14"); + auto cmd2 = copyCommand(cmd); + ASSERT_TRUE(cmd2->hasSystemState()); + EXPECT_EQ(uint16_t(3), cmd2->getDistributor()); + EXPECT_EQ(state, cmd2->getSystemState()); + EXPECT_EQ(size_t(0), cmd2->getBuckets().size()); + + auto reply = std::make_shared<RequestBucketInfoReply>(*cmd); RequestBucketInfoReply::Entry e; e._bucketId = document::BucketId(4); const uint64_t lastMod = 0x1337cafe98765432ULL; e._info = BucketInfo(43, 24, 123, 44, 124, false, true, lastMod); reply->getBucketInfo().push_back(e); - RequestBucketInfoReply::SP reply2(copyReply(reply)); - CPPUNIT_ASSERT_EQUAL(size_t(1), reply2->getBucketInfo().size()); + auto reply2 = copyReply(reply); + EXPECT_EQ(size_t(1), reply2->getBucketInfo().size()); auto& entries(reply2->getBucketInfo()); - CPPUNIT_ASSERT_EQUAL(e, entries[0]); + EXPECT_EQ(e, entries[0]); // "Last modified" not counted by operator== for some reason. Testing // separately until we can figure out if this is by design or not. - CPPUNIT_ASSERT_EQUAL(lastMod, entries[0]._info.getLastModified()); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); + EXPECT_EQ(lastMod, entries[0]._info.getLastModified()); } } -void -StorageProtocolTest::testNotifyBucketChange51() -{ - ScopedName test("testNotifyBucketChange51"); - BucketInfo info(2, 3, 4); - document::BucketId modifiedBucketId(20, 1000); - document::Bucket modifiedBucket(makeDocumentBucket(modifiedBucketId)); - NotifyBucketChangeCommand::SP cmd(new NotifyBucketChangeCommand( - modifiedBucket, info)); - NotifyBucketChangeCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(document::BucketId(20, 1000), - cmd2->getBucketId()); - CPPUNIT_ASSERT_EQUAL(info, cmd2->getBucketInfo()); - - NotifyBucketChangeReply::SP reply(new NotifyBucketChangeReply(*cmd)); - NotifyBucketChangeReply::SP reply2(copyReply(reply)); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); -} - -void -StorageProtocolTest::testCreateBucket51() -{ - ScopedName test("testCreateBucket51"); - document::BucketId bucketId(623); - document::Bucket bucket(makeDocumentBucket(bucketId)); +TEST_P(StorageProtocolTest, notify_bucket_change) { + auto cmd = std::make_shared<NotifyBucketChangeCommand>(_bucket, _dummy_bucket_info); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_EQ(_dummy_bucket_info, cmd2->getBucketInfo()); - CreateBucketCommand::SP cmd(new CreateBucketCommand(bucket)); - CreateBucketCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId()); + auto reply = std::make_shared<NotifyBucketChangeReply>(*cmd); + auto reply2 = copyReply(reply); +} - CreateBucketReply::SP reply(new CreateBucketReply(*cmd)); - CreateBucketReply::SP reply2(copyReply(reply)); - CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); +TEST_P(StorageProtocolTest, create_bucket_without_activation) { + auto cmd = std::make_shared<CreateBucketCommand>(_bucket); + EXPECT_FALSE(cmd->getActive()); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_FALSE(cmd2->getActive()); - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); + auto reply = std::make_shared<CreateBucketReply>(*cmd); + set_dummy_bucket_info_reply_fields(*reply); + auto reply2 = copyReply(reply); + EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); } -void -StorageProtocolTest::testDeleteBucket51() -{ - ScopedName test("testDeleteBucket51"); - document::BucketId bucketId(623); - document::Bucket bucket(makeDocumentBucket(bucketId)); - - DeleteBucketCommand::SP cmd(new DeleteBucketCommand(bucket)); - BucketInfo info(0x100, 200, 300); - cmd->setBucketInfo(info); - DeleteBucketCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId()); - CPPUNIT_ASSERT_EQUAL(info, cmd2->getBucketInfo()); - - DeleteBucketReply::SP reply(new DeleteBucketReply(*cmd)); +TEST_P(StorageProtocolTest, create_bucket_propagates_activation_flag) { + auto cmd = std::make_shared<CreateBucketCommand>(_bucket); + cmd->setActive(true); + auto cmd2 = copyCommand(cmd); + EXPECT_TRUE(cmd2->getActive()); +} + +TEST_P(StorageProtocolTest, delete_bucket) { + auto cmd = std::make_shared<DeleteBucketCommand>(_bucket); + cmd->setBucketInfo(_dummy_bucket_info); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_EQ(_dummy_bucket_info, cmd2->getBucketInfo()); + + auto reply = std::make_shared<DeleteBucketReply>(*cmd); // Not set automatically by constructor reply->setBucketInfo(cmd2->getBucketInfo()); - DeleteBucketReply::SP reply2(copyReply(reply)); - CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); - CPPUNIT_ASSERT_EQUAL(info, reply2->getBucketInfo()); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); + auto reply2 = copyReply(reply); + EXPECT_EQ(_bucket_id, reply2->getBucketId()); + EXPECT_EQ(_dummy_bucket_info, reply2->getBucketInfo()); } -void -StorageProtocolTest::testMergeBucket51() -{ - ScopedName test("testMergeBucket51"); - document::BucketId bucketId(623); - document::Bucket bucket(makeDocumentBucket(bucketId)); - +TEST_P(StorageProtocolTest, merge_bucket) { typedef api::MergeBucketCommand::Node Node; std::vector<Node> nodes; nodes.push_back(Node(4, false)); @@ -522,152 +367,98 @@ StorageProtocolTest::testMergeBucket51() chain.push_back(7); chain.push_back(14); - MergeBucketCommand::SP cmd( - new MergeBucketCommand(bucket, nodes, Timestamp(1234), 567, chain)); - MergeBucketCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId()); - CPPUNIT_ASSERT_EQUAL(nodes, cmd2->getNodes()); - CPPUNIT_ASSERT_EQUAL(Timestamp(1234), cmd2->getMaxTimestamp()); - CPPUNIT_ASSERT_EQUAL(uint32_t(567), cmd2->getClusterStateVersion()); - CPPUNIT_ASSERT_EQUAL(chain, cmd2->getChain()); - - MergeBucketReply::SP reply(new MergeBucketReply(*cmd)); - MergeBucketReply::SP reply2(copyReply(reply)); - CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); - CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes()); - CPPUNIT_ASSERT_EQUAL(Timestamp(1234), reply2->getMaxTimestamp()); - CPPUNIT_ASSERT_EQUAL(uint32_t(567), reply2->getClusterStateVersion()); - CPPUNIT_ASSERT_EQUAL(chain, reply2->getChain()); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); -} - -void -StorageProtocolTest::testSplitBucket51() -{ - ScopedName test("testSplitBucket51"); - - document::BucketId bucketId(16, 0); - document::Bucket bucket(makeDocumentBucket(bucketId)); - SplitBucketCommand::SP cmd(new SplitBucketCommand(bucket)); - CPPUNIT_ASSERT_EQUAL(0u, (uint32_t) cmd->getMinSplitBits()); - CPPUNIT_ASSERT_EQUAL(58u, (uint32_t) cmd->getMaxSplitBits()); - CPPUNIT_ASSERT_EQUAL(std::numeric_limits<uint32_t>().max(), - cmd->getMinByteSize()); - CPPUNIT_ASSERT_EQUAL(std::numeric_limits<uint32_t>().max(), - cmd->getMinDocCount()); + auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_EQ(nodes, cmd2->getNodes()); + EXPECT_EQ(Timestamp(1234), cmd2->getMaxTimestamp()); + EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion()); + EXPECT_EQ(chain, cmd2->getChain()); + + auto reply = std::make_shared<MergeBucketReply>(*cmd); + auto reply2 = copyReply(reply); + EXPECT_EQ(_bucket_id, reply2->getBucketId()); + EXPECT_EQ(nodes, reply2->getNodes()); + EXPECT_EQ(Timestamp(1234), reply2->getMaxTimestamp()); + EXPECT_EQ(uint32_t(567), reply2->getClusterStateVersion()); + EXPECT_EQ(chain, reply2->getChain()); +} + +TEST_P(StorageProtocolTest, split_bucket) { + auto cmd = std::make_shared<SplitBucketCommand>(_bucket); + EXPECT_EQ(0u, cmd->getMinSplitBits()); + EXPECT_EQ(58u, cmd->getMaxSplitBits()); + EXPECT_EQ(std::numeric_limits<uint32_t>().max(), cmd->getMinByteSize()); + EXPECT_EQ(std::numeric_limits<uint32_t>().max(), cmd->getMinDocCount()); cmd->setMinByteSize(1000); cmd->setMinDocCount(5); cmd->setMaxSplitBits(40); cmd->setMinSplitBits(20); - SplitBucketCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(20u, (uint32_t) cmd2->getMinSplitBits()); - CPPUNIT_ASSERT_EQUAL(40u, (uint32_t) cmd2->getMaxSplitBits()); - CPPUNIT_ASSERT_EQUAL(1000u, cmd2->getMinByteSize()); - CPPUNIT_ASSERT_EQUAL(5u, cmd2->getMinDocCount()); - - SplitBucketReply::SP reply(new SplitBucketReply(*cmd2)); - reply->getSplitInfo().push_back(SplitBucketReply::Entry( - document::BucketId(17, 0), BucketInfo(100, 1000, 10000, true, true))); - reply->getSplitInfo().push_back(SplitBucketReply::Entry( - document::BucketId(17, 1), BucketInfo(101, 1001, 10001, true, true))); - SplitBucketReply::SP reply2(copyReply(reply)); - - CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); - CPPUNIT_ASSERT_EQUAL(size_t(2), reply2->getSplitInfo().size()); - CPPUNIT_ASSERT_EQUAL(document::BucketId(17, 0), - reply2->getSplitInfo()[0].first); - CPPUNIT_ASSERT_EQUAL(document::BucketId(17, 1), - reply2->getSplitInfo()[1].first); - CPPUNIT_ASSERT_EQUAL(BucketInfo(100, 1000, 10000, true, true), - reply2->getSplitInfo()[0].second); - CPPUNIT_ASSERT_EQUAL(BucketInfo(101, 1001, 10001, true, true), - reply2->getSplitInfo()[1].second); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); -} - -void -StorageProtocolTest::testJoinBuckets51() -{ - ScopedName test("testJoinBuckets51"); - document::BucketId bucketId(16, 0); - document::Bucket bucket(makeDocumentBucket(bucketId)); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); + EXPECT_EQ(20u, cmd2->getMinSplitBits()); + EXPECT_EQ(40u, cmd2->getMaxSplitBits()); + EXPECT_EQ(1000u, cmd2->getMinByteSize()); + EXPECT_EQ(5u, cmd2->getMinDocCount()); + + auto reply = std::make_shared<SplitBucketReply>(*cmd2); + reply->getSplitInfo().emplace_back(document::BucketId(17, 0), BucketInfo(100, 1000, 10000, true, true)); + reply->getSplitInfo().emplace_back(document::BucketId(17, 1), BucketInfo(101, 1001, 10001, true, true)); + auto reply2 = copyReply(reply); + + EXPECT_EQ(_bucket, reply2->getBucket()); + EXPECT_EQ(size_t(2), reply2->getSplitInfo().size()); + EXPECT_EQ(document::BucketId(17, 0), reply2->getSplitInfo()[0].first); + EXPECT_EQ(document::BucketId(17, 1), reply2->getSplitInfo()[1].first); + EXPECT_EQ(BucketInfo(100, 1000, 10000, true, true), reply2->getSplitInfo()[0].second); + EXPECT_EQ(BucketInfo(101, 1001, 10001, true, true), reply2->getSplitInfo()[1].second); +} + +TEST_P(StorageProtocolTest, join_buckets) { std::vector<document::BucketId> sources; sources.push_back(document::BucketId(17, 0)); sources.push_back(document::BucketId(17, 1)); - JoinBucketsCommand::SP cmd(new JoinBucketsCommand(bucket)); + auto cmd = std::make_shared<JoinBucketsCommand>(_bucket); cmd->getSourceBuckets() = sources; cmd->setMinJoinBits(3); - JoinBucketsCommand::SP cmd2(copyCommand(cmd, _version5_1)); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); - JoinBucketsReply::SP reply(new JoinBucketsReply(*cmd2)); + auto reply = std::make_shared<JoinBucketsReply>(*cmd2); reply->setBucketInfo(BucketInfo(3,4,5)); - JoinBucketsReply::SP reply2(copyReply(reply)); + auto reply2 = copyReply(reply); - CPPUNIT_ASSERT_EQUAL(sources, reply2->getSourceBuckets()); - CPPUNIT_ASSERT_EQUAL(3, (int)cmd2->getMinJoinBits()); - CPPUNIT_ASSERT_EQUAL(BucketInfo(3,4,5), reply2->getBucketInfo()); - CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); - - recordOutput(*cmd2); - recordOutput(*reply2); + EXPECT_EQ(sources, reply2->getSourceBuckets()); + EXPECT_EQ(3, cmd2->getMinJoinBits()); + EXPECT_EQ(BucketInfo(3,4,5), reply2->getBucketInfo()); + EXPECT_EQ(_bucket, reply2->getBucket()); } -void -StorageProtocolTest::testDestroyVisitor51() -{ - ScopedName test("testDestroyVisitor51"); +TEST_P(StorageProtocolTest, destroy_visitor) { + auto cmd = std::make_shared<DestroyVisitorCommand>("instance"); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ("instance", cmd2->getInstanceId()); - DestroyVisitorCommand::SP cmd( - new DestroyVisitorCommand("instance")); - DestroyVisitorCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(string("instance"), cmd2->getInstanceId()); - - DestroyVisitorReply::SP reply(new DestroyVisitorReply(*cmd2)); - DestroyVisitorReply::SP reply2(copyReply(reply)); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); + auto reply = std::make_shared<DestroyVisitorReply>(*cmd2); + auto reply2 = copyReply(reply); } -void -StorageProtocolTest::testRemoveLocation51() -{ - ScopedName test("testRemoveLocation51"); - document::BucketId bucketId(16, 1234); - document::Bucket bucket(makeDocumentBucket(bucketId)); - - RemoveLocationCommand::SP cmd( - new RemoveLocationCommand("id.group == \"mygroup\"", bucket)); - RemoveLocationCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(vespalib::string("id.group == \"mygroup\""), cmd2->getDocumentSelection()); - CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId()); - - RemoveLocationReply::SP reply(new RemoveLocationReply(*cmd2)); - RemoveLocationReply::SP reply2(copyReply(reply)); +TEST_P(StorageProtocolTest, remove_location) { + auto cmd = std::make_shared<RemoveLocationCommand>("id.group == \"mygroup\"", _bucket); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection()); + EXPECT_EQ(_bucket, cmd2->getBucket()); - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); + auto reply = std::make_shared<RemoveLocationReply>(*cmd2); + auto reply2 = copyReply(reply); } -void -StorageProtocolTest::testCreateVisitor51() -{ - ScopedName test("testCreateVisitor51"); - +TEST_P(StorageProtocolTest, create_visitor) { std::vector<document::BucketId> buckets; buckets.push_back(document::BucketId(16, 1)); buckets.push_back(document::BucketId(16, 2)); - CreateVisitorCommand::SP cmd( - new CreateVisitorCommand(makeBucketSpace(), "library", "id", "doc selection")); + auto cmd = std::make_shared<CreateVisitorCommand>(makeBucketSpace(), "library", "id", "doc selection"); cmd->setControlDestination("controldest"); cmd->setDataDestination("datadest"); cmd->setVisitorCmdId(1); @@ -681,40 +472,26 @@ StorageProtocolTest::testCreateVisitor51() cmd->setFieldSet("foo,bar,vekterli"); cmd->setVisitInconsistentBuckets(); cmd->setQueueTimeout(100); - cmd->setVisitorOrdering(document::OrderingSpecification::DESCENDING); cmd->setPriority(149); - CreateVisitorCommand::SP cmd2(copyCommand(cmd, _version5_1)); - CPPUNIT_ASSERT_EQUAL(string("library"), cmd2->getLibraryName()); - CPPUNIT_ASSERT_EQUAL(string("id"), cmd2->getInstanceId()); - CPPUNIT_ASSERT_EQUAL(string("doc selection"), - cmd2->getDocumentSelection()); - CPPUNIT_ASSERT_EQUAL(string("controldest"), - cmd2->getControlDestination()); - CPPUNIT_ASSERT_EQUAL(string("datadest"), cmd2->getDataDestination()); - CPPUNIT_ASSERT_EQUAL(api::Timestamp(123), cmd2->getFromTime()); - CPPUNIT_ASSERT_EQUAL(api::Timestamp(456), cmd2->getToTime()); - CPPUNIT_ASSERT_EQUAL(2u, cmd2->getMaximumPendingReplyCount()); - CPPUNIT_ASSERT_EQUAL(buckets, cmd2->getBuckets()); - CPPUNIT_ASSERT_EQUAL(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet()); - CPPUNIT_ASSERT(cmd2->visitInconsistentBuckets()); - CPPUNIT_ASSERT_EQUAL(document::OrderingSpecification::DESCENDING, cmd2->getVisitorOrdering()); - CPPUNIT_ASSERT_EQUAL(149, (int)cmd2->getPriority()); - - CreateVisitorReply::SP reply(new CreateVisitorReply(*cmd2)); - CreateVisitorReply::SP reply2(copyReply(reply)); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); -} - -void -StorageProtocolTest::testGetBucketDiff51() -{ - ScopedName test("testGetBucketDiff51"); - document::BucketId bucketId(623); - document::Bucket bucket(makeDocumentBucket(bucketId)); - + auto cmd2 = copyCommand(cmd); + EXPECT_EQ("library", cmd2->getLibraryName()); + EXPECT_EQ("id", cmd2->getInstanceId()); + EXPECT_EQ("doc selection", cmd2->getDocumentSelection()); + EXPECT_EQ("controldest", cmd2->getControlDestination()); + EXPECT_EQ("datadest", cmd2->getDataDestination()); + EXPECT_EQ(api::Timestamp(123), cmd2->getFromTime()); + EXPECT_EQ(api::Timestamp(456), cmd2->getToTime()); + EXPECT_EQ(2u, cmd2->getMaximumPendingReplyCount()); + EXPECT_EQ(buckets, cmd2->getBuckets()); + EXPECT_EQ("foo,bar,vekterli", cmd2->getFieldSet()); + EXPECT_TRUE(cmd2->visitInconsistentBuckets()); + EXPECT_EQ(149, cmd2->getPriority()); + + auto reply = std::make_shared<CreateVisitorReply>(*cmd2); + auto reply2 = copyReply(reply); +} + +TEST_P(StorageProtocolTest, get_bucket_diff) { std::vector<api::MergeBucketCommand::Node> nodes; nodes.push_back(4); nodes.push_back(13); @@ -727,56 +504,68 @@ StorageProtocolTest::testGetBucketDiff51() entries.back()._flags = 1; entries.back()._hasMask = 3; - CPPUNIT_ASSERT_EQUAL(std::string( - "Entry(timestamp: 123456, gid(0x313233343536373839306162), " - "hasMask: 0x3,\n" - " header size: 100, body size: 65536, flags 0x1)"), - entries.back().toString(true)); + EXPECT_EQ("Entry(timestamp: 123456, gid(0x313233343536373839306162), hasMask: 0x3,\n" + " header size: 100, body size: 65536, flags 0x1)", + entries.back().toString(true)); - GetBucketDiffCommand::SP cmd(new GetBucketDiffCommand(bucket, nodes, 1056)); + auto cmd = std::make_shared<GetBucketDiffCommand>(_bucket, nodes, 1056); cmd->getDiff() = entries; - GetBucketDiffCommand::SP cmd2(copyCommand(cmd, _version5_1)); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); + + auto reply = std::make_shared<GetBucketDiffReply>(*cmd2); + EXPECT_EQ(entries, reply->getDiff()); + auto reply2 = copyReply(reply); - GetBucketDiffReply::SP reply(new GetBucketDiffReply(*cmd2)); - CPPUNIT_ASSERT_EQUAL(entries, reply->getDiff()); - GetBucketDiffReply::SP reply2(copyReply(reply)); + EXPECT_EQ(nodes, reply2->getNodes()); + EXPECT_EQ(entries, reply2->getDiff()); + EXPECT_EQ(Timestamp(1056), reply2->getMaxTimestamp()); +} + +namespace { + +ApplyBucketDiffCommand::Entry dummy_apply_entry() { + ApplyBucketDiffCommand::Entry e; + e._docName = "my cool id"; + vespalib::string header_data = "fancy header"; + e._headerBlob.resize(header_data.size()); + memcpy(&e._headerBlob[0], header_data.data(), header_data.size()); - CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes()); - CPPUNIT_ASSERT_EQUAL(entries, reply2->getDiff()); - CPPUNIT_ASSERT_EQUAL(Timestamp(1056), reply2->getMaxTimestamp()); + vespalib::string body_data = "fancier body!"; + e._bodyBlob.resize(body_data.size()); + memcpy(&e._bodyBlob[0], body_data.data(), body_data.size()); - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); + GetBucketDiffCommand::Entry meta; + meta._timestamp = 567890; + meta._hasMask = 0x3; + meta._flags = 0x1; + meta._headerSize = 12345; + meta._headerSize = header_data.size(); + meta._bodySize = body_data.size(); + + e._entry = meta; + return e; } -void -StorageProtocolTest::testApplyBucketDiff51() -{ - ScopedName test("testApplyBucketDiff51"); - document::BucketId bucketId(16, 623); - document::Bucket bucket(makeDocumentBucket(bucketId)); +} +TEST_P(StorageProtocolTest, apply_bucket_diff) { std::vector<api::MergeBucketCommand::Node> nodes; nodes.push_back(4); nodes.push_back(13); - std::vector<ApplyBucketDiffCommand::Entry> entries; - entries.push_back(ApplyBucketDiffCommand::Entry()); + std::vector<ApplyBucketDiffCommand::Entry> entries = {dummy_apply_entry()}; - ApplyBucketDiffCommand::SP cmd(new ApplyBucketDiffCommand(bucket, nodes, 1234)); + auto cmd = std::make_shared<ApplyBucketDiffCommand>(_bucket, nodes, 1234); cmd->getDiff() = entries; - ApplyBucketDiffCommand::SP cmd2(copyCommand(cmd, _version5_1)); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); - ApplyBucketDiffReply::SP reply(new ApplyBucketDiffReply(*cmd2)); - ApplyBucketDiffReply::SP reply2(copyReply(reply)); + auto reply = std::make_shared<ApplyBucketDiffReply>(*cmd2); + auto reply2 = copyReply(reply); - CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes()); - CPPUNIT_ASSERT_EQUAL(entries, reply2->getDiff()); - CPPUNIT_ASSERT_EQUAL(1234u, reply2->getMaxBufferSize()); - - recordOutput(*cmd2); - recordOutput(*reply2); - recordSerialization50(); + EXPECT_EQ(nodes, reply2->getNodes()); + EXPECT_EQ(entries, reply2->getDiff()); + EXPECT_EQ(1234u, reply2->getMaxBufferSize()); } namespace { @@ -807,161 +596,97 @@ namespace { }; api::StorageReply::UP MyCommand::makeReply() { - return api::StorageReply::UP(new MyReply(*this)); + return std::make_unique<MyReply>(*this); } } -void -StorageProtocolTest::testInternalMessage() -{ - ScopedName test("testInternal51"); +TEST_P(StorageProtocolTest, internal_message) { MyCommand cmd; MyReply reply(cmd); - - recordOutput(cmd); - recordOutput(reply); + // TODO what's this even intended to test? } -void -StorageProtocolTest::testSetBucketState51() -{ - ScopedName test("testSetBucketState51"); - document::BucketId bucketId(16, 0); - document::Bucket bucket(makeDocumentBucket(bucketId)); - SetBucketStateCommand::SP cmd( - new SetBucketStateCommand(bucket, SetBucketStateCommand::ACTIVE)); - SetBucketStateCommand::SP cmd2(copyCommand(cmd, _version5_1)); +TEST_P(StorageProtocolTest, set_bucket_state_with_inactive_state) { + auto cmd = std::make_shared<SetBucketStateCommand>(_bucket, SetBucketStateCommand::INACTIVE); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(_bucket, cmd2->getBucket()); - SetBucketStateReply::SP reply(new SetBucketStateReply(*cmd2)); - SetBucketStateReply::SP reply2(copyReply(reply)); + auto reply = std::make_shared<SetBucketStateReply>(*cmd2); + auto reply2 = copyReply(reply); - CPPUNIT_ASSERT_EQUAL(SetBucketStateCommand::ACTIVE, cmd2->getState()); - CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId()); - CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); - - recordOutput(*cmd2); - recordOutput(*reply2); + EXPECT_EQ(SetBucketStateCommand::INACTIVE, cmd2->getState()); + EXPECT_EQ(_bucket, reply2->getBucket()); } -void -StorageProtocolTest::testPutCommand52() -{ - ScopedName test("testPutCommand52"); +TEST_P(StorageProtocolTest, set_bucket_state_with_active_state) { + auto cmd = std::make_shared<SetBucketStateCommand>(_bucket, SetBucketStateCommand::ACTIVE); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(SetBucketStateCommand::ACTIVE, cmd2->getState()); +} - PutCommand::SP cmd(new PutCommand(_bucket, _testDoc, 14)); +TEST_P(StorageProtocolTest, put_command_with_condition) { + auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14); cmd->setCondition(TestAndSetCondition(CONDITION_STRING)); - PutCommand::SP cmd2(copyCommand(cmd, _version5_2)); - CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); } -void -StorageProtocolTest::testUpdateCommand52() -{ - ScopedName test("testUpdateCommand52"); - - document::DocumentUpdate::SP update(new document::DocumentUpdate(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId())); - UpdateCommand::SP cmd(new UpdateCommand(_bucket, update, 14)); +TEST_P(StorageProtocolTest, update_command_with_condition) { + auto update = std::make_shared<document::DocumentUpdate>( + _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()); + auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14); cmd->setCondition(TestAndSetCondition(CONDITION_STRING)); - UpdateCommand::SP cmd2(copyCommand(cmd, _version5_2)); - CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); } -void -StorageProtocolTest::testRemoveCommand52() -{ - ScopedName test("testRemoveCommand52"); - - RemoveCommand::SP cmd(new RemoveCommand(_bucket, _testDocId, 159)); +TEST_P(StorageProtocolTest, remove_command_with_condition) { + auto cmd = std::make_shared<RemoveCommand>(_bucket, _testDocId, 159); cmd->setCondition(TestAndSetCondition(CONDITION_STRING)); - RemoveCommand::SP cmd2(copyCommand(cmd, _version5_2)); - CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); } -void -StorageProtocolTest::testPutCommandWithBucketSpace6_0() -{ - ScopedName test("testPutCommandWithBucketSpace6_0"); - - document::Bucket bucket(document::BucketSpace(5), _bucket.getBucketId()); +TEST_P(StorageProtocolTest, put_command_with_bucket_space) { + document::Bucket bucket(document::BucketSpace(5), _bucket_id); auto cmd = std::make_shared<PutCommand>(bucket, _testDoc, 14); - auto cmd2 = copyCommand(cmd, _version6_0); - CPPUNIT_ASSERT_EQUAL(bucket, cmd2->getBucket()); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(bucket, cmd2->getBucket()); } -void -StorageProtocolTest::testCreateVisitorWithBucketSpace6_0() -{ - ScopedName test("testCreateVisitorWithBucketSpace6_0"); - +TEST_P(StorageProtocolTest, create_visitor_with_bucket_space) { document::BucketSpace bucketSpace(5); auto cmd = std::make_shared<CreateVisitorCommand>(bucketSpace, "library", "id", "doc selection"); - auto cmd2 = copyCommand(cmd, _version6_0); - CPPUNIT_ASSERT_EQUAL(bucketSpace, cmd2->getBucketSpace()); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(bucketSpace, cmd2->getBucketSpace()); } -void -StorageProtocolTest::testRequestBucketInfoWithBucketSpace6_0() -{ - ScopedName test("testRequestBucketInfoWithBucketSpace6_0"); - +TEST_P(StorageProtocolTest, request_bucket_info_with_bucket_space) { document::BucketSpace bucketSpace(5); std::vector<document::BucketId> ids = {document::BucketId(3)}; auto cmd = std::make_shared<RequestBucketInfoCommand>(bucketSpace, ids); - auto cmd2 = copyCommand(cmd, _version6_0); - CPPUNIT_ASSERT_EQUAL(bucketSpace, cmd2->getBucketSpace()); - CPPUNIT_ASSERT_EQUAL(ids, cmd2->getBuckets()); -} - -void -StorageProtocolTest::serialized_size_is_used_to_set_approx_size_of_storage_message() -{ - ScopedName test("serialized_size_is_used_to_set_approx_size_of_storage_message"); - - PutCommand::SP cmd(new PutCommand(_bucket, _testDoc, 14)); - CPPUNIT_ASSERT_EQUAL(50u, cmd->getApproxByteSize()); - - PutCommand::SP cmd2(copyCommand(cmd, _version6_0)); - CPPUNIT_ASSERT_EQUAL(181u, cmd2->getApproxByteSize()); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(bucketSpace, cmd2->getBucketSpace()); + EXPECT_EQ(ids, cmd2->getBuckets()); } -void -StorageProtocolTest::testStringOutputs() -{ - std::cerr << "\nNon verbose output:\n"; - for (uint32_t i=0, n=_nonVerboseMessageStrings.size(); i<n; ++i) { - std::cerr << _nonVerboseMessageStrings[i] << "\n"; - } - std::cerr << "\nVerbose output:\n"; - for (uint32_t i=0, n=_verboseMessageStrings.size(); i<n; ++i) { - std::cerr << _verboseMessageStrings[i] << "\n"; - } -} +TEST_P(StorageProtocolTest, serialized_size_is_used_to_set_approx_size_of_storage_message) { + auto cmd = std::make_shared<PutCommand>(_bucket, _testDoc, 14); + EXPECT_EQ(50u, cmd->getApproxByteSize()); -void -StorageProtocolTest::testWriteSerialization50() -{ - std::ofstream of("mbusprot/mbusprot.5.0.serialization.5.1"); - of << std::hex << std::setfill('0'); - for (uint32_t i=0, n=_serialization50.size(); i<n; ++i) { - char c = _serialization50[i]; - if (c > 126 || (c < 32 && c != 10)) { - int32_t num = static_cast<int32_t>(c); - if (num < 0) num += 256; - of << '\\' << std::setw(2) << num; - } else if (c == '\\') { - of << "\\\\"; - } else { - of << c; - } + auto cmd2 = copyCommand(cmd); + auto version = GetParam(); + if (version.getMajor() == 7) { // Protobuf-based encoding + EXPECT_EQ(158u, cmd2->getApproxByteSize()); + } else { // Legacy encoding + EXPECT_EQ(181u, cmd2->getApproxByteSize()); } - of.close(); } -} // mbusprot -} // storage +} // storage::api diff --git a/storageapi/src/vespa/storageapi/CMakeLists.txt b/storageapi/src/vespa/storageapi/CMakeLists.txt index c08dcbc2419..90eb6dd9eca 100644 --- a/storageapi/src/vespa/storageapi/CMakeLists.txt +++ b/storageapi/src/vespa/storageapi/CMakeLists.txt @@ -1,4 +1,5 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + vespa_add_library(storageapi SOURCES $<TARGET_OBJECTS:storageapi_message> @@ -8,3 +9,6 @@ vespa_add_library(storageapi INSTALL lib64 DEPENDS ) + +vespa_add_target_package_dependency(storageapi Protobuf) + diff --git a/storageapi/src/vespa/storageapi/mbusprot/.gitignore b/storageapi/src/vespa/storageapi/mbusprot/.gitignore index 526f91c6668..8e91fe9cab0 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/.gitignore +++ b/storageapi/src/vespa/storageapi/mbusprot/.gitignore @@ -5,3 +5,6 @@ .deps .libs Makefile +*.pb.h +*.pb.cc + diff --git a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt index d5952d7cb91..dc4e3897e49 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt +++ b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt @@ -1,4 +1,19 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +find_package(Protobuf REQUIRED) +PROTOBUF_GENERATE_CPP(storageapi_PROTOBUF_SRCS storageapi_PROTOBUF_HDRS + protobuf/common.proto + protobuf/feed.proto + protobuf/visiting.proto + protobuf/maintenance.proto) + +# protoc-generated files emit compiler warnings that we normally treat as errors. +# Instead of rolling our own compiler plugin we'll pragmatically disable the noise. +set_source_files_properties(${storageapi_PROTOBUF_SRCS} PROPERTIES COMPILE_FLAGS "-Wno-suggest-override -Wno-inline") +# protoc explicitly annotates methods with inline, which triggers -Werror=inline when +# the header file grows over a certain size. +set_source_files_properties(protocolserialization7.cpp PROPERTIES COMPILE_FLAGS "-Wno-inline") + vespa_add_library(storageapi_mbusprot OBJECT SOURCES storagemessage.cpp @@ -11,5 +26,7 @@ vespa_add_library(storageapi_mbusprot OBJECT protocolserialization5_1.cpp protocolserialization5_2.cpp protocolserialization6_0.cpp + protocolserialization7.cpp + ${storageapi_PROTOBUF_SRCS} DEPENDS ) diff --git a/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h new file mode 100644 index 00000000000..ef4a6b28749 --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h @@ -0,0 +1,31 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "protocolserialization.h" + +namespace storage::mbusprot { + +/* + * Utility base class for pre-v7 (protobuf) serialization implementations. + * + * TODO remove on Vespa 8 alongside legacy serialization formats. + */ +class LegacyProtocolSerialization : public ProtocolSerialization { + const std::shared_ptr<const document::DocumentTypeRepo> _repo; +public: + explicit LegacyProtocolSerialization(const std::shared_ptr<const document::DocumentTypeRepo>& repo) + : _repo(repo) + {} + + const document::DocumentTypeRepo& getTypeRepo() const { return *_repo; } + const std::shared_ptr<const document::DocumentTypeRepo> getTypeRepoSp() const { return _repo; } + + virtual document::Bucket getBucket(document::ByteBuffer& buf) const = 0; + virtual void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const = 0; + virtual document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const = 0; + virtual void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const = 0; + virtual api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const = 0; + virtual void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const = 0; +}; + +} // storage::mbusprot diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto new file mode 100644 index 00000000000..d641449995d --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto @@ -0,0 +1,68 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +syntax = "proto3"; + +option cc_enable_arenas = true; + +package storage.mbusprot.protobuf; + +// Note: we use a *Request/*Response naming convention rather than *Command/*Reply, +// as the former is the gRPC convention and that's where we intend to move. + +message BucketSpace { + uint64 space_id = 1; +} + +message BucketId { + fixed64 raw_id = 1; +} + +message Bucket { + uint64 space_id = 1; + fixed64 raw_bucket_id = 2; +} + +// Next tag to use: 3 +message BucketInfo { + uint64 last_modified_timestamp = 1; + fixed32 legacy_checksum = 2; + // TODO v2 checksum + uint32 doc_count = 3; + uint32 total_doc_size = 4; + uint32 meta_count = 5; + uint32 used_file_size = 6; + bool ready = 7; + bool active = 8; +} + +message GlobalId { + // 96 bits of GID data in _little_ endian. High entropy, so fixed encoding is better than varint. + // Low 64 bits as if memcpy()ed from bytes [0, 8) of the GID buffer + fixed64 lo_64 = 1; + // High 32 bits as if memcpy()ed from bytes [8, 12) of the GID buffer + fixed32 hi_32 = 2; +} + +// TODO these should ideally be gRPC headers.. +message RequestHeader { + uint64 message_id = 1; + uint32 priority = 2; // Always in range [0, 255] + uint32 source_index = 3; // Always in range [0, 65535] + fixed32 loadtype_id = 4; // It's a hash with high entropy, so fixed encoding is better than varint +} + +// TODO these should ideally be gRPC headers.. +message ResponseHeader { + // TODO this should ideally be gRPC Status... + uint32 return_code_id = 1; + bytes return_code_message = 2; // FIXME it's `bytes` since `string` will check for UTF-8... might not hold... + uint64 message_id = 3; + uint32 priority = 4; // Always in range [0, 255] +} + +message Document { + bytes payload = 1; +} + +message DocumentId { + bytes id = 1; +} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto new file mode 100644 index 00000000000..58da24df836 --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto @@ -0,0 +1,91 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +syntax = "proto3"; + +option cc_enable_arenas = true; + +package storage.mbusprot.protobuf; + +import "common.proto"; + +message TestAndSetCondition { + bytes selection = 1; +} + +message PutRequest { + Bucket bucket = 1; + Document document = 2; + uint64 new_timestamp = 3; + uint64 expected_old_timestamp = 4; // If zero; no expectation. + TestAndSetCondition condition = 5; +} + +message PutResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; + bool was_found = 3; +} + +message Update { + bytes payload = 1; +} + +message UpdateRequest { + Bucket bucket = 1; + Update update = 2; + uint64 new_timestamp = 3; + uint64 expected_old_timestamp = 4; // If zero; no expectation. + TestAndSetCondition condition = 5; +} + +message UpdateResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; + uint64 updated_timestamp = 3; +} + +message RemoveRequest { + Bucket bucket = 1; + bytes document_id = 2; + uint64 new_timestamp = 3; + TestAndSetCondition condition = 4; +} + +message RemoveResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; + uint64 removed_timestamp = 3; +} + +message GetRequest { + Bucket bucket = 1; + bytes document_id = 2; + bytes field_set = 3; + uint64 before_timestamp = 4; +} + +message GetResponse { + Document document = 1; + uint64 last_modified_timestamp = 2; + BucketInfo bucket_info = 3; + BucketId remapped_bucket_id = 4; +} + +message RevertRequest { + Bucket bucket = 1; + repeated uint64 revert_tokens = 2; +} + +message RevertResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; +} + +message RemoveLocationRequest { + Bucket bucket = 1; + bytes document_selection = 2; +} + +message RemoveLocationResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; +} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto new file mode 100644 index 00000000000..c4766d2900a --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto @@ -0,0 +1,160 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +syntax = "proto3"; + +option cc_enable_arenas = true; + +package storage.mbusprot.protobuf; + +import "common.proto"; + +message DeleteBucketRequest { + Bucket bucket = 1; + BucketInfo expected_bucket_info = 2; +} + +message DeleteBucketResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; +} + +message CreateBucketRequest { + Bucket bucket = 1; + bool create_as_active = 2; +} + +message CreateBucketResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; +} + +message MergeNode { + uint32 index = 1; + bool source_only = 2; +} + +message MergeBucketRequest { + Bucket bucket = 1; + uint32 cluster_state_version = 2; + uint64 max_timestamp = 3; + repeated MergeNode nodes = 4; + repeated uint32 node_chain = 5; +} + +message MergeBucketResponse { + BucketId remapped_bucket_id = 1; +} + +message MetaDiffEntry { + uint64 timestamp = 1; + GlobalId gid = 2; + uint32 header_size = 3; + uint32 body_size = 4; + uint32 flags = 5; + uint32 presence_mask = 6; +} + +message GetBucketDiffRequest { + Bucket bucket = 1; + uint64 max_timestamp = 2; + repeated MergeNode nodes = 3; + repeated MetaDiffEntry diff = 4; +} + +message GetBucketDiffResponse { + BucketId remapped_bucket_id = 1; + repeated MetaDiffEntry diff = 2; +} + +message ApplyDiffEntry { + MetaDiffEntry entry_meta = 1; + bytes document_id = 2; + bytes header_blob = 3; + bytes body_blob = 4; +} + +message ApplyBucketDiffRequest { + Bucket bucket = 1; + repeated MergeNode nodes = 2; + uint32 max_buffer_size = 3; + repeated ApplyDiffEntry entries = 4; +} + +message ApplyBucketDiffResponse { + BucketId remapped_bucket_id = 1; + repeated ApplyDiffEntry entries = 4; +} + +message ExplicitBucketSet { + // `Bucket` is not needed, as the space is inferred from the owning message. + repeated BucketId bucket_ids = 2; +} + +message AllBuckets { + uint32 distributor_index = 1; + bytes cluster_state = 2; + bytes distribution_hash = 3; +} + +message RequestBucketInfoRequest { + BucketSpace bucket_space = 1; + oneof request_for { + ExplicitBucketSet explicit_bucket_set = 2; + AllBuckets all_buckets = 3; + } +} + +message BucketAndBucketInfo { + fixed64 raw_bucket_id = 1; + BucketInfo bucket_info = 2; +} + +message RequestBucketInfoResponse { + repeated BucketAndBucketInfo bucket_infos = 1; +} + +message NotifyBucketChangeRequest { + Bucket bucket = 1; + BucketInfo bucket_info = 2; +} + +message NotifyBucketChangeResponse { + // Currently empty +} + +message SplitBucketRequest { + Bucket bucket = 1; + uint32 min_split_bits = 2; + uint32 max_split_bits = 3; + uint32 min_byte_size = 4; + uint32 min_doc_count = 5; +} + +message SplitBucketResponse { + BucketId remapped_bucket_id = 1; + repeated BucketAndBucketInfo split_info = 2; +} + +message JoinBucketsRequest { + Bucket bucket = 1; + repeated BucketId source_buckets = 2; + uint32 min_join_bits = 3; +} + +message JoinBucketsResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; +} + +message SetBucketStateRequest { + enum BucketState { + Inactive = 0; + Active = 1; + } + + Bucket bucket = 1; + BucketState state = 2; +} + +message SetBucketStateResponse { + BucketId remapped_bucket_id = 1; +} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto new file mode 100644 index 00000000000..89ce39e52a0 --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto @@ -0,0 +1,66 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +syntax = "proto3"; + +option cc_enable_arenas = true; + +package storage.mbusprot.protobuf; + +import "common.proto"; + +message ClientVisitorParameter { + bytes key = 1; + bytes value = 2; +} + +message VisitorConstraints { + bytes document_selection = 1; + uint64 from_time_usec = 2; + uint64 to_time_usec = 3; + bool visit_removes = 4; + bytes field_set = 5; + bool visit_inconsistent_buckets = 6; +} + +message VisitorControlMeta { + bytes instance_id = 1; + bytes library_name = 2; + uint32 visitor_command_id = 3; + bytes control_destination = 4; + bytes data_destination = 5; + + // TODO move? + uint32 max_pending_reply_count = 6; + uint32 queue_timeout = 7; + uint32 max_buckets_per_visitor = 8; +} + +message CreateVisitorRequest { + BucketSpace bucket_space = 1; + repeated BucketId buckets = 2; + + VisitorConstraints constraints = 3; + VisitorControlMeta control_meta = 4; + repeated ClientVisitorParameter client_parameters = 5; +} + +message VisitorStatistics { + uint32 buckets_visited = 1; + uint64 documents_visited = 2; + uint64 bytes_visited = 3; + uint64 documents_returned = 4; + uint64 bytes_returned = 5; + uint64 second_pass_documents_returned = 6; // TODO don't include? orderdoc only + uint64 second_pass_bytes_returned = 7; // TODO don't include? orderdoc only +} + +message CreateVisitorResponse { + VisitorStatistics visitor_statistics = 1; +} + +message DestroyVisitorRequest { + bytes instance_id = 1; +} + +message DestroyVisitorResponse { + // Currently empty +} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h b/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h new file mode 100644 index 00000000000..8e878cf0560 --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h @@ -0,0 +1,13 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +// Disable warnings emitted by protoc generated files +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wsuggest-override" + +#include "feed.pb.h" +#include "visiting.pb.h" +#include "maintenance.pb.h" + +#pragma GCC diagnostic pop diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp index 172cd6c8de5..917b60c50c3 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp @@ -17,11 +17,6 @@ LOG_SETUP(".storage.api.mbusprot.serialization.base"); namespace storage::mbusprot { -ProtocolSerialization::ProtocolSerialization(const std::shared_ptr<const document::DocumentTypeRepo>& repo) - : _repo(repo) -{ -} - mbus::Blob ProtocolSerialization::encode(const api::StorageMessage& msg) const { diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h index 9c3ddb88bdf..a57627b9ba9 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h @@ -59,21 +59,14 @@ class StorageCommand; class StorageReply; class ProtocolSerialization { - const std::shared_ptr<const document::DocumentTypeRepo> _repo; - public: virtual mbus::Blob encode(const api::StorageMessage&) const; virtual std::unique_ptr<StorageCommand> decodeCommand(mbus::BlobRef) const; virtual std::unique_ptr<StorageReply> decodeReply( mbus::BlobRef, const api::StorageCommand&) const; - protected: - const document::DocumentTypeRepo& getTypeRepo() const { return *_repo; } - const std::shared_ptr<const document::DocumentTypeRepo> getTypeRepoSp() const - { return _repo; } - - ProtocolSerialization(const std::shared_ptr<const document::DocumentTypeRepo> &repo); - virtual ~ProtocolSerialization() {} + ProtocolSerialization() = default; + virtual ~ProtocolSerialization() = default; typedef api::StorageCommand SCmd; typedef api::StorageReply SRep; @@ -102,13 +95,10 @@ protected: virtual void onEncode(GBBuf&, const api::GetBucketDiffReply&) const = 0; virtual void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const = 0; virtual void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const = 0; - virtual void onEncode(GBBuf&, - const api::RequestBucketInfoCommand&) const = 0; + virtual void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const = 0; virtual void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const = 0; - virtual void onEncode(GBBuf&, - const api::NotifyBucketChangeCommand&) const = 0; - virtual void onEncode(GBBuf&, - const api::NotifyBucketChangeReply&) const = 0; + virtual void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const = 0; + virtual void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const = 0; virtual void onEncode(GBBuf&, const api::SplitBucketCommand&) const = 0; virtual void onEncode(GBBuf&, const api::SplitBucketReply&) const = 0; virtual void onEncode(GBBuf&, const api::JoinBucketsCommand&) const = 0; @@ -143,11 +133,9 @@ protected: virtual SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const = 0; virtual SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const = 0; - virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, - BBuf&) const = 0; + virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const = 0; - virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, - BBuf&) const = 0; + virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeSplitBucketCommand(BBuf&) const = 0; virtual SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const = 0; @@ -160,14 +148,6 @@ protected: virtual SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const = 0; virtual SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const = 0; - - virtual document::Bucket getBucket(document::ByteBuffer& buf) const = 0; - virtual void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const = 0; - virtual document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const = 0; - virtual void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const = 0; - virtual api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const = 0; - virtual void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const = 0; - }; } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp index 74a0c964d19..466ff85f398 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp @@ -20,7 +20,7 @@ namespace storage::mbusprot { ProtocolSerialization4_2::ProtocolSerialization4_2( const std::shared_ptr<const document::DocumentTypeRepo>& repo) - : ProtocolSerialization(repo) + : LegacyProtocolSerialization(repo) { } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h index 56aa3d4ed30..e4ab36dc989 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h @@ -1,13 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "protocolserialization.h" +#include "legacyprotocolserialization.h" namespace storage::mbusprot { -class ProtocolSerialization4_2 : public ProtocolSerialization { +class ProtocolSerialization4_2 : public LegacyProtocolSerialization { public: - ProtocolSerialization4_2(const std::shared_ptr<const document::DocumentTypeRepo>&); + explicit ProtocolSerialization4_2(const std::shared_ptr<const document::DocumentTypeRepo>&); protected: void onEncode(GBBuf&, const api::GetCommand&) const override; diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h index 042ec7850ef..67f02aa2d2a 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h @@ -73,6 +73,9 @@ public: SRep::UP onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const override; void onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const override; void onDecodeReply(BBuf&, api::StorageReply&) const override; + +protected: + const documentapi::LoadTypeSet& loadTypes() const noexcept { return _loadTypes; }; }; } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp new file mode 100644 index 00000000000..ca77977046c --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -0,0 +1,1268 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "protocolserialization7.h" +#include "serializationhelper.h" +#include "protobuf_includes.h" + +#include <vespa/document/update/documentupdate.h> +#include <vespa/document/util/bufferexceptions.h> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/removelocation.h> +#include <vespa/storageapi/message/visitor.h> + +namespace storage::mbusprot { + +ProtocolSerialization7::ProtocolSerialization7(std::shared_ptr<const document::DocumentTypeRepo> repo, + const documentapi::LoadTypeSet& load_types) + : ProtocolSerialization(), + _repo(std::move(repo)), + _load_types(load_types) +{ +} + +namespace { + +void set_bucket(protobuf::Bucket& dest, const document::Bucket& src) { + dest.set_raw_bucket_id(src.getBucketId().getRawId()); + dest.set_space_id(src.getBucketSpace().getId()); +} + +void set_bucket_id(protobuf::BucketId& dest, const document::BucketId& src) { + dest.set_raw_id(src.getRawId()); +} + +document::BucketId get_bucket_id(const protobuf::BucketId& src) { + return document::BucketId(src.raw_id()); +} + +void set_bucket_space(protobuf::BucketSpace& dest, const document::BucketSpace& src) { + dest.set_space_id(src.getId()); +} + +document::BucketSpace get_bucket_space(const protobuf::BucketSpace& src) { + return document::BucketSpace(src.space_id()); +} + +void set_bucket_info(protobuf::BucketInfo& dest, const api::BucketInfo& src) { + dest.set_last_modified_timestamp(src.getLastModified()); + dest.set_legacy_checksum(src.getChecksum()); + dest.set_doc_count(src.getDocumentCount()); + dest.set_total_doc_size(src.getTotalDocumentSize()); + dest.set_meta_count(src.getMetaCount()); + dest.set_used_file_size(src.getUsedFileSize()); + dest.set_active(src.isActive()); + dest.set_ready(src.isReady()); +} + +document::Bucket get_bucket(const protobuf::Bucket& src) { + return document::Bucket(document::BucketSpace(src.space_id()), + document::BucketId(src.raw_bucket_id())); +} + +api::BucketInfo get_bucket_info(const protobuf::BucketInfo& src) { + api::BucketInfo info; + info.setLastModified(src.last_modified_timestamp()); + info.setChecksum(src.legacy_checksum()); + info.setDocumentCount(src.doc_count()); + info.setTotalDocumentSize(src.total_doc_size()); + info.setMetaCount(src.meta_count()); + info.setUsedFileSize(src.used_file_size()); + info.setActive(src.active()); + info.setReady(src.ready()); + return info; +} + +documentapi::TestAndSetCondition get_tas_condition(const protobuf::TestAndSetCondition& src) { + return documentapi::TestAndSetCondition(src.selection()); +} + +void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::TestAndSetCondition& src) { + dest.set_selection(src.getSelection().data(), src.getSelection().size()); +} + +std::shared_ptr<document::Document> get_document(const protobuf::Document& src_doc, + const document::DocumentTypeRepo& type_repo) +{ + if (!src_doc.payload().empty()) { + document::ByteBuffer doc_buf(src_doc.payload().data(), src_doc.payload().size()); + return std::make_shared<document::Document>(type_repo, doc_buf); + } + return std::shared_ptr<document::Document>(); +} + +void set_update(protobuf::Update& dest, const document::DocumentUpdate& src) { + vespalib::nbostream stream; + src.serializeHEAD(stream); + dest.set_payload(stream.peek(), stream.size()); +} + +std::shared_ptr<document::DocumentUpdate> get_update(const protobuf::Update& src, + const document::DocumentTypeRepo& type_repo) +{ + if (!src.payload().empty()) { + return document::DocumentUpdate::createHEAD( + type_repo, vespalib::nbostream(src.payload().data(), src.payload().size())); + } + return std::shared_ptr<document::DocumentUpdate>(); +} + +void write_request_header(vespalib::GrowableByteBuffer& buf, const api::StorageCommand& cmd) { + protobuf::RequestHeader hdr; // Arena alloc not needed since there are no nested messages + hdr.set_message_id(cmd.getMsgId()); + hdr.set_priority(cmd.getPriority()); + hdr.set_source_index(cmd.getSourceIndex()); + hdr.set_loadtype_id(cmd.getLoadType().getId()); + + uint8_t dest[128]; // Only primitive fields, should be plenty large enough. + auto encoded_size = static_cast<uint32_t>(hdr.ByteSizeLong()); + assert(encoded_size <= sizeof(dest)); + [[maybe_unused]] bool ok = hdr.SerializeWithCachedSizesToArray(dest); + assert(ok); + buf.putInt(encoded_size); + buf.putBytes(reinterpret_cast<const char*>(dest), encoded_size); +} + +void write_response_header(vespalib::GrowableByteBuffer& buf, const api::StorageReply& reply) { + protobuf::ResponseHeader hdr; // Arena alloc not needed since there are no nested messages + const auto& result = reply.getResult(); + hdr.set_return_code_id(static_cast<uint32_t>(result.getResult())); + if (!result.getMessage().empty()) { + hdr.set_return_code_message(result.getMessage().data(), result.getMessage().size()); + } + hdr.set_message_id(reply.getMsgId()); + hdr.set_priority(reply.getPriority()); + + const auto header_size = hdr.ByteSizeLong(); + assert(header_size <= UINT32_MAX); + buf.putInt(static_cast<uint32_t>(header_size)); + + auto* dest_buf = reinterpret_cast<uint8_t*>(buf.allocate(header_size)); + [[maybe_unused]] bool ok = hdr.SerializeWithCachedSizesToArray(dest_buf); + assert(ok); +} + +void decode_request_header(document::ByteBuffer& buf, protobuf::RequestHeader& hdr) { + auto hdr_len = static_cast<uint32_t>(SerializationHelper::getInt(buf)); + if (hdr_len > buf.getRemaining()) { + throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len); + } + bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len); + if (!ok) { + throw vespalib::IllegalArgumentException("Malformed protobuf request header"); + } + buf.incPos(hdr_len); +} + +void decode_response_header(document::ByteBuffer& buf, protobuf::ResponseHeader& hdr) { + auto hdr_len = static_cast<uint32_t>(SerializationHelper::getInt(buf)); + if (hdr_len > buf.getRemaining()) { + throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len); + } + bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len); + if (!ok) { + throw vespalib::IllegalArgumentException("Malformed protobuf response header"); + } + buf.incPos(hdr_len); +} + +} // anonymous namespace + +template <typename ProtobufType> +class BaseEncoder { + vespalib::GrowableByteBuffer& _out_buf; + ::google::protobuf::Arena _arena; + ProtobufType* _proto_obj; +public: + explicit BaseEncoder(vespalib::GrowableByteBuffer& out_buf) + : _out_buf(out_buf), + _arena(), + _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena)) + { + } + + void encode() { + assert(_proto_obj != nullptr); + const auto sz = _proto_obj->ByteSizeLong(); + assert(sz <= UINT32_MAX); + auto* buf = reinterpret_cast<uint8_t*>(_out_buf.allocate(sz)); + [[maybe_unused]] bool ok = _proto_obj->SerializeWithCachedSizesToArray(buf); + assert(ok); + _proto_obj = nullptr; + } +protected: + vespalib::GrowableByteBuffer& buffer() noexcept { return _out_buf; } + + // Precondition: encode() is not called + ProtobufType& proto_obj() noexcept { return *_proto_obj; } + const ProtobufType& proto_obj() const noexcept { return *_proto_obj; } +}; + +template <typename ProtobufType> +class RequestEncoder : public BaseEncoder<ProtobufType> { +public: + RequestEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& cmd) + : BaseEncoder<ProtobufType>(out_buf) + { + write_request_header(out_buf, cmd); + } + + // Precondition: encode() is not called + ProtobufType& request() noexcept { return this->proto_obj(); } + const ProtobufType& request() const noexcept { return this->proto_obj(); } +}; + +template <typename ProtobufType> +class ResponseEncoder : public BaseEncoder<ProtobufType> { +public: + ResponseEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply) + : BaseEncoder<ProtobufType>(out_buf) + { + write_response_header(out_buf, reply); + } + + // Precondition: encode() is not called + ProtobufType& response() noexcept { return this->proto_obj(); } + const ProtobufType& response() const noexcept { return this->proto_obj(); } +}; + +template <typename ProtobufType> +class RequestDecoder { + protobuf::RequestHeader _hdr; + ::google::protobuf::Arena _arena; + ProtobufType* _proto_obj; + const documentapi::LoadTypeSet& _load_types; +public: + RequestDecoder(document::ByteBuffer& in_buf, const documentapi::LoadTypeSet& load_types) + : _arena(), + _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena)), + _load_types(load_types) + { + decode_request_header(in_buf, _hdr); + assert(in_buf.getRemaining() <= INT_MAX); + bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); + if (!ok) { + throw vespalib::IllegalArgumentException( + vespalib::make_string("Malformed protobuf request payload for %s", + ProtobufType::descriptor()->full_name().c_str())); + } + } + + void transfer_meta_information_to(api::StorageCommand& dest) { + dest.forceMsgId(_hdr.message_id()); + dest.setPriority(static_cast<uint8_t>(_hdr.priority())); + dest.setSourceIndex(static_cast<uint16_t>(_hdr.source_index())); + dest.setLoadType(_load_types[_hdr.loadtype_id()]); + } + + ProtobufType& request() noexcept { return *_proto_obj; } + const ProtobufType& request() const noexcept { return *_proto_obj; } +}; + +template <typename ProtobufType> +class ResponseDecoder { + protobuf::ResponseHeader _hdr; + ::google::protobuf::Arena _arena; + ProtobufType* _proto_obj; +public: + explicit ResponseDecoder(document::ByteBuffer& in_buf) + : _arena(), + _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena)) + { + decode_response_header(in_buf, _hdr); + assert(in_buf.getRemaining() <= INT_MAX); + bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); + if (!ok) { + throw vespalib::IllegalArgumentException( + vespalib::make_string("Malformed protobuf response payload for %s", + ProtobufType::descriptor()->full_name().c_str())); + } + } + + ProtobufType& response() noexcept { return *_proto_obj; } + const ProtobufType& response() const noexcept { return *_proto_obj; } +}; + +template <typename ProtobufType, typename Func> +void encode_request(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& msg, Func&& f) { + RequestEncoder<ProtobufType> enc(out_buf, msg); + f(enc.request()); + enc.encode(); +} + +template <typename ProtobufType, typename Func> +void encode_response(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply, Func&& f) { + ResponseEncoder<ProtobufType> enc(out_buf, reply); + auto& res = enc.response(); + f(res); + enc.encode(); +} + +template <typename ProtobufType, typename Func> +std::unique_ptr<api::StorageCommand> +ProtocolSerialization7::decode_request(document::ByteBuffer& in_buf, Func&& f) const { + RequestDecoder<ProtobufType> dec(in_buf, _load_types); + const auto& req = dec.request(); + auto cmd = f(req); + dec.transfer_meta_information_to(*cmd); + return cmd; +} + +template <typename ProtobufType, typename Func> +std::unique_ptr<api::StorageReply> +ProtocolSerialization7::decode_response(document::ByteBuffer& in_buf, Func&& f) const { + ResponseDecoder<ProtobufType> dec(in_buf); + const auto& res = dec.response(); + auto reply = f(res); + return reply; +} + +template <typename ProtobufType, typename Func> +void encode_bucket_request(vespalib::GrowableByteBuffer& out_buf, const api::BucketCommand& msg, Func&& f) { + encode_request<ProtobufType>(out_buf, msg, [&](ProtobufType& req) { + set_bucket(*req.mutable_bucket(), msg.getBucket()); + f(req); + }); +} + +template <typename ProtobufType, typename Func> +std::unique_ptr<api::StorageCommand> +ProtocolSerialization7::decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const { + return decode_request<ProtobufType>(in_buf, [&](const ProtobufType& req) { + if (!req.has_bucket()) { + throw vespalib::IllegalArgumentException( + vespalib::make_string("Malformed protocol buffer request for %s; no bucket", + ProtobufType::descriptor()->full_name().c_str())); + } + const auto bucket = get_bucket(req.bucket()); + return f(req, bucket); + }); +} + +template <typename ProtobufType, typename Func> +void encode_bucket_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketReply& reply, Func&& f) { + encode_response<ProtobufType>(out_buf, reply, [&](ProtobufType& res) { + if (reply.hasBeenRemapped()) { + set_bucket_id(*res.mutable_remapped_bucket_id(), reply.getBucketId()); + } + f(res); + }); +} + +template <typename ProtobufType, typename Func> +std::unique_ptr<api::StorageReply> +ProtocolSerialization7::decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const { + return decode_response<ProtobufType>(in_buf, [&](const ProtobufType& res) { + auto reply = f(res); + if (res.has_remapped_bucket_id()) { + reply->remapBucketId(get_bucket_id(res.remapped_bucket_id())); + } + return reply; + }); +} + +template <typename ProtobufType, typename Func> +void encode_bucket_info_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketInfoReply& reply, Func&& f) { + encode_bucket_response<ProtobufType>(out_buf, reply, [&](ProtobufType& res) { + set_bucket_info(*res.mutable_bucket_info(), reply.getBucketInfo()); + f(res); + }); +} + +template <typename ProtobufType, typename Func> +std::unique_ptr<api::StorageReply> +ProtocolSerialization7::decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const { + return decode_bucket_response<ProtobufType>(in_buf, [&](const ProtobufType& res) { + auto reply = f(res); + if (res.has_bucket_info()) { + reply->setBucketInfo(get_bucket_info(res.bucket_info())); + } + return reply; + }); +} + +// TODO document protobuf ducktyping assumptions + +namespace { +// Inherit from known base class just to avoid having to template this. We don't care about its subtype anyway. +void no_op_encode([[maybe_unused]] ::google::protobuf::Message&) { + // nothing to do here. +} + +void set_document(protobuf::Document& target_doc, const document::Document& src_doc) { + vespalib::nbostream stream; + src_doc.serialize(stream); + target_doc.set_payload(stream.peek(), stream.size()); +} + +} + +// ----------------------------------------------------------------- +// Put +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) const { + encode_bucket_request<protobuf::PutRequest>(buf, msg, [&](auto& req) { + req.set_new_timestamp(msg.getTimestamp()); + req.set_expected_old_timestamp(msg.getUpdateTimestamp()); + if (msg.getCondition().isPresent()) { + set_tas_condition(*req.mutable_condition(), msg.getCondition()); + } + if (msg.getDocument()) { + set_document(*req.mutable_document(), *msg.getDocument()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutReply& msg) const { + encode_bucket_info_response<protobuf::PutResponse>(buf, msg, [&](auto& res) { + res.set_was_found(msg.wasFound()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodePutCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::PutRequest>(buf, [&](auto& req, auto& bucket) { + auto document = get_document(req.document(), type_repo()); + auto cmd = std::make_unique<api::PutCommand>(bucket, std::move(document), req.new_timestamp()); + cmd->setUpdateTimestamp(req.expected_old_timestamp()); + if (req.has_condition()) { + cmd->setCondition(get_tas_condition(req.condition())); + } + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodePutReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::PutResponse>(buf, [&](auto& res) { + return std::make_unique<api::PutReply>(static_cast<const api::PutCommand&>(cmd), res.was_found()); + }); +} + +// ----------------------------------------------------------------- +// Update +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) const { + encode_bucket_request<protobuf::UpdateRequest>(buf, msg, [&](auto& req) { + auto* update = msg.getUpdate().get(); + if (update) { + set_update(*req.mutable_update(), *update); + } + req.set_new_timestamp(msg.getTimestamp()); + req.set_expected_old_timestamp(msg.getOldTimestamp()); + if (msg.getCondition().isPresent()) { + set_tas_condition(*req.mutable_condition(), msg.getCondition()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateReply& msg) const { + encode_bucket_info_response<protobuf::UpdateResponse>(buf, msg, [&](auto& res) { + res.set_updated_timestamp(msg.getOldTimestamp()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::UpdateRequest>(buf, [&](auto& req, auto& bucket) { + auto update = get_update(req.update(), type_repo()); + auto cmd = std::make_unique<api::UpdateCommand>(bucket, std::move(update), req.new_timestamp()); + cmd->setOldTimestamp(req.expected_old_timestamp()); + if (req.has_condition()) { + cmd->setCondition(get_tas_condition(req.condition())); + } + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeUpdateReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::UpdateResponse>(buf, [&](auto& res) { + return std::make_unique<api::UpdateReply>(static_cast<const api::UpdateCommand&>(cmd), + res.updated_timestamp()); + }); +} + +// ----------------------------------------------------------------- +// Remove +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveCommand& msg) const { + encode_bucket_request<protobuf::RemoveRequest>(buf, msg, [&](auto& req) { + auto doc_id_str = msg.getDocumentId().toString(); + req.set_document_id(doc_id_str.data(), doc_id_str.size()); + req.set_new_timestamp(msg.getTimestamp()); + if (msg.getCondition().isPresent()) { + set_tas_condition(*req.mutable_condition(), msg.getCondition()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveReply& msg) const { + encode_bucket_info_response<protobuf::RemoveResponse>(buf, msg, [&](auto& res) { + res.set_removed_timestamp(msg.getOldTimestamp()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::RemoveRequest>(buf, [&](auto& req, auto& bucket) { + document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size())); + auto cmd = std::make_unique<api::RemoveCommand>(bucket, doc_id, req.new_timestamp()); + if (req.has_condition()) { + cmd->setCondition(get_tas_condition(req.condition())); + } + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::RemoveResponse>(buf, [&](auto& res) { + return std::make_unique<api::RemoveReply>(static_cast<const api::RemoveCommand&>(cmd), + res.removed_timestamp()); + }); +} + +// ----------------------------------------------------------------- +// Get +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) const { + encode_bucket_request<protobuf::GetRequest>(buf, msg, [&](auto& req) { + auto doc_id = msg.getDocumentId().toString(); + req.set_document_id(doc_id.data(), doc_id.size()); + req.set_before_timestamp(msg.getBeforeTimestamp()); + if (!msg.getFieldSet().empty()) { + req.set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) const { + encode_bucket_info_response<protobuf::GetResponse>(buf, msg, [&](auto& res) { + if (msg.getDocument()) { + set_document(*res.mutable_document(), *msg.getDocument()); + } + res.set_last_modified_timestamp(msg.getLastModifiedTimestamp()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeGetCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::GetRequest>(buf, [&](auto& req, auto& bucket) { + document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size())); + return std::make_unique<api::GetCommand>(bucket, std::move(doc_id), + req.field_set(), req.before_timestamp()); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::GetResponse>(buf, [&](auto& res) { + try { + auto document = get_document(res.document(), type_repo()); + return std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd), + std::move(document), res.last_modified_timestamp()); + } catch (std::exception& e) { + auto reply = std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd), + std::shared_ptr<document::Document>(), 0u); + reply->setResult(api::ReturnCode(api::ReturnCode::UNPARSEABLE, e.what())); + return reply; + } + }); +} + +// ----------------------------------------------------------------- +// Revert +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertCommand& msg) const { + encode_bucket_request<protobuf::RevertRequest>(buf, msg, [&](auto& req) { + auto* tokens = req.mutable_revert_tokens(); + assert(msg.getRevertTokens().size() <= INT_MAX); + tokens->Reserve(static_cast<int>(msg.getRevertTokens().size())); + for (auto token : msg.getRevertTokens()) { + tokens->Add(token); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertReply& msg) const { + encode_bucket_info_response<protobuf::RevertResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeRevertCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::RevertRequest>(buf, [&](auto& req, auto& bucket) { + std::vector<api::Timestamp> tokens; + tokens.reserve(req.revert_tokens_size()); + for (auto token : req.revert_tokens()) { + tokens.emplace_back(api::Timestamp(token)); + } + return std::make_unique<api::RevertCommand>(bucket, std::move(tokens)); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeRevertReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::RevertResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::RevertReply>(static_cast<const api::RevertCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- +// RemoveLocation +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const { + encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) { + req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const { + encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) { + return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::RemoveLocationReply>(static_cast<const api::RemoveLocationCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- +// DeleteBucket +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const { + encode_bucket_request<protobuf::DeleteBucketRequest>(buf, msg, [&](auto& req) { + set_bucket_info(*req.mutable_expected_bucket_info(), msg.getBucketInfo()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketReply& msg) const { + encode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeDeleteBucketCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::DeleteBucketRequest>(buf, [&](auto& req, auto& bucket) { + auto cmd = std::make_unique<api::DeleteBucketCommand>(bucket); + if (req.has_expected_bucket_info()) { + cmd->setBucketInfo(get_bucket_info(req.expected_bucket_info())); + } + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeDeleteBucketReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::DeleteBucketResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::DeleteBucketReply>(static_cast<const api::DeleteBucketCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- +// CreateBucket +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const { + encode_bucket_request<protobuf::CreateBucketRequest>(buf, msg, [&](auto& req) { + req.set_create_as_active(msg.getActive()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketReply& msg) const { + encode_bucket_info_response<protobuf::CreateBucketResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateBucketCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::CreateBucketRequest>(buf, [&](auto& req, auto& bucket) { + auto cmd = std::make_unique<api::CreateBucketCommand>(bucket); + cmd->setActive(req.create_as_active()); + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeCreateBucketReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::CreateBucketResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::CreateBucketReply>(static_cast<const api::CreateBucketCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- +// MergeBucket +// ----------------------------------------------------------------- + +namespace { + +void set_merge_nodes(::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& dest, + const std::vector<api::MergeBucketCommand::Node>& src) +{ + dest.Reserve(src.size()); + for (const auto& src_node : src) { + auto* dest_node = dest.Add(); + dest_node->set_index(src_node.index); + dest_node->set_source_only(src_node.sourceOnly); + } +} + +std::vector<api::MergeBucketCommand::Node> get_merge_nodes( + const ::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& src) +{ + std::vector<api::MergeBucketCommand::Node> nodes; + nodes.reserve(src.size()); + for (const auto& node : src) { + nodes.emplace_back(node.index(), node.source_only()); + } + return nodes; +} + +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const { + encode_bucket_request<protobuf::MergeBucketRequest>(buf, msg, [&](auto& req) { + set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); + req.set_max_timestamp(msg.getMaxTimestamp()); + req.set_cluster_state_version(msg.getClusterStateVersion()); + for (uint16_t chain_node : msg.getChain()) { + req.add_node_chain(chain_node); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketReply& msg) const { + encode_bucket_response<protobuf::MergeBucketResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::MergeBucketRequest>(buf, [&](auto& req, auto& bucket) { + auto nodes = get_merge_nodes(req.nodes()); + auto cmd = std::make_unique<api::MergeBucketCommand>(bucket, std::move(nodes), req.max_timestamp()); + cmd->setClusterStateVersion(req.cluster_state_version()); + std::vector<uint16_t> chain; + chain.reserve(req.node_chain_size()); + for (uint16_t node : req.node_chain()) { + chain.emplace_back(node); + } + cmd->setChain(std::move(chain)); + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeMergeBucketReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_response<protobuf::MergeBucketResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::MergeBucketReply>(static_cast<const api::MergeBucketCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- +// GetBucketDiff +// ----------------------------------------------------------------- + +namespace { + +void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) { + static_assert(document::GlobalId::LENGTH == 12); + uint64_t lo64; + uint32_t hi32; + memcpy(&lo64, src.get(), sizeof(uint64_t)); + memcpy(&hi32, src.get() + sizeof(uint64_t), sizeof(uint32_t)); + dest.set_lo_64(lo64); + dest.set_hi_32(hi32); +} + +document::GlobalId get_global_id(const protobuf::GlobalId& src) { + static_assert(document::GlobalId::LENGTH == 12); + const uint64_t lo64 = src.lo_64(); + const uint32_t hi32 = src.hi_32(); + + char buf[document::GlobalId::LENGTH]; + memcpy(buf, &lo64, sizeof(uint64_t)); + memcpy(buf + sizeof(uint64_t), &hi32, sizeof(uint32_t)); + return document::GlobalId(buf); +} + +void set_diff_entry(protobuf::MetaDiffEntry& dest, const api::GetBucketDiffCommand::Entry& src) { + dest.set_timestamp(src._timestamp); + set_global_id(*dest.mutable_gid(), src._gid); + dest.set_header_size(src._headerSize); + dest.set_body_size(src._bodySize); + dest.set_flags(src._flags); + dest.set_presence_mask(src._hasMask); +} + +api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& src) { + api::GetBucketDiffCommand::Entry e; + e._timestamp = src.timestamp(); + e._gid = get_global_id(src.gid()); + e._headerSize = src.header_size(); + e._bodySize = src.body_size(); + e._flags = src.flags(); + e._hasMask = src.presence_mask(); + return e; +} + +void fill_proto_meta_diff(::google::protobuf::RepeatedPtrField<protobuf::MetaDiffEntry>& dest, + const std::vector<api::GetBucketDiffCommand::Entry>& src) { + for (const auto& diff_entry : src) { + set_diff_entry(*dest.Add(), diff_entry); + } +} + +void fill_api_meta_diff(std::vector<api::GetBucketDiffCommand::Entry>& dest, + const ::google::protobuf::RepeatedPtrField<protobuf::MetaDiffEntry>& src) { + // FIXME GetBucketDiffReply ctor copies the diff from the request for some reason + // TODO verify this isn't actually used anywhere and remove this "feature". + dest.clear(); + dest.reserve(src.size()); + for (const auto& diff_entry : src) { + dest.emplace_back(get_diff_entry(diff_entry)); + } +} + +} // anonymous namespace + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffCommand& msg) const { + encode_bucket_request<protobuf::GetBucketDiffRequest>(buf, msg, [&](auto& req) { + set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); + req.set_max_timestamp(msg.getMaxTimestamp()); + fill_proto_meta_diff(*req.mutable_diff(), msg.getDiff()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffReply& msg) const { + encode_bucket_response<protobuf::GetBucketDiffResponse>(buf, msg, [&](auto& res) { + fill_proto_meta_diff(*res.mutable_diff(), msg.getDiff()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeGetBucketDiffCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::GetBucketDiffRequest>(buf, [&](auto& req, auto& bucket) { + auto nodes = get_merge_nodes(req.nodes()); + auto cmd = std::make_unique<api::GetBucketDiffCommand>(bucket, std::move(nodes), req.max_timestamp()); + fill_api_meta_diff(cmd->getDiff(), req.diff()); + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeGetBucketDiffReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_response<protobuf::GetBucketDiffResponse>(buf, [&](auto& res) { + auto reply = std::make_unique<api::GetBucketDiffReply>(static_cast<const api::GetBucketDiffCommand&>(cmd)); + fill_api_meta_diff(reply->getDiff(), res.diff()); + return reply; + }); +} + +// ----------------------------------------------------------------- +// ApplyBucketDiff +// ----------------------------------------------------------------- + +namespace { + +void fill_api_apply_diff_vector(std::vector<api::ApplyBucketDiffCommand::Entry>& diff, + const ::google::protobuf::RepeatedPtrField<protobuf::ApplyDiffEntry>& src) +{ + // We use the same approach as the legacy protocols here in that we pre-reserve and + // directly write into the vector. This avoids having to ensure all buffer management is movable. + size_t n_entries = src.size(); + diff.resize(n_entries); + for (size_t i = 0; i < n_entries; ++i) { + auto& proto_entry = src.Get(i); + auto& dest = diff[i]; + dest._entry = get_diff_entry(proto_entry.entry_meta()); + dest._docName = proto_entry.document_id(); + // TODO consider making buffers std::strings instead to avoid explicit zeroing-on-resize overhead + dest._headerBlob.resize(proto_entry.header_blob().size()); + memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size()); + dest._bodyBlob.resize(proto_entry.body_blob().size()); + memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size()); + } +} + +void fill_proto_apply_diff_vector(::google::protobuf::RepeatedPtrField<protobuf::ApplyDiffEntry>& dest, + const std::vector<api::ApplyBucketDiffCommand::Entry>& src) +{ + dest.Reserve(src.size()); + for (const auto& entry : src) { + auto* proto_entry = dest.Add(); + set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry); + proto_entry->set_document_id(entry._docName.data(), entry._docName.size()); + proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size()); + proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size()); + } +} + +} // anonymous namespace + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const { + encode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, msg, [&](auto& req) { + set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); + req.set_max_buffer_size(msg.getMaxBufferSize()); + fill_proto_apply_diff_vector(*req.mutable_entries(), msg.getDiff()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffReply& msg) const { + encode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, msg, [&](auto& res) { + fill_proto_apply_diff_vector(*res.mutable_entries(), msg.getDiff()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::ApplyBucketDiffRequest>(buf, [&](auto& req, auto& bucket) { + auto nodes = get_merge_nodes(req.nodes()); + auto cmd = std::make_unique<api::ApplyBucketDiffCommand>(bucket, std::move(nodes), req.max_buffer_size()); + fill_api_apply_diff_vector(cmd->getDiff(), req.entries()); + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeApplyBucketDiffReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_response<protobuf::ApplyBucketDiffResponse>(buf, [&](auto& res) { + auto reply = std::make_unique<api::ApplyBucketDiffReply>(static_cast<const api::ApplyBucketDiffCommand&>(cmd)); + fill_api_apply_diff_vector(reply->getDiff(), res.entries()); + return reply; + }); +} + +// ----------------------------------------------------------------- +// RequestBucketInfo +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const { + encode_request<protobuf::RequestBucketInfoRequest>(buf, msg, [&](auto& req) { + set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace()); + auto& buckets = msg.getBuckets(); + if (!buckets.empty()) { + auto* proto_buckets = req.mutable_explicit_bucket_set(); + for (const auto& b : buckets) { + set_bucket_id(*proto_buckets->add_bucket_ids(), b); + } + } else { + auto* all_buckets = req.mutable_all_buckets(); + auto cluster_state = msg.getSystemState().toString(); + all_buckets->set_distributor_index(msg.getDistributor()); + all_buckets->set_cluster_state(cluster_state.data(), cluster_state.size()); + all_buckets->set_distribution_hash(msg.getDistributionHash().data(), msg.getDistributionHash().size()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoReply& msg) const { + encode_response<protobuf::RequestBucketInfoResponse>(buf, msg, [&](auto& res) { + auto* proto_info = res.mutable_bucket_infos(); + proto_info->Reserve(msg.getBucketInfo().size()); + for (const auto& entry : msg.getBucketInfo()) { + auto* bucket_and_info = proto_info->Add(); + bucket_and_info->set_raw_bucket_id(entry._bucketId.getRawId()); + set_bucket_info(*bucket_and_info->mutable_bucket_info(), entry._info); + } + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeRequestBucketInfoCommand(BBuf& buf) const { + return decode_request<protobuf::RequestBucketInfoRequest>(buf, [&](auto& req) { + auto bucket_space = get_bucket_space(req.bucket_space()); + if (req.has_explicit_bucket_set()) { + const uint32_t n_buckets = req.explicit_bucket_set().bucket_ids_size(); + std::vector<document::BucketId> buckets(n_buckets); + const auto& proto_buckets = req.explicit_bucket_set().bucket_ids(); + for (uint32_t i = 0; i < n_buckets; ++i) { + buckets[i] = get_bucket_id(proto_buckets.Get(i)); + } + return std::make_unique<api::RequestBucketInfoCommand>(bucket_space, std::move(buckets)); + } else if (req.has_all_buckets()) { + const auto& all_req = req.all_buckets(); + return std::make_unique<api::RequestBucketInfoCommand>( + bucket_space, all_req.distributor_index(), + lib::ClusterState(all_req.cluster_state()), all_req.distribution_hash()); + } else { + throw vespalib::IllegalArgumentException("RequestBucketInfo does not have any applicable fields set"); + } + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(const SCmd& cmd, BBuf& buf) const { + return decode_response<protobuf::RequestBucketInfoResponse>(buf, [&](auto& res) { + auto reply = std::make_unique<api::RequestBucketInfoReply>(static_cast<const api::RequestBucketInfoCommand&>(cmd)); + auto& dest_entries = reply->getBucketInfo(); + uint32_t n_entries = res.bucket_infos_size(); + dest_entries.resize(n_entries); + for (uint32_t i = 0; i < n_entries; ++i) { + const auto& proto_entry = res.bucket_infos(i); + dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id()); + dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info()); + } + return reply; + }); +} + +// ----------------------------------------------------------------- +// NotifyBucketChange +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeCommand& msg) const { + encode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, msg, [&](auto& req) { + set_bucket_info(*req.mutable_bucket_info(), msg.getBucketInfo()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeReply& msg) const { + encode_response<protobuf::NotifyBucketChangeResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeNotifyBucketChangeCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::NotifyBucketChangeRequest>(buf, [&](auto& req, auto& bucket) { + auto bucket_info = get_bucket_info(req.bucket_info()); + return std::make_unique<api::NotifyBucketChangeCommand>(bucket, bucket_info); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeNotifyBucketChangeReply(const SCmd& cmd, BBuf& buf) const { + return decode_response<protobuf::NotifyBucketChangeResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::NotifyBucketChangeReply>(static_cast<const api::NotifyBucketChangeCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- +// SplitBucket +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketCommand& msg) const { + encode_bucket_request<protobuf::SplitBucketRequest>(buf, msg, [&](auto& req) { + req.set_min_split_bits(msg.getMinSplitBits()); + req.set_max_split_bits(msg.getMaxSplitBits()); + req.set_min_byte_size(msg.getMinByteSize()); + req.set_min_doc_count(msg.getMinDocCount()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketReply& msg) const { + encode_bucket_response<protobuf::SplitBucketResponse>(buf, msg, [&](auto& res) { + for (const auto& split_info : msg.getSplitInfo()) { + auto* proto_info = res.add_split_info(); + proto_info->set_raw_bucket_id(split_info.first.getRawId()); + set_bucket_info(*proto_info->mutable_bucket_info(), split_info.second); + } + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeSplitBucketCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::SplitBucketRequest>(buf, [&](auto& req, auto& bucket) { + auto cmd = std::make_unique<api::SplitBucketCommand>(bucket); + cmd->setMinSplitBits(static_cast<uint8_t>(req.min_split_bits())); + cmd->setMaxSplitBits(static_cast<uint8_t>(req.max_split_bits())); + cmd->setMinByteSize(req.min_byte_size()); + cmd->setMinDocCount(req.min_doc_count()); + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_response<protobuf::SplitBucketResponse>(buf, [&](auto& res) { + auto reply = std::make_unique<api::SplitBucketReply>(static_cast<const api::SplitBucketCommand&>(cmd)); + auto& dest_info = reply->getSplitInfo(); + dest_info.reserve(res.split_info_size()); + for (const auto& proto_info : res.split_info()) { + dest_info.emplace_back(document::BucketId(proto_info.raw_bucket_id()), + get_bucket_info(proto_info.bucket_info())); + } + return reply; + }); +} + +// ----------------------------------------------------------------- +// JoinBuckets +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const { + encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](auto& req) { + for (const auto& source : msg.getSourceBuckets()) { + set_bucket_id(*req.add_source_buckets(), source); + } + req.set_min_join_bits(msg.getMinJoinBits()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsReply& msg) const { + encode_bucket_info_response<protobuf::JoinBucketsResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeJoinBucketsCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::JoinBucketsRequest>(buf, [&](auto& req, auto& bucket) { + auto cmd = std::make_unique<api::JoinBucketsCommand>(bucket); + auto& entries = cmd->getSourceBuckets(); + for (const auto& proto_bucket : req.source_buckets()) { + entries.emplace_back(get_bucket_id(proto_bucket)); + } + cmd->setMinJoinBits(static_cast<uint8_t>(req.min_join_bits())); + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeJoinBucketsReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::JoinBucketsResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::JoinBucketsReply>(static_cast<const api::JoinBucketsCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- +// SetBucketState +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateCommand& msg) const { + encode_bucket_request<protobuf::SetBucketStateRequest>(buf, msg, [&](auto& req) { + auto state = (msg.getState() == api::SetBucketStateCommand::BUCKET_STATE::ACTIVE + ? protobuf::SetBucketStateRequest_BucketState_Active + : protobuf::SetBucketStateRequest_BucketState_Inactive); + req.set_state(state); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateReply& msg) const { + // SetBucketStateReply is _technically_ a BucketInfoReply, but the legacy protocol impls + // do _not_ encode bucket info as part of the wire format (and it's not used on the distributor), + // so we follow that here and only encode remapping information. + encode_bucket_response<protobuf::SetBucketStateResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeSetBucketStateCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::SetBucketStateRequest>(buf, [&](auto& req, auto& bucket) { + auto state = (req.state() == protobuf::SetBucketStateRequest_BucketState_Active + ? api::SetBucketStateCommand::BUCKET_STATE::ACTIVE + : api::SetBucketStateCommand::BUCKET_STATE::INACTIVE); + return std::make_unique<api::SetBucketStateCommand>(bucket, state); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeSetBucketStateReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_response<protobuf::SetBucketStateResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::SetBucketStateReply>(static_cast<const api::SetBucketStateCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- +// CreateVisitor +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const { + encode_request<protobuf::CreateVisitorRequest>(buf, msg, [&](auto& req) { + set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace()); + for (const auto& bucket : msg.getBuckets()) { + set_bucket_id(*req.add_buckets(), bucket); + } + + auto* ctrl_meta = req.mutable_control_meta(); + ctrl_meta->set_library_name(msg.getLibraryName().data(), msg.getLibraryName().size()); + ctrl_meta->set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size()); + ctrl_meta->set_visitor_command_id(msg.getVisitorCmdId()); + ctrl_meta->set_control_destination(msg.getControlDestination().data(), msg.getControlDestination().size()); + ctrl_meta->set_data_destination(msg.getDataDestination().data(), msg.getDataDestination().size()); + ctrl_meta->set_queue_timeout(msg.getQueueTimeout()); + ctrl_meta->set_max_pending_reply_count(msg.getMaximumPendingReplyCount()); + ctrl_meta->set_max_buckets_per_visitor(msg.getMaxBucketsPerVisitor()); + + auto* constraints = req.mutable_constraints(); + constraints->set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); + constraints->set_from_time_usec(msg.getFromTime()); + constraints->set_to_time_usec(msg.getToTime()); + constraints->set_visit_inconsistent_buckets(msg.visitInconsistentBuckets()); + constraints->set_visit_removes(msg.visitRemoves()); + constraints->set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size()); + + for (const auto& param : msg.getParameters()) { + auto* proto_param = req.add_client_parameters(); + proto_param->set_key(param.first.data(), param.first.size()); + proto_param->set_value(param.second.data(), param.second.size()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorReply& msg) const { + encode_response<protobuf::CreateVisitorResponse>(buf, msg, [&](auto& res) { + auto& stats = msg.getVisitorStatistics(); + auto* proto_stats = res.mutable_visitor_statistics(); + proto_stats->set_buckets_visited(stats.getBucketsVisited()); + proto_stats->set_documents_visited(stats.getDocumentsVisited()); + proto_stats->set_bytes_visited(stats.getBytesVisited()); + proto_stats->set_documents_returned(stats.getDocumentsReturned()); + proto_stats->set_bytes_returned(stats.getBytesReturned()); + proto_stats->set_second_pass_documents_returned(stats.getSecondPassDocumentsReturned()); + proto_stats->set_second_pass_bytes_returned(stats.getSecondPassBytesReturned()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBuf& buf) const { + return decode_request<protobuf::CreateVisitorRequest>(buf, [&](auto& req) { + auto bucket_space = get_bucket_space(req.bucket_space()); + auto& ctrl_meta = req.control_meta(); + auto& constraints = req.constraints(); + auto cmd = std::make_unique<api::CreateVisitorCommand>(bucket_space, ctrl_meta.library_name(), + ctrl_meta.instance_id(), constraints.document_selection()); + for (const auto& proto_bucket : req.buckets()) { + cmd->getBuckets().emplace_back(get_bucket_id(proto_bucket)); + } + + cmd->setVisitorCmdId(ctrl_meta.visitor_command_id()); + cmd->setControlDestination(ctrl_meta.control_destination()); + cmd->setDataDestination(ctrl_meta.data_destination()); + cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count()); + cmd->setQueueTimeout(ctrl_meta.queue_timeout()); + cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor()); + cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol impl + + for (const auto& proto_param : req.client_parameters()) { + cmd->getParameters().set(proto_param.key(), proto_param.value()); + } + + cmd->setFromTime(constraints.from_time_usec()); + cmd->setToTime(constraints.to_time_usec()); + cmd->setVisitRemoves(constraints.visit_removes()); + cmd->setFieldSet(constraints.field_set()); + cmd->setVisitInconsistentBuckets(constraints.visit_inconsistent_buckets()); + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const { + return decode_response<protobuf::CreateVisitorResponse>(buf, [&](auto& res) { + auto reply = std::make_unique<api::CreateVisitorReply>(static_cast<const api::CreateVisitorCommand&>(cmd)); + vdslib::VisitorStatistics vs; + const auto& proto_stats = res.visitor_statistics(); + vs.setBucketsVisited(proto_stats.buckets_visited()); + vs.setDocumentsVisited(proto_stats.documents_visited()); + vs.setBytesVisited(proto_stats.bytes_visited()); + vs.setDocumentsReturned(proto_stats.documents_returned()); + vs.setBytesReturned(proto_stats.bytes_returned()); + vs.setSecondPassDocumentsReturned(proto_stats.second_pass_documents_returned()); + vs.setSecondPassBytesReturned(proto_stats.second_pass_bytes_returned()); + reply->setVisitorStatistics(vs); + return reply; + }); +} + +// ----------------------------------------------------------------- +// DestroyVisitor +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorCommand& msg) const { + encode_request<protobuf::DestroyVisitorRequest>(buf, msg, [&](auto& req) { + req.set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorReply& msg) const { + encode_response<protobuf::DestroyVisitorResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeDestroyVisitorCommand(BBuf& buf) const { + return decode_request<protobuf::DestroyVisitorRequest>(buf, [&](auto& req) { + return std::make_unique<api::DestroyVisitorCommand>(req.instance_id()); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf) const { + return decode_response<protobuf::DestroyVisitorResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::DestroyVisitorReply>(static_cast<const api::DestroyVisitorCommand&>(cmd)); + }); +} + +} // storage::mbusprot diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h new file mode 100644 index 00000000000..f3499150278 --- /dev/null +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h @@ -0,0 +1,146 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "protocolserialization.h" +#include <vespa/documentapi/loadtypes/loadtypeset.h> + +namespace storage { +namespace mbusprot { + +/** + * Protocol serialization version that uses Protocol Buffers for all its binary + * encoding and decoding. + */ +class ProtocolSerialization7 : public ProtocolSerialization { + const std::shared_ptr<const document::DocumentTypeRepo> _repo; + const documentapi::LoadTypeSet& _load_types; +public: + ProtocolSerialization7(std::shared_ptr<const document::DocumentTypeRepo> repo, + const documentapi::LoadTypeSet& load_types); + + const document::DocumentTypeRepo& type_repo() const { return *_repo; } + + // Put + void onEncode(GBBuf&, const api::PutCommand&) const override; + void onEncode(GBBuf&, const api::PutReply&) const override; + SCmd::UP onDecodePutCommand(BBuf&) const override; + SRep::UP onDecodePutReply(const SCmd&, BBuf&) const override; + + // Update + void onEncode(GBBuf&, const api::UpdateCommand&) const override; + void onEncode(GBBuf&, const api::UpdateReply&) const override; + SCmd::UP onDecodeUpdateCommand(BBuf&) const override; + SRep::UP onDecodeUpdateReply(const SCmd&, BBuf&) const override; + + // Remove + void onEncode(GBBuf&, const api::RemoveCommand&) const override; + void onEncode(GBBuf&, const api::RemoveReply&) const override; + SCmd::UP onDecodeRemoveCommand(BBuf&) const override; + SRep::UP onDecodeRemoveReply(const SCmd&, BBuf&) const override; + + // Get + void onEncode(GBBuf&, const api::GetCommand&) const override; + void onEncode(GBBuf&, const api::GetReply&) const override; + SCmd::UP onDecodeGetCommand(BBuf&) const override; + SRep::UP onDecodeGetReply(const SCmd&, BBuf&) const override; + + // Revert - TODO this is deprecated, no? + void onEncode(GBBuf&, const api::RevertCommand&) const override; + void onEncode(GBBuf&, const api::RevertReply&) const override; + SCmd::UP onDecodeRevertCommand(BBuf&) const override; + SRep::UP onDecodeRevertReply(const SCmd&, BBuf&) const override; + + // DeleteBucket + void onEncode(GBBuf&, const api::DeleteBucketCommand&) const override; + void onEncode(GBBuf&, const api::DeleteBucketReply&) const override; + SCmd::UP onDecodeDeleteBucketCommand(BBuf&) const override; + SRep::UP onDecodeDeleteBucketReply(const SCmd&, BBuf&) const override; + + // CreateBucket + void onEncode(GBBuf&, const api::CreateBucketCommand&) const override; + void onEncode(GBBuf&, const api::CreateBucketReply&) const override; + SCmd::UP onDecodeCreateBucketCommand(BBuf&) const override; + SRep::UP onDecodeCreateBucketReply(const SCmd&, BBuf&) const override; + + // MergeBucket + void onEncode(GBBuf&, const api::MergeBucketCommand&) const override; + void onEncode(GBBuf&, const api::MergeBucketReply&) const override; + SCmd::UP onDecodeMergeBucketCommand(BBuf&) const override; + SRep::UP onDecodeMergeBucketReply(const SCmd&, BBuf&) const override; + + // GetBucketDiff + void onEncode(GBBuf&, const api::GetBucketDiffCommand&) const override; + void onEncode(GBBuf&, const api::GetBucketDiffReply&) const override; + SCmd::UP onDecodeGetBucketDiffCommand(BBuf&) const override; + SRep::UP onDecodeGetBucketDiffReply(const SCmd&, BBuf&) const override; + + // ApplyBucketDiff + void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const override; + void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const override; + SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const override; + SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const override; + + // RequestBucketInfo + void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const override; + void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const override; + SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const override; + SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const override; + + // NotifyBucketChange + void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const override; + void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const override; + SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const override; + SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const override; + + // SplitBucket + void onEncode(GBBuf&, const api::SplitBucketCommand&) const override; + void onEncode(GBBuf&, const api::SplitBucketReply&) const override; + SCmd::UP onDecodeSplitBucketCommand(BBuf&) const override; + SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const override; + + // JoinBuckets + void onEncode(GBBuf&, const api::JoinBucketsCommand&) const override; + void onEncode(GBBuf&, const api::JoinBucketsReply&) const override; + SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const override; + SRep::UP onDecodeJoinBucketsReply(const SCmd&, BBuf&) const override; + + // SetBucketState + void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override; + void onEncode(GBBuf&, const api::SetBucketStateReply&) const override; + SCmd::UP onDecodeSetBucketStateCommand(BBuf&) const override; + SRep::UP onDecodeSetBucketStateReply(const SCmd&, BBuf&) const override; + + // CreateVisitor + void onEncode(GBBuf&, const api::CreateVisitorCommand&) const override; + void onEncode(GBBuf&, const api::CreateVisitorReply&) const override; + SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override; + SRep::UP onDecodeCreateVisitorReply(const SCmd&, BBuf&) const override; + + // DestroyVisitor + void onEncode(GBBuf&, const api::DestroyVisitorCommand&) const override; + void onEncode(GBBuf&, const api::DestroyVisitorReply&) const override; + SCmd::UP onDecodeDestroyVisitorCommand(BBuf&) const override; + SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const override; + + // RemoveLocation + void onEncode(GBBuf&, const api::RemoveLocationCommand&) const override; + void onEncode(GBBuf&, const api::RemoveLocationReply&) const override; + SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override; + SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override; + +private: + template <typename ProtobufType, typename Func> + std::unique_ptr<api::StorageCommand> decode_request(document::ByteBuffer& in_buf, Func&& f) const; + template <typename ProtobufType, typename Func> + std::unique_ptr<api::StorageReply> decode_response(document::ByteBuffer& in_buf, Func&& f) const; + template <typename ProtobufType, typename Func> + std::unique_ptr<api::StorageCommand> decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const; + template <typename ProtobufType, typename Func> + std::unique_ptr<api::StorageReply> decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const; + template <typename ProtobufType, typename Func> + std::unique_ptr<api::StorageReply> decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const; +}; + +} +} diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp index 7e6be0a84f5..7bc6333762b 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp @@ -20,7 +20,8 @@ StorageProtocol::StorageProtocol(const std::shared_ptr<const document::DocumentT : _serializer5_0(repo, loadTypes), _serializer5_1(repo, loadTypes), _serializer5_2(repo, loadTypes), - _serializer6_0(repo, loadTypes) + _serializer6_0(repo, loadTypes), + _serializer7_0(repo, loadTypes) { } @@ -33,6 +34,7 @@ StorageProtocol::createPolicy(const mbus::string&, const mbus::string&) const } namespace { + vespalib::Version version7_0(7, 40, 5); vespalib::Version version6_0(6, 240, 0); vespalib::Version version5_2(5, 93, 30); vespalib::Version version5_1(5, 1, 0); @@ -106,8 +108,10 @@ StorageProtocol::encode(const vespalib::Version& version, } else { if (version < version6_0) { return encodeMessage(_serializer5_2, routable, message, version5_2, version); - } else { + } else if (version < version7_0) { return encodeMessage(_serializer6_0, routable, message, version6_0, version); + } else { + return encodeMessage(_serializer7_0, routable, message, version7_0, version); } } @@ -180,8 +184,10 @@ StorageProtocol::decode(const vespalib::Version & version, } else { if (version < version6_0) { return decodeMessage(_serializer5_2, data, type, version5_2, version); - } else { + } else if (version < version7_0) { return decodeMessage(_serializer6_0, data, type, version6_0, version); + } else { + return decodeMessage(_serializer7_0, data, type, version7_0, version); } } } catch (std::exception & e) { diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h index 1acd7c9675f..67ea121c340 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h +++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.h @@ -3,6 +3,7 @@ #include "protocolserialization5_2.h" #include "protocolserialization6_0.h" +#include "protocolserialization7.h" #include <vespa/messagebus/iprotocol.h> namespace storage::mbusprot { @@ -28,6 +29,7 @@ private: ProtocolSerialization5_1 _serializer5_1; ProtocolSerialization5_2 _serializer5_2; ProtocolSerialization6_0 _serializer6_0; + ProtocolSerialization7 _serializer7_0; }; } |