diff options
Diffstat (limited to 'container-search')
7 files changed, 24 insertions, 20 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index 119037d16dd..8333080cdf0 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -15,6 +15,7 @@ import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.PingFactory; import com.yahoo.search.dispatch.searchcluster.Pinger; +import com.yahoo.search.dispatch.searchcluster.PongHandler; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import java.util.Optional; @@ -66,7 +67,7 @@ public class RpcInvokerFactory extends InvokerFactory implements PingFactory { } @Override - public Pinger createPinger(Node node, ClusterMonitor<Node> monitor) { - return new RpcPing(node, monitor, rpcResourcePool); + public Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler) { + return new RpcPing(node, monitor, rpcResourcePool, pongHandler); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java index 9751cfce7db..ba3b050149c 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -28,21 +28,22 @@ public class RpcPing implements Pinger, Client.ResponseReceiver { private final RpcResourcePool resourcePool; private final ClusterMonitor<Node> clusterMonitor; private final long pingSequenceId; - private final AtomicReference<PongHandler> handler = new AtomicReference<>(); + private final PongHandler pongHandler; - public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcResourcePool rpcResourcePool) { + public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcResourcePool rpcResourcePool, PongHandler pongHandler) { this.node = node; this.resourcePool = rpcResourcePool; this.clusterMonitor = clusterMonitor; pingSequenceId = node.createPingSequenceId(); + this.pongHandler = pongHandler; } @Override - public void ping(PongHandler handler) { + public void ping() { try { - sendPing(handler); + sendPing(); } catch (RuntimeException e) { - handler.handle(new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + + pongHandler.handle(new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + Exceptions.toMessageString(e)))); } } @@ -61,8 +62,7 @@ public class RpcPing implements Pinger, Client.ResponseReceiver { } } - private void sendPing(PongHandler handler) { - this.handler.set(handler); + private void sendPing() { var connection = resourcePool.getConnection(node.key()); var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray(); double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0; @@ -88,7 +88,7 @@ public class RpcPing implements Pinger, Client.ResponseReceiver { @Override public void receive(ResponseOrError<ProtobufResponse> response) { if (node.isLastReceivedPong(pingSequenceId)) { - handler.get().handle(toPong(response)); + pongHandler.handle(toPong(response)); } else { //TODO Reduce to debug or remove once we have enumerated what happens here. log.info("Pong " + pingSequenceId + " received too late, latest is " + node.getLastReceivedPongId()); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java index fe2937d2450..2e07d8d61e6 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java @@ -6,6 +6,6 @@ import com.yahoo.search.cluster.ClusterMonitor; public interface PingFactory { - Pinger createPinger(Node node, ClusterMonitor<Node> monitor); + Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java index d6cf610cd65..b4a7ccbf98c 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java @@ -8,5 +8,5 @@ package com.yahoo.search.dispatch.searchcluster; * @author baldersheim */ public interface Pinger { - void ping(PongHandler handler); + void ping(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 675db0ee60d..a48d741bdca 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -281,8 +281,8 @@ public class SearchCluster implements NodeManager<Node> { public void ping(Node node, Executor executor) { if (pingFactory == null) return; // not initialized yet - Pinger pinger = pingFactory.createPinger(node, clusterMonitor); - pinger.ping(new PongCallback(node, clusterMonitor)); + Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor)); + pinger.ping(); } private void pingIterationCompletedSingleGroup() { diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java index a369b859b13..a1ae3b6a19d 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java @@ -11,6 +11,7 @@ import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.PingFactory; import com.yahoo.search.dispatch.searchcluster.Pinger; +import com.yahoo.search.dispatch.searchcluster.PongHandler; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import org.junit.Test; @@ -143,7 +144,7 @@ public class DispatcherTest { } @Override - public Pinger createPinger(Node node, ClusterMonitor<Node> monitor) { + public Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler) { fail("Unexpected call to createPinger"); return null; } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java index bc32297d123..a1b5bd6d102 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java @@ -116,15 +116,17 @@ public class SearchClusterTest { private final AtomicInteger numDocs; private final AtomicInteger pingCount; - PingJob(AtomicInteger numDocs, AtomicInteger pingCount) { + private final PongHandler pongHandler; + PingJob(AtomicInteger numDocs, AtomicInteger pingCount, PongHandler pongHandler) { this.numDocs = numDocs; this.pingCount = pingCount; + this.pongHandler = pongHandler; } @Override - public void ping(PongHandler handler) { + public void ping() { int docs = numDocs.get(); pingCount.incrementAndGet(); - handler.handle ((docs < 0) + pongHandler.handle ((docs < 0) ? new Pong(ErrorMessage.createBackendCommunicationError("Negative numDocs = " + docs)) : new Pong(docs)); } @@ -141,9 +143,9 @@ public class SearchClusterTest { } @Override - public Pinger createPinger(Node node, ClusterMonitor<Node> monitor) { + public Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler) { int index = node.group() * numPerGroup + node.key(); - return new PingJob(activeDocs.get(index), pingCounts.get(index)); + return new PingJob(activeDocs.get(index), pingCounts.get(index), pongHandler); } } |