aboutsummaryrefslogtreecommitdiffstats
path: root/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/pkg/ApplicationPackageStream.java
blob: 3288759b174e23d58153942d372b24865f0ffc46 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package com.yahoo.vespa.hosted.controller.application.pkg;

import com.yahoo.security.X509CertificateUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

import static com.yahoo.security.X509CertificateUtils.certificateListFromPem;
import static java.io.OutputStream.nullOutputStream;
import static java.lang.Math.min;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
 * Wraps a zipped application package stream.
 * This allows replacing content as the input stream is read.
 * This also retains a truncated {@link ApplicationPackage}, containing only the specified set of files,
 * which can be accessed when this stream is fully exhausted.
 *
 * @author jonmv
 */
public class ApplicationPackageStream {

    private final byte[] inBuffer = new byte[1 << 16];
    private final ByteArrayOutputStream out = new ByteArrayOutputStream(1 << 16);
    private final ZipOutputStream outZip = new ZipOutputStream(out);
    private final ByteArrayOutputStream teeOut = new ByteArrayOutputStream(1 << 16);
    private final ZipOutputStream teeZip = new ZipOutputStream(teeOut);
    private final Replacer replacer;
    private final Predicate<String> filter;
    private final Supplier<InputStream> in;

    private ApplicationPackage ap = null;
    private boolean done = false;

    public static Replacer addingCertificate(Optional<X509Certificate> certificate) {
        return certificate.map(cert -> Replacer.of(Map.of(ApplicationPackage.trustedCertificatesFile,
                                                          trustBytes -> append(trustBytes, cert))))
                          .orElse(Replacer.of(Map.of()));
    }

    static InputStream append(InputStream trustIn, X509Certificate cert) {
        try {
            List<X509Certificate> trusted = trustIn == null ? new ArrayList<>()
                                                            : new ArrayList<>(certificateListFromPem(new String(trustIn.readAllBytes(), UTF_8)));
            trusted.add(cert);
            return new ByteArrayInputStream(X509CertificateUtils.toPem(trusted).getBytes(UTF_8));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Stream that effectively copies the input stream to its {@link #truncatedPackage()} when exhausted. */
    public ApplicationPackageStream(Supplier<InputStream> in) {
        this(in, __ -> true, Map.of());
    }

    /** Stream that replaces the indicated entries, and copies all metadata files to its {@link #truncatedPackage()} when exhausted. */
    public ApplicationPackageStream(Supplier<InputStream> in, Replacer replacer) {
        this(in, name -> ApplicationPackage.prePopulated.contains(name) || name.endsWith(".xml"), replacer);
    }

    /** Stream that replaces the indicated entries, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */
    public ApplicationPackageStream(Supplier<InputStream> in, Predicate<String> truncation, Map<String, UnaryOperator<InputStream>> replacements) {
        this(in, truncation, Replacer.of(replacements));
    }

    /** Stream that uses the given replacer to modify content, and copies the filtered entries to its {@link #truncatedPackage()} when exhausted. */
    public ApplicationPackageStream(Supplier<InputStream> in, Predicate<String> truncation, Replacer replacer) {
        this.in = in;
        this.filter = truncation;
        this.replacer = replacer;
    }

    public InputStream zipStream() {
        return new Stream();
    }

    /** Returns the application backed by only the files indicated by the truncation filter. Throws if not yet exhausted. */
    public ApplicationPackage truncatedPackage() {
        if (ap == null) throw new IllegalStateException("must completely exhaust input before reading package");
        return ap;
    }

    private class Stream extends InputStream {

        private final ZipInputStream inZip = new ZipInputStream(in.get());
        private byte[] currentOut = new byte[0];
        private InputStream currentIn = InputStream.nullInputStream();
        private boolean includeCurrent = false;
        private int pos = 0;
        private boolean closed = false;

        private void fill() throws IOException {
            if (done) return;
            while (out.size() == 0) {
                // Exhaust current entry first.
                int i, n = out.size();
                while (out.size() == 0 && (i = currentIn.read(inBuffer)) != -1) {
                    if (includeCurrent) teeZip.write(inBuffer, 0, i);
                    outZip.write(inBuffer, 0, i);
                    n += i;
                }

                // Current entry exhausted, look for next.
                if (n == 0) {
                    next();
                    if (done) break;
                }
            }

            currentOut = out.toByteArray();
            out.reset();
            pos = 0;
        }

        private void next() throws IOException {
            if (includeCurrent) teeZip.closeEntry();
            outZip.closeEntry();

            ZipEntry next = inZip.getNextEntry();
            String name;
            InputStream content = null;
            if (next == null) {
                // We may still have replacements to fill in, but if we don't, we're done filling, forever!
                name = replacer.next();
                if (name == null) {
                    outZip.close(); // This typically makes new output available, so must check for that after this.
                    teeZip.close();
                    currentIn = nullInputStream();
                    ap = new ApplicationPackage(teeOut.toByteArray());
                    done = true;
                    return;
                }
            }
            else {
                name = next.getName();
                content = new FilterInputStream(inZip) { @Override public void close() { } }; // Protect inZip from replacements closing it.
            }

            includeCurrent = filter.test(name);
            currentIn = replacer.modify(name, content);
            if (currentIn == null) {
                currentIn = InputStream.nullInputStream();
            }
            else {
                if (includeCurrent) teeZip.putNextEntry(new ZipEntry(name));
                outZip.putNextEntry(new ZipEntry(name));
            }
        }

        @Override
        public int read() throws IOException {
            if (closed) throw new IOException("stream closed");
            if (pos == currentOut.length) {
                fill();
                if (pos == currentOut.length) return -1;
            }
            return 0xff & inBuffer[pos++];
        }

        @Override
        public int read(byte[] out, int off, int len) throws IOException {
            if (closed) throw new IOException("stream closed");
            if ((off | len | (off + len) | (out.length - (off + len))) < 0) throw new IndexOutOfBoundsException();
            if (pos == currentOut.length) {
                fill();
                if (pos == currentOut.length) return -1;
            }
            int n = min(currentOut.length - pos, len);
            System.arraycopy(currentOut, pos, out, off, n);
            pos += n;
            return n;
        }

        @Override
        public int available() throws IOException {
            return pos == currentOut.length && done ? 0 : 1;
        }

        @Override
        public void close() {
            if ( ! closed) try {
                transferTo(nullOutputStream());
                inZip.close();
                closed = true;
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

    }

    /** Replaces entries in a zip stream as they are encountered, then appends remaining entries at the end. */
    public interface Replacer {

        /** Called when the entries of the original zip stream are exhausted. Return remaining names, or {@code null} when none left. */
        String next();

        /** Modify content for a given name; return {@code null} for removal; in is {@code null} for entries not present in the input. */
        InputStream modify(String name, InputStream in);

        /**
         * Wraps a map of fixed replacements, and:
         * <ul>
         * <li>Removes entries whose value is {@code null}.</li>
         * <li>Modifies entries present in both input and the map.</li>
         * <li>Appends entries present exclusively in the map.</li>
         * <li>Writes all other entries as they are.</li>
         * </ul>
         */
        static Replacer of(Map<String, UnaryOperator<InputStream>> replacements) {
            return new Replacer() {
                final Map<String, UnaryOperator<InputStream>> remaining = new HashMap<>(replacements);
                @Override public String next() {
                    return remaining.isEmpty() ? null : remaining.keySet().iterator().next();
                }
                @Override public InputStream modify(String name, InputStream in) {
                    UnaryOperator<InputStream> mapper = remaining.remove(name);
                    return mapper == null ? in : mapper.apply(in);
                }
            };
        }

    }

}