diff options
Diffstat (limited to 'searchcore')
11 files changed, 99 insertions, 27 deletions
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp index a7c53293981..c649709c2eb 100644 --- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp +++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp @@ -204,6 +204,7 @@ public: document::DocumentTypeRepo &getDeserializeRepo() override { return _repo; } + void check_serial_num(search::SerialNum) override { } void optionalCommit(search::SerialNum) override { } }; diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp index 59e504de6ce..d45612cbcaf 100644 --- a/searchcore/src/tests/proton/server/feedstates_test.cpp +++ b/searchcore/src/tests/proton/server/feedstates_test.cpp @@ -53,6 +53,15 @@ struct MyReplayConfig : IReplayConfig { void replayConfig(SerialNum) override {} }; +struct MyIncSerialNum : IIncSerialNum { + SerialNum _serial_num; + MyIncSerialNum(SerialNum serial_num) + : _serial_num(serial_num) + { + } + SerialNum incSerialNum() override { return ++_serial_num; } +}; + struct Fixture { MyFeedView feed_view1; @@ -62,6 +71,7 @@ struct Fixture MemoryConfigStore config_store; BucketDBOwner _bucketDB; bucketdb::BucketDBHandler _bucketDBHandler; + MyIncSerialNum _inc_serial_num; ReplayTransactionLogState state; Fixture(); @@ -76,7 +86,8 @@ Fixture::Fixture() config_store(), _bucketDB(), _bucketDBHandler(_bucketDB), - state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store) + _inc_serial_num(9u), + state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store, _inc_serial_num) { } Fixture::~Fixture() = default; @@ -105,17 +116,23 @@ RemoveOperationContext::RemoveOperationContext(search::SerialNum serial) RemoveOperationContext::~RemoveOperationContext() = default; TEST_F("require that active FeedView can change during replay", Fixture) { - RemoveOperationContext opCtx(10); - auto wrap = std::make_shared<PacketWrapper>(*opCtx.packet, nullptr); ForegroundThreadExecutor executor; EXPECT_EQUAL(0, f.feed_view1.remove_handled); EXPECT_EQUAL(0, f.feed_view2.remove_handled); - f.state.receive(wrap, executor); + { + RemoveOperationContext opCtx(10); + auto wrap = std::make_shared<PacketWrapper>(*opCtx.packet, nullptr); + f.state.receive(wrap, executor); + } EXPECT_EQUAL(1, f.feed_view1.remove_handled); EXPECT_EQUAL(0, f.feed_view2.remove_handled); f.feed_view_ptr = &f.feed_view2; - f.state.receive(wrap, executor); + { + RemoveOperationContext opCtx(11); + auto wrap = std::make_shared<PacketWrapper>(*opCtx.packet, nullptr); + f.state.receive(wrap, executor); + } EXPECT_EQUAL(1, f.feed_view1.remove_handled); EXPECT_EQUAL(1, f.feed_view2.remove_handled); } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index f509bcf31ef..c76a7270ab2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -610,11 +610,13 @@ DocumentDB::saveInitialConfig(const DocumentDBConfig &configSnapshot) if (_config_store->getBestSerialNum() != 0) return; // Initial config already present - SerialNum confSerial = _feedHandler->incSerialNum(); + SerialNum confSerial = _feedHandler->inc_replay_end_serial_num(); + _feedHandler->setSerialNum(confSerial); // Elide save of new config entry in transaction log, it would be // pruned at once anyway. // save noop entry in transaction log NoopOperation op; + op.setSerialNum(_feedHandler->inc_replay_end_serial_num()); (void) _feedHandler->storeOperationSync(op); sync(op.getSerialNum()); // Wipe everything in transaction log before initial config. @@ -633,12 +635,14 @@ DocumentDB::resumeSaveConfig() SerialNum bestSerial = _config_store->getBestSerialNum(); if (bestSerial == 0) return; - if (bestSerial != _feedHandler->getSerialNum() + 1) + if (bestSerial != _feedHandler->get_replay_end_serial_num() + 1) return; // proton was interrupted when saving later config. - SerialNum confSerial = _feedHandler->incSerialNum(); + SerialNum confSerial = _feedHandler->inc_replay_end_serial_num(); + _feedHandler->setSerialNum(confSerial); // resume operation, i.e. save config entry in transaction log NewConfigOperation op(confSerial, *_config_store); + op.setSerialNum(_feedHandler->inc_replay_end_serial_num()); (void) _feedHandler->storeOperationSync(op); sync(op.getSerialNum()); } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 25be8e4a929..88f785a58df 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -309,6 +309,12 @@ FeedHandler::performEof() assert(_writeService.master().isCurrentThread()); _writeService.sync(); LOG(debug, "Visiting done for transaction log domain '%s', eof received", _tlsMgr.getDomainName().c_str()); + // Replay must be complete + if (_replay_end_serial_num != _serialNum) { + LOG(warning, "Expected replay end serial number %" PRIu64 ", got serial number %" PRIu64, + _replay_end_serial_num, _serialNum); + assert(_replay_end_serial_num == _serialNum); + } _owner.onTransactionLogReplayDone(); _tlsMgr.replayDone(); changeToNormalFeedState(); @@ -412,6 +418,7 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _tlsReplayProgress(), _serialNum(0), _prunedSerialNum(0), + _replay_end_serial_num(0u), _prepare_serial_num(0u), _numOperationsPendingCommit(0), _numOperationsCompleted(0), @@ -435,13 +442,14 @@ FeedHandler::~FeedHandler() = default; void FeedHandler::init(SerialNum oldestConfigSerial) { - _tlsMgr.init(oldestConfigSerial, _prunedSerialNum, _serialNum); + _tlsMgr.init(oldestConfigSerial, _prunedSerialNum, _replay_end_serial_num); + _serialNum = _prunedSerialNum; if (_tlsWriter == nullptr) { _tlsMgrWriter = std::make_unique<TlsMgrWriter>(_tlsMgr, _tlsWriterfactory); _tlsWriter = _tlsMgrWriter.get(); } _allowSync = true; - syncTls(_serialNum); + syncTls(_replay_end_serial_num); } @@ -464,18 +472,18 @@ FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flu assert(_activeFeedView); assert(_bucketDBHandler); auto state = make_shared<ReplayTransactionLogState> - (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store); + (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store, *this); changeFeedState(state); // Resurrected attribute vector might cause oldestFlushedSerial to // be lower than _prunedSerialNum, so don't warn for now. (void) oldestFlushedSerial; - assert(_serialNum >= newestFlushedSerial); + assert(_replay_end_serial_num >= newestFlushedSerial); TransactionLogManager::prepareReplay(_tlsMgr.getClient(), _docTypeName.getName(), flushedIndexMgrSerial, flushedSummaryMgrSerial, config_store); - _tlsReplayProgress = _tlsMgr.make_replay_progress(_prunedSerialNum, _serialNum); - _tlsMgr.startReplay(_prunedSerialNum, _serialNum, *this); + _tlsReplayProgress = _tlsMgr.make_replay_progress(_serialNum, _replay_end_serial_num); + _tlsMgr.startReplay(_serialNum, _replay_end_serial_num, *this); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 6d551c785a1..777b8a0c81d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -2,6 +2,7 @@ #pragma once +#include "i_inc_serial_num.h" #include "i_operation_storer.h" #include "idocumentmovehandler.h" #include "igetserialnum.h" @@ -48,7 +49,8 @@ class FeedHandler: private search::transactionlog::client::Callback, public IPruneRemovedDocumentsHandler, public IHeartBeatHandler, public IOperationStorer, - public IGetSerialNum + public IGetSerialNum, + public IIncSerialNum { private: using Packet = search::transactionlog::Packet; @@ -73,9 +75,11 @@ private: std::unique_ptr<TlsWriter> _tlsMgrWriter; TlsWriter *_tlsWriter; TlsReplayProgress::UP _tlsReplayProgress; - // the serial num of the last message in the transaction log + // the serial num of the last feed operation processed by feed handler. SerialNum _serialNum; SerialNum _prunedSerialNum; + // the serial num of the last feed operation in the transaction log at startup before replay + SerialNum _replay_end_serial_num; uint64_t _prepare_serial_num; size_t _numOperationsPendingCommit; size_t _numOperationsCompleted; @@ -208,8 +212,11 @@ public: } void setSerialNum(SerialNum serialNum) { _serialNum = serialNum; } - SerialNum incSerialNum() { return ++_serialNum; } + SerialNum incSerialNum() override { return ++_serialNum; } SerialNum getSerialNum() const override { return _serialNum; } + // The two following methods are used when saving initial config + SerialNum get_replay_end_serial_num() const { return _replay_end_serial_num; } + SerialNum inc_replay_end_serial_num() { return ++_replay_end_serial_num; } SerialNum getPrunedSerialNum() const { return _prunedSerialNum; } uint64_t inc_prepare_serial_num() { return ++_prepare_serial_num; } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp index 656006bf3a6..7ab7b95ae24 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp @@ -11,6 +11,7 @@ #include <vespa/searchlib/common/idestructorcallback.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/lambdatask.h> +#include <cassert> #include <vespa/log/log.h> LOG_SETUP(".proton.server.feedstates"); @@ -68,17 +69,20 @@ class TransactionLogReplayPacketHandler : public IReplayPacketHandler { IBucketDBHandler &_bucketDBHandler; IReplayConfig &_replay_config; FeedConfigStore &_config_store; + IIncSerialNum &_inc_serial_num; CommitTimeTracker _commitTimeTracker; public: TransactionLogReplayPacketHandler(IFeedView *& feed_view_ptr, IBucketDBHandler &bucketDBHandler, IReplayConfig &replay_config, - FeedConfigStore &config_store) + FeedConfigStore &config_store, + IIncSerialNum &inc_serial_num) : _feed_view_ptr(feed_view_ptr), _bucketDBHandler(bucketDBHandler), _replay_config(replay_config), _config_store(config_store), + _inc_serial_num(inc_serial_num), _commitTimeTracker(5ms) { } @@ -126,6 +130,13 @@ public: const document::DocumentTypeRepo &getDeserializeRepo() override { return *_feed_view_ptr->getDocumentTypeRepo(); } + void check_serial_num(search::SerialNum serial_num) override { + auto exp_serial_num = _inc_serial_num.incSerialNum(); + if (exp_serial_num != serial_num) { + LOG(warning, "Expected replay serial number %" PRIu64 ", got serial number %" PRIu64, exp_serial_num, serial_num); + assert(exp_serial_num == serial_num); + } + } void optionalCommit(search::SerialNum serialNum) override { if (_commitTimeTracker.needCommit()) { _feed_view_ptr->forceCommit(serialNum); @@ -137,9 +148,11 @@ void startDispatch(IReplayPacketHandler *packet_handler, const Packet::Entry &en // Called by handlePacket() in executor thread. LOG(spam, "replay packet entry: entrySerial(%" PRIu64 "), entryType(%u)", entry.serial(), entry.type()); + auto entry_serial_num = entry.serial(); + packet_handler->check_serial_num(entry_serial_num); ReplayPacketDispatcher dispatcher(*packet_handler); dispatcher.replayEntry(entry); - packet_handler->optionalCommit(entry.serial()); + packet_handler->optionalCommit(entry_serial_num); } } // namespace @@ -149,10 +162,11 @@ ReplayTransactionLogState::ReplayTransactionLogState( IFeedView *& feed_view_ptr, IBucketDBHandler &bucketDBHandler, IReplayConfig &replay_config, - FeedConfigStore &config_store) + FeedConfigStore &config_store, + IIncSerialNum& inc_serial_num) : FeedState(REPLAY_TRANSACTION_LOG), _doc_type_name(name), - _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store)) + _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store, inc_serial_num)) { } ReplayTransactionLogState::~ReplayTransactionLogState() = default; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.h b/searchcore/src/vespa/searchcore/proton/server/feedstates.h index 3d24f4068a2..8804c77f78d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.h @@ -54,7 +54,8 @@ public: IFeedView *& feed_view_ptr, bucketdb::IBucketDBHandler &bucketDBHandler, IReplayConfig &replay_config, - FeedConfigStore &config_store); + FeedConfigStore &config_store, + IIncSerialNum &inc_serial_num); ~ReplayTransactionLogState() override; void handleOperation(FeedToken, FeedOperationUP op) override { diff --git a/searchcore/src/vespa/searchcore/proton/server/i_inc_serial_num.h b/searchcore/src/vespa/searchcore/proton/server/i_inc_serial_num.h new file mode 100644 index 00000000000..64afcf49f94 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/i_inc_serial_num.h @@ -0,0 +1,19 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchlib/common/serialnum.h> + +namespace proton { + +/** + * A simple interface for increasing a serial number and getting the new + * serial number. + **/ +class IIncSerialNum { +public: + virtual ~IIncSerialNum() = default; + virtual search::SerialNum incSerialNum() = 0; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h b/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h index 77caa52b5d5..bd96ee0f724 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h @@ -42,6 +42,7 @@ struct IReplayPacketHandler virtual void replay(const MoveOperation &op) = 0; virtual void replay(const CreateBucketOperation &op) = 0; virtual void replay(const CompactLidSpaceOperation &op) = 0; + virtual void check_serial_num(search::SerialNum serial_num) = 0; virtual void optionalCommit(search::SerialNum) = 0; virtual feedoperation::IStreamHandler &getNewConfigStreamHandler() = 0; diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp index 029e32d254e..070cfd8085e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp @@ -34,11 +34,11 @@ TransactionLogManager::TransactionLogManager(const vespalib::string &tlsSpec, co TransactionLogManager::~TransactionLogManager() = default; void -TransactionLogManager::init(SerialNum oldestConfigSerial, SerialNum &prunedSerialNum, SerialNum &serialNum) +TransactionLogManager::init(SerialNum oldestConfigSerial, SerialNum &prunedSerialNum, SerialNum &replay_end_serial_num) { StatusResult res = TransactionLogManagerBase::init(); prunedSerialNum = res.serialBegin > 0 ? (res.serialBegin - 1) : 0; - serialNum = res.serialEnd; + replay_end_serial_num = res.serialEnd; if (oldestConfigSerial != 0) { prunedSerialNum = std::max(prunedSerialNum, oldestConfigSerial); } diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h index 8ff9dad26a7..6ed6990d8d5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h @@ -33,10 +33,10 @@ public: * @param oldestConfigSerial the serial num of the oldest config. * @param the pruned serial num will be set to 1 lower than * the serial num of the first entry in the transaction log. - * @param the current serial num will be set to 1 higher than - * the serial num of the last entry in the transaction log. + * @param replay_end_serial_num will be set to the serial num of + * the last entry in the transaction log. **/ - void init(SerialNum oldestConfigSerial, SerialNum &prunedSerialNum, SerialNum &serialNum); + void init(SerialNum oldestConfigSerial, SerialNum &prunedSerialNum, SerialNum &replay_end_serial_num); /** * Prepare replay of the transaction log. |