added storage of extended ltdb header information
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb_call.c
1 /* 
2    ctdb_call protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 2 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "ctdb_private.h"
26
27
28 /*
29   local version of ctdb_call
30 */
31 static int ctdb_call_local(struct ctdb_context *ctdb, TDB_DATA key, int call_id, 
32                            TDB_DATA *call_data, TDB_DATA *reply_data)
33 {
34         struct ctdb_call *c;
35         struct ctdb_registered_call *fn;
36         TDB_DATA data;
37         struct ctdb_ltdb_header header;
38         int ret;
39
40         c = talloc(ctdb, struct ctdb_call);
41         CTDB_NO_MEMORY(ctdb, c);
42
43         ret = ctdb_ltdb_fetch(ctdb, c, key, &header, &data);
44         if (ret != 0) return -1;
45         
46         c->key = key;
47         c->call_data = call_data;
48         c->record_data.dptr = talloc_memdup(c, data.dptr, data.dsize);
49         c->record_data.dsize = data.dsize;
50         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
51         c->new_data = NULL;
52         c->reply_data = NULL;
53
54         for (fn=ctdb->calls;fn;fn=fn->next) {
55                 if (fn->id == call_id) break;
56         }
57         if (fn == NULL) {
58                 ctdb_set_error(ctdb, "Unknown call id %u\n", call_id);
59                 return -1;
60         }
61
62         if (fn->fn(c) != 0) {
63                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call_id);
64                 return -1;
65         }
66
67         if (c->new_data) {
68                 if (ctdb_ltdb_store(ctdb, key, &header, *c->new_data) != 0) {
69                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
70                         return -1;
71                 }
72         }
73
74         if (reply_data) {
75                 if (c->reply_data) {
76                         *reply_data = *c->reply_data;
77                         talloc_steal(ctdb, reply_data->dptr);
78                 } else {
79                         reply_data->dptr = NULL;
80                         reply_data->dsize = 0;
81                 }
82         }
83
84         talloc_free(c);
85
86         return 0;
87 }
88
89 /*
90   called when a CTDB_REQ_CALL packet comes in
91 */
92 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
93 {
94         struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
95         TDB_DATA key, call_data, reply_data;
96         struct ctdb_reply_call *r;
97         struct ctdb_node *node;
98
99         key.dptr = c->data;
100         key.dsize = c->keylen;
101         call_data.dptr = c->data + c->keylen;
102         call_data.dsize = c->calldatalen;
103
104         ctdb_call_local(ctdb, key, c->callid, 
105                         call_data.dsize?&call_data:NULL,
106                         &reply_data);
107
108         r = talloc_size(ctdb, sizeof(*r) + reply_data.dsize);
109         r->hdr.length = sizeof(*r) + reply_data.dsize;
110         r->hdr.operation = CTDB_REPLY_CALL;
111         r->hdr.destnode  = hdr->srcnode;
112         r->hdr.srcnode   = hdr->destnode;
113         r->hdr.reqid     = hdr->reqid;
114         r->datalen       = reply_data.dsize;
115         memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
116
117         node = ctdb->nodes[hdr->srcnode];
118
119         ctdb->methods->queue_pkt(node, (uint8_t *)r, r->hdr.length);
120
121         talloc_free(reply_data.dptr);
122         talloc_free(r);
123 }
124
125 enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
126
127 /*
128   state of a in-progress ctdb call
129 */
130 struct ctdb_call_state {
131         enum call_state state;
132         struct ctdb_req_call *c;
133         struct ctdb_node *node;
134         TDB_DATA reply_data;
135 };
136
137
138 /*
139   called when a CTDB_REPLY_CALL packet comes in
140 */
141 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
142 {
143         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
144         struct ctdb_call_state *state;
145         TDB_DATA reply_data;
146
147         state = idr_find(ctdb->idr, hdr->reqid);
148
149         reply_data.dptr = c->data;
150         reply_data.dsize = c->datalen;
151
152         state->reply_data = reply_data;
153
154         talloc_steal(state, c);
155
156         state->state = CTDB_CALL_DONE;
157 }
158
159 /*
160   destroy a ctdb_call
161 */
162 static int ctdb_call_destructor(struct ctdb_call_state *state)
163 {
164         idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
165         return 0;
166 }
167
168 /*
169   called when a call times out
170 */
171 void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, 
172                        struct timeval t, void *private)
173 {
174         struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
175         state->state = CTDB_CALL_ERROR;
176         ctdb_set_error(state->node->ctdb, "ctdb_call timed out");
177 }
178
179 /*
180   fake an event driven local ctdb_call
181 */
182 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_context *ctdb, 
183                                              TDB_DATA key, int call_id, 
184                                              TDB_DATA *call_data, TDB_DATA *reply_data)
185 {
186         struct ctdb_call_state *state;
187         int ret;
188
189         state = talloc_zero(ctdb, struct ctdb_call_state);
190         CTDB_NO_MEMORY_NULL(ctdb, state);
191
192         state->state = CTDB_CALL_DONE;
193         state->node = ctdb->nodes[ctdb->vnn];
194
195         ret = ctdb_call_local(ctdb, key, call_id, call_data, &state->reply_data);
196         return state;
197 }
198
199
200 /*
201   make a remote ctdb call - async send
202 */
203 struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb, 
204                                        TDB_DATA key, int call_id, 
205                                        TDB_DATA *call_data, TDB_DATA *reply_data)
206 {
207         uint32_t dest;
208         uint32_t len;
209         struct ctdb_call_state *state;
210
211         dest = ctdb_hash(&key) % ctdb->num_nodes;
212         if (dest == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
213                 return ctdb_call_local_send(ctdb, key, call_id, call_data, reply_data);
214         }
215
216         state = talloc_zero(ctdb, struct ctdb_call_state);
217         CTDB_NO_MEMORY_NULL(ctdb, state);
218
219         len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
220         state->c = talloc_size(ctdb, len);
221         CTDB_NO_MEMORY_NULL(ctdb, state->c);
222
223         state->c->hdr.length    = len;
224         state->c->hdr.operation = CTDB_REQ_CALL;
225         state->c->hdr.destnode  = dest;
226         state->c->hdr.srcnode   = ctdb->vnn;
227         /* this limits us to 16k outstanding messages - not unreasonable */
228         state->c->hdr.reqid     = idr_get_new(ctdb->idr, state, 0xFFFF);
229         state->c->callid        = call_id;
230         state->c->keylen        = key.dsize;
231         state->c->calldatalen   = call_data?call_data->dsize:0;
232         memcpy(&state->c->data[0], key.dptr, key.dsize);
233         if (call_data) {
234                 memcpy(&state->c->data[key.dsize], call_data->dptr, call_data->dsize);
235         }
236
237         state->node = ctdb->nodes[dest];
238         state->state = CTDB_CALL_WAIT;
239
240         talloc_set_destructor(state, ctdb_call_destructor);
241
242         if (ctdb->methods->queue_pkt(state->node, (uint8_t *)state->c, len) != 0) {
243                 talloc_free(state);
244                 return NULL;
245         }
246
247         event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
248                         ctdb_call_timeout, state);
249         return state;
250 }
251
252
253 /*
254   make a remote ctdb call - async recv
255 */
256 int ctdb_call_recv(struct ctdb_call_state *state, TDB_DATA *reply_data)
257 {
258         while (state->state < CTDB_CALL_DONE) {
259                 event_loop_once(state->node->ctdb->ev);
260         }
261         if (state->state != CTDB_CALL_DONE) {
262                 talloc_free(state);
263                 return -1;
264         }
265         if (reply_data) {
266                 reply_data->dptr = talloc_memdup(state->node->ctdb,
267                                                  state->reply_data.dptr,
268                                                  state->reply_data.dsize);
269                 reply_data->dsize = state->reply_data.dsize;
270         }
271         talloc_free(state);
272         return 0;
273 }
274
275 /*
276   full ctdb_call
277 */
278 int ctdb_call(struct ctdb_context *ctdb, 
279               TDB_DATA key, int call_id, 
280               TDB_DATA *call_data, TDB_DATA *reply_data)
281 {
282         struct ctdb_call_state *state;
283         state = ctdb_call_send(ctdb, key, call_id, call_data, reply_data);
284         return ctdb_call_recv(state, reply_data);
285 }