diff options
Diffstat (limited to 'storage')
10 files changed, 102 insertions, 206 deletions
diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp index 5b7d279537e..c41696e1a02 100644 --- a/storage/src/tests/storageserver/bouncertest.cpp +++ b/storage/src/tests/storageserver/bouncertest.cpp @@ -51,10 +51,9 @@ struct BouncerTest : public Test { api::Timestamp timestamp, document::BucketSpace bucketSpace); - void expectMessageBouncedWithRejection() const; - void expect_message_bounced_with_node_down_abort() const; - void expect_message_bounced_with_shutdown_abort() const; - void expectMessageNotBounced() const; + void expectMessageBouncedWithRejection(); + void expectMessageBouncedWithAbort(); + void expectMessageNotBounced(); }; BouncerTest::BouncerTest() @@ -182,7 +181,7 @@ TEST_F(BouncerTest, allow_notify_bucket_change_even_when_distributor_down) { } void -BouncerTest::expectMessageBouncedWithRejection() const +BouncerTest::expectMessageBouncedWithRejection() { ASSERT_EQ(1, _upper->getNumReplies()); EXPECT_EQ(0, _upper->getNumCommands()); @@ -192,7 +191,7 @@ BouncerTest::expectMessageBouncedWithRejection() const } void -BouncerTest::expect_message_bounced_with_node_down_abort() const +BouncerTest::expectMessageBouncedWithAbort() { ASSERT_EQ(1, _upper->getNumReplies()); EXPECT_EQ(0, _upper->getNumCommands()); @@ -205,17 +204,7 @@ BouncerTest::expect_message_bounced_with_node_down_abort() const } void -BouncerTest::expect_message_bounced_with_shutdown_abort() const -{ - ASSERT_EQ(1, _upper->getNumReplies()); - EXPECT_EQ(0, _upper->getNumCommands()); - auto& reply = dynamic_cast<api::StorageReply&>(*_upper->getReply(0)); - EXPECT_EQ(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down"), reply.getResult()); - EXPECT_EQ(0, _lower->getNumCommands()); -} - -void -BouncerTest::expectMessageNotBounced() const +BouncerTest::expectMessageNotBounced() { EXPECT_EQ(size_t(0), _upper->getNumReplies()); EXPECT_EQ(size_t(1), _lower->getNumCommands()); @@ -307,7 +296,7 @@ TEST_F(BouncerTest, abort_request_when_derived_bucket_space_node_state_is_marked auto state = makeClusterStateBundle("distributor:3 storage:3", {{ document::FixedBucketSpaces::default_space(), "distributor:3 storage:3 .2.s:d" }}); _node->getNodeStateUpdater().setClusterStateBundle(state); _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space())); - expect_message_bounced_with_node_down_abort(); + expectMessageBouncedWithAbort(); EXPECT_EQ(1, _manager->metrics().unavailable_node_aborts.getValue()); _upper->reset(); @@ -373,23 +362,5 @@ TEST_F(BouncerTest, operation_with_sufficient_bucket_bits_is_not_rejected) { expectMessageNotBounced(); } -TEST_F(BouncerTest, requests_are_rejected_after_close) { - _manager->close(); - _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space())); - expect_message_bounced_with_shutdown_abort(); -} - -TEST_F(BouncerTest, replies_are_swallowed_after_close) { - _manager->close(); - auto req = createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space()); - auto reply = req->makeReply(); - _upper->sendDown(std::move(reply)); - - EXPECT_EQ(0, _upper->getNumCommands()); - EXPECT_EQ(0, _upper->getNumReplies()); - EXPECT_EQ(0, _lower->getNumCommands()); - EXPECT_EQ(0, _lower->getNumReplies()); -} - } // storage diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index e774e6967b9..beccd605650 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -14,20 +14,6 @@ using namespace storage::api; namespace storage { -StorageLink::StorageLink(const std::string& name, bool allow_msg_down_during_flushing) - : _name(name), - _up(nullptr), - _down(), - _state(CREATED), - _allow_msg_down_during_flushing(allow_msg_down_during_flushing) -{ -} - -StorageLink::StorageLink(const std::string& name) - : StorageLink(name, false) -{ -} - StorageLink::~StorageLink() { LOG(debug, "Destructing link %s.", toString().c_str()); } @@ -143,15 +129,9 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) case CLOSING: case FLUSHINGDOWN: break; - case FLUSHINGUP: - if (_allow_msg_down_during_flushing) { - break; - } - [[fallthrough]]; default: - LOG(error, "Link %s trying to send %s down while in state %s. Stacktrace: %s", - toString().c_str(), msg->toString().c_str(), stateToString(getState()), - vespalib::getStackTrace(0).c_str()); + LOG(error, "Link %s trying to send %s down while in state %s", + toString().c_str(), msg->toString().c_str(), stateToString(getState())); assert(false); } assert(msg); @@ -192,9 +172,8 @@ void StorageLink::sendUp(const std::shared_ptr<StorageMessage> & msg) case FLUSHINGUP: break; default: - LOG(error, "Link %s trying to send %s up while in state %s. Stacktrace: %s", - toString().c_str(), msg->toString(true).c_str(), stateToString(getState()), - vespalib::getStackTrace(0).c_str()); + LOG(error, "Link %s trying to send %s up while in state %s", + toString().c_str(), msg->toString(true).c_str(), stateToString(getState())); assert(false); } assert(msg); @@ -302,14 +281,15 @@ Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, vespalib::duration tim void Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) { - std::lock_guard sync(_lock); - _queue.emplace(std::move(msg)); + { + std::lock_guard sync(_lock); + _queue.emplace(std::move(msg)); + } _cond.notify_one(); } void Queue::signal() { - std::lock_guard sync(_lock); _cond.notify_one(); } diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h index 1682804a746..2b470d029d8 100644 --- a/storage/src/vespa/storage/common/storagelink.h +++ b/storage/src/vespa/storage/common/storagelink.h @@ -42,34 +42,28 @@ public: enum State { CREATED, OPENED, CLOSING, FLUSHINGDOWN, FLUSHINGUP, CLOSED }; private: - const std::string _name; - StorageLink* _up; + std::string _name; + StorageLink* _up; std::unique_ptr<StorageLink> _down; - std::atomic<State> _state; - const bool _allow_msg_down_during_flushing; + std::atomic<State> _state; public: - StorageLink(const std::string& name, bool allow_msg_down_during_flushing); - explicit StorageLink(const std::string& name); - StorageLink(const StorageLink &) = delete; StorageLink & operator = (const StorageLink &) = delete; + StorageLink(const std::string& name) + : _name(name), _up(0), _down(), _state(CREATED) {} ~StorageLink() override; - const std::string& getName() const noexcept { return _name; } - [[nodiscard]] bool isTop() const noexcept { return !_up; } - [[nodiscard]] bool isBottom() const noexcept { return !_down; } - [[nodiscard]] unsigned int size() const noexcept { - return (isBottom() ? 1 : _down->size() + 1); - } + const std::string& getName() const { return _name; } + bool isTop() const { return (_up == 0); } + bool isBottom() const { return (_down.get() == 0); } + unsigned int size() const { return (isBottom() ? 1 : _down->size() + 1); } /** Adds the link to the end of the chain. */ void push_back(StorageLink::UP); /** Get the current state of the storage link. */ - [[nodiscard]] State getState() const noexcept { - return _state.load(std::memory_order_relaxed); - } + State getState() const noexcept { return _state.load(std::memory_order_relaxed); } /** * Called by storage server after the storage chain have been created. diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp index fa98afe9489..404058325b9 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.cpp +++ b/storage/src/vespa/storage/storageserver/bouncer.cpp @@ -30,19 +30,19 @@ Bouncer::Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & co _derivedNodeStates(), _clusterState(&lib::State::UP), _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), - _metrics(std::make_unique<BouncerMetrics>()), - _closed(false) + _metrics(std::make_unique<BouncerMetrics>()) { _component.getStateUpdater().addStateListener(*this); _component.registerMetric(*_metrics); // Register for config. Normally not critical, so catching config // exception allowing program to continue if missing/faulty config. - try { + try{ if (!configUri.empty()) { _configFetcher->subscribe<vespa::config::content::core::StorBouncerConfig>(configUri.getConfigId(), this); _configFetcher->start(); } else { - LOG(info, "No config id specified. Using defaults rather than config"); + LOG(info, "No config id specified. Using defaults rather than " + "config"); } } catch (config::InvalidConfigException& e) { LOG(info, "Bouncer failed to load config '%s'. This " @@ -70,8 +70,6 @@ Bouncer::onClose() { _configFetcher->close(); _component.getStateUpdater().removeStateListener(*this); - std::lock_guard guard(_lock); - _closed = true; } void @@ -88,7 +86,8 @@ const BouncerMetrics& Bouncer::metrics() const noexcept { } void -Bouncer::validateConfig(const vespa::config::content::core::StorBouncerConfig& newConfig) const +Bouncer::validateConfig( + const vespa::config::content::core::StorBouncerConfig& newConfig) const { if (newConfig.feedRejectionPriorityThreshold != -1) { if (newConfig.feedRejectionPriorityThreshold @@ -113,11 +112,12 @@ void Bouncer::append_node_identity(std::ostream& target_stream) const { } void -Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, const lib::State& state) +Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, + const lib::State& state) { // If we're not up or retired, fail due to this nodes state. std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply()); + static_cast<api::StorageCommand&>(msg).makeReply().release()); std::ostringstream ost; ost << "We don't allow command of type " << msg.getType() << " when node is in state " << state.toString(true); @@ -128,7 +128,8 @@ Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, const lib::Sta } void -Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, int maxClockSkewInSeconds) +Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, + int maxClockSkewInSeconds) { auto& as_cmd = dynamic_cast<api::StorageCommand&>(msg); std::ostringstream ost; @@ -139,7 +140,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, int maxCloc as_cmd.getSourceIndex(), ost.str().c_str()); _metrics->clock_skew_aborts.inc(); - std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply()); + std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply().release()); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str())); sendUp(reply); } @@ -147,7 +148,8 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, int maxCloc void Bouncer::abortCommandDueToClusterDown(api::StorageMessage& msg, const lib::State& cluster_state) { - std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply()); + std::shared_ptr<api::StorageReply> reply( + static_cast<api::StorageCommand&>(msg).makeReply().release()); std::ostringstream ost; ost << "We don't allow external load while cluster is in state " << cluster_state.toString(true); @@ -170,35 +172,35 @@ uint64_t Bouncer::extractMutationTimestampIfAny(const api::StorageMessage& msg) { switch (msg.getType().getId()) { - case api::MessageType::PUT_ID: - return static_cast<const api::PutCommand&>(msg).getTimestamp(); - case api::MessageType::REMOVE_ID: - return static_cast<const api::RemoveCommand&>(msg).getTimestamp(); - case api::MessageType::UPDATE_ID: - return static_cast<const api::UpdateCommand&>(msg).getTimestamp(); - default: - return 0; + case api::MessageType::PUT_ID: + return static_cast<const api::PutCommand&>(msg).getTimestamp(); + case api::MessageType::REMOVE_ID: + return static_cast<const api::RemoveCommand&>(msg).getTimestamp(); + case api::MessageType::UPDATE_ID: + return static_cast<const api::UpdateCommand&>(msg).getTimestamp(); + default: + return 0; } } bool -Bouncer::isExternalLoad(const api::MessageType& type) noexcept +Bouncer::isExternalLoad(const api::MessageType& type) const noexcept { switch (type.getId()) { - case api::MessageType::PUT_ID: - case api::MessageType::REMOVE_ID: - case api::MessageType::UPDATE_ID: - case api::MessageType::GET_ID: - case api::MessageType::VISITOR_CREATE_ID: - case api::MessageType::STATBUCKET_ID: - return true; - default: - return false; + case api::MessageType::PUT_ID: + case api::MessageType::REMOVE_ID: + case api::MessageType::UPDATE_ID: + case api::MessageType::GET_ID: + case api::MessageType::VISITOR_CREATE_ID: + case api::MessageType::STATBUCKET_ID: + return true; + default: + return false; } } bool -Bouncer::isExternalWriteOperation(const api::MessageType& type) noexcept { +Bouncer::isExternalWriteOperation(const api::MessageType& type) const noexcept { switch (type.getId()) { case api::MessageType::PUT_ID: case api::MessageType::REMOVE_ID: @@ -214,7 +216,8 @@ Bouncer::rejectDueToInsufficientPriority( api::StorageMessage& msg, api::StorageMessage::Priority feedPriorityLowerBound) { - std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply()); + std::shared_ptr<api::StorageReply> reply( + static_cast<api::StorageCommand&>(msg).makeReply().release()); std::ostringstream ost; ost << "Operation priority (" << int(msg.getPriority()) << ") is lower than currently configured threshold (" @@ -228,7 +231,8 @@ Bouncer::rejectDueToInsufficientPriority( void Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) { - std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply()); + std::shared_ptr<api::StorageReply> reply( + dynamic_cast<api::StorageCommand&>(msg).makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, vespalib::make_string("Operation bucket %s has too few bits used (%u < minimum of %u)", msg.getBucketId().toString().c_str(), @@ -237,22 +241,31 @@ Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) { sendUp(reply); } -void -Bouncer::reject_due_to_node_shutdown(api::StorageMessage& msg) { - std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply()); - reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down")); - sendUp(reply); -} - bool Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg) { + const api::MessageType& type(msg->getType()); + // All replies can come in. + if (type.isReply()) { + return false; + } + + switch (type.getId()) { + case api::MessageType::SETNODESTATE_ID: + case api::MessageType::GETNODESTATE_ID: + case api::MessageType::SETSYSTEMSTATE_ID: + case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: + case api::MessageType::NOTIFYBUCKETCHANGE_ID: + // state commands are always ok + return false; + default: + break; + } const lib::State* state; int maxClockSkewInSeconds; bool isInAvailableState; bool abortLoadWhenClusterDown; - bool closed; - const lib::State* cluster_state; + const lib::State *cluster_state; int feedPriorityLowerBound; { std::lock_guard lock(_lock); @@ -262,34 +275,7 @@ Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg) cluster_state = _clusterState; isInAvailableState = state->oneOf(_config->stopAllLoadWhenNodestateNotIn.c_str()); feedPriorityLowerBound = _config->feedRejectionPriorityThreshold; - closed = _closed; - } - const api::MessageType& type = msg->getType(); - // If the node is shutting down, we want to prevent _any_ messages from reaching - // components further down the call chain. This means this case must be handled - // _before_ any logic that explicitly allows through certain message types. - if (closed) [[unlikely]] { - if (!type.isReply()) { - reject_due_to_node_shutdown(*msg); - } // else: swallow all replies - return true; } - // All replies can come in. - if (type.isReply()) { - return false; - } - switch (type.getId()) { - case api::MessageType::SETNODESTATE_ID: - case api::MessageType::GETNODESTATE_ID: - case api::MessageType::SETSYSTEMSTATE_ID: - case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: - case api::MessageType::NOTIFYBUCKETCHANGE_ID: - // state commands are always ok - return false; - default: - break; - } - // Special case for point lookup Gets while node is in maintenance mode // to allow reads to complete during two-phase cluster state transitions if ((*state == lib::State::MAINTENANCE) && (type.getId() == api::MessageType::GET_ID) && clusterIsUp(*cluster_state)) { diff --git a/storage/src/vespa/storage/storageserver/bouncer.h b/storage/src/vespa/storage/storageserver/bouncer.h index 1038e94ee94..78f07f10316 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.h +++ b/storage/src/vespa/storage/storageserver/bouncer.h @@ -41,7 +41,6 @@ class Bouncer : public StorageLink, const lib::State* _clusterState; std::unique_ptr<config::ConfigFetcher> _configFetcher; std::unique_ptr<BouncerMetrics> _metrics; - bool _closed; public: Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & configUri); @@ -61,12 +60,11 @@ private: void abortCommandDueToClusterDown(api::StorageMessage&, const lib::State&); void rejectDueToInsufficientPriority(api::StorageMessage&, api::StorageMessage::Priority); void reject_due_to_too_few_bucket_bits(api::StorageMessage&); - void reject_due_to_node_shutdown(api::StorageMessage&); static bool clusterIsUp(const lib::State& cluster_state); bool isDistributor() const; - static bool isExternalLoad(const api::MessageType&) noexcept; - static bool isExternalWriteOperation(const api::MessageType&) noexcept; - static bool priorityRejectionIsEnabled(int configuredPriority) noexcept { + bool isExternalLoad(const api::MessageType&) const noexcept; + bool isExternalWriteOperation(const api::MessageType&) const noexcept; + bool priorityRejectionIsEnabled(int configuredPriority) const noexcept { return (configuredPriority != -1); } @@ -74,7 +72,7 @@ private: * If msg is a command containing a mutating timestamp (put, remove or * update commands), return that timestamp. Otherwise, return 0. */ - static uint64_t extractMutationTimestampIfAny(const api::StorageMessage& msg); + uint64_t extractMutationTimestampIfAny(const api::StorageMessage& msg); bool onDown(const std::shared_ptr<api::StorageMessage>&) override; void handleNewState() noexcept override; const lib::NodeState &getDerivedNodeState(document::BucketSpace bucketSpace) const; diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 37ee7cc2301..610d9c8d707 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -217,7 +217,7 @@ convert_to_rpc_compression_config(const vespa::config::content::core::StorCommun } CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) - : StorageLink("Communication manager", true), // Explicitly allow msg down during flushing (will be bounced) + : StorageLink("Communication manager"), _component(compReg, "communicationmanager"), _metrics(), _shared_rpc_resources(), // Created upon initial configuration @@ -278,25 +278,25 @@ CommunicationManager::onClose() // Avoid getting config during shutdown _configFetcher.reset(); - _closed.store(true, std::memory_order_seq_cst); + _closed = true; + + if (_mbus) { + if (_messageBusSession) { + _messageBusSession->close(); + } + } + + // TODO remove? this no longer has any particularly useful semantics if (_cc_rpc_service) { - _cc_rpc_service->close(); // Auto-abort all incoming CC RPC requests from now on + _cc_rpc_service->close(); } - // Sync all RPC threads to ensure that any subsequent RPCs must observe the closed-flags we just set + // TODO do this after we drain queues? if (_shared_rpc_resources) { - _shared_rpc_resources->sync_all_threads(); - } - - if (_mbus && _messageBusSession) { - // Closing the mbus session unregisters the destination session and syncs the worker - // thread(s), so once this call returns we should not observe further incoming requests - // through this pipeline. Previous messages may already be in flight internally; these - // will be handled by flushing-phases. - _messageBusSession->close(); + _shared_rpc_resources->shutdown(); } - // Stopping internal message dispatch thread should stop all incoming _async_ messages - // from being processed. _Synchronously_ dispatched RPCs are still passing through. + // Stopping pumper thread should stop all incoming messages from being + // processed. if (_thread) { _thread->interrupt(); _eventQueue.signal(); @@ -305,12 +305,13 @@ CommunicationManager::onClose() } // Emptying remaining queued messages + // FIXME but RPC/mbus is already shut down at this point...! Make sure we handle this std::shared_ptr<api::StorageMessage> msg; api::ReturnCode code(api::ReturnCode::ABORTED, "Node shutting down"); while (_eventQueue.size() > 0) { assert(_eventQueue.getNext(msg, 0ms)); if (!msg->getType().isReply()) { - std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(*msg).makeReply()); + std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(*msg).makeReply()); reply->setResult(code); sendReply(reply); } @@ -318,29 +319,6 @@ CommunicationManager::onClose() } void -CommunicationManager::onFlush(bool downwards) -{ - if (downwards) { - // Sync RPC threads once more (with feeling!) to ensure that any closing done by other components - // during the storage chain onClose() is visible to these. - if (_shared_rpc_resources) { - _shared_rpc_resources->sync_all_threads(); - } - // By this point, no inbound RPCs (requests and responses) should be allowed any further down - // than the Bouncer component, where they will be, well, bounced. - } else { - // All components further down the storage chain should now be completely closed - // and flushed, and all message-dispatching threads should have been shut down. - // It's possible that the RPC threads are still butting heads up against the Bouncer - // component, so we conclude the shutdown ceremony by taking down the RPC subsystem. - // This transitively waits for all RPC threads to complete. - if (_shared_rpc_resources) { - _shared_rpc_resources->shutdown(); - } - } -} - -void CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig& cfg) { const bool isDist(_component.getNodeType() == lib::NodeType::DISTRIBUTOR); @@ -460,15 +438,11 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) } } -// Called directly by RPC threads void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); - // If process is shutting down, msg will be synchronously aborted by the Bouncer component process(msg); } -// Called directly by RPC threads (for incoming CC requests) and by any other request-dispatching -// threads (i.e. calling sendUp) when address resolution fails and an internal error response is generated. void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Enqueued dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); _eventQueue.enqueue(std::move(msg)); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 3c986c59c5e..da45124ed2d 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -89,7 +89,6 @@ private: void onOpen() override; void onClose() override; - void onFlush(bool downwards) override; void process(const std::shared_ptr<api::StorageMessage>& msg); diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index eb933f5eb2c..172084662e2 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -105,10 +105,6 @@ void SharedRpcResources::wait_until_slobrok_is_ready() { } } -void SharedRpcResources::sync_all_threads() { - _transport->sync(); -} - void SharedRpcResources::shutdown() { assert(!_shutdown); if (listen_port() > 0) { diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h index d8f7eefad53..1da89dd8869 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h @@ -42,8 +42,6 @@ public: // To be called after all RPC handlers have been registered. void start_server_and_register_slobrok(vespalib::stringref my_handle); - void sync_all_threads(); - void shutdown(); [[nodiscard]] int listen_port() const noexcept; // Only valid if server has been started diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 452a94496af..3231deef268 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -358,7 +358,7 @@ StorageNode::shutdown() { // Try to shut down in opposite order of initialize. Bear in mind that // we might be shutting down after init exception causing only parts - // of the server to have been initialized + // of the server to have initialize LOG(debug, "Shutting down storage node of type %s", getNodeType().toString().c_str()); if (!attemptedStopped()) { LOG(debug, "Storage killed before requestShutdown() was called. No " |