diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-09-24 12:21:28 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-24 12:21:28 +0200 |
commit | edd8b607837c1cbdedbbc69b2d92e8f86f6be0cc (patch) | |
tree | aa5cb0d1815b1ab9057b6c684ac6ab2e7ee00489 | |
parent | 3a18bab028097dec03bac8186d06c23536fda1c0 (diff) | |
parent | 85491d4ce0d186321e2c8a34f62ae13cadd6d5a4 (diff) |
Merge pull request #7045 from vespa-engine/balder/add-in-memory-crc-for-summary-cache
Balder/add in memory crc for summary cache
7 files changed, 272 insertions, 195 deletions
diff --git a/searchlib/src/tests/docstore/document_store/document_store_test.cpp b/searchlib/src/tests/docstore/document_store/document_store_test.cpp index 0ef04d0e722..649fb675dca 100644 --- a/searchlib/src/tests/docstore/document_store/document_store_test.cpp +++ b/searchlib/src/tests/docstore/document_store/document_store_test.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/searchlib/docstore/logdocumentstore.h> +#include <vespa/searchlib/docstore/value.h> #include <vespa/searchlib/docstore/cachestats.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/fieldvalue/document.h> @@ -81,4 +82,70 @@ TEST("require that LogDocumentStore::Config equality operator detects inequality EXPECT_FALSE(C(DC(), LC().setMaxBucketSpread(7)) == C()); } +using search::docstore::Value; +vespalib::stringref S1("this is a string long enough to be compressed and is just used for sanity checking of compression" + "Adding some repeatble sequences like aaaaaaaaaaaaaaaaaaaaaa bbbbbbbbbbbbbbbbbbbbbbb to ensure compression" + "xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz xyz"); + +Value createValue(vespalib::stringref s, const CompressionConfig & cfg) { + Value v(7); + vespalib::DataBuffer input; + input.writeBytes(s.data(), s.size()); + v.set(std::move(input), s.size(), cfg); + return v; +} +void verifyValue(vespalib::stringref s, const Value & v) { + Value::Result result = v.decompressed(); + ASSERT_TRUE(result.second); + EXPECT_EQUAL(s.size(), v.getUncompressedSize()); + EXPECT_EQUAL(7u, v.getSyncToken()); + EXPECT_EQUAL(0, memcmp(s.data(), result.first.getData(), result.first.getDataLen())); +} + +TEST("require that Value and cache entries have expected size") { + using pair = std::pair<DocumentIdT, Value>; + using Node = vespalib::hash_node<pair>; + EXPECT_EQUAL(64ul, sizeof(Value)); + EXPECT_EQUAL(72ul, sizeof(pair)); + EXPECT_EQUAL(80ul, sizeof(Node)); +} + +TEST("require that Value can store uncompressed data") { + Value v = createValue(S1, CompressionConfig::NONE); + verifyValue(S1, v); +} + +TEST("require that Value can be moved") { + Value v = createValue(S1, CompressionConfig::NONE); + Value m = std::move(v); + verifyValue(S1, m); +} + +TEST("require that Value can be copied") { + Value v = createValue(S1, CompressionConfig::NONE); + Value copy(v); + verifyValue(S1, v); + verifyValue(S1, copy); +} + +TEST("require that Value can store lz4 compressed data") { + Value v = createValue(S1, CompressionConfig::LZ4); + EXPECT_EQUAL(CompressionConfig::LZ4, v.getCompression()); + EXPECT_EQUAL(164u, v.size()); + verifyValue(S1, v); +} + +TEST("require that Value can store zstd compressed data") { + Value v = createValue(S1, CompressionConfig::ZSTD); + EXPECT_EQUAL(CompressionConfig::ZSTD, v.getCompression()); + EXPECT_EQUAL(128u, v.size()); + verifyValue(S1, v); +} + +TEST("require that Value can detect if output not equal to input") { + Value v = createValue(S1, CompressionConfig::NONE); + static_cast<uint8_t *>(v.get())[8] ^= 0xff; + EXPECT_FALSE(v.decompressed().second); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/tests/docstore/logdatastore/logdatastore_test.cpp b/searchlib/src/tests/docstore/logdatastore/logdatastore_test.cpp index 34046f551d9..eb49a556bdc 100644 --- a/searchlib/src/tests/docstore/logdatastore/logdatastore_test.cpp +++ b/searchlib/src/tests/docstore/logdatastore/logdatastore_test.cpp @@ -622,7 +622,7 @@ TEST("test that the integrated visit cache works.") { for (size_t i(1); i <= 100; i++) { vcs.verifyRead(i); } - constexpr size_t BASE_SZ = 21374; + constexpr size_t BASE_SZ = 22174; TEST_DO(verifyCacheStats(ds.getCacheStats(), 0, 100, 100, BASE_SZ)); for (size_t i(1); i <= 100; i++) { vcs.verifyRead(i); @@ -642,22 +642,22 @@ TEST("test that the integrated visit cache works.") { vcs.verifyVisit({7,9,17,19,67,88}, true); TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 102, 99, BASE_SZ+130)); vcs.verifyVisit({7,9,17,19,67,88,89}, true); - TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 103, 99, BASE_SZ+201)); + TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 103, 99, BASE_SZ+180)); vcs.rewrite(17); - TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 103, 97, BASE_SZ-657)); + TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 103, 97, BASE_SZ-680)); vcs.verifyVisit({7,9,17,19,67,88,89}, true); - TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 104, 98, BASE_SZ-3)); + TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 104, 98, BASE_SZ-20)); vcs.remove(17); - TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 104, 97, BASE_SZ-657)); + TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 104, 97, BASE_SZ-680)); vcs.verifyVisit({7,9,17,19,67,88,89}, {7,9,19,67,88,89}, true); - TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 105, 98, BASE_SZ-64)); + TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 105, 98, BASE_SZ-90)); vcs.verifyVisit({41, 42}, true); - TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 106, 99, BASE_SZ+238)); + TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 106, 99, BASE_SZ+210)); vcs.verifyVisit({43, 44}, true); - TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 107, 100, BASE_SZ+540)); + TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 107, 100, BASE_SZ+520)); vcs.verifyVisit({41, 42, 43, 44}, true); - TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 108, 99, BASE_SZ+362)); + TEST_DO(verifyCacheStats(ds.getCacheStats(), 101, 108, 99, BASE_SZ+340)); } TEST("testWriteRead") { diff --git a/searchlib/src/vespa/searchlib/docstore/CMakeLists.txt b/searchlib/src/vespa/searchlib/docstore/CMakeLists.txt index 735b836cf17..2b82d9e5af7 100644 --- a/searchlib/src/vespa/searchlib/docstore/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/docstore/CMakeLists.txt @@ -18,6 +18,7 @@ vespa_add_library(searchlib_docstore OBJECT randreaders.cpp storebybucket.cpp summaryexceptions.cpp + value.cpp visitcache.cpp writeablefilechunk.cpp DEPENDS diff --git a/searchlib/src/vespa/searchlib/docstore/documentstore.cpp b/searchlib/src/vespa/searchlib/docstore/documentstore.cpp index 95b6c3b1584..003f448ab07 100644 --- a/searchlib/src/vespa/searchlib/docstore/documentstore.cpp +++ b/searchlib/src/vespa/searchlib/docstore/documentstore.cpp @@ -4,15 +4,18 @@ #include "documentstore.h" #include "visitcache.h" #include "ibucketizer.h" +#include "value.h" #include <vespa/document/fieldvalue/document.h> #include <vespa/vespalib/stllike/cache.hpp> #include <vespa/vespalib/data/databuffer.h> #include <vespa/vespalib/util/compressor.h> +#include <vespa/log/log.h> + +LOG_SETUP(".searchlib.docstore.documentstore"); + using document::DocumentTypeRepo; using vespalib::compression::CompressionConfig; -using vespalib::compression::compress; -using vespalib::compression::decompress; namespace search { @@ -39,82 +42,18 @@ DocumentVisitorAdapter::visit(uint32_t lid, vespalib::ConstBufferRef buf) { } } +document::Document::UP +deserializeDocument(const vespalib::DataBuffer & uncompressed, const DocumentTypeRepo &repo) { + vespalib::nbostream is(uncompressed.getData(), uncompressed.getDataLen()); + return std::make_unique<document::Document>(repo, is); +} + } using vespalib::nbostream; namespace docstore { -class Value { -public: - using Alloc = vespalib::alloc::Alloc; - typedef std::unique_ptr<Value> UP; - - Value() - : _syncToken(0), - _compressedSize(0), - _uncompressedSize(0), - _compression(CompressionConfig::NONE) - {} - - Value(uint64_t syncToken) - : _syncToken(syncToken), - _compressedSize(0), - _uncompressedSize(0), - _compression(CompressionConfig::NONE) - {} - - Value(Value &&rhs) = default; - Value &operator=(Value &&rhs) = default; - - Value(const Value &rhs) - : _syncToken(rhs._syncToken), - _compressedSize(rhs._compressedSize), - _uncompressedSize(rhs._uncompressedSize), - _compression(rhs._compression), - _buf(Alloc::alloc(rhs.size())) - { - memcpy(get(), rhs.get(), size()); - } - - void setCompression(CompressionConfig::Type comp, size_t uncompressedSize) { - _compression = comp; - _uncompressedSize = uncompressedSize; - } - uint64_t getSyncToken() const { return _syncToken; } - - CompressionConfig::Type getCompression() const { return _compression; } - - size_t getUncompressedSize() const { return _uncompressedSize; } - - /** - * Compress buffer into temporary buffer and copy temporary buffer to - * value along with compression config. - */ - void set(vespalib::DataBuffer &&buf, ssize_t len, const CompressionConfig &compression); - // Keep buffer uncompressed - void set(vespalib::DataBuffer &&buf, ssize_t len); - - /** - * Decompress value into temporary buffer and deserialize document from - * the temporary buffer. - */ - document::Document::UP deserializeDocument(const DocumentTypeRepo &repo) const; - vespalib::DataBuffer decompressed() const; - - size_t size() const { return _compressedSize; } - bool empty() const { return size() == 0; } - operator const void *() const { return _buf.get(); } - const void *get() const { return _buf.get(); } - void *get() { return _buf.get(); } -private: - uint64_t _syncToken; - size_t _compressedSize; - size_t _uncompressedSize; - CompressionConfig::Type _compression; - Alloc _buf; -}; - class BackingStore { public: BackingStore(IDataStore &store, const CompressionConfig &compression) : @@ -133,45 +72,6 @@ private: CompressionConfig _compression; }; - -void -Value::set(vespalib::DataBuffer &&buf, ssize_t len) { - set(std::move(buf), len, CompressionConfig()); -} - -void -Value::set(vespalib::DataBuffer &&buf, ssize_t len, const CompressionConfig &compression) { - //Underlying buffer must be identical to allow swap. - vespalib::DataBuffer compressed(buf.getData(), 0u); - CompressionConfig::Type type = compress(compression, vespalib::ConstBufferRef(buf.getData(), len), compressed, true); - _compressedSize = compressed.getDataLen(); - if (buf.getData() == compressed.getData()) { - // Uncompressed so we can just steal the underlying buffer. - buf.stealBuffer().swap(_buf); - } else { - compressed.stealBuffer().swap(_buf); - } - assert(((type == CompressionConfig::NONE) && - (len == ssize_t(_compressedSize))) || - ((type != CompressionConfig::NONE) && - (len > ssize_t(_compressedSize)))); - setCompression(type, len); -} - -vespalib::DataBuffer -Value::decompressed() const { - vespalib::DataBuffer uncompressed(_buf.get(), (size_t) 0); - decompress(getCompression(), getUncompressedSize(), vespalib::ConstBufferRef(*this, size()), uncompressed, true); - return uncompressed; -} - -document::Document::UP -Value::deserializeDocument(const DocumentTypeRepo &repo) const { - vespalib::DataBuffer uncompressed(decompressed()); - vespalib::nbostream is(uncompressed.getData(), uncompressed.getDataLen()); - return std::make_unique<document::Document>(repo, is); -} - void BackingStore::visit(const IDocumentStore::LidVector &lids, const DocumentTypeRepo &repo, IDocumentVisitor &visitor) const { @@ -194,8 +94,9 @@ BackingStore::read(DocumentIdT key, Value &value) const { void BackingStore::write(DocumentIdT lid, const Value & value) { - vespalib::DataBuffer buf = value.decompressed(); - _backingStore.write(value.getSyncToken(), lid, buf.getData(), buf.getDataLen()); + Value::Result buf = value.decompressed(); + assert(buf.second); + _backingStore.write(value.getSyncToken(), lid, buf.first.getData(), buf.first.getDataLen()); } void @@ -203,8 +104,6 @@ BackingStore::reconfigure(const CompressionConfig &compression) { _compression = compression; } -} - using CacheParams = vespalib::CacheParam< vespalib::LruParam<DocumentIdT, docstore::Value>, docstore::BackingStore, @@ -216,12 +115,14 @@ public: Cache(BackingStore & b, size_t maxBytes) : vespalib::cache<CacheParams>(b, maxBytes) { } }; +} + using VisitCache = docstore::VisitCache; using docstore::Value; bool DocumentStore::Config::operator == (const Config &rhs) const { - return (_maxCacheBytes == rhs._maxCacheBytes) && + return (_maxCacheBytes == rhs._maxCacheBytes) && (_allowVisitCaching == rhs._allowVisitCaching) && (_initialCacheEntries == rhs._initialCacheEntries) && (_updateStrategy == rhs._updateStrategy) && @@ -233,9 +134,9 @@ DocumentStore::DocumentStore(const Config & config, IDataStore & store) : IDocumentStore(), _config(config), _backingStore(store), - _store(new docstore::BackingStore(_backingStore, config.getCompression())), - _cache(new Cache(*_store, config.getMaxCacheBytes())), - _visitCache(new VisitCache(store, config.getMaxCacheBytes(), config.getCompression())), + _store(std::make_unique<docstore::BackingStore>(_backingStore, config.getCompression())), + _cache(std::make_unique<docstore::Cache>(*_store, config.getMaxCacheBytes())), + _visitCache(std::make_unique<docstore::VisitCache>(store, config.getMaxCacheBytes(), config.getCompression())), _uncached_lookups(0) { _cache->reserveElements(config.getInitialCacheEntries()); @@ -271,21 +172,32 @@ DocumentStore::visit(const LidVector & lids, const DocumentTypeRepo &repo, IDocu } } -document::Document::UP +std::unique_ptr<document::Document> DocumentStore::read(DocumentIdT lid, const DocumentTypeRepo &repo) const { - document::Document::UP retval; Value value; if (useCache()) { value = _cache->read(lid); - } else { - _uncached_lookups.fetch_add(1); - _store->read(lid, value); + if (value.empty()) { + return std::unique_ptr<document::Document>(); + } + Value::Result result = value.decompressed(); + if ( result.second ) { + return deserializeDocument(result.first, repo); + } else { + LOG(warning, "Summary cache for lid %u is corrupt. Invalidating and reading directly from backing store", lid); + _cache->invalidate(lid); + } } + + _uncached_lookups.fetch_add(1); + _store->read(lid, value); if ( ! value.empty() ) { - retval = value.deserializeDocument(repo); + Value::Result result = value.decompressed(); + assert(result.second); + return deserializeDocument(result.first, repo); } - return retval; + return std::unique_ptr<document::Document>(); } void @@ -380,15 +292,12 @@ class DocumentStore::WrapVisitor : public IDataStoreVisitor public: void visit(uint32_t lid, const void *buffer, size_t sz) override; - WrapVisitor(Visitor &visitor, - const DocumentTypeRepo &repo, - const CompressionConfig &compresion, - IDocumentStore &ds, - uint64_t syncToken); + WrapVisitor(Visitor &visitor, const DocumentTypeRepo &repo, const CompressionConfig &compresion, + IDocumentStore &ds, uint64_t syncToken); - inline void rewrite(uint32_t lid, const document::Document &doc); - inline void rewrite(uint32_t lid); - inline void visitRemove(uint32_t lid); + void rewrite(uint32_t lid, const document::Document &doc); + void rewrite(uint32_t lid); + void visitRemove(uint32_t lid); }; @@ -406,34 +315,26 @@ public: } }; - template <> void DocumentStore::WrapVisitor<IDocumentStoreReadVisitor>:: -rewrite(uint32_t lid, const document::Document &doc) +rewrite(uint32_t , const document::Document &) { - (void) lid; - (void) doc; } template <> void -DocumentStore::WrapVisitor<IDocumentStoreReadVisitor>:: -rewrite(uint32_t lid) +DocumentStore::WrapVisitor<IDocumentStoreReadVisitor>::rewrite(uint32_t ) { - (void) lid; } - template <> void -DocumentStore::WrapVisitor<IDocumentStoreReadVisitor>:: -visitRemove(uint32_t lid) +DocumentStore::WrapVisitor<IDocumentStoreReadVisitor>::visitRemove(uint32_t lid) { _visitor.visit(lid); } - template <> void DocumentStore::WrapVisitor<IDocumentStoreRewriteVisitor>:: @@ -444,33 +345,21 @@ rewrite(uint32_t lid, const document::Document &doc) template <> void -DocumentStore::WrapVisitor<IDocumentStoreRewriteVisitor>:: -rewrite(uint32_t lid) +DocumentStore::WrapVisitor<IDocumentStoreRewriteVisitor>::rewrite(uint32_t lid) { _ds.remove(_syncToken, lid); } - template <> void -DocumentStore::WrapVisitor<IDocumentStoreRewriteVisitor>:: -visitRemove(uint32_t lid) +DocumentStore::WrapVisitor<IDocumentStoreRewriteVisitor>::visitRemove(uint32_t ) { - (void) lid; } - - template <class Visitor> void -DocumentStore::WrapVisitor<Visitor>::visit(uint32_t lid, - const void *buffer, - size_t sz) +DocumentStore::WrapVisitor<Visitor>::visit(uint32_t lid, const void *buffer, size_t sz) { - (void) lid; - (void) buffer; - (void) sz; - Value value; vespalib::DataBuffer buf(4096); buf.clear(); @@ -480,7 +369,7 @@ DocumentStore::WrapVisitor<Visitor>::visit(uint32_t lid, value.set(std::move(buf), len); } if (! value.empty()) { - std::shared_ptr<document::Document> doc(value.deserializeDocument(_repo)); + std::shared_ptr<document::Document> doc(deserializeDocument(value.decompressed().first, _repo)); _visitor.visit(lid, doc); rewrite(lid, *doc); } else { @@ -489,14 +378,10 @@ DocumentStore::WrapVisitor<Visitor>::visit(uint32_t lid, } } - template <class Visitor> DocumentStore::WrapVisitor<Visitor>:: -WrapVisitor(Visitor &visitor, - const DocumentTypeRepo &repo, - const CompressionConfig &compression, - IDocumentStore &ds, - uint64_t syncToken) +WrapVisitor(Visitor &visitor, const DocumentTypeRepo &repo, const CompressionConfig &compression, + IDocumentStore &ds, uint64_t syncToken) : _visitor(visitor), _repo(repo), _compression(compression), @@ -505,7 +390,6 @@ WrapVisitor(Visitor &visitor, { } - void DocumentStore::accept(IDocumentStoreReadVisitor &visitor, IDocumentStoreVisitorProgress &visitorProgress, const DocumentTypeRepo &repo) @@ -516,7 +400,6 @@ DocumentStore::accept(IDocumentStoreReadVisitor &visitor, IDocumentStoreVisitorP _backingStore.accept(wrap, wrapVisitorProgress, false); } - void DocumentStore::accept(IDocumentStoreRewriteVisitor &visitor, IDocumentStoreVisitorProgress &visitorProgress, const DocumentTypeRepo &repo) @@ -527,7 +410,6 @@ DocumentStore::accept(IDocumentStoreRewriteVisitor &visitor, IDocumentStoreVisit _backingStore.accept(wrap, wrapVisitorProgress, true); } - double DocumentStore::getVisitCost() const { @@ -584,5 +466,4 @@ DocumentStore::shrinkLidSpace() _backingStore.shrinkLidSpace(); } -} // namespace search - +} diff --git a/searchlib/src/vespa/searchlib/docstore/documentstore.h b/searchlib/src/vespa/searchlib/docstore/documentstore.h index 7bc3ab1c21d..baeed106531 100644 --- a/searchlib/src/vespa/searchlib/docstore/documentstore.h +++ b/searchlib/src/vespa/searchlib/docstore/documentstore.h @@ -5,17 +5,13 @@ #include "idocumentstore.h" #include <vespa/vespalib/util/compressionconfig.h> - -namespace search { - -namespace docstore { +namespace search::docstore { class VisitCache; class BackingStore; class Cache; } -using docstore::VisitCache; -using docstore::BackingStore; -using docstore::Cache; + +namespace search { /** * Simple document store that contains serialized Document instances. @@ -67,7 +63,7 @@ public: * @param baseDir The path to a directory where "simpledocstore.dat" will exist. **/ DocumentStore(const Config & config, IDataStore & store); - ~DocumentStore(); + ~DocumentStore() override; DocumentUP read(DocumentIdT lid, const document::DocumentTypeRepo &repo) const override; void visit(const LidVector & lids, const document::DocumentTypeRepo &repo, IDocumentVisitor & visitor) const override; @@ -111,12 +107,12 @@ private: template <class> class WrapVisitor; class WrapVisitorProgress; - Config _config; - IDataStore & _backingStore; - std::unique_ptr<BackingStore> _store; - std::shared_ptr<Cache> _cache; - std::shared_ptr<VisitCache> _visitCache; - mutable std::atomic<uint64_t> _uncached_lookups; + Config _config; + IDataStore & _backingStore; + std::unique_ptr<docstore::BackingStore> _store; + std::unique_ptr<docstore::Cache> _cache; + std::unique_ptr<docstore::VisitCache> _visitCache; + mutable std::atomic<uint64_t> _uncached_lookups; }; } // namespace search diff --git a/searchlib/src/vespa/searchlib/docstore/value.cpp b/searchlib/src/vespa/searchlib/docstore/value.cpp new file mode 100644 index 00000000000..8750413e3bc --- /dev/null +++ b/searchlib/src/vespa/searchlib/docstore/value.cpp @@ -0,0 +1,75 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "value.h" +#include <vespa/vespalib/data/databuffer.h> +#include <vespa/vespalib/util/compressor.h> +#include <vespa/vespalib/xxhash/xxhash.h> + +using vespalib::compression::compress; +using vespalib::compression::decompress; + +namespace search::docstore { + +Value::Value() + : _syncToken(0), + _compressedSize(0), + _uncompressedSize(0), + _uncompressedCrc(0), + _compression(CompressionConfig::NONE) +{} + +Value::Value(uint64_t syncToken) + : _syncToken(syncToken), + _compressedSize(0), + _uncompressedSize(0), + _uncompressedCrc(0), + _compression(CompressionConfig::NONE) +{} + +Value::Value(const Value &rhs) + : _syncToken(rhs._syncToken), + _compressedSize(rhs._compressedSize), + _uncompressedSize(rhs._uncompressedSize), + _uncompressedCrc(rhs._uncompressedCrc), + _compression(rhs._compression), + _buf(Alloc::alloc(rhs.size())) +{ + memcpy(get(), rhs.get(), size()); +} + +void +Value::set(vespalib::DataBuffer &&buf, ssize_t len) { + set(std::move(buf), len, CompressionConfig()); +} + +void +Value::set(vespalib::DataBuffer &&buf, ssize_t len, const CompressionConfig &compression) { + //Underlying buffer must be identical to allow swap. + vespalib::DataBuffer compressed(buf.getData(), 0u); + vespalib::ConstBufferRef input(buf.getData(), len); + CompressionConfig::Type type = compress(compression, input, compressed, true); + _compressedSize = compressed.getDataLen(); + if (buf.getData() == compressed.getData()) { + // Uncompressed so we can just steal the underlying buffer. + buf.stealBuffer().swap(_buf); + } else { + compressed.stealBuffer().swap(_buf); + } + assert(((type == CompressionConfig::NONE) && + (len == ssize_t(_compressedSize))) || + ((type != CompressionConfig::NONE) && + (len > ssize_t(_compressedSize)))); + _compression = type; + _uncompressedSize = len; + _uncompressedCrc = XXH64(input.c_str(), input.size(), 0); +} + +Value::Result +Value::decompressed() const { + vespalib::DataBuffer uncompressed(_buf.get(), (size_t) 0); + decompress(getCompression(), getUncompressedSize(), vespalib::ConstBufferRef(*this, size()), uncompressed, true); + uint64_t crc = XXH64(uncompressed.getData(), uncompressed.getDataLen(), 0); + return std::make_pair<vespalib::DataBuffer, bool>(std::move(uncompressed), crc == _uncompressedCrc); +} + +} diff --git a/searchlib/src/vespa/searchlib/docstore/value.h b/searchlib/src/vespa/searchlib/docstore/value.h new file mode 100644 index 00000000000..426bcaf0e31 --- /dev/null +++ b/searchlib/src/vespa/searchlib/docstore/value.h @@ -0,0 +1,57 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/compressionconfig.h> +#include <vespa/vespalib/data/databuffer.h> + +namespace search::docstore { + +/** + * This class is used to represent a serialized and optionally compressed blob. + * Has efficient move/copy operators for use in a cache/stl containers. + * Also has crc checks of uncompressed data. + */ +class Value { +public: + using Alloc = vespalib::alloc::Alloc; + using CompressionConfig = vespalib::compression::CompressionConfig; + using Result = std::pair<vespalib::DataBuffer, bool>; + + Value(); + Value(uint64_t syncToken); + + Value(Value &&rhs) = default; + Value &operator=(Value &&rhs) = default; + + Value(const Value &rhs); + + uint64_t getSyncToken() const { return _syncToken; } + CompressionConfig::Type getCompression() const { return _compression; } + size_t getUncompressedSize() const { return _uncompressedSize; } + + /** + * Compress buffer into temporary buffer and copy temporary buffer to + * value along with compression config. + */ + void set(vespalib::DataBuffer &&buf, ssize_t len, const CompressionConfig &compression); + // Keep buffer uncompressed + void set(vespalib::DataBuffer &&buf, ssize_t len); + + Result decompressed() const; + + size_t size() const { return _compressedSize; } + bool empty() const { return size() == 0; } + operator const void *() const { return _buf.get(); } + const void *get() const { return _buf.get(); } + void *get() { return _buf.get(); } +private: + uint64_t _syncToken; + size_t _compressedSize; + size_t _uncompressedSize; + uint64_t _uncompressedCrc; + CompressionConfig::Type _compression; + Alloc _buf; +}; + +} |