// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "protocolserialization7.h" #include "serializationhelper.h" #include "storagecommand.h" #include "storagereply.h" #include #include #include #include #include #include #include #include LOG_SETUP(".storage.api.mbusprot.serialization.base"); namespace storage::mbusprot { mbus::Blob ProtocolSerialization::encode(const api::StorageMessage& msg) const { vespalib::GrowableByteBuffer buf; buf.putInt(msg.getType().getId()); switch (msg.getType().getId()) { case api::MessageType::PUT_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::PUT_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::UPDATE_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::UPDATE_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::GET_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::GET_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::REMOVE_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::REMOVE_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::DELETEBUCKET_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::DELETEBUCKET_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::CREATEBUCKET_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::CREATEBUCKET_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::MERGEBUCKET_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::MERGEBUCKET_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::GETBUCKETDIFF_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::GETBUCKETDIFF_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::APPLYBUCKETDIFF_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::APPLYBUCKETDIFF_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::REQUESTBUCKETINFO_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::REQUESTBUCKETINFO_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::NOTIFYBUCKETCHANGE_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::NOTIFYBUCKETCHANGE_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::SPLITBUCKET_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::SPLITBUCKET_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::JOINBUCKETS_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::JOINBUCKETS_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::VISITOR_CREATE_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::VISITOR_CREATE_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::VISITOR_DESTROY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::VISITOR_DESTROY_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::STATBUCKET_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::STATBUCKET_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::REMOVELOCATION_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::REMOVELOCATION_REPLY_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::SETBUCKETSTATE_ID: onEncode(buf, static_cast(msg)); break; case api::MessageType::SETBUCKETSTATE_REPLY_ID: onEncode(buf, static_cast(msg)); break; default: LOG(error, "Trying to encode unhandled type %s", msg.getType().toString().c_str()); abort(); } mbus::Blob retVal(buf.position()); memcpy(retVal.data(), buf.getBuffer(), buf.position()); return retVal; } StorageCommand::UP ProtocolSerialization::decodeCommand(mbus::BlobRef data) const { LOG(spam, "Decode %d bytes of data.", data.size()); if (data.size() < sizeof(int32_t)) { std::ostringstream ost; ost << "Request of size " << data.size() << " is not big enough to be " "able to store a request."; throw vespalib::IllegalArgumentException(ost.str(), VESPA_STRLOC); } document::ByteBuffer buf(data.data(), data.size()); int type; buf.getIntNetwork(type); SCmd::UP cmd; switch (type) { case api::MessageType::PUT_ID: cmd = onDecodePutCommand(buf); break; case api::MessageType::UPDATE_ID: cmd = onDecodeUpdateCommand(buf); break; case api::MessageType::GET_ID: cmd = onDecodeGetCommand(buf); break; case api::MessageType::REMOVE_ID: cmd = onDecodeRemoveCommand(buf); break; case api::MessageType::CREATEBUCKET_ID: cmd = onDecodeCreateBucketCommand(buf); break; case api::MessageType::DELETEBUCKET_ID: cmd = onDecodeDeleteBucketCommand(buf); break; case api::MessageType::MERGEBUCKET_ID: cmd = onDecodeMergeBucketCommand(buf); break; case api::MessageType::GETBUCKETDIFF_ID: cmd = onDecodeGetBucketDiffCommand(buf); break; case api::MessageType::APPLYBUCKETDIFF_ID: cmd = onDecodeApplyBucketDiffCommand(buf); break; case api::MessageType::REQUESTBUCKETINFO_ID: cmd = onDecodeRequestBucketInfoCommand(buf); break; case api::MessageType::NOTIFYBUCKETCHANGE_ID: cmd = onDecodeNotifyBucketChangeCommand(buf); break; case api::MessageType::SPLITBUCKET_ID: cmd = onDecodeSplitBucketCommand(buf); break; case api::MessageType::JOINBUCKETS_ID: cmd = onDecodeJoinBucketsCommand(buf); break; case api::MessageType::VISITOR_CREATE_ID: cmd = onDecodeCreateVisitorCommand(buf); break; case api::MessageType::VISITOR_DESTROY_ID: cmd = onDecodeDestroyVisitorCommand(buf); break; case api::MessageType::STATBUCKET_ID: cmd = onDecodeStatBucketCommand(buf); break; case api::MessageType::REMOVELOCATION_ID: cmd = onDecodeRemoveLocationCommand(buf); break; case api::MessageType::SETBUCKETSTATE_ID: cmd = onDecodeSetBucketStateCommand(buf); break; default: { std::ostringstream ost; ost << "Unknown storage command type " << type; throw vespalib::IllegalArgumentException(ost.str(), VESPA_STRLOC); } } return std::make_unique(std::move(cmd)); } StorageReply::UP ProtocolSerialization::decodeReply(mbus::BlobRef data, const api::StorageCommand& cmd) const { LOG(spam, "Decode %d bytes of data.", data.size()); if (data.size() < sizeof(int32_t)) { std::ostringstream ost; ost << "Request of size " << data.size() << " is not big enough to be " "able to store a request."; throw vespalib::IllegalArgumentException(ost.str(), VESPA_STRLOC); } document::ByteBuffer buf(data.data(), data.size()); int type; buf.getIntNetwork(type); SRep::UP reply; switch (type) { case api::MessageType::PUT_REPLY_ID: reply = onDecodePutReply(cmd, buf); break; case api::MessageType::UPDATE_REPLY_ID: reply = onDecodeUpdateReply(cmd, buf); break; case api::MessageType::GET_REPLY_ID: reply = onDecodeGetReply(cmd, buf); break; case api::MessageType::REMOVE_REPLY_ID: reply = onDecodeRemoveReply(cmd, buf); break; case api::MessageType::CREATEBUCKET_REPLY_ID: reply = onDecodeCreateBucketReply(cmd, buf); break; case api::MessageType::DELETEBUCKET_REPLY_ID: reply = onDecodeDeleteBucketReply(cmd, buf); break; case api::MessageType::MERGEBUCKET_REPLY_ID: reply = onDecodeMergeBucketReply(cmd, buf); break; case api::MessageType::GETBUCKETDIFF_REPLY_ID: reply = onDecodeGetBucketDiffReply(cmd, buf); break; case api::MessageType::APPLYBUCKETDIFF_REPLY_ID: reply = onDecodeApplyBucketDiffReply(cmd, buf); break; case api::MessageType::REQUESTBUCKETINFO_REPLY_ID: reply = onDecodeRequestBucketInfoReply(cmd, buf); break; case api::MessageType::NOTIFYBUCKETCHANGE_REPLY_ID: reply = onDecodeNotifyBucketChangeReply(cmd, buf); break; case api::MessageType::SPLITBUCKET_REPLY_ID: reply = onDecodeSplitBucketReply(cmd, buf); break; case api::MessageType::JOINBUCKETS_REPLY_ID: reply = onDecodeJoinBucketsReply(cmd, buf); break; case api::MessageType::VISITOR_CREATE_REPLY_ID: reply = onDecodeCreateVisitorReply(cmd, buf); break; case api::MessageType::VISITOR_DESTROY_REPLY_ID: reply = onDecodeDestroyVisitorReply(cmd, buf); break; case api::MessageType::STATBUCKET_REPLY_ID: reply = onDecodeStatBucketReply(cmd, buf); break; case api::MessageType::REMOVELOCATION_REPLY_ID: reply = onDecodeRemoveLocationReply(cmd, buf); break; case api::MessageType::SETBUCKETSTATE_REPLY_ID: reply = onDecodeSetBucketStateReply(cmd, buf); break; default: { std::ostringstream ost; ost << "Unknown message type " << type; throw vespalib::IllegalArgumentException(ost.str(), VESPA_STRLOC); } } return std::make_unique(std::move(reply)); } }