aboutsummaryrefslogtreecommitdiffstats
path: root/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java
blob: 0f3f39387012ce7a1d1e7c8f68c832214fc5a4c6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.docproc.jdisc.messagebus;

import com.yahoo.component.provider.ComponentRegistry;
import com.yahoo.concurrent.CopyOnWriteHashMap;
import com.yahoo.container.core.document.ContainerDocumentConfig;
import com.yahoo.docproc.AbstractConcreteDocumentFactory;
import com.yahoo.docproc.DocprocService;
import com.yahoo.docproc.HandledProcessingException;
import com.yahoo.docproc.Processing;
import com.yahoo.docproc.TransientFailureException;
import com.yahoo.docproc.jdisc.RequestContext;
import com.yahoo.document.DocumentOperation;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.Response;
import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.RequestDispatch;
import com.yahoo.jdisc.handler.ResponseDispatch;
import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.jdisc.MbusRequest;
import com.yahoo.messagebus.jdisc.MbusResponse;
import com.yahoo.messagebus.routing.Route;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

/**
 * A {@link RequestContext} backed by a single {@link MbusRequest} carrying a {@link DocumentMessage}.
 *
 * <p>The context hands out the {@link Processing} instances created from the incoming message,
 * forwards the resulting document operations as new message bus requests once processing is done,
 * and reports failures back as error replies. It also acts as the {@link ResponseHandler} for
 * everything it dispatches: only the first response it receives is passed on to the handler given
 * at construction time, later ones are dropped.</p>
 *
 * @author Einar M R Rosenvinge
 */
public class MbusRequestContext implements RequestContext, ResponseHandler {

    private static final Logger log = Logger.getLogger(MbusRequestContext.class.getName());

    /** Shared cache of request URIs, keyed by URI path, so each path is only parsed once. */
    private static final CopyOnWriteHashMap<String, URI> uriCache = new CopyOnWriteHashMap<>();

    /** Set once {@link #getProcessings()} has been called. */
    private final AtomicBoolean deserialized = new AtomicBoolean(false);

    /** Set once a response has been passed on to {@link #responseHandler}. */
    private final AtomicBoolean responded = new AtomicBoolean(false);

    private final ProcessingFactory processingFactory;

    /** May be null, see {@link #newMessageFactory(DocumentMessage)}. */
    private final MessageFactory messageFactory;

    private final MbusRequest request;
    private final DocumentMessage requestMsg;
    private final ResponseHandler responseHandler;

    /** Cached result of {@code requestMsg.getApproxSize()}; values &lt;= 0 mean "not cached". */
    private volatile int cachedApproxSize;

    // When spawning off new documents inside document processor, we do not want
    // throttling since this can lead to live locks. This is because the
    // document being processed is a resource and is then grabbing more resources of
    // the same type without releasing its own resources.
    public static final String internalNoThrottledSource = "internalNoThrottledSource";

    /**
     * Creates a context for the given request.
     *
     * @param request                         the incoming request; its message is cast to {@link DocumentMessage}
     * @param responseHandler                 the handler that receives the (single) response for this request
     * @param docprocServiceComponentRegistry passed on to the {@link ProcessingFactory}
     * @param docFactoryRegistry              passed on to the {@link ProcessingFactory}
     * @param containerDocConfig              passed on to the {@link ProcessingFactory}
     */
    public MbusRequestContext(MbusRequest request, ResponseHandler responseHandler,
                              ComponentRegistry<DocprocService> docprocServiceComponentRegistry,
                              ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
                              ContainerDocumentConfig containerDocConfig) {
        this.request = request;
        this.requestMsg = (DocumentMessage)request.getMessage();
        this.responseHandler = responseHandler;
        this.processingFactory = new ProcessingFactory(docprocServiceComponentRegistry, docFactoryRegistry,
                                                       containerDocConfig, getServiceName());
        this.messageFactory = newMessageFactory(requestMsg);
    }

    /**
     * Returns the processings created from the request message.
     *
     * @return the processings on the first call, an empty list on every later call
     */
    @Override
    public List<Processing> getProcessings() {
        if (deserialized.getAndSet(true)) {
            return Collections.emptyList();
        }
        return processingFactory.fromMessage(requestMsg);
    }

