aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'searchlib/src/vespa/searchlib/docstore/storebybucket.cpp')
-rw-r--r--searchlib/src/vespa/searchlib/docstore/storebybucket.cpp51
1 files changed, 26 insertions, 25 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
index 6d3c39a51dc..f26afc595f3 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
@@ -1,4 +1,4 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "storebybucket.h"
#include <vespa/vespalib/data/databuffer.h>
@@ -13,14 +13,14 @@ using document::BucketId;
using vespalib::CpuUsage;
using vespalib::makeLambdaTask;
-StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept
+StoreByBucket::StoreByBucket(StoreIndex & storeIndex, MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept
: _chunkSerial(0),
_current(),
- _where(),
+ _storeIndex(storeIndex),
_backingMemory(backingMemory),
_executor(executor),
- _lock(std::make_unique<std::mutex>()),
- _cond(std::make_unique<std::condition_variable>()),
+ _lock(),
+ _cond(),
_numChunksPosted(0),
_chunks(),
_compression(compression)
@@ -31,9 +31,9 @@ StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executo
StoreByBucket::~StoreByBucket() = default;
void
-StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz)
+StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data)
{
- if ( ! _current->hasRoom(sz)) {
+ if ( ! _current->hasRoom(data.size())) {
Chunk::UP tmpChunk = createChunk();
_current.swap(tmpChunk);
incChunksPosted();
@@ -42,9 +42,8 @@ StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void
});
_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT));
}
- Index idx(bucketId, _current->getId(), chunkId, lid);
- _current->append(lid, buffer, sz);
- _where[bucketId.toKey()].push_back(idx);
+ _current->append(lid, data);
+ _storeIndex.store(Index(bucketId, _current->getId(), chunkId, lid));
}
Chunk::UP
@@ -54,8 +53,8 @@ StoreByBucket::createChunk()
}
size_t
-StoreByBucket::getChunkCount() const {
- std::lock_guard guard(*_lock);
+StoreByBucket::getChunkCount() const noexcept {
+ std::lock_guard guard(_lock);
return _chunks.size();
}
@@ -66,36 +65,40 @@ StoreByBucket::closeChunk(Chunk::UP chunk)
chunk->pack(1, buffer, _compression);
buffer.shrink(buffer.getDataLen());
ConstBufferRef bufferRef(_backingMemory.push_back(buffer.getData(), buffer.getDataLen()).data(), buffer.getDataLen());
- std::lock_guard guard(*_lock);
+ std::lock_guard guard(_lock);
_chunks[chunk->getId()] = bufferRef;
if (_numChunksPosted == _chunks.size()) {
- _cond->notify_one();
+ _cond.notify_one();
}
}
void
StoreByBucket::incChunksPosted() {
- std::lock_guard guard(*_lock);
+ std::lock_guard guard(_lock);
_numChunksPosted++;
}
void
StoreByBucket::waitAllProcessed() {
- std::unique_lock guard(*_lock);
+ std::unique_lock guard(_lock);
while (_numChunksPosted != _chunks.size()) {
- _cond->wait(guard);
+ _cond.wait(guard);
}
}
void
-StoreByBucket::drain(IWrite & drainer)
-{
+StoreByBucket::close() {
incChunksPosted();
auto task = makeLambdaTask([this, chunk=std::move(_current)]() mutable {
closeChunk(std::move(chunk));
});
_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT));
waitAllProcessed();
+}
+
+void
+StoreByBucket::drain(IWrite & drainer, IndexIterator & indexIterator)
+{
std::vector<Chunk::UP> chunks;
chunks.resize(_chunks.size());
for (const auto & it : _chunks) {
@@ -103,12 +106,10 @@ StoreByBucket::drain(IWrite & drainer)
chunks[it.first] = std::make_unique<Chunk>(it.first, buf.data(), buf.size());
}
_chunks.clear();
- for (auto & it : _where) {
- std::sort(it.second.begin(), it.second.end());
- for (Index idx : it.second) {
- vespalib::ConstBufferRef data(chunks[idx._id]->getLid(idx._lid));
- drainer.write(idx._bucketId, idx._chunkId, idx._lid, data.c_str(), data.size());
- }
+ while (indexIterator.has_next()) {
+ Index idx = indexIterator.next();
+ vespalib::ConstBufferRef data(chunks[idx._localChunkId]->getLid(idx._lid));
+ drainer.write(idx._bucketId, idx._chunkId, idx._lid, data);
}
}