summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorGeir Storli <geirst@oath.com>2017-11-07 11:43:31 +0000
committerGeir Storli <geirst@oath.com>2017-11-07 11:51:34 +0000
commit7b5f0ac4f504714d816b1bc844077b7744ba01f6 (patch)
tree57528ebd64b846d823e901d7e98876bc44a84c25 /searchcore
parent24d8206010c9a3cfe649025be96818fb97cc2910 (diff)
Only commit attributes once when handling a batch remove in attribute writer.
This avoids potential accumulation of dead memory when handling delete bucket operations while attributes are being flushed and attribute guards being held.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp45
1 files changed, 41 insertions, 4 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 7413d0f369f..816a434e56a 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -19,6 +19,8 @@ using search::attribute::ImportedAttributeVector;
namespace proton {
+using LidVector = LidVectorContext::LidVector;
+
AttributeWriter::WriteContext::WriteContext(uint32_t executorId)
: _executorId(executorId),
_fieldPaths(),
@@ -272,13 +274,47 @@ RemoveTask::run()
const auto &attributes = _wc.getAttributes();
for (auto &attrp : attributes) {
AttributeVector &attr = *attrp;
- // Must use <= due to batch remove
- if (attr.getStatus().getLastSyncToken() <= _serialNum) {
+ if (attr.getStatus().getLastSyncToken() < _serialNum) {
applyRemoveToAttribute(_serialNum, _lid, _immediateCommit, attr, _onWriteDone);
}
}
}
+class BatchRemoveTask : public vespalib::Executor::Task
+{
+private:
+ const AttributeWriter::WriteContext &_writeCtx;
+ const SerialNum _serialNum;
+ const LidVector _lidsToRemove;
+ const bool _immediateCommit;
+ std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone;
+public:
+ BatchRemoveTask(const AttributeWriter::WriteContext &writeCtx,
+ SerialNum serialNum,
+ const LidVector &lidsToRemove,
+ bool immediateCommit,
+ AttributeWriter::OnWriteDoneType onWriteDone)
+ : _writeCtx(writeCtx),
+ _serialNum(serialNum),
+ _lidsToRemove(lidsToRemove),
+ _immediateCommit(immediateCommit),
+ _onWriteDone(onWriteDone)
+ {}
+ virtual ~BatchRemoveTask() override {}
+ virtual void run() override {
+ for (auto attr : _writeCtx.getAttributes()) {
+ if (attr->getStatus().getLastSyncToken() < _serialNum) {
+ for (auto lidToRemove : _lidsToRemove) {
+ applyRemoveToAttribute(_serialNum, lidToRemove, false, *attr, _onWriteDone);
+ }
+ if (_immediateCommit) {
+ attr->commit(_serialNum, _serialNum);
+ }
+ }
+ }
+ }
+};
+
class CommitTask : public vespalib::Executor::Task
{
const AttributeWriter::WriteContext &_wc;
@@ -419,8 +455,9 @@ void
AttributeWriter::remove(const LidVector &lidsToRemove, SerialNum serialNum,
bool immediateCommit, OnWriteDoneType onWriteDone)
{
- for (const auto &lid : lidsToRemove) {
- internalRemove(serialNum, lid, immediateCommit, onWriteDone);
+ for (const auto &writeCtx : _writeContexts) {
+ auto removeTask = std::make_unique<BatchRemoveTask>(writeCtx, serialNum, lidsToRemove, immediateCommit, onWriteDone);
+ _attributeFieldWriter.executeTask(writeCtx.getExecutorId(), std::move(removeTask));
}
}