diff options
Diffstat (limited to 'config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java')
-rw-r--r-- | config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java | 21 |
1 files changed, 18 insertions, 3 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java index 0b98e9cd1b2..b27c75fb61d 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java @@ -4,6 +4,8 @@ package com.yahoo.config.subscription.impl; import com.yahoo.config.ConfigInstance; import com.yahoo.config.ConfigurationRuntimeException; import com.yahoo.config.subscription.ConfigInterruptedException; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.config.subscription.ConfigSubscriber; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.ConfigPayload; import com.yahoo.vespa.config.TimingValues; @@ -29,8 +31,9 @@ import static java.util.logging.Level.INFO; */ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubscription<T> { - private final JRTConfigRequester requester; + private JRTConfigRequester requester; private final TimingValues timingValues; + private final ConfigSubscriber subscriber; // Last time we got an OK JRT callback private Instant lastOK = Instant.MIN; @@ -40,11 +43,13 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc * but has not yet been handled. */ private BlockingQueue<JRTClientConfigRequest> reqQueue = new LinkedBlockingQueue<>(); + private final ConfigSourceSet sources; - public JRTConfigSubscription(ConfigKey<T> key, JRTConfigRequester requester, TimingValues timingValues) { + public JRTConfigSubscription(ConfigKey<T> key, ConfigSubscriber subscriber, ConfigSourceSet source, TimingValues timingValues) { super(key); this.timingValues = timingValues; - this.requester = requester; + this.subscriber = subscriber; + this.sources = source; } @Override @@ -143,6 +148,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc @Override public boolean subscribe(long timeout) { lastOK = Instant.now(); + requester = getRequester(); requester.request(this); JRTClientConfigRequest req = reqQueue.peek(); while (req == null && (Instant.now().isBefore(lastOK.plus(Duration.ofMillis(timeout))))) { @@ -156,6 +162,15 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc return req != null; } + private JRTConfigRequester getRequester() { + JRTConfigRequester requester = subscriber.requesters().get(sources); + if (requester == null) { + requester = JRTConfigRequester.create(sources, timingValues); + subscriber.requesters().put(sources, requester); + } + return requester; + } + @Override @SuppressWarnings("serial") public void close() { |