diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-02 12:27:37 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-02 12:27:37 +0200 |
commit | d68713378b51b84a2d653bbb0cdbf00d78659473 (patch) | |
tree | fd13d599bc9bf2426e5415b11cf9972053a64f05 | |
parent | f5487d3d9eda0632b3762312c478c57d83178827 (diff) | |
parent | 05c5a7341b39a63299d7c95ca88045a34f21e3f7 (diff) |
Merge pull request #14235 from vespa-engine/balder/add-initial-chunks-with-compressions-and-multiple-entries
- Add contained support for current serialization formats.
10 files changed, 433 insertions, 13 deletions
diff --git a/searchlib/src/tests/transactionlog/CMakeLists.txt b/searchlib/src/tests/transactionlog/CMakeLists.txt index f096d79d2de..c4a637c9e15 100644 --- a/searchlib/src/tests/transactionlog/CMakeLists.txt +++ b/searchlib/src/tests/transactionlog/CMakeLists.txt @@ -7,3 +7,11 @@ vespa_add_executable(searchlib_translogclient_test_app TEST ) vespa_add_test(NAME searchlib_translogclient_test_app COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/translogclient_test.sh DEPENDS searchlib_translogclient_test_app) + +vespa_add_executable(searchlib_translog_chunks_test_app TEST + SOURCES + chunks_test.cpp + DEPENDS + searchlib +) +vespa_add_test(NAME searchlib_translog_chunks_test_app COMMAND searchlib_translog_chunks_test_app) diff --git a/searchlib/src/tests/transactionlog/chunks_test.cpp b/searchlib/src/tests/transactionlog/chunks_test.cpp new file mode 100644 index 00000000000..de530884933 --- /dev/null +++ b/searchlib/src/tests/transactionlog/chunks_test.cpp @@ -0,0 +1,61 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/searchlib/transactionlog/chunks.h> +#include <vespa/vespalib/testkit/testapp.h> + +#include <vespa/log/log.h> +LOG_SETUP("translog_chunks_test"); + +using namespace search::transactionlog; +using vespalib::ConstBufferRef; +using vespalib::nbostream; +using vespalib::compression::CompressionConfig; + +constexpr const char * TEXT = "abcdefghijklmnopqrstuvwxyz_abcdefghijklmnopqrstuvwxyz_abcdefghijklmnopqrstuvwxyz_abcdefghijklmnopqrstuvwxyz_abcdefghijklmnopqrstuvwxyz"; + +void +verifySerializationAndDeserialization(IChunk & org, size_t numEntries) { + for (size_t i(0); i < numEntries; i++) { + const char *start = TEXT + (i%20); + Packet::Entry entry(i, i%8, ConstBufferRef(start, strlen(start))); + org.add(entry); + } + nbostream os; + + Encoding encoding = org.encode(os); + auto deserialized = IChunk::create(encoding.getRaw()); + deserialized->decode(os); + EXPECT_TRUE(os.empty()); + EXPECT_EQUAL(numEntries, deserialized->getEntries().size()); +} + +TEST("test serialization and deserialization of current default uncompressed xxh64") { + XXH64NoneChunk chunk; + verifySerializationAndDeserialization(chunk, 1); +} + +TEST("test serialization and deserialization of legacy uncompressed ccittcrc32") { + CCITTCRC32NoneChunk chunk; + verifySerializationAndDeserialization(chunk, 1); +} + +TEST("test serialization and deserialization of future multientry xxh64 lz4 compression") { + for (size_t level(1); level < 9; level++) { + XXH64CompressedChunk chunk(CompressionConfig::Type::LZ4, level); + verifySerializationAndDeserialization(chunk, 100); + } +} + +TEST("test serialization and deserialization of future multientry xxh64 zstd compression") { + for (size_t level(1); level < 9; level++) { + XXH64CompressedChunk chunk(CompressionConfig::Type::ZSTD, level); + verifySerializationAndDeserialization(chunk, 100); + } +} + +TEST("test serialization and deserialization of future multientry xxh64 no compression") { + XXH64CompressedChunk chunk(CompressionConfig::Type::NONE_MULTI, 1); + verifySerializationAndDeserialization(chunk, 100); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt index bc889ec356d..4ead34552bd 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt @@ -1,9 +1,11 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(searchlib_transactionlog OBJECT SOURCES + chunks.cpp common.cpp domain.cpp domainpart.cpp + ichunk.cpp nosyncproxy.cpp session.cpp trans_log_server_explorer.cpp diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp new file mode 100644 index 00000000000..bc740a27cdb --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp @@ -0,0 +1,133 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "chunks.h" +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/compressor.h> +#include <vespa/vespalib/data/databuffer.h> + +using std::runtime_error; +using std::make_unique; +using vespalib::make_string; +using vespalib::compression::compress; +using vespalib::compression::decompress; +using vespalib::compression::CompressionConfig; +using vespalib::DataBuffer; +using vespalib::ConstBufferRef; +using vespalib::nbostream; + +namespace search::transactionlog { + +namespace { +void +verifyCrc(nbostream & is, Encoding::Crc crcType) { + if (is.size() < sizeof(int32_t) * 2) { + throw runtime_error(make_string("Not even room for the crc and length. Only %zu bytes left", is.size())); + } + size_t start = is.rp(); + is.adjustReadPos(is.size() - sizeof(int32_t)); + int32_t crc(0); + is >> crc; + is.rp(start); + int32_t crcVerify = Encoding::calcCrc(crcType, is.data() + start, is.size() - sizeof(crc)); + if (crc != crcVerify) { + throw runtime_error(make_string("Got bad crc : crcVerify = %d, expected %d", + static_cast<int>(crcVerify), static_cast<int>(crc))); + } +} + +Encoding::Compression +toCompression(CompressionConfig::Type type) { + switch (type) { + case CompressionConfig::ZSTD: + return Encoding::Compression::zstd; + case CompressionConfig::LZ4: + return Encoding::Compression::lz4; + case CompressionConfig::NONE_MULTI: + return Encoding::Compression::none_multi; + case CompressionConfig::NONE: + return Encoding::Compression::none; + default: + abort(); + } +} + +} + +Encoding +CCITTCRC32NoneChunk::onEncode(nbostream &os) const { + size_t start = os.wp(); + assert(getEntries().size() == 1); + serializeEntries(os); + os << int32_t(Encoding::calcCrc(Encoding::Crc::ccitt_crc32, os.data()+start, os.size() - start)); + return Encoding(Encoding::Crc::ccitt_crc32, Encoding::Compression::none); +} + +void +CCITTCRC32NoneChunk::onDecode(nbostream &is) { + verifyCrc(is, Encoding::Crc::ccitt_crc32); + nbostream data(is.peek(), is.size() - sizeof(int32_t)); + deserializeEntries(data); + is.adjustReadPos(is.size()); +} + +Encoding +XXH64NoneChunk::onEncode(nbostream &os) const { + size_t start = os.wp(); + assert(getEntries().size() == 1); + serializeEntries(os); + os << int32_t(Encoding::calcCrc(Encoding::Crc::xxh64, os.data()+start, os.size() - start)); + return Encoding(Encoding::Crc::xxh64, Encoding::Compression::none); +} + +void +XXH64NoneChunk::onDecode(nbostream &is) { + verifyCrc(is, Encoding::Crc::xxh64); + nbostream data(is.peek(), is.size() - sizeof(int32_t)); + deserializeEntries(data); + is.adjustReadPos(is.size()); +} + +void +XXH64CompressedChunk::decompress(nbostream & is, uint32_t uncompressedLen) { + vespalib::DataBuffer uncompressed; + ConstBufferRef compressed(is.peek(), is.size() - sizeof(int32_t)); + ::decompress(_type, uncompressedLen, compressed, uncompressed, false); + nbostream data(uncompressed.getData(), uncompressed.getDataLen()); + deserializeEntries(data); + is.adjustReadPos(is.size()); +} + +XXH64CompressedChunk::XXH64CompressedChunk(CompressionConfig::Type type, uint8_t level) + : _type(type), + _level(level) +{ } + +Encoding +XXH64CompressedChunk::compress(nbostream & os, Encoding::Crc crc) const { + nbostream org; + serializeEntries(org); + DataBuffer compressed; + CompressionConfig cfg(_type, _level, 80); + ConstBufferRef uncompressed(org.data(), org.size()); + Encoding::Compression actual = toCompression(::compress(cfg, uncompressed, compressed, false)); + os << uint32_t(uncompressed.size()); + size_t start = os.wp(); + os.write(compressed.getData(), compressed.getDataLen()); + os << int32_t(Encoding::calcCrc(crc, os.data()+start, os.size() - start)); + return Encoding(Encoding::Crc::xxh64, actual); +} + +Encoding +XXH64CompressedChunk::onEncode(IChunk::nbostream &os) const { + return compress(os, Encoding::Crc::xxh64); +} + +void +XXH64CompressedChunk::onDecode(IChunk::nbostream &is) { + uint32_t uncompressedLen; + is >> uncompressedLen; + verifyCrc(is, Encoding::Crc::xxh64); + decompress(is, uncompressedLen); +} + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.h b/searchlib/src/vespa/searchlib/transactionlog/chunks.h new file mode 100644 index 00000000000..9b09a0b20fb --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.h @@ -0,0 +1,41 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "ichunk.h" +#include <vespa/vespalib/util/compressionconfig.h> + +namespace search::transactionlog { + +/// Current default chunk serialisation format +class XXH64NoneChunk : public IChunk { +protected: + Encoding onEncode(nbostream &os) const override; + void onDecode(nbostream &is) override; +public: +}; + +/// TODO Legacy chunk serialisation format to be removed soon. +class CCITTCRC32NoneChunk : public IChunk { +protected: + Encoding onEncode(nbostream &os) const override; + void onDecode(nbostream &is) override; +public: +}; + +/// Future default chunk serialisation format +class XXH64CompressedChunk : public IChunk { +public: + using CompressionConfig = vespalib::compression::CompressionConfig; + XXH64CompressedChunk(CompressionConfig::Type, uint8_t level); +protected: + void decompress(nbostream & os, uint32_t uncompressedLen); + Encoding compress(nbostream & os, Encoding::Crc crc) const; + Encoding onEncode(nbostream &os) const override; + void onDecode(nbostream &is) override; +private: + CompressionConfig::Type _type; + uint8_t _level; +}; + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp new file mode 100644 index 00000000000..36edd832c5a --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp @@ -0,0 +1,112 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "chunks.h" +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/crc.h> +#include <vespa/vespalib/util/exceptions.h> +#include <xxhash.h> +#include <cassert> + +using std::make_unique; +using vespalib::make_string_short::fmt; +using vespalib::nbostream_longlivedbuf; +using vespalib::compression::CompressionConfig; +using vespalib::IllegalArgumentException; + +namespace search::transactionlog { + +Encoding::Encoding(Crc crc, Compression compression) + : _raw(crc | (compression << 4u)) +{ + assert(crc <= Crc::xxh64); + assert(compression <= Compression::zstd); +} + +IChunk::~IChunk() = default; + +void +IChunk::add(const Packet::Entry & entry) { + _entries.emplace_back(entry); +} + +SerialNumRange +IChunk::range() const { + return _entries.empty() + ? SerialNumRange() + : SerialNumRange(_entries.front().serial(), _entries.back().serial()); +} + +void +IChunk::deserializeEntries(nbostream & is) { + while (is.good() && !is.empty()) { + Packet::Entry e; + e.deserialize(is); + add(e); + } +} + +void +IChunk::serializeEntries(nbostream & os) const { + for (const auto & e : _entries) { + e.serialize(os); + } +} + +Encoding +IChunk::encode(nbostream & os) const { + return onEncode(os); +} + +void +IChunk::decode(nbostream & is) { + onDecode(is); +} + +IChunk::UP +IChunk::create(uint8_t chunkType) { + return create(Encoding(chunkType), 9); +} +IChunk::UP +IChunk::create(Encoding encoding, uint8_t compressionLevel) { + switch (encoding.getCrc()) { + case Encoding::Crc::xxh64: + switch (encoding.getCompression()) { + case Encoding::Compression::none: + return make_unique<XXH64NoneChunk>(); + case Encoding::Compression::none_multi: + return make_unique<XXH64CompressedChunk>(CompressionConfig::NONE_MULTI, compressionLevel); + case Encoding::Compression::lz4: + return make_unique<XXH64CompressedChunk>(CompressionConfig::LZ4, compressionLevel); + case Encoding::Compression::zstd: + return make_unique<XXH64CompressedChunk>(CompressionConfig::ZSTD, compressionLevel); + default: + throw IllegalArgumentException(fmt("Unhandled compression type '%d' for xxh64, compression=", + encoding.getCompression())); + } + case Encoding::Crc::ccitt_crc32: + switch (encoding.getCompression()) { + case Encoding::Compression::none: + return make_unique<CCITTCRC32NoneChunk>(); + default: + throw IllegalArgumentException(fmt("Unhandled compression type '%d' for ccitt_crc32, compression=", + encoding.getCompression())); + } + default: + throw IllegalArgumentException(fmt("Unhandled crc type '%d'", encoding.getCrc())); + } +} + +int32_t Encoding::calcCrc(Crc version, const void * buf, size_t sz) +{ + if (version == xxh64) { + return static_cast<int32_t>(XXH64(buf, sz, 0ll)); + } else if (version == ccitt_crc32) { + vespalib::crc_32_type calculator; + calculator.process_bytes(buf, sz); + return calculator.checksum(); + } else { + abort(); + } +} + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h new file mode 100644 index 00000000000..4aa1b0a5a90 --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -0,0 +1,62 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "common.h" +#include <memory> + +namespace vespalib { class nbostream; } + +namespace search::transactionlog { + +class Encoding { +public: + enum Crc { + nocrc = 0, + ccitt_crc32 = 1, + xxh64 = 2 + }; + enum Compression { + none = 0, + none_multi = 1, + lz4 = 2, + zstd = 3 + }; + Encoding(uint8_t raw) : _raw(raw) {} + Encoding(Crc crc, Compression compression); + Crc getCrc() const { return Crc(_raw & 0xf); } + Compression getCompression() const { return Compression((_raw >> 4) & 0xf); } + static int32_t calcCrc(Crc version, const void * buf, size_t sz); + uint8_t getRaw() const { return _raw; } +private: + uint8_t _raw; +}; + +/** + * Interface for different chunk formats. + * Format specifies both crc type, and compression type. + */ +class IChunk { +public: + using UP = std::unique_ptr<IChunk>; + using Entries = std::vector<Packet::Entry>; + using nbostream = vespalib::nbostream; + using ConstBufferRef = vespalib::ConstBufferRef; + virtual ~IChunk(); + const Entries & getEntries() const { return _entries; } + void add(const Packet::Entry & entry); + Encoding encode(nbostream & os) const; + void decode(nbostream & buf); + static UP create(uint8_t chunkType); + static UP create(Encoding chunkType, uint8_t compressionLevel); + SerialNumRange range() const; +protected: + virtual Encoding onEncode(nbostream & os) const = 0; + virtual void onDecode(nbostream & is) = 0; + void deserializeEntries(nbostream & is); + void serializeEntries(nbostream & os) const; +private: + Entries _entries; +}; + +} diff --git a/vespalib/src/vespa/vespalib/util/compressionconfig.h b/vespalib/src/vespa/vespalib/util/compressionconfig.h index bb54a74ea41..c0010e8e05c 100644 --- a/vespalib/src/vespa/vespalib/util/compressionconfig.h +++ b/vespalib/src/vespa/vespalib/util/compressionconfig.h @@ -11,7 +11,7 @@ namespace vespalib::compression { struct CompressionConfig { enum Type { NONE = 0, - HISTORIC_1 = 1, + NONE_MULTI = 1, HISTORIC_2 = 2, HISTORIC_3 = 3, HISTORIC_4 = 4, @@ -42,7 +42,7 @@ struct CompressionConfig { static Type toType(uint32_t val) { switch (val) { - case 1: return HISTORIC_1; + case 1: return NONE_MULTI; case 2: return HISTORIC_2; case 3: return HISTORIC_3; case 4: return HISTORIC_4; @@ -75,8 +75,6 @@ struct CompressionConfig { class CompressionInfo { public: - CompressionInfo(size_t uncompressedSize) - : _uncompressedSize(uncompressedSize), _compressedSize(uncompressedSize) { } CompressionInfo(size_t uncompressedSize, size_t compressedSize) : _uncompressedSize(uncompressedSize), _compressedSize(compressedSize) { } size_t getUncompressedSize() const { return _uncompressedSize; } diff --git a/vespalib/src/vespa/vespalib/util/compressor.cpp b/vespalib/src/vespa/vespalib/util/compressor.cpp index 56533a77643..9e059a9a29b 100644 --- a/vespalib/src/vespa/vespalib/util/compressor.cpp +++ b/vespalib/src/vespa/vespalib/util/compressor.cpp @@ -2,7 +2,6 @@ #include "lz4compressor.h" #include "zstdcompressor.h" -#include <vespa/vespalib/util/memory.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/data/databuffer.h> #include <stdexcept> @@ -58,35 +57,37 @@ compress(ICompressor & compressor, const CompressionConfig & compression, const CompressionConfig::Type docompress(const CompressionConfig & compression, const ConstBufferRef & org, DataBuffer & dest) { - CompressionConfig::Type type(CompressionConfig::NONE); switch (compression.type) { case CompressionConfig::LZ4: { LZ4Compressor lz4; - type = compress(lz4, compression, org, dest); + return compress(lz4, compression, org, dest); } - break; case CompressionConfig::ZSTD: { ZStdCompressor zstd; - type = compress(zstd, compression, org, dest); + return compress(zstd, compression, org, dest); } - break; + case CompressionConfig::NONE_MULTI: + return CompressionConfig::NONE_MULTI; case CompressionConfig::NONE: default: - break; + return CompressionConfig::NONE; } - return type; } CompressionConfig::Type +compress(CompressionConfig::Type compression, const ConstBufferRef & org, DataBuffer & dest, bool allowSwap) { + return compress(CompressionConfig(compression), org, dest, allowSwap); +} +CompressionConfig::Type compress(const CompressionConfig & compression, const ConstBufferRef & org, DataBuffer & dest, bool allowSwap) { CompressionConfig::Type type(CompressionConfig::NONE); if (org.size() >= compression.minSize) { type = docompress(compression, org, dest); } - if (type == CompressionConfig::NONE) { + if ((type == CompressionConfig::NONE) || (type == CompressionConfig::NONE_MULTI)) { if (allowSwap) { DataBuffer tmp(const_cast<char *>(org.c_str()), org.size()); tmp.moveFreeToData(org.size()); @@ -139,6 +140,7 @@ decompress(const CompressionConfig::Type & type, size_t uncompressedLen, const C } break; case CompressionConfig::NONE: + case CompressionConfig::NONE_MULTI: case CompressionConfig::UNCOMPRESSABLE: if (allowSwap) { DataBuffer tmp(const_cast<char *>(org.c_str()), org.size()); diff --git a/vespalib/src/vespa/vespalib/util/compressor.h b/vespalib/src/vespa/vespalib/util/compressor.h index 106b87c4c90..3a8938c888e 100644 --- a/vespalib/src/vespa/vespalib/util/compressor.h +++ b/vespalib/src/vespa/vespalib/util/compressor.h @@ -27,6 +27,7 @@ public: * and it is not compressable. Then it will be swapped in. * @param allowSwap will tell it the data must be appended or if it can be swapped in if it is uncompressable or config is NONE. */ +CompressionConfig::Type compress(CompressionConfig::Type compression, const ConstBufferRef & org, DataBuffer & dest, bool allowSwap); CompressionConfig::Type compress(const CompressionConfig & compression, const vespalib::ConstBufferRef & org, vespalib::DataBuffer & dest, bool allowSwap); /** |