summaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storageapi/messageapi/storagemessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/vespa/storageapi/messageapi/storagemessage.cpp')
-rw-r--r--storage/src/vespa/storageapi/messageapi/storagemessage.cpp306
1 files changed, 306 insertions, 0 deletions
diff --git a/storage/src/vespa/storageapi/messageapi/storagemessage.cpp b/storage/src/vespa/storageapi/messageapi/storagemessage.cpp
new file mode 100644
index 00000000000..db5c86af989
--- /dev/null
+++ b/storage/src/vespa/storageapi/messageapi/storagemessage.cpp
@@ -0,0 +1,306 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "storagemessage.h"
+#include <vespa/messagebus/routing/verbatimdirective.h>
+#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/vespalib/stllike/hash_fun.h>
+#include <sstream>
+#include <cassert>
+#include <atomic>
+
+namespace storage::api {
+
+namespace {
+
+std::atomic<uint64_t> _G_lastMsgId(1000);
+
+}
+
+static const vespalib::string STORAGEADDRESS_PREFIX = "storage/cluster.";
+
+const char*
+StorageMessage::getPriorityString(Priority p) {
+ switch (p) {
+ case LOW: return "LOW";
+ case NORMAL: return "NORMAL";
+ case HIGH: return "HIGH";
+ case VERYHIGH: return "VERYHIGH";
+ default: return "UNKNOWN";
+ }
+}
+
+std::map<MessageType::Id, MessageType*> MessageType::_codes;
+
+const MessageType MessageType::DOCBLOCK("DocBlock", DOCBLOCK_ID);
+const MessageType MessageType::DOCBLOCK_REPLY("DocBlock Reply", DOCBLOCK_REPLY_ID, &MessageType::DOCBLOCK);
+const MessageType MessageType::GET("Get", GET_ID);
+const MessageType MessageType::GET_REPLY("Get Reply", GET_REPLY_ID, &MessageType::GET);
+const MessageType MessageType::INTERNAL("Internal", INTERNAL_ID);
+const MessageType MessageType::INTERNAL_REPLY("Internal Reply", INTERNAL_REPLY_ID, &MessageType::INTERNAL);
+const MessageType MessageType::PUT("Put", PUT_ID);
+const MessageType MessageType::PUT_REPLY("Put Reply", PUT_REPLY_ID, &MessageType::PUT);
+const MessageType MessageType::UPDATE("Update", UPDATE_ID);
+const MessageType MessageType::UPDATE_REPLY("Update Reply", UPDATE_REPLY_ID, &MessageType::UPDATE);
+const MessageType MessageType::REMOVE("Remove", REMOVE_ID);
+const MessageType MessageType::REMOVE_REPLY("Remove Reply", REMOVE_REPLY_ID, &MessageType::REMOVE);
+const MessageType MessageType::REVERT("Revert", REVERT_ID);
+const MessageType MessageType::REVERT_REPLY("Revert Reply", REVERT_REPLY_ID, &MessageType::REVERT);
+const MessageType MessageType::VISITOR_CREATE("Visitor Create", VISITOR_CREATE_ID);
+const MessageType MessageType::VISITOR_CREATE_REPLY("Visitor Create Reply", VISITOR_CREATE_REPLY_ID, &MessageType::VISITOR_CREATE);
+const MessageType MessageType::VISITOR_DESTROY("Visitor Destroy", VISITOR_DESTROY_ID);
+const MessageType MessageType::VISITOR_DESTROY_REPLY("Visitor Destroy Reply", VISITOR_DESTROY_REPLY_ID, &MessageType::VISITOR_DESTROY);
+const MessageType MessageType::REQUESTBUCKETINFO("Request bucket info", REQUESTBUCKETINFO_ID);
+const MessageType MessageType::REQUESTBUCKETINFO_REPLY("Request bucket info reply", REQUESTBUCKETINFO_REPLY_ID, &MessageType::REQUESTBUCKETINFO);
+const MessageType MessageType::NOTIFYBUCKETCHANGE("Notify bucket change", NOTIFYBUCKETCHANGE_ID);
+const MessageType MessageType::NOTIFYBUCKETCHANGE_REPLY("Notify bucket change reply", NOTIFYBUCKETCHANGE_REPLY_ID, &MessageType::NOTIFYBUCKETCHANGE);
+const MessageType MessageType::CREATEBUCKET("Create bucket", CREATEBUCKET_ID);
+const MessageType MessageType::CREATEBUCKET_REPLY("Create bucket reply", CREATEBUCKET_REPLY_ID, &MessageType::CREATEBUCKET);
+const MessageType MessageType::MERGEBUCKET("Merge bucket", MERGEBUCKET_ID);
+const MessageType MessageType::MERGEBUCKET_REPLY("Merge bucket reply", MERGEBUCKET_REPLY_ID, &MessageType::MERGEBUCKET);
+const MessageType MessageType::DELETEBUCKET("Delete bucket", DELETEBUCKET_ID);
+const MessageType MessageType::DELETEBUCKET_REPLY("Delete bucket reply", DELETEBUCKET_REPLY_ID, &MessageType::DELETEBUCKET);
+const MessageType MessageType::SETNODESTATE("Set node state", SETNODESTATE_ID);
+const MessageType MessageType::SETNODESTATE_REPLY("Set node state reply", SETNODESTATE_REPLY_ID, &MessageType::SETNODESTATE);
+const MessageType MessageType::GETNODESTATE("Get node state", GETNODESTATE_ID);
+const MessageType MessageType::GETNODESTATE_REPLY("Get node state reply", GETNODESTATE_REPLY_ID, &MessageType::GETNODESTATE);
+const MessageType MessageType::SETSYSTEMSTATE("Set system state", SETSYSTEMSTATE_ID);
+const MessageType MessageType::SETSYSTEMSTATE_REPLY("Set system state reply", SETSYSTEMSTATE_REPLY_ID, &MessageType::SETSYSTEMSTATE);
+const MessageType MessageType::GETSYSTEMSTATE("Get system state", GETSYSTEMSTATE_ID);
+const MessageType MessageType::GETSYSTEMSTATE_REPLY("get system state reply", GETSYSTEMSTATE_REPLY_ID, &MessageType::GETSYSTEMSTATE);
+const MessageType MessageType::ACTIVATE_CLUSTER_STATE_VERSION("Activate cluster state version", ACTIVATE_CLUSTER_STATE_VERSION_ID);
+const MessageType MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY("Activate cluster state version reply", ACTIVATE_CLUSTER_STATE_VERSION_REPLY_ID, &MessageType::ACTIVATE_CLUSTER_STATE_VERSION);
+const MessageType MessageType::GETBUCKETDIFF("GetBucketDiff", GETBUCKETDIFF_ID);
+const MessageType MessageType::GETBUCKETDIFF_REPLY("GetBucketDiff reply", GETBUCKETDIFF_REPLY_ID, &MessageType::GETBUCKETDIFF);
+const MessageType MessageType::APPLYBUCKETDIFF("ApplyBucketDiff", APPLYBUCKETDIFF_ID);
+const MessageType MessageType::APPLYBUCKETDIFF_REPLY("ApplyBucketDiff reply", APPLYBUCKETDIFF_REPLY_ID, &MessageType::APPLYBUCKETDIFF);
+const MessageType MessageType::VISITOR_INFO("VisitorInfo", VISITOR_INFO_ID);
+const MessageType MessageType::VISITOR_INFO_REPLY("VisitorInfo reply", VISITOR_INFO_REPLY_ID, &MessageType::VISITOR_INFO);
+const MessageType MessageType::SEARCHRESULT("SearchResult", SEARCHRESULT_ID);
+const MessageType MessageType::SEARCHRESULT_REPLY("SearchResult reply", SEARCHRESULT_REPLY_ID, &MessageType::SEARCHRESULT);
+const MessageType MessageType::DOCUMENTSUMMARY("DocumentSummary", DOCUMENTSUMMARY_ID);
+const MessageType MessageType::DOCUMENTSUMMARY_REPLY("DocumentSummary reply", DOCUMENTSUMMARY_REPLY_ID, &MessageType::DOCUMENTSUMMARY);
+const MessageType MessageType::MAPVISITOR("Mapvisitor", MAPVISITOR_ID);
+const MessageType MessageType::MAPVISITOR_REPLY("Mapvisitor reply", MAPVISITOR_REPLY_ID, &MessageType::MAPVISITOR);
+const MessageType MessageType::SPLITBUCKET("SplitBucket", SPLITBUCKET_ID);
+const MessageType MessageType::SPLITBUCKET_REPLY("SplitBucket reply", SPLITBUCKET_REPLY_ID, &MessageType::SPLITBUCKET);
+const MessageType MessageType::JOINBUCKETS("Joinbuckets", JOINBUCKETS_ID);
+const MessageType MessageType::JOINBUCKETS_REPLY("Joinbuckets reply", JOINBUCKETS_REPLY_ID, &MessageType::JOINBUCKETS);
+const MessageType MessageType::STATBUCKET("Statbucket", STATBUCKET_ID);
+const MessageType MessageType::STATBUCKET_REPLY("Statbucket Reply", STATBUCKET_REPLY_ID, &MessageType::STATBUCKET);
+const MessageType MessageType::GETBUCKETLIST("Getbucketlist", GETBUCKETLIST_ID);
+const MessageType MessageType::GETBUCKETLIST_REPLY("Getbucketlist Reply", GETBUCKETLIST_REPLY_ID, &MessageType::GETBUCKETLIST);
+const MessageType MessageType::DOCUMENTLIST("documentlist", DOCUMENTLIST_ID);
+const MessageType MessageType::DOCUMENTLIST_REPLY("documentlist Reply", DOCUMENTLIST_REPLY_ID, &MessageType::DOCUMENTLIST);
+const MessageType MessageType::EMPTYBUCKETS("Emptybuckets", EMPTYBUCKETS_ID);
+const MessageType MessageType::EMPTYBUCKETS_REPLY("Emptybuckets Reply", EMPTYBUCKETS_REPLY_ID, &MessageType::EMPTYBUCKETS);
+const MessageType MessageType::REMOVELOCATION("Removelocation", REMOVELOCATION_ID);
+const MessageType MessageType::REMOVELOCATION_REPLY("Removelocation Reply", REMOVELOCATION_REPLY_ID, &MessageType::REMOVELOCATION);
+const MessageType MessageType::QUERYRESULT("QueryResult", QUERYRESULT_ID);
+const MessageType MessageType::QUERYRESULT_REPLY("QueryResult reply", QUERYRESULT_REPLY_ID, &MessageType::QUERYRESULT);
+const MessageType MessageType::SETBUCKETSTATE("SetBucketState", SETBUCKETSTATE_ID);
+const MessageType MessageType::SETBUCKETSTATE_REPLY("SetBucketStateReply", SETBUCKETSTATE_REPLY_ID, &MessageType::SETBUCKETSTATE);
+
+const MessageType&
+MessageType::MessageType::get(Id id)
+{
+ auto it = _codes.find(id);
+ if (it == _codes.end()) {
+ std::ostringstream ost;
+ ost << "No message type with id " << id << ".";
+ throw vespalib::IllegalArgumentException(ost.str(), VESPA_STRLOC);
+ }
+ return *it->second;
+}
+MessageType::MessageType(vespalib::stringref name, Id id,
+ const MessageType* replyOf)
+ : _name(name), _id(id), _reply(nullptr), _replyOf(replyOf)
+{
+ _codes[id] = this;
+ if (_replyOf) {
+ assert(_replyOf->_reply == nullptr);
+ // Ugly cast to let initialization work
+ auto& type = const_cast<MessageType&>(*_replyOf);
+ type._reply = this;
+ }
+}
+
+MessageType::~MessageType() = default;
+
+void
+MessageType::print(std::ostream& out, bool verbose, const std::string& indent) const
+{
+ (void) verbose; (void) indent;
+ out << "MessageType(" << _id << ", " << _name;
+ if (_replyOf) {
+ out << ", reply of " << _replyOf->getName();
+ }
+ out << ")";
+}
+
+std::ostream & operator << (std::ostream & os, const StorageMessageAddress & addr) {
+ return os << addr.toString();
+}
+
+namespace {
+
+vespalib::string
+createAddress(vespalib::stringref cluster, const lib::NodeType &type, uint16_t index) {
+ vespalib::asciistream os;
+ os << STORAGEADDRESS_PREFIX << cluster << '/' << type.toString() << '/' << index << "/default";
+ return os.str();
+}
+
+uint32_t
+calculate_node_hash(const lib::NodeType &type, uint16_t index) {
+ uint16_t buf[] = {type, index};
+ size_t hash = vespalib::hashValue(&buf, sizeof(buf));
+ return uint32_t(hash & 0xffffffffl) ^ uint32_t(hash >> 32);
+}
+
+vespalib::string Empty;
+
+}
+
+// TODO we ideally want this removed. Currently just in place to support usage as map key when emplacement not available
+StorageMessageAddress::StorageMessageAddress() noexcept
+ : _cluster(&Empty),
+ _precomputed_storage_hash(0),
+ _type(lib::NodeType::Type::UNKNOWN),
+ _protocol(Protocol::STORAGE),
+ _index(0)
+{}
+
+StorageMessageAddress::StorageMessageAddress(const vespalib::string * cluster, const lib::NodeType& type, uint16_t index) noexcept
+ : StorageMessageAddress(cluster, type, index, Protocol::STORAGE)
+{ }
+
+StorageMessageAddress::StorageMessageAddress(const vespalib::string * cluster, const lib::NodeType& type,
+ uint16_t index, Protocol protocol) noexcept
+ : _cluster(cluster),
+ _precomputed_storage_hash(calculate_node_hash(type, index)),
+ _type(type.getType()),
+ _protocol(protocol),
+ _index(index)
+{ }
+
+StorageMessageAddress::~StorageMessageAddress() = default;
+
+mbus::Route
+StorageMessageAddress::to_mbus_route() const
+{
+ mbus::Route result;
+ auto address_as_str = createAddress(getCluster(), lib::NodeType::get(_type), _index);
+ std::vector<mbus::IHopDirective::SP> directives;
+ directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(std::move(address_as_str)));
+ result.addHop(mbus::Hop(std::move(directives), false));
+ return result;
+}
+
+bool
+StorageMessageAddress::operator==(const StorageMessageAddress& other) const noexcept
+{
+ if (_protocol != other._protocol) return false;
+ if (_type != other._type) return false;
+ if (_index != other._index) return false;
+ if (getCluster() != other.getCluster()) return false;
+ return true;
+}
+
+vespalib::string
+StorageMessageAddress::toString() const
+{
+ vespalib::asciistream os;
+ print(os);
+ return os.str();
+}
+
+void
+StorageMessageAddress::print(vespalib::asciistream & out) const
+{
+ out << "StorageMessageAddress(";
+ if (_protocol == Protocol::STORAGE) {
+ out << "Storage protocol";
+ } else {
+ out << "Document protocol";
+ }
+ if (_type == lib::NodeType::Type::UNKNOWN) {
+ out << ", " << to_mbus_route().toString() << ")";
+ } else {
+ out << ", cluster " << getCluster() << ", nodetype " << lib::NodeType::get(_type)
+ << ", index " << _index << ")";
+ }
+}
+
+TransportContext::~TransportContext() = default;
+
+StorageMessage::Id
+StorageMessage::generateMsgId() noexcept
+{
+ return _G_lastMsgId.fetch_add(1, std::memory_order_relaxed);
+}
+
+StorageMessage::StorageMessage(const MessageType& type, Id id) noexcept
+ : _type(type),
+ _msgId(id),
+ _address(),
+ _trace(),
+ _approxByteSize(50),
+ _priority(NORMAL)
+{
+}
+
+StorageMessage::StorageMessage(const StorageMessage& other, Id id) noexcept
+ : _type(other._type),
+ _msgId(id),
+ _address(),
+ _trace(other.getTrace().getLevel()),
+ _approxByteSize(other._approxByteSize),
+ _priority(other._priority)
+{
+}
+
+StorageMessage::~StorageMessage() = default;
+
+void
+StorageMessage::setNewMsgId() noexcept
+{
+ _msgId = generateMsgId();
+}
+
+vespalib::string
+StorageMessage::getSummary() const {
+ return toString();
+}
+
+const char*
+to_string(LockingRequirements req) noexcept {
+ switch (req) {
+ case LockingRequirements::Exclusive: return "Exclusive";
+ case LockingRequirements::Shared: return "Shared";
+ default: abort();
+ }
+}
+
+std::ostream&
+operator<<(std::ostream& os, LockingRequirements req) {
+ os << to_string(req);
+ return os;
+}
+
+const char*
+to_string(InternalReadConsistency consistency) noexcept {
+ switch (consistency) {
+ case InternalReadConsistency::Strong: return "Strong";
+ case InternalReadConsistency::Weak: return "Weak";
+ default: abort();
+ }
+}
+
+std::ostream&
+operator<<(std::ostream& os, InternalReadConsistency consistency) {
+ os << to_string(consistency);
+ return os;
+}
+
+}