From 7dbefcfae54ae88292f4fb9c9c775b0c92f11680 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Thu, 5 Nov 2020 17:32:36 +0100 Subject: Dummy support for progress tokens and remote destination in LocalVisitorSession --- .../documentapi/local/LocalVisitorSession.java | 41 +++++++++++++--------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index e0ae0278de8..eba884c5ee8 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; /** * Local visitor session that copies and iterates through all items in the local document access. - * Each document must be ack'ed for the session to be done visiting. + * Each document must be ack'ed for the session to be done visiting, unless the destinatino is remote. * Only document puts are sent by this session, and this is done from a separate thread. * * @author jonmv @@ -44,20 +44,23 @@ public class LocalVisitorSession implements VisitorSession { private final FieldSet fieldSet; private final AtomicReference state; private final AtomicReference phaser; + private final ProgressToken token; public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException { - if (parameters.getResumeToken() != null) - throw new UnsupportedOperationException("Continuation via progress tokens is not supported"); - - if (parameters.getRemoteDataHandler() != null) - throw new UnsupportedOperationException("Remote data handlers are not supported"); - this.selector = new DocumentSelector(parameters.getDocumentSelection()); this.fieldSet = new FieldSetRepo().parse(access.getDocumentTypeManager(), parameters.fieldSet()); - - this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler(); - this.data.reset(); - this.data.setSession(this); + this.token = parameters.getResumeToken(); + + if (parameters.getRemoteDataHandler() == null) { + this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler(); + this.data.reset(); + this.data.setSession(this); + } + else { + if (parameters.getLocalDataHandler() != null) + throw new IllegalArgumentException("Cannot have both a remote and a local data handler"); + this.data = null; + } this.control = parameters.getControlHandler() == null ? new VisitorControlHandler() : parameters.getControlHandler(); this.control.reset(); @@ -98,8 +101,11 @@ public class LocalVisitorSession implements VisitorSession { if (synchronizer != null) synchronizer.arriveAndAwaitAdvance(); - data.onMessage(new PutDocumentMessage(new DocumentPut(copy)), - new AckToken(id)); + if (data != null) + data.onMessage(new PutDocumentMessage(new DocumentPut(copy)), + new AckToken(id)); + else + outstanding.remove(id); if (synchronizer != null) synchronizer.arriveAndAwaitAdvance(); @@ -127,9 +133,9 @@ public class LocalVisitorSession implements VisitorSession { } finally { if (synchronizer != null) - synchronizer.arriveAndDeregister(); - - data.onDone(); + synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); + if (data != null) + data.onDone(); } }).start(); } @@ -140,9 +146,10 @@ public class LocalVisitorSession implements VisitorSession { && control.isDone(); // Control handler has been notified } + /** Returns the token set in the parameters used to create this. */ @Override public ProgressToken getProgress() { - throw new UnsupportedOperationException("Progress tokens are not supported"); + return token; } @Override -- cgit v1.2.3