summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsariel <shahar.ariel@verizonmedia.com>2021-11-01 16:56:18 +0200
committersariel <shahar.ariel@verizonmedia.com>2021-11-01 16:56:18 +0200
commit0039fba5e9615536e81192a88e08d0fdc9fa13b4 (patch)
tree32dab3fad991266db058cf02ec190b583f9a9bce
parent0d8dd535f755f6751f1b94ec3b6dfe1fe6c59846 (diff)
parentf2d3685aff36e286a4aafdb90a019bbad36ddc0d (diff)
Merge remote-tracking branch 'origin/master'
-rw-r--r--client/pom.xml8
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java1
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java7
-rwxr-xr-xconfigserver/src/main/sh/start-configserver1
-rw-r--r--container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java2
-rw-r--r--container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/Utils.java1
-rw-r--r--container-disc/src/main/java/com/yahoo/container/jdisc/ZoneInfoProvider.java30
-rwxr-xr-xcontainer-disc/src/main/sh/vespa-start-container-daemon.sh1
-rw-r--r--dist/vespa.spec11
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/Flags.java24
-rw-r--r--hosted-zone-api/abi-spec.json12
-rw-r--r--hosted-zone-api/src/main/java/ai/vespa/cloud/ZoneInfo.java27
-rw-r--r--searchcore/src/tests/proton/index/indexmanager_test.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp6
-rw-r--r--searchlib/src/vespa/searchlib/common/unique_issues.h3
-rwxr-xr-xstandalone-container/src/main/sh/standalone-container.sh1
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h1
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java116
-rw-r--r--vespalib/src/tests/executor/threadstackexecutor_test.cpp1
20 files changed, 212 insertions, 50 deletions
diff --git a/client/pom.xml b/client/pom.xml
index 816941d7b7f..ba153aed8f8 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -41,10 +41,8 @@
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
- <artifactId>groovy-all</artifactId>
- <!-- any version of Groovy \>= 1.8.2 should work here -->
- <version>2.5.6</version>
- <type>pom</type>
+ <artifactId>groovy</artifactId>
+ <version>3.0.8</version>
<scope>test</scope>
</dependency>
</dependencies>
@@ -54,7 +52,7 @@
<plugin>
<groupId>org.codehaus.gmavenplus</groupId>
<artifactId>gmavenplus-plugin</artifactId>
- <version>1.7.0</version>
+ <version>1.13.0</version>
<executions>
<execution>
<goals>
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
index d51427abd90..9ad257fad04 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
@@ -36,6 +36,7 @@ public final class ApplicationContainer extends Container implements
addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.NetworkMultiplexerProvider"));
addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.SessionCache"));
addComponent(new SimpleComponent("com.yahoo.container.jdisc.SystemInfoProvider"));
+ addComponent(new SimpleComponent("com.yahoo.container.jdisc.ZoneInfoProvider"));
}
@Override
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
index 1499100307c..70e7dbe19ec 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
@@ -221,9 +221,12 @@ public class TenantRepository {
}
private ExecutorService createModelBuilderExecutor(int numThreads) {
+ final long GB = 1024*1024*1024;
if (numThreads == 0) return new InThreadExecutorService();
if (numThreads < 0) {
- numThreads = Runtime.getRuntime().availableProcessors();
+ long maxHeap = Runtime.getRuntime().maxMemory();
+ int maxThreadsToFitInMemory = (int)((maxHeap + (GB - 1))/(1*GB));
+ numThreads = Math.min(Runtime.getRuntime().availableProcessors(), maxThreadsToFitInMemory);
}
return Executors.newFixedThreadPool(numThreads, ThreadFactoryFactory.getDaemonThreadFactory("deploy-helper"));
}
@@ -348,7 +351,7 @@ public class TenantRepository {
PermanentApplicationPackage permanentApplicationPackage = new PermanentApplicationPackage(configserverConfig);
SessionPreparer sessionPreparer = new SessionPreparer(modelFactoryRegistry,
fileDistributionFactory,
- deployHelperExecutor,
+ deployHelperExecutor,
hostProvisionerProvider,
permanentApplicationPackage,
configserverConfig,
diff --git a/configserver/src/main/sh/start-configserver b/configserver/src/main/sh/start-configserver
index 317af4b2fea..b3b449fb5a1 100755
--- a/configserver/src/main/sh/start-configserver
+++ b/configserver/src/main/sh/start-configserver
@@ -171,6 +171,7 @@ vespa-run-as-vespa-user vespa-runserver -s configserver -r 30 -p $pidfile -- \
-XX:-OmitStackTraceInFastThrow \
-XX:MaxJavaStackTraceDepth=1000000 \
$jvmargs \
+ --illegal-access=warn \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.net=ALL-UNNAMED \
diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
index ffd342f36cd..44daec42b88 100644
--- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
+++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java
@@ -198,7 +198,7 @@ class ServletOutputStreamWriter {
final int bytesToSend = buffer.remaining();
try {
if (buffer.hasArray()) {
- outputStream.write(buffer.array(), buffer.arrayOffset(), buffer.remaining());
+ outputStream.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
} else {
final byte[] array = new byte[buffer.remaining()];
buffer.get(array);
diff --git a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/Utils.java b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/Utils.java
index 5b341e0a3e6..d4b0771f482 100644
--- a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/Utils.java
+++ b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/Utils.java
@@ -81,5 +81,4 @@ class Utils {
client.start();
return client;
}
-
}
diff --git a/container-disc/src/main/java/com/yahoo/container/jdisc/ZoneInfoProvider.java b/container-disc/src/main/java/com/yahoo/container/jdisc/ZoneInfoProvider.java
new file mode 100644
index 00000000000..30a4c740ff0
--- /dev/null
+++ b/container-disc/src/main/java/com/yahoo/container/jdisc/ZoneInfoProvider.java
@@ -0,0 +1,30 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc;
+
+import ai.vespa.cloud.Environment;
+import ai.vespa.cloud.Zone;
+import ai.vespa.cloud.ZoneInfo;
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.ConfigserverConfig;
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.container.di.componentgraph.Provider;
+
+/**
+ * Provides information about the zone in which this container is running.
+ * This is available and can be injected when running in a cloud environment.
+ *
+ * @author bratseth
+ */
+public class ZoneInfoProvider extends AbstractComponent implements Provider<ZoneInfo> {
+
+ private final ZoneInfo instance;
+
+ @Inject
+ public ZoneInfoProvider(ConfigserverConfig csConfig) {
+ this.instance = new ZoneInfo(new Zone(Environment.valueOf(csConfig.environment()), csConfig.region()));
+ }
+
+ @Override
+ public ZoneInfo get() { return instance; }
+
+}
diff --git a/container-disc/src/main/sh/vespa-start-container-daemon.sh b/container-disc/src/main/sh/vespa-start-container-daemon.sh
index 8c122d3170e..730dd350ca7 100755
--- a/container-disc/src/main/sh/vespa-start-container-daemon.sh
+++ b/container-disc/src/main/sh/vespa-start-container-daemon.sh
@@ -260,6 +260,7 @@ exec $numactlcmd $envcmd java \
-XX:HeapDumpPath="${VESPA_HOME}/var/crash" \
-XX:ErrorFile="${VESPA_HOME}/var/crash/hs_err_pid%p.log" \
-XX:+ExitOnOutOfMemoryError \
+ --illegal-access=warn \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.net=ALL-UNNAMED \
diff --git a/dist/vespa.spec b/dist/vespa.spec
index e6eff61b214..92713517988 100644
--- a/dist/vespa.spec
+++ b/dist/vespa.spec
@@ -278,6 +278,7 @@ Requires: gtest
%define _skip_vespamalloc 1
%endif
Requires: %{name}-base = %{version}-%{release}
+Requires: %{name}-base-libs = %{version}-%{release}
Requires: %{name}-libs = %{version}-%{release}
Requires: %{name}-clients = %{version}-%{release}
Requires: %{name}-config-model-fat = %{version}-%{release}
@@ -464,6 +465,7 @@ Vespa - The open big data serving engine - tools
Summary: Vespa - The open big data serving engine - ann-benchmark
+Requires: %{name}-base-libs = %{version}-%{release}
Requires: %{name}-libs = %{version}-%{release}
%if 0%{?el7}
Requires: python3
@@ -487,6 +489,15 @@ nearest neighbor search used for low-level benchmarking.
%setup -c -D -T
%else
%setup -q
+echo '%{version}' > VERSION
+case '%{version}' in
+ *.0)
+ :
+ ;;
+ *)
+ sed -i -e 's,<version>[0-9].*-SNAPSHOT</version>,<version>%{version}</version>,' $(find . -name pom.xml -print)
+ ;;
+esac
%endif
%build
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index ef4b232c68a..18c2e794994 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -130,14 +130,14 @@ public class Flags {
public static final UnboundBooleanFlag USE_THREE_PHASE_UPDATES = defineFeatureFlag(
"use-three-phase-updates", false,
- List.of("vekterli"), "2020-12-02", "2021-11-01",
+ List.of("vekterli"), "2020-12-02", "2022-01-01",
"Whether to enable the use of three-phase updates when bucket replicas are out of sync.",
"Takes effect at redeployment",
ZONE_ID, APPLICATION_ID);
public static final UnboundBooleanFlag HIDE_SHARED_ROUTING_ENDPOINT = defineFeatureFlag(
"hide-shared-routing-endpoint", false,
- List.of("tokle", "bjormel"), "2020-12-02", "2021-11-01",
+ List.of("tokle", "bjormel"), "2020-12-02", "2022-01-01",
"Whether the controller should hide shared routing layer endpoint",
"Takes effect immediately",
APPLICATION_ID
@@ -173,27 +173,27 @@ public class Flags {
public static final UnboundIntFlag NUM_DEPLOY_HELPER_THREADS = defineIntFlag(
"num-model-builder-threads", -1,
- List.of("balder"), "2021-09-09", "2021-11-01",
+ List.of("balder"), "2021-09-09", "2022-01-01",
"Number of threads used for speeding up building of models.",
"Takes effect on first (re)start of config server");
public static final UnboundBooleanFlag ENABLE_FEED_BLOCK_IN_DISTRIBUTOR = defineFeatureFlag(
"enable-feed-block-in-distributor", true,
- List.of("geirst"), "2021-01-27", "2021-11-01",
+ List.of("geirst"), "2021-01-27", "2022-01-31",
"Enables blocking of feed in the distributor if resource usage is above limit on at least one content node",
"Takes effect at redeployment",
ZONE_ID, APPLICATION_ID);
public static final UnboundBooleanFlag CONTAINER_DUMP_HEAP_ON_SHUTDOWN_TIMEOUT = defineFeatureFlag(
"container-dump-heap-on-shutdown-timeout", false,
- List.of("baldersheim"), "2021-09-25", "2021-11-01",
+ List.of("baldersheim"), "2021-09-25", "2022-01-01",
"Will trigger a heap dump during if container shutdown times out",
"Takes effect at redeployment",
ZONE_ID, APPLICATION_ID);
public static final UnboundDoubleFlag CONTAINER_SHUTDOWN_TIMEOUT = defineDoubleFlag(
"container-shutdown-timeout", 50.0,
- List.of("baldersheim"), "2021-09-25", "2021-11-01",
+ List.of("baldersheim"), "2021-09-25", "2022-01-01",
"Timeout for shutdown of a jdisc container",
"Takes effect at redeployment",
ZONE_ID, APPLICATION_ID);
@@ -221,14 +221,14 @@ public class Flags {
public static final UnboundIntFlag MAX_CONCURRENT_MERGES_PER_NODE = defineIntFlag(
"max-concurrent-merges-per-node", 128,
- List.of("balder", "vekterli"), "2021-06-06", "2021-11-01",
+ List.of("balder", "vekterli"), "2021-06-06", "2022-01-01",
"Specifies max concurrent merges per content node.",
"Takes effect at redeploy",
ZONE_ID, APPLICATION_ID);
public static final UnboundIntFlag MAX_MERGE_QUEUE_SIZE = defineIntFlag(
"max-merge-queue-size", 1024,
- List.of("balder", "vekterli"), "2021-06-06", "2021-11-01",
+ List.of("balder", "vekterli"), "2021-06-06", "2022-01-01",
"Specifies max size of merge queue.",
"Takes effect at redeploy",
ZONE_ID, APPLICATION_ID);
@@ -243,7 +243,7 @@ public class Flags {
public static final UnboundIntFlag LARGE_RANK_EXPRESSION_LIMIT = defineIntFlag(
"large-rank-expression-limit", 8192,
- List.of("baldersheim"), "2021-06-09", "2021-11-01",
+ List.of("baldersheim"), "2021-06-09", "2022-01-01",
"Limit for size of rank expressions distributed by filedistribution",
"Takes effect on next internal redeployment",
APPLICATION_ID);
@@ -269,20 +269,20 @@ public class Flags {
public static final UnboundListFlag<String> ALLOWED_SERVICE_VIEW_APIS = defineListFlag(
"allowed-service-view-apis", List.of("state/v1/"), String.class,
- List.of("mortent"), "2021-08-05", "2021-11-01",
+ List.of("mortent"), "2021-08-05", "2022-01-01",
"Apis allowed to proxy through the service view api",
"Takes effect immediately");
public static final UnboundBooleanFlag SEPARATE_TENANT_IAM_ROLES = defineFeatureFlag(
"separate-tenant-iam-roles", false,
- List.of("mortent"), "2021-08-12", "2021-11-01",
+ List.of("mortent"), "2021-08-12", "2022-01-01",
"Create separate iam roles for tenant",
"Takes effect on redeploy",
TENANT_ID);
public static final UnboundIntFlag METRICSPROXY_NUM_THREADS = defineIntFlag(
"metricsproxy-num-threads", 2,
- List.of("balder"), "2021-09-01", "2021-11-01",
+ List.of("balder"), "2021-09-01", "2022-01-01",
"Number of threads for metrics proxy",
"Takes effect at redeployment",
ZONE_ID, APPLICATION_ID);
diff --git a/hosted-zone-api/abi-spec.json b/hosted-zone-api/abi-spec.json
index 4195fd5f10c..6a6da57a2a1 100644
--- a/hosted-zone-api/abi-spec.json
+++ b/hosted-zone-api/abi-spec.json
@@ -78,5 +78,17 @@
"public static ai.vespa.cloud.Zone from(java.lang.String)"
],
"fields": []
+ },
+ "ai.vespa.cloud.ZoneInfo": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(ai.vespa.cloud.Zone)",
+ "public ai.vespa.cloud.Zone zone()"
+ ],
+ "fields": []
}
} \ No newline at end of file
diff --git a/hosted-zone-api/src/main/java/ai/vespa/cloud/ZoneInfo.java b/hosted-zone-api/src/main/java/ai/vespa/cloud/ZoneInfo.java
new file mode 100644
index 00000000000..d9af2421ab9
--- /dev/null
+++ b/hosted-zone-api/src/main/java/ai/vespa/cloud/ZoneInfo.java
@@ -0,0 +1,27 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.cloud;
+
+import java.util.Objects;
+
+/**
+ * Provides information about the zone in which this container is running.
+ * This is available and can be injected when running in a cloud environment.
+ * If you don't need any other information than the zone this should be preferred
+ * to SystemInfo as it will never change at runtime and therefore does not
+ * cause unnecessary reconstruction.
+ *
+ * @author bratseth
+ */
+public class ZoneInfo {
+
+ private final Zone zone;
+
+ public ZoneInfo(Zone zone) {
+ Objects.requireNonNull(zone, "Zone cannot be null!");
+ this.zone = zone;
+ }
+
+ /** Returns the zone this is running in */
+ public Zone zone() { return zone; }
+
+}
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp
index be1e2c79f70..aa118600272 100644
--- a/searchcore/src/tests/proton/index/indexmanager_test.cpp
+++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp
@@ -142,8 +142,7 @@ struct IndexManagerTest : public ::testing::Test {
void resetIndexManager();
void removeDocument(uint32_t docId, SerialNum serialNum) {
runAsIndex([&]() { _index_manager->removeDocument(docId, serialNum);
- _index_manager->commit(serialNum,
- emptyDestructorCallback);
+ _index_manager->commit(serialNum, emptyDestructorCallback);
});
_writeService.indexFieldWriter().sync_all();
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
index ca6551d7453..d973777020d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
@@ -170,11 +170,13 @@ void
SearchableFeedView::handleCompactLidSpace(const CompactLidSpaceOperation &op)
{
Parent::handleCompactLidSpace(op);
+ vespalib::Gate gate;
_writeService.index().execute(
- makeLambdaTask([this, &op]() {
+ makeLambdaTask([this, &op, &gate]() {
_indexWriter->compactLidSpace(op.getSerialNum(), op.getLidLimit());
+ gate.countDown();
}));
- _writeService.index().sync();
+ gate.await();
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index c9af1d86bb4..c9a0892d3c3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -769,10 +769,12 @@ StoreOnlyFeedView::handleCompactLidSpace(const CompactLidSpaceOperation &op)
internalForceCommit(CommitParam(serialNum), commitContext);
}
if (useDocumentStore(serialNum)) {
- _writeService.summary().execute(makeLambdaTask([this, &op]() {
+ vespalib::Gate gate;
+ _writeService.summary().execute(makeLambdaTask([this, &op, &gate]() {
_summaryAdapter->compactLidSpace(op.getLidLimit());
+ gate.countDown();
}));
- _writeService.summary().sync();
+ gate.await();
}
}
diff --git a/searchlib/src/vespa/searchlib/common/unique_issues.h b/searchlib/src/vespa/searchlib/common/unique_issues.h
index 79cb7a9aa23..17c42da060c 100644
--- a/searchlib/src/vespa/searchlib/common/unique_issues.h
+++ b/searchlib/src/vespa/searchlib/common/unique_issues.h
@@ -18,7 +18,8 @@ private:
public:
using UP = std::unique_ptr<UniqueIssues>;
void handle(const vespalib::Issue &issue) override;
- void for_each_message(auto fun) const {
+ template <class Function>
+ void for_each_message(Function fun) const {
for (const auto &msg: _messages) {
fun(msg);
}
diff --git a/standalone-container/src/main/sh/standalone-container.sh b/standalone-container/src/main/sh/standalone-container.sh
index 9e888bdfea2..9f862d2b14a 100755
--- a/standalone-container/src/main/sh/standalone-container.sh
+++ b/standalone-container/src/main/sh/standalone-container.sh
@@ -163,6 +163,7 @@ StartCommand() {
-XX:+HeapDumpOnOutOfMemoryError \
-XX:HeapDumpPath="$VESPA_HOME/var/crash" \
-XX:+ExitOnOutOfMemoryError \
+ --illegal-access=warn \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.net=ALL-UNNAMED \
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 4daec4c0689..5e66b364242 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -21,6 +21,7 @@
#include <vespa/storage/common/cluster_context.h>
#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/util/monitored_refcount.h>
+#include <atomic>
namespace vespalib { class ISequencedTaskExecutor; }
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 20121d8d193..29fd419daa2 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -68,6 +68,7 @@ import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import com.yahoo.yolean.Exceptions;
import com.yahoo.yolean.Exceptions.RunnableThrowingIOException;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -83,9 +84,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Queue;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
@@ -577,12 +580,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static class JsonResponse implements AutoCloseable {
private static final ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
+ private static final int FLUSH_SIZE = 128;
private final BufferedContentChannel buffer = new BufferedContentChannel();
private final OutputStream out = new ContentChannelOutputStream(buffer);
private final JsonGenerator json;
private final ResponseHandler handler;
+ private final Queue<CompletionHandler> acks = new ConcurrentLinkedQueue<>();
+ private final Queue<ByteArrayOutputStream> docs = new ConcurrentLinkedQueue<>();
+ private final AtomicLong documentsWritten = new AtomicLong();
+ private final AtomicLong documentsFlushed = new AtomicLong();
+ private final AtomicLong documentsAcked = new AtomicLong();
private boolean documentsDone = false;
+ private boolean first = true;
private ContentChannel channel;
private JsonResponse(ResponseHandler handler) throws IOException {
@@ -695,19 +705,72 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
json.writeArrayFieldStart("documents");
}
- synchronized void writeDocumentValue(Document document, CompletionHandler completionHandler) {
- if ( ! documentsDone)
- new JsonWriter(json).write(document);
-
+ /** Writes documents to an internal queue, which is flushed regularly. */
+ void writeDocumentValue(Document document, CompletionHandler completionHandler) throws IOException {
if (completionHandler != null) {
- if ( ! documentsDone)
- buffer.write(emptyBuffer, completionHandler);
+ acks.add(completionHandler);
+ ackDocuments();
+ }
+
+ // Serialise document and add to queue, not necessarily in the order dictated by "written" above,
+ // i.e., the first 128 documents in the queue are not necessarily the ones ack'ed early.
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream(1);
+ myOut.write(','); // Prepend rather than append, to avoid double memory copying.
+ try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) {
+ new JsonWriter(myJson).write(document);
+ }
+ docs.add(myOut);
+
+ // Flush the first FLUSH_SIZE documents in the queue to the network layer if chunk is filled.
+ if (documentsWritten.incrementAndGet() % FLUSH_SIZE == 0) {
+ flushDocuments();
+ }
+ }
+
+ void ackDocuments() {
+ while (documentsAcked.incrementAndGet() <= documentsFlushed.get() + FLUSH_SIZE) {
+ CompletionHandler ack = acks.poll();
+ if (ack != null)
+ ack.completed();
else
- completionHandler.completed();
+ break;
}
+ documentsAcked.decrementAndGet(); // We overshoot by one above, so decrement again when done.
+ }
+
+ synchronized void flushDocuments() throws IOException {
+ for (int i = 0; i < FLUSH_SIZE; i++) {
+ ByteArrayOutputStream doc = docs.poll();
+ if (doc == null)
+ break;
+
+ if ( ! documentsDone) {
+ if (first) { // First chunk, remove leading comma from first document, and flush "json" to "buffer".
+ json.flush();
+ buffer.write(ByteBuffer.wrap(doc.toByteArray(), 1, doc.size() - 1), null);
+ first = false;
+ }
+ else {
+ buffer.write(ByteBuffer.wrap(doc.toByteArray()), null);
+ }
+ }
+ }
+
+ // Ensure new, eligible acks are done, after flushing these documents.
+ buffer.write(emptyBuffer, new CompletionHandler() {
+ @Override public void completed() {
+ documentsFlushed.addAndGet(FLUSH_SIZE);
+ ackDocuments();
+ }
+ @Override public void failed(Throwable t) {
+ log.log(WARNING, "Error writing documents", t);
+ completed();
+ }
+ });
}
synchronized void writeArrayEnd() throws IOException {
+ flushDocuments();
documentsDone = true;
json.writeEndArray();
}
@@ -989,9 +1052,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if (cluster.isEmpty() && path.documentType().isEmpty())
throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
- Optional<Integer> slices = getProperty(request, SLICES, integerParser);
- Optional<Integer> sliceId = getProperty(request, SLICE_ID, integerParser);
-
VisitorParameters parameters = parseCommonParameters(request, path, cluster);
parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME)));
parameters.setMaxTotalHits(wantedDocumentCount);
@@ -1006,10 +1066,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
parameters.setThrottlePolicy(throttlePolicy);
parameters.visitInconsistentBuckets(true);
parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()));
- if (slices.isPresent() && sliceId.isPresent())
- parameters.slice(slices.get(), sliceId.get());
- else if (slices.isPresent() != sliceId.isPresent())
- throw new IllegalArgumentException("None or both of '" + SLICES + "' and '" + SLICE_ID + "' must be set");
+
return parameters;
}
@@ -1044,6 +1101,13 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
getProperty(request, BUCKET_SPACE)));
+ Optional<Integer> slices = getProperty(request, SLICES, integerParser);
+ Optional<Integer> sliceId = getProperty(request, SLICE_ID, integerParser);
+ if (slices.isPresent() && sliceId.isPresent())
+ parameters.slice(slices.get(), sliceId.get());
+ else if (slices.isPresent() != sliceId.isPresent())
+ throw new IllegalArgumentException("None or both of '" + SLICES + "' and '" + SLICE_ID + "' must be set");
+
return parameters;
}
@@ -1124,14 +1188,22 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.writeDocumentsArrayStart();
}
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
- if (streamed)
- response.writeDocumentValue(document, new CompletionHandler() {
- @Override public void completed() { ack.run();}
- @Override public void failed(Throwable t) { ack.run(); onError.accept(t.getMessage()); }
- });
- else {
- response.writeDocumentValue(document, null);
- ack.run();
+ try {
+ if (streamed)
+ response.writeDocumentValue(document, new CompletionHandler() {
+ @Override public void completed() { ack.run();}
+ @Override public void failed(Throwable t) {
+ ack.run();
+ onError.accept(t.getMessage());
+ }
+ });
+ else {
+ response.writeDocumentValue(document, null);
+ ack.run();
+ }
+ }
+ catch (Exception e) {
+ onError.accept(e.getMessage());
}
}
@Override public void onEnd(JsonResponse response) throws IOException {
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
index e61dc071b62..688d98ff032 100644
--- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/backtrace.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <atomic>
#include <thread>
using namespace vespalib;