summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2020-11-09 13:58:54 +0100
committerGitHub <noreply@github.com>2020-11-09 13:58:54 +0100
commit7734b8a6eb9a07af57f6f9009fef1202bc628e09 (patch)
treef72a1b70aecaeb7f34b74b96f82c5ca1f5cfd152 /documentapi
parent089145d4f8f7225b41a2c23c6d551fc599d609f6 (diff)
parent215d8a7a446a37d74be3900061d4488a662b54cb (diff)
Merge pull request #15187 from vespa-engine/jonmv/reindexig-controller
Jonmv/reindexig controller
Diffstat (limited to 'documentapi')
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java4
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java47
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java2
3 files changed, 33 insertions, 20 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
index 0dd96275f9d..e98be6871b4 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java
@@ -26,8 +26,8 @@ import java.util.logging.Logger;
*/
public class VisitorIterator {
- private ProgressToken progressToken;
- private BucketSource bucketSource;
+ private final ProgressToken progressToken;
+ private final BucketSource bucketSource;
private int distributionBitCount;
private static final Logger log = Logger.getLogger(VisitorIterator.class.getName());
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index e0ae0278de8..d332b1fb1ca 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* Local visitor session that copies and iterates through all items in the local document access.
- * Each document must be ack'ed for the session to be done visiting.
+ * Each document must be ack'ed for the session to be done visiting, unless the destination is remote.
* Only document puts are sent by this session, and this is done from a separate thread.
*
* @author jonmv
@@ -44,20 +44,23 @@ public class LocalVisitorSession implements VisitorSession {
private final FieldSet fieldSet;
private final AtomicReference<State> state;
private final AtomicReference<Phaser> phaser;
+ private final ProgressToken token;
public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException {
- if (parameters.getResumeToken() != null)
- throw new UnsupportedOperationException("Continuation via progress tokens is not supported");
-
- if (parameters.getRemoteDataHandler() != null)
- throw new UnsupportedOperationException("Remote data handlers are not supported");
-
this.selector = new DocumentSelector(parameters.getDocumentSelection());
this.fieldSet = new FieldSetRepo().parse(access.getDocumentTypeManager(), parameters.fieldSet());
-
- this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler();
- this.data.reset();
- this.data.setSession(this);
+ this.token = parameters.getResumeToken();
+
+ if (parameters.getRemoteDataHandler() == null) {
+ this.data = parameters.getLocalDataHandler() == null ? new VisitorDataQueue() : parameters.getLocalDataHandler();
+ this.data.reset();
+ this.data.setSession(this);
+ }
+ else {
+ if (parameters.getLocalDataHandler() != null)
+ throw new IllegalArgumentException("Cannot have both a remote and a local data handler");
+ this.data = null;
+ }
this.control = parameters.getControlHandler() == null ? new VisitorControlHandler() : parameters.getControlHandler();
this.control.reset();
@@ -98,8 +101,11 @@ public class LocalVisitorSession implements VisitorSession {
if (synchronizer != null)
synchronizer.arriveAndAwaitAdvance();
- data.onMessage(new PutDocumentMessage(new DocumentPut(copy)),
- new AckToken(id));
+ if (data != null)
+ data.onMessage(new PutDocumentMessage(new DocumentPut(copy)),
+ new AckToken(id));
+ else
+ outstanding.remove(id);
if (synchronizer != null)
synchronizer.arriveAndAwaitAdvance();
@@ -127,9 +133,9 @@ public class LocalVisitorSession implements VisitorSession {
}
finally {
if (synchronizer != null)
- synchronizer.arriveAndDeregister();
-
- data.onDone();
+ synchronizer.awaitAdvance(synchronizer.arriveAndDeregister());
+ if (data != null)
+ data.onDone();
}
}).start();
}
@@ -140,9 +146,10 @@ public class LocalVisitorSession implements VisitorSession {
&& control.isDone(); // Control handler has been notified
}
+ /** Returns the token set in the parameters used to create this. */
@Override
public ProgressToken getProgress() {
- throw new UnsupportedOperationException("Progress tokens are not supported");
+ return token;
}
@Override
@@ -179,6 +186,12 @@ public class LocalVisitorSession implements VisitorSession {
@Override
public void destroy() {
abort();
+ try {
+ control.waitUntilDone(0);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index e1d18080faf..257d491ea93 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -1166,7 +1166,7 @@ public class MessageBusVisitorSession implements VisitorSession {
}
}
} catch (InterruptedException e) {
- e.printStackTrace();
+ log.log(Level.WARNING, "Interrupted waiting for visitor session to be destroyed");
} finally {
try {
sender.destroy();