aboutsummaryrefslogtreecommitdiffstats
path: root/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java
blob: e1b9670dea8c3f311cd2127ebe1d56e1e9c42368 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.docproc.jdisc.messagebus;

import com.yahoo.component.provider.ComponentRegistry;
import com.yahoo.concurrent.CopyOnWriteHashMap;
import com.yahoo.container.core.document.ContainerDocumentConfig;
import com.yahoo.docproc.AbstractConcreteDocumentFactory;
import com.yahoo.docproc.impl.DocprocService;
import com.yahoo.docproc.impl.HandledProcessingException;
import com.yahoo.docproc.Processing;
import com.yahoo.docproc.impl.TransientFailureException;
import com.yahoo.docproc.jdisc.RequestContext;
import com.yahoo.document.DocumentOperation;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.Response;
import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.RequestDispatch;
import com.yahoo.jdisc.handler.ResponseDispatch;
import com.yahoo.jdisc.handler.ResponseHandler;
import java.util.logging.Level;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.jdisc.MbusRequest;
import com.yahoo.messagebus.jdisc.MbusResponse;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

import static java.util.logging.Level.WARNING;

/**
 * A {@link RequestContext} backed by a message bus request.
 *
 * <p>The context converts the incoming {@link DocumentMessage} into processings, forwards the
 * messages produced by those processings, and relays the resulting response to the response
 * handler supplied at construction time. Responses passing through
 * {@link #handleResponse(Response)} are relayed at most once.</p>
 *
 * @author Einar M R Rosenvinge
 */
public class MbusRequestContext implements RequestContext, ResponseHandler {

    private static final Logger log = Logger.getLogger(MbusRequestContext.class.getName());

    /** URIs already created by {@link #dispatchRequest}, keyed by URI path and shared by all contexts. */
    private static final CopyOnWriteHashMap<String, URI> uriCache = new CopyOnWriteHashMap<>();

    /** Set by the first call to {@link #getProcessings()}; later calls return an empty list. */
    private final AtomicBoolean deserialized = new AtomicBoolean(false);

    /** Set by the first call to {@link #handleResponse(Response)}; later responses are dropped. */
    private final AtomicBoolean responded = new AtomicBoolean(false);

    private final ProcessingFactory processingFactory;

    /** May be null, see {@link #newMessageFactory(DocumentMessage)}. */
    private final MessageFactory messageFactory;

    private final MbusRequest request;
    private final DocumentMessage requestMsg;
    private final ResponseHandler responseHandler;

    // When spawning off new documents inside document processor, we do not want
    // throttling since this can lead to live locks. This is because the
    // document being processed is a resource and is then grabbing more resources of
    // the same type without releasing its own resources.
    public static final String internalNoThrottledSource = "internalNoThrottledSource";
    private static final String internalNoThrottledSourcePath = "/" + internalNoThrottledSource;

    /**
     * Creates a context for the given message bus request.
     *
     * @param request the incoming request; its message is cast to {@link DocumentMessage}
     * @param responseHandler the handler which receives the response to the request
     * @param docprocServiceComponentRegistry not used by this constructor
     * @param docFactoryRegistry passed on to the {@code ProcessingFactory} of this context
     * @param containerDocConfig passed on to the {@code ProcessingFactory} of this context
     * @throws ClassCastException if the message of the request is not a {@link DocumentMessage}
     */
    public MbusRequestContext(MbusRequest request, ResponseHandler responseHandler,
                              ComponentRegistry<DocprocService> docprocServiceComponentRegistry,
                              ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
                              ContainerDocumentConfig containerDocConfig) {
        this.request = request;
        this.requestMsg = (DocumentMessage) request.getMessage();
        this.responseHandler = responseHandler;
        this.processingFactory = new ProcessingFactory(docFactoryRegistry,
                                                       containerDocConfig, getServiceName());
        this.messageFactory = newMessageFactory(requestMsg);
    }

