aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2021-01-08 02:19:59 +0100
committerTor Egge <Tor.Egge@broadpark.no>2021-01-08 09:51:17 +0100
commit06670bd70b273e99574872eea1c6b523cf94cbd5 (patch)
tree7cb54d444c64b184bffd1629e8f1565352646aa9 /searchlib
parentaff5ea6fe2fe923f720b08d2bf8a5fc4ed35f13c (diff)
Stop using fnet scheduler for handling of tls domainSync rpc.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp13
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h3
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp37
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h2
4 files changed, 35 insertions, 20 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 9f1e6bde06b..8dcca2c7b89 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -45,6 +45,7 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & exec
_sessionId(1),
_syncMonitor(),
_pendingSync(false),
+ _done_sync_tasks(),
_name(domainName),
_parts(),
_lock(),
@@ -206,13 +207,16 @@ Domain::getSynced() const
void
-Domain::triggerSyncNow()
+Domain::triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task)
{
{
std::unique_lock guard(_currentChunkMonitor);
commitAndTransferResponses(guard);
}
std::unique_lock guard(_syncMonitor);
+ if (done_sync_task) {
+ _done_sync_tasks.push_back(std::move(done_sync_task));
+ }
if (!_pendingSync) {
_pendingSync = true;
_executor.execute(makeLambdaTask([this, domainPart= getActivePart()]() {
@@ -220,6 +224,11 @@ Domain::triggerSyncNow()
std::lock_guard monitorGuard(_syncMonitor);
_pendingSync = false;
_syncCond.notify_all();
+ for (auto &task : _done_sync_tasks) {
+ auto failed_task = _executor.execute(std::move(task));
+ assert(!failed_task);
+ }
+ _done_sync_tasks.clear();
}));
}
}
@@ -316,7 +325,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
DomainPart::SP dp = getActivePart();
if (dp->byteSize() > _config.getPartSizeLimit()) {
waitPendingSync(_syncMonitor, _syncCond, _pendingSync);
- triggerSyncNow();
+ triggerSyncNow({});
waitPendingSync(_syncMonitor, _syncCond, _pendingSync);
dp->close();
dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _config.getEncoding(),
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index c9eb6385b15..5a80758dd0b 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -36,7 +36,7 @@ public:
SerialNum begin() const;
SerialNum end() const;
SerialNum getSynced() const;
- void triggerSyncNow();
+ void triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task);
bool getMarkedDeleted() const { return _markedDeleted; }
void markDeleted() { _markedDeleted = true; }
@@ -92,6 +92,7 @@ private:
std::mutex _syncMonitor;
std::condition_variable _syncCond;
bool _pendingSync;
+ std::vector<std::unique_ptr<vespalib::Executor::Task>> _done_sync_tasks;
vespalib::string _name;
DomainPartList _parts;
mutable std::mutex _lock;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index 0c0c9186e12..376de530c6e 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -6,6 +6,7 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fnet/task.h>
@@ -28,24 +29,25 @@ namespace search::transactionlog {
namespace {
-class SyncHandler : public FNET_Task
+class SyncHandler : public std::enable_shared_from_this<SyncHandler>
{
+ std::atomic<bool> & _closed;
FRT_RPCRequest & _req;
Domain::SP _domain;
TransLogServer::Session::SP _session;
SerialNum _syncTo;
public:
- SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain,
- const TransLogServer::Session::SP &session, SerialNum syncTo);
+ SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, const Domain::SP &domain,
+ const TransLogServer::Session::SP &session, SerialNum syncTo) noexcept;
- ~SyncHandler() override;
- void PerformTask() override;
+ ~SyncHandler();
+ void poll();
};
-SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain,
- const TransLogServer::Session::SP &session, SerialNum syncTo)
- : FNET_Task(supervisor->GetScheduler()),
+SyncHandler::SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, const Domain::SP &domain,
+ const TransLogServer::Session::SP &session, SerialNum syncTo) noexcept
+ : _closed(closed),
_req(*req),
_domain(domain),
_session(session),
@@ -56,20 +58,19 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const
SyncHandler::~SyncHandler() = default;
void
-SyncHandler::PerformTask()
+SyncHandler::poll()
{
SerialNum synced(_domain->getSynced());
if (_session->getDown() ||
_domain->getMarkedDeleted() ||
+ _closed.load(std::memory_order_acquire) ||
synced >= _syncTo) {
FRT_Values &rvals = *_req.GetReturn();
rvals.AddInt32(0);
rvals.AddInt64(synced);
_req.Return();
- delete this;
} else {
- _domain->triggerSyncNow();
- Schedule(0.05); // Retry in 0.05 seconds
+ _domain->triggerSyncNow(vespalib::makeLambdaTask([self = shared_from_this()]() { self->poll(); }));
}
}
@@ -101,7 +102,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
_supervisor(std::make_unique<FRT_Supervisor>(_transport.get())),
_domains(),
_reqQ(),
- _fileHeaderContext(fileHeaderContext)
+ _fileHeaderContext(fileHeaderContext),
+ _closed(false)
{
int retval(0);
if ((retval = makeDirectory(_baseDir.c_str())) == 0) {
@@ -146,8 +148,10 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
TransLogServer::~TransLogServer()
{
+ _closed = true;
stop();
join();
+ _executor.sync();
_executor.shutdown();
_executor.sync();
_transport->ShutDown(true);
@@ -719,10 +723,9 @@ TransLogServer::domainSync(FRT_RPCRequest *req)
req->Return();
return;
}
-
- SyncHandler *syncHandler = new SyncHandler(_supervisor.get(), req, domain, session, syncTo);
-
- syncHandler->ScheduleNow();
+
+ auto syncHandler = std::make_shared<SyncHandler>(_closed, req, domain, session, syncTo);
+ syncHandler->poll();
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 37133615c1e..3c6efa20550 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -7,6 +7,7 @@
#include <vespa/document/util/queue.h>
#include <vespa/fnet/frt/invokable.h>
#include <shared_mutex>
+#include <atomic>
class FRT_Supervisor;
class FNET_Transport;
@@ -92,6 +93,7 @@ private:
std::mutex _fileLock; // Protects the creating and deleting domains including file system operations.
document::Queue<FRT_RPCRequest *> _reqQ;
const common::FileHeaderContext &_fileHeaderContext;
+ std::atomic<bool> _closed;
};
}