Add back monitoring for time skips, forward as well as backward.
[metze/ctdb/wip.git] / server / ctdb_server.c
1 /* 
2    ctdb main protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tdb/include/tdb.h"
22 #include "lib/tevent/tevent.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27
28 /*
29   choose the transport we will use
30 */
31 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
32 {
33         ctdb->transport = talloc_strdup(ctdb, transport);
34         CTDB_NO_MEMORY(ctdb, ctdb->transport);
35
36         return 0;
37 }
38
39 /*
40   Check whether an ip is a valid node ip
41   Returns the node id for this ip address or -1
42 */
43 int ctdb_ip_to_nodeid(struct ctdb_context *ctdb, const char *nodeip)
44 {
45         int nodeid;
46
47         for (nodeid=0;nodeid<ctdb->num_nodes;nodeid++) {
48                 if (ctdb->nodes[nodeid]->flags & NODE_FLAGS_DELETED) {
49                         continue;
50                 }
51                 if (!strcmp(ctdb->nodes[nodeid]->address.address, nodeip)) {
52                         return nodeid;
53                 }
54         }
55
56         return -1;
57 }
58
59 /*
60   choose the recovery lock file
61 */
62 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
63 {
64         if (ctdb->recovery_lock_file != NULL) {
65                 talloc_free(ctdb->recovery_lock_file);
66                 ctdb->recovery_lock_file = NULL;
67         }
68
69         if (file == NULL) {
70                 DEBUG(DEBUG_ALERT,("Recovery lock file set to \"\". Disabling recovery lock checking\n"));
71                 ctdb->tunable.verify_recovery_lock = 0;
72                 return 0;
73         }
74
75         ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
76         CTDB_NO_MEMORY(ctdb, ctdb->recovery_lock_file);
77
78         return 0;
79 }
80
81 /*
82   set the directory for the local databases
83 */
84 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
85 {
86         ctdb->db_directory = talloc_strdup(ctdb, dir);
87         if (ctdb->db_directory == NULL) {
88                 return -1;
89         }
90         return 0;
91 }
92
93 /*
94   set the directory for the persistent databases
95 */
96 int ctdb_set_tdb_dir_persistent(struct ctdb_context *ctdb, const char *dir)
97 {
98         ctdb->db_directory_persistent = talloc_strdup(ctdb, dir);
99         if (ctdb->db_directory_persistent == NULL) {
100                 return -1;
101         }
102         return 0;
103 }
104
105 /*
106   set the directory for internal state databases
107 */
108 int ctdb_set_tdb_dir_state(struct ctdb_context *ctdb, const char *dir)
109 {
110         ctdb->db_directory_state = talloc_strdup(ctdb, dir);
111         if (ctdb->db_directory_state == NULL) {
112                 return -1;
113         }
114         return 0;
115 }
116
117 /*
118   add a node to the list of nodes
119 */
120 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
121 {
122         struct ctdb_node *node, **nodep;
123
124         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
125         CTDB_NO_MEMORY(ctdb, nodep);
126
127         ctdb->nodes = nodep;
128         nodep = &ctdb->nodes[ctdb->num_nodes];
129         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
130         CTDB_NO_MEMORY(ctdb, *nodep);
131         node = *nodep;
132
133         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
134                 return -1;
135         }
136         node->ctdb = ctdb;
137         node->name = talloc_asprintf(node, "%s:%u", 
138                                      node->address.address, 
139                                      node->address.port);
140         /* this assumes that the nodes are kept in sorted order, and no gaps */
141         node->pnn = ctdb->num_nodes;
142
143         /* nodes start out disconnected and unhealthy */
144         node->flags = (NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY);
145
146         if (ctdb->address.address &&
147             ctdb_same_address(&ctdb->address, &node->address)) {
148                 /* for automatic binding to interfaces, see tcp_connect.c */
149                 ctdb->pnn = node->pnn;
150                 node->flags &= ~NODE_FLAGS_DISCONNECTED;
151
152                 /* do we start out in DISABLED mode? */
153                 if (ctdb->start_as_disabled != 0) {
154                         DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
155                         node->flags |= NODE_FLAGS_DISABLED;
156                 }
157                 /* do we start out in STOPPED mode? */
158                 if (ctdb->start_as_stopped != 0) {
159                         DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
160                         node->flags |= NODE_FLAGS_STOPPED;
161                 }
162         }
163
164         ctdb->num_nodes++;
165         node->dead_count = 0;
166
167         return 0;
168 }
169
170 /*
171   add an entry for a "deleted" node to the list of nodes.
172   a "deleted" node is a node that is commented out from the nodes file.
173   this is used to prevent that subsequent nodes in the nodes list
174   change their pnn value if a node is "delete" by commenting it out and then
175   using "ctdb reloadnodes" at runtime.
176 */
177 static int ctdb_add_deleted_node(struct ctdb_context *ctdb)
178 {
179         struct ctdb_node *node, **nodep;
180
181         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
182         CTDB_NO_MEMORY(ctdb, nodep);
183
184         ctdb->nodes = nodep;
185         nodep = &ctdb->nodes[ctdb->num_nodes];
186         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
187         CTDB_NO_MEMORY(ctdb, *nodep);
188         node = *nodep;
189         
190         if (ctdb_parse_address(ctdb, node, "0.0.0.0", &node->address) != 0) {
191                 DEBUG(DEBUG_ERR,("Failed to setup deleted node %d\n", ctdb->num_nodes));
192                 return -1;
193         }
194         node->ctdb = ctdb;
195         node->name = talloc_strdup(node, "0.0.0.0:0");
196
197         /* this assumes that the nodes are kept in sorted order, and no gaps */
198         node->pnn = ctdb->num_nodes;
199
200         /* this node is permanently deleted/disconnected */
201         node->flags = NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED;
202
203         ctdb->num_nodes++;
204         node->dead_count = 0;
205
206         return 0;
207 }
208
209
210 /*
211   setup the node list from a file
212 */
213 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
214 {
215         char **lines;
216         int nlines;
217         int i, j, num_present;
218
219         talloc_free(ctdb->nodes);
220         ctdb->nodes     = NULL;
221         ctdb->num_nodes = 0;
222
223         lines = file_lines_load(nlist, &nlines, ctdb);
224         if (lines == NULL) {
225                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
226                 return -1;
227         }
228         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
229                 nlines--;
230         }
231
232         num_present = 0;
233         for (i=0; i < nlines; i++) {
234                 char *node;
235
236                 node = lines[i];
237                 /* strip leading spaces */
238                 while((*node == ' ') || (*node == '\t')) {
239                         node++;
240                 }
241                 if (*node == '#') {
242                         if (ctdb_add_deleted_node(ctdb) != 0) {
243                                 talloc_free(lines);
244                                 return -1;
245                         }
246                         continue;
247                 }
248                 if (strcmp(node, "") == 0) {
249                         continue;
250                 }
251                 if (ctdb_add_node(ctdb, node) != 0) {
252                         talloc_free(lines);
253                         return -1;
254                 }
255                 num_present++;
256         }
257
258         /* initialize the vnn mapping table now that we have the nodes list,
259            skipping any deleted nodes
260         */
261         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
262         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
263
264         ctdb->vnn_map->generation = INVALID_GENERATION;
265         ctdb->vnn_map->size = num_present;
266         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
267         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
268
269         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
270                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
271                         continue;
272                 }
273                 ctdb->vnn_map->map[j] = i;
274                 j++;
275         }
276         
277         talloc_free(lines);
278         return 0;
279 }
280
281
282 /*
283   setup the local node address
284 */
285 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
286 {
287         if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
288                 return -1;
289         }
290         
291         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
292                                      ctdb->address.address, 
293                                      ctdb->address.port);
294         return 0;
295 }
296
297
298 /*
299   return the number of active nodes
300 */
301 uint32_t ctdb_get_num_active_nodes(struct ctdb_context *ctdb)
302 {
303         int i;
304         uint32_t count=0;
305         for (i=0; i < ctdb->num_nodes; i++) {
306                 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_INACTIVE)) {
307                         count++;
308                 }
309         }
310         return count;
311 }
312
313
314 /*
315   called when we need to process a packet. This can be a requeued packet
316   after a lockwait, or a real packet from another node
317 */
318 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
319 {
320         TALLOC_CTX *tmp_ctx;
321
322         /* place the packet as a child of the tmp_ctx. We then use
323            talloc_free() below to free it. If any of the calls want
324            to keep it, then they will steal it somewhere else, and the
325            talloc_free() will only free the tmp_ctx */
326         tmp_ctx = talloc_new(ctdb);
327         talloc_steal(tmp_ctx, hdr);
328
329         DEBUG(DEBUG_DEBUG,(__location__ " ctdb request %u of type %u length %u from "
330                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
331                  hdr->srcnode, hdr->destnode));
332
333         switch (hdr->operation) {
334         case CTDB_REQ_CALL:
335         case CTDB_REPLY_CALL:
336         case CTDB_REQ_DMASTER:
337         case CTDB_REPLY_DMASTER:
338                 /* we dont allow these calls when banned */
339                 if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_BANNED) {
340                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
341                                 " request %u"
342                                 " length %u from node %u to %u while node"
343                                 " is banned\n",
344                                  hdr->operation, hdr->reqid,
345                                  hdr->length, 
346                                  hdr->srcnode, hdr->destnode));
347                         goto done;
348                 }
349
350                 /* for ctdb_call inter-node operations verify that the
351                    remote node that sent us the call is running in the
352                    same generation instance as this node
353                 */
354                 if (ctdb->vnn_map->generation != hdr->generation) {
355                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
356                                 " request %u"
357                                 " length %u from node %u to %u had an"
358                                 " invalid generation id:%u while our"
359                                 " generation id is:%u\n", 
360                                  hdr->operation, hdr->reqid,
361                                  hdr->length, 
362                                  hdr->srcnode, hdr->destnode, 
363                                  hdr->generation, ctdb->vnn_map->generation));
364                         goto done;
365                 }
366         }
367
368         switch (hdr->operation) {
369         case CTDB_REQ_CALL:
370                 ctdb->statistics.node.req_call++;
371                 ctdb_request_call(ctdb, hdr);
372                 break;
373
374         case CTDB_REPLY_CALL:
375                 ctdb->statistics.node.reply_call++;
376                 ctdb_reply_call(ctdb, hdr);
377                 break;
378
379         case CTDB_REPLY_ERROR:
380                 ctdb->statistics.node.reply_error++;
381                 ctdb_reply_error(ctdb, hdr);
382                 break;
383
384         case CTDB_REQ_DMASTER:
385                 ctdb->statistics.node.req_dmaster++;
386                 ctdb_request_dmaster(ctdb, hdr);
387                 break;
388
389         case CTDB_REPLY_DMASTER:
390                 ctdb->statistics.node.reply_dmaster++;
391                 ctdb_reply_dmaster(ctdb, hdr);
392                 break;
393
394         case CTDB_REQ_MESSAGE:
395                 ctdb->statistics.node.req_message++;
396                 ctdb_request_message(ctdb, hdr);
397                 break;
398
399         case CTDB_REQ_CONTROL:
400                 ctdb->statistics.node.req_control++;
401                 ctdb_request_control(ctdb, hdr);
402                 break;
403
404         case CTDB_REPLY_CONTROL:
405                 ctdb->statistics.node.reply_control++;
406                 ctdb_reply_control(ctdb, hdr);
407                 break;
408
409         case CTDB_REQ_KEEPALIVE:
410                 ctdb->statistics.keepalive_packets_recv++;
411                 break;
412
413         default:
414                 DEBUG(DEBUG_CRIT,("%s: Packet with unknown operation %u\n", 
415                          __location__, hdr->operation));
416                 break;
417         }
418
419 done:
420         talloc_free(tmp_ctx);
421 }
422
423
424 /*
425   called by the transport layer when a node is dead
426 */
427 void ctdb_node_dead(struct ctdb_node *node)
428 {
429         if (node->flags & NODE_FLAGS_DISCONNECTED) {
430                 DEBUG(DEBUG_INFO,("%s: node %s is already marked disconnected: %u connected\n", 
431                          node->ctdb->name, node->name, 
432                          node->ctdb->num_connected));
433                 return;
434         }
435         node->ctdb->num_connected--;
436         node->flags |= NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY;
437         node->rx_cnt = 0;
438         node->dead_count = 0;
439
440         DEBUG(DEBUG_NOTICE,("%s: node %s is dead: %u connected\n", 
441                  node->ctdb->name, node->name, node->ctdb->num_connected));
442         ctdb_daemon_cancel_controls(node->ctdb, node);
443
444         if (node->ctdb->methods == NULL) {
445                 DEBUG(DEBUG_ERR,(__location__ " Can not restart transport while shutting down daemon.\n"));
446                 return;
447         }
448
449         node->ctdb->methods->restart(node);
450 }
451
452 /*
453   called by the transport layer when a node is connected
454 */
455 void ctdb_node_connected(struct ctdb_node *node)
456 {
457         if (!(node->flags & NODE_FLAGS_DISCONNECTED)) {
458                 DEBUG(DEBUG_INFO,("%s: node %s is already marked connected: %u connected\n", 
459                          node->ctdb->name, node->name, 
460                          node->ctdb->num_connected));
461                 return;
462         }
463         node->ctdb->num_connected++;
464         node->dead_count = 0;
465         node->flags &= ~NODE_FLAGS_DISCONNECTED;
466         node->flags |= NODE_FLAGS_UNHEALTHY;
467         DEBUG(DEBUG_INFO,("%s: connected to %s - %u connected\n", 
468                  node->ctdb->name, node->name, node->ctdb->num_connected));
469 }
470
471 struct queue_next {
472         struct ctdb_context *ctdb;
473         struct ctdb_req_header *hdr;
474 };
475
476
477 /*
478   triggered when a deferred packet is due
479  */
480 static void queue_next_trigger(struct event_context *ev, struct timed_event *te, 
481                                struct timeval t, void *private_data)
482 {
483         struct queue_next *q = talloc_get_type(private_data, struct queue_next);
484         ctdb_input_pkt(q->ctdb, q->hdr);
485         talloc_free(q);
486 }       
487
488 /*
489   defer a packet, so it is processed on the next event loop
490   this is used for sending packets to ourselves
491  */
492 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
493 {
494         struct queue_next *q;
495         q = talloc(ctdb, struct queue_next);
496         if (q == NULL) {
497                 DEBUG(DEBUG_ERR,(__location__ " Failed to allocate deferred packet\n"));
498                 return;
499         }
500         q->ctdb = ctdb;
501         q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
502         if (q->hdr == NULL) {
503                 DEBUG(DEBUG_ERR,("Error copying deferred packet to self\n"));
504                 return;
505         }
506 #if 0
507         /* use this to put packets directly into our recv function */
508         ctdb_input_pkt(q->ctdb, q->hdr);
509 #else
510         event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
511 #endif
512 }
513
514
515 /*
516   broadcast a packet to all nodes
517 */
518 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, 
519                                       struct ctdb_req_header *hdr)
520 {
521         int i;
522         for (i=0; i < ctdb->num_nodes; i++) {
523                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
524                         continue;
525                 }
526                 hdr->destnode = ctdb->nodes[i]->pnn;
527                 ctdb_queue_packet(ctdb, hdr);
528         }
529 }
530
531 /*
532   broadcast a packet to all nodes in the current vnnmap
533 */
534 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, 
535                                          struct ctdb_req_header *hdr)
536 {
537         int i;
538         for (i=0;i<ctdb->vnn_map->size;i++) {
539                 hdr->destnode = ctdb->vnn_map->map[i];
540                 ctdb_queue_packet(ctdb, hdr);
541         }
542 }
543
544 /*
545   broadcast a packet to all connected nodes
546 */
547 static void ctdb_broadcast_packet_connected(struct ctdb_context *ctdb, 
548                                             struct ctdb_req_header *hdr)
549 {
550         int i;
551         for (i=0; i < ctdb->num_nodes; i++) {
552                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
553                         continue;
554                 }
555                 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_DISCONNECTED)) {
556                         hdr->destnode = ctdb->nodes[i]->pnn;
557                         ctdb_queue_packet(ctdb, hdr);
558                 }
559         }
560 }
561
562 /*
563   queue a packet or die
564 */
565 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
566 {
567         struct ctdb_node *node;
568
569         switch (hdr->destnode) {
570         case CTDB_BROADCAST_ALL:
571                 ctdb_broadcast_packet_all(ctdb, hdr);
572                 return;
573         case CTDB_BROADCAST_VNNMAP:
574                 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
575                 return;
576         case CTDB_BROADCAST_CONNECTED:
577                 ctdb_broadcast_packet_connected(ctdb, hdr);
578                 return;
579         }
580
581         ctdb->statistics.node_packets_sent++;
582
583         if (!ctdb_validate_pnn(ctdb, hdr->destnode)) {
584                 DEBUG(DEBUG_CRIT,(__location__ " cant send to node %u that does not exist\n", 
585                          hdr->destnode));
586                 return;
587         }
588
589         node = ctdb->nodes[hdr->destnode];
590
591         if (node->flags & NODE_FLAGS_DELETED) {
592                 DEBUG(DEBUG_ERR, (__location__ " Can not queue packet to DELETED node %d\n", hdr->destnode));
593                 return;
594         }
595
596         if (node->pnn == ctdb->pnn) {
597                 ctdb_defer_packet(ctdb, hdr);
598                 return;
599         }
600
601         if (ctdb->methods == NULL) {
602                 DEBUG(DEBUG_ALERT, (__location__ " Can not queue packet. "
603                                     "Transport is DOWN\n"));
604                 return;
605         }
606
607         node->tx_cnt++;
608         if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
609                 ctdb_fatal(ctdb, "Unable to queue packet\n");
610         }
611 }
612
613
614
615
616 /*
617   a valgrind hack to allow us to get opcode specific backtraces
618   very ugly, and relies on no compiler optimisation!
619 */
620 void ctdb_queue_packet_opcode(struct ctdb_context *ctdb, struct ctdb_req_header *hdr, unsigned opcode)
621 {
622         switch (opcode) {
623 #define DO_OP(x) case x: ctdb_queue_packet(ctdb, hdr); break
624                 DO_OP(1);
625                 DO_OP(2);
626                 DO_OP(3);
627                 DO_OP(4);
628                 DO_OP(5);
629                 DO_OP(6);
630                 DO_OP(7);
631                 DO_OP(8);
632                 DO_OP(9);
633                 DO_OP(10);
634                 DO_OP(11);
635                 DO_OP(12);
636                 DO_OP(13);
637                 DO_OP(14);
638                 DO_OP(15);
639                 DO_OP(16);
640                 DO_OP(17);
641                 DO_OP(18);
642                 DO_OP(19);
643                 DO_OP(20);
644                 DO_OP(21);
645                 DO_OP(22);
646                 DO_OP(23);
647                 DO_OP(24);
648                 DO_OP(25);
649                 DO_OP(26);
650                 DO_OP(27);
651                 DO_OP(28);
652                 DO_OP(29);
653                 DO_OP(30);
654                 DO_OP(31);
655                 DO_OP(32);
656                 DO_OP(33);
657                 DO_OP(34);
658                 DO_OP(35);
659                 DO_OP(36);
660                 DO_OP(37);
661                 DO_OP(38);
662                 DO_OP(39);
663                 DO_OP(40);
664                 DO_OP(41);
665                 DO_OP(42);
666                 DO_OP(43);
667                 DO_OP(44);
668                 DO_OP(45);
669                 DO_OP(46);
670                 DO_OP(47);
671                 DO_OP(48);
672                 DO_OP(49);
673                 DO_OP(50);
674                 DO_OP(51);
675                 DO_OP(52);
676                 DO_OP(53);
677                 DO_OP(54);
678                 DO_OP(55);
679                 DO_OP(56);
680                 DO_OP(57);
681                 DO_OP(58);
682                 DO_OP(59);
683                 DO_OP(60);
684                 DO_OP(61);
685                 DO_OP(62);
686                 DO_OP(63);
687                 DO_OP(64);
688                 DO_OP(65);
689                 DO_OP(66);
690                 DO_OP(67);
691                 DO_OP(68);
692                 DO_OP(69);
693                 DO_OP(70);
694                 DO_OP(71);
695                 DO_OP(72);
696                 DO_OP(73);
697                 DO_OP(74);
698                 DO_OP(75);
699                 DO_OP(76);
700                 DO_OP(77);
701                 DO_OP(78);
702                 DO_OP(79);
703                 DO_OP(80);
704                 DO_OP(81);
705                 DO_OP(82);
706                 DO_OP(83);
707                 DO_OP(84);
708                 DO_OP(85);
709                 DO_OP(86);
710                 DO_OP(87);
711                 DO_OP(88);
712                 DO_OP(89);
713                 DO_OP(90);
714                 DO_OP(91);
715                 DO_OP(92);
716                 DO_OP(93);
717                 DO_OP(94);
718                 DO_OP(95);
719                 DO_OP(96);
720                 DO_OP(97);
721                 DO_OP(98);
722                 DO_OP(99);
723                 DO_OP(100);
724         default: 
725                 ctdb_queue_packet(ctdb, hdr);
726                 break;
727         }
728 }