summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-21 14:36:21 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2018-01-10 09:56:24 +0100
commit4c8c90f9ef909041e0066ac9b4ada817c495c5ba (patch)
tree10821a22249489e9bb2042b30a86c5867fb30800 /searchlib
parent2d8516e70764385df6e5bad05c319c27b7eb571a (diff)
Add decoders for compressed TLS too.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/config/translogserver.def10
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/chunks.cpp87
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/chunks.h18
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp47
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.h4
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp37
6 files changed, 141 insertions, 62 deletions
diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def
index 9d5e080e138..fa4b7ef3f3e 100644
--- a/searchlib/src/vespa/searchlib/config/translogserver.def
+++ b/searchlib/src/vespa/searchlib/config/translogserver.def
@@ -21,4 +21,12 @@ usefsync bool default=false restart
maxthreads int default=4 restart
##Default crc method used
-crcmethod enum {ccitt_crc32, xxh64, xxh64lz4} default=xxh64
+crcmethod enum {ccitt_crc32, xxh64} default=xxh64
+
+## Control compression type.
+compression.type enum {NONE, LZ4, ZSTD} default=LZ4
+
+## Control compression level
+## LZ4 has normal range 1..9 while ZSTD has range 1..19
+## 9 is a reasonable default for both
+compression.level int default=9
diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp
index a41e538cf57..4c34aed74c0 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp
@@ -2,37 +2,50 @@
#include "chunks.h"
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/util/compressor.h>
+#include <vespa/vespalib/data/databuffer.h>
using std::runtime_error;
using std::make_unique;
using vespalib::make_string;
+using vespalib::compression::compress;
+using vespalib::compression::decompress;
+using vespalib::compression::CompressionConfig;
+using vespalib::DataBuffer;
+using vespalib::ConstBufferRef;
+using vespalib::nbostream;
namespace search::transactionlog {
-void CCITTCRC32None::onEncode(nbostream &os) {
- (void) os;
-}
-
-void CCITTCRC32None::onDecode(nbostream &is) {
- if (is.size() < sizeof(int32_t)) {
- throw runtime_error(make_string("Not even room for the crc. Only %zu bytes left", is.size()));
+namespace {
+void verifyCrc(nbostream & is, Encoding::Crc crcType) {
+ if (is.size() < sizeof(int32_t) * 2) {
+ throw runtime_error(make_string("Not even room for the crc and length. Only %zu bytes left", is.size()));
}
size_t start = is.rp();
is.adjustReadPos(is.size() - sizeof(int32_t));
int32_t crc(0);
is >> crc;
is.rp(start);
- int32_t crcVerify = Encoding::calcCrc(Encoding::Crc::ccitt_crc32, is.c_str(), is.size() - sizeof(crc));
+ int32_t crcVerify = Encoding::calcCrc(crcType, is.c_str(), is.size() - sizeof(crc));
if (crc != crcVerify) {
throw runtime_error(make_string("Got bad crc : crcVerify = %d, expected %d",
static_cast<int>(crcVerify), static_cast<int>(crc)));
}
is.rp(start);
- while (is.good() && (is.size() > sizeof(int32_t))) {
- Packet::Entry e;
- e.deserialize(is);
- add(e);
- }
+}
+
+}
+
+void CCITTCRC32None::onEncode(nbostream &os) {
+ (void) os;
+}
+
+void CCITTCRC32None::onDecode(nbostream &is) {
+ verifyCrc(is, Encoding::Crc::ccitt_crc32);
+ nbostream data(is.peek(), is.size() - sizeof(int32_t));
+ add(data);
+ is.adjustReadPos(is.size());
}
void XXH64None::onEncode(nbostream &os) {
@@ -40,25 +53,22 @@ void XXH64None::onEncode(nbostream &os) {
}
void XXH64None::onDecode(nbostream &is) {
- if (is.size() < sizeof(int32_t)) {
- throw runtime_error(make_string("Not even room for the crc. Only %zu bytes left", is.size()));
- }
- size_t start = is.rp();
- is.adjustReadPos(is.size() - sizeof(int32_t));
- int32_t crc(0);
- is >> crc;
- is.rp(start);
- int32_t crcVerify = Encoding::calcCrc(Encoding::Crc::xxh64, is.c_str(), is.size() - sizeof(crc));
- if (crc != crcVerify) {
- throw runtime_error(make_string("Got bad crc : crcVerify = %d, expected %d",
- static_cast<int>(crcVerify), static_cast<int>(crc)));
- }
- is.rp(start);
- while (is.good() && (is.size() > sizeof(int32_t))) {
- Packet::Entry e;
- e.deserialize(is);
- add(e);
- }
+ verifyCrc(is, Encoding::Crc::xxh64);
+ nbostream data(is.peek(), is.size() - sizeof(int32_t));
+ add(data);
+ is.adjustReadPos(is.size());
+}
+
+void
+XXH64Compressed::decompress(nbostream & is, vespalib::compression::CompressionConfig::Type type) {
+ uint32_t uncompressedLen;
+ is >> uncompressedLen;
+ vespalib::DataBuffer uncompressed;
+ ConstBufferRef compressed(is.peek(), is.size()-sizeof(uint32_t)*2);
+ ::decompress(type, uncompressedLen, compressed, uncompressed, false);
+ nbostream data(uncompressed.getData(), uncompressed.getDataLen());
+ add(data);
+ is.adjustReadPos(is.size());
}
void XXH64LZ4::onEncode(IChunk::nbostream &os) {
@@ -67,7 +77,18 @@ void XXH64LZ4::onEncode(IChunk::nbostream &os) {
}
void XXH64LZ4::onDecode(IChunk::nbostream &is) {
- (void) is;
+ verifyCrc(is, Encoding::Crc::xxh64);
+ decompress(is, CompressionConfig::LZ4);
+}
+
+void XXH64ZSTD::onEncode(IChunk::nbostream &os) {
+ (void) os;
+
+}
+
+void XXH64ZSTD::onDecode(IChunk::nbostream &is) {
+ verifyCrc(is, Encoding::Crc::xxh64);
+ decompress(is, CompressionConfig::ZSTD);
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.h b/searchlib/src/vespa/searchlib/transactionlog/chunks.h
index d531133e9b9..49a93244cce 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/chunks.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.h
@@ -2,6 +2,7 @@
#pragma once
+#include <vespa/vespalib/util/compressionconfig.h>
#include "ichunk.h"
namespace search::transactionlog {
@@ -20,11 +21,24 @@ protected:
public:
};
-class XXH64LZ4 : public IChunk {
+class XXH64Compressed : public IChunk {
+public:
+ void setLevel(uint8_t level) { _compressionLevel = level; }
+protected:
+ void decompress(nbostream & os, vespalib::compression::CompressionConfig::Type type);
+private:
+ uint8_t _compressionLevel;
+};
+class XXH64LZ4 : public XXH64Compressed {
+protected:
+ void onEncode(nbostream &os) override;
+ void onDecode(nbostream &is) override;
+};
+
+class XXH64ZSTD : public XXH64Compressed {
protected:
void onEncode(nbostream &os) override;
void onDecode(nbostream &is) override;
-public:
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
index d5a453cd5b0..f1822ead5c9 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
@@ -28,6 +28,15 @@ IChunk::add(const Packet::Entry & entry) {
}
void
+IChunk::add(nbostream & is) {
+ while (is.good() && !is.empty()) {
+ Packet::Entry e;
+ e.deserialize(is);
+ add(e);
+ }
+}
+
+void
IChunk::encode(nbostream & ) {
}
@@ -40,22 +49,28 @@ IChunk::decode(nbostream & is) {
IChunk::UP
IChunk::create(uint8_t chunkType) {
Encoding encoding(chunkType);
- if (encoding.getCrc() == Encoding::Crc::xxh64) {
- if (encoding.getCompression() == Encoding::Compression::none) {
- return make_unique<XXH64None>();
- } else if (encoding.getCompression() == Encoding::Compression::lz4) {
- return make_unique<XXH64LZ4>();
- } else {
- throw runtime_error(make_string("Unhandled compression type '%d'", encoding.getCompression()));
- }
- } else if (encoding.getCrc() == Encoding::Crc::ccitt_crc32) {
- if (encoding.getCompression() == Encoding::Compression::none) {
- return make_unique<CCITTCRC32None>();
- } else {
- throw runtime_error(make_string("Unhandled compression type '%d'", encoding.getCompression()));
- }
- } else {
- throw runtime_error(make_string("Unhandled crc type '%d'", encoding.getCrc()));
+ switch (encoding.getCrc()) {
+ case Encoding::Crc::xxh64:
+ switch (encoding.getCompression()) {
+ case Encoding::Compression::none:
+ return make_unique<XXH64None>();
+ case Encoding::Compression::lz4:
+ return make_unique<XXH64LZ4>();
+ case Encoding::Compression::zstd:
+ return make_unique<XXH64ZSTD>();
+ default:
+ return make_unique<XXH64LZ4>();
+ }
+ case Encoding::Crc::ccitt_crc32:
+ switch (encoding.getCompression()) {
+ case Encoding::Compression::none:
+ return make_unique<CCITTCRC32None>();
+ default:
+ throw runtime_error(make_string("Unhandled compression type '%d' for ccitt_crc32 compression",
+ encoding.getCompression()));
+ }
+ default:
+ throw runtime_error(make_string("Unhandled crc type '%d'", encoding.getCrc()));
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
index 6a5c7703d2d..cad1cc27495 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
@@ -18,7 +18,8 @@ public:
};
enum Compression {
none = 0,
- lz4 = 1
+ lz4 = 1,
+ zstd = 2
};
Encoding(uint8_t raw) : _raw(raw) {}
Encoding(Crc crc, Compression compression);
@@ -45,6 +46,7 @@ public:
protected:
virtual void onEncode(nbostream & os) = 0;
virtual void onDecode(nbostream & is) = 0;
+ void add(nbostream & is);
private:
Entries _entries;
};
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
index e21d6b1af62..60447e0c098 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
@@ -23,18 +23,37 @@ TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri,
namespace {
-Encoding
-getEncoding(searchlib::TranslogserverConfig::Crcmethod crcType)
+
+Encoding::Crc
+getCrc(searchlib::TranslogserverConfig::Crcmethod type)
{
- switch (crcType) {
+ switch (type) {
case searchlib::TranslogserverConfig::ccitt_crc32:
- return Encoding(Encoding::Crc::ccitt_crc32, Encoding::Compression::none);
+ return Encoding::Crc::ccitt_crc32;
case searchlib::TranslogserverConfig::xxh64:
- return Encoding(Encoding::Crc::xxh64, Encoding::Compression::none);
- case searchlib::TranslogserverConfig::xxh64lz4:
- return Encoding(Encoding::Crc::xxh64, Encoding::Compression::lz4);
+ return Encoding::Crc::xxh64;
+ }
+ return Encoding::Crc::xxh64;
+}
+
+Encoding::Compression
+getCompression(searchlib::TranslogserverConfig::Compression::Type type)
+{
+ switch (type) {
+ case searchlib::TranslogserverConfig::Compression::NONE:
+ return Encoding::Compression::none;
+ case searchlib::TranslogserverConfig::Compression::LZ4:
+ return Encoding::Compression::lz4;
+ case searchlib::TranslogserverConfig::Compression::ZSTD:
+ return Encoding::Compression::zstd;
}
- abort();
+ return Encoding::Compression::lz4;
+}
+
+Encoding
+getEncoding(const searchlib::TranslogserverConfig & cfg)
+{
+ return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type));
}
}
@@ -43,7 +62,7 @@ void TransLogServerApp::start()
{
std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get();
_tls.reset(new TransLogServer(c->servername, c->listenport, c->basedir, _fileHeaderContext,
- c->filesizemax, c->maxthreads, getEncoding(c->crcmethod)));
+ c->filesizemax, c->maxthreads, getEncoding(*c)));
}
TransLogServerApp::~TransLogServerApp()