diff options
6 files changed, 56 insertions, 51 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java index 36e1f1c8a8e..d4938f8ecbb 100644 --- a/jrt/src/com/yahoo/jrt/Connection.java +++ b/jrt/src/com/yahoo/jrt/Connection.java @@ -1,8 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jrt; -import com.yahoo.jrt.CryptoSocket.HandshakeResult; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 0f0b704bba7..393cf420ae1 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -7,7 +7,12 @@ import com.yahoo.log.LogLevel; import com.yahoo.messagebus.metrics.MessageBusMetricSet; import com.yahoo.messagebus.network.Network; import com.yahoo.messagebus.network.NetworkOwner; -import com.yahoo.messagebus.routing.*; +import com.yahoo.messagebus.routing.Resender; +import com.yahoo.messagebus.routing.RetryPolicy; +import com.yahoo.messagebus.routing.RoutingPolicy; +import com.yahoo.messagebus.routing.RoutingSpec; +import com.yahoo.messagebus.routing.RoutingTable; +import com.yahoo.messagebus.routing.RoutingTableSpec; import com.yahoo.text.Utf8Array; import com.yahoo.text.Utf8String; diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index ab741b36a05..fdd72c12532 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -17,7 +17,6 @@ import com.yahoo.jrt.Transport; import com.yahoo.jrt.slobrok.api.IMirror; import com.yahoo.jrt.slobrok.api.Mirror; import com.yahoo.jrt.slobrok.api.Register; -import com.yahoo.log.LogLevel; import com.yahoo.messagebus.EmptyReply; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.ErrorCode; diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java index dbdb6da6477..749ba4f4451 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java @@ -17,7 +17,7 @@ public interface RPCSendAdapter { * * @param net The network to attach to. */ - public void attach(RPCNetwork net); + void attach(RPCNetwork net); /** * Performs the actual sending to the given recipient. @@ -27,5 +27,5 @@ public interface RPCSendAdapter { * @param payload The already serialized payload of the message to send. * @param timeRemaining The time remaining until the message expires. */ - public void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining); + void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining); } diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index e5b243c118a..e10dce31adf 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -3,7 +3,10 @@ package com.yahoo.vespa.feed.perf; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentTypeManager; -import com.yahoo.documentapi.messagebus.protocol.*; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.MessageBusParams; @@ -21,6 +24,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** * @author Simon Thoresen Hult @@ -37,20 +42,19 @@ public class SimpleFeeder implements ReplyHandler { private final Route route; private final SourceSession session; private final long startTime = System.currentTimeMillis(); - private volatile Throwable failure; - private volatile long numReplies = 0; + private AtomicReference<Throwable> failure = new AtomicReference<>(null); + private AtomicLong numReplies = new AtomicLong(0); private long maxLatency = Long.MIN_VALUE; private long minLatency = Long.MAX_VALUE; private long nextHeader = startTime + HEADER_INTERVAL; private long nextReport = startTime + REPORT_INTERVAL; - private long numMessages = 0; private long sumLatency = 0; public static void main(String[] args) throws Throwable { new SimpleFeeder(new FeederParams().parseArgs(args)).run().close(); } - public SimpleFeeder(FeederParams params) { + SimpleFeeder(FeederParams params) { this.in = params.getStdIn(); this.out = params.getStdOut(); this.err = params.getStdErr(); @@ -60,11 +64,12 @@ public class SimpleFeeder implements ReplyHandler { this.docTypeMgr.configure(params.getConfigId()); } - public SimpleFeeder run() throws Throwable { + SimpleFeeder run() throws Throwable { VespaXMLFeedReader reader = new VespaXMLFeedReader(in, docTypeMgr); VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); printHeader(); - while (failure == null) { + long numMessages = 0; + while (failure.get() == null) { reader.read(op); if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) { break; @@ -82,17 +87,17 @@ public class SimpleFeeder implements ReplyHandler { } ++numMessages; } - while (failure == null && numReplies < numMessages) { + while (failure.get() == null && numReplies.get() < numMessages) { Thread.sleep(100); } - if (failure != null) { - throw failure; + if (failure.get() != null) { + throw failure.get(); } printReport(); return this; } - public void close() { + void close() { session.destroy(); mbus.destroy(); } @@ -121,19 +126,22 @@ public class SimpleFeeder implements ReplyHandler { @Override public void handleReply(Reply reply) { - if (failure != null) { + if (failure.get() != null) { return; } if (reply.hasErrors()) { - failure = new IOException(formatErrors(reply)); + failure.compareAndSet(null, new IOException(formatErrors(reply))); return; } long now = System.currentTimeMillis(); - long latency = now - (long)reply.getContext(); + long latency = now - (long) reply.getContext(); + numReplies.incrementAndGet(); + accumulateReplies(now, latency); + } + private synchronized void accumulateReplies(long now, long latency) { minLatency = Math.min(minLatency, latency); maxLatency = Math.max(maxLatency, latency); sumLatency += latency; - ++numReplies; if (now > nextHeader) { printHeader(); nextHeader += HEADER_INTERVAL; @@ -150,7 +158,7 @@ public class SimpleFeeder implements ReplyHandler { private void printReport() { out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime, - numReplies, minLatency, sumLatency / numReplies, maxLatency); + numReplies.get(), minLatency, sumLatency / numReplies.get(), maxLatency); } private static String formatErrors(Reply reply) { diff --git a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java index f8b785fd4e3..dd509d9e23a 100755 --- a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java +++ b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java @@ -20,39 +20,39 @@ import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.network.Identity; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import java.util.LinkedList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static java.lang.System.out; public class DummyReceiver implements MessageHandler { - String name = null; - DestinationSession session; - MessageBusDocumentAccess da; - long sleepTime = 0; - long messageCount = 0; - int maxPendingCount = 0; - long silentNum = 0; - boolean instant = false; - ThreadPoolExecutor executor = null; - int threads = 10; - long maxQueueTime = -1; - BlockingQueue<Runnable> queue; - boolean verbose = false; + private String name = null; + private DestinationSession session; + private MessageBusDocumentAccess da; + private long sleepTime = 0; + private AtomicLong messageCount = new AtomicLong(0); + private long silentNum = 0; + private boolean instant = false; + private ThreadPoolExecutor executor = null; + private int threads = 10; + private long maxQueueTime = -1; + private BlockingQueue<Runnable> queue; + private boolean verbose = false; private boolean helpOption = false; - DummyReceiver() { + private DummyReceiver() { } - public class Task implements Runnable { + class Task implements Runnable { Reply reply; - public Task(Reply reply) { + Task(Reply reply) { this.reply = reply; } @@ -68,21 +68,21 @@ public class DummyReceiver implements MessageHandler { } } - public void init() { + private void init() { MessageBusParams params = new MessageBusParams(new LoadTypeSet()); params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name))); params.setDocumentManagerConfigId("client"); - params.getMessageBusParams().setMaxPendingCount(maxPendingCount); + params.getMessageBusParams().setMaxPendingCount(0); params.getMessageBusParams().setMaxPendingSize(0); da = new MessageBusDocumentAccess(params); queue = (maxQueueTime < 0) ? new LinkedBlockingDeque<>() : new ThroughputLimitQueue<>(maxQueueTime); session = da.getMessageBus().createDestinationSession("default", true, this); executor = new ThreadPoolExecutor(threads, threads, 5, TimeUnit.SECONDS, queue, new DaemonThreadFactory()); - System.out.println("Registered listener at " + name + "/default with " + maxPendingCount + " max pending and sleep time of " + sleepTime); + System.out.println("Registered listener at " + name + "/default with 0 max pending and sleep time of " + sleepTime); } public void handleMessage(Message message) { - messageCount++; + long messageCount = this.messageCount.incrementAndGet(); if ( silentNum == 0 ) { System.out.println("Received message " + message + ". Received " + messageCount + " messages so far. In queue size " + queue.size()); @@ -116,7 +116,7 @@ public class DummyReceiver implements MessageHandler { } } - String getParam(List<String> args, String arg) throws IllegalArgumentException { + private String getParam(List<String> args, String arg) throws IllegalArgumentException { try { return args.remove(0); } catch (Exception e) { @@ -138,7 +138,7 @@ public class DummyReceiver implements MessageHandler { " --verbose If set, dump the contents of certain messages to stdout"); } - boolean parseArgs(List<String> args) { + private boolean parseArgs(List<String> args) { try { while (!args.isEmpty()) { String arg = args.remove(0); @@ -174,16 +174,11 @@ public class DummyReceiver implements MessageHandler { } } - public static void main(String[] args) { LogSetup.initVespaLogging("dummyreceiver"); DummyReceiver rcv = new DummyReceiver(); - List<String> l = new LinkedList<>(); - for (String arg : args) { - l.add(arg); - } - if (!rcv.parseArgs(l) && !rcv.helpOption) { + if (!rcv.parseArgs(Arrays.asList(args)) && !rcv.helpOption) { System.exit(1); } if (rcv.helpOption) { |