001/* 002 * Copyright 2009-2024 Ping Identity Corporation 003 * All Rights Reserved. 004 */ 005/* 006 * Copyright 2009-2024 Ping Identity Corporation 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020/* 021 * Copyright (C) 2009-2024 Ping Identity Corporation 022 * 023 * This program is free software; you can redistribute it and/or modify 024 * it under the terms of the GNU General Public License (GPLv2 only) 025 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only) 026 * as published by the Free Software Foundation. 027 * 028 * This program is distributed in the hope that it will be useful, 029 * but WITHOUT ANY WARRANTY; without even the implied warranty of 030 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 031 * GNU General Public License for more details. 032 * 033 * You should have received a copy of the GNU General Public License 034 * along with this program; if not, see <http://www.gnu.org/licenses>. 035 */ 036package com.unboundid.util; 037 038 039 040import java.io.Serializable; 041import java.util.ArrayList; 042import java.util.Collections; 043import java.util.List; 044import java.util.logging.Level; 045 046 047 048/** 049 * Instances of this class are used to ensure that certain actions are performed 050 * at a fixed rate per interval (e.g. 10000 search operations per second). 051 * <p> 052 * Once a class is constructed with the duration of an interval and the target 053 * per interval, the {@link #await} method only releases callers at the 054 * specified number of times per interval. This class is most useful when 055 * the target number per interval exceeds the limits of other approaches 056 * such as {@code java.util.Timer} or 057 * {@code java.util.concurrent.ScheduledThreadPoolExecutor}. For instance, 058 * this does a good job of ensuring that something happens about 10000 times 059 * per second, but it's overkill to ensure something happens five times per 060 * hour. This does come at a cost. In the worst case, a single thread is 061 * tied up in a loop doing a small amount of computation followed by a 062 * Thread.yield(). Calling Thread.sleep() is not possible because many 063 * platforms sleep for a minimum of 10ms, and all platforms require sleeping 064 * for at least 1ms. 065 * <p> 066 * Testing has shown that this class is accurate for a "no-op" 067 * action up to two million per second, which vastly exceeds its 068 * typical use in tools such as {@code searchrate} and {@code modrate}. This 069 * class is designed to be called by multiple threads, however, it does not 070 * make any fairness guarantee between threads; a single-thread might be 071 * released from the {@link #await} method many times before another thread 072 * that is blocked in that method. 073 * <p> 074 * This class attempts to smooth out the target per interval throughout each 075 * interval. At a given ratio, R between 0 and 1, through the interval, the 076 * expected number of actions to have been performed in the interval at that 077 * time is R times the target per interval. That is, 10% of the way through 078 * the interval, approximately 10% of the actions have been performed, and 079 * 80% of the way through the interval, 80% of the actions have been performed. 080 * <p> 081 * It's possible to wait for multiple "actions" in one call with 082 * {@link #await(int)}. An example use is rate limiting writing bytes out to 083 * a file. You could configure a FixedRateBarrier to only allow 1M bytes to 084 * be written per second, and then call {@link #await(int)} with the size of 085 * the byte buffer to write. The call to {@link #await(int)} would block until 086 * writing out the buffer would not exceed the desired rate. 087 */ 088@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE) 089public final class FixedRateBarrier 090 implements Serializable 091{ 092 /** 093 * The minimum number of milliseconds that Thread.sleep() can handle 094 * accurately. This varies from platform to platform, so we measure it 095 * once in the static initializer below. When using a low rate (such as 096 * 100 per second), we can often sleep between iterations instead of having 097 * to spin calling Thread.yield(). 098 */ 099 private static final long minSleepMillis; 100 static 101 { 102 // Calibrate the minimum number of milliseconds that we can reliably 103 // sleep on this system. We take several measurements and take the median, 104 // which keeps us from choosing an outlier. 105 // 106 // It varies from system to system. Testing on three systems, yielded 107 // three different measurements Solaris x86 (10 ms), RedHat Linux (2 ms), 108 // Windows 7 (1 ms). 109 110 final List<Long> minSleepMillisMeasurements = new ArrayList<>(11); 111 112 for (int i = 0; i < 11; i++) 113 { 114 final long timeBefore = System.currentTimeMillis(); 115 try 116 { 117 Thread.sleep(1); 118 } 119 catch (final InterruptedException e) 120 { 121 Debug.debugException(e); 122 } 123 final long sleepMillis = System.currentTimeMillis() - timeBefore; 124 minSleepMillisMeasurements.add(sleepMillis); 125 } 126 127 Collections.sort(minSleepMillisMeasurements); 128 final long medianSleepMillis = minSleepMillisMeasurements.get( 129 minSleepMillisMeasurements.size()/2); 130 131 minSleepMillis = Math.max(medianSleepMillis, 1); 132 133 final String message = "Calibrated FixedRateBarrier to use " + 134 "minSleepMillis=" + minSleepMillis + ". " + 135 "Minimum sleep measurements = " + minSleepMillisMeasurements; 136 Debug.debug(Level.INFO, DebugType.OTHER, message); 137 } 138 139 140 141 /** 142 * The serial version UID for this serializable class. 143 */ 144 private static final long serialVersionUID = -9048370191248737239L; 145 146 147 148 // This tracks when this class is shut down. Calls to await() after 149 // shutdownRequested() is called, will return immediately with a value of 150 // true. 151 private volatile boolean shutdownRequested = false; 152 153 154 // 155 // The following class variables are guarded by synchronized(this). 156 // 157 158 // The duration of the target interval in nano-seconds. 159 private long intervalDurationNanos; 160 161 // This tracks the number of milliseconds between each iteration if they 162 // were evenly spaced. 163 // 164 // If intervalDurationMs=1000 and perInterval=100, then this is 100. 165 // If intervalDurationMs=1000 and perInterval=10000, then this is .1. 166 private double millisBetweenIterations; 167 168 // The target number of times to release a thread per interval. 169 private int perInterval; 170 171 // A count of the number of times that await has returned within the current 172 // interval. 173 private long countInThisInterval; 174 175 // The start of this interval in terms of System.nanoTime(). 176 private long intervalStartNanos; 177 178 // The end of this interval in terms of System.nanoTime(). 179 private long intervalEndNanos; 180 181 182 183 /** 184 * Constructs a new FixedRateBarrier, which is active until 185 * {@link #shutdownRequested} is called. 186 * 187 * @param intervalDurationMs The duration of the interval in milliseconds. 188 * @param perInterval The target number of times that {@link #await} should 189 * return per interval. 190 */ 191 public FixedRateBarrier(final long intervalDurationMs, final int perInterval) 192 { 193 setRate(intervalDurationMs, perInterval); 194 } 195 196 197 198 /** 199 * Updates the rates associated with this FixedRateBarrier. The new rate 200 * will be in effect when this method returns. 201 * 202 * @param intervalDurationMs The duration of the interval in milliseconds. 203 * @param perInterval The target number of times that {@link #await} should 204 * return per interval. 205 */ 206 public synchronized void setRate(final long intervalDurationMs, 207 final int perInterval) 208 { 209 Validator.ensureTrue(intervalDurationMs > 0, 210 "FixedRateBarrier.intervalDurationMs must be at least 1."); 211 Validator.ensureTrue(perInterval > 0, 212 "FixedRateBarrier.perInterval must be at least 1."); 213 214 this.perInterval = perInterval; 215 216 intervalDurationNanos = 1000L * 1000L * intervalDurationMs; 217 218 millisBetweenIterations = (double)intervalDurationMs/(double)perInterval; 219 220 // Reset the intervals and all of the counters. 221 countInThisInterval = 0; 222 intervalStartNanos = 0; 223 intervalEndNanos = 0; 224 } 225 226 227 228 /** 229 * This method waits until it is time for the next 'action' to be performed 230 * based on the specified interval duration and target per interval. This 231 * method can be called by multiple threads simultaneously. This method 232 * returns immediately if shutdown has been requested. 233 * 234 * @return {@code true} if shutdown has been requested and {@code} false 235 * otherwise. 236 */ 237 public synchronized boolean await() 238 { 239 return await(1); 240 } 241 242 243 244 /** 245 * This method waits until it is time for the next {@code count} 'actions' 246 * to be performed based on the specified interval duration and target per 247 * interval. To achieve the target rate, it's recommended that on average 248 * {@code count} is small relative to {@code perInterval} (and the 249 * {@code count} must not be larger than {@code perInterval}). A 250 * {@code count} value will not be split across intervals, and due to timing 251 * issues, it's possible that a {@code count} that barely fits in the 252 * current interval will need to wait until the next interval. If it's not 253 * possible to use smaller 'count' values, then increase {@code perInterval} 254 * and {@code intervalDurationMs} by the same relative amount. As an 255 * example, if {@code count} is on average 1/10 as big as 256 * {@code perInterval}, then you can expect to attain 90% of the target 257 * rate. Increasing {@code perInterval} and {@code intervalDurationMs} by 258 * 10x means that 99% of the target rate can be achieved. 259 * <p> 260 * This method can be called by multiple threads simultaneously. This method 261 * returns immediately if shutdown has been requested. 262 * 263 * @param count The number of 'actions' being performed. It must be less 264 * than or equal to {@code perInterval}, and is recommended to 265 * be fairly small relative to {@code perInterval} so that it 266 * is easier to achieve the desired rate and exhibit smoother 267 * performance. 268 * 269 * @return {@code true} if shutdown has been requested and {@code} false 270 * otherwise. 271 */ 272 public synchronized boolean await(final int count) 273 { 274 if (count > perInterval) 275 { 276 Validator.ensureTrue(false, 277 "FixedRateBarrier.await(int) count value " + count + 278 " exceeds perInterval value " + perInterval + 279 ". The provided count value must be less than or equal to " + 280 "the perInterval value."); 281 } 282 else if (count <= 0) 283 { 284 return shutdownRequested; 285 } 286 287 // Loop forever until we are requested to shutdown or it is time to perform 288 // the next 'action' in which case we break from the loop. 289 while (!shutdownRequested) 290 { 291 final long now = System.nanoTime(); 292 293 if ((intervalStartNanos == 0) || // Handles the first time we're called. 294 (now < intervalStartNanos)) // Handles a change in the clock. 295 { 296 intervalStartNanos = now; 297 intervalEndNanos = intervalStartNanos + intervalDurationNanos; 298 } 299 else if (now >= intervalEndNanos) // End of an interval. 300 { 301 countInThisInterval = 0; 302 303 if (now < (intervalEndNanos + intervalDurationNanos)) 304 { 305 // If we have already passed the end of the next interval, then we 306 // don't try to catch up. Instead we just reset the start of the 307 // next interval to now. This could happen if the system clock 308 // was set to the future, we're running in a debugger, or we have 309 // very short intervals and are unable to keep up. 310 intervalStartNanos = now; 311 } 312 else 313 { 314 // Usually we're some small fraction into the next interval, so 315 // we set the start of the current interval to the end of the 316 // previous one. 317 intervalStartNanos = intervalEndNanos; 318 } 319 intervalEndNanos = intervalStartNanos + intervalDurationNanos; 320 } 321 322 final long intervalRemaining = intervalEndNanos - now; 323 if (intervalRemaining <= 0) 324 { 325 // This shouldn't happen, but we're careful not to divide by 0. 326 continue; 327 } 328 329 final double intervalFractionRemaining = 330 (double) intervalRemaining / intervalDurationNanos; 331 332 final double expectedRemaining = intervalFractionRemaining * perInterval; 333 final long actualRemaining = perInterval - countInThisInterval; 334 335 final long countBehind = 336 (long)Math.ceil(actualRemaining - expectedRemaining); 337 338 if (count <= countBehind) 339 { 340 // We are on schedule or behind schedule so let the 'action(s)' 341 // happen. 342 countInThisInterval += count; 343 break; 344 } 345 else 346 { 347 // If we can sleep until it's time to leave this barrier, then do 348 // so to keep from spinning on a CPU doing Thread.yield(). 349 350 final long countNeeded = count - countBehind; 351 final long remainingMillis = 352 (long) Math.floor(millisBetweenIterations * countNeeded); 353 354 if (remainingMillis >= minSleepMillis) 355 { 356 // Cap how long we sleep so that we can respond to a change in the 357 // rate without too much delay. 358 try 359 { 360 // We need to wait here instead of Thread.sleep so that we don't 361 // block setRate. Also, cap how long we sleep so that we can 362 // respond to a change in the rate without too much delay. 363 final long waitTime = Math.min(remainingMillis, 10); 364 wait(waitTime); 365 } 366 catch (final InterruptedException e) 367 { 368 Debug.debugException(e); 369 Thread.currentThread().interrupt(); 370 return shutdownRequested; 371 } 372 } 373 else 374 { 375 // We're ahead of schedule so yield to other threads, and then try 376 // again. Note: this is the most costly part of the algorithm because 377 // we have to busy wait due to the lack of sleeping for very small 378 // amounts of time. 379 Thread.yield(); 380 } 381 } 382 } 383 384 return shutdownRequested; 385 } 386 387 388 389 /** 390 * Retrieves information about the current target rate for this barrier. The 391 * value returned will include a {@code Long} that specifies the duration of 392 * the current interval in milliseconds and an {@code Integer} that specifies 393 * the number of times that the {@link #await} method should return per 394 * interval. 395 * 396 * @return Information about hte current target rate for this barrier. 397 */ 398 @NotNull() 399 public synchronized ObjectPair<Long,Integer> getTargetRate() 400 { 401 return new ObjectPair<>( 402 (intervalDurationNanos / (1000L * 1000L)), 403 perInterval); 404 } 405 406 407 408 /** 409 * Shuts down this barrier. Future calls to await() will return immediately. 410 */ 411 public void shutdownRequested() 412 { 413 shutdownRequested = true; 414 } 415 416 417 418 /** 419 * Returns {@code true} if shutdown has been requested. 420 * 421 * @return {@code true} if shutdown has been requested and {@code false} 422 * otherwise. 423 */ 424 public boolean isShutdownRequested() 425 { 426 return shutdownRequested; 427 } 428}