summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-02-28 14:14:30 +0100
committerGitHub <noreply@github.com>2022-02-28 14:14:30 +0100
commitf77539dc41418af0305ba863bef3299de6f8f18d (patch)
tree22ca7d91e02fc999b925cac9efe5dd41d138c466 /storage
parent5a51cc51bf80f4d7c1afa1993e7df6c50a6e3d71 (diff)
parentd6fe74c8a798bb3b59c22d01579818ab78cbeebd (diff)
Merge pull request #21408 from vespa-engine/vekterli/fix-storage-slobrok-threading-edge-cases
Gracefully flush pending Slobrok task on content node RPC teardown [run-systemtest]
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/communicationmanagertest.cpp29
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h4
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp5
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h2
6 files changed, 55 insertions, 2 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
index f8462d528c7..05dc1c642d0 100644
--- a/storage/src/tests/storageserver/communicationmanagertest.cpp
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -45,6 +45,24 @@ struct CommunicationManagerTest : Test {
}
};
+namespace {
+
+void
+wait_for_slobrok_visibility(const CommunicationManager& mgr,
+ const api::StorageMessageAddress& addr)
+{
+ const auto deadline = vespalib::steady_clock::now() + 60s;
+ do {
+ if (mgr.address_visible_in_slobrok(addr)) {
+ return;
+ }
+ std::this_thread::sleep_for(10ms);
+ } while (vespalib::steady_clock::now() < deadline);
+ FAIL() << "Timed out waiting for address " << addr.toString() << " to be visible in Slobrok";
+}
+
+}
+
TEST_F(CommunicationManagerTest, simple) {
mbus::Slobrok slobrok;
vdstestlib::DirConfig distConfig(getStandardConfig(false));
@@ -70,12 +88,19 @@ TEST_F(CommunicationManagerTest, simple) {
distributor.open();
storage.open();
- std::this_thread::sleep_for(1s);
+ auto stor_addr = api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1);
+ auto distr_addr = api::StorageMessageAddress::create(&_Storage, lib::NodeType::DISTRIBUTOR, 1);
+ // It is undefined when the logical nodes will be visible in each others Slobrok
+ // mirrors, so explicitly wait until mutual visibility is ensured. Failure to do this
+ // might cause the below message to be immediately bounced due to failing to map the
+ // storage address to an actual RPC endpoint.
+ ASSERT_NO_FATAL_FAILURE(wait_for_slobrok_visibility(distributor, stor_addr));
+ ASSERT_NO_FATAL_FAILURE(wait_for_slobrok_visibility(storage, distr_addr));
// Send a message through from distributor to storage
auto cmd = std::make_shared<api::GetCommand>(
makeDocumentBucket(document::BucketId(0)), document::DocumentId("id:ns:mytype::mydoc"), document::AllFields::NAME);
- cmd->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ cmd->setAddress(stor_addr);
distributorLink->sendUp(cmd);
storageLink->waitForMessages(1, MESSAGE_WAIT_TIME_SEC);
ASSERT_GT(storageLink->getNumCommands(), 0);
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 237dc76d783..975e9361072 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -6,6 +6,7 @@
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/network/rpcnetworkparams.h>
#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/slobrok/sbmirror.h>
#include <vespa/storage/common/bucket_resolver.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/config/config-stor-server.h>
@@ -806,4 +807,11 @@ void CommunicationManager::updateBucketSpacesConfig(const BucketspacesConfig& co
_docApiConverter.setBucketResolver(ConfigurableBucketResolver::from_config(config));
}
+bool
+CommunicationManager::address_visible_in_slobrok(const api::StorageMessageAddress& addr) const noexcept
+{
+ assert(_storage_api_rpc_service);
+ return _storage_api_rpc_service->address_visible_in_slobrok_uncached(addr);
+}
+
} // storage
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 80117b32030..31c6fa00f0e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -164,6 +164,10 @@ public:
void updateBucketSpacesConfig(const BucketspacesConfig&);
const CommunicationManagerMetrics& metrics() const noexcept { return _metrics; }
+
+ // Intended primarily for unit tests that fire up multiple nodes and must wait until all
+ // nodes are cross-visible in Slobrok before progressing.
+ [[nodiscard]] bool address_visible_in_slobrok(const api::StorageMessageAddress& addr) const noexcept;
};
} // storage
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index e1a2dc6b03c..7ad59ee574c 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -110,7 +110,12 @@ void SharedRpcResources::shutdown() {
assert(!_shutdown);
if (listen_port() > 0) {
_slobrok_register->unregisterName(_handle);
+ // Give slobrok some time to dispatch unregister RPC
+ while (_slobrok_register->busy()) {
+ std::this_thread::sleep_for(10ms);
+ }
}
+ _slobrok_register.reset(); // Implicitly kill any pending slobrok tasks prior to shutting down transport layer
_transport->ShutDown(true);
// FIXME need to reset to break weak_ptrs? But ShutDown should already sync pending resolves...!
_shutdown = true;
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index 06323199341..78a9956c334 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -394,6 +394,15 @@ bool StorageApiRpcService::target_supports_direct_rpc(
return _direct_rpc_supported.load(std::memory_order_relaxed);
}
+bool
+StorageApiRpcService::address_visible_in_slobrok_uncached(
+ const api::StorageMessageAddress& addr) const noexcept
+{
+ auto sb_id = CachingRpcTargetResolver::address_to_slobrok_id(addr);
+ auto specs = _rpc_resources.slobrok_mirror().lookup(sb_id);
+ return !specs.empty();
+}
+
/*
* Major TODOs:
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index 94bf663837c..2526cf5434c 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -56,6 +56,8 @@ public:
~StorageApiRpcService() override;
[[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept;
+ // Bypasses resolver cache and returns whether local Slobrok mirror has at least 1 spec for the given address.
+ [[nodiscard]] bool address_visible_in_slobrok_uncached(const api::StorageMessageAddress& addr) const noexcept;
void RPC_rpc_v1_send(FRT_RPCRequest* req);
void encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply);