ctdb-tools-ctdb: Add and use function filter_nodemap_by_natgw_nodes()
[samba.git] / ctdb / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24 #include "system/network.h"
25 #include "system/locale.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_version.h"
29 #include "../include/ctdb_client.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
/*
 * Command line options shared by all control functions in this file.
 */
static struct {
        int timelimit;          /* timeout in seconds, see TIMELIMIT() */
        uint32_t pnn;           /* target PNN, or one of the special
                                 * CTDB_CURRENT_NODE / CTDB_BROADCAST_ALL /
                                 * CTDB_MULTICAST values */
        uint32_t *nodes;
        int machinereadable;    /* non-zero selects colon-separated output */
        int verbose;
        int maxruntime;
        int printemptyrecords;
        int printdatasize;
        int printlmaster;
        int printhash;
        int printrecordflags;
} options;

/*
 * Ten times the configured time limit.  The expansion is parenthesized
 * so that the macro behaves as a single value inside larger expressions
 * (e.g. "x / LONGTIMEOUT").
 */
#define LONGTIMEOUT (options.timelimit*10)

/* Absolute deadlines derived from the configured time limit */
#define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
#define LONGTIMELIMIT() timeval_current_ofs(LONGTIMEOUT, 0)
58
59 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
60 {
61         printf("CTDB version: %s\n", CTDB_VERSION_STRING);
62         return 0;
63 }
64
/*
 * Log an alert naming the current source location and abort() when
 * the given value is NULL/false.
 */
#define CTDB_NOMEM_ABORT(p)                                             \
        do {                                                            \
                if (!(p)) {                                             \
                        DEBUG(DEBUG_ALERT,                              \
                              ("ctdb fatal error: %s\n",                \
                               "Out of memory in " __location__ ));     \
                        abort();                                        \
                }                                                       \
        } while (0)
70
71 static uint32_t getpnn(struct ctdb_context *ctdb)
72 {
73         if ((options.pnn == CTDB_BROADCAST_ALL) ||
74             (options.pnn == CTDB_MULTICAST)) {
75                 DEBUG(DEBUG_ERR,
76                       ("Cannot get PNN for node %u\n", options.pnn));
77                 exit(1);
78         }
79
80         if (options.pnn == CTDB_CURRENT_NODE) {
81                 return ctdb_get_pnn(ctdb);
82         } else {
83                 return options.pnn;
84         }
85 }
86
87 static void assert_single_node_only(void)
88 {
89         if ((options.pnn == CTDB_BROADCAST_ALL) ||
90             (options.pnn == CTDB_MULTICAST)) {
91                 DEBUG(DEBUG_ERR,
92                       ("This control can not be applied to multiple PNNs\n"));
93                 exit(1);
94         }
95 }
96
97 /* Pretty print the flags to a static buffer in human-readable format.
98  * This never returns NULL!
99  */
100 static const char *pretty_print_flags(uint32_t flags)
101 {
102         int j;
103         static const struct {
104                 uint32_t flag;
105                 const char *name;
106         } flag_names[] = {
107                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
108                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
109                 { NODE_FLAGS_BANNED,                "BANNED" },
110                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
111                 { NODE_FLAGS_DELETED,               "DELETED" },
112                 { NODE_FLAGS_STOPPED,               "STOPPED" },
113                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
114         };
115         static char flags_str[512]; /* Big enough to contain all flag names */
116
117         flags_str[0] = '\0';
118         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
119                 if (flags & flag_names[j].flag) {
120                         if (flags_str[0] == '\0') {
121                                 (void) strcpy(flags_str, flag_names[j].name);
122                         } else {
123                                 (void) strncat(flags_str, "|", sizeof(flags_str)-1);
124                                 (void) strncat(flags_str, flag_names[j].name,
125                                                sizeof(flags_str)-1);
126                         }
127                 }
128         }
129         if (flags_str[0] == '\0') {
130                 (void) strcpy(flags_str, "OK");
131         }
132
133         return flags_str;
134 }
135
/*
 * Convert one hexadecimal digit character to its value (0-15).
 * Both lower and upper case letters are accepted.  Characters that are
 * not hexadecimal digits are not rejected; the result for them is
 * simply h - '0'.
 */
static int h2i(char h)
{
        if (h >= 'a' && h <= 'f') return h - 'a' + 10;
        if (h >= 'A' && h <= 'F') return h - 'A' + 10;
        return h - '0';
}
142
143 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
144 {
145         int i, len;
146         TDB_DATA key = {NULL, 0};
147
148         len = strlen(str);
149         if (len & 0x01) {
150                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
151                 return key;
152         }
153
154         key.dsize = len>>1;
155         key.dptr  = talloc_size(mem_ctx, key.dsize);
156
157         for (i=0; i < len/2; i++) {
158                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
159         }
160         return key;
161 }
162
163 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
164  * that are disconnected or deleted.  If dd_ok is true those nodes are
165  * included in the output list of nodes.  If dd_ok is false, those
166  * nodes are filtered from the "all" case and cause an error if
167  * explicitly specified.
168  */
169 static bool parse_nodestring(struct ctdb_context *ctdb,
170                              TALLOC_CTX *mem_ctx,
171                              const char * nodestring,
172                              uint32_t current_pnn,
173                              bool dd_ok,
174                              uint32_t **nodes,
175                              uint32_t *pnn_mode)
176 {
177         TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
178         int n;
179         uint32_t i;
180         struct ctdb_node_map *nodemap;
181         int ret;
182
183         *nodes = NULL;
184
185         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
186         if (ret != 0) {
187                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
188                 talloc_free(tmp_ctx);
189                 exit(10);
190         }
191
192         if (nodestring != NULL) {
193                 *nodes = talloc_array(mem_ctx, uint32_t, 0);
194                 if (*nodes == NULL) {
195                         goto failed;
196                 }
197
198                 n = 0;
199
200                 if (strcmp(nodestring, "all") == 0) {
201                         *pnn_mode = CTDB_BROADCAST_ALL;
202
203                         /* all */
204                         for (i = 0; i < nodemap->num; i++) {
205                                 if ((nodemap->nodes[i].flags &
206                                      (NODE_FLAGS_DISCONNECTED |
207                                       NODE_FLAGS_DELETED)) && !dd_ok) {
208                                         continue;
209                                 }
210                                 *nodes = talloc_realloc(mem_ctx, *nodes,
211                                                         uint32_t, n+1);
212                                 if (*nodes == NULL) {
213                                         goto failed;
214                                 }
215                                 (*nodes)[n] = i;
216                                 n++;
217                         }
218                 } else {
219                         /* x{,y...} */
220                         char *ns, *tok;
221
222                         ns = talloc_strdup(tmp_ctx, nodestring);
223                         tok = strtok(ns, ",");
224                         while (tok != NULL) {
225                                 uint32_t pnn;
226                                 char *endptr;
227                                 i = (uint32_t)strtoul(tok, &endptr, 0);
228                                 if (i == 0 && tok == endptr) {
229                                         DEBUG(DEBUG_ERR,
230                                               ("Invalid node %s\n", tok));
231                                         talloc_free(tmp_ctx);
232                                         exit(ERR_NONODE);
233                                 }
234                                 if (i >= nodemap->num) {
235                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
236                                         talloc_free(tmp_ctx);
237                                         exit(ERR_NONODE);
238                                 }
239                                 if ((nodemap->nodes[i].flags & 
240                                      (NODE_FLAGS_DISCONNECTED |
241                                       NODE_FLAGS_DELETED)) && !dd_ok) {
242                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
243                                         talloc_free(tmp_ctx);
244                                         exit(ERR_DISNODE);
245                                 }
246                                 if ((pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), i)) < 0) {
247                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
248                                         talloc_free(tmp_ctx);
249                                         exit(10);
250                                 }
251
252                                 *nodes = talloc_realloc(mem_ctx, *nodes,
253                                                         uint32_t, n+1);
254                                 if (*nodes == NULL) {
255                                         goto failed;
256                                 }
257
258                                 (*nodes)[n] = i;
259                                 n++;
260
261                                 tok = strtok(NULL, ",");
262                         }
263                         talloc_free(ns);
264
265                         if (n == 1) {
266                                 *pnn_mode = (*nodes)[0];
267                         } else {
268                                 *pnn_mode = CTDB_MULTICAST;
269                         }
270                 }
271         } else {
272                 /* default - no nodes specified */
273                 *nodes = talloc_array(mem_ctx, uint32_t, 1);
274                 if (*nodes == NULL) {
275                         goto failed;
276                 }
277                 *pnn_mode = CTDB_CURRENT_NODE;
278
279                 if (((*nodes)[0] = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), current_pnn)) < 0) {
280                         goto failed;
281                 }
282         }
283
284         talloc_free(tmp_ctx);
285         return true;
286
287 failed:
288         talloc_free(tmp_ctx);
289         return false;
290 }
291
292 /*
293  check if a database exists
294 */
295 static bool db_exists(struct ctdb_context *ctdb, const char *dbarg,
296                       uint32_t *dbid, const char **dbname, uint8_t *flags)
297 {
298         int i, ret;
299         struct ctdb_dbid_map *dbmap=NULL;
300         bool dbid_given = false, found = false;
301         uint32_t id;
302         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
303         const char *name;
304
305         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
306         if (ret != 0) {
307                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
308                 goto fail;
309         }
310
311         if (strncmp(dbarg, "0x", 2) == 0) {
312                 id = strtoul(dbarg, NULL, 0);
313                 dbid_given = true;
314         }
315
316         for(i=0; i<dbmap->num; i++) {
317                 if (dbid_given) {
318                         if (id == dbmap->dbs[i].dbid) {
319                                 found = true;
320                                 break;
321                         }
322                 } else {
323                         ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
324                         if (ret != 0) {
325                                 DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
326                                 goto fail;
327                         }
328
329                         if (strcmp(name, dbarg) == 0) {
330                                 id = dbmap->dbs[i].dbid;
331                                 found = true;
332                                 break;
333                         }
334                 }
335         }
336
337         if (found && dbid_given && dbname != NULL) {
338                 ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
339                 if (ret != 0) {
340                         DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
341                         found = false;
342                         goto fail;
343                 }
344         }
345
346         if (found) {
347                 if (dbid) *dbid = id;
348                 if (dbname) *dbname = talloc_strdup(ctdb, name);
349                 if (flags) *flags = dbmap->dbs[i].flags;
350         } else {
351                 DEBUG(DEBUG_ERR,("No database matching '%s' found\n", dbarg));
352         }
353
354 fail:
355         talloc_free(tmp_ctx);
356         return found;
357 }
358
359 /*
360   see if a process exists
361  */
362 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
363 {
364         uint32_t pnn, pid;
365         int ret;
366         if (argc < 1) {
367                 usage();
368         }
369
370         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
371                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
372                 return -1;
373         }
374
375         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
376         if (ret == 0) {
377                 printf("%u:%u exists\n", pnn, pid);
378         } else {
379                 printf("%u:%u does not exist\n", pnn, pid);
380         }
381         return ret;
382 }
383
384 /*
385   display statistics structure
386  */
387 static void show_statistics(struct ctdb_statistics *s, int show_header)
388 {
389         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
390         int i;
391         const char *prefix=NULL;
392         int preflen=0;
393         int tmp, days, hours, minutes, seconds;
394         const struct {
395                 const char *name;
396                 uint32_t offset;
397         } fields[] = {
398 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
399                 STATISTICS_FIELD(num_clients),
400                 STATISTICS_FIELD(frozen),
401                 STATISTICS_FIELD(recovering),
402                 STATISTICS_FIELD(num_recoveries),
403                 STATISTICS_FIELD(client_packets_sent),
404                 STATISTICS_FIELD(client_packets_recv),
405                 STATISTICS_FIELD(node_packets_sent),
406                 STATISTICS_FIELD(node_packets_recv),
407                 STATISTICS_FIELD(keepalive_packets_sent),
408                 STATISTICS_FIELD(keepalive_packets_recv),
409                 STATISTICS_FIELD(node.req_call),
410                 STATISTICS_FIELD(node.reply_call),
411                 STATISTICS_FIELD(node.req_dmaster),
412                 STATISTICS_FIELD(node.reply_dmaster),
413                 STATISTICS_FIELD(node.reply_error),
414                 STATISTICS_FIELD(node.req_message),
415                 STATISTICS_FIELD(node.req_control),
416                 STATISTICS_FIELD(node.reply_control),
417                 STATISTICS_FIELD(client.req_call),
418                 STATISTICS_FIELD(client.req_message),
419                 STATISTICS_FIELD(client.req_control),
420                 STATISTICS_FIELD(timeouts.call),
421                 STATISTICS_FIELD(timeouts.control),
422                 STATISTICS_FIELD(timeouts.traverse),
423                 STATISTICS_FIELD(locks.num_calls),
424                 STATISTICS_FIELD(locks.num_current),
425                 STATISTICS_FIELD(locks.num_pending),
426                 STATISTICS_FIELD(locks.num_failed),
427                 STATISTICS_FIELD(total_calls),
428                 STATISTICS_FIELD(pending_calls),
429                 STATISTICS_FIELD(childwrite_calls),
430                 STATISTICS_FIELD(pending_childwrite_calls),
431                 STATISTICS_FIELD(memory_used),
432                 STATISTICS_FIELD(max_hop_count),
433                 STATISTICS_FIELD(total_ro_delegations),
434                 STATISTICS_FIELD(total_ro_revokes),
435         };
436         
437         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
438         seconds = tmp%60;
439         tmp    /= 60;
440         minutes = tmp%60;
441         tmp    /= 60;
442         hours   = tmp%24;
443         tmp    /= 24;
444         days    = tmp;
445
446         if (options.machinereadable){
447                 if (show_header) {
448                         printf("CTDB version:");
449                         printf("Current time of statistics:");
450                         printf("Statistics collected since:");
451                         for (i=0;i<ARRAY_SIZE(fields);i++) {
452                                 printf("%s:", fields[i].name);
453                         }
454                         printf("num_reclock_ctdbd_latency:");
455                         printf("min_reclock_ctdbd_latency:");
456                         printf("avg_reclock_ctdbd_latency:");
457                         printf("max_reclock_ctdbd_latency:");
458
459                         printf("num_reclock_recd_latency:");
460                         printf("min_reclock_recd_latency:");
461                         printf("avg_reclock_recd_latency:");
462                         printf("max_reclock_recd_latency:");
463
464                         printf("num_call_latency:");
465                         printf("min_call_latency:");
466                         printf("avg_call_latency:");
467                         printf("max_call_latency:");
468
469                         printf("num_lockwait_latency:");
470                         printf("min_lockwait_latency:");
471                         printf("avg_lockwait_latency:");
472                         printf("max_lockwait_latency:");
473
474                         printf("num_childwrite_latency:");
475                         printf("min_childwrite_latency:");
476                         printf("avg_childwrite_latency:");
477                         printf("max_childwrite_latency:");
478                         printf("\n");
479                 }
480                 printf("%d:", CTDB_VERSION);
481                 printf("%d:", (int)s->statistics_current_time.tv_sec);
482                 printf("%d:", (int)s->statistics_start_time.tv_sec);
483                 for (i=0;i<ARRAY_SIZE(fields);i++) {
484                         printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
485                 }
486                 printf("%d:", s->reclock.ctdbd.num);
487                 printf("%.6f:", s->reclock.ctdbd.min);
488                 printf("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
489                 printf("%.6f:", s->reclock.ctdbd.max);
490
491                 printf("%d:", s->reclock.recd.num);
492                 printf("%.6f:", s->reclock.recd.min);
493                 printf("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
494                 printf("%.6f:", s->reclock.recd.max);
495
496                 printf("%d:", s->call_latency.num);
497                 printf("%.6f:", s->call_latency.min);
498                 printf("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
499                 printf("%.6f:", s->call_latency.max);
500
501                 printf("%d:", s->childwrite_latency.num);
502                 printf("%.6f:", s->childwrite_latency.min);
503                 printf("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
504                 printf("%.6f:", s->childwrite_latency.max);
505                 printf("\n");
506         } else {
507                 printf("CTDB version %u\n", CTDB_VERSION);
508                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
509                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
510
511                 for (i=0;i<ARRAY_SIZE(fields);i++) {
512                         if (strchr(fields[i].name, '.')) {
513                                 preflen = strcspn(fields[i].name, ".")+1;
514                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
515                                         prefix = fields[i].name;
516                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
517                                 }
518                         } else {
519                                 preflen = 0;
520                         }
521                         printf(" %*s%-22s%*s%10u\n", 
522                                preflen?4:0, "",
523                                fields[i].name+preflen, 
524                                preflen?0:4, "",
525                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
526                 }
527                 printf(" hop_count_buckets:");
528                 for (i=0;i<MAX_COUNT_BUCKETS;i++) {
529                         printf(" %d", s->hop_count_bucket[i]);
530                 }
531                 printf("\n");
532                 printf(" lock_buckets:");
533                 for (i=0; i<MAX_COUNT_BUCKETS; i++) {
534                         printf(" %d", s->locks.buckets[i]);
535                 }
536                 printf("\n");
537                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
538
539                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
540
541                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
542
543                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
544                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
545         }
546
547         talloc_free(tmp_ctx);
548 }
549
550 /*
551   display remote ctdb statistics combined from all nodes
552  */
553 static int control_statistics_all(struct ctdb_context *ctdb)
554 {
555         int ret, i;
556         struct ctdb_statistics statistics;
557         uint32_t *nodes;
558         uint32_t num_nodes;
559
560         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
561         CTDB_NO_MEMORY(ctdb, nodes);
562         
563         ZERO_STRUCT(statistics);
564
565         for (i=0;i<num_nodes;i++) {
566                 struct ctdb_statistics s1;
567                 int j;
568                 uint32_t *v1 = (uint32_t *)&s1;
569                 uint32_t *v2 = (uint32_t *)&statistics;
570                 uint32_t num_ints = 
571                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
572                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
573                 if (ret != 0) {
574                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
575                         return ret;
576                 }
577                 for (j=0;j<num_ints;j++) {
578                         v2[j] += v1[j];
579                 }
580                 statistics.max_hop_count = 
581                         MAX(statistics.max_hop_count, s1.max_hop_count);
582                 statistics.call_latency.max = 
583                         MAX(statistics.call_latency.max, s1.call_latency.max);
584         }
585         talloc_free(nodes);
586         printf("Gathered statistics for %u nodes\n", num_nodes);
587         show_statistics(&statistics, 1);
588         return 0;
589 }
590
591 /*
592   display remote ctdb statistics
593  */
594 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
595 {
596         int ret;
597         struct ctdb_statistics statistics;
598
599         if (options.pnn == CTDB_BROADCAST_ALL) {
600                 return control_statistics_all(ctdb);
601         }
602
603         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
604         if (ret != 0) {
605                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
606                 return ret;
607         }
608         show_statistics(&statistics, 1);
609         return 0;
610 }
611
612
613 /*
614   reset remote ctdb statistics
615  */
616 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
617 {
618         int ret;
619
620         ret = ctdb_statistics_reset(ctdb, options.pnn);
621         if (ret != 0) {
622                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
623                 return ret;
624         }
625         return 0;
626 }
627
628
629 /*
630   display remote ctdb rolling statistics
631  */
632 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
633 {
634         int ret;
635         struct ctdb_statistics_wire *stats;
636         int i, num_records = -1;
637
638         assert_single_node_only();
639
640         if (argc ==1) {
641                 num_records = atoi(argv[0]) - 1;
642         }
643
644         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
645         if (ret != 0) {
646                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
647                 return ret;
648         }
649         for (i=0;i<stats->num;i++) {
650                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
651                         continue;
652                 }
653                 show_statistics(&stats->stats[i], i==0);
654                 if (i == num_records) {
655                         break;
656                 }
657         }
658         return 0;
659 }
660
661
662 /*
663   display remote ctdb db statistics
664  */
665 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
666 {
667         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
668         struct ctdb_db_statistics *dbstat;
669         int i;
670         uint32_t db_id;
671         int num_hot_keys;
672         int ret;
673
674         if (argc < 1) {
675                 usage();
676         }
677
678         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
679                 return -1;
680         }
681
682         ret = ctdb_ctrl_dbstatistics(ctdb, options.pnn, db_id, tmp_ctx, &dbstat);
683         if (ret != 0) {
684                 DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
685                 talloc_free(tmp_ctx);
686                 return -1;
687         }
688
689         printf("DB Statistics: %s\n", argv[0]);
690         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
691                 dbstat->db_ro_delegations);
692         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
693                 dbstat->db_ro_delegations);
694         printf(" %s\n", "locks");
695         printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
696                 dbstat->locks.num_calls);
697         printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
698                 dbstat->locks.num_failed);
699         printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
700                 dbstat->locks.num_current);
701         printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
702                 dbstat->locks.num_pending);
703         printf(" %s", "hop_count_buckets:");
704         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
705                 printf(" %d", dbstat->hop_count_bucket[i]);
706         }
707         printf("\n");
708         printf(" %s", "lock_buckets:");
709         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
710                 printf(" %d", dbstat->locks.buckets[i]);
711         }
712         printf("\n");
713         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
714                 "locks_latency      MIN/AVG/MAX",
715                 dbstat->locks.latency.min,
716                 (dbstat->locks.latency.num ?
717                  dbstat->locks.latency.total /dbstat->locks.latency.num :
718                  0.0),
719                 dbstat->locks.latency.max,
720                 dbstat->locks.latency.num);
721         num_hot_keys = 0;
722         for (i=0; i<dbstat->num_hot_keys; i++) {
723                 if (dbstat->hot_keys[i].count > 0) {
724                         num_hot_keys++;
725                 }
726         }
727         dbstat->num_hot_keys = num_hot_keys;
728
729         printf(" Num Hot Keys:     %d\n", dbstat->num_hot_keys);
730         for (i = 0; i < dbstat->num_hot_keys; i++) {
731                 int j;
732                 printf("     Count:%d Key:", dbstat->hot_keys[i].count);
733                 for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
734                         printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
735                 }
736                 printf("\n");
737         }
738
739         talloc_free(tmp_ctx);
740         return 0;
741 }
742
743 /*
744   display uptime of remote node
745  */
746 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
747 {
748         int ret;
749         struct ctdb_uptime *uptime = NULL;
750         int tmp, days, hours, minutes, seconds;
751
752         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
753         if (ret != 0) {
754                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
755                 return ret;
756         }
757
758         if (options.machinereadable){
759                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
760                 printf(":%u:%u:%u:%lf\n",
761                         (unsigned int)uptime->current_time.tv_sec,
762                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
763                         (unsigned int)uptime->last_recovery_finished.tv_sec,
764                         timeval_delta(&uptime->last_recovery_finished,
765                                       &uptime->last_recovery_started)
766                 );
767                 return 0;
768         }
769
770         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
771
772         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
773         seconds = tmp%60;
774         tmp    /= 60;
775         minutes = tmp%60;
776         tmp    /= 60;
777         hours   = tmp%24;
778         tmp    /= 24;
779         days    = tmp;
780         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
781
782         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
783         seconds = tmp%60;
784         tmp    /= 60;
785         minutes = tmp%60;
786         tmp    /= 60;
787         hours   = tmp%24;
788         tmp    /= 24;
789         days    = tmp;
790         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
791         
792         printf("Duration of last recovery/failover: %lf seconds\n",
793                 timeval_delta(&uptime->last_recovery_finished,
794                               &uptime->last_recovery_started));
795
796         return 0;
797 }
798
/*
  show the PNN of the current node, as reported by getpnn()
 */