    /**
     * Dispatches the original message unchanged, using the path of the request URI and the
     * original response handler.
     *
     * @throws IllegalStateException if {@link #getProcessings()} has already been called
     */
    @Override
    public void skip() {
        if (deserialized.get()) {
            throw new IllegalStateException("Can not skip processing after deserialization.");
        }
        dispatchRequest(requestMsg, getUri().getPath(), responseHandler);
    }

    /**
     * Converts the document operations of the given processings to messages and dispatches them.
     *
     * <p>If there is no message factory, or no document operations, an OK response is dispatched
     * directly. Otherwise all messages are dispatched with a shared {@link ResponseMerger} that
     * reports back to this context.</p>
     *
     * @param processings the processings that have completed
     */
    @Override
    public void processingDone(List<Processing> processings) {
        List<DocumentMessage> messages = new ArrayList<>();
        if (messageFactory != null) {
            for (Processing processing : processings) {
                for (DocumentOperation documentOperation : processing.getDocumentOperations()) {
                    messages.add(messageFactory.fromDocumentOperation(processing, documentOperation));
                }
            }
        }
        if (log.isLoggable(LogLevel.DEBUG)) {
            log.log(LogLevel.DEBUG, "Forwarding " + messages.size() + " messages from " + processings.size() +
                                    " processings.");
        }
        if (messages.isEmpty()) {
            dispatchResponse(Response.Status.OK);
            return;
        }
        long inputSequenceId = requestMsg.getSequenceId();
        String requestPath = getUri().getPath();
        String noThrottlePath = "/" + internalNoThrottledSource;
        // Named 'merger' to avoid shadowing the responseHandler field.
        ResponseMerger merger = new ResponseMerger(requestMsg, messages.size(), this);
        for (Message message : messages) {
            // See comment for internalNoThrottledSource: only messages with the same sequence id
            // as the input keep the request's own path.
            String uriPath = (inputSequenceId == message.getSequenceId()) ? requestPath : noThrottlePath;
            dispatchRequest(message, uriPath, merger);
        }
    }

    /**
     * Reports a processing failure caused by the given exception.
     *
     * <p>A {@link TransientFailureException} maps to {@code ERROR_ABORTED}, anything else to
     * {@code ERROR_PROCESSING_FAILURE}. For a {@link HandledProcessingException} only the exception
     * message is included in the error text; for other non-null exceptions {@code toString()} is used.</p>
     *
     * @param exception the cause of the failure, may be null
     */
    @Override
    public void processingFailed(Exception exception) {
        ErrorCode errorCode = (exception instanceof TransientFailureException)
                              ? ErrorCode.ERROR_ABORTED
                              : ErrorCode.ERROR_PROCESSING_FAILURE;
        StringBuilder errorMsg = new StringBuilder("Processing failed.");
        if (exception instanceof HandledProcessingException) {
            errorMsg.append(" Error message: ").append(exception.getMessage());
        } else if (exception != null) {
            errorMsg.append(" Error message: ").append(exception.toString());
        }
        errorMsg.append(" -- See Vespa log for details.");
        processingFailed(errorCode, errorMsg.toString());
    }

    /**
     * Dispatches an error response, built from a reply to the request message, to this context.
     *
     * @param errorCode supplies both the response status and the error code added to the reply
     * @param errorMsg  the error message added to the reply
     */
    @Override
    public void processingFailed(ErrorCode errorCode, String errorMsg) {
        MbusResponse response = new MbusResponse(errorCode.getDiscStatus(), requestMsg.createReply());
        response.getReply().addError(new com.yahoo.messagebus.Error(errorCode.getDocumentProtocolStatus(), errorMsg));
        ResponseDispatch.newInstance(response).dispatch(this);
    }

    /**
     * Returns the approximate size reported by the request message. A positive value is cached
     * and reused by later calls.
     */
    @Override
    public int getApproxSize() {
        // Read the volatile field once; a non-positive value means nothing usable is cached yet.
        int size = cachedApproxSize;
        if (size <= 0) {
            size = requestMsg.getApproxSize();
            cachedApproxSize = size;
        }
        return size;
    }

