diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-09-03 10:12:08 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-03 10:12:08 +0200 |
commit | 79739839c06bd6c6ddd8608d45e67e4da5dcfe0d (patch) | |
tree | 5de7617e25de78082e847f9c83939b6a41021abb /vespa-http-client/src | |
parent | 8f1a532330638c039e2e554ef274879ec77802f6 (diff) |
Revert "Drain in a separate thread"
Diffstat (limited to 'vespa-http-client/src')
3 files changed, 86 insertions, 157 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java index 6a421370517..98aca13fff6 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java @@ -12,12 +12,37 @@ import java.net.URL; */ public final class Endpoint implements Serializable { - private static final int DEFAULT_PORT = 4080; + /** + * Creates an Endpoint with the default port and without using SSL. + * + * @param hostname the hostname + * @return an Endpoint instance + */ + public static Endpoint create(String hostname) { + return new Endpoint(hostname, DEFAULT_PORT, false); + } + /** + * Creates an Endpoint with the given hostname, port and SSL setting. + * + * @param hostname the hostname + * @param port the port + * @param useSsl true if SSL is to be used + * @return an Endpoint instance + */ + public static Endpoint create(String hostname, int port, boolean useSsl) { + return new Endpoint(hostname, port, useSsl); + } + + public static Endpoint create(URL url) { + return new Endpoint(url.getHost(), url.getPort(), "https".equals(url.getProtocol())); + } + + private static final long serialVersionUID = 4545345L; private final String hostname; private final int port; private final boolean useSsl; - + private static final int DEFAULT_PORT = 4080; private Endpoint(String hostname, int port, boolean useSsl) { if (hostname.startsWith("https://")) { throw new RuntimeException("Hostname should be name of machine, not prefixed with protocol (https://)"); @@ -65,18 +90,4 @@ public final class Endpoint implements Serializable { return result; } - /** Creates an Endpoint with the default port and without using SSL */ - public static Endpoint create(String hostname) { - return new Endpoint(hostname, DEFAULT_PORT, false); - } - - /** Creates an Endpoint with the given hostname, port and SSL setting. */ - public static Endpoint create(String hostname, int port, boolean useSsl) { - return new Endpoint(hostname, port, useSsl); - } - - public static Endpoint create(URL url) { - return new Endpoint(url.getHost(), url.getPort(), "https".equals(url.getProtocol())); - } - } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 7bd0918eb74..652f65489db 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -23,7 +23,6 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Random; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -57,11 +56,18 @@ public class IOThread implements Runnable, AutoCloseable { private final long pollIntervalUS; private final Clock clock; private final Random random = new Random(); - private final OldConnectionsDrainer oldConnectionsDrainer; private GatewayConnection currentConnection; private ConnectionState connectionState = ConnectionState.DISCONNECTED; + /** + * Previous connections on which we have sent operations and are still waiting for the result + * (so all connections in this are in state SESSION_SYNCED). + * We need to drain results on the connection where they were sent to make sure we request results on + * the node which received the operation also when going through a VIP. + */ + private final List<GatewayConnection> oldConnections = new ArrayList<>(); + private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED }; private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0); private final AtomicInteger wrongVersionDetectedCounter = new AtomicInteger(0); @@ -100,22 +106,10 @@ public class IOThread implements Runnable, AutoCloseable { this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s] this.clock = clock; this.localQueueTimeOut = localQueueTimeOut; - this.oldConnectionsDrainer = new OldConnectionsDrainer(endpoint, - clusterId, - pollIntervalUS, - connectionTimeToLive, - localQueueTimeOut, - statusReceivedCounter, - resultQueue, - stopSignal, - clock); if (runThreads) { this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); thread.setDaemon(true); thread.start(); - Thread thread = new Thread(ioThreadGroup, oldConnectionsDrainer, "IOThread " + endpoint + " drainer"); - thread.setDaemon(true); - thread.start(); } else { this.thread = null; @@ -150,13 +144,13 @@ public class IOThread implements Runnable, AutoCloseable { stopSignal.countDown(); log.finer("Closed called."); - oldConnectionsDrainer.close(); - // Make a last attempt to get results from previous operations, we have already waited quite a bit before getting here. int size = resultQueue.getPendingSize(); if (size > 0) { log.info("We have outstanding operations (" + size + ") , trying to fetch responses."); try { + for (GatewayConnection oldConnection : oldConnections) + processResponse(oldConnection.drain()); processResponse(currentConnection.drain()); } catch (Throwable e) { log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e); @@ -164,6 +158,8 @@ public class IOThread implements Runnable, AutoCloseable { } try { + for (GatewayConnection oldConnection : oldConnections) + oldConnection.close(); currentConnection.close(); } finally { // If there is still documents in the queue, fail them. @@ -264,14 +260,6 @@ public class IOThread implements Runnable, AutoCloseable { } private ProcessResponse processResponse(InputStream serverResponse) throws IOException { - return processResponse(serverResponse, endpoint, clusterId, statusReceivedCounter, resultQueue); - } - - private static ProcessResponse processResponse(InputStream serverResponse, - Endpoint endpoint, - int clusterId, - AtomicInteger statusReceivedCounter, - EndpointResultQueue resultQueue) throws IOException { Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse); statusReceivedCounter.addAndGet(endpointResults.size()); int transientErrors = 0; @@ -421,8 +409,7 @@ public class IOThread implements Runnable, AutoCloseable { public void tick() { ConnectionState oldState = connectionState; connectionState = cycle(connectionState); - if (thread == null) - oldConnectionsDrainer.checkOldConnections(); + checkOldConnections(); if (thread != null) sleepIfProblemsGettingSyncedConnection(connectionState, oldState); } @@ -458,11 +445,51 @@ public class IOThread implements Runnable, AutoCloseable { private ConnectionState refreshConnection(ConnectionState currentConnectionState) { if (currentConnectionState == ConnectionState.SESSION_SYNCED) - oldConnectionsDrainer.add(currentConnection); + oldConnections.add(currentConnection); currentConnection = connectionFactory.newConnection(); return ConnectionState.DISCONNECTED; } + private void checkOldConnections() { + for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) { + GatewayConnection connection = i.next(); + if (closingTime(connection).isBefore(clock.instant())) { + try { + processResponse(connection.poll()); + connection.close(); + i.remove(); + } + catch (Exception e) { + // Old connection; best effort + } + } + else if (timeToPoll(connection)) { + try { + processResponse(connection.poll()); + } + catch (Exception e) { + // Old connection; best effort + } + } + } + } + + private Instant closingTime(GatewayConnection connection) { + return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut); + } + + private boolean timeToPoll(GatewayConnection connection) { + if (connection.lastPollTime() == null) return true; + + // Exponential (2^x) dropoff: + double unit = pollIntervalUS / 1000.0; + double x = ( clock.millis() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit; + double lastX = ( connection.lastPollTime().toEpochMilli() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit; + + double currentDelayDoublings = Math.log(lastX)/Math.log(2); + return (x - lastX) > Math.pow(2, currentDelayDoublings); + } + public static class ConnectionStats { // NOTE: These fields are accessed by reflection in JSON serialization @@ -501,122 +528,10 @@ public class IOThread implements Runnable, AutoCloseable { /** For testing. Returns the current connection of this. Not thread safe. */ public GatewayConnection currentConnection() { return currentConnection; } - /** For testing. Returns a snapshot of the old connections of this. */ - public List<GatewayConnection> oldConnections() { return oldConnectionsDrainer.connections(); } + /** For testing. Returns a snapshot of the old connections of this. Not thread safe. */ + public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); } /** For testing */ public EndpointResultQueue resultQueue() { return resultQueue; } - /** - * We need to drain results on the connection where they were sent to make sure we request results on - * the node which received the operation also when going through a VIP. - */ - private static class OldConnectionsDrainer implements Runnable { - - private final Endpoint endpoint; - private final int clusterId; - private final long pollIntervalUS; - private final Duration connectionTimeToLive; - private final Duration localQueueTimeOut; - private final AtomicInteger statusReceivedCounter; - private final EndpointResultQueue resultQueue; - private final CountDownLatch stopSignal; - private final Clock clock; - - /** - * Previous connections on which we may have sent operations and are still waiting for the results - * All connections in this are in state SESSION_SYNCED. - */ - private final List<GatewayConnection> connections = new CopyOnWriteArrayList<>(); - - OldConnectionsDrainer(Endpoint endpoint, - int clusterId, - long pollIntervalUS, - Duration connectionTimeToLive, - Duration localQueueTimeOut, - AtomicInteger statusReceivedCounter, - EndpointResultQueue resultQueue, - CountDownLatch stopSignal, - Clock clock) { - this.endpoint = endpoint; - this.clusterId = clusterId; - this.pollIntervalUS = pollIntervalUS; - this.connectionTimeToLive = connectionTimeToLive; - this.localQueueTimeOut = localQueueTimeOut; - this.statusReceivedCounter = statusReceivedCounter; - this.resultQueue = resultQueue; - this.stopSignal = stopSignal; - this.clock = clock; - } - - /** Add another old connection to this for draining */ - public void add(GatewayConnection connection) { - connections.add(connection); - } - - @Override - public void run() { - while (stopSignal.getCount() > 0) - checkOldConnections(); - } - - public void checkOldConnections() { - List<GatewayConnection> toRemove = new ArrayList<>(); - for (GatewayConnection connection : connections) { - if (closingTime(connection).isBefore(clock.instant())) { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - connection.close(); - toRemove.add(connection); - } catch (Exception e) { - // Old connection; best effort - } - } else if (timeToPoll(connection)) { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - } catch (Exception e) { - // Old connection; best effort - } - } - } - connections.removeAll(toRemove); - } - - private boolean timeToPoll(GatewayConnection connection) { - if (connection.lastPollTime() == null) return true; - - // Exponential (2^x) dropoff: - double unit = pollIntervalUS / 1000.0; - double x = ( clock.millis() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit; - double lastX = ( connection.lastPollTime().toEpochMilli() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit; - - double currentDelayDoublings = Math.log(lastX)/Math.log(2); - return (x - lastX) > Math.pow(2, currentDelayDoublings); - } - - private Instant closingTime(GatewayConnection connection) { - return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut); - } - - private void close() { - int size = resultQueue.getPendingSize(); - if (size > 0) { - log.info("We have outstanding operations (" + size + ") , trying to fetch responses."); - for (GatewayConnection connection : connections) { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - } catch (Throwable e) { - log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e); - } - } - } - for (GatewayConnection oldConnection : connections) - oldConnection.close(); - } - - /** For testing. Returns the old connections of this. */ - public List<GatewayConnection> connections() { return Collections.unmodifiableList(connections); } - - } - } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index c5913d73d7b..e684c929fda 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -1,6 +1,9 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.http.client.core.communication; +import com.yahoo.vespa.http.client.FeedClient; +import com.yahoo.vespa.http.client.FeedEndpointException; +import com.yahoo.vespa.http.client.Result; import com.yahoo.vespa.http.client.core.OperationProcessorTester; import com.yahoo.vespa.http.client.core.ServerResponseException; import org.junit.Test; |