diff options
author | jonmv <venstad@gmail.com> | 2023-07-11 12:07:55 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-07-13 10:30:49 +0200 |
commit | 9a81c2221bfd201e4614e5146aa58251db30c914 (patch) | |
tree | 05989e0a352d269e08f2bada9a4a3aacbf1c0368 /container-search | |
parent | bb006b94caafd982c633a2830f1eb9a63d24fc0c (diff) |
Add skeleton for delaying search invoker RPC resource shutdown
Diffstat (limited to 'container-search')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java | 7 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java | 79 |
2 files changed, 74 insertions, 12 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java index 77496114df1..558b734be51 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java @@ -21,8 +21,11 @@ public abstract class CloseableInvoker implements Closeable { private RequestDuration duration; public void teardown(BiConsumer<Boolean, RequestDuration> teardown) { - this.teardown = teardown; - this.duration = new RequestDuration(); + this.teardown = this.teardown == null ? teardown : (success, duration) -> { + this.teardown.accept(success, duration); + teardown.accept(success, duration); + }; + this.duration = this.duration == null ? new RequestDuration() : this.duration; } protected void setFinalStatus(boolean success) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 41f02b1b580..2e8fbe34781 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -32,6 +32,8 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * A dispatcher communicates with search nodes to perform queries and fill hits. @@ -61,12 +63,34 @@ public class Dispatcher extends AbstractComponent { private volatile VolatileItems volatileItems; private static class VolatileItems { + final LoadBalancer loadBalancer; final InvokerFactory invokerFactory; + final AtomicInteger inflight = new AtomicInteger(1); // Initial reference. + Runnable cleanup = () -> { }; + VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) { this.loadBalancer = loadBalancer; this.invokerFactory = invokerFactory; } + + private void countDown() { + if (inflight.decrementAndGet() == 0) cleanup.run(); + } + + private class Ref implements AutoCloseable { + boolean handedOff = false; + { inflight.incrementAndGet(); } + VolatileItems get() { return VolatileItems.this; } + /** Hands off the reference to the given invoker, which will decrement the counter when closed. */ + <T extends CloseableInvoker> T register(T invoker) { + invoker.teardown((__, ___) -> countDown()); + handedOff = true; + return invoker; + } + @Override public void close() { if ( ! handedOff) countDown(); } + } + } private static final QueryProfileType argumentType; @@ -103,6 +127,38 @@ public class Dispatcher extends AbstractComponent { this.volatileItems = update(invokerFactory); } + /** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */ + private VolatileItems.Ref volatileItems() { + return volatileItems.new Ref(); + } + + /** + * This is called whenever we have new config for backend nodes. + * Normally, we'd want to handle partial failure of the component graph, by reinstating the old state; + * however, in this case, such a failure would be local to this container, and we instead want to keep + * the newest config, as that is what most accurately represents the actual backend. + * + * The flow of reconfiguration is: + * 1. The volatile snapshot of disposable items is replaced with a new one that only references updated nodes. + * 2. Dependencies of the items in 1., which must be configured, are updated, yielding a list of resources to close. + * 3. When inflight operations against the old snapshot are done, all obsolete resources are cleaned up. + */ + void updateWithNewConfig(DispatchNodesConfig nodesConfig) { + try (var items = volatileItems()) { // Replace the volatile items snapshot, marking a reference to the old one. + items.get().countDown(); // Decrement for the initial creation reference. + this.volatileItems = update(null); + + // Set up the cleanup that we need to do. + Collection<? extends AutoCloseable> connectionPoolsToClose = rpcResourcePool.updateNodes(nodesConfig); + items.get().cleanup = () -> { + for (AutoCloseable pool : connectionPoolsToClose) { + try { pool.close(); } catch (Exception ignored) { } + } + }; + + } // Close the old snapshot, which may trigger the cleanup right now, or when the last invoker is closed. + } + private VolatileItems update(InvokerFactory invokerFactory) { var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), (invokerFactory == null) @@ -165,20 +221,23 @@ public class Dispatcher extends AbstractComponent { } public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) { - return volatileItems.invokerFactory.createFillInvoker(searcher, result); + try (var items = volatileItems()) { // Take a snapshot, and release it when we're done. + return items.register(items.get().invokerFactory.createFillInvoker(searcher, result)); + } } public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) { - VolatileItems items = volatileItems; // Take a snapshot - int maxHitsPerNode = dispatchConfig.maxHitsPerNode(); - SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.invokerFactory, maxHitsPerNode) - .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.loadBalancer, items.invokerFactory, maxHitsPerNode)); - - if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { - query.setHits(0); - query.setOffset(0); + try (var items = volatileItems()) { // Take a snapshot, and release it when we're done. + int maxHitsPerNode = dispatchConfig.maxHitsPerNode(); + SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.get().invokerFactory, maxHitsPerNode) + .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.get().loadBalancer, items.get().invokerFactory, maxHitsPerNode)); + + if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { + query.setHits(0); + query.setOffset(0); + } + return items.register(invoker); } - return invoker; } /** Builds an invoker based on searchpath */ |