summaryrefslogtreecommitdiffstats
path: root/config
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2021-04-28 08:27:55 +0200
committerHarald Musum <musum@verizonmedia.com>2021-04-28 08:27:55 +0200
commit50945a2b6d44ffd72ca4fb8a6e5259b4ed7acd93 (patch)
tree5410180703be293ddc1063bdd0bd7072f64bd3a4 /config
parente79af49a3159e5505cd3e5f2605c299d38fe40cd (diff)
Switch connection only when the current one is failing
Subscribers set error on a connection if it fails, so when having many subscribers we should not switch unless the current connection is having errors
Diffstat (limited to 'config')
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java2
-rw-r--r--config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java13
-rw-r--r--config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java26
-rw-r--r--config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java77
4 files changed, 58 insertions, 60 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java
index e6b81adf787..c234fdda32c 100644
--- a/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java
+++ b/config/src/main/java/com/yahoo/config/subscription/impl/MockConnection.java
@@ -87,7 +87,7 @@ public class MockConnection implements ConnectionPool, Connection {
}
@Override
- public Connection switchConnection() { return this; }
+ public Connection switchConnection(Connection connection) { return this; }
@Override
public int getSize() {
diff --git a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java
index 786dfa975f4..93135fc4661 100644
--- a/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java
+++ b/config/src/main/java/com/yahoo/vespa/config/ConnectionPool.java
@@ -12,7 +12,7 @@ public interface ConnectionPool extends AutoCloseable {
/**
* Sets the supplied Connection to have an error, implementations are expected to call
- * {@link #switchConnection()} after setting state for the supplied Connection.
+ * {@link #switchConnection(Connection)} after setting state for the supplied Connection.
*
*/
void setError(Connection connection, int i);
@@ -25,16 +25,7 @@ public interface ConnectionPool extends AutoCloseable {
*
* @return a Connection
*/
- Connection switchConnection();
-
- /**
- * Sets the current JRTConnection instance by randomly choosing
- * from the available sources and returns the result.
- *
- * @return a Connection
- */
- @Deprecated
- default Connection setNewCurrentConnection() { return switchConnection(); };
+ Connection switchConnection(Connection failingConnection);
int getSize();
diff --git a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java
index 019103c7015..29c05e0a83b 100644
--- a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java
+++ b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java
@@ -66,19 +66,31 @@ public class JRTConnectionPool implements ConnectionPool {
}
@Override
- public synchronized JRTConnection switchConnection() {
+ public synchronized JRTConnection switchConnection(Connection failingConnection) {
List<JRTConnection> sources = getSources();
if (sources.size() <= 1) return currentConnection;
- List<JRTConnection> sourceCandidates = sources.stream()
- .filter(JRTConnection::isHealthy)
- .collect(Collectors.toList());
- JRTConnection newConnection;
+ if ( ! currentConnection.equals(failingConnection)) return currentConnection;
+
+ return switchConnection();
+ }
+
+ /**
+ * Preconditions:
+ * 1. the current connection is unhealthy and should not be selected when switching
+ * 2. There is more than 1 source.
+ */
+ public synchronized JRTConnection switchConnection() {
+ if (getSources().size() <= 1) throw new IllegalStateException("Cannot switch connection, not enough sources");
+
+ List<JRTConnection> sourceCandidates = getSources().stream()
+ .filter(JRTConnection::isHealthy)
+ .collect(Collectors.toList());
if (sourceCandidates.size() == 0) {
sourceCandidates = getSources();
sourceCandidates.remove(currentConnection);
}
- newConnection = pickNewConnectionRandomly(sourceCandidates);
+ JRTConnection newConnection = pickNewConnectionRandomly(sourceCandidates);
log.log(Level.INFO, () -> "Switching from " + currentConnection + " to " + newConnection);
return currentConnection = newConnection;
}
@@ -106,7 +118,7 @@ public class JRTConnectionPool implements ConnectionPool {
@Override
public void setError(Connection connection, int errorCode) {
connection.setError(errorCode);
- switchConnection();
+ switchConnection(connection);
}
public JRTConnectionPool updateSources(List<String> addresses) {
diff --git a/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java b/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java
index 08f9d0ad31d..8bd88d8958f 100644
--- a/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java
+++ b/config/src/test/java/com/yahoo/vespa/config/JRTConnectionPoolTest.java
@@ -29,18 +29,15 @@ import static org.junit.Assert.assertTrue;
public class JRTConnectionPoolTest {
private static final List<String> sources = new ArrayList<>((Arrays.asList("host0", "host1", "host2")));
- /**
- * Tests that hash-based selection through the list works.
- */
@Test
- public void test_random_selection_of_sourceBasicHashBasedSelection() {
+ public void test_random_selection_of_source() {
JRTConnectionPool sourcePool = new JRTConnectionPool(sources);
assertEquals("host0,host1,host2",
sourcePool.getSources().stream().map(JRTConnection::getAddress).collect(Collectors.joining(",")));
Map<String, Integer> sourceOccurrences = new HashMap<>();
for (int i = 0; i < 1000; i++) {
- final String address = sourcePool.switchConnection().getAddress();
+ String address = sourcePool.switchConnection().getAddress();
if (sourceOccurrences.containsKey(address)) {
sourceOccurrences.put(address, sourceOccurrences.get(address) + 1);
} else {
@@ -60,10 +57,7 @@ public class JRTConnectionPoolTest {
public void testManySources() {
Map<String, Integer> timesUsed = new LinkedHashMap<>();
- List<String> twoSources = new ArrayList<>();
-
- twoSources.add("host0");
- twoSources.add("host1");
+ List<String> twoSources = List.of("host0", "host1");
JRTConnectionPool sourcePool = new JRTConnectionPool(twoSources);
int count = 1000;
@@ -99,10 +93,8 @@ public class JRTConnectionPoolTest {
*/
@Test
public void updateSources() {
- List<String> twoSources = new ArrayList<>();
+ List<String> twoSources = List.of("host0", "host1");
- twoSources.add("host0");
- twoSources.add("host1");
JRTConnectionPool sourcePool = new JRTConnectionPool(twoSources);
ConfigSourceSet sourcesBefore = sourcePool.getSourceSet();
@@ -138,36 +130,39 @@ public class JRTConnectionPoolTest {
@Test
public void testFailingSources() {
- List<String> sources = new ArrayList<>();
-
- sources.add("host0");
- sources.add("host1");
- sources.add("host2");
- JRTConnectionPool sourcePool = new JRTConnectionPool(sources);
-
- Connection firstConnection = sourcePool.getCurrent();
-
- // Should change connection away from first connection
- sourcePool.setError(firstConnection, 123);
- JRTConnection secondConnection = sourcePool.getCurrent();
- assertNotEquals(secondConnection, firstConnection);
-
- // Should change connection away from first AND second connection
- sourcePool.setError(secondConnection, 123);
- JRTConnection thirdConnection = sourcePool.getCurrent();
- assertNotEquals(sourcePool.getCurrent(), firstConnection);
- assertNotEquals(sourcePool.getCurrent(), secondConnection);
-
- // Should change connection away from third connection
- sourcePool.setError(thirdConnection, 123);
- JRTConnection currentConnection = sourcePool.getCurrent();
- assertNotEquals(sourcePool.getCurrent(), thirdConnection);
-
- // Should change connection from current connection
- sourcePool.setError(thirdConnection, 123);
- assertNotEquals(sourcePool.getCurrent(), currentConnection);
+ List<String> sources = List.of("host0", "host1", "host2");
+ JRTConnectionPool connectionPool = new JRTConnectionPool(sources);
+
+ Connection firstConnection = connectionPool.getCurrent();
+
+ // Should change connection, not getting first connection as new
+ JRTConnection secondConnection = failAndGetNewConnection(connectionPool, firstConnection);
+ assertNotEquals(firstConnection, secondConnection);
+
+ // Should change connection, , not getting first or seconds connection as new
+ JRTConnection thirdConnection = failAndGetNewConnection(connectionPool, secondConnection);
+ // Fail a few more times with old connection, as will happen when there are multiple subscribers
+ // Connection should not change
+ assertEquals(thirdConnection, failAndGetNewConnection(connectionPool, secondConnection));
+ assertEquals(thirdConnection, failAndGetNewConnection(connectionPool, secondConnection));
+ assertEquals(thirdConnection, failAndGetNewConnection(connectionPool, secondConnection));
+ assertNotEquals(firstConnection, thirdConnection);
+ assertNotEquals(secondConnection, thirdConnection);
+
+ // Should change connection, not getting third connection as new
+ JRTConnection currentConnection = failAndGetNewConnection(connectionPool, thirdConnection);
+ assertNotEquals(thirdConnection, currentConnection);
+
+ // Should change connection, not getting current connection as new
+ JRTConnection currentConnection2 = failAndGetNewConnection(connectionPool, currentConnection);
+ assertNotEquals(currentConnection, currentConnection2);
+
+ connectionPool.close();
+ }
- sourcePool.close();
+ private JRTConnection failAndGetNewConnection(JRTConnectionPool connectionPool, Connection failingConnection) {
+ connectionPool.setError(failingConnection, 123);
+ return connectionPool.getCurrent();
}
}