aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/main
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@oath.com> 2018-04-20 17:04:34 +0200
committer Tor Brede Vekterli <vekterli@oath.com> 2018-04-24 14:12:54 +0200
commit 2e736fdd071fc3ad72912afe96c767c7902870d1 (patch)
tree 452c8e9d9f63c8007d77e536123791ff35e5c4f3 /clustercontroller-core/src/main
parent 6a7351dc77ad191540b61555360d20caea94c156 (diff)
ZooKeeper-persist and load published cluster state bundles
Store synchronously upon each new versioned state, load whenever the controller is elected master. Effectively carries over visible node states from one controller's lifetime to the next. This removes the edge case where default bucket space content nodes are marked as in Maintenance until their global merge status is known. To avoid the controller tripping over its own feet, state bundles are now _not_ versioned at all until the initial send time period has passed. This prevents overwriting the state persisted from a previous controller with a transient state where all nodes are down due to not having Slobrok contact yet. A new cluster state recompute+send edge has been added when the master passes its initial state send time period.
Diffstat (limited to 'clustercontroller-core/src/main')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java95
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java18
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java18
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java6
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java34
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java52
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java93
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java14
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java5
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java19
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java31
12 files changed, 316 insertions, 73 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
index 9291c8277da..76177e8f1c1 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundle.java
@@ -78,6 +78,10 @@ public class ClusterStateBundle {
return new ClusterStateBundle(baselineState, Collections.emptyMap());
}
+ public static ClusterStateBundle empty() {
+ return ofBaselineOnly(AnnotatedClusterState.emptyState());
+ }
+
public AnnotatedClusterState getBaselineAnnotatedState() {
return baselineState;
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 54c6c11f81e..e781bf3b145 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -10,6 +10,7 @@ import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.database.ZooKeeperDatabaseFactory;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
import com.yahoo.vespa.clustercontroller.core.listeners.*;
import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
@@ -25,7 +26,18 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import java.io.FileNotFoundException;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -54,7 +66,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private AtomicBoolean running = new AtomicBoolean(true);
private FleetControllerOptions options;
private FleetControllerOptions nextOptions;
- private final List<SystemStateListener> systemStateListeners = new LinkedList<>();
+ private final List<SystemStateListener> systemStateListeners = new CopyOnWriteArrayList<>();
private boolean processingCycle = false;
private boolean wantedStateChanged = false;
private long cycleCount = 0;
@@ -185,7 +197,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
options.nodeStateRequestTimeoutEarliestPercentage,
options.nodeStateRequestTimeoutLatestPercentage,
options.nodeStateRequestRoundTripTimeMaxSeconds);
- DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
+ DatabaseHandler database = new DatabaseHandler(new ZooKeeperDatabaseFactory(), timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
NodeLookup lookUp = new SlobrokClient(timer);
StateChangeHandler stateGenerator = new StateChangeHandler(timer, log, metricUpdater);
SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer);
@@ -219,6 +231,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
}
+ public ClusterStateBundle getClusterStateBundle() {
+ synchronized (monitor) {
+ return systemStateBroadcaster.getClusterStateBundle();
+ }
+ }
+
public void schedule(RemoteClusterControllerTask task) {
synchronized (monitor) {
log.fine("Scheduled remote task " + task.getClass().getName() + " for execution");
@@ -228,13 +246,13 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
/** Used for unit testing. */
public void addSystemStateListener(SystemStateListener listener) {
- synchronized (systemStateListeners) {
- systemStateListeners.add(listener);
- // Always give cluster state listeners the current state, in case acceptable state has come before listener is registered.
- com.yahoo.vdslib.state.ClusterState state = getSystemState();
- if (state == null) throw new NullPointerException("Cluster state should never be null at this point");
- listener.handleNewSystemState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state)));
+ systemStateListeners.add(listener);
+ // Always give cluster state listeners the current state, in case acceptable state has come before listener is registered.
+ com.yahoo.vdslib.state.ClusterState state = getSystemState();
+ if (state == null) {
+ throw new NullPointerException("Cluster state should never be null at this point");
}
+ listener.handleNewPublishedState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state)));
}
public FleetControllerOptions getOptions() {
@@ -351,7 +369,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
@Override
- public void handleNewSystemState(ClusterStateBundle stateBundle) {
+ public void handleNewPublishedState(ClusterStateBundle stateBundle) {
verifyInControllerThread();
ClusterState baselineState = stateBundle.getBaselineClusterState();
newStates.add(stateBundle);
@@ -361,13 +379,14 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// nodes so that a cluster controller crash after publishing but before a successful
// ZK store will not risk reusing the same version number.
if (masterElectionHandler.isMaster()) {
- storeClusterStateVersionToZooKeeper(baselineState);
+ storeClusterStateMetaDataToZooKeeper(stateBundle);
}
}
- private void storeClusterStateVersionToZooKeeper(ClusterState state) {
+ private void storeClusterStateMetaDataToZooKeeper(ClusterStateBundle stateBundle) {
try {
- database.saveLatestSystemStateVersion(databaseContext, state.getVersion());
+ database.saveLatestSystemStateVersion(databaseContext, stateBundle.getVersion());
+ database.saveLatestClusterStateBundle(databaseContext, stateBundle);
} catch (InterruptedException e) {
// Rethrow as RuntimeException to propagate exception up to main thread method.
// Don't want to hide failures to write cluster state version.
@@ -654,6 +673,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
sentAny = systemStateBroadcaster.broadcastNewState(databaseContext, communicator);
if (sentAny) {
+ // FIXME won't this inhibit resending to unresponsive nodes?
nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates;
}
}
@@ -665,7 +685,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
synchronized (systemStateListeners) {
for (ClusterStateBundle stateBundle : newStates) {
for(SystemStateListener listener : systemStateListeners) {
- listener.handleNewSystemState(stateBundle);
+ listener.handleNewPublishedState(stateBundle);
}
}
newStates.clear();
@@ -782,12 +802,21 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became node state gatherer as we are fleetcontroller master candidate.", timer.getCurrentTimeInMillis()));
// Update versions to use so what is shown is closer to what is reality on the master
stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion());
+ stateChangeHandler.setStateChangedFlag();
}
}
isStateGatherer = true;
return didWork;
}
+ private void invokeCandidateStateListeners(ClusterStateBundle candidateBundle) {
+ systemStateListeners.forEach(listener -> listener.handleNewCandidateState(candidateBundle));
+ }
+
+ private boolean hasPassedFirstStateBroadcastTimePoint(long timeNowMs) {
+ return timeNowMs >= firstAllowedStateBroadcast || cluster.allStatesReported();
+ }
+
private boolean recomputeClusterStateIfRequired() {
boolean stateWasChanged = false;
if (mustRecomputeCandidateClusterState()) {
@@ -800,16 +829,18 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
.stateDeriver(createBucketSpaceStateDeriver())
.deriveAndBuild();
stateVersionTracker.updateLatestCandidateStateBundle(candidateBundle);
+ invokeCandidateStateListeners(candidateBundle);
- if (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()
- || stateVersionTracker.hasReceivedNewVersionFromZooKeeper())
+ final long timeNowMs = timer.getCurrentTimeInMillis();
+ if (hasPassedFirstStateBroadcastTimePoint(timeNowMs)
+ && (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()
+ || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()))
{
- final long timeNowMs = timer.getCurrentTimeInMillis();
final ClusterStateBundle before = stateVersionTracker.getVersionedClusterStateBundle();
stateVersionTracker.promoteCandidateToVersionedState(timeNowMs);
emitEventsForAlteredStateEdges(before, stateVersionTracker.getVersionedClusterStateBundle(), timeNowMs);
- handleNewSystemState(stateVersionTracker.getVersionedClusterStateBundle());
+ handleNewPublishedState(stateVersionTracker.getVersionedClusterStateBundle());
stateWasChanged = true;
}
}
@@ -893,10 +924,19 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
}
+ private boolean atFirstClusterStateSendTimeEdge() {
+ // We only care about triggering a state recomputation for the master, which is the only
+ // one allowed to actually broadcast any states.
+ if (!isMaster || systemStateBroadcaster.hasBroadcastedClusterStateBundle()) {
+ return false;
+ }
+ return hasPassedFirstStateBroadcastTimePoint(timer.getCurrentTimeInMillis());
+ }
+
private boolean mustRecomputeCandidateClusterState() {
return stateChangeHandler.stateMayHaveChanged()
- || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()
- || stateVersionTracker.bucketSpaceMergeCompletionStateHasChanged();
+ || stateVersionTracker.bucketSpaceMergeCompletionStateHasChanged()
+ || atFirstClusterStateSendTimeEdge();
}
private boolean handleLeadershipEdgeTransitions() throws InterruptedException {
@@ -904,16 +944,25 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if (masterElectionHandler.isMaster()) {
if ( ! isMaster) {
metricUpdater.becameMaster();
- // If we just became master, restore wanted states from database
+ // If we just became master, restore state from ZooKeeper
+ stateChangeHandler.setStateChangedFlag();
+ systemStateBroadcaster.resetBroadcastedClusterStateBundle();
+
stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion());
- didWork = database.loadStartTimestamps(cluster);
- didWork |= database.loadWantedStates(databaseContext);
+ ClusterStateBundle previousBundle = database.getLatestClusterStateBundle();
+ database.loadStartTimestamps(cluster);
+ database.loadWantedStates(databaseContext);
+
+ log.info(() -> String.format("Loaded previous cluster state bundle from ZooKeeper: %s", previousBundle));
+ stateVersionTracker.setClusterStateBundleRetrievedFromZooKeeper(previousBundle);
+
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became fleetcontroller master. Bumped version to "
+ stateVersionTracker.getCurrentVersion() + " to be in line.", timer.getCurrentTimeInMillis()));
long currentTime = timer.getCurrentTimeInMillis();
firstAllowedStateBroadcast = currentTime + options.minTimeBeforeFirstSystemStateBroadcast;
log.log(LogLevel.DEBUG, "At time " + currentTime + " we set first system state broadcast time to be "
+ options.minTimeBeforeFirstSystemStateBroadcast + " ms after at time " + firstAllowedStateBroadcast + ".");
+ didWork = true;
}
isMaster = true;
if (wantedStateChanged) {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
index 177efe31766..e2f98cf5492 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
@@ -50,6 +50,21 @@ public class StateVersionTracker {
this.lastZooKeeperVersion = this.currentVersion;
}
+ void setClusterStateBundleRetrievedFromZooKeeper(ClusterStateBundle bundle) {
+ // There is an edge where the bundle version will mismatch with the version set
+ // via setVersionRetrievedFromZooKeeper() if the controller (or ZK) crashes before
+ // it can write both sequentially. But since we use the ZK-written version explicitly
+ // when choosing a new version for our own published states, it should not matter in
+ // practice. Worst case is that the current state reflects the same version that a
+ // previous controller had, but we will never publish this state ourselves; publishing
+ // only happens after we've generated our own, new candidate state and overwritten
+ // the empty states set below. Publishing also, as mentioned, sets a version based on
+ // the ZK version, not the version stored in the bundle itself.
+ currentClusterState = bundle;
+ currentUnversionedState = ClusterStateBundle.empty();
+ latestCandidateState = ClusterStateBundle.empty();
+ }
+
/**
* Sets limit on how many cluster states can be kept in the in-memory queue. Once
* the list exceeds this limit, the oldest state is repeatedly removed until the limit
@@ -140,8 +155,7 @@ public class StateVersionTracker {
}
private void recordCurrentStateInHistoryAtTime(final long currentTimeMs) {
- clusterStateHistory.addFirst(new ClusterStateHistoryEntry(
- currentClusterState, currentTimeMs));
+ clusterStateHistory.addFirst(new ClusterStateHistoryEntry(currentClusterState, currentTimeMs));
while (clusterStateHistory.size() > maxHistoryEntryCount) {
clusterStateHistory.removeLast();
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index b0869b560f6..629800fb13c 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -40,6 +40,18 @@ public class SystemStateBroadcaster {
return clusterStateBundle.getBaselineClusterState();
}
+ public boolean hasBroadcastedClusterStateBundle() {
+ return clusterStateBundle != null;
+ }
+
+ public void resetBroadcastedClusterStateBundle() {
+ clusterStateBundle = null;
+ }
+
+ public ClusterStateBundle getClusterStateBundle() {
+ return clusterStateBundle;
+ }
+
private void reportNodeError(boolean nodeOk, NodeInfo info, String message) {
long time = timer.getCurrentTimeInMillis();
Long lastReported = lastErrorReported.get(info.getNode());
@@ -113,10 +125,10 @@ public class SystemStateBroadcaster {
void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database,
DatabaseHandler.Context dbContext,
FleetController fleetController) throws InterruptedException {
- final int currentStateVersion = clusterStateBundle.getVersion();
- if ((clusterStateBundle == null) || (lastClusterStateInSync == currentStateVersion)) {
+ if ((clusterStateBundle == null) || (lastClusterStateInSync == clusterStateBundle.getVersion())) {
return; // Nothing to do for the current state
}
+ final int currentStateVersion = clusterStateBundle.getVersion();
boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream()
.filter(NodeInfo::isDistributor)
.anyMatch(this::nodeNeedsClusterState);
@@ -137,7 +149,7 @@ public class SystemStateBroadcaster {
if (!baselineState.isOfficial()) {
log.log(LogLevel.INFO, String.format("Publishing cluster state version %d", baselineState.getVersion()));
- baselineState.setOfficial(true);
+ baselineState.setOfficial(true); // FIXME this violates state bundle immutability
}
List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext);
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
index ac9f1473b57..c98e362c2d2 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/Database.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.clustercontroller.core.database;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import java.util.Map;
@@ -82,4 +83,9 @@ public abstract class Database {
* Fetch the start times of distributor and service layer nodes.
*/
public abstract Map<Node, Long> retrieveStartTimestamps() throws InterruptedException;
+
+ public abstract boolean storeLastPublishedStateBundle(ClusterStateBundle stateBundle) throws InterruptedException;
+
+ public abstract ClusterStateBundle retrieveLastPublishedStateBundle() throws InterruptedException;
+
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java
new file mode 100644
index 00000000000..3e7b98fe3fb
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseFactory.java
@@ -0,0 +1,34 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.database;
+
+import com.yahoo.vespa.clustercontroller.core.ContentCluster;
+
+/**
+ * Database factory to enable test mocking of DB features. In practice, this
+ * will always be {@link ZooKeeperDatabase} due to rather heavy ZK feature
+ * dependencies and leaky abstractions built on top of them.
+ */
+public interface DatabaseFactory {
+
+ class Params {
+ ContentCluster cluster;
+ int nodeIndex;
+ String dbAddress;
+ int dbSessionTimeout;
+ Database.DatabaseListener listener;
+
+ Params cluster(ContentCluster c) { this.cluster = c; return this; }
+ ContentCluster cluster() { return cluster; }
+ Params nodeIndex(int i) { this.nodeIndex = i; return this; }
+ int nodeIndex() { return nodeIndex; }
+ Params databaseAddress(String address) { this.dbAddress = address; return this; }
+ String databaseAddress() { return dbAddress; }
+ Params databaseSessionTimeout(int timeout) { this.dbSessionTimeout = timeout; return this; }
+ int databaseSessionTimeout() { return dbSessionTimeout; }
+ Params databaseListener(Database.DatabaseListener listener) { this.listener = listener; return this; }
+ Database.DatabaseListener databaseListener() { return listener; }
+ }
+
+ Database create(Params params) throws Exception;
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
index c89bb6a136a..fcf5b58e62d 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
@@ -5,6 +5,8 @@ import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.FleetController;
import com.yahoo.vespa.clustercontroller.core.NodeInfo;
import com.yahoo.vespa.clustercontroller.core.Timer;
@@ -39,12 +41,14 @@ public class DatabaseHandler {
Integer lastSystemStateVersion;
Map<Node, NodeState> wantedStates;
Map<Node, Long> startTimestamps;
+ ClusterStateBundle clusterStateBundle;
void clear() {
masterVote = null;
lastSystemStateVersion = null;
wantedStates = null;
startTimestamps = null;
+ clusterStateBundle = null;
}
}
private class DatabaseListener implements Database.DatabaseListener {
@@ -68,15 +72,13 @@ public class DatabaseHandler {
}
}
+ private final DatabaseFactory databaseFactory;
private final Timer timer;
private final int nodeIndex;
private final Object monitor;
private String zooKeeperAddress;
private int zooKeeperSessionTimeout = 5000;
private final Object databaseMonitor = new Object();
-
- /** This is always ZooKeeperDatabase */
- // TODO: Get rid of the interface as it is both unnecessary and gives a false impression of independence
private Database database;
private DatabaseListener dbListener = new DatabaseListener();
@@ -87,8 +89,9 @@ public class DatabaseHandler {
private boolean lostZooKeeperConnectionEvent = false;
private Map<Integer, Integer> masterDataEvent = null;
- public DatabaseHandler(Timer timer, String zooKeeperAddress, int ourIndex, Object monitor) throws InterruptedException
+ public DatabaseHandler(DatabaseFactory databaseFactory, Timer timer, String zooKeeperAddress, int ourIndex, Object monitor) throws InterruptedException
{
+ this.databaseFactory = databaseFactory;
this.timer = timer;
this.nodeIndex = ourIndex;
pendingStore.masterVote = ourIndex; // To begin with we'll vote for ourselves.
@@ -166,8 +169,13 @@ public class DatabaseHandler {
clearSessionMetaData();
log.log(LogLevel.INFO,
"Fleetcontroller " + nodeIndex + ": Setting up new ZooKeeper session at " + zooKeeperAddress);
- database = new ZooKeeperDatabase(cluster,
- nodeIndex, zooKeeperAddress, zooKeeperSessionTimeout, dbListener);
+ DatabaseFactory.Params params = new DatabaseFactory.Params()
+ .cluster(cluster)
+ .nodeIndex(nodeIndex)
+ .databaseAddress(zooKeeperAddress)
+ .databaseSessionTimeout(zooKeeperSessionTimeout)
+ .databaseListener(dbListener);
+ database = databaseFactory.create(params);
}
} catch (KeeperException.NodeExistsException e) {
log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Cannot create ephemeral fleetcontroller node. ZooKeeper server "
@@ -279,6 +287,15 @@ public class DatabaseHandler {
return didWork;
}
}
+ if (pendingStore.clusterStateBundle != null) {
+ didWork = true;
+ if (database.storeLastPublishedStateBundle(pendingStore.clusterStateBundle)) {
+ currentlyStored.clusterStateBundle = pendingStore.clusterStateBundle;
+ pendingStore.clusterStateBundle = null;
+ } else {
+ return true;
+ }
+ }
}
return didWork;
}
@@ -325,11 +342,28 @@ public class DatabaseHandler {
if (usingZooKeeper()) {
log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve latest system state version from ZooKeeper. Returning version 0.");
}
- return 0;
+ return 0; // FIXME "fail-oblivious" is not a good error handling mode for such a critical component!
}
return version;
}
+ public void saveLatestClusterStateBundle(Context context, ClusterStateBundle clusterStateBundle) throws InterruptedException {
+ log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Scheduling bundle %s to be saved to ZooKeeper", nodeIndex, clusterStateBundle));
+ pendingStore.clusterStateBundle = clusterStateBundle;
+ doNextZooKeeperTask(context);
+ }
+
+ public ClusterStateBundle getLatestClusterStateBundle() throws InterruptedException {
+ log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Retrieving latest cluster state bundle from ZooKeeper", nodeIndex));
+ synchronized (databaseMonitor) {
+ if (database != null && !database.isClosed()) {
+ return database.retrieveLastPublishedStateBundle();
+ } else {
+ return ClusterStateBundle.empty();
+ }
+ }
+ }
+
public void saveWantedStates(Context context) throws InterruptedException {
log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Checking whether wanted states have changed compared to zookeeper version.");
Map<Node, NodeState> wantedStates = new TreeMap<>();
@@ -399,7 +433,9 @@ public class DatabaseHandler {
public boolean loadStartTimestamps(ContentCluster cluster) throws InterruptedException {
log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Retrieving start timestamps");
synchronized (databaseMonitor) {
- if (database == null || database.isClosed()) return false;
+ if (database == null || database.isClosed()) {
+ return false;
+ }
currentlyStored.startTimestamps = database.retrieveStartTimestamps();
}
Map<Node, Long> startTimestamps = currentlyStored.startTimestamps;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
index 5399430fd59..fe1d9a2a5cd 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabase.java
@@ -1,7 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core.database;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.ContentCluster;
+import com.yahoo.vespa.clustercontroller.core.rpc.EnvelopedClusterStateBundleCodec;
+import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.ACL;
@@ -128,6 +132,7 @@ public class ZooKeeperDatabase extends Database {
createNode(zooKeeperRoot, "wantedstates", new byte[0]);
createNode(zooKeeperRoot, "starttimestamps", new byte[0]);
createNode(zooKeeperRoot, "latestversion", new Integer(0).toString().getBytes(utf8));
+ createNode(zooKeeperRoot, "published_state_bundle", new byte[0]); // TODO dedupe string constants
byte val[] = String.valueOf(nodeIndex).getBytes(utf8);
deleteNodeIfExists(getMyIndexPath());
log.log(LogLevel.INFO, "Fleetcontroller " + nodeIndex +
@@ -165,6 +170,15 @@ public class ZooKeeperDatabase extends Database {
return (!sessionOpen || watcher.getState().equals(Watcher.Event.KeeperState.Expired));
}
+ private void maybeLogExceptionWarning(Exception e, String message) {
+ if (sessionOpen && reportErrors) {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ log.log(LogLevel.WARNING, String.format("Fleetcontroller %s: %s. Exception: %s\n%s",
+ nodeIndex, message, e.getMessage(), sw.toString()));
+ }
+ }
+
public boolean storeMasterVote(int wantedMasterIndex) throws InterruptedException {
byte val[] = String.valueOf(wantedMasterIndex).getBytes(utf8);
try{
@@ -174,11 +188,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to create our ephemeral node and store master vote:\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to create our ephemeral node and store master vote");
}
return false;
}
@@ -191,11 +201,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store latest system state version used " + version + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to store latest system state version used " + version);
return false;
}
}
@@ -211,11 +217,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve latest system state version used. Returning null.\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to retrieve latest system state version used. Returning null");
return null;
}
}
@@ -242,11 +244,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store wanted states in zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to store wanted states in ZooKeeper");
return false;
}
}
@@ -276,11 +274,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve wanted states from zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to retrieve wanted states from ZooKeeper");
return null;
}
}
@@ -301,11 +295,7 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to store start timestamps in zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to store start timestamps in ZooKeeper");
return false;
}
}
@@ -336,12 +326,45 @@ public class ZooKeeperDatabase extends Database {
} catch (InterruptedException e) {
throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
} catch (Exception e) {
- if (sessionOpen && reportErrors) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- log.log(LogLevel.WARNING, "Fleetcontroller " + nodeIndex + ": Failed to retrieve start timestamps from zookeeper: " + e.getMessage() + "\n" + sw);
- }
+ maybeLogExceptionWarning(e, "Failed to retrieve start timestamps from ZooKeeper");
return null;
}
}
+
+    /**
+     * Stores the given cluster state bundle, encoded with its envelope, in the
+     * "published_state_bundle" node below the ZooKeeper root of this database.
+     *
+     * Encoding happens before the try block, so an encoding failure propagates to
+     * the caller instead of being reported as a failed write.
+     *
+     * @param stateBundle the bundle to persist
+     * @return true if the write completed, false if it failed with any exception other
+     *         than an interrupt (the failure is passed to maybeLogExceptionWarning)
+     * @throws InterruptedException if the write was interrupted; the original exception
+     *         is attached as the cause
+     */
+    @Override
+    public boolean storeLastPublishedStateBundle(ClusterStateBundle stateBundle) throws InterruptedException {
+        EnvelopedClusterStateBundleCodec envelopedBundleCodec = new SlimeClusterStateBundleCodec();
+        byte[] encodedBundle = envelopedBundleCodec.encodeWithEnvelope(stateBundle);
+        try {
+            log.log(LogLevel.DEBUG, () -> String.format("Fleetcontroller %d: Storing published state bundle %s at '%spublished_state_bundle'",
+                                                        nodeIndex, stateBundle, zooKeeperRoot));
+            // TODO CAS on expected zknode version
+            // Until then: -1 is ZooKeeper's "match any version", i.e. an unconditional overwrite.
+            session.setData(zooKeeperRoot + "published_state_bundle", encodedBundle, -1);
+        } catch (InterruptedException e) {
+            throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
+        } catch (Exception e) {
+            // Message names the bundle; it previously claimed a start timestamp failure.
+            maybeLogExceptionWarning(e, "Failed to store last published cluster state bundle in ZooKeeper");
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Reads the "published_state_bundle" node below the ZooKeeper root and decodes it.
+     *
+     * @return the decoded bundle, or a baseline-only bundle wrapping an empty annotated
+     *         cluster state when the node holds no data or when reading/decoding fails
+     *         with anything other than an interrupt
+     * @throws InterruptedException if the read was interrupted; the original exception
+     *         is attached as the cause
+     */
+    @Override
+    public ClusterStateBundle retrieveLastPublishedStateBundle() throws InterruptedException {
+        try {
+            byte[] stored = session.getData(zooKeeperRoot + "published_state_bundle", false, new Stat());
+            boolean hasPayload = (stored != null) && (stored.length > 0);
+            if (hasPayload) {
+                return new SlimeClusterStateBundleCodec().decodeWithEnvelope(stored);
+            }
+        } catch (InterruptedException e) {
+            throw (InterruptedException) new InterruptedException("Interrupted").initCause(e);
+        } catch (Exception e) {
+            maybeLogExceptionWarning(e, "Failed to retrieve last published cluster state bundle from " +
+                                        "ZooKeeper, will use an empty state as baseline");
+        }
+        // Nothing usable was stored (or it could not be read): fall back to an empty baseline.
+        return ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.emptyState());
+    }
+
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java
new file mode 100644
index 00000000000..64dfcccebc9
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/ZooKeeperDatabaseFactory.java
@@ -0,0 +1,14 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.database;
+
+import com.yahoo.vespa.clustercontroller.core.ContentCluster;
+
+/**
+ * {@link DatabaseFactory} implementation that produces {@link ZooKeeperDatabase} instances.
+ */
+public class ZooKeeperDatabaseFactory implements DatabaseFactory {
+
+    /**
+     * Builds a new ZooKeeperDatabase from the cluster, node index, database address,
+     * session timeout and listener carried by {@code params}.
+     */
+    @Override
+    public Database create(Params params) throws Exception {
+        return new ZooKeeperDatabase(
+                params.cluster,
+                params.nodeIndex,
+                params.dbAddress,
+                params.dbSessionTimeout,
+                params.listener);
+    }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
index 40fe91f0c1e..764bb3a0d92 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
@@ -5,6 +5,9 @@ import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+// Callback interface for receivers of cluster state bundles.
public interface SystemStateListener {
- void handleNewSystemState(ClusterStateBundle states);
+ // TODO consider rename to bundle
+ // NOTE(review): presumably invoked once a bundle has actually been published; confirm against callers.
+ void handleNewPublishedState(ClusterStateBundle states);
+
+ // Optional callback: the default implementation does nothing, so implementers may ignore it.
+ // NOTE(review): presumably invoked for bundles that are computed but not (yet) published; confirm.
+ default void handleNewCandidateState(ClusterStateBundle states) {}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java
new file mode 100644
index 00000000000..ab98952cddb
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EnvelopedClusterStateBundleCodec.java
@@ -0,0 +1,19 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+/**
+ * Cluster state bundle codec which opaquely encodes/decodes both the bundle
+ * as well as the metadata required to correctly perform compression/decompression.
+ *
+ * Useful for embedding an opaque bundle blob somewhere without needing to care about
+ * any of the associated metadata.
+ */
+public interface EnvelopedClusterStateBundleCodec {
+
+ /**
+ * Encodes the bundle, together with the metadata needed to decode it again,
+ * into a single opaque byte array.
+ */
+ byte[] encodeWithEnvelope(ClusterStateBundle stateBundle);
+
+ /**
+ * Decodes a byte array into a cluster state bundle.
+ * NOTE(review): presumably expects input produced by encodeWithEnvelope; confirm.
+ */
+ ClusterStateBundle decodeWithEnvelope(byte[] encodedClusterStateBundle);
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
index f6034cb34ff..1c391f9aacf 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
@@ -18,8 +18,11 @@ import java.util.Map;
*
* LZ4 compression is transparently applied during encoding and decompression is
* subsequently applied during decoding.
+ *
+ * Implements optional Slime-based enveloping via the *WithEnvelope methods, which removes
+ * the need for the caller to explicitly track compression metadata.
*/
-public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec {
+public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec, EnvelopedClusterStateBundleCodec {
private static final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 0.90, 1024);
@@ -55,4 +58,30 @@ public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec {
return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(baseline), derivedStates);
}
+
+    /**
+     * Encodes the bundle and wraps the result in a binary Slime object holding the
+     * fields "compression-type", "uncompressed-size" and "data".
+     *
+     * Implementation note: the Slime enveloping could technically live in its own,
+     * bundle codec independent class, but realistically there won't be any other
+     * implementations. It can be trivially factored out if required.
+     */
+    @Override
+    public byte[] encodeWithEnvelope(ClusterStateBundle stateBundle) {
+        EncodedClusterStateBundle encoded = encode(stateBundle);
+
+        Slime envelope = new Slime();
+        Cursor fields = envelope.setObject();
+        // Field order is kept as-is; it may influence the serialized byte layout.
+        fields.setLong("compression-type", encoded.getCompression().type().getCode());
+        fields.setLong("uncompressed-size", encoded.getCompression().uncompressedSize());
+        fields.setData("data", encoded.getCompression().data());
+
+        return BinaryFormat.encode(envelope);
+    }
+
+    /**
+     * Unwraps a binary Slime envelope by reading its "compression-type",
+     * "uncompressed-size" and "data" fields, then decodes the contained bundle.
+     *
+     * No validation of the envelope is performed: ZK writes are expected to be atomic,
+     * so partially written or corrupted blobs are not considered here.
+     */
+    @Override
+    public ClusterStateBundle decodeWithEnvelope(byte[] encodedClusterStateBundle) {
+        Inspector envelope = BinaryFormat.decode(encodedClusterStateBundle).get();
+
+        long typeCode = envelope.field("compression-type").asLong();
+        CompressionType type = CompressionType.valueOf((byte) typeCode);
+        int rawSize = (int) envelope.field("uncompressed-size").asLong();
+        byte[] payload = envelope.field("data").asData();
+
+        Compressor.Compression compression = new Compressor.Compression(type, rawSize, payload);
+        return decode(EncodedClusterStateBundle.fromCompressionBuffer(compression));
+    }
}