ctdb-tool: Correctly print timed out event scripts output
[obnox/samba/samba-obnox.git] / ctdb / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24 #include "system/network.h"
25 #include "system/locale.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_version.h"
29 #include "../include/ctdb_client.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34
/*
 * Process exit codes for node-related failures.  ERR_NONODE and
 * ERR_DISNODE are passed to exit() by parse_nodestring().
 */
#define ERR_TIMEOUT     20      /* timed out trying to reach node */
#define ERR_NONODE      21      /* node does not exist */
#define ERR_DISNODE     22      /* node is disconnected */

/* Forward declaration; called by the control_* handlers on bad arguments. */
static void usage(void);
40
/*
 * Global settings shared by all commands in this tool.
 * NOTE(review): presumably filled in from the command line before any
 * control_* handler runs - confirm against main().
 */
static struct {
        int timelimit;          /* timeout handed to timeval_current_ofs() by TIMELIMIT(); presumably seconds */
        uint32_t pnn;           /* target node; may also be CTDB_CURRENT_NODE, CTDB_BROADCAST_ALL or CTDB_MULTICAST */
        uint32_t *nodes;
        int machinereadable;    /* non-zero selects the printm()-based, separator-delimited output */
        const char *machineseparator;   /* replaces ':' in printm() formats when its first char is not ':' */
        int verbose;
        int maxruntime;
        int printemptyrecords;
        int printdatasize;
        int printlmaster;
        int printhash;
        int printrecordflags;
} options;
55
/*
 * Ten times the configured time limit.  The expansion is parenthesized so
 * the macro behaves as a single value inside larger expressions (e.g.
 * "x / LONGTIMEOUT").
 */
#define LONGTIMEOUT (options.timelimit*10)

/* Absolute deadlines built from the configured (or extended) time limit. */
#define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
#define LONGTIMELIMIT() timeval_current_ofs(LONGTIMEOUT, 0)
60
/*
 * Return the time elapsed from *tv to *tv2, in seconds.
 * The result is negative when *tv2 lies before *tv.
 */
static double timeval_delta(struct timeval *tv2, struct timeval *tv)
{
        double whole_seconds = tv2->tv_sec - tv->tv_sec;
        double sub_seconds = (tv2->tv_usec - tv->tv_usec)*1.0e-6;

        return whole_seconds + sub_seconds;
}
66
67 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
68 {
69         printf("CTDB version: %s\n", CTDB_VERSION_STRING);
70         return 0;
71 }
72
73 /* Like printf(3) but substitute for separator in format */
74 static int printm(const char *format, ...) PRINTF_ATTRIBUTE(1,2);
75 static int printm(const char *format, ...)
76 {
77         va_list ap;
78         int ret;
79         size_t len = strlen(format);
80         char new_format[len+1];
81
82         strcpy(new_format, format);
83
84         if (options.machineseparator[0] != ':') {
85                 all_string_sub(new_format,
86                                ":", options.machineseparator, len + 1);
87         }
88
89         va_start(ap, format);
90         ret = vprintf(new_format, ap);
91         va_end(ap);
92
93         return ret;
94 }
95
/*
 * Abort the process when p evaluates false (typically a NULL pointer
 * from a failed allocation).  An out-of-memory message naming the
 * source location is logged at DEBUG_ALERT level before abort().
 */
#define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                            \
                DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
                                   "Out of memory in " __location__ )); \
                abort();                                                \
        }} while (0)
101
102 static uint32_t getpnn(struct ctdb_context *ctdb)
103 {
104         if ((options.pnn == CTDB_BROADCAST_ALL) ||
105             (options.pnn == CTDB_MULTICAST)) {
106                 DEBUG(DEBUG_ERR,
107                       ("Cannot get PNN for node %u\n", options.pnn));
108                 exit(1);
109         }
110
111         if (options.pnn == CTDB_CURRENT_NODE) {
112                 return ctdb_get_pnn(ctdb);
113         } else {
114                 return options.pnn;
115         }
116 }
117
118 static void assert_single_node_only(void)
119 {
120         if ((options.pnn == CTDB_BROADCAST_ALL) ||
121             (options.pnn == CTDB_MULTICAST)) {
122                 DEBUG(DEBUG_ERR,
123                       ("This control can not be applied to multiple PNNs\n"));
124                 exit(1);
125         }
126 }
127
128 static void assert_current_node_only(struct ctdb_context *ctdb)
129 {
130         if (options.pnn != ctdb_get_pnn(ctdb)) {
131                 DEBUG(DEBUG_ERR,
132                       ("This control can only be applied to the current node\n"));
133                 exit(1);
134         }
135 }
136
137 /* Pretty print the flags to a static buffer in human-readable format.
138  * This never returns NULL!
139  */
140 static const char *pretty_print_flags(uint32_t flags)
141 {
142         int j;
143         static const struct {
144                 uint32_t flag;
145                 const char *name;
146         } flag_names[] = {
147                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
148                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
149                 { NODE_FLAGS_BANNED,                "BANNED" },
150                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
151                 { NODE_FLAGS_DELETED,               "DELETED" },
152                 { NODE_FLAGS_STOPPED,               "STOPPED" },
153                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
154         };
155         static char flags_str[512]; /* Big enough to contain all flag names */
156
157         flags_str[0] = '\0';
158         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
159                 if (flags & flag_names[j].flag) {
160                         if (flags_str[0] == '\0') {
161                                 (void) strcpy(flags_str, flag_names[j].name);
162                         } else {
163                                 (void) strncat(flags_str, "|", sizeof(flags_str)-1);
164                                 (void) strncat(flags_str, flag_names[j].name,
165                                                sizeof(flags_str)-1);
166                         }
167                 }
168         }
169         if (flags_str[0] == '\0') {
170                 (void) strcpy(flags_str, "OK");
171         }
172
173         return flags_str;
174 }
175
/*
 * Convert one hexadecimal digit ('0'-'9', 'a'-'f', 'A'-'F') to its
 * numeric value 0..15.
 *
 * The input is not validated: any other character is treated like a
 * decimal digit and yields (h - '0').
 */
static int h2i(char h)
{
        if (h >= 'a' && h <= 'f') return h - 'a' + 10;
        /* Upper-case digits are offset from 'A' (was 'f', giving negatives) */
        if (h >= 'A' && h <= 'F') return h - 'A' + 10;
        return h - '0';
}
182
183 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str, size_t len)
184 {
185         int i;
186         TDB_DATA key = {NULL, 0};
187
188         if (len & 0x01) {
189                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
190                 return key;
191         }
192
193         key.dsize = len>>1;
194         key.dptr  = talloc_size(mem_ctx, key.dsize);
195
196         for (i=0; i < len/2; i++) {
197                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
198         }
199         return key;
200 }
201
202 static TDB_DATA strtodata(TALLOC_CTX *mem_ctx, const char *str, size_t len)
203 {
204         TDB_DATA key;
205
206         if (!strncmp(str, "0x", 2)) {
207                 key = hextodata(mem_ctx, str + 2, len - 2);
208         } else {
209                 key.dptr  = talloc_memdup(mem_ctx, str, len);
210                 key.dsize = len;
211         }
212
213         return key;
214 }
215
216 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
217  * that are disconnected or deleted.  If dd_ok is true those nodes are
218  * included in the output list of nodes.  If dd_ok is false, those
219  * nodes are filtered from the "all" case and cause an error if
220  * explicitly specified.
221  */
222 static bool parse_nodestring(struct ctdb_context *ctdb,
223                              TALLOC_CTX *mem_ctx,
224                              const char * nodestring,
225                              uint32_t current_pnn,
226                              bool dd_ok,
227                              uint32_t **nodes,
228                              uint32_t *pnn_mode)
229 {
230         TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
231         int n;
232         uint32_t i;
233         struct ctdb_node_map *nodemap;
234         int ret;
235
236         *nodes = NULL;
237
238         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
239         if (ret != 0) {
240                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
241                 talloc_free(tmp_ctx);
242                 exit(10);
243         }
244
245         if (nodestring != NULL) {
246                 *nodes = talloc_array(mem_ctx, uint32_t, 0);
247                 if (*nodes == NULL) {
248                         goto failed;
249                 }
250
251                 n = 0;
252
253                 if (strcmp(nodestring, "all") == 0) {
254                         *pnn_mode = CTDB_BROADCAST_ALL;
255
256                         /* all */
257                         for (i = 0; i < nodemap->num; i++) {
258                                 if ((nodemap->nodes[i].flags &
259                                      (NODE_FLAGS_DISCONNECTED |
260                                       NODE_FLAGS_DELETED)) && !dd_ok) {
261                                         continue;
262                                 }
263                                 *nodes = talloc_realloc(mem_ctx, *nodes,
264                                                         uint32_t, n+1);
265                                 if (*nodes == NULL) {
266                                         goto failed;
267                                 }
268                                 (*nodes)[n] = i;
269                                 n++;
270                         }
271                 } else {
272                         /* x{,y...} */
273                         char *ns, *tok;
274
275                         ns = talloc_strdup(tmp_ctx, nodestring);
276                         tok = strtok(ns, ",");
277                         while (tok != NULL) {
278                                 uint32_t pnn;
279                                 char *endptr;
280                                 i = (uint32_t)strtoul(tok, &endptr, 0);
281                                 if (i == 0 && tok == endptr) {
282                                         DEBUG(DEBUG_ERR,
283                                               ("Invalid node %s\n", tok));
284                                         talloc_free(tmp_ctx);
285                                         exit(ERR_NONODE);
286                                 }
287                                 if (i >= nodemap->num) {
288                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
289                                         talloc_free(tmp_ctx);
290                                         exit(ERR_NONODE);
291                                 }
292                                 if ((nodemap->nodes[i].flags & 
293                                      (NODE_FLAGS_DISCONNECTED |
294                                       NODE_FLAGS_DELETED)) && !dd_ok) {
295                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
296                                         talloc_free(tmp_ctx);
297                                         exit(ERR_DISNODE);
298                                 }
299                                 if ((pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), i)) < 0) {
300                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
301                                         talloc_free(tmp_ctx);
302                                         exit(10);
303                                 }
304
305                                 *nodes = talloc_realloc(mem_ctx, *nodes,
306                                                         uint32_t, n+1);
307                                 if (*nodes == NULL) {
308                                         goto failed;
309                                 }
310
311                                 (*nodes)[n] = i;
312                                 n++;
313
314                                 tok = strtok(NULL, ",");
315                         }
316                         talloc_free(ns);
317
318                         if (n == 1) {
319                                 *pnn_mode = (*nodes)[0];
320                         } else {
321                                 *pnn_mode = CTDB_MULTICAST;
322                         }
323                 }
324         } else {
325                 /* default - no nodes specified */
326                 *nodes = talloc_array(mem_ctx, uint32_t, 1);
327                 if (*nodes == NULL) {
328                         goto failed;
329                 }
330                 *pnn_mode = CTDB_CURRENT_NODE;
331
332                 if (((*nodes)[0] = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), current_pnn)) < 0) {
333                         goto failed;
334                 }
335         }
336
337         talloc_free(tmp_ctx);
338         return true;
339
340 failed:
341         talloc_free(tmp_ctx);
342         return false;
343 }
344
345 /*
346  check if a database exists
347 */
348 static bool db_exists(struct ctdb_context *ctdb, const char *dbarg,
349                       uint32_t *dbid, const char **dbname, uint8_t *flags)
350 {
351         int i, ret;
352         struct ctdb_dbid_map *dbmap=NULL;
353         bool dbid_given = false, found = false;
354         uint32_t id;
355         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
356         const char *name = NULL;
357
358         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
359         if (ret != 0) {
360                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
361                 goto fail;
362         }
363
364         if (strncmp(dbarg, "0x", 2) == 0) {
365                 id = strtoul(dbarg, NULL, 0);
366                 dbid_given = true;
367         }
368
369         for(i=0; i<dbmap->num; i++) {
370                 if (dbid_given) {
371                         if (id == dbmap->dbs[i].dbid) {
372                                 found = true;
373                                 break;
374                         }
375                 } else {
376                         ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
377                         if (ret != 0) {
378                                 DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
379                                 goto fail;
380                         }
381
382                         if (strcmp(name, dbarg) == 0) {
383                                 id = dbmap->dbs[i].dbid;
384                                 found = true;
385                                 break;
386                         }
387                 }
388         }
389
390         if (found && dbid_given && dbname != NULL) {
391                 ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
392                 if (ret != 0) {
393                         DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
394                         found = false;
395                         goto fail;
396                 }
397         }
398
399         if (found) {
400                 if (dbid) *dbid = id;
401                 if (dbname) *dbname = talloc_strdup(ctdb, name);
402                 if (flags) *flags = dbmap->dbs[i].flags;
403         } else {
404                 DEBUG(DEBUG_ERR,("No database matching '%s' found\n", dbarg));
405         }
406
407 fail:
408         talloc_free(tmp_ctx);
409         return found;
410 }
411
412 /*
413   see if a process exists
414  */
415 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
416 {
417         uint32_t pnn, pid;
418         int ret;
419         if (argc < 1) {
420                 usage();
421         }
422
423         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
424                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
425                 return -1;
426         }
427
428         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
429         if (ret == 0) {
430                 printf("%u:%u exists\n", pnn, pid);
431         } else {
432                 printf("%u:%u does not exist\n", pnn, pid);
433         }
434         return ret;
435 }
436
437 /*
438   display statistics structure
439  */
440 static void show_statistics(struct ctdb_statistics *s, int show_header)
441 {
442         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
443         int i;
444         const char *prefix=NULL;
445         int preflen=0;
446         int tmp, days, hours, minutes, seconds;
447         const struct {
448                 const char *name;
449                 uint32_t offset;
450         } fields[] = {
451 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
452                 STATISTICS_FIELD(num_clients),
453                 STATISTICS_FIELD(frozen),
454                 STATISTICS_FIELD(recovering),
455                 STATISTICS_FIELD(num_recoveries),
456                 STATISTICS_FIELD(client_packets_sent),
457                 STATISTICS_FIELD(client_packets_recv),
458                 STATISTICS_FIELD(node_packets_sent),
459                 STATISTICS_FIELD(node_packets_recv),
460                 STATISTICS_FIELD(keepalive_packets_sent),
461                 STATISTICS_FIELD(keepalive_packets_recv),
462                 STATISTICS_FIELD(node.req_call),
463                 STATISTICS_FIELD(node.reply_call),
464                 STATISTICS_FIELD(node.req_dmaster),
465                 STATISTICS_FIELD(node.reply_dmaster),
466                 STATISTICS_FIELD(node.reply_error),
467                 STATISTICS_FIELD(node.req_message),
468                 STATISTICS_FIELD(node.req_control),
469                 STATISTICS_FIELD(node.reply_control),
470                 STATISTICS_FIELD(client.req_call),
471                 STATISTICS_FIELD(client.req_message),
472                 STATISTICS_FIELD(client.req_control),
473                 STATISTICS_FIELD(timeouts.call),
474                 STATISTICS_FIELD(timeouts.control),
475                 STATISTICS_FIELD(timeouts.traverse),
476                 STATISTICS_FIELD(locks.num_calls),
477                 STATISTICS_FIELD(locks.num_current),
478                 STATISTICS_FIELD(locks.num_pending),
479                 STATISTICS_FIELD(locks.num_failed),
480                 STATISTICS_FIELD(total_calls),
481                 STATISTICS_FIELD(pending_calls),
482                 STATISTICS_FIELD(childwrite_calls),
483                 STATISTICS_FIELD(pending_childwrite_calls),
484                 STATISTICS_FIELD(memory_used),
485                 STATISTICS_FIELD(max_hop_count),
486                 STATISTICS_FIELD(total_ro_delegations),
487                 STATISTICS_FIELD(total_ro_revokes),
488         };
489         
490         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
491         seconds = tmp%60;
492         tmp    /= 60;
493         minutes = tmp%60;
494         tmp    /= 60;
495         hours   = tmp%24;
496         tmp    /= 24;
497         days    = tmp;
498
499         if (options.machinereadable){
500                 if (show_header) {
501                         printm("CTDB version:");
502                         printm("Current time of statistics:");
503                         printm("Statistics collected since:");
504                         for (i=0;i<ARRAY_SIZE(fields);i++) {
505                                 printm("%s:", fields[i].name);
506                         }
507                         printm("num_reclock_ctdbd_latency:");
508                         printm("min_reclock_ctdbd_latency:");
509                         printm("avg_reclock_ctdbd_latency:");
510                         printm("max_reclock_ctdbd_latency:");
511
512                         printm("num_reclock_recd_latency:");
513                         printm("min_reclock_recd_latency:");
514                         printm("avg_reclock_recd_latency:");
515                         printm("max_reclock_recd_latency:");
516
517                         printm("num_call_latency:");
518                         printm("min_call_latency:");
519                         printm("avg_call_latency:");
520                         printm("max_call_latency:");
521
522                         printm("num_lockwait_latency:");
523                         printm("min_lockwait_latency:");
524                         printm("avg_lockwait_latency:");
525                         printm("max_lockwait_latency:");
526
527                         printm("num_childwrite_latency:");
528                         printm("min_childwrite_latency:");
529                         printm("avg_childwrite_latency:");
530                         printm("max_childwrite_latency:");
531                         printm("\n");
532                 }
533                 printm("%d:", CTDB_PROTOCOL);
534                 printm("%d:", (int)s->statistics_current_time.tv_sec);
535                 printm("%d:", (int)s->statistics_start_time.tv_sec);
536                 for (i=0;i<ARRAY_SIZE(fields);i++) {
537                         printm("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
538                 }
539                 printm("%d:", s->reclock.ctdbd.num);
540                 printm("%.6f:", s->reclock.ctdbd.min);
541                 printm("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
542                 printm("%.6f:", s->reclock.ctdbd.max);
543
544                 printm("%d:", s->reclock.recd.num);
545                 printm("%.6f:", s->reclock.recd.min);
546                 printm("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
547                 printm("%.6f:", s->reclock.recd.max);
548
549                 printm("%d:", s->call_latency.num);
550                 printm("%.6f:", s->call_latency.min);
551                 printm("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
552                 printm("%.6f:", s->call_latency.max);
553
554                 printm("%d:", s->childwrite_latency.num);
555                 printm("%.6f:", s->childwrite_latency.min);
556                 printm("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
557                 printm("%.6f:", s->childwrite_latency.max);
558                 printm("\n");
559         } else {
560                 printf("CTDB version %u\n", CTDB_PROTOCOL);
561                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
562                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
563
564                 for (i=0;i<ARRAY_SIZE(fields);i++) {
565                         if (strchr(fields[i].name, '.')) {
566                                 preflen = strcspn(fields[i].name, ".")+1;
567                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
568                                         prefix = fields[i].name;
569                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
570                                 }
571                         } else {
572                                 preflen = 0;
573                         }
574                         printf(" %*s%-22s%*s%10u\n", 
575                                preflen?4:0, "",
576                                fields[i].name+preflen, 
577                                preflen?0:4, "",
578                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
579                 }
580                 printf(" hop_count_buckets:");
581                 for (i=0;i<MAX_COUNT_BUCKETS;i++) {
582                         printf(" %d", s->hop_count_bucket[i]);
583                 }
584                 printf("\n");
585                 printf(" lock_buckets:");
586                 for (i=0; i<MAX_COUNT_BUCKETS; i++) {
587                         printf(" %d", s->locks.buckets[i]);
588                 }
589                 printf("\n");
590                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
591
592                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
593
594                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
595
596                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
597                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
598         }
599
600         talloc_free(tmp_ctx);
601 }
602
603 /*
604   display remote ctdb statistics combined from all nodes
605  */
606 static int control_statistics_all(struct ctdb_context *ctdb)
607 {
608         int ret, i;
609         struct ctdb_statistics statistics;
610         uint32_t *nodes;
611         uint32_t num_nodes;
612
613         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
614         CTDB_NO_MEMORY(ctdb, nodes);
615         
616         ZERO_STRUCT(statistics);
617
618         for (i=0;i<num_nodes;i++) {
619                 struct ctdb_statistics s1;
620                 int j;
621                 uint32_t *v1 = (uint32_t *)&s1;
622                 uint32_t *v2 = (uint32_t *)&statistics;
623                 uint32_t num_ints = 
624                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
625                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
626                 if (ret != 0) {
627                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
628                         return ret;
629                 }
630                 for (j=0;j<num_ints;j++) {
631                         v2[j] += v1[j];
632                 }
633                 statistics.max_hop_count = 
634                         MAX(statistics.max_hop_count, s1.max_hop_count);
635                 statistics.call_latency.max = 
636                         MAX(statistics.call_latency.max, s1.call_latency.max);
637         }
638         talloc_free(nodes);
639         printf("Gathered statistics for %u nodes\n", num_nodes);
640         show_statistics(&statistics, 1);
641         return 0;
642 }
643
644 /*
645   display remote ctdb statistics
646  */
647 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
648 {
649         int ret;
650         struct ctdb_statistics statistics;
651
652         if (options.pnn == CTDB_BROADCAST_ALL) {
653                 return control_statistics_all(ctdb);
654         }
655
656         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
657         if (ret != 0) {
658                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
659                 return ret;
660         }
661         show_statistics(&statistics, 1);
662         return 0;
663 }
664
665
666 /*
667   reset remote ctdb statistics
668  */
669 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
670 {
671         int ret;
672
673         ret = ctdb_statistics_reset(ctdb, options.pnn);
674         if (ret != 0) {
675                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
676                 return ret;
677         }
678         return 0;
679 }
680
681
682 /*
683   display remote ctdb rolling statistics
684  */
685 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
686 {
687         int ret;
688         struct ctdb_statistics_wire *stats;
689         int i, num_records = -1;
690
691         assert_single_node_only();
692
693         if (argc ==1) {
694                 num_records = atoi(argv[0]) - 1;
695         }
696
697         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
698         if (ret != 0) {
699                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
700                 return ret;
701         }
702         for (i=0;i<stats->num;i++) {
703                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
704                         continue;
705                 }
706                 show_statistics(&stats->stats[i], i==0);
707                 if (i == num_records) {
708                         break;
709                 }
710         }
711         return 0;
712 }
713
714
715 /*
716   display remote ctdb db statistics
717  */
718 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
719 {
720         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
721         struct ctdb_db_statistics *dbstat;
722         int i;
723         uint32_t db_id;
724         int num_hot_keys;
725         int ret;
726
727         if (argc < 1) {
728                 usage();
729         }
730
731         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
732                 return -1;
733         }
734
735         ret = ctdb_ctrl_dbstatistics(ctdb, options.pnn, db_id, tmp_ctx, &dbstat);
736         if (ret != 0) {
737                 DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
738                 talloc_free(tmp_ctx);
739                 return -1;
740         }
741
742         printf("DB Statistics: %s\n", argv[0]);
743         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
744                 dbstat->db_ro_delegations);
745         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
746                 dbstat->db_ro_delegations);
747         printf(" %s\n", "locks");
748         printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
749                 dbstat->locks.num_calls);
750         printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
751                 dbstat->locks.num_failed);
752         printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
753                 dbstat->locks.num_current);
754         printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
755                 dbstat->locks.num_pending);
756         printf(" %s", "hop_count_buckets:");
757         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
758                 printf(" %d", dbstat->hop_count_bucket[i]);
759         }
760         printf("\n");
761         printf(" %s", "lock_buckets:");
762         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
763                 printf(" %d", dbstat->locks.buckets[i]);
764         }
765         printf("\n");
766         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
767                 "locks_latency      MIN/AVG/MAX",
768                 dbstat->locks.latency.min,
769                 (dbstat->locks.latency.num ?
770                  dbstat->locks.latency.total /dbstat->locks.latency.num :
771                  0.0),
772                 dbstat->locks.latency.max,
773                 dbstat->locks.latency.num);
774         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
775                 "vacuum_latency     MIN/AVG/MAX",
776                 dbstat->vacuum.latency.min,
777                 (dbstat->vacuum.latency.num ?
778                  dbstat->vacuum.latency.total /dbstat->vacuum.latency.num :
779                  0.0),
780                 dbstat->vacuum.latency.max,
781                 dbstat->vacuum.latency.num);
782         num_hot_keys = 0;
783         for (i=0; i<dbstat->num_hot_keys; i++) {
784                 if (dbstat->hot_keys[i].count > 0) {
785                         num_hot_keys++;
786                 }
787         }
788         dbstat->num_hot_keys = num_hot_keys;
789
790         printf(" Num Hot Keys:     %d\n", dbstat->num_hot_keys);
791         for (i = 0; i < dbstat->num_hot_keys; i++) {
792                 int j;
793                 printf("     Count:%d Key:", dbstat->hot_keys[i].count);
794                 for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
795                         printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
796                 }
797                 printf("\n");
798         }
799
800         talloc_free(tmp_ctx);
801         return 0;
802 }
803
804 /*
805   display uptime of remote node
806  */
807 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
808 {
809         int ret;
810         struct ctdb_uptime *uptime = NULL;
811         int tmp, days, hours, minutes, seconds;
812
813         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
814         if (ret != 0) {
815                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
816                 return ret;
817         }
818
819         if (options.machinereadable){
820                 printm(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
821                 printm(":%u:%u:%u:%lf\n",
822                         (unsigned int)uptime->current_time.tv_sec,
823                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
824                         (unsigned int)uptime->last_recovery_finished.tv_sec,
825                         timeval_delta(&uptime->last_recovery_finished,
826                                       &uptime->last_recovery_started)
827                 );
828                 return 0;
829         }
830
831         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
832
833         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
834         seconds = tmp%60;
835         tmp    /= 60;
836         minutes = tmp%60;
837         tmp    /= 60;
838         hours   = tmp%24;
839         tmp    /= 24;
840         days    = tmp;
841         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
842
843         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
844         seconds = tmp%60;
845         tmp    /= 60;
846         minutes = tmp%60;
847         tmp    /= 60;
848         hours   = tmp%24;
849         tmp    /= 24;
850         days    = tmp;
851         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
852         
853         printf("Duration of last recovery/failover: %lf seconds\n",
854                 timeval_delta(&uptime->last_recovery_finished,
855                               &uptime->last_recovery_started));
856
857         return 0;
858 }
859
/*
  show the PNN of the current node

  Prints "PNN:<n>" using the value returned by getpnn() and always
  returns 0.
 */
static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
{
        const uint32_t this_pnn = getpnn(ctdb);

        printf("PNN:%d\n", this_pnn);

        return 0;
}
872
873
874 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx)
875 {
876         const char *nodes_list;
877
878         /* read the nodes file */
879         nodes_list = getenv("CTDB_NODES");
880         if (nodes_list == NULL) {
881                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes",
882                                              getenv("CTDB_BASE"));
883                 if (nodes_list == NULL) {
884                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
885                         exit(1);
886                 }
887         }
888
889         return ctdb_read_nodes_file(mem_ctx, nodes_list);
890 }
891
892 /*
893   show the PNN of the current node
894   discover the pnn by loading the nodes file and try to bind to all
895   addresses one at a time until the ip address is found.
896  */
897 static int find_node_xpnn(void)
898 {
899         TALLOC_CTX *mem_ctx = talloc_new(NULL);
900         struct ctdb_node_map *node_map;
901         int i, pnn;
902
903         node_map = read_nodes_file(mem_ctx);
904         if (node_map == NULL) {
905                 talloc_free(mem_ctx);
906                 return -1;
907         }
908
909         for (i = 0; i < node_map->num; i++) {
910                 if (node_map->nodes[i].flags & NODE_FLAGS_DELETED) {
911                         continue;
912                 }
913                 if (ctdb_sys_have_ip(&node_map->nodes[i].addr)) {
914                         pnn = node_map->nodes[i].pnn;
915                         talloc_free(mem_ctx);
916                         return pnn;
917                 }
918         }
919
920         printf("Failed to detect which PNN this node is\n");
921         talloc_free(mem_ctx);
922         return -1;
923 }
924
/*
  Print the PNN found by find_node_xpnn() as "PNN:<n>".

  Returns 0 on success and -1 when the PNN could not be determined.
 */
