diff options
author | Harald Musum <musum@verizonmedia.com> | 2021-04-28 08:27:55 +0200 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2021-04-28 08:27:55 +0200 |
commit | 50945a2b6d44ffd72ca4fb8a6e5259b4ed7acd93 (patch) | |
tree | 5410180703be293ddc1063bdd0bd7072f64bd3a4 | |
parent | e79af49a3159e5505cd3e5f2605c299d38fe40cd (diff) |
Switch connection only when the current one is failing
Subscribers set error on a connection if it fails, so when having
many subscribers we should not switch unless the current connection
is having errors
7 files changed, 61 insertions, 63 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java index e6b81adf787..c234fdda32c 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java @@ -87,7 +87,7 @@ public class MockConnection implements ConnectionPool, Connection { } @Override - public Connection switchConnection() { return this; } + public Connection switchConnection(Connection connection) { return this; } @Override public int getSize() { diff --git a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java index 786dfa975f4..93135fc4661 100644 --- a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java +++ b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java @@ -12,7 +12,7 @@ public interface ConnectionPool extends AutoCloseable { /** * Sets the supplied Connection to have an error, implementations are expected to call - * {@link #switchConnection()} after setting state for the supplied Connection. + * {@link #switchConnection(Connection)} after setting state for the supplied Connection. * */ void setError(Connection connection, int i); @@ -25,16 +25,7 @@ public interface ConnectionPool extends AutoCloseable { * * @return a Connection */ - Connection switchConnection(); - - /** - * Sets the current JRTConnection instance by randomly choosing - * from the available sources and returns the result. - * - * @return a Connection - */ - @Deprecated - default Connection setNewCurrentConnection() { return switchConnection(); }; + Connection switchConnection(Connection failingConnection); int getSize(); diff --git a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java index 019103c7015..29c05e0a83b 100644 --- a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java +++ b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java @@ -66,19 +66,31 @@ public class JRTConnectionPool implements ConnectionPool { } @Override - public synchronized JRTConnection switchConnection() { + public synchronized JRTConnection switchConnection(Connection failingConnection) { List<JRTConnection> sources = getSources(); if (sources.size() <= 1) return currentConnection; - List<JRTConnection> sourceCandidates = sources.stream() - .filter(JRTConnection::isHealthy) - .collect(Collectors.toList()); - JRTConnection newConnection; + if ( ! currentConnection.equals(failingConnection)) return currentConnection; + + return switchConnection(); + } + + /** + * Preconditions: + * 1. the current connection is unhealthy and should not be selected when switching + * 2. There is more than 1 source. + */ + public synchronized JRTConnection switchConnection() { + if (getSources().size() <= 1) throw new IllegalStateException("Cannot switch connection, not enough sources"); + + List<JRTConnection> sourceCandidates = getSources().stream() + .filter(JRTConnection::isHealthy) + .collect(Collectors.toList()); if (sourceCandidates.size() == 0) { sourceCandidates = getSources(); sourceCandidates.remove(currentConnection); } - newConnection = pickNewConnectionRandomly(sourceCandidates); + JRTConnection newConnection = pickNewConnectionRandomly(sourceCandidates); log.log(Level.INFO, () -> "Switching from " + currentConnection + " to " + newConnection); return currentConnection = newConnection; } @@ -106,7 +118,7 @@ public class JRTConnectionPool implements ConnectionPool { @Override public void setError(Connection connection, int errorCode) { connection.setError(errorCode); - switchConnection(); + switchConnection(connection); } public JRTConnectionPool updateSources(List<String> addresses) { diff --git a/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java b/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java index 08f9d0ad31d..8bd88d8958f 100644 --- a/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java +++ b/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java @@ -29,18 +29,15 @@ import static org.junit.Assert.assertTrue; public class JRTConnectionPoolTest { private static final List<String> sources = new ArrayList<>((Arrays.asList("host0", "host1", "host2"))); - /** - * Tests that hash-based selection through the list works. - */ @Test - public void test_random_selection_of_sourceBasicHashBasedSelection() { + public void test_random_selection_of_source() { JRTConnectionPool sourcePool = new JRTConnectionPool(sources); assertEquals("host0,host1,host2", sourcePool.getSources().stream().map(JRTConnection::getAddress).collect(Collectors.joining(","))); Map<String, Integer> sourceOccurrences = new HashMap<>(); for (int i = 0; i < 1000; i++) { - final String address = sourcePool.switchConnection().getAddress(); + String address = sourcePool.switchConnection().getAddress(); if (sourceOccurrences.containsKey(address)) { sourceOccurrences.put(address, sourceOccurrences.get(address) + 1); } else { @@ -60,10 +57,7 @@ public class JRTConnectionPoolTest { public void testManySources() { Map<String, Integer> timesUsed = new LinkedHashMap<>(); - List<String> twoSources = new ArrayList<>(); - - twoSources.add("host0"); - twoSources.add("host1"); + List<String> twoSources = List.of("host0", "host1"); JRTConnectionPool sourcePool = new JRTConnectionPool(twoSources); int count = 1000; @@ -99,10 +93,8 @@ public class JRTConnectionPoolTest { */ @Test public void updateSources() { - List<String> twoSources = new ArrayList<>(); + List<String> twoSources = List.of("host0", "host1"); - twoSources.add("host0"); - twoSources.add("host1"); JRTConnectionPool sourcePool = new JRTConnectionPool(twoSources); ConfigSourceSet sourcesBefore = sourcePool.getSourceSet(); @@ -138,36 +130,39 @@ public class JRTConnectionPoolTest { @Test public void testFailingSources() { - List<String> sources = new ArrayList<>(); - - sources.add("host0"); - sources.add("host1"); - sources.add("host2"); - JRTConnectionPool sourcePool = new JRTConnectionPool(sources); - - Connection firstConnection = sourcePool.getCurrent(); - - // Should change connection away from first connection - sourcePool.setError(firstConnection, 123); - JRTConnection secondConnection = sourcePool.getCurrent(); - assertNotEquals(secondConnection, firstConnection); - - // Should change connection away from first AND second connection - sourcePool.setError(secondConnection, 123); - JRTConnection thirdConnection = sourcePool.getCurrent(); - assertNotEquals(sourcePool.getCurrent(), firstConnection); - assertNotEquals(sourcePool.getCurrent(), secondConnection); - - // Should change connection away from third connection - sourcePool.setError(thirdConnection, 123); - JRTConnection currentConnection = sourcePool.getCurrent(); - assertNotEquals(sourcePool.getCurrent(), thirdConnection); - - // Should change connection from current connection - sourcePool.setError(thirdConnection, 123); - assertNotEquals(sourcePool.getCurrent(), currentConnection); + List<String> sources = List.of("host0", "host1", "host2"); + JRTConnectionPool connectionPool = new JRTConnectionPool(sources); + + Connection firstConnection = connectionPool.getCurrent(); + + // Should change connection, not getting first connection as new + JRTConnection secondConnection = failAndGetNewConnection(connectionPool, firstConnection); + assertNotEquals(firstConnection, secondConnection); + + // Should change connection, , not getting first or seconds connection as new + JRTConnection thirdConnection = failAndGetNewConnection(connectionPool, secondConnection); + // Fail a few more times with old connection, as will happen when there are multiple subscribers + // Connection should not change + assertEquals(thirdConnection, failAndGetNewConnection(connectionPool, secondConnection)); + assertEquals(thirdConnection, failAndGetNewConnection(connectionPool, secondConnection)); + assertEquals(thirdConnection, failAndGetNewConnection(connectionPool, secondConnection)); + assertNotEquals(firstConnection, thirdConnection); + assertNotEquals(secondConnection, thirdConnection); + + // Should change connection, not getting third connection as new + JRTConnection currentConnection = failAndGetNewConnection(connectionPool, thirdConnection); + assertNotEquals(thirdConnection, currentConnection); + + // Should change connection, not getting current connection as new + JRTConnection currentConnection2 = failAndGetNewConnection(connectionPool, currentConnection); + assertNotEquals(currentConnection, currentConnection2); + + connectionPool.close(); + } - sourcePool.close(); + private JRTConnection failAndGetNewConnection(JRTConnectionPool connectionPool, Connection failingConnection) { + connectionPool.setError(failingConnection, 123); + return connectionPool.getCurrent(); } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionUtil.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionUtil.java index 8dc9272b1b7..33cd425d6aa 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionUtil.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionUtil.java @@ -79,7 +79,7 @@ public class FileDistributionUtil { public Connection getCurrent() { return null; } @Override - public Connection switchConnection() { return null; } + public Connection switchConnection(Connection connection) { return null; } @Override public int getSize() { return 0; } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 05fbd457a0d..da9d4ceab88 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -127,7 +127,7 @@ public class FileReferenceDownloader { return true; } else { log.log(logLevel, "File reference '" + fileReference + "' not found at " + connection.getAddress()); - connectionPool.switchConnection(); + connectionPool.switchConnection(connection); return false; } } else { diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index 1344b7afbb3..d6bae088ee2 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -341,7 +341,7 @@ public class FileDownloaderTest { } @Override - public Connection switchConnection() { + public Connection switchConnection(Connection connection) { return this; } |