summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.h27
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp264
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h37
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h8
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h6
5 files changed, 88 insertions, 254 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
index 6bca156ddba..fd97cc6242a 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
@@ -8,9 +8,7 @@
#include <vespa/searchcore/proton/feedoperation/feedoperation.h>
#include <atomic>
-namespace proton
-{
-
+namespace proton {
class PerDocTypeFeedMetrics;
typedef std::unique_ptr<storage::spi::Result> ResultUP;
@@ -38,7 +36,6 @@ private:
State & operator = (const State &) = delete;
State(ITransport & transport, mbus::Reply::UP reply, uint32_t numAcksRequired);
~State();
- void setNumAcksRequired(uint32_t numAcksRequired) { _unAckedCount = numAcksRequired; }
void ack();
void ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics);
@@ -81,6 +78,12 @@ public:
*/
FeedToken(ITransport &transport, mbus::Reply::UP reply);
+ FeedToken(FeedToken &&) = default;
+ FeedToken & operator =(FeedToken &&) = default;
+ FeedToken(const FeedToken &) = default;
+ FeedToken & operator =(const FeedToken &) = default;
+ ~FeedToken() = default;
+
/**
* Passes a receipt back to the originating FeedEngine, declaring that this
* operation succeeded. If an error occured while processing the operation,
@@ -88,15 +91,11 @@ public:
*/
void ack() const { _state->ack(); }
- void
- ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics) const
- {
+ void ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics) const {
_state->ack(opType, metrics);
}
- void
- incNeededAcks() const
- {
+ void incNeededAcks() const {
_state->incNeededAcks();
}
@@ -148,14 +147,6 @@ public:
_state->setResult(std::move(result), documentWasFound);
}
- /**
- * This controls how many acks are required before it is acked back to the sender.
- * Default is 1, and so far only adjusted by multioperation handling.
- *
- * @param numAcksRequired How many acks must be received before it is considered acked.
- */
- void setNumAcksRequired(uint32_t numAcksRequired) const { _state->setNumAcksRequired(numAcksRequired); }
-
FastOS_Time getStartTime() const { return _state->getStartTime(); }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 6b48c94fabd..7cb68a8721b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -18,7 +18,6 @@
#include <vespa/searchcore/proton/persistenceengine/transport_latch.h>
#include <vespa/searchcorespi/index/ithreadingservice.h>
#include <vespa/searchlib/common/idestructorcallback.h>
-#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/exceptions.h>
#include <unistd.h>
@@ -41,47 +40,40 @@ using storage::spi::Timestamp;
using storage::spi::UpdateResult;
using vespalib::Executor;
using vespalib::IllegalStateException;
-using vespalib::makeClosure;
-using vespalib::makeTask;
+using vespalib::makeLambdaTask;
using vespalib::make_string;
using vespalib::MonitorGuard;
using vespalib::LockGuard;
namespace proton {
-
namespace {
+
void
setUpdateWasFound(mbus::Reply &reply, bool was_found)
{
- assert(static_cast<DocumentReply&>(reply).getType() ==
- DocumentProtocol::REPLY_UPDATEDOCUMENT);
+ assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_UPDATEDOCUMENT);
UpdateDocumentReply &update_rep = static_cast<UpdateDocumentReply&>(reply);
update_rep.setWasFound(was_found);
}
-
void
setRemoveWasFound(mbus::Reply &reply, bool was_found)
{
- assert(static_cast<DocumentReply&>(reply).getType() ==
- DocumentProtocol::REPLY_REMOVEDOCUMENT);
+ assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_REMOVEDOCUMENT);
RemoveDocumentReply &remove_rep = static_cast<RemoveDocumentReply&>(reply);
remove_rep.setWasFound(was_found);
}
-
bool
ignoreOperation(const DocumentOperation &op)
{
- return op.getPrevTimestamp() != 0 &&
- op.getTimestamp() < op.getPrevTimestamp();
+ return (op.getPrevTimestamp() != 0)
+ && (op.getTimestamp() < op.getPrevTimestamp());
}
-
} // namespace
-
void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op) {
TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op);
}
@@ -93,7 +85,6 @@ search::SerialNum
FeedHandler::TlsMgrWriter::sync(SerialNum syncTo)
{
for (int retryCount = 0; retryCount < 10; ++retryCount) {
-
SerialNum syncedTo(0);
LOG(spam, "Trying tls sync(%" PRIu64 ")", syncTo);
bool res = _tls_mgr.getSession()->sync(syncTo, syncedTo);
@@ -103,19 +94,12 @@ FeedHandler::TlsMgrWriter::sync(SerialNum syncTo)
continue;
}
if (syncedTo >= syncTo) {
- LOG(spam,
- "Tls sync complete, reached %" PRIu64", returning",
- syncedTo);
+ LOG(spam, "Tls sync complete, reached %" PRIu64", returning", syncedTo);
return syncedTo;
}
- LOG(spam,
- "Tls sync incomplete, reached %" PRIu64 ", retrying",
- syncedTo);
+ LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo);
}
- throw vespalib::IllegalStateException(
- vespalib::make_string(
- "Failed to sync TLS to token %" PRIu64 ".",
- syncTo));
+ throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo));
return 0;
}
@@ -123,7 +107,7 @@ void
FeedHandler::doHandleOperation(FeedToken token, FeedOperation::UP op)
{
assert(_writeService.master().isCurrentThread());
- vespalib::LockGuard guard(_feedLock);
+ LockGuard guard(_feedLock);
_feedState->handleOperation(token, std::move(op));
}
@@ -131,33 +115,23 @@ void FeedHandler::performPut(FeedToken::UP token, PutOperation &op) {
op.assertValid();
_activeFeedView->preparePut(op);
if (ignoreOperation(op)) {
- LOG(debug, "performPut(): ignoreOperation: docId(%s), "
- "timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")",
- op.getDocument()->getId().toString().c_str(),
- (uint64_t)op.getTimestamp(),
- (uint64_t)op.getPrevTimestamp());
- if (token.get() != NULL) {
+ LOG(debug, "performPut(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")",
+ op.getDocument()->getId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp());
+ if (token) {
token->setResult(ResultUP(new Result), false);
token->ack(op.getType(), _metrics);
}
return;
}
storeOperation(op);
- if (token.get() != NULL) {
+ if (token) {
token->setResult(ResultUP(new Result), false);
if (token->shouldTrace(1)) {
const document::DocumentId &docId = op.getDocument()->getId();
const document::GlobalId &gid = docId.getGlobalId();
- token->trace(1,
- make_string(
- "Indexing document '%s' (gid = '%s',"
- " lid = '%u,%u' prevlid = '%u,%u').",
- docId.toString().c_str(),
- gid.toString().c_str(),
- op.getSubDbId(),
- op.getLid(),
- op.getPrevSubDbId(),
- op.getPrevLid()));
+ token->trace(1, make_string("Indexing document '%s' (gid = '%s',lid = '%u,%u' prevlid = '%u,%u').",
+ docId.toString().c_str(), gid.toString().c_str(), op.getSubDbId(),
+ op.getLid(), op.getPrevSubDbId(), op.getPrevLid()));
}
}
_activeFeedView->handlePut(token.get(), op);
@@ -173,15 +147,12 @@ FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op)
} else if (op.getUpdate()->getCreateIfNonExistent()) {
createNonExistingDocument(std::move(token), op);
} else {
- if (token.get() != NULL) {
+ if (token) {
token->setResult(ResultUP(new UpdateResult(Timestamp(0))), false);
if (token->shouldTrace(1)) {
const document::DocumentId &docId = op.getUpdate()->getId();
- token->trace(1,
- make_string(
- "Document '%s' not found."
- " Update operation ignored",
- docId.toString().c_str()));
+ token->trace(1, make_string("Document '%s' not found. Update operation ignored",
+ docId.toString().c_str()));
}
setUpdateWasFound(token->getReply(), false);
token->ack(op.getType(), _metrics);
@@ -194,22 +165,14 @@ void
FeedHandler::performInternalUpdate(FeedToken::UP token, UpdateOperation &op)
{
storeOperation(op);
- if (token.get() != NULL) {
- token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())),
- true);
+ if (token) {
+ token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true);
if (token->shouldTrace(1)) {
const document::DocumentId &docId = op.getUpdate()->getId();
const document::GlobalId &gid = docId.getGlobalId();
- token->trace(1,
- make_string(
- "Updating document '%s' (gid = '%s',"
- " lid = '%u,%u' prevlid = '%u,%u').",
- docId.toString().c_str(),
- gid.toString().c_str(),
- op.getSubDbId(),
- op.getLid(),
- op.getPrevSubDbId(),
- op.getPrevLid()));
+ token->trace(1, make_string("Updating document '%s' (gid = '%s', lid = '%u,%u' prevlid = '%u,%u').",
+ docId.toString().c_str(), gid.toString().c_str(), op.getSubDbId(),
+ op.getLid(), op.getPrevSubDbId(), op.getPrevLid()));
}
setUpdateWasFound(token->getReply(), true);
}
@@ -226,7 +189,7 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio
PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc);
_activeFeedView->preparePut(putOp);
storeOperation(putOp);
- if (token.get() != NULL) {
+ if (token) {
token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true);
if (token->shouldTrace(1)) {
const document::DocumentId &docId = putOp.getDocument()->getId();
@@ -246,7 +209,7 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio
FeedToken putToken(latch, mbus::Reply::UP(new FeedReply(DocumentProtocol::REPLY_PUTDOCUMENT)));
_activeFeedView->handlePut(&putToken, putOp);
latch.await();
- if (token.get() != NULL) {
+ if (token) {
token->ack();
}
}
@@ -255,12 +218,9 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio
void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
_activeFeedView->prepareRemove(op);
if (ignoreOperation(op)) {
- LOG(debug, "performRemove(): ignoreOperation: docId(%s), "
- "timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")",
- op.getDocumentId().toString().c_str(),
- (uint64_t)op.getTimestamp(),
- (uint64_t)op.getPrevTimestamp());
- if (token.get() != NULL) {
+ LOG(debug, "performRemove(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")",
+ op.getDocumentId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp());
+ if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
token->ack(op.getType(), _metrics);
}
@@ -270,23 +230,15 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
assert(op.getValidNewOrPrevDbdId());
assert(op.notMovingLidInSameSubDb());
storeOperation(op);
- if (token.get() != NULL) {
+ if (token) {
bool documentWasFound = !op.getPrevMarkedAsRemoved();
- token->setResult(ResultUP(new RemoveResult(documentWasFound)),
- documentWasFound);
+ token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound);
if (token->shouldTrace(1)) {
const document::DocumentId &docId = op.getDocumentId();
const document::GlobalId &gid = docId.getGlobalId();
- token->trace(1,
- make_string(
- "Removing document '%s' (gid = '%s',"
- " lid = '%u,%u' prevlid = '%u,%u').",
- docId.toString().c_str(),
- gid.toString().c_str(),
- op.getSubDbId(),
- op.getLid(),
- op.getPrevSubDbId(),
- op.getPrevLid()));
+ token->trace(1, make_string("Removing document '%s' (gid = '%s', lid = '%u,%u' prevlid = '%u,%u').",
+ docId.toString().c_str(), gid.toString().c_str(), op.getSubDbId(),
+ op.getLid(), op.getPrevSubDbId(), op.getPrevLid()));
}
setRemoveWasFound(token->getReply(), documentWasFound);
}
@@ -294,27 +246,21 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
} else if (op.hasDocType()) {
assert(op.getDocType() == _docTypeName.getName());
storeOperation(op);
- if (token.get() != NULL) {
+ if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
if (token->shouldTrace(1)) {
- token->trace(1,
- make_string(
- "Document '%s' not found."
- " Remove operation stored.",
- op.getDocumentId().toString().c_str()));
+ token->trace(1, make_string("Document '%s' not found. Remove operation stored.",
+ op.getDocumentId().toString().c_str()));
}
setRemoveWasFound(token->getReply(), false);
}
_activeFeedView->handleRemove(token.get(), op);
} else {
- if (token.get() != NULL) {
+ if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
if (token->shouldTrace(1)) {
- token->trace(1,
- make_string(
- "Document '%s' not found."
- " Remove operation ignored",
- op.getDocumentId().toString().c_str()));
+ token->trace(1, make_string("Document '%s' not found. Remove operation ignored",
+ op.getDocumentId().toString().c_str()));
}
setRemoveWasFound(token->getReply(), false);
token->ack(op.getType(), _metrics);
@@ -325,15 +271,14 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
void
FeedHandler::performGarbageCollect(FeedToken::UP token)
{
- if (token.get() != NULL) {
+ if (token) {
token->ack();
}
}
void
-FeedHandler::performCreateBucket(FeedToken::UP token,
- CreateBucketOperation &op)
+FeedHandler::performCreateBucket(FeedToken::UP token, CreateBucketOperation &op)
{
storeOperation(op);
_bucketDBHandler->handleCreateBucket(op.getBucketId());
@@ -343,8 +288,7 @@ FeedHandler::performCreateBucket(FeedToken::UP token,
}
-void FeedHandler::performDeleteBucket(FeedToken::UP token,
- DeleteBucketOperation &op) {
+void FeedHandler::performDeleteBucket(FeedToken::UP token, DeleteBucketOperation &op) {
_activeFeedView->prepareDeleteBucket(op);
storeOperation(op);
// Delete documents in bucket
@@ -359,10 +303,7 @@ void FeedHandler::performDeleteBucket(FeedToken::UP token,
void FeedHandler::performSplit(FeedToken::UP token, SplitBucketOperation &op) {
storeOperation(op);
- _bucketDBHandler->handleSplit(op.getSerialNum(),
- op.getSource(),
- op.getTarget1(),
- op.getTarget2());
+ _bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2());
if (token) {
token->ack();
}
@@ -371,10 +312,7 @@ void FeedHandler::performSplit(FeedToken::UP token, SplitBucketOperation &op) {
void FeedHandler::performJoin(FeedToken::UP token, JoinBucketsOperation &op) {
storeOperation(op);
- _bucketDBHandler->handleJoin(op.getSerialNum(),
- op.getSource1(),
- op.getSource2(),
- op.getTarget());
+ _bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget());
if (token) {
token->ack();
}
@@ -393,9 +331,7 @@ FeedHandler::performEof()
{
assert(_writeService.master().isCurrentThread());
_writeService.sync();
- LOG(debug,
- "Visiting done for transaction log domain '%s', eof received",
- _tlsMgr.getDomainName().c_str());
+ LOG(debug, "Visiting done for transaction log domain '%s', eof received", _tlsMgr.getDomainName().c_str());
_owner.onTransactionLogReplayDone();
_tlsMgr.replayDone();
changeToNormalFeedState();
@@ -430,7 +366,7 @@ FeedHandler::performPrune(SerialNum flushedSerial)
tlsPrune(flushedSerial); // throws on error
LOG(debug, "Pruned TLS to token %" PRIu64 ".", flushedSerial);
_owner.onPerformPrune(flushedSerial);
- } catch (const vespalib::IllegalStateException & e) {
+ } catch (const IllegalStateException & e) {
LOG(warning, "FeedHandler::performPrune failed due to '%s'.", e.what());
}
}
@@ -451,7 +387,7 @@ FeedHandler::getFeedState() const
{
FeedState::SP state;
{
- vespalib::LockGuard guard(_feedLock);
+ LockGuard guard(_feedLock);
state = _feedState;
}
return state;
@@ -461,17 +397,15 @@ FeedHandler::getFeedState() const
void
FeedHandler::changeFeedState(FeedState::SP newState)
{
- vespalib::LockGuard guard(_feedLock);
+ LockGuard guard(_feedLock);
changeFeedState(newState, guard);
}
void
-FeedHandler::changeFeedState(FeedState::SP newState, const vespalib::LockGuard &)
+FeedHandler::changeFeedState(FeedState::SP newState, const LockGuard &)
{
- LOG(debug,
- "Change feed state from '%s' -> '%s'",
- _feedState->getName().c_str(), newState->getName().c_str());
+ LOG(debug, "Change feed state from '%s' -> '%s'", _feedState->getName().c_str(), newState->getName().c_str());
_feedState = newState;
}
@@ -513,14 +447,10 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_syncLock(),
_syncedSerialNum(0),
_allowSync(false)
-{
-}
+{ }
-FeedHandler::~FeedHandler()
-{
-}
-
+FeedHandler::~FeedHandler() = default;
// Called on DocumentDB creatio
void
@@ -542,70 +472,40 @@ FeedHandler::close()
_tlsMgr.close();
}
-
void
-FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial,
- SerialNum flushedSummaryMgrSerial,
- SerialNum oldestFlushedSerial,
- SerialNum newestFlushedSerial,
+FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flushedSummaryMgrSerial,
+ SerialNum oldestFlushedSerial, SerialNum newestFlushedSerial,
ConfigStore &config_store)
{
(void) newestFlushedSerial;
assert(_activeFeedView);
assert(_bucketDBHandler);
FeedState::SP state = std::make_shared<ReplayTransactionLogState>
- (getDocTypeName(),
- _activeFeedView,
- *_bucketDBHandler,
- _replayConfig,
- config_store);
+ (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store);
changeFeedState(state);
// Resurrected attribute vector might cause oldestFlushedSerial to
// be lower than _prunedSerialNum, so don't warn for now.
(void) oldestFlushedSerial;
assert(_serialNum >= newestFlushedSerial);
- TransactionLogManager::prepareReplay(
- _tlsMgr.getClient(),
- _docTypeName.getName(),
- flushedIndexMgrSerial,
- flushedSummaryMgrSerial,
- config_store);
+ TransactionLogManager::prepareReplay(_tlsMgr.getClient(), _docTypeName.getName(),
+ flushedIndexMgrSerial, flushedSummaryMgrSerial, config_store);
_tlsReplayProgress = _tlsMgr.startReplay(_prunedSerialNum, _serialNum, *this);
}
-
void
FeedHandler::flushDone(SerialNum flushedSerial)
{
// Called by flush worker thread after performing a flush task
- _writeService.master().execute(
- makeTask(
- makeClosure(
- this,
- &FeedHandler::performFlushDone,
- flushedSerial)));
+ _writeService.master().execute(makeLambdaTask([this, flushedSerial]() { performFlushDone(flushedSerial); }));
+
}
void FeedHandler::changeToNormalFeedState() {
changeFeedState(FeedState::SP(new NormalState(*this)));
}
-void
-FeedHandler::waitForReplayDone()
-{
- _tlsMgr.waitForReplayDone();
-}
-
-void FeedHandler::setReplayDone() {
- _tlsMgr.changeReplayDone();
-}
-
-bool FeedHandler::getReplayDone() const {
- return _tlsMgr.getReplayDone();
-}
-
bool
FeedHandler::isDoingReplay() const {
return _tlsMgr.isDoingReplay();
@@ -624,9 +524,7 @@ void FeedHandler::storeOperation(FeedOperation &op) {
void FeedHandler::tlsPrune(SerialNum oldest_to_keep) {
if (!_tlsWriter.erase(oldest_to_keep)) {
- throw vespalib::IllegalStateException(vespalib::make_string(
- "Failed to prune TLS to token %" PRIu64 ".",
- oldest_to_keep));
+ throw IllegalStateException(make_string("Failed to prune TLS to token %" PRIu64 ".", oldest_to_keep));
}
_prunedSerialNum = oldest_to_keep;
}
@@ -644,8 +542,8 @@ void feedOperationRejected(FeedToken *token, const vespalib::string &opType, con
DocTypeName docTypeName, const vespalib::string &rejectMessage)
{
if (token) {
- vespalib::string message = make_string("%s operation rejected for document '%s' of type '%s': '%s'",
- opType.c_str(), docId.c_str(), docTypeName.toString().c_str(), rejectMessage.c_str());
+ auto message = make_string("%s operation rejected for document '%s' of type '%s': '%s'",
+ opType.c_str(), docId.c_str(), docTypeName.toString().c_str(), rejectMessage.c_str());
token->setResult(ResultUP(new ResultType(Result::RESOURCE_EXHAUSTED, message)), false);
token->fail(documentapi::DocumentProtocol::ERROR_REJECTED, message);
}
@@ -721,9 +619,9 @@ FeedHandler::performOperation(FeedToken::UP token, FeedOperation::UP op)
void
FeedHandler::handleOperation(FeedToken token, FeedOperation::UP op)
{
- _writeService.master().execute(
- makeTask(makeClosure(this,
- &FeedHandler::doHandleOperation, token, std::move(op))));
+ _writeService.master().execute(makeLambdaTask([this, token = std::move(token), op = std::move(op)]() mutable {
+ doHandleOperation(std::move(token), std::move(op));
+ }));
}
void
@@ -738,7 +636,6 @@ FeedHandler::handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCa
_activeFeedView->handleMove(op, std::move(moveDoneCtx));
}
-
void
FeedHandler::heartBeat()
{
@@ -746,15 +643,13 @@ FeedHandler::heartBeat()
_activeFeedView->heartBeat(_serialNum);
}
-
void
FeedHandler::sync()
{
- _writeService.master().execute(makeTask(makeClosure(this, &FeedHandler::performSync)));
+ _writeService.master().execute(makeLambdaTask([this]() { performSync(); }));
_writeService.sync();
}
-
FeedHandler::RPC::Result
FeedHandler::receive(const Packet &packet)
{
@@ -768,12 +663,11 @@ FeedHandler::receive(const Packet &packet)
return wrap->result;
}
-
void
FeedHandler::eof()
{
// Only called by visit, subscription gets one or more inSync() callbacks.
- _writeService.master().execute(makeTask(makeClosure(this, &FeedHandler::performEof)));
+ _writeService.master().execute(makeLambdaTask([this]() { performEof(); }));
}
void
@@ -781,13 +675,12 @@ FeedHandler::
performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp)
{
const LidVectorContext::SP lids_to_remove = pruneOp.getLidsToRemove();
- if (lids_to_remove.get() && lids_to_remove->getNumLids() != 0) {
+ if (lids_to_remove && lids_to_remove->getNumLids() != 0) {
storeOperation(pruneOp);
_activeFeedView->handlePruneRemovedDocuments(pruneOp);
}
}
-
void
FeedHandler::syncTls(SerialNum syncTo)
{
@@ -797,11 +690,7 @@ FeedHandler::syncTls(SerialNum syncTo)
return;
}
if (!_allowSync) {
- throw vespalib::IllegalStateException(
- vespalib::make_string(
- "Attempted to sync TLS to token %" PRIu64
- " at wrong time.",
- syncTo));
+ throw IllegalStateException(make_string("Attempted to sync TLS to token %" PRIu64 " at wrong time.", syncTo));
}
SerialNum syncedTo(_tlsWriter.sync(syncTo));
{
@@ -811,15 +700,4 @@ FeedHandler::syncTls(SerialNum syncTo)
}
}
-void
-FeedHandler::storeRemoteOperation(const FeedOperation &op)
-{
- SerialNum serialNum(op.getSerialNum());
- assert(serialNum != 0);
- if (serialNum > _serialNum) {
- _tlsWriter.storeOperation(op);
- _serialNum = serialNum;
- }
-}
-
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 94e70ff0fd2..335a86e0279 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -35,12 +35,7 @@ class RemoveOperation;
class SplitBucketOperation;
class UpdateOperation;
-namespace bucketdb
-{
-
-class IBucketDBHandler;
-
-}
+namespace bucketdb { class IBucketDBHandler; }
/**
* Class handling all aspects of feeding for a document database.
@@ -122,18 +117,12 @@ private:
void createNonExistingDocument(FeedTokenUP, const UpdateOperation &op);
void performRemove(FeedTokenUP token, RemoveOperation &op);
-private:
void performGarbageCollect(FeedTokenUP token);
void performCreateBucket(FeedTokenUP token, CreateBucketOperation &op);
void performDeleteBucket(FeedTokenUP token, DeleteBucketOperation &op);
void performSplit(FeedTokenUP token, SplitBucketOperation &op);
void performJoin(FeedTokenUP token, JoinBucketsOperation &op);
void performSync();
-
- /**
- * Used during callback from transaction log.
- */
- void handleTransactionLogEntry(const Packet::Entry &entry);
void performEof();
/**
@@ -142,22 +131,9 @@ private:
void performFlushDone(SerialNum flushedSerial);
void performPrune(SerialNum flushedSerial);
-public:
- void considerDelayedPrune();
-
-private:
- /**
- * Returns the current feed state of this feed handler.
- */
FeedStateSP getFeedState() const;
-
- /**
- * Used to handle feed state transitions.
- */
void changeFeedState(FeedStateSP newState);
-
void changeFeedState(FeedStateSP newState, const vespalib::LockGuard &feedGuard);
-
public:
FeedHandler(const FeedHandler &) = delete;
FeedHandler & operator = (const FeedHandler &) = delete;
@@ -184,7 +160,7 @@ public:
search::transactionlog::Writer & writer,
TlsWriter * tlsWriter = nullptr);
- virtual~FeedHandler();
+ virtual ~FeedHandler();
/**
* Init this feed handler.
@@ -239,18 +215,14 @@ public:
_bucketDBHandler = bucketDBHandler;
}
- void waitForReplayDone();
-
void setSerialNum(SerialNum serialNum) { _serialNum = serialNum; }
SerialNum incSerialNum() { return ++_serialNum; }
SerialNum getSerialNum() const override { return _serialNum; }
SerialNum getPrunedSerialNum() const { return _prunedSerialNum; }
- void setReplayDone();
- bool getReplayDone() const;
bool isDoingReplay() const;
float getReplayProgress() const {
- return _tlsReplayProgress.get() != nullptr ? _tlsReplayProgress->getProgress() : 0;
+ return _tlsReplayProgress ? _tlsReplayProgress->getProgress() : 0;
}
bool getTransactionLogReplayDone() const;
vespalib::string getDocTypeName() const { return _docTypeName.getName(); }
@@ -263,14 +235,13 @@ public:
void heartBeat() override;
virtual void sync();
-
RPC::Result receive(const Packet &packet) override;
void eof() override;
void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override;
void syncTls(SerialNum syncTo);
- void storeRemoteOperation(const FeedOperation &op);
void storeOperation(FeedOperation &op) override;
+ void considerDelayedPrune();
};
} // namespace proton
diff --git a/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h b/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h
index eefd14f7b01..40d92010c9b 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h
@@ -4,8 +4,7 @@
#include <vespa/vespalib/util/runnable.h>
#include <vespa/vespalib/util/threadexecutor.h>
-namespace searchcorespi {
-namespace index {
+namespace searchcorespi::index {
/**
* Interface for a single thread used for write tasks.
@@ -29,7 +28,4 @@ struct IThreadService : public vespalib::ThreadExecutor
};
-} // namespace index
-} // namespace searchcorespi
-
-
+}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
index 65dd8cc1f3f..bded09143ab 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
@@ -7,8 +7,7 @@
#include <vespa/vespalib/util/syncable.h>
#include <vespa/searchlib/common/isequencedtaskexecutor.h>
-namespace searchcorespi {
-namespace index {
+namespace searchcorespi::index {
/**
* Interface for the thread model used for write tasks.
@@ -62,5 +61,4 @@ struct IThreadingService : public vespalib::Syncable
virtual search::ISequencedTaskExecutor &attributeFieldWriter() = 0;
};
-} // namespace index
-} // namespace searchcorespi
+}