diff options
Diffstat (limited to 'vespa-http-client/src/main/java/com')
55 files changed, 0 insertions, 6605 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java deleted file mode 100644 index d9ff09552ef..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.core.JsonReader; -import com.yahoo.vespa.http.client.core.XmlFeedReader; - -import java.io.InputStream; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * API for feeding document operations (add, removes or updates) to one or many Vespa clusters. - * Use the factory to configure and set up an instance of this. Instances are expensive - create one instance of this - * and use it for all feed operations (from multiple threads, if desired) for the duration of your client runtime. - * The feedclient does automatic error recovery and reconnects to hosts when connections die. - * - * A {@link FeedClientFactory} is provided to instantiate Sessions. - * - * See com.yahoo.text.Text.stripInvalidCharacters(String) to remove invalid characters from string fields before feeding - * - * Instances of this are multithread safe. - * - * @author dybis - * @see FeedClientFactory - * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a> - */ -@Deprecated -public interface FeedClient extends AutoCloseable { - - /** - * Issues a document operation to the configured cluster(s). - * If the pipeline and buffers are full, this call will be blocking, ensuring that operations are not - * produced faster than the can be handled. Transient failured are retried internally by this client. - * Exactly one callback will always be received for each (completed) call to this. - * - * @param documentId the document id of the document - * @param documentData the document data as JSON or XML (as specified when using the factory to create the API) - */ - default void stream(String documentId, CharSequence documentData) { - stream(documentId, documentData, null); - } - - /** - * Issues a document operation to the configured cluster(s). - * If the pipeline and buffers are full, this call will be blocking, ensuring that operations are not - * produced faster than the can be handled. Transient failured are retried internally by this client. - * Exactly one callback will always be received for each (completed) call to this. - * - * @param documentId the document id of the document - * @param documentData the document data as JSON or XML (as specified when using the factory to create the API) - * @param context a context object which will be accessible in the result of the callback, or null if none - */ - default void stream(String documentId, CharSequence documentData, Object context) { - stream(documentId, null, documentData, context); - } - - /** - * Issues a document operation to the configured cluster(s). - * If the pipeline and buffers are full, this call will be blocking, ensuring that operations are not - * produced faster than the can be handled. Transient failures are retried internally by this client. - * Exactly one callback will always be received for each (completed) call to this. - * - * @param documentId the document id of the document - * @param operationId the id to use for this operation, or null to let the client decide an operation id. - * This id must be unique for every operation. Passing the operation id allows clients - * to prepare to receive a response for it before issuing the operation to the client. - * @param documentData the document data as JSON or XML (as specified when using the factory to create the API) - * @param context a context object which will be accessible in the result of the callback, or null if none - */ - void stream(String documentId, String operationId, CharSequence documentData, Object context); - - /** - * Waits for all results to arrive and closes the FeedClient. Don't call any other method after calling close(). - * Does not throw any exceptions. - */ - @Override - void close(); - - /** - * Returns stats about the cluster - * - * @return JSON string with information about cluster - */ - String getStatsAsJson(); - - /** - * Utility function that takes an array of JSON documents and calls the FeedClient for each element. - * - * @param inputStream the stream to feed. This can be a very large stream. - * The outer element must be an array of document operations. - * @param feedClient the feed client that will receive the document operations - * @param numSent increased per document sent to API (but not waiting for results) - */ - static void feedJson(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) { - JsonReader.read(inputStream, feedClient, numSent); - } - - /** - * Utility function that takes an array of XML documents and calls the FeedClient for each element. - * The XML document has to be formatted with line space on each line (like "regular" XML, but stricter - * than the specifications of XML). - * - * @param inputStream the stream to feed. This can be a very large stream. Operations must be enclosed in a - * top-level <vespafeed> tag - * @param feedClient the feed client that will receive the document operations - * @param numSent increased per document sent to API (but not waiting for results) - */ - static void feedXml(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) { - try { - XmlFeedReader.read(inputStream, feedClient, numSent); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - /** - * This callback is executed when new results are arriving or an error occur. - * Don't do any heavy lifting in this thread (no IO, disk, or heavy CPU usage). - * This call back will run in a different thread than your main program so use e.g. - * AtomicInteger for counters and follow general guides for thread-safe programming. - * There is an example implementation in class SimpleLoggerResultCallback. - */ - interface ResultCallback { - - /** - * This callback is always called exactly once for each feed operation passed to the client - * instance, whether or not it was successful. - */ - void onCompletion(String docId, Result documentResult); - - /** - * Called with an exception whenever an endpoint specific error occurs during feeding. - * The error may or may not be transient - the operation will in both cases be retried until it's successful. - * This callback is intended for application level monitoring (logging, metrics, altering etc). - * Document specific errors will be reported back through {@link #onCompletion(String, Result)}. - * - * @see FeedEndpointException - * @param exception an exception specifying endpoint and cause. See {@link FeedEndpointException} for details. - */ - default void onEndpointException(FeedEndpointException exception) {} - - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java deleted file mode 100644 index ce2f9e0b140..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - - -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.core.api.FeedClientImpl; - -import java.time.Clock; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; - -/** - * Factory for creating FeedClient. - * - * @author dybis - * @deprecated Vespa-http-client will be removed in Vespa 8. It's Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a> - */ -@Deprecated -public class FeedClientFactory { - - /** - * Creates a FeedClient. Call this sparingly: Feed clients are expensive and should be as long-lived as possible. - * - * @param sessionParams parameters for connection, hosts, cluster configurations and more. - * @param resultCallback on each result, this callback is called. - * @return newly created FeedClient API object. - */ - public static FeedClient create(SessionParams sessionParams, FeedClient.ResultCallback resultCallback) { - return new FeedClientImpl(sessionParams, resultCallback, createTimeoutExecutor(), Clock.systemUTC()); - } - - static ScheduledThreadPoolExecutor createTimeoutExecutor() { - ScheduledThreadPoolExecutor timeoutExecutor; - timeoutExecutor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("timeout-")); - timeoutExecutor.setRemoveOnCancelPolicy(true); - timeoutExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - timeoutExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - return timeoutExecutor; - } - - private static class DaemonThreadFactory implements ThreadFactory { - - private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); - private final String prefix; - - private DaemonThreadFactory(String prefix) { - this.prefix = prefix; - } - - @Override - public Thread newThread(Runnable runnable) { - Thread t = defaultThreadFactory.newThread(runnable); - t.setDaemon(true); - t.setName(prefix + t.getName()); - return t; - } - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java deleted file mode 100644 index 822a670ae08..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.Endpoint; - -/** - * An exception thrown when the client is unable to connect to a feed endpoint. - * - * @author bjorncs - * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a> - */ -@Deprecated -public class FeedConnectException extends FeedEndpointException { - - public FeedConnectException(Throwable cause, Endpoint endpoint) { - super(createMessage(cause, endpoint), cause, endpoint); - } - - private static String createMessage(Throwable cause, Endpoint endpoint) { - return String.format("Handshake to endpoint '%s:%d' failed: %s", - endpoint.getHostname(), - endpoint.getPort(), - cause.getMessage()); - } -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java deleted file mode 100644 index 304e2ea321b..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.Endpoint; - -/** - * An exception type for endpoint specific errors. - * - * @see FeedConnectException - * @see FeedProtocolException - * @author bjorncs - * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a> - */ -@Deprecated -public abstract class FeedEndpointException extends RuntimeException { - - private final Endpoint endpoint; - - protected FeedEndpointException(String message, Throwable cause, Endpoint endpoint) { - super(message, cause); - this.endpoint = endpoint; - } - - public Endpoint getEndpoint() { - return endpoint; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java deleted file mode 100644 index 93041cced1c..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.Endpoint; - -/** - * An exception thrown when a feed endpoint returns an error during feeding. - * - * @author bjorncs - * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a> - */ -@Deprecated -public class FeedProtocolException extends FeedEndpointException { - - private final int httpStatusCode; - private final String httpResponseMessage; - - public FeedProtocolException(int httpStatusCode, - String httpResponseMessage, - Throwable cause, - Endpoint endpoint) { - super(createMessage(httpStatusCode, httpResponseMessage, endpoint), cause, endpoint); - this.httpStatusCode = httpStatusCode; - this.httpResponseMessage = httpResponseMessage; - } - - private static String createMessage(int httpStatusCode, - String httpResponseMessage, - Endpoint endpoint) { - return String.format("Endpoint '%s:%d' returned an error on handshake: %d - %s", - endpoint.getHostname(), - endpoint.getPort(), - httpStatusCode, - httpResponseMessage); - } - - public int getHttpStatusCode() { - return httpStatusCode; - } - - public String getHttpResponseMessage() { - return httpResponseMessage; - } -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java deleted file mode 100644 index 5db592da02f..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.Exceptions; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -/** - * The result of a stream operation. A Result refers to a single document, - * but may contain more than one Result.Detail instances, as these pertains to a - * single endpoint, and a Result may wrap data for multiple endpoints. - * - * @author Einar M R Rosenvinge - * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a> - */ -@Deprecated -public class Result { - - public enum ResultType { - OPERATION_EXECUTED, - TRANSITIVE_ERROR, - CONDITION_NOT_MET, - FATAL_ERROR - } - - private final Document document; - private final boolean success; - private final List<Detail> details; - private final String localTrace; - - public Result(Document document, Collection<Detail> values, StringBuilder localTrace) { - this.document = document; - this.details = Collections.unmodifiableList(new ArrayList<>(values)); - this.success = details.stream().allMatch(d -> d.getResultType() == ResultType.OPERATION_EXECUTED); - this.localTrace = localTrace == null ? null : localTrace.toString(); - } - - /** Returns the document id that this result is for */ - public String getDocumentId() { - return document.getDocumentId(); - } - - /** Returns the id of the operation this is the result of */ - public String getOperationId() { return document.getOperationId(); } - - /** Returns the document data */ - public CharSequence getDocumentDataAsCharSequence() { - return document.getDataAsString(); - } - - /** Returns the context of the object if any */ - public Object getContext() { - return document.getContext(); - } - - /** - * Returns true if the operation(s) was successful. If at least one {@link Detail} - * in {@link #getDetails()} is unsuccessful, this will return false. - */ - public boolean isSuccess() { - return success; - } - public boolean isSuccessOrConditionNotMet() { - return isSuccess() || - details.stream().allMatch(d -> d.getResultType() == Result.ResultType.OPERATION_EXECUTED || - d.getResultType() == Result.ResultType.CONDITION_NOT_MET); - } - - public List<Detail> getDetails() { return details; } - - /** Returns whether the operation has been set up with local tracing */ - public boolean hasLocalTrace() { - return localTrace != null; - } - - /** Information in a Result for a single operation sent to a single endpoint. */ - public static final class Detail { - - private final ResultType resultType; - private final Endpoint endpoint; - private final Exception exception; - private final String traceMessage; - - public Detail(Endpoint endpoint, ResultType resultType, String traceMessage, Exception e) { - this.endpoint = endpoint; - this.resultType = resultType; - this.exception = e; - this.traceMessage = traceMessage; - } - - public Detail(Endpoint endpoint) { - this.endpoint = endpoint; - this.resultType = ResultType.OPERATION_EXECUTED; - this.exception = null; - this.traceMessage = null; - } - - /** - * Returns the endpoint from which the result was received, - * or null if this failed before being assigned an endpoint - */ - public Endpoint getEndpoint() { - return endpoint; - } - - /** Returns whether the operation was successful */ - public boolean isSuccess() { - return resultType == ResultType.OPERATION_EXECUTED; - } - - /** Returns the result of the operation */ - public ResultType getResultType() { - return resultType; - } - - /** Returns any exception related to this Detail, if unsuccessful. Might be null. */ - public Exception getException() { - return exception; - } - - /** Returns any trace message produces, or null if none */ - public String getTraceMessage() { - return traceMessage; - } - - @Override - public String toString() { - StringBuilder b = new StringBuilder(); - b.append("Detail "); - b.append("resultType=").append(resultType); - if (exception != null) - b.append(" exception='").append(Exceptions.toMessageString(exception)).append("'"); - if (traceMessage != null && ! traceMessage.isEmpty()) - b.append(" trace='").append(traceMessage).append("'"); - if (endpoint != null) - b.append(" endpoint=").append(endpoint); - return b.toString(); - } - - } - - @Override - public String toString() { - return "Result for " + document + " " + (localTrace != null ? localTrace : ""); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Session.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Session.java deleted file mode 100644 index 203f2d3b462..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Session.java +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import java.io.OutputStream; -import java.util.concurrent.BlockingQueue; - -/** - * A Session is an entity used to feed operations (like documents, removes or updates) to one Vespa - * cluster or several clusters in parallel. Current implementations are fail-fast, i.e. all feeding - * errors are propagated to the user as quickly as possible and with as much detail as possible. - * - * Implementations of this interface are required to be thread safe. - * - * A {@link SessionFactory} is provided to instantiate Sessions. - * - * @author Einar M R Rosenvinge - * @see SessionFactory - * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a> - */ -@Deprecated -public interface Session extends AutoCloseable { - - /** - * Returns an OutputStream that can be used to write ONE operation, identified by the - * given document ID. The data format must match the - * {@link com.yahoo.vespa.http.client.config.FeedParams.DataFormat} given when this - * Session was instantiated. Note that most data formats include the document ID in the - * actual buffer, which <em>must</em> match the document ID given as a parameter to this - * method. It is (as always) important to close the OutputStream returned - nothing - * is written to the wire until this is done. Note also that the Session holds a certain, - * dynamically determined maximum number of document operations in memory. - * When this threshold is reached, {@link java.io.OutputStream#close()} will block. - * - * - * @param documentId the unique ID identifying this operation in the system - * @return an OutputStream to write the operation payload into - */ - OutputStream stream(CharSequence documentId); - - /** - * Returns {@link Result}s for all operations enqueued by {@link #stream(CharSequence)}. - * Note that the order of results is non-deterministic, with <em>one</em> exception - results - * for one document ID are returned in the order they were enqueued. In all other cases - * Results may appear out-of-order. - * - * @return a blocking queue for retrieving results - * @see Result - */ - BlockingQueue<Result> results(); - - /** - * Closes this Session. All resources are freed, persistent connections are closed and - * internal threads are stopped. - * - * @throws RuntimeException in cases where underlying resources throw on shutdown/close - */ - void close(); - - /** - * Returns stats about the cluster. - * @return JSON string with information about cluster. - */ - String getStatsAsJson(); - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java deleted file mode 100644 index ef231ccc713..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.SessionParams; - -import java.time.Clock; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; - -/** - * Factory for creating {@link Session} instances. - * - * @author Einar M R Rosenvinge - * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a> - */ -@Deprecated -public final class SessionFactory { - - /** - * Creates a {@link Session} with the given parameters. - * - * @param params the parameters to use when creating the Session. - * @return a new Session instance - */ - public static Session create(SessionParams params) { - return createInternal(params); - } - - @SuppressWarnings("deprecation") - static Session createInternal(SessionParams params) { - return new com.yahoo.vespa.http.client.core.api.SessionImpl(params, createTimeoutExecutor(), Clock.systemUTC()); - } - - static ScheduledThreadPoolExecutor createTimeoutExecutor() { - ScheduledThreadPoolExecutor timeoutExecutor; - timeoutExecutor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("timeout-")); - timeoutExecutor.setRemoveOnCancelPolicy(true); - timeoutExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - timeoutExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - return timeoutExecutor; - } - - /** - * Creates a {@link Session} to a single {@link Endpoint}, with default values for everything. - * For full control of all parameters, or to feed to more than one Endpoint or more than one {@link Cluster}, - * see {@link #create(com.yahoo.vespa.http.client.config.SessionParams)}. - * - * @param endpoint the Endpoint to feed to. - * @return a new Session instance - * @see #create(com.yahoo.vespa.http.client.config.SessionParams) - */ - public static Session create(Endpoint endpoint) { - return createInternal(endpoint); - } - - static Session createInternal(Endpoint endpoint) { - SessionParams params = new SessionParams.Builder().addCluster( - new Cluster.Builder().addEndpoint(endpoint).build()).build(); - return create(params); - } - - private static class DaemonThreadFactory implements ThreadFactory { - private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); - private final String prefix; - - public DaemonThreadFactory(String prefix) { - this.prefix = prefix; - } - - @Override - public Thread newThread(Runnable runnable) { - Thread t = defaultThreadFactory.newThread(runnable); - t.setDaemon(true); - t.setName(prefix + t.getName()); - return t; - } - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SimpleLoggerResultCallback.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SimpleLoggerResultCallback.java deleted file mode 100644 index e03bfd2b816..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SimpleLoggerResultCallback.java +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import java.time.Duration; -import java.time.Instant; -import java.util.Date; -import java.util.Locale; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Simple implementation of the ResultCallback that logs to std err for every X documents: - * "Result received: 34 (1 failed so far, 2003 sent, success rate 1999.23 docs/sec)." - * On each failure it will print the Result object content. If tracing is enabled, it will print trace messages to - * std err as well. - * - * @author dybis - * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a> - */ -@Deprecated -public class SimpleLoggerResultCallback implements FeedClient.ResultCallback { - - private final Object monitor = new Object(); - private int resultCounter = 0; - private int failureCounter = 0; - private final AtomicInteger sentDocumentCounter; - private final int printStatsForEveryXDocument; - private final boolean ignoreConditionNotMet; - private Instant startSampleInstant = Instant.now(); - private int startSampleResultCount = 0; - - protected void println(String output) { - System.err.println(output); - } - - /** - * Constructor - * - * @param sentDocumentCounter a counter that is increased outside this class, but can be nice to print here. - * @param printStatsForEveryXDocument how often to print stats. - */ - public SimpleLoggerResultCallback(AtomicInteger sentDocumentCounter, int printStatsForEveryXDocument, boolean ignoreConditionNotMet) { - this.sentDocumentCounter = sentDocumentCounter; - this.printStatsForEveryXDocument = printStatsForEveryXDocument; - this.ignoreConditionNotMet = ignoreConditionNotMet; - } - - /** - * Prints how many documents that are received, failed and sent. - */ - public void printProgress() { - synchronized (monitor) { - DocumentRate docRate = newSamplingPeriod(Instant.now()); - println(new Date() + " Result received: " + resultCounter - + " (" + failureCounter + " failed so far, " + sentDocumentCounter.get() - + " sent, success rate " + docRate + ")."); - } - } - - static class DocumentRate { - public final double rate; - DocumentRate(double rate) { - this.rate = rate; - } - @Override - public String toString() { - return String.format(Locale.US, "%.2f docs/sec", rate); - } - } - - /* - * Returns success results per second for last interval and resets variables. - */ - protected DocumentRate newSamplingPeriod(Instant now) { - double docsDelta = resultCounter - failureCounter - startSampleResultCount; - Duration duration = Duration.between(startSampleInstant, now); - startSampleInstant = now; - startSampleResultCount = resultCounter - failureCounter; - long durationMilliSecs = duration.toMillis() + 1; // +1 to avoid division by zero - return new DocumentRate(1000. * docsDelta / durationMilliSecs); - } - - int getResultCount() { - synchronized (monitor) { - return resultCounter; - } - } - - int getFailedDocumentCount() { - synchronized (monitor) { - return failureCounter; - } - } - - @Override - public void onCompletion(String docId, Result documentResult) { - synchronized (monitor) { - if (printStatsForEveryXDocument > 0 && (resultCounter % printStatsForEveryXDocument) == 0) { - printProgress(); - } - resultCounter++; - boolean success = ignoreConditionNotMet - ? documentResult.isSuccessOrConditionNotMet() - : documentResult.isSuccess(); - if ( ! success ) { - failureCounter++; - println("Failure: " + documentResult + (documentResult.getDetails().isEmpty() ? "" : ":")); - for (Result.Detail detail : documentResult.getDetails()) - println(" " + detail); - } - } - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java deleted file mode 100644 index 2fc63bbbcc5..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SyncFeedClient.java +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client; - -import com.yahoo.vespa.http.client.config.SessionParams; - -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.ConcurrentModificationException; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.ThreadLocalRandom; - -/** - * A utility wrapper of a FeedClient which feeds a list of documents and blocks until all responses are returned, - * before returning the results. - * - * Not multithread safe: A sync feed client instance can only be used by a single thread - * (but it can and should be reused for multiple subsequent synchronous calls). - * - * @author bratseth - * @deprecated Vespa-http-client will be removed in Vespa 8. It's replaced by <a href="https://docs.vespa.ai/en/vespa-feed-client.html">vespa-feed-client</a> - */ -@Deprecated -public class SyncFeedClient implements AutoCloseable { - - private final FeedClient wrappedClient; - private final Callback callback; - - public SyncFeedClient(SessionParams params) { - callback = new SyncFeedClient.Callback(); - if (params.getFeedParams().getIdlePollFrequency() == null) { - params = params.toBuilder() - .setFeedParams(params.getFeedParams().toBuilder() - .setIdlePollFrequency(500.0) - .build()) - .build(); - } - this.wrappedClient = FeedClientFactory.create(params, callback); - } - - /** - * Calls FeedClient.stream for each entry in the list, blocks until all results are ready and returns them. - * This will block for at most the time it takes to feed these operations + clientTimeout given in the - * sessions params when creating this. - * - * @param operations the Vespa write operations to stream - * @return the result of feeding all these operations - */ - public SyncResult stream(List<SyncOperation> operations) { - callback.expectResultsOf(operations); - for (SyncOperation operation : operations) - wrappedClient.stream(operation.documentId, operation.operationId, operation.documentData, operation.context); - return callback.waitForResults(); - } - - @Override - public void close() { - wrappedClient.close(); - } - - /** Holds the arguments to a single stream operation */ - public static class SyncOperation { - - private final String documentId; - private final CharSequence documentData; - private final Object context; - - /** Operation id passed on to the Document created from this */ - private final String operationId; - - public SyncOperation(String documentId, CharSequence documentData) { - this(documentId, documentData, null); - } - - public SyncOperation(String documentId, CharSequence documentData, Object context) { - this(documentId, documentData, new BigInteger(64, ThreadLocalRandom.current()).toString(32), context); - } - - public SyncOperation(String documentId, CharSequence documentData, String operationId, Object context) { - this.documentId = Objects.requireNonNull(documentId, "documentId"); - this.documentData = Objects.requireNonNull(documentData, "documentData"); - this.context = context; - this.operationId = Objects.requireNonNull(operationId); - } - - } - - /** - * The result of a SyncFeedClient.stream call. This always holds exactly one Result per SyncOperation - * attempted, and the results are guaranteed to be returned in the same order as in the List of SyncOperations. - */ - public static class SyncResult { - - private final Exception exception; - private final List<Result> results; - - private SyncResult(List<Result> results, Exception exception) { - this.results = results; - this.exception = exception; - } - - /** - * Returns the results of this. This has the same size and order as the List of SyncOperations that - * created this. The list returned is modifiable and owned by the client. Multiple calls to this returns the - * same list instance. - */ - public List<Result> results() { return results; } - - /** - * Returns the last exception received when attempting the operations this is the result of, or null if none. - * Even if there is an exception, results() will return one Result per operation attempted. - */ - public Exception exception() { return exception; } - - /** Returns true if all Results in this are successful */ - public boolean isSuccess() { - return results.stream().allMatch(Result::isSuccess); - } - - } - - private static class Callback implements FeedClient.ResultCallback { - - private final Object monitor = new Object(); - - // The rest of the state of this is reset each time we call expectResultsOf - - private int resultsReceived; - private Exception exception = null; - - /** - * A map from operation ids to their results. This is initially populated with null values to keep track of - * which responses we are waiting for. - */ - private LinkedHashMap<String, Result> results = null; - - void expectResultsOf(List<SyncOperation> operations) { - synchronized (monitor) { - if (results != null) - throw new ConcurrentModificationException("A SyncFeedClient instance is used by multiple threads"); - - resultsReceived = 0; - exception = null; - results = new LinkedHashMap<>(operations.size()); - for (SyncOperation operation : operations) - results.put(operation.operationId, null); - } - } - - SyncResult waitForResults() { - try { - synchronized (monitor) { - while ( ! complete()) - monitor.wait(); - - SyncResult syncResult = new SyncResult(new ArrayList<>(results.values()), exception); - results = null; - return syncResult; - } - } - catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for feeding results", e); - } - } - - @Override - public void onCompletion(String docId, Result documentResult) { - synchronized (monitor) { - if ( ! results.containsKey(documentResult.getOperationId())) return; // Stale result - ignore - - Result previousValue = results.put(documentResult.getOperationId(), documentResult); - if (previousValue != null) - throw new IllegalStateException("Received duplicate result for " + docId); - - resultsReceived++; - if (complete()) - monitor.notifyAll(); - } - } - - @Override - public void onEndpointException(FeedEndpointException exception) { - this.exception = exception; // We will still receive one onCompletion per stream invocation done - } - - private boolean complete() { - return resultsReceived == results.size(); - } - - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java deleted file mode 100644 index 4f72c380e59..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Cluster.java +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.config; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; - -/** - * A set of {@link Endpoint} instances. Construct using {@link Cluster.Builder}. - * - * @author Einar M R Rosenvinge - */ -public final class Cluster { - - /** Builder for {@link Cluster}. */ - public final static class Builder { - private final List<Endpoint> endpoints = new LinkedList<>(); - private String route = null; - - /** - * Adds an Endpoint (a HTTP gateway) to this Cluster. - * - * @param endpoint the Endpoint to add - * @return this, for chaining - */ - public Builder addEndpoint(Endpoint endpoint) { - endpoints.add(endpoint); - return this; - } - - /** - * Sets a route specific to this cluster, which overrides the route set in {@link com.yahoo.vespa.http.client.config.FeedParams#getRoute()}. - * - * @param route a route specific to this cluster - * @return this, for chaining - */ - public Builder setRoute(String route) { - this.route = route; - return this; - } - - public Cluster build() { - return new Cluster(endpoints, route); - } - - public String getRoute() { - return route; - } - } - private final List<Endpoint> endpoints; - private final String route; - - private Cluster(List<Endpoint> endpoints, String route) { - this.endpoints = Collections.unmodifiableList(new ArrayList<>(endpoints)); - this.route = route; - } - - public List<Endpoint> getEndpoints() { - return endpoints; - } - - public String getRoute() { - return route; - } - - @Override - public String toString() { - return "cluster with endpoints " + endpoints + " and route '" + route + "'"; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java deleted file mode 100644 index 00cc2512ae3..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java +++ /dev/null @@ -1,486 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.config; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; -import org.apache.http.conn.ssl.SSLConnectionSocketFactory; - -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.SSLContext; -import java.nio.file.Path; -import java.time.Duration; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -/** - * Connection level parameters. - * This class is immutable - * and has no public constructor - to instantiate one, use a {@link Builder}. - * - * @author Einar M R Rosenvinge - */ -public final class ConnectionParams { - - /** - * Builder for {@link ConnectionParams}. - */ - public static final class Builder { - - private SSLContext sslContext = null; - private HostnameVerifier hostnameVerifier = SSLConnectionSocketFactory.getDefaultHostnameVerifier(); - private final Multimap<String, String> headers = ArrayListMultimap.create(); - private final Map<String, HeaderProvider> headerProviders = new HashMap<>(); - private int numPersistentConnectionsPerEndpoint = 1; - private String proxyHost = null; - private int proxyPort = 8080; - private boolean useCompression = false; - private int maxRetries = 100; - private long minTimeBetweenRetriesMs = 700; - private boolean dryRun = false; - private boolean runThreads = true; - private int traceLevel = 0; - private int traceEveryXOperation = 0; - private boolean printTraceToStdErr = true; - private boolean useTlsConfigFromEnvironment = false; - private Duration connectionTimeToLive = Duration.ofSeconds(30); - private Path privateKey; - private Path certificate; - private Path caCertificates; - - /** - * Use TLS configuration through the standard Vespa environment variables. - * Setting this to 'true' will override any other TLS/HTTPS related configuration. - */ - public Builder setUseTlsConfigFromEnvironment(boolean useTlsConfigFromEnvironment) { - this.useTlsConfigFromEnvironment = useTlsConfigFromEnvironment; - return this; - } - - /** - * Sets the SSLContext for the connection to the gateway when SSL is enabled for Endpoint. - * Default null (no ssl). See also Endpoint configuration. - * - * @param sslContext sslContext for connection to gateway. - * @return pointer to builder. - */ - public Builder setSslContext(SSLContext sslContext) { - this.sslContext = sslContext; - return this; - } - - /** - * Sets the {@link HostnameVerifier} for the connection to the gateway when SSL is enabled for Endpoint. - * Defaults to instance returned by {@link SSLConnectionSocketFactory#getDefaultHostnameVerifier()}. - * - * @param hostnameVerifier hostname verifier for connection to gateway. - * @return pointer to builder. - */ - public Builder setHostnameVerifier(HostnameVerifier hostnameVerifier) { - this.hostnameVerifier = hostnameVerifier; - return this; - } - - /** - * Set path to private key and certificate files. Both the private key and certificate must be PEM-encoded. - */ - public Builder setCertificateAndPrivateKey(Path privateKey, Path certificate) { - this.privateKey = privateKey; - this.certificate = certificate; - return this; - } - - /** - * Set path a PEM file containing the CA certificates. - */ - public Builder setCaCertificates(Path caCertificates) { - this.caCertificates = caCertificates; - return this; - } - - /** - * Set custom headers to be used - * - * @param key header name - * @param value header value - * @return pointer to builder. - */ - public Builder addHeader(String key, String value) { - headers.put(key, value); - return this; - } - - /** - * Adds a header provider for dynamic headers; headers where the value may change during a feeding session - * (e.g. security tokens with limited life time). Only one {@link HeaderProvider} is allowed for a given header name. - * - * @param provider A provider for a dynamic header - * @return pointer to builder. - * @throws IllegalArgumentException if a provider is already registered for the given header name - */ - public Builder addDynamicHeader(String headerName, HeaderProvider provider) { - Objects.requireNonNull(headerName, "Header name cannot be null"); - Objects.requireNonNull(provider, "Header provider cannot be null"); - if (headerProviders.containsKey(headerName)) { - throw new IllegalArgumentException("Provider already registered for name '" + headerName + "'"); - } - headerProviders.put(headerName, provider); - return this; - } - - /** - * The number of connections between the http client and the gateways. A very low number can result - * in the network not fully utilized and the round-trip time can be a limiting factor. A low number - * can cause skew in distribution of load between gateways. A too high number will cause - * many threads to run, more context switching and potential more memory usage. We recommend using about - * 16 connections per gateway. - * - * @param numPersistentConnectionsPerEndpoint number of channels per endpoint - * @return pointer to builder. - */ - public Builder setNumPersistentConnectionsPerEndpoint(int numPersistentConnectionsPerEndpoint) { - this.numPersistentConnectionsPerEndpoint = numPersistentConnectionsPerEndpoint; - return this; - } - - /** - * Sets the HTTP proxy host name to use. - * - * @param proxyHost host name for proxy. - * @return pointer to builder. - */ - public Builder setProxyHost(String proxyHost) { - this.proxyHost = proxyHost; - return this; - } - - /** - * Sets the HTTP proxy host port to use. - * - * @param proxyPort host port for proxy. - * @return pointer to builder. - */ - public Builder setProxyPort(int proxyPort) { - this.proxyPort = proxyPort; - return this; - } - - /** - * Set whether compression of document operations during communication to server should be enabled. - * - * @param useCompression true if compression should be enabled. - * @return pointer to builder. - */ - public Builder setUseCompression(boolean useCompression) { - this.useCompression = useCompression; - return this; - } - - /** - * Set how many times to retry sending an operation to a gateway when encountering transient problems. - * - * @param maxRetries max number of retries - * @return pointer to builder. - */ - public Builder setMaxRetries(int maxRetries) { - this.maxRetries = maxRetries; - return this; - } - - /** - * Set to true to skip making network connections and instead - * let requests complete successfully with no effect. - */ - public Builder setDryRun(boolean dryRun) { - this.dryRun = dryRun; - return this; - } - - /** - * Set to false to skip starting io threads, such that any operation must be driven by a calling thread. - * Useful for testing. - */ - public Builder setRunThreads(boolean runThreads) { - this.runThreads = runThreads; - return this; - } - - /** - * Set the min time between retries when temporarily failing against a gateway. - * - * @param minTimeBetweenRetries the min time value - * @param unit the unit of the min time. - * @return pointer to builder. - */ - public Builder setMinTimeBetweenRetries(long minTimeBetweenRetries, TimeUnit unit) { - this.minTimeBetweenRetriesMs = unit.toMillis(minTimeBetweenRetries); - return this; - } - - public long getMinTimeBetweenRetriesMs() { - return minTimeBetweenRetriesMs; - } - - /** - * Sets the trace level for tracing messagebus. 0 means to tracing. - * - * @param traceLevel tracelevel, larger value means more tracing. - * @return pointer to builder. - */ - public Builder setTraceLevel(int traceLevel) { - this.traceLevel = traceLevel; - return this; - } - - /** - * How often to trace messages in client. Please note that this does not affect tracing with messagebus - * - * @param traceEveryXOperation if zero, no tracing, 1 = every message, and so on. - * @return pointer to builder. - */ - public Builder setTraceEveryXOperation(int traceEveryXOperation) { - this.traceEveryXOperation = traceEveryXOperation; - return this; - } - - /** - * If enabled will write internal trace to stderr. - * - * @param printTraceToStdErr if value is true it is enabled. - * @return pointer to builder. - */ - public Builder setPrintTraceToStdErr(boolean printTraceToStdErr) { - this.printTraceToStdErr = printTraceToStdErr; - return this; - } - - /** - * Set the maximum time to live for persistent connections - */ - public Builder setConnectionTimeToLive(Duration connectionTimeToLive) { - this.connectionTimeToLive = connectionTimeToLive; - return this; - } - - public ConnectionParams build() { - return new ConnectionParams( - sslContext, - privateKey, - certificate, - caCertificates, - hostnameVerifier, - headers, - headerProviders, - numPersistentConnectionsPerEndpoint, - proxyHost, - proxyPort, - useCompression, - maxRetries, - minTimeBetweenRetriesMs, - dryRun, - runThreads, - traceLevel, - traceEveryXOperation, - printTraceToStdErr, - useTlsConfigFromEnvironment, - connectionTimeToLive); - } - - public int getNumPersistentConnectionsPerEndpoint() { - return numPersistentConnectionsPerEndpoint; - } - - public String getProxyHost() { - return proxyHost; - } - - public boolean isDryRun() { - return dryRun; - } - - public boolean runThreads() { return runThreads; } - - public int getMaxRetries() { - return maxRetries; - } - public int getTraceLevel() { - return traceLevel; - } - public int getTraceEveryXOperation() { - return traceEveryXOperation; - } - - public boolean getPrintTraceToStdErr() { - return printTraceToStdErr; - } - - public int getProxyPort() { - return proxyPort; - } - - public SSLContext getSslContext() { - return sslContext; - } - - public HostnameVerifier getHostnameVerifier() { - return hostnameVerifier; - } - - public boolean useTlsConfigFromEnvironment() { - return useTlsConfigFromEnvironment; - } - - public Duration getConnectionTimeToLive() { - return connectionTimeToLive; - } - public Path getPrivateKey() { return privateKey; } - public Path getCertificate() { return certificate; } - public Path getCaCertificates() { return caCertificates; } - } - - private final SSLContext sslContext; - private final Path privateKey; - private final Path certificate; - private final Path caCertificates; - private final HostnameVerifier hostnameVerifier; - private final Multimap<String, String> headers = ArrayListMultimap.create(); - private final Map<String, HeaderProvider> headerProviders = new HashMap<>(); - private final int numPersistentConnectionsPerEndpoint; - private final String proxyHost; - private final int proxyPort; - private final boolean useCompression; - private final int maxRetries; - private final long minTimeBetweenRetriesMs; - private final boolean dryRun; - private final boolean runThreads; - private final int traceLevel; - private final int traceEveryXOperation; - private final boolean printTraceToStdErr; - private final boolean useTlsConfigFromEnvironment; - private final Duration connectionTimeToLive; - - private ConnectionParams( - SSLContext sslContext, - Path privateKey, Path certificate, Path caCertificates, - HostnameVerifier hostnameVerifier, - Multimap<String, String> headers, - Map<String, HeaderProvider> headerProviders, - int numPersistentConnectionsPerEndpoint, - String proxyHost, - int proxyPort, - boolean useCompression, - int maxRetries, - long minTimeBetweenRetriesMs, - boolean dryRun, - boolean runThreads, - int traceLevel, - int traceEveryXOperation, - boolean printTraceToStdErr, - boolean useTlsConfigFromEnvironment, - Duration connectionTimeToLive) { - this.sslContext = sslContext; - this.privateKey = privateKey; - this.certificate = certificate; - this.caCertificates = caCertificates; - this.hostnameVerifier = hostnameVerifier; - this.useTlsConfigFromEnvironment = useTlsConfigFromEnvironment; - this.connectionTimeToLive = connectionTimeToLive; - this.headers.putAll(headers); - this.headerProviders.putAll(headerProviders); - this.numPersistentConnectionsPerEndpoint = numPersistentConnectionsPerEndpoint; - this.proxyHost = proxyHost; - this.proxyPort = proxyPort; - this.useCompression = useCompression; - this.maxRetries = maxRetries; - this.minTimeBetweenRetriesMs = minTimeBetweenRetriesMs; - this.dryRun = dryRun; - this.runThreads = runThreads; - this.traceLevel = traceLevel; - this.traceEveryXOperation = traceEveryXOperation; - this.printTraceToStdErr = printTraceToStdErr; - } - - @JsonIgnore - public SSLContext getSslContext() { - return sslContext; - } - - @JsonIgnore - public HostnameVerifier getHostnameVerifier() { - return hostnameVerifier; - } - - public Collection<Map.Entry<String, String>> getHeaders() { - return Collections.unmodifiableCollection(headers.entries()); - } - @JsonIgnore - public Map<String, HeaderProvider> getDynamicHeaders() { - return Collections.unmodifiableMap(headerProviders); - } - - public int getNumPersistentConnectionsPerEndpoint() { - return numPersistentConnectionsPerEndpoint; - } - - public String getProxyHost() { - return proxyHost; - } - - public int getProxyPort() { - return proxyPort; - } - - public boolean getUseCompression() { - return useCompression; - } - - public int getMaxRetries() { - return maxRetries; - } - - public long getMinTimeBetweenRetriesMs() { - return minTimeBetweenRetriesMs; - } - - public boolean isDryRun() { - return dryRun; - } - - public boolean runThreads() { return runThreads; } - - public int getTraceLevel() { - return traceLevel; - } - - public int getTraceEveryXOperation() { - return traceEveryXOperation; - } - - public boolean getPrintTraceToStdErr() { - return printTraceToStdErr; - } - - public boolean useTlsConfigFromEnvironment() { - return useTlsConfigFromEnvironment; - } - - public Duration getConnectionTimeToLive() { - return connectionTimeToLive; - } - - /** - * A header provider that provides a header value. {@link #getHeaderValue()} is called each time a new HTTP request - * is constructed by {@link com.yahoo.vespa.http.client.FeedClient}. - * - * Important: The implementation of {@link #getHeaderValue()} must be thread-safe! - */ - public interface HeaderProvider { String getHeaderValue(); } - - public Path getPrivateKey() { return privateKey; } - public Path getCertificate() { return certificate; } - public Path getCaCertificates() { return caCertificates; } -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java deleted file mode 100644 index ae0cf19a3d1..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.config; - -import java.io.Serializable; -import java.net.URL; - -/** - * Represents an endpoint, in most cases a JDisc container - * in a Vespa cluster configured with <code>document-api</code>. - * - * @author Einar M R Rosenvinge - */ -public final class Endpoint implements Serializable { - - private static final int DEFAULT_PORT = 4080; - - private final String hostname; - private final int port; - private final boolean useSsl; - - private Endpoint(String hostname, int port, boolean useSsl) { - if (hostname.startsWith("https://")) { - throw new RuntimeException("Hostname should be name of machine, not prefixed with protocol (https://)"); - } - // A lot of people put http:// before the servername, let us allow that. - if (hostname.startsWith("http://")) { - this.hostname = hostname.replaceFirst("http://", ""); - } else { - this.hostname = hostname; - } - this.port = port; - this.useSsl = useSsl; - } - - public String getHostname() { - return hostname; - } - - public int getPort() { - return port; - } - - public boolean isUseSsl() { - return useSsl; - } - - @Override - public String toString() { - return hostname + ":" + port + " ssl=" + useSsl; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof Endpoint)) return false; - Endpoint endpoint = (Endpoint) o; - return port == endpoint.port && useSsl == endpoint.useSsl && hostname.equals(endpoint.hostname); - } - - @Override - public int hashCode() { - int result = hostname.hashCode(); - result = 31 * result + port; - result = 31 * result + (useSsl ? 1 : 0); - return result; - } - - /** Creates an Endpoint with the default port and without using SSL */ - public static Endpoint create(String hostname) { - return new Endpoint(hostname, DEFAULT_PORT, false); - } - - /** Creates an Endpoint with the given hostname, port and SSL setting. */ - public static Endpoint create(String hostname, int port, boolean useSsl) { - return new Endpoint(hostname, port, useSsl); - } - - public static Endpoint create(URL url) { - return new Endpoint(url.getHost(), url.getPort(), "https".equals(url.getProtocol())); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java deleted file mode 100644 index 01f314a7e36..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java +++ /dev/null @@ -1,332 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.config; - -import java.util.concurrent.TimeUnit; - -/** - * Feed level parameters. This class is immutable - * and has no public constructor - to instantiate one, use a {@link Builder}. - - * @author Einar M R Rosenvinge - */ -public final class FeedParams { - - public boolean getDenyIfBusyV3() { return denyIfBusyV3; } - - public long getMaxSleepTimeMs() { return maxSleepTimeMs; } - - public boolean getSilentUpgrade() { return silentUpgrade; } - - /** - * Enumeration of data formats that are acceptable by the - * {@link com.yahoo.vespa.http.client.FeedClient} methods. - */ - public enum DataFormat { - /** UTF-8-encoded XML. Preamble is not necessary. */ - XML_UTF8, - JSON_UTF8 - } - /** - * Mutable class used to instantiate a {@link FeedParams}. - */ - public static final class Builder { - - private DataFormat dataFormat = DataFormat.JSON_UTF8; - private long serverTimeout = TimeUnit.SECONDS.toMillis(180); - private long clientTimeout = TimeUnit.SECONDS.toMillis(20); - private String route = null; - private int maxChunkSizeBytes = 50 * 1024; - private int maxInFlightRequests = 5000; - private long localQueueTimeOut = 180000; - private String priority = null; - private boolean denyIfBusyV3 = true; - private long maxSleepTimeMs = 3000; - private boolean silentUpgrade = true; - private Double idlePollFrequency = null; - - /** - * Make server not throw 4xx/5xx for situations that are normal during upgrade as this can esily mask - * other problems. This feature need to be supported on server side to work, but it is still safe - * to enable it, even if server does not yet support it. As of Nov 22 2016 it is not yet implemented on - * the server side. - * @param silentUpgrade true for reducing "false" 4xx/5xx. - * @return this, for chaining - */ - public Builder setSilentUpgrade(boolean silentUpgrade) { - this.silentUpgrade = silentUpgrade; - return this; - } - - /** - * When throttling the load due to transient errors on gateway, what is the most time to wait between - * requests per thread. Only active for V3 protocol. - * @param ms max with time - * @return this, for chaining - */ - public Builder setMaxSleepTimeMs(long ms) { - this.maxSleepTimeMs = ms; - return this; - } - - /** - * If this is set to false, the gateway will block threads until messagebus can send the message. - * If true, the gateway will exit and fail the request early if there are many threads already - * blocked. - * @param value true to reduce number of blocked threads in gateway. - * @return this, for chaining - */ - public Builder setDenyIfBusyV3(boolean value) { - denyIfBusyV3 = value; - return this; - } - - /** - * Sets the data format to be used. - * - * @param dataFormat the data format to be used. - * @see DataFormat - * @return this, for chaining - */ - public Builder setDataFormat(DataFormat dataFormat) { - this.dataFormat = dataFormat; - return this; - } - - /** - * Sets a route to be used for all Clusters, unless overridden on a per-cluster basis - * in {@link com.yahoo.vespa.http.client.config.Cluster#getRoute()}. - * - * @param route a route to be used for all Clusters. - * @return this, for chaining - */ - public Builder setRoute(String route) { - this.route = route; - return this; - } - - /** - * Sets the server-side timeout of each operation - i.e. the timeout used by - * the server endpoint for operations going over the message bus protocol into - * Vespa. - * - * Note that the TOTAL timeout of any one operation in this API would be - * {@link #getServerTimeout(java.util.concurrent.TimeUnit)} + - * {@link #getClientTimeout(java.util.concurrent.TimeUnit)}. - * - * @param serverTimeout timeout value - * @param unit unit of timeout value - * @return this, for chaining - */ - public Builder setServerTimeout(long serverTimeout, TimeUnit unit) { - if (serverTimeout <= 0L) { - throw new IllegalArgumentException("Server timeout cannot be zero or negative."); - } - this.serverTimeout = unit.toMillis(serverTimeout); - return this; - } - - /** - * Sets the client-side timeout for each operation. If BOTH the server-side - * timeout AND this timeout has passed, the {@link com.yahoo.vespa.http.client.FeedClient} - * will synthesize a {@link com.yahoo.vespa.http.client.Result}. - * - * Note that the TOTAL timeout of any one operation in this API would be - * {@link #getServerTimeout(java.util.concurrent.TimeUnit)} + - * {@link #getClientTimeout(java.util.concurrent.TimeUnit)}, - * after which a result callback is guaranteed to be made. - * - * @param clientTimeout timeout value - * @param unit unit of timeout value - * @return this, for chaining - */ - public Builder setClientTimeout(long clientTimeout, TimeUnit unit) { - if (clientTimeout <= 0L) { - throw new IllegalArgumentException("Client timeout cannot be zero or negative."); - } - this.clientTimeout = unit.toMillis(clientTimeout); - return this; - } - - /** - * Sets the maximum number of bytes of document data to send per HTTP request. - * - * @param maxChunkSizeBytes max number of bytes per HTTP request. - * @return this, for chaining - */ - public Builder setMaxChunkSizeBytes(int maxChunkSizeBytes) { - this.maxChunkSizeBytes = maxChunkSizeBytes; - return this; - } - - /** - * Sets the maximum number of operations to be in-flight. - * - * @param maxInFlightRequests max number of operations. - * @return this, for chaining - */ - public Builder setMaxInFlightRequests(int maxInFlightRequests) { - this.maxInFlightRequests = maxInFlightRequests; - return this; - } - - /** - * Sets the number of milliseconds until we respond with a timeout for a document operation - * if we still have not received a response. - */ - public Builder setLocalQueueTimeOut(long timeOutMs) { - this.localQueueTimeOut = timeOutMs; - return this; - } - - /** - * Set what frequency to poll for async responses. Default is 10hz (every 0.1s), but 1000hz when using SyncFeedClient - */ - public Builder setIdlePollFrequency(Double idlePollFrequency) { - this.idlePollFrequency = idlePollFrequency; - return this; - } - - /** - * Sets the messagebus priority. The allowed values are HIGHEST, VERY_HIGH, HIGH_[1-3], - * NORMAL_[1-6], LOW_[1-3], VERY_LOW, and LOWEST.. - * @param priority messagebus priority of this message. - * @return this, for chaining - */ - public Builder setPriority(String priority) { - if (priority == null) { - return this; - } - switch (priority) { - case "HIGHEST": - case "VERY_HIGH": - case "HIGH_1": - case "HIGH_2": - case "HIGH_3": - case "NORMAL_1": - case "NORMAL_2": - case "NORMAL_3": - case "NORMAL_4": - case "NORMAL_5": - case "NORMAL_6": - case "LOW_1": - case "LOW_2": - case "LOW_3": - case "VERY_LOW": - case "LOWEST": - this.priority = priority; - return this; - default: - throw new IllegalArgumentException("Unknown value for priority: " + priority - + " Allowed values are HIGHEST, VERY_HIGH, HIGH_[1-3], " + - "NORMAL_[1-6], LOW_[1-3], VERY_LOW, and LOWEST."); - } - } - - /** - * Instantiates a {@link FeedParams}. - * - * @return a FeedParams object with the parameters of this Builder - */ - public FeedParams build() { - return new FeedParams( - dataFormat, serverTimeout, clientTimeout, route, - maxChunkSizeBytes, maxInFlightRequests, localQueueTimeOut, priority, - denyIfBusyV3, maxSleepTimeMs, silentUpgrade, idlePollFrequency); - } - - public long getClientTimeout(TimeUnit unit) { - return unit.convert(clientTimeout, TimeUnit.MILLISECONDS); - } - - public long getServerTimeout(TimeUnit unit) { - return unit.convert(serverTimeout, TimeUnit.MILLISECONDS); - } - - public String getRoute() { - return route; - } - - public DataFormat getDataFormat() { - return dataFormat; - } - - public int getMaxChunkSizeBytes() { - return maxChunkSizeBytes; - } - - public int getMaxInFlightRequests() { - return maxInFlightRequests; - } - - } - - // NOTE! See toBuilder at the end of this class if you add fields here - - private final DataFormat dataFormat; - private final long serverTimeoutMillis; - private final long clientTimeoutMillis; - private final String route; - private final int maxChunkSizeBytes; - private final int maxInFlightRequests; - private final long localQueueTimeOut; - private final String priority; - private final boolean denyIfBusyV3; - private final long maxSleepTimeMs; - private final boolean silentUpgrade; - private final Double idlePollFrequency; - - private FeedParams(DataFormat dataFormat, long serverTimeout, long clientTimeout, String route, - int maxChunkSizeBytes, final int maxInFlightRequests, - long localQueueTimeOut, String priority, boolean denyIfBusyV3, long maxSleepTimeMs, - boolean silentUpgrade, Double idlePollFrequency) { - this.dataFormat = dataFormat; - this.serverTimeoutMillis = serverTimeout; - this.clientTimeoutMillis = clientTimeout; - this.route = route; - this.maxChunkSizeBytes = maxChunkSizeBytes; - this.maxInFlightRequests = maxInFlightRequests; - this.localQueueTimeOut = localQueueTimeOut; - this.priority = priority; - this.denyIfBusyV3 = denyIfBusyV3; - this.maxSleepTimeMs = maxSleepTimeMs; - this.silentUpgrade = silentUpgrade; - this.idlePollFrequency = idlePollFrequency; - - } - - public DataFormat getDataFormat() { return dataFormat; } - public String getRoute() { return route; } - public long getServerTimeout(TimeUnit unit) { return unit.convert(serverTimeoutMillis, TimeUnit.MILLISECONDS); } - public long getClientTimeout(TimeUnit unit) { return unit.convert(clientTimeoutMillis, TimeUnit.MILLISECONDS); } - - public int getMaxChunkSizeBytes() { return maxChunkSizeBytes; } - public String getPriority() { return priority; } - - public String toUriParameters() { - StringBuilder b = new StringBuilder(); - b.append("&dataformat=").append(dataFormat.name()); //name in dataFormat enum obviously must be ascii - return b.toString(); - } - - public int getMaxInFlightRequests() { return maxInFlightRequests; } - public long getLocalQueueTimeOut() { return localQueueTimeOut; } - public Double getIdlePollFrequency() { return idlePollFrequency; } - - /** Returns a builder initialized to the values of this */ - public FeedParams.Builder toBuilder() { - Builder b = new Builder(); - b.setDataFormat(dataFormat); - b.setServerTimeout(serverTimeoutMillis, TimeUnit.MILLISECONDS); - b.setClientTimeout(clientTimeoutMillis, TimeUnit.MILLISECONDS); - b.setRoute(route); - b.setMaxChunkSizeBytes(maxChunkSizeBytes); - b.setMaxInFlightRequests(maxInFlightRequests); - b.setPriority(priority); - b.setDenyIfBusyV3(denyIfBusyV3); - b.setMaxSleepTimeMs(maxSleepTimeMs); - b.setSilentUpgrade(silentUpgrade); - b.setIdlePollFrequency(idlePollFrequency); - return b; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java deleted file mode 100644 index e8052fa7faa..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.config; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; - -/** - * Parameters given to a {@link com.yahoo.vespa.http.client.FeedClientFactory} - * when creating {@link com.yahoo.vespa.http.client.FeedClient}s. This class is immutable - * and has no public constructor - to instantiate one, use a {@link Builder}. - * - * @author Einar M R Rosenvinge - * @see com.yahoo.vespa.http.client.FeedClientFactory - * @see Builder - */ -public final class SessionParams { - - /** - * Interface for handling serious errors with connection. - */ - public interface ErrorReporter { - void onSessionError(Endpoint endpoint, String oldSessionID, String newSessionId); - } - - /** - * Mutable class used to instantiate a {@link SessionParams}. A builder - * instance will at the very least contain cluster settings ( - * {@link #addCluster(Cluster)}), for supporting SSL and similar transport - * settings, use {@link #setConnectionParams(ConnectionParams)}. - */ - public static final class Builder { - - private final List<Cluster> clusters = new LinkedList<>(); - private FeedParams feedParams = new FeedParams.Builder().build(); - private ConnectionParams connectionParams = new ConnectionParams.Builder().build(); - private int clientQueueSize = 10000; - private ErrorReporter errorReporter = null; - private int throttlerMinSize = 0; - - /** - * Add a Vespa installation for feeding documents into. - * - * @return this Builder instance, to support chaining - */ - public Builder addCluster(Cluster cluster) { - clusters.add(cluster); - return this; - } - - /** - * Set parameters used for feeding the documents in the receiving - * cluster. Reasonable defaults are supplied, so setting this should not - * be necessary for testing. - * - * @return this builder instance to support chaining - */ - public Builder setFeedParams(FeedParams feedParams) { - this.feedParams = feedParams; - return this; - } - - /** - * Transport parameters, like custom HTTP headers. - * - * @return this Builder instance, to support chaining - */ - public Builder setConnectionParams(ConnectionParams connectionParams) { - this.connectionParams = connectionParams; - return this; - } - - /** - * Sets an error reporter that is invoked in case of serious errors. - * - * @param errorReporter the handler - * @return pointer to builder. - */ - public Builder setErrorReporter(ErrorReporter errorReporter) { - this.errorReporter = errorReporter; - return this; - } - - /** - * Sets the maximum number of document operations to hold in memory, waiting to be - * sent to Vespa. When this threshold is reached, {@link java.io.OutputStream#close()} will block. - * - * @param clientQueueSize the maximum number of document operations to hold in memory. - * @return pointer to builder. - */ - public Builder setClientQueueSize(int clientQueueSize) { - this.clientQueueSize = clientQueueSize; - return this; - } - - /** - * Sets the minimum queue size of the throttler. If this is zero, it means that dynamic throttling is - * not enabled. Otherwise it is the minimum size of the throttler for how many parallel requests that are - * accepted. The max size of the throttler is the clientQueueSize. - * @return the minimum number of requests to be used in throttler or zero if throttler is static. - * - * @param throttlerMinSize the value of the min size. - */ - public Builder setThrottlerMinSize(int throttlerMinSize) { - this.throttlerMinSize = throttlerMinSize; - return this; - } - - /** - * Instantiates a {@link SessionParams} that can be given to a {@link com.yahoo.vespa.http.client.FeedClientFactory}. - * - * @return a SessionParams object with the parameters of this Builder - */ - public SessionParams build() { - return new SessionParams( - clusters, feedParams, connectionParams, clientQueueSize, errorReporter, throttlerMinSize); - } - - public FeedParams getFeedParams() { - return feedParams; - } - public ConnectionParams getConnectionParams() { - return connectionParams; - } - public int getClientQueueSize() { - return clientQueueSize; - } - public int getThrottlerMinSize() { - return throttlerMinSize; - } - } - - // NOTE! See toBuilder at the end of this class if you add fields here - - private final List<Cluster> clusters; - private final FeedParams feedParams; - private final ConnectionParams connectionParams; - private final int clientQueueSize; - private final ErrorReporter errorReport; - private final int throttlerMinSize; - - private SessionParams(Collection<Cluster> clusters, - FeedParams feedParams, - ConnectionParams connectionParams, - int clientQueueSize, - ErrorReporter errorReporter, - int throttlerMinSize) { - this.clusters = Collections.unmodifiableList(new ArrayList<>(clusters)); - this.feedParams = feedParams; - this.connectionParams = connectionParams; - this.clientQueueSize = clientQueueSize; - this.errorReport = errorReporter; - this.throttlerMinSize = throttlerMinSize; - } - - public List<Cluster> getClusters() { - return clusters; - } - - public FeedParams getFeedParams() { - return feedParams; - } - - public ConnectionParams getConnectionParams() { - return connectionParams; - } - - public int getClientQueueSize() { - return clientQueueSize; - } - - public int getThrottlerMinSize() { - return throttlerMinSize; - } - - public ErrorReporter getErrorReport() { - return errorReport; - } - - public Builder toBuilder() { - Builder b = new Builder(); - clusters.forEach(c -> b.addCluster(c)); - b.setFeedParams(feedParams); - b.setConnectionParams(connectionParams); - b.setClientQueueSize(clientQueueSize); - b.setErrorReporter(errorReport); - b.setThrottlerMinSize(throttlerMinSize); - return b; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/package-info.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/package-info.java deleted file mode 100644 index 9c07e819e90..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/package-info.java +++ /dev/null @@ -1,8 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * Settings for creating clients/sessions. - * - * NOTE: This is a PUBLIC API, but not annotated as such because this is not a bundle and - * we don't want to introduce Vespa dependencies. - */ -package com.yahoo.vespa.http.client.config; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java deleted file mode 100644 index 07262a60dd8..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; - -import java.io.IOException; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.util.Objects; -import java.util.concurrent.ThreadLocalRandom; - -/** - * A document operation - * - * @author Einar M R Rosenvinge - */ -final public class Document { - - private final String documentId; - private final ByteBuffer data; - private final Instant createTime; - // This is initialized lazily to reduce work on calling thread (which is the thread calling the API) - private String operationId = null; - private final Object context; - private Instant queueInsertTime; - - public Document(String documentId, byte[] data, Object context, Instant createTime) { - this(documentId, null, ByteBuffer.wrap(data), context, createTime); - } - - public Document(String documentId, String operationId, CharSequence data, Object context, Instant createTime) { - this(documentId, operationId, encode(data, documentId), context, createTime); - } - - private Document(String documentId, String operationId, ByteBuffer data, Object context, Instant createTime) { - this.documentId = documentId; - this.operationId = operationId; - this.data = data; - this.context = context; - this.createTime = Objects.requireNonNull(createTime, "createTime cannot be null"); - this.queueInsertTime = createTime; - } - - public void setQueueInsertTime(Instant queueInsertTime) { - this.queueInsertTime = queueInsertTime; - } - - public Instant getQueueInsertTime() { return queueInsertTime; } - - public CharSequence getDataAsString() { - return StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()); - } - - public Object getContext() { return context; } - - public static class DocumentException extends IOException { - private static final long serialVersionUID = 29832833292L; - public DocumentException(String message) - { - super(message); - } - } - - public String getDocumentId() { return documentId; } - - public ByteBuffer getData() { - return data.asReadOnlyBuffer(); - } - - public int size() { - return data.remaining(); - } - - public Instant createTime() { return createTime; } - - public String getOperationId() { - if (operationId == null) { - operationId = new BigInteger(64, ThreadLocalRandom.current()).toString(32); - } - return operationId; - } - - @Override - public String toString() { return "document '" + documentId + "'"; } - - private static ByteBuffer encode(CharSequence data, String documentId) { - try { - return StandardCharsets.UTF_8.newEncoder().encode(CharBuffer.wrap(data)); - } catch (CharacterCodingException e) { - throw new RuntimeException("Error encoding document data into UTF8 " + documentId, e); - } - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Encoder.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Encoder.java deleted file mode 100644 index e4781dc3a3f..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Encoder.java +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; - -/** - * Simple encoding scheme to remove space, linefeed, control characters and - * anything outside ISO 646.irv:1991 from strings. The scheme is supposed to be - * human readable and debugging friendly. Opening and closing curly braces are - * used as quoting characters, the output is by definition US-ASCII only - * characters. - * - * @author Steinar Knutsen - */ -public final class Encoder { - - /** - * ISO 646.irv:1991 safe quoting into a StringBuilder instance. - * - * @param input the string to encode - * @param output the destination buffer - * @return the destination buffer given as input - */ - public static StringBuilder encode(String input, StringBuilder output) { - for (int i = 0; i < input.length(); i = input.offsetByCodePoints(i, 1)) { - int c = input.codePointAt(i); - if (c <= '~') { - if (c <= ' ') { - encode(c, output); - } else { - switch (c) { - case '{': - case '}': - encode(c, output); - break; - default: - output.append((char) c); - } - } - } else { - encode(c, output); - } - } - return output; - } - - /** - * ISO 646.irv:1991 safe unquoting into a StringBuilder instance. - * - * @param input the string to decode - * @param output the destination buffer - * @return the destination buffer given as input - * @throws IllegalArgumentException if the input string contains unexpected or invalid data - */ - public static StringBuilder decode(String input, StringBuilder output) { - for (int i = 0; i < input.length(); i = input.offsetByCodePoints(i, 1)) { - int c = input.codePointAt(i); - if (c > '~') - throw new IllegalArgumentException("Input contained character above printable ASCII at position " + i); - if (c == '{') - i = decode(input, i, output); - else - output.append((char) c); - } - return output; - } - - private static int decode(String input, int offset, StringBuilder output) { - char c = 0; - int end = offset; - int start = offset + 1; - int codePoint; - - while ('}' != c) { - if (++end >= input.length()) { - throw new IllegalArgumentException("Unterminated quoted character or empty quoting."); - } - c = input.charAt(end); - } - try { - codePoint = Integer.parseInt(input.substring(start, end), 16); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Unexpected quoted data: [" + input.substring(start, end) + "]", e); - } - if (Character.charCount(codePoint) > 1) { - try { - output.append(Character.toChars(codePoint)); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Unexpected quoted data: [" + input.substring(start, end) + "]", e); - } - } else { - output.append((char) codePoint); - } - return end; - - } - - private static void encode(int c, StringBuilder output) { - output.append("{").append(Integer.toHexString(c)).append("}"); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/EndpointResult.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/EndpointResult.java deleted file mode 100644 index 94d7237422a..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/EndpointResult.java +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; - -import com.yahoo.vespa.http.client.Result; - -/** - * Result from a single endpoint. - * - * @author dybis - */ -public class EndpointResult { - - private final String operationId; - private final Result.Detail detail; - - public EndpointResult(String operationId, Result.Detail detail) { - this.operationId = operationId; - this.detail = detail; - } - - public String getOperationId() { - return operationId; - } - - public Result.Detail getDetail() { - return detail; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ErrorCode.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ErrorCode.java deleted file mode 100644 index 4e739218319..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ErrorCode.java +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; - -/** - * Return types for the server. - * - * @author Einar M R Rosenvinge - * @author Steinar Knutsen - */ -public enum ErrorCode { - - OK(true, true), - ERROR(false, false), - TRANSIENT_ERROR(false, true), - END_OF_FEED(true, true); - - private boolean success; - private boolean _transient; - - ErrorCode(boolean success, boolean _transient) { - this.success = success; - this._transient = _transient; - } - - public boolean isSuccess() { - return success; - } - - public boolean isTransient() { - return _transient; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Exceptions.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Exceptions.java deleted file mode 100644 index 9ff3f793756..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Exceptions.java +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; - -/** - * Helper methods for handling exceptions - * - * @author bratseth - */ -public abstract class Exceptions { - - /** - * <p>Returns a use friendly error message string which includes information from all nested exceptions.</p> - * - * <p>The form of this string is - * <code>e.getMessage(): e.getCause().getMessage(): e.getCause().getCause().getMessage()...</code> - * In addition, some heuristics are used to clean up common cases where exception nesting causes bad messages. - * </p> - */ - public static String toMessageString(Throwable t) { - StringBuilder b = new StringBuilder(); - String lastMessage = null; - String message; - for (; t != null; t = t.getCause(), lastMessage = message) { - message = getMessage(t); - if (message == null) continue; - if (message.equals(lastMessage)) continue; - if (b.length() > 0) { - b.append(": "); - } - b.append(message); - } - return b.toString(); - } - - /** Returns a useful message from *this* exception, or null if none */ - private static String getMessage(Throwable t) { - String message = t.getMessage(); - if (t.getCause() == null) { - if (message == null) return t.getClass().getSimpleName(); - } else { - if (message == null) return null; - if (message.equals(t.getCause().getClass().getName() + ": " + t.getCause().getMessage())) return null; - } - return message; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Headers.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Headers.java deleted file mode 100644 index d41f42ef652..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Headers.java +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; - -/** - * Wrapper for shared constants used by both client and server. - * - * @author Steinar Knutsen - */ -public final class Headers { - - private Headers() { - } - - public static final String CLIENT_VERSION = "Vespa-Client-Version"; - - public static final String TIMEOUT = "X-Yahoo-Feed-Timeout"; - public static final String DRAIN = "X-Yahoo-Feed-Drain"; - public static final String ROUTE = "X-Yahoo-Feed-Route"; - public static final String VERSION = "X-Yahoo-Feed-Protocol-Version"; - public static final String SESSION_ID = "X-Yahoo-Feed-Session-Id"; - public static final String DENY_IF_BUSY = "X-Yahoo-Feed-Deny-If-Busy"; - public static final String DATA_FORMAT = "X-Yahoo-Feed-Data-Format"; - // This value can be used to route the request to a specific server when using - // several servers. It is a random value that is the same for the whole session. - public static final String SHARDING_KEY = "X-Yahoo-Feed-Sharding-Key"; - public static final String PRIORITY = "X-Yahoo-Feed-Priority"; - public static final String TRACE_LEVEL = "X-Yahoo-Feed-Trace-Level"; - - public static final int HTTP_NOT_ACCEPTABLE = 406; - - // For version 3 of the API - public static final String CLIENT_ID = "X-Yahoo-Client-Id"; - public static final String OUTSTANDING_REQUESTS = "X-Yahoo-Outstanding-Requests"; - public static final String HOSTNAME = "X-Yahoo-Hostname"; - public static final String SILENTUPGRADE = "X-Yahoo-Silent-Upgrade"; - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java deleted file mode 100644 index 34b6d8b9144..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/JsonReader.java +++ /dev/null @@ -1,245 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonFactoryBuilder; -import com.fasterxml.jackson.core.JsonParser; -import com.yahoo.vespa.http.client.FeedClient; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Reads a stream of json documents and sends them to feedClient. - * - * @author dybis - */ -public class JsonReader { - - /** - * Max size of documents. As we stream docs in for finding doc id, we buffer the data and later stream them to - * feedclient after doc id has been revealed. - */ - private final static int maxDocumentSizeChars = 50 * 1024 * 1024; - - // Intended to be used as static. - private JsonReader() {} - - /** - * Process one inputstream and send all documents to feedclient. - * - * @param inputStream source of array of json document. - * @param feedClient where data is sent. - * @param numSent counter to be incremented for every document streamed. - */ - public static void read(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) { - try (InputStreamJsonElementBuffer jsonElementBuffer = new InputStreamJsonElementBuffer(inputStream)) { - JsonFactory jfactory = new JsonFactoryBuilder().disable(JsonFactory.Feature.CANONICALIZE_FIELD_NAMES).build(); - JsonParser jParser = jfactory.createParser(jsonElementBuffer); - while (true) { - int documentStart = (int) jParser.getCurrentLocation().getCharOffset(); - String docId = parseOneDocument(jParser); - if (docId == null) { - int documentEnd = (int) jParser.getCurrentLocation().getCharOffset(); - int documentLength = documentEnd - documentStart; - int maxTruncatedLength = 500; - StringBuilder stringBuilder = new StringBuilder(maxTruncatedLength + 3); - for (int i = 0; i < Math.min(documentLength, maxTruncatedLength); i++) - stringBuilder.append(jsonElementBuffer.circular.get(documentStart + i)); - - if (documentLength > maxTruncatedLength) - stringBuilder.append("..."); - - throw new IllegalArgumentException("Document is missing ID: '" + stringBuilder.toString() + "'"); - } - CharSequence data = jsonElementBuffer.getJsonAsArray(jParser.getCurrentLocation().getCharOffset()); - feedClient.stream(docId, data); - numSent.incrementAndGet(); - } - } catch (EOFException ignored) { - // No more documents - } catch (IOException ioe) { - System.err.println(ioe.getMessage()); - throw new UncheckedIOException(ioe); - } - } - - /** - * This class is intended to be used with a json parser. The data is sent through this intermediate stream - * and to the parser. When the parser is done with a document, it calls postJsonAsArray which will - * stream the document up to the current position of the parser. - */ - private static class InputStreamJsonElementBuffer extends InputStreamReader { - - /** - * Simple class implementing a circular array with some custom function used for finding start and end - * of json object. The reason this is needed is that the json parser reads more than it parses - * from the input stream (seems like about 8k). Using a ByteBuffer and manually moving data - * is an order of magnitude slower than this implementation. - */ - private class CircularCharBuffer { - - int readPointer = 0; - int writePointer = 0; - final char[] data; - final int size; - - public CircularCharBuffer(int chars) { - data = new char[chars]; - size = chars; - } - - /** - * This is for throwing away [ and spaces in front of a json object, and find the position of {. - * Not for parsing much text. - * - * @return position for { - */ - public int findNextObjectStart() { - int readerPos = 0; - while (get(readerPos) != '{') { - readerPos++; - assert(readerPos<=size); - } - return readerPos; - } - - /** - * This is for throwing away comma and or ], and for finding the position of the last }. - * @param fromPos where to start searching - * @return position for } - */ - public int findLastObjectEnd(int fromPos) { - while (get(fromPos-1) != '}') { - fromPos--; - assert(fromPos >=0); - } - return fromPos; - } - - public void put(char dataByte) { - data[writePointer] = dataByte; - writePointer++; - if (writePointer >= size) writePointer = 0; - assert(writePointer != readPointer); - } - - public char get(int pos) { - int readPos = readPointer + pos; - if (readPos >= size) readPos -= size; - assert(readPos != writePointer); - return data[readPos]; - } - - public void advance(int end) { - readPointer += end; - if (readPointer >= size) readPointer -= size; - } - } - - private final CircularCharBuffer circular = new CircularCharBuffer(maxDocumentSizeChars); - private int processedChars = 0; - - public InputStreamJsonElementBuffer(InputStream inputStream) { - super(inputStream, StandardCharsets.UTF_8); - } - - /** - * Removes comma, start/end array tag (last element), spaces etc that might be surrounding a json element. - * Then sends the element to the outputstream. - * @param parserPosition how far the parser has come. Please note that the parser might have processed - * more data from the input source as it is reading chunks of data. - * @throws IOException on errors - */ - public CharSequence getJsonAsArray(long parserPosition) throws IOException { - final int charSize = (int)parserPosition - processedChars; - final int endPosOfJson = circular.findLastObjectEnd(charSize); - final int startPosOfJson = circular.findNextObjectStart(); - processedChars += charSize; - // This can be optimized since we rarely wrap the circular buffer. - StringBuilder dataBuffer = new StringBuilder(endPosOfJson - startPosOfJson); - for (int x = startPosOfJson; x < endPosOfJson; x++) { - dataBuffer.append(circular.get(x)); - } - circular.advance(charSize); - return dataBuffer.toString(); - } - - @Override - public int read(char[] b, int off, int len) throws IOException { - int length = 0; - int value = 0; - while (length < len && value != -1) { - value = read(); - if (value == -1) { - return length == 0 ? -1 : length; - } - b[off + length] = (char) value; - length++; - } - return length; - } - - @Override - public int read() throws IOException { - int value = super.read(); - if (value >= 0) circular.put((char)value); - return value; - } - } - - /** - * Parse one document from the stream and return doc id. - * - * @param jParser parser with stream. - * @return doc id of document or null if no more docs. - * @throws IOException on problems - */ - private static String parseOneDocument(JsonParser jParser) throws IOException { - int objectLevel = 0; - String documentId = null; - boolean foundObject = false; - boolean valueIsDocumentId = false; - while (jParser.nextToken() != null) { - String tokenAsText = jParser.getText(); - if (valueIsDocumentId) { - if (documentId != null) { - throw new RuntimeException("Several document ids"); - } - documentId = tokenAsText; - valueIsDocumentId = false; - } - switch(jParser.getCurrentToken()) { - case START_OBJECT: - foundObject = true; - objectLevel++; - break; - case END_OBJECT: - objectLevel--; - if (objectLevel == 0) { - return documentId; - } - break; - case FIELD_NAME: - if (objectLevel == 1 && - (tokenAsText.equals("put") - || tokenAsText.endsWith("id") - || tokenAsText.endsWith("update") - || tokenAsText.equals("remove"))) { - valueIsDocumentId = true; - } - break; - default: // No operation on all other tags. - } - } - if (!foundObject) - throw new EOFException("No more documents"); - return null; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/OperationStatus.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/OperationStatus.java deleted file mode 100644 index ee6d96aa600..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/OperationStatus.java +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; - -import com.google.common.base.Splitter; -import java.util.Iterator; - -/** - * Serialization/deserialization class for the result of a single document operation against Vespa. - * - * @author Steinar Knutsen - */ -public final class OperationStatus { - - public static final String IS_CONDITION_NOT_MET = "IS-CONDITION-NOT-MET"; - public final String message; - public final String operationId; - public final ErrorCode errorCode; - public final String traceMessage; - public final boolean isConditionNotMet; - - private static final char EOL = '\n'; - private static final char SEPARATOR = ' '; - private static final Splitter spaceSep = Splitter.on(SEPARATOR); - - /** - * Constructor - * @param message some human readable information what happened - * @param operationId the doc ID for the operation - * @param errorCode if it is success, transitive, or fatal - * @param isConditionNotMet if error is due to condition not met - * @param traceMessage any tracemessage - */ - public OperationStatus(String message, String operationId, ErrorCode errorCode, boolean isConditionNotMet, String traceMessage) { - this.isConditionNotMet = isConditionNotMet; - this.message = message; - this.operationId = operationId; - this.errorCode = errorCode; - this.traceMessage = traceMessage; - } - - /** - * Parse a single rendered OperationStatus string. White space may be padded after - * and before the given status. - * - * @param singleLine - * a rendered OperationStatus - * @return an OperationStatus instance reflecting the input - * @throws IllegalArgumentException - * if there are illegal input data characters or the status - * element has no corresponding value in the ErrorCode - * enumeration - */ - public static OperationStatus parse(String singleLine) { - // Do note there is specifically left room for more arguments after - // the first in the serialized form. - Iterator<String> input = spaceSep.split(singleLine.trim()).iterator(); - String operationId; - ErrorCode errorCode; - String message; - String traceMessage = ""; - - operationId = Encoder.decode(input.next(), new StringBuilder()) - .toString(); - errorCode = ErrorCode.valueOf(Encoder.decode(input.next(), - new StringBuilder()).toString()); - - message = Encoder.decode(input.next(), new StringBuilder()).toString(); - // We are backwards compatible, meaning it is ok not to supply the last argument. - boolean isConditionNotMet = false; - if (message.startsWith(IS_CONDITION_NOT_MET)) { - message = message.replaceFirst(IS_CONDITION_NOT_MET, ""); - isConditionNotMet = true; - } - if (input.hasNext()) { - traceMessage = Encoder.decode(input.next(), new StringBuilder()).toString(); - } - return new OperationStatus(message, operationId, errorCode, isConditionNotMet, traceMessage); - } - - /** Returns a string representing the status. */ - public String render() { - StringBuilder s = new StringBuilder(); - Encoder.encode(operationId, s).append(SEPARATOR); - Encoder.encode(errorCode.toString(), s).append(SEPARATOR); - Encoder.encode(isConditionNotMet ? IS_CONDITION_NOT_MET + message : message, s).append(SEPARATOR); - Encoder.encode(traceMessage, s).append(EOL); - return s.toString(); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ServerResponseException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ServerResponseException.java deleted file mode 100644 index d5a09d2566c..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ServerResponseException.java +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; - -/** - * The request was not processed properly on the server. - * - * @author Einar M R Rosenvinge - */ -@SuppressWarnings("serial") -public class ServerResponseException extends Exception { - - private final int responseCode; - private final String responseString; - - public ServerResponseException(int responseCode, String responseString) { - super(responseString); - this.responseCode = responseCode; - this.responseString = responseString; - } - - public ServerResponseException(String responseString) { - super(responseString); - this.responseCode = 0; - this.responseString = responseString; - } - - public int getResponseCode() { - return responseCode; - } - - public String getResponseString() { - return responseString; - } - - @Override - public String toString() { - if (responseCode > 0) { - return responseCode + ": " + responseString; - } - return responseString; - } - -} - diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java deleted file mode 100644 index 101bd001fb8..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; - -import static java.lang.Math.abs; -import static java.lang.Math.max; -import static java.lang.Math.min; - -/** - * Class that has a method for finding next maxInFlight. - * - * @author dybis - */ -public class ThrottlePolicy { - - public static final double SMALL_DIFFERENCE_IN_SUCCESSES_RATIO = 0.15; - private static final double MINIMUM_DIFFERENCE = 0.05; - - /** - * Generate nex in-flight value for throttling. - * - * @param maxPerformanceChange This value limit the dynamics of the algorithm. - * @param numOk number of success in last phase - * @param previousNumOk number of success in previous (before last) phase. - * @param previousMaxInFlight number of max-in-flight in previous (before last) phase. - * @param maxInFlightNow number of max-in-flight in last phase. - * @param messagesQueued if any messages where queued. - * @return The new value to be used for max-in-flight (should be cropped externally to fit max/min values). - */ - public int calcNewMaxInFlight(double maxPerformanceChange, int numOk, int previousNumOk, int previousMaxInFlight, - int maxInFlightNow, boolean messagesQueued) { - - double difference = calculateRuleBasedDifference(maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow); - boolean previousRunWasBetter = numOk < previousNumOk; - boolean previousRunHadLessInFlight = previousMaxInFlight < maxInFlightNow; - - - int delta; - if (previousRunWasBetter == previousRunHadLessInFlight) { - delta = (int) (-1.1 * difference * maxInFlightNow); - } else { - delta = (int) (difference * maxInFlightNow); - } - - // We don't want the same size since we need different sizes for algorithm to adjust. - if (abs(delta) < 2) { - delta = -3; - } - // We never used all permits in previous run, no reason to grow more, we should rather reduce permits. - if (!messagesQueued && delta > 0) { - delta = -2; - } - return maxInFlightNow + delta; - } - - private static double calculateRuleBasedDifference(double maxPerformanceChange, double numOk, double previousNumOk, - double previousMaxInFlight, double maxInFlightNow) { - double difference = min( - maxPerformanceChange, - abs((numOk - previousNumOk) / safeDenominator(previousNumOk))); - - if (abs(previousMaxInFlight - maxInFlightNow) / safeDenominator(min(previousMaxInFlight, maxInFlightNow)) - < SMALL_DIFFERENCE_IN_SUCCESSES_RATIO) { - difference = min(difference, 0.2); - } - - // We want some changes so we can track performance as a result of different throttling. - return max(difference, MINIMUM_DIFFERENCE); - } - - private static double safeDenominator(double x) { - return x == 0.0 ? 1.0 : x; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java deleted file mode 100644 index 349959f496e..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/XmlFeedReader.java +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; - -import com.yahoo.vespa.http.client.FeedClient; -import org.xml.sax.Attributes; -import org.xml.sax.InputSource; -import org.xml.sax.ext.DefaultHandler2; - -import javax.xml.parsers.SAXParser; -import javax.xml.parsers.SAXParserFactory; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Reads an input stream of xml, sends these to session. - * - * @author dybis -*/ -public class XmlFeedReader { - - // Static class. - private XmlFeedReader() {} - - public static void read(InputStream inputStream, FeedClient feedClient, AtomicInteger numSent) throws Exception { - SAXParserFactory parserFactory = SAXParserFactory.newInstance(); - // XXE prevention: - parserFactory.setFeature("http://xml.org/sax/features/external-general-entities", false); - parserFactory.setValidating(false); - parserFactory.setNamespaceAware(false); - SAXParser parser = parserFactory.newSAXParser(); - SAXClientFeeder saxClientFeeder = new SAXClientFeeder(feedClient, numSent); - - InputSource inputSource = new InputSource(); - inputSource.setEncoding(StandardCharsets.UTF_8.displayName()); - inputSource.setByteStream(inputStream); - // This is to send events about CDATA to the saxClientFeeder - // (https://docs.oracle.com/javase/tutorial/jaxp/sax/events.html) - parser.setProperty("http://xml.org/sax/properties/lexical-handler", saxClientFeeder); - parser.parse(inputSource, saxClientFeeder); - } -} - -/** - * Streams XML and sends each document operation to feeder. - */ -class SAXClientFeeder extends DefaultHandler2 { - - public static final String CDATA_START = "<![CDATA["; - public static final String CDATA_STOP = "]]>"; - private final FeedClient feedClient; - int vespaIndent = 0; - int documentIndent = 0; - String documentId = null; - StringBuilder content = new StringBuilder(); - final AtomicInteger numSent; - boolean isCData = false; - - public SAXClientFeeder(FeedClient feedClient, AtomicInteger numSent) { - this.feedClient = feedClient; - this.numSent = numSent; - } - - @Override - public void startCDATA() { - content.append(CDATA_START); - isCData = true; - } - - @Override - public void endCDATA() { - content.append(CDATA_STOP); - isCData = false; - } - - @Override - public void comment(char[] ch, int start, int length) { } - - @SuppressWarnings("fallthrough") - @Override - public void startElement(String uri, String localName, String qName, Attributes attributes) { - switch(qName){ - case "vespafeed": - vespaIndent++; - if (vespaIndent == 1 && documentIndent == 0) { - // If this is the first vespafeed tag, it should not be added to content of the first item. - return; - } - case "update": - case "remove": - case "document" : - documentIndent++; - documentId = attributes.getValue("documentid"); - content = new StringBuilder(); - } - content.append("<" + qName); - if (attributes != null) { - for (int i = 0; i < attributes.getLength (); i++) { - content.append(" ") - .append(attributes.getQName(i)) - .append("=\""); - String attributesValue = attributes.getValue(i); - characters(attributesValue.toCharArray(), 0, attributesValue.length()); - content.append("\""); - } - } - content.append(">"); - } - - @Override - public void endElement(String uri, String localName, String qName) { - content.append("</") - .append(qName) - .append(">"); - switch(qName){ - case "vespafeed": - vespaIndent--; - return; - case "update": - case "remove": - case "document" : - documentIndent--; - if (documentIndent == 0) { - if (documentId == null || documentId.isEmpty()) { - throw new IllegalArgumentException("no docid"); - } - feedClient.stream(documentId, content); - numSent.incrementAndGet(); - } - } - } - - @Override - public void characters (char buf [], int offset, int len) { - if (isCData) { - content.append(buf, offset, len); - return; - } - - // This is on the critical loop for performance, otherwise a library would have been used. - // We can do a few shortcuts as well as this data is already decoded by SAX parser. - for (int x = offset ; x < len + offset ; x++) { - switch (buf[x]) { - case '&' : content.append("&"); continue; - case '<' : content.append("<"); continue; - case '>' : content.append(">"); continue; - case '"' : content.append("""); continue; - case '\'' : content.append("'"); continue; - default: content.append(buf[x]); continue; - } - } - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java deleted file mode 100644 index b46f23923f5..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.api; - -import com.yahoo.vespa.http.client.FeedClient; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.ThrottlePolicy; -import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler; -import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; - -import java.nio.charset.CharsetEncoder; -import java.nio.charset.CodingErrorAction; -import java.nio.charset.StandardCharsets; -import java.time.Clock; -import java.time.Instant; -import java.util.Optional; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Logger; - -/** - * Implementation of FeedClient. It is a thin layer on top of multiClusterHandler and multiClusterResultAggregator. - * - * @author dybis - */ -public class FeedClientImpl implements FeedClient { - - private static final Logger log = Logger.getLogger(FeedClientImpl.class.getName()); - private static final AtomicBoolean warningPrinted = new AtomicBoolean(false); - - private final Clock clock; - private final OperationProcessor operationProcessor; - private final long closeTimeoutMs; - private final long sleepTimeMs = 500; - - public FeedClientImpl(SessionParams sessionParams, - ResultCallback resultCallback, - ScheduledThreadPoolExecutor timeoutExecutor, - Clock clock) { - this.clock = clock; - this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) * - (sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) + - sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS)); - this.operationProcessor = new OperationProcessor( - new IncompleteResultsThrottler(sessionParams.getThrottlerMinSize(), - sessionParams.getClientQueueSize(), - clock, - new ThrottlePolicy()), - resultCallback, - sessionParams, - timeoutExecutor, - clock); - if (warningPrinted.compareAndSet(false, true)) { - log.warning("The vespa-http-client is deprecated and will be removed on Vespa 8. " + - "See https://docs.vespa.ai/en/vespa8-release-notes.html"); - } - } - - @Override - public void stream(String documentId, String operationId, CharSequence documentData, Object context) { - CharsetEncoder charsetEncoder = StandardCharsets.UTF_8.newEncoder(); - charsetEncoder.onMalformedInput(CodingErrorAction.REPORT); - charsetEncoder.onUnmappableCharacter(CodingErrorAction.REPORT); - - Document document = new Document(documentId, operationId, documentData, context, clock.instant()); - operationProcessor.sendDocument(document); - } - - @Override - public void close() { - Instant lastOldestResultReceivedAt = Instant.now(); - Optional<String> oldestIncompleteId = operationProcessor.oldestIncompleteResultId(); - - while (oldestIncompleteId.isPresent() && waitForOperations(lastOldestResultReceivedAt, sleepTimeMs, closeTimeoutMs)) { - Optional<String> oldestIncompleteIdNow = operationProcessor.oldestIncompleteResultId(); - if ( ! oldestIncompleteId.equals(oldestIncompleteIdNow)) - lastOldestResultReceivedAt = Instant.now(); - oldestIncompleteId = oldestIncompleteIdNow; - } - operationProcessor.close(); - } - - @Override - public String getStatsAsJson() { - return operationProcessor.getStatsAsJson(); - } - - // On return value true, wait more. Public for testing. - public static boolean waitForOperations(Instant lastResultReceived, long sleepTimeMs, long closeTimeoutMs) { - if (lastResultReceived.plusMillis(closeTimeoutMs).isBefore(Instant.now())) { - return false; - } - try { - Thread.sleep(sleepTimeMs); - } catch (InterruptedException e) { - return false; - } - return true; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java deleted file mode 100644 index 27fc22b9675..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.api; - -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.time.Clock; - -/** - * Class for wiring up the Session API. It is the return value of stream() in the Session API. - * - * @author dybis -*/ -class MultiClusterSessionOutputStream extends ByteArrayOutputStream { - - private final CharSequence documentId; - private final OperationProcessor operationProcessor; - private final Object context; - private final Clock clock; - - public MultiClusterSessionOutputStream(CharSequence documentId, - OperationProcessor operationProcessor, - Object context, - Clock clock) { - this.documentId = documentId; - this.context = context; - this.operationProcessor = operationProcessor; - this.clock = clock; - } - - @Override - public void close() throws IOException { - Document document = new Document(documentId.toString(), toByteArray(), context, clock.instant()); - operationProcessor.sendDocument(document); - super.close(); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java deleted file mode 100644 index 05b66ac4a46..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.api; - -import com.yahoo.vespa.http.client.FeedClient; -import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.core.ThrottlePolicy; -import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler; -import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; - -import java.io.OutputStream; -import java.time.Clock; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -/** - * This class wires up the Session API using MultiClusterHandler and MultiClusterSessionOutputStream. - * - * @deprecated - */ -@Deprecated // TODO: Remove on Vespa 8 -public class SessionImpl implements com.yahoo.vespa.http.client.Session { - - private final OperationProcessor operationProcessor; - private final BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>(); - private final Clock clock; - - public SessionImpl(SessionParams sessionParams, ScheduledThreadPoolExecutor timeoutExecutor, Clock clock) { - this.clock = clock; - this.operationProcessor = new OperationProcessor( - new IncompleteResultsThrottler( - sessionParams.getThrottlerMinSize(), - sessionParams.getClientQueueSize(), - clock, - new ThrottlePolicy()), - new FeedClient.ResultCallback() { - @Override - public void onCompletion(String docId, Result documentResult) { - resultQueue.offer(documentResult); - } - }, - sessionParams, - timeoutExecutor, - clock); - } - - @Override - public OutputStream stream(CharSequence documentId) { - return new MultiClusterSessionOutputStream(documentId, operationProcessor, null, clock); - } - - @Override - public BlockingQueue<Result> results() { - return resultQueue; - } - - @Override - public void close() { - operationProcessor.close(); - } - - @Override - public String getStatsAsJson() { - return operationProcessor.getStatsAsJson(); - } - - // For testing only (legacy tests). - public int getIncompleteResultQueueSize() { - return operationProcessor.getIncompleteResultQueueSize(); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java deleted file mode 100644 index 6b1078fa393..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java +++ /dev/null @@ -1,507 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import ai.vespa.util.http.hc4.VespaHttpClientBuilder; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.yahoo.security.SslContextBuilder; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.Encoder; -import com.yahoo.vespa.http.client.core.Headers; -import com.yahoo.vespa.http.client.core.ServerResponseException; -import com.yahoo.vespa.http.client.core.Vtag; -import org.apache.http.Header; -import org.apache.http.HttpHost; -import org.apache.http.HttpRequest; -import org.apache.http.HttpResponse; -import org.apache.http.StatusLine; -import org.apache.http.auth.AUTH; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.ChallengeState; -import org.apache.http.auth.Credentials; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.protocol.HttpClientContext; -import org.apache.http.entity.InputStreamEntity; -import org.apache.http.impl.auth.BasicScheme; -import org.apache.http.impl.client.BasicAuthCache; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.message.BasicHeader; -import org.apache.http.protocol.BasicHttpContext; -import org.apache.http.protocol.HttpContext; -import org.apache.http.util.EntityUtils; - -import javax.net.ssl.SSLContext; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.time.Clock; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.util.zip.GZIPOutputStream; - -/** - * @author Einar M R Rosenvinge - */ -class ApacheGatewayConnection implements GatewayConnection { - - private static final Logger log = Logger.getLogger(ApacheGatewayConnection.class.getName()); - private static final ObjectMapper mapper = new ObjectMapper(); - private static final String PATH = "/reserved-for-internal-use/feedapi?"; - private static final byte[] START_OF_FEED_XML = "<vespafeed>\n".getBytes(StandardCharsets.UTF_8); - private static final byte[] END_OF_FEED_XML = "\n</vespafeed>\n".getBytes(StandardCharsets.UTF_8); - private static final byte[] START_OF_FEED_JSON = "[".getBytes(StandardCharsets.UTF_8); - private static final byte[] END_OF_FEED_JSON = "]".getBytes(StandardCharsets.UTF_8); - - private final List<Integer> supportedVersions = new ArrayList<>(); - private final byte[] startOfFeed; - private final byte[] endOfFeed; - private final Endpoint endpoint; - private final FeedParams feedParams; - private final String clusterSpecificRoute; - private final ConnectionParams connectionParams; - private CloseableHttpClient httpClient; - private Instant connectionTime = null; - private Instant lastPollTime = null; - private String sessionId; - private final String clientId; - private int negotiatedVersion = -1; - private final HttpClientFactory httpClientFactory; - private final String shardingKey = UUID.randomUUID().toString().substring(0, 5); - private final Clock clock; - - ApacheGatewayConnection(Endpoint endpoint, - FeedParams feedParams, - String clusterSpecificRoute, - ConnectionParams connectionParams, - HttpClientFactory httpClientFactory, - String clientId, - Clock clock) { - supportedVersions.add(3); - this.endpoint = endpoint; - this.feedParams = feedParams; - this.clusterSpecificRoute = clusterSpecificRoute; - this.httpClientFactory = httpClientFactory; - this.connectionParams = connectionParams; - this.httpClient = null; - this.clientId = clientId; - this.clock = clock; - - if (feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8) { - startOfFeed = START_OF_FEED_JSON; - endOfFeed = END_OF_FEED_JSON; - } else { - startOfFeed = START_OF_FEED_XML; - endOfFeed = END_OF_FEED_XML; - } - } - - @Override - public InputStream write(List<Document> docs) throws ServerResponseException, IOException { - return write(docs, false, connectionParams.getUseCompression()); - } - - @Override - public InputStream poll() throws ServerResponseException, IOException { - lastPollTime = clock.instant(); - return write(Collections.<Document>emptyList(), false, false); - } - - @Override - public Instant lastPollTime() { return lastPollTime; } - - @Override - public InputStream drain() throws ServerResponseException, IOException { - return write(Collections.<Document>emptyList(), true, false); - } - - @Override - public boolean connect() { - log.fine(() -> "Attempting to connect to " + endpoint); - if (httpClient != null) - log.log(Level.WARNING, "Previous httpClient still exists."); - httpClient = httpClientFactory.createClient(); - connectionTime = clock.instant(); - return httpClient != null; - } - - @Override - public Instant connectionTime() { return connectionTime; } - - // Protected for easier testing only. - protected static InputStreamEntity zipAndCreateEntity(final InputStream inputStream) throws IOException { - byte[] buffer = new byte[4096]; - GZIPOutputStream gzos = null; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - gzos = new GZIPOutputStream(baos); - while (inputStream.available() > 0) { - int length = inputStream.read(buffer); - gzos.write(buffer, 0,length); - } - } finally { - if (gzos != null) { - gzos.close(); - } - } - byte[] fooGzippedBytes = baos.toByteArray(); - return new InputStreamEntity(new ByteArrayInputStream(fooGzippedBytes), -1); - } - - private InputStream write(List<Document> docs, boolean drain, boolean useCompression) - throws ServerResponseException, IOException { - HttpPost httpPost = createPost(drain, useCompression, false); - - ByteBuffer[] buffers = getDataWithStartAndEndOfFeed(docs, negotiatedVersion); - InputStream inputStream = new ByteBufferInputStream(buffers); - InputStreamEntity reqEntity = useCompression ? zipAndCreateEntity(inputStream) - : new InputStreamEntity(inputStream, -1); - reqEntity.setChunked(true); - httpPost.setEntity(reqEntity); - return executePost(httpPost); - } - - private ByteBuffer[] getDataWithStartAndEndOfFeed(List<Document> docs, int version) { - List<ByteBuffer> data = new ArrayList<>(); - if (version == 3) { - for (Document doc : docs) { - int operationSize = doc.size() + startOfFeed.length + endOfFeed.length; - StringBuilder envelope = new StringBuilder(); - Encoder.encode(doc.getOperationId(), envelope); - envelope.append(' '); - envelope.append(Integer.toHexString(operationSize)); - envelope.append('\n'); - data.add(StandardCharsets.US_ASCII.encode(envelope.toString())); - data.add(ByteBuffer.wrap(startOfFeed)); - data.add(doc.getData()); - data.add(ByteBuffer.wrap(endOfFeed)); - } - } else { - throw new IllegalArgumentException("Protocol version " + version + " unsupported by client."); - } - return data.toArray(new ByteBuffer[data.size()]); - } - - private HttpPost createPost(boolean drain, boolean useCompression, boolean isHandshake) { - HttpPost httpPost = new HttpPost(createUri()); - - for (int v : supportedVersions) { - httpPost.addHeader(Headers.VERSION, "" + v); - } - if (sessionId != null) { - httpPost.setHeader(Headers.SESSION_ID, sessionId); - } - if (clientId != null) { - httpPost.setHeader(Headers.CLIENT_ID, clientId); - } - httpPost.setHeader(Headers.SHARDING_KEY, shardingKey); - httpPost.setHeader(Headers.DRAIN, drain ? "true" : "false"); - if (clusterSpecificRoute != null) { - httpPost.setHeader(Headers.ROUTE, feedParams.getRoute()); - } else { - if (feedParams.getRoute() != null) { - httpPost.setHeader(Headers.ROUTE, feedParams.getRoute()); - } - } - if (!isHandshake) { - if (feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8) { - httpPost.setHeader(Headers.DATA_FORMAT, FeedParams.DataFormat.JSON_UTF8.name()); - } else { - httpPost.setHeader(Headers.DATA_FORMAT, FeedParams.DataFormat.XML_UTF8.name()); - } - if (feedParams.getPriority() != null) { - httpPost.setHeader(Headers.PRIORITY, feedParams.getPriority()); - } - if (connectionParams.getTraceLevel() != 0) { - httpPost.setHeader(Headers.TRACE_LEVEL, String.valueOf(connectionParams.getTraceLevel())); - } - if (negotiatedVersion == 3 && feedParams.getDenyIfBusyV3()) { - httpPost.setHeader(Headers.DENY_IF_BUSY, "true"); - } - } - if (feedParams.getSilentUpgrade()) { - httpPost.setHeader(Headers.SILENTUPGRADE, "true"); - } - httpPost.setHeader(Headers.TIMEOUT, "" + feedParams.getServerTimeout(TimeUnit.SECONDS)); - - for (Map.Entry<String, String> extraHeader : connectionParams.getHeaders()) { - httpPost.addHeader(extraHeader.getKey(), extraHeader.getValue()); - } - connectionParams.getDynamicHeaders().forEach((headerName, provider) -> { - String headerValue = Objects.requireNonNull( - provider.getHeaderValue(), - provider.getClass().getName() + ".getHeader() returned null as header value!"); - httpPost.addHeader(headerName, headerValue); - }); - - if (useCompression) { // This causes the apache client to gzip the request content. Weird, huh? - httpPost.setHeader("Content-Encoding", "gzip"); - } - return httpPost; - } - - private InputStream executePost(HttpPost httpPost) throws ServerResponseException, IOException { - if (httpClient == null) - throw new IOException("Trying to executePost while not having a connection/http client"); - String proxyAuthzHeader = getCustomProxyAuthorizationHeader(connectionParams).orElse(null); - HttpResponse response; - if (connectionParams.getProxyHost() != null && proxyAuthzHeader != null) { - HttpContext context = createContextForcingPreemptiveProxyAuth(proxyAuthzHeader); - response = httpClient.execute(httpPost, context); - } else { - response = httpClient.execute(httpPost); - } - try { - verifyServerResponseCode(response); - verifyServerVersion(response.getFirstHeader(Headers.VERSION)); - verifySessionHeader(response.getFirstHeader(Headers.SESSION_ID)); - } catch (ServerResponseException e) { - // Ensure response is consumed to allow connection reuse later on - EntityUtils.consumeQuietly(response.getEntity()); - throw e; - } - // Consume response now to allow connection to be reused immediately - byte[] responseData = EntityUtils.toByteArray(response.getEntity()); - return responseData == null ? null : new ByteArrayInputStream(responseData); - } - - private static Optional<String> getCustomProxyAuthorizationHeader(ConnectionParams params) { - return params.getHeaders().stream() - .filter(h -> h.getKey().equals(AUTH.PROXY_AUTH_RESP)) - .findAny() - .map(Map.Entry::getValue); - } - - private HttpContext createContextForcingPreemptiveProxyAuth(String proxyAuthzHeader) { - BasicAuthCache authCache = new BasicAuthCache(); - HttpHost proxy = new HttpHost(connectionParams.getProxyHost(), connectionParams.getProxyPort()); - authCache.put(proxy, new CustomAuthScheme(proxyAuthzHeader)); - HttpContext context = new BasicHttpContext(); - context.setAttribute(HttpClientContext.AUTH_CACHE, authCache); - BasicCredentialsProvider prov = new BasicCredentialsProvider(); - prov.setCredentials(new AuthScope(proxy), new UsernamePasswordCredentials("", "")); - context.setAttribute(HttpClientContext.CREDS_PROVIDER, prov); - return context; - } - private static class CustomAuthScheme extends BasicScheme { - final String proxyAuthzHeader; - @SuppressWarnings("deprecation") - CustomAuthScheme(String proxyAuthzHeader) { - super(ChallengeState.PROXY); - this.proxyAuthzHeader = proxyAuthzHeader; - } - @Override - public Header authenticate(Credentials credentials, HttpRequest request, HttpContext context) { - return new BasicHeader(AUTH.PROXY_AUTH_RESP, proxyAuthzHeader); - } - } - - - private void verifyServerResponseCode(HttpResponse response) throws ServerResponseException { - StatusLine statusLine = response.getStatusLine(); - int statusCode = statusLine.getStatusCode(); - - // We use code 261-299 to report errors related to internal transitive errors that the tenants should not care - // about to avoid masking more serious errors. - if (statusCode > 199 && statusCode < 260) return; - if (statusCode == 299) throw new ServerResponseException(429, "Too many requests."); - throw new ServerResponseException(statusCode, - tryGetDetailedErrorMessage(response).orElseGet(statusLine::getReasonPhrase)); - } - - private static Optional<String> tryGetDetailedErrorMessage(HttpResponse response) { - Header contentType = response.getEntity().getContentType(); - if (contentType == null || !contentType.getValue().equalsIgnoreCase("application/json")) return Optional.empty(); - try (InputStream in = response.getEntity().getContent()) { - JsonNode jsonNode = mapper.readTree(in); - JsonNode message = jsonNode.get("message"); - if (message == null || message.textValue() == null) return Optional.empty(); - return Optional.of(response.getStatusLine().getReasonPhrase() + " - " + message.textValue()); - } catch (IOException e) { - return Optional.empty(); - } - } - - private void verifySessionHeader(Header serverHeader) throws ServerResponseException { - if (serverHeader == null) { - throw new ServerResponseException("Got no session ID from server."); - } - final String serverHeaderVal = serverHeader.getValue().trim(); - if (negotiatedVersion == 3) { - if (clientId == null || !clientId.equals(serverHeaderVal)) { - String message = "Running using v3. However, server responds with different session " + - "than client has set; " + serverHeaderVal + " vs client code " + clientId; - log.severe(message); - throw new ServerResponseException(message); - } - return; - } - if (sessionId == null) { //this must be the first request - log.finer("Got session ID from server: " + serverHeaderVal); - this.sessionId = serverHeaderVal; - } else { - if (!sessionId.equals(serverHeaderVal)) { - log.info("Request has been routed to a server which does not recognize the client session." + - " Most likely cause is upgrading of cluster, transitive error."); - throw new ServerResponseException("Session ID received from server ('" + serverHeaderVal + - "') does not match cached session ID ('" + sessionId + "')"); - } - } - } - - private void verifyServerVersion(Header serverHeader) throws ServerResponseException { - if (serverHeader == null) { - throw new ServerResponseException("Got bad protocol version from server."); - } - int serverVersion; - try { - serverVersion = Integer.parseInt(serverHeader.getValue()); - } catch (NumberFormatException nfe) { - throw new ServerResponseException("Got bad protocol version from server: " + nfe.getMessage()); - } - if (!supportedVersions.contains(serverVersion)) { - throw new ServerResponseException("Unsupported version: " + serverVersion - + ". Supported versions: " + supportedVersions); - } - if (negotiatedVersion == -1) { - if (log.isLoggable(Level.FINE)) { - log.log(Level.FINE, "Server decided upon protocol version " + serverVersion + "."); - } - } - this.negotiatedVersion = serverVersion; - } - - private String createUri() { - StringBuilder u = new StringBuilder(); - u.append(endpoint.isUseSsl() ? "https://" : "http://"); - u.append(endpoint.getHostname()); - u.append(":").append(endpoint.getPort()); - u.append(PATH); - u.append(feedParams.toUriParameters()); - return u.toString(); - } - - @Override - public Endpoint getEndpoint() { - return endpoint; - } - - @Override - public void handshake() throws ServerResponseException, IOException { - boolean useCompression = false; - boolean drain = false; - boolean handshake = true; - HttpPost httpPost = createPost(drain, useCompression, handshake); - - String oldSessionID = sessionId; - sessionId = null; - try (InputStream stream = executePost(httpPost)) { - if (oldSessionID != null && !oldSessionID.equals(sessionId)) { - throw new ServerResponseException( - "Session ID changed after new handshake, some documents might not be acked to correct thread. " - + getEndpoint() + " old " + oldSessionID + " new " + sessionId); - } - if (stream == null) { - log.fine("Stream is null."); - } - log.fine("Got session ID " + sessionId); - } - } - - @Override - public void close() { - try { - if (httpClient != null) - httpClient.close(); - } - catch (IOException e) { - log.log(Level.WARNING, "Failed closing HTTP client", e); - } - httpClient = null; - } - - /** - * On re-connect we want to recreate the connection, hence we need a factory. - */ - public static class HttpClientFactory { - - private final FeedParams feedParams; - final ConnectionParams connectionParams; - final boolean useSsl; - - public HttpClientFactory(FeedParams feedParams, ConnectionParams connectionParams, boolean useSsl) { - this.feedParams = feedParams; - this.connectionParams = connectionParams; - this.useSsl = useSsl; - } - - public CloseableHttpClient createClient() { - HttpClientBuilder clientBuilder; - if (connectionParams.useTlsConfigFromEnvironment()) { - clientBuilder = VespaHttpClientBuilder.create(); - } else { - clientBuilder = HttpClientBuilder.create(); - if (connectionParams.getSslContext() != null) { - setSslContext(clientBuilder, connectionParams.getSslContext()); - } else { - SslContextBuilder builder = new SslContextBuilder(); - if (connectionParams.getPrivateKey() != null && connectionParams.getCertificate() != null) { - builder.withKeyStore(connectionParams.getPrivateKey(), connectionParams.getCertificate()); - } - if (connectionParams.getCaCertificates() != null) { - builder.withTrustStore(connectionParams.getCaCertificates()); - } - setSslContext(clientBuilder, builder.build()); - } - if (connectionParams.getHostnameVerifier() != null) { - clientBuilder.setSSLHostnameVerifier(connectionParams.getHostnameVerifier()); - } - clientBuilder.setUserTokenHandler(context -> null); // https://stackoverflow.com/a/42112034/1615280 - } - clientBuilder.setMaxConnPerRoute(1); - clientBuilder.setMaxConnTotal(1); - clientBuilder.setUserAgent(String.format("vespa-http-client (%s)", Vtag.V_TAG_COMPONENT)); - clientBuilder.setDefaultHeaders(Collections.singletonList(new BasicHeader(Headers.CLIENT_VERSION, Vtag.V_TAG_COMPONENT))); - int millisTotalTimeout = (int) (feedParams.getClientTimeout(TimeUnit.MILLISECONDS) + feedParams.getServerTimeout(TimeUnit.MILLISECONDS)); - RequestConfig.Builder requestConfigBuilder = RequestConfig.custom() - .setSocketTimeout(millisTotalTimeout) - .setConnectTimeout(millisTotalTimeout); - if (connectionParams.getProxyHost() != null) { - requestConfigBuilder.setProxy(new HttpHost(connectionParams.getProxyHost(), connectionParams.getProxyPort())); - } - clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()); - - log.fine(() -> "Creating HttpClient:" + - " ConnectionTimeout " + connectionParams.getConnectionTimeToLive().getSeconds() + " seconds" + - " proxyhost (can be null) " + connectionParams.getProxyHost() + ":" + connectionParams.getProxyPort() - + (useSsl ? " using ssl " : " not using ssl") - ); - return clientBuilder.build(); - } - } - - // Note: Using deprecated setSslContext() to allow httpclient 4.4 on classpath (e.g unexpected Maven dependency resolution for test classpath) - @SuppressWarnings("deprecation") - private static void setSslContext(HttpClientBuilder builder, SSLContext sslContext) { - builder.setSslcontext(sslContext); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java deleted file mode 100644 index 0b02626d6e8..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.time.Clock; -import java.util.Objects; - -/** - * @author bratseth - */ -public class ApacheGatewayConnectionFactory implements GatewayConnectionFactory { - - private final Endpoint endpoint; - private final FeedParams feedParams; - private final String clusterSpecificRoute; - private final ConnectionParams connectionParams; - private final ApacheGatewayConnection.HttpClientFactory httpClientFactory; - private final String clientId; - private final Clock clock; - - public ApacheGatewayConnectionFactory(Endpoint endpoint, - FeedParams feedParams, - String clusterSpecificRoute, - ConnectionParams connectionParams, - ApacheGatewayConnection.HttpClientFactory httpClientFactory, - String clientId, - Clock clock) { - this.endpoint = validate(endpoint); - this.feedParams = feedParams; - this.clusterSpecificRoute = clusterSpecificRoute; - this.httpClientFactory = httpClientFactory; - this.connectionParams = connectionParams; - this.clientId = Objects.requireNonNull(clientId, "clientId cannot be null"); - this.clock = clock; - } - - private static Endpoint validate(Endpoint endpoint) { - try { - InetAddress.getByName(endpoint.getHostname()); - return endpoint; - } - catch (UnknownHostException e) { - throw new IllegalArgumentException("Unknown host: " + endpoint); - } - } - - @Override - public GatewayConnection newConnection() { - return new ApacheGatewayConnection(endpoint, - feedParams, - clusterSpecificRoute, - connectionParams, - httpClientFactory, - clientId, - clock); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ByteBufferInputStream.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ByteBufferInputStream.java deleted file mode 100644 index f88519c2615..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ByteBufferInputStream.java +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.Deque; - -/** - * @author Einar M R Rosenvinge - */ -class ByteBufferInputStream extends InputStream { - private final Deque<ByteBuffer> currentBuffers = new ArrayDeque<>(); - - ByteBufferInputStream(ByteBuffer[] buffers) { - for (int i = buffers.length - 1; i > -1; i--) { - currentBuffers.push(buffers[i]); - } - } - - @Override - public int read() throws IOException { - pop(); - if (currentBuffers.isEmpty()) { - return -1; - } - return currentBuffers.peek().get(); - } - - private void pop() { - if (currentBuffers.isEmpty()) { - return; - } - - while (!currentBuffers.isEmpty() && !currentBuffers.peek().hasRemaining()) { - //it's exhausted, get rid of it - currentBuffers.pop(); - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (b == null) { - throw new NullPointerException(); - } else if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return 0; - } - pop(); - if (currentBuffers.isEmpty()) { - return -1; - } - int toRead = Math.min(len, currentBuffers.peek().remaining()); - currentBuffers.peek().get(b, off, toRead); - return toRead; - } - - @Override - public long skip(long n) throws IOException { - throw new IOException("skip() not supported."); - } - - @Override - public int available() throws IOException { - if (currentBuffers.isEmpty()) { - return 0; - } - - int size = 0; - for (ByteBuffer b : currentBuffers) { - size += b.remaining(); - } - return size; - } -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java deleted file mode 100644 index ef5b7afd16a..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.Exceptions; -import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; - -import java.io.IOException; -import java.io.StringWriter; -import java.time.Clock; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * @author Einar M R Rosenvinge - */ -public class ClusterConnection implements AutoCloseable { - - private static ObjectMapper jsonMapper = createMapper(); - - private final List<IOThread> ioThreads = new ArrayList<>(); - private final int clusterId; - private final ThreadGroup ioThreadGroup; - - /** The shared queue of document operations the io threads will take from */ - private final DocumentQueue documentQueue; - - /** The single endpoint this sends to, or null if it will send to multiple endpoints */ - private final Endpoint singleEndpoint; - - public ClusterConnection(OperationProcessor operationProcessor, - FeedParams feedParams, - ConnectionParams connectionParams, - Cluster cluster, - int clusterId, - int clientQueueSizePerCluster, - ScheduledThreadPoolExecutor timeoutExecutor, - Clock clock) { - if (cluster.getEndpoints().isEmpty()) - throw new IllegalArgumentException("At least a single endpoint is required in " + cluster); - - this.clusterId = clusterId; - int totalNumberOfEndpointsInThisCluster = cluster.getEndpoints().size() * connectionParams.getNumPersistentConnectionsPerEndpoint(); - if (totalNumberOfEndpointsInThisCluster == 0) - throw new IllegalArgumentException("At least 1 persistent connection per endpoint is required in " + cluster); - int maxInFlightPerSession = Math.max(1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster); - - documentQueue = new DocumentQueue(clientQueueSizePerCluster, clock); - ioThreadGroup = operationProcessor.getIoThreadGroup(); - singleEndpoint = cluster.getEndpoints().size() == 1 ? cluster.getEndpoints().get(0) : null; - Double idlePollFrequency = feedParams.getIdlePollFrequency(); - if (idlePollFrequency == null) - idlePollFrequency = 10.0; - for (Endpoint endpoint : cluster.getEndpoints()) { - EndpointResultQueue endpointResultQueue = new EndpointResultQueue(operationProcessor, - endpoint, - clusterId, - timeoutExecutor, - feedParams.getServerTimeout(TimeUnit.MILLISECONDS) + feedParams.getClientTimeout(TimeUnit.MILLISECONDS)); - for (int i = 0; i < connectionParams.getNumPersistentConnectionsPerEndpoint(); i++) { - GatewayConnectionFactory connectionFactory; - if (connectionParams.isDryRun()) { - connectionFactory = new DryRunGatewayConnectionFactory(endpoint, clock); - } else { - connectionFactory = new ApacheGatewayConnectionFactory(endpoint, - feedParams, - cluster.getRoute(), - connectionParams, - new ApacheGatewayConnection.HttpClientFactory(feedParams, connectionParams, endpoint.isUseSsl()), - operationProcessor.getClientId(), - clock - ); - } - IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(), - endpoint, - endpointResultQueue, - connectionFactory, - clusterId, - feedParams.getMaxChunkSizeBytes(), - maxInFlightPerSession, - Duration.ofMillis(feedParams.getLocalQueueTimeOut()), - documentQueue, - feedParams.getMaxSleepTimeMs(), - connectionParams.getConnectionTimeToLive(), - connectionParams.runThreads(), - idlePollFrequency, - clock); - ioThreads.add(ioThread); - } - } - } - - private static ObjectMapper createMapper() { - ObjectMapper mapper = new ObjectMapper(); - mapper.registerModule(new Jdk8Module()); - mapper.registerModule(new JavaTimeModule()); - return mapper; - } - - public int getClusterId() { - return clusterId; - } - - public void post(Document document) throws EndpointIOException { - try { - documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup); - } catch (Throwable t) { // InterruptedException if shutting down, IllegalStateException if already shut down - throw new EndpointIOException(singleEndpoint, "While sending", t); - } - } - - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - @Override - public void close() { - List<Exception> exceptions = new ArrayList<>(); - for (IOThread ioThread : ioThreads) { - try { - ioThread.close(); - } catch (Exception e) { - exceptions.add(e); - } - } - if (exceptions.isEmpty()) { - return; - } - if (exceptions.size() == 1) { - if (exceptions.get(0) instanceof RuntimeException) { - throw (RuntimeException) exceptions.get(0); - } else { - throw new RuntimeException(exceptions.get(0)); - } - } - StringBuilder b = new StringBuilder(); - b.append("Exception thrown while closing one or more endpoints: "); - for (int i = 0; i < exceptions.size(); i++) { - Exception e = exceptions.get(i); - b.append(Exceptions.toMessageString(e)); - if (i != (exceptions.size() - 1)) { - b.append(", "); - } - } - throw new RuntimeException(b.toString(), exceptions.get(0)); - } - - public String getStatsAsJSon() throws IOException { - StringWriter stringWriter = new StringWriter(); - JsonGenerator jsonGenerator = jsonMapper.createGenerator(stringWriter); - jsonGenerator.writeStartObject(); - jsonGenerator.writeArrayFieldStart("session"); - for (IOThread ioThread : ioThreads) { - jsonGenerator.writeStartObject(); - jsonGenerator.writeObjectFieldStart("endpoint"); - jsonGenerator.writeStringField("host", ioThread.getEndpoint().getHostname()); - jsonGenerator.writeNumberField("port", ioThread.getEndpoint().getPort()); - jsonGenerator.writeEndObject(); - jsonGenerator.writeFieldName("stats"); - IOThread.ConnectionStats connectionStats = ioThread.getConnectionStats(); - jsonMapper.writeValue(jsonGenerator, connectionStats); - jsonGenerator.writeEndObject(); - } - jsonGenerator.writeEndArray(); - jsonGenerator.writeEndObject(); - jsonGenerator.close(); - return stringWriter.toString(); - } - - public List<IOThread> ioThreads() { - return Collections.unmodifiableList(ioThreads); - } - - @Override - public boolean equals(Object o) { - return (this == o) || (o instanceof ClusterConnection && clusterId == ((ClusterConnection) o).clusterId); - } - - @Override - public int hashCode() { - return clusterId; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java deleted file mode 100644 index 8164534ca37..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import com.yahoo.vespa.http.client.core.Document; - -import java.time.Clock; -import java.time.Duration; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -/** - * Shared document queue that gives clients operations on documents which do not have operations already in flight. - * This is multithread safe. - * - * @author dybis - */ -class DocumentQueue { - - private final Deque<Document> queue; - private final int maxSize; - private boolean closed = false; - private final Clock clock; - - DocumentQueue(int maxSize, Clock clock) { - this.maxSize = maxSize; - this.queue = new ArrayDeque<>(maxSize); - this.clock = clock; - } - - List<Document> removeAllDocuments() { - synchronized (queue) { - List<Document> allDocs = new ArrayList<>(); - while (!queue.isEmpty()) { - allDocs.add(queue.poll()); - } - queue.notifyAll(); - return allDocs; - } - } - - void put(Document document, boolean calledFromIoThreadGroup) throws InterruptedException { - document.setQueueInsertTime(clock.instant()); - synchronized (queue) { - while (!closed && (queue.size() >= maxSize) && !calledFromIoThreadGroup) { - queue.wait(); - } - if (closed) { - throw new IllegalStateException("Cannot add elements to closed queue."); - } - queue.add(document); - queue.notifyAll(); - } - } - - Document poll(long timeout, TimeUnit unit) throws InterruptedException { - synchronized (queue) { - long remainingToWait = unit.toMillis(timeout); - while (queue.isEmpty()) { - long startTime = clock.millis(); - queue.wait(remainingToWait); - remainingToWait -= (clock.millis() - startTime); - if (remainingToWait <= 0) { - break; - } - } - Document document = queue.poll(); - queue.notifyAll(); - return document; - } - } - - Document poll() { - synchronized (queue) { - Document document = queue.poll(); - queue.notifyAll(); - return document; - } - } - - boolean isEmpty() { - synchronized (queue) { - return queue.isEmpty(); - } - } - - int size() { - synchronized (queue) { - return queue.size(); - } - } - - void clear() { - synchronized (queue) { - queue.clear(); - queue.notifyAll(); - } - } - - boolean close() { - boolean previousState; - synchronized (queue) { - previousState = closed; - closed = true; - queue.notifyAll(); - } - return previousState; - } - - Optional<Document> pollDocumentIfTimedoutInQueue(Duration localQueueTimeOut) { - synchronized (queue) { - if (queue.isEmpty()) return Optional.empty(); - - Document document = queue.peek(); - if (document.getQueueInsertTime().plus(localQueueTimeOut).isBefore(clock.instant())) - return Optional.ofNullable(queue.poll()); - else - return Optional.empty(); - } - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java deleted file mode 100644 index a6f6992b238..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.ErrorCode; -import com.yahoo.vespa.http.client.core.OperationStatus; -import com.yahoo.vespa.http.client.core.ServerResponseException; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.time.Clock; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * Dummy implementation. - * - * @author dybis - */ -public class DryRunGatewayConnection implements GatewayConnection { - - private final Endpoint endpoint; - private final Clock clock; - private Instant connectionTime = null; - private Instant lastPollTime = null; - - /** Set to true to hold off responding with a result to any incoming operations until this is set false */ - private boolean hold = false; - private final List<Document> held = new ArrayList<>(); - - /** If this is set, handshake operations will throw this exception */ - private ServerResponseException throwThisOnHandshake = null; - - /** If this is set, all write operations will throw this exception */ - private IOException throwThisOnWrite = null; - - public DryRunGatewayConnection(Endpoint endpoint, Clock clock) { - this.endpoint = endpoint; - this.clock = clock; - } - - @Override - public synchronized InputStream write(List<Document> docs) throws IOException { - if (throwThisOnWrite != null) - throw throwThisOnWrite; - - if (hold) { - held.addAll(docs); - return new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)); - } - else { - StringBuilder result = new StringBuilder(); - for (Document doc : held) - result.append(okResponse(doc).render()); - held.clear(); - for (Document doc : docs) - result.append(okResponse(doc).render()); - return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8)); - } - } - - @Override - public synchronized InputStream poll() throws IOException { - lastPollTime = clock.instant(); - return write(new ArrayList<>()); - } - - @Override - public synchronized Instant lastPollTime() { return lastPollTime; } - - @Override - public synchronized InputStream drain() throws IOException { - return write(new ArrayList<>()); - } - - @Override - public synchronized boolean connect() { - connectionTime = clock.instant(); - return true; - } - - @Override - public synchronized Instant connectionTime() { return connectionTime; } - - @Override - public Endpoint getEndpoint() { - return endpoint; - } - - @Override - public synchronized void handshake() throws ServerResponseException { - if (throwThisOnHandshake != null) - throw throwThisOnHandshake; - } - - @Override - public synchronized void close() { } - - public synchronized void hold(boolean hold) { - this.hold = hold; - } - - /** Returns the document currently held in this */ - public synchronized List<Document> held() { return Collections.unmodifiableList(held); } - - public synchronized void throwOnWrite(IOException throwThisOnWrite) { - this.throwThisOnWrite = throwThisOnWrite; - } - - public synchronized void throwOnHandshake(ServerResponseException throwThisOnHandshake) { - this.throwThisOnHandshake = throwThisOnHandshake; - } - - private OperationStatus okResponse(Document document) { - return new OperationStatus("ok", document.getOperationId(), ErrorCode.OK, false, ""); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java deleted file mode 100644 index 01bec563889..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import com.yahoo.vespa.http.client.config.Endpoint; - -import java.time.Clock; - -/** - * @author bratseth - */ -public class DryRunGatewayConnectionFactory implements GatewayConnectionFactory { - - private final Endpoint endpoint; - private final Clock clock; - - public DryRunGatewayConnectionFactory(Endpoint endpoint, Clock clock) { - this.endpoint = endpoint; - this.clock = clock; - } - - @Override - public GatewayConnection newConnection() { - return new DryRunGatewayConnection(endpoint, clock); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java deleted file mode 100644 index 34b8ef76068..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointIOException.java +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import com.yahoo.vespa.http.client.config.Endpoint; - -import java.io.IOException; - -/** - * Class for throwing exception from endpoint. - * - * @author dybis -*/ -public class EndpointIOException extends IOException { - - private final Endpoint endpoint; - private static final long serialVersionUID = 29335813211L; - - public EndpointIOException(Endpoint endpoint, String message, Throwable cause) { - super(message, cause); - this.endpoint = endpoint; - } - - /** Returns the endpoint, or null if the failure occurred before this was assigned to a unique endpoint */ - public Endpoint getEndpoint() { return endpoint; } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java deleted file mode 100644 index 98b6aee1e33..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import com.yahoo.vespa.http.client.FeedEndpointException; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.core.EndpointResult; -import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; -import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -/** - * The shared queue of operation results. - * This is multithread safe. - * - * @author Einar M R Rosenvinge - */ -class EndpointResultQueue { - - private static final Logger log = Logger.getLogger(EndpointResultQueue.class.getName()); - private final OperationProcessor operationProcessor; - - private final Map<String, InflightOperation> inflightOperations = new HashMap<>(); - - private final Endpoint endpoint; - private final int clusterId; - private final ScheduledThreadPoolExecutor timer; - private final long totalTimeoutMs; - - EndpointResultQueue(OperationProcessor operationProcessor, - Endpoint endpoint, - int clusterId, - ScheduledThreadPoolExecutor timer, - long totalTimeoutMs) { - this.operationProcessor = operationProcessor; - this.endpoint = endpoint; - this.clusterId = clusterId; - this.timer = timer; - this.totalTimeoutMs = totalTimeoutMs; - } - - public synchronized void operationSent(String operationId, GatewayConnection connection) { - DocumentTimerTask task = new DocumentTimerTask(operationId); - ScheduledFuture<?> future = timer.schedule(task, totalTimeoutMs, TimeUnit.MILLISECONDS); - inflightOperations.put(operationId, new InflightOperation(future, connection)); - } - - public synchronized void failOperation(EndpointResult result, int clusterId) { - resultReceived(result, clusterId, false); - } - - public synchronized void resultReceived(EndpointResult result, int clusterId) { - resultReceived(result, clusterId, true); - } - - void onEndpointError(FeedEndpointException e) { - operationProcessor.onEndpointError(e); - } - - private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) { - operationProcessor.resultReceived(result, clusterId); - InflightOperation operation = inflightOperations.remove(result.getOperationId()); - if (operation == null) { - if (duplicateGivesWarning) { - log.info("Result for ID '" + result.getOperationId() + "' received from '" + endpoint + - "', but we have no record of a sent operation. This may happen if an operation is " + - "initiated, but also retried, due to HTTP failure. Otherwise, something is wrong on " + - "the server side (bad VIP usage?), or operation was received _after_ client-side timeout."); - } - return; - } - operation.future.cancel(false); - } - - /** Called only from ScheduledThreadPoolExecutor thread in DocumentTimerTask.run(), see below */ - private synchronized void timeout(String operationId) { - InflightOperation operation = inflightOperations.remove(operationId); - if (operation == null) { - log.finer("Timeout of operation '" + operationId + "', but operation " + - "not found in map. Result was probably received just-in-time from server, while timeout " + - "task could not be cancelled."); - return; - } - EndpointResult endpointResult = EndPointResultFactory.createTransientError( - endpoint, operationId, new RuntimeException("Timed out waiting for reply from server.")); - operationProcessor.resultReceived(endpointResult, clusterId); - } - - public synchronized int getPendingSize() { - return inflightOperations.values().size(); - } - - public synchronized void failPending(Exception exception) { - inflightOperations.forEach((operationId, operation) -> { - operation.future.cancel(false); - EndpointResult result = EndPointResultFactory.createError(endpoint, operationId, exception); - operationProcessor.resultReceived(result, clusterId); - }); - inflightOperations.clear(); - } - - public synchronized boolean hasInflightOperations(GatewayConnection connection) { - return inflightOperations.entrySet().stream() - .anyMatch(entry -> entry.getValue().connection.equals(connection)); - } - - private class DocumentTimerTask implements Runnable { - - private final String operationId; - - private DocumentTimerTask(String operationId) { - this.operationId = operationId; - } - - @Override - public void run() { - timeout(operationId); - } - - } - - private static class InflightOperation { - final ScheduledFuture<?> future; - final GatewayConnection connection; - - InflightOperation(ScheduledFuture<?> future, GatewayConnection connection) { - this.future = future; - this.connection = connection; - } - } -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java deleted file mode 100644 index 25057a1dead..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.ServerResponseException; -import java.io.IOException; -import java.io.InputStream; -import java.time.Instant; -import java.util.List; - -public interface GatewayConnection { - - /** Returns the time this connected over the network, or null if not connected yet */ - Instant connectionTime(); - - /** Returns the last time poll was called on this, or null if never */ - Instant lastPollTime(); - - InputStream write(List<Document> docs) throws ServerResponseException, IOException; - - /** Returns any operation results that are ready now */ - InputStream poll() throws ServerResponseException, IOException; - - /** Attempt to drain all outstanding operations, even if this leads to blocking */ - InputStream drain() throws ServerResponseException, IOException; - - boolean connect(); - - Endpoint getEndpoint(); - - void handshake() throws ServerResponseException, IOException; - - void close(); - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java deleted file mode 100644 index 4988b73510f..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -/** - * Creates gateway connections on request - * - * @author bratseth - */ -public interface GatewayConnectionFactory { - - GatewayConnection newConnection(); - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayThrottler.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayThrottler.java deleted file mode 100644 index 41a1b6a7e87..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayThrottler.java +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import java.util.concurrent.ThreadLocalRandom; - -/** - * When the gateways says it can not handle more load, we should send less load. That is the responsibility - * of this component - * - * @author dybis - */ -public class GatewayThrottler { - - private long backOffTimeMs = 0; - private final long maxSleepTimeMs; - - public GatewayThrottler(long maxSleepTimeMs) { - this.maxSleepTimeMs = maxSleepTimeMs; - } - - public void handleCall(int transientErrors) { - if (transientErrors > 0) { - backOffTimeMs = Math.min(maxSleepTimeMs, backOffTimeMs + distribute(100)); - } else { - backOffTimeMs = Math.max(0, backOffTimeMs - distribute(10)); - } - sleepMs(backOffTimeMs); - } - - protected void sleepMs(long sleepTime) { - try { - if (backOffTimeMs > 0L) { - Thread.sleep(backOffTimeMs); - } - } catch (InterruptedException e) { - // Do nothing - } - } - - public int distribute(int expected) { - double factor = 0.5 + ThreadLocalRandom.current().nextDouble(); - Double result = expected * factor; - return result.intValue(); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java deleted file mode 100644 index aeb164227f1..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ /dev/null @@ -1,649 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import com.yahoo.vespa.http.client.FeedConnectException; -import com.yahoo.vespa.http.client.FeedProtocolException; -import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.EndpointResult; -import com.yahoo.vespa.http.client.core.Exceptions; -import com.yahoo.vespa.http.client.core.ServerResponseException; -import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.Random; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Thread which feeds document operations asynchronously and processes the results. - * - * @author Einar M R Rosenvinge - */ -public class IOThread implements Runnable, AutoCloseable { - - private static final Logger log = Logger.getLogger(IOThread.class.getName()); - - private final Endpoint endpoint; - private final GatewayConnectionFactory connectionFactory; - private final DocumentQueue documentQueue; - private final EndpointResultQueue resultQueue; - - /** The thread running this, or null if it does not run a thread (meaning tick() must be called from the outside) */ - private final Thread thread; - private final int clusterId; - private final CountDownLatch running = new CountDownLatch(1); - private final CountDownLatch stopSignal = new CountDownLatch(1); - private final int maxChunkSizeBytes; - private final int maxInFlightRequests; - private final Duration localQueueTimeOut; - private final GatewayThrottler gatewayThrottler; - private final Duration connectionTimeToLive; - private final long pollIntervalUS; - private final Clock clock; - private final Random random = new Random(); - private final OldConnectionsDrainer oldConnectionsDrainer; - - private volatile GatewayConnection currentConnection; - private volatile ConnectionState connectionState = ConnectionState.DISCONNECTED; - - private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED }; - private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0); - private final AtomicInteger wrongVersionDetectedCounter = new AtomicInteger(0); - private final AtomicInteger problemStatusCodeFromServerCounter = new AtomicInteger(0); - private final AtomicInteger executeProblemsCounter = new AtomicInteger(0); - private final AtomicInteger docsReceivedCounter = new AtomicInteger(0); - private final AtomicInteger statusReceivedCounter = new AtomicInteger(0); - private final AtomicInteger pendingDocumentStatusCount = new AtomicInteger(0); - private final AtomicInteger successfulHandshakes = new AtomicInteger(0); - private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0); - - IOThread(ThreadGroup ioThreadGroup, - Endpoint endpoint, - EndpointResultQueue endpointResultQueue, - GatewayConnectionFactory connectionFactory, - int clusterId, - int maxChunkSizeBytes, - int maxInFlightRequests, - Duration localQueueTimeOut, - DocumentQueue documentQueue, - long maxSleepTimeMs, - Duration connectionTimeToLive, - boolean runThreads, - double idlePollFrequency, - Clock clock) { - this.endpoint = endpoint; - this.documentQueue = documentQueue; - this.connectionFactory = connectionFactory; - this.currentConnection = connectionFactory.newConnection(); - this.resultQueue = endpointResultQueue; - this.clusterId = clusterId; - this.maxChunkSizeBytes = maxChunkSizeBytes; - this.maxInFlightRequests = maxInFlightRequests; - this.connectionTimeToLive = connectionTimeToLive; - this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs); - this.pollIntervalUS = Math.max(1000, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1ms, 10s] - this.clock = clock; - this.localQueueTimeOut = localQueueTimeOut; - this.oldConnectionsDrainer = new OldConnectionsDrainer(endpoint, - clusterId, - Duration.ofMillis(pollIntervalUS/1000), - connectionTimeToLive, - localQueueTimeOut, - statusReceivedCounter, - resultQueue, - stopSignal, - clock); - if (runThreads) { - this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); - thread.setDaemon(true); - thread.start(); - Thread thread = new Thread(ioThreadGroup, oldConnectionsDrainer, "IOThread " + endpoint + " drainer"); - thread.setDaemon(true); - thread.start(); - } - else { - this.thread = null; - } - } - - public Endpoint getEndpoint() { - return endpoint; - } - - /** - * Returns a snapshot of counters. Threadsafe. - */ - public ConnectionStats getConnectionStats() { - return new ConnectionStats( - wrongSessionDetectedCounter.get(), - wrongVersionDetectedCounter.get(), - problemStatusCodeFromServerCounter.get(), - executeProblemsCounter.get(), - docsReceivedCounter.get(), - statusReceivedCounter.get(), - pendingDocumentStatusCount.get(), - successfulHandshakes.get(), - lastGatewayProcessTimeMillis.get()); - } - - @Override - public void close() { - documentQueue.close(); - if (stopSignal.getCount() == 0) return; - - stopSignal.countDown(); - log.finer("Closed called."); - - oldConnectionsDrainer.close(); - - // Make a last attempt to get results from previous operations, we have already waited quite a bit before getting here. - int size = resultQueue.getPendingSize(); - if (size > 0) { - log.info("We have outstanding operations (" + size + ") , trying to fetch responses."); - try { - processResponse(currentConnection.drain()); - } catch (Throwable e) { - log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e); - } - } - - try { - currentConnection.close(); - } finally { - // If there is still documents in the queue, fail them. - drainDocumentQueueWhenFailingPermanently(new Exception("Closed call, did not manage to process everything so failing this document.")); - } - - log.fine("Session to " + endpoint + " closed."); - } - - /** For testing only */ - public void post(Document document) throws InterruptedException { - documentQueue.put(document, true); - } - - @Override - public String toString() { - return "I/O thread (for " + endpoint + ")"; - } - - List<Document> getNextDocsForFeeding(long maxWaitUnits, TimeUnit timeUnit) { - List<Document> docsForSendChunk = new ArrayList<>(); - int chunkSizeBytes = 0; - try { - drainFirstDocumentsInQueueIfOld(); - Document doc = thread != null ? documentQueue.poll(maxWaitUnits, timeUnit) : documentQueue.poll(); - if (doc != null) { - docsForSendChunk.add(doc); - chunkSizeBytes = doc.size(); - } - } catch (InterruptedException ie) { - log.fine("Got break signal while waiting for new documents to feed"); - return docsForSendChunk; - } - int pendingSize = 1 + resultQueue.getPendingSize(); - - // see if we can get more documents without blocking - // slightly randomize how much is taken to avoid harmonic interactions leading - // to some threads consistently taking more than others - int thisMaxChunkSizeBytes = randomize(maxChunkSizeBytes); - int thisMaxInFlightRequests = randomize(maxInFlightRequests); - while (chunkSizeBytes < thisMaxChunkSizeBytes && pendingSize < thisMaxInFlightRequests) { - drainFirstDocumentsInQueueIfOld(); - Document document = documentQueue.poll(); - if (document == null) break; - docsForSendChunk.add(document); - chunkSizeBytes += document.size(); - pendingSize++; - } - if (log.isLoggable(Level.FINEST)) - log.finest("Chunk has " + docsForSendChunk.size() + " docs with a size " + chunkSizeBytes + " bytes"); - docsReceivedCounter.addAndGet(docsForSendChunk.size()); - return docsForSendChunk; - } - - private int randomize(int limit) { - double multiplier = 0.75 + 0.25 * random.nextDouble(); - return Math.max(1, (int)(limit * multiplier)); - } - - private void addDocumentsToResultQueue(List<Document> docs) { - for (Document doc : docs) { - resultQueue.operationSent(doc.getOperationId(), currentConnection); - } - } - - private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) { - for (Document doc : docs) { - resultQueue.failOperation(EndPointResultFactory.createTransientError(endpoint, - doc.getOperationId(), - servletException), - clusterId); - } - } - - private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException { - try { - // Post the new docs and get async responses for other posts. - return currentConnection.write(docs); - } catch (ServerResponseException ser) { - markDocumentAsFailed(docs, ser); - throw ser; - } catch (Exception e) { - markDocumentAsFailed(docs, new ServerResponseException(Exceptions.toMessageString(e))); - throw e; - } - } - - private static class ProcessResponse { - - private final int transitiveErrorCount; - private final int processResultsCount; - - ProcessResponse(int transitiveErrorCount, int processResultsCount) { - this.transitiveErrorCount = transitiveErrorCount; - this.processResultsCount = processResultsCount; - } - - } - - private ProcessResponse processResponse(InputStream serverResponse) throws IOException { - return processResponse(serverResponse, endpoint, clusterId, statusReceivedCounter, resultQueue); - } - - private static ProcessResponse processResponse(InputStream serverResponse, - Endpoint endpoint, - int clusterId, - AtomicInteger statusReceivedCounter, - EndpointResultQueue resultQueue) throws IOException { - Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse); - statusReceivedCounter.addAndGet(endpointResults.size()); - int transientErrors = 0; - for (EndpointResult endpointResult : endpointResults) { - if (endpointResult.getDetail().getResultType() == Result.ResultType.TRANSITIVE_ERROR) { - transientErrors++; - } - resultQueue.resultReceived(endpointResult, clusterId); - } - return new ProcessResponse(transientErrors, endpointResults.size()); - } - - private ProcessResponse feedDocumentAndProcessResults(List<Document> docs) - throws ServerResponseException, IOException { - addDocumentsToResultQueue(docs); - long startTime = clock.millis(); - InputStream serverResponse = sendAndReceive(docs); - - ProcessResponse processResponse = processResponse(serverResponse); - lastGatewayProcessTimeMillis.set((int) (clock.millis() - startTime)); - return processResponse; - } - - private ProcessResponse pullAndProcessData(long maxWaitTimeUS) throws ServerResponseException, IOException { - int pendingResultQueueSize = resultQueue.getPendingSize(); - pendingDocumentStatusCount.set(pendingResultQueueSize); - - List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests) - ? new ArrayList<>() // The queue is full, will not send more documents - : getNextDocsForFeeding(maxWaitTimeUS, TimeUnit.MICROSECONDS); - - if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) { - //we have no unfinished business with the server now. - log.finest("No document awaiting feeding, not waiting for results."); - return new ProcessResponse(0, 0); - } - log.finest("Awaiting " + pendingResultQueueSize + " results."); - ProcessResponse processResponse = feedDocumentAndProcessResults(nextDocsForFeeding); - - if (pendingResultQueueSize > maxInFlightRequests && processResponse.processResultsCount == 0) { - try { - // Max outstanding document operations, no more results on server side, wait a bit before asking again - Thread.sleep(300); - } catch (InterruptedException e) { - // Ignore - } - } - return processResponse; - } - - /** Given a current connection state, take the appropriate action and return the resulting new connection state */ - private ConnectionState cycle(ConnectionState connectionState) { - switch(connectionState) { - case DISCONNECTED: - try { - if (! currentConnection.connect()) { - log.log(Level.WARNING, "Could not connect to endpoint: '" + endpoint + "'. Will re-try."); - drainFirstDocumentsInQueueIfOld(); - return ConnectionState.DISCONNECTED; - } - return ConnectionState.CONNECTED; - } catch (Throwable throwable1) { - drainFirstDocumentsInQueueIfOld(); - - log.log(Level.INFO, "Failed connecting to endpoint: '" + endpoint + "'. Will re-try connecting.", - throwable1); - executeProblemsCounter.incrementAndGet(); - return ConnectionState.DISCONNECTED; - } - case CONNECTED: - try { - if (isStale(currentConnection)) - return refreshConnection(connectionState); - currentConnection.handshake(); - successfulHandshakes.getAndIncrement(); - } catch (ServerResponseException ser) { - int code = ser.getResponseCode(); - if (code == 401 || code == 403) { - drainDocumentQueueWhenFailingPermanently(new Exception("Denied access by endpoint:" + ser.getResponseString())); - log.log(Level.SEVERE, "Failed authentication or authorization with '" + endpoint + "': " + Exceptions.toMessageString(ser)); - return ConnectionState.CONNECTED; // Should ideally exit immediately, instead of doing this per X documents :/ - } - - executeProblemsCounter.incrementAndGet(); - log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint + - "' failed -- will re-try handshake: " + Exceptions.toMessageString(ser)); - - drainFirstDocumentsInQueueIfOld(); - resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, endpoint)); - return ConnectionState.CONNECTED; - } catch (Throwable throwable) { // This cover IOException as well - executeProblemsCounter.incrementAndGet(); - resultQueue.onEndpointError(new FeedConnectException(throwable, endpoint)); - log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + - endpoint + "' failed. Will re-try handshake.", - throwable); - drainFirstDocumentsInQueueIfOld(); - currentConnection.close(); - return ConnectionState.DISCONNECTED; - } - return ConnectionState.SESSION_SYNCED; - case SESSION_SYNCED: - try { - if (isStale(currentConnection)) - return refreshConnection(connectionState); - ProcessResponse processResponse = pullAndProcessData(pollIntervalUS); - gatewayThrottler.handleCall(processResponse.transitiveErrorCount); - } - catch (ServerResponseException ser) { - log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint + - "'. Will re-try. Endpoint responded with an unexpected HTTP response code.", - ser); - return ConnectionState.CONNECTED; - } - catch (Throwable e) { - log.log(Level.INFO, - "Connection level error handing data over to endpoint '" + endpoint + "'. Will re-try.", - e); - currentConnection.close(); - return ConnectionState.DISCONNECTED; - } - return ConnectionState.SESSION_SYNCED; - default: { - log.severe("Should never get here."); - currentConnection.close(); - return ConnectionState.DISCONNECTED; - } - } - } - - private void sleepIfProblemsGettingSyncedConnection(ConnectionState newState, ConnectionState oldState) { - if (newState == ConnectionState.SESSION_SYNCED) return; - if (newState == ConnectionState.CONNECTED && oldState == ConnectionState.DISCONNECTED) return; - try { - // Take it easy we have problems getting a connection up. - if (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) { - Thread.sleep(gatewayThrottler.distribute(3000)); - } - } catch (InterruptedException e) { - } - } - - @Override - public void run() { - while (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) - tick(); - log.finer(toString() + " exiting, documentQueue.size()=" + documentQueue.size()); - running.countDown(); - } - - /** Do one iteration of work. Should be called from the single worker thread of this. */ - public void tick() { - ConnectionState oldState = connectionState; - connectionState = cycle(connectionState); - if (thread == null) - oldConnectionsDrainer.checkOldConnections(); - if (thread != null) - sleepIfProblemsGettingSyncedConnection(connectionState, oldState); - } - - private void drainFirstDocumentsInQueueIfOld() { - while (true) { - Optional<Document> document = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut); - if ( ! document.isPresent()) return; - - EndpointResult endpointResult = EndPointResultFactory.createTransientError( - endpoint, document.get().getOperationId(), - new Exception("Not sending document operation, timed out in queue after " + - (clock.millis() - document.get().getQueueInsertTime().toEpochMilli()) + " ms.")); - resultQueue.failOperation(endpointResult, clusterId); - } - } - - private void drainDocumentQueueWhenFailingPermanently(Exception exception) { - // first, clear sentOperations: - resultQueue.failPending(exception); - - for (Document document : documentQueue.removeAllDocuments()) { - EndpointResult endpointResult= - EndPointResultFactory.createError(endpoint, document.getOperationId(), exception); - resultQueue.failOperation(endpointResult, clusterId); - } - } - - private boolean isStale(GatewayConnection connection) { - return connection.connectionTime() != null - && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant()); - } - - private ConnectionState refreshConnection(ConnectionState currentConnectionState) { - if (currentConnectionState == ConnectionState.SESSION_SYNCED) - oldConnectionsDrainer.add(currentConnection); - currentConnection = connectionFactory.newConnection(); - return ConnectionState.DISCONNECTED; - } - - public static class ConnectionStats { - - // NOTE: These fields are accessed by reflection in JSON serialization - - public final int wrongSessionDetectedCounter; - public final int wrongVersionDetectedCounter; - public final int problemStatusCodeFromServerCounter; - public final int executeProblemsCounter; - public final int docsReceivedCounter; - public final int statusReceivedCounter; - public final int pendingDocumentStatusCount; - public final int successfullHandshakes; - public final int lastGatewayProcessTimeMillis; - - ConnectionStats(int wrongSessionDetectedCounter, - int wrongVersionDetectedCounter, - int problemStatusCodeFromServerCounter, - int executeProblemsCounter, - int docsReceivedCounter, - int statusReceivedCounter, - int pendingDocumentStatusCount, - int successfullHandshakes, - int lastGatewayProcessTimeMillis) { - this.wrongSessionDetectedCounter = wrongSessionDetectedCounter; - this.wrongVersionDetectedCounter = wrongVersionDetectedCounter; - this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter; - this.executeProblemsCounter = executeProblemsCounter; - this.docsReceivedCounter = docsReceivedCounter; - this.statusReceivedCounter = statusReceivedCounter; - this.pendingDocumentStatusCount = pendingDocumentStatusCount; - this.successfullHandshakes = successfullHandshakes; - this.lastGatewayProcessTimeMillis = lastGatewayProcessTimeMillis; - } - } - - /** For testing. Returns the current connection of this. Not thread safe. */ - public GatewayConnection currentConnection() { return currentConnection; } - - /** For testing. Returns a snapshot of the old connections of this. */ - public List<GatewayConnection> oldConnections() { return oldConnectionsDrainer.connections(); } - - /** For testing */ - public EndpointResultQueue resultQueue() { return resultQueue; } - - /** - * We need to drain results on the connection where they were sent to make sure we request results on - * the node which received the operation also when going through a VIP. - */ - private static class OldConnectionsDrainer implements Runnable { - - private static final Logger log = Logger.getLogger(OldConnectionsDrainer.class.getName()); - - private final Endpoint endpoint; - private final int clusterId; - private final Duration pollInterval; - private final Duration connectionTimeToLive; - private final Duration localQueueTimeOut; - private final AtomicInteger statusReceivedCounter; - private final EndpointResultQueue resultQueue; - private final CountDownLatch stopSignal; - private final Clock clock; - - /** - * Previous connections on which we may have sent operations and are still waiting for the results - * All connections in this are in state SESSION_SYNCED. - */ - private final List<GatewayConnection> connections = new CopyOnWriteArrayList<>(); - - OldConnectionsDrainer(Endpoint endpoint, - int clusterId, - Duration pollInterval, - Duration connectionTimeToLive, - Duration localQueueTimeOut, - AtomicInteger statusReceivedCounter, - EndpointResultQueue resultQueue, - CountDownLatch stopSignal, - Clock clock) { - this.endpoint = endpoint; - this.clusterId = clusterId; - this.pollInterval = pollInterval; - this.connectionTimeToLive = connectionTimeToLive; - this.localQueueTimeOut = localQueueTimeOut; - this.statusReceivedCounter = statusReceivedCounter; - this.resultQueue = resultQueue; - this.stopSignal = stopSignal; - this.clock = clock; - } - - /** Add another old connection to this for draining */ - public void add(GatewayConnection connection) { - connections.add(connection); - } - - @Override - public void run() { - while (stopSignal.getCount() > 0) { - try { - checkOldConnections(); - Thread.sleep(pollInterval.toMillis()); - } - catch (InterruptedException e) { - log.log(Level.WARNING, "Close thread was interrupted: " + e.getMessage(), e); - Thread.currentThread().interrupt(); - return; - } catch (Exception e) { - log.log(Level.WARNING, "Connection draining failed: " + e.getMessage(), e); - } - } - } - - public void checkOldConnections() { - for (GatewayConnection connection : connections) { - if (!resultQueue.hasInflightOperations(connection)) { - log.fine(() -> connection + " no longer has inflight operations"); - closeConnection(connection); - } else if (closingTime(connection).isBefore(clock.instant())) { - log.fine(() -> connection + " still has inflight operations, but drain period is over"); - tryPollAndDrainInflightOperations(connection); - closeConnection(connection); - } else if (timeToPoll(connection)) { - tryPollAndDrainInflightOperations(connection); - } - } - } - - private void closeConnection(GatewayConnection connection) { - log.fine(() -> "Closing " + connection); - connection.close(); - connections.remove(connection); // Safe as CopyOnWriteArrayList allows removal during iteration - } - - private void tryPollAndDrainInflightOperations(GatewayConnection connection) { - try { - log.fine(() -> "Polling and draining inflight operations for " + connection); - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - } catch (Exception e) { - // Old connection; best effort - log.log(Level.FINE, e, () -> "Polling status of inflight operations failed: " + e.getMessage()); - } - } - - private boolean timeToPoll(GatewayConnection connection) { - // connectionEndOfLife < connectionLastPolled < now - Instant now = clock.instant(); - Instant endOfLife = connection.connectionTime().plus(connectionTimeToLive); - if (connection.lastPollTime() == null) return endOfLife.plus(pollInterval).isBefore(now); - if (connection.lastPollTime().plus(pollInterval).isAfter(now)) return false; - - // Exponential (2^x) dropoff: - double connectionEndOfLife = endOfLife.toEpochMilli(); - double connectionLastPolled = connection.lastPollTime().toEpochMilli(); - return now.toEpochMilli() - connectionEndOfLife > 2 * (connectionLastPolled - connectionEndOfLife); - } - - private Instant closingTime(GatewayConnection connection) { - return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut); - } - - private void close() { - int size = resultQueue.getPendingSize(); - if (size > 0) { - log.info("We have outstanding operations (" + size + ") , trying to fetch responses."); - for (GatewayConnection connection : connections) { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - } catch (Throwable e) { - log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e); - } - } - } - for (GatewayConnection oldConnection : connections) - oldConnection.close(); - } - - /** For testing. Returns the old connections of this. */ - public List<GatewayConnection> connections() { return Collections.unmodifiableList(connections); } - - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/ConcurrentDocumentOperationBlocker.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/ConcurrentDocumentOperationBlocker.java deleted file mode 100644 index 79a04d8f043..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/ConcurrentDocumentOperationBlocker.java +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.operationProcessor; - -import java.util.concurrent.Semaphore; - -/** - * A semaphore that can be re-sized. - * - * @author dybis - */ -final public class ConcurrentDocumentOperationBlocker { - - private static final int INITIAL_SIZE = 0; - private final ReducableSemaphore semaphore = new ReducableSemaphore(); - private int maxConcurrency = INITIAL_SIZE; - private final Object monitor = new Object(); - - /* - * Resizes the semaphore. It does not wait for threads that are in the queue when downsizing. - */ - void setMaxConcurrency(int maxConcurrency) { - synchronized (monitor) { - int deltaConcurrency = maxConcurrency - this.maxConcurrency; - - if (deltaConcurrency > 0) { - semaphore.release(deltaConcurrency); - } - if (deltaConcurrency < 0) { - semaphore.reducePermits(-1 * deltaConcurrency); - } - this.maxConcurrency = maxConcurrency; - } - } - - /** - * Release a permit. - */ - void operationDone() { - semaphore.release(); - } - - /** - * Acquire a permit. Blocking if no permits available. - */ - void startOperation() throws InterruptedException { - semaphore.acquire(); - } - - int availablePermits() { - return semaphore.availablePermits(); - } - - /** - * We need to extend Semaphore to get access to protected reducePermit() method. - */ - @SuppressWarnings("serial") - private static final class ReducableSemaphore extends Semaphore { - - ReducableSemaphore() { - super(INITIAL_SIZE, true /* FIFO */); - } - - @Override - protected void reducePermits(int reduction) { - super.reducePermits(reduction); - } - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java deleted file mode 100644 index b72a6c67398..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.operationProcessor; - -import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.core.Document; - -import java.time.Clock; -import java.util.HashMap; -import java.util.Map; - -/** - * Keeps an overview of what is sent and what is received for an operation. - * This class is not thread-safe. - */ -class DocumentSendInfo { - - private final Document document; - private final Map<Integer, Result.Detail> detailByClusterId = new HashMap<>(); - // This is lazily populated as normal cases does not require retries. - private Map<Integer, Integer> attemptedRetriesByClusterId = null; - private final StringBuilder localTrace; - private final Clock clock; - - DocumentSendInfo(Document document, boolean traceThisDoc, Clock clock) { - this.document = document; - localTrace = traceThisDoc ? new StringBuilder("\n" + document.createTime() + " Trace starting " + "\n") - : null; - this.clock = clock; - } - - boolean addIfNotAlreadyThere(Result.Detail detail, int clusterId) { - if (detailByClusterId.containsKey(clusterId)) { - if (localTrace != null) { - localTrace.append(clock.millis() + " Got duplicate detail, ignoring this: " + - detail.toString() + "\n"); - } - return false; - } - if (localTrace != null) { - localTrace.append(clock.millis() + " Got detail: " + detail.toString() + "\n"); - } - detailByClusterId.put(clusterId, detail); - return true; - } - - int detailCount() { - return detailByClusterId.size(); - } - - public Result createResult() { - return new Result(document, detailByClusterId.values(), localTrace); - } - - int incRetries(int clusterId, Result.Detail detail) { - if (attemptedRetriesByClusterId == null) { - attemptedRetriesByClusterId = new HashMap<>(); - } - int retries = 0; - if (attemptedRetriesByClusterId.containsKey(clusterId)) { - retries = attemptedRetriesByClusterId.get(clusterId); - } - retries++; - attemptedRetriesByClusterId.put(clusterId, retries); - if (localTrace != null) { - localTrace.append(clock.millis() + " Asked about retrying for cluster ID " - + clusterId + ", number of retries is " + retries + " Detail:\n" + detail.toString()); - } - return retries; - } - - Document getDocument() { - return document; - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java deleted file mode 100644 index 9af63f6637a..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.operationProcessor; - -import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.core.EndpointResult; -import com.yahoo.vespa.http.client.core.OperationStatus; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.logging.Logger; - -/** - * @author Einar M R Rosenvinge - */ -public final class EndPointResultFactory { - - private static final Logger log = Logger.getLogger(EndPointResultFactory.class.getName()); - private static final String EMPTY_MESSAGE = "-"; - - public static Collection<EndpointResult> createResult(Endpoint endpoint, - InputStream inputStream) throws IOException { - List<EndpointResult> results = new ArrayList<>(); - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(inputStream, StandardCharsets.US_ASCII))) { - String line; - while ((line = reader.readLine()) != null) { - results.add(parseResult(line, endpoint)); - } - } - return results; - } - - public static EndpointResult createError(Endpoint endpoint, String operationId, Exception exception) { - return new EndpointResult(operationId, new Result.Detail(endpoint, - Result.ResultType.FATAL_ERROR, - null, - exception)); - } - - public static EndpointResult createTransientError(Endpoint endpoint, String operationId, Exception exception) { - return new EndpointResult(operationId, new Result.Detail(endpoint, - Result.ResultType.TRANSITIVE_ERROR, - null, - exception)); - } - - private static Result.ResultType replyToResultType(OperationStatus reply) { - // The ordering below is important, e.g. if success, it is never a transient error even if isTransient is true. - if (reply.errorCode.isSuccess()) - return Result.ResultType.OPERATION_EXECUTED; - if (reply.isConditionNotMet) - return Result.ResultType.CONDITION_NOT_MET; - if (reply.errorCode.isTransient()) - return Result.ResultType.TRANSITIVE_ERROR; - return Result.ResultType.FATAL_ERROR; - } - - private static EndpointResult parseResult(String line, Endpoint endpoint) { - try { - OperationStatus reply = OperationStatus.parse(line); - String message; - if (EMPTY_MESSAGE.equals(reply.message)) { - message = null; - } else { - message = reply.message; - } - Exception exception = null; - if (!reply.errorCode.isSuccess() && message != null) { - exception = new RuntimeException(message); - } - if (reply.traceMessage != null && !reply.traceMessage.isEmpty()) { - log.fine("Got trace message: " + reply.traceMessage); - } - return new EndpointResult( - reply.operationId, - new Result.Detail(endpoint, - replyToResultType(reply), - reply.traceMessage, - exception)); - } catch (Throwable t) { - throw new IllegalArgumentException("Bad result line from server: '" + line + "'", t); - } - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java deleted file mode 100644 index a88d5af8094..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.operationProcessor; - -import com.yahoo.vespa.http.client.core.ThrottlePolicy; - -import java.time.Clock; -import java.util.concurrent.ThreadLocalRandom; - -/** - * Adjusts in-flight operations based on throughput. It will walk the graph and try to find - * local optimum. - * - * It looks at the throughput, adjust max in-flight based on the previous throughput and settings. - * - * In the beginning it moves faster, and then stabilizes. - * - * It will wait a bit after adjusting before it starts to sample, since there is a latency between adjustment - * and result. - * - * There are several mechanisms to reduce impact of several clients running in parallel. The window size has a - * random part, and the wait time before sampling after adjustment has a random part as well. - * - * To avoid running wild with large values of max-in flight, it is tuned to stay on the smaller part, and - * rather reduce max-in flight than to have a too large value. - * - * In case the where the queue is moved to minimum size, it will now and then increase queue size to get - * more sample data and possibly grow size. - * - * Class is fully thread safe, i.e. all public methods are thread safe. - * - * @author dybis - */ -public class IncompleteResultsThrottler { - - private final ConcurrentDocumentOperationBlocker blocker = new ConcurrentDocumentOperationBlocker(); - private final int maxInFlightValue; - private final int minInFlightValue; - private final ThrottlePolicy policy; - - // 9-11 seconds with some randomness to avoid fully synchronous feeders. - public final long phaseSizeMs = 9000 + (ThreadLocalRandom.current().nextInt() % 2000); - private final Clock clock; - - private final Object monitor = new Object(); - private long sampleStartTimeMs = 0; - private int previousNumOk = 0; - private int previousMaxInFlight = 0; - private int stabilizingPhasesLeft = 0; - private int adjustCycleCount = 0; - private int maxInFlightNow; - private int numOk = 0; - private int minWindowSizeCounter = 0; - private int minPermitsAvailable = 0; - - protected static int INITIAL_MAX_IN_FLIGHT_VALUE = 200; - protected static int SECOND_MAX_IN_FLIGHT_VALUE = 270; - private StringBuilder debugMessage = new StringBuilder(); - - /** - * Creates the throttler. - * - * @param minInFlightValue the throttler will never throttle beyond this limit. - * @param maxInFlightValue the throttler will never throttle above this limit. If zero, no limit. - * @param clock use to calculate window size. Can be null if minWindowSize and maxInFlightValue are equal. - * @param policy is the algorithm for finding next value of the number of in-flight documents operations. - */ - public IncompleteResultsThrottler(int minInFlightValue, int maxInFlightValue, Clock clock, ThrottlePolicy policy) { - this.maxInFlightValue = maxInFlightValue == 0 ? Integer.MAX_VALUE : maxInFlightValue; - this.minInFlightValue = minInFlightValue == 0 ? this.maxInFlightValue : minInFlightValue; - this.policy = policy; - this.clock = clock; - if (minInFlightValue != maxInFlightValue) { - this.sampleStartTimeMs = clock.millis(); - } - setNewSemaphoreSize(INITIAL_MAX_IN_FLIGHT_VALUE); - } - - public int availableCapacity() { - return blocker.availablePermits(); - } - - public void operationStart() { - try { - blocker.startOperation(); - } catch (InterruptedException e) { - // Ignore - } - if (maxInFlightValue != minInFlightValue) { - synchronized (monitor) { - adjustThrottling(); - } - } - } - - public String getDebugMessage() { - synchronized (monitor) { - return debugMessage.toString(); - } - } - - public void resultReady(boolean success) { - blocker.operationDone(); - if (!success) { - return; - } - synchronized (monitor) { - numOk++; - minPermitsAvailable = Math.min(minPermitsAvailable, blocker.availablePermits()); - } - } - - // Only for testing - protected int waitingThreads() { - synchronized (monitor) { - return maxInFlightNow - blocker.availablePermits(); - } - } - - private double getCeilingDifferencePerformance(int adjustCycle) { - // We want larger adjustments in the early phase. - if (adjustCycle > 10) { - return 0.7; - } - return 1.2; - } - - private void adjustCycle() { - adjustCycleCount++; - stabilizingPhasesLeft = adjustCycleCount < 5 ? 1 : 2 + ThreadLocalRandom.current().nextInt() % 2; - - double maxPerformanceChange = getCeilingDifferencePerformance(adjustCycleCount); - boolean messagesQueued = minPermitsAvailable < 2; - - int newMaxInFlight = policy.calcNewMaxInFlight( - maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow, messagesQueued); - debugMessage = new StringBuilder(); - debugMessage.append("previousMaxInFlight: " + previousMaxInFlight - + " maxInFlightNow: " + maxInFlightNow - + " numOk: " + numOk + " " + " previousOk: " + previousNumOk - + " new size is: " + newMaxInFlight); - previousMaxInFlight = maxInFlightNow; - previousNumOk = numOk; - - setNewSemaphoreSize(adjustCycleCount == 1 ? SECOND_MAX_IN_FLIGHT_VALUE : newMaxInFlight); - } - - private void adjustThrottling() { - if (clock.millis() < sampleStartTimeMs + phaseSizeMs) return; - - sampleStartTimeMs += phaseSizeMs; - - if (stabilizingPhasesLeft-- == 0) { - adjustCycle(); - } - numOk = 0; - this.minPermitsAvailable = maxInFlightNow; - } - - private int tryBoostingSizeIfMinValueOverSeveralCycles(final int size) { - if (size <= minInFlightValue) { - minWindowSizeCounter++; - } else { - minWindowSizeCounter = 0; - } - if (minWindowSizeCounter == 4) { - debugMessage.append(" (inc max in flight to get more data)"); - minWindowSizeCounter = 0; - return size + 10; - } - return size; - - } - - private void setNewSemaphoreSize(final int size) { - maxInFlightNow = - Math.max(minInFlightValue, Math.min( - tryBoostingSizeIfMinValueOverSeveralCycles(size), maxInFlightValue)); - blocker.setMaxConcurrency(maxInFlightNow); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java deleted file mode 100644 index 5d0e063efdf..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ /dev/null @@ -1,307 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.operationProcessor; - -import com.google.common.collect.ArrayListMultimap; -import com.yahoo.vespa.http.client.FeedClient; -import com.yahoo.vespa.http.client.FeedEndpointException; -import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.communication.EndpointIOException; -import com.yahoo.vespa.http.client.core.EndpointResult; -import com.yahoo.vespa.http.client.core.Exceptions; -import com.yahoo.vespa.http.client.core.communication.ClusterConnection; - -import java.math.BigInteger; -import java.security.SecureRandom; -import java.time.Clock; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Merges several endpointResult into one Result and does the callback. - * - * @author dybis - */ -public class OperationProcessor { - - private static final Logger log = Logger.getLogger(OperationProcessor.class.getName()); - private final Map<String, DocumentSendInfo> docSendInfoByOperationId = new LinkedHashMap<>(); - private final ArrayListMultimap<String, Document> blockedDocumentsByDocumentId = ArrayListMultimap.create(); - private final Set<String> inflightDocumentIds = new HashSet<>(); - private final int numDestinations; - private final FeedClient.ResultCallback resultCallback; - private final Object monitor = new Object(); - private final IncompleteResultsThrottler incompleteResultsThrottler; - // Position in the array is cluster ID. - private final List<ClusterConnection> clusters = new ArrayList<>(); - private final ScheduledThreadPoolExecutor timeoutExecutor; - private final OperationStats operationStats; - private final int maxRetries; - private final long minTimeBetweenRetriesMs; - private final Random random = new SecureRandom(); - private final int traceEveryXOperation; - private int traceCounter = 0; - private final boolean traceToStderr; - private final ThreadGroup ioThreadGroup; - private final String clientId = new BigInteger(130, random).toString(32); - private final Clock clock; - - public OperationProcessor(IncompleteResultsThrottler incompleteResultsThrottler, - FeedClient.ResultCallback resultCallback, - SessionParams sessionParams, - ScheduledThreadPoolExecutor timeoutExecutor, - Clock clock) { - this.numDestinations = sessionParams.getClusters().size(); - this.resultCallback = resultCallback; - this.incompleteResultsThrottler = incompleteResultsThrottler; - this.timeoutExecutor = timeoutExecutor; - this.ioThreadGroup = new ThreadGroup("operationprocessor"); - this.clock = clock; - - if (sessionParams.getClusters().isEmpty()) - throw new IllegalArgumentException("Cannot feed to 0 clusters."); - - for (Cluster cluster : sessionParams.getClusters()) { - if (cluster.getEndpoints().isEmpty()) - throw new IllegalArgumentException("Cannot feed to empty cluster."); - } - - for (int i = 0; i < sessionParams.getClusters().size(); i++) { - Cluster cluster = sessionParams.getClusters().get(i); - clusters.add(new ClusterConnection(this, - sessionParams.getFeedParams(), - sessionParams.getConnectionParams(), - cluster, - i, - sessionParams.getClientQueueSize() / sessionParams.getClusters().size(), - timeoutExecutor, - clock)); - } - operationStats = new OperationStats(sessionParams, clusters, incompleteResultsThrottler); - maxRetries = sessionParams.getConnectionParams().getMaxRetries(); - minTimeBetweenRetriesMs = sessionParams.getConnectionParams().getMinTimeBetweenRetriesMs(); - traceEveryXOperation = sessionParams.getConnectionParams().getTraceEveryXOperation(); - traceToStderr = sessionParams.getConnectionParams().getPrintTraceToStdErr(); - } - - public ThreadGroup getIoThreadGroup() { - return ioThreadGroup; - } - - public int getIncompleteResultQueueSize() { - synchronized (monitor) { - return docSendInfoByOperationId.size(); - } - } - - /** Returns the id of the oldest operation to be sent. */ - public Optional<String> oldestIncompleteResultId() { - synchronized (monitor) { - return docSendInfoByOperationId.isEmpty() - ? Optional.empty() - : Optional.of(docSendInfoByOperationId.keySet().iterator().next()); - } - } - - public String getClientId() { - return clientId; - } - - private boolean retriedThis(EndpointResult endpointResult, DocumentSendInfo documentSendInfo, int clusterId) { - Result.Detail detail = endpointResult.getDetail(); - if (detail.getResultType() == Result.ResultType.OPERATION_EXECUTED) return false; // Success: No retries - - int retries = documentSendInfo.incRetries(clusterId, detail); - if (retries > maxRetries) return false; - - String exceptionMessage = detail.getException() == null ? "" : detail.getException().getMessage(); - if (exceptionMessage == null) - exceptionMessage = ""; - - // TODO: Return proper error code in structured data in next version of internal API. - // Error codes from messagebus/src/cpp/messagebus/errorcode.h - boolean retryThisOperation = - detail.getResultType() == Result.ResultType.TRANSITIVE_ERROR || - exceptionMessage.contains("SEND_QUEUE_CLOSED") || - exceptionMessage.contains("ILLEGAL_ROUTE") || - exceptionMessage.contains("NO_SERVICES_FOR_ROUTE") || - exceptionMessage.contains("NETWORK_ERROR") || - exceptionMessage.contains("SEQUENCE_ERROR") || - exceptionMessage.contains("NETWORK_SHUTDOWN") || - exceptionMessage.contains("TIMEOUT"); - - if (retryThisOperation) { - int waitTime = (int) (minTimeBetweenRetriesMs * (1 + random.nextDouble() / 3)); - log.finest("Retrying due to " + detail + " attempt " + retries + " in " + waitTime + " ms."); - timeoutExecutor.schedule(() -> postToCluster(clusters.get(clusterId), documentSendInfo.getDocument()), - waitTime, - TimeUnit.MILLISECONDS); - return true; - } - - return false; - } - - private Result process(EndpointResult endpointResult, int clusterId) { - Result result; - Document blockedDocumentToSend = null; - synchronized (monitor) { - if (!docSendInfoByOperationId.containsKey(endpointResult.getOperationId())) { - log.finer("Received out-of-order or too late result, discarding: " + endpointResult); - return null; - } - DocumentSendInfo documentSendInfo = docSendInfoByOperationId.get(endpointResult.getOperationId()); - - if (retriedThis(endpointResult, documentSendInfo, clusterId)) return null; - - // Duplicate message - if ( ! documentSendInfo.addIfNotAlreadyThere(endpointResult.getDetail(), clusterId)) return null; - - // Is this the last operation we are waiting for? - if (documentSendInfo.detailCount() != numDestinations) return null; - - result = documentSendInfo.createResult(); - docSendInfoByOperationId.remove(endpointResult.getOperationId()); - - String documentId = documentSendInfo.getDocument().getDocumentId(); - // If we got a pending operation against this document - // dont't remove it from inflightDocuments and send blocked document operation - List<Document> blockedDocuments = blockedDocumentsByDocumentId.get(documentId); - if (blockedDocuments.isEmpty()) { - inflightDocumentIds.remove(documentId); - } else { - blockedDocumentToSend = blockedDocuments.remove(0); - } - } - if (blockedDocumentToSend != null) { - sendToClusters(blockedDocumentToSend, clock); - } - return result; - } - - public void resultReceived(EndpointResult endpointResult, int clusterId) { - Result result = process(endpointResult, clusterId); - if (result != null) { - incompleteResultsThrottler.resultReady(result.isSuccess()); - resultCallback.onCompletion(result.getDocumentId(), result); - if (traceToStderr && result.hasLocalTrace()) { - System.err.println(result.toString()); - } - } - } - - public void onEndpointError(FeedEndpointException e) { - resultCallback.onEndpointException(e); - } - - public List<Exception> closeClusters() { - List<Exception> exceptions = new ArrayList<>(); - // first, close cluster sessions and allow connections to drain normally - for (ClusterConnection cluster : clusters) { - try { - cluster.close(); - } catch (Exception e) { - exceptions.add(e); - } - } - return exceptions; - } - - public void sendDocument(Document document) { - incompleteResultsThrottler.operationStart(); - - synchronized (monitor) { - if (inflightDocumentIds.contains(document.getDocumentId())) { - blockedDocumentsByDocumentId.put(document.getDocumentId(), document); - return; - } - inflightDocumentIds.add(document.getDocumentId()); - } - - sendToClusters(document, clock); - } - - private void sendToClusters(Document document, Clock clock) { - synchronized (monitor) { - boolean traceThisDoc = traceEveryXOperation > 0 && traceCounter++ % traceEveryXOperation == 0; - docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc, clock)); - } - - for (ClusterConnection clusterConnection : clusters) { - postToCluster(clusterConnection, document); - } - } - - private void postToCluster(ClusterConnection clusterConnection, Document document) { - try { - clusterConnection.post(document); - } catch (EndpointIOException eio) { - resultReceived(EndPointResultFactory.createError(eio.getEndpoint(), - document.getOperationId(), - eio), - clusterConnection.getClusterId()); - } - } - - public List<ClusterConnection> clusters() { return Collections.unmodifiableList(clusters); } - - public String getStatsAsJson() { - return operationStats.getStatsAsJson(); - } - - public void close() { - List<Exception> exceptions = closeClusters(); - try { - closeExecutor(); - } catch (InterruptedException e) { - exceptions.add(e); - } - - if (exceptions.isEmpty()) { - return; - } - if (exceptions.size() == 1) { - if (exceptions.get(0) instanceof RuntimeException) { - throw (RuntimeException) exceptions.get(0); - } else { - throw new RuntimeException(exceptions.get(0)); - } - } - - StringBuilder b = new StringBuilder(); - b.append("Exception thrown while closing one or more clusters: "); - for (int i = 0; i < exceptions.size(); i++) { - Exception e = exceptions.get(i); - b.append(Exceptions.toMessageString(e)); - if (i != (exceptions.size() - 1)) { - b.append(", "); - } - } - throw new RuntimeException(b.toString(), exceptions.get(0)); - } - - private void closeExecutor() throws InterruptedException { - log.log(Level.FINE, "Shutting down timeout executor."); - timeoutExecutor.shutdownNow(); - - log.log(Level.FINE, "Awaiting termination of already running timeout tasks."); - if (! timeoutExecutor.awaitTermination(300, TimeUnit.SECONDS)) { - log.severe("Did not manage to shut down the executors within 300 secs, system stuck?"); - throw new RuntimeException("Did not manage to shut down retry threads. Please report problem."); - } - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationStats.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationStats.java deleted file mode 100644 index 1eebe593062..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationStats.java +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.operationProcessor; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.core.communication.ClusterConnection; - -import java.io.IOException; -import java.io.StringWriter; -import java.util.List; - -public class OperationStats { - - private static ObjectMapper jsonMapper = createMapper(); - - private final String sessionParamsAsXmlString; - private List<ClusterConnection> clusters; - private IncompleteResultsThrottler throttler; - - public OperationStats( - SessionParams sessionParams, - List<ClusterConnection> clusters, - IncompleteResultsThrottler throttler) { - this.sessionParamsAsXmlString = generateSessionParamsAsXmlString(sessionParams); - this.clusters = clusters; - this.throttler = throttler; - } - - private static ObjectMapper createMapper() { - ObjectMapper mapper = new ObjectMapper(); - mapper.registerModule(new Jdk8Module()); - mapper.registerModule(new JavaTimeModule()); - return mapper; - } - - private String generateSessionParamsAsXmlString(final SessionParams sessionParams) { - StringWriter stringWriter = new StringWriter(); - try { - JsonGenerator jsonGenerator = jsonMapper.createGenerator(stringWriter); - jsonMapper.writeValue(jsonGenerator, sessionParams); // TODO SessionParams should not be blindly serialized. This may serialize objects that are not really serializable. - return stringWriter.toString(); - } catch (IOException e) { - return e.getMessage(); - } - } - - public String getStatsAsJson() { - try { - StringWriter stringWriter = new StringWriter(); - JsonGenerator jsonGenerator = jsonMapper.createGenerator(stringWriter); - jsonGenerator.writeStartObject(); - jsonGenerator.writeArrayFieldStart("clusters"); - for (ClusterConnection cluster : clusters) { - jsonGenerator.writeStartObject(); - jsonGenerator.writeNumberField("clusterid", cluster.getClusterId()); - jsonGenerator.writeFieldName("stats"); - jsonGenerator.writeRawValue(cluster.getStatsAsJSon()); - jsonGenerator.writeEndObject(); - } - jsonGenerator.writeEndArray(); - jsonGenerator.writeFieldName("sessionParams"); - jsonGenerator.writeRawValue(sessionParamsAsXmlString); - jsonGenerator.writeFieldName("throttleDebugMessage"); - jsonGenerator.writeRawValue("\"" + throttler.getDebugMessage() + "\""); - jsonGenerator.writeEndObject(); - jsonGenerator.close(); - return stringWriter.toString(); - } catch (IOException e) { - return "{ \"Error\" : \""+ e.getMessage() + "\"}"; - } - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/package-info.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/package-info.java deleted file mode 100644 index 13d4e768daf..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/package-info.java +++ /dev/null @@ -1,2 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/package-info.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/package-info.java deleted file mode 100644 index 958d3793875..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/package-info.java +++ /dev/null @@ -1,9 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * Programmatic API for feeding to Vespa clusters independently of the - * cluster configuration. - * - * NOTE: This is a PUBLIC API, but not annotated as such because this is not a bundle and - * we don't want to introduce Vespa dependencies. - */ -package com.yahoo.vespa.http.client; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/CommandLineArguments.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/CommandLineArguments.java deleted file mode 100644 index 7ccdf3ebd43..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/CommandLineArguments.java +++ /dev/null @@ -1,323 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.runner; - -import com.google.common.base.Splitter; -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.SessionParams; -import io.airlift.command.Command; -import io.airlift.command.HelpOption; -import io.airlift.command.Option; -import io.airlift.command.SingleCommand; -import org.apache.http.Header; -import org.apache.http.ParseException; -import org.apache.http.conn.ssl.NoopHostnameVerifier; -import org.apache.http.conn.ssl.SSLConnectionSocketFactory; -import org.apache.http.message.BasicLineParser; - -import javax.inject.Inject; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -/** - * Commandline interface for the binary. - * - * @author dybis - */ -@Command(name = "vespa-http-client", - description = "This is a tool for feeding xml or json data to a Vespa application.") -public class CommandLineArguments { - - /** - * Creates a CommandLineArguments instance and populates it with data. - * - * @param args array of arguments. - * @return null on failure or if help option is set to true. - */ - static CommandLineArguments build(String[] args) { - CommandLineArguments cmdArgs; - try { - cmdArgs = SingleCommand.singleCommand(CommandLineArguments.class).parse(args); - } catch (Exception e) { - System.err.println(e.getMessage()); - System.err.println("Use --help to show usage.\n"); - return null; - } - if (cmdArgs.helpOption.showHelpIfRequested()) { - return null; - } - if (cmdArgs.endpointArg != null) { - if (cmdArgs.hostArg != null) { - System.err.println("Cannot set both '--host' and '--endpoint' "); - return null; - } - try { - URL url = new URL(cmdArgs.endpointArg); - } catch (MalformedURLException e) { - e.printStackTrace(System.err); - return null; - } - } else { - if (cmdArgs.hostArg == null) { - System.err.println("'--host' or '--endpoint' not set."); - return null; - } - } - if (cmdArgs.priorityArg != null && ! checkPriorityFlag(cmdArgs.priorityArg)) { - return null; - } - - for (String header : cmdArgs.headers) { - try { - cmdArgs.parsedHeaders.add(BasicLineParser.parseHeader(header, null)); - } catch (ParseException e) { - System.err.printf("Invalid header: '%s' (%s)%n", header, e.getMessage()); - return null; - } - } - - if (cmdArgs.privateKeyPath == null && cmdArgs.certificatePath != null || - cmdArgs.privateKeyPath != null && cmdArgs.certificatePath == null) { - System.err.println("Both '--privateKey' and '--certificate' must be set"); - return null; - } - - return cmdArgs; - } - - private static boolean checkPriorityFlag(String priorityArg) { - switch (priorityArg) { - case "HIGHEST": - case "VERY_HIGH": - case "HIGH_1": - case "HIGH_2": - case "HIGH_3": - case "NORMAL_1": - case "NORMAL_2": - case "NORMAL_3": - case "NORMAL_4": - case "NORMAL_5": - case "NORMAL_6": - case "LOW_1": - case "LOW_2": - case "LOW_3": - case "VERY_LOW": - case "LOWEST": - return true; - default: - System.err.println("Not valid value for priority. Allowed values are HIGHEST, VERY_HIGH, HIGH_[1-3], " + - "NORMAL_[1-6], LOW_[1-3], VERY_LOW, and LOWEST."); - return false; - } - } - - // TODO Don't duplicate default values from ConnectionParams.Builder. Some defaults are already inconsistent. - - @Inject - private HelpOption helpOption; - - @Option(name = {"--useV3Protocol"}, description = "Use V3 protocol to gateway. This is the default protocol.") - private boolean enableV3Protocol = true; - - @Option(name = {"--file"}, - description = "The name of the input file to read.") - private String fileArg = null; - - @Option(name = {"--add-root-element-to-xml"}, - description = "Add <vespafeed> tag to XML document, makes it easier to feed raw data.") - private boolean addRootElementToXml = false; - - @Option(name = {"--route"}, - description = "(=default)The route to send the data to.") - private String routeArg = "default"; - - @Option(name = {"--endpoint"}, - description = "Vespa endpoint.") - private String endpointArg; - - @Option(name = {"--host"}, - description = "The host(s) for the gateway. If using several, use comma to separate them.") - private String hostArg; - - @Option(name = {"--port"}, - description = "The port for the host of the gateway.") - private int portArg = 4080; - - @Option(name = {"--timeout"}, - description = "(=180) The time (in seconds) allowed for sending operations.") - private long timeoutArg = 180; - - @Option(name = {"--useCompression"}, - description = "Use compression over network.") - private boolean useCompressionArg = false; - - @Option(name = {"--useDynamicThrottling"}, - description = "Try to maximize throughput by using dynamic throttling.") - private boolean useDynamicThrottlingArg = false; - - @Option(name = {"--maxpending"}, - description = "The maximum number of operations that are allowed " + - "to be pending at any given time.") - private int maxPendingOperationCountArg = 10000; - - @Option(name = {"-v", "--verbose"}, - description = "Enable verbose output of progress.") - private boolean verboseArg = false; - - @Option(name = {"--noretry"}, - description = "Turns off retries of recoverable failures..") - private boolean noRetryArg = false; - - @Option(name = {"--retrydelay"}, - description = "The time (in seconds) to wait between retries of a failed operation.") - private int retrydelayArg = 1; - - @Option(name = {"--trace"}, - description = "(=0 (=off)) The trace level of network traffic.") - private int traceArg = 0; - - @Option(name = {"--printTraceEveryXOperation"}, - description = "(=1) How often to to tracing.") - private int traceEveryXOperation = 1; - - @Option(name = {"--validate"}, - description = "Run validation tool on input files instead of feeding them.") - private boolean validateArg = false; - - @Option(name = {"--ignoreConditionNotMet"}, - description = "Ignore condition not met failures.") - private boolean ignoreConditionNotMet = false; - - @Option(name = {"--priority"}, - description = "Specify priority of sent messages, see documentation ") - private String priorityArg = null; - - @Option(name = {"--numPersistentConnectionsPerEndpoint"}, - description = "How many tcp connections to establish per endoint.)") - private int numPersistentConnectionsPerEndpoint = 4; - - @Option(name = {"--maxChunkSizeBytes"}, - description = "How much data to send to gateway in each message.") - private int maxChunkSizeBytes = 20 * 1024; - - @Option(name = {"--whenVerboseEnabledPrintMessageForEveryXDocuments"}, - description = "How often to print verbose message.)") - private int whenVerboseEnabledPrintMessageForEveryXDocuments = 1000; - - @Option(name = {"--useTls"}, - description = "Use TLS when connecting to endpoint") - private boolean useTls = false; - - @Option(name = {"--insecure", "--disable-hostname-verification"}, - description = "Skip hostname verification when using TLS") - private boolean insecure = false; - - @Option(name = {"--header"}, - description = "Add http header to every request. Header must have the format '<Name>: <Value>'. Use this parameter multiple times for multiple headers") - private List<String> headers = new ArrayList<>(); - - @Option(name = {"--vespaTls"}, - description = "BETA! Use Vespa TLS configuration from environment if available. Other HTTPS/TLS configuration will be ignored if this is set.") - private boolean useTlsConfigFromEnvironment = false; - - @Option(name = {"--connectionTimeToLive"}, - description = "Maximum time to live for persistent connections. Specified as integer, in seconds.") - private long connectionTimeToLive = 15; - - @Option(name = {"--certificate"}, - description = "Path to a file containing a PEM encoded x509 certificate") - private String certificatePath; - - @Option(name = {"--privateKey"}, - description = "Path to a file containing a PEM encoded private key") - private String privateKeyPath; - - @Option(name = "--caCertificates", - description = "Path to a file containing a PEM encoded CA certificates") - private String caCertificatesPath; - - private final List<Header> parsedHeaders = new ArrayList<>(); - - int getWhenVerboseEnabledPrintMessageForEveryXDocuments() { - return whenVerboseEnabledPrintMessageForEveryXDocuments; - } - - public String getFile() { return fileArg; }; - - public boolean getVerbose() { return verboseArg; } - - public boolean getIgnoreConditionNotMet() { return ignoreConditionNotMet; } - - public boolean getAddRootElementToXml() { return addRootElementToXml; } - - SessionParams createSessionParams(boolean useJson) { - int minThrottleValue = useDynamicThrottlingArg ? 10 : 0; - Path privateKeyPath = Optional.ofNullable(this.privateKeyPath).map(Paths::get).orElse(null); - Path certificatePath = Optional.ofNullable(this.certificatePath).map(Paths::get).orElse(null); - Path caCertificatesPath = Optional.ofNullable(this.caCertificatesPath).map(Paths::get).orElse(null); - ConnectionParams.Builder connectionParamsBuilder = new ConnectionParams.Builder(); - parsedHeaders.forEach(header -> connectionParamsBuilder.addHeader(header.getName(), header.getValue())); - SessionParams.Builder builder = new SessionParams.Builder() - .setFeedParams( - new FeedParams.Builder() - .setDataFormat(useJson - ? FeedParams.DataFormat.JSON_UTF8 - : FeedParams.DataFormat.XML_UTF8) - .setRoute(routeArg) - .setMaxInFlightRequests(maxPendingOperationCountArg) - .setClientTimeout(timeoutArg, TimeUnit.SECONDS) - .setServerTimeout(timeoutArg, TimeUnit.SECONDS) - .setLocalQueueTimeOut(timeoutArg * 1000) - .setPriority(priorityArg) - .setMaxChunkSizeBytes(maxChunkSizeBytes) - .build() - ) - .setConnectionParams( - connectionParamsBuilder - .setHostnameVerifier(insecure ? NoopHostnameVerifier.INSTANCE : - SSLConnectionSocketFactory.getDefaultHostnameVerifier()) - .setUseCompression(useCompressionArg) - .setMaxRetries(noRetryArg ? 0 : 100) - .setMinTimeBetweenRetries(retrydelayArg, TimeUnit.SECONDS) - .setDryRun(validateArg) - .setTraceLevel(traceArg) - .setTraceEveryXOperation(traceEveryXOperation) - .setPrintTraceToStdErr(traceArg > 0) - .setNumPersistentConnectionsPerEndpoint(numPersistentConnectionsPerEndpoint) - .setCertificateAndPrivateKey(privateKeyPath, certificatePath) - .setCaCertificates(caCertificatesPath) - .setUseTlsConfigFromEnvironment(useTlsConfigFromEnvironment) - .setConnectionTimeToLive(Duration.ofSeconds(connectionTimeToLive)) - .build() - ) - // Enable dynamic throttling. - .setThrottlerMinSize(minThrottleValue) - .setClientQueueSize(maxPendingOperationCountArg); - if (endpointArg != null) { - try { - builder.addCluster(new Cluster.Builder() - .addEndpoint(Endpoint.create(new URL(endpointArg))) - .build()); - } - catch (MalformedURLException e) {} // already checked when parsing arguments - } - else { - Iterable<String> hosts = Splitter.on(',').trimResults().split(hostArg); - for (String host : hosts) { - builder.addCluster(new Cluster.Builder().addEndpoint(Endpoint.create(host, portArg, useTls)) - .build()); - } - } - return builder.build(); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/FormatInputStream.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/FormatInputStream.java deleted file mode 100644 index dd6fbc29e5f..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/FormatInputStream.java +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.runner; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.format.DataFormatDetector; -import com.fasterxml.jackson.core.format.DataFormatMatcher; -import com.fasterxml.jackson.core.format.MatchStrength; -import com.fasterxml.jackson.dataformat.xml.XmlFactory; - -import java.io.BufferedInputStream; -import java.io.ByteArrayInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.SequenceInputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.Optional; - -/** - * @author valerijf - */ -public class FormatInputStream { - - private InputStream inputStream; - private final Format format; - - /** - * Creates a single data input stream from either file or InputStream depending on which one is present. Preference - * for file if both present. Additionally also detects input data format of the result stream, throws - * IllegalArgumentException if unable to determine data format. - * - * @param stream InputStream of the data if present - * @param inputFile path to file to use as input - * @param addRootElementToXml to add vespafeed root element around the input data stream - * @throws IOException on errors - */ - public FormatInputStream(InputStream stream, Optional<String> inputFile, boolean addRootElementToXml) - throws IOException { - DataFormatDetector dataFormatDetector = new DataFormatDetector(new JsonFactory(), new XmlFactory()); - DataFormatMatcher formatMatcher; - - if (inputFile.isPresent()) { - try (FileInputStream fileInputStream = new FileInputStream(inputFile.get())) { - formatMatcher = dataFormatDetector.findFormat(fileInputStream); - } - inputStream = new FileInputStream(inputFile.get()); - - } else { - if (stream.available() == 0) - System.out.println("No data in stream yet and no file specified, waiting for data."); - - inputStream = stream.markSupported() ? stream : new BufferedInputStream(stream); - inputStream.mark(DataFormatDetector.DEFAULT_MAX_INPUT_LOOKAHEAD); - formatMatcher = dataFormatDetector.findFormat(inputStream); - inputStream.reset(); - } - - if (addRootElementToXml) { - inputStream = addVespafeedTag(inputStream); - format = Format.XML; - return; - } - - if (formatMatcher.getMatchStrength() == MatchStrength.INCONCLUSIVE - || formatMatcher.getMatchStrength() == MatchStrength.NO_MATCH) { - throw new IllegalArgumentException("Could not detect input format"); - } - - switch (formatMatcher.getMatchedFormatName().toLowerCase()) { - case "json": - format = Format.JSON; - break; - case "xml": - format = Format.XML; - break; - default: - throw new IllegalArgumentException("Unknown data format"); - } - } - - private static InputStream addVespafeedTag(InputStream inputStream) { - return new SequenceInputStream(Collections.enumeration(Arrays.asList( - new ByteArrayInputStream("<vespafeed>".getBytes()), inputStream, - new ByteArrayInputStream("</vespafeed>".getBytes()))) - ); - } - - public InputStream getInputStream() { - return inputStream; - } - - public Format getFormat() { - return format; - } - - public enum Format { - JSON, XML - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java deleted file mode 100644 index e3e90c8bbfc..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.runner; - -import com.yahoo.vespa.http.client.FeedClient; -import com.yahoo.vespa.http.client.FeedClientFactory; -import com.yahoo.vespa.http.client.SimpleLoggerResultCallback; -import com.yahoo.vespa.http.client.core.JsonReader; -import com.yahoo.vespa.http.client.core.XmlFeedReader; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.time.Clock; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author Einar M R Rosenvinge - * @author dybis - */ -public class Runner { - - /** - * Feed data from inputFile to session. - * - * @param feedClient where to send data to - * @param inputStream source of data - * @param isJson if input stream is of json formatted data - * @param numSent is updated while sending by this method - * @param verbose if true will print some information to stderr - * @return send time in ms, not including validating - */ - public static long send(FeedClient feedClient, - InputStream inputStream, - boolean isJson, - AtomicInteger numSent, - boolean verbose) { - Clock clock = Clock.systemUTC(); - if (verbose) - System.err.println("Now sending data."); - - long sendStartTime = clock.millis(); - if (isJson) { - JsonReader.read(inputStream, feedClient, numSent); - } else { - try { - XmlFeedReader.read(inputStream, feedClient, numSent); - } catch (Exception e) { - System.err.println("Stopped reading feed, got problems with XML: " + e.getMessage()); - } - } - - long sendTotalTime = clock.millis() - sendStartTime; - - if (verbose) - System.err.println("Waiting for all results, sent " + numSent.get() + " docs."); - - feedClient.close(); - if (verbose) - System.err.println("Session closed."); - return sendTotalTime; - } - - - public static void main(String[] args) throws IOException { - CommandLineArguments commandLineArgs = CommandLineArguments.build(args); - if (commandLineArgs == null) - System.exit(1); - - FormatInputStream formatInputStream = new FormatInputStream(System.in, - Optional.ofNullable(commandLineArgs.getFile()), - commandLineArgs.getAddRootElementToXml()); - - int intervalOfLogging = - commandLineArgs.getVerbose() - ? commandLineArgs.getWhenVerboseEnabledPrintMessageForEveryXDocuments() - : Integer.MAX_VALUE; - AtomicInteger numSent = new AtomicInteger(0); - SimpleLoggerResultCallback callback = new SimpleLoggerResultCallback(numSent, intervalOfLogging, commandLineArgs.getIgnoreConditionNotMet()); - - FeedClient feedClient = FeedClientFactory.create(commandLineArgs.createSessionParams(formatInputStream.getFormat()== FormatInputStream.Format.JSON), - callback); - - long sendTotalTimeMs = send(feedClient, - formatInputStream.getInputStream(), - formatInputStream.getFormat() == FormatInputStream.Format.JSON, - numSent, - commandLineArgs.getVerbose()); - - if (commandLineArgs.getVerbose()) { - System.err.println(feedClient.getStatsAsJson()); - double transferTimeSec = ((double) sendTotalTimeMs) / 1000.0; - if (transferTimeSec > 0) - System.err.printf("Docs/sec %.3f%n", numSent.get() / transferTimeSec); - - if (commandLineArgs.getFile() != null) { - double fileSizeMb = ((double) new File(commandLineArgs.getFile()).length()) / 1024.0 / 1024.0; - System.err.println("Sent " + fileSizeMb + " MB in " + transferTimeSec + " seconds."); - System.err.println("Speed: " + ((fileSizeMb / transferTimeSec) * 8.0) + " Mbits/sec, + HTTP overhead " + - "(not taking compression into account)"); - } - } - callback.printProgress(); - } - -} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/package-info.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/package-info.java deleted file mode 100644 index 8be767d67cf..00000000000 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/package-info.java +++ /dev/null @@ -1,2 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.runner; |