summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java12
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp37
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp44
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h1
-rwxr-xr-xvespamalloc/src/tests/thread/thread_test.sh4
5 files changed, 58 insertions, 40 deletions
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
index 44daec42b88..4b66715fcf7 100644
--- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
@@ -76,6 +76,7 @@ class ServletOutputStreamWriter {
void writeBuffer(ByteBuffer buf, CompletionHandler handler) {
boolean thisThreadShouldWrite = false;
+ Throwable registrationFailure = null;
synchronized (monitor) {
if (state == State.FINISHED_OR_ERROR) {
@@ -85,8 +86,12 @@ class ServletOutputStreamWriter {
responseContentQueue.addLast(new ResponseContentPart(buf, handler));
switch (state) {
case NOT_STARTED:
- state = State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK;
- outputStream.setWriteListener(writeListener);
+ try {
+ outputStream.setWriteListener(writeListener);
+ state = State.WAITING_FOR_WRITE_POSSIBLE_CALLBACK;
+ } catch (Throwable t) {
+ registrationFailure = t;
+ }
break;
case WAITING_FOR_WRITE_POSSIBLE_CALLBACK:
case WRITING_BUFFERS:
@@ -99,6 +104,9 @@ class ServletOutputStreamWriter {
throw new IllegalStateException("Invalid state " + state);
}
}
+ if (registrationFailure != null) {
+ setFinished(registrationFailure);
+ }
if (thisThreadShouldWrite) {
writeBuffersInQueueToOutputStream();
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 587ec8dda1f..f3d90d37e42 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -14,6 +14,8 @@
#include <vespa/searchlib/attribute/imported_attribute_vector.h>
#include <vespa/searchlib/tensor/prepare_result.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/threadexecutor.h>
#include <future>
@@ -27,6 +29,7 @@ using namespace search;
using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId;
using search::attribute::ImportedAttributeVector;
using search::tensor::PrepareResult;
+using vespalib::GateCallback;
using vespalib::ISequencedTaskExecutor;
namespace proton {
@@ -821,24 +824,38 @@ AttributeWriter::forceCommit(const CommitParam & param, OnWriteDoneType onWriteD
void
AttributeWriter::onReplayDone(uint32_t docIdLimit)
{
- for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.executor_id,
- [docIdLimit, attr = entry.second.attribute]()
- { applyReplayDone(docIdLimit, *attr); });
+ vespalib::Gate gate;
+ {
+ auto on_write_done = std::make_shared<GateCallback>(gate);
+ for (auto entry : _attrMap) {
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [docIdLimit, attr = entry.second.attribute, on_write_done]()
+ {
+ (void) on_write_done;
+ applyReplayDone(docIdLimit, *attr);
+ });
+ }
}
- _attributeFieldWriter.sync_all();
+ gate.await();
}
void
AttributeWriter::compactLidSpace(uint32_t wantedLidLimit, SerialNum serialNum)
{
- for (auto entry : _attrMap) {
- _attributeFieldWriter.execute(entry.second.executor_id,
- [wantedLidLimit, serialNum, attr=entry.second.attribute]()
- { applyCompactLidSpace(wantedLidLimit, serialNum, *attr); });
+ vespalib::Gate gate;
+ {
+ auto on_write_done = std::make_shared<GateCallback>(gate);
+ for (auto entry : _attrMap) {
+ _attributeFieldWriter.execute(entry.second.executor_id,
+ [wantedLidLimit, serialNum, attr=entry.second.attribute, on_write_done]()
+ {
+ (void) on_write_done;
+ applyCompactLidSpace(wantedLidLimit, serialNum, *attr);
+ });
+ }
}
- _attributeFieldWriter.sync_all();
+ gate.await();
}
bool
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index ccfdb3b9b36..3f2fb6c4634 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -62,6 +62,8 @@ using storage::spi::Timestamp;
using search::common::FileHeaderContext;
using proton::initializer::InitializerTask;
using proton::initializer::TaskRunner;
+using vespalib::GateCallback;
+using vespalib::IDestructorCallback;
using vespalib::makeLambdaTask;
using searchcorespi::IFlushTarget;
@@ -186,7 +188,6 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_metricsHook(std::make_unique<MetricsUpdateHook>(*this)),
_feedView(),
_refCount(),
- _syncFeedViewEnabled(false),
_owner(owner),
_bucketExecutor(bucketExecutor),
_state(),
@@ -313,9 +314,9 @@ void
DocumentDB::initFinish(DocumentDBConfig::SP configSnapshot)
{
// Called by executor thread
+ assert(_writeService.master().isCurrentThread());
_bucketHandler.setReadyBucketHandler(_subDBs.getReadySubDB()->getDocumentMetaStoreContext().get());
_subDBs.initViews(*configSnapshot, _sessionManager);
- _syncFeedViewEnabled = true;
syncFeedView();
// Check that feed view has been activated.
assert(_feedView.get());
@@ -376,9 +377,13 @@ void
DocumentDB::enterOnlineState()
{
// Called by executor thread
- // Ensure that all replayed operations are committed to memory structures
- _feedView.get()->forceCommit(CommitParam(_feedHandler->getSerialNum()));
- _writeService.sync_all_executors();
+ assert(_writeService.master().isCurrentThread());
+ {
+ vespalib::Gate gate;
+ // Ensure that all replayed operations are committed to memory structures
+ _feedView.get()->forceCommit(CommitParam(_feedHandler->getSerialNum()), std::make_shared<GateCallback>(gate));
+ gate.await();
+ }
(void) _state.enterOnlineState();
// Consider delayed pruning of transaction log and config history
@@ -464,10 +469,11 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
}
{
bool elidedConfigSave = equalReplayConfig && tlsReplayDone;
+ vespalib::Gate gate;
// Flush changes to attributes and memory index, cf. visibilityDelay
_feedView.get()->forceCommit(CommitParam(elidedConfigSave ? serialNum : serialNum - 1),
- std::make_shared<vespalib::KeepAlive<FeedHandler::CommitResult>>(std::move(commit_result)));
- _writeService.sync_all_executors();
+ std::make_shared<vespalib::KeepAlive<std::pair<FeedHandler::CommitResult, std::shared_ptr<IDestructorCallback>>>>(std::make_pair(std::move(commit_result), std::make_shared<GateCallback>(gate))));
+ gate.await();
}
if (params.shouldMaintenanceControllerChange()) {
_maintenanceController.killJobs();
@@ -510,20 +516,11 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
}
-namespace {
-void
-doNothing(IFeedView::SP)
-{
- // Called by index executor, delays when feed view is dropped.
-}
-} // namespace
-
void
DocumentDB::performDropFeedView(IFeedView::SP feedView)
{
- // Called by executor task, delays when feed view is dropped.
- // Also called by DocumentDB::receive() method to keep feed view alive
-
+ // Delays when feed view is dropped.
+ assert(_writeService.master().isCurrentThread());
_writeService.attributeFieldWriter().sync_all();
_writeService.summary().sync();
@@ -534,11 +531,11 @@ DocumentDB::performDropFeedView(IFeedView::SP feedView)
void
DocumentDB::performDropFeedView2(IFeedView::SP feedView) {
- // Called by executor task, delays when feed view is dropped.
- // Also called by DocumentDB::receive() method to keep feed view alive
+ // Delays when feed view is dropped.
+ assert(_writeService.index().isCurrentThread());
_writeService.indexFieldInverter().sync_all();
_writeService.indexFieldWriter().sync_all();
- masterExecute([feedView]() { doNothing(feedView); });
+ masterExecute([feedView]() { (void) feedView; });
}
@@ -912,10 +909,7 @@ DocumentDB::getActiveGeneration() const {
void
DocumentDB::syncFeedView()
{
- // Called by executor or while in rendezvous with executor
-
- if (!_syncFeedViewEnabled)
- return;
+ assert(_writeService.master().isCurrentThread());
IFeedView::SP oldFeedView(_feedView.get());
IFeedView::SP newFeedView(_subDBs.getFeedView());
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index 014bba11f83..6b855cd40a8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -112,7 +112,6 @@ private:
std::unique_ptr<metrics::UpdateHook> _metricsHook;
vespalib::VarHolder<IFeedView::SP> _feedView;
vespalib::MonitoredRefCount _refCount;
- bool _syncFeedViewEnabled;
IDocumentDBOwner &_owner;
storage::spi::BucketExecutor &_bucketExecutor;
DDBState _state;
diff --git a/vespamalloc/src/tests/thread/thread_test.sh b/vespamalloc/src/tests/thread/thread_test.sh
index 45734bab3a7..cf92f8eb0fc 100755
--- a/vespamalloc/src/tests/thread/thread_test.sh
+++ b/vespamalloc/src/tests/thread/thread_test.sh
@@ -4,8 +4,8 @@ set -e
echo "Trying to find limit for processes:"
if ulimit -u; then
- echo "Fixing limit to 31100"
- ulimit -u 31100
+ echo "Fixing limit to 14100"
+ ulimit -u 14100
elif [ "$RETRYEXEC" ]; then
echo "Already tried to re-exec script, giving up."
exit 1