diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2016-07-28 14:10:14 +0200 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2016-07-28 14:10:14 +0200 |
commit | d9887f2149521fdfde3de857a97b240452a0c0c5 (patch) | |
tree | fe651e743ea80244e71b793f70ba5dbace0ac0bd /documentapi | |
parent | 893bb85f4517c3c8ea6c6b00315eb76df2f4e949 (diff) |
Replace home-brewed blocking queue with Java's built-in one
Diffstat (limited to 'documentapi')
-rw-r--r-- | documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java | 51 |
1 files changed, 11 insertions, 40 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java index 993638bb4b8..db5d760caba 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorDataQueue.java @@ -1,17 +1,16 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi; -import com.yahoo.document.BucketId; import com.yahoo.document.DocumentOperation; -import com.yahoo.documentapi.messagebus.protocol.DocumentListEntry; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.messagebus.Message; import com.yahoo.vdslib.DocumentList; import com.yahoo.vdslib.Entry; -import java.util.LinkedList; -import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** @@ -23,11 +22,12 @@ import java.util.List; * <code>getNext</code> methods and need to be acked when processed for * visiting not to halt. The class is thread safe. * - * @author <a href="mailto:humbe@yahoo-inc.com">Håkon Humberset</a> + * @author Håkon Humberset + * @author @vekterli */ public class VisitorDataQueue extends VisitorDataHandler { - final LinkedList<VisitorResponse> pendingResponses = new LinkedList<VisitorResponse>(); + final BlockingQueue<VisitorResponse> pendingResponses = new LinkedBlockingQueue<>(); /** Creates a new visitor data queue. */ public VisitorDataQueue() { @@ -37,18 +37,13 @@ public class VisitorDataQueue extends VisitorDataHandler { @Override public void reset() { super.reset(); - synchronized (pendingResponses) { - pendingResponses.clear(); - } + pendingResponses.clear(); } private void appendSingleOpToPendingList(final DocumentOperation op, final AckToken token) { final DocumentList docList = DocumentList.create(Entry.create(op)); final DocumentListVisitorResponse response = new DocumentListVisitorResponse(docList, token); - synchronized (pendingResponses) { - pendingResponses.add(response); - pendingResponses.notifyAll(); - } + pendingResponses.add(response); } @Override @@ -69,43 +64,19 @@ public class VisitorDataQueue extends VisitorDataHandler { */ @Deprecated public void onDocuments(DocumentList docs, AckToken token) { - synchronized (pendingResponses) { - pendingResponses.add(new DocumentListVisitorResponse(docs, token)); - pendingResponses.notifyAll(); - } + pendingResponses.add(new DocumentListVisitorResponse(docs, token)); } // Inherit doc from VisitorDataHandler @Override public VisitorResponse getNext() { - synchronized (pendingResponses) { - return (pendingResponses.isEmpty() - ? null : pendingResponses.removeFirst()); - } + return pendingResponses.poll(); } // Inherit doc from VisitorDataHandler @Override public VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException { - synchronized (pendingResponses) { - if (pendingResponses.isEmpty()) { - if (timeoutMilliseconds == 0) { - while (pendingResponses.isEmpty()) { - pendingResponses.wait(); - } - } else { - pendingResponses.wait(timeoutMilliseconds); - } - } - return (pendingResponses.isEmpty() - ? null : pendingResponses.removeFirst()); - } + return pendingResponses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS); } - @Override - public void onDone() { - synchronized (pendingResponses) { - pendingResponses.notifyAll(); - } - } } |