aboutsummaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2018-08-29 11:17:14 +0000
committerHåvard Pettersen <havardpe@oath.com>2018-08-29 14:09:45 +0000
commit1d8efbb9fbcc7e52a9d9378a017c85890eff745e (patch)
tree4f3c379903c92e39564f1ec2f48328a54a0cbaf6 /jrt
parent582b3b0454f92de40c1b5fa6b506c05572bdfa75 (diff)
integrate crypto concepts into jrt
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Connection.java179
-rw-r--r--jrt/src/com/yahoo/jrt/CryptoEngine.java17
-rw-r--r--jrt/src/com/yahoo/jrt/CryptoSocket.java88
-rw-r--r--jrt/src/com/yahoo/jrt/NullCryptoEngine.java15
-rw-r--r--jrt/src/com/yahoo/jrt/NullCryptoSocket.java23
-rw-r--r--jrt/src/com/yahoo/jrt/Transport.java31
-rw-r--r--jrt/src/com/yahoo/jrt/XorCryptoEngine.java17
-rw-r--r--jrt/src/com/yahoo/jrt/XorCryptoSocket.java121
8 files changed, 429 insertions, 62 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java
index c4cc9533b8e..1168583348c 100644
--- a/jrt/src/com/yahoo/jrt/Connection.java
+++ b/jrt/src/com/yahoo/jrt/Connection.java
@@ -23,9 +23,10 @@ class Connection extends Target {
private static final int WRITE_SIZE = 8192;
private static final int WRITE_REDO = 10;
- private static final int INITIAL = 0;
- private static final int CONNECTED = 1;
- private static final int CLOSED = 2;
+ private static final int INITIAL = 0;
+ private static final int CONNECTING = 1;
+ private static final int CONNECTED = 2;
+ private static final int CLOSED = 3;
private int state = INITIAL;
private Queue queue = new Queue();
@@ -41,7 +42,8 @@ class Connection extends Target {
private Transport parent;
private Supervisor owner;
private Spec spec;
- private SocketChannel channel;
+ private CryptoSocket socket;
+ private int readSize = READ_SIZE;
private boolean server;
private AtomicLong requestId = new AtomicLong(0);
private SelectionKey selectionKey;
@@ -52,8 +54,8 @@ class Connection extends Target {
log.log(Level.WARNING, "Bogus state transition: " + this.state + "->" + state);
return;
}
- boolean live = (this.state == INITIAL && state == CONNECTED);
- boolean down = (this.state != CLOSED && state == CLOSED);
+ boolean live = (state == CONNECTED);
+ boolean down = (state == CLOSED);
boolean fini;
boolean pendingWrite;
synchronized (this) {
@@ -62,8 +64,11 @@ class Connection extends Target {
pendingWrite = (writeWork > 0);
}
if (live) {
+ enableRead();
if (pendingWrite) {
enableWrite();
+ } else {
+ disableWrite();
}
owner.sessionLive(this);
}
@@ -86,7 +91,7 @@ class Connection extends Target {
this.parent = parent;
this.owner = owner;
- this.channel = channel;
+ this.socket = parent.createCryptoSocket(channel, true);
server = true;
owner.sessionInit(this);
}
@@ -162,9 +167,8 @@ class Connection extends Target {
setLostReason(new IllegalArgumentException("jrt: malformed or missing spec"));
return this;
}
-
try {
- channel = SocketChannel.open(spec.address());
+ socket = parent.createCryptoSocket(SocketChannel.open(spec.address()), false);
} catch (Exception e) {
setLostReason(e);
}
@@ -172,24 +176,34 @@ class Connection extends Target {
}
public boolean init(Selector selector) {
- if (channel == null) {
+ if (!hasSocket()) {
return false;
}
try {
- channel.configureBlocking(false);
- channel.socket().setTcpNoDelay(true);
- selectionKey = channel.register(selector,
- SelectionKey.OP_READ,
- this);
+ socket.channel().configureBlocking(false);
+ socket.channel().socket().setTcpNoDelay(true);
+ selectionKey = socket.channel().register(selector,
+ SelectionKey.OP_READ | SelectionKey.OP_WRITE,
+ this);
} catch (Exception e) {
log.log(Level.WARNING, "Error initializing connection", e);
setLostReason(e);
return false;
}
- setState(CONNECTED);
+ setState(CONNECTING);
return true;
}
+ public void enableRead() {
+ selectionKey.interestOps(selectionKey.interestOps()
+ | SelectionKey.OP_READ);
+ }
+
+ public void disableRead() {
+ selectionKey.interestOps(selectionKey.interestOps()
+ & ~SelectionKey.OP_READ);
+ }
+
public void enableWrite() {
selectionKey.interestOps(selectionKey.interestOps()
| SelectionKey.OP_WRITE);
@@ -200,45 +214,89 @@ class Connection extends Target {
& ~SelectionKey.OP_WRITE);
}
- public void read() throws IOException {
+ private void handshake() throws IOException {
+ switch (socket.handshake()) {
+ case DONE:
+ if (socket.getMinimumReadBufferSize() > readSize) {
+ readSize = socket.getMinimumReadBufferSize();
+ }
+ while (socket.drain(input.getChannelWritable(readSize)) > 0) {
+ handlePackets();
+ }
+ if (socket.flush() == CryptoSocket.FlushResult.NEED_WRITE) {
+ synchronized (this) {
+ writeWork++;
+ }
+ }
+ setState(CONNECTED);
+ break;
+ case NEED_READ:
+ enableRead();
+ disableWrite();
+ break;
+ case NEED_WRITE:
+ disableRead();
+ enableWrite();
+ break;
+ }
+ }
+
+ private void handlePackets() throws IOException {
+ ByteBuffer rb = input.getReadable();
+ while (true) {
+ PacketInfo info = PacketInfo.getPacketInfo(rb);
+ if (info == null || info.packetLength() > rb.remaining()) {
+ break;
+ }
+ owner.readPacket(info);
+ Packet packet;
+ try {
+ packet = info.decodePacket(rb);
+ } catch (RuntimeException e) {
+ log.log(Level.WARNING, "got garbage; closing connection: " + toString());
+ throw new IOException("jrt: decode error", e);
+ }
+ ReplyHandler handler;
+ synchronized (this) {
+ handler = replyMap.remove(packet.requestId());
+ }
+ if (handler != null) {
+ handler.handleReply(packet);
+ } else {
+ owner.handlePacket(this, packet);
+ }
+ }
+ }
+
+ private void read() throws IOException {
boolean doneRead = false;
for (int i = 0; !doneRead && i < READ_REDO; i++) {
- ByteBuffer wb = input.getChannelWritable(READ_SIZE);
- if (channel.read(wb) == -1) {
+ ByteBuffer wb = input.getChannelWritable(readSize);
+ if (socket.read(wb) == -1) {
throw new IOException("jrt: Connection closed by peer");
}
doneRead = (wb.remaining() > 0);
- ByteBuffer rb = input.getReadable();
- while (true) {
- PacketInfo info = PacketInfo.getPacketInfo(rb);
- if (info == null || info.packetLength() > rb.remaining()) {
- break;
- }
- owner.readPacket(info);
- Packet packet;
- try {
- packet = info.decodePacket(rb);
- } catch (RuntimeException e) {
- log.log(Level.WARNING, "got garbage; closing connection: " + toString());
- throw new IOException("jrt: decode error", e);
- }
- ReplyHandler handler;
- synchronized (this) {
- handler = replyMap.remove(packet.requestId());
- }
- if (handler != null) {
- handler.handleReply(packet);
- } else {
- owner.handlePacket(this, packet);
- }
- }
+ handlePackets();
+ }
+ while (socket.drain(input.getChannelWritable(readSize)) > 0) {
+ handlePackets();
}
if (maxInputSize > 0) {
input.shrink(maxInputSize);
}
}
- public void write() throws IOException {
+ public void handleReadEvent() throws IOException {
+ if (state == CONNECTED) {
+ read();
+ } else if (state == CONNECTING) {
+ handshake();
+ } else {
+ throw new IOException("jrt: got read event in incompatible state: " + state);
+ }
+ }
+
+ private void write() throws IOException {
synchronized (this) {
queue.flush(myQueue);
}
@@ -257,16 +315,23 @@ class Connection extends Target {
if (rb.remaining() == 0) {
break;
}
- channel.write(rb);
+ socket.write(rb);
if (rb.remaining() > 0) {
break;
}
}
+ int myWriteWork = 0;
+ if (output.bytes() > 0) {
+ myWriteWork++;
+ }
+ if (socket.flush() == CryptoSocket.FlushResult.NEED_WRITE) {
+ myWriteWork++;
+ }
boolean disableWrite;
synchronized (this) {
writeWork = queue.size()
- + myQueue.size()
- + ((output.bytes() > 0) ? 1 : 0);
+ + myQueue.size()
+ + myWriteWork;
disableWrite = (writeWork == 0);
}
if (disableWrite) {
@@ -277,6 +342,16 @@ class Connection extends Target {
}
}
+ public void handleWriteEvent() throws IOException {
+ if (state == CONNECTED) {
+ write();
+ } else if (state == CONNECTING) {
+ handshake();
+ } else {
+ throw new IOException("jrt: got write event in incompatible state: " + state);
+ }
+ }
+
public void fini() {
setState(CLOSED);
if (selectionKey != null) {
@@ -289,13 +364,13 @@ class Connection extends Target {
}
public boolean hasSocket() {
- return (channel != null);
+ return ((socket != null) && (socket.channel() != null));
}
public void closeSocket() {
- if (channel != null) {
+ if (hasSocket()) {
try {
- channel.socket().close();
+ socket.channel().socket().close();
} catch (Exception e) {
log.log(Level.WARNING, "Error closing connection", e);
}
@@ -393,8 +468,8 @@ class Connection extends Target {
}
public String toString() {
- if (channel != null) {
- return "Connection { " + channel.socket() + " }";
+ if (hasSocket()) {
+ return "Connection { " + socket.channel().socket() + " }";
}
return "Connection { no socket, spec " + spec + " }";
}
diff --git a/jrt/src/com/yahoo/jrt/CryptoEngine.java b/jrt/src/com/yahoo/jrt/CryptoEngine.java
new file mode 100644
index 00000000000..9852d5a88a6
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/CryptoEngine.java
@@ -0,0 +1,17 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.channels.SocketChannel;
+
+
+/**
+ * Component responsible for wrapping low-level sockets into
+ * appropriate CryptoSocket instances. This is the top-level interface
+ * used by code wanting to perform network io with appropriate
+ * encryption.
+ **/
+public interface CryptoEngine {
+ public CryptoSocket createCryptoSocket(SocketChannel channel, boolean isServer);
+ static public CryptoEngine createDefault() { return new NullCryptoEngine(); }
+}
diff --git a/jrt/src/com/yahoo/jrt/CryptoSocket.java b/jrt/src/com/yahoo/jrt/CryptoSocket.java
new file mode 100644
index 00000000000..626eed47468
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/CryptoSocket.java
@@ -0,0 +1,88 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+
+/**
+ * Abstraction of a low-level async network socket which can produce
+ * io events and allows encrypting written data and decrypting read
+ * data. The interface is complexified to handle the use of internal
+ * buffers that may mask io events and pending work. The interface is
+ * simplified by assuming there will be no mid-stream re-negotiation
+ * (no read/write cross-dependencies). Handshaking is explicit and
+ * up-front. This interface is initially designed for persistent
+ * transport connections where closing the connection has no
+ * application-level semantics.
+ **/
+public interface CryptoSocket {
+
+ /**
+ * Obtain the underlying socket channel used by this CryptoSocket.
+ **/
+ public SocketChannel channel();
+
+ public enum HandshakeResult { DONE, NEED_READ, NEED_WRITE }
+
+ /**
+ * Try to progress the initial connection handshake. Handshaking
+ * will be done once, before any normal reads or writes are
+ * performed. Re-negotiation at a later stage will not be
+ * permitted. This function will be called multiple times until
+ * the status is either DONE or an IOException is thrown. When
+ * NEED_READ or NEED_WRITE is returned, the handshake function
+ * will be called again when the appropriate io event has
+ * triggered.
+ **/
+ public HandshakeResult handshake() throws IOException;
+
+ /**
+ * This function should be called after handshaking has completed
+ * before calling the read function. It dictates the minimum size
+ * of the application read buffer presented to the read
+ * function. This is needed to support frame-based stateless
+ * decryption of incoming data.
+ **/
+ public int getMinimumReadBufferSize();
+
+ /**
+ * Called when the underlying socket has available data. Read
+ * through the entire input pipeline. The semantics are the same
+ * as with a normal socket read except it can also fail for
+ * cryptographic reasons.
+ **/
+ public int read(ByteBuffer dst) throws IOException;
+
+ /**
+ * Similar to read, but this function is not allowed to read from
+ * the underlying socket. This is to enable the application to
+ * make sure that there is no more input data in the read pipeline
+ * that is independent of data not yet read from the actual
+ * socket. Draining data from the input pipeline is done to
+ * prevent masking read events.
+ **/
+ public int drain(ByteBuffer dst) throws IOException;
+
+ /**
+ * Called when the application has data it wants to write. Write
+ * through the entire output pipeline. The semantics are the same
+ * as with a normal socket write.
+ **/
+ public int write(ByteBuffer src) throws IOException;
+
+ public enum FlushResult { DONE, NEED_WRITE }
+
+ /**
+ * Try to flush data in the write pipeline that is not depenedent
+ * on data not yet written by the application into the underlying
+ * socket. This is to enable the application to identify pending
+ * work that may not be completed until the underlying socket is
+ * ready for writing more data. When NEED_WRITE is returned,
+ * either write or flush will be called again when the appropriate
+ * io event has triggered.
+ **/
+ public FlushResult flush() throws IOException;
+}
diff --git a/jrt/src/com/yahoo/jrt/NullCryptoEngine.java b/jrt/src/com/yahoo/jrt/NullCryptoEngine.java
new file mode 100644
index 00000000000..601ca6aff4f
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/NullCryptoEngine.java
@@ -0,0 +1,15 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.channels.SocketChannel;
+
+
+/**
+ * CryptoEngine implementation that performs no encryption.
+ **/
+public class NullCryptoEngine implements CryptoEngine {
+ @Override public CryptoSocket createCryptoSocket(SocketChannel channel, boolean isServer) {
+ return new NullCryptoSocket(channel);
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/NullCryptoSocket.java b/jrt/src/com/yahoo/jrt/NullCryptoSocket.java
new file mode 100644
index 00000000000..0397ba8f9fd
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/NullCryptoSocket.java
@@ -0,0 +1,23 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+
+/**
+ * A CryptoSocket with no encryption
+ **/
+public class NullCryptoSocket implements CryptoSocket {
+ private SocketChannel channel;
+ public NullCryptoSocket(SocketChannel channel) { this.channel = channel; }
+ @Override public SocketChannel channel() { return channel; }
+ @Override public HandshakeResult handshake() throws IOException { return HandshakeResult.DONE; }
+ @Override public int getMinimumReadBufferSize() { return 1; }
+ @Override public int read(ByteBuffer dst) throws IOException { return channel.read(dst); }
+ @Override public int drain(ByteBuffer dst) throws IOException { return 0; }
+ @Override public int write(ByteBuffer src) throws IOException { return channel.write(src); }
+ @Override public FlushResult flush() throws IOException { return FlushResult.DONE; }
+}
diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java
index 7b56fcc83e3..f19a9b17ca5 100644
--- a/jrt/src/com/yahoo/jrt/Transport.java
+++ b/jrt/src/com/yahoo/jrt/Transport.java
@@ -5,6 +5,7 @@ package com.yahoo.jrt;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -67,6 +68,7 @@ public class Transport {
private static Logger log = Logger.getLogger(Transport.class.getName());
private FatalErrorHandler fatalHandler; // NB: this must be set first
+ private CryptoEngine cryptoEngine;
private Thread thread;
private Queue queue;
private Queue myQueue;
@@ -136,7 +138,7 @@ public class Transport {
}
if (key.isReadable()) {
try {
- conn.read();
+ conn.handleReadEvent();
} catch (IOException e) {
conn.setLostReason(e);
return false;
@@ -144,7 +146,7 @@ public class Transport {
}
if (key.isWritable()) {
try {
- conn.write();
+ conn.handleWriteEvent();
} catch (IOException e) {
conn.setLostReason(e);
return false;
@@ -154,14 +156,19 @@ public class Transport {
}
/**
- * Create a new Transport object with the given fatal error handler.
+ * Create a new Transport object with the given fatal error
+ * handler and CryptoEngine. If a fatal error occurs when no fatal
+ * error handler is registered, the default action is to log the
+ * error and exit with exit code 1.
*
* @param fatalHandler fatal error handler
+ * @param cryptoEngine crypto engine to use
**/
- public Transport(FatalErrorHandler fatalHandler) {
+ public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine) {
synchronized (this) {
this.fatalHandler = fatalHandler; // NB: this must be set first
}
+ this.cryptoEngine = cryptoEngine;
thread = new Thread(new Run(), "<transport>");
queue = new Queue();
myQueue = new Queue();
@@ -177,15 +184,19 @@ public class Transport {
thread.setDaemon(true);
thread.start();
}
+ public Transport(CryptoEngine cryptoEngine) { this(null, cryptoEngine); }
+ public Transport(FatalErrorHandler fatalHandler) { this(fatalHandler, CryptoEngine.createDefault()); }
+ public Transport() { this(null, CryptoEngine.createDefault()); }
/**
- * Create a Transport object with no fatal error handler. If a
- * fatal error occurs when no fatal error handler is registered,
- * the default action is to log the error and exit with exit code
- * 1.
+ * Use the underlying CryptoEngine to create a CryptoSocket.
+ *
+ * @return CryptoSocket handling appropriate encryption
+ * @param channel low-level socket channel to be wrapped by the CryptoSocket
+ * @param isServer flag indicating which end of the connection we are
**/
- public Transport() {
- this(null);
+ CryptoSocket createCryptoSocket(SocketChannel channel, boolean isServer) {
+ return cryptoEngine.createCryptoSocket(channel, isServer);
}
/**
diff --git a/jrt/src/com/yahoo/jrt/XorCryptoEngine.java b/jrt/src/com/yahoo/jrt/XorCryptoEngine.java
new file mode 100644
index 00000000000..4ba6d00faa4
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/XorCryptoEngine.java
@@ -0,0 +1,17 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.nio.channels.SocketChannel;
+
+
+/**
+ * Very simple crypto engine that requires connection handshaking and
+ * data transformation. Used to test encryption integration separate
+ * from TLS.
+ **/
+public class XorCryptoEngine implements CryptoEngine {
+ @Override public CryptoSocket createCryptoSocket(SocketChannel channel, boolean isServer) {
+ return new XorCryptoSocket(channel, isServer);
+ }
+}
diff --git a/jrt/src/com/yahoo/jrt/XorCryptoSocket.java b/jrt/src/com/yahoo/jrt/XorCryptoSocket.java
new file mode 100644
index 00000000000..90be58bb700
--- /dev/null
+++ b/jrt/src/com/yahoo/jrt/XorCryptoSocket.java
@@ -0,0 +1,121 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.jrt;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.Random;
+
+
+/**
+ * A very simple CryptoSocket that performs connection handshaking and
+ * data transformation. Used to test encryption integration separate
+ * from TLS.
+ **/
+public class XorCryptoSocket implements CryptoSocket {
+
+ private static final int CHUNK_SIZE = 4096;
+ enum OP { READ_KEY, WRITE_KEY }
+
+ private Queue<OP> opList = new ArrayDeque<>();
+ private byte myKey = genKey();
+ private byte peerKey;
+ private Buffer input = new Buffer(CHUNK_SIZE);
+ private Buffer output = new Buffer(CHUNK_SIZE);
+ private SocketChannel channel;
+
+ private static byte genKey() {
+ return (byte) new Random().nextInt(256);
+ }
+
+ private HandshakeResult readKey() throws IOException {
+ int res = channel.read(input.getWritable(1));
+ if (res > 0) {
+ peerKey = input.getReadable().get();
+ return HandshakeResult.DONE;
+ } else if (res == 0) {
+ return HandshakeResult.NEED_READ;
+ } else {
+ throw new IOException("EOF during handshake");
+ }
+ }
+ private HandshakeResult writeKey() throws IOException {
+ if (output.bytes() == 0) {
+ output.getWritable(1).put(myKey);
+ }
+ if (channel.write(output.getReadable()) == 0) {
+ return HandshakeResult.NEED_WRITE;
+ }
+ return HandshakeResult.DONE;
+ }
+ private HandshakeResult perform(OP op) throws IOException {
+ switch (op) {
+ case READ_KEY: return readKey();
+ case WRITE_KEY: return writeKey();
+ }
+ throw new IOException("invalid handshake operation");
+ }
+
+ public XorCryptoSocket(SocketChannel channel, boolean isServer) {
+ this.channel = channel;
+ if (isServer) {
+ opList.add(OP.READ_KEY);
+ opList.add(OP.WRITE_KEY);
+ } else {
+ opList.add(OP.WRITE_KEY);
+ opList.add(OP.READ_KEY);
+ }
+ }
+ @Override public SocketChannel channel() { return channel; }
+ @Override public HandshakeResult handshake() throws IOException {
+ while (!opList.isEmpty()) {
+ HandshakeResult partialResult = perform(opList.element());
+ if (partialResult != HandshakeResult.DONE) {
+ return partialResult;
+ }
+ opList.remove();
+ }
+ return HandshakeResult.DONE;
+ }
+ @Override public int getMinimumReadBufferSize() { return 1; }
+ @Override public int read(ByteBuffer dst) throws IOException {
+ if (input.bytes() == 0) {
+ if (channel.read(input.getWritable(CHUNK_SIZE)) == -1) {
+ return -1; // EOF
+ }
+ }
+ return drain(dst);
+ }
+ @Override public int drain(ByteBuffer dst) throws IOException {
+ int cnt = 0;
+ ByteBuffer src = input.getReadable();
+ while (src.hasRemaining() && dst.hasRemaining()) {
+ dst.put((byte)(src.get() ^ myKey));
+ cnt++;
+ }
+ return cnt;
+ }
+ @Override public int write(ByteBuffer src) throws IOException {
+ int cnt = 0;
+ if (flush() == FlushResult.DONE) {
+ ByteBuffer dst = output.getWritable(CHUNK_SIZE);
+ while (src.hasRemaining() && dst.hasRemaining()) {
+ dst.put((byte)(src.get() ^ peerKey));
+ cnt++;
+ }
+ }
+ return cnt;
+ }
+ @Override public FlushResult flush() throws IOException {
+ ByteBuffer src = output.getReadable();
+ channel.write(src);
+ if (src.hasRemaining()) {
+ return FlushResult.NEED_WRITE;
+ } else {
+ return FlushResult.DONE;
+ }
+ }
+}