From c5adf87ecf4d6de277ad233137beeec318c869c3 Mon Sep 17 00:00:00 2001 From: Jon Bratseth Date: Thu, 27 Aug 2020 13:37:12 +0200 Subject: Remember when we connected --- .../core/communication/ApacheGatewayConnection.java | 7 +++++++ .../core/communication/DryRunGatewayConnection.java | 7 +++++++ .../client/core/communication/GatewayConnection.java | 4 ++++ .../http/client/core/communication/IOThread.java | 20 ++++++++++---------- .../communication/ApacheGatewayConnectionTest.java | 19 ++++++++----------- 5 files changed, 36 insertions(+), 21 deletions(-) diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java index 1ef52ac05bb..c5864e48681 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java @@ -30,6 +30,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Clock; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -62,6 +64,7 @@ class ApacheGatewayConnection implements GatewayConnection { private final String clusterSpecificRoute; private final ConnectionParams connectionParams; private HttpClient httpClient; + private Instant connectionTime = null; private String sessionId; private final String clientId; private int negotiatedVersion = -1; @@ -108,9 +111,13 @@ class ApacheGatewayConnection implements GatewayConnection { if (httpClient != null) log.log(Level.WARNING, "Previous httpClient still exists."); httpClient = httpClientFactory.createClient(); + connectionTime = Clock.systemUTC().instant(); return httpClient != null; } + @Override + public Instant connectionTime() { return connectionTime; } + // Protected for easier testing only. protected static InputStreamEntity zipAndCreateEntity(final InputStream inputStream) throws IOException { byte[] buffer = new byte[4096]; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java index 23ab5e36e14..f91a853c52c 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java @@ -11,6 +11,8 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.time.Clock; +import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -22,6 +24,7 @@ import java.util.List; public class DryRunGatewayConnection implements GatewayConnection { private final Endpoint endpoint; + private Instant connectionTime = null; public DryRunGatewayConnection(Endpoint endpoint) { this.endpoint = endpoint; @@ -44,9 +47,13 @@ public class DryRunGatewayConnection implements GatewayConnection { @Override public boolean connect() { + connectionTime = Clock.systemUTC().instant(); return true; } + @Override + public Instant connectionTime() { return connectionTime; } + @Override public Endpoint getEndpoint() { return endpoint; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java index 3e5bdfe3056..1b205d8ee41 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java @@ -6,10 +6,14 @@ import com.yahoo.vespa.http.client.core.Document; import com.yahoo.vespa.http.client.core.ServerResponseException; import java.io.IOException; import java.io.InputStream; +import java.time.Instant; import java.util.List; public interface GatewayConnection { + /** Returns the time this connected over the network, or null if not connected yet */ + Instant connectionTime(); + InputStream writeOperations(List docs) throws ServerResponseException, IOException; InputStream drain() throws ServerResponseException, IOException; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index bcbcddce6c4..9aad633bd7b 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -33,7 +33,7 @@ class IOThread implements Runnable, AutoCloseable { private static final Logger log = Logger.getLogger(IOThread.class.getName()); private final Endpoint endpoint; - private final GatewayConnection currentClient; + private final GatewayConnection currentConnection; private final DocumentQueue documentQueue; private final EndpointResultQueue resultQueue; private final Thread thread; @@ -71,7 +71,7 @@ class IOThread implements Runnable, AutoCloseable { double idlePollFrequency) { this.endpoint = endpoint; this.documentQueue = documentQueue; - this.currentClient = connectionFactory.newConnection(); + this.currentConnection = connectionFactory.newConnection(); this.resultQueue = endpointResultQueue; this.clusterId = clusterId; this.maxChunkSizeBytes = maxChunkSizeBytes; @@ -152,14 +152,14 @@ class IOThread implements Runnable, AutoCloseable { if (size > 0) { log.info("We have outstanding operations (" + size + ") , trying to fetch responses."); try { - processResponse(currentClient.drain()); + processResponse(currentConnection.drain()); } catch (Throwable e) { log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e); } } try { - currentClient.close(); + currentConnection.close(); } finally { // If there is still documents in the queue, fail them. drainDocumentQueueWhenFailingPermanently(new Exception( @@ -236,7 +236,7 @@ class IOThread implements Runnable, AutoCloseable { private InputStream sendAndReceive(List docs) throws IOException, ServerResponseException { try { // Post the new docs and get async responses for other posts. - return currentClient.writeOperations(docs); + return currentConnection.writeOperations(docs); } catch (ServerResponseException ser) { markDocumentAsFailed(docs, ser); throw ser; @@ -314,7 +314,7 @@ class IOThread implements Runnable, AutoCloseable { switch(threadState) { case DISCONNECTED: try { - if (! currentClient.connect()) { + if (! currentConnection.connect()) { log.log(Level.WARNING, "Could not connect to endpoint: '" + endpoint + "'. Will re-try."); drainFirstDocumentsInQueueIfOld(); return ThreadState.DISCONNECTED; @@ -330,7 +330,7 @@ class IOThread implements Runnable, AutoCloseable { } case CONNECTED: try { - currentClient.handshake(); + currentConnection.handshake(); successfulHandshakes.getAndIncrement(); } catch (ServerResponseException ser) { @@ -347,7 +347,7 @@ class IOThread implements Runnable, AutoCloseable { log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint + "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(throwable) + "'",throwable); drainFirstDocumentsInQueueIfOld(); - currentClient.close(); + currentConnection.close(); return ThreadState.DISCONNECTED; } return ThreadState.SESSION_SYNCED; @@ -365,13 +365,13 @@ class IOThread implements Runnable, AutoCloseable { catch (Throwable e) { // Covers IOException as well log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint + "'. Will re-try. Connection level error. Failed with '" + Exceptions.toMessageString(e) + "'", e); - currentClient.close(); + currentConnection.close(); return ThreadState.DISCONNECTED; } return ThreadState.SESSION_SYNCED; default: { log.severe("Should never get here."); - currentClient.close(); + currentConnection.close(); return ThreadState.DISCONNECTED; } } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java index c510b6d0337..494f901d8d7 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java @@ -42,7 +42,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; - public class ApacheGatewayConnectionTest { @Rule @@ -50,20 +49,18 @@ public class ApacheGatewayConnectionTest { @Test public void testProtocolV3() throws Exception { - final Endpoint endpoint = Endpoint.create("localhost", 666, false); - final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build(); - final String clusterSpecificRoute = ""; - final ConnectionParams connectionParams = new ConnectionParams.Builder() - .build(); - final List documents = new ArrayList<>(); + Endpoint endpoint = Endpoint.create("localhost", 666, false); + FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build(); + String clusterSpecificRoute = ""; + ConnectionParams connectionParams = new ConnectionParams.Builder().build(); + List documents = new ArrayList<>(); - final String vespaDocContent = "Hello, I a JSON doc."; - final String docId = "42"; + String vespaDocContent = "Hello, I a JSON doc."; + String docId = "42"; - final AtomicInteger requestsReceived = new AtomicInteger(0); // This is the fake server, takes header client ID and uses this as session Id. ApacheGatewayConnection.HttpClientFactory mockFactory = mockHttpClientFactory(post -> { - final Header clientIdHeader = post.getFirstHeader(Headers.CLIENT_ID); + Header clientIdHeader = post.getFirstHeader(Headers.CLIENT_ID); return httpResponse(clientIdHeader.getValue(), "3"); }); -- cgit v1.2.3