988f3335de25483a6540e8c0fbb758c7ad8b8239
[abartlet/samba.git/.git] / source4 / cluster / ctdb / common / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This library is free software; you can redistribute it and/or
8    modify it under the terms of the GNU Lesser General Public
9    License as published by the Free Software Foundation; either
10    version 3 of the License, or (at your option) any later version.
11
12    This library is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15    Lesser General Public License for more details.
16
17    You should have received a copy of the GNU Lesser General Public
18    License along with this library; if not, write to the Free Software
19    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20 */
21
22 #include "includes.h"
23 #include "db_wrap.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/events/events.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_private.h"
31
32 /*
33   queue a packet for sending from client to daemon
34 */
35 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
36 {
37         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
38 }
39
40
41 /*
42   handle a connect wait reply packet
43  */
44 static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, 
45                                     struct ctdb_req_header *hdr)
46 {
47         struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr;
48         ctdb->num_connected = r->num_connected;
49 }
50
51 /*
52   state of a in-progress ctdb call in client
53 */
54 struct ctdb_client_call_state {
55         enum call_state state;
56         uint32_t reqid;
57         struct ctdb_db_context *ctdb_db;
58         struct ctdb_call call;
59 };
60
61 /*
62   called when a CTDB_REPLY_CALL packet comes in in the client
63
64   This packet comes in response to a CTDB_REQ_CALL request packet. It
65   contains any reply data from the call
66 */
67 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
68 {
69         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
70         struct ctdb_client_call_state *state;
71
72         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
73         if (state == NULL) {
74                 DEBUG(0, ("reqid %d not found\n", hdr->reqid));
75                 return;
76         }
77
78         state->call.reply_data.dptr = c->data;
79         state->call.reply_data.dsize = c->datalen;
80         state->call.status = c->status;
81
82         talloc_steal(state, c);
83
84         state->state = CTDB_CALL_DONE;
85 }
86
87 static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
88
89 /*
90   this is called in the client, when data comes in from the daemon
91  */
92 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
93 {
94         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
95         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
96         TALLOC_CTX *tmp_ctx;
97
98         /* place the packet as a child of a tmp_ctx. We then use
99            talloc_free() below to free it. If any of the calls want
100            to keep it, then they will steal it somewhere else, and the
101            talloc_free() will be a no-op */
102         tmp_ctx = talloc_new(ctdb);
103         talloc_steal(tmp_ctx, hdr);
104
105         if (cnt == 0) {
106                 DEBUG(2,("Daemon has exited - shutting down client\n"));
107                 exit(0);
108         }
109
110         if (cnt < sizeof(*hdr)) {
111                 DEBUG(0,("Bad packet length %d in client\n", cnt));
112                 goto done;
113         }
114         if (cnt != hdr->length) {
115                 ctdb_set_error(ctdb, "Bad header length %d expected %d in client\n", 
116                                hdr->length, cnt);
117                 goto done;
118         }
119
120         if (hdr->ctdb_magic != CTDB_MAGIC) {
121                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
122                 goto done;
123         }
124
125         if (hdr->ctdb_version != CTDB_VERSION) {
126                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
127                 goto done;
128         }
129
130         switch (hdr->operation) {
131         case CTDB_REPLY_CALL:
132                 ctdb_client_reply_call(ctdb, hdr);
133                 break;
134
135         case CTDB_REQ_MESSAGE:
136                 ctdb_request_message(ctdb, hdr);
137                 break;
138
139         case CTDB_REPLY_CONNECT_WAIT:
140                 ctdb_reply_connect_wait(ctdb, hdr);
141                 break;
142
143         case CTDB_REPLY_STATUS:
144                 ctdb_reply_status(ctdb, hdr);
145                 break;
146
147         default:
148                 DEBUG(0,("bogus operation code:%d\n",hdr->operation));
149         }
150
151 done:
152         talloc_free(tmp_ctx);
153 }
154
155 /*
156   connect to a unix domain socket
157 */
158 int ctdb_socket_connect(struct ctdb_context *ctdb)
159 {
160         struct sockaddr_un addr;
161
162         memset(&addr, 0, sizeof(addr));
163         addr.sun_family = AF_UNIX;
164         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
165
166         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
167         if (ctdb->daemon.sd == -1) {
168                 return -1;
169         }
170         
171         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
172                 close(ctdb->daemon.sd);
173                 ctdb->daemon.sd = -1;
174                 return -1;
175         }
176
177         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
178                                               CTDB_DS_ALIGNMENT, 
179                                               ctdb_client_read_cb, ctdb);
180         return 0;
181 }
182
183
184 struct ctdb_record_handle {
185         struct ctdb_db_context *ctdb_db;
186         TDB_DATA key;
187         TDB_DATA *data;
188         struct ctdb_ltdb_header header;
189 };
190
191
192 /*
193   make a recv call to the local ctdb daemon - called from client context
194
195   This is called when the program wants to wait for a ctdb_call to complete and get the 
196   results. This call will block unless the call has already completed.
197 */
198 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
199 {
200         while (state->state < CTDB_CALL_DONE) {
201                 event_loop_once(state->ctdb_db->ctdb->ev);
202         }
203         if (state->state != CTDB_CALL_DONE) {
204                 DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
205                 talloc_free(state);
206                 return -1;
207         }
208
209         if (state->call.reply_data.dsize) {
210                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
211                                                       state->call.reply_data.dptr,
212                                                       state->call.reply_data.dsize);
213                 call->reply_data.dsize = state->call.reply_data.dsize;
214         } else {
215                 call->reply_data.dptr = NULL;
216                 call->reply_data.dsize = 0;
217         }
218         call->status = state->call.status;
219         talloc_free(state);
220
221         return 0;
222 }
223
224
225
226
227 /*
228   destroy a ctdb_call in client
229 */
230 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
231 {
232         idr_remove(state->ctdb_db->ctdb->idr, state->reqid);
233         return 0;
234 }
235
236 /*
237   construct an event driven local ctdb_call
238
239   this is used so that locally processed ctdb_call requests are processed
240   in an event driven manner
241 */
242 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
243                                                                   struct ctdb_call *call,
244                                                                   struct ctdb_ltdb_header *header,
245                                                                   TDB_DATA *data)
246 {
247         struct ctdb_client_call_state *state;
248         struct ctdb_context *ctdb = ctdb_db->ctdb;
249         int ret;
250
251         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
252         CTDB_NO_MEMORY_NULL(ctdb, state);
253
254         talloc_steal(state, data->dptr);
255
256         state->state = CTDB_CALL_DONE;
257         state->call = *call;
258         state->ctdb_db = ctdb_db;
259
260         ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
261         talloc_steal(state, state->call.reply_data.dptr);
262
263         return state;
264 }
265
266 /*
267   make a ctdb call to the local daemon - async send. Called from client context.
268
269   This constructs a ctdb_call request and queues it for processing. 
270   This call never blocks.
271 */
272 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
273                                        struct ctdb_call *call)
274 {
275         struct ctdb_client_call_state *state;
276         struct ctdb_context *ctdb = ctdb_db->ctdb;
277         struct ctdb_ltdb_header header;
278         TDB_DATA data;
279         int ret;
280         size_t len;
281         struct ctdb_req_call *c;
282
283         /* if the domain socket is not yet open, open it */
284         if (ctdb->daemon.sd==-1) {
285                 ctdb_socket_connect(ctdb);
286         }
287
288         ret = ctdb_ltdb_lock(ctdb_db, call->key);
289         if (ret != 0) {
290                 DEBUG(0,(__location__ " Failed to get chainlock\n"));
291                 return NULL;
292         }
293
294         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
295         if (ret != 0) {
296                 ctdb_ltdb_unlock(ctdb_db, call->key);
297                 DEBUG(0,(__location__ " Failed to fetch record\n"));
298                 return NULL;
299         }
300
301         if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
302                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
303                 talloc_free(data.dptr);
304                 ctdb_ltdb_unlock(ctdb_db, call->key);
305                 return state;
306         }
307
308         ctdb_ltdb_unlock(ctdb_db, call->key);
309         talloc_free(data.dptr);
310
311         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
312         if (state == NULL) {
313                 DEBUG(0, (__location__ " failed to allocate state\n"));
314                 return NULL;
315         }
316
317         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
318         c = ctdbd_allocate_pkt(state, len);
319         if (c == NULL) {
320                 DEBUG(0, (__location__ " failed to allocate packet\n"));
321                 return NULL;
322         }
323         talloc_set_name_const(c, "ctdb client req_call packet");
324         memset(c, 0, offsetof(struct ctdb_req_call, data));
325
326         c->hdr.length    = len;
327         c->hdr.ctdb_magic = CTDB_MAGIC;
328         c->hdr.ctdb_version = CTDB_VERSION;
329         c->hdr.operation = CTDB_REQ_CALL;
330         /* this limits us to 16k outstanding messages - not unreasonable */
331         c->hdr.reqid     = idr_get_new(ctdb->idr, state, 0xFFFF);
332         c->flags         = call->flags;
333         c->db_id         = ctdb_db->db_id;
334         c->callid        = call->call_id;
335         c->keylen        = call->key.dsize;
336         c->calldatalen   = call->call_data.dsize;
337         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
338         memcpy(&c->data[call->key.dsize], 
339                call->call_data.dptr, call->call_data.dsize);
340         state->call                = *call;
341         state->call.call_data.dptr = &c->data[call->key.dsize];
342         state->call.key.dptr       = &c->data[0];
343
344         state->state  = CTDB_CALL_WAIT;
345         state->ctdb_db = ctdb_db;
346         state->reqid = c->hdr.reqid;
347
348         talloc_set_destructor(state, ctdb_client_call_destructor);
349
350         ctdb_client_queue_pkt(ctdb, &c->hdr);
351
352         return state;
353 }
354
355
356 /*
357   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
358 */
359 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
360 {
361         struct ctdb_client_call_state *state;
362
363         state = ctdb_call_send(ctdb_db, call);
364         return ctdb_call_recv(state, call);
365 }
366
367
368 /*
369   tell the daemon what messaging srvid we will use, and register the message
370   handler function in the client
371 */
372 int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
373                              ctdb_message_fn_t handler,
374                              void *private_data)
375                                     
376 {
377         struct ctdb_req_register c;
378         int res;
379
380         /* if the domain socket is not yet open, open it */
381         if (ctdb->daemon.sd==-1) {
382                 ctdb_socket_connect(ctdb);
383         }
384
385         ZERO_STRUCT(c);
386
387         c.hdr.length       = sizeof(c);
388         c.hdr.ctdb_magic   = CTDB_MAGIC;
389         c.hdr.ctdb_version = CTDB_VERSION;
390         c.hdr.operation    = CTDB_REQ_REGISTER;
391         c.srvid            = srvid;
392
393         res = ctdb_client_queue_pkt(ctdb, &c.hdr);
394         if (res != 0) {
395                 return res;
396         }
397
398         /* also need to register the handler with our ctdb structure */
399         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
400 }
401
402
403 /*
404   send a message - from client context
405  */
406 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
407                       uint32_t srvid, TDB_DATA data)
408 {
409         struct ctdb_req_message *r;
410         int len, res;
411
412         len = offsetof(struct ctdb_req_message, data) + data.dsize;
413         r = ctdb->methods->allocate_pkt(ctdb, len);
414         CTDB_NO_MEMORY(ctdb, r);
415         talloc_set_name_const(r, "req_message packet");
416
417         r->hdr.length    = len;
418         r->hdr.ctdb_magic = CTDB_MAGIC;
419         r->hdr.ctdb_version = CTDB_VERSION;
420         r->hdr.operation = CTDB_REQ_MESSAGE;
421         r->hdr.destnode  = vnn;
422         r->hdr.srcnode   = ctdb->vnn;
423         r->hdr.reqid     = 0;
424         r->srvid         = srvid;
425         r->datalen       = data.dsize;
426         memcpy(&r->data[0], data.dptr, data.dsize);
427         
428         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
429         if (res != 0) {
430                 return res;
431         }
432
433         talloc_free(r);
434         return 0;
435 }
436
437 /*
438   wait for all nodes to be connected - from client
439  */
440 void ctdb_connect_wait(struct ctdb_context *ctdb)
441 {
442         struct ctdb_req_connect_wait r;
443         int res;
444
445         ZERO_STRUCT(r);
446
447         r.hdr.length     = sizeof(r);
448         r.hdr.ctdb_magic = CTDB_MAGIC;
449         r.hdr.ctdb_version = CTDB_VERSION;
450         r.hdr.operation = CTDB_REQ_CONNECT_WAIT;
451
452         DEBUG(3,("ctdb_connect_wait: sending to ctdbd\n"));
453
454         /* if the domain socket is not yet open, open it */
455         if (ctdb->daemon.sd==-1) {
456                 ctdb_socket_connect(ctdb);
457         }
458         
459         res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length);
460         if (res != 0) {
461                 DEBUG(0,(__location__ " Failed to queue a connect wait request\n"));
462                 return;
463         }
464
465         DEBUG(3,("ctdb_connect_wait: waiting\n"));
466
467         /* now we can go into the normal wait routine, as the reply packet
468            will update the ctdb->num_connected variable */
469         ctdb_daemon_connect_wait(ctdb);
470 }
471
472 /*
473   cancel a ctdb_fetch_lock operation, releasing the lock
474  */
475 static int fetch_lock_destructor(struct ctdb_record_handle *h)
476 {
477         ctdb_ltdb_unlock(h->ctdb_db, h->key);
478         return 0;
479 }
480
481 /*
482   force the migration of a record to this node
483  */
484 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
485 {
486         struct ctdb_call call;
487         ZERO_STRUCT(call);
488         call.call_id = CTDB_NULL_FUNC;
489         call.key = key;
490         call.flags = CTDB_IMMEDIATE_MIGRATION;
491         return ctdb_call(ctdb_db, &call);
492 }
493
494 /*
495   get a lock on a record, and return the records data. Blocks until it gets the lock
496  */
497 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
498                                            TDB_DATA key, TDB_DATA *data)
499 {
500         int ret;
501         struct ctdb_record_handle *h;
502
503         /*
504           procedure is as follows:
505
506           1) get the chain lock. 
507           2) check if we are dmaster
508           3) if we are the dmaster then return handle 
509           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
510              reply from ctdbd
511           5) when we get the reply, goto (1)
512          */
513
514         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
515         if (h == NULL) {
516                 return NULL;
517         }
518
519         h->ctdb_db = ctdb_db;
520         h->key     = key;
521         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
522         if (h->key.dptr == NULL) {
523                 talloc_free(h);
524                 return NULL;
525         }
526         h->data    = data;
527
528         DEBUG(3,("ctdb_fetch_lock: key=%*.*s\n", key.dsize, key.dsize, 
529                  (const char *)key.dptr));
530
531 again:
532         /* step 1 - get the chain lock */
533         ret = ctdb_ltdb_lock(ctdb_db, key);
534         if (ret != 0) {
535                 DEBUG(0, (__location__ " failed to lock ltdb record\n"));
536                 talloc_free(h);
537                 return NULL;
538         }
539
540         DEBUG(4,("ctdb_fetch_lock: got chain lock\n"));
541
542         talloc_set_destructor(h, fetch_lock_destructor);
543
544         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
545         if (ret != 0) {
546                 ctdb_ltdb_unlock(ctdb_db, key);
547                 talloc_free(h);
548                 return NULL;
549         }
550
551         /* when torturing, ensure we test the remote path */
552         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
553             random() % 5 == 0) {
554                 h->header.dmaster = (uint32_t)-1;
555         }
556
557
558         DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
559
560         if (h->header.dmaster != ctdb_db->ctdb->vnn) {
561                 ctdb_ltdb_unlock(ctdb_db, key);
562                 ret = ctdb_client_force_migration(ctdb_db, key);
563                 if (ret != 0) {
564                         DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
565                         talloc_free(h);
566                         return NULL;
567                 }
568                 goto again;
569         }
570
571         DEBUG(4,("ctdb_fetch_lock: we are dmaster - done\n"));
572         return h;
573 }
574
575 /*
576   store some data to the record that was locked with ctdb_fetch_lock()
577 */
578 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
579 {
580         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
581 }
582
583 /*
584   wait until we're the only node left.
585   this function never returns
586 */
587 void ctdb_shutdown(struct ctdb_context *ctdb)
588 {
589         struct ctdb_req_shutdown r;
590         int len;
591
592         /* if the domain socket is not yet open, open it */
593         if (ctdb->daemon.sd==-1) {
594                 ctdb_socket_connect(ctdb);
595         }
596
597         len = sizeof(struct ctdb_req_shutdown);
598         ZERO_STRUCT(r);
599         r.hdr.length       = len;
600         r.hdr.ctdb_magic   = CTDB_MAGIC;
601         r.hdr.ctdb_version = CTDB_VERSION;
602         r.hdr.operation    = CTDB_REQ_SHUTDOWN;
603         r.hdr.reqid        = 0;
604
605         ctdb_client_queue_pkt(ctdb, &(r.hdr));
606
607         /* this event loop will terminate once we receive the reply */
608         while (1) {
609                 event_loop_once(ctdb->ev);
610         }
611 }
612
613 enum ctdb_status_states {CTDB_STATUS_WAIT, CTDB_STATUS_DONE};
614
615 struct ctdb_status_state {
616         uint32_t reqid;
617         struct ctdb_status *status;
618         enum ctdb_status_states state;
619 };
620
621 /*
622   handle a ctdb_reply_status reply
623  */
624 static void ctdb_reply_status(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
625 {
626         struct ctdb_reply_status *r = (struct ctdb_reply_status *)hdr;
627         struct ctdb_status_state *state;
628
629         state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_status_state);
630         if (state == NULL) {
631                 DEBUG(0, ("reqid %d not found\n", hdr->reqid));
632                 return;
633         }
634
635         *state->status = r->status;
636         state->state = CTDB_STATUS_DONE;
637 }
638
639 /*
640   wait until we're the only node left.
641   this function never returns
642 */
643 int ctdb_status(struct ctdb_context *ctdb, struct ctdb_status *status)
644 {
645         struct ctdb_req_status r;
646         int ret;
647         struct ctdb_status_state *state;
648
649         /* if the domain socket is not yet open, open it */
650         if (ctdb->daemon.sd==-1) {
651                 ctdb_socket_connect(ctdb);
652         }
653
654         state = talloc(ctdb, struct ctdb_status_state);
655         CTDB_NO_MEMORY(ctdb, state);
656
657         state->reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
658         state->status = status;
659         state->state = CTDB_STATUS_WAIT;
660         
661         ZERO_STRUCT(r);
662         r.hdr.length       = sizeof(r);
663         r.hdr.ctdb_magic   = CTDB_MAGIC;
664         r.hdr.ctdb_version = CTDB_VERSION;
665         r.hdr.operation    = CTDB_REQ_STATUS;
666         r.hdr.reqid        = state->reqid;
667
668         ret = ctdb_client_queue_pkt(ctdb, &(r.hdr));
669         if (ret != 0) {
670                 talloc_free(state);
671                 return -1;
672         }
673         
674         while (state->state == CTDB_STATUS_WAIT) {
675                 event_loop_once(ctdb->ev);
676         }
677
678         talloc_free(state);
679
680         return 0;
681 }
682