e3e60685b05e351c1c998c82ecf67f980c2ea10f
[amitay/samba.git] / source4 / cluster / ctdb / ctdb_cluster.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    ctdb clustering hooks
5
6    Copyright (C) Andrew Tridgell 2006
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 #include "includes.h"
24 #include "lib/events/events.h"
25 #include "cluster/cluster.h"
26 #include "system/filesys.h"
27 #include "cluster/cluster_private.h"
28 #include "lib/tdb/include/tdb.h"
29 #include "include/ctdb.h"
30 #include "db_wrap.h"
31 #include "lib/util/dlinklist.h"
32
33 /* a linked list of messaging handlers, allowing incoming messages
34    to be directed to the right messaging context */
35 struct cluster_messaging_list {
36         struct cluster_messaging_list *next, *prev;
37         struct cluster_state *state;
38         struct messaging_context *msg;
39         struct server_id server;
40         cluster_message_fn_t handler;
41 };
42
43 struct cluster_state {
44         struct ctdb_context *ctdb;
45         struct cluster_messaging_list *list;
46 };
47
48
49
50 /*
51   return a server_id for a ctdb node
52 */
53 static struct server_id ctdb_id(struct cluster_ops *ops, uint32_t id)
54 {
55         struct cluster_state *state = ops->private;
56         struct ctdb_context *ctdb = state->ctdb;
57         struct server_id server_id;
58         server_id.node = ctdb_get_vnn(ctdb);
59         server_id.id = id;
60         return server_id;
61 }
62
63
64 /*
65   return a server_id as a string
66 */
67 static const char *ctdb_id_string(struct cluster_ops *ops, 
68                                   TALLOC_CTX *mem_ctx, struct server_id id)
69 {
70         return talloc_asprintf(mem_ctx, "%u.%u", id.node, id.id);
71 }
72
73 /*
74   this is an interim method for subsystems that have not yet been
75   converted to use the ctdb api. It opens a shared database in the
76   cluster temporary area, using TDB_CLEAR_IF_FIRST which relies on
77   correct operation of fcntl locks on the shared fileystem.
78 */
79 static struct tdb_wrap *ctdb_tdb_tmp_open(struct cluster_ops *ops,
80                                           TALLOC_CTX *mem_ctx, const char *dbname, 
81                                           int flags)
82 {
83         const char *dir = lp_parm_string(-1, "ctdb", "shared data");
84         char *path;
85         struct tdb_wrap *w;
86         if (dir == NULL) {
87                 DEBUG(0,("ERROR: You must set 'ctdb:shared data' to a cluster shared path\n"));
88                 return NULL;
89         }
90         path = talloc_asprintf(mem_ctx, "%s/%s", dir, dbname);
91         w = tdb_wrap_open(mem_ctx, path, 0,  
92                           flags | TDB_CLEAR_IF_FIRST,
93                           O_RDWR|O_CREAT, 0600);
94         talloc_free(path);
95         return w;
96 }
97
98 /*
99   get at the ctdb handle
100 */
101 static void *ctdb_backend_handle(struct cluster_ops *ops)
102 {
103         struct cluster_state *state = ops->private;
104         return (void *)state->ctdb;
105 }
106
107 struct ctdb_handler_state {
108         struct cluster_state *state;
109         cluster_message_fn_t handler;
110         struct messaging_context *msg;
111 };
112
113 /*
114   dispatch incoming ctdb messages
115 */
116 static void ctdb_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
117                                  TDB_DATA data, void *private)
118 {
119         struct ctdb_handler_state *s = talloc_get_type(private, 
120                                                        struct ctdb_handler_state);
121         DATA_BLOB blob;
122         blob.data = data.dptr;
123         blob.length = data.dsize;
124         s->handler(s->msg, blob);
125 }
126
127 static int ctdb_handler_destructor(struct ctdb_handler_state *s)
128 {
129         /* XXX - tell ctdb to de-register the message handler */
130         return 0;
131 }
132
133 /*
134   setup a handler for ctdb messages
135 */
136 static NTSTATUS ctdb_message_init(struct cluster_ops *ops,
137                                   struct messaging_context *msg, 
138                                   struct server_id server,
139                                   cluster_message_fn_t handler)
140 {
141         struct cluster_state *state = ops->private;
142         struct ctdb_handler_state *h;
143         int ret;
144
145         h = talloc(msg, struct ctdb_handler_state);
146         NT_STATUS_HAVE_NO_MEMORY(h);
147
148         h->state = state;
149         h->handler = handler;
150         h->msg = msg;
151
152         talloc_set_destructor(h, ctdb_handler_destructor);
153
154         /* setup a message handler */
155         ret = ctdb_set_message_handler(state->ctdb, server.id, 
156                                        ctdb_message_handler, h);
157         if (ret == -1) {
158                 DEBUG(0,("ctdb_set_message_handler failed - %s\n", 
159                          ctdb_errstr(state->ctdb)));
160                 exit(1);
161         }
162
163         return NT_STATUS_OK;
164 }
165
166 /*
167   send a ctdb message to another node
168 */
169 static NTSTATUS ctdb_message_send(struct cluster_ops *ops,
170                                   struct server_id server, DATA_BLOB *data)
171 {
172         struct cluster_state *state = ops->private;
173         struct ctdb_context *ctdb = state->ctdb;
174         TDB_DATA tdata;
175         int ret;
176
177         tdata.dptr = data->data;
178         tdata.dsize = data->length;
179
180         ret = ctdb_send_message(ctdb, server.node, server.id, tdata);
181         if (ret != 0) {
182                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
183         }
184         return NT_STATUS_OK;
185 }
186
187 static struct cluster_ops cluster_ctdb_ops = {
188         .cluster_id           = ctdb_id,
189         .cluster_id_string    = ctdb_id_string,
190         .cluster_tdb_tmp_open = ctdb_tdb_tmp_open,
191         .backend_handle       = ctdb_backend_handle,
192         .message_init         = ctdb_message_init,
193         .message_send         = ctdb_message_send,
194         .private           = NULL
195 };
196
197 /* initialise ctdb */
198 void cluster_ctdb_init(struct event_context *ev, const char *model)
199 {
200         const char *nlist;
201         const char *address;
202         const char *transport;
203         struct cluster_state *state;
204         int ret, lacount, i;
205         const char *db_list[] = { "brlock", "opendb" };
206
207         nlist = lp_parm_string(-1, "ctdb", "nlist");
208         if (nlist == NULL) return;
209
210         address = lp_parm_string(-1, "ctdb", "address");
211         if (address == NULL) return;
212
213         transport = lp_parm_string(-1, "ctdb", "transport");
214         if (transport == NULL) {
215                 transport = "tcp";
216         }
217
218         state = talloc(ev, struct cluster_state);
219         if (state == NULL) goto failed;
220
221         state->ctdb = ctdb_init(ev);
222         if (state->ctdb == NULL) goto failed;
223
224         state->list = NULL;
225
226         cluster_ctdb_ops.private = state;
227
228         ret = ctdb_set_transport(state->ctdb, transport);
229         if (ret == -1) {
230                 DEBUG(0,("ctdb_set_transport failed - %s\n",
231                          ctdb_errstr(state->ctdb)));
232                 goto failed;
233         }
234
235         if (lp_parm_bool(-1, "ctdb", "selfconnect", False)) {
236                 DEBUG(0,("Enabling ctdb selfconnect\n"));
237                 ctdb_set_flags(state->ctdb, CTDB_FLAG_SELF_CONNECT);
238         }
239
240         lacount = lp_parm_int(-1, "ctdb", "maxlacount", -1);
241         if (lacount != -1) {
242                 ctdb_set_max_lacount(state->ctdb, lacount);
243         }
244
245         /* tell ctdb what address to listen on */
246         ret = ctdb_set_address(state->ctdb, address);
247         if (ret == -1) {
248                 DEBUG(0,("ctdb_set_address failed - %s\n", ctdb_errstr(state->ctdb)));
249                 goto failed;
250         }
251
252         ret = ctdb_set_tdb_dir(state->ctdb, lp_lockdir());
253         if (ret == -1) {
254                 DEBUG(0,("ctdb_set_tdb_dir failed - %s\n", ctdb_errstr(state->ctdb)));
255                 goto failed;
256         }
257
258         /* tell ctdb what nodes are available */
259         ret = ctdb_set_nlist(state->ctdb, nlist);
260         if (ret == -1) {
261                 DEBUG(0,("ctdb_set_nlist failed - %s\n", ctdb_errstr(state->ctdb)));
262                 goto failed;
263         }
264
265         /* attach all the databases we will need */
266         for (i=0;i<ARRAY_SIZE(db_list);i++) {
267                 struct ctdb_db_context *ctdb_db;
268                 ctdb_db = ctdb_attach(state->ctdb, db_list[i], TDB_INTERNAL, 
269                                       O_RDWR|O_CREAT|O_TRUNC, 0666);
270                 if (ctdb_db == NULL) goto failed;
271         }
272
273         cluster_set_ops(&cluster_ctdb_ops);
274
275         /* nasty hack for now ... */
276         {
277                 void brl_ctdb_init_ops(void);
278                 brl_ctdb_init_ops();
279         }
280
281         /* start the protocol running */
282         ret = ctdb_start(state->ctdb);
283         if (ret == -1) {
284                 DEBUG(0,("ctdb_start failed - %s\n", ctdb_errstr(state->ctdb)));
285                 goto failed;
286         }
287
288         /* wait until all nodes are connected (should not be needed
289            outside of test code) */
290         ctdb_connect_wait(state->ctdb);
291
292         return;
293         
294 failed:
295         DEBUG(0,("cluster_ctdb_init failed\n"));
296         talloc_free(state);
297 }
298