aboutsummaryrefslogtreecommitdiffstats
path: root/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java
blob: 9a8b9b5bf607b887db4eb4f17c7bec08e6d1eac2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.curator;

import com.yahoo.path.Path;
import com.yahoo.vespa.curator.transaction.CuratorOperations;
import com.yahoo.vespa.curator.transaction.CuratorTransaction;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.logging.Level;

import static com.yahoo.vespa.curator.Curator.CompletionWaiter;

/**
 * Implementation of a barrier where more servers than the required number of members may report completion.
 * Each server reports completion by creating a child node, named by its server id, under the barrier path.
 * The waiter polls the children of the barrier path: it returns at once when as many servers as there are in
 * the ZooKeeper ensemble have responded, and otherwise accepts a majority of the ensemble once that majority
 * has been present for longer than {@code waitForAll}.
 *
 * @author Vegard Havdal
 * @author Ulf Lilleengen
 */
class CuratorCompletionWaiter implements CompletionWaiter {

    private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(CuratorCompletionWaiter.class.getName());

    /** Time to sleep between each poll of the children of the barrier path */
    private static final long POLL_INTERVAL_MILLIS = 100;

    /** Barriers completing faster than this are logged at FINE, all others at INFO */
    private static final Duration SLOW_COMPLETION_THRESHOLD = Duration.ofSeconds(5);

    private final Curator curator;
    private final Path barrierPath;
    private final Path waiterNode;
    private final Clock clock;
    private final Duration waitForAll;

    /**
     * @param curator     the curator used to read and create barrier nodes
     * @param barrierPath the path whose children represent servers that have completed
     * @param serverId    the id of this server, used as the name of the node created by {@link #notifyCompletion()}
     * @param clock       the clock used for all timeout and duration calculations
     * @param waitForAll  how long to keep waiting for the remaining servers after a majority has responded
     */
    CuratorCompletionWaiter(Curator curator, Path barrierPath, String serverId, Clock clock, Duration waitForAll) {
        this.waiterNode = barrierPath.append(serverId);
        this.curator = curator;
        this.barrierPath = barrierPath;
        this.clock = clock;
        this.waitForAll = waitForAll;
    }

    /**
     * Waits until enough servers have responded on the barrier, or the timeout expires.
     *
     * @param timeout the maximum time to wait
     * @throws CompletionTimeoutException if fewer than a majority of servers had responded when waiting ended
     * @throws RuntimeException if reading the barrier fails or the waiting thread is interrupted
     */
    @Override
    public void awaitCompletion(Duration timeout) {
        List<String> respondents;
        try {
            log.log(Level.FINE, () -> "Synchronizing on barrier " + barrierPath);
            respondents = awaitInternal(timeout);
            log.log(Level.FINE, () -> "Done synchronizing on barrier " + barrierPath);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        int requiredMembers = barrierMemberCount();
        if (respondents.size() < requiredMembers) {
            throw new CompletionTimeoutException("Timed out waiting for config servers to complete operation " +
                                                 "(waited for barrier " + barrierPath + "). " +
                                                 "Got response from " + respondents + ", but need response from " +
                                                 "at least " + requiredMembers + " server(s). " +
                                                 "Timeout passed as argument was " + timeout.toMillis() + " ms");
        }
    }

    /**
     * Polls the children of the barrier path until all servers have responded, a majority has been present
     * for longer than {@code waitForAll}, or the timeout has passed.
     *
     * @return the children of the barrier path as seen by the last poll; the caller must check whether
     *         this is enough, as the list may hold fewer than a majority if the timeout passed
     */
    private List<String> awaitInternal(Duration timeout) throws Exception {
        Instant startTime = clock.instant();
        Instant endTime = startTime.plus(timeout);
        Instant gotQuorumTime = null; // null until a majority is seen for the first time

        List<String> respondents;
        do {
            respondents = curator.framework().getChildren().forPath(barrierPath.getAbsolute());
            if (log.isLoggable(Level.FINER)) {
                log.log(Level.FINER, respondents.size() + "/" + curator.zooKeeperEnsembleCount() + " responded: " +
                                     respondents + ", all participants: " + curator.zooKeeperEnsembleConnectionSpec());
            }

            // If all config servers responded, return
            if (respondents.size() == curator.zooKeeperEnsembleCount()) {
                logBarrierCompleted(respondents, startTime);
                break;
            }
            // If some are missing, quorum is enough, but wait for all for up to 'waitForAll' before returning
            if (respondents.size() >= barrierMemberCount()) {
                if (gotQuorumTime == null)
                    gotQuorumTime = clock.instant();

                if (clock.instant().isAfter(gotQuorumTime.plus(waitForAll))) {
                    logBarrierCompleted(respondents, startTime);
                    break;
                }
            }

            Thread.sleep(POLL_INTERVAL_MILLIS);
        } while (clock.instant().isBefore(endTime));

        return respondents;
    }

    /** Logs that the barrier completed, at INFO if it took a long time and at FINE otherwise */
    private void logBarrierCompleted(List<String> respondents, Instant startTime) {
        // Use the same clock that produced startTime, so the duration is consistent also with a non-system clock
        Duration duration = Duration.between(startTime, clock.instant());
        Level level = duration.compareTo(SLOW_COMPLETION_THRESHOLD) < 0 ? Level.FINE : Level.INFO;
        log.log(level, () -> barrierCompletedMessage(respondents, duration));
    }

    private String barrierCompletedMessage(List<String> respondents, Duration duration) {
        return barrierPath + " completed in " + duration.toString() +
                ", " + respondents.size() + "/" + curator.zooKeeperEnsembleCount() + " responded: " + respondents;
    }

    /**
     * Reports that this server has completed, by creating this server's node under the barrier path.
     *
     * @throws RuntimeException if the node could not be created
     */
    @Override
    public void notifyCompletion() {
        try {
            curator.framework().create().forPath(waiterNode.getAbsolute());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return "'" + barrierPath + "', " + barrierMemberCount() + " members";
    }

    /** Creates a waiter on the given barrier path, using the system UTC clock, without modifying the path */
    public static CompletionWaiter create(Curator curator, Path barrierPath, String id, Duration waitForAll) {
        return new CuratorCompletionWaiter(curator, barrierPath, id, Clock.systemUTC(), waitForAll);
    }

    /** Deletes and re-creates the given barrier path, then creates a waiter on it using the system UTC clock */
    public static CompletionWaiter createAndInitialize(Curator curator, Path barrierPath, String id, Duration waitForAll) {
        // Note: Should be done atomically, but unable to do that when path may not exist before delete
        // and create should be able to create any missing parent paths
        curator.delete(barrierPath);
        curator.create(barrierPath);

        return new CuratorCompletionWaiter(curator, barrierPath, id, Clock.systemUTC(), waitForAll);
    }

    /** Returns the number of respondents needed for the barrier to be considered complete */
    private int barrierMemberCount() {
        return (curator.zooKeeperEnsembleCount() / 2) + 1; // majority
    }

}