aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-17 19:35:28 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2018-01-10 09:56:22 +0100
commit2b55ae64f687e83e72c4f5c0062ff482c818d47e (patch)
tree3325a9816e4728ddfd198ce057dcf32049f97061 /searchlib
parentd993d34963d3e52ce56df43496dd8455239e8247 (diff)
Wire in grouping in larger packets.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.cpp3
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp99
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h36
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp27
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp44
5 files changed, 149 insertions, 60 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
index a84e27b2e53..b6009111e3e 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
@@ -83,8 +83,7 @@ Packet::Entry::Entry(SerialNum u, Type t, const vespalib::ConstBufferRef & d) :
_type(t),
_valid(true),
_data(d)
-{
-}
+{ }
bool Packet::add(const Packet::Entry & e)
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 88c2dd9ecc3..d7711c80051 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -20,28 +20,35 @@ using vespalib::MonitorGuard;
using search::common::FileHeaderContext;
using std::runtime_error;
using namespace std::chrono_literals;
+using namespace std::chrono;
namespace search::transactionlog {
-Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor,
- Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType,
- const FileHeaderContext &fileHeaderContext) :
+Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool,
+ Executor & commitExecutor, Executor & sessionExecutor, uint64_t domainPartSize,
+ DomainPart::Crc defaultCrcType, const FileHeaderContext &fileHeaderContext) :
+ _currentChunk(std::make_unique<Chunk>()),
_defaultCrcType(defaultCrcType),
+ _threadPool(threadPool),
_commitExecutor(commitExecutor),
_sessionExecutor(sessionExecutor),
_sessionId(1),
_syncMonitor(),
_pendingSync(false),
_name(domainName),
- _domainPartSize(domainPartSize),
+ _domainPartSizeLimit(domainPartSize),
+ _chunkSizeLimit(0x40000),
+ _chunkAgeLimit(10ms),
_parts(),
_lock(),
+ _currentChunkMonitor(),
_sessionLock(),
_sessions(),
_maxSessionRunTime(),
_baseDir(baseDir),
_fileHeaderContext(fileHeaderContext),
- _markedDeleted(false)
+ _markedDeleted(false),
+ _self(nullptr)
{
int retval(0);
if ((retval = makeDirectory(_baseDir.c_str())) != 0) {
@@ -61,8 +68,19 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm
if (_parts.empty() || _parts.crbegin()->second->isClosed()) {
_parts[lastPart].reset(new DomainPart(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false));
}
+ _self = _threadPool.NewThread(this);
+ assert(_self);
}
+void
+Domain::Run(FastOS_ThreadInterface *thisThread, void *) {
+
+ while (!thisThread->GetBreakFlag()) {
+ vespalib::MonitorGuard guard(_currentChunkMonitor);
+ guard.wait(duration_cast<milliseconds>(_chunkAgeLimit).count());
+ commitIfStale(guard);
+ }
+}
void Domain::addPart(int64_t partId, bool isLastPart) {
DomainPart::SP dp(new DomainPart(_name, dir(), partId, _defaultCrcType, _fileHeaderContext, isLastPart));
if (dp->size() == 0) {
@@ -102,7 +120,16 @@ private:
bool & _pendingSync;
};
-Domain::~Domain() { }
+Domain::~Domain() {
+ if (_self) {
+ _self->SetBreakFlag();
+ {
+ MonitorGuard guard(_currentChunkMonitor);
+ guard.broadcast();
+ }
+ _self->Join();
+ }
+}
DomainInfo
Domain::getDomainInfo() const
@@ -267,7 +294,8 @@ void Domain::cleanSessions()
namespace {
-void waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync)
+void
+waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync)
{
MonitorGuard guard(syncMonitor);
while (pendingSync) {
@@ -277,13 +305,66 @@ void waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync)
}
-void Domain::commit(const Packet & packet)
+void
+Domain::Chunk::add(const Packet &packet, Writer::DoneCallback onDone) {
+ if (_callBacks.empty()) {
+ _firstArrivalTime = steady_clock::now();
+ }
+ if ( ! _data.merge(packet) ) {
+ throw runtime_error(make_string("Failed merging of packet %zu into packet %zu",
+ packet.range().from(), _data.range().from()));
+ }
+ _callBacks.emplace_back(std::move(onDone));
+}
+
+microseconds
+Domain::Chunk::age() const {
+ if (_callBacks.empty()) {
+ return 0ms;
+ }
+ return duration_cast<microseconds>(steady_clock::now() - _firstArrivalTime);
+}
+
+void
+Domain::commit(const Packet & packet, Writer::DoneCallback onDone) {
+
+ std::unique_ptr<Chunk> completed;
+ vespalib::MonitorGuard guard(_currentChunkMonitor);
+ _currentChunk->add(packet, std::move(onDone));
+ if (_currentChunk->sizeBytes() > _chunkSizeLimit) {
+ completed = grabCurrentChunk(guard);
+ }
+ if (completed) {
+ commitChunk(std::move(_currentChunk), guard);
+ }
+}
+
+std::unique_ptr<Domain::Chunk>
+Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) {
+ assert(guard.monitors(_currentChunkMonitor));
+ auto chunk = std::move(_currentChunk);
+ _currentChunk = std::make_unique<Chunk>();
+ return chunk;
+}
+
+void
+Domain::commitIfStale(const vespalib::MonitorGuard & guard) {
+ assert(guard.monitors(_currentChunkMonitor));
+ if (_currentChunk->age() > _chunkAgeLimit) {
+ commitChunk(grabCurrentChunk(guard), guard);
+ }
+}
+
+void
+Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard)
{
+ assert(chunkOrderGuard.monitors(_currentChunkMonitor));
+ const Packet & packet = chunk->getPacket();
DomainPart::SP dp(_parts.rbegin()->second);
vespalib::nbostream_longlivedbuf is(packet.getHandle().c_str(), packet.getHandle().size());
Packet::Entry entry;
entry.deserialize(is);
- if (dp->byteSize() > _domainPartSize) {
+ if (dp->byteSize() > _domainPartSizeLimit) {
waitPendingSync(_syncMonitor, _pendingSync);
triggerSyncNow();
waitPendingSync(_syncMonitor, _pendingSync);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index c1ff9157a6f..6d508e6d72e 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -35,22 +35,22 @@ struct DomainInfo {
typedef std::map<vespalib::string, DomainInfo> DomainStats;
-class Domain
+class Domain final : public FastOS_Runnable
{
public:
using SP = std::shared_ptr<Domain>;
using Executor = vespalib::ThreadExecutor;
- Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor,
- Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType,
- const common::FileHeaderContext &fileHeaderContext);
+ Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool,
+ Executor & commitExecutor, Executor & sessionExecutor, uint64_t domainPartSize,
+ DomainPart::Crc defaultCrcType, const common::FileHeaderContext &fileHeaderContext);
- virtual ~Domain();
+ ~Domain() override;
DomainInfo getDomainInfo() const;
const vespalib::string & name() const { return _name; }
bool erase(SerialNum to);
- void commit(const Packet & packet);
+ void commit(const Packet & packet, Writer::DoneCallback onDone);
int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn);
SerialNum begin() const;
@@ -79,6 +79,22 @@ public:
uint64_t size() const;
private:
+ void Run(FastOS_ThreadInterface *thisThread, void *arguments) override;
+ void commitIfStale(const vespalib::MonitorGuard & guard);
+ class Chunk {
+ public:
+ void add(const Packet & packet, Writer::DoneCallback onDone);
+ size_t sizeBytes() const { return _data.sizeBytes(); }
+ const Packet & getPacket() const { return _data; }
+ std::chrono::microseconds age() const;
+ private:
+ Packet _data;
+ std::vector<Writer::DoneCallback> _callBacks;
+ std::chrono::steady_clock::time_point _firstArrivalTime;
+ };
+
+ std::unique_ptr<Chunk> grabCurrentChunk(const vespalib::MonitorGuard & guard);
+ void commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard);
SerialNum begin(const vespalib::LockGuard & guard) const;
SerialNum end(const vespalib::LockGuard & guard) const;
size_t byteSize(const vespalib::LockGuard & guard) const;
@@ -95,22 +111,28 @@ private:
using DomainPartList = std::map<int64_t, DomainPart::SP>;
using DurationSeconds = std::chrono::duration<double>;
+ std::unique_ptr<Chunk> _currentChunk;
DomainPart::Crc _defaultCrcType;
+ FastOS_ThreadPool & _threadPool;
Executor & _commitExecutor;
Executor & _sessionExecutor;
std::atomic<int> _sessionId;
vespalib::Monitor _syncMonitor;
bool _pendingSync;
vespalib::string _name;
- uint64_t _domainPartSize;
+ const uint64_t _domainPartSizeLimit;
+ const uint64_t _chunkSizeLimit;
+ const std::chrono::microseconds _chunkAgeLimit;
DomainPartList _parts;
vespalib::Lock _lock;
+ vespalib::Monitor _currentChunkMonitor;
vespalib::Lock _sessionLock;
SessionList _sessions;
DurationSeconds _maxSessionRunTime;
vespalib::string _baseDir;
const common::FileHeaderContext &_fileHeaderContext;
bool _markedDeleted;
+ FastOS_ThreadInterface * _self;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index 35bdc71c963..06e0820c592 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -31,23 +31,15 @@ void
handleSync(FastOS_FileInterface &file) __attribute__ ((noinline));
string
-handleWriteError(const char *text,
- FastOS_FileInterface &file,
- int64_t lastKnownGoodPos,
- const Packet::Entry &entry,
- int bufLen) __attribute__ ((noinline));
+handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos,
+ const Packet::Entry &entry, int bufLen) __attribute__ ((noinline));
bool
-handleReadError(const char *text,
- FastOS_FileInterface &file,
- ssize_t len,
- ssize_t rlen,
- int64_t lastKnownGoodPos,
- bool allowTruncate) __attribute__ ((noinline));
+handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen,
+ int64_t lastKnownGoodPos, bool allowTruncate) __attribute__ ((noinline));
bool
-addPacket(Packet &packet,
- const Packet::Entry &e) __attribute__ ((noinline));
+addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline));
bool
tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline));
@@ -599,10 +591,7 @@ DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry)
}
bool
-DomainPart::read(FastOS_FileInterface &file,
- Packet::Entry &entry,
- Alloc & buf,
- bool allowTruncate)
+DomainPart::read(FastOS_FileInterface &file, Packet::Entry &entry, Alloc & buf, bool allowTruncate)
{
bool retval(true);
char tmp[5];
@@ -615,8 +604,8 @@ DomainPart::read(FastOS_FileInterface &file,
if ((retval = (rlen == sizeof(tmp)))) {
if ( ! (retval = (version == ccitt_crc32) || version == xxh64)) {
string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2',"
- " got %d from '%s' at position %ld",
- version, file.GetFileName(), lastKnownGoodPos));
+ " got %d from '%s' at position %ld",
+ version, file.GetFileName(), lastKnownGoodPos));
if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) {
LOG(warning, "%s", msg.c_str());
return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index 4c3c5609a93..cf2b1510526 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "translogserver.h"
+#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/exceptions.h>
@@ -13,6 +14,8 @@ using vespalib::make_string;
using vespalib::stringref;
using vespalib::IllegalArgumentException;
using search::common::FileHeaderContext;
+using std::make_shared;
+using std::runtime_error;
namespace search::transactionlog {
@@ -26,21 +29,16 @@ class SyncHandler : public FNET_Task
SerialNum _syncTo;
public:
- SyncHandler(FRT_Supervisor *supervisor,
- FRT_RPCRequest *req,const Domain::SP &domain,
- const TransLogServer::Session::SP &session,
- SerialNum syncTo);
+ SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req,const Domain::SP &domain,
+ const TransLogServer::Session::SP &session, SerialNum syncTo);
~SyncHandler();
void PerformTask() override;
};
-SyncHandler::SyncHandler(FRT_Supervisor *supervisor,
- FRT_RPCRequest *req,
- const Domain::SP &domain,
- const TransLogServer::Session::SP &session,
- SerialNum syncTo)
+SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain,
+ const TransLogServer::Session::SP &session, SerialNum syncTo)
: FNET_Task(supervisor->GetScheduler()),
_req(*req),
_domain(domain),
@@ -50,9 +48,7 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor,
}
-SyncHandler::~SyncHandler()
-{
-}
+SyncHandler::~SyncHandler() = default;
void
@@ -95,7 +91,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
_defaultCrcType(defaultCrcType),
_commitExecutor(maxThreads, 128*1024),
_sessionExecutor(maxThreads, 128*1024),
- _threadPool(8192, 1),
+ _threadPool(0x20000),
_supervisor(std::make_unique<FRT_Supervisor>()),
_domains(),
_reqQ(),
@@ -110,8 +106,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
domainDir >> domainName;
if ( ! domainName.empty()) {
try {
- auto domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor,
- _domainPartSize, _defaultCrcType,_fileHeaderContext);
+ auto domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, _sessionExecutor,
+ _domainPartSize, _defaultCrcType,_fileHeaderContext);
_domains[domain->name()] = domain;
} catch (const std::exception & e) {
LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what());
@@ -132,13 +128,13 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
}
}
if ( ! listenOk ) {
- throw std::runtime_error(make_string("Failed listening at port %s. Giving up. Requires manual intervention.", listenSpec));
+ throw runtime_error(make_string("Failed listening at port %s. Giving up. Requires manual intervention.", listenSpec));
}
} else {
- throw std::runtime_error(make_string("Failed creating tls dir %s r(%d), e(%d). Requires manual intervention.", dir().c_str(), retval, errno));
+ throw runtime_error(make_string("Failed creating tls dir %s r(%d), e(%d). Requires manual intervention.", dir().c_str(), retval, errno));
}
} else {
- throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno));
+ throw runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno));
}
start(_threadPool);
}
@@ -200,7 +196,7 @@ void TransLogServer::run()
}
}
logMetric();
- } while (running() && !(hasPacket && (req == NULL)));
+ } while (running() && !(hasPacket && (req == nullptr)));
LOG(info, "TLS Stopped");
}
@@ -349,8 +345,8 @@ void TransLogServer::createDomain(FRT_RPCRequest *req)
Domain::SP domain(findDomain(domainName));
if ( !domain ) {
try {
- domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor,
- _domainPartSize, _defaultCrcType, _fileHeaderContext);
+ domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, _sessionExecutor,
+ _domainPartSize, _defaultCrcType, _fileHeaderContext);
{
Guard domainGuard(_lock);
_domains[domain->name()] = domain;
@@ -462,7 +458,7 @@ void TransLogServer::commit(const vespalib::string & domainName, const Packet &
(void) done;
Domain::SP domain(findDomain(domainName));
if (domain) {
- domain->commit(packet);
+ domain->commit(packet, std::move(done));
} else {
throw IllegalArgumentException("Could not find domain " + domainName);
}
@@ -478,7 +474,9 @@ void TransLogServer::domainCommit(FRT_RPCRequest *req)
if (domain) {
Packet packet(params[1]._data._buf, params[1]._data._len);
try {
- domain->commit(packet);
+ vespalib::Gate gate;
+ domain->commit(packet, make_shared<GateCallback>(gate));
+ gate.await();
ret.AddInt32(0);
ret.AddString("ok");
} catch (const std::exception & e) {