summaryrefslogtreecommitdiffstats
path: root/storageapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-04 14:46:23 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-04-05 11:27:33 +0000
commit7e6b01c4b21565b13365ba1587969412bf22b62d (patch)
tree2ed16e1ba24ad144b8a5f861208c44584cb7a4a5 /storageapi
parent0fd9e1693eccc7a8aea1db61871821121cf514d2 (diff)
Implement SetBucketState, Create/DestroyVisitor and RemoveLocation
Diffstat (limited to 'storageapi')
-rw-r--r--storageapi/src/tests/mbusprot/storageprotocoltest.cpp2
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto82
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp189
-rw-r--r--storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h24
4 files changed, 278 insertions, 19 deletions
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index a0018578238..0851d3baa43 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -548,7 +548,6 @@ TEST_P(StorageProtocolTest, testCreateVisitor) {
cmd->setFieldSet("foo,bar,vekterli");
cmd->setVisitInconsistentBuckets();
cmd->setQueueTimeout(100);
- cmd->setVisitorOrdering(document::OrderingSpecification::DESCENDING);
cmd->setPriority(149);
auto cmd2 = copyCommand(cmd);
EXPECT_EQ("library", cmd2->getLibraryName());
@@ -562,7 +561,6 @@ TEST_P(StorageProtocolTest, testCreateVisitor) {
EXPECT_EQ(buckets, cmd2->getBuckets());
EXPECT_EQ("foo,bar,vekterli", cmd2->getFieldSet());
EXPECT_TRUE(cmd2->visitInconsistentBuckets());
- EXPECT_EQ(document::OrderingSpecification::DESCENDING, cmd2->getVisitorOrdering());
EXPECT_EQ(149, cmd2->getPriority());
auto reply = std::make_shared<CreateVisitorReply>(*cmd2);
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
index 19362bf40db..26cbd7ef303 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/storageapi.proto
@@ -319,3 +319,85 @@ message JoinBucketsResponse {
BucketInfo bucket_info = 1;
BucketId remapped_bucket_id = 2;
}
+
+message SetBucketStateRequest {
+ enum BucketState {
+ Inactive = 0;
+ Active = 1;
+ }
+
+ Bucket bucket = 1;
+ BucketState state = 2;
+}
+
+message SetBucketStateResponse {
+ BucketId remapped_bucket_id = 1;
+}
+
+message ClientVisitorParameter {
+ bytes key = 1;
+ bytes value = 2;
+}
+
+message VisitorConstraints {
+ bytes document_selection = 1;
+ uint64 from_time_usec = 2;
+ uint64 to_time_usec = 3;
+ bool visit_removes = 5;
+ bytes field_set = 6;
+ bool visit_inconsistent_buckets = 7;
+}
+
+message VisitorControlMeta {
+ bytes instance_id = 1;
+ bytes library_name = 2;
+ uint32 visitor_command_id = 3;
+ bytes control_destination = 4;
+ bytes data_destination = 5;
+
+ // TODO move?
+ uint32 max_pending_reply_count = 6;
+ uint32 queue_timeout = 7;
+ uint32 max_buckets_per_visitor = 8;
+}
+
+message CreateVisitorRequest {
+ BucketSpace bucket_space = 1;
+ repeated BucketId buckets = 2;
+
+ VisitorConstraints constraints = 3;
+ VisitorControlMeta control_meta = 4;
+ repeated ClientVisitorParameter client_parameters = 5;
+}
+
+message VisitorStatistics {
+ uint32 buckets_visited = 1;
+ uint64 documents_visited = 2;
+ uint64 bytes_visited = 3;
+ uint64 documents_returned = 4;
+ uint64 bytes_returned = 5;
+ uint64 second_pass_documents_returned = 6; // TODO don't include? orderdoc only
+ uint64 second_pass_bytes_returned = 7; // TODO don't include? orderdoc only
+}
+
+message CreateVisitorResponse {
+ VisitorStatistics visitor_statistics = 1;
+}
+
+message DestroyVisitorRequest {
+ bytes instance_id = 1;
+}
+
+message DestroyVisitorResponse {
+ // Currently empty
+}
+
+message RemoveLocationRequest {
+ Bucket bucket = 1;
+ bytes document_selection = 2;
+}
+
+message RemoveLocationResponse {
+ BucketInfo bucket_info = 1;
+ BucketId remapped_bucket_id = 2;
+}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index 0f9f5908ca9..2e0ac075853 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -8,9 +8,11 @@
#include "protocolserialization7.h"
#include "serializationhelper.h"
#include "storageapi.pb.h"
-#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/document/util/bufferexceptions.h>
+#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/storageapi/message/removelocation.h>
+#include <vespa/storageapi/message/visitor.h>
#pragma GCC diagnostic pop
@@ -1015,7 +1017,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCm
// -----------------------------------------------------------------
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const {
- encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](protobuf::JoinBucketsRequest& req) {
+ encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](auto& req) {
for (const auto& source : msg.getSourceBuckets()) {
req.add_source_buckets()->set_raw_id(source.getRawId());
}
@@ -1045,31 +1047,184 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeJoinBucketsReply(const SCm
});
}
-/*
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateCommand& msg) const {
+ encode_bucket_request<protobuf::SetBucketStateRequest>(buf, msg, [&](auto& req) {
+ auto state = (msg.getState() == api::SetBucketStateCommand::BUCKET_STATE::ACTIVE
+ ? protobuf::SetBucketStateRequest_BucketState_Active
+ : protobuf::SetBucketStateRequest_BucketState_Inactive);
+ req.set_state(state);
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateReply& msg) const {
+ // SetBucketStateReply is _technically_ a BucketInfoReply, but the legacy protocol impls
+ // do _not_ encode bucket info as part of the wire format (and it's not used on the distributor),
+ // so we follow that here and only encode remapping information.
+ encode_bucket_response<protobuf::SetBucketStateResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeSetBucketStateCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::SetBucketStateRequest>(buf, [&](auto& req, auto& bucket) {
+ auto state = (req.state() == protobuf::SetBucketStateRequest_BucketState_Active
+ ? api::SetBucketStateCommand::BUCKET_STATE::ACTIVE
+ : api::SetBucketStateCommand::BUCKET_STATE::INACTIVE);
+ return std::make_unique<api::SetBucketStateCommand>(bucket, state);
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeSetBucketStateReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_response<protobuf::SetBucketStateResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::SetBucketStateReply>(static_cast<const api::SetBucketStateCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const {
+ encode_request<protobuf::CreateVisitorRequest>(buf, msg, [&](auto& req) {
+ req.mutable_bucket_space()->set_space_id(msg.getBucketSpace().getId());
+ for (const auto& bucket : msg.getBuckets()) {
+ req.add_buckets()->set_raw_id(bucket.getRawId());
+ }
+
+ auto* ctrl_meta = req.mutable_control_meta();
+ ctrl_meta->set_library_name(msg.getLibraryName().data(), msg.getLibraryName().size());
+ ctrl_meta->set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size());
+ ctrl_meta->set_visitor_command_id(msg.getVisitorCmdId());
+ ctrl_meta->set_control_destination(msg.getControlDestination().data(), msg.getControlDestination().size());
+ ctrl_meta->set_data_destination(msg.getDataDestination().data(), msg.getDataDestination().size());
+ ctrl_meta->set_queue_timeout(msg.getQueueTimeout());
+ ctrl_meta->set_max_pending_reply_count(msg.getMaximumPendingReplyCount());
+ ctrl_meta->set_max_buckets_per_visitor(msg.getMaxBucketsPerVisitor());
+
+ auto* constraints = req.mutable_constraints();
+ constraints->set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size());
+ constraints->set_from_time_usec(msg.getFromTime());
+ constraints->set_to_time_usec(msg.getToTime());
+ constraints->set_visit_inconsistent_buckets(msg.visitInconsistentBuckets());
+ constraints->set_visit_removes(msg.visitRemoves());
+ constraints->set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size());
+
+ for (const auto& param : msg.getParameters()) {
+ auto* proto_param = req.add_client_parameters();
+ proto_param->set_key(param.first.data(), param.first.size());
+ proto_param->set_value(param.second.data(), param.second.size());
+ }
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorReply& msg) const {
+ encode_response<protobuf::CreateVisitorResponse>(buf, msg, [&](auto& res) {
+ auto& stats = msg.getVisitorStatistics();
+ auto* proto_stats = res.mutable_visitor_statistics();
+ proto_stats->set_buckets_visited(stats.getBucketsVisited());
+ proto_stats->set_documents_visited(stats.getDocumentsVisited());
+ proto_stats->set_bytes_visited(stats.getBytesVisited());
+ proto_stats->set_documents_returned(stats.getDocumentsReturned());
+ proto_stats->set_bytes_returned(stats.getBytesReturned());
+ proto_stats->set_second_pass_documents_returned(stats.getSecondPassDocumentsReturned());
+ proto_stats->set_second_pass_bytes_returned(stats.getSecondPassBytesReturned());
+ });
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBuf& buf) const {
+ return decode_request<protobuf::CreateVisitorRequest>(buf, [&](auto& req) {
+ document::BucketSpace bucket_space(req.bucket_space().space_id());
+ auto& ctrl_meta = req.control_meta();
+ auto& constraints = req.constraints();
+ auto cmd = std::make_unique<api::CreateVisitorCommand>(bucket_space, ctrl_meta.library_name(),
+ ctrl_meta.instance_id(), constraints.document_selection());
+ for (const auto& proto_bucket : req.buckets()) {
+ cmd->getBuckets().emplace_back(proto_bucket.raw_id());
+ }
+
+ cmd->setVisitorCmdId(ctrl_meta.visitor_command_id());
+ cmd->setControlDestination(ctrl_meta.control_destination());
+ cmd->setDataDestination(ctrl_meta.data_destination());
+ cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count());
+ cmd->setQueueTimeout(ctrl_meta.queue_timeout());
+ cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor());
+ cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol
+
+ for (const auto& proto_param : req.client_parameters()) {
+ cmd->getParameters().set(proto_param.key(), proto_param.value());
+ }
+
+ cmd->setFromTime(constraints.from_time_usec());
+ cmd->setToTime(constraints.to_time_usec());
+ cmd->setVisitRemoves(constraints.visit_removes());
+ cmd->setFieldSet(constraints.field_set());
+ cmd->setVisitInconsistentBuckets(constraints.visit_inconsistent_buckets());
+ return cmd;
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_response<protobuf::CreateVisitorResponse>(buf, [&](auto& res) {
+ auto reply = std::make_unique<api::CreateVisitorReply>(static_cast<const api::CreateVisitorCommand&>(cmd));
+ vdslib::VisitorStatistics vs;
+ const auto& proto_stats = res.visitor_statistics();
+ vs.setBucketsVisited(proto_stats.buckets_visited());
+ vs.setDocumentsVisited(proto_stats.documents_visited());
+ vs.setBytesVisited(proto_stats.bytes_visited());
+ vs.setDocumentsReturned(proto_stats.documents_returned());
+ vs.setBytesReturned(proto_stats.bytes_returned());
+ vs.setSecondPassDocumentsReturned(proto_stats.second_pass_documents_returned());
+ vs.setSecondPassBytesReturned(proto_stats.second_pass_bytes_returned());
+ reply->setVisitorStatistics(vs);
+ return reply;
+ });
+}
// -----------------------------------------------------------------
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::Command& msg) const {
- (void)buf;
- (void)msg;
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorCommand& msg) const {
+ encode_request<protobuf::DestroyVisitorRequest>(buf, msg, [&](auto& req) {
+ req.set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size());
+ });
}
-void ProtocolSerialization7::onEncode(GBBuf& buf, const api::Reply& msg) const {
- (void)buf;
- (void)msg;
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorReply& msg) const {
+ encode_response<protobuf::DestroyVisitorResponse>(buf, msg, no_op_encode);
}
-api::StorageCommand::UP ProtocolSerialization7::onDecodeCommand(BBuf& buf) const {
- (void)buf;
- return api::StorageCommand::UP();
+api::StorageCommand::UP ProtocolSerialization7::onDecodeDestroyVisitorCommand(BBuf& buf) const {
+ return decode_request<protobuf::DestroyVisitorRequest>(buf, [&](auto& req) {
+ return std::make_unique<api::DestroyVisitorCommand>(req.instance_id());
+ });
}
-api::StorageReply::UP ProtocolSerialization7::onDecodeReply(const SCmd& cmd, BBuf& buf) const {
- (void)cmd;
- (void)buf;
- return api::StorageReply::UP();
+api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_response<protobuf::DestroyVisitorResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::DestroyVisitorReply>(static_cast<const api::DestroyVisitorCommand&>(cmd));
+ });
+}
+
+// -----------------------------------------------------------------
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const {
+ encode_bucket_request<protobuf::RemoveLocationRequest>(buf, msg, [&](auto& req) {
+ req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size());
+ });
+}
+
+void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const {
+ encode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, msg, no_op_encode);
+}
+
+api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const {
+ return decode_bucket_request<protobuf::RemoveLocationRequest>(buf, [&](auto& req, auto& bucket) {
+ return std::make_unique<api::RemoveLocationCommand>(req.document_selection(), bucket);
+ });
+}
+
+api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const {
+ return decode_bucket_info_response<protobuf::RemoveLocationResponse>(buf, [&]([[maybe_unused]] auto& res) {
+ return std::make_unique<api::RemoveLocationReply>(static_cast<const api::RemoveLocationCommand&>(cmd));
+ });
}
- */
/*
* TODO extend testing of:
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
index 5c527f8d032..17c4c71ef8e 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.h
@@ -103,6 +103,30 @@ public:
SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const override;
SRep::UP onDecodeJoinBucketsReply(const SCmd&, BBuf&) const override;
+ // SetBucketState
+ void onEncode(GBBuf&, const api::SetBucketStateCommand&) const override;
+ void onEncode(GBBuf&, const api::SetBucketStateReply&) const override;
+ SCmd::UP onDecodeSetBucketStateCommand(BBuf&) const override;
+ SRep::UP onDecodeSetBucketStateReply(const SCmd&, BBuf&) const override;
+
+ // CreateVisitor
+ void onEncode(GBBuf&, const api::CreateVisitorCommand&) const override;
+ void onEncode(GBBuf&, const api::CreateVisitorReply&) const override;
+ SCmd::UP onDecodeCreateVisitorCommand(BBuf&) const override;
+ SRep::UP onDecodeCreateVisitorReply(const SCmd&, BBuf&) const override;
+
+ // DestroyVisitor
+ void onEncode(GBBuf&, const api::DestroyVisitorCommand&) const override;
+ void onEncode(GBBuf&, const api::DestroyVisitorReply&) const override;
+ SCmd::UP onDecodeDestroyVisitorCommand(BBuf&) const override;
+ SRep::UP onDecodeDestroyVisitorReply(const SCmd&, BBuf&) const override;
+
+ // RemoveLocation
+ void onEncode(GBBuf&, const api::RemoveLocationCommand&) const override;
+ void onEncode(GBBuf&, const api::RemoveLocationReply&) const override;
+ SCmd::UP onDecodeRemoveLocationCommand(BBuf&) const override;
+ SRep::UP onDecodeRemoveLocationReply(const SCmd&, BBuf&) const override;
+
private:
template <typename ProtobufType, typename Func>
std::unique_ptr<api::StorageCommand> decode_request(document::ByteBuffer& in_buf, Func&& f) const;