summaryrefslogtreecommitdiffstats
path: root/searchlib/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-22 12:15:25 +0200
committerGitHub <noreply@github.com>2020-09-22 12:15:25 +0200
commit804e8057c2eca61ec9bc8985430613e0731922a2 (patch)
treeb9bca78fe11e1bb0a0bef13a4380b3de683271ef /searchlib/src
parent1fa18524f943ae986f6526db01945d41de2f9d6d (diff)
parent20d0f96d2e6f276669293780adc67421aea0dee2 (diff)
Merge pull request #14476 from vespa-engine/balder/add-commit-result
Balder/add commit result
Diffstat (limited to 'searchlib/src')
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp5
-rw-r--r--searchlib/src/vespa/searchlib/common/fileheadercontext.h34
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.cpp26
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.h34
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp11
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h41
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp3
7 files changed, 93 insertions, 61 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index 9e5021b4778..fffb70467a3 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -329,11 +329,12 @@ fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numP
Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value)));
p->add(e);
if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) {
- domainWriter->commit(*p, std::make_shared<CountDone>(inFlight));
+ domainWriter->append(*p, std::make_shared<CountDone>(inFlight));
p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE);
}
}
- domainWriter->commit(*p, std::make_shared<CountDone>(inFlight));
+ domainWriter->append(*p, std::make_shared<CountDone>(inFlight));
+ auto keep = domainWriter->startCommit(Writer::DoneCallback());
LOG(info, "Inflight %ld", inFlight.load());
}
while (inFlight.load() != 0) {
diff --git a/searchlib/src/vespa/searchlib/common/fileheadercontext.h b/searchlib/src/vespa/searchlib/common/fileheadercontext.h
index 6f76fe1717d..8bb3d6a56a6 100644
--- a/searchlib/src/vespa/searchlib/common/fileheadercontext.h
+++ b/searchlib/src/vespa/searchlib/common/fileheadercontext.h
@@ -3,40 +3,22 @@
#include <vespa/vespalib/stllike/string.h>
-namespace vespalib
-{
-
-class GenericHeader;
-
+namespace vespalib {
+ class GenericHeader;
}
-namespace search
-{
-
-namespace common
-{
+namespace search::common {
class FileHeaderContext
{
public:
FileHeaderContext();
+ virtual ~FileHeaderContext();
- virtual
- ~FileHeaderContext();
+ virtual void addTags(vespalib::GenericHeader &header, const vespalib::string &name) const = 0;
- virtual void
- addTags(vespalib::GenericHeader &header,
- const vespalib::string &name) const = 0;
-
- static void
- addCreateAndFreezeTime(vespalib::GenericHeader &header);
-
- static void
- setFreezeTime(vespalib::GenericHeader &header);
+ static void addCreateAndFreezeTime(vespalib::GenericHeader &header);
+ static void setFreezeTime(vespalib::GenericHeader &header);
};
-
-} // namespace common
-
-} // namespace search
-
+}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
index 556ebca06ec..3308f3182dc 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
@@ -121,11 +121,26 @@ Packet::add(const Packet::Entry & e)
_range.to(e.serial());
}
+Writer::CommitResult::CommitResult()
+ : _callBacks()
+{}
+Writer::CommitResult::CommitResult( CommitPayload commitPayLoad)
+ : _callBacks(std::move(commitPayLoad))
+{}
+
+Writer::CommitResult::~CommitResult() = default;
+
CommitChunk::CommitChunk(size_t reserveBytes, size_t reserveCount)
: _data(reserveBytes),
- _callBacks()
+ _callBacks(std::make_shared<Writer::DoneCallbacksList>())
+{
+ _callBacks->reserve(reserveCount);
+}
+
+CommitChunk::CommitChunk(size_t reserveBytes, Writer::CommitPayload postponed)
+ : _data(reserveBytes),
+ _callBacks(std::move(postponed))
{
- _callBacks.reserve(reserveCount);
}
CommitChunk::~CommitChunk() = default;
@@ -133,7 +148,12 @@ CommitChunk::~CommitChunk() = default;
void
CommitChunk::add(const Packet &packet, Writer::DoneCallback onDone) {
_data.merge(packet);
- _callBacks.emplace_back(std::move(onDone));
+ _callBacks->emplace_back(std::move(onDone));
+}
+
+Writer::CommitResult
+CommitChunk::createCommitResult() const {
+ return Writer::CommitResult(_callBacks);
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h
index 7cdfad44b87..5d07d51cdf2 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.h
@@ -16,7 +16,7 @@ class SerialNumRange
{
public:
SerialNumRange() : _from(0), _to(0) { }
- SerialNumRange(SerialNum f) : _from(f), _to(f ? f-1 : f) { }
+ explicit SerialNumRange(SerialNum f) : _from(f), _to(f ? f-1 : f) { }
SerialNumRange(SerialNum f, SerialNum t) : _from(f), _to(t) { }
bool operator == (const SerialNumRange & b) const { return cmp(b) == 0; }
bool operator < (const SerialNumRange & b) const { return cmp(b) < 0; }
@@ -63,7 +63,7 @@ public:
vespalib::ConstBufferRef _data;
};
public:
- Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { }
+ explicit Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { }
Packet(const void * buf, size_t sz);
void add(const Entry & data);
void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); }
@@ -84,8 +84,24 @@ int makeDirectory(const char * dir);
class Writer {
public:
using DoneCallback = std::shared_ptr<IDestructorCallback>;
+ using DoneCallbacksList = std::vector<DoneCallback>;
+ using CommitPayload = std::shared_ptr<DoneCallbacksList>;
+ class CommitResult {
+ public:
+ CommitResult();
+ CommitResult(CommitPayload callBacks);
+ CommitResult(CommitResult &&) noexcept = default;
+ CommitResult & operator = (CommitResult &&) noexcept = default;
+ CommitResult(const CommitResult &) = delete;
+ CommitResult & operator = (const CommitResult &) = delete;
+ ~CommitResult();
+ size_t getNumOperations() const { return _callBacks->size(); }
+ private:
+ CommitPayload _callBacks;
+ };
virtual ~Writer() = default;
- virtual void commit(const Packet & packet, DoneCallback done) = 0;
+ virtual void append(const Packet & packet, DoneCallback done) = 0;
+ [[nodiscard]] virtual CommitResult startCommit(DoneCallback onDone) = 0;
};
class WriterFactory {
@@ -106,14 +122,20 @@ public:
class CommitChunk {
public:
CommitChunk(size_t reserveBytes, size_t reserveCount);
+ CommitChunk(size_t reserveBytes, Writer::CommitPayload postponed);
~CommitChunk();
+ bool empty() const { return _callBacks->empty(); }
void add(const Packet & packet, Writer::DoneCallback onDone);
size_t sizeBytes() const { return _data.sizeBytes(); }
const Packet & getPacket() const { return _data; }
- size_t getNumCallBacks() const { return _callBacks.size(); }
+ size_t getNumCallBacks() const { return _callBacks->size(); }
+ Writer::CommitResult createCommitResult() const;
+ void setCommitDoneCallback(Writer::DoneCallback onDone) { _onCommitDone = std::move(onDone); }
+ Writer::CommitPayload stealCallbacks() { return std::move(_callBacks); }
private:
- Packet _data;
- std::vector<Writer::DoneCallback> _callBacks;
+ Packet _data;
+ Writer::CommitPayload _callBacks;
+ Writer::DoneCallback _onCommitDone;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 415ccafda70..ff2d963ccd9 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -25,7 +25,6 @@ using vespalib::makeClosure;
using vespalib::makeLambdaTask;
using vespalib::Monitor;
using vespalib::MonitorGuard;
-using search::common::FileHeaderContext;
using std::runtime_error;
using std::make_shared;
@@ -35,7 +34,7 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & exec
const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext)
: _config(cfg),
_lastSerial(0),
- _singleCommiter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128*1024)),
+ _singleCommitter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024)),
_executor(executor),
_sessionId(1),
_syncMonitor(),
@@ -310,8 +309,14 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
return dp;
}
+Domain::CommitResult
+Domain::startCommit(DoneCallback onDone) {
+ (void) onDone;
+ return CommitResult();
+}
+
void
-Domain::commit(const Packet & packet, Writer::DoneCallback onDone)
+Domain::append(const Packet & packet, Writer::DoneCallback onDone)
{
(void) onDone;
vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size());
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 9adff564cc8..361ac8c1805 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -18,8 +18,9 @@ public:
using SP = std::shared_ptr<Domain>;
using Executor = vespalib::SyncableThreadExecutor;
using DomainPartSP = std::shared_ptr<DomainPart>;
+ using FileHeaderContext = common::FileHeaderContext;
Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & executor,
- const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext);
+ const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext);
~Domain() override;
@@ -27,14 +28,14 @@ public:
const vespalib::string & name() const { return _name; }
bool erase(SerialNum to);
- void commit(const Packet & packet, Writer::DoneCallback onDone) override;
+ void append(const Packet & packet, Writer::DoneCallback onDone) override;
+ [[nodiscard]] CommitResult startCommit(DoneCallback onDone) override;
int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Destination> dest);
SerialNum begin() const;
SerialNum end() const;
SerialNum getSynced() const;
void triggerSyncNow();
- bool commitIfStale();
bool getMarkedDeleted() const { return _markedDeleted; }
void markDeleted() { _markedDeleted = true; }
@@ -72,23 +73,23 @@ private:
using DomainPartList = std::map<SerialNum, DomainPartSP>;
using DurationSeconds = std::chrono::duration<double>;
- DomainConfig _config;
- SerialNum _lastSerial;
- std::unique_ptr<Executor> _singleCommiter;
- Executor & _executor;
- std::atomic<int> _sessionId;
- vespalib::Monitor _syncMonitor;
- bool _pendingSync;
- vespalib::string _name;
- DomainPartList _parts;
- vespalib::Lock _lock;
- vespalib::Monitor _currentChunkMonitor;
- vespalib::Lock _sessionLock;
- SessionList _sessions;
- DurationSeconds _maxSessionRunTime;
- vespalib::string _baseDir;
- const common::FileHeaderContext &_fileHeaderContext;
- bool _markedDeleted;
+ DomainConfig _config;
+ SerialNum _lastSerial;
+ std::unique_ptr<Executor> _singleCommitter;
+ Executor &_executor;
+ std::atomic<int> _sessionId;
+ vespalib::Monitor _syncMonitor;
+ bool _pendingSync;
+ vespalib::string _name;
+ DomainPartList _parts;
+ vespalib::Lock _lock;
+ vespalib::Monitor _currentChunkMonitor;
+ vespalib::Lock _sessionLock;
+ SessionList _sessions;
+ DurationSeconds _maxSessionRunTime;
+ vespalib::string _baseDir;
+ const FileHeaderContext &_fileHeaderContext;
+ bool _markedDeleted;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index 269ed7e9380..7be3dd708a5 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -572,7 +572,8 @@ TransLogServer::domainCommit(FRT_RPCRequest *req)
Packet packet(params[1]._data._buf, params[1]._data._len);
try {
vespalib::Gate gate;
- domain->commit(packet, make_shared<GateCallback>(gate));
+ domain->append(packet, make_shared<GateCallback>(gate));
+ auto keep = domain->startCommit(make_shared<IgnoreCallback>());
gate.await();
ret.AddInt32(0);
ret.AddString("ok");