summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2022-01-03 14:37:16 +0100
committerHarald Musum <musum@yahooinc.com>2022-01-03 14:37:16 +0100
commit3369f835bcb2434a6d54db34b0345f196aa2506d (patch)
treec1105d0b9bdbe102dd8b6f7919863a9669511c84
parent4528efcd87b9c078f91ecfc4f53dfee8632881e1 (diff)
Avoid clearing config response queue
There have been races due to the fact that we used to clear the queue when receiving a response, thus missing some of the responses. This stops clearing the queue and handles several items on the queue by polling until the queue is empty when a new item is found on the queue.
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java4
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java25
-rw-r--r--config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java6
3 files changed, 21 insertions, 14 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
index 01008f0a8a2..4bf442a0890 100644
--- a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
+++ b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
@@ -434,8 +434,8 @@ public class ConfigSubscriber implements AutoCloseable {
}
/**
- * Use this convenience method if you only want to subscribe on <em>one</em> config, and want generic error handling.
- * Implement {@link SingleSubscriber} and pass to this method.
+ * Convenience method that can be used if you only want to subscribe to <em>one</em> config, and want generic error handling.
+ * Implement {@link SingleSubscriber} and pass it to this method.
* You will get initial config, and a config thread will be started. The method will throw in your thread if initial
* configuration fails, and the config thread will print a generic error message (but continue) if it fails thereafter. The config
* thread will stop if you {@link #close()} this {@link ConfigSubscriber}.
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
index 68e22ad5656..4e1de12770f 100644
--- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
+++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
@@ -49,12 +49,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
@Override
public boolean nextConfig(long timeoutMillis) {
- // Note: since the JRT callback thread will clear the queue first when it inserts a brand new element,
- // (see #updateConfig()) there is a race here. However: the caller will handle it no matter what it gets
- // from the queue here, the important part is that local state on the subscription objects is preserved.
-
- // Poll the queue for a next config until timeout
- JRTClientConfigRequest jrtReq = pollQueue(timeoutMillis);
+ JRTClientConfigRequest jrtReq = pollForNewConfig(timeoutMillis);
if (jrtReq == null) return newConfigOrException();
log.log(FINE, () -> "Polled queue and found config " + jrtReq);
@@ -74,6 +69,20 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
return newConfigOrException();
}
+ private JRTClientConfigRequest pollForNewConfig(long timeoutMillis) {
+ JRTClientConfigRequest request = pollQueue(timeoutMillis);
+ // There might be more than one request on the queue, so empty queue by polling with
+ // 0 timeout until queue is empty (returned value is null)
+ JRTClientConfigRequest temp;
+ do {
+ temp = pollQueue(0);
+ if (temp != null)
+ request = temp;
+ } while (temp != null);
+
+ return request;
+ }
+
private boolean newConfigOrException() {
// These flags may have been left true from a previous call, since ConfigSubscriber's nextConfig
// not necessarily returned true and reset the flags then
@@ -133,10 +142,8 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
return configInstance;
}
- // Will be called by JRTConfigRequester when there is a config with new generation for this subscription
+ // Called by JRTConfigRequester when there is a config with new generation for this subscription
void updateConfig(JRTClientConfigRequest jrtReq) {
- // We only want this latest generation to be in the queue, we do not preserve history in this system
- reqQueue.clear();
if ( ! reqQueue.offer(jrtReq))
setException(new ConfigurationRuntimeException("Failed offering returned request to queue of subscription " + this));
}
diff --git a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java
index fc922cc3b07..61573fd19be 100644
--- a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java
+++ b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java
@@ -33,7 +33,7 @@ public class GenericConfigSubscriberTest {
public void testSubscribeGeneric() throws InterruptedException {
JRTConfigRequester requester = new JRTConfigRequester(new MockConnection(), tv);
GenericConfigSubscriber sub = new GenericConfigSubscriber(requester);
- final List<String> defContent = List.of("myVal int");
+ List<String> defContent = List.of("myVal int");
GenericConfigHandle handle = sub.subscribe(new ConfigKey<>("simpletypes", "id", "config"),
defContent,
tv);
@@ -46,9 +46,9 @@ public class GenericConfigSubscriberTest {
assertFalse(handle.isChanged());
// Wait some time, config should be the same, but generation should be higher
- Thread.sleep(tv.getFixedDelay() * 2);
+ Thread.sleep(tv.getFixedDelay() * 3);
assertEquals("{}", getConfig(handle));
- assertTrue(handle.getRawConfig().getGeneration() > 1);
+ assertTrue("Unexpected generation (not > 1): " + handle.getRawConfig().getGeneration(), handle.getRawConfig().getGeneration() > 1);
assertFalse(sub.nextConfig(false));
assertFalse(handle.isChanged());
}