static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
{
        int pnn;

        assert_single_node_only();

        pnn = find_node_xpnn();
        if (pnn == -1) {
                return -1;
        }

        printf("PNN:%d\n", pnn);

        return 0;
}
939
940 /* Helpers for ctdb status
941  */
942 static bool is_partially_online(struct ctdb_context *ctdb, struct ctdb_node_and_flags *node)
943 {
944         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
945         int j;
946         bool ret = false;
947
948         if (node->flags == 0) {
949                 struct ctdb_control_get_ifaces *ifaces;
950
951                 if (ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), node->pnn,
952                                          tmp_ctx, &ifaces) == 0) {
953                         for (j=0; j < ifaces->num; j++) {
954                                 if (ifaces->ifaces[j].link_state != 0) {
955                                         continue;
956                                 }
957                                 ret = true;
958                                 break;
959                         }
960                 }
961         }
962         talloc_free(tmp_ctx);
963
964         return ret;
965 }
966
/* Emit the column header that precedes machine-readable node status lines */
static void control_status_header_machine(void)
{
        printm(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:ThisNode:\n");
}
972
973 static int control_status_1_machine(struct ctdb_context *ctdb, int mypnn,
974                                     struct ctdb_node_and_flags *node)
975 {
976         printm(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
977                ctdb_addr_to_str(&node->addr),
978                !!(node->flags&NODE_FLAGS_DISCONNECTED),
979                !!(node->flags&NODE_FLAGS_BANNED),
980                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
981                !!(node->flags&NODE_FLAGS_UNHEALTHY),
982                !!(node->flags&NODE_FLAGS_STOPPED),
983                !!(node->flags&NODE_FLAGS_INACTIVE),
984                is_partially_online(ctdb, node) ? 1 : 0,
985                (node->pnn == mypnn)?'Y':'N');
986
987         return node->flags;
988 }
989
990 static int control_status_1_human(struct ctdb_context *ctdb, int mypnn,
991                                   struct ctdb_node_and_flags *node)
992 {
993        printf("pnn:%d %-16s %s%s\n", node->pnn,
994               ctdb_addr_to_str(&node->addr),
995               is_partially_online(ctdb, node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
996               node->pnn == mypnn?" (THIS NODE)":"");
997
998        return node->flags;
999 }
1000
1001 /*
1002   display remote ctdb status
1003  */
1004 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
1005 {
1006         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1007         int i;
1008         struct ctdb_vnn_map *vnnmap=NULL;
1009         struct ctdb_node_map *nodemap=NULL;
1010         uint32_t recmode, recmaster, mypnn;
1011         int num_deleted_nodes = 0;
1012         int ret;
1013
1014         mypnn = getpnn(ctdb);
1015
1016         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1017         if (ret != 0) {
1018                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1019                 talloc_free(tmp_ctx);
1020                 return -1;
1021         }
1022
1023         if (options.machinereadable) {
1024                 control_status_header_machine();
1025                 for (i=0;i<nodemap->num;i++) {
1026                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1027                                 continue;
1028                         }
1029                         (void) control_status_1_machine(ctdb, mypnn,
1030                                                         &nodemap->nodes[i]);
1031                 }
1032                 talloc_free(tmp_ctx);
1033                 return 0;
1034         }
1035
1036         for (i=0; i<nodemap->num; i++) {
1037                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1038                         num_deleted_nodes++;
1039                 }
1040         }
1041         if (num_deleted_nodes == 0) {
1042                 printf("Number of nodes:%d\n", nodemap->num);
1043         } else {
1044                 printf("Number of nodes:%d (including %d deleted nodes)\n",
1045                        nodemap->num, num_deleted_nodes);
1046         }
1047         for(i=0;i<nodemap->num;i++){
1048                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1049                         continue;
1050                 }
1051                 (void) control_status_1_human(ctdb, mypnn, &nodemap->nodes[i]);
1052         }
1053
1054         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
1055         if (ret != 0) {
1056                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
1057                 talloc_free(tmp_ctx);
1058                 return -1;
1059         }
1060         if (vnnmap->generation == INVALID_GENERATION) {
1061                 printf("Generation:INVALID\n");
1062         } else {
1063                 printf("Generation:%d\n",vnnmap->generation);
1064         }
1065         printf("Size:%d\n",vnnmap->size);
1066         for(i=0;i<vnnmap->size;i++){
1067                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
1068         }
1069
1070         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmode);
1071         if (ret != 0) {
1072                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1073                 talloc_free(tmp_ctx);
1074                 return -1;
1075         }
1076         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
1077
1078         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmaster);
1079         if (ret != 0) {
1080                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1081                 talloc_free(tmp_ctx);
1082                 return -1;
1083         }
1084         printf("Recovery master:%d\n",recmaster);
1085
1086         talloc_free(tmp_ctx);
1087         return 0;
1088 }
1089
1090 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
1091 {
1092         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1093         int i, ret;
1094         struct ctdb_node_map *nodemap=NULL;
1095         uint32_t * nodes;
1096         uint32_t pnn_mode, mypnn;
1097
1098         if (argc > 1) {
1099                 usage();
1100         }
1101
1102         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1103                               options.pnn, true, &nodes, &pnn_mode)) {
1104                 return -1;
1105         }
1106
1107         if (options.machinereadable) {
1108                 control_status_header_machine();
1109         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
1110                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
1111         }
1112
1113         mypnn = getpnn(ctdb);
1114
1115         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1116         if (ret != 0) {
1117                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1118                 talloc_free(tmp_ctx);
1119                 return -1;
1120         }
1121
1122         ret = 0;
1123
1124         for (i = 0; i < talloc_array_length(nodes); i++) {
1125                 if (options.machinereadable) {
1126                         ret |= control_status_1_machine(ctdb, mypnn,
1127                                                         &nodemap->nodes[nodes[i]]);
1128                 } else {
1129                         ret |= control_status_1_human(ctdb, mypnn,
1130                                                       &nodemap->nodes[nodes[i]]);
1131                 }
1132         }
1133
1134         talloc_free(tmp_ctx);
1135         return ret;
1136 }
1137
1138 static struct ctdb_node_map *read_natgw_nodes_file(struct ctdb_context *ctdb,
1139                                                    TALLOC_CTX *mem_ctx)
1140 {
1141         const char *natgw_list;
1142         struct ctdb_node_map *natgw_nodes = NULL;
1143
1144         natgw_list = getenv("CTDB_NATGW_NODES");
1145         if (natgw_list == NULL) {
1146                 natgw_list = talloc_asprintf(mem_ctx, "%s/natgw_nodes",
1147                                              getenv("CTDB_BASE"));
1148                 if (natgw_list == NULL) {
1149                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
1150                         exit(1);
1151                 }
1152         }
1153         /* The PNNs/flags will be junk but they're not used */
1154         natgw_nodes = ctdb_read_nodes_file(mem_ctx, natgw_list);
1155         if (natgw_nodes == NULL) {
1156                 DEBUG(DEBUG_ERR,
1157                       ("Failed to load natgw node list '%s'\n", natgw_list));
1158         }
1159         return natgw_nodes;
1160 }
1161
1162
1163 /* talloc off the existing nodemap... */
1164 static struct ctdb_node_map *talloc_nodemap(struct ctdb_node_map *nodemap)
1165 {
1166         return talloc_zero_size(nodemap,
1167                                 offsetof(struct ctdb_node_map, nodes) +
1168                                 nodemap->num * sizeof(struct ctdb_node_and_flags));
1169 }
1170
1171 static struct ctdb_node_map *
1172 filter_nodemap_by_addrs(struct ctdb_context *ctdb,
1173                         struct ctdb_node_map *nodemap,
1174                         struct ctdb_node_map *natgw_nodes)
1175 {
1176         int i, j;
1177         struct ctdb_node_map *ret;
1178
1179         ret = talloc_nodemap(nodemap);
1180         CTDB_NO_MEMORY_NULL(ctdb, ret);
1181
1182         ret->num = 0;
1183
1184         for (i = 0; i < nodemap->num; i++) {
1185                 for(j = 0; j < natgw_nodes->num ; j++) {
1186                         if (nodemap->nodes[j].flags & NODE_FLAGS_DELETED) {
1187                                 continue;
1188                         }
1189                         if (ctdb_same_ip(&natgw_nodes->nodes[j].addr,
1190                                          &nodemap->nodes[i].addr)) {
1191
1192                                 ret->nodes[ret->num] = nodemap->nodes[i];
1193                                 ret->num++;
1194                                 break;
1195                         }
1196                 }
1197         }
1198
1199         return ret;
1200 }
1201
1202 static struct ctdb_node_map *
1203 filter_nodemap_by_capabilities(struct ctdb_context *ctdb,
1204                                struct ctdb_node_map *nodemap,
1205                                uint32_t required_capabilities,
1206                                bool first_only)
1207 {
1208         int i;
1209         uint32_t capabilities;
1210         struct ctdb_node_map *ret;
1211
1212         ret = talloc_nodemap(nodemap);
1213         CTDB_NO_MEMORY_NULL(ctdb, ret);
1214
1215         ret->num = 0;
1216
1217         for (i = 0; i < nodemap->num; i++) {
1218                 int res;
1219
1220                 /* Disconnected nodes have no capabilities! */
1221                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1222                         continue;
1223                 }
1224
1225                 res = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(),
1226                                                 nodemap->nodes[i].pnn,
1227                                                 &capabilities);
1228                 if (res != 0) {
1229                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n",
1230                                           nodemap->nodes[i].pnn));
1231                         talloc_free(ret);
1232                         return NULL;
1233                 }
1234                 if (!(capabilities & required_capabilities)) {
1235                         continue;
1236                 }
1237
1238                 ret->nodes[ret->num] = nodemap->nodes[i];
1239                 ret->num++;
1240                 if (first_only) {
1241                         break;
1242                 }
1243         }
1244
1245         return ret;
1246 }
1247
1248 static struct ctdb_node_map *
1249 filter_nodemap_by_flags(struct ctdb_context *ctdb,
1250                         struct ctdb_node_map *nodemap,
1251                         uint32_t flags_mask)
1252 {
1253         int i;
1254         struct ctdb_node_map *ret;
1255
1256         ret = talloc_nodemap(nodemap);
1257         CTDB_NO_MEMORY_NULL(ctdb, ret);
1258
1259         ret->num = 0;
1260
1261         for (i = 0; i < nodemap->num; i++) {
1262                 if (nodemap->nodes[i].flags & flags_mask) {
1263                         continue;
1264                 }
1265
1266                 ret->nodes[ret->num] = nodemap->nodes[i];
1267                 ret->num++;
1268         }
1269
1270         return ret;
1271 }
1272
1273 /*
1274   display the list of nodes belonging to this natgw configuration
1275  */
1276 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
1277 {
1278         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1279         int i, ret;
1280         struct ctdb_node_map *natgw_nodes = NULL;
1281         struct ctdb_node_map *orig_nodemap=NULL;
1282         struct ctdb_node_map *nodemap;
1283         uint32_t mypnn, pnn;
1284         const char *ip;
1285
1286         /* When we have some nodes that could be the NATGW, make a
1287          * series of attempts to find the first node that doesn't have
1288          * certain status flags set.
1289          */
1290         uint32_t exclude_flags[] = {
1291                 /* Look for a nice healthy node */
1292                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY,
1293                 /* If not found, an UNHEALTHY/BANNED node will do */
1294                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED,
1295                 /* If not found, a STOPPED node will do */
1296                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED,
1297                 0,
1298         };
1299
1300         /* read the natgw nodes file into a linked list */
1301         natgw_nodes = read_natgw_nodes_file(ctdb, tmp_ctx);
1302         if (natgw_nodes == NULL) {
1303                 ret = -1;
1304                 goto done;
1305         }
1306
1307         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
1308                                    tmp_ctx, &orig_nodemap);
1309         if (ret != 0) {
1310                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
1311                 talloc_free(tmp_ctx);
1312                 return -1;
1313         }
1314
1315         /* Get a nodemap that includes only the nodes in the NATGW
1316          * group */
1317         nodemap = filter_nodemap_by_addrs(ctdb, orig_nodemap, natgw_nodes);
1318         if (nodemap == NULL) {
1319                 ret = -1;
1320                 goto done;
1321         }
1322
1323         ret = 2; /* matches ENOENT */
1324         pnn = -1;
1325         ip = "0.0.0.0";
1326         /* For each flag mask... */
1327         for (i = 0; exclude_flags[i] != 0; i++) {
1328                 /* ... get a nodemap that excludes nodes with with
1329                  * masked flags... */
1330                 struct ctdb_node_map *t =
1331                         filter_nodemap_by_flags(ctdb, nodemap,
1332                                                 exclude_flags[i]);
1333                 if (t == NULL) {
1334                         /* No memory */
1335                         ret = -1;
1336                         goto done;
1337                 }
1338                 if (t->num > 0) {
1339                         /* ... and find the first node with the NATGW
1340                          * capability */
1341                         struct ctdb_node_map *n;
1342                         n = filter_nodemap_by_capabilities(ctdb, t,
1343                                                            CTDB_CAP_NATGW,
1344                                                            true);
1345                         if (n == NULL) {
1346                                 /* No memory */
1347                                 ret = -1;
1348                                 goto done;
1349                         }
1350                         if (n->num > 0) {
1351                                 ret = 0;
1352                                 pnn = n->nodes[0].pnn;
1353                                 ip = ctdb_addr_to_str(&n->nodes[0].addr);
1354                                 break;
1355                         }
1356                 }
1357                 talloc_free(t);
1358         }
1359
1360         if (options.machinereadable) {
1361                 printm(":Node:IP:\n");
1362                 printm(":%d:%s:\n", pnn, ip);
1363         } else {
1364                 printf("%d %s\n", pnn, ip);
1365         }
1366
1367         /* print the pruned list of nodes belonging to this natgw list */
1368         mypnn = getpnn(ctdb);
1369         if (options.machinereadable) {
1370                 control_status_header_machine();
1371         } else {
1372                 printf("Number of nodes:%d\n", nodemap->num);
1373         }
1374         for(i=0;i<nodemap->num;i++){
1375                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1376                         continue;
1377                 }
1378                 if (options.machinereadable) {
1379                         control_status_1_machine(ctdb, mypnn, &(nodemap->nodes[i]));
1380                 } else {
1381                         control_status_1_human(ctdb, mypnn, &(nodemap->nodes[i]));
1382                 }
1383         }
1384
1385 done:
1386         talloc_free(tmp_ctx);
1387         return ret;
1388 }
1389
1390 /*
1391   display the status of the scripts for monitoring (or other events)
1392  */
1393 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1394                                     enum ctdb_eventscript_call type)
1395 {
1396         struct ctdb_scripts_wire *script_status;
1397         int ret, i;
1398
1399         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1400         if (ret != 0) {
1401                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1402                 return ret;
1403         }
1404
1405         if (script_status == NULL) {
1406                 if (!options.machinereadable) {
1407                         printf("%s cycle never run\n",
1408                                ctdb_eventscript_call_names[type]);
1409                 }
1410                 return 0;
1411         }
1412
1413         if (!options.machinereadable) {
1414                 int num_run = 0;
1415                 for (i=0; i<script_status->num_scripts; i++) {
1416                         if (script_status->scripts[i].status != -ENOEXEC) {
1417                                 num_run++;
1418                         }
1419                 }
1420                 printf("%d scripts were executed last %s cycle\n",
1421                        num_run,
1422                        ctdb_eventscript_call_names[type]);
1423         }
1424         for (i=0; i<script_status->num_scripts; i++) {
1425                 const char *status = NULL;
1426
1427                 /* The ETIME status is ignored for certain events.
1428                  * In that case the status is 0, but endtime is not set.
1429                  */
1430                 if (script_status->scripts[i].status == 0 &&
1431                     timeval_is_zero(&script_status->scripts[i].finished)) {
1432                         script_status->scripts[i].status = -ETIME;
1433                 }
1434
1435                 switch (script_status->scripts[i].status) {
1436                 case -ETIME:
1437                         status = "TIMEDOUT";
1438                         break;
1439                 case -ENOEXEC:
1440                         status = "DISABLED";
1441                         break;
1442                 case 0:
1443                         status = "OK";
1444                         break;
1445                 default:
1446                         if (script_status->scripts[i].status > 0)
1447                                 status = "ERROR";
1448                         break;
1449                 }
1450                 if (options.machinereadable) {
1451                         printm(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1452                                ctdb_eventscript_call_names[type],
1453                                script_status->scripts[i].name,
1454                                script_status->scripts[i].status,
1455                                status,
1456                                (long)script_status->scripts[i].start.tv_sec,
1457                                (long)script_status->scripts[i].start.tv_usec,
1458                                (long)script_status->scripts[i].finished.tv_sec,
1459                                (long)script_status->scripts[i].finished.tv_usec,
1460                                script_status->scripts[i].output);
1461                         continue;
1462                 }
1463                 if (status)
1464                         printf("%-20s Status:%s    ",
1465                                script_status->scripts[i].name, status);
1466                 else
1467                         /* Some other error, eg from stat. */
1468                         printf("%-20s Status:CANNOT RUN (%s)",
1469                                script_status->scripts[i].name,
1470                                strerror(-script_status->scripts[i].status));
1471
1472                 if (script_status->scripts[i].status >= 0) {
1473                         printf("Duration:%.3lf ",
1474                         timeval_delta(&script_status->scripts[i].finished,
1475                               &script_status->scripts[i].start));
1476                 }
1477                 if (script_status->scripts[i].status != -ENOEXEC) {
1478                         printf("%s",
1479                                ctime(&script_status->scripts[i].start.tv_sec));
1480                         if (script_status->scripts[i].status != 0) {
1481                                 printf("   OUTPUT:%s\n",
1482                                        script_status->scripts[i].output);
1483                         }
1484                 } else {
1485                         printf("\n");
1486                 }
1487         }
1488         return 0;
1489 }
1490
1491
1492 static int control_scriptstatus(struct ctdb_context *ctdb,
1493                                 int argc, const char **argv)
1494 {
1495         int ret;
1496         enum ctdb_eventscript_call type, min, max;
1497         const char *arg;
1498
1499         if (argc > 1) {
1500                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1501                 return -1;
1502         }
1503
1504         if (argc == 0)
1505                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1506         else
1507                 arg = argv[0];
1508
1509         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1510                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1511                         min = type;
1512                         max = type+1;
1513                         break;
1514                 }
1515         }
1516         if (type == CTDB_EVENT_MAX) {
1517                 if (strcmp(arg, "all") == 0) {
1518                         min = 0;
1519                         max = CTDB_EVENT_MAX;
1520                 } else {
1521                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1522                         return -1;
1523                 }
1524         }
1525
1526         if (options.machinereadable) {
1527                 printm(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1528         }
1529
1530         for (type = min; type < max; type++) {
1531                 ret = control_one_scriptstatus(ctdb, type);
1532                 if (ret != 0) {
1533                         return ret;
1534                 }
1535         }
1536
1537         return 0;
1538 }
1539
1540 /*
1541   enable an eventscript
1542  */
1543 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1544 {
1545         int ret;
1546
1547         if (argc < 1) {
1548                 usage();
1549         }
1550
1551         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1552         if (ret != 0) {
1553           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1554                 return ret;
1555         }
1556
1557         return 0;
1558 }
1559
1560 /*
1561   disable an eventscript
1562  */
1563 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1564 {
1565         int ret;
1566
1567         if (argc < 1) {
1568                 usage();
1569         }
1570
1571         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1572         if (ret != 0) {
1573           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1574                 return ret;
1575         }
1576
1577         return 0;
1578 }
1579
1580 /*
1581   display the pnn of the recovery master
1582  */
1583 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1584 {
1585         uint32_t recmaster;
1586         int ret;
1587
1588         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1589         if (ret != 0) {
1590                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1591                 return -1;
1592         }
1593         printf("%d\n",recmaster);
1594
1595         return 0;
1596 }
1597
1598 /*
1599   add a tickle to a public address
1600  */
1601 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1602 {
1603         struct ctdb_tcp_connection t;
1604         TDB_DATA data;
1605         int ret;
1606
1607         assert_single_node_only();
1608
1609         if (argc < 2) {
1610                 usage();
1611         }
1612
1613         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1614                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1615                 return -1;
1616         }
1617         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1618                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1619                 return -1;
1620         }
1621
1622         data.dptr = (uint8_t *)&t;
1623         data.dsize = sizeof(t);
1624
1625         /* tell all nodes about this tcp connection */
1626         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1627                            0, data, ctdb, NULL, NULL, NULL, NULL);
1628         if (ret != 0) {
1629                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1630                 return -1;
1631         }
1632         
1633         return 0;
1634 }
1635
1636
1637 /*
1638   delete a tickle from a node
1639  */
1640 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1641 {
1642         struct ctdb_tcp_connection t;
1643         TDB_DATA data;
1644         int ret;
1645
1646         assert_single_node_only();
1647
1648         if (argc < 2) {
1649                 usage();
1650         }
1651
1652         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1653                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1654                 return -1;
1655         }
1656         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1657                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1658                 return -1;
1659         }
1660
1661         data.dptr = (uint8_t *)&t;
1662         data.dsize = sizeof(t);
1663
1664         /* tell all nodes about this tcp connection */
1665         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1666                            0, data, ctdb, NULL, NULL, NULL, NULL);
1667         if (ret != 0) {
1668                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1669                 return -1;
1670         }
1671         
1672         return 0;
1673 }
1674
1675
1676 /*
1677   get a list of all tickles for this pnn
1678  */
1679 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1680 {
1681         struct ctdb_control_tcp_tickle_list *list;
1682         ctdb_sock_addr addr;
1683         int i, ret;
1684         unsigned port = 0;
1685
1686         assert_single_node_only();
1687
1688         if (argc < 1) {
1689                 usage();
1690         }
1691
1692         if (argc == 2) {
1693                 port = atoi(argv[1]);
1694         }
1695
1696         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1697                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1698                 return -1;
1699         }
1700
1701         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1702         if (ret == -1) {
1703                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1704                 return -1;
1705         }
1706
1707         if (options.machinereadable){
1708                 printm(":source ip:port:destination ip:port:\n");
1709                 for (i=0;i<list->tickles.num;i++) {
1710                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1711                                 continue;
1712                         }
1713                         printm(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1714                         printm(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1715                 }
1716         } else {
1717                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1718                 printf("Num tickles:%u\n", list->tickles.num);
1719                 for (i=0;i<list->tickles.num;i++) {
1720                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1721                                 continue;
1722                         }
1723                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1724                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1725                 }
1726         }
1727
1728         talloc_free(list);
1729         
1730         return 0;
1731 }
1732
1733
1734 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1735 {
1736         struct ctdb_all_public_ips *ips;
1737         struct ctdb_public_ip ip;
1738         int i, ret;
1739         uint32_t *nodes;
1740         uint32_t disable_time;
1741         TDB_DATA data;
1742         struct ctdb_node_map *nodemap=NULL;
1743         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1744
1745         disable_time = 30;
1746         data.dptr  = (uint8_t*)&disable_time;
1747         data.dsize = sizeof(disable_time);
1748         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1749         if (ret != 0) {
1750                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1751                 return -1;
1752         }
1753
1754
1755
1756         /* read the public ip list from the node */
1757         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1758         if (ret != 0) {
1759                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1760                 talloc_free(tmp_ctx);
1761                 return -1;
1762         }
1763
1764         for (i=0;i<ips->num;i++) {
1765                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1766                         break;
1767                 }
1768         }
1769         if (i==ips->num) {
1770                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1771                         pnn, ctdb_addr_to_str(addr)));
1772                 talloc_free(tmp_ctx);
1773                 return -1;
1774         }
1775
1776         ip.pnn  = pnn;
1777         ip.addr = *addr;
1778
1779         data.dptr  = (uint8_t *)&ip;
1780         data.dsize = sizeof(ip);
1781
1782         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1783         if (ret != 0) {
1784                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1785                 talloc_free(tmp_ctx);
1786                 return ret;
1787         }
1788
1789         nodes = list_of_nodes(ctdb, nodemap, tmp_ctx, NODE_FLAGS_INACTIVE, pnn);
1790         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1791                                         nodes, 0,
1792                                         LONGTIMELIMIT(),
1793                                         false, data,
1794                                         NULL, NULL,
1795                                         NULL);
1796         if (ret != 0) {
1797                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1798                 talloc_free(tmp_ctx);
1799                 return -1;
1800         }
1801
1802         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1803         if (ret != 0) {
1804                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1805                 talloc_free(tmp_ctx);
1806                 return -1;
1807         }
1808
1809         /* update the recovery daemon so it now knows to expect the new
1810            node assignment for this ip.
1811         */
1812         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1813         if (ret != 0) {
1814                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1815                 return -1;
1816         }
1817
1818         talloc_free(tmp_ctx);
1819         return 0;
1820 }
1821
1822
1823 /* 
1824  * scans all other nodes and returns a pnn for another node that can host this 
1825  * ip address or -1
1826  */
1827 static int
1828 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1829 {
1830         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1831         struct ctdb_all_public_ips *ips;
1832         struct ctdb_node_map *nodemap=NULL;
1833         int i, j, ret;
1834         int pnn;
1835
1836         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1837         if (ret != 0) {
1838                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1839                 talloc_free(tmp_ctx);
1840                 return ret;
1841         }
1842
1843         for(i=0;i<nodemap->num;i++){
1844                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1845                         continue;
1846                 }
1847                 if (nodemap->nodes[i].pnn == options.pnn) {
1848                         continue;
1849                 }
1850
1851                 /* read the public ip list from this node */
1852                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1853                 if (ret != 0) {
1854                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1855                         return -1;
1856                 }
1857
1858                 for (j=0;j<ips->num;j++) {
1859                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1860                                 pnn = nodemap->nodes[i].pnn;
1861                                 talloc_free(tmp_ctx);
1862                                 return pnn;
1863                         }
1864                 }
1865                 talloc_free(ips);
1866         }
1867
1868         talloc_free(tmp_ctx);
1869         return -1;
1870 }
1871
1872 /* If pnn is -1 then try to find a node to move IP to... */
1873 static bool try_moveip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1874 {
1875         bool pnn_specified = (pnn == -1 ? false : true);
1876         int retries = 0;
1877
1878         while (retries < 5) {
1879                 if (!pnn_specified) {
1880                         pnn = find_other_host_for_public_ip(ctdb, addr);
1881                         if (pnn == -1) {
1882                                 return false;
1883                         }
1884                         DEBUG(DEBUG_NOTICE,
1885                               ("Trying to move public IP to node %u\n", pnn));
1886                 }
1887
1888                 if (move_ip(ctdb, addr, pnn) == 0) {
1889                         return true;
1890                 }
1891
1892                 sleep(3);
1893                 retries++;
1894         }
1895
1896         return false;
1897 }
1898
1899
1900 /*
1901   move/failover an ip address to a specific node
1902  */
1903 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1904 {
1905         uint32_t pnn;
1906         ctdb_sock_addr addr;
1907
1908         assert_single_node_only();
1909
1910         if (argc < 2) {
1911                 usage();
1912                 return -1;
1913         }
1914
1915         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1916                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1917                 return -1;
1918         }
1919
1920
1921         if (sscanf(argv[1], "%u", &pnn) != 1) {
1922                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1923                 return -1;
1924         }
1925
1926         if (!try_moveip(ctdb, &addr, pnn)) {
1927                 DEBUG(DEBUG_ERR,("Failed to move IP to node %d.\n", pnn));
1928                 return -1;
1929         }
1930
1931         return 0;
1932 }
1933
1934 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
1935 {
1936         TDB_DATA data;
1937
1938         data.dptr  = (uint8_t *)&pnn;
1939         data.dsize = sizeof(uint32_t);
1940         if (ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
1941                 DEBUG(DEBUG_ERR,
1942                       ("Failed to send message to force node %u to be a rebalancing target\n",
1943                        pnn));
1944                 return -1;
1945         }
1946
1947         return 0;
1948 }
1949
1950
1951 /*
1952   rebalance a node by setting it to allow failback and triggering a
1953   takeover run
1954  */
1955 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
1956 {
1957         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1958         uint32_t *nodes;
1959         uint32_t pnn_mode;
1960         int i, ret;
1961
1962         assert_single_node_only();
1963
1964         if (argc > 1) {
1965                 usage();
1966         }
1967
1968         /* Determine the nodes where IPs need to be reloaded */
1969         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1970                               options.pnn, true, &nodes, &pnn_mode)) {
1971                 ret = -1;
1972                 goto done;
1973         }
1974
1975         for (i = 0; i < talloc_array_length(nodes); i++) {
1976                 if (!rebalance_node(ctdb, nodes[i])) {
1977                         ret = -1;
1978                 }
1979         }
1980
1981 done:
1982         talloc_free(tmp_ctx);
1983         return ret;
1984 }
1985
1986 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1987 {
1988         struct ctdb_public_ip ip;
1989         int ret;
1990         uint32_t *nodes;
1991         uint32_t disable_time;
1992         TDB_DATA data;
1993         struct ctdb_node_map *nodemap=NULL;
1994         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1995
1996         disable_time = 30;
1997         data.dptr  = (uint8_t*)&disable_time;
1998         data.dsize = sizeof(disable_time);
1999         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
2000         if (ret != 0) {
2001                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
2002                 return -1;
2003         }
2004
2005         ip.pnn  = -1;
2006         ip.addr = *addr;
2007
2008         data.dptr  = (uint8_t *)&ip;
2009         data.dsize = sizeof(ip);
2010
2011         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
2012         if (ret != 0) {
2013                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2014                 talloc_free(tmp_ctx);
2015                 return ret;
2016         }
2017
2018         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
2019         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
2020                                         nodes, 0,
2021                                         LONGTIMELIMIT(),
2022                                         false, data,
2023                                         NULL, NULL,
2024                                         NULL);
2025         if (ret != 0) {
2026                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
2027                 talloc_free(tmp_ctx);
2028                 return -1;
2029         }
2030
2031         talloc_free(tmp_ctx);
2032         return 0;
2033 }
2034
2035 /*
2036   release an ip form all nodes and have it re-assigned by recd
2037  */
2038 static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
2039 {
2040         ctdb_sock_addr addr;
2041
2042         assert_single_node_only();
2043
2044         if (argc < 1) {
2045                 usage();
2046                 return -1;
2047         }
2048
2049         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2050                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2051                 return -1;
2052         }
2053
2054         if (rebalance_ip(ctdb, &addr) != 0) {
2055                 DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
2056                 return -1;
2057         }
2058
2059         return 0;
2060 }
2061
2062 static int getips_store_callback(void *param, void *data)
2063 {
2064         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
2065         struct ctdb_all_public_ips *ips = param;
2066         int i;
2067
2068         i = ips->num++;
2069         ips->ips[i].pnn  = node_ip->pnn;
2070         ips->ips[i].addr = node_ip->addr;
2071         return 0;
2072 }
2073
/*
  Tree traversal callback: increment the uint32_t counter that param
  points to.  data is not used.  Always returns 0.
 */
