aboutsummaryrefslogtreecommitdiffstats
path: root/jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java
diff options
context:
space:
mode:
Diffstat (limited to 'jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java')
-rw-r--r--jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java90
1 files changed, 86 insertions, 4 deletions
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java b/jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java
index c010eac02b5..f28d5ea2b26 100644
--- a/jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java
+++ b/jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java
@@ -1,18 +1,29 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jdisc.core;
+import com.yahoo.concurrent.Threads;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.statistics.ContainerWatchdogMetrics;
+import com.yahoo.lang.MutableInteger;
+import org.apache.felix.framework.BundleWiringImpl;
+import org.osgi.framework.Bundle;
+import java.lang.reflect.Field;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -24,7 +35,7 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
static final Duration GRACE_PERIOD = Duration.ofMinutes(5);
static final Duration UPDATE_PERIOD = Duration.ofMinutes(5);
- static final Duration CHECK_PERIOD = Duration.ofSeconds(1);
+ static final Duration CONTAINER_CHECK_PERIOD = Duration.ofSeconds(1);
private static final Logger log = Logger.getLogger(ContainerWatchdog.class.getName());
@@ -37,6 +48,9 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
private Instant currentContainerActivationTime;
private int numStaleContainers;
private Instant lastLogTime;
+ private ScheduledFuture<?> threadMonitoringTask;
+ private ScheduledFuture<?> containerMontoringTask;
+
ContainerWatchdog() {
this(new ScheduledThreadPoolExecutor(
@@ -53,7 +67,11 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
this.scheduler = scheduler;
this.clock = clock;
this.lastLogTime = clock.instant();
- scheduler.scheduleAtFixedRate(this::monitorDeactivatedContainers, CHECK_PERIOD.getSeconds(), CHECK_PERIOD.getSeconds(), TimeUnit.SECONDS);
+ }
+
+ void start() {
+ this.containerMontoringTask = scheduler.scheduleAtFixedRate(this::monitorDeactivatedContainers,
+ CONTAINER_CHECK_PERIOD.getSeconds(), CONTAINER_CHECK_PERIOD.getSeconds(), TimeUnit.SECONDS);
}
@Override
@@ -67,8 +85,8 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
@Override
public void close() throws InterruptedException {
- scheduler.shutdownNow();
- scheduler.awaitTermination(1, TimeUnit.MINUTES);
+ if (containerMontoringTask != null) containerMontoringTask.cancel(false);
+ if (threadMonitoringTask != null) threadMonitoringTask.cancel(false);
synchronized (monitor) {
deactivatedContainers.clear();
currentContainer = null;
@@ -83,6 +101,8 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
}
currentContainer = nextContainer;
currentContainerActivationTime = clock.instant();
+ if (threadMonitoringTask != null) threadMonitoringTask.cancel(false);
+ threadMonitoringTask = scheduler.schedule(this::monitorThreads, 1, TimeUnit.MINUTES);
}
}
@@ -126,6 +146,68 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
return clock.instant().isAfter(container.timeDeactivated.plus(GRACE_PERIOD));
}
+ private void monitorThreads() {
+ Collection<Thread> threads = Threads.getAllThreads();
+ warnOnThreadsHavingClassloaderFromUninstalledBundles(threads);
+ }
+
+ /**
+ * Detect and warn on threads leaked by defunct application components.
+ * These are threads launched by components of application bundles and not stopped after subsequent component deconstruction.
+ * The below algorithm is a heuristic and will not detect all leaked threads.
+ */
+ private void warnOnThreadsHavingClassloaderFromUninstalledBundles(Collection<Thread> threads) {
+ record ThreadDetails(Thread thread, Bundle bundle) {}
+ List<ThreadDetails> staleThreads = new ArrayList<>();
+ for (Thread t : threads) {
+ // Find threads which are sub-classes of java.lang.Thread from an uninstalled bundle
+ Bundle b = hasClassloaderForUninstalledBundle(t).orElse(null);
+ if (b != null) {
+ staleThreads.add(new ThreadDetails(t, b));
+ continue;
+ }
+ // Find threads which Runnable is a class from an uninstalled bundle
+ // This may create false positives
+ b = getTargetFieldOfThread(t).flatMap(ContainerWatchdog::hasClassloaderForUninstalledBundle).orElse(null);
+ if (b != null) {
+ staleThreads.add(new ThreadDetails(t, b));
+ }
+ // Note: no reliable mechanism for detecting threads owned by leaked executors (e.g. ScheduledThreadPoolExecutor)
+ }
+ if (!staleThreads.isEmpty()) {
+ StringBuilder msg = new StringBuilder(
+ ("Found %d stale threads that should have been stopped during previous reconfiguration(s). " +
+ "These threads have a classloader for a bundle that has been uninstalled: \n")
+ .formatted(staleThreads.size()));
+ MutableInteger i = new MutableInteger(1);
+ Comparator<ThreadDetails> outputOrdering =
+ Comparator.<ThreadDetails, Long>comparing(td -> td.bundle().getBundleId())
+ .thenComparing(td -> td.thread().getName()).thenComparing(td -> td.thread().getId());
+ staleThreads.stream().sorted(outputOrdering).forEach(t ->
+ msg.append("%d) Thread '%s' using bundle '%s'. \n"
+ .formatted(i.next(), t.thread().getName(), t.bundle().toString())));
+ log.log(Level.INFO, msg::toString); // Level 'info' until deemed reliable enough as 'warning'
+ }
+ }
+
+ private static Optional<Runnable> getTargetFieldOfThread(Thread t) {
+ try {
+ Field f = Thread.class.getDeclaredField("target");
+ f.setAccessible(true);
+ return Optional.ofNullable((Runnable)f.get(t));
+ } catch (ReflectiveOperationException e) {
+ return Optional.empty();
+ }
+ }
+
+ private static Optional<Bundle> hasClassloaderForUninstalledBundle(Object o) {
+ if (o.getClass().getClassLoader() instanceof BundleWiringImpl.BundleClassLoader cl) {
+ Bundle b = cl.getBundle();
+ if (b.getState() == Bundle.UNINSTALLED) return Optional.of(b);
+ }
+ return Optional.empty();
+ }
+
private static class DeactivatedContainer {
final ActiveContainer instance;
final Instant timeActivated;