16e28065463b4183543761d5731aa371496b7728
[sahlberg/ctdb.git] / server / ctdb_persistent.c
1 /* 
2    persistent store logic
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
25 #include "db_wrap.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
28
29 struct ctdb_persistent_state {
30         struct ctdb_context *ctdb;
31         struct ctdb_req_control *c;
32         const char *errormsg;
33         uint32_t num_pending;
34         int32_t status;
35 };
36
37 /*
38   called when a node has acknowledged a ctdb_control_update_record call
39  */
40 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
41                                      int32_t status, TDB_DATA data, 
42                                      const char *errormsg,
43                                      void *private_data)
44 {
45         struct ctdb_persistent_state *state = talloc_get_type(private_data, 
46                                                               struct ctdb_persistent_state);
47
48         if (status != 0) {
49                 DEBUG(0,("ctdb_persistent_callback failed with status %d (%s)\n",
50                          status, errormsg));
51                 state->status = status;
52                 state->errormsg = errormsg;
53         }
54         state->num_pending--;
55         if (state->num_pending == 0) {
56                 ctdb_request_control_reply(state->ctdb, state->c, NULL, state->status, state->errormsg);
57                 talloc_free(state);
58         }
59 }
60
61 /*
62   called if persistent store times out
63  */
64 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te, 
65                                          struct timeval t, void *private_data)
66 {
67         struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
68         
69         ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_state");
70
71         talloc_free(state);
72 }
73
74 /*
75   store a persistent record - called from a ctdb client when it has updated
76   a record in a persistent database. The client will have the record
77   locked for the duration of this call. The client is the dmaster when 
78   this call is made
79  */
80 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
81                                       struct ctdb_req_control *c, 
82                                       TDB_DATA recdata, bool *async_reply)
83 {
84         struct ctdb_persistent_state *state;
85         int i;
86
87         state = talloc_zero(c, struct ctdb_persistent_state);
88         CTDB_NO_MEMORY(ctdb, state);
89
90         state->ctdb = ctdb;
91         state->c    = c;
92
93         for (i=0;i<ctdb->vnn_map->size;i++) {
94                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
95                 int ret;
96
97                 /* only send to active nodes */
98                 if (node->flags & NODE_FLAGS_INACTIVE) {
99                         continue;
100                 }
101
102                 /* don't send to ourselves */
103                 if (node->pnn == ctdb->pnn) {
104                         continue;
105                 }
106                 
107                 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
108                                                c->client_id, 0, recdata, 
109                                                ctdb_persistent_callback, state);
110                 if (ret == -1) {
111                         DEBUG(0,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
112                         talloc_free(state);
113                         return -1;
114                 }
115
116                 state->num_pending++;
117         }
118
119         if (state->num_pending == 0) {
120                 talloc_free(state);
121                 return 0;
122         }
123         
124         /* we need to wait for the replies */
125         *async_reply = true;
126
127         /* need to keep the control structure around */
128         talloc_steal(state, c);
129
130         /* but we won't wait forever */
131         event_add_timed(ctdb->ev, state, 
132                         timeval_current_ofs(ctdb->tunable.control_timeout, 0),
133                         ctdb_persistent_store_timeout, state);
134
135         return 0;
136 }
137
138
139 struct ctdb_persistent_lock_state {
140         struct ctdb_db_context *ctdb_db;
141         TDB_DATA key;
142         TDB_DATA data;
143         struct ctdb_ltdb_header *header;
144         struct tdb_context *tdb;
145         struct ctdb_req_control *c;
146 };
147
148
149 /*
150   called with a lock held in the current process
151  */
152 static int ctdb_persistent_store(struct ctdb_persistent_lock_state *state)
153 {
154         struct ctdb_ltdb_header oldheader;
155         int ret;
156
157         /* fetch the old header and ensure the rsn is less than the new rsn */
158         ret = ctdb_ltdb_fetch(state->ctdb_db, state->key, &oldheader, NULL, NULL);
159         if (ret != 0) {
160                 DEBUG(0,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
161                          state->ctdb_db->db_id));
162                 return -1;
163         }
164
165         if (oldheader.rsn >= state->header->rsn) {
166                 DEBUG(0,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
167                          state->ctdb_db->db_id, 
168                          (unsigned long long)oldheader.rsn, (unsigned long long)state->header->rsn));
169                 return -1;
170         }
171
172         ret = ctdb_ltdb_store(state->ctdb_db, state->key, state->header, state->data);
173         if (ret != 0) {
174                 DEBUG(0,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
175                          state->ctdb_db->db_id));
176                 return -1;
177         }
178
179         return 0;
180 }
181
182
183 /*
184   called when we get the lock on the given record
185   at this point the lockwait child holds a lock on our behalf
186  */
187 static void ctdb_persistent_lock_callback(void *private_data)
188 {
189         struct ctdb_persistent_lock_state *state = talloc_get_type(private_data, 
190                                                                    struct ctdb_persistent_lock_state);
191         int ret;
192
193         ret = tdb_chainlock_mark(state->tdb, state->key);
194         if (ret != 0) {
195                 DEBUG(0,("Failed to mark lock in ctdb_persistent_lock_callback\n"));
196                 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, ret, NULL);
197                 return;
198         }
199
200         ret = ctdb_persistent_store(state);
201         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, ret, NULL);
202         tdb_chainlock_unmark(state->tdb, state->key);
203 }
204
205 /*
206   called if our lockwait child times out
207  */
208 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
209                                          struct timeval t, void *private_data)
210 {
211         struct ctdb_persistent_lock_state *state = talloc_get_type(private_data, 
212                                                                    struct ctdb_persistent_lock_state);
213         ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
214         talloc_free(state);
215 }
216
217
218 /* 
219    update a record on this node if the new record has a higher rsn than the
220    current record
221  */
222 int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
223                                    struct ctdb_req_control *c, TDB_DATA recdata, 
224                                    bool *async_reply)
225 {
226         struct ctdb_rec_data *rec = (struct ctdb_rec_data *)&recdata.dptr[0];
227         int ret;
228         struct ctdb_db_context *ctdb_db;
229         uint32_t db_id = rec->reqid;
230         struct lockwait_handle *handle;
231         struct ctdb_persistent_lock_state *state;
232
233         if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
234                 DEBUG(0,("rejecting ctdb_control_update_record when recovery active\n"));
235                 return -1;
236         }
237
238         ctdb_db = find_ctdb_db(ctdb, db_id);
239         if (ctdb_db == NULL) {
240                 DEBUG(0,("Unknown database 0x%08x in ctdb_control_update_record\n", db_id));
241                 return -1;
242         }
243
244         state = talloc(c, struct ctdb_persistent_lock_state);
245         CTDB_NO_MEMORY(ctdb, state);
246
247         state->ctdb_db = ctdb_db;
248         state->c       = c;
249         state->tdb     = ctdb_db->ltdb->tdb;
250         state->key.dptr   = &rec->data[0];
251         state->key.dsize  = rec->keylen;
252         state->data.dptr  = &rec->data[rec->keylen];
253         state->data.dsize = rec->datalen;
254
255         if (state->data.dsize < sizeof(struct ctdb_ltdb_header)) {
256                 DEBUG(0,("Invalid data size %u in ctdb_control_update_record\n", 
257                          (unsigned)state->data.dsize));
258                 return -1;
259         }
260
261         state->header = (struct ctdb_ltdb_header *)&state->data.dptr[0];
262         state->data.dptr  += sizeof(struct ctdb_ltdb_header);
263         state->data.dsize -= sizeof(struct ctdb_ltdb_header);
264
265         /* try and do it without a lockwait */
266         ret = tdb_chainlock_nonblock(state->tdb, state->key);
267         if (ret == 0) {
268                 ret = ctdb_persistent_store(state);
269                 tdb_chainunlock(state->tdb, state->key);
270                 return ret;
271         }
272
273         /* wait until we have a lock on this record */
274         handle = ctdb_lockwait(ctdb_db, state->key, ctdb_persistent_lock_callback, state);
275         if (handle == NULL) {
276                 DEBUG(0,("Failed to setup lockwait handler in ctdb_control_update_record\n"));
277                 return -1;
278         }
279
280         *async_reply = true;
281
282         event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
283                         ctdb_persistent_lock_timeout, state);
284
285         return 0;
286 }