use ctdb_call_info, so struct ctdb_call can be used for top level call
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20 /*
21   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
22   protocol design and packet details
23 */
24 #include "includes.h"
25 #include "lib/events/events.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb_private.h"
30
31 /*
32   queue a packet or die
33 */
34 static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
35 {
36         struct ctdb_node *node;
37         node = ctdb->nodes[hdr->destnode];
38         if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
39                 ctdb_fatal(ctdb, "Unable to queue packet\n");
40         }
41 }
42
43
44 /*
45   local version of ctdb_call
46 */
47 static int ctdb_call_local(struct ctdb_context *ctdb, TDB_DATA key, 
48                            struct ctdb_ltdb_header *header, TDB_DATA *data,
49                            int call_id, TDB_DATA *call_data, TDB_DATA *reply_data,
50                            uint32_t caller)
51 {
52         struct ctdb_call_info *c;
53         struct ctdb_registered_call *fn;
54
55         c = talloc(ctdb, struct ctdb_call_info);
56         CTDB_NO_MEMORY(ctdb, c);
57
58         c->key = key;
59         c->call_data = call_data;
60         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
61         c->record_data.dsize = data->dsize;
62         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
63         c->new_data = NULL;
64         c->reply_data = NULL;
65
66         for (fn=ctdb->calls;fn;fn=fn->next) {
67                 if (fn->id == call_id) break;
68         }
69         if (fn == NULL) {
70                 ctdb_set_error(ctdb, "Unknown call id %u\n", call_id);
71                 return -1;
72         }
73
74         if (fn->fn(c) != 0) {
75                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call_id);
76                 return -1;
77         }
78
79         if (header->laccessor != caller) {
80                 header->lacount = 0;
81         }
82         header->laccessor = caller;
83         header->lacount++;
84
85         /* we need to force the record to be written out if this was a remote access,
86            so that the lacount is updated */
87         if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
88                 c->new_data = &c->record_data;
89         }
90
91         if (c->new_data) {
92                 if (ctdb_ltdb_store(ctdb, key, header, *c->new_data) != 0) {
93                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
94                         return -1;
95                 }
96         }
97
98         if (reply_data) {
99                 if (c->reply_data) {
100                         *reply_data = *c->reply_data;
101                         talloc_steal(ctdb, reply_data->dptr);
102                 } else {
103                         reply_data->dptr = NULL;
104                         reply_data->dsize = 0;
105                 }
106         }
107
108         talloc_free(c);
109
110         return 0;
111 }
112
113 /*
114   send an error reply
115 */
116 static void ctdb_send_error(struct ctdb_context *ctdb, 
117                             struct ctdb_req_header *hdr, uint32_t status,
118                             const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
119 static void ctdb_send_error(struct ctdb_context *ctdb, 
120                             struct ctdb_req_header *hdr, uint32_t status,
121                             const char *fmt, ...)
122 {
123         va_list ap;
124         struct ctdb_reply_error *r;
125         char *msg;
126         int msglen, len;
127
128         va_start(ap, fmt);
129         msg = talloc_vasprintf(ctdb, fmt, ap);
130         if (msg == NULL) {
131                 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
132         }
133         va_end(ap);
134
135         msglen = strlen(msg)+1;
136         len = offsetof(struct ctdb_reply_error, msg);
137         r = ctdb->methods->allocate_pkt(ctdb, len + msglen);
138         CTDB_NO_MEMORY_FATAL(ctdb, r);
139
140         r->hdr.length    = len + msglen;
141         r->hdr.operation = CTDB_REPLY_ERROR;
142         r->hdr.destnode  = hdr->srcnode;
143         r->hdr.srcnode   = ctdb->vnn;
144         r->hdr.reqid     = hdr->reqid;
145         r->status        = status;
146         r->msglen        = msglen;
147         memcpy(&r->msg[0], msg, msglen);
148
149         talloc_free(msg);
150
151         ctdb_queue_packet(ctdb, &r->hdr);
152
153         talloc_free(r);
154 }
155
156
157 /*
158   send a redirect reply
159 */
160 static void ctdb_call_send_redirect(struct ctdb_context *ctdb, 
161                                     struct ctdb_req_call *c, 
162                                     struct ctdb_ltdb_header *header)
163 {
164         struct ctdb_reply_redirect *r;
165
166         r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
167         CTDB_NO_MEMORY_FATAL(ctdb, r);
168         r->hdr.length = sizeof(*r);
169         r->hdr.operation = CTDB_REPLY_REDIRECT;
170         r->hdr.destnode  = c->hdr.srcnode;
171         r->hdr.srcnode   = ctdb->vnn;
172         r->hdr.reqid     = c->hdr.reqid;
173         r->dmaster       = header->dmaster;
174
175         ctdb_queue_packet(ctdb, &r->hdr);
176
177         talloc_free(r);
178 }
179
180 /*
181   send a dmaster request (give another node the dmaster for a record)
182
183   This is always sent to the lmaster, which ensures that the lmaster
184   always knows who the dmaster is. The lmaster will then send a
185   CTDB_REPLY_DMASTER to the new dmaster
186 */
187 static void ctdb_call_send_dmaster(struct ctdb_context *ctdb, 
188                                    struct ctdb_req_call *c, 
189                                    struct ctdb_ltdb_header *header,
190                                    TDB_DATA *key, TDB_DATA *data)
191 {
192         struct ctdb_req_dmaster *r;
193         int len;
194         
195         len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize;
196         r = ctdb->methods->allocate_pkt(ctdb, len);
197         CTDB_NO_MEMORY_FATAL(ctdb, r);
198         r->hdr.length    = len;
199         r->hdr.operation = CTDB_REQ_DMASTER;
200         r->hdr.destnode  = ctdb_lmaster(ctdb, key);
201         r->hdr.srcnode   = ctdb->vnn;
202         r->hdr.reqid     = c->hdr.reqid;
203         r->dmaster       = header->laccessor;
204         r->keylen        = key->dsize;
205         r->datalen       = data->dsize;
206         memcpy(&r->data[0], key->dptr, key->dsize);
207         memcpy(&r->data[key->dsize], data->dptr, data->dsize);
208
209         if (r->hdr.destnode == ctdb->vnn) {
210                 /* we are the lmaster - don't send to ourselves */
211                 ctdb_request_dmaster(ctdb, &r->hdr);
212         } else {
213                 ctdb_queue_packet(ctdb, &r->hdr);
214
215                 /* update the ltdb to record the new dmaster */
216                 header->dmaster = r->hdr.destnode;
217                 ctdb_ltdb_store(ctdb, *key, header, *data);
218         }
219
220         talloc_free(r);
221 }
222
223
224 /*
225   called when a CTDB_REQ_DMASTER packet comes in
226
227   this comes into the lmaster for a record when the current dmaster
228   wants to give up the dmaster role and give it to someone else
229 */
230 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
231 {
232         struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
233         struct ctdb_reply_dmaster *r;
234         TDB_DATA key, data, data2;
235         struct ctdb_ltdb_header header;
236         int ret, len;
237
238         key.dptr = c->data;
239         key.dsize = c->keylen;
240         data.dptr = c->data + c->keylen;
241         data.dsize = c->datalen;
242
243         /* fetch the current record */
244         ret = ctdb_ltdb_fetch(ctdb, key, &header, &data2);
245         if (ret != 0) {
246                 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
247                 return;
248         }
249
250         /* its a protocol error if the sending node is not the current dmaster */
251         if (header.dmaster != hdr->srcnode) {
252                 ctdb_fatal(ctdb, "dmaster request from non-master");
253                 return;
254         }
255
256         header.dmaster = c->dmaster;
257         if (ctdb_ltdb_store(ctdb, key, &header, data) != 0) {
258                 ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
259                 return;
260         }
261
262         /* send the CTDB_REPLY_DMASTER */
263         len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize;
264         r = ctdb->methods->allocate_pkt(ctdb, len);
265         CTDB_NO_MEMORY_FATAL(ctdb, r);
266         r->hdr.length    = len;
267         r->hdr.operation = CTDB_REPLY_DMASTER;
268         r->hdr.destnode  = c->dmaster;
269         r->hdr.srcnode   = ctdb->vnn;
270         r->hdr.reqid     = hdr->reqid;
271         r->datalen       = data.dsize;
272         memcpy(&r->data[0], data.dptr, data.dsize);
273
274         if (r->hdr.destnode == r->hdr.srcnode) {
275                 ctdb_reply_dmaster(ctdb, &r->hdr);
276         } else {
277                 ctdb_queue_packet(ctdb, &r->hdr);
278         }
279
280         talloc_free(r);
281 }
282
283
284 /*
285   called when a CTDB_REQ_CALL packet comes in
286 */
287 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
288 {
289         struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
290         TDB_DATA key, data, call_data, reply_data;
291         struct ctdb_reply_call *r;
292         int ret, len;
293         struct ctdb_ltdb_header header;
294
295         key.dptr = c->data;
296         key.dsize = c->keylen;
297         call_data.dptr = c->data + c->keylen;
298         call_data.dsize = c->calldatalen;
299
300         /* determine if we are the dmaster for this key. This also
301            fetches the record data (if any), thus avoiding a 2nd fetch of the data 
302            if the call will be answered locally */
303         ret = ctdb_ltdb_fetch(ctdb, key, &header, &data);
304         if (ret != 0) {
305                 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
306                 return;
307         }
308
309         /* if we are not the dmaster, then send a redirect to the
310            requesting node */
311         if (header.dmaster != ctdb->vnn) {
312                 ctdb_call_send_redirect(ctdb, c, &header);
313                 talloc_free(data.dptr);
314                 return;
315         }
316
317         /* if this nodes has done enough consecutive calls on the same record
318            then give them the record */
319         if (header.laccessor == c->hdr.srcnode &&
320             header.lacount >= ctdb->max_lacount) {
321                 ctdb_call_send_dmaster(ctdb, c, &header, &key, &data);
322                 talloc_free(data.dptr);
323                 return;
324         }
325
326         ctdb_call_local(ctdb, key, &header, &data, c->callid, 
327                         call_data.dsize?&call_data:NULL,
328                         &reply_data, c->hdr.srcnode);
329
330         len = offsetof(struct ctdb_reply_call, data) + reply_data.dsize;
331         r = ctdb->methods->allocate_pkt(ctdb, len);
332         CTDB_NO_MEMORY_FATAL(ctdb, r);
333         r->hdr.length    = len;
334         r->hdr.operation = CTDB_REPLY_CALL;
335         r->hdr.destnode  = hdr->srcnode;
336         r->hdr.srcnode   = hdr->destnode;
337         r->hdr.reqid     = hdr->reqid;
338         r->datalen       = reply_data.dsize;
339         memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
340
341         ctdb_queue_packet(ctdb, &r->hdr);
342
343         talloc_free(reply_data.dptr);
344         talloc_free(r);
345 }
346
347 enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
348
349 /*
350   state of a in-progress ctdb call
351 */
352 struct ctdb_call_state {
353         enum call_state state;
354         struct ctdb_req_call *c;
355         struct ctdb_node *node;
356         const char *errmsg;
357         TDB_DATA call_data;
358         TDB_DATA reply_data;
359         TDB_DATA key;
360         int redirect_count;
361         struct ctdb_ltdb_header header;
362 };
363
364
365 /*
366   called when a CTDB_REPLY_CALL packet comes in
367
368   This packet comes in response to a CTDB_REQ_CALL request packet. It
369   contains any reply data freom the call
370 */
371 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
372 {
373         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
374         struct ctdb_call_state *state;
375         TDB_DATA reply_data;
376
377         state = idr_find(ctdb->idr, hdr->reqid);
378         if (state == NULL) return;
379
380         reply_data.dptr = c->data;
381         reply_data.dsize = c->datalen;
382
383         state->reply_data = reply_data;
384
385         talloc_steal(state, c);
386
387         state->state = CTDB_CALL_DONE;
388 }
389
390 /*
391   called when a CTDB_REPLY_DMASTER packet comes in
392
393   This packet comes in from the lmaster response to a CTDB_REQ_CALL
394   request packet. It means that the current dmaster wants to give us
395   the dmaster role
396 */
397 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
398 {
399         struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
400         struct ctdb_call_state *state;
401         TDB_DATA data;
402
403         state = idr_find(ctdb->idr, hdr->reqid);
404         if (state == NULL) return;
405
406         data.dptr = c->data;
407         data.dsize = c->datalen;
408
409         talloc_steal(state, c);
410
411         /* we're now the dmaster - update our local ltdb with new header
412            and data */
413         state->header.dmaster = ctdb->vnn;
414
415         if (ctdb_ltdb_store(ctdb, state->key, &state->header, data) != 0) {
416                 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
417                 return;
418         }
419
420         ctdb_call_local(ctdb, state->key, &state->header, &data, state->c->callid,
421                         state->call_data.dsize?&state->call_data:NULL,
422                         &state->reply_data, ctdb->vnn);
423
424         state->state = CTDB_CALL_DONE;
425 }
426
427
428 /*
429   called when a CTDB_REPLY_ERROR packet comes in
430 */
431 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
432 {
433         struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
434         struct ctdb_call_state *state;
435
436         state = idr_find(ctdb->idr, hdr->reqid);
437         if (state == NULL) return;
438
439         talloc_steal(state, c);
440
441         state->state  = CTDB_CALL_ERROR;
442         state->errmsg = (char *)c->msg;
443 }
444
445
446 /*
447   called when a CTDB_REPLY_REDIRECT packet comes in
448
449   This packet arrives when we have sent a CTDB_REQ_CALL request and
450   the node that received it is not the dmaster for the given key. We
451   are given a hint as to what node to try next.
452 */
453 void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
454 {
455         struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
456         struct ctdb_call_state *state;
457
458         state = idr_find(ctdb->idr, hdr->reqid);
459         if (state == NULL) return;
460
461         talloc_steal(state, c);
462         
463         /* don't allow for too many redirects */
464         if (state->redirect_count++ == CTDB_MAX_REDIRECT) {
465                 c->dmaster = ctdb_lmaster(ctdb, &state->key);
466         }
467
468         /* send it off again */
469         state->node = ctdb->nodes[c->dmaster];
470
471         ctdb_queue_packet(ctdb, &state->c->hdr);
472 }
473
474 /*
475   destroy a ctdb_call
476 */
477 static int ctdb_call_destructor(struct ctdb_call_state *state)
478 {
479         idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
480         return 0;
481 }
482
483
484 /*
485   called when a ctdb_call times out
486 */
487 void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, 
488                        struct timeval t, void *private)
489 {
490         struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
491         state->state = CTDB_CALL_ERROR;
492         ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out",
493                        state->c->hdr.reqid);
494 }
495
496 /*
497   construct an event driven local ctdb_call
498
499   this is used so that locally processed ctdb_call requests are processed
500   in an event driven manner
501 */
502 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_context *ctdb, 
503                                              TDB_DATA key, int call_id, 
504                                              TDB_DATA *call_data, TDB_DATA *reply_data,
505                                              struct ctdb_ltdb_header *header,
506                                              TDB_DATA *data)
507 {
508         struct ctdb_call_state *state;
509         int ret;
510
511         state = talloc_zero(ctdb, struct ctdb_call_state);
512         CTDB_NO_MEMORY_NULL(ctdb, state);
513
514         state->state = CTDB_CALL_DONE;
515         state->node = ctdb->nodes[ctdb->vnn];
516
517         ret = ctdb_call_local(ctdb, key, header, data, 
518                               call_id, call_data, &state->reply_data, 
519                               ctdb->vnn);
520         return state;
521 }
522
523
524 /*
525   make a remote ctdb call - async send
526
527   This constructs a ctdb_call request and queues it for processing. 
528   This call never blocks.
529 */
530 struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb, 
531                                        TDB_DATA key, int call_id, 
532                                        TDB_DATA *call_data, TDB_DATA *reply_data)
533 {
534         uint32_t len;
535         struct ctdb_call_state *state;
536         int ret;
537         struct ctdb_ltdb_header header;
538         TDB_DATA data;
539
540         /*
541           if we are the dmaster for this key then we don't need to
542           send it off at all, we can bypass the network and handle it
543           locally. To find out if we are the dmaster we need to look
544           in our ltdb
545         */
546         ret = ctdb_ltdb_fetch(ctdb, key, &header, &data);
547         if (ret != 0) return NULL;
548
549         if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
550                 return ctdb_call_local_send(ctdb, key, call_id, call_data, reply_data,
551                                             &header, &data);
552         }
553
554         state = talloc_zero(ctdb, struct ctdb_call_state);
555         CTDB_NO_MEMORY_NULL(ctdb, state);
556
557         len = offsetof(struct ctdb_req_call, data) + key.dsize + (call_data?call_data->dsize:0);
558         state->c = ctdb->methods->allocate_pkt(ctdb, len);
559         CTDB_NO_MEMORY_NULL(ctdb, state->c);
560
561         state->c->hdr.length    = len;
562         state->c->hdr.operation = CTDB_REQ_CALL;
563         state->c->hdr.destnode  = header.dmaster;
564         state->c->hdr.srcnode   = ctdb->vnn;
565         /* this limits us to 16k outstanding messages - not unreasonable */
566         state->c->hdr.reqid     = idr_get_new(ctdb->idr, state, 0xFFFF);
567         state->c->callid        = call_id;
568         state->c->keylen        = key.dsize;
569         state->c->calldatalen   = call_data?call_data->dsize:0;
570         memcpy(&state->c->data[0], key.dptr, key.dsize);
571         if (call_data) {
572                 memcpy(&state->c->data[key.dsize], call_data->dptr, call_data->dsize);
573                 state->call_data.dptr = &state->c->data[key.dsize];
574                 state->call_data.dsize = call_data->dsize;
575         }
576         state->key.dptr         = &state->c->data[0];
577         state->key.dsize        = key.dsize;
578
579         state->node   = ctdb->nodes[header.dmaster];
580         state->state  = CTDB_CALL_WAIT;
581         state->header = header;
582
583         talloc_set_destructor(state, ctdb_call_destructor);
584
585         ctdb_queue_packet(ctdb, &state->c->hdr);
586
587         event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
588                         ctdb_call_timeout, state);
589         return state;
590 }
591
592
593 /*
594   make a remote ctdb call - async recv. 
595
596   This is called when the program wants to wait for a ctdb_call to complete and get the 
597   results. This call will block unless the call has already completed.
598 */
599 int ctdb_call_recv(struct ctdb_call_state *state, TDB_DATA *reply_data)
600 {
601         while (state->state < CTDB_CALL_DONE) {
602                 event_loop_once(state->node->ctdb->ev);
603         }
604         if (state->state != CTDB_CALL_DONE) {
605                 ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
606                 talloc_free(state);
607                 return -1;
608         }
609         if (reply_data) {
610                 reply_data->dptr = talloc_memdup(state->node->ctdb,
611                                                  state->reply_data.dptr,
612                                                  state->reply_data.dsize);
613                 reply_data->dsize = state->reply_data.dsize;
614         }
615         talloc_free(state);
616         return 0;
617 }
618
619 /*
620   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
621 */
622 int ctdb_call(struct ctdb_context *ctdb, 
623               TDB_DATA key, int call_id, 
624               TDB_DATA *call_data, TDB_DATA *reply_data)
625 {
626         struct ctdb_call_state *state;
627         state = ctdb_call_send(ctdb, key, call_id, call_data, reply_data);
628         return ctdb_call_recv(state, reply_data);
629 }