diff options
author | sariel <shahar.ariel@verizonmedia.com> | 2021-11-01 16:56:18 +0200 |
---|---|---|
committer | sariel <shahar.ariel@verizonmedia.com> | 2021-11-01 16:56:18 +0200 |
commit | 0039fba5e9615536e81192a88e08d0fdc9fa13b4 (patch) | |
tree | 32dab3fad991266db058cf02ec190b583f9a9bce | |
parent | 0d8dd535f755f6751f1b94ec3b6dfe1fe6c59846 (diff) | |
parent | f2d3685aff36e286a4aafdb90a019bbad36ddc0d (diff) |
Merge remote-tracking branch 'origin/master'
20 files changed, 212 insertions, 50 deletions
diff --git a/client/pom.xml b/client/pom.xml index 816941d7b7f..ba153aed8f8 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -41,10 +41,8 @@ </dependency> <dependency> <groupId>org.codehaus.groovy</groupId> - <artifactId>groovy-all</artifactId> - <!-- any version of Groovy \>= 1.8.2 should work here --> - <version>2.5.6</version> - <type>pom</type> + <artifactId>groovy</artifactId> + <version>3.0.8</version> <scope>test</scope> </dependency> </dependencies> @@ -54,7 +52,7 @@ <plugin> <groupId>org.codehaus.gmavenplus</groupId> <artifactId>gmavenplus-plugin</artifactId> - <version>1.7.0</version> + <version>1.13.0</version> <executions> <execution> <goals> diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java index d51427abd90..9ad257fad04 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java @@ -36,6 +36,7 @@ public final class ApplicationContainer extends Container implements addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.NetworkMultiplexerProvider")); addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.SessionCache")); addComponent(new SimpleComponent("com.yahoo.container.jdisc.SystemInfoProvider")); + addComponent(new SimpleComponent("com.yahoo.container.jdisc.ZoneInfoProvider")); } @Override diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java index 1499100307c..70e7dbe19ec 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java @@ -221,9 +221,12 @@ public class TenantRepository { } private ExecutorService createModelBuilderExecutor(int numThreads) { + final long GB = 1024*1024*1024; if (numThreads == 0) return new InThreadExecutorService(); if (numThreads < 0) { - numThreads = Runtime.getRuntime().availableProcessors(); + long maxHeap = Runtime.getRuntime().maxMemory(); + int maxThreadsToFitInMemory = (int)((maxHeap + (GB - 1))/(1*GB)); + numThreads = Math.min(Runtime.getRuntime().availableProcessors(), maxThreadsToFitInMemory); } return Executors.newFixedThreadPool(numThreads, ThreadFactoryFactory.getDaemonThreadFactory("deploy-helper")); } @@ -348,7 +351,7 @@ public class TenantRepository { PermanentApplicationPackage permanentApplicationPackage = new PermanentApplicationPackage(configserverConfig); SessionPreparer sessionPreparer = new SessionPreparer(modelFactoryRegistry, fileDistributionFactory, - deployHelperExecutor, + deployHelperExecutor, hostProvisionerProvider, permanentApplicationPackage, configserverConfig, diff --git a/configserver/src/main/sh/start-configserver b/configserver/src/main/sh/start-configserver index 317af4b2fea..b3b449fb5a1 100755 --- a/configserver/src/main/sh/start-configserver +++ b/configserver/src/main/sh/start-configserver @@ -171,6 +171,7 @@ vespa-run-as-vespa-user vespa-runserver -s configserver -r 30 -p $pidfile -- \ -XX:-OmitStackTraceInFastThrow \ -XX:MaxJavaStackTraceDepth=1000000 \ $jvmargs \ + --illegal-access=warn \ --add-opens=java.base/java.io=ALL-UNNAMED \ --add-opens=java.base/java.lang=ALL-UNNAMED \ --add-opens=java.base/java.net=ALL-UNNAMED \ diff --git a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java index ffd342f36cd..44daec42b88 100644 --- a/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java +++ b/container-core/src/main/java/com/yahoo/jdisc/http/server/jetty/ServletOutputStreamWriter.java @@ -198,7 +198,7 @@ class ServletOutputStreamWriter { final int bytesToSend = buffer.remaining(); try { if (buffer.hasArray()) { - outputStream.write(buffer.array(), buffer.arrayOffset(), buffer.remaining()); + outputStream.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); } else { final byte[] array = new byte[buffer.remaining()]; buffer.get(array); diff --git a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/Utils.java b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/Utils.java index 5b341e0a3e6..d4b0771f482 100644 --- a/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/Utils.java +++ b/container-core/src/test/java/com/yahoo/jdisc/http/server/jetty/Utils.java @@ -81,5 +81,4 @@ class Utils { client.start(); return client; } - } diff --git a/container-disc/src/main/java/com/yahoo/container/jdisc/ZoneInfoProvider.java b/container-disc/src/main/java/com/yahoo/container/jdisc/ZoneInfoProvider.java new file mode 100644 index 00000000000..30a4c740ff0 --- /dev/null +++ b/container-disc/src/main/java/com/yahoo/container/jdisc/ZoneInfoProvider.java @@ -0,0 +1,30 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.jdisc; + +import ai.vespa.cloud.Environment; +import ai.vespa.cloud.Zone; +import ai.vespa.cloud.ZoneInfo; +import com.google.inject.Inject; +import com.yahoo.cloud.config.ConfigserverConfig; +import com.yahoo.component.AbstractComponent; +import com.yahoo.container.di.componentgraph.Provider; + +/** + * Provides information about the zone in which this container is running. + * This is available and can be injected when running in a cloud environment. + * + * @author bratseth + */ +public class ZoneInfoProvider extends AbstractComponent implements Provider<ZoneInfo> { + + private final ZoneInfo instance; + + @Inject + public ZoneInfoProvider(ConfigserverConfig csConfig) { + this.instance = new ZoneInfo(new Zone(Environment.valueOf(csConfig.environment()), csConfig.region())); + } + + @Override + public ZoneInfo get() { return instance; } + +} diff --git a/container-disc/src/main/sh/vespa-start-container-daemon.sh b/container-disc/src/main/sh/vespa-start-container-daemon.sh index 8c122d3170e..730dd350ca7 100755 --- a/container-disc/src/main/sh/vespa-start-container-daemon.sh +++ b/container-disc/src/main/sh/vespa-start-container-daemon.sh @@ -260,6 +260,7 @@ exec $numactlcmd $envcmd java \ -XX:HeapDumpPath="${VESPA_HOME}/var/crash" \ -XX:ErrorFile="${VESPA_HOME}/var/crash/hs_err_pid%p.log" \ -XX:+ExitOnOutOfMemoryError \ + --illegal-access=warn \ --add-opens=java.base/java.io=ALL-UNNAMED \ --add-opens=java.base/java.lang=ALL-UNNAMED \ --add-opens=java.base/java.net=ALL-UNNAMED \ diff --git a/dist/vespa.spec b/dist/vespa.spec index e6eff61b214..92713517988 100644 --- a/dist/vespa.spec +++ b/dist/vespa.spec @@ -278,6 +278,7 @@ Requires: gtest %define _skip_vespamalloc 1 %endif Requires: %{name}-base = %{version}-%{release} +Requires: %{name}-base-libs = %{version}-%{release} Requires: %{name}-libs = %{version}-%{release} Requires: %{name}-clients = %{version}-%{release} Requires: %{name}-config-model-fat = %{version}-%{release} @@ -464,6 +465,7 @@ Vespa - The open big data serving engine - tools Summary: Vespa - The open big data serving engine - ann-benchmark +Requires: %{name}-base-libs = %{version}-%{release} Requires: %{name}-libs = %{version}-%{release} %if 0%{?el7} Requires: python3 @@ -487,6 +489,15 @@ nearest neighbor search used for low-level benchmarking. %setup -c -D -T %else %setup -q +echo '%{version}' > VERSION +case '%{version}' in + *.0) + : + ;; + *) + sed -i -e 's,<version>[0-9].*-SNAPSHOT</version>,<version>%{version}</version>,' $(find . -name pom.xml -print) + ;; +esac %endif %build diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index ef4b232c68a..18c2e794994 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -130,14 +130,14 @@ public class Flags { public static final UnboundBooleanFlag USE_THREE_PHASE_UPDATES = defineFeatureFlag( "use-three-phase-updates", false, - List.of("vekterli"), "2020-12-02", "2021-11-01", + List.of("vekterli"), "2020-12-02", "2022-01-01", "Whether to enable the use of three-phase updates when bucket replicas are out of sync.", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); public static final UnboundBooleanFlag HIDE_SHARED_ROUTING_ENDPOINT = defineFeatureFlag( "hide-shared-routing-endpoint", false, - List.of("tokle", "bjormel"), "2020-12-02", "2021-11-01", + List.of("tokle", "bjormel"), "2020-12-02", "2022-01-01", "Whether the controller should hide shared routing layer endpoint", "Takes effect immediately", APPLICATION_ID @@ -173,27 +173,27 @@ public class Flags { public static final UnboundIntFlag NUM_DEPLOY_HELPER_THREADS = defineIntFlag( "num-model-builder-threads", -1, - List.of("balder"), "2021-09-09", "2021-11-01", + List.of("balder"), "2021-09-09", "2022-01-01", "Number of threads used for speeding up building of models.", "Takes effect on first (re)start of config server"); public static final UnboundBooleanFlag ENABLE_FEED_BLOCK_IN_DISTRIBUTOR = defineFeatureFlag( "enable-feed-block-in-distributor", true, - List.of("geirst"), "2021-01-27", "2021-11-01", + List.of("geirst"), "2021-01-27", "2022-01-31", "Enables blocking of feed in the distributor if resource usage is above limit on at least one content node", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); public static final UnboundBooleanFlag CONTAINER_DUMP_HEAP_ON_SHUTDOWN_TIMEOUT = defineFeatureFlag( "container-dump-heap-on-shutdown-timeout", false, - List.of("baldersheim"), "2021-09-25", "2021-11-01", + List.of("baldersheim"), "2021-09-25", "2022-01-01", "Will trigger a heap dump during if container shutdown times out", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); public static final UnboundDoubleFlag CONTAINER_SHUTDOWN_TIMEOUT = defineDoubleFlag( "container-shutdown-timeout", 50.0, - List.of("baldersheim"), "2021-09-25", "2021-11-01", + List.of("baldersheim"), "2021-09-25", "2022-01-01", "Timeout for shutdown of a jdisc container", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); @@ -221,14 +221,14 @@ public class Flags { public static final UnboundIntFlag MAX_CONCURRENT_MERGES_PER_NODE = defineIntFlag( "max-concurrent-merges-per-node", 128, - List.of("balder", "vekterli"), "2021-06-06", "2021-11-01", + List.of("balder", "vekterli"), "2021-06-06", "2022-01-01", "Specifies max concurrent merges per content node.", "Takes effect at redeploy", ZONE_ID, APPLICATION_ID); public static final UnboundIntFlag MAX_MERGE_QUEUE_SIZE = defineIntFlag( "max-merge-queue-size", 1024, - List.of("balder", "vekterli"), "2021-06-06", "2021-11-01", + List.of("balder", "vekterli"), "2021-06-06", "2022-01-01", "Specifies max size of merge queue.", "Takes effect at redeploy", ZONE_ID, APPLICATION_ID); @@ -243,7 +243,7 @@ public class Flags { public static final UnboundIntFlag LARGE_RANK_EXPRESSION_LIMIT = defineIntFlag( "large-rank-expression-limit", 8192, - List.of("baldersheim"), "2021-06-09", "2021-11-01", + List.of("baldersheim"), "2021-06-09", "2022-01-01", "Limit for size of rank expressions distributed by filedistribution", "Takes effect on next internal redeployment", APPLICATION_ID); @@ -269,20 +269,20 @@ public class Flags { public static final UnboundListFlag<String> ALLOWED_SERVICE_VIEW_APIS = defineListFlag( "allowed-service-view-apis", List.of("state/v1/"), String.class, - List.of("mortent"), "2021-08-05", "2021-11-01", + List.of("mortent"), "2021-08-05", "2022-01-01", "Apis allowed to proxy through the service view api", "Takes effect immediately"); public static final UnboundBooleanFlag SEPARATE_TENANT_IAM_ROLES = defineFeatureFlag( "separate-tenant-iam-roles", false, - List.of("mortent"), "2021-08-12", "2021-11-01", + List.of("mortent"), "2021-08-12", "2022-01-01", "Create separate iam roles for tenant", "Takes effect on redeploy", TENANT_ID); public static final UnboundIntFlag METRICSPROXY_NUM_THREADS = defineIntFlag( "metricsproxy-num-threads", 2, - List.of("balder"), "2021-09-01", "2021-11-01", + List.of("balder"), "2021-09-01", "2022-01-01", "Number of threads for metrics proxy", "Takes effect at redeployment", ZONE_ID, APPLICATION_ID); diff --git a/hosted-zone-api/abi-spec.json b/hosted-zone-api/abi-spec.json index 4195fd5f10c..6a6da57a2a1 100644 --- a/hosted-zone-api/abi-spec.json +++ b/hosted-zone-api/abi-spec.json @@ -78,5 +78,17 @@ "public static ai.vespa.cloud.Zone from(java.lang.String)" ], "fields": [] + }, + "ai.vespa.cloud.ZoneInfo": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>(ai.vespa.cloud.Zone)", + "public ai.vespa.cloud.Zone zone()" + ], + "fields": [] } }
\ No newline at end of file diff --git a/hosted-zone-api/src/main/java/ai/vespa/cloud/ZoneInfo.java b/hosted-zone-api/src/main/java/ai/vespa/cloud/ZoneInfo.java new file mode 100644 index 00000000000..d9af2421ab9 --- /dev/null +++ b/hosted-zone-api/src/main/java/ai/vespa/cloud/ZoneInfo.java @@ -0,0 +1,27 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.cloud; + +import java.util.Objects; + +/** + * Provides information about the zone in which this container is running. + * This is available and can be injected when running in a cloud environment. + * If you don't need any other information than the zone this should be preferred + * to SystemInfo as it will never change at runtime and therefore does not + * cause unnecessary reconstruction. + * + * @author bratseth + */ +public class ZoneInfo { + + private final Zone zone; + + public ZoneInfo(Zone zone) { + Objects.requireNonNull(zone, "Zone cannot be null!"); + this.zone = zone; + } + + /** Returns the zone this is running in */ + public Zone zone() { return zone; } + +} diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index be1e2c79f70..aa118600272 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -142,8 +142,7 @@ struct IndexManagerTest : public ::testing::Test { void resetIndexManager(); void removeDocument(uint32_t docId, SerialNum serialNum) { runAsIndex([&]() { _index_manager->removeDocument(docId, serialNum); - _index_manager->commit(serialNum, - emptyDestructorCallback); + _index_manager->commit(serialNum, emptyDestructorCallback); }); _writeService.indexFieldWriter().sync_all(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp index ca6551d7453..d973777020d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp @@ -170,11 +170,13 @@ void SearchableFeedView::handleCompactLidSpace(const CompactLidSpaceOperation &op) { Parent::handleCompactLidSpace(op); + vespalib::Gate gate; _writeService.index().execute( - makeLambdaTask([this, &op]() { + makeLambdaTask([this, &op, &gate]() { _indexWriter->compactLidSpace(op.getSerialNum(), op.getLidLimit()); + gate.countDown(); })); - _writeService.index().sync(); + gate.await(); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index c9af1d86bb4..c9a0892d3c3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -769,10 +769,12 @@ StoreOnlyFeedView::handleCompactLidSpace(const CompactLidSpaceOperation &op) internalForceCommit(CommitParam(serialNum), commitContext); } if (useDocumentStore(serialNum)) { - _writeService.summary().execute(makeLambdaTask([this, &op]() { + vespalib::Gate gate; + _writeService.summary().execute(makeLambdaTask([this, &op, &gate]() { _summaryAdapter->compactLidSpace(op.getLidLimit()); + gate.countDown(); })); - _writeService.summary().sync(); + gate.await(); } } diff --git a/searchlib/src/vespa/searchlib/common/unique_issues.h b/searchlib/src/vespa/searchlib/common/unique_issues.h index 79cb7a9aa23..17c42da060c 100644 --- a/searchlib/src/vespa/searchlib/common/unique_issues.h +++ b/searchlib/src/vespa/searchlib/common/unique_issues.h @@ -18,7 +18,8 @@ private: public: using UP = std::unique_ptr<UniqueIssues>; void handle(const vespalib::Issue &issue) override; - void for_each_message(auto fun) const { + template <class Function> + void for_each_message(Function fun) const { for (const auto &msg: _messages) { fun(msg); } diff --git a/standalone-container/src/main/sh/standalone-container.sh b/standalone-container/src/main/sh/standalone-container.sh index 9e888bdfea2..9f862d2b14a 100755 --- a/standalone-container/src/main/sh/standalone-container.sh +++ b/standalone-container/src/main/sh/standalone-container.sh @@ -163,6 +163,7 @@ StartCommand() { -XX:+HeapDumpOnOutOfMemoryError \ -XX:HeapDumpPath="$VESPA_HOME/var/crash" \ -XX:+ExitOnOutOfMemoryError \ + --illegal-access=warn \ --add-opens=java.base/java.io=ALL-UNNAMED \ --add-opens=java.base/java.lang=ALL-UNNAMED \ --add-opens=java.base/java.net=ALL-UNNAMED \ diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 4daec4c0689..5e66b364242 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -21,6 +21,7 @@ #include <vespa/storage/common/cluster_context.h> #include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/util/monitored_refcount.h> +#include <atomic> namespace vespalib { class ISequencedTaskExecutor; } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 20121d8d193..29fd419daa2 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -68,6 +68,7 @@ import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; import com.yahoo.yolean.Exceptions; import com.yahoo.yolean.Exceptions.RunnableThrowingIOException; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -83,9 +84,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Queue; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; @@ -577,12 +580,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static class JsonResponse implements AutoCloseable { private static final ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]); + private static final int FLUSH_SIZE = 128; private final BufferedContentChannel buffer = new BufferedContentChannel(); private final OutputStream out = new ContentChannelOutputStream(buffer); private final JsonGenerator json; private final ResponseHandler handler; + private final Queue<CompletionHandler> acks = new ConcurrentLinkedQueue<>(); + private final Queue<ByteArrayOutputStream> docs = new ConcurrentLinkedQueue<>(); + private final AtomicLong documentsWritten = new AtomicLong(); + private final AtomicLong documentsFlushed = new AtomicLong(); + private final AtomicLong documentsAcked = new AtomicLong(); private boolean documentsDone = false; + private boolean first = true; private ContentChannel channel; private JsonResponse(ResponseHandler handler) throws IOException { @@ -695,19 +705,72 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { json.writeArrayFieldStart("documents"); } - synchronized void writeDocumentValue(Document document, CompletionHandler completionHandler) { - if ( ! documentsDone) - new JsonWriter(json).write(document); - + /** Writes documents to an internal queue, which is flushed regularly. */ + void writeDocumentValue(Document document, CompletionHandler completionHandler) throws IOException { if (completionHandler != null) { - if ( ! documentsDone) - buffer.write(emptyBuffer, completionHandler); + acks.add(completionHandler); + ackDocuments(); + } + + // Serialise document and add to queue, not necessarily in the order dictated by "written" above, + // i.e., the first 128 documents in the queue are not necessarily the ones ack'ed early. + ByteArrayOutputStream myOut = new ByteArrayOutputStream(1); + myOut.write(','); // Prepend rather than append, to avoid double memory copying. + try (JsonGenerator myJson = jsonFactory.createGenerator(myOut)) { + new JsonWriter(myJson).write(document); + } + docs.add(myOut); + + // Flush the first FLUSH_SIZE documents in the queue to the network layer if chunk is filled. + if (documentsWritten.incrementAndGet() % FLUSH_SIZE == 0) { + flushDocuments(); + } + } + + void ackDocuments() { + while (documentsAcked.incrementAndGet() <= documentsFlushed.get() + FLUSH_SIZE) { + CompletionHandler ack = acks.poll(); + if (ack != null) + ack.completed(); else - completionHandler.completed(); + break; } + documentsAcked.decrementAndGet(); // We overshoot by one above, so decrement again when done. + } + + synchronized void flushDocuments() throws IOException { + for (int i = 0; i < FLUSH_SIZE; i++) { + ByteArrayOutputStream doc = docs.poll(); + if (doc == null) + break; + + if ( ! documentsDone) { + if (first) { // First chunk, remove leading comma from first document, and flush "json" to "buffer". + json.flush(); + buffer.write(ByteBuffer.wrap(doc.toByteArray(), 1, doc.size() - 1), null); + first = false; + } + else { + buffer.write(ByteBuffer.wrap(doc.toByteArray()), null); + } + } + } + + // Ensure new, eligible acks are done, after flushing these documents. + buffer.write(emptyBuffer, new CompletionHandler() { + @Override public void completed() { + documentsFlushed.addAndGet(FLUSH_SIZE); + ackDocuments(); + } + @Override public void failed(Throwable t) { + log.log(WARNING, "Error writing documents", t); + completed(); + } + }); } synchronized void writeArrayEnd() throws IOException { + flushDocuments(); documentsDone = true; json.writeEndArray(); } @@ -989,9 +1052,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (cluster.isEmpty() && path.documentType().isEmpty()) throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); - Optional<Integer> slices = getProperty(request, SLICES, integerParser); - Optional<Integer> sliceId = getProperty(request, SLICE_ID, integerParser); - VisitorParameters parameters = parseCommonParameters(request, path, cluster); parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); parameters.setMaxTotalHits(wantedDocumentCount); @@ -1006,10 +1066,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { parameters.setThrottlePolicy(throttlePolicy); parameters.visitInconsistentBuckets(true); parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())); - if (slices.isPresent() && sliceId.isPresent()) - parameters.slice(slices.get(), sliceId.get()); - else if (slices.isPresent() != sliceId.isPresent()) - throw new IllegalArgumentException("None or both of '" + SLICES + "' and '" + SLICE_ID + "' must be set"); + return parameters; } @@ -1044,6 +1101,13 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()), getProperty(request, BUCKET_SPACE))); + Optional<Integer> slices = getProperty(request, SLICES, integerParser); + Optional<Integer> sliceId = getProperty(request, SLICE_ID, integerParser); + if (slices.isPresent() && sliceId.isPresent()) + parameters.slice(slices.get(), sliceId.get()); + else if (slices.isPresent() != sliceId.isPresent()) + throw new IllegalArgumentException("None or both of '" + SLICES + "' and '" + SLICE_ID + "' must be set"); + return parameters; } @@ -1124,14 +1188,22 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.writeDocumentsArrayStart(); } @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { - if (streamed) - response.writeDocumentValue(document, new CompletionHandler() { - @Override public void completed() { ack.run();} - @Override public void failed(Throwable t) { ack.run(); onError.accept(t.getMessage()); } - }); - else { - response.writeDocumentValue(document, null); - ack.run(); + try { + if (streamed) + response.writeDocumentValue(document, new CompletionHandler() { + @Override public void completed() { ack.run();} + @Override public void failed(Throwable t) { + ack.run(); + onError.accept(t.getMessage()); + } + }); + else { + response.writeDocumentValue(document, null); + ack.run(); + } + } + catch (Exception e) { + onError.accept(e.getMessage()); } } @Override public void onEnd(JsonResponse response) throws IOException { diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp index e61dc071b62..688d98ff032 100644 --- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp @@ -4,6 +4,7 @@ #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/backtrace.h> #include <vespa/vespalib/util/size_literals.h> +#include <atomic> #include <thread> using namespace vespalib; |