summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2023-07-11 12:07:55 +0200
committerjonmv <venstad@gmail.com>2023-07-13 10:30:49 +0200
commit9a81c2221bfd201e4614e5146aa58251db30c914 (patch)
tree05989e0a352d269e08f2bada9a4a3aacbf1c0368 /container-search
parentbb006b94caafd982c633a2830f1eb9a63d24fc0c (diff)
Add skeleton for delaying search invoker RPC resource shutdown
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java7
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java79
2 files changed, 74 insertions, 12 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
index 77496114df1..558b734be51 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
@@ -21,8 +21,11 @@ public abstract class CloseableInvoker implements Closeable {
private RequestDuration duration;
public void teardown(BiConsumer<Boolean, RequestDuration> teardown) {
- this.teardown = teardown;
- this.duration = new RequestDuration();
+ this.teardown = this.teardown == null ? teardown : (success, duration) -> {
+ this.teardown.accept(success, duration);
+ teardown.accept(success, duration);
+ };
+ this.duration = this.duration == null ? new RequestDuration() : this.duration;
}
protected void setFinalStatus(boolean success) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 41f02b1b580..2e8fbe34781 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -32,6 +32,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* A dispatcher communicates with search nodes to perform queries and fill hits.
@@ -61,12 +63,34 @@ public class Dispatcher extends AbstractComponent {
private volatile VolatileItems volatileItems;
private static class VolatileItems {
+
final LoadBalancer loadBalancer;
final InvokerFactory invokerFactory;
+ final AtomicInteger inflight = new AtomicInteger(1); // Initial reference.
+ Runnable cleanup = () -> { };
+
VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) {
this.loadBalancer = loadBalancer;
this.invokerFactory = invokerFactory;
}
+
+ private void countDown() {
+ if (inflight.decrementAndGet() == 0) cleanup.run();
+ }
+
+ private class Ref implements AutoCloseable {
+ boolean handedOff = false;
+ { inflight.incrementAndGet(); }
+ VolatileItems get() { return VolatileItems.this; }
+ /** Hands off the reference to the given invoker, which will decrement the counter when closed. */
+ <T extends CloseableInvoker> T register(T invoker) {
+ invoker.teardown((__, ___) -> countDown());
+ handedOff = true;
+ return invoker;
+ }
+ @Override public void close() { if ( ! handedOff) countDown(); }
+ }
+
}
private static final QueryProfileType argumentType;
@@ -103,6 +127,38 @@ public class Dispatcher extends AbstractComponent {
this.volatileItems = update(invokerFactory);
}
+ /** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */
+ private VolatileItems.Ref volatileItems() {
+ return volatileItems.new Ref();
+ }
+
+ /**
+ * This is called whenever we have new config for backend nodes.
+ * Normally, we'd want to handle partial failure of the component graph, by reinstating the old state;
+ * however, in this case, such a failure would be local to this container, and we instead want to keep
+ * the newest config, as that is what most accurately represents the actual backend.
+ *
+ * The flow of reconfiguration is:
+ * 1. The volatile snapshot of disposable items is replaced with a new one that only references updated nodes.
+ * 2. Dependencies of the items in 1., which must be configured, are updated, yielding a list of resources to close.
+ * 3. When inflight operations against the old snapshot are done, all obsolete resources are cleaned up.
+ */
+ void updateWithNewConfig(DispatchNodesConfig nodesConfig) {
+ try (var items = volatileItems()) { // Replace the volatile items snapshot, marking a reference to the old one.
+ items.get().countDown(); // Decrement for the initial creation reference.
+ this.volatileItems = update(null);
+
+ // Set up the cleanup that we need to do.
+ Collection<? extends AutoCloseable> connectionPoolsToClose = rpcResourcePool.updateNodes(nodesConfig);
+ items.get().cleanup = () -> {
+ for (AutoCloseable pool : connectionPoolsToClose) {
+ try { pool.close(); } catch (Exception ignored) { }
+ }
+ };
+
+ } // Close the old snapshot, which may trigger the cleanup right now, or when the last invoker is closed.
+ }
+
private VolatileItems update(InvokerFactory invokerFactory) {
var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
(invokerFactory == null)
@@ -165,20 +221,23 @@ public class Dispatcher extends AbstractComponent {
}
public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) {
- return volatileItems.invokerFactory.createFillInvoker(searcher, result);
+ try (var items = volatileItems()) { // Take a snapshot, and release it when we're done.
+ return items.register(items.get().invokerFactory.createFillInvoker(searcher, result));
+ }
}
public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) {
- VolatileItems items = volatileItems; // Take a snapshot
- int maxHitsPerNode = dispatchConfig.maxHitsPerNode();
- SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.invokerFactory, maxHitsPerNode)
- .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.loadBalancer, items.invokerFactory, maxHitsPerNode));
-
- if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
- query.setHits(0);
- query.setOffset(0);
+ try (var items = volatileItems()) { // Take a snapshot, and release it when we're done.
+ int maxHitsPerNode = dispatchConfig.maxHitsPerNode();
+ SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.get().invokerFactory, maxHitsPerNode)
+ .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.get().loadBalancer, items.get().invokerFactory, maxHitsPerNode));
+
+ if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
+ query.setHits(0);
+ query.setOffset(0);
+ }
+ return items.register(invoker);
}
- return invoker;
}
/** Builds an invoker based on searchpath */