diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-18 09:18:37 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-10 09:56:22 +0100 |
commit | f148226ecd5daf4ae23f642bd3f731db748ddbdd (patch) | |
tree | 9098aa59d26d4fa8f058ac7e18d59da280535905 /searchlib | |
parent | 2b55ae64f687e83e72c4f5c0062ff482c818d47e (diff) |
merge must handle empty destination.
Diffstat (limited to 'searchlib')
-rw-r--r-- | searchlib/src/vespa/searchlib/transactionlog/common.cpp | 5 | ||||
-rw-r--r-- | searchlib/src/vespa/searchlib/transactionlog/domain.cpp | 13 | ||||
-rw-r--r-- | searchlib/src/vespa/searchlib/transactionlog/domain.h | 2 |
3 files changed, 16 insertions, 4 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp index b6009111e3e..7d919f9ad2e 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp @@ -51,8 +51,11 @@ Packet::Packet(const void * buf, size_t sz) : bool Packet::merge(const Packet & packet) { - bool retval(_range.to() < packet._range.from()); + bool retval(_range.to() < packet.range().from()); if (retval) { + if (_buf.empty()) { + _range.from(packet.range().from()); + } _count += packet._count; _range.to(packet._range.to()); _buf.write(packet.getHandle().c_str(), packet.getHandle().size()); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index d7711c80051..3248a0aba91 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -28,6 +28,7 @@ Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPo Executor & commitExecutor, Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, const FileHeaderContext &fileHeaderContext) : _currentChunk(std::make_unique<Chunk>()), + _lastSerial(0), _defaultCrcType(defaultCrcType), _threadPool(threadPool), _commitExecutor(commitExecutor), @@ -68,6 +69,7 @@ Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPo if (_parts.empty() || _parts.crbegin()->second->isClosed()) { _parts[lastPart].reset(new DomainPart(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false)); } + _lastSerial = end(); _self = _threadPool.NewThread(this); assert(_self); } @@ -156,7 +158,7 @@ Domain::begin(const LockGuard & guard) const assert(guard.locks(_lock)); SerialNum s(0); if ( ! _parts.empty() ) { - s = _parts.begin()->second->range().from(); + s = _parts.cbegin()->second->range().from(); } return s; } @@ -174,7 +176,7 @@ Domain::end(const LockGuard & guard) const assert(guard.locks(_lock)); SerialNum s(0); if ( ! _parts.empty() ) { - s = _parts.rbegin()->second->range().to(); + s = _parts.crbegin()->second->range().to(); } return s; } @@ -327,9 +329,14 @@ Domain::Chunk::age() const { void Domain::commit(const Packet & packet, Writer::DoneCallback onDone) { - std::unique_ptr<Chunk> completed; vespalib::MonitorGuard guard(_currentChunkMonitor); + if (! (_lastSerial < packet.range().from())) { + throw runtime_error(make_string("Incomming serial number(%ld) must be bigger than the last one (%ld).", + packet.range().from(), _lastSerial)); + } else { + _lastSerial = packet.range().to(); + } _currentChunk->add(packet, std::move(onDone)); if (_currentChunk->sizeBytes() > _chunkSizeLimit) { completed = grabCurrentChunk(guard); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 6d508e6d72e..7eb8e201d67 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -83,6 +83,7 @@ private: void commitIfStale(const vespalib::MonitorGuard & guard); class Chunk { public: + void add(const Packet & packet, Writer::DoneCallback onDone); size_t sizeBytes() const { return _data.sizeBytes(); } const Packet & getPacket() const { return _data; } @@ -112,6 +113,7 @@ private: using DurationSeconds = std::chrono::duration<double>; std::unique_ptr<Chunk> _currentChunk; + SerialNum _lastSerial; DomainPart::Crc _defaultCrcType; FastOS_ThreadPool & _threadPool; Executor & _commitExecutor; |