diff options
5 files changed, 149 insertions, 78 deletions
diff --git a/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp b/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp index 053a2806b5d..50e99b15fb2 100644 --- a/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp +++ b/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp @@ -40,7 +40,8 @@ add(StoreByBucket & sbb, size_t i) { class VerifyBucketOrder : public StoreByBucket::IWrite { public: - VerifyBucketOrder() : _lastLid(0), _lastBucketId(0), _uniqueUser(), _uniqueBucket() { } + VerifyBucketOrder() : _lastLid(0), _lastBucketId(0), _uniqueUser(), _uniqueBucket(){ } + ~VerifyBucketOrder() override; void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, vespalib::ConstBufferRef data) override { (void) chunkId; EXPECT_LESS_EQUAL(_lastBucketId.toKey(), bucketId.toKey()); @@ -56,22 +57,48 @@ public: _lastBucketId = bucketId; EXPECT_EQUAL(0, memcmp(data.data(), createPayload(bucketId).c_str(), data.size())); } - ~VerifyBucketOrder() override; + private: uint32_t _lastLid; BucketId _lastBucketId; vespalib::hash_set<uint32_t> _uniqueUser; vespalib::hash_set<uint64_t> _uniqueBucket; + }; VerifyBucketOrder::~VerifyBucketOrder() = default; +struct StoreIndex : public StoreByBucket::StoreIndex { + ~StoreIndex() override; + void store(const StoreByBucket::Index &index) override { + _where.push_back(index); + } + std::vector<StoreByBucket::Index> _where; +}; +StoreIndex::~StoreIndex() = default; + +struct Iterator : public StoreByBucket::IndexIterator { + Iterator(const std::vector<StoreByBucket::Index> & where) : _where(where), _current(0) {} + + bool has_next() noexcept override { + return _current < _where.size(); + } + + StoreByBucket::Index next() noexcept override { + return _where[_current++]; + } + + const std::vector<StoreByBucket::Index> & _where; + uint32_t _current; +}; + TEST("require that StoreByBucket gives bucket by bucket and ordered within") { std::mutex backing_lock; vespalib::MemoryDataStore backing(vespalib::alloc::Alloc::alloc(256), &backing_lock); vespalib::ThreadStackExecutor executor(8); - StoreByBucket sbb(backing, executor, CompressionConfig::LZ4); + StoreIndex storeIndex; + StoreByBucket sbb(storeIndex, backing, executor, CompressionConfig::LZ4); for (size_t i(1); i <=500; i++) { add(sbb, i); } @@ -79,10 +106,12 @@ TEST("require that StoreByBucket gives bucket by bucket and ordered within") add(sbb, i); } sbb.close(); - EXPECT_EQUAL(32u, sbb.getBucketCount()); - EXPECT_EQUAL(1000u, sbb.getLidCount()); + std::sort(storeIndex._where.begin(), storeIndex._where.end()); + //EXPECT_EQUAL(32u, sbb.getBucketCount()); + EXPECT_EQUAL(1000u, storeIndex._where.size()); VerifyBucketOrder vbo; - sbb.drain(vbo); + Iterator all(storeIndex._where); + sbb.drain(vbo, all); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.cpp b/searchlib/src/vespa/searchlib/docstore/compacter.cpp index 803b916b67d..6caafe42040 100644 --- a/searchlib/src/vespa/searchlib/docstore/compacter.cpp +++ b/searchlib/src/vespa/searchlib/docstore/compacter.cpp @@ -37,7 +37,7 @@ BucketCompacter::BucketCompacter(size_t maxSignificantBucketBits, CompressionCon _stat() { for (auto & partition : _tmpStore) { - partition = std::make_unique<StoreByBucket>(_backingMemory, executor, compression); + partition = std::make_unique<StoreByBucket>(*this, _backingMemory, executor, compression); } } @@ -51,28 +51,61 @@ BucketCompacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBuf { guard.unlock(); BucketId bucketId = (data.size() > 0) ? _bucketizer.getBucketOf(_bucketizer.getGuard(), lid) : BucketId(); - uint64_t sortableBucketId = bucketId.toKey(); - _tmpStore[(sortableBucketId >> _unSignificantBucketBits) % _tmpStore.size()]->add(bucketId, chunkId, lid, data); + _tmpStore[toPartitionId(bucketId)]->add(bucketId, chunkId, lid, data); +} + +void +BucketCompacter::store(const StoreByBucket::Index & index) { + _where.push_back(index); +} + +size_t +BucketCompacter::getBucketCount() const noexcept { + if (_where.empty()) return 0; + + size_t count = 0; + BucketId prev = _where.front()._bucketId; + for (const auto & lid : _where) { + if (lid._bucketId != prev) { + count++; + prev = lid._bucketId; + } + } + return count + 1; +} + +BucketCompacter::LidIterator::LidIterator(const BucketCompacter & bc, size_t partitionId) + : _bc(bc), + _partitionId(partitionId), + _current(_bc._where.begin()) +{} + +bool +BucketCompacter::LidIterator::has_next() noexcept { + for (;(_current != _bc._where.end()) && (_bc.toPartitionId(_current->_bucketId) != _partitionId); _current++); + return (_current != _bc._where.end()) && (_bc.toPartitionId(_current->_bucketId) == _partitionId); +} + +StoreByBucket::Index +BucketCompacter::LidIterator::next() noexcept { + return *_current++; } void BucketCompacter::close() { - size_t lidCount1(0); - size_t bucketCount(0); size_t chunkCount(0); for (const auto & store : _tmpStore) { store->close(); - lidCount1 += store->getLidCount(); - bucketCount += store->getBucketCount(); chunkCount += store->getChunkCount(); } + std::sort(_where.begin(), _where.end()); LOG(info, "Have read %ld lids and placed them in %ld buckets. Temporary compressed in %ld chunks.", - lidCount1, bucketCount, chunkCount); + _where.size(), getBucketCount(), chunkCount); - for (auto & store_ref : _tmpStore) { - auto store = std::move(store_ref); - store->drain(*this); + for (size_t partId(0); partId < _tmpStore.size(); partId++) { + LidIterator partIterator(*this, partId); + _tmpStore[partId]->drain(*this, partIterator); } // All partitions using _backingMemory should be destructed before clearing. _backingMemory.clear(); diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.h b/searchlib/src/vespa/searchlib/docstore/compacter.h index 354ca24ede9..1eb3fda78a6 100644 --- a/searchlib/src/vespa/searchlib/docstore/compacter.h +++ b/searchlib/src/vespa/searchlib/docstore/compacter.h @@ -30,7 +30,9 @@ private: * The buckets will be ordered, and the objects inside the buckets will be further ordered. * All data are kept compressed to minimize memory usage. **/ -class BucketCompacter : public IWriteData, public StoreByBucket::IWrite +class BucketCompacter : public IWriteData, + public StoreByBucket::IWrite, + public StoreByBucket::StoreIndex { using CompressionConfig = vespalib::compression::CompressionConfig; using Executor = vespalib::Executor; @@ -40,21 +42,39 @@ public: Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination); void write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override; void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override; + void store(const StoreByBucket::Index & index) override; + size_t toPartitionId(BucketId bucketId) const noexcept { + uint64_t sortableBucketId = bucketId.toKey(); + return (sortableBucketId >> _unSignificantBucketBits) % _tmpStore.size(); + } void close() override; private: + size_t getBucketCount() const noexcept; static constexpr size_t NUM_PARTITIONS = 256; using GenerationHandler = vespalib::GenerationHandler; using Partitions = std::array<std::unique_ptr<StoreByBucket>, NUM_PARTITIONS>; + using IndexVector = std::vector<StoreByBucket::Index, vespalib::allocator_large<StoreByBucket::Index>>; + class LidIterator : public StoreByBucket::IndexIterator { + public: + LidIterator(const BucketCompacter & bc, size_t partitionId); + bool has_next() noexcept override; + StoreByBucket::Index next() noexcept override; + private: + const BucketCompacter & _bc; + size_t _partitionId; + IndexVector::const_iterator _current; + }; FileId getDestinationId(const LockGuard & guard) const; - size_t _unSignificantBucketBits; - FileId _sourceFileId; - FileId _destinationFileId; - LogDataStore & _ds; - const IBucketizer & _bucketizer; - std::mutex _lock; - vespalib::MemoryDataStore _backingMemory; - Partitions _tmpStore; - GenerationHandler::Guard _lidGuard; + size_t _unSignificantBucketBits; + FileId _sourceFileId; + FileId _destinationFileId; + LogDataStore & _ds; + const IBucketizer & _bucketizer; + std::mutex _lock; + vespalib::MemoryDataStore _backingMemory; + IndexVector _where; + Partitions _tmpStore; + GenerationHandler::Guard _lidGuard; vespalib::hash_map<uint64_t, uint32_t> _stat; }; diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp index dbcbaafbbb7..34280ffd16e 100644 --- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp +++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp @@ -13,10 +13,10 @@ using document::BucketId; using vespalib::CpuUsage; using vespalib::makeLambdaTask; -StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept +StoreByBucket::StoreByBucket(StoreIndex & storeIndex, MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept : _chunkSerial(0), _current(), - _where(), + _storeIndex(storeIndex), _backingMemory(backingMemory), _executor(executor), _lock(), @@ -43,7 +43,7 @@ StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBuffe _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT)); } _current->append(lid, data); - _where.emplace_back(bucketId, _current->getId(), chunkId, lid); + _storeIndex.store(Index(bucketId, _current->getId(), chunkId, lid)); } Chunk::UP @@ -53,7 +53,7 @@ StoreByBucket::createChunk() } size_t -StoreByBucket::getChunkCount() const { +StoreByBucket::getChunkCount() const noexcept { std::lock_guard guard(_lock); return _chunks.size(); } @@ -94,26 +94,10 @@ StoreByBucket::close() { }); _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT)); waitAllProcessed(); - std::sort(_where.begin(), _where.end()); -} - -size_t -StoreByBucket::getBucketCount() const { - if (_where.empty()) return 0; - - size_t count = 0; - BucketId prev = _where.front()._bucketId; - for (const auto & lid : _where) { - if (lid._bucketId != prev) { - count++; - prev = lid._bucketId; - } - } - return count + 1; } void -StoreByBucket::drain(IWrite & drainer) +StoreByBucket::drain(IWrite & drainer, IndexIterator & indexIterator) { std::vector<Chunk::UP> chunks; chunks.resize(_chunks.size()); @@ -122,7 +106,8 @@ StoreByBucket::drain(IWrite & drainer) chunks[it.first] = std::make_unique<Chunk>(it.first, buf.data(), buf.size()); } _chunks.clear(); - for (auto & idx : _where) { + while (indexIterator.has_next()) { + Index idx = indexIterator.next(); vespalib::ConstBufferRef data(chunks[idx._id]->getLid(idx._lid)); drainer.write(idx._bucketId, idx._chunkId, idx._lid, data); } diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.h b/searchlib/src/vespa/searchlib/docstore/storebybucket.h index 6e52695d529..7507ba0fca6 100644 --- a/searchlib/src/vespa/searchlib/docstore/storebybucket.h +++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.h @@ -23,51 +23,55 @@ class StoreByBucket using ConstBufferRef = vespalib::ConstBufferRef; using CompressionConfig = vespalib::compression::CompressionConfig; public: - StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept; + struct Index { + using BucketId=document::BucketId; + Index(BucketId bucketId, uint32_t id, uint32_t chunkId, uint32_t entry) noexcept : + _bucketId(bucketId), _id(id), _chunkId(chunkId), _lid(entry) + { } + bool operator < (const Index & b) const noexcept { + return BucketId::bucketIdToKey(_bucketId.getRawId()) < BucketId::bucketIdToKey(b._bucketId.getRawId()); + } + BucketId _bucketId; + uint32_t _id; + uint32_t _chunkId; + uint32_t _lid; + }; + using IndexVector = std::vector<Index, vespalib::allocator_large<Index>>; + struct IWrite { + using BucketId=document::BucketId; + virtual ~IWrite() = default; + virtual void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) = 0; + }; + struct IndexIterator { + virtual ~IndexIterator() = default; + virtual bool has_next() noexcept = 0; + virtual Index next() noexcept = 0; + }; + struct StoreIndex { + virtual ~StoreIndex() = default; + virtual void store(const Index & index) = 0; + }; + StoreByBucket(StoreIndex & storeIndex, MemoryDataStore & backingMemory, + Executor & executor, CompressionConfig compression) noexcept; //TODO Putting the below move constructor into cpp file fails for some unknown reason. Needs to be resolved. StoreByBucket(StoreByBucket &&) noexcept = delete; StoreByBucket(const StoreByBucket &) = delete; StoreByBucket & operator=(StoreByBucket &&) noexcept = delete; StoreByBucket & operator = (const StoreByBucket &) = delete; ~StoreByBucket(); - class IWrite { - public: - using BucketId=document::BucketId; - virtual ~IWrite() = default; - virtual void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) = 0; - }; void add(document::BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data); void close(); /// close() must have been called prior to calling getBucketCount() or drain() - void drain(IWrite & drain); - size_t getBucketCount() const; - - size_t getChunkCount() const; - size_t getLidCount() const { - return _where.size(); - } + void drain(IWrite & drain, IndexIterator & iterator); + size_t getChunkCount() const noexcept; private: void incChunksPosted(); void waitAllProcessed(); Chunk::UP createChunk(); void closeChunk(Chunk::UP chunk); - struct Index { - using BucketId=document::BucketId; - Index(BucketId bucketId, uint32_t id, uint32_t chunkId, uint32_t entry) noexcept : - _bucketId(bucketId), _id(id), _chunkId(chunkId), _lid(entry) - { } - bool operator < (const Index & b) const noexcept { - return BucketId::bucketIdToKey(_bucketId.getRawId()) < BucketId::bucketIdToKey(b._bucketId.getRawId()); - } - BucketId _bucketId; - uint32_t _id; - uint32_t _chunkId; - uint32_t _lid; - }; - using IndexVector = std::vector<Index, vespalib::allocator_large<Index>>; uint64_t _chunkSerial; Chunk::UP _current; - IndexVector _where; + StoreIndex & _storeIndex; MemoryDataStore & _backingMemory; Executor & _executor; mutable std::mutex _lock; |