s4-ldb: much more memory efficient msg filtering
[samba.git] / source4 / lib / ldb / ldb_tdb / ldb_search.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb search functions
28  *
29  *  Description: functions to search ldb+tdb databases
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 /*
37   add one element to a message
38 */
39 static int msg_add_element(struct ldb_message *ret, 
40                            const struct ldb_message_element *el,
41                            int check_duplicates)
42 {
43         unsigned int i;
44         struct ldb_message_element *e2, *elnew;
45
46         if (check_duplicates && ldb_msg_find_element(ret, el->name)) {
47                 /* its already there */
48                 return 0;
49         }
50
51         e2 = talloc_realloc(ret, ret->elements, struct ldb_message_element, ret->num_elements+1);
52         if (!e2) {
53                 return -1;
54         }
55         ret->elements = e2;
56         
57         elnew = &e2[ret->num_elements];
58
59         elnew->name = talloc_strdup(ret->elements, el->name);
60         if (!elnew->name) {
61                 return -1;
62         }
63
64         if (el->num_values) {
65                 elnew->values = talloc_array(ret->elements, struct ldb_val, el->num_values);
66                 if (!elnew->values) {
67                         return -1;
68                 }
69         } else {
70                 elnew->values = NULL;
71         }
72
73         for (i=0;i<el->num_values;i++) {
74                 elnew->values[i] = ldb_val_dup(elnew->values, &el->values[i]);
75                 if (elnew->values[i].length != el->values[i].length) {
76                         return -1;
77                 }
78         }
79
80         elnew->num_values = el->num_values;
81         elnew->flags = el->flags;
82
83         ret->num_elements++;
84
85         return 0;
86 }
87
88 /*
89   add the special distinguishedName element
90 */
91 static int msg_add_distinguished_name(struct ldb_message *msg)
92 {
93         struct ldb_message_element el;
94         struct ldb_val val;
95         int ret;
96
97         el.flags = 0;
98         el.name = "distinguishedName";
99         el.num_values = 1;
100         el.values = &val;
101         el.flags = 0;
102         val.data = (uint8_t *)ldb_dn_alloc_linearized(msg, msg->dn);
103         val.length = strlen((char *)val.data);
104         
105         ret = msg_add_element(msg, &el, 1);
106         return ret;
107 }
108
109 /*
110   add all elements from one message into another
111  */
112 static int msg_add_all_elements(struct ldb_module *module, struct ldb_message *ret,
113                                 const struct ldb_message *msg)
114 {
115         struct ldb_context *ldb;
116         unsigned int i;
117         int check_duplicates = (ret->num_elements != 0);
118
119         ldb = ldb_module_get_ctx(module);
120
121         if (msg_add_distinguished_name(ret) != 0) {
122                 return -1;
123         }
124
125         for (i=0;i<msg->num_elements;i++) {
126                 const struct ldb_schema_attribute *a;
127                 a = ldb_schema_attribute_by_name(ldb, msg->elements[i].name);
128                 if (a->flags & LDB_ATTR_FLAG_HIDDEN) {
129                         continue;
130                 }
131                 if (msg_add_element(ret, &msg->elements[i],
132                                     check_duplicates) != 0) {
133                         return -1;
134                 }
135         }
136
137         return 0;
138 }
139
140
141 /*
142   pull the specified list of attributes from a message
143  */
144 static struct ldb_message *ltdb_pull_attrs(struct ldb_module *module, 
145                                            TALLOC_CTX *mem_ctx, 
146                                            const struct ldb_message *msg, 
147                                            const char * const *attrs)
148 {
149         struct ldb_message *ret;
150         unsigned int i;
151
152         ret = talloc(mem_ctx, struct ldb_message);
153         if (!ret) {
154                 return NULL;
155         }
156
157         ret->dn = ldb_dn_copy(ret, msg->dn);
158         if (!ret->dn) {
159                 talloc_free(ret);
160                 return NULL;
161         }
162
163         ret->num_elements = 0;
164         ret->elements = NULL;
165
166         if (!attrs) {
167                 if (msg_add_all_elements(module, ret, msg) != 0) {
168                         talloc_free(ret);
169                         return NULL;
170                 }
171                 return ret;
172         }
173
174         for (i=0;attrs[i];i++) {
175                 struct ldb_message_element *el;
176
177                 if (strcmp(attrs[i], "*") == 0) {
178                         if (msg_add_all_elements(module, ret, msg) != 0) {
179                                 talloc_free(ret);
180                                 return NULL;
181                         }
182                         continue;
183                 }
184
185                 if (ldb_attr_cmp(attrs[i], "distinguishedName") == 0) {
186                         if (msg_add_distinguished_name(ret) != 0) {
187                                 return NULL;
188                         }
189                         continue;
190                 }
191
192                 el = ldb_msg_find_element(msg, attrs[i]);
193                 if (!el) {
194                         continue;
195                 }
196                 if (msg_add_element(ret, el, 1) != 0) {
197                         talloc_free(ret);
198                         return NULL;                            
199                 }
200         }
201
202         return ret;
203 }
204
205 /*
206   search the database for a single simple dn.
207   return LDB_ERR_NO_SUCH_OBJECT on record-not-found
208   and LDB_SUCCESS on success
209 */
210 static int ltdb_search_base(struct ldb_module *module, struct ldb_dn *dn)
211 {
212         void *data = ldb_module_get_private(module);
213         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
214         TDB_DATA tdb_key, tdb_data;
215
216         if (ldb_dn_is_null(dn)) {
217                 return LDB_ERR_NO_SUCH_OBJECT;
218         }
219
220         /* form the key */
221         tdb_key = ltdb_key(module, dn);
222         if (!tdb_key.dptr) {
223                 return LDB_ERR_OPERATIONS_ERROR;
224         }
225
226         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
227         talloc_free(tdb_key.dptr);
228         if (!tdb_data.dptr) {
229                 return LDB_ERR_NO_SUCH_OBJECT;
230         }
231         
232         free(tdb_data.dptr);
233         return LDB_SUCCESS;
234 }
235
236 /*
237   search the database for a single simple dn, returning all attributes
238   in a single message
239
240   return LDB_ERR_NO_SUCH_OBJECT on record-not-found
241   and LDB_SUCCESS on success
242 */
243 int ltdb_search_dn1(struct ldb_module *module, struct ldb_dn *dn, struct ldb_message *msg)
244 {
245         void *data = ldb_module_get_private(module);
246         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
247         int ret;
248         TDB_DATA tdb_key, tdb_data;
249
250         memset(msg, 0, sizeof(*msg));
251
252         /* form the key */
253         tdb_key = ltdb_key(module, dn);
254         if (!tdb_key.dptr) {
255                 return LDB_ERR_OPERATIONS_ERROR;
256         }
257
258         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
259         talloc_free(tdb_key.dptr);
260         if (!tdb_data.dptr) {
261                 return LDB_ERR_NO_SUCH_OBJECT;
262         }
263         
264         msg->num_elements = 0;
265         msg->elements = NULL;
266
267         ret = ltdb_unpack_data(module, &tdb_data, msg);
268         free(tdb_data.dptr);
269         if (ret == -1) {
270                 struct ldb_context *ldb = ldb_module_get_ctx(module);
271                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
272                           ldb_dn_get_linearized(msg->dn));
273                 return LDB_ERR_OPERATIONS_ERROR;                
274         }
275
276         if (!msg->dn) {
277                 msg->dn = ldb_dn_copy(msg, dn);
278         }
279         if (!msg->dn) {
280                 return LDB_ERR_OPERATIONS_ERROR;
281         }
282
283         return LDB_SUCCESS;
284 }
285
286 /*
287   add a set of attributes from a record to a set of results
288   return 0 on success, -1 on failure
289 */
290 int ltdb_add_attr_results(struct ldb_module *module, 
291                           TALLOC_CTX *mem_ctx, 
292                           struct ldb_message *msg,
293                           const char * const attrs[], 
294                           unsigned int *count, 
295                           struct ldb_message ***res)
296 {
297         struct ldb_message *msg2;
298         struct ldb_message **res2;
299
300         /* pull the attributes that the user wants */
301         msg2 = ltdb_pull_attrs(module, mem_ctx, msg, attrs);
302         if (!msg2) {
303                 return -1;
304         }
305
306         /* add to the results list */
307         res2 = talloc_realloc(mem_ctx, *res, struct ldb_message *, (*count)+2);
308         if (!res2) {
309                 talloc_free(msg2);
310                 return -1;
311         }
312
313         (*res) = res2;
314
315         (*res)[*count] = talloc_move(*res, &msg2);
316         (*res)[(*count)+1] = NULL;
317         (*count)++;
318
319         return 0;
320 }
321
322
323
324 /*
325   filter the specified list of attributes from a message
326   removing not requested attrs.
327  */
328 int ltdb_filter_attrs(struct ldb_message *msg, const char * const *attrs)
329 {
330         unsigned int i;
331         int keep_all = 0;
332         struct ldb_message_element *el2;
333         uint32_t num_elements;
334
335         if (attrs) {
336                 /* check for special attrs */
337                 for (i = 0; attrs[i]; i++) {
338                         if (strcmp(attrs[i], "*") == 0) {
339                                 keep_all = 1;
340                                 break;
341                         }
342
343                         if (ldb_attr_cmp(attrs[i], "distinguishedName") == 0) {
344                                 if (msg_add_distinguished_name(msg) != 0) {
345                                         return -1;
346                                 }
347                         }
348                 }
349         } else {
350                 keep_all = 1;
351         }
352         
353         if (keep_all) {
354                 if (msg_add_distinguished_name(msg) != 0) {
355                         return -1;
356                 }
357                 return 0;
358         }
359
360         el2 = talloc_array(msg, struct ldb_message_element, msg->num_elements);
361         if (el2 == NULL) {
362                 return -1;
363         }
364         num_elements = 0;
365
366         for (i = 0; i < msg->num_elements; i++) {
367                 unsigned int j;
368                 int found = 0;
369                 
370                 for (j = 0; attrs[j]; j++) {
371                         if (ldb_attr_cmp(msg->elements[i].name, attrs[j]) == 0) {
372                                 found = 1;
373                                 break;
374                         }
375                 }
376
377                 if (found) {
378                         el2[num_elements] = msg->elements[i];
379                         talloc_steal(el2, el2[num_elements].name);
380                         talloc_steal(el2, el2[num_elements].values);
381                         num_elements++;
382                 }
383         }
384
385         talloc_free(msg->elements);
386         msg->elements = talloc_realloc(msg, el2, struct ldb_message_element, msg->num_elements);
387         if (msg->elements == NULL) {
388                 return -1;
389         }
390         msg->num_elements = num_elements;
391
392         return 0;
393 }
394
395 /*
396   search function for a non-indexed search
397  */
398 static int search_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
399 {
400         struct ldb_context *ldb;
401         struct ltdb_context *ac;
402         struct ldb_message *msg;
403         int ret;
404         bool matched;
405
406         ac = talloc_get_type(state, struct ltdb_context);
407         ldb = ldb_module_get_ctx(ac->module);
408
409         if (key.dsize < 4 || 
410             strncmp((char *)key.dptr, "DN=", 3) != 0) {
411                 return 0;
412         }
413
414         msg = ldb_msg_new(ac);
415         if (!msg) {
416                 return -1;
417         }
418
419         /* unpack the record */
420         ret = ltdb_unpack_data(ac->module, &data, msg);
421         if (ret == -1) {
422                 talloc_free(msg);
423                 return -1;
424         }
425
426         if (!msg->dn) {
427                 msg->dn = ldb_dn_new(msg, ldb,
428                                      (char *)key.dptr + 3);
429                 if (msg->dn == NULL) {
430                         talloc_free(msg);
431                         return -1;
432                 }
433         }
434
435         /* see if it matches the given expression */
436         ret = ldb_match_msg_error(ldb, msg,
437                                   ac->tree, ac->base, ac->scope, &matched);
438         if (ret != LDB_SUCCESS) {
439                 talloc_free(msg);
440                 return -1;
441         }
442         if (!matched) {
443                 talloc_free(msg);
444                 return 0;
445         }
446
447         /* filter the attributes that the user wants */
448         ret = ltdb_filter_attrs(msg, ac->attrs);
449
450         if (ret == -1) {
451                 talloc_free(msg);
452                 return -1;
453         }
454
455         ret = ldb_module_send_entry(ac->req, msg, NULL);
456         if (ret != LDB_SUCCESS) {
457                 ac->request_terminated = true;
458                 /* the callback failed, abort the operation */
459                 return -1;
460         }
461
462         return 0;
463 }
464
465
466 /*
467   search the database with a LDAP-like expression.
468   this is the "full search" non-indexed variant
469 */
470 static int ltdb_search_full(struct ltdb_context *ctx)
471 {
472         void *data = ldb_module_get_private(ctx->module);
473         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
474         int ret;
475
476         if (ltdb->in_transaction != 0) {
477                 ret = tdb_traverse(ltdb->tdb, search_func, ctx);
478         } else {
479                 ret = tdb_traverse_read(ltdb->tdb, search_func, ctx);
480         }
481
482         if (ret == -1) {
483                 return LDB_ERR_OPERATIONS_ERROR;
484         }
485
486         return LDB_SUCCESS;
487 }
488
489 /*
490   search the database with a LDAP-like expression.
491   choses a search method
492 */
493 int ltdb_search(struct ltdb_context *ctx)
494 {
495         struct ldb_context *ldb;
496         struct ldb_module *module = ctx->module;
497         struct ldb_request *req = ctx->req;
498         void *data = ldb_module_get_private(module);
499         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
500         int ret;
501
502         ldb = ldb_module_get_ctx(module);
503
504         ldb_request_set_state(req, LDB_ASYNC_PENDING);
505
506         if (ltdb_lock_read(module) != 0) {
507                 return LDB_ERR_OPERATIONS_ERROR;
508         }
509
510         if (ltdb_cache_load(module) != 0) {
511                 ltdb_unlock_read(module);
512                 return LDB_ERR_OPERATIONS_ERROR;
513         }
514
515         if (req->op.search.tree == NULL) {
516                 ltdb_unlock_read(module);
517                 return LDB_ERR_OPERATIONS_ERROR;
518         }
519
520         if ((req->op.search.base == NULL) || (ldb_dn_is_null(req->op.search.base) == true)) {
521
522                 /* Check what we should do with a NULL dn */
523                 switch (req->op.search.scope) {
524                 case LDB_SCOPE_BASE:
525                         ldb_asprintf_errstring(ldb, 
526                                                "NULL Base DN invalid for a base search");
527                         ret = LDB_ERR_INVALID_DN_SYNTAX;
528                         break;
529                 case LDB_SCOPE_ONELEVEL:
530                         ldb_asprintf_errstring(ldb, 
531                                                "NULL Base DN invalid for a one-level search");
532                         ret = LDB_ERR_INVALID_DN_SYNTAX;        
533                         break;
534                 case LDB_SCOPE_SUBTREE:
535                 default:
536                         /* We accept subtree searches from a NULL base DN, ie over the whole DB */
537                         ret = LDB_SUCCESS;
538                 }
539         } else if (ldb_dn_is_valid(req->op.search.base) == false) {
540
541                 /* We don't want invalid base DNs here */
542                 ldb_asprintf_errstring(ldb, 
543                                        "Invalid Base DN: %s", 
544                                        ldb_dn_get_linearized(req->op.search.base));
545                 ret = LDB_ERR_INVALID_DN_SYNTAX;
546
547         } else if (ltdb->check_base) {
548                 /* This database has been marked as 'checkBaseOnSearch', so do a spot check of the base dn */
549                 ret = ltdb_search_base(module, req->op.search.base);
550                 
551                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
552                         ldb_asprintf_errstring(ldb, 
553                                                "No such Base DN: %s", 
554                                                ldb_dn_get_linearized(req->op.search.base));
555                 }
556                         
557         } else {
558                 /* If we are not checking the base DN life is easy */
559                 ret = LDB_SUCCESS;
560         }
561
562         ctx->tree = req->op.search.tree;
563         ctx->scope = req->op.search.scope;
564         ctx->base = req->op.search.base;
565         ctx->attrs = req->op.search.attrs;
566
567         if (ret == LDB_SUCCESS) {
568                 uint32_t match_count = 0;
569
570                 ret = ltdb_search_indexed(ctx, &match_count);
571                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
572                         /* Not in the index, therefore OK! */
573                         ret = LDB_SUCCESS;
574                         
575                 }
576                 /* Check if we got just a normal error.
577                  * In that case proceed to a full search unless we got a
578                  * callback error */
579                 if ( ! ctx->request_terminated && ret != LDB_SUCCESS) {
580                         /* Not indexed, so we need to do a full scan */
581 #if 0
582                         /* useful for debugging when slow performance
583                          * is caused by unindexed searches */
584                         char *expression = ldb_filter_from_tree(ctx, ctx->tree);
585                         printf("FULL SEARCH: %s\n", expression);
586                         talloc_free(expression);
587 #endif
588                         if (match_count != 0) {
589                                 /* the indexing code gave an error
590                                  * after having returned at least one
591                                  * entry. This means the indexes are
592                                  * corrupt or a database record is
593                                  * corrupt. We cannot continue with a
594                                  * full search or we may return
595                                  * duplicate entries
596                                  */
597                                 ltdb_unlock_read(module);
598                                 return LDB_ERR_OPERATIONS_ERROR;
599                         }
600                         ret = ltdb_search_full(ctx);
601                         if (ret != LDB_SUCCESS) {
602                                 ldb_set_errstring(ldb, "Indexed and full searches both failed!\n");
603                         }
604                 }
605         }
606
607         ltdb_unlock_read(module);
608
609         return ret;
610 }
611