aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-http-client/src/main/java/com')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java143
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java60
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java25
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java28
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java44
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java152
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Session.java65
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java82
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SimpleLoggerResultCallback.java113
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java193
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java72
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java486
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java82
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java332
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java193
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/package-info.java8
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java96
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Encoder.java100
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/EndpointResult.java29
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ErrorCode.java33
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Exceptions.java47
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Headers.java37
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java245
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/OperationStatus.java90
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ServerResponseException.java44
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java74
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java154
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java102
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java40
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java73
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java507
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java63
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ByteBufferInputStream.java77
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java193
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java125
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java123
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java26
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java26
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java136
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java36
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayThrottler.java46
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java649
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/ConcurrentDocumentOperationBlocker.java69
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java75
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java92
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java181
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java307
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationStats.java76
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/package-info.java2
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/package-info.java9
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/CommandLineArguments.java323
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/FormatInputStream.java101
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java106
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/package-info.java2
55 files changed, 0 insertions, 6605 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java
deleted file mode 100644
index d9ff09552ef..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java
+++ /dev/null
@@ -1,143 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import com.yahoo.vespa.http.client.core.JsonReader;
-import com.yahoo.vespa.http.client.core.XmlFeedReader;
-
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * API for feeding document operations (add, removes or updates) to one or many Vespa clusters.
- * Use the factory to configure and set up an instance of this. Instances are expensive - create one instance of this
- * and use it for all feed operations (from multiple threads, if desired) for the duration of your client runtime.
- * The feedclient does automatic error recovery and reconnects to hosts when connections die.
- *
- * A {@link FeedClientFactory} is provided to instantiate Sessions.
- *
- * See com.yahoo.text.Text.stripInvalidCharacters(String) to remove invalid characters from string fields before feeding
- *
- * Instances of this are multithread safe.
- *
- * @author dybis
- * @see FeedClientFactory
- * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a>
- */
-@Deprecated
-public interface FeedClient extends AutoCloseable {
-
- /**
- * Issues a document operation to the configured cluster(s).
- * If the pipeline and buffers are full, this call will be blocking, ensuring that operations are not
- * produced faster than the can be handled. Transient failured are retried internally by this client.
- * Exactly one callback will always be received for each (completed) call to this.
- *
- * @param documentId the document id of the document
- * @param documentData the document data as JSON or XML (as specified when using the factory to create the API)
- */
- default void stream(String documentId, CharSequence documentData) {
- stream(documentId, documentData, null);
- }
-
- /**
- * Issues a document operation to the configured cluster(s).
- * If the pipeline and buffers are full, this call will be blocking, ensuring that operations are not
- * produced faster than the can be handled. Transient failured are retried internally by this client.
- * Exactly one callback will always be received for each (completed) call to this.
- *
- * @param documentId the document id of the document
- * @param documentData the document data as JSON or XML (as specified when using the factory to create the API)
- * @param context a context object which will be accessible in the result of the callback, or null if none
- */
- default void stream(String documentId, CharSequence documentData, Object context) {
- stream(documentId, null, documentData, context);
- }
-
- /**
- * Issues a document operation to the configured cluster(s).
- * If the pipeline and buffers are full, this call will be blocking, ensuring that operations are not
- * produced faster than the can be handled. Transient failures are retried internally by this client.
- * Exactly one callback will always be received for each (completed) call to this.
- *
- * @param documentId the document id of the document
- * @param operationId the id to use for this operation, or null to let the client decide an operation id.
- * This id must be unique for every operation. Passing the operation id allows clients
- * to prepare to receive a response for it before issuing the operation to the client.
- * @param documentData the document data as JSON or XML (as specified when using the factory to create the API)
- * @param context a context object which will be accessible in the result of the callback, or null if none
- */
- void stream(String documentId, String operationId, CharSequence documentData, Object context);
-
- /**
- * Waits for all results to arrive and closes the FeedClient. Don't call any other method after calling close().
- * Does not throw any exceptions.
- */
- @Override
- void close();
-
- /**
- * Returns stats about the cluster
- *
- * @return JSON string with information about cluster
- */
- String getStatsAsJson();
-
- /**
- * Utility function that takes an array of JSON documents and calls the FeedClient for each element.
- *
- * @param inputStream the stream to feed. This can be a very large stream.
- * The outer element must be an array of document operations.
- * @param feedClient the feed client that will receive the document operations
- * @param numSent increased per document sent to API (but not waiting for results)
- */
- static void feedJson(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) {
- JsonReader.read(inputStream, feedClient, numSent);
- }
-
- /**
- * Utility function that takes an array of XML documents and calls the FeedClient for each element.
- * The XML document has to be formatted with line space on each line (like "regular" XML, but stricter
- * than the specifications of XML).
- *
- * @param inputStream the stream to feed. This can be a very large stream. Operations must be enclosed in a
- * top-level &lt;vespafeed&gt; tag
- * @param feedClient the feed client that will receive the document operations
- * @param numSent increased per document sent to API (but not waiting for results)
- */
- static void feedXml(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) {
- try {
- XmlFeedReader.read(inputStream, feedClient, numSent);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * This callback is executed when new results are arriving or an error occur.
- * Don't do any heavy lifting in this thread (no IO, disk, or heavy CPU usage).
- * This call back will run in a different thread than your main program so use e.g.
- * AtomicInteger for counters and follow general guides for thread-safe programming.
- * There is an example implementation in class SimpleLoggerResultCallback.
- */
- interface ResultCallback {
-
- /**
- * This callback is always called exactly once for each feed operation passed to the client
- * instance, whether or not it was successful.
- */
- void onCompletion(String docId, Result documentResult);
-
- /**
- * Called with an exception whenever an endpoint specific error occurs during feeding.
- * The error may or may not be transient - the operation will in both cases be retried until it's successful.
- * This callback is intended for application level monitoring (logging, metrics, altering etc).
- * Document specific errors will be reported back through {@link #onCompletion(String, Result)}.
- *
- * @see FeedEndpointException
- * @param exception an exception specifying endpoint and cause. See {@link FeedEndpointException} for details.
- */
- default void onEndpointException(FeedEndpointException exception) {}
-
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java
deleted file mode 100644
index ce2f9e0b140..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.core.api.FeedClientImpl;
-
-import java.time.Clock;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-
-/**
- * Factory for creating FeedClient.
- *
- * @author dybis
- * @deprecated Vespa-http-client will be removed in Vespa 8. It's Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a>
- */
-@Deprecated
-public class FeedClientFactory {
-
- /**
- * Creates a FeedClient. Call this sparingly: Feed clients are expensive and should be as long-lived as possible.
- *
- * @param sessionParams parameters for connection, hosts, cluster configurations and more.
- * @param resultCallback on each result, this callback is called.
- * @return newly created FeedClient API object.
- */
- public static FeedClient create(SessionParams sessionParams, FeedClient.ResultCallback resultCallback) {
- return new FeedClientImpl(sessionParams, resultCallback, createTimeoutExecutor(), Clock.systemUTC());
- }
-
- static ScheduledThreadPoolExecutor createTimeoutExecutor() {
- ScheduledThreadPoolExecutor timeoutExecutor;
- timeoutExecutor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("timeout-"));
- timeoutExecutor.setRemoveOnCancelPolicy(true);
- timeoutExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- timeoutExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- return timeoutExecutor;
- }
-
- private static class DaemonThreadFactory implements ThreadFactory {
-
- private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
- private final String prefix;
-
- private DaemonThreadFactory(String prefix) {
- this.prefix = prefix;
- }
-
- @Override
- public Thread newThread(Runnable runnable) {
- Thread t = defaultThreadFactory.newThread(runnable);
- t.setDaemon(true);
- t.setName(prefix + t.getName());
- return t;
- }
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java
deleted file mode 100644
index 822a670ae08..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import com.yahoo.vespa.http.client.config.Endpoint;
-
-/**
- * An exception thrown when the client is unable to connect to a feed endpoint.
- *
- * @author bjorncs
- * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a>
- */
-@Deprecated
-public class FeedConnectException extends FeedEndpointException {
-
- public FeedConnectException(Throwable cause, Endpoint endpoint) {
- super(createMessage(cause, endpoint), cause, endpoint);
- }
-
- private static String createMessage(Throwable cause, Endpoint endpoint) {
- return String.format("Handshake to endpoint '%s:%d' failed: %s",
- endpoint.getHostname(),
- endpoint.getPort(),
- cause.getMessage());
- }
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
deleted file mode 100644
index 304e2ea321b..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import com.yahoo.vespa.http.client.config.Endpoint;
-
-/**
- * An exception type for endpoint specific errors.
- *
- * @see FeedConnectException
- * @see FeedProtocolException
- * @author bjorncs
- * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a>
- */
-@Deprecated
-public abstract class FeedEndpointException extends RuntimeException {
-
- private final Endpoint endpoint;
-
- protected FeedEndpointException(String message, Throwable cause, Endpoint endpoint) {
- super(message, cause);
- this.endpoint = endpoint;
- }
-
- public Endpoint getEndpoint() {
- return endpoint;
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java
deleted file mode 100644
index 93041cced1c..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import com.yahoo.vespa.http.client.config.Endpoint;
-
-/**
- * An exception thrown when a feed endpoint returns an error during feeding.
- *
- * @author bjorncs
- * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a>
- */
-@Deprecated
-public class FeedProtocolException extends FeedEndpointException {
-
- private final int httpStatusCode;
- private final String httpResponseMessage;
-
- public FeedProtocolException(int httpStatusCode,
- String httpResponseMessage,
- Throwable cause,
- Endpoint endpoint) {
- super(createMessage(httpStatusCode, httpResponseMessage, endpoint), cause, endpoint);
- this.httpStatusCode = httpStatusCode;
- this.httpResponseMessage = httpResponseMessage;
- }
-
- private static String createMessage(int httpStatusCode,
- String httpResponseMessage,
- Endpoint endpoint) {
- return String.format("Endpoint '%s:%d' returned an error on handshake: %d - %s",
- endpoint.getHostname(),
- endpoint.getPort(),
- httpStatusCode,
- httpResponseMessage);
- }
-
- public int getHttpStatusCode() {
- return httpStatusCode;
- }
-
- public String getHttpResponseMessage() {
- return httpResponseMessage;
- }
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java
deleted file mode 100644
index 5db592da02f..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java
+++ /dev/null
@@ -1,152 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.Exceptions;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The result of a stream operation. A Result refers to a single document,
- * but may contain more than one Result.Detail instances, as these pertains to a
- * single endpoint, and a Result may wrap data for multiple endpoints.
- *
- * @author Einar M R Rosenvinge
- * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a>
- */
-@Deprecated
-public class Result {
-
- public enum ResultType {
- OPERATION_EXECUTED,
- TRANSITIVE_ERROR,
- CONDITION_NOT_MET,
- FATAL_ERROR
- }
-
- private final Document document;
- private final boolean success;
- private final List<Detail> details;
- private final String localTrace;
-
- public Result(Document document, Collection<Detail> values, StringBuilder localTrace) {
- this.document = document;
- this.details = Collections.unmodifiableList(new ArrayList<>(values));
- this.success = details.stream().allMatch(d -> d.getResultType() == ResultType.OPERATION_EXECUTED);
- this.localTrace = localTrace == null ? null : localTrace.toString();
- }
-
- /** Returns the document id that this result is for */
- public String getDocumentId() {
- return document.getDocumentId();
- }
-
- /** Returns the id of the operation this is the result of */
- public String getOperationId() { return document.getOperationId(); }
-
- /** Returns the document data */
- public CharSequence getDocumentDataAsCharSequence() {
- return document.getDataAsString();
- }
-
- /** Returns the context of the object if any */
- public Object getContext() {
- return document.getContext();
- }
-
- /**
- * Returns true if the operation(s) was successful. If at least one {@link Detail}
- * in {@link #getDetails()} is unsuccessful, this will return false.
- */
- public boolean isSuccess() {
- return success;
- }
- public boolean isSuccessOrConditionNotMet() {
- return isSuccess() ||
- details.stream().allMatch(d -> d.getResultType() == Result.ResultType.OPERATION_EXECUTED ||
- d.getResultType() == Result.ResultType.CONDITION_NOT_MET);
- }
-
- public List<Detail> getDetails() { return details; }
-
- /** Returns whether the operation has been set up with local tracing */
- public boolean hasLocalTrace() {
- return localTrace != null;
- }
-
- /** Information in a Result for a single operation sent to a single endpoint. */
- public static final class Detail {
-
- private final ResultType resultType;
- private final Endpoint endpoint;
- private final Exception exception;
- private final String traceMessage;
-
- public Detail(Endpoint endpoint, ResultType resultType, String traceMessage, Exception e) {
- this.endpoint = endpoint;
- this.resultType = resultType;
- this.exception = e;
- this.traceMessage = traceMessage;
- }
-
- public Detail(Endpoint endpoint) {
- this.endpoint = endpoint;
- this.resultType = ResultType.OPERATION_EXECUTED;
- this.exception = null;
- this.traceMessage = null;
- }
-
- /**
- * Returns the endpoint from which the result was received,
- * or null if this failed before being assigned an endpoint
- */
- public Endpoint getEndpoint() {
- return endpoint;
- }
-
- /** Returns whether the operation was successful */
- public boolean isSuccess() {
- return resultType == ResultType.OPERATION_EXECUTED;
- }
-
- /** Returns the result of the operation */
- public ResultType getResultType() {
- return resultType;
- }
-
- /** Returns any exception related to this Detail, if unsuccessful. Might be null. */
- public Exception getException() {
- return exception;
- }
-
- /** Returns any trace message produces, or null if none */
- public String getTraceMessage() {
- return traceMessage;
- }
-
- @Override
- public String toString() {
- StringBuilder b = new StringBuilder();
- b.append("Detail ");
- b.append("resultType=").append(resultType);
- if (exception != null)
- b.append(" exception='").append(Exceptions.toMessageString(exception)).append("'");
- if (traceMessage != null && ! traceMessage.isEmpty())
- b.append(" trace='").append(traceMessage).append("'");
- if (endpoint != null)
- b.append(" endpoint=").append(endpoint);
- return b.toString();
- }
-
- }
-
- @Override
- public String toString() {
- return "Result for " + document + " " + (localTrace != null ? localTrace : "");
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Session.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Session.java
deleted file mode 100644
index 203f2d3b462..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Session.java
+++ /dev/null
@@ -1,65 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import java.io.OutputStream;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * A Session is an entity used to feed operations (like documents, removes or updates) to one Vespa
- * cluster or several clusters in parallel. Current implementations are fail-fast, i.e. all feeding
- * errors are propagated to the user as quickly as possible and with as much detail as possible.
- *
- * Implementations of this interface are required to be thread safe.
- *
- * A {@link SessionFactory} is provided to instantiate Sessions.
- *
- * @author Einar M R Rosenvinge
- * @see SessionFactory
- * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a>
- */
-@Deprecated
-public interface Session extends AutoCloseable {
-
- /**
- * Returns an OutputStream that can be used to write ONE operation, identified by the
- * given document ID. The data format must match the
- * {@link com.yahoo.vespa.http.client.config.FeedParams.DataFormat} given when this
- * Session was instantiated. Note that most data formats include the document ID in the
- * actual buffer, which <em>must</em> match the document ID given as a parameter to this
- * method. It is (as always) important to close the OutputStream returned - nothing
- * is written to the wire until this is done. Note also that the Session holds a certain,
- * dynamically determined maximum number of document operations in memory.
- * When this threshold is reached, {@link java.io.OutputStream#close()} will block.
- *
- *
- * @param documentId the unique ID identifying this operation in the system
- * @return an OutputStream to write the operation payload into
- */
- OutputStream stream(CharSequence documentId);
-
- /**
- * Returns {@link Result}s for all operations enqueued by {@link #stream(CharSequence)}.
- * Note that the order of results is non-deterministic, with <em>one</em> exception - results
- * for one document ID are returned in the order they were enqueued. In all other cases
- * Results may appear out-of-order.
- *
- * @return a blocking queue for retrieving results
- * @see Result
- */
- BlockingQueue<Result> results();
-
- /**
- * Closes this Session. All resources are freed, persistent connections are closed and
- * internal threads are stopped.
- *
- * @throws RuntimeException in cases where underlying resources throw on shutdown/close
- */
- void close();
-
- /**
- * Returns stats about the cluster.
- * @return JSON string with information about cluster.
- */
- String getStatsAsJson();
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java
deleted file mode 100644
index ef231ccc713..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.SessionParams;
-
-import java.time.Clock;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-
-/**
- * Factory for creating {@link Session} instances.
- *
- * @author Einar M R Rosenvinge
- * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a>
- */
-@Deprecated
-public final class SessionFactory {
-
- /**
- * Creates a {@link Session} with the given parameters.
- *
- * @param params the parameters to use when creating the Session.
- * @return a new Session instance
- */
- public static Session create(SessionParams params) {
- return createInternal(params);
- }
-
- @SuppressWarnings("deprecation")
- static Session createInternal(SessionParams params) {
- return new com.yahoo.vespa.http.client.core.api.SessionImpl(params, createTimeoutExecutor(), Clock.systemUTC());
- }
-
- static ScheduledThreadPoolExecutor createTimeoutExecutor() {
- ScheduledThreadPoolExecutor timeoutExecutor;
- timeoutExecutor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("timeout-"));
- timeoutExecutor.setRemoveOnCancelPolicy(true);
- timeoutExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- timeoutExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- return timeoutExecutor;
- }
-
- /**
- * Creates a {@link Session} to a single {@link Endpoint}, with default values for everything.
- * For full control of all parameters, or to feed to more than one Endpoint or more than one {@link Cluster},
- * see {@link #create(com.yahoo.vespa.http.client.config.SessionParams)}.
- *
- * @param endpoint the Endpoint to feed to.
- * @return a new Session instance
- * @see #create(com.yahoo.vespa.http.client.config.SessionParams)
- */
- public static Session create(Endpoint endpoint) {
- return createInternal(endpoint);
- }
-
- static Session createInternal(Endpoint endpoint) {
- SessionParams params = new SessionParams.Builder().addCluster(
- new Cluster.Builder().addEndpoint(endpoint).build()).build();
- return create(params);
- }
-
- private static class DaemonThreadFactory implements ThreadFactory {
- private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
- private final String prefix;
-
- public DaemonThreadFactory(String prefix) {
- this.prefix = prefix;
- }
-
- @Override
- public Thread newThread(Runnable runnable) {
- Thread t = defaultThreadFactory.newThread(runnable);
- t.setDaemon(true);
- t.setName(prefix + t.getName());
- return t;
- }
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SimpleLoggerResultCallback.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SimpleLoggerResultCallback.java
deleted file mode 100644
index e03bfd2b816..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SimpleLoggerResultCallback.java
+++ /dev/null
@@ -1,113 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Date;
-import java.util.Locale;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Simple implementation of the ResultCallback that logs to std err for every X documents:
- * "Result received: 34 (1 failed so far, 2003 sent, success rate 1999.23 docs/sec)."
- * On each failure it will print the Result object content. If tracing is enabled, it will print trace messages to
- * std err as well.
- *
- * @author dybis
- * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a>
- */
-@Deprecated
-public class SimpleLoggerResultCallback implements FeedClient.ResultCallback {
-
- private final Object monitor = new Object();
- private int resultCounter = 0;
- private int failureCounter = 0;
- private final AtomicInteger sentDocumentCounter;
- private final int printStatsForEveryXDocument;
- private final boolean ignoreConditionNotMet;
- private Instant startSampleInstant = Instant.now();
- private int startSampleResultCount = 0;
-
- protected void println(String output) {
- System.err.println(output);
- }
-
- /**
- * Constructor
- *
- * @param sentDocumentCounter a counter that is increased outside this class, but can be nice to print here.
- * @param printStatsForEveryXDocument how often to print stats.
- */
- public SimpleLoggerResultCallback(AtomicInteger sentDocumentCounter, int printStatsForEveryXDocument, boolean ignoreConditionNotMet) {
- this.sentDocumentCounter = sentDocumentCounter;
- this.printStatsForEveryXDocument = printStatsForEveryXDocument;
- this.ignoreConditionNotMet = ignoreConditionNotMet;
- }
-
- /**
- * Prints how many documents that are received, failed and sent.
- */
- public void printProgress() {
- synchronized (monitor) {
- DocumentRate docRate = newSamplingPeriod(Instant.now());
- println(new Date() + " Result received: " + resultCounter
- + " (" + failureCounter + " failed so far, " + sentDocumentCounter.get()
- + " sent, success rate " + docRate + ").");
- }
- }
-
- static class DocumentRate {
- public final double rate;
- DocumentRate(double rate) {
- this.rate = rate;
- }
- @Override
- public String toString() {
- return String.format(Locale.US, "%.2f docs/sec", rate);
- }
- }
-
- /*
- * Returns success results per second for last interval and resets variables.
- */
- protected DocumentRate newSamplingPeriod(Instant now) {
- double docsDelta = resultCounter - failureCounter - startSampleResultCount;
- Duration duration = Duration.between(startSampleInstant, now);
- startSampleInstant = now;
- startSampleResultCount = resultCounter - failureCounter;
- long durationMilliSecs = duration.toMillis() + 1; // +1 to avoid division by zero
- return new DocumentRate(1000. * docsDelta / durationMilliSecs);
- }
-
- int getResultCount() {
- synchronized (monitor) {
- return resultCounter;
- }
- }
-
- int getFailedDocumentCount() {
- synchronized (monitor) {
- return failureCounter;
- }
- }
-
- @Override
- public void onCompletion(String docId, Result documentResult) {
- synchronized (monitor) {
- if (printStatsForEveryXDocument > 0 && (resultCounter % printStatsForEveryXDocument) == 0) {
- printProgress();
- }
- resultCounter++;
- boolean success = ignoreConditionNotMet
- ? documentResult.isSuccessOrConditionNotMet()
- : documentResult.isSuccess();
- if ( ! success ) {
- failureCounter++;
- println("Failure: " + documentResult + (documentResult.getDetails().isEmpty() ? "" : ":"));
- for (Result.Detail detail : documentResult.getDetails())
- println(" " + detail);
- }
- }
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java
deleted file mode 100644
index 2fc63bbbcc5..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java
+++ /dev/null
@@ -1,193 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client;
-
-import com.yahoo.vespa.http.client.config.SessionParams;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.ConcurrentModificationException;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * A utility wrapper of a FeedClient which feeds a list of documents and blocks until all responses are returned,
- * before returning the results.
- *
- * Not multithread safe: A sync feed client instance can only be used by a single thread
- * (but it can and should be reused for multiple subsequent synchronous calls).
- *
- * @author bratseth
- * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a>
- */
-@Deprecated
-public class SyncFeedClient implements AutoCloseable {
-
- private final FeedClient wrappedClient;
- private final Callback callback;
-
- public SyncFeedClient(SessionParams params) {
- callback = new SyncFeedClient.Callback();
- if (params.getFeedParams().getIdlePollFrequency() == null) {
- params = params.toBuilder()
- .setFeedParams(params.getFeedParams().toBuilder()
- .setIdlePollFrequency(500.0)
- .build())
- .build();
- }
- this.wrappedClient = FeedClientFactory.create(params, callback);
- }
-
- /**
- * Calls FeedClient.stream for each entry in the list, blocks until all results are ready and returns them.
- * This will block for at most the time it takes to feed these operations + clientTimeout given in the
- * sessions params when creating this.
- *
- * @param operations the Vespa write operations to stream
- * @return the result of feeding all these operations
- */
- public SyncResult stream(List<SyncOperation> operations) {
- callback.expectResultsOf(operations);
- for (SyncOperation operation : operations)
- wrappedClient.stream(operation.documentId, operation.operationId, operation.documentData, operation.context);
- return callback.waitForResults();
- }
-
- @Override
- public void close() {
- wrappedClient.close();
- }
-
- /** Holds the arguments to a single stream operation */
- public static class SyncOperation {
-
- private final String documentId;
- private final CharSequence documentData;
- private final Object context;
-
- /** Operation id passed on to the Document created from this */
- private final String operationId;
-
- public SyncOperation(String documentId, CharSequence documentData) {
- this(documentId, documentData, null);
- }
-
- public SyncOperation(String documentId, CharSequence documentData, Object context) {
- this(documentId, documentData, new BigInteger(64, ThreadLocalRandom.current()).toString(32), context);
- }
-
- public SyncOperation(String documentId, CharSequence documentData, String operationId, Object context) {
- this.documentId = Objects.requireNonNull(documentId, "documentId");
- this.documentData = Objects.requireNonNull(documentData, "documentData");
- this.context = context;
- this.operationId = Objects.requireNonNull(operationId);
- }
-
- }
-
- /**
- * The result of a SyncFeedClient.stream call. This always holds exactly one Result per SyncOperation
- * attempted, and the results are guaranteed to be returned in the same order as in the List of SyncOperations.
- */
- public static class SyncResult {
-
- private final Exception exception;
- private final List<Result> results;
-
- private SyncResult(List<Result> results, Exception exception) {
- this.results = results;
- this.exception = exception;
- }
-
- /**
- * Returns the results of this. This has the same size and order as the List of SyncOperations that
- * created this. The list returned is modifiable and owned by the client. Multiple calls to this returns the
- * same list instance.
- */
- public List<Result> results() { return results; }
-
- /**
- * Returns the last exception received when attempting the operations this is the result of, or null if none.
- * Even if there is an exception, results() will return one Result per operation attempted.
- */
- public Exception exception() { return exception; }
-
- /** Returns true if all Results in this are successful */
- public boolean isSuccess() {
- return results.stream().allMatch(Result::isSuccess);
- }
-
- }
-
- private static class Callback implements FeedClient.ResultCallback {
-
- private final Object monitor = new Object();
-
- // The rest of the state of this is reset each time we call expectResultsOf
-
- private int resultsReceived;
- private Exception exception = null;
-
- /**
- * A map from operation ids to their results. This is initially populated with null values to keep track of
- * which responses we are waiting for.
- */
- private LinkedHashMap<String, Result> results = null;
-
- void expectResultsOf(List<SyncOperation> operations) {
- synchronized (monitor) {
- if (results != null)
- throw new ConcurrentModificationException("A SyncFeedClient instance is used by multiple threads");
-
- resultsReceived = 0;
- exception = null;
- results = new LinkedHashMap<>(operations.size());
- for (SyncOperation operation : operations)
- results.put(operation.operationId, null);
- }
- }
-
- SyncResult waitForResults() {
- try {
- synchronized (monitor) {
- while ( ! complete())
- monitor.wait();
-
- SyncResult syncResult = new SyncResult(new ArrayList<>(results.values()), exception);
- results = null;
- return syncResult;
- }
- }
- catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for feeding results", e);
- }
- }
-
- @Override
- public void onCompletion(String docId, Result documentResult) {
- synchronized (monitor) {
- if ( ! results.containsKey(documentResult.getOperationId())) return; // Stale result - ignore
-
- Result previousValue = results.put(documentResult.getOperationId(), documentResult);
- if (previousValue != null)
- throw new IllegalStateException("Received duplicate result for " + docId);
-
- resultsReceived++;
- if (complete())
- monitor.notifyAll();
- }
- }
-
- @Override
- public void onEndpointException(FeedEndpointException exception) {
- this.exception = exception; // We will still receive one onCompletion per stream invocation done
- }
-
- private boolean complete() {
- return resultsReceived == results.size();
- }
-
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java
deleted file mode 100644
index 4f72c380e59..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java
+++ /dev/null
@@ -1,72 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.config;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * A set of {@link Endpoint} instances. Construct using {@link Cluster.Builder}.
- *
- * @author Einar M R Rosenvinge
- */
-public final class Cluster {
-
- /** Builder for {@link Cluster}. */
- public final static class Builder {
- private final List<Endpoint> endpoints = new LinkedList<>();
- private String route = null;
-
- /**
- * Adds an Endpoint (a HTTP gateway) to this Cluster.
- *
- * @param endpoint the Endpoint to add
- * @return this, for chaining
- */
- public Builder addEndpoint(Endpoint endpoint) {
- endpoints.add(endpoint);
- return this;
- }
-
- /**
- * Sets a route specific to this cluster, which overrides the route set in {@link com.yahoo.vespa.http.client.config.FeedParams#getRoute()}.
- *
- * @param route a route specific to this cluster
- * @return this, for chaining
- */
- public Builder setRoute(String route) {
- this.route = route;
- return this;
- }
-
- public Cluster build() {
- return new Cluster(endpoints, route);
- }
-
- public String getRoute() {
- return route;
- }
- }
- private final List<Endpoint> endpoints;
- private final String route;
-
- private Cluster(List<Endpoint> endpoints, String route) {
- this.endpoints = Collections.unmodifiableList(new ArrayList<>(endpoints));
- this.route = route;
- }
-
- public List<Endpoint> getEndpoints() {
- return endpoints;
- }
-
- public String getRoute() {
- return route;
- }
-
- @Override
- public String toString() {
- return "cluster with endpoints " + endpoints + " and route '" + route + "'";
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
deleted file mode 100644
index 00cc2512ae3..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
+++ /dev/null
@@ -1,486 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.config;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.SSLContext;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Connection level parameters.
- * This class is immutable
- * and has no public constructor - to instantiate one, use a {@link Builder}.
- *
- * @author Einar M R Rosenvinge
- */
-public final class ConnectionParams {
-
- /**
- * Builder for {@link ConnectionParams}.
- */
- public static final class Builder {
-
- private SSLContext sslContext = null;
- private HostnameVerifier hostnameVerifier = SSLConnectionSocketFactory.getDefaultHostnameVerifier();
- private final Multimap<String, String> headers = ArrayListMultimap.create();
- private final Map<String, HeaderProvider> headerProviders = new HashMap<>();
- private int numPersistentConnectionsPerEndpoint = 1;
- private String proxyHost = null;
- private int proxyPort = 8080;
- private boolean useCompression = false;
- private int maxRetries = 100;
- private long minTimeBetweenRetriesMs = 700;
- private boolean dryRun = false;
- private boolean runThreads = true;
- private int traceLevel = 0;
- private int traceEveryXOperation = 0;
- private boolean printTraceToStdErr = true;
- private boolean useTlsConfigFromEnvironment = false;
- private Duration connectionTimeToLive = Duration.ofSeconds(30);
- private Path privateKey;
- private Path certificate;
- private Path caCertificates;
-
- /**
- * Use TLS configuration through the standard Vespa environment variables.
- * Setting this to 'true' will override any other TLS/HTTPS related configuration.
- */
- public Builder setUseTlsConfigFromEnvironment(boolean useTlsConfigFromEnvironment) {
- this.useTlsConfigFromEnvironment = useTlsConfigFromEnvironment;
- return this;
- }
-
- /**
- * Sets the SSLContext for the connection to the gateway when SSL is enabled for Endpoint.
- * Default null (no ssl). See also Endpoint configuration.
- *
- * @param sslContext sslContext for connection to gateway.
- * @return pointer to builder.
- */
- public Builder setSslContext(SSLContext sslContext) {
- this.sslContext = sslContext;
- return this;
- }
-
- /**
- * Sets the {@link HostnameVerifier} for the connection to the gateway when SSL is enabled for Endpoint.
- * Defaults to instance returned by {@link SSLConnectionSocketFactory#getDefaultHostnameVerifier()}.
- *
- * @param hostnameVerifier hostname verifier for connection to gateway.
- * @return pointer to builder.
- */
- public Builder setHostnameVerifier(HostnameVerifier hostnameVerifier) {
- this.hostnameVerifier = hostnameVerifier;
- return this;
- }
-
- /**
- * Set path to private key and certificate files. Both the private key and certificate must be PEM-encoded.
- */
- public Builder setCertificateAndPrivateKey(Path privateKey, Path certificate) {
- this.privateKey = privateKey;
- this.certificate = certificate;
- return this;
- }
-
- /**
- * Set path a PEM file containing the CA certificates.
- */
- public Builder setCaCertificates(Path caCertificates) {
- this.caCertificates = caCertificates;
- return this;
- }
-
- /**
- * Set custom headers to be used
- *
- * @param key header name
- * @param value header value
- * @return pointer to builder.
- */
- public Builder addHeader(String key, String value) {
- headers.put(key, value);
- return this;
- }
-
- /**
- * Adds a header provider for dynamic headers; headers where the value may change during a feeding session
- * (e.g. security tokens with limited life time). Only one {@link HeaderProvider} is allowed for a given header name.
- *
- * @param provider A provider for a dynamic header
- * @return pointer to builder.
- * @throws IllegalArgumentException if a provider is already registered for the given header name
- */
- public Builder addDynamicHeader(String headerName, HeaderProvider provider) {
- Objects.requireNonNull(headerName, "Header name cannot be null");
- Objects.requireNonNull(provider, "Header provider cannot be null");
- if (headerProviders.containsKey(headerName)) {
- throw new IllegalArgumentException("Provider already registered for name '" + headerName + "'");
- }
- headerProviders.put(headerName, provider);
- return this;
- }
-
- /**
- * The number of connections between the http client and the gateways. A very low number can result
- * in the network not fully utilized and the round-trip time can be a limiting factor. A low number
- * can cause skew in distribution of load between gateways. A too high number will cause
- * many threads to run, more context switching and potential more memory usage. We recommend using about
- * 16 connections per gateway.
- *
- * @param numPersistentConnectionsPerEndpoint number of channels per endpoint
- * @return pointer to builder.
- */
- public Builder setNumPersistentConnectionsPerEndpoint(int numPersistentConnectionsPerEndpoint) {
- this.numPersistentConnectionsPerEndpoint = numPersistentConnectionsPerEndpoint;
- return this;
- }
-
- /**
- * Sets the HTTP proxy host name to use.
- *
- * @param proxyHost host name for proxy.
- * @return pointer to builder.
- */
- public Builder setProxyHost(String proxyHost) {
- this.proxyHost = proxyHost;
- return this;
- }
-
- /**
- * Sets the HTTP proxy host port to use.
- *
- * @param proxyPort host port for proxy.
- * @return pointer to builder.
- */
- public Builder setProxyPort(int proxyPort) {
- this.proxyPort = proxyPort;
- return this;
- }
-
- /**
- * Set whether compression of document operations during communication to server should be enabled.
- *
- * @param useCompression true if compression should be enabled.
- * @return pointer to builder.
- */
- public Builder setUseCompression(boolean useCompression) {
- this.useCompression = useCompression;
- return this;
- }
-
- /**
- * Set how many times to retry sending an operation to a gateway when encountering transient problems.
- *
- * @param maxRetries max number of retries
- * @return pointer to builder.
- */
- public Builder setMaxRetries(int maxRetries) {
- this.maxRetries = maxRetries;
- return this;
- }
-
- /**
- * Set to true to skip making network connections and instead
- * let requests complete successfully with no effect.
- */
- public Builder setDryRun(boolean dryRun) {
- this.dryRun = dryRun;
- return this;
- }
-
- /**
- * Set to false to skip starting io threads, such that any operation must be driven by a calling thread.
- * Useful for testing.
- */
- public Builder setRunThreads(boolean runThreads) {
- this.runThreads = runThreads;
- return this;
- }
-
- /**
- * Set the min time between retries when temporarily failing against a gateway.
- *
- * @param minTimeBetweenRetries the min time value
- * @param unit the unit of the min time.
- * @return pointer to builder.
- */
- public Builder setMinTimeBetweenRetries(long minTimeBetweenRetries, TimeUnit unit) {
- this.minTimeBetweenRetriesMs = unit.toMillis(minTimeBetweenRetries);
- return this;
- }
-
- public long getMinTimeBetweenRetriesMs() {
- return minTimeBetweenRetriesMs;
- }
-
- /**
- * Sets the trace level for tracing messagebus. 0 means to tracing.
- *
- * @param traceLevel tracelevel, larger value means more tracing.
- * @return pointer to builder.
- */
- public Builder setTraceLevel(int traceLevel) {
- this.traceLevel = traceLevel;
- return this;
- }
-
- /**
- * How often to trace messages in client. Please note that this does not affect tracing with messagebus
- *
- * @param traceEveryXOperation if zero, no tracing, 1 = every message, and so on.
- * @return pointer to builder.
- */
- public Builder setTraceEveryXOperation(int traceEveryXOperation) {
- this.traceEveryXOperation = traceEveryXOperation;
- return this;
- }
-
- /**
- * If enabled will write internal trace to stderr.
- *
- * @param printTraceToStdErr if value is true it is enabled.
- * @return pointer to builder.
- */
- public Builder setPrintTraceToStdErr(boolean printTraceToStdErr) {
- this.printTraceToStdErr = printTraceToStdErr;
- return this;
- }
-
- /**
- * Set the maximum time to live for persistent connections
- */
- public Builder setConnectionTimeToLive(Duration connectionTimeToLive) {
- this.connectionTimeToLive = connectionTimeToLive;
- return this;
- }
-
- public ConnectionParams build() {
- return new ConnectionParams(
- sslContext,
- privateKey,
- certificate,
- caCertificates,
- hostnameVerifier,
- headers,
- headerProviders,
- numPersistentConnectionsPerEndpoint,
- proxyHost,
- proxyPort,
- useCompression,
- maxRetries,
- minTimeBetweenRetriesMs,
- dryRun,
- runThreads,
- traceLevel,
- traceEveryXOperation,
- printTraceToStdErr,
- useTlsConfigFromEnvironment,
- connectionTimeToLive);
- }
-
- public int getNumPersistentConnectionsPerEndpoint() {
- return numPersistentConnectionsPerEndpoint;
- }
-
- public String getProxyHost() {
- return proxyHost;
- }
-
- public boolean isDryRun() {
- return dryRun;
- }
-
- public boolean runThreads() { return runThreads; }
-
- public int getMaxRetries() {
- return maxRetries;
- }
- public int getTraceLevel() {
- return traceLevel;
- }
- public int getTraceEveryXOperation() {
- return traceEveryXOperation;
- }
-
- public boolean getPrintTraceToStdErr() {
- return printTraceToStdErr;
- }
-
- public int getProxyPort() {
- return proxyPort;
- }
-
- public SSLContext getSslContext() {
- return sslContext;
- }
-
- public HostnameVerifier getHostnameVerifier() {
- return hostnameVerifier;
- }
-
- public boolean useTlsConfigFromEnvironment() {
- return useTlsConfigFromEnvironment;
- }
-
- public Duration getConnectionTimeToLive() {
- return connectionTimeToLive;
- }
- public Path getPrivateKey() { return privateKey; }
- public Path getCertificate() { return certificate; }
- public Path getCaCertificates() { return caCertificates; }
- }
-
- private final SSLContext sslContext;
- private final Path privateKey;
- private final Path certificate;
- private final Path caCertificates;
- private final HostnameVerifier hostnameVerifier;
- private final Multimap<String, String> headers = ArrayListMultimap.create();
- private final Map<String, HeaderProvider> headerProviders = new HashMap<>();
- private final int numPersistentConnectionsPerEndpoint;
- private final String proxyHost;
- private final int proxyPort;
- private final boolean useCompression;
- private final int maxRetries;
- private final long minTimeBetweenRetriesMs;
- private final boolean dryRun;
- private final boolean runThreads;
- private final int traceLevel;
- private final int traceEveryXOperation;
- private final boolean printTraceToStdErr;
- private final boolean useTlsConfigFromEnvironment;
- private final Duration connectionTimeToLive;
-
- private ConnectionParams(
- SSLContext sslContext,
- Path privateKey, Path certificate, Path caCertificates,
- HostnameVerifier hostnameVerifier,
- Multimap<String, String> headers,
- Map<String, HeaderProvider> headerProviders,
- int numPersistentConnectionsPerEndpoint,
- String proxyHost,
- int proxyPort,
- boolean useCompression,
- int maxRetries,
- long minTimeBetweenRetriesMs,
- boolean dryRun,
- boolean runThreads,
- int traceLevel,
- int traceEveryXOperation,
- boolean printTraceToStdErr,
- boolean useTlsConfigFromEnvironment,
- Duration connectionTimeToLive) {
- this.sslContext = sslContext;
- this.privateKey = privateKey;
- this.certificate = certificate;
- this.caCertificates = caCertificates;
- this.hostnameVerifier = hostnameVerifier;
- this.useTlsConfigFromEnvironment = useTlsConfigFromEnvironment;
- this.connectionTimeToLive = connectionTimeToLive;
- this.headers.putAll(headers);
- this.headerProviders.putAll(headerProviders);
- this.numPersistentConnectionsPerEndpoint = numPersistentConnectionsPerEndpoint;
- this.proxyHost = proxyHost;
- this.proxyPort = proxyPort;
- this.useCompression = useCompression;
- this.maxRetries = maxRetries;
- this.minTimeBetweenRetriesMs = minTimeBetweenRetriesMs;
- this.dryRun = dryRun;
- this.runThreads = runThreads;
- this.traceLevel = traceLevel;
- this.traceEveryXOperation = traceEveryXOperation;
- this.printTraceToStdErr = printTraceToStdErr;
- }
-
- @JsonIgnore
- public SSLContext getSslContext() {
- return sslContext;
- }
-
- @JsonIgnore
- public HostnameVerifier getHostnameVerifier() {
- return hostnameVerifier;
- }
-
- public Collection<Map.Entry<String, String>> getHeaders() {
- return Collections.unmodifiableCollection(headers.entries());
- }
- @JsonIgnore
- public Map<String, HeaderProvider> getDynamicHeaders() {
- return Collections.unmodifiableMap(headerProviders);
- }
-
- public int getNumPersistentConnectionsPerEndpoint() {
- return numPersistentConnectionsPerEndpoint;
- }
-
- public String getProxyHost() {
- return proxyHost;
- }
-
- public int getProxyPort() {
- return proxyPort;
- }
-
- public boolean getUseCompression() {
- return useCompression;
- }
-
- public int getMaxRetries() {
- return maxRetries;
- }
-
- public long getMinTimeBetweenRetriesMs() {
- return minTimeBetweenRetriesMs;
- }
-
- public boolean isDryRun() {
- return dryRun;
- }
-
- public boolean runThreads() { return runThreads; }
-
- public int getTraceLevel() {
- return traceLevel;
- }
-
- public int getTraceEveryXOperation() {
- return traceEveryXOperation;
- }
-
- public boolean getPrintTraceToStdErr() {
- return printTraceToStdErr;
- }
-
- public boolean useTlsConfigFromEnvironment() {
- return useTlsConfigFromEnvironment;
- }
-
- public Duration getConnectionTimeToLive() {
- return connectionTimeToLive;
- }
-
- /**
- * A header provider that provides a header value. {@link #getHeaderValue()} is called each time a new HTTP request
- * is constructed by {@link com.yahoo.vespa.http.client.FeedClient}.
- *
- * Important: The implementation of {@link #getHeaderValue()} must be thread-safe!
- */
- public interface HeaderProvider { String getHeaderValue(); }
-
- public Path getPrivateKey() { return privateKey; }
- public Path getCertificate() { return certificate; }
- public Path getCaCertificates() { return caCertificates; }
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java
deleted file mode 100644
index ae0cf19a3d1..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.config;
-
-import java.io.Serializable;
-import java.net.URL;
-
-/**
- * Represents an endpoint, in most cases a JDisc container
- * in a Vespa cluster configured with <code>document-api</code>.
- *
- * @author Einar M R Rosenvinge
- */
-public final class Endpoint implements Serializable {
-
- private static final int DEFAULT_PORT = 4080;
-
- private final String hostname;
- private final int port;
- private final boolean useSsl;
-
- private Endpoint(String hostname, int port, boolean useSsl) {
- if (hostname.startsWith("https://")) {
- throw new RuntimeException("Hostname should be name of machine, not prefixed with protocol (https://)");
- }
- // A lot of people put http:// before the servername, let us allow that.
- if (hostname.startsWith("http://")) {
- this.hostname = hostname.replaceFirst("http://", "");
- } else {
- this.hostname = hostname;
- }
- this.port = port;
- this.useSsl = useSsl;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public int getPort() {
- return port;
- }
-
- public boolean isUseSsl() {
- return useSsl;
- }
-
- @Override
- public String toString() {
- return hostname + ":" + port + " ssl=" + useSsl;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof Endpoint)) return false;
- Endpoint endpoint = (Endpoint) o;
- return port == endpoint.port && useSsl == endpoint.useSsl && hostname.equals(endpoint.hostname);
- }
-
- @Override
- public int hashCode() {
- int result = hostname.hashCode();
- result = 31 * result + port;
- result = 31 * result + (useSsl ? 1 : 0);
- return result;
- }
-
- /** Creates an Endpoint with the default port and without using SSL */
- public static Endpoint create(String hostname) {
- return new Endpoint(hostname, DEFAULT_PORT, false);
- }
-
- /** Creates an Endpoint with the given hostname, port and SSL setting. */
- public static Endpoint create(String hostname, int port, boolean useSsl) {
- return new Endpoint(hostname, port, useSsl);
- }
-
- public static Endpoint create(URL url) {
- return new Endpoint(url.getHost(), url.getPort(), "https".equals(url.getProtocol()));
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java
deleted file mode 100644
index 01f314a7e36..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java
+++ /dev/null
@@ -1,332 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.config;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Feed level parameters. This class is immutable
- * and has no public constructor - to instantiate one, use a {@link Builder}.
-
- * @author Einar M R Rosenvinge
- */
-public final class FeedParams {
-
- public boolean getDenyIfBusyV3() { return denyIfBusyV3; }
-
- public long getMaxSleepTimeMs() { return maxSleepTimeMs; }
-
- public boolean getSilentUpgrade() { return silentUpgrade; }
-
- /**
- * Enumeration of data formats that are acceptable by the
- * {@link com.yahoo.vespa.http.client.FeedClient} methods.
- */
- public enum DataFormat {
- /** UTF-8-encoded XML. Preamble is not necessary. */
- XML_UTF8,
- JSON_UTF8
- }
- /**
- * Mutable class used to instantiate a {@link FeedParams}.
- */
- public static final class Builder {
-
- private DataFormat dataFormat = DataFormat.JSON_UTF8;
- private long serverTimeout = TimeUnit.SECONDS.toMillis(180);
- private long clientTimeout = TimeUnit.SECONDS.toMillis(20);
- private String route = null;
- private int maxChunkSizeBytes = 50 * 1024;
- private int maxInFlightRequests = 5000;
- private long localQueueTimeOut = 180000;
- private String priority = null;
- private boolean denyIfBusyV3 = true;
- private long maxSleepTimeMs = 3000;
- private boolean silentUpgrade = true;
- private Double idlePollFrequency = null;
-
- /**
- * Make server not throw 4xx/5xx for situations that are normal during upgrade as this can esily mask
- * other problems. This feature need to be supported on server side to work, but it is still safe
- * to enable it, even if server does not yet support it. As of Nov 22 2016 it is not yet implemented on
- * the server side.
- * @param silentUpgrade true for reducing "false" 4xx/5xx.
- * @return this, for chaining
- */
- public Builder setSilentUpgrade(boolean silentUpgrade) {
- this.silentUpgrade = silentUpgrade;
- return this;
- }
-
- /**
- * When throttling the load due to transient errors on gateway, what is the most time to wait between
- * requests per thread. Only active for V3 protocol.
- * @param ms max with time
- * @return this, for chaining
- */
- public Builder setMaxSleepTimeMs(long ms) {
- this.maxSleepTimeMs = ms;
- return this;
- }
-
- /**
- * If this is set to false, the gateway will block threads until messagebus can send the message.
- * If true, the gateway will exit and fail the request early if there are many threads already
- * blocked.
- * @param value true to reduce number of blocked threads in gateway.
- * @return this, for chaining
- */
- public Builder setDenyIfBusyV3(boolean value) {
- denyIfBusyV3 = value;
- return this;
- }
-
- /**
- * Sets the data format to be used.
- *
- * @param dataFormat the data format to be used.
- * @see DataFormat
- * @return this, for chaining
- */
- public Builder setDataFormat(DataFormat dataFormat) {
- this.dataFormat = dataFormat;
- return this;
- }
-
- /**
- * Sets a route to be used for all Clusters, unless overridden on a per-cluster basis
- * in {@link com.yahoo.vespa.http.client.config.Cluster#getRoute()}.
- *
- * @param route a route to be used for all Clusters.
- * @return this, for chaining
- */
- public Builder setRoute(String route) {
- this.route = route;
- return this;
- }
-
- /**
- * Sets the server-side timeout of each operation - i.e. the timeout used by
- * the server endpoint for operations going over the message bus protocol into
- * Vespa.
- *
- * Note that the TOTAL timeout of any one operation in this API would be
- * {@link #getServerTimeout(java.util.concurrent.TimeUnit)} +
- * {@link #getClientTimeout(java.util.concurrent.TimeUnit)}.
- *
- * @param serverTimeout timeout value
- * @param unit unit of timeout value
- * @return this, for chaining
- */
- public Builder setServerTimeout(long serverTimeout, TimeUnit unit) {
- if (serverTimeout <= 0L) {
- throw new IllegalArgumentException("Server timeout cannot be zero or negative.");
- }
- this.serverTimeout = unit.toMillis(serverTimeout);
- return this;
- }
-
- /**
- * Sets the client-side timeout for each operation.&nbsp;If BOTH the server-side
- * timeout AND this timeout has passed, the {@link com.yahoo.vespa.http.client.FeedClient}
- * will synthesize a {@link com.yahoo.vespa.http.client.Result}.
- *
- * Note that the TOTAL timeout of any one operation in this API would be
- * {@link #getServerTimeout(java.util.concurrent.TimeUnit)} +
- * {@link #getClientTimeout(java.util.concurrent.TimeUnit)},
- * after which a result callback is guaranteed to be made.
- *
- * @param clientTimeout timeout value
- * @param unit unit of timeout value
- * @return this, for chaining
- */
- public Builder setClientTimeout(long clientTimeout, TimeUnit unit) {
- if (clientTimeout <= 0L) {
- throw new IllegalArgumentException("Client timeout cannot be zero or negative.");
- }
- this.clientTimeout = unit.toMillis(clientTimeout);
- return this;
- }
-
- /**
- * Sets the maximum number of bytes of document data to send per HTTP request.
- *
- * @param maxChunkSizeBytes max number of bytes per HTTP request.
- * @return this, for chaining
- */
- public Builder setMaxChunkSizeBytes(int maxChunkSizeBytes) {
- this.maxChunkSizeBytes = maxChunkSizeBytes;
- return this;
- }
-
- /**
- * Sets the maximum number of operations to be in-flight.
- *
- * @param maxInFlightRequests max number of operations.
- * @return this, for chaining
- */
- public Builder setMaxInFlightRequests(int maxInFlightRequests) {
- this.maxInFlightRequests = maxInFlightRequests;
- return this;
- }
-
- /**
- * Sets the number of milliseconds until we respond with a timeout for a document operation
- * if we still have not received a response.
- */
- public Builder setLocalQueueTimeOut(long timeOutMs) {
- this.localQueueTimeOut = timeOutMs;
- return this;
- }
-
- /**
- * Set what frequency to poll for async responses. Default is 10hz (every 0.1s), but 1000hz when using SyncFeedClient
- */
- public Builder setIdlePollFrequency(Double idlePollFrequency) {
- this.idlePollFrequency = idlePollFrequency;
- return this;
- }
-
- /**
- * Sets the messagebus priority. The allowed values are HIGHEST, VERY_HIGH, HIGH_[1-3],
- * NORMAL_[1-6], LOW_[1-3], VERY_LOW, and LOWEST..
- * @param priority messagebus priority of this message.
- * @return this, for chaining
- */
- public Builder setPriority(String priority) {
- if (priority == null) {
- return this;
- }
- switch (priority) {
- case "HIGHEST":
- case "VERY_HIGH":
- case "HIGH_1":
- case "HIGH_2":
- case "HIGH_3":
- case "NORMAL_1":
- case "NORMAL_2":
- case "NORMAL_3":
- case "NORMAL_4":
- case "NORMAL_5":
- case "NORMAL_6":
- case "LOW_1":
- case "LOW_2":
- case "LOW_3":
- case "VERY_LOW":
- case "LOWEST":
- this.priority = priority;
- return this;
- default:
- throw new IllegalArgumentException("Unknown value for priority: " + priority
- + " Allowed values are HIGHEST, VERY_HIGH, HIGH_[1-3], " +
- "NORMAL_[1-6], LOW_[1-3], VERY_LOW, and LOWEST.");
- }
- }
-
- /**
- * Instantiates a {@link FeedParams}.
- *
- * @return a FeedParams object with the parameters of this Builder
- */
- public FeedParams build() {
- return new FeedParams(
- dataFormat, serverTimeout, clientTimeout, route,
- maxChunkSizeBytes, maxInFlightRequests, localQueueTimeOut, priority,
- denyIfBusyV3, maxSleepTimeMs, silentUpgrade, idlePollFrequency);
- }
-
- public long getClientTimeout(TimeUnit unit) {
- return unit.convert(clientTimeout, TimeUnit.MILLISECONDS);
- }
-
- public long getServerTimeout(TimeUnit unit) {
- return unit.convert(serverTimeout, TimeUnit.MILLISECONDS);
- }
-
- public String getRoute() {
- return route;
- }
-
- public DataFormat getDataFormat() {
- return dataFormat;
- }
-
- public int getMaxChunkSizeBytes() {
- return maxChunkSizeBytes;
- }
-
- public int getMaxInFlightRequests() {
- return maxInFlightRequests;
- }
-
- }
-
- // NOTE! See toBuilder at the end of this class if you add fields here
-
- private final DataFormat dataFormat;
- private final long serverTimeoutMillis;
- private final long clientTimeoutMillis;
- private final String route;
- private final int maxChunkSizeBytes;
- private final int maxInFlightRequests;
- private final long localQueueTimeOut;
- private final String priority;
- private final boolean denyIfBusyV3;
- private final long maxSleepTimeMs;
- private final boolean silentUpgrade;
- private final Double idlePollFrequency;
-
- private FeedParams(DataFormat dataFormat, long serverTimeout, long clientTimeout, String route,
- int maxChunkSizeBytes, final int maxInFlightRequests,
- long localQueueTimeOut, String priority, boolean denyIfBusyV3, long maxSleepTimeMs,
- boolean silentUpgrade, Double idlePollFrequency) {
- this.dataFormat = dataFormat;
- this.serverTimeoutMillis = serverTimeout;
- this.clientTimeoutMillis = clientTimeout;
- this.route = route;
- this.maxChunkSizeBytes = maxChunkSizeBytes;
- this.maxInFlightRequests = maxInFlightRequests;
- this.localQueueTimeOut = localQueueTimeOut;
- this.priority = priority;
- this.denyIfBusyV3 = denyIfBusyV3;
- this.maxSleepTimeMs = maxSleepTimeMs;
- this.silentUpgrade = silentUpgrade;
- this.idlePollFrequency = idlePollFrequency;
-
- }
-
- public DataFormat getDataFormat() { return dataFormat; }
- public String getRoute() { return route; }
- public long getServerTimeout(TimeUnit unit) { return unit.convert(serverTimeoutMillis, TimeUnit.MILLISECONDS); }
- public long getClientTimeout(TimeUnit unit) { return unit.convert(clientTimeoutMillis, TimeUnit.MILLISECONDS); }
-
- public int getMaxChunkSizeBytes() { return maxChunkSizeBytes; }
- public String getPriority() { return priority; }
-
- public String toUriParameters() {
- StringBuilder b = new StringBuilder();
- b.append("&dataformat=").append(dataFormat.name()); //name in dataFormat enum obviously must be ascii
- return b.toString();
- }
-
- public int getMaxInFlightRequests() { return maxInFlightRequests; }
- public long getLocalQueueTimeOut() { return localQueueTimeOut; }
- public Double getIdlePollFrequency() { return idlePollFrequency; }
-
- /** Returns a builder initialized to the values of this */
- public FeedParams.Builder toBuilder() {
- Builder b = new Builder();
- b.setDataFormat(dataFormat);
- b.setServerTimeout(serverTimeoutMillis, TimeUnit.MILLISECONDS);
- b.setClientTimeout(clientTimeoutMillis, TimeUnit.MILLISECONDS);
- b.setRoute(route);
- b.setMaxChunkSizeBytes(maxChunkSizeBytes);
- b.setMaxInFlightRequests(maxInFlightRequests);
- b.setPriority(priority);
- b.setDenyIfBusyV3(denyIfBusyV3);
- b.setMaxSleepTimeMs(maxSleepTimeMs);
- b.setSilentUpgrade(silentUpgrade);
- b.setIdlePollFrequency(idlePollFrequency);
- return b;
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
deleted file mode 100644
index e8052fa7faa..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
+++ /dev/null
@@ -1,193 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.config;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Parameters given to a {@link com.yahoo.vespa.http.client.FeedClientFactory}
- * when creating {@link com.yahoo.vespa.http.client.FeedClient}s. This class is immutable
- * and has no public constructor - to instantiate one, use a {@link Builder}.
- *
- * @author Einar M R Rosenvinge
- * @see com.yahoo.vespa.http.client.FeedClientFactory
- * @see Builder
- */
-public final class SessionParams {
-
- /**
- * Interface for handling serious errors with connection.
- */
- public interface ErrorReporter {
- void onSessionError(Endpoint endpoint, String oldSessionID, String newSessionId);
- }
-
- /**
- * Mutable class used to instantiate a {@link SessionParams}. A builder
- * instance will at the very least contain cluster settings (
- * {@link #addCluster(Cluster)}), for supporting SSL and similar transport
- * settings, use {@link #setConnectionParams(ConnectionParams)}.
- */
- public static final class Builder {
-
- private final List<Cluster> clusters = new LinkedList<>();
- private FeedParams feedParams = new FeedParams.Builder().build();
- private ConnectionParams connectionParams = new ConnectionParams.Builder().build();
- private int clientQueueSize = 10000;
- private ErrorReporter errorReporter = null;
- private int throttlerMinSize = 0;
-
- /**
- * Add a Vespa installation for feeding documents into.
- *
- * @return this Builder instance, to support chaining
- */
- public Builder addCluster(Cluster cluster) {
- clusters.add(cluster);
- return this;
- }
-
- /**
- * Set parameters used for feeding the documents in the receiving
- * cluster. Reasonable defaults are supplied, so setting this should not
- * be necessary for testing.
- *
- * @return this builder instance to support chaining
- */
- public Builder setFeedParams(FeedParams feedParams) {
- this.feedParams = feedParams;
- return this;
- }
-
- /**
- * Transport parameters, like custom HTTP headers.
- *
- * @return this Builder instance, to support chaining
- */
- public Builder setConnectionParams(ConnectionParams connectionParams) {
- this.connectionParams = connectionParams;
- return this;
- }
-
- /**
- * Sets an error reporter that is invoked in case of serious errors.
- *
- * @param errorReporter the handler
- * @return pointer to builder.
- */
- public Builder setErrorReporter(ErrorReporter errorReporter) {
- this.errorReporter = errorReporter;
- return this;
- }
-
- /**
- * Sets the maximum number of document operations to hold in memory, waiting to be
- * sent to Vespa. When this threshold is reached, {@link java.io.OutputStream#close()} will block.
- *
- * @param clientQueueSize the maximum number of document operations to hold in memory.
- * @return pointer to builder.
- */
- public Builder setClientQueueSize(int clientQueueSize) {
- this.clientQueueSize = clientQueueSize;
- return this;
- }
-
- /**
- * Sets the minimum queue size of the throttler. If this is zero, it means that dynamic throttling is
- * not enabled. Otherwise it is the minimum size of the throttler for how many parallel requests that are
- * accepted. The max size of the throttler is the clientQueueSize.
- * @return the minimum number of requests to be used in throttler or zero if throttler is static.
- *
- * @param throttlerMinSize the value of the min size.
- */
- public Builder setThrottlerMinSize(int throttlerMinSize) {
- this.throttlerMinSize = throttlerMinSize;
- return this;
- }
-
- /**
- * Instantiates a {@link SessionParams} that can be given to a {@link com.yahoo.vespa.http.client.FeedClientFactory}.
- *
- * @return a SessionParams object with the parameters of this Builder
- */
- public SessionParams build() {
- return new SessionParams(
- clusters, feedParams, connectionParams, clientQueueSize, errorReporter, throttlerMinSize);
- }
-
- public FeedParams getFeedParams() {
- return feedParams;
- }
- public ConnectionParams getConnectionParams() {
- return connectionParams;
- }
- public int getClientQueueSize() {
- return clientQueueSize;
- }
- public int getThrottlerMinSize() {
- return throttlerMinSize;
- }
- }
-
- // NOTE! See toBuilder at the end of this class if you add fields here
-
- private final List<Cluster> clusters;
- private final FeedParams feedParams;
- private final ConnectionParams connectionParams;
- private final int clientQueueSize;
- private final ErrorReporter errorReport;
- private final int throttlerMinSize;
-
- private SessionParams(Collection<Cluster> clusters,
- FeedParams feedParams,
- ConnectionParams connectionParams,
- int clientQueueSize,
- ErrorReporter errorReporter,
- int throttlerMinSize) {
- this.clusters = Collections.unmodifiableList(new ArrayList<>(clusters));
- this.feedParams = feedParams;
- this.connectionParams = connectionParams;
- this.clientQueueSize = clientQueueSize;
- this.errorReport = errorReporter;
- this.throttlerMinSize = throttlerMinSize;
- }
-
- public List<Cluster> getClusters() {
- return clusters;
- }
-
- public FeedParams getFeedParams() {
- return feedParams;
- }
-
- public ConnectionParams getConnectionParams() {
- return connectionParams;
- }
-
- public int getClientQueueSize() {
- return clientQueueSize;
- }
-
- public int getThrottlerMinSize() {
- return throttlerMinSize;
- }
-
- public ErrorReporter getErrorReport() {
- return errorReport;
- }
-
- public Builder toBuilder() {
- Builder b = new Builder();
- clusters.forEach(c -> b.addCluster(c));
- b.setFeedParams(feedParams);
- b.setConnectionParams(connectionParams);
- b.setClientQueueSize(clientQueueSize);
- b.setErrorReporter(errorReport);
- b.setThrottlerMinSize(throttlerMinSize);
- return b;
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/package-info.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/package-info.java
deleted file mode 100644
index 9c07e819e90..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/package-info.java
+++ /dev/null
@@ -1,8 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * Settings for creating clients/sessions.
- *
- * NOTE: This is a PUBLIC API, but not annotated as such because this is not a bundle and
- * we don't want to introduce Vespa dependencies.
- */
-package com.yahoo.vespa.http.client.config;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java
deleted file mode 100644
index 07262a60dd8..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java
+++ /dev/null
@@ -1,96 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.util.Objects;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * A document operation
- *
- * @author Einar M R Rosenvinge
- */
-final public class Document {
-
- private final String documentId;
- private final ByteBuffer data;
- private final Instant createTime;
- // This is initialized lazily to reduce work on calling thread (which is the thread calling the API)
- private String operationId = null;
- private final Object context;
- private Instant queueInsertTime;
-
- public Document(String documentId, byte[] data, Object context, Instant createTime) {
- this(documentId, null, ByteBuffer.wrap(data), context, createTime);
- }
-
- public Document(String documentId, String operationId, CharSequence data, Object context, Instant createTime) {
- this(documentId, operationId, encode(data, documentId), context, createTime);
- }
-
- private Document(String documentId, String operationId, ByteBuffer data, Object context, Instant createTime) {
- this.documentId = documentId;
- this.operationId = operationId;
- this.data = data;
- this.context = context;
- this.createTime = Objects.requireNonNull(createTime, "createTime cannot be null");
- this.queueInsertTime = createTime;
- }
-
- public void setQueueInsertTime(Instant queueInsertTime) {
- this.queueInsertTime = queueInsertTime;
- }
-
- public Instant getQueueInsertTime() { return queueInsertTime; }
-
- public CharSequence getDataAsString() {
- return StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer());
- }
-
- public Object getContext() { return context; }
-
- public static class DocumentException extends IOException {
- private static final long serialVersionUID = 29832833292L;
- public DocumentException(String message)
- {
- super(message);
- }
- }
-
- public String getDocumentId() { return documentId; }
-
- public ByteBuffer getData() {
- return data.asReadOnlyBuffer();
- }
-
- public int size() {
- return data.remaining();
- }
-
- public Instant createTime() { return createTime; }
-
- public String getOperationId() {
- if (operationId == null) {
- operationId = new BigInteger(64, ThreadLocalRandom.current()).toString(32);
- }
- return operationId;
- }
-
- @Override
- public String toString() { return "document '" + documentId + "'"; }
-
- private static ByteBuffer encode(CharSequence data, String documentId) {
- try {
- return StandardCharsets.UTF_8.newEncoder().encode(CharBuffer.wrap(data));
- } catch (CharacterCodingException e) {
- throw new RuntimeException("Error encoding document data into UTF8 " + documentId, e);
- }
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Encoder.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Encoder.java
deleted file mode 100644
index e4781dc3a3f..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Encoder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
-
-/**
- * Simple encoding scheme to remove space, linefeed, control characters and
- * anything outside ISO 646.irv:1991 from strings. The scheme is supposed to be
- * human readable and debugging friendly. Opening and closing curly braces are
- * used as quoting characters, the output is by definition US-ASCII only
- * characters.
- *
- * @author Steinar Knutsen
- */
-public final class Encoder {
-
- /**
- * ISO 646.irv:1991 safe quoting into a StringBuilder instance.
- *
- * @param input the string to encode
- * @param output the destination buffer
- * @return the destination buffer given as input
- */
- public static StringBuilder encode(String input, StringBuilder output) {
- for (int i = 0; i < input.length(); i = input.offsetByCodePoints(i, 1)) {
- int c = input.codePointAt(i);
- if (c <= '~') {
- if (c <= ' ') {
- encode(c, output);
- } else {
- switch (c) {
- case '{':
- case '}':
- encode(c, output);
- break;
- default:
- output.append((char) c);
- }
- }
- } else {
- encode(c, output);
- }
- }
- return output;
- }
-
- /**
- * ISO 646.irv:1991 safe unquoting into a StringBuilder instance.
- *
- * @param input the string to decode
- * @param output the destination buffer
- * @return the destination buffer given as input
- * @throws IllegalArgumentException if the input string contains unexpected or invalid data
- */
- public static StringBuilder decode(String input, StringBuilder output) {
- for (int i = 0; i < input.length(); i = input.offsetByCodePoints(i, 1)) {
- int c = input.codePointAt(i);
- if (c > '~')
- throw new IllegalArgumentException("Input contained character above printable ASCII at position " + i);
- if (c == '{')
- i = decode(input, i, output);
- else
- output.append((char) c);
- }
- return output;
- }
-
- private static int decode(String input, int offset, StringBuilder output) {
- char c = 0;
- int end = offset;
- int start = offset + 1;
- int codePoint;
-
- while ('}' != c) {
- if (++end >= input.length()) {
- throw new IllegalArgumentException("Unterminated quoted character or empty quoting.");
- }
- c = input.charAt(end);
- }
- try {
- codePoint = Integer.parseInt(input.substring(start, end), 16);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Unexpected quoted data: [" + input.substring(start, end) + "]", e);
- }
- if (Character.charCount(codePoint) > 1) {
- try {
- output.append(Character.toChars(codePoint));
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Unexpected quoted data: [" + input.substring(start, end) + "]", e);
- }
- } else {
- output.append((char) codePoint);
- }
- return end;
-
- }
-
- private static void encode(int c, StringBuilder output) {
- output.append("{").append(Integer.toHexString(c)).append("}");
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/EndpointResult.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/EndpointResult.java
deleted file mode 100644
index 94d7237422a..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/EndpointResult.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
-
-import com.yahoo.vespa.http.client.Result;
-
-/**
- * Result from a single endpoint.
- *
- * @author dybis
- */
-public class EndpointResult {
-
- private final String operationId;
- private final Result.Detail detail;
-
- public EndpointResult(String operationId, Result.Detail detail) {
- this.operationId = operationId;
- this.detail = detail;
- }
-
- public String getOperationId() {
- return operationId;
- }
-
- public Result.Detail getDetail() {
- return detail;
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ErrorCode.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ErrorCode.java
deleted file mode 100644
index 4e739218319..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ErrorCode.java
+++ /dev/null
@@ -1,33 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
-
-/**
- * Return types for the server.
- *
- * @author Einar M R Rosenvinge
- * @author Steinar Knutsen
- */
-public enum ErrorCode {
-
- OK(true, true),
- ERROR(false, false),
- TRANSIENT_ERROR(false, true),
- END_OF_FEED(true, true);
-
- private boolean success;
- private boolean _transient;
-
- ErrorCode(boolean success, boolean _transient) {
- this.success = success;
- this._transient = _transient;
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- public boolean isTransient() {
- return _transient;
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Exceptions.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Exceptions.java
deleted file mode 100644
index 9ff3f793756..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Exceptions.java
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
-
-/**
- * Helper methods for handling exceptions
- *
- * @author bratseth
- */
-public abstract class Exceptions {
-
- /**
- * <p>Returns a use friendly error message string which includes information from all nested exceptions.</p>
- *
- * <p>The form of this string is
- * <code>e.getMessage(): e.getCause().getMessage(): e.getCause().getCause().getMessage()...</code>
- * In addition, some heuristics are used to clean up common cases where exception nesting causes bad messages.
- * </p>
- */
- public static String toMessageString(Throwable t) {
- StringBuilder b = new StringBuilder();
- String lastMessage = null;
- String message;
- for (; t != null; t = t.getCause(), lastMessage = message) {
- message = getMessage(t);
- if (message == null) continue;
- if (message.equals(lastMessage)) continue;
- if (b.length() > 0) {
- b.append(": ");
- }
- b.append(message);
- }
- return b.toString();
- }
-
- /** Returns a useful message from *this* exception, or null if none */
- private static String getMessage(Throwable t) {
- String message = t.getMessage();
- if (t.getCause() == null) {
- if (message == null) return t.getClass().getSimpleName();
- } else {
- if (message == null) return null;
- if (message.equals(t.getCause().getClass().getName() + ": " + t.getCause().getMessage())) return null;
- }
- return message;
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Headers.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Headers.java
deleted file mode 100644
index d41f42ef652..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Headers.java
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
-
-/**
- * Wrapper for shared constants used by both client and server.
- *
- * @author Steinar Knutsen
- */
-public final class Headers {
-
- private Headers() {
- }
-
- public static final String CLIENT_VERSION = "Vespa-Client-Version";
-
- public static final String TIMEOUT = "X-Yahoo-Feed-Timeout";
- public static final String DRAIN = "X-Yahoo-Feed-Drain";
- public static final String ROUTE = "X-Yahoo-Feed-Route";
- public static final String VERSION = "X-Yahoo-Feed-Protocol-Version";
- public static final String SESSION_ID = "X-Yahoo-Feed-Session-Id";
- public static final String DENY_IF_BUSY = "X-Yahoo-Feed-Deny-If-Busy";
- public static final String DATA_FORMAT = "X-Yahoo-Feed-Data-Format";
- // This value can be used to route the request to a specific server when using
- // several servers. It is a random value that is the same for the whole session.
- public static final String SHARDING_KEY = "X-Yahoo-Feed-Sharding-Key";
- public static final String PRIORITY = "X-Yahoo-Feed-Priority";
- public static final String TRACE_LEVEL = "X-Yahoo-Feed-Trace-Level";
-
- public static final int HTTP_NOT_ACCEPTABLE = 406;
-
- // For version 3 of the API
- public static final String CLIENT_ID = "X-Yahoo-Client-Id";
- public static final String OUTSTANDING_REQUESTS = "X-Yahoo-Outstanding-Requests";
- public static final String HOSTNAME = "X-Yahoo-Hostname";
- public static final String SILENTUPGRADE = "X-Yahoo-Silent-Upgrade";
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java
deleted file mode 100644
index 34b6d8b9144..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java
+++ /dev/null
@@ -1,245 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonFactoryBuilder;
-import com.fasterxml.jackson.core.JsonParser;
-import com.yahoo.vespa.http.client.FeedClient;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.UncheckedIOException;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Reads a stream of json documents and sends them to feedClient.
- *
- * @author dybis
- */
-public class JsonReader {
-
- /**
- * Max size of documents. As we stream docs in for finding doc id, we buffer the data and later stream them to
- * feedclient after doc id has been revealed.
- */
- private final static int maxDocumentSizeChars = 50 * 1024 * 1024;
-
- // Intended to be used as static.
- private JsonReader() {}
-
- /**
- * Process one inputstream and send all documents to feedclient.
- *
- * @param inputStream source of array of json document.
- * @param feedClient where data is sent.
- * @param numSent counter to be incremented for every document streamed.
- */
- public static void read(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) {
- try (InputStreamJsonElementBuffer jsonElementBuffer = new InputStreamJsonElementBuffer(inputStream)) {
- JsonFactory jfactory = new JsonFactoryBuilder().disable(JsonFactory.Feature.CANONICALIZE_FIELD_NAMES).build();
- JsonParser jParser = jfactory.createParser(jsonElementBuffer);
- while (true) {
- int documentStart = (int) jParser.getCurrentLocation().getCharOffset();
- String docId = parseOneDocument(jParser);
- if (docId == null) {
- int documentEnd = (int) jParser.getCurrentLocation().getCharOffset();
- int documentLength = documentEnd - documentStart;
- int maxTruncatedLength = 500;
- StringBuilder stringBuilder = new StringBuilder(maxTruncatedLength + 3);
- for (int i = 0; i < Math.min(documentLength, maxTruncatedLength); i++)
- stringBuilder.append(jsonElementBuffer.circular.get(documentStart + i));
-
- if (documentLength > maxTruncatedLength)
- stringBuilder.append("...");
-
- throw new IllegalArgumentException("Document is missing ID: '" + stringBuilder.toString() + "'");
- }
- CharSequence data = jsonElementBuffer.getJsonAsArray(jParser.getCurrentLocation().getCharOffset());
- feedClient.stream(docId, data);
- numSent.incrementAndGet();
- }
- } catch (EOFException ignored) {
- // No more documents
- } catch (IOException ioe) {
- System.err.println(ioe.getMessage());
- throw new UncheckedIOException(ioe);
- }
- }
-
- /**
- * This class is intended to be used with a json parser. The data is sent through this intermediate stream
- * and to the parser. When the parser is done with a document, it calls postJsonAsArray which will
- * stream the document up to the current position of the parser.
- */
- private static class InputStreamJsonElementBuffer extends InputStreamReader {
-
- /**
- * Simple class implementing a circular array with some custom function used for finding start and end
- * of json object. The reason this is needed is that the json parser reads more than it parses
- * from the input stream (seems like about 8k). Using a ByteBuffer and manually moving data
- * is an order of magnitude slower than this implementation.
- */
- private class CircularCharBuffer {
-
- int readPointer = 0;
- int writePointer = 0;
- final char[] data;
- final int size;
-
- public CircularCharBuffer(int chars) {
- data = new char[chars];
- size = chars;
- }
-
- /**
- * This is for throwing away [ and spaces in front of a json object, and find the position of {.
- * Not for parsing much text.
- *
- * @return position for {
- */
- public int findNextObjectStart() {
- int readerPos = 0;
- while (get(readerPos) != '{') {
- readerPos++;
- assert(readerPos<=size);
- }
- return readerPos;
- }
-
- /**
- * This is for throwing away comma and or ], and for finding the position of the last }.
- * @param fromPos where to start searching
- * @return position for }
- */
- public int findLastObjectEnd(int fromPos) {
- while (get(fromPos-1) != '}') {
- fromPos--;
- assert(fromPos >=0);
- }
- return fromPos;
- }
-
- public void put(char dataByte) {
- data[writePointer] = dataByte;
- writePointer++;
- if (writePointer >= size) writePointer = 0;
- assert(writePointer != readPointer);
- }
-
- public char get(int pos) {
- int readPos = readPointer + pos;
- if (readPos >= size) readPos -= size;
- assert(readPos != writePointer);
- return data[readPos];
- }
-
- public void advance(int end) {
- readPointer += end;
- if (readPointer >= size) readPointer -= size;
- }
- }
-
- private final CircularCharBuffer circular = new CircularCharBuffer(maxDocumentSizeChars);
- private int processedChars = 0;
-
- public InputStreamJsonElementBuffer(InputStream inputStream) {
- super(inputStream, StandardCharsets.UTF_8);
- }
-
- /**
- * Removes comma, start/end array tag (last element), spaces etc that might be surrounding a json element.
- * Then sends the element to the outputstream.
- * @param parserPosition how far the parser has come. Please note that the parser might have processed
- * more data from the input source as it is reading chunks of data.
- * @throws IOException on errors
- */
- public CharSequence getJsonAsArray(long parserPosition) throws IOException {
- final int charSize = (int)parserPosition - processedChars;
- final int endPosOfJson = circular.findLastObjectEnd(charSize);
- final int startPosOfJson = circular.findNextObjectStart();
- processedChars += charSize;
- // This can be optimized since we rarely wrap the circular buffer.
- StringBuilder dataBuffer = new StringBuilder(endPosOfJson - startPosOfJson);
- for (int x = startPosOfJson; x < endPosOfJson; x++) {
- dataBuffer.append(circular.get(x));
- }
- circular.advance(charSize);
- return dataBuffer.toString();
- }
-
- @Override
- public int read(char[] b, int off, int len) throws IOException {
- int length = 0;
- int value = 0;
- while (length < len && value != -1) {
- value = read();
- if (value == -1) {
- return length == 0 ? -1 : length;
- }
- b[off + length] = (char) value;
- length++;
- }
- return length;
- }
-
- @Override
- public int read() throws IOException {
- int value = super.read();
- if (value >= 0) circular.put((char)value);
- return value;
- }
- }
-
- /**
- * Parse one document from the stream and return doc id.
- *
- * @param jParser parser with stream.
- * @return doc id of document or null if no more docs.
- * @throws IOException on problems
- */
- private static String parseOneDocument(JsonParser jParser) throws IOException {
- int objectLevel = 0;
- String documentId = null;
- boolean foundObject = false;
- boolean valueIsDocumentId = false;
- while (jParser.nextToken() != null) {
- String tokenAsText = jParser.getText();
- if (valueIsDocumentId) {
- if (documentId != null) {
- throw new RuntimeException("Several document ids");
- }
- documentId = tokenAsText;
- valueIsDocumentId = false;
- }
- switch(jParser.getCurrentToken()) {
- case START_OBJECT:
- foundObject = true;
- objectLevel++;
- break;
- case END_OBJECT:
- objectLevel--;
- if (objectLevel == 0) {
- return documentId;
- }
- break;
- case FIELD_NAME:
- if (objectLevel == 1 &&
- (tokenAsText.equals("put")
- || tokenAsText.endsWith("id")
- || tokenAsText.endsWith("update")
- || tokenAsText.equals("remove"))) {
- valueIsDocumentId = true;
- }
- break;
- default: // No operation on all other tags.
- }
- }
- if (!foundObject)
- throw new EOFException("No more documents");
- return null;
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/OperationStatus.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/OperationStatus.java
deleted file mode 100644
index ee6d96aa600..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/OperationStatus.java
+++ /dev/null
@@ -1,90 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
-
-import com.google.common.base.Splitter;
-import java.util.Iterator;
-
-/**
- * Serialization/deserialization class for the result of a single document operation against Vespa.
- *
- * @author Steinar Knutsen
- */
-public final class OperationStatus {
-
- public static final String IS_CONDITION_NOT_MET = "IS-CONDITION-NOT-MET";
- public final String message;
- public final String operationId;
- public final ErrorCode errorCode;
- public final String traceMessage;
- public final boolean isConditionNotMet;
-
- private static final char EOL = '\n';
- private static final char SEPARATOR = ' ';
- private static final Splitter spaceSep = Splitter.on(SEPARATOR);
-
- /**
- * Constructor
- * @param message some human readable information what happened
- * @param operationId the doc ID for the operation
- * @param errorCode if it is success, transitive, or fatal
- * @param isConditionNotMet if error is due to condition not met
- * @param traceMessage any tracemessage
- */
- public OperationStatus(String message, String operationId, ErrorCode errorCode, boolean isConditionNotMet, String traceMessage) {
- this.isConditionNotMet = isConditionNotMet;
- this.message = message;
- this.operationId = operationId;
- this.errorCode = errorCode;
- this.traceMessage = traceMessage;
- }
-
- /**
- * Parse a single rendered OperationStatus string. White space may be padded after
- * and before the given status.
- *
- * @param singleLine
- * a rendered OperationStatus
- * @return an OperationStatus instance reflecting the input
- * @throws IllegalArgumentException
- * if there are illegal input data characters or the status
- * element has no corresponding value in the ErrorCode
- * enumeration
- */
- public static OperationStatus parse(String singleLine) {
- // Do note there is specifically left room for more arguments after
- // the first in the serialized form.
- Iterator<String> input = spaceSep.split(singleLine.trim()).iterator();
- String operationId;
- ErrorCode errorCode;
- String message;
- String traceMessage = "";
-
- operationId = Encoder.decode(input.next(), new StringBuilder())
- .toString();
- errorCode = ErrorCode.valueOf(Encoder.decode(input.next(),
- new StringBuilder()).toString());
-
- message = Encoder.decode(input.next(), new StringBuilder()).toString();
- // We are backwards compatible, meaning it is ok not to supply the last argument.
- boolean isConditionNotMet = false;
- if (message.startsWith(IS_CONDITION_NOT_MET)) {
- message = message.replaceFirst(IS_CONDITION_NOT_MET, "");
- isConditionNotMet = true;
- }
- if (input.hasNext()) {
- traceMessage = Encoder.decode(input.next(), new StringBuilder()).toString();
- }
- return new OperationStatus(message, operationId, errorCode, isConditionNotMet, traceMessage);
- }
-
- /** Returns a string representing the status. */
- public String render() {
- StringBuilder s = new StringBuilder();
- Encoder.encode(operationId, s).append(SEPARATOR);
- Encoder.encode(errorCode.toString(), s).append(SEPARATOR);
- Encoder.encode(isConditionNotMet ? IS_CONDITION_NOT_MET + message : message, s).append(SEPARATOR);
- Encoder.encode(traceMessage, s).append(EOL);
- return s.toString();
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ServerResponseException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ServerResponseException.java
deleted file mode 100644
index d5a09d2566c..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ServerResponseException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
-
-/**
- * The request was not processed properly on the server.
- *
- * @author Einar M R Rosenvinge
- */
-@SuppressWarnings("serial")
-public class ServerResponseException extends Exception {
-
- private final int responseCode;
- private final String responseString;
-
- public ServerResponseException(int responseCode, String responseString) {
- super(responseString);
- this.responseCode = responseCode;
- this.responseString = responseString;
- }
-
- public ServerResponseException(String responseString) {
- super(responseString);
- this.responseCode = 0;
- this.responseString = responseString;
- }
-
- public int getResponseCode() {
- return responseCode;
- }
-
- public String getResponseString() {
- return responseString;
- }
-
- @Override
- public String toString() {
- if (responseCode > 0) {
- return responseCode + ": " + responseString;
- }
- return responseString;
- }
-
-}
-
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java
deleted file mode 100644
index 101bd001fb8..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
-
-import static java.lang.Math.abs;
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-
-/**
- * Class that has a method for finding next maxInFlight.
- *
- * @author dybis
- */
-public class ThrottlePolicy {
-
- public static final double SMALL_DIFFERENCE_IN_SUCCESSES_RATIO = 0.15;
- private static final double MINIMUM_DIFFERENCE = 0.05;
-
- /**
- * Generate nex in-flight value for throttling.
- *
- * @param maxPerformanceChange This value limit the dynamics of the algorithm.
- * @param numOk number of success in last phase
- * @param previousNumOk number of success in previous (before last) phase.
- * @param previousMaxInFlight number of max-in-flight in previous (before last) phase.
- * @param maxInFlightNow number of max-in-flight in last phase.
- * @param messagesQueued if any messages where queued.
- * @return The new value to be used for max-in-flight (should be cropped externally to fit max/min values).
- */
- public int calcNewMaxInFlight(double maxPerformanceChange, int numOk, int previousNumOk, int previousMaxInFlight,
- int maxInFlightNow, boolean messagesQueued) {
-
- double difference = calculateRuleBasedDifference(maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow);
- boolean previousRunWasBetter = numOk < previousNumOk;
- boolean previousRunHadLessInFlight = previousMaxInFlight < maxInFlightNow;
-
-
- int delta;
- if (previousRunWasBetter == previousRunHadLessInFlight) {
- delta = (int) (-1.1 * difference * maxInFlightNow);
- } else {
- delta = (int) (difference * maxInFlightNow);
- }
-
- // We don't want the same size since we need different sizes for algorithm to adjust.
- if (abs(delta) < 2) {
- delta = -3;
- }
- // We never used all permits in previous run, no reason to grow more, we should rather reduce permits.
- if (!messagesQueued && delta > 0) {
- delta = -2;
- }
- return maxInFlightNow + delta;
- }
-
- private static double calculateRuleBasedDifference(double maxPerformanceChange, double numOk, double previousNumOk,
- double previousMaxInFlight, double maxInFlightNow) {
- double difference = min(
- maxPerformanceChange,
- abs((numOk - previousNumOk) / safeDenominator(previousNumOk)));
-
- if (abs(previousMaxInFlight - maxInFlightNow) / safeDenominator(min(previousMaxInFlight, maxInFlightNow))
- < SMALL_DIFFERENCE_IN_SUCCESSES_RATIO) {
- difference = min(difference, 0.2);
- }
-
- // We want some changes so we can track performance as a result of different throttling.
- return max(difference, MINIMUM_DIFFERENCE);
- }
-
- private static double safeDenominator(double x) {
- return x == 0.0 ? 1.0 : x;
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java
deleted file mode 100644
index 349959f496e..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java
+++ /dev/null
@@ -1,154 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
-
-import com.yahoo.vespa.http.client.FeedClient;
-import org.xml.sax.Attributes;
-import org.xml.sax.InputSource;
-import org.xml.sax.ext.DefaultHandler2;
-
-import javax.xml.parsers.SAXParser;
-import javax.xml.parsers.SAXParserFactory;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Reads an input stream of xml, sends these to session.
- *
- * @author dybis
-*/
-public class XmlFeedReader {
-
- // Static class.
- private XmlFeedReader() {}
-
- public static void read(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) throws Exception {
- SAXParserFactory parserFactory = SAXParserFactory.newInstance();
- // XXE prevention:
- parserFactory.setFeature("http://xml.org/sax/features/external-general-entities", false);
- parserFactory.setValidating(false);
- parserFactory.setNamespaceAware(false);
- SAXParser parser = parserFactory.newSAXParser();
- SAXClientFeeder saxClientFeeder = new SAXClientFeeder(feedClient, numSent);
-
- InputSource inputSource = new InputSource();
- inputSource.setEncoding(StandardCharsets.UTF_8.displayName());
- inputSource.setByteStream(inputStream);
- // This is to send events about CDATA to the saxClientFeeder
- // (https://docs.oracle.com/javase/tutorial/jaxp/sax/events.html)
- parser.setProperty("http://xml.org/sax/properties/lexical-handler", saxClientFeeder);
- parser.parse(inputSource, saxClientFeeder);
- }
-}
-
-/**
- * Streams XML and sends each document operation to feeder.
- */
-class SAXClientFeeder extends DefaultHandler2 {
-
- public static final String CDATA_START = "<![CDATA[";
- public static final String CDATA_STOP = "]]>";
- private final FeedClient feedClient;
- int vespaIndent = 0;
- int documentIndent = 0;
- String documentId = null;
- StringBuilder content = new StringBuilder();
- final AtomicInteger numSent;
- boolean isCData = false;
-
- public SAXClientFeeder(FeedClient feedClient, AtomicInteger numSent) {
- this.feedClient = feedClient;
- this.numSent = numSent;
- }
-
- @Override
- public void startCDATA() {
- content.append(CDATA_START);
- isCData = true;
- }
-
- @Override
- public void endCDATA() {
- content.append(CDATA_STOP);
- isCData = false;
- }
-
- @Override
- public void comment(char[] ch, int start, int length) { }
-
- @SuppressWarnings("fallthrough")
- @Override
- public void startElement(String uri, String localName, String qName, Attributes attributes) {
- switch(qName){
- case "vespafeed":
- vespaIndent++;
- if (vespaIndent == 1 && documentIndent == 0) {
- // If this is the first vespafeed tag, it should not be added to content of the first item.
- return;
- }
- case "update":
- case "remove":
- case "document" :
- documentIndent++;
- documentId = attributes.getValue("documentid");
- content = new StringBuilder();
- }
- content.append("<" + qName);
- if (attributes != null) {
- for (int i = 0; i < attributes.getLength (); i++) {
- content.append(" ")
- .append(attributes.getQName(i))
- .append("=\"");
- String attributesValue = attributes.getValue(i);
- characters(attributesValue.toCharArray(), 0, attributesValue.length());
- content.append("\"");
- }
- }
- content.append(">");
- }
-
- @Override
- public void endElement(String uri, String localName, String qName) {
- content.append("</")
- .append(qName)
- .append(">");
- switch(qName){
- case "vespafeed":
- vespaIndent--;
- return;
- case "update":
- case "remove":
- case "document" :
- documentIndent--;
- if (documentIndent == 0) {
- if (documentId == null || documentId.isEmpty()) {
- throw new IllegalArgumentException("no docid");
- }
- feedClient.stream(documentId, content);
- numSent.incrementAndGet();
- }
- }
- }
-
- @Override
- public void characters (char buf [], int offset, int len) {
- if (isCData) {
- content.append(buf, offset, len);
- return;
- }
-
- // This is on the critical loop for performance, otherwise a library would have been used.
- // We can do a few shortcuts as well as this data is already decoded by SAX parser.
- for (int x = offset ; x < len + offset ; x++) {
- switch (buf[x]) {
- case '&' : content.append("&amp;"); continue;
- case '<' : content.append("&lt;"); continue;
- case '>' : content.append("&gt;"); continue;
- case '"' : content.append("&quot;"); continue;
- case '\'' : content.append("&apos;"); continue;
- default: content.append(buf[x]); continue;
- }
- }
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
deleted file mode 100644
index b46f23923f5..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.api;
-
-import com.yahoo.vespa.http.client.FeedClient;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.ThrottlePolicy;
-import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler;
-import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
-
-import java.nio.charset.CharsetEncoder;
-import java.nio.charset.CodingErrorAction;
-import java.nio.charset.StandardCharsets;
-import java.time.Clock;
-import java.time.Instant;
-import java.util.Optional;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Logger;
-
-/**
- * Implementation of FeedClient. It is a thin layer on top of multiClusterHandler and multiClusterResultAggregator.
- *
- * @author dybis
- */
-public class FeedClientImpl implements FeedClient {
-
- private static final Logger log = Logger.getLogger(FeedClientImpl.class.getName());
- private static final AtomicBoolean warningPrinted = new AtomicBoolean(false);
-
- private final Clock clock;
- private final OperationProcessor operationProcessor;
- private final long closeTimeoutMs;
- private final long sleepTimeMs = 500;
-
- public FeedClientImpl(SessionParams sessionParams,
- ResultCallback resultCallback,
- ScheduledThreadPoolExecutor timeoutExecutor,
- Clock clock) {
- this.clock = clock;
- this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) *
- (sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) +
- sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS));
- this.operationProcessor = new OperationProcessor(
- new IncompleteResultsThrottler(sessionParams.getThrottlerMinSize(),
- sessionParams.getClientQueueSize(),
- clock,
- new ThrottlePolicy()),
- resultCallback,
- sessionParams,
- timeoutExecutor,
- clock);
- if (warningPrinted.compareAndSet(false, true)) {
- log.warning("The vespa-http-client is deprecated and will be removed on Vespa 8. " +
- "See https://docs.vespa.ai/en/vespa8-release-notes.html");
- }
- }
-
- @Override
- public void stream(String documentId, String operationId, CharSequence documentData, Object context) {
- CharsetEncoder charsetEncoder = StandardCharsets.UTF_8.newEncoder();
- charsetEncoder.onMalformedInput(CodingErrorAction.REPORT);
- charsetEncoder.onUnmappableCharacter(CodingErrorAction.REPORT);
-
- Document document = new Document(documentId, operationId, documentData, context, clock.instant());
- operationProcessor.sendDocument(document);
- }
-
- @Override
- public void close() {
- Instant lastOldestResultReceivedAt = Instant.now();
- Optional<String> oldestIncompleteId = operationProcessor.oldestIncompleteResultId();
-
- while (oldestIncompleteId.isPresent() && waitForOperations(lastOldestResultReceivedAt, sleepTimeMs, closeTimeoutMs)) {
- Optional<String> oldestIncompleteIdNow = operationProcessor.oldestIncompleteResultId();
- if ( ! oldestIncompleteId.equals(oldestIncompleteIdNow))
- lastOldestResultReceivedAt = Instant.now();
- oldestIncompleteId = oldestIncompleteIdNow;
- }
- operationProcessor.close();
- }
-
- @Override
- public String getStatsAsJson() {
- return operationProcessor.getStatsAsJson();
- }
-
- // On return value true, wait more. Public for testing.
- public static boolean waitForOperations(Instant lastResultReceived, long sleepTimeMs, long closeTimeoutMs) {
- if (lastResultReceived.plusMillis(closeTimeoutMs).isBefore(Instant.now())) {
- return false;
- }
- try {
- Thread.sleep(sleepTimeMs);
- } catch (InterruptedException e) {
- return false;
- }
- return true;
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java
deleted file mode 100644
index 27fc22b9675..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.api;
-
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.time.Clock;
-
-/**
- * Class for wiring up the Session API. It is the return value of stream() in the Session API.
- *
- * @author dybis
-*/
-class MultiClusterSessionOutputStream extends ByteArrayOutputStream {
-
- private final CharSequence documentId;
- private final OperationProcessor operationProcessor;
- private final Object context;
- private final Clock clock;
-
- public MultiClusterSessionOutputStream(CharSequence documentId,
- OperationProcessor operationProcessor,
- Object context,
- Clock clock) {
- this.documentId = documentId;
- this.context = context;
- this.operationProcessor = operationProcessor;
- this.clock = clock;
- }
-
- @Override
- public void close() throws IOException {
- Document document = new Document(documentId.toString(), toByteArray(), context, clock.instant());
- operationProcessor.sendDocument(document);
- super.close();
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java
deleted file mode 100644
index 05b66ac4a46..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.api;
-
-import com.yahoo.vespa.http.client.FeedClient;
-import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.core.ThrottlePolicy;
-import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler;
-import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
-
-import java.io.OutputStream;
-import java.time.Clock;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-/**
- * This class wires up the Session API using MultiClusterHandler and MultiClusterSessionOutputStream.
- *
- * @deprecated
- */
-@Deprecated // TODO: Remove on Vespa 8
-public class SessionImpl implements com.yahoo.vespa.http.client.Session {
-
- private final OperationProcessor operationProcessor;
- private final BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>();
- private final Clock clock;
-
- public SessionImpl(SessionParams sessionParams, ScheduledThreadPoolExecutor timeoutExecutor, Clock clock) {
- this.clock = clock;
- this.operationProcessor = new OperationProcessor(
- new IncompleteResultsThrottler(
- sessionParams.getThrottlerMinSize(),
- sessionParams.getClientQueueSize(),
- clock,
- new ThrottlePolicy()),
- new FeedClient.ResultCallback() {
- @Override
- public void onCompletion(String docId, Result documentResult) {
- resultQueue.offer(documentResult);
- }
- },
- sessionParams,
- timeoutExecutor,
- clock);
- }
-
- @Override
- public OutputStream stream(CharSequence documentId) {
- return new MultiClusterSessionOutputStream(documentId, operationProcessor, null, clock);
- }
-
- @Override
- public BlockingQueue<Result> results() {
- return resultQueue;
- }
-
- @Override
- public void close() {
- operationProcessor.close();
- }
-
- @Override
- public String getStatsAsJson() {
- return operationProcessor.getStatsAsJson();
- }
-
- // For testing only (legacy tests).
- public int getIncompleteResultQueueSize() {
- return operationProcessor.getIncompleteResultQueueSize();
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
deleted file mode 100644
index 6b1078fa393..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
+++ /dev/null
@@ -1,507 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import ai.vespa.util.http.hc4.VespaHttpClientBuilder;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.yahoo.security.SslContextBuilder;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.FeedParams;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.Encoder;
-import com.yahoo.vespa.http.client.core.Headers;
-import com.yahoo.vespa.http.client.core.ServerResponseException;
-import com.yahoo.vespa.http.client.core.Vtag;
-import org.apache.http.Header;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpRequest;
-import org.apache.http.HttpResponse;
-import org.apache.http.StatusLine;
-import org.apache.http.auth.AUTH;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.ChallengeState;
-import org.apache.http.auth.Credentials;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.entity.InputStreamEntity;
-import org.apache.http.impl.auth.BasicScheme;
-import org.apache.http.impl.client.BasicAuthCache;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.protocol.BasicHttpContext;
-import org.apache.http.protocol.HttpContext;
-import org.apache.http.util.EntityUtils;
-
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.time.Clock;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.zip.GZIPOutputStream;
-
-/**
- * @author Einar M R Rosenvinge
- */
-class ApacheGatewayConnection implements GatewayConnection {
-
- private static final Logger log = Logger.getLogger(ApacheGatewayConnection.class.getName());
- private static final ObjectMapper mapper = new ObjectMapper();
- private static final String PATH = "/reserved-for-internal-use/feedapi?";
- private static final byte[] START_OF_FEED_XML = "<vespafeed>\n".getBytes(StandardCharsets.UTF_8);
- private static final byte[] END_OF_FEED_XML = "\n</vespafeed>\n".getBytes(StandardCharsets.UTF_8);
- private static final byte[] START_OF_FEED_JSON = "[".getBytes(StandardCharsets.UTF_8);
- private static final byte[] END_OF_FEED_JSON = "]".getBytes(StandardCharsets.UTF_8);
-
- private final List<Integer> supportedVersions = new ArrayList<>();
- private final byte[] startOfFeed;
- private final byte[] endOfFeed;
- private final Endpoint endpoint;
- private final FeedParams feedParams;
- private final String clusterSpecificRoute;
- private final ConnectionParams connectionParams;
- private CloseableHttpClient httpClient;
- private Instant connectionTime = null;
- private Instant lastPollTime = null;
- private String sessionId;
- private final String clientId;
- private int negotiatedVersion = -1;
- private final HttpClientFactory httpClientFactory;
- private final String shardingKey = UUID.randomUUID().toString().substring(0, 5);
- private final Clock clock;
-
- ApacheGatewayConnection(Endpoint endpoint,
- FeedParams feedParams,
- String clusterSpecificRoute,
- ConnectionParams connectionParams,
- HttpClientFactory httpClientFactory,
- String clientId,
- Clock clock) {
- supportedVersions.add(3);
- this.endpoint = endpoint;
- this.feedParams = feedParams;
- this.clusterSpecificRoute = clusterSpecificRoute;
- this.httpClientFactory = httpClientFactory;
- this.connectionParams = connectionParams;
- this.httpClient = null;
- this.clientId = clientId;
- this.clock = clock;
-
- if (feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8) {
- startOfFeed = START_OF_FEED_JSON;
- endOfFeed = END_OF_FEED_JSON;
- } else {
- startOfFeed = START_OF_FEED_XML;
- endOfFeed = END_OF_FEED_XML;
- }
- }
-
- @Override
- public InputStream write(List<Document> docs) throws ServerResponseException, IOException {
- return write(docs, false, connectionParams.getUseCompression());
- }
-
- @Override
- public InputStream poll() throws ServerResponseException, IOException {
- lastPollTime = clock.instant();
- return write(Collections.<Document>emptyList(), false, false);
- }
-
- @Override
- public Instant lastPollTime() { return lastPollTime; }
-
- @Override
- public InputStream drain() throws ServerResponseException, IOException {
- return write(Collections.<Document>emptyList(), true, false);
- }
-
- @Override
- public boolean connect() {
- log.fine(() -> "Attempting to connect to " + endpoint);
- if (httpClient != null)
- log.log(Level.WARNING, "Previous httpClient still exists.");
- httpClient = httpClientFactory.createClient();
- connectionTime = clock.instant();
- return httpClient != null;
- }
-
- @Override
- public Instant connectionTime() { return connectionTime; }
-
- // Protected for easier testing only.
- protected static InputStreamEntity zipAndCreateEntity(final InputStream inputStream) throws IOException {
- byte[] buffer = new byte[4096];
- GZIPOutputStream gzos = null;
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try {
- gzos = new GZIPOutputStream(baos);
- while (inputStream.available() > 0) {
- int length = inputStream.read(buffer);
- gzos.write(buffer, 0,length);
- }
- } finally {
- if (gzos != null) {
- gzos.close();
- }
- }
- byte[] fooGzippedBytes = baos.toByteArray();
- return new InputStreamEntity(new ByteArrayInputStream(fooGzippedBytes), -1);
- }
-
- private InputStream write(List<Document> docs, boolean drain, boolean useCompression)
- throws ServerResponseException, IOException {
- HttpPost httpPost = createPost(drain, useCompression, false);
-
- ByteBuffer[] buffers = getDataWithStartAndEndOfFeed(docs, negotiatedVersion);
- InputStream inputStream = new ByteBufferInputStream(buffers);
- InputStreamEntity reqEntity = useCompression ? zipAndCreateEntity(inputStream)
- : new InputStreamEntity(inputStream, -1);
- reqEntity.setChunked(true);
- httpPost.setEntity(reqEntity);
- return executePost(httpPost);
- }
-
- private ByteBuffer[] getDataWithStartAndEndOfFeed(List<Document> docs, int version) {
- List<ByteBuffer> data = new ArrayList<>();
- if (version == 3) {
- for (Document doc : docs) {
- int operationSize = doc.size() + startOfFeed.length + endOfFeed.length;
- StringBuilder envelope = new StringBuilder();
- Encoder.encode(doc.getOperationId(), envelope);
- envelope.append(' ');
- envelope.append(Integer.toHexString(operationSize));
- envelope.append('\n');
- data.add(StandardCharsets.US_ASCII.encode(envelope.toString()));
- data.add(ByteBuffer.wrap(startOfFeed));
- data.add(doc.getData());
- data.add(ByteBuffer.wrap(endOfFeed));
- }
- } else {
- throw new IllegalArgumentException("Protocol version " + version + " unsupported by client.");
- }
- return data.toArray(new ByteBuffer[data.size()]);
- }
-
- private HttpPost createPost(boolean drain, boolean useCompression, boolean isHandshake) {
- HttpPost httpPost = new HttpPost(createUri());
-
- for (int v : supportedVersions) {
- httpPost.addHeader(Headers.VERSION, "" + v);
- }
- if (sessionId != null) {
- httpPost.setHeader(Headers.SESSION_ID, sessionId);
- }
- if (clientId != null) {
- httpPost.setHeader(Headers.CLIENT_ID, clientId);
- }
- httpPost.setHeader(Headers.SHARDING_KEY, shardingKey);
- httpPost.setHeader(Headers.DRAIN, drain ? "true" : "false");
- if (clusterSpecificRoute != null) {
- httpPost.setHeader(Headers.ROUTE, feedParams.getRoute());
- } else {
- if (feedParams.getRoute() != null) {
- httpPost.setHeader(Headers.ROUTE, feedParams.getRoute());
- }
- }
- if (!isHandshake) {
- if (feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8) {
- httpPost.setHeader(Headers.DATA_FORMAT, FeedParams.DataFormat.JSON_UTF8.name());
- } else {
- httpPost.setHeader(Headers.DATA_FORMAT, FeedParams.DataFormat.XML_UTF8.name());
- }
- if (feedParams.getPriority() != null) {
- httpPost.setHeader(Headers.PRIORITY, feedParams.getPriority());
- }
- if (connectionParams.getTraceLevel() != 0) {
- httpPost.setHeader(Headers.TRACE_LEVEL, String.valueOf(connectionParams.getTraceLevel()));
- }
- if (negotiatedVersion == 3 && feedParams.getDenyIfBusyV3()) {
- httpPost.setHeader(Headers.DENY_IF_BUSY, "true");
- }
- }
- if (feedParams.getSilentUpgrade()) {
- httpPost.setHeader(Headers.SILENTUPGRADE, "true");
- }
- httpPost.setHeader(Headers.TIMEOUT, "" + feedParams.getServerTimeout(TimeUnit.SECONDS));
-
- for (Map.Entry<String, String> extraHeader : connectionParams.getHeaders()) {
- httpPost.addHeader(extraHeader.getKey(), extraHeader.getValue());
- }
- connectionParams.getDynamicHeaders().forEach((headerName, provider) -> {
- String headerValue = Objects.requireNonNull(
- provider.getHeaderValue(),
- provider.getClass().getName() + ".getHeader() returned null as header value!");
- httpPost.addHeader(headerName, headerValue);
- });
-
- if (useCompression) { // This causes the apache client to gzip the request content. Weird, huh?
- httpPost.setHeader("Content-Encoding", "gzip");
- }
- return httpPost;
- }
-
- private InputStream executePost(HttpPost httpPost) throws ServerResponseException, IOException {
- if (httpClient == null)
- throw new IOException("Trying to executePost while not having a connection/http client");
- String proxyAuthzHeader = getCustomProxyAuthorizationHeader(connectionParams).orElse(null);
- HttpResponse response;
- if (connectionParams.getProxyHost() != null && proxyAuthzHeader != null) {
- HttpContext context = createContextForcingPreemptiveProxyAuth(proxyAuthzHeader);
- response = httpClient.execute(httpPost, context);
- } else {
- response = httpClient.execute(httpPost);
- }
- try {
- verifyServerResponseCode(response);
- verifyServerVersion(response.getFirstHeader(Headers.VERSION));
- verifySessionHeader(response.getFirstHeader(Headers.SESSION_ID));
- } catch (ServerResponseException e) {
- // Ensure response is consumed to allow connection reuse later on
- EntityUtils.consumeQuietly(response.getEntity());
- throw e;
- }
- // Consume response now to allow connection to be reused immediately
- byte[] responseData = EntityUtils.toByteArray(response.getEntity());
- return responseData == null ? null : new ByteArrayInputStream(responseData);
- }
-
- private static Optional<String> getCustomProxyAuthorizationHeader(ConnectionParams params) {
- return params.getHeaders().stream()
- .filter(h -> h.getKey().equals(AUTH.PROXY_AUTH_RESP))
- .findAny()
- .map(Map.Entry::getValue);
- }
-
- private HttpContext createContextForcingPreemptiveProxyAuth(String proxyAuthzHeader) {
- BasicAuthCache authCache = new BasicAuthCache();
- HttpHost proxy = new HttpHost(connectionParams.getProxyHost(), connectionParams.getProxyPort());
- authCache.put(proxy, new CustomAuthScheme(proxyAuthzHeader));
- HttpContext context = new BasicHttpContext();
- context.setAttribute(HttpClientContext.AUTH_CACHE, authCache);
- BasicCredentialsProvider prov = new BasicCredentialsProvider();
- prov.setCredentials(new AuthScope(proxy), new UsernamePasswordCredentials("", ""));
- context.setAttribute(HttpClientContext.CREDS_PROVIDER, prov);
- return context;
- }
- private static class CustomAuthScheme extends BasicScheme {
- final String proxyAuthzHeader;
- @SuppressWarnings("deprecation")
- CustomAuthScheme(String proxyAuthzHeader) {
- super(ChallengeState.PROXY);
- this.proxyAuthzHeader = proxyAuthzHeader;
- }
- @Override
- public Header authenticate(Credentials credentials, HttpRequest request, HttpContext context) {
- return new BasicHeader(AUTH.PROXY_AUTH_RESP, proxyAuthzHeader);
- }
- }
-
-
- private void verifyServerResponseCode(HttpResponse response) throws ServerResponseException {
- StatusLine statusLine = response.getStatusLine();
- int statusCode = statusLine.getStatusCode();
-
- // We use code 261-299 to report errors related to internal transitive errors that the tenants should not care
- // about to avoid masking more serious errors.
- if (statusCode > 199 && statusCode < 260) return;
- if (statusCode == 299) throw new ServerResponseException(429, "Too many requests.");
- throw new ServerResponseException(statusCode,
- tryGetDetailedErrorMessage(response).orElseGet(statusLine::getReasonPhrase));
- }
-
- private static Optional<String> tryGetDetailedErrorMessage(HttpResponse response) {
- Header contentType = response.getEntity().getContentType();
- if (contentType == null || !contentType.getValue().equalsIgnoreCase("application/json")) return Optional.empty();
- try (InputStream in = response.getEntity().getContent()) {
- JsonNode jsonNode = mapper.readTree(in);
- JsonNode message = jsonNode.get("message");
- if (message == null || message.textValue() == null) return Optional.empty();
- return Optional.of(response.getStatusLine().getReasonPhrase() + " - " + message.textValue());
- } catch (IOException e) {
- return Optional.empty();
- }
- }
-
- private void verifySessionHeader(Header serverHeader) throws ServerResponseException {
- if (serverHeader == null) {
- throw new ServerResponseException("Got no session ID from server.");
- }
- final String serverHeaderVal = serverHeader.getValue().trim();
- if (negotiatedVersion == 3) {
- if (clientId == null || !clientId.equals(serverHeaderVal)) {
- String message = "Running using v3. However, server responds with different session " +
- "than client has set; " + serverHeaderVal + " vs client code " + clientId;
- log.severe(message);
- throw new ServerResponseException(message);
- }
- return;
- }
- if (sessionId == null) { //this must be the first request
- log.finer("Got session ID from server: " + serverHeaderVal);
- this.sessionId = serverHeaderVal;
- } else {
- if (!sessionId.equals(serverHeaderVal)) {
- log.info("Request has been routed to a server which does not recognize the client session." +
- " Most likely cause is upgrading of cluster, transitive error.");
- throw new ServerResponseException("Session ID received from server ('" + serverHeaderVal +
- "') does not match cached session ID ('" + sessionId + "')");
- }
- }
- }
-
- private void verifyServerVersion(Header serverHeader) throws ServerResponseException {
- if (serverHeader == null) {
- throw new ServerResponseException("Got bad protocol version from server.");
- }
- int serverVersion;
- try {
- serverVersion = Integer.parseInt(serverHeader.getValue());
- } catch (NumberFormatException nfe) {
- throw new ServerResponseException("Got bad protocol version from server: " + nfe.getMessage());
- }
- if (!supportedVersions.contains(serverVersion)) {
- throw new ServerResponseException("Unsupported version: " + serverVersion
- + ". Supported versions: " + supportedVersions);
- }
- if (negotiatedVersion == -1) {
- if (log.isLoggable(Level.FINE)) {
- log.log(Level.FINE, "Server decided upon protocol version " + serverVersion + ".");
- }
- }
- this.negotiatedVersion = serverVersion;
- }
-
- private String createUri() {
- StringBuilder u = new StringBuilder();
- u.append(endpoint.isUseSsl() ? "https://" : "http://");
- u.append(endpoint.getHostname());
- u.append(":").append(endpoint.getPort());
- u.append(PATH);
- u.append(feedParams.toUriParameters());
- return u.toString();
- }
-
- @Override
- public Endpoint getEndpoint() {
- return endpoint;
- }
-
- @Override
- public void handshake() throws ServerResponseException, IOException {
- boolean useCompression = false;
- boolean drain = false;
- boolean handshake = true;
- HttpPost httpPost = createPost(drain, useCompression, handshake);
-
- String oldSessionID = sessionId;
- sessionId = null;
- try (InputStream stream = executePost(httpPost)) {
- if (oldSessionID != null && !oldSessionID.equals(sessionId)) {
- throw new ServerResponseException(
- "Session ID changed after new handshake, some documents might not be acked to correct thread. "
- + getEndpoint() + " old " + oldSessionID + " new " + sessionId);
- }
- if (stream == null) {
- log.fine("Stream is null.");
- }
- log.fine("Got session ID " + sessionId);
- }
- }
-
- @Override
- public void close() {
- try {
- if (httpClient != null)
- httpClient.close();
- }
- catch (IOException e) {
- log.log(Level.WARNING, "Failed closing HTTP client", e);
- }
- httpClient = null;
- }
-
- /**
- * On re-connect we want to recreate the connection, hence we need a factory.
- */
- public static class HttpClientFactory {
-
- private final FeedParams feedParams;
- final ConnectionParams connectionParams;
- final boolean useSsl;
-
- public HttpClientFactory(FeedParams feedParams, ConnectionParams connectionParams, boolean useSsl) {
- this.feedParams = feedParams;
- this.connectionParams = connectionParams;
- this.useSsl = useSsl;
- }
-
- public CloseableHttpClient createClient() {
- HttpClientBuilder clientBuilder;
- if (connectionParams.useTlsConfigFromEnvironment()) {
- clientBuilder = VespaHttpClientBuilder.create();
- } else {
- clientBuilder = HttpClientBuilder.create();
- if (connectionParams.getSslContext() != null) {
- setSslContext(clientBuilder, connectionParams.getSslContext());
- } else {
- SslContextBuilder builder = new SslContextBuilder();
- if (connectionParams.getPrivateKey() != null && connectionParams.getCertificate() != null) {
- builder.withKeyStore(connectionParams.getPrivateKey(), connectionParams.getCertificate());
- }
- if (connectionParams.getCaCertificates() != null) {
- builder.withTrustStore(connectionParams.getCaCertificates());
- }
- setSslContext(clientBuilder, builder.build());
- }
- if (connectionParams.getHostnameVerifier() != null) {
- clientBuilder.setSSLHostnameVerifier(connectionParams.getHostnameVerifier());
- }
- clientBuilder.setUserTokenHandler(context -> null); // https://stackoverflow.com/a/42112034/1615280
- }
- clientBuilder.setMaxConnPerRoute(1);
- clientBuilder.setMaxConnTotal(1);
- clientBuilder.setUserAgent(String.format("vespa-http-client (%s)", Vtag.V_TAG_COMPONENT));
- clientBuilder.setDefaultHeaders(Collections.singletonList(new BasicHeader(Headers.CLIENT_VERSION, Vtag.V_TAG_COMPONENT)));
- int millisTotalTimeout = (int) (feedParams.getClientTimeout(TimeUnit.MILLISECONDS) + feedParams.getServerTimeout(TimeUnit.MILLISECONDS));
- RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
- .setSocketTimeout(millisTotalTimeout)
- .setConnectTimeout(millisTotalTimeout);
- if (connectionParams.getProxyHost() != null) {
- requestConfigBuilder.setProxy(new HttpHost(connectionParams.getProxyHost(), connectionParams.getProxyPort()));
- }
- clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
-
- log.fine(() -> "Creating HttpClient:" +
- " ConnectionTimeout " + connectionParams.getConnectionTimeToLive().getSeconds() + " seconds" +
- " proxyhost (can be null) " + connectionParams.getProxyHost() + ":" + connectionParams.getProxyPort()
- + (useSsl ? " using ssl " : " not using ssl")
- );
- return clientBuilder.build();
- }
- }
-
- // Note: Using deprecated setSslContext() to allow httpclient 4.4 on classpath (e.g unexpected Maven dependency resolution for test classpath)
- @SuppressWarnings("deprecation")
- private static void setSslContext(HttpClientBuilder builder, SSLContext sslContext) {
- builder.setSslcontext(sslContext);
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java
deleted file mode 100644
index 0b02626d6e8..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.FeedParams;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.time.Clock;
-import java.util.Objects;
-
-/**
- * @author bratseth
- */
-public class ApacheGatewayConnectionFactory implements GatewayConnectionFactory {
-
- private final Endpoint endpoint;
- private final FeedParams feedParams;
- private final String clusterSpecificRoute;
- private final ConnectionParams connectionParams;
- private final ApacheGatewayConnection.HttpClientFactory httpClientFactory;
- private final String clientId;
- private final Clock clock;
-
- public ApacheGatewayConnectionFactory(Endpoint endpoint,
- FeedParams feedParams,
- String clusterSpecificRoute,
- ConnectionParams connectionParams,
- ApacheGatewayConnection.HttpClientFactory httpClientFactory,
- String clientId,
- Clock clock) {
- this.endpoint = validate(endpoint);
- this.feedParams = feedParams;
- this.clusterSpecificRoute = clusterSpecificRoute;
- this.httpClientFactory = httpClientFactory;
- this.connectionParams = connectionParams;
- this.clientId = Objects.requireNonNull(clientId, "clientId cannot be null");
- this.clock = clock;
- }
-
- private static Endpoint validate(Endpoint endpoint) {
- try {
- InetAddress.getByName(endpoint.getHostname());
- return endpoint;
- }
- catch (UnknownHostException e) {
- throw new IllegalArgumentException("Unknown host: " + endpoint);
- }
- }
-
- @Override
- public GatewayConnection newConnection() {
- return new ApacheGatewayConnection(endpoint,
- feedParams,
- clusterSpecificRoute,
- connectionParams,
- httpClientFactory,
- clientId,
- clock);
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ByteBufferInputStream.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ByteBufferInputStream.java
deleted file mode 100644
index f88519c2615..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ByteBufferInputStream.java
+++ /dev/null
@@ -1,77 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Deque;
-
-/**
- * @author Einar M R Rosenvinge
- */
-class ByteBufferInputStream extends InputStream {
- private final Deque<ByteBuffer> currentBuffers = new ArrayDeque<>();
-
- ByteBufferInputStream(ByteBuffer[] buffers) {
- for (int i = buffers.length - 1; i > -1; i--) {
- currentBuffers.push(buffers[i]);
- }
- }
-
- @Override
- public int read() throws IOException {
- pop();
- if (currentBuffers.isEmpty()) {
- return -1;
- }
- return currentBuffers.peek().get();
- }
-
- private void pop() {
- if (currentBuffers.isEmpty()) {
- return;
- }
-
- while (!currentBuffers.isEmpty() && !currentBuffers.peek().hasRemaining()) {
- //it's exhausted, get rid of it
- currentBuffers.pop();
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- if (b == null) {
- throw new NullPointerException();
- } else if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return 0;
- }
- pop();
- if (currentBuffers.isEmpty()) {
- return -1;
- }
- int toRead = Math.min(len, currentBuffers.peek().remaining());
- currentBuffers.peek().get(b, off, toRead);
- return toRead;
- }
-
- @Override
- public long skip(long n) throws IOException {
- throw new IOException("skip() not supported.");
- }
-
- @Override
- public int available() throws IOException {
- if (currentBuffers.isEmpty()) {
- return 0;
- }
-
- int size = 0;
- for (ByteBuffer b : currentBuffers) {
- size += b.remaining();
- }
- return size;
- }
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
deleted file mode 100644
index ef5b7afd16a..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ /dev/null
@@ -1,193 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.FeedParams;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.Exceptions;
-import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author Einar M R Rosenvinge
- */
-public class ClusterConnection implements AutoCloseable {
-
- private static ObjectMapper jsonMapper = createMapper();
-
- private final List<IOThread> ioThreads = new ArrayList<>();
- private final int clusterId;
- private final ThreadGroup ioThreadGroup;
-
- /** The shared queue of document operations the io threads will take from */
- private final DocumentQueue documentQueue;
-
- /** The single endpoint this sends to, or null if it will send to multiple endpoints */
- private final Endpoint singleEndpoint;
-
- public ClusterConnection(OperationProcessor operationProcessor,
- FeedParams feedParams,
- ConnectionParams connectionParams,
- Cluster cluster,
- int clusterId,
- int clientQueueSizePerCluster,
- ScheduledThreadPoolExecutor timeoutExecutor,
- Clock clock) {
- if (cluster.getEndpoints().isEmpty())
- throw new IllegalArgumentException("At least a single endpoint is required in " + cluster);
-
- this.clusterId = clusterId;
- int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size() * connectionParams.getNumPersistentConnectionsPerEndpoint();
- if (totalNumberOfEndpointsInThisCluster == 0)
- throw new IllegalArgumentException("At least 1 persistent connection per endpoint is required in " + cluster);
- int maxInFlightPerSession = Math.max(1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster);
-
- documentQueue = new DocumentQueue(clientQueueSizePerCluster, clock);
- ioThreadGroup = operationProcessor.getIoThreadGroup();
- singleEndpoint = cluster.getEndpoints().size() == 1 ? cluster.getEndpoints().get(0) : null;
- Double idlePollFrequency = feedParams.getIdlePollFrequency();
- if (idlePollFrequency == null)
- idlePollFrequency = 10.0;
- for (Endpoint endpoint : cluster.getEndpoints()) {
- EndpointResultQueue endpointResultQueue = new EndpointResultQueue(operationProcessor,
- endpoint,
- clusterId,
- timeoutExecutor,
- feedParams.getServerTimeout(TimeUnit.MILLISECONDS) + feedParams.getClientTimeout(TimeUnit.MILLISECONDS));
- for (int i = 0; i < connectionParams.getNumPersistentConnectionsPerEndpoint(); i++) {
- GatewayConnectionFactory connectionFactory;
- if (connectionParams.isDryRun()) {
- connectionFactory = new DryRunGatewayConnectionFactory(endpoint, clock);
- } else {
- connectionFactory = new ApacheGatewayConnectionFactory(endpoint,
- feedParams,
- cluster.getRoute(),
- connectionParams,
- new ApacheGatewayConnection.HttpClientFactory(feedParams, connectionParams, endpoint.isUseSsl()),
- operationProcessor.getClientId(),
- clock
- );
- }
- IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(),
- endpoint,
- endpointResultQueue,
- connectionFactory,
- clusterId,
- feedParams.getMaxChunkSizeBytes(),
- maxInFlightPerSession,
- Duration.ofMillis(feedParams.getLocalQueueTimeOut()),
- documentQueue,
- feedParams.getMaxSleepTimeMs(),
- connectionParams.getConnectionTimeToLive(),
- connectionParams.runThreads(),
- idlePollFrequency,
- clock);
- ioThreads.add(ioThread);
- }
- }
- }
-
- private static ObjectMapper createMapper() {
- ObjectMapper mapper = new ObjectMapper();
- mapper.registerModule(new Jdk8Module());
- mapper.registerModule(new JavaTimeModule());
- return mapper;
- }
-
- public int getClusterId() {
- return clusterId;
- }
-
- public void post(Document document) throws EndpointIOException {
- try {
- documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup);
- } catch (Throwable t) { // InterruptedException if shutting down, IllegalStateException if already shut down
- throw new EndpointIOException(singleEndpoint, "While sending", t);
- }
- }
-
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- @Override
- public void close() {
- List<Exception> exceptions = new ArrayList<>();
- for (IOThread ioThread : ioThreads) {
- try {
- ioThread.close();
- } catch (Exception e) {
- exceptions.add(e);
- }
- }
- if (exceptions.isEmpty()) {
- return;
- }
- if (exceptions.size() == 1) {
- if (exceptions.get(0) instanceof RuntimeException) {
- throw (RuntimeException) exceptions.get(0);
- } else {
- throw new RuntimeException(exceptions.get(0));
- }
- }
- StringBuilder b = new StringBuilder();
- b.append("Exception thrown while closing one or more endpoints: ");
- for (int i = 0; i < exceptions.size(); i++) {
- Exception e = exceptions.get(i);
- b.append(Exceptions.toMessageString(e));
- if (i != (exceptions.size() - 1)) {
- b.append(", ");
- }
- }
- throw new RuntimeException(b.toString(), exceptions.get(0));
- }
-
- public String getStatsAsJSon() throws IOException {
- StringWriter stringWriter = new StringWriter();
- JsonGenerator jsonGenerator = jsonMapper.createGenerator(stringWriter);
- jsonGenerator.writeStartObject();
- jsonGenerator.writeArrayFieldStart("session");
- for (IOThread ioThread : ioThreads) {
- jsonGenerator.writeStartObject();
- jsonGenerator.writeObjectFieldStart("endpoint");
- jsonGenerator.writeStringField("host", ioThread.getEndpoint().getHostname());
- jsonGenerator.writeNumberField("port", ioThread.getEndpoint().getPort());
- jsonGenerator.writeEndObject();
- jsonGenerator.writeFieldName("stats");
- IOThread.ConnectionStats connectionStats = ioThread.getConnectionStats();
- jsonMapper.writeValue(jsonGenerator, connectionStats);
- jsonGenerator.writeEndObject();
- }
- jsonGenerator.writeEndArray();
- jsonGenerator.writeEndObject();
- jsonGenerator.close();
- return stringWriter.toString();
- }
-
- public List<IOThread> ioThreads() {
- return Collections.unmodifiableList(ioThreads);
- }
-
- @Override
- public boolean equals(Object o) {
- return (this == o) || (o instanceof ClusterConnection && clusterId == ((ClusterConnection) o).clusterId);
- }
-
- @Override
- public int hashCode() {
- return clusterId;
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
deleted file mode 100644
index 8164534ca37..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
+++ /dev/null
@@ -1,125 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.yahoo.vespa.http.client.core.Document;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Shared document queue that gives clients operations on documents which do not have operations already in flight.
- * This is multithread safe.
- *
- * @author dybis
- */
-class DocumentQueue {
-
- private final Deque<Document> queue;
- private final int maxSize;
- private boolean closed = false;
- private final Clock clock;
-
- DocumentQueue(int maxSize, Clock clock) {
- this.maxSize = maxSize;
- this.queue = new ArrayDeque<>(maxSize);
- this.clock = clock;
- }
-
- List<Document> removeAllDocuments() {
- synchronized (queue) {
- List<Document> allDocs = new ArrayList<>();
- while (!queue.isEmpty()) {
- allDocs.add(queue.poll());
- }
- queue.notifyAll();
- return allDocs;
- }
- }
-
- void put(Document document, boolean calledFromIoThreadGroup) throws InterruptedException {
- document.setQueueInsertTime(clock.instant());
- synchronized (queue) {
- while (!closed && (queue.size() >= maxSize) && !calledFromIoThreadGroup) {
- queue.wait();
- }
- if (closed) {
- throw new IllegalStateException("Cannot add elements to closed queue.");
- }
- queue.add(document);
- queue.notifyAll();
- }
- }
-
- Document poll(long timeout, TimeUnit unit) throws InterruptedException {
- synchronized (queue) {
- long remainingToWait = unit.toMillis(timeout);
- while (queue.isEmpty()) {
- long startTime = clock.millis();
- queue.wait(remainingToWait);
- remainingToWait -= (clock.millis() - startTime);
- if (remainingToWait <= 0) {
- break;
- }
- }
- Document document = queue.poll();
- queue.notifyAll();
- return document;
- }
- }
-
- Document poll() {
- synchronized (queue) {
- Document document = queue.poll();
- queue.notifyAll();
- return document;
- }
- }
-
- boolean isEmpty() {
- synchronized (queue) {
- return queue.isEmpty();
- }
- }
-
- int size() {
- synchronized (queue) {
- return queue.size();
- }
- }
-
- void clear() {
- synchronized (queue) {
- queue.clear();
- queue.notifyAll();
- }
- }
-
- boolean close() {
- boolean previousState;
- synchronized (queue) {
- previousState = closed;
- closed = true;
- queue.notifyAll();
- }
- return previousState;
- }
-
- Optional<Document> pollDocumentIfTimedoutInQueue(Duration localQueueTimeOut) {
- synchronized (queue) {
- if (queue.isEmpty()) return Optional.empty();
-
- Document document = queue.peek();
- if (document.getQueueInsertTime().plus(localQueueTimeOut).isBefore(clock.instant()))
- return Optional.ofNullable(queue.poll());
- else
- return Optional.empty();
- }
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
deleted file mode 100644
index a6f6992b238..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
+++ /dev/null
@@ -1,123 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.ErrorCode;
-import com.yahoo.vespa.http.client.core.OperationStatus;
-import com.yahoo.vespa.http.client.core.ServerResponseException;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.time.Clock;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Dummy implementation.
- *
- * @author dybis
- */
-public class DryRunGatewayConnection implements GatewayConnection {
-
- private final Endpoint endpoint;
- private final Clock clock;
- private Instant connectionTime = null;
- private Instant lastPollTime = null;
-
- /** Set to true to hold off responding with a result to any incoming operations until this is set false */
- private boolean hold = false;
- private final List<Document> held = new ArrayList<>();
-
- /** If this is set, handshake operations will throw this exception */
- private ServerResponseException throwThisOnHandshake = null;
-
- /** If this is set, all write operations will throw this exception */
- private IOException throwThisOnWrite = null;
-
- public DryRunGatewayConnection(Endpoint endpoint, Clock clock) {
- this.endpoint = endpoint;
- this.clock = clock;
- }
-
- @Override
- public synchronized InputStream write(List<Document> docs) throws IOException {
- if (throwThisOnWrite != null)
- throw throwThisOnWrite;
-
- if (hold) {
- held.addAll(docs);
- return new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8));
- }
- else {
- StringBuilder result = new StringBuilder();
- for (Document doc : held)
- result.append(okResponse(doc).render());
- held.clear();
- for (Document doc : docs)
- result.append(okResponse(doc).render());
- return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8));
- }
- }
-
- @Override
- public synchronized InputStream poll() throws IOException {
- lastPollTime = clock.instant();
- return write(new ArrayList<>());
- }
-
- @Override
- public synchronized Instant lastPollTime() { return lastPollTime; }
-
- @Override
- public synchronized InputStream drain() throws IOException {
- return write(new ArrayList<>());
- }
-
- @Override
- public synchronized boolean connect() {
- connectionTime = clock.instant();
- return true;
- }
-
- @Override
- public synchronized Instant connectionTime() { return connectionTime; }
-
- @Override
- public Endpoint getEndpoint() {
- return endpoint;
- }
-
- @Override
- public synchronized void handshake() throws ServerResponseException {
- if (throwThisOnHandshake != null)
- throw throwThisOnHandshake;
- }
-
- @Override
- public synchronized void close() { }
-
- public synchronized void hold(boolean hold) {
- this.hold = hold;
- }
-
- /** Returns the document currently held in this */
- public synchronized List<Document> held() { return Collections.unmodifiableList(held); }
-
- public synchronized void throwOnWrite(IOException throwThisOnWrite) {
- this.throwThisOnWrite = throwThisOnWrite;
- }
-
- public synchronized void throwOnHandshake(ServerResponseException throwThisOnHandshake) {
- this.throwThisOnHandshake = throwThisOnHandshake;
- }
-
- private OperationStatus okResponse(Document document) {
- return new OperationStatus("ok", document.getOperationId(), ErrorCode.OK, false, "");
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java
deleted file mode 100644
index 01bec563889..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.yahoo.vespa.http.client.config.Endpoint;
-
-import java.time.Clock;
-
-/**
- * @author bratseth
- */
-public class DryRunGatewayConnectionFactory implements GatewayConnectionFactory {
-
- private final Endpoint endpoint;
- private final Clock clock;
-
- public DryRunGatewayConnectionFactory(Endpoint endpoint, Clock clock) {
- this.endpoint = endpoint;
- this.clock = clock;
- }
-
- @Override
- public GatewayConnection newConnection() {
- return new DryRunGatewayConnection(endpoint, clock);
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java
deleted file mode 100644
index 34b8ef76068..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.yahoo.vespa.http.client.config.Endpoint;
-
-import java.io.IOException;
-
-/**
- * Class for throwing exception from endpoint.
- *
- * @author dybis
-*/
-public class EndpointIOException extends IOException {
-
- private final Endpoint endpoint;
- private static final long serialVersionUID = 29335813211L;
-
- public EndpointIOException(Endpoint endpoint, String message, Throwable cause) {
- super(message, cause);
- this.endpoint = endpoint;
- }
-
- /** Returns the endpoint, or null if the failure occurred before this was assigned to a unique endpoint */
- public Endpoint getEndpoint() { return endpoint; }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
deleted file mode 100644
index 98b6aee1e33..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
+++ /dev/null
@@ -1,136 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.yahoo.vespa.http.client.FeedEndpointException;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.core.EndpointResult;
-import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
-import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-/**
- * The shared queue of operation results.
- * This is multithread safe.
- *
- * @author Einar M R Rosenvinge
- */
-class EndpointResultQueue {
-
- private static final Logger log = Logger.getLogger(EndpointResultQueue.class.getName());
- private final OperationProcessor operationProcessor;
-
- private final Map<String, InflightOperation> inflightOperations = new HashMap<>();
-
- private final Endpoint endpoint;
- private final int clusterId;
- private final ScheduledThreadPoolExecutor timer;
- private final long totalTimeoutMs;
-
- EndpointResultQueue(OperationProcessor operationProcessor,
- Endpoint endpoint,
- int clusterId,
- ScheduledThreadPoolExecutor timer,
- long totalTimeoutMs) {
- this.operationProcessor = operationProcessor;
- this.endpoint = endpoint;
- this.clusterId = clusterId;
- this.timer = timer;
- this.totalTimeoutMs = totalTimeoutMs;
- }
-
- public synchronized void operationSent(String operationId, GatewayConnection connection) {
- DocumentTimerTask task = new DocumentTimerTask(operationId);
- ScheduledFuture<?> future = timer.schedule(task, totalTimeoutMs, TimeUnit.MILLISECONDS);
- inflightOperations.put(operationId, new InflightOperation(future, connection));
- }
-
- public synchronized void failOperation(EndpointResult result, int clusterId) {
- resultReceived(result, clusterId, false);
- }
-
- public synchronized void resultReceived(EndpointResult result, int clusterId) {
- resultReceived(result, clusterId, true);
- }
-
- void onEndpointError(FeedEndpointException e) {
- operationProcessor.onEndpointError(e);
- }
-
- private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) {
- operationProcessor.resultReceived(result, clusterId);
- InflightOperation operation = inflightOperations.remove(result.getOperationId());
- if (operation == null) {
- if (duplicateGivesWarning) {
- log.info("Result for ID '" + result.getOperationId() + "' received from '" + endpoint +
- "', but we have no record of a sent operation. This may happen if an operation is " +
- "initiated, but also retried, due to HTTP failure. Otherwise, something is wrong on " +
- "the server side (bad VIP usage?), or operation was received _after_ client-side timeout.");
- }
- return;
- }
- operation.future.cancel(false);
- }
-
- /** Called only from ScheduledThreadPoolExecutor thread in DocumentTimerTask.run(), see below */
- private synchronized void timeout(String operationId) {
- InflightOperation operation = inflightOperations.remove(operationId);
- if (operation == null) {
- log.finer("Timeout of operation '" + operationId + "', but operation " +
- "not found in map. Result was probably received just-in-time from server, while timeout " +
- "task could not be cancelled.");
- return;
- }
- EndpointResult endpointResult = EndPointResultFactory.createTransientError(
- endpoint, operationId, new RuntimeException("Timed out waiting for reply from server."));
- operationProcessor.resultReceived(endpointResult, clusterId);
- }
-
- public synchronized int getPendingSize() {
- return inflightOperations.values().size();
- }
-
- public synchronized void failPending(Exception exception) {
- inflightOperations.forEach((operationId, operation) -> {
- operation.future.cancel(false);
- EndpointResult result = EndPointResultFactory.createError(endpoint, operationId, exception);
- operationProcessor.resultReceived(result, clusterId);
- });
- inflightOperations.clear();
- }
-
- public synchronized boolean hasInflightOperations(GatewayConnection connection) {
- return inflightOperations.entrySet().stream()
- .anyMatch(entry -> entry.getValue().connection.equals(connection));
- }
-
- private class DocumentTimerTask implements Runnable {
-
- private final String operationId;
-
- private DocumentTimerTask(String operationId) {
- this.operationId = operationId;
- }
-
- @Override
- public void run() {
- timeout(operationId);
- }
-
- }
-
- private static class InflightOperation {
- final ScheduledFuture<?> future;
- final GatewayConnection connection;
-
- InflightOperation(ScheduledFuture<?> future, GatewayConnection connection) {
- this.future = future;
- this.connection = connection;
- }
- }
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java
deleted file mode 100644
index 25057a1dead..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.ServerResponseException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.time.Instant;
-import java.util.List;
-
-public interface GatewayConnection {
-
- /** Returns the time this connected over the network, or null if not connected yet */
- Instant connectionTime();
-
- /** Returns the last time poll was called on this, or null if never */
- Instant lastPollTime();
-
- InputStream write(List<Document> docs) throws ServerResponseException, IOException;
-
- /** Returns any operation results that are ready now */
- InputStream poll() throws ServerResponseException, IOException;
-
- /** Attempt to drain all outstanding operations, even if this leads to blocking */
- InputStream drain() throws ServerResponseException, IOException;
-
- boolean connect();
-
- Endpoint getEndpoint();
-
- void handshake() throws ServerResponseException, IOException;
-
- void close();
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java
deleted file mode 100644
index 4988b73510f..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java
+++ /dev/null
@@ -1,13 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-/**
- * Creates gateway connections on request
- *
- * @author bratseth
- */
-public interface GatewayConnectionFactory {
-
- GatewayConnection newConnection();
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayThrottler.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayThrottler.java
deleted file mode 100644
index 41a1b6a7e87..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayThrottler.java
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * When the gateways says it can not handle more load, we should send less load. That is the responsibility
- * of this component
- *
- * @author dybis
- */
-public class GatewayThrottler {
-
- private long backOffTimeMs = 0;
- private final long maxSleepTimeMs;
-
- public GatewayThrottler(long maxSleepTimeMs) {
- this.maxSleepTimeMs = maxSleepTimeMs;
- }
-
- public void handleCall(int transientErrors) {
- if (transientErrors > 0) {
- backOffTimeMs = Math.min(maxSleepTimeMs, backOffTimeMs + distribute(100));
- } else {
- backOffTimeMs = Math.max(0, backOffTimeMs - distribute(10));
- }
- sleepMs(backOffTimeMs);
- }
-
- protected void sleepMs(long sleepTime) {
- try {
- if (backOffTimeMs > 0L) {
- Thread.sleep(backOffTimeMs);
- }
- } catch (InterruptedException e) {
- // Do nothing
- }
- }
-
- public int distribute(int expected) {
- double factor = 0.5 + ThreadLocalRandom.current().nextDouble();
- Double result = expected * factor;
- return result.intValue();
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
deleted file mode 100644
index aeb164227f1..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ /dev/null
@@ -1,649 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.yahoo.vespa.http.client.FeedConnectException;
-import com.yahoo.vespa.http.client.FeedProtocolException;
-import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.EndpointResult;
-import com.yahoo.vespa.http.client.core.Exceptions;
-import com.yahoo.vespa.http.client.core.ServerResponseException;
-import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Random;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Thread which feeds document operations asynchronously and processes the results.
- *
- * @author Einar M R Rosenvinge
- */
-public class IOThread implements Runnable, AutoCloseable {
-
- private static final Logger log = Logger.getLogger(IOThread.class.getName());
-
- private final Endpoint endpoint;
- private final GatewayConnectionFactory connectionFactory;
- private final DocumentQueue documentQueue;
- private final EndpointResultQueue resultQueue;
-
- /** The thread running this, or null if it does not run a thread (meaning tick() must be called from the outside) */
- private final Thread thread;
- private final int clusterId;
- private final CountDownLatch running = new CountDownLatch(1);
- private final CountDownLatch stopSignal = new CountDownLatch(1);
- private final int maxChunkSizeBytes;
- private final int maxInFlightRequests;
- private final Duration localQueueTimeOut;
- private final GatewayThrottler gatewayThrottler;
- private final Duration connectionTimeToLive;
- private final long pollIntervalUS;
- private final Clock clock;
- private final Random random = new Random();
- private final OldConnectionsDrainer oldConnectionsDrainer;
-
- private volatile GatewayConnection currentConnection;
- private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED;
-
- private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED };
- private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0);
- private final AtomicInteger wrongVersionDetectedCounter = new AtomicInteger(0);
- private final AtomicInteger problemStatusCodeFromServerCounter = new AtomicInteger(0);
- private final AtomicInteger executeProblemsCounter = new AtomicInteger(0);
- private final AtomicInteger docsReceivedCounter = new AtomicInteger(0);
- private final AtomicInteger statusReceivedCounter = new AtomicInteger(0);
- private final AtomicInteger pendingDocumentStatusCount = new AtomicInteger(0);
- private final AtomicInteger successfulHandshakes = new AtomicInteger(0);
- private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0);
-
- IOThread(ThreadGroup ioThreadGroup,
- Endpoint endpoint,
- EndpointResultQueue endpointResultQueue,
- GatewayConnectionFactory connectionFactory,
- int clusterId,
- int maxChunkSizeBytes,
- int maxInFlightRequests,
- Duration localQueueTimeOut,
- DocumentQueue documentQueue,
- long maxSleepTimeMs,
- Duration connectionTimeToLive,
- boolean runThreads,
- double idlePollFrequency,
- Clock clock) {
- this.endpoint = endpoint;
- this.documentQueue = documentQueue;
- this.connectionFactory = connectionFactory;
- this.currentConnection = connectionFactory.newConnection();
- this.resultQueue = endpointResultQueue;
- this.clusterId = clusterId;
- this.maxChunkSizeBytes = maxChunkSizeBytes;
- this.maxInFlightRequests = maxInFlightRequests;
- this.connectionTimeToLive = connectionTimeToLive;
- this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs);
- this.pollIntervalUS = Math.max(1000, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1ms, 10s]
- this.clock = clock;
- this.localQueueTimeOut = localQueueTimeOut;
- this.oldConnectionsDrainer = new OldConnectionsDrainer(endpoint,
- clusterId,
- Duration.ofMillis(pollIntervalUS/1000),
- connectionTimeToLive,
- localQueueTimeOut,
- statusReceivedCounter,
- resultQueue,
- stopSignal,
- clock);
- if (runThreads) {
- this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
- thread.setDaemon(true);
- thread.start();
- Thread thread = new Thread(ioThreadGroup, oldConnectionsDrainer, "IOThread " + endpoint + " drainer");
- thread.setDaemon(true);
- thread.start();
- }
- else {
- this.thread = null;
- }
- }
-
- public Endpoint getEndpoint() {
- return endpoint;
- }
-
- /**
- * Returns a snapshot of counters. Threadsafe.
- */
- public ConnectionStats getConnectionStats() {
- return new ConnectionStats(
- wrongSessionDetectedCounter.get(),
- wrongVersionDetectedCounter.get(),
- problemStatusCodeFromServerCounter.get(),
- executeProblemsCounter.get(),
- docsReceivedCounter.get(),
- statusReceivedCounter.get(),
- pendingDocumentStatusCount.get(),
- successfulHandshakes.get(),
- lastGatewayProcessTimeMillis.get());
- }
-
- @Override
- public void close() {
- documentQueue.close();
- if (stopSignal.getCount() == 0) return;
-
- stopSignal.countDown();
- log.finer("Closed called.");
-
- oldConnectionsDrainer.close();
-
- // Make a last attempt to get results from previous operations, we have already waited quite a bit before getting here.
- int size = resultQueue.getPendingSize();
- if (size > 0) {
- log.info("We have outstanding operations (" + size + ") , trying to fetch responses.");
- try {
- processResponse(currentConnection.drain());
- } catch (Throwable e) {
- log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e);
- }
- }
-
- try {
- currentConnection.close();
- } finally {
- // If there is still documents in the queue, fail them.
- drainDocumentQueueWhenFailingPermanently(new Exception("Closed call, did not manage to process everything so failing this document."));
- }
-
- log.fine("Session to " + endpoint + " closed.");
- }
-
- /** For testing only */
- public void post(Document document) throws InterruptedException {
- documentQueue.put(document, true);
- }
-
- @Override
- public String toString() {
- return "I/O thread (for " + endpoint + ")";
- }
-
- List<Document> getNextDocsForFeeding(long maxWaitUnits, TimeUnit timeUnit) {
- List<Document> docsForSendChunk = new ArrayList<>();
- int chunkSizeBytes = 0;
- try {
- drainFirstDocumentsInQueueIfOld();
- Document doc = thread != null ? documentQueue.poll(maxWaitUnits, timeUnit) : documentQueue.poll();
- if (doc != null) {
- docsForSendChunk.add(doc);
- chunkSizeBytes = doc.size();
- }
- } catch (InterruptedException ie) {
- log.fine("Got break signal while waiting for new documents to feed");
- return docsForSendChunk;
- }
- int pendingSize = 1 + resultQueue.getPendingSize();
-
- // see if we can get more documents without blocking
- // slightly randomize how much is taken to avoid harmonic interactions leading
- // to some threads consistently taking more than others
- int thisMaxChunkSizeBytes = randomize(maxChunkSizeBytes);
- int thisMaxInFlightRequests = randomize(maxInFlightRequests);
- while (chunkSizeBytes < thisMaxChunkSizeBytes && pendingSize < thisMaxInFlightRequests) {
- drainFirstDocumentsInQueueIfOld();
- Document document = documentQueue.poll();
- if (document == null) break;
- docsForSendChunk.add(document);
- chunkSizeBytes += document.size();
- pendingSize++;
- }
- if (log.isLoggable(Level.FINEST))
- log.finest("Chunk has " + docsForSendChunk.size() + " docs with a size " + chunkSizeBytes + " bytes");
- docsReceivedCounter.addAndGet(docsForSendChunk.size());
- return docsForSendChunk;
- }
-
- private int randomize(int limit) {
- double multiplier = 0.75 + 0.25 * random.nextDouble();
- return Math.max(1, (int)(limit * multiplier));
- }
-
- private void addDocumentsToResultQueue(List<Document> docs) {
- for (Document doc : docs) {
- resultQueue.operationSent(doc.getOperationId(), currentConnection);
- }
- }
-
- private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) {
- for (Document doc : docs) {
- resultQueue.failOperation(EndPointResultFactory.createTransientError(endpoint,
- doc.getOperationId(),
- servletException),
- clusterId);
- }
- }
-
- private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException {
- try {
- // Post the new docs and get async responses for other posts.
- return currentConnection.write(docs);
- } catch (ServerResponseException ser) {
- markDocumentAsFailed(docs, ser);
- throw ser;
- } catch (Exception e) {
- markDocumentAsFailed(docs, new ServerResponseException(Exceptions.toMessageString(e)));
- throw e;
- }
- }
-
- private static class ProcessResponse {
-
- private final int transitiveErrorCount;
- private final int processResultsCount;
-
- ProcessResponse(int transitiveErrorCount, int processResultsCount) {
- this.transitiveErrorCount = transitiveErrorCount;
- this.processResultsCount = processResultsCount;
- }
-
- }
-
- private ProcessResponse processResponse(InputStream serverResponse) throws IOException {
- return processResponse(serverResponse, endpoint, clusterId, statusReceivedCounter, resultQueue);
- }
-
- private static ProcessResponse processResponse(InputStream serverResponse,
- Endpoint endpoint,
- int clusterId,
- AtomicInteger statusReceivedCounter,
- EndpointResultQueue resultQueue) throws IOException {
- Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse);
- statusReceivedCounter.addAndGet(endpointResults.size());
- int transientErrors = 0;
- for (EndpointResult endpointResult : endpointResults) {
- if (endpointResult.getDetail().getResultType() == Result.ResultType.TRANSITIVE_ERROR) {
- transientErrors++;
- }
- resultQueue.resultReceived(endpointResult, clusterId);
- }
- return new ProcessResponse(transientErrors, endpointResults.size());
- }
-
- private ProcessResponse feedDocumentAndProcessResults(List<Document> docs)
- throws ServerResponseException, IOException {
- addDocumentsToResultQueue(docs);
- long startTime = clock.millis();
- InputStream serverResponse = sendAndReceive(docs);
-
- ProcessResponse processResponse = processResponse(serverResponse);
- lastGatewayProcessTimeMillis.set((int) (clock.millis() - startTime));
- return processResponse;
- }
-
- private ProcessResponse pullAndProcessData(long maxWaitTimeUS) throws ServerResponseException, IOException {
- int pendingResultQueueSize = resultQueue.getPendingSize();
- pendingDocumentStatusCount.set(pendingResultQueueSize);
-
- List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests)
- ? new ArrayList<>() // The queue is full, will not send more documents
- : getNextDocsForFeeding(maxWaitTimeUS, TimeUnit.MICROSECONDS);
-
- if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) {
- //we have no unfinished business with the server now.
- log.finest("No document awaiting feeding, not waiting for results.");
- return new ProcessResponse(0, 0);
- }
- log.finest("Awaiting " + pendingResultQueueSize + " results.");
- ProcessResponse processResponse = feedDocumentAndProcessResults(nextDocsForFeeding);
-
- if (pendingResultQueueSize > maxInFlightRequests && processResponse.processResultsCount == 0) {
- try {
- // Max outstanding document operations, no more results on server side, wait a bit before asking again
- Thread.sleep(300);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- return processResponse;
- }
-
- /** Given a current connection state, take the appropriate action and return the resulting new connection state */
- private ConnectionState cycle(ConnectionState connectionState) {
- switch(connectionState) {
- case DISCONNECTED:
- try {
- if (! currentConnection.connect()) {
- log.log(Level.WARNING, "Could not connect to endpoint: '" + endpoint + "'. Will re-try.");
- drainFirstDocumentsInQueueIfOld();
- return ConnectionState.DISCONNECTED;
- }
- return ConnectionState.CONNECTED;
- } catch (Throwable throwable1) {
- drainFirstDocumentsInQueueIfOld();
-
- log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint + "'. Will re-try connecting.",
- throwable1);
- executeProblemsCounter.incrementAndGet();
- return ConnectionState.DISCONNECTED;
- }
- case CONNECTED:
- try {
- if (isStale(currentConnection))
- return refreshConnection(connectionState);
- currentConnection.handshake();
- successfulHandshakes.getAndIncrement();
- } catch (ServerResponseException ser) {
- int code = ser.getResponseCode();
- if (code == 401 || code == 403) {
- drainDocumentQueueWhenFailingPermanently(new Exception("Denied access by endpoint:" + ser.getResponseString()));
- log.log(Level.SEVERE, "Failed authentication or authorization with '" + endpoint + "': " + Exceptions.toMessageString(ser));
- return ConnectionState.CONNECTED; // Should ideally exit immediately, instead of doing this per X documents :/
- }
-
- executeProblemsCounter.incrementAndGet();
- log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint +
- "' failed -- will re-try handshake: " + Exceptions.toMessageString(ser));
-
- drainFirstDocumentsInQueueIfOld();
- resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, endpoint));
- return ConnectionState.CONNECTED;
- } catch (Throwable throwable) { // This cover IOException as well
- executeProblemsCounter.incrementAndGet();
- resultQueue.onEndpointError(new FeedConnectException(throwable, endpoint));
- log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" +
- endpoint + "' failed. Will re-try handshake.",
- throwable);
- drainFirstDocumentsInQueueIfOld();
- currentConnection.close();
- return ConnectionState.DISCONNECTED;
- }
- return ConnectionState.SESSION_SYNCED;
- case SESSION_SYNCED:
- try {
- if (isStale(currentConnection))
- return refreshConnection(connectionState);
- ProcessResponse processResponse = pullAndProcessData(pollIntervalUS);
- gatewayThrottler.handleCall(processResponse.transitiveErrorCount);
- }
- catch (ServerResponseException ser) {
- log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint +
- "'. Will re-try. Endpoint responded with an unexpected HTTP response code.",
- ser);
- return ConnectionState.CONNECTED;
- }
- catch (Throwable e) {
- log.log(Level.INFO,
- "Connection level error handing data over to endpoint '" + endpoint + "'. Will re-try.",
- e);
- currentConnection.close();
- return ConnectionState.DISCONNECTED;
- }
- return ConnectionState.SESSION_SYNCED;
- default: {
- log.severe("Should never get here.");
- currentConnection.close();
- return ConnectionState.DISCONNECTED;
- }
- }
- }
-
- private void sleepIfProblemsGettingSyncedConnection(ConnectionState newState, ConnectionState oldState) {
- if (newState == ConnectionState.SESSION_SYNCED) return;
- if (newState == ConnectionState.CONNECTED && oldState == ConnectionState.DISCONNECTED) return;
- try {
- // Take it easy we have problems getting a connection up.
- if (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) {
- Thread.sleep(gatewayThrottler.distribute(3000));
- }
- } catch (InterruptedException e) {
- }
- }
-
- @Override
- public void run() {
- while (stopSignal.getCount() > 0 || !documentQueue.isEmpty())
- tick();
- log.finer(toString() + " exiting, documentQueue.size()=" + documentQueue.size());
- running.countDown();
- }
-
- /** Do one iteration of work. Should be called from the single worker thread of this. */
- public void tick() {
- ConnectionState oldState = connectionState;
- connectionState = cycle(connectionState);
- if (thread == null)
- oldConnectionsDrainer.checkOldConnections();
- if (thread != null)
- sleepIfProblemsGettingSyncedConnection(connectionState, oldState);
- }
-
- private void drainFirstDocumentsInQueueIfOld() {
- while (true) {
- Optional<Document> document = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut);
- if ( ! document.isPresent()) return;
-
- EndpointResult endpointResult = EndPointResultFactory.createTransientError(
- endpoint, document.get().getOperationId(),
- new Exception("Not sending document operation, timed out in queue after " +
- (clock.millis() - document.get().getQueueInsertTime().toEpochMilli()) + " ms."));
- resultQueue.failOperation(endpointResult, clusterId);
- }
- }
-
- private void drainDocumentQueueWhenFailingPermanently(Exception exception) {
- // first, clear sentOperations:
- resultQueue.failPending(exception);
-
- for (Document document : documentQueue.removeAllDocuments()) {
- EndpointResult endpointResult=
- EndPointResultFactory.createError(endpoint, document.getOperationId(), exception);
- resultQueue.failOperation(endpointResult, clusterId);
- }
- }
-
- private boolean isStale(GatewayConnection connection) {
- return connection.connectionTime() != null
- && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant());
- }
-
- private ConnectionState refreshConnection(ConnectionState currentConnectionState) {
- if (currentConnectionState == ConnectionState.SESSION_SYNCED)
- oldConnectionsDrainer.add(currentConnection);
- currentConnection = connectionFactory.newConnection();
- return ConnectionState.DISCONNECTED;
- }
-
- public static class ConnectionStats {
-
- // NOTE: These fields are accessed by reflection in JSON serialization
-
- public final int wrongSessionDetectedCounter;
- public final int wrongVersionDetectedCounter;
- public final int problemStatusCodeFromServerCounter;
- public final int executeProblemsCounter;
- public final int docsReceivedCounter;
- public final int statusReceivedCounter;
- public final int pendingDocumentStatusCount;
- public final int successfullHandshakes;
- public final int lastGatewayProcessTimeMillis;
-
- ConnectionStats(int wrongSessionDetectedCounter,
- int wrongVersionDetectedCounter,
- int problemStatusCodeFromServerCounter,
- int executeProblemsCounter,
- int docsReceivedCounter,
- int statusReceivedCounter,
- int pendingDocumentStatusCount,
- int successfullHandshakes,
- int lastGatewayProcessTimeMillis) {
- this.wrongSessionDetectedCounter = wrongSessionDetectedCounter;
- this.wrongVersionDetectedCounter = wrongVersionDetectedCounter;
- this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter;
- this.executeProblemsCounter = executeProblemsCounter;
- this.docsReceivedCounter = docsReceivedCounter;
- this.statusReceivedCounter = statusReceivedCounter;
- this.pendingDocumentStatusCount = pendingDocumentStatusCount;
- this.successfullHandshakes = successfullHandshakes;
- this.lastGatewayProcessTimeMillis = lastGatewayProcessTimeMillis;
- }
- }
-
- /** For testing. Returns the current connection of this. Not thread safe. */
- public GatewayConnection currentConnection() { return currentConnection; }
-
- /** For testing. Returns a snapshot of the old connections of this. */
- public List<GatewayConnection> oldConnections() { return oldConnectionsDrainer.connections(); }
-
- /** For testing */
- public EndpointResultQueue resultQueue() { return resultQueue; }
-
- /**
- * We need to drain results on the connection where they were sent to make sure we request results on
- * the node which received the operation also when going through a VIP.
- */
- private static class OldConnectionsDrainer implements Runnable {
-
- private static final Logger log = Logger.getLogger(OldConnectionsDrainer.class.getName());
-
- private final Endpoint endpoint;
- private final int clusterId;
- private final Duration pollInterval;
- private final Duration connectionTimeToLive;
- private final Duration localQueueTimeOut;
- private final AtomicInteger statusReceivedCounter;
- private final EndpointResultQueue resultQueue;
- private final CountDownLatch stopSignal;
- private final Clock clock;
-
- /**
- * Previous connections on which we may have sent operations and are still waiting for the results
- * All connections in this are in state SESSION_SYNCED.
- */
- private final List<GatewayConnection> connections = new CopyOnWriteArrayList<>();
-
- OldConnectionsDrainer(Endpoint endpoint,
- int clusterId,
- Duration pollInterval,
- Duration connectionTimeToLive,
- Duration localQueueTimeOut,
- AtomicInteger statusReceivedCounter,
- EndpointResultQueue resultQueue,
- CountDownLatch stopSignal,
- Clock clock) {
- this.endpoint = endpoint;
- this.clusterId = clusterId;
- this.pollInterval = pollInterval;
- this.connectionTimeToLive = connectionTimeToLive;
- this.localQueueTimeOut = localQueueTimeOut;
- this.statusReceivedCounter = statusReceivedCounter;
- this.resultQueue = resultQueue;
- this.stopSignal = stopSignal;
- this.clock = clock;
- }
-
- /** Add another old connection to this for draining */
- public void add(GatewayConnection connection) {
- connections.add(connection);
- }
-
- @Override
- public void run() {
- while (stopSignal.getCount() > 0) {
- try {
- checkOldConnections();
- Thread.sleep(pollInterval.toMillis());
- }
- catch (InterruptedException e) {
- log.log(Level.WARNING, "Close thread was interrupted: " + e.getMessage(), e);
- Thread.currentThread().interrupt();
- return;
- } catch (Exception e) {
- log.log(Level.WARNING, "Connection draining failed: " + e.getMessage(), e);
- }
- }
- }
-
- public void checkOldConnections() {
- for (GatewayConnection connection : connections) {
- if (!resultQueue.hasInflightOperations(connection)) {
- log.fine(() -> connection + " no longer has inflight operations");
- closeConnection(connection);
- } else if (closingTime(connection).isBefore(clock.instant())) {
- log.fine(() -> connection + " still has inflight operations, but drain period is over");
- tryPollAndDrainInflightOperations(connection);
- closeConnection(connection);
- } else if (timeToPoll(connection)) {
- tryPollAndDrainInflightOperations(connection);
- }
- }
- }
-
- private void closeConnection(GatewayConnection connection) {
- log.fine(() -> "Closing " + connection);
- connection.close();
- connections.remove(connection); // Safe as CopyOnWriteArrayList allows removal during iteration
- }
-
- private void tryPollAndDrainInflightOperations(GatewayConnection connection) {
- try {
- log.fine(() -> "Polling and draining inflight operations for " + connection);
- IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
- } catch (Exception e) {
- // Old connection; best effort
- log.log(Level.FINE, e, () -> "Polling status of inflight operations failed: " + e.getMessage());
- }
- }
-
- private boolean timeToPoll(GatewayConnection connection) {
- // connectionEndOfLife < connectionLastPolled < now
- Instant now = clock.instant();
- Instant endOfLife = connection.connectionTime().plus(connectionTimeToLive);
- if (connection.lastPollTime() == null) return endOfLife.plus(pollInterval).isBefore(now);
- if (connection.lastPollTime().plus(pollInterval).isAfter(now)) return false;
-
- // Exponential (2^x) dropoff:
- double connectionEndOfLife = endOfLife.toEpochMilli();
- double connectionLastPolled = connection.lastPollTime().toEpochMilli();
- return now.toEpochMilli() - connectionEndOfLife > 2 * (connectionLastPolled - connectionEndOfLife);
- }
-
- private Instant closingTime(GatewayConnection connection) {
- return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut);
- }
-
- private void close() {
- int size = resultQueue.getPendingSize();
- if (size > 0) {
- log.info("We have outstanding operations (" + size + ") , trying to fetch responses.");
- for (GatewayConnection connection : connections) {
- try {
- IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
- } catch (Throwable e) {
- log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e);
- }
- }
- }
- for (GatewayConnection oldConnection : connections)
- oldConnection.close();
- }
-
- /** For testing. Returns the old connections of this. */
- public List<GatewayConnection> connections() { return Collections.unmodifiableList(connections); }
-
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/ConcurrentDocumentOperationBlocker.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/ConcurrentDocumentOperationBlocker.java
deleted file mode 100644
index 79a04d8f043..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/ConcurrentDocumentOperationBlocker.java
+++ /dev/null
@@ -1,69 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.operationProcessor;
-
-import java.util.concurrent.Semaphore;
-
-/**
- * A semaphore that can be re-sized.
- *
- * @author dybis
- */
-final public class ConcurrentDocumentOperationBlocker {
-
- private static final int INITIAL_SIZE = 0;
- private final ReducableSemaphore semaphore = new ReducableSemaphore();
- private int maxConcurrency = INITIAL_SIZE;
- private final Object monitor = new Object();
-
- /*
- * Resizes the semaphore. It does not wait for threads that are in the queue when downsizing.
- */
- void setMaxConcurrency(int maxConcurrency) {
- synchronized (monitor) {
- int deltaConcurrency = maxConcurrency - this.maxConcurrency;
-
- if (deltaConcurrency > 0) {
- semaphore.release(deltaConcurrency);
- }
- if (deltaConcurrency < 0) {
- semaphore.reducePermits(-1 * deltaConcurrency);
- }
- this.maxConcurrency = maxConcurrency;
- }
- }
-
- /**
- * Release a permit.
- */
- void operationDone() {
- semaphore.release();
- }
-
- /**
- * Acquire a permit. Blocking if no permits available.
- */
- void startOperation() throws InterruptedException {
- semaphore.acquire();
- }
-
- int availablePermits() {
- return semaphore.availablePermits();
- }
-
- /**
- * We need to extend Semaphore to get access to protected reducePermit() method.
- */
- @SuppressWarnings("serial")
- private static final class ReducableSemaphore extends Semaphore {
-
- ReducableSemaphore() {
- super(INITIAL_SIZE, true /* FIFO */);
- }
-
- @Override
- protected void reducePermits(int reduction) {
- super.reducePermits(reduction);
- }
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java
deleted file mode 100644
index b72a6c67398..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.operationProcessor;
-
-import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.core.Document;
-
-import java.time.Clock;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Keeps an overview of what is sent and what is received for an operation.
- * This class is not thread-safe.
- */
-class DocumentSendInfo {
-
- private final Document document;
- private final Map<Integer, Result.Detail> detailByClusterId = new HashMap<>();
- // This is lazily populated as normal cases does not require retries.
- private Map<Integer, Integer> attemptedRetriesByClusterId = null;
- private final StringBuilder localTrace;
- private final Clock clock;
-
- DocumentSendInfo(Document document, boolean traceThisDoc, Clock clock) {
- this.document = document;
- localTrace = traceThisDoc ? new StringBuilder("\n" + document.createTime() + " Trace starting " + "\n")
- : null;
- this.clock = clock;
- }
-
- boolean addIfNotAlreadyThere(Result.Detail detail, int clusterId) {
- if (detailByClusterId.containsKey(clusterId)) {
- if (localTrace != null) {
- localTrace.append(clock.millis() + " Got duplicate detail, ignoring this: " +
- detail.toString() + "\n");
- }
- return false;
- }
- if (localTrace != null) {
- localTrace.append(clock.millis() + " Got detail: " + detail.toString() + "\n");
- }
- detailByClusterId.put(clusterId, detail);
- return true;
- }
-
- int detailCount() {
- return detailByClusterId.size();
- }
-
- public Result createResult() {
- return new Result(document, detailByClusterId.values(), localTrace);
- }
-
- int incRetries(int clusterId, Result.Detail detail) {
- if (attemptedRetriesByClusterId == null) {
- attemptedRetriesByClusterId = new HashMap<>();
- }
- int retries = 0;
- if (attemptedRetriesByClusterId.containsKey(clusterId)) {
- retries = attemptedRetriesByClusterId.get(clusterId);
- }
- retries++;
- attemptedRetriesByClusterId.put(clusterId, retries);
- if (localTrace != null) {
- localTrace.append(clock.millis() + " Asked about retrying for cluster ID "
- + clusterId + ", number of retries is " + retries + " Detail:\n" + detail.toString());
- }
- return retries;
- }
-
- Document getDocument() {
- return document;
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java
deleted file mode 100644
index 9af63f6637a..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java
+++ /dev/null
@@ -1,92 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.operationProcessor;
-
-import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.core.EndpointResult;
-import com.yahoo.vespa.http.client.core.OperationStatus;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.logging.Logger;
-
-/**
- * @author Einar M R Rosenvinge
- */
-public final class EndPointResultFactory {
-
- private static final Logger log = Logger.getLogger(EndPointResultFactory.class.getName());
- private static final String EMPTY_MESSAGE = "-";
-
- public static Collection<EndpointResult> createResult(Endpoint endpoint,
- InputStream inputStream) throws IOException {
- List<EndpointResult> results = new ArrayList<>();
- try (BufferedReader reader = new BufferedReader(
- new InputStreamReader(inputStream, StandardCharsets.US_ASCII))) {
- String line;
- while ((line = reader.readLine()) != null) {
- results.add(parseResult(line, endpoint));
- }
- }
- return results;
- }
-
- public static EndpointResult createError(Endpoint endpoint, String operationId, Exception exception) {
- return new EndpointResult(operationId, new Result.Detail(endpoint,
- Result.ResultType.FATAL_ERROR,
- null,
- exception));
- }
-
- public static EndpointResult createTransientError(Endpoint endpoint, String operationId, Exception exception) {
- return new EndpointResult(operationId, new Result.Detail(endpoint,
- Result.ResultType.TRANSITIVE_ERROR,
- null,
- exception));
- }
-
- private static Result.ResultType replyToResultType(OperationStatus reply) {
- // The ordering below is important, e.g. if success, it is never a transient error even if isTransient is true.
- if (reply.errorCode.isSuccess())
- return Result.ResultType.OPERATION_EXECUTED;
- if (reply.isConditionNotMet)
- return Result.ResultType.CONDITION_NOT_MET;
- if (reply.errorCode.isTransient())
- return Result.ResultType.TRANSITIVE_ERROR;
- return Result.ResultType.FATAL_ERROR;
- }
-
- private static EndpointResult parseResult(String line, Endpoint endpoint) {
- try {
- OperationStatus reply = OperationStatus.parse(line);
- String message;
- if (EMPTY_MESSAGE.equals(reply.message)) {
- message = null;
- } else {
- message = reply.message;
- }
- Exception exception = null;
- if (!reply.errorCode.isSuccess() && message != null) {
- exception = new RuntimeException(message);
- }
- if (reply.traceMessage != null && !reply.traceMessage.isEmpty()) {
- log.fine("Got trace message: " + reply.traceMessage);
- }
- return new EndpointResult(
- reply.operationId,
- new Result.Detail(endpoint,
- replyToResultType(reply),
- reply.traceMessage,
- exception));
- } catch (Throwable t) {
- throw new IllegalArgumentException("Bad result line from server: '" + line + "'", t);
- }
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java
deleted file mode 100644
index a88d5af8094..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java
+++ /dev/null
@@ -1,181 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.operationProcessor;
-
-import com.yahoo.vespa.http.client.core.ThrottlePolicy;
-
-import java.time.Clock;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * Adjusts in-flight operations based on throughput. It will walk the graph and try to find
- * local optimum.
- *
- * It looks at the throughput, adjust max in-flight based on the previous throughput and settings.
- *
- * In the beginning it moves faster, and then stabilizes.
- *
- * It will wait a bit after adjusting before it starts to sample, since there is a latency between adjustment
- * and result.
- *
- * There are several mechanisms to reduce impact of several clients running in parallel. The window size has a
- * random part, and the wait time before sampling after adjustment has a random part as well.
- *
- * To avoid running wild with large values of max-in flight, it is tuned to stay on the smaller part, and
- * rather reduce max-in flight than to have a too large value.
- *
- * In case the where the queue is moved to minimum size, it will now and then increase queue size to get
- * more sample data and possibly grow size.
- *
- * Class is fully thread safe, i.e. all public methods are thread safe.
- *
- * @author dybis
- */
-public class IncompleteResultsThrottler {
-
- private final ConcurrentDocumentOperationBlocker blocker = new ConcurrentDocumentOperationBlocker();
- private final int maxInFlightValue;
- private final int minInFlightValue;
- private final ThrottlePolicy policy;
-
- // 9-11 seconds with some randomness to avoid fully synchronous feeders.
- public final long phaseSizeMs = 9000 + (ThreadLocalRandom.current().nextInt() % 2000);
- private final Clock clock;
-
- private final Object monitor = new Object();
- private long sampleStartTimeMs = 0;
- private int previousNumOk = 0;
- private int previousMaxInFlight = 0;
- private int stabilizingPhasesLeft = 0;
- private int adjustCycleCount = 0;
- private int maxInFlightNow;
- private int numOk = 0;
- private int minWindowSizeCounter = 0;
- private int minPermitsAvailable = 0;
-
- protected static int INITIAL_MAX_IN_FLIGHT_VALUE = 200;
- protected static int SECOND_MAX_IN_FLIGHT_VALUE = 270;
- private StringBuilder debugMessage = new StringBuilder();
-
- /**
- * Creates the throttler.
- *
- * @param minInFlightValue the throttler will never throttle beyond this limit.
- * @param maxInFlightValue the throttler will never throttle above this limit. If zero, no limit.
- * @param clock use to calculate window size. Can be null if minWindowSize and maxInFlightValue are equal.
- * @param policy is the algorithm for finding next value of the number of in-flight documents operations.
- */
- public IncompleteResultsThrottler(int minInFlightValue, int maxInFlightValue, Clock clock, ThrottlePolicy policy) {
- this.maxInFlightValue = maxInFlightValue == 0 ? Integer.MAX_VALUE : maxInFlightValue;
- this.minInFlightValue = minInFlightValue == 0 ? this.maxInFlightValue : minInFlightValue;
- this.policy = policy;
- this.clock = clock;
- if (minInFlightValue != maxInFlightValue) {
- this.sampleStartTimeMs = clock.millis();
- }
- setNewSemaphoreSize(INITIAL_MAX_IN_FLIGHT_VALUE);
- }
-
- public int availableCapacity() {
- return blocker.availablePermits();
- }
-
- public void operationStart() {
- try {
- blocker.startOperation();
- } catch (InterruptedException e) {
- // Ignore
- }
- if (maxInFlightValue != minInFlightValue) {
- synchronized (monitor) {
- adjustThrottling();
- }
- }
- }
-
- public String getDebugMessage() {
- synchronized (monitor) {
- return debugMessage.toString();
- }
- }
-
- public void resultReady(boolean success) {
- blocker.operationDone();
- if (!success) {
- return;
- }
- synchronized (monitor) {
- numOk++;
- minPermitsAvailable = Math.min(minPermitsAvailable, blocker.availablePermits());
- }
- }
-
- // Only for testing
- protected int waitingThreads() {
- synchronized (monitor) {
- return maxInFlightNow - blocker.availablePermits();
- }
- }
-
- private double getCeilingDifferencePerformance(int adjustCycle) {
- // We want larger adjustments in the early phase.
- if (adjustCycle > 10) {
- return 0.7;
- }
- return 1.2;
- }
-
- private void adjustCycle() {
- adjustCycleCount++;
- stabilizingPhasesLeft = adjustCycleCount < 5 ? 1 : 2 + ThreadLocalRandom.current().nextInt() % 2;
-
- double maxPerformanceChange = getCeilingDifferencePerformance(adjustCycleCount);
- boolean messagesQueued = minPermitsAvailable < 2;
-
- int newMaxInFlight = policy.calcNewMaxInFlight(
- maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow, messagesQueued);
- debugMessage = new StringBuilder();
- debugMessage.append("previousMaxInFlight: " + previousMaxInFlight
- + " maxInFlightNow: " + maxInFlightNow
- + " numOk: " + numOk + " " + " previousOk: " + previousNumOk
- + " new size is: " + newMaxInFlight);
- previousMaxInFlight = maxInFlightNow;
- previousNumOk = numOk;
-
- setNewSemaphoreSize(adjustCycleCount == 1 ? SECOND_MAX_IN_FLIGHT_VALUE : newMaxInFlight);
- }
-
- private void adjustThrottling() {
- if (clock.millis() < sampleStartTimeMs + phaseSizeMs) return;
-
- sampleStartTimeMs += phaseSizeMs;
-
- if (stabilizingPhasesLeft-- == 0) {
- adjustCycle();
- }
- numOk = 0;
- this.minPermitsAvailable = maxInFlightNow;
- }
-
- private int tryBoostingSizeIfMinValueOverSeveralCycles(final int size) {
- if (size <= minInFlightValue) {
- minWindowSizeCounter++;
- } else {
- minWindowSizeCounter = 0;
- }
- if (minWindowSizeCounter == 4) {
- debugMessage.append(" (inc max in flight to get more data)");
- minWindowSizeCounter = 0;
- return size + 10;
- }
- return size;
-
- }
-
- private void setNewSemaphoreSize(final int size) {
- maxInFlightNow =
- Math.max(minInFlightValue, Math.min(
- tryBoostingSizeIfMinValueOverSeveralCycles(size), maxInFlightValue));
- blocker.setMaxConcurrency(maxInFlightNow);
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
deleted file mode 100644
index 5d0e063efdf..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ /dev/null
@@ -1,307 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.operationProcessor;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.yahoo.vespa.http.client.FeedClient;
-import com.yahoo.vespa.http.client.FeedEndpointException;
-import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.communication.EndpointIOException;
-import com.yahoo.vespa.http.client.core.EndpointResult;
-import com.yahoo.vespa.http.client.core.Exceptions;
-import com.yahoo.vespa.http.client.core.communication.ClusterConnection;
-
-import java.math.BigInteger;
-import java.security.SecureRandom;
-import java.time.Clock;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Merges several endpointResult into one Result and does the callback.
- *
- * @author dybis
- */
-public class OperationProcessor {
-
- private static final Logger log = Logger.getLogger(OperationProcessor.class.getName());
- private final Map<String, DocumentSendInfo> docSendInfoByOperationId = new LinkedHashMap<>();
- private final ArrayListMultimap<String, Document> blockedDocumentsByDocumentId = ArrayListMultimap.create();
- private final Set<String> inflightDocumentIds = new HashSet<>();
- private final int numDestinations;
- private final FeedClient.ResultCallback resultCallback;
- private final Object monitor = new Object();
- private final IncompleteResultsThrottler incompleteResultsThrottler;
- // Position in the array is cluster ID.
- private final List<ClusterConnection> clusters = new ArrayList<>();
- private final ScheduledThreadPoolExecutor timeoutExecutor;
- private final OperationStats operationStats;
- private final int maxRetries;
- private final long minTimeBetweenRetriesMs;
- private final Random random = new SecureRandom();
- private final int traceEveryXOperation;
- private int traceCounter = 0;
- private final boolean traceToStderr;
- private final ThreadGroup ioThreadGroup;
- private final String clientId = new BigInteger(130, random).toString(32);
- private final Clock clock;
-
- public OperationProcessor(IncompleteResultsThrottler incompleteResultsThrottler,
- FeedClient.ResultCallback resultCallback,
- SessionParams sessionParams,
- ScheduledThreadPoolExecutor timeoutExecutor,
- Clock clock) {
- this.numDestinations = sessionParams.getClusters().size();
- this.resultCallback = resultCallback;
- this.incompleteResultsThrottler = incompleteResultsThrottler;
- this.timeoutExecutor = timeoutExecutor;
- this.ioThreadGroup = new ThreadGroup("operationprocessor");
- this.clock = clock;
-
- if (sessionParams.getClusters().isEmpty())
- throw new IllegalArgumentException("Cannot feed to 0 clusters.");
-
- for (Cluster cluster : sessionParams.getClusters()) {
- if (cluster.getEndpoints().isEmpty())
- throw new IllegalArgumentException("Cannot feed to empty cluster.");
- }
-
- for (int i = 0; i < sessionParams.getClusters().size(); i++) {
- Cluster cluster = sessionParams.getClusters().get(i);
- clusters.add(new ClusterConnection(this,
- sessionParams.getFeedParams(),
- sessionParams.getConnectionParams(),
- cluster,
- i,
- sessionParams.getClientQueueSize() / sessionParams.getClusters().size(),
- timeoutExecutor,
- clock));
- }
- operationStats = new OperationStats(sessionParams, clusters, incompleteResultsThrottler);
- maxRetries = sessionParams.getConnectionParams().getMaxRetries();
- minTimeBetweenRetriesMs = sessionParams.getConnectionParams().getMinTimeBetweenRetriesMs();
- traceEveryXOperation = sessionParams.getConnectionParams().getTraceEveryXOperation();
- traceToStderr = sessionParams.getConnectionParams().getPrintTraceToStdErr();
- }
-
- public ThreadGroup getIoThreadGroup() {
- return ioThreadGroup;
- }
-
- public int getIncompleteResultQueueSize() {
- synchronized (monitor) {
- return docSendInfoByOperationId.size();
- }
- }
-
- /** Returns the id of the oldest operation to be sent. */
- public Optional<String> oldestIncompleteResultId() {
- synchronized (monitor) {
- return docSendInfoByOperationId.isEmpty()
- ? Optional.empty()
- : Optional.of(docSendInfoByOperationId.keySet().iterator().next());
- }
- }
-
- public String getClientId() {
- return clientId;
- }
-
- private boolean retriedThis(EndpointResult endpointResult, DocumentSendInfo documentSendInfo, int clusterId) {
- Result.Detail detail = endpointResult.getDetail();
- if (detail.getResultType() == Result.ResultType.OPERATION_EXECUTED) return false; // Success: No retries
-
- int retries = documentSendInfo.incRetries(clusterId, detail);
- if (retries > maxRetries) return false;
-
- String exceptionMessage = detail.getException() == null ? "" : detail.getException().getMessage();
- if (exceptionMessage == null)
- exceptionMessage = "";
-
- // TODO: Return proper error code in structured data in next version of internal API.
- // Error codes from messagebus/src/cpp/messagebus/errorcode.h
- boolean retryThisOperation =
- detail.getResultType() == Result.ResultType.TRANSITIVE_ERROR ||
- exceptionMessage.contains("SEND_QUEUE_CLOSED") ||
- exceptionMessage.contains("ILLEGAL_ROUTE") ||
- exceptionMessage.contains("NO_SERVICES_FOR_ROUTE") ||
- exceptionMessage.contains("NETWORK_ERROR") ||
- exceptionMessage.contains("SEQUENCE_ERROR") ||
- exceptionMessage.contains("NETWORK_SHUTDOWN") ||
- exceptionMessage.contains("TIMEOUT");
-
- if (retryThisOperation) {
- int waitTime = (int) (minTimeBetweenRetriesMs * (1 + random.nextDouble() / 3));
- log.finest("Retrying due to " + detail + " attempt " + retries + " in " + waitTime + " ms.");
- timeoutExecutor.schedule(() -> postToCluster(clusters.get(clusterId), documentSendInfo.getDocument()),
- waitTime,
- TimeUnit.MILLISECONDS);
- return true;
- }
-
- return false;
- }
-
- private Result process(EndpointResult endpointResult, int clusterId) {
- Result result;
- Document blockedDocumentToSend = null;
- synchronized (monitor) {
- if (!docSendInfoByOperationId.containsKey(endpointResult.getOperationId())) {
- log.finer("Received out-of-order or too late result, discarding: " + endpointResult);
- return null;
- }
- DocumentSendInfo documentSendInfo = docSendInfoByOperationId.get(endpointResult.getOperationId());
-
- if (retriedThis(endpointResult, documentSendInfo, clusterId)) return null;
-
- // Duplicate message
- if ( ! documentSendInfo.addIfNotAlreadyThere(endpointResult.getDetail(), clusterId)) return null;
-
- // Is this the last operation we are waiting for?
- if (documentSendInfo.detailCount() != numDestinations) return null;
-
- result = documentSendInfo.createResult();
- docSendInfoByOperationId.remove(endpointResult.getOperationId());
-
- String documentId = documentSendInfo.getDocument().getDocumentId();
- // If we got a pending operation against this document
- // dont't remove it from inflightDocuments and send blocked document operation
- List<Document> blockedDocuments = blockedDocumentsByDocumentId.get(documentId);
- if (blockedDocuments.isEmpty()) {
- inflightDocumentIds.remove(documentId);
- } else {
- blockedDocumentToSend = blockedDocuments.remove(0);
- }
- }
- if (blockedDocumentToSend != null) {
- sendToClusters(blockedDocumentToSend, clock);
- }
- return result;
- }
-
- public void resultReceived(EndpointResult endpointResult, int clusterId) {
- Result result = process(endpointResult, clusterId);
- if (result != null) {
- incompleteResultsThrottler.resultReady(result.isSuccess());
- resultCallback.onCompletion(result.getDocumentId(), result);
- if (traceToStderr && result.hasLocalTrace()) {
- System.err.println(result.toString());
- }
- }
- }
-
- public void onEndpointError(FeedEndpointException e) {
- resultCallback.onEndpointException(e);
- }
-
- public List<Exception> closeClusters() {
- List<Exception> exceptions = new ArrayList<>();
- // first, close cluster sessions and allow connections to drain normally
- for (ClusterConnection cluster : clusters) {
- try {
- cluster.close();
- } catch (Exception e) {
- exceptions.add(e);
- }
- }
- return exceptions;
- }
-
- public void sendDocument(Document document) {
- incompleteResultsThrottler.operationStart();
-
- synchronized (monitor) {
- if (inflightDocumentIds.contains(document.getDocumentId())) {
- blockedDocumentsByDocumentId.put(document.getDocumentId(), document);
- return;
- }
- inflightDocumentIds.add(document.getDocumentId());
- }
-
- sendToClusters(document, clock);
- }
-
- private void sendToClusters(Document document, Clock clock) {
- synchronized (monitor) {
- boolean traceThisDoc = traceEveryXOperation > 0 && traceCounter++ % traceEveryXOperation == 0;
- docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc, clock));
- }
-
- for (ClusterConnection clusterConnection : clusters) {
- postToCluster(clusterConnection, document);
- }
- }
-
- private void postToCluster(ClusterConnection clusterConnection, Document document) {
- try {
- clusterConnection.post(document);
- } catch (EndpointIOException eio) {
- resultReceived(EndPointResultFactory.createError(eio.getEndpoint(),
- document.getOperationId(),
- eio),
- clusterConnection.getClusterId());
- }
- }
-
- public List<ClusterConnection> clusters() { return Collections.unmodifiableList(clusters); }
-
- public String getStatsAsJson() {
- return operationStats.getStatsAsJson();
- }
-
- public void close() {
- List<Exception> exceptions = closeClusters();
- try {
- closeExecutor();
- } catch (InterruptedException e) {
- exceptions.add(e);
- }
-
- if (exceptions.isEmpty()) {
- return;
- }
- if (exceptions.size() == 1) {
- if (exceptions.get(0) instanceof RuntimeException) {
- throw (RuntimeException) exceptions.get(0);
- } else {
- throw new RuntimeException(exceptions.get(0));
- }
- }
-
- StringBuilder b = new StringBuilder();
- b.append("Exception thrown while closing one or more clusters: ");
- for (int i = 0; i < exceptions.size(); i++) {
- Exception e = exceptions.get(i);
- b.append(Exceptions.toMessageString(e));
- if (i != (exceptions.size() - 1)) {
- b.append(", ");
- }
- }
- throw new RuntimeException(b.toString(), exceptions.get(0));
- }
-
- private void closeExecutor() throws InterruptedException {
- log.log(Level.FINE, "Shutting down timeout executor.");
- timeoutExecutor.shutdownNow();
-
- log.log(Level.FINE, "Awaiting termination of already running timeout tasks.");
- if (! timeoutExecutor.awaitTermination(300, TimeUnit.SECONDS)) {
- log.severe("Did not manage to shut down the executors within 300 secs, system stuck?");
- throw new RuntimeException("Did not manage to shut down retry threads. Please report problem.");
- }
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationStats.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationStats.java
deleted file mode 100644
index 1eebe593062..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationStats.java
+++ /dev/null
@@ -1,76 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.operationProcessor;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.core.communication.ClusterConnection;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.List;
-
-public class OperationStats {
-
- private static ObjectMapper jsonMapper = createMapper();
-
- private final String sessionParamsAsXmlString;
- private List<ClusterConnection> clusters;
- private IncompleteResultsThrottler throttler;
-
- public OperationStats(
- SessionParams sessionParams,
- List<ClusterConnection> clusters,
- IncompleteResultsThrottler throttler) {
- this.sessionParamsAsXmlString = generateSessionParamsAsXmlString(sessionParams);
- this.clusters = clusters;
- this.throttler = throttler;
- }
-
- private static ObjectMapper createMapper() {
- ObjectMapper mapper = new ObjectMapper();
- mapper.registerModule(new Jdk8Module());
- mapper.registerModule(new JavaTimeModule());
- return mapper;
- }
-
- private String generateSessionParamsAsXmlString(final SessionParams sessionParams) {
- StringWriter stringWriter = new StringWriter();
- try {
- JsonGenerator jsonGenerator = jsonMapper.createGenerator(stringWriter);
- jsonMapper.writeValue(jsonGenerator, sessionParams); // TODO SessionParams should not be blindly serialized. This may serialize objects that are not really serializable.
- return stringWriter.toString();
- } catch (IOException e) {
- return e.getMessage();
- }
- }
-
- public String getStatsAsJson() {
- try {
- StringWriter stringWriter = new StringWriter();
- JsonGenerator jsonGenerator = jsonMapper.createGenerator(stringWriter);
- jsonGenerator.writeStartObject();
- jsonGenerator.writeArrayFieldStart("clusters");
- for (ClusterConnection cluster : clusters) {
- jsonGenerator.writeStartObject();
- jsonGenerator.writeNumberField("clusterid", cluster.getClusterId());
- jsonGenerator.writeFieldName("stats");
- jsonGenerator.writeRawValue(cluster.getStatsAsJSon());
- jsonGenerator.writeEndObject();
- }
- jsonGenerator.writeEndArray();
- jsonGenerator.writeFieldName("sessionParams");
- jsonGenerator.writeRawValue(sessionParamsAsXmlString);
- jsonGenerator.writeFieldName("throttleDebugMessage");
- jsonGenerator.writeRawValue("\"" + throttler.getDebugMessage() + "\"");
- jsonGenerator.writeEndObject();
- jsonGenerator.close();
- return stringWriter.toString();
- } catch (IOException e) {
- return "{ \"Error\" : \""+ e.getMessage() + "\"}";
- }
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/package-info.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/package-info.java
deleted file mode 100644
index 13d4e768daf..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/package-info.java
+++ /dev/null
@@ -1,2 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/package-info.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/package-info.java
deleted file mode 100644
index 958d3793875..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/package-info.java
+++ /dev/null
@@ -1,9 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * Programmatic API for feeding to Vespa clusters independently of the
- * cluster configuration.
- *
- * NOTE: This is a PUBLIC API, but not annotated as such because this is not a bundle and
- * we don't want to introduce Vespa dependencies.
- */
-package com.yahoo.vespa.http.client;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/CommandLineArguments.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/CommandLineArguments.java
deleted file mode 100644
index 7ccdf3ebd43..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/CommandLineArguments.java
+++ /dev/null
@@ -1,323 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.runner;
-
-import com.google.common.base.Splitter;
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.FeedParams;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import io.airlift.command.Command;
-import io.airlift.command.HelpOption;
-import io.airlift.command.Option;
-import io.airlift.command.SingleCommand;
-import org.apache.http.Header;
-import org.apache.http.ParseException;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.message.BasicLineParser;
-
-import javax.inject.Inject;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Commandline interface for the binary.
- *
- * @author dybis
- */
-@Command(name = "vespa-http-client",
- description = "This is a tool for feeding xml or json data to a Vespa application.")
-public class CommandLineArguments {
-
- /**
- * Creates a CommandLineArguments instance and populates it with data.
- *
- * @param args array of arguments.
- * @return null on failure or if help option is set to true.
- */
- static CommandLineArguments build(String[] args) {
- CommandLineArguments cmdArgs;
- try {
- cmdArgs = SingleCommand.singleCommand(CommandLineArguments.class).parse(args);
- } catch (Exception e) {
- System.err.println(e.getMessage());
- System.err.println("Use --help to show usage.\n");
- return null;
- }
- if (cmdArgs.helpOption.showHelpIfRequested()) {
- return null;
- }
- if (cmdArgs.endpointArg != null) {
- if (cmdArgs.hostArg != null) {
- System.err.println("Cannot set both '--host' and '--endpoint' ");
- return null;
- }
- try {
- URL url = new URL(cmdArgs.endpointArg);
- } catch (MalformedURLException e) {
- e.printStackTrace(System.err);
- return null;
- }
- } else {
- if (cmdArgs.hostArg == null) {
- System.err.println("'--host' or '--endpoint' not set.");
- return null;
- }
- }
- if (cmdArgs.priorityArg != null && ! checkPriorityFlag(cmdArgs.priorityArg)) {
- return null;
- }
-
- for (String header : cmdArgs.headers) {
- try {
- cmdArgs.parsedHeaders.add(BasicLineParser.parseHeader(header, null));
- } catch (ParseException e) {
- System.err.printf("Invalid header: '%s' (%s)%n", header, e.getMessage());
- return null;
- }
- }
-
- if (cmdArgs.privateKeyPath == null && cmdArgs.certificatePath != null ||
- cmdArgs.privateKeyPath != null && cmdArgs.certificatePath == null) {
- System.err.println("Both '--privateKey' and '--certificate' must be set");
- return null;
- }
-
- return cmdArgs;
- }
-
- private static boolean checkPriorityFlag(String priorityArg) {
- switch (priorityArg) {
- case "HIGHEST":
- case "VERY_HIGH":
- case "HIGH_1":
- case "HIGH_2":
- case "HIGH_3":
- case "NORMAL_1":
- case "NORMAL_2":
- case "NORMAL_3":
- case "NORMAL_4":
- case "NORMAL_5":
- case "NORMAL_6":
- case "LOW_1":
- case "LOW_2":
- case "LOW_3":
- case "VERY_LOW":
- case "LOWEST":
- return true;
- default:
- System.err.println("Not valid value for priority. Allowed values are HIGHEST, VERY_HIGH, HIGH_[1-3], " +
- "NORMAL_[1-6], LOW_[1-3], VERY_LOW, and LOWEST.");
- return false;
- }
- }
-
- // TODO Don't duplicate default values from ConnectionParams.Builder. Some defaults are already inconsistent.
-
- @Inject
- private HelpOption helpOption;
-
- @Option(name = {"--useV3Protocol"}, description = "Use V3 protocol to gateway. This is the default protocol.")
- private boolean enableV3Protocol = true;
-
- @Option(name = {"--file"},
- description = "The name of the input file to read.")
- private String fileArg = null;
-
- @Option(name = {"--add-root-element-to-xml"},
- description = "Add <vespafeed> tag to XML document, makes it easier to feed raw data.")
- private boolean addRootElementToXml = false;
-
- @Option(name = {"--route"},
- description = "(=default)The route to send the data to.")
- private String routeArg = "default";
-
- @Option(name = {"--endpoint"},
- description = "Vespa endpoint.")
- private String endpointArg;
-
- @Option(name = {"--host"},
- description = "The host(s) for the gateway. If using several, use comma to separate them.")
- private String hostArg;
-
- @Option(name = {"--port"},
- description = "The port for the host of the gateway.")
- private int portArg = 4080;
-
- @Option(name = {"--timeout"},
- description = "(=180) The time (in seconds) allowed for sending operations.")
- private long timeoutArg = 180;
-
- @Option(name = {"--useCompression"},
- description = "Use compression over network.")
- private boolean useCompressionArg = false;
-
- @Option(name = {"--useDynamicThrottling"},
- description = "Try to maximize throughput by using dynamic throttling.")
- private boolean useDynamicThrottlingArg = false;
-
- @Option(name = {"--maxpending"},
- description = "The maximum number of operations that are allowed " +
- "to be pending at any given time.")
- private int maxPendingOperationCountArg = 10000;
-
- @Option(name = {"-v", "--verbose"},
- description = "Enable verbose output of progress.")
- private boolean verboseArg = false;
-
- @Option(name = {"--noretry"},
- description = "Turns off retries of recoverable failures..")
- private boolean noRetryArg = false;
-
- @Option(name = {"--retrydelay"},
- description = "The time (in seconds) to wait between retries of a failed operation.")
- private int retrydelayArg = 1;
-
- @Option(name = {"--trace"},
- description = "(=0 (=off)) The trace level of network traffic.")
- private int traceArg = 0;
-
- @Option(name = {"--printTraceEveryXOperation"},
- description = "(=1) How often to to tracing.")
- private int traceEveryXOperation = 1;
-
- @Option(name = {"--validate"},
- description = "Run validation tool on input files instead of feeding them.")
- private boolean validateArg = false;
-
- @Option(name = {"--ignoreConditionNotMet"},
- description = "Ignore condition not met failures.")
- private boolean ignoreConditionNotMet = false;
-
- @Option(name = {"--priority"},
- description = "Specify priority of sent messages, see documentation ")
- private String priorityArg = null;
-
- @Option(name = {"--numPersistentConnectionsPerEndpoint"},
- description = "How many tcp connections to establish per endoint.)")
- private int numPersistentConnectionsPerEndpoint = 4;
-
- @Option(name = {"--maxChunkSizeBytes"},
- description = "How much data to send to gateway in each message.")
- private int maxChunkSizeBytes = 20 * 1024;
-
- @Option(name = {"--whenVerboseEnabledPrintMessageForEveryXDocuments"},
- description = "How often to print verbose message.)")
- private int whenVerboseEnabledPrintMessageForEveryXDocuments = 1000;
-
- @Option(name = {"--useTls"},
- description = "Use TLS when connecting to endpoint")
- private boolean useTls = false;
-
- @Option(name = {"--insecure", "--disable-hostname-verification"},
- description = "Skip hostname verification when using TLS")
- private boolean insecure = false;
-
- @Option(name = {"--header"},
- description = "Add http header to every request. Header must have the format '<Name>: <Value>'. Use this parameter multiple times for multiple headers")
- private List<String> headers = new ArrayList<>();
-
- @Option(name = {"--vespaTls"},
- description = "BETA! Use Vespa TLS configuration from environment if available. Other HTTPS/TLS configuration will be ignored if this is set.")
- private boolean useTlsConfigFromEnvironment = false;
-
- @Option(name = {"--connectionTimeToLive"},
- description = "Maximum time to live for persistent connections. Specified as integer, in seconds.")
- private long connectionTimeToLive = 15;
-
- @Option(name = {"--certificate"},
- description = "Path to a file containing a PEM encoded x509 certificate")
- private String certificatePath;
-
- @Option(name = {"--privateKey"},
- description = "Path to a file containing a PEM encoded private key")
- private String privateKeyPath;
-
- @Option(name = "--caCertificates",
- description = "Path to a file containing a PEM encoded CA certificates")
- private String caCertificatesPath;
-
- private final List<Header> parsedHeaders = new ArrayList<>();
-
- int getWhenVerboseEnabledPrintMessageForEveryXDocuments() {
- return whenVerboseEnabledPrintMessageForEveryXDocuments;
- }
-
- public String getFile() { return fileArg; };
-
- public boolean getVerbose() { return verboseArg; }
-
- public boolean getIgnoreConditionNotMet() { return ignoreConditionNotMet; }
-
- public boolean getAddRootElementToXml() { return addRootElementToXml; }
-
- SessionParams createSessionParams(boolean useJson) {
- int minThrottleValue = useDynamicThrottlingArg ? 10 : 0;
- Path privateKeyPath = Optional.ofNullable(this.privateKeyPath).map(Paths::get).orElse(null);
- Path certificatePath = Optional.ofNullable(this.certificatePath).map(Paths::get).orElse(null);
- Path caCertificatesPath = Optional.ofNullable(this.caCertificatesPath).map(Paths::get).orElse(null);
- ConnectionParams.Builder connectionParamsBuilder = new ConnectionParams.Builder();
- parsedHeaders.forEach(header -> connectionParamsBuilder.addHeader(header.getName(), header.getValue()));
- SessionParams.Builder builder = new SessionParams.Builder()
- .setFeedParams(
- new FeedParams.Builder()
- .setDataFormat(useJson
- ? FeedParams.DataFormat.JSON_UTF8
- : FeedParams.DataFormat.XML_UTF8)
- .setRoute(routeArg)
- .setMaxInFlightRequests(maxPendingOperationCountArg)
- .setClientTimeout(timeoutArg, TimeUnit.SECONDS)
- .setServerTimeout(timeoutArg, TimeUnit.SECONDS)
- .setLocalQueueTimeOut(timeoutArg * 1000)
- .setPriority(priorityArg)
- .setMaxChunkSizeBytes(maxChunkSizeBytes)
- .build()
- )
- .setConnectionParams(
- connectionParamsBuilder
- .setHostnameVerifier(insecure ? NoopHostnameVerifier.INSTANCE :
- SSLConnectionSocketFactory.getDefaultHostnameVerifier())
- .setUseCompression(useCompressionArg)
- .setMaxRetries(noRetryArg ? 0 : 100)
- .setMinTimeBetweenRetries(retrydelayArg, TimeUnit.SECONDS)
- .setDryRun(validateArg)
- .setTraceLevel(traceArg)
- .setTraceEveryXOperation(traceEveryXOperation)
- .setPrintTraceToStdErr(traceArg > 0)
- .setNumPersistentConnectionsPerEndpoint(numPersistentConnectionsPerEndpoint)
- .setCertificateAndPrivateKey(privateKeyPath, certificatePath)
- .setCaCertificates(caCertificatesPath)
- .setUseTlsConfigFromEnvironment(useTlsConfigFromEnvironment)
- .setConnectionTimeToLive(Duration.ofSeconds(connectionTimeToLive))
- .build()
- )
- // Enable dynamic throttling.
- .setThrottlerMinSize(minThrottleValue)
- .setClientQueueSize(maxPendingOperationCountArg);
- if (endpointArg != null) {
- try {
- builder.addCluster(new Cluster.Builder()
- .addEndpoint(Endpoint.create(new URL(endpointArg)))
- .build());
- }
- catch (MalformedURLException e) {} // already checked when parsing arguments
- }
- else {
- Iterable<String> hosts = Splitter.on(',').trimResults().split(hostArg);
- for (String host : hosts) {
- builder.addCluster(new Cluster.Builder().addEndpoint(Endpoint.create(host, portArg, useTls))
- .build());
- }
- }
- return builder.build();
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/FormatInputStream.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/FormatInputStream.java
deleted file mode 100644
index dd6fbc29e5f..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/FormatInputStream.java
+++ /dev/null
@@ -1,101 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.runner;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.format.DataFormatDetector;
-import com.fasterxml.jackson.core.format.DataFormatMatcher;
-import com.fasterxml.jackson.core.format.MatchStrength;
-import com.fasterxml.jackson.dataformat.xml.XmlFactory;
-
-import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.SequenceInputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Optional;
-
-/**
- * @author valerijf
- */
-public class FormatInputStream {
-
- private InputStream inputStream;
- private final Format format;
-
- /**
- * Creates a single data input stream from either file or InputStream depending on which one is present. Preference
- * for file if both present. Additionally also detects input data format of the result stream, throws
- * IllegalArgumentException if unable to determine data format.
- *
- * @param stream InputStream of the data if present
- * @param inputFile path to file to use as input
- * @param addRootElementToXml to add vespafeed root element around the input data stream
- * @throws IOException on errors
- */
- public FormatInputStream(InputStream stream, Optional<String> inputFile, boolean addRootElementToXml)
- throws IOException {
- DataFormatDetector dataFormatDetector = new DataFormatDetector(new JsonFactory(), new XmlFactory());
- DataFormatMatcher formatMatcher;
-
- if (inputFile.isPresent()) {
- try (FileInputStream fileInputStream = new FileInputStream(inputFile.get())) {
- formatMatcher = dataFormatDetector.findFormat(fileInputStream);
- }
- inputStream = new FileInputStream(inputFile.get());
-
- } else {
- if (stream.available() == 0)
- System.out.println("No data in stream yet and no file specified, waiting for data.");
-
- inputStream = stream.markSupported() ? stream : new BufferedInputStream(stream);
- inputStream.mark(DataFormatDetector.DEFAULT_MAX_INPUT_LOOKAHEAD);
- formatMatcher = dataFormatDetector.findFormat(inputStream);
- inputStream.reset();
- }
-
- if (addRootElementToXml) {
- inputStream = addVespafeedTag(inputStream);
- format = Format.XML;
- return;
- }
-
- if (formatMatcher.getMatchStrength() == MatchStrength.INCONCLUSIVE
- || formatMatcher.getMatchStrength() == MatchStrength.NO_MATCH) {
- throw new IllegalArgumentException("Could not detect input format");
- }
-
- switch (formatMatcher.getMatchedFormatName().toLowerCase()) {
- case "json":
- format = Format.JSON;
- break;
- case "xml":
- format = Format.XML;
- break;
- default:
- throw new IllegalArgumentException("Unknown data format");
- }
- }
-
- private static InputStream addVespafeedTag(InputStream inputStream) {
- return new SequenceInputStream(Collections.enumeration(Arrays.asList(
- new ByteArrayInputStream("<vespafeed>".getBytes()), inputStream,
- new ByteArrayInputStream("</vespafeed>".getBytes())))
- );
- }
-
- public InputStream getInputStream() {
- return inputStream;
- }
-
- public Format getFormat() {
- return format;
- }
-
- public enum Format {
- JSON, XML
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java
deleted file mode 100644
index e3e90c8bbfc..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java
+++ /dev/null
@@ -1,106 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.runner;
-
-import com.yahoo.vespa.http.client.FeedClient;
-import com.yahoo.vespa.http.client.FeedClientFactory;
-import com.yahoo.vespa.http.client.SimpleLoggerResultCallback;
-import com.yahoo.vespa.http.client.core.JsonReader;
-import com.yahoo.vespa.http.client.core.XmlFeedReader;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.time.Clock;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author Einar M R Rosenvinge
- * @author dybis
- */
-public class Runner {
-
- /**
- * Feed data from inputFile to session.
- *
- * @param feedClient where to send data to
- * @param inputStream source of data
- * @param isJson if input stream is of json formatted data
- * @param numSent is updated while sending by this method
- * @param verbose if true will print some information to stderr
- * @return send time in ms, not including validating
- */
- public static long send(FeedClient feedClient,
- InputStream inputStream,
- boolean isJson,
- AtomicInteger numSent,
- boolean verbose) {
- Clock clock = Clock.systemUTC();
- if (verbose)
- System.err.println("Now sending data.");
-
- long sendStartTime = clock.millis();
- if (isJson) {
- JsonReader.read(inputStream, feedClient, numSent);
- } else {
- try {
- XmlFeedReader.read(inputStream, feedClient, numSent);
- } catch (Exception e) {
- System.err.println("Stopped reading feed, got problems with XML: " + e.getMessage());
- }
- }
-
- long sendTotalTime = clock.millis() - sendStartTime;
-
- if (verbose)
- System.err.println("Waiting for all results, sent " + numSent.get() + " docs.");
-
- feedClient.close();
- if (verbose)
- System.err.println("Session closed.");
- return sendTotalTime;
- }
-
-
- public static void main(String[] args) throws IOException {
- CommandLineArguments commandLineArgs = CommandLineArguments.build(args);
- if (commandLineArgs == null)
- System.exit(1);
-
- FormatInputStream formatInputStream = new FormatInputStream(System.in,
- Optional.ofNullable(commandLineArgs.getFile()),
- commandLineArgs.getAddRootElementToXml());
-
- int intervalOfLogging =
- commandLineArgs.getVerbose()
- ? commandLineArgs.getWhenVerboseEnabledPrintMessageForEveryXDocuments()
- : Integer.MAX_VALUE;
- AtomicInteger numSent = new AtomicInteger(0);
- SimpleLoggerResultCallback callback = new SimpleLoggerResultCallback(numSent, intervalOfLogging, commandLineArgs.getIgnoreConditionNotMet());
-
- FeedClient feedClient = FeedClientFactory.create(commandLineArgs.createSessionParams(formatInputStream.getFormat()== FormatInputStream.Format.JSON),
- callback);
-
- long sendTotalTimeMs = send(feedClient,
- formatInputStream.getInputStream(),
- formatInputStream.getFormat() == FormatInputStream.Format.JSON,
- numSent,
- commandLineArgs.getVerbose());
-
- if (commandLineArgs.getVerbose()) {
- System.err.println(feedClient.getStatsAsJson());
- double transferTimeSec = ((double) sendTotalTimeMs) / 1000.0;
- if (transferTimeSec > 0)
- System.err.printf("Docs/sec %.3f%n", numSent.get() / transferTimeSec);
-
- if (commandLineArgs.getFile() != null) {
- double fileSizeMb = ((double) new File(commandLineArgs.getFile()).length()) / 1024.0 / 1024.0;
- System.err.println("Sent " + fileSizeMb + " MB in " + transferTimeSec + " seconds.");
- System.err.println("Speed: " + ((fileSizeMb / transferTimeSec) * 8.0) + " Mbits/sec, + HTTP overhead " +
- "(not taking compression into account)");
- }
- }
- callback.printProgress();
- }
-
-}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/package-info.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/package-info.java
deleted file mode 100644
index 8be767d67cf..00000000000
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/package-info.java
+++ /dev/null
@@ -1,2 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.runner;