tdb:tdbtool: add the "speed" command to the help text.
[sahlberg/ctdb.git] / server / ctdb_server.c
1 /* 
2    ctdb main protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tdb/include/tdb.h"
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27
28 /*
29   choose the transport we will use
30 */
31 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
32 {
33         ctdb->transport = talloc_strdup(ctdb, transport);
34         CTDB_NO_MEMORY(ctdb, ctdb->transport);
35
36         return 0;
37 }
38
39 /*
40   Check whether an ip is a valid node ip
41   Returns the node id for this ip address or -1
42 */
43 int ctdb_ip_to_nodeid(struct ctdb_context *ctdb, const char *nodeip)
44 {
45         int nodeid;
46
47         for (nodeid=0;nodeid<ctdb->num_nodes;nodeid++) {
48                 if (ctdb->nodes[nodeid]->flags & NODE_FLAGS_DELETED) {
49                         continue;
50                 }
51                 if (!strcmp(ctdb->nodes[nodeid]->address.address, nodeip)) {
52                         return nodeid;
53                 }
54         }
55
56         return -1;
57 }
58
59 /*
60   choose the recovery lock file
61 */
62 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
63 {
64         if (ctdb->recovery_lock_file != NULL) {
65                 talloc_free(ctdb->recovery_lock_file);
66                 ctdb->recovery_lock_file = NULL;
67         }
68
69         if (file == NULL) {
70                 DEBUG(DEBUG_ALERT,("Recovery lock file set to \"\". Disabling recovery lock checking\n"));
71                 ctdb->tunable.verify_recovery_lock = 0;
72                 return 0;
73         }
74
75         ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
76         CTDB_NO_MEMORY(ctdb, ctdb->recovery_lock_file);
77
78         return 0;
79 }
80
81 /*
82   set the directory for the local databases
83 */
84 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
85 {
86         ctdb->db_directory = talloc_strdup(ctdb, dir);
87         if (ctdb->db_directory == NULL) {
88                 return -1;
89         }
90         return 0;
91 }
92
93 /*
94   set the directory for the persistent databases
95 */
96 int ctdb_set_tdb_dir_persistent(struct ctdb_context *ctdb, const char *dir)
97 {
98         ctdb->db_directory_persistent = talloc_strdup(ctdb, dir);
99         if (ctdb->db_directory_persistent == NULL) {
100                 return -1;
101         }
102         return 0;
103 }
104
105 /*
106   add a node to the list of nodes
107 */
108 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
109 {
110         struct ctdb_node *node, **nodep;
111
112         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
113         CTDB_NO_MEMORY(ctdb, nodep);
114
115         ctdb->nodes = nodep;
116         nodep = &ctdb->nodes[ctdb->num_nodes];
117         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
118         CTDB_NO_MEMORY(ctdb, *nodep);
119         node = *nodep;
120
121         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
122                 return -1;
123         }
124         node->ctdb = ctdb;
125         node->name = talloc_asprintf(node, "%s:%u", 
126                                      node->address.address, 
127                                      node->address.port);
128         /* this assumes that the nodes are kept in sorted order, and no gaps */
129         node->pnn = ctdb->num_nodes;
130
131         /* nodes start out disconnected and unhealthy */
132         node->flags = (NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY);
133
134         if (ctdb->address.address &&
135             ctdb_same_address(&ctdb->address, &node->address)) {
136                 /* for automatic binding to interfaces, see tcp_connect.c */
137                 ctdb->pnn = node->pnn;
138                 node->flags &= ~NODE_FLAGS_DISCONNECTED;
139
140                 /* do we start out in DISABLED mode? */
141                 if (ctdb->start_as_disabled != 0) {
142                         DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
143                         node->flags |= NODE_FLAGS_DISABLED;
144                 }
145                 /* do we start out in STOPPED mode? */
146                 if (ctdb->start_as_stopped != 0) {
147                         DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
148                         node->flags |= NODE_FLAGS_STOPPED;
149                 }
150         }
151
152         ctdb->num_nodes++;
153         node->dead_count = 0;
154
155         return 0;
156 }
157
158 /*
159   add an entry for a "deleted" node to the list of nodes.
160   a "deleted" node is a node that is commented out from the nodes file.
161   this is used to prevent that subsequent nodes in the nodes list
162   change their pnn value if a node is "delete" by commenting it out and then
163   using "ctdb reloadnodes" at runtime.
164 */
165 static int ctdb_add_deleted_node(struct ctdb_context *ctdb)
166 {
167         struct ctdb_node *node, **nodep;
168
169         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
170         CTDB_NO_MEMORY(ctdb, nodep);
171
172         ctdb->nodes = nodep;
173         nodep = &ctdb->nodes[ctdb->num_nodes];
174         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
175         CTDB_NO_MEMORY(ctdb, *nodep);
176         node = *nodep;
177         
178         if (ctdb_parse_address(ctdb, node, "0.0.0.0", &node->address) != 0) {
179                 DEBUG(DEBUG_ERR,("Failed to setup deleted node %d\n", ctdb->num_nodes));
180                 return -1;
181         }
182         node->ctdb = ctdb;
183         node->name = talloc_strdup(node, "0.0.0.0:0");
184
185         /* this assumes that the nodes are kept in sorted order, and no gaps */
186         node->pnn = ctdb->num_nodes;
187
188         /* this node is permanently deleted/disconnected */
189         node->flags = NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED;
190
191         ctdb->num_nodes++;
192         node->dead_count = 0;
193
194         return 0;
195 }
196
197
198 /*
199   setup the node list from a file
200 */
201 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
202 {
203         char **lines;
204         int nlines;
205         int i, j, num_present;
206
207         talloc_free(ctdb->nodes);
208         ctdb->nodes     = NULL;
209         ctdb->num_nodes = 0;
210
211         lines = file_lines_load(nlist, &nlines, ctdb);
212         if (lines == NULL) {
213                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
214                 return -1;
215         }
216         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
217                 nlines--;
218         }
219
220         num_present = 0;
221         for (i=0; i < nlines; i++) {
222                 char *node;
223
224                 node = lines[i];
225                 /* strip leading spaces */
226                 while((*node == ' ') || (*node == '\t')) {
227                         node++;
228                 }
229                 if (*node == '#') {
230                         if (ctdb_add_deleted_node(ctdb) != 0) {
231                                 talloc_free(lines);
232                                 return -1;
233                         }
234                         continue;
235                 }
236                 if (strcmp(node, "") == 0) {
237                         continue;
238                 }
239                 if (ctdb_add_node(ctdb, node) != 0) {
240                         talloc_free(lines);
241                         return -1;
242                 }
243                 num_present++;
244         }
245
246         /* initialize the vnn mapping table now that we have the nodes list,
247            skipping any deleted nodes
248         */
249         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
250         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
251
252         ctdb->vnn_map->generation = INVALID_GENERATION;
253         ctdb->vnn_map->size = num_present;
254         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
255         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
256
257         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
258                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
259                         continue;
260                 }
261                 ctdb->vnn_map->map[j] = i;
262                 j++;
263         }
264         
265         talloc_free(lines);
266         return 0;
267 }
268
269
270 /*
271   setup the local node address
272 */
273 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
274 {
275         if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
276                 return -1;
277         }
278         
279         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
280                                      ctdb->address.address, 
281                                      ctdb->address.port);
282         return 0;
283 }
284
285
286 /*
287   return the number of active nodes
288 */
289 uint32_t ctdb_get_num_active_nodes(struct ctdb_context *ctdb)
290 {
291         int i;
292         uint32_t count=0;
293         for (i=0; i < ctdb->num_nodes; i++) {
294                 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_INACTIVE)) {
295                         count++;
296                 }
297         }
298         return count;
299 }
300
301
302 /*
303   called when we need to process a packet. This can be a requeued packet
304   after a lockwait, or a real packet from another node
305 */
306 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
307 {
308         TALLOC_CTX *tmp_ctx;
309
310         /* place the packet as a child of the tmp_ctx. We then use
311            talloc_free() below to free it. If any of the calls want
312            to keep it, then they will steal it somewhere else, and the
313            talloc_free() will only free the tmp_ctx */
314         tmp_ctx = talloc_new(ctdb);
315         talloc_steal(tmp_ctx, hdr);
316
317         DEBUG(DEBUG_DEBUG,(__location__ " ctdb request %u of type %u length %u from "
318                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
319                  hdr->srcnode, hdr->destnode));
320
321         switch (hdr->operation) {
322         case CTDB_REQ_CALL:
323         case CTDB_REPLY_CALL:
324         case CTDB_REQ_DMASTER:
325         case CTDB_REPLY_DMASTER:
326                 /* we dont allow these calls when banned */
327                 if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_BANNED) {
328                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
329                                 " request %u"
330                                 " length %u from node %u to %u while node"
331                                 " is banned\n",
332                                  hdr->operation, hdr->reqid,
333                                  hdr->length, 
334                                  hdr->srcnode, hdr->destnode));
335                         goto done;
336                 }
337
338                 /* for ctdb_call inter-node operations verify that the
339                    remote node that sent us the call is running in the
340                    same generation instance as this node
341                 */
342                 if (ctdb->vnn_map->generation != hdr->generation) {
343                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
344                                 " request %u"
345                                 " length %u from node %u to %u had an"
346                                 " invalid generation id:%u while our"
347                                 " generation id is:%u\n", 
348                                  hdr->operation, hdr->reqid,
349                                  hdr->length, 
350                                  hdr->srcnode, hdr->destnode, 
351                                  hdr->generation, ctdb->vnn_map->generation));
352                         goto done;
353                 }
354         }
355
356         switch (hdr->operation) {
357         case CTDB_REQ_CALL:
358                 ctdb->statistics.node.req_call++;
359                 ctdb_request_call(ctdb, hdr);
360                 break;
361
362         case CTDB_REPLY_CALL:
363                 ctdb->statistics.node.reply_call++;
364                 ctdb_reply_call(ctdb, hdr);
365                 break;
366
367         case CTDB_REPLY_ERROR:
368                 ctdb->statistics.node.reply_error++;
369                 ctdb_reply_error(ctdb, hdr);
370                 break;
371
372         case CTDB_REQ_DMASTER:
373                 ctdb->statistics.node.req_dmaster++;
374                 ctdb_request_dmaster(ctdb, hdr);
375                 break;
376
377         case CTDB_REPLY_DMASTER:
378                 ctdb->statistics.node.reply_dmaster++;
379                 ctdb_reply_dmaster(ctdb, hdr);
380                 break;
381
382         case CTDB_REQ_MESSAGE:
383                 ctdb->statistics.node.req_message++;
384                 ctdb_request_message(ctdb, hdr);
385                 break;
386
387         case CTDB_REQ_CONTROL:
388                 ctdb->statistics.node.req_control++;
389                 ctdb_request_control(ctdb, hdr);
390                 break;
391
392         case CTDB_REPLY_CONTROL:
393                 ctdb->statistics.node.reply_control++;
394                 ctdb_reply_control(ctdb, hdr);
395                 break;
396
397         case CTDB_REQ_KEEPALIVE:
398                 ctdb->statistics.keepalive_packets_recv++;
399                 break;
400
401         default:
402                 DEBUG(DEBUG_CRIT,("%s: Packet with unknown operation %u\n", 
403                          __location__, hdr->operation));
404                 break;
405         }
406
407 done:
408         talloc_free(tmp_ctx);
409 }
410
411
412 /*
413   called by the transport layer when a node is dead
414 */
415 void ctdb_node_dead(struct ctdb_node *node)
416 {
417         if (node->flags & NODE_FLAGS_DISCONNECTED) {
418                 DEBUG(DEBUG_INFO,("%s: node %s is already marked disconnected: %u connected\n", 
419                          node->ctdb->name, node->name, 
420                          node->ctdb->num_connected));
421                 return;
422         }
423         node->ctdb->num_connected--;
424         node->flags |= NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY;
425         node->rx_cnt = 0;
426         node->dead_count = 0;
427
428         DEBUG(DEBUG_NOTICE,("%s: node %s is dead: %u connected\n", 
429                  node->ctdb->name, node->name, node->ctdb->num_connected));
430         ctdb_daemon_cancel_controls(node->ctdb, node);
431
432         if (node->ctdb->methods == NULL) {
433                 DEBUG(DEBUG_ERR,(__location__ " Can not restart transport while shutting down daemon.\n"));
434                 return;
435         }
436
437         node->ctdb->methods->restart(node);
438 }
439
440 /*
441   called by the transport layer when a node is connected
442 */
443 void ctdb_node_connected(struct ctdb_node *node)
444 {
445         if (!(node->flags & NODE_FLAGS_DISCONNECTED)) {
446                 DEBUG(DEBUG_INFO,("%s: node %s is already marked connected: %u connected\n", 
447                          node->ctdb->name, node->name, 
448                          node->ctdb->num_connected));
449                 return;
450         }
451         node->ctdb->num_connected++;
452         node->dead_count = 0;
453         node->flags &= ~NODE_FLAGS_DISCONNECTED;
454         node->flags |= NODE_FLAGS_UNHEALTHY;
455         DEBUG(DEBUG_INFO,("%s: connected to %s - %u connected\n", 
456                  node->ctdb->name, node->name, node->ctdb->num_connected));
457 }
458
459 struct queue_next {
460         struct ctdb_context *ctdb;
461         struct ctdb_req_header *hdr;
462 };
463
464
465 /*
466   triggered when a deferred packet is due
467  */
468 static void queue_next_trigger(struct event_context *ev, struct timed_event *te, 
469                                struct timeval t, void *private_data)
470 {
471         struct queue_next *q = talloc_get_type(private_data, struct queue_next);
472         ctdb_input_pkt(q->ctdb, q->hdr);
473         talloc_free(q);
474 }       
475
476 /*
477   defer a packet, so it is processed on the next event loop
478   this is used for sending packets to ourselves
479  */
480 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
481 {
482         struct queue_next *q;
483         q = talloc(ctdb, struct queue_next);
484         if (q == NULL) {
485                 DEBUG(DEBUG_ERR,(__location__ " Failed to allocate deferred packet\n"));
486                 return;
487         }
488         q->ctdb = ctdb;
489         q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
490         if (q->hdr == NULL) {
491                 DEBUG(DEBUG_ERR,("Error copying deferred packet to self\n"));
492                 return;
493         }
494 #if 0
495         /* use this to put packets directly into our recv function */
496         ctdb_input_pkt(q->ctdb, q->hdr);
497 #else
498         event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
499 #endif
500 }
501
502
503 /*
504   broadcast a packet to all nodes
505 */
506 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, 
507                                       struct ctdb_req_header *hdr)
508 {
509         int i;
510         for (i=0; i < ctdb->num_nodes; i++) {
511                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
512                         continue;
513                 }
514                 hdr->destnode = ctdb->nodes[i]->pnn;
515                 ctdb_queue_packet(ctdb, hdr);
516         }
517 }
518
519 /*
520   broadcast a packet to all nodes in the current vnnmap
521 */
522 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, 
523                                          struct ctdb_req_header *hdr)
524 {
525         int i;
526         for (i=0;i<ctdb->vnn_map->size;i++) {
527                 hdr->destnode = ctdb->vnn_map->map[i];
528                 ctdb_queue_packet(ctdb, hdr);
529         }
530 }
531
532 /*
533   broadcast a packet to all connected nodes
534 */
535 static void ctdb_broadcast_packet_connected(struct ctdb_context *ctdb, 
536                                             struct ctdb_req_header *hdr)
537 {
538         int i;
539         for (i=0; i < ctdb->num_nodes; i++) {
540                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
541                         continue;
542                 }
543                 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_DISCONNECTED)) {
544                         hdr->destnode = ctdb->nodes[i]->pnn;
545                         ctdb_queue_packet(ctdb, hdr);
546                 }
547         }
548 }
549
550 /*
551   queue a packet or die
552 */
553 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
554 {
555         struct ctdb_node *node;
556
557         switch (hdr->destnode) {
558         case CTDB_BROADCAST_ALL:
559                 ctdb_broadcast_packet_all(ctdb, hdr);
560                 return;
561         case CTDB_BROADCAST_VNNMAP:
562                 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
563                 return;
564         case CTDB_BROADCAST_CONNECTED:
565                 ctdb_broadcast_packet_connected(ctdb, hdr);
566                 return;
567         }
568
569         ctdb->statistics.node_packets_sent++;
570
571         if (!ctdb_validate_pnn(ctdb, hdr->destnode)) {
572                 DEBUG(DEBUG_CRIT,(__location__ " cant send to node %u that does not exist\n", 
573                          hdr->destnode));
574                 return;
575         }
576
577         node = ctdb->nodes[hdr->destnode];
578
579         if (node->flags & NODE_FLAGS_DELETED) {
580                 DEBUG(DEBUG_ERR, (__location__ " Can not queue packet to DELETED node %d\n", hdr->destnode));
581                 return;
582         }
583
584         if (node->pnn == ctdb->pnn) {
585                 ctdb_defer_packet(ctdb, hdr);
586                 return;
587         }
588
589         if (ctdb->methods == NULL) {
590                 DEBUG(DEBUG_ALERT, (__location__ " Can not queue packet. "
591                                     "Transport is DOWN\n"));
592                 return;
593         }
594
595         node->tx_cnt++;
596         if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
597                 ctdb_fatal(ctdb, "Unable to queue packet\n");
598         }
599 }
600
601
602
603
604 /*
605   a valgrind hack to allow us to get opcode specific backtraces
606   very ugly, and relies on no compiler optimisation!
607 */
608 void ctdb_queue_packet_opcode(struct ctdb_context *ctdb, struct ctdb_req_header *hdr, unsigned opcode)
609 {
610         switch (opcode) {
611 #define DO_OP(x) case x: ctdb_queue_packet(ctdb, hdr); break
612                 DO_OP(1);
613                 DO_OP(2);
614                 DO_OP(3);
615                 DO_OP(4);
616                 DO_OP(5);
617                 DO_OP(6);
618                 DO_OP(7);
619                 DO_OP(8);
620                 DO_OP(9);
621                 DO_OP(10);
622                 DO_OP(11);
623                 DO_OP(12);
624                 DO_OP(13);
625                 DO_OP(14);
626                 DO_OP(15);
627                 DO_OP(16);
628                 DO_OP(17);
629                 DO_OP(18);
630                 DO_OP(19);
631                 DO_OP(20);
632                 DO_OP(21);
633                 DO_OP(22);
634                 DO_OP(23);
635                 DO_OP(24);
636                 DO_OP(25);
637                 DO_OP(26);
638                 DO_OP(27);
639                 DO_OP(28);
640                 DO_OP(29);
641                 DO_OP(30);
642                 DO_OP(31);
643                 DO_OP(32);
644                 DO_OP(33);
645                 DO_OP(34);
646                 DO_OP(35);
647                 DO_OP(36);
648                 DO_OP(37);
649                 DO_OP(38);
650                 DO_OP(39);
651                 DO_OP(40);
652                 DO_OP(41);
653                 DO_OP(42);
654                 DO_OP(43);
655                 DO_OP(44);
656                 DO_OP(45);
657                 DO_OP(46);
658                 DO_OP(47);
659                 DO_OP(48);
660                 DO_OP(49);
661                 DO_OP(50);
662                 DO_OP(51);
663                 DO_OP(52);
664                 DO_OP(53);
665                 DO_OP(54);
666                 DO_OP(55);
667                 DO_OP(56);
668                 DO_OP(57);
669                 DO_OP(58);
670                 DO_OP(59);
671                 DO_OP(60);
672                 DO_OP(61);
673                 DO_OP(62);
674                 DO_OP(63);
675                 DO_OP(64);
676                 DO_OP(65);
677                 DO_OP(66);
678                 DO_OP(67);
679                 DO_OP(68);
680                 DO_OP(69);
681                 DO_OP(70);
682                 DO_OP(71);
683                 DO_OP(72);
684                 DO_OP(73);
685                 DO_OP(74);
686                 DO_OP(75);
687                 DO_OP(76);
688                 DO_OP(77);
689                 DO_OP(78);
690                 DO_OP(79);
691                 DO_OP(80);
692                 DO_OP(81);
693                 DO_OP(82);
694                 DO_OP(83);
695                 DO_OP(84);
696                 DO_OP(85);
697                 DO_OP(86);
698                 DO_OP(87);
699                 DO_OP(88);
700                 DO_OP(89);
701                 DO_OP(90);
702                 DO_OP(91);
703                 DO_OP(92);
704                 DO_OP(93);
705                 DO_OP(94);
706                 DO_OP(95);
707                 DO_OP(96);
708                 DO_OP(97);
709                 DO_OP(98);
710                 DO_OP(99);
711                 DO_OP(100);
712         default: 
713                 ctdb_queue_packet(ctdb, hdr);
714                 break;
715         }
716 }