author    Henning Baldersheim <balder@yahoo-inc.com>  2023-10-06 07:54:05 +0000
committer Henning Baldersheim <balder@yahoo-inc.com>  2023-10-06 11:16:47 +0000
commit    62cf056fb15b5f77686a297f362dba2b21648074 (patch)
tree      b1466bbd3642e41142698540ca8da055aaaa5b96 /searchlib
parent    57f34d015e3d9fd85a593455e4cd61919ea6bee0 (diff)
- Use a single store for mapping a lid to its data, which are split into partitions and chunks.
- This enables memory to be released after compaction is done.
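
A minimal standalone sketch of the idea, with simplified stand-in types rather than the actual Vespa classes: every partitioned StoreByBucket pushes its Index entries into one shared StoreIndex, and the owner sorts that single vector once after close().

// Hedged sketch only; Index/StoreIndex/SharedIndex are simplified stand-ins,
// not the real searchlib types.
#include <algorithm>
#include <cstdint>
#include <vector>

struct Index {                                   // simplified StoreByBucket::Index
    uint64_t bucketKey;                          // sortable bucket key
    uint32_t chunkId;
    uint32_t lid;
    bool operator<(const Index& rhs) const noexcept { return bucketKey < rhs.bucketKey; }
};

struct StoreIndex {                              // simplified StoreByBucket::StoreIndex
    virtual ~StoreIndex() = default;
    virtual void store(const Index& index) = 0;
};

struct SharedIndex : StoreIndex {                // one store shared by all partitions
    std::vector<Index> where;
    void store(const Index& index) override { where.push_back(index); }
};

int main() {
    SharedIndex index;
    index.store({0x20, 0, 2});                   // partitions append in arrival order...
    index.store({0x10, 1, 1});
    std::sort(index.where.begin(), index.where.end());  // ...and the owner sorts once after close()
    // drain partition by partition here, then release the temporary memory
    return 0;
}

The updated test in store_by_bucket_test.cpp wires the real classes up the same way: a StoreIndex implementation collects the entries, the vector is sorted, and drain() is driven by an IndexIterator over it.
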
Diffstat (limited to 'searchlib')
-rw-r--r--  searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp  |  41
-rw-r--r--  searchlib/src/vespa/searchlib/docstore/compacter.cpp                   |  55
-rw-r--r--  searchlib/src/vespa/searchlib/docstore/compacter.h                     |  40
-rw-r--r--  searchlib/src/vespa/searchlib/docstore/storebybucket.cpp               |  29
-rw-r--r--  searchlib/src/vespa/searchlib/docstore/storebybucket.h                 |  62
5 files changed, 149 insertions, 78 deletions
diff --git a/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp b/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp
index 053a2806b5d..50e99b15fb2 100644
--- a/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp
+++ b/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp
@@ -40,7 +40,8 @@ add(StoreByBucket & sbb, size_t i) {
class VerifyBucketOrder : public StoreByBucket::IWrite {
public:
- VerifyBucketOrder() : _lastLid(0), _lastBucketId(0), _uniqueUser(), _uniqueBucket() { }
+ VerifyBucketOrder() : _lastLid(0), _lastBucketId(0), _uniqueUser(), _uniqueBucket(){ }
+ ~VerifyBucketOrder() override;
void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, vespalib::ConstBufferRef data) override {
(void) chunkId;
EXPECT_LESS_EQUAL(_lastBucketId.toKey(), bucketId.toKey());
@@ -56,22 +57,48 @@ public:
_lastBucketId = bucketId;
EXPECT_EQUAL(0, memcmp(data.data(), createPayload(bucketId).c_str(), data.size()));
}
- ~VerifyBucketOrder() override;
+
private:
uint32_t _lastLid;
BucketId _lastBucketId;
vespalib::hash_set<uint32_t> _uniqueUser;
vespalib::hash_set<uint64_t> _uniqueBucket;
+
};
VerifyBucketOrder::~VerifyBucketOrder() = default;
+struct StoreIndex : public StoreByBucket::StoreIndex {
+ ~StoreIndex() override;
+ void store(const StoreByBucket::Index &index) override {
+ _where.push_back(index);
+ }
+ std::vector<StoreByBucket::Index> _where;
+};
+StoreIndex::~StoreIndex() = default;
+
+struct Iterator : public StoreByBucket::IndexIterator {
+ Iterator(const std::vector<StoreByBucket::Index> & where) : _where(where), _current(0) {}
+
+ bool has_next() noexcept override {
+ return _current < _where.size();
+ }
+
+ StoreByBucket::Index next() noexcept override {
+ return _where[_current++];
+ }
+
+ const std::vector<StoreByBucket::Index> & _where;
+ uint32_t _current;
+};
+
TEST("require that StoreByBucket gives bucket by bucket and ordered within")
{
std::mutex backing_lock;
vespalib::MemoryDataStore backing(vespalib::alloc::Alloc::alloc(256), &backing_lock);
vespalib::ThreadStackExecutor executor(8);
- StoreByBucket sbb(backing, executor, CompressionConfig::LZ4);
+ StoreIndex storeIndex;
+ StoreByBucket sbb(storeIndex, backing, executor, CompressionConfig::LZ4);
for (size_t i(1); i <=500; i++) {
add(sbb, i);
}
@@ -79,10 +106,12 @@ TEST("require that StoreByBucket gives bucket by bucket and ordered within")
add(sbb, i);
}
sbb.close();
- EXPECT_EQUAL(32u, sbb.getBucketCount());
- EXPECT_EQUAL(1000u, sbb.getLidCount());
+ std::sort(storeIndex._where.begin(), storeIndex._where.end());
+ //EXPECT_EQUAL(32u, sbb.getBucketCount());
+ EXPECT_EQUAL(1000u, storeIndex._where.size());
VerifyBucketOrder vbo;
- sbb.drain(vbo);
+ Iterator all(storeIndex._where);
+ sbb.drain(vbo, all);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.cpp b/searchlib/src/vespa/searchlib/docstore/compacter.cpp
index 803b916b67d..6caafe42040 100644
--- a/searchlib/src/vespa/searchlib/docstore/compacter.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/compacter.cpp
@@ -37,7 +37,7 @@ BucketCompacter::BucketCompacter(size_t maxSignificantBucketBits, CompressionCon
_stat()
{
for (auto & partition : _tmpStore) {
- partition = std::make_unique<StoreByBucket>(_backingMemory, executor, compression);
+ partition = std::make_unique<StoreByBucket>(*this, _backingMemory, executor, compression);
}
}
@@ -51,28 +51,61 @@ BucketCompacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBuf
{
guard.unlock();
BucketId bucketId = (data.size() > 0) ? _bucketizer.getBucketOf(_bucketizer.getGuard(), lid) : BucketId();
- uint64_t sortableBucketId = bucketId.toKey();
- _tmpStore[(sortableBucketId >> _unSignificantBucketBits) % _tmpStore.size()]->add(bucketId, chunkId, lid, data);
+ _tmpStore[toPartitionId(bucketId)]->add(bucketId, chunkId, lid, data);
+}
+
+void
+BucketCompacter::store(const StoreByBucket::Index & index) {
+ _where.push_back(index);
+}
+
+size_t
+BucketCompacter::getBucketCount() const noexcept {
+ if (_where.empty()) return 0;
+
+ size_t count = 0;
+ BucketId prev = _where.front()._bucketId;
+ for (const auto & lid : _where) {
+ if (lid._bucketId != prev) {
+ count++;
+ prev = lid._bucketId;
+ }
+ }
+ return count + 1;
+}
+
+BucketCompacter::LidIterator::LidIterator(const BucketCompacter & bc, size_t partitionId)
+ : _bc(bc),
+ _partitionId(partitionId),
+ _current(_bc._where.begin())
+{}
+
+bool
+BucketCompacter::LidIterator::has_next() noexcept {
+ for (;(_current != _bc._where.end()) && (_bc.toPartitionId(_current->_bucketId) != _partitionId); _current++);
+ return (_current != _bc._where.end()) && (_bc.toPartitionId(_current->_bucketId) == _partitionId);
+}
+
+StoreByBucket::Index
+BucketCompacter::LidIterator::next() noexcept {
+ return *_current++;
}
void
BucketCompacter::close()
{
- size_t lidCount1(0);
- size_t bucketCount(0);
size_t chunkCount(0);
for (const auto & store : _tmpStore) {
store->close();
- lidCount1 += store->getLidCount();
- bucketCount += store->getBucketCount();
chunkCount += store->getChunkCount();
}
+ std::sort(_where.begin(), _where.end());
LOG(info, "Have read %ld lids and placed them in %ld buckets. Temporary compressed in %ld chunks.",
- lidCount1, bucketCount, chunkCount);
+ _where.size(), getBucketCount(), chunkCount);
- for (auto & store_ref : _tmpStore) {
- auto store = std::move(store_ref);
- store->drain(*this);
+ for (size_t partId(0); partId < _tmpStore.size(); partId++) {
+ LidIterator partIterator(*this, partId);
+ _tmpStore[partId]->drain(*this, partIterator);
}
// All partitions using _backingMemory should be destructed before clearing.
_backingMemory.clear();
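
getBucketCount() now runs over the compacter's single sorted index instead of summing per-store counts; it counts the transitions between distinct bucket ids in the sorted vector. A hedged standalone sketch of that count, using a simplified Index type:

// Sketch only; Index is a stand-in for StoreByBucket::Index and the input is
// assumed to be sorted by bucket key, as in BucketCompacter::close().
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Index { uint64_t bucketKey; };

size_t countBuckets(const std::vector<Index>& sorted) {
    if (sorted.empty()) return 0;
    size_t count = 1;                            // the first bucket
    uint64_t prev = sorted.front().bucketKey;
    for (const auto& idx : sorted) {
        if (idx.bucketKey != prev) {             // a new bucket starts here
            ++count;
            prev = idx.bucketKey;
        }
    }
    return count;
}

int main() {
    std::vector<Index> sorted{{1}, {1}, {2}, {5}, {5}, {5}};
    assert(countBuckets(sorted) == 3);           // buckets 1, 2 and 5
    return 0;
}
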
diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.h b/searchlib/src/vespa/searchlib/docstore/compacter.h
index 354ca24ede9..1eb3fda78a6 100644
--- a/searchlib/src/vespa/searchlib/docstore/compacter.h
+++ b/searchlib/src/vespa/searchlib/docstore/compacter.h
@@ -30,7 +30,9 @@ private:
* The buckets will be ordered, and the objects inside the buckets will be further ordered.
* All data are kept compressed to minimize memory usage.
**/
-class BucketCompacter : public IWriteData, public StoreByBucket::IWrite
+class BucketCompacter : public IWriteData,
+ public StoreByBucket::IWrite,
+ public StoreByBucket::StoreIndex
{
using CompressionConfig = vespalib::compression::CompressionConfig;
using Executor = vespalib::Executor;
@@ -40,21 +42,39 @@ public:
Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination);
void write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override;
void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override;
+ void store(const StoreByBucket::Index & index) override;
+ size_t toPartitionId(BucketId bucketId) const noexcept {
+ uint64_t sortableBucketId = bucketId.toKey();
+ return (sortableBucketId >> _unSignificantBucketBits) % _tmpStore.size();
+ }
void close() override;
private:
+ size_t getBucketCount() const noexcept;
static constexpr size_t NUM_PARTITIONS = 256;
using GenerationHandler = vespalib::GenerationHandler;
using Partitions = std::array<std::unique_ptr<StoreByBucket>, NUM_PARTITIONS>;
+ using IndexVector = std::vector<StoreByBucket::Index, vespalib::allocator_large<StoreByBucket::Index>>;
+ class LidIterator : public StoreByBucket::IndexIterator {
+ public:
+ LidIterator(const BucketCompacter & bc, size_t partitionId);
+ bool has_next() noexcept override;
+ StoreByBucket::Index next() noexcept override;
+ private:
+ const BucketCompacter & _bc;
+ size_t _partitionId;
+ IndexVector::const_iterator _current;
+ };
FileId getDestinationId(const LockGuard & guard) const;
- size_t _unSignificantBucketBits;
- FileId _sourceFileId;
- FileId _destinationFileId;
- LogDataStore & _ds;
- const IBucketizer & _bucketizer;
- std::mutex _lock;
- vespalib::MemoryDataStore _backingMemory;
- Partitions _tmpStore;
- GenerationHandler::Guard _lidGuard;
+ size_t _unSignificantBucketBits;
+ FileId _sourceFileId;
+ FileId _destinationFileId;
+ LogDataStore & _ds;
+ const IBucketizer & _bucketizer;
+ std::mutex _lock;
+ vespalib::MemoryDataStore _backingMemory;
+ IndexVector _where;
+ Partitions _tmpStore;
+ GenerationHandler::Guard _lidGuard;
vespalib::hash_map<uint64_t, uint32_t> _stat;
};
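
The new toPartitionId() helper above routes each bucket to one of the 256 temporary stores by shifting the sortable bucket key past the insignificant low bits and taking the remainder. An illustrative sketch; the shift amount below is an assumed example value, not the real _unSignificantBucketBits:

// Sketch only; UNSIGNIFICANT_BUCKET_BITS is an assumed value for illustration.
#include <cassert>
#include <cstddef>
#include <cstdint>

constexpr size_t NUM_PARTITIONS = 256;             // matches the diff
constexpr unsigned UNSIGNIFICANT_BUCKET_BITS = 8;  // assumed example value

size_t toPartitionId(uint64_t sortableBucketKey) {
    return (sortableBucketKey >> UNSIGNIFICANT_BUCKET_BITS) % NUM_PARTITIONS;
}

int main() {
    // Keys that differ only in the insignificant low bits land in the same partition,
    // while the next bucket key above them moves on to another partition.
    assert(toPartitionId(0x12300) == toPartitionId(0x123FF));
    assert(toPartitionId(0x12300) != toPartitionId(0x12400));
    return 0;
}
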
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
index dbcbaafbbb7..34280ffd16e 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
@@ -13,10 +13,10 @@ using document::BucketId;
using vespalib::CpuUsage;
using vespalib::makeLambdaTask;
-StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept
+StoreByBucket::StoreByBucket(StoreIndex & storeIndex, MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept
: _chunkSerial(0),
_current(),
- _where(),
+ _storeIndex(storeIndex),
_backingMemory(backingMemory),
_executor(executor),
_lock(),
@@ -43,7 +43,7 @@ StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBuffe
_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT));
}
_current->append(lid, data);
- _where.emplace_back(bucketId, _current->getId(), chunkId, lid);
+ _storeIndex.store(Index(bucketId, _current->getId(), chunkId, lid));
}
Chunk::UP
@@ -53,7 +53,7 @@ StoreByBucket::createChunk()
}
size_t
-StoreByBucket::getChunkCount() const {
+StoreByBucket::getChunkCount() const noexcept {
std::lock_guard guard(_lock);
return _chunks.size();
}
@@ -94,26 +94,10 @@ StoreByBucket::close() {
});
_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT));
waitAllProcessed();
- std::sort(_where.begin(), _where.end());
-}
-
-size_t
-StoreByBucket::getBucketCount() const {
- if (_where.empty()) return 0;
-
- size_t count = 0;
- BucketId prev = _where.front()._bucketId;
- for (const auto & lid : _where) {
- if (lid._bucketId != prev) {
- count++;
- prev = lid._bucketId;
- }
- }
- return count + 1;
}
void
-StoreByBucket::drain(IWrite & drainer)
+StoreByBucket::drain(IWrite & drainer, IndexIterator & indexIterator)
{
std::vector<Chunk::UP> chunks;
chunks.resize(_chunks.size());
@@ -122,7 +106,8 @@ StoreByBucket::drain(IWrite & drainer)
chunks[it.first] = std::make_unique<Chunk>(it.first, buf.data(), buf.size());
}
_chunks.clear();
- for (auto & idx : _where) {
+ while (indexIterator.has_next()) {
+ Index idx = indexIterator.next();
vespalib::ConstBufferRef data(chunks[idx._id]->getLid(idx._lid));
drainer.write(idx._bucketId, idx._chunkId, idx._lid, data);
}
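
drain() no longer walks a member _where vector; the caller supplies both the IWrite sink and an IndexIterator that defines the order. A simplified sketch of that contract, with chunk decompression replaced by a plain payload vector and stand-in types for the searchlib interfaces:

// Sketch only; IWrite/IndexIterator/VectorIterator mirror the diff but the
// types and the chunk lookup are simplified stand-ins.
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

struct Index { uint32_t chunkId; uint32_t lid; };

struct IWrite {                                   // simplified StoreByBucket::IWrite
    virtual ~IWrite() = default;
    virtual void write(uint32_t chunkId, uint32_t lid, const std::string& data) = 0;
};

struct IndexIterator {                            // simplified StoreByBucket::IndexIterator
    virtual ~IndexIterator() = default;
    virtual bool has_next() = 0;
    virtual Index next() = 0;
};

struct VectorIterator : IndexIterator {           // iterate a pre-sorted index vector
    explicit VectorIterator(const std::vector<Index>& where) : _where(where), _pos(0) {}
    bool has_next() override { return _pos < _where.size(); }
    Index next() override { return _where[_pos++]; }
    const std::vector<Index>& _where;
    size_t _pos;
};

struct Printer : IWrite {                         // toy sink standing in for the real writer
    void write(uint32_t chunkId, uint32_t lid, const std::string& data) override {
        std::printf("chunk %u lid %u: %s\n", chunkId, lid, data.c_str());
    }
};

// drain(): look up each indexed entry and hand it to the writer, in index order.
void drain(IWrite& drainer, IndexIterator& it, const std::vector<std::string>& chunks) {
    while (it.has_next()) {
        Index idx = it.next();
        drainer.write(idx.chunkId, idx.lid, chunks[idx.chunkId]);
    }
}

int main() {
    std::vector<std::string> chunks{"payload-0", "payload-1"};
    std::vector<Index> where{{0, 1}, {1, 2}};
    VectorIterator it(where);
    Printer printer;
    drain(printer, it, chunks);
    return 0;
}
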
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.h b/searchlib/src/vespa/searchlib/docstore/storebybucket.h
index 6e52695d529..7507ba0fca6 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.h
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.h
@@ -23,51 +23,55 @@ class StoreByBucket
using ConstBufferRef = vespalib::ConstBufferRef;
using CompressionConfig = vespalib::compression::CompressionConfig;
public:
- StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept;
+ struct Index {
+ using BucketId=document::BucketId;
+ Index(BucketId bucketId, uint32_t id, uint32_t chunkId, uint32_t entry) noexcept :
+ _bucketId(bucketId), _id(id), _chunkId(chunkId), _lid(entry)
+ { }
+ bool operator < (const Index & b) const noexcept {
+ return BucketId::bucketIdToKey(_bucketId.getRawId()) < BucketId::bucketIdToKey(b._bucketId.getRawId());
+ }
+ BucketId _bucketId;
+ uint32_t _id;
+ uint32_t _chunkId;
+ uint32_t _lid;
+ };
+ using IndexVector = std::vector<Index, vespalib::allocator_large<Index>>;
+ struct IWrite {
+ using BucketId=document::BucketId;
+ virtual ~IWrite() = default;
+ virtual void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) = 0;
+ };
+ struct IndexIterator {
+ virtual ~IndexIterator() = default;
+ virtual bool has_next() noexcept = 0;
+ virtual Index next() noexcept = 0;
+ };
+ struct StoreIndex {
+ virtual ~StoreIndex() = default;
+ virtual void store(const Index & index) = 0;
+ };
+ StoreByBucket(StoreIndex & storeIndex, MemoryDataStore & backingMemory,
+ Executor & executor, CompressionConfig compression) noexcept;
//TODO Putting the below move constructor into cpp file fails for some unknown reason. Needs to be resolved.
StoreByBucket(StoreByBucket &&) noexcept = delete;
StoreByBucket(const StoreByBucket &) = delete;
StoreByBucket & operator=(StoreByBucket &&) noexcept = delete;
StoreByBucket & operator = (const StoreByBucket &) = delete;
~StoreByBucket();
- class IWrite {
- public:
- using BucketId=document::BucketId;
- virtual ~IWrite() = default;
- virtual void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) = 0;
- };
void add(document::BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data);
void close();
/// close() must have been called prior to calling getBucketCount() or drain()
- void drain(IWrite & drain);
- size_t getBucketCount() const;
-
- size_t getChunkCount() const;
- size_t getLidCount() const {
- return _where.size();
- }
+ void drain(IWrite & drain, IndexIterator & iterator);
+ size_t getChunkCount() const noexcept;
private:
void incChunksPosted();
void waitAllProcessed();
Chunk::UP createChunk();
void closeChunk(Chunk::UP chunk);
- struct Index {
- using BucketId=document::BucketId;
- Index(BucketId bucketId, uint32_t id, uint32_t chunkId, uint32_t entry) noexcept :
- _bucketId(bucketId), _id(id), _chunkId(chunkId), _lid(entry)
- { }
- bool operator < (const Index & b) const noexcept {
- return BucketId::bucketIdToKey(_bucketId.getRawId()) < BucketId::bucketIdToKey(b._bucketId.getRawId());
- }
- BucketId _bucketId;
- uint32_t _id;
- uint32_t _chunkId;
- uint32_t _lid;
- };
- using IndexVector = std::vector<Index, vespalib::allocator_large<Index>>;
uint64_t _chunkSerial;
Chunk::UP _current;
- IndexVector _where;
+ StoreIndex & _storeIndex;
MemoryDataStore & _backingMemory;
Executor & _executor;
mutable std::mutex _lock;