diff options
Diffstat (limited to 'searchlib/src/vespa/searchlib/docstore/storebybucket.cpp')
-rw-r--r-- | searchlib/src/vespa/searchlib/docstore/storebybucket.cpp | 51 |
1 files changed, 26 insertions, 25 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp index 6d3c39a51dc..f26afc595f3 100644 --- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp +++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "storebybucket.h" #include <vespa/vespalib/data/databuffer.h> @@ -13,14 +13,14 @@ using document::BucketId; using vespalib::CpuUsage; using vespalib::makeLambdaTask; -StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept +StoreByBucket::StoreByBucket(StoreIndex & storeIndex, MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept : _chunkSerial(0), _current(), - _where(), + _storeIndex(storeIndex), _backingMemory(backingMemory), _executor(executor), - _lock(std::make_unique<std::mutex>()), - _cond(std::make_unique<std::condition_variable>()), + _lock(), + _cond(), _numChunksPosted(0), _chunks(), _compression(compression) @@ -31,9 +31,9 @@ StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executo StoreByBucket::~StoreByBucket() = default; void -StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) +StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) { - if ( ! _current->hasRoom(sz)) { + if ( ! _current->hasRoom(data.size())) { Chunk::UP tmpChunk = createChunk(); _current.swap(tmpChunk); incChunksPosted(); @@ -42,9 +42,8 @@ StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void }); _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT)); } - Index idx(bucketId, _current->getId(), chunkId, lid); - _current->append(lid, buffer, sz); - _where[bucketId.toKey()].push_back(idx); + _current->append(lid, data); + _storeIndex.store(Index(bucketId, _current->getId(), chunkId, lid)); } Chunk::UP @@ -54,8 +53,8 @@ StoreByBucket::createChunk() } size_t -StoreByBucket::getChunkCount() const { - std::lock_guard guard(*_lock); +StoreByBucket::getChunkCount() const noexcept { + std::lock_guard guard(_lock); return _chunks.size(); } @@ -66,36 +65,40 @@ StoreByBucket::closeChunk(Chunk::UP chunk) chunk->pack(1, buffer, _compression); buffer.shrink(buffer.getDataLen()); ConstBufferRef bufferRef(_backingMemory.push_back(buffer.getData(), buffer.getDataLen()).data(), buffer.getDataLen()); - std::lock_guard guard(*_lock); + std::lock_guard guard(_lock); _chunks[chunk->getId()] = bufferRef; if (_numChunksPosted == _chunks.size()) { - _cond->notify_one(); + _cond.notify_one(); } } void StoreByBucket::incChunksPosted() { - std::lock_guard guard(*_lock); + std::lock_guard guard(_lock); _numChunksPosted++; } void StoreByBucket::waitAllProcessed() { - std::unique_lock guard(*_lock); + std::unique_lock guard(_lock); while (_numChunksPosted != _chunks.size()) { - _cond->wait(guard); + _cond.wait(guard); } } void -StoreByBucket::drain(IWrite & drainer) -{ +StoreByBucket::close() { incChunksPosted(); auto task = makeLambdaTask([this, chunk=std::move(_current)]() mutable { closeChunk(std::move(chunk)); }); _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT)); waitAllProcessed(); +} + +void +StoreByBucket::drain(IWrite & drainer, IndexIterator & indexIterator) +{ std::vector<Chunk::UP> chunks; chunks.resize(_chunks.size()); for (const auto & it : _chunks) { @@ -103,12 +106,10 @@ StoreByBucket::drain(IWrite & drainer) chunks[it.first] = std::make_unique<Chunk>(it.first, buf.data(), buf.size()); } _chunks.clear(); - for (auto & it : _where) { - std::sort(it.second.begin(), it.second.end()); - for (Index idx : it.second) { - vespalib::ConstBufferRef data(chunks[idx._id]->getLid(idx._lid)); - drainer.write(idx._bucketId, idx._chunkId, idx._lid, data.c_str(), data.size()); - } + while (indexIterator.has_next()) { + Index idx = indexIterator.next(); + vespalib::ConstBufferRef data(chunks[idx._localChunkId]->getLid(idx._lid)); + drainer.write(idx._bucketId, idx._chunkId, idx._lid, data); } } |