merge fixes from samba4
[samba.git] / ctdb / common / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/wait.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 /*
33   structure describing a connected client in the daemon
34  */
35 struct ctdb_client {
36         struct ctdb_context *ctdb;
37         int fd;
38         struct ctdb_queue *queue;
39 };
40
41
42
43 static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
44
45 static void ctdb_main_loop(struct ctdb_context *ctdb)
46 {
47         ctdb->methods->start(ctdb);
48
49         /* go into a wait loop to allow other nodes to complete */
50         event_loop_wait(ctdb->ev);
51
52         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
53         exit(1);
54 }
55
56
57 static void set_non_blocking(int fd)
58 {
59         unsigned v;
60         v = fcntl(fd, F_GETFL, 0);
61         fcntl(fd, F_SETFL, v | O_NONBLOCK);
62 }
63
64 static void block_signal(int signum)
65 {
66         struct sigaction act;
67
68         memset(&act, 0, sizeof(act));
69
70         act.sa_handler = SIG_IGN;
71         sigemptyset(&act.sa_mask);
72         sigaddset(&act.sa_mask, signum);
73         sigaction(signum, &act, NULL);
74 }
75
76
77 /*
78   send a packet to a client
79  */
80 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
81 {
82         client->ctdb->status.client_packets_sent++;
83         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
84 }
85
86 /*
87   message handler for when we are in daemon mode. This redirects the message
88   to the right client
89  */
90 static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
91                                     TDB_DATA data, void *private_data)
92 {
93         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
94         struct ctdb_req_message *r;
95         int len;
96
97         /* construct a message to send to the client containing the data */
98         len = offsetof(struct ctdb_req_message, data) + data.dsize;
99         r = ctdbd_allocate_pkt(ctdb, len);
100
101         talloc_set_name_const(r, "req_message packet");
102
103         memset(r, 0, offsetof(struct ctdb_req_message, data));
104
105         r->hdr.length    = len;
106         r->hdr.ctdb_magic = CTDB_MAGIC;
107         r->hdr.ctdb_version = CTDB_VERSION;
108         r->hdr.operation = CTDB_REQ_MESSAGE;
109         r->srvid         = srvid;
110         r->datalen       = data.dsize;
111         memcpy(&r->data[0], data.dptr, data.dsize);
112
113         daemon_queue_send(client, &r->hdr);
114
115         talloc_free(r);
116 }
117                                            
118
119 /*
120   this is called when the ctdb daemon received a ctdb request to 
121   set the srvid from the client
122  */
123 static void daemon_request_register_message_handler(struct ctdb_client *client, 
124                                                     struct ctdb_req_register *c)
125 {
126         int res;
127         res = ctdb_register_message_handler(client->ctdb, client, 
128                                             c->srvid, daemon_message_handler, 
129                                             client);
130         if (res != 0) {
131                 DEBUG(0,(__location__ " Failed to register handler %u in daemon\n", 
132                          c->srvid));
133         } else {
134                 DEBUG(2,(__location__ " Registered message handler for srvid=%u\n", 
135                          c->srvid));
136         }
137 }
138
139
140 /*
141   called when the daemon gets a shutdown request from a client
142  */
143 static void daemon_request_shutdown(struct ctdb_client *client, 
144                                       struct ctdb_req_shutdown *f)
145 {
146         struct ctdb_context *ctdb = talloc_get_type(client->ctdb, struct ctdb_context);
147         int len;
148         uint32_t node;
149
150         /* we dont send to ourself so we can already count one daemon as
151            exiting */
152         ctdb->num_finished++;
153
154
155         /* loop over all nodes of the cluster */
156         for (node=0; node<ctdb->num_nodes;node++) {
157                 struct ctdb_req_finished *rf;
158
159                 /* dont send a message to ourself */
160                 if (ctdb->vnn == node) {
161                         continue;
162                 }
163
164                 len = sizeof(struct ctdb_req_finished);
165                 rf = ctdb->methods->allocate_pkt(ctdb, len);
166                 CTDB_NO_MEMORY_FATAL(ctdb, rf);
167                 talloc_set_name_const(rf, "ctdb_req_finished packet");
168
169                 ZERO_STRUCT(*rf);
170                 rf->hdr.length    = len;
171                 rf->hdr.ctdb_magic = CTDB_MAGIC;
172                 rf->hdr.ctdb_version = CTDB_VERSION;
173                 rf->hdr.operation = CTDB_REQ_FINISHED;
174                 rf->hdr.destnode  = node;
175                 rf->hdr.srcnode   = ctdb->vnn;
176                 rf->hdr.reqid     = 0;
177
178                 ctdb_queue_packet(ctdb, &(rf->hdr));
179
180                 talloc_free(rf);
181         }
182
183         /* wait until all nodes have are prepared to shutdown */
184         while (ctdb->num_finished != ctdb->num_nodes) {
185                 event_loop_once(ctdb->ev);
186         }
187
188         /* all daemons have requested to finish - we now exit */
189         DEBUG(1,("All daemons finished - exiting\n"));
190         _exit(0);
191 }
192
193
194
195 /*
196   called when the daemon gets a connect wait request from a client
197  */
198 static void daemon_request_connect_wait(struct ctdb_client *client, 
199                                         struct ctdb_req_connect_wait *c)
200 {
201         struct ctdb_reply_connect_wait r;
202         int res;
203
204         /* first wait - in the daemon */
205         ctdb_daemon_connect_wait(client->ctdb);
206
207         /* now send the reply */
208         ZERO_STRUCT(r);
209
210         r.hdr.length     = sizeof(r);
211         r.hdr.ctdb_magic = CTDB_MAGIC;
212         r.hdr.ctdb_version = CTDB_VERSION;
213         r.hdr.operation = CTDB_REPLY_CONNECT_WAIT;
214         r.vnn           = ctdb_get_vnn(client->ctdb);
215         r.num_connected = client->ctdb->num_connected;
216         
217         res = daemon_queue_send(client, &r.hdr);
218         if (res != 0) {
219                 DEBUG(0,(__location__ " Failed to queue a connect wait response\n"));
220                 return;
221         }
222 }
223
224
225 /*
226   called when the daemon gets a status request from a client
227  */
228 static void daemon_request_status(struct ctdb_client *client, 
229                                   struct ctdb_req_status *c)
230 {
231         struct ctdb_reply_status r;
232         int res;
233
234         /* now send the reply */
235         ZERO_STRUCT(r);
236
237         r.hdr.length     = sizeof(r);
238         r.hdr.ctdb_magic = CTDB_MAGIC;
239         r.hdr.ctdb_version = CTDB_VERSION;
240         r.hdr.operation = CTDB_REPLY_STATUS;
241         r.hdr.reqid = c->hdr.reqid;
242         r.status = client->ctdb->status;
243         
244         res = daemon_queue_send(client, &r.hdr);
245         if (res != 0) {
246                 DEBUG(0,(__location__ " Failed to queue a connect wait response\n"));
247                 return;
248         }
249 }
250
251 /*
252   destroy a ctdb_client
253 */
254 static int ctdb_client_destructor(struct ctdb_client *client)
255 {
256         close(client->fd);
257         client->fd = -1;
258         return 0;
259 }
260
261
262 /*
263   this is called when the ctdb daemon received a ctdb request message
264   from a local client over the unix domain socket
265  */
266 static void daemon_request_message_from_client(struct ctdb_client *client, 
267                                                struct ctdb_req_message *c)
268 {
269         TDB_DATA data;
270         int res;
271
272         /* maybe the message is for another client on this node */
273         if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
274                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
275                 return;
276         }
277         
278         /* its for a remote node */
279         data.dptr = &c->data[0];
280         data.dsize = c->datalen;
281         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
282                                        c->srvid, data);
283         if (res != 0) {
284                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
285                          c->hdr.destnode));
286         }
287 }
288
289
290 struct daemon_call_state {
291         struct ctdb_client *client;
292         uint32_t reqid;
293         struct ctdb_call *call;
294         struct timeval start_time;
295 };
296
297 /* 
298    complete a call from a client 
299 */
300 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
301 {
302         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
303                                                            struct daemon_call_state);
304         struct ctdb_reply_call *r;
305         int res;
306         uint32_t length;
307         struct ctdb_client *client = dstate->client;
308
309         talloc_steal(client, dstate);
310         talloc_steal(dstate, dstate->call);
311
312         res = ctdb_daemon_call_recv(state, dstate->call);
313         if (res != 0) {
314                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
315                 client->ctdb->status.pending_calls--;
316                 ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
317                 return;
318         }
319
320         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
321         r = ctdbd_allocate_pkt(dstate, length);
322         if (r == NULL) {
323                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
324                 client->ctdb->status.pending_calls--;
325                 ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
326                 return;
327         }
328         memset(r, 0, offsetof(struct ctdb_reply_call, data));
329         r->hdr.length       = length;
330         r->hdr.ctdb_magic   = CTDB_MAGIC;
331         r->hdr.ctdb_version = CTDB_VERSION;
332         r->hdr.operation    = CTDB_REPLY_CALL;
333         r->hdr.reqid        = dstate->reqid;
334         r->datalen          = dstate->call->reply_data.dsize;
335         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
336
337         res = daemon_queue_send(client, &r->hdr);
338         if (res != 0) {
339                 DEBUG(0, (__location__ "Failed to queue packet from daemon to client\n"));
340         }
341         ctdb_latency(&client->ctdb->status.max_call_latency, dstate->start_time);
342         talloc_free(dstate);
343         client->ctdb->status.pending_calls--;
344 }
345
346
347 /*
348   this is called when the ctdb daemon received a ctdb request call
349   from a local client over the unix domain socket
350  */
351 static void daemon_request_call_from_client(struct ctdb_client *client, 
352                                             struct ctdb_req_call *c)
353 {
354         struct ctdb_call_state *state;
355         struct ctdb_db_context *ctdb_db;
356         struct daemon_call_state *dstate;
357         struct ctdb_call *call;
358         struct ctdb_ltdb_header header;
359         TDB_DATA key, data;
360         int ret;
361         struct ctdb_context *ctdb = client->ctdb;
362
363         ctdb->status.total_calls++;
364         ctdb->status.pending_calls++;
365
366         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
367         if (!ctdb_db) {
368                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
369                           c->db_id));
370                 ctdb->status.pending_calls--;
371                 return;
372         }
373
374         key.dptr = c->data;
375         key.dsize = c->keylen;
376
377         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
378                                            (struct ctdb_req_header *)c, &data,
379                                            daemon_incoming_packet, client);
380         if (ret == -2) {
381                 /* will retry later */
382                 ctdb->status.pending_calls--;
383                 return;
384         }
385
386         if (ret != 0) {
387                 DEBUG(0,(__location__ " Unable to fetch record\n"));
388                 ctdb->status.pending_calls--;
389                 return;
390         }
391
392         dstate = talloc(client, struct daemon_call_state);
393         if (dstate == NULL) {
394                 ctdb_ltdb_unlock(ctdb_db, key);
395                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
396                 ctdb->status.pending_calls--;
397                 return;
398         }
399         dstate->start_time = timeval_current();
400         dstate->client = client;
401         dstate->reqid  = c->hdr.reqid;
402         talloc_steal(dstate, data.dptr);
403
404         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
405         if (call == NULL) {
406                 ctdb_ltdb_unlock(ctdb_db, key);
407                 DEBUG(0,(__location__ " Unable to allocate call\n"));
408                 ctdb->status.pending_calls--;
409                 ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
410                 return;
411         }
412
413         call->call_id = c->callid;
414         call->key = key;
415         call->call_data.dptr = c->data + c->keylen;
416         call->call_data.dsize = c->calldatalen;
417         call->flags = c->flags;
418
419         if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
420                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
421         } else {
422                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
423         }
424
425         ctdb_ltdb_unlock(ctdb_db, key);
426
427         if (state == NULL) {
428                 DEBUG(0,(__location__ " Unable to setup call send\n"));
429                 ctdb->status.pending_calls--;
430                 ctdb_latency(&ctdb->status.max_call_latency, dstate->start_time);
431                 return;
432         }
433         talloc_steal(state, dstate);
434         talloc_steal(client, state);
435
436         state->async.fn = daemon_call_from_client_callback;
437         state->async.private_data = dstate;
438 }
439
440 /* data contains a packet from the client */
441 static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
442 {
443         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
444         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
445         TALLOC_CTX *tmp_ctx;
446         struct ctdb_context *ctdb = client->ctdb;
447
448         /* place the packet as a child of a tmp_ctx. We then use
449            talloc_free() below to free it. If any of the calls want
450            to keep it, then they will steal it somewhere else, and the
451            talloc_free() will be a no-op */
452         tmp_ctx = talloc_new(client);
453         talloc_steal(tmp_ctx, hdr);
454
455         if (hdr->ctdb_magic != CTDB_MAGIC) {
456                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
457                 goto done;
458         }
459
460         if (hdr->ctdb_version != CTDB_VERSION) {
461                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
462                 goto done;
463         }
464
465         switch (hdr->operation) {
466         case CTDB_REQ_CALL:
467                 ctdb->status.client.req_call++;
468                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
469                 break;
470
471         case CTDB_REQ_REGISTER:
472                 ctdb->status.client.req_register++;
473                 daemon_request_register_message_handler(client, 
474                                                         (struct ctdb_req_register *)hdr);
475                 break;
476         case CTDB_REQ_MESSAGE:
477                 ctdb->status.client.req_message++;
478                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
479                 break;
480
481         case CTDB_REQ_CONNECT_WAIT:
482                 ctdb->status.client.req_connect_wait++;
483                 daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
484                 break;
485
486         case CTDB_REQ_SHUTDOWN:
487                 ctdb->status.client.req_shutdown++;
488                 daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
489                 break;
490
491         case CTDB_REQ_STATUS:
492                 ctdb->status.client.req_status++;
493                 daemon_request_status(client, (struct ctdb_req_status *)hdr);
494                 break;
495
496         default:
497                 DEBUG(0,(__location__ " daemon: unrecognized operation %d\n",
498                          hdr->operation));
499         }
500
501 done:
502         talloc_free(tmp_ctx);
503 }
504
505 /*
506   called when the daemon gets a incoming packet
507  */
508 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
509 {
510         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
511         struct ctdb_req_header *hdr;
512
513         if (cnt == 0) {
514                 talloc_free(client);
515                 return;
516         }
517
518         client->ctdb->status.client_packets_recv++;
519
520         if (cnt < sizeof(*hdr)) {
521                 ctdb_set_error(client->ctdb, "Bad packet length %d in daemon\n", cnt);
522                 return;
523         }
524         hdr = (struct ctdb_req_header *)data;
525         if (cnt != hdr->length) {
526                 ctdb_set_error(client->ctdb, "Bad header length %d expected %d\n in daemon", 
527                                hdr->length, cnt);
528                 return;
529         }
530
531         if (hdr->ctdb_magic != CTDB_MAGIC) {
532                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
533                 return;
534         }
535
536         if (hdr->ctdb_version != CTDB_VERSION) {
537                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
538                 return;
539         }
540
541         DEBUG(3,(__location__ " client request %d of type %d length %d from "
542                  "node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
543                  hdr->srcnode, hdr->destnode));
544
545         /* it is the responsibility of the incoming packet function to free 'data' */
546         daemon_incoming_packet(client, data, cnt);
547 }
548
549 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
550                          uint16_t flags, void *private_data)
551 {
552         struct sockaddr_in addr;
553         socklen_t len;
554         int fd;
555         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
556         struct ctdb_client *client;
557
558         memset(&addr, 0, sizeof(addr));
559         len = sizeof(addr);
560         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
561         if (fd == -1) {
562                 return;
563         }
564         set_non_blocking(fd);
565
566         client = talloc_zero(ctdb, struct ctdb_client);
567         client->ctdb = ctdb;
568         client->fd = fd;
569
570         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
571                                          ctdb_daemon_read_cb, client);
572
573         talloc_set_destructor(client, ctdb_client_destructor);
574 }
575
576
577
578 static void ctdb_read_from_parent(struct event_context *ev, struct fd_event *fde, 
579                          uint16_t flags, void *private_data)
580 {
581         int *fd = private_data;
582         int cnt;
583         char buf;
584
585         /* XXX this is a good place to try doing some cleaning up before exiting */
586         cnt = read(*fd, &buf, 1);
587         if (cnt==0) {
588                 DEBUG(2,(__location__ " parent process exited. filedescriptor dissappeared\n"));
589                 exit(1);
590         } else {
591                 DEBUG(0,(__location__ " ctdb: did not expect data from parent process\n"));
592                 exit(1);
593         }
594 }
595
596
597
598 /*
599   create a unix domain socket and bind it
600   return a file descriptor open on the socket 
601 */
602 static int ux_socket_bind(struct ctdb_context *ctdb)
603 {
604         struct sockaddr_un addr;
605
606         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
607         if (ctdb->daemon.sd == -1) {
608                 ctdb->daemon.sd = -1;
609                 return -1;
610         }
611
612         set_non_blocking(ctdb->daemon.sd);
613
614         memset(&addr, 0, sizeof(addr));
615         addr.sun_family = AF_UNIX;
616         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
617
618         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
619                 close(ctdb->daemon.sd);
620                 ctdb->daemon.sd = -1;
621                 return -1;
622         }       
623         listen(ctdb->daemon.sd, 1);
624
625         return 0;
626 }
627
628 /*
629   delete the socket on exit - called on destruction of autofree context
630  */
631 static int unlink_destructor(const char *name)
632 {
633         unlink(name);
634         return 0;
635 }
636
637 /*
638   start the protocol going
639 */
640 int ctdb_start(struct ctdb_context *ctdb)
641 {
642         pid_t pid;
643         static int fd[2];
644         int res;
645         struct fd_event *fde;
646         const char *domain_socket_name;
647
648         /* generate a name to use for our local socket */
649         ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
650         /* get rid of any old sockets */
651         unlink(ctdb->daemon.name);
652
653         /* create a unix domain stream socket to listen to */
654         res = ux_socket_bind(ctdb);
655         if (res!=0) {
656                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
657                 exit(10);
658         }
659
660         res = pipe(&fd[0]);
661         if (res) {
662                 DEBUG(0,(__location__ " Failed to open pipe for CTDB\n"));
663                 exit(1);
664         }
665         pid = fork();
666         if (pid==-1) {
667                 DEBUG(0,(__location__ " Failed to fork CTDB daemon\n"));
668                 exit(1);
669         }
670
671         if (pid) {
672                 close(fd[0]);
673                 close(ctdb->daemon.sd);
674                 ctdb->daemon.sd = -1;
675                 return 0;
676         }
677
678         block_signal(SIGPIPE);
679
680         /* ensure the socket is deleted on exit of the daemon */
681         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
682         talloc_set_destructor(domain_socket_name, unlink_destructor);   
683         
684         close(fd[1]);
685
686         ctdb->ev = event_context_init(NULL);
687         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ, ctdb_read_from_parent, &fd[0]);
688         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, ctdb_accept_client, ctdb);
689         ctdb_main_loop(ctdb);
690
691         return 0;
692 }
693
694 /*
695   allocate a packet for use in client<->daemon communication
696  */
697 void *ctdbd_allocate_pkt(TALLOC_CTX *mem_ctx, size_t len)
698 {
699         int size;
700
701         size = (len+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
702         return talloc_size(mem_ctx, size);
703 }
704
705 /*
706   called when a CTDB_REQ_FINISHED packet comes in
707 */
708 void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
709 {
710         ctdb->num_finished++;
711 }