static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
{
        uint32_t this_pnn = getpnn(ctdb);

        printf("PNN:%d\n", this_pnn);
        return 0;
}

811
812
/* One node entry read from the nodes file, kept as a singly linked list */
struct pnn_node {
        struct pnn_node *next;  /* next entry, NULL at the end of the list */
        const char *addr;       /* address text taken from the nodes file line */
        int pnn;                /* node number assigned to that line */
};
818
819 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
820 {
821         const char *nodes_list;
822         int nlines;
823         char **lines;
824         int i, pnn;
825         struct pnn_node *pnn_nodes = NULL;
826         struct pnn_node *pnn_node;
827         struct pnn_node *tmp_node;
828
829         /* read the nodes file */
830         nodes_list = getenv("CTDB_NODES");
831         if (nodes_list == NULL) {
832                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes",
833                                              getenv("CTDB_BASE"));
834                 if (nodes_list == NULL) {
835                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
836                         exit(1);
837                 }
838         }
839         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
840         if (lines == NULL) {
841                 return NULL;
842         }
843         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
844                 nlines--;
845         }
846         for (i=0, pnn=0; i<nlines; i++) {
847                 char *node;
848
849                 node = lines[i];
850                 /* strip leading spaces */
851                 while((*node == ' ') || (*node == '\t')) {
852                         node++;
853                 }
854                 if (*node == '#') {
855                         pnn++;
856                         continue;
857                 }
858                 if (strcmp(node, "") == 0) {
859                         continue;
860                 }
861                 pnn_node = talloc(mem_ctx, struct pnn_node);
862                 pnn_node->pnn = pnn++;
863                 pnn_node->addr = talloc_strdup(pnn_node, node);
864                 pnn_node->next = pnn_nodes;
865                 pnn_nodes = pnn_node;
866         }
867
868         /* swap them around so we return them in incrementing order */
869         pnn_node = pnn_nodes;
870         pnn_nodes = NULL;
871         while (pnn_node) {
872                 tmp_node = pnn_node;
873                 pnn_node = pnn_node->next;
874
875                 tmp_node->next = pnn_nodes;
876                 pnn_nodes = tmp_node;
877         }
878
879         return pnn_nodes;
880 }
881
882 /*
883   show the PNN of the current node
884   discover the pnn by loading the nodes file and try to bind to all
885   addresses one at a time until the ip address is found.
886  */
887 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
888 {
889         TALLOC_CTX *mem_ctx = talloc_new(NULL);
890         struct pnn_node *pnn_nodes;
891         struct pnn_node *pnn_node;
892
893         assert_single_node_only();
894
895         pnn_nodes = read_nodes_file(mem_ctx);
896         if (pnn_nodes == NULL) {
897                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
898                 talloc_free(mem_ctx);
899                 return -1;
900         }
901
902         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
903                 ctdb_sock_addr addr;
904
905                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
906                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
907                         talloc_free(mem_ctx);
908                         return -1;
909                 }
910
911                 if (ctdb_sys_have_ip(&addr)) {
912                         printf("PNN:%d\n", pnn_node->pnn);
913                         talloc_free(mem_ctx);
914                         return 0;
915                 }
916         }
917
918         printf("Failed to detect which PNN this node is\n");
919         talloc_free(mem_ctx);
920         return -1;
921 }
922
923 /* Helpers for ctdb status
924  */
925 static bool is_partially_online(struct ctdb_context *ctdb, struct ctdb_node_and_flags *node)
926 {
927         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
928         int j;
929         bool ret = false;
930
931         if (node->flags == 0) {
932                 struct ctdb_control_get_ifaces *ifaces;
933
934                 if (ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), node->pnn,
935                                          tmp_ctx, &ifaces) == 0) {
936                         for (j=0; j < ifaces->num; j++) {
937                                 if (ifaces->ifaces[j].link_state != 0) {
938                                         continue;
939                                 }
940                                 ret = true;
941                                 break;
942                         }
943                 }
944         }
945         talloc_free(tmp_ctx);
946
947         return ret;
948 }
949
/* Print the column header for machine readable node status lines */
static void control_status_header_machine(void)
{
        static const char header[] =
                ":Node:IP:Disconnected:Banned:Disabled:Unhealthy"
                ":Stopped:Inactive:PartiallyOnline:ThisNode:\n";

        fputs(header, stdout);
}
955
956 static int control_status_1_machine(struct ctdb_context *ctdb, int mypnn,
957                                     struct ctdb_node_and_flags *node)
958 {
959         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
960                ctdb_addr_to_str(&node->addr),
961                !!(node->flags&NODE_FLAGS_DISCONNECTED),
962                !!(node->flags&NODE_FLAGS_BANNED),
963                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
964                !!(node->flags&NODE_FLAGS_UNHEALTHY),
965                !!(node->flags&NODE_FLAGS_STOPPED),
966                !!(node->flags&NODE_FLAGS_INACTIVE),
967                is_partially_online(ctdb, node) ? 1 : 0,
968                (node->pnn == mypnn)?'Y':'N');
969
970         return node->flags;
971 }
972
973 static int control_status_1_human(struct ctdb_context *ctdb, int mypnn,
974                                   struct ctdb_node_and_flags *node)
975 {
976        printf("pnn:%d %-16s %s%s\n", node->pnn,
977               ctdb_addr_to_str(&node->addr),
978               is_partially_online(ctdb, node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
979               node->pnn == mypnn?" (THIS NODE)":"");
980
981        return node->flags;
982 }
983
984 /*
985   display remote ctdb status
986  */
987 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
988 {
989         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
990         int i;
991         struct ctdb_vnn_map *vnnmap=NULL;
992         struct ctdb_node_map *nodemap=NULL;
993         uint32_t recmode, recmaster, mypnn;
994         int num_deleted_nodes = 0;
995         int ret;
996
997         mypnn = getpnn(ctdb);
998
999         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1000         if (ret != 0) {
1001                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1002                 talloc_free(tmp_ctx);
1003                 return -1;
1004         }
1005
1006         if (options.machinereadable) {
1007                 control_status_header_machine();
1008                 for (i=0;i<nodemap->num;i++) {
1009                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1010                                 continue;
1011                         }
1012                         (void) control_status_1_machine(ctdb, mypnn,
1013                                                         &nodemap->nodes[i]);
1014                 }
1015                 talloc_free(tmp_ctx);
1016                 return 0;
1017         }
1018
1019         for (i=0; i<nodemap->num; i++) {
1020                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1021                         num_deleted_nodes++;
1022                 }
1023         }
1024         if (num_deleted_nodes == 0) {
1025                 printf("Number of nodes:%d\n", nodemap->num);
1026         } else {
1027                 printf("Number of nodes:%d (including %d deleted nodes)\n",
1028                        nodemap->num, num_deleted_nodes);
1029         }
1030         for(i=0;i<nodemap->num;i++){
1031                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1032                         continue;
1033                 }
1034                 (void) control_status_1_human(ctdb, mypnn, &nodemap->nodes[i]);
1035         }
1036
1037         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
1038         if (ret != 0) {
1039                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
1040                 talloc_free(tmp_ctx);
1041                 return -1;
1042         }
1043         if (vnnmap->generation == INVALID_GENERATION) {
1044                 printf("Generation:INVALID\n");
1045         } else {
1046                 printf("Generation:%d\n",vnnmap->generation);
1047         }
1048         printf("Size:%d\n",vnnmap->size);
1049         for(i=0;i<vnnmap->size;i++){
1050                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
1051         }
1052
1053         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmode);
1054         if (ret != 0) {
1055                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1056                 talloc_free(tmp_ctx);
1057                 return -1;
1058         }
1059         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
1060
1061         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmaster);
1062         if (ret != 0) {
1063                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1064                 talloc_free(tmp_ctx);
1065                 return -1;
1066         }
1067         printf("Recovery master:%d\n",recmaster);
1068
1069         talloc_free(tmp_ctx);
1070         return 0;
1071 }
1072
1073 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
1074 {
1075         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1076         int i, ret;
1077         struct ctdb_node_map *nodemap=NULL;
1078         uint32_t * nodes;
1079         uint32_t pnn_mode, mypnn;
1080
1081         if (argc > 1) {
1082                 usage();
1083         }
1084
1085         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1086                               options.pnn, true, &nodes, &pnn_mode)) {
1087                 return -1;
1088         }
1089
1090         if (options.machinereadable) {
1091                 control_status_header_machine();
1092         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
1093                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
1094         }
1095
1096         mypnn = getpnn(ctdb);
1097
1098         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1099         if (ret != 0) {
1100                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1101                 talloc_free(tmp_ctx);
1102                 return -1;
1103         }
1104
1105         ret = 0;
1106
1107         for (i = 0; i < talloc_array_length(nodes); i++) {
1108                 if (options.machinereadable) {
1109                         ret |= control_status_1_machine(ctdb, mypnn,
1110                                                         &nodemap->nodes[nodes[i]]);
1111                 } else {
1112                         ret |= control_status_1_human(ctdb, mypnn,
1113                                                       &nodemap->nodes[nodes[i]]);
1114                 }
1115         }
1116
1117         talloc_free(tmp_ctx);
1118         return ret;
1119 }
1120
/* One address from the NATGW nodes file, kept as a singly linked list */
struct natgw_node {
        struct natgw_node *next;        /* next entry, NULL at the end of the list */
        const char *addr;               /* address text of one NATGW group member */
};
1125
1126 /* talloc off the existing nodemap... */
1127 static struct ctdb_node_map *talloc_nodemap(struct ctdb_node_map *nodemap)
1128 {
1129         return talloc_zero_size(nodemap,
1130                                 offsetof(struct ctdb_node_map, nodes) +
1131                                 nodemap->num * sizeof(struct ctdb_node_and_flags));
1132 }
1133
1134 static struct ctdb_node_map *
1135 filter_nodemap_by_natgw_nodes(struct ctdb_context *ctdb,
1136                               struct ctdb_node_map *nodemap,
1137                               struct natgw_node *natgw_nodes)
1138 {
1139         int i;
1140         struct natgw_node *n;
1141         struct ctdb_node_map *ret;
1142
1143         ret = talloc_nodemap(nodemap);
1144         CTDB_NO_MEMORY_NULL(ctdb, ret);
1145
1146         ret->num = 0;
1147
1148         for (i = 0; i < nodemap->num; i++) {
1149                 for(n = natgw_nodes; n != NULL ; n = n->next) {
1150                         if (!strcmp(n->addr,
1151                                     ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
1152                                 break;
1153                         }
1154                 }
1155                 if (n == NULL) {
1156                         continue;
1157                 }
1158
1159                 ret->nodes[ret->num] = nodemap->nodes[i];
1160                 ret->num++;
1161         }
1162
1163         return ret;
1164 }
1165
1166 static struct ctdb_node_map *
1167 filter_nodemap_by_capabilities(struct ctdb_context *ctdb,
1168                                struct ctdb_node_map *nodemap,
1169                                uint32_t required_capabilities)
1170 {
1171         int i;
1172         uint32_t capabilities;
1173         struct ctdb_node_map *ret;
1174
1175         ret = talloc_nodemap(nodemap);
1176         CTDB_NO_MEMORY_NULL(ctdb, ret);
1177
1178         ret->num = 0;
1179
1180         for (i = 0; i < nodemap->num; i++) {
1181                 int res = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(),
1182                                                     nodemap->nodes[i].pnn,
1183                                                     &capabilities);
1184                 if (res != 0) {
1185                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n",
1186                                           nodemap->nodes[i].pnn));
1187                         talloc_free(ret);
1188                         return NULL;
1189                 }
1190                 if (!(capabilities & required_capabilities)) {
1191                         continue;
1192                 }
1193
1194                 ret->nodes[ret->num] = nodemap->nodes[i];
1195                 ret->num++;
1196         }
1197
1198         return ret;
1199 }
1200
1201 static struct ctdb_node_map *
1202 filter_nodemap_by_flags(struct ctdb_context *ctdb,
1203                         struct ctdb_node_map *nodemap,
1204                         uint32_t flags_mask)
1205 {
1206         int i;
1207         struct ctdb_node_map *ret;
1208
1209         ret = talloc_nodemap(nodemap);
1210         CTDB_NO_MEMORY_NULL(ctdb, ret);
1211
1212         ret->num = 0;
1213
1214         for (i = 0; i < nodemap->num; i++) {
1215                 if (nodemap->nodes[i].flags & flags_mask) {
1216                         continue;
1217                 }
1218
1219                 ret->nodes[ret->num] = nodemap->nodes[i];
1220                 ret->num++;
1221         }
1222
1223         return ret;
1224 }
1225
1226 /*
1227   display the list of nodes belonging to this natgw configuration
1228  */
1229 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
1230 {
1231         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1232         int i, ret;
1233         const char *natgw_list;
1234         int nlines;
1235         char **lines;
1236         struct natgw_node *natgw_nodes = NULL;
1237         struct natgw_node *natgw_node;
1238         struct ctdb_node_map *orig_nodemap=NULL;
1239         struct ctdb_node_map *cnodemap, *nodemap;
1240         uint32_t mypnn, pnn;
1241         const char *ip;
1242
1243         /* When we have some nodes that could be the NATGW, make a
1244          * series of attempts to find the first node that doesn't have
1245          * certain status flags set.
1246          */
1247         uint32_t exclude_flags[] = {
1248                 /* Look for a nice healthy node */
1249                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY,
1250                 /* If not found, an UNHEALTHY/BANNED node will do */
1251                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED,
1252                 /* If not found, a STOPPED node will do */
1253                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED,
1254                 0,
1255         };
1256
1257         /* read the natgw nodes file into a linked list */
1258         natgw_list = getenv("CTDB_NATGW_NODES");
1259         if (natgw_list == NULL) {
1260                 natgw_list = talloc_asprintf(tmp_ctx, "%s/natgw_nodes",
1261                                              getenv("CTDB_BASE"));
1262                 if (natgw_list == NULL) {
1263                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
1264                         exit(1);
1265                 }
1266         }
1267         lines = file_lines_load(natgw_list, &nlines, ctdb);
1268         if (lines == NULL) {
1269                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
1270                 talloc_free(tmp_ctx);
1271                 return -1;
1272         }
1273         for (i=0;i<nlines;i++) {
1274                 char *node;
1275
1276                 node = lines[i];
1277                 /* strip leading spaces */
1278                 while((*node == ' ') || (*node == '\t')) {
1279                         node++;
1280                 }
1281                 if (*node == '#') {
1282                         continue;
1283                 }
1284                 if (strcmp(node, "") == 0) {
1285                         continue;
1286                 }
1287                 natgw_node = talloc(ctdb, struct natgw_node);
1288                 natgw_node->addr = talloc_strdup(natgw_node, node);
1289                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
1290                 natgw_node->next = natgw_nodes;
1291                 natgw_nodes = natgw_node;
1292         }
1293
1294         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
1295                                    tmp_ctx, &orig_nodemap);
1296         if (ret != 0) {
1297                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
1298                 talloc_free(tmp_ctx);
1299                 return -1;
1300         }
1301
1302         /* Get a nodemap that includes only the nodes in the NATGW
1303          * group */
1304         nodemap = filter_nodemap_by_natgw_nodes(ctdb, orig_nodemap,
1305                                                 natgw_nodes);
1306         if (nodemap == NULL) {
1307                 ret = -1;
1308                 goto done;
1309         }
1310
1311         /* Get a nodemap that includes only the nodes with the NATGW
1312          * capability */
1313         cnodemap = filter_nodemap_by_capabilities(ctdb, nodemap,
1314                                                   CTDB_CAP_NATGW);
1315         if (cnodemap == NULL) {
1316                 ret = -1;
1317                 goto done;
1318         }
1319
1320         ret = 2; /* matches ENOENT */
1321         pnn = -1;
1322         ip = "0.0.0.0";
1323         for (i = 0; exclude_flags[i] != 0; i++) {
1324                 struct ctdb_node_map *t =
1325                         filter_nodemap_by_flags(ctdb, cnodemap,
1326                                                 exclude_flags[i]);
1327                 if (t == NULL) {
1328                         /* No memory */
1329                         ret = -1;
1330                         goto done;
1331                 }
1332                 if (t->num > 0) {
1333                         ret = 0;
1334                         pnn = t->nodes[0].pnn;
1335                         ip = ctdb_addr_to_str(&t->nodes[0].addr);
1336                         break;
1337                 }
1338                 talloc_free(t);
1339         }
1340
1341         if (options.machinereadable) {
1342                 printf(":Node:IP:\n");
1343                 printf(":%d:%s:\n", pnn, ip);
1344         } else {
1345                 printf("%d %s\n", pnn, ip);
1346         }
1347
1348         /* print the pruned list of nodes belonging to this natgw list */
1349         mypnn = getpnn(ctdb);
1350         if (options.machinereadable) {
1351                 control_status_header_machine();
1352         } else {
1353                 printf("Number of nodes:%d\n", nodemap->num);
1354         }
1355         for(i=0;i<nodemap->num;i++){
1356                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1357                         continue;
1358                 }
1359                 if (options.machinereadable) {
1360                         control_status_1_machine(ctdb, mypnn, &(nodemap->nodes[i]));
1361                 } else {
1362                         control_status_1_human(ctdb, mypnn, &(nodemap->nodes[i]));
1363                 }
1364         }
1365
1366 done:
1367         talloc_free(tmp_ctx);
1368         return ret;
1369 }
1370
1371 /*
1372   display the status of the scripts for monitoring (or other events)
1373  */
1374 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1375                                     enum ctdb_eventscript_call type)
1376 {
1377         struct ctdb_scripts_wire *script_status;
1378         int ret, i;
1379
1380         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1381         if (ret != 0) {
1382                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1383                 return ret;
1384         }
1385
1386         if (script_status == NULL) {
1387                 if (!options.machinereadable) {
1388                         printf("%s cycle never run\n",
1389                                ctdb_eventscript_call_names[type]);
1390                 }
1391                 return 0;
1392         }
1393
1394         if (!options.machinereadable) {
1395                 printf("%d scripts were executed last %s cycle\n",
1396                        script_status->num_scripts,
1397                        ctdb_eventscript_call_names[type]);
1398         }
1399         for (i=0; i<script_status->num_scripts; i++) {
1400                 const char *status = NULL;
1401
1402                 switch (script_status->scripts[i].status) {
1403                 case -ETIME:
1404                         status = "TIMEDOUT";
1405                         break;
1406                 case -ENOEXEC:
1407                         status = "DISABLED";
1408                         break;
1409                 case 0:
1410                         status = "OK";
1411                         break;
1412                 default:
1413                         if (script_status->scripts[i].status > 0)
1414                                 status = "ERROR";
1415                         break;
1416                 }
1417                 if (options.machinereadable) {
1418                         printf(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1419                                ctdb_eventscript_call_names[type],
1420                                script_status->scripts[i].name,
1421                                script_status->scripts[i].status,
1422                                status,
1423                                (long)script_status->scripts[i].start.tv_sec,
1424                                (long)script_status->scripts[i].start.tv_usec,
1425                                (long)script_status->scripts[i].finished.tv_sec,
1426                                (long)script_status->scripts[i].finished.tv_usec,
1427                                script_status->scripts[i].output);
1428                         continue;
1429                 }
1430                 if (status)
1431                         printf("%-20s Status:%s    ",
1432                                script_status->scripts[i].name, status);
1433                 else
1434                         /* Some other error, eg from stat. */
1435                         printf("%-20s Status:CANNOT RUN (%s)",
1436                                script_status->scripts[i].name,
1437                                strerror(-script_status->scripts[i].status));
1438
1439                 if (script_status->scripts[i].status >= 0) {
1440                         printf("Duration:%.3lf ",
1441                         timeval_delta(&script_status->scripts[i].finished,
1442                               &script_status->scripts[i].start));
1443                 }
1444                 if (script_status->scripts[i].status != -ENOEXEC) {
1445                         printf("%s",
1446                                ctime(&script_status->scripts[i].start.tv_sec));
1447                         if (script_status->scripts[i].status != 0) {
1448                                 printf("   OUTPUT:%s\n",
1449                                        script_status->scripts[i].output);
1450                         }
1451                 } else {
1452                         printf("\n");
1453                 }
1454         }
1455         return 0;
1456 }
1457
1458
1459 static int control_scriptstatus(struct ctdb_context *ctdb,
1460                                 int argc, const char **argv)
1461 {
1462         int ret;
1463         enum ctdb_eventscript_call type, min, max;
1464         const char *arg;
1465
1466         if (argc > 1) {
1467                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1468                 return -1;
1469         }
1470
1471         if (argc == 0)
1472                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1473         else
1474                 arg = argv[0];
1475
1476         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1477                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1478                         min = type;
1479                         max = type+1;
1480                         break;
1481                 }
1482         }
1483         if (type == CTDB_EVENT_MAX) {
1484                 if (strcmp(arg, "all") == 0) {
1485                         min = 0;
1486                         max = CTDB_EVENT_MAX;
1487                 } else {
1488                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1489                         return -1;
1490                 }
1491         }
1492
1493         if (options.machinereadable) {
1494                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1495         }
1496
1497         for (type = min; type < max; type++) {
1498                 ret = control_one_scriptstatus(ctdb, type);
1499                 if (ret != 0) {
1500                         return ret;
1501                 }
1502         }
1503
1504         return 0;
1505 }
1506
1507 /*
1508   enable an eventscript
1509  */
1510 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1511 {
1512         int ret;
1513
1514         if (argc < 1) {
1515                 usage();
1516         }
1517
1518         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1519         if (ret != 0) {
1520           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1521                 return ret;
1522         }
1523
1524         return 0;
1525 }
1526
1527 /*
1528   disable an eventscript
1529  */
1530 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1531 {
1532         int ret;
1533
1534         if (argc < 1) {
1535                 usage();
1536         }
1537
1538         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1539         if (ret != 0) {
1540           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1541                 return ret;
1542         }
1543
1544         return 0;
1545 }
1546
1547 /*
1548   display the pnn of the recovery master
1549  */
1550 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1551 {
1552         uint32_t recmaster;
1553         int ret;
1554
1555         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1556         if (ret != 0) {
1557                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1558                 return -1;
1559         }
1560         printf("%d\n",recmaster);
1561
1562         return 0;
1563 }
1564
1565 /*
1566   add a tickle to a public address
1567  */
1568 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1569 {
1570         struct ctdb_tcp_connection t;
1571         TDB_DATA data;
1572         int ret;
1573
1574         assert_single_node_only();
1575
1576         if (argc < 2) {
1577                 usage();
1578         }
1579
1580         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1581                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1582                 return -1;
1583         }
1584         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1585                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1586                 return -1;
1587         }
1588
1589         data.dptr = (uint8_t *)&t;
1590         data.dsize = sizeof(t);
1591
1592         /* tell all nodes about this tcp connection */
1593         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1594                            0, data, ctdb, NULL, NULL, NULL, NULL);
1595         if (ret != 0) {
1596                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1597                 return -1;
1598         }
1599         
1600         return 0;
1601 }
1602
1603
1604 /*
1605   delete a tickle from a node
1606  */
1607 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1608 {
1609         struct ctdb_tcp_connection t;
1610         TDB_DATA data;
1611         int ret;
1612
1613         assert_single_node_only();
1614
1615         if (argc < 2) {
1616                 usage();
1617         }
1618
1619         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1620                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1621                 return -1;
1622         }
1623         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1624                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1625                 return -1;
1626         }
1627
1628         data.dptr = (uint8_t *)&t;
1629         data.dsize = sizeof(t);
1630
1631         /* tell all nodes about this tcp connection */
1632         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1633                            0, data, ctdb, NULL, NULL, NULL, NULL);
1634         if (ret != 0) {
1635                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1636                 return -1;
1637         }
1638         
1639         return 0;
1640 }
1641
1642
1643 /*
1644   get a list of all tickles for this pnn
1645  */
1646 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1647 {
1648         struct ctdb_control_tcp_tickle_list *list;
1649         ctdb_sock_addr addr;
1650         int i, ret;
1651         unsigned port = 0;
1652
1653         assert_single_node_only();
1654
1655         if (argc < 1) {
1656                 usage();
1657         }
1658
1659         if (argc == 2) {
1660                 port = atoi(argv[1]);
1661         }
1662
1663         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1664                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1665                 return -1;
1666         }
1667
1668         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1669         if (ret == -1) {
1670                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1671                 return -1;
1672         }
1673
1674         if (options.machinereadable){
1675                 printf(":source ip:port:destination ip:port:\n");
1676                 for (i=0;i<list->tickles.num;i++) {
1677                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1678                                 continue;
1679                         }
1680                         printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1681                         printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1682                 }
1683         } else {
1684                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1685                 printf("Num tickles:%u\n", list->tickles.num);
1686                 for (i=0;i<list->tickles.num;i++) {
1687                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1688                                 continue;
1689                         }
1690                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1691                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1692                 }
1693         }
1694
1695         talloc_free(list);
1696         
1697         return 0;
1698 }
1699
1700
1701 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1702 {
1703         struct ctdb_all_public_ips *ips;
1704         struct ctdb_public_ip ip;
1705         int i, ret;
1706         uint32_t *nodes;
1707         uint32_t disable_time;
1708         TDB_DATA data;
1709         struct ctdb_node_map *nodemap=NULL;
1710         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1711
1712         disable_time = 30;
1713         data.dptr  = (uint8_t*)&disable_time;
1714         data.dsize = sizeof(disable_time);
1715         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1716         if (ret != 0) {
1717                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1718                 return -1;
1719         }
1720
1721
1722
1723         /* read the public ip list from the node */
1724         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1725         if (ret != 0) {
1726                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1727                 talloc_free(tmp_ctx);
1728                 return -1;
1729         }
1730
1731         for (i=0;i<ips->num;i++) {
1732                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1733                         break;
1734                 }
1735         }
1736         if (i==ips->num) {
1737                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1738                         pnn, ctdb_addr_to_str(addr)));
1739                 talloc_free(tmp_ctx);
1740                 return -1;
1741         }
1742
1743         ip.pnn  = pnn;
1744         ip.addr = *addr;
1745
1746         data.dptr  = (uint8_t *)&ip;
1747         data.dsize = sizeof(ip);
1748
1749         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1750         if (ret != 0) {
1751                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1752                 talloc_free(tmp_ctx);
1753                 return ret;
1754         }
1755
1756         nodes = list_of_nodes(ctdb, nodemap, tmp_ctx, NODE_FLAGS_INACTIVE, pnn);
1757         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1758                                         nodes, 0,
1759                                         LONGTIMELIMIT(),
1760                                         false, data,
1761                                         NULL, NULL,
1762                                         NULL);
1763         if (ret != 0) {
1764                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1765                 talloc_free(tmp_ctx);
1766                 return -1;
1767         }
1768
1769         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1770         if (ret != 0) {
1771                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1772                 talloc_free(tmp_ctx);
1773                 return -1;
1774         }
1775
1776         /* update the recovery daemon so it now knows to expect the new
1777            node assignment for this ip.
1778         */
1779         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1780         if (ret != 0) {
1781                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1782                 return -1;
1783         }
1784
1785         talloc_free(tmp_ctx);
1786         return 0;
1787 }
1788
1789
1790 /* 
1791  * scans all other nodes and returns a pnn for another node that can host this 
1792  * ip address or -1
1793  */
1794 static int
1795 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1796 {
1797         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1798         struct ctdb_all_public_ips *ips;
1799         struct ctdb_node_map *nodemap=NULL;
1800         int i, j, ret;
1801
1802         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1803         if (ret != 0) {
1804                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1805                 talloc_free(tmp_ctx);
1806                 return ret;
1807         }
1808
1809         for(i=0;i<nodemap->num;i++){
1810                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1811                         continue;
1812                 }
1813                 if (nodemap->nodes[i].pnn == options.pnn) {
1814                         continue;
1815                 }
1816
1817                 /* read the public ip list from this node */
1818                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1819                 if (ret != 0) {
1820                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1821                         return -1;
1822                 }
1823
1824                 for (j=0;j<ips->num;j++) {
1825                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1826                                 talloc_free(tmp_ctx);
1827                                 return nodemap->nodes[i].pnn;
1828                         }
1829                 }
1830                 talloc_free(ips);
1831         }
1832
1833         talloc_free(tmp_ctx);
1834         return -1;
1835 }
1836
1837 /* If pnn is -1 then try to find a node to move IP to... */
1838 static bool try_moveip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1839 {
1840         bool pnn_specified = (pnn == -1 ? false : true);
1841         int retries = 0;
1842
1843         while (retries < 5) {
1844                 if (!pnn_specified) {
1845                         pnn = find_other_host_for_public_ip(ctdb, addr);
1846                         if (pnn == -1) {
1847                                 return false;
1848                         }
1849                         DEBUG(DEBUG_NOTICE,
1850                               ("Trying to move public IP to node %u\n", pnn));
1851                 }
1852
1853                 if (move_ip(ctdb, addr, pnn) == 0) {
1854                         return true;
1855                 }
1856
1857                 sleep(3);
1858                 retries++;
1859         }
1860
1861         return false;
1862 }
1863
1864
1865 /*
1866   move/failover an ip address to a specific node
1867  */
1868 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1869 {
1870         uint32_t pnn;
1871         ctdb_sock_addr addr;
1872
1873         assert_single_node_only();
1874
1875         if (argc < 2) {
1876                 usage();
1877                 return -1;
1878         }
1879
1880         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1881                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1882                 return -1;
1883         }
1884
1885
1886         if (sscanf(argv[1], "%u", &pnn) != 1) {
1887                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1888                 return -1;
1889         }
1890
1891         if (!try_moveip(ctdb, &addr, pnn)) {
1892                 DEBUG(DEBUG_ERR,("Failed to move IP to node %d.\n", pnn));
1893                 return -1;
1894         }
1895
1896         return 0;
1897 }
1898
1899 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
1900 {
1901         TDB_DATA data;
1902
1903         data.dptr  = (uint8_t *)&pnn;
1904         data.dsize = sizeof(uint32_t);
1905         if (ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
1906                 DEBUG(DEBUG_ERR,
1907                       ("Failed to send message to force node %u to be a rebalancing target\n",
1908                        pnn));
1909                 return -1;
1910         }
1911
1912         return 0;
1913 }
1914
1915
1916 /*
1917   rebalance a node by setting it to allow failback and triggering a
1918   takeover run
1919  */
1920 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
1921 {
1922         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1923         uint32_t *nodes;
1924         uint32_t pnn_mode;
1925         int i, ret;
1926
1927         assert_single_node_only();
1928
1929         if (argc > 1) {
1930                 usage();
1931         }
1932
1933         /* Determine the nodes where IPs need to be reloaded */
1934         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1935                               options.pnn, true, &nodes, &pnn_mode)) {
1936                 ret = -1;
1937                 goto done;
1938         }
1939
1940         for (i = 0; i < talloc_array_length(nodes); i++) {
1941                 if (!rebalance_node(ctdb, nodes[i])) {
1942                         ret = -1;
1943                 }
1944         }
1945
1946 done:
1947         talloc_free(tmp_ctx);
1948         return ret;
1949 }
1950
1951 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1952 {
1953         struct ctdb_public_ip ip;
1954         int ret;
1955         uint32_t *nodes;
1956         uint32_t disable_time;
1957         TDB_DATA data;
1958         struct ctdb_node_map *nodemap=NULL;
1959         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1960
1961         disable_time = 30;
1962         data.dptr  = (uint8_t*)&disable_time;
1963         data.dsize = sizeof(disable_time);
1964         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1965         if (ret != 0) {
1966                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1967                 return -1;
1968         }
1969
1970         ip.pnn  = -1;
1971         ip.addr = *addr;
1972
1973         data.dptr  = (uint8_t *)&ip;
1974         data.dsize = sizeof(ip);
1975
1976         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1977         if (ret != 0) {
1978                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1979                 talloc_free(tmp_ctx);
1980                 return ret;
1981         }
1982
1983         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1984         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1985                                         nodes, 0,
1986                                         LONGTIMELIMIT(),
1987                                         false, data,
1988                                         NULL, NULL,
1989                                         NULL);
1990         if (ret != 0) {
1991                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1992                 talloc_free(tmp_ctx);
1993                 return -1;
1994         }
1995
1996         talloc_free(tmp_ctx);
1997         return 0;
1998 }
1999
2000 /*
2001   release an ip form all nodes and have it re-assigned by recd
2002  */
2003 static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
2004 {
2005         ctdb_sock_addr addr;
2006
2007         assert_single_node_only();
2008
2009         if (argc < 1) {
2010                 usage();
2011                 return -1;
2012         }
2013
2014         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2015                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2016                 return -1;
2017         }
2018
2019         if (rebalance_ip(ctdb, &addr) != 0) {
2020                 DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
2021                 return -1;
2022         }
2023
2024         return 0;
2025 }
2026
2027 static int getips_store_callback(void *param, void *data)
2028 {
2029         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
2030         struct ctdb_all_public_ips *ips = param;
2031         int i;
2032
2033         i = ips->num++;
2034         ips->ips[i].pnn  = node_ip->pnn;
2035         ips->ips[i].addr = node_ip->addr;
2036         return 0;
2037 }
2038
/*
  Tree traversal callback: bump the uint32_t counter that param points to
  by one.  data is ignored.  Always returns 0.
 */
