001/*
002 * Copyright 2011-2024 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright 2011-2024 Ping Identity Corporation
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *    http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020/*
021 * Copyright (C) 2011-2024 Ping Identity Corporation
022 *
023 * This program is free software; you can redistribute it and/or modify
024 * it under the terms of the GNU General Public License (GPLv2 only)
025 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
026 * as published by the Free Software Foundation.
027 *
028 * This program is distributed in the hope that it will be useful,
029 * but WITHOUT ANY WARRANTY; without even the implied warranty of
030 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
031 * GNU General Public License for more details.
032 *
033 * You should have received a copy of the GNU General Public License
034 * along with this program; if not, see <http://www.gnu.org/licenses>.
035 */
036package com.unboundid.util;
037
038
039
040import java.io.ByteArrayInputStream;
041import java.io.File;
042import java.io.FileInputStream;
043import java.io.InputStream;
044import java.io.IOException;
045import java.util.ArrayList;
046import java.util.Collection;
047import java.util.Iterator;
048
049import static com.unboundid.util.UtilityMessages.*;
050
051
052
053/**
054 * This class provides an input stream implementation that can aggregate
055 * multiple input streams.  When reading data from this input stream, it will
056 * read from the first input stream until the end of it is reached, at point it
057 * will close it and start reading from the next one, and so on until all input
058 * streams have been exhausted.  Closing the aggregate input stream will cause
059 * all remaining input streams to be closed.
060 */
061@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
062public final class AggregateInputStream
063       extends InputStream
064{
065  // The currently-active input stream.
066  @Nullable private volatile InputStream activeInputStream;
067
068  // The iterator that will be used to access the input streams.
069  @NotNull private final Iterator<InputStream> streamIterator;
070
071
072
073  /**
074   * Creates a new aggregate input stream that will use the provided set of
075   * input streams.
076   *
077   * @param  inputStreams  The input streams to be used by this aggregate input
078   *                       stream.  It must not be {@code null}.
079   */
080  public AggregateInputStream(@NotNull final InputStream... inputStreams)
081  {
082    this(StaticUtils.toList(inputStreams));
083  }
084
085
086
087  /**
088   * Creates a new aggregate input stream that will use the provided set of
089   * input streams.
090   *
091   * @param  inputStreams  The input streams to be used by this aggregate input
092   *                       stream.  It must not be {@code null}.
093   */
094  public AggregateInputStream(
095              @NotNull final Collection<? extends InputStream> inputStreams)
096  {
097    Validator.ensureNotNull(inputStreams);
098
099    final ArrayList<InputStream> streamList = new ArrayList<>(inputStreams);
100    streamIterator = streamList.iterator();
101    activeInputStream = null;
102  }
103
104
105
106  /**
107   * Creates a new aggregate input stream that will read data from the specified
108   * files.
109   *
110   * @param  files  The set of files to be read by this aggregate input stream.
111   *                It must not be {@code null}.
112   *
113   * @throws  IOException  If a problem is encountered while attempting to
114   *                       create input streams for the provided files.
115   */
116  public AggregateInputStream(@NotNull final File... files)
117         throws IOException
118  {
119    this(false, files);
120  }
121
122
123
124  /**
125   * Creates a new aggregate input stream that will read data from the specified
126   * files.
127   *
128   * @param  ensureBlankLinesBetweenFiles  Indicates whether to ensure that
129   *                                       there is at least one completely
130   *                                       blank line between files.  This may
131   *                                       be useful when blank lines are
132   *                                       used as delimiters (for example, when
133   *                                       reading LDIF data), there is a chance
134   *                                       that the files may not end with blank
135   *                                       lines, and the inclusion of extra
136   *                                       blank lines between files will not
137   *                                       cause any harm.
138   * @param  files                         The set of files to be read by this
139   *                                       aggregate input stream.  It must not
140   *                                       be {@code null}.
141   *
142   * @throws  IOException  If a problem is encountered while attempting to
143   *                       create input streams for the provided files.
144   */
145  public AggregateInputStream(final boolean ensureBlankLinesBetweenFiles,
146                              @NotNull final File... files)
147         throws IOException
148  {
149    Validator.ensureNotNull(files);
150
151    final ArrayList<InputStream> streamList = new ArrayList<>(2 * files.length);
152
153    IOException ioException = null;
154    for (final File f : files)
155    {
156      if (ensureBlankLinesBetweenFiles && (! streamList.isEmpty()))
157      {
158        final ByteStringBuffer buffer = new ByteStringBuffer(4);
159        buffer.append(StaticUtils.EOL_BYTES);
160        buffer.append(StaticUtils.EOL_BYTES);
161        streamList.add(new ByteArrayInputStream(buffer.toByteArray()));
162      }
163
164      try
165      {
166        streamList.add(new FileInputStream(f));
167      }
168      catch (final IOException ioe)
169      {
170        Debug.debugException(ioe);
171        ioException = ioe;
172        break;
173      }
174    }
175
176    if (ioException != null)
177    {
178      for (final InputStream s : streamList)
179      {
180        if (s != null)
181        {
182          try
183          {
184            s.close();
185          }
186          catch (final Exception e)
187          {
188            Debug.debugException(e);
189          }
190        }
191      }
192
193      throw ioException;
194    }
195
196    streamIterator = streamList.iterator();
197    activeInputStream = null;
198  }
199
200
201
202  /**
203   * Reads the next byte of data from the current active input stream, switching
204   * to the next input stream in the set if appropriate.
205   *
206   * @return  The next byte of data that was read, or -1 if all streams have
207   *          been exhausted.
208   *
209   * @throws  IOException  If a problem is encountered while attempting to read
210   *                       data from an input stream.
211   */
212  @Override()
213  public int read()
214         throws IOException
215  {
216    while (true)
217    {
218      if (activeInputStream == null)
219      {
220        if (streamIterator.hasNext())
221        {
222          activeInputStream = streamIterator.next();
223          continue;
224        }
225        else
226        {
227          return -1;
228        }
229      }
230
231      final int byteRead = activeInputStream.read();
232      if (byteRead < 0)
233      {
234        activeInputStream.close();
235        activeInputStream = null;
236      }
237      else
238      {
239        return byteRead;
240      }
241    }
242  }
243
244
245
246  /**
247   * Reads data from the current active input stream into the provided array,
248   * switching to the next input stream in the set if appropriate.
249   *
250   * @param  b  The array into which the data read should be placed, starting
251   *            with an index of zero.  It must not be {@code null}.
252   *
253   * @return  The number of bytes read into the array, or -1 if all streams have
254   *          been exhausted.
255   *
256   * @throws  IOException  If a problem is encountered while attempting to read
257   *                       data from an input stream.
258   */
259  @Override()
260  public int read(@NotNull final byte[] b)
261         throws IOException
262  {
263    return read(b, 0, b.length);
264  }
265
266
267
268  /**
269   * Reads data from the current active input stream into the provided array,
270   * switching to the next input stream in the set if appropriate.
271   *
272   * @param  b    The array into which the data read should be placed.  It must
273   *              not be {@code null}.
274   * @param  off  The position in the array at which to start writing data.
275   * @param  len  The maximum number of bytes that may be read.
276   *
277   * @return  The number of bytes read into the array, or -1 if all streams have
278   *          been exhausted.
279   *
280   * @throws  IOException  If a problem is encountered while attempting to read
281   *                       data from an input stream.
282   */
283  @Override()
284  public int read(@NotNull final byte[] b, final int off, final int len)
285         throws IOException
286  {
287    while (true)
288    {
289      if (activeInputStream == null)
290      {
291        if (streamIterator.hasNext())
292        {
293          activeInputStream = streamIterator.next();
294          continue;
295        }
296        else
297        {
298          return -1;
299        }
300      }
301
302      final int bytesRead = activeInputStream.read(b, off, len);
303      if (bytesRead < 0)
304      {
305        activeInputStream.close();
306        activeInputStream = null;
307      }
308      else
309      {
310        return bytesRead;
311      }
312    }
313  }
314
315
316
317  /**
318   * Attempts to skip and discard up to the specified number of bytes from the
319   * input stream.
320   *
321   * @param  n  The number of bytes to attempt to skip.
322   *
323   * @return  The number of bytes actually skipped.
324   *
325   * @throws  IOException  If a problem is encountered while attempting to skip
326   *                       data from the input stream.
327   */
328  @Override()
329  public long skip(final long n)
330         throws IOException
331  {
332    if (activeInputStream == null)
333    {
334      if (streamIterator.hasNext())
335      {
336        activeInputStream = streamIterator.next();
337        return activeInputStream.skip(n);
338      }
339      else
340      {
341        return 0L;
342      }
343    }
344    else
345    {
346      return activeInputStream.skip(n);
347    }
348  }
349
350
351
352  /**
353   * Retrieves an estimate of the number of bytes that can be read without
354   * blocking.
355   *
356   * @return  An estimate of the number of bytes that can be read without
357   *          blocking.
358   *
359   * @throws  IOException  If a problem is encountered while attempting to make
360   *                       the determination.
361   */
362  @Override()
363  public int available()
364         throws IOException
365  {
366    if (activeInputStream == null)
367    {
368      if (streamIterator.hasNext())
369      {
370        activeInputStream = streamIterator.next();
371        return activeInputStream.available();
372      }
373      else
374      {
375        return 0;
376      }
377    }
378    else
379    {
380      return activeInputStream.available();
381    }
382  }
383
384
385
386  /**
387   * Indicates whether this input stream supports the use of the {@code mark}
388   * and {@code reset} methods.  This implementation does not support that
389   * capability.
390   *
391   * @return  {@code false} to indicate that this input stream implementation
392   *          does not support the use of {@code mark} and {@code reset}.
393   */
394  @Override()
395  public boolean markSupported()
396  {
397    return false;
398  }
399
400
401
402  /**
403   * Marks the current position in the input stream.  This input stream does not
404   * support this functionality, so no action will be taken.
405   *
406   * @param  readLimit  The maximum number of bytes that the caller may wish to
407   *                    read before being able to reset the stream.
408   */
409  @Override()
410  public void mark(final int readLimit)
411  {
412    // No implementation is required.
413  }
414
415
416
417  /**
418   * Attempts to reset the position of this input stream to the mark location.
419   * This implementation does not support {@code mark} and {@code reset}
420   * functionality, so this method will always throw an exception.
421   *
422   * @throws  IOException  To indicate that reset is not supported.
423   */
424  @Override()
425  public void reset()
426         throws IOException
427  {
428    throw new IOException(ERR_AGGREGATE_INPUT_STREAM_MARK_NOT_SUPPORTED.get());
429  }
430
431
432
433  /**
434   * Closes this input stream.  All associated input streams will be closed.
435   *
436   * @throws  IOException  If an exception was encountered while attempting to
437   *                       close any of the associated streams.  Note that even
438   *                       if an exception is encountered, an attempt will be
439   *                       made to close all streams.
440   */
441  @Override()
442  public void close()
443         throws IOException
444  {
445    IOException firstException = null;
446
447    if (activeInputStream != null)
448    {
449      try
450      {
451        activeInputStream.close();
452      }
453      catch (final IOException ioe)
454      {
455        Debug.debugException(ioe);
456        firstException = ioe;
457      }
458      activeInputStream = null;
459    }
460
461    while (streamIterator.hasNext())
462    {
463      final InputStream s = streamIterator.next();
464      try
465      {
466        s.close();
467      }
468      catch (final IOException ioe)
469      {
470        Debug.debugException(ioe);
471        if (firstException == null)
472        {
473          firstException = ioe;
474        }
475      }
476    }
477
478    if (firstException != null)
479    {
480      throw firstException;
481    }
482  }
483}