summaryrefslogtreecommitdiffstats
path: root/metrics-proxy
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-11-01 16:21:19 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-11-01 16:21:19 +0100
commit1510b566b8c65a235e33b190021f12367727cb1a (patch)
tree2774e175d58b71df5a14c0e559e86de1efd64b52 /metrics-proxy
parent3231cdcaeb3491ac20616bb19e3dc5dcd8ec2ce7 (diff)
- Handle CancellationException.
- Check stopped on every iteration. - Resort to Thread.interrupt() if nothing else helps, and log a warning.
Diffstat (limited to 'metrics-proxy')
-rw-r--r--metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java22
1 files changed, 15 insertions, 7 deletions
diff --git a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
index 2bdad7d1f0b..bc550ccb1d1 100644
--- a/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
+++ b/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
@@ -25,7 +25,9 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -54,7 +56,7 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
private final Thread pollThread;
private final Set<ConsumerId> consumerSet;
private long pollCount = 0;
- private boolean stopped;
+ private final AtomicBoolean stopped;
private final AtomicReference<Duration> taskTimeout;
@@ -62,7 +64,7 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
public ApplicationMetricsRetriever(MetricsNodesConfig nodesConfig) {
clients = createNodeClients(nodesConfig);
taskTimeout = new AtomicReference<>(timeout(clients.size()));
- stopped = false;
+ stopped = new AtomicBoolean(false);
consumerSet = new HashSet<>();
httpClient.start();
pollThread = new Thread(this, "metrics-poller");
@@ -90,7 +92,7 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
pollCount++;
pollThread.notifyAll();
pollThread.wait(timeUntilNextPoll.toMillis());
- if (stopped) return;
+ if (stopped.get()) return;
}
} catch (InterruptedException e) {
} catch (Exception e) {
@@ -108,11 +110,16 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
log.warning("Failed closing httpclient: " + e);
}
synchronized (pollThread) {
- stopped = true;
+ stopped.set(true);
pollThread.notifyAll();
}
try {
- pollThread.join();
+ pollThread.join(Duration.ofSeconds(3).toMillis());
+ if (pollThread.isAlive()) {
+ log.log(Level.WARNING, "metrics poller thread still not stopped, using Thread.interrupt():");
+ pollThread.interrupt();
+ pollThread.join();
+ }
} catch (InterruptedException e) {}
super.deconstruct();
}
@@ -160,12 +167,13 @@ public class ApplicationMetricsRetriever extends AbstractComponent implements Ru
int numOk = 0;
int numTried = futures.size();
for (Map.Entry<Node, Future<?>> entry : futures.entrySet()) {
+ if (stopped.get()) break;
try {
entry.getValue().get(taskTimeout.get().toMillis(), TimeUnit.MILLISECONDS);
numOk++;
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ } catch (InterruptedException | ExecutionException | TimeoutException | CancellationException e) {
Throwable cause = e.getCause();
- if (stopped || e instanceof ExecutionException && ((cause instanceof SocketException) || cause instanceof ConnectTimeoutException)) {
+ if (stopped.get() || e instanceof ExecutionException && ((cause instanceof SocketException) || cause instanceof ConnectTimeoutException)) {
log.log(Level.FINE, "Failed retrieving metrics for '" + entry.getKey() + "' : " + cause.getMessage());
} else {
log.log(Level.WARNING, "Failed retrieving metrics for '" + entry.getKey() + "' : ", e);