001/*
002 * Copyright 2009-2022 Ping Identity Corporation
003 * All Rights Reserved.
004 */
005/*
006 * Copyright 2009-2022 Ping Identity Corporation
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *    http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020/*
021 * Copyright (C) 2009-2022 Ping Identity Corporation
022 *
023 * This program is free software; you can redistribute it and/or modify
024 * it under the terms of the GNU General Public License (GPLv2 only)
025 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
026 * as published by the Free Software Foundation.
027 *
028 * This program is distributed in the hope that it will be useful,
029 * but WITHOUT ANY WARRANTY; without even the implied warranty of
030 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
031 * GNU General Public License for more details.
032 *
033 * You should have received a copy of the GNU General Public License
034 * along with this program; if not, see <http://www.gnu.org/licenses>.
035 */
036package com.unboundid.ldap.sdk;
037
038
039
040import java.util.concurrent.LinkedBlockingQueue;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicBoolean;
043import java.util.concurrent.atomic.AtomicReference;
044
045import com.unboundid.util.Debug;
046import com.unboundid.util.InternalUseOnly;
047import com.unboundid.util.NotNull;
048import com.unboundid.util.Nullable;
049import com.unboundid.util.ThreadSafety;
050import com.unboundid.util.ThreadSafetyLevel;
051import com.unboundid.util.Validator;
052
053import static com.unboundid.ldap.sdk.LDAPMessages.*;
054
055
056
057/**
058 * This class provides an {@link EntrySource} that will read entries matching a
059 * given set of search criteria from an LDAP directory server.  It may
060 * optionally close the associated connection after all entries have been read.
061 * <BR><BR>
062 * This implementation processes the search asynchronously, which provides two
063 * benefits:
064 * <UL>
065 *   <LI>It makes it easier to provide a throttling mechanism to prevent the
066 *       entries from piling up and causing the client to run out of memory if
067 *       the server returns them faster than the client can process them.  If
068 *       this occurs, then the client will queue up a small number of entries
069 *       but will then push back against the server to block it from sending
070 *       additional entries until the client can catch up.  In this case, no
071 *       entries should be lost, although some servers may impose limits on how
072 *       long a search may be active or other forms of constraints.</LI>
073 *   <LI>It makes it possible to abandon the search if the entry source is no
074 *       longer needed (as signified by calling the {@link #close} method) and
075 *       the caller intends to stop iterating through the results.</LI>
076 * </UL>
077 * <H2>Example</H2>
078 * The following example demonstrates the process that may be used for iterating
079 * across all entries containing the {@code person} object class using the LDAP
080 * entry source API:
081 * <PRE>
082 * SearchRequest searchRequest = new SearchRequest("dc=example,dc=com",
083 *      SearchScope.SUB, Filter.createEqualityFilter("objectClass", "person"));
084 * LDAPEntrySource entrySource = new LDAPEntrySource(connection,
085 *      searchRequest, false);
086 *
087 * int entriesRead = 0;
088 * int referencesRead = 0;
089 * int exceptionsCaught = 0;
090 * try
091 * {
092 *   while (true)
093 *   {
094 *     try
095 *     {
096 *       Entry entry = entrySource.nextEntry();
097 *       if (entry == null)
098 *       {
099 *         // There are no more entries to be read.
100 *         break;
101 *       }
102 *       else
103 *       {
104 *         // Do something with the entry here.
105 *         entriesRead++;
106 *       }
107 *     }
108 *     catch (SearchResultReferenceEntrySourceException e)
109 *     {
110 *       // The directory server returned a search result reference.
111 *       SearchResultReference searchReference = e.getSearchReference();
112 *       referencesRead++;
113 *     }
114 *     catch (EntrySourceException e)
115 *     {
116 *       // Some kind of problem was encountered (e.g., the connection is no
117 *       // longer valid).  See if we can continue reading entries.
118 *       exceptionsCaught++;
119 *       if (! e.mayContinueReading())
120 *       {
121 *         break;
122 *       }
123 *     }
124 *   }
125 * }
126 * finally
127 * {
128 *   entrySource.close();
129 * }
130 * </PRE>
131 */
132@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE)
133public final class LDAPEntrySource
134       extends EntrySource
135       implements AsyncSearchResultListener
136{
137  /**
138   * The bogus entry that will be used to signify the end of the results.
139   */
140  @NotNull private static final String END_OF_RESULTS = "END OF RESULTS";
141
142
143
144  /**
145   * The serial version UID for this serializable class.
146   */
147  private static final long serialVersionUID = 1080386705549149135L;
148
149
150
151  // The request ID associated with the asynchronous search.
152  @NotNull private final AsyncRequestID asyncRequestID;
153
154  // Indicates whether this entry source has been closed.
155  @NotNull private final AtomicBoolean closed;
156
157  // The search result for the search operation.
158  @NotNull private final AtomicReference<SearchResult> searchResult;
159
160  // Indicates whether to close the connection when this entry source is closed.
161  private final boolean closeConnection;
162
163  // The connection that will be used to read the entries.
164  @NotNull private final LDAPConnection connection;
165
166  // The queue from which entries will be read.
167  @NotNull private final LinkedBlockingQueue<Object> queue;
168
169
170
171  /**
172   * Creates a new LDAP entry source with the provided information.
173   *
174   * @param  connection       The connection to the directory server from which
175   *                          the entries will be read.  It must not be
176   *                          {@code null}.
177   * @param  searchRequest    The search request that will be used to identify
178   *                          which entries should be returned.  It must not be
179   *                          {@code null}, and it must not be configured with a
180   *                          {@link SearchResultListener}.
181   * @param  closeConnection  Indicates whether the provided connection should
182   *                          be closed whenever all of the entries have been
183   *                          read, or if the {@link #close} method is called.
184   *
185   * @throws  LDAPException  If there is a problem with the provided search
186   *                         request or when trying to communicate with the
187   *                         directory server over the provided connection.
188   */
189  public LDAPEntrySource(@NotNull final LDAPConnection connection,
190                         @NotNull final SearchRequest searchRequest,
191                         final boolean closeConnection)
192         throws LDAPException
193  {
194    this(connection, searchRequest, closeConnection, 100);
195  }
196
197
198
199  /**
200   * Creates a new LDAP entry source with the provided information.
201   *
202   * @param  connection       The connection to the directory server from which
203   *                          the entries will be read.  It must not be
204   *                          {@code null}.
205   * @param  searchRequest    The search request that will be used to identify
206   *                          which entries should be returned.  It must not be
207   *                          {@code null}, and it must not be configured with a
208   *                          {@link SearchResultListener}.
209   * @param  closeConnection  Indicates whether the provided connection should
210   *                          be closed whenever all of the entries have been
211   *                          read, or if the {@link #close} method is called.
212   * @param  queueSize        The size of the internal queue used to hold search
213   *                          result entries until they can be consumed by the
214   *                          {@link #nextEntry} method.  The value must be
215   *                          greater than zero.
216   *
217   * @throws  LDAPException  If there is a problem with the provided search
218   *                         request or when trying to communicate with the
219   *                         directory server over the provided connection.
220   */
221  public LDAPEntrySource(@NotNull final LDAPConnection connection,
222                         @NotNull final SearchRequest searchRequest,
223                         final boolean closeConnection,
224                         final int queueSize)
225         throws LDAPException
226  {
227    Validator.ensureNotNull(connection, searchRequest);
228    Validator.ensureTrue(queueSize > 0,
229         "LDAPEntrySource.queueSize must be greater than 0.");
230
231    this.connection      = connection;
232    this.closeConnection = closeConnection;
233
234    if (searchRequest.getSearchResultListener() != null)
235    {
236      throw new LDAPException(ResultCode.PARAM_ERROR,
237                              ERR_LDAP_ENTRY_SOURCE_REQUEST_HAS_LISTENER.get());
238    }
239
240    closed       = new AtomicBoolean(false);
241    searchResult = new AtomicReference<>();
242    queue        = new LinkedBlockingQueue<>(queueSize);
243
244    final SearchRequest r = new SearchRequest(this, searchRequest.getControls(),
245         searchRequest.getBaseDN(), searchRequest.getScope(),
246         searchRequest.getDereferencePolicy(), searchRequest.getSizeLimit(),
247         searchRequest.getTimeLimitSeconds(), searchRequest.typesOnly(),
248         searchRequest.getFilter(), searchRequest.getAttributes());
249    asyncRequestID = connection.asyncSearch(r);
250  }
251
252
253
254  /**
255   * {@inheritDoc}
256   */
257  @Override()
258  @Nullable()
259  public Entry nextEntry()
260         throws EntrySourceException
261  {
262    while (true)
263    {
264      if (closed.get() && queue.isEmpty())
265      {
266        return null;
267      }
268
269      final Object o;
270      try
271      {
272        o = queue.poll(10L, TimeUnit.MILLISECONDS);
273      }
274      catch (final InterruptedException ie)
275      {
276        Debug.debugException(ie);
277        Thread.currentThread().interrupt();
278        throw new EntrySourceException(true,
279             ERR_LDAP_ENTRY_SOURCE_NEXT_ENTRY_INTERRUPTED.get(), ie);
280      }
281
282      if (o != null)
283      {
284        if (o == END_OF_RESULTS)
285        {
286          return null;
287        }
288        else if (o instanceof Entry)
289        {
290          return (Entry) o;
291        }
292        else
293        {
294          throw (EntrySourceException) o;
295        }
296      }
297    }
298  }
299
300
301
302  /**
303   * {@inheritDoc}
304   */
305  @Override()
306  public void close()
307  {
308    closeInternal(true);
309  }
310
311
312
313  /**
314   * Closes this LDAP entry source.
315   *
316   * @param  abandon  Indicates whether to attempt to abandon the search.
317   */
318  private void closeInternal(final boolean abandon)
319  {
320    addToQueue(END_OF_RESULTS);
321
322    if (closed.compareAndSet(false, true))
323    {
324      if (abandon)
325      {
326        try
327        {
328          connection.abandon(asyncRequestID);
329        }
330        catch (final Exception e)
331        {
332          Debug.debugException(e);
333        }
334      }
335
336      if (closeConnection)
337      {
338        connection.close();
339      }
340    }
341  }
342
343
344
345  /**
346   * Retrieves the search result for the search operation, if available.  It
347   * will not be available until the search has completed (as indicated by a
348   * {@code null} return value from the {@link #nextEntry} method).
349   *
350   * @return  The search result for the search operation, or {@code null} if it
351   *          is not available (e.g., because the search has not yet completed).
352   */
353  @Nullable()
354  public SearchResult getSearchResult()
355  {
356    return searchResult.get();
357  }
358
359
360
361  /**
362   * {@inheritDoc}  This is intended for internal use only and should not be
363   * called by anything outside of the LDAP SDK itself.
364   */
365  @InternalUseOnly()
366  @Override()
367  public void searchEntryReturned(@NotNull final SearchResultEntry searchEntry)
368  {
369    addToQueue(searchEntry);
370  }
371
372
373
374  /**
375   * {@inheritDoc}  This is intended for internal use only and should not be
376   * called by anything outside of the LDAP SDK itself.
377   */
378  @InternalUseOnly()
379  @Override()
380  public void searchReferenceReturned(
381                   @NotNull final SearchResultReference searchReference)
382  {
383    addToQueue(new SearchResultReferenceEntrySourceException(searchReference));
384  }
385
386
387
388  /**
389   * {@inheritDoc}  This is intended for internal use only and should not be
390   * called by anything outside of the LDAP SDK itself.
391   */
392  @InternalUseOnly()
393  @Override()
394  public void searchResultReceived(@NotNull final AsyncRequestID requestID,
395                                   @NotNull final SearchResult searchResult)
396  {
397    this.searchResult.set(searchResult);
398
399    if (! searchResult.getResultCode().equals(ResultCode.SUCCESS))
400    {
401      addToQueue(new EntrySourceException(false,
402           new LDAPSearchException(searchResult)));
403    }
404
405    closeInternal(false);
406  }
407
408
409
410  /**
411   * Adds the provided object to the queue, waiting as long as needed until it
412   * has been added.
413   *
414   * @param  o  The object to be added.  It must not be {@code null}.
415   */
416  private void addToQueue(@NotNull final Object o)
417  {
418    while (true)
419    {
420      if (closed.get())
421      {
422        return;
423      }
424
425      try
426      {
427        if (queue.offer(o, 100L, TimeUnit.MILLISECONDS))
428        {
429          return;
430        }
431      }
432      catch (final InterruptedException ie)
433      {
434        Debug.debugException(ie);
435      }
436    }
437  }
438}