diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-06-11 09:03:38 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-06-11 15:38:54 +0200 |
commit | 98d4316f41c6bac1be309c8f3f9c114fcfa17715 (patch) | |
tree | d6258492f8304a4b6378d87edc40cf1d3ab27a0a /searchcore | |
parent | 020b0adf2b2ece5e79ffb832e61d5c1d58b78ea2 (diff) |
Batch attribute updates.
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp | 81 |
1 files changed, 61 insertions, 20 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 2d45d6aa02d..e08ec456795 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -119,16 +119,20 @@ applyRemoveToAttribute(SerialNum serialNum, DocumentIdT lid, bool immediateCommi void applyUpdateToAttribute(SerialNum serialNum, const FieldUpdate &fieldUpd, - DocumentIdT lid, bool immediateCommit, AttributeVector &attr, - AttributeWriter::OnWriteDoneType) + DocumentIdT lid, AttributeVector &attr) { ensureLidSpace(serialNum, lid, attr); AttrUpdate::handleUpdate(attr, lid, fieldUpd); - if (immediateCommit) { - attr.commit(serialNum, serialNum); - } } +void +applyUpdateToAttributeAndCommit(SerialNum serialNum, const FieldUpdate &fieldUpd, + DocumentIdT lid, AttributeVector &attr) +{ + ensureLidSpace(serialNum, lid, attr); + AttrUpdate::handleUpdate(attr, lid, fieldUpd); + attr.commit(serialNum, serialNum); +} void applyReplayDone(uint32_t docIdLimit, AttributeVector &attr) @@ -149,10 +153,8 @@ applyHeartBeat(SerialNum serialNum, AttributeVector &attr) } void -applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType onWriteDone, - AttributeVector &attr) +applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType , AttributeVector &attr) { - (void) onWriteDone; if (attr.getStatus().getLastSyncToken() <= serialNum) { attr.commit(serialNum, serialNum); } @@ -160,8 +162,7 @@ applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType onWriteDone, void -applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum, - AttributeVector &attr) +applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum, AttributeVector &attr) { if (attr.getStatus().getLastSyncToken() < serialNum) { /* @@ -177,6 +178,39 @@ applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum, } } +using AttrUpdates = std::vector<std::pair<AttributeVector *, const FieldUpdate *>>; + +struct BatchUpdateTask : public vespalib::Executor::Task { + + BatchUpdateTask(SerialNum serialNum, DocumentIdT lid, bool immediateCommit, + AttributeWriter::OnWriteDoneType onWriteDone) + : AttrUpdates(), + _serialNum(serialNum), + _lid(lid), + _immediateCommit(immediateCommit), + _onWriteDone(onWriteDone) + { } + + void run() override { + if (_immediateCommit) { + for (const auto & update : _updates) { + applyUpdateToAttributeAndCommit(_serialNum, *update.second, _lid, *update.first); + } + } else { + for (const auto & update : _updates) { + applyUpdateToAttribute(_serialNum, *update.second, _lid, *update.first); + } + } + } + + + AttrUpdates _updates; + SerialNum _serialNum; + DocumentIdT _lid; + bool _immediateCommit; + std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone; +}; + class FieldContext { vespalib::string _name; @@ -499,6 +533,14 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document bool immediateCommit, OnWriteDoneType onWriteDone, IFieldUpdateCallback & onUpdate) { LOG(debug, "Inspecting update for document %d.", lid); + std::vector<std::unique_ptr<BatchUpdateTask>> args; + uint32_t numExecutors = _attributeFieldWriter.getNumExecutors(); + args.reserve(numExecutors); + for (uint32_t i(0); i < numExecutors; i++) { + args.emplace_back(std::make_unique<BatchUpdateTask>(serialNum, lid, immediateCommit, onWriteDone)); + args.back()->_updates.reserve((2*upd.getUpdates().size())/numExecutors); + } + for (const auto &fupd : upd.getUpdates()) { LOG(debug, "Retrieving guard for attribute vector '%s'.", fupd.getField().getName().c_str()); AttributeVector *attrp = _mgr->getWritableAttribute(fupd.getField().getName()); @@ -507,20 +549,19 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document LOG(spam, "Failed to find attribute vector %s", fupd.getField().getName().c_str()); continue; } - AttributeVector &attr = *attrp; // TODO: Check if we must use > due to multiple entries for same // document and attribute. - if (attr.getStatus().getLastSyncToken() >= serialNum) + if (attrp->getStatus().getLastSyncToken() >= serialNum) continue; - - LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attr.getName().c_str()); - - // NOTE: The lifetime of the field update will be ensured by keeping the document update alive - // in a operation done context object. - _attributeFieldWriter.execute(attr.getName(), - [serialNum, &fupd, lid, immediateCommit, &attr, onWriteDone]() - { applyUpdateToAttribute(serialNum, fupd, lid, immediateCommit, attr, onWriteDone); }); + args[_attributeFieldWriter.getExecutorId(attrp->getName())]->_updates.emplace_back(attrp, &fupd); + LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attrp->getName().c_str()); + } + // NOTE: The lifetime of the field update will be ensured by keeping the document update alive + // in a operation done context object. + for (uint32_t id(0); id < args.size(); id++) { + _attributeFieldWriter.executeTask(id, std::move(args[id])); } + } void |