diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-08-27 12:47:11 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-08-27 12:47:11 +0200 |
commit | e2ce587c570e14bd78be3f9d7a7025dadc3b3f00 (patch) | |
tree | c672830f188989057ec854417f30dcca6152cfbe /vespa-http-client | |
parent | e82e9f8e8ebf8f25ea4f44aa8d9fe2e2342bb979 (diff) |
Add a gateway connection factory
Diffstat (limited to 'vespa-http-client')
8 files changed, 153 insertions, 73 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java index f07ab025eb0..1ef52ac05bb 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java @@ -28,8 +28,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -70,41 +68,28 @@ class ApacheGatewayConnection implements GatewayConnection { private final HttpClientFactory httpClientFactory; private final String shardingKey = UUID.randomUUID().toString().substring(0, 5); - ApacheGatewayConnection( - Endpoint endpoint, - FeedParams feedParams, - String clusterSpecificRoute, - ConnectionParams connectionParams, - HttpClientFactory httpClientFactory, - String clientId) { + ApacheGatewayConnection(Endpoint endpoint, + FeedParams feedParams, + String clusterSpecificRoute, + ConnectionParams connectionParams, + HttpClientFactory httpClientFactory, + String clientId) { SUPPORTED_VERSIONS.add(3); - this.endpoint = validate(endpoint); + this.endpoint = endpoint; this.feedParams = feedParams; this.clusterSpecificRoute = clusterSpecificRoute; this.httpClientFactory = httpClientFactory; this.connectionParams = connectionParams; this.httpClient = null; - boolean isJson = feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8; - if (isJson) { + this.clientId = clientId; + + if (feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8) { startOfFeed = START_OF_FEED_JSON; endOfFeed = END_OF_FEED_JSON; } else { startOfFeed = START_OF_FEED_XML; endOfFeed = END_OF_FEED_XML; } - this.clientId = clientId; - if (this.clientId == null) - throw new IllegalArgumentException("Got no client Id."); - } - - private static Endpoint validate(Endpoint endpoint) { - try { - InetAddress.getByName(endpoint.getHostname()); - return endpoint; - } - catch (UnknownHostException e) { - throw new IllegalArgumentException("Unknown host: " + endpoint); - } } @Override diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java new file mode 100644 index 00000000000..be1aa80b9ff --- /dev/null +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java @@ -0,0 +1,58 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.client.core.communication; + +import com.yahoo.vespa.http.client.config.ConnectionParams; +import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.FeedParams; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Objects; + +/** + * @author bratseth + */ +public class ApacheGatewayConnectionFactory implements GatewayConnectionFactory { + + private final Endpoint endpoint; + private final FeedParams feedParams; + private final String clusterSpecificRoute; + private final ConnectionParams connectionParams; + private final ApacheGatewayConnection.HttpClientFactory httpClientFactory; + private final String clientId; + + public ApacheGatewayConnectionFactory(Endpoint endpoint, + FeedParams feedParams, + String clusterSpecificRoute, + ConnectionParams connectionParams, + ApacheGatewayConnection.HttpClientFactory httpClientFactory, + String clientId) { + this.endpoint = validate(endpoint); + this.feedParams = feedParams; + this.clusterSpecificRoute = clusterSpecificRoute; + this.httpClientFactory = httpClientFactory; + this.connectionParams = connectionParams; + this.clientId = Objects.requireNonNull(clientId, "clientId cannot be null"); + } + + private static Endpoint validate(Endpoint endpoint) { + try { + InetAddress.getByName(endpoint.getHostname()); + return endpoint; + } + catch (UnknownHostException e) { + throw new IllegalArgumentException("Unknown host: " + endpoint); + } + } + + @Override + public GatewayConnection newConnection() { + return new ApacheGatewayConnection(endpoint, + feedParams, + clusterSpecificRoute, + connectionParams, + httpClientFactory, + clientId); + } + +} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index d254cd0bab8..c50f9420973 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -66,21 +66,22 @@ public class ClusterConnection implements AutoCloseable { timeoutExecutor, feedParams.getServerTimeout(TimeUnit.MILLISECONDS) + feedParams.getClientTimeout(TimeUnit.MILLISECONDS)); for (int i = 0; i < connectionParams.getNumPersistentConnectionsPerEndpoint(); i++) { - GatewayConnection gatewayConnection; + GatewayConnectionFactory connectionFactory; if (connectionParams.isDryRun()) { - gatewayConnection = new DryRunGatewayConnection(endpoint); + connectionFactory = new DryRunGatewayConnectionFactory(endpoint); } else { - gatewayConnection = new ApacheGatewayConnection(endpoint, - feedParams, - cluster.getRoute(), - connectionParams, - new ApacheGatewayConnection.HttpClientFactory(connectionParams, endpoint.isUseSsl()), - operationProcessor.getClientId() + connectionFactory = new ApacheGatewayConnectionFactory(endpoint, + feedParams, + cluster.getRoute(), + connectionParams, + new ApacheGatewayConnection.HttpClientFactory(connectionParams, endpoint.isUseSsl()), + operationProcessor.getClientId() ); } IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(), + endpoint, endpointResultQueue, - gatewayConnection, + connectionFactory, clusterId, feedParams.getMaxChunkSizeBytes(), maxInFlightPerSession, diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java new file mode 100644 index 00000000000..21a18ed5983 --- /dev/null +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java @@ -0,0 +1,22 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.client.core.communication; + +import com.yahoo.vespa.http.client.config.Endpoint; + +/** + * @author bratseth + */ +public class DryRunGatewayConnectionFactory implements GatewayConnectionFactory { + + private final Endpoint endpoint; + + public DryRunGatewayConnectionFactory(Endpoint endpoint) { + this.endpoint = endpoint; + } + + @Override + public GatewayConnection newConnection() { + return new DryRunGatewayConnection(endpoint); + } + +} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java new file mode 100644 index 00000000000..d27aa850995 --- /dev/null +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnectionFactory.java @@ -0,0 +1,13 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.client.core.communication; + +/** + * Creates gateway connections on request + * + * @author bratseth + */ +public interface GatewayConnectionFactory { + + GatewayConnection newConnection(); + +} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 4853a2592e1..bcbcddce6c4 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -33,7 +33,7 @@ class IOThread implements Runnable, AutoCloseable { private static final Logger log = Logger.getLogger(IOThread.class.getName()); private final Endpoint endpoint; - private final GatewayConnection client; + private final GatewayConnection currentClient; private final DocumentQueue documentQueue; private final EndpointResultQueue resultQueue; private final Thread thread; @@ -59,8 +59,9 @@ class IOThread implements Runnable, AutoCloseable { private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0); IOThread(ThreadGroup ioThreadGroup, + Endpoint endpoint, EndpointResultQueue endpointResultQueue, - GatewayConnection client, + GatewayConnectionFactory connectionFactory, int clusterId, int maxChunkSizeBytes, int maxInFlightRequests, @@ -68,9 +69,9 @@ class IOThread implements Runnable, AutoCloseable { DocumentQueue documentQueue, long maxSleepTimeMs, double idlePollFrequency) { + this.endpoint = endpoint; this.documentQueue = documentQueue; - this.endpoint = client.getEndpoint(); - this.client = client; + this.currentClient = connectionFactory.newConnection(); this.resultQueue = endpointResultQueue; this.clusterId = clusterId; this.maxChunkSizeBytes = maxChunkSizeBytes; @@ -151,14 +152,14 @@ class IOThread implements Runnable, AutoCloseable { if (size > 0) { log.info("We have outstanding operations (" + size + ") , trying to fetch responses."); try { - processResponse(client.drain()); + processResponse(currentClient.drain()); } catch (Throwable e) { log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e); } } try { - client.close(); + currentClient.close(); } finally { // If there is still documents in the queue, fail them. drainDocumentQueueWhenFailingPermanently(new Exception( @@ -235,7 +236,7 @@ class IOThread implements Runnable, AutoCloseable { private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException { try { // Post the new docs and get async responses for other posts. - return client.writeOperations(docs); + return currentClient.writeOperations(docs); } catch (ServerResponseException ser) { markDocumentAsFailed(docs, ser); throw ser; @@ -313,7 +314,7 @@ class IOThread implements Runnable, AutoCloseable { switch(threadState) { case DISCONNECTED: try { - if (! client.connect()) { + if (! currentClient.connect()) { log.log(Level.WARNING, "Could not connect to endpoint: '" + endpoint + "'. Will re-try."); drainFirstDocumentsInQueueIfOld(); return ThreadState.DISCONNECTED; @@ -329,7 +330,7 @@ class IOThread implements Runnable, AutoCloseable { } case CONNECTED: try { - client.handshake(); + currentClient.handshake(); successfulHandshakes.getAndIncrement(); } catch (ServerResponseException ser) { @@ -346,7 +347,7 @@ class IOThread implements Runnable, AutoCloseable { log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(throwable) + "'",throwable); drainFirstDocumentsInQueueIfOld(); - client.close(); + currentClient.close(); return ThreadState.DISCONNECTED; } return ThreadState.SESSION_SYNCED; @@ -364,13 +365,13 @@ class IOThread implements Runnable, AutoCloseable { catch (Throwable e) { // Covers IOException as well log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint + "'. Will re-try. Connection level error. Failed with '" + Exceptions.toMessageString(e) + "'", e); - client.close(); + currentClient.close(); return ThreadState.DISCONNECTED; } return ThreadState.SESSION_SYNCED; default: { log.severe("Should never get here."); - client.close(); + currentClient.close(); return ThreadState.DISCONNECTED; } } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java index 59a8b613e67..c510b6d0337 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java @@ -106,26 +106,6 @@ public class ApacheGatewayConnectionTest { apacheGatewayConnection.writeOperations(documents); } - @Test(expected=RuntimeException.class) - public void testBadConfigParameters() throws Exception { - final Endpoint endpoint = Endpoint.create("localhost", 666, false); - final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build(); - final String clusterSpecificRoute = ""; - final ConnectionParams connectionParams = new ConnectionParams.Builder() - .build(); - - final ApacheGatewayConnection.HttpClientFactory mockFactory = - mock(ApacheGatewayConnection.HttpClientFactory.class); - - new ApacheGatewayConnection( - endpoint, - feedParams, - clusterSpecificRoute, - connectionParams, - mockFactory, - null); - } - @Test public void testJsonDocumentHeader() throws Exception { final Endpoint endpoint = Endpoint.create("localhost", 666, false); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index e81638ded1c..b313daa12cd 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -62,15 +62,12 @@ public class IOThreadTest { * @param isTransient checked on failure, if different, the mock will fail. * @param expectedException checked on failure, if exception toString is different, the mock will fail. */ - void setupEndpointResultQueueMock(String expectedDocIdFail, String expectedDocIdOk,boolean isTransient, String expectedException) { - + void setupEndpointResultQueueMock(String expectedDocIdFail, String expectedDocIdOk, boolean isTransient, String expectedException) { doAnswer(invocation -> { EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0]; assertThat(endpointResult.getOperationId(), is(expectedDocIdFail)); - assertThat(endpointResult.getDetail().getException().toString(), - containsString(expectedException)); - assertThat(endpointResult.getDetail().getResultType(), is( - isTransient ? Result.ResultType.TRANSITIVE_ERROR : Result.ResultType.FATAL_ERROR)); + assertThat(endpointResult.getDetail().getException().toString(), containsString(expectedException)); + assertThat(endpointResult.getDetail().getResultType(), is(isTransient ? Result.ResultType.TRANSITIVE_ERROR : Result.ResultType.FATAL_ERROR)); latch.countDown(); return null; @@ -86,7 +83,17 @@ public class IOThreadTest { } private IOThread createIOThread(int maxInFlightRequests, long localQueueTimeOut) { - return new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, maxInFlightRequests, localQueueTimeOut, documentQueue, 0, 10); + return new IOThread(null, + ENDPOINT, + endpointResultQueue, + new SingletonGatewayConnectionFactory(apacheGatewayConnection), + 0, + 0, + maxInFlightRequests, + localQueueTimeOut, + documentQueue, + 0, + 10); } @Test @@ -198,4 +205,17 @@ public class IOThreadTest { return futureResult; } + private static final class SingletonGatewayConnectionFactory implements GatewayConnectionFactory { + + private final GatewayConnection singletonConnection; + + SingletonGatewayConnectionFactory(GatewayConnection singletonConnection) { + this.singletonConnection = singletonConnection; + } + + @Override + public GatewayConnection newConnection() { return singletonConnection; } + + } + } |