/*
 * Copyright 2018-2024 Ping Identity Corporation
 * All Rights Reserved.
 */
/*
 * Copyright 2018-2024 Ping Identity Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * Copyright (C) 2018-2024 Ping Identity Corporation
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License (GPLv2 only)
 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, see <http://www.gnu.org/licenses>.
 */
package com.unboundid.util;



import java.io.InputStream;
import java.io.IOException;



/**
 * This class provides an {@code InputStream} implementation that uses a
 * {@link FixedRateBarrier} to impose an upper bound on the rate (in bytes per
 * second) at which data can be read from a wrapped {@code InputStream}.
 */
@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
public final class RateLimitedInputStream
       extends InputStream
{
  // The fixed-rate barrier that will serve as a rate limiter for this class.
  @NotNull private final FixedRateBarrier rateLimiter;

  // The input stream from which the data will actually be read.
  @NotNull private final InputStream wrappedStream;

  // The maximum number of bytes that will be requested from the rate limiter
  // (and from the wrapped stream) in any single call.  Larger reads are broken
  // up into chunks of at most this size.
  private final int maxBytesPerRead;



  /**
   * Creates a new instance of this rate-limited input stream that wraps the
   * provided input stream.
   *
   * @param  wrappedStream      The input stream from which the data will
   *                            actually be read.  It must not be {@code null}.
   * @param  maxBytesPerSecond  The maximum number of bytes per second that can
   *                            be read using this input stream.  It must be
   *                            greater than zero.
   */
  public RateLimitedInputStream(@NotNull final InputStream wrappedStream,
                                final int maxBytesPerSecond)
  {
    Validator.ensureTrue((wrappedStream != null),
         "RateLimitedInputStream.wrappedStream must not be null.");
    Validator.ensureTrue((maxBytesPerSecond > 0),
         "RateLimitedInputStream.maxBytesPerSecond must be greater than " +
              "zero.  The provided value was " + maxBytesPerSecond);

    this.wrappedStream = wrappedStream;

    rateLimiter = new FixedRateBarrier(1000L, maxBytesPerSecond);

    // Cap each individual request at one hundredth of the per-second limit,
    // but never less than a single byte.
    maxBytesPerRead = Math.max(1, (maxBytesPerSecond / 100));
  }



  /**
   * Closes this input stream and the wrapped stream.
   *
   * @throws  IOException  If a problem is encountered while closing the wrapped
   *                       input stream.
   */
  @Override()
  public void close()
         throws IOException
  {
    wrappedStream.close();
  }



  /**
   * Reads a single byte of input from the wrapped input stream, after first
   * waiting on the rate limiter.
   *
   * @return  The byte that was read, or -1 if the end of the input stream has
   *          been reached.
   *
   * @throws  IOException  If a problem is encountered while attempting to read
   *                       data from the underlying input stream.
   */
  @Override()
  public int read()
         throws IOException
  {
    rateLimiter.await();
    return wrappedStream.read();
  }



  /**
   * Reads data from the wrapped input stream into the provided array.
   *
   * @param  b  The array into which the data will be placed.
   *
   * @return  The number of bytes that were read, or -1 if the end of the input
   *          stream has been reached.
   *
   * @throws  IOException  If a problem is encountered while attempting to read
   *                       data from the underlying input stream.
   */
  @Override()
  public int read(@NotNull final byte[] b)
         throws IOException
  {
    return read(b, 0, b.length);
  }



  /**
   * Reads data from the wrapped input stream into the specified portion of the
   * provided array.  If the requested length does not exceed the per-call
   * limit, then a single rate-limited read is performed.  Otherwise, the
   * request is broken up into multiple rate-limited reads that continue until
   * the requested number of bytes has been read or the end of the stream has
   * been reached.
   *
   * @param  b       The array into which the data will be placed.
   * @param  offset  The index into the provided array at which the data should
   *                 start being added.
   * @param  length  The maximum number of bytes to be added into the array.
   *
   * @return  The number of bytes that were read, or -1 if the end of the input
   *          stream has been reached before any data could be read.
   *
   * @throws  IOException  If a problem is encountered while attempting to read
   *                       data from the underlying input stream.
   */
  @Override()
  public int read(@NotNull final byte[] b, final int offset, final int length)
         throws IOException
  {
    if (length <= 0)
    {
      return 0;
    }

    if (length <= maxBytesPerRead)
    {
      rateLimiter.await(length);
      return wrappedStream.read(b, offset, length);
    }

    int pos = offset;
    int remainingLength = length;
    int totalBytesRead = 0;
    while (remainingLength > 0)
    {
      final int lengthThisRead = Math.min(remainingLength, maxBytesPerRead);
      rateLimiter.await(lengthThisRead);
      final int bytesRead = wrappedStream.read(b, pos, lengthThisRead);
      if (bytesRead < 0)
      {
        // The end of the stream was reached.  If nothing at all was read,
        // then that must be reported as -1 rather than zero so that callers
        // looping until end-of-stream will terminate.
        if (totalBytesRead == 0)
        {
          return -1;
        }

        break;
      }

      pos += bytesRead;
      totalBytesRead += bytesRead;
      remainingLength -= bytesRead;
    }

    return totalBytesRead;
  }



  /**
   * Retrieves the number of bytes that are immediately available to be read,
   * if the wrapped stream supports this operation.
   *
   * @return  The number of bytes that are immediately available to be read, or
   *          zero if there are no bytes to be read, if the end of the input
   *          stream has been reached, or if the wrapped input stream does not
   *          support this operation.
   *
   * @throws  IOException  If a problem is encountered while attempting to
   *                       determine the number of bytes available from the
   *                       wrapped input stream.
   */
  @Override()
  public int available()
         throws IOException
  {
    return wrappedStream.available();
  }



  /**
   * Indicates whether this {@code InputStream} implementation supports the use
   * of the {@link #mark(int)} and {@link #reset()} methods.  This
   * implementation will support those methods if the wrapped stream supports
   * them.
   *
   * @return  {@code true} if this {@code InputStream} supports the
   *          {@code mark} and {@code reset} methods, or {@code false} if not.
   */
  @Override()
  public boolean markSupported()
  {
    return wrappedStream.markSupported();
  }



  /**
   * Attempts to mark the current position in the wrapped input stream so that
   * it can optionally be reset after some amount of data has been read.
   *
   * @param  readLimit  The maximum number of bytes that may be read after
   *                    setting the mark before a call to the {@link #reset()}
   *                    method is no longer guaranteed to be honored.
   */
  @Override()
  public void mark(final int readLimit)
  {
    wrappedStream.mark(readLimit);
  }



  /**
   * Attempts to reset the position of this input stream to the last mark
   * position.
   *
   * @throws  IOException  If the input stream cannot be repositioned to the
   *                       marked location, or if no mark has been set.
   */
  @Override()
  public void reset()
         throws IOException
  {
    wrappedStream.reset();
  }
}