aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-09 07:54:54 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-09-09 07:54:54 +0000
commitd26ed36e397a79c1b809f439ab82e06fb95d504c (patch)
tree686bb6abc3f39ca5251189d950dbec5c2ece6995
parent9d770f5feaba1d205485c1bdde1d834a1a835332 (diff)
Use a thread that can schedule with higher frequency than the max frequency of 33hz that the fnet scheduler can do.
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp13
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h6
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp44
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h6
4 files changed, 33 insertions, 36 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 624c85c9cc0..45afc0c3ee3 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -365,28 +365,31 @@ Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) {
return chunk;
}
-void
+bool
Domain::commitIfStale() {
vespalib::MonitorGuard guard(_currentChunkMonitor);
- commitIfStale(guard);
+ return commitIfStale(guard);
}
-void
+bool
Domain::commitIfStale(const vespalib::MonitorGuard & guard) {
assert(guard.monitors(_currentChunkMonitor));
if ((_currentChunk->age() > _config.getChunkAgeLimit()) && ! _currentChunk->getPacket().empty()) {
- commitChunk(grabCurrentChunk(guard), guard);
+ return commitChunk(grabCurrentChunk(guard), guard);
}
+ return false;
}
-void
+bool
Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) {
assert(chunkOrderGuard.monitors(_currentChunkMonitor));
if ( ! chunk->getPacket().empty()) {
_singleCommiter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable {
doCommit(std::move(chunk));
}));
+ return true;
}
+ return false;
}
void
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 3688901d23f..5172f0508dc 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -34,7 +34,7 @@ public:
SerialNum end() const;
SerialNum getSynced() const;
void triggerSyncNow();
- void commitIfStale();
+ bool commitIfStale();
bool getMarkedDeleted() const { return _markedDeleted; }
void markDeleted() { _markedDeleted = true; }
@@ -57,7 +57,7 @@ public:
uint64_t size() const;
Domain & setConfig(const DomainConfig & cfg);
private:
- void commitIfStale(const vespalib::MonitorGuard & guard);
+ bool commitIfStale(const vespalib::MonitorGuard & guard);
void commitIfFull(const vespalib::MonitorGuard & guard);
class Chunk {
public:
@@ -75,7 +75,7 @@ private:
};
std::unique_ptr<Chunk> grabCurrentChunk(const vespalib::MonitorGuard & guard);
- void commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard);
+ bool commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard);
void doCommit(std::unique_ptr<Chunk> chunk);
SerialNum begin(const vespalib::LockGuard & guard) const;
SerialNum end(const vespalib::LockGuard & guard) const;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index ec70f32694a..15ce2f537ea 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -72,26 +72,6 @@ SyncHandler::PerformTask()
}
}
-class StaleCommitTask final : public FNET_Task {
-public:
- StaleCommitTask(FNET_Scheduler * sceduler, TransLogServer & server, double seconds2Wait)
- : FNET_Task(sceduler),
- _server(server),
- _seconds2Wait(seconds2Wait)
- {}
- ~StaleCommitTask() override = default;
-
- void PerformTask() override {
- if (_server.running()) {
- _server.commitIfStale();
- Schedule(_seconds2Wait);
- }
- }
-private:
- TransLogServer & _server;
- double _seconds2Wait;
-};
-
}
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
@@ -117,6 +97,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
_threadPool(std::make_unique<FastOS_ThreadPool>(1024*120)),
_transport(std::make_unique<FNET_Transport>()),
_supervisor(std::make_unique<FRT_Supervisor>(_transport.get())),
+ _staleCommitThread(),
_domains(),
_reqQ(),
_fileHeaderContext(fileHeaderContext)
@@ -161,16 +142,19 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno));
}
start(*_threadPool);
- double chunkAgeLimit = vespalib::to_s(_domainConfig.getChunkAgeLimit());
- _staleCommitTask = std::make_unique<StaleCommitTask>(_supervisor->GetScheduler(), *this, chunkAgeLimit);
- _staleCommitTask->ScheduleNow();
+ _staleCommitThread = std::make_unique<std::thread>([this]() {
+ while (running()) {
+ std::this_thread::sleep_for(getChunkAgeLimit());
+ commitIfStale();
+ }
+ });
}
TransLogServer::~TransLogServer()
{
- _staleCommitTask->Kill();
stop();
join();
+ _staleCommitThread->join();
_commitExecutor.shutdown();
_commitExecutor.sync();
_sessionExecutor.shutdown();
@@ -228,6 +212,12 @@ TransLogServer::run()
LOG(info, "TLS Stopped");
}
+vespalib::duration
+TransLogServer::getChunkAgeLimit() const
+{
+ Guard domainGuard(_domainMutex);
+ return _domainConfig.getChunkAgeLimit();
+}
TransLogServer &
TransLogServer::setDomainConfig(const DomainConfig & cfg) {
@@ -239,12 +229,14 @@ TransLogServer::setDomainConfig(const DomainConfig & cfg) {
return *this;
}
-void
+bool
TransLogServer::commitIfStale() {
MonitorGuard domainMonitor(_domainMutex);
+ bool committedAnything(false);
for (const auto &domain : _domains) {
- domain.second->commitIfStale();
+ committedAnything = committedAnything || domain.second->commitIfStale();
}
+ return committedAnything;
}
DomainStats
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 8e0e8041385..8d5d5aa892f 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -12,6 +12,7 @@ class FRT_Supervisor;
class FNET_Transport;
class FNET_Task;
+namespace std {class thread; }
namespace search::common { class FileHeaderContext; }
namespace search::transactionlog {
@@ -32,9 +33,10 @@ public:
const common::FileHeaderContext &fileHeaderContext);
~TransLogServer() override;
DomainStats getDomainStats() const;
- void commitIfStale();
+ bool commitIfStale();
void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override;
TransLogServer & setDomainConfig(const DomainConfig & cfg);
+ vespalib::duration getChunkAgeLimit() const;
class Session
{
@@ -87,7 +89,7 @@ private:
std::unique_ptr<FastOS_ThreadPool> _threadPool;
std::unique_ptr<FNET_Transport> _transport;
std::unique_ptr<FRT_Supervisor> _supervisor;
- std::unique_ptr<FNET_Task> _staleCommitTask;
+ std::unique_ptr<std::thread> _staleCommitThread;
DomainList _domains;
mutable std::mutex _domainMutex; // Protects _domains
std::condition_variable _domainCondition;