From f73b5003a0c44c558adc445c40d2b6e4aecf1b8f Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Fri, 12 Apr 2019 14:29:42 +0200 Subject: Revert "Use protocol buffers for internal StorageAPI wire encoding" --- storageapi/src/tests/CMakeLists.txt | 3 +- storageapi/src/tests/mbusprot/CMakeLists.txt | 2 +- .../src/tests/mbusprot/storageprotocoltest.cpp | 1093 ++++++++++------- storageapi/src/vespa/storageapi/CMakeLists.txt | 4 - .../src/vespa/storageapi/mbusprot/.gitignore | 3 - .../src/vespa/storageapi/mbusprot/CMakeLists.txt | 17 - .../mbusprot/legacyprotocolserialization.h | 31 - .../storageapi/mbusprot/protobuf/common.proto | 68 -- .../vespa/storageapi/mbusprot/protobuf/feed.proto | 91 -- .../storageapi/mbusprot/protobuf/maintenance.proto | 160 --- .../storageapi/mbusprot/protobuf/visiting.proto | 66 - .../vespa/storageapi/mbusprot/protobuf_includes.h | 13 - .../storageapi/mbusprot/protocolserialization.cpp | 5 + .../storageapi/mbusprot/protocolserialization.h | 34 +- .../mbusprot/protocolserialization4_2.cpp | 2 +- .../storageapi/mbusprot/protocolserialization4_2.h | 6 +- .../storageapi/mbusprot/protocolserialization5_0.h | 3 - .../storageapi/mbusprot/protocolserialization7.cpp | 1268 -------------------- .../storageapi/mbusprot/protocolserialization7.h | 146 --- .../vespa/storageapi/mbusprot/storageprotocol.cpp | 12 +- .../vespa/storageapi/mbusprot/storageprotocol.h | 2 - 21 files changed, 725 insertions(+), 2304 deletions(-) delete mode 100644 storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h delete mode 100644 storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto delete mode 100644 storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto delete mode 100644 storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto delete mode 100644 storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto delete mode 100644 storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h delete mode 100644 storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp delete mode 100644 storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h (limited to 'storageapi') diff --git a/storageapi/src/tests/CMakeLists.txt b/storageapi/src/tests/CMakeLists.txt index ddc43c70004..ebbf3b8357a 100644 --- a/storageapi/src/tests/CMakeLists.txt +++ b/storageapi/src/tests/CMakeLists.txt @@ -7,7 +7,6 @@ vespa_add_executable(storageapi_gtest_runner_app TEST gtest_runner.cpp DEPENDS storageapi_testbuckets - storageapi_testmbusprot storageapi gtest ) @@ -23,8 +22,8 @@ vespa_add_executable(storageapi_testrunner_app TEST testrunner.cpp DEPENDS storageapi_testmessageapi + storageapi_testmbusprot storageapi - vdstestlib ) vespa_add_test( diff --git a/storageapi/src/tests/mbusprot/CMakeLists.txt b/storageapi/src/tests/mbusprot/CMakeLists.txt index 2801c9a91dd..16ced76155c 100644 --- a/storageapi/src/tests/mbusprot/CMakeLists.txt +++ b/storageapi/src/tests/mbusprot/CMakeLists.txt @@ -4,5 +4,5 @@ vespa_add_library(storageapi_testmbusprot storageprotocoltest.cpp DEPENDS storageapi - gtest + vdstestlib ) diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index 8690d89e12b..f634667afd5 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -14,16 +14,12 @@ #include #include #include +#include #include #include - #include #include -#include - -using namespace ::testing; - using std::shared_ptr; using document::BucketSpace; using document::ByteBuffer; @@ -36,103 +32,183 @@ using document::test::makeBucketSpace; using storage::lib::ClusterState; using vespalib::string; -namespace vespalib { - -// Needed for GTest to properly understand how to print Version values. -// If not present, it will print the byte values of the presumed memory area -// (which will be overrun for some reason, causing Valgrind to scream at us). -void PrintTo(const vespalib::Version& v, std::ostream* os) { - *os << v.toString(); -} - -} - -namespace storage::api { +namespace storage { +namespace api { -struct StorageProtocolTest : TestWithParam { +struct StorageProtocolTest : public CppUnit::TestFixture { document::TestDocMan _docMan; document::Document::SP _testDoc; document::DocumentId _testDocId; - document::BucketId _bucket_id; document::Bucket _bucket; - document::BucketId _dummy_remap_bucket{17, 12345}; - BucketInfo _dummy_bucket_info{1,2,3,4,5, true, false, 48}; + vespalib::Version _version5_0{5, 0, 12}; + vespalib::Version _version5_1{5, 1, 0}; + vespalib::Version _version5_2{5, 93, 30}; + vespalib::Version _version6_0{6, 240, 0}; documentapi::LoadTypeSet _loadTypes; mbusprot::StorageProtocol _protocol; + static std::vector _nonVerboseMessageStrings; + static std::vector _verboseMessageStrings; + static std::vector _serialization50; static auto constexpr CONDITION_STRING = "There's just one condition"; StorageProtocolTest() : _docMan(), _testDoc(_docMan.createDocument()), _testDocId(_testDoc->getId()), - _bucket_id(16, 0x51), - _bucket(makeDocumentBucket(_bucket_id)), + _bucket(makeDocumentBucket(document::BucketId(16, 0x51))), _protocol(_docMan.getTypeRepoSP(), _loadTypes) { _loadTypes.addLoadType(34, "foo", documentapi::Priority::PRI_NORMAL_2); } - ~StorageProtocolTest(); - - void set_dummy_bucket_info_reply_fields(BucketInfoReply& reply) { - reply.setBucketInfo(_dummy_bucket_info); - reply.remapBucketId(_dummy_remap_bucket); - } - - void assert_bucket_info_reply_fields_propagated(const BucketInfoReply& reply) { - EXPECT_EQ(_dummy_bucket_info, reply.getBucketInfo()); - EXPECT_TRUE(reply.hasBeenRemapped()); - EXPECT_EQ(_dummy_remap_bucket, reply.getBucketId()); - EXPECT_EQ(_bucket_id, reply.getOriginalBucketId()); - } template - std::shared_ptr copyCommand(const std::shared_ptr&); + std::shared_ptr copyCommand(const std::shared_ptr&, vespalib::Version); template std::shared_ptr copyReply(const std::shared_ptr&); + void recordOutput(const api::StorageMessage& msg); + + void recordSerialization50(); + + void testWriteSerialization50(); + void testAddress50(); + void testStringOutputs(); + + void testPut51(); + void testUpdate51(); + void testGet51(); + void testRemove51(); + void testRevert51(); + void testRequestBucketInfo51(); + void testNotifyBucketChange51(); + void testCreateBucket51(); + void testDeleteBucket51(); + void testMergeBucket51(); + void testGetBucketDiff51(); + void testApplyBucketDiff51(); + void testSplitBucket51(); + void testSplitBucketChain51(); + void testJoinBuckets51(); + void testCreateVisitor51(); + void testDestroyVisitor51(); + void testRemoveLocation51(); + void testInternalMessage(); + void testSetBucketState51(); + + void testPutCommand52(); + void testUpdateCommand52(); + void testRemoveCommand52(); + + void testPutCommandWithBucketSpace6_0(); + void testCreateVisitorWithBucketSpace6_0(); + void testRequestBucketInfoWithBucketSpace6_0(); + + void serialized_size_is_used_to_set_approx_size_of_storage_message(); + + CPPUNIT_TEST_SUITE(StorageProtocolTest); + + // Enable to see string outputs of messages + // CPPUNIT_TEST_DISABLED(testStringOutputs); + + // Enable this to write 5.0 serialization to disk + // CPPUNIT_TEST_DISABLED(testWriteSerialization50); + // CPPUNIT_TEST_DISABLED(testAddress50); + + // 5.1 tests + CPPUNIT_TEST(testPut51); + CPPUNIT_TEST(testUpdate51); + CPPUNIT_TEST(testGet51); + CPPUNIT_TEST(testRemove51); + CPPUNIT_TEST(testRevert51); + CPPUNIT_TEST(testRequestBucketInfo51); + CPPUNIT_TEST(testNotifyBucketChange51); + CPPUNIT_TEST(testCreateBucket51); + CPPUNIT_TEST(testDeleteBucket51); + CPPUNIT_TEST(testMergeBucket51); + CPPUNIT_TEST(testGetBucketDiff51); + CPPUNIT_TEST(testApplyBucketDiff51); + CPPUNIT_TEST(testSplitBucket51); + CPPUNIT_TEST(testJoinBuckets51); + CPPUNIT_TEST(testCreateVisitor51); + CPPUNIT_TEST(testDestroyVisitor51); + CPPUNIT_TEST(testRemoveLocation51); + CPPUNIT_TEST(testInternalMessage); + CPPUNIT_TEST(testSetBucketState51); + + // 5.2 tests + CPPUNIT_TEST(testPutCommand52); + CPPUNIT_TEST(testUpdateCommand52); + CPPUNIT_TEST(testRemoveCommand52); + + // 6.0 tests + CPPUNIT_TEST(testPutCommandWithBucketSpace6_0); + CPPUNIT_TEST(testCreateVisitorWithBucketSpace6_0); + CPPUNIT_TEST(testRequestBucketInfoWithBucketSpace6_0); + + CPPUNIT_TEST(serialized_size_is_used_to_set_approx_size_of_storage_message); + + CPPUNIT_TEST_SUITE_END(); }; -StorageProtocolTest::~StorageProtocolTest() = default; +CPPUNIT_TEST_SUITE_REGISTRATION(StorageProtocolTest); -namespace { +std::vector StorageProtocolTest::_nonVerboseMessageStrings; +std::vector StorageProtocolTest::_verboseMessageStrings; +std::vector StorageProtocolTest::_serialization50; -std::string version_as_gtest_string(TestParamInfo info) { - std::ostringstream ss; - auto& p = info.param; - // Dots are not allowed in test names, so convert to underscores. - ss << p.getMajor() << '_' << p.getMinor() << '_' << p.getMicro(); - return ss.str(); +void +StorageProtocolTest::recordOutput(const api::StorageMessage& msg) +{ + std::ostringstream ost; + ost << " "; + msg.print(ost, false, " "); + _nonVerboseMessageStrings.push_back(ost.str()); + ost.str(""); + ost << " "; + msg.print(ost, true, " "); + _verboseMessageStrings.push_back(ost.str()); } -} +namespace { + + bool debug = false; + + struct ScopedName { + std::string _name; + + ScopedName(const std::string& s) : _name(s) { + if (debug) std::cerr << "Starting test " << _name << "\n"; + } + ~ScopedName() { + if (debug) std::cerr << "Finished test " << _name << "\n"; + } + }; -// TODO replace with INSTANTIATE_TEST_SUITE_P on newer gtest versions -INSTANTIATE_TEST_CASE_P(MultiVersionTest, StorageProtocolTest, - Values(vespalib::Version(6, 240, 0), - vespalib::Version(7, 40, 5)), - version_as_gtest_string); +} // Anonymous namespace namespace { mbus::Message::UP lastCommand; mbus::Reply::UP lastReply; } -TEST_F(StorageProtocolTest, testAddress50) { +void +StorageProtocolTest::testAddress50() +{ StorageMessageAddress address("foo", lib::NodeType::STORAGE, 3); - EXPECT_EQ(vespalib::string("storage/cluster.foo/storage/3/default"), + CPPUNIT_ASSERT_EQUAL(vespalib::string("storage/cluster.foo/storage/3/default"), address.getRoute().toString()); } template std::shared_ptr -StorageProtocolTest::copyCommand(const std::shared_ptr& m) +StorageProtocolTest::copyCommand(const std::shared_ptr& m, vespalib::Version version) { - auto mbusMessage = std::make_unique(m); - auto version = GetParam(); + mbus::Message::UP mbusMessage(new mbusprot::StorageCommand(m)); mbus::Blob blob = _protocol.encode(version, *mbusMessage); mbus::Routable::UP copy(_protocol.decode(version, blob)); - assert(copy.get()); - auto* copy2 = dynamic_cast(copy.get()); - assert(copy2 != nullptr); + CPPUNIT_ASSERT(copy.get()); + + mbusprot::StorageCommand* copy2(dynamic_cast(copy.get())); + CPPUNIT_ASSERT(copy2 != 0); StorageCommand::SP internalMessage(copy2->getCommand()); lastCommand = std::move(mbusMessage); @@ -143,61 +219,80 @@ StorageProtocolTest::copyCommand(const std::shared_ptr& m) template std::shared_ptr StorageProtocolTest::copyReply(const std::shared_ptr& m) { - auto mbusMessage = std::make_unique(m); - auto version = GetParam(); - mbus::Blob blob = _protocol.encode(version, *mbusMessage); - mbus::Routable::UP copy(_protocol.decode(version, blob)); - assert(copy.get()); - - auto* copy2 = dynamic_cast(copy.get()); - assert(copy2 != nullptr); - + mbus::Reply::UP mbusMessage(new mbusprot::StorageReply(m)); + mbus::Blob blob = _protocol.encode(_version5_1, *mbusMessage); + mbus::Routable::UP copy(_protocol.decode(_version5_1, blob)); + CPPUNIT_ASSERT(copy.get()); + mbusprot::StorageReply* copy2( + dynamic_cast(copy.get())); + CPPUNIT_ASSERT(copy2 != 0); copy2->setMessage(std::move(lastCommand)); - auto internalMessage = copy2->getReply(); + StorageReply::SP internalMessage(copy2->getReply()); lastReply = std::move(mbusMessage); lastCommand = copy2->getMessage(); return std::dynamic_pointer_cast(internalMessage); } -TEST_P(StorageProtocolTest, put) { - auto cmd = std::make_shared(_bucket, _testDoc, 14); +void +StorageProtocolTest::recordSerialization50() +{ + assert(lastCommand.get()); + assert(lastReply.get()); + for (uint32_t j=0; j<2; ++j) { + mbusprot::StorageMessage& msg(j == 0 + ? dynamic_cast(*lastCommand) + : dynamic_cast(*lastReply)); + msg.getInternalMessage()->forceMsgId(0); + mbus::Routable& routable(j == 0 + ? dynamic_cast(*lastCommand) + : dynamic_cast(*lastReply)); + mbus::Blob blob = _protocol.encode(_version5_0, routable); + _serialization50.push_back('\n'); + std::string type(msg.getInternalMessage()->getType().toString()); + for (uint32_t i=0, n=type.size(); isetUpdateTimestamp(Timestamp(13)); cmd->setLoadType(_loadTypes["foo"]); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); - EXPECT_EQ(*_testDoc, *cmd2->getDocument()); - EXPECT_EQ(vespalib::string("foo"), cmd2->getLoadType().getName()); - EXPECT_EQ(Timestamp(14), cmd2->getTimestamp()); - EXPECT_EQ(Timestamp(13), cmd2->getUpdateTimestamp()); - - auto reply = std::make_shared(*cmd2); - ASSERT_TRUE(reply->hasDocument()); - EXPECT_EQ(*_testDoc, *reply->getDocument()); - set_dummy_bucket_info_reply_fields(*reply); - auto reply2 = copyReply(reply); - ASSERT_TRUE(reply2->hasDocument()); - EXPECT_EQ(*_testDoc, *reply->getDocument()); - EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId()); - EXPECT_EQ(Timestamp(14), reply2->getTimestamp()); - EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); -} - -TEST_P(StorageProtocolTest, response_without_remapped_bucket_preserves_original_bucket) { - auto cmd = std::make_shared(_bucket, _testDoc, 14); - auto cmd2 = copyCommand(cmd); - auto reply = std::make_shared(*cmd2); - auto reply2 = copyReply(reply); - - EXPECT_FALSE(reply2->hasBeenRemapped()); - EXPECT_EQ(_bucket_id, reply2->getBucketId()); - EXPECT_EQ(document::BucketId(), reply2->getOriginalBucketId()); - -} - -TEST_P(StorageProtocolTest, update) { - auto update = std::make_shared( - _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()); - auto assignUpdate = std::make_shared(document::IntFieldValue(17)); + PutCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(*_testDoc, *cmd2->getDocument()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("foo"), cmd2->getLoadType().getName()); + CPPUNIT_ASSERT_EQUAL(Timestamp(14), cmd2->getTimestamp()); + CPPUNIT_ASSERT_EQUAL(Timestamp(13), cmd2->getUpdateTimestamp()); + + PutReply::SP reply(new PutReply(*cmd2)); + CPPUNIT_ASSERT(reply->hasDocument()); + CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply->getDocument()); + PutReply::SP reply2(copyReply(reply)); + CPPUNIT_ASSERT(reply2->hasDocument()); + CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply->getDocument()); + CPPUNIT_ASSERT_EQUAL(_testDoc->getId(), reply2->getDocumentId()); + CPPUNIT_ASSERT_EQUAL(Timestamp(14), reply2->getTimestamp()); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); +} + +void +StorageProtocolTest::testUpdate51() +{ + ScopedName test("testUpdate51"); + document::DocumentUpdate::SP update(new document::DocumentUpdate(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId())); + std::shared_ptr assignUpdate(new document::AssignValueUpdate(document::IntFieldValue(17))); document::FieldUpdate fieldUpdate(_testDoc->getField("headerval")); fieldUpdate.addUpdate(*assignUpdate); update->addUpdate(fieldUpdate); @@ -205,157 +300,217 @@ TEST_P(StorageProtocolTest, update) { update->addFieldPathUpdate(document::FieldPathUpdate::CP( new document::RemoveFieldPathUpdate("headerval", "testdoctype1.headerval > 0"))); - auto cmd = std::make_shared(_bucket, update, 14); - EXPECT_EQ(Timestamp(0), cmd->getOldTimestamp()); + UpdateCommand::SP cmd(new UpdateCommand(_bucket, update, 14)); + CPPUNIT_ASSERT_EQUAL(Timestamp(0), cmd->getOldTimestamp()); cmd->setOldTimestamp(10); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); - EXPECT_EQ(_testDocId, cmd2->getDocumentId()); - EXPECT_EQ(Timestamp(14), cmd2->getTimestamp()); - EXPECT_EQ(Timestamp(10), cmd2->getOldTimestamp()); - EXPECT_EQ(*update, *cmd2->getUpdate()); - - auto reply = std::make_shared(*cmd2, 8); - set_dummy_bucket_info_reply_fields(*reply); - auto reply2 = copyReply(reply); - EXPECT_EQ(_testDocId, reply2->getDocumentId()); - EXPECT_EQ(Timestamp(14), reply2->getTimestamp()); - EXPECT_EQ(Timestamp(8), reply->getOldTimestamp()); - EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); -} - -TEST_P(StorageProtocolTest, get) { - auto cmd = std::make_shared(_bucket, _testDocId, "foo,bar,vekterli", 123); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); - EXPECT_EQ(_testDocId, cmd2->getDocumentId()); - EXPECT_EQ(Timestamp(123), cmd2->getBeforeTimestamp()); - EXPECT_EQ(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet()); - - auto reply = std::make_shared(*cmd2, _testDoc, 100); - set_dummy_bucket_info_reply_fields(*reply); - auto reply2 = copyReply(reply); - ASSERT_TRUE(reply2.get() != nullptr); - ASSERT_TRUE(reply2->getDocument().get() != nullptr); - EXPECT_EQ(*_testDoc, *reply2->getDocument()); - EXPECT_EQ(_testDoc->getId(), reply2->getDocumentId()); - EXPECT_EQ(Timestamp(123), reply2->getBeforeTimestamp()); - EXPECT_EQ(Timestamp(100), reply2->getLastModifiedTimestamp()); - EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); -} - -TEST_P(StorageProtocolTest, remove) { - auto cmd = std::make_shared(_bucket, _testDocId, 159); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); - EXPECT_EQ(_testDocId, cmd2->getDocumentId()); - EXPECT_EQ(Timestamp(159), cmd2->getTimestamp()); - - auto reply = std::make_shared(*cmd2, 48); - set_dummy_bucket_info_reply_fields(*reply); - - auto reply2 = copyReply(reply); - EXPECT_EQ(_testDocId, reply2->getDocumentId()); - EXPECT_EQ(Timestamp(159), reply2->getTimestamp()); - EXPECT_EQ(Timestamp(48), reply2->getOldTimestamp()); - EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); -} - -TEST_P(StorageProtocolTest, revert) { + UpdateCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId()); + CPPUNIT_ASSERT_EQUAL(Timestamp(14), cmd2->getTimestamp()); + CPPUNIT_ASSERT_EQUAL(Timestamp(10), cmd2->getOldTimestamp()); + CPPUNIT_ASSERT_EQUAL(*update, *cmd2->getUpdate()); + + UpdateReply::SP reply(new UpdateReply(*cmd2, 8)); + UpdateReply::SP reply2(copyReply(reply)); + CPPUNIT_ASSERT_EQUAL(_testDocId, reply2->getDocumentId()); + CPPUNIT_ASSERT_EQUAL(Timestamp(14), reply2->getTimestamp()); + CPPUNIT_ASSERT_EQUAL(Timestamp(8), reply->getOldTimestamp()); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); +} + +void +StorageProtocolTest::testGet51() +{ + ScopedName test("testGet51"); + GetCommand::SP cmd(new GetCommand(_bucket, _testDocId, "foo,bar,vekterli", 123)); + GetCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId()); + CPPUNIT_ASSERT_EQUAL(Timestamp(123), cmd2->getBeforeTimestamp()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet()); + + GetReply::SP reply(new GetReply(*cmd2, _testDoc, 100)); + GetReply::SP reply2(copyReply(reply)); + CPPUNIT_ASSERT(reply2.get()); + CPPUNIT_ASSERT(reply2->getDocument().get()); + CPPUNIT_ASSERT_EQUAL(*_testDoc, *reply2->getDocument()); + CPPUNIT_ASSERT_EQUAL(_testDoc->getId(), reply2->getDocumentId()); + CPPUNIT_ASSERT_EQUAL(Timestamp(123), reply2->getBeforeTimestamp()); + CPPUNIT_ASSERT_EQUAL(Timestamp(100), reply2->getLastModifiedTimestamp()); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); +} + +void +StorageProtocolTest::testRemove51() +{ + ScopedName test("testRemove51"); + RemoveCommand::SP cmd(new RemoveCommand(_bucket, _testDocId, 159)); + RemoveCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(_testDocId, cmd2->getDocumentId()); + CPPUNIT_ASSERT_EQUAL(Timestamp(159), cmd2->getTimestamp()); + + RemoveReply::SP reply(new RemoveReply(*cmd2, 48)); + reply->setBucketInfo(BucketInfo(1,2,3,4,5, true, false, 48)); + + RemoveReply::SP reply2(copyReply(reply)); + CPPUNIT_ASSERT_EQUAL(_testDocId, reply2->getDocumentId()); + CPPUNIT_ASSERT_EQUAL(Timestamp(159), reply2->getTimestamp()); + CPPUNIT_ASSERT_EQUAL(Timestamp(48), reply2->getOldTimestamp()); + CPPUNIT_ASSERT_EQUAL(BucketInfo(1,2,3,4,5, true, false, 48), + reply2->getBucketInfo()); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); +} + +void +StorageProtocolTest::testRevert51() +{ + ScopedName test("testRevertCommand51"); std::vector tokens; tokens.push_back(59); - auto cmd = std::make_shared(_bucket, tokens); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); - EXPECT_EQ(tokens, cmd2->getRevertTokens()); + RevertCommand::SP cmd(new RevertCommand(_bucket, tokens)); + RevertCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(tokens, cmd2->getRevertTokens()); + + RevertReply::SP reply(new RevertReply(*cmd2)); + BucketInfo info(0x12345432, 101, 520); + reply->setBucketInfo(info); + RevertReply::SP reply2(copyReply(reply)); - auto reply = std::make_shared(*cmd2); - set_dummy_bucket_info_reply_fields(*reply); - auto reply2 = copyReply(reply); - EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); + CPPUNIT_ASSERT_EQUAL(info, reply2->getBucketInfo()); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); } -TEST_P(StorageProtocolTest, request_bucket_info) { +void +StorageProtocolTest::testRequestBucketInfo51() +{ + ScopedName test("testRequestBucketInfo51"); { std::vector ids; ids.push_back(document::BucketId(3)); ids.push_back(document::BucketId(7)); - auto cmd = std::make_shared(makeBucketSpace(), ids); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(ids, cmd2->getBuckets()); - EXPECT_FALSE(cmd2->hasSystemState()); + RequestBucketInfoCommand::SP cmd(new RequestBucketInfoCommand(makeBucketSpace(), ids)); + RequestBucketInfoCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(ids, cmd2->getBuckets()); + CPPUNIT_ASSERT(!cmd2->hasSystemState()); + + recordOutput(*cmd2); } { ClusterState state("distributor:3 .1.s:d"); - auto cmd = std::make_shared(makeBucketSpace(), 3, state, "14"); - auto cmd2 = copyCommand(cmd); - ASSERT_TRUE(cmd2->hasSystemState()); - EXPECT_EQ(uint16_t(3), cmd2->getDistributor()); - EXPECT_EQ(state, cmd2->getSystemState()); - EXPECT_EQ(size_t(0), cmd2->getBuckets().size()); - - auto reply = std::make_shared(*cmd); + RequestBucketInfoCommand::SP cmd(new RequestBucketInfoCommand( + makeBucketSpace(), + 3, state, "14")); + RequestBucketInfoCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT(cmd2->hasSystemState()); + CPPUNIT_ASSERT_EQUAL(uint16_t(3), cmd2->getDistributor()); + CPPUNIT_ASSERT_EQUAL(state, cmd2->getSystemState()); + CPPUNIT_ASSERT_EQUAL(size_t(0), cmd2->getBuckets().size()); + + RequestBucketInfoReply::SP reply(new RequestBucketInfoReply(*cmd)); RequestBucketInfoReply::Entry e; e._bucketId = document::BucketId(4); const uint64_t lastMod = 0x1337cafe98765432ULL; e._info = BucketInfo(43, 24, 123, 44, 124, false, true, lastMod); reply->getBucketInfo().push_back(e); - auto reply2 = copyReply(reply); - EXPECT_EQ(size_t(1), reply2->getBucketInfo().size()); + RequestBucketInfoReply::SP reply2(copyReply(reply)); + CPPUNIT_ASSERT_EQUAL(size_t(1), reply2->getBucketInfo().size()); auto& entries(reply2->getBucketInfo()); - EXPECT_EQ(e, entries[0]); + CPPUNIT_ASSERT_EQUAL(e, entries[0]); // "Last modified" not counted by operator== for some reason. Testing // separately until we can figure out if this is by design or not. - EXPECT_EQ(lastMod, entries[0]._info.getLastModified()); + CPPUNIT_ASSERT_EQUAL(lastMod, entries[0]._info.getLastModified()); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); } } -TEST_P(StorageProtocolTest, notify_bucket_change) { - auto cmd = std::make_shared(_bucket, _dummy_bucket_info); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); - EXPECT_EQ(_dummy_bucket_info, cmd2->getBucketInfo()); - - auto reply = std::make_shared(*cmd); - auto reply2 = copyReply(reply); -} +void +StorageProtocolTest::testNotifyBucketChange51() +{ + ScopedName test("testNotifyBucketChange51"); + BucketInfo info(2, 3, 4); + document::BucketId modifiedBucketId(20, 1000); + document::Bucket modifiedBucket(makeDocumentBucket(modifiedBucketId)); + NotifyBucketChangeCommand::SP cmd(new NotifyBucketChangeCommand( + modifiedBucket, info)); + NotifyBucketChangeCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(document::BucketId(20, 1000), + cmd2->getBucketId()); + CPPUNIT_ASSERT_EQUAL(info, cmd2->getBucketInfo()); + + NotifyBucketChangeReply::SP reply(new NotifyBucketChangeReply(*cmd)); + NotifyBucketChangeReply::SP reply2(copyReply(reply)); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); +} + +void +StorageProtocolTest::testCreateBucket51() +{ + ScopedName test("testCreateBucket51"); + document::BucketId bucketId(623); + document::Bucket bucket(makeDocumentBucket(bucketId)); -TEST_P(StorageProtocolTest, create_bucket_without_activation) { - auto cmd = std::make_shared(_bucket); - EXPECT_FALSE(cmd->getActive()); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); - EXPECT_FALSE(cmd2->getActive()); + CreateBucketCommand::SP cmd(new CreateBucketCommand(bucket)); + CreateBucketCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId()); - auto reply = std::make_shared(*cmd); - set_dummy_bucket_info_reply_fields(*reply); - auto reply2 = copyReply(reply); - EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); -} + CreateBucketReply::SP reply(new CreateBucketReply(*cmd)); + CreateBucketReply::SP reply2(copyReply(reply)); + CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); -TEST_P(StorageProtocolTest, create_bucket_propagates_activation_flag) { - auto cmd = std::make_shared(_bucket); - cmd->setActive(true); - auto cmd2 = copyCommand(cmd); - EXPECT_TRUE(cmd2->getActive()); + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); } -TEST_P(StorageProtocolTest, delete_bucket) { - auto cmd = std::make_shared(_bucket); - cmd->setBucketInfo(_dummy_bucket_info); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); - EXPECT_EQ(_dummy_bucket_info, cmd2->getBucketInfo()); - - auto reply = std::make_shared(*cmd); +void +StorageProtocolTest::testDeleteBucket51() +{ + ScopedName test("testDeleteBucket51"); + document::BucketId bucketId(623); + document::Bucket bucket(makeDocumentBucket(bucketId)); + + DeleteBucketCommand::SP cmd(new DeleteBucketCommand(bucket)); + BucketInfo info(0x100, 200, 300); + cmd->setBucketInfo(info); + DeleteBucketCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId()); + CPPUNIT_ASSERT_EQUAL(info, cmd2->getBucketInfo()); + + DeleteBucketReply::SP reply(new DeleteBucketReply(*cmd)); // Not set automatically by constructor reply->setBucketInfo(cmd2->getBucketInfo()); - auto reply2 = copyReply(reply); - EXPECT_EQ(_bucket_id, reply2->getBucketId()); - EXPECT_EQ(_dummy_bucket_info, reply2->getBucketInfo()); + DeleteBucketReply::SP reply2(copyReply(reply)); + CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); + CPPUNIT_ASSERT_EQUAL(info, reply2->getBucketInfo()); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); } -TEST_P(StorageProtocolTest, merge_bucket) { +void +StorageProtocolTest::testMergeBucket51() +{ + ScopedName test("testMergeBucket51"); + document::BucketId bucketId(623); + document::Bucket bucket(makeDocumentBucket(bucketId)); + typedef api::MergeBucketCommand::Node Node; std::vector nodes; nodes.push_back(Node(4, false)); @@ -367,98 +522,152 @@ TEST_P(StorageProtocolTest, merge_bucket) { chain.push_back(7); chain.push_back(14); - auto cmd = std::make_shared(_bucket, nodes, Timestamp(1234), 567, chain); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); - EXPECT_EQ(nodes, cmd2->getNodes()); - EXPECT_EQ(Timestamp(1234), cmd2->getMaxTimestamp()); - EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion()); - EXPECT_EQ(chain, cmd2->getChain()); - - auto reply = std::make_shared(*cmd); - auto reply2 = copyReply(reply); - EXPECT_EQ(_bucket_id, reply2->getBucketId()); - EXPECT_EQ(nodes, reply2->getNodes()); - EXPECT_EQ(Timestamp(1234), reply2->getMaxTimestamp()); - EXPECT_EQ(uint32_t(567), reply2->getClusterStateVersion()); - EXPECT_EQ(chain, reply2->getChain()); -} - -TEST_P(StorageProtocolTest, split_bucket) { - auto cmd = std::make_shared(_bucket); - EXPECT_EQ(0u, cmd->getMinSplitBits()); - EXPECT_EQ(58u, cmd->getMaxSplitBits()); - EXPECT_EQ(std::numeric_limits().max(), cmd->getMinByteSize()); - EXPECT_EQ(std::numeric_limits().max(), cmd->getMinDocCount()); + MergeBucketCommand::SP cmd( + new MergeBucketCommand(bucket, nodes, Timestamp(1234), 567, chain)); + MergeBucketCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId()); + CPPUNIT_ASSERT_EQUAL(nodes, cmd2->getNodes()); + CPPUNIT_ASSERT_EQUAL(Timestamp(1234), cmd2->getMaxTimestamp()); + CPPUNIT_ASSERT_EQUAL(uint32_t(567), cmd2->getClusterStateVersion()); + CPPUNIT_ASSERT_EQUAL(chain, cmd2->getChain()); + + MergeBucketReply::SP reply(new MergeBucketReply(*cmd)); + MergeBucketReply::SP reply2(copyReply(reply)); + CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); + CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes()); + CPPUNIT_ASSERT_EQUAL(Timestamp(1234), reply2->getMaxTimestamp()); + CPPUNIT_ASSERT_EQUAL(uint32_t(567), reply2->getClusterStateVersion()); + CPPUNIT_ASSERT_EQUAL(chain, reply2->getChain()); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); +} + +void +StorageProtocolTest::testSplitBucket51() +{ + ScopedName test("testSplitBucket51"); + + document::BucketId bucketId(16, 0); + document::Bucket bucket(makeDocumentBucket(bucketId)); + SplitBucketCommand::SP cmd(new SplitBucketCommand(bucket)); + CPPUNIT_ASSERT_EQUAL(0u, (uint32_t) cmd->getMinSplitBits()); + CPPUNIT_ASSERT_EQUAL(58u, (uint32_t) cmd->getMaxSplitBits()); + CPPUNIT_ASSERT_EQUAL(std::numeric_limits().max(), + cmd->getMinByteSize()); + CPPUNIT_ASSERT_EQUAL(std::numeric_limits().max(), + cmd->getMinDocCount()); cmd->setMinByteSize(1000); cmd->setMinDocCount(5); cmd->setMaxSplitBits(40); cmd->setMinSplitBits(20); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); - EXPECT_EQ(20u, cmd2->getMinSplitBits()); - EXPECT_EQ(40u, cmd2->getMaxSplitBits()); - EXPECT_EQ(1000u, cmd2->getMinByteSize()); - EXPECT_EQ(5u, cmd2->getMinDocCount()); - - auto reply = std::make_shared(*cmd2); - reply->getSplitInfo().emplace_back(document::BucketId(17, 0), BucketInfo(100, 1000, 10000, true, true)); - reply->getSplitInfo().emplace_back(document::BucketId(17, 1), BucketInfo(101, 1001, 10001, true, true)); - auto reply2 = copyReply(reply); - - EXPECT_EQ(_bucket, reply2->getBucket()); - EXPECT_EQ(size_t(2), reply2->getSplitInfo().size()); - EXPECT_EQ(document::BucketId(17, 0), reply2->getSplitInfo()[0].first); - EXPECT_EQ(document::BucketId(17, 1), reply2->getSplitInfo()[1].first); - EXPECT_EQ(BucketInfo(100, 1000, 10000, true, true), reply2->getSplitInfo()[0].second); - EXPECT_EQ(BucketInfo(101, 1001, 10001, true, true), reply2->getSplitInfo()[1].second); -} - -TEST_P(StorageProtocolTest, join_buckets) { + SplitBucketCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(20u, (uint32_t) cmd2->getMinSplitBits()); + CPPUNIT_ASSERT_EQUAL(40u, (uint32_t) cmd2->getMaxSplitBits()); + CPPUNIT_ASSERT_EQUAL(1000u, cmd2->getMinByteSize()); + CPPUNIT_ASSERT_EQUAL(5u, cmd2->getMinDocCount()); + + SplitBucketReply::SP reply(new SplitBucketReply(*cmd2)); + reply->getSplitInfo().push_back(SplitBucketReply::Entry( + document::BucketId(17, 0), BucketInfo(100, 1000, 10000, true, true))); + reply->getSplitInfo().push_back(SplitBucketReply::Entry( + document::BucketId(17, 1), BucketInfo(101, 1001, 10001, true, true))); + SplitBucketReply::SP reply2(copyReply(reply)); + + CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); + CPPUNIT_ASSERT_EQUAL(size_t(2), reply2->getSplitInfo().size()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(17, 0), + reply2->getSplitInfo()[0].first); + CPPUNIT_ASSERT_EQUAL(document::BucketId(17, 1), + reply2->getSplitInfo()[1].first); + CPPUNIT_ASSERT_EQUAL(BucketInfo(100, 1000, 10000, true, true), + reply2->getSplitInfo()[0].second); + CPPUNIT_ASSERT_EQUAL(BucketInfo(101, 1001, 10001, true, true), + reply2->getSplitInfo()[1].second); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); +} + +void +StorageProtocolTest::testJoinBuckets51() +{ + ScopedName test("testJoinBuckets51"); + document::BucketId bucketId(16, 0); + document::Bucket bucket(makeDocumentBucket(bucketId)); std::vector sources; sources.push_back(document::BucketId(17, 0)); sources.push_back(document::BucketId(17, 1)); - auto cmd = std::make_shared(_bucket); + JoinBucketsCommand::SP cmd(new JoinBucketsCommand(bucket)); cmd->getSourceBuckets() = sources; cmd->setMinJoinBits(3); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); + JoinBucketsCommand::SP cmd2(copyCommand(cmd, _version5_1)); - auto reply = std::make_shared(*cmd2); + JoinBucketsReply::SP reply(new JoinBucketsReply(*cmd2)); reply->setBucketInfo(BucketInfo(3,4,5)); - auto reply2 = copyReply(reply); + JoinBucketsReply::SP reply2(copyReply(reply)); - EXPECT_EQ(sources, reply2->getSourceBuckets()); - EXPECT_EQ(3, cmd2->getMinJoinBits()); - EXPECT_EQ(BucketInfo(3,4,5), reply2->getBucketInfo()); - EXPECT_EQ(_bucket, reply2->getBucket()); + CPPUNIT_ASSERT_EQUAL(sources, reply2->getSourceBuckets()); + CPPUNIT_ASSERT_EQUAL(3, (int)cmd2->getMinJoinBits()); + CPPUNIT_ASSERT_EQUAL(BucketInfo(3,4,5), reply2->getBucketInfo()); + CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); + + recordOutput(*cmd2); + recordOutput(*reply2); } -TEST_P(StorageProtocolTest, destroy_visitor) { - auto cmd = std::make_shared("instance"); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ("instance", cmd2->getInstanceId()); +void +StorageProtocolTest::testDestroyVisitor51() +{ + ScopedName test("testDestroyVisitor51"); - auto reply = std::make_shared(*cmd2); - auto reply2 = copyReply(reply); + DestroyVisitorCommand::SP cmd( + new DestroyVisitorCommand("instance")); + DestroyVisitorCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(string("instance"), cmd2->getInstanceId()); + + DestroyVisitorReply::SP reply(new DestroyVisitorReply(*cmd2)); + DestroyVisitorReply::SP reply2(copyReply(reply)); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); } -TEST_P(StorageProtocolTest, remove_location) { - auto cmd = std::make_shared("id.group == \"mygroup\"", _bucket); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ("id.group == \"mygroup\"", cmd2->getDocumentSelection()); - EXPECT_EQ(_bucket, cmd2->getBucket()); +void +StorageProtocolTest::testRemoveLocation51() +{ + ScopedName test("testRemoveLocation51"); + document::BucketId bucketId(16, 1234); + document::Bucket bucket(makeDocumentBucket(bucketId)); + + RemoveLocationCommand::SP cmd( + new RemoveLocationCommand("id.group == \"mygroup\"", bucket)); + RemoveLocationCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(vespalib::string("id.group == \"mygroup\""), cmd2->getDocumentSelection()); + CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId()); + + RemoveLocationReply::SP reply(new RemoveLocationReply(*cmd2)); + RemoveLocationReply::SP reply2(copyReply(reply)); - auto reply = std::make_shared(*cmd2); - auto reply2 = copyReply(reply); + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); } -TEST_P(StorageProtocolTest, create_visitor) { +void +StorageProtocolTest::testCreateVisitor51() +{ + ScopedName test("testCreateVisitor51"); + std::vector buckets; buckets.push_back(document::BucketId(16, 1)); buckets.push_back(document::BucketId(16, 2)); - auto cmd = std::make_shared(makeBucketSpace(), "library", "id", "doc selection"); + CreateVisitorCommand::SP cmd( + new CreateVisitorCommand(makeBucketSpace(), "library", "id", "doc selection")); cmd->setControlDestination("controldest"); cmd->setDataDestination("datadest"); cmd->setVisitorCmdId(1); @@ -472,26 +681,40 @@ TEST_P(StorageProtocolTest, create_visitor) { cmd->setFieldSet("foo,bar,vekterli"); cmd->setVisitInconsistentBuckets(); cmd->setQueueTimeout(100); + cmd->setVisitorOrdering(document::OrderingSpecification::DESCENDING); cmd->setPriority(149); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ("library", cmd2->getLibraryName()); - EXPECT_EQ("id", cmd2->getInstanceId()); - EXPECT_EQ("doc selection", cmd2->getDocumentSelection()); - EXPECT_EQ("controldest", cmd2->getControlDestination()); - EXPECT_EQ("datadest", cmd2->getDataDestination()); - EXPECT_EQ(api::Timestamp(123), cmd2->getFromTime()); - EXPECT_EQ(api::Timestamp(456), cmd2->getToTime()); - EXPECT_EQ(2u, cmd2->getMaximumPendingReplyCount()); - EXPECT_EQ(buckets, cmd2->getBuckets()); - EXPECT_EQ("foo,bar,vekterli", cmd2->getFieldSet()); - EXPECT_TRUE(cmd2->visitInconsistentBuckets()); - EXPECT_EQ(149, cmd2->getPriority()); - - auto reply = std::make_shared(*cmd2); - auto reply2 = copyReply(reply); -} - -TEST_P(StorageProtocolTest, get_bucket_diff) { + CreateVisitorCommand::SP cmd2(copyCommand(cmd, _version5_1)); + CPPUNIT_ASSERT_EQUAL(string("library"), cmd2->getLibraryName()); + CPPUNIT_ASSERT_EQUAL(string("id"), cmd2->getInstanceId()); + CPPUNIT_ASSERT_EQUAL(string("doc selection"), + cmd2->getDocumentSelection()); + CPPUNIT_ASSERT_EQUAL(string("controldest"), + cmd2->getControlDestination()); + CPPUNIT_ASSERT_EQUAL(string("datadest"), cmd2->getDataDestination()); + CPPUNIT_ASSERT_EQUAL(api::Timestamp(123), cmd2->getFromTime()); + CPPUNIT_ASSERT_EQUAL(api::Timestamp(456), cmd2->getToTime()); + CPPUNIT_ASSERT_EQUAL(2u, cmd2->getMaximumPendingReplyCount()); + CPPUNIT_ASSERT_EQUAL(buckets, cmd2->getBuckets()); + CPPUNIT_ASSERT_EQUAL(vespalib::string("foo,bar,vekterli"), cmd2->getFieldSet()); + CPPUNIT_ASSERT(cmd2->visitInconsistentBuckets()); + CPPUNIT_ASSERT_EQUAL(document::OrderingSpecification::DESCENDING, cmd2->getVisitorOrdering()); + CPPUNIT_ASSERT_EQUAL(149, (int)cmd2->getPriority()); + + CreateVisitorReply::SP reply(new CreateVisitorReply(*cmd2)); + CreateVisitorReply::SP reply2(copyReply(reply)); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); +} + +void +StorageProtocolTest::testGetBucketDiff51() +{ + ScopedName test("testGetBucketDiff51"); + document::BucketId bucketId(623); + document::Bucket bucket(makeDocumentBucket(bucketId)); + std::vector nodes; nodes.push_back(4); nodes.push_back(13); @@ -504,68 +727,56 @@ TEST_P(StorageProtocolTest, get_bucket_diff) { entries.back()._flags = 1; entries.back()._hasMask = 3; - EXPECT_EQ("Entry(timestamp: 123456, gid(0x313233343536373839306162), hasMask: 0x3,\n" - " header size: 100, body size: 65536, flags 0x1)", - entries.back().toString(true)); + CPPUNIT_ASSERT_EQUAL(std::string( + "Entry(timestamp: 123456, gid(0x313233343536373839306162), " + "hasMask: 0x3,\n" + " header size: 100, body size: 65536, flags 0x1)"), + entries.back().toString(true)); - auto cmd = std::make_shared(_bucket, nodes, 1056); + GetBucketDiffCommand::SP cmd(new GetBucketDiffCommand(bucket, nodes, 1056)); cmd->getDiff() = entries; - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); - - auto reply = std::make_shared(*cmd2); - EXPECT_EQ(entries, reply->getDiff()); - auto reply2 = copyReply(reply); + GetBucketDiffCommand::SP cmd2(copyCommand(cmd, _version5_1)); - EXPECT_EQ(nodes, reply2->getNodes()); - EXPECT_EQ(entries, reply2->getDiff()); - EXPECT_EQ(Timestamp(1056), reply2->getMaxTimestamp()); -} - -namespace { - -ApplyBucketDiffCommand::Entry dummy_apply_entry() { - ApplyBucketDiffCommand::Entry e; - e._docName = "my cool id"; - vespalib::string header_data = "fancy header"; - e._headerBlob.resize(header_data.size()); - memcpy(&e._headerBlob[0], header_data.data(), header_data.size()); + GetBucketDiffReply::SP reply(new GetBucketDiffReply(*cmd2)); + CPPUNIT_ASSERT_EQUAL(entries, reply->getDiff()); + GetBucketDiffReply::SP reply2(copyReply(reply)); - vespalib::string body_data = "fancier body!"; - e._bodyBlob.resize(body_data.size()); - memcpy(&e._bodyBlob[0], body_data.data(), body_data.size()); + CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes()); + CPPUNIT_ASSERT_EQUAL(entries, reply2->getDiff()); + CPPUNIT_ASSERT_EQUAL(Timestamp(1056), reply2->getMaxTimestamp()); - GetBucketDiffCommand::Entry meta; - meta._timestamp = 567890; - meta._hasMask = 0x3; - meta._flags = 0x1; - meta._headerSize = 12345; - meta._headerSize = header_data.size(); - meta._bodySize = body_data.size(); - - e._entry = meta; - return e; + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); } -} +void +StorageProtocolTest::testApplyBucketDiff51() +{ + ScopedName test("testApplyBucketDiff51"); + document::BucketId bucketId(16, 623); + document::Bucket bucket(makeDocumentBucket(bucketId)); -TEST_P(StorageProtocolTest, apply_bucket_diff) { std::vector nodes; nodes.push_back(4); nodes.push_back(13); - std::vector entries = {dummy_apply_entry()}; + std::vector entries; + entries.push_back(ApplyBucketDiffCommand::Entry()); - auto cmd = std::make_shared(_bucket, nodes, 1234); + ApplyBucketDiffCommand::SP cmd(new ApplyBucketDiffCommand(bucket, nodes, 1234)); cmd->getDiff() = entries; - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); + ApplyBucketDiffCommand::SP cmd2(copyCommand(cmd, _version5_1)); - auto reply = std::make_shared(*cmd2); - auto reply2 = copyReply(reply); + ApplyBucketDiffReply::SP reply(new ApplyBucketDiffReply(*cmd2)); + ApplyBucketDiffReply::SP reply2(copyReply(reply)); - EXPECT_EQ(nodes, reply2->getNodes()); - EXPECT_EQ(entries, reply2->getDiff()); - EXPECT_EQ(1234u, reply2->getMaxBufferSize()); + CPPUNIT_ASSERT_EQUAL(nodes, reply2->getNodes()); + CPPUNIT_ASSERT_EQUAL(entries, reply2->getDiff()); + CPPUNIT_ASSERT_EQUAL(1234u, reply2->getMaxBufferSize()); + + recordOutput(*cmd2); + recordOutput(*reply2); + recordSerialization50(); } namespace { @@ -596,97 +807,161 @@ namespace { }; api::StorageReply::UP MyCommand::makeReply() { - return std::make_unique(*this); + return api::StorageReply::UP(new MyReply(*this)); } } -TEST_P(StorageProtocolTest, internal_message) { +void +StorageProtocolTest::testInternalMessage() +{ + ScopedName test("testInternal51"); MyCommand cmd; MyReply reply(cmd); - // TODO what's this even intended to test? + + recordOutput(cmd); + recordOutput(reply); } -TEST_P(StorageProtocolTest, set_bucket_state_with_inactive_state) { - auto cmd = std::make_shared(_bucket, SetBucketStateCommand::INACTIVE); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(_bucket, cmd2->getBucket()); +void +StorageProtocolTest::testSetBucketState51() +{ + ScopedName test("testSetBucketState51"); + document::BucketId bucketId(16, 0); + document::Bucket bucket(makeDocumentBucket(bucketId)); + SetBucketStateCommand::SP cmd( + new SetBucketStateCommand(bucket, SetBucketStateCommand::ACTIVE)); + SetBucketStateCommand::SP cmd2(copyCommand(cmd, _version5_1)); - auto reply = std::make_shared(*cmd2); - auto reply2 = copyReply(reply); + SetBucketStateReply::SP reply(new SetBucketStateReply(*cmd2)); + SetBucketStateReply::SP reply2(copyReply(reply)); - EXPECT_EQ(SetBucketStateCommand::INACTIVE, cmd2->getState()); - EXPECT_EQ(_bucket, reply2->getBucket()); -} + CPPUNIT_ASSERT_EQUAL(SetBucketStateCommand::ACTIVE, cmd2->getState()); + CPPUNIT_ASSERT_EQUAL(bucketId, cmd2->getBucketId()); + CPPUNIT_ASSERT_EQUAL(bucketId, reply2->getBucketId()); -TEST_P(StorageProtocolTest, set_bucket_state_with_active_state) { - auto cmd = std::make_shared(_bucket, SetBucketStateCommand::ACTIVE); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(SetBucketStateCommand::ACTIVE, cmd2->getState()); + recordOutput(*cmd2); + recordOutput(*reply2); } -TEST_P(StorageProtocolTest, put_command_with_condition) { - auto cmd = std::make_shared(_bucket, _testDoc, 14); +void +StorageProtocolTest::testPutCommand52() +{ + ScopedName test("testPutCommand52"); + + PutCommand::SP cmd(new PutCommand(_bucket, _testDoc, 14)); cmd->setCondition(TestAndSetCondition(CONDITION_STRING)); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); + PutCommand::SP cmd2(copyCommand(cmd, _version5_2)); + CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); } -TEST_P(StorageProtocolTest, update_command_with_condition) { - auto update = std::make_shared( - _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()); - auto cmd = std::make_shared(_bucket, update, 14); +void +StorageProtocolTest::testUpdateCommand52() +{ + ScopedName test("testUpdateCommand52"); + + document::DocumentUpdate::SP update(new document::DocumentUpdate(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId())); + UpdateCommand::SP cmd(new UpdateCommand(_bucket, update, 14)); cmd->setCondition(TestAndSetCondition(CONDITION_STRING)); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); + UpdateCommand::SP cmd2(copyCommand(cmd, _version5_2)); + CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); } -TEST_P(StorageProtocolTest, remove_command_with_condition) { - auto cmd = std::make_shared(_bucket, _testDocId, 159); +void +StorageProtocolTest::testRemoveCommand52() +{ + ScopedName test("testRemoveCommand52"); + + RemoveCommand::SP cmd(new RemoveCommand(_bucket, _testDocId, 159)); cmd->setCondition(TestAndSetCondition(CONDITION_STRING)); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); + RemoveCommand::SP cmd2(copyCommand(cmd, _version5_2)); + CPPUNIT_ASSERT_EQUAL(cmd->getCondition().getSelection(), cmd2->getCondition().getSelection()); } -TEST_P(StorageProtocolTest, put_command_with_bucket_space) { - document::Bucket bucket(document::BucketSpace(5), _bucket_id); +void +StorageProtocolTest::testPutCommandWithBucketSpace6_0() +{ + ScopedName test("testPutCommandWithBucketSpace6_0"); + + document::Bucket bucket(document::BucketSpace(5), _bucket.getBucketId()); auto cmd = std::make_shared(bucket, _testDoc, 14); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(bucket, cmd2->getBucket()); + auto cmd2 = copyCommand(cmd, _version6_0); + CPPUNIT_ASSERT_EQUAL(bucket, cmd2->getBucket()); } -TEST_P(StorageProtocolTest, create_visitor_with_bucket_space) { +void +StorageProtocolTest::testCreateVisitorWithBucketSpace6_0() +{ + ScopedName test("testCreateVisitorWithBucketSpace6_0"); + document::BucketSpace bucketSpace(5); auto cmd = std::make_shared(bucketSpace, "library", "id", "doc selection"); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(bucketSpace, cmd2->getBucketSpace()); + auto cmd2 = copyCommand(cmd, _version6_0); + CPPUNIT_ASSERT_EQUAL(bucketSpace, cmd2->getBucketSpace()); } -TEST_P(StorageProtocolTest, request_bucket_info_with_bucket_space) { +void +StorageProtocolTest::testRequestBucketInfoWithBucketSpace6_0() +{ + ScopedName test("testRequestBucketInfoWithBucketSpace6_0"); + document::BucketSpace bucketSpace(5); std::vector ids = {document::BucketId(3)}; auto cmd = std::make_shared(bucketSpace, ids); - auto cmd2 = copyCommand(cmd); - EXPECT_EQ(bucketSpace, cmd2->getBucketSpace()); - EXPECT_EQ(ids, cmd2->getBuckets()); + auto cmd2 = copyCommand(cmd, _version6_0); + CPPUNIT_ASSERT_EQUAL(bucketSpace, cmd2->getBucketSpace()); + CPPUNIT_ASSERT_EQUAL(ids, cmd2->getBuckets()); +} + +void +StorageProtocolTest::serialized_size_is_used_to_set_approx_size_of_storage_message() +{ + ScopedName test("serialized_size_is_used_to_set_approx_size_of_storage_message"); + + PutCommand::SP cmd(new PutCommand(_bucket, _testDoc, 14)); + CPPUNIT_ASSERT_EQUAL(50u, cmd->getApproxByteSize()); + + PutCommand::SP cmd2(copyCommand(cmd, _version6_0)); + CPPUNIT_ASSERT_EQUAL(181u, cmd2->getApproxByteSize()); } -TEST_P(StorageProtocolTest, serialized_size_is_used_to_set_approx_size_of_storage_message) { - auto cmd = std::make_shared(_bucket, _testDoc, 14); - EXPECT_EQ(50u, cmd->getApproxByteSize()); +void +StorageProtocolTest::testStringOutputs() +{ + std::cerr << "\nNon verbose output:\n"; + for (uint32_t i=0, n=_nonVerboseMessageStrings.size(); igetApproxByteSize()); - } else { // Legacy encoding - EXPECT_EQ(181u, cmd2->getApproxByteSize()); +void +StorageProtocolTest::testWriteSerialization50() +{ + std::ofstream of("mbusprot/mbusprot.5.0.serialization.5.1"); + of << std::hex << std::setfill('0'); + for (uint32_t i=0, n=_serialization50.size(); i 126 || (c < 32 && c != 10)) { + int32_t num = static_cast(c); + if (num < 0) num += 256; + of << '\\' << std::setw(2) << num; + } else if (c == '\\') { + of << "\\\\"; + } else { + of << c; + } } + of.close(); } -} // storage::api +} // mbusprot +} // storage diff --git a/storageapi/src/vespa/storageapi/CMakeLists.txt b/storageapi/src/vespa/storageapi/CMakeLists.txt index 90eb6dd9eca..c08dcbc2419 100644 --- a/storageapi/src/vespa/storageapi/CMakeLists.txt +++ b/storageapi/src/vespa/storageapi/CMakeLists.txt @@ -1,5 +1,4 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - vespa_add_library(storageapi SOURCES $ @@ -9,6 +8,3 @@ vespa_add_library(storageapi INSTALL lib64 DEPENDS ) - -vespa_add_target_package_dependency(storageapi Protobuf) - diff --git a/storageapi/src/vespa/storageapi/mbusprot/.gitignore b/storageapi/src/vespa/storageapi/mbusprot/.gitignore index 8e91fe9cab0..526f91c6668 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/.gitignore +++ b/storageapi/src/vespa/storageapi/mbusprot/.gitignore @@ -5,6 +5,3 @@ .deps .libs Makefile -*.pb.h -*.pb.cc - diff --git a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt index dc4e3897e49..d5952d7cb91 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt +++ b/storageapi/src/vespa/storageapi/mbusprot/CMakeLists.txt @@ -1,19 +1,4 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -find_package(Protobuf REQUIRED) -PROTOBUF_GENERATE_CPP(storageapi_PROTOBUF_SRCS storageapi_PROTOBUF_HDRS - protobuf/common.proto - protobuf/feed.proto - protobuf/visiting.proto - protobuf/maintenance.proto) - -# protoc-generated files emit compiler warnings that we normally treat as errors. -# Instead of rolling our own compiler plugin we'll pragmatically disable the noise. -set_source_files_properties(${storageapi_PROTOBUF_SRCS} PROPERTIES COMPILE_FLAGS "-Wno-suggest-override -Wno-inline") -# protoc explicitly annotates methods with inline, which triggers -Werror=inline when -# the header file grows over a certain size. -set_source_files_properties(protocolserialization7.cpp PROPERTIES COMPILE_FLAGS "-Wno-inline") - vespa_add_library(storageapi_mbusprot OBJECT SOURCES storagemessage.cpp @@ -26,7 +11,5 @@ vespa_add_library(storageapi_mbusprot OBJECT protocolserialization5_1.cpp protocolserialization5_2.cpp protocolserialization6_0.cpp - protocolserialization7.cpp - ${storageapi_PROTOBUF_SRCS} DEPENDS ) diff --git a/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h deleted file mode 100644 index ef4a6b28749..00000000000 --- a/storageapi/src/vespa/storageapi/mbusprot/legacyprotocolserialization.h +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "protocolserialization.h" - -namespace storage::mbusprot { - -/* - * Utility base class for pre-v7 (protobuf) serialization implementations. - * - * TODO remove on Vespa 8 alongside legacy serialization formats. - */ -class LegacyProtocolSerialization : public ProtocolSerialization { - const std::shared_ptr _repo; -public: - explicit LegacyProtocolSerialization(const std::shared_ptr& repo) - : _repo(repo) - {} - - const document::DocumentTypeRepo& getTypeRepo() const { return *_repo; } - const std::shared_ptr getTypeRepoSp() const { return _repo; } - - virtual document::Bucket getBucket(document::ByteBuffer& buf) const = 0; - virtual void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const = 0; - virtual document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const = 0; - virtual void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const = 0; - virtual api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const = 0; - virtual void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const = 0; -}; - -} // storage::mbusprot diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto deleted file mode 100644 index d641449995d..00000000000 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -syntax = "proto3"; - -option cc_enable_arenas = true; - -package storage.mbusprot.protobuf; - -// Note: we use a *Request/*Response naming convention rather than *Command/*Reply, -// as the former is the gRPC convention and that's where we intend to move. - -message BucketSpace { - uint64 space_id = 1; -} - -message BucketId { - fixed64 raw_id = 1; -} - -message Bucket { - uint64 space_id = 1; - fixed64 raw_bucket_id = 2; -} - -// Next tag to use: 3 -message BucketInfo { - uint64 last_modified_timestamp = 1; - fixed32 legacy_checksum = 2; - // TODO v2 checksum - uint32 doc_count = 3; - uint32 total_doc_size = 4; - uint32 meta_count = 5; - uint32 used_file_size = 6; - bool ready = 7; - bool active = 8; -} - -message GlobalId { - // 96 bits of GID data in _little_ endian. High entropy, so fixed encoding is better than varint. - // Low 64 bits as if memcpy()ed from bytes [0, 8) of the GID buffer - fixed64 lo_64 = 1; - // High 32 bits as if memcpy()ed from bytes [8, 12) of the GID buffer - fixed32 hi_32 = 2; -} - -// TODO these should ideally be gRPC headers.. -message RequestHeader { - uint64 message_id = 1; - uint32 priority = 2; // Always in range [0, 255] - uint32 source_index = 3; // Always in range [0, 65535] - fixed32 loadtype_id = 4; // It's a hash with high entropy, so fixed encoding is better than varint -} - -// TODO these should ideally be gRPC headers.. -message ResponseHeader { - // TODO this should ideally be gRPC Status... - uint32 return_code_id = 1; - bytes return_code_message = 2; // FIXME it's `bytes` since `string` will check for UTF-8... might not hold... - uint64 message_id = 3; - uint32 priority = 4; // Always in range [0, 255] -} - -message Document { - bytes payload = 1; -} - -message DocumentId { - bytes id = 1; -} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto deleted file mode 100644 index 58da24df836..00000000000 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/feed.proto +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -syntax = "proto3"; - -option cc_enable_arenas = true; - -package storage.mbusprot.protobuf; - -import "common.proto"; - -message TestAndSetCondition { - bytes selection = 1; -} - -message PutRequest { - Bucket bucket = 1; - Document document = 2; - uint64 new_timestamp = 3; - uint64 expected_old_timestamp = 4; // If zero; no expectation. - TestAndSetCondition condition = 5; -} - -message PutResponse { - BucketInfo bucket_info = 1; - BucketId remapped_bucket_id = 2; - bool was_found = 3; -} - -message Update { - bytes payload = 1; -} - -message UpdateRequest { - Bucket bucket = 1; - Update update = 2; - uint64 new_timestamp = 3; - uint64 expected_old_timestamp = 4; // If zero; no expectation. - TestAndSetCondition condition = 5; -} - -message UpdateResponse { - BucketInfo bucket_info = 1; - BucketId remapped_bucket_id = 2; - uint64 updated_timestamp = 3; -} - -message RemoveRequest { - Bucket bucket = 1; - bytes document_id = 2; - uint64 new_timestamp = 3; - TestAndSetCondition condition = 4; -} - -message RemoveResponse { - BucketInfo bucket_info = 1; - BucketId remapped_bucket_id = 2; - uint64 removed_timestamp = 3; -} - -message GetRequest { - Bucket bucket = 1; - bytes document_id = 2; - bytes field_set = 3; - uint64 before_timestamp = 4; -} - -message GetResponse { - Document document = 1; - uint64 last_modified_timestamp = 2; - BucketInfo bucket_info = 3; - BucketId remapped_bucket_id = 4; -} - -message RevertRequest { - Bucket bucket = 1; - repeated uint64 revert_tokens = 2; -} - -message RevertResponse { - BucketInfo bucket_info = 1; - BucketId remapped_bucket_id = 2; -} - -message RemoveLocationRequest { - Bucket bucket = 1; - bytes document_selection = 2; -} - -message RemoveLocationResponse { - BucketInfo bucket_info = 1; - BucketId remapped_bucket_id = 2; -} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto deleted file mode 100644 index c4766d2900a..00000000000 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -syntax = "proto3"; - -option cc_enable_arenas = true; - -package storage.mbusprot.protobuf; - -import "common.proto"; - -message DeleteBucketRequest { - Bucket bucket = 1; - BucketInfo expected_bucket_info = 2; -} - -message DeleteBucketResponse { - BucketInfo bucket_info = 1; - BucketId remapped_bucket_id = 2; -} - -message CreateBucketRequest { - Bucket bucket = 1; - bool create_as_active = 2; -} - -message CreateBucketResponse { - BucketInfo bucket_info = 1; - BucketId remapped_bucket_id = 2; -} - -message MergeNode { - uint32 index = 1; - bool source_only = 2; -} - -message MergeBucketRequest { - Bucket bucket = 1; - uint32 cluster_state_version = 2; - uint64 max_timestamp = 3; - repeated MergeNode nodes = 4; - repeated uint32 node_chain = 5; -} - -message MergeBucketResponse { - BucketId remapped_bucket_id = 1; -} - -message MetaDiffEntry { - uint64 timestamp = 1; - GlobalId gid = 2; - uint32 header_size = 3; - uint32 body_size = 4; - uint32 flags = 5; - uint32 presence_mask = 6; -} - -message GetBucketDiffRequest { - Bucket bucket = 1; - uint64 max_timestamp = 2; - repeated MergeNode nodes = 3; - repeated MetaDiffEntry diff = 4; -} - -message GetBucketDiffResponse { - BucketId remapped_bucket_id = 1; - repeated MetaDiffEntry diff = 2; -} - -message ApplyDiffEntry { - MetaDiffEntry entry_meta = 1; - bytes document_id = 2; - bytes header_blob = 3; - bytes body_blob = 4; -} - -message ApplyBucketDiffRequest { - Bucket bucket = 1; - repeated MergeNode nodes = 2; - uint32 max_buffer_size = 3; - repeated ApplyDiffEntry entries = 4; -} - -message ApplyBucketDiffResponse { - BucketId remapped_bucket_id = 1; - repeated ApplyDiffEntry entries = 4; -} - -message ExplicitBucketSet { - // `Bucket` is not needed, as the space is inferred from the owning message. - repeated BucketId bucket_ids = 2; -} - -message AllBuckets { - uint32 distributor_index = 1; - bytes cluster_state = 2; - bytes distribution_hash = 3; -} - -message RequestBucketInfoRequest { - BucketSpace bucket_space = 1; - oneof request_for { - ExplicitBucketSet explicit_bucket_set = 2; - AllBuckets all_buckets = 3; - } -} - -message BucketAndBucketInfo { - fixed64 raw_bucket_id = 1; - BucketInfo bucket_info = 2; -} - -message RequestBucketInfoResponse { - repeated BucketAndBucketInfo bucket_infos = 1; -} - -message NotifyBucketChangeRequest { - Bucket bucket = 1; - BucketInfo bucket_info = 2; -} - -message NotifyBucketChangeResponse { - // Currently empty -} - -message SplitBucketRequest { - Bucket bucket = 1; - uint32 min_split_bits = 2; - uint32 max_split_bits = 3; - uint32 min_byte_size = 4; - uint32 min_doc_count = 5; -} - -message SplitBucketResponse { - BucketId remapped_bucket_id = 1; - repeated BucketAndBucketInfo split_info = 2; -} - -message JoinBucketsRequest { - Bucket bucket = 1; - repeated BucketId source_buckets = 2; - uint32 min_join_bits = 3; -} - -message JoinBucketsResponse { - BucketInfo bucket_info = 1; - BucketId remapped_bucket_id = 2; -} - -message SetBucketStateRequest { - enum BucketState { - Inactive = 0; - Active = 1; - } - - Bucket bucket = 1; - BucketState state = 2; -} - -message SetBucketStateResponse { - BucketId remapped_bucket_id = 1; -} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto deleted file mode 100644 index 89ce39e52a0..00000000000 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -syntax = "proto3"; - -option cc_enable_arenas = true; - -package storage.mbusprot.protobuf; - -import "common.proto"; - -message ClientVisitorParameter { - bytes key = 1; - bytes value = 2; -} - -message VisitorConstraints { - bytes document_selection = 1; - uint64 from_time_usec = 2; - uint64 to_time_usec = 3; - bool visit_removes = 4; - bytes field_set = 5; - bool visit_inconsistent_buckets = 6; -} - -message VisitorControlMeta { - bytes instance_id = 1; - bytes library_name = 2; - uint32 visitor_command_id = 3; - bytes control_destination = 4; - bytes data_destination = 5; - - // TODO move? - uint32 max_pending_reply_count = 6; - uint32 queue_timeout = 7; - uint32 max_buckets_per_visitor = 8; -} - -message CreateVisitorRequest { - BucketSpace bucket_space = 1; - repeated BucketId buckets = 2; - - VisitorConstraints constraints = 3; - VisitorControlMeta control_meta = 4; - repeated ClientVisitorParameter client_parameters = 5; -} - -message VisitorStatistics { - uint32 buckets_visited = 1; - uint64 documents_visited = 2; - uint64 bytes_visited = 3; - uint64 documents_returned = 4; - uint64 bytes_returned = 5; - uint64 second_pass_documents_returned = 6; // TODO don't include? orderdoc only - uint64 second_pass_bytes_returned = 7; // TODO don't include? orderdoc only -} - -message CreateVisitorResponse { - VisitorStatistics visitor_statistics = 1; -} - -message DestroyVisitorRequest { - bytes instance_id = 1; -} - -message DestroyVisitorResponse { - // Currently empty -} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h b/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h deleted file mode 100644 index 8e878cf0560..00000000000 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf_includes.h +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -// Disable warnings emitted by protoc generated files -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wsuggest-override" - -#include "feed.pb.h" -#include "visiting.pb.h" -#include "maintenance.pb.h" - -#pragma GCC diagnostic pop diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp index 917b60c50c3..172cd6c8de5 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.cpp @@ -17,6 +17,11 @@ LOG_SETUP(".storage.api.mbusprot.serialization.base"); namespace storage::mbusprot { +ProtocolSerialization::ProtocolSerialization(const std::shared_ptr& repo) + : _repo(repo) +{ +} + mbus::Blob ProtocolSerialization::encode(const api::StorageMessage& msg) const { diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h index a57627b9ba9..9c3ddb88bdf 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h @@ -59,14 +59,21 @@ class StorageCommand; class StorageReply; class ProtocolSerialization { + const std::shared_ptr _repo; + public: virtual mbus::Blob encode(const api::StorageMessage&) const; virtual std::unique_ptr decodeCommand(mbus::BlobRef) const; virtual std::unique_ptr decodeReply( mbus::BlobRef, const api::StorageCommand&) const; + protected: - ProtocolSerialization() = default; - virtual ~ProtocolSerialization() = default; + const document::DocumentTypeRepo& getTypeRepo() const { return *_repo; } + const std::shared_ptr getTypeRepoSp() const + { return _repo; } + + ProtocolSerialization(const std::shared_ptr &repo); + virtual ~ProtocolSerialization() {} typedef api::StorageCommand SCmd; typedef api::StorageReply SRep; @@ -95,10 +102,13 @@ protected: virtual void onEncode(GBBuf&, const api::GetBucketDiffReply&) const = 0; virtual void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const = 0; virtual void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const = 0; - virtual void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const = 0; + virtual void onEncode(GBBuf&, + const api::RequestBucketInfoCommand&) const = 0; virtual void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const = 0; - virtual void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const = 0; - virtual void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const = 0; + virtual void onEncode(GBBuf&, + const api::NotifyBucketChangeCommand&) const = 0; + virtual void onEncode(GBBuf&, + const api::NotifyBucketChangeReply&) const = 0; virtual void onEncode(GBBuf&, const api::SplitBucketCommand&) const = 0; virtual void onEncode(GBBuf&, const api::SplitBucketReply&) const = 0; virtual void onEncode(GBBuf&, const api::JoinBucketsCommand&) const = 0; @@ -133,9 +143,11 @@ protected: virtual SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const = 0; virtual SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const = 0; - virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const = 0; + virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, + BBuf&) const = 0; virtual SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const = 0; - virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const = 0; + virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, + BBuf&) const = 0; virtual SCmd::UP onDecodeSplitBucketCommand(BBuf&) const = 0; virtual SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const = 0; @@ -148,6 +160,14 @@ protected: virtual SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const = 0; virtual SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const = 0; + + virtual document::Bucket getBucket(document::ByteBuffer& buf) const = 0; + virtual void putBucket(const document::Bucket& bucket, vespalib::GrowableByteBuffer& buf) const = 0; + virtual document::BucketSpace getBucketSpace(document::ByteBuffer& buf) const = 0; + virtual void putBucketSpace(document::BucketSpace bucketSpace, vespalib::GrowableByteBuffer& buf) const = 0; + virtual api::BucketInfo getBucketInfo(document::ByteBuffer& buf) const = 0; + virtual void putBucketInfo(const api::BucketInfo& info, vespalib::GrowableByteBuffer& buf) const = 0; + }; } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp index 466ff85f398..74a0c964d19 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.cpp @@ -20,7 +20,7 @@ namespace storage::mbusprot { ProtocolSerialization4_2::ProtocolSerialization4_2( const std::shared_ptr& repo) - : LegacyProtocolSerialization(repo) + : ProtocolSerialization(repo) { } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h index e4ab36dc989..56aa3d4ed30 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization4_2.h @@ -1,13 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "legacyprotocolserialization.h" +#include "protocolserialization.h" namespace storage::mbusprot { -class ProtocolSerialization4_2 : public LegacyProtocolSerialization { +class ProtocolSerialization4_2 : public ProtocolSerialization { public: - explicit ProtocolSerialization4_2(const std::shared_ptr&); + ProtocolSerialization4_2(const std::shared_ptr&); protected: void onEncode(GBBuf&, const api::GetCommand&) const override; diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h index 67f02aa2d2a..042ec7850ef 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization5_0.h @@ -73,9 +73,6 @@ public: SRep::UP onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const override; void onDecodeCommand(BBuf& buf, api::StorageCommand& msg) const override; void onDecodeReply(BBuf&, api::StorageReply&) const override; - -protected: - const documentapi::LoadTypeSet& loadTypes() const noexcept { return _loadTypes; }; }; } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp deleted file mode 100644 index ca77977046c..00000000000 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ /dev/null @@ -1,1268 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "protocolserialization7.h" -#include "serializationhelper.h" -#include "protobuf_includes.h" - -#include -#include -#include -#include -#include -#include - -namespace storage::mbusprot { - -ProtocolSerialization7::ProtocolSerialization7(std::shared_ptr repo, - const documentapi::LoadTypeSet& load_types) - : ProtocolSerialization(), - _repo(std::move(repo)), - _load_types(load_types) -{ -} - -namespace { - -void set_bucket(protobuf::Bucket& dest, const document::Bucket& src) { - dest.set_raw_bucket_id(src.getBucketId().getRawId()); - dest.set_space_id(src.getBucketSpace().getId()); -} - -void set_bucket_id(protobuf::BucketId& dest, const document::BucketId& src) { - dest.set_raw_id(src.getRawId()); -} - -document::BucketId get_bucket_id(const protobuf::BucketId& src) { - return document::BucketId(src.raw_id()); -} - -void set_bucket_space(protobuf::BucketSpace& dest, const document::BucketSpace& src) { - dest.set_space_id(src.getId()); -} - -document::BucketSpace get_bucket_space(const protobuf::BucketSpace& src) { - return document::BucketSpace(src.space_id()); -} - -void set_bucket_info(protobuf::BucketInfo& dest, const api::BucketInfo& src) { - dest.set_last_modified_timestamp(src.getLastModified()); - dest.set_legacy_checksum(src.getChecksum()); - dest.set_doc_count(src.getDocumentCount()); - dest.set_total_doc_size(src.getTotalDocumentSize()); - dest.set_meta_count(src.getMetaCount()); - dest.set_used_file_size(src.getUsedFileSize()); - dest.set_active(src.isActive()); - dest.set_ready(src.isReady()); -} - -document::Bucket get_bucket(const protobuf::Bucket& src) { - return document::Bucket(document::BucketSpace(src.space_id()), - document::BucketId(src.raw_bucket_id())); -} - -api::BucketInfo get_bucket_info(const protobuf::BucketInfo& src) { - api::BucketInfo info; - info.setLastModified(src.last_modified_timestamp()); - info.setChecksum(src.legacy_checksum()); - info.setDocumentCount(src.doc_count()); - info.setTotalDocumentSize(src.total_doc_size()); - info.setMetaCount(src.meta_count()); - info.setUsedFileSize(src.used_file_size()); - info.setActive(src.active()); - info.setReady(src.ready()); - return info; -} - -documentapi::TestAndSetCondition get_tas_condition(const protobuf::TestAndSetCondition& src) { - return documentapi::TestAndSetCondition(src.selection()); -} - -void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::TestAndSetCondition& src) { - dest.set_selection(src.getSelection().data(), src.getSelection().size()); -} - -std::shared_ptr get_document(const protobuf::Document& src_doc, - const document::DocumentTypeRepo& type_repo) -{ - if (!src_doc.payload().empty()) { - document::ByteBuffer doc_buf(src_doc.payload().data(), src_doc.payload().size()); - return std::make_shared(type_repo, doc_buf); - } - return std::shared_ptr(); -} - -void set_update(protobuf::Update& dest, const document::DocumentUpdate& src) { - vespalib::nbostream stream; - src.serializeHEAD(stream); - dest.set_payload(stream.peek(), stream.size()); -} - -std::shared_ptr get_update(const protobuf::Update& src, - const document::DocumentTypeRepo& type_repo) -{ - if (!src.payload().empty()) { - return document::DocumentUpdate::createHEAD( - type_repo, vespalib::nbostream(src.payload().data(), src.payload().size())); - } - return std::shared_ptr(); -} - -void write_request_header(vespalib::GrowableByteBuffer& buf, const api::StorageCommand& cmd) { - protobuf::RequestHeader hdr; // Arena alloc not needed since there are no nested messages - hdr.set_message_id(cmd.getMsgId()); - hdr.set_priority(cmd.getPriority()); - hdr.set_source_index(cmd.getSourceIndex()); - hdr.set_loadtype_id(cmd.getLoadType().getId()); - - uint8_t dest[128]; // Only primitive fields, should be plenty large enough. - auto encoded_size = static_cast(hdr.ByteSizeLong()); - assert(encoded_size <= sizeof(dest)); - [[maybe_unused]] bool ok = hdr.SerializeWithCachedSizesToArray(dest); - assert(ok); - buf.putInt(encoded_size); - buf.putBytes(reinterpret_cast(dest), encoded_size); -} - -void write_response_header(vespalib::GrowableByteBuffer& buf, const api::StorageReply& reply) { - protobuf::ResponseHeader hdr; // Arena alloc not needed since there are no nested messages - const auto& result = reply.getResult(); - hdr.set_return_code_id(static_cast(result.getResult())); - if (!result.getMessage().empty()) { - hdr.set_return_code_message(result.getMessage().data(), result.getMessage().size()); - } - hdr.set_message_id(reply.getMsgId()); - hdr.set_priority(reply.getPriority()); - - const auto header_size = hdr.ByteSizeLong(); - assert(header_size <= UINT32_MAX); - buf.putInt(static_cast(header_size)); - - auto* dest_buf = reinterpret_cast(buf.allocate(header_size)); - [[maybe_unused]] bool ok = hdr.SerializeWithCachedSizesToArray(dest_buf); - assert(ok); -} - -void decode_request_header(document::ByteBuffer& buf, protobuf::RequestHeader& hdr) { - auto hdr_len = static_cast(SerializationHelper::getInt(buf)); - if (hdr_len > buf.getRemaining()) { - throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len); - } - bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len); - if (!ok) { - throw vespalib::IllegalArgumentException("Malformed protobuf request header"); - } - buf.incPos(hdr_len); -} - -void decode_response_header(document::ByteBuffer& buf, protobuf::ResponseHeader& hdr) { - auto hdr_len = static_cast(SerializationHelper::getInt(buf)); - if (hdr_len > buf.getRemaining()) { - throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len); - } - bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len); - if (!ok) { - throw vespalib::IllegalArgumentException("Malformed protobuf response header"); - } - buf.incPos(hdr_len); -} - -} // anonymous namespace - -template -class BaseEncoder { - vespalib::GrowableByteBuffer& _out_buf; - ::google::protobuf::Arena _arena; - ProtobufType* _proto_obj; -public: - explicit BaseEncoder(vespalib::GrowableByteBuffer& out_buf) - : _out_buf(out_buf), - _arena(), - _proto_obj(::google::protobuf::Arena::Create(&_arena)) - { - } - - void encode() { - assert(_proto_obj != nullptr); - const auto sz = _proto_obj->ByteSizeLong(); - assert(sz <= UINT32_MAX); - auto* buf = reinterpret_cast(_out_buf.allocate(sz)); - [[maybe_unused]] bool ok = _proto_obj->SerializeWithCachedSizesToArray(buf); - assert(ok); - _proto_obj = nullptr; - } -protected: - vespalib::GrowableByteBuffer& buffer() noexcept { return _out_buf; } - - // Precondition: encode() is not called - ProtobufType& proto_obj() noexcept { return *_proto_obj; } - const ProtobufType& proto_obj() const noexcept { return *_proto_obj; } -}; - -template -class RequestEncoder : public BaseEncoder { -public: - RequestEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& cmd) - : BaseEncoder(out_buf) - { - write_request_header(out_buf, cmd); - } - - // Precondition: encode() is not called - ProtobufType& request() noexcept { return this->proto_obj(); } - const ProtobufType& request() const noexcept { return this->proto_obj(); } -}; - -template -class ResponseEncoder : public BaseEncoder { -public: - ResponseEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply) - : BaseEncoder(out_buf) - { - write_response_header(out_buf, reply); - } - - // Precondition: encode() is not called - ProtobufType& response() noexcept { return this->proto_obj(); } - const ProtobufType& response() const noexcept { return this->proto_obj(); } -}; - -template -class RequestDecoder { - protobuf::RequestHeader _hdr; - ::google::protobuf::Arena _arena; - ProtobufType* _proto_obj; - const documentapi::LoadTypeSet& _load_types; -public: - RequestDecoder(document::ByteBuffer& in_buf, const documentapi::LoadTypeSet& load_types) - : _arena(), - _proto_obj(::google::protobuf::Arena::Create(&_arena)), - _load_types(load_types) - { - decode_request_header(in_buf, _hdr); - assert(in_buf.getRemaining() <= INT_MAX); - bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); - if (!ok) { - throw vespalib::IllegalArgumentException( - vespalib::make_string("Malformed protobuf request payload for %s", - ProtobufType::descriptor()->full_name().c_str())); - } - } - - void transfer_meta_information_to(api::StorageCommand& dest) { - dest.forceMsgId(_hdr.message_id()); - dest.setPriority(static_cast(_hdr.priority())); - dest.setSourceIndex(static_cast(_hdr.source_index())); - dest.setLoadType(_load_types[_hdr.loadtype_id()]); - } - - ProtobufType& request() noexcept { return *_proto_obj; } - const ProtobufType& request() const noexcept { return *_proto_obj; } -}; - -template -class ResponseDecoder { - protobuf::ResponseHeader _hdr; - ::google::protobuf::Arena _arena; - ProtobufType* _proto_obj; -public: - explicit ResponseDecoder(document::ByteBuffer& in_buf) - : _arena(), - _proto_obj(::google::protobuf::Arena::Create(&_arena)) - { - decode_response_header(in_buf, _hdr); - assert(in_buf.getRemaining() <= INT_MAX); - bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); - if (!ok) { - throw vespalib::IllegalArgumentException( - vespalib::make_string("Malformed protobuf response payload for %s", - ProtobufType::descriptor()->full_name().c_str())); - } - } - - ProtobufType& response() noexcept { return *_proto_obj; } - const ProtobufType& response() const noexcept { return *_proto_obj; } -}; - -template -void encode_request(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& msg, Func&& f) { - RequestEncoder enc(out_buf, msg); - f(enc.request()); - enc.encode(); -} - -template -void encode_response(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply, Func&& f) { - ResponseEncoder enc(out_buf, reply); - auto& res = enc.response(); - f(res); - enc.encode(); -} - -template -std::unique_ptr -ProtocolSerialization7::decode_request(document::ByteBuffer& in_buf, Func&& f) const { - RequestDecoder dec(in_buf, _load_types); - const auto& req = dec.request(); - auto cmd = f(req); - dec.transfer_meta_information_to(*cmd); - return cmd; -} - -template -std::unique_ptr -ProtocolSerialization7::decode_response(document::ByteBuffer& in_buf, Func&& f) const { - ResponseDecoder dec(in_buf); - const auto& res = dec.response(); - auto reply = f(res); - return reply; -} - -template -void encode_bucket_request(vespalib::GrowableByteBuffer& out_buf, const api::BucketCommand& msg, Func&& f) { - encode_request(out_buf, msg, [&](ProtobufType& req) { - set_bucket(*req.mutable_bucket(), msg.getBucket()); - f(req); - }); -} - -template -std::unique_ptr -ProtocolSerialization7::decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const { - return decode_request(in_buf, [&](const ProtobufType& req) { - if (!req.has_bucket()) { - throw vespalib::IllegalArgumentException( - vespalib::make_string("Malformed protocol buffer request for %s; no bucket", - ProtobufType::descriptor()->full_name().c_str())); - } - const auto bucket = get_bucket(req.bucket()); - return f(req, bucket); - }); -} - -template -void encode_bucket_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketReply& reply, Func&& f) { - encode_response(out_buf, reply, [&](ProtobufType& res) { - if (reply.hasBeenRemapped()) { - set_bucket_id(*res.mutable_remapped_bucket_id(), reply.getBucketId()); - } - f(res); - }); -} - -template -std::unique_ptr -ProtocolSerialization7::decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const { - return decode_response(in_buf, [&](const ProtobufType& res) { - auto reply = f(res); - if (res.has_remapped_bucket_id()) { - reply->remapBucketId(get_bucket_id(res.remapped_bucket_id())); - } - return reply; - }); -} - -template -void encode_bucket_info_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketInfoReply& reply, Func&& f) { - encode_bucket_response(out_buf, reply, [&](ProtobufType& res) { - set_bucket_info(*res.mutable_bucket_info(), reply.getBucketInfo()); - f(res); - }); -} - -template -std::unique_ptr -ProtocolSerialization7::decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const { - return decode_bucket_response(in_buf, [&](const ProtobufType& res) { - auto reply = f(res); - if (res.has_bucket_info()) { - reply->setBucketInfo(get_bucket_info(res.bucket_info())); - } - return reply; - }); -} - -// TODO document protobuf ducktyping assumptions - -namespace { -// Inherit from known base class just to avoid having to template this. We don't care about its subtype anyway. -void no_op_encode([[maybe_unused]] ::google::protobuf::Message&) { - // nothing to do here. -} - -void set_document(protobuf::Document& target_doc, const document::Document& src_doc) { - vespalib::nbostream stream; - src_doc.serialize(stream); - target_doc.set_payload(stream.peek(), stream.size()); -} - -} - -// ----------------------------------------------------------------- -// Put -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - req.set_new_timestamp(msg.getTimestamp()); - req.set_expected_old_timestamp(msg.getUpdateTimestamp()); - if (msg.getCondition().isPresent()) { - set_tas_condition(*req.mutable_condition(), msg.getCondition()); - } - if (msg.getDocument()) { - set_document(*req.mutable_document(), *msg.getDocument()); - } - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutReply& msg) const { - encode_bucket_info_response(buf, msg, [&](auto& res) { - res.set_was_found(msg.wasFound()); - }); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodePutCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - auto document = get_document(req.document(), type_repo()); - auto cmd = std::make_unique(bucket, std::move(document), req.new_timestamp()); - cmd->setUpdateTimestamp(req.expected_old_timestamp()); - if (req.has_condition()) { - cmd->setCondition(get_tas_condition(req.condition())); - } - return cmd; - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodePutReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_info_response(buf, [&](auto& res) { - return std::make_unique(static_cast(cmd), res.was_found()); - }); -} - -// ----------------------------------------------------------------- -// Update -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - auto* update = msg.getUpdate().get(); - if (update) { - set_update(*req.mutable_update(), *update); - } - req.set_new_timestamp(msg.getTimestamp()); - req.set_expected_old_timestamp(msg.getOldTimestamp()); - if (msg.getCondition().isPresent()) { - set_tas_condition(*req.mutable_condition(), msg.getCondition()); - } - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateReply& msg) const { - encode_bucket_info_response(buf, msg, [&](auto& res) { - res.set_updated_timestamp(msg.getOldTimestamp()); - }); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - auto update = get_update(req.update(), type_repo()); - auto cmd = std::make_unique(bucket, std::move(update), req.new_timestamp()); - cmd->setOldTimestamp(req.expected_old_timestamp()); - if (req.has_condition()) { - cmd->setCondition(get_tas_condition(req.condition())); - } - return cmd; - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeUpdateReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_info_response(buf, [&](auto& res) { - return std::make_unique(static_cast(cmd), - res.updated_timestamp()); - }); -} - -// ----------------------------------------------------------------- -// Remove -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - auto doc_id_str = msg.getDocumentId().toString(); - req.set_document_id(doc_id_str.data(), doc_id_str.size()); - req.set_new_timestamp(msg.getTimestamp()); - if (msg.getCondition().isPresent()) { - set_tas_condition(*req.mutable_condition(), msg.getCondition()); - } - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveReply& msg) const { - encode_bucket_info_response(buf, msg, [&](auto& res) { - res.set_removed_timestamp(msg.getOldTimestamp()); - }); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size())); - auto cmd = std::make_unique(bucket, doc_id, req.new_timestamp()); - if (req.has_condition()) { - cmd->setCondition(get_tas_condition(req.condition())); - } - return cmd; - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_info_response(buf, [&](auto& res) { - return std::make_unique(static_cast(cmd), - res.removed_timestamp()); - }); -} - -// ----------------------------------------------------------------- -// Get -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - auto doc_id = msg.getDocumentId().toString(); - req.set_document_id(doc_id.data(), doc_id.size()); - req.set_before_timestamp(msg.getBeforeTimestamp()); - if (!msg.getFieldSet().empty()) { - req.set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size()); - } - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) const { - encode_bucket_info_response(buf, msg, [&](auto& res) { - if (msg.getDocument()) { - set_document(*res.mutable_document(), *msg.getDocument()); - } - res.set_last_modified_timestamp(msg.getLastModifiedTimestamp()); - }); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeGetCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size())); - return std::make_unique(bucket, std::move(doc_id), - req.field_set(), req.before_timestamp()); - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_info_response(buf, [&](auto& res) { - try { - auto document = get_document(res.document(), type_repo()); - return std::make_unique(static_cast(cmd), - std::move(document), res.last_modified_timestamp()); - } catch (std::exception& e) { - auto reply = std::make_unique(static_cast(cmd), - std::shared_ptr(), 0u); - reply->setResult(api::ReturnCode(api::ReturnCode::UNPARSEABLE, e.what())); - return reply; - } - }); -} - -// ----------------------------------------------------------------- -// Revert -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - auto* tokens = req.mutable_revert_tokens(); - assert(msg.getRevertTokens().size() <= INT_MAX); - tokens->Reserve(static_cast(msg.getRevertTokens().size())); - for (auto token : msg.getRevertTokens()) { - tokens->Add(token); - } - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertReply& msg) const { - encode_bucket_info_response(buf, msg, no_op_encode); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeRevertCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - std::vector tokens; - tokens.reserve(req.revert_tokens_size()); - for (auto token : req.revert_tokens()) { - tokens.emplace_back(api::Timestamp(token)); - } - return std::make_unique(bucket, std::move(tokens)); - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeRevertReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_info_response(buf, [&]([[maybe_unused]] auto& res) { - return std::make_unique(static_cast(cmd)); - }); -} - -// ----------------------------------------------------------------- -// RemoveLocation -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const { - encode_bucket_info_response(buf, msg, no_op_encode); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - return std::make_unique(req.document_selection(), bucket); - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_info_response(buf, [&]([[maybe_unused]] auto& res) { - return std::make_unique(static_cast(cmd)); - }); -} - -// ----------------------------------------------------------------- -// DeleteBucket -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - set_bucket_info(*req.mutable_expected_bucket_info(), msg.getBucketInfo()); - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketReply& msg) const { - encode_bucket_info_response(buf, msg, no_op_encode); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeDeleteBucketCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - auto cmd = std::make_unique(bucket); - if (req.has_expected_bucket_info()) { - cmd->setBucketInfo(get_bucket_info(req.expected_bucket_info())); - } - return cmd; - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeDeleteBucketReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_info_response(buf, [&]([[maybe_unused]] auto& res) { - return std::make_unique(static_cast(cmd)); - }); -} - -// ----------------------------------------------------------------- -// CreateBucket -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - req.set_create_as_active(msg.getActive()); - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketReply& msg) const { - encode_bucket_info_response(buf, msg, no_op_encode); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateBucketCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - auto cmd = std::make_unique(bucket); - cmd->setActive(req.create_as_active()); - return cmd; - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeCreateBucketReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_info_response(buf, [&]([[maybe_unused]] auto& res) { - return std::make_unique(static_cast(cmd)); - }); -} - -// ----------------------------------------------------------------- -// MergeBucket -// ----------------------------------------------------------------- - -namespace { - -void set_merge_nodes(::google::protobuf::RepeatedPtrField& dest, - const std::vector& src) -{ - dest.Reserve(src.size()); - for (const auto& src_node : src) { - auto* dest_node = dest.Add(); - dest_node->set_index(src_node.index); - dest_node->set_source_only(src_node.sourceOnly); - } -} - -std::vector get_merge_nodes( - const ::google::protobuf::RepeatedPtrField& src) -{ - std::vector nodes; - nodes.reserve(src.size()); - for (const auto& node : src) { - nodes.emplace_back(node.index(), node.source_only()); - } - return nodes; -} - -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); - req.set_max_timestamp(msg.getMaxTimestamp()); - req.set_cluster_state_version(msg.getClusterStateVersion()); - for (uint16_t chain_node : msg.getChain()) { - req.add_node_chain(chain_node); - } - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketReply& msg) const { - encode_bucket_response(buf, msg, no_op_encode); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - auto nodes = get_merge_nodes(req.nodes()); - auto cmd = std::make_unique(bucket, std::move(nodes), req.max_timestamp()); - cmd->setClusterStateVersion(req.cluster_state_version()); - std::vector chain; - chain.reserve(req.node_chain_size()); - for (uint16_t node : req.node_chain()) { - chain.emplace_back(node); - } - cmd->setChain(std::move(chain)); - return cmd; - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeMergeBucketReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_response(buf, [&]([[maybe_unused]] auto& res) { - return std::make_unique(static_cast(cmd)); - }); -} - -// ----------------------------------------------------------------- -// GetBucketDiff -// ----------------------------------------------------------------- - -namespace { - -void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) { - static_assert(document::GlobalId::LENGTH == 12); - uint64_t lo64; - uint32_t hi32; - memcpy(&lo64, src.get(), sizeof(uint64_t)); - memcpy(&hi32, src.get() + sizeof(uint64_t), sizeof(uint32_t)); - dest.set_lo_64(lo64); - dest.set_hi_32(hi32); -} - -document::GlobalId get_global_id(const protobuf::GlobalId& src) { - static_assert(document::GlobalId::LENGTH == 12); - const uint64_t lo64 = src.lo_64(); - const uint32_t hi32 = src.hi_32(); - - char buf[document::GlobalId::LENGTH]; - memcpy(buf, &lo64, sizeof(uint64_t)); - memcpy(buf + sizeof(uint64_t), &hi32, sizeof(uint32_t)); - return document::GlobalId(buf); -} - -void set_diff_entry(protobuf::MetaDiffEntry& dest, const api::GetBucketDiffCommand::Entry& src) { - dest.set_timestamp(src._timestamp); - set_global_id(*dest.mutable_gid(), src._gid); - dest.set_header_size(src._headerSize); - dest.set_body_size(src._bodySize); - dest.set_flags(src._flags); - dest.set_presence_mask(src._hasMask); -} - -api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& src) { - api::GetBucketDiffCommand::Entry e; - e._timestamp = src.timestamp(); - e._gid = get_global_id(src.gid()); - e._headerSize = src.header_size(); - e._bodySize = src.body_size(); - e._flags = src.flags(); - e._hasMask = src.presence_mask(); - return e; -} - -void fill_proto_meta_diff(::google::protobuf::RepeatedPtrField& dest, - const std::vector& src) { - for (const auto& diff_entry : src) { - set_diff_entry(*dest.Add(), diff_entry); - } -} - -void fill_api_meta_diff(std::vector& dest, - const ::google::protobuf::RepeatedPtrField& src) { - // FIXME GetBucketDiffReply ctor copies the diff from the request for some reason - // TODO verify this isn't actually used anywhere and remove this "feature". - dest.clear(); - dest.reserve(src.size()); - for (const auto& diff_entry : src) { - dest.emplace_back(get_diff_entry(diff_entry)); - } -} - -} // anonymous namespace - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); - req.set_max_timestamp(msg.getMaxTimestamp()); - fill_proto_meta_diff(*req.mutable_diff(), msg.getDiff()); - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffReply& msg) const { - encode_bucket_response(buf, msg, [&](auto& res) { - fill_proto_meta_diff(*res.mutable_diff(), msg.getDiff()); - }); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeGetBucketDiffCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - auto nodes = get_merge_nodes(req.nodes()); - auto cmd = std::make_unique(bucket, std::move(nodes), req.max_timestamp()); - fill_api_meta_diff(cmd->getDiff(), req.diff()); - return cmd; - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeGetBucketDiffReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_response(buf, [&](auto& res) { - auto reply = std::make_unique(static_cast(cmd)); - fill_api_meta_diff(reply->getDiff(), res.diff()); - return reply; - }); -} - -// ----------------------------------------------------------------- -// ApplyBucketDiff -// ----------------------------------------------------------------- - -namespace { - -void fill_api_apply_diff_vector(std::vector& diff, - const ::google::protobuf::RepeatedPtrField& src) -{ - // We use the same approach as the legacy protocols here in that we pre-reserve and - // directly write into the vector. This avoids having to ensure all buffer management is movable. - size_t n_entries = src.size(); - diff.resize(n_entries); - for (size_t i = 0; i < n_entries; ++i) { - auto& proto_entry = src.Get(i); - auto& dest = diff[i]; - dest._entry = get_diff_entry(proto_entry.entry_meta()); - dest._docName = proto_entry.document_id(); - // TODO consider making buffers std::strings instead to avoid explicit zeroing-on-resize overhead - dest._headerBlob.resize(proto_entry.header_blob().size()); - memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size()); - dest._bodyBlob.resize(proto_entry.body_blob().size()); - memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size()); - } -} - -void fill_proto_apply_diff_vector(::google::protobuf::RepeatedPtrField& dest, - const std::vector& src) -{ - dest.Reserve(src.size()); - for (const auto& entry : src) { - auto* proto_entry = dest.Add(); - set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry); - proto_entry->set_document_id(entry._docName.data(), entry._docName.size()); - proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size()); - proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size()); - } -} - -} // anonymous namespace - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); - req.set_max_buffer_size(msg.getMaxBufferSize()); - fill_proto_apply_diff_vector(*req.mutable_entries(), msg.getDiff()); - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffReply& msg) const { - encode_bucket_response(buf, msg, [&](auto& res) { - fill_proto_apply_diff_vector(*res.mutable_entries(), msg.getDiff()); - }); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - auto nodes = get_merge_nodes(req.nodes()); - auto cmd = std::make_unique(bucket, std::move(nodes), req.max_buffer_size()); - fill_api_apply_diff_vector(cmd->getDiff(), req.entries()); - return cmd; - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeApplyBucketDiffReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_response(buf, [&](auto& res) { - auto reply = std::make_unique(static_cast(cmd)); - fill_api_apply_diff_vector(reply->getDiff(), res.entries()); - return reply; - }); -} - -// ----------------------------------------------------------------- -// RequestBucketInfo -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const { - encode_request(buf, msg, [&](auto& req) { - set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace()); - auto& buckets = msg.getBuckets(); - if (!buckets.empty()) { - auto* proto_buckets = req.mutable_explicit_bucket_set(); - for (const auto& b : buckets) { - set_bucket_id(*proto_buckets->add_bucket_ids(), b); - } - } else { - auto* all_buckets = req.mutable_all_buckets(); - auto cluster_state = msg.getSystemState().toString(); - all_buckets->set_distributor_index(msg.getDistributor()); - all_buckets->set_cluster_state(cluster_state.data(), cluster_state.size()); - all_buckets->set_distribution_hash(msg.getDistributionHash().data(), msg.getDistributionHash().size()); - } - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoReply& msg) const { - encode_response(buf, msg, [&](auto& res) { - auto* proto_info = res.mutable_bucket_infos(); - proto_info->Reserve(msg.getBucketInfo().size()); - for (const auto& entry : msg.getBucketInfo()) { - auto* bucket_and_info = proto_info->Add(); - bucket_and_info->set_raw_bucket_id(entry._bucketId.getRawId()); - set_bucket_info(*bucket_and_info->mutable_bucket_info(), entry._info); - } - }); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeRequestBucketInfoCommand(BBuf& buf) const { - return decode_request(buf, [&](auto& req) { - auto bucket_space = get_bucket_space(req.bucket_space()); - if (req.has_explicit_bucket_set()) { - const uint32_t n_buckets = req.explicit_bucket_set().bucket_ids_size(); - std::vector buckets(n_buckets); - const auto& proto_buckets = req.explicit_bucket_set().bucket_ids(); - for (uint32_t i = 0; i < n_buckets; ++i) { - buckets[i] = get_bucket_id(proto_buckets.Get(i)); - } - return std::make_unique(bucket_space, std::move(buckets)); - } else if (req.has_all_buckets()) { - const auto& all_req = req.all_buckets(); - return std::make_unique( - bucket_space, all_req.distributor_index(), - lib::ClusterState(all_req.cluster_state()), all_req.distribution_hash()); - } else { - throw vespalib::IllegalArgumentException("RequestBucketInfo does not have any applicable fields set"); - } - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(const SCmd& cmd, BBuf& buf) const { - return decode_response(buf, [&](auto& res) { - auto reply = std::make_unique(static_cast(cmd)); - auto& dest_entries = reply->getBucketInfo(); - uint32_t n_entries = res.bucket_infos_size(); - dest_entries.resize(n_entries); - for (uint32_t i = 0; i < n_entries; ++i) { - const auto& proto_entry = res.bucket_infos(i); - dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id()); - dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info()); - } - return reply; - }); -} - -// ----------------------------------------------------------------- -// NotifyBucketChange -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - set_bucket_info(*req.mutable_bucket_info(), msg.getBucketInfo()); - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeReply& msg) const { - encode_response(buf, msg, no_op_encode); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeNotifyBucketChangeCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - auto bucket_info = get_bucket_info(req.bucket_info()); - return std::make_unique(bucket, bucket_info); - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeNotifyBucketChangeReply(const SCmd& cmd, BBuf& buf) const { - return decode_response(buf, [&]([[maybe_unused]] auto& res) { - return std::make_unique(static_cast(cmd)); - }); -} - -// ----------------------------------------------------------------- -// SplitBucket -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - req.set_min_split_bits(msg.getMinSplitBits()); - req.set_max_split_bits(msg.getMaxSplitBits()); - req.set_min_byte_size(msg.getMinByteSize()); - req.set_min_doc_count(msg.getMinDocCount()); - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketReply& msg) const { - encode_bucket_response(buf, msg, [&](auto& res) { - for (const auto& split_info : msg.getSplitInfo()) { - auto* proto_info = res.add_split_info(); - proto_info->set_raw_bucket_id(split_info.first.getRawId()); - set_bucket_info(*proto_info->mutable_bucket_info(), split_info.second); - } - }); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeSplitBucketCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - auto cmd = std::make_unique(bucket); - cmd->setMinSplitBits(static_cast(req.min_split_bits())); - cmd->setMaxSplitBits(static_cast(req.max_split_bits())); - cmd->setMinByteSize(req.min_byte_size()); - cmd->setMinDocCount(req.min_doc_count()); - return cmd; - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_response(buf, [&](auto& res) { - auto reply = std::make_unique(static_cast(cmd)); - auto& dest_info = reply->getSplitInfo(); - dest_info.reserve(res.split_info_size()); - for (const auto& proto_info : res.split_info()) { - dest_info.emplace_back(document::BucketId(proto_info.raw_bucket_id()), - get_bucket_info(proto_info.bucket_info())); - } - return reply; - }); -} - -// ----------------------------------------------------------------- -// JoinBuckets -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - for (const auto& source : msg.getSourceBuckets()) { - set_bucket_id(*req.add_source_buckets(), source); - } - req.set_min_join_bits(msg.getMinJoinBits()); - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsReply& msg) const { - encode_bucket_info_response(buf, msg, no_op_encode); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeJoinBucketsCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - auto cmd = std::make_unique(bucket); - auto& entries = cmd->getSourceBuckets(); - for (const auto& proto_bucket : req.source_buckets()) { - entries.emplace_back(get_bucket_id(proto_bucket)); - } - cmd->setMinJoinBits(static_cast(req.min_join_bits())); - return cmd; - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeJoinBucketsReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_info_response(buf, [&]([[maybe_unused]] auto& res) { - return std::make_unique(static_cast(cmd)); - }); -} - -// ----------------------------------------------------------------- -// SetBucketState -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateCommand& msg) const { - encode_bucket_request(buf, msg, [&](auto& req) { - auto state = (msg.getState() == api::SetBucketStateCommand::BUCKET_STATE::ACTIVE - ? protobuf::SetBucketStateRequest_BucketState_Active - : protobuf::SetBucketStateRequest_BucketState_Inactive); - req.set_state(state); - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateReply& msg) const { - // SetBucketStateReply is _technically_ a BucketInfoReply, but the legacy protocol impls - // do _not_ encode bucket info as part of the wire format (and it's not used on the distributor), - // so we follow that here and only encode remapping information. - encode_bucket_response(buf, msg, no_op_encode); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeSetBucketStateCommand(BBuf& buf) const { - return decode_bucket_request(buf, [&](auto& req, auto& bucket) { - auto state = (req.state() == protobuf::SetBucketStateRequest_BucketState_Active - ? api::SetBucketStateCommand::BUCKET_STATE::ACTIVE - : api::SetBucketStateCommand::BUCKET_STATE::INACTIVE); - return std::make_unique(bucket, state); - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeSetBucketStateReply(const SCmd& cmd, BBuf& buf) const { - return decode_bucket_response(buf, [&]([[maybe_unused]] auto& res) { - return std::make_unique(static_cast(cmd)); - }); -} - -// ----------------------------------------------------------------- -// CreateVisitor -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const { - encode_request(buf, msg, [&](auto& req) { - set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace()); - for (const auto& bucket : msg.getBuckets()) { - set_bucket_id(*req.add_buckets(), bucket); - } - - auto* ctrl_meta = req.mutable_control_meta(); - ctrl_meta->set_library_name(msg.getLibraryName().data(), msg.getLibraryName().size()); - ctrl_meta->set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size()); - ctrl_meta->set_visitor_command_id(msg.getVisitorCmdId()); - ctrl_meta->set_control_destination(msg.getControlDestination().data(), msg.getControlDestination().size()); - ctrl_meta->set_data_destination(msg.getDataDestination().data(), msg.getDataDestination().size()); - ctrl_meta->set_queue_timeout(msg.getQueueTimeout()); - ctrl_meta->set_max_pending_reply_count(msg.getMaximumPendingReplyCount()); - ctrl_meta->set_max_buckets_per_visitor(msg.getMaxBucketsPerVisitor()); - - auto* constraints = req.mutable_constraints(); - constraints->set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); - constraints->set_from_time_usec(msg.getFromTime()); - constraints->set_to_time_usec(msg.getToTime()); - constraints->set_visit_inconsistent_buckets(msg.visitInconsistentBuckets()); - constraints->set_visit_removes(msg.visitRemoves()); - constraints->set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size()); - - for (const auto& param : msg.getParameters()) { - auto* proto_param = req.add_client_parameters(); - proto_param->set_key(param.first.data(), param.first.size()); - proto_param->set_value(param.second.data(), param.second.size()); - } - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorReply& msg) const { - encode_response(buf, msg, [&](auto& res) { - auto& stats = msg.getVisitorStatistics(); - auto* proto_stats = res.mutable_visitor_statistics(); - proto_stats->set_buckets_visited(stats.getBucketsVisited()); - proto_stats->set_documents_visited(stats.getDocumentsVisited()); - proto_stats->set_bytes_visited(stats.getBytesVisited()); - proto_stats->set_documents_returned(stats.getDocumentsReturned()); - proto_stats->set_bytes_returned(stats.getBytesReturned()); - proto_stats->set_second_pass_documents_returned(stats.getSecondPassDocumentsReturned()); - proto_stats->set_second_pass_bytes_returned(stats.getSecondPassBytesReturned()); - }); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBuf& buf) const { - return decode_request(buf, [&](auto& req) { - auto bucket_space = get_bucket_space(req.bucket_space()); - auto& ctrl_meta = req.control_meta(); - auto& constraints = req.constraints(); - auto cmd = std::make_unique(bucket_space, ctrl_meta.library_name(), - ctrl_meta.instance_id(), constraints.document_selection()); - for (const auto& proto_bucket : req.buckets()) { - cmd->getBuckets().emplace_back(get_bucket_id(proto_bucket)); - } - - cmd->setVisitorCmdId(ctrl_meta.visitor_command_id()); - cmd->setControlDestination(ctrl_meta.control_destination()); - cmd->setDataDestination(ctrl_meta.data_destination()); - cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count()); - cmd->setQueueTimeout(ctrl_meta.queue_timeout()); - cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor()); - cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol impl - - for (const auto& proto_param : req.client_parameters()) { - cmd->getParameters().set(proto_param.key(), proto_param.value()); - } - - cmd->setFromTime(constraints.from_time_usec()); - cmd->setToTime(constraints.to_time_usec()); - cmd->setVisitRemoves(constraints.visit_removes()); - cmd->setFieldSet(constraints.field_set()); - cmd->setVisitInconsistentBuckets(constraints.visit_inconsistent_buckets()); - return cmd; - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const { - return decode_response(buf, [&](auto& res) { - auto reply = std::make_unique(static_cast(cmd)); - vdslib::VisitorStatistics vs; - const auto& proto_stats = res.visitor_statistics(); - vs.setBucketsVisited(proto_stats.buckets_visited()); - vs.setDocumentsVisited(proto_stats.documents_visited()); - vs.setBytesVisited(proto_stats.bytes_visited()); - vs.setDocumentsReturned(proto_stats.documents_returned()); - vs.setBytesReturned(proto_stats.bytes_returned()); - vs.setSecondPassDocumentsReturned(proto_stats.second_pass_documents_returned()); - vs.setSecondPassBytesReturned(proto_stats.second_pass_bytes_returned()); - reply->setVisitorStatistics(vs); - return reply; - }); -} - -// ----------------------------------------------------------------- -// DestroyVisitor -// ----------------------------------------------------------------- - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorCommand& msg) const { - encode_request(buf, msg, [&](auto& req) { - req.set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size()); - }); -} - -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorReply& msg) const { - encode_response(buf, msg, no_op_encode); -} - -api::StorageCommand::UP ProtocolSerialization7::onDecodeDestroyVisitorCommand(BBuf& buf) const { - return decode_request(buf, [&](auto& req) { - return std::make_unique(req.instance_id()); - }); -} - -api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf) const { - return decode_response(buf, [&]([[maybe_unused]] auto& res) { - return std::make_unique(static_cast(cmd)); - }); -} - -} // storage::mbusprot diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h deleted file mode 100644 index f3499150278..00000000000 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "protocolserialization.h" -#include - -namespace storage { -namespace mbusprot { - -/** - * Protocol serialization version that uses Protocol Buffers for all its binary - * encoding and decoding. - */ -class ProtocolSerialization7 : public ProtocolSerialization { - const std::shared_ptr _repo; - const documentapi::LoadTypeSet& _load_types; -public: - ProtocolSerialization7(std::shared_ptr repo, - const documentapi::LoadTypeSet& load_types); - - const document::DocumentTypeRepo& type_repo() const { return *_repo; } - - // Put - void onEncode(GBBuf&, const api::PutCommand&) const override; - void onEncode(GBBuf&, const api::PutReply&) const override; - SCmd::UP onDecodePutCommand(BBuf&) const override; - SRep::UP onDecodePutReply(const SCmd&, BBuf&) const override; - - // Update - void onEncode(GBBuf&, const api::UpdateCommand&) const override; - void onEncode(GBBuf&, const api::UpdateReply&) const override; - SCmd::UP onDecodeUpdateCommand(BBuf&) const override; - SRep::UP onDecodeUpdateReply(const SCmd&, BBuf&) const override; - - // Remove - void onEncode(GBBuf&, const api::RemoveCommand&) const override; - void onEncode(GBBuf&, const api::RemoveReply&) const override; - SCmd::UP onDecodeRemoveCommand(BBuf&) const override; - SRep::UP onDecodeRemoveReply(const SCmd&, BBuf&) const override; - - // Get - void onEncode(GBBuf&, const api::GetCommand&) const override; - void onEncode(GBBuf&, const api::GetReply&) const override; - SCmd::UP onDecodeGetCommand(BBuf&) const override; - SRep::UP onDecodeGetReply(const SCmd&, BBuf&) const override; - - // Revert - TODO this is deprecated, no? - void onEncode(GBBuf&, const api::RevertCommand&) const override; - void onEncode(GBBuf&, const api::RevertReply&) const override; - SCmd::UP onDecodeRevertCommand(BBuf&) const override; - SRep::UP onDecodeRevertReply(const SCmd&, BBuf&) const override; - - // DeleteBucket - void onEncode(GBBuf&, const api::DeleteBucketCommand&) const override; - void onEncode(GBBuf&, const api::DeleteBucketReply&) const override; - SCmd::UP onDecodeDeleteBucketCommand(BBuf&) const override; - SRep::UP onDecodeDeleteBucketReply(const SCmd&, BBuf&) const override; - - // CreateBucket - void onEncode(GBBuf&, const api::CreateBucketCommand&) const override; - void onEncode(GBBuf&, const api::CreateBucketReply&) const override; - SCmd::UP onDecodeCreateBucketCommand(BBuf&) const override; - SRep::UP onDecodeCreateBucketReply(const SCmd&, BBuf&) const override; - - // MergeBucket - void onEncode(GBBuf&, const api::MergeBucketCommand&) const override; - void onEncode(GBBuf&, const api::MergeBucketReply&) const override; - SCmd::UP onDecodeMergeBucketCommand(BBuf&) const override; - SRep::UP onDecodeMergeBucketReply(const SCmd&, BBuf&) const override; - - // GetBucketDiff - void onEncode(GBBuf&, const api::GetBucketDiffCommand&) const override; - void onEncode(GBBuf&, const api::GetBucketDiffReply&) const override; - SCmd::UP onDecodeGetBucketDiffCommand(BBuf&) const override; - SRep::UP onDecodeGetBucketDiffReply(const SCmd&, BBuf&) const override; - - // ApplyBucketDiff - void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const override; - void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const override; - SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const override; - SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const override; - - // RequestBucketInfo - void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const override; - void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const override; - SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const override; - SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const override; - - // NotifyBucketChange - void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const override; - void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const override; - SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const override; - SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const override; - - // SplitBucket - void onEncode(GBBuf&, const api::SplitBucketCommand&) const override; - void onEncode(GBBuf&, const api::SplitBucketReply&) const override; - SCmd::UP onDecodeSplitBucketCommand(BBuf&) const override; - SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const override; - - // JoinBuckets - void onEncode(GBBuf&, const api::JoinBucketsCommand&) const override; - void onEncode(GBBuf&, const api::JoinBucketsReply&) const override; - SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const override; - SRep::UP onDecodeJoinBucketsReply(const SCmd&, BBuf&) const override; - - // SetBucketState - void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override; - void onEncode(GBBuf&, const api::SetBucketStateReply&) const override; - SCmd::UP onDecodeSetBucketStateCommand(BBuf&) const override; - SRep::UP onDecodeSetBucketStateReply(const SCmd&, BBuf&) const override; - - // CreateVisitor - void onEncode(GBBuf&, const api::CreateVisitorCommand&) const override; - void onEncode(GBBuf&, const api::CreateVisitorReply&) const override; - SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override; - SRep::UP onDecodeCreateVisitorReply(const SCmd&, BBuf&) const override; - - // DestroyVisitor - void onEncode(GBBuf&, const api::DestroyVisitorCommand&) const override; - void onEncode(GBBuf&, const api::DestroyVisitorReply&) const override; - SCmd::UP onDecodeDestroyVisitorCommand(BBuf&) const override; - SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const override; - - // RemoveLocation - void onEncode(GBBuf&, const api::RemoveLocationCommand&) const override; - void onEncode(GBBuf&, const api::RemoveLocationReply&) const override; - SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override; - SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override; - -private: - template - std::unique_ptr decode_request(document::ByteBuffer& in_buf, Func&& f) const; - template - std::unique_ptr decode_response(document::ByteBuffer& in_buf, Func&& f) const; - template - std::unique_ptr decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const; - template - std::unique_ptr decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const; - template - std::unique_ptr decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const; -}; - -} -} diff --git a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp index 7bc6333762b..7e6be0a84f5 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/storageprotocol.cpp @@ -20,8 +20,7 @@ StorageProtocol::StorageProtocol(const std::shared_ptr namespace storage::mbusprot { @@ -29,7 +28,6 @@ private: ProtocolSerialization5_1 _serializer5_1; ProtocolSerialization5_2 _serializer5_2; ProtocolSerialization6_0 _serializer6_0; - ProtocolSerialization7 _serializer7_0; }; } -- cgit v1.2.3