diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2017-06-13 18:52:46 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2017-06-13 18:52:46 +0200 |
commit | 0155137e7d56730665d406c37aa75649e5a3418a (patch) | |
tree | 05100870215c7852cc27b6970b60c9ff5f77e763 /searchlib | |
parent | 4e5d87a9baa4176a1089f7afd1f433a8da275725 (diff) | |
parent | 6a64952133fc05136a0e445851b49bd0fc71c00e (diff) |
Merge with master
Diffstat (limited to 'searchlib')
6 files changed, 90 insertions, 63 deletions
diff --git a/searchlib/src/tests/docstore/chunk/chunk_test.cpp b/searchlib/src/tests/docstore/chunk/chunk_test.cpp index 7411fc668f8..4687f45acde 100644 --- a/searchlib/src/tests/docstore/chunk/chunk_test.cpp +++ b/searchlib/src/tests/docstore/chunk/chunk_test.cpp @@ -68,4 +68,34 @@ TEST("require that Chunk formats does not change between releases") testChunkFormat(v2, 34, "34 015BA32DE7000000220000000010ABCDEF987654321000000000000000074D000694"); } +constexpr const char * MY_LONG_STRING = "This is medium long string that hopefully will compress to something where lz4, zstandard and none" +" will make a difference. The intentions is to verify that we trigger all compresssions possible and are able to decompress them too." +" I guess that we need a considerable length in order to get the rather inefficient lz4 compression triger. ZStandard compression" +" should trigger a lot earlier"; + +void verifyChunkCompression(CompressionConfig::Type cfgType, const void * buf, size_t sz, size_t expectedLen) { + uint64_t MAGIC_CONTENT(0xabcdef9876543210); + ChunkFormatV2 chunk(10); + chunk.getBuffer() << MAGIC_CONTENT; + chunk.getBuffer().write(buf, sz); + vespalib::DataBuffer buffer; + CompressionConfig cfg(cfgType); + chunk.pack(7, buffer, cfg); + EXPECT_EQUAL(expectedLen, buffer.getDataLen()); + vespalib::nbostream is(buffer.getData(), buffer.getDataLen()); + ChunkFormat::UP deserialized = ChunkFormat::deserialize(buffer.getData(), buffer.getDataLen(), false); + uint64_t magic(0); + deserialized->getBuffer() >> magic; + EXPECT_EQUAL(MAGIC_CONTENT, magic); + std::vector<char> v(sz); + deserialized->getBuffer().read(&v[0], sz); + EXPECT_EQUAL(0, memcmp(buf, &v[0], sz)); +} + +TEST("require that V2 can create and handle lz4, zstd, and none") { + verifyChunkCompression(CompressionConfig::NONE, MY_LONG_STRING, strlen(MY_LONG_STRING), 421); + verifyChunkCompression(CompressionConfig::LZ4, MY_LONG_STRING, strlen(MY_LONG_STRING), 360); + verifyChunkCompression(CompressionConfig::ZSTD, MY_LONG_STRING, strlen(MY_LONG_STRING), 282); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/vespa/searchlib/common/packets.cpp b/searchlib/src/vespa/searchlib/common/packets.cpp index d705d3a2240..667334bf64e 100644 --- a/searchlib/src/vespa/searchlib/common/packets.cpp +++ b/searchlib/src/vespa/searchlib/common/packets.cpp @@ -7,6 +7,7 @@ #include <vespa/document/util/compressor.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/data/slime/slime.h> +#include <vespa/vespalib/data/databuffer.h> #include <vespa/log/log.h> LOG_SETUP(".searchlib.common.fs4packets"); @@ -150,7 +151,7 @@ FS4PersistentPacketStreamer::Decode(FNET_DataBuffer *src, uint32_t plen, uint32_ uint32_t uncompressed_size = src->ReadInt32(); ConstBufferRef org(src->GetData(), plen - sizeof(uint32_t)); vespalib::DataBuffer uncompressed(uncompressed_size); - document::decompress(compressionType, uncompressed_size, org, uncompressed, false); + document::compression::decompress(compressionType, uncompressed_size, org, uncompressed, false); FNET_DataBuffer buf(uncompressed.getData(), uncompressed.getDataLen()); decodePacket(packet, buf, uncompressed_size, pcode); src->DataToDead(plen - sizeof(uint32_t)); @@ -191,7 +192,7 @@ FS4PersistentPacketStreamer::Encode(FNET_Packet *packet, uint32_t chid, FNET_Dat CompressionConfig config(_compressionType, _compressionLevel, 90); ConstBufferRef org(dst->GetData() + packet_start + header_len, body_len); vespalib::DataBuffer compressed(org.size()); - CompressionConfig::Type r = document::compress(config, org, compressed, false); + CompressionConfig::Type r = document::compression::compress(config, org, compressed, false); if (r != CompressionConfig::NONE) { dst->DataToFree(body_len + header_len); // sizeof(data + header + uncompressed_size) - sizeof(uint32_t) @@ -454,7 +455,7 @@ FS4Packet_PreSerialized::FS4Packet_PreSerialized(FNET_Packet & packet) 90); ConstBufferRef org(tmp.GetData(), tmp.GetDataLen()); vespalib::DataBuffer compressed(org.size()); - _compressionType = document::compress(config, org, compressed, false); + _compressionType = document::compression::compress(config, org, compressed, false); if (_compressionType != CompressionConfig::NONE) { _data.WriteInt32Fast(body_len); _data.WriteBytes(compressed.getData(), compressed.getDataLen()); diff --git a/searchlib/src/vespa/searchlib/docstore/chunkformat.cpp b/searchlib/src/vespa/searchlib/docstore/chunkformat.cpp index 5aef795b8bd..204c7b07acb 100644 --- a/searchlib/src/vespa/searchlib/docstore/chunkformat.cpp +++ b/searchlib/src/vespa/searchlib/docstore/chunkformat.cpp @@ -8,6 +8,10 @@ namespace search { using vespalib::make_string; using vespalib::Exception; +using document::compression::compress; +using document::compression::decompress; +using document::compression::computeMaxCompressedsize; +using document::CompressionConfig; ChunkException::ChunkException(const vespalib::stringref & msg, const vespalib::stringref & location) : Exception(make_string("Illegal chunk: %s", msg.c_str()), location) @@ -15,7 +19,7 @@ ChunkException::ChunkException(const vespalib::stringref & msg, const vespalib:: } void -ChunkFormat::pack(uint64_t lastSerial, vespalib::DataBuffer & compressed, const document::CompressionConfig & compression) +ChunkFormat::pack(uint64_t lastSerial, vespalib::DataBuffer & compressed, const CompressionConfig & compression) { vespalib::nbostream & os = _dataBuf; os << lastSerial; @@ -29,7 +33,7 @@ ChunkFormat::pack(uint64_t lastSerial, vespalib::DataBuffer & compressed, const const size_t oldPos(compressed.getDataLen()); compressed.writeInt8(compression.type); compressed.writeInt32(os.size()); - document::CompressionConfig::Type type(document::compress(compression, vespalib::ConstBufferRef(os.c_str(), os.size()), compressed, false)); + CompressionConfig::Type type(compress(compression, vespalib::ConstBufferRef(os.c_str(), os.size()), compressed, false)); if (compression.type != type) { compressed.getData()[oldPos] = type; } @@ -42,28 +46,22 @@ ChunkFormat::pack(uint64_t lastSerial, vespalib::DataBuffer & compressed, const } size_t -ChunkFormat::getMaxPackSize(const document::CompressionConfig & compression) const +ChunkFormat::getMaxPackSize(const CompressionConfig & compression) const { const size_t OVERHEAD(0); const size_t MINSIZE(1 + 1 + 4 + 4 + includeSerializedSize() ? 4 : 0); // version + type + real length + crc + lastserial const size_t formatSpecificSize(getHeaderSize()); size_t rawSize(MINSIZE + formatSpecificSize + OVERHEAD); const size_t payloadSize(_dataBuf.size() + 8); - // This is a little dirty -> need interface. - if (compression.type == document::CompressionConfig::LZ4) { - document::LZ4Compressor lz4; - rawSize += lz4.adjustProcessLen(0, payloadSize); - } else { - rawSize += payloadSize; - } - return rawSize; + return rawSize + computeMaxCompressedsize(compression.type, payloadSize); } void ChunkFormat::verifyCompression(uint8_t type) { - if ((type != document::CompressionConfig::LZ4) && - (type != document::CompressionConfig::NONE)) { + if ((type != CompressionConfig::LZ4) && + (type != CompressionConfig::ZSTD) && + (type != CompressionConfig::NONE)) { throw ChunkException(make_string("Unknown compressiontype %d", type), VESPA_STRLOC); } } @@ -145,7 +143,7 @@ ChunkFormat::deserializeBody(vespalib::nbostream & is) // This is a dirty trick to fool some odd sanity checking in DataBuffer::swap vespalib::DataBuffer uncompressed(const_cast<char *>(is.peek()), (size_t)0); vespalib::ConstBufferRef data(is.peek(), is.size() - sizeof(uint32_t)); - document::decompress(document::CompressionConfig::Type(type), uncompressedLen, data, uncompressed, true); + decompress(CompressionConfig::Type(type), uncompressedLen, data, uncompressed, true); assert(uncompressed.getData() == uncompressed.getDead()); if (uncompressed.getData() != data.c_str()) { const size_t sz(uncompressed.getDataLen()); diff --git a/searchlib/src/vespa/searchlib/docstore/documentstore.cpp b/searchlib/src/vespa/searchlib/docstore/documentstore.cpp index 744eb07c5f5..d154250d6a1 100644 --- a/searchlib/src/vespa/searchlib/docstore/documentstore.cpp +++ b/searchlib/src/vespa/searchlib/docstore/documentstore.cpp @@ -8,6 +8,11 @@ #include <vespa/vespalib/data/databuffer.h> #include <vespa/document/util/compressor.h> +using document::DocumentTypeRepo; +using document::CompressionConfig; +using document::compression::compress; +using document::compression::decompress; + namespace search { namespace { @@ -15,13 +20,13 @@ namespace { class DocumentVisitorAdapter : public IBufferVisitor { public: - DocumentVisitorAdapter(const document::DocumentTypeRepo & repo, IDocumentVisitor & visitor) : + DocumentVisitorAdapter(const DocumentTypeRepo & repo, IDocumentVisitor & visitor) : _repo(repo), _visitor(visitor) { } void visit(uint32_t lid, vespalib::ConstBufferRef buf) override; private: - const document::DocumentTypeRepo & _repo; + const DocumentTypeRepo & _repo; IDocumentVisitor & _visitor; }; @@ -45,7 +50,7 @@ public: using Alloc = vespalib::alloc::Alloc; typedef std::unique_ptr<Value> UP; - Value() : _compressedSize(0), _uncompressedSize(0), _compression(document::CompressionConfig::NONE) {} + Value() : _compressedSize(0), _uncompressedSize(0), _compression(CompressionConfig::NONE) {} Value(Value &&rhs) : _compressedSize(rhs._compressedSize), @@ -69,12 +74,12 @@ public: return *this; } - void setCompression(document::CompressionConfig::Type comp, size_t uncompressedSize) { + void setCompression(CompressionConfig::Type comp, size_t uncompressedSize) { _compression = comp; _uncompressedSize = uncompressedSize; } - document::CompressionConfig::Type getCompression() const { return _compression; } + CompressionConfig::Type getCompression() const { return _compression; } size_t getUncompressedSize() const { return _uncompressedSize; } @@ -82,13 +87,13 @@ public: * Compress buffer into temporary buffer and copy temporary buffer to * value along with compression config. */ - void set(vespalib::DataBuffer &&buf, ssize_t len, const document::CompressionConfig &compression); + void set(vespalib::DataBuffer &&buf, ssize_t len, const CompressionConfig &compression); /** * Decompress value into temporary buffer and deserialize document from * the temporary buffer. */ - document::Document::UP deserializeDocument(const document::DocumentTypeRepo &repo); + document::Document::UP deserializeDocument(const DocumentTypeRepo &repo); size_t size() const { return _compressedSize; } bool empty() const { return size() == 0; } @@ -98,36 +103,32 @@ public: private: size_t _compressedSize; size_t _uncompressedSize; - document::CompressionConfig::Type _compression; + CompressionConfig::Type _compression; Alloc _buf; }; class BackingStore { public: - BackingStore(IDataStore &store, const document::CompressionConfig &compression) : + BackingStore(IDataStore &store, const CompressionConfig &compression) : _backingStore(store), _compression(compression) { } bool read(DocumentIdT key, Value &value) const; - void visit(const IDocumentStore::LidVector &lids, const document::DocumentTypeRepo &repo, IDocumentVisitor &visitor) const; + void visit(const IDocumentStore::LidVector &lids, const DocumentTypeRepo &repo, IDocumentVisitor &visitor) const; void write(DocumentIdT, const Value &) {} void erase(DocumentIdT) {} - const document::CompressionConfig &getCompression(void) const { return _compression; } + const CompressionConfig &getCompression(void) const { return _compression; } private: IDataStore &_backingStore; - const document::CompressionConfig _compression; + const CompressionConfig _compression; }; void -Value::set(vespalib::DataBuffer &&buf, - ssize_t len, - const document::CompressionConfig &compression) { +Value::set(vespalib::DataBuffer &&buf, ssize_t len, const CompressionConfig &compression) { //Underlying buffer must be identical to allow swap. vespalib::DataBuffer compressed(buf.getData(), 0u); - document::CompressionConfig::Type type = - document::compress(compression, - vespalib::ConstBufferRef(buf.getData(), len), - compressed, true); + CompressionConfig::Type type = compress(compression, vespalib::ConstBufferRef(buf.getData(), len), + compressed, true); _compressedSize = compressed.getDataLen(); if (buf.getData() == compressed.getData()) { // Uncompressed so we can just steal the underlying buffer. @@ -135,28 +136,25 @@ Value::set(vespalib::DataBuffer &&buf, } else { compressed.stealBuffer().swap(_buf); } - assert(((type == document::CompressionConfig::NONE) && + assert(((type == CompressionConfig::NONE) && (len == ssize_t(_compressedSize))) || - ((type != document::CompressionConfig::NONE) && + ((type != CompressionConfig::NONE) && (len > ssize_t(_compressedSize)))); setCompression(type, len); } document::Document::UP -Value::deserializeDocument(const document::DocumentTypeRepo &repo) { +Value::deserializeDocument(const DocumentTypeRepo &repo) { vespalib::DataBuffer uncompressed((char *) _buf.get(), (size_t) 0); - document::decompress(getCompression(), - getUncompressedSize(), - vespalib::ConstBufferRef(*this, size()), - uncompressed, true); + decompress(getCompression(), getUncompressedSize(), vespalib::ConstBufferRef(*this, size()), uncompressed, true); vespalib::nbostream is(uncompressed.getData(), uncompressed.getDataLen()); return document::Document::UP(new document::Document(repo, is)); } void -BackingStore::visit(const IDocumentStore::LidVector &lids, const document::DocumentTypeRepo &repo, +BackingStore::visit(const IDocumentStore::LidVector &lids, const DocumentTypeRepo &repo, IDocumentVisitor &visitor) const { DocumentVisitorAdapter adapter(repo, visitor); _backingStore.read(lids, adapter); @@ -213,7 +211,7 @@ DocumentStore::useCache() const { } void -DocumentStore::visit(const LidVector & lids, const document::DocumentTypeRepo &repo, IDocumentVisitor & visitor) const +DocumentStore::visit(const LidVector & lids, const DocumentTypeRepo &repo, IDocumentVisitor & visitor) const { if (useCache() && _config.allowVisitCaching() && visitor.allowVisitCaching()) { docstore::BlobSet blobSet = _visitCache->read(lids).getBlobSet(); @@ -227,7 +225,7 @@ DocumentStore::visit(const LidVector & lids, const document::DocumentTypeRepo &r } document::Document::UP -DocumentStore::read(DocumentIdT lid, const document::DocumentTypeRepo &repo) const +DocumentStore::read(DocumentIdT lid, const DocumentTypeRepo &repo) const { document::Document::UP retval; Value value; @@ -305,25 +303,23 @@ DocumentStore::getLastFlushTime() const template <class Visitor> class DocumentStore::WrapVisitor : public IDataStoreVisitor { - Visitor &_visitor; - const document::DocumentTypeRepo &_repo; - const document::CompressionConfig &_compression; - IDocumentStore &_ds; - uint64_t _syncToken; + Visitor &_visitor; + const DocumentTypeRepo &_repo; + const CompressionConfig &_compression; + IDocumentStore &_ds; + uint64_t _syncToken; public: void visit(uint32_t lid, const void *buffer, size_t sz) override; WrapVisitor(Visitor &visitor, - const document::DocumentTypeRepo &repo, - const document::CompressionConfig &compresion, + const DocumentTypeRepo &repo, + const CompressionConfig &compresion, IDocumentStore &ds, uint64_t syncToken); inline void rewrite(uint32_t lid, const document::Document &doc); - inline void rewrite(uint32_t lid); - inline void visitRemove(uint32_t lid); }; @@ -429,8 +425,8 @@ DocumentStore::WrapVisitor<Visitor>::visit(uint32_t lid, template <class Visitor> DocumentStore::WrapVisitor<Visitor>:: WrapVisitor(Visitor &visitor, - const document::DocumentTypeRepo &repo, - const document::CompressionConfig &compression, + const DocumentTypeRepo &repo, + const CompressionConfig &compression, IDocumentStore &ds, uint64_t syncToken) : _visitor(visitor), @@ -445,7 +441,7 @@ WrapVisitor(Visitor &visitor, void DocumentStore::accept(IDocumentStoreReadVisitor &visitor, IDocumentStoreVisitorProgress &visitorProgress, - const document::DocumentTypeRepo &repo) + const DocumentTypeRepo &repo) { WrapVisitor<IDocumentStoreReadVisitor> wrap(visitor, repo, _store->getCompression(), @@ -460,7 +456,7 @@ DocumentStore::accept(IDocumentStoreReadVisitor &visitor, void DocumentStore::accept(IDocumentStoreRewriteVisitor &visitor, IDocumentStoreVisitorProgress &visitorProgress, - const document::DocumentTypeRepo &repo) + const DocumentTypeRepo &repo) { WrapVisitor<IDocumentStoreRewriteVisitor> wrap(visitor, repo, diff --git a/searchlib/src/vespa/searchlib/docstore/visitcache.cpp b/searchlib/src/vespa/searchlib/docstore/visitcache.cpp index e78ef81d127..c68aa00994f 100644 --- a/searchlib/src/vespa/searchlib/docstore/visitcache.cpp +++ b/searchlib/src/vespa/searchlib/docstore/visitcache.cpp @@ -90,7 +90,7 @@ CompressedBlobSet::CompressedBlobSet(const document::CompressionConfig &compress if ( ! _positions.empty() ) { DataBuffer compressed; ConstBufferRef org = uncompressed.getBuffer(); - _compression = document::compress(compression, org, compressed, false); + _compression = document::compression::compress(compression, org, compressed, false); _buffer.resize(compressed.getDataLen()); memcpy(_buffer, compressed.getData(), compressed.getDataLen()); } @@ -99,10 +99,12 @@ CompressedBlobSet::CompressedBlobSet(const document::CompressionConfig &compress BlobSet CompressedBlobSet::getBlobSet() const { + using document::compression::decompress; // These are frequent lage allocations that are to expensive to mmap. DataBuffer uncompressed(0, 1, Alloc::alloc(0, 16 * MemoryAllocator::HUGEPAGE_SIZE)); if ( ! _positions.empty() ) { - document::decompress(_compression, getBufferSize(_positions), ConstBufferRef(_buffer.c_str(), _buffer.size()), uncompressed, false); + decompress(_compression, getBufferSize(_positions), + ConstBufferRef(_buffer.c_str(), _buffer.size()), uncompressed, false); } return BlobSet(_positions, uncompressed.stealBuffer()); } diff --git a/searchlib/src/vespa/searchlib/grouping/sketch.h b/searchlib/src/vespa/searchlib/grouping/sketch.h index 76d42e71e75..bb7db02d81c 100644 --- a/searchlib/src/vespa/searchlib/grouping/sketch.h +++ b/searchlib/src/vespa/searchlib/grouping/sketch.h @@ -209,7 +209,7 @@ compress_buckets_into(char *buffer, uint32_t size) const { vespalib::ConstBufferRef org(&bucket[0], BUCKET_COUNT); vespalib::DataBuffer compress_buffer(buffer, size); document::CompressionConfig::Type r = - document::compress(config, org, compress_buffer, false); + document::compression::compress(config, org, compress_buffer, false); assert(compress_buffer.getDead() == buffer); if (r == document::CompressionConfig::LZ4) { assert(compress_buffer.getDataLen() < BUCKET_COUNT); @@ -228,7 +228,7 @@ decompress_buckets_from(char *buffer, uint32_t size) { } else { vespalib::ConstBufferRef compressed(buffer, size); vespalib::DataBuffer uncompressed(reinterpret_cast<char *>(&bucket[0]), BUCKET_COUNT); - document::decompress(document::CompressionConfig::LZ4, BUCKET_COUNT, compressed, uncompressed, false); + document::compression::decompress(document::CompressionConfig::LZ4, BUCKET_COUNT, compressed, uncompressed, false); } } template <int BucketBits, typename HashT> |