// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.http.server; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.jdisc.messagebus.SessionCache; import com.yahoo.container.logging.AccessLog; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.jdisc.ReferencedResource; import com.yahoo.jdisc.References; import com.yahoo.jdisc.http.HttpRequest.Method; import com.yahoo.messagebus.*; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.shared.SharedMessageBus; import com.yahoo.messagebus.shared.SharedSourceSession; import com.yahoo.metrics.simple.MetricReceiver; import com.yahoo.text.Utf8; import com.yahoo.text.Utf8String; import com.yahoo.vespa.http.client.core.Headers; import com.yahoo.vespa.http.client.core.OperationStatus; import com.yahoo.vespaxmlparser.MockFeedReaderFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static org.junit.Assert.assertEquals; /** * Check FeedHandler APIs. * * @author Steinar Knutsen */ public class V2ErrorsInResultTestCase { LessConfiguredHandler handler; ExecutorService workers; @Before public void setUp() throws Exception { workers = Executors.newCachedThreadPool(); handler = new LessConfiguredHandler(workers); } @After public void tearDown() throws Exception { handler.destroy(); workers.shutdown(); } private static class LessConfiguredHandler extends FeedHandler { public LessConfiguredHandler(Executor executor) throws Exception { super(new FeedHandler.Context(executor, AccessLog.voidAccessLog(), new DummyMetric()), null, null, null, MetricReceiver.nullImplementation); } @Override protected Feeder createFeeder(HttpRequest request, InputStream requestInputStream, BlockingQueue operations, String clientId, boolean sessionIdWasGeneratedJustNow, int protocolVersion) throws Exception { return new LessConfiguredFeeder(requestInputStream, operations, popClient(clientId), new FeederSettings(request), clientId, sessionIdWasGeneratedJustNow, sourceSessionParams(request), null, this, this.feedReplyHandler, ""); } @Override protected DocumentTypeManager createDocumentManager( DocumentmanagerConfig documentManagerConfig) { return null; } } private static class MockSharedSession extends SharedSourceSession { int count; public MockSharedSession(SourceSessionParams params) { super(new SharedMessageBus(new MessageBus(new MockNetwork(), new MessageBusParams())), params); count = 0; } @Override public Result sendMessageBlocking(Message msg) throws InterruptedException { return sendMessage(msg); } @Override public Result sendMessage(Message msg) { Result r; ReplyHandler handler = msg.popHandler(); switch (count++) { case 0: r = new Result(ErrorCode.FATAL_ERROR, "boom"); break; case 1: r = new Result(ErrorCode.TRANSIENT_ERROR, "transient boom"); break; case 2: final FailedReply reply = new FailedReply(msg.getContext()); reply.addError(new Error( ErrorCode.FATAL_ERROR, "bad mojo, dude")); handler.handleReply(reply); r = Result.ACCEPTED; break; default: handler.handleReply(new MockReply(msg.getContext())); r = Result.ACCEPTED; } return r; } } private static class FailedReply extends Reply { Object context; public FailedReply(Object context) { this.context = context; } @Override public Utf8String getProtocol() { return null; } @Override public int getType() { return 0; } @Override public Object getContext() { return context; } } private static class LessConfiguredFeeder extends Feeder { public LessConfiguredFeeder(InputStream stream, BlockingQueue operations, ClientState storedState, FeederSettings settings, String clientId, boolean sessionIdWasGeneratedJustNow, SourceSessionParams sessionParams, SessionCache sessionCache, FeedHandler handler, ReplyHandler feedReplyHandler, String localHostname) throws Exception { super(stream, new MockFeedReaderFactory(), null, operations, storedState, settings, clientId, sessionIdWasGeneratedJustNow, sessionParams, sessionCache, handler, new DummyMetric(), feedReplyHandler, localHostname); } protected ReferencedResource retainSession( SourceSessionParams sessionParams, SessionCache sessionCache) { final SharedSourceSession session = new MockSharedSession(sessionParams); return new ReferencedResource<>(session, References.fromResource(session)); } } @Test public final void test() throws IOException { String sessionId; { InputStream in = new MetaStream(new byte[] { 1 }); ByteArrayOutputStream out = new ByteArrayOutputStream(); HttpRequest nalle = HttpRequest .createTestRequest( "http://test4-steinar:19020/reserved-for-internal-use/feedapi", Method.POST, in); nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); HttpResponse r = handler.handle(nalle); sessionId = r.headers().getFirst(Headers.SESSION_ID); r.render(out); assertEquals("", Utf8.toString(out.toByteArray())); } { InputStream in = new MetaStream(new byte[] { 1 }); ByteArrayOutputStream out = new ByteArrayOutputStream(); HttpRequest nalle = HttpRequest .createTestRequest( "http://test4-steinar:19020/reserved-for-internal-use/feedapi", Method.POST, in); nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); HttpResponse r = handler.handle(nalle); r.render(out); assertEquals("id:banana:banana::doc1 ERROR boom \n", Utf8.toString(out.toByteArray())); } { InputStream in = new MetaStream(new byte[] { 1 }); ByteArrayOutputStream out = new ByteArrayOutputStream(); HttpRequest nalle = HttpRequest .createTestRequest( "http://test4-steinar:19020/reserved-for-internal-use/feedapi", Method.POST, in); nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); nalle.getJDiscRequest().headers().add(Headers.DRAIN, "false"); nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); HttpResponse r = handler.handle(nalle); r.render(out); assertEquals("id:banana:banana::doc1 TRANSIENT_ERROR transient{20}boom \n", Utf8.toString(out.toByteArray())); } { InputStream in = new MetaStream(new byte[] { 1 }); ByteArrayOutputStream out = new ByteArrayOutputStream(); HttpRequest nalle = HttpRequest .createTestRequest( "http://test4-steinar:19020/reserved-for-internal-use/feedapi", Method.POST, in); nalle.getJDiscRequest().headers().add(Headers.VERSION, "2"); nalle.getJDiscRequest().headers().add(Headers.SESSION_ID, sessionId); nalle.getJDiscRequest().headers().add(Headers.DRAIN, "true"); HttpResponse r = handler.handle(nalle); r.render(out); assertEquals("id:banana:banana::doc1 ERROR bad{20}mojo,{20}dude \n", Utf8.toString(out.toByteArray())); } } }