diff options
author | Harald Musum <musum@yahooinc.com> | 2021-10-06 17:59:41 +0200 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2021-10-06 17:59:41 +0200 |
commit | 1ae463dc5a1c9ff89687f45cce01ce7087ffe4e6 (patch) | |
tree | f7ef58efb456a0bedc0606eaaf3386cc83008fdf /config/src/main/java | |
parent | 717082dcd93b2c158ccb815d19dbd41ded1809c4 (diff) |
Refactor so that logic is in nextConfig()
Only other change is that we no longer call peek() and return early
if there is nothing in queue, I cannot see any reason for that.
Diffstat (limited to 'config/src/main/java')
3 files changed, 45 insertions, 49 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java index 558213c43b9..f5ed79a1d44 100644 --- a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java +++ b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java @@ -1,4 +1,4 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.config.subscription; import com.yahoo.config.ConfigInstance; @@ -257,7 +257,7 @@ public class ConfigSubscriber implements AutoCloseable { return acquireSnapshot(timeoutMillis, false, isInitializing); } - @Deprecated // TODO: Remov4e on Vespa 8 + @Deprecated // TODO: Remove on Vespa 8 public boolean nextGeneration(long timeoutMillis) { return nextGeneration(timeoutMillis, false); } diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java index 7713d509f69..dc37559c37e 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java @@ -162,8 +162,7 @@ public class JRTConfigRequester implements RequestWaiter { private void handleFailedRequest(JRTClientConfigRequest jrtReq, JRTConfigSubscription<ConfigInstance> sub, Connection connection) { logError(jrtReq, connection); - // The subscription object has an "old" config, which is all we have to offer back now - log.log(INFO, "Failure of config subscription tp " + connection.getAddress() + + log.log(INFO, "Failure of config subscription to " + connection.getAddress() + ", clients will keep existing config until resolved: " + sub); connectionPool.setError(connection, jrtReq.errorCode()); failures++; @@ -194,12 +193,7 @@ public class JRTConfigRequester implements RequestWaiter { sub.setLastCallBackOKTS(Instant.now()); log.log(FINE, () -> "OK response received in handleOkRequest: " + jrtReq); if (jrtReq.hasUpdatedGeneration()) { - // We only want this latest generation to be in the queue, we do not preserve history in this system - sub.getReqQueue().clear(); - boolean putOK = sub.getReqQueue().offer(jrtReq); - if (!putOK) { - sub.setException(new ConfigurationRuntimeException("Could not put returned request on queue of subscription " + sub)); - } + sub.updateConfig(jrtReq); } scheduleNextRequest(jrtReq, sub, calculateSuccessDelay(), calculateSuccessTimeout()); } diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java index d8551c37f41..123366d4695 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java @@ -2,6 +2,7 @@ package com.yahoo.config.subscription.impl; import com.yahoo.config.ConfigInstance; +import com.yahoo.config.ConfigurationRuntimeException; import com.yahoo.config.subscription.ConfigInterruptedException; import com.yahoo.config.subscription.ConfigSource; import com.yahoo.config.subscription.ConfigSourceSet; @@ -24,7 +25,8 @@ import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; /** - * A JRT config subscription uses one {@link JRTConfigRequester} to fetch config using Vespa RPC from a config source, typically proxy or server + * A config subscription for a config instance, gets config using Vespa RPC from a config source + * (config proxy or config server). * * @author vegardh */ @@ -37,7 +39,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc private Instant lastOK = Instant.MIN; /** - * The queue containing either nothing or the one (newest) request that has got callback from JRT, + * A queue containing either zero or one (the newest) request that got a callback from JRT, * but has not yet been handled. */ private BlockingQueue<JRTClientConfigRequest> reqQueue = new LinkedBlockingQueue<>(); @@ -53,56 +55,50 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc @Override public boolean nextConfig(long timeoutMillis) { + // Note: since the JRT callback thread will clear the queue first when it inserts a brand new element, + // (see #updateConfig()) there is a race here. However: the caller will handle it no matter what it gets + // from the queue here, the important part is that local state on the subscription objects is preserved. + + // Poll the queue for a next config until timeout + JRTClientConfigRequest jrtReq = pollQueue(timeoutMillis); + if (jrtReq == null) return newConfigOrException(); + + log.log(FINE, () -> "Polled queue and found config " + jrtReq); + // Might set the following (caller must check): + // generation, generation changed, config, config changed + // Important: it never <em>resets</em> those flags, we must persist that state until the + // ConfigSubscriber clears it + if (jrtReq.hasUpdatedGeneration()) { + setApplyOnRestart(jrtReq.responseIsApplyOnRestart()); + if (jrtReq.hasUpdatedConfig()) { + setNewConfig(jrtReq); + } else { + setGeneration(jrtReq.getNewGeneration()); + } + } + + return newConfigOrException(); + } + + private boolean newConfigOrException() { // These flags may have been left true from a previous call, since ConfigSubscriber's nextConfig // not necessarily returned true and reset the flags then ConfigState<T> configState = getConfigState(); - boolean gotNew = configState.isGenerationChanged() || configState.isConfigChanged() || hasException(); - // Return that now, if there's nothing in queue, so that ConfigSubscriber can move on to other subscriptions to check - if (getReqQueue().peek() == null && gotNew) { - return true; - } - // Otherwise poll the queue for another generation or timeout - // - // Note: since the JRT callback thread will clear the queue first when it inserts a brand new element, - // there is a race here. However: the caller will handle it no matter what it gets from the queue here, - // the important part is that local state on the subscription objects is preserved. - if (!pollQueue(timeoutMillis)) return gotNew; - configState = getConfigState(); - gotNew = configState.isGenerationChanged() || configState.isConfigChanged() || hasException(); - return gotNew; + return configState.isGenerationChanged() || configState.isConfigChanged() || hasException(); } /** - * Polls the callback queue and <em>maybe</em> sets the following (caller must check): - * generation, generation changed, config, config changed - * Important: it never <em>resets</em> those flags, we must persist that state until the - * {@link ConfigSubscriber} clears it + * Polls the callback queue for new config * * @param timeoutMillis timeout when polling (returns after at most this time) - * @return true if it got anything off the queue and <em>maybe</em> changed any state, false if timed out taking from queue */ - private boolean pollQueue(long timeoutMillis) { - JRTClientConfigRequest jrtReq; + private JRTClientConfigRequest pollQueue(long timeoutMillis) { try { // Only valid responses are on queue, no need to validate - jrtReq = getReqQueue().poll(timeoutMillis, TimeUnit.MILLISECONDS); + return reqQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS); } catch (InterruptedException e1) { throw new ConfigInterruptedException(e1); } - if (jrtReq == null) { - // timed out, we know nothing new. - return false; - } - log.log(FINE, () -> "Polled queue and found config " + jrtReq); - if (jrtReq.hasUpdatedGeneration()) { - setApplyOnRestart(jrtReq.responseIsApplyOnRestart()); - if (jrtReq.hasUpdatedConfig()) { - setNewConfig(jrtReq); - } else { - setGeneration(jrtReq.getNewGeneration()); - } - } - return true; } protected void setNewConfig(JRTClientConfigRequest jrtReq) { @@ -135,7 +131,13 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc return configInstance; } - BlockingQueue<JRTClientConfigRequest> getReqQueue() { return reqQueue; } + // Will be called by JRTConfigRequester when there is a config with new generation for this subscription + void updateConfig(JRTClientConfigRequest jrtReq) { + // We only want this latest generation to be in the queue, we do not preserve history in this system + reqQueue.clear(); + if ( ! reqQueue.offer(jrtReq)) + setException(new ConfigurationRuntimeException("Failed offering returned request to queue of subscription " + this)); + } @Override public boolean subscribe(long timeout) { |