001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.compressors.snappy;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PushbackInputStream;
024import java.util.Arrays;
025
026import org.apache.commons.compress.compressors.CompressorInputStream;
027import org.apache.commons.compress.utils.BoundedInputStream;
028import org.apache.commons.compress.utils.IOUtils;
029
030/**
031 * CompressorInputStream for the framing Snappy format.
032 *
033 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p>
034 *
035 * @see <a href="http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt">Snappy framing format description</a>
036 * @since 1.7
037 */
038public class FramedSnappyCompressorInputStream extends CompressorInputStream {
039    /**
040     * package private for tests only.
041     */
042    static final long MASK_OFFSET = 0xa282ead8L;
043
044    private static final int STREAM_IDENTIFIER_TYPE = 0xff;
045    private static final int COMPRESSED_CHUNK_TYPE = 0;
046    private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
047    private static final int PADDING_CHUNK_TYPE = 0xfe;
048    private static final int MIN_UNSKIPPABLE_TYPE = 2;
049    private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
050    private static final int MAX_SKIPPABLE_TYPE = 0xfd;
051
052    private static final byte[] SZ_SIGNATURE = new byte[] {
053        (byte) STREAM_IDENTIFIER_TYPE, // tag
054        6, 0, 0, // length
055        's', 'N', 'a', 'P', 'p', 'Y'
056    };
057
058    /** The underlying stream to read compressed data from */
059    private final PushbackInputStream in;
060
061    private SnappyCompressorInputStream currentCompressedChunk;
062
063    // used in no-arg read method
064    private final byte[] oneByte = new byte[1];
065
066    private boolean endReached, inUncompressedChunk;
067
068    private int uncompressedBytesRemaining;
069    private long expectedChecksum = -1;
070    private final PureJavaCrc32C checksum = new PureJavaCrc32C();
071
072    /**
073     * Constructs a new input stream that decompresses snappy-framed-compressed data
074     * from the specified input stream.
075     * @param in  the InputStream from which to read the compressed data
076     */
077    public FramedSnappyCompressorInputStream(InputStream in) throws IOException {
078        this.in = new PushbackInputStream(in, 1);
079        readStreamIdentifier();
080    }
081
082    /** {@inheritDoc} */
083    @Override
084    public int read() throws IOException {
085        return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
086    }
087
088    /** {@inheritDoc} */
089    @Override
090    public void close() throws IOException {
091        if (currentCompressedChunk != null) {
092            currentCompressedChunk.close();
093            currentCompressedChunk = null;
094        }
095        in.close();
096    }
097
098    /** {@inheritDoc} */
099    @Override
100    public int read(byte[] b, int off, int len) throws IOException {
101        int read = readOnce(b, off, len);
102        if (read == -1) {
103            readNextBlock();
104            if (endReached) {
105                return -1;
106            }
107            read = readOnce(b, off, len);
108        }
109        return read;
110    }
111
112    /** {@inheritDoc} */
113    @Override
114    public int available() throws IOException {
115        if (inUncompressedChunk) {
116            return Math.min(uncompressedBytesRemaining,
117                            in.available());
118        } else if (currentCompressedChunk != null) {
119            return currentCompressedChunk.available();
120        }
121        return 0;
122    }
123
124    /**
125     * Read from the current chunk into the given array.
126     *
127     * @return -1 if there is no current chunk or the number of bytes
128     * read from the current chunk (which may be -1 if the end of the
129     * chunk is reached).
130     */
131    private int readOnce(byte[] b, int off, int len) throws IOException {
132        int read = -1;
133        if (inUncompressedChunk) {
134            int amount = Math.min(uncompressedBytesRemaining, len);
135            if (amount == 0) {
136                return -1;
137            }
138            read = in.read(b, off, amount);
139            if (read != -1) {
140                uncompressedBytesRemaining -= read;
141                count(read);
142            }
143        } else if (currentCompressedChunk != null) {
144            long before = currentCompressedChunk.getBytesRead();
145            read = currentCompressedChunk.read(b, off, len);
146            if (read == -1) {
147                currentCompressedChunk.close();
148                currentCompressedChunk = null;
149            } else {
150                count(currentCompressedChunk.getBytesRead() - before);
151            }
152        }
153        if (read > 0) {
154            checksum.update(b, off, read);
155        }
156        return read;
157    }
158
159    private void readNextBlock() throws IOException {
160        verifyLastChecksumAndReset();
161        inUncompressedChunk = false;
162        int type = readOneByte();
163        if (type == -1) {
164            endReached = true;
165        } else if (type == STREAM_IDENTIFIER_TYPE) {
166            in.unread(type);
167            pushedBackBytes(1);
168            readStreamIdentifier();
169            readNextBlock();
170        } else if (type == PADDING_CHUNK_TYPE
171                   || (type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE)) {
172            skipBlock();
173            readNextBlock();
174        } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
175            throw new IOException("unskippable chunk with type " + type
176                                  + " (hex " + Integer.toHexString(type) + ")"
177                                  + " detected.");
178        } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
179            inUncompressedChunk = true;
180            uncompressedBytesRemaining = readSize() - 4 /* CRC */;
181            expectedChecksum = unmask(readCrc());
182        } else if (type == COMPRESSED_CHUNK_TYPE) {
183            long size = readSize() - 4 /* CRC */;
184            expectedChecksum = unmask(readCrc());
185            currentCompressedChunk =
186                new SnappyCompressorInputStream(new BoundedInputStream(in, size));
187            // constructor reads uncompressed size
188            count(currentCompressedChunk.getBytesRead());
189        } else {
190            // impossible as all potential byte values have been covered
191            throw new IOException("unknown chunk type " + type
192                                  + " detected.");
193        }
194    }
195
196    private long readCrc() throws IOException {
197        byte[] b = new byte[4];
198        int read = IOUtils.readFully(in, b);
199        count(read);
200        if (read != 4) {
201            throw new IOException("premature end of stream");
202        }
203        long crc = 0;
204        for (int i = 0; i < 4; i++) {
205            crc |= (b[i] & 0xFFL) << (8 * i);
206        }
207        return crc;
208    }
209
210    static long unmask(long x) {
211        // ugly, maybe we should just have used ints and deal with the
212        // overflow
213        x -= MASK_OFFSET;
214        x &= 0xffffFFFFL;
215        return ((x >> 17) | (x << 15)) & 0xffffFFFFL;
216    }
217
218    private int readSize() throws IOException {
219        int b = 0;
220        int sz = 0;
221        for (int i = 0; i < 3; i++) {
222            b = readOneByte();
223            if (b == -1) {
224                throw new IOException("premature end of stream");
225            }
226            sz |= (b << (i * 8));
227        }
228        return sz;
229    }
230
231    private void skipBlock() throws IOException {
232        int size = readSize();
233        long read = IOUtils.skip(in, size);
234        count(read);
235        if (read != size) {
236            throw new IOException("premature end of stream");
237        }
238    }
239
240    private void readStreamIdentifier() throws IOException {
241        byte[] b = new byte[10];
242        int read = IOUtils.readFully(in, b);
243        count(read);
244        if (10 != read || !matches(b, 10)) {
245            throw new IOException("Not a framed Snappy stream");
246        }
247    }
248
249    private int readOneByte() throws IOException {
250        int b = in.read();
251        if (b != -1) {
252            count(1);
253            return b & 0xFF;
254        }
255        return -1;
256    }
257
258    private void verifyLastChecksumAndReset() throws IOException {
259        if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
260            throw new IOException("Checksum verification failed");
261        }
262        expectedChecksum = -1;
263        checksum.reset();
264    }
265
266    /**
267     * Checks if the signature matches what is expected for a .sz file.
268     *
269     * <p>.sz files start with a chunk with tag 0xff and content sNaPpY.</p>
270     * 
271     * @param signature the bytes to check
272     * @param length    the number of bytes to check
273     * @return          true if this is a .sz stream, false otherwise
274     */
275    public static boolean matches(byte[] signature, int length) {
276
277        if (length < SZ_SIGNATURE.length) {
278            return false;
279        }
280
281        byte[] shortenedSig = signature;
282        if (signature.length > SZ_SIGNATURE.length) {
283            shortenedSig = new byte[SZ_SIGNATURE.length];
284            System.arraycopy(signature, 0, shortenedSig, 0, SZ_SIGNATURE.length);
285        }
286
287        return Arrays.equals(shortenedSig, SZ_SIGNATURE);
288    }
289
290}