kdb_kv_search: spell 'linearized'
[bbaumbach/samba-autobuild/.git] / lib / ldb / ldb_key_value / ldb_kv_search.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb search functions
28  *
29  *  Description: functions to search ldb+tdb databases
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_kv.h"
35 #include "ldb_private.h"
36
37 /*
38   add one element to a message
39 */
40 static int msg_add_element(struct ldb_message *ret,
41                            const struct ldb_message_element *el,
42                            int check_duplicates)
43 {
44         unsigned int i;
45         struct ldb_message_element *e2, *elnew;
46
47         if (check_duplicates && ldb_msg_find_element(ret, el->name)) {
48                 /* its already there */
49                 return 0;
50         }
51
52         e2 = talloc_realloc(ret, ret->elements, struct ldb_message_element, ret->num_elements+1);
53         if (!e2) {
54                 return -1;
55         }
56         ret->elements = e2;
57
58         elnew = &e2[ret->num_elements];
59
60         elnew->name = talloc_strdup(ret->elements, el->name);
61         if (!elnew->name) {
62                 return -1;
63         }
64
65         if (el->num_values) {
66                 elnew->values = talloc_array(ret->elements, struct ldb_val, el->num_values);
67                 if (!elnew->values) {
68                         return -1;
69                 }
70         } else {
71                 elnew->values = NULL;
72         }
73
74         for (i=0;i<el->num_values;i++) {
75                 elnew->values[i] = ldb_val_dup(elnew->values, &el->values[i]);
76                 if (elnew->values[i].length != el->values[i].length) {
77                         return -1;
78                 }
79         }
80
81         elnew->num_values = el->num_values;
82         elnew->flags = el->flags;
83
84         ret->num_elements++;
85
86         return 0;
87 }
88
89 /*
90   add the special distinguishedName element
91 */
92 static int msg_add_distinguished_name(struct ldb_message *msg)
93 {
94         struct ldb_message_element el;
95         struct ldb_val val;
96         int ret;
97
98         el.flags = 0;
99         el.name = "distinguishedName";
100         el.num_values = 1;
101         el.values = &val;
102         el.flags = 0;
103         val.data = (uint8_t *)ldb_dn_alloc_linearized(msg, msg->dn);
104         if (val.data == NULL) {
105                 return -1;
106         }
107         val.length = strlen((char *)val.data);
108
109         ret = msg_add_element(msg, &el, 1);
110         return ret;
111 }
112
113 /*
114   search the database for a single simple dn.
115   return LDB_ERR_NO_SUCH_OBJECT on record-not-found
116   and LDB_SUCCESS on success
117 */
118 int ldb_kv_search_base(struct ldb_module *module,
119                        TALLOC_CTX *mem_ctx,
120                        struct ldb_dn *dn,
121                        struct ldb_dn **ret_dn)
122 {
123         int exists;
124         int ret;
125         struct ldb_message *msg = NULL;
126
127         if (ldb_dn_is_null(dn)) {
128                 return LDB_ERR_NO_SUCH_OBJECT;
129         }
130
131         /*
132          * We can't use tdb_exists() directly on a key when the TDB
133          * key is the GUID one, not the DN based one.  So we just do a
134          * normal search and avoid most of the allocation with the
135          * LDB_UNPACK_DATA_FLAG_NO_DN and
136          * LDB_UNPACK_DATA_FLAG_NO_ATTRS flags
137          */
138         msg = ldb_msg_new(module);
139         if (msg == NULL) {
140                 return LDB_ERR_OPERATIONS_ERROR;
141         }
142
143         ret = ldb_kv_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_ATTRS);
144         if (ret == LDB_SUCCESS) {
145                 const char *dn_linearized
146                         = ldb_dn_get_linearized(dn);
147                 const char *msg_dn_linearized
148                         = ldb_dn_get_linearized(msg->dn);
149
150                 if (strcmp(dn_linearized, msg_dn_linearized) == 0) {
151                         /*
152                          * Re-use the full incoming DN for
153                          * subtree checks
154                          */
155                         *ret_dn = dn;
156                 } else {
157                         /*
158                          * Use the string DN from the unpack, so that
159                          * we have a case-exact match of the base
160                          */
161                         *ret_dn = talloc_steal(mem_ctx, msg->dn);
162                 }
163                 exists = true;
164         } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
165                 exists = false;
166         } else {
167                 talloc_free(msg);
168                 return ret;
169         }
170         talloc_free(msg);
171         if (exists) {
172                 return LDB_SUCCESS;
173         }
174         return LDB_ERR_NO_SUCH_OBJECT;
175 }
176
177 struct ldb_kv_parse_data_unpack_ctx {
178         struct ldb_message *msg;
179         struct ldb_module *module;
180         struct ldb_kv_private *ldb_kv;
181         unsigned int unpack_flags;
182 };
183
184 static int ldb_kv_parse_data_unpack(struct ldb_val key,
185                                     struct ldb_val data,
186                                     void *private_data)
187 {
188         struct ldb_kv_parse_data_unpack_ctx *ctx = private_data;
189         unsigned int nb_elements_in_db;
190         int ret;
191         struct ldb_context *ldb = ldb_module_get_ctx(ctx->module);
192         struct ldb_val data_parse = data;
193
194         struct ldb_kv_private *ldb_kv = ctx->ldb_kv;
195
196         if ((ctx->unpack_flags & LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC)) {
197                 if ((ldb_kv->kv_ops->options & LDB_KV_OPTION_STABLE_READ_LOCK) &&
198                     (ctx->unpack_flags & LDB_UNPACK_DATA_FLAG_READ_LOCKED) &&
199                     !ldb_kv->kv_ops->transaction_active(ldb_kv)) {
200                         /*
201                          * In the case where no transactions are active and
202                          * we're in a read-lock, we can point directly into
203                          * database memory.
204                          *
205                          * The database can't be changed underneath us and we
206                          * will duplicate this data in the call to filter.
207                          *
208                          * This is seen in:
209                          * - ldb_kv_index_filter
210                          * - ldb_kv_search_and_return_base
211                          */
212                 } else {
213                         /*
214                          * In every other case, if we got
215                          * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC we need at least
216                          * do a memdup on the whole data buffer as that may
217                          * change later and the caller needs a stable result.
218                          *
219                          * During transactions, pointers could change and in
220                          * TDB, there just aren't the same guarantees.
221                          */
222                         data_parse.data = talloc_memdup(ctx->msg,
223                                                         data.data,
224                                                         data.length);
225                         if (data_parse.data == NULL) {
226                                 ldb_debug(ldb, LDB_DEBUG_ERROR,
227                                           "Unable to allocate data(%d) for %*.*s\n",
228                                           (int)data.length,
229                                           (int)key.length, (int)key.length, key.data);
230                                 return LDB_ERR_OPERATIONS_ERROR;
231                         }
232                 }
233         }
234
235         ret = ldb_unpack_data_only_attr_list_flags(ldb, &data_parse,
236                                                    ctx->msg,
237                                                    NULL, 0,
238                                                    ctx->unpack_flags,
239                                                    &nb_elements_in_db);
240         if (ret == -1) {
241                 if (data_parse.data != data.data) {
242                         talloc_free(data_parse.data);
243                 }
244
245                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %*.*s\n",
246                           (int)key.length, (int)key.length, key.data);
247                 return LDB_ERR_OPERATIONS_ERROR;
248         }
249         return ret;
250 }
251
252 /*
253   search the database for a single simple dn, returning all attributes
254   in a single message
255
256   return LDB_ERR_NO_SUCH_OBJECT on record-not-found
257   and LDB_SUCCESS on success
258 */
259 int ldb_kv_search_key(struct ldb_module *module,
260                       struct ldb_kv_private *ldb_kv,
261                       const struct ldb_val ldb_key,
262                       struct ldb_message *msg,
263                       unsigned int unpack_flags)
264 {
265         int ret;
266         struct ldb_kv_parse_data_unpack_ctx ctx = {
267                 .msg = msg,
268                 .module = module,
269                 .unpack_flags = unpack_flags,
270                 .ldb_kv = ldb_kv
271         };
272
273         memset(msg, 0, sizeof(*msg));
274
275         msg->num_elements = 0;
276         msg->elements = NULL;
277
278         ret = ldb_kv->kv_ops->fetch_and_parse(
279             ldb_kv, ldb_key, ldb_kv_parse_data_unpack, &ctx);
280
281         if (ret == -1) {
282                 ret = ldb_kv->kv_ops->error(ldb_kv);
283                 if (ret == LDB_SUCCESS) {
284                         /*
285                          * Just to be sure we don't turn errors
286                          * into success
287                          */
288                         return LDB_ERR_OPERATIONS_ERROR;
289                 }
290                 return ret;
291         } else if (ret != LDB_SUCCESS) {
292                 return ret;
293         }
294
295         return LDB_SUCCESS;
296 }
297
298 /*
299   search the database for a single simple dn, returning all attributes
300   in a single message
301
302   return LDB_ERR_NO_SUCH_OBJECT on record-not-found
303   and LDB_SUCCESS on success
304 */
305 int ldb_kv_search_dn1(struct ldb_module *module,
306                       struct ldb_dn *dn,
307                       struct ldb_message *msg,
308                       unsigned int unpack_flags)
309 {
310         void *data = ldb_module_get_private(module);
311         struct ldb_kv_private *ldb_kv =
312             talloc_get_type(data, struct ldb_kv_private);
313         int ret;
314         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
315         struct ldb_val key = {
316                 .data = guid_key,
317                 .length = sizeof(guid_key)
318         };
319         TALLOC_CTX *tdb_key_ctx = NULL;
320
321         bool valid_dn = ldb_dn_validate(dn);
322         if (valid_dn == false) {
323                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
324                                        "Invalid Base DN: %s",
325                                        ldb_dn_get_linearized(dn));
326                 return LDB_ERR_INVALID_DN_SYNTAX;
327         }
328
329         if (ldb_kv->cache->GUID_index_attribute == NULL ||
330             ldb_dn_is_special(dn)) {
331
332                 tdb_key_ctx = talloc_new(msg);
333                 if (!tdb_key_ctx) {
334                         return ldb_module_oom(module);
335                 }
336
337                 /* form the key */
338                 key = ldb_kv_key_dn(module, tdb_key_ctx, dn);
339                 if (!key.data) {
340                         TALLOC_FREE(tdb_key_ctx);
341                         return LDB_ERR_OPERATIONS_ERROR;
342                 }
343         } else {
344                 /*
345                  * Look in the index to find the key for this DN.
346                  *
347                  * the tdb_key memory is allocated above, msg is just
348                  * used for internal memory.
349                  *
350                  */
351                 ret = ldb_kv_key_dn_from_idx(module, ldb_kv, msg, dn, &key);
352                 if (ret != LDB_SUCCESS) {
353                         return ret;
354                 }
355         }
356
357         ret = ldb_kv_search_key(module, ldb_kv, key, msg, unpack_flags);
358
359         TALLOC_FREE(tdb_key_ctx);
360
361         if (ret != LDB_SUCCESS) {
362                 return ret;
363         }
364
365         if ((unpack_flags & LDB_UNPACK_DATA_FLAG_NO_DN) == 0) {
366                 if (!msg->dn) {
367                         msg->dn = ldb_dn_copy(msg, dn);
368                 }
369                 if (!msg->dn) {
370                         return LDB_ERR_OPERATIONS_ERROR;
371                 }
372         }
373
374         return LDB_SUCCESS;
375 }
376
377 /*
378   filter the specified list of attributes from a message
379   removing not requested attrs from the new message constructed.
380
381   The reason this makes a new message is that the old one may not be
382   individually allocated, which is what our callers expect.
383
384  */
385 int ldb_kv_filter_attrs(TALLOC_CTX *mem_ctx,
386                         const struct ldb_message *msg,
387                         const char *const *attrs,
388                         struct ldb_message **filtered_msg)
389 {
390         unsigned int i;
391         bool keep_all = false;
392         bool add_dn = false;
393         uint32_t num_elements;
394         uint32_t elements_size;
395         struct ldb_message *msg2;
396
397         msg2 = ldb_msg_new(mem_ctx);
398         if (msg2 == NULL) {
399                 goto failed;
400         }
401
402         msg2->dn = ldb_dn_copy(msg2, msg->dn);
403         if (msg2->dn == NULL) {
404                 goto failed;
405         }
406
407         if (attrs) {
408                 /* check for special attrs */
409                 for (i = 0; attrs[i]; i++) {
410                         int cmp = strcmp(attrs[i], "*");
411                         if (cmp == 0) {
412                                 keep_all = true;
413                                 break;
414                         }
415                         cmp = ldb_attr_cmp(attrs[i], "distinguishedName");
416                         if (cmp == 0) {
417                                 add_dn = true;
418                         }
419                 }
420         } else {
421                 keep_all = true;
422         }
423
424         if (keep_all) {
425                 add_dn = true;
426                 elements_size = msg->num_elements + 1;
427
428         /* Shortcuts for the simple cases */
429         } else if (add_dn && i == 1) {
430                 if (msg_add_distinguished_name(msg2) != 0) {
431                         goto failed;
432                 }
433                 *filtered_msg = msg2;
434                 return 0;
435         } else if (i == 0) {
436                 *filtered_msg = msg2;
437                 return 0;
438
439         /* Otherwise we are copying at most as many element as we have attributes */
440         } else {
441                 elements_size = i;
442         }
443
444         msg2->elements = talloc_array(msg2, struct ldb_message_element,
445                                       elements_size);
446         if (msg2->elements == NULL) goto failed;
447
448         num_elements = 0;
449
450         for (i = 0; i < msg->num_elements; i++) {
451                 struct ldb_message_element *el = &msg->elements[i];
452                 struct ldb_message_element *el2 = &msg2->elements[num_elements];
453                 unsigned int j;
454
455                 if (keep_all == false) {
456                         bool found = false;
457                         for (j = 0; attrs[j]; j++) {
458                                 int cmp = ldb_attr_cmp(el->name, attrs[j]);
459                                 if (cmp == 0) {
460                                         found = true;
461                                         break;
462                                 }
463                         }
464                         if (found == false) {
465                                 continue;
466                         }
467                 }
468                 *el2 = *el;
469                 el2->name = talloc_strdup(msg2->elements, el->name);
470                 if (el2->name == NULL) {
471                         goto failed;
472                 }
473                 el2->values = talloc_array(msg2->elements, struct ldb_val, el->num_values);
474                 if (el2->values == NULL) {
475                         goto failed;
476                 }
477                 for (j=0;j<el->num_values;j++) {
478                         el2->values[j] = ldb_val_dup(el2->values, &el->values[j]);
479                         if (el2->values[j].data == NULL && el->values[j].length != 0) {
480                                 goto failed;
481                         }
482                 }
483                 num_elements++;
484
485                 /* Pidginhole principle: we can't have more elements
486                  * than the number of attributes if they are unique in
487                  * the DB */
488                 if (num_elements > elements_size) {
489                         goto failed;
490                 }
491         }
492
493         msg2->num_elements = num_elements;
494
495         if (add_dn) {
496                 if (msg_add_distinguished_name(msg2) != 0) {
497                         goto failed;
498                 }
499         }
500
501         if (msg2->num_elements > 0) {
502                 msg2->elements = talloc_realloc(msg2, msg2->elements,
503                                                 struct ldb_message_element,
504                                                 msg2->num_elements);
505                 if (msg2->elements == NULL) {
506                         goto failed;
507                 }
508         } else {
509                 talloc_free(msg2->elements);
510                 msg2->elements = NULL;
511         }
512
513         *filtered_msg = msg2;
514
515         return 0;
516 failed:
517         TALLOC_FREE(msg2);
518         return -1;
519 }
520
521 /*
522   search function for a non-indexed search
523  */
524 static int search_func(struct ldb_kv_private *ldb_kv,
525                        struct ldb_val key,
526                        struct ldb_val val,
527                        void *state)
528 {
529         struct ldb_context *ldb;
530         struct ldb_kv_context *ac;
531         struct ldb_message *msg, *filtered_msg;
532         int ret;
533         bool matched;
534         unsigned int nb_elements_in_db;
535
536         ac = talloc_get_type(state, struct ldb_kv_context);
537         ldb = ldb_module_get_ctx(ac->module);
538
539         /*
540          * We want to skip @ records early in a search full scan
541          *
542          * @ records like @IDXLIST are only available via a base
543          * search on the specific name but the method by which they
544          * were excluded was expensive, after the unpack the DN is
545          * exploded and ldb_match_msg_error() would reject it for
546          * failing to match the scope.
547          *
548          * ldb_kv_key_is_normal_record() uses the fact that @ records
549          * have the DN=@ prefix on their TDB/LMDB key to quickly
550          * exclude them from consideration.
551          *
552          * (any other non-records are also excluded by the same key
553          * match)
554          */
555
556         if (ldb_kv_key_is_normal_record(key) == false) {
557                 return 0;
558         }
559
560         msg = ldb_msg_new(ac);
561         if (!msg) {
562                 ac->error = LDB_ERR_OPERATIONS_ERROR;
563                 return -1;
564         }
565
566         /* unpack the record */
567         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
568                                                    msg,
569                                                    NULL, 0,
570                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
571                                                    LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC,
572                                                    &nb_elements_in_db);
573         if (ret == -1) {
574                 talloc_free(msg);
575                 ac->error = LDB_ERR_OPERATIONS_ERROR;
576                 return -1;
577         }
578
579         if (!msg->dn) {
580                 msg->dn = ldb_dn_new(msg, ldb,
581                                      (char *)key.data + 3);
582                 if (msg->dn == NULL) {
583                         talloc_free(msg);
584                         ac->error = LDB_ERR_OPERATIONS_ERROR;
585                         return -1;
586                 }
587         }
588
589         /* see if it matches the given expression */
590         ret = ldb_match_msg_error(ldb, msg,
591                                   ac->tree, ac->base, ac->scope, &matched);
592         if (ret != LDB_SUCCESS) {
593                 talloc_free(msg);
594                 ac->error = LDB_ERR_OPERATIONS_ERROR;
595                 return -1;
596         }
597         if (!matched) {
598                 talloc_free(msg);
599                 return 0;
600         }
601
602         /* filter the attributes that the user wants */
603         ret = ldb_kv_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
604         talloc_free(msg);
605
606         if (ret == -1) {
607                 ac->error = LDB_ERR_OPERATIONS_ERROR;
608                 return -1;
609         }
610
611         ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
612         if (ret != LDB_SUCCESS) {
613                 ac->request_terminated = true;
614                 /* the callback failed, abort the operation */
615                 ac->error = LDB_ERR_OPERATIONS_ERROR;
616                 return -1;
617         }
618
619         return 0;
620 }
621
622
623 /*
624   search the database with a LDAP-like expression.
625   this is the "full search" non-indexed variant
626 */
627 static int ldb_kv_search_full(struct ldb_kv_context *ctx)
628 {
629         void *data = ldb_module_get_private(ctx->module);
630         struct ldb_kv_private *ldb_kv =
631             talloc_get_type(data, struct ldb_kv_private);
632         int ret;
633
634         ctx->error = LDB_SUCCESS;
635         ret = ldb_kv->kv_ops->iterate(ldb_kv, search_func, ctx);
636
637         if (ret < 0) {
638                 return LDB_ERR_OPERATIONS_ERROR;
639         }
640
641         return ctx->error;
642 }
643
644 static int ldb_kv_search_and_return_base(struct ldb_kv_private *ldb_kv,
645                                          struct ldb_kv_context *ctx)
646 {
647         struct ldb_message *msg, *filtered_msg;
648         struct ldb_context *ldb = ldb_module_get_ctx(ctx->module);
649         const char *dn_linearized;
650         const char *msg_dn_linearized;
651         int ret;
652         bool matched;
653
654         msg = ldb_msg_new(ctx);
655         if (!msg) {
656                 return LDB_ERR_OPERATIONS_ERROR;
657         }
658         ret = ldb_kv_search_dn1(ctx->module,
659                                 ctx->base,
660                                 msg,
661                                 LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
662                                 LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC |
663                                 LDB_UNPACK_DATA_FLAG_READ_LOCKED);
664
665         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
666                 if (ldb_kv->check_base == false) {
667                         /*
668                          * In this case, we are done, as no base
669                          * checking is allowed in this DB
670                          */
671                         talloc_free(msg);
672                         return LDB_SUCCESS;
673                 }
674                 ldb_asprintf_errstring(ldb,
675                                        "No such Base DN: %s",
676                                        ldb_dn_get_linearized(ctx->base));
677         }
678         if (ret != LDB_SUCCESS) {
679                 talloc_free(msg);
680                 return ret;
681         }
682
683
684         /*
685          * We use this, not ldb_match_msg_error() as we know
686          * we matched on the scope BASE, as we just fetched
687          * the base DN
688          */
689
690         ret = ldb_match_message(ldb, msg,
691                                 ctx->tree,
692                                 ctx->scope,
693                                 &matched);
694         if (ret != LDB_SUCCESS) {
695                 talloc_free(msg);
696                 return ret;
697         }
698         if (!matched) {
699                 talloc_free(msg);
700                 return LDB_SUCCESS;
701         }
702
703         dn_linearized = ldb_dn_get_linearized(ctx->base);
704         msg_dn_linearized = ldb_dn_get_linearized(msg->dn);
705
706         if (strcmp(dn_linearized, msg_dn_linearized) == 0) {
707                 /*
708                  * If the DN is exactly the same string, then
709                  * re-use the full incoming DN for the
710                  * returned result, as it has already been
711                  * casefolded
712                  */
713                 msg->dn = ctx->base;
714         }
715
716         /*
717          * filter the attributes that the user wants.
718          *
719          * This copies msg->dn including the casefolding, so the above
720          * assignment is safe
721          */
722         ret = ldb_kv_filter_attrs(ctx, msg, ctx->attrs, &filtered_msg);
723         if (ret == -1) {
724                 talloc_free(msg);
725                 filtered_msg = NULL;
726                 return LDB_ERR_OPERATIONS_ERROR;
727         }
728
729         /*
730          * Remove any extended components possibly copied in from
731          * msg->dn, we just want the casefold components
732          */
733         ldb_dn_remove_extended_components(filtered_msg->dn);
734         talloc_free(msg);
735
736         ret = ldb_module_send_entry(ctx->req, filtered_msg, NULL);
737         if (ret != LDB_SUCCESS) {
738                 /* Regardless of success or failure, the msg
739                  * is the callbacks responsiblity, and should
740                  * not be talloc_free()'ed */
741                 ctx->request_terminated = true;
742                 return ret;
743         }
744
745         return LDB_SUCCESS;
746 }
747
748 /*
749   search the database with a LDAP-like expression.
750   choses a search method
751 */
752 int ldb_kv_search(struct ldb_kv_context *ctx)
753 {
754         struct ldb_context *ldb;
755         struct ldb_module *module = ctx->module;
756         struct ldb_request *req = ctx->req;
757         void *data = ldb_module_get_private(module);
758         struct ldb_kv_private *ldb_kv =
759             talloc_get_type(data, struct ldb_kv_private);
760         int ret;
761
762         ldb = ldb_module_get_ctx(module);
763
764         ldb_request_set_state(req, LDB_ASYNC_PENDING);
765
766         if (ldb_kv->kv_ops->lock_read(module) != 0) {
767                 return LDB_ERR_OPERATIONS_ERROR;
768         }
769
770         if (ldb_kv_cache_load(module) != 0) {
771                 ldb_kv->kv_ops->unlock_read(module);
772                 return LDB_ERR_OPERATIONS_ERROR;
773         }
774
775         if (req->op.search.tree == NULL) {
776                 ldb_kv->kv_ops->unlock_read(module);
777                 return LDB_ERR_OPERATIONS_ERROR;
778         }
779
780         ctx->tree = req->op.search.tree;
781         ctx->scope = req->op.search.scope;
782         ctx->base = req->op.search.base;
783         ctx->attrs = req->op.search.attrs;
784
785         if ((req->op.search.base == NULL) || (ldb_dn_is_null(req->op.search.base) == true)) {
786
787                 /* Check what we should do with a NULL dn */
788                 switch (req->op.search.scope) {
789                 case LDB_SCOPE_BASE:
790                         ldb_asprintf_errstring(ldb,
791                                                "NULL Base DN invalid for a base search");
792                         ret = LDB_ERR_INVALID_DN_SYNTAX;
793                         break;
794                 case LDB_SCOPE_ONELEVEL:
795                         ldb_asprintf_errstring(ldb,
796                                                "NULL Base DN invalid for a one-level search");
797                         ret = LDB_ERR_INVALID_DN_SYNTAX;
798                         break;
799                 case LDB_SCOPE_SUBTREE:
800                 default:
801                         /* We accept subtree searches from a NULL base DN, ie over the whole DB */
802                         ret = LDB_SUCCESS;
803                 }
804         } else if (req->op.search.scope == LDB_SCOPE_BASE) {
805
806                 /*
807                  * If we are LDB_SCOPE_BASE, do just one search and
808                  * return early.  This is critical to ensure we do not
809                  * go into the index code for special DNs, as that
810                  * will try to look up an index record for a special
811                  * record (which doesn't exist).
812                  */
813                 ret = ldb_kv_search_and_return_base(ldb_kv, ctx);
814
815                 ldb_kv->kv_ops->unlock_read(module);
816
817                 return ret;
818
819         } else if (ldb_kv->check_base) {
820                 /*
821                  * This database has been marked as
822                  * 'checkBaseOnSearch', so do a spot check of the base
823                  * dn.  Also optimise the subsequent filter by filling
824                  * in the ctx->base to be exactly case correct
825                  */
826                 ret = ldb_kv_search_base(
827                     module, ctx, req->op.search.base, &ctx->base);
828
829                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
830                         ldb_asprintf_errstring(ldb,
831                                                "No such Base DN: %s",
832                                                ldb_dn_get_linearized(req->op.search.base));
833                 }
834
835         } else if (ldb_dn_validate(req->op.search.base) == false) {
836
837                 /* We don't want invalid base DNs here */
838                 ldb_asprintf_errstring(ldb,
839                                        "Invalid Base DN: %s",
840                                        ldb_dn_get_linearized(req->op.search.base));
841                 ret = LDB_ERR_INVALID_DN_SYNTAX;
842
843         } else {
844                 /* If we are not checking the base DN life is easy */
845                 ret = LDB_SUCCESS;
846         }
847
848         if (ret == LDB_SUCCESS) {
849                 uint32_t match_count = 0;
850
851                 ret = ldb_kv_search_indexed(ctx, &match_count);
852                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
853                         /* Not in the index, therefore OK! */
854                         ret = LDB_SUCCESS;
855
856                 }
857                 /* Check if we got just a normal error.
858                  * In that case proceed to a full search unless we got a
859                  * callback error */
860                 if (!ctx->request_terminated && ret != LDB_SUCCESS) {
861                         /* Not indexed, so we need to do a full scan */
862                         if (ldb_kv->warn_unindexed ||
863                             ldb_kv->disable_full_db_scan) {
864                                 /* useful for debugging when slow performance
865                                  * is caused by unindexed searches */
866                                 char *expression = ldb_filter_from_tree(ctx, ctx->tree);
867                                 ldb_debug(ldb, LDB_DEBUG_ERROR, "ldb FULL SEARCH: %s SCOPE: %s DN: %s",
868                                                         expression,
869                                                         req->op.search.scope==LDB_SCOPE_BASE?"base":
870                                                         req->op.search.scope==LDB_SCOPE_ONELEVEL?"one":
871                                                         req->op.search.scope==LDB_SCOPE_SUBTREE?"sub":"UNKNOWN",
872                                                         ldb_dn_get_linearized(req->op.search.base));
873
874                                 talloc_free(expression);
875                         }
876
877                         if (match_count != 0) {
878                                 /* the indexing code gave an error
879                                  * after having returned at least one
880                                  * entry. This means the indexes are
881                                  * corrupt or a database record is
882                                  * corrupt. We cannot continue with a
883                                  * full search or we may return
884                                  * duplicate entries
885                                  */
886                                 ldb_kv->kv_ops->unlock_read(module);
887                                 return LDB_ERR_OPERATIONS_ERROR;
888                         }
889
890                         if (ldb_kv->disable_full_db_scan) {
891                                 ldb_set_errstring(ldb,
892                                                   "ldb FULL SEARCH disabled");
893                                 ldb_kv->kv_ops->unlock_read(module);
894                                 return LDB_ERR_INAPPROPRIATE_MATCHING;
895                         }
896
897                         ret = ldb_kv_search_full(ctx);
898                         if (ret != LDB_SUCCESS) {
899                                 ldb_set_errstring(ldb, "Indexed and full searches both failed!\n");
900                         }
901                 }
902         }
903
904         ldb_kv->kv_ops->unlock_read(module);
905
906         return ret;
907 }