diff options
author | Håvard Pettersen <havardpe@oath.com> | 2018-08-29 11:17:14 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2018-08-29 14:09:45 +0000 |
commit | 1d8efbb9fbcc7e52a9d9378a017c85890eff745e (patch) | |
tree | 4f3c379903c92e39564f1ec2f48328a54a0cbaf6 /jrt | |
parent | 582b3b0454f92de40c1b5fa6b506c05572bdfa75 (diff) |
integrate crypto concepts into jrt
Diffstat (limited to 'jrt')
-rw-r--r-- | jrt/src/com/yahoo/jrt/Connection.java | 179 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/CryptoEngine.java | 17 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/CryptoSocket.java | 88 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/NullCryptoEngine.java | 15 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/NullCryptoSocket.java | 23 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/Transport.java | 31 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/XorCryptoEngine.java | 17 | ||||
-rw-r--r-- | jrt/src/com/yahoo/jrt/XorCryptoSocket.java | 121 |
8 files changed, 429 insertions, 62 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java index c4cc9533b8e..1168583348c 100644 --- a/jrt/src/com/yahoo/jrt/Connection.java +++ b/jrt/src/com/yahoo/jrt/Connection.java @@ -23,9 +23,10 @@ class Connection extends Target { private static final int WRITE_SIZE = 8192; private static final int WRITE_REDO = 10; - private static final int INITIAL = 0; - private static final int CONNECTED = 1; - private static final int CLOSED = 2; + private static final int INITIAL = 0; + private static final int CONNECTING = 1; + private static final int CONNECTED = 2; + private static final int CLOSED = 3; private int state = INITIAL; private Queue queue = new Queue(); @@ -41,7 +42,8 @@ class Connection extends Target { private Transport parent; private Supervisor owner; private Spec spec; - private SocketChannel channel; + private CryptoSocket socket; + private int readSize = READ_SIZE; private boolean server; private AtomicLong requestId = new AtomicLong(0); private SelectionKey selectionKey; @@ -52,8 +54,8 @@ class Connection extends Target { log.log(Level.WARNING, "Bogus state transition: " + this.state + "->" + state); return; } - boolean live = (this.state == INITIAL && state == CONNECTED); - boolean down = (this.state != CLOSED && state == CLOSED); + boolean live = (state == CONNECTED); + boolean down = (state == CLOSED); boolean fini; boolean pendingWrite; synchronized (this) { @@ -62,8 +64,11 @@ class Connection extends Target { pendingWrite = (writeWork > 0); } if (live) { + enableRead(); if (pendingWrite) { enableWrite(); + } else { + disableWrite(); } owner.sessionLive(this); } @@ -86,7 +91,7 @@ class Connection extends Target { this.parent = parent; this.owner = owner; - this.channel = channel; + this.socket = parent.createCryptoSocket(channel, true); server = true; owner.sessionInit(this); } @@ -162,9 +167,8 @@ class Connection extends Target { setLostReason(new IllegalArgumentException("jrt: malformed or missing spec")); return this; } - try { - channel = SocketChannel.open(spec.address()); + socket = parent.createCryptoSocket(SocketChannel.open(spec.address()), false); } catch (Exception e) { setLostReason(e); } @@ -172,24 +176,34 @@ class Connection extends Target { } public boolean init(Selector selector) { - if (channel == null) { + if (!hasSocket()) { return false; } try { - channel.configureBlocking(false); - channel.socket().setTcpNoDelay(true); - selectionKey = channel.register(selector, - SelectionKey.OP_READ, - this); + socket.channel().configureBlocking(false); + socket.channel().socket().setTcpNoDelay(true); + selectionKey = socket.channel().register(selector, + SelectionKey.OP_READ | SelectionKey.OP_WRITE, + this); } catch (Exception e) { log.log(Level.WARNING, "Error initializing connection", e); setLostReason(e); return false; } - setState(CONNECTED); + setState(CONNECTING); return true; } + public void enableRead() { + selectionKey.interestOps(selectionKey.interestOps() + | SelectionKey.OP_READ); + } + + public void disableRead() { + selectionKey.interestOps(selectionKey.interestOps() + & ~SelectionKey.OP_READ); + } + public void enableWrite() { selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE); @@ -200,45 +214,89 @@ class Connection extends Target { & ~SelectionKey.OP_WRITE); } - public void read() throws IOException { + private void handshake() throws IOException { + switch (socket.handshake()) { + case DONE: + if (socket.getMinimumReadBufferSize() > readSize) { + readSize = socket.getMinimumReadBufferSize(); + } + while (socket.drain(input.getChannelWritable(readSize)) > 0) { + handlePackets(); + } + if (socket.flush() == CryptoSocket.FlushResult.NEED_WRITE) { + synchronized (this) { + writeWork++; + } + } + setState(CONNECTED); + break; + case NEED_READ: + enableRead(); + disableWrite(); + break; + case NEED_WRITE: + disableRead(); + enableWrite(); + break; + } + } + + private void handlePackets() throws IOException { + ByteBuffer rb = input.getReadable(); + while (true) { + PacketInfo info = PacketInfo.getPacketInfo(rb); + if (info == null || info.packetLength() > rb.remaining()) { + break; + } + owner.readPacket(info); + Packet packet; + try { + packet = info.decodePacket(rb); + } catch (RuntimeException e) { + log.log(Level.WARNING, "got garbage; closing connection: " + toString()); + throw new IOException("jrt: decode error", e); + } + ReplyHandler handler; + synchronized (this) { + handler = replyMap.remove(packet.requestId()); + } + if (handler != null) { + handler.handleReply(packet); + } else { + owner.handlePacket(this, packet); + } + } + } + + private void read() throws IOException { boolean doneRead = false; for (int i = 0; !doneRead && i < READ_REDO; i++) { - ByteBuffer wb = input.getChannelWritable(READ_SIZE); - if (channel.read(wb) == -1) { + ByteBuffer wb = input.getChannelWritable(readSize); + if (socket.read(wb) == -1) { throw new IOException("jrt: Connection closed by peer"); } doneRead = (wb.remaining() > 0); - ByteBuffer rb = input.getReadable(); - while (true) { - PacketInfo info = PacketInfo.getPacketInfo(rb); - if (info == null || info.packetLength() > rb.remaining()) { - break; - } - owner.readPacket(info); - Packet packet; - try { - packet = info.decodePacket(rb); - } catch (RuntimeException e) { - log.log(Level.WARNING, "got garbage; closing connection: " + toString()); - throw new IOException("jrt: decode error", e); - } - ReplyHandler handler; - synchronized (this) { - handler = replyMap.remove(packet.requestId()); - } - if (handler != null) { - handler.handleReply(packet); - } else { - owner.handlePacket(this, packet); - } - } + handlePackets(); + } + while (socket.drain(input.getChannelWritable(readSize)) > 0) { + handlePackets(); } if (maxInputSize > 0) { input.shrink(maxInputSize); } } - public void write() throws IOException { + public void handleReadEvent() throws IOException { + if (state == CONNECTED) { + read(); + } else if (state == CONNECTING) { + handshake(); + } else { + throw new IOException("jrt: got read event in incompatible state: " + state); + } + } + + private void write() throws IOException { synchronized (this) { queue.flush(myQueue); } @@ -257,16 +315,23 @@ class Connection extends Target { if (rb.remaining() == 0) { break; } - channel.write(rb); + socket.write(rb); if (rb.remaining() > 0) { break; } } + int myWriteWork = 0; + if (output.bytes() > 0) { + myWriteWork++; + } + if (socket.flush() == CryptoSocket.FlushResult.NEED_WRITE) { + myWriteWork++; + } boolean disableWrite; synchronized (this) { writeWork = queue.size() - + myQueue.size() - + ((output.bytes() > 0) ? 1 : 0); + + myQueue.size() + + myWriteWork; disableWrite = (writeWork == 0); } if (disableWrite) { @@ -277,6 +342,16 @@ class Connection extends Target { } } + public void handleWriteEvent() throws IOException { + if (state == CONNECTED) { + write(); + } else if (state == CONNECTING) { + handshake(); + } else { + throw new IOException("jrt: got write event in incompatible state: " + state); + } + } + public void fini() { setState(CLOSED); if (selectionKey != null) { @@ -289,13 +364,13 @@ class Connection extends Target { } public boolean hasSocket() { - return (channel != null); + return ((socket != null) && (socket.channel() != null)); } public void closeSocket() { - if (channel != null) { + if (hasSocket()) { try { - channel.socket().close(); + socket.channel().socket().close(); } catch (Exception e) { log.log(Level.WARNING, "Error closing connection", e); } @@ -393,8 +468,8 @@ class Connection extends Target { } public String toString() { - if (channel != null) { - return "Connection { " + channel.socket() + " }"; + if (hasSocket()) { + return "Connection { " + socket.channel().socket() + " }"; } return "Connection { no socket, spec " + spec + " }"; } diff --git a/jrt/src/com/yahoo/jrt/CryptoEngine.java b/jrt/src/com/yahoo/jrt/CryptoEngine.java new file mode 100644 index 00000000000..9852d5a88a6 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/CryptoEngine.java @@ -0,0 +1,17 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.channels.SocketChannel; + + +/** + * Component responsible for wrapping low-level sockets into + * appropriate CryptoSocket instances. This is the top-level interface + * used by code wanting to perform network io with appropriate + * encryption. + **/ +public interface CryptoEngine { + public CryptoSocket createCryptoSocket(SocketChannel channel, boolean isServer); + static public CryptoEngine createDefault() { return new NullCryptoEngine(); } +} diff --git a/jrt/src/com/yahoo/jrt/CryptoSocket.java b/jrt/src/com/yahoo/jrt/CryptoSocket.java new file mode 100644 index 00000000000..626eed47468 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/CryptoSocket.java @@ -0,0 +1,88 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + + +/** + * Abstraction of a low-level async network socket which can produce + * io events and allows encrypting written data and decrypting read + * data. The interface is complexified to handle the use of internal + * buffers that may mask io events and pending work. The interface is + * simplified by assuming there will be no mid-stream re-negotiation + * (no read/write cross-dependencies). Handshaking is explicit and + * up-front. This interface is initially designed for persistent + * transport connections where closing the connection has no + * application-level semantics. + **/ +public interface CryptoSocket { + + /** + * Obtain the underlying socket channel used by this CryptoSocket. + **/ + public SocketChannel channel(); + + public enum HandshakeResult { DONE, NEED_READ, NEED_WRITE } + + /** + * Try to progress the initial connection handshake. Handshaking + * will be done once, before any normal reads or writes are + * performed. Re-negotiation at a later stage will not be + * permitted. This function will be called multiple times until + * the status is either DONE or an IOException is thrown. When + * NEED_READ or NEED_WRITE is returned, the handshake function + * will be called again when the appropriate io event has + * triggered. + **/ + public HandshakeResult handshake() throws IOException; + + /** + * This function should be called after handshaking has completed + * before calling the read function. It dictates the minimum size + * of the application read buffer presented to the read + * function. This is needed to support frame-based stateless + * decryption of incoming data. + **/ + public int getMinimumReadBufferSize(); + + /** + * Called when the underlying socket has available data. Read + * through the entire input pipeline. The semantics are the same + * as with a normal socket read except it can also fail for + * cryptographic reasons. + **/ + public int read(ByteBuffer dst) throws IOException; + + /** + * Similar to read, but this function is not allowed to read from + * the underlying socket. This is to enable the application to + * make sure that there is no more input data in the read pipeline + * that is independent of data not yet read from the actual + * socket. Draining data from the input pipeline is done to + * prevent masking read events. + **/ + public int drain(ByteBuffer dst) throws IOException; + + /** + * Called when the application has data it wants to write. Write + * through the entire output pipeline. The semantics are the same + * as with a normal socket write. + **/ + public int write(ByteBuffer src) throws IOException; + + public enum FlushResult { DONE, NEED_WRITE } + + /** + * Try to flush data in the write pipeline that is not depenedent + * on data not yet written by the application into the underlying + * socket. This is to enable the application to identify pending + * work that may not be completed until the underlying socket is + * ready for writing more data. When NEED_WRITE is returned, + * either write or flush will be called again when the appropriate + * io event has triggered. + **/ + public FlushResult flush() throws IOException; +} diff --git a/jrt/src/com/yahoo/jrt/NullCryptoEngine.java b/jrt/src/com/yahoo/jrt/NullCryptoEngine.java new file mode 100644 index 00000000000..601ca6aff4f --- /dev/null +++ b/jrt/src/com/yahoo/jrt/NullCryptoEngine.java @@ -0,0 +1,15 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.channels.SocketChannel; + + +/** + * CryptoEngine implementation that performs no encryption. + **/ +public class NullCryptoEngine implements CryptoEngine { + @Override public CryptoSocket createCryptoSocket(SocketChannel channel, boolean isServer) { + return new NullCryptoSocket(channel); + } +} diff --git a/jrt/src/com/yahoo/jrt/NullCryptoSocket.java b/jrt/src/com/yahoo/jrt/NullCryptoSocket.java new file mode 100644 index 00000000000..0397ba8f9fd --- /dev/null +++ b/jrt/src/com/yahoo/jrt/NullCryptoSocket.java @@ -0,0 +1,23 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + + +/** + * A CryptoSocket with no encryption + **/ +public class NullCryptoSocket implements CryptoSocket { + private SocketChannel channel; + public NullCryptoSocket(SocketChannel channel) { this.channel = channel; } + @Override public SocketChannel channel() { return channel; } + @Override public HandshakeResult handshake() throws IOException { return HandshakeResult.DONE; } + @Override public int getMinimumReadBufferSize() { return 1; } + @Override public int read(ByteBuffer dst) throws IOException { return channel.read(dst); } + @Override public int drain(ByteBuffer dst) throws IOException { return 0; } + @Override public int write(ByteBuffer src) throws IOException { return channel.write(src); } + @Override public FlushResult flush() throws IOException { return FlushResult.DONE; } +} diff --git a/jrt/src/com/yahoo/jrt/Transport.java b/jrt/src/com/yahoo/jrt/Transport.java index 7b56fcc83e3..f19a9b17ca5 100644 --- a/jrt/src/com/yahoo/jrt/Transport.java +++ b/jrt/src/com/yahoo/jrt/Transport.java @@ -5,6 +5,7 @@ package com.yahoo.jrt; import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.logging.Level; import java.util.logging.Logger; @@ -67,6 +68,7 @@ public class Transport { private static Logger log = Logger.getLogger(Transport.class.getName()); private FatalErrorHandler fatalHandler; // NB: this must be set first + private CryptoEngine cryptoEngine; private Thread thread; private Queue queue; private Queue myQueue; @@ -136,7 +138,7 @@ public class Transport { } if (key.isReadable()) { try { - conn.read(); + conn.handleReadEvent(); } catch (IOException e) { conn.setLostReason(e); return false; @@ -144,7 +146,7 @@ public class Transport { } if (key.isWritable()) { try { - conn.write(); + conn.handleWriteEvent(); } catch (IOException e) { conn.setLostReason(e); return false; @@ -154,14 +156,19 @@ public class Transport { } /** - * Create a new Transport object with the given fatal error handler. + * Create a new Transport object with the given fatal error + * handler and CryptoEngine. If a fatal error occurs when no fatal + * error handler is registered, the default action is to log the + * error and exit with exit code 1. * * @param fatalHandler fatal error handler + * @param cryptoEngine crypto engine to use **/ - public Transport(FatalErrorHandler fatalHandler) { + public Transport(FatalErrorHandler fatalHandler, CryptoEngine cryptoEngine) { synchronized (this) { this.fatalHandler = fatalHandler; // NB: this must be set first } + this.cryptoEngine = cryptoEngine; thread = new Thread(new Run(), "<transport>"); queue = new Queue(); myQueue = new Queue(); @@ -177,15 +184,19 @@ public class Transport { thread.setDaemon(true); thread.start(); } + public Transport(CryptoEngine cryptoEngine) { this(null, cryptoEngine); } + public Transport(FatalErrorHandler fatalHandler) { this(fatalHandler, CryptoEngine.createDefault()); } + public Transport() { this(null, CryptoEngine.createDefault()); } /** - * Create a Transport object with no fatal error handler. If a - * fatal error occurs when no fatal error handler is registered, - * the default action is to log the error and exit with exit code - * 1. + * Use the underlying CryptoEngine to create a CryptoSocket. + * + * @return CryptoSocket handling appropriate encryption + * @param channel low-level socket channel to be wrapped by the CryptoSocket + * @param isServer flag indicating which end of the connection we are **/ - public Transport() { - this(null); + CryptoSocket createCryptoSocket(SocketChannel channel, boolean isServer) { + return cryptoEngine.createCryptoSocket(channel, isServer); } /** diff --git a/jrt/src/com/yahoo/jrt/XorCryptoEngine.java b/jrt/src/com/yahoo/jrt/XorCryptoEngine.java new file mode 100644 index 00000000000..4ba6d00faa4 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/XorCryptoEngine.java @@ -0,0 +1,17 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.nio.channels.SocketChannel; + + +/** + * Very simple crypto engine that requires connection handshaking and + * data transformation. Used to test encryption integration separate + * from TLS. + **/ +public class XorCryptoEngine implements CryptoEngine { + @Override public CryptoSocket createCryptoSocket(SocketChannel channel, boolean isServer) { + return new XorCryptoSocket(channel, isServer); + } +} diff --git a/jrt/src/com/yahoo/jrt/XorCryptoSocket.java b/jrt/src/com/yahoo/jrt/XorCryptoSocket.java new file mode 100644 index 00000000000..90be58bb700 --- /dev/null +++ b/jrt/src/com/yahoo/jrt/XorCryptoSocket.java @@ -0,0 +1,121 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jrt; + + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.Random; + + +/** + * A very simple CryptoSocket that performs connection handshaking and + * data transformation. Used to test encryption integration separate + * from TLS. + **/ +public class XorCryptoSocket implements CryptoSocket { + + private static final int CHUNK_SIZE = 4096; + enum OP { READ_KEY, WRITE_KEY } + + private Queue<OP> opList = new ArrayDeque<>(); + private byte myKey = genKey(); + private byte peerKey; + private Buffer input = new Buffer(CHUNK_SIZE); + private Buffer output = new Buffer(CHUNK_SIZE); + private SocketChannel channel; + + private static byte genKey() { + return (byte) new Random().nextInt(256); + } + + private HandshakeResult readKey() throws IOException { + int res = channel.read(input.getWritable(1)); + if (res > 0) { + peerKey = input.getReadable().get(); + return HandshakeResult.DONE; + } else if (res == 0) { + return HandshakeResult.NEED_READ; + } else { + throw new IOException("EOF during handshake"); + } + } + private HandshakeResult writeKey() throws IOException { + if (output.bytes() == 0) { + output.getWritable(1).put(myKey); + } + if (channel.write(output.getReadable()) == 0) { + return HandshakeResult.NEED_WRITE; + } + return HandshakeResult.DONE; + } + private HandshakeResult perform(OP op) throws IOException { + switch (op) { + case READ_KEY: return readKey(); + case WRITE_KEY: return writeKey(); + } + throw new IOException("invalid handshake operation"); + } + + public XorCryptoSocket(SocketChannel channel, boolean isServer) { + this.channel = channel; + if (isServer) { + opList.add(OP.READ_KEY); + opList.add(OP.WRITE_KEY); + } else { + opList.add(OP.WRITE_KEY); + opList.add(OP.READ_KEY); + } + } + @Override public SocketChannel channel() { return channel; } + @Override public HandshakeResult handshake() throws IOException { + while (!opList.isEmpty()) { + HandshakeResult partialResult = perform(opList.element()); + if (partialResult != HandshakeResult.DONE) { + return partialResult; + } + opList.remove(); + } + return HandshakeResult.DONE; + } + @Override public int getMinimumReadBufferSize() { return 1; } + @Override public int read(ByteBuffer dst) throws IOException { + if (input.bytes() == 0) { + if (channel.read(input.getWritable(CHUNK_SIZE)) == -1) { + return -1; // EOF + } + } + return drain(dst); + } + @Override public int drain(ByteBuffer dst) throws IOException { + int cnt = 0; + ByteBuffer src = input.getReadable(); + while (src.hasRemaining() && dst.hasRemaining()) { + dst.put((byte)(src.get() ^ myKey)); + cnt++; + } + return cnt; + } + @Override public int write(ByteBuffer src) throws IOException { + int cnt = 0; + if (flush() == FlushResult.DONE) { + ByteBuffer dst = output.getWritable(CHUNK_SIZE); + while (src.hasRemaining() && dst.hasRemaining()) { + dst.put((byte)(src.get() ^ peerKey)); + cnt++; + } + } + return cnt; + } + @Override public FlushResult flush() throws IOException { + ByteBuffer src = output.getReadable(); + channel.write(src); + if (src.hasRemaining()) { + return FlushResult.NEED_WRITE; + } else { + return FlushResult.DONE; + } + } +} |