001/*
002 * Copyright 2018-2024 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright 2018-2024 Ping Identity Corporation
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *    http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020/*
021 * Copyright (C) 2018-2024 Ping Identity Corporation
022 *
023 * This program is free software; you can redistribute it and/or modify
024 * it under the terms of the GNU General Public License (GPLv2 only)
025 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
026 * as published by the Free Software Foundation.
027 *
028 * This program is distributed in the hope that it will be useful,
029 * but WITHOUT ANY WARRANTY; without even the implied warranty of
030 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
031 * GNU General Public License for more details.
032 *
033 * You should have received a copy of the GNU General Public License
034 * along with this program; if not, see <http://www.gnu.org/licenses>.
035 */
036package com.unboundid.util;
037
038
039
040import java.io.InputStream;
041import java.io.IOException;
042
043
044
045/**
046 * This class provides an {@code InputStream} implementation that uses a
047 * {@link FixedRateBarrier} to impose an upper bound on the rate (in bytes per
048 * second) at which data can be read from a wrapped {@code InputStream}.
049 */
050@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
051public final class RateLimitedInputStream
052       extends InputStream
053{
054  // The fixed-rate barrier that will serve as a rate limiter for this class.
055  @NotNull private final FixedRateBarrier rateLimiter;
056
057  // The input stream from which the data will actually be read.
058  @NotNull private final InputStream wrappedStream;
059
060  // The maximum number of bytes that can be read in any single call to the
061  // rate limiter.
062  private final int maxBytesPerRead;
063
064
065
066  /**
067   * Creates a new instance of this rate-limited input stream that wraps the
068   * provided input stream.
069   *
070   * @param  wrappedStream      The input stream from which the data will
071   *                            actually be read.  It must not be {@code null}.
072   * @param  maxBytesPerSecond  The maximum number of bytes per second that can
073   *                            be read using this input stream.  It must be
074   *                            greater than zero.
075   */
076  public RateLimitedInputStream(@NotNull final InputStream wrappedStream,
077                                final int maxBytesPerSecond)
078  {
079    Validator.ensureTrue((wrappedStream != null),
080         "RateLimitedInputStream.wrappedStream must not be null.");
081    Validator.ensureTrue((maxBytesPerSecond > 0),
082         "RateLimitedInputStream.maxBytesPerSecond must be greater than " +
083              "zero.  The provided value was " + maxBytesPerSecond);
084
085    this.wrappedStream = wrappedStream;
086
087    rateLimiter = new FixedRateBarrier(1000L, maxBytesPerSecond);
088    maxBytesPerRead = Math.max(1, (maxBytesPerSecond / 100));
089  }
090
091
092
093  /**
094   * Closes this input stream and the wrapped stream.
095   *
096   * @throws  IOException  If a problem is encountered while closing the wrapped
097   *                       input stream.
098   */
099  @Override()
100  public void close()
101         throws IOException
102  {
103    wrappedStream.close();
104  }
105
106
107
108  /**
109   * Reads a single byte of input from the wrapped input stream.
110   *
111   * @return  The byte that was read, or -1 if the end of the input stream has
112   *          been reached.
113   *
114   * @throws  IOException  If a problem is encountered while attempting to read
115   *                       data from the underlying input stream.
116   */
117  @Override()
118  public int read()
119         throws IOException
120  {
121    rateLimiter.await();
122    return wrappedStream.read();
123  }
124
125
126
127  /**
128   * Reads data from the wrapped input stream into the provided array.
129   *
130   * @param  b  The array into which the data will be placed.
131   *
132   * @return  The number of bytes that were read, or -1 if the end of the input
133   *          stream has been reached.
134   *
135   * @throws  IOException  If a problem is encountered while attempting to read
136   *                       data from the underlying input stream.
137   */
138  @Override()
139  public int read(@NotNull final byte[] b)
140         throws IOException
141  {
142    return read(b, 0, b.length);
143  }
144
145
146
147  /**
148   * Reads data from the wrapped input stream into the specified portion of the
149   * provided array.
150   *
151   * @param  b       The array into which the data will be placed.
152   * @param  offset  The index into the provided array at which the data should
153   *                 start being added.
154   * @param  length  The maximum number of bytes to be added into the array.
155   *
156   * @return  The number of bytes that were read, or -1 if the end of the input
157   *          stream has been reached.
158   *
159   * @throws  IOException  If a problem is encountered while attempting to read
160   *                       data from the underlying input stream.
161   */
162  @Override()
163  public int read(@NotNull final byte[] b, final int offset, final int length)
164         throws IOException
165  {
166    if (length <= 0)
167    {
168      return 0;
169    }
170
171    if (length <= maxBytesPerRead)
172    {
173      rateLimiter.await(length);
174      return wrappedStream.read(b, offset, length);
175    }
176    else
177    {
178      int pos = offset;
179      int remainingLength = length;
180      int totalBytesRead = 0;
181      while (remainingLength > 0)
182      {
183        final int lengthThisRead = Math.min(remainingLength, maxBytesPerRead);
184        rateLimiter.await(lengthThisRead);
185        final int bytesRead = wrappedStream.read(b, pos, lengthThisRead);
186        if (bytesRead < 0)
187        {
188          break;
189        }
190
191        pos += bytesRead;
192        totalBytesRead += bytesRead;
193        remainingLength -= bytesRead;
194      }
195
196      return totalBytesRead;
197    }
198  }
199
200
201
202  /**
203   * Retrieves the number of bytes that are immediately available to be read,
204   * if the wrapped stream supports this operation.
205   *
206   * @return  The number of bytes that are immediately available to be read, or
207   *          zero if there are no bytes to be read, if the end of the input
208   *          stream has been reached, or if the wrapped input stream does not
209   *          support this operation.
210   */
211  @Override()
212  public int available()
213         throws IOException
214  {
215    return wrappedStream.available();
216  }
217
218
219
220  /**
221   * Indicates whether this {@code InputStream} implementation supports the use
222   * of the {@link #mark(int)} and {@link #reset()} methods.  This
223   * implementation will support those methods if the wrapped stream supports
224   * them.
225   *
226   * @return  {@code true} if this {@code InputStream} supports the
227   *          {@code mark} and {@code reset} methods, or {@code false} if not.
228   */
229  @Override()
230  public boolean markSupported()
231  {
232    return wrappedStream.markSupported();
233  }
234
235
236
237  /**
238   * Attempts to mark the current position in the wrapped input stream so that
239   * it can optionally be reset after some amount of data has been read.
240   * fun
241   *
242   * @param  readLimit  The maximum number of bytes expected to be read before a
243   *                    call to the {@link #reset()} method before the mark will
244   *                    no longer be honored.
245   */
246  @Override()
247  public void mark(final int readLimit)
248  {
249    wrappedStream.mark(readLimit);
250  }
251
252
253
254  /**
255   * Attempts to reset the position of this input stream to the last mark
256   * position.
257   *
258   * @throws  IOException  If the input stream cannot be repositioned to the
259   *                       marked location, or if no mark has been set.
260   */
261  @Override()
262  public void reset()
263         throws IOException
264  {
265    wrappedStream.reset();
266  }
267}