diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-15 17:50:56 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-15 17:50:56 +0200 |
commit | 78dbf5da1d22f28d0dde886e13f1a4c384d612b5 (patch) | |
tree | 24dba586b825b2257bf5f059935160cf85e47307 /searchcore | |
parent | 49ddfc889623fdeb5c255f349389f924eb62e783 (diff) | |
parent | 6237802f98df78c21643b72814ef4a4ea2752f1f (diff) |
Merge pull request #3419 from vespa-engine/toregge/fixup-notify-putdone-remove-removedone-issues
Toregge/fixup notify putdone remove removedone issues
Diffstat (limited to 'searchcore')
21 files changed, 256 insertions, 117 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp index c02aa033bb9..eeec8122703 100644 --- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp @@ -198,7 +198,7 @@ public: { } - virtual void notifyPut(document::GlobalId gid, uint32_t lid, SerialNum) override { + virtual void notifyPutDone(document::GlobalId gid, uint32_t lid, SerialNum) override { _changeGid = gid; _changeLid = lid; _gidToLid[gid] = lid; diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp index 1c5287f15e5..cddce944e4d 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp @@ -53,7 +53,7 @@ public: EXPECT_EQUAL(_createdListeners, _destroyedListeners); } - void notifyPut() { + void notifyPutDone() { lock_guard guard(_lock); ++_putChanges; } @@ -101,7 +101,7 @@ public: _stats.markCreatedListener(); } virtual ~MyListener() { _stats.markDestroyedListener(); } - virtual void notifyPut(GlobalId, uint32_t) override { _stats.notifyPut(); } + virtual void notifyPutDone(GlobalId, uint32_t) override { _stats.notifyPutDone(); } virtual void notifyRemove(GlobalId) override { _stats.notifyRemove(); } virtual void notifyRegistered() override { _stats.markRegisteredListener(); } virtual const vespalib::string &getName() const override { return _name; } @@ -138,8 +138,8 @@ struct Fixture _handler->addListener(std::move(listener)); } - void notifyPut(GlobalId gid, uint32_t lid, SerialNum serialNum) { - _handler->notifyPut(gid, lid, serialNum); + void notifyPutDone(GlobalId gid, uint32_t lid, SerialNum serialNum) { + _handler->notifyPutDone(gid, lid, serialNum); } void notifyRemove(GlobalId gid, SerialNum serialNum) { @@ -164,7 +164,7 @@ TEST_F("Test that we can register a listener", Fixture) TEST_DO(stats.assertListeners(1, 0, 0)); f.addListener(std::move(listener)); TEST_DO(stats.assertListeners(1, 1, 0)); - f.notifyPut(toGid(doc1), 10, 10); + f.notifyPutDone(toGid(doc1), 10, 10); TEST_DO(stats.assertChanges(1, 0)); f.removeListeners("testdoc", {}); TEST_DO(stats.assertListeners(1, 1, 1)); @@ -187,7 +187,7 @@ TEST_F("Test that we can register multiple listeners", Fixture) TEST_DO(stats1.assertListeners(1, 1, 0)); TEST_DO(stats2.assertListeners(1, 1, 0)); TEST_DO(stats3.assertListeners(1, 1, 0)); - f.notifyPut(toGid(doc1), 10, 10); + f.notifyPutDone(toGid(doc1), 10, 10); TEST_DO(stats1.assertChanges(1, 0)); TEST_DO(stats2.assertChanges(1, 0)); TEST_DO(stats3.assertChanges(1, 0)); @@ -222,42 +222,81 @@ TEST_F("Test that we keep old listener when registering duplicate", Fixture) TEST_DO(stats.assertListeners(2, 1, 1)); } -TEST_F("Test that put is ignored if we have a pending remove", Fixture) +class StatsFixture : public Fixture +{ + ListenerStats &_stats; + +public: + StatsFixture() + : Fixture(), + _stats(addStats()) + { + addListener(std::make_unique<MyListener>(_stats, "test", "testdoc")); + } + + ~StatsFixture() + { + removeListeners("testdoc", {}); + } + + void assertChanges(uint32_t expPutChanges, uint32_t expRemoveChanges) + { + TEST_DO(_stats.assertChanges(expPutChanges, expRemoveChanges)); + } +}; + +TEST_F("Test that put is ignored if we have a pending remove", StatsFixture) { - auto &stats = f.addStats(); - auto listener = std::make_unique<MyListener>(stats, "test", "testdoc"); - f.addListener(std::move(listener)); f.notifyRemove(toGid(doc1), 20); - TEST_DO(stats.assertChanges(0, 1)); - f.notifyPut(toGid(doc1), 10, 10); - TEST_DO(stats.assertChanges(0, 1)); + TEST_DO(f.assertChanges(0, 1)); + f.notifyPutDone(toGid(doc1), 10, 10); + TEST_DO(f.assertChanges(0, 1)); f.notifyRemoveDone(toGid(doc1), 20); - TEST_DO(stats.assertChanges(0, 1)); - f.notifyPut(toGid(doc1), 11, 30); - TEST_DO(stats.assertChanges(1, 1)); - f.removeListeners("testdoc", {}); + TEST_DO(f.assertChanges(0, 1)); + f.notifyPutDone(toGid(doc1), 11, 30); + TEST_DO(f.assertChanges(1, 1)); } -TEST_F("Test that pending removes are merged", Fixture) +TEST_F("Test that pending removes are merged", StatsFixture) { - auto &stats = f.addStats(); - auto listener = std::make_unique<MyListener>(stats, "test", "testdoc"); - f.addListener(std::move(listener)); f.notifyRemove(toGid(doc1), 20); - TEST_DO(stats.assertChanges(0, 1)); + TEST_DO(f.assertChanges(0, 1)); f.notifyRemove(toGid(doc1), 40); - TEST_DO(stats.assertChanges(0, 1)); - f.notifyPut(toGid(doc1), 10, 10); - TEST_DO(stats.assertChanges(0, 1)); + TEST_DO(f.assertChanges(0, 1)); + f.notifyPutDone(toGid(doc1), 10, 10); + TEST_DO(f.assertChanges(0, 1)); f.notifyRemoveDone(toGid(doc1), 20); - TEST_DO(stats.assertChanges(0, 1)); - f.notifyPut(toGid(doc1), 11, 30); - TEST_DO(stats.assertChanges(0, 1)); + TEST_DO(f.assertChanges(0, 1)); + f.notifyPutDone(toGid(doc1), 11, 30); + TEST_DO(f.assertChanges(0, 1)); f.notifyRemoveDone(toGid(doc1), 40); - TEST_DO(stats.assertChanges(0, 1)); - f.notifyPut(toGid(doc1), 12, 50); - TEST_DO(stats.assertChanges(1, 1)); - f.removeListeners("testdoc", {}); + TEST_DO(f.assertChanges(0, 1)); + f.notifyPutDone(toGid(doc1), 12, 50); + TEST_DO(f.assertChanges(1, 1)); +} + +TEST_F("Test that out of order notifyRemoveDone is handled", StatsFixture) +{ + f.notifyRemove(toGid(doc1), 20); + TEST_DO(f.assertChanges(0, 1)); + f.notifyRemove(toGid(doc1), 40); + TEST_DO(f.assertChanges(0, 1)); + f.notifyRemoveDone(toGid(doc1), 40); + TEST_DO(f.assertChanges(0, 1)); + f.notifyRemoveDone(toGid(doc1), 20); + TEST_DO(f.assertChanges(0, 1)); + f.notifyPutDone(toGid(doc1), 12, 50); + TEST_DO(f.assertChanges(1, 1)); +} + +TEST_F("Test that out of order notifyPutDone is handled", StatsFixture) +{ + f.notifyRemove(toGid(doc1), 20); + TEST_DO(f.assertChanges(0, 1)); + f.notifyPutDone(toGid(doc1), 12, 50); + TEST_DO(f.assertChanges(1, 1)); + f.notifyRemoveDone(toGid(doc1), 20); + TEST_DO(f.assertChanges(1, 1)); } } diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp index 4c6f9017000..1cea981e9ff 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp @@ -94,8 +94,8 @@ struct Fixture _listener = std::make_unique<GidToLidChangeListener>(_writer, _attr, _refCount, "test", "testdoc"); } - void notifyPut(const GlobalId &gid, uint32_t referencedDoc) { - _listener->notifyPut(gid, referencedDoc); + void notifyPutDone(const GlobalId &gid, uint32_t referencedDoc) { + _listener->notifyPutDone(gid, referencedDoc); } void notifyListenerRegistered() { @@ -114,9 +114,9 @@ TEST_F("Test that we can use gid to lid change listener", Fixture) TEST_DO(f.assertRefLid(0, 2)); TEST_DO(f.assertRefLid(0, 3)); f.allocListener(); - f.notifyPut(toGid(doc1), 10); - f.notifyPut(toGid(doc2), 20); - f.notifyPut(toGid(doc3), 30); + f.notifyPutDone(toGid(doc1), 10); + f.notifyPutDone(toGid(doc2), 20); + f.notifyPutDone(toGid(doc3), 30); TEST_DO(f.assertRefLid(10, 1)); TEST_DO(f.assertRefLid(20, 2)); TEST_DO(f.assertRefLid(10, 3)); diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_registrator/gid_to_lid_change_registrator_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_registrator/gid_to_lid_change_registrator_test.cpp index a5231647158..d240c437ef2 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_registrator/gid_to_lid_change_registrator_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_registrator/gid_to_lid_change_registrator_test.cpp @@ -24,7 +24,7 @@ public: { } virtual ~MyListener() { } - virtual void notifyPut(document::GlobalId, uint32_t) override { } + virtual void notifyPutDone(document::GlobalId, uint32_t) override { } virtual void notifyRemove(document::GlobalId) override { } virtual void notifyRegistered() override { } virtual const vespalib::string &getName() const override { return _name; } diff --git a/searchcore/src/vespa/searchcore/proton/reference/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/reference/CMakeLists.txt index fe2ca7a7a88..a98b095cc21 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/reference/CMakeLists.txt @@ -10,6 +10,7 @@ vespa_add_library(searchcore_reference STATIC gid_to_lid_change_registrator.cpp gid_to_lid_mapper.cpp gid_to_lid_mapper_factory.cpp + pending_notify_remove_done.cpp DEPENDS searchcore_attribute searchcore_documentmetastore diff --git a/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.cpp b/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.cpp index c3c516a51dc..8f70d697b9a 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.cpp @@ -15,7 +15,7 @@ DummyGidToLidChangeHandler::~DummyGidToLidChangeHandler() } void -DummyGidToLidChangeHandler::notifyPut(GlobalId, uint32_t, SerialNum) +DummyGidToLidChangeHandler::notifyPutDone(GlobalId, uint32_t, SerialNum) { } diff --git a/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h b/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h index 24773924aaa..28eb281ec95 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h +++ b/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h @@ -24,7 +24,7 @@ public: DummyGidToLidChangeHandler(); virtual ~DummyGidToLidChangeHandler(); - virtual void notifyPut(GlobalId gid, uint32_t lid, SerialNum serialNum) override; + virtual void notifyPutDone(GlobalId gid, uint32_t lid, SerialNum serialNum) override; virtual void notifyRemove(GlobalId gid, SerialNum serialNum) override; virtual void notifyRemoveDone(GlobalId gid, SerialNum serialNum) override; virtual void addListener(std::unique_ptr<IGidToLidChangeListener> listener) override; diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp index d7b2c16867a..341c5434623 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp @@ -31,10 +31,10 @@ GidToLidChangeHandler::~GidToLidChangeHandler() } void -GidToLidChangeHandler::notifyPut(GlobalId gid, uint32_t lid) +GidToLidChangeHandler::notifyPutDone(GlobalId gid, uint32_t lid) { for (const auto &listener : _listeners) { - listener->notifyPut(gid, lid); + listener->notifyPutDone(gid, lid); } } @@ -47,25 +47,36 @@ GidToLidChangeHandler::notifyRemove(GlobalId gid) } void -GidToLidChangeHandler::notifyPut(GlobalId gid, uint32_t lid, SerialNum serialNum) +GidToLidChangeHandler::notifyPutDone(GlobalId gid, uint32_t lid, SerialNum serialNum) { lock_guard guard(_lock); auto itr = _pendingRemove.find(gid); if (itr != _pendingRemove.end()) { - assert(itr->second > serialNum); - return; // Document has already been removed later on + auto &entry = itr->second; + assert(entry.removeSerialNum != serialNum); + if (entry.removeSerialNum > serialNum) { + return; // Document has already been removed later on + } + assert(entry.putSerialNum < serialNum); + entry.putSerialNum = serialNum; } - notifyPut(gid, lid); + notifyPutDone(gid, lid); } void GidToLidChangeHandler::notifyRemove(GlobalId gid, SerialNum serialNum) { lock_guard guard(_lock); - auto insRes = _pendingRemove.insert(std::make_pair(gid, serialNum)); + auto insRes = _pendingRemove.insert(std::make_pair(gid, PendingRemoveEntry(serialNum))); if (!insRes.second) { - assert(insRes.first->second < serialNum); - insRes.first->second = serialNum; + auto &entry = insRes.first->second; + assert(entry.removeSerialNum < serialNum); + assert(entry.putSerialNum < serialNum); + if (entry.removeSerialNum < entry.putSerialNum) { + notifyRemove(gid); + } + entry.removeSerialNum = serialNum; + ++entry.refCount; } else { notifyRemove(gid); } @@ -76,9 +87,13 @@ GidToLidChangeHandler::notifyRemoveDone(GlobalId gid, SerialNum serialNum) { lock_guard guard(_lock); auto itr = _pendingRemove.find(gid); - assert(itr != _pendingRemove.end() && itr->second >= serialNum); - if (itr->second == serialNum) { + assert(itr != _pendingRemove.end()); + auto &entry = itr->second; + assert(entry.removeSerialNum >= serialNum); + if (entry.refCount == 1) { _pendingRemove.erase(itr); + } else { + --entry.refCount; } } diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h index 840cb61cc2a..264ece76eaa 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h @@ -22,18 +22,36 @@ class GidToLidChangeHandler : public std::enable_shared_from_this<GidToLidChange { using lock_guard = std::lock_guard<std::mutex>; using Listeners = std::vector<std::unique_ptr<IGidToLidChangeListener>>; + struct PendingRemoveEntry { + SerialNum removeSerialNum; + SerialNum putSerialNum; + uint32_t refCount; + + PendingRemoveEntry(SerialNum removeSerialNum_) + : removeSerialNum(removeSerialNum_), + putSerialNum(0), + refCount(1) + { + } + + PendingRemoveEntry() + : PendingRemoveEntry(0) + { + } + }; + std::mutex _lock; Listeners _listeners; bool _closed; - vespalib::hash_map<GlobalId, SerialNum, GlobalId::hash> _pendingRemove; + vespalib::hash_map<GlobalId, PendingRemoveEntry, GlobalId::hash> _pendingRemove; - void notifyPut(GlobalId gid, uint32_t lid); + void notifyPutDone(GlobalId gid, uint32_t lid); void notifyRemove(GlobalId gid); public: GidToLidChangeHandler(); virtual ~GidToLidChangeHandler(); - virtual void notifyPut(GlobalId gid, uint32_t lid, SerialNum serialNum) override; + virtual void notifyPutDone(GlobalId gid, uint32_t lid, SerialNum serialNum) override; virtual void notifyRemove(GlobalId gid, SerialNum serialNum) override; virtual void notifyRemoveDone(GlobalId gid, SerialNum serialNum) override; diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp index 6a368997770..cb5c5de0f5a 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp @@ -26,7 +26,7 @@ GidToLidChangeListener::~GidToLidChangeListener() } void -GidToLidChangeListener::notifyPut(document::GlobalId gid, uint32_t lid) +GidToLidChangeListener::notifyPutDone(document::GlobalId gid, uint32_t lid) { std::promise<bool> promise; std::future<bool> future = promise.get_future(); diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h index 35ff913d7af..0e9298e3e51 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h @@ -30,7 +30,7 @@ public: const vespalib::string &name, const vespalib::string &docTypeName); virtual ~GidToLidChangeListener(); - virtual void notifyPut(document::GlobalId gid, uint32_t lid) override; + virtual void notifyPutDone(document::GlobalId gid, uint32_t lid) override; virtual void notifyRemove(document::GlobalId gid) override; virtual void notifyRegistered() override; virtual const vespalib::string &getName() const override; diff --git a/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h b/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h index 53ade83ff32..7de837236e8 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h +++ b/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h @@ -40,7 +40,7 @@ public: /** * Notify gid to lid mapping change. */ - virtual void notifyPut(GlobalId gid, uint32_t lid, SerialNum serialNum) = 0; + virtual void notifyPutDone(GlobalId gid, uint32_t lid, SerialNum serialNum) = 0; virtual void notifyRemove(GlobalId gid, SerialNum serialNum) = 0; virtual void notifyRemoveDone(GlobalId gid, SerialNum serialNum) = 0; }; diff --git a/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_listener.h b/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_listener.h index d02979e168f..ebae8d40611 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_listener.h +++ b/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_listener.h @@ -17,7 +17,7 @@ class IGidToLidChangeListener { public: virtual ~IGidToLidChangeListener() { } - virtual void notifyPut(document::GlobalId gid, uint32_t lid) = 0; + virtual void notifyPutDone(document::GlobalId gid, uint32_t lid) = 0; virtual void notifyRemove(document::GlobalId gid) = 0; virtual void notifyRegistered() = 0; virtual const vespalib::string &getName() const = 0; diff --git a/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.cpp b/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.cpp new file mode 100644 index 00000000000..e806628bc02 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.cpp @@ -0,0 +1,50 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "pending_notify_remove_done.h" +#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> +#include <cassert> + +namespace proton +{ + +PendingNotifyRemoveDone::PendingNotifyRemoveDone() + : _gidToLidChangeHandler(nullptr), + _gid(), + _serialNum(0), + _pending(false) +{ +} + +PendingNotifyRemoveDone::PendingNotifyRemoveDone(PendingNotifyRemoveDone &&rhs) + : _gidToLidChangeHandler(rhs._gidToLidChangeHandler), + _gid(rhs._gid), + _serialNum(rhs._serialNum), + _pending(rhs._pending) +{ + rhs._pending = false; +} + +PendingNotifyRemoveDone::~PendingNotifyRemoveDone() +{ + assert(!_pending); // Fail if notifyRemoveDone is still pending +} + +void +PendingNotifyRemoveDone::setup(IGidToLidChangeHandler &gidToLidChangeHandler, document::GlobalId gid, search::SerialNum serialNum) +{ + _gidToLidChangeHandler = &gidToLidChangeHandler; + _gid = gid; + _serialNum = serialNum; + _pending = true; +} + +void +PendingNotifyRemoveDone::invoke() +{ + if (_pending) { + _gidToLidChangeHandler->notifyRemoveDone(_gid, _serialNum); + _pending = false; + } +} + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.h b/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.h new file mode 100644 index 00000000000..95aad182b10 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.h @@ -0,0 +1,35 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/document/base/globalid.h> +#include <vespa/searchlib/common/serialnum.h> + +namespace proton +{ + +class IGidToLidChangeHandler; + +/* + * Class used to keep track of a pending notifyRemoveDone() call to + * a gid to lid change handler. + */ +class PendingNotifyRemoveDone +{ + IGidToLidChangeHandler *_gidToLidChangeHandler; + document::GlobalId _gid; + search::SerialNum _serialNum; + bool _pending; + +public: + PendingNotifyRemoveDone(); + PendingNotifyRemoveDone(PendingNotifyRemoveDone &&rhs); + PendingNotifyRemoveDone(const PendingNotifyRemoveDone &rhs) = delete; + PendingNotifyRemoveDone &operator=(const PendingNotifyRemoveDone &rhs) = delete; + PendingNotifyRemoveDone &operator=(PendingNotifyRemoveDone &&rhs) = delete; + ~PendingNotifyRemoveDone(); + void setup(IGidToLidChangeHandler &gidToLidChangeHandler, document::GlobalId gid, search::SerialNum serialNum); + void invoke(); +}; + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp index f05d8bc0823..efb5a58dd2e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp @@ -31,7 +31,7 @@ PutDoneContext::~PutDoneContext() _docIdLimit->bumpUpLimit(_lid + 1); } if (_enableNotifyPut) { - _gidToLidChangeHandler.notifyPut(_gid, _lid, _serialNum); + _gidToLidChangeHandler.notifyPutDone(_gid, _lid, _serialNum); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp index 522b0aed617..627e8d9f627 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp @@ -12,18 +12,12 @@ RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token, PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - IGidToLidChangeHandler &gidToLidChangeHandler, - const document::GlobalId &gid, - uint32_t lid, - search::SerialNum serialNum, - bool enableNotifyRemoveDone) + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, + uint32_t lid) : OperationDoneContext(std::move(token), opType, metrics), _executor(executor), _task(), - _gidToLidChangeHandler(gidToLidChangeHandler), - _gid(gid), - _serialNum(serialNum), - _enableNotifyRemoveDone(enableNotifyRemoveDone) + _pendingNotifyRemoveDone(std::move(pendingNotifyRemoveDone)) { if (lid != 0) { _task = std::make_unique<RemoveDoneTask>(documentMetaStore, lid); @@ -32,9 +26,7 @@ RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token, RemoveDoneContext::~RemoveDoneContext() { - if (_enableNotifyRemoveDone) { - _gidToLidChangeHandler.notifyRemoveDone(_gid, _serialNum); - } + _pendingNotifyRemoveDone.invoke(); ack(); if (_task) { vespalib::Executor::Task::UP res = _executor.execute(std::move(_task)); diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h index 9311a6d2b6e..c4fafb4e886 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h @@ -6,12 +6,12 @@ #include <vespa/vespalib/util/executor.h> #include <vespa/document/base/globalid.h> #include <vespa/searchlib/common/serialnum.h> +#include <vespa/searchcore/proton/reference/pending_notify_remove_done.h> namespace proton { class IDocumentMetaStore; -class IGidToLidChangeHandler; /** @@ -26,10 +26,7 @@ class RemoveDoneContext : public OperationDoneContext { vespalib::Executor &_executor; std::unique_ptr<vespalib::Executor::Task> _task; - IGidToLidChangeHandler &_gidToLidChangeHandler; - document::GlobalId _gid; - search::SerialNum _serialNum; - bool _enableNotifyRemoveDone; + PendingNotifyRemoveDone _pendingNotifyRemoveDone; public: RemoveDoneContext(std::unique_ptr<FeedToken> token, @@ -37,11 +34,8 @@ public: PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - IGidToLidChangeHandler &gidToLidChangeHandler, - const document::GlobalId &gid, - uint32_t lid, - search::SerialNum serialNum, - bool enableNotifyRemoveDone); + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, + uint32_t lid); virtual ~RemoveDoneContext(); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 5f0fc3b8aa6..a853db7427d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -117,14 +117,11 @@ public: RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - IGidToLidChangeHandler &gidToLidChangeHandler, - const document::GlobalId &gid, + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid, - SerialNum serialNum, - bool enableNotifyRemoveDone, IDestructorCallback::SP moveDoneCtx) - : RemoveDoneContext(std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyRemoveDone), - _moveDoneCtx(std::move(moveDoneCtx)) + : RemoveDoneContext(std::move(token), opType, metrics, executor, documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid), + _moveDoneCtx(std::move(moveDoneCtx)) {} virtual ~RemoveDoneContextForMove() {} }; @@ -133,19 +130,16 @@ std::shared_ptr<RemoveDoneContext> createRemoveDoneContext(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - IGidToLidChangeHandler &gidToLidChangeHandler, - const document::GlobalId &gid, + PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid, - SerialNum serialNum, - bool enableNotifyRemoveDone, IDestructorCallback::SP moveDoneCtx) { if (moveDoneCtx) { return std::make_shared<RemoveDoneContextForMove> - (std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyRemoveDone, std::move(moveDoneCtx)); + (std::move(token), opType, metrics, executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid, std::move(moveDoneCtx)); } else { return std::make_shared<RemoveDoneContext> - (std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyRemoveDone); + (std::move(token), opType, metrics, executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid); } } @@ -298,7 +292,7 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) putOp.getSubDbId(), putOp.getLid(), putOp.getPrevSubDbId(), putOp.getPrevLid(), _params._subDbId, doc->toString(true).size(), doc->toString(true).c_str()); - adjustMetaStore(putOp, docId); + PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(putOp, docId); considerEarlyAck(token, putOp.getType()); bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId); @@ -315,8 +309,7 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) } if (docAlreadyExists && putOp.changedDbdId()) { assert(!putOp.getValidDbdId(_params._subDbId)); - const document::GlobalId &gid = docId.getGlobalId(); - internalRemove(std::move(token), serialNum, gid, putOp.getPrevLid(), putOp.getType(), useDocumentMetaStore(serialNum), IDestructorCallback::SP()); + internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), putOp.getPrevLid(), putOp.getType(), IDestructorCallback::SP()); } if (token.get() != NULL) { token->ack(putOp.getType(), _params._metrics); @@ -399,10 +392,11 @@ void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, Document::SP do })); #pragma GCC diagnostic pop } -void StoreOnlyFeedView::removeSummary(SerialNum serialNum, Lid lid) { +void StoreOnlyFeedView::removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone) { _pendingLidTracker.produce(lid); summaryExecutor().execute( - makeLambdaTask([serialNum, lid, this] { + makeLambdaTask([serialNum, lid, onDone, this] { + (void) onDone; _summaryAdapter->remove(serialNum, lid); _pendingLidTracker.consume(lid); })); @@ -581,7 +575,7 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm _params._docTypeName.toString().c_str(), serialNum, docId.toString().c_str(), rmOp.getSubDbId(), rmOp.getLid(), rmOp.getPrevSubDbId(), rmOp.getPrevLid(), _params._subDbId); - adjustMetaStore(rmOp, docId); + PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(rmOp, docId); considerEarlyAck(token, rmOp.getType()); if (rmOp.getValidDbdId(_params._subDbId)) { @@ -593,8 +587,7 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm if (rmOp.getValidPrevDbdId(_params._subDbId)) { if (rmOp.changedDbdId()) { assert(!rmOp.getValidDbdId(_params._subDbId)); - const document::GlobalId &gid = docId.getGlobalId(); - internalRemove(std::move(token), serialNum, gid, rmOp.getPrevLid(), rmOp.getType(), useDocumentMetaStore(serialNum), IDestructorCallback::SP()); + internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), rmOp.getPrevLid(), rmOp.getType(), IDestructorCallback::SP()); } } if (token.get() != NULL) { @@ -603,22 +596,23 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm } void -StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, const document::GlobalId &gid, Lid lid, - FeedOperation::Type opType, bool enableNotifyRemoveDone, IDestructorCallback::SP moveDoneCtx) +StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, + FeedOperation::Type opType, IDestructorCallback::SP moveDoneCtx) { - removeSummary(serialNum, lid); bool explicitReuseLid = _lidReuseDelayer.delayReuse(lid); std::shared_ptr<RemoveDoneContext> onWriteDone; onWriteDone = createRemoveDoneContext(std::move(token), opType, _params._metrics, _writeService.master(), - _metaStore, _gidToLidChangeHandler, gid, (explicitReuseLid ? lid : 0u), serialNum, enableNotifyRemoveDone, moveDoneCtx); + _metaStore, std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u), moveDoneCtx); + removeSummary(serialNum, lid, onWriteDone); bool immediateCommit = _commitTimeTracker.needCommit(); removeAttributes(serialNum, lid, immediateCommit, onWriteDone); removeIndexedFields(serialNum, lid, immediateCommit, onWriteDone); } -void +PendingNotifyRemoveDone StoreOnlyFeedView::adjustMetaStore(const DocumentOperation &op, const DocumentId &docId) { + PendingNotifyRemoveDone pendingNotifyRemoveDone; const SerialNum serialNum = op.getSerialNum(); if (useDocumentMetaStore(serialNum)) { if (op.getValidDbdId(_params._subDbId)) { @@ -632,10 +626,12 @@ StoreOnlyFeedView::adjustMetaStore(const DocumentOperation &op, const DocumentId } } else if (op.getValidPrevDbdId(_params._subDbId)) { _gidToLidChangeHandler.notifyRemove(docId.getGlobalId(), serialNum); + pendingNotifyRemoveDone.setup(_gidToLidChangeHandler, docId.getGlobalId(), serialNum); removeMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED); } _metaStore.commit(serialNum, serialNum); } + return pendingNotifyRemoveDone; } void @@ -684,7 +680,7 @@ StoreOnlyFeedView::removeDocuments(const RemoveDocumentsOperation &op, bool remo } if (useDocumentStore(serialNum + 1)) { for (const auto &lid : lidsToRemove) { - removeSummary(serialNum, lid); + removeSummary(serialNum, lid, onWriteDone); } } return lidsToRemove.size(); @@ -750,7 +746,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: moveOp.getSubDbId(), moveOp.getLid(), moveOp.getPrevSubDbId(), moveOp.getPrevLid(), _params._subDbId, doc->toString(true).size(), doc->toString(true).c_str()); - adjustMetaStore(moveOp, docId); + PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(moveOp, docId); bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId); if (moveOp.getValidDbdId(_params._subDbId)) { bool immediateCommit = _commitTimeTracker.needCommit(); @@ -765,9 +761,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone); } if (docAlreadyExists && moveOp.changedDbdId()) { - const document::GlobalId &gid = docId.getGlobalId(); - bool enableNotifyRemoveDone = useDocumentMetaStore(serialNum) && !moveOp.getValidDbdId(_params._subDbId); - internalRemove(FeedToken::UP(), serialNum, gid, moveOp.getPrevLid(), moveOp.getType(), enableNotifyRemoveDone, doneCtx); + internalRemove(FeedToken::UP(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), moveOp.getType(), doneCtx); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 01a8122ed1e..fbc8888ac79 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -15,6 +15,7 @@ #include <vespa/searchcore/proton/documentmetastore/documentmetastorecontext.h> #include <vespa/searchcore/proton/feedoperation/feedoperation.h> #include <vespa/searchcore/proton/persistenceengine/resulthandler.h> +#include <vespa/searchcore/proton/reference/pending_notify_remove_done.h> #include <vespa/searchcorespi/index/ithreadingservice.h> #include <vespa/searchlib/query/base.h> #include <vespa/vespalib/util/threadstackexecutorbase.h> @@ -157,7 +158,7 @@ private: } void putSummary(SerialNum serialNum, Lid lid, FutureStream doc, OnOperationDoneType onDone); void putSummary(SerialNum serialNum, Lid lid, Document::SP doc, OnOperationDoneType onDone); - void removeSummary(SerialNum serialNum, Lid lid); + void removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone); void heartBeatSummary(SerialNum serialNum); @@ -168,7 +169,7 @@ private: return replaySerialNum > _params._flushedDocumentMetaStoreSerialNum; } - void adjustMetaStore(const DocumentOperation &op, const document::DocumentId &docId); + PendingNotifyRemoveDone adjustMetaStore(const DocumentOperation &op, const document::DocumentId &docId); void internalPut(FeedTokenUP token, const PutOperation &putOp); void internalUpdate(FeedTokenUP token, const UpdateOperation &updOp); @@ -180,8 +181,8 @@ private: size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields, bool immediateCommit); - void internalRemove(FeedTokenUP token, SerialNum serialNum, const document::GlobalId &gid, Lid lid, - FeedOperation::Type opType, bool enableNotifyRemoveDone, std::shared_ptr<search::IDestructorCallback> moveDoneCtx); + void internalRemove(FeedTokenUP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, + FeedOperation::Type opType, std::shared_ptr<search::IDestructorCallback> moveDoneCtx); // Ack token early if visibility delay is nonzero void considerEarlyAck(FeedTokenUP &token, FeedOperation::Type opType); diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h b/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h index 41efb55e61c..c1c2e2909d9 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h @@ -44,7 +44,7 @@ public: _removes.emplace_back(docTypeName, keepNames); } - virtual void notifyPut(document::GlobalId, uint32_t, SerialNum) override { } + virtual void notifyPutDone(document::GlobalId, uint32_t, SerialNum) override { } virtual void notifyRemove(document::GlobalId, SerialNum) override { } virtual void notifyRemoveDone(document::GlobalId, SerialNum) override { } |