summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-28 11:50:41 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-28 11:50:41 +0200
commit620b983bbc9a8d2bb1252fbb30e5941f3927563b (patch)
treef9ae28b86e16cfc1ae4005c5697ae9a25b43db18 /documentapi
parent6d1aa9aa09f3628b180541922d651b4ad7a036cb (diff)
Separate out delay queue, and add unit test for it
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java19
1 files changed, 10 insertions, 9 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index 565b14981e7..35dc3992179 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -22,6 +22,8 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,6 +41,7 @@ public class LocalAsyncSession implements AsyncSession {
private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>();
private final ResponseHandler handler;
private final SyncSession syncSession;
+ private final Executor executor = Executors.newCachedThreadPool();
private AtomicLong requestId = new AtomicLong(0);
private AtomicReference<Runnable> synchronizer = new AtomicReference<>();
@@ -172,16 +175,14 @@ public class LocalAsyncSession implements AsyncSession {
return new Result(resultType, new Error());
long req = requestId.incrementAndGet();
- synchronizer.getAndUpdate(runnable -> {
- if (runnable == null)
+ Runnable runnable = synchronizer.get();
+ if (runnable == null)
+ addResponse(responses.apply(req));
+ else
+ executor.execute(() -> {
+ runnable.run();
addResponse(responses.apply(req));
- else
- new Thread(() -> {
- runnable.run();
- addResponse(responses.apply(req));
- }).start();
- return runnable;
- });
+ });
return new Result(req);
}