summaryrefslogtreecommitdiffstats
path: root/config
diff options
context:
space:
mode:
Diffstat (limited to 'config')
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java4
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java10
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java80
3 files changed, 45 insertions, 49 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
index 558213c43b9..f5ed79a1d44 100644
--- a/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
+++ b/config/src/main/java/com/yahoo/config/subscription/ConfigSubscriber.java
@@ -1,4 +1,4 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.config.subscription;
import com.yahoo.config.ConfigInstance;
@@ -257,7 +257,7 @@ public class ConfigSubscriber implements AutoCloseable {
return acquireSnapshot(timeoutMillis, false, isInitializing);
}
- @Deprecated // TODO: Remov4e on Vespa 8
+ @Deprecated // TODO: Remove on Vespa 8
public boolean nextGeneration(long timeoutMillis) {
return nextGeneration(timeoutMillis, false);
}
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java
index ce24d784776..c88bdf9f3b8 100644
--- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java
+++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java
@@ -162,8 +162,7 @@ public class JRTConfigRequester implements RequestWaiter {
private void handleFailedRequest(JRTClientConfigRequest jrtReq, JRTConfigSubscription<ConfigInstance> sub, Connection connection) {
logError(jrtReq, connection);
- // The subscription object has an "old" config, which is all we have to offer back now
- log.log(INFO, "Failure of config subscription tp " + connection.getAddress() +
+ log.log(INFO, "Failure of config subscription to " + connection.getAddress() +
", clients will keep existing config until resolved: " + sub);
connectionPool.switchConnection(connection);
failures++;
@@ -194,12 +193,7 @@ public class JRTConfigRequester implements RequestWaiter {
sub.setLastCallBackOKTS(Instant.now());
log.log(FINE, () -> "OK response received in handleOkRequest: " + jrtReq);
if (jrtReq.hasUpdatedGeneration()) {
- // We only want this latest generation to be in the queue, we do not preserve history in this system
- sub.getReqQueue().clear();
- boolean putOK = sub.getReqQueue().offer(jrtReq);
- if (!putOK) {
- sub.setException(new ConfigurationRuntimeException("Could not put returned request on queue of subscription " + sub));
- }
+ sub.updateConfig(jrtReq);
}
scheduleNextRequest(jrtReq, sub, calculateSuccessDelay(), calculateSuccessTimeout());
}
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
index d8551c37f41..123366d4695 100644
--- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
+++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
@@ -2,6 +2,7 @@
package com.yahoo.config.subscription.impl;
import com.yahoo.config.ConfigInstance;
+import com.yahoo.config.ConfigurationRuntimeException;
import com.yahoo.config.subscription.ConfigInterruptedException;
import com.yahoo.config.subscription.ConfigSource;
import com.yahoo.config.subscription.ConfigSourceSet;
@@ -24,7 +25,8 @@ import static java.util.logging.Level.FINE;
import static java.util.logging.Level.INFO;
/**
- * A JRT config subscription uses one {@link JRTConfigRequester} to fetch config using Vespa RPC from a config source, typically proxy or server
+ * A config subscription for a config instance, gets config using Vespa RPC from a config source
+ * (config proxy or config server).
*
* @author vegardh
*/
@@ -37,7 +39,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
private Instant lastOK = Instant.MIN;
/**
- * The queue containing either nothing or the one (newest) request that has got callback from JRT,
+ * A queue containing either zero or one (the newest) request that got a callback from JRT,
* but has not yet been handled.
*/
private BlockingQueue<JRTClientConfigRequest> reqQueue = new LinkedBlockingQueue<>();
@@ -53,56 +55,50 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
@Override
public boolean nextConfig(long timeoutMillis) {
+ // Note: since the JRT callback thread will clear the queue first when it inserts a brand new element,
+ // (see #updateConfig()) there is a race here. However: the caller will handle it no matter what it gets
+ // from the queue here, the important part is that local state on the subscription objects is preserved.
+
+ // Poll the queue for a next config until timeout
+ JRTClientConfigRequest jrtReq = pollQueue(timeoutMillis);
+ if (jrtReq == null) return newConfigOrException();
+
+ log.log(FINE, () -> "Polled queue and found config " + jrtReq);
+ // Might set the following (caller must check):
+ // generation, generation changed, config, config changed
+ // Important: it never <em>resets</em> those flags, we must persist that state until the
+ // ConfigSubscriber clears it
+ if (jrtReq.hasUpdatedGeneration()) {
+ setApplyOnRestart(jrtReq.responseIsApplyOnRestart());
+ if (jrtReq.hasUpdatedConfig()) {
+ setNewConfig(jrtReq);
+ } else {
+ setGeneration(jrtReq.getNewGeneration());
+ }
+ }
+
+ return newConfigOrException();
+ }
+
+ private boolean newConfigOrException() {
// These flags may have been left true from a previous call, since ConfigSubscriber's nextConfig
// not necessarily returned true and reset the flags then
ConfigState<T> configState = getConfigState();
- boolean gotNew = configState.isGenerationChanged() || configState.isConfigChanged() || hasException();
- // Return that now, if there's nothing in queue, so that ConfigSubscriber can move on to other subscriptions to check
- if (getReqQueue().peek() == null && gotNew) {
- return true;
- }
- // Otherwise poll the queue for another generation or timeout
- //
- // Note: since the JRT callback thread will clear the queue first when it inserts a brand new element,
- // there is a race here. However: the caller will handle it no matter what it gets from the queue here,
- // the important part is that local state on the subscription objects is preserved.
- if (!pollQueue(timeoutMillis)) return gotNew;
- configState = getConfigState();
- gotNew = configState.isGenerationChanged() || configState.isConfigChanged() || hasException();
- return gotNew;
+ return configState.isGenerationChanged() || configState.isConfigChanged() || hasException();
}
/**
- * Polls the callback queue and <em>maybe</em> sets the following (caller must check):
- * generation, generation changed, config, config changed
- * Important: it never <em>resets</em> those flags, we must persist that state until the
- * {@link ConfigSubscriber} clears it
+ * Polls the callback queue for new config
*
* @param timeoutMillis timeout when polling (returns after at most this time)
- * @return true if it got anything off the queue and <em>maybe</em> changed any state, false if timed out taking from queue
*/
- private boolean pollQueue(long timeoutMillis) {
- JRTClientConfigRequest jrtReq;
+ private JRTClientConfigRequest pollQueue(long timeoutMillis) {
try {
// Only valid responses are on queue, no need to validate
- jrtReq = getReqQueue().poll(timeoutMillis, TimeUnit.MILLISECONDS);
+ return reqQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e1) {
throw new ConfigInterruptedException(e1);
}
- if (jrtReq == null) {
- // timed out, we know nothing new.
- return false;
- }
- log.log(FINE, () -> "Polled queue and found config " + jrtReq);
- if (jrtReq.hasUpdatedGeneration()) {
- setApplyOnRestart(jrtReq.responseIsApplyOnRestart());
- if (jrtReq.hasUpdatedConfig()) {
- setNewConfig(jrtReq);
- } else {
- setGeneration(jrtReq.getNewGeneration());
- }
- }
- return true;
}
protected void setNewConfig(JRTClientConfigRequest jrtReq) {
@@ -135,7 +131,13 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
return configInstance;
}
- BlockingQueue<JRTClientConfigRequest> getReqQueue() { return reqQueue; }
+ // Will be called by JRTConfigRequester when there is a config with new generation for this subscription
+ void updateConfig(JRTClientConfigRequest jrtReq) {
+ // We only want this latest generation to be in the queue, we do not preserve history in this system
+ reqQueue.clear();
+ if ( ! reqQueue.offer(jrtReq))
+ setException(new ConfigurationRuntimeException("Failed offering returned request to queue of subscription " + this));
+ }
@Override
public boolean subscribe(long timeout) {