diff options
Diffstat (limited to 'searchlib/src/vespa/searchlib/docstore/compacter.h')
-rw-r--r-- | searchlib/src/vespa/searchlib/docstore/compacter.h | 69 |
1 files changed, 50 insertions, 19 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.h b/searchlib/src/vespa/searchlib/docstore/compacter.h index 0d7633b1699..f44d7c341d2 100644 --- a/searchlib/src/vespa/searchlib/docstore/compacter.h +++ b/searchlib/src/vespa/searchlib/docstore/compacter.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once @@ -16,20 +16,52 @@ namespace search::docstore { class Compacter : public IWriteData { public: - Compacter(LogDataStore & ds) : _ds(ds) { } - void write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override; + explicit Compacter(LogDataStore & ds) : _ds(ds) { } + void write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override; void close() override { } private: LogDataStore & _ds; }; +class BucketIndexStore : public StoreByBucket::StoreIndex { +public: + BucketIndexStore(size_t maxSignificantBucketBits, uint32_t numPartitions) noexcept; + ~BucketIndexStore() override; + size_t toPartitionId(document::BucketId bucketId) const noexcept { + uint64_t sortableBucketId = bucketId.toKey(); + return (sortableBucketId >> _inSignificantBucketBits) % _numPartitions; + } + void store(const StoreByBucket::Index & index) override; + size_t getBucketCount() const noexcept; + size_t getLidCount() const noexcept { return _where.size(); } + void prepareForIterate(); + std::unique_ptr<StoreByBucket::IndexIterator> createIterator(uint32_t partitionId) const; +private: + using IndexVector = std::vector<StoreByBucket::Index, vespalib::allocator_large<StoreByBucket::Index>>; + class LidIterator : public StoreByBucket::IndexIterator { + public: + LidIterator(const BucketIndexStore & bc, size_t partitionId); + bool has_next() noexcept override; + StoreByBucket::Index next() noexcept override; + private: + const BucketIndexStore & _store; + size_t _partitionId; + IndexVector::const_iterator _current; + }; + size_t _inSignificantBucketBits; + IndexVector _where; + uint32_t _numPartitions; + bool _readyForIterate; +}; + /** * This will split the incoming data into buckets. * The buckets data will then be written out in bucket order. * The buckets will be ordered, and the objects inside the buckets will be further ordered. * All data are kept compressed to minimize memory usage. **/ -class BucketCompacter : public IWriteData, public StoreByBucket::IWrite +class BucketCompacter : public IWriteData, + public StoreByBucket::IWrite { using CompressionConfig = vespalib::compression::CompressionConfig; using Executor = vespalib::Executor; @@ -37,25 +69,24 @@ public: using FileId = FileChunk::FileId; BucketCompacter(size_t maxSignificantBucketBits, CompressionConfig compression, LogDataStore & ds, Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination); - void write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override ; - void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override; + ~BucketCompacter() override; + void write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override; + void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override; void close() override; private: + static constexpr size_t NUM_PARTITIONS = 256; using GenerationHandler = vespalib::GenerationHandler; + using Partitions = std::array<std::unique_ptr<StoreByBucket>, NUM_PARTITIONS>; FileId getDestinationId(const LockGuard & guard) const; - size_t _unSignificantBucketBits; - FileId _sourceFileId; - FileId _destinationFileId; - LogDataStore & _ds; - const IBucketizer & _bucketizer; - uint64_t _writeCount; - vespalib::duration _maxBucketGuardDuration; - vespalib::steady_time _lastSample; - std::mutex _lock; - vespalib::MemoryDataStore _backingMemory; - std::vector<StoreByBucket> _tmpStore; - GenerationHandler::Guard _lidGuard; - GenerationHandler::Guard _bucketizerGuard; + FileId _sourceFileId; + FileId _destinationFileId; + LogDataStore & _ds; + const IBucketizer & _bucketizer; + std::mutex _lock; + vespalib::MemoryDataStore _backingMemory; + BucketIndexStore _bucketIndexStore; + Partitions _tmpStore; + GenerationHandler::Guard _lidGuard; vespalib::hash_map<uint64_t, uint32_t> _stat; }; |