summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
blob: d56c8c1524e756ea2610f9821c09611fb6f7d5bb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document.restapi;

import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentUpdate;
import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.fieldset.AllFields;
import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.text.Text;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;

/**
 * Wraps the document API with an executor that can retry and time out document operations,
 * as well as compute the required visitor parameters for visitor sessions.
 *
 * @author jonmv
 */
public interface DocumentOperationExecutor {

    default void shutdown() { }

    void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context);

    void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context);

    void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context);

    void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context);

    void visit(VisitorOptions options, VisitOperationsContext context);

    String routeToCluster(String cluster);

    enum ErrorType {
        OVERLOAD,
        NOT_FOUND,
        PRECONDITION_FAILED,
        BAD_REQUEST,
        ERROR,
        TIMEOUT,
        INSUFFICIENT_STORAGE;
    }


    /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */
    class Context<T> {

        private final AtomicBoolean handled = new AtomicBoolean();
        private final BiConsumer<ErrorType, String> onError;
        private final Consumer<T> onSuccess;

        Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) {
            this.onError = onError;
            this.onSuccess = onSuccess;
        }

        public void error(ErrorType type, String message) {
            if ( ! handled.getAndSet(true))
                onError.accept(type, message);
        }

        public void success(T result) {
            if ( ! handled.getAndSet(true))
                onSuccess.accept(result);
        }

        public boolean handled() {
            return handled.get();
        }

    }


    /** Context for reacting to the progress of a visitor session. Completion signalled by an optional progress token. */
    class VisitOperationsContext extends Context<Optional<String>> {

        private final Consumer<Document> onDocument;

        public VisitOperationsContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<String>> onSuccess, Consumer<Document> onDocument) {
            super(onError, onSuccess);
            this.onDocument = onDocument;
        }

        public void document(Document document) {
            if ( ! handled())
                onDocument.accept(document);
        }

    }


    /** Context for a document operation. */
    class OperationContext extends Context<Optional<Document>> {

        public OperationContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<Document>> onSuccess) {
            super(onError, onSuccess);
        }

    }


    class VisitorOptions {

        final Optional<String> cluster;
        final Optional<String> namespace;
        final Optional<String> documentType;
        final Optional<Group> group;
        final Optional<String> selection;
        final Optional<String> fieldSet;
        final Optional<String> continuation;
        final Optional<String> bucketSpace;
        final Optional<Integer> wantedDocumentCount;
        final Optional<Integer> concurrency;

        private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace,
                               Optional<Group> group, Optional<String> selection, Optional<String> fieldSet,
                               Optional<String> continuation, Optional<String> bucketSpace,
                               Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) {
            this.cluster = cluster;
            this.namespace = namespace;
            this.documentType = documentType;
            this.group = group;
            this.selection = selection;
            this.fieldSet = fieldSet;
            this.continuation = continuation;
            this.bucketSpace = bucketSpace;
            this.wantedDocumentCount = wantedDocumentCount;
            this.concurrency = concurrency;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            VisitorOptions that = (VisitorOptions) o;
            return cluster.equals(that.cluster) &&
                   namespace.equals(that.namespace) &&
                   documentType.equals(that.documentType) &&
                   group.equals(that.group) &&
                   selection.equals(that.selection) &&
                   fieldSet.equals(that.fieldSet) &&
                   continuation.equals(that.continuation) &&
                   bucketSpace.equals(that.bucketSpace) &&
                   wantedDocumentCount.equals(that.wantedDocumentCount) &&
                   concurrency.equals(that.concurrency);
        }

        @Override
        public int hashCode() {
            return Objects.hash(cluster, namespace, documentType, group, selection, fieldSet, continuation, bucketSpace, wantedDocumentCount, concurrency);
        }

        @Override
        public String toString() {
            return "VisitorOptions{" +
                   "cluster=" + cluster +
                   ", namespace=" + namespace +
                   ", documentType=" + documentType +
                   ", group=" + group +
                   ", selection=" + selection +
                   ", fieldSet=" + fieldSet +
                   ", continuation=" + continuation +
                   ", bucketSpace=" + bucketSpace +
                   ", wantedDocumentCount=" + wantedDocumentCount +
                   ", concurrency=" + concurrency +
                   '}';
        }

        public static Builder builder() { return new Builder(); }


        public static class Builder {

            private String cluster;
            private String documentType;
            private String namespace;
            private Group group;
            private String selection;
            private String fieldSet;
            private String continuation;
            private String bucketSpace;
            private Integer wantedDocumentCount;
            private Integer concurrency;

            public Builder cluster(String cluster) {
                this.cluster = cluster;
                return this;
            }

            public Builder documentType(String documentType) {
                this.documentType = documentType;
                return this;
            }

            public Builder namespace(String namespace) {
                this.namespace = namespace;
                return this;
            }

            public Builder group(Group group) {
                this.group = group;
                return this;
            }

            public Builder selection(String selection) {
                this.selection = selection;
                return this;
            }

            public Builder fieldSet(String fieldSet) {
                this.fieldSet = fieldSet;
                return this;
            }

            public Builder continuation(String continuation) {
                this.continuation = continuation;
                return this;
            }

            public Builder bucketSpace(String bucketSpace) {
                this.bucketSpace = bucketSpace;
                return this;
            }

            public Builder wantedDocumentCount(Integer wantedDocumentCount) {
                this.wantedDocumentCount = wantedDocumentCount;
                return this;
            }

            public Builder concurrency(Integer concurrency) {
                this.concurrency = concurrency;
                return this;
            }

            public VisitorOptions build() {
                return new VisitorOptions(Optional.ofNullable(cluster), Optional.ofNullable(documentType),
                                          Optional.ofNullable(namespace), Optional.ofNullable(group),
                                          Optional.ofNullable(selection), Optional.ofNullable(fieldSet),
                                          Optional.ofNullable(continuation), Optional.ofNullable(bucketSpace),
                                          Optional.ofNullable(wantedDocumentCount), Optional.ofNullable(concurrency));
            }

        }

    }


    class Group {

        private final String value;
        private final String docIdPart;
        private final String selection;

        private Group(String value, String docIdPart, String selection) {
            Text.validateTextString(value)
                .ifPresent(codePoint -> { throw new IllegalArgumentException(String.format("Illegal code point U%04X in group", codePoint)); });
            this.value = value;
            this.docIdPart = docIdPart;
            this.selection = selection;
        }

        public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); }
        public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); }

        public String value() { return value; }
        public String docIdPart() { return docIdPart; }
        public String selection() { return selection; }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Group group = (Group) o;
            return value.equals(group.value) &&
                   docIdPart.equals(group.docIdPart) &&
                   selection.equals(group.selection);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value, docIdPart, selection);
        }

        @Override
        public String toString() {
            return "Group{" +
                   "value='" + value + '\'' +
                   ", docIdPart='" + docIdPart + '\'' +
                   ", selection='" + selection + '\'' +
                   '}';
        }

    }

}