summaryrefslogtreecommitdiffstats
path: root/config
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-12-20 12:15:32 +0100
committerGitHub <noreply@github.com>2022-12-20 12:15:32 +0100
commit2d82ff0d3a0f95a3b19c61e318a3cb3dfa6a2491 (patch)
treef03103dc0a2c0fe21795448956f016f690b3b9c9 /config
parent341f28abc688e5891aff2701f817159b4b01943f (diff)
parent2eea867391eb583affa921774f43c6a3a368c03a (diff)
Merge pull request #25301 from vespa-engine/balder/allow-multiple-concurrent-requests
Balder/allow multiple concurrent requests [run-systemtest]
Diffstat (limited to 'config')
-rw-r--r--config/src/vespa/config/frt/frtsource.cpp59
-rw-r--r--config/src/vespa/config/frt/frtsource.h10
2 files changed, 55 insertions, 14 deletions
diff --git a/config/src/vespa/config/frt/frtsource.cpp b/config/src/vespa/config/frt/frtsource.cpp
index 1cb514dcd4d..6030b27da02 100644
--- a/config/src/vespa/config/frt/frtsource.cpp
+++ b/config/src/vespa/config/frt/frtsource.cpp
@@ -35,9 +35,9 @@ FRTSource::FRTSource(std::shared_ptr<ConnectionFactory> connectionFactory, const
: _connectionFactory(std::move(connectionFactory)),
_requestFactory(requestFactory),
_agent(std::move(agent)),
- _currentRequest(),
_key(key),
_lock(),
+ _inflight(),
_task(std::make_unique<GetConfigTask>(_connectionFactory->getScheduler(), this)),
_closed(false)
{
@@ -66,26 +66,57 @@ FRTSource::getConfig()
std::unique_ptr<FRTConfigRequest> request = _requestFactory.createConfigRequest(_key, connection, state, serverTimeout);
FRT_RPCRequest * req = request->getRequest();
-
- _currentRequest = std::move(request);
+ {
+ std::lock_guard guard(_lock);
+ _inflight[req] = std::move(request);
+ }
connection->invoke(req, clientTimeout, this);
}
+void
+FRTSource::erase(FRT_RPCRequest * request) {
+ std::lock_guard guard(_lock);
+ auto num_erased = _inflight.erase(request);
+ assert(1u == num_erased);
+ _cond.notify_all();
+}
+
+std::shared_ptr<FRTConfigRequest>
+FRTSource::find(FRT_RPCRequest * request) {
+ std::lock_guard guard(_lock);
+ auto found = _inflight.find(request);
+ assert(found != _inflight.end());
+ return found->second;
+}
+
+class FRTSource::CleanupGuard {
+public:
+ CleanupGuard(FRTSource * frtSource, FRT_RPCRequest * request)
+ : _frtSource(frtSource), _request(request) {}
+ ~CleanupGuard() {
+ _frtSource->erase(_request);
+ }
+private:
+ FRTSource * _frtSource;
+ FRT_RPCRequest * _request;
+};
+
void
FRTSource::RequestDone(FRT_RPCRequest * request)
{
+ FRTSource::CleanupGuard cleanup(this, request);
if (request->GetErrorCode() == FRTE_RPC_ABORT) {
LOG(debug, "request aborted, stopping");
return;
}
- assert(_currentRequest);
+ std::shared_ptr<FRTConfigRequest> configRequest = find(request);
// If this was error from FRT side and nothing to do with config, notify
// connection about the error.
if (request->IsError()) {
- _currentRequest->setError(request->GetErrorCode());
+ configRequest->setError(request->GetErrorCode());
}
- _agent->handleResponse(*_currentRequest, _currentRequest->createResponse(request));
+ _agent->handleResponse(*configRequest, configRequest->createResponse(request));
LOG(spam, "Calling schedule");
scheduleNextGetConfig();
}
@@ -93,19 +124,25 @@ FRTSource::RequestDone(FRT_RPCRequest * request)
void
FRTSource::close()
{
+ RequestMap inflight;
{
std::lock_guard guard(_lock);
if (_closed)
return;
LOG(spam, "Killing task");
_task->Kill();
+ inflight = _inflight;
}
LOG(spam, "Aborting");
- if (_currentRequest.get() != NULL)
- _currentRequest->abort();
- LOG(spam, "Syncing");
- _connectionFactory->syncTransport();
- _currentRequest.reset(0);
+ for (auto & request : inflight) {
+ request.second->abort();
+ }
+ inflight.clear();
+ LOG(spam, "Waiting");
+ std::unique_lock guard(_lock);
+ while (!_inflight.empty()) {
+ _cond.wait(guard);
+ }
LOG(spam, "closed");
}
diff --git a/config/src/vespa/config/frt/frtsource.h b/config/src/vespa/config/frt/frtsource.h
index 1885aa7e534..ceb0484a6bb 100644
--- a/config/src/vespa/config/frt/frtsource.h
+++ b/config/src/vespa/config/frt/frtsource.h
@@ -27,15 +27,19 @@ public:
void reload(int64_t generation) override;
void getConfig() override;
private:
+ class CleanupGuard;
void scheduleNextGetConfig();
+ void erase(FRT_RPCRequest *);
+ std::shared_ptr<FRTConfigRequest> find(FRT_RPCRequest *);
+ using RequestMap = std::map<FRT_RPCRequest *, std::shared_ptr<FRTConfigRequest>>;
std::shared_ptr<ConnectionFactory> _connectionFactory;
const FRTConfigRequestFactory & _requestFactory;
std::unique_ptr<ConfigAgent> _agent;
- std::unique_ptr<FRTConfigRequest> _currentRequest;
const ConfigKey _key;
-
- std::mutex _lock; // Protects _task and _closed
+ std::mutex _lock; // Protects _inflight, _task and _closed
+ std::condition_variable _cond;
+ RequestMap _inflight;
std::unique_ptr<FNET_Task> _task;
bool _closed;
};