summary | refs | log | tree | commit | diff | stats
path: root/storageapi
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-04-09 14:35:06 +0000
committer: Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-04-09 14:35:06 +0000
commit: d9391e832e4f0c5e87000a49eac72b017631e5a7 (patch)
tree: 473a9122bbe2c9635ac8d1cf6add3aa36c7aedd1 /storageapi
parent: 0fc0216005f381793f3a4a519d609f4958f397e2 (diff)
Misc cleanup and refactoring
Diffstat (limited to 'storageapi')
-rw-r--r--  storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto  6
-rw-r--r--  storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto  30
-rw-r--r--  storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h  15
-rw-r--r--  storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp  114
4 files changed, 88 insertions, 77 deletions
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto
index e7c0a641407..2745fcc28b0 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto
@@ -58,8 +58,10 @@ message BucketInfo {
}
message GlobalId {
- // 96 bits of GID data in _little_ endian.
+ // 96 bits of GID data in _little_ endian. High entropy, so fixed encoding is better than varint.
+ // Low 64 bits as if memcpy()ed from bytes [0, 8) of the GID buffer
fixed64 lo_64 = 1;
+ // High 32 bits as if memcpy()ed from bytes [8, 12) of the GID buffer
fixed32 hi_32 = 2;
}
@@ -68,7 +70,7 @@ message RequestHeader {
uint64 message_id = 1;
uint32 priority = 2; // Always in range [0, 255]
uint32 source_index = 3; // Always in range [0, 65535]
- fixed32 loadtype_id = 4;
+ fixed32 loadtype_id = 4; // It's a hash with high entropy, so fixed encoding is better than varint
}
// TODO these should ideally be gRPC headers..
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto
index aefd4c6c805..e3f5271599e 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto
@@ -14,32 +14,32 @@ message ClientVisitorParameter {
}
message VisitorConstraints {
- bytes document_selection = 1;
- uint64 from_time_usec = 2;
- uint64 to_time_usec = 3;
- bool visit_removes = 5;
- bytes field_set = 6;
- bool visit_inconsistent_buckets = 7;
+ bytes document_selection = 1;
+ uint64 from_time_usec = 2;
+ uint64 to_time_usec = 3;
+ bool visit_removes = 4;
+ bytes field_set = 5;
+ bool visit_inconsistent_buckets = 6;
}
message VisitorControlMeta {
- bytes instance_id = 1;
- bytes library_name = 2;
- uint32 visitor_command_id = 3;
- bytes control_destination = 4;
- bytes data_destination = 5;
+ bytes instance_id = 1;
+ bytes library_name = 2;
+ uint32 visitor_command_id = 3;
+ bytes control_destination = 4;
+ bytes data_destination = 5;
// TODO move?
uint32 max_pending_reply_count = 6;
- uint32 queue_timeout = 7;
+ uint32 queue_timeout = 7;
uint32 max_buckets_per_visitor = 8;
}
message CreateVisitorRequest {
- BucketSpace bucket_space = 1;
- repeated BucketId buckets = 2;
+ BucketSpace bucket_space = 1;
+ repeated BucketId buckets = 2;
- VisitorConstraints constraints = 3;
+ VisitorConstraints constraints = 3;
VisitorControlMeta control_meta = 4;
repeated ClientVisitorParameter client_parameters = 5;
}
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h
index 6fe152302ee..a57627b9ba9 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h
@@ -95,13 +95,10 @@ protected:
virtual void onEncode(GBBuf&, const api::GetBucketDiffReply&) const = 0;
virtual void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const = 0;
virtual void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const = 0;
- virtual void onEncode(GBBuf&,
- const api::RequestBucketInfoCommand&) const = 0;
+ virtual void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const = 0;
virtual void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const = 0;
- virtual void onEncode(GBBuf&,
- const api::NotifyBucketChangeCommand&) const = 0;
- virtual void onEncode(GBBuf&,
- const api::NotifyBucketChangeReply&) const = 0;
+ virtual void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const = 0;
+ virtual void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const = 0;
virtual void onEncode(GBBuf&, const api::SplitBucketCommand&) const = 0;
virtual void onEncode(GBBuf&, const api::SplitBucketReply&) const = 0;
virtual void onEncode(GBBuf&, const api::JoinBucketsCommand&) const = 0;
@@ -136,11 +133,9 @@ protected:
virtual SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const = 0;
virtual SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const = 0;
- virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&,
- BBuf&) const = 0;
+ virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const = 0;
- virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&,
- BBuf&) const = 0;
+ virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeSplitBucketCommand(BBuf&) const = 0;
virtual SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const = 0;
virtual SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const = 0;
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index 23ec5ab86e7..9352e2a75db 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -28,6 +28,22 @@ void set_bucket(protobuf::Bucket& dest, const document::Bucket& src) {
dest.set_space_id(src.getBucketSpace().getId());
}
+void set_bucket_id(protobuf::BucketId& dest, const document::BucketId& src) {
+ dest.set_raw_id(src.getRawId());
+}
+
+document::BucketId get_bucket_id(const protobuf::BucketId& src) {
+ return document::BucketId(src.raw_id());
+}
+
+void set_bucket_space(protobuf::BucketSpace& dest, const document::BucketSpace& src) {
+ dest.set_space_id(src.getId());
+}
+
+document::BucketSpace get_bucket_space(const protobuf::BucketSpace& src) {
+ return document::BucketSpace(src.space_id());
+}
+
void set_bucket_info(protobuf::BucketInfo& dest, const api::BucketInfo& src) {
auto* info = dest.mutable_info_v1();
info->set_last_modified_timestamp(src.getLastModified());
@@ -70,7 +86,6 @@ void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::T
dest.set_selection(src.getSelection().data(), src.getSelection().size());
}
-// TODO add test with unset doc field in root proto
std::shared_ptr<document::Document> get_document(const protobuf::Document& src_doc,
const document::DocumentTypeRepo& type_repo)
{
@@ -229,9 +244,12 @@ public:
_load_types(load_types)
{
decode_request_header(in_buf, _hdr);
- bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); // FIXME size handling
+ assert(in_buf.getRemaining() <= INT_MAX);
+ bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining());
if (!ok) {
- throw vespalib::IllegalArgumentException("Malformed protobuf request payload");
+ throw vespalib::IllegalArgumentException(
+ vespalib::make_string("Malformed protobuf request payload for %s",
+ ProtobufType::descriptor()->full_name().c_str()));
}
}
@@ -257,9 +275,12 @@ public:
_proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena))
{
decode_response_header(in_buf, _hdr);
- bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); // FIXME size handling
+ assert(in_buf.getRemaining() <= INT_MAX);
+ bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining());
if (!ok) {
- throw vespalib::IllegalArgumentException("Malformed protobuf response payload");
+ throw vespalib::IllegalArgumentException(
+ vespalib::make_string("Malformed protobuf response payload for %s",
+ ProtobufType::descriptor()->full_name().c_str()));
}
}
@@ -327,7 +348,7 @@ template <typename ProtobufType, typename Func>
void encode_bucket_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketReply& reply, Func&& f) {
encode_response<ProtobufType>(out_buf, reply, [&](ProtobufType& res) {
if (reply.hasBeenRemapped()) {
- res.mutable_remapped_bucket_id()->set_raw_id(reply.getBucketId().getRawId());
+ set_bucket_id(*res.mutable_remapped_bucket_id(), reply.getBucketId());
}
f(res);
});
@@ -339,7 +360,7 @@ ProtocolSerialization7::decode_bucket_response(document::ByteBuffer& in_buf, Fun
return decode_response<ProtobufType>(in_buf, [&](const ProtobufType& res) {
auto reply = f(res);
if (res.has_remapped_bucket_id()) {
- reply->remapBucketId(document::BucketId(res.remapped_bucket_id().raw_id()));
+ reply->remapBucketId(get_bucket_id(res.remapped_bucket_id()));
}
return reply;
});
@@ -373,12 +394,10 @@ void no_op_encode([[maybe_unused]] ::google::protobuf::Message&) {
// nothing to do here.
}
-void set_document_if_present(protobuf::Document& target_doc, const document::Document* src_doc) {
- if (src_doc) {
- vespalib::nbostream stream;
- src_doc->serialize(stream);
- target_doc.set_payload(stream.peek(), stream.size());
- }
+void set_document(protobuf::Document& target_doc, const document::Document& src_doc) {
+ vespalib::nbostream stream;
+ src_doc.serialize(stream);
+ target_doc.set_payload(stream.peek(), stream.size());
}
}
@@ -394,7 +413,9 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) co
if (msg.getCondition().isPresent()) {
set_tas_condition(*req.mutable_condition(), msg.getCondition());
}
- set_document_if_present(*req.mutable_document(), msg.getDocument().get());
+ if (msg.getDocument()) {
+ set_document(*req.mutable_document(), *msg.getDocument());
+ }
});
}
@@ -521,8 +542,9 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) co
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) const {
encode_bucket_info_response<protobuf::GetResponse>(buf, msg, [&](auto& res) {
- // FIXME this will always create an empty document field!
- set_document_if_present(*res.mutable_document(), msg.getDocument().get());
+ if (msg.getDocument()) {
+ set_document(*res.mutable_document(), *msg.getDocument());
+ }
res.set_last_modified_timestamp(msg.getLastModifiedTimestamp());
});
}
@@ -557,7 +579,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd,
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertCommand& msg) const {
encode_bucket_request<protobuf::RevertRequest>(buf, msg, [&](auto& req) {
auto* tokens = req.mutable_revert_tokens();
- assert(msg.getRevertTokens().size() < INT_MAX);
+ assert(msg.getRevertTokens().size() <= INT_MAX);
tokens->Reserve(static_cast<int>(msg.getRevertTokens().size()));
for (auto token : msg.getRevertTokens()) {
tokens->Add(token);
@@ -690,8 +712,7 @@ void set_merge_nodes(::google::protobuf::RepeatedPtrField<protobuf::MergeNode>&
std::vector<api::MergeBucketCommand::Node> get_merge_nodes(
const ::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& src)
{
- using Node = api::MergeBucketCommand::Node;
- std::vector<Node> nodes;
+ std::vector<api::MergeBucketCommand::Node> nodes;
nodes.reserve(src.size());
for (const auto& node : src) {
nodes.emplace_back(node.index(), node.source_only());
@@ -719,14 +740,13 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketReply& m
api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& buf) const {
return decode_bucket_request<protobuf::MergeBucketRequest>(buf, [&](auto& req, auto& bucket) {
auto nodes = get_merge_nodes(req.nodes());
+ auto cmd = std::make_unique<api::MergeBucketCommand>(bucket, std::move(nodes), req.max_timestamp());
+ cmd->setClusterStateVersion(req.cluster_state_version());
std::vector<uint16_t> chain;
chain.reserve(req.node_chain_size());
for (uint16_t node : req.node_chain()) {
chain.emplace_back(node);
}
-
- auto cmd = std::make_unique<api::MergeBucketCommand>(bucket, std::move(nodes), req.max_timestamp());
- cmd->setClusterStateVersion(req.cluster_state_version());
cmd->setChain(std::move(chain));
return cmd;
});
@@ -748,10 +768,10 @@ void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) {
static_assert(document::GlobalId::LENGTH == 12);
uint64_t lo64;
uint32_t hi32;
- memcpy(&lo64, src.get() + sizeof(uint32_t), sizeof(uint64_t));
- memcpy(&hi32, src.get(), sizeof(uint32_t));
- dest.set_hi_32(hi32);
+ memcpy(&lo64, src.get(), sizeof(uint64_t));
+ memcpy(&hi32, src.get() + sizeof(uint64_t), sizeof(uint32_t));
dest.set_lo_64(lo64);
+ dest.set_hi_32(hi32);
}
document::GlobalId get_global_id(const protobuf::GlobalId& src) {
@@ -760,8 +780,8 @@ document::GlobalId get_global_id(const protobuf::GlobalId& src) {
const uint32_t hi32 = src.hi_32();
char buf[document::GlobalId::LENGTH];
- memcpy(buf, &hi32, sizeof(uint32_t));
- memcpy(buf + sizeof(uint32_t), &lo64, sizeof(uint64_t));
+ memcpy(buf, &lo64, sizeof(uint64_t));
+ memcpy(buf + sizeof(uint64_t), &hi32, sizeof(uint32_t));
return document::GlobalId(buf);
}
@@ -854,6 +874,7 @@ void fill_api_apply_diff_vector(std::vector<api::ApplyBucketDiffCommand::Entry>&
auto& dest = diff[i];
dest._entry = get_diff_entry(proto_entry.entry_meta());
dest._docName = proto_entry.document_id();
+ // TODO consider making buffers std::strings instead to avoid explicit zeroing-on-resize overhead
dest._headerBlob.resize(proto_entry.header_blob().size());
memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size());
dest._bodyBlob.resize(proto_entry.body_blob().size());
@@ -913,12 +934,12 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeApplyBucketDiffReply(const
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const {
encode_request<protobuf::RequestBucketInfoRequest>(buf, msg, [&](auto& req) {
- req.mutable_bucket_space()->set_space_id(msg.getBucketSpace().getId());
+ set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace());
auto& buckets = msg.getBuckets();
if (!buckets.empty()) {
auto* proto_buckets = req.mutable_explicit_bucket_set();
for (const auto& b : buckets) {
- proto_buckets->add_bucket_ids()->set_raw_id(b.getRawId());
+ set_bucket_id(*proto_buckets->add_bucket_ids(), b);
}
} else {
auto* all_buckets = req.mutable_all_buckets();
@@ -944,17 +965,17 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoRe
api::StorageCommand::UP ProtocolSerialization7::onDecodeRequestBucketInfoCommand(BBuf& buf) const {
return decode_request<protobuf::RequestBucketInfoRequest>(buf, [&](auto& req) {
- document::BucketSpace bucket_space(req.bucket_space().space_id());
+ auto bucket_space = get_bucket_space(req.bucket_space());
if (req.has_explicit_bucket_set()) {
const uint32_t n_buckets = req.explicit_bucket_set().bucket_ids_size();
std::vector<document::BucketId> buckets(n_buckets);
const auto& proto_buckets = req.explicit_bucket_set().bucket_ids();
for (uint32_t i = 0; i < n_buckets; ++i) {
- buckets[i] = document::BucketId(proto_buckets.Get(i).raw_id());
+ buckets[i] = get_bucket_id(proto_buckets.Get(i));
}
return std::make_unique<api::RequestBucketInfoCommand>(bucket_space, std::move(buckets));
} else if (req.has_all_buckets()) {
- auto& all_req = req.all_buckets();
+ const auto& all_req = req.all_buckets();
return std::make_unique<api::RequestBucketInfoCommand>(
bucket_space, all_req.distributor_index(),
lib::ClusterState(all_req.cluster_state()), all_req.distribution_hash());
@@ -971,7 +992,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con
uint32_t n_entries = res.bucket_infos_size();
dest_entries.resize(n_entries);
for (uint32_t i = 0; i < n_entries; ++i) {
- auto& proto_entry = res.bucket_infos(i);
+ const auto& proto_entry = res.bucket_infos(i);
dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id());
dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info());
}
@@ -1060,7 +1081,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCm
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const {
encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](auto& req) {
for (const auto& source : msg.getSourceBuckets()) {
- req.add_source_buckets()->set_raw_id(source.getRawId());
+ set_bucket_id(*req.add_source_buckets(), source);
}
req.set_min_join_bits(msg.getMinJoinBits());
});
@@ -1075,9 +1096,9 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeJoinBucketsCommand(BBuf&
auto cmd = std::make_unique<api::JoinBucketsCommand>(bucket);
auto& entries = cmd->getSourceBuckets();
for (const auto& proto_bucket : req.source_buckets()) {
- entries.emplace_back(proto_bucket.raw_id());
+ entries.emplace_back(get_bucket_id(proto_bucket));
}
- cmd->setMinJoinBits(req.min_join_bits());
+ cmd->setMinJoinBits(static_cast<uint8_t>(req.min_join_bits()));
return cmd;
});
}
@@ -1129,9 +1150,9 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeSetBucketStateReply(const
void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const {
encode_request<protobuf::CreateVisitorRequest>(buf, msg, [&](auto& req) {
- req.mutable_bucket_space()->set_space_id(msg.getBucketSpace().getId());
+ set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace());
for (const auto& bucket : msg.getBuckets()) {
- req.add_buckets()->set_raw_id(bucket.getRawId());
+ set_bucket_id(*req.add_buckets(), bucket);
}
auto* ctrl_meta = req.mutable_control_meta();
@@ -1176,13 +1197,13 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorReply&
api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBuf& buf) const {
return decode_request<protobuf::CreateVisitorRequest>(buf, [&](auto& req) {
- document::BucketSpace bucket_space(req.bucket_space().space_id());
- auto& ctrl_meta = req.control_meta();
+ auto bucket_space = get_bucket_space(req.bucket_space());
+ auto& ctrl_meta = req.control_meta();
auto& constraints = req.constraints();
auto cmd = std::make_unique<api::CreateVisitorCommand>(bucket_space, ctrl_meta.library_name(),
ctrl_meta.instance_id(), constraints.document_selection());
for (const auto& proto_bucket : req.buckets()) {
- cmd->getBuckets().emplace_back(proto_bucket.raw_id());
+ cmd->getBuckets().emplace_back(get_bucket_id(proto_bucket));
}
cmd->setVisitorCmdId(ctrl_meta.visitor_command_id());
@@ -1191,7 +1212,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBu
cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count());
cmd->setQueueTimeout(ctrl_meta.queue_timeout());
cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor());
- cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol
+ cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol impl
for (const auto& proto_param : req.client_parameters()) {
cmd->getParameters().set(proto_param.key(), proto_param.value());
@@ -1249,11 +1270,4 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const
});
}
-/*
- * TODO extend testing of:
- * - bucket info in responses
- * - bucket remapping in responses
- * - presence of fields
- */
-
-}
+} // storage::mbusprot