aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2022-05-03 07:30:55 +0200
committerjonmv <venstad@gmail.com>2022-05-03 07:30:55 +0200
commit3592e408848787f75e721bf2d6f99f3f010f6610 (patch)
tree8a41d51e4bbc420bc1dbc65d76e79e3f8ab423d3
parente046ae79779261b09f85bcf4c04c906b83075775 (diff)
Revert "Merge pull request #22394 from vespa-engine/revert-22374-jonmv/remove-last-controller-jersey-client"
This reverts commit e046ae79779261b09f85bcf4c04c906b83075775, reversing changes made to d2066c0a0c04e2aa2ada12a5c85f5eae9ff65b02.
-rw-r--r--configserver/pom.xml74
-rw-r--r--container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java3
-rw-r--r--container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java26
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java27
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java19
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/IssueHandler.java21
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/IssueInfo.java65
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/MockIssueHandler.java20
-rw-r--r--hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java29
-rw-r--r--http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java15
-rw-r--r--http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java8
-rw-r--r--http-utils/src/main/java/ai/vespa/util/http/hc5/DefaultHttpClientBuilder.java49
-rw-r--r--yolean/abi-spec.json30
-rw-r--r--yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java64
-rw-r--r--yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java101
15 files changed, 396 insertions, 155 deletions
diff --git a/configserver/pom.xml b/configserver/pom.xml
index 131e4503f6d..99307d1b2b8 100644
--- a/configserver/pom.xml
+++ b/configserver/pom.xml
@@ -189,39 +189,11 @@
<scope>provided</scope>
</dependency>
<dependency>
- <!-- Do not remove, as long as this is provided by jdisc and configserver uses jersey-client -->
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <!-- Do not remove, as long as this is provided by jdisc and configserver uses jersey-client -->
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.jaxrs</groupId>
- <artifactId>jackson-jaxrs-json-provider</artifactId>
- <exclusions>
- <exclusion>
- <!-- Conflicts with javax.activation:javax.activation-api:1.2.0, which is "exported" via jdisc_core. -->
- <groupId>jakarta.activation</groupId>
- <artifactId>jakarta.activation-api</artifactId>
- </exclusion>
- <exclusion>
- <!-- Conflicts with javax.xml.bind:jaxb-api:2.3, which is "exported" via jdisc_core.-->
- <groupId>jakarta.xml.bind</groupId>
- <artifactId>jakarta.xml.bind-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
@@ -241,52 +213,6 @@
<version>${project.version}</version>
</dependency>
- <!-- Jersey, needed by orchestrator -->
- <dependency>
- <groupId>javax.ws.rs</groupId>
- <artifactId>javax.ws.rs-api</artifactId>
- <scope>provided</scope> <!-- TODO: Vespa 8: Set to compile if we get rid of the javax.ws.rs-api bundle -->
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.core</groupId>
- <artifactId>jersey-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.core</groupId>
- <artifactId>jersey-server</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.glassfish.jersey.media</groupId>
- <artifactId>jersey-media-jaxb</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.ext</groupId>
- <artifactId>jersey-proxy-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.media</groupId>
- <artifactId>jersey-media-json-jackson</artifactId>
- <exclusions>
- <!-- Prevent embedding deps provided by jdisc -->
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <!-- Not needed by configserver, but by controller. Also pulls in mimepull. -->
- <groupId>org.glassfish.jersey.media</groupId>
- <artifactId>jersey-media-multipart</artifactId>
- </dependency>
- <!-- Jersey END -->
-
</dependencies>
<build>
<plugins>
diff --git a/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java b/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java
index be8ba669ec0..9da16744eec 100644
--- a/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java
+++ b/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java
@@ -7,10 +7,9 @@ import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.vespa.config.content.DistributionConfig;
-import com.yahoo.vespa.config.content.LoadTypeConfig;
/**
- * Lets a lazily initialised DocumentAccess forwarding to a real MessageBusDocumentAccess be injected in containers.
+ * Lets a lazily initialised DocumentAccess that forwards to a MessageBusDocumentAccess be injected in containers.
*
* @author jonmv
*/
diff --git a/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java b/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java
index 7c5bebc47e8..c708e87e6c5 100644
--- a/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java
+++ b/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java
@@ -20,8 +20,8 @@ import com.yahoo.documentapi.messagebus.MessageBusParams;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.vespa.config.content.DistributionConfig;
+import com.yahoo.yolean.concurrent.Memoized;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -37,8 +37,7 @@ public class VespaDocumentAccess extends DocumentAccess {
private final MessageBusParams parameters;
- private final AtomicReference<DocumentAccess> delegate = new AtomicReference<>();
- private boolean shutDown = false;
+ private final Memoized<DocumentAccess, RuntimeException> delegate;
VespaDocumentAccess(DocumentmanagerConfig documentmanagerConfig,
String slobroksConfigId,
@@ -51,19 +50,11 @@ public class VespaDocumentAccess extends DocumentAccess {
this.parameters.setDocumentmanagerConfig(documentmanagerConfig);
this.parameters.getRPCNetworkParams().setSlobrokConfigId(slobroksConfigId);
this.parameters.getMessageBusParams().setMessageBusConfig(messagebusConfig);
+ this.delegate = new Memoized<>(() -> new MessageBusDocumentAccess(parameters), DocumentAccess::shutdown);
}
public DocumentAccess delegate() {
- DocumentAccess access = delegate.getAcquire();
- return access != null ? access : delegate.updateAndGet(value -> {
- if (value != null)
- return value;
-
- if (shutDown)
- throw new IllegalStateException("This document access has been shut down");
-
- return new MessageBusDocumentAccess(parameters);
- });
+ return delegate.get();
}
@Override
@@ -72,14 +63,7 @@ public class VespaDocumentAccess extends DocumentAccess {
}
void protectedShutdown() {
- delegate.updateAndGet(access -> {
- super.shutdown();
- shutDown = true;
- if (access != null)
- access.shutdown();
-
- return null;
- });
+ delegate.close();
}
@Override
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java
index cb28807ac73..8b3b2664cd1 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java
@@ -7,6 +7,9 @@ import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.shared.NullNetwork;
+import com.yahoo.yolean.concurrent.Memoized;
+
+import java.util.concurrent.atomic.AtomicReference;
/**
* Holds a reference to a singleton {@link NetworkMultiplexer}.
@@ -15,21 +18,17 @@ import com.yahoo.messagebus.shared.NullNetwork;
*/
public class NetworkMultiplexerHolder extends AbstractComponent {
- private final Object monitor = new Object();
- private boolean destroyed = false;
- private NetworkMultiplexer net;
+ private final AtomicReference<RPCNetworkParams> params = new AtomicReference<>();
+ private final Memoized<NetworkMultiplexer, RuntimeException> net = new Memoized<>(() -> NetworkMultiplexer.shared(newNetwork(params.get())),
+ NetworkMultiplexer::disown);
/** Get the singleton RPCNetworkAdapter, creating it if this hasn't yet been done. */
public NetworkMultiplexer get(RPCNetworkParams params) {
- synchronized (monitor) {
- if (destroyed)
- throw new IllegalStateException("Component already destroyed");
-
- return net = net != null ? net : NetworkMultiplexer.shared(newNetwork(params));
- }
+ this.params.set(params);
+ return net.get();
}
- private Network newNetwork(RPCNetworkParams params) {
+ private static Network newNetwork(RPCNetworkParams params) {
return params.getSlobroksConfig() != null && params.getSlobroksConfig().slobrok().isEmpty()
? new NullNetwork() // For LocalApplication, test setup.
: new RPCNetwork(params);
@@ -37,13 +36,7 @@ public class NetworkMultiplexerHolder extends AbstractComponent {
@Override
public void deconstruct() {
- synchronized (monitor) {
- if (net != null) {
- net.disown();
- net = null;
- }
- destroyed = true;
- }
+ net.close();
}
}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index 91a3181cb68..3045247a6d3 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -29,6 +29,7 @@ import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
import com.yahoo.vespa.config.content.DistributionConfig;
import com.yahoo.vespa.config.content.LoadTypeConfig;
+import com.yahoo.yolean.concurrent.Memoized;
import java.util.HashMap;
import java.util.Map;
@@ -54,9 +55,7 @@ public final class SessionCache extends AbstractComponent {
private static final Logger log = Logger.getLogger(SessionCache.class.getName());
- private final Object monitor = new Object();
- private Supplier<SharedMessageBus> messageBuses;
- private SharedMessageBus messageBus;
+ private final Memoized<SharedMessageBus, RuntimeException> messageBus;
private final Object intermediateLock = new Object();
private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
@@ -96,24 +95,18 @@ public final class SessionCache extends AbstractComponent {
public SessionCache(Supplier<NetworkMultiplexer> net, ContainerMbusConfig containerMbusConfig,
MessagebusConfig messagebusConfig, Protocol protocol) {
- this.messageBuses = () -> createSharedMessageBus(net.get(), containerMbusConfig, messagebusConfig, protocol);
+ this.messageBus = new Memoized<>(() -> createSharedMessageBus(net.get(), containerMbusConfig, messagebusConfig, protocol),
+ SharedMessageBus::release);
}
@Override
public void deconstruct() {
- synchronized (monitor) {
- messageBuses = () -> { throw new IllegalStateException("Session cache already deconstructed"); };
-
- if (messageBus != null)
- messageBus.release();
- }
+ messageBus.close();
}
// Lazily create shared message bus.
private SharedMessageBus bus() {
- synchronized (monitor) {
- return messageBus = messageBus != null ? messageBus : messageBuses.get();
- }
+ return messageBus.get();
}
private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net,
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/IssueHandler.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/IssueHandler.java
index dc8b22ac32d..8123b6f2ce6 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/IssueHandler.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/IssueHandler.java
@@ -2,8 +2,13 @@
package com.yahoo.vespa.hosted.controller.api.integration.organization;
+import com.yahoo.vespa.hosted.controller.api.integration.jira.JiraIssue;
+
+import java.io.InputStream;
import java.time.Duration;
+import java.util.List;
import java.util.Optional;
+import java.util.function.Supplier;
/**
* @author jonmv
@@ -19,12 +24,22 @@ public interface IssueHandler {
IssueId file(Issue issue);
/**
+ * Returns all open issues similar to the given.
+ *
+ * @param issue The issue to search for; relevant fields are the summary and the owner (propertyId).
+ * @return All open, similar issues.
+ */
+ List<IssueInfo> findAllBySimilarity(Issue issue);
+
+ /**
* Returns the ID of this issue, if it exists and is open, based on a similarity search.
*
* @param issue The issue to search for; relevant fields are the summary and the owner (propertyId).
* @return ID of the issue, if it is found.
*/
- Optional<IssueId> findBySimilarity(Issue issue);
+ default Optional<IssueId> findBySimilarity(Issue issue) {
+ return findAllBySimilarity(issue).stream().findFirst().map(IssueInfo::id);
+ }
/**
* Update the description of the issue with the given ID.
@@ -108,4 +123,8 @@ public interface IssueHandler {
* @throws RuntimeException exception if project not found
*/
ProjectInfo projectInfo(String projectKey);
+
+ /** Upload an attachment to the issue, with indicated filename, from the given input stream. */
+ void addAttachment(IssueId id, String filename, Supplier<InputStream> contentAsStream);
+
}
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/IssueInfo.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/IssueInfo.java
new file mode 100644
index 00000000000..52c022bebdf
--- /dev/null
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/IssueInfo.java
@@ -0,0 +1,65 @@
+package com.yahoo.vespa.hosted.controller.api.integration.organization;
+
+import com.yahoo.vespa.hosted.controller.api.integration.organization.IssueId;
+import com.yahoo.vespa.hosted.controller.api.integration.organization.User;
+
+import java.time.Instant;
+import java.util.Optional;
+
+/**
+ * Information about a stored issue.
+ *
+ * @author jonmv
+ */
+public class IssueInfo {
+
+ private final IssueId id;
+ private final Instant updated;
+ private final Status status;
+ private final User assignee;
+
+ public IssueInfo(IssueId id, Instant updated, Status status, User assignee) {
+ this.id = id;
+ this.updated = updated;
+ this.status = status;
+ this.assignee = assignee;
+ }
+
+ public IssueId id() {
+ return id;
+ }
+
+ public Instant updated() {
+ return updated;
+ }
+
+ public Status status() {
+ return status;
+ }
+
+ public Optional<User> assignee() {
+ return Optional.ofNullable(assignee);
+ }
+
+
+ public enum Status {
+
+ toDo("To Do"),
+ inProgress("In Progress"),
+ done("Done"),
+ noCategory("No Category");
+
+ private final String value;
+
+ Status(String value) { this.value = value; }
+
+ public static Status fromValue(String value) {
+ for (Status status : Status.values())
+ if (status.value.equals(value))
+ return status;
+ throw new IllegalArgumentException(value + " is not a valid status.");
+ }
+
+ }
+
+}
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/MockIssueHandler.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/MockIssueHandler.java
index 257d2ff5e67..a62f43d1cf5 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/MockIssueHandler.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/organization/MockIssueHandler.java
@@ -2,7 +2,9 @@
package com.yahoo.vespa.hosted.controller.api.integration.organization;
import com.google.inject.Inject;
+import com.yahoo.vespa.hosted.controller.api.integration.organization.IssueInfo.Status;
+import java.io.InputStream;
import java.net.URI;
import java.time.Clock;
import java.time.Duration;
@@ -14,6 +16,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
@@ -24,6 +27,7 @@ public class MockIssueHandler implements IssueHandler {
private final Clock clock;
private final AtomicLong counter = new AtomicLong();
private final Map<IssueId, MockIssue> issues = new HashMap<>();
+ private final Map<IssueId, Map<String, InputStream>> attachments = new HashMap<>();
private final Map<String, ProjectInfo> projects = new HashMap<>();
@Inject
@@ -45,11 +49,14 @@ public class MockIssueHandler implements IssueHandler {
}
@Override
- public Optional<IssueId> findBySimilarity(Issue issue) {
+ public List<IssueInfo> findAllBySimilarity(Issue issue) {
return issues.entrySet().stream()
- .filter(entry -> entry.getValue().issue.summary().equals(issue.summary()))
- .findFirst()
- .map(Map.Entry::getKey);
+ .filter(entry -> entry.getValue().issue.summary().equals(issue.summary()))
+ .map(entry -> new IssueInfo(entry.getKey(),
+ entry.getValue().updated,
+ entry.getValue().isOpen() ? Status.toDo : Status.done,
+ entry.getValue().assignee))
+ .collect(Collectors.toList());
}
@Override
@@ -118,6 +125,11 @@ public class MockIssueHandler implements IssueHandler {
return projects.get(projectKey);
}
+ @Override
+ public void addAttachment(IssueId id, String filename, Supplier<InputStream> contentAsStream) {
+ attachments.computeIfAbsent(id, __ -> new HashMap<>()).put(filename, contentAsStream.get());
+ }
+
public MockIssueHandler close(IssueId issueId) {
issues.get(issueId).open = false;
touch(issueId);
diff --git a/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java b/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java
index f155cbbce07..8c858437ad7 100644
--- a/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java
+++ b/hosted-api/src/main/java/ai/vespa/hosted/api/MultiPartStreamer.java
@@ -1,6 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.hosted.api;
+import com.yahoo.yolean.Exceptions;
+
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -51,26 +53,26 @@ public class MultiPartStreamer {
/** Adds the given data as a named part in this, using the given content type. */
public MultiPartStreamer addData(String name, String type, String data) {
- streams.add(() -> separator(name, type));
- streams.add(() -> asStream(data));
-
- return this;
+ return addData(name, type, null, () -> asStream(data));
}
/** Adds the given data as a named part in this, using the {@code "application/octet-stream" content type}. */
public MultiPartStreamer addBytes(String name, byte[] bytes) {
- streams.add(() -> separator(name, "application/octet-stream"));
- streams.add(() -> new ByteArrayInputStream(bytes));
+ return addData(name, "application/octet-stream", null, () -> new ByteArrayInputStream(bytes));
+ }
+
+ /** Adds the given data as a named part in this, using the given content type. */
+ public MultiPartStreamer addData(String name, String type, String filename, Supplier<InputStream> data) {
+ streams.add(() -> separator(name, filename, type));
+ streams.add(data);
return this;
}
/** Adds the contents of the file at the given path as a named part in this. */
public MultiPartStreamer addFile(String name, Path path) {
- streams.add(() -> separator(name, path));
- streams.add(() -> asStream(path));
-
- return this;
+ String type = Exceptions.uncheck(() -> Files.probeContentType(path));
+ return addData(name, type != null ? type : "application/octet-stream", path.getFileName().toString(), () -> asStream(path));
}
/**
@@ -107,16 +109,15 @@ public class MultiPartStreamer {
}
/** Returns the separator to put between one part and the next, when this is a string. */
- private InputStream separator(String name, String contentType) {
- return asStream(disposition(name) + type(contentType));
+ private InputStream separator(String name, String filename, String contentType) {
+ return asStream(disposition(name) + (filename == null ? "" : "; filename=\"" + filename + "\"") + type(contentType));
}
/** Returns the separator to put between one part and the next, when this is a file. */
private InputStream separator(String name, Path path) {
try {
String contentType = Files.probeContentType(path);
- return asStream(disposition(name) + "; filename=\"" + path.getFileName() + "\"" +
- type(contentType != null ? contentType : "application/octet-stream"));
+ return separator(name, path.getFileName().toString(), contentType != null ? contentType : "application/octet-stream");
}
catch (IOException e) {
throw new UncheckedIOException(e);
diff --git a/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java b/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java
index 68741d6d509..6a76ef65082 100644
--- a/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java
+++ b/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java
@@ -79,9 +79,9 @@ public abstract class AbstractHttpClient implements HttpClient {
.asURI())
.build();
builder.headers.forEach((name, values) -> values.forEach(value -> request.setHeader(name, value)));
- request.setEntity(builder.entity);
- try {
+ try (HttpEntity entity = builder.entity.get()) {
+ request.setEntity(entity);
try {
return handler.apply(execute(request, contextWithTimeout(builder)), request);
}
@@ -90,16 +90,15 @@ public abstract class AbstractHttpClient implements HttpClient {
throw RetryException.wrap(e, request);
}
}
+ catch (IOException e) {
+ throw new UncheckedIOException("failed closing request entity", e);
+ }
catch (RetryException e) {
if (thrown == null)
thrown = e.getCause();
else
thrown.addSuppressed(e.getCause());
- if (builder.entity != null && ! builder.entity.isRepeatable()) {
- log.log(WARNING, "Cannot retry " + request + " as entity is not repeatable");
- break;
- }
log.log(FINE, e.getCause(), () -> request + " failed; will retry");
}
}
@@ -152,7 +151,7 @@ public abstract class AbstractHttpClient implements HttpClient {
private HttpURL.Query query = Query.empty();
private List<Supplier<Query>> dynamicQuery = new ArrayList<>();
private Map<String, List<String>> headers = new LinkedHashMap<>();
- private HttpEntity entity;
+ private Supplier<HttpEntity> entity = () -> null;
private RequestConfig config = HttpClient.defaultRequestConfig;
private ResponseVerifier verifier = HttpClient.throwOnError;
private ExceptionHandler catcher = HttpClient.retryAll;
@@ -178,7 +177,7 @@ public abstract class AbstractHttpClient implements HttpClient {
}
@Override
- public RequestBuilder body(HttpEntity entity) {
+ public RequestBuilder body(Supplier<HttpEntity> entity) {
this.entity = requireNonNull(entity);
return this;
}
diff --git a/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java b/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java
index b41b16c25be..16a419bf324 100644
--- a/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java
+++ b/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java
@@ -78,7 +78,13 @@ public interface HttpClient extends Closeable {
RequestBuilder body(byte[] json);
/** Sets the request body. */
- RequestBuilder body(HttpEntity entity);
+ default RequestBuilder body(HttpEntity entity) {
+ if (entity.isRepeatable()) return body(() -> entity);
+ throw new IllegalArgumentException("entitiy must be repeatable, or a supplier must be used");
+ }
+
+ /** Sets the request body. */
+ RequestBuilder body(Supplier<HttpEntity> entity);
/** Sets query parameters without a value, like {@code ?debug&recursive}. */
default RequestBuilder emptyParameters(String... keys) {
diff --git a/http-utils/src/main/java/ai/vespa/util/http/hc5/DefaultHttpClientBuilder.java b/http-utils/src/main/java/ai/vespa/util/http/hc5/DefaultHttpClientBuilder.java
new file mode 100644
index 00000000000..8ad9d63cd1a
--- /dev/null
+++ b/http-utils/src/main/java/ai/vespa/util/http/hc5/DefaultHttpClientBuilder.java
@@ -0,0 +1,49 @@
+package ai.vespa.util.http.hc5;
+
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
+import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactoryBuilder;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.util.Timeout;
+
+import javax.net.ssl.SSLContext;
+import java.time.Duration;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * Like {@link VespaHttpClientBuilder}, but with standard TLS based on provided SSL context.
+ *
+ * @author jonmv
+ */
+public class DefaultHttpClientBuilder {
+
+ public static final Duration connectTimeout = Duration.ofSeconds(5);
+ public static final Duration socketTimeout = Duration.ofSeconds(5);
+
+ private DefaultHttpClientBuilder() { }
+
+ public static HttpClientBuilder create(SSLContext sslContext, String userAgent) {
+ return create(() -> sslContext, userAgent);
+ }
+
+ /** Creates an HTTP client builder with the given SSL context, and using the provided timeouts for requests where config is not overridden. */
+ public static HttpClientBuilder create(Supplier<SSLContext> sslContext, String userAgent) {
+ return HttpClientBuilder.create()
+ .setConnectionManager(PoolingHttpClientConnectionManagerBuilder
+ .create()
+ .setSSLSocketFactory(SSLConnectionSocketFactoryBuilder
+ .create()
+ .setSslContext(sslContext.get())
+ .build())
+ .build())
+ .setUserAgent(userAgent)
+ .disableCookieManagement()
+ .disableAutomaticRetries()
+ .disableAuthCaching();
+ }
+
+}
diff --git a/yolean/abi-spec.json b/yolean/abi-spec.json
index 6285cc54118..553a8aa61e1 100644
--- a/yolean/abi-spec.json
+++ b/yolean/abi-spec.json
@@ -234,6 +234,36 @@
],
"fields": []
},
+ "com.yahoo.yolean.concurrent.Memoized$Closer": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "interface",
+ "abstract"
+ ],
+ "methods": [
+ "public abstract void close(java.lang.Object)"
+ ],
+ "fields": []
+ },
+ "com.yahoo.yolean.concurrent.Memoized": {
+ "superClass": "java.lang.Object",
+ "interfaces": [
+ "java.util.function.Supplier",
+ "java.lang.AutoCloseable"
+ ],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(java.util.function.Supplier, com.yahoo.yolean.concurrent.Memoized$Closer)",
+ "public static com.yahoo.yolean.concurrent.Memoized of(java.util.function.Supplier)",
+ "public java.lang.Object get()",
+ "public void close()"
+ ],
+ "fields": []
+ },
"com.yahoo.yolean.concurrent.ResourceFactory": {
"superClass": "java.lang.Object",
"interfaces": [],
diff --git a/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java b/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java
new file mode 100644
index 00000000000..e8660504a8a
--- /dev/null
+++ b/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java
@@ -0,0 +1,64 @@
+package com.yahoo.yolean.concurrent;
+
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Wraps a lazily initialised resource which needs to be shut down.
+ * The wrapped supplier may not return {@code null}, and should be retryable on failure.
+ * If it throws, it will be retried if {@link #get} is retried. A supplier that fails to
+ * clean up partial state on failure may cause a resource leak.
+ *
+ * @author jonmv
+ */
+public class Memoized<T, E extends Exception> implements Supplier<T>, AutoCloseable {
+
+ /** Provides a tighter bound on the thrown exception type. */
+ @FunctionalInterface
+ public interface Closer<T, E extends Exception> { void close(T t) throws E; }
+
+ private final Object monitor = new Object();
+ private final Closer<T, E> closer;
+ private volatile T wrapped;
+ private Supplier<T> factory;
+
+ public Memoized(Supplier<T> factory, Closer<T, E> closer) {
+ this.factory = requireNonNull(factory);
+ this.closer = requireNonNull(closer);
+ }
+
+ public static <T extends AutoCloseable> Memoized<T, ?> of(Supplier<T> factory) {
+ return new Memoized<>(factory, AutoCloseable::close);
+ }
+
+ @Override
+ public T get() {
+ // Double-checked locking: try the variable, and if not initialized, try to initialize it.
+ if (wrapped == null) synchronized (monitor) {
+ // Ensure the factory is called only once, by clearing it once successfully called.
+ if (factory != null) wrapped = requireNonNull(factory.get());
+ factory = null;
+
+ // If we found the factory, we won the initialization race, and return normally; otherwise
+ // if wrapped is non-null, we lost the race, wrapped was set by the winner, and we return; otherwise
+ // we tried to initialise because wrapped was cleared by closing this, and we fail.
+ if (wrapped == null) throw new IllegalStateException("already closed");
+ }
+ return wrapped;
+ }
+
+ @Override
+ public void close() throws E {
+ // Alter state only when synchronized with calls to get().
+ synchronized (monitor) {
+ // Ensure we only try to close the generated resource once, by clearing it after picking it up here.
+ T maybe = wrapped;
+ wrapped = null;
+ // Clear the factory, to signal this has been closed.
+ factory = null;
+ if (maybe != null) closer.close(maybe);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java b/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java
new file mode 100644
index 00000000000..7f2f49c75f2
--- /dev/null
+++ b/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java
@@ -0,0 +1,101 @@
+package com.yahoo.yolean.concurrent;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+
+/**
+ * @author jonmv
+ */
+public class MemoizedTest {
+
+ final Phaser phaser = new Phaser();
+ final int threads = 128;
+
+ @Test
+ public void test() throws ExecutionException, InterruptedException {
+ var lazy = new Memoized<>(new OnceSupplier(), OnceCloseable::close);
+ phaser.register(); // test thread
+ phaser.register(); // whoever calls the factory
+
+ Phaser latch = new Phaser(threads + 1);
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
+ List<Future<?>> futures = new ArrayList<>();
+ for (int i = 0; i < 128; i++) {
+ futures.add(executor.submit(() -> {
+ latch.arriveAndAwaitAdvance();
+ lazy.get().rendezvous();
+ while (true) lazy.get();
+ }));
+ }
+
+ // All threads waiting for latch, will race to factory
+ latch.arriveAndAwaitAdvance();
+
+ // One thread waiting in factory, the others are blocked, will go to rendezvous
+ phaser.arriveAndAwaitAdvance();
+
+ // All threads waiting in rendezvous, will repeatedly get until failure
+ phaser.arriveAndAwaitAdvance();
+
+ // Unsynchronized close should be detected by all threads
+ lazy.close();
+
+ // Close should carry through only once
+ lazy.close();
+
+ assertEquals("already closed",
+ assertThrows(IllegalStateException.class, lazy::get).getMessage());
+
+ for (Future<?> future : futures)
+ assertEquals("java.lang.IllegalStateException: already closed",
+ assertThrows(ExecutionException.class, future::get).getMessage());
+
+ executor.shutdown();
+ }
+
+ @Test
+ public void closeBeforeFirstGet() throws Exception {
+ OnceSupplier supplier = new OnceSupplier();
+ Memoized<OnceCloseable, ?> lazy = Memoized.of(supplier);
+ lazy.close();
+ assertEquals("already closed",
+ assertThrows(IllegalStateException.class, lazy::get).getMessage());
+ lazy.close();
+ assertFalse(supplier.initialized.get());
+ }
+
+ class OnceSupplier implements Supplier<OnceCloseable> {
+ final AtomicBoolean initialized = new AtomicBoolean();
+ @Override public OnceCloseable get() {
+ phaser.arriveAndAwaitAdvance();
+ if ( ! initialized.compareAndSet(false, true)) fail("initialized more than once");
+ phaser.bulkRegister(threads - 1); // register all the threads who didn't get the factory
+ return new OnceCloseable();
+ }
+ }
+
+ class OnceCloseable implements AutoCloseable {
+ final AtomicBoolean closed = new AtomicBoolean();
+ @Override public void close() {
+ if ( ! closed.compareAndSet(false, true)) fail("closed more than once");
+ }
+ void rendezvous() {
+ phaser.arriveAndAwaitAdvance();
+ }
+ }
+
+}