diff options
author | Jon Bratseth <bratseth@gmail.com> | 2024-03-21 14:22:03 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-21 14:22:03 +0100 |
commit | 138da140cfde273599fb37c8f65e28f1a5c6957a (patch) | |
tree | 635ced173447b6fec679c88b947958e577db95cf /container-search/src/main | |
parent | 723d6cacbdce4c45e01c92cb3e2eeb71f7b513f2 (diff) | |
parent | d0333079f0cc7c13185b2bf4f015a304c72af2f1 (diff) |
Merge pull request #30526 from vespa-engine/lesters/server-sent-events
Add server-sent events (SSE) renderer
Diffstat (limited to 'container-search/src/main')
5 files changed, 260 insertions, 10 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/Result.java b/container-search/src/main/java/com/yahoo/search/Result.java index b1a0107c6d8..a989688575d 100644 --- a/container-search/src/main/java/com/yahoo/search/Result.java +++ b/container-search/src/main/java/com/yahoo/search/Result.java @@ -20,7 +20,7 @@ import java.util.Iterator; * a single HitGroup containing hits of the result. The HitGroup may contain Hits, which are the individual * result items, as well as further HitGroups, making up a <i>composite</i> structure. This allows the hits of a result * to be hierarchically organized. A Hit is polymorphic and may contain any kind of information deemed - * an approriate partial answer to the Query. + * an appropriate partial answer to the Query. * <p> * Do not cache this as it holds references to objects that should be garbage collected. * diff --git a/container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java b/container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java new file mode 100644 index 00000000000..83ae349f5a0 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/rendering/EventRenderer.java @@ -0,0 +1,116 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.rendering; + +import com.yahoo.search.result.EventStream; +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonFactoryBuilder; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.StreamReadConstraints; +import com.fasterxml.jackson.core.io.SerializedString; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yahoo.processing.rendering.AsynchronousSectionedRenderer; +import com.yahoo.processing.response.Data; +import com.yahoo.processing.response.DataList; +import com.yahoo.search.Result; +import com.yahoo.search.result.ErrorHit; +import com.yahoo.search.result.ErrorMessage; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.Executor; + +import static com.fasterxml.jackson.databind.SerializationFeature.FLUSH_AFTER_WRITE_VALUE; + +/** + * A Server-Sent Events (SSE) renderer for asynchronous events such as + * tokens from a language model. + * + * @author lesters + */ +public class EventRenderer extends AsynchronousSectionedRenderer<Result> { + + private static final JsonFactory generatorFactory = createGeneratorFactory(); + private volatile JsonGenerator generator; + + private static JsonFactory createGeneratorFactory() { + var factory = new JsonFactoryBuilder() + .streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build()) + .build(); + factory.setCodec(new ObjectMapper(factory).disable(FLUSH_AFTER_WRITE_VALUE)); + return factory; + } + + private static final boolean RENDER_EVENT_HEADER = true; + private static final boolean RENDER_END_EVENT = true; + + public EventRenderer() { + this(null); + } + + public EventRenderer(Executor executor) { + super(executor); + } + + @Override + public void beginResponse(OutputStream outputStream) throws IOException { + generator = generatorFactory.createGenerator(outputStream, JsonEncoding.UTF8); + generator.setRootValueSeparator(new SerializedString("")); + } + + @Override + public void beginList(DataList<?> dataList) throws IOException { + } + + @Override + public void data(Data data) throws IOException { + if (data instanceof EventStream.Event event) { + if (RENDER_EVENT_HEADER) { + generator.writeRaw("event: " + event.type() + "\n"); + } + generator.writeRaw("data: "); + generator.writeStartObject(); + generator.writeStringField(event.type(), event.toString()); + generator.writeEndObject(); + generator.writeRaw("\n\n"); + generator.flush(); + } + else if (data instanceof ErrorHit) { + for (ErrorMessage error : ((ErrorHit) data).errors()) { + generator.writeRaw("event: error\n"); + generator.writeRaw("data: "); + generator.writeStartObject(); + generator.writeStringField("source", error.getSource()); + generator.writeNumberField("error", error.getCode()); + generator.writeStringField("message", error.getMessage()); + generator.writeEndObject(); + generator.writeRaw("\n\n"); + generator.flush(); + } + } + // Todo: support other types of data such as search results (hits), timing and trace + } + + @Override + public void endList(DataList<?> dataList) throws IOException { + } + + @Override + public void endResponse() throws IOException { + if (RENDER_END_EVENT) { + generator.writeRaw("event: end\n"); + } + generator.close(); + } + + @Override + public String getEncoding() { + return "utf-8"; + } + + @Override + public String getMimeType() { + return "text/event-stream"; + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java b/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java index e876f8e06d0..69410070453 100644 --- a/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java +++ b/container-search/src/main/java/com/yahoo/search/rendering/JsonRenderer.java @@ -40,6 +40,7 @@ import com.yahoo.search.result.Coverage; import com.yahoo.search.result.DefaultErrorHit; import com.yahoo.search.result.ErrorHit; import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.EventStream; import com.yahoo.search.result.FeatureData; import com.yahoo.search.result.Hit; import com.yahoo.search.result.HitGroup; @@ -243,17 +244,19 @@ public class JsonRenderer extends AsynchronousSectionedRenderer<Result> { @Override public void beginList(DataList<?> list) throws IOException { - Preconditions.checkArgument(list instanceof HitGroup, - "Expected subclass of com.yahoo.search.result.HitGroup, got %s.", - list.getClass()); moreChildren(); - renderHitGroupHead((HitGroup) list); + if (list instanceof HitGroup) { + renderHitGroupHead((HitGroup) list); + } else if (list instanceof EventStream) { + renderHitGroupHead(new HitGroup("event_stream")); // Consider waiting for all events and create a single summary hit + } else { + throw new IllegalArgumentException("Expected subclass of com.yahoo.search.result.HitGroup, got " + list.getClass()); + } } protected void moreChildren() throws IOException { if (!renderedChildren.isEmpty()) childrenArray(); - renderedChildren.push(0); } @@ -443,10 +446,13 @@ public class JsonRenderer extends AsynchronousSectionedRenderer<Result> { @Override public void data(Data data) throws IOException { - Preconditions.checkArgument(data instanceof Hit, - "Expected subclass of com.yahoo.search.result.Hit, got %s.", - data.getClass()); - renderHit((Hit) data); + if (data instanceof Hit) { + renderHit((Hit) data); + } else if (data instanceof EventStream.Event) { + renderHit(((EventStream.Event) data).asHit()); + } else { + throw new IllegalArgumentException("Expected subclass of com.yahoo.search.result.Hit, got " + data.getClass()); + } } @Override diff --git a/container-search/src/main/java/com/yahoo/search/rendering/RendererRegistry.java b/container-search/src/main/java/com/yahoo/search/rendering/RendererRegistry.java index 3287a61c81f..d62860afcda 100644 --- a/container-search/src/main/java/com/yahoo/search/rendering/RendererRegistry.java +++ b/container-search/src/main/java/com/yahoo/search/rendering/RendererRegistry.java @@ -24,6 +24,7 @@ public final class RendererRegistry extends ComponentRegistry<com.yahoo.processi public static final ComponentId xmlRendererId = ComponentId.fromString("XmlRenderer"); public static final ComponentId pageRendererId = ComponentId.fromString("PageTemplatesXmlRenderer"); public static final ComponentId jsonRendererId = ComponentId.fromString("JsonRenderer"); + public static final ComponentId eventRendererId = ComponentId.fromString("EventRenderer"); public static final ComponentId defaultRendererId = jsonRendererId; @@ -56,6 +57,11 @@ public final class RendererRegistry extends ComponentRegistry<com.yahoo.processi pageRenderer.initId(pageRendererId); register(pageRenderer.getId(), pageRenderer); + // Add event renderer + Renderer eventRenderer = new EventRenderer(executor); + eventRenderer.initId(eventRendererId); + register(eventRenderer.getId(), eventRenderer); + // add application renderers for (Renderer renderer : renderers) register(renderer.getId(), renderer); @@ -69,6 +75,7 @@ public final class RendererRegistry extends ComponentRegistry<com.yahoo.processi getRenderer(jsonRendererId.toSpecification()).deconstruct(); getRenderer(xmlRendererId.toSpecification()).deconstruct(); getRenderer(pageRendererId.toSpecification()).deconstruct(); + getRenderer(eventRendererId.toSpecification()).deconstruct(); } /** @@ -92,6 +99,7 @@ public final class RendererRegistry extends ComponentRegistry<com.yahoo.processi if (format.stringValue().equals("json")) return getComponent(jsonRendererId); if (format.stringValue().equals("xml")) return getComponent(xmlRendererId); if (format.stringValue().equals("page")) return getComponent(pageRendererId); + if (format.stringValue().equals("sse")) return getComponent(eventRendererId); com.yahoo.processing.rendering.Renderer<Result> renderer = getComponent(format); if (renderer == null) diff --git a/container-search/src/main/java/com/yahoo/search/result/EventStream.java b/container-search/src/main/java/com/yahoo/search/result/EventStream.java new file mode 100644 index 00000000000..b393a91e6d0 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/result/EventStream.java @@ -0,0 +1,120 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.result; + +import com.yahoo.collections.ListenableArrayList; +import com.yahoo.component.provider.ListenableFreezableClass; +import com.yahoo.processing.Request; +import com.yahoo.processing.response.Data; +import com.yahoo.processing.response.DataList; +import com.yahoo.processing.response.DefaultIncomingData; +import com.yahoo.processing.response.IncomingData; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A stream of events which can be rendered as Server-Sent Events (SSE). + * + * @author lesters + */ +public class EventStream extends Hit implements DataList<Data> { + + private final ListenableArrayList<Data> data = new ListenableArrayList<>(16); + private final IncomingData<Data> incomingData; + private final AtomicInteger eventCount = new AtomicInteger(0); + + public final static String EVENT_TYPE_TOKEN = "token"; + public final static String DEFAULT_EVENT_TYPE = EVENT_TYPE_TOKEN; + + public EventStream() { + super(); + this.incomingData = new DefaultIncomingData<>(this); + } + + public void add(String data) { + incoming().add(new Event(eventCount.incrementAndGet(), data, DEFAULT_EVENT_TYPE)); + } + + public void add(String data, String type) { + incoming().add(new Event(eventCount.incrementAndGet(), data, type)); + } + + public void error(String source, ErrorMessage message) { + incoming().add(new DefaultErrorHit(source, message)); + } + + public void markComplete() { + incoming().markComplete(); + } + + @Override + public Data add(Data event) { + data.add(event); + return event; + } + + @Override + public Data get(int index) { + return data.get(index); + } + + @Override + public List<Data> asList() { + return data; + } + + @Override + public IncomingData<Data> incoming() { + return incomingData; + } + + @Override + public CompletableFuture<DataList<Data>> completeFuture() { + return incomingData.completedFuture(); + } + + @Override + public void addDataListener(Runnable runnable) { + data.addListener(runnable); + } + + @Override + public void close() { + } + + public static class Event extends ListenableFreezableClass implements Data { + + private final int eventNumber; + private final String data; + private final String type; + + public Event(int eventNumber, String data, String type) { + this.eventNumber = eventNumber; + this.data = data; + this.type = type; + } + + public String toString() { + return data; + } + + public String type() { + return type; + } + + @Override + public Request request() { + return null; + } + + // For json rendering + public Hit asHit() { + Hit hit = new Hit(String.valueOf(eventNumber)); + hit.setField(type, data); + return hit; + } + + } + +} |