aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-08-27 12:47:11 +0200
committerJon Bratseth <bratseth@gmail.com>2020-08-27 12:47:11 +0200
commite2ce587c570e14bd78be3f9d7a7025dadc3b3f00 (patch)
treec672830f188989057ec854417f30dcca6152cfbe /vespa-http-client
parente82e9f8e8ebf8f25ea4f44aa8d9fe2e2342bb979 (diff)
Add a gateway connection factory
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java35
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java58
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java19
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java22
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java25
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java20
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java34
8 files changed, 153 insertions, 73 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
index f07ab025eb0..1ef52ac05bb 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
@@ -28,8 +28,6 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -70,41 +68,28 @@ class ApacheGatewayConnection implements GatewayConnection {
private final HttpClientFactory httpClientFactory;
private final String shardingKey = UUID.randomUUID().toString().substring(0, 5);
- ApacheGatewayConnection(
- Endpoint endpoint,
- FeedParams feedParams,
- String clusterSpecificRoute,
- ConnectionParams connectionParams,
- HttpClientFactory httpClientFactory,
- String clientId) {
+ ApacheGatewayConnection(Endpoint endpoint,
+ FeedParams feedParams,
+ String clusterSpecificRoute,
+ ConnectionParams connectionParams,
+ HttpClientFactory httpClientFactory,
+ String clientId) {
SUPPORTED_VERSIONS.add(3);
- this.endpoint = validate(endpoint);
+ this.endpoint = endpoint;
this.feedParams = feedParams;
this.clusterSpecificRoute = clusterSpecificRoute;
this.httpClientFactory = httpClientFactory;
this.connectionParams = connectionParams;
this.httpClient = null;
- boolean isJson = feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8;
- if (isJson) {
+ this.clientId = clientId;
+
+ if (feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8) {
startOfFeed = START_OF_FEED_JSON;
endOfFeed = END_OF_FEED_JSON;
} else {
startOfFeed = START_OF_FEED_XML;
endOfFeed = END_OF_FEED_XML;
}
- this.clientId = clientId;
- if (this.clientId == null)
- throw new IllegalArgumentException("Got no client Id.");
- }
-
- private static Endpoint validate(Endpoint endpoint) {
- try {
- InetAddress.getByName(endpoint.getHostname());
- return endpoint;
- }
- catch (UnknownHostException e) {
- throw new IllegalArgumentException("Unknown host: " + endpoint);
- }
}
@Override
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java
new file mode 100644
index 00000000000..be1aa80b9ff
--- /dev/null
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java
@@ -0,0 +1,58 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.communication;
+
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Objects;
+
+/**
+ * @author bratseth
+ */
+public class ApacheGatewayConnectionFactory implements GatewayConnectionFactory {
+
+ private final Endpoint endpoint;
+ private final FeedParams feedParams;
+ private final String clusterSpecificRoute;
+ private final ConnectionParams connectionParams;
+ private final ApacheGatewayConnection.HttpClientFactory httpClientFactory;
+ private final String clientId;
+
+ public ApacheGatewayConnectionFactory(Endpoint endpoint,
+ FeedParams feedParams,
+ String clusterSpecificRoute,
+ ConnectionParams connectionParams,
+ ApacheGatewayConnection.HttpClientFactory httpClientFactory,
+ String clientId) {
+ this.endpoint = validate(endpoint);
+ this.feedParams = feedParams;
+ this.clusterSpecificRoute = clusterSpecificRoute;
+ this.httpClientFactory = httpClientFactory;
+ this.connectionParams = connectionParams;
+ this.clientId = Objects.requireNonNull(clientId, "clientId cannot be null");
+ }
+
+ private static Endpoint validate(Endpoint endpoint) {
+ try {
+ InetAddress.getByName(endpoint.getHostname());
+ return endpoint;
+ }
+ catch (UnknownHostException e) {
+ throw new IllegalArgumentException("Unknown host: " + endpoint);
+ }
+ }
+
+ @Override
+ public GatewayConnection newConnection() {
+ return new ApacheGatewayConnection(endpoint,
+ feedParams,
+ clusterSpecificRoute,
+ connectionParams,
+ httpClientFactory,
+ clientId);
+ }
+
+}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index d254cd0bab8..c50f9420973 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -66,21 +66,22 @@ public class ClusterConnection implements AutoCloseable {
timeoutExecutor,
feedParams.getServerTimeout(TimeUnit.MILLISECONDS) + feedParams.getClientTimeout(TimeUnit.MILLISECONDS));
for (int i = 0; i < connectionParams.getNumPersistentConnectionsPerEndpoint(); i++) {
- GatewayConnection gatewayConnection;
+ GatewayConnectionFactory connectionFactory;
if (connectionParams.isDryRun()) {
- gatewayConnection = new DryRunGatewayConnection(endpoint);
+ connectionFactory = new DryRunGatewayConnectionFactory(endpoint);
} else {
- gatewayConnection = new ApacheGatewayConnection(endpoint,
- feedParams,
- cluster.getRoute(),
- connectionParams,
- new ApacheGatewayConnection.HttpClientFactory(connectionParams, endpoint.isUseSsl()),
- operationProcessor.getClientId()
+ connectionFactory = new ApacheGatewayConnectionFactory(endpoint,
+ feedParams,
+ cluster.getRoute(),
+ connectionParams,
+ new ApacheGatewayConnection.HttpClientFactory(connectionParams, endpoint.isUseSsl()),
+ operationProcessor.getClientId()
);
}
IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(),
+ endpoint,
endpointResultQueue,
- gatewayConnection,
+ connectionFactory,
clusterId,
feedParams.getMaxChunkSizeBytes(),
maxInFlightPerSession,
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java
new file mode 100644
index 00000000000..21a18ed5983
--- /dev/null
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java
@@ -0,0 +1,22 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.communication;
+
+import com.yahoo.vespa.http.client.config.Endpoint;
+
+/**
+ * @author bratseth
+ */
+public class DryRunGatewayConnectionFactory implements GatewayConnectionFactory {
+
+ private final Endpoint endpoint;
+
+ public DryRunGatewayConnectionFactory(Endpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public GatewayConnection newConnection() {
+ return new DryRunGatewayConnection(endpoint);
+ }
+
+}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java
new file mode 100644
index 00000000000..d27aa850995
--- /dev/null
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java
@@ -0,0 +1,13 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.communication;
+
+/**
+ * Creates gateway connections on request
+ *
+ * @author bratseth
+ */
+public interface GatewayConnectionFactory {
+
+ GatewayConnection newConnection();
+
+}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 4853a2592e1..bcbcddce6c4 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -33,7 +33,7 @@ class IOThread implements Runnable, AutoCloseable {
private static final Logger log = Logger.getLogger(IOThread.class.getName());
private final Endpoint endpoint;
- private final GatewayConnection client;
+ private final GatewayConnection currentClient;
private final DocumentQueue documentQueue;
private final EndpointResultQueue resultQueue;
private final Thread thread;
@@ -59,8 +59,9 @@ class IOThread implements Runnable, AutoCloseable {
private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0);
IOThread(ThreadGroup ioThreadGroup,
+ Endpoint endpoint,
EndpointResultQueue endpointResultQueue,
- GatewayConnection client,
+ GatewayConnectionFactory connectionFactory,
int clusterId,
int maxChunkSizeBytes,
int maxInFlightRequests,
@@ -68,9 +69,9 @@ class IOThread implements Runnable, AutoCloseable {
DocumentQueue documentQueue,
long maxSleepTimeMs,
double idlePollFrequency) {
+ this.endpoint = endpoint;
this.documentQueue = documentQueue;
- this.endpoint = client.getEndpoint();
- this.client = client;
+ this.currentClient = connectionFactory.newConnection();
this.resultQueue = endpointResultQueue;
this.clusterId = clusterId;
this.maxChunkSizeBytes = maxChunkSizeBytes;
@@ -151,14 +152,14 @@ class IOThread implements Runnable, AutoCloseable {
if (size > 0) {
log.info("We have outstanding operations (" + size + ") , trying to fetch responses.");
try {
- processResponse(client.drain());
+ processResponse(currentClient.drain());
} catch (Throwable e) {
log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e);
}
}
try {
- client.close();
+ currentClient.close();
} finally {
// If there is still documents in the queue, fail them.
drainDocumentQueueWhenFailingPermanently(new Exception(
@@ -235,7 +236,7 @@ class IOThread implements Runnable, AutoCloseable {
private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException {
try {
// Post the new docs and get async responses for other posts.
- return client.writeOperations(docs);
+ return currentClient.writeOperations(docs);
} catch (ServerResponseException ser) {
markDocumentAsFailed(docs, ser);
throw ser;
@@ -313,7 +314,7 @@ class IOThread implements Runnable, AutoCloseable {
switch(threadState) {
case DISCONNECTED:
try {
- if (! client.connect()) {
+ if (! currentClient.connect()) {
log.log(Level.WARNING, "Could not connect to endpoint: '" + endpoint + "'. Will re-try.");
drainFirstDocumentsInQueueIfOld();
return ThreadState.DISCONNECTED;
@@ -329,7 +330,7 @@ class IOThread implements Runnable, AutoCloseable {
}
case CONNECTED:
try {
- client.handshake();
+ currentClient.handshake();
successfulHandshakes.getAndIncrement();
} catch (ServerResponseException ser) {
@@ -346,7 +347,7 @@ class IOThread implements Runnable, AutoCloseable {
log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint
+ "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(throwable) + "'",throwable);
drainFirstDocumentsInQueueIfOld();
- client.close();
+ currentClient.close();
return ThreadState.DISCONNECTED;
}
return ThreadState.SESSION_SYNCED;
@@ -364,13 +365,13 @@ class IOThread implements Runnable, AutoCloseable {
catch (Throwable e) { // Covers IOException as well
log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint
+ "'. Will re-try. Connection level error. Failed with '" + Exceptions.toMessageString(e) + "'", e);
- client.close();
+ currentClient.close();
return ThreadState.DISCONNECTED;
}
return ThreadState.SESSION_SYNCED;
default: {
log.severe("Should never get here.");
- client.close();
+ currentClient.close();
return ThreadState.DISCONNECTED;
}
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
index 59a8b613e67..c510b6d0337 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
@@ -106,26 +106,6 @@ public class ApacheGatewayConnectionTest {
apacheGatewayConnection.writeOperations(documents);
}
- @Test(expected=RuntimeException.class)
- public void testBadConfigParameters() throws Exception {
- final Endpoint endpoint = Endpoint.create("localhost", 666, false);
- final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build();
- final String clusterSpecificRoute = "";
- final ConnectionParams connectionParams = new ConnectionParams.Builder()
- .build();
-
- final ApacheGatewayConnection.HttpClientFactory mockFactory =
- mock(ApacheGatewayConnection.HttpClientFactory.class);
-
- new ApacheGatewayConnection(
- endpoint,
- feedParams,
- clusterSpecificRoute,
- connectionParams,
- mockFactory,
- null);
- }
-
@Test
public void testJsonDocumentHeader() throws Exception {
final Endpoint endpoint = Endpoint.create("localhost", 666, false);
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index e81638ded1c..b313daa12cd 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -62,15 +62,12 @@ public class IOThreadTest {
* @param isTransient checked on failure, if different, the mock will fail.
* @param expectedException checked on failure, if exception toString is different, the mock will fail.
*/
- void setupEndpointResultQueueMock(String expectedDocIdFail, String expectedDocIdOk,boolean isTransient, String expectedException) {
-
+ void setupEndpointResultQueueMock(String expectedDocIdFail, String expectedDocIdOk, boolean isTransient, String expectedException) {
doAnswer(invocation -> {
EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0];
assertThat(endpointResult.getOperationId(), is(expectedDocIdFail));
- assertThat(endpointResult.getDetail().getException().toString(),
- containsString(expectedException));
- assertThat(endpointResult.getDetail().getResultType(), is(
- isTransient ? Result.ResultType.TRANSITIVE_ERROR : Result.ResultType.FATAL_ERROR));
+ assertThat(endpointResult.getDetail().getException().toString(), containsString(expectedException));
+ assertThat(endpointResult.getDetail().getResultType(), is(isTransient ? Result.ResultType.TRANSITIVE_ERROR : Result.ResultType.FATAL_ERROR));
latch.countDown();
return null;
@@ -86,7 +83,17 @@ public class IOThreadTest {
}
private IOThread createIOThread(int maxInFlightRequests, long localQueueTimeOut) {
- return new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, maxInFlightRequests, localQueueTimeOut, documentQueue, 0, 10);
+ return new IOThread(null,
+ ENDPOINT,
+ endpointResultQueue,
+ new SingletonGatewayConnectionFactory(apacheGatewayConnection),
+ 0,
+ 0,
+ maxInFlightRequests,
+ localQueueTimeOut,
+ documentQueue,
+ 0,
+ 10);
}
@Test
@@ -198,4 +205,17 @@ public class IOThreadTest {
return futureResult;
}
+ private static final class SingletonGatewayConnectionFactory implements GatewayConnectionFactory {
+
+ private final GatewayConnection singletonConnection;
+
+ SingletonGatewayConnectionFactory(GatewayConnection singletonConnection) {
+ this.singletonConnection = singletonConnection;
+ }
+
+ @Override
+ public GatewayConnection newConnection() { return singletonConnection; }
+
+ }
+
}