001/* 002 * Copyright 2009-2024 Ping Identity Corporation 003 * All Rights Reserved. 004 */ 005/* 006 * Copyright 2009-2024 Ping Identity Corporation 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020/* 021 * Copyright (C) 2009-2024 Ping Identity Corporation 022 * 023 * This program is free software; you can redistribute it and/or modify 024 * it under the terms of the GNU General Public License (GPLv2 only) 025 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only) 026 * as published by the Free Software Foundation. 027 * 028 * This program is distributed in the hope that it will be useful, 029 * but WITHOUT ANY WARRANTY; without even the implied warranty of 030 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 031 * GNU General Public License for more details. 032 * 033 * You should have received a copy of the GNU General Public License 034 * along with this program; if not, see <http://www.gnu.org/licenses>. 035 */ 036package com.unboundid.ldap.sdk; 037 038 039 040import java.util.concurrent.LinkedBlockingQueue; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.atomic.AtomicReference; 044 045import com.unboundid.util.Debug; 046import com.unboundid.util.InternalUseOnly; 047import com.unboundid.util.NotNull; 048import com.unboundid.util.Nullable; 049import com.unboundid.util.ThreadSafety; 050import com.unboundid.util.ThreadSafetyLevel; 051import com.unboundid.util.Validator; 052 053import static com.unboundid.ldap.sdk.LDAPMessages.*; 054 055 056 057/** 058 * This class provides an {@link EntrySource} that will read entries matching a 059 * given set of search criteria from an LDAP directory server. It may 060 * optionally close the associated connection after all entries have been read. 061 * <BR><BR> 062 * This implementation processes the search asynchronously, which provides two 063 * benefits: 064 * <UL> 065 * <LI>It makes it easier to provide a throttling mechanism to prevent the 066 * entries from piling up and causing the client to run out of memory if 067 * the server returns them faster than the client can process them. If 068 * this occurs, then the client will queue up a small number of entries 069 * but will then push back against the server to block it from sending 070 * additional entries until the client can catch up. In this case, no 071 * entries should be lost, although some servers may impose limits on how 072 * long a search may be active or other forms of constraints.</LI> 073 * <LI>It makes it possible to abandon the search if the entry source is no 074 * longer needed (as signified by calling the {@link #close} method) and 075 * the caller intends to stop iterating through the results.</LI> 076 * </UL> 077 * <H2>Example</H2> 078 * The following example demonstrates the process that may be used for iterating 079 * across all entries containing the {@code person} object class using the LDAP 080 * entry source API: 081 * <PRE> 082 * SearchRequest searchRequest = new SearchRequest("dc=example,dc=com", 083 * SearchScope.SUB, Filter.createEqualityFilter("objectClass", "person")); 084 * LDAPEntrySource entrySource = new LDAPEntrySource(connection, 085 * searchRequest, false); 086 * 087 * int entriesRead = 0; 088 * int referencesRead = 0; 089 * int exceptionsCaught = 0; 090 * try 091 * { 092 * while (true) 093 * { 094 * try 095 * { 096 * Entry entry = entrySource.nextEntry(); 097 * if (entry == null) 098 * { 099 * // There are no more entries to be read. 100 * break; 101 * } 102 * else 103 * { 104 * // Do something with the entry here. 105 * entriesRead++; 106 * } 107 * } 108 * catch (SearchResultReferenceEntrySourceException e) 109 * { 110 * // The directory server returned a search result reference. 111 * SearchResultReference searchReference = e.getSearchReference(); 112 * referencesRead++; 113 * } 114 * catch (EntrySourceException e) 115 * { 116 * // Some kind of problem was encountered (e.g., the connection is no 117 * // longer valid). See if we can continue reading entries. 118 * exceptionsCaught++; 119 * if (! e.mayContinueReading()) 120 * { 121 * break; 122 * } 123 * } 124 * } 125 * } 126 * finally 127 * { 128 * entrySource.close(); 129 * } 130 * </PRE> 131 */ 132@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE) 133public final class LDAPEntrySource 134 extends EntrySource 135 implements AsyncSearchResultListener 136{ 137 /** 138 * The bogus entry that will be used to signify the end of the results. 139 */ 140 @NotNull private static final String END_OF_RESULTS = "END OF RESULTS"; 141 142 143 144 /** 145 * The serial version UID for this serializable class. 146 */ 147 private static final long serialVersionUID = 1080386705549149135L; 148 149 150 151 // The request ID associated with the asynchronous search. 152 @NotNull private final AsyncRequestID asyncRequestID; 153 154 // Indicates whether this entry source has been closed. 155 @NotNull private final AtomicBoolean closed; 156 157 // The search result for the search operation. 158 @NotNull private final AtomicReference<SearchResult> searchResult; 159 160 // Indicates whether to close the connection when this entry source is closed. 161 private final boolean closeConnection; 162 163 // The connection that will be used to read the entries. 164 @NotNull private final LDAPConnection connection; 165 166 // The queue from which entries will be read. 167 @NotNull private final LinkedBlockingQueue<Object> queue; 168 169 170 171 /** 172 * Creates a new LDAP entry source with the provided information. 173 * 174 * @param connection The connection to the directory server from which 175 * the entries will be read. It must not be 176 * {@code null}. 177 * @param searchRequest The search request that will be used to identify 178 * which entries should be returned. It must not be 179 * {@code null}, and it must not be configured with a 180 * {@link SearchResultListener}. 181 * @param closeConnection Indicates whether the provided connection should 182 * be closed whenever all of the entries have been 183 * read, or if the {@link #close} method is called. 184 * 185 * @throws LDAPException If there is a problem with the provided search 186 * request or when trying to communicate with the 187 * directory server over the provided connection. 188 */ 189 public LDAPEntrySource(@NotNull final LDAPConnection connection, 190 @NotNull final SearchRequest searchRequest, 191 final boolean closeConnection) 192 throws LDAPException 193 { 194 this(connection, searchRequest, closeConnection, 100); 195 } 196 197 198 199 /** 200 * Creates a new LDAP entry source with the provided information. 201 * 202 * @param connection The connection to the directory server from which 203 * the entries will be read. It must not be 204 * {@code null}. 205 * @param searchRequest The search request that will be used to identify 206 * which entries should be returned. It must not be 207 * {@code null}, and it must not be configured with a 208 * {@link SearchResultListener}. 209 * @param closeConnection Indicates whether the provided connection should 210 * be closed whenever all of the entries have been 211 * read, or if the {@link #close} method is called. 212 * @param queueSize The size of the internal queue used to hold search 213 * result entries until they can be consumed by the 214 * {@link #nextEntry} method. The value must be 215 * greater than zero. 216 * 217 * @throws LDAPException If there is a problem with the provided search 218 * request or when trying to communicate with the 219 * directory server over the provided connection. 220 */ 221 public LDAPEntrySource(@NotNull final LDAPConnection connection, 222 @NotNull final SearchRequest searchRequest, 223 final boolean closeConnection, 224 final int queueSize) 225 throws LDAPException 226 { 227 Validator.ensureNotNull(connection, searchRequest); 228 Validator.ensureTrue(queueSize > 0, 229 "LDAPEntrySource.queueSize must be greater than 0."); 230 231 this.connection = connection; 232 this.closeConnection = closeConnection; 233 234 if (searchRequest.getSearchResultListener() != null) 235 { 236 throw new LDAPException(ResultCode.PARAM_ERROR, 237 ERR_LDAP_ENTRY_SOURCE_REQUEST_HAS_LISTENER.get()); 238 } 239 240 closed = new AtomicBoolean(false); 241 searchResult = new AtomicReference<>(); 242 queue = new LinkedBlockingQueue<>(queueSize); 243 244 final SearchRequest r = new SearchRequest(this, searchRequest.getControls(), 245 searchRequest.getBaseDN(), searchRequest.getScope(), 246 searchRequest.getDereferencePolicy(), searchRequest.getSizeLimit(), 247 searchRequest.getTimeLimitSeconds(), searchRequest.typesOnly(), 248 searchRequest.getFilter(), searchRequest.getAttributes()); 249 asyncRequestID = connection.asyncSearch(r); 250 } 251 252 253 254 /** 255 * {@inheritDoc} 256 */ 257 @Override() 258 @Nullable() 259 public Entry nextEntry() 260 throws EntrySourceException 261 { 262 while (true) 263 { 264 if (closed.get() && queue.isEmpty()) 265 { 266 return null; 267 } 268 269 final Object o; 270 try 271 { 272 o = queue.poll(10L, TimeUnit.MILLISECONDS); 273 } 274 catch (final InterruptedException ie) 275 { 276 Debug.debugException(ie); 277 Thread.currentThread().interrupt(); 278 throw new EntrySourceException(true, 279 ERR_LDAP_ENTRY_SOURCE_NEXT_ENTRY_INTERRUPTED.get(), ie); 280 } 281 282 if (o != null) 283 { 284 if (o == END_OF_RESULTS) 285 { 286 return null; 287 } 288 else if (o instanceof Entry) 289 { 290 return (Entry) o; 291 } 292 else 293 { 294 throw (EntrySourceException) o; 295 } 296 } 297 } 298 } 299 300 301 302 /** 303 * {@inheritDoc} 304 */ 305 @Override() 306 public void close() 307 { 308 closeInternal(true); 309 } 310 311 312 313 /** 314 * Closes this LDAP entry source. 315 * 316 * @param abandon Indicates whether to attempt to abandon the search. 317 */ 318 private void closeInternal(final boolean abandon) 319 { 320 addToQueue(END_OF_RESULTS); 321 322 if (closed.compareAndSet(false, true)) 323 { 324 if (abandon) 325 { 326 try 327 { 328 connection.abandon(asyncRequestID); 329 } 330 catch (final Exception e) 331 { 332 Debug.debugException(e); 333 } 334 } 335 336 if (closeConnection) 337 { 338 connection.close(); 339 } 340 } 341 } 342 343 344 345 /** 346 * Retrieves the search result for the search operation, if available. It 347 * will not be available until the search has completed (as indicated by a 348 * {@code null} return value from the {@link #nextEntry} method). 349 * 350 * @return The search result for the search operation, or {@code null} if it 351 * is not available (e.g., because the search has not yet completed). 352 */ 353 @Nullable() 354 public SearchResult getSearchResult() 355 { 356 return searchResult.get(); 357 } 358 359 360 361 /** 362 * {@inheritDoc} This is intended for internal use only and should not be 363 * called by anything outside of the LDAP SDK itself. 364 */ 365 @InternalUseOnly() 366 @Override() 367 public void searchEntryReturned(@NotNull final SearchResultEntry searchEntry) 368 { 369 addToQueue(searchEntry); 370 } 371 372 373 374 /** 375 * {@inheritDoc} This is intended for internal use only and should not be 376 * called by anything outside of the LDAP SDK itself. 377 */ 378 @InternalUseOnly() 379 @Override() 380 public void searchReferenceReturned( 381 @NotNull final SearchResultReference searchReference) 382 { 383 addToQueue(new SearchResultReferenceEntrySourceException(searchReference)); 384 } 385 386 387 388 /** 389 * {@inheritDoc} This is intended for internal use only and should not be 390 * called by anything outside of the LDAP SDK itself. 391 */ 392 @InternalUseOnly() 393 @Override() 394 public void searchResultReceived(@NotNull final AsyncRequestID requestID, 395 @NotNull final SearchResult searchResult) 396 { 397 this.searchResult.set(searchResult); 398 399 if (! searchResult.getResultCode().equals(ResultCode.SUCCESS)) 400 { 401 addToQueue(new EntrySourceException(false, 402 new LDAPSearchException(searchResult))); 403 } 404 405 closeInternal(false); 406 } 407 408 409 410 /** 411 * Adds the provided object to the queue, waiting as long as needed until it 412 * has been added. 413 * 414 * @param o The object to be added. It must not be {@code null}. 415 */ 416 private void addToQueue(@NotNull final Object o) 417 { 418 while (true) 419 { 420 if (closed.get()) 421 { 422 return; 423 } 424 425 try 426 { 427 if (queue.offer(o, 100L, TimeUnit.MILLISECONDS)) 428 { 429 return; 430 } 431 } 432 catch (final InterruptedException ie) 433 { 434 Debug.debugException(ie); 435 } 436 } 437 } 438}