summaryrefslogtreecommitdiffstats
path: root/zookeeper-server/zookeeper-server-3.5.6/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
blob: 6a87d8547c127ffd03c14e5c768473b5ccc1c340 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.zookeeper;

import com.google.inject.Inject;
import com.yahoo.cloud.config.ZookeeperServerConfig;
import com.yahoo.component.AbstractComponent;
import com.yahoo.net.HostName;
import com.yahoo.yolean.Exceptions;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * Starts zookeeper server and supports reconfiguring zookeeper cluster. Created as a component
 * without any config injected, to make sure that it is not recreated when config changes.
 *
 * The first call to {@link #startOrReconfigure(ZookeeperServerConfig)} creates the server runner and
 * records the given config as active. Later calls with a different config trigger a reconfiguration
 * through {@link ZkAdmin}, provided the new config has dynamic reconfiguration enabled.
 *
 * No synchronization is done in this class. NOTE(review): confirm that callers never invoke
 * startOrReconfigure concurrently.
 *
 * @author hmusum
 */
public class Reconfigurer extends AbstractComponent {

    private static final Logger log = Logger.getLogger(Reconfigurer.class.getName());

    // How long to wait before triggering reconfig. This is multiplied by the node ID
    private static final Duration reconfigInterval = Duration.ofSeconds(5);

    // Total timeout for a reconfiguration
    private static final Duration reconfigTimeout = Duration.ofSeconds(30);

    // How long to wait between each retry
    private static final Duration retryWait = Duration.ofSeconds(1);

    // Null until the first call to startOrReconfigure
    private ZooKeeperRunner zooKeeperRunner;

    // The config the server was started with, or the config of the last successful reconfiguration
    private ZookeeperServerConfig activeConfig;

    protected final ZkAdmin zkAdmin;

    @Inject
    public Reconfigurer(ZkAdmin zkAdmin) {
        this.zkAdmin = zkAdmin;
        log.log(Level.FINE, "Created ZooKeeperReconfigurer");
    }

    /**
     * Starts the server if it is not already started, otherwise reconfigures the cluster if the
     * given config differs from the active one. Waiting is done with {@link Thread#sleep(long)}.
     *
     * @param newConfig the config to start with or reconfigure to
     */
    void startOrReconfigure(ZookeeperServerConfig newConfig) {
        startOrReconfigure(newConfig, Reconfigurer::defaultSleeper);
    }

    /**
     * Same as {@link #startOrReconfigure(ZookeeperServerConfig)}, but with a pluggable way of waiting.
     *
     * @param newConfig the config to start with or reconfigure to
     * @param sleeper   called with the duration to wait before reconfiguring and between retries
     */
    void startOrReconfigure(ZookeeperServerConfig newConfig, Consumer<Duration> sleeper) {
        if (zooKeeperRunner == null)
            zooKeeperRunner = startServer(newConfig);

        // Always false right after startServer, since the new config was just made the active one
        if (shouldReconfigure(newConfig))
            reconfigure(newConfig, sleeper);
    }

    /** Returns the currently active config, or null if the server has not been started */
    ZookeeperServerConfig activeConfig() {
        return activeConfig;
    }

    /** Shuts down the server runner, if one has been created */
    void shutdown() {
        if (zooKeeperRunner != null) {
            zooKeeperRunner.shutdown();
        }
    }

    /**
     * Returns true if dynamic reconfiguration is enabled in the new config, there is an active
     * config and the new config differs from it.
     */
    private boolean shouldReconfigure(ZookeeperServerConfig newConfig) {
        if (!newConfig.dynamicReconfiguration()) return false;
        if (activeConfig == null) return false;
        return !newConfig.equals(activeConfig());
    }

    /** Creates a runner for the given config and records the config as the active one */
    private ZooKeeperRunner startServer(ZookeeperServerConfig zookeeperServerConfig) {
        ZooKeeperRunner runner = new ZooKeeperRunner(zookeeperServerConfig);
        activeConfig = zookeeperServerConfig;
        return runner;
    }

    /**
     * Reconfigures the cluster from the active config to the given config, retrying on
     * {@link ReconfigException} until {@link #reconfigTimeout} has passed or the current thread is
     * interrupted. The active config is only replaced when reconfiguration succeeds; if it does not,
     * a warning is logged and the previous config stays active, so a later call with the same
     * config will try again.
     *
     * @param newConfig the config to reconfigure to
     * @param sleeper   called with the duration to wait before the first attempt and between retries
     */
    private void reconfigure(ZookeeperServerConfig newConfig, Consumer<Duration> sleeper) {
        Instant reconfigTriggered = Instant.now();
        // Leaving servers are given as ids, joining servers as full server specifications.
        // An empty list is passed on as null
        String leavingServers = String.join(",", difference(serverIds(activeConfig), serverIds(newConfig)));
        String joiningServers = String.join(",", difference(servers(newConfig), servers(activeConfig)));
        leavingServers = leavingServers.isEmpty() ? null : leavingServers;
        joiningServers = joiningServers.isEmpty() ? null : joiningServers;
        Duration waitPeriod = reconfigWaitPeriod();
        log.log(Level.INFO, "Will reconfigure ZooKeeper cluster in " + waitPeriod +
                            ". Joining servers: " + joiningServers + ", leaving servers: " + leavingServers);
        sleeper.accept(waitPeriod);
        String connectionSpec = localConnectionSpec(activeConfig);
        Instant end = Instant.now().plus(reconfigTimeout);
        int attempts = 0;
        // Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed.
        // Stop when interrupted: the default sleeper returns immediately on an interrupted thread, so retrying would spin
        while (Instant.now().isBefore(end) && !Thread.currentThread().isInterrupted()) {
            attempts++;
            try {
                Instant reconfigStarted = Instant.now();
                zkAdmin.reconfigure(connectionSpec, joiningServers, leavingServers);
                Instant reconfigEnded = Instant.now();
                log.log(Level.INFO, "Reconfiguration completed in " +
                                    Duration.between(reconfigTriggered, reconfigEnded) +
                                    ", after " + attempts + " attempt(s). ZooKeeper reconfig call took " +
                                    Duration.between(reconfigStarted, reconfigEnded));
                activeConfig = newConfig;
                return;
            } catch (ReconfigException e) {
                log.log(Level.INFO, "Reconfiguration failed. Retrying in " + retryWait + ": " +
                                    Exceptions.toMessageString(e));
                sleeper.accept(retryWait);
            }
        }
        // Getting here means no attempt succeeded. Make that visible instead of returning silently
        if (Thread.currentThread().isInterrupted()) {
            log.log(Level.WARNING, "Reconfiguration aborted after " + attempts +
                                   " attempt(s) because the thread was interrupted. Keeping the previous configuration active");
        } else {
            log.log(Level.WARNING, "Reconfiguration did not succeed within " + reconfigTimeout +
                                   ", after " + attempts + " attempt(s). Keeping the previous configuration active");
        }
    }

    /** Returns how long this node should wait before reconfiguring the cluster */
    private Duration reconfigWaitPeriod() {
        if (activeConfig == null) return Duration.ZERO;
        // Scales with this node's id, so different nodes wait for different periods
        return reconfigInterval.multipliedBy(activeConfig.myid());
    }

    /** Returns a connection spec for the local host, using the client port of the given config */
    private static String localConnectionSpec(ZookeeperServerConfig config) {
        return HostName.getLocalhost() + ":" + config.clientPort();
    }

    /** Returns the ids of all servers in the given config, as strings */
    private static List<String> serverIds(ZookeeperServerConfig config) {
        return config.server().stream()
                     .map(ZookeeperServerConfig.Server::id)
                     .map(String::valueOf)
                     .collect(Collectors.toList());
    }

    /**
     * Returns a specification of each server in the given config, on the form
     * id=hostname:quorumPort:electionPort;clientPort. The client port is the one from the config
     * itself, and so is the same for all servers.
     */
    private static List<String> servers(ZookeeperServerConfig config) {
        // See https://zookeeper.apache.org/doc/r3.5.8/zookeeperReconfig.html#sc_reconfig_clientport for format
        return config.server().stream()
                     .map(server -> server.id() + "=" + server.hostname() + ":" + server.quorumPort() + ":" +
                                    server.electionPort() + ";" + config.clientPort())
                     .collect(Collectors.toList());
    }

    /** Returns a new list with the elements of list1 that are not in list2. Neither argument is modified */
    private static <T> List<T> difference(List<T> list1, List<T> list2) {
        List<T> copy = new ArrayList<>(list1);
        copy.removeAll(list2);
        return copy;
    }

    /**
     * Sleeps for the given duration. If interrupted, returns early with the interrupt status of the
     * thread restored, so that callers can detect the interruption.
     */
    private static void defaultSleeper(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            log.log(Level.WARNING, "Interrupted while sleeping for " + duration, e);
            // Thread.sleep clears the interrupt status when throwing, set it again
            Thread.currentThread().interrupt();
        }
    }

}