aboutsummaryrefslogtreecommitdiffstats
path: root/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/dns/NameServiceQueue.java
blob: 033a019f35f8385e9a91ad969680db82c3785af3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.controller.dns;

import com.yahoo.vespa.hosted.controller.api.integration.dns.NameService;
import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId;
import com.yahoo.yolean.Exceptions;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.UnaryOperator;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A queue of outstanding {@link NameServiceRequest}s. Requests in this have not yet been dispatched to a
 * {@link NameService} and are thus not visible in DNS.
 *
 * This is immutable.
 *
 * @author mpolden
 * @author jonmv
 */
public record NameServiceQueue(List<NameServiceRequest> requests) {

    public static final NameServiceQueue EMPTY = new NameServiceQueue(List.of());

    /**
     * The number of {@link NameServiceRequest}s we allow to be queued. When the queue overflows, failing requests
     * are dropped in a FIFO order until the queue shrinks below this capacity. If that is not enough, the oldest
     * requests will also be dropped, as needed.
     */
    static final int QUEUE_CAPACITY = 400;

    private static final Logger log = Logger.getLogger(NameServiceQueue.class.getName());

    /** DO NOT USE. Public for serialization purposes */
    public NameServiceQueue(List<NameServiceRequest> requests) {
        this.requests = List.copyOf(Objects.requireNonNull(requests, "requests must be non-null"));
    }

    /** Returns a copy of this containing the last n requests */
    public NameServiceQueue last(int n) {
        return resize(n, (requests) -> requests.subList(requests.size() - n, requests.size()));
    }

    /** Returns a copy of this containing the first n requests */
    public NameServiceQueue first(int n) {
        return resize(n, (requests) -> requests.subList(0, n));
    }

    /** Returns a copy of this with given request queued according to priority */
    public NameServiceQueue with(NameServiceRequest request, Priority priority) {
        List<NameServiceRequest> copy = new ArrayList<>(this.requests.size() + 1);
        switch (priority) {
            case normal -> {
                copy.addAll(this.requests);
                copy.add(request);
            }
            case high -> {
                copy.add(request);
                copy.addAll(this.requests);
            }
        }
        return new NameServiceQueue(copy);
    }

    /** Returns a copy of this without the requests present in other. Duplicates are not removed */
    public NameServiceQueue without(NameServiceQueue other) {
        List<NameServiceRequest> toRemove = new ArrayList<>(other.requests);
        return new NameServiceQueue(requests.stream()
                                            .filter(request -> !toRemove.remove(request))
                                            .toList());
    }

    /** Returns a copy of this with given request added */
    public NameServiceQueue with(NameServiceRequest request) {
        return with(request, Priority.normal);
    }

    /**
     * Dispatch n requests from the head of this to given name service. Requests may be re-ordered if errors are
     * encountered, but are always dispatched in order within an application.
     *
     * @return A copy of this, without the successfully dispatched requests.
     */
    public NameServiceQueue dispatchTo(NameService nameService, int n) {
        requireNonNegative(n);
        if (requests.isEmpty()) return this;

        LinkedList<NameServiceRequest> pending = new LinkedList<>(requests);
        while (n-- > 0 && ! pending.isEmpty()) {
            NameServiceRequest request = pending.poll();
            try {
                request.dispatchTo(nameService);
            } catch (Exception e) {
                boolean dropFailingRequest = pending.size() > QUEUE_CAPACITY;
                log.log(Level.WARNING, "Failed to execute " + request + ": " + Exceptions.toMessageString(e) +
                                       ", request will " + (dropFailingRequest ? "be dropped, as queue is over capacity"
                                                                               : "be moved backwards, and retried"));
                if (dropFailingRequest) continue; // Drop this request and keep dispatching others

                // Move all requests with the same owner backwards as far as we can, i.e., to the back, or to the first owner-less request.
                Optional<TenantAndApplicationId> owner = request.owner();
                LinkedList<NameServiceRequest> owned = new LinkedList<>();
                LinkedList<NameServiceRequest> others = new LinkedList<>();
                do {
                    if (request.owner().isEmpty()) {
                        pending.push(request);
                        break;  // Can't modify anything past this, as owner-less requests must come in order with all others.
                    }
                    (request.owner().equals(owner) ? owned : others).offer(request);
                } while ((request = pending.poll()) != null);
                pending.addAll(0, owned);   // Append owned requests before those we can't modify (or none), and
                pending.addAll(0, others);  // then append requests owned by others before that again.
            }
        }

        NameServiceQueue remaining = new NameServiceQueue(pending);
        if (pending.size() > 2 * QUEUE_CAPACITY) {
            log.log(Level.WARNING, "Queue has " + pending.size() + " entries, and must be emptying far too slowly; " +
                                   "dropping the oldest entries past " + 2 * QUEUE_CAPACITY);
            remaining = remaining.last(2 * QUEUE_CAPACITY);
        }
        return remaining;
    }

    @Override
    public String toString() {
        return requests.toString();
    }

    private NameServiceQueue resize(int n, UnaryOperator<List<NameServiceRequest>> resizer) {
        requireNonNegative(n);
        if (requests.size() <= n) return this;
        return new NameServiceQueue(resizer.apply(requests));
    }

    private static void requireNonNegative(int n) {
        if (n < 0) throw new IllegalArgumentException("n must be >= 0, got " + n);
    }

    /**
     * Replaces the requests in {@code oldQueue} contained in this with requests in {@code newQueue}, or best effort
     * amendment when not contained.
     */
    public NameServiceQueue replace(NameServiceQueue oldQueue, NameServiceQueue newQueue) {
        int sublistIndex = indexOf(oldQueue.requests, requests);
        if (sublistIndex >= 0) {
            List<NameServiceRequest> updated = new ArrayList<>();
            updated.addAll(requests.subList(0, sublistIndex));
            updated.addAll(newQueue.requests);
            updated.addAll(requests.subList(sublistIndex + oldQueue.requests.size(), requests.size()));
            return new NameServiceQueue(updated);
        } else {
            log.log(Level.WARNING, "Name service queue has changed unexpectedly; expected requests: " +
                                   oldQueue.requests + " to be present, but that was not found in: " + requests);
            // Do a best-effort amendment, where requests removed from initial to remaining, are removed, from the front, from this.
            return without(oldQueue.without(newQueue));
        }
    }

    /**
     * Find the starting index of subList in list. I.e. the lowest index {@code i} in {@code list} so that
     * {@code list.subList(i, i + subList.size()).equals(subList)}. Naïve implementation.
     */
    private static <T> int indexOf(List<T> subList, List<T> list) {
        for (int i = 0; i + subList.size() <= list.size(); i++) {
            if (list.subList(i, i + subList.size()).equals(subList)) {
                return i;
            }
        }
        return -1;
    }

    /** Priority of a request added to this */
    public enum Priority {

        /** Default priority. Request will be delivered in FIFO order */
        normal,

        /** Request is queued first. Useful for code that needs to act on effects of a request */
        high

    }

}