diff options
author | Tor Egge <Tor.Egge@yahoo-inc.com> | 2017-05-04 15:45:55 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@yahoo-inc.com> | 2017-05-04 15:45:55 +0000 |
commit | 44df619d2a1ab78c32f4600599d2027c317c4db7 (patch) | |
tree | c02c002819c592e9daa2fe44ce341960797fb741 /searchcore | |
parent | a2aff166a08556c2d934fc5584bbc6a84b7ac42c (diff) |
Handle multiple attributes per scheduled task, to reduce task scheduling
overhead when the number of attributes is much larger than the number of
attribute write threads.
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp | 308 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h | 27 |
2 files changed, 263 insertions, 72 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 01727800a44..94d2f22f0a2 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -13,12 +13,59 @@ LOG_SETUP(".proton.server.attributeadapter"); #include <vespa/searchlib/common/isequencedtaskexecutor.h> #include <vespa/document/fieldvalue/weightedsetfieldvalue.h> #include <vespa/document/fieldvalue/arrayfieldvalue.h> +#include <type_traits> using namespace document; using namespace search; namespace proton { +AttributeWriter::WriteContext::WriteContext(uint32_t executorId) + : _executorId(executorId), + _fieldPaths(), + _attributes() +{ +} + + +AttributeWriter::WriteContext::WriteContext(WriteContext &&rhs) = default; + +AttributeWriter::WriteContext::~WriteContext() = default; + +AttributeWriter::WriteContext &AttributeWriter::WriteContext::operator=(WriteContext &&rhs) = default; + +void +AttributeWriter::WriteContext::add(AttributeVector *attr) +{ + _attributes.emplace_back(attr); + _fieldPaths.emplace_back(); +} + +void +AttributeWriter::WriteContext::buildFieldPaths(const DocumentType &docType) +{ + size_t fieldId = 0; + for (const auto &attrp : _attributes) { + vespalib::string name = attrp->getName(); + FieldPath::UP fp = docType.buildFieldPath(name); + if (!fp) { + /// Should be exception but due to incomplete unit test we can not be strict enough, must fix unit test proton/docsummary + // The above comment is actually incorrect. This is expected during reconfig as long as do not stop accepting feed while doing reconfig. + // throw std::runtime_error(vespalib::make_string("Mismatch between documentdefinition and schema. 
No field named '%s' from schema in document type '%s'", attribute.getName().c_str(), docType.getName().c_str())); + LOG(warning, + "Mismatch between documentdefinition and schema. " + "No field named '%s' from schema in document type '%s'. " + "This might happen if an attribute field has been added and you are feeding while reconfiguring", + name.c_str(), + docType.getName().c_str()); + } + assert(fieldId < _fieldPaths.size()); + _fieldPaths[fieldId] = std::move(fp); + ++fieldId; + } + assert(fieldId == _fieldPaths.size()); +} + namespace { void @@ -108,59 +155,198 @@ applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum, } } +class FieldContext +{ + vespalib::string _name; + uint32_t _executorId; + AttributeVector *_attr; + +public: + FieldContext(ISequencedTaskExecutor &writer, AttributeVector *attr); + ~FieldContext(); + bool operator<(const FieldContext &rhs) const; + uint32_t getExecutorId() const { return _executorId; } + AttributeVector *getAttribute() const { return _attr; } +}; + + +FieldContext::FieldContext(ISequencedTaskExecutor &writer, AttributeVector *attr) + : _name(attr->getName()), + _executorId(writer.getExecutorId(_name)), + _attr(attr) +{ +} + +FieldContext::~FieldContext() = default; + +bool +FieldContext::operator<(const FieldContext &rhs) const +{ + if (_executorId != rhs._executorId) { + return _executorId < rhs._executorId; + } + return _name < rhs._name; +} + +class PutTask : public vespalib::Executor::Task +{ + const AttributeWriter::WriteContext &_wc; + const SerialNum _serialNum; + const uint32_t _lid; + const bool _immediateCommit; + std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone; + std::vector<FieldValue::UP> _fieldValues; +public: + PutTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, const Document &doc, uint32_t lid, bool immediateCommit, AttributeWriter::OnWriteDoneType onWriteDone); + virtual ~PutTask() override; + virtual void run() override; +}; + +PutTask::PutTask(const 
AttributeWriter::WriteContext &wc, SerialNum serialNum, const Document &doc, uint32_t lid, bool immediateCommit, AttributeWriter::OnWriteDoneType onWriteDone) + : _wc(wc), + _serialNum(serialNum), + _lid(lid), + _immediateCommit(immediateCommit), + _onWriteDone(onWriteDone) +{ + const auto &fieldPaths = _wc.getFieldPaths(); + _fieldValues.reserve(fieldPaths.size()); + for (const auto &fieldPath : fieldPaths) { + FieldValue::UP fv; + if (fieldPath) { + fv = doc.getNestedFieldValue(fieldPath->getFullRange()); + } + _fieldValues.emplace_back(std::move(fv)); + } +} + +PutTask::~PutTask() +{ } void -AttributeWriter::buildFieldPath(const DocumentType & docType, const DataType *dataType) +PutTask::run() { - _fieldPaths.clear(); - _fieldPaths.resize(_writableAttributes.size()); - size_t i = 0; - for (auto attrp : _writableAttributes) { - const search::attribute::IAttributeVector & attribute(*attrp); - FieldPath::UP fp = docType.buildFieldPath(attribute.getName()); - if (fp.get() == NULL) { - /// Should be exception but due to incomplete unit test we can not be strict enough, must fix unit test proton/docsummary - // The above comment is actually incorrect. This is expected during reconfig as long as do not stop accepting feed while doing reconfig. - // throw std::runtime_error(vespalib::make_string("Mismatch between documentdefinition and schema. No field named '%s' from schema in document type '%s'", attribute.getName().c_str(), docType.getName().c_str())); - LOG(warning, - "Mismatch between documentdefinition and schema. " - "No field named '%s' from schema in document type '%s'. 
" - "This might happen if an attribute field has been added and you are feeding while reconfiguring", - attribute.getName().c_str(), - docType.getName().c_str()); + uint32_t fieldId = 0; + const auto &attributes = _wc.getAttributes(); + for (auto attrp : attributes) { + AttributeVector &attr = *attrp; + if (attr.getStatus().getLastSyncToken() < _serialNum) { + applyPutToAttribute(_serialNum, _fieldValues[fieldId], _lid, _immediateCommit, attr, _onWriteDone); } - _fieldPaths[i] = std::move(fp); - ++i; + ++fieldId; } - _dataType = dataType; +} + +class RemoveTask : public vespalib::Executor::Task +{ + const AttributeWriter::WriteContext &_wc; + const SerialNum _serialNum; + const uint32_t _lid; + const bool _immediateCommit; + std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone; +public: + RemoveTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, uint32_t lid, bool immediateCommit, AttributeWriter::OnWriteDoneType onWriteDone); + virtual ~RemoveTask() override; + virtual void run() override; +}; + + +RemoveTask::RemoveTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, uint32_t lid, bool immediateCommit, AttributeWriter::OnWriteDoneType onWriteDone) + : _wc(wc), + _serialNum(serialNum), + _lid(lid), + _immediateCommit(immediateCommit), + _onWriteDone(onWriteDone) +{ +} + +RemoveTask::~RemoveTask() +{ } void -AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentIdT lid, - bool immediateCommit, OnWriteDoneType onWriteDone) +RemoveTask::run() { - size_t fieldId = 0; - for (auto attrp : _writableAttributes) { + const auto &attributes = _wc.getAttributes(); + for (auto &attrp : attributes) { AttributeVector &attr = *attrp; - if (attr.getStatus().getLastSyncToken() >= serialNum) { - LOG(debug, "internalPut(): change already applied: serial=%" PRIu64 "" - ", docId='%s', lid=%u, attribute='%s', lastSyncToken=%" PRIu64 "", - serialNum, doc.getId().toString().c_str(), lid, attr.getName().c_str(), - 
attr.getStatus().getLastSyncToken()); - ++fieldId; - continue; // Change already applied + // Must use <= due to batch remove + if (attr.getStatus().getLastSyncToken() <= _serialNum) { + applyRemoveToAttribute(_serialNum, _lid, _immediateCommit, attr, _onWriteDone); } - const FieldPath *const fieldPath(_fieldPaths[fieldId].get()); - FieldValue::UP fv; - if (fieldPath != NULL) { - fv = doc.getNestedFieldValue(fieldPath->getFullRange()); + } +} + +class CommitTask : public vespalib::Executor::Task +{ + const AttributeWriter::WriteContext &_wc; + const SerialNum _serialNum; + std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone; +public: + CommitTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, AttributeWriter::OnWriteDoneType onWriteDone); + virtual ~CommitTask() override; + virtual void run() override; +}; + + +CommitTask::CommitTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, AttributeWriter::OnWriteDoneType onWriteDone) + : _wc(wc), + _serialNum(serialNum), + _onWriteDone(onWriteDone) +{ +} + +CommitTask::~CommitTask() +{ +} + +void +CommitTask::run() +{ + const auto &attributes = _wc.getAttributes(); + for (auto &attrp : attributes) { + AttributeVector &attr = *attrp; + applyCommit(_serialNum, _onWriteDone, attr); + } +} + +} + +void +AttributeWriter::setupWriteContexts() +{ + std::vector<FieldContext> fieldContexts; + assert(_writeContexts.empty()); + for (auto attr : _writableAttributes) { + fieldContexts.emplace_back(_attributeFieldWriter, attr); + } + std::sort(fieldContexts.begin(), fieldContexts.end()); + for (auto &fc : fieldContexts) { + if (_writeContexts.empty() || + (_writeContexts.back().getExecutorId() != fc.getExecutorId())) { + _writeContexts.emplace_back(fc.getExecutorId()); } + _writeContexts.back().add(fc.getAttribute()); + } +} - _attributeFieldWriter.execute(attr.getName(), - [serialNum, fv(std::move(fv)), lid, immediateCommit, &attr, onWriteDone]() - { applyPutToAttribute(serialNum, fv, 
lid, immediateCommit, attr, onWriteDone); }); - ++fieldId; +void +AttributeWriter::buildFieldPaths(const DocumentType & docType, const DataType *dataType) +{ + for (auto &wc : _writeContexts) { + wc.buildFieldPaths(docType); + } + _dataType = dataType; +} + +void +AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentIdT lid, + bool immediateCommit, OnWriteDoneType onWriteDone) +{ + for (const auto &wc : _writeContexts) { + auto putTask = std::make_unique<PutTask>(wc, serialNum, doc, lid, immediateCommit, onWriteDone); + _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(putTask)); } } @@ -169,32 +355,26 @@ AttributeWriter::internalRemove(SerialNum serialNum, DocumentIdT lid, bool immediateCommit, OnWriteDoneType onWriteDone) { - for (auto &attrp : _writableAttributes) { - AttributeVector &attr = *attrp; - // XXX: Want to use >=, but must use > due to batch remove - // Might be OK due to clearDoc() being idempotent. - if (attr.getStatus().getLastSyncToken() > serialNum) - continue; // Change already applied - LOG(debug, "About to remove docId %u from attribute vector '%s'.", - lid, attr.getName().c_str()); - - _attributeFieldWriter.execute(attr.getName(), - [serialNum, lid, immediateCommit, &attr, onWriteDone]() - { applyRemoveToAttribute(serialNum, lid, immediateCommit, attr, onWriteDone); }); + for (const auto &wc : _writeContexts) { + auto removeTask = std::make_unique<RemoveTask>(wc, serialNum, lid, immediateCommit, onWriteDone); + _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(removeTask)); } } AttributeWriter::AttributeWriter(const proton::IAttributeManager::SP &mgr) : _mgr(mgr), - _fieldPaths(), - _dataType(), - _fieldPathsDocTypeName(), _attributeFieldWriter(mgr->getAttributeFieldWriter()), - _writableAttributes(mgr->getWritableAttributes()) + _writableAttributes(mgr->getWritableAttributes()), + _writeContexts(), + _dataType(nullptr) { + setupWriteContexts(); } -AttributeWriter::~AttributeWriter() {} 
+AttributeWriter::~AttributeWriter() +{ + _attributeFieldWriter.sync(); +} std::vector<search::AttributeVector *> AttributeWriter::getWritableAttributes() const @@ -221,11 +401,8 @@ AttributeWriter::put(SerialNum serialNum, const Document &doc, DocumentIdT lid, lid, doc.toString(true).c_str()); const DataType *dataType(doc.getDataType()); - if (_fieldPaths.empty() || - _dataType != dataType || - doc.getType().getName() != _fieldPathsDocTypeName) { - buildFieldPath(doc.getType(), dataType); - _fieldPathsDocTypeName = doc.getType().getName(); + if (_dataType != dataType) { + buildFieldPaths(doc.getType(), dataType); } internalPut(serialNum, doc, lid, immediateCommit, onWriteDone); } @@ -293,12 +470,9 @@ AttributeWriter::heartBeat(SerialNum serialNum) void AttributeWriter::commit(SerialNum serialNum, OnWriteDoneType onWriteDone) { - for (auto attrp : _writableAttributes) { - auto &attr = *attrp; - _attributeFieldWriter.execute(attr.getName(), - [serialNum, onWriteDone, &attr]() - { applyCommit(serialNum, onWriteDone, - attr); }); + for (const auto &wc : _writeContexts) { + auto commitTask = std::make_unique<CommitTask>(wc, serialNum, onWriteDone); + _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(commitTask)); } } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h index 32b23d93e4c..446acf80c5c 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h @@ -19,15 +19,32 @@ private: typedef document::DataType DataType; typedef document::DocumentType DocumentType; typedef document::FieldValue FieldValue; - typedef std::vector<std::unique_ptr<FieldPath> > AttributeFieldPaths; const IAttributeManager::SP _mgr; - AttributeFieldPaths _fieldPaths; - const DataType *_dataType; - vespalib::string _fieldPathsDocTypeName; search::ISequencedTaskExecutor 
&_attributeFieldWriter; const std::vector<search::AttributeVector *> &_writableAttributes; +public: + class WriteContext + { + uint32_t _executorId; + std::vector<std::unique_ptr<FieldPath>> _fieldPaths; + std::vector<AttributeVector *> _attributes; + public: + WriteContext(uint32_t executorId); + WriteContext(WriteContext &&rhs); + ~WriteContext(); + WriteContext &operator=(WriteContext &&rhs); + void buildFieldPaths(const DocumentType &docType); + void add(AttributeVector *attr); + uint32_t getExecutorId() const { return _executorId; } + const std::vector<std::unique_ptr<FieldPath>> &getFieldPaths() const { return _fieldPaths; } + const std::vector<AttributeVector *> &getAttributes() const { return _attributes; } + }; +private: + std::vector<WriteContext> _writeContexts; + const DataType *_dataType; - void buildFieldPath(const DocumentType &docType, const DataType *dataType); + void setupWriteContexts(); + void buildFieldPaths(const DocumentType &docType, const DataType *dataType); void internalPut(SerialNum serialNum, const Document &doc, DocumentIdT lid, bool immediateCommit, OnWriteDoneType onWriteDone); void internalRemove(SerialNum serialNum, DocumentIdT lid, |