aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-08-27 13:37:12 +0200
committerJon Bratseth <bratseth@gmail.com>2020-08-27 13:37:12 +0200
commitc5adf87ecf4d6de277ad233137beeec318c869c3 (patch)
treecfea931f2b451b6b928c74e9f81bdfc32700b8c3 /vespa-http-client
parente2ce587c570e14bd78be3f9d7a7025dadc3b3f00 (diff)
Remember when we connected
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java7
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java7
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java4
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java20
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java19
5 files changed, 36 insertions, 21 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
index 1ef52ac05bb..c5864e48681 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
@@ -30,6 +30,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -62,6 +64,7 @@ class ApacheGatewayConnection implements GatewayConnection {
private final String clusterSpecificRoute;
private final ConnectionParams connectionParams;
private HttpClient httpClient;
+ private Instant connectionTime = null;
private String sessionId;
private final String clientId;
private int negotiatedVersion = -1;
@@ -108,9 +111,13 @@ class ApacheGatewayConnection implements GatewayConnection {
if (httpClient != null)
log.log(Level.WARNING, "Previous httpClient still exists.");
httpClient = httpClientFactory.createClient();
+ connectionTime = Clock.systemUTC().instant();
return httpClient != null;
}
+ @Override
+ public Instant connectionTime() { return connectionTime; }
+
// Protected for easier testing only.
protected static InputStreamEntity zipAndCreateEntity(final InputStream inputStream) throws IOException {
byte[] buffer = new byte[4096];
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
index 23ab5e36e14..f91a853c52c 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
@@ -11,6 +11,8 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
@@ -22,6 +24,7 @@ import java.util.List;
public class DryRunGatewayConnection implements GatewayConnection {
private final Endpoint endpoint;
+ private Instant connectionTime = null;
public DryRunGatewayConnection(Endpoint endpoint) {
this.endpoint = endpoint;
@@ -44,10 +47,14 @@ public class DryRunGatewayConnection implements GatewayConnection {
@Override
public boolean connect() {
+ connectionTime = Clock.systemUTC().instant();
return true;
}
@Override
+ public Instant connectionTime() { return connectionTime; }
+
+ @Override
public Endpoint getEndpoint() {
return endpoint;
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java
index 3e5bdfe3056..1b205d8ee41 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/GatewayConnection.java
@@ -6,10 +6,14 @@ import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.ServerResponseException;
import java.io.IOException;
import java.io.InputStream;
+import java.time.Instant;
import java.util.List;
public interface GatewayConnection {
+ /** Returns the time this connected over the network, or null if not connected yet */
+ Instant connectionTime();
+
InputStream writeOperations(List<Document> docs) throws ServerResponseException, IOException;
InputStream drain() throws ServerResponseException, IOException;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index bcbcddce6c4..9aad633bd7b 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -33,7 +33,7 @@ class IOThread implements Runnable, AutoCloseable {
private static final Logger log = Logger.getLogger(IOThread.class.getName());
private final Endpoint endpoint;
- private final GatewayConnection currentClient;
+ private final GatewayConnection currentConnection;
private final DocumentQueue documentQueue;
private final EndpointResultQueue resultQueue;
private final Thread thread;
@@ -71,7 +71,7 @@ class IOThread implements Runnable, AutoCloseable {
double idlePollFrequency) {
this.endpoint = endpoint;
this.documentQueue = documentQueue;
- this.currentClient = connectionFactory.newConnection();
+ this.currentConnection = connectionFactory.newConnection();
this.resultQueue = endpointResultQueue;
this.clusterId = clusterId;
this.maxChunkSizeBytes = maxChunkSizeBytes;
@@ -152,14 +152,14 @@ class IOThread implements Runnable, AutoCloseable {
if (size > 0) {
log.info("We have outstanding operations (" + size + ") , trying to fetch responses.");
try {
- processResponse(currentClient.drain());
+ processResponse(currentConnection.drain());
} catch (Throwable e) {
log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e);
}
}
try {
- currentClient.close();
+ currentConnection.close();
} finally {
// If there is still documents in the queue, fail them.
drainDocumentQueueWhenFailingPermanently(new Exception(
@@ -236,7 +236,7 @@ class IOThread implements Runnable, AutoCloseable {
private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException {
try {
// Post the new docs and get async responses for other posts.
- return currentClient.writeOperations(docs);
+ return currentConnection.writeOperations(docs);
} catch (ServerResponseException ser) {
markDocumentAsFailed(docs, ser);
throw ser;
@@ -314,7 +314,7 @@ class IOThread implements Runnable, AutoCloseable {
switch(threadState) {
case DISCONNECTED:
try {
- if (! currentClient.connect()) {
+ if (! currentConnection.connect()) {
log.log(Level.WARNING, "Could not connect to endpoint: '" + endpoint + "'. Will re-try.");
drainFirstDocumentsInQueueIfOld();
return ThreadState.DISCONNECTED;
@@ -330,7 +330,7 @@ class IOThread implements Runnable, AutoCloseable {
}
case CONNECTED:
try {
- currentClient.handshake();
+ currentConnection.handshake();
successfulHandshakes.getAndIncrement();
} catch (ServerResponseException ser) {
@@ -347,7 +347,7 @@ class IOThread implements Runnable, AutoCloseable {
log.log(Level.INFO, "Failed talking to endpoint. Handshake with server endpoint '" + endpoint
+ "' failed. Will re-try handshake. Failed with '" + Exceptions.toMessageString(throwable) + "'",throwable);
drainFirstDocumentsInQueueIfOld();
- currentClient.close();
+ currentConnection.close();
return ThreadState.DISCONNECTED;
}
return ThreadState.SESSION_SYNCED;
@@ -365,13 +365,13 @@ class IOThread implements Runnable, AutoCloseable {
catch (Throwable e) { // Covers IOException as well
log.log(Level.INFO, "Problems while handing data over to endpoint '" + endpoint
+ "'. Will re-try. Connection level error. Failed with '" + Exceptions.toMessageString(e) + "'", e);
- currentClient.close();
+ currentConnection.close();
return ThreadState.DISCONNECTED;
}
return ThreadState.SESSION_SYNCED;
default: {
log.severe("Should never get here.");
- currentClient.close();
+ currentConnection.close();
return ThreadState.DISCONNECTED;
}
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
index c510b6d0337..494f901d8d7 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
@@ -42,7 +42,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
public class ApacheGatewayConnectionTest {
@Rule
@@ -50,20 +49,18 @@ public class ApacheGatewayConnectionTest {
@Test
public void testProtocolV3() throws Exception {
- final Endpoint endpoint = Endpoint.create("localhost", 666, false);
- final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build();
- final String clusterSpecificRoute = "";
- final ConnectionParams connectionParams = new ConnectionParams.Builder()
- .build();
- final List<Document> documents = new ArrayList<>();
+ Endpoint endpoint = Endpoint.create("localhost", 666, false);
+ FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build();
+ String clusterSpecificRoute = "";
+ ConnectionParams connectionParams = new ConnectionParams.Builder().build();
+ List<Document> documents = new ArrayList<>();
- final String vespaDocContent = "Hello, I a JSON doc.";
- final String docId = "42";
+ String vespaDocContent = "Hello, I a JSON doc.";
+ String docId = "42";
- final AtomicInteger requestsReceived = new AtomicInteger(0);
// This is the fake server, takes header client ID and uses this as session Id.
ApacheGatewayConnection.HttpClientFactory mockFactory = mockHttpClientFactory(post -> {
- final Header clientIdHeader = post.getFirstHeader(Headers.CLIENT_ID);
+ Header clientIdHeader = post.getFirstHeader(Headers.CLIENT_ID);
return httpResponse(clientIdHeader.getValue(), "3");
});