Diffstat (limited to 'searchlib/src')
5 files changed, 239 insertions, 98 deletions
diff --git a/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp b/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp
index 2361ab90e2a..d6dde433365 100644
--- a/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp
+++ b/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp
@@ -4,7 +4,7 @@
 #include <vespa/document/bucket/bucketid.h>
 #include <vespa/document/base/documentid.h>
-#include <vespa/searchlib/docstore/storebybucket.h>
+#include <vespa/searchlib/docstore/compacter.h>
 #include <vespa/vespalib/stllike/asciistream.h>
 #include <vespa/vespalib/stllike/hash_set.h>
 #include <vespa/vespalib/util/threadstackexecutor.h>
@@ -25,8 +25,8 @@ createPayload(BucketId b) {
 }

 uint32_t userId(size_t i) { return i%100; }

-void
-add(StoreByBucket & sbb, size_t i) {
+BucketId
+createBucketId(size_t i) {
     constexpr size_t USED_BITS=5;
     vespalib::asciistream os;
     os << "id:a:b:n=" << userId(i) << ":" << i;
@@ -34,13 +34,19 @@ add(StoreByBucket & sbb, size_t i) {
     BucketId b = docId.getGlobalId().convertToBucketId();
     EXPECT_EQUAL(userId(i), docId.getGlobalId().getLocationSpecificBits());
     b.setUsedBits(USED_BITS);
+    return b;
+}
+void
+add(StoreByBucket & sbb, size_t i) {
+    BucketId b = createBucketId(i);
     vespalib::string s = createPayload(b);
     sbb.add(b, i%10, i, {s.c_str(), s.size()});
 }

 class VerifyBucketOrder : public StoreByBucket::IWrite {
 public:
-    VerifyBucketOrder() : _lastLid(0), _lastBucketId(0), _uniqueUser(), _uniqueBucket() { }
+    VerifyBucketOrder() : _lastLid(0), _lastBucketId(0), _uniqueUser(), _uniqueBucket(){ }
+    ~VerifyBucketOrder() override;
     void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, vespalib::ConstBufferRef data) override {
         (void) chunkId;
         EXPECT_LESS_EQUAL(_lastBucketId.toKey(), bucketId.toKey());
@@ -56,33 +62,90 @@ public:
         _lastBucketId = bucketId;
         EXPECT_EQUAL(0, memcmp(data.data(), createPayload(bucketId).c_str(), data.size()));
     }
-    ~VerifyBucketOrder() override;
+
 private:
     uint32_t _lastLid;
     BucketId _lastBucketId;
     vespalib::hash_set<uint32_t> _uniqueUser;
     vespalib::hash_set<uint64_t> _uniqueBucket;
+
 };

 VerifyBucketOrder::~VerifyBucketOrder() = default;

+struct StoreIndex : public StoreByBucket::StoreIndex {
+    ~StoreIndex() override;
+    void store(const StoreByBucket::Index &index) override {
+        _where.push_back(index);
+    }
+    std::vector<StoreByBucket::Index> _where;
+};
+StoreIndex::~StoreIndex() = default;
+
+struct Iterator : public StoreByBucket::IndexIterator {
+    explicit Iterator(const std::vector<StoreByBucket::Index> & where) : _where(where), _current(0) {}
+
+    bool has_next() noexcept override {
+        return _current < _where.size();
+    }
+
+    StoreByBucket::Index next() noexcept override {
+        return _where[_current++];
+    }
+
+    const std::vector<StoreByBucket::Index> & _where;
+    uint32_t _current;
+};
+
 TEST("require that StoreByBucket gives bucket by bucket and ordered within") {
     std::mutex backing_lock;
     vespalib::MemoryDataStore backing(vespalib::alloc::Alloc::alloc(256), &backing_lock);
     vespalib::ThreadStackExecutor executor(8);
-    StoreByBucket sbb(backing, executor, CompressionConfig::LZ4);
-    for (size_t i(1); i <=500; i++) {
+    StoreIndex storeIndex;
+    StoreByBucket sbb(storeIndex, backing, executor, CompressionConfig::LZ4);
+    for (size_t i(1); i <= 500u; i++) {
         add(sbb, i);
     }
-    for (size_t i(1000); i > 500; i--) {
+    for (size_t i(1000); i > 500u; i--) {
         add(sbb, i);
     }
     sbb.close();
-    EXPECT_EQUAL(32u, sbb.getBucketCount());
-    EXPECT_EQUAL(1000u, sbb.getLidCount());
+    std::sort(storeIndex._where.begin(), storeIndex._where.end());
+    EXPECT_EQUAL(1000u, storeIndex._where.size());
     VerifyBucketOrder vbo;
-    sbb.drain(vbo);
+    Iterator all(storeIndex._where);
+    sbb.drain(vbo, all);
+}
+
+constexpr uint32_t NUM_PARTS = 3;
+
+void
+verifyIter(BucketIndexStore &store, uint32_t partId, uint32_t expected_count) {
+    auto iter = store.createIterator(partId);
+    uint32_t count(0);
+    while (iter->has_next()) {
+        StoreByBucket::Index idx = iter->next();
+        EXPECT_EQUAL(store.toPartitionId(idx._bucketId), partId);
+        count++;
+    }
+    EXPECT_EQUAL(expected_count, count);
+}
+
+TEST("test that iterators cover the whole corpus and maps to correct partid") {
+
+    BucketIndexStore bucketIndexStore(32, NUM_PARTS);
+    for (size_t i(1); i <= 500u; i++) {
+        bucketIndexStore.store(StoreByBucket::Index(createBucketId(i), 1, 2, i));
+    }
+    bucketIndexStore.prepareForIterate();
+    EXPECT_EQUAL(500u, bucketIndexStore.getLidCount());
+    EXPECT_EQUAL(32u, bucketIndexStore.getBucketCount());
+    constexpr uint32_t COUNT_0 = 175, COUNT_1 = 155, COUNT_2 = 170;
+    verifyIter(bucketIndexStore, 0, COUNT_0);
+    verifyIter(bucketIndexStore, 1, COUNT_1);
+    verifyIter(bucketIndexStore, 2, COUNT_2);
+    EXPECT_EQUAL(500u, COUNT_0 + COUNT_1 + COUNT_2);
 }

 TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.cpp b/searchlib/src/vespa/searchlib/docstore/compacter.cpp
index 693b04bb96e..91faafc2a4e 100644
--- a/searchlib/src/vespa/searchlib/docstore/compacter.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/compacter.cpp
@@ -4,6 +4,7 @@
 #include "logdatastore.h"
 #include <vespa/vespalib/util/size_literals.h>
 #include <vespa/vespalib/util/array.hpp>
+#include <cassert>
 #include <vespa/log/log.h>

 LOG_SETUP(".searchlib.docstore.compacter");
@@ -13,7 +14,7 @@ namespace search::docstore {
 using vespalib::alloc::Alloc;

 namespace {
-    static constexpr size_t INITIAL_BACKING_BUFFER_SIZE = 64_Mi;
+    constexpr size_t INITIAL_BACKING_BUFFER_SIZE = 64_Mi;
 }

 void
@@ -23,24 +24,84 @@ Compacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef
     _ds.write(std::move(guard), fileId, lid, data);
 }

+BucketIndexStore::BucketIndexStore(size_t maxSignificantBucketBits, uint32_t numPartitions) noexcept
+    : _inSignificantBucketBits((maxSignificantBucketBits > 8) ? (maxSignificantBucketBits - 8) : 0),
+      _where(),
+      _numPartitions(numPartitions),
+      _readyForIterate(true)
+{}
+BucketIndexStore::~BucketIndexStore() = default;
+
+void
+BucketIndexStore::prepareForIterate() {
+    std::sort(_where.begin(), _where.end());
+    _readyForIterate = true;
+}
+
+void
+BucketIndexStore::store(const StoreByBucket::Index & index) {
+    _where.push_back(index);
+    _readyForIterate = false;
+}
+
+size_t
+BucketIndexStore::getBucketCount() const noexcept {
+    if (_where.empty()) return 0;
+
+    size_t count = 0;
+    document::BucketId prev = _where.front()._bucketId;
+    for (const auto & lid : _where) {
+        if (lid._bucketId != prev) {
+            count++;
+            prev = lid._bucketId;
+        }
+    }
+    return count + 1;
+}
+
+std::unique_ptr<StoreByBucket::IndexIterator>
+BucketIndexStore::createIterator(uint32_t partitionId) const {
+    assert(_readyForIterate);
+    return std::make_unique<LidIterator>(*this, partitionId);
+}
+
+BucketIndexStore::LidIterator::LidIterator(const BucketIndexStore & store, size_t partitionId)
+    : _store(store),
+      _partitionId(partitionId),
+      _current(_store._where.begin())
+{}
+
+bool
+BucketIndexStore::LidIterator::has_next() noexcept {
+    for (;(_current != _store._where.end()) && (_store.toPartitionId(_current->_bucketId) != _partitionId); _current++);
+    return (_current != _store._where.end()) && (_store.toPartitionId(_current->_bucketId) == _partitionId);
+}
+
+StoreByBucket::Index
+BucketIndexStore::LidIterator::next() noexcept {
+    return *_current++;
+}
+
 BucketCompacter::BucketCompacter(size_t maxSignificantBucketBits, CompressionConfig compression, LogDataStore & ds,
-                                 Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination) :
-    _unSignificantBucketBits((maxSignificantBucketBits > 8) ? (maxSignificantBucketBits - 8) : 0),
-    _sourceFileId(source),
-    _destinationFileId(destination),
-    _ds(ds),
-    _bucketizer(bucketizer),
-    _lock(),
-    _backingMemory(Alloc::alloc(INITIAL_BACKING_BUFFER_SIZE), &_lock),
-    _tmpStore(),
-    _lidGuard(ds.getLidReadGuard()),
-    _stat()
+                                 Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination)
+    : _sourceFileId(source),
+      _destinationFileId(destination),
+      _ds(ds),
+      _bucketizer(bucketizer),
+      _lock(),
+      _backingMemory(Alloc::alloc(INITIAL_BACKING_BUFFER_SIZE), &_lock),
+      _bucketIndexStore(maxSignificantBucketBits, NUM_PARTITIONS),
+      _tmpStore(),
+      _lidGuard(ds.getLidReadGuard()),
+      _stat()
 {
     for (auto & partition : _tmpStore) {
-        partition = std::make_unique<StoreByBucket>(_backingMemory, executor, compression);
+        partition = std::make_unique<StoreByBucket>(_bucketIndexStore, _backingMemory, executor, compression);
     }
 }

+BucketCompacter::~BucketCompacter() = default;
+
 FileChunk::FileId
 BucketCompacter::getDestinationId(const LockGuard & guard) const {
     return (_destinationFileId.isActive()) ? _ds.getActiveFileId(guard) : _destinationFileId;
@@ -51,28 +112,24 @@ BucketCompacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBuf
 {
     guard.unlock();
     BucketId bucketId = (data.size() > 0) ? _bucketizer.getBucketOf(_bucketizer.getGuard(), lid) : BucketId();
-    uint64_t sortableBucketId = bucketId.toKey();
-    _tmpStore[(sortableBucketId >> _unSignificantBucketBits) % _tmpStore.size()]->add(bucketId, chunkId, lid, data);
+    _tmpStore[_bucketIndexStore.toPartitionId(bucketId)]->add(bucketId, chunkId, lid, data);
 }

 void
 BucketCompacter::close()
 {
-    size_t lidCount1(0);
-    size_t bucketCount(0);
     size_t chunkCount(0);
     for (const auto & store : _tmpStore) {
         store->close();
-        lidCount1 += store->getLidCount();
-        bucketCount += store->getBucketCount();
         chunkCount += store->getChunkCount();
     }
+    _bucketIndexStore.prepareForIterate();
     LOG(info, "Have read %ld lids and placed them in %ld buckets. Temporary compressed in %ld chunks.",
-        lidCount1, bucketCount, chunkCount);
+        _bucketIndexStore.getLidCount(), _bucketIndexStore.getBucketCount(), chunkCount);

-    for (auto & store_ref : _tmpStore) {
-        auto store = std::move(store_ref);
-        store->drain(*this);
+    for (size_t partId(0); partId < _tmpStore.size(); partId++) {
+        auto partIterator = _bucketIndexStore.createIterator(partId);
+        _tmpStore[partId]->drain(*this, *partIterator);
     }
     // All partitions using _backingMemory should be destructed before clearing.
     _backingMemory.clear();
diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.h b/searchlib/src/vespa/searchlib/docstore/compacter.h
index 880340950e9..f44d7c341d2 100644
--- a/searchlib/src/vespa/searchlib/docstore/compacter.h
+++ b/searchlib/src/vespa/searchlib/docstore/compacter.h
@@ -16,21 +16,52 @@ namespace search::docstore {

 class Compacter : public IWriteData {
 public:
-    Compacter(LogDataStore & ds) : _ds(ds) { }
+    explicit Compacter(LogDataStore & ds) : _ds(ds) { }
     void write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override;
     void close() override { }
-
 private:
     LogDataStore & _ds;
 };

+class BucketIndexStore : public StoreByBucket::StoreIndex {
+public:
+    BucketIndexStore(size_t maxSignificantBucketBits, uint32_t numPartitions) noexcept;
+    ~BucketIndexStore() override;
+    size_t toPartitionId(document::BucketId bucketId) const noexcept {
+        uint64_t sortableBucketId = bucketId.toKey();
+        return (sortableBucketId >> _inSignificantBucketBits) % _numPartitions;
+    }
+    void store(const StoreByBucket::Index & index) override;
+    size_t getBucketCount() const noexcept;
+    size_t getLidCount() const noexcept { return _where.size(); }
+    void prepareForIterate();
+    std::unique_ptr<StoreByBucket::IndexIterator> createIterator(uint32_t partitionId) const;
+private:
+    using IndexVector = std::vector<StoreByBucket::Index, vespalib::allocator_large<StoreByBucket::Index>>;
+    class LidIterator : public StoreByBucket::IndexIterator {
+    public:
+        LidIterator(const BucketIndexStore & bc, size_t partitionId);
+        bool has_next() noexcept override;
+        StoreByBucket::Index next() noexcept override;
+    private:
+        const BucketIndexStore & _store;
+        size_t _partitionId;
+        IndexVector::const_iterator _current;
+    };
+    size_t _inSignificantBucketBits;
+    IndexVector _where;
+    uint32_t _numPartitions;
+    bool _readyForIterate;
+};
+
 /**
  * This will split the incoming data into buckets.
  * The buckets data will then be written out in bucket order.
  * The buckets will be ordered, and the objects inside the buckets will be further ordered.
  * All data are kept compressed to minimize memory usage.
 **/
-class BucketCompacter : public IWriteData, public StoreByBucket::IWrite
+class BucketCompacter : public IWriteData,
+                        public StoreByBucket::IWrite
 {
     using CompressionConfig = vespalib::compression::CompressionConfig;
     using Executor = vespalib::Executor;
@@ -38,6 +69,7 @@ public:
     using FileId = FileChunk::FileId;
     BucketCompacter(size_t maxSignificantBucketBits, CompressionConfig compression, LogDataStore & ds,
                     Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination);
+    ~BucketCompacter() override;
     void write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override;
     void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override;
     void close() override;
@@ -46,15 +78,15 @@ private:
     using GenerationHandler = vespalib::GenerationHandler;
    using Partitions = std::array<std::unique_ptr<StoreByBucket>, NUM_PARTITIONS>;
     FileId getDestinationId(const LockGuard & guard) const;
-    size_t                          _unSignificantBucketBits;
-    FileId                          _sourceFileId;
-    FileId                          _destinationFileId;
-    LogDataStore                  & _ds;
-    const IBucketizer             & _bucketizer;
-    std::mutex                      _lock;
-    vespalib::MemoryDataStore       _backingMemory;
-    Partitions                      _tmpStore;
-    GenerationHandler::Guard        _lidGuard;
+    FileId                    _sourceFileId;
+    FileId                    _destinationFileId;
+    LogDataStore            & _ds;
+    const IBucketizer       & _bucketizer;
+    std::mutex                _lock;
+    vespalib::MemoryDataStore _backingMemory;
+    BucketIndexStore          _bucketIndexStore;
+    Partitions                _tmpStore;
+    GenerationHandler::Guard  _lidGuard;
     vespalib::hash_map<uint64_t, uint32_t> _stat;
 };
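The BucketIndexStore declared above takes over the bucket-to-partition mapping that BucketCompacter previously derived from _unSignificantBucketBits. Its toPartitionId() reduces to the following standalone sketch; the free function partitionOf and its parameter names are illustrative only and not part of the patch:

#include <cstdint>
#include <cstddef>

// Sketch of BucketIndexStore::toPartitionId(): shift away the low-order part of the
// sortable bucket key, then fold what remains onto the configured partition count.
// bucketKey stands in for document::BucketId::toKey().
size_t partitionOf(uint64_t bucketKey, size_t maxSignificantBucketBits, uint32_t numPartitions) {
    size_t insignificantBits = (maxSignificantBucketBits > 8) ? (maxSignificantBucketBits - 8) : 0;
    return (bucketKey >> insignificantBits) % numPartitions;
}

With maxSignificantBucketBits = 32 and three partitions, as in the new unit test, two bucket keys that differ only in their low 24 bits land in the same partition.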
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
index 9d77698ff6c..f26afc595f3 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
@@ -13,10 +13,10 @@ using document::BucketId;
 using vespalib::CpuUsage;
 using vespalib::makeLambdaTask;

-StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept
+StoreByBucket::StoreByBucket(StoreIndex & storeIndex, MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept
     : _chunkSerial(0),
       _current(),
-      _where(),
+      _storeIndex(storeIndex),
       _backingMemory(backingMemory),
       _executor(executor),
       _lock(),
@@ -43,7 +43,7 @@ StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBuffe
         _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT));
     }
     _current->append(lid, data);
-    _where.emplace_back(bucketId, _current->getId(), chunkId, lid);
+    _storeIndex.store(Index(bucketId, _current->getId(), chunkId, lid));
 }

 Chunk::UP
@@ -53,7 +53,7 @@ StoreByBucket::createChunk()
 }

 size_t
-StoreByBucket::getChunkCount() const {
+StoreByBucket::getChunkCount() const noexcept {
     std::lock_guard guard(_lock);
     return _chunks.size();
 }
@@ -94,26 +94,10 @@ StoreByBucket::close() {
     });
     _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT));
     waitAllProcessed();
-    std::sort(_where.begin(), _where.end());
-}
-
-size_t
-StoreByBucket::getBucketCount() const {
-    if (_where.empty()) return 0;
-
-    size_t count = 0;
-    BucketId prev = _where.front()._bucketId;
-    for (const auto & lid : _where) {
-        if (lid._bucketId != prev) {
-            count++;
-            prev = lid._bucketId;
-        }
-    }
-    return count + 1;
 }

 void
-StoreByBucket::drain(IWrite & drainer)
+StoreByBucket::drain(IWrite & drainer, IndexIterator & indexIterator)
 {
     std::vector<Chunk::UP> chunks;
     chunks.resize(_chunks.size());
@@ -122,8 +106,9 @@ StoreByBucket::drain(IWrite & drainer)
         chunks[it.first] = std::make_unique<Chunk>(it.first, buf.data(), buf.size());
     }
     _chunks.clear();
-    for (auto & idx : _where) {
-        vespalib::ConstBufferRef data(chunks[idx._id]->getLid(idx._lid));
+    while (indexIterator.has_next()) {
+        Index idx = indexIterator.next();
+        vespalib::ConstBufferRef data(chunks[idx._localChunkId]->getLid(idx._lid));
         drainer.write(idx._bucketId, idx._chunkId, idx._lid, data);
     }
 }
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.h b/searchlib/src/vespa/searchlib/docstore/storebybucket.h
index 4f51a7970e7..ea1c6e766e0 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.h
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.h
@@ -23,51 +23,55 @@ class StoreByBucket
     using ConstBufferRef = vespalib::ConstBufferRef;
     using CompressionConfig = vespalib::compression::CompressionConfig;
 public:
-    StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept;
+    struct Index {
+        using BucketId=document::BucketId;
+        Index(BucketId bucketId, uint32_t localChunkId, uint32_t chunkId, uint32_t entry) noexcept :
+            _bucketId(bucketId), _localChunkId(localChunkId), _chunkId(chunkId), _lid(entry)
+        { }
+        bool operator < (const Index & b) const noexcept {
+            return BucketId::bucketIdToKey(_bucketId.getRawId()) < BucketId::bucketIdToKey(b._bucketId.getRawId());
+        }
+        BucketId _bucketId;
+        uint32_t _localChunkId;
+        uint32_t _chunkId;
+        uint32_t _lid;
+    };
+    using IndexVector = std::vector<Index, vespalib::allocator_large<Index>>;
+    struct IWrite {
+        using BucketId=document::BucketId;
+        virtual ~IWrite() = default;
+        virtual void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) = 0;
+    };
+    struct IndexIterator {
+        virtual ~IndexIterator() = default;
+        virtual bool has_next() noexcept = 0;
+        virtual Index next() noexcept = 0;
+    };
+    struct StoreIndex {
+        virtual ~StoreIndex() = default;
+        virtual void store(const Index & index) = 0;
+    };
+    StoreByBucket(StoreIndex & storeIndex, MemoryDataStore & backingMemory,
+                  Executor & executor, CompressionConfig compression) noexcept;
     //TODO Putting the below move constructor into cpp file fails for some unknown reason. Needs to be resolved.
     StoreByBucket(StoreByBucket &&) noexcept = delete;
     StoreByBucket(const StoreByBucket &) = delete;
     StoreByBucket & operator=(StoreByBucket &&) noexcept = delete;
     StoreByBucket & operator = (const StoreByBucket &) = delete;
     ~StoreByBucket();
-    class IWrite {
-    public:
-        using BucketId=document::BucketId;
-        virtual ~IWrite() = default;
-        virtual void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) = 0;
-    };
     void add(document::BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data);
     void close();
     /// close() must have been called prior to calling getBucketCount() or drain()
-    void drain(IWrite & drain);
-    size_t getBucketCount() const;
-
-    size_t getChunkCount() const;
-    size_t getLidCount() const {
-        return _where.size();
-    }
+    void drain(IWrite & drain, IndexIterator & iterator);
+    size_t getChunkCount() const noexcept;
 private:
     void incChunksPosted();
     void waitAllProcessed();
     Chunk::UP createChunk();
     void closeChunk(Chunk::UP chunk);
-    struct Index {
-        using BucketId=document::BucketId;
-        Index(BucketId bucketId, uint32_t id, uint32_t chunkId, uint32_t entry) noexcept :
-            _bucketId(bucketId), _id(id), _chunkId(chunkId), _lid(entry)
-        { }
-        bool operator < (const Index & b) const noexcept {
-            return BucketId::bucketIdToKey(_bucketId.getRawId()) < BucketId::bucketIdToKey(b._bucketId.getRawId());
-        }
-        BucketId _bucketId;
-        uint32_t _id;
-        uint32_t _chunkId;
-        uint32_t _lid;
-    };
-    using IndexVector = std::vector<Index, vespalib::allocator_large<Index>>;
     uint64_t _chunkSerial;
     Chunk::UP _current;
-    IndexVector _where;
+    StoreIndex & _storeIndex;
     MemoryDataStore & _backingMemory;
     Executor & _executor;
     mutable std::mutex _lock;
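Taken together, the change moves the per-document (bucket, chunk, lid) bookkeeping out of StoreByBucket and into a caller-supplied StoreIndex, and drain() now replays entries through an externally provided IndexIterator. A minimal sketch of the resulting call sequence, modelled on the unit test and on BucketCompacter::close() above; the function name and single-partition setup are illustrative, and construction of the backing memory, executor and writer follows the test:

#include <vespa/searchlib/docstore/compacter.h>

using search::docstore::BucketIndexStore;
using search::docstore::StoreByBucket;
using vespalib::compression::CompressionConfig;

// One partition is enough to show the flow; BucketCompacter wires NUM_PARTITIONS
// StoreByBucket instances to a single shared BucketIndexStore.
void drainInBucketOrder(vespalib::MemoryDataStore & backing,
                        vespalib::Executor & executor,
                        StoreByBucket::IWrite & writer)
{
    BucketIndexStore indexStore(32, 1);
    StoreByBucket store(indexStore, backing, executor, CompressionConfig::LZ4);

    // ... store.add(bucketId, chunkId, lid, data) for each document ...

    store.close();                  // compress and close any pending chunk
    indexStore.prepareForIterate(); // sort the collected (bucket, chunk, lid) entries

    // Replay partition 0 in bucket order through the writer, as
    // BucketCompacter::close() does for each of its partitions.
    auto iterator = indexStore.createIterator(0);
    store.drain(writer, *iterator);
}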