ctdbd: Add nodes_file member to struct ctdb_context
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_server.c
1 /* 
2    ctdb main protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "tdb.h"
22 #include "lib/util/dlinklist.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "../include/ctdb_private.h"
26
27 /*
28   choose the transport we will use
29 */
30 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
31 {
32         ctdb->transport = talloc_strdup(ctdb, transport);
33         CTDB_NO_MEMORY(ctdb, ctdb->transport);
34
35         return 0;
36 }
37
38 /*
39   Check whether an ip is a valid node ip
40   Returns the node id for this ip address or -1
41 */
42 int ctdb_ip_to_nodeid(struct ctdb_context *ctdb, const char *nodeip)
43 {
44         int nodeid;
45
46         for (nodeid=0;nodeid<ctdb->num_nodes;nodeid++) {
47                 if (ctdb->nodes[nodeid]->flags & NODE_FLAGS_DELETED) {
48                         continue;
49                 }
50                 if (!strcmp(ctdb->nodes[nodeid]->address.address, nodeip)) {
51                         return nodeid;
52                 }
53         }
54
55         return -1;
56 }
57
58 /*
59   choose the recovery lock file
60 */
61 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
62 {
63         if (ctdb->recovery_lock_file != NULL) {
64                 talloc_free(ctdb->recovery_lock_file);
65                 ctdb->recovery_lock_file = NULL;
66         }
67
68         if (file == NULL) {
69                 DEBUG(DEBUG_ALERT,("Recovery lock file set to \"\". Disabling recovery lock checking\n"));
70                 ctdb->tunable.verify_recovery_lock = 0;
71                 return 0;
72         }
73
74         ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
75         CTDB_NO_MEMORY(ctdb, ctdb->recovery_lock_file);
76
77         return 0;
78 }
79
80 /*
81   set the directory for the local databases
82 */
83 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
84 {
85         ctdb->db_directory = talloc_strdup(ctdb, dir);
86         if (ctdb->db_directory == NULL) {
87                 return -1;
88         }
89         return 0;
90 }
91
92 /*
93   set the directory for the persistent databases
94 */
95 int ctdb_set_tdb_dir_persistent(struct ctdb_context *ctdb, const char *dir)
96 {
97         ctdb->db_directory_persistent = talloc_strdup(ctdb, dir);
98         if (ctdb->db_directory_persistent == NULL) {
99                 return -1;
100         }
101         return 0;
102 }
103
104 /*
105   set the directory for internal state databases
106 */
107 int ctdb_set_tdb_dir_state(struct ctdb_context *ctdb, const char *dir)
108 {
109         ctdb->db_directory_state = talloc_strdup(ctdb, dir);
110         if (ctdb->db_directory_state == NULL) {
111                 return -1;
112         }
113         return 0;
114 }
115
116 /*
117   add a node to the list of nodes
118 */
119 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
120 {
121         struct ctdb_node *node, **nodep;
122
123         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
124         CTDB_NO_MEMORY(ctdb, nodep);
125
126         ctdb->nodes = nodep;
127         nodep = &ctdb->nodes[ctdb->num_nodes];
128         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
129         CTDB_NO_MEMORY(ctdb, *nodep);
130         node = *nodep;
131
132         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
133                 return -1;
134         }
135         node->ctdb = ctdb;
136         node->name = talloc_asprintf(node, "%s:%u", 
137                                      node->address.address, 
138                                      node->address.port);
139         /* this assumes that the nodes are kept in sorted order, and no gaps */
140         node->pnn = ctdb->num_nodes;
141
142         /* nodes start out disconnected and unhealthy */
143         node->flags = (NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY);
144
145         if (ctdb->address.address &&
146             ctdb_same_address(&ctdb->address, &node->address)) {
147                 /* for automatic binding to interfaces, see tcp_connect.c */
148                 ctdb->pnn = node->pnn;
149         }
150
151         ctdb->num_nodes++;
152         node->dead_count = 0;
153
154         return 0;
155 }
156
157 /*
158   add an entry for a "deleted" node to the list of nodes.
159   a "deleted" node is a node that is commented out from the nodes file.
160   this is used to prevent that subsequent nodes in the nodes list
161   change their pnn value if a node is "delete" by commenting it out and then
162   using "ctdb reloadnodes" at runtime.
163 */
164 static int ctdb_add_deleted_node(struct ctdb_context *ctdb)
165 {
166         struct ctdb_node *node, **nodep;
167
168         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
169         CTDB_NO_MEMORY(ctdb, nodep);
170
171         ctdb->nodes = nodep;
172         nodep = &ctdb->nodes[ctdb->num_nodes];
173         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
174         CTDB_NO_MEMORY(ctdb, *nodep);
175         node = *nodep;
176         
177         if (ctdb_parse_address(ctdb, node, "0.0.0.0", &node->address) != 0) {
178                 DEBUG(DEBUG_ERR,("Failed to setup deleted node %d\n", ctdb->num_nodes));
179                 return -1;
180         }
181         node->ctdb = ctdb;
182         node->name = talloc_strdup(node, "0.0.0.0:0");
183
184         /* this assumes that the nodes are kept in sorted order, and no gaps */
185         node->pnn = ctdb->num_nodes;
186
187         /* this node is permanently deleted/disconnected */
188         node->flags = NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED;
189
190         ctdb->num_nodes++;
191         node->dead_count = 0;
192
193         return 0;
194 }
195
196
197 /*
198   setup the node list from a file
199 */
200 static int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
201 {
202         char **lines;
203         int nlines;
204         int i, j, num_present;
205
206         talloc_free(ctdb->nodes);
207         ctdb->nodes     = NULL;
208         ctdb->num_nodes = 0;
209
210         lines = file_lines_load(nlist, &nlines, ctdb);
211         if (lines == NULL) {
212                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
213                 return -1;
214         }
215         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
216                 nlines--;
217         }
218
219         num_present = 0;
220         for (i=0; i < nlines; i++) {
221                 char *node;
222
223                 node = lines[i];
224                 /* strip leading spaces */
225                 while((*node == ' ') || (*node == '\t')) {
226                         node++;
227                 }
228                 if (*node == '#') {
229                         if (ctdb_add_deleted_node(ctdb) != 0) {
230                                 talloc_free(lines);
231                                 return -1;
232                         }
233                         continue;
234                 }
235                 if (strcmp(node, "") == 0) {
236                         continue;
237                 }
238                 if (ctdb_add_node(ctdb, node) != 0) {
239                         talloc_free(lines);
240                         return -1;
241                 }
242                 num_present++;
243         }
244
245         /* initialize the vnn mapping table now that we have the nodes list,
246            skipping any deleted nodes
247         */
248         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
249         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
250
251         ctdb->vnn_map->generation = INVALID_GENERATION;
252         ctdb->vnn_map->size = num_present;
253         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
254         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
255
256         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
257                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
258                         continue;
259                 }
260                 ctdb->vnn_map->map[j] = i;
261                 j++;
262         }
263         
264         talloc_free(lines);
265         return 0;
266 }
267
268 void ctdb_load_nodes_file(struct ctdb_context *ctdb)
269 {
270         int ret;
271
272         ret = ctdb_set_nlist(ctdb, ctdb->nodes_file);
273         if (ret == -1) {
274                 DEBUG(DEBUG_ALERT,("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb)));
275                 exit(1);
276         }
277 }
278
279 /*
280   setup the local node address
281 */
282 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
283 {
284         if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
285                 return -1;
286         }
287         
288         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
289                                      ctdb->address.address, 
290                                      ctdb->address.port);
291         return 0;
292 }
293
294
295 /*
296   return the number of active nodes
297 */
298 uint32_t ctdb_get_num_active_nodes(struct ctdb_context *ctdb)
299 {
300         int i;
301         uint32_t count=0;
302         for (i=0; i < ctdb->num_nodes; i++) {
303                 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_INACTIVE)) {
304                         count++;
305                 }
306         }
307         return count;
308 }
309
310
311 /*
312   called when we need to process a packet. This can be a requeued packet
313   after a lockwait, or a real packet from another node
314 */
315 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
316 {
317         TALLOC_CTX *tmp_ctx;
318
319         /* place the packet as a child of the tmp_ctx. We then use
320            talloc_free() below to free it. If any of the calls want
321            to keep it, then they will steal it somewhere else, and the
322            talloc_free() will only free the tmp_ctx */
323         tmp_ctx = talloc_new(ctdb);
324         talloc_steal(tmp_ctx, hdr);
325
326         DEBUG(DEBUG_DEBUG,(__location__ " ctdb request %u of type %u length %u from "
327                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
328                  hdr->srcnode, hdr->destnode));
329
330         switch (hdr->operation) {
331         case CTDB_REQ_CALL:
332         case CTDB_REPLY_CALL:
333         case CTDB_REQ_DMASTER:
334         case CTDB_REPLY_DMASTER:
335                 /* we dont allow these calls when banned */
336                 if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_BANNED) {
337                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
338                                 " request %u"
339                                 " length %u from node %u to %u while node"
340                                 " is banned\n",
341                                  hdr->operation, hdr->reqid,
342                                  hdr->length, 
343                                  hdr->srcnode, hdr->destnode));
344                         goto done;
345                 }
346
347                 /* for ctdb_call inter-node operations verify that the
348                    remote node that sent us the call is running in the
349                    same generation instance as this node
350                 */
351                 if (ctdb->vnn_map->generation != hdr->generation) {
352                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
353                                 " request %u"
354                                 " length %u from node %u to %u had an"
355                                 " invalid generation id:%u while our"
356                                 " generation id is:%u\n", 
357                                  hdr->operation, hdr->reqid,
358                                  hdr->length, 
359                                  hdr->srcnode, hdr->destnode, 
360                                  hdr->generation, ctdb->vnn_map->generation));
361                         goto done;
362                 }
363         }
364
365         switch (hdr->operation) {
366         case CTDB_REQ_CALL:
367                 CTDB_INCREMENT_STAT(ctdb, node.req_call);
368                 ctdb_request_call(ctdb, hdr);
369                 break;
370
371         case CTDB_REPLY_CALL:
372                 CTDB_INCREMENT_STAT(ctdb, node.reply_call);
373                 ctdb_reply_call(ctdb, hdr);
374                 break;
375
376         case CTDB_REPLY_ERROR:
377                 CTDB_INCREMENT_STAT(ctdb, node.reply_error);
378                 ctdb_reply_error(ctdb, hdr);
379                 break;
380
381         case CTDB_REQ_DMASTER:
382                 CTDB_INCREMENT_STAT(ctdb, node.req_dmaster);
383                 ctdb_request_dmaster(ctdb, hdr);
384                 break;
385
386         case CTDB_REPLY_DMASTER:
387                 CTDB_INCREMENT_STAT(ctdb, node.reply_dmaster);
388                 ctdb_reply_dmaster(ctdb, hdr);
389                 break;
390
391         case CTDB_REQ_MESSAGE:
392                 CTDB_INCREMENT_STAT(ctdb, node.req_message);
393                 ctdb_request_message(ctdb, hdr);
394                 break;
395
396         case CTDB_REQ_CONTROL:
397                 CTDB_INCREMENT_STAT(ctdb, node.req_control);
398                 ctdb_request_control(ctdb, hdr);
399                 break;
400
401         case CTDB_REPLY_CONTROL:
402                 CTDB_INCREMENT_STAT(ctdb, node.reply_control);
403                 ctdb_reply_control(ctdb, hdr);
404                 break;
405
406         case CTDB_REQ_KEEPALIVE:
407                 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_recv);
408                 break;
409
410         default:
411                 DEBUG(DEBUG_CRIT,("%s: Packet with unknown operation %u\n", 
412                          __location__, hdr->operation));
413                 break;
414         }
415
416 done:
417         talloc_free(tmp_ctx);
418 }
419
420
421 /*
422   called by the transport layer when a node is dead
423 */
424 void ctdb_node_dead(struct ctdb_node *node)
425 {
426         if (node->flags & NODE_FLAGS_DISCONNECTED) {
427                 DEBUG(DEBUG_INFO,("%s: node %s is already marked disconnected: %u connected\n", 
428                          node->ctdb->name, node->name, 
429                          node->ctdb->num_connected));
430                 return;
431         }
432         node->ctdb->num_connected--;
433         node->flags |= NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY;
434         node->rx_cnt = 0;
435         node->dead_count = 0;
436
437         DEBUG(DEBUG_NOTICE,("%s: node %s is dead: %u connected\n", 
438                  node->ctdb->name, node->name, node->ctdb->num_connected));
439         ctdb_daemon_cancel_controls(node->ctdb, node);
440
441         if (node->ctdb->methods == NULL) {
442                 DEBUG(DEBUG_ERR,(__location__ " Can not restart transport while shutting down daemon.\n"));
443                 return;
444         }
445
446         node->ctdb->methods->restart(node);
447 }
448
449 /*
450   called by the transport layer when a node is connected
451 */
452 void ctdb_node_connected(struct ctdb_node *node)
453 {
454         if (!(node->flags & NODE_FLAGS_DISCONNECTED)) {
455                 DEBUG(DEBUG_INFO,("%s: node %s is already marked connected: %u connected\n", 
456                          node->ctdb->name, node->name, 
457                          node->ctdb->num_connected));
458                 return;
459         }
460         node->ctdb->num_connected++;
461         node->dead_count = 0;
462         node->flags &= ~NODE_FLAGS_DISCONNECTED;
463         node->flags |= NODE_FLAGS_UNHEALTHY;
464         DEBUG(DEBUG_INFO,("%s: connected to %s - %u connected\n", 
465                  node->ctdb->name, node->name, node->ctdb->num_connected));
466 }
467
468 struct queue_next {
469         struct ctdb_context *ctdb;
470         struct ctdb_req_header *hdr;
471 };
472
473
474 /*
475   triggered when a deferred packet is due
476  */
477 static void queue_next_trigger(struct event_context *ev, struct timed_event *te, 
478                                struct timeval t, void *private_data)
479 {
480         struct queue_next *q = talloc_get_type(private_data, struct queue_next);
481         ctdb_input_pkt(q->ctdb, q->hdr);
482         talloc_free(q);
483 }       
484
485 /*
486   defer a packet, so it is processed on the next event loop
487   this is used for sending packets to ourselves
488  */
489 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
490 {
491         struct queue_next *q;
492         q = talloc(ctdb, struct queue_next);
493         if (q == NULL) {
494                 DEBUG(DEBUG_ERR,(__location__ " Failed to allocate deferred packet\n"));
495                 return;
496         }
497         q->ctdb = ctdb;
498         q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
499         if (q->hdr == NULL) {
500                 DEBUG(DEBUG_ERR,("Error copying deferred packet to self\n"));
501                 return;
502         }
503 #if 0
504         /* use this to put packets directly into our recv function */
505         ctdb_input_pkt(q->ctdb, q->hdr);
506 #else
507         event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
508 #endif
509 }
510
511
512 /*
513   broadcast a packet to all nodes
514 */
515 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, 
516                                       struct ctdb_req_header *hdr)
517 {
518         int i;
519         for (i=0; i < ctdb->num_nodes; i++) {
520                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
521                         continue;
522                 }
523                 hdr->destnode = ctdb->nodes[i]->pnn;
524                 ctdb_queue_packet(ctdb, hdr);
525         }
526 }
527
528 /*
529   broadcast a packet to all nodes in the current vnnmap
530 */
531 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, 
532                                          struct ctdb_req_header *hdr)
533 {
534         int i;
535         for (i=0;i<ctdb->vnn_map->size;i++) {
536                 hdr->destnode = ctdb->vnn_map->map[i];
537                 ctdb_queue_packet(ctdb, hdr);
538         }
539 }
540
541 /*
542   broadcast a packet to all connected nodes
543 */
544 static void ctdb_broadcast_packet_connected(struct ctdb_context *ctdb, 
545                                             struct ctdb_req_header *hdr)
546 {
547         int i;
548         for (i=0; i < ctdb->num_nodes; i++) {
549                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
550                         continue;
551                 }
552                 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_DISCONNECTED)) {
553                         hdr->destnode = ctdb->nodes[i]->pnn;
554                         ctdb_queue_packet(ctdb, hdr);
555                 }
556         }
557 }
558
559 /*
560   queue a packet or die
561 */
562 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
563 {
564         struct ctdb_node *node;
565
566         switch (hdr->destnode) {
567         case CTDB_BROADCAST_ALL:
568                 ctdb_broadcast_packet_all(ctdb, hdr);
569                 return;
570         case CTDB_BROADCAST_VNNMAP:
571                 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
572                 return;
573         case CTDB_BROADCAST_CONNECTED:
574                 ctdb_broadcast_packet_connected(ctdb, hdr);
575                 return;
576         }
577
578         CTDB_INCREMENT_STAT(ctdb, node_packets_sent);
579
580         if (!ctdb_validate_pnn(ctdb, hdr->destnode)) {
581                 DEBUG(DEBUG_CRIT,(__location__ " cant send to node %u that does not exist\n", 
582                          hdr->destnode));
583                 return;
584         }
585
586         node = ctdb->nodes[hdr->destnode];
587
588         if (node->flags & NODE_FLAGS_DELETED) {
589                 DEBUG(DEBUG_ERR, (__location__ " Can not queue packet to DELETED node %d\n", hdr->destnode));
590                 return;
591         }
592
593         if (node->pnn == ctdb->pnn) {
594                 ctdb_defer_packet(ctdb, hdr);
595                 return;
596         }
597
598         if (ctdb->methods == NULL) {
599                 DEBUG(DEBUG_ALERT, (__location__ " Can not queue packet. "
600                                     "Transport is DOWN\n"));
601                 return;
602         }
603
604         node->tx_cnt++;
605         if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
606                 ctdb_fatal(ctdb, "Unable to queue packet\n");
607         }
608 }
609
610
611
612
613 /*
614   a valgrind hack to allow us to get opcode specific backtraces
615   very ugly, and relies on no compiler optimisation!
616 */
617 void ctdb_queue_packet_opcode(struct ctdb_context *ctdb, struct ctdb_req_header *hdr, unsigned opcode)
618 {
619         switch (opcode) {
620 #define DO_OP(x) case x: ctdb_queue_packet(ctdb, hdr); break
621                 DO_OP(1);
622                 DO_OP(2);
623                 DO_OP(3);
624                 DO_OP(4);
625                 DO_OP(5);
626                 DO_OP(6);
627                 DO_OP(7);
628                 DO_OP(8);
629                 DO_OP(9);
630                 DO_OP(10);
631                 DO_OP(11);
632                 DO_OP(12);
633                 DO_OP(13);
634                 DO_OP(14);
635                 DO_OP(15);
636                 DO_OP(16);
637                 DO_OP(17);
638                 DO_OP(18);
639                 DO_OP(19);
640                 DO_OP(20);
641                 DO_OP(21);
642                 DO_OP(22);
643                 DO_OP(23);
644                 DO_OP(24);
645                 DO_OP(25);
646                 DO_OP(26);
647                 DO_OP(27);
648                 DO_OP(28);
649                 DO_OP(29);
650                 DO_OP(30);
651                 DO_OP(31);
652                 DO_OP(32);
653                 DO_OP(33);
654                 DO_OP(34);
655                 DO_OP(35);
656                 DO_OP(36);
657                 DO_OP(37);
658                 DO_OP(38);
659                 DO_OP(39);
660                 DO_OP(40);
661                 DO_OP(41);
662                 DO_OP(42);
663                 DO_OP(43);
664                 DO_OP(44);
665                 DO_OP(45);
666                 DO_OP(46);
667                 DO_OP(47);
668                 DO_OP(48);
669                 DO_OP(49);
670                 DO_OP(50);
671                 DO_OP(51);
672                 DO_OP(52);
673                 DO_OP(53);
674                 DO_OP(54);
675                 DO_OP(55);
676                 DO_OP(56);
677                 DO_OP(57);
678                 DO_OP(58);
679                 DO_OP(59);
680                 DO_OP(60);
681                 DO_OP(61);
682                 DO_OP(62);
683                 DO_OP(63);
684                 DO_OP(64);
685                 DO_OP(65);
686                 DO_OP(66);
687                 DO_OP(67);
688                 DO_OP(68);
689                 DO_OP(69);
690                 DO_OP(70);
691                 DO_OP(71);
692                 DO_OP(72);
693                 DO_OP(73);
694                 DO_OP(74);
695                 DO_OP(75);
696                 DO_OP(76);
697                 DO_OP(77);
698                 DO_OP(78);
699                 DO_OP(79);
700                 DO_OP(80);
701                 DO_OP(81);
702                 DO_OP(82);
703                 DO_OP(83);
704                 DO_OP(84);
705                 DO_OP(85);
706                 DO_OP(86);
707                 DO_OP(87);
708                 DO_OP(88);
709                 DO_OP(89);
710                 DO_OP(90);
711                 DO_OP(91);
712                 DO_OP(92);
713                 DO_OP(93);
714                 DO_OP(94);
715                 DO_OP(95);
716                 DO_OP(96);
717                 DO_OP(97);
718                 DO_OP(98);
719                 DO_OP(99);
720                 DO_OP(100);
721         default: 
722                 ctdb_queue_packet(ctdb, hdr);
723                 break;
724         }
725 }