static int getips_count_callback(void *param, void *data)
{
        uint32_t *counter = param;

        *counter += 1;

        return 0;
}
2046
2047 #define IP_KEYLEN       4
2048 static uint32_t *ip_key(ctdb_sock_addr *ip)
2049 {
2050         static uint32_t key[IP_KEYLEN];
2051
2052         bzero(key, sizeof(key));
2053
2054         switch (ip->sa.sa_family) {
2055         case AF_INET:
2056                 key[0]  = ip->ip.sin_addr.s_addr;
2057                 break;
2058         case AF_INET6: {
2059                 uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
2060                 key[0]  = s6_a32[3];
2061                 key[1]  = s6_a32[2];
2062                 key[2]  = s6_a32[1];
2063                 key[3]  = s6_a32[0];
2064                 break;
2065         }
2066         default:
2067                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
2068                 return key;
2069         }
2070
2071         return key;
2072 }
2073
/*
  Insert callback handed to trbt_insertarray32_callback(): returns its
  first argument unchanged and ignores the second.
 */
static void *add_ip_callback(void *parm, void *data)
{
        void *chosen = parm;

        return chosen;
}
2078
2079 static int
2080 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
2081 {
2082         struct ctdb_all_public_ips *tmp_ips;
2083         struct ctdb_node_map *nodemap=NULL;
2084         trbt_tree_t *ip_tree;
2085         int i, j, len, ret;
2086         uint32_t count;
2087
2088         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2089         if (ret != 0) {
2090                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2091                 return ret;
2092         }
2093
2094         ip_tree = trbt_create(tmp_ctx, 0);
2095
2096         for(i=0;i<nodemap->num;i++){
2097                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
2098                         continue;
2099                 }
2100                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2101                         continue;
2102                 }
2103
2104                 /* read the public ip list from this node */
2105                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
2106                 if (ret != 0) {
2107                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
2108                         return -1;
2109                 }
2110         
2111                 for (j=0; j<tmp_ips->num;j++) {
2112                         struct ctdb_public_ip *node_ip;
2113
2114                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
2115                         node_ip->pnn  = tmp_ips->ips[j].pnn;
2116                         node_ip->addr = tmp_ips->ips[j].addr;
2117
2118                         trbt_insertarray32_callback(ip_tree,
2119                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
2120                                 add_ip_callback,
2121                                 node_ip);
2122                 }
2123                 talloc_free(tmp_ips);
2124         }
2125
2126         /* traverse */
2127         count = 0;
2128         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
2129
2130         len = offsetof(struct ctdb_all_public_ips, ips) + 
2131                 count*sizeof(struct ctdb_public_ip);
2132         tmp_ips = talloc_zero_size(tmp_ctx, len);
2133         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
2134
2135         *ips = tmp_ips;
2136
2137         return 0;
2138 }
2139
2140
2141 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2142 {
2143         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2144
2145         event_add_timed(ctdb->ev, ctdb, 
2146                                 timeval_current_ofs(1, 0),
2147                                 ctdb_every_second, ctdb);
2148 }
2149
/*
  State shared between srvid_broadcast() and
  srvid_broadcast_reply_handler().
 */
struct srvid_reply_handler_data {
        /* set by the reply handler once the awaited replies have arrived */
        bool done;
        /* false: the first successful reply completes the wait;
         * true: every node listed in nodes must reply */
        bool wait_for_all;
        /* talloc array of pnns still awaited; an entry is overwritten
         * with -1 once that node has replied */
        uint32_t *nodes;
        /* description of the request, used in log messages */
        const char *srvid_str;
};
2156
2157 static void srvid_broadcast_reply_handler(struct ctdb_context *ctdb,
2158                                          uint64_t srvid,
2159                                          TDB_DATA data,
2160                                          void *private_data)
2161 {
2162         struct srvid_reply_handler_data *d =
2163                 (struct srvid_reply_handler_data *)private_data;
2164         int i;
2165         int32_t ret;
2166
2167         if (data.dsize != sizeof(ret)) {
2168                 DEBUG(DEBUG_ERR, (__location__ " Wrong reply size\n"));
2169                 return;
2170         }
2171
2172         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
2173         ret = *(int32_t *)data.dptr;
2174         if (ret < 0) {
2175                 DEBUG(DEBUG_ERR,
2176                       ("%s failed with result %d\n", d->srvid_str, ret));
2177                 return;
2178         }
2179
2180         if (!d->wait_for_all) {
2181                 d->done = true;
2182                 return;
2183         }
2184
2185         /* Wait for all replies */
2186         d->done = true;
2187         for (i = 0; i < talloc_array_length(d->nodes); i++) {
2188                 if (d->nodes[i] == ret) {
2189                         DEBUG(DEBUG_INFO,
2190                               ("%s reply received from node %u\n",
2191                                d->srvid_str, ret));
2192                         d->nodes[i] = -1;
2193                 }
2194                 if (d->nodes[i] != -1) {
2195                         /* Found a node that hasn't yet replied */
2196                         d->done = false;
2197                 }
2198         }
2199 }
2200
2201 /* Broadcast the given SRVID to all connected nodes.  Wait for 1 reply
2202  * or replies from all connected nodes.  arg is the data argument to
2203  * pass in the srvid_request structure - pass 0 if this isn't needed.
2204  */
2205 static int srvid_broadcast(struct ctdb_context *ctdb,
2206                            uint64_t srvid, uint32_t *arg,
2207                            const char *srvid_str, bool wait_for_all)
2208 {
2209         int ret;
2210         TDB_DATA data;
2211         uint32_t pnn;
2212         uint64_t reply_srvid;
2213         struct srvid_request request;
2214         struct srvid_request_data request_data;
2215         struct srvid_reply_handler_data reply_data;
2216         struct timeval tv;
2217
2218         ZERO_STRUCT(request);
2219
2220         /* Time ticks to enable timeouts to be processed */
2221         event_add_timed(ctdb->ev, ctdb, 
2222                                 timeval_current_ofs(1, 0),
2223                                 ctdb_every_second, ctdb);
2224
2225         pnn = ctdb_get_pnn(ctdb);
2226         reply_srvid = getpid();
2227
2228         if (arg == NULL) {
2229                 request.pnn = pnn;
2230                 request.srvid = reply_srvid;
2231
2232                 data.dptr = (uint8_t *)&request;
2233                 data.dsize = sizeof(request);
2234         } else {
2235                 request_data.pnn = pnn;
2236                 request_data.srvid = reply_srvid;
2237                 request_data.data = *arg;
2238
2239                 data.dptr = (uint8_t *)&request_data;
2240                 data.dsize = sizeof(request_data);
2241         }
2242
2243         /* Register message port for reply from recovery master */
2244         ctdb_client_set_message_handler(ctdb, reply_srvid,
2245                                         srvid_broadcast_reply_handler,
2246                                         &reply_data);
2247
2248         reply_data.wait_for_all = wait_for_all;
2249         reply_data.nodes = NULL;
2250         reply_data.srvid_str = srvid_str;
2251
2252 again:
2253         reply_data.done = false;
2254
2255         if (wait_for_all) {
2256                 struct ctdb_node_map *nodemap;
2257
2258                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
2259                                            CTDB_CURRENT_NODE, ctdb, &nodemap);
2260                 if (ret != 0) {
2261                         DEBUG(DEBUG_ERR,
2262                               ("Unable to get nodemap from current node, try again\n"));
2263                         sleep(1);
2264                         goto again;
2265                 }
2266
2267                 if (reply_data.nodes != NULL) {
2268                         talloc_free(reply_data.nodes);
2269                 }
2270                 reply_data.nodes = list_of_connected_nodes(ctdb, nodemap,
2271                                                            NULL, true);
2272
2273                 talloc_free(nodemap);
2274         }
2275
2276         /* Send to all connected nodes. Only recmaster replies */
2277         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2278                                        srvid, data);
2279         if (ret != 0) {
2280                 /* This can only happen if the socket is closed and
2281                  * there's no way to recover from that, so don't try
2282                  * again.
2283                  */
2284                 DEBUG(DEBUG_ERR,
2285                       ("Failed to send %s request to connected nodes\n",
2286                        srvid_str));
2287                 return -1;
2288         }
2289
2290         tv = timeval_current();
2291         /* This loop terminates the reply is received */
2292         while (timeval_elapsed(&tv) < 5.0 && !reply_data.done) {
2293                 event_loop_once(ctdb->ev);
2294         }
2295
2296         if (!reply_data.done) {
2297                 DEBUG(DEBUG_NOTICE,
2298                       ("Still waiting for confirmation of %s\n", srvid_str));
2299                 sleep(1);
2300                 goto again;
2301         }
2302
2303         ctdb_client_remove_message_handler(ctdb, reply_srvid, &reply_data);
2304
2305         talloc_free(reply_data.nodes);
2306
2307         return 0;
2308 }
2309
2310 static int ipreallocate(struct ctdb_context *ctdb)
2311 {
2312         return srvid_broadcast(ctdb, CTDB_SRVID_TAKEOVER_RUN, NULL,
2313                                "IP reallocation", false);
2314 }
2315
2316
/*
  Command entry point for triggering an IP reallocation; the arguments
  are not used.  Returns the result of ipreallocate().
 */
