diff options
author | Geir Storli <geirstorli@yahoo.no> | 2017-08-31 09:20:09 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-08-31 09:20:09 +0200 |
commit | 0c7af1343cec5ca95e338ce83dda158f123e72dc (patch) | |
tree | 3194b97012558afea7783922c2c7ce849a37611d | |
parent | c339ebb9f795872c68a1eed64b83e47c44fd0421 (diff) | |
parent | 024f5a897f19dfceca8a9faa494f8494cd0251aa (diff) |
Merge pull request #3269 from vespa-engine/toregge/refactor-gid-to-lid-change-handler-usage
Toregge/refactor gid to lid change handler usage
21 files changed, 310 insertions, 147 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp index 4970ad867ce..2ed60f3c078 100644 --- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp @@ -198,13 +198,23 @@ public: { } - virtual void notifyGidToLidChange(document::GlobalId gid, uint32_t lid) override { + virtual void notifyPut(document::GlobalId gid, uint32_t lid, SerialNum) override { _changeGid = gid; _changeLid = lid; _gidToLid[gid] = lid; ++_changes; } + virtual void notifyRemove(document::GlobalId gid, SerialNum) override { + _changeGid = gid; + _changeLid = 0; + _gidToLid[gid] = 0; + ++_changes; + } + + virtual void notifyRemoveDone(document::GlobalId, SerialNum) override { + } + void assertChanges(document::GlobalId expGid, uint32_t expLid, uint32_t expChanges) { EXPECT_EQUAL(expGid, _changeGid); EXPECT_EQUAL(expLid, _changeLid); diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp index eb2052ef32f..1c5287f15e5 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp @@ -14,6 +14,7 @@ LOG_SETUP("gid_to_lid_change_handler_test"); using document::GlobalId; using document::DocumentId; using search::makeLambdaTask; +using search::SerialNum; namespace proton { @@ -30,7 +31,8 @@ vespalib::string doc1("id:test:music::1"); class ListenerStats { using lock_guard = std::lock_guard<std::mutex>; std::mutex _lock; - uint32_t _changes; + uint32_t _putChanges; + uint32_t _removeChanges; uint32_t _createdListeners; uint32_t _registeredListeners; uint32_t _destroyedListeners; @@ -38,7 +40,8 @@ class ListenerStats { public: ListenerStats() : _lock(), - _changes(0u), + _putChanges(0u), + _removeChanges(0u), _createdListeners(0u), _registeredListeners(0u), _destroyedListeners(0u) @@ -50,9 +53,13 @@ public: EXPECT_EQUAL(_createdListeners, _destroyedListeners); } - void notifyGidToLidChange() { + void notifyPut() { lock_guard guard(_lock); - ++_changes; + ++_putChanges; + } + void notifyRemove() { + lock_guard guard(_lock); + ++_removeChanges; } void markCreatedListener() { lock_guard guard(_lock); ++_createdListeners; } void markRegisteredListener() { lock_guard guard(_lock); ++_registeredListeners; } @@ -62,15 +69,18 @@ public: uint32_t getRegisteredListeners() const { return _registeredListeners; } uint32_t getDestroyedListeners() const { return _destroyedListeners; } - void assertCounts(uint32_t expCreatedListeners, - uint32_t expRegisteredListeners, - uint32_t expDestroyedListeners, - uint32_t expChanges) + void assertListeners(uint32_t expCreatedListeners, + uint32_t expRegisteredListeners, + uint32_t expDestroyedListeners) { EXPECT_EQUAL(expCreatedListeners, getCreatedListeners()); EXPECT_EQUAL(expRegisteredListeners, getRegisteredListeners()); EXPECT_EQUAL(expDestroyedListeners, getDestroyedListeners()); - EXPECT_EQUAL(expChanges, _changes); + } + void assertChanges(uint32_t expPutChanges, uint32_t expRemoveChanges) + { + EXPECT_EQUAL(expPutChanges, _putChanges); + EXPECT_EQUAL(expRemoveChanges, _removeChanges); } }; @@ -91,7 +101,8 @@ public: _stats.markCreatedListener(); } virtual ~MyListener() { _stats.markDestroyedListener(); } - virtual void notifyGidToLidChange(GlobalId, uint32_t) override { _stats.notifyGidToLidChange(); } + virtual void notifyPut(GlobalId, uint32_t) override { _stats.notifyPut(); } + virtual void notifyRemove(GlobalId) override { _stats.notifyRemove(); } virtual void notifyRegistered() override { _stats.markRegisteredListener(); } virtual const vespalib::string &getName() const override { return _name; } virtual const vespalib::string &getDocTypeName() const override { return _docTypeName; } @@ -99,16 +110,12 @@ public: struct Fixture { - vespalib::ThreadStackExecutor _masterExecutor; - ExecutorThreadService _master; std::vector<std::shared_ptr<ListenerStats>> _statss; std::shared_ptr<GidToLidChangeHandler> _handler; Fixture() - : _masterExecutor(1, 128 * 1024), - _master(_masterExecutor), - _statss(), - _handler(std::make_shared<GidToLidChangeHandler>(&_master)) + : _statss(), + _handler(std::make_shared<GidToLidChangeHandler>()) { } @@ -119,8 +126,7 @@ struct Fixture void close() { - _master.execute(makeLambdaTask([this]() { _handler->close(); })); - _master.sync(); + _handler->close(); } ListenerStats &addStats() { @@ -130,18 +136,23 @@ struct Fixture void addListener(std::unique_ptr<IGidToLidChangeListener> listener) { _handler->addListener(std::move(listener)); - _master.sync(); } - void notifyGidToLidChange(GlobalId gid, uint32_t lid) { - _master.execute(makeLambdaTask([this, gid, lid]() { _handler->notifyGidToLidChange(gid, lid); })); - _master.sync(); + void notifyPut(GlobalId gid, uint32_t lid, SerialNum serialNum) { + _handler->notifyPut(gid, lid, serialNum); + } + + void notifyRemove(GlobalId gid, SerialNum serialNum) { + _handler->notifyRemove(gid, serialNum); + } + + void notifyRemoveDone(GlobalId gid, SerialNum serialNum) { + _handler->notifyRemoveDone(gid, serialNum); } void removeListeners(const vespalib::string &docTypeName, const std::set<vespalib::string> &keepNames) { _handler->removeListeners(docTypeName, keepNames); - _master.sync(); } }; @@ -150,13 +161,13 @@ TEST_F("Test that we can register a listener", Fixture) { auto &stats = f.addStats(); auto listener = std::make_unique<MyListener>(stats, "test", "testdoc"); - TEST_DO(stats.assertCounts(1, 0, 0, 0)); + TEST_DO(stats.assertListeners(1, 0, 0)); f.addListener(std::move(listener)); - TEST_DO(stats.assertCounts(1, 1, 0, 0)); - f.notifyGidToLidChange(toGid(doc1), 10); - TEST_DO(stats.assertCounts(1, 1, 0, 1)); + TEST_DO(stats.assertListeners(1, 1, 0)); + f.notifyPut(toGid(doc1), 10, 10); + TEST_DO(stats.assertChanges(1, 0)); f.removeListeners("testdoc", {}); - TEST_DO(stats.assertCounts(1, 1, 1, 1)); + TEST_DO(stats.assertListeners(1, 1, 1)); } TEST_F("Test that we can register multiple listeners", Fixture) @@ -167,48 +178,86 @@ TEST_F("Test that we can register multiple listeners", Fixture) auto listener1 = std::make_unique<MyListener>(stats1, "test1", "testdoc"); auto listener2 = std::make_unique<MyListener>(stats2, "test2", "testdoc"); auto listener3 = std::make_unique<MyListener>(stats3, "test3", "testdoc2"); - TEST_DO(stats1.assertCounts(1, 0, 0, 0)); - TEST_DO(stats2.assertCounts(1, 0, 0, 0)); - TEST_DO(stats3.assertCounts(1, 0, 0, 0)); + TEST_DO(stats1.assertListeners(1, 0, 0)); + TEST_DO(stats2.assertListeners(1, 0, 0)); + TEST_DO(stats3.assertListeners(1, 0, 0)); f.addListener(std::move(listener1)); f.addListener(std::move(listener2)); f.addListener(std::move(listener3)); - TEST_DO(stats1.assertCounts(1, 1, 0, 0)); - TEST_DO(stats2.assertCounts(1, 1, 0, 0)); - TEST_DO(stats3.assertCounts(1, 1, 0, 0)); - f.notifyGidToLidChange(toGid(doc1), 10); - TEST_DO(stats1.assertCounts(1, 1, 0, 1)); - TEST_DO(stats2.assertCounts(1, 1, 0, 1)); - TEST_DO(stats3.assertCounts(1, 1, 0, 1)); + TEST_DO(stats1.assertListeners(1, 1, 0)); + TEST_DO(stats2.assertListeners(1, 1, 0)); + TEST_DO(stats3.assertListeners(1, 1, 0)); + f.notifyPut(toGid(doc1), 10, 10); + TEST_DO(stats1.assertChanges(1, 0)); + TEST_DO(stats2.assertChanges(1, 0)); + TEST_DO(stats3.assertChanges(1, 0)); f.removeListeners("testdoc", {"test1"}); - TEST_DO(stats1.assertCounts(1, 1, 0, 1)); - TEST_DO(stats2.assertCounts(1, 1, 1, 1)); - TEST_DO(stats3.assertCounts(1, 1, 0, 1)); + TEST_DO(stats1.assertListeners(1, 1, 0)); + TEST_DO(stats2.assertListeners(1, 1, 1)); + TEST_DO(stats3.assertListeners(1, 1, 0)); f.removeListeners("testdoc", {}); - TEST_DO(stats1.assertCounts(1, 1, 1, 1)); - TEST_DO(stats2.assertCounts(1, 1, 1, 1)); - TEST_DO(stats3.assertCounts(1, 1, 0, 1)); + TEST_DO(stats1.assertListeners(1, 1, 1)); + TEST_DO(stats2.assertListeners(1, 1, 1)); + TEST_DO(stats3.assertListeners(1, 1, 0)); f.removeListeners("testdoc2", {"test3"}); - TEST_DO(stats1.assertCounts(1, 1, 1, 1)); - TEST_DO(stats2.assertCounts(1, 1, 1, 1)); - TEST_DO(stats3.assertCounts(1, 1, 0, 1)); + TEST_DO(stats1.assertListeners(1, 1, 1)); + TEST_DO(stats2.assertListeners(1, 1, 1)); + TEST_DO(stats3.assertListeners(1, 1, 0)); f.removeListeners("testdoc2", {"foo"}); - TEST_DO(stats1.assertCounts(1, 1, 1, 1)); - TEST_DO(stats2.assertCounts(1, 1, 1, 1)); - TEST_DO(stats3.assertCounts(1, 1, 1, 1)); + TEST_DO(stats1.assertListeners(1, 1, 1)); + TEST_DO(stats2.assertListeners(1, 1, 1)); + TEST_DO(stats3.assertListeners(1, 1, 1)); } TEST_F("Test that we keep old listener when registering duplicate", Fixture) { auto &stats = f.addStats(); auto listener = std::make_unique<MyListener>(stats, "test1", "testdoc"); - TEST_DO(stats.assertCounts(1, 0, 0, 0)); + TEST_DO(stats.assertListeners(1, 0, 0)); f.addListener(std::move(listener)); - TEST_DO(stats.assertCounts(1, 1, 0, 0)); + TEST_DO(stats.assertListeners(1, 1, 0)); listener = std::make_unique<MyListener>(stats, "test1", "testdoc"); - TEST_DO(stats.assertCounts(2, 1, 0, 0)); + TEST_DO(stats.assertListeners(2, 1, 0)); + f.addListener(std::move(listener)); + TEST_DO(stats.assertListeners(2, 1, 1)); +} + +TEST_F("Test that put is ignored if we have a pending remove", Fixture) +{ + auto &stats = f.addStats(); + auto listener = std::make_unique<MyListener>(stats, "test", "testdoc"); f.addListener(std::move(listener)); - TEST_DO(stats.assertCounts(2, 1, 1, 0)); + f.notifyRemove(toGid(doc1), 20); + TEST_DO(stats.assertChanges(0, 1)); + f.notifyPut(toGid(doc1), 10, 10); + TEST_DO(stats.assertChanges(0, 1)); + f.notifyRemoveDone(toGid(doc1), 20); + TEST_DO(stats.assertChanges(0, 1)); + f.notifyPut(toGid(doc1), 11, 30); + TEST_DO(stats.assertChanges(1, 1)); + f.removeListeners("testdoc", {}); +} + +TEST_F("Test that pending removes are merged", Fixture) +{ + auto &stats = f.addStats(); + auto listener = std::make_unique<MyListener>(stats, "test", "testdoc"); + f.addListener(std::move(listener)); + f.notifyRemove(toGid(doc1), 20); + TEST_DO(stats.assertChanges(0, 1)); + f.notifyRemove(toGid(doc1), 40); + TEST_DO(stats.assertChanges(0, 1)); + f.notifyPut(toGid(doc1), 10, 10); + TEST_DO(stats.assertChanges(0, 1)); + f.notifyRemoveDone(toGid(doc1), 20); + TEST_DO(stats.assertChanges(0, 1)); + f.notifyPut(toGid(doc1), 11, 30); + TEST_DO(stats.assertChanges(0, 1)); + f.notifyRemoveDone(toGid(doc1), 40); + TEST_DO(stats.assertChanges(0, 1)); + f.notifyPut(toGid(doc1), 12, 50); + TEST_DO(stats.assertChanges(1, 1)); + f.removeListeners("testdoc", {}); } } diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp index 780a6d79ad6..08787e41438 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp @@ -117,8 +117,8 @@ struct Fixture _listener = std::make_unique<GidToLidChangeListener>(_writer, _attr, _refCount, "test", "testdoc"); } - void notifyGidToLidChange(const GlobalId &gid, uint32_t referencedDoc) { - _listener->notifyGidToLidChange(gid, referencedDoc); + void notifyPut(const GlobalId &gid, uint32_t referencedDoc) { + _listener->notifyPut(gid, referencedDoc); } void notifyListenerRegistered() { @@ -137,9 +137,9 @@ TEST_F("Test that we can use gid to lid change listener", Fixture) TEST_DO(f.assertRefLid(0, 2)); TEST_DO(f.assertRefLid(0, 3)); f.allocListener(); - f.notifyGidToLidChange(toGid(doc1), 10); - f.notifyGidToLidChange(toGid(doc2), 20); - f.notifyGidToLidChange(toGid(doc3), 30); + f.notifyPut(toGid(doc1), 10); + f.notifyPut(toGid(doc2), 20); + f.notifyPut(toGid(doc3), 30); TEST_DO(f.assertRefLid(10, 1)); TEST_DO(f.assertRefLid(20, 2)); TEST_DO(f.assertRefLid(10, 3)); diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_registrator/gid_to_lid_change_registrator_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_registrator/gid_to_lid_change_registrator_test.cpp index e97f117e481..a5231647158 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_registrator/gid_to_lid_change_registrator_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_registrator/gid_to_lid_change_registrator_test.cpp @@ -24,7 +24,8 @@ public: { } virtual ~MyListener() { } - virtual void notifyGidToLidChange(document::GlobalId, uint32_t) override { } + virtual void notifyPut(document::GlobalId, uint32_t) override { } + virtual void notifyRemove(document::GlobalId) override { } virtual void notifyRegistered() override { } virtual const vespalib::string &getName() const override { return _name; } virtual const vespalib::string &getDocTypeName() const override { return _docTypeName; } diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp index e7b347c4c80..d7b2c16867a 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp @@ -6,63 +6,98 @@ #include <vespa/searchcorespi/index/i_thread_service.h> #include <vespa/document/base/globalid.h> #include <cassert> +#include <vespa/vespalib/stllike/hash_map.hpp> using search::makeLambdaTask; namespace proton { -GidToLidChangeHandler::GidToLidChangeHandler(searchcorespi::index::IThreadService *master) +GidToLidChangeHandler::GidToLidChangeHandler() : _lock(), _listeners(), - _master(master) + _closed(false), + _pendingRemove() + { } GidToLidChangeHandler::~GidToLidChangeHandler() { - assert(_master == nullptr); + assert(_closed); assert(_listeners.empty()); + assert(_pendingRemove.empty()); } void -GidToLidChangeHandler::notifyGidToLidChange(document::GlobalId gid, uint32_t lid) +GidToLidChangeHandler::notifyPut(GlobalId gid, uint32_t lid) { for (const auto &listener : _listeners) { - listener->notifyGidToLidChange(gid, lid); + listener->notifyPut(gid, lid); } } void -GidToLidChangeHandler::close() +GidToLidChangeHandler::notifyRemove(GlobalId gid) +{ + for (const auto &listener : _listeners) { + listener->notifyRemove(gid); + } +} + +void +GidToLidChangeHandler::notifyPut(GlobalId gid, uint32_t lid, SerialNum serialNum) { lock_guard guard(_lock); - if (_master != nullptr) { - assert(_master->isCurrentThread()); - _master = nullptr; - _listeners.clear(); + auto itr = _pendingRemove.find(gid); + if (itr != _pendingRemove.end()) { + assert(itr->second > serialNum); + return; // Document has already been removed later on } + notifyPut(gid, lid); } void -GidToLidChangeHandler::addListener(std::unique_ptr<IGidToLidChangeListener> listener) +GidToLidChangeHandler::notifyRemove(GlobalId gid, SerialNum serialNum) { lock_guard guard(_lock); - if (_master) { - auto self(shared_from_this()); - _master->execute(makeLambdaTask([self,listener(std::move(listener))]() mutable { self->performAddListener(std::move(listener)); })); + auto insRes = _pendingRemove.insert(std::make_pair(gid, serialNum)); + if (!insRes.second) { + assert(insRes.first->second < serialNum); + insRes.first->second = serialNum; } else { - assert(_listeners.empty()); + notifyRemove(gid); } } +void +GidToLidChangeHandler::notifyRemoveDone(GlobalId gid, SerialNum serialNum) +{ + lock_guard guard(_lock); + auto itr = _pendingRemove.find(gid); + assert(itr != _pendingRemove.end() && itr->second >= serialNum); + if (itr->second == serialNum) { + _pendingRemove.erase(itr); + } +} void -GidToLidChangeHandler::performAddListener(std::unique_ptr<IGidToLidChangeListener> listener) +GidToLidChangeHandler::close() +{ + Listeners deferredDelete; + { + lock_guard guard(_lock); + _closed = true; + _listeners.swap(deferredDelete); + } +} + +void +GidToLidChangeHandler::addListener(std::unique_ptr<IGidToLidChangeListener> listener) { lock_guard guard(_lock); - if (_master) { + if (!_closed) { const vespalib::string &docTypeName = listener->getDocTypeName(); const vespalib::string &name = listener->getName(); for (const auto &oldlistener : _listeners) { @@ -77,19 +112,6 @@ GidToLidChangeHandler::performAddListener(std::unique_ptr<IGidToLidChangeListene } } -void -GidToLidChangeHandler::removeListeners(const vespalib::string &docTypeName, - const std::set<vespalib::string> &keepNames) -{ - lock_guard guard(_lock); - if (_master) { - auto self(shared_from_this()); - _master->execute(makeLambdaTask([self,docTypeName,keepNames]() mutable { self->performRemoveListener(docTypeName, keepNames); })); - } else { - assert(_listeners.empty()); - } -} - namespace { bool shouldRemoveListener(const IGidToLidChangeListener &listener, @@ -103,21 +125,25 @@ bool shouldRemoveListener(const IGidToLidChangeListener &listener, } void -GidToLidChangeHandler::performRemoveListener(const vespalib::string &docTypeName, - const std::set<vespalib::string> &keepNames) +GidToLidChangeHandler::removeListeners(const vespalib::string &docTypeName, + const std::set<vespalib::string> &keepNames) { - lock_guard guard(_lock); - if (_master) { - auto itr = _listeners.begin(); - while (itr != _listeners.end()) { - if (shouldRemoveListener(**itr, docTypeName, keepNames)) { - itr = _listeners.erase(itr); - } else { - ++itr; + Listeners deferredDelete; + { + lock_guard guard(_lock); + if (!_closed) { + auto itr = _listeners.begin(); + while (itr != _listeners.end()) { + if (shouldRemoveListener(**itr, docTypeName, keepNames)) { + deferredDelete.emplace_back(std::move(*itr)); + itr = _listeners.erase(itr); + } else { + ++itr; + } } + } else { + assert(_listeners.empty()); } - } else { - assert(_listeners.empty()); } } diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h index 2197490b65e..736a34aba76 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h @@ -5,6 +5,8 @@ #include "i_gid_to_lid_change_handler.h" #include <vector> #include <mutex> +#include <vespa/vespalib/stllike/hash_map.h> +#include <vespa/document/base/globalid.h> namespace searchcorespi { namespace index { class IThreadService; } } @@ -19,25 +21,27 @@ class GidToLidChangeHandler : public std::enable_shared_from_this<GidToLidChange public IGidToLidChangeHandler { using lock_guard = std::lock_guard<std::mutex>; + using Listeners = std::vector<std::unique_ptr<IGidToLidChangeListener>>; std::mutex _lock; - std::vector<std::unique_ptr<IGidToLidChangeListener>> _listeners; - searchcorespi::index::IThreadService *_master; + Listeners _listeners; + bool _closed; + vespalib::hash_map<GlobalId, SerialNum, GlobalId::hash> _pendingRemove; - void performAddListener(std::unique_ptr<IGidToLidChangeListener> listener); - void performRemoveListener(const vespalib::string &docTypeName, - const std::set<vespalib::string> &keepNames); + void notifyPut(GlobalId gid, uint32_t lid); + void notifyRemove(GlobalId gid); public: - GidToLidChangeHandler(searchcorespi::index::IThreadService *master); + GidToLidChangeHandler(); virtual ~GidToLidChangeHandler(); /** - * Notify gid to lid mapping change. Called by master executor. + * Notify gid to lid mapping change. */ - virtual void notifyGidToLidChange(document::GlobalId gid, uint32_t lid) override; + virtual void notifyPut(GlobalId gid, uint32_t lid, SerialNum serialNum) override; + virtual void notifyRemove(GlobalId gid, SerialNum serialNum) override; + virtual void notifyRemoveDone(GlobalId gid, SerialNum serialNum) override; /** - * Close handler, further notifications are blocked. Called by master - * executor. + * Close handler, further notifications are blocked. */ void close(); diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp index 5aba8bf3150..6a368997770 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp @@ -26,12 +26,22 @@ GidToLidChangeListener::~GidToLidChangeListener() } void -GidToLidChangeListener::notifyGidToLidChange(document::GlobalId gid, uint32_t lid) +GidToLidChangeListener::notifyPut(document::GlobalId gid, uint32_t lid) { std::promise<bool> promise; std::future<bool> future = promise.get_future(); _attributeFieldWriter.executeLambda(_executorId, - [this, &promise, gid, lid]() { _attr->notifyGidToLidChange(gid, lid); promise.set_value(true); }); + [this, &promise, gid, lid]() { _attr->notifyReferencedPut(gid, lid); promise.set_value(true); }); + (void) future.get(); +} + +void +GidToLidChangeListener::notifyRemove(document::GlobalId gid) +{ + std::promise<bool> promise; + std::future<bool> future = promise.get_future(); + _attributeFieldWriter.executeLambda(_executorId, + [this, &promise, gid]() { _attr->notifyReferencedRemove(gid); promise.set_value(true); }); (void) future.get(); } diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h index 8a3a7d00cec..35ff913d7af 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h @@ -30,7 +30,8 @@ public: const vespalib::string &name, const vespalib::string &docTypeName); virtual ~GidToLidChangeListener(); - virtual void notifyGidToLidChange(document::GlobalId gid, uint32_t lid) override; + virtual void notifyPut(document::GlobalId gid, uint32_t lid) override; + virtual void notifyRemove(document::GlobalId gid) override; virtual void notifyRegistered() override; virtual const vespalib::string &getName() const override; virtual const vespalib::string &getDocTypeName() const override; diff --git a/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h b/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h index 73ff140e2c6..a3b1db59abd 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h +++ b/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h @@ -5,6 +5,7 @@ #include <set> #include <memory> #include <vespa/vespalib/stllike/string.h> +#include <vespa/searchlib/common/serialnum.h> namespace document { class GlobalId; } @@ -19,14 +20,18 @@ class IGidToLidChangeListener; class IGidToLidChangeHandler { public: + using SerialNum = search::SerialNum; + using GlobalId = document::GlobalId; + virtual ~IGidToLidChangeHandler() { } virtual void addListener(std::unique_ptr<IGidToLidChangeListener> listener) = 0; virtual void removeListeners(const vespalib::string &docTypeName, const std::set<vespalib::string> &keepNames) = 0; - virtual void notifyGidToLidChange(document::GlobalId gid, uint32_t lid) = 0; - + virtual void notifyPut(GlobalId gid, uint32_t lid, SerialNum serialNum) = 0; + virtual void notifyRemove(GlobalId gid, SerialNum serialNum) = 0; + virtual void notifyRemoveDone(GlobalId gid, SerialNum serialNum) = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_listener.h b/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_listener.h index 317f378bedc..d02979e168f 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_listener.h +++ b/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_listener.h @@ -17,7 +17,8 @@ class IGidToLidChangeListener { public: virtual ~IGidToLidChangeListener() { } - virtual void notifyGidToLidChange(document::GlobalId gid, uint32_t lid) = 0; + virtual void notifyPut(document::GlobalId gid, uint32_t lid) = 0; + virtual void notifyRemove(document::GlobalId gid) = 0; virtual void notifyRegistered() = 0; virtual const vespalib::string &getName() const = 0; virtual const vespalib::string &getDocTypeName() const = 0; diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp index 3a3264336c8..913956c509f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp @@ -262,9 +262,21 @@ SearchableFeedView::forceCommit(SerialNum serialNum, OnForceCommitDoneType onCom } void -SearchableFeedView::notifyGidToLidChange(const document::GlobalId &gid, uint32_t lid) +SearchableFeedView::notifyPutGidToLidChange(const document::GlobalId &gid, uint32_t lid, SerialNum serialNum) { - _gidToLidChangeHandler->notifyGidToLidChange(gid, lid); + _gidToLidChangeHandler->notifyPut(gid, lid, serialNum); +} + +void +SearchableFeedView::notifyRemoveGidToLidChange(const document::GlobalId &gid, SerialNum serialNum) +{ + _gidToLidChangeHandler->notifyRemove(gid, serialNum); +} + +void +SearchableFeedView::notifyRemoveDoneGidToLidChange(const document::GlobalId &gid, SerialNum serialNum) +{ + _gidToLidChangeHandler->notifyRemoveDone(gid, serialNum); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h index c0d9bfcfbc6..4b536d0adde 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.h @@ -86,7 +86,9 @@ private: void performIndexForceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone); void forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) override; - virtual void notifyGidToLidChange(const document::GlobalId &gid, uint32_t lid) override; + virtual void notifyPutGidToLidChange(const document::GlobalId &gid, uint32_t lid, SerialNum serialNum) override; + virtual void notifyRemoveGidToLidChange(const document::GlobalId &gid, SerialNum serialNum) override; + virtual void notifyRemoveDoneGidToLidChange(const document::GlobalId &gid, SerialNum serialNum) override; public: SearchableFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, const PersistentParams ¶ms, diff --git a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp index 8eb36435ed5..e9e0120dac1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp @@ -49,7 +49,7 @@ SearchableDocSubDB::SearchableDocSubDB(const Config &cfg, const Context &ctx) getSubDbName(), ctx._fastUpdCtx._storeOnlyCtx._owner.getDistributionKey()), _numSearcherThreads(cfg._numSearcherThreads), _warmupExecutor(ctx._warmupExecutor), - _gidToLidChangeHandler(std::make_shared<GidToLidChangeHandler>(&_writeService.master())) + _gidToLidChangeHandler(std::make_shared<GidToLidChangeHandler>()) { } SearchableDocSubDB::~SearchableDocSubDB() diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index da3e09bea41..3e7e3d6ddfa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -611,16 +611,17 @@ StoreOnlyFeedView::adjustMetaStore(const DocumentOperation &op, const DocumentId op.getLid() != op.getPrevLid()) { moveMetaData(_metaStore, docId, op); - notifyGidToLidChange(docId.getGlobalId(), op.getLid()); + notifyPutGidToLidChange(docId.getGlobalId(), op.getLid(), serialNum); } else { putMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED); if (op.getDbDocumentId() != op.getPrevDbDocumentId()) { - notifyGidToLidChange(docId.getGlobalId(), op.getLid()); + notifyPutGidToLidChange(docId.getGlobalId(), op.getLid(), serialNum); } } } else if (op.getValidPrevDbdId(_params._subDbId)) { removeMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED); - notifyGidToLidChange(docId.getGlobalId(), 0u); + notifyRemoveGidToLidChange(docId.getGlobalId(), serialNum); + notifyRemoveDoneGidToLidChange(docId.getGlobalId(), serialNum); } _metaStore.commit(serialNum, serialNum); } @@ -651,7 +652,8 @@ StoreOnlyFeedView::removeDocuments(const RemoveDocumentsOperation &op, bool remo std::vector<document::GlobalId> gidsToRemove(getGidsToRemove(_metaStore, lidsToRemove)); _metaStore.removeBatch(lidsToRemove, ctx->getDocIdLimit()); for (const auto &gid : gidsToRemove) { - notifyGidToLidChange(gid, 0u); + notifyRemoveGidToLidChange(gid, serialNum); + notifyRemoveDoneGidToLidChange(gid, serialNum); } _metaStore.commit(serialNum, serialNum); explicitReuseLids = _lidReuseDelayer.delayReuse(lidsToRemove); @@ -806,6 +808,12 @@ StoreOnlyFeedView::getDocumentMetaStorePtr() const } void -StoreOnlyFeedView::notifyGidToLidChange(const document::GlobalId &, uint32_t ) {} +StoreOnlyFeedView::notifyPutGidToLidChange(const document::GlobalId &, uint32_t, SerialNum) {} + +void +StoreOnlyFeedView::notifyRemoveGidToLidChange(const document::GlobalId &, SerialNum) {} + +void +StoreOnlyFeedView::notifyRemoveDoneGidToLidChange(const document::GlobalId &, SerialNum) {} } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 982acf200a7..ec3003fb5d6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -179,7 +179,9 @@ private: // Ack token early if visibility delay is nonzero void considerEarlyAck(FeedTokenUP &token, FeedOperation::Type opType); - virtual void notifyGidToLidChange(const document::GlobalId &gid, uint32_t lid); + virtual void notifyPutGidToLidChange(const document::GlobalId &gid, uint32_t lid, SerialNum serialNum); + virtual void notifyRemoveGidToLidChange(const document::GlobalId &gid, SerialNum serialNum); + virtual void notifyRemoveDoneGidToLidChange(const document::GlobalId &gid, SerialNum serialNum); void makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpdate::SP upd, OnOperationDoneType onWriteDone,PromisedDoc promisedDoc, PromisedStream promisedStream); diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h b/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h index 2aa613461ef..41efb55e61c 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h @@ -44,7 +44,9 @@ public: _removes.emplace_back(docTypeName, keepNames); } - virtual void notifyGidToLidChange(document::GlobalId, uint32_t) override { } + virtual void notifyPut(document::GlobalId, uint32_t, SerialNum) override { } + virtual void notifyRemove(document::GlobalId, SerialNum) override { } + virtual void notifyRemoveDone(document::GlobalId, SerialNum) override { } void assertAdds(const std::vector<AddEntry> &expAdds) { diff --git a/searchlib/src/tests/attribute/reference_attribute/reference_attribute_test.cpp b/searchlib/src/tests/attribute/reference_attribute/reference_attribute_test.cpp index 7e2e8904170..1b1bc1f8796 100644 --- a/searchlib/src/tests/attribute/reference_attribute/reference_attribute_test.cpp +++ b/searchlib/src/tests/attribute/reference_attribute/reference_attribute_test.cpp @@ -182,8 +182,11 @@ struct Fixture iter, oldStatus.getUsed(), newStatus.getUsed()); } - void notifyGidToLidChange(const GlobalId &gid, uint32_t referencedDoc) { - _attr->notifyGidToLidChange(gid, referencedDoc); + void notifyReferencedPut(const GlobalId &gid, uint32_t referencedDoc) { + _attr->notifyReferencedPut(gid, referencedDoc); + } + void notifyReferencedRemove(const GlobalId &gid) { + _attr->notifyReferencedRemove(gid); } void populateReferencedLids() { _attr->populateReferencedLids(); @@ -300,7 +303,7 @@ TEST_F("require that update() uses gid-mapper to set referenced lid", Fixture) TEST_DO(f.assertRefLid(5, 0)); } -TEST_F("require that notifyGidToLidChange() updates lid-2-lid mapping", Fixture) +TEST_F("require that notifyReferencedPut() updates lid-2-lid mapping", Fixture) { f.ensureDocIdLimit(4); f.set(1, toGid(doc1)); @@ -310,9 +313,9 @@ TEST_F("require that notifyGidToLidChange() updates lid-2-lid mapping", Fixture) TEST_DO(f.assertRefLid(1, 0)); TEST_DO(f.assertRefLid(2, 0)); TEST_DO(f.assertRefLid(3, 0)); - f.notifyGidToLidChange(toGid(doc1), 10); - f.notifyGidToLidChange(toGid(doc2), 20); - f.notifyGidToLidChange(toGid(doc3), 30); + f.notifyReferencedPut(toGid(doc1), 10); + f.notifyReferencedPut(toGid(doc2), 20); + f.notifyReferencedPut(toGid(doc3), 30); TEST_DO(f.assertRefLid(1, 10)); TEST_DO(f.assertRefLid(2, 20)); TEST_DO(f.assertRefLid(3, 10)); @@ -369,18 +372,18 @@ TEST_F("require that populateReferencedLids() uses gid-mapper to update lid-2-li EXPECT_TRUE(vespalib::unlink("test.udat")); } -TEST_F("Require that notifyGidToLidChange changes reverse mapping", Fixture) +TEST_F("Require that notifyReferencedPut and notifyReferencedRemove changes reverse mapping", Fixture) { TEST_DO(preparePopulateReferencedLids(f)); TEST_DO(f.assertLids(10, { })); TEST_DO(f.assertLids(11, { })); - f.notifyGidToLidChange(toGid(doc1), 10); + f.notifyReferencedPut(toGid(doc1), 10); TEST_DO(f.assertLids(10, { 1, 3})); TEST_DO(f.assertLids(11, { })); - f.notifyGidToLidChange(toGid(doc1), 11); + f.notifyReferencedPut(toGid(doc1), 11); TEST_DO(f.assertLids(10, { })); TEST_DO(f.assertLids(11, { 1, 3})); - f.notifyGidToLidChange(toGid(doc1), 0); + f.notifyReferencedRemove(toGid(doc1)); TEST_DO(f.assertLids(10, { })); TEST_DO(f.assertLids(11, { })); } @@ -406,8 +409,8 @@ TEST_F("Require that reverse mapping recovers from temporary out of order glitch TEST_DO(f.assertRefLid(2, 17)); TEST_DO(f.assertRefLid(3, 10)); // Notify reference attribute about gid to lid mapping changes - f.notifyGidToLidChange(toGid(doc1), 0); - f.notifyGidToLidChange(toGid(doc3), 10); + f.notifyReferencedRemove(toGid(doc1)); + f.notifyReferencedPut(toGid(doc3), 10); TEST_DO(f.assertRefLid(1, 0)); TEST_DO(f.assertRefLid(2, 17)); TEST_DO(f.assertRefLid(3, 10)); diff --git a/searchlib/src/vespa/searchlib/attribute/reference_attribute.cpp b/searchlib/src/vespa/searchlib/attribute/reference_attribute.cpp index 7e17ba3a808..2cb05b14503 100644 --- a/searchlib/src/vespa/searchlib/attribute/reference_attribute.cpp +++ b/searchlib/src/vespa/searchlib/attribute/reference_attribute.cpp @@ -320,12 +320,23 @@ ReferenceAttribute::setGidToLidMapperFactory(std::shared_ptr<IGidToLidMapperFact } void -ReferenceAttribute::notifyGidToLidChange(const GlobalId &gid, DocId referencedLid) +ReferenceAttribute::notifyReferencedPut(const GlobalId &gid, DocId referencedLid) { EntryRef ref = _store.find(gid); if (ref.valid()) { const auto &entry = _store.get(ref); - _referenceMappings.notifyGidToLidChange(entry, referencedLid); + _referenceMappings.notifyReferencedPut(entry, referencedLid); + commit(); + } +} + +void +ReferenceAttribute::notifyReferencedRemove(const GlobalId &gid) +{ + EntryRef ref = _store.find(gid); + if (ref.valid()) { + const auto &entry = _store.get(ref); + _referenceMappings.notifyReferencedRemove(entry); commit(); } } diff --git a/searchlib/src/vespa/searchlib/attribute/reference_attribute.h b/searchlib/src/vespa/searchlib/attribute/reference_attribute.h index 4ee277b8733..ce7c908db99 100644 --- a/searchlib/src/vespa/searchlib/attribute/reference_attribute.h +++ b/searchlib/src/vespa/searchlib/attribute/reference_attribute.h @@ -78,7 +78,8 @@ public: ReverseMappingRefs getReverseMappingRefs() const { return _referenceMappings.getReverseMappingRefs(); } const ReverseMapping &getReverseMapping() const { return _referenceMappings.getReverseMapping(); } - void notifyGidToLidChange(const GlobalId &gid, DocId referencedLid); + void notifyReferencedPut(const GlobalId &gid, DocId referencedLid); + void notifyReferencedRemove(const GlobalId &gid); void populateReferencedLids(); virtual void clearDocs(DocId lidLow, DocId lidLimit) override; virtual void onShrinkLidSpace() override; diff --git a/searchlib/src/vespa/searchlib/attribute/reference_mappings.cpp b/searchlib/src/vespa/searchlib/attribute/reference_mappings.cpp index 4edd9d45e60..f2462cdc40d 100644 --- a/searchlib/src/vespa/searchlib/attribute/reference_mappings.cpp +++ b/searchlib/src/vespa/searchlib/attribute/reference_mappings.cpp @@ -93,7 +93,7 @@ ReferenceMappings::buildReverseMapping(const Reference &entry, const std::vector } void -ReferenceMappings::notifyGidToLidChange(const Reference &entry, uint32_t referencedLid) +ReferenceMappings::notifyReferencedPut(const Reference &entry, uint32_t referencedLid) { uint32_t oldReferencedLid = entry.lid(); if (oldReferencedLid != referencedLid) { @@ -107,6 +107,20 @@ ReferenceMappings::notifyGidToLidChange(const Reference &entry, uint32_t referen } void +ReferenceMappings::notifyReferencedRemove(const Reference &entry) +{ + uint32_t oldReferencedLid = entry.lid(); + if (oldReferencedLid != 0) { + if (oldReferencedLid < _reverseMappingIndices.size()) { + _reverseMappingIndices[oldReferencedLid] = EntryRef(); + } + entry.setLid(0); + } + syncReverseMappingIndices(entry); + syncForwardMapping(entry); +} + +void ReferenceMappings::onAddDocs(uint32_t docIdLimit) { _referencedLids.reserve(docIdLimit); diff --git a/searchlib/src/vespa/searchlib/attribute/reference_mappings.h b/searchlib/src/vespa/searchlib/attribute/reference_mappings.h index 3190e1b5a83..73754d9cb13 100644 --- a/searchlib/src/vespa/searchlib/attribute/reference_mappings.h +++ b/searchlib/src/vespa/searchlib/attribute/reference_mappings.h @@ -57,7 +57,8 @@ public: void transferHoldLists(generation_t generation) { _reverseMapping.transferHoldLists(generation); } // Handle mapping changes - void notifyGidToLidChange(const Reference &entry, uint32_t referencedLid); + void notifyReferencedPut(const Reference &entry, uint32_t referencedLid); + void notifyReferencedRemove(const Reference &entry); void removeReverseMapping(const Reference &entry, uint32_t lid); void addReverseMapping(const Reference &entry, uint32_t lid); void syncMappings(const Reference &entry); |