001    /*
002     * Copyright 2011-2016 UnboundID Corp.
003     * All Rights Reserved.
004     */
005    /*
006     * Copyright (C) 2011-2016 UnboundID Corp.
007     *
008     * This program is free software; you can redistribute it and/or modify
009     * it under the terms of the GNU General Public License (GPLv2 only)
010     * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011     * as published by the Free Software Foundation.
012     *
013     * This program is distributed in the hope that it will be useful,
014     * but WITHOUT ANY WARRANTY; without even the implied warranty of
015     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016     * GNU General Public License for more details.
017     *
018     * You should have received a copy of the GNU General Public License
019     * along with this program; if not, see <http://www.gnu.org/licenses>.
020     */
021    package com.unboundid.util;
022    
023    
024    
025    import java.io.File;
026    import java.io.FileInputStream;
027    import java.io.InputStream;
028    import java.io.IOException;
029    import java.util.ArrayList;
030    import java.util.Collection;
031    import java.util.Iterator;
032    
033    import static com.unboundid.util.UtilityMessages.*;
034    
035    
036    
037    /**
038     * This class provides an input stream implementation that can aggregate
039     * multiple input streams.  When reading data from this input stream, it will
040     * read from the first input stream until the end of it is reached, at point it
041     * will close it and start reading from the next one, and so on until all input
042     * streams have been exhausted.  Closing the aggregate input stream will cause
043     * all remaining input streams to be closed.
044     */
045    @ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
046    public final class AggregateInputStream
047           extends InputStream
048    {
049      // The currently-active input stream.
050      private volatile InputStream activeInputStream;
051    
052      // The iterator that will be used to access the input streams.
053      private final Iterator<InputStream> streamIterator;
054    
055    
056    
057      /**
058       * Creates a new aggregate input stream that will use the provided set of
059       * input streams.
060       *
061       * @param  inputStreams  The input streams to be used by this aggregate input
062       *                       stream.  It must not be {@code null}.
063       */
064      public AggregateInputStream(final InputStream... inputStreams)
065      {
066        this(StaticUtils.toList(inputStreams));
067      }
068    
069    
070    
071      /**
072       * Creates a new aggregate input stream that will use the provided set of
073       * input streams.
074       *
075       * @param  inputStreams  The input streams to be used by this aggregate input
076       *                       stream.  It must not be {@code null}.
077       */
078      public AggregateInputStream(
079                  final Collection<? extends InputStream> inputStreams)
080      {
081        Validator.ensureNotNull(inputStreams);
082    
083        final ArrayList<InputStream> streamList =
084             new ArrayList<InputStream>(inputStreams);
085        streamIterator = streamList.iterator();
086        activeInputStream = null;
087      }
088    
089    
090    
091      /**
092       * Creates a new aggregate input stream that will read data from the specified
093       * files.
094       *
095       * @param  files  The set of files to be read by this aggregate input stream.
096       *                It must not be {@code null}.
097       *
098       * @throws  IOException  If a problem is encountered while attempting to
099       *                       create input streams for the provided files.
100       */
101      public AggregateInputStream(final File... files)
102             throws IOException
103      {
104        Validator.ensureNotNull(files);
105    
106        final ArrayList<InputStream> streamList =
107             new ArrayList<InputStream>(files.length);
108    
109        IOException ioException = null;
110        for (final File f : files)
111        {
112          try
113          {
114            streamList.add(new FileInputStream(f));
115          }
116          catch (final IOException ioe)
117          {
118            Debug.debugException(ioe);
119            ioException = ioe;
120            break;
121          }
122        }
123    
124        if (ioException != null)
125        {
126          for (final InputStream s : streamList)
127          {
128            if (s != null)
129            {
130              try
131              {
132                s.close();
133              }
134              catch (final Exception e)
135              {
136                Debug.debugException(e);
137              }
138            }
139          }
140    
141          throw ioException;
142        }
143    
144        streamIterator = streamList.iterator();
145        activeInputStream = null;
146      }
147    
148    
149    
150      /**
151       * Reads the next byte of data from the current active input stream, switching
152       * to the next input stream in the set if appropriate.
153       *
154       * @return  The next byte of data that was read, or -1 if all streams have
155       *          been exhausted.
156       *
157       * @throws  IOException  If a problem is encountered while attempting to read
158       *                       data from an input stream.
159       */
160      @Override()
161      public int read()
162             throws IOException
163      {
164        while (true)
165        {
166          if (activeInputStream == null)
167          {
168            if (streamIterator.hasNext())
169            {
170              activeInputStream = streamIterator.next();
171              continue;
172            }
173            else
174            {
175              return -1;
176            }
177          }
178    
179          final int byteRead = activeInputStream.read();
180          if (byteRead < 0)
181          {
182            activeInputStream.close();
183            activeInputStream = null;
184          }
185          else
186          {
187            return byteRead;
188          }
189        }
190      }
191    
192    
193    
194      /**
195       * Reads data from the current active input stream into the provided array,
196       * switching to the next input stream in the set if appropriate.
197       *
198       * @param  b  The array into which the data read should be placed, starting
199       *            with an index of zero.  It must not be {@code null}.
200       *
201       * @return  The number of bytes read into the array, or -1 if all streams have
202       *          been exhausted.
203       *
204       * @throws  IOException  If a problem is encountered while attempting to read
205       *                       data from an input stream.
206       */
207      @Override()
208      public int read(final byte[] b)
209             throws IOException
210      {
211        return read(b, 0, b.length);
212      }
213    
214    
215    
216      /**
217       * Reads data from the current active input stream into the provided array,
218       * switching to the next input stream in the set if appropriate.
219       *
220       * @param  b    The array into which the data read should be placed.  It must
221       *              not be {@code null}.
222       * @param  off  The position in the array at which to start writing data.
223       * @param  len  The maximum number of bytes that may be read.
224       *
225       * @return  The number of bytes read into the array, or -1 if all streams have
226       *          been exhausted.
227       *
228       * @throws  IOException  If a problem is encountered while attempting to read
229       *                       data from an input stream.
230       */
231      @Override()
232      public int read(final byte[] b, final int off, final int len)
233             throws IOException
234      {
235        while (true)
236        {
237          if (activeInputStream == null)
238          {
239            if (streamIterator.hasNext())
240            {
241              activeInputStream = streamIterator.next();
242              continue;
243            }
244            else
245            {
246              return -1;
247            }
248          }
249    
250          final int bytesRead = activeInputStream.read(b, off, len);
251          if (bytesRead < 0)
252          {
253            activeInputStream.close();
254            activeInputStream = null;
255          }
256          else
257          {
258            return bytesRead;
259          }
260        }
261      }
262    
263    
264    
265      /**
266       * Attempts to skip and discard up to the specified number of bytes from the
267       * input stream.
268       *
269       * @param  n  The number of bytes to attempt to skip.
270       *
271       * @return  The number of bytes actually skipped.
272       *
273       * @throws  IOException  If a problem is encountered while attempting to skip
274       *                       data from the input stream.
275       */
276      @Override()
277      public long skip(final long n)
278             throws IOException
279      {
280        if (activeInputStream == null)
281        {
282          if (streamIterator.hasNext())
283          {
284            activeInputStream = streamIterator.next();
285            return activeInputStream.skip(n);
286          }
287          else
288          {
289            return 0L;
290          }
291        }
292        else
293        {
294          return activeInputStream.skip(n);
295        }
296      }
297    
298    
299    
300      /**
301       * Retrieves an estimate of the number of bytes that can be read without
302       * blocking.
303       *
304       * @return  An estimate of the number of bytes that can be read without
305       *          blocking.
306       *
307       * @throws  IOException  If a problem is encountered while attempting to make
308       *                       the determination.
309       */
310      @Override()
311      public int available()
312             throws IOException
313      {
314        if (activeInputStream == null)
315        {
316          if (streamIterator.hasNext())
317          {
318            activeInputStream = streamIterator.next();
319            return activeInputStream.available();
320          }
321          else
322          {
323            return 0;
324          }
325        }
326        else
327        {
328          return activeInputStream.available();
329        }
330      }
331    
332    
333    
334      /**
335       * Indicates whether this input stream supports the use of the {@code mark}
336       * and {@code reset} methods.  This implementation does not support that
337       * capability.
338       *
339       * @return  {@code false} to indicate that this input stream implementation
340       *          does not support the use of {@code mark} and {@code reset}.
341       */
342      @Override()
343      public boolean markSupported()
344      {
345        return false;
346      }
347    
348    
349    
350      /**
351       * Marks the current position in the input stream.  This input stream does not
352       * support this functionality, so no action will be taken.
353       *
354       * @param  readLimit  The maximum number of bytes that the caller may wish to
355       *                    read before being able to reset the stream.
356       */
357      @Override()
358      public void mark(final int readLimit)
359      {
360        // No implementation is required.
361      }
362    
363    
364    
365      /**
366       * Attempts to reset the position of this input stream to the mark location.
367       * This implementation does not support {@code mark} and {@code reset}
368       * functionality, so this method will always throw an exception.
369       *
370       * @throws  IOException  To indicate that reset is not supported.
371       */
372      @Override()
373      public void reset()
374             throws IOException
375      {
376        throw new IOException(ERR_AGGREGATE_INPUT_STREAM_MARK_NOT_SUPPORTED.get());
377      }
378    
379    
380    
381      /**
382       * Closes this input stream.  All associated input streams will be closed.
383       *
384       * @throws  IOException  If an exception was encountered while attempting to
385       *                       close any of the associated streams.  Note that even
386       *                       if an exception is encountered, an attempt will be
387       *                       made to close all streams.
388       */
389      @Override()
390      public void close()
391             throws IOException
392      {
393        IOException firstException = null;
394    
395        if (activeInputStream != null)
396        {
397          try
398          {
399            activeInputStream.close();
400          }
401          catch (final IOException ioe)
402          {
403            Debug.debugException(ioe);
404            firstException = ioe;
405          }
406          activeInputStream = null;
407        }
408    
409        while (streamIterator.hasNext())
410        {
411          final InputStream s = streamIterator.next();
412          try
413          {
414            s.close();
415          }
416          catch (final IOException ioe)
417          {
418            Debug.debugException(ioe);
419            if (firstException == null)
420            {
421              firstException = ioe;
422            }
423          }
424        }
425    
426        if (firstException != null)
427        {
428          throw firstException;
429        }
430      }
431    }