aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
blob: 4adfe3ebe1d4bb97da415d60fe3585224b2714f3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.feedhandler;

import com.google.inject.Inject;
import com.yahoo.clientmetrics.RouteMetricSet;
import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.container.jdisc.EmptyResponse;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.protect.Error;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.feedapi.DocprocMessageProcessor;
import com.yahoo.feedapi.FeedContext;
import com.yahoo.feedapi.Feeder;
import com.yahoo.feedapi.JsonFeeder;
import com.yahoo.feedapi.MessagePropertyProcessor;
import com.yahoo.feedapi.SimpleFeedAccess;
import com.yahoo.feedapi.SingleSender;
import com.yahoo.feedapi.XMLFeeder;
import com.yahoo.jdisc.Metric;
import com.yahoo.vespa.config.content.LoadTypeConfig;
import com.yahoo.vespaclient.config.FeederConfig;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * HTTP handler that feeds the documents carried in the body of a request.
 *
 * <p>The request body is parsed as JSON or XML (see {@link #createFeeder}) and the resulting
 * operations are passed to a {@link SingleSender}. The number of concurrently executing feed
 * requests is capped; requests above the cap are answered with 503 Service Unavailable.
 *
 * @author Thomas Gundersen
 * @author steinar
 */
public final class VespaFeedHandler extends VespaFeedHandlerBase {

    /** Name of the request property that forces the body to be parsed as JSON. */
    public static final String JSON_INPUT = "jsonInput";

    /** Busy-thread cap used when the handler is built from a {@link FeedContext} instead of config. */
    private static final int DEFAULT_MAX_BUSY_THREADS = 32;

    /** Number of feed requests currently inside {@link #handle(HttpRequest, RouteMetricSet.ProgressCallback, int)}. */
    private final AtomicInteger busyThreads = new AtomicInteger(0);

    /** Upper bound on {@link #busyThreads}; requests exceeding it are rejected. */
    private final int maxBusyThreads;

    /**
     * Creates a handler from injected configuration.
     * The busy-thread cap is read from {@code feederConfig.maxbusythreads()}.
     */
    @SuppressWarnings("unused")
    @Inject
    public VespaFeedHandler(FeederConfig feederConfig,
                            LoadTypeConfig loadTypeConfig,
                            DocumentmanagerConfig documentmanagerConfig,
                            SlobroksConfig slobroksConfig,
                            ClusterListConfig clusterListConfig,
                            Executor executor,
                            Metric metric)  {
        super(feederConfig, loadTypeConfig, documentmanagerConfig, slobroksConfig, clusterListConfig, executor, metric);
        this.maxBusyThreads = feederConfig.maxbusythreads();
    }

    /** Creates a handler on top of an existing feed context, using the default busy-thread cap. */
    VespaFeedHandler(FeedContext context, Executor executor) {
        super(context, executor);
        this.maxBusyThreads = DEFAULT_MAX_BUSY_THREADS;
    }

    /**
     * Static factory equivalent to the package-private {@code (FeedContext, Executor)} constructor.
     *
     * @param context  the feed context to use
     * @param executor the executor handed to the base handler
     * @return a new handler
     */
    public static VespaFeedHandler createFromContext(FeedContext context, Executor executor) {
        return new VespaFeedHandler(context, executor);
    }

    /** Handles the request with no progress callback and a thread count of 1. */
    @Override
    public HttpResponse handle(HttpRequest request) {
        return handle(request, null, 1);
    }

    /**
     * Feeds the documents in the request body and reports the outcome.
     *
     * <p>If the request has a {@code status} property, a metrics response is returned and nothing
     * is fed; such requests do not count against the busy-thread cap. Otherwise the body is parsed
     * and sent. When the {@code asynchronous} property is set the response is returned as soon as
     * parsing has finished; if not, this method waits for pending replies up to the timeout given by
     * {@code getTimeoutMillis(request)} and records a timeout error if they do not all arrive.
     *
     * @param request    the incoming HTTP request
     * @param callback   progress callback passed to the route metric set; may be {@code null}
     * @param numThreads thread count passed to the {@code ThreadedFeedAccess} created for this request
     * @return the feed response, a metrics response for status requests, or an empty 503 response
     *         when more than {@code maxBusyThreads} feed requests are already in progress
     */
    public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) {
        if (request.getProperty("status") != null) {
            return new MetricResponse(context.getMetrics().getMetricSet());
        }
        try {
            // The increment sits inside the try so the finally below always balances it,
            // including for requests that are rejected right away.
            int busy = busyThreads.incrementAndGet();
            if (busy > maxBusyThreads) {
                return new EmptyResponse(com.yahoo.jdisc.http.HttpResponse.Status.SERVICE_UNAVAILABLE);
            }

            boolean asynchronous = request.getBooleanProperty("asynchronous");

            MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request);

            String route = properties.getRoute().toString();
            FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback));

            SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous);
            sender.addMessageProcessor(properties);
            sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request)));

            // NOTE(review): the feeder below is wired to 'sender' directly, so 'feedAccess' is only
            // created and closed here - confirm whether numThreads is meant to affect feeding.
            ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender);
            try {
                Feeder feeder = createFeeder(sender, request);
                feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
                feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
                response.setAbortOnFeedError(properties.getAbortOnFeedError());

                List<String> errors = feeder.parse();
                for (String error : errors) {
                    response.addXMLParseError(error);
                }
                if (!errors.isEmpty() && feeder instanceof XMLFeeder) {
                    response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json.");
                }

                sender.done();
            } finally {
                // Close even when feeder creation or parsing throws, so the feed access is not leaked.
                feedAccess.close();
            }

            if (asynchronous) {
                return response;
            }
            long millis = getTimeoutMillis(request);
            boolean completed = sender.waitForPending(millis);
            if (!completed) {
                response.addError(Error.TIMEOUT, "Timed out after " + millis + " ms waiting for responses");
            }
            response.done();
            return response;
        } finally {
            busyThreads.decrementAndGet();
        }
    }

    /**
     * Picks the feeder matching the request's input format.
     *
     * <p>A {@link JsonFeeder} is used when the {@value #JSON_INPUT} property parses as {@code true}
     * or the {@code Content-Type} header starts with {@code application/json}; otherwise an
     * {@link XMLFeeder} is used.
     *
     * @param sender  the feed access the feeder hands parsed operations to
     * @param request the request whose body is to be parsed
     * @return a feeder reading from the request's input stream
     */
    private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) {
        String contentType = request.getHeader("Content-Type");
        boolean jsonRequested = Boolean.parseBoolean(request.getProperty(JSON_INPUT));
        boolean jsonContentType = contentType != null && contentType.startsWith("application/json");
        if (jsonRequested || jsonContentType) {
            return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request));
        } else {
            return new XMLFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request));
        }
    }

}