diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-21 14:36:21 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-10 09:56:24 +0100 |
commit | 4c8c90f9ef909041e0066ac9b4ada817c495c5ba (patch) | |
tree | 10821a22249489e9bb2042b30a86c5867fb30800 /searchlib | |
parent | 2d8516e70764385df6e5bad05c319c27b7eb571a (diff) |
Add decoders for compressed TLS too.
Diffstat (limited to 'searchlib')
6 files changed, 141 insertions, 62 deletions
diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index 9d5e080e138..fa4b7ef3f3e 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -21,4 +21,12 @@ usefsync bool default=false restart maxthreads int default=4 restart ##Default crc method used -crcmethod enum {ccitt_crc32, xxh64, xxh64lz4} default=xxh64 +crcmethod enum {ccitt_crc32, xxh64} default=xxh64 + +## Control compression type. +compression.type enum {NONE, LZ4, ZSTD} default=LZ4 + +## Control compression level +## LZ4 has normal range 1..9 while ZSTD has range 1..19 +## 9 is a reasonable default for both +compression.level int default=9 diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp index a41e538cf57..4c34aed74c0 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp @@ -2,37 +2,50 @@ #include "chunks.h" #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/compressor.h> +#include <vespa/vespalib/data/databuffer.h> using std::runtime_error; using std::make_unique; using vespalib::make_string; +using vespalib::compression::compress; +using vespalib::compression::decompress; +using vespalib::compression::CompressionConfig; +using vespalib::DataBuffer; +using vespalib::ConstBufferRef; +using vespalib::nbostream; namespace search::transactionlog { -void CCITTCRC32None::onEncode(nbostream &os) { - (void) os; -} - -void CCITTCRC32None::onDecode(nbostream &is) { - if (is.size() < sizeof(int32_t)) { - throw runtime_error(make_string("Not even room for the crc. Only %zu bytes left", is.size())); +namespace { +void verifyCrc(nbostream & is, Encoding::Crc crcType) { + if (is.size() < sizeof(int32_t) * 2) { + throw runtime_error(make_string("Not even room for the crc and length. Only %zu bytes left", is.size())); } size_t start = is.rp(); is.adjustReadPos(is.size() - sizeof(int32_t)); int32_t crc(0); is >> crc; is.rp(start); - int32_t crcVerify = Encoding::calcCrc(Encoding::Crc::ccitt_crc32, is.c_str(), is.size() - sizeof(crc)); + int32_t crcVerify = Encoding::calcCrc(crcType, is.c_str(), is.size() - sizeof(crc)); if (crc != crcVerify) { throw runtime_error(make_string("Got bad crc : crcVerify = %d, expected %d", static_cast<int>(crcVerify), static_cast<int>(crc))); } is.rp(start); - while (is.good() && (is.size() > sizeof(int32_t))) { - Packet::Entry e; - e.deserialize(is); - add(e); - } +} + +} + +void CCITTCRC32None::onEncode(nbostream &os) { + (void) os; +} + +void CCITTCRC32None::onDecode(nbostream &is) { + verifyCrc(is, Encoding::Crc::ccitt_crc32); + nbostream data(is.peek(), is.size() - sizeof(int32_t)); + add(data); + is.adjustReadPos(is.size()); } void XXH64None::onEncode(nbostream &os) { @@ -40,25 +53,22 @@ void XXH64None::onEncode(nbostream &os) { } void XXH64None::onDecode(nbostream &is) { - if (is.size() < sizeof(int32_t)) { - throw runtime_error(make_string("Not even room for the crc. Only %zu bytes left", is.size())); - } - size_t start = is.rp(); - is.adjustReadPos(is.size() - sizeof(int32_t)); - int32_t crc(0); - is >> crc; - is.rp(start); - int32_t crcVerify = Encoding::calcCrc(Encoding::Crc::xxh64, is.c_str(), is.size() - sizeof(crc)); - if (crc != crcVerify) { - throw runtime_error(make_string("Got bad crc : crcVerify = %d, expected %d", - static_cast<int>(crcVerify), static_cast<int>(crc))); - } - is.rp(start); - while (is.good() && (is.size() > sizeof(int32_t))) { - Packet::Entry e; - e.deserialize(is); - add(e); - } + verifyCrc(is, Encoding::Crc::xxh64); + nbostream data(is.peek(), is.size() - sizeof(int32_t)); + add(data); + is.adjustReadPos(is.size()); +} + +void +XXH64Compressed::decompress(nbostream & is, vespalib::compression::CompressionConfig::Type type) { + uint32_t uncompressedLen; + is >> uncompressedLen; + vespalib::DataBuffer uncompressed; + ConstBufferRef compressed(is.peek(), is.size()-sizeof(uint32_t)*2); + ::decompress(type, uncompressedLen, compressed, uncompressed, false); + nbostream data(uncompressed.getData(), uncompressed.getDataLen()); + add(data); + is.adjustReadPos(is.size()); } void XXH64LZ4::onEncode(IChunk::nbostream &os) { @@ -67,7 +77,18 @@ void XXH64LZ4::onEncode(IChunk::nbostream &os) { } void XXH64LZ4::onDecode(IChunk::nbostream &is) { - (void) is; + verifyCrc(is, Encoding::Crc::xxh64); + decompress(is, CompressionConfig::LZ4); +} + +void XXH64ZSTD::onEncode(IChunk::nbostream &os) { + (void) os; + +} + +void XXH64ZSTD::onDecode(IChunk::nbostream &is) { + verifyCrc(is, Encoding::Crc::xxh64); + decompress(is, CompressionConfig::ZSTD); } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.h b/searchlib/src/vespa/searchlib/transactionlog/chunks.h index d531133e9b9..49a93244cce 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/chunks.h +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.h @@ -2,6 +2,7 @@ #pragma once +#include <vespa/vespalib/util/compressionconfig.h> #include "ichunk.h" namespace search::transactionlog { @@ -20,11 +21,24 @@ protected: public: }; -class XXH64LZ4 : public IChunk { +class XXH64Compressed : public IChunk { +public: + void setLevel(uint8_t level) { _compressionLevel = level; } +protected: + void decompress(nbostream & os, vespalib::compression::CompressionConfig::Type type); +private: + uint8_t _compressionLevel; +}; +class XXH64LZ4 : public XXH64Compressed { +protected: + void onEncode(nbostream &os) override; + void onDecode(nbostream &is) override; +}; + +class XXH64ZSTD : public XXH64Compressed { protected: void onEncode(nbostream &os) override; void onDecode(nbostream &is) override; -public: }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp index d5a453cd5b0..f1822ead5c9 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp @@ -28,6 +28,15 @@ IChunk::add(const Packet::Entry & entry) { } void +IChunk::add(nbostream & is) { + while (is.good() && !is.empty()) { + Packet::Entry e; + e.deserialize(is); + add(e); + } +} + +void IChunk::encode(nbostream & ) { } @@ -40,22 +49,28 @@ IChunk::decode(nbostream & is) { IChunk::UP IChunk::create(uint8_t chunkType) { Encoding encoding(chunkType); - if (encoding.getCrc() == Encoding::Crc::xxh64) { - if (encoding.getCompression() == Encoding::Compression::none) { - return make_unique<XXH64None>(); - } else if (encoding.getCompression() == Encoding::Compression::lz4) { - return make_unique<XXH64LZ4>(); - } else { - throw runtime_error(make_string("Unhandled compression type '%d'", encoding.getCompression())); - } - } else if (encoding.getCrc() == Encoding::Crc::ccitt_crc32) { - if (encoding.getCompression() == Encoding::Compression::none) { - return make_unique<CCITTCRC32None>(); - } else { - throw runtime_error(make_string("Unhandled compression type '%d'", encoding.getCompression())); - } - } else { - throw runtime_error(make_string("Unhandled crc type '%d'", encoding.getCrc())); + switch (encoding.getCrc()) { + case Encoding::Crc::xxh64: + switch (encoding.getCompression()) { + case Encoding::Compression::none: + return make_unique<XXH64None>(); + case Encoding::Compression::lz4: + return make_unique<XXH64LZ4>(); + case Encoding::Compression::zstd: + return make_unique<XXH64ZSTD>(); + default: + return make_unique<XXH64LZ4>(); + } + case Encoding::Crc::ccitt_crc32: + switch (encoding.getCompression()) { + case Encoding::Compression::none: + return make_unique<CCITTCRC32None>(); + default: + throw runtime_error(make_string("Unhandled compression type '%d' for ccitt_crc32 compression", + encoding.getCompression())); + } + default: + throw runtime_error(make_string("Unhandled crc type '%d'", encoding.getCrc())); } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index 6a5c7703d2d..cad1cc27495 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -18,7 +18,8 @@ public: }; enum Compression { none = 0, - lz4 = 1 + lz4 = 1, + zstd = 2 }; Encoding(uint8_t raw) : _raw(raw) {} Encoding(Crc crc, Compression compression); @@ -45,6 +46,7 @@ public: protected: virtual void onEncode(nbostream & os) = 0; virtual void onDecode(nbostream & is) = 0; + void add(nbostream & is); private: Entries _entries; }; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index e21d6b1af62..60447e0c098 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -23,18 +23,37 @@ TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri, namespace { -Encoding -getEncoding(searchlib::TranslogserverConfig::Crcmethod crcType) + +Encoding::Crc +getCrc(searchlib::TranslogserverConfig::Crcmethod type) { - switch (crcType) { + switch (type) { case searchlib::TranslogserverConfig::ccitt_crc32: - return Encoding(Encoding::Crc::ccitt_crc32, Encoding::Compression::none); + return Encoding::Crc::ccitt_crc32; case searchlib::TranslogserverConfig::xxh64: - return Encoding(Encoding::Crc::xxh64, Encoding::Compression::none); - case searchlib::TranslogserverConfig::xxh64lz4: - return Encoding(Encoding::Crc::xxh64, Encoding::Compression::lz4); + return Encoding::Crc::xxh64; + } + return Encoding::Crc::xxh64; +} + +Encoding::Compression +getCompression(searchlib::TranslogserverConfig::Compression::Type type) +{ + switch (type) { + case searchlib::TranslogserverConfig::Compression::NONE: + return Encoding::Compression::none; + case searchlib::TranslogserverConfig::Compression::LZ4: + return Encoding::Compression::lz4; + case searchlib::TranslogserverConfig::Compression::ZSTD: + return Encoding::Compression::zstd; } - abort(); + return Encoding::Compression::lz4; +} + +Encoding +getEncoding(const searchlib::TranslogserverConfig & cfg) +{ + return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type)); } } @@ -43,7 +62,7 @@ void TransLogServerApp::start() { std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get(); _tls.reset(new TransLogServer(c->servername, c->listenport, c->basedir, _fileHeaderContext, - c->filesizemax, c->maxthreads, getEncoding(c->crcmethod))); + c->filesizemax, c->maxthreads, getEncoding(*c))); } TransLogServerApp::~TransLogServerApp() |