diff options
67 files changed, 753 insertions, 541 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 20b62576ff5..10ba907b5a2 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -50,8 +50,9 @@ import java.util.stream.Stream; public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAddedOrRemovedListener, SystemStateListener, Runnable, RemoteClusterControllerTaskScheduler { - private static final Logger log = Logger.getLogger(FleetController.class.getName()); + private static final Logger logger = Logger.getLogger(FleetController.class.getName()); + private final FleetControllerContext context; private final Timer timer; private final Object monitor; private final EventLog eventLog; @@ -70,7 +71,6 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private final AtomicBoolean running = new AtomicBoolean(true); private FleetControllerOptions options; private FleetControllerOptions nextOptions; - private final int configuredIndex; private final List<SystemStateListener> systemStateListeners = new CopyOnWriteArrayList<>(); private boolean processingCycle = false; private boolean wantedStateChanged = false; @@ -112,7 +112,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public ContentCluster getCluster() { return cluster; } }; - public FleetController(Timer timer, + public FleetController(FleetControllerContext context, + Timer timer, EventLog eventLog, ContentCluster cluster, NodeStateGatherer nodeStateGatherer, @@ -126,8 +127,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd MasterElectionHandler masterElectionHandler, MetricUpdater metricUpdater, FleetControllerOptions options) { - log.info("Starting up cluster controller " + options.fleetControllerIndex + " for cluster " + cluster.getName()); - this.configuredIndex = options.fleetControllerIndex; + context.log(logger, Level.INFO, "Created"); + this.context = context; this.timer = timer; this.monitor = timer; this.eventLog = eventLog; @@ -169,15 +170,16 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public static FleetController create(FleetControllerOptions options, StatusPageServerInterface statusPageServer, MetricReporter metricReporter) throws Exception { - Timer timer = new RealTimer(); - MetricUpdater metricUpdater = new MetricUpdater(metricReporter, options.fleetControllerIndex, options.clusterName); - EventLog log = new EventLog(timer, metricUpdater); - ContentCluster cluster = new ContentCluster( + var context = new FleetControllerContextImpl(options); + var timer = new RealTimer(); + var metricUpdater = new MetricUpdater(metricReporter, options.fleetControllerIndex, options.clusterName); + var log = new EventLog(timer, metricUpdater); + var cluster = new ContentCluster( options.clusterName, options.nodes, options.storageDistribution); - NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log); - Communicator communicator = new RPCCommunicator( + var stateGatherer = new NodeStateGatherer(timer, timer, log); + var communicator = new RPCCommunicator( RPCCommunicator.createRealSupervisor(), timer, options.fleetControllerIndex, @@ -185,13 +187,14 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd options.nodeStateRequestTimeoutEarliestPercentage, options.nodeStateRequestTimeoutLatestPercentage, options.nodeStateRequestRoundTripTimeMaxSeconds); - DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); - NodeLookup lookUp = new SlobrokClient(timer); - StateChangeHandler stateGenerator = new StateChangeHandler(timer, log); - SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); - MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); - FleetController controller = new FleetController( - timer, log, cluster, stateGatherer, communicator, statusPageServer, null, lookUp, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); + var lookUp = new SlobrokClient(timer); + var stateGenerator = new StateChangeHandler(timer, log); + var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + var controller = new FleetController(context, timer, log, cluster, stateGatherer, communicator, + statusPageServer, null, lookUp, database, stateGenerator, + stateBroadcaster, masterElectionHandler, metricUpdater, options); controller.start(); return controller; } @@ -227,7 +230,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public void schedule(RemoteClusterControllerTask task) { synchronized (monitor) { - log.fine("Scheduled remote task " + task.getClass().getName() + " for execution"); + context.log(logger, Level.FINE, "Scheduled remote task " + task.getClass().getName() + " for execution"); remoteTasks.add(task); } } @@ -280,12 +283,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public void shutdown() throws InterruptedException, java.io.IOException { if (runner != null && isRunning()) { - log.log(Level.INFO, "Joining event thread."); + context.log(logger, Level.INFO, "Joining event thread."); running.set(false); synchronized(monitor) { monitor.notifyAll(); } runner.join(); } - log.log(Level.INFO, "Fleetcontroller done shutting down event thread."); + context.log(logger, Level.INFO, "FleetController done shutting down event thread."); controllerThreadId = Thread.currentThread().getId(); database.shutdown(databaseContext); @@ -300,9 +303,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } public void updateOptions(FleetControllerOptions options, long configGeneration) { + var newId = FleetControllerId.fromOptions(options); synchronized(monitor) { - assert(this.options.fleetControllerIndex == options.fleetControllerIndex); - log.log(Level.INFO, "Fleetcontroller " + options.fleetControllerIndex + " has new options"); + assert newId.equals(context.id()); + context.log(logger, Level.INFO, "FleetController has new options"); nextOptions = options.clone(); nextConfigGeneration = configGeneration; monitor.notifyAll(); @@ -348,8 +352,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd var previouslyExhausted = calc.enumerateNodeResourceExhaustions(nodeInfo); var nowExhausted = calc.resourceExhaustionsFromHostInfo(nodeInfo, newHostInfo); if (!previouslyExhausted.equals(nowExhausted)) { - log.fine(() -> String.format("Triggering state recomputation due to change in cluster feed block: %s -> %s", - previouslyExhausted, nowExhausted)); + context.log(logger, Level.FINE, () -> String.format("Triggering state recomputation due to change in cluster feed block: %s -> %s", + previouslyExhausted, nowExhausted)); stateChangeHandler.setStateChangedFlag(); } } @@ -421,7 +425,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd /** * This function gives data of the current state in master election. - * The keys in the given map are indexes of fleet controllers. + * The keys in the given map are indices of fleet controllers. * The values are what fleetcontroller that fleetcontroller wants to * become master. * @@ -431,7 +435,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd */ public void handleFleetData(Map<Integer, Integer> data) { verifyInControllerThread(); - log.log(Level.FINEST, "Sending fleet data event on to master election handler"); + context.log(logger, Level.FINEST, "Sending fleet data event on to master election handler"); metricUpdater.updateMasterElectionMetrics(data); masterElectionHandler.handleFleetData(data); } @@ -466,12 +470,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } /** Called when all distributors have acked newest cluster state version. */ - public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException { + public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.DatabaseContext dbContext) throws InterruptedException { Set<ConfiguredNode> nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values()); // TODO wouldn't it be better to always get bundle information from the state broadcaster? var currentBundle = stateVersionTracker.getVersionedClusterStateBundle(); - log.fine(() -> String.format("All distributors have ACKed cluster state version %d", currentBundle.getVersion())); - stateChangeHandler.handleAllDistributorsInSync(currentBundle.getBaselineClusterState(), nodes, database, context); + context.log(logger, Level.FINE, () -> String.format("All distributors have ACKed cluster state version %d", currentBundle.getVersion())); + stateChangeHandler.handleAllDistributorsInSync(currentBundle.getBaselineClusterState(), nodes, database, dbContext); convergedStates.add(currentBundle); } @@ -531,9 +535,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd try{ rpcServer.setSlobrokConnectionSpecs(options.slobrokConnectionSpecs, options.rpcPort); } catch (ListenFailedException e) { - log.log(Level.WARNING, "Failed to bind RPC server to port " + options.rpcPort +". This may be natural if cluster has altered the services running on this node: " + e.getMessage()); + context.log(logger, Level.WARNING, "Failed to bind RPC server to port " + options.rpcPort + ". This may be natural if cluster has altered the services running on this node: " + e.getMessage()); } catch (Exception e) { - log.log(Level.WARNING, "Failed to initialize RPC server socket: " + e.getMessage()); + context.log(logger, Level.WARNING, "Failed to initialize RPC server socket: " + e.getMessage()); } } @@ -541,7 +545,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd try{ statusPageServer.setPort(options.httpPort); } catch (Exception e) { - log.log(Level.WARNING, "Failed to initialize status server socket. This may be natural if cluster has altered the services running on this node: " + e.getMessage()); + context.log(logger, Level.WARNING, "Failed to initialize status server socket. This may be natural if cluster has altered the services running on this node: " + e.getMessage()); } } @@ -552,10 +556,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } private void selfTerminateIfConfiguredNodeIndexHasChanged() { - if (options.fleetControllerIndex != configuredIndex) { - log.warning(String.format("Got new configuration where CC index has changed from %d to %d. We do not support "+ - "doing this live; immediately exiting now to force new configuration", - configuredIndex, options.fleetControllerIndex)); + var newId = new FleetControllerId(options.clusterName, options.fleetControllerIndex); + if (!newId.equals(context.id())) { + context.log(logger, Level.WARNING, context.id() + " got new configuration for " + newId + ". We do not support doing this live; " + + "immediately exiting now to force new configuration"); prepareShutdownEdge(); System.exit(1); } @@ -565,7 +569,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd verifyInControllerThread(); StatusPageResponse.ResponseCode responseCode; String message; - String hiddenMessage = ""; + final String hiddenMessage; try { StatusPageServer.RequestHandler handler = statusRequestRouter.resolveHandler(httpRequest); if (handler == null) { @@ -575,12 +579,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } catch (FileNotFoundException e) { responseCode = StatusPageResponse.ResponseCode.NOT_FOUND; message = e.getMessage(); + hiddenMessage = ""; } catch (Exception e) { responseCode = StatusPageResponse.ResponseCode.INTERNAL_SERVER_ERROR; message = "Internal Server Error"; hiddenMessage = ExceptionUtils.getStackTraceAsString(e); - if (log.isLoggable(Level.FINE)) - log.log(Level.FINE, "Unknown exception thrown for request " + httpRequest.getRequest() + ": " + hiddenMessage); + context.log(logger, Level.FINE, () -> "Unknown exception thrown for request " + httpRequest.getRequest() + ": " + hiddenMessage); } TimeZone tz = TimeZone.getTimeZone("UTC"); @@ -664,7 +668,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } catch (InterruptedException e) { throw new RuntimeException(e); } catch (Exception e) { - log.log(Level.WARNING, "Failed to watch master election: " + e.toString()); + context.log(logger, Level.WARNING, "Failed to watch master election: " + e); } return false; } @@ -683,7 +687,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd try { propagateOptions(); } catch (Exception e) { - log.log(Level.SEVERE, "Failed to handle new fleet controller config", e); + context.log(logger, Level.SEVERE, "Failed to handle new fleet controller config", e); } } @@ -702,7 +706,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // If there's a pending DB store we have not yet been able to store the // current state bundle to ZK and must therefore _not_ allow it to be published. if (database.hasPendingClusterStateMetaDataStore()) { - log.log(Level.FINE, "Can't publish current cluster state as it has one or more pending ZooKeeper stores"); + context.log(logger, Level.FINE, "Can't publish current cluster state as it has one or more pending ZooKeeper stores"); return false; } boolean sentAny = false; @@ -713,7 +717,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd && currentTime >= nextStateSendTime) { if (inMasterMoratorium) { - log.info(currentTime < firstAllowedStateBroadcast ? + context.log(logger, Level.INFO, currentTime < firstAllowedStateBroadcast ? "Master moratorium complete: all nodes have reported in" : "Master moratorium complete: timed out waiting for all nodes to report in"); // Reset firstAllowedStateBroadcast to make sure all future times are after firstAllowedStateBroadcast @@ -763,14 +767,14 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd return false; } - final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext(); - log.finest(() -> String.format("Processing remote task of type '%s'", task.getClass().getName())); - task.doRemoteFleetControllerTask(context); + final RemoteClusterControllerTask.Context taskContext = createRemoteTaskProcessingContext(); + context.log(logger, Level.FINEST, () -> String.format("Processing remote task of type '%s'", task.getClass().getName())); + task.doRemoteFleetControllerTask(taskContext); if (taskMayBeCompletedImmediately(task)) { - log.finest(() -> String.format("Done processing remote task of type '%s'", task.getClass().getName())); + context.log(logger, Level.FINEST, () -> String.format("Done processing remote task of type '%s'", task.getClass().getName())); task.notifyCompleted(); } else { - log.finest(() -> String.format("Remote task of type '%s' queued until state recomputation", task.getClass().getName())); + context.log(logger, Level.FINEST, () -> String.format("Remote task of type '%s' queued until state recomputation", task.getClass().getName())); tasksPendingStateRecompute.add(task); } @@ -849,14 +853,14 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd VersionDependentTaskCompletion taskCompletion = taskCompletionQueue.peek(); // TODO expose and use monotonic clock instead of system clock if (publishedVersion >= taskCompletion.getMinimumVersion()) { - log.fine(() -> String.format("Deferred task of type '%s' has minimum version %d, published is %d; completing", - taskCompletion.getTask().getClass().getName(), taskCompletion.getMinimumVersion(), publishedVersion)); + context.log(logger, Level.FINE, () -> String.format("Deferred task of type '%s' has minimum version %d, published is %d; completing", + taskCompletion.getTask().getClass().getName(), taskCompletion.getMinimumVersion(), publishedVersion)); taskCompletion.getTask().notifyCompleted(); taskCompletionQueue.remove(); } else if (taskCompletion.getDeadlineTimePointMs() <= now) { var details = buildNodesNotYetConvergedMessage(taskCompletion.getMinimumVersion()); - log.log(Level.WARNING, () -> String.format("Deferred task of type '%s' has exceeded wait deadline; completing with failure (details: %s)", - taskCompletion.getTask().getClass().getName(), details)); + context.log(logger, Level.WARNING, () -> String.format("Deferred task of type '%s' has exceeded wait deadline; completing with failure (details: %s)", + taskCompletion.getTask().getClass().getName(), details)); taskCompletion.getTask().handleFailure(RemoteClusterControllerTask.Failure.of( RemoteClusterControllerTask.FailureCondition.DEADLINE_EXCEEDED, details)); taskCompletion.getTask().notifyCompleted(); @@ -1007,8 +1011,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // TODO expose and use monotonic clock instead of system clock final long maxDeadlineTimePointMs = timer.getCurrentTimeInMillis() + options.getMaxDeferredTaskVersionWaitTime().toMillis(); for (RemoteClusterControllerTask task : tasksPendingStateRecompute) { - log.finest(() -> String.format("Adding task of type '%s' to be completed at version %d", - task.getClass().getName(), completeAtVersion)); + context.log(logger, Level.FINEST, () -> String.format("Adding task of type '%s' to be completed at version %d", + task.getClass().getName(), completeAtVersion)); taskCompletionQueue.add(new VersionDependentTaskCompletion(completeAtVersion, task, maxDeadlineTimePointMs)); } tasksPendingStateRecompute.clear(); @@ -1085,7 +1089,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd database.loadWantedStates(databaseContext); // TODO determine if we need any specialized handling here if feed block is set in the loaded bundle - log.info(() -> String.format("Loaded previous cluster state bundle from ZooKeeper: %s", previousBundle)); + context.log(logger, Level.INFO, () -> String.format("Loaded previous cluster state bundle from ZooKeeper: %s", previousBundle)); stateVersionTracker.setClusterStateBundleRetrievedFromZooKeeper(previousBundle); eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became fleetcontroller master. Bumped version to " @@ -1094,8 +1098,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd firstAllowedStateBroadcast = currentTime + options.minTimeBeforeFirstSystemStateBroadcast; isMaster = true; inMasterMoratorium = true; - log.log(Level.FINE, () -> "At time " + currentTime + " we set first system state broadcast time to be " - + options.minTimeBeforeFirstSystemStateBroadcast + " ms after at time " + firstAllowedStateBroadcast + "."); + context.log(logger, Level.FINE, () -> "At time " + currentTime + " we set first system state broadcast time to be " + + options.minTimeBeforeFirstSystemStateBroadcast + " ms after at time " + firstAllowedStateBroadcast + "."); didWork = true; } if (wantedStateChanged) { @@ -1123,16 +1127,18 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd @Override public void run() { controllerThreadId = Thread.currentThread().getId(); + context.log(logger, Level.INFO, "Starting tick loop"); try { processingCycle = true; while (isRunning()) { tick(); } + context.log(logger, Level.INFO, "Tick loop stopped"); } catch (InterruptedException e) { - log.log(Level.FINE, () -> "Event thread stopped by interrupt exception: " + e); + context.log(logger, Level.INFO, "Event thread stopped by interrupt exception: ", e); } catch (Throwable t) { t.printStackTrace(); - log.log(Level.SEVERE, "Fatal error killed fleet controller", t); + context.log(logger, Level.SEVERE, "Fatal error killed fleet controller", t); synchronized (monitor) { running.set(false); } System.exit(1); } finally { @@ -1146,7 +1152,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd synchronized (monitor) { monitor.notifyAll(); } } - public DatabaseHandler.Context databaseContext = new DatabaseHandler.Context() { + public DatabaseHandler.DatabaseContext databaseContext = new DatabaseHandler.DatabaseContext() { @Override public ContentCluster getCluster() { return cluster; } @Override @@ -1191,7 +1197,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } } if (ackedNodes >= nodeCount) { - log.log(Level.INFO, ackedNodes + " nodes now have acked system state " + version + " or higher."); + context.log(logger, Level.INFO, ackedNodes + " nodes now have acked system state " + version + " or higher."); return; } long remainingTime = maxTime - System.currentTimeMillis(); @@ -1239,5 +1245,4 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public EventLog getEventLog() { return eventLog; } - } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContext.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContext.java new file mode 100644 index 00000000000..cc94dd88e60 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContext.java @@ -0,0 +1,19 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Context for FleetController and all instances 1:1 with the FleetController. + * + * @author hakon + */ +public interface FleetControllerContext { + FleetControllerId id(); + + default void log(Logger logger, Level level, String message) { log(logger, level, () -> message); } + void log(Logger logger, Level level, String message, Throwable t); + void log(Logger logger, Level level, Supplier<String> message); +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContextImpl.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContextImpl.java new file mode 100644 index 00000000000..c718189e752 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerContextImpl.java @@ -0,0 +1,36 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author hakon + */ +public class FleetControllerContextImpl implements FleetControllerContext { + private final FleetControllerId id; + + public FleetControllerContextImpl(FleetControllerOptions options) { + this(FleetControllerId.fromOptions(options)); + } + + public FleetControllerContextImpl(FleetControllerId id) { + this.id = id; + } + + @Override + public FleetControllerId id() { return id; } + + @Override + public void log(Logger logger, Level level, String message, Throwable t) { + logger.log(level, withLogPrefix(message), t); + } + + @Override + public void log(Logger logger, Level level, Supplier<String> message) { + logger.log(level, () -> withLogPrefix(message.get())); + } + + protected String withLogPrefix(String message) { return "Cluster '" + id.clusterName() + "': " + message; } +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerId.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerId.java new file mode 100644 index 00000000000..fd0784d69c9 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerId.java @@ -0,0 +1,44 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import java.util.Objects; + +/** + * Uniquely identifies a running FleetController: cluster name + index. + * + * @author hakon + */ +public class FleetControllerId { + private final String clusterName; + private final int index; + + public static FleetControllerId fromOptions(FleetControllerOptions options) { + return new FleetControllerId(options.clusterName, options.fleetControllerIndex); + } + + public FleetControllerId(String clusterName, int index) { + this.clusterName = clusterName; + this.index = index; + } + + public String clusterName() { return clusterName; } + public int index() { return index; } + + @Override + public String toString() { + return "FleetController " + index + " for cluster '" + clusterName + '\''; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FleetControllerId that = (FleetControllerId) o; + return index == that.index && Objects.equals(clusterName, that.clusterName); + } + + @Override + public int hashCode() { + return Objects.hash(clusterName, index); + } +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java index 9ae3d543121..637aca16ee7 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java @@ -12,8 +12,9 @@ import java.util.logging.Logger; */ public class MasterElectionHandler implements MasterInterface { - private static final Logger log = Logger.getLogger(MasterElectionHandler.class.getName()); + private static final Logger logger = Logger.getLogger(MasterElectionHandler.class.getName()); + private final FleetControllerContext context; private final Object monitor; private final Timer timer; private final int index; @@ -27,7 +28,8 @@ public class MasterElectionHandler implements MasterInterface { private long masterZooKeeperCooldownPeriod; // The period in ms that we won't take over unless master come back. private boolean usingZooKeeper = false; // Unit tests may not use ZooKeeper at all. - public MasterElectionHandler(int index, int totalCount, Object monitor, Timer timer) { + public MasterElectionHandler(FleetControllerContext context, int index, int totalCount, Object monitor, Timer timer) { + this.context = context; this.monitor = monitor; this.timer = timer; this.index = index; @@ -35,7 +37,7 @@ public class MasterElectionHandler implements MasterInterface { this.nextInLineCount = Integer.MAX_VALUE; // Only a given set of nodes can ever become master if (index > (totalCount - 1) / 2) { - log.log(Level.FINE, () -> "Cluster controller " + index + ": We can never become master and will always stay a follower."); + context.log(logger, Level.FINE, () -> "We can never become master and will always stay a follower."); } // Tag current time as when we have not seen any other master. Make sure we're not taking over at once for master that is on the way down masterGoneFromZooKeeperTime = timer.getCurrentTimeInMillis(); @@ -119,38 +121,38 @@ public class MasterElectionHandler implements MasterInterface { public boolean isAmongNthFirst(int first) { return (nextInLineCount < first); } public boolean watchMasterElection(DatabaseHandler database, - DatabaseHandler.Context dbContext) throws InterruptedException { + DatabaseHandler.DatabaseContext dbContext) throws InterruptedException { if (totalCount == 1 && !usingZooKeeper) { return false; // Allow single configured node to become master implicitly if no ZK configured } if (nextMasterData == null) { if (masterCandidate == null) { - log.log(Level.FINEST, () -> "Cluster controller " + index + ": No current master candidate. Waiting for data to do master election."); + context.log(logger, Level.FINEST, () -> "No current master candidate. Waiting for data to do master election."); } return false; // Nothing have happened since last time. } // Move next data to temporary, such that we don't need to keep lock, and such that we don't retry // if we happen to fail processing the data. Map<Integer, Integer> state; - log.log(Level.INFO, "Cluster controller " + index + ": Handling new master election, as we have received " + nextMasterData.size() + " entries"); + context.log(logger, Level.INFO, "Handling new master election, as we have received " + nextMasterData.size() + " entries"); synchronized (monitor) { state = nextMasterData; nextMasterData = null; } - log.log(Level.INFO, "Cluster controller " + index + ": Got master election state " + toString(state) + "."); + context.log(logger, Level.INFO, "Got master election state " + toString(state) + "."); if (state.isEmpty()) throw new IllegalStateException("Database has no master data. We should at least have data for ourselves."); Map.Entry<Integer, Integer> first = state.entrySet().iterator().next(); Integer currentMaster = getMaster(); if (currentMaster != null && first.getKey().intValue() != currentMaster.intValue()) { - log.log(Level.INFO, "Cluster controller " + index + ": Master gone from ZooKeeper. Tagging timestamp. Will wait " + this.masterZooKeeperCooldownPeriod + " ms."); + context.log(logger, Level.INFO, "Master gone from ZooKeeper. Tagging timestamp. Will wait " + this.masterZooKeeperCooldownPeriod + " ms."); masterGoneFromZooKeeperTime = timer.getCurrentTimeInMillis(); masterCandidate = null; } if (first.getValue().intValue() != first.getKey().intValue()) { - log.log(Level.INFO, "Fleet controller " + index + ": First index is not currently trying to become master. Waiting for it to change state"); + context.log(logger, Level.INFO, "First index is not currently trying to become master. Waiting for it to change state"); masterCandidate = null; if (first.getKey() == index) { - log.log(Level.INFO, "Cluster controller " + index + ": We are next in line to become master. Altering our state to look for followers"); + context.log(logger, Level.INFO, "We are next in line to become master. Altering our state to look for followers"); database.setMasterVote(dbContext, index); } } else { @@ -164,21 +166,21 @@ public class MasterElectionHandler implements MasterInterface { if (2 * followers > totalCount) { Integer newMaster = getMaster(); if (newMaster != null && currentMaster != null && newMaster.intValue() == currentMaster.intValue()) { - log.log(Level.INFO, "MASTER_ELECTION: Cluster controller " + index + ": " + currentMaster + " is still the master"); + context.log(logger, Level.INFO, currentMaster + " is still the master"); } else if (newMaster != null && currentMaster != null) { - log.log(Level.INFO, "MASTER_ELECTION: Cluster controller " + index + ": " + newMaster + " took over for fleet controller " + currentMaster + " as master"); + context.log(logger, Level.INFO, newMaster + " took over for fleet controller " + currentMaster + " as master"); } else if (newMaster == null) { - log.log(Level.INFO, "MASTER_ELECTION: Cluster controller " + index + ": " + masterCandidate + " is new master candidate, but needs to wait before it can take over"); + context.log(logger, Level.INFO, masterCandidate + " is new master candidate, but needs to wait before it can take over"); } else { - log.log(Level.INFO, "MASTER_ELECTION: Cluster controller " + index + ": " + newMaster + " is newly elected master"); + context.log(logger, Level.INFO, newMaster + " is newly elected master"); } } else { - log.log(Level.INFO, "MASTER_ELECTION: Cluster controller " + index + ": Currently too few followers for cluster controller candidate " + masterCandidate + ". No current master. (" + followers + "/" + totalCount + " followers)"); + context.log(logger, Level.INFO, "Currently too few followers for cluster controller candidate " + masterCandidate + ". No current master. (" + followers + "/" + totalCount + " followers)"); } Integer ourState = state.get(index); if (ourState == null) throw new IllegalStateException("Database lacks data from ourselves. This should always be present."); if (ourState.intValue() != first.getKey().intValue()) { - log.log(Level.INFO, "Cluster controller " + index + ": Altering our state to follow new fleet controller master candidate " + first.getKey()); + context.log(logger, Level.INFO, "Altering our state to follow new fleet controller master candidate " + first.getKey()); database.setMasterVote(dbContext, first.getKey()); } } @@ -195,7 +197,7 @@ public class MasterElectionHandler implements MasterInterface { if (nextInLineCount != ourPosition) { nextInLineCount = ourPosition; if (ourPosition > 0) { - log.log(Level.FINE, () -> "Cluster controller " + index + ": We are now " + getPosition(nextInLineCount) + " in queue to take over being master."); + context.log(logger, Level.FINE, () -> "We are now " + getPosition(nextInLineCount) + " in queue to take over being master."); } } } @@ -225,7 +227,7 @@ public class MasterElectionHandler implements MasterInterface { } public void handleFleetData(Map<Integer, Integer> data) { - log.log(Level.INFO, "Cluster controller " + index + ": Got new fleet data with " + data.size() + " entries: " + data); + context.log(logger, Level.INFO, "Got new fleet data with " + data.size() + " entries: " + data); synchronized (monitor) { nextMasterData = data; monitor.notifyAll(); @@ -234,7 +236,7 @@ public class MasterElectionHandler implements MasterInterface { public void lostDatabaseConnection() { if (totalCount > 1 || usingZooKeeper) { - log.log(Level.INFO, "Cluster controller " + index + ": Clearing master data as we lost connection on node " + index); + context.log(logger, Level.INFO, "Clearing master data as we lost connection on node " + index); resetElectionProgress(); } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java index 4a57be7ddf1..5035ed1aa88 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java @@ -50,7 +50,7 @@ public class StateChangeHandler { public void handleAllDistributorsInSync(final ClusterState currentState, final Set<ConfiguredNode> nodes, final DatabaseHandler database, - final DatabaseHandler.Context dbContext) throws InterruptedException { + final DatabaseHandler.DatabaseContext dbContext) throws InterruptedException { int startTimestampsReset = 0; log.log(Level.FINE, "handleAllDistributorsInSync invoked for state version %d", currentState.getVersion()); for (NodeType nodeType : NodeType.getTypes()) { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 9a0ac182017..bc8d84c4634 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -178,7 +178,7 @@ public class SystemStateBroadcaster { return nodeIsReachable(node); } - private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) { + private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.DatabaseContext dbContext) { return dbContext.getCluster().getNodeInfo().stream() .filter(this::nodeNeedsClusterStateBundle) .filter(node -> !newestStateBundleAlreadySentToNode(node)) @@ -186,7 +186,7 @@ public class SystemStateBroadcaster { } // Precondition: no nodes in the cluster need to receive the current cluster state version bundle - private List<NodeInfo> resolveStateActivationSendSet(DatabaseHandler.Context dbContext) { + private List<NodeInfo> resolveStateActivationSendSet(DatabaseHandler.DatabaseContext dbContext) { return dbContext.getCluster().getNodeInfo().stream() .filter(this::nodeNeedsClusterStateActivation) .filter(node -> !newestStateActivationAlreadySentToNode(node)) @@ -207,7 +207,7 @@ public class SystemStateBroadcaster { * object and updates the broadcaster's last known in-sync cluster state version. */ void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database, - DatabaseHandler.Context dbContext, + DatabaseHandler.DatabaseContext dbContext, FleetController fleetController) throws InterruptedException { if ((clusterStateBundle == null) || currentClusterStateIsConverged()) { return; // Nothing to do for the current state @@ -248,7 +248,7 @@ public class SystemStateBroadcaster { lastStateVersionBundleAcked = clusterStateBundle.getVersion(); } - private void markCurrentClusterStateAsConverged(DatabaseHandler database, DatabaseHandler.Context dbContext, FleetController fleetController) throws InterruptedException { + private void markCurrentClusterStateAsConverged(DatabaseHandler database, DatabaseHandler.DatabaseContext dbContext, FleetController fleetController) throws InterruptedException { log.log(Level.FINE, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state"); lastClusterStateVersionConverged = clusterStateBundle.getVersion(); lastClusterStateBundleConverged = clusterStateBundle; @@ -267,7 +267,7 @@ public class SystemStateBroadcaster { lastOfficialStateVersion = clusterStateBundle.getVersion(); } - public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.Context dbContext, Communicator communicator, + public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.DatabaseContext dbContext, Communicator communicator, int lastClusterStateVersionWrittenToZooKeeper) { if (clusterStateBundle == null || clusterStateBundle.getVersion() == 0) { return false; @@ -302,7 +302,7 @@ public class SystemStateBroadcaster { return !recipients.isEmpty(); } - public boolean broadcastStateActivationsIfRequired(DatabaseHandler.Context dbContext, Communicator communicator) { + public boolean broadcastStateActivationsIfRequired(DatabaseHandler.DatabaseContext dbContext, Communicator communicator) { if (clusterStateBundle == null || clusterStateBundle.getVersion() == 0 || !currentBundleVersionIsTaggedOfficial()) { return false; } @@ -331,7 +331,7 @@ public class SystemStateBroadcaster { return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp(); } - private static ClusterState buildModifiedClusterState(ClusterState sourceState, DatabaseHandler.Context dbContext) { + private static ClusterState buildModifiedClusterState(ClusterState sourceState, DatabaseHandler.DatabaseContext dbContext) { ClusterState newState = sourceState.clone(); for (NodeInfo n : dbContext.getCluster().getNodeInfo()) { NodeState ns = newState.getNodeState(n.getNode()); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java index c2574f4c6cf..a7c909ded95 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java @@ -6,6 +6,7 @@ import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; import com.yahoo.vespa.clustercontroller.core.ContentCluster; +import com.yahoo.vespa.clustercontroller.core.FleetControllerContext; import com.yahoo.vespa.clustercontroller.core.FleetController; import com.yahoo.vespa.clustercontroller.core.NodeInfo; import com.yahoo.vespa.clustercontroller.core.Timer; @@ -26,9 +27,9 @@ import java.util.logging.Logger; */ public class DatabaseHandler { - private static final Logger log = Logger.getLogger(DatabaseHandler.class.getName()); + private static final Logger logger = Logger.getLogger(DatabaseHandler.class.getName()); - public interface Context { + public interface DatabaseContext { ContentCluster getCluster(); FleetController getFleetController(); NodeAddedOrRemovedListener getNodeAddedOrRemovedListener(); @@ -56,7 +57,7 @@ public class DatabaseHandler { } private class DatabaseListener implements Database.DatabaseListener { public void handleZooKeeperSessionDown() { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Lost contact with zookeeper server"); + fleetControllerContext.log(logger, Level.FINE, () -> "Lost contact with zookeeper server"); synchronized(monitor) { lostZooKeeperConnectionEvent = true; monitor.notifyAll(); @@ -66,7 +67,7 @@ public class DatabaseHandler { public void handleMasterData(Map<Integer, Integer> data) { synchronized (monitor) { if (masterDataEvent != null && masterDataEvent.equals(data)) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": New master data was the same as the last one. Not responding to it"); + fleetControllerContext.log(logger, Level.FINE, () -> "New master data was the same as the last one. Not responding to it"); } else { masterDataEvent = data; } @@ -75,9 +76,9 @@ public class DatabaseHandler { } } + private final FleetControllerContext fleetControllerContext; private final DatabaseFactory databaseFactory; private final Timer timer; - private final int nodeIndex; private final Object monitor; private String zooKeeperAddress; private int zooKeeperSessionTimeout = 5000; @@ -91,15 +92,14 @@ public class DatabaseHandler { private long lastZooKeeperConnectionAttempt = 0; private int minimumWaitBetweenFailedConnectionAttempts = 10000; private boolean lostZooKeeperConnectionEvent = false; - private boolean connectionEstablishmentIsAllowed = false; private Map<Integer, Integer> masterDataEvent = null; - public DatabaseHandler(DatabaseFactory databaseFactory, Timer timer, String zooKeeperAddress, int ourIndex, Object monitor) throws InterruptedException + public DatabaseHandler(FleetControllerContext fleetControllerContext, DatabaseFactory databaseFactory, Timer timer, String zooKeeperAddress, Object monitor) throws InterruptedException { + this.fleetControllerContext = fleetControllerContext; this.databaseFactory = databaseFactory; this.timer = timer; - this.nodeIndex = ourIndex; - pendingStore.masterVote = ourIndex; // To begin with we'll vote for ourselves. + pendingStore.masterVote = fleetControllerContext.id().index(); // To begin with we'll vote for ourselves. this.monitor = monitor; // TODO: Require non-null, not possible now since at least ClusterFeedBlockTest uses null address this.zooKeeperAddress = zooKeeperAddress; @@ -111,8 +111,8 @@ public class DatabaseHandler { } } - public void shutdown(Context context) { - relinquishDatabaseConnectivity(context); + public void shutdown(DatabaseContext databaseContext) { + relinquishDatabaseConnectivity(databaseContext); } public boolean isClosed() { return database == null || database.isClosed(); } @@ -125,21 +125,21 @@ public class DatabaseHandler { this.minimumWaitBetweenFailedConnectionAttempts = minimumWaitBetweenFailedConnectionAttempts; } - public void reset(Context context) { + public void reset(DatabaseContext databaseContext) { final boolean wasRunning; synchronized (databaseMonitor) { wasRunning = database != null; if (wasRunning) { - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Resetting database state"); + fleetControllerContext.log(logger, Level.INFO, "Resetting database state"); database.close(); database = null; } } clearSessionMetaData(true); - context.getFleetController().lostDatabaseConnection(); + databaseContext.getFleetController().lostDatabaseConnection(); if (wasRunning) { - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Done resetting database state"); + fleetControllerContext.log(logger, Level.INFO, "Done resetting database state"); } } @@ -157,24 +157,24 @@ public class DatabaseHandler { pendingStore.clearNonClusterStateFields(); } pendingStore.masterVote = currentVote; - log.log(Level.FINE, () -> "Cleared session metadata. Pending master vote is now " + pendingStore.masterVote); + fleetControllerContext.log(logger, Level.FINE, () -> "Cleared session metadata. Pending master vote is now " + pendingStore.masterVote); } - public void setZooKeeperAddress(String address, Context context) { + public void setZooKeeperAddress(String address, DatabaseContext databaseContext) { if (address == null && zooKeeperAddress == null) return; if (address != null && address.equals(zooKeeperAddress)) return; if (zooKeeperAddress != null) { - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": " + (address == null ? "Stopped using ZooKeeper." : "Got new ZooKeeper address to use: " + address)); + fleetControllerContext.log(logger, Level.INFO, "" + (address == null ? "Stopped using ZooKeeper." : "Got new ZooKeeper address to use: " + address)); } zooKeeperAddress = address; - reset(context); + reset(databaseContext); } - public void setZooKeeperSessionTimeout(int timeout, Context context) { + public void setZooKeeperSessionTimeout(int timeout, DatabaseContext databaseContext) { if (timeout == zooKeeperSessionTimeout) return; - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Got new ZooKeeper session timeout of " + timeout + " milliseconds."); + fleetControllerContext.log(logger, Level.FINE, () -> "Got new ZooKeeper session timeout of " + timeout + " milliseconds."); zooKeeperSessionTimeout = timeout; - reset(context); + reset(databaseContext); } private boolean usingZooKeeper() { return (zooKeeperAddress != null); } @@ -190,31 +190,30 @@ public class DatabaseHandler { // Don't clear pending state writes in case they were attempted prior to connect() // being called, but after receiving a database loss event. clearSessionMetaData(false); - log.log(Level.INFO, - "Fleetcontroller " + nodeIndex + ": Setting up new ZooKeeper session at " + zooKeeperAddress); + fleetControllerContext.log(logger, Level.INFO, "Setting up new ZooKeeper session at " + zooKeeperAddress); DatabaseFactory.Params params = new DatabaseFactory.Params() .cluster(cluster) - .nodeIndex(nodeIndex) + .nodeIndex(fleetControllerContext.id().index()) .databaseAddress(zooKeeperAddress) .databaseSessionTimeout(zooKeeperSessionTimeout) .databaseListener(dbListener); database = databaseFactory.create(params); } } catch (KeeperException.NodeExistsException e) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Cannot create ephemeral fleetcontroller node. ZooKeeper server " - + "not seen old fleetcontroller instance disappear? It already exists. Will retry later: " + e.getMessage()); + fleetControllerContext.log(logger, Level.FINE, () -> "Cannot create ephemeral fleetcontroller node. ZooKeeper server " + + "not seen old fleetcontroller instance disappear? It already exists. Will retry later: " + e.getMessage()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (KeeperException.ConnectionLossException e) { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to connect to ZooKeeper at " + zooKeeperAddress - + " with session timeout " + zooKeeperSessionTimeout + ": " + e.getMessage()); + fleetControllerContext.log(logger, Level.WARNING, "Failed to connect to ZooKeeper at " + zooKeeperAddress + + " with session timeout " + zooKeeperSessionTimeout + ": " + e.getMessage()); } catch (Exception e) { StringWriter sw = new StringWriter(); e.printStackTrace(new PrintWriter(sw)); - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to connect to ZooKeeper at " + zooKeeperAddress - + " with session timeout " + zooKeeperSessionTimeout + ": " + sw); + fleetControllerContext.log(logger, Level.WARNING, "Failed to connect to ZooKeeper at " + zooKeeperAddress + + " with session timeout " + zooKeeperSessionTimeout + ": " + sw); } - log.log(Level.INFO, "Fleetcontroller " + nodeIndex + ": Done setting up new ZooKeeper session at " + zooKeeperAddress); + fleetControllerContext.log(logger, Level.INFO, "Done setting up new ZooKeeper session at " + zooKeeperAddress); } /** @@ -222,27 +221,27 @@ public class DatabaseHandler { * * @return true if we did or attempted any work. */ - public boolean doNextZooKeeperTask(Context context) { + public boolean doNextZooKeeperTask(DatabaseContext databaseContext) { boolean didWork = false; synchronized (monitor) { if (lostZooKeeperConnectionEvent) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": doNextZooKeeperTask(): lost connection"); - context.getFleetController().lostDatabaseConnection(); + fleetControllerContext.log(logger, Level.FINE, () -> "doNextZooKeeperTask(): lost connection"); + databaseContext.getFleetController().lostDatabaseConnection(); lostZooKeeperConnectionEvent = false; didWork = true; if (masterDataEvent != null) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Had new master data queued on disconnect. Removing master data event"); + fleetControllerContext.log(logger, Level.FINE, () -> "Had new master data queued on disconnect. Removing master data event"); masterDataEvent = null; } } if (masterDataEvent != null) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": doNextZooKeeperTask(): new master data"); - if (!masterDataEvent.containsKey(nodeIndex)) { + fleetControllerContext.log(logger, Level.FINE, () -> "doNextZooKeeperTask(): new master data"); + if (!masterDataEvent.containsKey(fleetControllerContext.id().index())) { Integer currentVote = (pendingStore.masterVote != null ? pendingStore.masterVote : currentlyStored.masterVote); assert(currentVote != null); - masterDataEvent.put(nodeIndex, currentVote); + masterDataEvent.put(fleetControllerContext.id().index(), currentVote); } - context.getFleetController().handleFleetData(masterDataEvent); + databaseContext.getFleetController().handleFleetData(masterDataEvent); masterDataEvent = null; didWork = true; } @@ -253,7 +252,7 @@ public class DatabaseHandler { return false; // Not time to attempt connection yet. } didWork = true; - connect(context.getCluster(), currentTime); + connect(databaseContext.getCluster(), currentTime); } try { synchronized (databaseMonitor) { @@ -263,11 +262,11 @@ public class DatabaseHandler { didWork |= performZooKeeperWrites(); } } catch (CasWriteFailed e) { - log.log(Level.WARNING, String.format("CaS write to ZooKeeper failed, another controller " + - "has likely taken over ownership: %s", e.getMessage())); + fleetControllerContext.log(logger, Level.WARNING, String.format("CaS write to ZooKeeper failed, another controller " + + "has likely taken over ownership: %s", e.getMessage())); // Clear DB and master election state. This shall trigger a full re-fetch of all // version and election-related metadata. - relinquishDatabaseConnectivity(context); + relinquishDatabaseConnectivity(databaseContext); } return didWork; } @@ -277,32 +276,31 @@ public class DatabaseHandler { return zooKeeperAddress != null; } - private void relinquishDatabaseConnectivity(Context context) { + private void relinquishDatabaseConnectivity(DatabaseContext databaseContext) { // reset() will handle both session clearing and trigger a database loss callback into the CC. - reset(context); + reset(databaseContext); } private boolean performZooKeeperWrites() { boolean didWork = false; if (pendingStore.masterVote != null) { didWork = true; - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Attempting to store master vote " - + pendingStore.masterVote + " into zookeeper."); + fleetControllerContext.log(logger, Level.FINE, () -> "Attempting to store master vote " + + pendingStore.masterVote + " into zookeeper."); if (database.storeMasterVote(pendingStore.masterVote)) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Managed to store master vote " - + pendingStore.masterVote + " into zookeeper."); + fleetControllerContext.log(logger, Level.FINE, () -> "Managed to store master vote " + + pendingStore.masterVote + " into zookeeper."); currentlyStored.masterVote = pendingStore.masterVote; pendingStore.masterVote = null; } else { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store master vote"); + fleetControllerContext.log(logger, Level.WARNING, "Failed to store master vote"); return true; } } if (pendingStore.lastSystemStateVersion != null) { didWork = true; - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex - + ": Attempting to store last system state version " + pendingStore.lastSystemStateVersion - + " into zookeeper."); + fleetControllerContext.log(logger, Level.FINE, () -> "Attempting to store last system state version " + + pendingStore.lastSystemStateVersion + " into zookeeper."); if (database.storeLatestSystemStateVersion(pendingStore.lastSystemStateVersion)) { currentlyStored.lastSystemStateVersion = pendingStore.lastSystemStateVersion; pendingStore.lastSystemStateVersion = null; @@ -312,8 +310,8 @@ public class DatabaseHandler { } if (pendingStore.startTimestamps != null) { didWork = true; - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Attempting to store " - + pendingStore.startTimestamps.size() + " start timestamps into zookeeper."); + fleetControllerContext.log(logger, Level.FINE, () -> "Attempting to store " + pendingStore.startTimestamps.size() + + " start timestamps into zookeeper."); if (database.storeStartTimestamps(pendingStore.startTimestamps)) { currentlyStored.startTimestamps = pendingStore.startTimestamps; pendingStore.startTimestamps = null; @@ -323,8 +321,8 @@ public class DatabaseHandler { } if (pendingStore.wantedStates != null) { didWork = true; - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Attempting to store " - + pendingStore.wantedStates.size() + " wanted states into zookeeper."); + fleetControllerContext.log(logger, Level.FINE, () -> "Attempting to store " + + pendingStore.wantedStates.size() + " wanted states into zookeeper."); if (database.storeWantedStates(pendingStore.wantedStates)) { currentlyStored.wantedStates = pendingStore.wantedStates; pendingStore.wantedStates = null; @@ -334,8 +332,8 @@ public class DatabaseHandler { } if (pendingStore.clusterStateBundle != null) { didWork = true; - log.fine(() -> String.format("Fleetcontroller %d: Attempting to store last cluster state bundle with version %d into zookeeper.", - nodeIndex, pendingStore.clusterStateBundle.getVersion())); + fleetControllerContext.log(logger, Level.FINE, () -> "Attempting to store last cluster state bundle with version " + + pendingStore.clusterStateBundle.getVersion() + " into zookeeper."); if (database.storeLastPublishedStateBundle(pendingStore.clusterStateBundle)) { lastKnownStateBundleVersionWrittenBySelf = pendingStore.clusterStateBundle.getVersion(); currentlyStored.clusterStateBundle = pendingStore.clusterStateBundle; @@ -347,8 +345,8 @@ public class DatabaseHandler { return didWork; } - public void setMasterVote(Context context, int wantedMasterCandidate) throws InterruptedException { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Checking if master vote has been updated and need to be stored."); + public void setMasterVote(DatabaseContext databaseContext, int wantedMasterCandidate) throws InterruptedException { + fleetControllerContext.log(logger, Level.FINE, () -> "Checking if master vote has been updated and need to be stored."); // Schedule a write if one of the following is true: // - There is already a pending vote to be written, that may have been written already without our knowledge // - We don't know what is actually stored now @@ -356,14 +354,14 @@ public class DatabaseHandler { if (pendingStore.masterVote != null || currentlyStored.masterVote == null || currentlyStored.masterVote != wantedMasterCandidate) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Scheduling master vote " + wantedMasterCandidate + " to be stored in zookeeper."); + fleetControllerContext.log(logger, Level.FINE, () -> "Scheduling master vote " + wantedMasterCandidate + " to be stored in zookeeper."); pendingStore.masterVote = wantedMasterCandidate; - doNextZooKeeperTask(context); + doNextZooKeeperTask(databaseContext); } } - public void saveLatestSystemStateVersion(Context context, int version) throws InterruptedException { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Checking if latest system state version has been updated and need to be stored."); + public void saveLatestSystemStateVersion(DatabaseContext databaseContext, int version) throws InterruptedException { + fleetControllerContext.log(logger, Level.FINE, () -> "Checking if latest system state version has been updated and need to be stored."); // Schedule a write if one of the following is true: // - There is already a pending vote to be written, that may have been written already without our knowledge // - We don't know what is actually stored now @@ -371,14 +369,14 @@ public class DatabaseHandler { if (pendingStore.lastSystemStateVersion != null || currentlyStored.lastSystemStateVersion == null || currentlyStored.lastSystemStateVersion != version) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Scheduling new last system state version " + version + " to be stored in zookeeper."); + fleetControllerContext.log(logger, Level.FINE, () -> "Scheduling new last system state version " + version + " to be stored in zookeeper."); pendingStore.lastSystemStateVersion = version; - doNextZooKeeperTask(context); + doNextZooKeeperTask(databaseContext); } } public int getLatestSystemStateVersion() { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Retrieving latest system state version."); + fleetControllerContext.log(logger, Level.FINE, () -> "Retrieving latest system state version."); synchronized (databaseMonitor) { if (database != null && !database.isClosed()) { currentlyStored.lastSystemStateVersion = database.retrieveLatestSystemStateVersion(); @@ -387,23 +385,23 @@ public class DatabaseHandler { Integer version = currentlyStored.lastSystemStateVersion; if (version == null) { if (usingZooKeeper()) { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve latest system state version from ZooKeeper. Returning version 0."); + fleetControllerContext.log(logger, Level.WARNING, "Failed to retrieve latest system state version from ZooKeeper. Returning version 0."); } return 0; // FIXME "fail-oblivious" is not a good error handling mode for such a critical component! } return version; } - public void saveLatestClusterStateBundle(Context context, ClusterStateBundle clusterStateBundle) { - log.log(Level.FINE, () -> String.format("Fleetcontroller %d: Scheduling bundle %s to be saved to ZooKeeper", nodeIndex, clusterStateBundle)); + public void saveLatestClusterStateBundle(DatabaseContext databaseContext, ClusterStateBundle clusterStateBundle) { + fleetControllerContext.log(logger, Level.FINE, () -> "Scheduling bundle " + clusterStateBundle + " to be saved to ZooKeeper"); pendingStore.clusterStateBundle = clusterStateBundle; - doNextZooKeeperTask(context); + doNextZooKeeperTask(databaseContext); // FIXME this is a nasty hack to get around the fact that a massive amount of unit tests // set up the system with a null ZooKeeper server address. If we don't fake that we have // written the state version, the tests will never progress past waiting for state broadcasts. if (zooKeeperAddress == null) { - log.warning(() -> String.format("Fleetcontroller %d: Simulating ZK write of version %d. This should not happen in production!", - nodeIndex, clusterStateBundle.getVersion())); + logger.warning(() -> "Simulating ZK write of version " + clusterStateBundle.getVersion() + + ". This should not happen in production!"); lastKnownStateBundleVersionWrittenBySelf = clusterStateBundle.getVersion(); } } @@ -418,7 +416,7 @@ public class DatabaseHandler { } public ClusterStateBundle getLatestClusterStateBundle() { - log.log(Level.FINE, () -> String.format("Fleetcontroller %d: Retrieving latest cluster state bundle from ZooKeeper", nodeIndex)); + fleetControllerContext.log(logger, Level.FINE, () -> "Retrieving latest cluster state bundle from ZooKeeper"); synchronized (databaseMonitor) { if (database != null && !database.isClosed()) { return database.retrieveLastPublishedStateBundle(); @@ -428,10 +426,10 @@ public class DatabaseHandler { } } - public void saveWantedStates(Context context) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Checking whether wanted states have changed compared to zookeeper version."); + public void saveWantedStates(DatabaseContext databaseContext) { + fleetControllerContext.log(logger, Level.FINE, () -> "Checking whether wanted states have changed compared to zookeeper version."); Map<Node, NodeState> wantedStates = new TreeMap<>(); - for (NodeInfo info : context.getCluster().getNodeInfo()) { + for (NodeInfo info : databaseContext.getCluster().getNodeInfo()) { if (!info.getUserWantedState().equals(new NodeState(info.getNode().getType(), State.UP))) { wantedStates.put(info.getNode(), info.getUserWantedState()); } @@ -443,14 +441,14 @@ public class DatabaseHandler { if (pendingStore.wantedStates != null || currentlyStored.wantedStates == null || !currentlyStored.wantedStates.equals(wantedStates)) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Scheduling new wanted states to be stored into zookeeper."); + fleetControllerContext.log(logger, Level.FINE, () -> "Scheduling new wanted states to be stored into zookeeper."); pendingStore.wantedStates = wantedStates; - doNextZooKeeperTask(context); + doNextZooKeeperTask(databaseContext); } } - public boolean loadWantedStates(Context context) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Retrieving node wanted states."); + public boolean loadWantedStates(DatabaseContext databaseContext) { + fleetControllerContext.log(logger, Level.FINE, () -> "Retrieving node wanted states."); synchronized (databaseMonitor) { if (database != null && !database.isClosed()) { currentlyStored.wantedStates = database.retrieveWantedStates(); @@ -461,43 +459,43 @@ public class DatabaseHandler { if (usingZooKeeper()) { // We get here if the ZooKeeper client has lost the connection to ZooKeeper. // TODO: Should instead fail the tick until connected!? - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Failed to retrieve wanted states from ZooKeeper. Assuming UP for all nodes."); + fleetControllerContext.log(logger, Level.FINE, () -> "Failed to retrieve wanted states from ZooKeeper. Assuming UP for all nodes."); } wantedStates = new TreeMap<>(); } boolean altered = false; for (Node node : wantedStates.keySet()) { - NodeInfo nodeInfo = context.getCluster().getNodeInfo(node); + NodeInfo nodeInfo = databaseContext.getCluster().getNodeInfo(node); if (nodeInfo == null) continue; // ignore wanted state of nodes which doesn't exist NodeState wantedState = wantedStates.get(node); if ( ! nodeInfo.getUserWantedState().equals(wantedState)) { nodeInfo.setWantedState(wantedState); - context.getNodeStateUpdateListener().handleNewWantedNodeState(nodeInfo, wantedState); + databaseContext.getNodeStateUpdateListener().handleNewWantedNodeState(nodeInfo, wantedState); altered = true; } - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Node " + node + " has wanted state " + wantedState); + fleetControllerContext.log(logger, Level.FINE, () -> "Node " + node + " has wanted state " + wantedState); } // Remove wanted state from any node having a wanted state set that is no longer valid - for (NodeInfo info : context.getCluster().getNodeInfo()) { + for (NodeInfo info : databaseContext.getCluster().getNodeInfo()) { NodeState wantedState = wantedStates.get(info.getNode()); if (wantedState == null && !info.getUserWantedState().equals(new NodeState(info.getNode().getType(), State.UP))) { info.setWantedState(null); - context.getNodeStateUpdateListener().handleNewWantedNodeState(info, info.getWantedState().clone()); + databaseContext.getNodeStateUpdateListener().handleNewWantedNodeState(info, info.getWantedState().clone()); altered = true; } } return altered; } - public void saveStartTimestamps(Context context) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Scheduling start timestamps to be stored into zookeeper."); - pendingStore.startTimestamps = context.getCluster().getStartTimestamps(); - doNextZooKeeperTask(context); + public void saveStartTimestamps(DatabaseContext databaseContext) { + fleetControllerContext.log(logger, Level.FINE, () -> "Scheduling start timestamps to be stored into zookeeper."); + pendingStore.startTimestamps = databaseContext.getCluster().getStartTimestamps(); + doNextZooKeeperTask(databaseContext); } public boolean loadStartTimestamps(ContentCluster cluster) { - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Retrieving start timestamps"); + fleetControllerContext.log(logger, Level.FINE, () -> "Retrieving start timestamps"); synchronized (databaseMonitor) { if (database == null || database.isClosed()) { return false; @@ -507,13 +505,13 @@ public class DatabaseHandler { Map<Node, Long> startTimestamps = currentlyStored.startTimestamps; if (startTimestamps == null) { if (usingZooKeeper()) { - log.log(Level.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve start timestamps from ZooKeeper. Cluster state will be bloated with timestamps until we get them set."); + fleetControllerContext.log(logger, Level.WARNING, "Failed to retrieve start timestamps from ZooKeeper. Cluster state will be bloated with timestamps until we get them set."); } startTimestamps = new TreeMap<>(); } for (Map.Entry<Node, Long> e : startTimestamps.entrySet()) { cluster.setStartTimestamp(e.getKey(), e.getValue()); - log.log(Level.FINE, () -> "Fleetcontroller " + nodeIndex + ": Node " + e.getKey() + " has start timestamp " + e.getValue()); + fleetControllerContext.log(logger, Level.FINE, () -> "Node " + e.getKey() + " has start timestamp " + e.getValue()); } return true; } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java index 308a78ee40e..7c4e05aa0e9 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFeedBlockTest.java @@ -48,16 +48,17 @@ public class ClusterFeedBlockTest extends FleetControllerTest { nodes.add(new Node(NodeType.DISTRIBUTOR, i)); } + var context = new TestFleetControllerContext(options); communicator = new DummyCommunicator(nodes, timer); - MetricUpdater metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); - EventLog eventLog = new EventLog(timer, metricUpdater); - ContentCluster cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); - NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, eventLog); - DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); - StateChangeHandler stateGenerator = new StateChangeHandler(timer, eventLog); - SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); - MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); - ctrl = new FleetController(timer, eventLog, cluster, stateGatherer, communicator, null, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); + var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); + var eventLog = new EventLog(timer, metricUpdater); + var cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); + var stateGatherer = new NodeStateGatherer(timer, timer, eventLog); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); + var stateGenerator = new StateChangeHandler(timer, eventLog); + var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + ctrl = new FleetController(context, timer, eventLog, cluster, stateGatherer, communicator, null, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); ctrl.tick(); markAllNodesAsUp(options); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java index 630a4898dfa..35431703824 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DatabaseHandlerTest.java @@ -10,7 +10,6 @@ import org.junit.Test; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; @@ -41,8 +40,8 @@ public class DatabaseHandlerTest { when(mockTimer.getCurrentTimeInMillis()).thenReturn(1000000L); } - DatabaseHandler.Context createMockContext() { - return new DatabaseHandler.Context() { + DatabaseHandler.DatabaseContext createMockContext() { + return new DatabaseHandler.DatabaseContext() { @Override public ContentCluster getCluster() { return clusterFixture.cluster(); @@ -66,7 +65,9 @@ public class DatabaseHandlerTest { } DatabaseHandler createHandler() throws Exception { - return new DatabaseHandler(mockDbFactory, mockTimer, databaseAddress, 0, monitor); + FleetControllerContext fleetControllerContext = mock(FleetControllerContext.class); + when(fleetControllerContext.id()).thenReturn(new FleetControllerId("clusterName", 0)); + return new DatabaseHandler(fleetControllerContext, mockDbFactory, mockTimer, databaseAddress, monitor); } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java index 0f172f6f248..e38e48375ed 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java @@ -7,11 +7,12 @@ import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.NodeType; import com.yahoo.vdslib.state.State; import org.junit.Test; -import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.List; +import static org.junit.Assert.assertEquals; + public class DistributionBitCountTest extends FleetControllerTest { private void setUpSystem(String testName) throws Exception { diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java index 55677782a3d..d115f9f0060 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java @@ -157,15 +157,16 @@ public abstract class FleetControllerTest implements Waiter { FleetController createFleetController(boolean useFakeTimer, FleetControllerOptions options, boolean startThread, StatusPageServerInterface status) throws Exception { Objects.requireNonNull(status, "status server cannot be null"); + var context = new TestFleetControllerContext(options); Timer timer = useFakeTimer ? this.timer : new RealTimer(); - MetricUpdater metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); - EventLog log = new EventLog(timer, metricUpdater); - ContentCluster cluster = new ContentCluster( + var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); + var log = new EventLog(timer, metricUpdater); + var cluster = new ContentCluster( options.clusterName, options.nodes, options.storageDistribution); - NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log); - Communicator communicator = new RPCCommunicator( + var stateGatherer = new NodeStateGatherer(timer, timer, log); + var communicator = new RPCCommunicator( RPCCommunicator.createRealSupervisor(), timer, options.fleetControllerIndex, @@ -173,20 +174,20 @@ public abstract class FleetControllerTest implements Waiter { options.nodeStateRequestTimeoutEarliestPercentage, options.nodeStateRequestTimeoutLatestPercentage, options.nodeStateRequestRoundTripTimeMaxSeconds); - SlobrokClient lookUp = new SlobrokClient(timer); + var lookUp = new SlobrokClient(timer); lookUp.setSlobrokConnectionSpecs(new String[0]); - RpcServer rpcServer = new RpcServer(timer, timer, options.clusterName, options.fleetControllerIndex, options.slobrokBackOffPolicy); - DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); + var rpcServer = new RpcServer(timer, timer, options.clusterName, options.fleetControllerIndex, options.slobrokBackOffPolicy); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); // Setting this <1000 ms causes ECONNREFUSED on socket trying to connect to ZK server, in ZooKeeper, // after creating a new ZooKeeper (session). This causes ~10s extra time to connect after connection loss. // Reasons unknown. Larger values like the default 10_000 causes that much additional running time for some tests. database.setMinimumWaitBetweenFailedConnectionAttempts(2_000); - StateChangeHandler stateGenerator = new StateChangeHandler(timer, log); - SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); - MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); - FleetController controller = new FleetController(timer, log, cluster, stateGatherer, communicator, status, rpcServer, lookUp, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); + var stateGenerator = new StateChangeHandler(timer, log); + var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + var controller = new FleetController(context, timer, log, cluster, stateGatherer, communicator, status, rpcServer, lookUp, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); if (startThread) { controller.start(); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java index 83167367a91..a3bc39bde39 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java @@ -307,7 +307,7 @@ public class GroupAutoTakedownTest { assertEquals("distributor:6 storage:6 .4.t:123456", fixture.generatedClusterState()); DatabaseHandler handler = mock(DatabaseHandler.class); - DatabaseHandler.Context context = mock(DatabaseHandler.Context.class); + DatabaseHandler.DatabaseContext context = mock(DatabaseHandler.DatabaseContext.class); when(context.getCluster()).thenReturn(fixture.cluster); Set<ConfiguredNode> nodes = new HashSet<>(fixture.cluster.clusterInfo().getConfiguredNodes().values()); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java index 8d32986bcaf..0599baf7c7a 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java @@ -7,7 +7,6 @@ import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Target; import com.yahoo.jrt.Transport; import com.yahoo.jrt.slobrok.server.Slobrok; -import java.util.logging.Level; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.NodeType; @@ -22,6 +21,7 @@ import org.junit.rules.Timeout; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeoutException; +import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Stream; diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java index 2a0ab389865..b601412ecc4 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java @@ -16,12 +16,6 @@ import com.yahoo.vespa.clustercontroller.utils.util.NoMetricReporter; import org.junit.Before; import org.junit.Test; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -29,6 +23,12 @@ import java.util.List; import java.util.Map; import java.util.logging.Logger; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class StateChangeTest extends FleetControllerTest { public static Logger log = Logger.getLogger(StateChangeTest.class.getName()); @@ -49,16 +49,17 @@ public class StateChangeTest extends FleetControllerTest { nodes.add(new Node(NodeType.DISTRIBUTOR, i)); } + var context = new TestFleetControllerContext(options); communicator = new DummyCommunicator(nodes, timer); - MetricUpdater metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); + var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex, options.clusterName); eventLog = new EventLog(timer, metricUpdater); - ContentCluster cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); - NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, eventLog); - DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); - StateChangeHandler stateGenerator = new StateChangeHandler(timer, eventLog); - SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); - MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); - ctrl = new FleetController(timer, eventLog, cluster, stateGatherer, communicator, null, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); + var cluster = new ContentCluster(options.clusterName, options.nodes, options.storageDistribution); + var stateGatherer = new NodeStateGatherer(timer, timer, eventLog); + var database = new DatabaseHandler(context, new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, timer); + var stateGenerator = new StateChangeHandler(timer, eventLog); + var stateBroadcaster = new SystemStateBroadcaster(timer, timer); + var masterElectionHandler = new MasterElectionHandler(context, options.fleetControllerIndex, options.fleetControllerCount, timer, timer); + ctrl = new FleetController(context, timer, eventLog, cluster, stateGatherer, communicator, null, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); ctrl.tick(); if (options.fleetControllerCount == 1) { diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index dfbe4d5a8e8..84b479cfc29 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -55,8 +55,8 @@ public class SystemStateBroadcasterTest { } } - private static DatabaseHandler.Context dbContextFrom(ContentCluster cluster) { - return new DatabaseHandler.Context() { + private static DatabaseHandler.DatabaseContext dbContextFrom(ContentCluster cluster) { + return new DatabaseHandler.DatabaseContext() { @Override public ContentCluster getCluster() { return cluster; diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/TestFleetControllerContext.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/TestFleetControllerContext.java new file mode 100644 index 00000000000..b9d8474affb --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/TestFleetControllerContext.java @@ -0,0 +1,17 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +/** + * @author hakon + */ +public class TestFleetControllerContext extends FleetControllerContextImpl { + public TestFleetControllerContext(FleetControllerOptions options) { + super(options); + } + + @Override + protected String withLogPrefix(String message) { + // Include fleet controller index in prefix in tests, since many may be running + return id() + ": " + message; + } +} diff --git a/config-model/pom.xml b/config-model/pom.xml index 3442a81159e..6dfc2e7740f 100644 --- a/config-model/pom.xml +++ b/config-model/pom.xml @@ -113,7 +113,7 @@ </exclusion> <exclusion> <groupId>org.apache.velocity</groupId> - <artifactId>velocity</artifactId> + <artifactId>velocity-engine-core</artifactId> </exclusion> </exclusions> </dependency> diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java index 5706851943e..2df7739e552 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java @@ -296,7 +296,7 @@ public class ApplicationHandlerTest { " \"ready\": {" + " \"bar\": {" + " \"readyMillis\": " + (now - 1000) + - " }," + + " }" + " }" + " }," + " \"foo\": {" + @@ -524,7 +524,7 @@ public class ApplicationHandlerTest { " \"ready\": {\n" + " \"bar\": {},\n" + " \"hax\": {}\n" + - " },\n" + + " }\n" + " },\n" + " \"moo\": {\n" + " \"pending\": {},\n" + diff --git a/container-search/abi-spec.json b/container-search/abi-spec.json index 2108a5c7aa2..fc642ca7b22 100644 --- a/container-search/abi-spec.json +++ b/container-search/abi-spec.json @@ -736,6 +736,8 @@ "public static final enum com.yahoo.prelude.query.Item$ItemType WORD_ALTERNATIVES", "public static final enum com.yahoo.prelude.query.Item$ItemType NEAREST_NEIGHBOR", "public static final enum com.yahoo.prelude.query.Item$ItemType GEO_LOCATION_TERM", + "public static final enum com.yahoo.prelude.query.Item$ItemType TRUE", + "public static final enum com.yahoo.prelude.query.Item$ItemType FALSE", "public final int code" ] }, diff --git a/container-search/src/main/java/com/yahoo/prelude/query/FalseItem.java b/container-search/src/main/java/com/yahoo/prelude/query/FalseItem.java index 34670ec6fdd..9a8cc78758f 100644 --- a/container-search/src/main/java/com/yahoo/prelude/query/FalseItem.java +++ b/container-search/src/main/java/com/yahoo/prelude/query/FalseItem.java @@ -15,7 +15,7 @@ public class FalseItem extends Item { @Override public ItemType getItemType() { - return ItemType.WORD; // Implemented as a non-matching word as the backend does not support FalseItem + return ItemType.FALSE; } @Override @@ -30,12 +30,11 @@ public class FalseItem extends Item { @Override public int encode(ByteBuffer buffer) { super.encodeThis(buffer); - putString(" ", buffer); // searching for space will not match return 1; } @Override - public int getTermCount() { return 1; } + public int getTermCount() { return 0; } @Override protected void appendBodyString(StringBuilder buffer) { } diff --git a/container-search/src/main/java/com/yahoo/prelude/query/Item.java b/container-search/src/main/java/com/yahoo/prelude/query/Item.java index d2a594bad98..24cfa3a1c5b 100644 --- a/container-search/src/main/java/com/yahoo/prelude/query/Item.java +++ b/container-search/src/main/java/com/yahoo/prelude/query/Item.java @@ -61,7 +61,9 @@ public abstract class Item implements Cloneable { REGEXP(24), WORD_ALTERNATIVES(25), NEAREST_NEIGHBOR(26), - GEO_LOCATION_TERM(27); + GEO_LOCATION_TERM(27), + TRUE(28), + FALSE(29); public final int code; diff --git a/container-search/src/main/java/com/yahoo/search/yql/VespaSerializer.java b/container-search/src/main/java/com/yahoo/search/yql/VespaSerializer.java index decdaae3a0f..012f427a781 100644 --- a/container-search/src/main/java/com/yahoo/search/yql/VespaSerializer.java +++ b/container-search/src/main/java/com/yahoo/search/yql/VespaSerializer.java @@ -69,6 +69,7 @@ import com.yahoo.prelude.query.AndSegmentItem; import com.yahoo.prelude.query.BoolItem; import com.yahoo.prelude.query.DotProductItem; import com.yahoo.prelude.query.EquivItem; +import com.yahoo.prelude.query.FalseItem; import com.yahoo.prelude.query.ExactStringItem; import com.yahoo.prelude.query.IndexedItem; import com.yahoo.prelude.query.IntItem; @@ -483,6 +484,16 @@ public class VespaSerializer { } + private static class FalseSerializer extends Serializer<FalseItem> { + @Override + void onExit(StringBuilder destination, FalseItem item) { } + @Override + boolean serialize(StringBuilder destination, FalseItem item) { + destination.append("false"); + return false; + } + } + private static class RegExpSerializer extends Serializer<RegExpItem> { @Override @@ -1194,6 +1205,7 @@ public class VespaSerializer { dispatchBuilder.put(IntItem.class, new NumberSerializer()); dispatchBuilder.put(GeoLocationItem.class, new GeoLocationSerializer()); dispatchBuilder.put(BoolItem.class, new BoolSerializer()); + dispatchBuilder.put(FalseItem.class, new FalseSerializer()); dispatchBuilder.put(MarkerWordItem.class, new WordSerializer()); // gotcha dispatchBuilder.put(NearItem.class, new NearSerializer()); dispatchBuilder.put(NearestNeighborItem.class, new NearestNeighborSerializer()); diff --git a/container-search/src/main/java/com/yahoo/search/yql/YqlParser.java b/container-search/src/main/java/com/yahoo/search/yql/YqlParser.java index cdfcef7e2a9..fa2b9fd14e7 100644 --- a/container-search/src/main/java/com/yahoo/search/yql/YqlParser.java +++ b/container-search/src/main/java/com/yahoo/search/yql/YqlParser.java @@ -33,6 +33,7 @@ import com.yahoo.prelude.query.BoolItem; import com.yahoo.prelude.query.CompositeItem; import com.yahoo.prelude.query.DotProductItem; import com.yahoo.prelude.query.EquivItem; +import com.yahoo.prelude.query.FalseItem; import com.yahoo.prelude.query.ExactStringItem; import com.yahoo.prelude.query.IntItem; import com.yahoo.prelude.query.Item; @@ -151,12 +152,12 @@ public class YqlParser implements Parser { public static final String CONNECTION_WEIGHT = "weight"; public static final String CONNECTIVITY = "connectivity"; public static final String DISTANCE = "distance"; + public static final String DISTANCE_THRESHOLD = "distanceThreshold"; public static final String DOT_PRODUCT = "dotProduct"; public static final String EQUIV = "equiv"; public static final String FILTER = "filter"; public static final String GEO_LOCATION = "geoLocation"; public static final String HIT_LIMIT = "hitLimit"; - public static final String DISTANCE_THRESHOLD = "distanceThreshold"; public static final String HNSW_EXPLORE_ADDITIONAL_HITS = "hnsw.exploreAdditionalHits"; public static final String IMPLICIT_TRANSFORMS = "implicitTransforms"; public static final String LABEL = "label"; @@ -164,16 +165,16 @@ public class YqlParser implements Parser { public static final String NEAREST_NEIGHBOR = "nearestNeighbor"; public static final String NORMALIZE_CASE = "normalizeCase"; public static final String ONEAR = "onear"; + public static final String ORIGIN = "origin"; public static final String ORIGIN_LENGTH = "length"; public static final String ORIGIN_OFFSET = "offset"; - public static final String ORIGIN = "origin"; public static final String ORIGIN_ORIGINAL = "original"; public static final String PHRASE = "phrase"; public static final String PREDICATE = "predicate"; public static final String PREFIX = "prefix"; public static final String RANGE = "range"; - public static final String RANKED = "ranked"; public static final String RANK = "rank"; + public static final String RANKED = "ranked"; public static final String SAME_ELEMENT = "sameElement"; public static final String SCORE_THRESHOLD = "scoreThreshold"; public static final String SIGNIFICANCE = "significance"; @@ -184,12 +185,12 @@ public class YqlParser implements Parser { public static final String TARGET_NUM_HITS = "targetNumHits"; public static final String THRESHOLD_BOOST_FACTOR = "thresholdBoostFactor"; public static final String UNIQUE_ID = "id"; + public static final String URI = "uri"; public static final String USE_POSITION_DATA = "usePositionData"; public static final String WAND = "wand"; public static final String WEAK_AND = "weakAnd"; - public static final String WEIGHTED_SET = "weightedSet"; public static final String WEIGHT = "weight"; - public static final String URI = "uri"; + public static final String WEIGHTED_SET = "weightedSet"; private final IndexFacts indexFacts; private final List<ConnectedItem> connectedItems = new ArrayList<>(); @@ -352,6 +353,8 @@ public class YqlParser implements Parser { return buildRegExpSearch(ast); case CALL: return buildFunctionCall(ast); + case LITERAL: + return buildLiteral(ast); default: throw newUnexpectedArgumentException(ast.getOperator(), ExpressionOperator.AND, ExpressionOperator.CALL, @@ -445,6 +448,14 @@ public class YqlParser implements Parser { return item; } + private Item buildLiteral(OperatorNode<ExpressionOperator> ast) { + var literal = ast.getArgument(0); + if (Boolean.FALSE.equals(literal)) { + return new FalseItem(); + } + throw newUnexpectedArgumentException(literal, Boolean.FALSE); + } + private Item buildNearestNeighbor(OperatorNode<ExpressionOperator> ast) { List<OperatorNode<ExpressionOperator>> args = ast.getArgument(1); Preconditions.checkArgument(args.size() == 2, "Expected 2 arguments, got %s.", args.size()); diff --git a/container-search/src/test/java/com/yahoo/search/yql/VespaSerializerTestCase.java b/container-search/src/test/java/com/yahoo/search/yql/VespaSerializerTestCase.java index 92c947b0989..308907df6b4 100644 --- a/container-search/src/test/java/com/yahoo/search/yql/VespaSerializerTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/yql/VespaSerializerTestCase.java @@ -143,6 +143,11 @@ public class VespaSerializerTestCase { } @Test + public void testFalse() { + parseAndConfirm("false"); + } + + @Test public void testNumbers() { parseAndConfirm("title = 500"); parseAndConfirm("title > 500"); diff --git a/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java b/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java index 1010c43c2be..e7cba25d1ed 100644 --- a/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/yql/YqlParserTestCase.java @@ -592,6 +592,11 @@ public class YqlParserTestCase { } @Test + public void testFalse() { + assertParse("select foo from bar where false;", "FALSE"); + } + + @Test public void testPredicate() { assertParse("select foo from bar where predicate(predicate_field, " + "{\"gender\":\"male\", \"hobby\":[\"music\", \"hiking\"]}, {\"age\":23L});", diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java index 0678509dc68..13b6f5d8cc3 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileDownloader.java @@ -64,7 +64,11 @@ public class FileDownloader implements AutoCloseable { this.sleepBetweenRetries = sleepBetweenRetries; // Needed to receive RPC receiveFile* calls from server after starting download of file reference new FileReceiver(supervisor, downloads, downloadDirectory); - this.fileReferenceDownloader = new FileReferenceDownloader(connectionPool, downloads, timeout, sleepBetweenRetries); + this.fileReferenceDownloader = new FileReferenceDownloader(connectionPool, + downloads, + timeout, + sleepBetweenRetries, + downloadDirectory); } public Optional<File> getFile(FileReference fileReference) { @@ -86,9 +90,12 @@ public class FileDownloader implements AutoCloseable { FileReference fileReference = fileReferenceDownload.fileReference(); Optional<File> file = getFileFromFileSystem(fileReference); - return (file.isPresent()) - ? CompletableFuture.completedFuture(file) - : startDownload(fileReferenceDownload); + if (file.isPresent()) { + downloads.setDownloadStatus(fileReference, 1.0); + return CompletableFuture.completedFuture(file); + } else { + return startDownload(fileReferenceDownload); + } } public Map<FileReference, Double> downloadStatus() { return downloads.downloadStatus(); } @@ -102,6 +109,10 @@ public class FileDownloader implements AutoCloseable { } private Optional<File> getFileFromFileSystem(FileReference fileReference) { + return getFileFromFileSystem(fileReference, downloadDirectory); + } + + private static Optional<File> getFileFromFileSystem(FileReference fileReference, File downloadDirectory) { File[] files = new File(downloadDirectory, fileReference.value()).listFiles(); if (files == null) return Optional.empty(); if (files.length == 0) return Optional.empty(); @@ -114,13 +125,12 @@ public class FileDownloader implements AutoCloseable { throw new RuntimeException("File reference '" + fileReference.value() + "' exists, but unable to read it"); } else { log.log(Level.FINE, () -> "File reference '" + fileReference.value() + "' found: " + file.getAbsolutePath()); - downloads.setDownloadStatus(fileReference, 1.0); return Optional.of(file); } } - boolean fileReferenceExists(FileReference fileReference) { - return getFileFromFileSystem(fileReference).isPresent(); + static boolean fileReferenceExists(FileReference fileReference, File downloadDirectory) { + return getFileFromFileSystem(fileReference, downloadDirectory).isPresent(); } boolean isDownloading(FileReference fileReference) { @@ -129,7 +139,7 @@ public class FileDownloader implements AutoCloseable { /** Start a download if needed, don't wait for result */ public void downloadIfNeeded(FileReferenceDownload fileReferenceDownload) { - if (fileReferenceExists(fileReferenceDownload.fileReference())) return; + if (fileReferenceExists(fileReferenceDownload.fileReference(), downloadDirectory)) return; startDownload(fileReferenceDownload); } diff --git a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java index 740bf23796f..445106f4fe4 100644 --- a/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java +++ b/filedistribution/src/main/java/com/yahoo/vespa/filedistribution/FileReferenceDownloader.java @@ -36,15 +36,18 @@ public class FileReferenceDownloader { private final Duration downloadTimeout; private final Duration sleepBetweenRetries; private final Duration rpcTimeout; + private final File downloadDirectory; FileReferenceDownloader(ConnectionPool connectionPool, Downloads downloads, Duration timeout, - Duration sleepBetweenRetries) { + Duration sleepBetweenRetries, + File downloadDirectory) { this.connectionPool = connectionPool; this.downloads = downloads; this.downloadTimeout = timeout; this.sleepBetweenRetries = sleepBetweenRetries; + this.downloadDirectory = downloadDirectory; String timeoutString = System.getenv("VESPA_CONFIGPROXY_FILEDOWNLOAD_RPC_TIMEOUT"); this.rpcTimeout = Duration.ofSeconds(timeoutString == null ? 30 : Integer.parseInt(timeoutString)); } @@ -53,6 +56,8 @@ public class FileReferenceDownloader { FileReference fileReference = fileReferenceDownload.fileReference(); int retryCount = 0; do { + if (FileDownloader.fileReferenceExists(fileReference, downloadDirectory)) + return; if (startDownloadRpc(fileReferenceDownload, retryCount)) return; diff --git a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java index 79530c39ad7..97b948ef5d4 100644 --- a/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java +++ b/filedistribution/src/test/java/com/yahoo/vespa/filedistribution/FileDownloaderTest.java @@ -76,6 +76,7 @@ public class FileDownloaderTest { FileReference fileReference = new FileReference(fileReferenceString); File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); writeFileReference(downloadDir, fileReferenceString, filename); + fileDownloader.downloads().completedDownloading(fileReference, fileReferenceFullPath); // Check that we get correct path and content when asking for file reference Optional<File> pathToFile = fileDownloader.getFile(fileReference); diff --git a/metrics-proxy/pom.xml b/metrics-proxy/pom.xml index 83cab8de366..6e43b6c1d62 100644 --- a/metrics-proxy/pom.xml +++ b/metrics-proxy/pom.xml @@ -108,7 +108,14 @@ </dependency> <dependency> <groupId>org.apache.velocity</groupId> - <artifactId>velocity</artifactId> + <artifactId>velocity-engine-core</artifactId> + <exclusions> + <exclusion> + <!-- Must use the one provided by Jdisc to prevent two instances of slf4j classes. --> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + </exclusions> </dependency> <!-- test scope --> diff --git a/node-admin/pom.xml b/node-admin/pom.xml index 29bc8406b15..8fe5b2aecd1 100644 --- a/node-admin/pom.xml +++ b/node-admin/pom.xml @@ -73,8 +73,15 @@ </dependency> <dependency> <groupId>org.apache.velocity</groupId> - <artifactId>velocity</artifactId> + <artifactId>velocity-engine-core</artifactId> <scope>compile</scope> + <exclusions> + <exclusion> + <!-- Must use the one provided by Jdisc to prevent two instances of slf4j classes. --> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + </exclusions> </dependency> <!-- Test --> diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java index 280e58c91f1..6fef2cbbb50 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/identity/AthenzCredentialsMaintainer.java @@ -20,10 +20,11 @@ import com.yahoo.vespa.athenz.identityprovider.client.CsrGenerator; import com.yahoo.vespa.athenz.identityprovider.client.DefaultIdentityDocumentClient; import com.yahoo.vespa.athenz.tls.AthenzIdentityVerifier; import com.yahoo.vespa.athenz.utils.SiaUtils; -import com.yahoo.vespa.hosted.node.admin.container.ContainerName; import com.yahoo.vespa.hosted.node.admin.component.ConfigServerInfo; +import com.yahoo.vespa.hosted.node.admin.container.ContainerName; import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContext; import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentTask; +import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import com.yahoo.vespa.hosted.node.admin.task.util.file.FileFinder; import com.yahoo.vespa.hosted.node.admin.task.util.file.UnixPath; import com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerPath; @@ -206,7 +207,7 @@ public class AthenzCredentialsMaintainer implements CredentialsMaintainer { EntityBindingsMapper.toAttestationData(signedIdentityDocument), csr); EntityBindingsMapper.writeSignedIdentityDocumentToFile(identityDocumentFile, signedIdentityDocument); - writePrivateKeyAndCertificate(context.userNamespace().vespaUserId(), + writePrivateKeyAndCertificate(context.vespaUser(), privateKeyFile, keyPair.getPrivate(), certificateFile, instanceIdentity.certificate()); context.log(logger, "Instance successfully registered and credentials written to file"); } @@ -234,7 +235,7 @@ public class AthenzCredentialsMaintainer implements CredentialsMaintainer { context.identity(), identityDocument.providerUniqueId().asDottedString(), csr); - writePrivateKeyAndCertificate(context.userNamespace().vespaUserId(), + writePrivateKeyAndCertificate(context.vespaUser(), privateKeyFile, keyPair.getPrivate(), certificateFile, instanceIdentity.certificate()); context.log(logger, "Instance successfully refreshed and credentials written to file"); } catch (ZtsClientException e) { @@ -251,19 +252,19 @@ public class AthenzCredentialsMaintainer implements CredentialsMaintainer { } - private static void writePrivateKeyAndCertificate(int vespaUid, + private static void writePrivateKeyAndCertificate(VespaUser vespaUser, ContainerPath privateKeyFile, PrivateKey privateKey, ContainerPath certificateFile, X509Certificate certificate) { - writeFile(privateKeyFile, vespaUid, KeyUtils.toPem(privateKey)); - writeFile(certificateFile, vespaUid, X509CertificateUtils.toPem(certificate)); + writeFile(privateKeyFile, vespaUser, KeyUtils.toPem(privateKey)); + writeFile(certificateFile, vespaUser, X509CertificateUtils.toPem(certificate)); } - private static void writeFile(ContainerPath path, int vespaUid, String utf8Content) { + private static void writeFile(ContainerPath path, VespaUser vespaUser, String utf8Content) { new UnixPath(path.resolveSibling(path.getFileName() + ".tmp")) .writeUtf8File(utf8Content, "r--------") - .setOwnerId(vespaUid) + .setOwnerId(vespaUser.uid()) .atomicMove(path); } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java index 0a9496be0a6..b299e1f3f0d 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java @@ -101,8 +101,8 @@ public class VespaServiceDumperImpl implements VespaServiceDumper { } context.log(log, Level.INFO, "Creating '" + unixPathDirectory +"'."); unixPathDirectory.createDirectory("rwxr-x---") - .setOwner(context.userNamespace().vespaUser()) - .setGroup(context.userNamespace().vespaGroup()); + .setOwner(context.vespaUser().name()) + .setGroup(context.vespaUser().group()); URI destination = serviceDumpDestination(nodeSpec, createDumpId(request)); ProducerContext producerCtx = new ProducerContext(context, directory, request); List<Artifact> producedArtifacts = new ArrayList<>(); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdater.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdater.java index 53c9e741f59..dda404797d9 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdater.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdater.java @@ -3,7 +3,6 @@ package com.yahoo.vespa.hosted.node.admin.nodeadmin; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.config.provision.HostName; -import com.yahoo.vespa.flags.FlagSource; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.Acl; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeRepository; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeSpec; @@ -13,7 +12,6 @@ import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContext; import com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentContextFactory; import com.yahoo.yolean.Exceptions; -import java.time.Clock; import java.time.Duration; import java.util.ArrayList; import java.util.EnumSet; @@ -59,9 +57,7 @@ public class NodeAdminStateUpdater { NodeRepository nodeRepository, Orchestrator orchestrator, NodeAdmin nodeAdmin, - HostName hostHostname, - Clock clock, - FlagSource flagSource) { + HostName hostHostname) { this.nodeAgentContextFactory = nodeAgentContextFactory; this.nodeRepository = nodeRepository; this.orchestrator = orchestrator; diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContext.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContext.java index 8cf8553bc34..23a81458134 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContext.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContext.java @@ -6,10 +6,10 @@ import com.yahoo.config.provision.HostName; import com.yahoo.config.provision.NodeType; import com.yahoo.config.provision.zone.ZoneApi; import com.yahoo.vespa.athenz.api.AthenzIdentity; -import com.yahoo.vespa.hosted.node.admin.container.ContainerName; import com.yahoo.vespa.hosted.node.admin.component.TaskContext; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.Acl; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeSpec; +import com.yahoo.vespa.hosted.node.admin.container.ContainerName; import com.yahoo.vespa.hosted.node.admin.container.ContainerNetworkMode; import com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerPath; @@ -42,7 +42,8 @@ public interface NodeAgentContext extends TaskContext { ZoneApi zone(); - UserNamespace userNamespace(); + /** @return information about the Vespa user inside the container */ + VespaUser vespaUser(); default boolean isDisabled(NodeAgentTask task) { return false; diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java index 9bcf5d58d6e..0b1f7f24ced 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentContextImpl.java @@ -12,9 +12,9 @@ import com.yahoo.vespa.flags.FetchVector; import com.yahoo.vespa.flags.FlagSource; import com.yahoo.vespa.flags.InMemoryFlagSource; import com.yahoo.vespa.flags.PermanentFlags; -import com.yahoo.vespa.hosted.node.admin.container.ContainerName; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.Acl; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeSpec; +import com.yahoo.vespa.hosted.node.admin.container.ContainerName; import com.yahoo.vespa.hosted.node.admin.container.ContainerNetworkMode; import com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerFileSystem; import com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerPath; @@ -42,16 +42,14 @@ public class NodeAgentContextImpl implements NodeAgentContext { private final ZoneApi zone; private final ContainerFileSystem containerFs; private final ContainerPath pathToVespaHome; - private final UserNamespace userNamespace; private final double cpuSpeedup; private final Set<NodeAgentTask> disabledNodeAgentTasks; private final Optional<ApplicationId> hostExclusiveTo; public NodeAgentContextImpl(NodeSpec node, Acl acl, AthenzIdentity identity, ContainerNetworkMode containerNetworkMode, ZoneApi zone, - FlagSource flagSource, Path pathToContainerStorage, String pathToVespaHome, - UserNamespace userNamespace, double cpuSpeedup, - Optional<ApplicationId> hostExclusiveTo) { + FlagSource flagSource, ContainerFileSystem containerFs, String pathToVespaHome, + double cpuSpeedup, Optional<ApplicationId> hostExclusiveTo) { if (cpuSpeedup <= 0) throw new IllegalArgumentException("cpuSpeedUp must be positive, was: " + cpuSpeedup); @@ -61,10 +59,9 @@ public class NodeAgentContextImpl implements NodeAgentContext { this.identity = Objects.requireNonNull(identity); this.containerNetworkMode = Objects.requireNonNull(containerNetworkMode); this.zone = Objects.requireNonNull(zone); - this.containerFs = ContainerFileSystem.create(pathToContainerStorage.resolve(containerName.asString()), userNamespace); + this.containerFs = Objects.requireNonNull(containerFs); this.pathToVespaHome = containerFs.getPath(pathToVespaHome); this.logPrefix = containerName.asString() + ": "; - this.userNamespace = Objects.requireNonNull(userNamespace); this.cpuSpeedup = cpuSpeedup; this.disabledNodeAgentTasks = NodeAgentTask.fromString( PermanentFlags.DISABLED_HOST_ADMIN_TASKS.bindTo(flagSource).with(FetchVector.Dimension.HOSTNAME, node.hostname()).value()); @@ -102,8 +99,8 @@ public class NodeAgentContextImpl implements NodeAgentContext { } @Override - public UserNamespace userNamespace() { - return userNamespace; + public VespaUser vespaUser() { + return containerFs.getUserPrincipalLookupService().vespaUser(); } @Override @@ -191,6 +188,7 @@ public class NodeAgentContextImpl implements NodeAgentContext { private ContainerNetworkMode containerNetworkMode; private ZoneApi zone; private UserNamespace userNamespace; + private VespaUser vespaUser; private Path containerStorage; private FlagSource flagSource; private double cpuSpeedUp = 1; @@ -230,6 +228,12 @@ public class NodeAgentContextImpl implements NodeAgentContext { return this; } + public Builder vespaUser(VespaUser vespaUser) { + this.vespaUser = vespaUser; + return this; + } + + /** Sets the file system to use for paths. */ public Builder fileSystem(FileSystem fileSystem) { return containerStorage(fileSystem.getPath(DEFAULT_CONTAINER_STORAGE.toString())); @@ -258,6 +262,14 @@ public class NodeAgentContextImpl implements NodeAgentContext { public NodeAgentContextImpl build() { Objects.requireNonNull(containerStorage, "Must set one of containerStorage or fileSystem"); + UserNamespace userNamespace = Optional.ofNullable(this.userNamespace) + .orElseGet(() -> new UserNamespace(100000, 100000)); + VespaUser vespaUser = Optional.ofNullable(this.vespaUser) + .orElseGet(() -> new VespaUser("vespa", "vespa", 1000, 100)); + ContainerFileSystem containerFs = ContainerFileSystem.create(containerStorage + .resolve(nodeSpecBuilder.hostname().split("\\.")[0]), userNamespace, vespaUser); + containerFs.createRoot(); + return new NodeAgentContextImpl( nodeSpecBuilder.build(), Optional.ofNullable(acl).orElse(Acl.EMPTY), @@ -285,9 +297,8 @@ public class NodeAgentContextImpl implements NodeAgentContext { } }), Optional.ofNullable(flagSource).orElseGet(InMemoryFlagSource::new), - containerStorage, + containerFs, "/opt/vespa", - Optional.ofNullable(userNamespace).orElseGet(() -> new UserNamespace(100000, 100000, "vespa", "vespa", 1000, 100)), cpuSpeedUp, hostExclusiveTo); } } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespace.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespace.java index 1a25b5c3c5e..005452411bd 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespace.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespace.java @@ -1,8 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.node.admin.nodeagent; -import java.util.Objects; - /** * @author valerijf */ @@ -16,20 +14,12 @@ public class UserNamespace { * https://github.com/torvalds/linux/blob/5bfc75d92efd494db37f5c4c173d3639d4772966/Documentation/admin-guide/sysctl/fs.rst#overflowgid--overflowuid */ private static final int OVERFLOW_ID = 65_534; - private final int uidOffset; - private final int gidOffset; - private final String vespaUser; - private final String vespaGroup; - private final int vespaUserId; - private final int vespaGroupId; + private volatile int uidOffset; + private volatile int gidOffset; - public UserNamespace(int uidOffset, int gidOffset, String vespaUser, String vespaGroup, int vespaUserId, int vespaGroupId) { + public UserNamespace(int uidOffset, int gidOffset) { this.uidOffset = uidOffset; this.gidOffset = gidOffset; - this.vespaUser = Objects.requireNonNull(vespaUser); - this.vespaGroup = Objects.requireNonNull(vespaGroup); - this.vespaUserId = vespaUserId; - this.vespaGroupId = vespaGroupId; } public int userIdOnHost(int containerUid) { return toHostId(containerUid, uidOffset); } @@ -37,14 +27,15 @@ public class UserNamespace { public int userIdInContainer(int hostUid) { return toContainerId(hostUid, uidOffset); } public int groupIdInContainer(int hostGid) { return toContainerId(hostGid, gidOffset); } - public String vespaUser() { return vespaUser; } - public String vespaGroup() { return vespaGroup; } - public int vespaUserId() { return vespaUserId; } - public int vespaGroupId() { return vespaGroupId; } - public int idRange() { return ID_RANGE; } public int overflowId() { return OVERFLOW_ID; } + // Remove after migration to mapped namespaces is complete, make fields final + public void setOffsets(int idOffset) { + this.uidOffset = idOffset; + this.gidOffset = idOffset; + } + private static int toHostId(int containerId, int idOffset) { if (containerId < 0 || containerId > ID_RANGE) throw new IllegalArgumentException("Invalid container id: " + containerId); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/VespaUser.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/VespaUser.java new file mode 100644 index 00000000000..78ccca80beb --- /dev/null +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/VespaUser.java @@ -0,0 +1,29 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.node.admin.nodeagent; + +import java.util.Objects; + +/** + * Describes Vespa user inside the container user namespace. + * + * @author valerijf + */ +public class VespaUser { + + private final String name; + private final String group; + private final int uid; + private final int gid; + + public VespaUser(String name, String group, int uid, int gid) { + this.name = Objects.requireNonNull(name); + this.group = Objects.requireNonNull(group); + this.uid = uid; + this.gid = gid; + } + + public String name() { return name; } + public String group() { return group; } + public int uid() { return uid; } + public int gid() { return gid; } +} diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Template.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Template.java index 26d97d06724..f0ba6272679 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Template.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Template.java @@ -3,6 +3,8 @@ package com.yahoo.vespa.hosted.node.admin.task.util.file; import org.apache.velocity.VelocityContext; import org.apache.velocity.app.Velocity; +import org.apache.velocity.runtime.RuntimeConstants; +import org.slf4j.helpers.NOPLogger; import java.io.StringWriter; import java.nio.file.Files; @@ -19,7 +21,7 @@ import static com.yahoo.yolean.Exceptions.uncheck; public class Template { static { - Velocity.addProperty(Velocity.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.NullLogSystem"); + Velocity.addProperty(RuntimeConstants.RUNTIME_LOG_INSTANCE, NOPLogger.NOP_LOGGER); Velocity.init(); } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystem.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystem.java index 078a60ba7a5..6f0ab5576b7 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystem.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystem.java @@ -2,28 +2,35 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; +import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import java.io.IOException; import java.nio.file.FileStore; import java.nio.file.FileSystem; -import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.PathMatcher; import java.nio.file.WatchService; -import java.nio.file.attribute.UserPrincipalLookupService; import java.util.Set; -import static com.yahoo.yolean.Exceptions.uncheck; - /** * @author valerijf */ public class ContainerFileSystem extends FileSystem { private final ContainerFileSystemProvider containerFsProvider; + private final Path containerRootOnHost; - ContainerFileSystem(ContainerFileSystemProvider containerFsProvider) { + ContainerFileSystem(ContainerFileSystemProvider containerFsProvider, Path containerRootOnHost) { this.containerFsProvider = containerFsProvider; + this.containerRootOnHost = containerRootOnHost; + } + + public Path containerRootOnHost() { + return containerRootOnHost; + } + + public void createRoot() { + provider().createFileSystemRoot(); } @Override @@ -52,7 +59,7 @@ public class ContainerFileSystem extends FileSystem { } @Override - public UserPrincipalLookupService getUserPrincipalLookupService() { + public ContainerUserPrincipalLookupService getUserPrincipalLookupService() { return containerFsProvider.userPrincipalLookupService(); } @@ -86,8 +93,7 @@ public class ContainerFileSystem extends FileSystem { throw new UnsupportedOperationException(); } - public static ContainerFileSystem create(Path containerStorageRoot, UserNamespace userNamespace) { - uncheck(() -> Files.createDirectories(containerStorageRoot)); - return new ContainerFileSystemProvider(containerStorageRoot, userNamespace).getFileSystem(null); + public static ContainerFileSystem create(Path containerStorageRoot, UserNamespace userNamespace, VespaUser vespaUser) { + return new ContainerFileSystemProvider(containerStorageRoot, userNamespace, vespaUser).getFileSystem(null); } } diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java index 909c6c9cbc1..00dda1d7cd2 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; +import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import java.io.IOException; import java.net.URI; @@ -31,8 +32,8 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; -import static com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerAttributeViews.ContainerPosixFileAttributes; import static com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerAttributeViews.ContainerPosixFileAttributeView; +import static com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerAttributeViews.ContainerPosixFileAttributes; import static com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerUserPrincipalLookupService.ContainerGroupPrincipal; import static com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerUserPrincipalLookupService.ContainerUserPrincipal; import static com.yahoo.yolean.Exceptions.uncheck; @@ -43,17 +44,11 @@ import static com.yahoo.yolean.Exceptions.uncheck; class ContainerFileSystemProvider extends FileSystemProvider { private final ContainerFileSystem containerFs; private final ContainerUserPrincipalLookupService userPrincipalLookupService; - private final Path containerRootOnHost; - ContainerFileSystemProvider(Path containerRootOnHost, UserNamespace userNamespace) { - this.containerFs = new ContainerFileSystem(this); + ContainerFileSystemProvider(Path containerRootOnHost, UserNamespace userNamespace, VespaUser vespaUser) { + this.containerFs = new ContainerFileSystem(this, containerRootOnHost); this.userPrincipalLookupService = new ContainerUserPrincipalLookupService( - containerRootOnHost.getFileSystem().getUserPrincipalLookupService(), userNamespace); - this.containerRootOnHost = containerRootOnHost; - } - - public Path containerRootOnHost() { - return containerRootOnHost; + containerRootOnHost.getFileSystem().getUserPrincipalLookupService(), userNamespace, vespaUser); } public ContainerUserPrincipalLookupService userPrincipalLookupService() { @@ -225,6 +220,16 @@ class ContainerFileSystemProvider extends FileSystemProvider { return value; } + void createFileSystemRoot() { + ContainerPath root = containerFs.getPath("/"); + if (!Files.exists(root)) { + uncheck(() -> { + Files.createDirectories(root.pathOnHost()); + fixOwnerToContainerRoot(root); + }); + } + } + private void fixOwnerToContainerRoot(ContainerPath path) throws IOException { setAttribute(path, "unix:uid", 0); setAttribute(path, "unix:gid", 0); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPath.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPath.java index 15295ffd087..853646d53b5 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPath.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPath.java @@ -32,7 +32,7 @@ public class ContainerPath implements Path { if (!pathOnHost.isAbsolute()) throw new IllegalArgumentException("Path host must be absolute: " + pathOnHost); - Path containerRootOnHost = containerFs.provider().containerRootOnHost(); + Path containerRootOnHost = containerFs.containerRootOnHost(); if (!pathOnHost.startsWith(containerRootOnHost)) throw new IllegalArgumentException("Path on host (" + pathOnHost + ") must start with container root on host (" + containerRootOnHost + ")"); } @@ -173,7 +173,7 @@ public class ContainerPath implements Path { @Override public String toString() { - return containerFs.provider().containerRootOnHost().getFileName() + ":" + pathInContainer(); + return containerFs.containerRootOnHost().getFileName() + ":" + pathInContainer(); } private static ContainerPath resolve(ContainerFileSystem containerFs, String[] currentParts, Path other) { @@ -189,7 +189,7 @@ public class ContainerPath implements Path { } return new ContainerPath(containerFs, - containerFs.provider().containerRootOnHost().resolve(String.join("/", parts)), + containerFs.containerRootOnHost().resolve(String.join("/", parts)), parts.toArray(String[]::new)); } @@ -201,7 +201,7 @@ public class ContainerPath implements Path { public static ContainerPath fromPathOnHost(ContainerFileSystem containerFs, Path pathOnHost) { pathOnHost = pathOnHost.normalize(); - Path containerRootOnHost = containerFs.provider().containerRootOnHost(); + Path containerRootOnHost = containerFs.containerRootOnHost(); Path pathUnderContainerStorage = containerRootOnHost.relativize(pathOnHost); if (pathUnderContainerStorage.getNameCount() == 0 || pathUnderContainerStorage.getName(0).toString().isEmpty()) diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupService.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupService.java index ae65f6a7f7f..8e35bdccc23 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupService.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupService.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; +import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import java.io.IOException; import java.nio.file.attribute.GroupPrincipal; @@ -13,16 +14,22 @@ import java.util.Objects; /** * @author valerijf */ -class ContainerUserPrincipalLookupService extends UserPrincipalLookupService { +public class ContainerUserPrincipalLookupService extends UserPrincipalLookupService { private final UserPrincipalLookupService baseFsUserPrincipalLookupService; private final UserNamespace userNamespace; + private final VespaUser vespaUser; - ContainerUserPrincipalLookupService(UserPrincipalLookupService baseFsUserPrincipalLookupService, UserNamespace userNamespace) { + ContainerUserPrincipalLookupService( + UserPrincipalLookupService baseFsUserPrincipalLookupService, UserNamespace userNamespace, VespaUser vespaUser) { this.baseFsUserPrincipalLookupService = Objects.requireNonNull(baseFsUserPrincipalLookupService); this.userNamespace = Objects.requireNonNull(userNamespace); + this.vespaUser = Objects.requireNonNull(vespaUser); } + public UserNamespace userNamespace() { return userNamespace; } + public VespaUser vespaUser() { return vespaUser; } + public int userIdOnHost(int containerUid) { return userNamespace.userIdOnHost(containerUid); } public int groupIdOnHost(int containerGid) { return userNamespace.groupIdOnHost(containerGid); } public int userIdInContainer(int hostUid) { return userNamespace.userIdInContainer(hostUid); } @@ -30,27 +37,27 @@ class ContainerUserPrincipalLookupService extends UserPrincipalLookupService { @Override public ContainerUserPrincipal lookupPrincipalByName(String name) throws IOException { - int containerUid = resolveName(name, userNamespace.vespaUser(), userNamespace.vespaUserId()); - String user = resolveId(containerUid, userNamespace.vespaUser(), userNamespace.vespaUserId()); + int containerUid = resolveName(name, vespaUser.name(), vespaUser.uid()); + String user = resolveId(containerUid, vespaUser.name(), vespaUser.uid()); String hostUid = String.valueOf(userIdOnHost(containerUid)); return new ContainerUserPrincipal(containerUid, user, baseFsUserPrincipalLookupService.lookupPrincipalByName(hostUid)); } @Override public ContainerGroupPrincipal lookupPrincipalByGroupName(String group) throws IOException { - int containerGid = resolveName(group, userNamespace.vespaGroup(), userNamespace.vespaGroupId()); - String name = resolveId(containerGid, userNamespace.vespaGroup(), userNamespace.vespaGroupId()); + int containerGid = resolveName(group, vespaUser.group(), vespaUser.gid()); + String name = resolveId(containerGid, vespaUser.group(), vespaUser.gid()); String hostGid = String.valueOf(groupIdOnHost(containerGid)); return new ContainerGroupPrincipal(containerGid, name, baseFsUserPrincipalLookupService.lookupPrincipalByGroupName(hostGid)); } public ContainerUserPrincipal userPrincipal(int uid, UserPrincipal baseFsPrincipal) { - String name = resolveId(uid, userNamespace.vespaUser(), userNamespace.vespaUserId()); + String name = resolveId(uid, vespaUser.name(), vespaUser.uid()); return new ContainerUserPrincipal(uid, name, baseFsPrincipal); } public ContainerGroupPrincipal groupPrincipal(int gid, GroupPrincipal baseFsPrincipal) { - String name = resolveId(gid, userNamespace.vespaGroup(), userNamespace.vespaGroupId()); + String name = resolveId(gid, vespaUser.group(), vespaUser.gid()); return new ContainerGroupPrincipal(gid, name, baseFsPrincipal); } diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integration/ContainerTester.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integration/ContainerTester.java index 08e335f188a..4a26195dd3a 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integration/ContainerTester.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/integration/ContainerTester.java @@ -93,7 +93,7 @@ public class ContainerTester implements AutoCloseable { NodeAgentContextFactory nodeAgentContextFactory = (nodeSpec, acl) -> NodeAgentContextImpl.builder(nodeSpec).acl(acl).fileSystem(fileSystem).build(); nodeAdminStateUpdater = new NodeAdminStateUpdater(nodeAgentContextFactory, nodeRepository, orchestrator, - nodeAdmin, HOST_HOSTNAME, clock, flagSource); + nodeAdmin, HOST_HOSTNAME); loopThread = new Thread(() -> { nodeAdminStateUpdater.start(); diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdaterTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdaterTest.java index f9b0070a3d6..5436f84f467 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdaterTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminStateUpdaterTest.java @@ -3,8 +3,6 @@ package com.yahoo.vespa.hosted.node.admin.nodeadmin; import com.yahoo.config.provision.HostName; import com.yahoo.config.provision.NodeType; -import com.yahoo.test.ManualClock; -import com.yahoo.vespa.flags.InMemoryFlagSource; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.Acl; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeSpec; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeState; @@ -50,11 +48,9 @@ public class NodeAdminStateUpdaterTest { private final Orchestrator orchestrator = mock(Orchestrator.class); private final NodeAdmin nodeAdmin = mock(NodeAdmin.class); private final HostName hostHostname = HostName.from("basehost1.test.yahoo.com"); - private final ManualClock clock = new ManualClock(); - private final InMemoryFlagSource flagSource = new InMemoryFlagSource(); private final NodeAdminStateUpdater updater = spy(new NodeAdminStateUpdater( - nodeAgentContextFactory, nodeRepository, orchestrator, nodeAdmin, hostHostname, clock, flagSource)); + nodeAgentContextFactory, nodeRepository, orchestrator, nodeAdmin, hostHostname)); @Test diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespaceTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespaceTest.java index 73b59a17c37..bb02667a550 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespaceTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/UserNamespaceTest.java @@ -11,7 +11,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; */ class UserNamespaceTest { - private final UserNamespace userNamespace = new UserNamespace(1000, 2000, "vespa", "users", 1000, 100); + private final UserNamespace userNamespace = new UserNamespace(1000, 2000); @Test public void translates_between_ids() { diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java index 4e85052a176..242a2458f07 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; +import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import com.yahoo.vespa.hosted.node.admin.task.util.file.UnixPath; import com.yahoo.vespa.test.file.TestFileSystem; import org.junit.jupiter.api.Test; @@ -24,8 +25,10 @@ class ContainerFileSystemTest { private final FileSystem fileSystem = TestFileSystem.create(); private final UnixPath containerRootOnHost = new UnixPath(fileSystem.getPath("/data/storage/ctr1")); - private final UserNamespace userNamespace = new UserNamespace(10_000, 11_000, "vespa", "users", 1000, 100); - private final ContainerFileSystem containerFs = ContainerFileSystem.create(containerRootOnHost.createDirectories().toPath(), userNamespace); + private final UserNamespace userNamespace = new UserNamespace(10_000, 11_000); + private final VespaUser vespaUser = new VespaUser("vespa", "users", 1000, 100); + private final ContainerFileSystem containerFs = ContainerFileSystem.create( + containerRootOnHost.createDirectories().toPath(), userNamespace, vespaUser); @Test public void creates_files_and_directories_with_container_root_as_owner() throws IOException { diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPathTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPathTest.java index 6bca8c2f0b1..795b3b198f5 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPathTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerPathTest.java @@ -2,11 +2,18 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; +import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import com.yahoo.vespa.test.file.TestFileSystem; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; +import java.io.IOException; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.Path; + import static com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerPath.fromPathInContainer; import static com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerPath.fromPathOnHost; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -15,19 +22,13 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; -import java.io.IOException; -import java.nio.file.FileSystem; -import java.nio.file.Files; -import java.nio.file.LinkOption; -import java.nio.file.Path; - /** * @author valerijf */ class ContainerPathTest { private final FileSystem baseFs = TestFileSystem.create(); - private final ContainerFileSystem containerFs = ContainerFileSystem.create(baseFs.getPath("/data/storage/ctr1"), mock(UserNamespace.class)); + private final ContainerFileSystem containerFs = ContainerFileSystem.create(baseFs.getPath("/data/storage/ctr1"), mock(UserNamespace.class), mock(VespaUser.class)); @Test public void create_new_container_path() { diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupServiceTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupServiceTest.java index bc26cfa73f3..9a6e69ce27c 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupServiceTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerUserPrincipalLookupServiceTest.java @@ -2,14 +2,15 @@ package com.yahoo.vespa.hosted.node.admin.task.util.fs; import com.yahoo.vespa.hosted.node.admin.nodeagent.UserNamespace; +import com.yahoo.vespa.hosted.node.admin.nodeagent.VespaUser; import com.yahoo.vespa.test.file.TestFileSystem; import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.file.attribute.UserPrincipalNotFoundException; -import static com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerUserPrincipalLookupService.ContainerUserPrincipal; import static com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerUserPrincipalLookupService.ContainerGroupPrincipal; +import static com.yahoo.vespa.hosted.node.admin.task.util.fs.ContainerUserPrincipalLookupService.ContainerUserPrincipal; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -18,9 +19,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows; */ class ContainerUserPrincipalLookupServiceTest { - private final UserNamespace userNamespace = new UserNamespace(10_000, 11_000, "vespa", "users", 1000, 100); + private final UserNamespace userNamespace = new UserNamespace(10_000, 11_000); + private final VespaUser vespaUser = new VespaUser("vespa", "users", 1000, 100); private final ContainerUserPrincipalLookupService userPrincipalLookupService = - new ContainerUserPrincipalLookupService(TestFileSystem.create().getUserPrincipalLookupService(), userNamespace); + new ContainerUserPrincipalLookupService(TestFileSystem.create().getUserPrincipalLookupService(), userNamespace, vespaUser); @Test public void correctly_resolves_ids() throws IOException { diff --git a/parent/pom.xml b/parent/pom.xml index 72729b284c2..01c3e335693 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -699,8 +699,8 @@ </dependency> <dependency> <groupId>org.apache.velocity</groupId> - <artifactId>velocity</artifactId> - <version>1.7</version> + <artifactId>velocity-engine-core</artifactId> + <version>2.3</version> </dependency> <dependency> <groupId>org.assertj</groupId> diff --git a/screwdriver/release-java-artifacts.sh b/screwdriver/release-java-artifacts.sh index a91585d6ad5..afb456699b8 100755 --- a/screwdriver/release-java-artifacts.sh +++ b/screwdriver/release-java-artifacts.sh @@ -51,10 +51,10 @@ STG_REPO=$(cat $TMPFILE | grep 'Staging repository at http' | head -1 | awk -F/ rm -f $TMPFILE # Deploy plugins -mvn $COMMON_MAVEN_OPTS --file ./maven-plugins/pom.xml -DskipStagingRepositoryClose=true -DstagingRepositoryId=$STG_REPO deploy +mvn $COMMON_MAVEN_OPTS --file ./maven-plugins/pom.xml -DskipLocalStaging=true -DskipStagingRepositoryClose=true -DstagingRepositoryId=$STG_REPO deploy # Deploy the rest of the artifacts -mvn $COMMON_MAVEN_OPTS --threads 1C -DskipStagingRepositoryClose=true -DstagingRepositoryId=$STG_REPO deploy +mvn $COMMON_MAVEN_OPTS --threads 1C -DskipLocalStaging=true -DskipStagingRepositoryClose=true -DstagingRepositoryId=$STG_REPO deploy # Close with checks mvn $COMMON_MAVEN_OPTS -N org.sonatype.plugins:nexus-staging-maven-plugin:rc-close -DnexusUrl=https://oss.sonatype.org/ -DserverId=ossrh -DstagingRepositoryId=$STG_REPO diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp index b957226dd8a..f9833b430a0 100644 --- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp +++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp @@ -466,6 +466,7 @@ App::usage() "USAGE:\n"; std::cerr << "vespa-redistribute-bm\n" + "[--async-apply-bucket-diff]\n" "[--bucket-db-stripe-bits bits]\n" "[--client-threads threads]\n" "[--distributor-merge-busy-wait distributor-merge-busy-wait]\n" @@ -481,7 +482,7 @@ App::usage() "[--max-merge-queue-size max-merge-queue-size]\n" "[--max-pending max-pending]\n" "[--max-pending-idealstate-operations max-pending-idealstate-operations]\n" - "[--mbus-distributor-node-max-pending-count] count\n" + "[--mbus-distributor-node-max-pending-count count]\n" "[--mode [grow, shrink, perm-crash, temp-crash, replace]\n" "[--nodes-per-group nodes-per-group]\n" "[--redundancy redundancy]\n" @@ -502,6 +503,7 @@ App::get_options() const char *opt_argument = nullptr; int long_opt_index = 0; static struct option long_opts[] = { + { "async-apply-bucket-diff", 0, nullptr, 0 }, { "bucket-db-stripe-bits", 1, nullptr, 0 }, { "client-threads", 1, nullptr, 0 }, { "distributor-merge-busy-wait", 1, nullptr, 0 }, @@ -532,6 +534,7 @@ App::get_options() { nullptr, 0, nullptr, 0 } }; enum longopts_enum { + LONGOPT_ASYNC_APPLY_BUCKET_DIFF, LONGOPT_BUCKET_DB_STRIPE_BITS, LONGOPT_CLIENT_THREADS, LONGOPT_DISTRIBUTOR_MERGE_BUSY_WAIT, @@ -566,6 +569,9 @@ App::get_options() switch (c) { case 0: switch(long_opt_index) { + case LONGOPT_ASYNC_APPLY_BUCKET_DIFF: + _bm_params.set_async_apply_bucket_diff(true); + break; case LONGOPT_BUCKET_DB_STRIPE_BITS: _bm_params.set_bucket_db_stripe_bits(atoi(opt_argument)); break; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp index 3ff10b19164..4a3466f1a51 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp @@ -6,7 +6,8 @@ namespace search::bmcluster { BmClusterParams::BmClusterParams() - : _bucket_db_stripe_bits(4), + : _async_apply_bucket_diff(), + _bucket_db_stripe_bits(4), _disable_queue_limits_for_chained_merges(false), // Same default as in stor-server.def _distributor_merge_busy_wait(10), // Same default as stor_distributormanager.def _distributor_stripes(0), diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h index d365a28b0b6..36b4c22f6a8 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h @@ -13,6 +13,7 @@ namespace search::bmcluster { */ class BmClusterParams { + std::optional<bool> _async_apply_bucket_diff; uint32_t _bucket_db_stripe_bits; bool _disable_queue_limits_for_chained_merges; uint32_t _distributor_merge_busy_wait; @@ -44,6 +45,7 @@ class BmClusterParams public: BmClusterParams(); ~BmClusterParams(); + const std::optional<bool>& get_async_apply_bucket_diff() const noexcept { return _async_apply_bucket_diff; } uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; } bool get_disable_queue_limits_for_chained_merges() const noexcept { return _disable_queue_limits_for_chained_merges; } uint32_t get_distributor_merge_busy_wait() const { return _distributor_merge_busy_wait; } @@ -73,6 +75,7 @@ public: bool needs_distributor() const { return _enable_distributor || _use_document_api; } bool needs_message_bus() const { return _use_message_bus || _use_document_api; } bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; } + void set_async_apply_bucket_diff(bool value) { _async_apply_bucket_diff = value; } void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; } void set_disable_queue_limits_for_chained_merges(bool value) { _disable_queue_limits_for_chained_merges = value; } void set_distributor_merge_busy_wait(uint32_t value) { _distributor_merge_busy_wait = value; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index 545fb08c762..0882153edd6 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -382,6 +382,9 @@ struct ServiceLayerConfigSet : public StorageConfigSet stor_bucket_init(), stor_visitor() { + if (params.get_async_apply_bucket_diff().has_value()) { + stor_filestor.asyncApplyBucketDiff = params.get_async_apply_bucket_diff().value(); + } stor_filestor.numResponseThreads = params.get_response_threads(); stor_filestor.numNetworkThreads = params.get_rpc_network_threads(); stor_filestor.useAsyncMessageHandlingOnSchedule = params.get_use_async_message_handling_on_schedule(); diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp index 188b541c1b8..fe77477fa77 100644 --- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp +++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp @@ -1,6 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/storage/persistence/apply_bucket_diff_entry_result.h> #include <vespa/storage/persistence/apply_bucket_diff_state.h> #include <vespa/storage/persistence/merge_bucket_info_syncer.h> #include <vespa/document/base/documentid.h> @@ -21,44 +20,49 @@ spi::Result spi_result_ok; spi::Result spi_result_fail(spi::Result::ErrorType::RESOURCE_EXHAUSTED, "write blocked"); document::BucketIdFactory bucket_id_factory; const char *test_op = "put"; -metrics::DoubleAverageMetric dummy_metric("dummy", metrics::DoubleAverageMetric::Tags(), "dummy desc"); document::Bucket dummy_document_bucket(makeDocumentBucket(document::BucketId(0, 16))); class DummyMergeBucketInfoSyncer : public MergeBucketInfoSyncer { uint32_t& _sync_count; + vespalib::string _fail; public: DummyMergeBucketInfoSyncer(uint32_t& sync_count) : MergeBucketInfoSyncer(), - _sync_count(sync_count) + _sync_count(sync_count), + _fail() { } + ~DummyMergeBucketInfoSyncer(); void sync_bucket_info(const spi::Bucket& bucket) const override { EXPECT_EQ(bucket, spi::Bucket(dummy_document_bucket)); ++_sync_count; + if (!_fail.empty()) { + throw std::runtime_error(_fail); + } } + void set_fail(vespalib::string fail) { _fail = std::move(fail); } }; -ApplyBucketDiffEntryResult -make_result(spi::Result &spi_result, const DocumentId &doc_id) +DummyMergeBucketInfoSyncer::~DummyMergeBucketInfoSyncer() = default; + +void +make_result(ApplyBucketDiffState& state, spi::Result &spi_result, const DocumentId &doc_id) { - std::promise<std::pair<std::unique_ptr<spi::Result>, double>> result_promise; - result_promise.set_value(std::make_pair(std::make_unique<spi::Result>(spi_result), 0.1)); - spi::Bucket bucket(makeDocumentBucket(bucket_id_factory.getBucketId(doc_id))); - return ApplyBucketDiffEntryResult(result_promise.get_future(), bucket, doc_id, test_op, dummy_metric); + state.on_entry_complete(std::make_unique<spi::Result>(spi_result), doc_id, test_op); } void push_ok(ApplyBucketDiffState &state) { - state.push_back(make_result(spi_result_ok, DocumentId("id::test::0"))); - state.push_back(make_result(spi_result_ok, DocumentId("id::test::1"))); + make_result(state, spi_result_ok, DocumentId("id::test::0")); + make_result(state, spi_result_ok, DocumentId("id::test::1")); } void push_bad(ApplyBucketDiffState &state) { - state.push_back(make_result(spi_result_ok, DocumentId("id::test::0"))); - state.push_back(make_result(spi_result_fail, DocumentId("id::test::1"))); - state.push_back(make_result(spi_result_fail, DocumentId("id::test::2"))); + make_result(state, spi_result_ok, DocumentId("id::test::0")); + make_result(state, spi_result_fail, DocumentId("id::test::1")); + make_result(state, spi_result_fail, DocumentId("id::test::2")); } } @@ -76,15 +80,19 @@ public: { } + ~ApplyBucketDiffStateTestBase(); + std::unique_ptr<ApplyBucketDiffState> make_state() { return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket)); } }; +ApplyBucketDiffStateTestBase::~ApplyBucketDiffStateTestBase() = default; + class ApplyBucketDiffStateTest : public ApplyBucketDiffStateTestBase { public: - std::unique_ptr<ApplyBucketDiffState> state; + std::shared_ptr<ApplyBucketDiffState> state; ApplyBucketDiffStateTest() : ApplyBucketDiffStateTestBase(), @@ -96,13 +104,14 @@ public: state = make_state(); } + void check_failure(std::string expected) { + auto future = state->get_future(); + state.reset(); + std::string fail_message = future.get(); + EXPECT_EQ(expected, fail_message); + } void check_failure() { - try { - state->check(); - FAIL() << "Failed to throw exception for failed result"; - } catch (std::exception &e) { - EXPECT_EQ("Failed put for id::test::1 in Bucket(0xeb4700c03842cac4): Result(5, write blocked)", std::string(e.what())); - } + check_failure("Failed put for id::test::1 in Bucket(0x0000000000000010): Result(5, write blocked)"); } }; @@ -110,7 +119,7 @@ public: TEST_F(ApplyBucketDiffStateTest, ok_results_can_be_checked) { push_ok(*state); - state->check(); + check_failure(""); } TEST_F(ApplyBucketDiffStateTest, failed_result_errors_ignored) @@ -147,4 +156,12 @@ TEST_F(ApplyBucketDiffStateTest, explicit_sync_bucket_info_works) EXPECT_EQ(1, sync_count); } +TEST_F(ApplyBucketDiffStateTest, failed_sync_bucket_info_is_detected) +{ + vespalib::string fail("sync bucket failed"); + syncer.set_fail(fail); + state->mark_stale_bucket_info(); + check_failure(fail); +} + } diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index 4b41d3fa778..c737d2bed28 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -2,7 +2,6 @@ vespa_add_library(storage_spersistence OBJECT SOURCES apply_bucket_diff_entry_complete.cpp - apply_bucket_diff_entry_result.cpp apply_bucket_diff_state.cpp asynchandler.cpp bucketownershipnotifier.cpp diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp index 6d6d56351ec..1fbe155b16d 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp @@ -1,15 +1,19 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "apply_bucket_diff_entry_complete.h" +#include "apply_bucket_diff_state.h" #include <vespa/persistence/spi/result.h> #include <cassert> namespace storage { -ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock) +ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric) : _result_handler(nullptr), - _result_promise(std::move(result_promise)), - _start_time(clock) + _state(std::move(state)), + _doc_id(std::move(doc_id)), + _op(op), + _start_time(clock), + _latency_metric(latency_metric) { } @@ -21,7 +25,9 @@ ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result) if (_result_handler != nullptr) { _result_handler->handle(*result); } - _result_promise.set_value(std::make_pair(std::move(result), _start_time.getElapsedTimeAsDouble())); + double elapsed = _start_time.getElapsedTimeAsDouble(); + _latency_metric.addValue(elapsed); + _state->on_entry_complete(std::move(result), _doc_id, _op); } void diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h index d1794666e54..dd2346d9dee 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h @@ -2,24 +2,30 @@ #pragma once +#include <vespa/document/base/documentid.h> +#include <vespa/metrics/valuemetric.h> #include <vespa/persistence/spi/operationcomplete.h> #include <vespa/storageframework/generic/clock/timer.h> #include <future> namespace storage { +class ApplyBucketDiffState; + /* * Complete handler for a bucket diff entry spi operation (putAsync * or removeAsync) */ class ApplyBucketDiffEntryComplete : public spi::OperationComplete { - using ResultPromise = std::promise<std::pair<std::unique_ptr<spi::Result>, double>>; - const spi::ResultHandler* _result_handler; - ResultPromise _result_promise; - framework::MilliSecTimer _start_time; + const spi::ResultHandler* _result_handler; + std::shared_ptr<ApplyBucketDiffState> _state; + document::DocumentId _doc_id; + const char* _op; + framework::MilliSecTimer _start_time; + metrics::DoubleAverageMetric& _latency_metric; public: - ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock); + ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric); ~ApplyBucketDiffEntryComplete(); void onComplete(std::unique_ptr<spi::Result> result) override; void addResultHandler(const spi::ResultHandler* resultHandler) override; diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp deleted file mode 100644 index abd09d647a5..00000000000 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "apply_bucket_diff_entry_result.h" -#include <vespa/persistence/spi/result.h> -#include <vespa/vespalib/stllike/asciistream.h> -#include <cassert> - -namespace storage { - -ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op, metrics::DoubleAverageMetric& latency_metric) - : _future_result(std::move(future_result)), - _bucket(bucket), - _doc_id(std::move(doc_id)), - _op(op), - _latency_metric(latency_metric) -{ -} - -ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResult &&rhs) = default; - -ApplyBucketDiffEntryResult::~ApplyBucketDiffEntryResult() = default; - -void -ApplyBucketDiffEntryResult::wait() -{ - assert(_future_result.valid()); - _future_result.wait(); -} - -void -ApplyBucketDiffEntryResult::check_result() -{ - assert(_future_result.valid()); - auto result = _future_result.get(); - if (result.first->hasError()) { - vespalib::asciistream ss; - ss << "Failed " << _op - << " for " << _doc_id.toString() - << " in " << _bucket - << ": " << result.first->toString(); - throw std::runtime_error(ss.str()); - } - _latency_metric.addValue(result.second); -} - -} diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h deleted file mode 100644 index f2213070831..00000000000 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <vespa/document/base/documentid.h> -#include <vespa/persistence/spi/bucket.h> -#include <vespa/metrics/valuemetric.h> -#include <future> - -namespace storage::spi { class Result; } - -namespace storage { - -/* - * Result of a bucket diff entry spi operation (putAsync or removeAsync) - */ -class ApplyBucketDiffEntryResult { - using FutureResult = std::future<std::pair<std::unique_ptr<spi::Result>, double>>; - FutureResult _future_result; - spi::Bucket _bucket; - document::DocumentId _doc_id; - const char* _op; - metrics::DoubleAverageMetric& _latency_metric; - -public: - ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op, metrics::DoubleAverageMetric& latency_metric); - ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResult &&rhs); - ~ApplyBucketDiffEntryResult(); - void wait(); - void check_result(); -}; - -} diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp index c754796ba51..eb7a5ef5bc6 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -1,64 +1,50 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "apply_bucket_diff_state.h" -#include "apply_bucket_diff_entry_result.h" #include "mergehandler.h" +#include <vespa/document/base/documentid.h> +#include <vespa/persistence/spi/result.h> +#include <vespa/vespalib/stllike/asciistream.h> + +using storage::spi::Result; namespace storage { ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket) - : _async_results(), - _merge_bucket_info_syncer(merge_bucket_info_syncer), + : _merge_bucket_info_syncer(merge_bucket_info_syncer), _bucket(bucket), - _stale_bucket_info(false) + _fail_message(), + _failed_flag(), + _stale_bucket_info(false), + _promise() { } ApplyBucketDiffState::~ApplyBucketDiffState() { - wait(); try { sync_bucket_info(); - } catch (std::exception&) { - } -} - -bool -ApplyBucketDiffState::empty() const -{ - return _async_results.empty(); -} - -void -ApplyBucketDiffState::wait() -{ - if (!_async_results.empty()) { - _async_results.back().wait(); + } catch (std::exception& e) { + if (_fail_message.empty()) { + _fail_message = e.what(); + } } - for (auto &result_to_check : _async_results) { - result_to_check.wait(); + if (_promise.has_value()) { + _promise.value().set_value(_fail_message); } } void -ApplyBucketDiffState::check() +ApplyBucketDiffState::on_entry_complete(std::unique_ptr<Result> result, const document::DocumentId &doc_id, const char *op) { - wait(); - try { - for (auto& result_to_check : _async_results) { - result_to_check.check_result(); - } - } catch (std::exception&) { - _async_results.clear(); - throw; + if (result->hasError() && !_failed_flag.test_and_set()) { + vespalib::asciistream ss; + ss << "Failed " << op + << " for " << doc_id.toString() + << " in " << _bucket + << ": " << result->toString(); + _fail_message = ss.str(); } - _async_results.clear(); -} - -void -ApplyBucketDiffState::push_back(ApplyBucketDiffEntryResult&& result) -{ - _async_results.push_back(std::move(result)); } void @@ -76,4 +62,11 @@ ApplyBucketDiffState::sync_bucket_info() } } +std::future<vespalib::string> +ApplyBucketDiffState::get_future() +{ + _promise = std::promise<vespalib::string>(); + return _promise.value().get_future(); +} + } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h index 489e75e4a72..af4174b06d6 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -3,8 +3,13 @@ #pragma once #include <vespa/persistence/spi/bucket.h> +#include <future> +#include <memory> #include <vector> +namespace document { class DocumentId; } +namespace storage::spi { class Result; } + namespace storage { class ApplyBucketDiffEntryResult; @@ -15,19 +20,22 @@ class MergeBucketInfoSyncer; * for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply. */ class ApplyBucketDiffState { - std::vector<ApplyBucketDiffEntryResult> _async_results; const MergeBucketInfoSyncer& _merge_bucket_info_syncer; spi::Bucket _bucket; + vespalib::string _fail_message; + std::atomic_flag _failed_flag; bool _stale_bucket_info; + std::optional<std::promise<vespalib::string>> _promise; + public: ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket); ~ApplyBucketDiffState(); - void push_back(ApplyBucketDiffEntryResult&& result); - bool empty() const; + void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op); void wait(); void check(); void mark_stale_bucket_info(); void sync_bucket_info(); + std::future<vespalib::string> get_future(); }; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 963fddd9fb5..d2737089bba 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -3,7 +3,6 @@ #include "mergehandler.h" #include "persistenceutil.h" #include "apply_bucket_diff_entry_complete.h" -#include "apply_bucket_diff_entry_result.h" #include "apply_bucket_diff_state.h" #include <vespa/storage/persistence/filestorage/mergestatus.h> #include <vespa/persistence/spi/persistenceprovider.h> @@ -96,6 +95,17 @@ struct DiffEntryTimestampPredicate { } }; + +void check_apply_diff_sync(std::shared_ptr<ApplyBucketDiffState> async_results) { + auto future = async_results->get_future(); + async_results.reset(); + future.wait(); + auto fail_message = future.get(); + if (!fail_message.empty()) { + throw std::runtime_error(fail_message); + } +} + } // anonymous namespace void @@ -483,27 +493,24 @@ MergeHandler::deserializeDiffDocument( return doc; } -ApplyBucketDiffEntryResult -MergeHandler::applyDiffEntry(const spi::Bucket& bucket, +void +MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, + const spi::Bucket& bucket, const api::ApplyBucketDiffCommand::Entry& e, spi::Context& context, const document::DocumentTypeRepo& repo) const { - std::promise<std::pair<std::unique_ptr<spi::Result>, double>> result_promise; - auto future_result = result_promise.get_future(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { // Regular put entry Document::SP doc(deserializeDiffDocument(e, repo)); DocumentId docId = doc->getId(); - auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency); _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete)); - return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "put", _env._metrics.merge_handler_metrics.put_latency); } else { DocumentId docId(e._docName); - auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency); _spi.removeAsync(bucket, timestamp, docId, context, std::move(complete)); - return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "remove", _env._metrics.merge_handler_metrics.remove_latency); } } @@ -516,7 +523,7 @@ MergeHandler::applyDiffLocally( std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, spi::Context& context, - ApplyBucketDiffState& async_results) const + std::shared_ptr<ApplyBucketDiffState> async_results) const { // Sort the data to apply by which file they should be added to LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries", @@ -527,7 +534,7 @@ MergeHandler::applyDiffLocally( uint32_t addedCount = 0; uint32_t notNeededByteCount = 0; - async_results.mark_stale_bucket_info(); + async_results->mark_stale_bucket_info(); std::vector<spi::DocEntry::UP> entries; populateMetaData(bucket, MAX_TIMESTAMP, entries, context); @@ -565,7 +572,7 @@ MergeHandler::applyDiffLocally( ++i; LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - async_results.push_back(applyDiffEntry(bucket, e, context, repo)); + applyDiffEntry(async_results, bucket, e, context, repo); } else { assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp()); // Diffing for existing timestamp; should either both be put @@ -578,7 +585,7 @@ MergeHandler::applyDiffLocally( "timestamp in %s. Diff slot: %s. Existing slot: %s", bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); - async_results.push_back(applyDiffEntry(bucket, e, context, repo)); + applyDiffEntry(async_results, bucket, e, context, repo); } else { // Duplicate put, just ignore it. LOG(debug, "During diff apply, attempting to add slot " @@ -610,7 +617,7 @@ MergeHandler::applyDiffLocally( LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - async_results.push_back(applyDiffEntry(bucket, e, context, repo)); + applyDiffEntry(async_results, bucket, e, context, repo); byteCount += e._headerBlob.size() + e._bodyBlob.size(); } if (byteCount + notNeededByteCount != 0) { @@ -1204,7 +1211,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra tracker->setMetric(_env._metrics.applyBucketDiff); spi::Bucket bucket(cmd.getBucket()); - ApplyBucketDiffState async_results(*this, bucket); + auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket); LOG(debug, "%s", cmd.toString().c_str()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { @@ -1229,8 +1236,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); - async_results.check(); - async_results.sync_bucket_info(); + check_apply_diff_sync(std::move(async_results)); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).", bucket.toString().c_str(), _env._nodeIndex, index); @@ -1289,7 +1295,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag (void) tracker; _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); - ApplyBucketDiffState async_results(*this, bucket); + auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket); std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff()); LOG(debug, "%s", reply.toString().c_str()); @@ -1325,8 +1331,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag framework::MilliSecTimer startTime(_clock); applyDiffLocally(bucket, diff, index, s->context, async_results); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); - async_results.check(); - async_results.sync_bucket_info(); + check_apply_diff_sync(std::move(async_results)); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)", bucket.toString().c_str(), diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 0ff8f3c0ef8..f6e8ddcf306 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -62,7 +62,7 @@ public: std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, spi::Context& context, - ApplyBucketDiffState& async_results) const; + std::shared_ptr<ApplyBucketDiffState> async_results) const; void sync_bucket_info(const spi::Bucket& bucket) const override; MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const; @@ -90,10 +90,11 @@ private: * Invoke either put, remove or unrevertable remove on the SPI * depending on the flags in the diff entry. */ - ApplyBucketDiffEntryResult applyDiffEntry(const spi::Bucket&, - const api::ApplyBucketDiffCommand::Entry&, - spi::Context& context, - const document::DocumentTypeRepo& repo) const; + void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, + const spi::Bucket&, + const api::ApplyBucketDiffCommand::Entry&, + spi::Context& context, + const document::DocumentTypeRepo& repo) const; /** * Fill entries-vector with metadata for bucket up to maxTimestamp, diff --git a/testutil/pom.xml b/testutil/pom.xml index 26bb3056e2f..dc795a66dab 100644 --- a/testutil/pom.xml +++ b/testutil/pom.xml @@ -37,11 +37,6 @@ <scope>compile</scope> </dependency> <dependency> - <groupId>uk.co.datumedge</groupId> - <artifactId>hamcrest-json</artifactId> - <scope>compile</scope> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>compile</scope> diff --git a/testutil/src/main/java/com/yahoo/test/json/JsonTestHelper.java b/testutil/src/main/java/com/yahoo/test/json/JsonTestHelper.java index 08afb2fd95f..05532d2a504 100644 --- a/testutil/src/main/java/com/yahoo/test/json/JsonTestHelper.java +++ b/testutil/src/main/java/com/yahoo/test/json/JsonTestHelper.java @@ -5,11 +5,10 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; -import org.hamcrest.MatcherAssert; import java.io.UncheckedIOException; -import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs; +import static org.junit.Assert.assertEquals; /** * @author Vegard Sjonfjell @@ -29,7 +28,13 @@ public class JsonTestHelper { /** Structurally compare two JSON encoded strings */ public static void assertJsonEquals(String inputJson, String expectedJson) { - MatcherAssert.assertThat(inputJson, sameJSONAs(expectedJson)); + try { + JsonNode expected = mapper.readTree(expectedJson); + JsonNode actual = mapper.readTree(inputJson); + assertEquals(expected, actual); + } catch (JsonProcessingException e) { + throw new RuntimeException("Exception when comparing json strings." , e); + } } /** Structurally compare a {@link JsonNode} and a JSON string. */ |