summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java119
1 files changed, 119 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java b/container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java
new file mode 100644
index 00000000000..46f9a53e698
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java
@@ -0,0 +1,119 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.rendering;
+
+import com.yahoo.search.result.EventStream;
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonFactoryBuilder;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.core.io.SerializedString;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.yahoo.processing.rendering.AsynchronousSectionedRenderer;
+import com.yahoo.processing.response.Data;
+import com.yahoo.processing.response.DataList;
+import com.yahoo.search.Result;
+import com.yahoo.search.result.ErrorHit;
+import com.yahoo.search.result.ErrorMessage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Executor;
+
+import static com.fasterxml.jackson.databind.SerializationFeature.FLUSH_AFTER_WRITE_VALUE;
+
+/**
+ * A Server-Sent Events (SSE) renderer for asynchronous events such as
+ * tokens from a language model.
+ *
+ * @author lesters
+ */
+public class EventRenderer extends AsynchronousSectionedRenderer<Result> {
+
+ private static final JsonFactory generatorFactory = createGeneratorFactory();
+ private volatile JsonGenerator generator;
+
+ private static JsonFactory createGeneratorFactory() {
+ var factory = new JsonFactoryBuilder()
+ .streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build())
+ .build();
+ factory.setCodec(new ObjectMapper(factory).disable(FLUSH_AFTER_WRITE_VALUE));
+ return factory;
+ }
+
+ private static final boolean RENDER_EVENT_HEADER = true;
+ private static final boolean RENDER_END_EVENT = true;
+
+ public EventRenderer() {
+ this(null);
+ }
+
+ public EventRenderer(Executor executor) {
+ super(executor);
+ }
+
+ @Override
+ public void beginResponse(OutputStream outputStream) throws IOException {
+ generator = generatorFactory.createGenerator(outputStream, JsonEncoding.UTF8);
+ generator.setRootValueSeparator(new SerializedString(""));
+ }
+
+ @Override
+ public void beginList(DataList<?> dataList) throws IOException {
+ if ( ! (dataList instanceof EventStream)) {
+ throw new IllegalArgumentException("EventRenderer currently only supports EventStreams");
+ // Todo: support results and timing and trace by delegating to JsonRenderer
+ }
+ }
+
+ @Override
+ public void data(Data data) throws IOException {
+ if (data instanceof EventStream.Event event) {
+ if (RENDER_EVENT_HEADER) {
+ generator.writeRaw("event: " + event.type() + "\n");
+ }
+ generator.writeRaw("data: ");
+ generator.writeStartObject();
+ generator.writeStringField(event.type(), event.toString());
+ generator.writeEndObject();
+ generator.writeRaw("\n\n");
+ generator.flush();
+ }
+ else if (data instanceof ErrorHit) {
+ for (ErrorMessage error : ((ErrorHit) data).errors()) {
+ generator.writeRaw("event: error\n");
+ generator.writeRaw("data: ");
+ generator.writeStartObject();
+ generator.writeStringField("source", error.getSource());
+ generator.writeNumberField("error", error.getCode());
+ generator.writeStringField("message", error.getMessage());
+ generator.writeEndObject();
+ generator.writeRaw("\n\n");
+ generator.flush();
+ }
+ }
+ }
+
+ @Override
+ public void endList(DataList<?> dataList) throws IOException {
+ }
+
+ @Override
+ public void endResponse() throws IOException {
+ if (RENDER_END_EVENT) {
+ generator.writeRaw("event: end\n");
+ }
+ generator.close();
+ }
+
+ @Override
+ public String getEncoding() {
+ return "utf-8";
+ }
+
+ @Override
+ public String getMimeType() {
+ return "text/event-stream";
+ }
+
+}