aboutsummaryrefslogtreecommitdiffstats
path: root/config
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-12-20 15:19:36 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-12-20 15:19:36 +0100
commit01ddb9946e2eb1a6cd0d51338938c0a90ff5ce95 (patch)
tree0996d6135d54cd6dbb726d782cb9df62409e87ad /config
parent2d82ff0d3a0f95a3b19c61e318a3cb3dfa6a2491 (diff)
Add an eof object that can be sent to the Q to wake up the ones waiting for config.
This enables faster close.
Diffstat (limited to 'config')
-rw-r--r--config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java45
1 files changed, 40 insertions, 5 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
index 8580b4584e2..90d8951fbc3 100644
--- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
+++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java
@@ -4,20 +4,25 @@ package com.yahoo.config.subscription.impl;
import com.yahoo.config.ConfigInstance;
import com.yahoo.config.ConfigurationRuntimeException;
import com.yahoo.config.subscription.ConfigInterruptedException;
+import com.yahoo.jrt.Request;
import com.yahoo.vespa.config.ConfigKey;
import com.yahoo.vespa.config.ConfigPayload;
+import com.yahoo.vespa.config.PayloadChecksums;
import com.yahoo.vespa.config.TimingValues;
import com.yahoo.vespa.config.protocol.CompressionType;
+import com.yahoo.vespa.config.protocol.DefContent;
import com.yahoo.vespa.config.protocol.JRTClientConfigRequest;
import com.yahoo.vespa.config.protocol.Payload;
+import com.yahoo.vespa.config.protocol.Trace;
+import com.yahoo.vespa.config.protocol.VespaVersion;
import java.time.Duration;
import java.time.Instant;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import static com.yahoo.vespa.config.PayloadChecksum.Type.MD5;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.INFO;
@@ -29,6 +34,33 @@ import static java.util.logging.Level.INFO;
*/
public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubscription<T> {
+ private static class EOFJRTClientConfigRequest implements JRTClientConfigRequest {
+ @Override public boolean validateResponse() { return false; }
+ @Override public boolean hasUpdatedGeneration() { return false; }
+ @Override public Payload getNewPayload() { return null; }
+ @Override public JRTClientConfigRequest nextRequest(long timeout) { return null; }
+ @Override public boolean isError() { return false;}
+ @Override public long getNewGeneration() { return 0; }
+ @Override public boolean responseIsApplyOnRestart() { return false; }
+ @Override public PayloadChecksums getNewChecksums() { return null; }
+ @Override public boolean hasUpdatedConfig() { return false; }
+ @Override public Trace getResponseTrace() { return null; }
+ @Override public DefContent getDefContent() { return null; }
+ @Override public ConfigKey<?> getConfigKey() { return null; }
+ @Override public boolean validateParameters() { return false; }
+ @Override public String getRequestDefMd5() { return null; }
+ @Override public PayloadChecksums getRequestConfigChecksums() { return null; }
+ @Override public long getRequestGeneration() { return 0; }
+ @Override public Request getRequest() { return null; }
+ @Override public String getShortDescription() { return null;}
+ @Override public int errorCode() { return 0; }
+ @Override public String errorMessage() { return null; }
+ @Override public long getTimeout() { return 0; }
+ @Override public long getProtocolVersion() { return 0; }
+ @Override public String getClientHostName() { return null; }
+ @Override public Optional<VespaVersion> getVespaVersion() { return Optional.empty(); }
+ }
+
private final JRTConfigRequester requester;
private final TimingValues timingValues;
@@ -97,7 +129,8 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
private JRTClientConfigRequest pollQueue(long timeoutMillis) {
try {
// Only valid responses are on queue, no need to validate
- return responseQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
+ JRTClientConfigRequest request = responseQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
+ return (request instanceof EOFJRTClientConfigRequest) ? null : request;
} catch (InterruptedException e1) {
throw new ConfigInterruptedException(e1);
}
@@ -135,8 +168,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
private T toConfigInstance(JRTClientConfigRequest jrtRequest) {
Payload payload = jrtRequest.getNewPayload();
ConfigPayload configPayload = ConfigPayload.fromUtf8Array(payload.withCompression(CompressionType.UNCOMPRESSED).getData());
- T configInstance = configPayload.toInstance(configClass, jrtRequest.getConfigKey().getConfigId());
- return configInstance;
+ return configPayload.toInstance(configClass, jrtRequest.getConfigKey().getConfigId());
}
// Called by JRTConfigRequester when there is a config response for this subscription
@@ -158,12 +190,15 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc
}
req = responseQueue.peek();
}
- return req != null;
+ return (req != null) && ! (req instanceof EOFJRTClientConfigRequest);
}
@Override
public void close() {
super.close();
+ if ( ! responseQueue.offer(new EOFJRTClientConfigRequest())) {
+ setException(new ConfigurationRuntimeException("Failed offering EOF to queue during close() " + this));
+ }
responseQueue = new LinkedBlockingQueue<>() {
@SuppressWarnings("NullableProblems")
@Override