diff options
Diffstat (limited to 'searchlib/src/vespa/searchlib/docstore/compacter.cpp')
-rw-r--r-- | searchlib/src/vespa/searchlib/docstore/compacter.cpp | 149 |
1 files changed, 95 insertions, 54 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.cpp b/searchlib/src/vespa/searchlib/docstore/compacter.cpp index c886e52659f..91faafc2a4e 100644 --- a/searchlib/src/vespa/searchlib/docstore/compacter.cpp +++ b/searchlib/src/vespa/searchlib/docstore/compacter.cpp @@ -1,10 +1,10 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "compacter.h" #include "logdatastore.h" #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/array.hpp> -#include <cinttypes> +#include <cassert> #include <vespa/log/log.h> LOG_SETUP(".searchlib.docstore.compacter"); @@ -14,83 +14,124 @@ namespace search::docstore { using vespalib::alloc::Alloc; namespace { - static constexpr size_t INITIAL_BACKING_BUFFER_SIZE = 64_Mi; + constexpr size_t INITIAL_BACKING_BUFFER_SIZE = 64_Mi; } void -Compacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) { +Compacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) { (void) chunkId; - FileChunk::FileId fileId= _ds.getActiveFileId(guard); - _ds.write(std::move(guard), fileId, lid, buffer, sz); + FileChunk::FileId fileId = _ds.getActiveFileId(guard); + _ds.write(std::move(guard), fileId, lid, data); +} + +BucketIndexStore::BucketIndexStore(size_t maxSignificantBucketBits, uint32_t numPartitions) noexcept + : _inSignificantBucketBits((maxSignificantBucketBits > 8) ? (maxSignificantBucketBits - 8) : 0), + _where(), + _numPartitions(numPartitions), + _readyForIterate(true) +{} +BucketIndexStore::~BucketIndexStore() = default; + +void +BucketIndexStore::prepareForIterate() { + std::sort(_where.begin(), _where.end()); + _readyForIterate = true; +} + +void +BucketIndexStore::store(const StoreByBucket::Index & index) { + _where.push_back(index); + _readyForIterate = false; +} + +size_t +BucketIndexStore::getBucketCount() const noexcept { + if (_where.empty()) return 0; + + size_t count = 0; + document::BucketId prev = _where.front()._bucketId; + for (const auto & lid : _where) { + if (lid._bucketId != prev) { + count++; + prev = lid._bucketId; + } + } + return count + 1; +} + +std::unique_ptr<StoreByBucket::IndexIterator> +BucketIndexStore::createIterator(uint32_t partitionId) const { + assert(_readyForIterate); + return std::make_unique<LidIterator>(*this, partitionId); +} + +BucketIndexStore::LidIterator::LidIterator(const BucketIndexStore & store, size_t partitionId) + : _store(store), + _partitionId(partitionId), + _current(_store._where.begin()) +{} + +bool +BucketIndexStore::LidIterator::has_next() noexcept { + for (;(_current != _store._where.end()) && (_store.toPartitionId(_current->_bucketId) != _partitionId); _current++); + return (_current != _store._where.end()) && (_store.toPartitionId(_current->_bucketId) == _partitionId); +} + +StoreByBucket::Index +BucketIndexStore::LidIterator::next() noexcept { + return *_current++; } BucketCompacter::BucketCompacter(size_t maxSignificantBucketBits, CompressionConfig compression, LogDataStore & ds, - Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination) : - _unSignificantBucketBits((maxSignificantBucketBits > 8) ? (maxSignificantBucketBits - 8) : 0), - _sourceFileId(source), - _destinationFileId(destination), - _ds(ds), - _bucketizer(bucketizer), - _writeCount(0), - _maxBucketGuardDuration(vespalib::duration::zero()), - _lastSample(vespalib::steady_clock::now()), - _lock(), - _backingMemory(Alloc::alloc(INITIAL_BACKING_BUFFER_SIZE), &_lock), - _tmpStore(), - _lidGuard(ds.getLidReadGuard()), - _bucketizerGuard(), - _stat() + Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination) + : _sourceFileId(source), + _destinationFileId(destination), + _ds(ds), + _bucketizer(bucketizer), + _lock(), + _backingMemory(Alloc::alloc(INITIAL_BACKING_BUFFER_SIZE), &_lock), + _bucketIndexStore(maxSignificantBucketBits, NUM_PARTITIONS), + _tmpStore(), + _lidGuard(ds.getLidReadGuard()), + _stat() { - _tmpStore.reserve(256); - for (size_t i(0); i < 256; i++) { - _tmpStore.emplace_back(_backingMemory, executor, compression); + for (auto & partition : _tmpStore) { + partition = std::make_unique<StoreByBucket>(_bucketIndexStore, _backingMemory, executor, compression); } } +BucketCompacter::~BucketCompacter() = default; + FileChunk::FileId BucketCompacter::getDestinationId(const LockGuard & guard) const { return (_destinationFileId.isActive()) ? _ds.getActiveFileId(guard) : _destinationFileId; } void -BucketCompacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) +BucketCompacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) { - if (_writeCount++ == 0) { - _bucketizerGuard = _bucketizer.getGuard(); - _lastSample = vespalib::steady_clock::now(); - } guard.unlock(); - BucketId bucketId = (sz > 0) ? _bucketizer.getBucketOf(_bucketizerGuard, lid) : BucketId(); - uint64_t sortableBucketId = bucketId.toKey(); - _tmpStore[(sortableBucketId >> _unSignificantBucketBits) % _tmpStore.size()].add(bucketId, chunkId, lid, buffer, sz); - if ((_writeCount % 1000) == 0) { - _bucketizerGuard = _bucketizer.getGuard(); - vespalib::steady_time now = vespalib::steady_clock::now(); - _maxBucketGuardDuration = std::max(_maxBucketGuardDuration, now - _lastSample); - _lastSample = now; - } + BucketId bucketId = (data.size() > 0) ? _bucketizer.getBucketOf(_bucketizer.getGuard(), lid) : BucketId(); + _tmpStore[_bucketIndexStore.toPartitionId(bucketId)]->add(bucketId, chunkId, lid, data); } void BucketCompacter::close() { - _bucketizerGuard = GenerationHandler::Guard(); - vespalib::duration lastBucketGuardDuration = vespalib::steady_clock::now() - _lastSample; - size_t lidCount1(0); - size_t bucketCount(0); size_t chunkCount(0); - for (const StoreByBucket & store : _tmpStore) { - lidCount1 += store.getLidCount(); - bucketCount += store.getBucketCount(); - chunkCount += store.getChunkCount(); + for (const auto & store : _tmpStore) { + store->close(); + chunkCount += store->getChunkCount(); } - LOG(info, "Have read %ld lids and placed them in %ld buckets. Temporary compressed in %ld chunks." - " Max bucket guard held for %" PRId64 " us, and last before close for %" PRId64 " us", - lidCount1, bucketCount, chunkCount, vespalib::count_us(_maxBucketGuardDuration), vespalib::count_us(lastBucketGuardDuration)); + _bucketIndexStore.prepareForIterate(); + LOG(info, "Have read %ld lids and placed them in %ld buckets. Temporary compressed in %ld chunks.", + _bucketIndexStore.getLidCount(), _bucketIndexStore.getBucketCount(), chunkCount); - for (StoreByBucket & store : _tmpStore) { - store.drain(*this); + for (size_t partId(0); partId < _tmpStore.size(); partId++) { + auto partIterator = _bucketIndexStore.createIterator(partId); + _tmpStore[partId]->drain(*this, *partIterator); } + // All partitions using _backingMemory should be destructed before clearing. _backingMemory.clear(); size_t lidCount(0); @@ -101,14 +142,14 @@ BucketCompacter::close() } void -BucketCompacter::write(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) +BucketCompacter::write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) { _stat[bucketId.getId()]++; LockGuard guard(_ds.getLidGuard(lid)); - LidInfo lidInfo(_sourceFileId.getId(), chunkId, sz); + LidInfo lidInfo(_sourceFileId.getId(), chunkId, data.size()); if (_ds.getLid(_lidGuard, lid) == lidInfo) { FileId fileId = getDestinationId(guard); - _ds.write(std::move(guard), fileId, lid, buffer, sz); + _ds.write(std::move(guard), fileId, lid, data); } } |