summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@yahoo-inc.com>2017-05-04 15:45:55 +0000
committerTor Egge <Tor.Egge@yahoo-inc.com>2017-05-04 15:45:55 +0000
commit44df619d2a1ab78c32f4600599d2027c317c4db7 (patch)
treec02c002819c592e9daa2fe44ce341960797fb741 /searchcore
parenta2aff166a08556c2d934fc5584bbc6a84b7ac42c (diff)
Handle multiple attributes per scheduled tasks, to reduce task scheduling
overhead when number of attributes is much larger than number of attribute write threads.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp308
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h27
2 files changed, 263 insertions, 72 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 01727800a44..94d2f22f0a2 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -13,12 +13,59 @@ LOG_SETUP(".proton.server.attributeadapter");
#include <vespa/searchlib/common/isequencedtaskexecutor.h>
#include <vespa/document/fieldvalue/weightedsetfieldvalue.h>
#include <vespa/document/fieldvalue/arrayfieldvalue.h>
+#include <type_traits>
using namespace document;
using namespace search;
namespace proton {
+AttributeWriter::WriteContext::WriteContext(uint32_t executorId)
+ : _executorId(executorId),
+ _fieldPaths(),
+ _attributes()
+{
+}
+
+
+AttributeWriter::WriteContext::WriteContext(WriteContext &&rhs) = default;
+
+AttributeWriter::WriteContext::~WriteContext() = default;
+
+AttributeWriter::WriteContext &AttributeWriter::WriteContext::operator=(WriteContext &&rhs) = default;
+
+void
+AttributeWriter::WriteContext::add(AttributeVector *attr)
+{
+ _attributes.emplace_back(attr);
+ _fieldPaths.emplace_back();
+}
+
+void
+AttributeWriter::WriteContext::buildFieldPaths(const DocumentType &docType)
+{
+ size_t fieldId = 0;
+ for (const auto &attrp : _attributes) {
+ vespalib::string name = attrp->getName();
+ FieldPath::UP fp = docType.buildFieldPath(name);
+ if (!fp) {
+ /// Should be exception but due to incomplete unit test we can not be strict enough, must fix unit test proton/docsummary
+ // The above comment is actually incorrect. This is expected during reconfig as long as do not stop accepting feed while doing reconfig.
+ // throw std::runtime_error(vespalib::make_string("Mismatch between documentdefinition and schema. No field named '%s' from schema in document type '%s'", attribute.getName().c_str(), docType.getName().c_str()));
+ LOG(warning,
+ "Mismatch between documentdefinition and schema. "
+ "No field named '%s' from schema in document type '%s'. "
+ "This might happen if an attribute field has been added and you are feeding while reconfiguring",
+ name.c_str(),
+ docType.getName().c_str());
+ }
+ assert(fieldId < _fieldPaths.size());
+ _fieldPaths[fieldId] = std::move(fp);
+ ++fieldId;
+ }
+ assert(fieldId == _fieldPaths.size());
+}
+
namespace {
void
@@ -108,59 +155,198 @@ applyCompactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum,
}
}
+class FieldContext
+{
+ vespalib::string _name;
+ uint32_t _executorId;
+ AttributeVector *_attr;
+
+public:
+ FieldContext(ISequencedTaskExecutor &writer, AttributeVector *attr);
+ ~FieldContext();
+ bool operator<(const FieldContext &rhs) const;
+ uint32_t getExecutorId() const { return _executorId; }
+ AttributeVector *getAttribute() const { return _attr; }
+};
+
+
+FieldContext::FieldContext(ISequencedTaskExecutor &writer, AttributeVector *attr)
+ : _name(attr->getName()),
+ _executorId(writer.getExecutorId(_name)),
+ _attr(attr)
+{
+}
+
+FieldContext::~FieldContext() = default;
+
+bool
+FieldContext::operator<(const FieldContext &rhs) const
+{
+ if (_executorId != rhs._executorId) {
+ return _executorId < rhs._executorId;
+ }
+ return _name < rhs._name;
+}
+
+class PutTask : public vespalib::Executor::Task
+{
+ const AttributeWriter::WriteContext &_wc;
+ const SerialNum _serialNum;
+ const uint32_t _lid;
+ const bool _immediateCommit;
+ std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone;
+ std::vector<FieldValue::UP> _fieldValues;
+public:
+ PutTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, const Document &doc, uint32_t lid, bool immediateCommit, AttributeWriter::OnWriteDoneType onWriteDone);
+ virtual ~PutTask() override;
+ virtual void run() override;
+};
+
+PutTask::PutTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, const Document &doc, uint32_t lid, bool immediateCommit, AttributeWriter::OnWriteDoneType onWriteDone)
+ : _wc(wc),
+ _serialNum(serialNum),
+ _lid(lid),
+ _immediateCommit(immediateCommit),
+ _onWriteDone(onWriteDone)
+{
+ const auto &fieldPaths = _wc.getFieldPaths();
+ _fieldValues.reserve(fieldPaths.size());
+ for (const auto &fieldPath : fieldPaths) {
+ FieldValue::UP fv;
+ if (fieldPath) {
+ fv = doc.getNestedFieldValue(fieldPath->getFullRange());
+ }
+ _fieldValues.emplace_back(std::move(fv));
+ }
+}
+
+PutTask::~PutTask()
+{
}
void
-AttributeWriter::buildFieldPath(const DocumentType & docType, const DataType *dataType)
+PutTask::run()
{
- _fieldPaths.clear();
- _fieldPaths.resize(_writableAttributes.size());
- size_t i = 0;
- for (auto attrp : _writableAttributes) {
- const search::attribute::IAttributeVector & attribute(*attrp);
- FieldPath::UP fp = docType.buildFieldPath(attribute.getName());
- if (fp.get() == NULL) {
- /// Should be exception but due to incomplete unit test we can not be strict enough, must fix unit test proton/docsummary
- // The above comment is actually incorrect. This is expected during reconfig as long as do not stop accepting feed while doing reconfig.
- // throw std::runtime_error(vespalib::make_string("Mismatch between documentdefinition and schema. No field named '%s' from schema in document type '%s'", attribute.getName().c_str(), docType.getName().c_str()));
- LOG(warning,
- "Mismatch between documentdefinition and schema. "
- "No field named '%s' from schema in document type '%s'. "
- "This might happen if an attribute field has been added and you are feeding while reconfiguring",
- attribute.getName().c_str(),
- docType.getName().c_str());
+ uint32_t fieldId = 0;
+ const auto &attributes = _wc.getAttributes();
+ for (auto attrp : attributes) {
+ AttributeVector &attr = *attrp;
+ if (attr.getStatus().getLastSyncToken() < _serialNum) {
+ applyPutToAttribute(_serialNum, _fieldValues[fieldId], _lid, _immediateCommit, attr, _onWriteDone);
}
- _fieldPaths[i] = std::move(fp);
- ++i;
+ ++fieldId;
}
- _dataType = dataType;
+}
+
+class RemoveTask : public vespalib::Executor::Task
+{
+ const AttributeWriter::WriteContext &_wc;
+ const SerialNum _serialNum;
+ const uint32_t _lid;
+ const bool _immediateCommit;
+ std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone;
+public:
+ RemoveTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, uint32_t lid, bool immediateCommit, AttributeWriter::OnWriteDoneType onWriteDone);
+ virtual ~RemoveTask() override;
+ virtual void run() override;
+};
+
+
+RemoveTask::RemoveTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, uint32_t lid, bool immediateCommit, AttributeWriter::OnWriteDoneType onWriteDone)
+ : _wc(wc),
+ _serialNum(serialNum),
+ _lid(lid),
+ _immediateCommit(immediateCommit),
+ _onWriteDone(onWriteDone)
+{
+}
+
+RemoveTask::~RemoveTask()
+{
}
void
-AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentIdT lid,
- bool immediateCommit, OnWriteDoneType onWriteDone)
+RemoveTask::run()
{
- size_t fieldId = 0;
- for (auto attrp : _writableAttributes) {
+ const auto &attributes = _wc.getAttributes();
+ for (auto &attrp : attributes) {
AttributeVector &attr = *attrp;
- if (attr.getStatus().getLastSyncToken() >= serialNum) {
- LOG(debug, "internalPut(): change already applied: serial=%" PRIu64 ""
- ", docId='%s', lid=%u, attribute='%s', lastSyncToken=%" PRIu64 "",
- serialNum, doc.getId().toString().c_str(), lid, attr.getName().c_str(),
- attr.getStatus().getLastSyncToken());
- ++fieldId;
- continue; // Change already applied
+ // Must use <= due to batch remove
+ if (attr.getStatus().getLastSyncToken() <= _serialNum) {
+ applyRemoveToAttribute(_serialNum, _lid, _immediateCommit, attr, _onWriteDone);
}
- const FieldPath *const fieldPath(_fieldPaths[fieldId].get());
- FieldValue::UP fv;
- if (fieldPath != NULL) {
- fv = doc.getNestedFieldValue(fieldPath->getFullRange());
+ }
+}
+
+class CommitTask : public vespalib::Executor::Task
+{
+ const AttributeWriter::WriteContext &_wc;
+ const SerialNum _serialNum;
+ std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone;
+public:
+ CommitTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, AttributeWriter::OnWriteDoneType onWriteDone);
+ virtual ~CommitTask() override;
+ virtual void run() override;
+};
+
+
+CommitTask::CommitTask(const AttributeWriter::WriteContext &wc, SerialNum serialNum, AttributeWriter::OnWriteDoneType onWriteDone)
+ : _wc(wc),
+ _serialNum(serialNum),
+ _onWriteDone(onWriteDone)
+{
+}
+
+CommitTask::~CommitTask()
+{
+}
+
+void
+CommitTask::run()
+{
+ const auto &attributes = _wc.getAttributes();
+ for (auto &attrp : attributes) {
+ AttributeVector &attr = *attrp;
+ applyCommit(_serialNum, _onWriteDone, attr);
+ }
+}
+
+}
+
+void
+AttributeWriter::setupWriteContexts()
+{
+ std::vector<FieldContext> fieldContexts;
+ assert(_writeContexts.empty());
+ for (auto attr : _writableAttributes) {
+ fieldContexts.emplace_back(_attributeFieldWriter, attr);
+ }
+ std::sort(fieldContexts.begin(), fieldContexts.end());
+ for (auto &fc : fieldContexts) {
+ if (_writeContexts.empty() ||
+ (_writeContexts.back().getExecutorId() != fc.getExecutorId())) {
+ _writeContexts.emplace_back(fc.getExecutorId());
}
+ _writeContexts.back().add(fc.getAttribute());
+ }
+}
- _attributeFieldWriter.execute(attr.getName(),
- [serialNum, fv(std::move(fv)), lid, immediateCommit, &attr, onWriteDone]()
- { applyPutToAttribute(serialNum, fv, lid, immediateCommit, attr, onWriteDone); });
- ++fieldId;
+void
+AttributeWriter::buildFieldPaths(const DocumentType & docType, const DataType *dataType)
+{
+ for (auto &wc : _writeContexts) {
+ wc.buildFieldPaths(docType);
+ }
+ _dataType = dataType;
+}
+
+void
+AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentIdT lid,
+ bool immediateCommit, OnWriteDoneType onWriteDone)
+{
+ for (const auto &wc : _writeContexts) {
+ auto putTask = std::make_unique<PutTask>(wc, serialNum, doc, lid, immediateCommit, onWriteDone);
+ _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(putTask));
}
}
@@ -169,32 +355,26 @@ AttributeWriter::internalRemove(SerialNum serialNum, DocumentIdT lid,
bool immediateCommit,
OnWriteDoneType onWriteDone)
{
- for (auto &attrp : _writableAttributes) {
- AttributeVector &attr = *attrp;
- // XXX: Want to use >=, but must use > due to batch remove
- // Might be OK due to clearDoc() being idempotent.
- if (attr.getStatus().getLastSyncToken() > serialNum)
- continue; // Change already applied
- LOG(debug, "About to remove docId %u from attribute vector '%s'.",
- lid, attr.getName().c_str());
-
- _attributeFieldWriter.execute(attr.getName(),
- [serialNum, lid, immediateCommit, &attr, onWriteDone]()
- { applyRemoveToAttribute(serialNum, lid, immediateCommit, attr, onWriteDone); });
+ for (const auto &wc : _writeContexts) {
+ auto removeTask = std::make_unique<RemoveTask>(wc, serialNum, lid, immediateCommit, onWriteDone);
+ _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(removeTask));
}
}
AttributeWriter::AttributeWriter(const proton::IAttributeManager::SP &mgr)
: _mgr(mgr),
- _fieldPaths(),
- _dataType(),
- _fieldPathsDocTypeName(),
_attributeFieldWriter(mgr->getAttributeFieldWriter()),
- _writableAttributes(mgr->getWritableAttributes())
+ _writableAttributes(mgr->getWritableAttributes()),
+ _writeContexts(),
+ _dataType(nullptr)
{
+ setupWriteContexts();
}
-AttributeWriter::~AttributeWriter() {}
+AttributeWriter::~AttributeWriter()
+{
+ _attributeFieldWriter.sync();
+}
std::vector<search::AttributeVector *>
AttributeWriter::getWritableAttributes() const
@@ -221,11 +401,8 @@ AttributeWriter::put(SerialNum serialNum, const Document &doc, DocumentIdT lid,
lid,
doc.toString(true).c_str());
const DataType *dataType(doc.getDataType());
- if (_fieldPaths.empty() ||
- _dataType != dataType ||
- doc.getType().getName() != _fieldPathsDocTypeName) {
- buildFieldPath(doc.getType(), dataType);
- _fieldPathsDocTypeName = doc.getType().getName();
+ if (_dataType != dataType) {
+ buildFieldPaths(doc.getType(), dataType);
}
internalPut(serialNum, doc, lid, immediateCommit, onWriteDone);
}
@@ -293,12 +470,9 @@ AttributeWriter::heartBeat(SerialNum serialNum)
void
AttributeWriter::commit(SerialNum serialNum, OnWriteDoneType onWriteDone)
{
- for (auto attrp : _writableAttributes) {
- auto &attr = *attrp;
- _attributeFieldWriter.execute(attr.getName(),
- [serialNum, onWriteDone, &attr]()
- { applyCommit(serialNum, onWriteDone,
- attr); });
+ for (const auto &wc : _writeContexts) {
+ auto commitTask = std::make_unique<CommitTask>(wc, serialNum, onWriteDone);
+ _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(commitTask));
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
index 32b23d93e4c..446acf80c5c 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
@@ -19,15 +19,32 @@ private:
typedef document::DataType DataType;
typedef document::DocumentType DocumentType;
typedef document::FieldValue FieldValue;
- typedef std::vector<std::unique_ptr<FieldPath> > AttributeFieldPaths;
const IAttributeManager::SP _mgr;
- AttributeFieldPaths _fieldPaths;
- const DataType *_dataType;
- vespalib::string _fieldPathsDocTypeName;
search::ISequencedTaskExecutor &_attributeFieldWriter;
const std::vector<search::AttributeVector *> &_writableAttributes;
+public:
+ class WriteContext
+ {
+ uint32_t _executorId;
+ std::vector<std::unique_ptr<FieldPath>> _fieldPaths;
+ std::vector<AttributeVector *> _attributes;
+ public:
+ WriteContext(uint32_t executorId);
+ WriteContext(WriteContext &&rhs);
+ ~WriteContext();
+ WriteContext &operator=(WriteContext &&rhs);
+ void buildFieldPaths(const DocumentType &docType);
+ void add(AttributeVector *attr);
+ uint32_t getExecutorId() const { return _executorId; }
+ const std::vector<std::unique_ptr<FieldPath>> &getFieldPaths() const { return _fieldPaths; }
+ const std::vector<AttributeVector *> &getAttributes() const { return _attributes; }
+ };
+private:
+ std::vector<WriteContext> _writeContexts;
+ const DataType *_dataType;
- void buildFieldPath(const DocumentType &docType, const DataType *dataType);
+ void setupWriteContexts();
+ void buildFieldPaths(const DocumentType &docType, const DataType *dataType);
void internalPut(SerialNum serialNum, const Document &doc, DocumentIdT lid,
bool immediateCommit, OnWriteDoneType onWriteDone);
void internalRemove(SerialNum serialNum, DocumentIdT lid,