summaryrefslogtreecommitdiffstats
path: root/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/client/ChunkedRequestContent.java
diff options
context:
space:
mode:
Diffstat (limited to 'jdisc_http_service/src/main/java/com/yahoo/jdisc/http/client/ChunkedRequestContent.java')
-rw-r--r--jdisc_http_service/src/main/java/com/yahoo/jdisc/http/client/ChunkedRequestContent.java124
1 files changed, 0 insertions, 124 deletions
diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/client/ChunkedRequestContent.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/client/ChunkedRequestContent.java
deleted file mode 100644
index 265315d3eb8..00000000000
--- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/client/ChunkedRequestContent.java
+++ /dev/null
@@ -1,124 +0,0 @@
-// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.jdisc.http.client;
-
-import com.ning.http.client.Body;
-import com.ning.http.client.BodyGenerator;
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.handler.CompletionHandler;
-import com.yahoo.jdisc.handler.ContentChannel;
-import com.yahoo.jdisc.http.core.HeaderFieldsUtil;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.LinkedList;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
- */
-class ChunkedRequestContent implements BodyGenerator, ContentChannel {
-
- private static final byte[] LAST_CHUNK = "0\r\n".getBytes(StandardCharsets.UTF_8);
- private static final byte[] CRLF_BYTES = "\r\n".getBytes(StandardCharsets.UTF_8);
- private final AtomicReference<ChunkedRequestBody> body = new AtomicReference<>(new ChunkedRequestBody(this));
- private final AtomicBoolean writerClosed = new AtomicBoolean(false);
- private final Queue<Entry> writeQueue = new ConcurrentLinkedQueue<>();
- private final Queue<ByteBuffer> readQueue = new LinkedList<>();
- private final Request request;
- private boolean readerClosed = false;
-
- public ChunkedRequestContent(Request request) {
- this.request = request;
- }
-
- @Override
- public Body createBody() throws IOException {
- // this is called by Netty, and presumably has to be thread-safe since Netty assigns thread by connection --
- // retries are necessarily done using new connections
- Body body = this.body.getAndSet(null);
- if (body == null) {
- throw new UnsupportedOperationException("ChunkedRequestContent does not support retries.");
- }
- return body;
- }
-
- @Override
- public void write(ByteBuffer buf, CompletionHandler handler) {
- // this can be called by any JDisc thread, and needs to be thread-safe
- Objects.requireNonNull(buf, "buf");
- if (writerClosed.get()) {
- throw new IllegalStateException("ChunkedRequestContent is closed.");
- }
- writeQueue.add(new Entry(buf, handler));
- }
-
- @Override
- public void close(CompletionHandler handler) {
- // this can be called by any JDisc thread, and needs to be thread-safe
- if (writerClosed.getAndSet(true)) {
- throw new IllegalStateException("ChunkedRequestContent already closed.");
- }
- writeQueue.add(new Entry(null, handler));
- }
-
- public ByteBuffer nextChunk() {
- // this method is only called by the ChunkedRequestBody, which in turns is only called by the thread assigned to
- // the underlying Netty connection -- it does not need to be thread-safe
- if (!readQueue.isEmpty()) {
- ByteBuffer buf = readQueue.poll();
- if (buf == null) {
- readerClosed = true;
- }
- return buf;
- }
- if (writeQueue.isEmpty()) {
- return null;
- }
- Entry entry = writeQueue.poll();
- try {
- entry.handler.completed();
- } catch (Exception e) {
- // TODO: fail and close write queue
- // TODO: rethrow e to make ning abort request
- }
- if (entry.buf != null) {
- readQueue.add(ByteBuffer.wrap(Integer.toHexString(entry.buf.remaining()).getBytes(StandardCharsets.UTF_8)));
- readQueue.add(ByteBuffer.wrap(CRLF_BYTES));
- readQueue.add(entry.buf);
- readQueue.add(ByteBuffer.wrap(CRLF_BYTES));
- } else {
- readQueue.add(ByteBuffer.wrap(LAST_CHUNK));
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- HeaderFieldsUtil.copyTrailers(request, out);
- byte[] buf = out.toByteArray();
- if (buf.length > 0) {
- readQueue.add(ByteBuffer.wrap(buf));
- }
- readQueue.add(ByteBuffer.wrap(CRLF_BYTES));
- readQueue.add(null);
- }
- return readQueue.poll();
- }
-
- public boolean isEndOfInput() {
- // only called by the assigned Netty thread, does not need to be thread-safe
- return readerClosed;
- }
-
- private static class Entry {
-
- final ByteBuffer buf;
- final CompletionHandler handler;
-
- Entry(ByteBuffer buf, CompletionHandler handler) {
- this.buf = buf;
- this.handler = handler;
- }
- }
-}