ctdb-daemon: Fix implementation of process_exists control
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_traverse.c
1 /* 
2    efficient async ctdb traverse
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/network.h"
23 #include "system/wait.h"
24 #include "system/time.h"
25
26 #include <talloc.h>
27 #include <tevent.h>
28
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/sys_rw.h"
34 #include "lib/util/util_process.h"
35
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
38
39 #include "common/reqid.h"
40 #include "common/system.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
45
46 /*
47   handle returned to caller - freeing this handler will kill the child and 
48   terminate the traverse
49  */
50 struct ctdb_traverse_local_handle {
51         struct ctdb_traverse_local_handle *next, *prev;
52         struct ctdb_db_context *ctdb_db;
53         int fd[2];
54         pid_t child;
55         uint64_t srvid;
56         uint32_t client_reqid;
57         uint32_t reqid;
58         int srcnode;
59         void *private_data;
60         ctdb_traverse_fn_t callback;
61         bool withemptyrecords;
62         struct tevent_fd *fde;
63         int records_failed;
64         int records_sent;
65 };
66
67 /*
68  * called when traverse is completed by child or on error
69  */
70 static void ctdb_traverse_child_handler(struct tevent_context *ev, struct tevent_fd *fde,
71                                         uint16_t flags, void *private_data)
72 {
73         struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
74                                                         struct ctdb_traverse_local_handle);
75         ctdb_traverse_fn_t callback = h->callback;
76         void *p = h->private_data;
77         int res;
78         ssize_t n;
79
80         /* Read the number of records sent by traverse child */
81         n = sys_read(h->fd[0], &res, sizeof(res));
82         if (n < 0 || n != sizeof(res)) {
83                 /* Traverse child failed */
84                 DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d\n",
85                                   h->ctdb_db->db_name, h->reqid));
86         } else if (res < 0) {
87                 /* Traverse failed */
88                 res = -res;
89                 DEBUG(DEBUG_ERR, ("Local traverse failed db:%s reqid:%d records:%d\n",
90                                   h->ctdb_db->db_name, h->reqid, res));
91         } else {
92                 DEBUG(DEBUG_INFO, ("Local traverse end db:%s reqid:%d records:%d\n",
93                                    h->ctdb_db->db_name, h->reqid, res));
94         }
95
96         callback(p, tdb_null, tdb_null);
97 }
98
99 /*
100   destroy a in-flight traverse operation
101  */
102 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
103 {
104         DLIST_REMOVE(h->ctdb_db->traverse, h);
105         ctdb_kill(h->ctdb_db->ctdb, h->child, SIGKILL);
106         return 0;
107 }
108
109 /*
110   callback from tdb_traverse_read()
111  */
112 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
113 {
114         struct ctdb_traverse_local_handle *h = talloc_get_type(p,
115                                                                struct ctdb_traverse_local_handle);
116         struct ctdb_rec_data_old *d;
117         struct ctdb_ltdb_header *hdr;
118         int res, status;
119         TDB_DATA outdata;
120
121         hdr = (struct ctdb_ltdb_header *)data.dptr;
122
123         if (ctdb_db_volatile(h->ctdb_db)) {
124                 /* filter out zero-length records */
125                 if (!h->withemptyrecords &&
126                     data.dsize <= sizeof(struct ctdb_ltdb_header))
127                 {
128                         return 0;
129                 }
130
131                 /* filter out non-authoritative records */
132                 if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
133                         return 0;
134                 }
135         }
136
137         d = ctdb_marshall_record(h, h->reqid, key, NULL, data);
138         if (d == NULL) {
139                 /* error handling is tricky in this child code .... */
140                 h->records_failed++;
141                 return -1;
142         }
143
144         outdata.dptr = (uint8_t *)d;
145         outdata.dsize = d->length;
146
147         res = ctdb_control(h->ctdb_db->ctdb, h->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
148                            CTDB_CTRL_FLAG_NOREPLY, outdata, NULL, NULL, &status, NULL, NULL);
149         if (res != 0 || status != 0) {
150                 h->records_failed++;
151                 return -1;
152         }
153
154         h->records_sent++;
155         return 0;
156 }
157
/*
  state of one incoming TRAVERSE_ALL / TRAVERSE_ALL_EXT request; it is
  the talloc parent of the local traverse handle
 */
