001    /*
002     * Copyright 2014-2015 UnboundID Corp.
003     * All Rights Reserved.
004     */
005    /*
006     * Copyright (C) 2014-2015 UnboundID Corp.
007     *
008     * This program is free software; you can redistribute it and/or modify
009     * it under the terms of the GNU General Public License (GPLv2 only)
010     * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011     * as published by the Free Software Foundation.
012     *
013     * This program is distributed in the hope that it will be useful,
014     * but WITHOUT ANY WARRANTY; without even the implied warranty of
015     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016     * GNU General Public License for more details.
017     *
018     * You should have received a copy of the GNU General Public License
019     * along with this program; if not, see <http://www.gnu.org/licenses>.
020     */
021    package com.unboundid.util;
022    
023    
024    
025    import java.io.BufferedReader;
026    import java.io.File;
027    import java.io.FileReader;
028    import java.io.IOException;
029    import java.io.PrintWriter;
030    import java.io.Reader;
031    import java.util.ArrayList;
032    import java.util.Arrays;
033    import java.util.Collections;
034    import java.util.Iterator;
035    import java.util.LinkedHashMap;
036    import java.util.LinkedHashSet;
037    import java.util.LinkedList;
038    import java.util.List;
039    import java.util.Map;
040    import java.util.Set;
041    import java.util.concurrent.CountDownLatch;
042    import java.util.concurrent.TimeUnit;
043    import java.util.regex.Pattern;
044    
045    import com.unboundid.util.args.ArgumentException;
046    import com.unboundid.util.args.DurationArgument;
047    
048    import static com.unboundid.util.Debug.*;
049    import static com.unboundid.util.UtilityMessages.*;
050    
051    
052    
053    /**
 * This class allows a FixedRateBarrier to change dynamically.  The rate changes
 * are governed by lines read from a {@code Reader} (typically backed by a
 * file). The input starts with a header that provides some global options and
 * is terminated by a line containing only the text {@code END HEADER}.  The
 * header is followed by a list of lines, where each line contains a single
 * rate per second, a comma, and a duration to maintain that rate.  Rates are
 * specified as an absolute rate per second or as a rate relative to the base
 * rate per second (a number followed by the letter {@code X}).  The duration
 * is an integer followed by a time unit (ms=milliseconds, s=seconds,
 * m=minutes, h=hours, and d=days).
062     * <BR><BR>
 * The following simple example will run at a target rate of 1000 per second
 * for one minute, and then 10000 per second for 10 seconds.
 * <pre>
 *   format=rate-and-duration
 *   END HEADER
 *   1000,1m
 *   10000,10s
 * </pre>
070     * <BR>
 * The following example has a default duration of one minute, and will repeat
 * the two intervals until this RateAdjustor is shut down.  The first interval
 * is run for the default of 1 minute at two and a half times the base rate,
 * and the second is run for 10 seconds at 10000 per second.
 * <pre>
 *   format=rate-and-duration
 *   default-duration=1m
 *   repeat=true
 *   END HEADER
 *   2.5X
 *   10000,10s
 * </pre>
082     * A {@code RateAdjustor} is a daemon thread.  It is necessary to call the
083     * {@code start()} method to start the thread and begin the rate changes.
 * Once it has finished processing the rates, the thread will complete.
085     * It can be stopped prematurely by calling {@code shutDown()}.
086     * <BR><BR>
087     * The header can contain the following options:
088     * <UL>
 *   <LI>{@code format} (required):  This must currently have the value
 *       {@code rate-and-duration}.</LI>
091     *   <LI>{@code default-duration} (optional):  This can specify a default
092     *       duration for intervals that do not include a duration.  The format
093     *       is an integer followed by a time unit as described above.</LI>
094     *   <LI>{@code repeat} (optional):  If this has a value of {@code true}, then
095     *       the rates in the input will be repeated until {@code shutDown()} is
096     *       called.</LI>
097     * </UL>
098     */
099    @ThreadSafety(level = ThreadSafetyLevel.MOSTLY_THREADSAFE)
100    public final class RateAdjustor extends Thread
101    {
102      /**
103       * This starts a comment in the input.
104       */
105      public static final char COMMENT_START = '#';
106    
107    
108    
109      /**
110       * The text that must appear on a line by itself in order to denote that the
111       * end of the file header has been reached.
112       */
113      public static final String END_HEADER_TEXT = "END HEADER";
114    
115    
116    
117      /**
118       * The header key that represents the default duration.
119       */
120      public static final String DEFAULT_DURATION_KEY = "default-duration";
121    
122    
123    
124      /**
125       * The header key that represents the format of the file.
126       */
127      public static final String FORMAT_KEY = "format";
128    
129    
130    
131      /**
132       * The value of the format key that represents a list of rates and durations
133       * within the input file.
134       */
135      public static final String FORMAT_VALUE_RATE_DURATION = "rate-and-duration";
136    
137    
138    
139      /**
140       * A list of all formats that we support.
141       */
142      public static final List<String> FORMATS =
143           Arrays.asList(FORMAT_VALUE_RATE_DURATION);
144    
145    
146    
147      /**
148       * The header key that represents whether the input should be repeated.
149       */
150      public static final String REPEAT_KEY = "repeat";
151    
152    
153    
154      /**
155       * A list of all header keys that we support.
156       */
157      public static final List<String> KEYS =
158           Arrays.asList(DEFAULT_DURATION_KEY, FORMAT_KEY, REPEAT_KEY);
159    
160    
161    
162      // Other headers to consider:
163      // * rate-multiplier, so you can easily proportionally increase or decrease
164      //   every target rate without changing all the target rates directly.
165      // * duration-multiplier, so you can easily proportionally increase or
166      //   decrease the length of time to spend at target rates.
167      // * rate-change-behavior, so you can specify the behavior that should be
168      //   exhibited when transitioning from one rate to another (e.g., instant
169      //   jump, linear acceleration, sine-based acceleration, etc.).
170      // * jitter, so we can introduce some amount of random jitter in the target
171      //   rate (in which the actual target rate may be frequently adjusted to be
172      //   slightly higher or lower than the designated target rate).
173      // * spike, so we can introduce periodic, substantial increases in the target
174      //   rate.
175    
176    
177    
  // The barrier whose rate is adjusted.  run() reads its current interval
  // width and calls setRate() on it for each configured rate.
  private final FixedRateBarrier barrier;

  // A list of rates per second and the number of milliseconds that the
  // specified rate should be maintained.  This is populated by the constructor
  // from the non-header lines of the input and is wrapped as an unmodifiable
  // list.
  private final List<ObjectPair<Double,Long>> ratesAndDurations;

  // If this is true, then the ratesAndDurations will be repeated until this is
  // shut down.  The value comes from the "repeat" header key.
  private final boolean repeat;

  // Set to true when this should shut down.  It is volatile because it is
  // written by shutDown() and read by run(), which may execute on different
  // threads.
  private volatile boolean shutDown = false;

  // This is used to make sure we set the initial rate before start() returns.
  // run() counts it down once the first rate has been applied (and also when
  // it exits), and start() waits on it.
  private final CountDownLatch initialRateSetLatch = new CountDownLatch(1);

  // This allows us to interrupt when we are sleeping.  run() sleeps on it
  // between rate changes, and shutDown() wakes it up.
  private final WakeableSleeper sleeper = new WakeableSleeper();
197    
198    
199    
200      /**
201       * Returns a new RateAdjustor with the specified parameters.  See the
202       * class-level javadoc for more information.
203       *
204       * @param  barrier            The barrier to update based on the specified
205       *                            rates.
206       * @param  baseRatePerSecond  The baseline rate per second, or {@code null}
207       *                            if none was specified.
208       * @param  rates              A file containing a list of rates and durations
209       *                            as described in the class-level javadoc.
210       *
211       * @return  A new RateAdjustor constructed from the specified parameters.
212       *
213       * @throws  IOException               If there is a problem reading from
214       *                                    the rates Reader.
215       * @throws  IllegalArgumentException  If there is a problem with the rates
216       *                                    input.
217       */
218      public static RateAdjustor newInstance(final FixedRateBarrier barrier,
219                                             final Integer baseRatePerSecond,
220                                             final File rates)
221             throws IOException, IllegalArgumentException
222      {
223        final Reader reader = new FileReader(rates);
224        return new RateAdjustor(
225             barrier,
226             (baseRatePerSecond == null) ? 0 : baseRatePerSecond,
227             reader);
228      }
229    
230    
231    
232      /**
233       * Retrieves a string that may be used as the description of the argument that
234       * specifies the path to a variable rate data file for use in conjunction with
235       * this rate adjustor.
236       *
237       * @param  genArgName  The name of the argument that may be used to generate a
238       *                     sample variable rate data file.
239       *
240       * @return   A string that may be used as the description of the argument that
241       *           specifies the path to a variable rate data file for use in
242       *           conjunction with this rate adjustor.
243       */
244      public static String getVariableRateDataArgumentDescription(
245                                final String genArgName)
246      {
247        return INFO_RATE_ADJUSTOR_VARIABLE_RATE_DATA_ARG_DESCRIPTION.get(
248             genArgName);
249      }
250    
251    
252    
253      /**
254       * Retrieves a string that may be used as the description of the argument that
255       * generates a sample variable rate data file that serves as documentation of
256       * the variable rate data format.
257       *
258       * @param  dataFileArgName  The name of the argument that specifies the path
259       *                          to a file
260       *
261       * @return   A string that may be used as the description of the argument that
262       *           generates a sample variable rate data file that serves as
263       *           documentation of the variable rate data format.
264       */
265      public static String getGenerateSampleVariableRateFileDescription(
266                                final String dataFileArgName)
267      {
268        return INFO_RATE_ADJUSTOR_GENERATE_SAMPLE_RATE_FILE_ARG_DESCRIPTION.get(
269             dataFileArgName);
270      }
271    
272    
273    
274      /**
275       * Writes a sample variable write data file to the specified location.
276       *
277       * @param  f  The path to the file to be written.
278       *
279       * @throws  IOException  If a problem is encountered while writing to the
280       *                       specified file.
281       */
282      public static void writeSampleVariableRateFile(final File f)
283             throws IOException
284      {
285        final PrintWriter w = new PrintWriter(f);
286        try
287        {
288          w.println("# This is an example variable rate data file.  All blank " +
289               "lines will be ignored.");
290          w.println("# All lines starting with the '#' character are considered " +
291               "comments and will");
292          w.println("# also be ignored.");
293          w.println();
294          w.println("# The beginning of the file must be a header containing " +
295               "properties pertaining");
296          w.println("# to the variable rate data.  All headers must be in the " +
297               "format 'name=value',");
298          w.println("# in which any spaces surrounding the equal sign will be " +
299               "ignored.");
300          w.println();
301          w.println("# The first header should be the 'format' header, which " +
302               "specifies the format");
303          w.println("# for the variable rate data file.  This header is " +
304               "required.  At present, the");
305          w.println("# only supported format is 'rate-and-duration', although " +
306               "additional formats may");
307          w.println("# be added in the future.");
308          w.println("format = rate-and-duration");
309          w.println();
310          w.println("# The optional 'default-duration' header may be used to " +
311               "specify a duration that");
312          w.println("# will be used for any interval that does not explicitly " +
313               "specify a duration.");
314          w.println("# The duration must consist of a positive integer value " +
315               "followed by a time");
316          w.println("# unit (with zero or more spaces separating the integer " +
317               "value from the unit).");
318          w.println("# The supported time units are:");
319          w.println("#");
320          w.println("# - nanoseconds, nanosecond, nanos, nano, ns");
321          w.println("# - microseconds, microseconds, micros, micro, us");
322          w.println("# - milliseconds, millisecond, millis, milli, ms");
323          w.println("# - seconds, second, secs, sec, s");
324          w.println("# - minutes, minute, mins, min, m");
325          w.println("# - hours, hour, hrs, hr, h");
326          w.println("# - days, day, d");
327          w.println("#");
328          w.println("# If no 'default-duration' header is present, then every " +
329               "data interval must");
330          w.println("# include an explicitly-specified duration.");
331          w.println("default-duration = 10 seconds");
332          w.println();
333          w.println("# The optional 'repeat' header may be used to indicate how " +
334               "the tool should");
335          w.println("# behave once the end of the variable rate data definitions " +
336               "has been reached.");
337          w.println("# If the 'repeat' header is present with a value of 'true', " +
338               "then the tool will");
339          w.println("# operate in an endless loop, returning to the beginning of " +
340               "the variable rate");
341          w.println("# definitions once the end has been reached.  If the " +
342               "'repeat' header is present");
343          w.println("# with a value of 'false', or if the 'repeat' header is " +
344               "absent, then the tool");
345          w.println("# will exit after it has processed all of the variable " +
346               "rate definitions.");
347          w.println("repeat = true");
348          w.println();
349          w.println("# After all header properties have been specified, the end " +
350               "of the header must");
351          w.println("# be signified with a line containing only the text 'END " +
352               "HEADER'.");
353          w.println("END HEADER");
354          w.println();
355          w.println();
356          w.println("# After the header is complete, the variable rate " +
357               "definitions should be");
358          w.println("# provided.  Each definition should be given on a line by " +
359               "itself, and should");
360          w.println("# contain a target rate per second and an optional length " +
361               "of time to maintain");
362          w.println("# that rate.");
363          w.println("#");
364          w.println("# The target rate must always be present in a variable " +
365               "rate definition.  It may");
366          w.println("# be either a positive integer value that specifies the " +
367               "absolute target rate");
368          w.println("# per second (e.g., a value of '1000' indicates a target " +
369               "rate of 1000");
370          w.println("# operations per second), or it may be a floating-point " +
371               "value followed by the");
372          w.println("# letter 'x' to indicate that it is a multiplier of the " +
373               "value specified by the");
374          w.println("# '--ratePerSecond' argument (e.g., if the " +
375               "'--ratePerSecond' argument is");
376          w.println("# present with a value of 1000, then a target rate value " +
377               "of '0.75x' indicates a");
378          w.println("# target rate that is 75% of the '--ratePerSecond' value, " +
379               "or 750 operations per");
380          w.println("# second).  If the latter format is used, then the " +
381               "'--ratePerSecond' argument");
382          w.println("# must be provided.");
383          w.println("#");
384          w.println("# The duration may optionally be present in a variable " +
385               "rate definition.  If");
386          w.println("# present, it must be separated from the target rate by a " +
387               "comma (and there may");
388          w.println("# be zero or more spaces on either side of the comma).  " +
389               "The duration must be in");
390          w.println("# the same format as specified in the description of the " +
391               "'default-duration'");
392          w.println("# header above (i.e., a positive integer followed by a " +
393               "time unit).  If a");
394          w.println("# variable rate definition does not include a duration, " +
395               "then the");
396          w.println("# 'default-duration' header must have been specified, and " +
397               "that default duration");
398          w.println("# will be used for that variable rate definition.");
399          w.println("#");
400          w.println("# The following variable rate definitions may be used to " +
401               "stairstep the target");
402          w.println("# rate from 1000 operations per second to 10000 operations " +
403               "per second, in");
404          w.println("# increments of 1000 operations per second, spending one " +
405               "minute at each level.");
406          w.println("# If the 'repeat' header is present with a value of 'true', " +
407               "then the process");
408          w.println("# will start back over at 1000 operations per second after " +
409               "completing one");
410          w.println("# minute at 10000 operations per second.  Otherwise, the " +
411               "tool will exit after");
412          w.println("# completing the 10000 operation-per-second interval.");
413          w.println("1000, 1 minute");
414          w.println("2000, 1 minute");
415          w.println("3000, 1 minute");
416          w.println("4000, 1 minute");
417          w.println("5000, 1 minute");
418          w.println("6000, 1 minute");
419          w.println("7000, 1 minute");
420          w.println("8000, 1 minute");
421          w.println("9000, 1 minute");
422          w.println("10000, 1 minute");
423          w.println();
424        }
425        finally
426        {
427          w.close();
428        }
429      }
430    
431    
432    
433      /**
434       * Constructs a new RateAdjustor with the specified parameters.  See the
435       * class-level javadoc for more information.
436       *
437       * @param  barrier            The barrier to update based on the specified
438       *                            rates.
439       * @param  baseRatePerSecond  The baseline rate per second, or 0 if none was
440       *                            specified.
441       * @param  rates              A list of rates and durations as described in
442       *                            the class-level javadoc.  The reader will
443       *                            always be closed before this method returns.
444       *
445       * @throws  IOException               If there is a problem reading from
446       *                                    the rates Reader.
447       * @throws  IllegalArgumentException  If there is a problem with the rates
448       *                                    input.
449       */
450      public RateAdjustor(final FixedRateBarrier barrier,
451                          final long baseRatePerSecond,
452                          final Reader rates)
453             throws IOException, IllegalArgumentException
454      {
455        // Read the header first.
456        final List<String> lines;
457        try
458        {
459          Validator.ensureNotNull(barrier, rates);
460          setDaemon(true);
461          this.barrier = barrier;
462    
463          lines = readLines(rates);
464        }
465        finally
466        {
467          rates.close();
468        }
469    
470        final Map<String,String> header = consumeHeader(lines);
471    
472        final Set<String> invalidKeys = new LinkedHashSet<String>(header.keySet());
473        invalidKeys.removeAll(KEYS);
474        if (! invalidKeys.isEmpty())
475        {
476          throw new IllegalArgumentException(
477               ERR_RATE_ADJUSTOR_INVALID_KEYS.get(invalidKeys, KEYS));
478        }
479    
480        final String format = header.get(FORMAT_KEY);
481        if (format == null)
482        {
483          throw new IllegalArgumentException(ERR_RATE_ADJUSTOR_MISSING_FORMAT.get(
484               FORMAT_KEY, FORMATS, COMMENT_START));
485        }
486    
487        if (! format.equals(FORMAT_VALUE_RATE_DURATION))
488        {
489          // For now this is the only format that we support.
490          throw new IllegalArgumentException(
491               ERR_RATE_ADJUSTOR_INVALID_FORMAT.get(format, FORMAT_KEY, FORMATS));
492        }
493    
494        repeat = Boolean.parseBoolean(header.get(REPEAT_KEY));
495    
496        // This will be non-zero if it's set in the input.
497        long defaultDurationMillis = 0;
498        final String defaultDurationStr = header.get(DEFAULT_DURATION_KEY);
499        if (defaultDurationStr != null)
500        {
501          try
502          {
503            defaultDurationMillis = DurationArgument.parseDuration(
504                 defaultDurationStr, TimeUnit.MILLISECONDS);
505          }
506          catch (final ArgumentException e)
507          {
508            debugException(e);
509            throw new IllegalArgumentException(
510                 ERR_RATE_ADJUSTOR_INVALID_DEFAULT_DURATION.get(
511                            defaultDurationStr, e.getExceptionMessage()),
512                 e);
513          }
514        }
515    
516        // Now parse out the rates and durations, which will look like this:
517        //  1000,1s
518        //  1.5,1d
519        //  0.5X, 1m
520        //  # Duration can be omitted if default-duration header was included.
521        //  1000
522        final List<ObjectPair<Double,Long>> ratesAndDurationList =
523                new ArrayList<ObjectPair<Double,Long>>(10);
524        final Pattern splitPattern = Pattern.compile("\\s*,\\s*");
525        for (final String fullLine: lines)
526        {
527          // Strip out comments and white space.
528          String line = fullLine;
529          final int commentStart = fullLine.indexOf(COMMENT_START);
530          if (commentStart >= 0)
531          {
532            line = line.substring(0, commentStart);
533          }
534          line = line.trim();
535    
536          if (line.length() == 0)
537          {
538            continue;
539          }
540    
541          final String[] fields = splitPattern.split(line);
542          if (!((fields.length == 2) ||
543                ((fields.length == 1) && defaultDurationMillis != 0)))
544          {
545            throw new IllegalArgumentException(ERR_RATE_ADJUSTOR_INVALID_LINE.get(
546                 fullLine, DEFAULT_DURATION_KEY));
547          }
548    
549          String rateStr = fields[0];
550    
551          boolean isRateMultiplier = false;
552          if (rateStr.endsWith("X") || rateStr.endsWith("x"))
553          {
554            rateStr = rateStr.substring(0, rateStr.length() - 1).trim();
555            isRateMultiplier = true;
556          }
557    
558          double rate;
559          try
560          {
561            rate = Double.parseDouble(rateStr);
562          }
563          catch (final NumberFormatException e)
564          {
565            debugException(e);
566            throw new IllegalArgumentException(
567                 ERR_RATE_ADJUSTOR_INVALID_RATE.get(rateStr, fullLine), e);
568          }
569    
570          // Values that look like 2X are a multiplier on the base rate.
571          if (isRateMultiplier)
572          {
573            if (baseRatePerSecond <= 0)
574            {
575              throw new IllegalArgumentException(
576                      ERR_RATE_ADJUSTOR_RELATIVE_RATE_WITHOUT_BASELINE.get(
577                              rateStr, fullLine));
578            }
579    
580            rate *= baseRatePerSecond;
581          }
582    
583          final long durationMillis;
584          if (fields.length < 2)
585          {
586            durationMillis = defaultDurationMillis;
587          }
588          else
589          {
590            final String duration = fields[1];
591            try
592            {
593              durationMillis = DurationArgument.parseDuration(
594                      duration, TimeUnit.MILLISECONDS);
595            }
596            catch (final ArgumentException e)
597            {
598              debugException(e);
599              throw new IllegalArgumentException(
600                   ERR_RATE_ADJUSTOR_INVALID_DURATION.get(duration, fullLine,
601                        e.getExceptionMessage()),
602                   e);
603            }
604          }
605    
606          ratesAndDurationList.add(
607               new ObjectPair<Double,Long>(rate, durationMillis));
608        }
609        ratesAndDurations = Collections.unmodifiableList(ratesAndDurationList);
610      }
611    
612    
613    
614      /**
615       * Starts this thread and waits for the initial rate to be set.
616       */
617      @Override
618      public void start()
619      {
620        super.start();
621    
622        // Wait until the initial rate is set.  Assuming the caller starts this
623        // RateAdjustor before the FixedRateBarrier is used by other threads,
624        // this will guarantee that the initial rate is in place before the
625        // barrier is used.
626        try
627        {
628          initialRateSetLatch.await();
629        }
630        catch (final InterruptedException e)
631        {
632          debugException(e);
633        }
634      }
635    
636    
637    
638      /**
639       * Adjusts the rate in FixedRateBarrier as described in the rates.
640       */
641      @Override
642      public void run()
643      {
644        try
645        {
646          if (ratesAndDurations.isEmpty())
647          {
648            return;
649          }
650    
651          do
652          {
653            final List<ObjectPair<Double,Long>> ratesAndEndTimes =
654                 new ArrayList<ObjectPair<Double,Long>>(ratesAndDurations.size());
655            long endTime = System.currentTimeMillis();
656            for (final ObjectPair<Double,Long> rateAndDuration : ratesAndDurations)
657            {
658              endTime += rateAndDuration.getSecond();
659              ratesAndEndTimes.add(new ObjectPair<Double,Long>(
660                   rateAndDuration.getFirst(), endTime));
661            }
662    
663            for (final ObjectPair<Double,Long> rateAndEndTime: ratesAndEndTimes)
664            {
665              if (shutDown)
666              {
667                return;
668              }
669    
670              final double rate = rateAndEndTime.getFirst();
671              final long intervalMillis = barrier.getTargetRate().getFirst();
672              final int perInterval = calculatePerInterval(intervalMillis, rate);
673    
674              barrier.setRate(intervalMillis, perInterval);
675    
676              // Signal start() that we've set the initial rate.
677              if (initialRateSetLatch.getCount() > 0)
678              {
679                initialRateSetLatch.countDown();
680              }
681    
682              // Hold at this rate for the specified duration.
683              final long durationMillis =
684                   rateAndEndTime.getSecond() - System.currentTimeMillis();
685              if (durationMillis > 0L)
686              {
687                sleeper.sleep(durationMillis);
688              }
689            }
690          }
691          while (repeat);
692        }
693        finally
694        {
695          // Just in case we happened to be shutdown before we were started.
696          // We still want start() to be able to return.
697          if (initialRateSetLatch.getCount() > 0)
698          {
699            initialRateSetLatch.countDown();
700          }
701        }
702      }
703    
704    
705    
706      /**
707       * Signals this to shut down.
708       */
709      public void shutDown()
710      {
711        shutDown = true;
712        sleeper.wakeup();
713      }
714    
715    
716    
717      /**
718       * Returns the of rates and durations.  This is primarily here for testing
719       * purposes.
720       *
721       * @return  The list of rates and durations.
722       */
723      List<ObjectPair<Double,Long>> getRatesAndDurations()
724      {
725        return ratesAndDurations;
726      }
727    
728    
729    
730      /**
731       * Calculates the rate per interval given the specified interval width
732       * and the target rate per second.  (This is static and non-private so that
733       * it can be unit tested.)
734       *
735       * @param intervalDurationMillis  The duration of the interval in
736       *                                milliseconds.
737       * @param ratePerSecond           The target rate per second.
738       *
739       * @return  The rate per interval, which will be at least 1.
740       */
741      static int calculatePerInterval(final long intervalDurationMillis,
742                                      final double ratePerSecond)
743      {
744        final double intervalDurationSeconds = intervalDurationMillis / 1000.0;
745        final double ratePerInterval = ratePerSecond * intervalDurationSeconds;
746        return (int)Math.max(1, Math.round(ratePerInterval));
747      }
748    
749    
750    
751      /**
752       * This reads the header at the start of the file.  All blank lines and
753       * comment lines will be ignored.  The end of the header will be signified by
754       * a line containing only the text "END HEADER".  All non-blank, non-comment
755       * lines in the header must be in the format "name=value", where there may be
756       * zero or more spaces on either side of the equal sign, the name must not
757       * contain either the space or the equal sign character, and the value must
758       * not begin or end with a space.  Header lines must not contain partial-line
759       * comments.
760       *
761       * @param  lines  The lines of input that include the header.
762       *
763       * @return  A map of key/value pairs extracted from the header.
764       *
765       * @throws  IllegalArgumentException  If a problem is encountered while
766       *                                    parsing the header (e.g., a malformed
767       *                                    header line is encountered, multiple
768       *                                    headers have the same key, there is no
769       *                                    end of header marker, etc.).
770       */
771      static Map<String,String> consumeHeader(final List<String> lines)
772             throws IllegalArgumentException
773      {
774        // The header will look like this:
775        // key1=value1
776        // key2 = value2
777        // END HEADER
778        boolean endHeaderFound = false;
779        final Map<String,String> headerMap = new LinkedHashMap<String,String>(3);
780        final Iterator<String> lineIter = lines.iterator();
781        while (lineIter.hasNext())
782        {
783          final String line = lineIter.next().trim();
784          lineIter.remove();
785    
786          if ((line.length() == 0) ||
787               line.startsWith(String.valueOf(COMMENT_START)))
788          {
789            continue;
790          }
791    
792          if (line.equalsIgnoreCase(END_HEADER_TEXT))
793          {
794            endHeaderFound = true;
795            break;
796          }
797    
798          final int equalPos = line.indexOf('=');
799          if (equalPos < 0)
800          {
801            throw new IllegalArgumentException(
802                 ERR_RATE_ADJUSTOR_HEADER_NO_EQUAL.get(line));
803          }
804    
805          final String key = line.substring(0, equalPos).trim();
806          if (key.length() == 0)
807          {
808            throw new IllegalArgumentException(
809                 ERR_RATE_ADJUSTOR_HEADER_EMPTY_KEY.get(line));
810          }
811    
812          final String newValue = line.substring(equalPos+1).trim();
813          final String existingValue = headerMap.get(key);
814          if (existingValue != null)
815          {
816            throw new IllegalArgumentException(
817                 ERR_RATE_ADJUSTOR_DUPLICATE_HEADER_KEY.get(key, existingValue,
818                      newValue));
819          }
820    
821          headerMap.put(key, newValue);
822        }
823    
824        if (! endHeaderFound)
825        {
826          // This means we iterated across all lines without finding the end header
827          // marker.
828          throw new IllegalArgumentException(
829               ERR_RATE_ADJUSTOR_NO_END_HEADER_FOUND.get(END_HEADER_TEXT));
830        }
831    
832        return headerMap;
833      }
834    
835    
836    
837      /**
838       * Returns a list of the lines read from the specified Reader.
839       *
840       * @param  reader  The Reader to read from.
841       *
842       * @return  A list of the lines read from the specified Reader.
843       *
844       * @throws  IOException  If there is a problem reading from the Reader.
845       */
846      private static List<String> readLines(final Reader reader) throws IOException
847      {
848        final BufferedReader bufferedReader = new BufferedReader(reader);
849    
850        // We remove items from the front of the list, so a linked list works best.
851        final List<String> lines = new LinkedList<String>();
852    
853        String line;
854        while ((line = bufferedReader.readLine()) != null)
855        {
856          lines.add(line);
857        }
858    
859        return lines;
860      }
861    }
862