diff options
6 files changed, 255 insertions, 32 deletions
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/DocumentId.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/DocumentId.java new file mode 100644 index 00000000000..c88c84ecffc --- /dev/null +++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/DocumentId.java @@ -0,0 +1,78 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.client; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static java.util.Objects.requireNonNull; + +/** + * @author jonmv + */ +public class DocumentId { + + private final String documentType; + private final String namespace; + private final OptionalLong number; + private final Optional<String> group; + private final String userSpecific; + + private DocumentId(String documentType, String namespace, OptionalLong number, Optional<String> group, String userSpecific) { + this.documentType = requireNonNull(documentType); + this.namespace = requireNonNull(namespace); + this.number = requireNonNull(number); + this.group = requireNonNull(group); + this.userSpecific = requireNonNull(userSpecific); + } + + public static DocumentId of(String namespace, String documentType, String userSpecific) { + return new DocumentId(namespace, documentType, OptionalLong.empty(), Optional.empty(), userSpecific); + } + + public static DocumentId of(String namespace, String documentType, long number, String userSpecific) { + return new DocumentId(namespace, documentType, OptionalLong.of(number), Optional.empty(), userSpecific); + } + + public static DocumentId of(String namespace, String documentType, String group, String userSpecific) { + return new DocumentId(namespace, documentType, OptionalLong.empty(), Optional.of(group), userSpecific); + } + + public static DocumentId of(String serialized) { + String[] parts = serialized.split(":"); + while (parts.length >= 5 && parts[0].equals("id")) { + if (parts[3].startsWith("n=")) + return DocumentId.of(parts[1], parts[2], Long.parseLong(parts[3]), parts[4]); + if (parts[3].startsWith("g=")) + return DocumentId.of(parts[1], parts[2], parts[3], parts[4]); + else if (parts[3].isEmpty()) + return DocumentId.of(parts[1], parts[2], parts[4]); + } + throw new IllegalArgumentException("Document ID must be on the form " + + "'id:<namespace>:<document-type>:[n=number|g=group]:<user-specific>', " + + "but was '" + serialized + "'"); + } + + public String documentType() { + return documentType; + } + + public String namespace() { + return namespace; + } + + public OptionalLong number() { + return number; + } + + public Optional<String> group() { + return group; + } + + public String userSpecific() { + return userSpecific; + } + +} diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClient.java index 2233b0a1a39..9d796230354 100644 --- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClient.java @@ -2,19 +2,16 @@ package com.yahoo.vespa.feed.client; import java.io.Closeable; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; /** * @author bjorncs + * @author jonmv */ public interface FeedClient extends Closeable { - Future<Result> put(String documentId, String documentJson, OperationParameters params, ResultCallback callback); - Future<Result> remove(String documentId, OperationParameters params, ResultCallback callback); - Future<Result> update(String documentId, String documentJson, OperationParameters params, ResultCallback callback); - interface ResultCallback { - void completed(Result result); - void failed(FeedException e); - } + CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params); + CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params); + CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params); } diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClientBuilder.java index 9e5f2a53a8d..0e746d51a51 100644 --- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClientBuilder.java +++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/FeedClientBuilder.java @@ -6,39 +6,81 @@ import javax.net.ssl.SSLContext; import java.net.URI; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.function.Supplier; +import static java.util.Objects.requireNonNull; + /** * Builder for creating a {@link FeedClient} instance. * * @author bjorncs + * @author jonmv */ public class FeedClientBuilder { + final URI endpoint; + final Map<String, Supplier<String>> requestHeaders = new HashMap<>(); SSLContext sslContext; HostnameVerifier hostnameVerifier; - final Map<String, Supplier<String>> requestHeaders = new HashMap<>(); - URI endpoint; Integer maxConnections; + Integer maxStreamsPerConnection; - public static FeedClientBuilder create() { return new FeedClientBuilder(); } + public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(endpoint); } - private FeedClientBuilder() {} + private FeedClientBuilder(URI endpoint) { + requireNonNull(endpoint.getHost()); + this.endpoint = endpoint; + } - public FeedClientBuilder setMaxConnection(int max) { this.maxConnections = max; return this; } + /** + * Sets the maximum number of connections this client will use. + * + * A reasonable value here is a small multiple of the numbers of containers in the + * cluster to feed, so load can be balanced across these. + * In general, this value should be kept as low as possible, but poor connectivity + * between feeder and cluster may also warrant a higher number of connections. + */ + public FeedClientBuilder setMaxConnections(int max) { + if (max < 1) throw new IllegalArgumentException("Max connections must be at least 1, but was " + max); + this.maxConnections = max; + return this; + } - public FeedClientBuilder setEndpoint(URI endpoint) { this.endpoint = endpoint; return this; } + /** + * Sets the maximum number of streams per HTTP/2 connection for this client. + * + * This determines the maximum number of concurrent, inflight requests for this client, + * which is {@code maxConnections * maxStreamsPerConnection}. Prefer more streams over + * more connections, when possible. The server's maximum is usually around 128-256. + */ + public FeedClientBuilder setMaxStreamPerConnection(int max) { + if (max < 1) throw new IllegalArgumentException("Max streams per connection must be at least 1, but was " + max); + this.maxStreamsPerConnection = max; + return this; + } - public FeedClientBuilder setSslContext(SSLContext context) { this.sslContext = context; return this; } + public FeedClientBuilder setSslContext(SSLContext context) { + this.sslContext = requireNonNull(context); + return this; + } - public FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier) { this.hostnameVerifier = verifier; return this; } + public FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier) { + this.hostnameVerifier = requireNonNull(verifier); + return this; + } - public FeedClientBuilder addRequestHeader(String name, String value) { return addRequestHeader(name, () -> value); } + public FeedClientBuilder addRequestHeader(String name, String value) { + return addRequestHeader(name, () -> requireNonNull(value)); + } public FeedClientBuilder addRequestHeader(String name, Supplier<String> valueSupplier) { - this.requestHeaders.put(name, valueSupplier); + this.requestHeaders.put(requireNonNull(name), requireNonNull(valueSupplier)); return this; } - public FeedClient build() { return new HttpFeedClient(this); } + public FeedClient build() { + return new HttpFeedClient(this); + } + } diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java index a7e39001117..6edad257d47 100644 --- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java @@ -1,13 +1,18 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.feed.client; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.message.BasicHeader; import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.net.URIBuilder; import org.apache.hc.core5.pool.PoolConcurrencyPolicy; import org.apache.hc.core5.reactor.IOReactorConfig; import org.apache.hc.core5.util.TimeValue; @@ -15,27 +20,38 @@ import org.apache.hc.core5.util.Timeout; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import static java.util.Objects.requireNonNull; + /** * HTTP implementation of {@link FeedClient} * * @author bjorncs + * @author jonmv */ class HttpFeedClient implements FeedClient { private final CloseableHttpAsyncClient httpClient; private final URI endpoint; private final Map<String, Supplier<String>> requestHeaders; + private final int maxPendingRequests; HttpFeedClient(FeedClientBuilder builder) { this.httpClient = createHttpClient(builder); this.endpoint = getEndpoint(builder); this.requestHeaders = new HashMap<>(builder.requestHeaders); + this.maxPendingRequests = (builder.maxConnections != null ? builder.maxConnections : 4) + * (builder.maxStreamsPerConnection != null ? builder.maxStreamsPerConnection : 128); + + this.httpClient.start(); } private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) { @@ -55,7 +71,7 @@ class HttpFeedClient implements FeedClient { .setResponseTimeout(Timeout.ofMinutes(5)) .build()) .setH2Config(H2Config.custom() - .setMaxConcurrentStreams(128) + .setMaxConcurrentStreams(builder.maxStreamsPerConnection != null ? builder.maxStreamsPerConnection : 128) .setPushEnabled(false) .build()); @@ -83,20 +99,89 @@ class HttpFeedClient implements FeedClient { } @Override - public Future<Result> put(String documentId, String documentJson, OperationParameters params, ResultCallback callback) { - return null; + public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { + return send("POST", documentId, requireNonNull(documentJson), params); } @Override - public Future<Result> remove(String documentId, OperationParameters params, ResultCallback callback) { - return null; + public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { + return send("PUT", documentId, requireNonNull(updateJson), params); } @Override - public Future<Result> update(String documentId, String documentJson, OperationParameters params, ResultCallback callback) { - return null; + public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { + return send("DELETE", documentId, null, params); } @Override public void close() throws IOException { this.httpClient.close(); } + private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) { + SimpleHttpRequest request = new SimpleHttpRequest(method, operationUrl(endpoint, documentId, params)); + requestHeaders.forEach(request::setHeader); + if (operationJson != null) + request.setBody(operationJson, ContentType.APPLICATION_JSON); + + CompletableFuture<Result> future = new CompletableFuture<>(); + httpClient.execute(new SimpleHttpRequest(method, endpoint), + new FutureCallback<SimpleHttpResponse>() { + @Override public void completed(SimpleHttpResponse response) { + Result result = toResult(response, documentId); + future.complete(result); // TODO: add retrying + } + @Override public void failed(Exception ex) { + Result result = new Result(Result.Type.failure, documentId, ex.getMessage(), null); + future.completeExceptionally(ex); // TODO: add retrying + } + @Override public void cancelled() { + Result result = new Result(Result.Type.cancelled, documentId, null, null); + future.cancel(false); // TODO: add retrying + } + }); + return future; + } + + static Result toResult(SimpleHttpResponse response, DocumentId documentId) { + return new Result(Result.Type.failure, documentId, null, null); // TODO: parse JSON and status code + } + + static List<String> toPath(DocumentId documentId) { + List<String> path = new ArrayList<>(); + path.add("document"); + path.add("v1"); + path.add(documentId.namespace()); + path.add(documentId.documentType()); + if (documentId.number().isPresent()) { + path.add("number"); + path.add(Long.toUnsignedString(documentId.number().getAsLong())); + } + else if (documentId.group().isPresent()) { + path.add("group"); + path.add(documentId.group().get()); + } + else { + path.add("docid"); + } + path.add(documentId.userSpecific()); + + return path; + } + + static URI operationUrl(URI endpoint, DocumentId documentId, OperationParameters params) { + URIBuilder url = new URIBuilder(endpoint); + url.setPathSegments(toPath(documentId)); + + if (params.createIfNonExistent()) url.addParameter("create", "true"); + params.testAndSetCondition().ifPresent(condition -> url.addParameter("condition", condition)); + params.timeout().ifPresent(timeout -> url.addParameter("timeout", timeout.toMillis() + "ms")); + params.route().ifPresent(route -> url.addParameter("route", route)); + params.tracelevel().ifPresent(tracelevel -> url.addParameter("tracelevel", Integer.toString(tracelevel))); + + try { + return url.build(); + } + catch (URISyntaxException e) { + throw new IllegalStateException(e); + } + } + } diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/OperationParameters.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/OperationParameters.java index 977a4b2d44e..35756a93329 100644 --- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/OperationParameters.java +++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/OperationParameters.java @@ -3,9 +3,11 @@ package com.yahoo.vespa.feed.client; import java.time.Duration; import java.util.Optional; +import java.util.OptionalInt; /** * @author bjorncs + * @author jonmv */ public class OperationParameters { @@ -15,8 +17,11 @@ public class OperationParameters { } + public boolean createIfNonExistent() { return false; } public Optional<String> testAndSetCondition() { return Optional.empty(); } public Optional<Duration> timeout() { return Optional.empty(); } + public Optional<String> route() { return Optional.empty(); } + public OptionalInt tracelevel() { return OptionalInt.empty(); } public static class Builder { diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/Result.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/Result.java index 3a3f695ca2b..da7f5580289 100644 --- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/Result.java +++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/Result.java @@ -5,15 +5,31 @@ import java.util.Optional; /** * @author bjorncs + * @author jonmv */ public class Result { - public enum Type { + private final Type type; + private final DocumentId documentId; + private final String resultMessage; + private final String traceMessage; + Result(Type type, DocumentId documentId, String resultMessage, String traceMessage) { + this.type = type; + this.documentId = documentId; + this.resultMessage = resultMessage; + this.traceMessage = traceMessage; } - public Type type() { return null; } - public String documentId() { return null; } - public String resultMessage() { return null; } - public Optional<String> traceMessage() { return Optional.empty(); } + public enum Type { + success, + cancelled, + failure + } + + public Type type() { return type; } + public DocumentId documentId() { return documentId; } + public Optional<String> resultMessage() { return Optional.ofNullable(resultMessage); } + public Optional<String> traceMessage() { return Optional.ofNullable(traceMessage); } + } |