001/*
002 * Copyright 2016-2022 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright 2016-2022 Ping Identity Corporation
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *    http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020/*
021 * Copyright (C) 2016-2022 Ping Identity Corporation
022 *
023 * This program is free software; you can redistribute it and/or modify
024 * it under the terms of the GNU General Public License (GPLv2 only)
025 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
026 * as published by the Free Software Foundation.
027 *
028 * This program is distributed in the hope that it will be useful,
029 * but WITHOUT ANY WARRANTY; without even the implied warranty of
030 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
031 * GNU General Public License for more details.
032 *
033 * You should have received a copy of the GNU General Public License
034 * along with this program; if not, see <http://www.gnu.org/licenses>.
035 */
036package com.unboundid.ldap.listener;
037
038
039
040import java.util.List;
041import java.util.concurrent.Semaphore;
042import java.util.concurrent.TimeUnit;
043
044import com.unboundid.ldap.protocol.AbandonRequestProtocolOp;
045import com.unboundid.ldap.protocol.AddRequestProtocolOp;
046import com.unboundid.ldap.protocol.AddResponseProtocolOp;
047import com.unboundid.ldap.protocol.BindRequestProtocolOp;
048import com.unboundid.ldap.protocol.BindResponseProtocolOp;
049import com.unboundid.ldap.protocol.CompareRequestProtocolOp;
050import com.unboundid.ldap.protocol.CompareResponseProtocolOp;
051import com.unboundid.ldap.protocol.DeleteRequestProtocolOp;
052import com.unboundid.ldap.protocol.DeleteResponseProtocolOp;
053import com.unboundid.ldap.protocol.ExtendedRequestProtocolOp;
054import com.unboundid.ldap.protocol.ExtendedResponseProtocolOp;
055import com.unboundid.ldap.protocol.LDAPMessage;
056import com.unboundid.ldap.protocol.ModifyRequestProtocolOp;
057import com.unboundid.ldap.protocol.ModifyResponseProtocolOp;
058import com.unboundid.ldap.protocol.ModifyDNRequestProtocolOp;
059import com.unboundid.ldap.protocol.ModifyDNResponseProtocolOp;
060import com.unboundid.ldap.protocol.SearchRequestProtocolOp;
061import com.unboundid.ldap.protocol.SearchResultDoneProtocolOp;
062import com.unboundid.ldap.sdk.Control;
063import com.unboundid.ldap.sdk.LDAPException;
064import com.unboundid.ldap.sdk.OperationType;
065import com.unboundid.ldap.sdk.ResultCode;
066import com.unboundid.util.Debug;
067import com.unboundid.util.NotMutable;
068import com.unboundid.util.NotNull;
069import com.unboundid.util.Nullable;
070import com.unboundid.util.StaticUtils;
071import com.unboundid.util.ThreadSafety;
072import com.unboundid.util.ThreadSafetyLevel;
073import com.unboundid.util.Validator;
074
075import static com.unboundid.ldap.listener.ListenerMessages.*;
076
077
078
079/**
080 * This class provides an implementation of an LDAP listener request handler
081 * that can be used to limit the number of requests that may be processed
082 * concurrently.  It uses one or more {@link Semaphore} instances to limit the
083 * number of requests that may be processed at any time, and provides the
084 * ability to impose limiting on a per-operation-type basis.
085 */
086@NotMutable()
087@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE)
088public final class ConcurrentRequestLimiterRequestHandler
089       extends LDAPListenerRequestHandler
090{
091  // The downstream request handler that will be used to process the requests
092  // after any appropriate concurrent request limiting has been performed.
093  @NotNull private final LDAPListenerRequestHandler downstreamRequestHandler;
094
095  // A timeout value (expressed in milliseconds) that will cause the operation
096  // to be rejected rather than processed if the associated semaphore cannot be
097  // acquired in this length of time.
098  private final long rejectTimeoutMillis;
099
100  // The semaphores that will be used for each type of operation.
101  @Nullable private final          Semaphore abandonSemaphore;
102  @Nullable private final Semaphore addSemaphore;
103  @Nullable private final Semaphore bindSemaphore;
104  @Nullable private final Semaphore compareSemaphore;
105  @Nullable private final Semaphore deleteSemaphore;
106  @Nullable private final Semaphore extendedSemaphore;
107  @Nullable private final Semaphore modifySemaphore;
108  @Nullable private final Semaphore modifyDNSemaphore;
109  @Nullable private final Semaphore searchSemaphore;
110
111
112
113  /**
114   * Creates a new concurrent request limiter request handler that will impose
115   * the specified limit on the number of operations that may be in progress at
116   * any time.  The limit will be enforced for all types of operations except
117   * abandon and unbind operations, which will not be limited.
118   *
119   * @param  downstreamRequestHandler  The downstream request handler that will
120   *                                   be used to actually process the requests
121   *                                   after any appropriate limiting has been
122   *                                   performed.
123   * @param  maxConcurrentRequests     The maximum number of requests that may
124   *                                   be processed at any given time.  This
125   *                                   limit will be enforced for all operation
126   *                                   types except abandon and unbind, which
127   *                                   will not be limited.
128   * @param  rejectTimeoutMillis       A timeout value (expressed in
129   *                                   milliseconds) that will cause a requested
130   *                                   operation to be rejected rather than
131   *                                   processed if the associate semaphore
132   *                                   cannot be acquired in this length of
133   *                                   time.  A value of zero indicates that the
134   *                                   operation should be rejected immediately
135   *                                   if the maximum number of concurrent
136   *                                   requests are already in progress.  A
137   *                                   value that is less than zero indicates
138   *                                   that no timeout should be imposed and
139   *                                   that requests should be forced to wait as
140   *                                   long as necessary until they can be
141   *                                   processed.
142   */
143  public ConcurrentRequestLimiterRequestHandler(
144       @NotNull final LDAPListenerRequestHandler downstreamRequestHandler,
145       final int maxConcurrentRequests, final long rejectTimeoutMillis)
146  {
147    this(downstreamRequestHandler, new Semaphore(maxConcurrentRequests),
148         rejectTimeoutMillis);
149  }
150
151
152
153  /**
154   * Creates a new concurrent request limiter request handler that will use the
155   * provided semaphore to limit on the number of operations that may be in
156   * progress at any time.  The limit will be enforced for all types of
157   * operations except abandon and unbind operations, which will not be limited.
158   *
159   * @param  downstreamRequestHandler  The downstream request handler that will
160   *                                   be used to actually process the requests
161   *                                   after any appropriate limiting has been
162   *                                   performed.
163   * @param  semaphore                 The semaphore that will be used to limit
164   *                                   the number of concurrent operations in
165   *                                   progress, for all operation types except
166   *                                   abandon and unbind.
167   * @param  rejectTimeoutMillis       A timeout value (expressed in
168   *                                   milliseconds) that will cause a requested
169   *                                   operation to be rejected rather than
170   *                                   processed if the associate semaphore
171   *                                   cannot be acquired in this length of
172   *                                   time.  A value of zero indicates that the
173   *                                   operation should be rejected immediately
174   *                                   if the maximum number of concurrent
175   *                                   requests are already in progress.  A
176   *                                   value that is less than zero indicates
177   *                                   that no timeout should be imposed and
178   *                                   that requests should be forced to wait as
179   *                                   long as necessary until they can be
180   *                                   processed.
181   */
182  public ConcurrentRequestLimiterRequestHandler(
183       @NotNull final LDAPListenerRequestHandler downstreamRequestHandler,
184       @NotNull final Semaphore semaphore, final long rejectTimeoutMillis)
185  {
186    this(downstreamRequestHandler, null, semaphore, semaphore, semaphore,
187         semaphore, semaphore, semaphore, semaphore, semaphore,
188         rejectTimeoutMillis);
189  }
190
191
192
193  /**
194   * Creates a new concurrent request limiter request handler that can use the
195   * provided semaphore instances to limit the number of operations in progress
196   * concurrently for each type of operation.  The same semaphore instance can
197   * be provided for multiple operation types if performance for those
198   * operations should be limited in aggregate rather than individually (e.g.,
199   * if you don't want the total combined number of search and modify operations
200   * in progress at any time to exceed a given threshold, then you could provide
201   * the same semaphore instance for the {@code modifySemaphore} and
202   * {@code searchSemaphore} arguments).
203   *
204   * @param  downstreamRequestHandler  The downstream request handler that will
205   *                                   be used to actually process the requests
206   *                                   after any appropriate rate limiting has
207   *                                   been performed.  It must not be
208   *                                   {@code null}.
209   * @param  abandonSemaphore          The semaphore to use when processing
210   *                                   abandon operations.  It may be
211   *                                   {@code null} if no concurrent request
212   *                                   limiting should be performed for abandon
213   *                                   operations.
214   * @param  addSemaphore              The semaphore to use when processing add
215   *                                   operations.  It may be {@code null} if no
216   *                                   concurrent request limiting should be
217   *                                   performed for add operations.
218   * @param  bindSemaphore             The semaphore to use when processing
219   *                                   bind operations.  It may be
220   *                                   {@code null} if no concurrent request
221   *                                   limiting should be performed for bind
222   *                                   operations.
223   * @param  compareSemaphore          The semaphore to use when processing
224   *                                   compare operations.  It may be
225   *                                   {@code null} if no concurrent request
226   *                                   limiting should be performed for compare
227   *                                   operations.
228   * @param  deleteSemaphore           The semaphore to use when processing
229   *                                   delete operations.  It may be
230   *                                   {@code null} if no concurrent request
231   *                                   limiting should be performed for delete
232   *                                   operations.
233   * @param  extendedSemaphore         The semaphore to use when processing
234   *                                   extended operations.  It may be
235   *                                   {@code null} if no concurrent request
236   *                                   limiting should be performed for extended
237   *                                   operations.
238   * @param  modifySemaphore           The semaphore to use when processing
239   *                                   modify operations.  It may be
240   *                                   {@code null} if no concurrent request
241   *                                   limiting should be performed for modify
242   *                                   operations.
243   * @param  modifyDNSemaphore         The semaphore to use when processing
244   *                                   modify DN operations.  It may be
245   *                                   {@code null} if no concurrent request
246   *                                   limiting should be performed for modify
247   *                                   DN operations.
248   * @param  searchSemaphore           The semaphore to use when processing
249   *                                   search operations.  It may be
250   *                                   {@code null} if no concurrent request
251   *                                   limiting should be performed for search
252   *                                   operations.
253   * @param  rejectTimeoutMillis       A timeout value (expressed in
254   *                                   milliseconds) that will cause a requested
255   *                                   operation to be rejected rather than
256   *                                   processed if the associate semaphore
257   *                                   cannot be acquired in this length of
258   *                                   time.  A value of zero indicates that the
259   *                                   operation should be rejected immediately
260   *                                   if the maximum number of concurrent
261   *                                   requests are already in progress.  A
262   *                                   value that is less than zero indicates
263   *                                   that no timeout should be imposed and
264   *                                   that requests should be forced to wait as
265   *                                   long as necessary until they can be
266   *                                   processed.
267   */
268  public ConcurrentRequestLimiterRequestHandler(
269       @NotNull final LDAPListenerRequestHandler downstreamRequestHandler,
270       @Nullable final Semaphore abandonSemaphore,
271       @Nullable final Semaphore addSemaphore,
272       @Nullable final Semaphore bindSemaphore,
273       @Nullable final Semaphore compareSemaphore,
274       @Nullable final Semaphore deleteSemaphore,
275       @Nullable final Semaphore extendedSemaphore,
276       @Nullable final Semaphore modifySemaphore,
277       @Nullable final Semaphore modifyDNSemaphore,
278       @Nullable final Semaphore searchSemaphore,
279       final long rejectTimeoutMillis)
280  {
281    Validator.ensureNotNull(downstreamRequestHandler);
282
283    this.downstreamRequestHandler = downstreamRequestHandler;
284    this.abandonSemaphore         = abandonSemaphore;
285    this.addSemaphore             = addSemaphore;
286    this.bindSemaphore            = bindSemaphore;
287    this.compareSemaphore         = compareSemaphore;
288    this.deleteSemaphore          = deleteSemaphore;
289    this.extendedSemaphore        = extendedSemaphore;
290    this.modifySemaphore          = modifySemaphore;
291    this.modifyDNSemaphore        = modifyDNSemaphore;
292    this.searchSemaphore          = searchSemaphore;
293
294    if (rejectTimeoutMillis >= 0L)
295    {
296      this.rejectTimeoutMillis = rejectTimeoutMillis;
297    }
298    else
299    {
300      this.rejectTimeoutMillis = (long) Integer.MAX_VALUE;
301    }
302  }
303
304
305
306  /**
307   * {@inheritDoc}
308   */
309  @Override()
310  @NotNull()
311  public ConcurrentRequestLimiterRequestHandler newInstance(
312              @NotNull final LDAPListenerClientConnection connection)
313         throws LDAPException
314  {
315    return new ConcurrentRequestLimiterRequestHandler(
316         downstreamRequestHandler.newInstance(connection), abandonSemaphore,
317         addSemaphore, bindSemaphore, compareSemaphore, deleteSemaphore,
318         extendedSemaphore, modifySemaphore, modifyDNSemaphore,
319         searchSemaphore, rejectTimeoutMillis);
320  }
321
322
323
324  /**
325   * {@inheritDoc}
326   */
327  @Override()
328  public void processAbandonRequest(final int messageID,
329                   @NotNull final AbandonRequestProtocolOp request,
330                   @NotNull final List<Control> controls)
331  {
332    try
333    {
334      acquirePermit(abandonSemaphore, OperationType.ABANDON);
335    }
336    catch (final LDAPException le)
337    {
338      Debug.debugException(le);
339      return;
340    }
341
342    try
343    {
344      downstreamRequestHandler.processAbandonRequest(messageID, request,
345           controls);
346    }
347    finally
348    {
349      releasePermit(abandonSemaphore);
350    }
351  }
352
353
354
355  /**
356   * {@inheritDoc}
357   */
358  @Override()
359  @NotNull()
360  public LDAPMessage processAddRequest(final int messageID,
361                          @NotNull final AddRequestProtocolOp request,
362                          @NotNull final List<Control> controls)
363  {
364    try
365    {
366      acquirePermit(addSemaphore, OperationType.ADD);
367    }
368    catch (final LDAPException le)
369    {
370      Debug.debugException(le);
371      return new LDAPMessage(messageID,
372           new AddResponseProtocolOp(le.toLDAPResult()));
373    }
374
375    try
376    {
377      return downstreamRequestHandler.processAddRequest(messageID, request,
378           controls);
379    }
380    finally
381    {
382      releasePermit(addSemaphore);
383    }
384  }
385
386
387
388  /**
389   * {@inheritDoc}
390   */
391  @Override()
392  @NotNull()
393  public LDAPMessage processBindRequest(final int messageID,
394                          @NotNull final BindRequestProtocolOp request,
395                          @NotNull final List<Control> controls)
396  {
397    try
398    {
399      acquirePermit(bindSemaphore, OperationType.BIND);
400    }
401    catch (final LDAPException le)
402    {
403      Debug.debugException(le);
404      return new LDAPMessage(messageID,
405           new BindResponseProtocolOp(le.toLDAPResult()));
406    }
407
408    try
409    {
410      return downstreamRequestHandler.processBindRequest(messageID, request,
411           controls);
412    }
413    finally
414    {
415      releasePermit(bindSemaphore);
416    }
417  }
418
419
420
421  /**
422   * {@inheritDoc}
423   */
424  @Override()
425  @NotNull()
426  public LDAPMessage processCompareRequest(final int messageID,
427                          @NotNull final CompareRequestProtocolOp request,
428                          @NotNull final List<Control> controls)
429  {
430    try
431    {
432      acquirePermit(compareSemaphore, OperationType.COMPARE);
433    }
434    catch (final LDAPException le)
435    {
436      Debug.debugException(le);
437      return new LDAPMessage(messageID,
438           new CompareResponseProtocolOp(le.toLDAPResult()));
439    }
440
441    try
442    {
443      return downstreamRequestHandler.processCompareRequest(messageID, request,
444           controls);
445    }
446    finally
447    {
448      releasePermit(compareSemaphore);
449    }
450  }
451
452
453
454  /**
455   * {@inheritDoc}
456   */
457  @Override()
458  @NotNull()
459  public LDAPMessage processDeleteRequest(final int messageID,
460                          @NotNull final DeleteRequestProtocolOp request,
461                          @NotNull final List<Control> controls)
462  {
463    try
464    {
465      acquirePermit(deleteSemaphore, OperationType.DELETE);
466    }
467    catch (final LDAPException le)
468    {
469      Debug.debugException(le);
470      return new LDAPMessage(messageID,
471           new DeleteResponseProtocolOp(le.toLDAPResult()));
472    }
473
474    try
475    {
476      return downstreamRequestHandler.processDeleteRequest(messageID, request,
477           controls);
478    }
479    finally
480    {
481      releasePermit(deleteSemaphore);
482    }
483  }
484
485
486
487  /**
488   * {@inheritDoc}
489   */
490  @Override()
491  @NotNull()
492  public LDAPMessage processExtendedRequest(final int messageID,
493                          @NotNull final ExtendedRequestProtocolOp request,
494                          @NotNull final List<Control> controls)
495  {
496    try
497    {
498      acquirePermit(extendedSemaphore, OperationType.EXTENDED);
499    }
500    catch (final LDAPException le)
501    {
502      Debug.debugException(le);
503      return new LDAPMessage(messageID,
504           new ExtendedResponseProtocolOp(le.toLDAPResult()));
505    }
506
507    try
508    {
509      return downstreamRequestHandler.processExtendedRequest(messageID, request,
510           controls);
511    }
512    finally
513    {
514      releasePermit(extendedSemaphore);
515    }
516  }
517
518
519
520  /**
521   * {@inheritDoc}
522   */
523  @Override()
524  @NotNull()
525  public LDAPMessage processModifyRequest(final int messageID,
526                          @NotNull final ModifyRequestProtocolOp request,
527                          @NotNull final List<Control> controls)
528  {
529    try
530    {
531      acquirePermit(modifySemaphore, OperationType.MODIFY);
532    }
533    catch (final LDAPException le)
534    {
535      Debug.debugException(le);
536      return new LDAPMessage(messageID,
537           new ModifyResponseProtocolOp(le.toLDAPResult()));
538    }
539
540    try
541    {
542      return downstreamRequestHandler.processModifyRequest(messageID, request,
543           controls);
544    }
545    finally
546    {
547      releasePermit(modifySemaphore);
548    }
549  }
550
551
552
553  /**
554   * {@inheritDoc}
555   */
556  @Override()
557  @NotNull()
558  public LDAPMessage processModifyDNRequest(final int messageID,
559                          @NotNull final ModifyDNRequestProtocolOp request,
560                          @NotNull final List<Control> controls)
561  {
562    try
563    {
564      acquirePermit(modifyDNSemaphore, OperationType.MODIFY_DN);
565    }
566    catch (final LDAPException le)
567    {
568      Debug.debugException(le);
569      return new LDAPMessage(messageID,
570           new ModifyDNResponseProtocolOp(le.toLDAPResult()));
571    }
572
573    try
574    {
575      return downstreamRequestHandler.processModifyDNRequest(messageID, request,
576           controls);
577    }
578    finally
579    {
580      releasePermit(modifyDNSemaphore);
581    }
582  }
583
584
585
586  /**
587   * {@inheritDoc}
588   */
589  @Override()
590  @NotNull()
591  public LDAPMessage processSearchRequest(final int messageID,
592                          @NotNull final SearchRequestProtocolOp request,
593                          @NotNull final List<Control> controls)
594  {
595    try
596    {
597      acquirePermit(searchSemaphore, OperationType.SEARCH);
598    }
599    catch (final LDAPException le)
600    {
601      Debug.debugException(le);
602      return new LDAPMessage(messageID,
603           new SearchResultDoneProtocolOp(le.toLDAPResult()));
604    }
605
606    try
607    {
608      return downstreamRequestHandler.processSearchRequest(messageID, request,
609           controls);
610    }
611    finally
612    {
613      releasePermit(searchSemaphore);
614    }
615  }
616
617
618
619  /**
620   * Acquires a permit from the provided semaphore.
621   *
622   * @param  semaphore      The semaphore from which to acquire a permit.  It
623   *                        may be {@code null} if no semaphore is needed for
624   *                        the associated operation type.
625   * @param  operationType  The type of operation
626   *
627   * @throws  LDAPException  If it was not possible to acquire a permit from the
628   *                         provided semaphore.
629   */
630  private void acquirePermit(@NotNull final Semaphore semaphore,
631                             @NotNull final OperationType operationType)
632          throws LDAPException
633  {
634    if (semaphore == null)
635    {
636      return;
637    }
638
639    try
640    {
641      if (rejectTimeoutMillis == 0L)
642      {
643        if (! semaphore.tryAcquire())
644        {
645          throw new LDAPException(ResultCode.BUSY,
646               ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_NO_TIMEOUT.get(
647                    operationType.name()));
648        }
649      }
650      else
651      {
652        if (! semaphore.tryAcquire(rejectTimeoutMillis, TimeUnit.MILLISECONDS))
653        {
654          throw new LDAPException(ResultCode.BUSY,
655               ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_TIMEOUT.get(
656                    operationType.name(), rejectTimeoutMillis));
657        }
658      }
659    }
660    catch (final LDAPException le)
661    {
662      throw le;
663    }
664    catch (final Exception e)
665    {
666      Debug.debugException(e);
667      throw new LDAPException(ResultCode.OTHER,
668           ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_SEMAPHORE_EXCEPTION.get(
669                operationType.name(), StaticUtils.getExceptionMessage(e)),
670           e);
671    }
672  }
673
674
675
676  /**
677   * Releases a permit back to the provided semaphore.
678   *
679   * @param  semaphore  The semaphore to which the permit should be released.
680   *                    It may be {@code null} if no semaphore is needed for the
681   *                    associated operation type.
682   */
683  private static void releasePermit(@NotNull final Semaphore semaphore)
684  {
685    if (semaphore != null)
686    {
687      semaphore.release();
688    }
689  }
690}