summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-09 13:53:27 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-14 13:22:14 +0000
commitd4cc122432a835a18588c053d3f3f2043b92b831 (patch)
tree5e9b95ad212c2d6f0d55c51f98c5f182a49d2a69 /storage
parent2bfdf23737dbe5291d5552cd6f43f0cb5a751889 (diff)
Remove old dispatcher whose functionality has never been used
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/storageserver/messagedispatcher.cpp231
-rw-r--r--storage/src/vespa/storage/storageserver/messagedispatcher.h75
2 files changed, 0 insertions, 306 deletions
diff --git a/storage/src/vespa/storage/storageserver/messagedispatcher.cpp b/storage/src/vespa/storage/storageserver/messagedispatcher.cpp
deleted file mode 100644
index e6b66c7065c..00000000000
--- a/storage/src/vespa/storage/storageserver/messagedispatcher.cpp
+++ /dev/null
@@ -1,231 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "messagedispatcher.h"
-
-#include <vespa/storageapi/message/state.h>
-#include <storageapi/messageapi/chainedcommand.h>
-#include <storageapi/messageapi/chainedreply.h>
-#include <vespa/document/bucket/bucketid.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".message.dispatcher");
-
-using std::shared_ptr;
-
-namespace storage {
-
-MessageDispatcher::MessageDispatcher(StorageServerInterface& server)
- : StorageLink(),
- _access(),
- _cache(),
- _systemState(""),
- _server(server)
-{
-}
-
-MessageDispatcher::~MessageDispatcher()
-{
- closeNextLink();
- LOG(debug, "Deleting link %s.", toString().c_str());
-}
-
-void
-MessageDispatcher::onClose()
-{
- vespalib::LockGuard lock(_access);
- for (std::map<api::StorageMessage::Id, std::shared_ptr<ReplyPair> >
- ::iterator it = _cache.begin(); it != _cache.end(); ++it)
- {
- std::shared_ptr<api::ChainedReply> reply(it->second->first);
- if (it->second->second != 0) {
- reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED,
- "Storage node closing down. Aborting command."));
- sendUp(reply);
- it->second->second = 0;
- }
- }
-
-}
-
-void
-MessageDispatcher::print(std::ostream& out, bool verbose,
- const std::string& indent) const
-{
- (void) verbose; (void) indent;
- out << "MessageDispatcher()";
-}
-
-bool MessageDispatcher::onDown(const shared_ptr<api::StorageMessage> & msg)
-{
- if (msg->getType().isReply()) {
- shared_ptr<api::ChainedReply> reply(
- std::dynamic_pointer_cast<api::ChainedReply>(msg));
- if (reply.get()) {
- return handleReply(reply, false);
- }
- } else {
- shared_ptr<api::ChainedCommand> cmd(
- std::dynamic_pointer_cast<api::ChainedCommand>(msg));
- if (cmd.get()) {
- return handleCommand(cmd);
- }
- if (msg->getType() == api::MessageType::SETSYSTEMSTATE) {
- shared_ptr<api::SetSystemStateCommand> stateCmd(
- std::dynamic_pointer_cast<api::SetSystemStateCommand>(
- msg));
- assert(stateCmd.get());
- _systemState = stateCmd->getSystemState();
- LOG(debug, "Got new distributor state %s.",
- _systemState.toString().c_str());
- }
- }
- return false;
-}
-
-bool MessageDispatcher::onUp(const std::shared_ptr<api::StorageMessage> & msg)
-{
- if (msg->getType().isReply()) {
- shared_ptr<api::ChainedReply> reply(
- std::dynamic_pointer_cast<api::ChainedReply>(msg));
- if (reply.get()) {
- return handleReply(reply, true);
- }
- }
- return false;
-}
-
-bool MessageDispatcher::
-handleCommand(const std::shared_ptr<api::ChainedCommand> & cmd)
-{
- // If we're the first node in the chain,
- // the message has a bucket id related to it,
- // and message came from wrong distributor, fail the message.
- uint16_t expectedNode = 0xFFFF;
- if (cmd->getSourceIndex() != 0xFFFF &&
- cmd->hasBucketId() &&
- !isCorrectDistributor(cmd->getBucketId(), cmd->getSourceIndex(),
- expectedNode))
- {
- std::string msg;
-
- if (expectedNode != 0xFFFF) {
- msg = vespalib::make_string(
- "Got chained command %s with bucket id %s from distributor "
- "%d, which is wrong given our state. Correct should be %d. "
- "Ignoring since we're primary node.",
- cmd->getType().getName().c_str(),
- cmd->getBucketId().toString().c_str(),
- cmd->getSourceIndex(),
- expectedNode);
- } else {
- msg = vespalib::make_string(
- "Got chained command %s with bucket id %s, but no "
- "distributors in system state. Haven't received system "
- "state yet?",
- cmd->getType().getName().c_str(),
- cmd->getBucketId().toString().c_str());
- }
-
- LOG(debug, msg.c_str());
- shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
- reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, msg));
- sendUp(reply);
- return true;
-
- }
- // If not used chained, just pass it through
- if (!cmd->hasNodes()) {
- LOG(spam, "Chained command contains no nodes, passing it through");
- return false;
- }
- bool runLocally = cmd->getNodes().back()._run;
- // If last node in chain, handle directly
- if (cmd->getNodeCount() == 1) {
- if (runLocally) {
- LOG(spam, "Last node in chain, running it locally.");
- return false;
- } else {
- LOG(spam, "Last node in chain, not running locally, so returning.");
- shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
- sendUp(reply);
- return true;
- }
- }
- // Create commands first, as we need ids for cache.
- shared_ptr<api::ChainedCommand> extCmd(cmd->clone());
- shared_ptr<api::ChainedCommand> localCmd(runLocally ? cmd->clone() : 0);
-
- // When stuff in cache, to be sure it's there when reply comes.
- shared_ptr<api::ChainedReply> reply(dynamic_cast<api::ChainedReply*>(
- cmd->makeReply().release()));
- assert(reply.get());
- {
- vespalib::LockGuard lock(_access);
- shared_ptr<ReplyPair> pair(new ReplyPair(reply, runLocally ? 2 : 1));
- _cache[extCmd->getMsgId()] = pair;
- if (localCmd.get()) {
- _cache[localCmd->getMsgId()] = pair;
- }
- }
- // Send external first since it will probably use the most time
- extCmd->setSourceIndex(0xFFFF);
- extCmd->getNodes().pop_back();
- extCmd->setAddress(api::ServerAddress(_server.getClusterName(), "storage", extCmd->getNodes().back()._node));
-
- LOG(spam, "Sending chained command on to node %d.",
- extCmd->getNodes().back()._node);
- sendUp(extCmd);
- // Send internal copy if run locally flag is set
- if (runLocally) {
- LOG(spam, "Running chained command locally.");
- localCmd->setSourceIndex(0xFFFF);
- sendDown(localCmd);
- }
- return true;
-}
-
-bool
-MessageDispatcher::handleReply(
- const std::shared_ptr<api::ChainedReply>& reply, bool localSource)
-{
- // Ignore replies on their way up in the storage chain, with a
- // destination object set. These are replies on commands not sent
- // locally, thus not replies possibly for the message dispatcher.
- if (localSource && !reply->isLocal()) return false;
-
- vespalib::LockGuard lock(_access);
- std::map<api::StorageMessage::Id, shared_ptr<ReplyPair> >::iterator it
- = _cache.find(reply->getMsgId());
- if (it == _cache.end()) {
- return false; // Not for us
- }
- if (it->second.get() == 0) {
- LOG(debug, "Reply already sent back (probably due to shutdown)");
- return true; // Already sent
- }
- bool lastReply = (--it->second->second == 0);
- if (!lastReply || localSource) {
- it->second->first->appendState(*reply);
- } else {
- it->second->first->prependState(*reply);
- }
- if (lastReply) {
- LOG(spam, "Last chained reply retrieved, sending original reply.");
- sendUp(it->second->first);
- } else {
- LOG(spam, "Got chained reply, waiting for next");
- }
- _cache.erase(it);
- return true;
-}
-
-bool
-MessageDispatcher::isCorrectDistributor(
- const document::BucketId& id, uint16_t distributor, uint16_t& expected)
-{
- std::vector<uint16_t> distributors;
- (id).getIdealNodes(lib::NodeType::DISTRIBUTOR, _systemState, _server.getBucketIdFactory(), distributors);
- return (distributors.size() > 0 && (expected = distributors[0]) == distributor);
-}
-
-} // storage
diff --git a/storage/src/vespa/storage/storageserver/messagedispatcher.h b/storage/src/vespa/storage/storageserver/messagedispatcher.h
deleted file mode 100644
index 0c8fdb8916b..00000000000
--- a/storage/src/vespa/storage/storageserver/messagedispatcher.h
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * @class storage::MessageDispatcher
- * @ingroup storageserver
- *
- * @brief Sends messages through to multiple hosts.
- *
- * In VDS, some messages are sent to the first storage node, and the node itself
- * should send the request on to another storage node and so on (put/remove).
- * This link is responsible for receiving such messages, and send it through to
- * next host, as well as through to the local host, wait for both responses and
- * reply back. If one of the responses fails, it should issue a revert command.
- *
- * @author H�kon Humberset
- * @date 2006-01-16
- * @version $Id$
- */
-
-#pragma once
-
-#include <vespa/vespalib/util/sync.h>
-#include <map>
-#include <vdslib/state/systemstate.h>
-#include <vespa/storage/common/storagelink.h>
-
-namespace storage {
-namespace api {
- class BucketId;
- class ChainedCommand;
- class ChainedReply;
-}
-
-class MessageDispatcher : public StorageLink {
- mutable vespalib::Lock _access;
- typedef std::pair<std::shared_ptr<api::ChainedReply>, uint32_t> ReplyPair;
- std::map<api::StorageMessage::Id, std::shared_ptr<ReplyPair> > _cache;
- lib::ClusterState _systemState;
- StorageServerInterface& _server;
-
-public:
- explicit MessageDispatcher(StorageServerInterface& server);
- ~MessageDispatcher();
-
- virtual void onClose();
-
- virtual void print(std::ostream& out, bool verbose,
- const std::string& indent) const;
-
- class Factory : public StorageLink::Factory {
- public:
- std::unique_ptr<StorageLink> create(const std::string& configId,
- StorageServerInterface& server) const
- {
- (void) configId;
- return std::unique_ptr<StorageLink>(new MessageDispatcher(server));
- }
- };
-
-private:
-
- bool onDown(const std::shared_ptr<api::StorageMessage> & msg);
- bool onUp(const std::shared_ptr<api::StorageMessage> & msg);
-
- bool handleCommand(const std::shared_ptr<api::ChainedCommand>& cmd);
- bool handleReply(const std::shared_ptr<api::ChainedReply>& reply,
- bool localSource);
-
- bool isCorrectDistributor(const document::BucketId& id, uint16_t distributor,
- uint16_t& expected);
-
-};
-
-} // storage
-
-