static int getips_count_callback(void *param, void *data)
{
        uint32_t *counter = param;

        *counter += 1;

        return 0;
}
2081
2082 #define IP_KEYLEN       4
2083 static uint32_t *ip_key(ctdb_sock_addr *ip)
2084 {
2085         static uint32_t key[IP_KEYLEN];
2086
2087         bzero(key, sizeof(key));
2088
2089         switch (ip->sa.sa_family) {
2090         case AF_INET:
2091                 key[0]  = ip->ip.sin_addr.s_addr;
2092                 break;
2093         case AF_INET6: {
2094                 uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
2095                 key[0]  = s6_a32[3];
2096                 key[1]  = s6_a32[2];
2097                 key[2]  = s6_a32[1];
2098                 key[3]  = s6_a32[0];
2099                 break;
2100         }
2101         default:
2102                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
2103                 return key;
2104         }
2105
2106         return key;
2107 }
2108
/*
  Insert callback passed to trbt_insertarray32_callback(): hands back
  parm unchanged.  data is ignored.
 */
static void *add_ip_callback(void *parm, void *data)
{
        void *result = parm;

        return result;
}
2113
2114 static int
2115 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
2116 {
2117         struct ctdb_all_public_ips *tmp_ips;
2118         struct ctdb_node_map *nodemap=NULL;
2119         trbt_tree_t *ip_tree;
2120         int i, j, len, ret;
2121         uint32_t count;
2122
2123         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2124         if (ret != 0) {
2125                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2126                 return ret;
2127         }
2128
2129         ip_tree = trbt_create(tmp_ctx, 0);
2130
2131         for(i=0;i<nodemap->num;i++){
2132                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
2133                         continue;
2134                 }
2135                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2136                         continue;
2137                 }
2138
2139                 /* read the public ip list from this node */
2140                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
2141                 if (ret != 0) {
2142                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
2143                         return -1;
2144                 }
2145         
2146                 for (j=0; j<tmp_ips->num;j++) {
2147                         struct ctdb_public_ip *node_ip;
2148
2149                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
2150                         node_ip->pnn  = tmp_ips->ips[j].pnn;
2151                         node_ip->addr = tmp_ips->ips[j].addr;
2152
2153                         trbt_insertarray32_callback(ip_tree,
2154                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
2155                                 add_ip_callback,
2156                                 node_ip);
2157                 }
2158                 talloc_free(tmp_ips);
2159         }
2160
2161         /* traverse */
2162         count = 0;
2163         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
2164
2165         len = offsetof(struct ctdb_all_public_ips, ips) + 
2166                 count*sizeof(struct ctdb_public_ip);
2167         tmp_ips = talloc_zero_size(tmp_ctx, len);
2168         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
2169
2170         *ips = tmp_ips;
2171
2172         return 0;
2173 }
2174
2175
2176 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2177 {
2178         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2179
2180         event_add_timed(ctdb->ev, ctdb, 
2181                                 timeval_current_ofs(1, 0),
2182                                 ctdb_every_second, ctdb);
2183 }
2184
/*
  State shared between srvid_broadcast() and
  srvid_broadcast_reply_handler().
 */
struct srvid_reply_handler_data {
        /* Set by the reply handler once enough replies have arrived */
        bool done;

        /* true: wait for a reply from every node listed in nodes,
           false: a single successful reply is enough */
        bool wait_for_all;

        /* talloc array of pnns still expected to reply; the handler
           overwrites an entry with -1 when that node has replied.
           NULL when wait_for_all is false */
        uint32_t *nodes;

        /* Name of the request, used in log messages */
        const char *srvid_str;
};
2191
2192 static void srvid_broadcast_reply_handler(struct ctdb_context *ctdb,
2193                                          uint64_t srvid,
2194                                          TDB_DATA data,
2195                                          void *private_data)
2196 {
2197         struct srvid_reply_handler_data *d =
2198                 (struct srvid_reply_handler_data *)private_data;
2199         int i;
2200         int32_t ret;
2201
2202         if (data.dsize != sizeof(ret)) {
2203                 DEBUG(DEBUG_ERR, (__location__ " Wrong reply size\n"));
2204                 return;
2205         }
2206
2207         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
2208         ret = *(int32_t *)data.dptr;
2209         if (ret < 0) {
2210                 DEBUG(DEBUG_ERR,
2211                       ("%s failed with result %d\n", d->srvid_str, ret));
2212                 return;
2213         }
2214
2215         if (!d->wait_for_all) {
2216                 d->done = true;
2217                 return;
2218         }
2219
2220         /* Wait for all replies */
2221         d->done = true;
2222         for (i = 0; i < talloc_array_length(d->nodes); i++) {
2223                 if (d->nodes[i] == ret) {
2224                         DEBUG(DEBUG_DEBUG,
2225                               ("%s reply received from node %u\n",
2226                                d->srvid_str, ret));
2227                         d->nodes[i] = -1;
2228                 }
2229                 if (d->nodes[i] != -1) {
2230                         /* Found a node that hasn't yet replied */
2231                         d->done = false;
2232                 }
2233         }
2234 }
2235
2236 /* Broadcast the given SRVID to all connected nodes.  Wait for 1 reply
2237  * or replies from all connected nodes.  arg is the data argument to
2238  * pass in the srvid_request structure - pass 0 if this isn't needed.
2239  */
2240 static int srvid_broadcast(struct ctdb_context *ctdb,
2241                            uint64_t srvid, uint32_t *arg,
2242                            const char *srvid_str, bool wait_for_all)
2243 {
2244         int ret;
2245         TDB_DATA data;
2246         uint32_t pnn;
2247         uint64_t reply_srvid;
2248         struct srvid_request request;
2249         struct srvid_request_data request_data;
2250         struct srvid_reply_handler_data reply_data;
2251         struct timeval tv;
2252
2253         ZERO_STRUCT(request);
2254
2255         /* Time ticks to enable timeouts to be processed */
2256         event_add_timed(ctdb->ev, ctdb, 
2257                                 timeval_current_ofs(1, 0),
2258                                 ctdb_every_second, ctdb);
2259
2260         pnn = ctdb_get_pnn(ctdb);
2261         reply_srvid = getpid();
2262
2263         if (arg == NULL) {
2264                 request.pnn = pnn;
2265                 request.srvid = reply_srvid;
2266
2267                 data.dptr = (uint8_t *)&request;
2268                 data.dsize = sizeof(request);
2269         } else {
2270                 request_data.pnn = pnn;
2271                 request_data.srvid = reply_srvid;
2272                 request_data.data = *arg;
2273
2274                 data.dptr = (uint8_t *)&request_data;
2275                 data.dsize = sizeof(request_data);
2276         }
2277
2278         /* Register message port for reply from recovery master */
2279         ctdb_client_set_message_handler(ctdb, reply_srvid,
2280                                         srvid_broadcast_reply_handler,
2281                                         &reply_data);
2282
2283         reply_data.wait_for_all = wait_for_all;
2284         reply_data.nodes = NULL;
2285         reply_data.srvid_str = srvid_str;
2286
2287 again:
2288         reply_data.done = false;
2289
2290         if (wait_for_all) {
2291                 struct ctdb_node_map *nodemap;
2292
2293                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
2294                                            CTDB_CURRENT_NODE, ctdb, &nodemap);
2295                 if (ret != 0) {
2296                         DEBUG(DEBUG_ERR,
2297                               ("Unable to get nodemap from current node, try again\n"));
2298                         sleep(1);
2299                         goto again;
2300                 }
2301
2302                 if (reply_data.nodes != NULL) {
2303                         talloc_free(reply_data.nodes);
2304                 }
2305                 reply_data.nodes = list_of_connected_nodes(ctdb, nodemap,
2306                                                            NULL, true);
2307
2308                 talloc_free(nodemap);
2309         }
2310
2311         /* Send to all connected nodes. Only recmaster replies */
2312         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2313                                        srvid, data);
2314         if (ret != 0) {
2315                 /* This can only happen if the socket is closed and
2316                  * there's no way to recover from that, so don't try
2317                  * again.
2318                  */
2319                 DEBUG(DEBUG_ERR,
2320                       ("Failed to send %s request to connected nodes\n",
2321                        srvid_str));
2322                 return -1;
2323         }
2324
2325         tv = timeval_current();
2326         /* This loop terminates the reply is received */
2327         while (timeval_elapsed(&tv) < 5.0 && !reply_data.done) {
2328                 event_loop_once(ctdb->ev);
2329         }
2330
2331         if (!reply_data.done) {
2332                 DEBUG(DEBUG_NOTICE,
2333                       ("Still waiting for confirmation of %s\n", srvid_str));
2334                 sleep(1);
2335                 goto again;
2336         }
2337
2338         ctdb_client_remove_message_handler(ctdb, reply_srvid, &reply_data);
2339
2340         talloc_free(reply_data.nodes);
2341
2342         return 0;
2343 }
2344
2345 static int ipreallocate(struct ctdb_context *ctdb)
2346 {
2347         return srvid_broadcast(ctdb, CTDB_SRVID_TAKEOVER_RUN, NULL,
2348                                "IP reallocation", false);
2349 }
2350
2351
/*
  Command handler wrapping ipreallocate().  argc and argv are not
  used.  Returns the result of ipreallocate().
 */
