r17525: This is a merge from the Google Summer of Code 2006 project by Martin Kühl
[samba.git] / source / lib / ldb / modules / ldb_map_outbound.c
1 /*
2    ldb database mapping module
3
4    Copyright (C) Jelmer Vernooij 2005
5    Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6
7    * NOTICE: this module is NOT released under the GNU LGPL license as
8    * other ldb code. This module is release under the GNU GPL v2 or
9    * later license.
10
11    This program is free software; you can redistribute it and/or modify
12    it under the terms of the GNU General Public License as published by
13    the Free Software Foundation; either version 2 of the License, or
14    (at your option) any later version.
15    
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License for more details.
20    
21    You should have received a copy of the GNU General Public License
22    along with this program; if not, write to the Free Software
23    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 */
25
26 #include "includes.h"
27 #include "ldb/include/includes.h"
28
29 #include "ldb/modules/ldb_map.h"
30 #include "ldb/modules/ldb_map_private.h"
31
32
33 /* Mapping attributes
34  * ================== */
35
36 /* Select attributes that stay in the local partition. */
37 static const char **map_attrs_select_local(struct ldb_module *module, void *mem_ctx, const char * const *attrs)
38 {
39         const struct ldb_map_context *data = map_get_context(module);
40         const char **result;
41         int i, last;
42
43         if (attrs == NULL)
44                 return NULL;
45
46         last = 0;
47         result = talloc_array(mem_ctx, const char *, 1);
48         if (result == NULL) {
49                 goto failed;
50         }
51         result[0] = NULL;
52
53         for (i = 0; attrs[i]; i++) {
54                 /* Wildcards and ignored attributes are kept locally */
55                 if ((ldb_attr_cmp(attrs[i], "*") == 0) ||
56                     (!map_attr_check_remote(data, attrs[i]))) {
57                         result = talloc_realloc(mem_ctx, result, const char *, last+2);
58                         if (result == NULL) {
59                                 goto failed;
60                         }
61
62                         result[last] = talloc_strdup(result, attrs[i]);
63                         result[last+1] = NULL;
64                         last++;
65                 }
66         }
67
68         return result;
69
70 failed:
71         talloc_free(result);
72         map_oom(module);
73         return NULL;
74 }
75
76 /* Collect attributes that are mapped into the remote partition. */
77 static const char **map_attrs_collect_remote(struct ldb_module *module, void *mem_ctx, const char * const *attrs)
78 {
79         const struct ldb_map_context *data = map_get_context(module);
80         const char **result;
81         const struct ldb_map_attribute *map;
82         const char *name;
83         int i, j, last;
84
85         if (attrs == NULL)
86                 return NULL;
87
88         last = 0;
89         result = talloc_array(mem_ctx, const char *, 1);
90         if (result == NULL) {
91                 goto failed;
92         }
93         result[0] = NULL;
94
95         for (i = 0; attrs[i]; i++) {
96                 /* Wildcards are kept remotely, too */
97                 if (ldb_attr_cmp(attrs[i], "*") == 0) {
98                         name = attrs[i];
99                         goto named;
100                 }
101
102                 /* Add remote names of mapped attrs */
103                 map = map_attr_find_local(data, attrs[i]);
104                 if (map == NULL) {
105                         continue;
106                 }
107
108                 switch (map->type) {
109                 case MAP_IGNORE:
110                         continue;
111
112                 case MAP_KEEP:
113                         name = attrs[i];
114                         goto named;
115
116                 case MAP_RENAME:
117                 case MAP_CONVERT:
118                         name = map->u.rename.remote_name;
119                         goto named;
120
121                 case MAP_GENERATE:
122                         /* Add all remote names of "generate" attrs */
123                         for (j = 0; map->u.generate.remote_names[j]; j++) {
124                                 result = talloc_realloc(mem_ctx, result, const char *, last+2);
125                                 if (result == NULL) {
126                                         goto failed;
127                                 }
128
129                                 result[last] = talloc_strdup(result, map->u.generate.remote_names[j]);
130                                 result[last+1] = NULL;
131                                 last++;
132                         }
133                         continue;
134                 }
135
136         named:  /* We found a single remote name, add that */
137                 result = talloc_realloc(mem_ctx, result, const char *, last+2);
138                 if (result == NULL) {
139                         goto failed;
140                 }
141
142                 result[last] = talloc_strdup(result, name);
143                 result[last+1] = NULL;
144                 last++;
145         }
146
147         return result;
148
149 failed:
150         talloc_free(result);
151         map_oom(module);
152         return NULL;
153 }
154
155 /* Split attributes that stay in the local partition from those that
156  * are mapped into the remote partition. */
157 static int map_attrs_partition(struct ldb_module *module, void *local_ctx, void *remote_ctx, const char ***local_attrs, const char ***remote_attrs, const char * const *attrs)
158 {
159         *local_attrs = map_attrs_select_local(module, local_ctx, attrs);
160         *remote_attrs = map_attrs_collect_remote(module, remote_ctx, attrs);
161
162         return 0;
163 }
164
165 /* Merge two lists of attributes into a single one. */
166 static int map_attrs_merge(struct ldb_module *module, void *mem_ctx, const char ***attrs, const char * const *more_attrs)
167 {
168         int i, j, k;
169
170         for (i = 0; (*attrs)[i]; i++) /* noop */ ;
171         for (j = 0; more_attrs[j]; j++) /* noop */ ;
172
173         *attrs = talloc_realloc(mem_ctx, *attrs, const char *, i+j+1);
174         if (*attrs == NULL) {
175                 map_oom(module);
176                 return -1;
177         }
178
179         for (k = 0; k < j; k++) {
180                 (*attrs)[i+k] = more_attrs[k];
181         }
182
183         (*attrs)[i+k] = NULL;
184
185         return 0;
186 }
187
188 /* Mapping ldb values
189  * ================== */
190
191 /* Map an ldb value from a parse tree into the remote partition. */
192 static struct ldb_val ldb_val_map_subtree(struct ldb_module *module, struct ldb_parse_tree *new, const struct ldb_map_attribute *map, const struct ldb_parse_tree *tree)
193 {
194         struct ldb_val val;
195
196         /* Extract the old value */
197         switch (tree->operation) {
198         case LDB_OP_EQUALITY:
199                 val = tree->u.equality.value;
200                 break;
201         case LDB_OP_LESS:
202         case LDB_OP_GREATER:
203         case LDB_OP_APPROX:
204                 val = tree->u.comparison.value;
205                 break;
206         case LDB_OP_EXTENDED:
207                 val = tree->u.extended.value;
208                 break;
209         default:
210                 val.length = 0;
211                 val.data = NULL;
212                 return ldb_val_dup(new, &val);
213         }
214
215         /* Convert to the new value */
216         return ldb_val_map_local(module, new, map, val);
217 }
218
219 /* Mapping message elements
220  * ======================== */
221
222 /* Add an element to a message, overwriting any old identically named elements. */
223 static int ldb_msg_replace(struct ldb_message *msg, const struct ldb_message_element *el)
224 {
225         struct ldb_message_element *old;
226
227         old = ldb_msg_find_element(msg, el->name);
228
229         /* no local result, add as new element */
230         if (old == NULL) {
231                 if (ldb_msg_add_empty(msg, el->name, 0) != 0) {
232                         return -1;
233                 }
234
235                 old = ldb_msg_find_element(msg, el->name);
236                 if (old == NULL) {
237                         return -1;
238                 }
239         }
240
241         *old = *el;                     /* copy new element */
242
243         return 0;
244 }
245
246 /* Map a message element back into the local partition. */
247 static struct ldb_message_element *ldb_msg_el_map_remote(struct ldb_module *module, void *mem_ctx, const struct ldb_map_attribute *map, const struct ldb_message_element *old)
248 {
249         struct ldb_message_element *el;
250         int i;
251
252         el = talloc_zero(mem_ctx, struct ldb_message_element);
253         if (el == NULL) {
254                 map_oom(module);
255                 return NULL;
256         }
257
258         el->num_values = old->num_values;
259         el->values = talloc_array(el, struct ldb_val, el->num_values);
260         if (el->values == NULL) {
261                 talloc_free(el);
262                 map_oom(module);
263                 return NULL;
264         }
265
266         el->name = map_attr_map_remote(el, map, old->name);
267
268         for (i = 0; i < el->num_values; i++) {
269                 el->values[i] = ldb_val_map_remote(module, el->values, map, old->values[i]);
270         }
271
272         return el;
273 }
274
275 /* Merge a remote message element into a local message. */
276 static int ldb_msg_el_merge(struct ldb_module *module, struct ldb_message *local, struct ldb_message *remote, const char *attr_name, const struct ldb_message_element *old)
277 {
278         const struct ldb_map_context *data = map_get_context(module);
279         const struct ldb_map_attribute *map = map_attr_find_remote(data, attr_name);
280         struct ldb_message_element *el;
281
282         /* Unknown attribute in remote message:
283          * skip, attribute was probably auto-generated */
284         if (map == NULL) {
285                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "ldb_map: "
286                           "Skipping attribute '%s': no mapping found\n",
287                           old->name);
288                 return 0;
289         }
290
291         switch (map->type) {
292         case MAP_IGNORE:
293                 return -1;
294
295         case MAP_CONVERT:
296                 if (map->u.convert.convert_remote == NULL) {
297                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "ldb_map: "
298                                   "Skipping attribute '%s': "
299                                   "'convert_remote' not set\n",
300                                   old->name);
301                         return 0;
302                 }
303                 /* fall through */
304         case MAP_KEEP:
305         case MAP_RENAME:
306                 el = ldb_msg_el_map_remote(module, local, map, old);
307                 break;
308
309         case MAP_GENERATE:
310                 if (map->u.generate.generate_local == NULL) {
311                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "ldb_map: "
312                                   "Skipping attribute '%s': "
313                                   "'generate_local' not set\n",
314                                   old->name);
315                         return 0;
316                 }
317
318                 el = map->u.generate.generate_local(module, local, old->name, remote);
319                 break;
320         }
321
322         if (el == NULL) {
323                 return -1;
324         }
325
326         return ldb_msg_replace(local, el);
327 }
328
329 /* Mapping messages
330  * ================ */
331
332 /* Merge two local messages into a single one. */
333 static int ldb_msg_merge_local(struct ldb_module *module, struct ldb_message *msg1, struct ldb_message *msg2)
334 {
335         int i, ret;
336
337         for (i = 0; i < msg2->num_elements; i++) {
338                 ret = ldb_msg_replace(msg1, &msg2->elements[i]);
339                 if (ret) {
340                         return ret;
341                 }
342         }
343
344         return 0;
345 }
346
347 /* Merge a local and a remote message into a single local one. */
348 static int ldb_msg_merge_remote(struct ldb_module *module, struct ldb_message *local, struct ldb_message *remote)
349 {
350         int i, ret;
351
352         /* Try to map each attribute back;
353          * Add to local message is possible,
354          * Overwrite old local attribute if necessary */
355         for (i = 0; i < remote->num_elements; i++) {
356                 ret = ldb_msg_el_merge(module, local, remote, remote->elements[i].name, &remote->elements[i]);
357                 if (ret) {
358                         return ret;
359                 }
360         }
361
362         return 0;
363 }
364
365 /* Mapping search results
366  * ====================== */
367
368 /* Map a search result back into the local partition. */
369 static int map_reply_remote(struct ldb_module *module, struct ldb_reply *ares)
370 {
371         struct ldb_message *msg;
372         struct ldb_dn *dn;
373         int ret;
374
375         /* There is no result message, skip */
376         if (ares->type != LDB_REPLY_ENTRY) {
377                 return 0;
378         }
379
380         /* Create a new result message */
381         msg = ldb_msg_new(ares);
382         if (msg == NULL) {
383                 map_oom(module);
384                 return -1;
385         }
386
387         /* Merge remote message into new message */
388         ret = ldb_msg_merge_remote(module, msg, ares->message);
389         if (ret) {
390                 talloc_free(msg);
391                 return ret;
392         }
393
394         /* Create corresponding local DN */
395         dn = ldb_dn_map_rebase_remote(module, msg, ares->message->dn);
396         if (dn == NULL) {
397                 talloc_free(msg);
398                 return -1;
399         }
400         msg->dn = dn;
401
402         /* Store new message with new DN as the result */
403         talloc_free(ares->message);
404         ares->message = msg;
405
406         return 0;
407 }
408
409 /* Mapping parse trees
410  * =================== */
411
412 /* Check whether a parse tree can safely be split in two. */
413 static BOOL ldb_parse_tree_check_splittable(const struct ldb_parse_tree *tree)
414 {
415         const struct ldb_parse_tree *subtree = tree;
416         BOOL negate = False;
417
418         while (subtree) {
419                 switch (subtree->operation) {
420                 case LDB_OP_NOT:
421                         negate = !negate;
422                         subtree = subtree->u.isnot.child;
423                         continue;
424
425                 case LDB_OP_AND:
426                         return !negate; /* if negate: False */
427
428                 case LDB_OP_OR:
429                         return negate;  /* if negate: True */
430
431                 default:
432                         return True;    /* simple parse tree */
433                 }
434         }
435
436         return True;                    /* no parse tree */
437 }
438
439 /* Collect a list of attributes required to match a given parse tree. */
440 static int ldb_parse_tree_collect_attrs(struct ldb_module *module, void *mem_ctx, const char ***attrs, const struct ldb_parse_tree *tree)
441 {
442         const char **new_attrs;
443         int i, ret;
444
445         if (tree == NULL) {
446                 return 0;
447         }
448
449         switch (tree->operation) {
450         case LDB_OP_OR:
451         case LDB_OP_AND:                /* attributes stored in list of subtrees */
452                 for (i = 0; i < tree->u.list.num_elements; i++) {
453                         ret = ldb_parse_tree_collect_attrs(module, mem_ctx, attrs, tree->u.list.elements[i]);
454                         if (ret) {
455                                 return ret;
456                         }
457                 }
458                 return 0;
459
460         case LDB_OP_NOT:                /* attributes stored in single subtree */
461                 return ldb_parse_tree_collect_attrs(module, mem_ctx, attrs, tree->u.isnot.child);
462
463         default:                        /* single attribute in tree */
464                 new_attrs = ldb_attr_list_copy_add(mem_ctx, *attrs, tree->u.equality.attr);
465                 talloc_free(*attrs);
466                 *attrs = new_attrs;
467                 return 0;
468         }
469
470         return -1;
471 }
472
473 static int map_subtree_select_local(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree);
474
475 /* Select a negated subtree that queries attributes in the local partition */
476 static int map_subtree_select_local_not(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
477 {
478         struct ldb_parse_tree *child;
479         int ret;
480
481         /* Prepare new tree */
482         *new = talloc_memdup(mem_ctx, tree, sizeof(struct ldb_parse_tree));
483         if (*new == NULL) {
484                 map_oom(module);
485                 return -1;
486         }
487
488         /* Generate new subtree */
489         ret = map_subtree_select_local(module, *new, &child, tree->u.isnot.child);
490         if (ret) {
491                 talloc_free(*new);
492                 return ret;
493         }
494
495         /* Prune tree without subtree */
496         if (child == NULL) {
497                 talloc_free(*new);
498                 *new = NULL;
499                 return 0;
500         }
501
502         (*new)->u.isnot.child = child;
503
504         return ret;
505 }
506
507 /* Select a list of subtrees that query attributes in the local partition */
508 static int map_subtree_select_local_list(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
509 {
510         int i, j, ret;
511
512         /* Prepare new tree */
513         *new = talloc_memdup(mem_ctx, tree, sizeof(struct ldb_parse_tree));
514         if (*new == NULL) {
515                 map_oom(module);
516                 return -1;
517         }
518
519         /* Prepare list of subtrees */
520         (*new)->u.list.num_elements = 0;
521         (*new)->u.list.elements = talloc_array(*new, struct ldb_parse_tree *, tree->u.list.num_elements);
522         if ((*new)->u.list.elements == NULL) {
523                 map_oom(module);
524                 talloc_free(*new);
525                 return -1;
526         }
527
528         /* Generate new list of subtrees */
529         j = 0;
530         for (i = 0; i < tree->u.list.num_elements; i++) {
531                 struct ldb_parse_tree *child;
532                 ret = map_subtree_select_local(module, *new, &child, tree->u.list.elements[i]);
533                 if (ret) {
534                         talloc_free(*new);
535                         return ret;
536                 }
537
538                 if (child) {
539                         (*new)->u.list.elements[j] = child;
540                         j++;
541                 }
542         }
543
544         /* Prune tree without subtrees */
545         if (j == 0) {
546                 talloc_free(*new);
547                 *new = NULL;
548                 return 0;
549         }
550
551         /* Fix subtree list size */
552         (*new)->u.list.num_elements = j;
553         (*new)->u.list.elements = talloc_realloc(*new, (*new)->u.list.elements, struct ldb_parse_tree *, (*new)->u.list.num_elements);
554
555         return ret;
556 }
557
558 /* Select a simple subtree that queries attributes in the local partition */
559 static int map_subtree_select_local_simple(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
560 {
561         /* Prepare new tree */
562         *new = talloc_memdup(mem_ctx, tree, sizeof(struct ldb_parse_tree));
563         if (*new == NULL) {
564                 map_oom(module);
565                 return -1;
566         }
567
568         return 0;
569 }
570
571 /* Select subtrees that query attributes in the local partition */
572 static int map_subtree_select_local(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
573 {
574         const struct ldb_map_context *data = map_get_context(module);
575
576         if (tree == NULL) {
577                 return 0;
578         }
579
580         if (tree->operation == LDB_OP_NOT) {
581                 return map_subtree_select_local_not(module, mem_ctx, new, tree);
582         }
583
584         if (tree->operation == LDB_OP_AND || tree->operation == LDB_OP_OR) {
585                 return map_subtree_select_local_list(module, mem_ctx, new, tree);
586         }
587
588         if (map_attr_check_remote(data, tree->u.equality.attr)) {
589                 *new = NULL;
590                 return 0;
591         }
592
593         return map_subtree_select_local_simple(module, mem_ctx, new, tree);
594 }
595
596 static int map_subtree_collect_remote(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree);
597
598 /* Collect a negated subtree that queries attributes in the remote partition */
599 static int map_subtree_collect_remote_not(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
600 {
601         struct ldb_parse_tree *child;
602         int ret;
603
604         /* Prepare new tree */
605         *new = talloc_memdup(mem_ctx, tree, sizeof(struct ldb_parse_tree));
606         if (*new == NULL) {
607                 map_oom(module);
608                 return -1;
609         }
610
611         /* Generate new subtree */
612         ret = map_subtree_collect_remote(module, *new, &child, tree->u.isnot.child);
613         if (ret) {
614                 talloc_free(*new);
615                 return ret;
616         }
617
618         /* Prune tree without subtree */
619         if (child == NULL) {
620                 talloc_free(*new);
621                 *new = NULL;
622                 return 0;
623         }
624
625         (*new)->u.isnot.child = child;
626
627         return ret;
628 }
629
630 /* Collect a list of subtrees that query attributes in the remote partition */
631 static int map_subtree_collect_remote_list(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
632 {
633         int i, j, ret;
634
635         /* Prepare new tree */
636         *new = talloc_memdup(mem_ctx, tree, sizeof(struct ldb_parse_tree));
637         if (*new == NULL) {
638                 map_oom(module);
639                 return -1;
640         }
641
642         /* Prepare list of subtrees */
643         (*new)->u.list.num_elements = 0;
644         (*new)->u.list.elements = talloc_array(*new, struct ldb_parse_tree *, tree->u.list.num_elements);
645         if ((*new)->u.list.elements == NULL) {
646                 map_oom(module);
647                 talloc_free(*new);
648                 return -1;
649         }
650
651         /* Generate new list of subtrees */
652         j = 0;
653         for (i = 0; i < tree->u.list.num_elements; i++) {
654                 struct ldb_parse_tree *child;
655                 ret = map_subtree_collect_remote(module, *new, &child, tree->u.list.elements[i]);
656                 if (ret) {
657                         talloc_free(*new);
658                         return ret;
659                 }
660
661                 if (child) {
662                         (*new)->u.list.elements[j] = child;
663                         j++;
664                 }
665         }
666
667         /* Prune tree without subtrees */
668         if (j == 0) {
669                 talloc_free(*new);
670                 *new = NULL;
671                 return 0;
672         }
673
674         /* Fix subtree list size */
675         (*new)->u.list.num_elements = j;
676         (*new)->u.list.elements = talloc_realloc(*new, (*new)->u.list.elements, struct ldb_parse_tree *, (*new)->u.list.num_elements);
677
678         return ret;
679 }
680
681 /* Collect a simple subtree that queries attributes in the remote partition */
682 static int map_subtree_collect_remote_simple(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree, const struct ldb_map_attribute *map)
683 {
684         const char *attr;
685         struct ldb_val val;
686
687         /* Prepare new tree */
688         *new = talloc_memdup(mem_ctx, tree, sizeof(struct ldb_parse_tree));
689         if (*new == NULL) {
690                 map_oom(module);
691                 return -1;
692         }
693
694         /* Map attribute and value */
695         attr = map_attr_map_local(*new, map, tree->u.equality.attr);
696         if (attr == NULL) {
697                 talloc_free(*new);
698                 *new = NULL;
699                 return 0;
700         }
701         val = ldb_val_map_subtree(module, *new, map, tree);
702
703         /* Store attribute and value in new tree */
704         switch (tree->operation) {
705         case LDB_OP_PRESENT:
706                 (*new)->u.present.attr = attr;
707                 break;
708         case LDB_OP_SUBSTRING:
709                 (*new)->u.substring.attr = attr;
710                 (*new)->u.substring.chunks = NULL;      /* FIXME! */
711                 break;
712         case LDB_OP_EQUALITY:
713                 (*new)->u.equality.attr = attr;
714                 (*new)->u.equality.value = val;
715                 break;
716         case LDB_OP_LESS:
717         case LDB_OP_GREATER:
718         case LDB_OP_APPROX:
719                 (*new)->u.comparison.attr = attr;
720                 (*new)->u.comparison.value = val;
721                 break;
722         case LDB_OP_EXTENDED:
723                 (*new)->u.extended.attr = attr;
724                 (*new)->u.extended.value = val;
725                 (*new)->u.extended.rule_id = talloc_strdup(*new, tree->u.extended.rule_id);
726                 break;
727         default:                        /* unknown kind of simple subtree */
728                 talloc_free(*new);
729                 return -1;
730         }
731
732         return 0;
733 }
734
735 /* Collect subtrees that query attributes in the remote partition */
736 static int map_subtree_collect_remote(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
737 {
738         const struct ldb_map_context *data = map_get_context(module);
739         const struct ldb_map_attribute *map;
740
741         if (tree == NULL) {
742                 return 0;
743         }
744
745         if (tree->operation == LDB_OP_NOT) {
746                 return map_subtree_collect_remote_not(module, mem_ctx, new, tree);
747         }
748
749         if ((tree->operation == LDB_OP_AND) || (tree->operation == LDB_OP_OR)) {
750                 return map_subtree_collect_remote_list(module, mem_ctx, new, tree);
751         }
752
753         if (!map_attr_check_remote(data, tree->u.equality.attr)) {
754                 *new = NULL;
755                 return 0;
756         }
757
758         map = map_attr_find_local(data, tree->u.equality.attr);
759         if (map->convert_operator) {
760                 *new = map->convert_operator(data, mem_ctx, tree);
761                 return 0;
762         }
763
764         if (map->type == MAP_GENERATE) {
765                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "ldb_map: "
766                           "Skipping attribute '%s': "
767                           "'convert_operator' not set\n",
768                           tree->u.equality.attr);
769                 *new = NULL;
770                 return 0;
771         }
772
773         return map_subtree_collect_remote_simple(module, mem_ctx, new, tree, map);
774 }
775
776 /* Split subtrees that query attributes in the local partition from
777  * those that query the remote partition. */
778 static int ldb_parse_tree_partition(struct ldb_module *module, void *local_ctx, void *remote_ctx, struct ldb_parse_tree **local_tree, struct ldb_parse_tree **remote_tree, const struct ldb_parse_tree *tree)
779 {
780         int ret;
781
782         *local_tree = NULL;
783         *remote_tree = NULL;
784
785         /* No original tree */
786         if (tree == NULL) {
787                 return 0;
788         }
789
790         /* Generate local tree */
791         ret = map_subtree_select_local(module, local_ctx, local_tree, tree);
792         if (ret) {
793                 return ret;
794         }
795
796         /* Generate remote tree */
797         ret = map_subtree_collect_remote(module, remote_ctx, remote_tree, tree);
798         if (ret) {
799                 talloc_free(*local_tree);
800                 return ret;
801         }
802
803         return 0;
804 }
805
806 /* Collect a list of attributes required either explicitly from a
807  * given list or implicitly  from a given parse tree; split the
808  * collected list into local and remote parts. */
809 static int map_attrs_collect_and_partition(struct ldb_module *module, void *local_ctx, void *remote_ctx, const char ***local_attrs, const char ***remote_attrs, const char * const *search_attrs, const struct ldb_parse_tree *tree)
810 {
811         void *tmp_ctx;
812         const char **tree_attrs;
813         int ret;
814
815         /* Clear initial lists of partitioned attributes */
816         *local_attrs = NULL;
817         *remote_attrs = NULL;
818
819         /* There are no searched attributes, just stick to that */
820         if (search_attrs == NULL) {
821                 return 0;
822         }
823
824         /* There is no tree, just partition the searched attributes */
825         if (tree == NULL) {
826                 return map_attrs_partition(module, local_ctx, remote_ctx, local_attrs, remote_attrs, search_attrs);
827         }
828
829         /* Create context for temporary memory */
830         tmp_ctx = talloc_new(local_ctx);
831         if (tmp_ctx == NULL) {
832                 goto oom;
833         }
834
835         /* Prepare list of attributes from tree */
836         tree_attrs = talloc_array(tmp_ctx, const char *, 1);
837         if (tree_attrs == NULL) {
838                 talloc_free(tmp_ctx);
839                 goto oom;
840         }
841         tree_attrs[0] = NULL;
842
843         /* Collect attributes from tree */
844         ret = ldb_parse_tree_collect_attrs(module, tmp_ctx, &tree_attrs, tree);
845         if (ret) {
846                 goto done;
847         }
848
849         /* Merge attributes from search operation */
850         ret = map_attrs_merge(module, tmp_ctx, &tree_attrs, search_attrs);
851         if (ret) {
852                 goto done;
853         }
854
855         /* Split local from remote attributes */
856         ret = map_attrs_partition(module, local_ctx, remote_ctx, local_attrs, remote_attrs, tree_attrs);
857
858 done:
859         /* Free temporary memory */
860         talloc_free(tmp_ctx);
861         return ret;
862
863 oom:
864         map_oom(module);
865         return -1;
866 }
867
868
869 /* Outbound requests: search
870  * ========================= */
871
872 /* Pass a merged search result up the callback chain. */
873 int map_up_callback(struct ldb_context *ldb, const struct ldb_request *req, struct ldb_reply *ares)
874 {
875         int i;
876
877         /* No callback registered, stop */
878         if (req->callback == NULL) {
879                 return LDB_SUCCESS;
880         }
881
882         /* Only records need special treatment */
883         if (ares->type != LDB_REPLY_ENTRY) {
884                 return req->callback(ldb, req->context, ares);
885         }
886
887         /* Merged result doesn't match original query, skip */
888         if (!ldb_match_msg(ldb, ares->message, req->op.search.tree, req->op.search.base, req->op.search.scope)) {
889                 ldb_debug(ldb, LDB_DEBUG_TRACE, "ldb_map: "
890                           "Skipping record '%s': "
891                           "doesn't match original search\n",
892                           ldb_dn_linearize(ldb, ares->message->dn));
893                 return LDB_SUCCESS;
894         }
895
896         /* Limit result to requested attrs */
897         if ((req->op.search.attrs) && (!ldb_attr_in_list(req->op.search.attrs, "*"))) {
898                 for (i = 0; i < ares->message->num_elements; i++) {
899                         const struct ldb_message_element *el = &ares->message->elements[i];
900                         if (!ldb_attr_in_list(req->op.search.attrs, el->name)) {
901                                 ldb_msg_remove_attr(ares->message, el->name);
902                         }
903                 }
904         }
905
906         return req->callback(ldb, req->context, ares);
907 }
908
909 /* Merge the remote and local parts of a search result. */
910 int map_local_merge_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
911 {
912         struct map_search_context *sc;
913         int ret;
914
915         if (context == NULL || ares == NULL) {
916                 ldb_set_errstring(ldb, talloc_asprintf(ldb, "ldb_map: "
917                                                        "NULL Context or Result in `map_local_merge_callback`"));
918                 return LDB_ERR_OPERATIONS_ERROR;
919         }
920
921         sc = talloc_get_type(context, struct map_search_context);
922
923         switch (ares->type) {
924         case LDB_REPLY_ENTRY:
925                 /* We have already found a local record */
926                 if (sc->local_res) {
927                         ldb_set_errstring(ldb, talloc_asprintf(ldb, "ldb_map: "
928                                                                "Too many results to base search for local entry"));
929                         talloc_free(ares);
930                         return LDB_ERR_OPERATIONS_ERROR;
931                 }
932
933                 /* Store local result */
934                 sc->local_res = ares;
935
936                 /* Merge remote into local message */
937                 ret = ldb_msg_merge_local(sc->ac->module, ares->message, sc->remote_res->message);
938                 if (ret) {
939                         talloc_free(ares);
940                         return LDB_ERR_OPERATIONS_ERROR;
941                 }
942
943                 return map_up_callback(ldb, sc->ac->orig_req, ares);
944
945         case LDB_REPLY_DONE:
946                 /* No local record found, continue with remote record */
947                 if (sc->local_res == NULL) {
948                         return map_up_callback(ldb, sc->ac->orig_req, sc->remote_res);
949                 }
950                 return LDB_SUCCESS;
951
952         default:
953                 ldb_set_errstring(ldb, talloc_asprintf(ldb, "ldb_map: "
954                                                        "Unexpected result type in base search for local entry"));
955                 talloc_free(ares);
956                 return LDB_ERR_OPERATIONS_ERROR;
957         }
958 }
959
960 /* Search the local part of a remote search result. */
961 int map_remote_search_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
962 {
963         struct map_context *ac;
964         struct map_search_context *sc;
965         struct ldb_request *req;
966         int ret;
967
968         if (context == NULL || ares == NULL) {
969                 ldb_set_errstring(ldb, talloc_asprintf(ldb, "ldb_map: "
970                                                        "NULL Context or Result in `map_remote_search_callback`"));
971                 return LDB_ERR_OPERATIONS_ERROR;
972         }
973
974         ac = talloc_get_type(context, struct map_context);
975
976         /* It's not a record, stop searching */
977         if (ares->type != LDB_REPLY_ENTRY) {
978                 return map_up_callback(ldb, ac->orig_req, ares);
979         }
980
981         /* Map result record into a local message */
982         ret = map_reply_remote(ac->module, ares);
983         if (ret) {
984                 talloc_free(ares);
985                 return LDB_ERR_OPERATIONS_ERROR;
986         }
987
988         /* There is no local db, stop searching */
989         if (!map_check_local_db(ac->module)) {
990                 return map_up_callback(ldb, ac->orig_req, ares);
991         }
992
993         /* Prepare local search context */
994         sc = map_init_search_context(ac, ares);
995         if (sc == NULL) {
996                 talloc_free(ares);
997                 return LDB_ERR_OPERATIONS_ERROR;
998         }
999
1000         /* Prepare local search request */
1001         /* TODO: use GUIDs here instead? */
1002
1003         ac->search_reqs = talloc_realloc(ac, ac->search_reqs, struct ldb_request *, ac->num_searches + 2);
1004         if (ac->search_reqs == NULL) {
1005                 talloc_free(ares);
1006                 return LDB_ERR_OPERATIONS_ERROR;
1007         }
1008
1009         ac->search_reqs[ac->num_searches]
1010                 = req = map_search_base_req(ac, ares->message->dn, 
1011                                             NULL, NULL, sc, map_local_merge_callback);
1012         if (req == NULL) {
1013                 talloc_free(sc);
1014                 talloc_free(ares);
1015                 return LDB_ERR_OPERATIONS_ERROR;
1016         }
1017         ac->num_searches++;
1018         ac->search_reqs[ac->num_searches] = NULL;
1019
1020         return ldb_next_request(ac->module, req);
1021 }
1022
1023 /* Search a record. */
1024 int map_search(struct ldb_module *module, struct ldb_request *req)
1025 {
1026         struct ldb_handle *h;
1027         struct map_context *ac;
1028         struct ldb_parse_tree *local_tree, *remote_tree;
1029         const char **local_attrs, **remote_attrs;
1030         int ret;
1031
1032         /* Do not manipulate our control entries */
1033         if (ldb_dn_is_special(req->op.search.base))
1034                 return ldb_next_request(module, req);
1035
1036         /* No mapping requested, skip to next module */
1037         if ((req->op.search.base) && (!ldb_dn_check_local(module, req->op.search.base))) {
1038                 return ldb_next_request(module, req);
1039         }
1040
1041         /* TODO: How can we be sure about which partition we are
1042          *       targetting when there is no search base? */
1043
1044         /* Prepare context and handle */
1045         h = map_init_handle(req, module);
1046         if (h == NULL) {
1047                 return LDB_ERR_OPERATIONS_ERROR;
1048         }
1049         ac = talloc_get_type(h->private_data, struct map_context);
1050
1051         ac->search_reqs = talloc_array(ac, struct ldb_request *, 2);
1052         if (ac->search_reqs == NULL) {
1053                 talloc_free(h);
1054                 return LDB_ERR_OPERATIONS_ERROR;
1055         }
1056         ac->num_searches = 1;
1057         ac->search_reqs[1] = NULL;
1058
1059         /* Prepare the remote operation */
1060         ac->search_reqs[0] = talloc(ac, struct ldb_request);
1061         if (ac->search_reqs[0] == NULL) {
1062                 goto oom;
1063         }
1064
1065         *(ac->search_reqs[0]) = *req;   /* copy the request */
1066
1067         ac->search_reqs[0]->handle = h; /* return our own handle to deal with this call */
1068
1069         ac->search_reqs[0]->context = ac;
1070         ac->search_reqs[0]->callback = map_remote_search_callback;
1071
1072         /* Split local from remote attrs */
1073         ret = map_attrs_collect_and_partition(module, ac, ac->search_reqs[0], &local_attrs, &remote_attrs, req->op.search.attrs, req->op.search.tree);
1074         if (ret) {
1075                 goto failed;
1076         }
1077
1078         ac->local_attrs = local_attrs;
1079         ac->search_reqs[0]->op.search.attrs = remote_attrs;
1080
1081         /* Split local from remote tree */
1082         ret = ldb_parse_tree_partition(module, ac, ac->search_reqs[0], &local_tree, &remote_tree, req->op.search.tree);
1083         if (ret) {
1084                 goto failed;
1085         }
1086
1087         if (((local_tree == NULL) ^ (remote_tree == NULL)) &&
1088             (!ldb_parse_tree_check_splittable(req->op.search.tree))) {
1089                 /* The query can't safely be split, enumerate the remote partition */
1090                 local_tree = NULL;
1091                 remote_tree = NULL;
1092         }
1093
1094         if (local_tree == NULL) {
1095                 /* Construct default local parse tree */
1096                 local_tree = talloc_zero(ac, struct ldb_parse_tree);
1097                 if (local_tree == NULL) {
1098                         map_oom(ac->module);
1099                         goto failed;
1100                 }
1101
1102                 local_tree->operation = LDB_OP_PRESENT;
1103                 local_tree->u.present.attr = talloc_strdup(local_tree, IS_MAPPED);
1104         }
1105         if (remote_tree == NULL) {
1106                 /* Construct default remote parse tree */
1107                 remote_tree = ldb_parse_tree(ac->search_reqs[0], NULL);
1108                 if (remote_tree == NULL) {
1109                         goto failed;
1110                 }
1111         }
1112
1113         ac->local_tree = local_tree;
1114         ac->search_reqs[0]->op.search.tree = remote_tree;
1115
1116         ldb_set_timeout_from_prev_req(module->ldb, req, ac->search_reqs[0]);
1117
1118         h->state = LDB_ASYNC_INIT;
1119         h->status = LDB_SUCCESS;
1120
1121         ac->step = MAP_SEARCH_REMOTE;
1122
1123         ret = ldb_next_remote_request(module, ac->search_reqs[0]);
1124         if (ret == LDB_SUCCESS) {
1125                 req->handle = h;
1126         }
1127         return ret;
1128
1129 oom:
1130         map_oom(module);
1131 failed:
1132         talloc_free(h);
1133         return LDB_ERR_OPERATIONS_ERROR;
1134 }