001/*
002 * Copyright 2009-2024 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright 2009-2024 Ping Identity Corporation
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *    http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020/*
021 * Copyright (C) 2009-2024 Ping Identity Corporation
022 *
023 * This program is free software; you can redistribute it and/or modify
024 * it under the terms of the GNU General Public License (GPLv2 only)
025 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
026 * as published by the Free Software Foundation.
027 *
028 * This program is distributed in the hope that it will be useful,
029 * but WITHOUT ANY WARRANTY; without even the implied warranty of
030 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
031 * GNU General Public License for more details.
032 *
033 * You should have received a copy of the GNU General Public License
034 * along with this program; if not, see <http://www.gnu.org/licenses>.
035 */
036package com.unboundid.util;
037
038
039
040import java.io.Serializable;
041import java.util.ArrayList;
042import java.util.Collections;
043import java.util.List;
044import java.util.logging.Level;
045
046
047
048/**
049 * Instances of this class are used to ensure that certain actions are performed
050 * at a fixed rate per interval (e.g. 10000 search operations per second).
051 * <p>
052 * Once a class is constructed with the duration of an interval and the target
053 * per interval, the {@link #await} method only releases callers at the
054 * specified number of times per interval.  This class is most useful when
055 * the target number per interval exceeds the limits of other approaches
056 * such as {@code java.util.Timer} or
057 * {@code java.util.concurrent.ScheduledThreadPoolExecutor}.  For instance,
058 * this does a good job of ensuring that something happens about 10000 times
059 * per second, but it's overkill to ensure something happens five times per
060 * hour.  This does come at a cost.  In the worst case, a single thread is
061 * tied up in a loop doing a small amount of computation followed by a
062 * Thread.yield().  Calling Thread.sleep() is not possible because many
063 * platforms sleep for a minimum of 10ms, and all platforms require sleeping
064 * for at least 1ms.
065 * <p>
066 * Testing has shown that this class is accurate for a "no-op"
067 * action up to two million per second, which vastly exceeds its
068 * typical use in tools such as {@code searchrate} and {@code modrate}.  This
069 * class is designed to be called by multiple threads, however, it does not
070 * make any fairness guarantee between threads; a single-thread might be
071 * released from the {@link #await} method many times before another thread
072 * that is blocked in that method.
073 * <p>
074 * This class attempts to smooth out the target per interval throughout each
075 * interval.  At a given ratio, R between 0 and 1, through the interval, the
076 * expected number of actions to have been performed in the interval at that
077 * time is R times the target per interval.  That is, 10% of the way through
078 * the interval, approximately 10% of the actions have been performed, and
079 * 80% of the way through the interval, 80% of the actions have been performed.
080 * <p>
081 * It's possible to wait for multiple "actions" in one call with
082 * {@link #await(int)}. An example use is rate limiting writing bytes out to
083 * a file. You could configure a FixedRateBarrier to only allow 1M bytes to
084 * be written per second, and then call {@link #await(int)} with the size of
085 * the byte buffer to write. The call to {@link #await(int)} would block until
086 * writing out the buffer would not exceed the desired rate.
087 */
088@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE)
089public final class FixedRateBarrier
090       implements Serializable
091{
092  /**
093   * The minimum number of milliseconds that Thread.sleep() can handle
094   * accurately.  This varies from platform to platform, so we measure it
095   * once in the static initializer below.  When using a low rate (such as
096   * 100 per second), we can often sleep between iterations instead of having
097   * to spin calling Thread.yield().
098   */
099  private static final long minSleepMillis;
100  static
101  {
102    // Calibrate the minimum number of milliseconds that we can reliably
103    // sleep on this system.  We take several measurements and take the median,
104    // which keeps us from choosing an outlier.
105    //
106    // It varies from system to system.  Testing on three systems, yielded
107    // three different measurements Solaris x86 (10 ms), RedHat Linux (2 ms),
108    // Windows 7 (1 ms).
109
110    final List<Long> minSleepMillisMeasurements = new ArrayList<>(11);
111
112    for (int i = 0; i < 11; i++)
113    {
114      final long timeBefore = System.currentTimeMillis();
115      try
116      {
117        Thread.sleep(1);
118      }
119      catch (final InterruptedException e)
120      {
121        Debug.debugException(e);
122      }
123      final long sleepMillis = System.currentTimeMillis() - timeBefore;
124      minSleepMillisMeasurements.add(sleepMillis);
125    }
126
127    Collections.sort(minSleepMillisMeasurements);
128    final long medianSleepMillis = minSleepMillisMeasurements.get(
129            minSleepMillisMeasurements.size()/2);
130
131    minSleepMillis = Math.max(medianSleepMillis, 1);
132
133    final String message = "Calibrated FixedRateBarrier to use " +
134          "minSleepMillis=" + minSleepMillis + ".  " +
135          "Minimum sleep measurements = " + minSleepMillisMeasurements;
136    Debug.debug(Level.INFO, DebugType.OTHER, message);
137  }
138
139
140
141  /**
142   * The serial version UID for this serializable class.
143   */
144  private static final long serialVersionUID = -9048370191248737239L;
145
146
147
148  // This tracks when this class is shut down.  Calls to await() after
149  // shutdownRequested() is called, will return immediately with a value of
150  // true.
151  private volatile boolean shutdownRequested = false;
152
153
154  //
155  // The following class variables are guarded by synchronized(this).
156  //
157
158  // The duration of the target interval in nano-seconds.
159  private long intervalDurationNanos;
160
161  // This tracks the number of milliseconds between each iteration if they
162  // were evenly spaced.
163  //
164  // If intervalDurationMs=1000 and perInterval=100, then this is 100.
165  // If intervalDurationMs=1000 and perInterval=10000, then this is .1.
166  private double millisBetweenIterations;
167
168  // The target number of times to release a thread per interval.
169  private int perInterval;
170
171  // A count of the number of times that await has returned within the current
172  // interval.
173  private long countInThisInterval;
174
175  // The start of this interval in terms of System.nanoTime().
176  private long intervalStartNanos;
177
178  // The end of this interval in terms of System.nanoTime().
179  private long intervalEndNanos;
180
181
182
183  /**
184   * Constructs a new FixedRateBarrier, which is active until
185   * {@link #shutdownRequested} is called.
186   *
187   * @param  intervalDurationMs  The duration of the interval in milliseconds.
188   * @param  perInterval  The target number of times that {@link #await} should
189   *                      return per interval.
190   */
191  public FixedRateBarrier(final long intervalDurationMs, final int perInterval)
192  {
193    setRate(intervalDurationMs, perInterval);
194  }
195
196
197
198  /**
199   * Updates the rates associated with this FixedRateBarrier.  The new rate
200   * will be in effect when this method returns.
201   *
202   * @param  intervalDurationMs  The duration of the interval in milliseconds.
203   * @param  perInterval  The target number of times that {@link #await} should
204   *                      return per interval.
205   */
206  public synchronized void setRate(final long intervalDurationMs,
207                                   final int perInterval)
208  {
209    Validator.ensureTrue(intervalDurationMs > 0,
210         "FixedRateBarrier.intervalDurationMs must be at least 1.");
211    Validator.ensureTrue(perInterval > 0,
212         "FixedRateBarrier.perInterval must be at least 1.");
213
214    this.perInterval = perInterval;
215
216    intervalDurationNanos = 1000L * 1000L * intervalDurationMs;
217
218    millisBetweenIterations = (double)intervalDurationMs/(double)perInterval;
219
220    // Reset the intervals and all of the counters.
221    countInThisInterval = 0;
222    intervalStartNanos = 0;
223    intervalEndNanos = 0;
224  }
225
226
227
228  /**
229   * This method waits until it is time for the next 'action' to be performed
230   * based on the specified interval duration and target per interval.  This
231   * method can be called by multiple threads simultaneously.  This method
232   * returns immediately if shutdown has been requested.
233   *
234   * @return  {@code true} if shutdown has been requested and {@code} false
235   *          otherwise.
236   */
237  public synchronized boolean await()
238  {
239    return await(1);
240  }
241
242
243
244  /**
245   * This method waits until it is time for the next {@code count} 'actions'
246   * to be performed based on the specified interval duration and target per
247   * interval.  To achieve the target rate, it's recommended that on average
248   * {@code count} is small relative to {@code perInterval} (and the
249   * {@code count} must not be larger than {@code perInterval}).  A
250   * {@code count} value will not be split across intervals, and due to timing
251   * issues, it's possible that a {@code count} that barely fits in the
252   * current interval will need to wait until the next interval.  If it's not
253   * possible to use smaller 'count' values, then increase {@code perInterval}
254   * and {@code intervalDurationMs} by the same relative amount.  As an
255   * example, if {@code count} is on average 1/10 as big as
256   * {@code perInterval}, then you can expect to attain 90% of the target
257   * rate.  Increasing {@code perInterval} and {@code intervalDurationMs} by
258   * 10x means that 99% of the target rate can be achieved.
259   * <p>
260   * This method can be called by multiple threads simultaneously.  This method
261   * returns immediately if shutdown has been requested.
262   *
263   * @param  count  The number of 'actions' being performed.  It must be less
264   *                than or equal to {@code perInterval}, and is recommended to
265   *                be fairly small relative to {@code perInterval} so that it
266   *                is easier to achieve the desired rate and exhibit smoother
267   *                performance.
268   *
269   * @return  {@code true} if shutdown has been requested and {@code} false
270   *          otherwise.
271   */
272  public synchronized boolean await(final int count)
273  {
274    if (count > perInterval)
275    {
276      Validator.ensureTrue(false,
277           "FixedRateBarrier.await(int) count value " + count +
278                " exceeds perInterval value " + perInterval +
279                ".  The provided count value must be less than or equal to " +
280                "the perInterval value.");
281    }
282    else if (count <= 0)
283    {
284      return shutdownRequested;
285    }
286
287    // Loop forever until we are requested to shutdown or it is time to perform
288    // the next 'action' in which case we break from the loop.
289    while (!shutdownRequested)
290    {
291      final long now = System.nanoTime();
292
293      if ((intervalStartNanos == 0) ||   // Handles the first time we're called.
294          (now < intervalStartNanos))    // Handles a change in the clock.
295      {
296        intervalStartNanos = now;
297        intervalEndNanos = intervalStartNanos + intervalDurationNanos;
298      }
299      else if (now >= intervalEndNanos)  // End of an interval.
300      {
301        countInThisInterval = 0;
302
303        if (now < (intervalEndNanos + intervalDurationNanos))
304        {
305          // If we have already passed the end of the next interval, then we
306          // don't try to catch up.  Instead we just reset the start of the
307          // next interval to now.  This could happen if the system clock
308          // was set to the future, we're running in a debugger, or we have
309          // very short intervals and are unable to keep up.
310          intervalStartNanos = now;
311        }
312        else
313        {
314          // Usually we're some small fraction into the next interval, so
315          // we set the start of the current interval to the end of the
316          // previous one.
317          intervalStartNanos = intervalEndNanos;
318        }
319        intervalEndNanos = intervalStartNanos + intervalDurationNanos;
320      }
321
322      final long intervalRemaining = intervalEndNanos - now;
323      if (intervalRemaining <= 0)
324      {
325        // This shouldn't happen, but we're careful not to divide by 0.
326        continue;
327      }
328
329      final double intervalFractionRemaining =
330           (double) intervalRemaining / intervalDurationNanos;
331
332      final double expectedRemaining = intervalFractionRemaining * perInterval;
333      final long actualRemaining = perInterval - countInThisInterval;
334
335      final long countBehind =
336              (long)Math.ceil(actualRemaining - expectedRemaining);
337
338      if (count <= countBehind)
339      {
340        // We are on schedule or behind schedule so let the 'action(s)'
341        // happen.
342        countInThisInterval += count;
343        break;
344      }
345      else
346      {
347        // If we can sleep until it's time to leave this barrier, then do
348        // so to keep from spinning on a CPU doing Thread.yield().
349
350        final long countNeeded = count - countBehind;
351        final long remainingMillis =
352             (long) Math.floor(millisBetweenIterations * countNeeded);
353
354        if (remainingMillis >= minSleepMillis)
355        {
356          // Cap how long we sleep so that we can respond to a change in the
357          // rate without too much delay.
358          try
359          {
360            // We need to wait here instead of Thread.sleep so that we don't
361            // block setRate.  Also, cap how long we sleep so that we can
362            // respond to a change in the rate without too much delay.
363            final long waitTime = Math.min(remainingMillis, 10);
364            wait(waitTime);
365          }
366          catch (final InterruptedException e)
367          {
368            Debug.debugException(e);
369            Thread.currentThread().interrupt();
370            return shutdownRequested;
371          }
372        }
373        else
374        {
375          // We're ahead of schedule so yield to other threads, and then try
376          // again.  Note: this is the most costly part of the algorithm because
377          // we have to busy wait due to the lack of sleeping for very small
378          // amounts of time.
379          Thread.yield();
380        }
381      }
382    }
383
384    return shutdownRequested;
385  }
386
387
388
389  /**
390   * Retrieves information about the current target rate for this barrier.  The
391   * value returned will include a {@code Long} that specifies the duration of
392   * the current interval in milliseconds and an {@code Integer} that specifies
393   * the number of times that the {@link #await} method should return per
394   * interval.
395   *
396   * @return  Information about hte current target rate for this barrier.
397   */
398  @NotNull()
399  public synchronized ObjectPair<Long,Integer> getTargetRate()
400  {
401    return new ObjectPair<>(
402         (intervalDurationNanos / (1000L * 1000L)),
403         perInterval);
404  }
405
406
407
408  /**
409   * Shuts down this barrier.  Future calls to await() will return immediately.
410   */
411  public void shutdownRequested()
412  {
413    shutdownRequested = true;
414  }
415
416
417
418  /**
419   * Returns {@code true} if shutdown has been requested.
420   *
421   * @return  {@code true} if shutdown has been requested and {@code false}
422   *          otherwise.
423   */
424  public boolean isShutdownRequested()
425  {
426    return shutdownRequested;
427  }
428}