    /** Returns the numeric value of the request message's priority. */
    @Override
    public int getPriority() {
        return requestMsg.getPriority().getValue();
    }

    /** Returns the URI of the underlying request. */
    @Override
    public URI getUri() {
        return request.getUri();
    }

    /**
     * Returns the request URI path with its first 7 characters removed.
     *
     * <p>NOTE(review): the offset presumably skips a fixed 7-character path prefix used by the
     * docproc bindings - confirm against the binding configuration. A path shorter than 7
     * characters makes {@code substring} throw.</p>
     */
    @Override
    public String getServiceName() {
        String path = getUri().getPath();
        return path.substring(7);
    }

    /**
     * Returns whether the request message is a put, update, remove or batch update message.
     */
    @Override
    public boolean isProcessable() {
        switch (requestMsg.getType()) {
        case DocumentProtocol.MESSAGE_PUTDOCUMENT:
        case DocumentProtocol.MESSAGE_UPDATEDOCUMENT:
        case DocumentProtocol.MESSAGE_REMOVEDOCUMENT:
        case DocumentProtocol.MESSAGE_BATCHDOCUMENTUPDATE:
            return true;
        default:
            return false;
        }
    }

    /** Returns whether the request message reports itself as expired. */
    @Override
    public boolean hasExpired() {
        return requestMsg.isExpired();
    }

    /**
     * Passes the first response on to the handler given at construction time, after swapping the
     * state of the request message into the reply.
     *
     * @param response the response, which must be an {@link MbusResponse}
     * @return the content channel of the wrapped handler, or null if a response has already been handled
     */
    @Override
    public ContentChannel handleResponse(Response response) {
        if (responded.getAndSet(true)) {
            return null;
        }
        Reply reply = ((MbusResponse)response).getReply();
        reply.swapState(requestMsg);
        return responseHandler.handleResponse(response);
    }

    /** Dispatches a response with the given status and a fresh reply to the request message to this context. */
    private void dispatchResponse(int status) {
        ResponseDispatch.newInstance(new MbusResponse(status, requestMsg.createReply())).dispatch(this);
    }

    /**
     * Dispatches the given message as a child request of the original request.
     *
     * <p>If dispatching throws, the failure is logged and an INTERNAL_SERVER_ERROR response is
     * dispatched to this context instead.</p>
     *
     * @param msg     the message to send
     * @param uriPath the path of the URI to dispatch to, see {@link #resolveUri(String)}
     * @param handler the handler that receives the response to the dispatched request
     */
    private void dispatchRequest(final Message msg, final String uriPath, final ResponseHandler handler) {
        try {
            new RequestDispatch() {

                @Override
                protected Request newRequest() {
                    return new MbusRequest(request, resolveUri(uriPath), msg);
                }

                @Override
                public ContentChannel handleResponse(Response response) {
                    return handler.handleResponse(response);
                }
            }.dispatch();
        } catch (Exception e) {
            // Log through the class logger (not stderr), and do it before responding so the
            // cause is recorded even if dispatching the error response fails as well.
            log.log(LogLevel.WARNING, "Failed to dispatch message to '" + uriPath + "'.", e);
            dispatchResponse(Response.Status.INTERNAL_SERVER_ERROR);
        }
    }

    /**
     * Creates a message factory for the given message.
     *
     * @return a new factory, or null if the message is null or its route is null or has no hops
     */
    private static MessageFactory newMessageFactory(final DocumentMessage msg) {
        if (msg == null) {
            return null;
        }
        final Route route = msg.getRoute();
        if (route == null || !route.hasHops()) {
            return null;
        }
        return new MessageFactory(msg);
    }

    /**
     * Returns the URI {@code mbus://remotehost<path>}, creating and caching it on first use.
     */
    private static URI resolveUri(String path) {
        URI uri = uriCache.get(path);
        if (uri == null) {
            uri = URI.create("mbus://remotehost" + path);
            uriCache.put(path, uri);
        }
        return uri;
    }
}