summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-18 09:18:37 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2018-01-10 09:56:22 +0100
commitf148226ecd5daf4ae23f642bd3f731db748ddbdd (patch)
tree9098aa59d26d4fa8f058ac7e18d59da280535905 /searchlib
parent2b55ae64f687e83e72c4f5c0062ff482c818d47e (diff)
merge must handle empty destination.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.cpp5
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp13
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h2
3 files changed, 16 insertions, 4 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
index b6009111e3e..7d919f9ad2e 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
@@ -51,8 +51,11 @@ Packet::Packet(const void * buf, size_t sz) :
bool Packet::merge(const Packet & packet)
{
- bool retval(_range.to() < packet._range.from());
+ bool retval(_range.to() < packet.range().from());
if (retval) {
+ if (_buf.empty()) {
+ _range.from(packet.range().from());
+ }
_count += packet._count;
_range.to(packet._range.to());
_buf.write(packet.getHandle().c_str(), packet.getHandle().size());
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index d7711c80051..3248a0aba91 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -28,6 +28,7 @@ Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPo
Executor & commitExecutor, Executor & sessionExecutor, uint64_t domainPartSize,
DomainPart::Crc defaultCrcType, const FileHeaderContext &fileHeaderContext) :
_currentChunk(std::make_unique<Chunk>()),
+ _lastSerial(0),
_defaultCrcType(defaultCrcType),
_threadPool(threadPool),
_commitExecutor(commitExecutor),
@@ -68,6 +69,7 @@ Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPo
if (_parts.empty() || _parts.crbegin()->second->isClosed()) {
_parts[lastPart].reset(new DomainPart(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false));
}
+ _lastSerial = end();
_self = _threadPool.NewThread(this);
assert(_self);
}
@@ -156,7 +158,7 @@ Domain::begin(const LockGuard & guard) const
assert(guard.locks(_lock));
SerialNum s(0);
if ( ! _parts.empty() ) {
- s = _parts.begin()->second->range().from();
+ s = _parts.cbegin()->second->range().from();
}
return s;
}
@@ -174,7 +176,7 @@ Domain::end(const LockGuard & guard) const
assert(guard.locks(_lock));
SerialNum s(0);
if ( ! _parts.empty() ) {
- s = _parts.rbegin()->second->range().to();
+ s = _parts.crbegin()->second->range().to();
}
return s;
}
@@ -327,9 +329,14 @@ Domain::Chunk::age() const {
void
Domain::commit(const Packet & packet, Writer::DoneCallback onDone) {
-
std::unique_ptr<Chunk> completed;
vespalib::MonitorGuard guard(_currentChunkMonitor);
+ if (! (_lastSerial < packet.range().from())) {
+ throw runtime_error(make_string("Incomming serial number(%ld) must be bigger than the last one (%ld).",
+ packet.range().from(), _lastSerial));
+ } else {
+ _lastSerial = packet.range().to();
+ }
_currentChunk->add(packet, std::move(onDone));
if (_currentChunk->sizeBytes() > _chunkSizeLimit) {
completed = grabCurrentChunk(guard);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 6d508e6d72e..7eb8e201d67 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -83,6 +83,7 @@ private:
void commitIfStale(const vespalib::MonitorGuard & guard);
class Chunk {
public:
+
void add(const Packet & packet, Writer::DoneCallback onDone);
size_t sizeBytes() const { return _data.sizeBytes(); }
const Packet & getPacket() const { return _data; }
@@ -112,6 +113,7 @@ private:
using DurationSeconds = std::chrono::duration<double>;
std::unique_ptr<Chunk> _currentChunk;
+ SerialNum _lastSerial;
DomainPart::Crc _defaultCrcType;
FastOS_ThreadPool & _threadPool;
Executor & _commitExecutor;