aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
blob: 28bcb50a1449b07af5a0fec0384fb47b99debb38 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.server;

import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.container.handler.ThreadpoolConfig;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.jdisc.LoggingRequestHandler;
import com.yahoo.container.jdisc.messagebus.SessionCache;
import com.yahoo.container.logging.AccessLog;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.metrics.DocumentApiMetrics;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.shared.SharedSourceSession;
import com.yahoo.vespa.http.client.core.Headers;
import com.yahoo.yolean.Exceptions;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
 * HTTP request handler for the v3 feed API.
 *
 * <p>This code is based on the v2 code; however, in v3 one client has one {@link ClientFeederV3} which is shared
 * between all of that client's threads. The new API has more logic for shutting down cleanly, as the server is
 * more likely to be upgraded.</p>
 *
 * <p>Feeders are kept in a map keyed by the client id header. All access to that map is guarded by
 * {@code monitor}. A single-threaded scheduled executor periodically removes the feeders that report
 * themselves as timed out.</p>
 *
 * @author dybis
 */
public class FeedHandlerV3 extends LoggingRequestHandler {

    private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName());

    /** Fraction of the container threads that may be blocked on feeding before requests are denied. */
    private static final double FEEDING_THREADS_FRACTION = 0.4;
    /** Number of threads allowed to block on feeding when no thread pool config is available. */
    private static final int DEFAULT_FEEDING_THREADS = 200;
    /** Delay before the first cleanup of timed out clients. */
    private static final long CLEANUP_INITIAL_DELAY_MINUTES = 16;
    /** Delay between the end of one cleanup and the start of the next. */
    private static final long CLEANUP_DELAY_MINUTES = 11;

    // Not final: may be replaced through injectDocumentManangerForTests.
    private DocumentTypeManager docTypeManager;
    // Guarded by monitor.
    private final Map<String, ClientFeederV3> clientFeederByClientId = new HashMap<>();
    private final ScheduledThreadPoolExecutor cron;
    private final SessionCache sessionCache;
    protected final ReplyHandler feedReplyHandler;
    private final Metric metric;
    private final Object monitor = new Object();
    private final AtomicInteger threadsAvailableForFeeding;

    /**
     * Creates the handler and schedules the periodic removal of timed out clients.
     *
     * @param executor the executor passed on to the superclass
     * @param documentManagerConfig config used to create the document type manager
     * @param sessionCache cache from which source sessions are retained for new clients
     * @param metric metric instance passed to the superclass, the reply handler and the client feeders
     * @param accessLog access log passed on to the superclass
     * @param threadpoolConfig thread pool config used to size the number of threads which may block on feeding,
     *                         or null to use a default of 200
     * @param metricsHelper metrics helper passed to the reply handler
     */
    public FeedHandlerV3(
            Executor executor,
            DocumentmanagerConfig documentManagerConfig,
            SessionCache sessionCache,
            Metric metric,
            AccessLog accessLog,
            ThreadpoolConfig threadpoolConfig,
            DocumentApiMetrics metricsHelper) throws Exception {
        super(executor, accessLog, metric);
        docTypeManager = new DocumentTypeManager(documentManagerConfig);
        this.sessionCache = sessionCache;
        feedReplyHandler = new FeedReplyReader(metric, metricsHelper);
        cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feedhandlerv3.cron"));
        cron.scheduleWithFixedDelay(this::removeOldClients,
                                    CLEANUP_INITIAL_DELAY_MINUTES,
                                    CLEANUP_DELAY_MINUTES,
                                    TimeUnit.MINUTES);
        this.metric = metric;
        // 40% of the threads can be blocking on feeding before we deny requests.
        if (threadpoolConfig != null) {
            int feedingThreads = (int) (FEEDING_THREADS_FRACTION * threadpoolConfig.maxthreads());
            threadsAvailableForFeeding = new AtomicInteger(Math.max(feedingThreads, 1));
        } else {
            log.warning("No config for threadpool, using 200 for max blocking threads for feeding.");
            threadsAvailableForFeeding = new AtomicInteger(DEFAULT_FEEDING_THREADS);
        }
    }

    /**
     * Replaces the document type manager. Intended for tests only.
     *
     * @param docTypeManager the document type manager handed to client feeders created after this call
     */
    public void injectDocumentManangerForTests(DocumentTypeManager docTypeManager) {
        this.docTypeManager = docTypeManager;
    }

    /**
     * Routes the request to the feeder belonging to the requesting client, creating the feeder on the
     * first request from that client.
     *
     * @param request the feed request; must carry a non-empty client id header
     * @return the feeder's response, a 400 response if the client id header is missing or the feeder reports
     *         an unknown client, or a 500 response if the feeder fails in any other way
     */
    // TODO: If this is set up to run without first invoking the old FeedHandler code, we should
    // verify the version header first. This is done in the old code.
    @Override
    public HttpResponse handle(HttpRequest request) {
        final String clientId;
        try {
            clientId = clientId(request);
        } catch (IllegalArgumentException e) {
            // A missing client id is a client error, so answer with 400 rather than letting the exception escape.
            String msg = Exceptions.toMessageString(e);
            log.fine(msg);
            return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.BAD_REQUEST, msg);
        }

        final ClientFeederV3 clientFeederV3;
        synchronized (monitor) {
            ClientFeederV3 feeder = clientFeederByClientId.get(clientId);
            if (feeder == null) {
                SourceSessionParams sourceSessionParams = sourceSessionParams(request);
                feeder = new ClientFeederV3(
                        retainSource(sessionCache, sourceSessionParams),
                        new FeedReaderFactory(),
                        docTypeManager,
                        clientId,
                        metric,
                        feedReplyHandler,
                        threadsAvailableForFeeding);
                clientFeederByClientId.put(clientId, feeder);
            }
            clientFeederV3 = feeder;
        }
        // The request is handled outside the monitor so that requests do not serialize on it.
        try {
            return clientFeederV3.handleRequest(request);
        } catch (UnknownClientException uce) {
            String msg = Exceptions.toMessageString(uce);
            log.log(LogLevel.WARNING, msg);
            return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.BAD_REQUEST, msg);
        } catch (Exception e) {
            String msg = "Could not initialize document parsing: " + Exceptions.toMessageString(e);
            log.log(LogLevel.WARNING, msg);
            return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.INTERNAL_SERVER_ERROR, msg);
        }
    }

    /**
     * Retains a source session from the given cache.
     * SessionCache is final and there is no easy way to mock it, so this method exists to be overridden in tests.
     *
     * @param sessionCache the cache to retain the session from
     * @param params the parameters of the session to retain
     * @return the retained session resource
     */
    protected ReferencedResource<SharedSourceSession> retainSource(SessionCache sessionCache, SourceSessionParams params) {
        return sessionCache.retainSource(params);
    }

    /**
     * Destroys this handler on a separate daemon thread: destroys the superclass, shuts down the cleanup
     * executor, and kills and forgets all client feeders.
     */
    @Override
    protected void destroy() {
        // We are forking this to avoid that accidental dereferencing causes any random thread doing destruction.
        // This caused a deadlock when the single Messenger thread in MessageBus was the last one referring this
        // and started destructing something that required something only the messenger thread could provide.
        Thread destroyer = new Thread(() -> {
            super.destroy();
            cron.shutdown();
            synchronized (monitor) {
                for (Map.Entry<String, ClientFeederV3> entry : clientFeederByClientId.entrySet()) {
                    try {
                        entry.getValue().kill();
                    } catch (RuntimeException e) {
                        // Keep going so that one failing client does not prevent the others from being killed.
                        log.log(LogLevel.WARNING, "Failed to kill feed client '" + entry.getKey() + "': " +
                                                  Exceptions.toMessageString(e), e);
                    }
                }
                clientFeederByClientId.clear();
            }
        }, "feedhandlerv3.destroyer");
        destroyer.setDaemon(true);
        destroyer.start();
    }

    /**
     * Returns the client id given in the request's client id header.
     *
     * @throws IllegalArgumentException if the header is missing or empty
     */
    private String clientId(HttpRequest request) {
        String clientDictatedId = request.getHeader(Headers.CLIENT_ID);
        if (clientDictatedId == null || clientDictatedId.isEmpty()) {
            throw new IllegalArgumentException("Did not get any CLIENT_ID header (" + Headers.CLIENT_ID + ")");
        }
        return clientDictatedId;
    }

    /**
     * Creates source session parameters for the request. The timeout is taken from the timeout header when
     * that header is present and parses as a double; otherwise the parameters keep their default timeout.
     */
    private SourceSessionParams sourceSessionParams(HttpRequest request) {
        SourceSessionParams params = new SourceSessionParams();
        String timeout = request.getHeader(Headers.TIMEOUT);

        if (timeout != null) {
            try {
                params.setTimeout(Double.parseDouble(timeout));
            } catch (NumberFormatException ignored) {
                // Best effort: an unparsable timeout is ignored and the default is used.
                log.fine("Ignoring unparsable timeout header value '" + timeout + "'");
            }
        }
        return params;
    }

    /**
     * Kills and removes all clients which report that they have timed out. Runs periodically on the cron executor.
     *
     * Failures are caught and logged per client: an exception escaping a task scheduled with
     * scheduleWithFixedDelay suppresses all subsequent executions, which would stop cleanup for good.
     * A client whose cleanup fails stays in the map and is retried on the next run.
     */
    private void removeOldClients() {
        synchronized (monitor) {
            Iterator<Map.Entry<String, ClientFeederV3>> entries = clientFeederByClientId.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<String, ClientFeederV3> entry = entries.next();
                try {
                    ClientFeederV3 client = entry.getValue();
                    if (client.timedOut()) {
                        client.kill();
                        entries.remove();
                    }
                } catch (RuntimeException e) {
                    log.log(LogLevel.WARNING, "Failed to remove timed out feed client '" + entry.getKey() + "': " +
                                              Exceptions.toMessageString(e), e);
                }
            }
        }
    }

}