diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-09 14:35:06 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-04-09 14:35:06 +0000 |
commit | d9391e832e4f0c5e87000a49eac72b017631e5a7 (patch) | |
tree | 473a9122bbe2c9635ac8d1cf6add3aa36c7aedd1 /storageapi | |
parent | 0fc0216005f381793f3a4a519d609f4958f397e2 (diff) |
Misc cleanup and refactoring
Diffstat (limited to 'storageapi')
4 files changed, 88 insertions, 77 deletions
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto index e7c0a641407..2745fcc28b0 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/common.proto @@ -58,8 +58,10 @@ message BucketInfo { } message GlobalId { - // 96 bits of GID data in _little_ endian. + // 96 bits of GID data in _little_ endian. High entropy, so fixed encoding is better than varint. + // Low 64 bits as if memcpy()ed from bytes [0, 8) of the GID buffer fixed64 lo_64 = 1; + // High 32 bits as if memcpy()ed from bytes [8, 12) of the GID buffer fixed32 hi_32 = 2; } @@ -68,7 +70,7 @@ message RequestHeader { uint64 message_id = 1; uint32 priority = 2; // Always in range [0, 255] uint32 source_index = 3; // Always in range [0, 65535] - fixed32 loadtype_id = 4; + fixed32 loadtype_id = 4; // It's a hash with high entropy, so fixed encoding is better than varint } // TODO these should ideally be gRPC headers.. diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto index aefd4c6c805..e3f5271599e 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto +++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/visiting.proto @@ -14,32 +14,32 @@ message ClientVisitorParameter { } message VisitorConstraints { - bytes document_selection = 1; - uint64 from_time_usec = 2; - uint64 to_time_usec = 3; - bool visit_removes = 5; - bytes field_set = 6; - bool visit_inconsistent_buckets = 7; + bytes document_selection = 1; + uint64 from_time_usec = 2; + uint64 to_time_usec = 3; + bool visit_removes = 4; + bytes field_set = 5; + bool visit_inconsistent_buckets = 6; } message VisitorControlMeta { - bytes instance_id = 1; - bytes library_name = 2; - uint32 visitor_command_id = 3; - bytes control_destination = 4; - bytes data_destination = 5; + bytes instance_id = 1; + bytes library_name = 2; + uint32 visitor_command_id = 3; + bytes control_destination = 4; + bytes data_destination = 5; // TODO move? uint32 max_pending_reply_count = 6; - uint32 queue_timeout = 7; + uint32 queue_timeout = 7; uint32 max_buckets_per_visitor = 8; } message CreateVisitorRequest { - BucketSpace bucket_space = 1; - repeated BucketId buckets = 2; + BucketSpace bucket_space = 1; + repeated BucketId buckets = 2; - VisitorConstraints constraints = 3; + VisitorConstraints constraints = 3; VisitorControlMeta control_meta = 4; repeated ClientVisitorParameter client_parameters = 5; } diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h index 6fe152302ee..a57627b9ba9 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization.h @@ -95,13 +95,10 @@ protected: virtual void onEncode(GBBuf&, const api::GetBucketDiffReply&) const = 0; virtual void onEncode(GBBuf&, const api::ApplyBucketDiffCommand&) const = 0; virtual void onEncode(GBBuf&, const api::ApplyBucketDiffReply&) const = 0; - virtual void onEncode(GBBuf&, - const api::RequestBucketInfoCommand&) const = 0; + virtual void onEncode(GBBuf&, const api::RequestBucketInfoCommand&) const = 0; virtual void onEncode(GBBuf&, const api::RequestBucketInfoReply&) const = 0; - virtual void onEncode(GBBuf&, - const api::NotifyBucketChangeCommand&) const = 0; - virtual void onEncode(GBBuf&, - const api::NotifyBucketChangeReply&) const = 0; + virtual void onEncode(GBBuf&, const api::NotifyBucketChangeCommand&) const = 0; + virtual void onEncode(GBBuf&, const api::NotifyBucketChangeReply&) const = 0; virtual void onEncode(GBBuf&, const api::SplitBucketCommand&) const = 0; virtual void onEncode(GBBuf&, const api::SplitBucketReply&) const = 0; virtual void onEncode(GBBuf&, const api::JoinBucketsCommand&) const = 0; @@ -136,11 +133,9 @@ protected: virtual SCmd::UP onDecodeApplyBucketDiffCommand(BBuf&) const = 0; virtual SRep::UP onDecodeApplyBucketDiffReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeRequestBucketInfoCommand(BBuf&) const = 0; - virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, - BBuf&) const = 0; + virtual SRep::UP onDecodeRequestBucketInfoReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeNotifyBucketChangeCommand(BBuf&) const = 0; - virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, - BBuf&) const = 0; + virtual SRep::UP onDecodeNotifyBucketChangeReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeSplitBucketCommand(BBuf&) const = 0; virtual SRep::UP onDecodeSplitBucketReply(const SCmd&, BBuf&) const = 0; virtual SCmd::UP onDecodeJoinBucketsCommand(BBuf&) const = 0; diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 23ec5ab86e7..9352e2a75db 100644 --- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -28,6 +28,22 @@ void set_bucket(protobuf::Bucket& dest, const document::Bucket& src) { dest.set_space_id(src.getBucketSpace().getId()); } +void set_bucket_id(protobuf::BucketId& dest, const document::BucketId& src) { + dest.set_raw_id(src.getRawId()); +} + +document::BucketId get_bucket_id(const protobuf::BucketId& src) { + return document::BucketId(src.raw_id()); +} + +void set_bucket_space(protobuf::BucketSpace& dest, const document::BucketSpace& src) { + dest.set_space_id(src.getId()); +} + +document::BucketSpace get_bucket_space(const protobuf::BucketSpace& src) { + return document::BucketSpace(src.space_id()); +} + void set_bucket_info(protobuf::BucketInfo& dest, const api::BucketInfo& src) { auto* info = dest.mutable_info_v1(); info->set_last_modified_timestamp(src.getLastModified()); @@ -70,7 +86,6 @@ void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::T dest.set_selection(src.getSelection().data(), src.getSelection().size()); } -// TODO add test with unset doc field in root proto std::shared_ptr<document::Document> get_document(const protobuf::Document& src_doc, const document::DocumentTypeRepo& type_repo) { @@ -229,9 +244,12 @@ public: _load_types(load_types) { decode_request_header(in_buf, _hdr); - bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); // FIXME size handling + assert(in_buf.getRemaining() <= INT_MAX); + bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); if (!ok) { - throw vespalib::IllegalArgumentException("Malformed protobuf request payload"); + throw vespalib::IllegalArgumentException( + vespalib::make_string("Malformed protobuf request payload for %s", + ProtobufType::descriptor()->full_name().c_str())); } } @@ -257,9 +275,12 @@ public: _proto_obj(::google::protobuf::Arena::Create<ProtobufType>(&_arena)) { decode_response_header(in_buf, _hdr); - bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); // FIXME size handling + assert(in_buf.getRemaining() <= INT_MAX); + bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); if (!ok) { - throw vespalib::IllegalArgumentException("Malformed protobuf response payload"); + throw vespalib::IllegalArgumentException( + vespalib::make_string("Malformed protobuf response payload for %s", + ProtobufType::descriptor()->full_name().c_str())); } } @@ -327,7 +348,7 @@ template <typename ProtobufType, typename Func> void encode_bucket_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketReply& reply, Func&& f) { encode_response<ProtobufType>(out_buf, reply, [&](ProtobufType& res) { if (reply.hasBeenRemapped()) { - res.mutable_remapped_bucket_id()->set_raw_id(reply.getBucketId().getRawId()); + set_bucket_id(*res.mutable_remapped_bucket_id(), reply.getBucketId()); } f(res); }); @@ -339,7 +360,7 @@ ProtocolSerialization7::decode_bucket_response(document::ByteBuffer& in_buf, Fun return decode_response<ProtobufType>(in_buf, [&](const ProtobufType& res) { auto reply = f(res); if (res.has_remapped_bucket_id()) { - reply->remapBucketId(document::BucketId(res.remapped_bucket_id().raw_id())); + reply->remapBucketId(get_bucket_id(res.remapped_bucket_id())); } return reply; }); @@ -373,12 +394,10 @@ void no_op_encode([[maybe_unused]] ::google::protobuf::Message&) { // nothing to do here. } -void set_document_if_present(protobuf::Document& target_doc, const document::Document* src_doc) { - if (src_doc) { - vespalib::nbostream stream; - src_doc->serialize(stream); - target_doc.set_payload(stream.peek(), stream.size()); - } +void set_document(protobuf::Document& target_doc, const document::Document& src_doc) { + vespalib::nbostream stream; + src_doc.serialize(stream); + target_doc.set_payload(stream.peek(), stream.size()); } } @@ -394,7 +413,9 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) co if (msg.getCondition().isPresent()) { set_tas_condition(*req.mutable_condition(), msg.getCondition()); } - set_document_if_present(*req.mutable_document(), msg.getDocument().get()); + if (msg.getDocument()) { + set_document(*req.mutable_document(), *msg.getDocument()); + } }); } @@ -521,8 +542,9 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) co void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) const { encode_bucket_info_response<protobuf::GetResponse>(buf, msg, [&](auto& res) { - // FIXME this will always create an empty document field! - set_document_if_present(*res.mutable_document(), msg.getDocument().get()); + if (msg.getDocument()) { + set_document(*res.mutable_document(), *msg.getDocument()); + } res.set_last_modified_timestamp(msg.getLastModifiedTimestamp()); }); } @@ -557,7 +579,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd, void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RevertCommand& msg) const { encode_bucket_request<protobuf::RevertRequest>(buf, msg, [&](auto& req) { auto* tokens = req.mutable_revert_tokens(); - assert(msg.getRevertTokens().size() < INT_MAX); + assert(msg.getRevertTokens().size() <= INT_MAX); tokens->Reserve(static_cast<int>(msg.getRevertTokens().size())); for (auto token : msg.getRevertTokens()) { tokens->Add(token); @@ -690,8 +712,7 @@ void set_merge_nodes(::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& std::vector<api::MergeBucketCommand::Node> get_merge_nodes( const ::google::protobuf::RepeatedPtrField<protobuf::MergeNode>& src) { - using Node = api::MergeBucketCommand::Node; - std::vector<Node> nodes; + std::vector<api::MergeBucketCommand::Node> nodes; nodes.reserve(src.size()); for (const auto& node : src) { nodes.emplace_back(node.index(), node.source_only()); @@ -719,14 +740,13 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketReply& m api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& buf) const { return decode_bucket_request<protobuf::MergeBucketRequest>(buf, [&](auto& req, auto& bucket) { auto nodes = get_merge_nodes(req.nodes()); + auto cmd = std::make_unique<api::MergeBucketCommand>(bucket, std::move(nodes), req.max_timestamp()); + cmd->setClusterStateVersion(req.cluster_state_version()); std::vector<uint16_t> chain; chain.reserve(req.node_chain_size()); for (uint16_t node : req.node_chain()) { chain.emplace_back(node); } - - auto cmd = std::make_unique<api::MergeBucketCommand>(bucket, std::move(nodes), req.max_timestamp()); - cmd->setClusterStateVersion(req.cluster_state_version()); cmd->setChain(std::move(chain)); return cmd; }); @@ -748,10 +768,10 @@ void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) { static_assert(document::GlobalId::LENGTH == 12); uint64_t lo64; uint32_t hi32; - memcpy(&lo64, src.get() + sizeof(uint32_t), sizeof(uint64_t)); - memcpy(&hi32, src.get(), sizeof(uint32_t)); - dest.set_hi_32(hi32); + memcpy(&lo64, src.get(), sizeof(uint64_t)); + memcpy(&hi32, src.get() + sizeof(uint64_t), sizeof(uint32_t)); dest.set_lo_64(lo64); + dest.set_hi_32(hi32); } document::GlobalId get_global_id(const protobuf::GlobalId& src) { @@ -760,8 +780,8 @@ document::GlobalId get_global_id(const protobuf::GlobalId& src) { const uint32_t hi32 = src.hi_32(); char buf[document::GlobalId::LENGTH]; - memcpy(buf, &hi32, sizeof(uint32_t)); - memcpy(buf + sizeof(uint32_t), &lo64, sizeof(uint64_t)); + memcpy(buf, &lo64, sizeof(uint64_t)); + memcpy(buf + sizeof(uint64_t), &hi32, sizeof(uint32_t)); return document::GlobalId(buf); } @@ -854,6 +874,7 @@ void fill_api_apply_diff_vector(std::vector<api::ApplyBucketDiffCommand::Entry>& auto& dest = diff[i]; dest._entry = get_diff_entry(proto_entry.entry_meta()); dest._docName = proto_entry.document_id(); + // TODO consider making buffers std::strings instead to avoid explicit zeroing-on-resize overhead dest._headerBlob.resize(proto_entry.header_blob().size()); memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size()); dest._bodyBlob.resize(proto_entry.body_blob().size()); @@ -913,12 +934,12 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeApplyBucketDiffReply(const void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const { encode_request<protobuf::RequestBucketInfoRequest>(buf, msg, [&](auto& req) { - req.mutable_bucket_space()->set_space_id(msg.getBucketSpace().getId()); + set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace()); auto& buckets = msg.getBuckets(); if (!buckets.empty()) { auto* proto_buckets = req.mutable_explicit_bucket_set(); for (const auto& b : buckets) { - proto_buckets->add_bucket_ids()->set_raw_id(b.getRawId()); + set_bucket_id(*proto_buckets->add_bucket_ids(), b); } } else { auto* all_buckets = req.mutable_all_buckets(); @@ -944,17 +965,17 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoRe api::StorageCommand::UP ProtocolSerialization7::onDecodeRequestBucketInfoCommand(BBuf& buf) const { return decode_request<protobuf::RequestBucketInfoRequest>(buf, [&](auto& req) { - document::BucketSpace bucket_space(req.bucket_space().space_id()); + auto bucket_space = get_bucket_space(req.bucket_space()); if (req.has_explicit_bucket_set()) { const uint32_t n_buckets = req.explicit_bucket_set().bucket_ids_size(); std::vector<document::BucketId> buckets(n_buckets); const auto& proto_buckets = req.explicit_bucket_set().bucket_ids(); for (uint32_t i = 0; i < n_buckets; ++i) { - buckets[i] = document::BucketId(proto_buckets.Get(i).raw_id()); + buckets[i] = get_bucket_id(proto_buckets.Get(i)); } return std::make_unique<api::RequestBucketInfoCommand>(bucket_space, std::move(buckets)); } else if (req.has_all_buckets()) { - auto& all_req = req.all_buckets(); + const auto& all_req = req.all_buckets(); return std::make_unique<api::RequestBucketInfoCommand>( bucket_space, all_req.distributor_index(), lib::ClusterState(all_req.cluster_state()), all_req.distribution_hash()); @@ -971,7 +992,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con uint32_t n_entries = res.bucket_infos_size(); dest_entries.resize(n_entries); for (uint32_t i = 0; i < n_entries; ++i) { - auto& proto_entry = res.bucket_infos(i); + const auto& proto_entry = res.bucket_infos(i); dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id()); dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info()); } @@ -1060,7 +1081,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCm void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const { encode_bucket_request<protobuf::JoinBucketsRequest>(buf, msg, [&](auto& req) { for (const auto& source : msg.getSourceBuckets()) { - req.add_source_buckets()->set_raw_id(source.getRawId()); + set_bucket_id(*req.add_source_buckets(), source); } req.set_min_join_bits(msg.getMinJoinBits()); }); @@ -1075,9 +1096,9 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeJoinBucketsCommand(BBuf& auto cmd = std::make_unique<api::JoinBucketsCommand>(bucket); auto& entries = cmd->getSourceBuckets(); for (const auto& proto_bucket : req.source_buckets()) { - entries.emplace_back(proto_bucket.raw_id()); + entries.emplace_back(get_bucket_id(proto_bucket)); } - cmd->setMinJoinBits(req.min_join_bits()); + cmd->setMinJoinBits(static_cast<uint8_t>(req.min_join_bits())); return cmd; }); } @@ -1129,9 +1150,9 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeSetBucketStateReply(const void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const { encode_request<protobuf::CreateVisitorRequest>(buf, msg, [&](auto& req) { - req.mutable_bucket_space()->set_space_id(msg.getBucketSpace().getId()); + set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace()); for (const auto& bucket : msg.getBuckets()) { - req.add_buckets()->set_raw_id(bucket.getRawId()); + set_bucket_id(*req.add_buckets(), bucket); } auto* ctrl_meta = req.mutable_control_meta(); @@ -1176,13 +1197,13 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorReply& api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBuf& buf) const { return decode_request<protobuf::CreateVisitorRequest>(buf, [&](auto& req) { - document::BucketSpace bucket_space(req.bucket_space().space_id()); - auto& ctrl_meta = req.control_meta(); + auto bucket_space = get_bucket_space(req.bucket_space()); + auto& ctrl_meta = req.control_meta(); auto& constraints = req.constraints(); auto cmd = std::make_unique<api::CreateVisitorCommand>(bucket_space, ctrl_meta.library_name(), ctrl_meta.instance_id(), constraints.document_selection()); for (const auto& proto_bucket : req.buckets()) { - cmd->getBuckets().emplace_back(proto_bucket.raw_id()); + cmd->getBuckets().emplace_back(get_bucket_id(proto_bucket)); } cmd->setVisitorCmdId(ctrl_meta.visitor_command_id()); @@ -1191,7 +1212,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBu cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count()); cmd->setQueueTimeout(ctrl_meta.queue_timeout()); cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor()); - cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol + cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol impl for (const auto& proto_param : req.client_parameters()) { cmd->getParameters().set(proto_param.key(), proto_param.value()); @@ -1249,11 +1270,4 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const }); } -/* - * TODO extend testing of: - * - bucket info in responses - * - bucket remapping in responses - * - presence of fields - */ - -} +} // storage::mbusprot |