summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-05-14 15:01:40 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-05-14 15:01:40 +0200
commit97a3551624f32fbf3888edeca7499098e75857d7 (patch)
tree4dd5fd8a792b1a9acbdeef2823e6d370e599bdc4 /vespa-feed-client
parent861fc6eb317b8104562d6e0b7af568d5312f5ff1 (diff)
More draft
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/DocumentId.java78
-rw-r--r--vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClient.java13
-rw-r--r--vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClientBuilder.java64
-rw-r--r--vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java101
-rw-r--r--vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/OperationParameters.java5
-rw-r--r--vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/Result.java26
6 files changed, 255 insertions, 32 deletions
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/DocumentId.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/DocumentId.java
new file mode 100644
index 00000000000..c88c84ecffc
--- /dev/null
+++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/DocumentId.java
@@ -0,0 +1,78 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * @author jonmv
+ */
+public class DocumentId {
+
+ private final String documentType;
+ private final String namespace;
+ private final OptionalLong number;
+ private final Optional<String> group;
+ private final String userSpecific;
+
+ private DocumentId(String documentType, String namespace, OptionalLong number, Optional<String> group, String userSpecific) {
+ this.documentType = requireNonNull(documentType);
+ this.namespace = requireNonNull(namespace);
+ this.number = requireNonNull(number);
+ this.group = requireNonNull(group);
+ this.userSpecific = requireNonNull(userSpecific);
+ }
+
+ public static DocumentId of(String namespace, String documentType, String userSpecific) {
+ return new DocumentId(namespace, documentType, OptionalLong.empty(), Optional.empty(), userSpecific);
+ }
+
+ public static DocumentId of(String namespace, String documentType, long number, String userSpecific) {
+ return new DocumentId(namespace, documentType, OptionalLong.of(number), Optional.empty(), userSpecific);
+ }
+
+ public static DocumentId of(String namespace, String documentType, String group, String userSpecific) {
+ return new DocumentId(namespace, documentType, OptionalLong.empty(), Optional.of(group), userSpecific);
+ }
+
+ public static DocumentId of(String serialized) {
+ String[] parts = serialized.split(":");
+ while (parts.length >= 5 && parts[0].equals("id")) {
+ if (parts[3].startsWith("n="))
+ return DocumentId.of(parts[1], parts[2], Long.parseLong(parts[3]), parts[4]);
+ if (parts[3].startsWith("g="))
+ return DocumentId.of(parts[1], parts[2], parts[3], parts[4]);
+ else if (parts[3].isEmpty())
+ return DocumentId.of(parts[1], parts[2], parts[4]);
+ }
+ throw new IllegalArgumentException("Document ID must be on the form " +
+ "'id:<namespace>:<document-type>:[n=number|g=group]:<user-specific>', " +
+ "but was '" + serialized + "'");
+ }
+
+ public String documentType() {
+ return documentType;
+ }
+
+ public String namespace() {
+ return namespace;
+ }
+
+ public OptionalLong number() {
+ return number;
+ }
+
+ public Optional<String> group() {
+ return group;
+ }
+
+ public String userSpecific() {
+ return userSpecific;
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClient.java
index 2233b0a1a39..9d796230354 100644
--- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClient.java
@@ -2,19 +2,16 @@
package com.yahoo.vespa.feed.client;
import java.io.Closeable;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
/**
* @author bjorncs
+ * @author jonmv
*/
public interface FeedClient extends Closeable {
- Future<Result> put(String documentId, String documentJson, OperationParameters params, ResultCallback callback);
- Future<Result> remove(String documentId, OperationParameters params, ResultCallback callback);
- Future<Result> update(String documentId, String documentJson, OperationParameters params, ResultCallback callback);
- interface ResultCallback {
- void completed(Result result);
- void failed(FeedException e);
- }
+ CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params);
+ CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params);
+ CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params);
}
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClientBuilder.java
index 9e5f2a53a8d..0e746d51a51 100644
--- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClientBuilder.java
+++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClientBuilder.java
@@ -6,39 +6,81 @@ import javax.net.ssl.SSLContext;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Supplier;
+import static java.util.Objects.requireNonNull;
+
/**
* Builder for creating a {@link FeedClient} instance.
*
* @author bjorncs
+ * @author jonmv
*/
public class FeedClientBuilder {
+ final URI endpoint;
+ final Map<String, Supplier<String>> requestHeaders = new HashMap<>();
SSLContext sslContext;
HostnameVerifier hostnameVerifier;
- final Map<String, Supplier<String>> requestHeaders = new HashMap<>();
- URI endpoint;
Integer maxConnections;
+ Integer maxStreamsPerConnection;
- public static FeedClientBuilder create() { return new FeedClientBuilder(); }
+ public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(endpoint); }
- private FeedClientBuilder() {}
+ private FeedClientBuilder(URI endpoint) {
+ requireNonNull(endpoint.getHost());
+ this.endpoint = endpoint;
+ }
- public FeedClientBuilder setMaxConnection(int max) { this.maxConnections = max; return this; }
+ /**
+ * Sets the maximum number of connections this client will use.
+ *
+ * A reasonable value here is a small multiple of the numbers of containers in the
+ * cluster to feed, so load can be balanced across these.
+ * In general, this value should be kept as low as possible, but poor connectivity
+ * between feeder and cluster may also warrant a higher number of connections.
+ */
+ public FeedClientBuilder setMaxConnections(int max) {
+ if (max < 1) throw new IllegalArgumentException("Max connections must be at least 1, but was " + max);
+ this.maxConnections = max;
+ return this;
+ }
- public FeedClientBuilder setEndpoint(URI endpoint) { this.endpoint = endpoint; return this; }
+ /**
+ * Sets the maximum number of streams per HTTP/2 connection for this client.
+ *
+ * This determines the maximum number of concurrent, inflight requests for this client,
+ * which is {@code maxConnections * maxStreamsPerConnection}. Prefer more streams over
+ * more connections, when possible. The server's maximum is usually around 128-256.
+ */
+ public FeedClientBuilder setMaxStreamPerConnection(int max) {
+ if (max < 1) throw new IllegalArgumentException("Max streams per connection must be at least 1, but was " + max);
+ this.maxStreamsPerConnection = max;
+ return this;
+ }
- public FeedClientBuilder setSslContext(SSLContext context) { this.sslContext = context; return this; }
+ public FeedClientBuilder setSslContext(SSLContext context) {
+ this.sslContext = requireNonNull(context);
+ return this;
+ }
- public FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier) { this.hostnameVerifier = verifier; return this; }
+ public FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier) {
+ this.hostnameVerifier = requireNonNull(verifier);
+ return this;
+ }
- public FeedClientBuilder addRequestHeader(String name, String value) { return addRequestHeader(name, () -> value); }
+ public FeedClientBuilder addRequestHeader(String name, String value) {
+ return addRequestHeader(name, () -> requireNonNull(value));
+ }
public FeedClientBuilder addRequestHeader(String name, Supplier<String> valueSupplier) {
- this.requestHeaders.put(name, valueSupplier);
+ this.requestHeaders.put(requireNonNull(name), requireNonNull(valueSupplier));
return this;
}
- public FeedClient build() { return new HttpFeedClient(this); }
+ public FeedClient build() {
+ return new HttpFeedClient(this);
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java
index a7e39001117..6edad257d47 100644
--- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java
@@ -1,13 +1,18 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.feed.client;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.net.URIBuilder;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.TimeValue;
@@ -15,27 +20,38 @@ import org.apache.hc.core5.util.Timeout;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+import static java.util.Objects.requireNonNull;
+
/**
* HTTP implementation of {@link FeedClient}
*
* @author bjorncs
+ * @author jonmv
*/
class HttpFeedClient implements FeedClient {
private final CloseableHttpAsyncClient httpClient;
private final URI endpoint;
private final Map<String, Supplier<String>> requestHeaders;
+ private final int maxPendingRequests;
HttpFeedClient(FeedClientBuilder builder) {
this.httpClient = createHttpClient(builder);
this.endpoint = getEndpoint(builder);
this.requestHeaders = new HashMap<>(builder.requestHeaders);
+ this.maxPendingRequests = (builder.maxConnections != null ? builder.maxConnections : 4)
+ * (builder.maxStreamsPerConnection != null ? builder.maxStreamsPerConnection : 128);
+
+ this.httpClient.start();
}
private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) {
@@ -55,7 +71,7 @@ class HttpFeedClient implements FeedClient {
.setResponseTimeout(Timeout.ofMinutes(5))
.build())
.setH2Config(H2Config.custom()
- .setMaxConcurrentStreams(128)
+ .setMaxConcurrentStreams(builder.maxStreamsPerConnection != null ? builder.maxStreamsPerConnection : 128)
.setPushEnabled(false)
.build());
@@ -83,20 +99,89 @@ class HttpFeedClient implements FeedClient {
}
@Override
- public Future<Result> put(String documentId, String documentJson, OperationParameters params, ResultCallback callback) {
- return null;
+ public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
+ return send("POST", documentId, requireNonNull(documentJson), params);
}
@Override
- public Future<Result> remove(String documentId, OperationParameters params, ResultCallback callback) {
- return null;
+ public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
+ return send("PUT", documentId, requireNonNull(updateJson), params);
}
@Override
- public Future<Result> update(String documentId, String documentJson, OperationParameters params, ResultCallback callback) {
- return null;
+ public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
+ return send("DELETE", documentId, null, params);
}
@Override public void close() throws IOException { this.httpClient.close(); }
+ private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) {
+ SimpleHttpRequest request = new SimpleHttpRequest(method, operationUrl(endpoint, documentId, params));
+ requestHeaders.forEach(request::setHeader);
+ if (operationJson != null)
+ request.setBody(operationJson, ContentType.APPLICATION_JSON);
+
+ CompletableFuture<Result> future = new CompletableFuture<>();
+ httpClient.execute(new SimpleHttpRequest(method, endpoint),
+ new FutureCallback<SimpleHttpResponse>() {
+ @Override public void completed(SimpleHttpResponse response) {
+ Result result = toResult(response, documentId);
+ future.complete(result); // TODO: add retrying
+ }
+ @Override public void failed(Exception ex) {
+ Result result = new Result(Result.Type.failure, documentId, ex.getMessage(), null);
+ future.completeExceptionally(ex); // TODO: add retrying
+ }
+ @Override public void cancelled() {
+ Result result = new Result(Result.Type.cancelled, documentId, null, null);
+ future.cancel(false); // TODO: add retrying
+ }
+ });
+ return future;
+ }
+
+ static Result toResult(SimpleHttpResponse response, DocumentId documentId) {
+ return new Result(Result.Type.failure, documentId, null, null); // TODO: parse JSON and status code
+ }
+
+ static List<String> toPath(DocumentId documentId) {
+ List<String> path = new ArrayList<>();
+ path.add("document");
+ path.add("v1");
+ path.add(documentId.namespace());
+ path.add(documentId.documentType());
+ if (documentId.number().isPresent()) {
+ path.add("number");
+ path.add(Long.toUnsignedString(documentId.number().getAsLong()));
+ }
+ else if (documentId.group().isPresent()) {
+ path.add("group");
+ path.add(documentId.group().get());
+ }
+ else {
+ path.add("docid");
+ }
+ path.add(documentId.userSpecific());
+
+ return path;
+ }
+
+ static URI operationUrl(URI endpoint, DocumentId documentId, OperationParameters params) {
+ URIBuilder url = new URIBuilder(endpoint);
+ url.setPathSegments(toPath(documentId));
+
+ if (params.createIfNonExistent()) url.addParameter("create", "true");
+ params.testAndSetCondition().ifPresent(condition -> url.addParameter("condition", condition));
+ params.timeout().ifPresent(timeout -> url.addParameter("timeout", timeout.toMillis() + "ms"));
+ params.route().ifPresent(route -> url.addParameter("route", route));
+ params.tracelevel().ifPresent(tracelevel -> url.addParameter("tracelevel", Integer.toString(tracelevel)));
+
+ try {
+ return url.build();
+ }
+ catch (URISyntaxException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/OperationParameters.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/OperationParameters.java
index 977a4b2d44e..35756a93329 100644
--- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/OperationParameters.java
+++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/OperationParameters.java
@@ -3,9 +3,11 @@ package com.yahoo.vespa.feed.client;
import java.time.Duration;
import java.util.Optional;
+import java.util.OptionalInt;
/**
* @author bjorncs
+ * @author jonmv
*/
public class OperationParameters {
@@ -15,8 +17,11 @@ public class OperationParameters {
}
+ public boolean createIfNonExistent() { return false; }
public Optional<String> testAndSetCondition() { return Optional.empty(); }
public Optional<Duration> timeout() { return Optional.empty(); }
+ public Optional<String> route() { return Optional.empty(); }
+ public OptionalInt tracelevel() { return OptionalInt.empty(); }
public static class Builder {
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/Result.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/Result.java
index 3a3f695ca2b..da7f5580289 100644
--- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/Result.java
+++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/Result.java
@@ -5,15 +5,31 @@ import java.util.Optional;
/**
* @author bjorncs
+ * @author jonmv
*/
public class Result {
- public enum Type {
+ private final Type type;
+ private final DocumentId documentId;
+ private final String resultMessage;
+ private final String traceMessage;
+ Result(Type type, DocumentId documentId, String resultMessage, String traceMessage) {
+ this.type = type;
+ this.documentId = documentId;
+ this.resultMessage = resultMessage;
+ this.traceMessage = traceMessage;
}
- public Type type() { return null; }
- public String documentId() { return null; }
- public String resultMessage() { return null; }
- public Optional<String> traceMessage() { return Optional.empty(); }
+ public enum Type {
+ success,
+ cancelled,
+ failure
+ }
+
+ public Type type() { return type; }
+ public DocumentId documentId() { return documentId; }
+ public Optional<String> resultMessage() { return Optional.ofNullable(resultMessage); }
+ public Optional<String> traceMessage() { return Optional.ofNullable(traceMessage); }
+
}