// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "protocolserialization7.h" #include "serializationhelper.h" #include "protobuf_includes.h" #include #include #include #include #include #include #include #include namespace storage::mbusprot { ProtocolSerialization7::ProtocolSerialization7(std::shared_ptr repo) : ProtocolSerialization(), _repo(std::move(repo)) { } namespace { void set_bucket(protobuf::Bucket& dest, const document::Bucket& src) { dest.set_raw_bucket_id(src.getBucketId().getRawId()); dest.set_space_id(src.getBucketSpace().getId()); } void set_bucket_id(protobuf::BucketId& dest, const document::BucketId& src) { dest.set_raw_id(src.getRawId()); } document::BucketId get_bucket_id(const protobuf::BucketId& src) { return document::BucketId(src.raw_id()); } void set_bucket_space(protobuf::BucketSpace& dest, const document::BucketSpace& src) { dest.set_space_id(src.getId()); } document::BucketSpace get_bucket_space(const protobuf::BucketSpace& src) { return document::BucketSpace(src.space_id()); } void set_bucket_info(protobuf::BucketInfo& dest, const api::BucketInfo& src) { dest.set_last_modified_timestamp(src.getLastModified()); dest.set_legacy_checksum(src.getChecksum()); dest.set_doc_count(src.getDocumentCount()); dest.set_total_doc_size(src.getTotalDocumentSize()); dest.set_meta_count(src.getMetaCount()); dest.set_used_file_size(src.getUsedFileSize()); dest.set_active(src.isActive()); dest.set_ready(src.isReady()); } document::Bucket get_bucket(const protobuf::Bucket& src) { return {document::BucketSpace(src.space_id()), document::BucketId(src.raw_bucket_id())}; } api::BucketInfo get_bucket_info(const protobuf::BucketInfo& src) { api::BucketInfo info; info.setLastModified(src.last_modified_timestamp()); info.setChecksum(src.legacy_checksum()); info.setDocumentCount(src.doc_count()); info.setTotalDocumentSize(src.total_doc_size()); info.setMetaCount(src.meta_count()); info.setUsedFileSize(src.used_file_size()); info.setActive(src.active()); info.setReady(src.ready()); return info; } documentapi::TestAndSetCondition get_tas_condition(const protobuf::TestAndSetCondition& src) { return documentapi::TestAndSetCondition(src.selection()); } void set_tas_condition(protobuf::TestAndSetCondition& dest, const documentapi::TestAndSetCondition& src) { dest.set_selection(src.getSelection().data(), src.getSelection().size()); } std::shared_ptr get_document(const protobuf::Document& src_doc, const document::DocumentTypeRepo& type_repo) { if (!src_doc.payload().empty()) { vespalib::nbostream doc_buf(src_doc.payload().data(), src_doc.payload().size()); return std::make_shared(type_repo, doc_buf); } return {}; } void set_update(protobuf::Update& dest, const document::DocumentUpdate& src) { vespalib::nbostream stream; src.serializeHEAD(stream); dest.set_payload(stream.peek(), stream.size()); } std::shared_ptr get_update(const protobuf::Update& src, const document::DocumentTypeRepo& type_repo) { if (!src.payload().empty()) { return document::DocumentUpdate::createHEAD( type_repo, vespalib::nbostream(src.payload().data(), src.payload().size())); } return {}; } void write_request_header(vespalib::GrowableByteBuffer& buf, const api::StorageCommand& cmd) { protobuf::RequestHeader hdr; // Arena alloc not needed since there are no nested messages // TODO deprecate this field entirely; we already match replies with their originating command // on the sender and set the reply's message ID from that command directly. There's therefore // no need to redundantly carry this on the wire as well. // Important: cannot be deprecated until there are no nodes using this value verbatim as the // internal message ID! hdr.set_message_id(cmd.getMsgId()); // This is _our_ process-internal message ID hdr.set_priority(cmd.getPriority()); hdr.set_source_index(cmd.getSourceIndex()); uint8_t dest[128]; // Only primitive fields, should be plenty large enough. auto encoded_size = static_cast(hdr.ByteSizeLong()); assert(encoded_size <= sizeof(dest)); [[maybe_unused]] bool ok = hdr.SerializeWithCachedSizesToArray(dest); assert(ok); buf.putInt(encoded_size); buf.putBytes(reinterpret_cast(dest), encoded_size); } void write_response_header(vespalib::GrowableByteBuffer& buf, const api::StorageReply& reply) { protobuf::ResponseHeader hdr; // Arena alloc not needed since there are no nested messages const auto& result = reply.getResult(); hdr.set_return_code_id(static_cast(result.getResult())); if (!result.getMessage().empty()) { hdr.set_return_code_message(result.getMessage().data(), result.getMessage().size()); } // TODO deprecate this field entirely hdr.set_message_id(reply.originator_msg_id()); // This is the _peer's_ process-internal message ID hdr.set_priority(reply.getPriority()); const auto header_size = hdr.ByteSizeLong(); assert(header_size <= UINT32_MAX); buf.putInt(static_cast(header_size)); auto* dest_buf = reinterpret_cast(buf.allocate(header_size)); [[maybe_unused]] bool ok = hdr.SerializeWithCachedSizesToArray(dest_buf); assert(ok); } void decode_request_header(document::ByteBuffer& buf, protobuf::RequestHeader& hdr) { auto hdr_len = static_cast(SerializationHelper::getInt(buf)); if (hdr_len > buf.getRemaining()) { throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len); } bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len); if (!ok) { throw vespalib::IllegalArgumentException("Malformed protobuf request header"); } buf.incPos(hdr_len); } void decode_response_header(document::ByteBuffer& buf, protobuf::ResponseHeader& hdr) { auto hdr_len = static_cast(SerializationHelper::getInt(buf)); if (hdr_len > buf.getRemaining()) { throw document::BufferOutOfBoundsException(buf.getPos(), hdr_len); } bool ok = hdr.ParseFromArray(buf.getBufferAtPos(), hdr_len); if (!ok) { throw vespalib::IllegalArgumentException("Malformed protobuf response header"); } buf.incPos(hdr_len); } } // anonymous namespace template class BaseEncoder { vespalib::GrowableByteBuffer& _out_buf; ::google::protobuf::Arena _arena; ProtobufType* _proto_obj; public: explicit BaseEncoder(vespalib::GrowableByteBuffer& out_buf) : _out_buf(out_buf), _arena(), _proto_obj(::google::protobuf::Arena::Create(&_arena)) { } void encode() { assert(_proto_obj != nullptr); const auto sz = _proto_obj->ByteSizeLong(); assert(sz <= UINT32_MAX); auto* buf = reinterpret_cast(_out_buf.allocate(sz)); [[maybe_unused]] bool ok = _proto_obj->SerializeWithCachedSizesToArray(buf); assert(ok); _proto_obj = nullptr; } protected: vespalib::GrowableByteBuffer& buffer() noexcept { return _out_buf; } // Precondition: encode() is not called ProtobufType& proto_obj() noexcept { return *_proto_obj; } const ProtobufType& proto_obj() const noexcept { return *_proto_obj; } }; template class RequestEncoder : public BaseEncoder { public: RequestEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& cmd) : BaseEncoder(out_buf) { write_request_header(out_buf, cmd); } // Precondition: encode() is not called ProtobufType& request() noexcept { return this->proto_obj(); } const ProtobufType& request() const noexcept { return this->proto_obj(); } }; template class ResponseEncoder : public BaseEncoder { public: ResponseEncoder(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply) : BaseEncoder(out_buf) { write_response_header(out_buf, reply); } // Precondition: encode() is not called ProtobufType& response() noexcept { return this->proto_obj(); } const ProtobufType& response() const noexcept { return this->proto_obj(); } }; template class RequestDecoder { protobuf::RequestHeader _hdr; ::google::protobuf::Arena _arena; ProtobufType* _proto_obj; public: explicit RequestDecoder(document::ByteBuffer& in_buf) : _arena(), _proto_obj(::google::protobuf::Arena::Create(&_arena)) { decode_request_header(in_buf, _hdr); assert(in_buf.getRemaining() <= INT_MAX); bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); if (!ok) { throw vespalib::IllegalArgumentException( vespalib::make_string("Malformed protobuf request payload for %s", ProtobufType::descriptor()->full_name().c_str())); } } void transfer_meta_information_to(api::StorageCommand& dest) { dest.force_originator_msg_id(_hdr.message_id()); // TODO deprecate dest.setPriority(static_cast(_hdr.priority())); dest.setSourceIndex(static_cast(_hdr.source_index())); } ProtobufType& request() noexcept { return *_proto_obj; } const ProtobufType& request() const noexcept { return *_proto_obj; } }; template class ResponseDecoder { protobuf::ResponseHeader _hdr; ::google::protobuf::Arena _arena; ProtobufType* _proto_obj; public: explicit ResponseDecoder(document::ByteBuffer& in_buf) : _arena(), _proto_obj(::google::protobuf::Arena::Create(&_arena)) { decode_response_header(in_buf, _hdr); assert(in_buf.getRemaining() <= INT_MAX); bool ok = _proto_obj->ParseFromArray(in_buf.getBufferAtPos(), in_buf.getRemaining()); if (!ok) { throw vespalib::IllegalArgumentException( vespalib::make_string("Malformed protobuf response payload for %s", ProtobufType::descriptor()->full_name().c_str())); } } void transfer_meta_information_to(api::StorageReply& dest) { // TODO deprecate this; not currently used internally (message ID is explicitly set // from the originator command instance) dest.force_originator_msg_id(_hdr.message_id()); dest.setPriority(static_cast(_hdr.priority())); dest.setResult(api::ReturnCode(static_cast(_hdr.return_code_id()), _hdr.return_code_message())); } ProtobufType& response() noexcept { return *_proto_obj; } const ProtobufType& response() const noexcept { return *_proto_obj; } }; template void encode_request(vespalib::GrowableByteBuffer& out_buf, const api::StorageCommand& msg, Func&& f) { RequestEncoder enc(out_buf, msg); f(enc.request()); enc.encode(); } template void encode_response(vespalib::GrowableByteBuffer& out_buf, const api::StorageReply& reply, Func&& f) { ResponseEncoder enc(out_buf, reply); auto& res = enc.response(); f(res); enc.encode(); } template std::unique_ptr ProtocolSerialization7::decode_request(document::ByteBuffer& in_buf, Func&& f) const { RequestDecoder dec(in_buf); const auto& req = dec.request(); auto cmd = f(req); dec.transfer_meta_information_to(*cmd); return cmd; } template std::unique_ptr ProtocolSerialization7::decode_response(document::ByteBuffer& in_buf, Func&& f) const { ResponseDecoder dec(in_buf); const auto& res = dec.response(); auto reply = f(res); dec.transfer_meta_information_to(*reply); return reply; } template void encode_bucket_request(vespalib::GrowableByteBuffer& out_buf, const api::BucketCommand& msg, Func&& f) { encode_request(out_buf, msg, [&](ProtobufType& req) { set_bucket(*req.mutable_bucket(), msg.getBucket()); f(req); }); } template std::unique_ptr ProtocolSerialization7::decode_bucket_request(document::ByteBuffer& in_buf, Func&& f) const { return decode_request(in_buf, [&](const ProtobufType& req) { if (!req.has_bucket()) { throw vespalib::IllegalArgumentException( vespalib::make_string("Malformed protocol buffer request for %s; no bucket", ProtobufType::descriptor()->full_name().c_str())); } const auto bucket = get_bucket(req.bucket()); return f(req, bucket); }); } template void encode_bucket_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketReply& reply, Func&& f) { encode_response(out_buf, reply, [&](ProtobufType& res) { if (reply.hasBeenRemapped()) { set_bucket_id(*res.mutable_remapped_bucket_id(), reply.getBucketId()); } f(res); }); } template std::unique_ptr ProtocolSerialization7::decode_bucket_response(document::ByteBuffer& in_buf, Func&& f) const { return decode_response(in_buf, [&](const ProtobufType& res) { auto reply = f(res); if (res.has_remapped_bucket_id()) { reply->remapBucketId(get_bucket_id(res.remapped_bucket_id())); } return reply; }); } template void encode_bucket_info_response(vespalib::GrowableByteBuffer& out_buf, const api::BucketInfoReply& reply, Func&& f) { encode_bucket_response(out_buf, reply, [&](ProtobufType& res) { set_bucket_info(*res.mutable_bucket_info(), reply.getBucketInfo()); f(res); }); } template std::unique_ptr ProtocolSerialization7::decode_bucket_info_response(document::ByteBuffer& in_buf, Func&& f) const { return decode_bucket_response(in_buf, [&](const ProtobufType& res) { auto reply = f(res); reply->setBucketInfo(get_bucket_info(res.bucket_info())); // If not present, default of all zeroes is correct return reply; }); } // TODO document protobuf ducktyping assumptions namespace { // Inherit from known base class just to avoid having to template this. We don't care about its subtype anyway. void no_op_encode([[maybe_unused]] ::google::protobuf::Message&) { // nothing to do here. } void set_document(protobuf::Document& target_doc, const document::Document& src_doc) { vespalib::nbostream stream; src_doc.serialize(stream); target_doc.set_payload(stream.peek(), stream.size()); } } // ----------------------------------------------------------------- // Put // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { req.set_new_timestamp(msg.getTimestamp()); req.set_expected_old_timestamp(msg.getUpdateTimestamp()); if (msg.getCondition().isPresent()) { set_tas_condition(*req.mutable_condition(), msg.getCondition()); } if (msg.getDocument()) { set_document(*req.mutable_document(), *msg.getDocument()); } req.set_create_if_non_existent(msg.get_create_if_non_existent()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::PutReply& msg) const { encode_bucket_info_response(buf, msg, [&](auto& res) { res.set_was_found(msg.wasFound()); }); } api::StorageCommand::UP ProtocolSerialization7::onDecodePutCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto document = get_document(req.document(), type_repo()); auto cmd = std::make_unique(bucket, std::move(document), req.new_timestamp()); cmd->setUpdateTimestamp(req.expected_old_timestamp()); if (req.has_condition()) { cmd->setCondition(get_tas_condition(req.condition())); } cmd->set_create_if_non_existent(req.create_if_non_existent()); return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodePutReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_info_response(buf, [&](auto& res) { return std::make_unique(static_cast(cmd), res.was_found()); }); } // ----------------------------------------------------------------- // Update // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { auto* update = msg.getUpdate().get(); if (update) { set_update(*req.mutable_update(), *update); } req.set_new_timestamp(msg.getTimestamp()); req.set_expected_old_timestamp(msg.getOldTimestamp()); if (msg.getCondition().isPresent()) { set_tas_condition(*req.mutable_condition(), msg.getCondition()); } if (msg.has_cached_create_if_missing()) { req.set_create_if_missing(msg.create_if_missing() ? protobuf::UpdateRequest_CreateIfMissing_TRUE : protobuf::UpdateRequest_CreateIfMissing_FALSE); } }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateReply& msg) const { encode_bucket_info_response(buf, msg, [&](auto& res) { res.set_updated_timestamp(msg.getOldTimestamp()); }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto update = get_update(req.update(), type_repo()); auto cmd = std::make_unique(bucket, std::move(update), req.new_timestamp()); cmd->setOldTimestamp(req.expected_old_timestamp()); if (req.has_condition()) { cmd->setCondition(get_tas_condition(req.condition())); } if (req.create_if_missing() != protobuf::UpdateRequest_CreateIfMissing_UNSPECIFIED) { cmd->set_cached_create_if_missing(req.create_if_missing() == protobuf::UpdateRequest_CreateIfMissing_TRUE); } return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeUpdateReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_info_response(buf, [&](auto& res) { return std::make_unique(static_cast(cmd), res.updated_timestamp()); }); } // ----------------------------------------------------------------- // Remove // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { auto doc_id_str = msg.getDocumentId().toString(); req.set_document_id(doc_id_str.data(), doc_id_str.size()); req.set_new_timestamp(msg.getTimestamp()); if (msg.getCondition().isPresent()) { set_tas_condition(*req.mutable_condition(), msg.getCondition()); } }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveReply& msg) const { encode_bucket_info_response(buf, msg, [&](auto& res) { res.set_removed_timestamp(msg.getOldTimestamp()); }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size())); auto cmd = std::make_unique(bucket, doc_id, req.new_timestamp()); if (req.has_condition()) { cmd->setCondition(get_tas_condition(req.condition())); } return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_info_response(buf, [&](auto& res) { return std::make_unique(static_cast(cmd), res.removed_timestamp()); }); } // ----------------------------------------------------------------- // Get // ----------------------------------------------------------------- namespace { protobuf::GetRequest_InternalReadConsistency read_consistency_to_protobuf(api::InternalReadConsistency consistency) { switch (consistency) { case api::InternalReadConsistency::Strong: return protobuf::GetRequest_InternalReadConsistency_Strong; case api::InternalReadConsistency::Weak: return protobuf::GetRequest_InternalReadConsistency_Weak; default: return protobuf::GetRequest_InternalReadConsistency_Strong; } } api::InternalReadConsistency read_consistency_from_protobuf(protobuf::GetRequest_InternalReadConsistency consistency) { switch (consistency) { case protobuf::GetRequest_InternalReadConsistency_Strong: return api::InternalReadConsistency::Strong; case protobuf::GetRequest_InternalReadConsistency_Weak: return api::InternalReadConsistency::Weak; default: return api::InternalReadConsistency::Strong; } } } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { auto doc_id = msg.getDocumentId().toString(); req.set_document_id(doc_id.data(), doc_id.size()); req.set_before_timestamp(msg.getBeforeTimestamp()); if (!msg.getFieldSet().empty()) { req.set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size()); } req.set_internal_read_consistency(read_consistency_to_protobuf(msg.internal_read_consistency())); if (msg.has_condition()) { set_tas_condition(*req.mutable_condition(), msg.condition()); } }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) const { encode_bucket_info_response(buf, msg, [&](auto& res) { if (msg.getDocument()) { set_document(*res.mutable_document(), *msg.getDocument()); } if (!msg.is_tombstone()) { res.set_last_modified_timestamp(msg.getLastModifiedTimestamp()); } else { // This field will be ignored by older versions, making the behavior as if // a timestamp of zero was returned for tombstones, as it the legacy behavior. res.set_tombstone_timestamp(msg.getLastModifiedTimestamp()); // Will not be encoded onto the wire, but we include it here to hammer down the // point that it's intentional to have the last modified time appear as a not // found document for older versions. res.set_last_modified_timestamp(0); } res.set_condition_matched(msg.condition_matched()); }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeGetCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { document::DocumentId doc_id(vespalib::stringref(req.document_id().data(), req.document_id().size())); auto op = std::make_unique(bucket, std::move(doc_id), req.field_set(), req.before_timestamp()); op->set_internal_read_consistency(read_consistency_from_protobuf(req.internal_read_consistency())); if (req.has_condition()) { op->set_condition(get_tas_condition(req.condition())); } return op; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_info_response(buf, [&](auto& res) { try { auto document = get_document(res.document(), type_repo()); const bool is_tombstone = (res.tombstone_timestamp() != 0); const auto effective_timestamp = (is_tombstone ? res.tombstone_timestamp() : res.last_modified_timestamp()); return std::make_unique(static_cast(cmd), std::move(document), effective_timestamp, false, is_tombstone, res.condition_matched()); } catch (std::exception& e) { auto reply = std::make_unique(static_cast(cmd), std::shared_ptr(), 0u); reply->setResult(api::ReturnCode(api::ReturnCode::UNPARSEABLE, e.what())); return reply; } }); } // ----------------------------------------------------------------- // RemoveLocation // ----------------------------------------------------------------- namespace { void set_document_id(protobuf::DocumentId& dest, const document::DocumentId& src) { *dest.mutable_id() = src.toString(); } document::DocumentId get_document_id(const protobuf::DocumentId& src) { return document::DocumentId(src.id()); // id() shall always be null terminated } void set_id_and_timestamp_vector(::google::protobuf::RepeatedPtrField& dest, const std::vector& src) { dest.Reserve(src.size()); for (const auto& src_entry : src) { auto* dest_entry = dest.Add(); dest_entry->set_timestamp(src_entry.timestamp); set_document_id(*dest_entry->mutable_id(), src_entry.id); } } std::vector get_id_and_timestamp_vector(const ::google::protobuf::RepeatedPtrField& src) { std::vector vec; vec.reserve(src.size()); for (const auto& src_entry : src) { vec.emplace_back(get_document_id(src_entry.id()), spi::Timestamp(src_entry.timestamp())); } return vec; } } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); if (msg.only_enumerate_docs()) { req.mutable_phase_one(); // Instantiating it is enough } else if (!msg.explicit_remove_set().empty()) { set_id_and_timestamp_vector(*req.mutable_phase_two()->mutable_explicit_remove_set(), msg.explicit_remove_set()); } }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RemoveLocationReply& msg) const { encode_bucket_info_response(buf, msg, [&](auto& res) { res.mutable_stats()->set_documents_removed(msg.documents_removed()); if (!msg.selection_matches().empty()) { set_id_and_timestamp_vector(*res.mutable_selection_matches(), msg.selection_matches()); } }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeRemoveLocationCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto cmd = std::make_unique(req.document_selection(), bucket); if (req.has_phase_one()) { cmd->set_only_enumerate_docs(true); } else if (req.has_phase_two()) { cmd->set_explicit_remove_set(get_id_and_timestamp_vector(req.phase_two().explicit_remove_set())); } return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeRemoveLocationReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_info_response(buf, [&](auto& res) { uint32_t documents_removed = (res.has_stats() ? res.stats().documents_removed() : 0u); auto reply = std::make_unique( static_cast(cmd), documents_removed); if (!res.selection_matches().empty()) { reply->set_selection_matches(get_id_and_timestamp_vector(res.selection_matches())); } return reply; }); } // ----------------------------------------------------------------- // DeleteBucket // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { set_bucket_info(*req.mutable_expected_bucket_info(), msg.getBucketInfo()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DeleteBucketReply& msg) const { encode_bucket_info_response(buf, msg, no_op_encode); } api::StorageCommand::UP ProtocolSerialization7::onDecodeDeleteBucketCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto cmd = std::make_unique(bucket); if (req.has_expected_bucket_info()) { cmd->setBucketInfo(get_bucket_info(req.expected_bucket_info())); } return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeDeleteBucketReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_info_response(buf, [&]([[maybe_unused]] auto& res) { return std::make_unique(static_cast(cmd)); }); } // ----------------------------------------------------------------- // CreateBucket // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { req.set_create_as_active(msg.getActive()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateBucketReply& msg) const { encode_bucket_info_response(buf, msg, no_op_encode); } api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateBucketCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto cmd = std::make_unique(bucket); cmd->setActive(req.create_as_active()); return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeCreateBucketReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_info_response(buf, [&]([[maybe_unused]] auto& res) { return std::make_unique(static_cast(cmd)); }); } // ----------------------------------------------------------------- // MergeBucket // ----------------------------------------------------------------- namespace { void set_merge_nodes(::google::protobuf::RepeatedPtrField& dest, const std::vector& src) { dest.Reserve(src.size()); for (const auto& src_node : src) { auto* dest_node = dest.Add(); dest_node->set_index(src_node.index); dest_node->set_source_only(src_node.sourceOnly); } } std::vector get_merge_nodes( const ::google::protobuf::RepeatedPtrField& src) { std::vector nodes; nodes.reserve(src.size()); for (const auto& node : src) { nodes.emplace_back(node.index(), node.source_only()); } return nodes; } } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); req.set_max_timestamp(msg.getMaxTimestamp()); req.set_cluster_state_version(msg.getClusterStateVersion()); req.set_unordered_forwarding(msg.use_unordered_forwarding()); req.set_estimated_memory_footprint(msg.estimated_memory_footprint()); for (uint16_t chain_node : msg.getChain()) { req.add_node_chain(chain_node); } }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketReply& msg) const { encode_bucket_response(buf, msg, no_op_encode); } api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto nodes = get_merge_nodes(req.nodes()); auto cmd = std::make_unique(bucket, std::move(nodes), req.max_timestamp()); cmd->setClusterStateVersion(req.cluster_state_version()); std::vector chain; chain.reserve(req.node_chain_size()); for (uint16_t node : req.node_chain()) { chain.emplace_back(node); } cmd->setChain(std::move(chain)); cmd->set_use_unordered_forwarding(req.unordered_forwarding()); cmd->set_estimated_memory_footprint(req.estimated_memory_footprint()); return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeMergeBucketReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_response(buf, [&]([[maybe_unused]] auto& res) { return std::make_unique(static_cast(cmd)); }); } // ----------------------------------------------------------------- // GetBucketDiff // ----------------------------------------------------------------- namespace { void set_global_id(protobuf::GlobalId& dest, const document::GlobalId& src) { static_assert(document::GlobalId::LENGTH == 12); uint64_t lo64; uint32_t hi32; memcpy(&lo64, src.get(), sizeof(uint64_t)); memcpy(&hi32, src.get() + sizeof(uint64_t), sizeof(uint32_t)); dest.set_lo_64(lo64); dest.set_hi_32(hi32); } document::GlobalId get_global_id(const protobuf::GlobalId& src) { static_assert(document::GlobalId::LENGTH == 12); const uint64_t lo64 = src.lo_64(); const uint32_t hi32 = src.hi_32(); char buf[document::GlobalId::LENGTH]; memcpy(buf, &lo64, sizeof(uint64_t)); memcpy(buf + sizeof(uint64_t), &hi32, sizeof(uint32_t)); return document::GlobalId(buf); } void set_diff_entry(protobuf::MetaDiffEntry& dest, const api::GetBucketDiffCommand::Entry& src) { dest.set_timestamp(src._timestamp); set_global_id(*dest.mutable_gid(), src._gid); dest.set_header_size(src._headerSize); dest.set_body_size(src._bodySize); dest.set_flags(src._flags); dest.set_presence_mask(src._hasMask); } api::GetBucketDiffCommand::Entry get_diff_entry(const protobuf::MetaDiffEntry& src) { api::GetBucketDiffCommand::Entry e; e._timestamp = src.timestamp(); e._gid = get_global_id(src.gid()); e._headerSize = src.header_size(); e._bodySize = src.body_size(); e._flags = src.flags(); e._hasMask = src.presence_mask(); return e; } void fill_proto_meta_diff(::google::protobuf::RepeatedPtrField& dest, const std::vector& src) { for (const auto& diff_entry : src) { set_diff_entry(*dest.Add(), diff_entry); } } void fill_api_meta_diff(std::vector& dest, const ::google::protobuf::RepeatedPtrField& src) { // FIXME GetBucketDiffReply ctor copies the diff from the request for some reason // TODO verify this isn't actually used anywhere and remove this "feature". dest.clear(); dest.reserve(src.size()); for (const auto& diff_entry : src) { dest.emplace_back(get_diff_entry(diff_entry)); } } } // anonymous namespace void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); req.set_max_timestamp(msg.getMaxTimestamp()); fill_proto_meta_diff(*req.mutable_diff(), msg.getDiff()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetBucketDiffReply& msg) const { encode_bucket_response(buf, msg, [&](auto& res) { fill_proto_meta_diff(*res.mutable_diff(), msg.getDiff()); }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeGetBucketDiffCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto nodes = get_merge_nodes(req.nodes()); auto cmd = std::make_unique(bucket, std::move(nodes), req.max_timestamp()); fill_api_meta_diff(cmd->getDiff(), req.diff()); return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeGetBucketDiffReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_response(buf, [&](auto& res) { auto reply = std::make_unique(static_cast(cmd)); fill_api_meta_diff(reply->getDiff(), res.diff()); return reply; }); } // ----------------------------------------------------------------- // ApplyBucketDiff // ----------------------------------------------------------------- namespace { void fill_api_apply_diff_vector(std::vector& diff, const ::google::protobuf::RepeatedPtrField& src) { // We use the same approach as the legacy protocols here in that we pre-reserve and // directly write into the vector. This avoids having to ensure all buffer management is movable. size_t n_entries = src.size(); diff.resize(n_entries); for (size_t i = 0; i < n_entries; ++i) { auto& proto_entry = src.Get(i); auto& dest = diff[i]; dest._entry = get_diff_entry(proto_entry.entry_meta()); dest._docName = proto_entry.document_id(); // TODO consider making buffers std::strings instead to avoid explicit zeroing-on-resize overhead dest._headerBlob.resize(proto_entry.header_blob().size()); if (!proto_entry.header_blob().empty()) { memcpy(dest._headerBlob.data(), proto_entry.header_blob().data(), proto_entry.header_blob().size()); } dest._bodyBlob.resize(proto_entry.body_blob().size()); if (!proto_entry.body_blob().empty()) { memcpy(dest._bodyBlob.data(), proto_entry.body_blob().data(), proto_entry.body_blob().size()); } } } void fill_proto_apply_diff_vector(::google::protobuf::RepeatedPtrField& dest, const std::vector& src) { dest.Reserve(src.size()); for (const auto& entry : src) { auto* proto_entry = dest.Add(); set_diff_entry(*proto_entry->mutable_entry_meta(), entry._entry); proto_entry->set_document_id(entry._docName.data(), entry._docName.size()); proto_entry->set_header_blob(entry._headerBlob.data(), entry._headerBlob.size()); proto_entry->set_body_blob(entry._bodyBlob.data(), entry._bodyBlob.size()); } } } // anonymous namespace void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { set_merge_nodes(*req.mutable_nodes(), msg.getNodes()); req.set_max_buffer_size(0x400000); // Unused, GC soon. fill_proto_apply_diff_vector(*req.mutable_entries(), msg.getDiff()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::ApplyBucketDiffReply& msg) const { encode_bucket_response(buf, msg, [&](auto& res) { fill_proto_apply_diff_vector(*res.mutable_entries(), msg.getDiff()); }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeApplyBucketDiffCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto nodes = get_merge_nodes(req.nodes()); auto cmd = std::make_unique(bucket, std::move(nodes)); fill_api_apply_diff_vector(cmd->getDiff(), req.entries()); return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeApplyBucketDiffReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_response(buf, [&](auto& res) { auto reply = std::make_unique(static_cast(cmd)); fill_api_apply_diff_vector(reply->getDiff(), res.entries()); return reply; }); } // ----------------------------------------------------------------- // RequestBucketInfo // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoCommand& msg) const { encode_request(buf, msg, [&](auto& req) { set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace()); auto& buckets = msg.getBuckets(); if (!buckets.empty()) { auto* proto_buckets = req.mutable_explicit_bucket_set(); for (const auto& b : buckets) { set_bucket_id(*proto_buckets->add_bucket_ids(), b); } } else { auto* all_buckets = req.mutable_all_buckets(); auto cluster_state = msg.getSystemState().toString(); all_buckets->set_distributor_index(msg.getDistributor()); all_buckets->set_cluster_state(cluster_state.data(), cluster_state.size()); all_buckets->set_distribution_hash(msg.getDistributionHash().data(), msg.getDistributionHash().size()); } }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoReply& msg) const { encode_response(buf, msg, [&](auto& res) { auto* proto_info = res.mutable_bucket_infos(); proto_info->Reserve(msg.getBucketInfo().size()); for (const auto& entry : msg.getBucketInfo()) { auto* bucket_and_info = proto_info->Add(); bucket_and_info->set_raw_bucket_id(entry._bucketId.getRawId()); set_bucket_info(*bucket_and_info->mutable_bucket_info(), entry._info); } // We mark features as available at protocol level. Only included for full bucket fetch responses. if (msg.full_bucket_fetch()) { res.mutable_supported_node_features()->set_unordered_merge_chaining(true); res.mutable_supported_node_features()->set_two_phase_remove_location(true); res.mutable_supported_node_features()->set_no_implicit_indexing_of_active_buckets(true); res.mutable_supported_node_features()->set_document_condition_probe(true); } }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeRequestBucketInfoCommand(BBuf& buf) const { return decode_request(buf, [&](auto& req) { auto bucket_space = get_bucket_space(req.bucket_space()); if (req.has_explicit_bucket_set()) { const uint32_t n_buckets = req.explicit_bucket_set().bucket_ids_size(); std::vector buckets(n_buckets); const auto& proto_buckets = req.explicit_bucket_set().bucket_ids(); for (uint32_t i = 0; i < n_buckets; ++i) { buckets[i] = get_bucket_id(proto_buckets.Get(i)); } return std::make_unique(bucket_space, std::move(buckets)); } else if (req.has_all_buckets()) { const auto& all_req = req.all_buckets(); return std::make_unique( bucket_space, all_req.distributor_index(), lib::ClusterState(all_req.cluster_state()), all_req.distribution_hash()); } else { throw vespalib::IllegalArgumentException("RequestBucketInfo does not have any applicable fields set"); } }); } api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(const SCmd& cmd, BBuf& buf) const { return decode_response(buf, [&](auto& res) { auto reply = std::make_unique(static_cast(cmd)); auto& dest_entries = reply->getBucketInfo(); uint32_t n_entries = res.bucket_infos_size(); dest_entries.resize(n_entries); for (uint32_t i = 0; i < n_entries; ++i) { const auto& proto_entry = res.bucket_infos(i); dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id()); dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info()); } if (res.has_supported_node_features()) { const auto& src_features = res.supported_node_features(); auto& dest_features = reply->supported_node_features(); dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining(); dest_features.two_phase_remove_location = src_features.two_phase_remove_location(); dest_features.no_implicit_indexing_of_active_buckets = src_features.no_implicit_indexing_of_active_buckets(); dest_features.document_condition_probe = src_features.document_condition_probe(); } return reply; }); } // ----------------------------------------------------------------- // NotifyBucketChange // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { set_bucket_info(*req.mutable_bucket_info(), msg.getBucketInfo()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::NotifyBucketChangeReply& msg) const { encode_response(buf, msg, no_op_encode); } api::StorageCommand::UP ProtocolSerialization7::onDecodeNotifyBucketChangeCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto bucket_info = get_bucket_info(req.bucket_info()); return std::make_unique(bucket, bucket_info); }); } api::StorageReply::UP ProtocolSerialization7::onDecodeNotifyBucketChangeReply(const SCmd& cmd, BBuf& buf) const { return decode_response(buf, [&]([[maybe_unused]] auto& res) { return std::make_unique(static_cast(cmd)); }); } // ----------------------------------------------------------------- // SplitBucket // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { req.set_min_split_bits(msg.getMinSplitBits()); req.set_max_split_bits(msg.getMaxSplitBits()); req.set_min_byte_size(msg.getMinByteSize()); req.set_min_doc_count(msg.getMinDocCount()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SplitBucketReply& msg) const { encode_bucket_response(buf, msg, [&](auto& res) { for (const auto& split_info : msg.getSplitInfo()) { auto* proto_info = res.add_split_info(); proto_info->set_raw_bucket_id(split_info.first.getRawId()); set_bucket_info(*proto_info->mutable_bucket_info(), split_info.second); } }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeSplitBucketCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto cmd = std::make_unique(bucket); cmd->setMinSplitBits(static_cast(req.min_split_bits())); cmd->setMaxSplitBits(static_cast(req.max_split_bits())); cmd->setMinByteSize(req.min_byte_size()); cmd->setMinDocCount(req.min_doc_count()); return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeSplitBucketReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_response(buf, [&](auto& res) { auto reply = std::make_unique(static_cast(cmd)); auto& dest_info = reply->getSplitInfo(); dest_info.reserve(res.split_info_size()); for (const auto& proto_info : res.split_info()) { dest_info.emplace_back(document::BucketId(proto_info.raw_bucket_id()), get_bucket_info(proto_info.bucket_info())); } return reply; }); } // ----------------------------------------------------------------- // JoinBuckets // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { for (const auto& source : msg.getSourceBuckets()) { set_bucket_id(*req.add_source_buckets(), source); } req.set_min_join_bits(msg.getMinJoinBits()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::JoinBucketsReply& msg) const { encode_bucket_info_response(buf, msg, no_op_encode); } api::StorageCommand::UP ProtocolSerialization7::onDecodeJoinBucketsCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto cmd = std::make_unique(bucket); auto& entries = cmd->getSourceBuckets(); for (const auto& proto_bucket : req.source_buckets()) { entries.emplace_back(get_bucket_id(proto_bucket)); } cmd->setMinJoinBits(static_cast(req.min_join_bits())); return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeJoinBucketsReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_info_response(buf, [&]([[maybe_unused]] auto& res) { return std::make_unique(static_cast(cmd)); }); } // ----------------------------------------------------------------- // SetBucketState // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { auto state = (msg.getState() == api::SetBucketStateCommand::BUCKET_STATE::ACTIVE ? protobuf::SetBucketStateRequest_BucketState_Active : protobuf::SetBucketStateRequest_BucketState_Inactive); req.set_state(state); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::SetBucketStateReply& msg) const { // SetBucketStateReply is _technically_ a BucketInfoReply, but the legacy protocol impls // do _not_ encode bucket info as part of the wire format (and it's not used on the distributor), // so we follow that here and only encode remapping information. encode_bucket_response(buf, msg, no_op_encode); } api::StorageCommand::UP ProtocolSerialization7::onDecodeSetBucketStateCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { auto state = (req.state() == protobuf::SetBucketStateRequest_BucketState_Active ? api::SetBucketStateCommand::BUCKET_STATE::ACTIVE : api::SetBucketStateCommand::BUCKET_STATE::INACTIVE); return std::make_unique(bucket, state); }); } api::StorageReply::UP ProtocolSerialization7::onDecodeSetBucketStateReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_response(buf, [&]([[maybe_unused]] auto& res) { return std::make_unique(static_cast(cmd)); }); } // ----------------------------------------------------------------- // CreateVisitor // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorCommand& msg) const { encode_request(buf, msg, [&](auto& req) { set_bucket_space(*req.mutable_bucket_space(), msg.getBucketSpace()); for (const auto& bucket : msg.getBuckets()) { set_bucket_id(*req.add_buckets(), bucket); } auto* ctrl_meta = req.mutable_control_meta(); ctrl_meta->set_library_name(msg.getLibraryName().data(), msg.getLibraryName().size()); ctrl_meta->set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size()); ctrl_meta->set_visitor_command_id(msg.getVisitorCmdId()); ctrl_meta->set_control_destination(msg.getControlDestination().data(), msg.getControlDestination().size()); ctrl_meta->set_data_destination(msg.getDataDestination().data(), msg.getDataDestination().size()); ctrl_meta->set_queue_timeout(vespalib::count_ms(msg.getQueueTimeout())); ctrl_meta->set_max_pending_reply_count(msg.getMaximumPendingReplyCount()); ctrl_meta->set_max_buckets_per_visitor(msg.getMaxBucketsPerVisitor()); auto* constraints = req.mutable_constraints(); constraints->set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); constraints->set_from_time_usec(msg.getFromTime()); constraints->set_to_time_usec(msg.getToTime()); constraints->set_visit_inconsistent_buckets(msg.visitInconsistentBuckets()); constraints->set_visit_removes(msg.visitRemoves()); constraints->set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size()); for (const auto& param : msg.getParameters()) { auto* proto_param = req.add_client_parameters(); proto_param->set_key(param.first.data(), param.first.size()); proto_param->set_value(param.second.data(), param.second.size()); } }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::CreateVisitorReply& msg) const { encode_response(buf, msg, [&](auto& res) { auto& stats = msg.getVisitorStatistics(); auto* proto_stats = res.mutable_visitor_statistics(); proto_stats->set_buckets_visited(stats.getBucketsVisited()); proto_stats->set_documents_visited(stats.getDocumentsVisited()); proto_stats->set_bytes_visited(stats.getBytesVisited()); proto_stats->set_documents_returned(stats.getDocumentsReturned()); proto_stats->set_bytes_returned(stats.getBytesReturned()); }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeCreateVisitorCommand(BBuf& buf) const { return decode_request(buf, [&](auto& req) { auto bucket_space = get_bucket_space(req.bucket_space()); auto& ctrl_meta = req.control_meta(); auto& constraints = req.constraints(); auto cmd = std::make_unique(bucket_space, ctrl_meta.library_name(), ctrl_meta.instance_id(), constraints.document_selection()); for (const auto& proto_bucket : req.buckets()) { cmd->getBuckets().emplace_back(get_bucket_id(proto_bucket)); } cmd->setVisitorCmdId(ctrl_meta.visitor_command_id()); cmd->setControlDestination(ctrl_meta.control_destination()); cmd->setDataDestination(ctrl_meta.data_destination()); cmd->setMaximumPendingReplyCount(ctrl_meta.max_pending_reply_count()); cmd->setQueueTimeout(std::chrono::milliseconds(ctrl_meta.queue_timeout())); cmd->setMaxBucketsPerVisitor(ctrl_meta.max_buckets_per_visitor()); cmd->setVisitorDispatcherVersion(50); // FIXME this magic number is lifted verbatim from the 5.1 protocol impl for (const auto& proto_param : req.client_parameters()) { cmd->getParameters().set(proto_param.key(), proto_param.value()); } cmd->setFromTime(constraints.from_time_usec()); cmd->setToTime(constraints.to_time_usec()); cmd->setVisitRemoves(constraints.visit_removes()); cmd->setFieldSet(constraints.field_set()); cmd->setVisitInconsistentBuckets(constraints.visit_inconsistent_buckets()); return cmd; }); } api::StorageReply::UP ProtocolSerialization7::onDecodeCreateVisitorReply(const SCmd& cmd, BBuf& buf) const { return decode_response(buf, [&](auto& res) { auto reply = std::make_unique(static_cast(cmd)); vdslib::VisitorStatistics vs; const auto& proto_stats = res.visitor_statistics(); vs.setBucketsVisited(proto_stats.buckets_visited()); vs.setDocumentsVisited(proto_stats.documents_visited()); vs.setBytesVisited(proto_stats.bytes_visited()); vs.setDocumentsReturned(proto_stats.documents_returned()); vs.setBytesReturned(proto_stats.bytes_returned()); reply->setVisitorStatistics(vs); return reply; }); } // ----------------------------------------------------------------- // DestroyVisitor // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorCommand& msg) const { encode_request(buf, msg, [&](auto& req) { req.set_instance_id(msg.getInstanceId().data(), msg.getInstanceId().size()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::DestroyVisitorReply& msg) const { encode_response(buf, msg, no_op_encode); } api::StorageCommand::UP ProtocolSerialization7::onDecodeDestroyVisitorCommand(BBuf& buf) const { return decode_request(buf, [&](auto& req) { return std::make_unique(req.instance_id()); }); } api::StorageReply::UP ProtocolSerialization7::onDecodeDestroyVisitorReply(const SCmd& cmd, BBuf& buf) const { return decode_response(buf, [&]([[maybe_unused]] auto& res) { return std::make_unique(static_cast(cmd)); }); } // ----------------------------------------------------------------- // StatBucket // ----------------------------------------------------------------- void ProtocolSerialization7::onEncode(GBBuf& buf, const api::StatBucketCommand& msg) const { encode_bucket_request(buf, msg, [&](auto& req) { req.set_document_selection(msg.getDocumentSelection().data(), msg.getDocumentSelection().size()); }); } void ProtocolSerialization7::onEncode(GBBuf& buf, const api::StatBucketReply& msg) const { encode_bucket_response(buf, msg, [&](auto& res) { res.set_results(msg.getResults().data(), msg.getResults().size()); }); } api::StorageCommand::UP ProtocolSerialization7::onDecodeStatBucketCommand(BBuf& buf) const { return decode_bucket_request(buf, [&](auto& req, auto& bucket) { return std::make_unique(bucket, req.document_selection()); }); } api::StorageReply::UP ProtocolSerialization7::onDecodeStatBucketReply(const SCmd& cmd, BBuf& buf) const { return decode_bucket_response(buf, [&](auto& res) { return std::make_unique(static_cast(cmd), res.results()); }); } } // storage::mbusprot