summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@oath.com>2018-06-17 22:35:07 +0200
committerHenning Baldersheim <balder@oath.com>2018-06-17 22:35:07 +0200
commit8926abb4432ca8ab0ccae3ee6e9b4f376d37ecbf (patch)
tree83d4f7131db9fe5b52da1c2cb7f713f034da4875 /searchcore
parent8076e0074b86d5d4e44bc459593ab1b54f744a7c (diff)
- Remove the execute(string, ...) and force the use of ExecutorId.
- Remove some double bookkeeping in AttributeWriter. - Ensure that we always use attribute.getNamePrefix() to compute executor id.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp2
-rw-r--r--searchcore/src/tests/proton/docsummary/docsummary.cpp39
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp37
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp7
8 files changed, 50 insertions, 65 deletions
diff --git a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp
index 6d69d9b225b..c5ae0f97875 100644
--- a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp
+++ b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp
@@ -37,7 +37,7 @@ TEST_F("require that attribute write thread is blocked while guard is held", Fix
{
ReadGuard::UP guard = f.accessor.takeGuard();
Gate gate;
- f.writer.execute("myattr", [&gate]() { gate.countDown(); });
+ f.writer.execute(f.writer.getExecutorId(f.attribute->getNamePrefix()), [&gate]() { gate.countDown(); });
bool reachedZero = gate.await(100);
EXPECT_FALSE(reachedZero);
EXPECT_EQUAL(1u, gate.getCount());
diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp
index 7eae1c9d12d..b2bf768052b 100644
--- a/searchcore/src/tests/proton/docsummary/docsummary.cpp
+++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp
@@ -829,20 +829,16 @@ Test::requireThatAttributesAreUsed()
"bi:[]}", *rep, 1, false));
TEST_DO(assertTensor(Tensor::UP(), "bj", *rep, 1, rclass));
- proton::IAttributeManager::SP attributeManager =
- dc._ddb->getReadySubDB()->getAttributeManager();
- search::ISequencedTaskExecutor &attributeFieldWriter =
- attributeManager->getAttributeFieldWriter();
- search::AttributeVector *bjAttr =
- attributeManager->getWritableAttribute("bj");
- search::tensor::TensorAttribute *bjTensorAttr =
- dynamic_cast<search::tensor::TensorAttribute *>(bjAttr);
-
- attributeFieldWriter.
- execute("bj",
- [&]() { bjTensorAttr->setTensor(3,
- *createTensor({ {{{"x", "a"},{"y", "b"}}, 4} }, { "x"}));
- bjTensorAttr->commit(); });
+ proton::IAttributeManager::SP attributeManager = dc._ddb->getReadySubDB()->getAttributeManager();
+ search::ISequencedTaskExecutor &attributeFieldWriter = attributeManager->getAttributeFieldWriter();
+ search::AttributeVector *bjAttr = attributeManager->getWritableAttribute("bj");
+ auto bjTensorAttr = dynamic_cast<search::tensor::TensorAttribute *>(bjAttr);
+
+ attributeFieldWriter.execute(attributeFieldWriter.getExecutorId(bjAttr->getNamePrefix()),
+ [&]() {
+ bjTensorAttr->setTensor(3, *createTensor({ {{{"x", "a"},{"y", "b"}}, 4} }, { "x"}));
+ bjTensorAttr->commit();
+ });
attributeFieldWriter.sync();
DocsumReply::UP rep2 = dc._ddb->getDocsums(req);
@@ -961,8 +957,7 @@ Test::requireThatUrisAreUsed()
Document::UP exp = bc._bld.startDocument("doc::0").
startIndexField("urisingle").
startSubField("all").
- addUrlTokenizedString(
- "http://www.example.com:81/fluke?ab=2#4").
+ addUrlTokenizedString("http://www.example.com:81/fluke?ab=2#4").
endSubField().
startSubField("scheme").
addUrlTokenizedString("http").
@@ -986,8 +981,7 @@ Test::requireThatUrisAreUsed()
startIndexField("uriarray").
startElement(1).
startSubField("all").
- addUrlTokenizedString(
- "http://www.example.com:82/fluke?ab=2#8").
+ addUrlTokenizedString("http://www.example.com:82/fluke?ab=2#8").
endSubField().
startSubField("scheme").
addUrlTokenizedString("http").
@@ -1010,8 +1004,7 @@ Test::requireThatUrisAreUsed()
endElement().
startElement(1).
startSubField("all").
- addUrlTokenizedString(
- "http://www.flickr.com:82/fluke?ab=2#9").
+ addUrlTokenizedString("http://www.flickr.com:82/fluke?ab=2#9").
endSubField().
startSubField("scheme").
addUrlTokenizedString("http").
@@ -1036,8 +1029,7 @@ Test::requireThatUrisAreUsed()
startIndexField("uriwset").
startElement(4).
startSubField("all").
- addUrlTokenizedString(
- "http://www.example.com:83/fluke?ab=2#12").
+ addUrlTokenizedString("http://www.example.com:83/fluke?ab=2#12").
endSubField().
startSubField("scheme").
addUrlTokenizedString("http").
@@ -1060,8 +1052,7 @@ Test::requireThatUrisAreUsed()
endElement().
startElement(7).
startSubField("all").
- addUrlTokenizedString(
- "http://www.flickr.com:85/fluke?ab=2#13").
+ addUrlTokenizedString("http://www.flickr.com:85/fluke?ab=2#13").
endSubField().
startSubField("scheme").
addUrlTokenizedString("http").
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 352a9aa1bb8..2e9b512706c 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -419,7 +419,7 @@ AttributeWriter::setupWriteContexts()
{
std::vector<FieldContext> fieldContexts;
assert(_writeContexts.empty());
- for (auto attr : _writableAttributes) {
+ for (auto attr : getWritableAttributes()) {
fieldContexts.emplace_back(_attributeFieldWriter, attr);
}
std::sort(fieldContexts.begin(), fieldContexts.end());
@@ -476,19 +476,23 @@ AttributeWriter::internalRemove(SerialNum serialNum, DocumentIdT lid, bool immed
AttributeWriter::AttributeWriter(const proton::IAttributeManager::SP &mgr)
: _mgr(mgr),
_attributeFieldWriter(mgr->getAttributeFieldWriter()),
- _writableAttributes(mgr->getWritableAttributes()),
_writeContexts(),
_dataType(nullptr),
_hasStructFieldAttribute(false),
_attrMap()
{
setupWriteContexts();
+ setupAttriuteMapping();
+}
+
+void AttributeWriter::setupAttriuteMapping() {
for (auto attr : getWritableAttributes()) {
vespalib::stringref name = attr->getName();
_attrMap[name] = AttrWithId(attr, _attributeFieldWriter.getExecutorId(getPrefix(name)));
- }
+ }
}
+
AttributeWriter::~AttributeWriter()
{
_attributeFieldWriter.sync();
@@ -585,11 +589,10 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document
void
AttributeWriter::heartBeat(SerialNum serialNum)
{
- for (auto attrp : _writableAttributes) {
- auto &attr = *attrp;
- _attributeFieldWriter.execute(attr.getName(),
- [serialNum, &attr]()
- { applyHeartBeat(serialNum, attr); });
+ for (auto entry : _attrMap) {
+ _attributeFieldWriter.execute(entry.second.second,
+ [serialNum, attr=entry.second.first]()
+ { applyHeartBeat(serialNum, *attr); });
}
}
@@ -614,11 +617,10 @@ AttributeWriter::forceCommit(SerialNum serialNum, OnWriteDoneType onWriteDone)
void
AttributeWriter::onReplayDone(uint32_t docIdLimit)
{
- for (auto attrp : _writableAttributes) {
- auto &attr = *attrp;
- _attributeFieldWriter.execute(attr.getName(),
- [docIdLimit, &attr]()
- { applyReplayDone(docIdLimit, attr); });
+ for (auto entry : _attrMap) {
+ _attributeFieldWriter.execute(entry.second.second,
+ [docIdLimit, attr = entry.second.first]()
+ { applyReplayDone(docIdLimit, *attr); });
}
_attributeFieldWriter.sync();
}
@@ -627,12 +629,11 @@ AttributeWriter::onReplayDone(uint32_t docIdLimit)
void
AttributeWriter::compactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum)
{
- for (auto attrp : _writableAttributes) {
- auto &attr = *attrp;
+ for (auto entry : _attrMap) {
_attributeFieldWriter.
- execute(attr.getName(),
- [wantedLidLimit, serialNum, &attr]()
- { applyCompactLidSpace(wantedLidLimit, serialNum, attr); });
+ execute(entry.second.second,
+ [wantedLidLimit, serialNum, attr=entry.second.first]()
+ { applyCompactLidSpace(wantedLidLimit, serialNum, *attr); });
}
_attributeFieldWriter.sync();
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
index c3d7b470045..4ea7f3fda6c 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
@@ -26,7 +26,6 @@ private:
typedef document::FieldValue FieldValue;
const IAttributeManager::SP _mgr;
search::ISequencedTaskExecutor &_attributeFieldWriter;
- const std::vector<search::AttributeVector *> &_writableAttributes;
using ExecutorId = search::ISequencedTaskExecutor::ExecutorId;
public:
class WriteField
@@ -67,6 +66,7 @@ private:
AttrMap _attrMap;
void setupWriteContexts();
+ void setupAttriuteMapping();
void buildFieldPaths(const DocumentType &docType, const DataType *dataType);
void internalPut(SerialNum serialNum, const Document &doc, DocumentIdT lid,
bool immediateCommit, bool allAttributes, OnWriteDoneType onWriteDone);
@@ -77,13 +77,13 @@ public:
AttributeWriter(const proton::IAttributeManager::SP &mgr);
~AttributeWriter();
+ /* Only for in tests that add attributes after AttributeWriter construction. */
+
/**
* Implements IAttributeWriter.
*/
- std::vector<search::AttributeVector *>
- getWritableAttributes() const override;
- search::AttributeVector *
- getWritableAttribute(const vespalib::string &name) const override;
+ std::vector<search::AttributeVector *> getWritableAttributes() const override;
+ search::AttributeVector *getWritableAttribute(const vespalib::string &name) const override;
void put(SerialNum serialNum, const Document &doc, DocumentIdT lid,
bool immediateCommit, OnWriteDoneType onWriteDone) override;
void remove(SerialNum serialNum, DocumentIdT lid,
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
index ef818f7b407..bcc7d2f1359 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
@@ -569,16 +569,15 @@ AttributeManager::getWritableAttributes() const
void
-AttributeManager::asyncForEachAttribute(std::shared_ptr<IAttributeFunctor>
- func) const
+AttributeManager::asyncForEachAttribute(std::shared_ptr<IAttributeFunctor> func) const
{
for (const auto &attr : _attributes) {
if (attr.second.isExtra()) {
continue;
}
AttributeVector::SP attrsp = attr.second.getAttribute();
- _attributeFieldWriter.
- execute(attr.first, [attrsp, func]() { (*func)(*attrsp); });
+ _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorId(attrsp->getNamePrefix()),
+ [attrsp, func]() { (*func)(*attrsp); });
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp b/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp
index d9a0ff3d8dd..d1d5b1c9af7 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp
@@ -37,8 +37,7 @@ ExclusiveAttributeReadAccessor(const AttributeVector::SP &attribute,
namespace {
void
-attributeWriteBlockingTask(GateSP entranceGate,
- GateSP exitGate)
+attributeWriteBlockingTask(GateSP entranceGate, GateSP exitGate)
{
entranceGate->countDown();
exitGate->await();
@@ -51,9 +50,8 @@ ExclusiveAttributeReadAccessor::takeGuard()
{
GateSP entranceGate = std::make_shared<Gate>();
GateSP exitGate = std::make_shared<Gate>();
- _attributeFieldWriter.execute(_attribute->getName(),
- [entranceGate, exitGate]()
- { attributeWriteBlockingTask(entranceGate, exitGate); });
+ _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorId(_attribute->getNamePrefix()),
+ [entranceGate, exitGate]() { attributeWriteBlockingTask(entranceGate, exitGate); });
entranceGate->await();
return std::make_unique<Guard>(*_attribute, exitGate);
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp
index d3a74bb9a98..8474efb15c9 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp
@@ -197,9 +197,8 @@ FilterAttributeManager::asyncForEachAttribute(std::shared_ptr<IAttributeFunctor>
search::AttributeVector::SP attrsp = guard.getSP();
// Name must be extracted in document db master thread or attribute
// writer thread
- vespalib::string attributeName = attrsp->getName();
- attributeFieldWriter.
- execute(attributeName, [attrsp, func]() { (*func)(*attrsp); });
+ attributeFieldWriter.execute(attributeFieldWriter.getExecutorId(attrsp->getNamePrefix()),
+ [attrsp, func]() { (*func)(*attrsp); });
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
index a658b11263a..7716fc5ee61 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp
@@ -243,10 +243,8 @@ FlushableAttribute::initFlush(SerialNum currentSerial)
// Called by document db executor
std::promise<IFlushTarget::Task::UP> promise;
std::future<IFlushTarget::Task::UP> future = promise.get_future();
- _attributeFieldWriter.execute(_attr->getName(),
- [&]() { promise.set_value(
- internalInitFlush(currentSerial));
- });
+ _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorId(_attr->getNamePrefix()),
+ [&]() { promise.set_value(internalInitFlush(currentSerial)); });
return future.get();
}
@@ -257,5 +255,4 @@ FlushableAttribute::getApproxBytesToWriteToDisk() const
return _attr->getEstimatedSaveByteSize();
}
-
} // namespace proton