blob: 20192e636711e2f8f94d153d89d9d7bb779f99b7 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.utils.communication.http;
import com.yahoo.vespa.clustercontroller.utils.communication.async.AsyncCallback;
import com.yahoo.vespa.clustercontroller.utils.communication.async.AsyncOperation;
import java.util.LinkedList;
import java.util.logging.Logger;
/**
 * Utility class that schedules HTTP requests while keeping at most a fixed number of them
 * pending at a time.
 *
 * <p>Requests that cannot be sent immediately are kept in FIFO order and are sent as earlier
 * requests complete. All mutable state is guarded by the monitor of the internal request queue.
 *
 * @param <V> the result type produced by the HTTP client
 */
public class RequestQueue<V extends HttpResult> {

    private static final Logger log = Logger.getLogger(RequestQueue.class.getName());

    private final AsyncHttpClient<V> httpClient;
    /** Requests not yet sent. Its monitor also guards {@link #pendingRequests}. */
    private final LinkedList<Request> requestQueue = new LinkedList<>();
    private final int maxPendingRequests;
    /** Number of requests handed to the HTTP client whose completion has not yet been seen. */
    private int pendingRequests = 0;

    /**
     * Creates a queue sending requests through the given client.
     *
     * @param httpClient the client used to execute requests
     * @param maxPendingRequests the maximum number of requests allowed to be pending at once
     */
    public RequestQueue(AsyncHttpClient<V> httpClient, int maxPendingRequests) {
        this.httpClient = httpClient;
        this.maxPendingRequests = maxPendingRequests;
    }

    /**
     * Returns whether there is neither any queued request nor any pending request.
     *
     * @return true if no requests are queued and none are pending
     */
    public boolean empty() {
        synchronized (requestQueue) {
            return (requestQueue.isEmpty() && pendingRequests == 0);
        }
    }

    /**
     * Blocks the calling thread until {@link #empty()} returns true.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void waitUntilEmpty() throws InterruptedException {
        synchronized (requestQueue) {
            // Loop to guard against spurious wakeups and wakeups where work remains.
            while (!empty()) {
                requestQueue.wait();
            }
        }
    }

    /**
     * Queues the request and sends it right away if fewer than the maximum number of requests
     * are pending.
     *
     * @param request the request to send
     * @param callback invoked with the operation once the request has completed
     */
    public void schedule(HttpRequest request, AsyncCallback<V> callback) {
        log.fine("Scheduling " + request + " call");
        synchronized (requestQueue) {
            requestQueue.addLast(new Request(request, callback));
            sendMore();
        }
    }

    /**
     * Sends queued requests until the queue is drained or the pending limit is reached.
     * Must be called while holding the monitor of {@link #requestQueue}.
     */
    private void sendMore() {
        while (pendingRequests < maxPendingRequests && !requestQueue.isEmpty()) {
            Request call = requestQueue.removeFirst();
            log.fine("Sending " + call.getRequest() + ".");
            ++pendingRequests;
            AsyncOperation<V> op;
            try {
                op = httpClient.execute(call.getRequest());
            } catch (RuntimeException | Error e) {
                // The request never became pending; release its slot so the counter does not
                // leak, and wake waiters since the queue may have become empty.
                --pendingRequests;
                requestQueue.notifyAll();
                throw e;
            }
            op.register(call);
        }
    }

    /**
     * A scheduled request paired with the callback to invoke on completion. Registered as the
     * callback of the operation so that this queue can update its bookkeeping when it completes.
     */
    private class Request implements AsyncCallback<V> {

        private final HttpRequest request;
        private final AsyncCallback<V> callback;

        Request(HttpRequest request, AsyncCallback<V> callback) {
            this.request = request;
            this.callback = callback;
        }

        public HttpRequest getRequest() { return request; }

        /**
         * Releases the pending slot held by this request, forwards the completed operation to
         * the user callback, and then wakes waiters and sends further queued requests.
         */
        @Override
        public void done(AsyncOperation<V> op) {
            if (op.isSuccess()) {
                log.fine("Operation " + op.getName() + " completed successfully");
            } else {
                log.fine("Operation " + op.getName() + " failed: " + op.getCause());
            }
            // Release the slot before running the callback, so that a schedule() call made
            // from within the callback sees the freed capacity.
            synchronized (requestQueue) {
                --pendingRequests;
            }
            try {
                // Invoked outside the lock; the callback is caller-supplied code.
                callback.done(op);
            } finally {
                // Run even if the callback throws; otherwise threads in waitUntilEmpty() would
                // never be woken and the remaining queued requests would never be sent.
                synchronized (requestQueue) {
                    requestQueue.notifyAll();
                    sendMore();
                }
            }
        }
    }

}
|