diff options
author | Harald Musum <musum@yahooinc.com> | 2022-01-03 14:37:16 +0100 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2022-01-03 14:37:16 +0100 |
commit | 3369f835bcb2434a6d54db34b0345f196aa2506d (patch) | |
tree | c1105d0b9bdbe102dd8b6f7919863a9669511c84 /config | |
parent | 4528efcd87b9c078f91ecfc4f53dfee8632881e1 (diff) |
Avoid clearing config response queue
There have been races due to the fact that we used to clear the queue
when receiving a response, thus missing some of the responses. This
stops clearing the queue and handles several items on the queue by
polling until the queue is empty when a new item is found on the queue.
Diffstat (limited to 'config')
3 files changed, 21 insertions, 14 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java index 01008f0a8a2..4bf442a0890 100644 --- a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java +++ b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java @@ -434,8 +434,8 @@ public class ConfigSubscriber implements AutoCloseable { } /** - * Use this convenience method if you only want to subscribe on <em>one</em> config, and want generic error handling. - * Implement {@link SingleSubscriber} and pass to this method. + * Convenience method that can be used if you only want to subscribe to <em>one</em> config, and want generic error handling. + * Implement {@link SingleSubscriber} and pass it to this method. * You will get initial config, and a config thread will be started. The method will throw in your thread if initial * configuration fails, and the config thread will print a generic error message (but continue) if it fails thereafter. The config * thread will stop if you {@link #close()} this {@link ConfigSubscriber}. diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java index 68e22ad5656..4e1de12770f 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java @@ -49,12 +49,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc @Override public boolean nextConfig(long timeoutMillis) { - // Note: since the JRT callback thread will clear the queue first when it inserts a brand new element, - // (see #updateConfig()) there is a race here. However: the caller will handle it no matter what it gets - // from the queue here, the important part is that local state on the subscription objects is preserved. - - // Poll the queue for a next config until timeout - JRTClientConfigRequest jrtReq = pollQueue(timeoutMillis); + JRTClientConfigRequest jrtReq = pollForNewConfig(timeoutMillis); if (jrtReq == null) return newConfigOrException(); log.log(FINE, () -> "Polled queue and found config " + jrtReq); @@ -74,6 +69,20 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc return newConfigOrException(); } + private JRTClientConfigRequest pollForNewConfig(long timeoutMillis) { + JRTClientConfigRequest request = pollQueue(timeoutMillis); + // There might be more than one request on the queue, so empty queue by polling with + // 0 timeout until queue is empty (returned value is null) + JRTClientConfigRequest temp; + do { + temp = pollQueue(0); + if (temp != null) + request = temp; + } while (temp != null); + + return request; + } + private boolean newConfigOrException() { // These flags may have been left true from a previous call, since ConfigSubscriber's nextConfig // not necessarily returned true and reset the flags then @@ -133,10 +142,8 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc return configInstance; } - // Will be called by JRTConfigRequester when there is a config with new generation for this subscription + // Called by JRTConfigRequester when there is a config with new generation for this subscription void updateConfig(JRTClientConfigRequest jrtReq) { - // We only want this latest generation to be in the queue, we do not preserve history in this system - reqQueue.clear(); if ( ! reqQueue.offer(jrtReq)) setException(new ConfigurationRuntimeException("Failed offering returned request to queue of subscription " + this)); } diff --git a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java index fc922cc3b07..61573fd19be 100644 --- a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java @@ -33,7 +33,7 @@ public class GenericConfigSubscriberTest { public void testSubscribeGeneric() throws InterruptedException { JRTConfigRequester requester = new JRTConfigRequester(new MockConnection(), tv); GenericConfigSubscriber sub = new GenericConfigSubscriber(requester); - final List<String> defContent = List.of("myVal int"); + List<String> defContent = List.of("myVal int"); GenericConfigHandle handle = sub.subscribe(new ConfigKey<>("simpletypes", "id", "config"), defContent, tv); @@ -46,9 +46,9 @@ public class GenericConfigSubscriberTest { assertFalse(handle.isChanged()); // Wait some time, config should be the same, but generation should be higher - Thread.sleep(tv.getFixedDelay() * 2); + Thread.sleep(tv.getFixedDelay() * 3); assertEquals("{}", getConfig(handle)); - assertTrue(handle.getRawConfig().getGeneration() > 1); + assertTrue("Unexpected generation (not > 1): " + handle.getRawConfig().getGeneration(), handle.getRawConfig().getGeneration() > 1); assertFalse(sub.nextConfig(false)); assertFalse(handle.isChanged()); } |