diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-04 14:46:23 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-05 11:27:33 +0000 |
commit | 7e6b01c4b21565b13365ba1587969412bf22b62d (patch) | |
tree | 2ed16e1ba24ad144b8a5f861208c44584cb7a4a5 /storageapi | |
parent | 0fd9e1693eccc7a8aea1db61871821121cf514d2 (diff) |
Implement SetBucketState, Create/DestroyVisitor and RemoveLocation
Diffstat (limited to 'storageapi')
4 files changed, 278 insertions, 19 deletions
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index a0018578238..0851d3baa43 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -548,7 +548,6 @@ TEST_P(StorageProtocolTest, testCreateVisitor) { cmd->setFieldSet("foo,bar,vekterli"); cmd->setVisitInconsistentBuckets(); cmd->setQueueTimeout(100); - cmd->setVisitorOrdering(document::OrderingSpecification::DESCENDING); cmd->setPriority(149); auto cmd2 = copyCommand(cmd); EXPECT_EQ("library", cmd2->getLibraryName()); @@ -562,7 +561,6 @@ TEST_P(StorageProtocolTest, testCreateVisitor) { EXPECT_EQ(buckets, cmd2->getBuckets()); EXPECT_EQ("foo,bar,vekterli", cmd2->getFieldSet()); EXPECT_TRUE(cmd2->visitInconsistentBuckets()); - EXPECT_EQ(document::OrderingSpecification::DESCENDING, cmd2->getVisitorOrdering()); EXPECT_EQ(149, cmd2->getPriority()); auto reply = std::make_shared<CreateVisitorReply>(*cmd2); diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto index 19362bf40db..26cbd7ef303 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto @@ -319,3 +319,85 @@ message JoinBucketsResponse { BucketInfo bucket_info = 1; BucketId remapped_bucket_id = 2; } + +message SetBucketStateRequest { + enum BucketState { + Inactive = 0; + Active = 1; + } + + Bucket bucket = 1; + BucketState state = 2; +} + +message SetBucketStateResponse { + BucketId remapped_bucket_id = 1; +} + +message ClientVisitorParameter { + bytes key = 1; + bytes value = 2; +} + +message VisitorConstraints { + bytes document_selection = 1; + uint64 from_time_usec = 2; + uint64 to_time_usec = 3; + bool visit_removes = 5; + bytes field_set = 6; + bool visit_inconsistent_buckets = 7; +} + +message VisitorControlMeta { + bytes instance_id = 1; + bytes library_name = 2; + uint32 visitor_command_id = 3; + bytes control_destination = 4; + bytes data_destination = 5; + + // TODO move? + uint32 max_pending_reply_count = 6; + uint32 queue_timeout = 7; + uint32 max_buckets_per_visitor = 8; +} + +message CreateVisitorRequest { + BucketSpace bucket_space = 1; + repeated BucketId buckets = 2; + + VisitorConstraints constraints = 3; + VisitorControlMeta control_meta = 4; + repeated ClientVisitorParameter client_parameters = 5; +} + +message VisitorStatistics { + uint32 buckets_visited = 1; + uint64 documents_visited = 2; + uint64 bytes_visited = 3; + uint64 documents_returned = 4; + uint64 bytes_returned = 5; + uint64 second_pass_documents_returned = 6; // TODO don't include? orderdoc only + uint64 second_pass_bytes_returned = 7; // TODO don't include? orderdoc only +} + +message CreateVisitorResponse { + VisitorStatistics visitor_statistics = 1; +} + +message DestroyVisitorRequest { + bytes instance_id = 1; +} + +message DestroyVisitorResponse { + // Currently empty +} + +message RemoveLocationRequest { + Bucket bucket = 1; + bytes document_selection = 2; +} + +message RemoveLocationResponse { + BucketInfo bucket_info = 1; + BucketId remapped_bucket_id = 2; +} diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 0f9f5908ca9..2e0ac075853 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -8,9 +8,11 @@ #include "protocolserialization7.h" #include "serializationhelper.h" #include "storageapi.pb.h" -#include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/document/update/documentupdate.h> #include <vespa/document/util/bufferexceptions.h> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storageapi/message/removelocation.h> +#include <vespa/storageapi/message/visitor.h> #pragma GCC diagnostic pop @@ -1015,7 +1017,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCm // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const { - encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](protobuf::JoinBucketsRequest& req) { + encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](auto& req) { for (const auto& source : msg.getSourceBuckets()) { req.add_source_buckets()->set_raw_id(source.getRawId()); } @@ -1045,31 +1047,184 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeJoinBucketsReply(const SCm }); } -/* +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateCommand& msg) const { + encode_bucket_request<protobuf::SetBucketStateRequest>(buf, msg, [&](auto& req) { + auto state = (msg.getState() == api::SetBucketStateCommand::BUCKET_STATE::ACTIVE + ? protobuf::SetBucketStateRequest_BucketState_Active + : protobuf::SetBucketStateRequest_BucketState_Inactive); + req.set_state(state); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateReply& msg) const { + // SetBucketStateReply is _technically_ a BucketInfoReply, but the legacy protocol impls + // do _not_ encode bucket info as part of the wire format (and it's not used on the distributor), + // so we follow that here and only encode remapping information. + encode_bucket_response<protobuf::SetBucketStateResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeSetBucketStateCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::SetBucketStateRequest>(buf, [&](auto& req, auto& bucket) { + auto state = (req.state() == protobuf::SetBucketStateRequest_BucketState_Active + ? api::SetBucketStateCommand::BUCKET_STATE::ACTIVE + : api::SetBucketStateCommand::BUCKET_STATE::INACTIVE); + return std::make_unique<api::SetBucketStateCommand>(bucket, state); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeSetBucketStateReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_response<protobuf::SetBucketStateResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::SetBucketStateReply>(static_cast<const api::SetBucketStateCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const { + encode_request<protobuf::CreateVisitorRequest>(buf, msg, [&](auto& req) { + req.mutable_bucket_space()->set_space_id(msg.getBucketSpace().getId()); + for (const auto& bucket : msg.getBuckets()) { + req.add_buckets()->set_raw_id(bucket.getRawId()); + } + + auto* ctrl_meta = req.mutable_control_meta(); + ctrl_meta->set_library_name(msg.getLibraryName().data(), msg.getLibraryName().size()); + ctrl_meta->set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size()); + ctrl_meta->set_visitor_command_id(msg.getVisitorCmdId()); + ctrl_meta->set_control_destination(msg.getControlDestination().data(), msg.getControlDestination().size()); + ctrl_meta->set_data_destination(msg.getDataDestination().data(), msg.getDataDestination().size()); + ctrl_meta->set_queue_timeout(msg.getQueueTimeout()); + ctrl_meta->set_max_pending_reply_count(msg.getMaximumPendingReplyCount()); + ctrl_meta->set_max_buckets_per_visitor(msg.getMaxBucketsPerVisitor()); + + auto* constraints = req.mutable_constraints(); + constraints->set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); + constraints->set_from_time_usec(msg.getFromTime()); + constraints->set_to_time_usec(msg.getToTime()); + constraints->set_visit_inconsistent_buckets(msg.visitInconsistentBuckets()); + constraints->set_visit_removes(msg.visitRemoves()); + constraints->set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size()); + + for (const auto& param : msg.getParameters()) { + auto* proto_param = req.add_client_parameters(); + proto_param->set_key(param.first.data(), param.first.size()); + proto_param->set_value(param.second.data(), param.second.size()); + } + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorReply& msg) const { + encode_response<protobuf::CreateVisitorResponse>(buf, msg, [&](auto& res) { + auto& stats = msg.getVisitorStatistics(); + auto* proto_stats = res.mutable_visitor_statistics(); + proto_stats->set_buckets_visited(stats.getBucketsVisited()); + proto_stats->set_documents_visited(stats.getDocumentsVisited()); + proto_stats->set_bytes_visited(stats.getBytesVisited()); + proto_stats->set_documents_returned(stats.getDocumentsReturned()); + proto_stats->set_bytes_returned(stats.getBytesReturned()); + proto_stats->set_second_pass_documents_returned(stats.getSecondPassDocumentsReturned()); + proto_stats->set_second_pass_bytes_returned(stats.getSecondPassBytesReturned()); + }); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBuf& buf) const { + return decode_request<protobuf::CreateVisitorRequest>(buf, [&](auto& req) { + document::BucketSpace bucket_space(req.bucket_space().space_id()); + auto& ctrl_meta = req.control_meta(); + auto& constraints = req.constraints(); + auto cmd = std::make_unique<api::CreateVisitorCommand>(bucket_space, ctrl_meta.library_name(), + ctrl_meta.instance_id(), constraints.document_selection()); + for (const auto& proto_bucket : req.buckets()) { + cmd->getBuckets().emplace_back(proto_bucket.raw_id()); + } + + cmd->setVisitorCmdId(ctrl_meta.visitor_command_id()); + cmd->setControlDestination(ctrl_meta.control_destination()); + cmd->setDataDestination(ctrl_meta.data_destination()); + cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count()); + cmd->setQueueTimeout(ctrl_meta.queue_timeout()); + cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor()); + cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol + + for (const auto& proto_param : req.client_parameters()) { + cmd->getParameters().set(proto_param.key(), proto_param.value()); + } + + cmd->setFromTime(constraints.from_time_usec()); + cmd->setToTime(constraints.to_time_usec()); + cmd->setVisitRemoves(constraints.visit_removes()); + cmd->setFieldSet(constraints.field_set()); + cmd->setVisitInconsistentBuckets(constraints.visit_inconsistent_buckets()); + return cmd; + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const { + return decode_response<protobuf::CreateVisitorResponse>(buf, [&](auto& res) { + auto reply = std::make_unique<api::CreateVisitorReply>(static_cast<const api::CreateVisitorCommand&>(cmd)); + vdslib::VisitorStatistics vs; + const auto& proto_stats = res.visitor_statistics(); + vs.setBucketsVisited(proto_stats.buckets_visited()); + vs.setDocumentsVisited(proto_stats.documents_visited()); + vs.setBytesVisited(proto_stats.bytes_visited()); + vs.setDocumentsReturned(proto_stats.documents_returned()); + vs.setBytesReturned(proto_stats.bytes_returned()); + vs.setSecondPassDocumentsReturned(proto_stats.second_pass_documents_returned()); + vs.setSecondPassBytesReturned(proto_stats.second_pass_bytes_returned()); + reply->setVisitorStatistics(vs); + return reply; + }); +} // ----------------------------------------------------------------- -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::Command& msg) const { - (void)buf; - (void)msg; +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorCommand& msg) const { + encode_request<protobuf::DestroyVisitorRequest>(buf, msg, [&](auto& req) { + req.set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size()); + }); } -void ProtocolSerialization7::onEncode(GBBuf& buf, const api::Reply& msg) const { - (void)buf; - (void)msg; +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorReply& msg) const { + encode_response<protobuf::DestroyVisitorResponse>(buf, msg, no_op_encode); } -api::StorageCommand::UP ProtocolSerialization7::onDecodeCommand(BBuf& buf) const { - (void)buf; - return api::StorageCommand::UP(); +api::StorageCommand::UP ProtocolSerialization7::onDecodeDestroyVisitorCommand(BBuf& buf) const { + return decode_request<protobuf::DestroyVisitorRequest>(buf, [&](auto& req) { + return std::make_unique<api::DestroyVisitorCommand>(req.instance_id()); + }); } -api::StorageReply::UP ProtocolSerialization7::onDecodeReply(const SCmd& cmd, BBuf& buf) const { - (void)cmd; - (void)buf; - return api::StorageReply::UP(); +api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf) const { + return decode_response<protobuf::DestroyVisitorResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::DestroyVisitorReply>(static_cast<const api::DestroyVisitorCommand&>(cmd)); + }); +} + +// ----------------------------------------------------------------- + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const { + encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) { + req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); + }); +} + +void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const { + encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, no_op_encode); +} + +api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const { + return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) { + return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket); + }); +} + +api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const { + return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&]([[maybe_unused]] auto& res) { + return std::make_unique<api::RemoveLocationReply>(static_cast<const api::RemoveLocationCommand&>(cmd)); + }); } - */ /* * TODO extend testing of: diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h index 5c527f8d032..17c4c71ef8e 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h @@ -103,6 +103,30 @@ public: SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const override; SRep::UP onDecodeJoinBucketsReply(const SCmd&, BBuf&) const override; + // SetBucketState + void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override; + void onEncode(GBBuf&, const api::SetBucketStateReply&) const override; + SCmd::UP onDecodeSetBucketStateCommand(BBuf&) const override; + SRep::UP onDecodeSetBucketStateReply(const SCmd&, BBuf&) const override; + + // CreateVisitor + void onEncode(GBBuf&, const api::CreateVisitorCommand&) const override; + void onEncode(GBBuf&, const api::CreateVisitorReply&) const override; + SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override; + SRep::UP onDecodeCreateVisitorReply(const SCmd&, BBuf&) const override; + + // DestroyVisitor + void onEncode(GBBuf&, const api::DestroyVisitorCommand&) const override; + void onEncode(GBBuf&, const api::DestroyVisitorReply&) const override; + SCmd::UP onDecodeDestroyVisitorCommand(BBuf&) const override; + SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const override; + + // RemoveLocation + void onEncode(GBBuf&, const api::RemoveLocationCommand&) const override; + void onEncode(GBBuf&, const api::RemoveLocationReply&) const override; + SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override; + SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override; + private: template <typename ProtobufType, typename Func> std::unique_ptr<api::StorageCommand> decode_request(document::ByteBuffer& in_buf, Func&& f) const; |