aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-12-19 18:16:49 +0100
committerGitHub <noreply@github.com>2022-12-19 18:16:49 +0100
commit05b58ac83b06b00ae97ecafad101e44d4dd76aee (patch)
treeb0de117397a15b57bf5c0b31ef389865677a732a
parent6764b88cbbbdf3f7c32c93c56d787c2c834da996 (diff)
parenta0a6893c5b965ce93583d18afa308d57e0d71f9e (diff)
Merge pull request #25296 from vespa-engine/revert-25292-balder/allow-multiple-requestsv8.102.12
Revert "Balder/allow multiple requests [run-systemtest]"
-rw-r--r--config/src/tests/frt/frt.cpp85
-rw-r--r--config/src/vespa/config/frt/frtsource.cpp39
-rw-r--r--config/src/vespa/config/frt/frtsource.h7
3 files changed, 47 insertions, 84 deletions
diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp
index 5cfa125051a..3eda573cb27 100644
--- a/config/src/tests/frt/frt.cpp
+++ b/config/src/tests/frt/frt.cpp
@@ -32,45 +32,6 @@ using namespace config::protocol::v3;
namespace {
-struct Response {
- vespalib::string defName;
- vespalib::string defMd5;
- vespalib::string configId;
- vespalib::string configXxhash64;
- int changed;
- long generation;
- StringVector payload;
- vespalib::string ns;
- void encodeResponse(FRT_RPCRequest * req) const {
- FRT_Values & ret = *req->GetReturn();
-
- ret.AddString(defName.c_str());
- ret.AddString("");
- ret.AddString(defMd5.c_str());
- ret.AddString(configId.c_str());
- ret.AddString(configXxhash64.c_str());
- ret.AddInt32(changed);
- ret.AddInt64(generation);
- FRT_StringValue * payload_arr = ret.AddStringArray(payload.size());
- for (uint32_t i = 0; i < payload.size(); i++) {
- ret.SetString(&payload_arr[i], payload[i].c_str());
- }
- if (!ns.empty())
- ret.AddString(ns.c_str());
- req->SetError(FRTE_NO_ERROR);
- }
- Response(vespalib::stringref name, vespalib::stringref md5,
- vespalib::stringref id, vespalib::stringref hash,
- int changed_in=0, long generation_in=0)
- : defName(name),
- defMd5(md5),
- configId(id),
- configXxhash64(hash),
- changed(changed_in),
- generation(generation_in)
- {}
-};
-
struct RPCFixture
{
std::vector<FRT_RPCRequest *> requests;
@@ -86,10 +47,32 @@ struct Response {
requests.push_back(req);
return req;
}
- FRT_RPCRequest * createOKRequest(const Response & response)
+ FRT_RPCRequest * createOKResponse(const vespalib::string & defName="",
+ const vespalib::string & defMd5="",
+ const vespalib::string & configId="",
+ const vespalib::string & configXxhash64="",
+ int changed=0,
+ long generation=0,
+ const StringVector & payload = StringVector(),
+ const vespalib::string & ns = "")
{
FRT_RPCRequest * req = new FRT_RPCRequest();
- response.encodeResponse(req);
+ FRT_Values & ret = *req->GetReturn();
+
+ ret.AddString(defName.c_str());
+ ret.AddString("");
+ ret.AddString(defMd5.c_str());
+ ret.AddString(configId.c_str());
+ ret.AddString(configXxhash64.c_str());
+ ret.AddInt32(changed);
+ ret.AddInt64(generation);
+ FRT_StringValue * payload_arr = ret.AddStringArray(payload.size());
+ for (uint32_t i = 0; i < payload.size(); i++) {
+ ret.SetString(&payload_arr[i], payload[i].c_str());
+ }
+ if (!ns.empty())
+ ret.AddString(ns.c_str());
+ req->SetError(FRTE_NO_ERROR);
requests.push_back(req);
return req;
}
@@ -104,23 +87,20 @@ struct Response {
struct ConnectionMock : public Connection {
int errorCode;
duration timeout;
- std::unique_ptr<Response> ans;
+ FRT_RPCRequest * ans;
fnet::frt::StandaloneFRT server;
FRT_Supervisor & supervisor;
FNET_Scheduler scheduler;
vespalib::string address;
- ConnectionMock() : ConnectionMock(std::unique_ptr<Response>()) { }
- ConnectionMock(std::unique_ptr<Response> answer);
+ ConnectionMock(FRT_RPCRequest * answer = nullptr);
~ConnectionMock();
FRT_RPCRequest * allocRPCRequest() override { return supervisor.AllocRPCRequest(); }
void setError(int ec) override { errorCode = ec; }
void invoke(FRT_RPCRequest * req, duration t, FRT_IRequestWait * waiter) override
{
timeout = t;
- if (ans != nullptr) {
- ans->encodeResponse(req);
- waiter->RequestDone(req);
- }
+ if (ans != nullptr)
+ waiter->RequestDone(ans);
else
waiter->RequestDone(req);
}
@@ -128,10 +108,10 @@ struct Response {
void setTransientDelay(duration delay) override { (void) delay; }
};
- ConnectionMock::ConnectionMock(std::unique_ptr<Response> answer)
+ ConnectionMock::ConnectionMock(FRT_RPCRequest * answer)
: errorCode(0),
timeout(0ms),
- ans(std::move(answer)),
+ ans(answer),
server(),
supervisor(server.supervisor()),
address()
@@ -149,6 +129,7 @@ struct Response {
void syncTransport() override { }
};
+
struct AgentResultFixture
{
bool notified;
@@ -194,7 +175,7 @@ struct Response {
ConfigKey key;
SourceFixture()
: rpc(),
- conn(std::make_unique<Response>("foo", "baz", "4", "boo")),
+ conn(rpc.createOKResponse("foo", "baz", "4", "boo")),
key("foo", "bar", "4", "boo")
{ }
@@ -234,7 +215,7 @@ TEST_F("require that response containing errors does not validate", RPCFixture()
}
TEST_F("require that response contains all values", RPCFixture()) {
- FRTConfigResponseV3 ok(f1.createOKRequest(Response("foo", "baz", "bim", "boo", 12, 15)));
+ FRTConfigResponseV3 ok(f1.createOKResponse("foo", "baz", "bim", "boo", 12, 15));
ASSERT_FALSE(ok.validateResponse());
ASSERT_FALSE(ok.hasValidResponse());
}
diff --git a/config/src/vespa/config/frt/frtsource.cpp b/config/src/vespa/config/frt/frtsource.cpp
index 11839ac0b50..1cb514dcd4d 100644
--- a/config/src/vespa/config/frt/frtsource.cpp
+++ b/config/src/vespa/config/frt/frtsource.cpp
@@ -35,9 +35,9 @@ FRTSource::FRTSource(std::shared_ptr<ConnectionFactory> connectionFactory, const
: _connectionFactory(std::move(connectionFactory)),
_requestFactory(requestFactory),
_agent(std::move(agent)),
+ _currentRequest(),
_key(key),
_lock(),
- _inflight(),
_task(std::make_unique<GetConfigTask>(_connectionFactory->getScheduler(), this)),
_closed(false)
{
@@ -66,10 +66,8 @@ FRTSource::getConfig()
std::unique_ptr<FRTConfigRequest> request = _requestFactory.createConfigRequest(_key, connection, state, serverTimeout);
FRT_RPCRequest * req = request->getRequest();
- {
- std::lock_guard guard(_lock);
- _inflight[req] = std::move(request);
- }
+
+ _currentRequest = std::move(request);
connection->invoke(req, clientTimeout, this);
}
@@ -81,24 +79,13 @@ FRTSource::RequestDone(FRT_RPCRequest * request)
LOG(debug, "request aborted, stopping");
return;
}
- std::shared_ptr<FRTConfigRequest> configRequest;
- {
- std::lock_guard guard(_lock);
- auto found = _inflight.find(request);
- assert(found != _inflight.end());
- configRequest = found->second;
- }
+ assert(_currentRequest);
// If this was error from FRT side and nothing to do with config, notify
// connection about the error.
if (request->IsError()) {
- configRequest->setError(request->GetErrorCode());
- }
- _agent->handleResponse(*configRequest, configRequest->createResponse(request));
- {
- std::lock_guard guard(_lock);
- _inflight.erase(request);
- _cond.notify_all();
+ _currentRequest->setError(request->GetErrorCode());
}
+ _agent->handleResponse(*_currentRequest, _currentRequest->createResponse(request));
LOG(spam, "Calling schedule");
scheduleNextGetConfig();
}
@@ -106,23 +93,19 @@ FRTSource::RequestDone(FRT_RPCRequest * request)
void
FRTSource::close()
{
- RequestMap inflight;
{
std::lock_guard guard(_lock);
if (_closed)
return;
LOG(spam, "Killing task");
_task->Kill();
- inflight = _inflight;
}
LOG(spam, "Aborting");
- for (auto & request : inflight) {
- std::move(request.second)->abort();
- }
- inflight.clear();
- LOG(spam, "Waiting");
- std::unique_lock guard(_lock);
- while (!_inflight.empty()) _cond.wait(guard);
+ if (_currentRequest.get() != NULL)
+ _currentRequest->abort();
+ LOG(spam, "Syncing");
+ _connectionFactory->syncTransport();
+ _currentRequest.reset(0);
LOG(spam, "closed");
}
diff --git a/config/src/vespa/config/frt/frtsource.h b/config/src/vespa/config/frt/frtsource.h
index 104b7318d8d..1885aa7e534 100644
--- a/config/src/vespa/config/frt/frtsource.h
+++ b/config/src/vespa/config/frt/frtsource.h
@@ -29,14 +29,13 @@ public:
private:
void scheduleNextGetConfig();
- using RequestMap = std::map<FRT_RPCRequest *, std::shared_ptr<FRTConfigRequest>>;
std::shared_ptr<ConnectionFactory> _connectionFactory;
const FRTConfigRequestFactory & _requestFactory;
std::unique_ptr<ConfigAgent> _agent;
+ std::unique_ptr<FRTConfigRequest> _currentRequest;
const ConfigKey _key;
- std::mutex _lock; // Protects _inflight, _task and _closed
- std::condition_variable _cond;
- RequestMap _inflight;
+
+ std::mutex _lock; // Protects _task and _closed
std::unique_ptr<FNET_Task> _task;
bool _closed;
};