Diffstat (limited to 'searchlib/src')
-rw-r--r--  searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp   85
-rw-r--r--  searchlib/src/vespa/searchlib/docstore/compacter.cpp                   103
-rw-r--r--  searchlib/src/vespa/searchlib/docstore/compacter.h                      56
-rw-r--r--  searchlib/src/vespa/searchlib/docstore/storebybucket.cpp                31
-rw-r--r--  searchlib/src/vespa/searchlib/docstore/storebybucket.h                  62
5 files changed, 239 insertions, 98 deletions
diff --git a/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp b/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp
index 2361ab90e2a..d6dde433365 100644
--- a/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp
+++ b/searchlib/src/tests/docstore/store_by_bucket/store_by_bucket_test.cpp
@@ -4,7 +4,7 @@
#include <vespa/document/bucket/bucketid.h>
#include <vespa/document/base/documentid.h>
-#include <vespa/searchlib/docstore/storebybucket.h>
+#include <vespa/searchlib/docstore/compacter.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/stllike/hash_set.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
@@ -25,8 +25,8 @@ createPayload(BucketId b) {
}
uint32_t userId(size_t i) { return i%100; }
-void
-add(StoreByBucket & sbb, size_t i) {
+BucketId
+createBucketId(size_t i) {
constexpr size_t USED_BITS=5;
vespalib::asciistream os;
os << "id:a:b:n=" << userId(i) << ":" << i;
@@ -34,13 +34,19 @@ add(StoreByBucket & sbb, size_t i) {
BucketId b = docId.getGlobalId().convertToBucketId();
EXPECT_EQUAL(userId(i), docId.getGlobalId().getLocationSpecificBits());
b.setUsedBits(USED_BITS);
+ return b;
+}
+void
+add(StoreByBucket & sbb, size_t i) {
+ BucketId b = createBucketId(i);
vespalib::string s = createPayload(b);
sbb.add(b, i%10, i, {s.c_str(), s.size()});
}
class VerifyBucketOrder : public StoreByBucket::IWrite {
public:
- VerifyBucketOrder() : _lastLid(0), _lastBucketId(0), _uniqueUser(), _uniqueBucket() { }
+ VerifyBucketOrder() : _lastLid(0), _lastBucketId(0), _uniqueUser(), _uniqueBucket(){ }
+ ~VerifyBucketOrder() override;
void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, vespalib::ConstBufferRef data) override {
(void) chunkId;
EXPECT_LESS_EQUAL(_lastBucketId.toKey(), bucketId.toKey());
@@ -56,33 +62,90 @@ public:
_lastBucketId = bucketId;
EXPECT_EQUAL(0, memcmp(data.data(), createPayload(bucketId).c_str(), data.size()));
}
- ~VerifyBucketOrder() override;
+
private:
uint32_t _lastLid;
BucketId _lastBucketId;
vespalib::hash_set<uint32_t> _uniqueUser;
vespalib::hash_set<uint64_t> _uniqueBucket;
+
};
VerifyBucketOrder::~VerifyBucketOrder() = default;
+struct StoreIndex : public StoreByBucket::StoreIndex {
+ ~StoreIndex() override;
+ void store(const StoreByBucket::Index &index) override {
+ _where.push_back(index);
+ }
+ std::vector<StoreByBucket::Index> _where;
+};
+StoreIndex::~StoreIndex() = default;
+
+struct Iterator : public StoreByBucket::IndexIterator {
+ explicit Iterator(const std::vector<StoreByBucket::Index> & where) : _where(where), _current(0) {}
+
+ bool has_next() noexcept override {
+ return _current < _where.size();
+ }
+
+ StoreByBucket::Index next() noexcept override {
+ return _where[_current++];
+ }
+
+ const std::vector<StoreByBucket::Index> & _where;
+ uint32_t _current;
+};
+
TEST("require that StoreByBucket gives bucket by bucket and ordered within")
{
std::mutex backing_lock;
vespalib::MemoryDataStore backing(vespalib::alloc::Alloc::alloc(256), &backing_lock);
vespalib::ThreadStackExecutor executor(8);
- StoreByBucket sbb(backing, executor, CompressionConfig::LZ4);
- for (size_t i(1); i <=500; i++) {
+ StoreIndex storeIndex;
+ StoreByBucket sbb(storeIndex, backing, executor, CompressionConfig::LZ4);
+ for (size_t i(1); i <= 500u; i++) {
add(sbb, i);
}
- for (size_t i(1000); i > 500; i--) {
+ for (size_t i(1000); i > 500u; i--) {
add(sbb, i);
}
sbb.close();
- EXPECT_EQUAL(32u, sbb.getBucketCount());
- EXPECT_EQUAL(1000u, sbb.getLidCount());
+ std::sort(storeIndex._where.begin(), storeIndex._where.end());
+ EXPECT_EQUAL(1000u, storeIndex._where.size());
VerifyBucketOrder vbo;
- sbb.drain(vbo);
+ Iterator all(storeIndex._where);
+ sbb.drain(vbo, all);
+}
+
+constexpr uint32_t NUM_PARTS = 3;
+
+void
+verifyIter(BucketIndexStore &store, uint32_t partId, uint32_t expected_count) {
+ auto iter = store.createIterator(partId);
+ uint32_t count(0);
+ while (iter->has_next()) {
+ StoreByBucket::Index idx = iter->next();
+ EXPECT_EQUAL(store.toPartitionId(idx._bucketId), partId);
+ count++;
+ }
+ EXPECT_EQUAL(expected_count, count);
+}
+
+TEST("test that iterators cover the whole corpus and maps to correct partid") {
+
+ BucketIndexStore bucketIndexStore(32, NUM_PARTS);
+ for (size_t i(1); i <= 500u; i++) {
+ bucketIndexStore.store(StoreByBucket::Index(createBucketId(i), 1, 2, i));
+ }
+ bucketIndexStore.prepareForIterate();
+ EXPECT_EQUAL(500u, bucketIndexStore.getLidCount());
+ EXPECT_EQUAL(32u, bucketIndexStore.getBucketCount());
+ constexpr uint32_t COUNT_0 = 175, COUNT_1 = 155, COUNT_2 = 170;
+ verifyIter(bucketIndexStore, 0, COUNT_0);
+ verifyIter(bucketIndexStore, 1, COUNT_1);
+ verifyIter(bucketIndexStore, 2, COUNT_2);
+ EXPECT_EQUAL(500u, COUNT_0 + COUNT_1 + COUNT_2);
}
TEST_MAIN() { TEST_RUN_ALL(); }
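
Why the explicit std::sort in the test above: with this change StoreByBucket::close()
no longer sorts the index itself; ordering is now the owner's job (in production,
BucketIndexStore::prepareForIterate()). A minimal sketch of that collect-then-sort
contract, using plain std types as stand-ins for the Vespa classes and an invented
toy bucket hash:

// Standalone sketch (not the Vespa classes): collect index entries through a
// callback, sort once, then verify that draining in that order is bucket-ordered.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct Index {
    uint64_t bucketKey;   // stands in for BucketId::toKey()
    uint32_t lid;
    bool operator<(const Index & rhs) const noexcept { return bucketKey < rhs.bucketKey; }
};

struct StoreIndex {       // mirrors the StoreByBucket::StoreIndex callback
    void store(const Index & idx) { _where.push_back(idx); }
    std::vector<Index> _where;
};

int main() {
    StoreIndex storeIndex;
    for (uint32_t lid = 1; lid <= 1000; ++lid) {
        // Toy bucket hash over 32 buckets; adds arrive in lid order, not bucket order.
        storeIndex.store(Index{(lid * 2654435761u) % 32, lid});
    }
    std::sort(storeIndex._where.begin(), storeIndex._where.end()); // prepareForIterate()
    for (size_t i = 1; i < storeIndex._where.size(); ++i) {
        assert(storeIndex._where[i - 1].bucketKey <= storeIndex._where[i].bucketKey);
    }
    return 0;
}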
diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.cpp b/searchlib/src/vespa/searchlib/docstore/compacter.cpp
index 693b04bb96e..91faafc2a4e 100644
--- a/searchlib/src/vespa/searchlib/docstore/compacter.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/compacter.cpp
@@ -4,6 +4,7 @@
#include "logdatastore.h"
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/array.hpp>
+#include <cassert>
#include <vespa/log/log.h>
LOG_SETUP(".searchlib.docstore.compacter");
@@ -13,7 +14,7 @@ namespace search::docstore {
using vespalib::alloc::Alloc;
namespace {
- static constexpr size_t INITIAL_BACKING_BUFFER_SIZE = 64_Mi;
+ constexpr size_t INITIAL_BACKING_BUFFER_SIZE = 64_Mi;
}
void
@@ -23,24 +24,84 @@ Compacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef
_ds.write(std::move(guard), fileId, lid, data);
}
+BucketIndexStore::BucketIndexStore(size_t maxSignificantBucketBits, uint32_t numPartitions) noexcept
+ : _inSignificantBucketBits((maxSignificantBucketBits > 8) ? (maxSignificantBucketBits - 8) : 0),
+ _where(),
+ _numPartitions(numPartitions),
+ _readyForIterate(true)
+{}
+BucketIndexStore::~BucketIndexStore() = default;
+
+void
+BucketIndexStore::prepareForIterate() {
+ std::sort(_where.begin(), _where.end());
+ _readyForIterate = true;
+}
+
+void
+BucketIndexStore::store(const StoreByBucket::Index & index) {
+ _where.push_back(index);
+ _readyForIterate = false;
+}
+
+size_t
+BucketIndexStore::getBucketCount() const noexcept {
+ if (_where.empty()) return 0;
+
+ size_t count = 0;
+ document::BucketId prev = _where.front()._bucketId;
+ for (const auto & lid : _where) {
+ if (lid._bucketId != prev) {
+ count++;
+ prev = lid._bucketId;
+ }
+ }
+ return count + 1;
+}
+
+std::unique_ptr<StoreByBucket::IndexIterator>
+BucketIndexStore::createIterator(uint32_t partitionId) const {
+ assert(_readyForIterate);
+ return std::make_unique<LidIterator>(*this, partitionId);
+}
+
+BucketIndexStore::LidIterator::LidIterator(const BucketIndexStore & store, size_t partitionId)
+ : _store(store),
+ _partitionId(partitionId),
+ _current(_store._where.begin())
+{}
+
+bool
+BucketIndexStore::LidIterator::has_next() noexcept {
+ for (;(_current != _store._where.end()) && (_store.toPartitionId(_current->_bucketId) != _partitionId); _current++);
+ return (_current != _store._where.end()) && (_store.toPartitionId(_current->_bucketId) == _partitionId);
+}
+
+StoreByBucket::Index
+BucketIndexStore::LidIterator::next() noexcept {
+ return *_current++;
+}
+
BucketCompacter::BucketCompacter(size_t maxSignificantBucketBits, CompressionConfig compression, LogDataStore & ds,
- Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination) :
- _unSignificantBucketBits((maxSignificantBucketBits > 8) ? (maxSignificantBucketBits - 8) : 0),
- _sourceFileId(source),
- _destinationFileId(destination),
- _ds(ds),
- _bucketizer(bucketizer),
- _lock(),
- _backingMemory(Alloc::alloc(INITIAL_BACKING_BUFFER_SIZE), &_lock),
- _tmpStore(),
- _lidGuard(ds.getLidReadGuard()),
- _stat()
+ Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination)
+ : _sourceFileId(source),
+ _destinationFileId(destination),
+ _ds(ds),
+ _bucketizer(bucketizer),
+ _lock(),
+ _backingMemory(Alloc::alloc(INITIAL_BACKING_BUFFER_SIZE), &_lock),
+ _bucketIndexStore(maxSignificantBucketBits, NUM_PARTITIONS),
+ _tmpStore(),
+ _lidGuard(ds.getLidReadGuard()),
+ _stat()
{
for (auto & partition : _tmpStore) {
- partition = std::make_unique<StoreByBucket>(_backingMemory, executor, compression);
+ partition = std::make_unique<StoreByBucket>(_bucketIndexStore, _backingMemory, executor, compression);
}
}
+BucketCompacter::~BucketCompacter() = default;
+
FileChunk::FileId
BucketCompacter::getDestinationId(const LockGuard & guard) const {
return (_destinationFileId.isActive()) ? _ds.getActiveFileId(guard) : _destinationFileId;
@@ -51,28 +112,24 @@ BucketCompacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBuf
{
guard.unlock();
BucketId bucketId = (data.size() > 0) ? _bucketizer.getBucketOf(_bucketizer.getGuard(), lid) : BucketId();
- uint64_t sortableBucketId = bucketId.toKey();
- _tmpStore[(sortableBucketId >> _unSignificantBucketBits) % _tmpStore.size()]->add(bucketId, chunkId, lid, data);
+ _tmpStore[_bucketIndexStore.toPartitionId(bucketId)]->add(bucketId, chunkId, lid, data);
}
void
BucketCompacter::close()
{
- size_t lidCount1(0);
- size_t bucketCount(0);
size_t chunkCount(0);
for (const auto & store : _tmpStore) {
store->close();
- lidCount1 += store->getLidCount();
- bucketCount += store->getBucketCount();
chunkCount += store->getChunkCount();
}
+ _bucketIndexStore.prepareForIterate();
LOG(info, "Have read %ld lids and placed them in %ld buckets. Temporarily compressed in %ld chunks.",
- lidCount1, bucketCount, chunkCount);
+ _bucketIndexStore.getLidCount(), _bucketIndexStore.getBucketCount(), chunkCount);
- for (auto & store_ref : _tmpStore) {
- auto store = std::move(store_ref);
- store->drain(*this);
+ for (size_t partId(0); partId < _tmpStore.size(); partId++) {
+ auto partIterator = _bucketIndexStore.createIterator(partId);
+ _tmpStore[partId]->drain(*this, *partIterator);
}
// All partitions using _backingMemory should be destructed before clearing.
_backingMemory.clear();
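
BucketIndexStore keeps one sorted vector for all partitions, and each LidIterator
skip-scans it for the entries whose bucket routes to its partition, so every
partition sees a bucket-ordered subsequence. A standalone sketch of the shift/modulo
routing and the skip-scan; the 32-bit/8-bit constants mirror the code above, while
the bucket keys are invented for illustration:

// Sketch of BucketIndexStore::toPartitionId() and the LidIterator skip-scan.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

constexpr uint32_t NUM_PARTITIONS = 3;
// As in the constructor: maxSignificantBucketBits = 32, minus the 8 low bits.
constexpr size_t IN_SIGNIFICANT_BITS = 32 - 8;

uint32_t toPartitionId(uint64_t bucketKey) {
    return (bucketKey >> IN_SIGNIFICANT_BITS) % NUM_PARTITIONS;
}

// Mirrors LidIterator: has_next() advances past entries owned by other partitions.
struct LidIterator {
    const std::vector<uint64_t> & _where;
    uint32_t _partitionId;
    size_t _current;
    bool has_next() {
        while ((_current < _where.size()) && (toPartitionId(_where[_current]) != _partitionId)) {
            ++_current;
        }
        return _current < _where.size();
    }
    uint64_t next() { return _where[_current++]; }
};

int main() {
    std::vector<uint64_t> where = {7, 1ull << 25, 3ull << 24, 1ull << 30, 5, 9ull << 26};
    std::sort(where.begin(), where.end());  // prepareForIterate(): one global sort
    for (uint32_t part = 0; part < NUM_PARTITIONS; ++part) {
        LidIterator it{where, part, 0};
        while (it.has_next()) {
            std::printf("partition %u <- bucket key %llu\n", part, (unsigned long long) it.next());
        }
    }
    return 0;
}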
diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.h b/searchlib/src/vespa/searchlib/docstore/compacter.h
index 880340950e9..f44d7c341d2 100644
--- a/searchlib/src/vespa/searchlib/docstore/compacter.h
+++ b/searchlib/src/vespa/searchlib/docstore/compacter.h
@@ -16,21 +16,52 @@ namespace search::docstore {
class Compacter : public IWriteData
{
public:
- Compacter(LogDataStore & ds) : _ds(ds) { }
+ explicit Compacter(LogDataStore & ds) : _ds(ds) { }
void write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override;
void close() override { }
-
private:
LogDataStore & _ds;
};
+class BucketIndexStore : public StoreByBucket::StoreIndex {
+public:
+ BucketIndexStore(size_t maxSignificantBucketBits, uint32_t numPartitions) noexcept;
+ ~BucketIndexStore() override;
+ size_t toPartitionId(document::BucketId bucketId) const noexcept {
+ uint64_t sortableBucketId = bucketId.toKey();
+ return (sortableBucketId >> _inSignificantBucketBits) % _numPartitions;
+ }
+ void store(const StoreByBucket::Index & index) override;
+ size_t getBucketCount() const noexcept;
+ size_t getLidCount() const noexcept { return _where.size(); }
+ void prepareForIterate();
+ std::unique_ptr<StoreByBucket::IndexIterator> createIterator(uint32_t partitionId) const;
+private:
+ using IndexVector = std::vector<StoreByBucket::Index, vespalib::allocator_large<StoreByBucket::Index>>;
+ class LidIterator : public StoreByBucket::IndexIterator {
+ public:
+ LidIterator(const BucketIndexStore & bc, size_t partitionId);
+ bool has_next() noexcept override;
+ StoreByBucket::Index next() noexcept override;
+ private:
+ const BucketIndexStore & _store;
+ size_t _partitionId;
+ IndexVector::const_iterator _current;
+ };
+ size_t _inSignificantBucketBits;
+ IndexVector _where;
+ uint32_t _numPartitions;
+ bool _readyForIterate;
+};
+
/**
* This will split the incoming data into buckets.
* The buckets' data will then be written out in bucket order.
* The buckets will be ordered, and the objects inside the buckets will be further ordered.
* All data are kept compressed to minimize memory usage.
**/
-class BucketCompacter : public IWriteData, public StoreByBucket::IWrite
+class BucketCompacter : public IWriteData,
+ public StoreByBucket::IWrite
{
using CompressionConfig = vespalib::compression::CompressionConfig;
using Executor = vespalib::Executor;
@@ -38,6 +69,7 @@ public:
using FileId = FileChunk::FileId;
BucketCompacter(size_t maxSignificantBucketBits, CompressionConfig compression, LogDataStore & ds,
Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination);
+ ~BucketCompacter() override;
void write(LockGuard guard, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override;
void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) override;
void close() override;
@@ -46,15 +78,15 @@ private:
using GenerationHandler = vespalib::GenerationHandler;
using Partitions = std::array<std::unique_ptr<StoreByBucket>, NUM_PARTITIONS>;
FileId getDestinationId(const LockGuard & guard) const;
- size_t _unSignificantBucketBits;
- FileId _sourceFileId;
- FileId _destinationFileId;
- LogDataStore & _ds;
- const IBucketizer & _bucketizer;
- std::mutex _lock;
- vespalib::MemoryDataStore _backingMemory;
- Partitions _tmpStore;
- GenerationHandler::Guard _lidGuard;
+ FileId _sourceFileId;
+ FileId _destinationFileId;
+ LogDataStore & _ds;
+ const IBucketizer & _bucketizer;
+ std::mutex _lock;
+ vespalib::MemoryDataStore _backingMemory;
+ BucketIndexStore _bucketIndexStore;
+ Partitions _tmpStore;
+ GenerationHandler::Guard _lidGuard;
vespalib::hash_map<uint64_t, uint32_t> _stat;
};
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
index 9d77698ff6c..f26afc595f3 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
@@ -13,10 +13,10 @@ using document::BucketId;
using vespalib::CpuUsage;
using vespalib::makeLambdaTask;
-StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept
+StoreByBucket::StoreByBucket(StoreIndex & storeIndex, MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept
: _chunkSerial(0),
_current(),
- _where(),
+ _storeIndex(storeIndex),
_backingMemory(backingMemory),
_executor(executor),
_lock(),
@@ -43,7 +43,7 @@ StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBuffe
_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT));
}
_current->append(lid, data);
- _where.emplace_back(bucketId, _current->getId(), chunkId, lid);
+ _storeIndex.store(Index(bucketId, _current->getId(), chunkId, lid));
}
Chunk::UP
@@ -53,7 +53,7 @@ StoreByBucket::createChunk()
}
size_t
-StoreByBucket::getChunkCount() const {
+StoreByBucket::getChunkCount() const noexcept {
std::lock_guard guard(_lock);
return _chunks.size();
}
@@ -94,26 +94,10 @@ StoreByBucket::close() {
});
_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT));
waitAllProcessed();
- std::sort(_where.begin(), _where.end());
-}
-
-size_t
-StoreByBucket::getBucketCount() const {
- if (_where.empty()) return 0;
-
- size_t count = 0;
- BucketId prev = _where.front()._bucketId;
- for (const auto & lid : _where) {
- if (lid._bucketId != prev) {
- count++;
- prev = lid._bucketId;
- }
- }
- return count + 1;
}
void
-StoreByBucket::drain(IWrite & drainer)
+StoreByBucket::drain(IWrite & drainer, IndexIterator & indexIterator)
{
std::vector<Chunk::UP> chunks;
chunks.resize(_chunks.size());
@@ -122,8 +106,9 @@ StoreByBucket::drain(IWrite & drainer)
chunks[it.first] = std::make_unique<Chunk>(it.first, buf.data(), buf.size());
}
_chunks.clear();
- for (auto & idx : _where) {
- vespalib::ConstBufferRef data(chunks[idx._id]->getLid(idx._lid));
+ while (indexIterator.has_next()) {
+ Index idx = indexIterator.next();
+ vespalib::ConstBufferRef data(chunks[idx._localChunkId]->getLid(idx._lid));
drainer.write(idx._bucketId, idx._chunkId, idx._lid, data);
}
}
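
The reshaped drain() no longer owns its ordering: it walks whatever iterator it
is handed and resolves each Index to a payload via _localChunkId (which temporary
chunk) and _lid (which entry), forwarding (bucketId, chunkId, lid, data) to the
writer. A simplified, runnable sketch of that lookup loop, with map-backed
stand-ins invented for the decompressed chunks and the writer:

// Sketch of the drain() loop over plain std types.
#include <cstdint>
#include <cstdio>
#include <map>
#include <string>
#include <vector>

struct Index {
    uint64_t bucketId;      // stands in for document::BucketId
    uint32_t localChunkId;  // which temporary chunk holds the blob
    uint32_t chunkId;       // original chunk id, passed through to the writer
    uint32_t lid;
};

int main() {
    // Decompressed temporary chunks: localChunkId -> (lid -> payload).
    std::map<uint32_t, std::map<uint32_t, std::string>> chunks = {
        {0, {{1, "doc-1"}, {2, "doc-2"}}},
        {1, {{3, "doc-3"}}},
    };
    // Already bucket-ordered, as guaranteed by the iterator handed to drain().
    std::vector<Index> sortedIndexes = {
        {5, 0, 7, 1}, {5, 1, 7, 3}, {9, 0, 8, 2},
    };
    for (const Index & idx : sortedIndexes) {  // indexIterator.has_next()/next()
        const std::string & data = chunks[idx.localChunkId][idx.lid];
        // drainer.write(bucketId, chunkId, lid, data)
        std::printf("bucket %llu chunk %u lid %u -> %s\n",
                    (unsigned long long) idx.bucketId, idx.chunkId, idx.lid, data.c_str());
    }
    return 0;
}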
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.h b/searchlib/src/vespa/searchlib/docstore/storebybucket.h
index 4f51a7970e7..ea1c6e766e0 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.h
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.h
@@ -23,51 +23,55 @@ class StoreByBucket
using ConstBufferRef = vespalib::ConstBufferRef;
using CompressionConfig = vespalib::compression::CompressionConfig;
public:
- StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, CompressionConfig compression) noexcept;
+ struct Index {
+ using BucketId=document::BucketId;
+ Index(BucketId bucketId, uint32_t localChunkId, uint32_t chunkId, uint32_t entry) noexcept :
+ _bucketId(bucketId), _localChunkId(localChunkId), _chunkId(chunkId), _lid(entry)
+ { }
+ bool operator < (const Index & b) const noexcept {
+ return BucketId::bucketIdToKey(_bucketId.getRawId()) < BucketId::bucketIdToKey(b._bucketId.getRawId());
+ }
+ BucketId _bucketId;
+ uint32_t _localChunkId;
+ uint32_t _chunkId;
+ uint32_t _lid;
+ };
+ using IndexVector = std::vector<Index, vespalib::allocator_large<Index>>;
+ struct IWrite {
+ using BucketId=document::BucketId;
+ virtual ~IWrite() = default;
+ virtual void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) = 0;
+ };
+ struct IndexIterator {
+ virtual ~IndexIterator() = default;
+ virtual bool has_next() noexcept = 0;
+ virtual Index next() noexcept = 0;
+ };
+ struct StoreIndex {
+ virtual ~StoreIndex() = default;
+ virtual void store(const Index & index) = 0;
+ };
+ StoreByBucket(StoreIndex & storeIndex, MemoryDataStore & backingMemory,
+ Executor & executor, CompressionConfig compression) noexcept;
//TODO Putting the below move constructor into cpp file fails for some unknown reason. Needs to be resolved.
StoreByBucket(StoreByBucket &&) noexcept = delete;
StoreByBucket(const StoreByBucket &) = delete;
StoreByBucket & operator=(StoreByBucket &&) noexcept = delete;
StoreByBucket & operator = (const StoreByBucket &) = delete;
~StoreByBucket();
- class IWrite {
- public:
- using BucketId=document::BucketId;
- virtual ~IWrite() = default;
- virtual void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data) = 0;
- };
void add(document::BucketId bucketId, uint32_t chunkId, uint32_t lid, ConstBufferRef data);
void close();
/// close() must have been called prior to calling drain()
- void drain(IWrite & drain);
- size_t getBucketCount() const;
-
- size_t getChunkCount() const;
- size_t getLidCount() const {
- return _where.size();
- }
+ void drain(IWrite & drain, IndexIterator & iterator);
+ size_t getChunkCount() const noexcept;
private:
void incChunksPosted();
void waitAllProcessed();
Chunk::UP createChunk();
void closeChunk(Chunk::UP chunk);
- struct Index {
- using BucketId=document::BucketId;
- Index(BucketId bucketId, uint32_t id, uint32_t chunkId, uint32_t entry) noexcept :
- _bucketId(bucketId), _id(id), _chunkId(chunkId), _lid(entry)
- { }
- bool operator < (const Index & b) const noexcept {
- return BucketId::bucketIdToKey(_bucketId.getRawId()) < BucketId::bucketIdToKey(b._bucketId.getRawId());
- }
- BucketId _bucketId;
- uint32_t _id;
- uint32_t _chunkId;
- uint32_t _lid;
- };
- using IndexVector = std::vector<Index, vespalib::allocator_large<Index>>;
uint64_t _chunkSerial;
Chunk::UP _current;
- IndexVector _where;
+ StoreIndex & _storeIndex;
MemoryDataStore & _backingMemory;
Executor & _executor;
mutable std::mutex _lock;