static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
{
        int rc = ipreallocate(ctdb);

        return rc;
}
2356
2357 /*
2358   add a public ip address to a node
2359  */
2360 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
2361 {
2362         int i, ret;
2363         int len, retries = 0;
2364         unsigned mask;
2365         ctdb_sock_addr addr;
2366         struct ctdb_control_ip_iface *pub;
2367         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2368         struct ctdb_all_public_ips *ips;
2369
2370
2371         if (argc != 2) {
2372                 talloc_free(tmp_ctx);
2373                 usage();
2374         }
2375
2376         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
2377                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
2378                 talloc_free(tmp_ctx);
2379                 return -1;
2380         }
2381
2382         /* read the public ip list from the node */
2383         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2384         if (ret != 0) {
2385                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
2386                 talloc_free(tmp_ctx);
2387                 return -1;
2388         }
2389         for (i=0;i<ips->num;i++) {
2390                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2391                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
2392                         return 0;
2393                 }
2394         }
2395
2396
2397
2398         /* Dont timeout. This command waits for an ip reallocation
2399            which sometimes can take wuite a while if there has
2400            been a recent recovery
2401         */
2402         alarm(0);
2403
2404         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
2405         pub = talloc_size(tmp_ctx, len); 
2406         CTDB_NO_MEMORY(ctdb, pub);
2407
2408         pub->addr  = addr;
2409         pub->mask  = mask;
2410         pub->len   = strlen(argv[1])+1;
2411         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
2412
2413         do {
2414                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
2415                 if (ret != 0) {
2416                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
2417                         sleep(3);
2418                         retries++;
2419                 }
2420         } while (retries < 5 && ret != 0);
2421         if (ret != 0) {
2422                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
2423                 talloc_free(tmp_ctx);
2424                 return ret;
2425         }
2426
2427         if (rebalance_node(ctdb, options.pnn) != 0) {
2428                 DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
2429                 return ret;
2430         }
2431
2432         talloc_free(tmp_ctx);
2433         return 0;
2434 }
2435
2436 /*
2437   add a public ip address to a node
2438  */
2439 static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
2440 {
2441         ctdb_sock_addr addr;
2442         char *iface = NULL;
2443
2444         if (argc != 1) {
2445                 usage();
2446         }
2447
2448         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2449                 printf("Badly formed ip : %s\n", argv[0]);
2450                 return -1;
2451         }
2452
2453         iface = ctdb_sys_find_ifname(&addr);
2454         if (iface == NULL) {
2455                 printf("Failed to get interface name for ip: %s", argv[0]);
2456                 return -1;
2457         }
2458
2459         printf("IP on interface %s\n", iface);
2460
2461         free(iface);
2462
2463         return 0;
2464 }
2465
2466 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
2467
2468 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
2469 {
2470         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2471         struct ctdb_node_map *nodemap=NULL;
2472         struct ctdb_all_public_ips *ips;
2473         int ret, i, j;
2474
2475         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2476         if (ret != 0) {
2477                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
2478                 return ret;
2479         }
2480
2481         /* remove it from the nodes that are not hosting the ip currently */
2482         for(i=0;i<nodemap->num;i++){
2483                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2484                         continue;
2485                 }
2486                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2487                 if (ret != 0) {
2488                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2489                         continue;
2490                 }
2491
2492                 for (j=0;j<ips->num;j++) {
2493                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2494                                 break;
2495                         }
2496                 }
2497                 if (j==ips->num) {
2498                         continue;
2499                 }
2500
2501                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
2502                         continue;
2503                 }
2504
2505                 options.pnn = nodemap->nodes[i].pnn;
2506                 control_delip(ctdb, argc, argv);
2507         }
2508
2509
2510         /* remove it from every node (also the one hosting it) */
2511         for(i=0;i<nodemap->num;i++){
2512                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2513                         continue;
2514                 }
2515                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2516                 if (ret != 0) {
2517                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2518                         continue;
2519                 }
2520
2521                 for (j=0;j<ips->num;j++) {
2522                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2523                                 break;
2524                         }
2525                 }
2526                 if (j==ips->num) {
2527                         continue;
2528                 }
2529
2530                 options.pnn = nodemap->nodes[i].pnn;
2531                 control_delip(ctdb, argc, argv);
2532         }
2533
2534         talloc_free(tmp_ctx);
2535         return 0;
2536 }
2537         
2538 /*
2539   delete a public ip address from a node
2540  */
2541 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
2542 {
2543         int i, ret;
2544         ctdb_sock_addr addr;
2545         struct ctdb_control_ip_iface pub;
2546         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2547         struct ctdb_all_public_ips *ips;
2548
2549         if (argc != 1) {
2550                 talloc_free(tmp_ctx);
2551                 usage();
2552         }
2553
2554         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2555                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2556                 return -1;
2557         }
2558
2559         if (options.pnn == CTDB_BROADCAST_ALL) {
2560                 return control_delip_all(ctdb, argc, argv, &addr);
2561         }
2562
2563         pub.addr  = addr;
2564         pub.mask  = 0;
2565         pub.len   = 0;
2566
2567         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2568         if (ret != 0) {
2569                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2570                 talloc_free(tmp_ctx);
2571                 return ret;
2572         }
2573         
2574         for (i=0;i<ips->num;i++) {
2575                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2576                         break;
2577                 }
2578         }
2579
2580         if (i==ips->num) {
2581                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2582                         ctdb_addr_to_str(&addr)));
2583                 talloc_free(tmp_ctx);
2584                 return -1;
2585         }
2586
2587         /* This is an optimisation.  If this node is hosting the IP
2588          * then try to move it somewhere else without invoking a full
2589          * takeover run.  We don't care if this doesn't work!
2590          */
2591         if (ips->ips[i].pnn == options.pnn) {
2592                 (void) try_moveip(ctdb, &addr, -1);
2593         }
2594
2595         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2596         if (ret != 0) {
2597                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2598                 talloc_free(tmp_ctx);
2599                 return ret;
2600         }
2601
2602         talloc_free(tmp_ctx);
2603         return 0;
2604 }
2605
2606 static int kill_tcp_from_file(struct ctdb_context *ctdb,
2607                               int argc, const char **argv)
2608 {
2609         struct ctdb_control_killtcp *killtcp;
2610         int max_entries, current, i;
2611         struct timeval timeout;
2612         char line[128], src[128], dst[128];
2613         int linenum;
2614         TDB_DATA data;
2615         struct client_async_data *async_data;
2616         struct ctdb_client_control_state *state;
2617
2618         if (argc != 0) {
2619                 usage();
2620         }
2621
2622         linenum = 1;
2623         killtcp = NULL;
2624         max_entries = 0;
2625         current = 0;
2626         while (!feof(stdin)) {
2627                 if (fgets(line, sizeof(line), stdin) == NULL) {
2628                         continue;
2629                 }
2630
2631                 /* Silently skip empty lines */
2632                 if (line[0] == '\n') {
2633                         continue;
2634                 }
2635
2636                 if (sscanf(line, "%s %s\n", src, dst) != 2) {
2637                         DEBUG(DEBUG_ERR, ("Bad line [%d]: '%s'\n",
2638                                           linenum, line));
2639                         talloc_free(killtcp);
2640                         return -1;
2641                 }
2642
2643                 if (current >= max_entries) {
2644                         max_entries += 1024;
2645                         killtcp = talloc_realloc(ctdb, killtcp,
2646                                                  struct ctdb_control_killtcp,
2647                                                  max_entries);
2648                         CTDB_NO_MEMORY(ctdb, killtcp);
2649                 }
2650
2651                 if (!parse_ip_port(src, &killtcp[current].src_addr)) {
2652                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2653                                           linenum, src));
2654                         talloc_free(killtcp);
2655                         return -1;
2656                 }
2657
2658                 if (!parse_ip_port(dst, &killtcp[current].dst_addr)) {
2659                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2660                                           linenum, dst));
2661                         talloc_free(killtcp);
2662                         return -1;
2663                 }
2664
2665                 current++;
2666         }
2667
2668         async_data = talloc_zero(ctdb, struct client_async_data);
2669         if (async_data == NULL) {
2670                 talloc_free(killtcp);
2671                 return -1;
2672         }
2673
2674         for (i = 0; i < current; i++) {
2675
2676                 data.dsize = sizeof(struct ctdb_control_killtcp);
2677                 data.dptr  = (unsigned char *)&killtcp[i];
2678
2679                 timeout = TIMELIMIT();
2680                 state = ctdb_control_send(ctdb, options.pnn, 0,
2681                                           CTDB_CONTROL_KILL_TCP, 0, data,
2682                                           async_data, &timeout, NULL);
2683
2684                 if (state == NULL) {
2685                         DEBUG(DEBUG_ERR,
2686                               ("Failed to call async killtcp control to node %u\n",
2687                                options.pnn));
2688                         talloc_free(killtcp);
2689                         return -1;
2690                 }
2691                 
2692                 ctdb_client_async_add(async_data, state);
2693         }
2694
2695         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2696                 DEBUG(DEBUG_ERR,("killtcp failed\n"));
2697                 talloc_free(killtcp);
2698                 return -1;
2699         }
2700
2701         talloc_free(killtcp);
2702         return 0;
2703 }
2704
2705
2706 /*
2707   kill a tcp connection
2708  */
2709 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2710 {
2711         int ret;
2712         struct ctdb_control_killtcp killtcp;
2713
2714         assert_single_node_only();
2715
2716         if (argc == 0) {
2717                 return kill_tcp_from_file(ctdb, argc, argv);
2718         }
2719
2720         if (argc < 2) {
2721                 usage();
2722         }
2723
2724         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2725                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2726                 return -1;
2727         }
2728
2729         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2730                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2731                 return -1;
2732         }
2733
2734         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2735         if (ret != 0) {
2736                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2737                 return ret;
2738         }
2739
2740         return 0;
2741 }
2742
2743
2744 /*
2745   send a gratious arp
2746  */
2747 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2748 {
2749         int ret;
2750         ctdb_sock_addr addr;
2751
2752         assert_single_node_only();
2753
2754         if (argc < 2) {
2755                 usage();
2756         }
2757
2758         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2759                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2760                 return -1;
2761         }
2762
2763         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2764         if (ret != 0) {
2765                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2766                 return ret;
2767         }
2768
2769         return 0;
2770 }
2771
2772 /*
2773   register a server id
2774  */
2775 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2776 {
2777         int ret;
2778         struct ctdb_server_id server_id;
2779
2780         if (argc < 3) {
2781                 usage();
2782         }
2783
2784         server_id.pnn       = strtoul(argv[0], NULL, 0);
2785         server_id.type      = strtoul(argv[1], NULL, 0);
2786         server_id.server_id = strtoul(argv[2], NULL, 0);
2787
2788         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2789         if (ret != 0) {
2790                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2791                 return ret;
2792         }
2793         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2794         sleep(999);
2795         return -1;
2796 }
2797
2798 /*
2799   unregister a server id
2800  */
2801 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2802 {
2803         int ret;
2804         struct ctdb_server_id server_id;
2805
2806         if (argc < 3) {
2807                 usage();
2808         }
2809
2810         server_id.pnn       = strtoul(argv[0], NULL, 0);
2811         server_id.type      = strtoul(argv[1], NULL, 0);
2812         server_id.server_id = strtoul(argv[2], NULL, 0);
2813
2814         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2815         if (ret != 0) {
2816                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2817                 return ret;
2818         }
2819         return -1;
2820 }
2821
2822 /*
2823   check if a server id exists
2824  */
2825 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2826 {
2827         uint32_t status = 0;
2828         int ret;
2829         struct ctdb_server_id server_id;
2830
2831         if (argc < 3) {
2832                 usage();
2833         }
2834
2835         server_id.pnn       = strtoul(argv[0], NULL, 0);
2836         server_id.type      = strtoul(argv[1], NULL, 0);
2837         server_id.server_id = strtoul(argv[2], NULL, 0);
2838
2839         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2840         if (ret != 0) {
2841                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2842                 return ret;
2843         }
2844
2845         if (status) {
2846                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2847         } else {
2848                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2849         }
2850         return 0;
2851 }
2852
2853 /*
2854   get a list of all server ids that are registered on a node
2855  */
2856 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2857 {
2858         int i, ret;
2859         struct ctdb_server_id_list *server_ids;
2860
2861         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2862         if (ret != 0) {
2863                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2864                 return ret;
2865         }
2866
2867         for (i=0; i<server_ids->num; i++) {
2868                 printf("Server id %d:%d:%d\n", 
2869                         server_ids->server_ids[i].pnn, 
2870                         server_ids->server_ids[i].type, 
2871                         server_ids->server_ids[i].server_id); 
2872         }
2873
2874         return -1;
2875 }
2876
2877 /*
2878   check if a server id exists
2879  */
2880 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2881 {
2882         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2883         uint64_t *ids;
2884         uint8_t *result;
2885         int i;
2886
2887         if (argc < 1) {
2888                 talloc_free(tmp_ctx);
2889                 usage();
2890         }
2891
2892         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2893         result = talloc_array(tmp_ctx, uint8_t, argc);
2894
2895         for (i = 0; i < argc; i++) {
2896                 ids[i] = strtoull(argv[i], NULL, 0);
2897         }
2898
2899         if (!ctdb_client_check_message_handlers(ctdb, ids, argc, result)) {
2900                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2901                                   options.pnn));
2902                 talloc_free(tmp_ctx);
2903                 return -1;
2904         }
2905
2906         for (i=0; i < argc; i++) {
2907                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2908                        result[i] ? "exists" : "does not exist");
2909         }
2910
2911         talloc_free(tmp_ctx);
2912         return 0;
2913 }
2914
2915 /*
2916   send a tcp tickle ack
2917  */
2918 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2919 {
2920         int ret;
2921         ctdb_sock_addr  src, dst;
2922
2923         if (argc < 2) {
2924                 usage();
2925         }
2926
2927         if (!parse_ip_port(argv[0], &src)) {
2928                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2929                 return -1;
2930         }
2931
2932         if (!parse_ip_port(argv[1], &dst)) {
2933                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2934                 return -1;
2935         }
2936
2937         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2938         if (ret==0) {
2939                 return 0;
2940         }
2941         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2942
2943         return -1;
2944 }
2945
2946
2947 /*
2948   display public ip status
2949  */
2950 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2951 {
2952         int i, ret;
2953         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2954         struct ctdb_all_public_ips *ips;
2955
2956         if (options.pnn == CTDB_BROADCAST_ALL) {
2957                 /* read the list of public ips from all nodes */
2958                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2959         } else {
2960                 /* read the public ip list from this node */
2961                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2962         }
2963         if (ret != 0) {
2964                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2965                 talloc_free(tmp_ctx);
2966                 return ret;
2967         }
2968
2969         if (options.machinereadable){
2970                 printm(":Public IP:Node:");
2971                 if (options.verbose){
2972                         printm("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2973                 }
2974                 printm("\n");
2975         } else {
2976                 if (options.pnn == CTDB_BROADCAST_ALL) {
2977                         printf("Public IPs on ALL nodes\n");
2978                 } else {
2979                         printf("Public IPs on node %u\n", options.pnn);
2980                 }
2981         }
2982
2983         for (i=1;i<=ips->num;i++) {
2984                 struct ctdb_control_public_ip_info *info = NULL;
2985                 int32_t pnn;
2986                 char *aciface = NULL;
2987                 char *avifaces = NULL;
2988                 char *cifaces = NULL;
2989
2990                 if (options.pnn == CTDB_BROADCAST_ALL) {
2991                         pnn = ips->ips[ips->num-i].pnn;
2992                 } else {
2993                         pnn = options.pnn;
2994                 }
2995
2996                 if (pnn != -1) {
2997                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
2998                                                    &ips->ips[ips->num-i].addr, &info);
2999                 } else {
3000                         ret = -1;
3001                 }
3002
3003                 if (ret == 0) {
3004                         int j;
3005                         for (j=0; j < info->num; j++) {
3006                                 if (cifaces == NULL) {
3007                                         cifaces = talloc_strdup(info,
3008                                                                 info->ifaces[j].name);
3009                                 } else {
3010                                         cifaces = talloc_asprintf_append(cifaces,
3011                                                                          ",%s",
3012                                                                          info->ifaces[j].name);
3013                                 }
3014
3015                                 if (info->active_idx == j) {
3016                                         aciface = info->ifaces[j].name;
3017                                 }
3018
3019                                 if (info->ifaces[j].link_state == 0) {
3020                                         continue;
3021                                 }
3022
3023                                 if (avifaces == NULL) {
3024                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
3025                                 } else {
3026                                         avifaces = talloc_asprintf_append(avifaces,
3027                                                                           ",%s",
3028                                                                           info->ifaces[j].name);
3029                                 }
3030                         }
3031                 }
3032
3033                 if (options.machinereadable){
3034                         printm(":%s:%d:",
3035                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3036                                 ips->ips[ips->num-i].pnn);
3037                         if (options.verbose){
3038                                 printm("%s:%s:%s:",
3039                                         aciface?aciface:"",
3040                                         avifaces?avifaces:"",
3041                                         cifaces?cifaces:"");
3042                         }
3043                         printf("\n");
3044                 } else {
3045                         if (options.verbose) {
3046                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
3047                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3048                                         ips->ips[ips->num-i].pnn,
3049                                         aciface?aciface:"",
3050                                         avifaces?avifaces:"",
3051                                         cifaces?cifaces:"");
3052                         } else {
3053                                 printf("%s %d\n",
3054                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3055                                         ips->ips[ips->num-i].pnn);
3056                         }
3057                 }
3058                 talloc_free(info);
3059         }
3060
3061         talloc_free(tmp_ctx);
3062         return 0;
3063 }
3064
3065 /*
3066   public ip info
3067  */
3068 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
3069 {
3070         int i, ret;
3071         ctdb_sock_addr addr;
3072         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3073         struct ctdb_control_public_ip_info *info;
3074
3075         if (argc != 1) {
3076                 talloc_free(tmp_ctx);
3077                 usage();
3078         }
3079
3080         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
3081                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
3082                 return -1;
3083         }
3084
3085         /* read the public ip info from this node */
3086         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
3087                                            tmp_ctx, &addr, &info);
3088         if (ret != 0) {
3089                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
3090                                   argv[0], options.pnn));
3091                 talloc_free(tmp_ctx);
3092                 return ret;
3093         }
3094
3095         printf("Public IP[%s] info on node %u\n",
3096                ctdb_addr_to_str(&info->ip.addr),
3097                options.pnn);
3098
3099         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
3100                ctdb_addr_to_str(&info->ip.addr),
3101                info->ip.pnn, info->num);
3102
3103         for (i=0; i<info->num; i++) {
3104                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
3105
3106                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
3107                        i+1, info->ifaces[i].name,
3108                        info->ifaces[i].link_state?"up":"down",
3109                        (unsigned int)info->ifaces[i].references,
3110                        (i==info->active_idx)?" (active)":"");
3111         }
3112
3113         talloc_free(tmp_ctx);
3114         return 0;
3115 }
3116
3117 /*
3118   display interfaces status
3119  */
3120 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
3121 {
3122         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3123         int i;
3124         struct ctdb_control_get_ifaces *ifaces;
3125         int ret;
3126
3127         /* read the public ip list from this node */
3128         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ifaces);
3129         if (ret != 0) {
3130                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
3131                                   options.pnn));
3132                 talloc_free(tmp_ctx);
3133                 return -1;
3134         }
3135
3136         if (options.machinereadable){
3137                 printm(":Name:LinkStatus:References:\n");
3138         } else {
3139                 printf("Interfaces on node %u\n", options.pnn);
3140         }
3141
3142         for (i=0; i<ifaces->num; i++) {
3143                 if (options.machinereadable){
3144                         printm(":%s:%s:%u:\n",
3145                                ifaces->ifaces[i].name,
3146                                ifaces->ifaces[i].link_state?"1":"0",
3147                                (unsigned int)ifaces->ifaces[i].references);
3148                 } else {
3149                         printf("name:%s link:%s references:%u\n",
3150                                ifaces->ifaces[i].name,
3151                                ifaces->ifaces[i].link_state?"up":"down",
3152                                (unsigned int)ifaces->ifaces[i].references);
3153                 }
3154         }
3155
3156         talloc_free(tmp_ctx);
3157         return 0;
3158 }
3159
3160
3161 /*
3162   set link status of an interface
3163  */
3164 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
3165 {
3166         int ret;
3167         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3168         struct ctdb_control_iface_info info;
3169
3170         ZERO_STRUCT(info);
3171
3172         if (argc != 2) {
3173                 usage();
3174         }
3175
3176         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
3177                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
3178                                   argv[0]));
3179                 talloc_free(tmp_ctx);
3180                 return -1;
3181         }
3182         strcpy(info.name, argv[0]);
3183
3184         if (strcmp(argv[1], "up") == 0) {
3185                 info.link_state = 1;
3186         } else if (strcmp(argv[1], "down") == 0) {
3187                 info.link_state = 0;
3188         } else {
3189                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
3190                                   argv[1]));
3191                 talloc_free(tmp_ctx);
3192                 return -1;
3193         }
3194
3195         /* read the public ip list from this node */
3196         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
3197                                    tmp_ctx, &info);
3198         if (ret != 0) {
3199                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
3200                                   argv[0], options.pnn));
3201                 talloc_free(tmp_ctx);
3202                 return ret;
3203         }
3204
3205         talloc_free(tmp_ctx);
3206         return 0;
3207 }
3208
3209 /*
3210   display pid of a ctdb daemon
3211  */
3212 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
3213 {
3214         uint32_t pid;
3215         int ret;
3216
3217         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
3218         if (ret != 0) {
3219                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
3220                 return ret;
3221         }
3222         printf("Pid:%d\n", pid);
3223
3224         return 0;
3225 }
3226
3227 typedef bool update_flags_handler_t(struct ctdb_context *ctdb, void *data);
3228
3229 static int update_flags_and_ipreallocate(struct ctdb_context *ctdb,
3230                                               void *data,
3231                                               update_flags_handler_t handler,
3232                                               uint32_t flag,
3233                                               const char *desc,
3234                                               bool set_flag)
3235 {
3236         struct ctdb_node_map *nodemap = NULL;
3237         bool flag_is_set;
3238         int ret;
3239
3240         /* Check if the node is already in the desired state */
3241         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3242         if (ret != 0) {
3243                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3244                 exit(10);
3245         }
3246         flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3247         if (set_flag == flag_is_set) {
3248                 DEBUG(DEBUG_NOTICE, ("Node %d is %s %s\n", options.pnn,
3249                                      (set_flag ? "already" : "not"), desc));
3250                 return 0;
3251         }
3252
3253         do {
3254                 if (!handler(ctdb, data)) {
3255                         DEBUG(DEBUG_WARNING,
3256                               ("Failed to send control to set state %s on node %u, try again\n",
3257                                desc, options.pnn));
3258                 }
3259
3260                 sleep(1);
3261
3262                 /* Read the nodemap and verify the change took effect.
3263                  * Even if the above control/hanlder timed out then it
3264                  * could still have worked!
3265                  */
3266                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
3267                                          ctdb, &nodemap);
3268                 if (ret != 0) {
3269                         DEBUG(DEBUG_WARNING,
3270                               ("Unable to get nodemap from local node, try again\n"));
3271                 }
3272                 flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3273         } while (nodemap == NULL || (set_flag != flag_is_set));
3274
3275         return ipreallocate(ctdb);
3276 }
3277
3278 /* Administratively disable a node */
3279 static bool update_flags_disabled(struct ctdb_context *ctdb, void *data)
3280 {
3281         int ret;
3282
3283         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3284                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
3285         return ret == 0;
3286 }
3287
3288 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
3289 {
3290         return update_flags_and_ipreallocate(ctdb, NULL,
3291                                                   update_flags_disabled,
3292                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3293                                                   "disabled",
3294                                                   true /* set_flag*/);
3295 }
3296
3297 /* Administratively re-enable a node */
3298 static bool update_flags_not_disabled(struct ctdb_context *ctdb, void *data)
3299 {
3300         int ret;
3301
3302         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3303                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
3304         return ret == 0;
3305 }
3306
3307 static int control_enable(struct ctdb_context *ctdb,  int argc, const char **argv)
3308 {
3309         return update_flags_and_ipreallocate(ctdb, NULL,
3310                                                   update_flags_not_disabled,
3311                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3312                                                   "disabled",
3313                                                   false /* set_flag*/);
3314 }
3315
3316 /* Stop a node */
3317 static bool update_flags_stopped(struct ctdb_context *ctdb, void *data)
3318 {
3319         int ret;
3320
3321         ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
3322
3323         return ret == 0;
3324 }
3325
3326 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
3327 {
3328         return update_flags_and_ipreallocate(ctdb, NULL,
3329                                                   update_flags_stopped,
3330                                                   NODE_FLAGS_STOPPED,
3331                                                   "stopped",
3332                                                   true /* set_flag*/);
3333 }
3334
3335 /* Continue a stopped node */
3336 static bool update_flags_not_stopped(struct ctdb_context *ctdb, void *data)
3337 {
3338         int ret;
3339
3340         ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
3341
3342         return ret == 0;
3343 }
3344
3345 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
3346 {
3347         return update_flags_and_ipreallocate(ctdb, NULL,
3348                                                   update_flags_not_stopped,
3349                                                   NODE_FLAGS_STOPPED,
3350                                                   "stopped",
3351                                                   false /* set_flag */);
3352 }
3353
3354 static uint32_t get_generation(struct ctdb_context *ctdb)
3355 {
3356         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3357         struct ctdb_vnn_map *vnnmap=NULL;
3358         int ret;
3359         uint32_t generation;
3360
3361         /* wait until the recmaster is not in recovery mode */
3362         while (1) {
3363                 uint32_t recmode, recmaster;
3364                 
3365                 if (vnnmap != NULL) {
3366                         talloc_free(vnnmap);
3367                         vnnmap = NULL;
3368                 }
3369
3370                 /* get the recmaster */
3371                 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
3372                 if (ret != 0) {
3373                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
3374                         talloc_free(tmp_ctx);
3375                         exit(10);
3376                 }
3377
3378                 /* get recovery mode */
3379                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), recmaster, &recmode);
3380                 if (ret != 0) {
3381                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
3382                         talloc_free(tmp_ctx);
3383                         exit(10);
3384                 }
3385
3386                 /* get the current generation number */
3387                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, tmp_ctx, &vnnmap);
3388                 if (ret != 0) {
3389                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
3390                         talloc_free(tmp_ctx);
3391                         exit(10);
3392                 }
3393
3394                 if ((recmode == CTDB_RECOVERY_NORMAL) && (vnnmap->generation != 1)) {
3395                         generation = vnnmap->generation;
3396                         talloc_free(tmp_ctx);
3397                         return generation;
3398                 }
3399                 sleep(1);
3400         }
3401 }
3402
3403 /* Ban a node */
3404 static bool update_state_banned(struct ctdb_context *ctdb, void *data)
3405 {
3406         struct ctdb_ban_time *bantime = (struct ctdb_ban_time *)data;
3407         int ret;
3408
3409         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, bantime);
3410
3411         return ret == 0;
3412 }
3413
3414 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
3415 {
3416         struct ctdb_ban_time bantime;
3417
3418         if (argc < 1) {
3419                 usage();
3420         }
3421         
3422         bantime.pnn  = options.pnn;
3423         bantime.time = strtoul(argv[0], NULL, 0);
3424
3425         if (bantime.time == 0) {
3426                 DEBUG(DEBUG_ERR, ("Invalid ban time specified - must be >0\n"));
3427                 return -1;
3428         }
3429
3430         return update_flags_and_ipreallocate(ctdb, &bantime,
3431                                                   update_state_banned,
3432                                                   NODE_FLAGS_BANNED,
3433                                                   "banned",
3434                                                   true /* set_flag*/);
3435 }
3436
3437
3438 /* Unban a node */
3439 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
3440 {
3441         struct ctdb_ban_time bantime;
3442
3443         bantime.pnn  = options.pnn;
3444         bantime.time = 0;
3445
3446         return update_flags_and_ipreallocate(ctdb, &bantime,
3447                                                   update_state_banned,
3448                                                   NODE_FLAGS_BANNED,
3449                                                   "banned",
3450                                                   false /* set_flag*/);
3451 }
3452
3453 /*
3454   show ban information for a node
3455  */
3456 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
3457 {
3458         int ret;
3459         struct ctdb_node_map *nodemap=NULL;
3460         struct ctdb_ban_time *bantime;
3461
3462         /* verify the node exists */
3463         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3464         if (ret != 0) {
3465                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3466                 return ret;
3467         }
3468
3469         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
3470         if (ret != 0) {
3471                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
3472                 return -1;
3473         }       
3474
3475         if (bantime->time == 0) {
3476                 printf("Node %u is not banned\n", bantime->pnn);
3477         } else {
3478                 printf("Node %u is banned, %d seconds remaining\n",
3479                        bantime->pnn, bantime->time);
3480         }
3481
3482         return 0;
3483 }
3484
3485 /*
3486   shutdown a daemon
3487  */
3488 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
3489 {
3490         int ret;
3491
3492         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
3493         if (ret != 0) {
3494                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
3495                 return ret;
3496         }
3497
3498         return 0;
3499 }
3500
3501 /*
3502   trigger a recovery
3503  */
3504 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
3505 {
3506         int ret;
3507         uint32_t generation, next_generation;
3508
3509         /* record the current generation number */
3510         generation = get_generation(ctdb);
3511
3512         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3513         if (ret != 0) {
3514                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
3515                 return ret;
3516         }
3517
3518         /* wait until we are in a new generation */
3519         while (1) {
3520                 next_generation = get_generation(ctdb);
3521                 if (next_generation != generation) {
3522                         return 0;
3523                 }
3524                 sleep(1);
3525         }
3526
3527         return 0;
3528 }
3529
3530
3531 /*
3532   display monitoring mode of a remote node
3533  */
3534 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
3535 {
3536         uint32_t monmode;
3537         int ret;
3538
3539         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
3540         if (ret != 0) {
3541                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
3542                 return ret;
3543         }
3544         if (!options.machinereadable){
3545                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
3546         } else {
3547                 printm(":mode:\n");
3548                 printm(":%d:\n",monmode);
3549         }
3550         return 0;
3551 }
3552
3553
3554 /*
3555   display capabilities of a remote node
3556  */
3557 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
3558 {
3559         uint32_t capabilities;
3560         int ret;
3561
3562         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
3563         if (ret != 0) {
3564                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
3565                 return -1;
3566         }
3567         
3568         if (!options.machinereadable){
3569                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
3570                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
3571                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
3572                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
3573         } else {
3574                 printm(":RECMASTER:LMASTER:LVS:NATGW:\n");
3575                 printm(":%d:%d:%d:%d:\n",
3576                         !!(capabilities&CTDB_CAP_RECMASTER),
3577                         !!(capabilities&CTDB_CAP_LMASTER),
3578                         !!(capabilities&CTDB_CAP_LVS),
3579                         !!(capabilities&CTDB_CAP_NATGW));
3580         }
3581         return 0;
3582 }
3583
3584 /*
3585   display lvs configuration
3586  */
3587
3588 static uint32_t lvs_exclude_flags[] = {
3589         /* Look for a nice healthy node */
3590         NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED,
3591         /* If not found, an UNHEALTHY node will do */
3592         NODE_FLAGS_INACTIVE|NODE_FLAGS_PERMANENTLY_DISABLED,
3593         0,
3594 };
3595
3596 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
3597 {
3598         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3599         struct ctdb_node_map *orig_nodemap=NULL;
3600         struct ctdb_node_map *nodemap;
3601         int i, ret;
3602
3603         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
3604                                    tmp_ctx, &orig_nodemap);
3605         if (ret != 0) {
3606                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3607                 talloc_free(tmp_ctx);
3608                 return -1;
3609         }
3610
3611         nodemap = filter_nodemap_by_capabilities(ctdb, orig_nodemap,
3612                                                  CTDB_CAP_LVS, false);
3613         if (nodemap == NULL) {
3614                 /* No memory */
3615                 ret = -1;
3616                 goto done;
3617         }
3618
3619         ret = 0;
3620
3621         for (i = 0; lvs_exclude_flags[i] != 0; i++) {
3622                 struct ctdb_node_map *t =
3623                         filter_nodemap_by_flags(ctdb, nodemap,
3624                                                 lvs_exclude_flags[i]);
3625                 if (t == NULL) {
3626                         /* No memory */
3627                         ret = -1;
3628                         goto done;
3629                 }
3630                 if (t->num > 0) {
3631                         /* At least 1 node without excluded flags */
3632                         int j;
3633                         for (j = 0; j < t->num; j++) {
3634                                 printf("%d:%s\n", t->nodes[j].pnn, 
3635                                        ctdb_addr_to_str(&t->nodes[j].addr));
3636                         }
3637                         goto done;
3638                 }
3639                 talloc_free(t);
3640         }
3641 done:
3642         talloc_free(tmp_ctx);
3643         return ret;
3644 }
3645
3646 /*
3647   display who is the lvs master
3648  */
3649 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3650 {
3651         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3652         struct ctdb_node_map *nodemap=NULL;
3653         int i, ret;
3654
3655         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
3656                                    tmp_ctx, &nodemap);
3657         if (ret != 0) {
3658                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3659                 talloc_free(tmp_ctx);
3660                 return -1;
3661         }
3662
3663         for (i = 0; lvs_exclude_flags[i] != 0; i++) {
3664                 struct ctdb_node_map *t =
3665                         filter_nodemap_by_flags(ctdb, nodemap,
3666                                                 lvs_exclude_flags[i]);
3667                 if (t == NULL) {
3668                         /* No memory */
3669                         ret = -1;
3670                         goto done;
3671                 }
3672                 if (t->num > 0) {
3673                         struct ctdb_node_map *n;
3674                         n = filter_nodemap_by_capabilities(ctdb,
3675                                                            t,
3676                                                            CTDB_CAP_LVS,
3677                                                            true);
3678                         if (n == NULL) {
3679                                 /* No memory */
3680                                 ret = -1;
3681                                 goto done;
3682                         }
3683                         if (n->num > 0) {
3684                                 ret = 0;
3685                                 if (options.machinereadable) {
3686                                         printm("%d\n", n->nodes[0].pnn);
3687                                 } else {
3688                                         printf("Node %d is LVS master\n", n->nodes[0].pnn);
3689                                 }
3690                                 goto done;
3691                         }
3692                 }
3693                 talloc_free(t);
3694         }
3695
3696         printf("There is no LVS master\n");
3697         ret = 255;
3698 done:
3699         talloc_free(tmp_ctx);
3700         return ret;
3701 }
3702
3703 /*
3704   disable monitoring on a  node
3705  */
3706 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3707 {
3708         
3709         int ret;
3710
3711         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3712         if (ret != 0) {
3713                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3714                 return ret;
3715         }
3716         printf("Monitoring mode:%s\n","DISABLED");
3717
3718         return 0;
3719 }
3720
3721 /*
3722   enable monitoring on a  node
3723  */
3724 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3725 {
3726         
3727         int ret;
3728
3729         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3730         if (ret != 0) {
3731                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3732                 return ret;
3733         }
3734         printf("Monitoring mode:%s\n","ACTIVE");
3735
3736         return 0;
3737 }
3738
3739 /*
3740   display remote list of keys/data for a db
3741  */
3742 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3743 {
3744         const char *db_name;
3745         struct ctdb_db_context *ctdb_db;
3746         int ret;
3747         struct ctdb_dump_db_context c;
3748         uint8_t flags;
3749
3750         if (argc < 1) {
3751                 usage();
3752         }
3753
3754         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3755                 return -1;
3756         }
3757
3758         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3759         if (ctdb_db == NULL) {
3760                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3761                 return -1;
3762         }
3763
3764         if (options.printlmaster) {
3765                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3766                                           ctdb, &ctdb->vnn_map);
3767                 if (ret != 0) {
3768                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3769                                           options.pnn));
3770                         return ret;
3771                 }
3772         }
3773
3774         ZERO_STRUCT(c);
3775         c.f = stdout;
3776         c.printemptyrecords = (bool)options.printemptyrecords;
3777         c.printdatasize = (bool)options.printdatasize;
3778         c.printlmaster = (bool)options.printlmaster;
3779         c.printhash = (bool)options.printhash;
3780         c.printrecordflags = (bool)options.printrecordflags;
3781
3782         /* traverse and dump the cluster tdb */
3783         ret = ctdb_dump_db(ctdb_db, &c);
3784         if (ret == -1) {
3785                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3786                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3787                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3788                                   db_name));
3789                 return -1;
3790         }
3791         talloc_free(ctdb_db);
3792
3793         printf("Dumped %d records\n", ret);
3794         return 0;
3795 }
3796
/* State handed to cattdb_traverse() through tdb_traverse_read() */
struct cattdb_data {
	/* context passed on to ctdb_dumpdb_record() */
	struct ctdb_context *ctdb;
	/* number of records visited so far */
	uint32_t count;
};
3801
3802 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3803 {
3804         struct cattdb_data *d = private_data;
3805         struct ctdb_dump_db_context c;
3806
3807         d->count++;
3808
3809         ZERO_STRUCT(c);
3810         c.f = stdout;
3811         c.printemptyrecords = (bool)options.printemptyrecords;
3812         c.printdatasize = (bool)options.printdatasize;
3813         c.printlmaster = false;
3814         c.printhash = (bool)options.printhash;
3815         c.printrecordflags = true;
3816
3817         return ctdb_dumpdb_record(d->ctdb, key, data, &c);
3818 }
3819
3820 /*
3821   cat the local tdb database using same format as catdb
3822  */
3823 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3824 {
3825         const char *db_name;
3826         struct ctdb_db_context *ctdb_db;
3827         struct cattdb_data d;
3828         uint8_t flags;
3829
3830         if (argc < 1) {
3831                 usage();
3832         }
3833
3834         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3835                 return -1;
3836         }
3837
3838         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3839         if (ctdb_db == NULL) {
3840                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3841                 return -1;
3842         }
3843
3844         /* traverse the local tdb */
3845         d.count = 0;
3846         d.ctdb  = ctdb;
3847         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3848                 printf("Failed to cattdb data\n");
3849                 exit(10);
3850         }
3851         talloc_free(ctdb_db);
3852
3853         printf("Dumped %d records\n", d.count);
3854         return 0;
3855 }
3856
3857 /*
3858   display the content of a database key
3859  */
3860 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3861 {
3862         const char *db_name;
3863         struct ctdb_db_context *ctdb_db;
3864         struct ctdb_record_handle *h;
3865         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3866         TDB_DATA key, data;
3867         uint8_t flags;
3868
3869         if (argc < 2) {
3870                 usage();
3871         }
3872
3873         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3874                 return -1;
3875         }
3876
3877         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3878         if (ctdb_db == NULL) {
3879                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3880                 return -1;
3881         }
3882
3883         key.dptr  = discard_const(argv[1]);
3884         key.dsize = strlen((char *)key.dptr);
3885
3886         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3887         if (h == NULL) {
3888                 printf("Failed to fetch record '%s' on node %d\n", 
3889                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3890                 talloc_free(tmp_ctx);
3891                 exit(10);
3892         }
3893
3894         printf("Data: size:%d ptr:[%.*s]\n", (int)data.dsize, (int)data.dsize, data.dptr);
3895
3896         talloc_free(tmp_ctx);
3897         talloc_free(ctdb_db);
3898         return 0;
3899 }
3900
3901 /*
3902   display the content of a database key
3903  */
3904 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3905 {
3906         const char *db_name;
3907         struct ctdb_db_context *ctdb_db;
3908         struct ctdb_record_handle *h;
3909         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3910         TDB_DATA key, data;
3911         uint8_t flags;
3912
3913         if (argc < 3) {
3914                 usage();
3915         }
3916
3917         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3918                 return -1;
3919         }
3920
3921         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3922         if (ctdb_db == NULL) {
3923                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3924                 return -1;
3925         }
3926
3927         key.dptr  = discard_const(argv[1]);
3928         key.dsize = strlen((char *)key.dptr);
3929
3930         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3931         if (h == NULL) {
3932                 printf("Failed to fetch record '%s' on node %d\n", 
3933                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3934                 talloc_free(tmp_ctx);
3935                 exit(10);
3936         }
3937
3938         data.dptr  = discard_const(argv[2]);
3939         data.dsize = strlen((char *)data.dptr);
3940
3941         if (ctdb_record_store(h, data) != 0) {
3942                 printf("Failed to store record\n");
3943         }
3944
3945         talloc_free(h);
3946         talloc_free(tmp_ctx);
3947         talloc_free(ctdb_db);
3948         return 0;
3949 }
3950
3951 /*
3952   fetch a record from a persistent database
3953  */
3954 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3955 {
3956         const char *db_name;
3957         struct ctdb_db_context *ctdb_db;
3958         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3959         struct ctdb_transaction_handle *h;
3960         TDB_DATA key, data;
3961         int fd, ret;
3962         bool persistent;
3963         uint8_t flags;
3964
3965         if (argc < 2) {
3966                 talloc_free(tmp_ctx);
3967                 usage();
3968         }
3969
3970         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3971                 talloc_free(tmp_ctx);
3972                 return -1;
3973         }
3974
3975         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
3976         if (!persistent) {
3977                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3978                 talloc_free(tmp_ctx);
3979                 return -1;
3980         }
3981
3982         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3983         if (ctdb_db == NULL) {
3984                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3985                 talloc_free(tmp_ctx);
3986                 return -1;
3987         }
3988
3989         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3990         if (h == NULL) {
3991                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3992                 talloc_free(tmp_ctx);
3993                 return -1;
3994         }
3995
3996         key.dptr  = discard_const(argv[1]);
3997         key.dsize = strlen(argv[1]);
3998         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
3999         if (ret != 0) {
4000                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
4001                 talloc_free(tmp_ctx);
4002                 return -1;
4003         }
4004
4005         if (data.dsize == 0 || data.dptr == NULL) {
4006                 DEBUG(DEBUG_ERR,("Record is empty\n"));
4007                 talloc_free(tmp_ctx);
4008                 return -1;
4009         }
4010
4011         if (argc == 3) {
4012           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
4013                 if (fd == -1) {
4014                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
4015                         talloc_free(tmp_ctx);
4016                         return -1;
4017                 }
4018                 sys_write(fd, data.dptr, data.dsize);
4019                 close(fd);
4020         } else {
4021                 sys_write(1, data.dptr, data.dsize);
4022         }
4023
4024         /* abort the transaction */
4025         talloc_free(h);
4026
4027
4028         talloc_free(tmp_ctx);
4029         return 0;
4030 }
4031
4032 /*
4033   fetch a record from a tdb-file
4034  */
4035 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
4036 {
4037         const char *tdb_file;
4038         TDB_CONTEXT *tdb;
4039         TDB_DATA key, data;
4040         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4041         int fd;
4042
4043         if (argc < 2) {
4044                 usage();
4045         }
4046
4047         tdb_file = argv[0];
4048
4049         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
4050         if (tdb == NULL) {
4051                 printf("Failed to open TDB file %s\n", tdb_file);
4052                 return -1;
4053         }
4054
4055         key = strtodata(tmp_ctx, argv[1], strlen(argv[1]));
4056         if (key.dptr == NULL) {
4057                 printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4058                 return -1;
4059         }
4060
4061         data = tdb_fetch(tdb, key);
4062         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
4063                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
4064                 tdb_close(tdb);
4065                 return -1;
4066         }
4067
4068         tdb_close(tdb);
4069
4070         if (argc == 3) {
4071           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
4072                 if (fd == -1) {
4073                         printf("Failed to open output file %s\n", argv[2]);
4074                         return -1;
4075                 }
4076                 if (options.verbose){
4077                         sys_write(fd, data.dptr, data.dsize);
4078                 } else {
4079                         sys_write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4080                 }
4081                 close(fd);
4082         } else {
4083                 if (options.verbose){
4084                         sys_write(1, data.dptr, data.dsize);
4085                 } else {
4086                         sys_write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4087                 }
4088         }
4089
4090         talloc_free(tmp_ctx);
4091         return 0;
4092 }
4093
4094 /*
4095   store a record and header to a tdb-file
4096  */
4097 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
4098 {
4099         const char *tdb_file;
4100         TDB_CONTEXT *tdb;
4101         TDB_DATA key, value, data;
4102         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4103         struct ctdb_ltdb_header header;
4104
4105         if (argc < 3) {
4106                 usage();
4107         }
4108
4109         tdb_file = argv[0];
4110
4111         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
4112         if (tdb == NULL) {
4113                 printf("Failed to open TDB file %s\n", tdb_file);
4114                 return -1;
4115         }
4116
4117         key = strtodata(tmp_ctx, argv[1], strlen(argv[1]));
4118         if (key.dptr == NULL) {
4119                 printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4120                 return -1;
4121         }
4122
4123         value = strtodata(tmp_ctx, argv[2], strlen(argv[2]));
4124         if (value.dptr == NULL) {
4125                 printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
4126                 return -1;
4127         }
4128
4129         ZERO_STRUCT(header);
4130         if (argc > 3) {
4131                 header.rsn = atoll(argv[3]);
4132         }
4133         if (argc > 4) {
4134                 header.dmaster = atoi(argv[4]);
4135         }
4136         if (argc > 5) {
4137                 header.flags = atoi(argv[5]);
4138         }
4139
4140         data.dsize = sizeof(struct ctdb_ltdb_header) + value.dsize;
4141         data.dptr = talloc_size(tmp_ctx, data.dsize);
4142         if (data.dptr == NULL) {
4143                 printf("Failed to allocate header+value\n");
4144                 return -1;
4145         }
4146
4147         *(struct ctdb_ltdb_header *)data.dptr = header;
4148         memcpy(data.dptr + sizeof(struct ctdb_ltdb_header), value.dptr, value.dsize);
4149
4150         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
4151                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
4152                 tdb_close(tdb);
4153                 return -1;
4154         }
4155
4156         tdb_close(tdb);
4157
4158         talloc_free(tmp_ctx);
4159         return 0;
4160 }
4161
4162 /*
4163   write a record to a persistent database
4164  */
4165 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
4166 {
4167         const char *db_name;
4168         struct ctdb_db_context *ctdb_db;
4169         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4170         struct ctdb_transaction_handle *h;
4171         struct stat st;
4172         TDB_DATA key, data;
4173         int fd, ret;
4174
4175         if (argc < 3) {
4176                 talloc_free(tmp_ctx);
4177                 usage();
4178         }
4179
4180         fd = open(argv[2], O_RDONLY);
4181         if (fd == -1) {
4182                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
4183                 talloc_free(tmp_ctx);
4184                 return -1;
4185         }
4186         
4187         ret = fstat(fd, &st);
4188         if (ret == -1) {
4189                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
4190                 close(fd);
4191                 talloc_free(tmp_ctx);
4192                 return -1;
4193         }
4194
4195         if (!S_ISREG(st.st_mode)) {
4196                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
4197                 close(fd);
4198                 talloc_free(tmp_ctx);
4199                 return -1;
4200         }
4201
4202         data.dsize = st.st_size;
4203         if (data.dsize == 0) {
4204                 data.dptr  = NULL;
4205         } else {
4206                 data.dptr = talloc_size(tmp_ctx, data.dsize);
4207                 if (data.dptr == NULL) {
4208                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
4209                         close(fd);
4210                         talloc_free(tmp_ctx);
4211                         return -1;
4212                 }
4213                 ret = sys_read(fd, data.dptr, data.dsize);
4214                 if (ret != data.dsize) {
4215                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
4216                         close(fd);
4217                         talloc_free(tmp_ctx);
4218                         return -1;
4219                 }
4220         }
4221         close(fd);
4222
4223
4224         db_name = argv[0];
4225
4226         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4227         if (ctdb_db == NULL) {
4228                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4229                 talloc_free(tmp_ctx);
4230                 return -1;
4231         }
4232
4233         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4234         if (h == NULL) {
4235                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4236                 talloc_free(tmp_ctx);
4237                 return -1;
4238         }
4239
4240         key = strtodata(tmp_ctx, argv[1], strlen(argv[1]));
4241         if (key.dptr == NULL) {
4242                 printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4243                 return -1;
4244         }
4245
4246         ret = ctdb_transaction_store(h, key, data);
4247         if (ret != 0) {
4248                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4249                 talloc_free(tmp_ctx);
4250                 return -1;
4251         }
4252
4253         ret = ctdb_transaction_commit(h);
4254         if (ret != 0) {
4255                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4256                 talloc_free(tmp_ctx);
4257                 return -1;
4258         }
4259
4260
4261         talloc_free(tmp_ctx);
4262         return 0;
4263 }
4264
4265 /*
4266  * delete a record from a persistent database
4267  */
4268 static int control_pdelete(struct ctdb_context *ctdb, int argc, const char **argv)
4269 {
4270         const char *db_name;
4271         struct ctdb_db_context *ctdb_db;
4272         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4273         struct ctdb_transaction_handle *h;
4274         TDB_DATA key;
4275         int ret;
4276         bool persistent;
4277         uint8_t flags;
4278
4279         if (argc < 2) {
4280                 talloc_free(tmp_ctx);
4281                 usage();
4282         }
4283
4284         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
4285                 talloc_free(tmp_ctx);
4286                 return -1;
4287         }
4288
4289         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
4290         if (!persistent) {
4291                 DEBUG(DEBUG_ERR, ("Database '%s' is not persistent\n", db_name));
4292                 talloc_free(tmp_ctx);
4293                 return -1;
4294         }
4295
4296         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4297         if (ctdb_db == NULL) {
4298                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n", db_name));
4299                 talloc_free(tmp_ctx);
4300                 return -1;
4301         }
4302
4303         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4304         if (h == NULL) {
4305                 DEBUG(DEBUG_ERR, ("Failed to start transaction on database %s\n", db_name));
4306                 talloc_free(tmp_ctx);
4307                 return -1;
4308         }
4309
4310         key = strtodata(tmp_ctx, argv[1], strlen(argv[1]));
4311         if (key.dptr == NULL) {
4312                 printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4313                 return -1;
4314         }
4315
4316         ret = ctdb_transaction_store(h, key, tdb_null);
4317         if (ret != 0) {
4318                 DEBUG(DEBUG_ERR, ("Failed to delete record\n"));
4319                 talloc_free(tmp_ctx);
4320                 return -1;
4321         }
4322
4323         ret = ctdb_transaction_commit(h);
4324         if (ret != 0) {
4325                 DEBUG(DEBUG_ERR, ("Failed to commit transaction\n"));
4326                 talloc_free(tmp_ctx);
4327                 return -1;
4328         }
4329
4330         talloc_free(tmp_ctx);
4331         return 0;
4332 }
4333
4334 static const char *ptrans_parse_string(TALLOC_CTX *mem_ctx, const char *s,
4335                                        TDB_DATA *data)
4336 {
4337         const char *t;
4338         size_t n;
4339         const char *ret; /* Next byte after successfully parsed value */
4340
4341         /* Error, unless someone says otherwise */
4342         ret = NULL;
4343         /* Indicates no value to parse */
4344         *data = tdb_null;
4345
4346         /* Skip whitespace */
4347         n = strspn(s, " \t");
4348         t = s + n;
4349
4350         if (t[0] == '"') {
4351                 /* Quoted ASCII string - no wide characters! */
4352                 t++;
4353                 n = strcspn(t, "\"");
4354                 if (t[n] == '"') {
4355                         if (n > 0) {
4356                                 *data = strtodata(mem_ctx, t, n);
4357                                 CTDB_NOMEM_ABORT(data->dptr);
4358                         }
4359                         ret = t + n + 1;
4360                 } else {
4361                         DEBUG(DEBUG_WARNING,("Unmatched \" in input %s\n", s));
4362                 }
4363         } else {
4364                 DEBUG(DEBUG_WARNING,("Unsupported input format in %s\n", s));
4365         }
4366
4367         return ret;
4368 }
4369
4370 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
4371                                  TDB_DATA *key, TDB_DATA *value)
4372 {
4373         char line [1024]; /* FIXME: make this more flexible? */
4374         const char *t;
4375         char *ptr;
4376
4377         ptr = fgets(line, sizeof(line), file);
4378
4379         if (ptr == NULL) {
4380                 return false;
4381         }
4382
4383         /* Get key */
4384         t = ptrans_parse_string(mem_ctx, line, key);
4385         if (t == NULL || key->dptr == NULL) {
4386                 /* Line Ignored but not EOF */
4387                 return true;
4388         }
4389
4390         /* Get value */
4391         t = ptrans_parse_string(mem_ctx, t, value);
4392         if (t == NULL) {
4393                 /* Line Ignored but not EOF */
4394                 talloc_free(key->dptr);
4395                 *key = tdb_null;
4396                 return true;
4397         }
4398
4399         return true;
4400 }
4401
4402 /*
4403  * Update a persistent database as per file/stdin
4404  */
4405 static int control_ptrans(struct ctdb_context *ctdb,
4406                           int argc, const char **argv)
4407 {
4408         const char *db_name;
4409         struct ctdb_db_context *ctdb_db;
4410         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4411         struct ctdb_transaction_handle *h;
4412         TDB_DATA key, value;
4413         FILE *file;
4414         int ret;
4415
4416         if (argc < 1) {
4417                 talloc_free(tmp_ctx);
4418                 usage();
4419         }
4420
4421         file = stdin;
4422         if (argc == 2) {
4423                 file = fopen(argv[1], "r");
4424                 if (file == NULL) {
4425                         DEBUG(DEBUG_ERR,("Unable to open file for reading '%s'\n", argv[1]));
4426                         talloc_free(tmp_ctx);
4427                         return -1;
4428                 }
4429         }
4430
4431         db_name = argv[0];
4432
4433         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4434         if (ctdb_db == NULL) {
4435                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4436                 goto error;
4437         }
4438
4439         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4440         if (h == NULL) {
4441                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4442                 goto error;
4443         }
4444
4445         while (ptrans_get_key_value(tmp_ctx, file, &key, &value)) {
4446                 if (key.dsize != 0) {
4447                         ret = ctdb_transaction_store(h, key, value);
4448                         /* Minimise memory use */
4449                         talloc_free(key.dptr);
4450                         if (value.dptr != NULL) {
4451                                 talloc_free(value.dptr);
4452                         }
4453                         if (ret != 0) {
4454                                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4455                                 ctdb_transaction_cancel(h);
4456                                 goto error;
4457                         }
4458                 }
4459         }
4460
4461         ret = ctdb_transaction_commit(h);
4462         if (ret != 0) {
4463                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4464                 goto error;
4465         }
4466
4467         if (file != stdin) {
4468                 fclose(file);
4469         }
4470         talloc_free(tmp_ctx);
4471         return 0;
4472
4473 error:
4474         if (file != stdin) {
4475                 fclose(file);
4476         }
4477
4478         talloc_free(tmp_ctx);
4479         return -1;
4480 }
4481
/*
  check if a service is bound to a port or not

  Tries to bind a TCP socket to the given port on the wildcard address.
  Returns 0 when the bind succeeds, EINVAL when the arguments are not
  exactly one valid port number (0-65535), and otherwise the errno of
  the socket() or bind() call that failed.
 */
