diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-30 20:04:06 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-30 20:04:06 +0200 |
commit | b5eafc96a98f8e4c834cf3b9799f73bed493c313 (patch) | |
tree | b87b375b5e25aa91be27972d552e3ab59128f616 | |
parent | de6c48e04b825efc0eff25e084e39547acf883f1 (diff) | |
parent | 0b557c51c7e06a4786366b3e97c24dd19df10221 (diff) |
Merge pull request #3602 from vespa-engine/balder/do-more-indirect-commit
Balder/do more indirect commit
14 files changed, 63 insertions, 129 deletions
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index 905d0434886..d369fa84542 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -189,11 +189,10 @@ public: const_cast<DocumentDBFactory &>(*this), _summaryExecutor, _summaryExecutor, - NULL, + _tls, _metricsWireService, _fileHeaderContext, - _config_stores.getConfigStore( - docType.toString()), + _config_stores.getConfigStore(docType.toString()), std::make_shared<vespalib::ThreadStackExecutor> (16, 128 * 1024), HwInfo())); diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index f492dc44fc6..97a96c4bac6 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -201,38 +201,20 @@ public: _sa() { assert(_mkdirOk); - auto b = std::make_shared<BootstrapConfig>(1, - _documenttypesConfig, - _repo, + auto b = std::make_shared<BootstrapConfig>(1, _documenttypesConfig, _repo, std::make_shared<ProtonConfig>(), std::make_shared<FiledistributorrpcConfig>(), _tuneFileDocumentDB); _configMgr.forwardConfig(b); _configMgr.nextGeneration(0); if (! FastOS_File::MakeDirectory((std::string("tmpdb/") + docTypeName).c_str())) { abort(); } - _ddb.reset(new DocumentDB("tmpdb", - _configMgr.getConfig(), - "tcp/localhost:9013", - _queryLimiter, - _clock, - DocTypeName(docTypeName), - ProtonConfig(), - *this, - _summaryExecutor, - _summaryExecutor, - NULL, - _dummy, - _fileHeaderContext, - ConfigStore::UP(new MemoryConfigStore), - std::make_shared<vespalib:: - ThreadStackExecutor> - (16, 128 * 1024), - _hwInfo)), + _ddb.reset(new DocumentDB("tmpdb", _configMgr.getConfig(), "tcp/localhost:9013", _queryLimiter, _clock, + DocTypeName(docTypeName), ProtonConfig(), *this, _summaryExecutor, _summaryExecutor, + _tls, _dummy, _fileHeaderContext, ConfigStore::UP(new MemoryConfigStore), + std::make_shared<vespalib::ThreadStackExecutor>(16, 128 * 1024), _hwInfo)), _ddb->start(); _ddb->waitForOnlineState(); - _aw = AttributeWriter::UP(new AttributeWriter(_ddb-> - getReadySubDB()-> - getAttributeManager())); + _aw = AttributeWriter::UP(new AttributeWriter(_ddb->getReadySubDB()->getAttributeManager())); _sa = _ddb->getReadySubDB()->getSummaryAdapter(); } ~DBContext() diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp index b9a04acb8da..157e964ad83 100644 --- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp @@ -107,22 +107,16 @@ Fixture::Fixture() config::DirSpec spec(TEST_PATH("cfg")); DocumentDBConfigHelper mgr(spec, "typea"); BootstrapConfig::SP - b(new BootstrapConfig(1, - documenttypesConfig, - repo, + b(new BootstrapConfig(1, documenttypesConfig, repo, std::make_shared<ProtonConfig>(), std::make_shared<FiledistributorrpcConfig>(), tuneFileDocumentDB)); mgr.forwardConfig(b); mgr.nextGeneration(0); - _db.reset(new DocumentDB(".", mgr.getConfig(), "tcp/localhost:9014", - _queryLimiter, _clock, DocTypeName("typea"), - ProtonConfig(), - _myDBOwner, _summaryExecutor, _summaryExecutor, NULL, _dummy, _fileHeaderContext, - ConfigStore::UP(new MemoryConfigStore), - std::make_shared<vespalib::ThreadStackExecutor> - (16, 128 * 1024), - _hwInfo)); + _db.reset(new DocumentDB(".", mgr.getConfig(), "tcp/localhost:9014", _queryLimiter, _clock, DocTypeName("typea"), + ProtonConfig(), _myDBOwner, _summaryExecutor, _summaryExecutor, _tls, _dummy, + _fileHeaderContext, ConfigStore::UP(new MemoryConfigStore), + std::make_shared<vespalib::ThreadStackExecutor>(16, 128 * 1024), _hwInfo)); _db->start(); _db->waitForOnlineState(); } diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 1ccbff6e6cf..39a588f804d 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -411,12 +411,10 @@ struct MyTlsWriter : TlsWriter { bool erase_return; MyTlsWriter() : store_count(0), erase_count(0), erase_return(true) {} - virtual void storeOperation(const FeedOperation &) override { ++store_count; } - virtual bool erase(SerialNum) override { ++erase_count; return erase_return; } + void storeOperation(const FeedOperation &) override { ++store_count; } + bool erase(SerialNum) override { ++erase_count; return erase_return; } - virtual SerialNum - sync(SerialNum syncTo) override - { + SerialNum sync(SerialNum syncTo) override { return syncTo; } }; @@ -452,7 +450,7 @@ struct FeedHandlerFixture _bucketDB(), _bucketDBHandler(_bucketDB), handler(writeService, tlsSpec, schema.getDocType(), - feedMetrics._feed, _state, owner, writeFilter, replayConfig, NULL, &tls_writer) + feedMetrics._feed, _state, owner, writeFilter, replayConfig, tls, &tls_writer) { _state.enterLoadState(); _state.enterReplayTransactionLogState(); @@ -544,8 +542,7 @@ addLidToRemove(RemoveDocumentsOperation &op) TEST_F("require that handleMove calls FeedView", FeedHandlerFixture) { DocumentContext doc_context("doc:test:foo", *f.schema.builder); - MoveOperation op(doc_context.bucketId, Timestamp(2), doc_context.doc, - DbDocumentId(0, 2), 1); + MoveOperation op(doc_context.bucketId, Timestamp(2), doc_context.doc, DbDocumentId(0, 2), 1); op.setDbDocumentId(DbDocumentId(1, 2)); f.runAsMaster([&]() { f.handler.handleMove(op, IDestructorCallback::SP()); }); EXPECT_EQUAL(1, f.feedView.move_count); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 024de835d7a..756d710a07a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -90,7 +90,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, IDocumentDBOwner & owner, vespalib::ThreadExecutor & warmupExecutor, vespalib::ThreadStackExecutorBase & summaryExecutor, - search::transactionlog::Writer * tlsDirectWriter, + search::transactionlog::Writer & tlsDirectWriter, MetricsWireService &metricsWireService, const FileHeaderContext &fileHeaderContext, ConfigStore::UP config_store, @@ -136,31 +136,12 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _state(), _dmUsageForwarder(_writeService.master()), _writeFilter(), - _feedHandler(_writeService, - tlsSpec, - docTypeName, + _feedHandler(_writeService, tlsSpec, docTypeName, getMetricsCollection().getLegacyMetrics().feed, - _state, - *this, - _writeFilter, - *this, - tlsDirectWriter), - _subDBs(*this, - *this, - _feedHandler, - _docTypeName, - _writeService, - warmupExecutor, - summaryExecutor, - fileHeaderContext, - metricsWireService, - getMetricsCollection(), - queryLimiter, - clock, - _configMutex, - _baseDir, - protonCfg, - hwInfo), + _state, *this, _writeFilter, *this, tlsDirectWriter), + _subDBs(*this, *this, _feedHandler, _docTypeName, _writeService, warmupExecutor, + summaryExecutor, fileHeaderContext, metricsWireService, getMetricsCollection(), + queryLimiter, clock, _configMutex, _baseDir, protonCfg, hwInfo), _maintenanceController(_writeService.master(), summaryExecutor, _docTypeName), _visibility(_feedHandler, _writeService, _feedView), _lidSpaceCompactionHandlers(), diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index d06ce9050b7..a3f99148e7c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -242,7 +242,7 @@ public: IDocumentDBOwner & owner, vespalib::ThreadExecutor & warmupExecutor, vespalib::ThreadStackExecutorBase & summaryExecutor, - search::transactionlog::Writer * tlsDirectWriter, + search::transactionlog::Writer & tlsDirectWriter, MetricsWireService &metricsWireService, const search::common::FileHeaderContext &fileHeaderContext, ConfigStore::UP config_store, diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index cbfdb4895cc..0ed3de93965 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -6,6 +6,7 @@ #include "i_feed_handler_owner.h" #include "ifeedview.h" #include "tlcproxy.h" +#include "configstore.h" #include <vespa/document/datatype/documenttype.h> #include <vespa/documentapi/messagebus/documentprotocol.h> #include <vespa/documentapi/messagebus/messages/documentreply.h> @@ -82,7 +83,7 @@ ignoreOperation(const DocumentOperation &op) void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op) { - TlcProxy(*_tls_mgr.getSession(), _tlsDirectWriter).storeOperation(op); + TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op); } bool FeedHandler::TlsMgrWriter::erase(SerialNum oldest_to_keep) { return _tls_mgr.getSession()->erase(oldest_to_keep); @@ -466,8 +467,7 @@ FeedHandler::changeFeedState(FeedState::SP newState) void -FeedHandler::changeFeedState(FeedState::SP newState, - const vespalib::LockGuard &) +FeedHandler::changeFeedState(FeedState::SP newState, const vespalib::LockGuard &) { LOG(debug, "Change feed state from '%s' -> '%s'", @@ -484,8 +484,8 @@ FeedHandler::FeedHandler(IThreadingService &writeService, IFeedHandlerOwner &owner, const IResourceWriteFilter &writeFilter, IReplayConfig &replayConfig, - search::transactionlog::Writer *tlsDirectWriter, - TlsWriter *tls_writer) + search::transactionlog::Writer & tlsDirectWriter, + TlsWriter * tlsWriter) : search::transactionlog::TransLogClient::Session::Callback(), IDocumentMoveHandler(), IPruneRemovedDocumentsHandler(), @@ -499,8 +499,8 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _writeFilter(writeFilter), _replayConfig(replayConfig), _tlsMgr(tlsSpec, docTypeName.getName()), - _tlsMgrWriter(_tlsMgr, tlsDirectWriter), - _tlsWriter(tls_writer ? *tls_writer : _tlsMgrWriter), + _tlsMgrWriter(_tlsMgr, &tlsDirectWriter), + _tlsWriter(tlsWriter ? *tlsWriter : _tlsMgrWriter), _tlsReplayProgress(), _serialNum(0), _prunedSerialNum(0), @@ -830,5 +830,4 @@ FeedHandler::storeRemoteOperation(const FeedOperation &op) } } - } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 71f1a47050f..32ff8f5d690 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -72,11 +72,9 @@ private: _tls_mgr(tls_mgr), _tlsDirectWriter(tlsDirectWriter) { } - virtual void storeOperation(const FeedOperation &op) override; - virtual bool erase(SerialNum oldest_to_keep) override; - - virtual SerialNum - sync(SerialNum syncTo) override; + void storeOperation(const FeedOperation &op) override; + bool erase(SerialNum oldest_to_keep) override; + SerialNum sync(SerialNum syncTo) override; }; typedef searchcorespi::index::IThreadingService IThreadingService; @@ -126,10 +124,7 @@ private: void performRemove(FeedTokenUP token, RemoveOperation &op); private: void performGarbageCollect(FeedTokenUP token); - - void - performCreateBucket(FeedTokenUP token, CreateBucketOperation &op); - + void performCreateBucket(FeedTokenUP token, CreateBucketOperation &op); void performDeleteBucket(FeedTokenUP token, DeleteBucketOperation &op); void performSplit(FeedTokenUP token, SplitBucketOperation &op); void performJoin(FeedTokenUP token, JoinBucketsOperation &op); @@ -186,8 +181,8 @@ public: IFeedHandlerOwner &owner, const IResourceWriteFilter &writerFilter, IReplayConfig &replayConfig, - search::transactionlog::Writer *writer, - TlsWriter *tlsWriter = NULL); + search::transactionlog::Writer & writer, + TlsWriter * tlsWriter = nullptr); virtual ~FeedHandler(); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 664808acbc0..38ec899051d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -551,7 +551,7 @@ Proton::addDocumentDB(const document::DocumentType &docType, *this, *_warmupExecutor, *_summaryExecutor, - _tls->getTransLogServer().get(), + *_tls->getTransLogServer(), *_metricsEngine, _fileHeaderContext, std::move(config_store), diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 290b7b74e87..a445f62f630 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -8,13 +8,13 @@ #include "storeonlyfeedview.h" #include "updatedonecontext.h" #include "remove_batch_done_context.h" -#include <vespa/document/datatype/documenttype.h> #include <vespa/searchcore/proton/common/commit_time_tracker.h> #include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/documentmetastore/ilidreusedelayer.h> #include <vespa/searchcore/proton/metrics/feed_metrics.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> -#include <vespa/searchlib/common/scheduletaskcallback.h> +#include <vespa/document/datatype/documenttype.h> +#include <vespa/document/fieldvalue/document.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/log/log.h> diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index fbc8888ac79..021c2b2f8f7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -7,7 +7,6 @@ #include "isummaryadapter.h" #include "replaypacketdispatcher.h" #include "searchcontext.h" -#include "tlcproxy.h" #include "pendinglidtracker.h" #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/searchcore/proton/common/feeddebugger.h> @@ -61,10 +60,13 @@ public: using OnPutDoneType = const std::shared_ptr<PutDoneContext> &; using OnRemoveDoneType = const std::shared_ptr<RemoveDoneContext> &; using FeedTokenUP = std::unique_ptr<FeedToken>; - using FutureDoc = std::future<Document::UP>; - using PromisedDoc = std::promise<Document::UP>; + using FutureDoc = std::future<std::unique_ptr<Document>>; + using PromisedDoc = std::promise<std::unique_ptr<Document>>; using FutureStream = std::future<vespalib::nbostream>; using PromisedStream = std::promise<vespalib::nbostream>; + using DocumentSP = std::shared_ptr<Document>; + using DocumentUpdateSP = std::shared_ptr<DocumentUpdate>; + using Lid = search::DocumentIdT; struct Context @@ -157,7 +159,7 @@ private: return _writeService.summary(); } void putSummary(SerialNum serialNum, Lid lid, FutureStream doc, OnOperationDoneType onDone); - void putSummary(SerialNum serialNum, Lid lid, Document::SP doc, OnOperationDoneType onDone); + void putSummary(SerialNum serialNum, Lid lid, DocumentSP doc, OnOperationDoneType onDone); void removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone); void heartBeatSummary(SerialNum serialNum); @@ -187,7 +189,7 @@ private: // Ack token early if visibility delay is nonzero void considerEarlyAck(FeedTokenUP &token, FeedOperation::Type opType); - void makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpdate::SP upd, + void makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpdateSP upd, OnOperationDoneType onWriteDone,PromisedDoc promisedDoc, PromisedStream promisedStream); protected: @@ -199,7 +201,7 @@ private: virtual void putAttributes(SerialNum serialNum, Lid lid, const Document &doc, bool immediateCommit, OnPutDoneType onWriteDone); - virtual void putIndexedFields(SerialNum serialNum, Lid lid, const Document::SP &newDoc, + virtual void putIndexedFields(SerialNum serialNum, Lid lid, const DocumentSP &newDoc, bool immediateCommit, OnOperationDoneType onWriteDone); virtual UpdateScope getUpdateScope(const DocumentUpdate &upd); diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp index c377be9f73d..215650b6664 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp @@ -1,7 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "tlcproxy.h" -#include <vespa/vespalib/util/exceptions.h> +#include <vespa/searchcore/proton/feedoperation/feedoperation.h> #include <vespa/log/log.h> LOG_SETUP(".proton.server.tlcproxy"); @@ -17,16 +17,8 @@ void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type Packet packet; packet.add(entry); packet.close(); - if (_tlsDirectWriter != NULL) { - _tlsDirectWriter->commit(_session.getDomain(), packet); - } else { - if (!_session.commit(vespalib::ConstBufferRef(packet.getHandle().c_str(), packet.getHandle().size()))) { - throw vespalib::IllegalStateException(vespalib::make_string( - "Failed to commit packet %" PRId64 - " to TLS (type = %d, size = %d).", - entry.serial(), type, (uint32_t)buf.size())); - } - } + _tlsDirectWriter.commit(_domain, packet); + } void diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h index ccd870e18a1..8e4feb2f354 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h @@ -1,27 +1,22 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/searchcore/proton/feedoperation/feedoperation.h> -#include <vespa/searchlib/query/base.h> -#include <vespa/searchlib/common/serialnum.h> -#include <vespa/searchlib/transactionlog/translogclient.h> -#include "fileconfigmanager.h" -#include <persistence/spi/types.h> +#include <vespa/searchlib/transactionlog/common.h> namespace proton { +class FeedOperation; + class TlcProxy { - search::transactionlog::TransLogClient::Session & _session; - search::transactionlog::Writer * _tlsDirectWriter; + vespalib::string _domain; + search::transactionlog::Writer & _tlsDirectWriter; - void commit( search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf); + void commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf); public: typedef std::unique_ptr<TlcProxy> UP; - TlcProxy(search::transactionlog::TransLogClient::Session &session, search::transactionlog::Writer * writer = NULL) - : _session(session), _tlsDirectWriter(writer) {} + TlcProxy(const vespalib::string & domain, search::transactionlog::Writer & writer) + : _domain(domain), _tlsDirectWriter(writer) {} void storeOperation(const FeedOperation &op); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h index da55e3d8590..0956c0ae011 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h +++ b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h @@ -5,6 +5,7 @@ #include <vespa/searchlib/common/serialnum.h> namespace proton { + class FeedOperation; /** @@ -15,10 +16,7 @@ struct TlsWriter { virtual void storeOperation(const FeedOperation &op) = 0; virtual bool erase(search::SerialNum oldest_to_keep) = 0; - - virtual search::SerialNum - sync(search::SerialNum syncTo) = 0; + virtual search::SerialNum sync(search::SerialNum syncTo) = 0; }; -} // namespace proton - +} |