From 4911919473cf6511fbd193c95cf0db81e5ebae07 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Wed, 10 Jun 2020 16:22:03 +0200 Subject: Add draft LocalVisitorSession --- .../documentapi/local/LocalVisitorSession.java | 141 +++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java (limited to 'documentapi') diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java new file mode 100644 index 00000000000..4a11d8ae8bc --- /dev/null +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -0,0 +1,141 @@ +package com.yahoo.documentapi.local; + +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.documentapi.AckToken; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorDataHandler; +import com.yahoo.documentapi.VisitorDataQueue; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorResponse; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.messagebus.Trace; +import com.yahoo.yolean.Exceptions; + +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Local visitor session that copies and iterates through all items in the local document access. + * Each document must be ack'ed for the session to be done visiting. + * Only document puts are sent by this session, and this is done from a separate thread. + * + * @author jonmv + */ +public class LocalVisitorSession implements VisitorSession { + + private enum State { RUNNING, FAILURE, ABORTED, SUCCESS } + + private final VisitorDataHandler data; + private final VisitorControlHandler control; + private final Map outstanding; + private final AtomicReference state; + + public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) { + if (parameters.getResumeToken() != null) + throw new UnsupportedOperationException("Continuation via progress tokens is not supported"); + + if (parameters.getRemoteDataHandler() != null) + throw new UnsupportedOperationException("Remote data handlers are not supported"); + + this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler(); + this.data.reset(); + this.data.setSession(this); + + this.control = parameters.getControlHandler() == null ? new VisitorControlHandler() : parameters.getControlHandler(); + this.control.reset(); + this.control.setSession(this); + + this.outstanding = new ConcurrentSkipListMap<>(access.documents); + this.state = new AtomicReference<>(State.RUNNING); + + start(); + } + + void start() { + new Thread(() -> { + try { + // Iterate through all documents and pass on to data handler + outstanding.forEach((id, document) -> { + data.onMessage(new PutDocumentMessage(new DocumentPut(document)), + new AckToken(id)); + }); + // Transition to a terminal state when done + state.updateAndGet(current -> { + switch (current) { + case RUNNING: + control.onDone(VisitorControlHandler.CompletionCode.SUCCESS, "Success"); + return State.SUCCESS; + case ABORTED: + control.onDone(VisitorControlHandler.CompletionCode.ABORTED, "Aborted by user"); + return State.ABORTED; + default: + control.onDone(VisitorControlHandler.CompletionCode.FAILURE, "Unexpected state '" + current + "'");; + return State.FAILURE; + } + }); + } + // Transition to failure terminal state on error + catch (Exception e) { + state.set(State.FAILURE); + outstanding.clear(); + control.onDone(VisitorControlHandler.CompletionCode.FAILURE, Exceptions.toMessageString(e)); + } + finally { + data.onDone(); + } + }).start(); + } + + @Override + public boolean isDone() { + return outstanding.isEmpty(); + } + + @Override + public ProgressToken getProgress() { + throw new UnsupportedOperationException("Progress tokens are not supported"); + } + + @Override + public Trace getTrace() { + throw new UnsupportedOperationException("Traces are not supported"); + } + + @Override + public boolean waitUntilDone(long timeoutMs) throws InterruptedException { + return control.waitUntilDone(timeoutMs); + } + + @Override + public void ack(AckToken token) { + outstanding.remove((DocumentId) token.ackObject); + } + + @Override + public void abort() { + state.updateAndGet(current -> current == State.RUNNING ? State.ABORTED : current); + outstanding.clear(); + } + + @Override + public VisitorResponse getNext() { + return data.getNext(); + } + + @Override + public VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException { + return data.getNext(timeoutMilliseconds); + } + + @Override + public void destroy() { + if ( ! isDone()) + abort(); + } + +} -- cgit v1.2.3