diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-08-28 13:53:07 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-08-28 13:55:47 +0200 |
commit | 2868058bbb1427bc16a941e453314ee856303c4e (patch) | |
tree | f23b807bf20443f721c436f27a435efd9009ad94 /vespalib | |
parent | ccf572d02b2552f033f2811666dc7a5cb9546fa6 (diff) |
Moved databuffer and compresssion to vespalib
Diffstat (limited to 'vespalib')
21 files changed, 1427 insertions, 5 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index 112f67f3a70..3ac7b388ec4 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -6,6 +6,7 @@ vespa_define_module( EXTERNAL_DEPENDS lz4 + zstd APPS src/apps/make_fixture_macros @@ -22,6 +23,8 @@ vespa_define_module( src/tests/closure src/tests/component src/tests/compress + src/tests/compression + src/tests/data/databuffer src/tests/data/input_reader src/tests/data/lz4_encode_decode src/tests/data/memory_input diff --git a/vespalib/src/tests/compression/.gitignore b/vespalib/src/tests/compression/.gitignore new file mode 100644 index 00000000000..60ffd040a47 --- /dev/null +++ b/vespalib/src/tests/compression/.gitignore @@ -0,0 +1,4 @@ +*_test +.depend +Makefile +vespalib_compression_test_app diff --git a/vespalib/src/tests/compression/CMakeLists.txt b/vespalib/src/tests/compression/CMakeLists.txt new file mode 100644 index 00000000000..c5738733218 --- /dev/null +++ b/vespalib/src/tests/compression/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_compression_test_app TEST + SOURCES + compression_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_compression_test_app COMMAND vespalib_compression_test_app) diff --git a/vespalib/src/tests/compression/compression_test.cpp b/vespalib/src/tests/compression/compression_test.cpp new file mode 100644 index 00000000000..01cfe0af223 --- /dev/null +++ b/vespalib/src/tests/compression/compression_test.cpp @@ -0,0 +1,45 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/util/compressor.h> +#include <vespa/vespalib/data/databuffer.h> + +#include <vespa/log/log.h> +LOG_SETUP("compression_test"); + +using namespace vespalib; +using namespace vespalib::compression; + +static vespalib::string _G_compressableText("AAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDEEEEEEEEEEEEEEE" + "AAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDEEEEEEEEEEEEEEE" + "AAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDEEEEEEEEEEEEEEE" + "AAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDEEEEEEEEEEEEEEE" + "AAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDEEEEEEEEEEEEEEE" + "XYZABCDEFGHIJGJMNOPQRSTUVW" + "AAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDEEEEEEEEEEEEEEE" + "AAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDEEEEEEEEEEEEEEE" + "AAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDEEEEEEEEEEEEEEE" + "AAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDEEEEEEEEEEEEEEE" + "AAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDDDDDDDEEEEEEEEEEEEEEE" + "XYZABCDEFGHIJGJMNOPQRSTUVW"); + +TEST("requireThatLZ4CompressFine") { + CompressionConfig cfg(CompressionConfig::Type::LZ4); + ConstBufferRef ref(_G_compressableText.c_str(), _G_compressableText.size()); + DataBuffer compressed; + EXPECT_EQUAL(CompressionConfig::Type::LZ4, compress(cfg, ref, compressed, false)); + EXPECT_EQUAL(66u, compressed.getDataLen()); +} + +TEST("requireThatZStdCompressFine") { + CompressionConfig cfg(CompressionConfig::Type::ZSTD); + ConstBufferRef ref(_G_compressableText.c_str(), _G_compressableText.size()); + DataBuffer compressed; + EXPECT_EQUAL(CompressionConfig::Type::ZSTD, compress(cfg, ref, compressed, false)); + EXPECT_EQUAL(64u, compressed.getDataLen()); +} + +TEST_MAIN() { + TEST_RUN_ALL(); +} diff --git a/vespalib/src/tests/data/databuffer/.gitignore b/vespalib/src/tests/data/databuffer/.gitignore new file mode 100644 index 00000000000..f144796c66a --- /dev/null +++ b/vespalib/src/tests/data/databuffer/.gitignore @@ -0,0 +1 @@ +vespalib_data_databuffer_test_app diff --git a/vespalib/src/tests/data/databuffer/CMakeLists.txt b/vespalib/src/tests/data/databuffer/CMakeLists.txt new file mode 100644 index 00000000000..f1c6c7c1862 --- /dev/null +++ b/vespalib/src/tests/data/databuffer/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_data_databuffer_test_app TEST + SOURCES + databuffer_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_data_databuffer_test_app COMMAND vespalib_data_databuffer_test_app) diff --git a/vespalib/src/tests/data/databuffer/databuffer_test.cpp b/vespalib/src/tests/data/databuffer/databuffer_test.cpp new file mode 100644 index 00000000000..f440ca1e15c --- /dev/null +++ b/vespalib/src/tests/data/databuffer/databuffer_test.cpp @@ -0,0 +1,142 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/data/databuffer.h> +#include <iostream> + +using namespace vespalib; + +class Test : public vespalib::TestApp { +private: + void testBasic(); +public: + int Main() override { + TEST_INIT("databuffer_test"); + + testBasic(); TEST_FLUSH(); + + TEST_DONE(); + } +}; + +TEST_APPHOOK(Test); + +void +Test::testBasic() +{ + DataBuffer a(50); + EXPECT_EQUAL(256u, a.getBufSize()); + EXPECT_EQUAL(a.getFreeLen(), a.getBufSize()); + a.ensureFree(1000); + EXPECT_EQUAL(1024u, a.getBufSize()); + EXPECT_EQUAL(a.getFreeLen(), a.getBufSize()); + EXPECT_EQUAL(0u, a.getDeadLen()); + EXPECT_EQUAL(0u, a.getDataLen()); + EXPECT_EQUAL(a.getData(), a.getDead()); + EXPECT_EQUAL(a.getData(), a.getFree()); + EXPECT_EQUAL(a.getBufSize(), a.getFreeLen()); + a.assertValid(); + + a.writeInt16(7); + EXPECT_EQUAL(0u, a.getDeadLen()); + EXPECT_EQUAL(2u, a.getDataLen()); + EXPECT_EQUAL(a.getBufSize()-2, a.getFreeLen()); + EXPECT_EQUAL(a.getData(), a.getDead()); + EXPECT_EQUAL(a.getData()+2, a.getFree()); + a.clear(); + EXPECT_EQUAL(0u, a.getDeadLen()); + EXPECT_EQUAL(0u, a.getDataLen()); + EXPECT_EQUAL(a.getBufSize(), a.getFreeLen()); + + a.writeInt8(0xaau); + EXPECT_EQUAL(1u, a.getDataLen()); + EXPECT_EQUAL(0xaau, a.peekInt8(0)); + EXPECT_EQUAL(1u, a.getDataLen()); + EXPECT_EQUAL(0xaau, a.readInt8()); + EXPECT_EQUAL(0u, a.getDataLen()); + + a.writeInt16(0xaabbu); + EXPECT_EQUAL(2u, a.getDataLen()); + EXPECT_EQUAL(0xaabbu, a.peekInt16(0)); + EXPECT_EQUAL(2u, a.getDataLen()); + EXPECT_EQUAL(0xaabbu, a.readInt16()); + EXPECT_EQUAL(0u, a.getDataLen()); + a.writeInt16(0xaabbu); + EXPECT_EQUAL(2u, a.getDataLen()); + EXPECT_EQUAL(0xbbaau, a.peekInt16Reverse(0)); + EXPECT_EQUAL(2u, a.getDataLen()); + EXPECT_EQUAL(0xbbaau, a.readInt16Reverse()); + EXPECT_EQUAL(0u, a.getDataLen()); + + a.writeInt32(0xaabbccddu); + EXPECT_EQUAL(4u, a.getDataLen()); + EXPECT_EQUAL(0xaabbccddu, a.peekInt32(0)); + EXPECT_EQUAL(4u, a.getDataLen()); + EXPECT_EQUAL(0xaabbccddu, a.readInt32()); + EXPECT_EQUAL(0u, a.getDataLen()); + a.writeInt32(0xaabbccddu); + EXPECT_EQUAL(4u, a.getDataLen()); + EXPECT_EQUAL(0xddccbbaau, a.peekInt32Reverse(0)); + EXPECT_EQUAL(4u, a.getDataLen()); + EXPECT_EQUAL(0xddccbbaau, a.readInt32Reverse()); + EXPECT_EQUAL(0u, a.getDataLen()); + + a.writeInt64(0xaabbccddeeff9988ul); + EXPECT_EQUAL(8u, a.getDataLen()); + EXPECT_EQUAL(0xaabbccddeeff9988ul, a.peekInt64(0)); + EXPECT_EQUAL(8u, a.getDataLen()); + EXPECT_EQUAL(0xaabbccddeeff9988ul, a.readInt64()); + EXPECT_EQUAL(0u, a.getDataLen()); + a.writeInt64(0xaabbccddeeff9988ul); + EXPECT_EQUAL(8u, a.getDataLen()); + EXPECT_EQUAL(0x8899ffeeddccbbaaul, a.peekInt64Reverse(0)); + EXPECT_EQUAL(8u, a.getDataLen()); + EXPECT_EQUAL(0x8899ffeeddccbbaaul, a.readInt64Reverse()); + EXPECT_EQUAL(0u, a.getDataLen()); + + a.writeFloat(8.9f); + EXPECT_EQUAL(4u, a.getDataLen()); + EXPECT_EQUAL(8.9f, a.readFloat()); + EXPECT_EQUAL(0u, a.getDataLen()); + + a.writeDouble(8.9); + EXPECT_EQUAL(8u, a.getDataLen()); + EXPECT_EQUAL(8.9, a.readDouble()); + EXPECT_EQUAL(0u, a.getDataLen()); + + const char *c = "abc"; + char b[3]; + a.writeBytes(c, 3); + EXPECT_EQUAL(3u, a.getDataLen()); + EXPECT_EQUAL(0, memcmp(c, a.getData(), a.getDataLen())); + a.peekBytes(b, 3, 0); + EXPECT_EQUAL(3u, a.getDataLen()); + EXPECT_EQUAL(0, memcmp(c, b, sizeof(b))); + a.readBytes(b, sizeof(b)); + EXPECT_EQUAL(0u, a.getDataLen()); + EXPECT_EQUAL(0, memcmp(c, b, sizeof(b))); + + a.writeInt64(67); + EXPECT_EQUAL(8u, a.getDataLen()); + EXPECT_FALSE(a.shrink(1025)); + EXPECT_FALSE(a.shrink(7)); + EXPECT_TRUE(a.shrink(16)); + EXPECT_EQUAL(8u, a.getDataLen()); + EXPECT_EQUAL(16u, a.getBufSize()); + + a.writeInt64(89); + EXPECT_EQUAL(16u, a.getDataLen()); + EXPECT_EQUAL(16u, a.getBufSize()); + EXPECT_EQUAL(0u, a.getDeadLen()); + EXPECT_EQUAL(67u, a.readInt64()); + EXPECT_EQUAL(8u, a.getDataLen()); + EXPECT_EQUAL(8u, a.getDeadLen()); + EXPECT_EQUAL(16u, a.getBufSize()); + a.pack(16); + EXPECT_EQUAL(8u, a.getDataLen()); + EXPECT_EQUAL(0u, a.getDeadLen()); + EXPECT_EQUAL(256u, a.getBufSize()); + EXPECT_EQUAL(89u, a.readInt64()); + EXPECT_EQUAL(0u, a.getDataLen()); + EXPECT_EQUAL(256u, a.getBufSize()); +} diff --git a/vespalib/src/vespa/vespalib/component/version.cpp b/vespalib/src/vespa/vespalib/component/version.cpp index 3fab6d6f130..af38a675de8 100644 --- a/vespalib/src/vespa/vespalib/component/version.cpp +++ b/vespalib/src/vespa/vespalib/component/version.cpp @@ -7,7 +7,6 @@ namespace vespalib { - Version::Version(int major, int minor, int micro, const string & qualifier) : _major(major), _minor(minor), @@ -89,7 +88,7 @@ Version::Version(const string & versionString) _qualifier(), _stringValue(versionString) { - if (versionString != "") { + if ( ! versionString.empty()) { stringref r(versionString.c_str(), versionString.size()); stringref::size_type dot(r.find('.')); stringref majorS(r.substr(0, dot)); diff --git a/vespalib/src/vespa/vespalib/data/CMakeLists.txt b/vespalib/src/vespa/vespalib/data/CMakeLists.txt index 29c8055a0c0..3a94e00ae33 100644 --- a/vespalib/src/vespa/vespalib/data/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/data/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(vespalib_vespalib_data OBJECT SOURCES + databuffer.cpp input.cpp input_reader.cpp lz4_input_decoder.cpp diff --git a/vespalib/src/vespa/vespalib/data/databuffer.cpp b/vespalib/src/vespa/vespalib/data/databuffer.cpp new file mode 100644 index 00000000000..5558a371836 --- /dev/null +++ b/vespalib/src/vespa/vespalib/data/databuffer.cpp @@ -0,0 +1,172 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "databuffer.h" + +namespace vespalib { + +namespace { +size_t padbefore(size_t alignment, const char *buf) { + return (alignment - (size_t(buf) % alignment)) % alignment; +} +} + +DataBuffer::DataBuffer(size_t len, size_t alignment, const Alloc & initial) + : _alignment(alignment), + _externalBuf(NULL), + _bufstart(NULL), + _bufend(NULL), + _datapt(NULL), + _freept(NULL), + _buffer(initial.create(0)) +{ + assert(_alignment > 0); + if (len > 0) { + // avoid very small buffers for performance reasons: + size_t bufsize = std::max(256ul, roundUp2inN(len + (_alignment - 1))); + Alloc newBuf(initial.create(bufsize)); + _bufstart = static_cast<char *>(newBuf.get()); + _buffer.swap(newBuf); + + _datapt = _bufstart + padbefore(alignment, _bufstart); + _freept = _datapt; + _bufend = _bufstart + bufsize; + assert(_bufstart != NULL); + } +} + +DataBuffer::~DataBuffer() = default; + +void +DataBuffer::moveFreeToData(size_t len) +{ + assert(getFreeLen() >= len); + _freept += len; +} + + +void +DataBuffer::moveDeadToData(size_t len) +{ + assert(getDeadLen() >= len); + _datapt -= len; + if (_bufstart != _externalBuf) { + assert(getDeadLen() >= padbefore(_alignment, _bufstart)); // Do not move ahead of alignment. + } +} + + +void +DataBuffer::moveDataToFree(size_t len) +{ + assert(getDataLen() >= len); + _freept -= len; +} + + +bool +DataBuffer::shrink(size_t newsize) +{ + if (getBufSize() <= newsize || getDataLen() > newsize) { + return false; + } + char *newbuf = NULL; + char *newdata = NULL; + newsize += (_alignment - 1); + Alloc newBuf(_buffer.create(newsize)); + if (newsize != 0) { + newbuf = static_cast<char *>(newBuf.get()); + newdata = newbuf + padbefore(_alignment, newbuf); + memcpy(newdata, _datapt, getDataLen()); + } + _buffer.swap(newBuf); + _bufstart = newbuf; + _freept = newdata + getDataLen(); + _datapt = newdata; + _bufend = newbuf + newsize; + return true; +} + + +void +DataBuffer::pack(size_t needbytes) +{ + needbytes += (_alignment - 1); + size_t dataLen = getDataLen(); + + if ((getDeadLen() + getFreeLen()) < needbytes || + (getDeadLen() + getFreeLen()) * 4 < dataLen) + { + size_t bufsize = std::max(256ul, roundUp2inN(needbytes+dataLen)); + Alloc newBuf(_buffer.create(bufsize)); + char *newbuf = static_cast<char *>(newBuf.get()); + char *newdata = newbuf + padbefore(_alignment, newbuf); + memcpy(newdata, _datapt, dataLen); + _bufstart = newbuf; + _datapt = newdata; + _freept = newdata + dataLen; + _bufend = newbuf + bufsize; + _buffer.swap(newBuf); + } else { + char *datapt = _bufstart + padbefore(_alignment, _bufstart); + memmove(datapt, _datapt, dataLen); + _datapt = datapt; + _freept = _datapt + dataLen; + } +} + + +bool +DataBuffer::equals(DataBuffer *other) +{ + if (getDataLen() != other->getDataLen()) + return false; + return memcmp(getData(), other->getData(), getDataLen()) == 0; +} + + +void +DataBuffer::hexDump() +{ + char *pt = _datapt; + printf("*** DataBuffer HexDump BEGIN ***\n"); + uint32_t i = 0; + while (pt < _freept) { + printf("%x ", (unsigned char) *pt++); + if ((++i % 16) == 0) + printf("\n"); + } + if ((i % 16) != 0) + printf("\n"); + printf("*** DataBuffer HexDump END ***\n"); +} + + +void +DataBuffer::swap(DataBuffer &other) +{ + _buffer.swap(other._buffer); + std::swap(_alignment, other._alignment); + std::swap(_externalBuf, other._externalBuf); + std::swap(_bufstart, other._bufstart); + std::swap(_bufend, other._bufend); + std::swap(_datapt, other._datapt); + std::swap(_freept, other._freept); +} + +vespalib::alloc::Alloc +DataBuffer::stealBuffer() +{ + assert( ! referencesExternalData() ); + _externalBuf = nullptr; + _bufstart = nullptr; + _bufend = nullptr; + _datapt = nullptr; + _freept = nullptr; + return std::move(_buffer); +} + +bool +DataBuffer::referencesExternalData() const { + return (_externalBuf == _bufstart) && (getBufSize() > 0); +} + +} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/data/databuffer.h b/vespalib/src/vespa/vespalib/data/databuffer.h new file mode 100644 index 00000000000..28524f373b2 --- /dev/null +++ b/vespalib/src/vespa/vespalib/data/databuffer.h @@ -0,0 +1,611 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <cstring> +#include <cassert> +#include <vespa/vespalib/util/alloc.h> + +namespace vespalib { + +/** + * This is a buffer that may hold the stream representation of + * packets. It has helper methods in order to simplify and standardize + * packet encoding and decoding. The default byte order for + * reading/writing integers is internet order (big endian). The + * methods with a 'Reverse' suffix assume that the data in the buffer + * is stored in reverse internet order (little endian). + * + * A databuffer covers a continuous chunk of memory that is split into + * 3 parts; 'dead', 'data' and 'free'. 'dead' denotes the space at the + * beginning of the buffer that may not currently be utilized, 'data' + * denotes the part that contains the actual data and 'free' denotes + * the free space at the end of the buffer. Initially, the 'dead' and + * 'data' parts are empty, and the 'free' part spans the entire + * buffer. When writing to the buffer, bytes are transfered from the + * 'free' part to the 'data' part of the buffer. When reading from the + * buffer, bytes are transferred from the 'data' part to the 'dead' + * part of the buffer. If the 'free' part of the buffer becomes empty, + * the data will be relocated within the buffer and/or a bigger buffer + * will be allocated. + **/ +class DataBuffer +{ +private: + using Alloc = alloc::Alloc; + size_t _alignment; + char *_externalBuf; + char *_bufstart; + char *_bufend; + char *_datapt; + char *_freept; + Alloc _buffer; + +public: + typedef std::unique_ptr<DataBuffer> UP; + DataBuffer(const DataBuffer &) = delete; + DataBuffer &operator=(const DataBuffer &) = delete; + + /** + * Construct a databuffer. + * + * @param len the initial size of the buffer. + * @param alignment required memory alignment for data start + **/ + DataBuffer(size_t len = 1024, size_t alignment = 1, const Alloc & initial = Alloc::alloc(0)); + + /** + * Construct a databuffer using externally allocated memory. Note + * that the externally allocated memory will not be freed by the + * databuffer. + * + * @param buf pointer to preallocated memory + * @param len length of preallocated memory + **/ + DataBuffer(char *buf, size_t len) : + _alignment(1), + _externalBuf(buf), + _bufstart(buf), + _bufend(buf + len), + _datapt(_bufstart), + _freept(_bufstart), + _buffer(Alloc::alloc(0)) + { } + + DataBuffer(const char *buf, size_t len) : + _alignment(1), + _externalBuf(const_cast<char *>(buf)), + _bufstart(_externalBuf), + _bufend(_bufstart + len), + _datapt(_bufstart), + _freept(_bufend), + _buffer(Alloc::alloc(0)) + { } + + ~DataBuffer(); + + /** + * @return a pointer to the dead part of this buffer. + **/ + char *getDead() const { return _bufstart; } + + /** + * @return a pointer to the data part of this buffer. + **/ + char *getData() const { return _datapt; } + + /** + * @return a pointer to the free part of this buffer. + **/ + char *getFree() const { return _freept; } + + /** + * @return the length of the dead part of this buffer. + **/ + size_t getDeadLen() const { return _datapt - _bufstart; } + + /** + * @return the length of the data part of this buffer. + **/ + size_t getDataLen() const { return _freept - _datapt; } + + /** + * @return the length of the free part of this buffer. + **/ + size_t getFreeLen() const { return _bufend - _freept; } + + /** + * @return the length of the entire buffer. + **/ + size_t getBufSize() const { return _bufend - _bufstart; } + + + /** + * 'Move' bytes from the free part to the data part of this buffer. + * This will have the same effect as if the data already located in + * the free part of this buffer was written to the buffer. + * + * @param len number of bytes to 'move'. + **/ + void moveFreeToData(size_t len); + + /** + * 'Move' bytes from the data part to the dead part of this buffer. + * This will have the effect of discarding data without having to + * read it. + * + * @param len number of bytes to 'move'. + **/ + void moveDataToDead(size_t len) { _datapt += len; } + + + /** + * 'Move' bytes from the dead part to the data part of this buffer. + * This may be used to undo a read operation (un-discarding + * data). Note that writing to the buffer may result in + * reorganization making the data part of the buffer disappear. + * + * @param len number of bytes to 'move'. + **/ + void moveDeadToData(size_t len); + + /** + * 'Move' bytes from the data part to the free part of this buffer. + * This may be used to undo a write operation; discarding the data + * most recently written. + * + * @param len number of bytes to 'move'. + **/ + void moveDataToFree(size_t len); + + + /** + * Clear this buffer. + **/ + void clear() { _datapt = _freept = _bufstart; } + + + /** + * Shrink this buffer. The given value is the new wanted size of + * this buffer. If the buffer is already smaller or equal in size + * compared to the given value, no resizing is performed and false + * is returned (Use the @ref ensureFree method to ensure free + * space). If the buffer currently contains more data than can be + * held in a buffer of the wanted size, no resizing is performed and + * false is returned. + * + * @param newsize the wanted new size of this buffer (in bytes). + * @return true if the buffer was shrunk, false otherwise. + **/ + bool shrink(size_t newsize); + + /** + * Reorganize this buffer such that the dead part becomes empty and + * the free part contains at least the given number of + * bytes. Allocate a bigger buffer if needed. + * + * @param needbytes required size of free part. + **/ + void pack(size_t needbytes); + + /** + * Ensure that the free part contains at least the given number of + * bytes. This method invokes the @ref Pack method if the free part + * of the buffer is too small. + * + * @param needbytes required size of free part. + **/ + void ensureFree(size_t needbytes) + { + if (needbytes > getFreeLen()) + pack(needbytes); + } + + + /** + * Write an 8-bit unsigned integer to this buffer. + * + * @param n the integer to write. + **/ + void writeInt8(uint8_t n) + { + ensureFree(1); + *_freept++ = (char)n; + } + + /** + * Write a 16-bit unsigned integer to this buffer. + * + * @param n the integer to write. + **/ + void writeInt16(uint16_t n) + { + ensureFree(2); + _freept[1] = (char)n; + n >>= 8; + _freept[0] = (char)n; + _freept += 2; + } + + /** + * Write a 32-bit unsigned integer to this buffer. + * + * @param n the integer to write. + **/ + void writeInt32(uint32_t n) + { + ensureFree(4); + _freept[3] = (char)n; + n >>= 8; + _freept[2] = (char)n; + n >>= 8; + _freept[1] = (char)n; + n >>= 8; + _freept[0] = (char)n; + _freept += 4; + } + + /** + * Write a 64-bit unsigned integer to this buffer. + * + * @param n the integer to write. + **/ + void writeInt64(uint64_t n) + { + ensureFree(8); + _freept[7] = (char)n; + n >>= 8; + _freept[6] = (char)n; + n >>= 8; + _freept[5] = (char)n; + n >>= 8; + _freept[4] = (char)n; + n >>= 8; + _freept[3] = (char)n; + n >>= 8; + _freept[2] = (char)n; + n >>= 8; + _freept[1] = (char)n; + n >>= 8; + _freept[0] = (char)n; + _freept += 8; + } + + + + /** + * Read an 8-bit unsigned integer from this buffer. + * + * @return the integer that has been read. + **/ + uint8_t readInt8() + { + return (unsigned char)(*_datapt++); + } + + /** + * Read a 16-bit unsigned integer from this buffer. + * + * @return the integer that has been read. + **/ + uint16_t readInt16() + { + unsigned char *tmp = (unsigned char *)(_datapt); + _datapt += 2; + return ((*tmp << 8) + *(tmp + 1)); + } + + /** + * Read a 16-bit unsigned integer stored in reverse internet order + * from this buffer. + * + * @return the integer that has been read. + **/ + uint16_t readInt16Reverse() + { + unsigned char *tmp = (unsigned char *)(_datapt); + _datapt += 2; + return ((*(tmp + 1) << 8) + *tmp); + } + + /** + * Read a 32-bit unsigned integer from this buffer. + * + * @return the integer that has been read. + **/ + uint32_t readInt32() + { + unsigned char *tmp = (unsigned char *)(_datapt); + _datapt += 4; + return + ((((((uint32_t)(*tmp << 8) + *(tmp + 1)) << 8) + + *(tmp + 2)) << 8) + *(tmp + 3)); + } + + /** + * Read a 32-bit unsigned integer stored in reverse internet order + * from this buffer. + * + * @return the integer that has been read. + **/ + uint32_t readInt32Reverse() + { + unsigned char *tmp = (unsigned char *)(_datapt); + _datapt += 4; + return + ((((((uint32_t)(*(tmp + 3) << 8) + *(tmp + 2)) << 8) + + *(tmp + 1)) << 8) + *tmp); + } + + /** + * Read a 64-bit unsigned integer from this buffer. + * + * @return the integer that has been read. + **/ + uint64_t readInt64() + { + unsigned char *tmp = (unsigned char *)(_datapt); + _datapt += 8; + return + ((((((((((((((uint64_t)(*tmp << 8) + *(tmp + 1)) << 8) + + *(tmp + 2)) << 8) + *(tmp + 3)) << 8) + + *(tmp + 4)) << 8) + *(tmp + 5)) << 8) + + *(tmp + 6)) << 8) + *(tmp + 7)); + } + + /** + * Read a 64-bit unsigned integer stored in reverse internet order + * from this buffer. + * + * @return the integer that has been read. + **/ + uint64_t readInt64Reverse() + { + unsigned char *tmp = (unsigned char *)(_datapt); + _datapt += 8; + return + ((((((((((((((uint64_t)(*(tmp + 7) << 8) + *(tmp + 6)) << 8) + + *(tmp + 5)) << 8) + *(tmp + 4)) << 8) + + *(tmp + 3)) << 8) + *(tmp + 2)) << 8) + + *(tmp + 1)) << 8) + *tmp); + } + + float readFloat() + { + float f; + uint32_t i = readInt32(); + memcpy(&f, &i, sizeof(f)); + return f; + } + + double readDouble() + { + double f; + uint64_t i = readInt64(); + memcpy(&f, &i, sizeof(f)); + return f; + } + + void writeFloat(float f) + { + uint32_t i; + memcpy(&i, &f, sizeof(f)); + writeInt32(i); + } + + void writeDouble(double f) + { + uint64_t i; + memcpy(&i, &f, sizeof(f)); + writeInt64(i); + } + + + /** + * Peek at an 8-bit unsigned integer in this buffer. Unlike a read + * operation, this will not modify the buffer. + * + * @param offset offset of the integer to access. + * @return value of the accessed integer. + **/ + uint8_t peekInt8(size_t offset) + { + assert(getDataLen() >= offset + 1); + return (uint8_t) *(_datapt + offset); + } + + /** + * Peek at a 16-bit unsigned integer in this buffer. Unlike a read + * operation, this will not modify the buffer. + * + * @param offset offset of the integer to access. + * @return value of the accessed integer. + **/ + uint16_t peekInt16(size_t offset) + { + assert(getDataLen() >= offset + 2); + unsigned char *tmp = (unsigned char *)(_datapt + offset); + return (uint16_t) ((*tmp << 8) + *(tmp + 1)); + } + + /** + * Peek at a 16-bit unsigned integer stored in reverse internet + * order in this buffer. Unlike a read operation, this will not + * modify the buffer. + * + * @param offset offset of the integer to access. + * @return value of the accessed integer. + **/ + uint16_t peekInt16Reverse(size_t offset) + { + assert(getDataLen() >= offset + 2); + unsigned char *tmp = (unsigned char *)(_datapt + offset); + return (uint16_t) ((*(tmp + 1) << 8) + *tmp); + } + + /** + * Peek at a 32-bit unsigned integer in this buffer. Unlike a read + * operation, this will not modify the buffer. + * + * @param offset offset of the integer to access. + * @return value of the accessed integer. + **/ + uint32_t peekInt32(size_t offset) + { + assert(getDataLen() >= offset + 4); + unsigned char *tmp = (unsigned char *)(_datapt + offset); + return + ((((((uint32_t)(*tmp << 8) + *(tmp + 1)) << 8) + + *(tmp + 2)) << 8) + *(tmp + 3)); + } + + /** + * Peek at a 32-bit unsigned integer stored in reverse internet + * order in this buffer. Unlike a read operation, this will not + * modify the buffer. + * + * @param offset offset of the integer to access. + * @return value of the accessed integer. + **/ + uint32_t peekInt32Reverse(size_t offset) + { + assert(getDataLen() >= offset + 4); + unsigned char *tmp = (unsigned char *)(_datapt + offset); + return + ((((((uint32_t)(*(tmp + 3) << 8) + *(tmp + 2)) << 8) + + *(tmp + 1)) << 8) + *tmp); + } + + /** + * Peek at a 64-bit unsigned integer in this buffer. Unlike a read + * operation, this will not modify the buffer. + * + * @param offset offset of the integer to access. + * @return value of the accessed integer. + **/ + uint64_t peekInt64(size_t offset) + { + assert(getDataLen() >= offset + 8); + unsigned char *tmp = (unsigned char *)(_datapt + offset); + return + ((((((((((((((uint64_t)(*tmp << 8) + *(tmp + 1)) << 8) + + *(tmp + 2)) << 8) + *(tmp + 3)) << 8) + + *(tmp + 4)) << 8) + *(tmp + 5)) << 8) + + *(tmp + 6)) << 8) + *(tmp + 7)); + } + + /** + * Peek at a 64-bit unsigned integer stored in reverse internet + * order in this buffer. Unlike a read operation, this will not + * modify the buffer. + * + * @param offset offset of the integer to access. + * @return value of the accessed integer. + **/ + uint64_t peekInt64Reverse(size_t offset) + { + assert(getDataLen() >= offset + 8); + unsigned char *tmp = (unsigned char *)(_datapt + offset); + return + ((((((((((((((uint64_t)(*(tmp + 7) << 8) + *(tmp + 6)) << 8) + + *(tmp + 5)) << 8) + *(tmp + 4)) << 8) + + *(tmp + 3)) << 8) + *(tmp + 2)) << 8) + + *(tmp + 1)) << 8) + *tmp); + } + + + /** + * Write bytes to this buffer. + * + * @param src source byte buffer. + * @param len number of bytes to write. + **/ + void writeBytes(const void *src, size_t len) + { + ensureFree(len); + memcpy(_freept, src, len); + _freept += len; + } + + /** + * Fill buffer with zero-bytes. + * + * @param len number of zero-bytes to write. + **/ + void zeroFill(size_t len) + { + ensureFree(len); + memset(_freept, 0, len); + _freept += len; + } + + /** + * Read bytes from this buffer. + * + * @param dst destination byte buffer. + * @param len number of bytes to read. + **/ + void readBytes(void *dst, size_t len) + { + memcpy(dst, _datapt, len); + _datapt += len; + } + + /** + * Peek at bytes in this buffer. Unlike a read operation, this will + * not modify the buffer. + * + * @param dst destination byte buffer. + * @param len number of bytes to extract. + * @param offset byte offset into the buffer. + **/ + void peekBytes(void *dst, size_t len, size_t offset) + { + assert(_freept >= _datapt + offset + len); + memcpy(dst, _datapt + offset, len); + } + + /** + * Check if the data stored in this buffer equals the data stored in + * another buffer. + * + * @return true(equal)/false(not equal) + * @param other the other buffer. + **/ + bool equals(DataBuffer *other); + + /** + * Print a human-readable representation of this buffer to + * stdout. This method may be used for debugging purposes. + **/ + void hexDump(); + + /** + * Run some asserts to verify that this databuffer is in a legal + * state. + **/ + void assertValid() + { + assert(_bufstart <= _datapt); + assert(_datapt <= _freept); + assert(_freept <= _bufend); + } + + /** + * @return true if this buffer is referencing external data. + **/ + bool referencesExternalData() const; + + /** + * Swap the data stored in this buffer with the data stored in + * another buffer. Neither buffer may use externally allocated + * memory when swap is called. + * + * @param other the other buffer. + **/ + void swap(DataBuffer &other); + + Alloc stealBuffer(); +}; + +} // namespace vespalib + diff --git a/vespalib/src/vespa/vespalib/data/memory.h b/vespalib/src/vespa/vespalib/data/memory.h index 74024549a0b..eae7c8d2f23 100644 --- a/vespalib/src/vespa/vespalib/data/memory.h +++ b/vespalib/src/vespa/vespalib/data/memory.h @@ -25,6 +25,7 @@ struct Memory Memory(const vespalib::stringref &str_ref) : data(str_ref.data()), size(str_ref.size()) {} vespalib::string make_string() const; + vespalib::stringref make_stringref() const { return stringref(data, size); } bool operator == (const Memory &rhs) const { return ((size == rhs.size) && ((data == rhs.data) || diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index 310e1dde68d..6d08c3b1126 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -14,6 +14,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT classname.cpp closuretask.cpp compress.cpp + compressor.cpp dual_merge_director.cpp error.cpp exception.cpp @@ -25,6 +26,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT host_name.cpp joinable.cpp left_right_heap.cpp + lz4compressor.cpp md5.c printable.cpp priority_queue.cpp @@ -47,5 +49,6 @@ vespa_add_library(vespalib_vespalib_util OBJECT threadstackexecutorbase.cpp time_tracker.cpp valgrind.cpp + zstdcompressor.cpp DEPENDS ) diff --git a/vespalib/src/vespa/vespalib/util/compress.cpp b/vespalib/src/vespa/vespalib/util/compress.cpp index bf32fcfe593..617b632e0bf 100644 --- a/vespalib/src/vespa/vespalib/util/compress.cpp +++ b/vespalib/src/vespa/vespalib/util/compress.cpp @@ -4,8 +4,7 @@ #include "stringfmt.h" #include "exceptions.h" -namespace vespalib { -namespace compress { +namespace vespalib::compress { size_t Integer::compressedPositiveLength(uint64_t n) { @@ -84,4 +83,3 @@ size_t Integer::compress(int64_t n, void *destination) } } -} diff --git a/vespalib/src/vespa/vespalib/util/compressionconfig.h b/vespalib/src/vespa/vespalib/util/compressionconfig.h new file mode 100644 index 00000000000..bb54a74ea41 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/compressionconfig.h @@ -0,0 +1,97 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <cmath> +#include <cstdint> +#include <cstddef> +#include <cstring> + +namespace vespalib::compression { + +struct CompressionConfig { + enum Type { + NONE = 0, + HISTORIC_1 = 1, + HISTORIC_2 = 2, + HISTORIC_3 = 3, + HISTORIC_4 = 4, + UNCOMPRESSABLE = 5, + LZ4 = 6, + ZSTD = 7 + }; + + CompressionConfig() + : type(NONE), compressionLevel(0), threshold(90), minSize(0) {} + CompressionConfig(Type t) + : type(t), compressionLevel(9), threshold(90), minSize(0) {} + + CompressionConfig(Type t, uint8_t level, uint8_t minRes) + : type(t), compressionLevel(level), threshold(minRes), minSize(0) {} + + CompressionConfig(Type t, uint8_t lvl, uint8_t minRes, size_t minSz) + : type(t), compressionLevel(lvl), threshold(minRes), minSize(minSz) {} + + bool operator==(const CompressionConfig& o) const { + return (type == o.type + && compressionLevel == o.compressionLevel + && threshold == o.threshold); + } + bool operator!=(const CompressionConfig& o) const { + return !operator==(o); + } + + static Type toType(uint32_t val) { + switch (val) { + case 1: return HISTORIC_1; + case 2: return HISTORIC_2; + case 3: return HISTORIC_3; + case 4: return HISTORIC_4; + case 5: return UNCOMPRESSABLE; + case 6: return LZ4; + case 7: return ZSTD; + default: return NONE; + } + } + static Type toType(const char * val) { + if (strncasecmp(val, "lz4", 3) == 0) { + return LZ4; + } if (strncasecmp(val, "zstd", 4) == 0) { + return ZSTD; + } + return NONE; + } + static bool isCompressed(Type type) { + return (type != CompressionConfig::NONE && + type != CompressionConfig::UNCOMPRESSABLE); + } + bool useCompression() const { return isCompressed(type); } + + Type type; + uint8_t compressionLevel; + uint8_t threshold; + size_t minSize; +}; + +class CompressionInfo +{ +public: + CompressionInfo(size_t uncompressedSize) + : _uncompressedSize(uncompressedSize), _compressedSize(uncompressedSize) { } + CompressionInfo(size_t uncompressedSize, size_t compressedSize) + : _uncompressedSize(uncompressedSize), _compressedSize(compressedSize) { } + size_t getUncompressedSize() const { return _uncompressedSize; } + size_t getCompressedSize() const { return _compressedSize; } + double getCompressionRatio() const { return _uncompressedSize/_compressedSize; } +private: + size_t _uncompressedSize; + size_t _compressedSize; +}; + +inline CompressionInfo operator + (const CompressionInfo & a, const CompressionInfo & b) +{ + return CompressionInfo(a.getUncompressedSize() + b.getUncompressedSize(), a.getCompressedSize() + b.getCompressedSize()); +} + +} + + diff --git a/vespalib/src/vespa/vespalib/util/compressor.cpp b/vespalib/src/vespa/vespalib/util/compressor.cpp new file mode 100644 index 00000000000..6853c8375fd --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/compressor.cpp @@ -0,0 +1,138 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "lz4compressor.h" +#include "zstdcompressor.h" +#include <vespa/vespalib/util/memory.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/data/databuffer.h> + +using vespalib::alloc::Alloc; + +namespace vespalib::compression { + +CompressionConfig::Type +compress(ICompressor & compressor, const CompressionConfig & compression, const ConstBufferRef & org, DataBuffer & dest) +{ + CompressionConfig::Type type(CompressionConfig::NONE); + dest.ensureFree(compressor.adjustProcessLen(0, org.size())); + size_t compressedSize(dest.getFreeLen()); + if (compressor.process(compression, org.c_str(), org.size(), dest.getFree(), compressedSize)) { + if (compressedSize < ((org.size() * compression.threshold)/100)) { + dest.moveFreeToData(compressedSize); + type = compression.type; + } + } + return type; +} + +CompressionConfig::Type +docompress(const CompressionConfig & compression, const ConstBufferRef & org, DataBuffer & dest) +{ + CompressionConfig::Type type(CompressionConfig::NONE); + switch (compression.type) { + case CompressionConfig::LZ4: + { + LZ4Compressor lz4; + type = compress(lz4, compression, org, dest); + } + break; + case CompressionConfig::ZSTD: + { + ZStdCompressor zstd; + type = compress(zstd, compression, org, dest); + } + break; + case CompressionConfig::NONE: + default: + break; + } + return type; +} + +CompressionConfig::Type +compress(const CompressionConfig & compression, const ConstBufferRef & org, DataBuffer & dest, bool allowSwap) +{ + CompressionConfig::Type type(CompressionConfig::NONE); + if (org.size() >= compression.minSize) { + type = docompress(compression, org, dest); + } + if (type == CompressionConfig::NONE) { + if (allowSwap) { + DataBuffer tmp(const_cast<char *>(org.c_str()), org.size()); + tmp.moveFreeToData(org.size()); + dest.swap(tmp); + } else { + dest.writeBytes(org.c_str(), org.size()); + } + } + return type; +} + + +void +decompress(ICompressor & decompressor, size_t uncompressedLen, const ConstBufferRef & org, DataBuffer & dest, bool allowSwap) +{ + dest.ensureFree(uncompressedLen); + size_t realUncompressedLen(dest.getFreeLen()); + if ( ! decompressor.unprocess(org.c_str(), org.size(), dest.getFree(), realUncompressedLen) ) { + if ( uncompressedLen < realUncompressedLen) { + if (allowSwap) { + DataBuffer tmp(const_cast<char *>(org.c_str()), org.size()); + tmp.moveFreeToData(org.size()); + dest.swap(tmp); + } else { + dest.writeBytes(org.c_str(), org.size()); + } + } else { + throw std::runtime_error(make_string("unprocess failed had %" PRIu64 ", wanted %" PRId64 ", got %" PRIu64, + org.size(), uncompressedLen, realUncompressedLen)); + } + } else { + dest.moveFreeToData(realUncompressedLen); + } +} + +void +decompress(const CompressionConfig::Type & type, size_t uncompressedLen, const ConstBufferRef & org, DataBuffer & dest, bool allowSwap) +{ + switch (type) { + case CompressionConfig::LZ4: + { + LZ4Compressor lz4; + decompress(lz4, uncompressedLen, org, dest, allowSwap); + } + break; + case CompressionConfig::ZSTD: + { + ZStdCompressor zstd; + decompress(zstd, uncompressedLen, org, dest, allowSwap); + } + break; + case CompressionConfig::NONE: + case CompressionConfig::UNCOMPRESSABLE: + if (allowSwap) { + DataBuffer tmp(const_cast<char *>(org.c_str()), org.size()); + tmp.moveFreeToData(org.size()); + dest.swap(tmp); + } else { + dest.writeBytes(org.c_str(), org.size()); + } + break; + default: + throw std::runtime_error(make_string("Unable to handle decompression of type '%d'", type)); + break; + } +} + +size_t computeMaxCompressedsize(CompressionConfig::Type type, size_t payloadSize) { + if (type == CompressionConfig::LZ4) { + LZ4Compressor lz4; + return lz4.adjustProcessLen(0, payloadSize); + } else if (type == CompressionConfig::ZSTD) { + ZStdCompressor zstd; + return zstd.adjustProcessLen(0, payloadSize); + } + return payloadSize; +} + +} diff --git a/vespalib/src/vespa/vespalib/util/compressor.h b/vespalib/src/vespa/vespalib/util/compressor.h new file mode 100644 index 00000000000..8f319a6735d --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/compressor.h @@ -0,0 +1,46 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "compressionconfig.h" +#include "buffer.h" + +namespace vespalib { class DataBuffer; } + +namespace vespalib::compression { + +class ICompressor +{ +public: + virtual ~ICompressor() { } + virtual bool process(const CompressionConfig& config, const void * input, size_t inputLen, void * output, size_t & outputLen) = 0; + virtual bool unprocess(const void * input, size_t inputLen, void * output, size_t & outputLen) = 0; + virtual size_t adjustProcessLen(uint16_t options, size_t len) const = 0; +}; + +/** + * Will try to compress a buffer according to the config. If the criteria can not + * be met it will return NONE and dest will get the input buffer. + * @param compression is config for how to compress and what criteria to meet. + * @param org is the original input buffer. + * @param dest is the destination buffer. The compressed data will be appended unless allowSwap is true + * and it is not compressable. Then it will be swapped in. + * @param allowSwap will tell it the data must be appended or if it can be swapped in if it is uncompressable or config is NONE. + */ +CompressionConfig::Type compress(const CompressionConfig & compression, const vespalib::ConstBufferRef & org, vespalib::DataBuffer & dest, bool allowSwap); + +/** + * Will try to decompress a buffer according to the config. + * be met it will return NONE and dest will get the input buffer. + * @param compression is the compression type used for the buffer. + * @param uncompressedLen is the length of the uncompressed data. + * @param org is the original input buffer. + * @param dest is the destination buffer. The decompressed data will be + * appended unless allowSwap is true and compression is NONE. + * Then it will be swapped in. + * @param allowSwap will tell it the data must be appended or if it can be swapped in if compression type is NONE. + */ +void decompress(const CompressionConfig::Type & compression, size_t uncompressedLen, const vespalib::ConstBufferRef & org, vespalib::DataBuffer & dest, bool allowSwap); + +size_t computeMaxCompressedsize(CompressionConfig::Type type, size_t uncompressedSize); + +} diff --git a/vespalib/src/vespa/vespalib/util/lz4compressor.cpp b/vespalib/src/vespa/vespalib/util/lz4compressor.cpp new file mode 100644 index 00000000000..366609ce7bf --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/lz4compressor.cpp @@ -0,0 +1,46 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "lz4compressor.h" +#include <vespa/vespalib/util/alloc.h> +#include <lz4.h> +#include <lz4hc.h> +#include <cassert> + +using vespalib::alloc::Alloc; + +namespace vespalib::compression { + +size_t LZ4Compressor::adjustProcessLen(uint16_t, size_t len) const { return LZ4_compressBound(len); } + +bool +LZ4Compressor::process(const CompressionConfig& config, const void * inputV, size_t inputLen, void * outputV, size_t & outputLenV) +{ + const char * input(static_cast<const char *>(inputV)); + char * output(static_cast<char *>(outputV)); + int sz(-1); + int maxOutputLen = LZ4_compressBound(inputLen); + if (config.compressionLevel > 6) { + Alloc state = Alloc::alloc(LZ4_sizeofStateHC()); + sz = LZ4_compress_HC_extStateHC(state.get(), input, output, inputLen, maxOutputLen, config.compressionLevel); + } else { + Alloc state = Alloc::alloc(LZ4_sizeofState()); + sz = LZ4_compress_fast_extState(state.get(), input, output, inputLen, maxOutputLen, 1); + } + assert(sz != 0); + outputLenV = sz; + return (sz != 0); + +} + +bool +LZ4Compressor::unprocess(const void * inputV, size_t inputLen, void * outputV, size_t & outputLenV) +{ + const char * input(static_cast<const char *>(inputV)); + char * output(static_cast<char *>(outputV)); + int sz = LZ4_decompress_safe(input, output, inputLen, outputLenV); + assert(sz > 0); + outputLenV = sz; + return (sz > 0); +} + +} diff --git a/vespalib/src/vespa/vespalib/util/lz4compressor.h b/vespalib/src/vespa/vespalib/util/lz4compressor.h new file mode 100644 index 00000000000..558088a914c --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/lz4compressor.h @@ -0,0 +1,17 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "compressor.h" + +namespace vespalib::compression { + +class LZ4Compressor : public ICompressor +{ +public: + bool process(const CompressionConfig& config, const void * input, size_t inputLen, void * output, size_t & outputLen) override; + bool unprocess(const void * input, size_t inputLen, void * output, size_t & outputLen) override; + size_t adjustProcessLen(uint16_t options, size_t len) const override; +}; + +} + diff --git a/vespalib/src/vespa/vespalib/util/zstdcompressor.cpp b/vespalib/src/vespa/vespalib/util/zstdcompressor.cpp new file mode 100644 index 00000000000..2f09abf4846 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/zstdcompressor.cpp @@ -0,0 +1,65 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "zstdcompressor.h" +#include <vespa/vespalib/util/alloc.h> +#include <vespa/vespalib/util/sync.h> +#include <zstd.h> +#include <vector> +#include <cassert> + +using vespalib::alloc::Alloc; + +namespace vespalib::compression { + +namespace { + +class CompressContext { +public: + CompressContext() : _ctx(ZSTD_createCCtx()) {} + ~CompressContext() { ZSTD_freeCCtx(_ctx); } + ZSTD_CCtx * get() { return _ctx; } +private: + ZSTD_CCtx * _ctx; +}; +class DecompressContext { +public: + DecompressContext() : _ctx(ZSTD_createDCtx()) {} + ~DecompressContext() { ZSTD_freeDCtx(_ctx); } + ZSTD_DCtx * get() { return _ctx; } +private: + ZSTD_DCtx * _ctx; +}; + +thread_local std::unique_ptr<CompressContext> _tlCompressState; +thread_local std::unique_ptr<DecompressContext> _tlDecompressState; + +} + +size_t ZStdCompressor::adjustProcessLen(uint16_t, size_t len) const { return ZSTD_compressBound(len); } + +bool +ZStdCompressor::process(const CompressionConfig& config, const void * inputV, size_t inputLen, void * outputV, size_t & outputLenV) +{ + size_t maxOutputLen = ZSTD_compressBound(inputLen); + if ( ! _tlCompressState) { + _tlCompressState = std::make_unique<CompressContext>(); + } + size_t sz = ZSTD_compressCCtx(_tlCompressState->get(), outputV, maxOutputLen, inputV, inputLen, config.compressionLevel); + assert( ! ZSTD_isError(sz) ); + outputLenV = sz; + return ! ZSTD_isError(sz); +} + +bool +ZStdCompressor::unprocess(const void * inputV, size_t inputLen, void * outputV, size_t & outputLenV) +{ + if ( ! _tlDecompressState) { + _tlDecompressState = std::make_unique<DecompressContext>(); + } + size_t sz = ZSTD_decompressDCtx(_tlDecompressState->get(), outputV, outputLenV, inputV, inputLen); + assert( ! ZSTD_isError(sz) ); + outputLenV = sz; + return ! ZSTD_isError(sz); +} + +} diff --git a/vespalib/src/vespa/vespalib/util/zstdcompressor.h b/vespalib/src/vespa/vespalib/util/zstdcompressor.h new file mode 100644 index 00000000000..0d84b8e7ca4 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/zstdcompressor.h @@ -0,0 +1,17 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "compressor.h" + +namespace vespalib::compression { + +class ZStdCompressor : public ICompressor +{ +public: + bool process(const CompressionConfig& config, const void * input, size_t inputLen, void * output, size_t & outputLen) override; + bool unprocess(const void * input, size_t inputLen, void * output, size_t & outputLen) override; + size_t adjustProcessLen(uint16_t options, size_t len) const override; +}; + +} + |