struct traverse_all_state {
	struct ctdb_context *ctdb;
	/* the running local traverse */
	struct ctdb_traverse_local_handle *h;
	/* reqid chosen by the requesting node, echoed in every record */
	uint32_t reqid;
	/* pnn of the requesting node */
	uint32_t srcnode;
	/* reqid and srvid of the originating client request */
	uint32_t client_reqid;
	uint64_t srvid;
	/* also send records that carry no payload */
	bool withemptyrecords;
};
167
168 /*
169   setup a non-blocking traverse of a local ltdb. The callback function
170   will be called on every record in the local ltdb. To stop the
171   traverse, talloc_free() the traverse_handle.
172
173   The traverse is finished when the callback is called with tdb_null for key and data
174  */
175 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
176                                                               ctdb_traverse_fn_t callback,
177                                                               struct traverse_all_state *all_state)
178 {
179         struct ctdb_traverse_local_handle *h;
180         int ret;
181
182         h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
183         if (h == NULL) {
184                 return NULL;
185         }
186
187         ret = pipe(h->fd);
188
189         if (ret != 0) {
190                 talloc_free(h);
191                 return NULL;
192         }
193
194         h->child = ctdb_fork(ctdb_db->ctdb);
195
196         if (h->child == (pid_t)-1) {
197                 close(h->fd[0]);
198                 close(h->fd[1]);
199                 talloc_free(h);
200                 return NULL;
201         }
202
203         h->callback = callback;
204         h->private_data = all_state;
205         h->ctdb_db = ctdb_db;
206         h->client_reqid = all_state->client_reqid;
207         h->reqid = all_state->reqid;
208         h->srvid = all_state->srvid;
209         h->srcnode = all_state->srcnode;
210         h->withemptyrecords = all_state->withemptyrecords;
211
212         if (h->child == 0) {
213                 /* start the traverse in the child */
214                 int res, status;
215                 pid_t parent = getpid();
216                 struct ctdb_context *ctdb = ctdb_db->ctdb;
217                 struct ctdb_rec_data_old *d;
218                 TDB_DATA outdata;
219
220                 close(h->fd[0]);
221
222                 prctl_set_comment("ctdb_traverse");
223                 if (switch_from_server_to_client(ctdb) != 0) {
224                         DEBUG(DEBUG_CRIT, ("Failed to switch traverse child into client mode\n"));
225                         _exit(0);
226                 }
227
228                 d = ctdb_marshall_record(h, h->reqid, tdb_null, NULL, tdb_null);
229                 if (d == NULL) {
230                         res = 0;
231                         sys_write(h->fd[1], &res, sizeof(int));
232                         _exit(0);
233                 }
234
235                 res = tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
236                 if (res == -1 || h->records_failed > 0) {
237                         /* traverse failed */
238                         res = -(h->records_sent);
239                 } else {
240                         res = h->records_sent;
241                 }
242
243                 /* Wait till all the data is flushed from output queue */
244                 while (ctdb_queue_length(ctdb->daemon.queue) > 0) {
245                         tevent_loop_once(ctdb->ev);
246                 }
247
248                 /* End traverse by sending empty record */
249                 outdata.dptr = (uint8_t *)d;
250                 outdata.dsize = d->length;
251                 ret = ctdb_control(ctdb, h->srcnode, 0,
252                                    CTDB_CONTROL_TRAVERSE_DATA,
253                                    CTDB_CTRL_FLAG_NOREPLY, outdata,
254                                    NULL, NULL, &status, NULL, NULL);
255                 if (ret == -1 || status == -1) {
256                         if (res > 0) {
257                                 res = -res;
258                         }
259                 }
260
261                 sys_write(h->fd[1], &res, sizeof(res));
262
263                 ctdb_wait_for_process_to_exit(parent);
264                 _exit(0);
265         }
266
267         close(h->fd[1]);
268         set_close_on_exec(h->fd[0]);
269
270         talloc_set_destructor(h, traverse_local_destructor);
271
272         DLIST_ADD(ctdb_db->traverse, h);
273
274         h->fde = tevent_add_fd(ctdb_db->ctdb->ev, h, h->fd[0], TEVENT_FD_READ,
275                                ctdb_traverse_child_handler, h);
276         if (h->fde == NULL) {
277                 close(h->fd[0]);
278                 talloc_free(h);
279                 return NULL;
280         }
281         tevent_fd_set_auto_close(h->fde);
282
283         return h;
284 }
285
286
287 struct ctdb_traverse_all_handle {
288         struct ctdb_context *ctdb;
289         struct ctdb_db_context *ctdb_db;
290         uint32_t reqid;
291         ctdb_traverse_fn_t callback;
292         void *private_data;
293         uint32_t null_count;
294         bool timedout;
295 };
296
297 /*
298   destroy a traverse_all op
299  */
300 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
301 {
302         reqid_remove(state->ctdb->idr, state->reqid);
303         return 0;
304 }
305
306 /* called when a traverse times out */
307 static void ctdb_traverse_all_timeout(struct tevent_context *ev,
308                                       struct tevent_timer *te,
309                                       struct timeval t, void *private_data)
310 {
311         struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
312
313         DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
314         CTDB_INCREMENT_STAT(state->ctdb, timeouts.traverse);
315
316         state->timedout = true;
317         state->callback(state->private_data, tdb_null, tdb_null);
318 }
319
320
/*
  state of one client initiated traverse; it is the talloc parent of
  the traverse_all handle
 */
