diff options
author | Harald Musum <musum@verizonmedia.com> | 2021-04-27 10:08:44 +0200 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2021-04-27 10:08:44 +0200 |
commit | 1474a930b38858bc814e14a11175b1787981bcfd (patch) | |
tree | b3a86a8ae5e4e4e5f16202335170f1f7086f6c0e /config | |
parent | 636e0b9c27c406e0b75a5c9e9d3da2af1d333b9d (diff) |
Change the way we switch connection when the current one is not working
Consider only healthy sources (if there are any) when switching to
a new one fue to failures.
Also log when connecting to a source
Diffstat (limited to 'config')
5 files changed, 90 insertions, 55 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java index 58eed7f9e78..e6b81adf787 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java @@ -1,4 +1,4 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.config.subscription.impl; import com.yahoo.jrt.Request; @@ -16,7 +16,7 @@ import com.yahoo.vespa.config.util.ConfigUtils; * * @author hmusum */ -public class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Connection { +public class MockConnection implements ConnectionPool, Connection { private Request lastRequest; private final ResponseHandler responseHandler; @@ -87,9 +87,7 @@ public class MockConnection implements ConnectionPool, com.yahoo.vespa.config.Co } @Override - public Connection setNewCurrentConnection() { - return this; - } + public Connection switchConnection() { return this; } @Override public int getSize() { diff --git a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java index c6a93348d39..786dfa975f4 100644 --- a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java +++ b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java @@ -1,4 +1,4 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config; import com.yahoo.jrt.Supervisor; @@ -10,13 +10,35 @@ public interface ConnectionPool extends AutoCloseable { void close(); + /** + * Sets the supplied Connection to have an error, implementations are expected to call + * {@link #switchConnection()} after setting state for the supplied Connection. + * + */ void setError(Connection connection, int i); Connection getCurrent(); - Connection setNewCurrentConnection(); + /** + * Switches to another (healthy, if one exists) Connection instance. + * Returns the resulting Connection. See also {@link #setError(Connection, int)} + * + * @return a Connection + */ + Connection switchConnection(); + + /** + * Sets the current JRTConnection instance by randomly choosing + * from the available sources and returns the result. + * + * @return a Connection + */ + @Deprecated + default Connection setNewCurrentConnection() { return switchConnection(); }; int getSize(); + // TODO: Exposes implementation, try to remove Supervisor getSupervisor(); + } diff --git a/config/src/main/java/com/yahoo/vespa/config/JRTConnection.java b/config/src/main/java/com/yahoo/vespa/config/JRTConnection.java index cde46eb9de0..52031ebde72 100644 --- a/config/src/main/java/com/yahoo/vespa/config/JRTConnection.java +++ b/config/src/main/java/com/yahoo/vespa/config/JRTConnection.java @@ -1,36 +1,33 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config; -import com.yahoo.jrt.*; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.RequestWaiter; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Target; -import java.text.SimpleDateFormat; -import java.util.TimeZone; +import java.time.Duration; +import java.time.Instant; +import java.util.logging.Level; import java.util.logging.Logger; /** * A JRT connection to a config server or config proxy. * * @author Gunnar Gauslaa Bergem + * @author hmusum */ public class JRTConnection implements Connection { - public final static Logger logger = Logger.getLogger(JRTConnection.class.getPackage().getName()); + private final static Logger logger = Logger.getLogger(JRTConnection.class.getPackage().getName()); private final String address; private final Supervisor supervisor; private Target target; - private long lastConnectionAttempt = 0; // Timestamp for last connection attempt - private long lastSuccess = 0; - private long lastFailure = 0; - - private static final long delayBetweenConnectionMessage = 30000; //ms - - private static SimpleDateFormat yyyyMMddz; - static { - yyyyMMddz = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z"); - yyyyMMddz.setTimeZone(TimeZone.getTimeZone("GMT")); - } - + private Instant lastConnected = Instant.EPOCH.plus(Duration.ofSeconds(1)); // to be healthy initially, see isHealthy() + private Instant lastSuccess = Instant.EPOCH; + private Instant lastFailure = Instant.EPOCH; public JRTConnection(String address, Supervisor supervisor) { this.address = address; @@ -59,39 +56,43 @@ public class JRTConnection implements Connection { */ public synchronized Target getTarget() { if (target == null || !target.isValid()) { - if ((System.currentTimeMillis() - lastConnectionAttempt) > delayBetweenConnectionMessage) { - logger.fine("Connecting to " + address); - } - lastConnectionAttempt = System.currentTimeMillis(); + logger.log(Level.INFO, "Connecting to " + address); target = supervisor.connect(new Spec(address)); + lastConnected = Instant.now(); } return target; } @Override public synchronized void setError(int errorCode) { - lastFailure = System.currentTimeMillis(); + lastFailure = Instant.now(); } @Override public synchronized void setSuccess() { - lastSuccess = System.currentTimeMillis(); + lastSuccess = Instant.now(); + } + + public synchronized boolean isHealthy() { + return lastSuccess.isAfter(lastFailure) || lastConnected.isAfter(lastFailure); } public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Address: "); sb.append(address); - if (lastSuccess > 0) { + sb.append("\n").append("Healthy: ").append(isHealthy()); + if (lastSuccess.isAfter(Instant.EPOCH)) { sb.append("\n"); sb.append("Last success: "); - sb.append(yyyyMMddz.format(lastSuccess)); + sb.append(lastSuccess); } - if (lastFailure > 0) { + if (lastFailure.isAfter(Instant.EPOCH)) { sb.append("\n"); sb.append("Last failure: "); - sb.append(yyyyMMddz.format(lastFailure)); + sb.append(lastFailure); } return sb.toString(); } + } diff --git a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java index 2d25602bd66..801cdc868e1 100644 --- a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java +++ b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java @@ -12,16 +12,14 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Level; import java.util.logging.Logger; - -import static java.util.logging.Level.FINE; +import java.util.stream.Collectors; /** * A pool of JRT connections to a config source (either a config server or a config proxy). - * The current connection is chosen randomly when calling {#link #setNewCurrentConnection} - * (since the connection is chosen randomly, it might end up using the same connection again, - * and it will always do so if there is only one source). + * The current connection is chosen randomly when calling {#link {@link #switchConnection()}} + * (it will continue to use the same connection if there is only one source). * The current connection is available with {@link #getCurrent()}. - * When calling {@link #setError(Connection, int)}, {#link #setNewCurrentConnection} will always be called. + * When calling {@link #setError(Connection, int)}, {#link {@link #switchConnection()}} will always be called. * * @author Gunnar Gauslaa Bergem * @author hmusum @@ -55,7 +53,7 @@ public class JRTConnectionPool implements ConnectionPool { connections.put(address, new JRTConnection(address, supervisor)); } } - setNewCurrentConnection(); + initialize(); } /** @@ -67,20 +65,34 @@ public class JRTConnectionPool implements ConnectionPool { return currentConnection; } - /** - * Returns and set the current JRTConnection instance by randomly choosing - * from the available sources (this means that you might end up using - * the same connection). - * - * @return a JRTConnection - */ - public synchronized JRTConnection setNewCurrentConnection() { + @Override + public synchronized JRTConnection switchConnection() { List<JRTConnection> sources = getSources(); - currentConnection = sources.get(ThreadLocalRandom.current().nextInt(0, sources.size())); + if (sources.size() <= 1) return currentConnection; + + List<JRTConnection> healthySources = sources.stream() + .filter(JRTConnection::isHealthy) + .collect(Collectors.toList()); + if (healthySources.size() == 0) { + log.log(Level.INFO, "No healthy sources, keep using " + currentConnection); + return currentConnection; + } + JRTConnection newConnection = pickNewConnectionRandomly(healthySources); + log.log(Level.INFO, () -> "Switching from " + currentConnection + " to " + newConnection); + return currentConnection = newConnection; + } + + public synchronized JRTConnection initialize() { + List<JRTConnection> sources = getSources(); + currentConnection = pickNewConnectionRandomly(sources); log.log(Level.INFO, () -> "Choosing new connection: " + currentConnection); return currentConnection; } + private JRTConnection pickNewConnectionRandomly(List<JRTConnection> sources) { + return sources.get(ThreadLocalRandom.current().nextInt(0, sources.size())); + } + List<JRTConnection> getSources() { List<JRTConnection> ret; synchronized (connections) { @@ -96,7 +108,7 @@ public class JRTConnectionPool implements ConnectionPool { @Override public void setError(Connection connection, int errorCode) { connection.setError(errorCode); - setNewCurrentConnection(); + switchConnection(); } public JRTConnectionPool updateSources(List<String> addresses) { diff --git a/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java b/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java index 6604fe46d3f..56063fd94a5 100644 --- a/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java +++ b/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java @@ -1,10 +1,11 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config; import com.yahoo.config.subscription.ConfigSourceSet; import org.junit.Test; import java.util.*; +import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; @@ -25,11 +26,12 @@ public class JRTConnectionPoolTest { @Test public void test_random_selection_of_sourceBasicHashBasedSelection() { JRTConnectionPool sourcePool = new JRTConnectionPool(sources); - assertThat(sourcePool.toString(), is("Address: host0\nAddress: host1\nAddress: host2\n")); + assertEquals("host0,host1,host2", + sourcePool.getSources().stream().map(JRTConnection::getAddress).collect(Collectors.joining(","))); Map<String, Integer> sourceOccurrences = new HashMap<>(); for (int i = 0; i < 1000; i++) { - final String address = sourcePool.setNewCurrentConnection().getAddress(); + final String address = sourcePool.switchConnection().getAddress(); if (sourceOccurrences.containsKey(address)) { sourceOccurrences.put(address, sourceOccurrences.get(address) + 1); } else { @@ -57,7 +59,7 @@ public class JRTConnectionPoolTest { int count = 1000; for (int i = 0; i < count; i++) { - String address = sourcePool.setNewCurrentConnection().getAddress(); + String address = sourcePool.switchConnection().getAddress(); if (timesUsed.containsKey(address)) { int times = timesUsed.get(address); timesUsed.put(address, times + 1); |