summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2016-07-28 14:10:14 +0200
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2016-07-28 14:10:14 +0200
commitd9887f2149521fdfde3de857a97b240452a0c0c5 (patch)
treefe651e743ea80244e71b793f70ba5dbace0ac0bd /documentapi
parent893bb85f4517c3c8ea6c6b00315eb76df2f4e949 (diff)
Replace home-brewed blocking queue with Java's built-in one
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java51
1 files changed, 11 insertions, 40 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java
index 993638bb4b8..db5d760caba 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java
@@ -1,17 +1,16 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi;
-import com.yahoo.document.BucketId;
import com.yahoo.document.DocumentOperation;
-import com.yahoo.documentapi.messagebus.protocol.DocumentListEntry;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.messagebus.Message;
import com.yahoo.vdslib.DocumentList;
import com.yahoo.vdslib.Entry;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
@@ -23,11 +22,12 @@ import java.util.List;
* <code>getNext</code> methods and need to be acked when processed for
* visiting not to halt. The class is thread safe.
*
- * @author <a href="mailto:humbe@yahoo-inc.com">Håkon Humberset</a>
+ * @author Håkon Humberset
+ * @author @vekterli
*/
public class VisitorDataQueue extends VisitorDataHandler {
- final LinkedList<VisitorResponse> pendingResponses = new LinkedList<VisitorResponse>();
+ final BlockingQueue<VisitorResponse> pendingResponses = new LinkedBlockingQueue<>();
/** Creates a new visitor data queue. */
public VisitorDataQueue() {
@@ -37,18 +37,13 @@ public class VisitorDataQueue extends VisitorDataHandler {
@Override
public void reset() {
super.reset();
- synchronized (pendingResponses) {
- pendingResponses.clear();
- }
+ pendingResponses.clear();
}
private void appendSingleOpToPendingList(final DocumentOperation op, final AckToken token) {
final DocumentList docList = DocumentList.create(Entry.create(op));
final DocumentListVisitorResponse response = new DocumentListVisitorResponse(docList, token);
- synchronized (pendingResponses) {
- pendingResponses.add(response);
- pendingResponses.notifyAll();
- }
+ pendingResponses.add(response);
}
@Override
@@ -69,43 +64,19 @@ public class VisitorDataQueue extends VisitorDataHandler {
*/
@Deprecated
public void onDocuments(DocumentList docs, AckToken token) {
- synchronized (pendingResponses) {
- pendingResponses.add(new DocumentListVisitorResponse(docs, token));
- pendingResponses.notifyAll();
- }
+ pendingResponses.add(new DocumentListVisitorResponse(docs, token));
}
// Inherit doc from VisitorDataHandler
@Override
public VisitorResponse getNext() {
- synchronized (pendingResponses) {
- return (pendingResponses.isEmpty()
- ? null : pendingResponses.removeFirst());
- }
+ return pendingResponses.poll();
}
// Inherit doc from VisitorDataHandler
@Override
public VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException {
- synchronized (pendingResponses) {
- if (pendingResponses.isEmpty()) {
- if (timeoutMilliseconds == 0) {
- while (pendingResponses.isEmpty()) {
- pendingResponses.wait();
- }
- } else {
- pendingResponses.wait(timeoutMilliseconds);
- }
- }
- return (pendingResponses.isEmpty()
- ? null : pendingResponses.removeFirst());
- }
+ return pendingResponses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS);
}
- @Override
- public void onDone() {
- synchronized (pendingResponses) {
- pendingResponses.notifyAll();
- }
- }
}