summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-31 00:19:10 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-09-06 11:13:09 +0000
commit76ec6e75656dd944ea61de46dbfd5d1173f3bb81 (patch)
treeaf58ebb5bab8b956a6e1805af14288b1db472998
parent466bc196eee4571e2197624f17b8a7d8aee38cf0 (diff)
Group commits to TLS and sync (resurrected).
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp100
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h25
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp1
3 files changed, 120 insertions, 6 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 6b47393336a..4df5727c7e9 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -40,6 +40,7 @@ DomainConfig::DomainConfig()
Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor,
Executor & sessionExecutor, const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext)
: _config(cfg),
+ _currentChunk(std::make_unique<Chunk>()),
_lastSerial(0),
_singleCommiter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128*1024)),
_commitExecutor(commitExecutor),
@@ -79,6 +80,8 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm
vespalib::File::sync(dir());
}
_lastSerial = end();
+ _self = _threadPool.NewThread(this);
+ assert(_self);
}
Domain &
@@ -87,6 +90,16 @@ Domain::setConfig(const DomainConfig & cfg) {
return *this;
}
+void
+Domain::Run(FastOS_ThreadInterface *thisThread, void *) {
+
+ while (!thisThread->GetBreakFlag()) {
+ vespalib::MonitorGuard guard(_currentChunkMonitor);
+ guard.wait(_config.getChunkAgeLimit());
+ commitIfStale(guard);
+ }
+}
+
void Domain::addPart(int64_t partId, bool isLastPart) {
auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(),
_config.getCompressionlevel(), _fileHeaderContext, isLastPart);
@@ -127,7 +140,17 @@ private:
bool & _pendingSync;
};
-Domain::~Domain() { }
+Domain::~Domain() {
+ if (_self) {
+ _self->SetBreakFlag();
+ {
+ MonitorGuard guard(_currentChunkMonitor);
+ guard.broadcast();
+ }
+ _self->Join();
+ }
+ _singleCommiter->shutdown().sync();
+}
DomainInfo
Domain::getDomainInfo() const
@@ -308,10 +331,79 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync)
}
+Domain::Chunk::Chunk()
+ : _data(size_t(-1)),
+ _callBacks(),
+ _firstArrivalTime()
+{}
+
+Domain::Chunk::~Chunk() = default;
+
void
-Domain::commit(const Packet & packet, Writer::DoneCallback onDone)
-{
- (void) onDone;
+Domain::Chunk::add(const Packet &packet, Writer::DoneCallback onDone) {
+ if (_callBacks.empty()) {
+ _firstArrivalTime = vespalib::steady_clock::now();
+ }
+ _data.merge(packet);
+ _callBacks.emplace_back(std::move(onDone));
+}
+
+vespalib::duration
+Domain::Chunk::age() const {
+ if (_callBacks.empty()) {
+ return 0ms;
+ }
+ return (vespalib::steady_clock::now() - _firstArrivalTime);
+}
+
+void
+Domain::commit(const Packet & packet, Writer::DoneCallback onDone) {
+ vespalib::MonitorGuard guard(_currentChunkMonitor);
+ if (! (_lastSerial < packet.range().from())) {
+ throw runtime_error(make_string("Incomming serial number(%ld) must be bigger than the last one (%ld).",
+ packet.range().from(), _lastSerial));
+ } else {
+ _lastSerial = packet.range().to();
+ }
+ _currentChunk->add(packet, std::move(onDone));
+ commitIfFull(guard);
+}
+
+void
+Domain::commitIfFull(const vespalib::MonitorGuard &guard) {
+ if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) {
+ auto completed = grabCurrentChunk(guard);
+ if (completed) {
+ commitChunk(std::move(completed), guard);
+ }
+ }
+}
+
+std::unique_ptr<Domain::Chunk>
+Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) {
+ assert(guard.monitors(_currentChunkMonitor));
+ auto chunk = std::move(_currentChunk);
+ _currentChunk = std::make_unique<Chunk>();
+ return chunk;
+}
+
+void
+Domain::commitIfStale(const vespalib::MonitorGuard & guard) {
+ assert(guard.monitors(_currentChunkMonitor));
+ if (_currentChunk->age() > _config.getChunkAgeLimit()) {
+ commitChunk(grabCurrentChunk(guard), guard);
+ }
+}
+
+void
+Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) {
+ assert(chunkOrderGuard.monitors(_currentChunkMonitor));
+ _singleCommiter->execute(vespalib::makeLambdaTask([this, chunk = std::move(chunk)] () mutable { doCommit(std::move(chunk)); } ));
+}
+
+void
+Domain::doCommit(std::unique_ptr<Chunk> chunk) {
+ const Packet & packet = chunk->getPacket();
DomainPart::SP dp(_parts.rbegin()->second);
vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size());
Packet::Entry entry;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index a6c6dad5fe8..ad9e10ba426 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -60,7 +60,7 @@ struct DomainInfo {
typedef std::map<vespalib::string, DomainInfo> DomainStats;
-class Domain
+class Domain final : public FastOS_Runnable
{
public:
using SP = std::shared_ptr<Domain>;
@@ -68,7 +68,7 @@ public:
Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor,
Executor & sessionExecutor, const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext);
- ~Domain();
+ ~Domain() override;
DomainInfo getDomainInfo() const;
const vespalib::string & name() const { return _name; }
@@ -103,6 +103,26 @@ public:
uint64_t size() const;
Domain & setConfig(const DomainConfig & cfg);
private:
+ void Run(FastOS_ThreadInterface *thisThread, void *arguments) override;
+ void commitIfStale(const vespalib::MonitorGuard & guard);
+ void commitIfFull(const vespalib::MonitorGuard & guard);
+ class Chunk {
+ public:
+ Chunk();
+ ~Chunk();
+ void add(const Packet & packet, Writer::DoneCallback onDone);
+ size_t sizeBytes() const { return _data.sizeBytes(); }
+ const Packet & getPacket() const { return _data; }
+ vespalib::duration age() const;
+ private:
+ Packet _data;
+ std::vector<Writer::DoneCallback> _callBacks;
+ vespalib::steady_time _firstArrivalTime;
+ };
+
+ std::unique_ptr<Chunk> grabCurrentChunk(const vespalib::MonitorGuard & guard);
+ void commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard);
+ void doCommit(std::unique_ptr<Chunk> chunk);
SerialNum begin(const vespalib::LockGuard & guard) const;
SerialNum end(const vespalib::LockGuard & guard) const;
size_t byteSize(const vespalib::LockGuard & guard) const;
@@ -120,6 +140,7 @@ private:
using DurationSeconds = std::chrono::duration<double>;
DomainConfig _config;
+ std::unique_ptr<Chunk> _currentChunk;
SerialNum _lastSerial;
std::unique_ptr<Executor> _singleCommiter;
Executor & _commitExecutor;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index 96e75c35a81..416cdb566ce 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -427,6 +427,7 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
_packets.insert(std::make_pair(firstSerial, std::move(packet)));
_skipList.push_back(SkipInfo(firstSerial, firstPos));
}
+ sync();
}
void DomainPart::sync()