// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.jdisc;
import com.google.inject.AbstractModule;
import com.google.inject.Module;
import com.yahoo.jdisc.test.ServerProviderConformanceTest;
import com.yahoo.messagebus.DestinationSessionParams;
import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.MessageBusParams;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.SourceSession;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.network.local.LocalNetwork;
import com.yahoo.messagebus.network.local.LocalWire;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.shared.ServerSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.test.SimpleMessage;
import com.yahoo.messagebus.test.SimpleProtocol;
import org.hamcrest.Matcher;
import org.junit.Ignore;
import org.junit.Test;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import static com.yahoo.messagebus.ErrorCode.APP_FATAL_ERROR;
import static com.yahoo.messagebus.ErrorCode.SESSION_BUSY;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* @author Simon Thoresen Hult
*/
public class MbusServerConformanceTest extends ServerProviderConformanceTest {
/* Many of the "success" expectations here (may) seem odd. But this is the current behavior of the
* messagebus server. We should probably look into whether the behavior is correct in all cases.
*/
@Override
@Test
public void testContainerNotReadyException() throws Throwable {
new TestRunner().setRequestTimeout(100, TimeUnit.MILLISECONDS)
.expectError(is(SESSION_BUSY))
.executeAndClose();
}
@Override
@Test
public void testBindingSetNotFoundException() throws Throwable {
new TestRunner().expectError(is(APP_FATAL_ERROR))
.executeAndClose();
}
@Override
@Test
public void testNoBindingSetSelectedException() throws Throwable {
new TestRunner().expectError(is(APP_FATAL_ERROR))
.executeAndClose();
}
@Override
@Test
public void testBindingNotFoundException() throws Throwable {
new TestRunner().expectError(is(APP_FATAL_ERROR))
.executeAndClose();
}
@Override
@Test
public void testRequestHandlerWithSyncCloseResponse() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestHandlerWithSyncWriteResponse() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestHandlerWithSyncHandleResponse() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestHandlerWithAsyncHandleResponse() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestException() throws Throwable {
new TestRunner().expectError(is(APP_FATAL_ERROR))
.executeAndClose();
}
@Override
@Test
public void testRequestExceptionWithSyncCloseResponse() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestExceptionWithSyncWriteResponse() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestNondeterministicExceptionWithSyncHandleResponse() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestExceptionBeforeResponseWriteWithSyncHandleResponse() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestExceptionAfterResponseWriteWithSyncHandleResponse() throws Throwable {
}
@Override
@Test
public void testRequestNondeterministicExceptionWithAsyncHandleResponse() throws Throwable {
new TestRunner().executeAndClose();
}
@Override
@Test
public void testRequestExceptionBeforeResponseWriteWithAsyncHandleResponse() throws Throwable {
new TestRunner().expectError(is(APP_FATAL_ERROR))
.executeAndClose();
}
@Override
@Test
public void testRequestExceptionAfterResponseCloseNoContentWithAsyncHandleResponse() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestExceptionAfterResponseWriteWithAsyncHandleResponse() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteWithSyncCompletion() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteWithAsyncCompletion() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteWithNondeterministicSyncFailure() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteWithSyncFailureBeforeResponseWrite() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteWithSyncFailureAfterResponseWrite() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteWithNondeterministicAsyncFailure() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteWithAsyncFailureBeforeResponseWrite() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteWithAsyncFailureAfterResponseWrite() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteWithAsyncFailureAfterResponseCloseNoContent() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteNondeterministicException() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionBeforeResponseWrite() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionAfterResponseWrite() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionAfterResponseCloseNoContent() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteNondeterministicExceptionWithSyncCompletion() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionAfterResponseWriteWithSyncCompletion() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteNondeterministicExceptionWithAsyncCompletion() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionWithNondeterministicSyncFailure() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionWithSyncFailureBeforeResponseWrite() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionWithSyncFailureAfterResponseWrite() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionWithSyncFailureAfterResponseCloseNoContent() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionWithNondeterministicAsyncFailure() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionWithAsyncFailureBeforeResponseWrite() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseWrite() throws Throwable {
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseCloseNoContent() throws Throwable {
}
@Override
@Test
public void testRequestContentCloseWithSyncCompletion() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseWithAsyncCompletion() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseWithNondeterministicSyncFailure() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseWithSyncFailureBeforeResponseWrite() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentCloseWithSyncFailureAfterResponseWrite() throws Throwable {
}
@Override
@Test
public void testRequestContentCloseWithSyncFailureAfterResponseCloseNoContent() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseWithNondeterministicAsyncFailure() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseWithAsyncFailureBeforeResponseWrite() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentCloseWithAsyncFailureAfterResponseWrite() throws Throwable {
}
@Override
@Test
public void testRequestContentCloseWithAsyncFailureAfterResponseCloseNoContent() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseNondeterministicException() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseExceptionBeforeResponseWrite() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentCloseExceptionAfterResponseWrite() throws Throwable {
}
@Override
@Test
public void testRequestContentCloseExceptionAfterResponseCloseNoContent() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseNondeterministicExceptionWithSyncCompletion() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentCloseExceptionAfterResponseWriteWithSyncCompletion() throws Throwable {
}
@Override
@Test
public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseNondeterministicExceptionWithAsyncCompletion() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable {
}
@Override
@Test
public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseNondeterministicExceptionWithSyncFailure() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncFailure() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentCloseExceptionAfterResponseWriteWithSyncFailure() throws Throwable {
}
@Override
@Test
public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncFailure() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseNondeterministicExceptionWithAsyncFailure() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncFailure() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncFailure() throws Throwable {
}
@Override
@Test
public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncFailure() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
@Ignore // N/A: The messagebus protocol does not have content.
public void testResponseWriteCompletionException() throws Throwable {
}
@Override
@Test
public void testResponseCloseCompletionException() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
@Override
@Test
public void testResponseCloseCompletionExceptionNoContent() throws Throwable {
new TestRunner().expectSuccess()
.executeAndClose();
}
private class TestRunner implements Adapter {
final LocalWire wire = new LocalWire();
final SharedMessageBus mbus;
final ServerSession session;
Matcher expectedError = null;
boolean successExpected = false;
long timeoutMillis = TimeUnit.SECONDS.toMillis(60);
TestRunner() {
this(new MessageBusParams().addProtocol(new SimpleProtocol()),
new DestinationSessionParams());
}
TestRunner(final MessageBusParams mbusParams, final DestinationSessionParams sessionParams) {
this.mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(wire), mbusParams));
this.session = mbus.newDestinationSession(sessionParams);
}
TestRunner setRequestTimeout(final long timeout, final TimeUnit unit) {
timeoutMillis = unit.toMillis(timeout);
return this;
}
TestRunner expectError(final Matcher matcher) {
assertThat(successExpected, is(false));
expectedError = matcher;
return this;
}
TestRunner expectSuccess() {
assertThat(expectedError, is(nullValue()));
successExpected = true;
return this;
}
@Override
public Module newConfigModule() {
return new AbstractModule() {
@Override
protected void configure() {
bind(ServerSession.class).toInstance(session);
}
};
}
@Override
public Class getServerProviderClass() {
return MbusServer.class;
}
@Override
public MyClient newClient(final MbusServer server) throws Throwable {
return new MyClient(wire, server.connectionSpec());
}
@Override
public Reply executeRequest(final MyClient client, final boolean withRequestContent) throws Throwable {
// This protocol doesn't have the concept of "request content", so if we are asked to send any, it's a bug.
assertThat(withRequestContent, is(false));
final SimpleMessage msg = new SimpleMessage("foo");
msg.getTrace().setLevel(9);
msg.setRoute(client.route);
msg.setTimeRemaining(timeoutMillis);
assertThat("client.session.send(msg).isAccepted()",
client.session.send(msg).isAccepted(), is(true));
final Reply reply = client.replies.poll(60, TimeUnit.SECONDS);
assertThat("reply != null", reply, notNullValue());
return reply;
}
@Override
public Iterable newResponseContent() {
return Collections.emptyList();
}
@Override
public void validateResponse(final Reply reply) throws Throwable {
final String trace = String.valueOf(reply.getTrace());
if (expectedError != null) {
assertThat(reply.hasErrors(), is(true));
final int error = reply.getError(0).getCode();
assertThat(trace, error, expectedError);
}
if (successExpected) {
assertThat(trace, reply.hasErrors(), is(false));
}
}
void executeAndClose() throws Throwable {
runTest(this);
session.release();
mbus.release();
}
}
public static class MyClient implements Closeable, ReplyHandler {
final BlockingDeque replies = new LinkedBlockingDeque<>();
final MessageBus mbus;
final Route route;
final SourceSession session;
MyClient(final LocalWire wire, final String connectionSpec) {
this.mbus = new MessageBus(new LocalNetwork(wire),
new MessageBusParams().addProtocol(new SimpleProtocol()));
this.session = mbus.createSourceSession(new SourceSessionParams().setReplyHandler(this));
this.route = Route.parse(connectionSpec);
}
@Override
public void close() throws IOException {
session.destroy();
mbus.destroy();
}
@Override
public void handleReply(final Reply reply) {
replies.addLast(reply);
}
}
}