diff options
5 files changed, 50 insertions, 2 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index f8462d528c7..05dc1c642d0 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -45,6 +45,24 @@ struct CommunicationManagerTest : Test { } }; +namespace { + +void +wait_for_slobrok_visibility(const CommunicationManager& mgr, + const api::StorageMessageAddress& addr) +{ + const auto deadline = vespalib::steady_clock::now() + 60s; + do { + if (mgr.address_visible_in_slobrok(addr)) { + return; + } + std::this_thread::sleep_for(10ms); + } while (vespalib::steady_clock::now() < deadline); + FAIL() << "Timed out waiting for address " << addr.toString() << " to be visible in Slobrok"; +} + +} + TEST_F(CommunicationManagerTest, simple) { mbus::Slobrok slobrok; vdstestlib::DirConfig distConfig(getStandardConfig(false)); @@ -70,12 +88,19 @@ TEST_F(CommunicationManagerTest, simple) { distributor.open(); storage.open(); - std::this_thread::sleep_for(1s); + auto stor_addr = api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1); + auto distr_addr = api::StorageMessageAddress::create(&_Storage, lib::NodeType::DISTRIBUTOR, 1); + // It is undefined when the logical nodes will be visible in each others Slobrok + // mirrors, so explicitly wait until mutual visibility is ensured. Failure to do this + // might cause the below message to be immediately bounced due to failing to map the + // storage address to an actual RPC endpoint. + ASSERT_NO_FATAL_FAILURE(wait_for_slobrok_visibility(distributor, stor_addr)); + ASSERT_NO_FATAL_FAILURE(wait_for_slobrok_visibility(storage, distr_addr)); // Send a message through from distributor to storage auto cmd = std::make_shared<api::GetCommand>( makeDocumentBucket(document::BucketId(0)), document::DocumentId("id:ns:mytype::mydoc"), document::AllFields::NAME); - cmd->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + cmd->setAddress(stor_addr); distributorLink->sendUp(cmd); storageLink->waitForMessages(1, MESSAGE_WAIT_TIME_SEC); ASSERT_GT(storageLink->getNumCommands(), 0); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 237dc76d783..975e9361072 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -6,6 +6,7 @@ #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/network/rpcnetworkparams.h> #include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/slobrok/sbmirror.h> #include <vespa/storage/common/bucket_resolver.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/config/config-stor-server.h> @@ -806,4 +807,11 @@ void CommunicationManager::updateBucketSpacesConfig(const BucketspacesConfig& co _docApiConverter.setBucketResolver(ConfigurableBucketResolver::from_config(config)); } +bool +CommunicationManager::address_visible_in_slobrok(const api::StorageMessageAddress& addr) const noexcept +{ + assert(_storage_api_rpc_service); + return _storage_api_rpc_service->address_visible_in_slobrok_uncached(addr); +} + } // storage diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 80117b32030..31c6fa00f0e 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -164,6 +164,10 @@ public: void updateBucketSpacesConfig(const BucketspacesConfig&); const CommunicationManagerMetrics& metrics() const noexcept { return _metrics; } + + // Intended primarily for unit tests that fire up multiple nodes and must wait until all + // nodes are cross-visible in Slobrok before progressing. + [[nodiscard]] bool address_visible_in_slobrok(const api::StorageMessageAddress& addr) const noexcept; }; } // storage diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index 06323199341..78a9956c334 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -394,6 +394,15 @@ bool StorageApiRpcService::target_supports_direct_rpc( return _direct_rpc_supported.load(std::memory_order_relaxed); } +bool +StorageApiRpcService::address_visible_in_slobrok_uncached( + const api::StorageMessageAddress& addr) const noexcept +{ + auto sb_id = CachingRpcTargetResolver::address_to_slobrok_id(addr); + auto specs = _rpc_resources.slobrok_mirror().lookup(sb_id); + return !specs.empty(); +} + /* * Major TODOs: diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index 94bf663837c..2526cf5434c 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -56,6 +56,8 @@ public: ~StorageApiRpcService() override; [[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept; + // Bypasses resolver cache and returns whether local Slobrok mirror has at least 1 spec for the given address. + [[nodiscard]] bool address_visible_in_slobrok_uncached(const api::StorageMessageAddress& addr) const noexcept; void RPC_rpc_v1_send(FRT_RPCRequest* req); void encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply); |