r23798: updated old Temple Place FSF addresses to new URL
[kai/samba.git] / source4 / cluster / ctdb / common / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This library is free software; you can redistribute it and/or
8    modify it under the terms of the GNU Lesser General Public
9    License as published by the Free Software Foundation; either
10    version 3 of the License, or (at your option) any later version.
11
12    This library is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15    Lesser General Public License for more details.
16
17    You should have received a copy of the GNU Lesser General Public
18    License along with this library; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/events/events.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30
31 /*
32   queue a packet for sending from client to daemon
33 */
34 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
35 {
36         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
37 }
38
39
40 /*
41   handle a connect wait reply packet
42  */
43 static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, 
44                                     struct ctdb_req_header *hdr)
45 {
46         struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr;
47         ctdb->num_connected = r->num_connected;
48 }
49
50 /*
51   state of a in-progress ctdb call in client
52 */
53 struct ctdb_client_call_state {
54         enum call_state state;
55         uint32_t reqid;
56         struct ctdb_db_context *ctdb_db;
57         struct ctdb_call call;
58 };
59
60 /*
61   called when a CTDB_REPLY_CALL packet comes in in the client
62
63   This packet comes in response to a CTDB_REQ_CALL request packet. It
64   contains any reply data from the call
65 */
66 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
67 {
68         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
69         struct ctdb_client_call_state *state;
70
71         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
72         if (state == NULL) {
73                 DEBUG(0, ("reqid %d not found\n", hdr->reqid));
74                 return;
75         }
76
77         state->call.reply_data.dptr = c->data;
78         state->call.reply_data.dsize = c->datalen;
79         state->call.status = c->status;
80
81         talloc_steal(state, c);
82
83         state->state = CTDB_CALL_DONE;
84 }
85
86 static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
87
88 /*
89   this is called in the client, when data comes in from the daemon
90  */
91 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
92 {
93         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
94         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
95         TALLOC_CTX *tmp_ctx;
96
97         /* place the packet as a child of a tmp_ctx. We then use
98            talloc_free() below to free it. If any of the calls want
99            to keep it, then they will steal it somewhere else, and the
100            talloc_free() will be a no-op */
101         tmp_ctx = talloc_new(ctdb);
102         talloc_steal(tmp_ctx, hdr);
103
104         if (cnt == 0) {
105                 DEBUG(2,("Daemon has exited - shutting down client\n"));
106                 exit(0);
107         }
108
109         if (cnt < sizeof(*hdr)) {
110                 DEBUG(0,("Bad packet length %d in client\n", cnt));
111                 goto done;
112         }
113         if (cnt != hdr->length) {
114                 ctdb_set_error(ctdb, "Bad header length %d expected %d in client\n", 
115                                hdr->length, cnt);
116                 goto done;
117         }
118
119         if (hdr->ctdb_magic != CTDB_MAGIC) {
120                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
121                 goto done;
122         }
123
124         if (hdr->ctdb_version != CTDB_VERSION) {
125                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
126                 goto done;
127         }
128
129         switch (hdr->operation) {
130         case CTDB_REPLY_CALL:
131                 ctdb_client_reply_call(ctdb, hdr);
132                 break;
133
134         case CTDB_REQ_MESSAGE:
135                 ctdb_request_message(ctdb, hdr);
136                 break;
137
138         case CTDB_REPLY_CONNECT_WAIT:
139                 ctdb_reply_connect_wait(ctdb, hdr);
140                 break;
141
142         case CTDB_REPLY_STATUS:
143                 ctdb_reply_status(ctdb, hdr);
144                 break;
145
146         default:
147                 DEBUG(0,("bogus operation code:%d\n",hdr->operation));
148         }
149
150 done:
151         talloc_free(tmp_ctx);
152 }
153
154 /*
155   connect to a unix domain socket
156 */
157 int ctdb_socket_connect(struct ctdb_context *ctdb)
158 {
159         struct sockaddr_un addr;
160
161         memset(&addr, 0, sizeof(addr));
162         addr.sun_family = AF_UNIX;
163         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
164
165         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
166         if (ctdb->daemon.sd == -1) {
167                 return -1;
168         }
169         
170         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
171                 close(ctdb->daemon.sd);
172                 ctdb->daemon.sd = -1;
173                 return -1;
174         }
175
176         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
177                                               CTDB_DS_ALIGNMENT, 
178                                               ctdb_client_read_cb, ctdb);
179         return 0;
180 }
181
182
183 struct ctdb_record_handle {
184         struct ctdb_db_context *ctdb_db;
185         TDB_DATA key;
186         TDB_DATA *data;
187         struct ctdb_ltdb_header header;
188 };
189
190
191 /*
192   make a recv call to the local ctdb daemon - called from client context
193
194   This is called when the program wants to wait for a ctdb_call to complete and get the 
195   results. This call will block unless the call has already completed.
196 */
197 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
198 {
199         while (state->state < CTDB_CALL_DONE) {
200                 event_loop_once(state->ctdb_db->ctdb->ev);
201         }
202         if (state->state != CTDB_CALL_DONE) {
203                 DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
204                 talloc_free(state);
205                 return -1;
206         }
207
208         if (state->call.reply_data.dsize) {
209                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
210                                                       state->call.reply_data.dptr,
211                                                       state->call.reply_data.dsize);
212                 call->reply_data.dsize = state->call.reply_data.dsize;
213         } else {
214                 call->reply_data.dptr = NULL;
215                 call->reply_data.dsize = 0;
216         }
217         call->status = state->call.status;
218         talloc_free(state);
219
220         return 0;
221 }
222
223
224
225
226 /*
227   destroy a ctdb_call in client
228 */
229 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
230 {
231         idr_remove(state->ctdb_db->ctdb->idr, state->reqid);
232         return 0;
233 }
234
235 /*
236   construct an event driven local ctdb_call
237
238   this is used so that locally processed ctdb_call requests are processed
239   in an event driven manner
240 */
241 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
242                                                                   struct ctdb_call *call,
243                                                                   struct ctdb_ltdb_header *header,
244                                                                   TDB_DATA *data)
245 {
246         struct ctdb_client_call_state *state;
247         struct ctdb_context *ctdb = ctdb_db->ctdb;
248         int ret;
249
250         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
251         CTDB_NO_MEMORY_NULL(ctdb, state);
252
253         talloc_steal(state, data->dptr);
254
255         state->state = CTDB_CALL_DONE;
256         state->call = *call;
257         state->ctdb_db = ctdb_db;
258
259         ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
260         talloc_steal(state, state->call.reply_data.dptr);
261
262         return state;
263 }
264
265 /*
266   make a ctdb call to the local daemon - async send. Called from client context.
267
268   This constructs a ctdb_call request and queues it for processing. 
269   This call never blocks.
270 */
271 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
272                                        struct ctdb_call *call)
273 {
274         struct ctdb_client_call_state *state;
275         struct ctdb_context *ctdb = ctdb_db->ctdb;
276         struct ctdb_ltdb_header header;
277         TDB_DATA data;
278         int ret;
279         size_t len;
280         struct ctdb_req_call *c;
281
282         /* if the domain socket is not yet open, open it */
283         if (ctdb->daemon.sd==-1) {
284                 ctdb_socket_connect(ctdb);
285         }
286
287         ret = ctdb_ltdb_lock(ctdb_db, call->key);
288         if (ret != 0) {
289                 DEBUG(0,(__location__ " Failed to get chainlock\n"));
290                 return NULL;
291         }
292
293         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
294         if (ret != 0) {
295                 ctdb_ltdb_unlock(ctdb_db, call->key);
296                 DEBUG(0,(__location__ " Failed to fetch record\n"));
297                 return NULL;
298         }
299
300         if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
301                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
302                 talloc_free(data.dptr);
303                 ctdb_ltdb_unlock(ctdb_db, call->key);
304                 return state;
305         }
306
307         ctdb_ltdb_unlock(ctdb_db, call->key);
308         talloc_free(data.dptr);
309
310         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
311         if (state == NULL) {
312                 DEBUG(0, (__location__ " failed to allocate state\n"));
313                 return NULL;
314         }
315
316         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
317         c = ctdbd_allocate_pkt(state, len);
318         if (c == NULL) {
319                 DEBUG(0, (__location__ " failed to allocate packet\n"));
320                 return NULL;
321         }
322         talloc_set_name_const(c, "ctdb client req_call packet");
323         memset(c, 0, offsetof(struct ctdb_req_call, data));
324
325         c->hdr.length    = len;
326         c->hdr.ctdb_magic = CTDB_MAGIC;
327         c->hdr.ctdb_version = CTDB_VERSION;
328         c->hdr.operation = CTDB_REQ_CALL;
329         /* this limits us to 16k outstanding messages - not unreasonable */
330         c->hdr.reqid     = idr_get_new(ctdb->idr, state, 0xFFFF);
331         c->flags         = call->flags;
332         c->db_id         = ctdb_db->db_id;
333         c->callid        = call->call_id;
334         c->keylen        = call->key.dsize;
335         c->calldatalen   = call->call_data.dsize;
336         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
337         memcpy(&c->data[call->key.dsize], 
338                call->call_data.dptr, call->call_data.dsize);
339         state->call                = *call;
340         state->call.call_data.dptr = &c->data[call->key.dsize];
341         state->call.key.dptr       = &c->data[0];
342
343         state->state  = CTDB_CALL_WAIT;
344         state->ctdb_db = ctdb_db;
345         state->reqid = c->hdr.reqid;
346
347         talloc_set_destructor(state, ctdb_client_call_destructor);
348
349         ctdb_client_queue_pkt(ctdb, &c->hdr);
350
351         return state;
352 }
353
354
355 /*
356   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
357 */
358 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
359 {
360         struct ctdb_client_call_state *state;
361
362         state = ctdb_call_send(ctdb_db, call);
363         return ctdb_call_recv(state, call);
364 }
365
366
367 /*
368   tell the daemon what messaging srvid we will use, and register the message
369   handler function in the client
370 */
371 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
372                              ctdb_message_fn_t handler,
373                              void *private_data)
374                                     
375 {
376         struct ctdb_req_register c;
377         int res;
378
379         /* if the domain socket is not yet open, open it */
380         if (ctdb->daemon.sd==-1) {
381                 ctdb_socket_connect(ctdb);
382         }
383
384         ZERO_STRUCT(c);
385
386         c.hdr.length       = sizeof(c);
387         c.hdr.ctdb_magic   = CTDB_MAGIC;
388         c.hdr.ctdb_version = CTDB_VERSION;
389         c.hdr.operation    = CTDB_REQ_REGISTER;
390         c.srvid            = srvid;
391
392         res = ctdb_client_queue_pkt(ctdb, &c.hdr);
393         if (res != 0) {
394                 return res;
395         }
396
397         /* also need to register the handler with our ctdb structure */
398         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
399 }
400
401
402 /*
403   send a message - from client context
404  */
405 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
406                       uint32_t srvid, TDB_DATA data)
407 {
408         struct ctdb_req_message *r;
409         int len, res;
410
411         len = offsetof(struct ctdb_req_message, data) + data.dsize;
412         r = ctdb->methods->allocate_pkt(ctdb, len);
413         CTDB_NO_MEMORY(ctdb, r);
414         talloc_set_name_const(r, "req_message packet");
415
416         r->hdr.length    = len;
417         r->hdr.ctdb_magic = CTDB_MAGIC;
418         r->hdr.ctdb_version = CTDB_VERSION;
419         r->hdr.operation = CTDB_REQ_MESSAGE;
420         r->hdr.destnode  = vnn;
421         r->hdr.srcnode   = ctdb->vnn;
422         r->hdr.reqid     = 0;
423         r->srvid         = srvid;
424         r->datalen       = data.dsize;
425         memcpy(&r->data[0], data.dptr, data.dsize);
426         
427         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
428         if (res != 0) {
429                 return res;
430         }
431
432         talloc_free(r);
433         return 0;
434 }
435
436 /*
437   wait for all nodes to be connected - from client
438  */
439 void ctdb_connect_wait(struct ctdb_context *ctdb)
440 {
441         struct ctdb_req_connect_wait r;
442         int res;
443
444         ZERO_STRUCT(r);
445
446         r.hdr.length     = sizeof(r);
447         r.hdr.ctdb_magic = CTDB_MAGIC;
448         r.hdr.ctdb_version = CTDB_VERSION;
449         r.hdr.operation = CTDB_REQ_CONNECT_WAIT;
450
451         DEBUG(3,("ctdb_connect_wait: sending to ctdbd\n"));
452
453         /* if the domain socket is not yet open, open it */
454         if (ctdb->daemon.sd==-1) {
455                 ctdb_socket_connect(ctdb);
456         }
457         
458         res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length);
459         if (res != 0) {
460                 DEBUG(0,(__location__ " Failed to queue a connect wait request\n"));
461                 return;
462         }
463
464         DEBUG(3,("ctdb_connect_wait: waiting\n"));
465
466         /* now we can go into the normal wait routine, as the reply packet
467            will update the ctdb->num_connected variable */
468         ctdb_daemon_connect_wait(ctdb);
469 }
470
471 /*
472   cancel a ctdb_fetch_lock operation, releasing the lock
473  */
474 static int fetch_lock_destructor(struct ctdb_record_handle *h)
475 {
476         ctdb_ltdb_unlock(h->ctdb_db, h->key);
477         return 0;
478 }
479
480 /*
481   force the migration of a record to this node
482  */
483 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
484 {
485         struct ctdb_call call;
486         ZERO_STRUCT(call);
487         call.call_id = CTDB_NULL_FUNC;
488         call.key = key;
489         call.flags = CTDB_IMMEDIATE_MIGRATION;
490         return ctdb_call(ctdb_db, &call);
491 }
492
493 /*
494   get a lock on a record, and return the records data. Blocks until it gets the lock
495  */
496 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
497                                            TDB_DATA key, TDB_DATA *data)
498 {
499         int ret;
500         struct ctdb_record_handle *h;
501
502         /*
503           procedure is as follows:
504
505           1) get the chain lock. 
506           2) check if we are dmaster
507           3) if we are the dmaster then return handle 
508           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
509              reply from ctdbd
510           5) when we get the reply, goto (1)
511          */
512
513         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
514         if (h == NULL) {
515                 return NULL;
516         }
517
518         h->ctdb_db = ctdb_db;
519         h->key     = key;
520         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
521         if (h->key.dptr == NULL) {
522                 talloc_free(h);
523                 return NULL;
524         }
525         h->data    = data;
526
527         DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", key.dsize, key.dsize, 
528                  (const char *)key.dptr));
529
530 again:
531         /* step 1 - get the chain lock */
532         ret = ctdb_ltdb_lock(ctdb_db, key);
533         if (ret != 0) {
534                 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
535                 talloc_free(h);
536                 return NULL;
537         }
538
539         DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
540
541         talloc_set_destructor(h, fetch_lock_destructor);
542
543         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
544         if (ret != 0) {
545                 ctdb_ltdb_unlock(ctdb_db, key);
546                 talloc_free(h);
547                 return NULL;
548         }
549
550         /* when torturing, ensure we test the remote path */
551         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
552             random() % 5 == 0) {
553                 h->header.dmaster = (uint32_t)-1;
554         }
555
556
557         DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
558
559         if (h->header.dmaster != ctdb_db->ctdb->vnn) {
560                 ctdb_ltdb_unlock(ctdb_db, key);
561                 ret = ctdb_client_force_migration(ctdb_db, key);
562                 if (ret != 0) {
563                         DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
564                         talloc_free(h);
565                         return NULL;
566                 }
567                 goto again;
568         }
569
570         DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
571         return h;
572 }
573
574 /*
575   store some data to the record that was locked with ctdb_fetch_lock()
576 */
577 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
578 {
579         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
580 }
581
582 /*
583   wait until we're the only node left.
584   this function never returns
585 */
586 void ctdb_shutdown(struct ctdb_context *ctdb)
587 {
588         struct ctdb_req_shutdown r;
589         int len;
590
591         /* if the domain socket is not yet open, open it */
592         if (ctdb->daemon.sd==-1) {
593                 ctdb_socket_connect(ctdb);
594         }
595
596         len = sizeof(struct ctdb_req_shutdown);
597         ZERO_STRUCT(r);
598         r.hdr.length       = len;
599         r.hdr.ctdb_magic   = CTDB_MAGIC;
600         r.hdr.ctdb_version = CTDB_VERSION;
601         r.hdr.operation    = CTDB_REQ_SHUTDOWN;
602         r.hdr.reqid        = 0;
603
604         ctdb_client_queue_pkt(ctdb, &(r.hdr));
605
606         /* this event loop will terminate once we receive the reply */
607         while (1) {
608                 event_loop_once(ctdb->ev);
609         }
610 }
611
612 enum ctdb_status_states {CTDB_STATUS_WAIT, CTDB_STATUS_DONE};
613
614 struct ctdb_status_state {
615         uint32_t reqid;
616         struct ctdb_status *status;
617         enum ctdb_status_states state;
618 };
619
620 /*
621   handle a ctdb_reply_status reply
622  */
623 static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
624 {
625         struct ctdb_reply_status *r = (struct ctdb_reply_status *)hdr;
626         struct ctdb_status_state *state;
627
628         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_status_state);
629         if (state == NULL) {
630                 DEBUG(0, ("reqid %d not found\n", hdr->reqid));
631                 return;
632         }
633
634         *state->status = r->status;
635         state->state = CTDB_STATUS_DONE;
636 }
637
638 /*
639   wait until we're the only node left.
640   this function never returns
641 */
642 int ctdb_status(struct ctdb_context *ctdb, struct ctdb_status *status)
643 {
644         struct ctdb_req_status r;
645         int ret;
646         struct ctdb_status_state *state;
647
648         /* if the domain socket is not yet open, open it */
649         if (ctdb->daemon.sd==-1) {
650                 ctdb_socket_connect(ctdb);
651         }
652
653         state = talloc(ctdb, struct ctdb_status_state);
654         CTDB_NO_MEMORY(ctdb, state);
655
656         state->reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
657         state->status = status;
658         state->state = CTDB_STATUS_WAIT;
659         
660         ZERO_STRUCT(r);
661         r.hdr.length       = sizeof(r);
662         r.hdr.ctdb_magic   = CTDB_MAGIC;
663         r.hdr.ctdb_version = CTDB_VERSION;
664         r.hdr.operation    = CTDB_REQ_STATUS;
665         r.hdr.reqid        = state->reqid;
666
667         ret = ctdb_client_queue_pkt(ctdb, &(r.hdr));
668         if (ret != 0) {
669                 talloc_free(state);
670                 return -1;
671         }
672         
673         while (state->state == CTDB_STATUS_WAIT) {
674                 event_loop_once(ctdb->ev);
675         }
676
677         talloc_free(state);
678
679         return 0;
680 }
681