diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2020-11-19 12:53:49 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2020-11-19 16:39:21 +0100 |
commit | 67f83ab6dbf0912cf53885b65991aa63d5a41fcc (patch) | |
tree | 3cd9dbe568d77e7ded3d1f3b0f9146320efd0d1b /searchcore | |
parent | 19e2e6fcc911a17041771784f2dfa27b002bb27b (diff) |
Delay gid to lid change notifications for put operations.
Diffstat (limited to 'searchcore')
28 files changed, 260 insertions, 299 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp index b875ab8e058..2b2b8acbc50 100644 --- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp @@ -179,23 +179,20 @@ public: { } - void notifyPutDone(IDestructorCallbackSP, document::GlobalId gid, uint32_t lid, SerialNum) override { + void notifyPut(IDestructorCallbackSP, document::GlobalId gid, uint32_t lid, SerialNum) override { _changeGid = gid; _changeLid = lid; _gidToLid[gid] = lid; ++_changes; } - void notifyRemove(IDestructorCallbackSP, document::GlobalId gid, SerialNum) override { + void notifyRemove(IDestructorCallbackSP, document::GlobalId gid, SerialNum) override { _changeGid = gid; _changeLid = 0; _gidToLid[gid] = 0; ++_changes; } - void notifyRemoveDone(document::GlobalId, SerialNum) override { - } - void assertChanges(document::GlobalId expGid, uint32_t expLid, uint32_t expChanges) { EXPECT_EQUAL(expGid, _changeGid); EXPECT_EQUAL(expLid, _changeLid); diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp index a10d48ee7fe..9d72045c918 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp @@ -6,6 +6,7 @@ #include <vespa/searchcore/proton/server/executor_thread_service.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_listener.h> +#include <vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h> #include <vespa/searchcore/proton/reference/gid_to_lid_change_handler.h> #include <vespa/searchlib/common/gatecallback.h> #include <map> @@ -112,11 +113,13 @@ public: struct Fixture { std::vector<std::shared_ptr<ListenerStats>> _statss; - std::shared_ptr<GidToLidChangeHandler> _handler; + std::shared_ptr<GidToLidChangeHandler> _real_handler; + std::shared_ptr<IGidToLidChangeHandler> _handler; Fixture() : _statss(), - _handler(std::make_shared<GidToLidChangeHandler>()) + _real_handler(std::make_shared<GidToLidChangeHandler>()), + _handler(_real_handler) { } @@ -127,7 +130,7 @@ struct Fixture void close() { - _handler->close(); + _real_handler->close(); } ListenerStats &addStats() { @@ -139,10 +142,15 @@ struct Fixture _handler->addListener(std::move(listener)); } - void notifyPutDone(GlobalId gid, uint32_t lid, SerialNum serialNum) { - vespalib::Gate gate; - _handler->notifyPutDone(std::make_shared<search::GateCallback>(gate), gid, lid, serialNum); - gate.await(); + void commit() { + auto pending = _handler->grab_pending_changes(); + if (pending) { + pending->notify_done(); + } + } + + void notifyPut(GlobalId gid, uint32_t lid, SerialNum serial_num) { + _handler->notifyPut(std::shared_ptr<search::IDestructorCallback>(), gid, lid, serial_num); } void notifyRemove(GlobalId gid, SerialNum serialNum) { @@ -151,10 +159,6 @@ struct Fixture gate.await(); } - void notifyRemoveDone(GlobalId gid, SerialNum serialNum) { - _handler->notifyRemoveDone(gid, serialNum); - } - void removeListeners(const vespalib::string &docTypeName, const std::set<vespalib::string> &keepNames) { _handler->removeListeners(docTypeName, keepNames); @@ -169,7 +173,8 @@ TEST_F("Test that we can register a listener", Fixture) TEST_DO(stats.assertListeners(1, 0, 0)); f.addListener(std::move(listener)); TEST_DO(stats.assertListeners(1, 1, 0)); - f.notifyPutDone(toGid(doc1), 10, 10); + f.notifyPut(toGid(doc1), 10, 10); + f.commit(); TEST_DO(stats.assertChanges(1, 0)); f.removeListeners("testdoc", {}); TEST_DO(stats.assertListeners(1, 1, 1)); @@ -192,7 +197,8 @@ TEST_F("Test that we can register multiple listeners", Fixture) TEST_DO(stats1.assertListeners(1, 1, 0)); TEST_DO(stats2.assertListeners(1, 1, 0)); TEST_DO(stats3.assertListeners(1, 1, 0)); - f.notifyPutDone(toGid(doc1), 10, 10); + f.notifyPut(toGid(doc1), 10, 10); + f.commit(); TEST_DO(stats1.assertChanges(1, 0)); TEST_DO(stats2.assertChanges(1, 0)); TEST_DO(stats3.assertChanges(1, 0)); @@ -250,62 +256,39 @@ public: } }; -TEST_F("Test that put is ignored if we have a pending remove", StatsFixture) +TEST_F("Test that multiple puts are processed", StatsFixture) { - f.notifyRemove(toGid(doc1), 20); - TEST_DO(f.assertChanges(0, 1)); - f.notifyPutDone(toGid(doc1), 10, 10); - TEST_DO(f.assertChanges(0, 1)); - f.notifyRemoveDone(toGid(doc1), 20); - TEST_DO(f.assertChanges(0, 1)); - f.notifyPutDone(toGid(doc1), 11, 30); - TEST_DO(f.assertChanges(1, 1)); + f.notifyPut(toGid(doc1), 10, 10); + TEST_DO(f.assertChanges(0, 0)); + f.notifyPut(toGid(doc1), 11, 20); + TEST_DO(f.assertChanges(0, 0)); + f.commit(); + TEST_DO(f.assertChanges(2, 0)); } -TEST_F("Test that pending removes are merged", StatsFixture) +TEST_F("Test that put is ignored if we have a pending remove", StatsFixture) { + f.notifyPut(toGid(doc1), 10, 10); + TEST_DO(f.assertChanges(0, 0)); f.notifyRemove(toGid(doc1), 20); TEST_DO(f.assertChanges(0, 1)); - f.notifyRemove(toGid(doc1), 40); + f.commit(); TEST_DO(f.assertChanges(0, 1)); - f.notifyPutDone(toGid(doc1), 10, 10); - TEST_DO(f.assertChanges(0, 1)); - f.notifyRemoveDone(toGid(doc1), 20); - TEST_DO(f.assertChanges(0, 1)); - f.notifyPutDone(toGid(doc1), 11, 30); - TEST_DO(f.assertChanges(0, 1)); - f.notifyRemoveDone(toGid(doc1), 40); - TEST_DO(f.assertChanges(0, 1)); - f.notifyPutDone(toGid(doc1), 12, 50); + f.notifyPut(toGid(doc1), 11, 30); + f.commit(); TEST_DO(f.assertChanges(1, 1)); } -TEST_F("Test that out of order notifyRemoveDone is handled", StatsFixture) +TEST_F("Test that pending removes are merged", StatsFixture) { - f.notifyRemove(toGid(doc1), 20); + f.notifyPut(toGid(doc1), 10, 10); + TEST_DO(f.assertChanges(0, 0)); + f.notifyRemove(toGid(doc1), 20); TEST_DO(f.assertChanges(0, 1)); f.notifyRemove(toGid(doc1), 40); TEST_DO(f.assertChanges(0, 1)); - f.notifyRemoveDone(toGid(doc1), 40); - TEST_DO(f.assertChanges(0, 1)); - f.notifyRemoveDone(toGid(doc1), 20); + f.commit(); TEST_DO(f.assertChanges(0, 1)); - f.notifyPutDone(toGid(doc1), 12, 50); - TEST_DO(f.assertChanges(1, 1)); -} - -TEST_F("Test that out of order notifyPutDone is partially handled", StatsFixture) -{ - f.notifyRemove(toGid(doc1), 20); - TEST_DO(f.assertChanges(0, 1)); - f.notifyPutDone(toGid(doc1), 12, 50); - TEST_DO(f.assertChanges(1, 1)); - f.notifyPutDone(toGid(doc1), 11, 40); - TEST_DO(f.assertChanges(1, 1)); - f.notifyPutDone(toGid(doc1), 13, 55); - TEST_DO(f.assertChanges(2, 1)); - f.notifyRemoveDone(toGid(doc1), 20); - TEST_DO(f.assertChanges(2, 1)); } } diff --git a/searchcore/src/vespa/searchcore/proton/reference/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/reference/CMakeLists.txt index a98b095cc21..1ba96e9adf5 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/reference/CMakeLists.txt @@ -10,7 +10,7 @@ vespa_add_library(searchcore_reference STATIC gid_to_lid_change_registrator.cpp gid_to_lid_mapper.cpp gid_to_lid_mapper_factory.cpp - pending_notify_remove_done.cpp + pending_gid_to_lid_changes.cpp DEPENDS searchcore_attribute searchcore_documentmetastore diff --git a/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.cpp b/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.cpp index 6c45096a53f..9c86d8bc083 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "dummy_gid_to_lid_change_handler.h" +#include "i_pending_gid_to_lid_changes.h" namespace proton { @@ -9,7 +10,7 @@ DummyGidToLidChangeHandler::DummyGidToLidChangeHandler() = default; DummyGidToLidChangeHandler::~DummyGidToLidChangeHandler() = default; void -DummyGidToLidChangeHandler::notifyPutDone(IDestructorCallbackSP , GlobalId, uint32_t, SerialNum) +DummyGidToLidChangeHandler::notifyPut(IDestructorCallbackSP, GlobalId, uint32_t, SerialNum) { } @@ -18,9 +19,10 @@ DummyGidToLidChangeHandler::notifyRemove(IDestructorCallbackSP , GlobalId, Seria { } -void -DummyGidToLidChangeHandler::notifyRemoveDone(GlobalId, SerialNum) +std::unique_ptr<IPendingGidToLidChanges> +DummyGidToLidChangeHandler::grab_pending_changes() { + return std::unique_ptr<IPendingGidToLidChanges>(); } void diff --git a/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h b/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h index d5f6d788885..54cc0e2144a 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h +++ b/searchcore/src/vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h @@ -22,11 +22,11 @@ public: DummyGidToLidChangeHandler(); ~DummyGidToLidChangeHandler() override; - void notifyPutDone(IDestructorCallbackSP context, GlobalId gid, uint32_t lid, SerialNum serialNum) override; + void notifyPut(IDestructorCallbackSP context, GlobalId gid, uint32_t lid, SerialNum serial_num) override; void notifyRemove(IDestructorCallbackSP context, GlobalId gid, SerialNum serialNum) override; - void notifyRemoveDone(GlobalId gid, SerialNum serialNum) override; void addListener(std::unique_ptr<IGidToLidChangeListener> listener) override; void removeListeners(const vespalib::string &docTypeName, const std::set<vespalib::string> &keepNames) override; + std::unique_ptr<IPendingGidToLidChanges> grab_pending_changes() override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp index 0c9087405b6..833dfa415ad 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.cpp @@ -2,6 +2,7 @@ #include "gid_to_lid_change_handler.h" #include "i_gid_to_lid_change_listener.h" +#include "pending_gid_to_lid_changes.h" #include <vespa/vespalib/util/lambdatask.h> #include <cassert> #include <vespa/vespalib/stllike/hash_map.hpp> @@ -14,8 +15,8 @@ GidToLidChangeHandler::GidToLidChangeHandler() : _lock(), _listeners(), _closed(false), - _pendingRemove() - + _pendingRemove(), + _pending_changes() { } @@ -43,6 +44,13 @@ GidToLidChangeHandler::notifyRemove(IDestructorCallbackSP context, GlobalId gid) } void +GidToLidChangeHandler::notifyPut(IDestructorCallbackSP context, GlobalId gid, uint32_t lid, SerialNum serial_num) +{ + lock_guard guard(_lock); + _pending_changes.emplace_back(std::move(context), gid, lid, serial_num, false); +} + +void GidToLidChangeHandler::notifyPutDone(IDestructorCallbackSP context, GlobalId gid, uint32_t lid, SerialNum serialNum) { lock_guard guard(_lock); @@ -79,6 +87,7 @@ GidToLidChangeHandler::notifyRemove(IDestructorCallbackSP context, GlobalId gid, } else { notifyRemove(std::move(context), gid); } + _pending_changes.emplace_back(IDestructorCallbackSP(), gid, 0, serialNum, true); } void @@ -96,6 +105,16 @@ GidToLidChangeHandler::notifyRemoveDone(GlobalId gid, SerialNum serialNum) } } +std::unique_ptr<IPendingGidToLidChanges> +GidToLidChangeHandler::grab_pending_changes() +{ + lock_guard guard(_lock); + if (_pending_changes.empty()) { + return std::unique_ptr<IPendingGidToLidChanges>(); + } + return std::make_unique<PendingGidToLidChanges>(*this, std::move(_pending_changes)); +} + void GidToLidChangeHandler::close() { diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h index 13a51edd0b5..25645aebcc9 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_handler.h @@ -3,8 +3,8 @@ #pragma once #include "i_gid_to_lid_change_handler.h" +#include "pending_gid_to_lid_change.h" #include <vespa/vespalib/stllike/hash_map.h> -#include <vespa/document/base/globalid.h> #include <vector> #include <mutex> @@ -44,6 +44,7 @@ class GidToLidChangeHandler : public std::enable_shared_from_this<GidToLidChange Listeners _listeners; bool _closed; vespalib::hash_map<GlobalId, PendingRemoveEntry, GlobalId::hash> _pendingRemove; + std::vector<PendingGidToLidChange> _pending_changes; void notifyPutDone(IDestructorCallbackSP context, GlobalId gid, uint32_t lid); void notifyRemove(IDestructorCallbackSP context, GlobalId gid); @@ -51,9 +52,11 @@ public: GidToLidChangeHandler(); ~GidToLidChangeHandler() override; - void notifyPutDone(IDestructorCallbackSP context, GlobalId gid, uint32_t lid, SerialNum serialNum) override; + void notifyPut(IDestructorCallbackSP context, GlobalId gid, uint32_t lid, SerialNum serial_num) override; + void notifyPutDone(IDestructorCallbackSP context, GlobalId gid, uint32_t lid, SerialNum serialNum); void notifyRemove(IDestructorCallbackSP context, GlobalId gid, SerialNum serialNum) override; - void notifyRemoveDone(GlobalId gid, SerialNum serialNum) override; + void notifyRemoveDone(GlobalId gid, SerialNum serialNum); + std::unique_ptr<IPendingGidToLidChanges> grab_pending_changes() override; /** * Close handler, further notifications are blocked. diff --git a/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h b/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h index cbafef57e46..56d4a38fee5 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h +++ b/searchcore/src/vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h @@ -10,6 +10,8 @@ namespace document { class GlobalId; } namespace proton { +class IPendingGidToLidChanges; + /* * Interface class for registering listeners that get notification when * gid to lid mapping changes. @@ -38,9 +40,9 @@ public: /** * Notify gid to lid mapping change. */ - virtual void notifyPutDone(IDestructorCallbackSP context, GlobalId gid, uint32_t lid, SerialNum serialNum) = 0; + virtual void notifyPut(IDestructorCallbackSP context, GlobalId gid, uint32_t lid, SerialNum serial_num) = 0; virtual void notifyRemove(IDestructorCallbackSP context, GlobalId gid, SerialNum serialNum) = 0; - virtual void notifyRemoveDone(GlobalId gid, SerialNum serialNum) = 0; + virtual std::unique_ptr<IPendingGidToLidChanges> grab_pending_changes() = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h b/searchcore/src/vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h new file mode 100644 index 00000000000..6e4e9f240ff --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h @@ -0,0 +1,18 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace proton { + +/* + * Interface class for a container of gid to lid changes awaiting a + * force commit. + */ +class IPendingGidToLidChanges +{ +public: + virtual ~IPendingGidToLidChanges() = default; + virtual void notify_done() = 0; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/reference/pending_gid_to_lid_change.h b/searchcore/src/vespa/searchcore/proton/reference/pending_gid_to_lid_change.h new file mode 100644 index 00000000000..01204a5fe92 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/reference/pending_gid_to_lid_change.h @@ -0,0 +1,44 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/document/base/globalid.h> +#include <vespa/searchlib/common/serialnum.h> + +namespace proton { + +/* + * Class for a gid to lid change awaiting a force commit. + */ +class PendingGidToLidChange +{ + using Context = std::shared_ptr<search::IDestructorCallback>; + using GlobalId = document::GlobalId; + using SerialNum = search::SerialNum; + + Context _context; + GlobalId _gid; + uint32_t _lid; + SerialNum _serial_num; + bool _is_remove; +public: + PendingGidToLidChange(); + PendingGidToLidChange(Context context, const GlobalId& gid, uint32_t lid, SerialNum serial_num, bool is_remove_) noexcept + : _context(std::move(context)), + _gid(gid), + _lid(lid), + _serial_num(serial_num), + _is_remove(is_remove_) + { + } + ~PendingGidToLidChange() = default; + + Context steal_context() && { return std::move(_context); } + const GlobalId &get_gid() const { return _gid; } + uint32_t get_lid() const { return _lid; } + SerialNum get_serial_num() const { return _serial_num; } + bool is_remove() const { return _is_remove; } +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/reference/pending_gid_to_lid_changes.cpp b/searchcore/src/vespa/searchcore/proton/reference/pending_gid_to_lid_changes.cpp new file mode 100644 index 00000000000..bb9da2d9c7f --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/reference/pending_gid_to_lid_changes.cpp @@ -0,0 +1,29 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "pending_gid_to_lid_changes.h" +#include "gid_to_lid_change_handler.h" + +namespace proton { + +PendingGidToLidChanges::PendingGidToLidChanges(GidToLidChangeHandler& handler, std::vector<PendingGidToLidChange> &&pending_changes) + : IPendingGidToLidChanges(), + _handler(handler), + _pending_changes(std::move(pending_changes)) +{ +} + +PendingGidToLidChanges::~PendingGidToLidChanges() = default; + +void +PendingGidToLidChanges::notify_done() +{ + for (auto& change : _pending_changes) { + if (change.is_remove()) { + _handler.notifyRemoveDone(change.get_gid(), change.get_serial_num()); + } else { + _handler.notifyPutDone(std::move(change).steal_context(), change.get_gid(), change.get_lid(), change.get_serial_num()); + } + } +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/reference/pending_gid_to_lid_changes.h b/searchcore/src/vespa/searchcore/proton/reference/pending_gid_to_lid_changes.h new file mode 100644 index 00000000000..a3e74e74b1f --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/reference/pending_gid_to_lid_changes.h @@ -0,0 +1,26 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_pending_gid_to_lid_changes.h" +#include "pending_gid_to_lid_change.h" +#include <vector> + +namespace proton { + +class GidToLidChangeHandler; + +/* + * Class for a vector of gid to lid changes awaiting a force commit. + */ +class PendingGidToLidChanges : public IPendingGidToLidChanges +{ + GidToLidChangeHandler& _handler; + std::vector<PendingGidToLidChange> _pending_changes; +public: + PendingGidToLidChanges(GidToLidChangeHandler& handler, std::vector<PendingGidToLidChange> &&pending_changes); + ~PendingGidToLidChanges() override; + void notify_done() override; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.cpp b/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.cpp deleted file mode 100644 index e806628bc02..00000000000 --- a/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.cpp +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "pending_notify_remove_done.h" -#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> -#include <cassert> - -namespace proton -{ - -PendingNotifyRemoveDone::PendingNotifyRemoveDone() - : _gidToLidChangeHandler(nullptr), - _gid(), - _serialNum(0), - _pending(false) -{ -} - -PendingNotifyRemoveDone::PendingNotifyRemoveDone(PendingNotifyRemoveDone &&rhs) - : _gidToLidChangeHandler(rhs._gidToLidChangeHandler), - _gid(rhs._gid), - _serialNum(rhs._serialNum), - _pending(rhs._pending) -{ - rhs._pending = false; -} - -PendingNotifyRemoveDone::~PendingNotifyRemoveDone() -{ - assert(!_pending); // Fail if notifyRemoveDone is still pending -} - -void -PendingNotifyRemoveDone::setup(IGidToLidChangeHandler &gidToLidChangeHandler, document::GlobalId gid, search::SerialNum serialNum) -{ - _gidToLidChangeHandler = &gidToLidChangeHandler; - _gid = gid; - _serialNum = serialNum; - _pending = true; -} - -void -PendingNotifyRemoveDone::invoke() -{ - if (_pending) { - _gidToLidChangeHandler->notifyRemoveDone(_gid, _serialNum); - _pending = false; - } -} - -} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.h b/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.h deleted file mode 100644 index 95aad182b10..00000000000 --- a/searchcore/src/vespa/searchcore/proton/reference/pending_notify_remove_done.h +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/document/base/globalid.h> -#include <vespa/searchlib/common/serialnum.h> - -namespace proton -{ - -class IGidToLidChangeHandler; - -/* - * Class used to keep track of a pending notifyRemoveDone() call to - * a gid to lid change handler. - */ -class PendingNotifyRemoveDone -{ - IGidToLidChangeHandler *_gidToLidChangeHandler; - document::GlobalId _gid; - search::SerialNum _serialNum; - bool _pending; - -public: - PendingNotifyRemoveDone(); - PendingNotifyRemoveDone(PendingNotifyRemoveDone &&rhs); - PendingNotifyRemoveDone(const PendingNotifyRemoveDone &rhs) = delete; - PendingNotifyRemoveDone &operator=(const PendingNotifyRemoveDone &rhs) = delete; - PendingNotifyRemoveDone &operator=(PendingNotifyRemoveDone &&rhs) = delete; - ~PendingNotifyRemoveDone(); - void setup(IGidToLidChangeHandler &gidToLidChangeHandler, document::GlobalId gid, search::SerialNum serialNum); - void invoke(); -}; - -} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 93432221e61..57445775df3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -80,7 +80,6 @@ vespa_add_library(searchcore_server STATIC pruneremoveddocumentsjob.cpp putdonecontext.cpp reconfig_params.cpp - remove_batch_done_context.cpp remove_operations_rate_tracker.cpp removedonecontext.cpp removedonetask.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp index 9c9c3fc7eca..32554555984 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.cpp @@ -3,6 +3,7 @@ #include "forcecommitcontext.h" #include "forcecommitdonetask.h" #include <vespa/searchcore/proton/common/docid_limit.h> +#include <vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h> #include <cassert> namespace proton { @@ -10,9 +11,10 @@ namespace proton { ForceCommitContext::ForceCommitContext(vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, PendingLidTrackerBase::Snapshot lidsToCommit, + std::unique_ptr<IPendingGidToLidChanges> pending_gid_to_lid_changes, std::shared_ptr<IDestructorCallback> onDone) : _executor(executor), - _task(std::make_unique<ForceCommitDoneTask>(documentMetaStore)), + _task(std::make_unique<ForceCommitDoneTask>(documentMetaStore, std::move(pending_gid_to_lid_changes))), _committedDocIdLimit(0u), _docIdLimit(nullptr), _lidsToCommit(std::move(lidsToCommit)), diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h index a9987a15da6..73c4ac97f42 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitcontext.h @@ -12,6 +12,7 @@ namespace proton { class ForceCommitDoneTask; struct IDocumentMetaStore; class DocIdLimit; +class IPendingGidToLidChanges; /** * Context class for forced commits that schedules a task when @@ -34,6 +35,7 @@ public: ForceCommitContext(vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, PendingLidTrackerBase::Snapshot lidsToCommit, + std::unique_ptr<IPendingGidToLidChanges> pending_gid_to_lid_changes, std::shared_ptr<IDestructorCallback> onDone); ~ForceCommitContext() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp index 81da2a4ec3e..733c15d55bb 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.cpp @@ -2,13 +2,15 @@ #include "forcecommitdonetask.h" #include <vespa/searchcore/proton/documentmetastore/i_document_meta_store.h> +#include <vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h> namespace proton { -ForceCommitDoneTask::ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore) +ForceCommitDoneTask::ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore, std::unique_ptr<IPendingGidToLidChanges> pending_gid_to_lid_changes) : _lidsToReuse(), _holdUnblockShrinkLidSpace(false), - _documentMetaStore(documentMetaStore) + _documentMetaStore(documentMetaStore), + _pending_gid_to_lid_changes(std::move(pending_gid_to_lid_changes)) { } @@ -24,6 +26,9 @@ ForceCommitDoneTask::reuseLids(std::vector<uint32_t> &&lids) void ForceCommitDoneTask::run() { + if (_pending_gid_to_lid_changes) { + _pending_gid_to_lid_changes->notify_done(); + } if (!_lidsToReuse.empty()) { if (_lidsToReuse.size() == 1) { _documentMetaStore.removeComplete(_lidsToReuse[0]); diff --git a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h index cffe4199e84..fe95d1575e9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h +++ b/searchcore/src/vespa/searchcore/proton/server/forcecommitdonetask.h @@ -8,6 +8,7 @@ namespace proton { struct IDocumentMetaStore; +class IPendingGidToLidChanges; /** * Class for task to be executed when a forced commit has completed and @@ -28,9 +29,10 @@ class ForceCommitDoneTask : public vespalib::Executor::Task std::vector<uint32_t> _lidsToReuse; bool _holdUnblockShrinkLidSpace; IDocumentMetaStore &_documentMetaStore; + std::unique_ptr<IPendingGidToLidChanges> _pending_gid_to_lid_changes; public: - ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore); + ForceCommitDoneTask(IDocumentMetaStore &documentMetaStore, std::unique_ptr<IPendingGidToLidChanges> pending_gid_to_lid_changes); ~ForceCommitDoneTask() override; void reuseLids(std::vector<uint32_t> &&lids); @@ -42,7 +44,7 @@ public: void run() override; bool empty() const { - return _lidsToReuse.empty() && !_holdUnblockShrinkLidSpace; + return _lidsToReuse.empty() && !_holdUnblockShrinkLidSpace && !_pending_gid_to_lid_changes; } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp index 20ffe203235..23caaf1250b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp @@ -10,18 +10,12 @@ using document::Document; namespace proton { PutDoneContext::PutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, - IGidToLidChangeHandler &gidToLidChangeHandler, std::shared_ptr<const Document> doc, - const document::GlobalId &gid, uint32_t lid, - search::SerialNum serialNum, bool enableNotifyPut) + uint32_t lid) : OperationDoneContext(std::move(token)), _uncommitted(std::move(uncommitted)), _lid(lid), _docIdLimit(nullptr), - _gidToLidChangeHandler(gidToLidChangeHandler), - _gid(gid), - _serialNum(serialNum), - _enableNotifyPut(enableNotifyPut), _doc(std::move(doc)) { } @@ -31,9 +25,6 @@ PutDoneContext::~PutDoneContext() if (_docIdLimit != nullptr) { _docIdLimit->bumpUpLimit(_lid + 1); } - if (_enableNotifyPut) { - _gidToLidChangeHandler.notifyPutDone(steal(), _gid, _lid, _serialNum); - } } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h index c5e8c558c9e..e7271d8a1b3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h @@ -12,7 +12,6 @@ namespace document { class Document; } namespace proton { class DocIdLimit; -class IGidToLidChangeHandler; /** * Context class for document put operations that acks operation when @@ -26,16 +25,12 @@ class PutDoneContext : public OperationDoneContext IPendingLidTracker::Token _uncommitted; uint32_t _lid; DocIdLimit *_docIdLimit; - IGidToLidChangeHandler &_gidToLidChangeHandler; - document::GlobalId _gid; - search::SerialNum _serialNum; - bool _enableNotifyPut; std::shared_ptr<const document::Document> _doc; public: - PutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, IGidToLidChangeHandler &gidToLidChangeHandler, + PutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, std::shared_ptr<const document::Document> doc, - const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut); + uint32_t lid); ~PutDoneContext() override; void registerPutLid(DocIdLimit *docIdLimit) { _docIdLimit = docIdLimit; } diff --git a/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.cpp b/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.cpp deleted file mode 100644 index b0ece5f35a1..00000000000 --- a/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.cpp +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "remove_batch_done_context.h" -#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> - -namespace proton { - -RemoveBatchDoneContext::RemoveBatchDoneContext(vespalib::Executor &executor, - vespalib::Executor::Task::UP task, - IGidToLidChangeHandler &gidToLidChangeHandler, - std::vector<document::GlobalId> gidsToRemove, - search::SerialNum serialNum) - : search::ScheduleTaskCallback(executor, std::move(task)), - _gidToLidChangeHandler(gidToLidChangeHandler), - _gidsToRemove(std::move(gidsToRemove)), - _serialNum(serialNum) -{ -} - -RemoveBatchDoneContext::~RemoveBatchDoneContext() -{ - for (const auto &gid : _gidsToRemove) { - _gidToLidChangeHandler.notifyRemoveDone(gid, _serialNum); - } -} - -} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.h b/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.h deleted file mode 100644 index 2a93239574a..00000000000 --- a/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.h +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/searchlib/common/scheduletaskcallback.h> -#include <vespa/document/base/globalid.h> -#include <vespa/searchlib/common/serialnum.h> -#include <vector> - -namespace proton -{ - -class IGidToLidChangeHandler; - -/** - * Context class for document batch remove that notifies gid to lid - * change handler about each remove done and schedules a - * task when instance is destroyed. Typically a shared pointer to an - * instance is passed around to multiple worker threads that performs - * portions of a larger task before dropping the shared pointer. - */ -class RemoveBatchDoneContext : public search::ScheduleTaskCallback -{ - IGidToLidChangeHandler &_gidToLidChangeHandler; - std::vector<document::GlobalId> _gidsToRemove; - search::SerialNum _serialNum; - -public: - RemoveBatchDoneContext(vespalib::Executor &executor, - vespalib::Executor::Task::UP task, - IGidToLidChangeHandler &gidToLidChangeHandler, - std::vector<document::GlobalId> gidsToRemove, - search::SerialNum serialNum); - - virtual ~RemoveBatchDoneContext(); -}; - -} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp index d35e6bb7127..859d8693f6d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp @@ -9,11 +9,10 @@ namespace proton { RemoveDoneContext::RemoveDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid) + uint32_t lid) : OperationDoneContext(std::move(token)), _executor(executor), _task(), - _pendingNotifyRemoveDone(std::move(pendingNotifyRemoveDone)), _uncommitted(std::move(uncommitted)) { if (lid != 0) { @@ -23,7 +22,6 @@ RemoveDoneContext::RemoveDoneContext(FeedToken token, IPendingLidTracker::Token RemoveDoneContext::~RemoveDoneContext() { - _pendingNotifyRemoveDone.invoke(); ack(); if (_task) { vespalib::Executor::Task::UP res = _executor.execute(std::move(_task)); diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h index 7b6c6be1fe1..485b82dd141 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h @@ -3,7 +3,6 @@ #pragma once #include "operationdonecontext.h" -#include <vespa/searchcore/proton/reference/pending_notify_remove_done.h> #include <vespa/searchcore/proton/common/ipendinglidtracker.h> #include <vespa/vespalib/util/executor.h> #include <vespa/document/base/globalid.h> @@ -26,12 +25,11 @@ class RemoveDoneContext : public OperationDoneContext { vespalib::Executor &_executor; std::unique_ptr<vespalib::Executor::Task> _task; - PendingNotifyRemoveDone _pendingNotifyRemoveDone; IPendingLidTracker::Token _uncommitted; public: RemoveDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - PendingNotifyRemoveDone &&pendingNotifyRemoveDone, uint32_t lid); + uint32_t lid); ~RemoveDoneContext() override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index bf357188766..7c3c796bda3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -5,7 +5,6 @@ #include "ireplayconfig.h" #include "operationdonecontext.h" #include "putdonecontext.h" -#include "remove_batch_done_context.h" #include "removedonecontext.h" #include "updatedonecontext.h" #include <vespa/document/datatype/documenttype.h> @@ -15,7 +14,9 @@ #include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/feedoperation/operations.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> +#include <vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h> #include <vespa/searchlib/common/gatecallback.h> +#include <vespa/searchlib/common/scheduletaskcallback.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/exceptions.h> @@ -48,11 +49,10 @@ private: public: PutDoneContextForMove(FeedToken token, IPendingLidTracker::Token uncommitted, - IGidToLidChangeHandler &gidToLidChangeHandler, std::shared_ptr<const Document> doc, - const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, - bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx) - : PutDoneContext(std::move(token), std::move(uncommitted), gidToLidChangeHandler, std::move(doc), gid, lid, serialNum, enableNotifyPut), + uint32_t lid, + IDestructorCallback::SP moveDoneCtx) + : PutDoneContext(std::move(token), std::move(uncommitted),std::move(doc), lid), _moveDoneCtx(std::move(moveDoneCtx)) {} ~PutDoneContextForMove() override = default; @@ -60,31 +60,28 @@ public: std::shared_ptr<PutDoneContext> createPutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, - IGidToLidChangeHandler &gidToLidChangeHandler, std::shared_ptr<const Document> doc, - const document::GlobalId &gid, uint32_t lid, - SerialNum serialNum, bool enableNotifyPut, + uint32_t lid, IDestructorCallback::SP moveDoneCtx) { std::shared_ptr<PutDoneContext> result; if (moveDoneCtx) { - result = std::make_shared<PutDoneContextForMove>(std::move(token), std::move(uncommitted), gidToLidChangeHandler, - std::move(doc), gid, lid, serialNum, enableNotifyPut, std::move(moveDoneCtx)); + result = std::make_shared<PutDoneContextForMove>(std::move(token), std::move(uncommitted), + std::move(doc), lid, std::move(moveDoneCtx)); } else { - result = std::make_shared<PutDoneContext>(std::move(token), std::move(uncommitted), gidToLidChangeHandler, - std::move(doc), gid, lid, serialNum, enableNotifyPut); + result = std::make_shared<PutDoneContext>(std::move(token), std::move(uncommitted), + std::move(doc), lid); } return result; } std::shared_ptr<PutDoneContext> createPutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, - IGidToLidChangeHandler &gidToLidChangeHandler, std::shared_ptr<const Document> doc, - const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut) + uint32_t lid) { - return createPutDoneContext(std::move(token), std::move(uncommitted), gidToLidChangeHandler, std::move(doc), gid, - lid, serialNum, enableNotifyPut, IDestructorCallback::SP()); + return createPutDoneContext(std::move(token), std::move(uncommitted), std::move(doc), + lid, IDestructorCallback::SP()); } std::shared_ptr<UpdateDoneContext> @@ -109,10 +106,10 @@ private: public: RemoveDoneContextForMove(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, + IDocumentMetaStore &documentMetaStore, uint32_t lid, IDestructorCallback::SP moveDoneCtx) : RemoveDoneContext(std::move(token), std::move(uncommitted), executor, - documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid), + documentMetaStore, lid), _moveDoneCtx(std::move(moveDoneCtx)) {} ~RemoveDoneContextForMove() override = default; @@ -120,16 +117,16 @@ public: std::shared_ptr<RemoveDoneContext> createRemoveDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore,PendingNotifyRemoveDone &&pendingNotifyRemoveDone, + IDocumentMetaStore &documentMetaStore, uint32_t lid, IDestructorCallback::SP moveDoneCtx) { if (moveDoneCtx) { return std::make_shared<RemoveDoneContextForMove> - (std::move(token), std::move(uncommitted), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), + (std::move(token), std::move(uncommitted), executor, documentMetaStore, lid, std::move(moveDoneCtx)); } else { return std::make_shared<RemoveDoneContext> - (std::move(token), std::move(uncommitted), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid); + (std::move(token), std::move(uncommitted), executor, documentMetaStore, lid); } } @@ -243,6 +240,7 @@ StoreOnlyFeedView::forceCommit(SerialNum serialNum, DoneCallback onDone) { internalForceCommit(serialNum, std::make_shared<ForceCommitContext>(_writeService.master(), _metaStore, _pendingLidsForCommit->produceSnapshot(), + _gidToLidChangeHandler.grab_pending_changes(), std::move(onDone))); } @@ -305,17 +303,18 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) putOp.getSubDbId(), putOp.getLid(), putOp.getPrevSubDbId(), putOp.getPrevLid(), _params._subDbId, doc->toString(true).size(), doc->toString(true).c_str()); - PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(putOp, docId.getGlobalId(), docId); + adjustMetaStore(putOp, docId.getGlobalId(), docId); auto uncommitted = get_pending_lid_token(putOp); bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId); if (putOp.getValidDbdId(_params._subDbId)) { - const document::GlobalId &gid = docId.getGlobalId(); + if (putOp.changedDbdId() && useDocumentMetaStore(serialNum)) { + _gidToLidChangeHandler.notifyPut(token, docId.getGlobalId(), putOp.getLid(), serialNum); + } std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(std::move(token), std::move(uncommitted), - _gidToLidChangeHandler, doc, gid, putOp.getLid(), serialNum, - putOp.changedDbdId() && useDocumentMetaStore(serialNum)); + doc, putOp.getLid()); putSummary(serialNum, putOp.getLid(), doc, onWriteDone); putAttributes(serialNum, putOp.getLid(), *doc, onWriteDone); putIndexedFields(serialNum, putOp.getLid(), doc, onWriteDone); @@ -323,7 +322,7 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) if (docAlreadyExists && putOp.changedDbdId()) { assert(!putOp.getValidDbdId(_params._subDbId)); internalRemove(std::move(token), _pendingLidsForCommit->produce(putOp.getPrevLid()), serialNum, - std::move(pendingNotifyRemoveDone), putOp.getPrevLid(), IDestructorCallback::SP()); + putOp.getPrevLid(), IDestructorCallback::SP()); } } @@ -574,7 +573,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithDocI _params._docTypeName.toString().c_str(), serialNum, docId.toString().c_str(), rmOp.getSubDbId(), rmOp.getLid(), rmOp.getPrevSubDbId(), rmOp.getPrevLid(), _params._subDbId); - PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(rmOp, docId.getGlobalId(), docId); + adjustMetaStore(rmOp, docId.getGlobalId(), docId); auto uncommitted = get_pending_lid_token(rmOp); if (rmOp.getValidDbdId(_params._subDbId)) { @@ -587,7 +586,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithDocI if (rmOp.changedDbdId()) { assert(!rmOp.getValidDbdId(_params._subDbId)); internalRemove(std::move(token), _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, - std::move(pendingNotifyRemoveDone), rmOp.getPrevLid(), IDestructorCallback::SP()); + rmOp.getPrevLid(), IDestructorCallback::SP()); } } } @@ -599,13 +598,13 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithGid assert(rmOp.notMovingLidInSameSubDb()); const SerialNum serialNum = rmOp.getSerialNum(); DocumentId dummy; - PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(rmOp, rmOp.getGlobalId(), dummy); + adjustMetaStore(rmOp, rmOp.getGlobalId(), dummy); auto uncommitted = _pendingLidsForCommit->produce(rmOp.getLid()); if (rmOp.getValidPrevDbdId(_params._subDbId)) { if (rmOp.changedDbdId()) { assert(!rmOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, std::move(pendingNotifyRemoveDone), + internalRemove(std::move(token), _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, rmOp.getPrevLid(), IDestructorCallback::SP()); } } @@ -613,23 +612,22 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithGid void StoreOnlyFeedView::internalRemove(FeedToken token, IPendingLidTracker::Token uncommitted, SerialNum serialNum, - PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, + Lid lid, IDestructorCallback::SP moveDoneCtx) { bool explicitReuseLid = _lidReuseDelayer.delayReuse(lid); std::shared_ptr<RemoveDoneContext> onWriteDone; onWriteDone = createRemoveDoneContext(std::move(token), std::move(uncommitted),_writeService.master(), _metaStore, - std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u), + (explicitReuseLid ? lid : 0u), std::move(moveDoneCtx)); removeSummary(serialNum, lid, onWriteDone); removeAttributes(serialNum, lid, onWriteDone); removeIndexedFields(serialNum, lid, onWriteDone); } -PendingNotifyRemoveDone +void StoreOnlyFeedView::adjustMetaStore(const DocumentOperation &op, const GlobalId & gid, const DocumentId &docId) { - PendingNotifyRemoveDone pendingNotifyRemoveDone; const SerialNum serialNum = op.getSerialNum(); if (useDocumentMetaStore(serialNum)) { if (op.getValidDbdId(_params._subDbId)) { @@ -644,13 +642,11 @@ StoreOnlyFeedView::adjustMetaStore(const DocumentOperation &op, const GlobalId & } else if (op.getValidPrevDbdId(_params._subDbId)) { vespalib::Gate gate; _gidToLidChangeHandler.notifyRemove(std::make_shared<search::GateCallback>(gate), gid, serialNum); - pendingNotifyRemoveDone.setup(_gidToLidChangeHandler, gid, serialNum); gate.await(); removeMetaData(_metaStore, gid, docId, op, _params._subDbType == SubDbType::REMOVED); } _metaStore.commit(serialNum, serialNum); } - return pendingNotifyRemoveDone; } void @@ -695,8 +691,7 @@ StoreOnlyFeedView::removeDocuments(const RemoveDocumentsOperation &op, bool remo } else { removeBatchDoneTask = makeLambdaTask([]() {}); } - onWriteDone = std::make_shared<RemoveBatchDoneContext>(_writeService.master(), std::move(removeBatchDoneTask), - _gidToLidChangeHandler, std::move(gidsToRemove), serialNum); + onWriteDone = std::make_shared<search::ScheduleTaskCallback>(_writeService.master(), std::move(removeBatchDoneTask)); if (remove_index_and_attributes) { removeIndexedFields(serialNum, lidsToRemove, onWriteDone); removeAttributes(serialNum, lidsToRemove, onWriteDone); @@ -767,20 +762,21 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: moveOp.getSubDbId(), moveOp.getLid(), moveOp.getPrevSubDbId(), moveOp.getPrevLid(), _params._subDbId, doc->toString(true).size(), doc->toString(true).c_str()); - PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(moveOp, docId.getGlobalId(), docId); + adjustMetaStore(moveOp, docId.getGlobalId(), docId); bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId); if (moveOp.getValidDbdId(_params._subDbId)) { - const document::GlobalId &gid = docId.getGlobalId(); + if (moveOp.changedDbdId() && useDocumentMetaStore(serialNum)) { + _gidToLidChangeHandler.notifyPut(FeedToken(), docId.getGlobalId(), moveOp.getLid(), serialNum); + } std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(FeedToken(), _pendingLidsForCommit->produce(moveOp.getLid()), - _gidToLidChangeHandler, doc, gid, moveOp.getLid(), serialNum, - moveOp.changedDbdId() && useDocumentMetaStore(serialNum), doneCtx); + doc, moveOp.getLid(), doneCtx); putSummary(serialNum, moveOp.getLid(), doc, onWriteDone); putAttributes(serialNum, moveOp.getLid(), *doc, onWriteDone); putIndexedFields(serialNum, moveOp.getLid(), doc, onWriteDone); } if (docAlreadyExists && moveOp.changedDbdId()) { - internalRemove(FeedToken(), _pendingLidsForCommit->produce(moveOp.getPrevLid()), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx); + internalRemove(FeedToken(), _pendingLidsForCommit->produce(moveOp.getPrevLid()), serialNum, moveOp.getPrevLid(), doneCtx); } } @@ -820,6 +816,7 @@ StoreOnlyFeedView::handleCompactLidSpace(const CompactLidSpaceOperation &op) getDocumentMetaStore()->get().compactLidSpace(op.getLidLimit()); auto commitContext(std::make_shared<ForceCommitContext>(_writeService.master(), _metaStore, _pendingLidsForCommit->produceSnapshot(), + _gidToLidChangeHandler.grab_pending_changes(), DoneCallback())); commitContext->holdUnblockShrinkLidSpace(); internalForceCommit(serialNum, commitContext); diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index da7d5e53a88..9927c93add4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -16,7 +16,6 @@ #include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h> #include <vespa/searchcore/proton/feedoperation/lidvectorcontext.h> #include <vespa/searchcore/proton/persistenceengine/resulthandler.h> -#include <vespa/searchcore/proton/reference/pending_notify_remove_done.h> #include <vespa/searchcorespi/index/ithreadingservice.h> #include <vespa/searchlib/query/base.h> #include <vespa/vespalib/util/threadstackexecutorbase.h> @@ -169,7 +168,7 @@ private: return replaySerialNum > _params._flushedDocumentMetaStoreSerialNum; } - PendingNotifyRemoveDone adjustMetaStore(const DocumentOperation &op, const document::GlobalId & gid, const document::DocumentId &docId); + void adjustMetaStore(const DocumentOperation &op, const document::GlobalId & gid, const document::DocumentId &docId); void internalPut(FeedToken token, const PutOperation &putOp); void internalUpdate(FeedToken token, const UpdateOperation &updOp); @@ -182,7 +181,6 @@ private: size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields); void internalRemove(FeedToken token, IPendingLidTracker::Token uncommitted, SerialNum serialNum, - PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid, std::shared_ptr<search::IDestructorCallback> moveDoneCtx); IPendingLidTracker::Token get_pending_lid_token(const DocumentOperation &op); diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h b/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h index f9531486e9b..01cfe90a9b6 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h @@ -3,6 +3,7 @@ #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_listener.h> +#include <vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/test/insertion_operators.h> @@ -42,9 +43,9 @@ public: _removes.emplace_back(docTypeName, keepNames); } - void notifyPutDone(IDestructorCallbackSP, document::GlobalId, uint32_t, SerialNum) override { } + void notifyPut(IDestructorCallbackSP, document::GlobalId, uint32_t, SerialNum) override { } void notifyRemove(IDestructorCallbackSP, document::GlobalId, SerialNum) override { } - void notifyRemoveDone(document::GlobalId, SerialNum) override { } + std::unique_ptr<IPendingGidToLidChanges> grab_pending_changes() override { return std::unique_ptr<IPendingGidToLidChanges>(); } void assertAdds(const std::vector<AddEntry> &expAdds) { |