aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib/src/vespa/searchlib/docstore/compacter.h
diff options
context:
space:
mode:
Diffstat (limited to 'searchlib/src/vespa/searchlib/docstore/compacter.h')
-rw-r--r--searchlib/src/vespa/searchlib/docstore/compacter.h69
1 files changed, 50 insertions, 19 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.h b/searchlib/src/vespa/searchlib/docstore/compacter.h
index 0d7633b1699..f44d7c341d2 100644
--- a/searchlib/src/vespa/searchlib/docstore/compacter.h
+++ b/searchlib/src/vespa/searchlib/docstore/compacter.h
@@ -1,4 +1,4 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
@@ -16,20 +16,52 @@ namespace search::docstore {
class Compacter : public IWriteData
{
public:
- Compacter(LogDataStore & ds) : _ds(ds) { }
- void write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override;
+ explicit Compacter(LogDataStore & ds) : _ds(ds) { }
+ void write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override;
void close() override { }
private:
LogDataStore & _ds;
};
+class BucketIndexStore : public StoreByBucket::StoreIndex {
+public:
+ BucketIndexStore(size_t maxSignificantBucketBits, uint32_t numPartitions) noexcept;
+ ~BucketIndexStore() override;
+ size_t toPartitionId(document::BucketId bucketId) const noexcept {
+ uint64_t sortableBucketId = bucketId.toKey();
+ return (sortableBucketId >> _inSignificantBucketBits) % _numPartitions;
+ }
+ void store(const StoreByBucket::Index & index) override;
+ size_t getBucketCount() const noexcept;
+ size_t getLidCount() const noexcept { return _where.size(); }
+ void prepareForIterate();
+ std::unique_ptr<StoreByBucket::IndexIterator> createIterator(uint32_t partitionId) const;
+private:
+ using IndexVector = std::vector<StoreByBucket::Index, vespalib::allocator_large<StoreByBucket::Index>>;
+ class LidIterator : public StoreByBucket::IndexIterator {
+ public:
+ LidIterator(const BucketIndexStore & bc, size_t partitionId);
+ bool has_next() noexcept override;
+ StoreByBucket::Index next() noexcept override;
+ private:
+ const BucketIndexStore & _store;
+ size_t _partitionId;
+ IndexVector::const_iterator _current;
+ };
+ size_t _inSignificantBucketBits;
+ IndexVector _where;
+ uint32_t _numPartitions;
+ bool _readyForIterate;
+};
+
/**
* This will split the incoming data into buckets.
* The buckets data will then be written out in bucket order.
* The buckets will be ordered, and the objects inside the buckets will be further ordered.
* All data are kept compressed to minimize memory usage.
**/
-class BucketCompacter : public IWriteData, public StoreByBucket::IWrite
+class BucketCompacter : public IWriteData,
+ public StoreByBucket::IWrite
{
using CompressionConfig = vespalib::compression::CompressionConfig;
using Executor = vespalib::Executor;
@@ -37,25 +69,24 @@ public:
using FileId = FileChunk::FileId;
BucketCompacter(size_t maxSignificantBucketBits, CompressionConfig compression, LogDataStore & ds,
Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination);
- void write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override ;
- void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override;
+ ~BucketCompacter() override;
+ void write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override;
+ void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override;
void close() override;
private:
+ static constexpr size_t NUM_PARTITIONS = 256;
using GenerationHandler = vespalib::GenerationHandler;
+ using Partitions = std::array<std::unique_ptr<StoreByBucket>, NUM_PARTITIONS>;
FileId getDestinationId(const LockGuard & guard) const;
- size_t _unSignificantBucketBits;
- FileId _sourceFileId;
- FileId _destinationFileId;
- LogDataStore & _ds;
- const IBucketizer & _bucketizer;
- uint64_t _writeCount;
- vespalib::duration _maxBucketGuardDuration;
- vespalib::steady_time _lastSample;
- std::mutex _lock;
- vespalib::MemoryDataStore _backingMemory;
- std::vector<StoreByBucket> _tmpStore;
- GenerationHandler::Guard _lidGuard;
- GenerationHandler::Guard _bucketizerGuard;
+ FileId _sourceFileId;
+ FileId _destinationFileId;
+ LogDataStore & _ds;
+ const IBucketizer & _bucketizer;
+ std::mutex _lock;
+ vespalib::MemoryDataStore _backingMemory;
+ BucketIndexStore _bucketIndexStore;
+ Partitions _tmpStore;
+ GenerationHandler::Guard _lidGuard;
vespalib::hash_map<uint64_t, uint32_t> _stat;
};