diff options
author | Harald Musum <musum@verizonmedia.com> | 2022-01-04 07:14:09 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-04 07:14:09 +0100 |
commit | 394ed618a252f1a24bc3c0dea21e79e9480e6173 (patch) | |
tree | 3fd36ff8b06f4d5cc89197f5dee1b3ee55323cf1 | |
parent | 345a33600e758fa6a5d3615687942ad1e95560e8 (diff) | |
parent | 0a67c5df0c73331f6ca951557e635a9c25b9dc96 (diff) |
Merge pull request #20634 from vespa-engine/hmusum/stop-clearing-config-requests-queue
Avoid clearing config response queue [run-systemtest]
3 files changed, 36 insertions, 30 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java index 01008f0a8a2..4bf442a0890 100644 --- a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java +++ b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java @@ -434,8 +434,8 @@ public class ConfigSubscriber implements AutoCloseable { } /** - * Use this convenience method if you only want to subscribe on <em>one</em> config, and want generic error handling. - * Implement {@link SingleSubscriber} and pass to this method. + * Convenience method that can be used if you only want to subscribe to <em>one</em> config, and want generic error handling. + * Implement {@link SingleSubscriber} and pass it to this method. * You will get initial config, and a config thread will be started. The method will throw in your thread if initial * configuration fails, and the config thread will print a generic error message (but continue) if it fails thereafter. The config * thread will stop if you {@link #close()} this {@link ConfigSubscriber}. diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java index 68e22ad5656..a18f1b4b260 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java @@ -36,10 +36,9 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc private Instant lastOK = Instant.MIN; /** - * A queue containing either zero or one (the newest) request that got a callback from JRT, - * but has not yet been handled. + * A queue containing responses (requests that got a callback from JRT) that has not yet been handled. */ - private BlockingQueue<JRTClientConfigRequest> reqQueue = new LinkedBlockingQueue<>(); + private BlockingQueue<JRTClientConfigRequest> responseQueue = new LinkedBlockingQueue<>(); public JRTConfigSubscription(ConfigKey<T> key, JRTConfigRequester requester, TimingValues timingValues) { super(key); @@ -49,31 +48,40 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc @Override public boolean nextConfig(long timeoutMillis) { - // Note: since the JRT callback thread will clear the queue first when it inserts a brand new element, - // (see #updateConfig()) there is a race here. However: the caller will handle it no matter what it gets - // from the queue here, the important part is that local state on the subscription objects is preserved. + JRTClientConfigRequest response = pollForNewConfig(timeoutMillis); + if (response == null) return newConfigOrException(); - // Poll the queue for a next config until timeout - JRTClientConfigRequest jrtReq = pollQueue(timeoutMillis); - if (jrtReq == null) return newConfigOrException(); - - log.log(FINE, () -> "Polled queue and found config " + jrtReq); + log.log(FINE, () -> "Polled queue and found config " + response); // Might set the following (caller must check): // generation, generation changed, config, config changed // Important: it never <em>resets</em> those flags, we must persist that state until the // ConfigSubscriber clears it - if (jrtReq.hasUpdatedGeneration()) { - setApplyOnRestart(jrtReq.responseIsApplyOnRestart()); - if (jrtReq.hasUpdatedConfig()) { - setNewConfig(jrtReq); + if (response.hasUpdatedGeneration()) { + setApplyOnRestart(response.responseIsApplyOnRestart()); + if (response.hasUpdatedConfig()) { + setNewConfig(response); } else { - setNewConfigAndGeneration(jrtReq); + setNewConfigAndGeneration(response); } } return newConfigOrException(); } + private JRTClientConfigRequest pollForNewConfig(long timeoutMillis) { + JRTClientConfigRequest response = pollQueue(timeoutMillis); + // There might be more than one response on the queue, so empty queue by polling with + // 0 timeout until queue is empty (returned value is null) + JRTClientConfigRequest temp; + do { + temp = pollQueue(0); + if (temp != null) + response = temp; + } while (temp != null); + + return response; + } + private boolean newConfigOrException() { // These flags may have been left true from a previous call, since ConfigSubscriber's nextConfig // not necessarily returned true and reset the flags then @@ -89,7 +97,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc private JRTClientConfigRequest pollQueue(long timeoutMillis) { try { // Only valid responses are on queue, no need to validate - return reqQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS); + return responseQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS); } catch (InterruptedException e1) { throw new ConfigInterruptedException(e1); } @@ -133,11 +141,9 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc return configInstance; } - // Will be called by JRTConfigRequester when there is a config with new generation for this subscription + // Called by JRTConfigRequester when there is a config with new generation for this subscription void updateConfig(JRTClientConfigRequest jrtReq) { - // We only want this latest generation to be in the queue, we do not preserve history in this system - reqQueue.clear(); - if ( ! reqQueue.offer(jrtReq)) + if ( ! responseQueue.offer(jrtReq)) setException(new ConfigurationRuntimeException("Failed offering returned request to queue of subscription " + this)); } @@ -145,14 +151,14 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc public boolean subscribe(long timeout) { lastOK = Instant.now(); requester.request(this); - JRTClientConfigRequest req = reqQueue.peek(); + JRTClientConfigRequest req = responseQueue.peek(); while (req == null && (Instant.now().isBefore(lastOK.plus(Duration.ofMillis(timeout))))) { try { Thread.sleep(10); } catch (InterruptedException e) { throw new ConfigInterruptedException(e); } - req = reqQueue.peek(); + req = responseQueue.peek(); } return req != null; } @@ -160,11 +166,11 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc @Override public void close() { super.close(); - reqQueue = new LinkedBlockingQueue<>() { + responseQueue = new LinkedBlockingQueue<>() { @SuppressWarnings("NullableProblems") @Override public void put(JRTClientConfigRequest e) { - // When closed, throw away all requests that callbacks try to put + // When closed, throw away all responses that callbacks try to put on queue } }; } diff --git a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java index fc922cc3b07..61573fd19be 100644 --- a/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java +++ b/config/src/test/java/com/yahoo/config/subscription/GenericConfigSubscriberTest.java @@ -33,7 +33,7 @@ public class GenericConfigSubscriberTest { public void testSubscribeGeneric() throws InterruptedException { JRTConfigRequester requester = new JRTConfigRequester(new MockConnection(), tv); GenericConfigSubscriber sub = new GenericConfigSubscriber(requester); - final List<String> defContent = List.of("myVal int"); + List<String> defContent = List.of("myVal int"); GenericConfigHandle handle = sub.subscribe(new ConfigKey<>("simpletypes", "id", "config"), defContent, tv); @@ -46,9 +46,9 @@ public class GenericConfigSubscriberTest { assertFalse(handle.isChanged()); // Wait some time, config should be the same, but generation should be higher - Thread.sleep(tv.getFixedDelay() * 2); + Thread.sleep(tv.getFixedDelay() * 3); assertEquals("{}", getConfig(handle)); - assertTrue(handle.getRawConfig().getGeneration() > 1); + assertTrue("Unexpected generation (not > 1): " + handle.getRawConfig().getGeneration(), handle.getRawConfig().getGeneration() > 1); assertFalse(sub.nextConfig(false)); assertFalse(handle.isChanged()); } |