// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.concurrent;
import com.yahoo.collections.Tuple2;
/**
* A class for sending single messages between threads with timeout. Typical use
* would be
*
*
* Receiver<SomeMessage> receiver = new Receiver<SomeMessage>();
* SomeRunnable runnable = new SomeRunnable(receiver);
* Thread worker = new Thread(runnable);
* worker.start();
* Pair<Receiver.MessageState, SomeMessage> answer = receiver.get(500L);
*
*
* ... and in the worker thread simply
*
*
* receiver.put(new SomeMessage(...))
*
*
*
* Any number of threads may wait for the same message. Sending null references
* is supported. The object is intended for delivering only single message,
* there is no support for recycling it.
*
*
* @author Steinar Knutsen
*/
public class Receiver {
/**
* MessageState is the reason for returning from get(). If a message is
* received before timeout, the state will be VALID. If no message is
* received before timeout, state is TIMEOUT.
*/
public enum MessageState {
VALID, TIMEOUT;
};
private final Object lock = new Object();
private T message = null;
private boolean received = false;
/**
* Make a message available for consumers.
*
* @param message the message to send
* @throws IllegalStateException if a message has already been received here
*/
public void put(T message) {
synchronized (lock) {
if (received) {
throw new IllegalStateException("Multiple puts on a single Receiver instance is not legal.");
}
this.message = message;
received = true;
lock.notifyAll();
}
}
/**
* Wait for up to "timeout" milliseconds for an incoming message. This hides
* spurious wakeup, but InterruptedException will be propagated.
*
* @param timeout
* maximum time to wait for message in milliseconds
* @return a Pair instance containing the reason for returning and the
* message possible received
* @throws InterruptedException if the waiting thread is interrupted
*/
public Tuple2 get(long timeout) throws InterruptedException {
long barrier = System.currentTimeMillis() + timeout;
synchronized (lock) {
while (!received) {
long t = System.currentTimeMillis();
if (t >= barrier) {
return new Tuple2<>(MessageState.TIMEOUT, null);
}
lock.wait(barrier - t);
}
return new Tuple2<>(MessageState.VALID, message);
}
}
}