aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
blob: 4de3eebec2d8a23b4f76d13ea56d55d8965e3752 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.server;

import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.jdisc.ThreadedHttpRequestHandler;
import com.yahoo.container.jdisc.messagebus.SessionCache;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.metrics.DocumentApiMetrics;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.shared.SharedSourceSession;
import com.yahoo.yolean.Exceptions;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * HTTP request handler which dispatches feed requests to a per-client {@link ClientFeederV3}.
 *
 * One client has one ClientFeederV3 shared between all client threads; the feeder is looked up by
 * the client id header of the request and created on first use. A background janitor task
 * periodically kills and forgets feeders which report themselves as timed out.
 * Contains logic for shutting down cleanly as the server is upgraded.
 *
 * All access to the client feeder map is guarded by a single monitor.
 *
 * @author dybis
 */
public class FeedHandlerV3 extends ThreadedHttpRequestHandler {

    private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName());

    /** Not final only because tests may replace it through {@link #injectDocumentManangerForTests}. */
    private DocumentTypeManager docTypeManager;
    /** Feeders by client id. Guarded by {@link #monitor}. */
    private final Map<String, ClientFeederV3> clientFeederByClientId = new HashMap<>();
    /** Single-threaded scheduler running {@link #removeOldClients()}. */
    private final ScheduledThreadPoolExecutor cron;
    private final SessionCache sessionCache;
    /** Reply handler handed to every client feeder created by this handler. */
    protected final ReplyHandler feedReplyHandler;
    private final Metric metric;
    private final Object monitor = new Object();

    /**
     * Creates the handler and starts the janitor task which removes timed out clients.
     *
     * @param executor            executor passed on to the superclass
     * @param metric              metric sink passed to the superclass, the reply handler and each client feeder
     * @param documentTypeManager document type manager passed to each client feeder
     * @param sessionCache        cache from which source sessions are retained for new clients
     * @param metricsHelper       metrics helper passed to the reply handler
     */
    public FeedHandlerV3(Executor executor,
                         Metric metric,
                         DocumentTypeManager documentTypeManager,
                         SessionCache sessionCache,
                         DocumentApiMetrics metricsHelper) {
        super(executor, metric);
        this.docTypeManager = documentTypeManager;
        this.sessionCache = sessionCache;
        this.metric = metric;
        this.feedReplyHandler = new FeedReplyReader(metric, metricsHelper);
        this.cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feed-handler-v3-janitor"));
        // Scheduled last, so that 'this' is handed to the janitor thread only after all fields are assigned.
        cron.scheduleWithFixedDelay(this::removeOldClients, 3, 3, TimeUnit.SECONDS);
    }

    /**
     * Replaces the document type manager used for client feeders created after this call.
     * Intended for tests only.
     *
     * @param docTypeManager the document type manager to use from now on
     */
    public void injectDocumentManangerForTests(DocumentTypeManager docTypeManager) {
        this.docTypeManager = docTypeManager;
    }

    // TODO: If this is set up to run without first invoking the old FeedHandler code, we should
    //       verify the version header first.
    /**
     * Routes the request to the feeder of the requesting client, creating the feeder if this is the
     * first request seen for that client id.
     *
     * @param request the incoming feed request
     * @return the response from the client feeder, a BAD_REQUEST error response if the feeder throws
     *         {@link UnknownClientException}, or an INTERNAL_SERVER_ERROR error response if it
     *         throws any other exception
     * @throws IllegalArgumentException if the request has no, or an empty, client id header
     */
    @Override
    public HttpResponse handle(HttpRequest request) {
        String clientId = clientId(request);
        ClientFeederV3 clientFeederV3;
        synchronized (monitor) {
            clientFeederV3 = clientFeederByClientId.computeIfAbsent(clientId, id -> newClientFeeder(id, request));
        }
        // The request itself is handled outside the monitor, so clients do not block each other.
        try {
            return clientFeederV3.handleRequest(request);
        } catch (UnknownClientException uce) {
            String msg = Exceptions.toMessageString(uce);
            log.log(Level.WARNING, msg);
            return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.BAD_REQUEST, msg);
        } catch (Exception e) {
            String msg = "Could not initialize document parsing: " + Exceptions.toMessageString(e);
            log.log(Level.WARNING, msg);
            return new ErrorHttpResponse(com.yahoo.jdisc.http.HttpResponse.Status.INTERNAL_SERVER_ERROR, msg);
        }
    }

    /** Creates a feeder for the given client id, with session parameters taken from the given request. */
    private ClientFeederV3 newClientFeeder(String clientId, HttpRequest request) {
        return new ClientFeederV3(retainSource(sessionCache, sourceSessionParams(request)),
                                  new FeedReaderFactory(true), // TODO: Make error debugging configurable
                                  docTypeManager,
                                  clientId,
                                  metric,
                                  feedReplyHandler);
    }

    // SessionCache is final and no easy way to mock it so we need this to be able to do testing.
    /**
     * Retains a source session with the given parameters from the given session cache.
     *
     * @param sessionCache the cache to retain the session from
     * @param params       the parameters of the session to retain
     * @return the retained, reference counted session
     */
    protected ReferencedResource<SharedSourceSession> retainSource(SessionCache sessionCache, SourceSessionParams params) {
        return sessionCache.retainSource(params);
    }

    /**
     * Stops the janitor and kills all client feeders, on a separate daemon thread.
     * This method itself returns as soon as that thread has been started.
     */
    @Override
    protected void destroy() {
        // We are forking this to avoid that accidental de-referencing causes any random thread doing destruction.
        // This caused a deadlock when the single Messenger thread in MessageBus was the last one referring this
        // and started destructing something that required something only the messenger thread could provide.
        Thread destroyer = new Thread(() -> {
            cron.shutdown();
            synchronized (monitor) {
                for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) {
                    ClientFeederV3 client = iterator.next();
                    try {
                        client.kill();
                    } catch (RuntimeException e) {
                        // One failing client must not prevent the remaining clients from being killed.
                        log.log(Level.WARNING, "Failed to kill feed client during shutdown", e);
                    }
                    iterator.remove();
                }
            }
        }, "feed-handler-v3-adhoc-destroyer");
        destroyer.setDaemon(true);
        destroyer.start();
    }

    /**
     * Returns the client id given in the client id header of the request.
     *
     * @throws IllegalArgumentException if the header is missing or empty
     */
    private String clientId(HttpRequest request) {
        String clientDictatedId = request.getHeader(Headers.CLIENT_ID);
        if (clientDictatedId == null || clientDictatedId.isEmpty()) {
            throw new IllegalArgumentException("Did not get any CLIENT_ID header (" + Headers.CLIENT_ID + ")");
        }
        return clientDictatedId;
    }

    /**
     * Creates source session parameters for the given request. The timeout is taken from the timeout
     * header when that header is present and parses as a double; otherwise the parameters are returned
     * with their timeout left untouched.
     */
    private SourceSessionParams sourceSessionParams(HttpRequest request) {
        SourceSessionParams params = new SourceSessionParams();
        String timeout = request.getHeader(Headers.TIMEOUT);

        if (timeout != null) {
            try {
                params.setTimeout(Double.parseDouble(timeout));
            } catch (NumberFormatException ignored) {
                // Best effort: an unparsable timeout is not a reason to reject the request.
                log.fine(() -> "Ignoring unparsable " + Headers.TIMEOUT + " header value '" + timeout + "'");
            }
        }
        return params;
    }

    /**
     * Janitor task: kills and forgets every client feeder which reports that it has timed out.
     *
     * Never lets a runtime exception escape, since an exception thrown from a task scheduled with
     * scheduleWithFixedDelay suppresses all subsequent executions of that task. A client which
     * fails is logged and left in the map, and is thus examined again on the next run.
     */
    private void removeOldClients() {
        synchronized (monitor) {
            for (var iterator = clientFeederByClientId.values().iterator(); iterator.hasNext(); ) {
                ClientFeederV3 client = iterator.next();
                try {
                    if (client.timedOut()) {
                        client.kill();
                        iterator.remove();
                    }
                } catch (RuntimeException e) {
                    log.log(Level.WARNING, "Failed to clean up timed out feed client", e);
                }
            }
        }
    }

}