r344: fixed deletion of index records
[abartlet/samba.git/.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/ldb_tdb/ldb_tdb.h"
37
38 struct dn_list {
39         unsigned int count;
40         char **dn;
41 };
42
43 /*
44   free a struct dn_list
45 */
46 static void dn_list_free(struct dn_list *list)
47 {
48         int i;
49         for (i=0;i<list->count;i++) {
50                 free(list->dn[i]);
51         }
52         if (list->dn) free(list->dn);
53 }
54
55 /*
56   return the dn key to be used for an index
57   caller frees
58 */
59 static char *ldb_dn_key(const char *attr, const struct ldb_val *value)
60 {
61         char *ret = NULL;
62
63         if (ldb_should_b64_encode(value)) {
64                 char *vstr = ldb_base64_encode(value->data, value->length);
65                 if (!vstr) return NULL;
66                 asprintf(&ret, "@INDEX:%s::%s", attr, vstr);
67                 free(vstr);
68                 return ret;
69         }
70
71         asprintf(&ret, "@INDEX:%s:%s", attr, (char *)value->data);
72         return ret;
73 }
74
75 /*
76   see if a attribute value is in the list of indexed attributes
77 */
78 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
79                             int *v_idx)
80 {
81         int i, j;
82         for (i=0;i<msg->num_elements;i++) {
83                 if (strcmp(msg->elements[i].name, "@IDXATTR") == 0) {
84                         const struct ldb_message_element *el = 
85                                 &msg->elements[i];
86                         for (j=0;j<el->num_values;j++) {
87                                 if (strcmp((char *)el->values[j].data, attr) == 0) {
88                                         if (v_idx) {
89                                                 *v_idx = j;
90                                         }
91                                         return i;
92                                 }
93                         }
94                 }
95         }
96         return -1;
97 }
98
99 /*
100   return a list of dn's that might match a simple indexed search or
101  */
102 static int ltdb_index_dn_simple(struct ldb_context *ldb, 
103                                 struct ldb_parse_tree *tree,
104                                 const struct ldb_message *index_list,
105                                 struct dn_list *list)
106 {
107         char *dn = NULL;
108         int ret, i, j;
109         struct ldb_message msg;
110
111         list->count = 0;
112         list->dn = NULL;
113
114         /*
115           if the value is a wildcard then we can't do a match via indexing
116         */
117         if (ltdb_has_wildcard(&tree->u.simple.value)) {
118                 return -1;
119         }
120
121         /* if the attribute isn't in the list of indexed attributes then
122            this node needs a full search */
123         if (ldb_msg_find_idx(index_list, tree->u.simple.attr, NULL) == -1) {
124                 return -1;
125         }
126
127         /* the attribute is indexed. Pull the list of DNs that match the 
128            search criterion */
129         dn = ldb_dn_key(tree->u.simple.attr, &tree->u.simple.value);
130         if (!dn) return -1;
131
132         ret = ltdb_search_dn1(ldb, dn, &msg);
133         free(dn);
134         if (ret == 0 || ret == -1) {
135                 return ret;
136         }
137
138         for (i=0;i<msg.num_elements;i++) {
139                 struct ldb_message_element *el;
140
141                 if (strcmp(msg.elements[i].name, "@IDX") != 0) {
142                         continue;
143                 }
144
145                 el = &msg.elements[i];
146
147                 list->dn = malloc_array_p(char *, el->num_values);
148                 if (!list->dn) {
149                         break;          
150                 }
151
152                 for (j=0;j<el->num_values;j++) {
153                         list->dn[list->count] = 
154                                 strdup((char *)el->values[j].data);
155                         if (!list->dn[list->count]) {
156                                 dn_list_free(list);
157                                 ltdb_search_dn1_free(ldb, &msg);
158                                 return -1;
159                         }
160                         list->count++;
161                 }
162         }
163
164         ltdb_search_dn1_free(ldb, &msg);
165
166         qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) strcmp);
167
168         return 1;
169 }
170
171 /*
172   list intersection
173   list = list & list2
174   relies on the lists being sorted
175 */
176 static int list_intersect(struct dn_list *list, const struct dn_list *list2)
177 {
178         struct dn_list list3;
179         int i;
180
181         if (list->count == 0 || list2->count == 0) {
182                 /* 0 & X == 0 */
183                 dn_list_free(list);
184                 return 0;
185         }
186
187         list3.dn = malloc_array_p(char *, list->count);
188         if (!list3.dn) {
189                 dn_list_free(list);
190                 return -1;
191         }
192         list3.count = 0;
193
194         for (i=0;i<list->count;i++) {
195                 if (list_find(list->dn[i], list2->dn, list2->count, 
196                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
197                         list3.dn[list3.count] = list->dn[i];
198                         list3.count++;
199                 } else {
200                         free(list->dn[i]);
201                 }               
202         }
203
204         free(list->dn);
205         list->dn = list3.dn;
206         list->count = list3.count;
207
208         return 0;
209 }
210
211
212 /*
213   list union
214   list = list | list2
215   relies on the lists being sorted
216 */
217 static int list_union(struct dn_list *list, const struct dn_list *list2)
218 {
219         int i;
220         char **d;
221         unsigned int count = list->count;
222
223         if (list->count == 0 && list2->count == 0) {
224                 /* 0 | 0 == 0 */
225                 dn_list_free(list);
226                 return 0;
227         }
228
229         d = realloc_p(list->dn, char *, list->count + list2->count);
230         if (!d) {
231                 dn_list_free(list);
232                 return -1;
233         }
234         list->dn = d;
235
236         for (i=0;i<list2->count;i++) {
237                 if (list_find(list2->dn[i], list->dn, count, 
238                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
239                         list->dn[list->count] = strdup(list2->dn[i]);
240                         if (!list->dn[list->count]) {
241                                 dn_list_free(list);
242                                 return -1;
243                         }
244                         list->count++;
245                 }               
246         }
247
248         if (list->count != count) {
249                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)strcmp);
250         }
251
252         return 0;
253 }
254
255 static int ltdb_index_dn(struct ldb_context *ldb, 
256                          struct ldb_parse_tree *tree,
257                          const struct ldb_message *index_list,
258                          struct dn_list *list);
259
260
261 /*
262   OR two index results
263  */
264 static int ltdb_index_dn_or(struct ldb_context *ldb, 
265                             struct ldb_parse_tree *tree,
266                             const struct ldb_message *index_list,
267                             struct dn_list *list)
268 {
269         int ret, i;
270         
271         ret = -1;
272         list->dn = NULL;
273         list->count = 0;
274
275         for (i=0;i<tree->u.list.num_elements;i++) {
276                 struct dn_list list2;
277                 int v;
278                 v = ltdb_index_dn(ldb, tree->u.list.elements[i], index_list, &list2);
279
280                 if (v == 0) {
281                         /* 0 || X == X */
282                         if (ret == -1) {
283                                 ret = 0;
284                         }
285                         continue;
286                 }
287
288                 if (v == -1) {
289                         /* 1 || X == 1 */
290                         dn_list_free(list);
291                         return -1;
292                 }
293
294                 if (ret == -1) {
295                         ret = 1;
296                         *list = list2;
297                 } else {
298                         if (list_union(list, &list2) == -1) {
299                                 dn_list_free(&list2);
300                                 return -1;
301                         }
302                         dn_list_free(&list2);
303                 }
304         }
305
306         if (list->count == 0) {
307                 dn_list_free(list);
308                 return 0;
309         }
310
311         return ret;
312 }
313
314
315 /*
316   NOT an index results
317  */
318 static int ltdb_index_dn_not(struct ldb_context *ldb, 
319                              struct ldb_parse_tree *tree,
320                              const struct ldb_message *index_list,
321                              struct dn_list *list)
322 {
323         /* the only way to do an indexed not would be if we could
324            negate the not via another not or if we knew the total
325            number of database elements so we could know that the
326            existing expression covered the whole database. 
327            
328            instead, we just give up, and rely on a full index scan
329            (unless an outer & manages to reduce the list)
330         */
331         return -1;
332 }
333
334 /*
335   AND two index results
336  */
337 static int ltdb_index_dn_and(struct ldb_context *ldb, 
338                              struct ldb_parse_tree *tree,
339                              const struct ldb_message *index_list,
340                              struct dn_list *list)
341 {
342         int ret, i;
343         
344         ret = -1;
345         list->dn = NULL;
346         list->count = 0;
347
348         for (i=0;i<tree->u.list.num_elements;i++) {
349                 struct dn_list list2;
350                 int v;
351                 v = ltdb_index_dn(ldb, tree->u.list.elements[i], index_list, &list2);
352
353                 if (v == 0) {
354                         /* 0 && X == 0 */
355                         dn_list_free(list);
356                         return 0;
357                 }
358
359                 if (v == -1) {
360                         continue;
361                 }
362
363                 if (ret == -1) {
364                         ret = 1;
365                         *list = list2;
366                 } else {
367                         if (list_intersect(list, &list2) == -1) {
368                                 dn_list_free(&list2);
369                                 return -1;
370                         }
371                         dn_list_free(&list2);
372                 }
373
374                 if (list->count == 0) {
375                         if (list->dn) free(list->dn);
376                         return 0;
377                 }
378         }
379
380         return ret;
381 }
382
383 /*
384   return a list of dn's that might match a indexed search or
385   -1 if an error. return 0 for no matches, or 1 for matches
386  */
387 static int ltdb_index_dn(struct ldb_context *ldb, 
388                          struct ldb_parse_tree *tree,
389                          const struct ldb_message *index_list,
390                          struct dn_list *list)
391 {
392         int ret;
393
394         switch (tree->operation) {
395         case LDB_OP_SIMPLE:
396                 ret = ltdb_index_dn_simple(ldb, tree, index_list, list);
397                 break;
398
399         case LDB_OP_AND:
400                 ret = ltdb_index_dn_and(ldb, tree, index_list, list);
401                 break;
402
403         case LDB_OP_OR:
404                 ret = ltdb_index_dn_or(ldb, tree, index_list, list);
405                 break;
406
407         case LDB_OP_NOT:
408                 ret = ltdb_index_dn_not(ldb, tree, index_list, list);
409                 break;
410         }
411
412         return ret;
413 }
414
415 /*
416   filter a candidate dn_list from an indexed search into a set of results
417   extracting just the given attributes
418 */
419 static int ldb_index_filter(struct ldb_context *ldb, struct ldb_parse_tree *tree,
420                             const char *base,
421                             enum ldb_scope scope,
422                             const struct dn_list *dn_list, 
423                             const char *attrs[], struct ldb_message ***res)
424 {
425         int i;
426         unsigned int count = 0;
427
428         for (i=0;i<dn_list->count;i++) {
429                 struct ldb_message msg;
430                 int ret;
431                 ret = ltdb_search_dn1(ldb, dn_list->dn[i], &msg);
432                 if (ret == 0) {
433                         /* the record has disappeared? yes, this can happen */
434                         continue;
435                 }
436
437                 if (ret == -1) {
438                         /* an internal error */
439                         return -1;
440                 }
441
442                 if (ldb_message_match(ldb, &msg, tree, base, scope) == 1) {
443                         ret = ltdb_add_attr_results(ldb, &msg, attrs, &count, res);
444                 }
445                 ltdb_search_dn1_free(ldb, &msg);
446                 if (ret != 0) {
447                         return -1;
448                 }
449         }
450
451         return count;
452 }
453
454 /*
455   search the database with a LDAP-like expression using indexes
456   returns -1 if an indexed search is not possible, in which
457   case the caller should call ltdb_search_full() 
458 */
459 int ltdb_search_indexed(struct ldb_context *ldb, 
460                         const char *base,
461                         enum ldb_scope scope,
462                         struct ldb_parse_tree *tree,
463                         const char *attrs[], struct ldb_message ***res)
464 {
465         struct ldb_message index_list;
466         struct dn_list dn_list;
467         int ret;
468
469         /* find the list of indexed fields */   
470         ret = ltdb_search_dn1(ldb, "@INDEXLIST", &index_list);
471         if (ret != 1) {
472                 /* no index list? must do full search */
473                 return -1;
474         }
475
476         ret = ltdb_index_dn(ldb, tree, &index_list, &dn_list);
477         ltdb_search_dn1_free(ldb, &index_list);
478
479         if (ret == 1) {
480                 /* we've got a candidate list - now filter by the full tree
481                    and extract the needed attributes */
482                 ret = ldb_index_filter(ldb, tree, base, scope, &dn_list, 
483                                        attrs, res);
484                 dn_list_free(&dn_list);
485         }
486
487         return ret;
488 }
489
490 /*
491   add a index element where this is the first indexed DN for this value
492 */
493 static int ltdb_index_add1_new(struct ldb_context *ldb, 
494                                struct ldb_message *msg,
495                                struct ldb_message_element *el,
496                                const char *dn)
497 {
498         struct ldb_message_element *el2;
499
500         /* add another entry */
501         el2 = realloc_p(msg->elements, struct ldb_message_element, msg->num_elements+1);
502         if (!el2) {
503                 return -1;
504         }
505
506         msg->elements = el2;
507         msg->elements[msg->num_elements].name = "@IDX";
508         msg->elements[msg->num_elements].num_values = 0;
509         msg->elements[msg->num_elements].values = malloc_p(struct ldb_val);
510         if (!msg->elements[msg->num_elements].values) {
511                 return -1;
512         }
513         msg->elements[msg->num_elements].values[0].length = strlen(dn);
514         msg->elements[msg->num_elements].values[0].data = dn;
515         msg->elements[msg->num_elements].num_values = 1;
516         msg->num_elements++;
517
518         return 0;
519 }
520
521
522 /*
523   add a index element where this is not the first indexed DN for this
524   value
525 */
526 static int ltdb_index_add1_add(struct ldb_context *ldb, 
527                                struct ldb_message *msg,
528                                struct ldb_message_element *el,
529                                int idx,
530                                const char *dn)
531 {
532         struct ldb_val *v2;
533
534         v2 = realloc_p(msg->elements[idx].values,
535                        struct ldb_val, 
536                        msg->elements[idx].num_values+1);
537         if (!v2) {
538                 return -1;
539         }
540         msg->elements[idx].values = v2;
541
542         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
543         msg->elements[idx].values[msg->elements[idx].num_values].data = dn;
544         msg->elements[idx].num_values++;
545
546         return 0;
547 }
548
549 /*
550   add an index entry for one message element
551 */
552 static int ltdb_index_add1(struct ldb_context *ldb, const char *dn, 
553                            struct ldb_message_element *el, int v_idx)
554 {
555         struct ldb_message msg;
556         char *dn_key;
557         int ret, i;
558
559         dn_key = ldb_dn_key(el->name, &el->values[v_idx]);
560         if (!dn_key) {
561                 return -1;
562         }
563
564         ret = ltdb_search_dn1(ldb, dn_key, &msg);
565         if (ret == -1) {
566                 free(dn_key);
567                 return -1;
568         }
569
570         if (ret == 0) {
571                 msg.dn = strdup(dn_key);
572                 if (!msg.dn) {
573                         free(dn_key);
574                         errno = ENOMEM;
575                         return -1;
576                 }
577                 msg.num_elements = 0;
578                 msg.elements = NULL;
579                 msg.private = NULL;
580         }
581
582         free(dn_key);
583
584         for (i=0;i<msg.num_elements;i++) {
585                 if (strcmp("@IDX", msg.elements[i].name) == 0) {
586                         break;
587                 }
588         }
589
590         if (i == msg.num_elements) {
591                 ret = ltdb_index_add1_new(ldb, &msg, el, dn);
592         } else {
593                 ret = ltdb_index_add1_add(ldb, &msg, el, i, dn);
594         }
595
596         if (ret == 0) {
597                 ret = ltdb_store(ldb, &msg, TDB_REPLACE);
598         }
599
600         ltdb_search_dn1_free(ldb, &msg);
601
602         return ret;
603 }
604
605 /*
606   add the index entries for a new record
607   return -1 on failure
608 */
609 int ltdb_index_add(struct ldb_context *ldb, const struct ldb_message *msg)
610 {
611         int ret, i, j;
612         struct ldb_message index_list;
613
614         /* find the list of indexed fields */   
615         ret = ltdb_search_dn1(ldb, "@INDEXLIST", &index_list);
616         if (ret != 1) {
617                 /* no indexed fields or an error */
618                 return ret;
619         }
620
621         for (i=0;i<msg->num_elements;i++) {
622                 ret = ldb_msg_find_idx(&index_list, msg->elements[i].name, NULL);
623                 if (ret == -1) {
624                         continue;
625                 }
626                 for (j=0;j<msg->elements[i].num_values;j++) {
627                         ret = ltdb_index_add1(ldb, msg->dn, &msg->elements[i], j);
628                         if (ret == -1) {
629                                 ltdb_search_dn1_free(ldb, &index_list);
630                                 return -1;
631                         }
632                 }
633         }
634
635         ltdb_search_dn1_free(ldb, &index_list);
636
637         return 0;
638 }
639
640
641 /*
642   delete an index entry for one message element
643 */
644 static int ltdb_index_del1(struct ldb_context *ldb, const char *dn, 
645                            struct ldb_message_element *el, int v_idx)
646 {
647         struct ldb_message msg;
648         char *dn_key;
649         int ret, i, j;
650
651         dn_key = ldb_dn_key(el->name, &el->values[v_idx]);
652         if (!dn_key) {
653                 return -1;
654         }
655
656         ret = ltdb_search_dn1(ldb, dn_key, &msg);
657         if (ret == -1) {
658                 free(dn_key);
659                 return -1;
660         }
661
662         if (ret == 0) {
663                 /* it wasn't indexed. Did we have an earlier error? If we did then
664                    its gone now */
665                 ltdb_search_dn1_free(ldb, &msg);
666                 return 0;
667         }
668
669         i = ldb_msg_find_idx(&msg, dn, &j);
670         if (i == -1) {
671                 /* it ain't there. hmmm */
672                 ltdb_search_dn1_free(ldb, &msg);
673                 return 0;
674         }
675
676         if (j != msg.elements[i].num_values - 1) {
677                 memmove(&msg.elements[i].values[j], 
678                         &msg.elements[i].values[j+1], 
679                         (msg.elements[i].num_values-1) * 
680                         sizeof(msg.elements[i].values[0]));
681         }
682         msg.elements[i].num_values--;
683
684         if (msg.elements[i].num_values == 0) {
685                 ret = ltdb_delete_noindex(ldb, dn_key);
686         } else {
687                 ret = ltdb_store(ldb, &msg, TDB_REPLACE);
688         }
689
690         ltdb_search_dn1_free(ldb, &msg);
691
692         return ret;
693 }
694
695 /*
696   delete the index entries for a record
697   return -1 on failure
698 */
699 int ltdb_index_del(struct ldb_context *ldb, const struct ldb_message *msg)
700 {
701         int ret, i, j;
702         struct ldb_message index_list;
703
704         /* find the list of indexed fields */   
705         ret = ltdb_search_dn1(ldb, "@INDEXLIST", &index_list);
706         if (ret != 1) {
707                 /* no indexed fields or an error */
708                 return ret;
709         }
710
711         for (i=0;i<msg->num_elements;i++) {
712                 ret = ldb_msg_find_idx(&index_list, msg->elements[i].name, NULL);
713                 if (ret == -1) {
714                         continue;
715                 }
716                 for (j=0;j<msg->elements[i].num_values;j++) {
717                         ret = ltdb_index_del1(ldb, msg->dn, &msg->elements[i], j);
718                         if (ret == -1) {
719                                 ltdb_search_dn1_free(ldb, &index_list);
720                                 return -1;
721                         }
722                 }
723         }
724
725         return 0;
726 }
727
728
729 /*
730   traversal function that deletes all @INDEX records
731 */
732 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
733 {
734         if (strncmp(key.dptr, "DN=@INDEX:", 10) == 0) {
735                 return tdb_delete(tdb, key);
736         }
737         return 0;
738 }
739
740 /*
741   traversal function that adds @INDEX records during a re index
742 */
743 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
744 {
745         struct ldb_context *ldb = state;
746         struct ldb_message msg;
747         int ret;
748
749         if (strncmp(key.dptr, "DN=@", 4) == 0 ||
750             strncmp(key.dptr, "DN=", 3) != 0) {
751                 return 0;
752         }
753
754         ret = ltdb_unpack_data(ldb, &data, &msg);
755         if (ret != 0) {
756                 return -1;
757         }
758
759         msg.dn = key.dptr+3;
760
761         ret = ltdb_index_add(ldb, &msg);
762
763         ltdb_unpack_data_free(&msg);
764
765         return ret;
766 }
767
768 /*
769   force a complete reindex of the database
770 */
771 int ltdb_reindex(struct ldb_context *ldb)
772 {
773         struct ltdb_private *ltdb = ldb->private;
774         int ret;
775
776         /* first traverse the database deleting any @INDEX records */
777         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
778         if (ret == -1) {
779                 errno = EIO;
780                 return -1;
781         }
782
783         /* now traverse adding any indexes for normal LDB records */
784         ret = tdb_traverse(ltdb->tdb, re_index, ldb);
785         if (ret == -1) {
786                 errno = EIO;
787                 return -1;
788         }
789
790         return 0;
791 }