From 553ae81bbfb5b69ca94561f0a31d11619c0f46e7 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 20 Dec 2022 06:37:17 +0000 Subject: - Allow the FRTSource to handle multiple concurrent requests by using a guarded map. - Also wait for all requests to finish and avoid transportSync(). --- config/src/vespa/config/frt/frtsource.cpp | 39 ++++++++++++++++++++++--------- config/src/vespa/config/frt/frtsource.h | 7 +++--- 2 files changed, 32 insertions(+), 14 deletions(-) (limited to 'config') diff --git a/config/src/vespa/config/frt/frtsource.cpp b/config/src/vespa/config/frt/frtsource.cpp index 1cb514dcd4d..11839ac0b50 100644 --- a/config/src/vespa/config/frt/frtsource.cpp +++ b/config/src/vespa/config/frt/frtsource.cpp @@ -35,9 +35,9 @@ FRTSource::FRTSource(std::shared_ptr connectionFactory, const : _connectionFactory(std::move(connectionFactory)), _requestFactory(requestFactory), _agent(std::move(agent)), - _currentRequest(), _key(key), _lock(), + _inflight(), _task(std::make_unique(_connectionFactory->getScheduler(), this)), _closed(false) { @@ -66,8 +66,10 @@ FRTSource::getConfig() std::unique_ptr request = _requestFactory.createConfigRequest(_key, connection, state, serverTimeout); FRT_RPCRequest * req = request->getRequest(); - - _currentRequest = std::move(request); + { + std::lock_guard guard(_lock); + _inflight[req] = std::move(request); + } connection->invoke(req, clientTimeout, this); } @@ -79,13 +81,24 @@ FRTSource::RequestDone(FRT_RPCRequest * request) LOG(debug, "request aborted, stopping"); return; } - assert(_currentRequest); + std::shared_ptr configRequest; + { + std::lock_guard guard(_lock); + auto found = _inflight.find(request); + assert(found != _inflight.end()); + configRequest = found->second; + } // If this was error from FRT side and nothing to do with config, notify // connection about the error. if (request->IsError()) { - _currentRequest->setError(request->GetErrorCode()); + configRequest->setError(request->GetErrorCode()); + } + _agent->handleResponse(*configRequest, configRequest->createResponse(request)); + { + std::lock_guard guard(_lock); + _inflight.erase(request); + _cond.notify_all(); } - _agent->handleResponse(*_currentRequest, _currentRequest->createResponse(request)); LOG(spam, "Calling schedule"); scheduleNextGetConfig(); } @@ -93,19 +106,23 @@ FRTSource::RequestDone(FRT_RPCRequest * request) void FRTSource::close() { + RequestMap inflight; { std::lock_guard guard(_lock); if (_closed) return; LOG(spam, "Killing task"); _task->Kill(); + inflight = _inflight; } LOG(spam, "Aborting"); - if (_currentRequest.get() != NULL) - _currentRequest->abort(); - LOG(spam, "Syncing"); - _connectionFactory->syncTransport(); - _currentRequest.reset(0); + for (auto & request : inflight) { + std::move(request.second)->abort(); + } + inflight.clear(); + LOG(spam, "Waiting"); + std::unique_lock guard(_lock); + while (!_inflight.empty()) _cond.wait(guard); LOG(spam, "closed"); } diff --git a/config/src/vespa/config/frt/frtsource.h b/config/src/vespa/config/frt/frtsource.h index 1885aa7e534..104b7318d8d 100644 --- a/config/src/vespa/config/frt/frtsource.h +++ b/config/src/vespa/config/frt/frtsource.h @@ -29,13 +29,14 @@ public: private: void scheduleNextGetConfig(); + using RequestMap = std::map>; std::shared_ptr _connectionFactory; const FRTConfigRequestFactory & _requestFactory; std::unique_ptr _agent; - std::unique_ptr _currentRequest; const ConfigKey _key; - - std::mutex _lock; // Protects _task and _closed + std::mutex _lock; // Protects _inflight, _task and _closed + std::condition_variable _cond; + RequestMap _inflight; std::unique_ptr _task; bool _closed; }; -- cgit v1.2.3 From f3956e6144fdb4fc05b4d61cab45b77a89dca48a Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 20 Dec 2022 06:40:17 +0000 Subject: Also drop aborted requests. --- config/src/vespa/config/frt/frtsource.cpp | 33 +++++++++++++++++++------------ config/src/vespa/config/frt/frtsource.h | 2 ++ 2 files changed, 22 insertions(+), 13 deletions(-) (limited to 'config') diff --git a/config/src/vespa/config/frt/frtsource.cpp b/config/src/vespa/config/frt/frtsource.cpp index 11839ac0b50..ea0b514e168 100644 --- a/config/src/vespa/config/frt/frtsource.cpp +++ b/config/src/vespa/config/frt/frtsource.cpp @@ -73,32 +73,37 @@ FRTSource::getConfig() connection->invoke(req, clientTimeout, this); } +void +FRTSource::erase(FRT_RPCRequest * request) { + std::lock_guard guard(_lock); + _inflight.erase(request); + _cond.notify_all(); +} + +std::shared_ptr +FRTSource::find(FRT_RPCRequest * request) { + std::lock_guard guard(_lock); + auto found = _inflight.find(request); + assert(found != _inflight.end()); + return found->second; +} void FRTSource::RequestDone(FRT_RPCRequest * request) { if (request->GetErrorCode() == FRTE_RPC_ABORT) { LOG(debug, "request aborted, stopping"); + erase(request); return; } - std::shared_ptr configRequest; - { - std::lock_guard guard(_lock); - auto found = _inflight.find(request); - assert(found != _inflight.end()); - configRequest = found->second; - } + std::shared_ptr configRequest = find(request); // If this was error from FRT side and nothing to do with config, notify // connection about the error. if (request->IsError()) { configRequest->setError(request->GetErrorCode()); } _agent->handleResponse(*configRequest, configRequest->createResponse(request)); - { - std::lock_guard guard(_lock); - _inflight.erase(request); - _cond.notify_all(); - } + erase(request); LOG(spam, "Calling schedule"); scheduleNextGetConfig(); } @@ -122,7 +127,9 @@ FRTSource::close() inflight.clear(); LOG(spam, "Waiting"); std::unique_lock guard(_lock); - while (!_inflight.empty()) _cond.wait(guard); + while (!_inflight.empty()) { + _cond.wait(guard); + } LOG(spam, "closed"); } diff --git a/config/src/vespa/config/frt/frtsource.h b/config/src/vespa/config/frt/frtsource.h index 104b7318d8d..8bc654dd05a 100644 --- a/config/src/vespa/config/frt/frtsource.h +++ b/config/src/vespa/config/frt/frtsource.h @@ -28,6 +28,8 @@ public: void getConfig() override; private: void scheduleNextGetConfig(); + void erase(FRT_RPCRequest *); + std::shared_ptr find(FRT_RPCRequest *); using RequestMap = std::map>; std::shared_ptr _connectionFactory; -- cgit v1.2.3 From d4ff7d34d3d60ff7da1323e4ddb7308efadecdd9 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 20 Dec 2022 10:31:40 +0000 Subject: Use a guard to ensure cleanup is done in all cases, and the last thing done before leaving method. --- config/src/vespa/config/frt/frtsource.cpp | 20 ++++++++++++++++---- config/src/vespa/config/frt/frtsource.h | 1 + 2 files changed, 17 insertions(+), 4 deletions(-) (limited to 'config') diff --git a/config/src/vespa/config/frt/frtsource.cpp b/config/src/vespa/config/frt/frtsource.cpp index ea0b514e168..08eeb5171f6 100644 --- a/config/src/vespa/config/frt/frtsource.cpp +++ b/config/src/vespa/config/frt/frtsource.cpp @@ -76,7 +76,7 @@ FRTSource::getConfig() void FRTSource::erase(FRT_RPCRequest * request) { std::lock_guard guard(_lock); - _inflight.erase(request); + assert(1 == _inflight.erase(request)); _cond.notify_all(); } @@ -88,12 +88,25 @@ FRTSource::find(FRT_RPCRequest * request) { return found->second; } +class FRTSource::CleanupGuard { +public: + CleanupGuard(FRTSource * frtSource, FRT_RPCRequest * request) + : _frtSource(frtSource), _request(request) {} + ~CleanupGuard() { + _frtSource->erase(_request); + } +private: + FRTSource * _frtSource; + FRT_RPCRequest * _request; +}; + + void FRTSource::RequestDone(FRT_RPCRequest * request) { + FRTSource::CleanupGuard cleanup(this, request); if (request->GetErrorCode() == FRTE_RPC_ABORT) { LOG(debug, "request aborted, stopping"); - erase(request); return; } std::shared_ptr configRequest = find(request); @@ -103,7 +116,6 @@ FRTSource::RequestDone(FRT_RPCRequest * request) configRequest->setError(request->GetErrorCode()); } _agent->handleResponse(*configRequest, configRequest->createResponse(request)); - erase(request); LOG(spam, "Calling schedule"); scheduleNextGetConfig(); } @@ -122,7 +134,7 @@ FRTSource::close() } LOG(spam, "Aborting"); for (auto & request : inflight) { - std::move(request.second)->abort(); + request.second->abort(); } inflight.clear(); LOG(spam, "Waiting"); diff --git a/config/src/vespa/config/frt/frtsource.h b/config/src/vespa/config/frt/frtsource.h index 8bc654dd05a..ceb0484a6bb 100644 --- a/config/src/vespa/config/frt/frtsource.h +++ b/config/src/vespa/config/frt/frtsource.h @@ -27,6 +27,7 @@ public: void reload(int64_t generation) override; void getConfig() override; private: + class CleanupGuard; void scheduleNextGetConfig(); void erase(FRT_RPCRequest *); std::shared_ptr find(FRT_RPCRequest *); -- cgit v1.2.3 From 2eea867391eb583affa921774f43c6a3a368c03a Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 20 Dec 2022 10:46:56 +0000 Subject: No active code in assert..... --- config/src/vespa/config/frt/frtsource.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'config') diff --git a/config/src/vespa/config/frt/frtsource.cpp b/config/src/vespa/config/frt/frtsource.cpp index 08eeb5171f6..6030b27da02 100644 --- a/config/src/vespa/config/frt/frtsource.cpp +++ b/config/src/vespa/config/frt/frtsource.cpp @@ -76,7 +76,8 @@ FRTSource::getConfig() void FRTSource::erase(FRT_RPCRequest * request) { std::lock_guard guard(_lock); - assert(1 == _inflight.erase(request)); + auto num_erased = _inflight.erase(request); + assert(1u == num_erased); _cond.notify_all(); } -- cgit v1.2.3