001    /*
002     * Copyright 2016 UnboundID Corp.
003     * All Rights Reserved.
004     */
005    /*
006     * Copyright (C) 2016 UnboundID Corp.
007     *
008     * This program is free software; you can redistribute it and/or modify
009     * it under the terms of the GNU General Public License (GPLv2 only)
010     * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011     * as published by the Free Software Foundation.
012     *
013     * This program is distributed in the hope that it will be useful,
014     * but WITHOUT ANY WARRANTY; without even the implied warranty of
015     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016     * GNU General Public License for more details.
017     *
018     * You should have received a copy of the GNU General Public License
019     * along with this program; if not, see <http://www.gnu.org/licenses>.
020     */
021    package com.unboundid.ldap.listener;
022    
023    
024    
025    import java.util.List;
026    import java.util.concurrent.Semaphore;
027    import java.util.concurrent.TimeUnit;
028    
029    import com.unboundid.ldap.protocol.AbandonRequestProtocolOp;
030    import com.unboundid.ldap.protocol.AddRequestProtocolOp;
031    import com.unboundid.ldap.protocol.AddResponseProtocolOp;
032    import com.unboundid.ldap.protocol.BindRequestProtocolOp;
033    import com.unboundid.ldap.protocol.BindResponseProtocolOp;
034    import com.unboundid.ldap.protocol.CompareRequestProtocolOp;
035    import com.unboundid.ldap.protocol.CompareResponseProtocolOp;
036    import com.unboundid.ldap.protocol.DeleteRequestProtocolOp;
037    import com.unboundid.ldap.protocol.DeleteResponseProtocolOp;
038    import com.unboundid.ldap.protocol.ExtendedRequestProtocolOp;
039    import com.unboundid.ldap.protocol.ExtendedResponseProtocolOp;
040    import com.unboundid.ldap.protocol.LDAPMessage;
041    import com.unboundid.ldap.protocol.ModifyRequestProtocolOp;
042    import com.unboundid.ldap.protocol.ModifyResponseProtocolOp;
043    import com.unboundid.ldap.protocol.ModifyDNRequestProtocolOp;
044    import com.unboundid.ldap.protocol.ModifyDNResponseProtocolOp;
045    import com.unboundid.ldap.protocol.SearchRequestProtocolOp;
046    import com.unboundid.ldap.protocol.SearchResultDoneProtocolOp;
047    import com.unboundid.ldap.sdk.Control;
048    import com.unboundid.ldap.sdk.LDAPException;
049    import com.unboundid.ldap.sdk.OperationType;
050    import com.unboundid.ldap.sdk.ResultCode;
051    import com.unboundid.util.Debug;
052    import com.unboundid.util.NotMutable;
053    import com.unboundid.util.StaticUtils;
054    import com.unboundid.util.ThreadSafety;
055    import com.unboundid.util.ThreadSafetyLevel;
056    import com.unboundid.util.Validator;
057    
058    import static com.unboundid.ldap.listener.ListenerMessages.*;
059    
060    
061    
062    /**
063     * This class provides an implementation of an LDAP listener request handler
064     * that can be used to limit the number of requests that may be processed
065     * concurrently.  It uses one or more {@link Semaphore} instances to limit the
066     * number of requests that may be processed at any time, and provides the
067     * ability to impose limiting on a per-operation-type basis.
068     */
069    @NotMutable()
070    @ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE)
071    public final class ConcurrentRequestLimiterRequestHandler
072           extends LDAPListenerRequestHandler
073    {
074      // The downstream request handler that will be used to process the requests
075      // after any appropriate concurrent request limiting has been performed.
076      private final LDAPListenerRequestHandler downstreamRequestHandler;
077    
078      // A timeout value (expressed in milliseconds) that will cause the operation
079      // to be rejected rather than processed if the associated semaphore cannot be
080      // acquired in this length of time.
081      private final long rejectTimeoutMillis;
082    
083      // The semaphores that will be used for each type of operation.
084      private final Semaphore abandonSemaphore;
085      private final Semaphore addSemaphore;
086      private final Semaphore bindSemaphore;
087      private final Semaphore compareSemaphore;
088      private final Semaphore deleteSemaphore;
089      private final Semaphore extendedSemaphore;
090      private final Semaphore modifySemaphore;
091      private final Semaphore modifyDNSemaphore;
092      private final Semaphore searchSemaphore;
093    
094    
095    
096      /**
097       * Creates a new concurrent request limiter request handler that will impose
098       * the specified limit on the number of operations that may be in progress at
099       * any time.  The limit will be enforced for all types of operations except
100       * abandon and unbind operations, which will not be limited.
101       *
102       * @param  downstreamRequestHandler  The downstream request handler that will
103       *                                   be used to actually process the requests
104       *                                   after any appropriate limiting has been
105       *                                   performed.
106       * @param  maxConcurrentRequests     The maximum number of requests that may
107       *                                   be processed at any given time.  This
108       *                                   limit will be enforced for all operation
109       *                                   types except abandon and unbind, which
110       *                                   will not be limited.
111       * @param  rejectTimeoutMillis       A timeout value (expressed in
112       *                                   milliseconds) that will cause a requested
113       *                                   operation to be rejected rather than
114       *                                   processed if the associate semaphore
115       *                                   cannot be acquired in this length of
116       *                                   time.  A value of zero indicates that the
117       *                                   operation should be rejected immediately
118       *                                   if the maximum number of concurrent
119       *                                   requests are already in progress.  A
120       *                                   value that is less than zero indicates
121       *                                   that no timeout should be imposed and
122       *                                   that requests should be forced to wait as
123       *                                   long as necessary until they can be
124       *                                   processed.
125       */
126      public ConcurrentRequestLimiterRequestHandler(
127                  final LDAPListenerRequestHandler downstreamRequestHandler,
128                  final int maxConcurrentRequests, final long rejectTimeoutMillis)
129      {
130        this(downstreamRequestHandler, new Semaphore(maxConcurrentRequests),
131             rejectTimeoutMillis);
132      }
133    
134    
135    
136      /**
137       * Creates a new concurrent request limiter request handler that will use the
138       * provided semaphore to limit on the number of operations that may be in
139       * progress at any time.  The limit will be enforced for all types of
140       * operations except abandon and unbind operations, which will not be limited.
141       *
142       * @param  downstreamRequestHandler  The downstream request handler that will
143       *                                   be used to actually process the requests
144       *                                   after any appropriate limiting has been
145       *                                   performed.
146       * @param  semaphore                 The semaphore that will be used to limit
147       *                                   the number of concurrent operations in
148       *                                   progress, for all operation types except
149       *                                   abandon and unbind.
150       * @param  rejectTimeoutMillis       A timeout value (expressed in
151       *                                   milliseconds) that will cause a requested
152       *                                   operation to be rejected rather than
153       *                                   processed if the associate semaphore
154       *                                   cannot be acquired in this length of
155       *                                   time.  A value of zero indicates that the
156       *                                   operation should be rejected immediately
157       *                                   if the maximum number of concurrent
158       *                                   requests are already in progress.  A
159       *                                   value that is less than zero indicates
160       *                                   that no timeout should be imposed and
161       *                                   that requests should be forced to wait as
162       *                                   long as necessary until they can be
163       *                                   processed.
164       */
165      public ConcurrentRequestLimiterRequestHandler(
166                  final LDAPListenerRequestHandler downstreamRequestHandler,
167                  final Semaphore semaphore, final long rejectTimeoutMillis)
168      {
169        this(downstreamRequestHandler, null, semaphore, semaphore, semaphore,
170             semaphore, semaphore, semaphore, semaphore, semaphore,
171             rejectTimeoutMillis);
172      }
173    
174    
175    
176      /**
177       * Creates a new concurrent request limiter request handler that can use the
178       * provided semaphore instances to limit the number of operations in progress
179       * concurrently for each type of operation.  The same semaphore instance can
180       * be provided for multiple operation types if performance for those
181       * operations should be limited in aggregate rather than individually (e.g.,
182       * if you don't want the total combined number of search and modify operations
183       * in progress at any time to exceed a given threshold, then you could provide
184       * the same semaphore instance for the {@code modifySemaphore} and
185       * {@code searchSemaphore} arguments).
186       *
187       * @param  downstreamRequestHandler  The downstream request handler that will
188       *                                   be used to actually process the requests
189       *                                   after any appropriate rate limiting has
190       *                                   been performed.  It must not be
191       *                                   {@code null}.
192       * @param  abandonSemaphore          The semaphore to use when processing
193       *                                   abandon operations.  It may be
194       *                                   {@code null} if no concurrent request
195       *                                   limiting should be performed for abandon
196       *                                   operations.
197       * @param  addSemaphore              The semaphore to use when processing add
198       *                                   operations.  It may be {@code null} if no
199       *                                   concurrent request limiting should be
200       *                                   performed for add operations.
201       * @param  bindSemaphore             The semaphore to use when processing
202       *                                   bind operations.  It may be
203       *                                   {@code null} if no concurrent request
204       *                                   limiting should be performed for bind
205       *                                   operations.
206       * @param  compareSemaphore          The semaphore to use when processing
207       *                                   compare operations.  It may be
208       *                                   {@code null} if no concurrent request
209       *                                   limiting should be performed for compare
210       *                                   operations.
211       * @param  deleteSemaphore           The semaphore to use when processing
212       *                                   delete operations.  It may be
213       *                                   {@code null} if no concurrent request
214       *                                   limiting should be performed for delete
215       *                                   operations.
216       * @param  extendedSemaphore         The semaphore to use when processing
217       *                                   extended operations.  It may be
218       *                                   {@code null} if no concurrent request
219       *                                   limiting should be performed for extended
220       *                                   operations.
221       * @param  modifySemaphore           The semaphore to use when processing
222       *                                   modify operations.  It may be
223       *                                   {@code null} if no concurrent request
224       *                                   limiting should be performed for modify
225       *                                   operations.
226       * @param  modifyDNSemaphore         The semaphore to use when processing
227       *                                   modify DN operations.  It may be
228       *                                   {@code null} if no concurrent request
229       *                                   limiting should be performed for modify
230       *                                   DN operations.
231       * @param  searchSemaphore           The semaphore to use when processing
232       *                                   search operations.  It may be
233       *                                   {@code null} if no concurrent request
234       *                                   limiting should be performed for search
235       *                                   operations.
236       * @param  rejectTimeoutMillis       A timeout value (expressed in
237       *                                   milliseconds) that will cause a requested
238       *                                   operation to be rejected rather than
239       *                                   processed if the associate semaphore
240       *                                   cannot be acquired in this length of
241       *                                   time.  A value of zero indicates that the
242       *                                   operation should be rejected immediately
243       *                                   if the maximum number of concurrent
244       *                                   requests are already in progress.  A
245       *                                   value that is less than zero indicates
246       *                                   that no timeout should be imposed and
247       *                                   that requests should be forced to wait as
248       *                                   long as necessary until they can be
249       *                                   processed.
250       */
251      public ConcurrentRequestLimiterRequestHandler(
252                  final LDAPListenerRequestHandler downstreamRequestHandler,
253                  final Semaphore abandonSemaphore,
254                  final Semaphore addSemaphore,
255                  final Semaphore bindSemaphore,
256                  final Semaphore compareSemaphore,
257                  final Semaphore deleteSemaphore,
258                  final Semaphore extendedSemaphore,
259                  final Semaphore modifySemaphore,
260                  final Semaphore modifyDNSemaphore,
261                  final Semaphore searchSemaphore,
262                  final long rejectTimeoutMillis)
263      {
264        Validator.ensureNotNull(downstreamRequestHandler);
265    
266        this.downstreamRequestHandler = downstreamRequestHandler;
267        this.abandonSemaphore         = abandonSemaphore;
268        this.addSemaphore             = addSemaphore;
269        this.bindSemaphore            = bindSemaphore;
270        this.compareSemaphore         = compareSemaphore;
271        this.deleteSemaphore          = deleteSemaphore;
272        this.extendedSemaphore        = extendedSemaphore;
273        this.modifySemaphore          = modifySemaphore;
274        this.modifyDNSemaphore        = modifyDNSemaphore;
275        this.searchSemaphore          = searchSemaphore;
276    
277        if (rejectTimeoutMillis >= 0L)
278        {
279          this.rejectTimeoutMillis = rejectTimeoutMillis;
280        }
281        else
282        {
283          this.rejectTimeoutMillis = (long) Integer.MAX_VALUE;
284        }
285      }
286    
287    
288    
289      /**
290       * {@inheritDoc}
291       */
292      @Override()
293      public ConcurrentRequestLimiterRequestHandler newInstance(
294                  final LDAPListenerClientConnection connection)
295             throws LDAPException
296      {
297        return new ConcurrentRequestLimiterRequestHandler(
298             downstreamRequestHandler.newInstance(connection), abandonSemaphore,
299             addSemaphore, bindSemaphore, compareSemaphore, deleteSemaphore,
300             extendedSemaphore, modifySemaphore, modifyDNSemaphore,
301             searchSemaphore, rejectTimeoutMillis);
302      }
303    
304    
305    
306      /**
307       * {@inheritDoc}
308       */
309      @Override()
310      public void processAbandonRequest(final int messageID,
311                                        final AbandonRequestProtocolOp request,
312                                        final List<Control> controls)
313      {
314        try
315        {
316          acquirePermit(abandonSemaphore, OperationType.ABANDON);
317        }
318        catch (final LDAPException le)
319        {
320          Debug.debugException(le);
321          return;
322        }
323    
324        try
325        {
326          downstreamRequestHandler.processAbandonRequest(messageID, request,
327               controls);
328        }
329        finally
330        {
331          releasePermit(abandonSemaphore);
332        }
333      }
334    
335    
336    
337      /**
338       * {@inheritDoc}
339       */
340      @Override()
341      public LDAPMessage processAddRequest(final int messageID,
342                                           final AddRequestProtocolOp request,
343                                           final List<Control> controls)
344      {
345        try
346        {
347          acquirePermit(addSemaphore, OperationType.ADD);
348        }
349        catch (final LDAPException le)
350        {
351          Debug.debugException(le);
352          return new LDAPMessage(messageID,
353               new AddResponseProtocolOp(le.toLDAPResult()));
354        }
355    
356        try
357        {
358          return downstreamRequestHandler.processAddRequest(messageID, request,
359               controls);
360        }
361        finally
362        {
363          releasePermit(addSemaphore);
364        }
365      }
366    
367    
368    
369      /**
370       * {@inheritDoc}
371       */
372      @Override()
373      public LDAPMessage processBindRequest(final int messageID,
374                                            final BindRequestProtocolOp request,
375                                            final List<Control> controls)
376      {
377        try
378        {
379          acquirePermit(bindSemaphore, OperationType.BIND);
380        }
381        catch (final LDAPException le)
382        {
383          Debug.debugException(le);
384          return new LDAPMessage(messageID,
385               new BindResponseProtocolOp(le.toLDAPResult()));
386        }
387    
388        try
389        {
390          return downstreamRequestHandler.processBindRequest(messageID, request,
391               controls);
392        }
393        finally
394        {
395          releasePermit(bindSemaphore);
396        }
397      }
398    
399    
400    
401      /**
402       * {@inheritDoc}
403       */
404      @Override()
405      public LDAPMessage processCompareRequest(final int messageID,
406                              final CompareRequestProtocolOp request,
407                              final List<Control> controls)
408      {
409        try
410        {
411          acquirePermit(compareSemaphore, OperationType.COMPARE);
412        }
413        catch (final LDAPException le)
414        {
415          Debug.debugException(le);
416          return new LDAPMessage(messageID,
417               new CompareResponseProtocolOp(le.toLDAPResult()));
418        }
419    
420        try
421        {
422          return downstreamRequestHandler.processCompareRequest(messageID, request,
423               controls);
424        }
425        finally
426        {
427          releasePermit(compareSemaphore);
428        }
429      }
430    
431    
432    
433      /**
434       * {@inheritDoc}
435       */
436      @Override()
437      public LDAPMessage processDeleteRequest(final int messageID,
438                                              final DeleteRequestProtocolOp request,
439                                              final List<Control> controls)
440      {
441        try
442        {
443          acquirePermit(deleteSemaphore, OperationType.DELETE);
444        }
445        catch (final LDAPException le)
446        {
447          Debug.debugException(le);
448          return new LDAPMessage(messageID,
449               new DeleteResponseProtocolOp(le.toLDAPResult()));
450        }
451    
452        try
453        {
454          return downstreamRequestHandler.processDeleteRequest(messageID, request,
455               controls);
456        }
457        finally
458        {
459          releasePermit(deleteSemaphore);
460        }
461      }
462    
463    
464    
465      /**
466       * {@inheritDoc}
467       */
468      @Override()
469      public LDAPMessage processExtendedRequest(final int messageID,
470                              final ExtendedRequestProtocolOp request,
471                              final List<Control> controls)
472      {
473        try
474        {
475          acquirePermit(extendedSemaphore, OperationType.EXTENDED);
476        }
477        catch (final LDAPException le)
478        {
479          Debug.debugException(le);
480          return new LDAPMessage(messageID,
481               new ExtendedResponseProtocolOp(le.toLDAPResult()));
482        }
483    
484        try
485        {
486          return downstreamRequestHandler.processExtendedRequest(messageID, request,
487               controls);
488        }
489        finally
490        {
491          releasePermit(extendedSemaphore);
492        }
493      }
494    
495    
496    
497      /**
498       * {@inheritDoc}
499       */
500      @Override()
501      public LDAPMessage processModifyRequest(final int messageID,
502                                              final ModifyRequestProtocolOp request,
503                                              final List<Control> controls)
504      {
505        try
506        {
507          acquirePermit(modifySemaphore, OperationType.MODIFY);
508        }
509        catch (final LDAPException le)
510        {
511          Debug.debugException(le);
512          return new LDAPMessage(messageID,
513               new ModifyResponseProtocolOp(le.toLDAPResult()));
514        }
515    
516        try
517        {
518          return downstreamRequestHandler.processModifyRequest(messageID, request,
519               controls);
520        }
521        finally
522        {
523          releasePermit(modifySemaphore);
524        }
525      }
526    
527    
528    
529      /**
530       * {@inheritDoc}
531       */
532      @Override()
533      public LDAPMessage processModifyDNRequest(final int messageID,
534                              final ModifyDNRequestProtocolOp request,
535                              final List<Control> controls)
536      {
537        try
538        {
539          acquirePermit(modifyDNSemaphore, OperationType.MODIFY_DN);
540        }
541        catch (final LDAPException le)
542        {
543          Debug.debugException(le);
544          return new LDAPMessage(messageID,
545               new ModifyDNResponseProtocolOp(le.toLDAPResult()));
546        }
547    
548        try
549        {
550          return downstreamRequestHandler.processModifyDNRequest(messageID, request,
551               controls);
552        }
553        finally
554        {
555          releasePermit(modifyDNSemaphore);
556        }
557      }
558    
559    
560    
561      /**
562       * {@inheritDoc}
563       */
564      @Override()
565      public LDAPMessage processSearchRequest(final int messageID,
566                                              final SearchRequestProtocolOp request,
567                                              final List<Control> controls)
568      {
569        try
570        {
571          acquirePermit(searchSemaphore, OperationType.SEARCH);
572        }
573        catch (final LDAPException le)
574        {
575          Debug.debugException(le);
576          return new LDAPMessage(messageID,
577               new SearchResultDoneProtocolOp(le.toLDAPResult()));
578        }
579    
580        try
581        {
582          return downstreamRequestHandler.processSearchRequest(messageID, request,
583               controls);
584        }
585        finally
586        {
587          releasePermit(searchSemaphore);
588        }
589      }
590    
591    
592    
593      /**
594       * Acquires a permit from the provided semaphore.
595       *
596       * @param  semaphore      The semaphore from which to acquire a permit.  It
597       *                        may be {@code null} if no semaphore is needed for
598       *                        the associated operation type.
599       * @param  operationType  The type of operation
600       *
601       * @throws  LDAPException  If it was not possible to acquire a permit from the
602       *                         provided semaphore.
603       */
604      private void acquirePermit(final Semaphore semaphore,
605                                 final OperationType operationType)
606              throws LDAPException
607      {
608        if (semaphore == null)
609        {
610          return;
611        }
612    
613        try
614        {
615          if (rejectTimeoutMillis == 0L)
616          {
617            if (! semaphore.tryAcquire())
618            {
619              throw new LDAPException(ResultCode.BUSY,
620                   ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_NO_TIMEOUT.get(
621                        operationType.name()));
622            }
623          }
624          else
625          {
626            if (! semaphore.tryAcquire(rejectTimeoutMillis, TimeUnit.MILLISECONDS))
627            {
628              throw new LDAPException(ResultCode.BUSY,
629                   ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_TIMEOUT.get(
630                        operationType.name(), rejectTimeoutMillis));
631            }
632          }
633        }
634        catch (final LDAPException le)
635        {
636          throw le;
637        }
638        catch (final Exception e)
639        {
640          Debug.debugException(e);
641          throw new LDAPException(ResultCode.OTHER,
642               ERR_CONCURRENT_LIMITER_REQUEST_HANDLER_SEMAPHORE_EXCEPTION.get(
643                    operationType.name(), StaticUtils.getExceptionMessage(e)),
644               e);
645        }
646      }
647    
648    
649    
650      /**
651       * Releases a permit back to the provided semaphore.
652       *
653       * @param  semaphore  The semaphore to which the permit should be released.
654       *                    It may be {@code null} if no semaphore is needed for the
655       *                    associated operation type.
656       */
657      private static void releasePermit(final Semaphore semaphore)
658      {
659        if (semaphore != null)
660        {
661          semaphore.release();
662        }
663      }
664    }