struct traverse_start_state {
	struct ctdb_context *ctdb;
	/* the cluster-wide traverse serving this request */
	struct ctdb_traverse_all_handle *h;
	/* node the TRAVERSE_START control came from */
	uint32_t srcnode;
	/* reqid, database id and srvid taken from the client request */
	uint32_t reqid;
	uint32_t db_id;
	uint64_t srvid;
	/* also deliver records that carry no payload */
	bool withemptyrecords;
	/* number of records dispatched to the client so far */
	int num_records;
};
331
332
333 /*
334   setup a cluster-wide non-blocking traverse of a ctdb. The
335   callback function will be called on every record in the local
336   ltdb. To stop the traverse, talloc_free() the traverse_handle.
337
338   The traverse is finished when the callback is called with tdb_null
339   for key and data
340  */
341 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
342                                                                  ctdb_traverse_fn_t callback,
343                                                                  struct traverse_start_state *start_state)
344 {
345         struct ctdb_traverse_all_handle *state;
346         struct ctdb_context *ctdb = ctdb_db->ctdb;
347         int ret;
348         TDB_DATA data;
349         struct ctdb_traverse_all r;
350         struct ctdb_traverse_all_ext r_ext;
351         uint32_t destination;
352
353         state = talloc(start_state, struct ctdb_traverse_all_handle);
354         if (state == NULL) {
355                 return NULL;
356         }
357
358         state->ctdb         = ctdb;
359         state->ctdb_db      = ctdb_db;
360         state->reqid        = reqid_new(ctdb_db->ctdb->idr, state);
361         state->callback     = callback;
362         state->private_data = start_state;
363         state->null_count   = 0;
364         state->timedout     = false;
365         
366         talloc_set_destructor(state, ctdb_traverse_all_destructor);
367
368         if (start_state->withemptyrecords) {
369                 r_ext.db_id = ctdb_db->db_id;
370                 r_ext.reqid = state->reqid;
371                 r_ext.pnn   = ctdb->pnn;
372                 r_ext.client_reqid = start_state->reqid;
373                 r_ext.srvid = start_state->srvid;
374                 r_ext.withemptyrecords = start_state->withemptyrecords;
375
376                 data.dptr = (uint8_t *)&r_ext;
377                 data.dsize = sizeof(r_ext);
378         } else {
379                 r.db_id = ctdb_db->db_id;
380                 r.reqid = state->reqid;
381                 r.pnn   = ctdb->pnn;
382                 r.client_reqid = start_state->reqid;
383                 r.srvid = start_state->srvid;
384
385                 data.dptr = (uint8_t *)&r;
386                 data.dsize = sizeof(r);
387         }
388
389         if (ctdb_db_volatile(ctdb_db)) {
390                 /* normal database, traverse all nodes */         
391                 destination = CTDB_BROADCAST_VNNMAP;
392         } else {
393                 int i;
394                 /* persistent database, traverse one node, preferably
395                  * the local one
396                  */
397                 destination = ctdb->pnn;
398                 /* check we are in the vnnmap */
399                 for (i=0; i < ctdb->vnn_map->size; i++) {
400                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
401                                 break;
402                         }
403                 }
404                 /* if we are not in the vnn map we just pick the first
405                  * node instead
406                  */
407                 if (i == ctdb->vnn_map->size) {
408                         destination = ctdb->vnn_map->map[0];
409                 }
410         }
411
412         /* tell all the nodes in the cluster to start sending records to this
413          * node, or if it is a persistent database, just tell the local
414          * node
415          */
416
417         if (start_state->withemptyrecords) {
418                 ret = ctdb_daemon_send_control(ctdb, destination, 0,
419                                        CTDB_CONTROL_TRAVERSE_ALL_EXT,
420                                        0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
421         } else {
422                 ret = ctdb_daemon_send_control(ctdb, destination, 0,
423                                        CTDB_CONTROL_TRAVERSE_ALL,
424                                        0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
425         }
426
427         if (ret != 0) {
428                 talloc_free(state);
429                 return NULL;
430         }
431
432         DEBUG(DEBUG_NOTICE,("Starting traverse on DB %s (id %d)\n",
433                             ctdb_db->db_name, state->reqid));
434
435         /* timeout the traverse */
436         tevent_add_timer(ctdb->ev, state,
437                          timeval_current_ofs(ctdb->tunable.traverse_timeout, 0),
438                          ctdb_traverse_all_timeout, state);
439
440         return state;
441 }
442
443 /*
444   called when local traverse ends
445  */
446 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
447 {
448         struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
449
450         /* we're done */
451         talloc_free(state);
452 }
453
454 /*
455  * extended version to take the "withemptyrecords" parameter"
456  */
457 int32_t ctdb_control_traverse_all_ext(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
458 {
459         struct ctdb_traverse_all_ext *c = (struct ctdb_traverse_all_ext *)data.dptr;
460         struct traverse_all_state *state;
461         struct ctdb_db_context *ctdb_db;
462
463         if (data.dsize != sizeof(struct ctdb_traverse_all_ext)) {
464                 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all_ext\n"));
465                 return -1;
466         }
467
468         ctdb_db = find_ctdb_db(ctdb, c->db_id);
469         if (ctdb_db == NULL) {
470                 return -1;
471         }
472
473         if (ctdb_db->unhealthy_reason) {
474                 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
475                         DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
476                                         ctdb_db->db_name, ctdb_db->unhealthy_reason));
477                         return -1;
478                 }
479                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
480                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
481         }
482
483         state = talloc(ctdb_db, struct traverse_all_state);
484         if (state == NULL) {
485                 return -1;
486         }
487
488         state->reqid = c->reqid;
489         state->srcnode = c->pnn;
490         state->ctdb = ctdb;
491         state->client_reqid = c->client_reqid;
492         state->srvid = c->srvid;
493         state->withemptyrecords = c->withemptyrecords;
494
495         state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
496         if (state->h == NULL) {
497                 talloc_free(state);
498                 return -1;
499         }
500
501         return 0;
502 }
503
504 /*
505   called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
506   setup a traverse of our local ltdb, sending the records as
507   CTDB_CONTROL_TRAVERSE_DATA records back to the originator
508  */
509 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
510 {
511         struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
512         struct traverse_all_state *state;
513         struct ctdb_db_context *ctdb_db;
514
515         if (data.dsize != sizeof(struct ctdb_traverse_all)) {
516                 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
517                 return -1;
518         }
519
520         ctdb_db = find_ctdb_db(ctdb, c->db_id);
521         if (ctdb_db == NULL) {
522                 return -1;
523         }
524
525         if (ctdb_db->unhealthy_reason) {
526                 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
527                         DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
528                                         ctdb_db->db_name, ctdb_db->unhealthy_reason));
529                         return -1;
530                 }
531                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
532                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
533         }
534
535         state = talloc(ctdb_db, struct traverse_all_state);
536         if (state == NULL) {
537                 return -1;
538         }
539
540         state->reqid = c->reqid;
541         state->srcnode = c->pnn;
542         state->ctdb = ctdb;
543         state->client_reqid = c->client_reqid;
544         state->srvid = c->srvid;
545         state->withemptyrecords = false;
546
547         state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
548         if (state->h == NULL) {
549                 talloc_free(state);
550                 return -1;
551         }
552
553         return 0;
554 }
555
556
557 /*
558   called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
559   call the traverse_all callback with the record
560  */
561 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
562 {
563         struct ctdb_rec_data_old *d = (struct ctdb_rec_data_old *)data.dptr;
564         struct ctdb_traverse_all_handle *state;
565         TDB_DATA key;
566         ctdb_traverse_fn_t callback;
567         void *private_data;
568
569         if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
570                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
571                 return -1;
572         }
573
574         state = reqid_find(ctdb->idr, d->reqid, struct ctdb_traverse_all_handle);
575         if (state == NULL || d->reqid != state->reqid) {
576                 /* traverse might have been terminated already */
577                 return -1;
578         }
579
580         key.dsize = d->keylen;
581         key.dptr  = &d->data[0];
582         data.dsize = d->datalen;
583         data.dptr = &d->data[d->keylen];
584
585         if (key.dsize == 0 && data.dsize == 0) {
586                 state->null_count++;
587                 /* Persistent databases are only scanned on one node (the local
588                  * node)
589                  */
590                 if (ctdb_db_volatile(state->ctdb_db)) {
591                         if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
592                                 return 0;
593                         }
594                 }
595         }
596
597         callback = state->callback;
598         private_data = state->private_data;
599
600         callback(private_data, key, data);
601         return 0;
602 }       
603
604 /*
605   kill a in-progress traverse, used when a client disconnects
606  */
607 int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data, 
608                                    TDB_DATA *outdata, uint32_t srcnode)
609 {
610         struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
611         struct ctdb_db_context *ctdb_db;
612         struct ctdb_traverse_local_handle *t;
613
614         ctdb_db = find_ctdb_db(ctdb, d->db_id);
615         if (ctdb_db == NULL) {
616                 return -1;
617         }
618
619         for (t=ctdb_db->traverse; t; t=t->next) {
620                 if (t->client_reqid == d->reqid &&
621                     t->srvid == d->srvid) {
622                         talloc_free(t);
623                         break;
624                 }
625         }
626
627         return 0;
628 }
629
630
631 /*
632   this is called when a client disconnects during a traverse
633   we need to notify all the nodes taking part in the search that they
634   should kill their traverse children
635  */
636 static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
637 {
638         struct ctdb_traverse_start r;
639         TDB_DATA data;
640
641         DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
642         r.db_id = state->db_id;
643         r.reqid = state->reqid;
644         r.srvid = state->srvid;
645
646         data.dptr = (uint8_t *)&r;
647         data.dsize = sizeof(r);
648
649         ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0, 
650                                  CTDB_CONTROL_TRAVERSE_KILL, 
651                                  0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
652         return 0;
653 }
654
655 /*
656   callback which sends records as messages to the client
657  */
658 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
659 {
660         struct traverse_start_state *state;
661         struct ctdb_rec_data_old *d;
662         TDB_DATA cdata;
663
664         state = talloc_get_type(p, struct traverse_start_state);
665
666         d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
667         if (d == NULL) {
668                 return;
669         }
670
671         cdata.dptr = (uint8_t *)d;
672         cdata.dsize = d->length;
673
674         srvid_dispatch(state->ctdb->srv, state->srvid, 0, cdata);
675         if (key.dsize == 0 && data.dsize == 0) {
676                 DEBUG(DEBUG_NOTICE, ("Ending traverse on DB %s (id %d), records %d\n",
677                                      state->h->ctdb_db->db_name, state->h->reqid,
678                                      state->num_records));
679
680                 if (state->h->timedout) {
681                         /* timed out, send TRAVERSE_KILL control */
682                         talloc_free(state);
683                 } else {
684                         /* end of traverse */
685                         talloc_set_destructor(state, NULL);
686                         talloc_free(state);
687                 }
688         } else {
689                 state->num_records++;
690         }
691 }
692
693
694 /**
695  * start a traverse_all - called as a control from a client.
696  * extended version to take the "withemptyrecords" parameter.
697  */
698 int32_t ctdb_control_traverse_start_ext(struct ctdb_context *ctdb,
699                                         TDB_DATA data,
700                                         TDB_DATA *outdata,
701                                         uint32_t srcnode,
702                                         uint32_t client_id)
703 {
704         struct ctdb_traverse_start_ext *d = (struct ctdb_traverse_start_ext *)data.dptr;
705         struct traverse_start_state *state;
706         struct ctdb_db_context *ctdb_db;
707         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
708
709         if (client == NULL) {
710                 DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
711                 return -1;              
712         }
713
714         if (data.dsize != sizeof(*d)) {
715                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
716                 return -1;
717         }
718
719         ctdb_db = find_ctdb_db(ctdb, d->db_id);
720         if (ctdb_db == NULL) {
721                 return -1;
722         }
723
724         if (ctdb_db->unhealthy_reason) {
725                 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
726                         DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
727                                         ctdb_db->db_name, ctdb_db->unhealthy_reason));
728                         return -1;
729                 }
730                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
731                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
732         }
733
734         state = talloc(client, struct traverse_start_state);
735         if (state == NULL) {
736                 return -1;
737         }
738         
739         state->srcnode = srcnode;
740         state->reqid = d->reqid;
741         state->srvid = d->srvid;
742         state->db_id = d->db_id;
743         state->ctdb = ctdb;
744         state->withemptyrecords = d->withemptyrecords;
745         state->num_records = 0;
746
747         state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
748         if (state->h == NULL) {
749                 talloc_free(state);
750                 return -1;
751         }
752
753         talloc_set_destructor(state, ctdb_traverse_start_destructor);
754
755         return 0;
756 }
757
758 /**
759  * start a traverse_all - called as a control from a client.
760  */
761 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb,
762                                     TDB_DATA data,
763                                     TDB_DATA *outdata,
764                                     uint32_t srcnode,
765                                     uint32_t client_id)
766 {
767         struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
768         struct ctdb_traverse_start_ext d2;
769         TDB_DATA data2;
770
771         ZERO_STRUCT(d2);
772         d2.db_id = d->db_id;
773         d2.reqid = d->reqid;
774         d2.srvid = d->srvid;
775         d2.withemptyrecords = false;
776
777         data2.dsize = sizeof(d2);
778         data2.dptr = (uint8_t *)&d2;
779
780         return ctdb_control_traverse_start_ext(ctdb, data2, outdata, srcnode, client_id);
781 }