summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2021-10-06 17:59:41 +0200
committerHarald Musum <musum@yahooinc.com>2021-10-06 17:59:41 +0200
commit1ae463dc5a1c9ff89687f45cce01ce7087ffe4e6 (patch)
treef7ef58efb456a0bedc0606eaaf3386cc83008fdf
parent717082dcd93b2c158ccb815d19dbd41ded1809c4 (diff)
Refactor so that logic is in nextConfig()
Only other change is that we no longer call peek() and return early if there is nothing in queue, I cannot see any reason for that.
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java4
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java10
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java80
3 files changed, 45 insertions, 49 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
index 558213c43b9..f5ed79a1d44 100644
--- a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
+++ b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
@@ -1,4 +1,4 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.config.subscription;
import com.yahoo.config.ConfigInstance;
@@ -257,7 +257,7 @@ public class ConfigSubscriber implements AutoCloseable {
return acquireSnapshot(timeoutMillis, false, isInitializing);
}
- @Deprecated // TODO: Remov4e on Vespa 8
+ @Deprecated // TODO: Remove on Vespa 8
public boolean nextGeneration(long timeoutMillis) {
return nextGeneration(timeoutMillis, false);
}
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java
index 7713d509f69..dc37559c37e 100644
--- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java
+++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java
@@ -162,8 +162,7 @@ public class JRTConfigRequester implements RequestWaiter {
private void handleFailedRequest(JRTClientConfigRequest jrtReq, JRTConfigSubscription<ConfigInstance> sub, Connection connection) {
logError(jrtReq, connection);
- // The subscription object has an "old" config, which is all we have to offer back now
- log.log(INFO, "Failure of config subscription tp " + connection.getAddress() +
+ log.log(INFO, "Failure of config subscription to " + connection.getAddress() +
", clients will keep existing config until resolved: " + sub);
connectionPool.setError(connection, jrtReq.errorCode());
failures++;
@@ -194,12 +193,7 @@ public class JRTConfigRequester implements RequestWaiter {
sub.setLastCallBackOKTS(Instant.now());
log.log(FINE, () -> "OK response received in handleOkRequest: " + jrtReq);
if (jrtReq.hasUpdatedGeneration()) {
- // We only want this latest generation to be in the queue, we do not preserve history in this system
- sub.getReqQueue().clear();
- boolean putOK = sub.getReqQueue().offer(jrtReq);
- if (!putOK) {
- sub.setException(new ConfigurationRuntimeException("Could not put returned request on queue of subscription " + sub));
- }
+ sub.updateConfig(jrtReq);
}
scheduleNextRequest(jrtReq, sub, calculateSuccessDelay(), calculateSuccessTimeout());
}
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
index d8551c37f41..123366d4695 100644
--- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
+++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
@@ -2,6 +2,7 @@
package com.yahoo.config.subscription.impl;
import com.yahoo.config.ConfigInstance;
+import com.yahoo.config.ConfigurationRuntimeException;
import com.yahoo.config.subscription.ConfigInterruptedException;
import com.yahoo.config.subscription.ConfigSource;
import com.yahoo.config.subscription.ConfigSourceSet;
@@ -24,7 +25,8 @@ import static java.util.logging.Level.FINE;
import static java.util.logging.Level.INFO;
/**
- * A JRT config subscription uses one {@link JRTConfigRequester} to fetch config using Vespa RPC from a config source, typically proxy or server
+ * A config subscription for a config instance, gets config using Vespa RPC from a config source
+ * (config proxy or config server).
*
* @author vegardh
*/
@@ -37,7 +39,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
private Instant lastOK = Instant.MIN;
/**
- * The queue containing either nothing or the one (newest) request that has got callback from JRT,
+ * A queue containing either zero or one (the newest) request that got a callback from JRT,
* but has not yet been handled.
*/
private BlockingQueue<JRTClientConfigRequest> reqQueue = new LinkedBlockingQueue<>();
@@ -53,56 +55,50 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
@Override
public boolean nextConfig(long timeoutMillis) {
+ // Note: since the JRT callback thread will clear the queue first when it inserts a brand new element,
+ // (see #updateConfig()) there is a race here. However: the caller will handle it no matter what it gets
+ // from the queue here, the important part is that local state on the subscription objects is preserved.
+
+ // Poll the queue for a next config until timeout
+ JRTClientConfigRequest jrtReq = pollQueue(timeoutMillis);
+ if (jrtReq == null) return newConfigOrException();
+
+ log.log(FINE, () -> "Polled queue and found config " + jrtReq);
+ // Might set the following (caller must check):
+ // generation, generation changed, config, config changed
+ // Important: it never <em>resets</em> those flags, we must persist that state until the
+ // ConfigSubscriber clears it
+ if (jrtReq.hasUpdatedGeneration()) {
+ setApplyOnRestart(jrtReq.responseIsApplyOnRestart());
+ if (jrtReq.hasUpdatedConfig()) {
+ setNewConfig(jrtReq);
+ } else {
+ setGeneration(jrtReq.getNewGeneration());
+ }
+ }
+
+ return newConfigOrException();
+ }
+
+ private boolean newConfigOrException() {
// These flags may have been left true from a previous call, since ConfigSubscriber's nextConfig
// not necessarily returned true and reset the flags then
ConfigState<T> configState = getConfigState();
- boolean gotNew = configState.isGenerationChanged() || configState.isConfigChanged() || hasException();
- // Return that now, if there's nothing in queue, so that ConfigSubscriber can move on to other subscriptions to check
- if (getReqQueue().peek() == null && gotNew) {
- return true;
- }
- // Otherwise poll the queue for another generation or timeout
- //
- // Note: since the JRT callback thread will clear the queue first when it inserts a brand new element,
- // there is a race here. However: the caller will handle it no matter what it gets from the queue here,
- // the important part is that local state on the subscription objects is preserved.
- if (!pollQueue(timeoutMillis)) return gotNew;
- configState = getConfigState();
- gotNew = configState.isGenerationChanged() || configState.isConfigChanged() || hasException();
- return gotNew;
+ return configState.isGenerationChanged() || configState.isConfigChanged() || hasException();
}
/**
- * Polls the callback queue and <em>maybe</em> sets the following (caller must check):
- * generation, generation changed, config, config changed
- * Important: it never <em>resets</em> those flags, we must persist that state until the
- * {@link ConfigSubscriber} clears it
+ * Polls the callback queue for new config
*
* @param timeoutMillis timeout when polling (returns after at most this time)
- * @return true if it got anything off the queue and <em>maybe</em> changed any state, false if timed out taking from queue
*/
- private boolean pollQueue(long timeoutMillis) {
- JRTClientConfigRequest jrtReq;
+ private JRTClientConfigRequest pollQueue(long timeoutMillis) {
try {
// Only valid responses are on queue, no need to validate
- jrtReq = getReqQueue().poll(timeoutMillis, TimeUnit.MILLISECONDS);
+ return reqQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e1) {
throw new ConfigInterruptedException(e1);
}
- if (jrtReq == null) {
- // timed out, we know nothing new.
- return false;
- }
- log.log(FINE, () -> "Polled queue and found config " + jrtReq);
- if (jrtReq.hasUpdatedGeneration()) {
- setApplyOnRestart(jrtReq.responseIsApplyOnRestart());
- if (jrtReq.hasUpdatedConfig()) {
- setNewConfig(jrtReq);
- } else {
- setGeneration(jrtReq.getNewGeneration());
- }
- }
- return true;
}
protected void setNewConfig(JRTClientConfigRequest jrtReq) {
@@ -135,7 +131,13 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
return configInstance;
}
- BlockingQueue<JRTClientConfigRequest> getReqQueue() { return reqQueue; }
+ // Will be called by JRTConfigRequester when there is a config with new generation for this subscription
+ void updateConfig(JRTClientConfigRequest jrtReq) {
+ // We only want this latest generation to be in the queue, we do not preserve history in this system
+ reqQueue.clear();
+ if ( ! reqQueue.offer(jrtReq))
+ setException(new ConfigurationRuntimeException("Failed offering returned request to queue of subscription " + this));
+ }
@Override
public boolean subscribe(long timeout) {