aboutsummaryrefslogtreecommitdiffstats
path: root/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
blob: 0702d01b9d0a3f8a93e05424c8032c48652790b1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.zookeeper;

import com.yahoo.cloud.config.ZookeeperServerConfig;
import com.yahoo.component.AbstractComponent;
import com.yahoo.component.annotation.Inject;
import com.yahoo.protect.Process;
import com.yahoo.yolean.Exceptions;

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

import static java.util.stream.Collectors.joining;

/**
 * Starts zookeeper server and supports reconfiguring zookeeper cluster. Keep this as a component
 * without injected config, to make sure that it is not recreated when config changes.
 *
 * <p>The first call to {@link #startOrReconfigure} obtains the quorum peer and creates the
 * {@link ZooKeeperRunner}; every call (including the first) additionally triggers a reconfiguration
 * when the given config has dynamic reconfiguration enabled.</p>
 *
 * @author hmusum
 */
public class Reconfigurer extends AbstractComponent {

    private static final Logger log = Logger.getLogger(Reconfigurer.class.getName());

    /** Default time to keep retrying a failing reconfiguration; used by the injected constructor. */
    static final Duration TIMEOUT = Duration.ofMinutes(15);

    // NOTE(review): presumably (initial delay, maximum delay) — confirm against ExponentialBackoff.
    private final ExponentialBackoff backoff = new ExponentialBackoff(Duration.ofMillis(50), Duration.ofSeconds(10));
    /** How long {@link #reconfigure} keeps retrying before giving up. */
    private final Duration timeout;
    /** When true, a reconfiguration that times out ends with {@code Process.logAndDie}; otherwise the exception is rethrown. */
    private final boolean haltOnFailure;
    private final VespaZooKeeperAdmin vespaZooKeeperAdmin;
    private final Sleeper sleeper;

    /** The peer obtained from the supplier on the first call to {@link #startOrReconfigure}; null until then. */
    private QuorumPeer peer;
    /** Null until the first call to {@link #startOrReconfigure}; never reset afterwards, not even by {@link #shutdown()}. */
    private ZooKeeperRunner zooKeeperRunner;
    /** The config the runner was created with, or the config of the last successful reconfiguration. */
    private ZookeeperServerConfig activeConfig;

    /**
     * Creates a reconfigurer with a default {@link Sleeper}, halting on failure, and the default {@link #TIMEOUT}.
     *
     * @param vespaZooKeeperAdmin admin used to perform reconfiguration; must not be null
     */
    @Inject
    public Reconfigurer(VespaZooKeeperAdmin vespaZooKeeperAdmin) {
        this(vespaZooKeeperAdmin, new Sleeper(), true, TIMEOUT);
    }

    /**
     * Creates a reconfigurer with explicit collaborators.
     *
     * @param vespaZooKeeperAdmin admin used to perform reconfiguration; must not be null
     * @param sleeper             used to wait between failed reconfiguration attempts; must not be null
     * @param haltOnFailure       whether to call {@code Process.logAndDie} (true) or rethrow (false)
     *                            when reconfiguration does not succeed within {@code timeout}
     * @param timeout             how long to keep retrying a failing reconfiguration; must not be null
     * @throws NullPointerException if any of the reference arguments is null
     */
    public Reconfigurer(VespaZooKeeperAdmin vespaZooKeeperAdmin, Sleeper sleeper, boolean haltOnFailure, Duration timeout) {
        this.vespaZooKeeperAdmin = Objects.requireNonNull(vespaZooKeeperAdmin, "vespaZooKeeperAdmin cannot be null");
        this.sleeper = Objects.requireNonNull(sleeper, "sleeper cannot be null");
        this.haltOnFailure = haltOnFailure;
        // Fail fast here rather than with a bare NPE inside reconfigure(), after the runner has been created.
        this.timeout = Objects.requireNonNull(timeout, "timeout cannot be null");
    }

    /** Shuts down the runner, if one has been created. */
    @Override
    public void deconstruct() {
        shutdown();
    }

    /**
     * Creates the runner on the first invocation, and reconfigures the cluster when the new config
     * has dynamic reconfiguration enabled.
     *
     * @param newConfig         the config to start with (first call) and/or reconfigure to
     * @param server            the server handed to the {@link ZooKeeperRunner} on the first call; unused on later calls
     * @param quorumPeerCreator queried once, on the first call, for the peer to share
     * @return the peer obtained on the first call
     */
    QuorumPeer startOrReconfigure(ZookeeperServerConfig newConfig,
                                  VespaZooKeeperServer server,
                                  Supplier<QuorumPeer> quorumPeerCreator) {
        if (zooKeeperRunner == null) {
            peer = quorumPeerCreator.get(); // Obtain the peer from the server. This will be shared with later servers.
            zooKeeperRunner = startServer(newConfig, server);
        }

        // Note: this also runs right after the initial start above, with the config just made active.
        if (newConfig.dynamicReconfiguration()) {
            reconfigure(newConfig);
        }
        return peer;
    }

