summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-12 14:18:26 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-10-17 15:52:10 +0200
commitb2de6349a5e71ac46b1bd7f38ff38c96f4b1a189 (patch)
tree25b6e907470902fbf507f4913bead957ec424a8a /searchcore
parent4d335b9bde8f4c4b624471a5eb956af4ae6ba73e (diff)
Introduce async write interface in the TLS.
Wire it in all the way up and in to proton. The implementation is still synchronous.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/docsummary/docsummary.cpp5
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp2
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp2
-rw-r--r--searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp42
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp60
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/tlcproxy.h14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/tlswriter.h8
12 files changed, 88 insertions, 92 deletions
diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp
index 3b199d266a8..bce3fb7267c 100644
--- a/searchcore/src/tests/proton/docsummary/docsummary.cpp
+++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp
@@ -21,9 +21,8 @@
#include <vespa/searchcore/proton/server/memoryconfigstore.h>
#include <vespa/searchcore/proton/server/searchview.h>
#include <vespa/searchcore/proton/server/summaryadapter.h>
-#include <vespa/searchlib/common/idestructorcallback.h>
+#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/searchlib/common/transport.h>
-#include <vespa/searchlib/docstore/logdocumentstore.h>
#include <vespa/searchlib/engine/docsumapi.h>
#include <vespa/searchlib/index/docbuilder.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
@@ -254,7 +253,7 @@ public:
op.setSerialNum(serialNum);
op.setDbDocumentId(dbdId);
op.setPrevDbDocumentId(prevDbdId);
- _ddb->getFeedHandler().storeOperation(op);
+ _ddb->getFeedHandler().storeOperation(op, std::make_shared<search::IgnoreCallback>());
SearchView *sv(dynamic_cast<SearchView *>(_ddb->getReadySubDB()->getSearchView().get()));
if (sv != NULL) {
// cf. FeedView::putAttributes()
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index 823c31dd1c2..b8ffc41d3cd 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -370,7 +370,7 @@ struct MyTlsWriter : TlsWriter {
bool erase_return;
MyTlsWriter() : store_count(0), erase_count(0), erase_return(true) {}
- void storeOperation(const FeedOperation &) override { ++store_count; }
+ void storeOperation(const FeedOperation &, DoneCallback) override { ++store_count; }
bool erase(SerialNum) override { ++erase_count; return erase_return; }
SerialNum sync(SerialNum syncTo) override {
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
index 55f71da9687..56bd99c90f6 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
@@ -138,7 +138,7 @@ struct MyStorer : public IOperationStorer
: _moveCnt(0),
_compactCnt(0)
{}
- virtual void storeOperation(FeedOperation &op) override {
+ void storeOperation(const FeedOperation &op, DoneCallback) override {
if (op.getType() == FeedOperation::MOVE) {
++ _moveCnt;
} else if (op.getType() == FeedOperation::COMPACT_LID_SPACE) {
diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
index 559dbb240a8..f20ad01bcf6 100644
--- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
@@ -26,7 +26,7 @@
#include <vespa/searchcore/proton/test/test.h>
#include <vespa/searchlib/attribute/attributecontext.h>
#include <vespa/searchlib/attribute/attributeguard.h>
-#include <vespa/searchlib/common/idestructorcallback.h>
+#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/searchlib/common/idocumentmetastore.h>
#include <vespa/searchlib/index/docbuilder.h>
#include <vespa/vespalib/data/slime/slime.h>
@@ -232,7 +232,7 @@ public:
}
// Implements IOperationStorer
- virtual void storeOperation(FeedOperation &op) override;
+ virtual void storeOperation(const FeedOperation &op, DoneCallback) override;
uint32_t getHeartBeats() {
return _heartBeats;
@@ -781,7 +781,6 @@ MyFeedHandler::isExecutorThread()
void
MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx)
{
- (void) moveDoneCtx;
assert(isExecutorThread());
assert(op.getValidPrevDbdId());
_subDBs[op.getSubDbId()]->prepareMove(op);
@@ -792,7 +791,7 @@ MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx
assert(op.getPrevSubDbId() != 1u);
assert(op.getSubDbId() < _subDBs.size());
assert(op.getPrevSubDbId() < _subDBs.size());
- storeOperation(op);
+ storeOperation(op, std::move(moveDoneCtx));
_subDBs[op.getSubDbId()]->handleMove(op);
_subDBs[op.getPrevSubDbId()]->handleMove(op);
}
@@ -803,7 +802,7 @@ MyFeedHandler::performPruneRemovedDocuments(PruneRemovedDocumentsOperation &op)
{
assert(isExecutorThread());
if (op.getLidsToRemove()->getNumLids() != 0u) {
- storeOperation(op);
+ storeOperation(op, std::make_shared<search::IgnoreCallback>());
// magic number.
_subDBs[1u]->handlePruneRemovedDocuments(op);
}
@@ -826,9 +825,9 @@ MyFeedHandler::setSubDBs(const std::vector<MyDocumentSubDB *> &subDBs)
void
-MyFeedHandler::storeOperation(FeedOperation &op)
+MyFeedHandler::storeOperation(const FeedOperation &op, DoneCallback)
{
- op.setSerialNum(incSerialNum());
+ const_cast<FeedOperation &>(op).setSerialNum(incSerialNum());
}
@@ -1011,22 +1010,16 @@ MaintenanceControllerFixture::performForwardMaintenanceConfig()
void
-MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs,
- MyDocumentSubDB &subDb)
+MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs, MyDocumentSubDB &subDb)
{
- for (test::UserDocuments::Iterator itr = docs.begin();
- itr != docs.end();
- ++itr) {
+ for (auto itr = docs.begin(); itr != docs.end(); ++itr) {
const test::BucketDocuments &bucketDocs = itr->second;
for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) {
const test::Document &testDoc = bucketDocs.getDocs()[i];
- PutOperation op(testDoc.getBucket(),
- testDoc.getTimestamp(),
- testDoc.getDoc());
- op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(),
- testDoc.getLid()));
- _fh.storeOperation(op);
+ PutOperation op(testDoc.getBucket(), testDoc.getTimestamp(), testDoc.getDoc());
+ op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(), testDoc.getLid()));
+ _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>());
subDb.handlePut(op);
}
}
@@ -1038,18 +1031,13 @@ MaintenanceControllerFixture::removeDocs(const test::UserDocuments &docs,
Timestamp timestamp)
{
- for (test::UserDocuments::Iterator itr = docs.begin();
- itr != docs.end();
- ++itr) {
+ for (auto itr = docs.begin(); itr != docs.end(); ++itr) {
const test::BucketDocuments &bucketDocs = itr->second;
for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) {
const test::Document &testDoc = bucketDocs.getDocs()[i];
- RemoveOperation op(testDoc.getBucket(),
- timestamp,
- testDoc.getDoc()->getId());
- op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(),
- testDoc.getLid()));
- _fh.storeOperation(op);
+ RemoveOperation op(testDoc.getBucket(), timestamp, testDoc.getDoc()->getId());
+ op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(), testDoc.getLid()));
+ _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>());
_removed.handleRemove(op);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 4198803d1fe..5babacfc4b6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -307,7 +307,7 @@ DocumentDB::enterReprocessState()
if (!runner.empty()) {
runner.run();
NoopOperation op;
- _feedHandler.storeOperation(op);
+ _feedHandler.storeOperationSync(op);
sync(op.getSerialNum());
_subDBs.pruneRemovedFields(op.getSerialNum());
}
@@ -397,15 +397,14 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
_config_store->saveConfig(*configSnapshot, serialNum);
// save entry in transaction log
NewConfigOperation op(serialNum, *_config_store);
- _feedHandler.storeOperation(op);
+ _feedHandler.storeOperationSync(op);
sync(op.getSerialNum());
}
bool hasVisibilityDelayChanged = false;
{
bool elidedConfigSave = equalReplayConfig && tlsReplayDone;
// Flush changes to attributes and memory index, cf. visibilityDelay
- _feedView.get()->forceCommit(elidedConfigSave ? serialNum :
- serialNum - 1);
+ _feedView.get()->forceCommit(elidedConfigSave ? serialNum : serialNum - 1);
_writeService.sync();
fastos::TimeStamp visibilityDelay = configSnapshot->getMaintenanceConfigSP()->getVisibilityDelay();
hasVisibilityDelayChanged = (visibilityDelay != _visibility.getVisibilityDelay());
@@ -585,7 +584,7 @@ DocumentDB::saveInitialConfig(const DocumentDBConfig &configSnapshot)
// pruned at once anyway.
// save noop entry in transaction log
NoopOperation op;
- _feedHandler.storeOperation(op);
+ _feedHandler.storeOperationSync(op);
sync(op.getSerialNum());
// Wipe everything in transaction log before initial config.
try {
@@ -609,7 +608,7 @@ DocumentDB::resumeSaveConfig()
SerialNum confSerial = _feedHandler.incSerialNum();
// resume operation, i.e. save config entry in transaction log
NewConfigOperation op(confSerial, *_config_store);
- _feedHandler.storeOperation(op);
+ _feedHandler.storeOperationSync(op);
sync(op.getSerialNum());
}
@@ -776,7 +775,7 @@ DocumentDB::enterRedoReprocessState()
runner.run();
_subDBs.onReprocessDone(_feedHandler.getSerialNum());
NoopOperation op;
- _feedHandler.storeOperation(op);
+ _feedHandler.storeOperationSync(op);
sync(op.getSerialNum());
_subDBs.pruneRemovedFields(op.getSerialNum());
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index b01ba43cb49..5890489415e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -12,7 +12,7 @@
#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h>
#include <vespa/searchcore/proton/persistenceengine/transport_latch.h>
#include <vespa/searchcorespi/index/ithreadingservice.h>
-#include <vespa/searchlib/common/idestructorcallback.h>
+#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/vespalib/util/exceptions.h>
#include <unistd.h>
@@ -46,8 +46,8 @@ ignoreOperation(const DocumentOperation &op) {
} // namespace
-void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op) {
- TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op);
+void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op, DoneCallback onDone) {
+ TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op, std::move(onDone));
}
bool FeedHandler::TlsMgrWriter::erase(SerialNum oldest_to_keep) {
return _tls_mgr.getSession()->erase(oldest_to_keep);
@@ -72,7 +72,6 @@ FeedHandler::TlsMgrWriter::sync(SerialNum syncTo)
LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo);
}
throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo));
- return 0;
}
void
@@ -94,7 +93,7 @@ void FeedHandler::performPut(FeedToken token, PutOperation &op) {
}
return;
}
- storeOperation(op);
+ storeOperation(op, token);
if (token) {
token->setResult(std::make_unique<Result>(), false);
}
@@ -121,7 +120,7 @@ FeedHandler::performUpdate(FeedToken token, UpdateOperation &op)
void
FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op)
{
- storeOperation(op);
+ storeOperation(op, token);
if (token) {
token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true);
}
@@ -137,7 +136,7 @@ FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &o
op.getUpdate()->applyTo(*doc);
PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc);
_activeFeedView->preparePut(putOp);
- storeOperation(putOp);
+ storeOperation(putOp, token);
if (token) {
token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true);
}
@@ -160,7 +159,7 @@ void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) {
if (op.getPrevDbDocumentId().valid()) {
assert(op.getValidNewOrPrevDbdId());
assert(op.notMovingLidInSameSubDb());
- storeOperation(op);
+ storeOperation(op, token);
if (token) {
bool documentWasFound = !op.getPrevMarkedAsRemoved();
token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound);
@@ -168,7 +167,7 @@ void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) {
_activeFeedView->handleRemove(std::move(token), op);
} else if (op.hasDocType()) {
assert(op.getDocType() == _docTypeName.getName());
- storeOperation(op);
+ storeOperation(op, token);
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
}
@@ -186,20 +185,16 @@ FeedHandler::performGarbageCollect(FeedToken token)
(void) token;
}
-
void
FeedHandler::performCreateBucket(FeedToken token, CreateBucketOperation &op)
{
- (void) token;
- storeOperation(op);
+ storeOperation(op, std::move(token));
_bucketDBHandler->handleCreateBucket(op.getBucketId());
}
-
void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op) {
- (void) token;
_activeFeedView->prepareDeleteBucket(op);
- storeOperation(op);
+ storeOperation(op, std::move(token));
// Delete documents in bucket
_activeFeedView->handleDeleteBucket(op);
// Delete bucket itself, should no longer have documents.
@@ -207,21 +202,16 @@ void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op
}
-
void FeedHandler::performSplit(FeedToken token, SplitBucketOperation &op) {
- (void) token;
- storeOperation(op);
+ storeOperation(op, std::move(token));
_bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2());
}
-
void FeedHandler::performJoin(FeedToken token, JoinBucketsOperation &op) {
- (void) token;
- storeOperation(op);
+ storeOperation(op, std::move(token));
_bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget());
}
-
void
FeedHandler::performSync()
{
@@ -412,18 +402,28 @@ FeedHandler::isDoingReplay() const {
return _tlsMgr.isDoingReplay();
}
-bool FeedHandler::getTransactionLogReplayDone() const {
+bool
+FeedHandler::getTransactionLogReplayDone() const {
return _tlsMgr.getReplayDone();
}
-void FeedHandler::storeOperation(FeedOperation &op) {
+void
+FeedHandler::storeOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) {
if (!op.getSerialNum()) {
- op.setSerialNum(incSerialNum());
+ const_cast<FeedOperation &>(op).setSerialNum(incSerialNum());
}
- _tlsWriter.storeOperation(op);
+ _tlsWriter.storeOperation(op, std::move(onDone));
}
-void FeedHandler::tlsPrune(SerialNum oldest_to_keep) {
+void
+FeedHandler::storeOperationSync(const FeedOperation &op) {
+ vespalib::Gate gate;
+ storeOperation(op, std::make_shared<search::GateCallback>(gate));
+ gate.await();
+}
+
+void
+FeedHandler::tlsPrune(SerialNum oldest_to_keep) {
if (!_tlsWriter.erase(oldest_to_keep)) {
throw IllegalStateException(make_string("Failed to prune TLS to token %" PRIu64 ".", oldest_to_keep));
}
@@ -533,7 +533,7 @@ FeedHandler::handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCa
assert(op.getValidDbdId());
assert(op.getValidPrevDbdId());
assert(op.getSubDbId() != op.getPrevSubDbId());
- storeOperation(op);
+ storeOperation(op, moveDoneCtx);
_activeFeedView->handleMove(op, std::move(moveDoneCtx));
}
@@ -577,7 +577,9 @@ performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp)
{
const LidVectorContext::SP lids_to_remove = pruneOp.getLidsToRemove();
if (lids_to_remove && lids_to_remove->getNumLids() != 0) {
- storeOperation(pruneOp);
+ vespalib::Gate gate;
+ storeOperation(pruneOp, std::make_shared<search::GateCallback>(gate));
+ gate.await();
_activeFeedView->handlePruneRemovedDocuments(pruneOp);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index d717346883a..8c28fcdc1ea 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -65,7 +65,7 @@ private:
_tls_mgr(tls_mgr),
_tlsDirectWriter(tlsDirectWriter)
{ }
- void storeOperation(const FeedOperation &op) override;
+ void storeOperation(const FeedOperation &op, DoneCallback onDone) override;
bool erase(SerialNum oldest_to_keep) override;
SerialNum sync(SerialNum syncTo) override;
};
@@ -234,7 +234,8 @@ public:
void eof() override;
void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override;
void syncTls(SerialNum syncTo);
- void storeOperation(FeedOperation &op) override;
+ void storeOperation(const FeedOperation &op, DoneCallback onDone) override;
+ void storeOperationSync(const FeedOperation & op);
void considerDelayedPrune();
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
index 4e5958cd9e2..760250844a8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
@@ -2,6 +2,8 @@
#pragma once
+#include <vespa/searchlib/transactionlog/common.h>
+
namespace proton {
class FeedOperation;
@@ -11,12 +13,13 @@ class FeedOperation;
*/
struct IOperationStorer
{
- virtual ~IOperationStorer() {}
+ using DoneCallback = search::transactionlog::Writer::DoneCallback;
+ virtual ~IOperationStorer() = default;
/**
* Assign serial number to (if not set) and store the given operation.
*/
- virtual void storeOperation(FeedOperation &op) = 0;
+ virtual void storeOperation(const FeedOperation &op, DoneCallback onDone) = 0;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
index d6c1a032cea..2ae8d826ebc 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
@@ -5,7 +5,8 @@
#include "imaintenancejobrunner.h"
#include "lid_space_compaction_job.h"
#include <vespa/searchcore/proton/common/eventlogger.h>
-#include <vespa/searchlib/common/idestructorcallback.h>
+#include <vespa/searchlib/common/gatecallback.h>
+#include <vespa/vespalib/util/sync.h>
#include <cassert>
#include <vespa/log/log.h>
@@ -55,8 +56,9 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats)
return true;
} else {
MoveOperation::UP op = _handler.createMoveOperation(document, stats.getLowestFreeLid());
- _opStorer.storeOperation(*op);
- _handler.handleMove(*op, _moveOpsLimiter->beginOperation());
+ search::IDestructorCallback::SP context = _moveOpsLimiter->beginOperation();
+ _opStorer.storeOperation(*op, context);
+ _handler.handleMove(*op, std::move(context));
if (isBlocked(BlockedReason::OUTSTANDING_OPS)) {
return true;
}
@@ -79,7 +81,9 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats)
{
uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1;
CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit);
- _opStorer.storeOperation(op);
+ vespalib::Gate gate;
+ _opStorer.storeOperation(op, std::make_shared<search::GateCallback>(gate));
+ gate.await();
_handler.handleCompactLidSpace(op);
EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit);
_shouldCompactLidSpace = false;
diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp
index 215650b6664..bfc59dee35e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp
@@ -11,24 +11,24 @@ using search::transactionlog::Packet;
namespace proton {
-void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf)
+void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type,
+ const vespalib::nbostream &buf, DoneCallback onDone)
{
Packet::Entry entry(serialNum, type, vespalib::ConstBufferRef(buf.c_str(), buf.size()));
Packet packet;
packet.add(entry);
packet.close();
- _tlsDirectWriter.commit(_domain, packet);
-
+ _tlsDirectWriter.commit(_domain, packet, std::move(onDone));
}
void
-TlcProxy::storeOperation(const FeedOperation &op)
+TlcProxy::storeOperation(const FeedOperation &op, DoneCallback onDone)
{
nbostream stream;
op.serialize(stream);
LOG(debug, "storeOperation(): serialNum(%" PRIu64 "), type(%u), size(%zu)",
op.getSerialNum(), (uint32_t)op.getType(), stream.size());
- commit(op.getSerialNum(), (uint32_t)op.getType(), stream);
+ commit(op.getSerialNum(), (uint32_t)op.getType(), stream, std::move(onDone));
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h
index 8e4feb2f354..2dc6501731e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h
@@ -8,18 +8,20 @@ namespace proton {
class FeedOperation;
class TlcProxy {
- vespalib::string _domain;
- search::transactionlog::Writer & _tlsDirectWriter;
+ using DoneCallback = search::transactionlog::Writer::DoneCallback;
+ using Writer = search::transactionlog::Writer;
+ vespalib::string _domain;
+ Writer & _tlsDirectWriter;
- void commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf);
+ void commit(search::SerialNum serialNum, search::transactionlog::Type type,
+ const vespalib::nbostream &buf, DoneCallback onDone);
public:
typedef std::unique_ptr<TlcProxy> UP;
- TlcProxy(const vespalib::string & domain, search::transactionlog::Writer & writer)
+ TlcProxy(const vespalib::string & domain, Writer & writer)
: _domain(domain), _tlsDirectWriter(writer) {}
- void storeOperation(const FeedOperation &op);
+ void storeOperation(const FeedOperation &op, DoneCallback onDone);
};
} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h
index 0956c0ae011..5d51580c0ad 100644
--- a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h
+++ b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h
@@ -2,19 +2,17 @@
#pragma once
+#include "i_operation_storer.h"
#include <vespa/searchlib/common/serialnum.h>
namespace proton {
-class FeedOperation;
-
/**
* Interface for writing to the TransactionLogServer.
*/
-struct TlsWriter {
- virtual ~TlsWriter() {}
+struct TlsWriter : public IOperationStorer {
+ virtual ~TlsWriter() = default;
- virtual void storeOperation(const FeedOperation &op) = 0;
virtual bool erase(search::SerialNum oldest_to_keep) = 0;
virtual search::SerialNum sync(search::SerialNum syncTo) = 0;
};