diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-12-19 18:16:49 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-19 18:16:49 +0100 |
commit | 05b58ac83b06b00ae97ecafad101e44d4dd76aee (patch) | |
tree | b0de117397a15b57bf5c0b31ef389865677a732a | |
parent | 6764b88cbbbdf3f7c32c93c56d787c2c834da996 (diff) | |
parent | a0a6893c5b965ce93583d18afa308d57e0d71f9e (diff) |
Merge pull request #25296 from vespa-engine/revert-25292-balder/allow-multiple-requestsv8.102.12
Revert "Balder/allow multiple requests [run-systemtest]"
-rw-r--r-- | config/src/tests/frt/frt.cpp | 85 | ||||
-rw-r--r-- | config/src/vespa/config/frt/frtsource.cpp | 39 | ||||
-rw-r--r-- | config/src/vespa/config/frt/frtsource.h | 7 |
3 files changed, 47 insertions, 84 deletions
diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp index 5cfa125051a..3eda573cb27 100644 --- a/config/src/tests/frt/frt.cpp +++ b/config/src/tests/frt/frt.cpp @@ -32,45 +32,6 @@ using namespace config::protocol::v3; namespace { -struct Response { - vespalib::string defName; - vespalib::string defMd5; - vespalib::string configId; - vespalib::string configXxhash64; - int changed; - long generation; - StringVector payload; - vespalib::string ns; - void encodeResponse(FRT_RPCRequest * req) const { - FRT_Values & ret = *req->GetReturn(); - - ret.AddString(defName.c_str()); - ret.AddString(""); - ret.AddString(defMd5.c_str()); - ret.AddString(configId.c_str()); - ret.AddString(configXxhash64.c_str()); - ret.AddInt32(changed); - ret.AddInt64(generation); - FRT_StringValue * payload_arr = ret.AddStringArray(payload.size()); - for (uint32_t i = 0; i < payload.size(); i++) { - ret.SetString(&payload_arr[i], payload[i].c_str()); - } - if (!ns.empty()) - ret.AddString(ns.c_str()); - req->SetError(FRTE_NO_ERROR); - } - Response(vespalib::stringref name, vespalib::stringref md5, - vespalib::stringref id, vespalib::stringref hash, - int changed_in=0, long generation_in=0) - : defName(name), - defMd5(md5), - configId(id), - configXxhash64(hash), - changed(changed_in), - generation(generation_in) - {} -}; - struct RPCFixture { std::vector<FRT_RPCRequest *> requests; @@ -86,10 +47,32 @@ struct Response { requests.push_back(req); return req; } - FRT_RPCRequest * createOKRequest(const Response & response) + FRT_RPCRequest * createOKResponse(const vespalib::string & defName="", + const vespalib::string & defMd5="", + const vespalib::string & configId="", + const vespalib::string & configXxhash64="", + int changed=0, + long generation=0, + const StringVector & payload = StringVector(), + const vespalib::string & ns = "") { FRT_RPCRequest * req = new FRT_RPCRequest(); - response.encodeResponse(req); + FRT_Values & ret = *req->GetReturn(); + + ret.AddString(defName.c_str()); + ret.AddString(""); + ret.AddString(defMd5.c_str()); + ret.AddString(configId.c_str()); + ret.AddString(configXxhash64.c_str()); + ret.AddInt32(changed); + ret.AddInt64(generation); + FRT_StringValue * payload_arr = ret.AddStringArray(payload.size()); + for (uint32_t i = 0; i < payload.size(); i++) { + ret.SetString(&payload_arr[i], payload[i].c_str()); + } + if (!ns.empty()) + ret.AddString(ns.c_str()); + req->SetError(FRTE_NO_ERROR); requests.push_back(req); return req; } @@ -104,23 +87,20 @@ struct Response { struct ConnectionMock : public Connection { int errorCode; duration timeout; - std::unique_ptr<Response> ans; + FRT_RPCRequest * ans; fnet::frt::StandaloneFRT server; FRT_Supervisor & supervisor; FNET_Scheduler scheduler; vespalib::string address; - ConnectionMock() : ConnectionMock(std::unique_ptr<Response>()) { } - ConnectionMock(std::unique_ptr<Response> answer); + ConnectionMock(FRT_RPCRequest * answer = nullptr); ~ConnectionMock(); FRT_RPCRequest * allocRPCRequest() override { return supervisor.AllocRPCRequest(); } void setError(int ec) override { errorCode = ec; } void invoke(FRT_RPCRequest * req, duration t, FRT_IRequestWait * waiter) override { timeout = t; - if (ans != nullptr) { - ans->encodeResponse(req); - waiter->RequestDone(req); - } + if (ans != nullptr) + waiter->RequestDone(ans); else waiter->RequestDone(req); } @@ -128,10 +108,10 @@ struct Response { void setTransientDelay(duration delay) override { (void) delay; } }; - ConnectionMock::ConnectionMock(std::unique_ptr<Response> answer) + ConnectionMock::ConnectionMock(FRT_RPCRequest * answer) : errorCode(0), timeout(0ms), - ans(std::move(answer)), + ans(answer), server(), supervisor(server.supervisor()), address() @@ -149,6 +129,7 @@ struct Response { void syncTransport() override { } }; + struct AgentResultFixture { bool notified; @@ -194,7 +175,7 @@ struct Response { ConfigKey key; SourceFixture() : rpc(), - conn(std::make_unique<Response>("foo", "baz", "4", "boo")), + conn(rpc.createOKResponse("foo", "baz", "4", "boo")), key("foo", "bar", "4", "boo") { } @@ -234,7 +215,7 @@ TEST_F("require that response containing errors does not validate", RPCFixture() } TEST_F("require that response contains all values", RPCFixture()) { - FRTConfigResponseV3 ok(f1.createOKRequest(Response("foo", "baz", "bim", "boo", 12, 15))); + FRTConfigResponseV3 ok(f1.createOKResponse("foo", "baz", "bim", "boo", 12, 15)); ASSERT_FALSE(ok.validateResponse()); ASSERT_FALSE(ok.hasValidResponse()); } diff --git a/config/src/vespa/config/frt/frtsource.cpp b/config/src/vespa/config/frt/frtsource.cpp index 11839ac0b50..1cb514dcd4d 100644 --- a/config/src/vespa/config/frt/frtsource.cpp +++ b/config/src/vespa/config/frt/frtsource.cpp @@ -35,9 +35,9 @@ FRTSource::FRTSource(std::shared_ptr<ConnectionFactory> connectionFactory, const : _connectionFactory(std::move(connectionFactory)), _requestFactory(requestFactory), _agent(std::move(agent)), + _currentRequest(), _key(key), _lock(), - _inflight(), _task(std::make_unique<GetConfigTask>(_connectionFactory->getScheduler(), this)), _closed(false) { @@ -66,10 +66,8 @@ FRTSource::getConfig() std::unique_ptr<FRTConfigRequest> request = _requestFactory.createConfigRequest(_key, connection, state, serverTimeout); FRT_RPCRequest * req = request->getRequest(); - { - std::lock_guard guard(_lock); - _inflight[req] = std::move(request); - } + + _currentRequest = std::move(request); connection->invoke(req, clientTimeout, this); } @@ -81,24 +79,13 @@ FRTSource::RequestDone(FRT_RPCRequest * request) LOG(debug, "request aborted, stopping"); return; } - std::shared_ptr<FRTConfigRequest> configRequest; - { - std::lock_guard guard(_lock); - auto found = _inflight.find(request); - assert(found != _inflight.end()); - configRequest = found->second; - } + assert(_currentRequest); // If this was error from FRT side and nothing to do with config, notify // connection about the error. if (request->IsError()) { - configRequest->setError(request->GetErrorCode()); - } - _agent->handleResponse(*configRequest, configRequest->createResponse(request)); - { - std::lock_guard guard(_lock); - _inflight.erase(request); - _cond.notify_all(); + _currentRequest->setError(request->GetErrorCode()); } + _agent->handleResponse(*_currentRequest, _currentRequest->createResponse(request)); LOG(spam, "Calling schedule"); scheduleNextGetConfig(); } @@ -106,23 +93,19 @@ FRTSource::RequestDone(FRT_RPCRequest * request) void FRTSource::close() { - RequestMap inflight; { std::lock_guard guard(_lock); if (_closed) return; LOG(spam, "Killing task"); _task->Kill(); - inflight = _inflight; } LOG(spam, "Aborting"); - for (auto & request : inflight) { - std::move(request.second)->abort(); - } - inflight.clear(); - LOG(spam, "Waiting"); - std::unique_lock guard(_lock); - while (!_inflight.empty()) _cond.wait(guard); + if (_currentRequest.get() != NULL) + _currentRequest->abort(); + LOG(spam, "Syncing"); + _connectionFactory->syncTransport(); + _currentRequest.reset(0); LOG(spam, "closed"); } diff --git a/config/src/vespa/config/frt/frtsource.h b/config/src/vespa/config/frt/frtsource.h index 104b7318d8d..1885aa7e534 100644 --- a/config/src/vespa/config/frt/frtsource.h +++ b/config/src/vespa/config/frt/frtsource.h @@ -29,14 +29,13 @@ public: private: void scheduleNextGetConfig(); - using RequestMap = std::map<FRT_RPCRequest *, std::shared_ptr<FRTConfigRequest>>; std::shared_ptr<ConnectionFactory> _connectionFactory; const FRTConfigRequestFactory & _requestFactory; std::unique_ptr<ConfigAgent> _agent; + std::unique_ptr<FRTConfigRequest> _currentRequest; const ConfigKey _key; - std::mutex _lock; // Protects _inflight, _task and _closed - std::condition_variable _cond; - RequestMap _inflight; + + std::mutex _lock; // Protects _task and _closed std::unique_ptr<FNET_Task> _task; bool _closed; }; |