001    /*
002     * Copyright 2009-2016 UnboundID Corp.
003     * All Rights Reserved.
004     */
005    /*
006     * Copyright (C) 2009-2016 UnboundID Corp.
007     *
008     * This program is free software; you can redistribute it and/or modify
009     * it under the terms of the GNU General Public License (GPLv2 only)
010     * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011     * as published by the Free Software Foundation.
012     *
013     * This program is distributed in the hope that it will be useful,
014     * but WITHOUT ANY WARRANTY; without even the implied warranty of
015     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016     * GNU General Public License for more details.
017     *
018     * You should have received a copy of the GNU General Public License
019     * along with this program; if not, see <http://www.gnu.org/licenses>.
020     */
021    package com.unboundid.util;
022    
023    
024    
025    import java.io.Serializable;
026    import java.util.ArrayList;
027    import java.util.Collections;
028    import java.util.List;
029    import java.util.logging.Level;
030    
031    import static com.unboundid.util.Debug.*;
032    
033    
034    
035    /**
036     * Instances of this class are used to ensure that certain actions are performed
037     * at a fixed rate per interval (e.g. 10000 search operations per second).
038     * <p>
039     * Once a class is constructed with the duration of an interval and the target
040     * per interval, the {@link #await} method only releases callers at the
041     * specified number of times per interval.  This class is most useful when
042     * the target number per interval exceeds the limits of other approaches
043     * such as {@code java.util.Timer} or
044     * {@code java.util.concurrent.ScheduledThreadPoolExecutor}.  For instance,
045     * this does a good job of ensuring that something happens about 10000 times
046     * per second, but it's overkill to ensure something happens five times per
047     * hour.  This does come at a cost.  In the worst case, a single thread is
048     * tied up in a loop doing a small amount of computation followed by a
049     * Thread.yield().  Calling Thread.sleep() is not possible because many
050     * platforms sleep for a minimum of 10ms, and all platforms require sleeping
051     * for at least 1ms.
052     * <p>
053     * Testing has shown that this class is accurate for a "no-op"
054     * action up to two million per second, which vastly exceeds its
055     * typical use in tools such as {@code searchrate} and {@code modrate}.  This
056     * class is designed to be called by multiple threads, however, it does not
057     * make any fairness guarantee between threads; a single-thread might be
058     * released from the {@link #await} method many times before another thread
059     * that is blocked in that method.
060     * <p>
061     * This class attempts to smooth out the target per interval throughout each
062     * interval.  At a given ratio, R between 0 and 1, through the interval, the
063     * expected number of actions to have been performed in the interval at that
064     * time is R times the target per interval.  That is, 10% of the way through
065     * the interval, approximately 10% of the actions have been performed, and
066     * 80% of the way through the interval, 80% of the actions have been performed.
067     */
068    @ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE)
069    public final class FixedRateBarrier
070           implements Serializable
071    {
072      /**
073       * The serial version UID for this serializable class.
074       */
075      private static final long serialVersionUID = -3490156685189909611L;
076    
077      /**
078       * The minimum number of milliseconds that Thread.sleep() can handle
079       * accurately.  This varies from platform to platform, so we measure it
080       * once in the static initializer below.  When using a low rate (such as
081       * 100 per second), we can often sleep between iterations instead of having
082       * to spin calling Thread.yield().
083       */
084      private static final long minSleepMillis;
085    
086      static
087      {
088        // Calibrate the minimum number of milliseconds that we can reliably
089        // sleep on this system.  We take several measurements and take the median,
090        // which keeps us from choosing an outlier.
091        //
092        // It varies from system to system.  Testing on three systems, yielded
093        // three different measurements Solaris x86 (10 ms), RedHat Linux (2 ms),
094        // Windows 7 (1 ms).
095    
096        final List<Long> minSleepMillisMeasurements = new ArrayList<Long>();
097    
098        for (int i = 0; i < 11; i++)
099        {
100          final long timeBefore = System.currentTimeMillis();
101          try
102          {
103            Thread.sleep(1);
104          }
105          catch (InterruptedException e)
106          {
107            debugException(e);
108          }
109          final long sleepMillis = System.currentTimeMillis() - timeBefore;
110          minSleepMillisMeasurements.add(sleepMillis);
111        }
112    
113        Collections.sort(minSleepMillisMeasurements);
114        final long medianSleepMillis = minSleepMillisMeasurements.get(
115                minSleepMillisMeasurements.size()/2);
116    
117        minSleepMillis = Math.max(medianSleepMillis, 1);
118    
119        final String message = "Calibrated FixedRateBarrier to use " +
120              "minSleepMillis=" + minSleepMillis + ".  " +
121              "Minimum sleep measurements = " + minSleepMillisMeasurements;
122        debug(Level.INFO, DebugType.OTHER, message);
123      }
124    
125    
126      // This tracks when this class is shut down.  Calls to await() after
127      // shutdownRequested() is called, will return immediately with a value of
128      // true.
129      private volatile boolean shutdownRequested = false;
130    
131    
132      //
133      // The following class variables are guarded by synchronized(this).
134      //
135    
136      // The duration of the target interval in nano-seconds.
137      private long intervalDurationNanos;
138    
139      // This tracks the number of milliseconds between each iteration if they
140      // were evenly spaced.
141      //
142      // If intervalDurationMs=1000 and perInterval=100, then this is 100.
143      // If intervalDurationMs=1000 and perInterval=10000, then this is .1.
144      private double millisBetweenIterations;
145    
146      // The target number of times to release a thread per interval.
147      private int perInterval;
148    
149      // A count of the number of times that await has returned within the current
150      // interval.
151      private long countInThisInterval;
152    
153      // The start of this interval in terms of System.nanoTime().
154      private long intervalStartNanos;
155    
156      // The end of this interval in terms of System.nanoTime().
157      private long intervalEndNanos;
158    
159    
160    
161      /**
162       * Constructs a new FixedRateBarrier, which is active until
163       * {@link #shutdownRequested} is called.
164       *
165       * @param  intervalDurationMs  The duration of the interval in milliseconds.
166       * @param  perInterval  The target number of times that {@link #await} should
167       *                      return per interval.
168       */
169      public FixedRateBarrier(final long intervalDurationMs, final int perInterval)
170      {
171        setRate(intervalDurationMs, perInterval);
172      }
173    
174    
175    
176      /**
177       * Updates the rates associated with this FixedRateBarrier.  The new rate
178       * will be in effect when this method returns.
179       *
180       * @param  intervalDurationMs  The duration of the interval in milliseconds.
181       * @param  perInterval  The target number of times that {@link #await} should
182       *                      return per interval.
183       */
184      public synchronized void setRate(final long intervalDurationMs,
185                                       final int perInterval)
186      {
187        Validator.ensureTrue(intervalDurationMs > 0,
188             "FixedRateBarrier.intervalDurationMs must be at least 1.");
189        Validator.ensureTrue(perInterval > 0,
190             "FixedRateBarrier.perInterval must be at least 1.");
191    
192        this.perInterval = perInterval;
193    
194        intervalDurationNanos = 1000L * 1000L * intervalDurationMs;
195    
196        millisBetweenIterations = (double)intervalDurationMs/(double)perInterval;
197    
198        // Reset the intervals and all of the counters.
199        countInThisInterval = 0;
200        intervalStartNanos = 0;
201        intervalEndNanos = 0;
202      }
203    
204    
205    
206      /**
207       * This method waits until it is time for the next 'action' to be performed
208       * based on the specified interval duration and target per interval.  This
209       * method can be called by multiple threads simultaneously.  This method
210       * returns immediately if shutdown has been requested.
211       *
212       * @return  {@code true} if shutdown has been requested and {@code} false
213       *          otherwise.
214       */
215      public synchronized boolean await()
216      {
217        // Loop forever until we are requested to shutdown or it is time to perform
218        // the next 'action' in which case we break from the loop.
219        while (!shutdownRequested)
220        {
221          final long now = System.nanoTime();
222    
223          if ((intervalStartNanos == 0) ||   // Handles the first time we're called.
224              (now < intervalStartNanos))    // Handles a change in the clock.
225          {
226            intervalStartNanos = now;
227            intervalEndNanos = intervalStartNanos + intervalDurationNanos;
228          }
229          else if (now >= intervalEndNanos)  // End of an interval.
230          {
231            countInThisInterval = 0;
232    
233            if (now < (intervalEndNanos + intervalDurationNanos))
234            {
235              // If we have already passed the end of the next interval, then we
236              // don't try to catch up.  Instead we just reset the start of the
237              // next interval to now.  This could happen if the system clock
238              // was set to the future, we're running in a debugger, or we have
239              // very short intervals and are unable to keep up.
240              intervalStartNanos = now;
241            }
242            else
243            {
244              // Usually we're some small fraction into the next interval, so
245              // we set the start of the current interval to the end of the
246              // previous one.
247              intervalStartNanos = intervalEndNanos;
248            }
249            intervalEndNanos = intervalStartNanos + intervalDurationNanos;
250          }
251    
252          final long intervalRemaining = intervalEndNanos - now;
253          if (intervalRemaining <= 0)
254          {
255            // This shouldn't happen, but we're careful not to divide by 0.
256            continue;
257          }
258    
259          final double intervalFractionRemaining =
260               (double) intervalRemaining / intervalDurationNanos;
261    
262          final double expectedRemaining = intervalFractionRemaining * perInterval;
263          final long actualRemaining = perInterval - countInThisInterval;
264    
265          if (actualRemaining >= expectedRemaining)
266          {
267            // We are on schedule or behind schedule so let the next 'action'
268            // happen.
269            countInThisInterval++;
270            break;
271          }
272          else
273          {
274            // If we can sleep until it's time to leave this barrier, then do
275            // so to keep from spinning on a CPU doing Thread.yield().
276    
277            final double gapIterations = expectedRemaining - actualRemaining;
278            final long remainingMillis =
279                 (long) Math.floor(millisBetweenIterations * gapIterations);
280    
281            if (remainingMillis >= minSleepMillis)
282            {
283              // Cap how long we sleep so that we can respond to a change in the
284              // rate without too much delay.
285              final long waitTime = Math.min(remainingMillis, 10);
286              try
287              {
288                // We need to wait here instead of Thread.sleep so that we don't
289                // block setRate.
290                this.wait(waitTime);
291              }
292              catch (InterruptedException e)
293              {
294                debugException(e);
295              }
296            }
297            else
298            {
299              // We're ahead of schedule so yield to other threads, and then try
300              // again.  Note: this is the most costly part of the algorithm because
301              // we have to busy wait due to the lack of sleeping for very small
302              // amounts of time.
303              Thread.yield();
304            }
305          }
306        }
307    
308        return shutdownRequested;
309      }
310    
311    
312    
313      /**
314       * Retrieves information about the current target rate for this barrier.  The
315       * value returned will include a {@code Long} that specifies the duration of
316       * the current interval in milliseconds and an {@code Integer} that specifies
317       * the number of times that the {@link #await} method should return per
318       * interval.
319       *
320       * @return  Information about hte current target rate for this barrier.
321       */
322      public synchronized ObjectPair<Long,Integer> getTargetRate()
323      {
324        return new ObjectPair<Long,Integer>(
325             (intervalDurationNanos / (1000L * 1000L)),
326             perInterval);
327      }
328    
329    
330    
331      /**
332       * Shuts down this barrier.  Future calls to await() will return immediately.
333       */
334      public void shutdownRequested()
335      {
336        shutdownRequested = true;
337      }
338    
339    
340    
341      /**
342       * Returns {@code true} if shutdown has been requested.
343       *
344       * @return  {@code true} if shutdown has been requested and {@code false}
345       *          otherwise.
346       */
347      public boolean isShutdownRequested()
348      {
349        return shutdownRequested;
350      }
351    }