diff options
author | Harald Musum <musum@verizonmedia.com> | 2022-12-20 22:44:43 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-20 22:44:43 +0100 |
commit | 499bd6838571b2563456250cb0603404126dd022 (patch) | |
tree | 21e9356652e7f4bc310031c3520f4621f3f48880 /config | |
parent | 30765dc8f24708e7d54b5544dc1947e18d656117 (diff) | |
parent | 01ddb9946e2eb1a6cd0d51338938c0a90ff5ce95 (diff) |
Merge pull request #25306 from vespa-engine/balder/immediate-shutdown
Add an eof object that can be sent to the Q to wake up the ones waiti…
Diffstat (limited to 'config')
-rw-r--r-- | config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java | 45 |
1 files changed, 40 insertions, 5 deletions
diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java index 8580b4584e2..90d8951fbc3 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java @@ -4,20 +4,25 @@ package com.yahoo.config.subscription.impl; import com.yahoo.config.ConfigInstance; import com.yahoo.config.ConfigurationRuntimeException; import com.yahoo.config.subscription.ConfigInterruptedException; +import com.yahoo.jrt.Request; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.ConfigPayload; +import com.yahoo.vespa.config.PayloadChecksums; import com.yahoo.vespa.config.TimingValues; import com.yahoo.vespa.config.protocol.CompressionType; +import com.yahoo.vespa.config.protocol.DefContent; import com.yahoo.vespa.config.protocol.JRTClientConfigRequest; import com.yahoo.vespa.config.protocol.Payload; +import com.yahoo.vespa.config.protocol.Trace; +import com.yahoo.vespa.config.protocol.VespaVersion; import java.time.Duration; import java.time.Instant; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import static com.yahoo.vespa.config.PayloadChecksum.Type.MD5; import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; @@ -29,6 +34,33 @@ import static java.util.logging.Level.INFO; */ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubscription<T> { + private static class EOFJRTClientConfigRequest implements JRTClientConfigRequest { + @Override public boolean validateResponse() { return false; } + @Override public boolean hasUpdatedGeneration() { return false; } + @Override public Payload getNewPayload() { return null; } + @Override public JRTClientConfigRequest nextRequest(long timeout) { return null; } + @Override public boolean isError() { return false;} + @Override public long getNewGeneration() { return 0; } + @Override public boolean responseIsApplyOnRestart() { return false; } + @Override public PayloadChecksums getNewChecksums() { return null; } + @Override public boolean hasUpdatedConfig() { return false; } + @Override public Trace getResponseTrace() { return null; } + @Override public DefContent getDefContent() { return null; } + @Override public ConfigKey<?> getConfigKey() { return null; } + @Override public boolean validateParameters() { return false; } + @Override public String getRequestDefMd5() { return null; } + @Override public PayloadChecksums getRequestConfigChecksums() { return null; } + @Override public long getRequestGeneration() { return 0; } + @Override public Request getRequest() { return null; } + @Override public String getShortDescription() { return null;} + @Override public int errorCode() { return 0; } + @Override public String errorMessage() { return null; } + @Override public long getTimeout() { return 0; } + @Override public long getProtocolVersion() { return 0; } + @Override public String getClientHostName() { return null; } + @Override public Optional<VespaVersion> getVespaVersion() { return Optional.empty(); } + } + private final JRTConfigRequester requester; private final TimingValues timingValues; @@ -97,7 +129,8 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc private JRTClientConfigRequest pollQueue(long timeoutMillis) { try { // Only valid responses are on queue, no need to validate - return responseQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS); + JRTClientConfigRequest request = responseQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS); + return (request instanceof EOFJRTClientConfigRequest) ? null : request; } catch (InterruptedException e1) { throw new ConfigInterruptedException(e1); } @@ -135,8 +168,7 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc private T toConfigInstance(JRTClientConfigRequest jrtRequest) { Payload payload = jrtRequest.getNewPayload(); ConfigPayload configPayload = ConfigPayload.fromUtf8Array(payload.withCompression(CompressionType.UNCOMPRESSED).getData()); - T configInstance = configPayload.toInstance(configClass, jrtRequest.getConfigKey().getConfigId()); - return configInstance; + return configPayload.toInstance(configClass, jrtRequest.getConfigKey().getConfigId()); } // Called by JRTConfigRequester when there is a config response for this subscription @@ -158,12 +190,15 @@ public class JRTConfigSubscription<T extends ConfigInstance> extends ConfigSubsc } req = responseQueue.peek(); } - return req != null; + return (req != null) && ! (req instanceof EOFJRTClientConfigRequest); } @Override public void close() { super.close(); + if ( ! responseQueue.offer(new EOFJRTClientConfigRequest())) { + setException(new ConfigurationRuntimeException("Failed offering EOF to queue during close() " + this)); + } responseQueue = new LinkedBlockingQueue<>() { @SuppressWarnings("NullableProblems") @Override |