001    /*
002     * Copyright 2011-2015 UnboundID Corp.
003     * All Rights Reserved.
004     */
005    /*
006     * Copyright (C) 2011-2015 UnboundID Corp.
007     *
008     * This program is free software; you can redistribute it and/or modify
009     * it under the terms of the GNU General Public License (GPLv2 only)
010     * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011     * as published by the Free Software Foundation.
012     *
013     * This program is distributed in the hope that it will be useful,
014     * but WITHOUT ANY WARRANTY; without even the implied warranty of
015     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016     * GNU General Public License for more details.
017     *
018     * You should have received a copy of the GNU General Public License
019     * along with this program; if not, see <http://www.gnu.org/licenses>.
020     */
021    package com.unboundid.util;
022    
023    
024    
025    import java.io.File;
026    import java.io.FileInputStream;
027    import java.io.InputStream;
028    import java.io.IOException;
029    import java.util.ArrayList;
030    import java.util.Collection;
031    import java.util.Iterator;
032    
033    import static com.unboundid.util.UtilityMessages.*;
034    
035    
036    
037    /**
038     * This class provides an input stream implementation that can aggregate
039     * multiple input streams.  When reading data from this input stream, it will
040     * read from the first input stream until the end of it is reached, at point it
041     * will close it and start reading from the next one, and so on until all input
042     * streams have been exhausted.  Closing the aggregate input stream will cause
043     * all remaining input streams to be closed.
044     */
045    public final class AggregateInputStream
046           extends InputStream
047    {
048      // The currently-active input stream.
049      private volatile InputStream activeInputStream;
050    
051      // The iterator that will be used to access the input streams.
052      private final Iterator<InputStream> streamIterator;
053    
054    
055    
056      /**
057       * Creates a new aggregate input stream that will use the provided set of
058       * input streams.
059       *
060       * @param  inputStreams  The input streams to be used by this aggregate input
061       *                       stream.  It must not be {@code null}.
062       */
063      public AggregateInputStream(final InputStream... inputStreams)
064      {
065        this(StaticUtils.toList(inputStreams));
066      }
067    
068    
069    
070      /**
071       * Creates a new aggregate input stream that will use the provided set of
072       * input streams.
073       *
074       * @param  inputStreams  The input streams to be used by this aggregate input
075       *                       stream.  It must not be {@code null}.
076       */
077      public AggregateInputStream(
078                  final Collection<? extends InputStream> inputStreams)
079      {
080        Validator.ensureNotNull(inputStreams);
081    
082        final ArrayList<InputStream> streamList =
083             new ArrayList<InputStream>(inputStreams);
084        streamIterator = streamList.iterator();
085        activeInputStream = null;
086      }
087    
088    
089    
090      /**
091       * Creates a new aggregate input stream that will read data from the specified
092       * files.
093       *
094       * @param  files  The set of files to be read by this aggregate input stream.
095       *                It must not be {@code null}.
096       *
097       * @throws  IOException  If a problem is encountered while attempting to
098       *                       create input streams for the provided files.
099       */
100      public AggregateInputStream(final File... files)
101             throws IOException
102      {
103        Validator.ensureNotNull(files);
104    
105        final ArrayList<InputStream> streamList =
106             new ArrayList<InputStream>(files.length);
107    
108        IOException ioException = null;
109        for (final File f : files)
110        {
111          try
112          {
113            streamList.add(new FileInputStream(f));
114          }
115          catch (final IOException ioe)
116          {
117            Debug.debugException(ioe);
118            ioException = ioe;
119            break;
120          }
121        }
122    
123        if (ioException != null)
124        {
125          for (final InputStream s : streamList)
126          {
127            if (s != null)
128            {
129              try
130              {
131                s.close();
132              }
133              catch (final Exception e)
134              {
135                Debug.debugException(e);
136              }
137            }
138          }
139    
140          throw ioException;
141        }
142    
143        streamIterator = streamList.iterator();
144        activeInputStream = null;
145      }
146    
147    
148    
149      /**
150       * Reads the next byte of data from the current active input stream, switching
151       * to the next input stream in the set if appropriate.
152       *
153       * @return  The next byte of data that was read, or -1 if all streams have
154       *          been exhausted.
155       *
156       * @throws  IOException  If a problem is encountered while attempting to read
157       *                       data from an input stream.
158       */
159      @Override()
160      public int read()
161             throws IOException
162      {
163        while (true)
164        {
165          if (activeInputStream == null)
166          {
167            if (streamIterator.hasNext())
168            {
169              activeInputStream = streamIterator.next();
170              continue;
171            }
172            else
173            {
174              return -1;
175            }
176          }
177    
178          final int byteRead = activeInputStream.read();
179          if (byteRead < 0)
180          {
181            activeInputStream.close();
182            activeInputStream = null;
183          }
184          else
185          {
186            return byteRead;
187          }
188        }
189      }
190    
191    
192    
193      /**
194       * Reads data from the current active input stream into the provided array,
195       * switching to the next input stream in the set if appropriate.
196       *
197       * @param  b  The array into which the data read should be placed, starting
198       *            with an index of zero.  It must not be {@code null}.
199       *
200       * @return  The number of bytes read into the array, or -1 if all streams have
201       *          been exhausted.
202       *
203       * @throws  IOException  If a problem is encountered while attempting to read
204       *                       data from an input stream.
205       */
206      @Override()
207      public int read(final byte[] b)
208             throws IOException
209      {
210        return read(b, 0, b.length);
211      }
212    
213    
214    
215      /**
216       * Reads data from the current active input stream into the provided array,
217       * switching to the next input stream in the set if appropriate.
218       *
219       * @param  b    The array into which the data read should be placed.  It must
220       *              not be {@code null}.
221       * @param  off  The position in the array at which to start writing data.
222       * @param  len  The maximum number of bytes that may be read.
223       *
224       * @return  The number of bytes read into the array, or -1 if all streams have
225       *          been exhausted.
226       *
227       * @throws  IOException  If a problem is encountered while attempting to read
228       *                       data from an input stream.
229       */
230      @Override()
231      public int read(final byte[] b, final int off, final int len)
232             throws IOException
233      {
234        while (true)
235        {
236          if (activeInputStream == null)
237          {
238            if (streamIterator.hasNext())
239            {
240              activeInputStream = streamIterator.next();
241              continue;
242            }
243            else
244            {
245              return -1;
246            }
247          }
248    
249          final int bytesRead = activeInputStream.read(b, off, len);
250          if (bytesRead < 0)
251          {
252            activeInputStream.close();
253            activeInputStream = null;
254          }
255          else
256          {
257            return bytesRead;
258          }
259        }
260      }
261    
262    
263    
264      /**
265       * Attempts to skip and discard up to the specified number of bytes from the
266       * input stream.
267       *
268       * @param  n  The number of bytes to attempt to skip.
269       *
270       * @return  The number of bytes actually skipped.
271       *
272       * @throws  IOException  If a problem is encountered while attempting to skip
273       *                       data from the input stream.
274       */
275      @Override()
276      public long skip(final long n)
277             throws IOException
278      {
279        if (activeInputStream == null)
280        {
281          if (streamIterator.hasNext())
282          {
283            activeInputStream = streamIterator.next();
284            return activeInputStream.skip(n);
285          }
286          else
287          {
288            return 0L;
289          }
290        }
291        else
292        {
293          return activeInputStream.skip(n);
294        }
295      }
296    
297    
298    
299      /**
300       * Retrieves an estimate of the number of bytes that can be read without
301       * blocking.
302       *
303       * @return  An estimate of the number of bytes that can be read without
304       *          blocking.
305       *
306       * @throws  IOException  If a problem is encountered while attempting to make
307       *                       the determination.
308       */
309      @Override()
310      public int available()
311             throws IOException
312      {
313        if (activeInputStream == null)
314        {
315          if (streamIterator.hasNext())
316          {
317            activeInputStream = streamIterator.next();
318            return activeInputStream.available();
319          }
320          else
321          {
322            return 0;
323          }
324        }
325        else
326        {
327          return activeInputStream.available();
328        }
329      }
330    
331    
332    
333      /**
334       * Indicates whether this input stream supports the use of the {@code mark}
335       * and {@code reset} methods.  This implementation does not support that
336       * capability.
337       *
338       * @return  {@code false} to indicate that this input stream implementation
339       *          does not support the use of {@code mark} and {@code reset}.
340       */
341      @Override()
342      public boolean markSupported()
343      {
344        return false;
345      }
346    
347    
348    
349      /**
350       * Marks the current position in the input stream.  This input stream does not
351       * support this functionality, so no action will be taken.
352       *
353       * @param  readLimit  The maximum number of bytes that the caller may wish to
354       *                    read before being able to reset the stream.
355       */
356      @Override()
357      public void mark(final int readLimit)
358      {
359        // No implementation is required.
360      }
361    
362    
363    
364      /**
365       * Attempts to reset the position of this input stream to the mark location.
366       * This implementation does not support {@code mark} and {@code reset}
367       * functionality, so this method will always throw an exception.
368       *
369       * @throws  IOException  To indicate that reset is not supported.
370       */
371      @Override()
372      public void reset()
373             throws IOException
374      {
375        throw new IOException(ERR_AGGREGATE_INPUT_STREAM_MARK_NOT_SUPPORTED.get());
376      }
377    
378    
379    
380      /**
381       * Closes this input stream.  All associated input streams will be closed.
382       *
383       * @throws  IOException  If an exception was encountered while attempting to
384       *                       close any of the associated streams.  Note that even
385       *                       if an exception is encountered, an attempt will be
386       *                       made to close all streams.
387       */
388      @Override()
389      public void close()
390             throws IOException
391      {
392        IOException firstException = null;
393    
394        if (activeInputStream != null)
395        {
396          try
397          {
398            activeInputStream.close();
399          }
400          catch (final IOException ioe)
401          {
402            Debug.debugException(ioe);
403            firstException = ioe;
404          }
405          activeInputStream = null;
406        }
407    
408        while (streamIterator.hasNext())
409        {
410          final InputStream s = streamIterator.next();
411          try
412          {
413            s.close();
414          }
415          catch (final IOException ioe)
416          {
417            Debug.debugException(ioe);
418            if (firstException == null)
419            {
420              firstException = ioe;
421            }
422          }
423        }
424    
425        if (firstException != null)
426        {
427          throw firstException;
428        }
429      }
430    }