From a0a6893c5b965ce93583d18afa308d57e0d71f9e Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 19 Dec 2022 18:14:59 +0100 Subject: Revert "Balder/allow multiple requests [run-systemtest]" --- config/src/tests/frt/frt.cpp | 85 ++++++++++++------------------- config/src/vespa/config/frt/frtsource.cpp | 39 ++++---------- config/src/vespa/config/frt/frtsource.h | 7 ++- 3 files changed, 47 insertions(+), 84 deletions(-) diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp index 5cfa125051a..3eda573cb27 100644 --- a/config/src/tests/frt/frt.cpp +++ b/config/src/tests/frt/frt.cpp @@ -32,45 +32,6 @@ using namespace config::protocol::v3; namespace { -struct Response { - vespalib::string defName; - vespalib::string defMd5; - vespalib::string configId; - vespalib::string configXxhash64; - int changed; - long generation; - StringVector payload; - vespalib::string ns; - void encodeResponse(FRT_RPCRequest * req) const { - FRT_Values & ret = *req->GetReturn(); - - ret.AddString(defName.c_str()); - ret.AddString(""); - ret.AddString(defMd5.c_str()); - ret.AddString(configId.c_str()); - ret.AddString(configXxhash64.c_str()); - ret.AddInt32(changed); - ret.AddInt64(generation); - FRT_StringValue * payload_arr = ret.AddStringArray(payload.size()); - for (uint32_t i = 0; i < payload.size(); i++) { - ret.SetString(&payload_arr[i], payload[i].c_str()); - } - if (!ns.empty()) - ret.AddString(ns.c_str()); - req->SetError(FRTE_NO_ERROR); - } - Response(vespalib::stringref name, vespalib::stringref md5, - vespalib::stringref id, vespalib::stringref hash, - int changed_in=0, long generation_in=0) - : defName(name), - defMd5(md5), - configId(id), - configXxhash64(hash), - changed(changed_in), - generation(generation_in) - {} -}; - struct RPCFixture { std::vector requests; @@ -86,10 +47,32 @@ struct Response { requests.push_back(req); return req; } - FRT_RPCRequest * createOKRequest(const Response & response) + FRT_RPCRequest * createOKResponse(const vespalib::string & defName="", + const vespalib::string & defMd5="", + const vespalib::string & configId="", + const vespalib::string & configXxhash64="", + int changed=0, + long generation=0, + const StringVector & payload = StringVector(), + const vespalib::string & ns = "") { FRT_RPCRequest * req = new FRT_RPCRequest(); - response.encodeResponse(req); + FRT_Values & ret = *req->GetReturn(); + + ret.AddString(defName.c_str()); + ret.AddString(""); + ret.AddString(defMd5.c_str()); + ret.AddString(configId.c_str()); + ret.AddString(configXxhash64.c_str()); + ret.AddInt32(changed); + ret.AddInt64(generation); + FRT_StringValue * payload_arr = ret.AddStringArray(payload.size()); + for (uint32_t i = 0; i < payload.size(); i++) { + ret.SetString(&payload_arr[i], payload[i].c_str()); + } + if (!ns.empty()) + ret.AddString(ns.c_str()); + req->SetError(FRTE_NO_ERROR); requests.push_back(req); return req; } @@ -104,23 +87,20 @@ struct Response { struct ConnectionMock : public Connection { int errorCode; duration timeout; - std::unique_ptr ans; + FRT_RPCRequest * ans; fnet::frt::StandaloneFRT server; FRT_Supervisor & supervisor; FNET_Scheduler scheduler; vespalib::string address; - ConnectionMock() : ConnectionMock(std::unique_ptr()) { } - ConnectionMock(std::unique_ptr answer); + ConnectionMock(FRT_RPCRequest * answer = nullptr); ~ConnectionMock(); FRT_RPCRequest * allocRPCRequest() override { return supervisor.AllocRPCRequest(); } void setError(int ec) override { errorCode = ec; } void invoke(FRT_RPCRequest * req, duration t, FRT_IRequestWait * waiter) override { timeout = t; - if (ans != nullptr) { - ans->encodeResponse(req); - waiter->RequestDone(req); - } + if (ans != nullptr) + waiter->RequestDone(ans); else waiter->RequestDone(req); } @@ -128,10 +108,10 @@ struct Response { void setTransientDelay(duration delay) override { (void) delay; } }; - ConnectionMock::ConnectionMock(std::unique_ptr answer) + ConnectionMock::ConnectionMock(FRT_RPCRequest * answer) : errorCode(0), timeout(0ms), - ans(std::move(answer)), + ans(answer), server(), supervisor(server.supervisor()), address() @@ -149,6 +129,7 @@ struct Response { void syncTransport() override { } }; + struct AgentResultFixture { bool notified; @@ -194,7 +175,7 @@ struct Response { ConfigKey key; SourceFixture() : rpc(), - conn(std::make_unique("foo", "baz", "4", "boo")), + conn(rpc.createOKResponse("foo", "baz", "4", "boo")), key("foo", "bar", "4", "boo") { } @@ -234,7 +215,7 @@ TEST_F("require that response containing errors does not validate", RPCFixture() } TEST_F("require that response contains all values", RPCFixture()) { - FRTConfigResponseV3 ok(f1.createOKRequest(Response("foo", "baz", "bim", "boo", 12, 15))); + FRTConfigResponseV3 ok(f1.createOKResponse("foo", "baz", "bim", "boo", 12, 15)); ASSERT_FALSE(ok.validateResponse()); ASSERT_FALSE(ok.hasValidResponse()); } diff --git a/config/src/vespa/config/frt/frtsource.cpp b/config/src/vespa/config/frt/frtsource.cpp index 11839ac0b50..1cb514dcd4d 100644 --- a/config/src/vespa/config/frt/frtsource.cpp +++ b/config/src/vespa/config/frt/frtsource.cpp @@ -35,9 +35,9 @@ FRTSource::FRTSource(std::shared_ptr connectionFactory, const : _connectionFactory(std::move(connectionFactory)), _requestFactory(requestFactory), _agent(std::move(agent)), + _currentRequest(), _key(key), _lock(), - _inflight(), _task(std::make_unique(_connectionFactory->getScheduler(), this)), _closed(false) { @@ -66,10 +66,8 @@ FRTSource::getConfig() std::unique_ptr request = _requestFactory.createConfigRequest(_key, connection, state, serverTimeout); FRT_RPCRequest * req = request->getRequest(); - { - std::lock_guard guard(_lock); - _inflight[req] = std::move(request); - } + + _currentRequest = std::move(request); connection->invoke(req, clientTimeout, this); } @@ -81,24 +79,13 @@ FRTSource::RequestDone(FRT_RPCRequest * request) LOG(debug, "request aborted, stopping"); return; } - std::shared_ptr configRequest; - { - std::lock_guard guard(_lock); - auto found = _inflight.find(request); - assert(found != _inflight.end()); - configRequest = found->second; - } + assert(_currentRequest); // If this was error from FRT side and nothing to do with config, notify // connection about the error. if (request->IsError()) { - configRequest->setError(request->GetErrorCode()); - } - _agent->handleResponse(*configRequest, configRequest->createResponse(request)); - { - std::lock_guard guard(_lock); - _inflight.erase(request); - _cond.notify_all(); + _currentRequest->setError(request->GetErrorCode()); } + _agent->handleResponse(*_currentRequest, _currentRequest->createResponse(request)); LOG(spam, "Calling schedule"); scheduleNextGetConfig(); } @@ -106,23 +93,19 @@ FRTSource::RequestDone(FRT_RPCRequest * request) void FRTSource::close() { - RequestMap inflight; { std::lock_guard guard(_lock); if (_closed) return; LOG(spam, "Killing task"); _task->Kill(); - inflight = _inflight; } LOG(spam, "Aborting"); - for (auto & request : inflight) { - std::move(request.second)->abort(); - } - inflight.clear(); - LOG(spam, "Waiting"); - std::unique_lock guard(_lock); - while (!_inflight.empty()) _cond.wait(guard); + if (_currentRequest.get() != NULL) + _currentRequest->abort(); + LOG(spam, "Syncing"); + _connectionFactory->syncTransport(); + _currentRequest.reset(0); LOG(spam, "closed"); } diff --git a/config/src/vespa/config/frt/frtsource.h b/config/src/vespa/config/frt/frtsource.h index 104b7318d8d..1885aa7e534 100644 --- a/config/src/vespa/config/frt/frtsource.h +++ b/config/src/vespa/config/frt/frtsource.h @@ -29,14 +29,13 @@ public: private: void scheduleNextGetConfig(); - using RequestMap = std::map>; std::shared_ptr _connectionFactory; const FRTConfigRequestFactory & _requestFactory; std::unique_ptr _agent; + std::unique_ptr _currentRequest; const ConfigKey _key; - std::mutex _lock; // Protects _inflight, _task and _closed - std::condition_variable _cond; - RequestMap _inflight; + + std::mutex _lock; // Protects _task and _closed std::unique_ptr _task; bool _closed; }; -- cgit v1.2.3