static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
{
	int s, ret;
	int v;
	int saved_errno;
	long port;
	char *end = NULL;
	struct sockaddr_in sin;

	if (argc != 1) {
		printf("Use: ctdb chktcpport <port>\n");
		return EINVAL;
	}

	/* Reject junk and out-of-range values instead of silently
	 * testing some other port.
	 */
	errno = 0;
	port = strtol(argv[0], &end, 10);
	if (errno != 0 || end == argv[0] || *end != '\0' ||
	    port < 0 || port > 65535) {
		printf("Invalid port \"%s\"\n", argv[0]);
		return EINVAL;
	}

	s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (s == -1) {
		/* printf() may modify errno, so capture it first */
		saved_errno = errno;
		printf("Failed to open local socket\n");
		return saved_errno;
	}

	/* Failing to go non-blocking is reported but is not fatal */
	v = fcntl(s, F_GETFL, 0);
	if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK) != 0) {
		printf("Unable to set socket non-blocking: %s\n", strerror(errno));
	}

	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port   = htons((uint16_t)port);
	ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
	/* close() and printf() may modify errno, so capture the result
	 * of bind() before calling them.
	 */
	saved_errno = errno;
	close(s);
	if (ret == -1) {
		printf("Failed to bind to local socket: %d %s\n",
		       saved_errno, strerror(saved_errno));
		return saved_errno;
	}

	return 0;
}
4522
4523
4524 /* Reload public IPs on a specified nodes */
4525 static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
4526 {
4527         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4528         uint32_t *nodes;
4529         uint32_t pnn_mode;
4530         uint32_t timeout;
4531         int ret;
4532
4533         assert_single_node_only();
4534
4535         if (argc > 1) {
4536                 usage();
4537         }
4538
4539         /* Determine the nodes where IPs need to be reloaded */
4540         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
4541                               options.pnn, true, &nodes, &pnn_mode)) {
4542                 ret = -1;
4543                 goto done;
4544         }
4545
4546 again:
4547         /* Disable takeover runs on all connected nodes.  A reply
4548          * indicating success is needed from each node so all nodes
4549          * will need to be active.  This will retry until maxruntime
4550          * is exceeded, hence no error handling.
4551          * 
4552          * A check could be added to not allow reloading of IPs when
4553          * there are disconnected nodes.  However, this should
4554          * probably be left up to the administrator.
4555          */
4556         timeout = LONGTIMEOUT;
4557         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4558                         "Disable takeover runs", true);
4559
4560         /* Now tell all the desired nodes to reload their public IPs.
4561          * Keep trying this until it succeeds.  This assumes all
4562          * failures are transient, which might not be true...
4563          */
4564         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
4565                                       nodes, 0, LONGTIMELIMIT(),
4566                                       false, tdb_null,
4567                                       NULL, NULL, NULL) != 0) {
4568                 DEBUG(DEBUG_ERR,
4569                       ("Unable to reload IPs on some nodes, try again.\n"));
4570                 goto again;
4571         }
4572
4573         /* It isn't strictly necessary to wait until takeover runs are
4574          * re-enabled but doing so can't hurt.
4575          */
4576         timeout = 0;
4577         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4578                         "Enable takeover runs", true);
4579
4580         ipreallocate(ctdb);
4581
4582         ret = 0;
4583 done:
4584         talloc_free(tmp_ctx);
4585         return ret;
4586 }
4587
4588 /*
4589   display a list of the databases on a remote ctdb
4590  */
4591 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
4592 {
4593         int i, ret;
4594         struct ctdb_dbid_map *dbmap=NULL;
4595
4596         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4597         if (ret != 0) {
4598                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4599                 return ret;
4600         }
4601
4602         if(options.machinereadable){
4603                 printm(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
4604                 for(i=0;i<dbmap->num;i++){
4605                         const char *path = NULL;
4606                         const char *name = NULL;
4607                         const char *health = NULL;
4608                         bool persistent;
4609                         bool readonly;
4610                         bool sticky;
4611
4612                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
4613                                             dbmap->dbs[i].dbid, ctdb, &path);
4614                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4615                                             dbmap->dbs[i].dbid, ctdb, &name);
4616                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4617                                               dbmap->dbs[i].dbid, ctdb, &health);
4618                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4619                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4620                         sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4621                         printm(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
4622                                dbmap->dbs[i].dbid, name, path,
4623                                !!(persistent), !!(sticky),
4624                                !!(health), !!(readonly));
4625                 }
4626                 return 0;
4627         }
4628
4629         printf("Number of databases:%d\n", dbmap->num);
4630         for(i=0;i<dbmap->num;i++){
4631                 const char *path = NULL;
4632                 const char *name = NULL;
4633                 const char *health = NULL;
4634                 bool persistent;
4635                 bool readonly;
4636                 bool sticky;
4637
4638                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4639                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4640                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4641                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4642                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4643                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4644                 printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
4645                        dbmap->dbs[i].dbid, name, path,
4646                        persistent?" PERSISTENT":"",
4647                        sticky?" STICKY":"",
4648                        readonly?" READONLY":"",
4649                        health?" UNHEALTHY":"");
4650         }
4651
4652         return 0;
4653 }
4654
4655 /*
4656   display the status of a database on a remote ctdb
4657  */
4658 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
4659 {
4660         const char *db_name;
4661         uint32_t db_id;
4662         uint8_t flags;
4663         const char *path = NULL;
4664         const char *health = NULL;
4665
4666         if (argc < 1) {
4667                 usage();
4668         }
4669
4670         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
4671                 return -1;
4672         }
4673
4674         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &path);
4675         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &health);
4676         printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
4677                db_id, db_name, path,
4678                (flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
4679                (flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
4680                (flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
4681                (health ? health : "OK"));
4682
4683         return 0;
4684 }
4685
4686 /*
4687   check if the local node is recmaster or not
4688   it will return 1 if this node is the recmaster and 0 if it is not
4689   or if the local ctdb daemon could not be contacted
4690  */
4691 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
4692 {
4693         uint32_t mypnn, recmaster;
4694         int ret;
4695
4696         assert_single_node_only();
4697
4698         mypnn = getpnn(ctdb);
4699
4700         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
4701         if (ret != 0) {
4702                 printf("Failed to get the recmaster\n");
4703                 return 1;
4704         }
4705
4706         if (recmaster != mypnn) {
4707                 printf("this node is not the recmaster\n");
4708                 return 1;
4709         }
4710
4711         printf("this node is the recmaster\n");
4712         return 0;
4713 }
4714
4715 /*
4716   ping a node
4717  */
4718 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
4719 {
4720         int ret;
4721         struct timeval tv = timeval_current();
4722         ret = ctdb_ctrl_ping(ctdb, options.pnn);
4723         if (ret == -1) {
4724                 printf("Unable to get ping response from node %u\n", options.pnn);
4725                 return -1;
4726         } else {
4727                 printf("response from %u time=%.6f sec  (%d clients)\n", 
4728                        options.pnn, timeval_elapsed(&tv), ret);
4729         }
4730         return 0;
4731 }
4732
4733
4734 /*
4735   get a node's runstate
4736  */
4737 static int control_runstate(struct ctdb_context *ctdb, int argc, const char **argv)
4738 {
4739         int ret;
4740         enum ctdb_runstate runstate;
4741
4742         ret = ctdb_ctrl_get_runstate(ctdb, TIMELIMIT(), options.pnn, &runstate);
4743         if (ret == -1) {
4744                 printf("Unable to get runstate response from node %u\n",
4745                        options.pnn);
4746                 return -1;
4747         } else {
4748                 bool found = true;
4749                 enum ctdb_runstate t;
4750                 int i;
4751                 for (i=0; i<argc; i++) {
4752                         found = false;
4753                         t = runstate_from_string(argv[i]);
4754                         if (t == CTDB_RUNSTATE_UNKNOWN) {
4755                                 printf("Invalid run state (%s)\n", argv[i]);
4756                                 return -1;
4757                         }
4758
4759                         if (t == runstate) {
4760                                 found = true;
4761                                 break;
4762                         }
4763                 }
4764
4765                 if (!found) {
4766                         printf("CTDB not in required run state (got %s)\n", 
4767                                runstate_to_string((enum ctdb_runstate)runstate));
4768                         return -1;
4769                 }
4770         }
4771
4772         printf("%s\n", runstate_to_string(runstate));
4773         return 0;
4774 }
4775
4776
4777 /*
4778   get a tunable
4779  */
4780 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
4781 {
4782         const char *name;
4783         uint32_t value;
4784         int ret;
4785
4786         if (argc < 1) {
4787                 usage();
4788         }
4789
4790         name = argv[0];
4791         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4792         if (ret != 0) {
4793                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4794                 return -1;
4795         }
4796
4797         printf("%-23s = %u\n", name, value);
4798         return 0;
4799 }
4800
4801 /*
4802   set a tunable
4803  */
4804 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4805 {
4806         const char *name;
4807         uint32_t value;
4808         int ret;
4809
4810         if (argc < 2) {
4811                 usage();
4812         }
4813
4814         name = argv[0];
4815         value = strtoul(argv[1], NULL, 0);
4816
4817         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4818         if (ret == -1) {
4819                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4820                 return -1;
4821         }
4822         if (ret == 1) {
4823                 DEBUG(DEBUG_WARNING,
4824                       ("Setting obsolete tunable variable '%s'\n",
4825                        name));
4826         }
4827         return 0;
4828 }
4829
4830 /*
4831   list all tunables
4832  */
4833 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4834 {
4835         uint32_t count;
4836         const char **list;
4837         int ret, i;
4838
4839         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4840         if (ret == -1) {
4841                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4842                 return -1;
4843         }
4844
4845         for (i=0;i<count;i++) {
4846                 control_getvar(ctdb, 1, &list[i]);
4847         }
4848
4849         talloc_free(list);
4850         
4851         return 0;
4852 }
4853
4854 /*
4855   display debug level on a node
4856  */
4857 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4858 {
4859         int ret;
4860         int32_t level;
4861
4862         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4863         if (ret != 0) {
4864                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4865                 return ret;
4866         } else {
4867                 const char *desc = get_debug_by_level(level);
4868                 if (desc == NULL) {
4869                         /* This should never happen */
4870                         desc = "Unknown";
4871                 }
4872                 if (options.machinereadable){
4873                         printm(":Name:Level:\n");
4874                         printm(":%s:%d:\n", desc, level);
4875                 } else {
4876                         printf("Node %u is at debug level %s (%d)\n",
4877                                options.pnn, desc, level);
4878                 }
4879         }
4880         return 0;
4881 }
4882
4883 /*
4884   display reclock file of a node
4885  */
4886 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4887 {
4888         int ret;
4889         const char *reclock;
4890
4891         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4892         if (ret != 0) {
4893                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4894                 return ret;
4895         } else {
4896                 if (options.machinereadable){
4897                         if (reclock != NULL) {
4898                                 printm("%s", reclock);
4899                         }
4900                 } else {
4901                         if (reclock == NULL) {
4902                                 printf("No reclock file used.\n");
4903                         } else {
4904                                 printf("Reclock file:%s\n", reclock);
4905                         }
4906                 }
4907         }
4908         return 0;
4909 }
4910
4911 /*
4912   set the reclock file of a node
4913  */
4914 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4915 {
4916         int ret;
4917         const char *reclock;
4918
4919         if (argc == 0) {
4920                 reclock = NULL;
4921         } else if (argc == 1) {
4922                 reclock = argv[0];
4923         } else {
4924                 usage();
4925         }
4926
4927         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
4928         if (ret != 0) {
4929                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4930                 return ret;
4931         }
4932         return 0;
4933 }
4934
4935 /*
4936   set the natgw state on/off
4937  */
4938 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
4939 {
4940         int ret;
4941         uint32_t natgwstate;
4942
4943         if (argc == 0) {
4944                 usage();
4945         }
4946
4947         if (!strcmp(argv[0], "on")) {
4948                 natgwstate = 1;
4949         } else if (!strcmp(argv[0], "off")) {
4950                 natgwstate = 0;
4951         } else {
4952                 usage();
4953         }
4954
4955         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
4956         if (ret != 0) {
4957                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
4958                 return ret;
4959         }
4960
4961         return 0;
4962 }
4963
4964 /*
4965   set the lmaster role on/off
4966  */
4967 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4968 {
4969         int ret;
4970         uint32_t lmasterrole;
4971
4972         if (argc == 0) {
4973                 usage();
4974         }
4975
4976         if (!strcmp(argv[0], "on")) {
4977                 lmasterrole = 1;
4978         } else if (!strcmp(argv[0], "off")) {
4979                 lmasterrole = 0;
4980         } else {
4981                 usage();
4982         }
4983
4984         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
4985         if (ret != 0) {
4986                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
4987                 return ret;
4988         }
4989
4990         return 0;
4991 }
4992
4993 /*
4994   set the recmaster role on/off
4995  */
4996 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4997 {
4998         int ret;
4999         uint32_t recmasterrole;
5000
5001         if (argc == 0) {
5002                 usage();
5003         }
5004
5005         if (!strcmp(argv[0], "on")) {
5006                 recmasterrole = 1;
5007         } else if (!strcmp(argv[0], "off")) {
5008                 recmasterrole = 0;
5009         } else {
5010                 usage();
5011         }
5012
5013         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
5014         if (ret != 0) {
5015                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
5016                 return ret;
5017         }
5018
5019         return 0;
5020 }
5021
5022 /*
5023   set debug level on a node or all nodes
5024  */
5025 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
5026 {
5027         int ret;
5028         int32_t level;
5029
5030         if (argc == 0) {
5031                 printf("You must specify the debug level. Valid levels are:\n");
5032                 print_debug_levels(stdout);
5033                 return 0;
5034         }
5035
5036         if (!parse_debug(argv[0], &level)) {
5037                 printf("Invalid debug level, must be one of\n");
5038                 print_debug_levels(stdout);
5039                 return -1;
5040         }
5041
5042         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
5043         if (ret != 0) {
5044                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
5045         }
5046         return 0;
5047 }
5048
5049
5050 /*
5051   thaw a node
5052  */
5053 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
5054 {
5055         int ret;
5056         uint32_t priority;
5057         
5058         if (argc == 1) {
5059                 priority = strtol(argv[0], NULL, 0);
5060         } else {
5061                 priority = 0;
5062         }
5063         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
5064
5065         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
5066         if (ret != 0) {
5067                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
5068         }               
5069         return 0;
5070 }
5071
5072
5073 /*
5074   attach to a database
5075  */
5076 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
5077 {
5078         const char *db_name;
5079         struct ctdb_db_context *ctdb_db;
5080         bool persistent = false;
5081
5082         if (argc < 1) {
5083                 usage();
5084         }
5085         db_name = argv[0];
5086         if (argc > 2) {
5087                 usage();
5088         }
5089         if (argc == 2) {
5090                 if (strcmp(argv[1], "persistent") != 0) {
5091                         usage();
5092                 }
5093                 persistent = true;
5094         }
5095
5096         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
5097         if (ctdb_db == NULL) {
5098                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
5099                 return -1;
5100         }
5101
5102         return 0;
5103 }
5104
5105 /*
5106  * detach from a database
5107  */
5108 static int control_detach(struct ctdb_context *ctdb, int argc,
5109                           const char **argv)
5110 {
5111         uint32_t db_id;
5112         uint8_t flags;
5113         int ret, i, status = 0;
5114         struct ctdb_node_map *nodemap = NULL;
5115         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5116         uint32_t recmode;
5117
5118         if (argc < 1) {
5119                 usage();
5120         }
5121
5122         assert_single_node_only();
5123
5124         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn,
5125                                     &recmode);
5126         if (ret != 0) {
5127                 DEBUG(DEBUG_ERR, ("Database cannot be detached "
5128                                   "when recovery is active\n"));
5129                 talloc_free(tmp_ctx);
5130                 return -1;
5131         }
5132
5133         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5134                                    &nodemap);
5135         if (ret != 0) {
5136                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5137                                   options.pnn));
5138                 talloc_free(tmp_ctx);
5139                 return -1;
5140         }
5141
5142         for (i=0; i<nodemap->num; i++) {
5143                 uint32_t value;
5144
5145                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
5146                         continue;
5147                 }
5148
5149                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
5150                         continue;
5151                 }
5152
5153                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
5154                         DEBUG(DEBUG_ERR, ("Database cannot be detached on "
5155                                           "inactive (stopped or banned) node "
5156                                           "%u\n", nodemap->nodes[i].pnn));
5157                         talloc_free(tmp_ctx);
5158                         return -1;
5159                 }
5160
5161                 ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(),
5162                                             nodemap->nodes[i].pnn,
5163                                             "AllowClientDBAttach",
5164                                             &value);
5165                 if (ret != 0) {
5166                         DEBUG(DEBUG_ERR, ("Unable to get tunable "
5167                                           "AllowClientDBAttach from node %u\n",
5168                                            nodemap->nodes[i].pnn));
5169                         talloc_free(tmp_ctx);
5170                         return -1;
5171                 }
5172
5173                 if (value == 1) {
5174                         DEBUG(DEBUG_ERR, ("Database access is still active on "
5175                                           "node %u. Set AllowClientDBAttach=0 "
5176                                           "on all nodes.\n",
5177                                           nodemap->nodes[i].pnn));
5178                         talloc_free(tmp_ctx);
5179                         return -1;
5180                 }
5181         }
5182
5183         talloc_free(tmp_ctx);
5184
5185         for (i=0; i<argc; i++) {
5186                 if (!db_exists(ctdb, argv[i], &db_id, NULL, &flags)) {
5187                         continue;
5188                 }
5189
5190                 if (flags & CTDB_DB_FLAGS_PERSISTENT) {
5191                         DEBUG(DEBUG_ERR, ("Persistent database '%s' "
5192                                           "cannot be detached\n", argv[i]));
5193                         status = -1;
5194                         continue;
5195                 }
5196
5197                 ret = ctdb_detach(ctdb, db_id);
5198                 if (ret != 0) {
5199                         DEBUG(DEBUG_ERR, ("Database '%s' detach failed\n",
5200                                           argv[i]));
5201                         status = ret;
5202                 }
5203         }
5204
5205         return status;
5206 }
5207
5208 /*
5209   set db priority
5210  */
5211 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5212 {
5213         struct ctdb_db_priority db_prio;
5214         int ret;
5215
5216         if (argc < 2) {
5217                 usage();
5218         }
5219
5220         db_prio.db_id    = strtoul(argv[0], NULL, 0);
5221         db_prio.priority = strtoul(argv[1], NULL, 0);
5222
5223         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
5224         if (ret != 0) {
5225                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
5226                 return -1;
5227         }
5228
5229         return 0;
5230 }
5231
5232 /*
5233   get db priority
5234  */
5235 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5236 {
5237         uint32_t db_id, priority;
5238         int ret;
5239
5240         if (argc < 1) {
5241                 usage();
5242         }
5243
5244         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5245                 return -1;
5246         }
5247
5248         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
5249         if (ret != 0) {
5250                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
5251                 return -1;
5252         }
5253
5254         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
5255
5256         return 0;
5257 }
5258
5259 /*
5260   set the sticky records capability for a database
5261  */
5262 static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
5263 {
5264         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5265         uint32_t db_id;
5266         int ret;
5267
5268         if (argc < 1) {
5269                 usage();
5270         }
5271
5272         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5273                 return -1;
5274         }
5275
5276         ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
5277         if (ret != 0) {
5278                 DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
5279                 talloc_free(tmp_ctx);
5280                 return -1;
5281         }
5282
5283         talloc_free(tmp_ctx);
5284         return 0;
5285 }
5286
5287 /*
5288   set the readonly capability for a database
5289  */
5290 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
5291 {
5292         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5293         uint32_t db_id;
5294         int ret;
5295
5296         if (argc < 1) {
5297                 usage();
5298         }
5299
5300         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5301                 return -1;
5302         }
5303
5304         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
5305         if (ret != 0) {
5306                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
5307                 talloc_free(tmp_ctx);
5308                 return -1;
5309         }
5310
5311         talloc_free(tmp_ctx);
5312         return 0;
5313 }
5314
5315 /*
5316   get db seqnum
5317  */
5318 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
5319 {
5320         uint32_t db_id;
5321         uint64_t seqnum;
5322         int ret;
5323
5324         if (argc < 1) {
5325                 usage();
5326         }
5327
5328         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5329                 return -1;
5330         }
5331
5332         ret = ctdb_ctrl_getdbseqnum(ctdb, TIMELIMIT(), options.pnn, db_id, &seqnum);
5333         if (ret != 0) {
5334                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
5335                 return -1;
5336         }
5337
5338         printf("Sequence number:%lld\n", (long long)seqnum);
5339
5340         return 0;
5341 }
5342
5343 /*
5344   run an eventscript on a node
5345  */
5346 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
5347 {
5348         TDB_DATA data;
5349         int ret;
5350         int32_t res;
5351         char *errmsg;
5352         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5353
5354         if (argc != 1) {
5355                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5356                 return -1;
5357         }
5358
5359         data.dptr = (unsigned char *)discard_const(argv[0]);
5360         data.dsize = strlen((char *)data.dptr) + 1;
5361
5362         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
5363
5364         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
5365                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
5366         if (ret != 0 || res != 0) {
5367                 if (errmsg != NULL) {
5368                         DEBUG(DEBUG_ERR,
5369                               ("Failed to run eventscripts - %s\n", errmsg));
5370                 } else {
5371                         DEBUG(DEBUG_ERR, ("Failed to run eventscripts\n"));
5372                 }
5373                 talloc_free(tmp_ctx);
5374                 return -1;
5375         }
5376         talloc_free(tmp_ctx);
5377         return 0;
5378 }
5379
/* version stamped into every backup header; readers reject any other value */
#define DB_VERSION 1
/* size in bytes of the name field in the backup header, terminator included */
#define MAX_DB_NAME 64
/*
 * Header placed at the start of a database backup file.
 *
 * control_backupdb() writes this struct to the file as raw memory and
 * the restore/dump commands read it back the same way, so the file
 * layout follows the in-memory layout of the writing build.
 */
