summaryrefslogtreecommitdiffstats
path: root/container-search/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main')
-rw-r--r--container-search/src/main/java/com/yahoo/search/Result.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java116
-rw-r--r--container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java24
-rw-r--r--container-search/src/main/java/com/yahoo/search/rendering/RendererRegistry.java8
-rw-r--r--container-search/src/main/java/com/yahoo/search/result/EventStream.java120
5 files changed, 260 insertions, 10 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/Result.java b/container-search/src/main/java/com/yahoo/search/Result.java
index b1a0107c6d8..a989688575d 100644
--- a/container-search/src/main/java/com/yahoo/search/Result.java
+++ b/container-search/src/main/java/com/yahoo/search/Result.java
@@ -20,7 +20,7 @@ import java.util.Iterator;
* a single HitGroup containing hits of the result. The HitGroup may contain Hits, which are the individual
* result items, as well as further HitGroups, making up a <i>composite</i> structure. This allows the hits of a result
* to be hierarchically organized. A Hit is polymorphic and may contain any kind of information deemed
- * an approriate partial answer to the Query.
+ * an appropriate partial answer to the Query.
* <p>
* Do not cache this as it holds references to objects that should be garbage collected.
*
diff --git a/container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java b/container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java
new file mode 100644
index 00000000000..83ae349f5a0
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java
@@ -0,0 +1,116 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.rendering;
+
+import com.yahoo.search.result.EventStream;
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonFactoryBuilder;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.core.io.SerializedString;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.yahoo.processing.rendering.AsynchronousSectionedRenderer;
+import com.yahoo.processing.response.Data;
+import com.yahoo.processing.response.DataList;
+import com.yahoo.search.Result;
+import com.yahoo.search.result.ErrorHit;
+import com.yahoo.search.result.ErrorMessage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Executor;
+
+import static com.fasterxml.jackson.databind.SerializationFeature.FLUSH_AFTER_WRITE_VALUE;
+
+/**
+ * A Server-Sent Events (SSE) renderer for asynchronous events such as
+ * tokens from a language model.
+ *
+ * @author lesters
+ */
+public class EventRenderer extends AsynchronousSectionedRenderer<Result> {
+
+ private static final JsonFactory generatorFactory = createGeneratorFactory();
+ private volatile JsonGenerator generator;
+
+ private static JsonFactory createGeneratorFactory() {
+ var factory = new JsonFactoryBuilder()
+ .streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build())
+ .build();
+ factory.setCodec(new ObjectMapper(factory).disable(FLUSH_AFTER_WRITE_VALUE));
+ return factory;
+ }
+
+ private static final boolean RENDER_EVENT_HEADER = true;
+ private static final boolean RENDER_END_EVENT = true;
+
+ public EventRenderer() {
+ this(null);
+ }
+
+ public EventRenderer(Executor executor) {
+ super(executor);
+ }
+
+ @Override
+ public void beginResponse(OutputStream outputStream) throws IOException {
+ generator = generatorFactory.createGenerator(outputStream, JsonEncoding.UTF8);
+ generator.setRootValueSeparator(new SerializedString(""));
+ }
+
+ @Override
+ public void beginList(DataList<?> dataList) throws IOException {
+ }
+
+ @Override
+ public void data(Data data) throws IOException {
+ if (data instanceof EventStream.Event event) {
+ if (RENDER_EVENT_HEADER) {
+ generator.writeRaw("event: " + event.type() + "\n");
+ }
+ generator.writeRaw("data: ");
+ generator.writeStartObject();
+ generator.writeStringField(event.type(), event.toString());
+ generator.writeEndObject();
+ generator.writeRaw("\n\n");
+ generator.flush();
+ }
+ else if (data instanceof ErrorHit) {
+ for (ErrorMessage error : ((ErrorHit) data).errors()) {
+ generator.writeRaw("event: error\n");
+ generator.writeRaw("data: ");
+ generator.writeStartObject();
+ generator.writeStringField("source", error.getSource());
+ generator.writeNumberField("error", error.getCode());
+ generator.writeStringField("message", error.getMessage());
+ generator.writeEndObject();
+ generator.writeRaw("\n\n");
+ generator.flush();
+ }
+ }
+ // Todo: support other types of data such as search results (hits), timing and trace
+ }
+
+ @Override
+ public void endList(DataList<?> dataList) throws IOException {
+ }
+
+ @Override
+ public void endResponse() throws IOException {
+ if (RENDER_END_EVENT) {
+ generator.writeRaw("event: end\n");
+ }
+ generator.close();
+ }
+
+ @Override
+ public String getEncoding() {
+ return "utf-8";
+ }
+
+ @Override
+ public String getMimeType() {
+ return "text/event-stream";
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java b/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java
index e876f8e06d0..69410070453 100644
--- a/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java
+++ b/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java
@@ -40,6 +40,7 @@ import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.DefaultErrorHit;
import com.yahoo.search.result.ErrorHit;
import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.EventStream;
import com.yahoo.search.result.FeatureData;
import com.yahoo.search.result.Hit;
import com.yahoo.search.result.HitGroup;
@@ -243,17 +244,19 @@ public class JsonRenderer extends AsynchronousSectionedRenderer<Result> {
@Override
public void beginList(DataList<?> list) throws IOException {
- Preconditions.checkArgument(list instanceof HitGroup,
- "Expected subclass of com.yahoo.search.result.HitGroup, got %s.",
- list.getClass());
moreChildren();
- renderHitGroupHead((HitGroup) list);
+ if (list instanceof HitGroup) {
+ renderHitGroupHead((HitGroup) list);
+ } else if (list instanceof EventStream) {
+ renderHitGroupHead(new HitGroup("event_stream")); // Consider waiting for all events and create a single summary hit
+ } else {
+ throw new IllegalArgumentException("Expected subclass of com.yahoo.search.result.HitGroup, got " + list.getClass());
+ }
}
protected void moreChildren() throws IOException {
if (!renderedChildren.isEmpty())
childrenArray();
-
renderedChildren.push(0);
}
@@ -443,10 +446,13 @@ public class JsonRenderer extends AsynchronousSectionedRenderer<Result> {
@Override
public void data(Data data) throws IOException {
- Preconditions.checkArgument(data instanceof Hit,
- "Expected subclass of com.yahoo.search.result.Hit, got %s.",
- data.getClass());
- renderHit((Hit) data);
+ if (data instanceof Hit) {
+ renderHit((Hit) data);
+ } else if (data instanceof EventStream.Event) {
+ renderHit(((EventStream.Event) data).asHit());
+ } else {
+ throw new IllegalArgumentException("Expected subclass of com.yahoo.search.result.Hit, got " + data.getClass());
+ }
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/rendering/RendererRegistry.java b/container-search/src/main/java/com/yahoo/search/rendering/RendererRegistry.java
index 3287a61c81f..d62860afcda 100644
--- a/container-search/src/main/java/com/yahoo/search/rendering/RendererRegistry.java
+++ b/container-search/src/main/java/com/yahoo/search/rendering/RendererRegistry.java
@@ -24,6 +24,7 @@ public final class RendererRegistry extends ComponentRegistry<com.yahoo.processi
public static final ComponentId xmlRendererId = ComponentId.fromString("XmlRenderer");
public static final ComponentId pageRendererId = ComponentId.fromString("PageTemplatesXmlRenderer");
public static final ComponentId jsonRendererId = ComponentId.fromString("JsonRenderer");
+ public static final ComponentId eventRendererId = ComponentId.fromString("EventRenderer");
public static final ComponentId defaultRendererId = jsonRendererId;
@@ -56,6 +57,11 @@ public final class RendererRegistry extends ComponentRegistry<com.yahoo.processi
pageRenderer.initId(pageRendererId);
register(pageRenderer.getId(), pageRenderer);
+ // Add event renderer
+ Renderer eventRenderer = new EventRenderer(executor);
+ eventRenderer.initId(eventRendererId);
+ register(eventRenderer.getId(), eventRenderer);
+
// add application renderers
for (Renderer renderer : renderers)
register(renderer.getId(), renderer);
@@ -69,6 +75,7 @@ public final class RendererRegistry extends ComponentRegistry<com.yahoo.processi
getRenderer(jsonRendererId.toSpecification()).deconstruct();
getRenderer(xmlRendererId.toSpecification()).deconstruct();
getRenderer(pageRendererId.toSpecification()).deconstruct();
+ getRenderer(eventRendererId.toSpecification()).deconstruct();
}
/**
@@ -92,6 +99,7 @@ public final class RendererRegistry extends ComponentRegistry<com.yahoo.processi
if (format.stringValue().equals("json")) return getComponent(jsonRendererId);
if (format.stringValue().equals("xml")) return getComponent(xmlRendererId);
if (format.stringValue().equals("page")) return getComponent(pageRendererId);
+ if (format.stringValue().equals("sse")) return getComponent(eventRendererId);
com.yahoo.processing.rendering.Renderer<Result> renderer = getComponent(format);
if (renderer == null)
diff --git a/container-search/src/main/java/com/yahoo/search/result/EventStream.java b/container-search/src/main/java/com/yahoo/search/result/EventStream.java
new file mode 100644
index 00000000000..b393a91e6d0
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/result/EventStream.java
@@ -0,0 +1,120 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.result;
+
+import com.yahoo.collections.ListenableArrayList;
+import com.yahoo.component.provider.ListenableFreezableClass;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.response.Data;
+import com.yahoo.processing.response.DataList;
+import com.yahoo.processing.response.DefaultIncomingData;
+import com.yahoo.processing.response.IncomingData;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A stream of events which can be rendered as Server-Sent Events (SSE).
+ *
+ * @author lesters
+ */
+public class EventStream extends Hit implements DataList<Data> {
+
+ private final ListenableArrayList<Data> data = new ListenableArrayList<>(16);
+ private final IncomingData<Data> incomingData;
+ private final AtomicInteger eventCount = new AtomicInteger(0);
+
+ public final static String EVENT_TYPE_TOKEN = "token";
+ public final static String DEFAULT_EVENT_TYPE = EVENT_TYPE_TOKEN;
+
+ public EventStream() {
+ super();
+ this.incomingData = new DefaultIncomingData<>(this);
+ }
+
+ public void add(String data) {
+ incoming().add(new Event(eventCount.incrementAndGet(), data, DEFAULT_EVENT_TYPE));
+ }
+
+ public void add(String data, String type) {
+ incoming().add(new Event(eventCount.incrementAndGet(), data, type));
+ }
+
+ public void error(String source, ErrorMessage message) {
+ incoming().add(new DefaultErrorHit(source, message));
+ }
+
+ public void markComplete() {
+ incoming().markComplete();
+ }
+
+ @Override
+ public Data add(Data event) {
+ data.add(event);
+ return event;
+ }
+
+ @Override
+ public Data get(int index) {
+ return data.get(index);
+ }
+
+ @Override
+ public List<Data> asList() {
+ return data;
+ }
+
+ @Override
+ public IncomingData<Data> incoming() {
+ return incomingData;
+ }
+
+ @Override
+ public CompletableFuture<DataList<Data>> completeFuture() {
+ return incomingData.completedFuture();
+ }
+
+ @Override
+ public void addDataListener(Runnable runnable) {
+ data.addListener(runnable);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public static class Event extends ListenableFreezableClass implements Data {
+
+ private final int eventNumber;
+ private final String data;
+ private final String type;
+
+ public Event(int eventNumber, String data, String type) {
+ this.eventNumber = eventNumber;
+ this.data = data;
+ this.type = type;
+ }
+
+ public String toString() {
+ return data;
+ }
+
+ public String type() {
+ return type;
+ }
+
+ @Override
+ public Request request() {
+ return null;
+ }
+
+ // For json rendering
+ public Hit asHit() {
+ Hit hit = new Hit(String.valueOf(eventNumber));
+ hit.setField(type, data);
+ return hit;
+ }
+
+ }
+
+}