    /**
     * Returns the processings created from the request message. Only the first call produces
     * processings; every subsequent call returns an empty list.
     */
    @Override
    public List<Processing> getProcessings() {
        boolean alreadyDeserialized = deserialized.getAndSet(true);
        return alreadyDeserialized ? List.of() : processingFactory.fromMessage(requestMsg);
    }

    /**
     * Forwards the request message unchanged, on the path of the original request URI, with
     * responses going directly to the response handler given at construction time.
     *
     * @throws IllegalStateException if {@link #getProcessings()} has already been called
     */
    @Override
    public void skip() {
        if (deserialized.get()) {
            throw new IllegalStateException("Can not skip processing after deserialization");
        }
        dispatchRequest(requestMsg, getUri().getPath(), responseHandler);
    }

    /**
     * Forwards one message per document operation in the given processings. If there is nothing
     * to forward, an OK response is dispatched immediately. Otherwise the responses to the
     * forwarded messages are collected by a {@code ResponseMerger} which reports to this context.
     *
     * @param processings the processings that have completed
     */
    @Override
    public void processingDone(List<Processing> processings) {
        List<DocumentMessage> messages = toMessages(processings);
        log.log(Level.FINE, () ->"Forwarding " + messages.size() + " messages from " + processings.size() + " processings.");
        if (messages.isEmpty()) {
            dispatchResponse(Response.Status.OK);
            return;
        }

        long inputSequenceId = requestMsg.getSequenceId();
        int numMsgWithOriginalSequenceId = 0;
        for (Message message : messages) {
            if (message.getSequenceId() == inputSequenceId) {
                numMsgWithOriginalSequenceId++;
            }
        }
        boolean exactlyOneKeepsSequenceId = (numMsgWithOriginalSequenceId == 1);

        // Named 'merger' so that it cannot be confused with the 'responseHandler' field.
        ResponseMerger merger = new ResponseMerger(requestMsg, messages.size(), this);
        for (Message message : messages) {
            // Only a message which alone carries the sequence id of the request is sent on the
            // original path; everything else uses the unthrottled internal path.
            // See comment for internalNoThrottledSource
            boolean useOriginalPath = exactlyOneKeepsSequenceId && message.getSequenceId() == inputSequenceId;
            String path = useOriginalPath ? getUri().getPath() : internalNoThrottledSourcePath;
            dispatchRequest(message, path, merger);
        }
    }

    /** Creates one message per document operation, or no messages at all if there is no message factory. */
    private List<DocumentMessage> toMessages(List<Processing> processings) {
        List<DocumentMessage> messages = new ArrayList<>();
        if (messageFactory == null) {
            return messages;
        }
        for (Processing processing : processings) {
            for (DocumentOperation documentOperation : processing.getDocumentOperations()) {
                messages.add(messageFactory.fromDocumentOperation(processing, documentOperation));
            }
        }
        return messages;
    }

    /**
     * Reports a failed processing. A {@link TransientFailureException} maps to
     * {@code ERROR_ABORTED}, anything else (including null) to {@code ERROR_PROCESSING_FAILURE}.
     *
     * @param exception the cause of the failure, may be null
     */
    @Override
    public void processingFailed(Exception exception) {
        ErrorCode errorCode = (exception instanceof TransientFailureException)
                              ? ErrorCode.ERROR_ABORTED
                              : ErrorCode.ERROR_PROCESSING_FAILURE;

        StringBuilder errorMsg = new StringBuilder("Processing failed.");
        if (exception instanceof HandledProcessingException) {
            // Only the message is included for handled exceptions.
            errorMsg.append(" Error message: ").append(exception.getMessage());
        } else if (exception != null) {
            // toString() also includes the exception class name.
            errorMsg.append(" Error message: ").append(exception.toString());
        }
        errorMsg.append(" -- See Vespa log for details.");
        processingFailed(errorCode, errorMsg.toString());
    }

