diff options
25 files changed, 357 insertions, 68 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java index e39bddd5594..9a573172ec9 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainer.java @@ -6,8 +6,10 @@ import com.yahoo.component.ComponentSpecification; import com.yahoo.config.model.producer.AbstractConfigProducer; import com.yahoo.container.BundlesConfig; import com.yahoo.container.bundle.BundleInstantiationSpecification; +import com.yahoo.container.handler.ThreadpoolConfig; import com.yahoo.log.LogLevel; import com.yahoo.osgi.provider.model.ComponentModel; +import com.yahoo.search.config.QrStartConfig; import com.yahoo.vespa.config.content.FleetcontrollerConfig; import static com.yahoo.vespa.defaults.Defaults.getDefaults; import com.yahoo.vespa.model.application.validation.RestartConfigs; @@ -23,7 +25,12 @@ import java.util.TreeSet; * Extends the container producer to allow us to override ports. */ @RestartConfigs({FleetcontrollerConfig.class, ZookeeperServerConfig.class}) -public class ClusterControllerContainer extends Container implements BundlesConfig.Producer, ZookeeperServerConfig.Producer { +public class ClusterControllerContainer extends Container implements + BundlesConfig.Producer, + ZookeeperServerConfig.Producer, + QrStartConfig.Producer, + ThreadpoolConfig.Producer +{ private static final ComponentSpecification CLUSTERCONTROLLER_BUNDLE = new ComponentSpecification("clustercontroller-apps"); private static final ComponentSpecification ZKFACADE_BUNDLE = new ComponentSpecification("zkfacade"); private final int index; @@ -104,6 +111,15 @@ public class ClusterControllerContainer extends Container implements BundlesConf builder.myid(index); } + @Override + public void getConfig(QrStartConfig.Builder builder) { + builder.jvm(new QrStartConfig.Jvm.Builder().heapsize(512)); + } + @Override + public void getConfig(ThreadpoolConfig.Builder builder) { + builder.maxthreads(10); + } + int getIndex() { return index; } diff --git a/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java index c6de6835d49..8ae3c982d81 100755 --- a/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java @@ -10,10 +10,12 @@ import com.yahoo.config.model.test.MockRoot; import com.yahoo.config.provision.Environment; import com.yahoo.config.provision.RegionName; import com.yahoo.config.provision.Zone; +import com.yahoo.container.handler.ThreadpoolConfig; import com.yahoo.container.jdisc.config.MetricDefaultsConfig; import com.yahoo.search.config.QrStartConfig; import com.yahoo.vespa.model.Host; import com.yahoo.vespa.model.HostResource; +import com.yahoo.vespa.model.admin.clustercontroller.ClusterControllerContainer; import com.yahoo.vespa.model.container.docproc.ContainerDocproc; import com.yahoo.vespa.model.container.search.ContainerSearch; import com.yahoo.vespa.model.container.search.searchchain.SearchChains; @@ -130,6 +132,25 @@ public class ContainerClusterTest { container.setJvmArgs(null); verifyJvmArgs(isHosted, hasDocProc, "", container.getJvmArgs()); } + + @Test + public void testClusterControllerResourceUsage() { + boolean isHosted = false; + ContainerCluster cluster = createContainerCluster(isHosted); + addClusterController(cluster, "host-c1"); + assertEquals(1, cluster.getContainers().size()); + ClusterControllerContainer container = (ClusterControllerContainer) cluster.getContainers().get(0); + QrStartConfig.Builder qrBuilder = new QrStartConfig.Builder(); + container.getConfig(qrBuilder); + QrStartConfig qrStartConfig = new QrStartConfig(qrBuilder); + assertEquals(512, qrStartConfig.jvm().heapsize()); + + ThreadpoolConfig.Builder tpBuilder = new ThreadpoolConfig.Builder(); + container.getConfig(tpBuilder); + ThreadpoolConfig threadpoolConfig = new ThreadpoolConfig(tpBuilder); + assertEquals(10, threadpoolConfig.maxthreads()); + } + @Test public void requireThatJvmArgsControlWorksForHostedAndNot() { verifyJvmArgs(true, false); @@ -138,9 +159,6 @@ public class ContainerClusterTest { verifyJvmArgs(false, true); } - private void verifyThatWeCanHandleNull(boolean isHosted) { - - } @Test public void requireThatWeCanhandleNull() { ContainerCluster cluster = createContainerCluster(false); @@ -171,6 +189,13 @@ public class ContainerClusterTest { cluster.addContainer(container); } + private static void addClusterController(ContainerCluster cluster, String hostName) { + Container container = new ClusterControllerContainer(cluster, 1, false); + container.setHostResource(new HostResource(new Host(null, hostName))); + container.initService(); + cluster.addContainer(container); + } + private static ContainerCluster newContainerCluster() { ContainerCluster cluster = new ContainerCluster(null, "subId", "name"); addContainer(cluster, "c1", "host-c1"); diff --git a/container-disc/src/main/sh/vespa-start-container-daemon.sh b/container-disc/src/main/sh/vespa-start-container-daemon.sh index a5c64126322..3afefd6f86d 100755 --- a/container-disc/src/main/sh/vespa-start-container-daemon.sh +++ b/container-disc/src/main/sh/vespa-start-container-daemon.sh @@ -84,7 +84,7 @@ configure_memory() { } configure_numactl() { - echo "debug starting ${VESPA_SERVICE_NAME} for ${VESPA_CONFIG_ID}" + log_message debug "starting ${VESPA_SERVICE_NAME} for ${VESPA_CONFIG_ID}" if numactl --interleave all true &> /dev/null; then # We are allowed to use numactl numnodes=$(numactl --hardware | @@ -94,17 +94,17 @@ configure_numactl() { [ "$numnodes" -gt 1 ] then node=$(($VESPA_AFFINITY_CPU_SOCKET % $numnodes)) - echo "debug with affinity to $VESPA_AFFINITY_CPU_SOCKET out of $numnodes cpu sockets" + log_message debug "with affinity to $VESPA_AFFINITY_CPU_SOCKET out of $numnodes cpu sockets" numactlcmd="numactl --cpunodebind=$node --membind=$node" else - echo "debug with memory interleaving on all nodes" + log_message debug "with memory interleaving on all nodes" numactlcmd="numactl --interleave all" fi else - echo "debug without numactl (no permission or not available)" + log_message debug "without numactl (no permission or not available)" numactlcmd="" fi - echo "debug numactlcmd: $numactlcmd" + log_message debug "numactlcmd: $numactlcmd" } configure_gcopts() { @@ -123,7 +123,7 @@ configure_env_vars() { export ${setting%%=*} ;; *) - echo "warning ignoring invalid qrs_env setting '$setting' from '$qrs_env'" + log_message warning "ignoring invalid qrs_env setting '$setting' from '$qrs_env'" ;; esac done @@ -140,8 +140,103 @@ configure_preload () { export JAVAVM_LD_PRELOAD= unset LD_PRELOAD envcmd="/usr/bin/env" + # trim whitespace: + PRELOAD=${PRELOAD# } + PRELOAD=${PRELOAD% } if [ "$PRELOAD" ]; then envcmd="/usr/bin/env JAVAVM_LD_PRELOAD=$PRELOAD LD_PRELOAD=$PRELOAD" + log_message config "setting up extra preload: $envcmd" + fi +} + +# import VARIABLENAME with default VALUE +import_cfg_var () { + varname=$1 + ret_val=$2 + prefixed_varname="vespa_container__${varname}" + + if varhasvalue $varname ; then + : already set + elif varhasvalue $prefixed_varname ; then + eval "$varname=\${$prefixed_varname}" + else + eval "$varname=\${ret_val}" + fi +} + +exec_jsvc () { + if [ "$jsvc_classpath_pre" ]; then + CP="${jsvc_classpath_pre}:${CP}" + fi + for jf in $jsvc_extra_classpath_libjars ; do + CP="${CP}:${VESPA_HOME}lib/jars/$jf.jar" + done + for jf in $jsvc_extra_classpath_files ; do + CP="${CP}:jf" + done + + PRELOAD="$PRELOAD $jsvc_extra_preload" + configure_preload + exec $numactlcmd $envcmd $jsvc_binary_name \ + -Dconfig.id="${VESPA_CONFIG_ID}" \ + ${jsvc_opts} \ + ${memory_options} \ + ${jvm_gcopts} \ + -XX:MaxJavaStackTraceDepth=-1 \ + -XX:+HeapDumpOnOutOfMemoryError \ + -XX:HeapDumpPath="${VESPA_HOME}var/crash" \ + -XX:OnOutOfMemoryError='kill -9 %p' \ + -Djava.library.path="${VESPA_HOME}lib64" \ + -Djava.awt.headless=true \ + -Djavax.net.ssl.keyStoreType=JKS \ + -Dsun.rmi.dgc.client.gcInterval=3600000 \ + -Dsun.net.client.defaultConnectTimeout=5000 -Dsun.net.client.defaultReadTimeout=60000 \ + -Djdisc.config.file="$cfpfile" \ + -Djdisc.export.packages=${jdisc_export_packages} \ + -Djdisc.cache.path="$bundlecachedir" \ + -Djdisc.debug.resources=false \ + -Djdisc.bundle.path="${VESPA_HOME}lib/jars" \ + -Djdisc.logger.enabled=true \ + -Djdisc.logger.level=ALL \ + -Djdisc.logger.tag="${VESPA_CONFIG_ID}" \ + -Dorg.apache.commons.logging.Log=org.apache.commons.logging.impl.Jdk14Logger \ + -Dvespa.log.control.dir="${VESPA_LOG_CONTROL_DIR}" \ + -Dzookeeperlogfile="${ZOOKEEPER_LOG_FILE}" \ + -Dfile.encoding=UTF-8 \ + -cp "$CP" \ + "$@" \ + com.yahoo.jdisc.core.BootstrapDaemon file:${VESPA_HOME}lib/jars/container-disc-jar-with-dependencies.jar +} + +maybe_use_jsvc () { + import_cfg_var use_jsvc "false" + + # if configured, run JSVC aka commons.daemon + if [ "$use_jsvc" = "true" ]; then + import_cfg_var jsvc_binary_name "jsvc" + import_cfg_var jsvc_extra_preload + + import_cfg_var jsvc_use_pidfile "false" + + import_cfg_var jsvc_classpath_pre + import_cfg_var jsvc_extra_classpath_libjars + import_cfg_var jsvc_extra_classpath_files + import_cfg_var jsvc_ipv6opts + import_cfg_var jsvc_extra_opts + import_cfg_var jsvc_normal_opts + import_cfg_var jsvc_java_home_opt + if [ "$jsvc_use_pidfile" = "true" ]; then + import_cfg_var jsvc_pidfile_opt "-pidfile ${VESPA_HOME}var/run/jsvc.${VESPA_SERVICE_NAME}.pid" + else + import_cfg_var jsvc_pidfile_opt "" + fi + import_cfg_var jsvc_user_opt + import_cfg_var jsvc_agent_opt + import_cfg_var jsvc_ynet_opt + import_cfg_var jsvc_unknown_opts + + jsvc_opts="$jsvc_ipv6opts $jsvc_extra_opts $jsvc_normal_opts $jsvc_java_home_opt $jsvc_pidfile_opt $jsvc_user_opt $jsvc_agent_opt $jsvc_ynet_opt $jsvc_unknown_opts" + exec_jsvc fi } @@ -152,6 +247,7 @@ configure_env_vars configure_classpath configure_numactl configure_preload +maybe_use_jsvc exec $numactlcmd $envcmd java \ -Dconfig.id="${VESPA_CONFIG_ID}" \ diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index dbcdd8e2039..6582df2f9d3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -81,7 +81,6 @@ vespa_add_library(searchcore_server STATIC sample_attribute_usage_job.cpp schema_config_validator.cpp searchable_doc_subdb_configurer.cpp - searchable_document_retriever.cpp searchable_feed_view.cpp searchabledocsubdb.cpp searchcontext.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 8233363b6ff..a7bf2623d65 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -98,9 +98,10 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _docTypeName(docTypeName), _baseDir(baseDir + "/" + _docTypeName.toString()), // Only one thread per executor, or performDropFeedView() will fail. + _defaultExecutorTaskLimit(protonCfg.indexing.tasklimit), _writeService(std::max(1, protonCfg.indexing.threads), indexing_thread_stack_size, - protonCfg.indexing.tasklimit), + _defaultExecutorTaskLimit), _initializeThreads(initializeThreads), _initConfigSnapshot(), _initConfigSerialNum(0u), @@ -443,6 +444,11 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, hasVisibilityDelayChanged = (visibilityDelay != _visibility.getVisibilityDelay()); _visibility.setVisibilityDelay(visibilityDelay); } + if (_visibility.getVisibilityDelay() > 0) { + _writeService.setTaskLimit(std::numeric_limits<uint32_t>::max()); + } else { + _writeService.setTaskLimit(_defaultExecutorTaskLimit); + } if (params.shouldSubDbsChange() || hasVisibilityDelayChanged) { _subDBs.applyConfig(*configSnapshot, *_activeConfigSnapshot, serialNum, params); if (serialNum < _feedHandler.getSerialNum()) { diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index 9f592a292fd..b6fc9cef004 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -91,6 +91,7 @@ private: DocTypeName _docTypeName; vespalib::string _baseDir; + uint32_t _defaultExecutorTaskLimit; // Only one thread per executor, or dropFeedView() will fail. ExecutorThreadingService _writeService; // threads for initializer tasks during proton startup diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 8789a012490..683dfeaad5c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -54,6 +54,14 @@ ExecutorThreadingService::shutdown() _indexFieldWriter.sync(); } +void +ExecutorThreadingService::setTaskLimit(uint32_t taskLimit) +{ + _indexExecutor.setTaskLimit(taskLimit); + _indexFieldInverter.setTaskLimit(taskLimit); + _indexFieldWriter.setTaskLimit(taskLimit); + _attributeFieldWriter.setTaskLimit(taskLimit); +} } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 9c836d10f96..db060b15eee 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -42,6 +42,8 @@ public: void shutdown(); + void setTaskLimit(uint32_t taskLimit); + // Expose the underlying executors for stats fetching and testing. vespalib::ThreadStackExecutorBase &getMasterExecutor() { return _masterExecutor; diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp index a95794742e2..cb8abd1cfd9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp @@ -295,7 +295,7 @@ FastAccessDocSubDB::getDocumentRetriever() { FastAccessFeedView::SP feedView = _fastUpdateFeedView.get(); proton::IAttributeManager::SP attrMgr = extractAttributeManager(feedView); - return IDocumentRetriever::UP(new FastAccessDocumentRetriever(feedView, attrMgr, _docIdLimit)); + return IDocumentRetriever::UP(new FastAccessDocumentRetriever(feedView, attrMgr)); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_document_retriever.h b/searchcore/src/vespa/searchcore/proton/server/fast_access_document_retriever.h index 7ff43e20b0d..e1957342e31 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_document_retriever.h +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_document_retriever.h @@ -17,14 +17,11 @@ namespace proton { class FastAccessDocumentRetriever : public DocumentRetriever { private: - FastAccessFeedView::SP _feedView; - IAttributeManager::SP _attrMgr; - const DocIdLimit &_docIdLimit; + FastAccessFeedView::SP _feedView; + IAttributeManager::SP _attrMgr; public: - FastAccessDocumentRetriever(const FastAccessFeedView::SP &feedView, - const IAttributeManager::SP &attrMgr, - const DocIdLimit & docIdLimit) + FastAccessDocumentRetriever(const FastAccessFeedView::SP &feedView, const IAttributeManager::SP &attrMgr) : DocumentRetriever(feedView->getPersistentParams()._docTypeName, *feedView->getDocumentTypeRepo(), *feedView->getSchema(), @@ -32,11 +29,9 @@ public: *attrMgr, feedView->getDocumentStore()), _feedView(feedView), - _attrMgr(attrMgr), - _docIdLimit(docIdLimit) - { - } - uint32_t getDocIdLimit() const override { return _docIdLimit.get(); } + _attrMgr(attrMgr) + { } + uint32_t getDocIdLimit() const override { return _feedView->getDocIdLimit().get(); } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h index 338a846942f..3ed6d84ffc3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h @@ -40,7 +40,7 @@ private: const IAttributeWriter::SP _attributeWriter; DocIdLimit &_docIdLimit; - virtual UpdateScope getUpdateScope(const document::DocumentUpdate &upd); + virtual UpdateScope getUpdateScope(const document::DocumentUpdate &upd) override; virtual void putAttributes(SerialNum serialNum, search::DocumentIdT lid, @@ -52,24 +52,22 @@ private: search::DocumentIdT lid, const document::DocumentUpdate &upd, bool immediateCommit, - OnOperationDoneType onWriteDone); + OnOperationDoneType onWriteDone) override; virtual void removeAttributes(SerialNum serialNum, search::DocumentIdT lid, bool immediateCommit, - OnRemoveDoneType onWriteDone); + OnRemoveDoneType onWriteDone) override; virtual void removeAttributes(SerialNum serialNum, const LidVector &lidsToRemove, bool immediateCommit, - OnWriteDoneType onWriteDone); + OnWriteDoneType onWriteDone) override; - virtual void heartBeatAttributes(SerialNum serialNum); + virtual void heartBeatAttributes(SerialNum serialNum) override; protected: - virtual void - forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) - override; + virtual void forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) override; public: FastAccessFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, @@ -84,10 +82,9 @@ public: return _docIdLimit; } - virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op); + virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; - virtual void - sync() override; + virtual void sync() override; bool fastPartialUpdateAttribute(const vespalib::string &fieldName) { search::AttributeVector *attribute = diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_document_retriever.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_document_retriever.cpp deleted file mode 100644 index 11f103ca7b4..00000000000 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_document_retriever.cpp +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/fastos/fastos.h> -#include "searchable_document_retriever.h" - -namespace proton { - -SearchableDocumentRetriever::SearchableDocumentRetriever( - const SearchableFeedView::SP &fw, const SearchView::SP &sv) - : DocumentRetriever(fw->getPersistentParams()._docTypeName, - *fw->getDocumentTypeRepo(), - *fw->getSchema(), - *sv->getDocumentMetaStore(), - *sv->getAttributeManager(), - fw->getDocumentStore()), - feedView(fw) -{ -} - -} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_document_retriever.h b/searchcore/src/vespa/searchcore/proton/server/searchable_document_retriever.h index dd27e549af7..cde180a230d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_document_retriever.h +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_document_retriever.h @@ -2,17 +2,18 @@ #pragma once -#include "documentretriever.h" +#include "fast_access_document_retriever.h" #include "searchable_feed_view.h" #include "searchview.h" namespace proton { -struct SearchableDocumentRetriever : DocumentRetriever { - SearchableFeedView::SP feedView; - +class SearchableDocumentRetriever : public FastAccessDocumentRetriever { +public: // Assumes the FeedView also ensures that the MatchView stays alive. - SearchableDocumentRetriever(const SearchableFeedView::SP &fw, const SearchView::SP &sv); + SearchableDocumentRetriever(const SearchableFeedView::SP &fw, const SearchView::SP &sv) : + FastAccessDocumentRetriever(fw, sv->getAttributeManager()) + { } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp index e2db3bf0d70..83d0436eba1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchabledocsubdb.cpp @@ -343,8 +343,7 @@ SearchableDocSubDB::getSearchableStats() const IDocumentRetriever::UP SearchableDocSubDB::getDocumentRetriever() { - return IDocumentRetriever::UP(new SearchableDocumentRetriever( - _rFeedView.get(), _rSearchView.get())); + return IDocumentRetriever::UP(new SearchableDocumentRetriever(_rFeedView.get(), _rSearchView.get())); } MatchingStats diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp index 3fdfe2fff3f..16ab7ab5ee5 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp @@ -34,6 +34,13 @@ SequencedTaskExecutor::~SequencedTaskExecutor() sync(); } +void +SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) +{ + for (const auto &executor : _executors) { + executor->setTaskLimit(taskLimit); + } +} void SequencedTaskExecutor::executeTask(uint64_t id, diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h index 0d97ee9a758..4f20c5b14a4 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h @@ -3,6 +3,7 @@ #include "isequencedtaskexecutor.h" #include <vespa/vespalib/stllike/hash_map.h> +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> namespace vespalib { @@ -20,13 +21,15 @@ namespace search */ class SequencedTaskExecutor : public ISequencedTaskExecutor { - std::vector<std::shared_ptr<vespalib::ThreadStackExecutorBase>> _executors; + std::vector<std::shared_ptr<vespalib::BlockingThreadStackExecutor>> _executors; vespalib::hash_map<size_t, size_t> _ids; public: SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit = 1000); ~SequencedTaskExecutor(); + void setTaskLimit(uint32_t taskLimit); + virtual void executeTask(uint64_t id, vespalib::Executor::Task::UP task) override; diff --git a/vespabase/src/common-env.sh b/vespabase/src/common-env.sh index d2e30ec3368..a2c55439a3f 100755 --- a/vespabase/src/common-env.sh +++ b/vespabase/src/common-env.sh @@ -236,12 +236,20 @@ getJavaOptionsIPV46() { fi } +log_message () { + msg_log_level="info" + case $1 in + error|warning|info|event|config|debug|spam) msg_log_level=$1; shift;; + esac + printf "%s\t%s\t%s\n" $$ $msg_log_level "$*" +} + log_debug_message () { - if [ -n "$YINST_RUNNING" ]; then - echo "debug $*" 1>&2 + if [ "$YINST_RUNNING" ]; then + log_message "debug" "$*" fi } log_warning_message () { - echo "warning $*" 1>&2 + log_message "warning" "$*" 1>&2 } diff --git a/vespaclient/src/perl/lib/Yahoo/Vespa/VespaModel.pm b/vespaclient/src/perl/lib/Yahoo/Vespa/VespaModel.pm index a489f698cad..4525ee06c5f 100644 --- a/vespaclient/src/perl/lib/Yahoo/Vespa/VespaModel.pm +++ b/vespaclient/src/perl/lib/Yahoo/Vespa/VespaModel.pm @@ -169,14 +169,14 @@ sub retrieveModelConfigDefault { # () } if (!defined $CONFIG_SERVER_HOST) { - my $temp = `${VESPA_HOME}/bin/print-vespa-default configservers`; + my $temp = `${VESPA_HOME}/bin/vespa-print-default configservers`; my @configServerHosts = split(' ', $temp); $CONFIG_SERVER_HOST = $configServerHosts[0]; } $cmd .= " -s $CONFIG_SERVER_HOST"; if (!defined $CONFIG_SERVER_PORT) { - my $temp = `${VESPA_HOME}/bin/print-vespa-default configserver_rpc_port`; + my $temp = `${VESPA_HOME}/bin/vespa-print-default configserver_rpc_port`; chomp($temp); $CONFIG_SERVER_PORT = $temp; } diff --git a/vespalib/src/tests/executor/CMakeLists.txt b/vespalib/src/tests/executor/CMakeLists.txt index 0745c7523b1..0435c03f34c 100644 --- a/vespalib/src/tests/executor/CMakeLists.txt +++ b/vespalib/src/tests/executor/CMakeLists.txt @@ -20,3 +20,10 @@ vespa_add_executable(vespalib_stress_test_app vespalib ) vespa_add_test(NAME vespalib_stress_test_app COMMAND vespalib_stress_test_app BENCHMARK) +vespa_add_executable(vespalib_blockingthreadstackexecutor_test_app TEST + SOURCES + blockingthreadstackexecutor_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_blockingthreadstackexecutor_test_app COMMAND vespalib_blockingthreadstackexecutor_test_app) diff --git a/vespalib/src/tests/executor/FILES b/vespalib/src/tests/executor/FILES index 0a6c8c0a73d..5081bfe8ce0 100644 --- a/vespalib/src/tests/executor/FILES +++ b/vespalib/src/tests/executor/FILES @@ -1 +1,4 @@ threadstackexecutor_test.cpp +executor_test.cpp +stress_test.cpp +blockingthreadstackexecutor_test.cpp diff --git a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp new file mode 100644 index 00000000000..f8136fe2a10 --- /dev/null +++ b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp @@ -0,0 +1,111 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/testkit/test_kit.h> + +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/sync.h> +#include <thread> + +using namespace vespalib; + +constexpr int msWait = 30000; + +class MyTask : public Executor::Task +{ +private: + Gate &_entryGate; + CountDownLatch &_exitLatch; + +public: + MyTask(Gate &entryGate, CountDownLatch &exitLatch) + : _entryGate(entryGate), + _exitLatch(exitLatch) + {} + virtual void run() override { + _entryGate.await(msWait); + _exitLatch.countDown(); + } + static Task::UP create(Gate &entryGate, CountDownLatch &exitLatch) { + return std::make_unique<MyTask>(entryGate, exitLatch); + } +}; + +void +blockedExecute(BlockingThreadStackExecutor *executor, Gate *workersEntryGate, CountDownLatch *workersExitLatch, Gate *exitGate) +{ + executor->execute(MyTask::create(*workersEntryGate, *workersExitLatch)); // this should be a blocking call + exitGate->countDown(); +} + +using ThreadUP = std::unique_ptr<std::thread>; + +struct Fixture +{ + BlockingThreadStackExecutor executor; + Gate workersEntryGate; + CountDownLatch workersExitLatch; + Gate blockedExecuteGate; + + Fixture(uint32_t taskLimit, uint32_t tasksToWaitFor) + : executor(1, 128000, taskLimit), + workersEntryGate(), + workersExitLatch(tasksToWaitFor), + blockedExecuteGate() + {} + void execute(size_t numTasks) { + for (size_t i = 0; i < numTasks; ++i) { + executor.execute(MyTask::create(workersEntryGate, workersExitLatch)); + } + } + void updateTaskLimit(uint32_t taskLimit) { + executor.setTaskLimit(taskLimit); + } + void openForWorkers() { + workersEntryGate.countDown(); + } + void waitForWorkers() { + workersExitLatch.await(msWait); + } + void assertExecuteIsBlocked() { + blockedExecuteGate.await(10); + EXPECT_EQUAL(1u, blockedExecuteGate.getCount()); + } + void waitForExecuteIsFinished() { + blockedExecuteGate.await(msWait); + EXPECT_EQUAL(0u, blockedExecuteGate.getCount()); + } + ThreadUP blockedExecuteThread() { + return std::make_unique<std::thread>(blockedExecute, &executor, &workersEntryGate, &workersExitLatch, &blockedExecuteGate); + } + void blockedExecuteAndWaitUntilFinished() { + ThreadUP thread = blockedExecuteThread(); + TEST_DO(assertExecuteIsBlocked()); + openForWorkers(); + TEST_DO(waitForExecuteIsFinished()); + thread->join(); + waitForWorkers(); + } +}; + +TEST_F("require that execute() blocks when task limits is reached", Fixture(3, 4)) +{ + f.execute(3); + f.blockedExecuteAndWaitUntilFinished(); +} + +TEST_F("require that task limit can be increased", Fixture(3, 5)) +{ + f.execute(3); + f.updateTaskLimit(4); + f.execute(1); + f.blockedExecuteAndWaitUntilFinished(); +} + +TEST_F("require that task limit can be decreased", Fixture(3, 3)) +{ + f.execute(2); + f.updateTaskLimit(2); + f.blockedExecuteAndWaitUntilFinished(); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp index 4df04d15ce4..95d0b147707 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp @@ -31,4 +31,10 @@ BlockingThreadStackExecutor::wakeup(MonitorGuard & monitor) monitor.broadcast(); } +void +BlockingThreadStackExecutor::setTaskLimit(uint32_t taskLimit) +{ + ThreadStackExecutorBase::setTaskLimit(taskLimit); +} + } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h index 5fba2c7dbe6..6e1b7ad8100 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h @@ -27,6 +27,11 @@ public: **/ BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit); ~BlockingThreadStackExecutor(); + + /** + * Sets a new upper limit for accepted number of tasks. + */ + void setTaskLimit(uint32_t taskLimit); }; } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index 780f0ed5dd0..816c1feaf1b 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -129,6 +129,13 @@ ThreadStackExecutorBase::start(uint32_t threads) } } +void +ThreadStackExecutorBase::setTaskLimit(uint32_t taskLimit) +{ + MonitorGuard monitor(_monitor); + _taskLimit = taskLimit; +} + ThreadStackExecutorBase::Stats ThreadStackExecutorBase::getStats() { diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 215a8377fc6..b5ab46ed335 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -139,6 +139,7 @@ protected: * @param taskLimit upper limit on accepted tasks **/ ThreadStackExecutorBase(uint32_t stackSize, uint32_t taskLimit); + /** * This will start the theads. This is to avoid starting tasks in * constructor of base class. @@ -146,6 +147,12 @@ protected: * @param threads number of worker threads (concurrent tasks) */ void start(uint32_t threads); + + /** + * Sets a new upper limit for accepted number of tasks. + */ + void setTaskLimit(uint32_t taskLimit); + public: /** * Observe and reset stats for this object. |