struct db_file_header {
        /* format version, set to DB_VERSION by the writer */
        unsigned long version;
        /* time(NULL) at the moment the backup was written */
        time_t timestamp;
        /* flags & CTDB_DB_FLAGS_PERSISTENT of the source database;
           passed as the "persistent" argument of ctdb_attach() on restore */
        unsigned long persistent;
        /* number of bytes of marshalled record data following the header */
        unsigned long size;
        /* database name as given on the backup command line; the writer
           zeroes the struct and copies at most MAX_DB_NAME-1 bytes */
        const char name[MAX_DB_NAME];
};
5389
/*
 * State shared between control_backupdb() and its traverse callback
 * backup_traverse().
 */
struct backup_data {
        /* marshall buffer that grows as records are appended */
        struct ctdb_marshall_buffer *records;
        /* number of bytes of 'records' in use, buffer header included */
        uint32_t len;
        /* number of records appended so far */
        uint32_t total;
        /* set by the callback when marshalling or growing the buffer fails */
        bool traverse_error;
};
5396
5397 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
5398 {
5399         struct backup_data *bd = talloc_get_type(private, struct backup_data);
5400         struct ctdb_rec_data *rec;
5401
5402         /* add the record */
5403         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
5404         if (rec == NULL) {
5405                 bd->traverse_error = true;
5406                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
5407                 return -1;
5408         }
5409         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
5410         if (bd->records == NULL) {
5411                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
5412                 bd->traverse_error = true;
5413                 return -1;
5414         }
5415         bd->records->count++;
5416         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
5417         bd->len += rec->length;
5418         talloc_free(rec);
5419
5420         bd->total++;
5421         return 0;
5422 }
5423
5424 /*
5425  * backup a database to a file 
5426  */
5427 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
5428 {
5429         const char *db_name;
5430         int ret;
5431         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5432         struct db_file_header dbhdr;
5433         struct ctdb_db_context *ctdb_db;
5434         struct backup_data *bd;
5435         int fh = -1;
5436         int status = -1;
5437         const char *reason = NULL;
5438         uint32_t db_id;
5439         uint8_t flags;
5440
5441         assert_single_node_only();
5442
5443         if (argc != 2) {
5444                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5445                 return -1;
5446         }
5447
5448         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
5449                 return -1;
5450         }
5451
5452         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
5453                                     db_id, tmp_ctx, &reason);
5454         if (ret != 0) {
5455                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
5456                                  argv[0]));
5457                 talloc_free(tmp_ctx);
5458                 return -1;
5459         }
5460         if (reason) {
5461                 uint32_t allow_unhealthy = 0;
5462
5463                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
5464                                       "AllowUnhealthyDBRead",
5465                                       &allow_unhealthy);
5466
5467                 if (allow_unhealthy != 1) {
5468                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
5469                                          argv[0], reason));
5470
5471                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
5472                                          allow_unhealthy));
5473                         talloc_free(tmp_ctx);
5474                         return -1;
5475                 }
5476
5477                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
5478                                      argv[0], argv[0]));
5479                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
5480                                      "tunnable AllowUnhealthyDBRead = %u\n",
5481                                      allow_unhealthy));
5482         }
5483
5484         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5485         if (ctdb_db == NULL) {
5486                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
5487                 talloc_free(tmp_ctx);
5488                 return -1;
5489         }
5490
5491
5492         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
5493         if (ret == -1) {
5494                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
5495                 talloc_free(tmp_ctx);
5496                 return -1;
5497         }
5498
5499
5500         bd = talloc_zero(tmp_ctx, struct backup_data);
5501         if (bd == NULL) {
5502                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
5503                 talloc_free(tmp_ctx);
5504                 return -1;
5505         }
5506
5507         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
5508         if (bd->records == NULL) {
5509                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
5510                 talloc_free(tmp_ctx);
5511                 return -1;
5512         }
5513
5514         bd->len = offsetof(struct ctdb_marshall_buffer, data);
5515         bd->records->db_id = ctdb_db->db_id;
5516         /* traverse the database collecting all records */
5517         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
5518             bd->traverse_error) {
5519                 DEBUG(DEBUG_ERR,("Traverse error\n"));
5520                 talloc_free(tmp_ctx);
5521                 return -1;              
5522         }
5523
5524         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
5525
5526
5527         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
5528         if (fh == -1) {
5529                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
5530                 talloc_free(tmp_ctx);
5531                 return -1;
5532         }
5533
5534         ZERO_STRUCT(dbhdr);
5535         dbhdr.version = DB_VERSION;
5536         dbhdr.timestamp = time(NULL);
5537         dbhdr.persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
5538         dbhdr.size = bd->len;
5539         if (strlen(argv[0]) >= MAX_DB_NAME) {
5540                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
5541                 goto done;
5542         }
5543         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME-1);
5544         ret = sys_write(fh, &dbhdr, sizeof(dbhdr));
5545         if (ret == -1) {
5546                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5547                 goto done;
5548         }
5549         ret = sys_write(fh, bd->records, bd->len);
5550         if (ret == -1) {
5551                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5552                 goto done;
5553         }
5554
5555         status = 0;
5556 done:
5557         if (fh != -1) {
5558                 ret = close(fh);
5559                 if (ret == -1) {
5560                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
5561                 }
5562         }
5563
5564         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
5565
5566         talloc_free(tmp_ctx);
5567         return status;
5568 }
5569
5570 /*
5571  * restore a database from a file 
5572  */
5573 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
5574 {
5575         int ret;
5576         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5577         TDB_DATA outdata;
5578         TDB_DATA data;
5579         struct db_file_header dbhdr;
5580         struct ctdb_db_context *ctdb_db;
5581         struct ctdb_node_map *nodemap=NULL;
5582         struct ctdb_vnn_map *vnnmap=NULL;
5583         int i, fh;
5584         struct ctdb_control_wipe_database w;
5585         uint32_t *nodes;
5586         uint32_t generation;
5587         struct tm *tm;
5588         char tbuf[100];
5589         char *dbname;
5590
5591         assert_single_node_only();
5592
5593         if (argc < 1 || argc > 2) {
5594                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5595                 return -1;
5596         }
5597
5598         fh = open(argv[0], O_RDONLY);
5599         if (fh == -1) {
5600                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5601                 talloc_free(tmp_ctx);
5602                 return -1;
5603         }
5604
5605         sys_read(fh, &dbhdr, sizeof(dbhdr));
5606         if (dbhdr.version != DB_VERSION) {
5607                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5608                 close(fh);
5609                 talloc_free(tmp_ctx);
5610                 return -1;
5611         }
5612
5613         dbname = discard_const(dbhdr.name);
5614         if (argc == 2) {
5615                 dbname = discard_const(argv[1]);
5616         }
5617
5618         outdata.dsize = dbhdr.size;
5619         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5620         if (outdata.dptr == NULL) {
5621                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5622                 close(fh);
5623                 talloc_free(tmp_ctx);
5624                 return -1;
5625         }               
5626         sys_read(fh, outdata.dptr, outdata.dsize);
5627         close(fh);
5628
5629         tm = localtime(&dbhdr.timestamp);
5630         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5631         printf("Restoring database '%s' from backup @ %s\n",
5632                 dbname, tbuf);
5633
5634
5635         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
5636         if (ctdb_db == NULL) {
5637                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
5638                 talloc_free(tmp_ctx);
5639                 return -1;
5640         }
5641
5642         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
5643         if (ret != 0) {
5644                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
5645                 talloc_free(tmp_ctx);
5646                 return ret;
5647         }
5648
5649
5650         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
5651         if (ret != 0) {
5652                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
5653                 talloc_free(tmp_ctx);
5654                 return ret;
5655         }
5656
5657         /* freeze all nodes */
5658         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5659         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5660                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5661                                         nodes, i,
5662                                         TIMELIMIT(),
5663                                         false, tdb_null,
5664                                         NULL, NULL,
5665                                         NULL) != 0) {
5666                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5667                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5668                         talloc_free(tmp_ctx);
5669                         return -1;
5670                 }
5671         }
5672
5673         generation = vnnmap->generation;
5674         data.dptr = (void *)&generation;
5675         data.dsize = sizeof(generation);
5676
5677         /* start a cluster wide transaction */
5678         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5679         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5680                                         nodes, 0,
5681                                         TIMELIMIT(), false, data,
5682                                         NULL, NULL,
5683                                         NULL) != 0) {
5684                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
5685                 return -1;
5686         }
5687
5688
5689         w.db_id = ctdb_db->db_id;
5690         w.transaction_id = generation;
5691
5692         data.dptr = (void *)&w;
5693         data.dsize = sizeof(w);
5694
5695         /* wipe all the remote databases. */
5696         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5697         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5698                                         nodes, 0,
5699                                         TIMELIMIT(), false, data,
5700                                         NULL, NULL,
5701                                         NULL) != 0) {
5702                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5703                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5704                 talloc_free(tmp_ctx);
5705                 return -1;
5706         }
5707         
5708         /* push the database */
5709         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5710         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
5711                                         nodes, 0,
5712                                         TIMELIMIT(), false, outdata,
5713                                         NULL, NULL,
5714                                         NULL) != 0) {
5715                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
5716                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5717                 talloc_free(tmp_ctx);
5718                 return -1;
5719         }
5720
5721         data.dptr = (void *)&ctdb_db->db_id;
5722         data.dsize = sizeof(ctdb_db->db_id);
5723
5724         /* mark the database as healthy */
5725         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5726         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5727                                         nodes, 0,
5728                                         TIMELIMIT(), false, data,
5729                                         NULL, NULL,
5730                                         NULL) != 0) {
5731                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5732                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5733                 talloc_free(tmp_ctx);
5734                 return -1;
5735         }
5736
5737         data.dptr = (void *)&generation;
5738         data.dsize = sizeof(generation);
5739
5740         /* commit all the changes */
5741         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5742                                         nodes, 0,
5743                                         TIMELIMIT(), false, data,
5744                                         NULL, NULL,
5745                                         NULL) != 0) {
5746                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5747                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5748                 talloc_free(tmp_ctx);
5749                 return -1;
5750         }
5751
5752
5753         /* thaw all nodes */
5754         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5755         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5756                                         nodes, 0,
5757                                         TIMELIMIT(),
5758                                         false, tdb_null,
5759                                         NULL, NULL,
5760                                         NULL) != 0) {
5761                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5762                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5763                 talloc_free(tmp_ctx);
5764                 return -1;
5765         }
5766
5767
5768         talloc_free(tmp_ctx);
5769         return 0;
5770 }
5771
5772 /*
5773  * dump a database backup from a file
5774  */
5775 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
5776 {
5777         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5778         TDB_DATA outdata;
5779         struct db_file_header dbhdr;
5780         int i, fh;
5781         struct tm *tm;
5782         char tbuf[100];
5783         struct ctdb_rec_data *rec = NULL;
5784         struct ctdb_marshall_buffer *m;
5785         struct ctdb_dump_db_context c;
5786
5787         assert_single_node_only();
5788
5789         if (argc != 1) {
5790                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5791                 return -1;
5792         }
5793
5794         fh = open(argv[0], O_RDONLY);
5795         if (fh == -1) {
5796                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5797                 talloc_free(tmp_ctx);
5798                 return -1;
5799         }
5800
5801         sys_read(fh, &dbhdr, sizeof(dbhdr));
5802         if (dbhdr.version != DB_VERSION) {
5803                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5804                 close(fh);
5805                 talloc_free(tmp_ctx);
5806                 return -1;
5807         }
5808
5809         outdata.dsize = dbhdr.size;
5810         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5811         if (outdata.dptr == NULL) {
5812                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5813                 close(fh);
5814                 talloc_free(tmp_ctx);
5815                 return -1;
5816         }
5817         sys_read(fh, outdata.dptr, outdata.dsize);
5818         close(fh);
5819         m = (struct ctdb_marshall_buffer *)outdata.dptr;
5820
5821         tm = localtime(&dbhdr.timestamp);
5822         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5823         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
5824                 dbhdr.name, m->db_id, tbuf);
5825
5826         ZERO_STRUCT(c);
5827         c.f = stdout;
5828         c.printemptyrecords = (bool)options.printemptyrecords;
5829         c.printdatasize = (bool)options.printdatasize;
5830         c.printlmaster = false;
5831         c.printhash = (bool)options.printhash;
5832         c.printrecordflags = (bool)options.printrecordflags;
5833
5834         for (i=0; i < m->count; i++) {
5835                 uint32_t reqid = 0;
5836                 TDB_DATA key, data;
5837
5838                 /* we do not want the header splitted, so we pass NULL*/
5839                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
5840                                               NULL, &key, &data);
5841
5842                 ctdb_dumpdb_record(ctdb, key, data, &c);
5843         }
5844
5845         printf("Dumped %d records\n", i);
5846         talloc_free(tmp_ctx);
5847         return 0;
5848 }
5849
/*
 * wipe a database
 */
