diff options
5 files changed, 58 insertions, 40 deletions
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java index 44daec42b88..4b66715fcf7 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java @@ -76,6 +76,7 @@ class ServletOutputStreamWriter { void writeBuffer(ByteBuffer buf, CompletionHandler handler) { boolean thisThreadShouldWrite = false; + Throwable registrationFailure = null; synchronized (monitor) { if (state == State.FINISHED_OR_ERROR) { @@ -85,8 +86,12 @@ class ServletOutputStreamWriter { responseContentQueue.addLast(new ResponseContentPart(buf, handler)); switch (state) { case NOT_STARTED: - state = State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK; - outputStream.setWriteListener(writeListener); + try { + outputStream.setWriteListener(writeListener); + state = State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK; + } catch (Throwable t) { + registrationFailure = t; + } break; case WAITING_FOR_WRITE_POSSIBLE_CALLBACK: case WRITING_BUFFERS: @@ -99,6 +104,9 @@ class ServletOutputStreamWriter { throw new IllegalStateException("Invalid state " + state); } } + if (registrationFailure != null) { + setFinished(registrationFailure); + } if (thisThreadShouldWrite) { writeBuffersInQueueToOutputStream(); diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 587ec8dda1f..f3d90d37e42 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -14,6 +14,8 @@ #include <vespa/searchlib/attribute/imported_attribute_vector.h> #include <vespa/searchlib/tensor/prepare_result.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/vespalib/util/threadexecutor.h> #include <future> @@ -27,6 +29,7 @@ using namespace search; using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; using search::attribute::ImportedAttributeVector; using search::tensor::PrepareResult; +using vespalib::GateCallback; using vespalib::ISequencedTaskExecutor; namespace proton { @@ -821,24 +824,38 @@ AttributeWriter::forceCommit(const CommitParam & param, OnWriteDoneType onWriteD void AttributeWriter::onReplayDone(uint32_t docIdLimit) { - for (auto entry : _attrMap) { - _attributeFieldWriter.execute(entry.second.executor_id, - [docIdLimit, attr = entry.second.attribute]() - { applyReplayDone(docIdLimit, *attr); }); + vespalib::Gate gate; + { + auto on_write_done = std::make_shared<GateCallback>(gate); + for (auto entry : _attrMap) { + _attributeFieldWriter.execute(entry.second.executor_id, + [docIdLimit, attr = entry.second.attribute, on_write_done]() + { + (void) on_write_done; + applyReplayDone(docIdLimit, *attr); + }); + } } - _attributeFieldWriter.sync_all(); + gate.await(); } void AttributeWriter::compactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum) { - for (auto entry : _attrMap) { - _attributeFieldWriter.execute(entry.second.executor_id, - [wantedLidLimit, serialNum, attr=entry.second.attribute]() - { applyCompactLidSpace(wantedLidLimit, serialNum, *attr); }); + vespalib::Gate gate; + { + auto on_write_done = std::make_shared<GateCallback>(gate); + for (auto entry : _attrMap) { + _attributeFieldWriter.execute(entry.second.executor_id, + [wantedLidLimit, serialNum, attr=entry.second.attribute, on_write_done]() + { + (void) on_write_done; + applyCompactLidSpace(wantedLidLimit, serialNum, *attr); + }); + } } - _attributeFieldWriter.sync_all(); + gate.await(); } bool diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index ccfdb3b9b36..3f2fb6c4634 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -62,6 +62,8 @@ using storage::spi::Timestamp; using search::common::FileHeaderContext; using proton::initializer::InitializerTask; using proton::initializer::TaskRunner; +using vespalib::GateCallback; +using vespalib::IDestructorCallback; using vespalib::makeLambdaTask; using searchcorespi::IFlushTarget; @@ -186,7 +188,6 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _metricsHook(std::make_unique<MetricsUpdateHook>(*this)), _feedView(), _refCount(), - _syncFeedViewEnabled(false), _owner(owner), _bucketExecutor(bucketExecutor), _state(), @@ -313,9 +314,9 @@ void DocumentDB::initFinish(DocumentDBConfig::SP configSnapshot) { // Called by executor thread + assert(_writeService.master().isCurrentThread()); _bucketHandler.setReadyBucketHandler(_subDBs.getReadySubDB()->getDocumentMetaStoreContext().get()); _subDBs.initViews(*configSnapshot, _sessionManager); - _syncFeedViewEnabled = true; syncFeedView(); // Check that feed view has been activated. assert(_feedView.get()); @@ -376,9 +377,13 @@ void DocumentDB::enterOnlineState() { // Called by executor thread - // Ensure that all replayed operations are committed to memory structures - _feedView.get()->forceCommit(CommitParam(_feedHandler->getSerialNum())); - _writeService.sync_all_executors(); + assert(_writeService.master().isCurrentThread()); + { + vespalib::Gate gate; + // Ensure that all replayed operations are committed to memory structures + _feedView.get()->forceCommit(CommitParam(_feedHandler->getSerialNum()), std::make_shared<GateCallback>(gate)); + gate.await(); + } (void) _state.enterOnlineState(); // Consider delayed pruning of transaction log and config history @@ -464,10 +469,11 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum } { bool elidedConfigSave = equalReplayConfig && tlsReplayDone; + vespalib::Gate gate; // Flush changes to attributes and memory index, cf. visibilityDelay _feedView.get()->forceCommit(CommitParam(elidedConfigSave ? serialNum : serialNum - 1), - std::make_shared<vespalib::KeepAlive<FeedHandler::CommitResult>>(std::move(commit_result))); - _writeService.sync_all_executors(); + std::make_shared<vespalib::KeepAlive<std::pair<FeedHandler::CommitResult, std::shared_ptr<IDestructorCallback>>>>(std::make_pair(std::move(commit_result), std::make_shared<GateCallback>(gate)))); + gate.await(); } if (params.shouldMaintenanceControllerChange()) { _maintenanceController.killJobs(); @@ -510,20 +516,11 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum } -namespace { -void -doNothing(IFeedView::SP) -{ - // Called by index executor, delays when feed view is dropped. -} -} // namespace - void DocumentDB::performDropFeedView(IFeedView::SP feedView) { - // Called by executor task, delays when feed view is dropped. - // Also called by DocumentDB::receive() method to keep feed view alive - + // Delays when feed view is dropped. + assert(_writeService.master().isCurrentThread()); _writeService.attributeFieldWriter().sync_all(); _writeService.summary().sync(); @@ -534,11 +531,11 @@ DocumentDB::performDropFeedView(IFeedView::SP feedView) void DocumentDB::performDropFeedView2(IFeedView::SP feedView) { - // Called by executor task, delays when feed view is dropped. - // Also called by DocumentDB::receive() method to keep feed view alive + // Delays when feed view is dropped. + assert(_writeService.index().isCurrentThread()); _writeService.indexFieldInverter().sync_all(); _writeService.indexFieldWriter().sync_all(); - masterExecute([feedView]() { doNothing(feedView); }); + masterExecute([feedView]() { (void) feedView; }); } @@ -912,10 +909,7 @@ DocumentDB::getActiveGeneration() const { void DocumentDB::syncFeedView() { - // Called by executor or while in rendezvous with executor - - if (!_syncFeedViewEnabled) - return; + assert(_writeService.master().isCurrentThread()); IFeedView::SP oldFeedView(_feedView.get()); IFeedView::SP newFeedView(_subDBs.getFeedView()); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index 014bba11f83..6b855cd40a8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -112,7 +112,6 @@ private: std::unique_ptr<metrics::UpdateHook> _metricsHook; vespalib::VarHolder<IFeedView::SP> _feedView; vespalib::MonitoredRefCount _refCount; - bool _syncFeedViewEnabled; IDocumentDBOwner &_owner; storage::spi::BucketExecutor &_bucketExecutor; DDBState _state; diff --git a/vespamalloc/src/tests/thread/thread_test.sh b/vespamalloc/src/tests/thread/thread_test.sh index 45734bab3a7..cf92f8eb0fc 100755 --- a/vespamalloc/src/tests/thread/thread_test.sh +++ b/vespamalloc/src/tests/thread/thread_test.sh @@ -4,8 +4,8 @@ set -e echo "Trying to find limit for processes:" if ulimit -u; then - echo "Fixing limit to 31100" - ulimit -u 31100 + echo "Fixing limit to 14100" + ulimit -u 14100 elif [ "$RETRYEXEC" ]; then echo "Already tried to re-exec script, giving up." exit 1 |