aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorGeir Storli <geirst@oath.com>2017-11-07 11:43:31 +0000
committerGeir Storli <geirst@oath.com>2017-11-08 11:48:40 +0000
commit50ceda8ed58f57cc3d9116301ed3785abb975a52 (patch)
tree6ca701c045413e3d71f4a847c76370cbba797d85 /searchcore
parent24afca4a97de0d3e14b78de423fcc011d2d14519 (diff)
Only commit attributes once when handling a batch remove in attribute writer.
This avoids potential accumulation of dead memory when handling delete bucket operations while attributes are being flushed and attribute guards being held.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp44
1 files changed, 41 insertions, 3 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 7413d0f369f..a6329db2aee 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -19,6 +19,8 @@ using search::attribute::ImportedAttributeVector;
namespace proton {
+using LidVector = LidVectorContext::LidVector;
+
AttributeWriter::WriteContext::WriteContext(uint32_t executorId)
: _executorId(executorId),
_fieldPaths(),
@@ -272,13 +274,48 @@ RemoveTask::run()
const auto &attributes = _wc.getAttributes();
for (auto &attrp : attributes) {
AttributeVector &attr = *attrp;
- // Must use <= due to batch remove
+ // Must use <= due to how move operations are handled
if (attr.getStatus().getLastSyncToken() <= _serialNum) {
applyRemoveToAttribute(_serialNum, _lid, _immediateCommit, attr, _onWriteDone);
}
}
}
+class BatchRemoveTask : public vespalib::Executor::Task
+{
+private:
+ const AttributeWriter::WriteContext &_writeCtx;
+ const SerialNum _serialNum;
+ const LidVector _lidsToRemove;
+ const bool _immediateCommit;
+ std::remove_reference_t<AttributeWriter::OnWriteDoneType> _onWriteDone;
+public:
+ BatchRemoveTask(const AttributeWriter::WriteContext &writeCtx,
+ SerialNum serialNum,
+ const LidVector &lidsToRemove,
+ bool immediateCommit,
+ AttributeWriter::OnWriteDoneType onWriteDone)
+ : _writeCtx(writeCtx),
+ _serialNum(serialNum),
+ _lidsToRemove(lidsToRemove),
+ _immediateCommit(immediateCommit),
+ _onWriteDone(onWriteDone)
+ {}
+ virtual ~BatchRemoveTask() override {}
+ virtual void run() override {
+ for (auto attr : _writeCtx.getAttributes()) {
+ if (attr->getStatus().getLastSyncToken() < _serialNum) {
+ for (auto lidToRemove : _lidsToRemove) {
+ applyRemoveToAttribute(_serialNum, lidToRemove, false, *attr, _onWriteDone);
+ }
+ if (_immediateCommit) {
+ attr->commit(_serialNum, _serialNum);
+ }
+ }
+ }
+ }
+};
+
class CommitTask : public vespalib::Executor::Task
{
const AttributeWriter::WriteContext &_wc;
@@ -419,8 +456,9 @@ void
AttributeWriter::remove(const LidVector &lidsToRemove, SerialNum serialNum,
bool immediateCommit, OnWriteDoneType onWriteDone)
{
- for (const auto &lid : lidsToRemove) {
- internalRemove(serialNum, lid, immediateCommit, onWriteDone);
+ for (const auto &writeCtx : _writeContexts) {
+ auto removeTask = std::make_unique<BatchRemoveTask>(writeCtx, serialNum, lidsToRemove, immediateCommit, onWriteDone);
+ _attributeFieldWriter.executeTask(writeCtx.getExecutorId(), std::move(removeTask));
}
}