summaryrefslogtreecommitdiffstats
path: root/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
diff options
context:
space:
mode:
Diffstat (limited to 'config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java')
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java21
1 files changed, 18 insertions, 3 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
index 0b98e9cd1b2..b27c75fb61d 100644
--- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
+++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
@@ -4,6 +4,8 @@ package com.yahoo.config.subscription.impl;
import com.yahoo.config.ConfigInstance;
import com.yahoo.config.ConfigurationRuntimeException;
import com.yahoo.config.subscription.ConfigInterruptedException;
+import com.yahoo.config.subscription.ConfigSourceSet;
+import com.yahoo.config.subscription.ConfigSubscriber;
import com.yahoo.vespa.config.ConfigKey;
import com.yahoo.vespa.config.ConfigPayload;
import com.yahoo.vespa.config.TimingValues;
@@ -29,8 +31,9 @@ import static java.util.logging.Level.INFO;
*/
public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubscription<T> {
- private final JRTConfigRequester requester;
+ private JRTConfigRequester requester;
private final TimingValues timingValues;
+ private final ConfigSubscriber subscriber;
// Last time we got an OK JRT callback
private Instant lastOK = Instant.MIN;
@@ -40,11 +43,13 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
* but has not yet been handled.
*/
private BlockingQueue<JRTClientConfigRequest> reqQueue = new LinkedBlockingQueue<>();
+ private final ConfigSourceSet sources;
- public JRTConfigSubscription(ConfigKey<T> key, JRTConfigRequester requester, TimingValues timingValues) {
+ public JRTConfigSubscription(ConfigKey<T> key, ConfigSubscriber subscriber, ConfigSourceSet source, TimingValues timingValues) {
super(key);
this.timingValues = timingValues;
- this.requester = requester;
+ this.subscriber = subscriber;
+ this.sources = source;
}
@Override
@@ -143,6 +148,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
@Override
public boolean subscribe(long timeout) {
lastOK = Instant.now();
+ requester = getRequester();
requester.request(this);
JRTClientConfigRequest req = reqQueue.peek();
while (req == null && (Instant.now().isBefore(lastOK.plus(Duration.ofMillis(timeout))))) {
@@ -156,6 +162,15 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
return req != null;
}
+ private JRTConfigRequester getRequester() {
+ JRTConfigRequester requester = subscriber.requesters().get(sources);
+ if (requester == null) {
+ requester = JRTConfigRequester.create(sources, timingValues);
+ subscriber.requesters().put(sources, requester);
+ }
+ return requester;
+ }
+
@Override
@SuppressWarnings("serial")
public void close() {