summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2022-01-04 07:14:09 +0100
committerGitHub <noreply@github.com>2022-01-04 07:14:09 +0100
commit394ed618a252f1a24bc3c0dea21e79e9480e6173 (patch)
tree3fd36ff8b06f4d5cc89197f5dee1b3ee55323cf1
parent345a33600e758fa6a5d3615687942ad1e95560e8 (diff)
parent0a67c5df0c73331f6ca951557e635a9c25b9dc96 (diff)
Merge pull request #20634 from vespa-engine/hmusum/stop-clearing-config-requests-queue
Avoid clearing config response queue [run-systemtest]
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java4
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java56
-rw-r--r--config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java6
3 files changed, 36 insertions, 30 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
index 01008f0a8a2..4bf442a0890 100644
--- a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
+++ b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
@@ -434,8 +434,8 @@ public class ConfigSubscriber implements AutoCloseable {
}
/**
- * Use this convenience method if you only want to subscribe on <em>one</em> config, and want generic error handling.
- * Implement {@link SingleSubscriber} and pass to this method.
+ * Convenience method that can be used if you only want to subscribe to <em>one</em> config, and want generic error handling.
+ * Implement {@link SingleSubscriber} and pass it to this method.
* You will get initial config, and a config thread will be started. The method will throw in your thread if initial
* configuration fails, and the config thread will print a generic error message (but continue) if it fails thereafter. The config
* thread will stop if you {@link #close()} this {@link ConfigSubscriber}.
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
index 68e22ad5656..a18f1b4b260 100644
--- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
+++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
@@ -36,10 +36,9 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
private Instant lastOK = Instant.MIN;
/**
- * A queue containing either zero or one (the newest) request that got a callback from JRT,
- * but has not yet been handled.
+ * A queue containing responses (requests that got a callback from JRT) that has not yet been handled.
*/
- private BlockingQueue<JRTClientConfigRequest> reqQueue = new LinkedBlockingQueue<>();
+ private BlockingQueue<JRTClientConfigRequest> responseQueue = new LinkedBlockingQueue<>();
public JRTConfigSubscription(ConfigKey<T> key, JRTConfigRequester requester, TimingValues timingValues) {
super(key);
@@ -49,31 +48,40 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
@Override
public boolean nextConfig(long timeoutMillis) {
- // Note: since the JRT callback thread will clear the queue first when it inserts a brand new element,
- // (see #updateConfig()) there is a race here. However: the caller will handle it no matter what it gets
- // from the queue here, the important part is that local state on the subscription objects is preserved.
+ JRTClientConfigRequest response = pollForNewConfig(timeoutMillis);
+ if (response == null) return newConfigOrException();
- // Poll the queue for a next config until timeout
- JRTClientConfigRequest jrtReq = pollQueue(timeoutMillis);
- if (jrtReq == null) return newConfigOrException();
-
- log.log(FINE, () -> "Polled queue and found config " + jrtReq);
+ log.log(FINE, () -> "Polled queue and found config " + response);
// Might set the following (caller must check):
// generation, generation changed, config, config changed
// Important: it never <em>resets</em> those flags, we must persist that state until the
// ConfigSubscriber clears it
- if (jrtReq.hasUpdatedGeneration()) {
- setApplyOnRestart(jrtReq.responseIsApplyOnRestart());
- if (jrtReq.hasUpdatedConfig()) {
- setNewConfig(jrtReq);
+ if (response.hasUpdatedGeneration()) {
+ setApplyOnRestart(response.responseIsApplyOnRestart());
+ if (response.hasUpdatedConfig()) {
+ setNewConfig(response);
} else {
- setNewConfigAndGeneration(jrtReq);
+ setNewConfigAndGeneration(response);
}
}
return newConfigOrException();
}
+ private JRTClientConfigRequest pollForNewConfig(long timeoutMillis) {
+ JRTClientConfigRequest response = pollQueue(timeoutMillis);
+ // There might be more than one response on the queue, so empty queue by polling with
+ // 0 timeout until queue is empty (returned value is null)
+ JRTClientConfigRequest temp;
+ do {
+ temp = pollQueue(0);
+ if (temp != null)
+ response = temp;
+ } while (temp != null);
+
+ return response;
+ }
+
private boolean newConfigOrException() {
// These flags may have been left true from a previous call, since ConfigSubscriber's nextConfig
// not necessarily returned true and reset the flags then
@@ -89,7 +97,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
private JRTClientConfigRequest pollQueue(long timeoutMillis) {
try {
// Only valid responses are on queue, no need to validate
- return reqQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
+ return responseQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e1) {
throw new ConfigInterruptedException(e1);
}
@@ -133,11 +141,9 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
return configInstance;
}
- // Will be called by JRTConfigRequester when there is a config with new generation for this subscription
+ // Called by JRTConfigRequester when there is a config with new generation for this subscription
void updateConfig(JRTClientConfigRequest jrtReq) {
- // We only want this latest generation to be in the queue, we do not preserve history in this system
- reqQueue.clear();
- if ( ! reqQueue.offer(jrtReq))
+ if ( ! responseQueue.offer(jrtReq))
setException(new ConfigurationRuntimeException("Failed offering returned request to queue of subscription " + this));
}
@@ -145,14 +151,14 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
public boolean subscribe(long timeout) {
lastOK = Instant.now();
requester.request(this);
- JRTClientConfigRequest req = reqQueue.peek();
+ JRTClientConfigRequest req = responseQueue.peek();
while (req == null && (Instant.now().isBefore(lastOK.plus(Duration.ofMillis(timeout))))) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new ConfigInterruptedException(e);
}
- req = reqQueue.peek();
+ req = responseQueue.peek();
}
return req != null;
}
@@ -160,11 +166,11 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
@Override
public void close() {
super.close();
- reqQueue = new LinkedBlockingQueue<>() {
+ responseQueue = new LinkedBlockingQueue<>() {
@SuppressWarnings("NullableProblems")
@Override
public void put(JRTClientConfigRequest e) {
- // When closed, throw away all requests that callbacks try to put
+ // When closed, throw away all responses that callbacks try to put on queue
}
};
}
diff --git a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java
index fc922cc3b07..61573fd19be 100644
--- a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java
+++ b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java
@@ -33,7 +33,7 @@ public class GenericConfigSubscriberTest {
public void testSubscribeGeneric() throws InterruptedException {
JRTConfigRequester requester = new JRTConfigRequester(new MockConnection(), tv);
GenericConfigSubscriber sub = new GenericConfigSubscriber(requester);
- final List<String> defContent = List.of("myVal int");
+ List<String> defContent = List.of("myVal int");
GenericConfigHandle handle = sub.subscribe(new ConfigKey<>("simpletypes", "id", "config"),
defContent,
tv);
@@ -46,9 +46,9 @@ public class GenericConfigSubscriberTest {
assertFalse(handle.isChanged());
// Wait some time, config should be the same, but generation should be higher
- Thread.sleep(tv.getFixedDelay() * 2);
+ Thread.sleep(tv.getFixedDelay() * 3);
assertEquals("{}", getConfig(handle));
- assertTrue(handle.getRawConfig().getGeneration() > 1);
+ assertTrue("Unexpected generation (not > 1): " + handle.getRawConfig().getGeneration(), handle.getRawConfig().getGeneration() > 1);
assertFalse(sub.nextConfig(false));
assertFalse(handle.isChanged());
}