diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-22 12:15:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-22 12:15:25 +0200 |
commit | 804e8057c2eca61ec9bc8985430613e0731922a2 (patch) | |
tree | b9bca78fe11e1bb0a0bef13a4380b3de683271ef /searchlib/src | |
parent | 1fa18524f943ae986f6526db01945d41de2f9d6d (diff) | |
parent | 20d0f96d2e6f276669293780adc67421aea0dee2 (diff) |
Merge pull request #14476 from vespa-engine/balder/add-commit-result
Balder/add commit result
Diffstat (limited to 'searchlib/src')
7 files changed, 93 insertions, 61 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index 9e5021b4778..fffb70467a3 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -329,11 +329,12 @@ fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numP Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); p->add(e); if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { - domainWriter->commit(*p, std::make_shared<CountDone>(inFlight)); + domainWriter->append(*p, std::make_shared<CountDone>(inFlight)); p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE); } } - domainWriter->commit(*p, std::make_shared<CountDone>(inFlight)); + domainWriter->append(*p, std::make_shared<CountDone>(inFlight)); + auto keep = domainWriter->startCommit(Writer::DoneCallback()); LOG(info, "Inflight %ld", inFlight.load()); } while (inFlight.load() != 0) { diff --git a/searchlib/src/vespa/searchlib/common/fileheadercontext.h b/searchlib/src/vespa/searchlib/common/fileheadercontext.h index 6f76fe1717d..8bb3d6a56a6 100644 --- a/searchlib/src/vespa/searchlib/common/fileheadercontext.h +++ b/searchlib/src/vespa/searchlib/common/fileheadercontext.h @@ -3,40 +3,22 @@ #include <vespa/vespalib/stllike/string.h> -namespace vespalib -{ - -class GenericHeader; - +namespace vespalib { + class GenericHeader; } -namespace search -{ - -namespace common -{ +namespace search::common { class FileHeaderContext { public: FileHeaderContext(); + virtual ~FileHeaderContext(); - virtual - ~FileHeaderContext(); + virtual void addTags(vespalib::GenericHeader &header, const vespalib::string &name) const = 0; - virtual void - addTags(vespalib::GenericHeader &header, - const vespalib::string &name) const = 0; - - static void - addCreateAndFreezeTime(vespalib::GenericHeader &header); - - static void - setFreezeTime(vespalib::GenericHeader &header); + static void addCreateAndFreezeTime(vespalib::GenericHeader &header); + static void setFreezeTime(vespalib::GenericHeader &header); }; - -} // namespace common - -} // namespace search - +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp index 556ebca06ec..3308f3182dc 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp @@ -121,11 +121,26 @@ Packet::add(const Packet::Entry & e) _range.to(e.serial()); } +Writer::CommitResult::CommitResult() + : _callBacks() +{} +Writer::CommitResult::CommitResult( CommitPayload commitPayLoad) + : _callBacks(std::move(commitPayLoad)) +{} + +Writer::CommitResult::~CommitResult() = default; + CommitChunk::CommitChunk(size_t reserveBytes, size_t reserveCount) : _data(reserveBytes), - _callBacks() + _callBacks(std::make_shared<Writer::DoneCallbacksList>()) +{ + _callBacks->reserve(reserveCount); +} + +CommitChunk::CommitChunk(size_t reserveBytes, Writer::CommitPayload postponed) + : _data(reserveBytes), + _callBacks(std::move(postponed)) { - _callBacks.reserve(reserveCount); } CommitChunk::~CommitChunk() = default; @@ -133,7 +148,12 @@ CommitChunk::~CommitChunk() = default; void CommitChunk::add(const Packet &packet, Writer::DoneCallback onDone) { _data.merge(packet); - _callBacks.emplace_back(std::move(onDone)); + _callBacks->emplace_back(std::move(onDone)); +} + +Writer::CommitResult +CommitChunk::createCommitResult() const { + return Writer::CommitResult(_callBacks); } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index 7cdfad44b87..5d07d51cdf2 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -16,7 +16,7 @@ class SerialNumRange { public: SerialNumRange() : _from(0), _to(0) { } - SerialNumRange(SerialNum f) : _from(f), _to(f ? f-1 : f) { } + explicit SerialNumRange(SerialNum f) : _from(f), _to(f ? f-1 : f) { } SerialNumRange(SerialNum f, SerialNum t) : _from(f), _to(t) { } bool operator == (const SerialNumRange & b) const { return cmp(b) == 0; } bool operator < (const SerialNumRange & b) const { return cmp(b) < 0; } @@ -63,7 +63,7 @@ public: vespalib::ConstBufferRef _data; }; public: - Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { } + explicit Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { } Packet(const void * buf, size_t sz); void add(const Entry & data); void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); } @@ -84,8 +84,24 @@ int makeDirectory(const char * dir); class Writer { public: using DoneCallback = std::shared_ptr<IDestructorCallback>; + using DoneCallbacksList = std::vector<DoneCallback>; + using CommitPayload = std::shared_ptr<DoneCallbacksList>; + class CommitResult { + public: + CommitResult(); + CommitResult(CommitPayload callBacks); + CommitResult(CommitResult &&) noexcept = default; + CommitResult & operator = (CommitResult &&) noexcept = default; + CommitResult(const CommitResult &) = delete; + CommitResult & operator = (const CommitResult &) = delete; + ~CommitResult(); + size_t getNumOperations() const { return _callBacks->size(); } + private: + CommitPayload _callBacks; + }; virtual ~Writer() = default; - virtual void commit(const Packet & packet, DoneCallback done) = 0; + virtual void append(const Packet & packet, DoneCallback done) = 0; + [[nodiscard]] virtual CommitResult startCommit(DoneCallback onDone) = 0; }; class WriterFactory { @@ -106,14 +122,20 @@ public: class CommitChunk { public: CommitChunk(size_t reserveBytes, size_t reserveCount); + CommitChunk(size_t reserveBytes, Writer::CommitPayload postponed); ~CommitChunk(); + bool empty() const { return _callBacks->empty(); } void add(const Packet & packet, Writer::DoneCallback onDone); size_t sizeBytes() const { return _data.sizeBytes(); } const Packet & getPacket() const { return _data; } - size_t getNumCallBacks() const { return _callBacks.size(); } + size_t getNumCallBacks() const { return _callBacks->size(); } + Writer::CommitResult createCommitResult() const; + void setCommitDoneCallback(Writer::DoneCallback onDone) { _onCommitDone = std::move(onDone); } + Writer::CommitPayload stealCallbacks() { return std::move(_callBacks); } private: - Packet _data; - std::vector<Writer::DoneCallback> _callBacks; + Packet _data; + Writer::CommitPayload _callBacks; + Writer::DoneCallback _onCommitDone; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 415ccafda70..ff2d963ccd9 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -25,7 +25,6 @@ using vespalib::makeClosure; using vespalib::makeLambdaTask; using vespalib::Monitor; using vespalib::MonitorGuard; -using search::common::FileHeaderContext; using std::runtime_error; using std::make_shared; @@ -35,7 +34,7 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & exec const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext) : _config(cfg), _lastSerial(0), - _singleCommiter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128*1024)), + _singleCommitter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024)), _executor(executor), _sessionId(1), _syncMonitor(), @@ -310,8 +309,14 @@ Domain::optionallyRotateFile(SerialNum serialNum) { return dp; } +Domain::CommitResult +Domain::startCommit(DoneCallback onDone) { + (void) onDone; + return CommitResult(); +} + void -Domain::commit(const Packet & packet, Writer::DoneCallback onDone) +Domain::append(const Packet & packet, Writer::DoneCallback onDone) { (void) onDone; vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 9adff564cc8..361ac8c1805 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -18,8 +18,9 @@ public: using SP = std::shared_ptr<Domain>; using Executor = vespalib::SyncableThreadExecutor; using DomainPartSP = std::shared_ptr<DomainPart>; + using FileHeaderContext = common::FileHeaderContext; Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & executor, - const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext); + const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext); ~Domain() override; @@ -27,14 +28,14 @@ public: const vespalib::string & name() const { return _name; } bool erase(SerialNum to); - void commit(const Packet & packet, Writer::DoneCallback onDone) override; + void append(const Packet & packet, Writer::DoneCallback onDone) override; + [[nodiscard]] CommitResult startCommit(DoneCallback onDone) override; int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Destination> dest); SerialNum begin() const; SerialNum end() const; SerialNum getSynced() const; void triggerSyncNow(); - bool commitIfStale(); bool getMarkedDeleted() const { return _markedDeleted; } void markDeleted() { _markedDeleted = true; } @@ -72,23 +73,23 @@ private: using DomainPartList = std::map<SerialNum, DomainPartSP>; using DurationSeconds = std::chrono::duration<double>; - DomainConfig _config; - SerialNum _lastSerial; - std::unique_ptr<Executor> _singleCommiter; - Executor & _executor; - std::atomic<int> _sessionId; - vespalib::Monitor _syncMonitor; - bool _pendingSync; - vespalib::string _name; - DomainPartList _parts; - vespalib::Lock _lock; - vespalib::Monitor _currentChunkMonitor; - vespalib::Lock _sessionLock; - SessionList _sessions; - DurationSeconds _maxSessionRunTime; - vespalib::string _baseDir; - const common::FileHeaderContext &_fileHeaderContext; - bool _markedDeleted; + DomainConfig _config; + SerialNum _lastSerial; + std::unique_ptr<Executor> _singleCommitter; + Executor &_executor; + std::atomic<int> _sessionId; + vespalib::Monitor _syncMonitor; + bool _pendingSync; + vespalib::string _name; + DomainPartList _parts; + vespalib::Lock _lock; + vespalib::Monitor _currentChunkMonitor; + vespalib::Lock _sessionLock; + SessionList _sessions; + DurationSeconds _maxSessionRunTime; + vespalib::string _baseDir; + const FileHeaderContext &_fileHeaderContext; + bool _markedDeleted; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 269ed7e9380..7be3dd708a5 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -572,7 +572,8 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) Packet packet(params[1]._data._buf, params[1]._data._len); try { vespalib::Gate gate; - domain->commit(packet, make_shared<GateCallback>(gate)); + domain->append(packet, make_shared<GateCallback>(gate)); + auto keep = domain->startCommit(make_shared<IgnoreCallback>()); gate.await(); ret.AddInt32(0); ret.AddString("ok"); |