From 1583971111661c5edb1170309a61c485c102f95b Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Sat, 4 Dec 2021 12:18:13 +0100 Subject: Revert "Config subscription refactoring, part 5 [run-systemtest]" --- .../vespa/config/proxy/RpcConfigSourceClient.java | 12 ++-- .../com/yahoo/vespa/config/proxy/Subscriber.java | 20 ++++-- config/abi-spec.json | 5 +- .../config/subscription/ConfigSubscriber.java | 33 +++++++-- .../subscription/impl/ConfigSubscription.java | 8 +-- .../subscription/impl/GenericConfigSubscriber.java | 15 +++-- .../impl/GenericJRTConfigSubscription.java | 7 +- .../subscription/impl/JRTConfigSubscription.java | 21 +++++- .../subscription/impl/JrtConfigRequesters.java | 38 ----------- .../subscription/ConfigSetSubscriptionTest.java | 10 +-- .../subscription/ConfigSubscriptionTest.java | 18 +++-- .../subscription/GenericConfigSubscriberTest.java | 34 ++++++++-- .../impl/FileConfigSubscriptionTest.java | 7 +- .../subscription/impl/JRTConfigRequesterTest.java | 78 ++++++++++++---------- .../protocol/JRTConfigRequestFactoryTest.java | 11 ++- .../config/protocol/JRTConfigRequestV3Test.java | 18 ++--- 16 files changed, 188 insertions(+), 147 deletions(-) delete mode 100644 config/src/main/java/com/yahoo/config/subscription/impl/JrtConfigRequesters.java diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java index ab85c534251..5df7b1fc021 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -4,7 +4,7 @@ package com.yahoo.vespa.config.proxy; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.ConfigurationRuntimeException; import com.yahoo.config.subscription.ConfigSourceSet; -import com.yahoo.config.subscription.impl.JrtConfigRequesters; +import com.yahoo.config.subscription.impl.JRTConfigRequester; import com.yahoo.jrt.Request; import com.yahoo.jrt.Spec; import com.yahoo.jrt.Supervisor; @@ -51,7 +51,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { private final ScheduledExecutorService nextConfigScheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory("next config")); private final ScheduledFuture nextConfigFuture; - private final JrtConfigRequesters requesters; + private final JRTConfigRequester requester; // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients private final ScheduledExecutorService delayedResponsesScheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses")); @@ -64,7 +64,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { this.delayedResponses = new DelayedResponses(); checkConfigSources(); nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS); - this.requesters = new JrtConfigRequesters(); + this.requester = JRTConfigRequester.create(configSourceSet, timingValues); DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer); this.delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS); } @@ -142,7 +142,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { if (activeSubscribers.containsKey(configCacheKey)) return; log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey); - var subscriber = new Subscriber(input, timingValues, requesters.getRequester(configSourceSet, timingValues)); + var subscriber = new Subscriber(input, configSourceSet, timingValues, requester); try { subscriber.subscribe(); activeSubscribers.put(configCacheKey, subscriber); @@ -187,12 +187,12 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { log.log(Level.FINE, "nextConfigScheduler.shutdownNow"); nextConfigScheduler.shutdownNow(); log.log(Level.FINE, "requester.close"); - requesters.close(); + requester.close(); } @Override public String getActiveSourceConnection() { - return requesters.getRequester(configSourceSet, timingValues).getConnectionPool().getCurrent().getAddress(); + return requester.getConnectionPool().getCurrent().getAddress(); } @Override diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java index b407c0e7e76..70ff4456f6c 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; +import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.config.subscription.impl.GenericConfigHandle; import com.yahoo.config.subscription.impl.GenericConfigSubscriber; import com.yahoo.config.subscription.impl.JRTConfigRequester; @@ -9,6 +10,7 @@ import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.TimingValues; import com.yahoo.yolean.Exceptions; +import java.util.Map; import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; @@ -21,20 +23,22 @@ public class Subscriber { private final static Logger log = Logger.getLogger(Subscriber.class.getName()); private final RawConfig config; + private final ConfigSourceSet configSourceSet; private final TimingValues timingValues; private final GenericConfigSubscriber subscriber; private GenericConfigHandle handle; - Subscriber(RawConfig config, TimingValues timingValues, JRTConfigRequester requester) { + Subscriber(RawConfig config, ConfigSourceSet configSourceSet, TimingValues timingValues, JRTConfigRequester requester) { this.config = config; + this.configSourceSet = configSourceSet; this.timingValues = timingValues; - this.subscriber = new GenericConfigSubscriber(requester); + this.subscriber = new GenericConfigSubscriber(Map.of(configSourceSet, requester)); } void subscribe() { ConfigKey key = config.getKey(); handle = subscriber.subscribe(new ConfigKey<>(key.getName(), key.getConfigId(), key.getNamespace()), - config.getDefContent(), timingValues); + config.getDefContent(), configSourceSet, timingValues); } public Optional nextGeneration() { @@ -54,8 +58,14 @@ public class Subscriber { return Optional.empty(); } - public void cancel() { subscriber.close(); } + public void cancel() { + if (subscriber != null) { + subscriber.close(); + } + } - boolean isClosed() { return subscriber.isClosed(); } + boolean isClosed() { + return subscriber.isClosed(); + } } diff --git a/config/abi-spec.json b/config/abi-spec.json index 844835ae1c5..fa016fd91da 100644 --- a/config/abi-spec.json +++ b/config/abi-spec.json @@ -212,18 +212,21 @@ "public boolean nextGeneration(long)", "protected void throwIfExceptionSet(com.yahoo.config.subscription.impl.ConfigSubscription)", "public void close()", + "protected void closeRequesters()", "public java.lang.String toString()", "public java.lang.Thread startConfigThread(java.lang.Runnable)", "protected com.yahoo.config.subscription.ConfigSubscriber$State state()", "public void reload(long)", "public com.yahoo.config.subscription.ConfigSource getSource()", + "public java.util.Map requesters()", "public boolean isClosed()", "public com.yahoo.config.subscription.ConfigHandle subscribe(com.yahoo.config.subscription.ConfigSubscriber$SingleSubscriber, java.lang.Class, java.lang.String)", "public long getGeneration()", "protected void finalize()" ], "fields": [ - "protected final java.util.List subscriptionHandles" + "protected final java.util.List subscriptionHandles", + "protected java.util.Map requesters" ] }, "com.yahoo.config.subscription.ConfigURI": { diff --git a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java index 01008f0a8a2..07132c460f9 100644 --- a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java +++ b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java @@ -5,13 +5,15 @@ import com.yahoo.config.ConfigInstance; import com.yahoo.config.ConfigurationRuntimeException; import com.yahoo.config.subscription.impl.ConfigSubscription; import com.yahoo.config.subscription.impl.JRTConfigRequester; -import com.yahoo.config.subscription.impl.JrtConfigRequesters; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.TimingValues; import com.yahoo.yolean.Exceptions; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; + import java.util.logging.Level; import java.util.logging.Logger; @@ -38,7 +40,6 @@ public class ConfigSubscriber implements AutoCloseable { private final ConfigSource source; private final Object monitor = new Object(); private final Throwable stackTraceAtConstruction; // TODO Remove once finalizer is gone - private final JrtConfigRequesters requesters = new JrtConfigRequesters(); /** The last complete config generation received by this */ private long generation = -1; @@ -50,6 +51,11 @@ public class ConfigSubscriber implements AutoCloseable { */ private boolean applyOnRestart = false; + /** + * Reuse requesters for equal source sets, limit number if many subscriptions. + */ + protected Map requesters = new HashMap<>(); + /** * The states of the subscriber. Affects the validity of calling certain methods. * @@ -108,8 +114,8 @@ public class ConfigSubscriber implements AutoCloseable { // for testing ConfigHandle subscribe(Class configClass, String configId, ConfigSource source, TimingValues timingValues) { checkStateBeforeSubscribe(); - ConfigKey configKey = new ConfigKey<>(configClass, configId); - ConfigSubscription sub = ConfigSubscription.get(configKey, requesters, source, timingValues); + final ConfigKey configKey = new ConfigKey<>(configClass, configId); + ConfigSubscription sub = ConfigSubscription.get(configKey, this, source, timingValues); ConfigHandle handle = new ConfigHandle<>(sub); subscribeAndHandleErrors(sub, configKey, handle, timingValues); return handle; @@ -369,10 +375,19 @@ public class ConfigSubscriber implements AutoCloseable { for (ConfigHandle h : subscriptionHandles) { h.subscription().close(); } - requesters.close(); + closeRequesters(); log.log(FINE, () -> "Config subscriber has been closed."); } + /** + * Closes all open requesters + */ + protected void closeRequesters() { + for (JRTConfigRequester requester : requesters.values()) { + requester.close(); + } + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -427,6 +442,14 @@ public class ConfigSubscriber implements AutoCloseable { return source; } + /** + * Implementation detail, do not use. + * @return requesters + */ + public Map requesters() { + return requesters; + } + public boolean isClosed() { synchronized (monitor) { return state == State.CLOSED; diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/ConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/ConfigSubscription.java index f8a45a11b70..780556e93fa 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/ConfigSubscription.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/ConfigSubscription.java @@ -111,9 +111,10 @@ public abstract class ConfigSubscription { * Correct type of ConfigSubscription instance based on type of source or form of config id * * @param key a {@link ConfigKey} + * @param subscriber the subscriber for this subscription * @return a subclass of a ConfigsSubscription */ - public static ConfigSubscription get(ConfigKey key, JrtConfigRequesters requesters, + public static ConfigSubscription get(ConfigKey key, ConfigSubscriber subscriber, ConfigSource source, TimingValues timingValues) { String configId = key.getConfigId(); if (source instanceof RawSource || configId.startsWith("raw:")) return getRawSub(key, source); @@ -121,10 +122,7 @@ public abstract class ConfigSubscription { if (source instanceof DirSource || configId.startsWith("dir:")) return getDirFileSub(key, source); if (source instanceof JarSource || configId.startsWith("jar:")) return getJarSub(key, source); if (source instanceof ConfigSet) return new ConfigSetSubscription<>(key, source); - if (source instanceof ConfigSourceSet) { - JRTConfigRequester requester = requesters.getRequester((ConfigSourceSet) source, timingValues); - return new JRTConfigSubscription<>(key, requester, timingValues); - } + if (source instanceof ConfigSourceSet) return new JRTConfigSubscription<>(key, subscriber, (ConfigSourceSet) source, timingValues); throw new IllegalArgumentException("Unknown source type: " + source); } diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/GenericConfigSubscriber.java b/config/src/main/java/com/yahoo/config/subscription/impl/GenericConfigSubscriber.java index e382bab576e..6dc18137639 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/GenericConfigSubscriber.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/GenericConfigSubscriber.java @@ -3,12 +3,14 @@ package com.yahoo.config.subscription.impl; import com.yahoo.config.ConfigInstance; import com.yahoo.config.subscription.ConfigHandle; +import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.config.subscription.ConfigSubscriber; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.TimingValues; import java.util.List; +import java.util.Map; /** * A subscriber that can subscribe without the class. Used by config proxy. @@ -17,18 +19,16 @@ import java.util.List; */ public class GenericConfigSubscriber extends ConfigSubscriber { - private final JRTConfigRequester requester; - /** * Constructs a new subscriber using the given pool of requesters (JRTConfigRequester holds 1 connection which in * turn is subject to failover across the elements in the source set.) * The behaviour is undefined if the map key is different from the source set the requester was built with. * See also {@link JRTConfigRequester#JRTConfigRequester(com.yahoo.vespa.config.ConnectionPool, com.yahoo.vespa.config.TimingValues)} * - * @param requester a config requester + * @param requesters a map from config source set to config requester */ - public GenericConfigSubscriber(JRTConfigRequester requester) { - this.requester = requester; + public GenericConfigSubscriber(Map requesters) { + this.requesters = requesters; } /** @@ -36,12 +36,13 @@ public class GenericConfigSubscriber extends ConfigSubscriber { * * @param key the {@link ConfigKey to subscribe to} * @param defContent the config definition content for the config to subscribe to + * @param source the config source to use * @param timingValues {@link TimingValues} * @return generic handle */ - public GenericConfigHandle subscribe(ConfigKey key, List defContent, TimingValues timingValues) { + public GenericConfigHandle subscribe(ConfigKey key, List defContent, ConfigSourceSet source, TimingValues timingValues) { checkStateBeforeSubscribe(); - GenericJRTConfigSubscription sub = new GenericJRTConfigSubscription(key, defContent, requester, timingValues); + GenericJRTConfigSubscription sub = new GenericJRTConfigSubscription(key, defContent, this, source, timingValues); GenericConfigHandle handle = new GenericConfigHandle(sub); subscribeAndHandleErrors(sub, key, handle, timingValues); return handle; diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/GenericJRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/GenericJRTConfigSubscription.java index 43f7a1fc168..737ca64b075 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/GenericJRTConfigSubscription.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/GenericJRTConfigSubscription.java @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.config.subscription.impl; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.config.subscription.ConfigSubscriber; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.TimingValues; @@ -23,9 +25,10 @@ public class GenericJRTConfigSubscription extends JRTConfigSubscription key, List defContent, - JRTConfigRequester requester, + ConfigSubscriber subscriber, + ConfigSourceSet source, TimingValues timingValues) { - super(key, requester, timingValues); + super(key, subscriber, source, timingValues); this.defContent = defContent; } diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java index 0b98e9cd1b2..b27c75fb61d 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java @@ -4,6 +4,8 @@ package com.yahoo.config.subscription.impl; import com.yahoo.config.ConfigInstance; import com.yahoo.config.ConfigurationRuntimeException; import com.yahoo.config.subscription.ConfigInterruptedException; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.config.subscription.ConfigSubscriber; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.ConfigPayload; import com.yahoo.vespa.config.TimingValues; @@ -29,8 +31,9 @@ import static java.util.logging.Level.INFO; */ public class JRTConfigSubscription extends ConfigSubscription { - private final JRTConfigRequester requester; + private JRTConfigRequester requester; private final TimingValues timingValues; + private final ConfigSubscriber subscriber; // Last time we got an OK JRT callback private Instant lastOK = Instant.MIN; @@ -40,11 +43,13 @@ public class JRTConfigSubscription extends ConfigSubsc * but has not yet been handled. */ private BlockingQueue reqQueue = new LinkedBlockingQueue<>(); + private final ConfigSourceSet sources; - public JRTConfigSubscription(ConfigKey key, JRTConfigRequester requester, TimingValues timingValues) { + public JRTConfigSubscription(ConfigKey key, ConfigSubscriber subscriber, ConfigSourceSet source, TimingValues timingValues) { super(key); this.timingValues = timingValues; - this.requester = requester; + this.subscriber = subscriber; + this.sources = source; } @Override @@ -143,6 +148,7 @@ public class JRTConfigSubscription extends ConfigSubsc @Override public boolean subscribe(long timeout) { lastOK = Instant.now(); + requester = getRequester(); requester.request(this); JRTClientConfigRequest req = reqQueue.peek(); while (req == null && (Instant.now().isBefore(lastOK.plus(Duration.ofMillis(timeout))))) { @@ -156,6 +162,15 @@ public class JRTConfigSubscription extends ConfigSubsc return req != null; } + private JRTConfigRequester getRequester() { + JRTConfigRequester requester = subscriber.requesters().get(sources); + if (requester == null) { + requester = JRTConfigRequester.create(sources, timingValues); + subscriber.requesters().put(sources, requester); + } + return requester; + } + @Override @SuppressWarnings("serial") public void close() { diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JrtConfigRequesters.java b/config/src/main/java/com/yahoo/config/subscription/impl/JrtConfigRequesters.java deleted file mode 100644 index 1e9612272d5..00000000000 --- a/config/src/main/java/com/yahoo/config/subscription/impl/JrtConfigRequesters.java +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.config.subscription.impl; - -import com.yahoo.config.subscription.ConfigSourceSet; -import com.yahoo.vespa.config.TimingValues; - -import java.util.HashMap; -import java.util.Map; - -/** - * Keeps track of requesters per config subscriber - * - * @author hmusum - */ -public class JrtConfigRequesters { - - /** - * Reuse requesters for equal source sets, limit number if many subscriptions. - */ - protected Map requesters = new HashMap<>(); - - public JRTConfigRequester getRequester(ConfigSourceSet source, TimingValues timingValues) { - JRTConfigRequester requester = requesters.get(source); - if (requester == null) { - requester = JRTConfigRequester.create(source, timingValues); - requesters.put(source, requester); - } - return requester; - } - - /** - * Closes all open requesters - */ - public void close() { - requesters.values().forEach(JRTConfigRequester::close); - } - -} diff --git a/config/src/test/java/com/yahoo/config/subscription/ConfigSetSubscriptionTest.java b/config/src/test/java/com/yahoo/config/subscription/ConfigSetSubscriptionTest.java index 346368ee7d9..0d9b8745888 100644 --- a/config/src/test/java/com/yahoo/config/subscription/ConfigSetSubscriptionTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/ConfigSetSubscriptionTest.java @@ -2,7 +2,6 @@ package com.yahoo.config.subscription; import com.yahoo.config.subscription.impl.ConfigSubscription; -import com.yahoo.config.subscription.impl.JrtConfigRequesters; import com.yahoo.foo.AppConfig; import com.yahoo.foo.SimpletypesConfig; import com.yahoo.foo.StringConfig; @@ -19,21 +18,21 @@ public class ConfigSetSubscriptionTest { @Test public void testConfigSubscription() { + ConfigSubscriber subscriber = new ConfigSubscriber(); ConfigSet configSet = new ConfigSet(); AppConfig.Builder a0builder = new AppConfig.Builder().message("A message, 0").times(88); configSet.addBuilder("app/0", a0builder); AppConfig.Builder a1builder = new AppConfig.Builder().message("A message, 1").times(89); configSet.addBuilder("app/1", a1builder); - JrtConfigRequesters requesters = new JrtConfigRequesters(); ConfigSubscription c1 = ConfigSubscription.get( new ConfigKey<>(AppConfig.class, "app/0"), - requesters, + subscriber, configSet, new TimingValues()); ConfigSubscription c2 = ConfigSubscription.get( new ConfigKey<>(AppConfig.class, "app/1"), - requesters, + subscriber, configSet, new TimingValues()); @@ -43,13 +42,14 @@ public class ConfigSetSubscriptionTest { @Test(expected = IllegalArgumentException.class) public void testUnknownKey() { + ConfigSubscriber subscriber = new ConfigSubscriber(); ConfigSet configSet = new ConfigSet(); AppConfig.Builder a0builder = new AppConfig.Builder().message("A message, 0").times(88); configSet.addBuilder("app/0", a0builder); ConfigSubscription.get( new ConfigKey<>(SimpletypesConfig.class, "simpletypes/1"), - new JrtConfigRequesters(), + subscriber, configSet, new TimingValues()); } diff --git a/config/src/test/java/com/yahoo/config/subscription/ConfigSubscriptionTest.java b/config/src/test/java/com/yahoo/config/subscription/ConfigSubscriptionTest.java index 1b0bc858361..270c618ee1b 100644 --- a/config/src/test/java/com/yahoo/config/subscription/ConfigSubscriptionTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/ConfigSubscriptionTest.java @@ -4,7 +4,6 @@ package com.yahoo.config.subscription; import com.yahoo.config.ConfigInstance; import com.yahoo.config.ConfigurationRuntimeException; import com.yahoo.config.subscription.impl.ConfigSubscription; -import com.yahoo.config.subscription.impl.JrtConfigRequesters; import com.yahoo.foo.AppConfig; import com.yahoo.foo.SimpletypesConfig; import com.yahoo.vespa.config.ConfigKey; @@ -30,10 +29,9 @@ public class ConfigSubscriptionTest { public void testEquals() { ConfigSubscriber sub = new ConfigSubscriber(); - JrtConfigRequesters requesters = new JrtConfigRequesters(); - ConfigSubscription a = createSubscription(requesters, "test"); - ConfigSubscription b = createSubscription(requesters, "test"); - ConfigSubscription c = createSubscription(requesters, "test2"); + ConfigSubscription a = createSubscription(sub, "test"); + ConfigSubscription b = createSubscription(sub, "test"); + ConfigSubscription c = createSubscription(sub, "test2"); assertEquals(b, a); assertEquals(a, a); assertEquals(b, b); @@ -41,21 +39,21 @@ public class ConfigSubscriptionTest { assertNotEquals(c, a); assertNotEquals(c, b); + ConfigSubscriber subscriber = new ConfigSubscriber(); ConfigSet configSet = new ConfigSet(); AppConfig.Builder a0builder = new AppConfig.Builder().message("A message, 0").times(88); configSet.addBuilder("app/0", a0builder); AppConfig.Builder a1builder = new AppConfig.Builder().message("A message, 1").times(89); configSet.addBuilder("app/1", a1builder); - ConfigSubscription c1 = ConfigSubscription.get( new ConfigKey<>(AppConfig.class, "app/0"), - requesters, + subscriber, configSet, new TimingValues()); ConfigSubscription c2 = ConfigSubscription.get( new ConfigKey<>(AppConfig.class, "app/1"), - requesters, + subscriber, configSet, new TimingValues()); @@ -88,9 +86,9 @@ public class ConfigSubscriptionTest { } } - private ConfigSubscription createSubscription(JrtConfigRequesters requesters, String configId) { + private ConfigSubscription createSubscription(ConfigSubscriber sub, String configId) { return ConfigSubscription.get(new ConfigKey<>(SimpletypesConfig.class, configId), - requesters, new RawSource("boolval true"), new TimingValues()); + sub, new RawSource("boolval true"), new TimingValues()); } private static class TestConfigSubscriber extends ConfigSubscriber { diff --git a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java index fc922cc3b07..4616630557e 100644 --- a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java @@ -6,15 +6,15 @@ import com.yahoo.config.subscription.impl.GenericConfigSubscriber; import com.yahoo.config.subscription.impl.JRTConfigRequester; import com.yahoo.config.subscription.impl.JRTConfigRequesterTest; import com.yahoo.config.subscription.impl.MockConnection; -import com.yahoo.jrt.Supervisor; -import com.yahoo.jrt.Transport; import com.yahoo.vespa.config.ConfigKey; -import com.yahoo.vespa.config.JRTConnectionPool; import com.yahoo.vespa.config.TimingValues; import com.yahoo.vespa.config.protocol.CompressionType; import org.junit.Test; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -31,11 +31,14 @@ public class GenericConfigSubscriberTest { @Test public void testSubscribeGeneric() throws InterruptedException { - JRTConfigRequester requester = new JRTConfigRequester(new MockConnection(), tv); - GenericConfigSubscriber sub = new GenericConfigSubscriber(requester); + Map requesters = new HashMap<>(); + ConfigSourceSet sourceSet = new ConfigSourceSet("blabla"); + requesters.put(sourceSet, new JRTConfigRequester(new MockConnection(), tv)); + GenericConfigSubscriber sub = new GenericConfigSubscriber(requesters); final List defContent = List.of("myVal int"); GenericConfigHandle handle = sub.subscribe(new ConfigKey<>("simpletypes", "id", "config"), defContent, + sourceSet, tv); assertTrue(sub.nextConfig(false)); assertTrue(handle.isChanged()); @@ -57,6 +60,23 @@ public class GenericConfigSubscriberTest { return handle.getRawConfig().getPayload().withCompression(CompressionType.UNCOMPRESSED).toString(); } + @Test + public void testGenericRequesterPooling() { + ConfigSourceSet source1 = new ConfigSourceSet("tcp/foo:78"); + ConfigSourceSet source2 = new ConfigSourceSet("tcp/bar:79"); + JRTConfigRequester req1 = JRTConfigRequester.create(source1, tv); + JRTConfigRequester req2 = JRTConfigRequester.create(source2, tv); + Map requesters = new LinkedHashMap<>(); + requesters.put(source1, req1); + requesters.put(source2, req2); + GenericConfigSubscriber sub = new GenericConfigSubscriber(requesters); + assertEquals(sub.requesters().get(source1).getConnectionPool().getCurrent().getAddress(), "tcp/foo:78"); + assertEquals(sub.requesters().get(source2).getConnectionPool().getCurrent().getAddress(), "tcp/bar:79"); + for (JRTConfigRequester requester : requesters.values()) { + requester.close(); + } + } + @Test(expected=UnsupportedOperationException.class) public void testOverriddenSubscribeInvalid1() { createSubscriber().subscribe(null, null); @@ -73,7 +93,9 @@ public class GenericConfigSubscriberTest { } private GenericConfigSubscriber createSubscriber() { - return new GenericConfigSubscriber(new JRTConfigRequester(new JRTConnectionPool(new ConfigSourceSet("foo"), new Supervisor(new Transport())), tv)); + return new GenericConfigSubscriber(Map.of( + new ConfigSourceSet("blabla"), + new JRTConfigRequester(new MockConnection(), JRTConfigRequesterTest.getTestTimingValues()))); } } diff --git a/config/src/test/java/com/yahoo/config/subscription/impl/FileConfigSubscriptionTest.java b/config/src/test/java/com/yahoo/config/subscription/impl/FileConfigSubscriptionTest.java index 74af35e39dc..15f39f590aa 100644 --- a/config/src/test/java/com/yahoo/config/subscription/impl/FileConfigSubscriptionTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/impl/FileConfigSubscriptionTest.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.config.subscription.impl; +import com.yahoo.config.subscription.ConfigSubscriber; import com.yahoo.config.subscription.DirSource; import com.yahoo.foo.SimpletypesConfig; import com.yahoo.foo.TestReferenceConfig; @@ -97,10 +98,8 @@ public class FileConfigSubscriptionTest { final String cfgDir = "src/test/resources/configs/foo"; final String cfgId = "dir:" + cfgDir; final ConfigKey key = new ConfigKey<>(TestReferenceConfig.class, cfgId); - ConfigSubscription sub = ConfigSubscription.get(key, - new JrtConfigRequesters(), - new DirSource(new File(cfgDir)), - new TimingValues()); + ConfigSubscriber subscriber = new ConfigSubscriber(); + ConfigSubscription sub = ConfigSubscription.get(key, subscriber, new DirSource(new File(cfgDir)), new TimingValues()); assertTrue(sub.nextConfig(1000)); assertThat(sub.getConfigState().getConfig().configId(), is(cfgId)); } diff --git a/config/src/test/java/com/yahoo/config/subscription/impl/JRTConfigRequesterTest.java b/config/src/test/java/com/yahoo/config/subscription/impl/JRTConfigRequesterTest.java index dca0c2d0018..62a25fadf25 100644 --- a/config/src/test/java/com/yahoo/config/subscription/impl/JRTConfigRequesterTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/impl/JRTConfigRequesterTest.java @@ -2,6 +2,7 @@ package com.yahoo.config.subscription.impl; import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.config.subscription.ConfigSubscriber; import com.yahoo.foo.SimpletypesConfig; import com.yahoo.jrt.Request; import com.yahoo.vespa.config.ConfigKey; @@ -50,11 +51,12 @@ public class JRTConfigRequesterTest { @Test public void testFirstRequestAfterSubscribing() { - TimingValues timingValues = getTestTimingValues(); - MockConnection connection = new MockConnection(); - JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); - JRTConfigSubscription sub = createSubscription(requester, timingValues); + ConfigSubscriber subscriber = new ConfigSubscriber(); + final TimingValues timingValues = getTestTimingValues(); + JRTConfigSubscription sub = createSubscription(subscriber, timingValues); + final MockConnection connection = new MockConnection(); + JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); assertEquals(requester.getConnectionPool(), connection); requester.request(sub); final Request request = connection.getRequest(); @@ -68,24 +70,25 @@ public class JRTConfigRequesterTest { @Test public void testFatalError() { + ConfigSubscriber subscriber = new ConfigSubscriber(); final TimingValues timingValues = getTestTimingValues(); final MockConnection connection = new MockConnection(new ErrorResponseHandler()); JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); - requester.request(createSubscription(requester, timingValues)); + requester.request(createSubscription(subscriber, timingValues)); waitUntilResponse(connection); assertEquals(1, requester.getFailures()); } @Test public void testFatalErrorSubscribed() { - TimingValues timingValues = getTestTimingValues(); - MockConnection connection = new MockConnection(new ErrorResponseHandler()); - JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); - - JRTConfigSubscription sub = createSubscription(requester, timingValues); + ConfigSubscriber subscriber = new ConfigSubscriber(); + final TimingValues timingValues = getTestTimingValues(); + JRTConfigSubscription sub = createSubscription(subscriber, timingValues); sub.setConfig(1L, false, config(), PayloadChecksums.empty()); + final MockConnection connection = new MockConnection(new ErrorResponseHandler()); + JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); requester.request(sub); waitUntilResponse(connection); assertEquals(1, requester.getFailures()); @@ -93,23 +96,25 @@ public class JRTConfigRequesterTest { @Test public void testTransientError() { - TimingValues timingValues = getTestTimingValues(); + ConfigSubscriber subscriber = new ConfigSubscriber(); + final TimingValues timingValues = getTestTimingValues(); - MockConnection connection = new MockConnection(new ErrorResponseHandler(com.yahoo.jrt.ErrorCode.TIMEOUT)); + final MockConnection connection = new MockConnection(new ErrorResponseHandler(com.yahoo.jrt.ErrorCode.TIMEOUT)); JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); - requester.request(createSubscription(requester, timingValues)); + requester.request(createSubscription(subscriber, timingValues)); waitUntilResponse(connection); assertEquals(1, requester.getFailures()); } @Test public void testTransientErrorSubscribed() { - TimingValues timingValues = getTestTimingValues(); - MockConnection connection = new MockConnection(new ErrorResponseHandler(com.yahoo.jrt.ErrorCode.TIMEOUT)); - JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); - JRTConfigSubscription sub = createSubscription(requester, timingValues); + ConfigSubscriber subscriber = new ConfigSubscriber(); + final TimingValues timingValues = getTestTimingValues(); + JRTConfigSubscription sub = createSubscription(subscriber, timingValues); sub.setConfig(1L, false, config(), PayloadChecksums.empty()); + final MockConnection connection = new MockConnection(new ErrorResponseHandler(com.yahoo.jrt.ErrorCode.TIMEOUT)); + JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); requester.request(sub); waitUntilResponse(connection); assertEquals(1, requester.getFailures()); @@ -117,12 +122,13 @@ public class JRTConfigRequesterTest { @Test public void testUnknownConfigDefinitionError() { - TimingValues timingValues = getTestTimingValues(); - MockConnection connection = new MockConnection(new ErrorResponseHandler(ErrorCode.UNKNOWN_DEFINITION)); - JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); - JRTConfigSubscription sub = createSubscription(requester, timingValues); + ConfigSubscriber subscriber = new ConfigSubscriber(); + final TimingValues timingValues = getTestTimingValues(); + JRTConfigSubscription sub = createSubscription(subscriber, timingValues); sub.setConfig(1L, false, config(), PayloadChecksums.empty()); + final MockConnection connection = new MockConnection(new ErrorResponseHandler(ErrorCode.UNKNOWN_DEFINITION)); + JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); assertEquals(requester.getConnectionPool(), connection); requester.request(sub); waitUntilResponse(connection); @@ -131,12 +137,13 @@ public class JRTConfigRequesterTest { @Test public void testClosedSubscription() { - TimingValues timingValues = getTestTimingValues(); - MockConnection connection = new MockConnection(new MockConnection.OKResponseHandler()); - JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); - JRTConfigSubscription sub = createSubscription(requester, timingValues); + ConfigSubscriber subscriber = new ConfigSubscriber(); + final TimingValues timingValues = getTestTimingValues(); + JRTConfigSubscription sub = createSubscription(subscriber, timingValues); sub.close(); + final MockConnection connection = new MockConnection(new MockConnection.OKResponseHandler()); + JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); requester.request(sub); assertEquals(1, connection.getNumberOfRequests()); // Check that no further request was sent? @@ -150,14 +157,16 @@ public class JRTConfigRequesterTest { @Test public void testTimeout() { - TimingValues timingValues = getTestTimingValues(); - MockConnection connection = new MockConnection(new DelayedResponseHandler(timingValues.getSubscribeTimeout()), - 2); // fake that we have more than one source - JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); - JRTConfigSubscription sub = createSubscription(requester, timingValues); + ConfigSubscriber subscriber = new ConfigSubscriber(); + final TimingValues timingValues = getTestTimingValues(); + JRTConfigSubscription sub = createSubscription(subscriber, timingValues); sub.close(); - requester.request(createSubscription(requester, timingValues)); + final MockConnection connection = new MockConnection( + new DelayedResponseHandler(timingValues.getSubscribeTimeout()), + 2); // fake that we have more than one source + JRTConfigRequester requester = new JRTConfigRequester(connection, timingValues); + requester.request(createSubscription(subscriber, timingValues)); // Check that no further request was sent? try { Thread.sleep(timingValues.getFixedDelay()*2); @@ -166,10 +175,9 @@ public class JRTConfigRequesterTest { } } - private JRTConfigSubscription createSubscription(JRTConfigRequester requester, TimingValues timingValues) { - return new JRTConfigSubscription<>(new ConfigKey<>(SimpletypesConfig.class, "testid"), - requester, - timingValues); + private JRTConfigSubscription createSubscription(ConfigSubscriber subscriber, TimingValues timingValues) { + return new JRTConfigSubscription<>( + new ConfigKey<>(SimpletypesConfig.class, "testid"), subscriber, null, timingValues); } private SimpletypesConfig config() { diff --git a/config/src/test/java/com/yahoo/vespa/config/protocol/JRTConfigRequestFactoryTest.java b/config/src/test/java/com/yahoo/vespa/config/protocol/JRTConfigRequestFactoryTest.java index 14183aa087a..4f7b1df5a43 100644 --- a/config/src/test/java/com/yahoo/vespa/config/protocol/JRTConfigRequestFactoryTest.java +++ b/config/src/test/java/com/yahoo/vespa/config/protocol/JRTConfigRequestFactoryTest.java @@ -2,11 +2,10 @@ package com.yahoo.vespa.config.protocol; import com.yahoo.config.subscription.ConfigSourceSet; -import com.yahoo.config.subscription.impl.JRTConfigRequester; +import com.yahoo.config.subscription.ConfigSubscriber; import com.yahoo.config.subscription.impl.JRTConfigSubscription; import com.yahoo.foo.FunctionTestConfig; import com.yahoo.vespa.config.ConfigKey; -import com.yahoo.vespa.config.JRTConnectionPool; import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.TimingValues; import org.junit.Test; @@ -43,13 +42,11 @@ public class JRTConfigRequestFactoryTest { @Test public void testCreateFromSub() { + ConfigSubscriber subscriber = new ConfigSubscriber(); Class clazz = FunctionTestConfig.class; final String configId = "foo"; - TimingValues timingValues = new TimingValues(); - JRTConfigSubscription sub = - new JRTConfigSubscription<>(new ConfigKey<>(clazz, configId), - new JRTConfigRequester(new JRTConnectionPool(new ConfigSourceSet("tcp/localhost:12345")), timingValues), - timingValues); + JRTConfigSubscription sub = new JRTConfigSubscription<>( + new ConfigKey<>(clazz, configId), subscriber, new ConfigSourceSet(), new TimingValues()); JRTClientConfigRequest request = JRTConfigRequestFactory.createFromSub(sub); assertThat(request.getVespaVersion().get(), is(defaultVespaVersion)); diff --git a/config/src/test/java/com/yahoo/vespa/config/protocol/JRTConfigRequestV3Test.java b/config/src/test/java/com/yahoo/vespa/config/protocol/JRTConfigRequestV3Test.java index dabd87e1eec..5f2a5c73fa5 100644 --- a/config/src/test/java/com/yahoo/vespa/config/protocol/JRTConfigRequestV3Test.java +++ b/config/src/test/java/com/yahoo/vespa/config/protocol/JRTConfigRequestV3Test.java @@ -2,6 +2,8 @@ package com.yahoo.vespa.config.protocol; import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.config.subscription.impl.GenericConfigSubscriber; import com.yahoo.config.subscription.impl.JRTConfigRequester; import com.yahoo.config.subscription.impl.JRTConfigSubscription; import com.yahoo.config.subscription.impl.MockConnection; @@ -14,7 +16,6 @@ import com.yahoo.test.ManualClock; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.ConfigPayload; import com.yahoo.vespa.config.ErrorCode; -import com.yahoo.vespa.config.JRTConnectionPool; import com.yahoo.vespa.config.PayloadChecksums; import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.TimingValues; @@ -22,6 +23,7 @@ import com.yahoo.vespa.config.util.ConfigUtils; import org.junit.Before; import org.junit.Test; +import java.util.Collections; import java.util.List; import java.util.Optional; @@ -188,11 +190,12 @@ public class JRTConfigRequestV3Test { @Test public void created_from_subscription() { - TimingValues timingValues = new TimingValues(); + ConfigSubscriber subscriber = new ConfigSubscriber(); JRTConfigSubscription sub = new JRTConfigSubscription<>(new ConfigKey<>(SimpletypesConfig.class, configId), - new JRTConfigRequester(new JRTConnectionPool(new ConfigSourceSet("tcp/localhost:985")), timingValues), - timingValues); + subscriber, + new ConfigSourceSet(), + new TimingValues()); JRTClientConfigRequest request = createReq(sub, Trace.createNew(9)); assertThat(request.getConfigKey().getName(), is(SimpletypesConfig.CONFIG_DEF_NAME)); JRTServerConfigRequest serverRequest = createReq(request.getRequest()); @@ -209,10 +212,9 @@ public class JRTConfigRequestV3Test { } }); - TimingValues timingValues = new TimingValues(); - JRTConfigSubscription sub = new JRTConfigSubscription<>(new ConfigKey<>(SimpletypesConfig.class, configId), - new JRTConfigRequester(connection, timingValues), - timingValues); + ConfigSourceSet src = new ConfigSourceSet(); + ConfigSubscriber subscriber = new GenericConfigSubscriber(Collections.singletonMap(src, new JRTConfigRequester(connection, new TimingValues()))); + JRTConfigSubscription sub = new JRTConfigSubscription<>(new ConfigKey<>(SimpletypesConfig.class, configId), subscriber, src, new TimingValues()); sub.subscribe(120_0000); assertTrue(sub.nextConfig(120_0000)); sub.close(); -- cgit v1.2.3