diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-08-25 13:53:41 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-08-25 13:53:41 +0200 |
commit | 3416fd10a13080811f32765eb3f9b3857d3d8670 (patch) | |
tree | 1f07db5d9709da51aa55b4ca71f71f64cf1804a1 /searchlib | |
parent | 3be112024a96ea88f92da9e5d984dee282a5999c (diff) |
Use explicit sync, as sync on shared executor is not very wise.
Diffstat (limited to 'searchlib')
-rw-r--r-- | searchlib/src/vespa/searchlib/docstore/storebybucket.cpp | 26 | ||||
-rw-r--r-- | searchlib/src/vespa/searchlib/docstore/storebybucket.h | 9 |
2 files changed, 28 insertions, 7 deletions
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp index d0d2d1cdac2..85eccb6b728 100644 --- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp +++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp @@ -19,13 +19,14 @@ StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, ThreadExecutor & e _backingMemory(backingMemory), _executor(executor), _lock(), + _allInFlight(false), _chunks(), _compression(compression) { createChunk().swap(_current); } -StoreByBucket::~StoreByBucket() { } +StoreByBucket::~StoreByBucket() = default; void StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) @@ -33,6 +34,7 @@ StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void if ( ! _current->hasRoom(sz)) { Chunk::UP tmpChunk = createChunk(); _current.swap(tmpChunk); + _inflight _executor.execute(makeTask(makeClosure(this, &StoreByBucket::closeChunk, std::move(tmpChunk)))); } Index idx(bucketId, _current->getId(), chunkId, lid); @@ -59,15 +61,33 @@ StoreByBucket::closeChunk(Chunk::UP chunk) chunk->pack(1, buffer, _compression); buffer.shrink(buffer.getDataLen()); ConstBufferRef bufferRef(_backingMemory.push_back(buffer.getData(), buffer.getDataLen()).data(), buffer.getDataLen()); - vespalib::LockGuard guard(_lock); + vespalib::MonitorGuard guard(_monitor); _chunks[chunk->getId()] = bufferRef; + if (_inFlight == _chunks.size()) { + guard.signal(); + } +} + +void +StoreByBucket::incInFlight() { + vespalib::MonitorGuard guard(_monitor); + _inFlight++; +} + +void +StoreByBucket::waitAllProcessed() { + vespalib::MonitorGuard guard(_monitor); + while (_inFlight != _chunks.size()) { + guard.wait(); + } } void StoreByBucket::drain(IWrite & drainer) { + incInFlight(); _executor.execute(makeTask(makeClosure(this, &StoreByBucket::closeChunk, std::move(_current)))); - _executor.sync(); + waitAllProcessed(); std::vector<Chunk::UP> chunks; chunks.resize(_chunks.size()); for (const auto & it : _chunks) { diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.h b/searchlib/src/vespa/searchlib/docstore/storebybucket.h index ac1f6fbe007..ed37f05500a 100644 --- a/searchlib/src/vespa/searchlib/docstore/storebybucket.h +++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.h @@ -10,8 +10,7 @@ #include <vespa/vespalib/stllike/hash_map.h> #include <map> -namespace search { -namespace docstore { +namespace search::docstore { /** * StoreByBucket will organize the data you add to it by buckets. @@ -47,6 +46,8 @@ public: return lidCount; } private: + void incInFlight(); + void waitAllProcessed(); Chunk::UP createChunk(); void closeChunk(Chunk::UP chunk); struct Index { @@ -68,10 +69,10 @@ private: std::map<uint64_t, IndexVector> _where; MemoryDataStore & _backingMemory; ThreadExecutor & _executor; - vespalib::Lock _lock; + vespalib::Monitor _monitor; + int64_t _inFlight; vespalib::hash_map<uint64_t, ConstBufferRef> _chunks; CompressionConfig _compression; }; } -} |