summary | refs | log | tree | commit | diff | stats
path: root/searchcore
diff options
context:
space:
mode:
author: Henning Baldersheim <balder@yahoo-inc.com> 2018-06-11 09:03:38 +0000
committer: Henning Baldersheim <balder@oath.com> 2018-06-11 15:38:54 +0200
commit: 98d4316f41c6bac1be309c8f3f9c114fcfa17715 (patch)
tree: d6258492f8304a4b6378d87edc40cf1d3ab27a0a /searchcore
parent: 020b0adf2b2ece5e79ffb832e61d5c1d58b78ea2 (diff)
Batch attribute updates.
Diffstat (limited to 'searchcore')
-rw-r--r-- searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp | 81
1 file changed, 61 insertions, 20 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 2d45d6aa02d..e08ec456795 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -119,16 +119,20 @@ applyRemoveToAttribute(SerialNum serialNum, DocumentIdT lid, bool immediateCommi
// Applies a single field update to one attribute vector for the given lid:
// calls ensureLidSpace() and then AttrUpdate::handleUpdate().
// In this change the immediateCommit flag and the unnamed OnWriteDoneType
// parameter are dropped from the signature, and the conditional
// attr.commit() is removed from the body, so this variant no longer commits.
void
applyUpdateToAttribute(SerialNum serialNum, const FieldUpdate &fieldUpd,
- DocumentIdT lid, bool immediateCommit, AttributeVector &attr,
- AttributeWriter::OnWriteDoneType)
+ DocumentIdT lid, AttributeVector &attr)
{
ensureLidSpace(serialNum, lid, attr);
AttrUpdate::handleUpdate(attr, lid, fieldUpd);
- if (immediateCommit) {
- attr.commit(serialNum, serialNum);
- }
}
+// Committing counterpart of applyUpdateToAttribute(): performs the same
+// ensureLidSpace() + AttrUpdate::handleUpdate() sequence for the given lid
+// and then unconditionally commits the attribute with serialNum.
+void applyUpdateToAttributeAndCommit(SerialNum serialNum,
+                                     const FieldUpdate &fieldUpd,
+                                     DocumentIdT lid,
+                                     AttributeVector &attr)
+{
+    // Step 1: make sure the lid is covered before touching the attribute.
+    ensureLidSpace(serialNum, lid, attr);
+
+    // Step 2: apply the field update to the attribute for this lid.
+    AttrUpdate::handleUpdate(attr, lid, fieldUpd);
+
+    // Step 3: commit right away; serialNum is passed for both arguments.
+    attr.commit(serialNum, serialNum);
+}
void
applyReplayDone(uint32_t docIdLimit, AttributeVector &attr)
@@ -149,10 +153,8 @@ applyHeartBeat(SerialNum serialNum, AttributeVector &attr)
}
void
-applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType onWriteDone,
- AttributeVector &attr)
+applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType , AttributeVector &attr)
{
- (void) onWriteDone;
if (attr.getStatus().getLastSyncToken() <= serialNum) {
attr.commit(serialNum, serialNum);
}
@@ -160,8 +162,7 @@ applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType onWriteDone,
void
-applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum,
- AttributeVector &attr)
+applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum, AttributeVector &attr)
{
if (attr.getStatus().getLastSyncToken() < serialNum) {
/*
@@ -177,6 +178,39 @@ applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum,
}
}
+// List of (attribute, field update) pairs. Both are raw pointers; nothing in
+// BatchUpdateTask deletes them, so the pointees must outlive the task.
+using AttrUpdates = std::vector<std::pair<AttributeVector *, const FieldUpdate *>>;
+
+// Executor task that applies a batch of field updates for one document (lid)
+// at one serial number. The caller fills _updates before handing the task to
+// an executor; run() then applies every queued update in insertion order.
+struct BatchUpdateTask : public vespalib::Executor::Task {
+
+    // serialNum       - serial number passed to every update (and commit).
+    // lid             - local document id the updates apply to.
+    // immediateCommit - when true, each attribute is committed right after
+    //                   its update has been applied.
+    // onWriteDone     - stored in the task; see _onWriteDone below.
+    BatchUpdateTask(SerialNum serialNum, DocumentIdT lid, bool immediateCommit,
+                    AttributeWriter::OnWriteDoneType onWriteDone)
+        // Initialize the _updates member. The mem-initializer must name a
+        // member or base class; the AttrUpdates type alias is neither.
+        : _updates(),
+          _serialNum(serialNum),
+          _lid(lid),
+          _immediateCommit(immediateCommit),
+          _onWriteDone(onWriteDone)
+    { }
+
+    // Applies all queued updates. The commit decision is made once, outside
+    // the loops, so each loop body is a single call.
+    void run() override {
+        if (_immediateCommit) {
+            for (const auto & update : _updates) {
+                applyUpdateToAttributeAndCommit(_serialNum, *update.second, _lid, *update.first);
+            }
+        } else {
+            for (const auto & update : _updates) {
+                applyUpdateToAttribute(_serialNum, *update.second, _lid, *update.first);
+            }
+        }
+    }
+
+
+    AttrUpdates _updates;          // queued (attribute, field update) pairs
+    SerialNum   _serialNum;
+    DocumentIdT _lid;
+    bool        _immediateCommit;
+    // Copy of the caller's onWriteDone value; run() never reads it.
+    // NOTE(review): presumably held so the write-done token stays alive until
+    // the task is destroyed - confirm against OnWriteDoneType's definition.
+    std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone;
+};
+
+
class FieldContext
{
vespalib::string _name;
@@ -499,6 +533,14 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document
bool immediateCommit, OnWriteDoneType onWriteDone, IFieldUpdateCallback & onUpdate)
{
LOG(debug, "Inspecting update for document %d.", lid);
+ std::vector<std::unique_ptr<BatchUpdateTask>> args;
+ uint32_t numExecutors = _attributeFieldWriter.getNumExecutors();
+ args.reserve(numExecutors);
+ for (uint32_t i(0); i < numExecutors; i++) {
+ args.emplace_back(std::make_unique<BatchUpdateTask>(serialNum, lid, immediateCommit, onWriteDone));
+ args.back()->_updates.reserve((2*upd.getUpdates().size())/numExecutors);
+ }
+
for (const auto &fupd : upd.getUpdates()) {
LOG(debug, "Retrieving guard for attribute vector '%s'.", fupd.getField().getName().c_str());
AttributeVector *attrp = _mgr->getWritableAttribute(fupd.getField().getName());
@@ -507,20 +549,19 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document
LOG(spam, "Failed to find attribute vector %s", fupd.getField().getName().c_str());
continue;
}
- AttributeVector &attr = *attrp;
// TODO: Check if we must use > due to multiple entries for same
// document and attribute.
- if (attr.getStatus().getLastSyncToken() >= serialNum)
+ if (attrp->getStatus().getLastSyncToken() >= serialNum)
continue;
-
- LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attr.getName().c_str());
-
- // NOTE: The lifetime of the field update will be ensured by keeping the document update alive
- // in a operation done context object.
- _attributeFieldWriter.execute(attr.getName(),
- [serialNum, &fupd, lid, immediateCommit, &attr, onWriteDone]()
- { applyUpdateToAttribute(serialNum, fupd, lid, immediateCommit, attr, onWriteDone); });
+ args[_attributeFieldWriter.getExecutorId(attrp->getName())]->_updates.emplace_back(attrp, &fupd);
+ LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attrp->getName().c_str());
+ }
+ // NOTE: The lifetime of the field update will be ensured by keeping the document update alive
+ // in a operation done context object.
+ for (uint32_t id(0); id < args.size(); id++) {
+ _attributeFieldWriter.executeTask(id, std::move(args[id]));
}
+
}
void