summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-08-25 13:53:41 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-08-25 13:53:41 +0200
commit3416fd10a13080811f32765eb3f9b3857d3d8670 (patch)
tree1f07db5d9709da51aa55b4ca71f71f64cf1804a1 /searchlib
parent3be112024a96ea88f92da9e5d984dee282a5999c (diff)
Use explicit sync, as sync on shared executor is not very wise.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/docstore/storebybucket.cpp26
-rw-r--r--searchlib/src/vespa/searchlib/docstore/storebybucket.h9
2 files changed, 28 insertions, 7 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
index d0d2d1cdac2..85eccb6b728 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
@@ -19,13 +19,14 @@ StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, ThreadExecutor & e
_backingMemory(backingMemory),
_executor(executor),
_lock(),
+ _allInFlight(false),
_chunks(),
_compression(compression)
{
createChunk().swap(_current);
}
-StoreByBucket::~StoreByBucket() { }
+StoreByBucket::~StoreByBucket() = default;
void
StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz)
@@ -33,6 +34,7 @@ StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void
if ( ! _current->hasRoom(sz)) {
Chunk::UP tmpChunk = createChunk();
_current.swap(tmpChunk);
+ _inflight
_executor.execute(makeTask(makeClosure(this, &StoreByBucket::closeChunk, std::move(tmpChunk))));
}
Index idx(bucketId, _current->getId(), chunkId, lid);
@@ -59,15 +61,33 @@ StoreByBucket::closeChunk(Chunk::UP chunk)
chunk->pack(1, buffer, _compression);
buffer.shrink(buffer.getDataLen());
ConstBufferRef bufferRef(_backingMemory.push_back(buffer.getData(), buffer.getDataLen()).data(), buffer.getDataLen());
- vespalib::LockGuard guard(_lock);
+ vespalib::MonitorGuard guard(_monitor);
_chunks[chunk->getId()] = bufferRef;
+ if (_inFlight == _chunks.size()) {
+ guard.signal();
+ }
+}
+
+void
+StoreByBucket::incInFlight() {
+ vespalib::MonitorGuard guard(_monitor);
+ _inFlight++;
+}
+
+void
+StoreByBucket::waitAllProcessed() {
+ vespalib::MonitorGuard guard(_monitor);
+ while (_inFlight != _chunks.size()) {
+ guard.wait();
+ }
}
void
StoreByBucket::drain(IWrite & drainer)
{
+ incInFlight();
_executor.execute(makeTask(makeClosure(this, &StoreByBucket::closeChunk, std::move(_current))));
- _executor.sync();
+ waitAllProcessed();
std::vector<Chunk::UP> chunks;
chunks.resize(_chunks.size());
for (const auto & it : _chunks) {
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.h b/searchlib/src/vespa/searchlib/docstore/storebybucket.h
index ac1f6fbe007..ed37f05500a 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.h
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.h
@@ -10,8 +10,7 @@
#include <vespa/vespalib/stllike/hash_map.h>
#include <map>
-namespace search {
-namespace docstore {
+namespace search::docstore {
/**
* StoreByBucket will organize the data you add to it by buckets.
@@ -47,6 +46,8 @@ public:
return lidCount;
}
private:
+ void incInFlight();
+ void waitAllProcessed();
Chunk::UP createChunk();
void closeChunk(Chunk::UP chunk);
struct Index {
@@ -68,10 +69,10 @@ private:
std::map<uint64_t, IndexVector> _where;
MemoryDataStore & _backingMemory;
ThreadExecutor & _executor;
- vespalib::Lock _lock;
+ vespalib::Monitor _monitor;
+ int64_t _inFlight;
vespalib::hash_map<uint64_t, ConstBufferRef> _chunks;
CompressionConfig _compression;
};
}
-}