diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-12-20 12:15:32 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-20 12:15:32 +0100 |
commit | 2d82ff0d3a0f95a3b19c61e318a3cb3dfa6a2491 (patch) | |
tree | f03103dc0a2c0fe21795448956f016f690b3b9c9 /config | |
parent | 341f28abc688e5891aff2701f817159b4b01943f (diff) | |
parent | 2eea867391eb583affa921774f43c6a3a368c03a (diff) |
Merge pull request #25301 from vespa-engine/balder/allow-multiple-concurrent-requests
Balder/allow multiple concurrent requests [run-systemtest]
Diffstat (limited to 'config')
-rw-r--r-- | config/src/vespa/config/frt/frtsource.cpp | 59 | ||||
-rw-r--r-- | config/src/vespa/config/frt/frtsource.h | 10 |
2 files changed, 55 insertions, 14 deletions
diff --git a/config/src/vespa/config/frt/frtsource.cpp b/config/src/vespa/config/frt/frtsource.cpp index 1cb514dcd4d..6030b27da02 100644 --- a/config/src/vespa/config/frt/frtsource.cpp +++ b/config/src/vespa/config/frt/frtsource.cpp @@ -35,9 +35,9 @@ FRTSource::FRTSource(std::shared_ptr<ConnectionFactory> connectionFactory, const : _connectionFactory(std::move(connectionFactory)), _requestFactory(requestFactory), _agent(std::move(agent)), - _currentRequest(), _key(key), _lock(), + _inflight(), _task(std::make_unique<GetConfigTask>(_connectionFactory->getScheduler(), this)), _closed(false) { @@ -66,26 +66,57 @@ FRTSource::getConfig() std::unique_ptr<FRTConfigRequest> request = _requestFactory.createConfigRequest(_key, connection, state, serverTimeout); FRT_RPCRequest * req = request->getRequest(); - - _currentRequest = std::move(request); + { + std::lock_guard guard(_lock); + _inflight[req] = std::move(request); + } connection->invoke(req, clientTimeout, this); } +void +FRTSource::erase(FRT_RPCRequest * request) { + std::lock_guard guard(_lock); + auto num_erased = _inflight.erase(request); + assert(1u == num_erased); + _cond.notify_all(); +} + +std::shared_ptr<FRTConfigRequest> +FRTSource::find(FRT_RPCRequest * request) { + std::lock_guard guard(_lock); + auto found = _inflight.find(request); + assert(found != _inflight.end()); + return found->second; +} + +class FRTSource::CleanupGuard { +public: + CleanupGuard(FRTSource * frtSource, FRT_RPCRequest * request) + : _frtSource(frtSource), _request(request) {} + ~CleanupGuard() { + _frtSource->erase(_request); + } +private: + FRTSource * _frtSource; + FRT_RPCRequest * _request; +}; + void FRTSource::RequestDone(FRT_RPCRequest * request) { + FRTSource::CleanupGuard cleanup(this, request); if (request->GetErrorCode() == FRTE_RPC_ABORT) { LOG(debug, "request aborted, stopping"); return; } - assert(_currentRequest); + std::shared_ptr<FRTConfigRequest> configRequest = find(request); // If this was error from FRT side and nothing to do with config, notify // connection about the error. if (request->IsError()) { - _currentRequest->setError(request->GetErrorCode()); + configRequest->setError(request->GetErrorCode()); } - _agent->handleResponse(*_currentRequest, _currentRequest->createResponse(request)); + _agent->handleResponse(*configRequest, configRequest->createResponse(request)); LOG(spam, "Calling schedule"); scheduleNextGetConfig(); } @@ -93,19 +124,25 @@ FRTSource::RequestDone(FRT_RPCRequest * request) void FRTSource::close() { + RequestMap inflight; { std::lock_guard guard(_lock); if (_closed) return; LOG(spam, "Killing task"); _task->Kill(); + inflight = _inflight; } LOG(spam, "Aborting"); - if (_currentRequest.get() != NULL) - _currentRequest->abort(); - LOG(spam, "Syncing"); - _connectionFactory->syncTransport(); - _currentRequest.reset(0); + for (auto & request : inflight) { + request.second->abort(); + } + inflight.clear(); + LOG(spam, "Waiting"); + std::unique_lock guard(_lock); + while (!_inflight.empty()) { + _cond.wait(guard); + } LOG(spam, "closed"); } diff --git a/config/src/vespa/config/frt/frtsource.h b/config/src/vespa/config/frt/frtsource.h index 1885aa7e534..ceb0484a6bb 100644 --- a/config/src/vespa/config/frt/frtsource.h +++ b/config/src/vespa/config/frt/frtsource.h @@ -27,15 +27,19 @@ public: void reload(int64_t generation) override; void getConfig() override; private: + class CleanupGuard; void scheduleNextGetConfig(); + void erase(FRT_RPCRequest *); + std::shared_ptr<FRTConfigRequest> find(FRT_RPCRequest *); + using RequestMap = std::map<FRT_RPCRequest *, std::shared_ptr<FRTConfigRequest>>; std::shared_ptr<ConnectionFactory> _connectionFactory; const FRTConfigRequestFactory & _requestFactory; std::unique_ptr<ConfigAgent> _agent; - std::unique_ptr<FRTConfigRequest> _currentRequest; const ConfigKey _key; - - std::mutex _lock; // Protects _task and _closed + std::mutex _lock; // Protects _inflight, _task and _closed + std::condition_variable _cond; + RequestMap _inflight; std::unique_ptr<FNET_Task> _task; bool _closed; }; |