summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-12-19 13:55:30 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-12-19 13:59:33 +0000
commitf2242995a019ec23640e98976ff7a3937913e403 (patch)
tree5ab0d8765eea8a62f481cb4a8080bc18fe71fa2f
parent408d4a4aa6bc5896a2ea73164ddb4cae780e6890 (diff)
- Allow the FRTSource to handle multiple concurrent requests by using a guarded map.
- Also wait for all requests to finish and avoid transportSync().
-rw-r--r--config/src/vespa/config/frt/frtsource.cpp40
-rw-r--r--config/src/vespa/config/frt/frtsource.h7
2 files changed, 33 insertions, 14 deletions
diff --git a/config/src/vespa/config/frt/frtsource.cpp b/config/src/vespa/config/frt/frtsource.cpp
index 1cb514dcd4d..d58be3c2ae4 100644
--- a/config/src/vespa/config/frt/frtsource.cpp
+++ b/config/src/vespa/config/frt/frtsource.cpp
@@ -35,9 +35,9 @@ FRTSource::FRTSource(std::shared_ptr<ConnectionFactory> connectionFactory, const
: _connectionFactory(std::move(connectionFactory)),
_requestFactory(requestFactory),
_agent(std::move(agent)),
- _currentRequest(),
_key(key),
_lock(),
+ _inflight(),
_task(std::make_unique<GetConfigTask>(_connectionFactory->getScheduler(), this)),
_closed(false)
{
@@ -66,8 +66,10 @@ FRTSource::getConfig()
std::unique_ptr<FRTConfigRequest> request = _requestFactory.createConfigRequest(_key, connection, state, serverTimeout);
FRT_RPCRequest * req = request->getRequest();
-
- _currentRequest = std::move(request);
+ {
+ std::lock_guard guard(_lock);
+ _inflight[req] = std::move(request);
+ }
connection->invoke(req, clientTimeout, this);
}
@@ -79,13 +81,25 @@ FRTSource::RequestDone(FRT_RPCRequest * request)
LOG(debug, "request aborted, stopping");
return;
}
- assert(_currentRequest);
+ std::shared_ptr<FRTConfigRequest> configRequest;
+ {
+ auto found = _inflight.find(request);
+ if (found == _inflight.end()) {
+ assert(false);
+ }
+ configRequest = found->second;
+ }
// If this was error from FRT side and nothing to do with config, notify
// connection about the error.
if (request->IsError()) {
- _currentRequest->setError(request->GetErrorCode());
+ configRequest->setError(request->GetErrorCode());
+ }
+ _agent->handleResponse(*configRequest, configRequest->createResponse(request));
+ {
+ std::lock_guard guard(_lock);
+ _inflight.erase(request);
+ _cond.notify_all();
}
- _agent->handleResponse(*_currentRequest, _currentRequest->createResponse(request));
LOG(spam, "Calling schedule");
scheduleNextGetConfig();
}
@@ -93,19 +107,23 @@ FRTSource::RequestDone(FRT_RPCRequest * request)
void
FRTSource::close()
{
+ RequestMap inflight;
{
std::lock_guard guard(_lock);
if (_closed)
return;
LOG(spam, "Killing task");
_task->Kill();
+ inflight = _inflight;
}
LOG(spam, "Aborting");
- if (_currentRequest.get() != NULL)
- _currentRequest->abort();
- LOG(spam, "Syncing");
- _connectionFactory->syncTransport();
- _currentRequest.reset(0);
+ for (auto & request : inflight) {
+ std::move(request.second)->abort();
+ }
+ inflight.clear();
+ LOG(spam, "Waiting");
+ std::unique_lock guard(_lock);
+ while (!_inflight.empty()) _cond.wait(guard);
LOG(spam, "closed");
}
diff --git a/config/src/vespa/config/frt/frtsource.h b/config/src/vespa/config/frt/frtsource.h
index 1885aa7e534..5c878730db2 100644
--- a/config/src/vespa/config/frt/frtsource.h
+++ b/config/src/vespa/config/frt/frtsource.h
@@ -29,13 +29,14 @@ public:
private:
void scheduleNextGetConfig();
+ using RequestMap = std::map<FRT_RPCRequest *, std::shared_ptr<FRTConfigRequest>>;
std::shared_ptr<ConnectionFactory> _connectionFactory;
const FRTConfigRequestFactory & _requestFactory;
std::unique_ptr<ConfigAgent> _agent;
- std::unique_ptr<FRTConfigRequest> _currentRequest;
const ConfigKey _key;
-
- std::mutex _lock; // Protects _task and _closed
+ std::mutex _lock; // Protects _requests, _task and _closed
+ std::condition_variable _cond;
+ RequestMap _inflight;
std::unique_ptr<FNET_Task> _task;
bool _closed;
};