static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
{
        int status = ipreallocate(ctdb);

        return status;
}
2321
2322 /*
2323   add a public ip address to a node
2324  */
2325 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
2326 {
2327         int i, ret;
2328         int len, retries = 0;
2329         unsigned mask;
2330         ctdb_sock_addr addr;
2331         struct ctdb_control_ip_iface *pub;
2332         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2333         struct ctdb_all_public_ips *ips;
2334
2335
2336         if (argc != 2) {
2337                 talloc_free(tmp_ctx);
2338                 usage();
2339         }
2340
2341         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
2342                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
2343                 talloc_free(tmp_ctx);
2344                 return -1;
2345         }
2346
2347         /* read the public ip list from the node */
2348         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2349         if (ret != 0) {
2350                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
2351                 talloc_free(tmp_ctx);
2352                 return -1;
2353         }
2354         for (i=0;i<ips->num;i++) {
2355                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2356                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
2357                         return 0;
2358                 }
2359         }
2360
2361
2362
2363         /* Dont timeout. This command waits for an ip reallocation
2364            which sometimes can take wuite a while if there has
2365            been a recent recovery
2366         */
2367         alarm(0);
2368
2369         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
2370         pub = talloc_size(tmp_ctx, len); 
2371         CTDB_NO_MEMORY(ctdb, pub);
2372
2373         pub->addr  = addr;
2374         pub->mask  = mask;
2375         pub->len   = strlen(argv[1])+1;
2376         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
2377
2378         do {
2379                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
2380                 if (ret != 0) {
2381                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
2382                         sleep(3);
2383                         retries++;
2384                 }
2385         } while (retries < 5 && ret != 0);
2386         if (ret != 0) {
2387                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
2388                 talloc_free(tmp_ctx);
2389                 return ret;
2390         }
2391
2392         if (rebalance_node(ctdb, options.pnn) != 0) {
2393                 DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
2394                 return ret;
2395         }
2396
2397         talloc_free(tmp_ctx);
2398         return 0;
2399 }
2400
2401 /*
2402   add a public ip address to a node
2403  */
2404 static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
2405 {
2406         ctdb_sock_addr addr;
2407
2408         if (argc != 1) {
2409                 usage();
2410         }
2411
2412         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2413                 printf("Badly formed ip : %s\n", argv[0]);
2414                 return -1;
2415         }
2416
2417         printf("IP on interface %s\n", ctdb_sys_find_ifname(&addr));
2418
2419         return 0;
2420 }
2421
2422 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
2423
2424 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
2425 {
2426         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2427         struct ctdb_node_map *nodemap=NULL;
2428         struct ctdb_all_public_ips *ips;
2429         int ret, i, j;
2430
2431         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2432         if (ret != 0) {
2433                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
2434                 return ret;
2435         }
2436
2437         /* remove it from the nodes that are not hosting the ip currently */
2438         for(i=0;i<nodemap->num;i++){
2439                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2440                         continue;
2441                 }
2442                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2443                 if (ret != 0) {
2444                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2445                         continue;
2446                 }
2447
2448                 for (j=0;j<ips->num;j++) {
2449                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2450                                 break;
2451                         }
2452                 }
2453                 if (j==ips->num) {
2454                         continue;
2455                 }
2456
2457                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
2458                         continue;
2459                 }
2460
2461                 options.pnn = nodemap->nodes[i].pnn;
2462                 control_delip(ctdb, argc, argv);
2463         }
2464
2465
2466         /* remove it from every node (also the one hosting it) */
2467         for(i=0;i<nodemap->num;i++){
2468                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2469                         continue;
2470                 }
2471                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2472                 if (ret != 0) {
2473                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2474                         continue;
2475                 }
2476
2477                 for (j=0;j<ips->num;j++) {
2478                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2479                                 break;
2480                         }
2481                 }
2482                 if (j==ips->num) {
2483                         continue;
2484                 }
2485
2486                 options.pnn = nodemap->nodes[i].pnn;
2487                 control_delip(ctdb, argc, argv);
2488         }
2489
2490         talloc_free(tmp_ctx);
2491         return 0;
2492 }
2493         
2494 /*
2495   delete a public ip address from a node
2496  */
2497 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
2498 {
2499         int i, ret;
2500         ctdb_sock_addr addr;
2501         struct ctdb_control_ip_iface pub;
2502         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2503         struct ctdb_all_public_ips *ips;
2504
2505         if (argc != 1) {
2506                 talloc_free(tmp_ctx);
2507                 usage();
2508         }
2509
2510         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2511                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2512                 return -1;
2513         }
2514
2515         if (options.pnn == CTDB_BROADCAST_ALL) {
2516                 return control_delip_all(ctdb, argc, argv, &addr);
2517         }
2518
2519         pub.addr  = addr;
2520         pub.mask  = 0;
2521         pub.len   = 0;
2522
2523         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2524         if (ret != 0) {
2525                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2526                 talloc_free(tmp_ctx);
2527                 return ret;
2528         }
2529         
2530         for (i=0;i<ips->num;i++) {
2531                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2532                         break;
2533                 }
2534         }
2535
2536         if (i==ips->num) {
2537                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2538                         ctdb_addr_to_str(&addr)));
2539                 talloc_free(tmp_ctx);
2540                 return -1;
2541         }
2542
2543         /* This is an optimisation.  If this node is hosting the IP
2544          * then try to move it somewhere else without invoking a full
2545          * takeover run.  We don't care if this doesn't work!
2546          */
2547         if (ips->ips[i].pnn == options.pnn) {
2548                 (void) try_moveip(ctdb, &addr, -1);
2549         }
2550
2551         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2552         if (ret != 0) {
2553                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2554                 talloc_free(tmp_ctx);
2555                 return ret;
2556         }
2557
2558         talloc_free(tmp_ctx);
2559         return 0;
2560 }
2561
2562 static int kill_tcp_from_file(struct ctdb_context *ctdb,
2563                               int argc, const char **argv)
2564 {
2565         struct ctdb_control_killtcp *killtcp;
2566         int max_entries, current, i;
2567         struct timeval timeout;
2568         char line[128], src[128], dst[128];
2569         int linenum;
2570         TDB_DATA data;
2571         struct client_async_data *async_data;
2572         struct ctdb_client_control_state *state;
2573
2574         if (argc != 0) {
2575                 usage();
2576         }
2577
2578         linenum = 1;
2579         killtcp = NULL;
2580         max_entries = 0;
2581         current = 0;
2582         while (!feof(stdin)) {
2583                 if (fgets(line, sizeof(line), stdin) == NULL) {
2584                         continue;
2585                 }
2586
2587                 /* Silently skip empty lines */
2588                 if (line[0] == '\n') {
2589                         continue;
2590                 }
2591
2592                 if (sscanf(line, "%s %s\n", src, dst) != 2) {
2593                         DEBUG(DEBUG_ERR, ("Bad line [%d]: '%s'\n",
2594                                           linenum, line));
2595                         talloc_free(killtcp);
2596                         return -1;
2597                 }
2598
2599                 if (current >= max_entries) {
2600                         max_entries += 1024;
2601                         killtcp = talloc_realloc(ctdb, killtcp,
2602                                                  struct ctdb_control_killtcp,
2603                                                  max_entries);
2604                         CTDB_NO_MEMORY(ctdb, killtcp);
2605                 }
2606
2607                 if (!parse_ip_port(src, &killtcp[current].src_addr)) {
2608                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2609                                           linenum, src));
2610                         talloc_free(killtcp);
2611                         return -1;
2612                 }
2613
2614                 if (!parse_ip_port(dst, &killtcp[current].dst_addr)) {
2615                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2616                                           linenum, dst));
2617                         talloc_free(killtcp);
2618                         return -1;
2619                 }
2620
2621                 current++;
2622         }
2623
2624         async_data = talloc_zero(ctdb, struct client_async_data);
2625         if (async_data == NULL) {
2626                 talloc_free(killtcp);
2627                 return -1;
2628         }
2629
2630         for (i = 0; i < current; i++) {
2631
2632                 data.dsize = sizeof(struct ctdb_control_killtcp);
2633                 data.dptr  = (unsigned char *)&killtcp[i];
2634
2635                 timeout = TIMELIMIT();
2636                 state = ctdb_control_send(ctdb, options.pnn, 0,
2637                                           CTDB_CONTROL_KILL_TCP, 0, data,
2638                                           async_data, &timeout, NULL);
2639
2640                 if (state == NULL) {
2641                         DEBUG(DEBUG_ERR,
2642                               ("Failed to call async killtcp control to node %u\n",
2643                                options.pnn));
2644                         talloc_free(killtcp);
2645                         return -1;
2646                 }
2647                 
2648                 ctdb_client_async_add(async_data, state);
2649         }
2650
2651         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2652                 DEBUG(DEBUG_ERR,("killtcp failed\n"));
2653                 talloc_free(killtcp);
2654                 return -1;
2655         }
2656
2657         talloc_free(killtcp);
2658         return 0;
2659 }
2660
2661
2662 /*
2663   kill a tcp connection
2664  */
2665 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2666 {
2667         int ret;
2668         struct ctdb_control_killtcp killtcp;
2669
2670         assert_single_node_only();
2671
2672         if (argc == 0) {
2673                 return kill_tcp_from_file(ctdb, argc, argv);
2674         }
2675
2676         if (argc < 2) {
2677                 usage();
2678         }
2679
2680         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2681                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2682                 return -1;
2683         }
2684
2685         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2686                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2687                 return -1;
2688         }
2689
2690         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2691         if (ret != 0) {
2692                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2693                 return ret;
2694         }
2695
2696         return 0;
2697 }
2698
2699
2700 /*
2701   send a gratious arp
2702  */
2703 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2704 {
2705         int ret;
2706         ctdb_sock_addr addr;
2707
2708         assert_single_node_only();
2709
2710         if (argc < 2) {
2711                 usage();
2712         }
2713
2714         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2715                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2716                 return -1;
2717         }
2718
2719         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2720         if (ret != 0) {
2721                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2722                 return ret;
2723         }
2724
2725         return 0;
2726 }
2727
2728 /*
2729   register a server id
2730  */
2731 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2732 {
2733         int ret;
2734         struct ctdb_server_id server_id;
2735
2736         if (argc < 3) {
2737                 usage();
2738         }
2739
2740         server_id.pnn       = strtoul(argv[0], NULL, 0);
2741         server_id.type      = strtoul(argv[1], NULL, 0);
2742         server_id.server_id = strtoul(argv[2], NULL, 0);
2743
2744         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2745         if (ret != 0) {
2746                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2747                 return ret;
2748         }
2749         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2750         sleep(999);
2751         return -1;
2752 }
2753
2754 /*
2755   unregister a server id
2756  */
2757 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2758 {
2759         int ret;
2760         struct ctdb_server_id server_id;
2761
2762         if (argc < 3) {
2763                 usage();
2764         }
2765
2766         server_id.pnn       = strtoul(argv[0], NULL, 0);
2767         server_id.type      = strtoul(argv[1], NULL, 0);
2768         server_id.server_id = strtoul(argv[2], NULL, 0);
2769
2770         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2771         if (ret != 0) {
2772                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2773                 return ret;
2774         }
2775         return -1;
2776 }
2777
2778 /*
2779   check if a server id exists
2780  */
2781 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2782 {
2783         uint32_t status;
2784         int ret;
2785         struct ctdb_server_id server_id;
2786
2787         if (argc < 3) {
2788                 usage();
2789         }
2790
2791         server_id.pnn       = strtoul(argv[0], NULL, 0);
2792         server_id.type      = strtoul(argv[1], NULL, 0);
2793         server_id.server_id = strtoul(argv[2], NULL, 0);
2794
2795         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2796         if (ret != 0) {
2797                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2798                 return ret;
2799         }
2800
2801         if (status) {
2802                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2803         } else {
2804                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2805         }
2806         return 0;
2807 }
2808
2809 /*
2810   get a list of all server ids that are registered on a node
2811  */
2812 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2813 {
2814         int i, ret;
2815         struct ctdb_server_id_list *server_ids;
2816
2817         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2818         if (ret != 0) {
2819                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2820                 return ret;
2821         }
2822
2823         for (i=0; i<server_ids->num; i++) {
2824                 printf("Server id %d:%d:%d\n", 
2825                         server_ids->server_ids[i].pnn, 
2826                         server_ids->server_ids[i].type, 
2827                         server_ids->server_ids[i].server_id); 
2828         }
2829
2830         return -1;
2831 }
2832
2833 /*
2834   check if a server id exists
2835  */
2836 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2837 {
2838         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2839         uint64_t *ids;
2840         uint8_t *result;
2841         int i;
2842
2843         if (argc < 1) {
2844                 talloc_free(tmp_ctx);
2845                 usage();
2846         }
2847
2848         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2849         result = talloc_array(tmp_ctx, uint8_t, argc);
2850
2851         for (i = 0; i < argc; i++) {
2852                 ids[i] = strtoull(argv[i], NULL, 0);
2853         }
2854
2855         if (!ctdb_client_check_message_handlers(ctdb, ids, argc, result)) {
2856                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2857                                   options.pnn));
2858                 talloc_free(tmp_ctx);
2859                 return -1;
2860         }
2861
2862         for (i=0; i < argc; i++) {
2863                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2864                        result[i] ? "exists" : "does not exist");
2865         }
2866
2867         talloc_free(tmp_ctx);
2868         return 0;
2869 }
2870
2871 /*
2872   send a tcp tickle ack
2873  */
2874 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2875 {
2876         int ret;
2877         ctdb_sock_addr  src, dst;
2878
2879         if (argc < 2) {
2880                 usage();
2881         }
2882
2883         if (!parse_ip_port(argv[0], &src)) {
2884                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2885                 return -1;
2886         }
2887
2888         if (!parse_ip_port(argv[1], &dst)) {
2889                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2890                 return -1;
2891         }
2892
2893         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2894         if (ret==0) {
2895                 return 0;
2896         }
2897         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2898
2899         return -1;
2900 }
2901
2902
2903 /*
2904   display public ip status
2905  */
2906 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2907 {
2908         int i, ret;
2909         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2910         struct ctdb_all_public_ips *ips;
2911
2912         if (options.pnn == CTDB_BROADCAST_ALL) {
2913                 /* read the list of public ips from all nodes */
2914                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2915         } else {
2916                 /* read the public ip list from this node */
2917                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2918         }
2919         if (ret != 0) {
2920                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2921                 talloc_free(tmp_ctx);
2922                 return ret;
2923         }
2924
2925         if (options.machinereadable){
2926                 printf(":Public IP:Node:");
2927                 if (options.verbose){
2928                         printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2929                 }
2930                 printf("\n");
2931         } else {
2932                 if (options.pnn == CTDB_BROADCAST_ALL) {
2933                         printf("Public IPs on ALL nodes\n");
2934                 } else {
2935                         printf("Public IPs on node %u\n", options.pnn);
2936                 }
2937         }
2938
2939         for (i=1;i<=ips->num;i++) {
2940                 struct ctdb_control_public_ip_info *info = NULL;
2941                 int32_t pnn;
2942                 char *aciface = NULL;
2943                 char *avifaces = NULL;
2944                 char *cifaces = NULL;
2945
2946                 if (options.pnn == CTDB_BROADCAST_ALL) {
2947                         pnn = ips->ips[ips->num-i].pnn;
2948                 } else {
2949                         pnn = options.pnn;
2950                 }
2951
2952                 if (pnn != -1) {
2953                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
2954                                                    &ips->ips[ips->num-i].addr, &info);
2955                 } else {
2956                         ret = -1;
2957                 }
2958
2959                 if (ret == 0) {
2960                         int j;
2961                         for (j=0; j < info->num; j++) {
2962                                 if (cifaces == NULL) {
2963                                         cifaces = talloc_strdup(info,
2964                                                                 info->ifaces[j].name);
2965                                 } else {
2966                                         cifaces = talloc_asprintf_append(cifaces,
2967                                                                          ",%s",
2968                                                                          info->ifaces[j].name);
2969                                 }
2970
2971                                 if (info->active_idx == j) {
2972                                         aciface = info->ifaces[j].name;
2973                                 }
2974
2975                                 if (info->ifaces[j].link_state == 0) {
2976                                         continue;
2977                                 }
2978
2979                                 if (avifaces == NULL) {
2980                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
2981                                 } else {
2982                                         avifaces = talloc_asprintf_append(avifaces,
2983                                                                           ",%s",
2984                                                                           info->ifaces[j].name);
2985                                 }
2986                         }
2987                 }
2988
2989                 if (options.machinereadable){
2990                         printf(":%s:%d:",
2991                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2992                                 ips->ips[ips->num-i].pnn);
2993                         if (options.verbose){
2994                                 printf("%s:%s:%s:",
2995                                         aciface?aciface:"",
2996                                         avifaces?avifaces:"",
2997                                         cifaces?cifaces:"");
2998                         }
2999                         printf("\n");
3000                 } else {
3001                         if (options.verbose) {
3002                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
3003                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3004                                         ips->ips[ips->num-i].pnn,
3005                                         aciface?aciface:"",
3006                                         avifaces?avifaces:"",
3007                                         cifaces?cifaces:"");
3008                         } else {
3009                                 printf("%s %d\n",
3010                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
3011                                         ips->ips[ips->num-i].pnn);
3012                         }
3013                 }
3014                 talloc_free(info);
3015         }
3016
3017         talloc_free(tmp_ctx);
3018         return 0;
3019 }
3020
3021 /*
3022   public ip info
3023  */
3024 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
3025 {
3026         int i, ret;
3027         ctdb_sock_addr addr;
3028         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3029         struct ctdb_control_public_ip_info *info;
3030
3031         if (argc != 1) {
3032                 talloc_free(tmp_ctx);
3033                 usage();
3034         }
3035
3036         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
3037                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
3038                 return -1;
3039         }
3040
3041         /* read the public ip info from this node */
3042         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
3043                                            tmp_ctx, &addr, &info);
3044         if (ret != 0) {
3045                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
3046                                   argv[0], options.pnn));
3047                 talloc_free(tmp_ctx);
3048                 return ret;
3049         }
3050
3051         printf("Public IP[%s] info on node %u\n",
3052                ctdb_addr_to_str(&info->ip.addr),
3053                options.pnn);
3054
3055         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
3056                ctdb_addr_to_str(&info->ip.addr),
3057                info->ip.pnn, info->num);
3058
3059         for (i=0; i<info->num; i++) {
3060                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
3061
3062                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
3063                        i+1, info->ifaces[i].name,
3064                        info->ifaces[i].link_state?"up":"down",
3065                        (unsigned int)info->ifaces[i].references,
3066                        (i==info->active_idx)?" (active)":"");
3067         }
3068
3069         talloc_free(tmp_ctx);
3070         return 0;
3071 }
3072
3073 /*
3074   display interfaces status
3075  */
3076 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
3077 {
3078         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3079         int i;
3080         struct ctdb_control_get_ifaces *ifaces;
3081         int ret;
3082
3083         /* read the public ip list from this node */
3084         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ifaces);
3085         if (ret != 0) {
3086                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
3087                                   options.pnn));
3088                 talloc_free(tmp_ctx);
3089                 return -1;
3090         }
3091
3092         if (options.machinereadable){
3093                 printf(":Name:LinkStatus:References:\n");
3094         } else {
3095                 printf("Interfaces on node %u\n", options.pnn);
3096         }
3097
3098         for (i=0; i<ifaces->num; i++) {
3099                 if (options.machinereadable){
3100                         printf(":%s:%s:%u\n",
3101                                ifaces->ifaces[i].name,
3102                                ifaces->ifaces[i].link_state?"1":"0",
3103                                (unsigned int)ifaces->ifaces[i].references);
3104                 } else {
3105                         printf("name:%s link:%s references:%u\n",
3106                                ifaces->ifaces[i].name,
3107                                ifaces->ifaces[i].link_state?"up":"down",
3108                                (unsigned int)ifaces->ifaces[i].references);
3109                 }
3110         }
3111
3112         talloc_free(tmp_ctx);
3113         return 0;
3114 }
3115
3116
3117 /*
3118   set link status of an interface
3119  */
3120 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
3121 {
3122         int ret;
3123         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3124         struct ctdb_control_iface_info info;
3125
3126         ZERO_STRUCT(info);
3127
3128         if (argc != 2) {
3129                 usage();
3130         }
3131
3132         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
3133                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
3134                                   argv[0]));
3135                 talloc_free(tmp_ctx);
3136                 return -1;
3137         }
3138         strcpy(info.name, argv[0]);
3139
3140         if (strcmp(argv[1], "up") == 0) {
3141                 info.link_state = 1;
3142         } else if (strcmp(argv[1], "down") == 0) {
3143                 info.link_state = 0;
3144         } else {
3145                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
3146                                   argv[1]));
3147                 talloc_free(tmp_ctx);
3148                 return -1;
3149         }
3150
3151         /* read the public ip list from this node */
3152         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
3153                                    tmp_ctx, &info);
3154         if (ret != 0) {
3155                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
3156                                   argv[0], options.pnn));
3157                 talloc_free(tmp_ctx);
3158                 return ret;
3159         }
3160
3161         talloc_free(tmp_ctx);
3162         return 0;
3163 }
3164
3165 /*
3166   display pid of a ctdb daemon
3167  */
3168 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
3169 {
3170         uint32_t pid;
3171         int ret;
3172
3173         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
3174         if (ret != 0) {
3175                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
3176                 return ret;
3177         }
3178         printf("Pid:%d\n", pid);
3179
3180         return 0;
3181 }
3182
3183 typedef bool update_flags_handler_t(struct ctdb_context *ctdb, void *data);
3184
3185 static int update_flags_and_ipreallocate(struct ctdb_context *ctdb,
3186                                               void *data,
3187                                               update_flags_handler_t handler,
3188                                               uint32_t flag,
3189                                               const char *desc,
3190                                               bool set_flag)
3191 {
3192         struct ctdb_node_map *nodemap = NULL;
3193         bool flag_is_set;
3194         int ret;
3195
3196         /* Check if the node is already in the desired state */
3197         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3198         if (ret != 0) {
3199                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3200                 exit(10);
3201         }
3202         flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3203         if (set_flag == flag_is_set) {
3204                 DEBUG(DEBUG_NOTICE, ("Node %d is %s %s\n", options.pnn,
3205                                      (set_flag ? "already" : "not"), desc));
3206                 return 0;
3207         }
3208
3209         do {
3210                 if (!handler(ctdb, data)) {
3211                         DEBUG(DEBUG_WARNING,
3212                               ("Failed to send control to set state %s on node %u, try again\n",
3213                                desc, options.pnn));
3214                 }
3215
3216                 sleep(1);
3217
3218                 /* Read the nodemap and verify the change took effect.
3219                  * Even if the above control/hanlder timed out then it
3220                  * could still have worked!
3221                  */
3222                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
3223                                          ctdb, &nodemap);
3224                 if (ret != 0) {
3225                         DEBUG(DEBUG_WARNING,
3226                               ("Unable to get nodemap from local node, try again\n"));
3227                 }
3228                 flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3229         } while (nodemap == NULL || (set_flag != flag_is_set));
3230
3231         return ipreallocate(ctdb);
3232 }
3233
3234 /* Administratively disable a node */
3235 static bool update_flags_disabled(struct ctdb_context *ctdb, void *data)
3236 {
3237         int ret;
3238
3239         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3240                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
3241         return ret == 0;
3242 }
3243
3244 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
3245 {
3246         return update_flags_and_ipreallocate(ctdb, NULL,
3247                                                   update_flags_disabled,
3248                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3249                                                   "disabled",
3250                                                   true /* set_flag*/);
3251 }
3252
3253 /* Administratively re-enable a node */
3254 static bool update_flags_not_disabled(struct ctdb_context *ctdb, void *data)
3255 {
3256         int ret;
3257
3258         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3259                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
3260         return ret == 0;
3261 }
3262
3263 static int control_enable(struct ctdb_context *ctdb,  int argc, const char **argv)
3264 {
3265         return update_flags_and_ipreallocate(ctdb, NULL,
3266                                                   update_flags_not_disabled,
3267                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3268                                                   "disabled",
3269                                                   false /* set_flag*/);
3270 }
3271
3272 /* Stop a node */
3273 static bool update_flags_stopped(struct ctdb_context *ctdb, void *data)
3274 {
3275         int ret;
3276
3277         ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
3278
3279         return ret == 0;
3280 }
3281
3282 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
3283 {
3284         return update_flags_and_ipreallocate(ctdb, NULL,
3285                                                   update_flags_stopped,
3286                                                   NODE_FLAGS_STOPPED,
3287                                                   "stopped",
3288                                                   true /* set_flag*/);
3289 }
3290
3291 /* Continue a stopped node */
3292 static bool update_flags_not_stopped(struct ctdb_context *ctdb, void *data)
3293 {
3294         int ret;
3295
3296         ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
3297
3298         return ret == 0;
3299 }
3300
3301 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
3302 {
3303         return update_flags_and_ipreallocate(ctdb, NULL,
3304                                                   update_flags_not_stopped,
3305                                                   NODE_FLAGS_STOPPED,
3306                                                   "stopped",
3307                                                   false /* set_flag */);
3308 }
3309
3310 static uint32_t get_generation(struct ctdb_context *ctdb)
3311 {
3312         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3313         struct ctdb_vnn_map *vnnmap=NULL;
3314         int ret;
3315         uint32_t generation;
3316
3317         /* wait until the recmaster is not in recovery mode */
3318         while (1) {
3319                 uint32_t recmode, recmaster;
3320                 
3321                 if (vnnmap != NULL) {
3322                         talloc_free(vnnmap);
3323                         vnnmap = NULL;
3324                 }
3325
3326                 /* get the recmaster */
3327                 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
3328                 if (ret != 0) {
3329                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
3330                         talloc_free(tmp_ctx);
3331                         exit(10);
3332                 }
3333
3334                 /* get recovery mode */
3335                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), recmaster, &recmode);
3336                 if (ret != 0) {
3337                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
3338                         talloc_free(tmp_ctx);
3339                         exit(10);
3340                 }
3341
3342                 /* get the current generation number */
3343                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, tmp_ctx, &vnnmap);
3344                 if (ret != 0) {
3345                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
3346                         talloc_free(tmp_ctx);
3347                         exit(10);
3348                 }
3349
3350                 if ((recmode == CTDB_RECOVERY_NORMAL) && (vnnmap->generation != 1)) {
3351                         generation = vnnmap->generation;
3352                         talloc_free(tmp_ctx);
3353                         return generation;
3354                 }
3355                 sleep(1);
3356         }
3357 }
3358
3359 /* Ban a node */
3360 static bool update_state_banned(struct ctdb_context *ctdb, void *data)
3361 {
3362         struct ctdb_ban_time *bantime = (struct ctdb_ban_time *)data;
3363         int ret;
3364
3365         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, bantime);
3366
3367         return ret == 0;
3368 }
3369
3370 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
3371 {
3372         struct ctdb_ban_time bantime;
3373
3374         if (argc < 1) {
3375                 usage();
3376         }
3377         
3378         bantime.pnn  = options.pnn;
3379         bantime.time = strtoul(argv[0], NULL, 0);
3380
3381         if (bantime.time == 0) {
3382                 DEBUG(DEBUG_ERR, ("Invalid ban time specified - must be >0\n"));
3383                 return -1;
3384         }
3385
3386         return update_flags_and_ipreallocate(ctdb, &bantime,
3387                                                   update_state_banned,
3388                                                   NODE_FLAGS_BANNED,
3389                                                   "banned",
3390                                                   true /* set_flag*/);
3391 }
3392
3393
3394 /* Unban a node */
3395 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
3396 {
3397         struct ctdb_ban_time bantime;
3398
3399         bantime.pnn  = options.pnn;
3400         bantime.time = 0;
3401
3402         return update_flags_and_ipreallocate(ctdb, &bantime,
3403                                                   update_state_banned,
3404                                                   NODE_FLAGS_BANNED,
3405                                                   "banned",
3406                                                   false /* set_flag*/);
3407 }
3408
3409 /*
3410   show ban information for a node
3411  */
3412 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
3413 {
3414         int ret;
3415         struct ctdb_node_map *nodemap=NULL;
3416         struct ctdb_ban_time *bantime;
3417
3418         /* verify the node exists */
3419         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3420         if (ret != 0) {
3421                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3422                 return ret;
3423         }
3424
3425         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
3426         if (ret != 0) {
3427                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
3428                 return -1;
3429         }       
3430
3431         if (bantime->time == 0) {
3432                 printf("Node %u is not banned\n", bantime->pnn);
3433         } else {
3434                 printf("Node %u is banned, %d seconds remaining\n",
3435                        bantime->pnn, bantime->time);
3436         }
3437
3438         return 0;
3439 }
3440
3441 /*
3442   shutdown a daemon
3443  */
3444 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
3445 {
3446         int ret;
3447
3448         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
3449         if (ret != 0) {
3450                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
3451                 return ret;
3452         }
3453
3454         return 0;
3455 }
3456
3457 /*
3458   trigger a recovery
3459  */
3460 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
3461 {
3462         int ret;
3463         uint32_t generation, next_generation;
3464         bool force;
3465
3466         /* "force" option ignores freeze failure and forces recovery */
3467         force = (argc == 1) && (strcasecmp(argv[0], "force") == 0);
3468
3469         /* record the current generation number */
3470         generation = get_generation(ctdb);
3471
3472         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
3473         if (ret != 0) {
3474                 if (!force) {
3475                         DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
3476                         return ret;
3477                 }
3478                 DEBUG(DEBUG_WARNING, ("Unable to freeze node but proceeding because \"force\" option given\n"));
3479         }
3480
3481         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3482         if (ret != 0) {
3483                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
3484                 return ret;
3485         }
3486
3487         /* wait until we are in a new generation */
3488         while (1) {
3489                 next_generation = get_generation(ctdb);
3490                 if (next_generation != generation) {
3491                         return 0;
3492                 }
3493                 sleep(1);
3494         }
3495
3496         return 0;
3497 }
3498
3499
3500 /*
3501   display monitoring mode of a remote node
3502  */
3503 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
3504 {
3505         uint32_t monmode;
3506         int ret;
3507
3508         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
3509         if (ret != 0) {
3510                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
3511                 return ret;
3512         }
3513         if (!options.machinereadable){
3514                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
3515         } else {
3516                 printf(":mode:\n");
3517                 printf(":%d:\n",monmode);
3518         }
3519         return 0;
3520 }
3521
3522
3523 /*
3524   display capabilities of a remote node
3525  */
3526 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
3527 {
3528         uint32_t capabilities;
3529         int ret;
3530
3531         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
3532         if (ret != 0) {
3533                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
3534                 return -1;
3535         }
3536         
3537         if (!options.machinereadable){
3538                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
3539                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
3540                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
3541                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
3542         } else {
3543                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
3544                 printf(":%d:%d:%d:%d:\n",
3545                         !!(capabilities&CTDB_CAP_RECMASTER),
3546                         !!(capabilities&CTDB_CAP_LMASTER),
3547                         !!(capabilities&CTDB_CAP_LVS),
3548                         !!(capabilities&CTDB_CAP_NATGW));
3549         }
3550         return 0;
3551 }
3552
3553 /*
3554   display lvs configuration
3555  */
3556
3557 static uint32_t lvs_exclude_flags[] = {
3558         /* Look for a nice healthy node */
3559         NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED,
3560         /* If not found, an UNHEALTHY node will do */
3561         NODE_FLAGS_INACTIVE|NODE_FLAGS_PERMANENTLY_DISABLED,
3562         0,
3563 };
3564
3565 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
3566 {
3567         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3568         struct ctdb_node_map *orig_nodemap=NULL;
3569         struct ctdb_node_map *nodemap;
3570         int i, ret;
3571
3572         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
3573                                    tmp_ctx, &orig_nodemap);
3574         if (ret != 0) {
3575                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3576                 talloc_free(tmp_ctx);
3577                 return -1;
3578         }
3579
3580         nodemap = filter_nodemap_by_capabilities(ctdb, orig_nodemap,
3581                                                  CTDB_CAP_LVS);
3582         if (nodemap == NULL) {
3583                 /* No memory */
3584                 ret = -1;
3585                 goto done;
3586         }
3587
3588         ret = 0;
3589
3590         for (i = 0; lvs_exclude_flags[i] != 0; i++) {
3591                 struct ctdb_node_map *t =
3592                         filter_nodemap_by_flags(ctdb, nodemap,
3593                                                 lvs_exclude_flags[i]);
3594                 if (t == NULL) {
3595                         /* No memory */
3596                         ret = -1;
3597                         goto done;
3598                 }
3599                 if (t->num > 0) {
3600                         /* At least 1 node without excluded flags */
3601                         int j;
3602                         for (j = 0; j < t->num; j++) {
3603                                 printf("%d:%s\n", t->nodes[j].pnn, 
3604                                        ctdb_addr_to_str(&t->nodes[j].addr));
3605                         }
3606                         goto done;
3607                 }
3608                 talloc_free(t);
3609         }
3610 done:
3611         talloc_free(tmp_ctx);
3612         return ret;
3613 }
3614
3615 /*
3616   display who is the lvs master
3617  */
3618 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3619 {
3620         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3621         struct ctdb_node_map *orig_nodemap=NULL;
3622         struct ctdb_node_map *nodemap;
3623         int i, ret;
3624
3625         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
3626                                    tmp_ctx, &orig_nodemap);
3627         if (ret != 0) {
3628                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3629                 talloc_free(tmp_ctx);
3630                 return -1;
3631         }
3632
3633         nodemap = filter_nodemap_by_capabilities(ctdb, orig_nodemap,
3634                                                  CTDB_CAP_LVS);
3635         if (nodemap == NULL) {
3636                 /* No memory */
3637                 ret = -1;
3638                 goto done;
3639         }
3640
3641         for (i = 0; lvs_exclude_flags[i] != 0; i++) {
3642                 struct ctdb_node_map *t =
3643                         filter_nodemap_by_flags(ctdb, nodemap,
3644                                                 lvs_exclude_flags[i]);
3645                 if (t == NULL) {
3646                         /* No memory */
3647                         ret = -1;
3648                         goto done;
3649                 }
3650                 if (t->num > 0) {
3651                         ret = 0;
3652                         printf(options.machinereadable ?
3653                                "%d\n" : "Node %d is LVS master\n",
3654                                 t->nodes[0].pnn);
3655                         goto done;
3656                 }
3657                 talloc_free(t);
3658         }
3659
3660         printf("There is no LVS master\n");
3661         ret = 255;
3662 done:
3663         talloc_free(tmp_ctx);
3664         return ret;
3665 }
3666
3667 /*
3668   disable monitoring on a  node
3669  */
3670 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3671 {
3672         
3673         int ret;
3674
3675         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3676         if (ret != 0) {
3677                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3678                 return ret;
3679         }
3680         printf("Monitoring mode:%s\n","DISABLED");
3681
3682         return 0;
3683 }
3684
3685 /*
3686   enable monitoring on a  node
3687  */
3688 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3689 {
3690         
3691         int ret;
3692
3693         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3694         if (ret != 0) {
3695                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3696                 return ret;
3697         }
3698         printf("Monitoring mode:%s\n","ACTIVE");
3699
3700         return 0;
3701 }
3702
3703 /*
3704   display remote list of keys/data for a db
3705  */
3706 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3707 {
3708         const char *db_name;
3709         struct ctdb_db_context *ctdb_db;
3710         int ret;
3711         struct ctdb_dump_db_context c;
3712         uint8_t flags;
3713
3714         if (argc < 1) {
3715                 usage();
3716         }
3717
3718         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3719                 return -1;
3720         }
3721
3722         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3723         if (ctdb_db == NULL) {
3724                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3725                 return -1;
3726         }
3727
3728         if (options.printlmaster) {
3729                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3730                                           ctdb, &ctdb->vnn_map);
3731                 if (ret != 0) {
3732                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3733                                           options.pnn));
3734                         return ret;
3735                 }
3736         }
3737
3738         ZERO_STRUCT(c);
3739         c.f = stdout;
3740         c.printemptyrecords = (bool)options.printemptyrecords;
3741         c.printdatasize = (bool)options.printdatasize;
3742         c.printlmaster = (bool)options.printlmaster;
3743         c.printhash = (bool)options.printhash;
3744         c.printrecordflags = (bool)options.printrecordflags;
3745
3746         /* traverse and dump the cluster tdb */
3747         ret = ctdb_dump_db(ctdb_db, &c);
3748         if (ret == -1) {
3749                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3750                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3751                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3752                                   db_name));
3753                 return -1;
3754         }
3755         talloc_free(ctdb_db);
3756
3757         printf("Dumped %d records\n", ret);
3758         return 0;
3759 }
3760
/* State handed to cattdb_traverse() through tdb_traverse_read(). */
struct cattdb_data {
	/* context passed on to ctdb_dumpdb_record() */
	struct ctdb_context *ctdb;
	/* number of records visited so far */
	uint32_t count;
};
3765
3766 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3767 {
3768         struct cattdb_data *d = private_data;
3769         struct ctdb_dump_db_context c;
3770
3771         d->count++;
3772
3773         ZERO_STRUCT(c);
3774         c.f = stdout;
3775         c.printemptyrecords = (bool)options.printemptyrecords;
3776         c.printdatasize = (bool)options.printdatasize;
3777         c.printlmaster = false;
3778         c.printhash = (bool)options.printhash;
3779         c.printrecordflags = true;
3780
3781         return ctdb_dumpdb_record(d->ctdb, key, data, &c);
3782 }
3783
3784 /*
3785   cat the local tdb database using same format as catdb
3786  */
3787 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3788 {
3789         const char *db_name;
3790         struct ctdb_db_context *ctdb_db;
3791         struct cattdb_data d;
3792         uint8_t flags;
3793
3794         if (argc < 1) {
3795                 usage();
3796         }
3797
3798         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3799                 return -1;
3800         }
3801
3802         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3803         if (ctdb_db == NULL) {
3804                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3805                 return -1;
3806         }
3807
3808         /* traverse the local tdb */
3809         d.count = 0;
3810         d.ctdb  = ctdb;
3811         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3812                 printf("Failed to cattdb data\n");
3813                 exit(10);
3814         }
3815         talloc_free(ctdb_db);
3816
3817         printf("Dumped %d records\n", d.count);
3818         return 0;
3819 }
3820
3821 /*
3822   display the content of a database key
3823  */
3824 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3825 {
3826         const char *db_name;
3827         struct ctdb_db_context *ctdb_db;
3828         struct ctdb_record_handle *h;
3829         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3830         TDB_DATA key, data;
3831         uint8_t flags;
3832
3833         if (argc < 2) {
3834                 usage();
3835         }
3836
3837         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3838                 return -1;
3839         }
3840
3841         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3842         if (ctdb_db == NULL) {
3843                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3844                 return -1;
3845         }
3846
3847         key.dptr  = discard_const(argv[1]);
3848         key.dsize = strlen((char *)key.dptr);
3849
3850         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3851         if (h == NULL) {
3852                 printf("Failed to fetch record '%s' on node %d\n", 
3853                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3854                 talloc_free(tmp_ctx);
3855                 exit(10);
3856         }
3857
3858         printf("Data: size:%d ptr:[%.*s]\n", (int)data.dsize, (int)data.dsize, data.dptr);
3859
3860         talloc_free(ctdb_db);
3861         talloc_free(tmp_ctx);
3862         return 0;
3863 }
3864
3865 /*
3866   display the content of a database key
3867  */
3868 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3869 {
3870         const char *db_name;
3871         struct ctdb_db_context *ctdb_db;
3872         struct ctdb_record_handle *h;
3873         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3874         TDB_DATA key, data;
3875         uint8_t flags;
3876
3877         if (argc < 3) {
3878                 usage();
3879         }
3880
3881         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3882                 return -1;
3883         }
3884
3885         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3886         if (ctdb_db == NULL) {
3887                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3888                 return -1;
3889         }
3890
3891         key.dptr  = discard_const(argv[1]);
3892         key.dsize = strlen((char *)key.dptr);
3893
3894         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3895         if (h == NULL) {
3896                 printf("Failed to fetch record '%s' on node %d\n", 
3897                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3898                 talloc_free(tmp_ctx);
3899                 exit(10);
3900         }
3901
3902         data.dptr  = discard_const(argv[2]);
3903         data.dsize = strlen((char *)data.dptr);
3904
3905         if (ctdb_record_store(h, data) != 0) {
3906                 printf("Failed to store record\n");
3907         }
3908
3909         talloc_free(h);
3910         talloc_free(ctdb_db);
3911         talloc_free(tmp_ctx);
3912         return 0;
3913 }
3914
3915 /*
3916   fetch a record from a persistent database
3917  */
3918 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3919 {
3920         const char *db_name;
3921         struct ctdb_db_context *ctdb_db;
3922         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3923         struct ctdb_transaction_handle *h;
3924         TDB_DATA key, data;
3925         int fd, ret;
3926         bool persistent;
3927         uint8_t flags;
3928
3929         if (argc < 2) {
3930                 talloc_free(tmp_ctx);
3931                 usage();
3932         }
3933
3934         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3935                 talloc_free(tmp_ctx);
3936                 return -1;
3937         }
3938
3939         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
3940         if (!persistent) {
3941                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3942                 talloc_free(tmp_ctx);
3943                 return -1;
3944         }
3945
3946         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3947         if (ctdb_db == NULL) {
3948                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3949                 talloc_free(tmp_ctx);
3950                 return -1;
3951         }
3952
3953         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3954         if (h == NULL) {
3955                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3956                 talloc_free(tmp_ctx);
3957                 return -1;
3958         }
3959
3960         key.dptr  = discard_const(argv[1]);
3961         key.dsize = strlen(argv[1]);
3962         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
3963         if (ret != 0) {
3964                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
3965                 talloc_free(tmp_ctx);
3966                 return -1;
3967         }
3968
3969         if (data.dsize == 0 || data.dptr == NULL) {
3970                 DEBUG(DEBUG_ERR,("Record is empty\n"));
3971                 talloc_free(tmp_ctx);
3972                 return -1;
3973         }
3974
3975         if (argc == 3) {
3976           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3977                 if (fd == -1) {
3978                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3979                         talloc_free(tmp_ctx);
3980                         return -1;
3981                 }
3982                 write(fd, data.dptr, data.dsize);
3983                 close(fd);
3984         } else {
3985                 write(1, data.dptr, data.dsize);
3986         }
3987
3988         /* abort the transaction */
3989         talloc_free(h);
3990
3991
3992         talloc_free(tmp_ctx);
3993         return 0;
3994 }
3995
3996 /*
3997   fetch a record from a tdb-file
3998  */
3999 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
4000 {
4001         const char *tdb_file;
4002         TDB_CONTEXT *tdb;
4003         TDB_DATA key, data;
4004         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4005         int fd;
4006
4007         if (argc < 2) {
4008                 usage();
4009         }
4010
4011         tdb_file = argv[0];
4012
4013         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
4014         if (tdb == NULL) {
4015                 printf("Failed to open TDB file %s\n", tdb_file);
4016                 return -1;
4017         }
4018
4019         if (!strncmp(argv[1], "0x", 2)) {
4020                 key = hextodata(tmp_ctx, argv[1] + 2);
4021                 if (key.dsize == 0) {
4022                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4023                         return -1;
4024                 }
4025         } else {
4026                 key.dptr  = discard_const(argv[1]);
4027                 key.dsize = strlen(argv[1]);
4028         }
4029
4030         data = tdb_fetch(tdb, key);
4031         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
4032                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
4033                 tdb_close(tdb);
4034                 return -1;
4035         }
4036
4037         tdb_close(tdb);
4038
4039         if (argc == 3) {
4040           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
4041                 if (fd == -1) {
4042                         printf("Failed to open output file %s\n", argv[2]);
4043                         return -1;
4044                 }
4045                 if (options.verbose){
4046                         write(fd, data.dptr, data.dsize);
4047                 } else {
4048                         write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4049                 }
4050                 close(fd);
4051         } else {
4052                 if (options.verbose){
4053                         write(1, data.dptr, data.dsize);
4054                 } else {
4055                         write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4056                 }
4057         }
4058
4059         talloc_free(tmp_ctx);
4060         return 0;
4061 }
4062
4063 /*
4064   store a record and header to a tdb-file
4065  */
4066 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
4067 {
4068         const char *tdb_file;
4069         TDB_CONTEXT *tdb;
4070         TDB_DATA key, value, data;
4071         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4072         struct ctdb_ltdb_header header;
4073
4074         if (argc < 3) {
4075                 usage();
4076         }
4077
4078         tdb_file = argv[0];
4079
4080         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
4081         if (tdb == NULL) {
4082                 printf("Failed to open TDB file %s\n", tdb_file);
4083                 return -1;
4084         }
4085
4086         if (!strncmp(argv[1], "0x", 2)) {
4087                 key = hextodata(tmp_ctx, argv[1] + 2);
4088                 if (key.dsize == 0) {
4089                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4090                         return -1;
4091                 }
4092         } else {
4093                 key.dptr  = discard_const(argv[1]);
4094                 key.dsize = strlen(argv[1]);
4095         }
4096
4097         if (!strncmp(argv[2], "0x", 2)) {
4098                 value = hextodata(tmp_ctx, argv[2] + 2);
4099                 if (value.dsize == 0) {
4100                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
4101                         return -1;
4102                 }
4103         } else {
4104                 value.dptr  = discard_const(argv[2]);
4105                 value.dsize = strlen(argv[2]);
4106         }
4107
4108         ZERO_STRUCT(header);
4109         if (argc > 3) {
4110                 header.rsn = atoll(argv[3]);
4111         }
4112         if (argc > 4) {
4113                 header.dmaster = atoi(argv[4]);
4114         }
4115         if (argc > 5) {
4116                 header.flags = atoi(argv[5]);
4117         }
4118
4119         data.dsize = sizeof(struct ctdb_ltdb_header) + value.dsize;
4120         data.dptr = talloc_size(tmp_ctx, data.dsize);
4121         if (data.dptr == NULL) {
4122                 printf("Failed to allocate header+value\n");
4123                 return -1;
4124         }
4125
4126         *(struct ctdb_ltdb_header *)data.dptr = header;
4127         memcpy(data.dptr + sizeof(struct ctdb_ltdb_header), value.dptr, value.dsize);
4128
4129         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
4130                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
4131                 tdb_close(tdb);
4132                 return -1;
4133         }
4134
4135         tdb_close(tdb);
4136
4137         talloc_free(tmp_ctx);
4138         return 0;
4139 }
4140
4141 /*
4142   write a record to a persistent database
4143  */
4144 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
4145 {
4146         const char *db_name;
4147         struct ctdb_db_context *ctdb_db;
4148         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4149         struct ctdb_transaction_handle *h;
4150         struct stat st;
4151         TDB_DATA key, data;
4152         int fd, ret;
4153
4154         if (argc < 3) {
4155                 talloc_free(tmp_ctx);
4156                 usage();
4157         }
4158
4159         fd = open(argv[2], O_RDONLY);
4160         if (fd == -1) {
4161                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
4162                 talloc_free(tmp_ctx);
4163                 return -1;
4164         }
4165         
4166         ret = fstat(fd, &st);
4167         if (ret == -1) {
4168                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
4169                 close(fd);
4170                 talloc_free(tmp_ctx);
4171                 return -1;
4172         }
4173
4174         if (!S_ISREG(st.st_mode)) {
4175                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
4176                 close(fd);
4177                 talloc_free(tmp_ctx);
4178                 return -1;
4179         }
4180
4181         data.dsize = st.st_size;
4182         if (data.dsize == 0) {
4183                 data.dptr  = NULL;
4184         } else {
4185                 data.dptr = talloc_size(tmp_ctx, data.dsize);
4186                 if (data.dptr == NULL) {
4187                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
4188                         close(fd);
4189                         talloc_free(tmp_ctx);
4190                         return -1;
4191                 }
4192                 ret = read(fd, data.dptr, data.dsize);
4193                 if (ret != data.dsize) {
4194                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
4195                         close(fd);
4196                         talloc_free(tmp_ctx);
4197                         return -1;
4198                 }
4199         }
4200         close(fd);
4201
4202
4203         db_name = argv[0];
4204
4205         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4206         if (ctdb_db == NULL) {
4207                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4208                 talloc_free(tmp_ctx);
4209                 return -1;
4210         }
4211
4212         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4213         if (h == NULL) {
4214                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4215                 talloc_free(tmp_ctx);
4216                 return -1;
4217         }
4218
4219         key.dptr  = discard_const(argv[1]);
4220         key.dsize = strlen(argv[1]);
4221         ret = ctdb_transaction_store(h, key, data);
4222         if (ret != 0) {
4223                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4224                 talloc_free(tmp_ctx);
4225                 return -1;
4226         }
4227
4228         ret = ctdb_transaction_commit(h);
4229         if (ret != 0) {
4230                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4231                 talloc_free(tmp_ctx);
4232                 return -1;
4233         }
4234
4235
4236         talloc_free(tmp_ctx);
4237         return 0;
4238 }
4239
4240 /*
4241  * delete a record from a persistent database
4242  */
4243 static int control_pdelete(struct ctdb_context *ctdb, int argc, const char **argv)
4244 {
4245         const char *db_name;
4246         struct ctdb_db_context *ctdb_db;
4247         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4248         struct ctdb_transaction_handle *h;
4249         TDB_DATA key;
4250         int ret;
4251         bool persistent;
4252         uint8_t flags;
4253
4254         if (argc < 2) {
4255                 talloc_free(tmp_ctx);
4256                 usage();
4257         }
4258
4259         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
4260                 talloc_free(tmp_ctx);
4261                 return -1;
4262         }
4263
4264         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
4265         if (!persistent) {
4266                 DEBUG(DEBUG_ERR, ("Database '%s' is not persistent\n", db_name));
4267                 talloc_free(tmp_ctx);
4268                 return -1;
4269         }
4270
4271         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4272         if (ctdb_db == NULL) {
4273                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n", db_name));
4274                 talloc_free(tmp_ctx);
4275                 return -1;
4276         }
4277
4278         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4279         if (h == NULL) {
4280                 DEBUG(DEBUG_ERR, ("Failed to start transaction on database %s\n", db_name));
4281                 talloc_free(tmp_ctx);
4282                 return -1;
4283         }
4284
4285         key.dptr = discard_const(argv[1]);
4286         key.dsize = strlen(argv[1]);
4287         ret = ctdb_transaction_store(h, key, tdb_null);
4288         if (ret != 0) {
4289                 DEBUG(DEBUG_ERR, ("Failed to delete record\n"));
4290                 talloc_free(tmp_ctx);
4291                 return -1;
4292         }
4293
4294         ret = ctdb_transaction_commit(h);
4295         if (ret != 0) {
4296                 DEBUG(DEBUG_ERR, ("Failed to commit transaction\n"));
4297                 talloc_free(tmp_ctx);
4298                 return -1;
4299         }
4300
4301         talloc_free(tmp_ctx);
4302         return 0;
4303 }
4304
4305 static const char *ptrans_parse_string(TALLOC_CTX *mem_ctx, const char *s,
4306                                        TDB_DATA *data)
4307 {
4308         const char *t;
4309         size_t n;
4310         const char *ret; /* Next byte after successfully parsed value */
4311
4312         /* Error, unless someone says otherwise */
4313         ret = NULL;
4314         /* Indicates no value to parse */
4315         *data = tdb_null;
4316
4317         /* Skip whitespace */
4318         n = strspn(s, " \t");
4319         t = s + n;
4320
4321         if (t[0] == '"') {
4322                 /* Quoted ASCII string - no wide characters! */
4323                 t++;
4324                 n = strcspn(t, "\"");
4325                 if (t[n] == '"') {
4326                         if (n > 0) {
4327                                 data->dsize = n;
4328                                 data->dptr = talloc_memdup(mem_ctx, t, n);
4329                                 CTDB_NOMEM_ABORT(data->dptr);
4330                         }
4331                         ret = t + n + 1;
4332                 } else {
4333                         DEBUG(DEBUG_WARNING,("Unmatched \" in input %s\n", s));
4334                 }
4335         } else {
4336                 DEBUG(DEBUG_WARNING,("Unsupported input format in %s\n", s));
4337         }
4338
4339         return ret;
4340 }
4341
4342 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
4343                                  TDB_DATA *key, TDB_DATA *value)
4344 {
4345         char line [1024]; /* FIXME: make this more flexible? */
4346         const char *t;
4347         char *ptr;
4348
4349         ptr = fgets(line, sizeof(line), file);
4350
4351         if (ptr == NULL) {
4352                 return false;
4353         }
4354
4355         /* Get key */
4356         t = ptrans_parse_string(mem_ctx, line, key);
4357         if (t == NULL || key->dptr == NULL) {
4358                 /* Line Ignored but not EOF */
4359                 return true;
4360         }
4361
4362         /* Get value */
4363         t = ptrans_parse_string(mem_ctx, t, value);
4364         if (t == NULL) {
4365                 /* Line Ignored but not EOF */
4366                 talloc_free(key->dptr);
4367                 *key = tdb_null;
4368                 return true;
4369         }
4370
4371         return true;
4372 }
4373
4374 /*
4375  * Update a persistent database as per file/stdin
4376  */
4377 static int control_ptrans(struct ctdb_context *ctdb,
4378                           int argc, const char **argv)
4379 {
4380         const char *db_name;
4381         struct ctdb_db_context *ctdb_db;
4382         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4383         struct ctdb_transaction_handle *h;
4384         TDB_DATA key, value;
4385         FILE *file;
4386         int ret;
4387
4388         if (argc < 1) {
4389                 talloc_free(tmp_ctx);
4390                 usage();
4391         }
4392
4393         file = stdin;
4394         if (argc == 2) {
4395                 file = fopen(argv[1], "r");
4396                 if (file == NULL) {
4397                         DEBUG(DEBUG_ERR,("Unable to open file for reading '%s'\n", argv[1]));
4398                         talloc_free(tmp_ctx);
4399                         return -1;
4400                 }
4401         }
4402
4403         db_name = argv[0];
4404
4405         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4406         if (ctdb_db == NULL) {
4407                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4408                 goto error;
4409         }
4410
4411         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4412         if (h == NULL) {
4413                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4414                 goto error;
4415         }
4416
4417         while (ptrans_get_key_value(tmp_ctx, file, &key, &value)) {
4418                 if (key.dsize != 0) {
4419                         ret = ctdb_transaction_store(h, key, value);
4420                         /* Minimise memory use */
4421                         talloc_free(key.dptr);
4422                         if (value.dptr != NULL) {
4423                                 talloc_free(value.dptr);
4424                         }
4425                         if (ret != 0) {
4426                                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4427                                 ctdb_transaction_cancel(h);
4428                                 goto error;
4429                         }
4430                 }
4431         }
4432
4433         ret = ctdb_transaction_commit(h);
4434         if (ret != 0) {
4435                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4436                 goto error;
4437         }
4438
4439         if (file != stdin) {
4440                 fclose(file);
4441         }
4442         talloc_free(tmp_ctx);
4443         return 0;
4444
4445 error:
4446         if (file != stdin) {
4447                 fclose(file);
4448         }
4449
4450         talloc_free(tmp_ctx);
4451         return -1;
4452 }
4453
/*
  check if a service is bound to a port or not

  argv[0] is a decimal TCP port number (0-65535).  A TCP socket is
  created and bound to that port on the IPv4 wildcard address.
  Returns 0 if the bind succeeds, EINVAL for wrong usage or an invalid
  port, otherwise the errno value of the failing socket()/bind() call.
 */
