summaryrefslogtreecommitdiffstats
path: root/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java')
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java86
1 files changed, 86 insertions, 0 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java b/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java
new file mode 100644
index 00000000000..339d8002c4f
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java
@@ -0,0 +1,86 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+import com.yahoo.collections.Tuple2;
+
+/**
+ * A class for sending single messages between threads with timeout. Typical use
+ * would be
+ *
+ * <pre>
+ * Receiver&lt;SomeMessage&gt; receiver = new Receiver&lt;SomeMessage&gt;();
+ * SomeRunnable runnable = new SomeRunnable(receiver);
+ * Thread worker = new Thread(runnable);
+ * worker.start();
+ * Pair&lt;Receiver.MessageState, SomeMessage&gt; answer = receiver.get(500L);
+ * </pre>
+ *
+ * ... and in the worker thread simply
+ *
+ * <pre>
+ * receiver.put(new SomeMessage(...))
+ * </pre>
+ *
+ * <p>
+ * Any number of threads may wait for the same message. Sending null references
+ * is supported. The object is intended for delivering only single message,
+ * there is no support for recycling it.
+ * </p>
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class Receiver<T> {
+ /**
+ * MessageState is the reason for returning from get(). If a message is
+ * received before timeout, the state will be VALID. If no message is
+ * received before timeout, state is TIMEOUT.
+ */
+ public enum MessageState {
+ VALID, TIMEOUT;
+ };
+ private final Object lock = new Object();
+ private T message = null;
+ private boolean received = false;
+
+ /**
+ * Make a message available for consumers.
+ *
+ * @param message the message to send
+ * @throws IllegalStateException if a message has already been received here
+ */
+ public void put(T message) {
+ synchronized (lock) {
+ if (received) {
+ throw new IllegalStateException("Multiple puts on a single Receiver instance is not legal.");
+ }
+ this.message = message;
+ received = true;
+ lock.notifyAll();
+ }
+ }
+
+ /**
+ * Wait for up to "timeout" milliseconds for an incoming message. This hides
+ * spurious wakeup, but InterruptedException will be propagated.
+ *
+ * @param timeout
+ * maximum time to wait for message in milliseconds
+ * @return a Pair instance containing the reason for returning and the
+ * message possible received
+ * @throws InterruptedException if the waiting thread is interrupted
+ */
+ public Tuple2<MessageState, T> get(long timeout) throws InterruptedException {
+ long barrier = System.currentTimeMillis() + timeout;
+ synchronized (lock) {
+ while (!received) {
+ long t = System.currentTimeMillis();
+ if (t >= barrier) {
+ return new Tuple2<>(MessageState.TIMEOUT, null);
+ }
+ lock.wait(barrier - t);
+ }
+ return new Tuple2<>(MessageState.VALID, message);
+ }
+ }
+
+}