summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-09-04 10:48:24 +0200
committerJon Bratseth <bratseth@gmail.com>2020-09-04 10:48:24 +0200
commitf54266496dc50f43a510eb1714ad2853ebd56005 (patch)
treedba2533c1c64fecb44f805afcf5468a44a9f5a53
parent13212978faf3b5921a3e139c1512677f9b882591 (diff)
Revert "Merge pull request #14263 from vespa-engine/revert-14259-bratseth/drain-in-separate-thread"
This reverts commit bed63d34ef760934ba45bb80d36699345c9416f5, reversing changes made to 8f1a532330638c039e2e554ef274879ec77802f6.
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java43
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java197
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java3
3 files changed, 157 insertions, 86 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java
index 98aca13fff6..6a421370517 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java
@@ -12,37 +12,12 @@ import java.net.URL;
*/
public final class Endpoint implements Serializable {
- /**
- * Creates an Endpoint with the default port and without using SSL.
- *
- * @param hostname the hostname
- * @return an Endpoint instance
- */
- public static Endpoint create(String hostname) {
- return new Endpoint(hostname, DEFAULT_PORT, false);
- }
-
- /**
- * Creates an Endpoint with the given hostname, port and SSL setting.
- *
- * @param hostname the hostname
- * @param port the port
- * @param useSsl true if SSL is to be used
- * @return an Endpoint instance
- */
- public static Endpoint create(String hostname, int port, boolean useSsl) {
- return new Endpoint(hostname, port, useSsl);
- }
-
- public static Endpoint create(URL url) {
- return new Endpoint(url.getHost(), url.getPort(), "https".equals(url.getProtocol()));
- }
+ private static final int DEFAULT_PORT = 4080;
- private static final long serialVersionUID = 4545345L;
private final String hostname;
private final int port;
private final boolean useSsl;
- private static final int DEFAULT_PORT = 4080;
+
private Endpoint(String hostname, int port, boolean useSsl) {
if (hostname.startsWith("https://")) {
throw new RuntimeException("Hostname should be name of machine, not prefixed with protocol (https://)");
@@ -90,4 +65,18 @@ public final class Endpoint implements Serializable {
return result;
}
+ /** Creates an Endpoint with the default port and without using SSL */
+ public static Endpoint create(String hostname) {
+ return new Endpoint(hostname, DEFAULT_PORT, false);
+ }
+
+ /** Creates an Endpoint with the given hostname, port and SSL setting. */
+ public static Endpoint create(String hostname, int port, boolean useSsl) {
+ return new Endpoint(hostname, port, useSsl);
+ }
+
+ public static Endpoint create(URL url) {
+ return new Endpoint(url.getHost(), url.getPort(), "https".equals(url.getProtocol()));
+ }
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 652f65489db..7bd0918eb74 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -56,18 +57,11 @@ public class IOThread implements Runnable, AutoCloseable {
private final long pollIntervalUS;
private final Clock clock;
private final Random random = new Random();
+ private final OldConnectionsDrainer oldConnectionsDrainer;
private GatewayConnection currentConnection;
private ConnectionState connectionState = ConnectionState.DISCONNECTED;
- /**
- * Previous connections on which we have sent operations and are still waiting for the result
- * (so all connections in this are in state SESSION_SYNCED).
- * We need to drain results on the connection where they were sent to make sure we request results on
- * the node which received the operation also when going through a VIP.
- */
- private final List<GatewayConnection> oldConnections = new ArrayList<>();
-
private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED };
private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0);
private final AtomicInteger wrongVersionDetectedCounter = new AtomicInteger(0);
@@ -106,10 +100,22 @@ public class IOThread implements Runnable, AutoCloseable {
this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s]
this.clock = clock;
this.localQueueTimeOut = localQueueTimeOut;
+ this.oldConnectionsDrainer = new OldConnectionsDrainer(endpoint,
+ clusterId,
+ pollIntervalUS,
+ connectionTimeToLive,
+ localQueueTimeOut,
+ statusReceivedCounter,
+ resultQueue,
+ stopSignal,
+ clock);
if (runThreads) {
this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
thread.setDaemon(true);
thread.start();
+ Thread thread = new Thread(ioThreadGroup, oldConnectionsDrainer, "IOThread " + endpoint + " drainer");
+ thread.setDaemon(true);
+ thread.start();
}
else {
this.thread = null;
@@ -144,13 +150,13 @@ public class IOThread implements Runnable, AutoCloseable {
stopSignal.countDown();
log.finer("Closed called.");
+ oldConnectionsDrainer.close();
+
// Make a last attempt to get results from previous operations, we have already waited quite a bit before getting here.
int size = resultQueue.getPendingSize();
if (size > 0) {
log.info("We have outstanding operations (" + size + ") , trying to fetch responses.");
try {
- for (GatewayConnection oldConnection : oldConnections)
- processResponse(oldConnection.drain());
processResponse(currentConnection.drain());
} catch (Throwable e) {
log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e);
@@ -158,8 +164,6 @@ public class IOThread implements Runnable, AutoCloseable {
}
try {
- for (GatewayConnection oldConnection : oldConnections)
- oldConnection.close();
currentConnection.close();
} finally {
// If there is still documents in the queue, fail them.
@@ -260,6 +264,14 @@ public class IOThread implements Runnable, AutoCloseable {
}
private ProcessResponse processResponse(InputStream serverResponse) throws IOException {
+ return processResponse(serverResponse, endpoint, clusterId, statusReceivedCounter, resultQueue);
+ }
+
+ private static ProcessResponse processResponse(InputStream serverResponse,
+ Endpoint endpoint,
+ int clusterId,
+ AtomicInteger statusReceivedCounter,
+ EndpointResultQueue resultQueue) throws IOException {
Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse);
statusReceivedCounter.addAndGet(endpointResults.size());
int transientErrors = 0;
@@ -409,7 +421,8 @@ public class IOThread implements Runnable, AutoCloseable {
public void tick() {
ConnectionState oldState = connectionState;
connectionState = cycle(connectionState);
- checkOldConnections();
+ if (thread == null)
+ oldConnectionsDrainer.checkOldConnections();
if (thread != null)
sleepIfProblemsGettingSyncedConnection(connectionState, oldState);
}
@@ -445,51 +458,11 @@ public class IOThread implements Runnable, AutoCloseable {
private ConnectionState refreshConnection(ConnectionState currentConnectionState) {
if (currentConnectionState == ConnectionState.SESSION_SYNCED)
- oldConnections.add(currentConnection);
+ oldConnectionsDrainer.add(currentConnection);
currentConnection = connectionFactory.newConnection();
return ConnectionState.DISCONNECTED;
}
- private void checkOldConnections() {
- for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) {
- GatewayConnection connection = i.next();
- if (closingTime(connection).isBefore(clock.instant())) {
- try {
- processResponse(connection.poll());
- connection.close();
- i.remove();
- }
- catch (Exception e) {
- // Old connection; best effort
- }
- }
- else if (timeToPoll(connection)) {
- try {
- processResponse(connection.poll());
- }
- catch (Exception e) {
- // Old connection; best effort
- }
- }
- }
- }
-
- private Instant closingTime(GatewayConnection connection) {
- return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut);
- }
-
- private boolean timeToPoll(GatewayConnection connection) {
- if (connection.lastPollTime() == null) return true;
-
- // Exponential (2^x) dropoff:
- double unit = pollIntervalUS / 1000.0;
- double x = ( clock.millis() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit;
- double lastX = ( connection.lastPollTime().toEpochMilli() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit;
-
- double currentDelayDoublings = Math.log(lastX)/Math.log(2);
- return (x - lastX) > Math.pow(2, currentDelayDoublings);
- }
-
public static class ConnectionStats {
// NOTE: These fields are accessed by reflection in JSON serialization
@@ -528,10 +501,122 @@ public class IOThread implements Runnable, AutoCloseable {
/** For testing. Returns the current connection of this. Not thread safe. */
public GatewayConnection currentConnection() { return currentConnection; }
- /** For testing. Returns a snapshot of the old connections of this. Not thread safe. */
- public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); }
+ /** For testing. Returns a snapshot of the old connections of this. */
+ public List<GatewayConnection> oldConnections() { return oldConnectionsDrainer.connections(); }
/** For testing */
public EndpointResultQueue resultQueue() { return resultQueue; }
+ /**
+ * We need to drain results on the connection where they were sent to make sure we request results on
+ * the node which received the operation also when going through a VIP.
+ */
+ private static class OldConnectionsDrainer implements Runnable {
+
+ private final Endpoint endpoint;
+ private final int clusterId;
+ private final long pollIntervalUS;
+ private final Duration connectionTimeToLive;
+ private final Duration localQueueTimeOut;
+ private final AtomicInteger statusReceivedCounter;
+ private final EndpointResultQueue resultQueue;
+ private final CountDownLatch stopSignal;
+ private final Clock clock;
+
+ /**
+ * Previous connections on which we may have sent operations and are still waiting for the results
+ * All connections in this are in state SESSION_SYNCED.
+ */
+ private final List<GatewayConnection> connections = new CopyOnWriteArrayList<>();
+
+ OldConnectionsDrainer(Endpoint endpoint,
+ int clusterId,
+ long pollIntervalUS,
+ Duration connectionTimeToLive,
+ Duration localQueueTimeOut,
+ AtomicInteger statusReceivedCounter,
+ EndpointResultQueue resultQueue,
+ CountDownLatch stopSignal,
+ Clock clock) {
+ this.endpoint = endpoint;
+ this.clusterId = clusterId;
+ this.pollIntervalUS = pollIntervalUS;
+ this.connectionTimeToLive = connectionTimeToLive;
+ this.localQueueTimeOut = localQueueTimeOut;
+ this.statusReceivedCounter = statusReceivedCounter;
+ this.resultQueue = resultQueue;
+ this.stopSignal = stopSignal;
+ this.clock = clock;
+ }
+
+ /** Add another old connection to this for draining */
+ public void add(GatewayConnection connection) {
+ connections.add(connection);
+ }
+
+ @Override
+ public void run() {
+ while (stopSignal.getCount() > 0)
+ checkOldConnections();
+ }
+
+ public void checkOldConnections() {
+ List<GatewayConnection> toRemove = new ArrayList<>();
+ for (GatewayConnection connection : connections) {
+ if (closingTime(connection).isBefore(clock.instant())) {
+ try {
+ IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
+ connection.close();
+ toRemove.add(connection);
+ } catch (Exception e) {
+ // Old connection; best effort
+ }
+ } else if (timeToPoll(connection)) {
+ try {
+ IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
+ } catch (Exception e) {
+ // Old connection; best effort
+ }
+ }
+ }
+ connections.removeAll(toRemove);
+ }
+
+ private boolean timeToPoll(GatewayConnection connection) {
+ if (connection.lastPollTime() == null) return true;
+
+ // Exponential (2^x) dropoff:
+ double unit = pollIntervalUS / 1000.0;
+ double x = ( clock.millis() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit;
+ double lastX = ( connection.lastPollTime().toEpochMilli() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit;
+
+ double currentDelayDoublings = Math.log(lastX)/Math.log(2);
+ return (x - lastX) > Math.pow(2, currentDelayDoublings);
+ }
+
+ private Instant closingTime(GatewayConnection connection) {
+ return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut);
+ }
+
+ private void close() {
+ int size = resultQueue.getPendingSize();
+ if (size > 0) {
+ log.info("We have outstanding operations (" + size + ") , trying to fetch responses.");
+ for (GatewayConnection connection : connections) {
+ try {
+ IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue);
+ } catch (Throwable e) {
+ log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e);
+ }
+ }
+ }
+ for (GatewayConnection oldConnection : connections)
+ oldConnection.close();
+ }
+
+ /** For testing. Returns the old connections of this. */
+ public List<GatewayConnection> connections() { return Collections.unmodifiableList(connections); }
+
+ }
+
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index e684c929fda..c5913d73d7b 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -1,9 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.client.core.communication;
-import com.yahoo.vespa.http.client.FeedClient;
-import com.yahoo.vespa.http.client.FeedEndpointException;
-import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.core.OperationProcessorTester;
import com.yahoo.vespa.http.client.core.ServerResponseException;
import org.junit.Test;