    /**
     * Dispatches a reply to the request message carrying the given error, through
     * {@link #handleResponse(Response)} of this context.
     *
     * @param errorCode supplies both the response status and the document protocol error code
     * @param errorMsg the error message to add to the reply
     */
    @Override
    public void processingFailed(ErrorCode errorCode, String errorMsg) {
        MbusResponse response = new MbusResponse(errorCode.getDiscStatus(), requestMsg.createReply());
        response.getReply().addError(new com.yahoo.messagebus.Error(errorCode.getDocumentProtocolStatus(), errorMsg));
        ResponseDispatch.newInstance(response).dispatch(this);
    }

    /** Returns the URI of the request this context was created for. */
    @Override
    public URI getUri() {
        return request.getUri();
    }

    /**
     * Returns the path of the request URI with its first 7 characters removed.
     *
     * @throws IndexOutOfBoundsException if the path is shorter than 7 characters
     */
    @Override
    public String getServiceName() {
        // NOTE(review): 7 is presumably the length of a "/chain." path prefix - confirm against
        // the code which creates the request URIs.
        String path = getUri().getPath();
        return path.substring(7);
    }

    /** Returns true if the request message is a put, update or remove of a document. */
    @Override
    public boolean isProcessable() {
        return switch (requestMsg.getType()) {
            case DocumentProtocol.MESSAGE_PUTDOCUMENT,
                 DocumentProtocol.MESSAGE_UPDATEDOCUMENT,
                 DocumentProtocol.MESSAGE_REMOVEDOCUMENT -> true;
            default -> false;
        };
    }

    /** Returns whether the request message reports itself as expired. */
    @Override
    public boolean hasExpired() {
        return requestMsg.isExpired();
    }

    /**
     * Relays the first response received to the response handler given at construction time,
     * after swapping state between the reply of the response and the request message.
     *
     * @param response the response to relay; cast to {@link MbusResponse}
     * @return the content channel of the wrapped handler, or null if a response was already relayed
     */
    @Override
    public ContentChannel handleResponse(Response response) {
        if (responded.getAndSet(true)) {
            return null;
        }
        Reply reply = ((MbusResponse) response).getReply();
        reply.swapState(requestMsg);
        return responseHandler.handleResponse(response);
    }

    /** Dispatches a reply to the request message with the given status, through this context. */
    private void dispatchResponse(int status) {
        MbusResponse response = new MbusResponse(status, requestMsg.createReply());
        ResponseDispatch.newInstance(response).dispatch(this);
    }

    /**
     * Dispatches the given message as a child request of the request of this context. If
     * dispatching throws, the failure is logged and an internal server error is sent as the
     * response through this context.
     *
     * @param msg the message to send
     * @param uriPath the path of the URI to dispatch to, including the leading slash
     * @param handler the handler receiving the response to the dispatched request
     */
    private void dispatchRequest(Message msg, String uriPath, ResponseHandler handler) {
        try {
            new RequestDispatch() {
                @Override
                protected Request newRequest() {
                    URI uri = uriCache.computeIfAbsent(uriPath, __ -> URI.create("mbus://remotehost" + uriPath));
                    return new MbusRequest(request, uri, msg);
                }
                @Override
                public ContentChannel handleResponse(Response response) {
                    return handler.handleResponse(response);
                }
            }.dispatch();
        } catch (Exception e) {
            // Log before responding, so that the cause is recorded even if responding fails too.
            log.log(WARNING, "Failed to dispatch request", e);
            dispatchResponse(Response.Status.INTERNAL_SERVER_ERROR);
        }
    }

    /** Returns a message factory for the given message, or null if it is null or has no route with hops. */
    private static MessageFactory newMessageFactory(DocumentMessage message) {
        if (message == null) {
            return null;
        }
        boolean hasHops = message.getRoute() != null && message.getRoute().hasHops();
        return hasHops ? new MessageFactory(message) : null;
    }

}