diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-09-04 10:48:24 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-09-04 10:48:24 +0200 |
commit | f54266496dc50f43a510eb1714ad2853ebd56005 (patch) | |
tree | dba2533c1c64fecb44f805afcf5468a44a9f5a53 /vespa-http-client | |
parent | 13212978faf3b5921a3e139c1512677f9b882591 (diff) |
Revert "Merge pull request #14263 from vespa-engine/revert-14259-bratseth/drain-in-separate-thread"
This reverts commit bed63d34ef760934ba45bb80d36699345c9416f5, reversing
changes made to 8f1a532330638c039e2e554ef274879ec77802f6.
Diffstat (limited to 'vespa-http-client')
3 files changed, 157 insertions, 86 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java index 98aca13fff6..6a421370517 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/Endpoint.java @@ -12,37 +12,12 @@ import java.net.URL; */ public final class Endpoint implements Serializable { - /** - * Creates an Endpoint with the default port and without using SSL. - * - * @param hostname the hostname - * @return an Endpoint instance - */ - public static Endpoint create(String hostname) { - return new Endpoint(hostname, DEFAULT_PORT, false); - } - - /** - * Creates an Endpoint with the given hostname, port and SSL setting. - * - * @param hostname the hostname - * @param port the port - * @param useSsl true if SSL is to be used - * @return an Endpoint instance - */ - public static Endpoint create(String hostname, int port, boolean useSsl) { - return new Endpoint(hostname, port, useSsl); - } - - public static Endpoint create(URL url) { - return new Endpoint(url.getHost(), url.getPort(), "https".equals(url.getProtocol())); - } + private static final int DEFAULT_PORT = 4080; - private static final long serialVersionUID = 4545345L; private final String hostname; private final int port; private final boolean useSsl; - private static final int DEFAULT_PORT = 4080; + private Endpoint(String hostname, int port, boolean useSsl) { if (hostname.startsWith("https://")) { throw new RuntimeException("Hostname should be name of machine, not prefixed with protocol (https://)"); @@ -90,4 +65,18 @@ public final class Endpoint implements Serializable { return result; } + /** Creates an Endpoint with the default port and without using SSL */ + public static Endpoint create(String hostname) { + return new Endpoint(hostname, DEFAULT_PORT, false); + } + + /** Creates an Endpoint with the given hostname, port and SSL setting. */ + public static Endpoint create(String hostname, int port, boolean useSsl) { + return new Endpoint(hostname, port, useSsl); + } + + public static Endpoint create(URL url) { + return new Endpoint(url.getHost(), url.getPort(), "https".equals(url.getProtocol())); + } + } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 652f65489db..7bd0918eb74 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Random; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -56,18 +57,11 @@ public class IOThread implements Runnable, AutoCloseable { private final long pollIntervalUS; private final Clock clock; private final Random random = new Random(); + private final OldConnectionsDrainer oldConnectionsDrainer; private GatewayConnection currentConnection; private ConnectionState connectionState = ConnectionState.DISCONNECTED; - /** - * Previous connections on which we have sent operations and are still waiting for the result - * (so all connections in this are in state SESSION_SYNCED). - * We need to drain results on the connection where they were sent to make sure we request results on - * the node which received the operation also when going through a VIP. - */ - private final List<GatewayConnection> oldConnections = new ArrayList<>(); - private enum ConnectionState { DISCONNECTED, CONNECTED, SESSION_SYNCED }; private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0); private final AtomicInteger wrongVersionDetectedCounter = new AtomicInteger(0); @@ -106,10 +100,22 @@ public class IOThread implements Runnable, AutoCloseable { this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s] this.clock = clock; this.localQueueTimeOut = localQueueTimeOut; + this.oldConnectionsDrainer = new OldConnectionsDrainer(endpoint, + clusterId, + pollIntervalUS, + connectionTimeToLive, + localQueueTimeOut, + statusReceivedCounter, + resultQueue, + stopSignal, + clock); if (runThreads) { this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); thread.setDaemon(true); thread.start(); + Thread thread = new Thread(ioThreadGroup, oldConnectionsDrainer, "IOThread " + endpoint + " drainer"); + thread.setDaemon(true); + thread.start(); } else { this.thread = null; @@ -144,13 +150,13 @@ public class IOThread implements Runnable, AutoCloseable { stopSignal.countDown(); log.finer("Closed called."); + oldConnectionsDrainer.close(); + // Make a last attempt to get results from previous operations, we have already waited quite a bit before getting here. int size = resultQueue.getPendingSize(); if (size > 0) { log.info("We have outstanding operations (" + size + ") , trying to fetch responses."); try { - for (GatewayConnection oldConnection : oldConnections) - processResponse(oldConnection.drain()); processResponse(currentConnection.drain()); } catch (Throwable e) { log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e); @@ -158,8 +164,6 @@ public class IOThread implements Runnable, AutoCloseable { } try { - for (GatewayConnection oldConnection : oldConnections) - oldConnection.close(); currentConnection.close(); } finally { // If there is still documents in the queue, fail them. @@ -260,6 +264,14 @@ public class IOThread implements Runnable, AutoCloseable { } private ProcessResponse processResponse(InputStream serverResponse) throws IOException { + return processResponse(serverResponse, endpoint, clusterId, statusReceivedCounter, resultQueue); + } + + private static ProcessResponse processResponse(InputStream serverResponse, + Endpoint endpoint, + int clusterId, + AtomicInteger statusReceivedCounter, + EndpointResultQueue resultQueue) throws IOException { Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse); statusReceivedCounter.addAndGet(endpointResults.size()); int transientErrors = 0; @@ -409,7 +421,8 @@ public class IOThread implements Runnable, AutoCloseable { public void tick() { ConnectionState oldState = connectionState; connectionState = cycle(connectionState); - checkOldConnections(); + if (thread == null) + oldConnectionsDrainer.checkOldConnections(); if (thread != null) sleepIfProblemsGettingSyncedConnection(connectionState, oldState); } @@ -445,51 +458,11 @@ public class IOThread implements Runnable, AutoCloseable { private ConnectionState refreshConnection(ConnectionState currentConnectionState) { if (currentConnectionState == ConnectionState.SESSION_SYNCED) - oldConnections.add(currentConnection); + oldConnectionsDrainer.add(currentConnection); currentConnection = connectionFactory.newConnection(); return ConnectionState.DISCONNECTED; } - private void checkOldConnections() { - for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) { - GatewayConnection connection = i.next(); - if (closingTime(connection).isBefore(clock.instant())) { - try { - processResponse(connection.poll()); - connection.close(); - i.remove(); - } - catch (Exception e) { - // Old connection; best effort - } - } - else if (timeToPoll(connection)) { - try { - processResponse(connection.poll()); - } - catch (Exception e) { - // Old connection; best effort - } - } - } - } - - private Instant closingTime(GatewayConnection connection) { - return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut); - } - - private boolean timeToPoll(GatewayConnection connection) { - if (connection.lastPollTime() == null) return true; - - // Exponential (2^x) dropoff: - double unit = pollIntervalUS / 1000.0; - double x = ( clock.millis() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit; - double lastX = ( connection.lastPollTime().toEpochMilli() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit; - - double currentDelayDoublings = Math.log(lastX)/Math.log(2); - return (x - lastX) > Math.pow(2, currentDelayDoublings); - } - public static class ConnectionStats { // NOTE: These fields are accessed by reflection in JSON serialization @@ -528,10 +501,122 @@ public class IOThread implements Runnable, AutoCloseable { /** For testing. Returns the current connection of this. Not thread safe. */ public GatewayConnection currentConnection() { return currentConnection; } - /** For testing. Returns a snapshot of the old connections of this. Not thread safe. */ - public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); } + /** For testing. Returns a snapshot of the old connections of this. */ + public List<GatewayConnection> oldConnections() { return oldConnectionsDrainer.connections(); } /** For testing */ public EndpointResultQueue resultQueue() { return resultQueue; } + /** + * We need to drain results on the connection where they were sent to make sure we request results on + * the node which received the operation also when going through a VIP. + */ + private static class OldConnectionsDrainer implements Runnable { + + private final Endpoint endpoint; + private final int clusterId; + private final long pollIntervalUS; + private final Duration connectionTimeToLive; + private final Duration localQueueTimeOut; + private final AtomicInteger statusReceivedCounter; + private final EndpointResultQueue resultQueue; + private final CountDownLatch stopSignal; + private final Clock clock; + + /** + * Previous connections on which we may have sent operations and are still waiting for the results + * All connections in this are in state SESSION_SYNCED. + */ + private final List<GatewayConnection> connections = new CopyOnWriteArrayList<>(); + + OldConnectionsDrainer(Endpoint endpoint, + int clusterId, + long pollIntervalUS, + Duration connectionTimeToLive, + Duration localQueueTimeOut, + AtomicInteger statusReceivedCounter, + EndpointResultQueue resultQueue, + CountDownLatch stopSignal, + Clock clock) { + this.endpoint = endpoint; + this.clusterId = clusterId; + this.pollIntervalUS = pollIntervalUS; + this.connectionTimeToLive = connectionTimeToLive; + this.localQueueTimeOut = localQueueTimeOut; + this.statusReceivedCounter = statusReceivedCounter; + this.resultQueue = resultQueue; + this.stopSignal = stopSignal; + this.clock = clock; + } + + /** Add another old connection to this for draining */ + public void add(GatewayConnection connection) { + connections.add(connection); + } + + @Override + public void run() { + while (stopSignal.getCount() > 0) + checkOldConnections(); + } + + public void checkOldConnections() { + List<GatewayConnection> toRemove = new ArrayList<>(); + for (GatewayConnection connection : connections) { + if (closingTime(connection).isBefore(clock.instant())) { + try { + IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); + connection.close(); + toRemove.add(connection); + } catch (Exception e) { + // Old connection; best effort + } + } else if (timeToPoll(connection)) { + try { + IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); + } catch (Exception e) { + // Old connection; best effort + } + } + } + connections.removeAll(toRemove); + } + + private boolean timeToPoll(GatewayConnection connection) { + if (connection.lastPollTime() == null) return true; + + // Exponential (2^x) dropoff: + double unit = pollIntervalUS / 1000.0; + double x = ( clock.millis() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit; + double lastX = ( connection.lastPollTime().toEpochMilli() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit; + + double currentDelayDoublings = Math.log(lastX)/Math.log(2); + return (x - lastX) > Math.pow(2, currentDelayDoublings); + } + + private Instant closingTime(GatewayConnection connection) { + return connection.connectionTime().plus(connectionTimeToLive).plus(localQueueTimeOut); + } + + private void close() { + int size = resultQueue.getPendingSize(); + if (size > 0) { + log.info("We have outstanding operations (" + size + ") , trying to fetch responses."); + for (GatewayConnection connection : connections) { + try { + IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); + } catch (Throwable e) { + log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e); + } + } + } + for (GatewayConnection oldConnection : connections) + oldConnection.close(); + } + + /** For testing. Returns the old connections of this. */ + public List<GatewayConnection> connections() { return Collections.unmodifiableList(connections); } + + } + } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index e684c929fda..c5913d73d7b 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -1,9 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.http.client.core.communication; -import com.yahoo.vespa.http.client.FeedClient; -import com.yahoo.vespa.http.client.FeedEndpointException; -import com.yahoo.vespa.http.client.Result; import com.yahoo.vespa.http.client.core.OperationProcessorTester; import com.yahoo.vespa.http.client.core.ServerResponseException; import org.junit.Test; |