aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-09-03 10:12:08 +0200
committerGitHub <noreply@github.com>2020-09-03 10:12:08 +0200
commit79739839c06bd6c6ddd8608d45e67e4da5dcfe0d (patch)
tree5de7617e25de78082e847f9c83939b6a41021abb /vespa-http-client
parent8f1a532330638c039e2e554ef274879ec77802f6 (diff)
Revert "Drain in a separate thread"
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java43
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java197
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java3
3 files changed, 86 insertions, 157 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java
index 6a421370517..98aca13fff6 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java
@@ -12,12 +12,37 @@ import java.net.URL;
*/
public final class Endpoint implements Serializable {
- private static final int DEFAULT_PORT = 4080;
+ /**
+ * Creates an Endpoint with the default port and without using SSL.
+ *
+ * @param hostname the hostname
+ * @return an Endpoint instance
+ */
+ public static Endpoint create(String hostname) {
+ return new Endpoint(hostname, DEFAULT_PORT, false);
+ }
+ /**
+ * Creates an Endpoint with the given hostname, port and SSL setting.
+ *
+ * @param hostname the hostname
+ * @param port the port
+ * @param useSsl true if SSL is to be used
+ * @return an Endpoint instance
+ */
+ public static Endpoint create(String hostname, int port, boolean useSsl) {
+ return new Endpoint(hostname, port, useSsl);
+ }
+
+ public static Endpoint create(URL url) {
+ return new Endpoint(url.getHost(), url.getPort(), "https".equals(url.getProtocol()));
+ }
+
+ private static final long serialVersionUID = 4545345L;
private final String hostname;
private final int port;
private final boolean useSsl;
-
+ private static final int DEFAULT_PORT = 4080;
private Endpoint(String hostname, int port, boolean useSsl) {
if (hostname.startsWith("https://")) {
throw new RuntimeException("Hostname should be name of machine, not prefixed with protocol (https://)");
@@ -65,18 +90,4 @@ public final class Endpoint implements Serializable {
return result;
}
- /** Creates an Endpoint with the default port and without using SSL */
- public static Endpoint create(String hostname) {
- return new Endpoint(hostname, DEFAULT_PORT, false);
- }
-
- /** Creates an Endpoint with the given hostname, port and SSL setting. */
- public static Endpoint create(String hostname, int port, boolean useSsl) {
- return new Endpoint(hostname, port, useSsl);
- }
-
- public static Endpoint create(URL url) {
- return new Endpoint(url.getHost(), url.getPort(), "https".equals(url.getProtocol()));
- }
-
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 7bd0918eb74..652f65489db 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -57,11 +56,18 @@ public class IOThread implements Runnable, AutoCloseable {
private final long pollIntervalUS;
private final Clock clock;
private final Random random = new Random();
- private final OldConnectionsDrainer oldConnectionsDrainer;
private GatewayConnection currentConnection;
private ConnectionState connectionState = ConnectionState.DISCONNECTED;
+ /**
+ * Previous connections on which we have sent operations and are still waiting for the result
+ * (so all connections in this are in state SESSION_SYNCED).
+ * We need to drain results on the connection where they were sent to make sure we request results on
+ * the node which received the operation also when going through a VIP.
+ */
+ private final List<GatewayConnection> oldConnections = new ArrayList<>();
+
private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED };
private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0);
private final AtomicInteger wrongVersionDetectedCounter = new AtomicInteger(0);
@@ -100,22 +106,10 @@ public class IOThread implements Runnable, AutoCloseable {
this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s]
this.clock = clock;
this.localQueueTimeOut = localQueueTimeOut;
- this.oldConnectionsDrainer = new OldConnectionsDrainer(endpoint,
- clusterId,
- pollIntervalUS,
- connectionTimeToLive,
- localQueueTimeOut,
- statusReceivedCounter,
- resultQueue,
- stopSignal,
- clock);
if (runThreads) {
this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
thread.setDaemon(true);
thread.start();
- Thread thread = new Thread(ioThreadGroup, oldConnectionsDrainer, "IOThread " + endpoint + " drainer");
- thread.setDaemon(true);
- thread.start();
}
else {
this.thread = null;
@@ -150,13 +144,13 @@ public class IOThread implements Runnable, AutoCloseable {
stopSignal.countDown();
log.finer("Closed called.");
- oldConnectionsDrainer.close();
-
// Make a last attempt to get results from previous operations, we have already waited quite a bit before getting here.
int size = resultQueue.getPendingSize();
if (size > 0) {
log.info("We have outstanding operations (" + size + ") , trying to fetch responses.");
try {
+ for (GatewayConnection oldConnection : oldConnections)
+ processResponse(oldConnection.drain());
processResponse(currentConnection.drain());
} catch (Throwable e) {
log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e);
@@ -164,6 +158,8 @@ public class IOThread implements Runnable, AutoCloseable {
}
try {
+ for (GatewayConnection oldConnection : oldConnections)
+ oldConnection.close();
currentConnection.close();
} finally {
// If there is still documents in the queue, fail them.
@@ -264,14 +260,6 @@ public class IOThread implements Runnable, AutoCloseable {
}
private ProcessResponse processResponse(InputStream serverResponse) throws IOException {
- return processResponse(serverResponse, endpoint, clusterId, statusReceivedCounter, resultQueue);
- }
-
- private static ProcessResponse processResponse(InputStream serverResponse,
- Endpoint endpoint,
- int clusterId,
- AtomicInteger statusReceivedCounter,
- EndpointResultQueue resultQueue) throws IOException {
Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse);
statusReceivedCounter.addAndGet(endpointResults.size());
int transientErrors = 0;
@@ -421,8 +409,7 @@ public class IOThread implements Runnable, AutoCloseable {
public void tick() {
ConnectionState oldState = connectionState;
connectionState = cycle(connectionState);
- if (thread == null)
- oldConnectionsDrainer.checkOldConnections();
+ checkOldConnections();
if (thread != null)
sleepIfProblemsGettingSyncedConnection(connectionState, oldState);
}
@@ -458,11 +445,51 @@ public class IOThread implements Runnable, AutoCloseable {
private ConnectionState refreshConnection(ConnectionState currentConnectionState) {
if (currentConnectionState == ConnectionState.SESSION_SYNCED)
- oldConnectionsDrainer.add(currentConnection);
+ oldConnections.add(currentConnection);
currentConnection = connectionFactory.newConnection();
return ConnectionState.DISCONNECTED;
}
+ private void checkOldConnections() {
+ for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) {
+ GatewayConnection connection = i.next();
+ if (closingTime(connection).isBefore(clock.instant())) {
+ try {
+ processResponse(connection.poll());
+ connection.close();
+ i.remove();
+ }
+ catch (Exception e) {
+ // Old connection; best effort
+ }
+ }
+ else if (timeToPoll(connection)) {
+ try {
+ processResponse(connection.poll());
+ }
+ catch (Exception e) {
+ // Old connection; best effort
+ }
+ }
+ }
+ }
+
+ private Instant closingTime(GatewayConnection connection) {
+ return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut);
+ }
+
+ private boolean timeToPoll(GatewayConnection connection) {
+ if (connection.lastPollTime() == null) return true;
+
+ // Exponential (2^x) dropoff:
+ double unit = pollIntervalUS / 1000.0;
+ double x = ( clock.millis() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit;
+ double lastX = ( connection.lastPollTime().toEpochMilli() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit;
+
+ double currentDelayDoublings = Math.log(lastX)/Math.log(2);
+ return (x - lastX) > Math.pow(2, currentDelayDoublings);
+ }
+
public static class ConnectionStats {
// NOTE: These fields are accessed by reflection in JSON serialization
@@ -501,122 +528,10 @@ public class IOThread implements Runnable, AutoCloseable {
/** For testing. Returns the current connection of this. Not thread safe. */
public GatewayConnection currentConnection() { return currentConnection; }
- /** For testing. Returns a snapshot of the old connections of this. */
- public List<GatewayConnection> oldConnections() { return oldConnectionsDrainer.connections(); }
+ /** For testing. Returns a snapshot of the old connections of this. Not thread safe. */
+ public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); }
/** For testing */
public EndpointResultQueue resultQueue() { return resultQueue; }
- /**
- * We need to drain results on the connection where they were sent to make sure we request results on
- * the node which received the operation also when going through a VIP.
- */
- private static class OldConnectionsDrainer implements Runnable {
-
- private final Endpoint endpoint;
- private final int clusterId;
- private final long pollIntervalUS;
- private final Duration connectionTimeToLive;
- private final Duration localQueueTimeOut;
- private final AtomicInteger statusReceivedCounter;
- private final EndpointResultQueue resultQueue;
- private final CountDownLatch stopSignal;
- private final Clock clock;
-
- /**
- * Previous connections on which we may have sent operations and are still waiting for the results
- * All connections in this are in state SESSION_SYNCED.
- */
- private final List<GatewayConnection> connections = new CopyOnWriteArrayList<>();
-
- OldConnectionsDrainer(Endpoint endpoint,
- int clusterId,
- long pollIntervalUS,
- Duration connectionTimeToLive,
- Duration localQueueTimeOut,
- AtomicInteger statusReceivedCounter,
- EndpointResultQueue resultQueue,
- CountDownLatch stopSignal,
- Clock clock) {
- this.endpoint = endpoint;
- this.clusterId = clusterId;
- this.pollIntervalUS = pollIntervalUS;
- this.connectionTimeToLive = connectionTimeToLive;
- this.localQueueTimeOut = localQueueTimeOut;
- this.statusReceivedCounter = statusReceivedCounter;
- this.resultQueue = resultQueue;
- this.stopSignal = stopSignal;
- this.clock = clock;
- }
-
- /** Add another old connection to this for draining */
- public void add(GatewayConnection connection) {
- connections.add(connection);
- }
-
- @Override
- public void run() {
- while (stopSignal.getCount() > 0)
- checkOldConnections();
- }
-
- public void checkOldConnections() {
- List<GatewayConnection> toRemove = new ArrayList<>();
- for (GatewayConnection connection : connections) {
- if (closingTime(connection).isBefore(clock.instant())) {
- try {
- IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
- connection.close();
- toRemove.add(connection);
- } catch (Exception e) {
- // Old connection; best effort
- }
- } else if (timeToPoll(connection)) {
- try {
- IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
- } catch (Exception e) {
- // Old connection; best effort
- }
- }
- }
- connections.removeAll(toRemove);
- }
-
- private boolean timeToPoll(GatewayConnection connection) {
- if (connection.lastPollTime() == null) return true;
-
- // Exponential (2^x) dropoff:
- double unit = pollIntervalUS / 1000.0;
- double x = ( clock.millis() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit;
- double lastX = ( connection.lastPollTime().toEpochMilli() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit;
-
- double currentDelayDoublings = Math.log(lastX)/Math.log(2);
- return (x - lastX) > Math.pow(2, currentDelayDoublings);
- }
-
- private Instant closingTime(GatewayConnection connection) {
- return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut);
- }
-
- private void close() {
- int size = resultQueue.getPendingSize();
- if (size > 0) {
- log.info("We have outstanding operations (" + size + ") , trying to fetch responses.");
- for (GatewayConnection connection : connections) {
- try {
- IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
- } catch (Throwable e) {
- log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e);
- }
- }
- }
- for (GatewayConnection oldConnection : connections)
- oldConnection.close();
- }
-
- /** For testing. Returns the old connections of this. */
- public List<GatewayConnection> connections() { return Collections.unmodifiableList(connections); }
-
- }
-
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index c5913d73d7b..e684c929fda 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -1,6 +1,9 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.client.core.communication;
+import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.FeedEndpointException;
+import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.core.OperationProcessorTester;
import com.yahoo.vespa.http.client.core.ServerResponseException;
import org.junit.Test;