diff options
Diffstat (limited to 'jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java')
-rw-r--r-- | jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java | 90 |
1 files changed, 86 insertions, 4 deletions
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java b/jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java
index c010eac02b5..f28d5ea2b26 100644
--- a/jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java
+++ b/jdisc_core/src/main/java/com/yahoo/jdisc/core/ContainerWatchdog.java
@@ -1,18 +1,29 @@
 // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
 package com.yahoo.jdisc.core;
 
+import com.yahoo.concurrent.Threads;
 import com.yahoo.jdisc.Metric;
 import com.yahoo.jdisc.statistics.ContainerWatchdogMetrics;
+import com.yahoo.lang.MutableInteger;
+import org.apache.felix.framework.BundleWiringImpl;
+import org.osgi.framework.Bundle;
 
+import java.lang.reflect.Field;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
@@ -24,7 +35,7 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
 
     static final Duration GRACE_PERIOD = Duration.ofMinutes(5);
     static final Duration UPDATE_PERIOD = Duration.ofMinutes(5);
-    static final Duration CHECK_PERIOD = Duration.ofSeconds(1);
+    static final Duration CONTAINER_CHECK_PERIOD = Duration.ofSeconds(1);
 
     private static final Logger log = Logger.getLogger(ContainerWatchdog.class.getName());
 
@@ -37,6 +48,9 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
     private Instant currentContainerActivationTime;
     private int numStaleContainers;
     private Instant lastLogTime;
+    private ScheduledFuture<?> threadMonitoringTask;
+    private ScheduledFuture<?> containerMonitoringTask;
+
 
     ContainerWatchdog() {
         this(new ScheduledThreadPoolExecutor(
@@ -53,7 +67,11 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
         this.scheduler = scheduler;
         this.clock = clock;
         this.lastLogTime = clock.instant();
-        scheduler.scheduleAtFixedRate(this::monitorDeactivatedContainers, CHECK_PERIOD.getSeconds(), CHECK_PERIOD.getSeconds(), TimeUnit.SECONDS);
+    }
+
+    void start() {
+        this.containerMonitoringTask = scheduler.scheduleAtFixedRate(this::monitorDeactivatedContainers,
+                CONTAINER_CHECK_PERIOD.getSeconds(), CONTAINER_CHECK_PERIOD.getSeconds(), TimeUnit.SECONDS);
     }
 
     @Override
@@ -67,8 +85,8 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
 
     @Override
     public void close() throws InterruptedException {
-        scheduler.shutdownNow();
-        scheduler.awaitTermination(1, TimeUnit.MINUTES);
+        if (containerMonitoringTask != null) containerMonitoringTask.cancel(false);
+        if (threadMonitoringTask != null) threadMonitoringTask.cancel(false);
         synchronized (monitor) {
             deactivatedContainers.clear();
             currentContainer = null;
@@ -83,6 +101,8 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
             }
             currentContainer = nextContainer;
             currentContainerActivationTime = clock.instant();
+            if (threadMonitoringTask != null) threadMonitoringTask.cancel(false);
+            threadMonitoringTask = scheduler.schedule(this::monitorThreads, 1, TimeUnit.MINUTES);
         }
     }
 
@@ -126,6 +146,68 @@ class ContainerWatchdog implements ContainerWatchdogMetrics, AutoCloseable {
         return clock.instant().isAfter(container.timeDeactivated.plus(GRACE_PERIOD));
     }
 
+    private void monitorThreads() {
+        Collection<Thread> threads = Threads.getAllThreads();
+        warnOnThreadsHavingClassloaderFromUninstalledBundles(threads);
+    }
+
+    /**
+     * Detect and warn on threads leaked by defunct application components.
+     * These are threads launched by components of application bundles and not stopped after subsequent component deconstruction.
+     * The below algorithm is a heuristic and will not detect all leaked threads.
+     */
+    private void warnOnThreadsHavingClassloaderFromUninstalledBundles(Collection<Thread> threads) {
+        record ThreadDetails(Thread thread, Bundle bundle) {}
+        List<ThreadDetails> staleThreads = new ArrayList<>();
+        for (Thread t : threads) {
+            // Find threads which are sub-classes of java.lang.Thread from an uninstalled bundle
+            Bundle b = hasClassloaderForUninstalledBundle(t).orElse(null);
+            if (b != null) {
+                staleThreads.add(new ThreadDetails(t, b));
+                continue;
+            }
+            // Find threads whose Runnable is a class from an uninstalled bundle
+            // This may create false positives
+            b = getTargetFieldOfThread(t).flatMap(ContainerWatchdog::hasClassloaderForUninstalledBundle).orElse(null);
+            if (b != null) {
+                staleThreads.add(new ThreadDetails(t, b));
+            }
+            // Note: no reliable mechanism for detecting threads owned by leaked executors (e.g. ScheduledThreadPoolExecutor)
+        }
+        if (!staleThreads.isEmpty()) {
+            StringBuilder msg = new StringBuilder(
+                    ("Found %d stale threads that should have been stopped during previous reconfiguration(s). " +
+                            "These threads have a classloader for a bundle that has been uninstalled: \n")
+                            .formatted(staleThreads.size()));
+            MutableInteger i = new MutableInteger(1);
+            Comparator<ThreadDetails> outputOrdering =
+                    Comparator.<ThreadDetails, Long>comparing(td -> td.bundle().getBundleId())
+                            .thenComparing(td -> td.thread().getName()).thenComparing(td -> td.thread().getId());
+            staleThreads.stream().sorted(outputOrdering).forEach(t ->
+                    msg.append("%d) Thread '%s' using bundle '%s'. \n"
+                            .formatted(i.next(), t.thread().getName(), t.bundle().toString())));
+            log.log(Level.INFO, msg::toString); // Level 'info' until deemed reliable enough as 'warning'
+        }
+    }
+
+    private static Optional<Runnable> getTargetFieldOfThread(Thread t) {
+        try {
+            Field f = Thread.class.getDeclaredField("target");
+            f.setAccessible(true);
+            return Optional.ofNullable((Runnable)f.get(t));
+        } catch (ReflectiveOperationException | RuntimeException e) { // setAccessible may throw unchecked InaccessibleObjectException/SecurityException
+            return Optional.empty();
+        }
+    }
+
+    private static Optional<Bundle> hasClassloaderForUninstalledBundle(Object o) {
+        if (o.getClass().getClassLoader() instanceof BundleWiringImpl.BundleClassLoader cl) {
+            Bundle b = cl.getBundle();
+            if (b.getState() == Bundle.UNINSTALLED) return Optional.of(b);
+        }
+        return Optional.empty();
+    }
+
     private static class DeactivatedContainer {
         final ActiveContainer instance;
         final Instant timeActivated;