static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
{
        int s, ret;
        int v;
        int saved_errno;
        long port;
        char *end;
        struct sockaddr_in sin;

        if (argc != 1) {
                printf("Use: ctdb chktcpport <port>\n");
                return EINVAL;
        }

        /* Reject anything that is not a complete, in-range port number */
        errno = 0;
        port = strtol(argv[0], &end, 10);
        if (errno != 0 || end == argv[0] || *end != '\0' ||
            port < 0 || port > 65535) {
                printf("Invalid port '%s'\n", argv[0]);
                return EINVAL;
        }

        s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (s == -1) {
                /* Capture errno before printf() has a chance to change it */
                saved_errno = errno;
                printf("Failed to open local socket\n");
                return saved_errno;
        }

        /* Failing to go non-blocking is reported but not fatal */
        v = fcntl(s, F_GETFL, 0);
        if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK) != 0) {
                printf("Unable to set socket non-blocking: %s\n", strerror(errno));
        }

        memset(&sin, 0, sizeof(sin));
        sin.sin_family = AF_INET;
        sin.sin_port   = htons((uint16_t)port);
        ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
        /* close() may overwrite errno, so remember bind()'s value first */
        saved_errno = errno;
        close(s);
        if (ret == -1) {
                printf("Failed to bind to local socket: %d %s\n", saved_errno, strerror(saved_errno));
                return saved_errno;
        }

        return 0;
}
4494
4495
4496
4497 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4498                              TDB_DATA data, void *private_data)
4499 {
4500         DEBUG(DEBUG_ERR,("Log data received\n"));
4501         if (data.dsize > 0) {
4502                 printf("%s", data.dptr);
4503         }
4504
4505         exit(0);
4506 }
4507
4508 /*
4509   display a list of log messages from the in memory ringbuffer
4510  */
4511 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
4512 {
4513         int ret, i;
4514         bool main_daemon;
4515         struct ctdb_get_log_addr log_addr;
4516         TDB_DATA data;
4517         struct timeval tv;
4518
4519         /* Process options */
4520         main_daemon = true;
4521         log_addr.pnn = ctdb_get_pnn(ctdb);
4522         log_addr.level = DEBUG_NOTICE;
4523         for (i = 0; i < argc; i++) {
4524                 if (strcmp(argv[i], "recoverd") == 0) {
4525                         main_daemon = false;
4526                 } else {
4527                         if (isalpha(argv[i][0]) || argv[i][0] == '-') { 
4528                                 log_addr.level = get_debug_by_desc(argv[i]);
4529                         } else {
4530                                 log_addr.level = strtol(argv[i], NULL, 0);
4531                         }
4532                 }
4533         }
4534
4535         /* Our message port is our PID */
4536         log_addr.srvid = getpid();
4537
4538         data.dptr = (unsigned char *)&log_addr;
4539         data.dsize = sizeof(log_addr);
4540
4541         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
4542
4543         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
4544         sleep(1);
4545
4546         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
4547
4548         if (main_daemon) {
4549                 int32_t res;
4550                 char *errmsg;
4551                 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4552
4553                 ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
4554                                    0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
4555                 if (ret != 0 || res != 0) {
4556                         DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
4557                         talloc_free(tmp_ctx);
4558                         return -1;
4559                 }
4560                 talloc_free(tmp_ctx);
4561         } else {
4562                 ret = ctdb_client_send_message(ctdb, options.pnn,
4563                                                CTDB_SRVID_GETLOG, data);
4564                 if (ret != 0) {
4565                         DEBUG(DEBUG_ERR,("Failed to send getlog request message to %u\n", options.pnn));
4566                         return -1;
4567                 }
4568         }
4569
4570         tv = timeval_current();
4571         /* this loop will terminate when we have received the reply */
4572         while (timeval_elapsed(&tv) < (double)options.timelimit) {
4573                 event_loop_once(ctdb->ev);
4574         }
4575
4576         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
4577
4578         return 0;
4579 }
4580
4581 /*
4582   clear the in memory log area
4583  */
4584 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
4585 {
4586         int ret;
4587
4588         if (argc == 0 || (argc >= 1 && strcmp(argv[0], "recoverd") != 0)) {
4589                 /* "recoverd" not given - get logs from main daemon */
4590                 int32_t res;
4591                 char *errmsg;
4592                 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4593
4594                 ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
4595                                    0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
4596                 if (ret != 0 || res != 0) {
4597                         DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
4598                         talloc_free(tmp_ctx);
4599                         return -1;
4600                 }
4601
4602                 talloc_free(tmp_ctx);
4603         } else {
4604                 TDB_DATA data; /* unused in recoverd... */
4605                 data.dsize = 0;
4606
4607                 ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_CLEARLOG, data);
4608                 if (ret != 0) {
4609                         DEBUG(DEBUG_ERR,("Failed to send clearlog request message to %u\n", options.pnn));
4610                         return -1;
4611                 }
4612         }
4613
4614         return 0;
4615 }
4616
4617 /* Reload public IPs on a specified nodes */
4618 static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
4619 {
4620         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4621         uint32_t *nodes;
4622         uint32_t pnn_mode;
4623         uint32_t timeout;
4624         int ret;
4625
4626         assert_single_node_only();
4627
4628         if (argc > 1) {
4629                 usage();
4630         }
4631
4632         /* Determine the nodes where IPs need to be reloaded */
4633         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
4634                               options.pnn, true, &nodes, &pnn_mode)) {
4635                 ret = -1;
4636                 goto done;
4637         }
4638
4639 again:
4640         /* Disable takeover runs on all connected nodes.  A reply
4641          * indicating success is needed from each node so all nodes
4642          * will need to be active.  This will retry until maxruntime
4643          * is exceeded, hence no error handling.
4644          * 
4645          * A check could be added to not allow reloading of IPs when
4646          * there are disconnected nodes.  However, this should
4647          * probably be left up to the administrator.
4648          */
4649         timeout = LONGTIMEOUT;
4650         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4651                         "Disable takeover runs", true);
4652
4653         /* Now tell all the desired nodes to reload their public IPs.
4654          * Keep trying this until it succeeds.  This assumes all
4655          * failures are transient, which might not be true...
4656          */
4657         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
4658                                       nodes, 0, LONGTIMELIMIT(),
4659                                       false, tdb_null,
4660                                       NULL, NULL, NULL) != 0) {
4661                 DEBUG(DEBUG_ERR,
4662                       ("Unable to reload IPs on some nodes, try again.\n"));
4663                 goto again;
4664         }
4665
4666         /* It isn't strictly necessary to wait until takeover runs are
4667          * re-enabled but doing so can't hurt.
4668          */
4669         timeout = 0;
4670         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4671                         "Enable takeover runs", true);
4672
4673         ipreallocate(ctdb);
4674
4675         ret = 0;
4676 done:
4677         talloc_free(tmp_ctx);
4678         return ret;
4679 }
4680
4681 /*
4682   display a list of the databases on a remote ctdb
4683  */
4684 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
4685 {
4686         int i, ret;
4687         struct ctdb_dbid_map *dbmap=NULL;
4688
4689         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4690         if (ret != 0) {
4691                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4692                 return ret;
4693         }
4694
4695         if(options.machinereadable){
4696                 printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
4697                 for(i=0;i<dbmap->num;i++){
4698                         const char *path;
4699                         const char *name;
4700                         const char *health;
4701                         bool persistent;
4702                         bool readonly;
4703                         bool sticky;
4704
4705                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
4706                                             dbmap->dbs[i].dbid, ctdb, &path);
4707                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4708                                             dbmap->dbs[i].dbid, ctdb, &name);
4709                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4710                                               dbmap->dbs[i].dbid, ctdb, &health);
4711                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4712                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4713                         sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4714                         printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
4715                                dbmap->dbs[i].dbid, name, path,
4716                                !!(persistent), !!(sticky),
4717                                !!(health), !!(readonly));
4718                 }
4719                 return 0;
4720         }
4721
4722         printf("Number of databases:%d\n", dbmap->num);
4723         for(i=0;i<dbmap->num;i++){
4724                 const char *path;
4725                 const char *name;
4726                 const char *health;
4727                 bool persistent;
4728                 bool readonly;
4729                 bool sticky;
4730
4731                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4732                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4733                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4734                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4735                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4736                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4737                 printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
4738                        dbmap->dbs[i].dbid, name, path,
4739                        persistent?" PERSISTENT":"",
4740                        sticky?" STICKY":"",
4741                        readonly?" READONLY":"",
4742                        health?" UNHEALTHY":"");
4743         }
4744
4745         return 0;
4746 }
4747
4748 /*
4749   display the status of a database on a remote ctdb
4750  */
4751 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
4752 {
4753         const char *db_name;
4754         uint32_t db_id;
4755         uint8_t flags;
4756         const char *path;
4757         const char *health;
4758
4759         if (argc < 1) {
4760                 usage();
4761         }
4762
4763         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
4764                 return -1;
4765         }
4766
4767         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &path);
4768         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &health);
4769         printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
4770                db_id, db_name, path,
4771                (flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
4772                (flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
4773                (flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
4774                (health ? health : "OK"));
4775
4776         return 0;
4777 }
4778
4779 /*
4780   check if the local node is recmaster or not
4781   it will return 1 if this node is the recmaster and 0 if it is not
4782   or if the local ctdb daemon could not be contacted
4783  */
4784 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
4785 {
4786         uint32_t mypnn, recmaster;
4787         int ret;
4788
4789         assert_single_node_only();
4790
4791         mypnn = getpnn(ctdb);
4792
4793         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
4794         if (ret != 0) {
4795                 printf("Failed to get the recmaster\n");
4796                 return 1;
4797         }
4798
4799         if (recmaster != mypnn) {
4800                 printf("this node is not the recmaster\n");
4801                 return 1;
4802         }
4803
4804         printf("this node is the recmaster\n");
4805         return 0;
4806 }
4807
4808 /*
4809   ping a node
4810  */
4811 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
4812 {
4813         int ret;
4814         struct timeval tv = timeval_current();
4815         ret = ctdb_ctrl_ping(ctdb, options.pnn);
4816         if (ret == -1) {
4817                 printf("Unable to get ping response from node %u\n", options.pnn);
4818                 return -1;
4819         } else {
4820                 printf("response from %u time=%.6f sec  (%d clients)\n", 
4821                        options.pnn, timeval_elapsed(&tv), ret);
4822         }
4823         return 0;
4824 }
4825
4826
4827 /*
4828   get a node's runstate
4829  */
4830 static int control_runstate(struct ctdb_context *ctdb, int argc, const char **argv)
4831 {
4832         int ret;
4833         enum ctdb_runstate runstate;
4834
4835         ret = ctdb_ctrl_get_runstate(ctdb, TIMELIMIT(), options.pnn, &runstate);
4836         if (ret == -1) {
4837                 printf("Unable to get runstate response from node %u\n",
4838                        options.pnn);
4839                 return -1;
4840         } else {
4841                 bool found = true;
4842                 enum ctdb_runstate t;
4843                 int i;
4844                 for (i=0; i<argc; i++) {
4845                         found = false;
4846                         t = runstate_from_string(argv[i]);
4847                         if (t == CTDB_RUNSTATE_UNKNOWN) {
4848                                 printf("Invalid run state (%s)\n", argv[i]);
4849                                 return -1;
4850                         }
4851
4852                         if (t == runstate) {
4853                                 found = true;
4854                                 break;
4855                         }
4856                 }
4857
4858                 if (!found) {
4859                         printf("CTDB not in required run state (got %s)\n", 
4860                                runstate_to_string((enum ctdb_runstate)runstate));
4861                         return -1;
4862                 }
4863         }
4864
4865         printf("%s\n", runstate_to_string(runstate));
4866         return 0;
4867 }
4868
4869
4870 /*
4871   get a tunable
4872  */
4873 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
4874 {
4875         const char *name;
4876         uint32_t value;
4877         int ret;
4878
4879         if (argc < 1) {
4880                 usage();
4881         }
4882
4883         name = argv[0];
4884         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4885         if (ret != 0) {
4886                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4887                 return -1;
4888         }
4889
4890         printf("%-23s = %u\n", name, value);
4891         return 0;
4892 }
4893
4894 /*
4895   set a tunable
4896  */
4897 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4898 {
4899         const char *name;
4900         uint32_t value;
4901         int ret;
4902
4903         if (argc < 2) {
4904                 usage();
4905         }
4906
4907         name = argv[0];
4908         value = strtoul(argv[1], NULL, 0);
4909
4910         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4911         if (ret == -1) {
4912                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4913                 return -1;
4914         }
4915         return 0;
4916 }
4917
4918 /*
4919   list all tunables
4920  */
4921 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4922 {
4923         uint32_t count;
4924         const char **list;
4925         int ret, i;
4926
4927         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4928         if (ret == -1) {
4929                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4930                 return -1;
4931         }
4932
4933         for (i=0;i<count;i++) {
4934                 control_getvar(ctdb, 1, &list[i]);
4935         }
4936
4937         talloc_free(list);
4938         
4939         return 0;
4940 }
4941
4942 /*
4943   display debug level on a node
4944  */
4945 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4946 {
4947         int ret;
4948         int32_t level;
4949
4950         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4951         if (ret != 0) {
4952                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4953                 return ret;
4954         } else {
4955                 if (options.machinereadable){
4956                         printf(":Name:Level:\n");
4957                         printf(":%s:%d:\n",get_debug_by_level(level),level);
4958                 } else {
4959                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
4960                 }
4961         }
4962         return 0;
4963 }
4964
4965 /*
4966   display reclock file of a node
4967  */
4968 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4969 {
4970         int ret;
4971         const char *reclock;
4972
4973         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4974         if (ret != 0) {
4975                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4976                 return ret;
4977         } else {
4978                 if (options.machinereadable){
4979                         if (reclock != NULL) {
4980                                 printf("%s", reclock);
4981                         }
4982                 } else {
4983                         if (reclock == NULL) {
4984                                 printf("No reclock file used.\n");
4985                         } else {
4986                                 printf("Reclock file:%s\n", reclock);
4987                         }
4988                 }
4989         }
4990         return 0;
4991 }
4992
4993 /*
4994   set the reclock file of a node
4995  */
4996 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4997 {
4998         int ret;
4999         const char *reclock;
5000
5001         if (argc == 0) {
5002                 reclock = NULL;
5003         } else if (argc == 1) {
5004                 reclock = argv[0];
5005         } else {
5006                 usage();
5007         }
5008
5009         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
5010         if (ret != 0) {
5011                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
5012                 return ret;
5013         }
5014         return 0;
5015 }
5016
5017 /*
5018   set the natgw state on/off
5019  */
5020 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
5021 {
5022         int ret;
5023         uint32_t natgwstate;
5024
5025         if (argc == 0) {
5026                 usage();
5027         }
5028
5029         if (!strcmp(argv[0], "on")) {
5030                 natgwstate = 1;
5031         } else if (!strcmp(argv[0], "off")) {
5032                 natgwstate = 0;
5033         } else {
5034                 usage();
5035         }
5036
5037         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
5038         if (ret != 0) {
5039                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
5040                 return ret;
5041         }
5042
5043         return 0;
5044 }
5045
5046 /*
5047   set the lmaster role on/off
5048  */
5049 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
5050 {
5051         int ret;
5052         uint32_t lmasterrole;
5053
5054         if (argc == 0) {
5055                 usage();
5056         }
5057
5058         if (!strcmp(argv[0], "on")) {
5059                 lmasterrole = 1;
5060         } else if (!strcmp(argv[0], "off")) {
5061                 lmasterrole = 0;
5062         } else {
5063                 usage();
5064         }
5065
5066         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
5067         if (ret != 0) {
5068                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
5069                 return ret;
5070         }
5071
5072         return 0;
5073 }
5074
5075 /*
5076   set the recmaster role on/off
5077  */
5078 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
5079 {
5080         int ret;
5081         uint32_t recmasterrole;
5082
5083         if (argc == 0) {
5084                 usage();
5085         }
5086
5087         if (!strcmp(argv[0], "on")) {
5088                 recmasterrole = 1;
5089         } else if (!strcmp(argv[0], "off")) {
5090                 recmasterrole = 0;
5091         } else {
5092                 usage();
5093         }
5094
5095         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
5096         if (ret != 0) {
5097                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
5098                 return ret;
5099         }
5100
5101         return 0;
5102 }
5103
5104 /*
5105   set debug level on a node or all nodes
5106  */
5107 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
5108 {
5109         int i, ret;
5110         int32_t level;
5111
5112         if (argc == 0) {
5113                 printf("You must specify the debug level. Valid levels are:\n");
5114                 for (i=0; debug_levels[i].description != NULL; i++) {
5115                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
5116                 }
5117
5118                 return 0;
5119         }
5120
5121         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
5122                 level = get_debug_by_desc(argv[0]);
5123         } else {
5124                 level = strtol(argv[0], NULL, 0);
5125         }
5126
5127         for (i=0; debug_levels[i].description != NULL; i++) {
5128                 if (level == debug_levels[i].level) {
5129                         break;
5130                 }
5131         }
5132         if (debug_levels[i].description == NULL) {
5133                 printf("Invalid debug level, must be one of\n");
5134                 for (i=0; debug_levels[i].description != NULL; i++) {
5135                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
5136                 }
5137                 return -1;
5138         }
5139
5140         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
5141         if (ret != 0) {
5142                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
5143         }
5144         return 0;
5145 }
5146
5147
5148 /*
5149   thaw a node
5150  */
5151 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
5152 {
5153         int ret;
5154         uint32_t priority;
5155         
5156         if (argc == 1) {
5157                 priority = strtol(argv[0], NULL, 0);
5158         } else {
5159                 priority = 0;
5160         }
5161         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
5162
5163         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
5164         if (ret != 0) {
5165                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
5166         }               
5167         return 0;
5168 }
5169
5170
5171 /*
5172   attach to a database
5173  */
5174 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
5175 {
5176         const char *db_name;
5177         struct ctdb_db_context *ctdb_db;
5178         bool persistent = false;
5179
5180         if (argc < 1) {
5181                 usage();
5182         }
5183         db_name = argv[0];
5184         if (argc > 2) {
5185                 usage();
5186         }
5187         if (argc == 2) {
5188                 if (strcmp(argv[1], "persistent") != 0) {
5189                         usage();
5190                 }
5191                 persistent = true;
5192         }
5193
5194         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
5195         if (ctdb_db == NULL) {
5196                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
5197                 return -1;
5198         }
5199
5200         return 0;
5201 }
5202
5203 /*
5204   set db priority
5205  */
5206 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5207 {
5208         struct ctdb_db_priority db_prio;
5209         int ret;
5210
5211         if (argc < 2) {
5212                 usage();
5213         }
5214
5215         db_prio.db_id    = strtoul(argv[0], NULL, 0);
5216         db_prio.priority = strtoul(argv[1], NULL, 0);
5217
5218         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
5219         if (ret != 0) {
5220                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
5221                 return -1;
5222         }
5223
5224         return 0;
5225 }
5226
5227 /*
5228   get db priority
5229  */
5230 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5231 {
5232         uint32_t db_id, priority;
5233         int ret;
5234
5235         if (argc < 1) {
5236                 usage();
5237         }
5238
5239         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5240                 return -1;
5241         }
5242
5243         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
5244         if (ret != 0) {
5245                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
5246                 return -1;
5247         }
5248
5249         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
5250
5251         return 0;
5252 }
5253
5254 /*
5255   set the sticky records capability for a database
5256  */
5257 static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
5258 {
5259         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5260         uint32_t db_id;
5261         int ret;
5262
5263         if (argc < 1) {
5264                 usage();
5265         }
5266
5267         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5268                 return -1;
5269         }
5270
5271         ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
5272         if (ret != 0) {
5273                 DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
5274                 talloc_free(tmp_ctx);
5275                 return -1;
5276         }
5277
5278         talloc_free(tmp_ctx);
5279         return 0;
5280 }
5281
5282 /*
5283   set the readonly capability for a database
5284  */
5285 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
5286 {
5287         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5288         uint32_t db_id;
5289         int ret;
5290
5291         if (argc < 1) {
5292                 usage();
5293         }
5294
5295         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5296                 return -1;
5297         }
5298
5299         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
5300         if (ret != 0) {
5301                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
5302                 talloc_free(tmp_ctx);
5303                 return -1;
5304         }
5305
5306         talloc_free(tmp_ctx);
5307         return 0;
5308 }
5309
5310 /*
5311   get db seqnum
5312  */
5313 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
5314 {
5315         uint32_t db_id;
5316         uint64_t seqnum;
5317         int ret;
5318
5319         if (argc < 1) {
5320                 usage();
5321         }
5322
5323         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5324                 return -1;
5325         }
5326
5327         ret = ctdb_ctrl_getdbseqnum(ctdb, TIMELIMIT(), options.pnn, db_id, &seqnum);
5328         if (ret != 0) {
5329                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
5330                 return -1;
5331         }
5332
5333         printf("Sequence number:%lld\n", (long long)seqnum);
5334
5335         return 0;
5336 }
5337
5338 /*
5339   run an eventscript on a node
5340  */
5341 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
5342 {
5343         TDB_DATA data;
5344         int ret;
5345         int32_t res;
5346         char *errmsg;
5347         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5348
5349         if (argc != 1) {
5350                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5351                 return -1;
5352         }
5353
5354         data.dptr = (unsigned char *)discard_const(argv[0]);
5355         data.dsize = strlen((char *)data.dptr) + 1;
5356
5357         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
5358
5359         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
5360                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
5361         if (ret != 0 || res != 0) {
5362                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
5363                 talloc_free(tmp_ctx);
5364                 return -1;
5365         }
5366         talloc_free(tmp_ctx);
5367         return 0;
5368 }
5369
/* Backup file format version: stored in every header by
   control_backupdb() and required by control_restoredb() and
   control_dumpdbbackup() */
