s3-msg: For msg_channel, correct the talloc hierarchy
[kai/samba.git] / source3 / lib / msg_channel.c
1 /*
2    Unix SMB/CIFS implementation.
3    Samba3 message channels
4    Copyright (C) Volker Lendecke 2012
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "msg_channel.h"
22 #include "ctdb_conn.h"
23 #include "lib/util/tevent_unix.h"
24
25 struct msg_channel {
26         struct ctdb_msg_channel *ctdb_channel;
27         struct messaging_context *msg;
28         uint32_t msg_type;
29
30         struct tevent_req *pending_req;
31         struct tevent_context *ev;
32
33         struct messaging_rec **msgs;
34 };
35
36 struct msg_channel_init_state {
37         struct msg_channel *channel;
38 };
39
40 static void msg_channel_init_got_ctdb(struct tevent_req *subreq);
41 static void msg_channel_init_got_msg(struct messaging_context *msg,
42                                void *priv, uint32_t msg_type,
43                                struct server_id server_id, DATA_BLOB *data);
44 static void msg_channel_trigger(struct tevent_context *ev,
45                                 struct tevent_immediate *im,
46                                 void *priv);
47 static int msg_channel_destructor(struct msg_channel *s);
48
49 struct tevent_req *msg_channel_init_send(TALLOC_CTX *mem_ctx,
50                                     struct tevent_context *ev,
51                                     struct messaging_context *msg,
52                                     uint32_t msg_type)
53 {
54         struct tevent_req *req, *subreq;
55         struct msg_channel_init_state *state;
56         struct server_id pid;
57
58         req = tevent_req_create(mem_ctx, &state,
59                                 struct msg_channel_init_state);
60         if (req == NULL) {
61                 return NULL;
62         }
63
64         state->channel = talloc_zero(state, struct msg_channel);
65         if (tevent_req_nomem(state->channel, req)) {
66                 return tevent_req_post(req, ev);
67         }
68         state->channel->msg = msg;
69         state->channel->msg_type = msg_type;
70
71         pid = messaging_server_id(msg);
72         subreq = ctdb_msg_channel_init_send(state, ev, lp_ctdbd_socket(),
73                                             pid.pid);
74         if (tevent_req_nomem(subreq, req)) {
75                 return tevent_req_post(req, ev);
76         }
77         tevent_req_set_callback(subreq, msg_channel_init_got_ctdb, req);
78         return req;
79 }
80
81 static void msg_channel_init_got_ctdb(struct tevent_req *subreq)
82 {
83         struct tevent_req *req = tevent_req_callback_data(
84                 subreq, struct tevent_req);
85         struct msg_channel_init_state *state = tevent_req_data(
86                 req, struct msg_channel_init_state);
87         struct msg_channel *s = state->channel;
88         NTSTATUS status;
89         int ret;
90
91         ret = ctdb_msg_channel_init_recv(subreq, s, &s->ctdb_channel);
92         TALLOC_FREE(subreq);
93
94         if (ret == ENOSYS) {
95                 s->ctdb_channel = NULL;
96                 ret = 0;
97         }
98
99         if (tevent_req_error(req, ret)) {
100                 return;
101         }
102         status = messaging_register(s->msg, s, s->msg_type,
103                                     msg_channel_init_got_msg);
104         if (!NT_STATUS_IS_OK(status)) {
105                 tevent_req_error(req, map_errno_from_nt_status(status));
106                 return;
107         }
108         talloc_set_destructor(s, msg_channel_destructor);
109         tevent_req_done(req);
110 }
111
112 static int msg_channel_destructor(struct msg_channel *s)
113 {
114         messaging_deregister(s->msg, s->msg_type, s);
115         return 0;
116 }
117
118 int msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
119                           struct msg_channel **pchannel)
120 {
121         struct msg_channel_init_state *state = tevent_req_data(
122                 req, struct msg_channel_init_state);
123         int err;
124
125         if (tevent_req_is_unix_error(req, &err)) {
126                 return err;
127         }
128         *pchannel = talloc_move(mem_ctx, &state->channel);
129         return 0;
130 }
131
132 int msg_channel_init(TALLOC_CTX *mem_ctx, struct messaging_context *msg,
133                      uint32_t msgtype, struct msg_channel **pchannel)
134 {
135         TALLOC_CTX *frame = talloc_stackframe();
136         struct tevent_context *ev;
137         struct tevent_req *req;
138         int err = ENOMEM;
139         bool ok;
140
141         ev = tevent_context_init(frame);
142         if (ev == NULL) {
143                 goto fail;
144         }
145         req = msg_channel_init_send(frame, ev, msg, msgtype);
146         if (req == NULL) {
147                 goto fail;
148         }
149         ok = tevent_req_poll(req, ev);
150         if (!ok) {
151                 err = errno;
152                 goto fail;
153         }
154         err = msg_channel_init_recv(req, mem_ctx, pchannel);
155 fail:
156         TALLOC_FREE(frame);
157         return err;
158 }
159
160 static void msg_channel_init_got_msg(struct messaging_context *msg,
161                                      void *priv, uint32_t msg_type,
162                                      struct server_id server_id,
163                                      DATA_BLOB *data)
164 {
165         struct msg_channel *s = talloc_get_type_abort(
166                 priv, struct msg_channel);
167         struct messaging_rec *rec;
168         struct messaging_rec **msgs;
169         size_t num_msgs;
170         struct tevent_immediate *im;
171
172         rec = talloc(s, struct messaging_rec);
173         if (rec == NULL) {
174                 goto fail;
175         }
176         rec->msg_version = 1;
177         rec->msg_type = msg_type;
178         rec->dest = server_id;
179         rec->src = messaging_server_id(msg);
180         rec->buf.data = (uint8_t *)talloc_memdup(rec, data->data,
181                                                  data->length);
182         if (rec->buf.data == NULL) {
183                 goto fail;
184         }
185         rec->buf.length = data->length;
186
187         num_msgs = talloc_array_length(s->msgs);
188         msgs = talloc_realloc(s, s->msgs, struct messaging_rec *, num_msgs+1);
189         if (msgs == NULL) {
190                 goto fail;
191         }
192         s->msgs = msgs;
193         s->msgs[num_msgs] = talloc_move(s->msgs, &rec);
194
195         if (s->pending_req == NULL) {
196                 return;
197         }
198
199         im = tevent_create_immediate(s);
200         if (im == NULL) {
201                 goto fail;
202         }
203         tevent_schedule_immediate(im, s->ev, msg_channel_trigger, s);
204         return;
205 fail:
206         TALLOC_FREE(rec);
207 }
208
209 struct msg_read_state {
210         struct tevent_context *ev;
211         struct tevent_req *req;
212         struct msg_channel *channel;
213         struct messaging_rec *rec;
214 };
215
216 static int msg_read_state_destructor(struct msg_read_state *s);
217 static void msg_read_got_ctdb(struct tevent_req *subreq);
218
219 struct tevent_req *msg_read_send(TALLOC_CTX *mem_ctx,
220                                  struct tevent_context *ev,
221                                  struct msg_channel *channel)
222 {
223         struct tevent_req *req;
224         struct tevent_immediate *im;
225         struct msg_read_state *state;
226         void *msg_tdb_event;
227         size_t num_msgs;
228
229         req = tevent_req_create(mem_ctx, &state, struct msg_read_state);
230         if (req == NULL) {
231                 return NULL;
232         }
233         state->ev = ev;
234         state->req = req;
235         state->channel = channel;
236
237         if (channel->pending_req != NULL) {
238                 tevent_req_error(req, EBUSY);
239                 return tevent_req_post(req, ev);
240         }
241         channel->pending_req = req;
242         channel->ev = ev;
243         talloc_set_destructor(state, msg_read_state_destructor);
244
245         num_msgs = talloc_array_length(channel->msgs);
246         if (num_msgs != 0) {
247                 im = tevent_create_immediate(channel->ev);
248                 if (tevent_req_nomem(im, req)) {
249                         return tevent_req_post(req, ev);
250                 }
251                 tevent_schedule_immediate(im, channel->ev, msg_channel_trigger,
252                                           channel);
253                 return req;
254         }
255
256         msg_tdb_event = messaging_tdb_event(state, channel->msg, ev);
257         if (tevent_req_nomem(msg_tdb_event, req)) {
258                 return tevent_req_post(req, ev);
259
260         }
261         if (channel->ctdb_channel != NULL) {
262                 struct tevent_req *subreq;
263
264                 subreq = ctdb_msg_read_send(state, ev,
265                                             channel->ctdb_channel);
266                 if (tevent_req_nomem(subreq, req)) {
267                         return tevent_req_post(req, ev);
268                 }
269                 tevent_req_set_callback(subreq, msg_read_got_ctdb, req);
270         }
271         return req;
272 }
273
274 static int msg_read_state_destructor(struct msg_read_state *s)
275 {
276         assert(s->channel->pending_req == s->req);
277         s->channel->pending_req = NULL;
278         return 0;
279 }
280
281 static void msg_channel_trigger(struct tevent_context *ev,
282                                struct tevent_immediate *im,
283                                void *priv)
284 {
285         struct msg_channel *channel;
286         struct tevent_req *req;
287         struct msg_read_state *state;
288         size_t num_msgs;
289
290         channel = talloc_get_type_abort(priv, struct msg_channel);
291         req = channel->pending_req;
292         state = tevent_req_data(req, struct msg_read_state);
293
294         talloc_set_destructor(state, NULL);
295         msg_read_state_destructor(state);
296
297         num_msgs = talloc_array_length(channel->msgs);
298         assert(num_msgs > 0);
299
300         state->rec = talloc_move(state, &channel->msgs[0]);
301
302         memmove(channel->msgs, channel->msgs+1,
303                 sizeof(struct messaging_rec *) * (num_msgs-1));
304         channel->msgs = talloc_realloc(
305                 channel, channel->msgs, struct messaging_rec *, num_msgs - 1);
306
307         tevent_req_done(req);
308 }
309
310 static void msg_read_got_ctdb(struct tevent_req *subreq)
311 {
312         struct tevent_req *req = tevent_req_callback_data(
313                 subreq, struct tevent_req);
314         struct msg_read_state *state = tevent_req_data(
315                 req, struct msg_read_state);
316         DATA_BLOB blob;
317         enum ndr_err_code ndr_err;
318         int ret;
319
320         ret = ctdb_msg_read_recv(subreq, talloc_tos(),
321                                  &blob.data, &blob.length);
322         TALLOC_FREE(subreq);
323         if (tevent_req_error(req, ret)) {
324                 return;
325         }
326
327         state->rec = talloc(state, struct messaging_rec);
328         if (tevent_req_nomem(state->rec, req)) {
329                 return;
330         }
331
332         ndr_err = ndr_pull_struct_blob(
333                 &blob, state->rec, state->rec,
334                 (ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
335
336         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
337                 DEBUG(1, ("ndr_pull_struct_blob failed: %s\n",
338                           ndr_errstr(ndr_err)));
339                 tevent_req_error(req, ndr_map_error2errno(ndr_err));
340                 return;
341         }
342         if (DEBUGLEVEL >= 10) {
343                 NDR_PRINT_DEBUG(messaging_rec, state->rec);
344         }
345         if (state->rec->msg_type == state->channel->msg_type) {
346                 tevent_req_done(req);
347                 return;
348         }
349         /*
350          * Got some unexpected msg type, wait for the next one
351          */
352         subreq = ctdb_msg_read_send(state, state->ev,
353                                     state->channel->ctdb_channel);
354         if (tevent_req_nomem(subreq, req)) {
355                 return;
356         }
357         tevent_req_set_callback(subreq, msg_read_got_ctdb, req);
358 }
359
360 int msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
361                   struct messaging_rec **prec)
362 {
363         struct msg_read_state *state = tevent_req_data(
364                 req, struct msg_read_state);
365         int err;
366
367         if (tevent_req_is_unix_error(req, &err)) {
368                 return err;
369         }
370         *prec = talloc_move(mem_ctx, &state->rec);
371         return 0;
372 }