5853 static int control_wipedb(struct ctdb_context *ctdb, int argc,
5854                           const char **argv)
5855 {
5856         const char *db_name;
5857         int ret;
5858         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5859         TDB_DATA data;
5860         struct ctdb_db_context *ctdb_db;
5861         struct ctdb_node_map *nodemap = NULL;
5862         struct ctdb_vnn_map *vnnmap = NULL;
5863         int i;
5864         struct ctdb_control_wipe_database w;
5865         uint32_t *nodes;
5866         uint32_t generation;
5867         uint8_t flags;
5868
5869         assert_single_node_only();
5870
5871         if (argc != 1) {
5872                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5873                 return -1;
5874         }
5875
5876         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
5877                 return -1;
5878         }
5879
5880         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5881         if (ctdb_db == NULL) {
5882                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
5883                                   argv[0]));
5884                 talloc_free(tmp_ctx);
5885                 return -1;
5886         }
5887
5888         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
5889                                    &nodemap);
5890         if (ret != 0) {
5891                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5892                                   options.pnn));
5893                 talloc_free(tmp_ctx);
5894                 return ret;
5895         }
5896
5897         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5898                                   &vnnmap);
5899         if (ret != 0) {
5900                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5901                                   options.pnn));
5902                 talloc_free(tmp_ctx);
5903                 return ret;
5904         }
5905
5906         /* freeze all nodes */
5907         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5908         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5909                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5910                                                 nodes, i,
5911                                                 TIMELIMIT(),
5912                                                 false, tdb_null,
5913                                                 NULL, NULL,
5914                                                 NULL);
5915                 if (ret != 0) {
5916                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5917                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5918                                              CTDB_RECOVERY_ACTIVE);
5919                         talloc_free(tmp_ctx);
5920                         return -1;
5921                 }
5922         }
5923
5924         generation = vnnmap->generation;
5925         data.dptr = (void *)&generation;
5926         data.dsize = sizeof(generation);
5927
5928         /* start a cluster wide transaction */
5929         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5930         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5931                                         nodes, 0,
5932                                         TIMELIMIT(), false, data,
5933                                         NULL, NULL,
5934                                         NULL);
5935         if (ret!= 0) {
5936                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5937                                   "transactions.\n"));
5938                 return -1;
5939         }
5940
5941         w.db_id = ctdb_db->db_id;
5942         w.transaction_id = generation;
5943
5944         data.dptr = (void *)&w;
5945         data.dsize = sizeof(w);
5946
5947         /* wipe all the remote databases. */
5948         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5949         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5950                                         nodes, 0,
5951                                         TIMELIMIT(), false, data,
5952                                         NULL, NULL,
5953                                         NULL) != 0) {
5954                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5955                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5956                 talloc_free(tmp_ctx);
5957                 return -1;
5958         }
5959
5960         data.dptr = (void *)&ctdb_db->db_id;
5961         data.dsize = sizeof(ctdb_db->db_id);
5962
5963         /* mark the database as healthy */
5964         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5965         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5966                                         nodes, 0,
5967                                         TIMELIMIT(), false, data,
5968                                         NULL, NULL,
5969                                         NULL) != 0) {
5970                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5971                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5972                 talloc_free(tmp_ctx);
5973                 return -1;
5974         }
5975
5976         data.dptr = (void *)&generation;
5977         data.dsize = sizeof(generation);
5978
5979         /* commit all the changes */
5980         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5981                                         nodes, 0,
5982                                         TIMELIMIT(), false, data,
5983                                         NULL, NULL,
5984                                         NULL) != 0) {
5985                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5986                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5987                 talloc_free(tmp_ctx);
5988                 return -1;
5989         }
5990
5991         /* thaw all nodes */
5992         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5993         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5994                                         nodes, 0,
5995                                         TIMELIMIT(),
5996                                         false, tdb_null,
5997                                         NULL, NULL,
5998                                         NULL) != 0) {
5999                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
6000                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
6001                 talloc_free(tmp_ctx);
6002                 return -1;
6003         }
6004
6005         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
6006
6007         talloc_free(tmp_ctx);
6008         return 0;
6009 }
6010
6011 /*
6012   dump memory usage
6013  */
6014 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
6015 {
6016         TDB_DATA data;
6017         int ret;
6018         int32_t res;
6019         char *errmsg;
6020         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
6021         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
6022                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
6023         if (ret != 0 || res != 0) {
6024                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
6025                 talloc_free(tmp_ctx);
6026                 return -1;
6027         }
6028         sys_write(1, data.dptr, data.dsize);
6029         talloc_free(tmp_ctx);
6030         return 0;
6031 }
6032
6033 /*
6034   handler for memory dumps
6035 */
6036 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
6037                              TDB_DATA data, void *private_data)
6038 {
6039         sys_write(1, data.dptr, data.dsize);
6040         exit(0);
6041 }
6042
6043 /*
6044   dump memory usage on the recovery daemon
6045  */
6046 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
6047 {
6048         int ret;
6049         TDB_DATA data;
6050         struct srvid_request rd;
6051
6052         rd.pnn = ctdb_get_pnn(ctdb);
6053         rd.srvid = getpid();
6054
6055         /* register a message port for receiveing the reply so that we
6056            can receive the reply
6057         */
6058         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
6059
6060
6061         data.dptr = (uint8_t *)&rd;
6062         data.dsize = sizeof(rd);
6063
6064         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
6065         if (ret != 0) {
6066                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6067                 return -1;
6068         }
6069
6070         /* this loop will terminate when we have received the reply */
6071         while (1) {     
6072                 event_loop_once(ctdb->ev);
6073         }
6074
6075         return 0;
6076 }
6077
6078 /*
6079   send a message to a srvid
6080  */
6081 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
6082 {
6083         unsigned long srvid;
6084         int ret;
6085         TDB_DATA data;
6086
6087         if (argc < 2) {
6088                 usage();
6089         }
6090
6091         srvid      = strtoul(argv[0], NULL, 0);
6092
6093         data.dptr = (uint8_t *)discard_const(argv[1]);
6094         data.dsize= strlen(argv[1]);
6095
6096         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
6097         if (ret != 0) {
6098                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6099                 return -1;
6100         }
6101
6102         return 0;
6103 }
6104
6105 /*
6106   handler for msglisten
6107 */
6108 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
6109                              TDB_DATA data, void *private_data)
6110 {
6111         int i;
6112
6113         printf("Message received: ");
6114         for (i=0;i<data.dsize;i++) {
6115                 printf("%c", data.dptr[i]);
6116         }
6117         printf("\n");
6118 }
6119
6120 /*
6121   listen for messages on a messageport
6122  */
6123 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
6124 {
6125         uint64_t srvid;
6126
6127         srvid = getpid();
6128
6129         /* register a message port and listen for messages
6130         */
6131         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
6132         printf("Listening for messages on srvid:%d\n", (int)srvid);
6133
6134         while (1) {     
6135                 event_loop_once(ctdb->ev);
6136         }
6137
6138         return 0;
6139 }
6140
6141 /*
6142   list all nodes in the cluster
6143   we parse the nodes file directly
6144  */
6145 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
6146 {
6147         TALLOC_CTX *mem_ctx = talloc_new(NULL);
6148         struct ctdb_node_map *node_map;
6149         int i;
6150
6151         assert_single_node_only();
6152
6153         node_map = read_nodes_file(mem_ctx);
6154         if (node_map == NULL) {
6155                 talloc_free(mem_ctx);
6156                 return -1;
6157         }
6158
6159         for (i = 0; i < node_map->num; i++) {
6160                 const char *addr;
6161
6162                 if (node_map->nodes[i].flags & NODE_FLAGS_DELETED) {
6163                         continue;
6164                 }
6165                 addr = ctdb_addr_to_str(&node_map->nodes[i].addr);
6166                 if (options.machinereadable){
6167                         printm(":%d:%s:\n", node_map->nodes[i].pnn, addr);
6168                 } else {
6169                         printf("%s\n", addr);
6170                 }
6171         }
6172         talloc_free(mem_ctx);
6173
6174         return 0;
6175 }
6176
6177 /**********************************************************************/
6178 /* reload the nodes file on all nodes */
6179
6180 static void get_nodes_files_callback(struct ctdb_context *ctdb,
6181                                      uint32_t node_pnn, int32_t res,
6182                                      TDB_DATA outdata, void *callback_data)
6183 {
6184         struct ctdb_node_map **maps =
6185                 talloc_get_type(callback_data, struct ctdb_node_map *);
6186
6187         if (outdata.dsize < offsetof(struct ctdb_node_map, nodes) ||
6188             outdata.dptr == NULL) {
6189                 DEBUG(DEBUG_ERR,
6190                       (__location__ " Invalid return data: %u %p\n",
6191                        (unsigned)outdata.dsize, outdata.dptr));
6192                 return;
6193         }
6194
6195         if (node_pnn >= talloc_array_length(maps)) {
6196                 DEBUG(DEBUG_ERR,
6197                       (__location__ " unexpected PNN %u\n", node_pnn));
6198                 return;
6199         }
6200
6201         maps[node_pnn] = talloc_memdup(maps, outdata.dptr, outdata.dsize);
6202 }
6203
6204 static void get_nodes_files_fail_callback(struct ctdb_context *ctdb,
6205                                           uint32_t node_pnn, int32_t res,
6206                                           TDB_DATA outdata, void *callback_data)
6207 {
6208         DEBUG(DEBUG_ERR,
6209               ("ERROR: Failed to get nodes file from node %u\n", node_pnn));
6210 }
6211
6212 static struct ctdb_node_map **
6213 ctdb_get_nodes_files(struct ctdb_context *ctdb,
6214                      TALLOC_CTX *mem_ctx,
6215                      struct timeval timeout,
6216                      struct ctdb_node_map *nodemap)
6217 {
6218         uint32_t *nodes;
6219         int ret;
6220         struct ctdb_node_map **maps;
6221
6222         maps = talloc_zero_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
6223         CTDB_NO_MEMORY_NULL(ctdb, maps);
6224
6225         nodes = list_of_connected_nodes(ctdb, nodemap, mem_ctx, true);
6226
6227         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODES_FILE,
6228                                         nodes, 0, TIMELIMIT(),
6229                                         true, tdb_null,
6230                                         get_nodes_files_callback,
6231                                         get_nodes_files_fail_callback,
6232                                         maps);
6233         if (ret != 0) {
6234                 talloc_free(maps);
6235                 return NULL;
6236         }
6237
6238         return maps;
6239 }
6240
6241 static bool node_files_are_identical(struct ctdb_node_map *nm1,
6242                                      struct ctdb_node_map *nm2)
6243 {
6244         int i;
6245
6246         if (nm1->num != nm2->num) {
6247                 return false;
6248         }
6249         for (i = 0; i < nm1->num; i++) {
6250                 if (memcmp(&nm1->nodes[i], &nm2->nodes[i],
6251                            sizeof(struct ctdb_node_and_flags)) != 0) {
6252                         return false;
6253                 }
6254         }
6255
6256         return true;
6257 }
6258
6259 static bool check_all_node_files_are_identical(struct ctdb_context *ctdb,
6260                                                TALLOC_CTX *mem_ctx,
6261                                                struct timeval timeout,
6262                                                struct ctdb_node_map *nodemap,
6263                                                struct ctdb_node_map *file_nodemap)
6264 {
6265         static struct ctdb_node_map **maps;
6266         int i;
6267         bool ret = true;
6268
6269         maps = ctdb_get_nodes_files(ctdb, mem_ctx, timeout, nodemap);
6270         if (maps == NULL) {
6271                 return false;
6272         }
6273
6274         for (i = 0; i < talloc_array_length(maps); i++) {
6275                 if (maps[i] == NULL) {
6276                         continue;
6277                 }
6278                 if (!node_files_are_identical(file_nodemap, maps[i])) {
6279                         DEBUG(DEBUG_ERR,
6280                               ("ERROR: Node file on node %u differs from current node (%u)\n",
6281                                i, ctdb_get_pnn(ctdb)));
6282                         ret = false;
6283                 }
6284         }
6285
6286         return ret;
6287 }
6288
6289 /*
6290   reload the nodes file on the local node
6291  */
6292 static bool sanity_check_nodes_file_changes(TALLOC_CTX *mem_ctx,
6293                                             struct ctdb_node_map *nodemap,
6294                                             struct ctdb_node_map *file_nodemap)
6295 {
6296         int i;
6297         bool should_abort = false;
6298         bool have_changes = false;
6299
6300         for (i=0; i<nodemap->num; i++) {
6301                 if (i >= file_nodemap->num) {
6302                         DEBUG(DEBUG_ERR,
6303                               ("ERROR: Node %u (%s) missing from nodes file\n",
6304                                nodemap->nodes[i].pnn,
6305                                ctdb_addr_to_str(&nodemap->nodes[i].addr)));
6306                         should_abort = true;
6307                         continue;
6308                 }
6309                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED &&
6310                     file_nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
6311                         /* Node remains deleted */
6312                         DEBUG(DEBUG_INFO,
6313                               ("Node %u is unchanged (DELETED)\n",
6314                                nodemap->nodes[i].pnn));
6315                 } else if (!(nodemap->nodes[i].flags & NODE_FLAGS_DELETED) &&
6316                            !(file_nodemap->nodes[i].flags & NODE_FLAGS_DELETED)) {
6317                         /* Node not newly nor previously deleted */
6318                         if (!ctdb_same_ip(&nodemap->nodes[i].addr,
6319                                           &file_nodemap->nodes[i].addr)) {
6320                                 DEBUG(DEBUG_ERR,
6321                                       ("ERROR: Node %u has changed IP address (was %s, now %s)\n",
6322                                        nodemap->nodes[i].pnn,
6323                                        /* ctdb_addr_to_str() returns a static */
6324                                        talloc_strdup(mem_ctx,
6325                                                      ctdb_addr_to_str(&nodemap->nodes[i].addr)),
6326                                        ctdb_addr_to_str(&file_nodemap->nodes[i].addr)));
6327                                 should_abort = true;
6328                         } else {
6329                                 DEBUG(DEBUG_INFO,
6330                                       ("Node %u is unchanged\n",
6331                                        nodemap->nodes[i].pnn));
6332                                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
6333                                         DEBUG(DEBUG_WARNING,
6334                                               ("WARNING: Node %u is disconnected. You MUST fix this node manually!\n",
6335                                                nodemap->nodes[i].pnn));
6336                                 }
6337                         }
6338                 } else if (file_nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
6339                         /* Node is being deleted */
6340                         DEBUG(DEBUG_NOTICE,
6341                               ("Node %u is DELETED\n",
6342                                nodemap->nodes[i].pnn));
6343                         have_changes = true;
6344                         if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
6345                                 DEBUG(DEBUG_ERR,
6346                                       ("ERROR: Node %u is still connected\n",
6347                                        nodemap->nodes[i].pnn));
6348                                 should_abort = true;
6349                         }
6350                 } else if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
6351                         /* Node was previously deleted */
6352                         DEBUG(DEBUG_NOTICE,
6353                               ("Node %u is UNDELETED\n", nodemap->nodes[i].pnn));
6354                         have_changes = true;
6355                 }
6356         }
6357
6358         if (should_abort) {
6359                 DEBUG(DEBUG_ERR,
6360                       ("ERROR: Nodes will not be reloaded due to previous error\n"));
6361                 talloc_free(mem_ctx);
6362                 exit(1);
6363         }
6364
6365         /* Leftover nodes in file are NEW */
6366         for (; i < file_nodemap->num; i++) {
6367                 DEBUG(DEBUG_NOTICE, ("Node %u is NEW\n",
6368                                      file_nodemap->nodes[i].pnn));
6369                 have_changes = true;
6370         }
6371
6372         return have_changes;
6373 }
6374
6375 static void reload_nodes_fail_callback(struct ctdb_context *ctdb,
6376                                        uint32_t node_pnn, int32_t res,
6377                                        TDB_DATA outdata, void *callback_data)
6378 {
6379         DEBUG(DEBUG_WARNING,
6380               ("WARNING: Node %u failed to reload nodes. You MUST fix this node manually!\n",
6381                node_pnn));
6382 }
6383
6384 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
6385 {
6386         int i, ret;
6387         struct ctdb_node_map *nodemap=NULL;
6388         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
6389         struct ctdb_node_map *file_nodemap;
6390         uint32_t *conn;
6391         uint32_t timeout;
6392
6393         assert_current_node_only(ctdb);
6394
6395         /* Load both the current nodemap and the contents of the local
6396          * nodes file.  Compare and sanity check them before doing
6397          * anything. */
6398
6399         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
6400         if (ret != 0) {
6401                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
6402                 return ret;
6403         }
6404
6405         file_nodemap = read_nodes_file(tmp_ctx);
6406         if (file_nodemap == NULL) {
6407                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
6408                 talloc_free(tmp_ctx);
6409                 return -1;
6410         }
6411
6412         if (!check_all_node_files_are_identical(ctdb, tmp_ctx, TIMELIMIT(),
6413                                                 nodemap, file_nodemap)) {
6414                 return -1;
6415         }
6416
6417         if (!sanity_check_nodes_file_changes(tmp_ctx, nodemap, file_nodemap)) {
6418                 DEBUG(DEBUG_NOTICE,
6419                       ("No change in nodes file, skipping unnecessary reload\n"));
6420                 talloc_free(tmp_ctx);
6421                 return 0;
6422         }
6423
6424         /* Now make the changes */
6425         conn = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
6426         for (i = 0; i < talloc_array_length(conn); i++) {
6427                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n",
6428                                      conn[i]));
6429         }
6430
6431         /* Another timeout could be used, such as ReRecoveryTimeout or
6432          * a new one for this purpose.  However, this is the simplest
6433          * option. */
6434         timeout = options.timelimit;
6435         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_RECOVERIES, &timeout,
6436                         "Disable recoveries", true);
6437
6438
6439         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_NODES_FILE,
6440                                         conn, 0, TIMELIMIT(),
6441                                         true, tdb_null,
6442                                         NULL, reload_nodes_fail_callback,
6443                                         NULL);
6444
6445         timeout = 0;
6446         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_RECOVERIES, &timeout,
6447                         "Enable recoveries", true);
6448
6449         talloc_free(tmp_ctx);
6450
6451         return 0;
6452 }
6453
6454
6455 static const struct {
6456         const char *name;
6457         int (*fn)(struct ctdb_context *, int, const char **);
6458         bool auto_all;
6459         bool without_daemon; /* can be run without daemon running ? */
6460         const char *msg;
6461         const char *args;
6462 } ctdb_commands[] = {
6463         { "version",         control_version,           true,   true,   "show version of ctdb" },
6464         { "status",          control_status,            true,   false,  "show node status" },
6465         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
6466         { "ping",            control_ping,              true,   false,  "ping all nodes" },
6467         { "runstate",        control_runstate,          true,   false,  "get/check runstate of a node", "[setup|first_recovery|startup|running]" },
6468         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
6469         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
6470         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
6471         { "statistics",      control_statistics,        false,  false, "show statistics" },
6472         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
6473         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
6474         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
6475         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
6476         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
6477         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
6478         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
6479         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
6480         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname|dbid>" },
6481         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname|dbid>"},
6482         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname|dbid>"},
6483         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
6484         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
6485         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
6486         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
6487         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
6488         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
6489         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
6490         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
6491         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
6492         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
6493         { "detach",          control_detach,            false,  false,  "detach from a database",                 "<dbname|dbid> [<dbname|dbid> ...]" },
6494         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
6495         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
6496         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
6497         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
6498         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
6499         { "stop",            control_stop,              true,   false,  "stop a node" },
6500         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
6501         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime>"},
6502         { "unban",           control_unban,             true,   false,  "unban a node" },
6503         { "showban",         control_showban,           true,   false,  "show ban information"},
6504         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
6505         { "recover",         control_recover,           true,   false,  "force recovery" },
6506         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
6507         { "ipreallocate",    control_ipreallocate,      false,  false,  "force the recovery daemon to perform a ip reallocation procedure" },
6508         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
6509         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
6510         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "[<srcip:port> <dstip:port>]" },
6511         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
6512         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6513         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
6514         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
6515
6516         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
6517
6518         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
6519         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
6520         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
6521         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
6522         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
6523         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
6524         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
6525         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
6526         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
6527         { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
6528         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
6529         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
6530         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
6531         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<dbname|dbid> <file>"},
6532         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
6533         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
6534         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname|dbid>"},
6535         { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
6536         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
6537         { "enablescript",     control_enablescript,  true,      false, "enable an eventscript", "<script>"},
6538         { "disablescript",    control_disablescript,  true,     false, "disable an eventscript", "<script>"},
6539         { "natgwlist",        control_natgwlist,        true,   false, "show the nodes belonging to this natgw configuration"},
6540         { "xpnn",             control_xpnn,             false,  true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
6541         { "getreclock",       control_getreclock,       true,   false, "Show the reclock file of a node"},
6542         { "setreclock",       control_setreclock,       true,   false, "Set/clear the reclock file of a node", "[filename]"},
6543         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
6544         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
6545         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
6546         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbname|dbid> <prio:1-3>"},
6547         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbname|dbid>"},
6548         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbname|dbid>"},
6549         { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbname|dbid>"},
6550         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
6551         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
6552         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<dbname|dbid> <key> [<file>]" },
6553         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<dbname|dbid> <key> <file containing record>" },
6554         { "pdelete",         control_pdelete,           false,  false,  "delete a record from a persistent database", "<dbname|dbid> <key>" },
6555         { "ptrans",          control_ptrans,            false,  false,  "update a persistent database (from stdin)", "<dbname|dbid>" },
6556         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
6557         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6558         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<dbname|dbid> <key>" },
6559         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<dbname|dbid> <key> <value>" },
6560         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
6561         { "rebalancenode",     control_rebalancenode,   false,  false, "mark nodes as forced IP rebalancing targets", "[<pnn-list>]"},
6562         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbname|dbid>" },
6563         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status", "[<pnn-list>]" },
6564         { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<dbname|dbid>" },
6565         { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on specified nodes" , "[<pnn-list>]" },
6566         { "ipiface",         control_ipiface,           false,  true,  "Find which interface an ip address is hosted on", "<ip>" },
6567 };
6568
6569 /*
6570   show usage message
6571  */
6572 static void usage(void)
6573 {
6574         int i;
6575         printf(
6576 "Usage: ctdb [options] <control>\n" \
6577 "Options:\n" \
6578 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
6579 "   -Y                 generate machine readable output\n"
6580 "   -x <char>          specify delimiter for machine readable output\n"
6581 "   -v                 generate verbose output\n"
6582 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
6583         printf("Controls:\n");
6584         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6585                 printf("  %-15s %-27s  %s\n", 
6586                        ctdb_commands[i].name, 
6587                        ctdb_commands[i].args?ctdb_commands[i].args:"",
6588                        ctdb_commands[i].msg);
6589         }
6590         exit(1);
6591 }
6592
6593
6594 static void ctdb_alarm(int sig)
6595 {
6596         printf("Maximum runtime exceeded - exiting\n");
6597         _exit(ERR_TIMEOUT);
6598 }
6599
6600 /*
6601   main program
6602 */
6603 int main(int argc, const char *argv[])
6604 {
6605         struct ctdb_context *ctdb;
6606         char *nodestring = NULL;
6607         int machineparsable = 0;
6608         struct poptOption popt_options[] = {
6609                 POPT_AUTOHELP
6610                 POPT_CTDB_CMDLINE
6611                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
6612                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
6613                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machine readable output", NULL },
6614                 { NULL, 'x', POPT_ARG_STRING, &options.machineseparator, 0, "specify separator for machine readable output", "char" },
6615                 { NULL, 'X', POPT_ARG_NONE, &machineparsable, 0, "enable machine parsable output with separator |", NULL },
6616                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
6617                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
6618                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
6619                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
6620                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
6621                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
6622                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
6623                 POPT_TABLEEND
6624         };
6625         int opt;
6626         const char **extra_argv;
6627         int extra_argc = 0;
6628         int ret=-1, i;
6629         poptContext pc;
6630         struct event_context *ev;
6631         const char *control;
6632
6633         setlinebuf(stdout);
6634         
6635         /* set some defaults */
6636         options.maxruntime = 0;
6637         options.timelimit = 10;
6638         options.pnn = CTDB_CURRENT_NODE;
6639
6640         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
6641
6642         while ((opt = poptGetNextOpt(pc)) != -1) {
6643                 switch (opt) {
6644                 default:
6645                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
6646                                 poptBadOption(pc, 0), poptStrerror(opt)));
6647                         exit(1);
6648                 }
6649         }
6650
6651         /* setup the remaining options for the main program to use */
6652         extra_argv = poptGetArgs(pc);
6653         if (extra_argv) {
6654                 extra_argv++;
6655                 while (extra_argv[extra_argc]) extra_argc++;
6656         }
6657
6658         if (extra_argc < 1) {
6659                 usage();
6660         }
6661
6662         if (options.maxruntime == 0) {
6663                 const char *ctdb_timeout;
6664                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6665                 if (ctdb_timeout != NULL) {
6666                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6667                 } else {
6668                         /* default timeout is 120 seconds */
6669                         options.maxruntime = 120;
6670                 }
6671         }
6672
6673         if (machineparsable) {
6674                 options.machineseparator = "|";
6675         }
6676         if (options.machineseparator != NULL) {
6677                 if (strlen(options.machineseparator) != 1) {
6678                         printf("Invalid separator \"%s\" - "
6679                                "must be single character\n",
6680                                options.machineseparator);
6681                         exit(1);
6682                 }
6683
6684                 /* -x implies -Y */
6685                 options.machinereadable = true;
6686         } else if (options.machinereadable) {
6687                 options.machineseparator = ":";
6688         }
6689
6690         signal(SIGALRM, ctdb_alarm);
6691         alarm(options.maxruntime);
6692
6693         control = extra_argv[0];
6694
6695         /* Default value for CTDB_BASE - don't override */
6696         setenv("CTDB_BASE", CTDB_ETCDIR, 0);
6697
6698         ev = event_context_init(NULL);
6699         if (!ev) {
6700                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
6701                 exit(1);
6702         }
6703
6704         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6705                 if (strcmp(control, ctdb_commands[i].name) == 0) {
6706                         break;
6707                 }
6708         }
6709
6710         if (i == ARRAY_SIZE(ctdb_commands)) {
6711                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
6712                 exit(1);
6713         }
6714
6715         if (ctdb_commands[i].without_daemon == true) {
6716                 if (nodestring != NULL) {
6717                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
6718                         exit(1);
6719                 }
6720                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
6721         }
6722
6723         /* initialise ctdb */
6724         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
6725
6726         if (ctdb == NULL) {
6727                 uint32_t pnn;
6728                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
6729
6730                 pnn = find_node_xpnn();
6731                 if (pnn == -1) {
6732                         DEBUG(DEBUG_ERR,
6733                               ("Is this node part of a CTDB cluster?\n"));
6734                 }
6735                 exit(1);
6736         }
6737
6738         /* setup the node number(s) to contact */
6739         if (!parse_nodestring(ctdb, ctdb, nodestring, CTDB_CURRENT_NODE, false,
6740                               &options.nodes, &options.pnn)) {
6741                 usage();
6742         }
6743
6744         if (options.pnn == CTDB_CURRENT_NODE) {
6745                 options.pnn = options.nodes[0];
6746         }
6747
6748         if (ctdb_commands[i].auto_all && 
6749             ((options.pnn == CTDB_BROADCAST_ALL) ||
6750              (options.pnn == CTDB_MULTICAST))) {
6751                 int j;
6752
6753                 ret = 0;
6754                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
6755                         options.pnn = options.nodes[j];
6756                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6757                 }
6758         } else {
6759                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6760         }
6761
6762         talloc_free(ctdb);
6763         talloc_free(ev);
6764         (void)poptFreeContext(pc);
6765
6766         return ret;
6767
6768 }