From be42ed75fd30e172dd634acdce37985b1983b5a4 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 23 Dec 2022 17:21:28 +0100 Subject: Revert "Revert "Add an eof object that can be sent to the Q to wake up the ones waiti…"" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../subscription/impl/JRTConfigSubscription.java | 45 +++++++++++++++++++--- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java index 8580b4584e2..90d8951fbc3 100644 --- a/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java +++ b/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigSubscription.java @@ -4,20 +4,25 @@ package com.yahoo.config.subscription.impl; import com.yahoo.config.ConfigInstance; import com.yahoo.config.ConfigurationRuntimeException; import com.yahoo.config.subscription.ConfigInterruptedException; +import com.yahoo.jrt.Request; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.ConfigPayload; +import com.yahoo.vespa.config.PayloadChecksums; import com.yahoo.vespa.config.TimingValues; import com.yahoo.vespa.config.protocol.CompressionType; +import com.yahoo.vespa.config.protocol.DefContent; import com.yahoo.vespa.config.protocol.JRTClientConfigRequest; import com.yahoo.vespa.config.protocol.Payload; +import com.yahoo.vespa.config.protocol.Trace; +import com.yahoo.vespa.config.protocol.VespaVersion; import java.time.Duration; import java.time.Instant; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import static com.yahoo.vespa.config.PayloadChecksum.Type.MD5; import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; @@ -29,6 +34,33 @@ import static java.util.logging.Level.INFO; */ public class JRTConfigSubscription extends ConfigSubscription { + private static class EOFJRTClientConfigRequest implements JRTClientConfigRequest { + @Override public boolean validateResponse() { return false; } + @Override public boolean hasUpdatedGeneration() { return false; } + @Override public Payload getNewPayload() { return null; } + @Override public JRTClientConfigRequest nextRequest(long timeout) { return null; } + @Override public boolean isError() { return false;} + @Override public long getNewGeneration() { return 0; } + @Override public boolean responseIsApplyOnRestart() { return false; } + @Override public PayloadChecksums getNewChecksums() { return null; } + @Override public boolean hasUpdatedConfig() { return false; } + @Override public Trace getResponseTrace() { return null; } + @Override public DefContent getDefContent() { return null; } + @Override public ConfigKey getConfigKey() { return null; } + @Override public boolean validateParameters() { return false; } + @Override public String getRequestDefMd5() { return null; } + @Override public PayloadChecksums getRequestConfigChecksums() { return null; } + @Override public long getRequestGeneration() { return 0; } + @Override public Request getRequest() { return null; } + @Override public String getShortDescription() { return null;} + @Override public int errorCode() { return 0; } + @Override public String errorMessage() { return null; } + @Override public long getTimeout() { return 0; } + @Override public long getProtocolVersion() { return 0; } + @Override public String getClientHostName() { return null; } + @Override public Optional getVespaVersion() { return Optional.empty(); } + } + private final JRTConfigRequester requester; private final TimingValues timingValues; @@ -97,7 +129,8 @@ public class JRTConfigSubscription extends ConfigSubsc private JRTClientConfigRequest pollQueue(long timeoutMillis) { try { // Only valid responses are on queue, no need to validate - return responseQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS); + JRTClientConfigRequest request = responseQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS); + return (request instanceof EOFJRTClientConfigRequest) ? null : request; } catch (InterruptedException e1) { throw new ConfigInterruptedException(e1); } @@ -135,8 +168,7 @@ public class JRTConfigSubscription extends ConfigSubsc private T toConfigInstance(JRTClientConfigRequest jrtRequest) { Payload payload = jrtRequest.getNewPayload(); ConfigPayload configPayload = ConfigPayload.fromUtf8Array(payload.withCompression(CompressionType.UNCOMPRESSED).getData()); - T configInstance = configPayload.toInstance(configClass, jrtRequest.getConfigKey().getConfigId()); - return configInstance; + return configPayload.toInstance(configClass, jrtRequest.getConfigKey().getConfigId()); } // Called by JRTConfigRequester when there is a config response for this subscription @@ -158,12 +190,15 @@ public class JRTConfigSubscription extends ConfigSubsc } req = responseQueue.peek(); } - return req != null; + return (req != null) && ! (req instanceof EOFJRTClientConfigRequest); } @Override public void close() { super.close(); + if ( ! responseQueue.offer(new EOFJRTClientConfigRequest())) { + setException(new ConfigurationRuntimeException("Failed offering EOF to queue during close() " + this)); + } responseQueue = new LinkedBlockingQueue<>() { @SuppressWarnings("NullableProblems") @Override -- cgit v1.2.3