diff options
author | HÃ¥kon Hallingstad <hakon.hallingstad@gmail.com> | 2022-09-21 12:56:33 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-21 12:56:33 +0200 |
commit | dce8374296c9cd7f6811eadb8ab62c440bb2c812 (patch) | |
tree | a259e19f9311d5ef1ae089e0d28bb22eeeda17bd /clustercontroller-core | |
parent | f81800f066f3291bfd6c3ecbca4a58008ddf9137 (diff) | |
parent | 6a1f9324121ae141c7ad1b7db290fcf25b261750 (diff) |
Merge pull request #24151 from vespa-engine/hmusum/cleanup-35
Cluster controller cleanup, part 17 [run-systemtest]
Diffstat (limited to 'clustercontroller-core')
5 files changed, 29 insertions, 48 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 80b79f4eb13..0baf1bba4fa 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -3,7 +3,6 @@ package com.yahoo.vespa.clustercontroller.core; import com.yahoo.document.FixedBucketSpaces; import com.yahoo.exception.ExceptionUtils; -import com.yahoo.jrt.ListenFailedException; import com.yahoo.vdslib.distribution.ConfiguredNode; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.Node; @@ -511,13 +510,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta if (rpcServer != null) { rpcServer.setMasterElectionHandler(masterElectionHandler); - try{ - rpcServer.setSlobrokConnectionSpecs(options.slobrokConnectionSpecs(), options.rpcPort()); - } catch (ListenFailedException e) { - context.log(logger, Level.WARNING, "Failed to bind RPC server to port " + options.rpcPort() + ". This may be natural if cluster has altered the services running on this node: " + e.getMessage()); - } catch (Exception e) { - context.log(logger, Level.WARNING, "Failed to initialize RPC server socket: " + e.getMessage()); - } + rpcServer.setSlobrokConnectionSpecs(options.slobrokConnectionSpecs(), options.rpcPort()); } try { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java index 7d2b540c97e..c645c2005b0 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +import ai.vespa.validation.Validation; import com.yahoo.jrt.slobrok.api.BackOffPolicy; import com.yahoo.vdslib.distribution.ConfiguredNode; import com.yahoo.vdslib.distribution.Distribution; @@ -194,7 +195,7 @@ public class FleetControllerOptions { this.minRatioOfDistributorNodesUp = minRatioOfDistributorNodesUp; this.minRatioOfStorageNodesUp = minRatioOfStorageNodesUp; this.minNodeRatioPerGroup = minNodeRatioPerGroup; - this.cycleWaitTime = cycleWaitTime; + this.cycleWaitTime = Validation.requireAtLeast(cycleWaitTime, "cycleWaitTime must be positive", 1); this.minTimeBeforeFirstSystemStateBroadcast = minTimeBeforeFirstSystemStateBroadcast; this.nodeStateRequestTimeoutMS = nodeStateRequestTimeoutMS; this.nodeStateRequestTimeoutEarliestPercentage = nodeStateRequestTimeoutEarliestPercentage; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RpcServer.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RpcServer.java index 06f6777ab80..dcef432aec0 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RpcServer.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RpcServer.java @@ -4,7 +4,6 @@ package com.yahoo.vespa.clustercontroller.core.rpc; import com.yahoo.jrt.Acceptor; import com.yahoo.jrt.ErrorCode; import com.yahoo.jrt.Int32Value; -import com.yahoo.jrt.ListenFailedException; import com.yahoo.jrt.Method; import com.yahoo.jrt.Request; import com.yahoo.jrt.Spec; @@ -28,7 +27,6 @@ import com.yahoo.vespa.clustercontroller.core.Timer; import com.yahoo.vespa.clustercontroller.core.listeners.NodeListener; import java.io.PrintWriter; import java.io.StringWriter; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -79,7 +77,7 @@ public class RpcServer { return "storage/cluster." + clusterName + "/fleetcontroller/" + fleetControllerIndex; } - public void setSlobrokConnectionSpecs(String[] slobrokConnectionSpecs, int port) throws ListenFailedException, UnknownHostException { + public void setSlobrokConnectionSpecs(String[] slobrokConnectionSpecs, int port) { if (this.slobrokConnectionSpecs == null || !Arrays.equals(this.slobrokConnectionSpecs, slobrokConnectionSpecs) || this.port != port) { @@ -94,24 +92,26 @@ public class RpcServer { return (register != null); } - public void connect() throws ListenFailedException, UnknownHostException { + public void connect() { disconnect(); - log.log(Level.FINE, () -> "Fleetcontroller " + fleetControllerIndex + ": Connecting RPC server."); - if (supervisor != null) disconnect(); supervisor = new Supervisor(new Transport("rpc" + port)).setDropEmptyBuffers(true); addMethods(); - log.log(Level.FINE, () -> "Fleetcontroller " + fleetControllerIndex + ": Attempting to bind to port " + port); - acceptor = supervisor.listen(new Spec(port)); - log.log(Level.FINE, () -> "Fleetcontroller " + fleetControllerIndex + ": RPC server listening to port " + acceptor.port()); - StringBuilder slobroks = new StringBuilder("("); - for (String s : slobrokConnectionSpecs) { - slobroks.append(" ").append(s); + log.log(Level.FINE, () -> "Fleetcontroller " + fleetControllerIndex + ": RPC server attempting to bind to port " + port); + try { + acceptor = supervisor.listen(new Spec(port)); + } catch (Exception e) { + long time = timer.getCurrentTimeInMillis(); + if (!e.getMessage().equals(lastConnectError) || time - lastConnectErrorTime > 60 * 1000) { + lastConnectError = e.getMessage(); + lastConnectErrorTime = time; + log.log(Level.WARNING, "Failed to bind or initialize RPC server socket: " + e.getMessage()); + } } - slobroks.append(" )"); + log.log(Level.FINE, () -> "Fleetcontroller " + fleetControllerIndex + ": RPC server listening to port " + acceptor.port()); SlobrokList slist = new SlobrokList(); slist.setup(slobrokConnectionSpecs); Spec spec = new Spec(HostName.getLocalhost(), acceptor.port()); - log.log(Level.INFO, "Registering " + spec + " with slobrok at " + slobroks); + log.log(Level.INFO, "Registering " + spec + " with slobrok at " + String.join(" ", slobrokConnectionSpecs)); if (slobrokBackOffPolicy != null) { register = new Register(supervisor, slist, spec, slobrokBackOffPolicy); } else { @@ -136,7 +136,6 @@ public class RpcServer { } } - public void addMethods() { Method m = new Method("getMaster", "", "is", this::queueRpcRequest); m.methodDesc("Get index of current fleetcontroller master"); @@ -184,25 +183,10 @@ public class RpcServer { } public boolean handleRpcRequests(ContentCluster cluster, ClusterState systemState, NodeListener changeListener) { + if (!isConnected()) + connect(); + boolean handledAnyRequests = false; - if (!isConnected()) { - long time = timer.getCurrentTimeInMillis(); - try{ - connect(); - } catch (ListenFailedException e) { - if (!e.getMessage().equals(lastConnectError) || time - lastConnectErrorTime > 60 * 1000) { - lastConnectError = e.getMessage(); - lastConnectErrorTime = time; - log.log(Level.WARNING, "Failed to bind RPC server to port " + port +": " + e.getMessage()); - } - } catch (Exception e) { - if (!e.getMessage().equals(lastConnectError) || time - lastConnectErrorTime > 60 * 1000) { - lastConnectError = e.getMessage(); - lastConnectErrorTime = time; - log.log(Level.WARNING, "Failed to initialize RPC server socket: " + e.getMessage()); - } - } - } for (int j=0; j<10; ++j) { // Max perform 10 RPC requests per cycle. Request req; synchronized(monitor) { diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java index d131c330381..32c4ee99c3c 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/StateWaiter.java @@ -62,7 +62,7 @@ public class StateWaiter implements SystemStateListener { } /** - * WARNING: If timeIntervalToProvokeRetry is set != 0 that means time will can be set far into future + * WARNING: If timeIntervalToProvokeRetry is set != 0 that means time will be set far into the future * and thus hit various unintended timeout periods. Only auto-step time if this is a non-issue. */ public void waitForState(String stateRegex, long timeout, long timeIntervalToProvokeRetry) { @@ -82,9 +82,10 @@ public class StateWaiter implements SystemStateListener { return; } } - try{ + try { if (timeIntervalToProvokeRetry == 0) { - timer.wait(endTime - startTime); + var waitTime = Math.max(1, endTime - startTime); + timer.wait(waitTime); } else { if (++iteration % 10 == 0) { timer.advanceTime(timeIntervalToProvokeRetry); @@ -92,10 +93,10 @@ public class StateWaiter implements SystemStateListener { timer.wait(10); } } catch (InterruptedException e) { + throw new RuntimeException(e); } } - startTime = System.currentTimeMillis(); - if (startTime >= endTime) { + if (System.currentTimeMillis() >= endTime) { throw new IllegalStateException("Timeout. Did not find a state matching " + stateRegex + " within timeout of " + timeout + " milliseconds. Current state is " + currentClusterState); } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java index 5bff101906d..4f79500e84d 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/Waiter.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core.testutils; +import ai.vespa.validation.Validation; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.Node; import com.yahoo.vespa.clustercontroller.core.DummyVdsNode; @@ -90,6 +91,7 @@ public interface Waiter { public final void wait(WaitCondition c, WaitTask wt, Duration timeout) { Objects.requireNonNull(wt, "wait task cannot be null"); + Validation.requireAtLeast(timeout.toMillis(), "timeout must be positive", 1L); log.log(Level.INFO, "Waiting for " + c + " with wait task " + wt); Instant endTime = Instant.now().plus(timeout); @@ -112,7 +114,7 @@ public interface Waiter { allowWait = false; } Duration timeLeft = Duration.between(Instant.now(), endTime); - if (timeLeft.isNegative()) + if (timeLeft.isNegative() || timeLeft.isZero()) throw new IllegalStateException("Timed out waiting max " + timeout + " ms for " + c + "\n with wait task " + wt + ",\n reason: " + reason); if (allowWait) data.getMonitor().wait(Math.min(wt.getWaitTaskFrequencyInMillis(), timeLeft.toMillis())); |