summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-06-11 18:49:34 +0200
committerGitHub <noreply@github.com>2018-06-11 18:49:34 +0200
commit8fca37dad58eb97bd8cc34cf1b72bda0226ba31f (patch)
tree26bca3522e35766ddd04e70d147d5c94004af2d2
parente38286f1aae25d1e66bbc94efa1514b5b946a86a (diff)
parenta2421dafa786d686bab33fadae6d3b0d39ac2a96 (diff)
Merge pull request #6164 from vespa-engine/balder/batch-attribute-updates
Balder/batch attribute updates
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp85
1 files changed, 65 insertions, 20 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 2d45d6aa02d..2d78f837ecf 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -12,6 +12,8 @@
#include <vespa/searchlib/attribute/attributevector.hpp>
#include <vespa/searchlib/attribute/imported_attribute_vector.h>
#include <vespa/searchlib/common/isequencedtaskexecutor.h>
+#include <vespa/searchlib/common/idestructorcallback.h>
+
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.attributeadapter");
@@ -119,16 +121,20 @@ applyRemoveToAttribute(SerialNum serialNum, DocumentIdT lid, bool immediateCommi
void
applyUpdateToAttribute(SerialNum serialNum, const FieldUpdate &fieldUpd,
- DocumentIdT lid, bool immediateCommit, AttributeVector &attr,
- AttributeWriter::OnWriteDoneType)
+ DocumentIdT lid, AttributeVector &attr)
{
ensureLidSpace(serialNum, lid, attr);
AttrUpdate::handleUpdate(attr, lid, fieldUpd);
- if (immediateCommit) {
- attr.commit(serialNum, serialNum);
- }
}
+void
+applyUpdateToAttributeAndCommit(SerialNum serialNum, const FieldUpdate &fieldUpd,
+ DocumentIdT lid, AttributeVector &attr)
+{
+ ensureLidSpace(serialNum, lid, attr);
+ AttrUpdate::handleUpdate(attr, lid, fieldUpd);
+ attr.commit(serialNum, serialNum);
+}
void
applyReplayDone(uint32_t docIdLimit, AttributeVector &attr)
@@ -149,10 +155,8 @@ applyHeartBeat(SerialNum serialNum, AttributeVector &attr)
}
void
-applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType onWriteDone,
- AttributeVector &attr)
+applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType , AttributeVector &attr)
{
- (void) onWriteDone;
if (attr.getStatus().getLastSyncToken() <= serialNum) {
attr.commit(serialNum, serialNum);
}
@@ -160,8 +164,7 @@ applyCommit(SerialNum serialNum, AttributeWriter::OnWriteDoneType onWriteDone,
void
-applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum,
- AttributeVector &attr)
+applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum, AttributeVector &attr)
{
if (attr.getStatus().getLastSyncToken() < serialNum) {
/*
@@ -177,6 +180,38 @@ applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum,
}
}
+using AttrUpdates = std::vector<std::pair<AttributeVector *, const FieldUpdate *>>;
+
+struct BatchUpdateTask : public vespalib::Executor::Task {
+
+ BatchUpdateTask(SerialNum serialNum, DocumentIdT lid, bool immediateCommit)
+ : vespalib::Executor::Task(),
+ _serialNum(serialNum),
+ _lid(lid),
+ _immediateCommit(immediateCommit),
+ _onWriteDone()
+ { }
+
+ void run() override {
+ if (_immediateCommit) {
+ for (const auto & update : _updates) {
+ applyUpdateToAttributeAndCommit(_serialNum, *update.second, _lid, *update.first);
+ }
+ } else {
+ for (const auto & update : _updates) {
+ applyUpdateToAttribute(_serialNum, *update.second, _lid, *update.first);
+ }
+ }
+ }
+
+
+ SerialNum _serialNum;
+ DocumentIdT _lid;
+ bool _immediateCommit;
+ AttrUpdates _updates;
+ search::IDestructorCallback::SP _onWriteDone;
+};
+
class FieldContext
{
vespalib::string _name;
@@ -499,6 +534,14 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document
bool immediateCommit, OnWriteDoneType onWriteDone, IFieldUpdateCallback & onUpdate)
{
LOG(debug, "Inspecting update for document %d.", lid);
+ std::vector<std::unique_ptr<BatchUpdateTask>> args;
+ uint32_t numExecutors = _attributeFieldWriter.getNumExecutors();
+ args.reserve(numExecutors);
+ for (uint32_t i(0); i < numExecutors; i++) {
+ args.emplace_back(std::make_unique<BatchUpdateTask>(serialNum, lid, immediateCommit));
+ args.back()->_updates.reserve((2*upd.getUpdates().size())/numExecutors);
+ }
+
for (const auto &fupd : upd.getUpdates()) {
LOG(debug, "Retrieving guard for attribute vector '%s'.", fupd.getField().getName().c_str());
AttributeVector *attrp = _mgr->getWritableAttribute(fupd.getField().getName());
@@ -507,20 +550,22 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document
LOG(spam, "Failed to find attribute vector %s", fupd.getField().getName().c_str());
continue;
}
- AttributeVector &attr = *attrp;
// TODO: Check if we must use > due to multiple entries for same
// document and attribute.
- if (attr.getStatus().getLastSyncToken() >= serialNum)
+ if (attrp->getStatus().getLastSyncToken() >= serialNum)
continue;
-
- LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attr.getName().c_str());
-
- // NOTE: The lifetime of the field update will be ensured by keeping the document update alive
- // in a operation done context object.
- _attributeFieldWriter.execute(attr.getName(),
- [serialNum, &fupd, lid, immediateCommit, &attr, onWriteDone]()
- { applyUpdateToAttribute(serialNum, fupd, lid, immediateCommit, attr, onWriteDone); });
+ args[_attributeFieldWriter.getExecutorId(attrp->getName())]->_updates.emplace_back(attrp, &fupd);
+ LOG(debug, "About to apply update for docId %u in attribute vector '%s'.", lid, attrp->getName().c_str());
}
+ // NOTE: The lifetime of the field update will be ensured by keeping the document update alive
+ // in a operation done context object.
+ for (uint32_t id(0); id < args.size(); id++) {
+ if ( ! args[id]->_updates.empty()) {
+ args[id]->_onWriteDone = onWriteDone;
+ _attributeFieldWriter.executeTask(id, std::move(args[id]));
+ }
+ }
+
}
void