001 /* 002 * Copyright 2011-2014 UnboundID Corp. 003 * All Rights Reserved. 004 */ 005 /* 006 * Copyright (C) 2011-2014 UnboundID Corp. 007 * 008 * This program is free software; you can redistribute it and/or modify 009 * it under the terms of the GNU General Public License (GPLv2 only) 010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only) 011 * as published by the Free Software Foundation. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Public License for more details. 017 * 018 * You should have received a copy of the GNU General Public License 019 * along with this program; if not, see <http://www.gnu.org/licenses>. 020 */ 021 package com.unboundid.util; 022 023 024 025 import java.io.File; 026 import java.io.FileInputStream; 027 import java.io.InputStream; 028 import java.io.IOException; 029 import java.util.ArrayList; 030 import java.util.Collection; 031 import java.util.Iterator; 032 033 import static com.unboundid.util.UtilityMessages.*; 034 035 036 037 /** 038 * This class provides an input stream implementation that can aggregate 039 * multiple input streams. When reading data from this input stream, it will 040 * read from the first input stream until the end of it is reached, at point it 041 * will close it and start reading from the next one, and so on until all input 042 * streams have been exhausted. Closing the aggregate input stream will cause 043 * all remaining input streams to be closed. 044 */ 045 public final class AggregateInputStream 046 extends InputStream 047 { 048 // The currently-active input stream. 049 private volatile InputStream activeInputStream; 050 051 // The iterator that will be used to access the input streams. 052 private final Iterator<InputStream> streamIterator; 053 054 055 056 /** 057 * Creates a new aggregate input stream that will use the provided set of 058 * input streams. 059 * 060 * @param inputStreams The input streams to be used by this aggregate input 061 * stream. It must not be {@code null}. 062 */ 063 public AggregateInputStream(final InputStream... inputStreams) 064 { 065 this(StaticUtils.toList(inputStreams)); 066 } 067 068 069 070 /** 071 * Creates a new aggregate input stream that will use the provided set of 072 * input streams. 073 * 074 * @param inputStreams The input streams to be used by this aggregate input 075 * stream. It must not be {@code null}. 076 */ 077 public AggregateInputStream( 078 final Collection<? extends InputStream> inputStreams) 079 { 080 Validator.ensureNotNull(inputStreams); 081 082 final ArrayList<InputStream> streamList = 083 new ArrayList<InputStream>(inputStreams); 084 streamIterator = streamList.iterator(); 085 activeInputStream = null; 086 } 087 088 089 090 /** 091 * Creates a new aggregate input stream that will read data from the specified 092 * files. 093 * 094 * @param files The set of files to be read by this aggregate input stream. 095 * It must not be {@code null}. 096 * 097 * @throws IOException If a problem is encountered while attempting to 098 * create input streams for the provided files. 099 */ 100 public AggregateInputStream(final File... files) 101 throws IOException 102 { 103 Validator.ensureNotNull(files); 104 105 final ArrayList<InputStream> streamList = 106 new ArrayList<InputStream>(files.length); 107 108 IOException ioException = null; 109 for (final File f : files) 110 { 111 try 112 { 113 streamList.add(new FileInputStream(f)); 114 } 115 catch (final IOException ioe) 116 { 117 Debug.debugException(ioe); 118 ioException = ioe; 119 break; 120 } 121 } 122 123 if (ioException != null) 124 { 125 for (final InputStream s : streamList) 126 { 127 if (s != null) 128 { 129 try 130 { 131 s.close(); 132 } 133 catch (final Exception e) 134 { 135 Debug.debugException(e); 136 } 137 } 138 } 139 140 throw ioException; 141 } 142 143 streamIterator = streamList.iterator(); 144 activeInputStream = null; 145 } 146 147 148 149 /** 150 * Reads the next byte of data from the current active input stream, switching 151 * to the next input stream in the set if appropriate. 152 * 153 * @return The next byte of data that was read, or -1 if all streams have 154 * been exhausted. 155 * 156 * @throws IOException If a problem is encountered while attempting to read 157 * data from an input stream. 158 */ 159 @Override() 160 public int read() 161 throws IOException 162 { 163 while (true) 164 { 165 if (activeInputStream == null) 166 { 167 if (streamIterator.hasNext()) 168 { 169 activeInputStream = streamIterator.next(); 170 continue; 171 } 172 else 173 { 174 return -1; 175 } 176 } 177 178 final int byteRead = activeInputStream.read(); 179 if (byteRead < 0) 180 { 181 activeInputStream.close(); 182 activeInputStream = null; 183 } 184 else 185 { 186 return byteRead; 187 } 188 } 189 } 190 191 192 193 /** 194 * Reads data from the current active input stream into the provided array, 195 * switching to the next input stream in the set if appropriate. 196 * 197 * @param b The array into which the data read should be placed, starting 198 * with an index of zero. It must not be {@code null}. 199 * 200 * @return The number of bytes read into the array, or -1 if all streams have 201 * been exhausted. 202 * 203 * @throws IOException If a problem is encountered while attempting to read 204 * data from an input stream. 205 */ 206 @Override() 207 public int read(final byte[] b) 208 throws IOException 209 { 210 return read(b, 0, b.length); 211 } 212 213 214 215 /** 216 * Reads data from the current active input stream into the provided array, 217 * switching to the next input stream in the set if appropriate. 218 * 219 * @param b The array into which the data read should be placed. It must 220 * not be {@code null}. 221 * @param off The position in the array at which to start writing data. 222 * @param len The maximum number of bytes that may be read. 223 * 224 * @return The number of bytes read into the array, or -1 if all streams have 225 * been exhausted. 226 * 227 * @throws IOException If a problem is encountered while attempting to read 228 * data from an input stream. 229 */ 230 @Override() 231 public int read(final byte[] b, final int off, final int len) 232 throws IOException 233 { 234 while (true) 235 { 236 if (activeInputStream == null) 237 { 238 if (streamIterator.hasNext()) 239 { 240 activeInputStream = streamIterator.next(); 241 continue; 242 } 243 else 244 { 245 return -1; 246 } 247 } 248 249 final int bytesRead = activeInputStream.read(b, off, len); 250 if (bytesRead < 0) 251 { 252 activeInputStream.close(); 253 activeInputStream = null; 254 } 255 else 256 { 257 return bytesRead; 258 } 259 } 260 } 261 262 263 264 /** 265 * Attempts to skip and discard up to the specified number of bytes from the 266 * input stream. 267 * 268 * @param n The number of bytes to attempt to skip. 269 * 270 * @return The number of bytes actually skipped. 271 * 272 * @throws IOException If a problem is encountered while attempting to skip 273 * data from the input stream. 274 */ 275 @Override() 276 public long skip(final long n) 277 throws IOException 278 { 279 if (activeInputStream == null) 280 { 281 if (streamIterator.hasNext()) 282 { 283 activeInputStream = streamIterator.next(); 284 return activeInputStream.skip(n); 285 } 286 else 287 { 288 return 0L; 289 } 290 } 291 else 292 { 293 return activeInputStream.skip(n); 294 } 295 } 296 297 298 299 /** 300 * Retrieves an estimate of the number of bytes that can be read without 301 * blocking. 302 * 303 * @return An estimate of the number of bytes that can be read without 304 * blocking. 305 * 306 * @throws IOException If a problem is encountered while attempting to make 307 * the determination. 308 */ 309 @Override() 310 public int available() 311 throws IOException 312 { 313 if (activeInputStream == null) 314 { 315 if (streamIterator.hasNext()) 316 { 317 activeInputStream = streamIterator.next(); 318 return activeInputStream.available(); 319 } 320 else 321 { 322 return 0; 323 } 324 } 325 else 326 { 327 return activeInputStream.available(); 328 } 329 } 330 331 332 333 /** 334 * Indicates whether this input stream supports the use of the {@code mark} 335 * and {@code reset} methods. This implementation does not support that 336 * capability. 337 * 338 * @return {@code false} to indicate that this input stream implementation 339 * does not support the use of {@code mark} and {@code reset}. 340 */ 341 @Override() 342 public boolean markSupported() 343 { 344 return false; 345 } 346 347 348 349 /** 350 * Marks the current position in the input stream. This input stream does not 351 * support this functionality, so no action will be taken. 352 * 353 * @param readLimit The maximum number of bytes that the caller may wish to 354 * read before being able to reset the stream. 355 */ 356 @Override() 357 public void mark(final int readLimit) 358 { 359 // No implementation is required. 360 } 361 362 363 364 /** 365 * Attempts to reset the position of this input stream to the mark location. 366 * This implementation does not support {@code mark} and {@code reset} 367 * functionality, so this method will always throw an exception. 368 * 369 * @throws IOException To indicate that reset is not supported. 370 */ 371 @Override() 372 public void reset() 373 throws IOException 374 { 375 throw new IOException(ERR_AGGREGATE_INPUT_STREAM_MARK_NOT_SUPPORTED.get()); 376 } 377 378 379 380 /** 381 * Closes this input stream. All associated input streams will be closed. 382 * 383 * @throws IOException If an exception was encountered while attempting to 384 * close any of the associated streams. Note that even 385 * if an exception is encountered, an attempt will be 386 * made to close all streams. 387 */ 388 @Override() 389 public void close() 390 throws IOException 391 { 392 IOException firstException = null; 393 394 if (activeInputStream != null) 395 { 396 try 397 { 398 activeInputStream.close(); 399 } 400 catch (final IOException ioe) 401 { 402 Debug.debugException(ioe); 403 firstException = ioe; 404 } 405 activeInputStream = null; 406 } 407 408 while (streamIterator.hasNext()) 409 { 410 final InputStream s = streamIterator.next(); 411 try 412 { 413 s.close(); 414 } 415 catch (final IOException ioe) 416 { 417 Debug.debugException(ioe); 418 if (firstException == null) 419 { 420 firstException = ioe; 421 } 422 } 423 } 424 425 if (firstException != null) 426 { 427 throw firstException; 428 } 429 } 430 }