summaryrefslogtreecommitdiffstats
path: root/clustercontroller-apputil/src/main/java/com/yahoo/vespa/clustercontroller/apputil/communication/http/ApacheAsyncHttpClient.java
blob: afed1eb39ab831ab581e54b72ea2739d3b7ef1e1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.apputil.communication.http;

import com.yahoo.vespa.clustercontroller.utils.communication.async.AsyncOperation;
import com.yahoo.vespa.clustercontroller.utils.communication.async.AsyncOperationImpl;
import com.yahoo.vespa.clustercontroller.utils.communication.http.AsyncHttpClient;
import com.yahoo.vespa.clustercontroller.utils.communication.http.HttpRequest;
import com.yahoo.vespa.clustercontroller.utils.communication.http.HttpResult;
import com.yahoo.vespa.clustercontroller.utils.communication.http.SyncHttpClient;

import java.util.*;
import java.util.concurrent.Executor;
import java.util.logging.Logger;

/**
 * There are some stuff to work around with the apache client.
 *   - Whether to use a proxy or not is global, not per request.
 *   - Timeout is not handled per request (and is not a request timeout but seems like a "something happening on TCP" timeout.
 *   - It is not thread safe.
 *
 * This class gets around these issues by creating one instance per unique setting, and ensuring only one request use a given instance at a time.
 */
public class ApacheAsyncHttpClient implements AsyncHttpClient<HttpResult> {
    private static final Logger log = Logger.getLogger(ApacheAsyncHttpClient.class.getName());
    public interface SyncHttpClientFactory {
        SyncHttpClient createInstance(String proxyHost, int proxyPort, long timeoutMs);
    }
    public static class Settings {
        String proxyHost;
        int proxyPort;
        long timeout;

        Settings(HttpRequest request) {
            timeout = request.getTimeoutMillis();
            if (request.getPath() != null
                && !request.getPath().isEmpty()
                && request.getPath().charAt(0) != '/')
            {
                proxyHost = request.getHost();
                proxyPort = request.getPort();
                int colo = request.getPath().indexOf(':');
                int slash = request.getPath().indexOf('/', colo);
                if (colo < 0 && slash < 0) {
                    throw new IllegalStateException("Http path '" + request.getPath() + "' looks invalid. "
                                                  + "Cannot extract proxy server data. Is it a regular request that "
                                                  + "should start with a slash?");
                }
                if (colo < 0) {
                    request.setPort(80);
                    request.setHost(request.getPath().substring(0, slash));
                } else {
                    request.setHost(request.getPath().substring(0, colo));
                    request.setPort(Integer.valueOf(request.getPath().substring(colo + 1, slash)));
                }
                request.setPath(request.getPath().substring(slash));
            }
        }

        @Override
        public boolean equals(Object other) {
            Settings o = (Settings) other;
            if (timeout != o.timeout || proxyPort != o.proxyPort
                || (proxyHost == null ^ o.proxyHost == null)
                || (proxyHost != null && !proxyHost.equals(o.proxyHost)))
            {
                return false;
            }
            return true;
        }

        @Override
        public int hashCode() {
            return (proxyHost == null ? 0 : proxyHost.hashCode()) ^ proxyPort ^ Long.valueOf(timeout).hashCode();
        }
    }
    private final Executor executor;
    private final SyncHttpClientFactory clientFactory;
    private boolean closed = false;
    private final int maxInstanceCacheSize; // Maximum number of currently unused instances.
    private final Map<Settings, LinkedList<SyncHttpClient>> apacheInstances = new LinkedHashMap<Settings, LinkedList<SyncHttpClient>>() {
        protected @Override boolean removeEldestEntry(Map.Entry<Settings,LinkedList<SyncHttpClient>> eldest) {
            return getUnusedCacheSize() > maxInstanceCacheSize;
        }
    };

    public ApacheAsyncHttpClient(Executor executor) {
        this(executor, new SyncHttpClientFactory() {
            @Override
            public SyncHttpClient createInstance(String proxyHost, int proxyPort, long timeoutMs) {
                return new ApacheHttpInstance(proxyHost, proxyPort, timeoutMs);
            }
        });
    }

    public ApacheAsyncHttpClient(Executor executor, SyncHttpClientFactory clientFactory) {
        this.executor = executor;
        this.clientFactory = clientFactory;
        maxInstanceCacheSize = 16;
        log.fine("Starting apache com.yahoo.vespa.clustercontroller.utils.communication.async HTTP client");
    }

    private SyncHttpClient getFittingInstance(Settings settings) {
        synchronized (apacheInstances) {
            if (closed) throw new IllegalStateException("Http client has been closed for business.");
            LinkedList<SyncHttpClient> fittingInstances = apacheInstances.get(settings);
            if (fittingInstances == null) {
                fittingInstances = new LinkedList<>();
                apacheInstances.put(settings, fittingInstances);
            }
            if (fittingInstances.isEmpty()) {
                return clientFactory.createInstance(settings.proxyHost, settings.proxyPort, settings.timeout);
            } else {
                return fittingInstances.removeFirst();
            }
        }
    }
    private void insertInstance(Settings settings, SyncHttpClient instance) {
        synchronized (apacheInstances) {
            LinkedList<SyncHttpClient> fittingInstances = apacheInstances.get(settings);
            if (closed || fittingInstances == null) {
                instance.close();
                return;
            }
            fittingInstances.addLast(instance);
        }
    }
    private int getUnusedCacheSize() {
        int size = 0;
        synchronized (apacheInstances) {
            for (LinkedList<SyncHttpClient> list : apacheInstances.values()) {
                size += list.size();
            }
        }
        return size;
    }

    @Override
    public AsyncOperation<HttpResult> execute(HttpRequest r) {
        final HttpRequest request = r.clone(); // Gonna modify it to extract proxy information
        final Settings settings = new Settings(request);
        final SyncHttpClient instance = getFittingInstance(settings);
        final AsyncOperationImpl<HttpResult> op = new AsyncOperationImpl<>(r.toString(), r.toString(true));
        executor.execute(new Runnable() {
            @Override
            public void run() {
                HttpResult result;
                Exception failure = null;
                try{
                    result = instance.execute(request);
                } catch (Exception e) {
                    result = new HttpResult().setHttpCode(500, "Apache client failed to execute request.");
                    failure = e;
                }
                insertInstance(settings, instance);
                // Must insert back instance before tagging operation complete to ensure a following
                // call can reuse same instance
                if (failure != null) {
                    op.setFailure(failure, result);
                } else {
                    op.setResult(result);
                }
            }
        });
        return op;
    }

    @Override
    public void close() {
        synchronized (apacheInstances) {
            closed = true;
            for (LinkedList<SyncHttpClient> list : apacheInstances.values()) {
                for (SyncHttpClient instance : list) {
                    instance.close();
                }
            }
            apacheInstances.clear();
        }
    }
}