#define DB_VERSION 1
/* Size in bytes of the name field of struct db_file_header */
#define MAX_DB_NAME 64
/*
  Header at the start of a database backup file.  control_backupdb()
  writes this struct as raw bytes, immediately followed by 'size' bytes
  of marshalled records.
  NOTE(review): because the in-memory struct is written verbatim, the
  file layout presumably depends on the platform's sizes of
  unsigned long and time_t - confirm before moving backup files between
  architectures.
 */
struct db_file_header {
	/* DB_VERSION of the tool that wrote the file */
	unsigned long version;
	/* time(NULL) at the moment the header was filled in */
	time_t timestamp;
	/* flags & CTDB_DB_FLAGS_PERSISTENT of the backed up database;
	   passed as the 'persistent' argument of ctdb_attach() on restore */
	unsigned long persistent;
	/* length in bytes of the record data that follows the header */
	unsigned long size;
	/* database argument given to backupdb, NUL terminated */
	const char name[MAX_DB_NAME];
};
5379
/*
  State shared between control_backupdb() and its traverse callback
  backup_traverse() while the records of a database are collected.
 */
struct backup_data {
	/* marshall buffer the callback appends one record to per call */
	struct ctdb_marshall_buffer *records;
	/* bytes of 'records' in use; starts at the offset of its data member */
	uint32_t len;
	/* number of records appended so far */
	uint32_t total;
	/* set by the callback when a record could not be added */
	bool traverse_error;
};
5386
5387 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
5388 {
5389         struct backup_data *bd = talloc_get_type(private, struct backup_data);
5390         struct ctdb_rec_data *rec;
5391
5392         /* add the record */
5393         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
5394         if (rec == NULL) {
5395                 bd->traverse_error = true;
5396                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
5397                 return -1;
5398         }
5399         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
5400         if (bd->records == NULL) {
5401                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
5402                 bd->traverse_error = true;
5403                 return -1;
5404         }
5405         bd->records->count++;
5406         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
5407         bd->len += rec->length;
5408         talloc_free(rec);
5409
5410         bd->total++;
5411         return 0;
5412 }
5413
5414 /*
5415  * backup a database to a file 
5416  */
5417 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
5418 {
5419         const char *db_name;
5420         int ret;
5421         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5422         struct db_file_header dbhdr;
5423         struct ctdb_db_context *ctdb_db;
5424         struct backup_data *bd;
5425         int fh = -1;
5426         int status = -1;
5427         const char *reason = NULL;
5428         uint32_t db_id;
5429         uint8_t flags;
5430
5431         assert_single_node_only();
5432
5433         if (argc != 2) {
5434                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5435                 return -1;
5436         }
5437
5438         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
5439                 return -1;
5440         }
5441
5442         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
5443                                     db_id, tmp_ctx, &reason);
5444         if (ret != 0) {
5445                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
5446                                  argv[0]));
5447                 talloc_free(tmp_ctx);
5448                 return -1;
5449         }
5450         if (reason) {
5451                 uint32_t allow_unhealthy = 0;
5452
5453                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
5454                                       "AllowUnhealthyDBRead",
5455                                       &allow_unhealthy);
5456
5457                 if (allow_unhealthy != 1) {
5458                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
5459                                          argv[0], reason));
5460
5461                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
5462                                          allow_unhealthy));
5463                         talloc_free(tmp_ctx);
5464                         return -1;
5465                 }
5466
5467                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
5468                                      argv[0], argv[0]));
5469                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
5470                                      "tunnable AllowUnhealthyDBRead = %u\n",
5471                                      allow_unhealthy));
5472         }
5473
5474         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5475         if (ctdb_db == NULL) {
5476                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
5477                 talloc_free(tmp_ctx);
5478                 return -1;
5479         }
5480
5481
5482         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
5483         if (ret == -1) {
5484                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
5485                 talloc_free(tmp_ctx);
5486                 return -1;
5487         }
5488
5489
5490         bd = talloc_zero(tmp_ctx, struct backup_data);
5491         if (bd == NULL) {
5492                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
5493                 talloc_free(tmp_ctx);
5494                 return -1;
5495         }
5496
5497         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
5498         if (bd->records == NULL) {
5499                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
5500                 talloc_free(tmp_ctx);
5501                 return -1;
5502         }
5503
5504         bd->len = offsetof(struct ctdb_marshall_buffer, data);
5505         bd->records->db_id = ctdb_db->db_id;
5506         /* traverse the database collecting all records */
5507         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
5508             bd->traverse_error) {
5509                 DEBUG(DEBUG_ERR,("Traverse error\n"));
5510                 talloc_free(tmp_ctx);
5511                 return -1;              
5512         }
5513
5514         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
5515
5516
5517         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
5518         if (fh == -1) {
5519                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
5520                 talloc_free(tmp_ctx);
5521                 return -1;
5522         }
5523
5524         ZERO_STRUCT(dbhdr);
5525         dbhdr.version = DB_VERSION;
5526         dbhdr.timestamp = time(NULL);
5527         dbhdr.persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
5528         dbhdr.size = bd->len;
5529         if (strlen(argv[0]) >= MAX_DB_NAME) {
5530                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
5531                 goto done;
5532         }
5533         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME-1);
5534         ret = write(fh, &dbhdr, sizeof(dbhdr));
5535         if (ret == -1) {
5536                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5537                 goto done;
5538         }
5539         ret = write(fh, bd->records, bd->len);
5540         if (ret == -1) {
5541                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5542                 goto done;
5543         }
5544
5545         status = 0;
5546 done:
5547         if (fh != -1) {
5548                 ret = close(fh);
5549                 if (ret == -1) {
5550                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
5551                 }
5552         }
5553
5554         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
5555
5556         talloc_free(tmp_ctx);
5557         return status;
5558 }
5559
5560 /*
5561  * restore a database from a file 
5562  */
5563 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
5564 {
5565         int ret;
5566         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5567         TDB_DATA outdata;
5568         TDB_DATA data;
5569         struct db_file_header dbhdr;
5570         struct ctdb_db_context *ctdb_db;
5571         struct ctdb_node_map *nodemap=NULL;
5572         struct ctdb_vnn_map *vnnmap=NULL;
5573         int i, fh;
5574         struct ctdb_control_wipe_database w;
5575         uint32_t *nodes;
5576         uint32_t generation;
5577         struct tm *tm;
5578         char tbuf[100];
5579         char *dbname;
5580
5581         assert_single_node_only();
5582
5583         if (argc < 1 || argc > 2) {
5584                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5585                 return -1;
5586         }
5587
5588         fh = open(argv[0], O_RDONLY);
5589         if (fh == -1) {
5590                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5591                 talloc_free(tmp_ctx);
5592                 return -1;
5593         }
5594
5595         read(fh, &dbhdr, sizeof(dbhdr));
5596         if (dbhdr.version != DB_VERSION) {
5597                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5598                 close(fh);
5599                 talloc_free(tmp_ctx);
5600                 return -1;
5601         }
5602
5603         dbname = discard_const(dbhdr.name);
5604         if (argc == 2) {
5605                 dbname = discard_const(argv[1]);
5606         }
5607
5608         outdata.dsize = dbhdr.size;
5609         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5610         if (outdata.dptr == NULL) {
5611                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5612                 close(fh);
5613                 talloc_free(tmp_ctx);
5614                 return -1;
5615         }               
5616         read(fh, outdata.dptr, outdata.dsize);
5617         close(fh);
5618
5619         tm = localtime(&dbhdr.timestamp);
5620         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5621         printf("Restoring database '%s' from backup @ %s\n",
5622                 dbname, tbuf);
5623
5624
5625         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
5626         if (ctdb_db == NULL) {
5627                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
5628                 talloc_free(tmp_ctx);
5629                 return -1;
5630         }
5631
5632         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
5633         if (ret != 0) {
5634                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
5635                 talloc_free(tmp_ctx);
5636                 return ret;
5637         }
5638
5639
5640         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
5641         if (ret != 0) {
5642                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
5643                 talloc_free(tmp_ctx);
5644                 return ret;
5645         }
5646
5647         /* freeze all nodes */
5648         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5649         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5650                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5651                                         nodes, i,
5652                                         TIMELIMIT(),
5653                                         false, tdb_null,
5654                                         NULL, NULL,
5655                                         NULL) != 0) {
5656                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5657                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5658                         talloc_free(tmp_ctx);
5659                         return -1;
5660                 }
5661         }
5662
5663         generation = vnnmap->generation;
5664         data.dptr = (void *)&generation;
5665         data.dsize = sizeof(generation);
5666
5667         /* start a cluster wide transaction */
5668         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5669         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5670                                         nodes, 0,
5671                                         TIMELIMIT(), false, data,
5672                                         NULL, NULL,
5673                                         NULL) != 0) {
5674                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
5675                 return -1;
5676         }
5677
5678
5679         w.db_id = ctdb_db->db_id;
5680         w.transaction_id = generation;
5681
5682         data.dptr = (void *)&w;
5683         data.dsize = sizeof(w);
5684
5685         /* wipe all the remote databases. */
5686         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5687         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5688                                         nodes, 0,
5689                                         TIMELIMIT(), false, data,
5690                                         NULL, NULL,
5691                                         NULL) != 0) {
5692                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5693                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5694                 talloc_free(tmp_ctx);
5695                 return -1;
5696         }
5697         
5698         /* push the database */
5699         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5700         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
5701                                         nodes, 0,
5702                                         TIMELIMIT(), false, outdata,
5703                                         NULL, NULL,
5704                                         NULL) != 0) {
5705                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
5706                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5707                 talloc_free(tmp_ctx);
5708                 return -1;
5709         }
5710
5711         data.dptr = (void *)&ctdb_db->db_id;
5712         data.dsize = sizeof(ctdb_db->db_id);
5713
5714         /* mark the database as healthy */
5715         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5716         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5717                                         nodes, 0,
5718                                         TIMELIMIT(), false, data,
5719                                         NULL, NULL,
5720                                         NULL) != 0) {
5721                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5722                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5723                 talloc_free(tmp_ctx);
5724                 return -1;
5725         }
5726
5727         data.dptr = (void *)&generation;
5728         data.dsize = sizeof(generation);
5729
5730         /* commit all the changes */
5731         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5732                                         nodes, 0,
5733                                         TIMELIMIT(), false, data,
5734                                         NULL, NULL,
5735                                         NULL) != 0) {
5736                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5737                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5738                 talloc_free(tmp_ctx);
5739                 return -1;
5740         }
5741
5742
5743         /* thaw all nodes */
5744         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5745         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5746                                         nodes, 0,
5747                                         TIMELIMIT(),
5748                                         false, tdb_null,
5749                                         NULL, NULL,
5750                                         NULL) != 0) {
5751                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5752                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5753                 talloc_free(tmp_ctx);
5754                 return -1;
5755         }
5756
5757
5758         talloc_free(tmp_ctx);
5759         return 0;
5760 }
5761
5762 /*
5763  * dump a database backup from a file
5764  */
5765 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
5766 {
5767         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5768         TDB_DATA outdata;
5769         struct db_file_header dbhdr;
5770         int i, fh;
5771         struct tm *tm;
5772         char tbuf[100];
5773         struct ctdb_rec_data *rec = NULL;
5774         struct ctdb_marshall_buffer *m;
5775         struct ctdb_dump_db_context c;
5776
5777         assert_single_node_only();
5778
5779         if (argc != 1) {
5780                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5781                 return -1;
5782         }
5783
5784         fh = open(argv[0], O_RDONLY);
5785         if (fh == -1) {
5786                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5787                 talloc_free(tmp_ctx);
5788                 return -1;
5789         }
5790
5791         read(fh, &dbhdr, sizeof(dbhdr));
5792         if (dbhdr.version != DB_VERSION) {
5793                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5794                 close(fh);
5795                 talloc_free(tmp_ctx);
5796                 return -1;
5797         }
5798
5799         outdata.dsize = dbhdr.size;
5800         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5801         if (outdata.dptr == NULL) {
5802                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5803                 close(fh);
5804                 talloc_free(tmp_ctx);
5805                 return -1;
5806         }
5807         read(fh, outdata.dptr, outdata.dsize);
5808         close(fh);
5809         m = (struct ctdb_marshall_buffer *)outdata.dptr;
5810
5811         tm = localtime(&dbhdr.timestamp);
5812         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5813         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
5814                 dbhdr.name, m->db_id, tbuf);
5815
5816         ZERO_STRUCT(c);
5817         c.f = stdout;
5818         c.printemptyrecords = (bool)options.printemptyrecords;
5819         c.printdatasize = (bool)options.printdatasize;
5820         c.printlmaster = false;
5821         c.printhash = (bool)options.printhash;
5822         c.printrecordflags = (bool)options.printrecordflags;
5823
5824         for (i=0; i < m->count; i++) {
5825                 uint32_t reqid = 0;
5826                 TDB_DATA key, data;
5827
5828                 /* we do not want the header splitted, so we pass NULL*/
5829                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
5830                                               NULL, &key, &data);
5831
5832                 ctdb_dumpdb_record(ctdb, key, data, &c);
5833         }
5834
5835         printf("Dumped %d records\n", i);
5836         talloc_free(tmp_ctx);
5837         return 0;
5838 }
5839
/*
 * wipe the contents of a database
 */
