ebed0d2bf943038f16fb8cf61f6e6b47922ef893
[sfrench/samba-autobuild/.git] / source4 / lib / ldb / ldb_map / ldb_map_outbound.c
1 /*
2    ldb database mapping module
3
4    Copyright (C) Jelmer Vernooij 2005
5    Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
7    Copyright (C) Simo Sorce <idra@samba.org> 2008
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25
26 */
27
28 #include "ldb_includes.h"
29 #include "ldb_map.h"
30 #include "ldb_map_private.h"
31
32
33 /* Mapping attributes
34  * ================== */
35
36 /* Select attributes that stay in the local partition. */
37 static const char **map_attrs_select_local(struct ldb_module *module, void *mem_ctx, const char * const *attrs)
38 {
39         const struct ldb_map_context *data = map_get_context(module);
40         const char **result;
41         unsigned int i, last;
42
43         if (attrs == NULL)
44                 return NULL;
45
46         last = 0;
47         result = talloc_array(mem_ctx, const char *, 1);
48         if (result == NULL) {
49                 goto failed;
50         }
51         result[0] = NULL;
52
53         for (i = 0; attrs[i]; i++) {
54                 /* Wildcards and ignored attributes are kept locally */
55                 if ((ldb_attr_cmp(attrs[i], "*") == 0) ||
56                     (!map_attr_check_remote(data, attrs[i]))) {
57                         result = talloc_realloc(mem_ctx, result, const char *, last+2);
58                         if (result == NULL) {
59                                 goto failed;
60                         }
61
62                         result[last] = talloc_strdup(result, attrs[i]);
63                         result[last+1] = NULL;
64                         last++;
65                 }
66         }
67
68         return result;
69
70 failed:
71         talloc_free(result);
72         map_oom(module);
73         return NULL;
74 }
75
76 /* Collect attributes that are mapped into the remote partition. */
77 static const char **map_attrs_collect_remote(struct ldb_module *module, void *mem_ctx, 
78                                              const char * const *attrs)
79 {
80         const struct ldb_map_context *data = map_get_context(module);
81         const char **result;
82         const struct ldb_map_attribute *map;
83         const char *name=NULL;
84         unsigned int i, j, last;
85         int ret;
86
87         last = 0;
88         result = talloc_array(mem_ctx, const char *, 1);
89         if (result == NULL) {
90                 goto failed;
91         }
92         result[0] = NULL;
93
94         for (i = 0; attrs[i]; i++) {
95                 /* Wildcards are kept remotely, too */
96                 if (ldb_attr_cmp(attrs[i], "*") == 0) {
97                         const char **new_attrs = NULL;
98                         ret = map_attrs_merge(module, mem_ctx, &new_attrs, attrs);
99                         if (ret != LDB_SUCCESS) {
100                                 goto failed;
101                         }
102                         ret = map_attrs_merge(module, mem_ctx, &new_attrs, data->wildcard_attributes);
103                         if (ret != LDB_SUCCESS) {
104                                 goto failed;
105                         }
106
107                         attrs = new_attrs;
108                         break;
109                 }
110         }
111
112         for (i = 0; attrs[i]; i++) {
113                 /* Wildcards are kept remotely, too */
114                 if (ldb_attr_cmp(attrs[i], "*") == 0) {
115                         /* Add all 'include in wildcard' attributes */
116                         name = attrs[i];
117                         goto named;
118                 }
119
120                 /* Add remote names of mapped attrs */
121                 map = map_attr_find_local(data, attrs[i]);
122                 if (map == NULL) {
123                         continue;
124                 }
125
126                 switch (map->type) {
127                 case LDB_MAP_IGNORE:
128                         continue;
129
130                 case LDB_MAP_KEEP:
131                         name = attrs[i];
132                         goto named;
133
134                 case LDB_MAP_RENAME:
135                 case LDB_MAP_CONVERT:
136                         name = map->u.rename.remote_name;
137                         goto named;
138
139                 case LDB_MAP_GENERATE:
140                         /* Add all remote names of "generate" attrs */
141                         for (j = 0; map->u.generate.remote_names[j]; j++) {
142                                 result = talloc_realloc(mem_ctx, result, const char *, last+2);
143                                 if (result == NULL) {
144                                         goto failed;
145                                 }
146
147                                 result[last] = talloc_strdup(result, map->u.generate.remote_names[j]);
148                                 result[last+1] = NULL;
149                                 last++;
150                         }
151                         continue;
152                 }
153
154         named:  /* We found a single remote name, add that */
155                 result = talloc_realloc(mem_ctx, result, const char *, last+2);
156                 if (result == NULL) {
157                         goto failed;
158                 }
159
160                 result[last] = talloc_strdup(result, name);
161                 result[last+1] = NULL;
162                 last++;
163         }
164
165         return result;
166
167 failed:
168         talloc_free(result);
169         map_oom(module);
170         return NULL;
171 }
172
173 /* Split attributes that stay in the local partition from those that
174  * are mapped into the remote partition. */
175 static int map_attrs_partition(struct ldb_module *module, void *mem_ctx, const char ***local_attrs, const char ***remote_attrs, const char * const *attrs)
176 {
177         *local_attrs = map_attrs_select_local(module, mem_ctx, attrs);
178         *remote_attrs = map_attrs_collect_remote(module, mem_ctx, attrs);
179
180         return 0;
181 }
182
183 /* Mapping message elements
184  * ======================== */
185
186 /* Add an element to a message, overwriting any old identically named elements. */
187 static int ldb_msg_replace(struct ldb_message *msg, const struct ldb_message_element *el)
188 {
189         struct ldb_message_element *old;
190
191         old = ldb_msg_find_element(msg, el->name);
192
193         /* no local result, add as new element */
194         if (old == NULL) {
195                 if (ldb_msg_add_empty(msg, el->name, 0, &old) != 0) {
196                         return -1;
197                 }
198                 talloc_free(discard_const_p(char, old->name));
199         }
200
201         /* copy new element */
202         *old = *el;
203
204         /* and make sure we reference the contents */
205         if (!talloc_reference(msg->elements, el->name)) {
206                 return -1;
207         }
208         if (!talloc_reference(msg->elements, el->values)) {
209                 return -1;
210         }
211
212         return 0;
213 }
214
215 /* Map a message element back into the local partition. */
216 static struct ldb_message_element *ldb_msg_el_map_remote(struct ldb_module *module, 
217                                                          void *mem_ctx, 
218                                                          const struct ldb_map_attribute *map, 
219                                                          const char *attr_name,
220                                                          const struct ldb_message_element *old)
221 {
222         const struct ldb_map_context *data = map_get_context(module);
223         const char *local_attr_name = attr_name;
224         struct ldb_message_element *el;
225         unsigned int i;
226
227         el = talloc_zero(mem_ctx, struct ldb_message_element);
228         if (el == NULL) {
229                 map_oom(module);
230                 return NULL;
231         }
232
233         el->values = talloc_array(el, struct ldb_val, old->num_values);
234         if (el->values == NULL) {
235                 talloc_free(el);
236                 map_oom(module);
237                 return NULL;
238         }
239
240         for (i = 0; data->attribute_maps[i].local_name; i++) {
241                 struct ldb_map_attribute *am = &data->attribute_maps[i];
242                 if ((am->type == LDB_MAP_RENAME &&
243                         !strcmp(am->u.rename.remote_name, attr_name))
244                     || (am->type == LDB_MAP_CONVERT &&
245                         !strcmp(am->u.convert.remote_name, attr_name))) {
246
247                         local_attr_name = am->local_name;
248                         break;
249                 }
250         }
251
252         el->name = talloc_strdup(el, local_attr_name);
253         if (el->name == NULL) {
254                 talloc_free(el);
255                 map_oom(module);
256                 return NULL;
257         }
258
259         for (i = 0; i < old->num_values; i++) {
260                 el->values[i] = ldb_val_map_remote(module, el->values, map, &old->values[i]);
261                 /* Conversions might fail, in which case bail */
262                 if (!el->values[i].data) {
263                         talloc_free(el);
264                         return NULL;
265                 }
266                 el->num_values++;
267         }
268
269         return el;
270 }
271
272 /* Merge a remote message element into a local message. */
273 static int ldb_msg_el_merge(struct ldb_module *module, struct ldb_message *local, 
274                             struct ldb_message *remote, const char *attr_name)
275 {
276         const struct ldb_map_context *data = map_get_context(module);
277         const struct ldb_map_attribute *map;
278         struct ldb_message_element *old, *el=NULL;
279         const char *remote_name = NULL;
280         struct ldb_context *ldb;
281
282         ldb = ldb_module_get_ctx(module);
283
284         /* We handle wildcards in ldb_msg_el_merge_wildcard */
285         if (ldb_attr_cmp(attr_name, "*") == 0) {
286                 return LDB_SUCCESS;
287         }
288
289         map = map_attr_find_local(data, attr_name);
290
291         /* Unknown attribute in remote message:
292          * skip, attribute was probably auto-generated */
293         if (map == NULL) {
294                 return LDB_SUCCESS;
295         }
296
297         switch (map->type) {
298         case LDB_MAP_IGNORE:
299                 break;
300         case LDB_MAP_CONVERT:
301                 remote_name = map->u.convert.remote_name;
302                 break;
303         case LDB_MAP_KEEP:
304                 remote_name = attr_name;
305                 break;
306         case LDB_MAP_RENAME:
307                 remote_name = map->u.rename.remote_name;
308                 break;
309         case LDB_MAP_GENERATE:
310                 break;
311         }
312
313         switch (map->type) {
314         case LDB_MAP_IGNORE:
315                 return LDB_SUCCESS;
316
317         case LDB_MAP_CONVERT:
318                 if (map->u.convert.convert_remote == NULL) {
319                         ldb_debug(ldb, LDB_DEBUG_ERROR, "ldb_map: "
320                                   "Skipping attribute '%s': "
321                                   "'convert_remote' not set",
322                                   attr_name);
323                         return LDB_SUCCESS;
324                 }
325                 /* fall through */
326         case LDB_MAP_KEEP:
327         case LDB_MAP_RENAME:
328                 old = ldb_msg_find_element(remote, remote_name);
329                 if (old) {
330                         el = ldb_msg_el_map_remote(module, local, map, attr_name, old);
331                 } else {
332                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
333                 }
334                 break;
335
336         case LDB_MAP_GENERATE:
337                 if (map->u.generate.generate_local == NULL) {
338                         ldb_debug(ldb, LDB_DEBUG_ERROR, "ldb_map: "
339                                   "Skipping attribute '%s': "
340                                   "'generate_local' not set",
341                                   attr_name);
342                         return LDB_SUCCESS;
343                 }
344
345                 el = map->u.generate.generate_local(module, local, attr_name, remote);
346                 if (!el) {
347                         /* Generation failure is probably due to lack of source attributes */
348                         return LDB_ERR_NO_SUCH_ATTRIBUTE;
349                 }
350                 break;
351         }
352
353         if (el == NULL) {
354                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
355         }
356
357         return ldb_msg_replace(local, el);
358 }
359
360 /* Handle wildcard parts of merging a remote message element into a local message. */
361 static int ldb_msg_el_merge_wildcard(struct ldb_module *module, struct ldb_message *local, 
362                                      struct ldb_message *remote)
363 {
364         const struct ldb_map_context *data = map_get_context(module);
365         const struct ldb_map_attribute *map = map_attr_find_local(data, "*");
366         struct ldb_message_element *el=NULL;
367         unsigned int i;
368         int ret;
369
370         /* Perhaps we have a mapping for "*" */
371         if (map && map->type == LDB_MAP_KEEP) {
372                 /* We copy everything over, and hope that anything with a 
373                    more specific rule is overwritten */
374                 for (i = 0; i < remote->num_elements; i++) {
375                         el = ldb_msg_el_map_remote(module, local, map, remote->elements[i].name,
376                                                    &remote->elements[i]);
377                         if (el == NULL) {
378                                 return LDB_ERR_OPERATIONS_ERROR;
379                         }
380                         
381                         ret = ldb_msg_replace(local, el);
382                         if (ret) {
383                                 return ret;
384                         }
385                 }
386         }
387         
388         /* Now walk the list of possible mappings, and apply each */
389         for (i = 0; data->attribute_maps[i].local_name; i++) {
390                 ret = ldb_msg_el_merge(module, local, remote, 
391                                        data->attribute_maps[i].local_name);
392                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
393                         continue;
394                 } else if (ret) {
395                         return ret;
396                 } else {
397                         continue;
398                 }
399         }
400
401         return LDB_SUCCESS;
402 }
403
404 /* Mapping messages
405  * ================ */
406
407 /* Merge two local messages into a single one. */
408 static int ldb_msg_merge_local(struct ldb_module *module, struct ldb_message *msg1, struct ldb_message *msg2)
409 {
410         unsigned int i;
411         int ret;
412
413         for (i = 0; i < msg2->num_elements; i++) {
414                 ret = ldb_msg_replace(msg1, &msg2->elements[i]);
415                 if (ret) {
416                         return ret;
417                 }
418         }
419
420         return LDB_SUCCESS;
421 }
422
423 /* Merge a local and a remote message into a single local one. */
424 static int ldb_msg_merge_remote(struct map_context *ac, struct ldb_message *local, 
425                                 struct ldb_message *remote)
426 {
427         unsigned int i;
428         int ret;
429         const char * const *attrs = ac->all_attrs;
430         if (!attrs) {
431                 ret = ldb_msg_el_merge_wildcard(ac->module, local, remote);
432                 if (ret) {
433                         return ret;
434                 }
435         }
436
437         for (i = 0; attrs && attrs[i]; i++) {
438                 if (ldb_attr_cmp(attrs[i], "*") == 0) {
439                         ret = ldb_msg_el_merge_wildcard(ac->module, local, remote);
440                         if (ret) {
441                                 return ret;
442                         }
443                         break;
444                 }
445         }
446
447         /* Try to map each attribute back;
448          * Add to local message is possible,
449          * Overwrite old local attribute if necessary */
450         for (i = 0; attrs && attrs[i]; i++) {
451                 ret = ldb_msg_el_merge(ac->module, local, remote, 
452                                        attrs[i]);
453                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
454                 } else if (ret) {
455                         return ret;
456                 }
457         }
458
459         return LDB_SUCCESS;
460 }
461
462 /* Mapping search results
463  * ====================== */
464
465 /* Map a search result back into the local partition. */
466 static int map_reply_remote(struct map_context *ac, struct ldb_reply *ares)
467 {
468         struct ldb_message *msg;
469         struct ldb_dn *dn;
470         int ret;
471
472         /* There is no result message, skip */
473         if (ares->type != LDB_REPLY_ENTRY) {
474                 return 0;
475         }
476
477         /* Create a new result message */
478         msg = ldb_msg_new(ares);
479         if (msg == NULL) {
480                 map_oom(ac->module);
481                 return -1;
482         }
483
484         /* Merge remote message into new message */
485         ret = ldb_msg_merge_remote(ac, msg, ares->message);
486         if (ret) {
487                 talloc_free(msg);
488                 return ret;
489         }
490
491         /* Create corresponding local DN */
492         dn = ldb_dn_map_rebase_remote(ac->module, msg, ares->message->dn);
493         if (dn == NULL) {
494                 talloc_free(msg);
495                 return -1;
496         }
497         msg->dn = dn;
498
499         /* Store new message with new DN as the result */
500         talloc_free(ares->message);
501         ares->message = msg;
502
503         return 0;
504 }
505
506 /* Mapping parse trees
507  * =================== */
508
509 /* Check whether a parse tree can safely be split in two. */
510 static bool ldb_parse_tree_check_splittable(const struct ldb_parse_tree *tree)
511 {
512         const struct ldb_parse_tree *subtree = tree;
513         bool negate = false;
514
515         while (subtree) {
516                 switch (subtree->operation) {
517                 case LDB_OP_NOT:
518                         negate = !negate;
519                         subtree = subtree->u.isnot.child;
520                         continue;
521
522                 case LDB_OP_AND:
523                         return !negate; /* if negate: False */
524
525                 case LDB_OP_OR:
526                         return negate;  /* if negate: True */
527
528                 default:
529                         return true;    /* simple parse tree */
530                 }
531         }
532
533         return true;                    /* no parse tree */
534 }
535
536 /* Collect a list of attributes required to match a given parse tree. */
537 static int ldb_parse_tree_collect_attrs(struct ldb_module *module, void *mem_ctx, const char ***attrs, const struct ldb_parse_tree *tree)
538 {
539         const char **new_attrs;
540         unsigned int i;
541         int ret;
542
543         if (tree == NULL) {
544                 return 0;
545         }
546
547         switch (tree->operation) {
548         case LDB_OP_OR:
549         case LDB_OP_AND:                /* attributes stored in list of subtrees */
550                 for (i = 0; i < tree->u.list.num_elements; i++) {
551                         ret = ldb_parse_tree_collect_attrs(module, mem_ctx, 
552                                                            attrs, tree->u.list.elements[i]);
553                         if (ret) {
554                                 return ret;
555                         }
556                 }
557                 return 0;
558
559         case LDB_OP_NOT:                /* attributes stored in single subtree */
560                 return ldb_parse_tree_collect_attrs(module, mem_ctx, attrs, tree->u.isnot.child);
561
562         default:                        /* single attribute in tree */
563                 new_attrs = ldb_attr_list_copy_add(mem_ctx, *attrs, tree->u.equality.attr);
564                 talloc_free(*attrs);
565                 *attrs = new_attrs;
566                 return 0;
567         }
568 }
569
570 static int map_subtree_select_local(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree);
571
572 /* Select a negated subtree that queries attributes in the local partition */
573 static int map_subtree_select_local_not(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
574 {
575         struct ldb_parse_tree *child;
576         int ret;
577
578         /* Prepare new tree */
579         *new = talloc_memdup(mem_ctx, tree, sizeof(struct ldb_parse_tree));
580         if (*new == NULL) {
581                 map_oom(module);
582                 return -1;
583         }
584
585         /* Generate new subtree */
586         ret = map_subtree_select_local(module, *new, &child, tree->u.isnot.child);
587         if (ret) {
588                 talloc_free(*new);
589                 return ret;
590         }
591
592         /* Prune tree without subtree */
593         if (child == NULL) {
594                 talloc_free(*new);
595                 *new = NULL;
596                 return 0;
597         }
598
599         (*new)->u.isnot.child = child;
600
601         return ret;
602 }
603
604 /* Select a list of subtrees that query attributes in the local partition */
605 static int map_subtree_select_local_list(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
606 {
607         unsigned int i, j;
608         int ret=0;
609
610         /* Prepare new tree */
611         *new = talloc_memdup(mem_ctx, tree, sizeof(struct ldb_parse_tree));
612         if (*new == NULL) {
613                 map_oom(module);
614                 return -1;
615         }
616
617         /* Prepare list of subtrees */
618         (*new)->u.list.num_elements = 0;
619         (*new)->u.list.elements = talloc_array(*new, struct ldb_parse_tree *, tree->u.list.num_elements);
620         if ((*new)->u.list.elements == NULL) {
621                 map_oom(module);
622                 talloc_free(*new);
623                 return -1;
624         }
625
626         /* Generate new list of subtrees */
627         j = 0;
628         for (i = 0; i < tree->u.list.num_elements; i++) {
629                 struct ldb_parse_tree *child;
630                 ret = map_subtree_select_local(module, *new, &child, tree->u.list.elements[i]);
631                 if (ret) {
632                         talloc_free(*new);
633                         return ret;
634                 }
635
636                 if (child) {
637                         (*new)->u.list.elements[j] = child;
638                         j++;
639                 }
640         }
641
642         /* Prune tree without subtrees */
643         if (j == 0) {
644                 talloc_free(*new);
645                 *new = NULL;
646                 return 0;
647         }
648
649         /* Fix subtree list size */
650         (*new)->u.list.num_elements = j;
651         (*new)->u.list.elements = talloc_realloc(*new, (*new)->u.list.elements, struct ldb_parse_tree *, (*new)->u.list.num_elements);
652
653         return ret;
654 }
655
656 /* Select a simple subtree that queries attributes in the local partition */
657 static int map_subtree_select_local_simple(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
658 {
659         /* Prepare new tree */
660         *new = talloc_memdup(mem_ctx, tree, sizeof(struct ldb_parse_tree));
661         if (*new == NULL) {
662                 map_oom(module);
663                 return -1;
664         }
665
666         return 0;
667 }
668
669 /* Select subtrees that query attributes in the local partition */
670 static int map_subtree_select_local(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
671 {
672         const struct ldb_map_context *data = map_get_context(module);
673
674         if (tree == NULL) {
675                 return 0;
676         }
677
678         if (tree->operation == LDB_OP_NOT) {
679                 return map_subtree_select_local_not(module, mem_ctx, new, tree);
680         }
681
682         if (tree->operation == LDB_OP_AND || tree->operation == LDB_OP_OR) {
683                 return map_subtree_select_local_list(module, mem_ctx, new, tree);
684         }
685
686         if (map_attr_check_remote(data, tree->u.equality.attr)) {
687                 *new = NULL;
688                 return 0;
689         }
690
691         return map_subtree_select_local_simple(module, mem_ctx, new, tree);
692 }
693
694 static int map_subtree_collect_remote(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree);
695
696 /* Collect a negated subtree that queries attributes in the remote partition */
697 static int map_subtree_collect_remote_not(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
698 {
699         struct ldb_parse_tree *child;
700         int ret;
701
702         /* Prepare new tree */
703         *new = talloc_memdup(mem_ctx, tree, sizeof(struct ldb_parse_tree));
704         if (*new == NULL) {
705                 map_oom(module);
706                 return -1;
707         }
708
709         /* Generate new subtree */
710         ret = map_subtree_collect_remote(module, *new, &child, tree->u.isnot.child);
711         if (ret) {
712                 talloc_free(*new);
713                 return ret;
714         }
715
716         /* Prune tree without subtree */
717         if (child == NULL) {
718                 talloc_free(*new);
719                 *new = NULL;
720                 return 0;
721         }
722
723         (*new)->u.isnot.child = child;
724
725         return ret;
726 }
727
728 /* Collect a list of subtrees that query attributes in the remote partition */
729 static int map_subtree_collect_remote_list(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
730 {
731         unsigned int i, j;
732         int ret=0;
733
734         /* Prepare new tree */
735         *new = talloc_memdup(mem_ctx, tree, sizeof(struct ldb_parse_tree));
736         if (*new == NULL) {
737                 map_oom(module);
738                 return -1;
739         }
740
741         /* Prepare list of subtrees */
742         (*new)->u.list.num_elements = 0;
743         (*new)->u.list.elements = talloc_array(*new, struct ldb_parse_tree *, tree->u.list.num_elements);
744         if ((*new)->u.list.elements == NULL) {
745                 map_oom(module);
746                 talloc_free(*new);
747                 return -1;
748         }
749
750         /* Generate new list of subtrees */
751         j = 0;
752         for (i = 0; i < tree->u.list.num_elements; i++) {
753                 struct ldb_parse_tree *child;
754                 ret = map_subtree_collect_remote(module, *new, &child, tree->u.list.elements[i]);
755                 if (ret) {
756                         talloc_free(*new);
757                         return ret;
758                 }
759
760                 if (child) {
761                         (*new)->u.list.elements[j] = child;
762                         j++;
763                 }
764         }
765
766         /* Prune tree without subtrees */
767         if (j == 0) {
768                 talloc_free(*new);
769                 *new = NULL;
770                 return 0;
771         }
772
773         /* Fix subtree list size */
774         (*new)->u.list.num_elements = j;
775         (*new)->u.list.elements = talloc_realloc(*new, (*new)->u.list.elements, struct ldb_parse_tree *, (*new)->u.list.num_elements);
776
777         return ret;
778 }
779
780 /* Collect a simple subtree that queries attributes in the remote partition */
781 int map_subtree_collect_remote_simple(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree, const struct ldb_map_attribute *map)
782 {
783         const char *attr;
784
785         /* Prepare new tree */
786         *new = talloc(mem_ctx, struct ldb_parse_tree);
787         if (*new == NULL) {
788                 map_oom(module);
789                 return -1;
790         }
791         **new = *tree;
792         
793         if (map->type == LDB_MAP_KEEP) {
794                 /* Nothing to do here */
795                 return 0;
796         }
797
798         /* Store attribute and value in new tree */
799         switch (tree->operation) {
800         case LDB_OP_PRESENT:
801                 attr = map_attr_map_local(*new, map, tree->u.present.attr);
802                 (*new)->u.present.attr = attr;
803                 break;
804         case LDB_OP_SUBSTRING:
805         {
806                 attr = map_attr_map_local(*new, map, tree->u.substring.attr);
807                 (*new)->u.substring.attr = attr;
808                 break;
809         }
810         case LDB_OP_EQUALITY:
811                 attr = map_attr_map_local(*new, map, tree->u.equality.attr);
812                 (*new)->u.equality.attr = attr;
813                 break;
814         case LDB_OP_LESS:
815         case LDB_OP_GREATER:
816         case LDB_OP_APPROX:
817                 attr = map_attr_map_local(*new, map, tree->u.comparison.attr);
818                 (*new)->u.comparison.attr = attr;
819                 break;
820         case LDB_OP_EXTENDED:
821                 attr = map_attr_map_local(*new, map, tree->u.extended.attr);
822                 (*new)->u.extended.attr = attr;
823                 break;
824         default:                        /* unknown kind of simple subtree */
825                 talloc_free(*new);
826                 return -1;
827         }
828
829         if (attr == NULL) {
830                 talloc_free(*new);
831                 *new = NULL;
832                 return 0;
833         }
834
835         if (map->type == LDB_MAP_RENAME) {
836                 /* Nothing more to do here, the attribute has been renamed */
837                 return 0;
838         }
839
840         /* Store attribute and value in new tree */
841         switch (tree->operation) {
842         case LDB_OP_PRESENT:
843                 break;
844         case LDB_OP_SUBSTRING:
845         {
846                 int i;
847                 /* Map value */
848                 (*new)->u.substring.chunks = NULL;
849                 for (i=0; tree->u.substring.chunks[i]; i++) {
850                         (*new)->u.substring.chunks = talloc_realloc(*new, (*new)->u.substring.chunks, struct ldb_val *, i+2);
851                         if (!(*new)->u.substring.chunks) {
852                                 talloc_free(*new);
853                                 *new = NULL;
854                                 return 0;
855                         }
856                         (*new)->u.substring.chunks[i] = talloc(*new, struct ldb_val);
857                         if (!(*new)->u.substring.chunks[i]) {
858                                 talloc_free(*new);
859                                 *new = NULL;
860                                 return 0;
861                         }
862                         *(*new)->u.substring.chunks[i] = ldb_val_map_local(module, *new, map, tree->u.substring.chunks[i]);
863                         (*new)->u.substring.chunks[i+1] = NULL;
864                 }
865                 break;
866         }
867         case LDB_OP_EQUALITY:
868                 (*new)->u.equality.value = ldb_val_map_local(module, *new, map, &tree->u.equality.value);
869                 break;
870         case LDB_OP_LESS:
871         case LDB_OP_GREATER:
872         case LDB_OP_APPROX:
873                 (*new)->u.comparison.value = ldb_val_map_local(module, *new, map, &tree->u.comparison.value);
874                 break;
875         case LDB_OP_EXTENDED:
876                 (*new)->u.extended.value = ldb_val_map_local(module, *new, map, &tree->u.extended.value);
877                 (*new)->u.extended.rule_id = talloc_strdup(*new, tree->u.extended.rule_id);
878                 break;
879         default:                        /* unknown kind of simple subtree */
880                 talloc_free(*new);
881                 return -1;
882         }
883
884         return 0;
885 }
886
887 /* Collect subtrees that query attributes in the remote partition */
888 static int map_subtree_collect_remote(struct ldb_module *module, void *mem_ctx, struct ldb_parse_tree **new, const struct ldb_parse_tree *tree)
889 {
890         const struct ldb_map_context *data = map_get_context(module);
891         const struct ldb_map_attribute *map;
892         struct ldb_context *ldb;
893
894         ldb = ldb_module_get_ctx(module);
895
896         if (tree == NULL) {
897                 return 0;
898         }
899
900         if (tree->operation == LDB_OP_NOT) {
901                 return map_subtree_collect_remote_not(module, mem_ctx, new, tree);
902         }
903
904         if ((tree->operation == LDB_OP_AND) || (tree->operation == LDB_OP_OR)) {
905                 return map_subtree_collect_remote_list(module, mem_ctx, new, tree);
906         }
907
908         if (!map_attr_check_remote(data, tree->u.equality.attr)) {
909                 *new = NULL;
910                 return 0;
911         }
912
913         map = map_attr_find_local(data, tree->u.equality.attr);
914         if (map->convert_operator) {
915                 return map->convert_operator(module, mem_ctx, new, tree);
916         }
917
918         if (map->type == LDB_MAP_GENERATE) {
919                 ldb_debug(ldb, LDB_DEBUG_WARNING, "ldb_map: "
920                           "Skipping attribute '%s': "
921                           "'convert_operator' not set",
922                           tree->u.equality.attr);
923                 *new = NULL;
924                 return 0;
925         }
926
927         return map_subtree_collect_remote_simple(module, mem_ctx, new, tree, map);
928 }
929
930 /* Split subtrees that query attributes in the local partition from
931  * those that query the remote partition. */
932 static int ldb_parse_tree_partition(struct ldb_module *module,
933                                         void *mem_ctx,
934                                         struct ldb_parse_tree **local_tree,
935                                         struct ldb_parse_tree **remote_tree,
936                                         const struct ldb_parse_tree *tree)
937 {
938         int ret;
939
940         *local_tree = NULL;
941         *remote_tree = NULL;
942
943         /* No original tree */
944         if (tree == NULL) {
945                 return 0;
946         }
947
948         /* Generate local tree */
949         ret = map_subtree_select_local(module, mem_ctx, local_tree, tree);
950         if (ret) {
951                 return ret;
952         }
953
954         /* Generate remote tree */
955         ret = map_subtree_collect_remote(module, mem_ctx, remote_tree, tree);
956         if (ret) {
957                 talloc_free(*local_tree);
958                 return ret;
959         }
960
961         return 0;
962 }
963
964 /* Collect a list of attributes required either explicitly from a
965  * given list or implicitly  from a given parse tree; split the
966  * collected list into local and remote parts. */
967 static int map_attrs_collect_and_partition(struct ldb_module *module, struct map_context *ac,
968                                            const char * const *search_attrs, 
969                                            const struct ldb_parse_tree *tree)
970 {
971         void *tmp_ctx;
972         const char **tree_attrs;
973         const char **remote_attrs;
974         const char **local_attrs;
975         int ret;
976
977         /* There is no tree, just partition the searched attributes */
978         if (tree == NULL) {
979                 ret = map_attrs_partition(module, ac, 
980                                           &local_attrs, &remote_attrs, search_attrs);
981                 if (ret == 0) {
982                         ac->local_attrs = local_attrs;
983                         ac->remote_attrs = remote_attrs;
984                         ac->all_attrs = search_attrs;
985                 }
986                 return ret; 
987         }
988
989         /* Create context for temporary memory */
990         tmp_ctx = talloc_new(ac);
991         if (tmp_ctx == NULL) {
992                 goto oom;
993         }
994
995         /* Prepare list of attributes from tree */
996         tree_attrs = talloc_array(tmp_ctx, const char *, 1);
997         if (tree_attrs == NULL) {
998                 talloc_free(tmp_ctx);
999                 goto oom;
1000         }
1001         tree_attrs[0] = NULL;
1002
1003         /* Collect attributes from tree */
1004         ret = ldb_parse_tree_collect_attrs(module, tmp_ctx, &tree_attrs, tree);
1005         if (ret) {
1006                 goto done;
1007         }
1008
1009         /* Merge attributes from search operation */
1010         ret = map_attrs_merge(module, tmp_ctx, &tree_attrs, search_attrs);
1011         if (ret) {
1012                 goto done;
1013         }
1014
1015         /* Split local from remote attributes */
1016         ret = map_attrs_partition(module, ac, &local_attrs, 
1017                                   &remote_attrs, tree_attrs);
1018         
1019         if (ret == 0) {
1020                 ac->local_attrs = local_attrs;
1021                 ac->remote_attrs = remote_attrs;
1022                 talloc_steal(ac, tree_attrs);
1023                 ac->all_attrs = tree_attrs;
1024         }
1025 done:
1026         /* Free temporary memory */
1027         talloc_free(tmp_ctx);
1028         return ret;
1029
1030 oom:
1031         map_oom(module);
1032         return -1;
1033 }
1034
1035
1036 /* Outbound requests: search
1037  * ========================= */
1038
1039 static int map_remote_search_callback(struct ldb_request *req,
1040                                         struct ldb_reply *ares);
1041 static int map_local_merge_callback(struct ldb_request *req,
1042                                         struct ldb_reply *ares);
1043 static int map_search_local(struct map_context *ac);
1044
1045 static int map_save_entry(struct map_context *ac, struct ldb_reply *ares)
1046 {
1047         struct map_reply *mr;
1048
1049         mr = talloc_zero(ac, struct map_reply);
1050         if (mr == NULL) {
1051                 map_oom(ac->module);
1052                 return LDB_ERR_OPERATIONS_ERROR;
1053         }
1054         mr->remote = talloc_steal(mr, ares);
1055         if (ac->r_current) {
1056                 ac->r_current->next = mr;
1057         } else {
1058                 /* first entry */
1059                 ac->r_list = mr;
1060         }
1061         ac->r_current = mr;
1062
1063         return LDB_SUCCESS;
1064 }
1065
1066 /* Pass a merged search result up the callback chain. */
1067 int map_return_entry(struct map_context *ac, struct ldb_reply *ares)
1068 {
1069         struct ldb_message_element *el;
1070         const char * const *attrs;
1071         struct ldb_context *ldb;
1072         unsigned int i;
1073         int ret;
1074         bool matched;
1075
1076         ldb = ldb_module_get_ctx(ac->module);
1077
1078         /* Merged result doesn't match original query, skip */
1079         ret = ldb_match_msg_error(ldb, ares->message,
1080                                   ac->req->op.search.tree,
1081                                   ac->req->op.search.base,
1082                                   ac->req->op.search.scope,
1083                                   &matched);
1084         if (ret != LDB_SUCCESS) return ret;
1085         if (!matched) {
1086                 ldb_debug(ldb, LDB_DEBUG_TRACE, "ldb_map: "
1087                           "Skipping record '%s': "
1088                           "doesn't match original search",
1089                           ldb_dn_get_linearized(ares->message->dn));
1090                 return LDB_SUCCESS;
1091         }
1092
1093         /* Limit result to requested attrs */
1094         if (ac->req->op.search.attrs &&
1095             (! ldb_attr_in_list(ac->req->op.search.attrs, "*"))) {
1096
1097                 attrs = ac->req->op.search.attrs;
1098                 i = 0;
1099
1100                 while (i < ares->message->num_elements) {
1101
1102                         el = &ares->message->elements[i];
1103                         if ( ! ldb_attr_in_list(attrs, el->name)) {
1104                                 ldb_msg_remove_element(ares->message, el);
1105                         } else {
1106                                 i++;
1107                         }
1108                 }
1109         }
1110
1111         return ldb_module_send_entry(ac->req, ares->message, ares->controls);
1112 }
1113
1114 /* Search a record. */
1115 int map_search(struct ldb_module *module, struct ldb_request *req)
1116 {
1117         struct ldb_parse_tree *remote_tree;
1118         struct ldb_parse_tree *local_tree;
1119         struct ldb_request *remote_req;
1120         struct ldb_context *ldb;
1121         struct map_context *ac;
1122         int ret;
1123
1124         const char *wildcard[] = { "*", NULL };
1125         const char * const *attrs;
1126
1127         ldb = ldb_module_get_ctx(module);
1128
1129         /* if we're not yet initialized, go to the next module */
1130         if (!ldb_module_get_private(module))
1131                 return ldb_next_request(module, req);
1132
1133         /* Do not manipulate our control entries */
1134         if (ldb_dn_is_special(req->op.search.base)) {
1135                 return ldb_next_request(module, req);
1136         }
1137
1138         /* No mapping requested, skip to next module */
1139         if ((req->op.search.base) && (!ldb_dn_check_local(module, req->op.search.base))) {
1140                 return ldb_next_request(module, req);
1141         }
1142
1143         /* TODO: How can we be sure about which partition we are
1144          *       targetting when there is no search base? */
1145
1146         /* Prepare context and handle */
1147         ac = map_init_context(module, req);
1148         if (ac == NULL) {
1149                 return LDB_ERR_OPERATIONS_ERROR;
1150         }
1151
1152         /* It is easier to deal with the two different ways of
1153          * expressing the wildcard in the same codepath */
1154         attrs = req->op.search.attrs;
1155         if (attrs == NULL) {
1156                 attrs = wildcard;
1157         }
1158
1159         /* Split local from remote attrs */
1160         ret = map_attrs_collect_and_partition(module, ac, 
1161                                               attrs, req->op.search.tree);
1162         if (ret) {
1163                 return LDB_ERR_OPERATIONS_ERROR;
1164         }
1165
1166         /* Split local from remote tree */
1167         ret = ldb_parse_tree_partition(module, ac,
1168                                        &local_tree, &remote_tree,
1169                                        req->op.search.tree);
1170         if (ret) {
1171                 return LDB_ERR_OPERATIONS_ERROR;
1172         }
1173
1174         if (((local_tree != NULL) && (remote_tree != NULL)) &&
1175             (!ldb_parse_tree_check_splittable(req->op.search.tree))) {
1176                 /* The query can't safely be split, enumerate the remote partition */
1177                 local_tree = NULL;
1178                 remote_tree = NULL;
1179         }
1180
1181         if (local_tree == NULL) {
1182                 /* Construct default local parse tree */
1183                 local_tree = talloc_zero(ac, struct ldb_parse_tree);
1184                 if (local_tree == NULL) {
1185                         map_oom(ac->module);
1186                         return LDB_ERR_OPERATIONS_ERROR;
1187                 }
1188
1189                 local_tree->operation = LDB_OP_PRESENT;
1190                 local_tree->u.present.attr = talloc_strdup(local_tree, IS_MAPPED);
1191         }
1192         if (remote_tree == NULL) {
1193                 /* Construct default remote parse tree */
1194                 remote_tree = ldb_parse_tree(ac, NULL);
1195                 if (remote_tree == NULL) {
1196                         return LDB_ERR_OPERATIONS_ERROR;
1197                 }
1198         }
1199
1200         ac->local_tree = local_tree;
1201
1202         /* Prepare the remote operation */
1203         ret = ldb_build_search_req_ex(&remote_req, ldb, ac,
1204                                       req->op.search.base,
1205                                       req->op.search.scope,
1206                                       remote_tree,
1207                                       ac->remote_attrs,
1208                                       req->controls,
1209                                       ac, map_remote_search_callback,
1210                                       req);
1211         LDB_REQ_SET_LOCATION(remote_req);
1212         if (ret != LDB_SUCCESS) {
1213                 return LDB_ERR_OPERATIONS_ERROR;
1214         }
1215
1216         return ldb_next_remote_request(module, remote_req);
1217 }
1218
1219 /* Now, search the local part of a remote search result. */
1220 static int map_remote_search_callback(struct ldb_request *req,
1221                                         struct ldb_reply *ares)
1222 {
1223         struct map_context *ac;
1224         int ret;
1225
1226         ac = talloc_get_type(req->context, struct map_context);
1227
1228         if (!ares) {
1229                 return ldb_module_done(ac->req, NULL, NULL,
1230                                         LDB_ERR_OPERATIONS_ERROR);
1231         }
1232         if (ares->error != LDB_SUCCESS) {
1233                 return ldb_module_done(ac->req, ares->controls,
1234                                         ares->response, ares->error);
1235         }
1236
1237         switch (ares->type) {
1238         case LDB_REPLY_REFERRAL:
1239
1240                 /* ignore referrals */
1241                 talloc_free(ares);
1242                 return LDB_SUCCESS;
1243
1244         case LDB_REPLY_ENTRY:
1245
1246                 /* Map result record into a local message */
1247                 ret = map_reply_remote(ac, ares);
1248                 if (ret) {
1249                         talloc_free(ares);
1250                         return ldb_module_done(ac->req, NULL, NULL,
1251                                                 LDB_ERR_OPERATIONS_ERROR);
1252                 }
1253
1254                 /* if we have no local db, then we can just return the reply to
1255                  * the upper layer, otherwise we must save it and process it
1256                  * when all replies ahve been gathered */
1257                 if ( ! map_check_local_db(ac->module)) {
1258                         ret = map_return_entry(ac, ares);
1259                 } else {
1260                         ret = map_save_entry(ac,ares);
1261                 }
1262
1263                 if (ret != LDB_SUCCESS) {
1264                         talloc_free(ares);
1265                         return ldb_module_done(ac->req, NULL, NULL,
1266                                                 LDB_ERR_OPERATIONS_ERROR);
1267                 }
1268                 break;
1269
1270         case LDB_REPLY_DONE:
1271
1272                 if ( ! map_check_local_db(ac->module)) {
1273                         return ldb_module_done(ac->req, ares->controls,
1274                                                 ares->response, LDB_SUCCESS);
1275                 }
1276
1277                 /* reset the pointer to the start of the list */
1278                 ac->r_current = ac->r_list;
1279
1280                 /* no entry just return */
1281                 if (ac->r_current == NULL) {
1282                         ret = ldb_module_done(ac->req, ares->controls,
1283                                                 ares->response, LDB_SUCCESS);
1284                         talloc_free(ares);
1285                         return ret;
1286                 }
1287
1288                 ac->remote_done_ares = talloc_steal(ac, ares);
1289
1290                 ret = map_search_local(ac);
1291                 if (ret != LDB_SUCCESS) {
1292                         return ldb_module_done(ac->req, NULL, NULL, ret);
1293                 }
1294         }
1295
1296         return LDB_SUCCESS;
1297 }
1298
1299 static int map_search_local(struct map_context *ac)
1300 {
1301         struct ldb_request *search_req;
1302
1303         if (ac->r_current == NULL || ac->r_current->remote == NULL) {
1304                 return LDB_ERR_OPERATIONS_ERROR;
1305         }
1306
1307         /* Prepare local search request */
1308         /* TODO: use GUIDs here instead? */
1309         search_req = map_search_base_req(ac,
1310                                          ac->r_current->remote->message->dn,
1311                                          NULL, NULL,
1312                                          ac, map_local_merge_callback);
1313         if (search_req == NULL) {
1314                 return LDB_ERR_OPERATIONS_ERROR;
1315         }
1316
1317         return ldb_next_request(ac->module, search_req);
1318 }
1319
1320 /* Merge the remote and local parts of a search result. */
1321 int map_local_merge_callback(struct ldb_request *req, struct ldb_reply *ares)
1322 {
1323         struct ldb_context *ldb;
1324         struct map_context *ac;
1325         int ret;
1326
1327         ac = talloc_get_type(req->context, struct map_context);
1328         ldb = ldb_module_get_ctx(ac->module);
1329
1330         if (!ares) {
1331                 return ldb_module_done(ac->req, NULL, NULL,
1332                                         LDB_ERR_OPERATIONS_ERROR);
1333         }
1334         if (ares->error != LDB_SUCCESS) {
1335                 return ldb_module_done(ac->req, ares->controls,
1336                                         ares->response, ares->error);
1337         }
1338
1339         switch (ares->type) {
1340         case LDB_REPLY_ENTRY:
1341                 /* We have already found a local record */
1342                 if (ac->r_current->local) {
1343                         talloc_free(ares);
1344                         ldb_set_errstring(ldb, "ldb_map: Too many results!");
1345                         return ldb_module_done(ac->req, NULL, NULL,
1346                                                 LDB_ERR_OPERATIONS_ERROR);
1347                 }
1348
1349                 /* Store local result */
1350                 ac->r_current->local = talloc_steal(ac->r_current, ares);
1351
1352                 break;
1353
1354         case LDB_REPLY_REFERRAL:
1355                 /* ignore referrals */
1356                 talloc_free(ares);
1357                 break;
1358
1359         case LDB_REPLY_DONE:
1360                 /* We don't need the local 'ares', but we will use the remote one from below */
1361                 talloc_free(ares);
1362
1363                 /* No local record found, map and send remote record */
1364                 if (ac->r_current->local != NULL) {
1365                         /* Merge remote into local message */
1366                         ret = ldb_msg_merge_local(ac->module,
1367                                                   ac->r_current->local->message,
1368                                                   ac->r_current->remote->message);
1369                         if (ret == LDB_SUCCESS) {
1370                                 ret = map_return_entry(ac, ac->r_current->local);
1371                         }
1372                         if (ret != LDB_SUCCESS) {
1373                                 return ldb_module_done(ac->req, NULL, NULL,
1374                                                         LDB_ERR_OPERATIONS_ERROR);
1375                         }
1376                 } else {
1377                         ret = map_return_entry(ac, ac->r_current->remote);
1378                         if (ret != LDB_SUCCESS) {
1379                                 return ldb_module_done(ac->req,
1380                                                         NULL, NULL, ret);
1381                         }
1382                 }
1383
1384                 if (ac->r_current->next != NULL) {
1385                         ac->r_current = ac->r_current->next;
1386                         if (ac->r_current->remote->type == LDB_REPLY_ENTRY) {
1387                                 ret = map_search_local(ac);
1388                                 if (ret != LDB_SUCCESS) {
1389                                         return ldb_module_done(ac->req,
1390                                                                NULL, NULL, ret);
1391                                 }
1392                                 break;
1393                         }
1394                 }
1395
1396                 /* ok we are done with all search, finally it is time to
1397                  * finish operations for this module */
1398                 return ldb_module_done(ac->req,
1399                                         ac->remote_done_ares->controls,
1400                                         ac->remote_done_ares->response,
1401                                         ac->remote_done_ares->error);
1402         }
1403
1404         return LDB_SUCCESS;
1405 }