diff options
Diffstat (limited to 'vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java')
-rw-r--r-- | vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java | 86 |
1 files changed, 86 insertions, 0 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java b/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java new file mode 100644 index 00000000000..339d8002c4f --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java @@ -0,0 +1,86 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent; + +import com.yahoo.collections.Tuple2; + +/** + * A class for sending single messages between threads with timeout. Typical use + * would be + * + * <pre> + * Receiver<SomeMessage> receiver = new Receiver<SomeMessage>(); + * SomeRunnable runnable = new SomeRunnable(receiver); + * Thread worker = new Thread(runnable); + * worker.start(); + * Pair<Receiver.MessageState, SomeMessage> answer = receiver.get(500L); + * </pre> + * + * ... and in the worker thread simply + * + * <pre> + * receiver.put(new SomeMessage(...)) + * </pre> + * + * <p> + * Any number of threads may wait for the same message. Sending null references + * is supported. The object is intended for delivering only single message, + * there is no support for recycling it. + * </p> + * + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + */ +public class Receiver<T> { + /** + * MessageState is the reason for returning from get(). If a message is + * received before timeout, the state will be VALID. If no message is + * received before timeout, state is TIMEOUT. + */ + public enum MessageState { + VALID, TIMEOUT; + }; + private final Object lock = new Object(); + private T message = null; + private boolean received = false; + + /** + * Make a message available for consumers. + * + * @param message the message to send + * @throws IllegalStateException if a message has already been received here + */ + public void put(T message) { + synchronized (lock) { + if (received) { + throw new IllegalStateException("Multiple puts on a single Receiver instance is not legal."); + } + this.message = message; + received = true; + lock.notifyAll(); + } + } + + /** + * Wait for up to "timeout" milliseconds for an incoming message. This hides + * spurious wakeup, but InterruptedException will be propagated. + * + * @param timeout + * maximum time to wait for message in milliseconds + * @return a Pair instance containing the reason for returning and the + * message possible received + * @throws InterruptedException if the waiting thread is interrupted + */ + public Tuple2<MessageState, T> get(long timeout) throws InterruptedException { + long barrier = System.currentTimeMillis() + timeout; + synchronized (lock) { + while (!received) { + long t = System.currentTimeMillis(); + if (t >= barrier) { + return new Tuple2<>(MessageState.TIMEOUT, null); + } + lock.wait(barrier - t); + } + return new Tuple2<>(MessageState.VALID, message); + } + } + +} |