5843 static int control_wipedb(struct ctdb_context *ctdb, int argc,
5844                           const char **argv)
5845 {
5846         const char *db_name;
5847         int ret;
5848         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5849         TDB_DATA data;
5850         struct ctdb_db_context *ctdb_db;
5851         struct ctdb_node_map *nodemap = NULL;
5852         struct ctdb_vnn_map *vnnmap = NULL;
5853         int i;
5854         struct ctdb_control_wipe_database w;
5855         uint32_t *nodes;
5856         uint32_t generation;
5857         uint8_t flags;
5858
5859         assert_single_node_only();
5860
5861         if (argc != 1) {
5862                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5863                 return -1;
5864         }
5865
5866         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
5867                 return -1;
5868         }
5869
5870         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5871         if (ctdb_db == NULL) {
5872                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
5873                                   argv[0]));
5874                 talloc_free(tmp_ctx);
5875                 return -1;
5876         }
5877
5878         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
5879                                    &nodemap);
5880         if (ret != 0) {
5881                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5882                                   options.pnn));
5883                 talloc_free(tmp_ctx);
5884                 return ret;
5885         }
5886
5887         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5888                                   &vnnmap);
5889         if (ret != 0) {
5890                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5891                                   options.pnn));
5892                 talloc_free(tmp_ctx);
5893                 return ret;
5894         }
5895
5896         /* freeze all nodes */
5897         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5898         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5899                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5900                                                 nodes, i,
5901                                                 TIMELIMIT(),
5902                                                 false, tdb_null,
5903                                                 NULL, NULL,
5904                                                 NULL);
5905                 if (ret != 0) {
5906                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5907                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5908                                              CTDB_RECOVERY_ACTIVE);
5909                         talloc_free(tmp_ctx);
5910                         return -1;
5911                 }
5912         }
5913
5914         generation = vnnmap->generation;
5915         data.dptr = (void *)&generation;
5916         data.dsize = sizeof(generation);
5917
5918         /* start a cluster wide transaction */
5919         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5920         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5921                                         nodes, 0,
5922                                         TIMELIMIT(), false, data,
5923                                         NULL, NULL,
5924                                         NULL);
5925         if (ret!= 0) {
5926                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5927                                   "transactions.\n"));
5928                 return -1;
5929         }
5930
5931         w.db_id = ctdb_db->db_id;
5932         w.transaction_id = generation;
5933
5934         data.dptr = (void *)&w;
5935         data.dsize = sizeof(w);
5936
5937         /* wipe all the remote databases. */
5938         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5939         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5940                                         nodes, 0,
5941                                         TIMELIMIT(), false, data,
5942                                         NULL, NULL,
5943                                         NULL) != 0) {
5944                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5945                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5946                 talloc_free(tmp_ctx);
5947                 return -1;
5948         }
5949
5950         data.dptr = (void *)&ctdb_db->db_id;
5951         data.dsize = sizeof(ctdb_db->db_id);
5952
5953         /* mark the database as healthy */
5954         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5955         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5956                                         nodes, 0,
5957                                         TIMELIMIT(), false, data,
5958                                         NULL, NULL,
5959                                         NULL) != 0) {
5960                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5961                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5962                 talloc_free(tmp_ctx);
5963                 return -1;
5964         }
5965
5966         data.dptr = (void *)&generation;
5967         data.dsize = sizeof(generation);
5968
5969         /* commit all the changes */
5970         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5971                                         nodes, 0,
5972                                         TIMELIMIT(), false, data,
5973                                         NULL, NULL,
5974                                         NULL) != 0) {
5975                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5976                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5977                 talloc_free(tmp_ctx);
5978                 return -1;
5979         }
5980
5981         /* thaw all nodes */
5982         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5983         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5984                                         nodes, 0,
5985                                         TIMELIMIT(),
5986                                         false, tdb_null,
5987                                         NULL, NULL,
5988                                         NULL) != 0) {
5989                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5990                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5991                 talloc_free(tmp_ctx);
5992                 return -1;
5993         }
5994
5995         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
5996
5997         talloc_free(tmp_ctx);
5998         return 0;
5999 }
6000
6001 /*
6002   dump memory usage
6003  */
6004 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
6005 {
6006         TDB_DATA data;
6007         int ret;
6008         int32_t res;
6009         char *errmsg;
6010         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
6011         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
6012                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
6013         if (ret != 0 || res != 0) {
6014                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
6015                 talloc_free(tmp_ctx);
6016                 return -1;
6017         }
6018         write(1, data.dptr, data.dsize);
6019         talloc_free(tmp_ctx);
6020         return 0;
6021 }
6022
6023 /*
6024   handler for memory dumps
6025 */
6026 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
6027                              TDB_DATA data, void *private_data)
6028 {
6029         write(1, data.dptr, data.dsize);
6030         exit(0);
6031 }
6032
6033 /*
6034   dump memory usage on the recovery daemon
6035  */
6036 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
6037 {
6038         int ret;
6039         TDB_DATA data;
6040         struct srvid_request rd;
6041
6042         rd.pnn = ctdb_get_pnn(ctdb);
6043         rd.srvid = getpid();
6044
6045         /* register a message port for receiveing the reply so that we
6046            can receive the reply
6047         */
6048         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
6049
6050
6051         data.dptr = (uint8_t *)&rd;
6052         data.dsize = sizeof(rd);
6053
6054         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
6055         if (ret != 0) {
6056                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6057                 return -1;
6058         }
6059
6060         /* this loop will terminate when we have received the reply */
6061         while (1) {     
6062                 event_loop_once(ctdb->ev);
6063         }
6064
6065         return 0;
6066 }
6067
6068 /*
6069   send a message to a srvid
6070  */
6071 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
6072 {
6073         unsigned long srvid;
6074         int ret;
6075         TDB_DATA data;
6076
6077         if (argc < 2) {
6078                 usage();
6079         }
6080
6081         srvid      = strtoul(argv[0], NULL, 0);
6082
6083         data.dptr = (uint8_t *)discard_const(argv[1]);
6084         data.dsize= strlen(argv[1]);
6085
6086         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
6087         if (ret != 0) {
6088                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
6089                 return -1;
6090         }
6091
6092         return 0;
6093 }
6094
6095 /*
6096   handler for msglisten
6097 */
6098 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
6099                              TDB_DATA data, void *private_data)
6100 {
6101         int i;
6102
6103         printf("Message received: ");
6104         for (i=0;i<data.dsize;i++) {
6105                 printf("%c", data.dptr[i]);
6106         }
6107         printf("\n");
6108 }
6109
6110 /*
6111   listen for messages on a messageport
6112  */
6113 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
6114 {
6115         uint64_t srvid;
6116
6117         srvid = getpid();
6118
6119         /* register a message port and listen for messages
6120         */
6121         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
6122         printf("Listening for messages on srvid:%d\n", (int)srvid);
6123
6124         while (1) {     
6125                 event_loop_once(ctdb->ev);
6126         }
6127
6128         return 0;
6129 }
6130
6131 /*
6132   list all nodes in the cluster
6133   we parse the nodes file directly
6134  */
6135 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
6136 {
6137         TALLOC_CTX *mem_ctx = talloc_new(NULL);
6138         struct pnn_node *pnn_nodes;
6139         struct pnn_node *pnn_node;
6140
6141         assert_single_node_only();
6142
6143         pnn_nodes = read_nodes_file(mem_ctx);
6144         if (pnn_nodes == NULL) {
6145                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
6146                 talloc_free(mem_ctx);
6147                 return -1;
6148         }
6149
6150         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
6151                 ctdb_sock_addr addr;
6152                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
6153                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
6154                         talloc_free(mem_ctx);
6155                         return -1;
6156                 }
6157                 if (options.machinereadable){
6158                         printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
6159                 } else {
6160                         printf("%s\n", pnn_node->addr);
6161                 }
6162         }
6163         talloc_free(mem_ctx);
6164
6165         return 0;
6166 }
6167
6168 /*
6169   reload the nodes file on the local node
6170  */
6171 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
6172 {
6173         int i, ret;
6174         int mypnn;
6175         struct ctdb_node_map *nodemap=NULL;
6176
6177         assert_single_node_only();
6178
6179         mypnn = ctdb_get_pnn(ctdb);
6180
6181         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
6182         if (ret != 0) {
6183                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
6184                 return ret;
6185         }
6186
6187         /* reload the nodes file on all remote nodes */
6188         for (i=0;i<nodemap->num;i++) {
6189                 if (nodemap->nodes[i].pnn == mypnn) {
6190                         continue;
6191                 }
6192                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
6193                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
6194                         nodemap->nodes[i].pnn);
6195                 if (ret != 0) {
6196                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
6197                 }
6198         }
6199
6200         /* reload the nodes file on the local node */
6201         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
6202         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
6203         if (ret != 0) {
6204                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
6205         }
6206
6207         /* initiate a recovery */
6208         control_recover(ctdb, argc, argv);
6209
6210         return 0;
6211 }
6212
6213
6214 static const struct {
6215         const char *name;
6216         int (*fn)(struct ctdb_context *, int, const char **);
6217         bool auto_all;
6218         bool without_daemon; /* can be run without daemon running ? */
6219         const char *msg;
6220         const char *args;
6221 } ctdb_commands[] = {
6222         { "version",         control_version,           true,   true,   "show version of ctdb" },
6223         { "status",          control_status,            true,   false,  "show node status" },
6224         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
6225         { "ping",            control_ping,              true,   false,  "ping all nodes" },
6226         { "runstate",        control_runstate,          true,   false,  "get/check runstate of a node", "[setup|first_recovery|startup|running]" },
6227         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
6228         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
6229         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
6230         { "statistics",      control_statistics,        false,  false, "show statistics" },
6231         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
6232         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
6233         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
6234         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
6235         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
6236         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
6237         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
6238         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
6239         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname|dbid>" },
6240         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname|dbid>"},
6241         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname|dbid>"},
6242         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
6243         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
6244         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
6245         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
6246         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
6247         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
6248         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
6249         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
6250         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
6251         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "[<level>] [recoverd]" },
6252         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer", "[recoverd]" },
6253         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
6254         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
6255         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
6256         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
6257         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
6258         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
6259         { "stop",            control_stop,              true,   false,  "stop a node" },
6260         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
6261         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime>"},
6262         { "unban",           control_unban,             true,   false,  "unban a node" },
6263         { "showban",         control_showban,           true,   false,  "show ban information"},
6264         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
6265         { "recover",         control_recover,           true,   false,  "force recovery" },
6266         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
6267         { "ipreallocate",    control_ipreallocate,      false,  false,  "force the recovery daemon to perform a ip reallocation procedure" },
6268         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
6269         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
6270         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "[<srcip:port> <dstip:port>]" },
6271         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
6272         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6273         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
6274         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
6275
6276         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
6277
6278         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
6279         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
6280         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
6281         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
6282         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
6283         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
6284         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
6285         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
6286         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
6287         { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
6288         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
6289         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
6290         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
6291         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<dbname|dbid> <file>"},
6292         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
6293         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
6294         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname|dbid>"},
6295         { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
6296         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
6297         { "enablescript",     control_enablescript,  true,      false, "enable an eventscript", "<script>"},
6298         { "disablescript",    control_disablescript,  true,     false, "disable an eventscript", "<script>"},
6299         { "natgwlist",        control_natgwlist,        true,   false, "show the nodes belonging to this natgw configuration"},
6300         { "xpnn",             control_xpnn,             false,  true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
6301         { "getreclock",       control_getreclock,       true,   false, "Show the reclock file of a node"},
6302         { "setreclock",       control_setreclock,       true,   false, "Set/clear the reclock file of a node", "[filename]"},
6303         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
6304         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
6305         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
6306         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbname|dbid> <prio:1-3>"},
6307         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbname|dbid>"},
6308         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbname|dbid>"},
6309         { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbname|dbid>"},
6310         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
6311         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
6312         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<dbname|dbid> <key> [<file>]" },
6313         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<dbname|dbid> <key> <file containing record>" },
6314         { "pdelete",         control_pdelete,           false,  false,  "delete a record from a persistent database", "<dbname|dbid> <key>" },
6315         { "ptrans",          control_ptrans,            false,  false,  "update a persistent database (from stdin)", "<dbname|dbid>" },
6316         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
6317         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6318         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<dbname|dbid> <key>" },
6319         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<dbname|dbid> <key> <value>" },
6320         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
6321         { "rebalancenode",     control_rebalancenode,   false,  false, "mark nodes as forced IP rebalancing targets", "[<pnn-list>]"},
6322         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbname|dbid>" },
6323         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status", "[<pnn-list>]" },
6324         { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<dbname|dbid>" },
6325         { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on specified nodes" , "[<pnn-list>]" },
6326         { "ipiface",         control_ipiface,           false,  true,  "Find which interface an ip address is hosted on", "<ip>" },
6327 };
6328
6329 /*
6330   show usage message
6331  */
6332 static void usage(void)
6333 {
6334         int i;
6335         printf(
6336 "Usage: ctdb [options] <control>\n" \
6337 "Options:\n" \
6338 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
6339 "   -Y                 generate machinereadable output\n"
6340 "   -v                 generate verbose output\n"
6341 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
6342         printf("Controls:\n");
6343         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6344                 printf("  %-15s %-27s  %s\n", 
6345                        ctdb_commands[i].name, 
6346                        ctdb_commands[i].args?ctdb_commands[i].args:"",
6347                        ctdb_commands[i].msg);
6348         }
6349         exit(1);
6350 }
6351
6352
6353 static void ctdb_alarm(int sig)
6354 {
6355         printf("Maximum runtime exceeded - exiting\n");
6356         _exit(ERR_TIMEOUT);
6357 }
6358
6359 /*
6360   main program
6361 */
6362 int main(int argc, const char *argv[])
6363 {
6364         struct ctdb_context *ctdb;
6365         char *nodestring = NULL;
6366         struct poptOption popt_options[] = {
6367                 POPT_AUTOHELP
6368                 POPT_CTDB_CMDLINE
6369                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
6370                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
6371                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
6372                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
6373                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
6374                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
6375                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
6376                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
6377                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
6378                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
6379                 POPT_TABLEEND
6380         };
6381         int opt;
6382         const char **extra_argv;
6383         int extra_argc = 0;
6384         int ret=-1, i;
6385         poptContext pc;
6386         struct event_context *ev;
6387         const char *control;
6388
6389         setlinebuf(stdout);
6390         
6391         /* set some defaults */
6392         options.maxruntime = 0;
6393         options.timelimit = 10;
6394         options.pnn = CTDB_CURRENT_NODE;
6395
6396         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
6397
6398         while ((opt = poptGetNextOpt(pc)) != -1) {
6399                 switch (opt) {
6400                 default:
6401                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
6402                                 poptBadOption(pc, 0), poptStrerror(opt)));
6403                         exit(1);
6404                 }
6405         }
6406
6407         /* setup the remaining options for the main program to use */
6408         extra_argv = poptGetArgs(pc);
6409         if (extra_argv) {
6410                 extra_argv++;
6411                 while (extra_argv[extra_argc]) extra_argc++;
6412         }
6413
6414         if (extra_argc < 1) {
6415                 usage();
6416         }
6417
6418         if (options.maxruntime == 0) {
6419                 const char *ctdb_timeout;
6420                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6421                 if (ctdb_timeout != NULL) {
6422                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6423                 } else {
6424                         /* default timeout is 120 seconds */
6425                         options.maxruntime = 120;
6426                 }
6427         }
6428
6429         signal(SIGALRM, ctdb_alarm);
6430         alarm(options.maxruntime);
6431
6432         control = extra_argv[0];
6433
6434         /* Default value for CTDB_BASE - don't override */
6435         setenv("CTDB_BASE", ETCDIR "/ctdb", 0);
6436
6437         ev = event_context_init(NULL);
6438         if (!ev) {
6439                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
6440                 exit(1);
6441         }
6442
6443         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6444                 if (strcmp(control, ctdb_commands[i].name) == 0) {
6445                         break;
6446                 }
6447         }
6448
6449         if (i == ARRAY_SIZE(ctdb_commands)) {
6450                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
6451                 exit(1);
6452         }
6453
6454         if (ctdb_commands[i].without_daemon == true) {
6455                 if (nodestring != NULL) {
6456                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
6457                         exit(1);
6458                 }
6459                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
6460         }
6461
6462         /* initialise ctdb */
6463         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
6464
6465         if (ctdb == NULL) {
6466                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
6467                 exit(1);
6468         }
6469
6470         /* setup the node number(s) to contact */
6471         if (!parse_nodestring(ctdb, ctdb, nodestring, CTDB_CURRENT_NODE, false,
6472                               &options.nodes, &options.pnn)) {
6473                 usage();
6474         }
6475
6476         if (options.pnn == CTDB_CURRENT_NODE) {
6477                 options.pnn = options.nodes[0];
6478         }
6479
6480         if (ctdb_commands[i].auto_all && 
6481             ((options.pnn == CTDB_BROADCAST_ALL) ||
6482              (options.pnn == CTDB_MULTICAST))) {
6483                 int j;
6484
6485                 ret = 0;
6486                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
6487                         options.pnn = options.nodes[j];
6488                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6489                 }
6490         } else {
6491                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6492         }
6493
6494         talloc_free(ctdb);
6495         talloc_free(ev);
6496         (void)poptFreeContext(pc);
6497
6498         return ret;
6499
6500 }