aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-11-17 13:26:22 +0100
committerTor Egge <Tor.Egge@broadpark.no>2020-11-17 13:26:22 +0100
commitda4dee87d3cc7ee29ff7aeca757ee95edfdb3f2c (patch)
treea612c4ab12e7ac751adf31ce5e1926e0dce142b0 /searchcore
parent770c0f039885e61a71f543ddbf4da1b131a8a7eb (diff)
Adjust serial number in feed handler during replay to ensure
proper semantics when using IGetSerialNum interface.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp1
-rw-r--r--searchcore/src/tests/proton/server/feedstates_test.cpp27
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.cpp22
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_inc_serial_num.h19
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h6
11 files changed, 99 insertions, 27 deletions
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
index a7c53293981..c649709c2eb 100644
--- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
+++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
@@ -204,6 +204,7 @@ public:
document::DocumentTypeRepo &getDeserializeRepo() override {
return _repo;
}
+ void check_serial_num(search::SerialNum) override { }
void optionalCommit(search::SerialNum) override { }
};
diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp
index 59e504de6ce..d45612cbcaf 100644
--- a/searchcore/src/tests/proton/server/feedstates_test.cpp
+++ b/searchcore/src/tests/proton/server/feedstates_test.cpp
@@ -53,6 +53,15 @@ struct MyReplayConfig : IReplayConfig {
void replayConfig(SerialNum) override {}
};
+struct MyIncSerialNum : IIncSerialNum {
+ SerialNum _serial_num;
+ MyIncSerialNum(SerialNum serial_num)
+ : _serial_num(serial_num)
+ {
+ }
+ SerialNum incSerialNum() override { return ++_serial_num; }
+};
+
struct Fixture
{
MyFeedView feed_view1;
@@ -62,6 +71,7 @@ struct Fixture
MemoryConfigStore config_store;
BucketDBOwner _bucketDB;
bucketdb::BucketDBHandler _bucketDBHandler;
+ MyIncSerialNum _inc_serial_num;
ReplayTransactionLogState state;
Fixture();
@@ -76,7 +86,8 @@ Fixture::Fixture()
config_store(),
_bucketDB(),
_bucketDBHandler(_bucketDB),
- state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store)
+ _inc_serial_num(9u),
+ state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store, _inc_serial_num)
{
}
Fixture::~Fixture() = default;
@@ -105,17 +116,23 @@ RemoveOperationContext::RemoveOperationContext(search::SerialNum serial)
RemoveOperationContext::~RemoveOperationContext() = default;
TEST_F("require that active FeedView can change during replay", Fixture)
{
- RemoveOperationContext opCtx(10);
- auto wrap = std::make_shared<PacketWrapper>(*opCtx.packet, nullptr);
ForegroundThreadExecutor executor;
EXPECT_EQUAL(0, f.feed_view1.remove_handled);
EXPECT_EQUAL(0, f.feed_view2.remove_handled);
- f.state.receive(wrap, executor);
+ {
+ RemoveOperationContext opCtx(10);
+ auto wrap = std::make_shared<PacketWrapper>(*opCtx.packet, nullptr);
+ f.state.receive(wrap, executor);
+ }
EXPECT_EQUAL(1, f.feed_view1.remove_handled);
EXPECT_EQUAL(0, f.feed_view2.remove_handled);
f.feed_view_ptr = &f.feed_view2;
- f.state.receive(wrap, executor);
+ {
+ RemoveOperationContext opCtx(11);
+ auto wrap = std::make_shared<PacketWrapper>(*opCtx.packet, nullptr);
+ f.state.receive(wrap, executor);
+ }
EXPECT_EQUAL(1, f.feed_view1.remove_handled);
EXPECT_EQUAL(1, f.feed_view2.remove_handled);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index f509bcf31ef..c76a7270ab2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -610,11 +610,13 @@ DocumentDB::saveInitialConfig(const DocumentDBConfig &configSnapshot)
if (_config_store->getBestSerialNum() != 0)
return; // Initial config already present
- SerialNum confSerial = _feedHandler->incSerialNum();
+ SerialNum confSerial = _feedHandler->inc_replay_end_serial_num();
+ _feedHandler->setSerialNum(confSerial);
// Elide save of new config entry in transaction log, it would be
// pruned at once anyway.
// save noop entry in transaction log
NoopOperation op;
+ op.setSerialNum(_feedHandler->inc_replay_end_serial_num());
(void) _feedHandler->storeOperationSync(op);
sync(op.getSerialNum());
// Wipe everything in transaction log before initial config.
@@ -633,12 +635,14 @@ DocumentDB::resumeSaveConfig()
SerialNum bestSerial = _config_store->getBestSerialNum();
if (bestSerial == 0)
return;
- if (bestSerial != _feedHandler->getSerialNum() + 1)
+ if (bestSerial != _feedHandler->get_replay_end_serial_num() + 1)
return;
// proton was interrupted when saving later config.
- SerialNum confSerial = _feedHandler->incSerialNum();
+ SerialNum confSerial = _feedHandler->inc_replay_end_serial_num();
+ _feedHandler->setSerialNum(confSerial);
// resume operation, i.e. save config entry in transaction log
NewConfigOperation op(confSerial, *_config_store);
+ op.setSerialNum(_feedHandler->inc_replay_end_serial_num());
(void) _feedHandler->storeOperationSync(op);
sync(op.getSerialNum());
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 25be8e4a929..88f785a58df 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -309,6 +309,12 @@ FeedHandler::performEof()
assert(_writeService.master().isCurrentThread());
_writeService.sync();
LOG(debug, "Visiting done for transaction log domain '%s', eof received", _tlsMgr.getDomainName().c_str());
+ // Replay must be complete
+ if (_replay_end_serial_num != _serialNum) {
+ LOG(warning, "Expected replay end serial number %" PRIu64 ", got serial number %" PRIu64,
+ _replay_end_serial_num, _serialNum);
+ assert(_replay_end_serial_num == _serialNum);
+ }
_owner.onTransactionLogReplayDone();
_tlsMgr.replayDone();
changeToNormalFeedState();
@@ -412,6 +418,7 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_tlsReplayProgress(),
_serialNum(0),
_prunedSerialNum(0),
+ _replay_end_serial_num(0u),
_prepare_serial_num(0u),
_numOperationsPendingCommit(0),
_numOperationsCompleted(0),
@@ -435,13 +442,14 @@ FeedHandler::~FeedHandler() = default;
void
FeedHandler::init(SerialNum oldestConfigSerial)
{
- _tlsMgr.init(oldestConfigSerial, _prunedSerialNum, _serialNum);
+ _tlsMgr.init(oldestConfigSerial, _prunedSerialNum, _replay_end_serial_num);
+ _serialNum = _prunedSerialNum;
if (_tlsWriter == nullptr) {
_tlsMgrWriter = std::make_unique<TlsMgrWriter>(_tlsMgr, _tlsWriterfactory);
_tlsWriter = _tlsMgrWriter.get();
}
_allowSync = true;
- syncTls(_serialNum);
+ syncTls(_replay_end_serial_num);
}
@@ -464,18 +472,18 @@ FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flu
assert(_activeFeedView);
assert(_bucketDBHandler);
auto state = make_shared<ReplayTransactionLogState>
- (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store);
+ (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store, *this);
changeFeedState(state);
// Resurrected attribute vector might cause oldestFlushedSerial to
// be lower than _prunedSerialNum, so don't warn for now.
(void) oldestFlushedSerial;
- assert(_serialNum >= newestFlushedSerial);
+ assert(_replay_end_serial_num >= newestFlushedSerial);
TransactionLogManager::prepareReplay(_tlsMgr.getClient(), _docTypeName.getName(),
flushedIndexMgrSerial, flushedSummaryMgrSerial, config_store);
- _tlsReplayProgress = _tlsMgr.make_replay_progress(_prunedSerialNum, _serialNum);
- _tlsMgr.startReplay(_prunedSerialNum, _serialNum, *this);
+ _tlsReplayProgress = _tlsMgr.make_replay_progress(_serialNum, _replay_end_serial_num);
+ _tlsMgr.startReplay(_serialNum, _replay_end_serial_num, *this);
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 6d551c785a1..777b8a0c81d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -2,6 +2,7 @@
#pragma once
+#include "i_inc_serial_num.h"
#include "i_operation_storer.h"
#include "idocumentmovehandler.h"
#include "igetserialnum.h"
@@ -48,7 +49,8 @@ class FeedHandler: private search::transactionlog::client::Callback,
public IPruneRemovedDocumentsHandler,
public IHeartBeatHandler,
public IOperationStorer,
- public IGetSerialNum
+ public IGetSerialNum,
+ public IIncSerialNum
{
private:
using Packet = search::transactionlog::Packet;
@@ -73,9 +75,11 @@ private:
std::unique_ptr<TlsWriter> _tlsMgrWriter;
TlsWriter *_tlsWriter;
TlsReplayProgress::UP _tlsReplayProgress;
- // the serial num of the last message in the transaction log
+ // the serial num of the last feed operation processed by feed handler.
SerialNum _serialNum;
SerialNum _prunedSerialNum;
+ // the serial num of the last feed operation in the transaction log at startup before replay
+ SerialNum _replay_end_serial_num;
uint64_t _prepare_serial_num;
size_t _numOperationsPendingCommit;
size_t _numOperationsCompleted;
@@ -208,8 +212,11 @@ public:
}
void setSerialNum(SerialNum serialNum) { _serialNum = serialNum; }
- SerialNum incSerialNum() { return ++_serialNum; }
+ SerialNum incSerialNum() override { return ++_serialNum; }
SerialNum getSerialNum() const override { return _serialNum; }
+ // The two following methods are used when saving initial config
+ SerialNum get_replay_end_serial_num() const { return _replay_end_serial_num; }
+ SerialNum inc_replay_end_serial_num() { return ++_replay_end_serial_num; }
SerialNum getPrunedSerialNum() const { return _prunedSerialNum; }
uint64_t inc_prepare_serial_num() { return ++_prepare_serial_num; }
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
index 656006bf3a6..7ab7b95ae24 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
@@ -11,6 +11,7 @@
#include <vespa/searchlib/common/idestructorcallback.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/lambdatask.h>
+#include <cassert>
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.feedstates");
@@ -68,17 +69,20 @@ class TransactionLogReplayPacketHandler : public IReplayPacketHandler {
IBucketDBHandler &_bucketDBHandler;
IReplayConfig &_replay_config;
FeedConfigStore &_config_store;
+ IIncSerialNum &_inc_serial_num;
CommitTimeTracker _commitTimeTracker;
public:
TransactionLogReplayPacketHandler(IFeedView *& feed_view_ptr,
IBucketDBHandler &bucketDBHandler,
IReplayConfig &replay_config,
- FeedConfigStore &config_store)
+ FeedConfigStore &config_store,
+ IIncSerialNum &inc_serial_num)
: _feed_view_ptr(feed_view_ptr),
_bucketDBHandler(bucketDBHandler),
_replay_config(replay_config),
_config_store(config_store),
+ _inc_serial_num(inc_serial_num),
_commitTimeTracker(5ms)
{ }
@@ -126,6 +130,13 @@ public:
const document::DocumentTypeRepo &getDeserializeRepo() override {
return *_feed_view_ptr->getDocumentTypeRepo();
}
+ void check_serial_num(search::SerialNum serial_num) override {
+ auto exp_serial_num = _inc_serial_num.incSerialNum();
+ if (exp_serial_num != serial_num) {
+ LOG(warning, "Expected replay serial number %" PRIu64 ", got serial number %" PRIu64, exp_serial_num, serial_num);
+ assert(exp_serial_num == serial_num);
+ }
+ }
void optionalCommit(search::SerialNum serialNum) override {
if (_commitTimeTracker.needCommit()) {
_feed_view_ptr->forceCommit(serialNum);
@@ -137,9 +148,11 @@ void startDispatch(IReplayPacketHandler *packet_handler, const Packet::Entry &en
// Called by handlePacket() in executor thread.
LOG(spam, "replay packet entry: entrySerial(%" PRIu64 "), entryType(%u)", entry.serial(), entry.type());
+ auto entry_serial_num = entry.serial();
+ packet_handler->check_serial_num(entry_serial_num);
ReplayPacketDispatcher dispatcher(*packet_handler);
dispatcher.replayEntry(entry);
- packet_handler->optionalCommit(entry.serial());
+ packet_handler->optionalCommit(entry_serial_num);
}
} // namespace
@@ -149,10 +162,11 @@ ReplayTransactionLogState::ReplayTransactionLogState(
IFeedView *& feed_view_ptr,
IBucketDBHandler &bucketDBHandler,
IReplayConfig &replay_config,
- FeedConfigStore &config_store)
+ FeedConfigStore &config_store,
+ IIncSerialNum& inc_serial_num)
: FeedState(REPLAY_TRANSACTION_LOG),
_doc_type_name(name),
- _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store))
+ _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store, inc_serial_num))
{ }
ReplayTransactionLogState::~ReplayTransactionLogState() = default;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.h b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
index 3d24f4068a2..8804c77f78d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
@@ -54,7 +54,8 @@ public:
IFeedView *& feed_view_ptr,
bucketdb::IBucketDBHandler &bucketDBHandler,
IReplayConfig &replay_config,
- FeedConfigStore &config_store);
+ FeedConfigStore &config_store,
+ IIncSerialNum &inc_serial_num);
~ReplayTransactionLogState() override;
void handleOperation(FeedToken, FeedOperationUP op) override {
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_inc_serial_num.h b/searchcore/src/vespa/searchcore/proton/server/i_inc_serial_num.h
new file mode 100644
index 00000000000..64afcf49f94
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/i_inc_serial_num.h
@@ -0,0 +1,19 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchlib/common/serialnum.h>
+
+namespace proton {
+
+/**
+ * A simple interface for increasing a serial number and getting the new
+ * serial number.
+ **/
+class IIncSerialNum {
+public:
+ virtual ~IIncSerialNum() = default;
+ virtual search::SerialNum incSerialNum() = 0;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h b/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h
index 77caa52b5d5..bd96ee0f724 100644
--- a/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/ireplaypackethandler.h
@@ -42,6 +42,7 @@ struct IReplayPacketHandler
virtual void replay(const MoveOperation &op) = 0;
virtual void replay(const CreateBucketOperation &op) = 0;
virtual void replay(const CompactLidSpaceOperation &op) = 0;
+ virtual void check_serial_num(search::SerialNum serial_num) = 0;
virtual void optionalCommit(search::SerialNum) = 0;
virtual feedoperation::IStreamHandler &getNewConfigStreamHandler() = 0;
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
index 029e32d254e..070cfd8085e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
@@ -34,11 +34,11 @@ TransactionLogManager::TransactionLogManager(const vespalib::string &tlsSpec, co
TransactionLogManager::~TransactionLogManager() = default;
void
-TransactionLogManager::init(SerialNum oldestConfigSerial, SerialNum &prunedSerialNum, SerialNum &serialNum)
+TransactionLogManager::init(SerialNum oldestConfigSerial, SerialNum &prunedSerialNum, SerialNum &replay_end_serial_num)
{
StatusResult res = TransactionLogManagerBase::init();
prunedSerialNum = res.serialBegin > 0 ? (res.serialBegin - 1) : 0;
- serialNum = res.serialEnd;
+ replay_end_serial_num = res.serialEnd;
if (oldestConfigSerial != 0) {
prunedSerialNum = std::max(prunedSerialNum, oldestConfigSerial);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h
index 8ff9dad26a7..6ed6990d8d5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h
@@ -33,10 +33,10 @@ public:
* @param oldestConfigSerial the serial num of the oldest config.
* @param the pruned serial num will be set to 1 lower than
* the serial num of the first entry in the transaction log.
- * @param the current serial num will be set to 1 higher than
- * the serial num of the last entry in the transaction log.
+ * @param replay_end_serial_num will be set to the serial num of
+ * the last entry in the transaction log.
**/
- void init(SerialNum oldestConfigSerial, SerialNum &prunedSerialNum, SerialNum &serialNum);
+ void init(SerialNum oldestConfigSerial, SerialNum &prunedSerialNum, SerialNum &replay_end_serial_num);
/**
* Prepare replay of the transaction log.