diff options
author | Henning Baldersheim <balder@oath.com> | 2018-06-17 22:35:07 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-06-17 22:35:07 +0200 |
commit | 8926abb4432ca8ab0ccae3ee6e9b4f376d37ecbf (patch) | |
tree | 83d4f7131db9fe5b52da1c2cb7f713f034da4875 /searchcore | |
parent | 8076e0074b86d5d4e44bc459593ab1b54f744a7c (diff) |
- Remove the execute(string, ...) and force the use of ExecutorId.
- Remove some double bookkeeping in AttributeWriter.
- Ensure that we always use attribute.getNamePrefix() to compute executor id.
Diffstat (limited to 'searchcore')
8 files changed, 50 insertions, 65 deletions
diff --git a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp index 6d69d9b225b..c5ae0f97875 100644 --- a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp +++ b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp @@ -37,7 +37,7 @@ TEST_F("require that attribute write thread is blocked while guard is held", Fix { ReadGuard::UP guard = f.accessor.takeGuard(); Gate gate; - f.writer.execute("myattr", [&gate]() { gate.countDown(); }); + f.writer.execute(f.writer.getExecutorId(f.attribute->getNamePrefix()), [&gate]() { gate.countDown(); }); bool reachedZero = gate.await(100); EXPECT_FALSE(reachedZero); EXPECT_EQUAL(1u, gate.getCount()); diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index 7eae1c9d12d..b2bf768052b 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -829,20 +829,16 @@ Test::requireThatAttributesAreUsed() "bi:[]}", *rep, 1, false)); TEST_DO(assertTensor(Tensor::UP(), "bj", *rep, 1, rclass)); - proton::IAttributeManager::SP attributeManager = - dc._ddb->getReadySubDB()->getAttributeManager(); - search::ISequencedTaskExecutor &attributeFieldWriter = - attributeManager->getAttributeFieldWriter(); - search::AttributeVector *bjAttr = - attributeManager->getWritableAttribute("bj"); - search::tensor::TensorAttribute *bjTensorAttr = - dynamic_cast<search::tensor::TensorAttribute *>(bjAttr); - - attributeFieldWriter. - execute("bj", - [&]() { bjTensorAttr->setTensor(3, - *createTensor({ {{{"x", "a"},{"y", "b"}}, 4} }, { "x"})); - bjTensorAttr->commit(); }); + proton::IAttributeManager::SP attributeManager = dc._ddb->getReadySubDB()->getAttributeManager(); + search::ISequencedTaskExecutor &attributeFieldWriter = attributeManager->getAttributeFieldWriter(); + search::AttributeVector *bjAttr = attributeManager->getWritableAttribute("bj"); + auto bjTensorAttr = dynamic_cast<search::tensor::TensorAttribute *>(bjAttr); + + attributeFieldWriter.execute(attributeFieldWriter.getExecutorId(bjAttr->getNamePrefix()), + [&]() { + bjTensorAttr->setTensor(3, *createTensor({ {{{"x", "a"},{"y", "b"}}, 4} }, { "x"})); + bjTensorAttr->commit(); + }); attributeFieldWriter.sync(); DocsumReply::UP rep2 = dc._ddb->getDocsums(req); @@ -961,8 +957,7 @@ Test::requireThatUrisAreUsed() Document::UP exp = bc._bld.startDocument("doc::0"). startIndexField("urisingle"). startSubField("all"). - addUrlTokenizedString( - "http://www.example.com:81/fluke?ab=2#4"). + addUrlTokenizedString("http://www.example.com:81/fluke?ab=2#4"). endSubField(). startSubField("scheme"). addUrlTokenizedString("http"). @@ -986,8 +981,7 @@ Test::requireThatUrisAreUsed() startIndexField("uriarray"). startElement(1). startSubField("all"). - addUrlTokenizedString( - "http://www.example.com:82/fluke?ab=2#8"). + addUrlTokenizedString("http://www.example.com:82/fluke?ab=2#8"). endSubField(). startSubField("scheme"). addUrlTokenizedString("http"). @@ -1010,8 +1004,7 @@ Test::requireThatUrisAreUsed() endElement(). startElement(1). startSubField("all"). - addUrlTokenizedString( - "http://www.flickr.com:82/fluke?ab=2#9"). + addUrlTokenizedString("http://www.flickr.com:82/fluke?ab=2#9"). endSubField(). startSubField("scheme"). addUrlTokenizedString("http"). @@ -1036,8 +1029,7 @@ Test::requireThatUrisAreUsed() startIndexField("uriwset"). startElement(4). startSubField("all"). - addUrlTokenizedString( - "http://www.example.com:83/fluke?ab=2#12"). + addUrlTokenizedString("http://www.example.com:83/fluke?ab=2#12"). endSubField(). startSubField("scheme"). addUrlTokenizedString("http"). @@ -1060,8 +1052,7 @@ Test::requireThatUrisAreUsed() endElement(). startElement(7). startSubField("all"). - addUrlTokenizedString( - "http://www.flickr.com:85/fluke?ab=2#13"). + addUrlTokenizedString("http://www.flickr.com:85/fluke?ab=2#13"). endSubField(). startSubField("scheme"). addUrlTokenizedString("http"). diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 352a9aa1bb8..2e9b512706c 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -419,7 +419,7 @@ AttributeWriter::setupWriteContexts() { std::vector<FieldContext> fieldContexts; assert(_writeContexts.empty()); - for (auto attr : _writableAttributes) { + for (auto attr : getWritableAttributes()) { fieldContexts.emplace_back(_attributeFieldWriter, attr); } std::sort(fieldContexts.begin(), fieldContexts.end()); @@ -476,19 +476,23 @@ AttributeWriter::internalRemove(SerialNum serialNum, DocumentIdT lid, bool immed AttributeWriter::AttributeWriter(const proton::IAttributeManager::SP &mgr) : _mgr(mgr), _attributeFieldWriter(mgr->getAttributeFieldWriter()), - _writableAttributes(mgr->getWritableAttributes()), _writeContexts(), _dataType(nullptr), _hasStructFieldAttribute(false), _attrMap() { setupWriteContexts(); + setupAttriuteMapping(); +} + +void AttributeWriter::setupAttriuteMapping() { for (auto attr : getWritableAttributes()) { vespalib::stringref name = attr->getName(); _attrMap[name] = AttrWithId(attr, _attributeFieldWriter.getExecutorId(getPrefix(name))); - } + } } + AttributeWriter::~AttributeWriter() { _attributeFieldWriter.sync(); @@ -585,11 +589,10 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document void AttributeWriter::heartBeat(SerialNum serialNum) { - for (auto attrp : _writableAttributes) { - auto &attr = *attrp; - _attributeFieldWriter.execute(attr.getName(), - [serialNum, &attr]() - { applyHeartBeat(serialNum, attr); }); + for (auto entry : _attrMap) { + _attributeFieldWriter.execute(entry.second.second, + [serialNum, attr=entry.second.first]() + { applyHeartBeat(serialNum, *attr); }); } } @@ -614,11 +617,10 @@ AttributeWriter::forceCommit(SerialNum serialNum, OnWriteDoneType onWriteDone) void AttributeWriter::onReplayDone(uint32_t docIdLimit) { - for (auto attrp : _writableAttributes) { - auto &attr = *attrp; - _attributeFieldWriter.execute(attr.getName(), - [docIdLimit, &attr]() - { applyReplayDone(docIdLimit, attr); }); + for (auto entry : _attrMap) { + _attributeFieldWriter.execute(entry.second.second, + [docIdLimit, attr = entry.second.first]() + { applyReplayDone(docIdLimit, *attr); }); } _attributeFieldWriter.sync(); } @@ -627,12 +629,11 @@ AttributeWriter::onReplayDone(uint32_t docIdLimit) void AttributeWriter::compactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum) { - for (auto attrp : _writableAttributes) { - auto &attr = *attrp; + for (auto entry : _attrMap) { _attributeFieldWriter. - execute(attr.getName(), - [wantedLidLimit, serialNum, &attr]() - { applyCompactLidSpace(wantedLidLimit, serialNum, attr); }); + execute(entry.second.second, + [wantedLidLimit, serialNum, attr=entry.second.first]() + { applyCompactLidSpace(wantedLidLimit, serialNum, *attr); }); } _attributeFieldWriter.sync(); } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h index c3d7b470045..4ea7f3fda6c 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h @@ -26,7 +26,6 @@ private: typedef document::FieldValue FieldValue; const IAttributeManager::SP _mgr; search::ISequencedTaskExecutor &_attributeFieldWriter; - const std::vector<search::AttributeVector *> &_writableAttributes; using ExecutorId = search::ISequencedTaskExecutor::ExecutorId; public: class WriteField @@ -67,6 +66,7 @@ private: AttrMap _attrMap; void setupWriteContexts(); + void setupAttriuteMapping(); void buildFieldPaths(const DocumentType &docType, const DataType *dataType); void internalPut(SerialNum serialNum, const Document &doc, DocumentIdT lid, bool immediateCommit, bool allAttributes, OnWriteDoneType onWriteDone); @@ -77,13 +77,13 @@ public: AttributeWriter(const proton::IAttributeManager::SP &mgr); ~AttributeWriter(); + /* Only for in tests that add attributes after AttributeWriter construction. */ + /** * Implements IAttributeWriter. */ - std::vector<search::AttributeVector *> - getWritableAttributes() const override; - search::AttributeVector * - getWritableAttribute(const vespalib::string &name) const override; + std::vector<search::AttributeVector *> getWritableAttributes() const override; + search::AttributeVector *getWritableAttribute(const vespalib::string &name) const override; void put(SerialNum serialNum, const Document &doc, DocumentIdT lid, bool immediateCommit, OnWriteDoneType onWriteDone) override; void remove(SerialNum serialNum, DocumentIdT lid, diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp index ef818f7b407..bcc7d2f1359 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp @@ -569,16 +569,15 @@ AttributeManager::getWritableAttributes() const void -AttributeManager::asyncForEachAttribute(std::shared_ptr<IAttributeFunctor> - func) const +AttributeManager::asyncForEachAttribute(std::shared_ptr<IAttributeFunctor> func) const { for (const auto &attr : _attributes) { if (attr.second.isExtra()) { continue; } AttributeVector::SP attrsp = attr.second.getAttribute(); - _attributeFieldWriter. - execute(attr.first, [attrsp, func]() { (*func)(*attrsp); }); + _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorId(attrsp->getNamePrefix()), + [attrsp, func]() { (*func)(*attrsp); }); } } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp b/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp index d9a0ff3d8dd..d1d5b1c9af7 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp @@ -37,8 +37,7 @@ ExclusiveAttributeReadAccessor(const AttributeVector::SP &attribute, namespace { void -attributeWriteBlockingTask(GateSP entranceGate, - GateSP exitGate) +attributeWriteBlockingTask(GateSP entranceGate, GateSP exitGate) { entranceGate->countDown(); exitGate->await(); @@ -51,9 +50,8 @@ ExclusiveAttributeReadAccessor::takeGuard() { GateSP entranceGate = std::make_shared<Gate>(); GateSP exitGate = std::make_shared<Gate>(); - _attributeFieldWriter.execute(_attribute->getName(), - [entranceGate, exitGate]() - { attributeWriteBlockingTask(entranceGate, exitGate); }); + _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorId(_attribute->getNamePrefix()), + [entranceGate, exitGate]() { attributeWriteBlockingTask(entranceGate, exitGate); }); entranceGate->await(); return std::make_unique<Guard>(*_attribute, exitGate); } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp index d3a74bb9a98..8474efb15c9 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp @@ -197,9 +197,8 @@ FilterAttributeManager::asyncForEachAttribute(std::shared_ptr<IAttributeFunctor> search::AttributeVector::SP attrsp = guard.getSP(); // Name must be extracted in document db master thread or attribute // writer thread - vespalib::string attributeName = attrsp->getName(); - attributeFieldWriter. - execute(attributeName, [attrsp, func]() { (*func)(*attrsp); }); + attributeFieldWriter.execute(attributeFieldWriter.getExecutorId(attrsp->getNamePrefix()), + [attrsp, func]() { (*func)(*attrsp); }); } } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp index a658b11263a..7716fc5ee61 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp @@ -243,10 +243,8 @@ FlushableAttribute::initFlush(SerialNum currentSerial) // Called by document db executor std::promise<IFlushTarget::Task::UP> promise; std::future<IFlushTarget::Task::UP> future = promise.get_future(); - _attributeFieldWriter.execute(_attr->getName(), - [&]() { promise.set_value( - internalInitFlush(currentSerial)); - }); + _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorId(_attr->getNamePrefix()), + [&]() { promise.set_value(internalInitFlush(currentSerial)); }); return future.get(); } @@ -257,5 +255,4 @@ FlushableAttribute::getApproxBytesToWriteToDisk() const return _attr->getEstimatedSaveByteSize(); } - } // namespace proton |