diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-28 11:50:41 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-28 11:50:41 +0200 |
commit | 620b983bbc9a8d2bb1252fbb30e5941f3927563b (patch) | |
tree | f9ae28b86e16cfc1ae4005c5697ae9a25b43db18 /documentapi | |
parent | 6d1aa9aa09f3628b180541922d651b4ad7a036cb (diff) |
Separate out delay queue, and add unit test for it
Diffstat (limited to 'documentapi')
-rw-r--r-- | documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java index 565b14981e7..35dc3992179 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java @@ -22,6 +22,8 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -39,6 +41,7 @@ public class LocalAsyncSession implements AsyncSession { private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>(); private final ResponseHandler handler; private final SyncSession syncSession; + private final Executor executor = Executors.newCachedThreadPool(); private AtomicLong requestId = new AtomicLong(0); private AtomicReference<Runnable> synchronizer = new AtomicReference<>(); @@ -172,16 +175,14 @@ public class LocalAsyncSession implements AsyncSession { return new Result(resultType, new Error()); long req = requestId.incrementAndGet(); - synchronizer.getAndUpdate(runnable -> { - if (runnable == null) + Runnable runnable = synchronizer.get(); + if (runnable == null) + addResponse(responses.apply(req)); + else + executor.execute(() -> { + runnable.run(); addResponse(responses.apply(req)); - else - new Thread(() -> { - runnable.run(); - addResponse(responses.apply(req)); - }).start(); - return runnable; - }); + }); return new Result(req); } |