    /** Returns the currently active config, or null if {@link #startOrReconfigure} has not been called yet. */
    ZookeeperServerConfig activeConfig() {
        return activeConfig;
    }

    /** Shuts down the runner if one has been created; does nothing otherwise. */
    void shutdown() {
        if (zooKeeperRunner != null) {
            zooKeeperRunner.shutdown();
        }
    }

    /**
     * Creates a {@link ZooKeeperRunner} for the given config and server, and records the config as active.
     * NOTE(review): presumably the runner constructor is what actually starts the server — confirm in ZooKeeperRunner.
     */
    private ZooKeeperRunner startServer(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) {
        ZooKeeperRunner runner = new ZooKeeperRunner(zookeeperServerConfig, server);
        activeConfig = zookeeperServerConfig;
        return runner;
    }

    // TODO jonmv: read dynamic file, discard if old quorum impossible (config file + .dynamic.<id>)
    // TODO jonmv: if dynamic file, all unlisted servers are observers; otherwise joiners are observers
    // TODO jonmv: wrap Curator in Provider, for Curator shutdown
    /**
     * Reconfigures the cluster to the non-retired servers of {@code newConfig}, retrying with backoff
     * on {@link ReconfigException} until it succeeds or {@link #timeout} has passed.
     *
     * <p>On success the new config becomes the active config. On timeout the runner is shut down, and
     * then either {@code Process.logAndDie} is called (when {@link #haltOnFailure} is set) or the last
     * {@link ReconfigException} is rethrown.</p>
     */
    private void reconfigure(ZookeeperServerConfig newConfig) {
        Instant reconfigTriggered = Instant.now();
        String newServers = servers(newConfig);
        log.log(Level.INFO, "Will reconfigure ZooKeeper cluster." +
                            "\nServers in active config:" + servers(activeConfig) +
                            "\nServers in new config:" + newServers);
        String connectionSpec = vespaZooKeeperAdmin.localConnectionSpec(activeConfig);
        // For reconfig to succeed, the current and resulting ensembles must have a majority. When an ensemble grows and
        // the joining servers outnumber the existing ones, we have to wait for enough of them to start to have a majority.
        Instant end = Instant.now().plus(timeout);
        // Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed
        for (int attempt = 1; ; attempt++) {
            try {
                Instant reconfigStarted = Instant.now();
                vespaZooKeeperAdmin.reconfigure(connectionSpec, newServers);
                Instant reconfigEnded = Instant.now();
                log.log(Level.INFO, "Reconfiguration completed in " +
                                    Duration.between(reconfigTriggered, reconfigEnded) +
                                    ", after " + attempt + " attempt(s). ZooKeeper reconfig call took " +
                                    Duration.between(reconfigStarted, reconfigEnded));
                activeConfig = newConfig;
                return;
            } catch (ReconfigException e) {
                Duration delay = backoff.delay(attempt);
                Instant now = Instant.now();
                if (now.isBefore(end)) {
                    // Still within the deadline: wait and let the loop try again.
                    log.log(Level.INFO, "Reconfiguration attempt " + attempt + " failed. Retrying in " + delay +
                                           ", time left " + Duration.between(now, end) + ": " + Exceptions.toMessageString(e));
                    sleeper.sleep(delay);
                }
                else {
                    // Deadline passed: stop the runner, then halt the process or propagate the failure.
                    log.log(Level.SEVERE, "Reconfiguration attempt " + attempt + " failed, and was failing for " +
                                          timeout + "; giving up now: " + Exceptions.toMessageString(e));
                    shutdown();
                    if (haltOnFailure)
                        Process.logAndDie("Reconfiguration did not complete within timeout " + timeout + ". Forcing container shutdown.");
                    else
                        throw e;
                }
            }
        }
    }

    /**
     * Returns a comma-separated list of {@code key=value} entries, built from what
     * {@code Configurator.getServerConfig} returns for the non-retired servers in the given config.
     * NOTE(review): the meaning of the {@code -1} argument is not visible here — confirm in Configurator.
     */
    private static String servers(ZookeeperServerConfig config) {
        return Configurator.getServerConfig(config.server().stream().filter(server -> ! server.retired()).toList(), -1)
                           .entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue()).collect(joining(","));
    }

}