diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-12-19 16:58:25 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-19 16:58:25 +0100 |
commit | 6764b88cbbbdf3f7c32c93c56d787c2c834da996 (patch) | |
tree | 0dc14d37081897a56c85504a225e0741a704826e | |
parent | cacf23e9edb0875daccdc9413df1d9ddfc4058d9 (diff) | |
parent | 1b144ad08f66639425c2d689f92fa8426eb4968b (diff) |
Merge pull request #25292 from vespa-engine/balder/allow-multiple-requests
Balder/allow multiple requests [run-systemtest]
-rw-r--r-- | config/src/tests/frt/frt.cpp | 85 | ||||
-rw-r--r-- | config/src/vespa/config/frt/frtsource.cpp | 39 | ||||
-rw-r--r-- | config/src/vespa/config/frt/frtsource.h | 7 |
3 files changed, 84 insertions, 47 deletions
diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp index 3eda573cb27..5cfa125051a 100644 --- a/config/src/tests/frt/frt.cpp +++ b/config/src/tests/frt/frt.cpp @@ -32,6 +32,45 @@ using namespace config::protocol::v3; namespace { +struct Response { + vespalib::string defName; + vespalib::string defMd5; + vespalib::string configId; + vespalib::string configXxhash64; + int changed; + long generation; + StringVector payload; + vespalib::string ns; + void encodeResponse(FRT_RPCRequest * req) const { + FRT_Values & ret = *req->GetReturn(); + + ret.AddString(defName.c_str()); + ret.AddString(""); + ret.AddString(defMd5.c_str()); + ret.AddString(configId.c_str()); + ret.AddString(configXxhash64.c_str()); + ret.AddInt32(changed); + ret.AddInt64(generation); + FRT_StringValue * payload_arr = ret.AddStringArray(payload.size()); + for (uint32_t i = 0; i < payload.size(); i++) { + ret.SetString(&payload_arr[i], payload[i].c_str()); + } + if (!ns.empty()) + ret.AddString(ns.c_str()); + req->SetError(FRTE_NO_ERROR); + } + Response(vespalib::stringref name, vespalib::stringref md5, + vespalib::stringref id, vespalib::stringref hash, + int changed_in=0, long generation_in=0) + : defName(name), + defMd5(md5), + configId(id), + configXxhash64(hash), + changed(changed_in), + generation(generation_in) + {} +}; + struct RPCFixture { std::vector<FRT_RPCRequest *> requests; @@ -47,32 +86,10 @@ namespace { requests.push_back(req); return req; } - FRT_RPCRequest * createOKResponse(const vespalib::string & defName="", - const vespalib::string & defMd5="", - const vespalib::string & configId="", - const vespalib::string & configXxhash64="", - int changed=0, - long generation=0, - const StringVector & payload = StringVector(), - const vespalib::string & ns = "") + FRT_RPCRequest * createOKRequest(const Response & response) { FRT_RPCRequest * req = new FRT_RPCRequest(); - FRT_Values & ret = *req->GetReturn(); - - ret.AddString(defName.c_str()); - ret.AddString(""); - ret.AddString(defMd5.c_str()); - ret.AddString(configId.c_str()); - ret.AddString(configXxhash64.c_str()); - ret.AddInt32(changed); - ret.AddInt64(generation); - FRT_StringValue * payload_arr = ret.AddStringArray(payload.size()); - for (uint32_t i = 0; i < payload.size(); i++) { - ret.SetString(&payload_arr[i], payload[i].c_str()); - } - if (!ns.empty()) - ret.AddString(ns.c_str()); - req->SetError(FRTE_NO_ERROR); + response.encodeResponse(req); requests.push_back(req); return req; } @@ -87,20 +104,23 @@ namespace { struct ConnectionMock : public Connection { int errorCode; duration timeout; - FRT_RPCRequest * ans; + std::unique_ptr<Response> ans; fnet::frt::StandaloneFRT server; FRT_Supervisor & supervisor; FNET_Scheduler scheduler; vespalib::string address; - ConnectionMock(FRT_RPCRequest * answer = nullptr); + ConnectionMock() : ConnectionMock(std::unique_ptr<Response>()) { } + ConnectionMock(std::unique_ptr<Response> answer); ~ConnectionMock(); FRT_RPCRequest * allocRPCRequest() override { return supervisor.AllocRPCRequest(); } void setError(int ec) override { errorCode = ec; } void invoke(FRT_RPCRequest * req, duration t, FRT_IRequestWait * waiter) override { timeout = t; - if (ans != nullptr) - waiter->RequestDone(ans); + if (ans != nullptr) { + ans->encodeResponse(req); + waiter->RequestDone(req); + } else waiter->RequestDone(req); } @@ -108,10 +128,10 @@ namespace { void setTransientDelay(duration delay) override { (void) delay; } }; - ConnectionMock::ConnectionMock(FRT_RPCRequest * answer) + ConnectionMock::ConnectionMock(std::unique_ptr<Response> answer) : errorCode(0), timeout(0ms), - ans(answer), + ans(std::move(answer)), server(), supervisor(server.supervisor()), address() @@ -129,7 +149,6 @@ namespace { void syncTransport() override { } }; - struct AgentResultFixture { bool notified; @@ -175,7 +194,7 @@ namespace { ConfigKey key; SourceFixture() : rpc(), - conn(rpc.createOKResponse("foo", "baz", "4", "boo")), + conn(std::make_unique<Response>("foo", "baz", "4", "boo")), key("foo", "bar", "4", "boo") { } @@ -215,7 +234,7 @@ TEST_F("require that response containing errors does not validate", RPCFixture() } TEST_F("require that response contains all values", RPCFixture()) { - FRTConfigResponseV3 ok(f1.createOKResponse("foo", "baz", "bim", "boo", 12, 15)); + FRTConfigResponseV3 ok(f1.createOKRequest(Response("foo", "baz", "bim", "boo", 12, 15))); ASSERT_FALSE(ok.validateResponse()); ASSERT_FALSE(ok.hasValidResponse()); } diff --git a/config/src/vespa/config/frt/frtsource.cpp b/config/src/vespa/config/frt/frtsource.cpp index 1cb514dcd4d..11839ac0b50 100644 --- a/config/src/vespa/config/frt/frtsource.cpp +++ b/config/src/vespa/config/frt/frtsource.cpp @@ -35,9 +35,9 @@ FRTSource::FRTSource(std::shared_ptr<ConnectionFactory> connectionFactory, const : _connectionFactory(std::move(connectionFactory)), _requestFactory(requestFactory), _agent(std::move(agent)), - _currentRequest(), _key(key), _lock(), + _inflight(), _task(std::make_unique<GetConfigTask>(_connectionFactory->getScheduler(), this)), _closed(false) { @@ -66,8 +66,10 @@ FRTSource::getConfig() std::unique_ptr<FRTConfigRequest> request = _requestFactory.createConfigRequest(_key, connection, state, serverTimeout); FRT_RPCRequest * req = request->getRequest(); - - _currentRequest = std::move(request); + { + std::lock_guard guard(_lock); + _inflight[req] = std::move(request); + } connection->invoke(req, clientTimeout, this); } @@ -79,13 +81,24 @@ FRTSource::RequestDone(FRT_RPCRequest * request) LOG(debug, "request aborted, stopping"); return; } - assert(_currentRequest); + std::shared_ptr<FRTConfigRequest> configRequest; + { + std::lock_guard guard(_lock); + auto found = _inflight.find(request); + assert(found != _inflight.end()); + configRequest = found->second; + } // If this was error from FRT side and nothing to do with config, notify // connection about the error. if (request->IsError()) { - _currentRequest->setError(request->GetErrorCode()); + configRequest->setError(request->GetErrorCode()); + } + _agent->handleResponse(*configRequest, configRequest->createResponse(request)); + { + std::lock_guard guard(_lock); + _inflight.erase(request); + _cond.notify_all(); } - _agent->handleResponse(*_currentRequest, _currentRequest->createResponse(request)); LOG(spam, "Calling schedule"); scheduleNextGetConfig(); } @@ -93,19 +106,23 @@ FRTSource::RequestDone(FRT_RPCRequest * request) void FRTSource::close() { + RequestMap inflight; { std::lock_guard guard(_lock); if (_closed) return; LOG(spam, "Killing task"); _task->Kill(); + inflight = _inflight; } LOG(spam, "Aborting"); - if (_currentRequest.get() != NULL) - _currentRequest->abort(); - LOG(spam, "Syncing"); - _connectionFactory->syncTransport(); - _currentRequest.reset(0); + for (auto & request : inflight) { + std::move(request.second)->abort(); + } + inflight.clear(); + LOG(spam, "Waiting"); + std::unique_lock guard(_lock); + while (!_inflight.empty()) _cond.wait(guard); LOG(spam, "closed"); } diff --git a/config/src/vespa/config/frt/frtsource.h b/config/src/vespa/config/frt/frtsource.h index 1885aa7e534..104b7318d8d 100644 --- a/config/src/vespa/config/frt/frtsource.h +++ b/config/src/vespa/config/frt/frtsource.h @@ -29,13 +29,14 @@ public: private: void scheduleNextGetConfig(); + using RequestMap = std::map<FRT_RPCRequest *, std::shared_ptr<FRTConfigRequest>>; std::shared_ptr<ConnectionFactory> _connectionFactory; const FRTConfigRequestFactory & _requestFactory; std::unique_ptr<ConfigAgent> _agent; - std::unique_ptr<FRTConfigRequest> _currentRequest; const ConfigKey _key; - - std::mutex _lock; // Protects _task and _closed + std::mutex _lock; // Protects _inflight, _task and _closed + std::condition_variable _cond; + RequestMap _inflight; std::unique_ptr<FNET_Task> _task; bool _closed; }; |