aboutsummaryrefslogtreecommitdiffstats
path: root/docproc/src/main/java/com/yahoo/docproc/Processing.java
blob: cc9d7b9e8b70d6cd8e594605f9290195d33655b8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.docproc;

import com.yahoo.concurrent.SystemTimer;
import com.yahoo.docproc.impl.ProcessingAccess;
import com.yahoo.docproc.impl.ProcessingEndpoint;
import com.yahoo.document.DocumentOperation;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * A document processing. This contains the document(s) or document update(s) to process,
 * a map of processing context data and the processing instance to
 * invoke the next time any work needs to be done on this processing.
 *
 * @author bratseth
 */
public final class Processing extends ProcessingAccess {

    /** The name of the service which owns this processing. Null is the same as "default". */
    private String service = null;

    /** The processors to call the next work is done on this processing. */
    private CallStack callStack = null;

    /** The collection of documents or document updates processed by this. This is never null. */
    private final List<DocumentOperation> documentOperations;

    /**
     * Documents or document updates which should be added to <code>documents</code> before
     * the next access, or null if documents or document updates have never been added to
     * this processing.
     */
    private List<DocumentOperation> documentsToAdd = null;

    /** The processing context variables. */
    private Map<String, Object> context = null;

    /** The endpoint of this processing. */
    private ProcessingEndpoint endpoint = null;

    private boolean operationsGotten = false;

    private Instant expiresAt = Instant.MAX;

    /**
     * Create a Processing with no documents. Useful with DocprocService.process(Processing).
     * Note that the callstack is initially empty when using this constructor (but it is
     * set by DocprocService.process(Processing).)
     */
    public Processing() {
        this.documentOperations = new ArrayList<>(1);
    }

    /**
     * Create a Processing from the given document operation
     */
    public static Processing of(DocumentOperation documentOperation) {
        return new Processing(documentOperation);
    }

    private Processing(DocumentOperation documentOperation) {
        this();
        addDocumentOperation(documentOperation);
    }

    /**
     * Create a processing with one document. The given document put or document update will be the single
     * element in <code>documentOperations</code>.
     *
     * @param service           the unique name of the service processing this
     * @param documentOperation document operation (DocumentPut or DocumentUpdate)
     * @param callStack         the document processors to call in this processing.
     * @param endp              the endpoint of this processing
     */
    private Processing(String service, DocumentOperation documentOperation, CallStack callStack, ProcessingEndpoint endp) {
        this.service = service;
        this.documentOperations = new ArrayList<>(1);
        documentOperations.add(documentOperation);
        this.callStack = callStack;
        this.endpoint = endp;
    }

    /**
     * Create a processing with one document. The given document put or document update will be the single
     * element in <code>documentOperations</code>.
     *
     * @param service           the unique name of the service processing this
     * @param documentOperation document operation (DocumentPut or DocumentUpdate)
     * @param callStack         the document processors to call in this processing.
     *                          This <b>tranfers ownership</b> of this structure
     *                          to this class. The caller <i>must not</i> modify it
     */
    public Processing(String service, DocumentOperation documentOperation, CallStack callStack) {
        this(service, documentOperation, callStack, null);
    }

    @SuppressWarnings({"unused"})
    private Processing(String service, List<DocumentOperation> documentOpsAndUpdates, CallStack callStack, ProcessingEndpoint endp, boolean unused) {
        this.service = service;
        this.documentOperations = new ArrayList<>(documentOpsAndUpdates.size());
        documentOperations.addAll(documentOpsAndUpdates);
        this.callStack = callStack;
        this.endpoint = endp;
    }

    /**
     * Creates a Processing from a list of operations.
     *
     * @param service               the unique name of the service processing this
     * @param documentsAndUpdates   the document operation list. This <b>transfers ownership</b> of this list
     *                              to this class. The caller <i>must not</i> modify it
     * @param callStack             the document processors to call in this processing.
     *                              This <b>transfers ownership</b> of this structure
     *                              to this class. The caller <i>must not</i> modify it
     */
    public static Processing createProcessingFromDocumentOperations(String service,
                                                                    List<DocumentOperation> documentsAndUpdates,
                                                                    CallStack callStack) {
        return new Processing(service, documentsAndUpdates, callStack, null, false);
    }

    /** Returns the name of the service processing this. This will never return null */
    public String getServiceName() {
        if (service == null) return "default";
        return service;
    }

    /** Sets the name of the service processing this. */
    public void setServiceName(String service) {
        this.service = service;
    }

    /** Returns a context variable, or null if it is not set */
    public Object getVariable(String name) {
        if (context == null) return null;
        return context.get(name);
    }

    /**
     * Returns an iterator of all context variables that are set
     *
     * @return an iterator over objects of type Map.Entry
     */
    public Iterator<Map.Entry<String, Object>> getVariableAndNameIterator() {
        if (context == null) context = new HashMap<>();
        return context.entrySet().iterator();
    }

    /** Clears all context variables that have been set */
    public void clearVariables() {
        if (context == null) return;
        context.clear();
    }

    /** Sets a context variable. */
    public void setVariable(String name, Object value) {
        if (context == null) context = new java.util.HashMap<>();
        context.put(name, value);
    }

    public Object removeVariable(String name) {
        if (context == null) return null;
        return context.remove(name);
    }

    /** Returns true if this variable is present, even if it is null */
    public boolean hasVariable(String name) {
        return context != null && context.containsKey(name);
    }

    @Override
    protected ProcessingEndpoint getEndpoint() {
        return endpoint;
    }

    @Override
    protected void setEndpoint(ProcessingEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    public void addDocumentOperation(DocumentOperation documentOperation) {
        if (documentsToAdd == null) documentsToAdd = new ArrayList<>(1);
        documentsToAdd.add(documentOperation);
    }

    private void updateDocumentOperations() {
        if (documentsToAdd != null) {
            documentOperations.addAll(documentsToAdd);
            documentsToAdd.clear();
        }
    }

    /**
     * Returns the operations in this processing.
     * This can be mutated to add or remove operations to be performed.
     */
    public List<DocumentOperation> getDocumentOperations() {
        updateDocumentOperations();
        return documentOperations;
    }

    /** Returns the processors to call in this processing */
    public CallStack callStack() {
        return callStack;
    }

    @Override
    protected void setCallStack(CallStack callStack) {
        this.callStack = callStack;
    }

    @Override
    protected List<DocumentOperation> getOnceOperationsToBeProcessed() {
        if (operationsGotten)
            return Collections.emptyList();

        operationsGotten = true;
        return getDocumentOperations();
    }

    public void setExpiresAt(Instant i) { this.expiresAt = i; }

    public static final Duration NO_TIMEOUT = Duration.ofDays(10);
    /** @return time left or {@link #NO_TIMEOUT} if processing has no timeout */
    public Duration timeLeft() {
        if (expiresAt == Instant.MAX) return NO_TIMEOUT;
        Instant now = SystemTimer.INSTANCE.instant();
        if (now.isAfter(expiresAt)) return Duration.ZERO;
        return Duration.between(now, expiresAt);
    }

    @Override
    public String toString() {
        String previousCall = "";
        if (callStack != null) {
            Call call = callStack.getLastPopped();
            if (call != null) {
                previousCall = "Last call: " + call;
            }
        }
        if (documentOperations.size() == 1) {
            return "Processing of " + documentOperations.get(0) + ". " + previousCall;
        } else {
            String listString = documentOperations.toString();
            if (listString.length() > 100) {
                listString = listString.substring(0, 99);
                listString += "...]";
            }

            return "Processing of " + listString + ". " + previousCall;
        }
    }

}