diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-20 10:41:42 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-20 10:41:42 +0100 |
commit | bbf4920054e3da58a71baa578df7786495df040a (patch) | |
tree | cec5154bf4db87b1ead41a5a15a996c197d31807 /config | |
parent | 1300c1c29066f1be5abf3a7f5cb1cbdc35ebe0dc (diff) | |
parent | 23eca521e7d728bb1e96b1e899e8a046cfd74bda (diff) |
Merge pull request #12271 from vespa-engine/balder/connectionpool-repo
Use a static connectionpool repo to reduce number of threads.
Diffstat (limited to 'config')
11 files changed, 131 insertions, 91 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/ConfigSourceSet.java b/config/src/main/java/com/yahoo/config/subscription/ConfigSourceSet.java index c799186435c..7472439d6a4 100755 --- a/config/src/main/java/com/yahoo/config/subscription/ConfigSourceSet.java +++ b/config/src/main/java/com/yahoo/config/subscription/ConfigSourceSet.java @@ -3,7 +3,11 @@ package com.yahoo.config.subscription; import com.yahoo.log.LogLevel; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; import java.util.logging.Logger; diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java index 6d08976b61c..49c5dcd343c 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java @@ -20,7 +20,6 @@ import com.yahoo.yolean.Exceptions; import java.time.Duration; import java.time.Instant; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -37,15 +36,17 @@ public class JRTConfigRequester implements RequestWaiter { private static final Logger log = Logger.getLogger(JRTConfigRequester.class.getName()); public static final ConfigSourceSet defaultSourceSet = ConfigSourceSet.createDefault(); + private static final JRTManagedConnectionPools managedPool = new JRTManagedConnectionPools(); private static final int TRACELEVEL = 6; private final TimingValues timingValues; private int fatalFailures = 0; // independent of transientFailures private int transientFailures = 0; // independent of fatalFailures - private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new JRTSourceThreadFactory()); + private final ScheduledThreadPoolExecutor scheduler; private Instant suspendWarningLogged = Instant.MIN; private Instant noApplicationWarningLogged = Instant.MIN; private static final Duration delayBetweenWarnings = Duration.ofSeconds(60); private final ConnectionPool connectionPool; + private final ConfigSourceSet configSourceSet; static final float randomFraction = 0.2f; /* Time to be added to server timeout to create client timeout. This is the time allowed for the server to respond after serverTimeout has elapsed. */ private static final Double additionalTimeForClientTimeout = 10.0; @@ -56,11 +57,23 @@ public class JRTConfigRequester implements RequestWaiter { * @param connectionPool the connectionPool this requester should use * @param timingValues timeouts and delays used when sending JRT config requests */ - public JRTConfigRequester(ConnectionPool connectionPool, TimingValues timingValues) { + JRTConfigRequester(ConfigSourceSet configSourceSet, ScheduledThreadPoolExecutor scheduler, + ConnectionPool connectionPool, TimingValues timingValues) { + this.configSourceSet = configSourceSet; + this.scheduler = scheduler; this.connectionPool = connectionPool; this.timingValues = timingValues; } + /** Only for testing */ + public JRTConfigRequester(ConnectionPool connectionPool, TimingValues timingValues) { + this(null, new ScheduledThreadPoolExecutor(1), connectionPool, timingValues); + } + + public static JRTConfigRequester create(ConfigSourceSet sourceSet, TimingValues timingValues) { + return managedPool.acquire(sourceSet, timingValues); + } + /** * Requests the config for the {@link com.yahoo.config.ConfigInstance} on the given {@link ConfigSubscription} * @@ -273,18 +286,8 @@ public class JRTConfigRequester implements RequestWaiter { // Fake that we have logged to avoid printing warnings after this suspendWarningLogged = Instant.now(); noApplicationWarningLogged = Instant.now(); - - connectionPool.close(); - scheduler.shutdown(); - } - - private static class JRTSourceThreadFactory implements ThreadFactory { - @Override - public Thread newThread(Runnable runnable) { - Thread t = new Thread(runnable, String.format("jrt-config-requester-%d", System.currentTimeMillis())); - // We want a daemon thread to avoid hanging threads in case something goes wrong in the config system - t.setDaemon(true); - return t; + if (configSourceSet != null) { + managedPool.release(configSourceSet); } } diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java index 39e6c69f539..a94a135f9d8 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java @@ -28,7 +28,7 @@ import com.yahoo.vespa.config.protocol.Payload; public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubscription<T> { private JRTConfigRequester requester; - private TimingValues timingValues; + private final TimingValues timingValues; // Last time we got an OK JRT callback private Instant lastOK = Instant.MIN; @@ -156,7 +156,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc private JRTConfigRequester getRequester() { JRTConfigRequester requester = subscriber.requesters().get(sources); if (requester == null) { - requester = new JRTConfigRequester(new JRTConnectionPool(sources), timingValues); + requester = JRTConfigRequester.create(sources, timingValues); subscriber.requesters().put(sources, requester); } return requester; diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTManagedConnectionPools.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTManagedConnectionPools.java new file mode 100644 index 00000000000..0a606416827 --- /dev/null +++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTManagedConnectionPools.java @@ -0,0 +1,63 @@ +package com.yahoo.config.subscription.impl; + +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.vespa.config.JRTConnectionPool; +import com.yahoo.vespa.config.TimingValues; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +public class JRTManagedConnectionPools { + private static class JRTSourceThreadFactory implements ThreadFactory { + @Override + public Thread newThread(Runnable runnable) { + Thread t = new Thread(runnable, String.format("jrt-config-requester-%d", System.currentTimeMillis())); + // We want a daemon thread to avoid hanging threads in case something goes wrong in the config system + t.setDaemon(true); + return t; + } + } + private static class CountedPool { + long count; + final JRTConnectionPool pool; + final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new JRTSourceThreadFactory()); + CountedPool(JRTConnectionPool requester) { + this.pool = requester; + count = 0; + } + } + private Map<ConfigSourceSet, CountedPool> pools = new HashMap<>(); + + public JRTConfigRequester acquire(ConfigSourceSet sourceSet, TimingValues timingValues) { + CountedPool countedPool; + synchronized (pools) { + countedPool = pools.get(sourceSet); + if (countedPool == null) { + countedPool = new CountedPool(new JRTConnectionPool(sourceSet)); + pools.put(sourceSet, countedPool); + } + countedPool.count++; + } + return new JRTConfigRequester(sourceSet, countedPool.scheduler, countedPool.pool, timingValues); + } + public synchronized void release(ConfigSourceSet sourceSet) { + CountedPool countedPool; + synchronized (pools) { + countedPool = pools.get(sourceSet); + countedPool.count--; + if (countedPool.count > 0) return; + pools.remove(sourceSet); + } + + countedPool.pool.close(); + countedPool.scheduler.shutdown(); + try { + countedPool.scheduler.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("Failed shutting down scheduler:", e); + } + } +} diff --git a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java index a4da996effd..326c1287468 100644 --- a/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java +++ b/config/src/main/java/com/yahoo/vespa/config/JRTConnectionPool.java @@ -112,19 +112,6 @@ public class JRTConnectionPool implements ConnectionPool { return this; } - public String getAllSourceAddresses() { - StringBuilder sb = new StringBuilder(); - synchronized (connections) { - for (JRTConnection conn : connections.values()) { - sb.append(conn.getAddress()); - sb.append(","); - } - } - // Remove trailing "," - sb.deleteCharAt(sb.length() - 1); - return sb.toString(); - } - public String toString() { StringBuilder sb = new StringBuilder(); synchronized (connections) { diff --git a/config/src/main/java/com/yahoo/vespa/config/TimingValues.java b/config/src/main/java/com/yahoo/vespa/config/TimingValues.java index 780cf657009..5d5967e56c4 100644 --- a/config/src/main/java/com/yahoo/vespa/config/TimingValues.java +++ b/config/src/main/java/com/yahoo/vespa/config/TimingValues.java @@ -12,12 +12,11 @@ public class TimingValues { public static final long defaultNextConfigTimeout = 1000; // See getters below for an explanation of how these values are used and interpreted // All time values in milliseconds. - private long successTimeout = 600000; - private long errorTimeout = 20000; - private long initialTimeout = 15000; + private final long successTimeout; + private final long errorTimeout; + private final long initialTimeout; private long subscribeTimeout = 55000; private long configuredErrorTimeout = -1; // Don't ever timeout (and do not use error response) when we are already configured - private long nextConfigTimeout = defaultNextConfigTimeout; private long fixedDelay = 5000; private long unconfiguredDelay = 1000; @@ -26,6 +25,9 @@ public class TimingValues { private final Random rand; public TimingValues() { + successTimeout = 600000; + errorTimeout = 20000; + initialTimeout = 15000; this.rand = new Random(System.currentTimeMillis()); } @@ -100,20 +102,6 @@ public class TimingValues { } /** - * Returns initial timeout to use as server timeout when a config is requested for the first time. - * - * @return timeout in milliseconds. - */ - public long getInitialTimeout() { - return initialTimeout; - } - - public TimingValues setInitialTimeout(long t) { - initialTimeout = t; - return this; - } - - /** * Returns timeout to use as server timeout when subscribing for the first time. * * @return timeout in milliseconds. @@ -127,38 +115,12 @@ public class TimingValues { return this; } - /** - * Returns the time to retry getting config from the remote sources, until the next error response will - * be set as config. Counted from the last ok request was received. A negative value means that - * we will always retry getting config and never set an error response as config. - * - * @return timeout in milliseconds. - */ - public long getConfiguredErrorTimeout() { - return configuredErrorTimeout; - } - public TimingValues setConfiguredErrorTimeout(long t) { configuredErrorTimeout = t; return this; } /** - * Returns timeout used when calling {@link com.yahoo.config.subscription.ConfigSubscriber#nextConfig()} or - * {@link com.yahoo.config.subscription.ConfigSubscriber#nextGeneration()} - * - * @return timeout in milliseconds. - */ - public long getNextConfigTimeout() { - return nextConfigTimeout; - } - - public TimingValues setNextConfigTimeout(long t) { - nextConfigTimeout = t; - return this; - } - - /** * Returns time to wait until next attempt to get config after a failed request when the client has not * gotten a successful response to a config subscription (i.e, the client has not been configured). * A negative value means that there will never be a next attempt. If a negative value is set, the @@ -201,12 +163,6 @@ public class TimingValues { return maxDelayMultiplier; } - - public TimingValues setSuccessTimeout(long successTimeout) { - this.successTimeout = successTimeout; - return this; - } - /** * Returns fixed delay that is used when retrying getting config no matter if it was a success or an error * and independent of number of retries. @@ -228,10 +184,6 @@ public class TimingValues { return Math.round(val - (val * fraction) + (rand.nextFloat() * 2L * val * fraction)); } - Random getRandom() { - return rand; - } - @Override public String toString() { return "TimingValues [successTimeout=" + successTimeout diff --git a/config/src/test/java/com/yahoo/config/subscription/BasicTest.java b/config/src/test/java/com/yahoo/config/subscription/BasicTest.java index 3b8b7db6487..5b145d40b7f 100644 --- a/config/src/test/java/com/yahoo/config/subscription/BasicTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/BasicTest.java @@ -1,13 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.config.subscription; -import static org.junit.Assert.*; -import static org.hamcrest.CoreMatchers.is; import com.yahoo.foo.AppConfig; import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class BasicTest { @@ -17,7 +17,8 @@ public class BasicTest { ConfigHandle<AppConfig> h = s.subscribe(AppConfig.class, "raw:times 0"); s.nextConfig(0); AppConfig c = h.getConfig(); - assertThat(c.times(), is(0)); + assertEquals(0, c.times()); + s.close(); } @Test @@ -26,6 +27,7 @@ public class BasicTest { ConfigHandle<AppConfig> h = s.subscribe(AppConfig.class, "raw:times 2"); s.nextGeneration(0); AppConfig c = h.getConfig(); - assertThat(c.times(), is(2)); + assertEquals(2, c.times()); + s.close(); } } diff --git a/config/src/test/java/com/yahoo/config/subscription/ConfigSetSubscriptionTest.java b/config/src/test/java/com/yahoo/config/subscription/ConfigSetSubscriptionTest.java index 21cdfbe7d30..db30e7b7389 100644 --- a/config/src/test/java/com/yahoo/config/subscription/ConfigSetSubscriptionTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/ConfigSetSubscriptionTest.java @@ -128,6 +128,7 @@ public class ConfigSetSubscriptionTest { assertEquals(hA0.getConfig().times(), 8800); assertEquals(hA1.getConfig().times(), 890); assertEquals(hS.getConfig().stringVal(), "new StringVal"); + subscriber.close(); } @Test diff --git a/config/src/test/java/com/yahoo/config/subscription/ConfigSubscriptionTest.java b/config/src/test/java/com/yahoo/config/subscription/ConfigSubscriptionTest.java index 933a9fd130a..c8d4c081fc9 100644 --- a/config/src/test/java/com/yahoo/config/subscription/ConfigSubscriptionTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/ConfigSubscriptionTest.java @@ -60,6 +60,7 @@ public class ConfigSubscriptionTest { assertEquals(c1, c1); assertNotEquals(c1, c2); + sub.close(); } @Test @@ -70,6 +71,7 @@ public class ConfigSubscriptionTest { sub.nextConfig(); assertTrue(handle.getConfig().boolval()); //assertTrue(sub.getSource() instanceof RawSource); + sub.close(); } // Test that subscription is closed and subscriptionHandles is empty if we get an exception diff --git a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java index e9dc9cf7b98..9c83f2f3c9a 100644 --- a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java @@ -49,14 +49,17 @@ public class GenericConfigSubscriberTest { public void testGenericRequesterPooling() { ConfigSourceSet source1 = new ConfigSourceSet("tcp/foo:78"); ConfigSourceSet source2 = new ConfigSourceSet("tcp/bar:79"); - JRTConfigRequester req1 = new JRTConfigRequester(new JRTConnectionPool(source1), JRTConfigRequesterTest.getTestTimingValues()); - JRTConfigRequester req2 = new JRTConfigRequester(new JRTConnectionPool(source2), JRTConfigRequesterTest.getTestTimingValues()); + JRTConfigRequester req1 = JRTConfigRequester.create(source1, JRTConfigRequesterTest.getTestTimingValues()); + JRTConfigRequester req2 = JRTConfigRequester.create(source2, JRTConfigRequesterTest.getTestTimingValues()); Map<ConfigSourceSet, JRTConfigRequester> requesters = new LinkedHashMap<>(); requesters.put(source1, req1); requesters.put(source2, req2); GenericConfigSubscriber sub = new GenericConfigSubscriber(requesters); assertEquals(sub.requesters().get(source1).getConnectionPool().getCurrent().getAddress(), "tcp/foo:78"); assertEquals(sub.requesters().get(source2).getConnectionPool().getCurrent().getAddress(), "tcp/bar:79"); + for (JRTConfigRequester requester : requesters.values()) { + requester.close(); + } } @Test(expected=UnsupportedOperationException.class) diff --git a/config/src/test/java/com/yahoo/config/subscription/impl/JRTConfigRequesterTest.java b/config/src/test/java/com/yahoo/config/subscription/impl/JRTConfigRequesterTest.java index 757dd99f43b..4211345dff7 100644 --- a/config/src/test/java/com/yahoo/config/subscription/impl/JRTConfigRequesterTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/impl/JRTConfigRequesterTest.java @@ -1,10 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.config.subscription.impl; +import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.foo.SimpletypesConfig; import com.yahoo.config.subscription.ConfigSubscriber; import com.yahoo.jrt.Request; import com.yahoo.vespa.config.ConfigKey; +import com.yahoo.vespa.config.ConnectionPool; import com.yahoo.vespa.config.ErrorCode; import com.yahoo.vespa.config.ErrorType; import com.yahoo.vespa.config.TimingValues; @@ -17,6 +19,8 @@ import java.util.Random; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -349,4 +353,23 @@ public class JRTConfigRequesterTest { } } + @Test + public void testManagedPool() { + ConfigSourceSet sourceSet = ConfigSourceSet.createDefault(); + TimingValues timingValues = new TimingValues(); + JRTConfigRequester requester1 = JRTConfigRequester.create(sourceSet, timingValues); + JRTConfigRequester requester2 = JRTConfigRequester.create(sourceSet, timingValues); + assertNotSame(requester1, requester2); + assertSame(requester1.getConnectionPool(), requester2.getConnectionPool()); + ConnectionPool firstPool = requester1.getConnectionPool(); + requester1.close(); + requester2.close(); + requester1 = JRTConfigRequester.create(sourceSet, timingValues); + assertNotSame(firstPool, requester1.getConnectionPool()); + requester2 = JRTConfigRequester.create(new ConfigSourceSet("test-managed-pool-2"), timingValues); + assertNotSame(requester1.getConnectionPool(), requester2.getConnectionPool()); + requester1.close(); + requester2.close(); + } + } |