summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-12-06 13:23:50 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-12-06 13:27:42 +0000
commit65c26f63261639575911dcda9a5d43dfcf7ebb04 (patch)
tree6f80c844486ab92802f8ee032448b3e5ae550d3d /searchlib
parent8072a49a023e19d5bf1246c5ce46205d2bc10631 (diff)
- Separate encoding and actual persitence of the transaction log.
- Refactor in preparation of multithreaded encode.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp26
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp65
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.h10
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp45
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.h16
5 files changed, 98 insertions, 64 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 24943b53e6d..7fa87f3ec09 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -56,11 +56,13 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & exec
_fileHeaderContext(fileHeaderContext),
_markedDeleted(false)
{
- int retval(0);
- if ((retval = makeDirectory(_baseDir.c_str())) != 0) {
+ assert(_config.getEncoding().getCompression() != Encoding::Compression::none);
+ int retval = makeDirectory(_baseDir.c_str());
+ if (retval != 0) {
throw runtime_error(fmt("Failed creating basedirectory %s r(%d), e(%d)", _baseDir.c_str(), retval, errno));
}
- if ((retval = makeDirectory(dir().c_str())) != 0) {
+ retval = makeDirectory(dir().c_str());
+ if (retval != 0) {
throw runtime_error(fmt("Failed creating domaindir %s r(%d), e(%d)", dir().c_str(), retval, errno));
}
SerialNumList partIdVector = scanDir();
@@ -76,8 +78,7 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & exec
}
pending.waitForZeroRefCount();
if (_parts.empty() || _parts.crbegin()->second->isClosed()) {
- _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(),
- _config.getCompressionlevel(), _fileHeaderContext, false);
+ _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _fileHeaderContext, false);
vespalib::File::sync(dir());
}
_lastSerial = end();
@@ -86,13 +87,13 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & exec
Domain &
Domain::setConfig(const DomainConfig & cfg) {
_config = cfg;
+ assert(_config.getEncoding().getCompression() != Encoding::Compression::none);
return *this;
}
void
Domain::addPart(SerialNum partId, bool isLastPart) {
- auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(),
- _config.getCompressionlevel(), _fileHeaderContext, isLastPart);
+ auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _fileHeaderContext, isLastPart);
if (dp->size() == 0) {
// Only last domain part is allowed to be truncated down to
// empty size.
@@ -331,8 +332,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
triggerSyncNow({});
waitPendingSync(_syncMonitor, _syncCond, _pendingSync);
dp->close();
- dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _config.getEncoding(),
- _config.getCompressionlevel(), _fileHeaderContext, false);
+ dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _fileHeaderContext, false);
{
std::lock_guard guard(_lock);
_parts[serialNum] = dp;
@@ -399,14 +399,16 @@ Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunk
}));
}
+
+
void
Domain::doCommit(std::unique_ptr<CommitChunk> chunk) {
const Packet & packet = chunk->getPacket();
if (packet.empty()) return;
- SerialNum firstSerial = packet.range().from();
- DomainPart::SP dp = optionallyRotateFile(firstSerial);
- dp->commit(firstSerial, packet);
+ SerializedChunk serialized(packet, _config.getEncoding(), _config.getCompressionlevel());
+ DomainPart::SP dp = optionallyRotateFile(packet.range().from());
+ dp->commit(serialized);
if (_config.getFSyncOnCommit()) {
dp->sync();
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index 3dad67df177..2ca2f15545d 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -247,11 +247,9 @@ DomainPart::buildPacketMapping(bool allowTruncate)
return currPos;
}
-DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding,
- uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate)
- : _encoding(encoding),
- _compressionLevel(compressionLevel),
- _lock(),
+DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s,
+ const FileHeaderContext &fileHeaderContext, bool allowTruncate)
+ : _lock(),
_fileLock(),
_range(s),
_sz(0),
@@ -379,35 +377,21 @@ DomainPart::erase(SerialNum to)
}
void
-DomainPart::commit(SerialNum firstSerial, const Packet &packet)
+DomainPart::commit(const SerializedChunk & serialized)
{
+ SerialNumRange range = serialized.range();
+
int64_t firstPos(byteSize());
- nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size());
+ assert(_range.to() < range.to());
+ _sz += serialized.getNumEntries();
+ _range.to(range.to());
if (_range.from() == 0) {
- _range.from(firstSerial);
- }
- IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel);
- for (size_t i(0); h.size() > 0; i++) {
- //LOG(spam,
- //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)",
- //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining());
- Packet::Entry entry;
- entry.deserialize(h);
- if (_range.to() < entry.serial()) {
- chunk->add(entry);
- assert(_encoding.getCompression() != Encoding::Compression::none);
- _sz++;
- _range.to(entry.serial());
- } else {
- throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").",
- entry.serial(), _range.to()));
- }
- }
- if ( ! chunk->getEntries().empty()) {
- write(*_transLog, *chunk);
+ _range.from(range.from());
}
+
+ write(*_transLog, range, serialized.getData());
std::lock_guard guard(_lock);
- _skipList.emplace_back(firstSerial, firstPos);
+ _skipList.emplace_back(range.from(), firstPos);
}
void
@@ -442,26 +426,15 @@ DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet)
}
void
-DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk)
+DomainPart::write(FastOS_FileInterface &file, SerialNumRange range, vespalib::ConstBufferRef buf)
{
- nbostream os;
- size_t begin = os.wp();
- os << _encoding.getRaw(); // Placeholder for encoding
- os << uint32_t(0); // Placeholder for size
- Encoding realEncoding = chunk.encode(os);
- size_t end = os.wp();
- os.wp(0);
- os << realEncoding.getRaw(); //Patching real encoding
- os << uint32_t(end - (begin + sizeof(uint32_t) + sizeof(uint8_t))); // Patching actual size.
- os.wp(end);
std::lock_guard guard(_writeLock);
- if ( ! file.CheckedWrite(os.data(), os.size()) ) {
- throw runtime_error(handleWriteError("Failed writing the entry.", file, byteSize(), chunk.range(), os.size()));
+ if ( ! file.CheckedWrite(buf.data(), buf.size()) ) {
+ throw runtime_error(handleWriteError("Failed writing the entry.", file, byteSize(), range, buf.size()));
}
- LOG(debug, "Wrote chunk with %zu entries and %zu bytes, range[%" PRIu64 ", %" PRIu64 "] encoding(wanted=%x, real=%x)",
- chunk.getEntries().size(), os.size(), chunk.range().from(), chunk.range().to(), _encoding.getRaw(), realEncoding.getRaw());
- _writtenSerial = chunk.range().to();
- _byteSize.fetch_add(os.size(), std::memory_order_release);
+ LOG(debug, "Wrote chunk with and %zu bytes, range[%" PRIu64 ", %" PRIu64 "]", buf.size(), range.from(), range.to());
+ _writtenSerial = range.to();
+ _byteSize.fetch_add(buf.size(), std::memory_order_release);
}
bool
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
index 9ab0db54391..ea5290c433b 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
@@ -19,13 +19,13 @@ public:
using SP = std::shared_ptr<DomainPart>;
DomainPart(const DomainPart &) = delete;
DomainPart& operator=(const DomainPart &) = delete;
- DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding,
- uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
+ DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s,
+ const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
~DomainPart();
const vespalib::string &fileName() const { return _fileName; }
- void commit(SerialNum firstSerial, const Packet &packet);
+ void commit(const SerializedChunk & serialized);
bool erase(SerialNum to);
bool visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet);
bool close();
@@ -49,7 +49,7 @@ private:
static Packet readPacket(FastOS_FileInterface & file, SerialNumRange wanted, size_t targetSize, bool allowTruncate);
static bool read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc &buf, bool allowTruncate);
- void write(FastOS_FileInterface &file, const IChunk & entry);
+ void write(FastOS_FileInterface &file, SerialNumRange range, vespalib::ConstBufferRef buf);
void writeHeader(const common::FileHeaderContext &fileHeaderContext);
class SkipInfo
@@ -69,8 +69,6 @@ private:
SerialNum _id;
uint64_t _pos;
};
- const Encoding _encoding;
- const uint8_t _compressionLevel;
std::mutex _lock;
std::mutex _fileLock;
SerialNumRange _range;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
index ee1631ea8c2..e3d98cd576d 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
@@ -8,6 +8,9 @@
#include <cassert>
#include <ostream>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchlib.transactionlog.ichunk");
+
using std::make_unique;
using vespalib::make_string_short::fmt;
using vespalib::nbostream_longlivedbuf;
@@ -115,4 +118,46 @@ std::ostream &
operator << (std::ostream & os, Encoding e) {
return os << "crc=" << e.getCrc() << " compression=" << e.getCompression();
}
+
+void
+encode(vespalib::nbostream & os, const IChunk & chunk, Encoding encoding) {
+ size_t begin = os.wp();
+ os << encoding.getRaw(); // Placeholder for encoding
+ os << uint32_t(0); // Placeholder for size
+ Encoding realEncoding = chunk.encode(os);
+ size_t end = os.wp();
+ os.wp(0);
+ os << realEncoding.getRaw(); //Patching real encoding
+ os << uint32_t(end - (begin + sizeof(uint32_t) + sizeof(uint8_t))); // Patching actual size.
+ os.wp(end);
+ SerialNumRange range = chunk.range();
+ LOG(spam, "Encoded chunk with %zu entries and %zu bytes, range[%" PRIu64 ", %" PRIu64 "] encoding(wanted=%x, real=%x)",
+ chunk.getEntries().size(), os.size(), range.from(), range.to(), encoding.getRaw(), realEncoding.getRaw());
+}
+
+SerializedChunk::SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel)
+ : _os(),
+ _range(packet.range()),
+ _numEntries(packet.size())
+{
+ nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size());
+
+ IChunk::UP chunk = IChunk::create(encoding, compressionLevel);
+ SerialNum prev = 0;
+ for (size_t i(0); h.size() > 0; i++) {
+ //LOG(spam,
+ //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)",
+ //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining());
+ Packet::Entry entry;
+ entry.deserialize(h);
+ assert (prev < entry.serial());
+ chunk->add(entry);
+ prev = entry.serial();
+ }
+ assert(! chunk->getEntries().empty());
+ encode(_os, *chunk, encoding);
+}
+vespalib::ConstBufferRef SerializedChunk::getData() const {
+ return vespalib::ConstBufferRef(_os.data(), _os.size());
+}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
index 02bd0ce9426..dccfd6617f5 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
@@ -33,6 +33,22 @@ private:
std::ostream & operator << (std::ostream & os, Encoding e);
/**
+ * Represents a completely encoded chunk with a buffer ready to be persisted,
+ * and the range and number of entries it covers.
+ */
+class SerializedChunk {
+public:
+ SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel);
+ vespalib::ConstBufferRef getData() const;
+ SerialNumRange range() const { return _range; }
+ size_t getNumEntries() const { return _numEntries; }
+private:
+ vespalib::nbostream _os;
+ SerialNumRange _range;
+ size_t _numEntries;
+};
+
+/**
* Interface for different chunk formats.
* Format specifies both crc type, and compression type.
*/