diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2020-11-09 13:58:54 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-09 13:58:54 +0100 |
commit | 7734b8a6eb9a07af57f6f9009fef1202bc628e09 (patch) | |
tree | f72a1b70aecaeb7f34b74b96f82c5ca1f5cfd152 /documentapi | |
parent | 089145d4f8f7225b41a2c23c6d551fc599d609f6 (diff) | |
parent | 215d8a7a446a37d74be3900061d4488a662b54cb (diff) |
Merge pull request #15187 from vespa-engine/jonmv/reindexig-controller
Jonmv/reindexig controller
Diffstat (limited to 'documentapi')
3 files changed, 33 insertions, 20 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java index 0dd96275f9d..e98be6871b4 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java @@ -26,8 +26,8 @@ import java.util.logging.Logger; */ public class VisitorIterator { - private ProgressToken progressToken; - private BucketSource bucketSource; + private final ProgressToken progressToken; + private final BucketSource bucketSource; private int distributionBitCount; private static final Logger log = Logger.getLogger(VisitorIterator.class.getName()); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index e0ae0278de8..d332b1fb1ca 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; /** * Local visitor session that copies and iterates through all items in the local document access. - * Each document must be ack'ed for the session to be done visiting. + * Each document must be ack'ed for the session to be done visiting, unless the destination is remote. * Only document puts are sent by this session, and this is done from a separate thread. * * @author jonmv @@ -44,20 +44,23 @@ public class LocalVisitorSession implements VisitorSession { private final FieldSet fieldSet; private final AtomicReference<State> state; private final AtomicReference<Phaser> phaser; + private final ProgressToken token; public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException { - if (parameters.getResumeToken() != null) - throw new UnsupportedOperationException("Continuation via progress tokens is not supported"); - - if (parameters.getRemoteDataHandler() != null) - throw new UnsupportedOperationException("Remote data handlers are not supported"); - this.selector = new DocumentSelector(parameters.getDocumentSelection()); this.fieldSet = new FieldSetRepo().parse(access.getDocumentTypeManager(), parameters.fieldSet()); - - this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler(); - this.data.reset(); - this.data.setSession(this); + this.token = parameters.getResumeToken(); + + if (parameters.getRemoteDataHandler() == null) { + this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler(); + this.data.reset(); + this.data.setSession(this); + } + else { + if (parameters.getLocalDataHandler() != null) + throw new IllegalArgumentException("Cannot have both a remote and a local data handler"); + this.data = null; + } this.control = parameters.getControlHandler() == null ? new VisitorControlHandler() : parameters.getControlHandler(); this.control.reset(); @@ -98,8 +101,11 @@ public class LocalVisitorSession implements VisitorSession { if (synchronizer != null) synchronizer.arriveAndAwaitAdvance(); - data.onMessage(new PutDocumentMessage(new DocumentPut(copy)), - new AckToken(id)); + if (data != null) + data.onMessage(new PutDocumentMessage(new DocumentPut(copy)), + new AckToken(id)); + else + outstanding.remove(id); if (synchronizer != null) synchronizer.arriveAndAwaitAdvance(); @@ -127,9 +133,9 @@ public class LocalVisitorSession implements VisitorSession { } finally { if (synchronizer != null) - synchronizer.arriveAndDeregister(); - - data.onDone(); + synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); + if (data != null) + data.onDone(); } }).start(); } @@ -140,9 +146,10 @@ public class LocalVisitorSession implements VisitorSession { && control.isDone(); // Control handler has been notified } + /** Returns the token set in the parameters used to create this. */ @Override public ProgressToken getProgress() { - throw new UnsupportedOperationException("Progress tokens are not supported"); + return token; } @Override @@ -179,6 +186,12 @@ public class LocalVisitorSession implements VisitorSession { @Override public void destroy() { abort(); + try { + control.waitUntilDone(0); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index e1d18080faf..257d491ea93 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -1166,7 +1166,7 @@ public class MessageBusVisitorSession implements VisitorSession { } } } catch (InterruptedException e) { - e.printStackTrace(); + log.log(Level.WARNING, "Interrupted waiting for visitor session to be destroyed"); } finally { try { sender.destroy(); |