ctdb: Fix CID 1435740 Unchecked return value
[kai/samba-autobuild/.git] / ctdb / tests / src / cluster_wait.c
1 /*
2    Cluster wide synchronization
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22
23 #include "lib/util/tevent_unix.h"
24
25 #include "client/client.h"
26
27 #include "tests/src/cluster_wait.h"
28
29 #define MSG_ID_JOIN     (CTDB_SRVID_TEST_RANGE | 0x1)
30 #define MSG_ID_SYNC     (CTDB_SRVID_TEST_RANGE | 0x2)
31
32 /* Wait for all the clients to initialize */
33
34 struct cluster_wait_state {
35         struct tevent_context *ev;
36         struct ctdb_client_context *client;
37         int num_nodes;
38         bool *ready;
39         bool join_done;
40 };
41
42 static void cluster_wait_join_registered(struct tevent_req *subreq);
43 static void cluster_wait_sync_registered(struct tevent_req *subreq);
44 static void cluster_wait_join(struct tevent_req *subreq);
45 static void cluster_wait_join_sent(struct tevent_req *subreq);
46 static void cluster_wait_join_handler(uint64_t srvid, TDB_DATA data,
47                                       void *private_data);
48 static void cluster_wait_join_unregistered(struct tevent_req *subreq);
49 static void cluster_wait_sync_sent(struct tevent_req *subreq);
50 static void cluster_wait_sync_handler(uint64_t srvid, TDB_DATA data,
51                                       void *private_data);
52 static void cluster_wait_sync_unregistered(struct tevent_req *subreq);
53
54 struct tevent_req *cluster_wait_send(TALLOC_CTX *mem_ctx,
55                                      struct tevent_context *ev,
56                                      struct ctdb_client_context *client,
57                                      int num_nodes)
58 {
59         struct tevent_req *req, *subreq;
60         struct cluster_wait_state *state;
61         bool ok;
62
63         req = tevent_req_create(mem_ctx, &state, struct cluster_wait_state);
64         if (req == NULL) {
65                 return NULL;
66         }
67
68         state->ev = ev;
69         state->client = client;
70         state->num_nodes = num_nodes;
71
72         state->join_done = false;
73
74         if (ctdb_client_pnn(client) == 0) {
75                 state->ready = talloc_zero_array(state, bool, num_nodes);
76                 if (tevent_req_nomem(state->ready, req)) {
77                         return tevent_req_post(req, ev);
78                 }
79
80                 subreq = ctdb_client_set_message_handler_send(
81                                         state, ev, client, MSG_ID_JOIN,
82                                         cluster_wait_join_handler, req);
83                 if (tevent_req_nomem(subreq, req)) {
84                         return tevent_req_post(req, ev);
85                 }
86                 tevent_req_set_callback(subreq, cluster_wait_join_registered,
87                                         req);
88         }
89
90         subreq = ctdb_client_set_message_handler_send(
91                                         state, ev, client, MSG_ID_SYNC,
92                                         cluster_wait_sync_handler, req);
93         if (tevent_req_nomem(subreq, req)) {
94                 return tevent_req_post(req, ev);
95         }
96         tevent_req_set_callback(subreq, cluster_wait_sync_registered, req);
97
98         /* If cluster is not synchronized within 30 seconds, time out */
99         ok = tevent_req_set_endtime(
100                 req,
101                 ev,
102                 tevent_timeval_current_ofs(30, 0));
103         if (!ok) {
104                 return tevent_req_post(req, ev);
105         }
106
107         return req;
108 }
109
110 static void cluster_wait_join_registered(struct tevent_req *subreq)
111 {
112         struct tevent_req *req = tevent_req_callback_data(
113                 subreq, struct tevent_req);
114         bool status;
115         int ret;
116
117         status = ctdb_client_set_message_handler_recv(subreq, &ret);
118         TALLOC_FREE(subreq);
119         if (! status) {
120                 tevent_req_error(req, ret);
121                 return;
122         }
123
124         printf("Waiting for cluster\n");
125         fflush(stdout);
126 }
127
128 static void cluster_wait_sync_registered(struct tevent_req *subreq)
129 {
130         struct tevent_req *req = tevent_req_callback_data(
131                 subreq, struct tevent_req);
132         struct cluster_wait_state *state = tevent_req_data(
133                 req, struct cluster_wait_state);
134         bool status;
135         int ret;
136
137         status = ctdb_client_set_message_handler_recv(subreq, &ret);
138         TALLOC_FREE(subreq);
139         if (! status) {
140                 tevent_req_error(req, ret);
141                 return;
142         }
143
144         subreq = tevent_wakeup_send(state, state->ev,
145                                     tevent_timeval_current_ofs(1, 0));
146         if (tevent_req_nomem(subreq, req)) {
147                 return;
148         }
149         tevent_req_set_callback(subreq, cluster_wait_join, req);
150 }
151
152 static void cluster_wait_join(struct tevent_req *subreq)
153 {
154         struct tevent_req *req = tevent_req_callback_data(
155                 subreq, struct tevent_req);
156         struct cluster_wait_state *state = tevent_req_data(
157                 req, struct cluster_wait_state);
158         struct ctdb_req_message msg;
159         uint32_t pnn;
160         bool status;
161
162         status = tevent_wakeup_recv(subreq);
163         TALLOC_FREE(subreq);
164         if (! status) {
165                 tevent_req_error(req, EIO);
166                 return;
167         }
168
169         pnn = ctdb_client_pnn(state->client);
170
171         msg.srvid = MSG_ID_JOIN;
172         msg.data.data.dsize = sizeof(pnn);
173         msg.data.data.dptr = (uint8_t *)&pnn;
174
175         subreq = ctdb_client_message_send(state, state->ev, state->client,
176                                           0, &msg);
177         if (tevent_req_nomem(subreq, req)) {
178                 return;
179         }
180         tevent_req_set_callback(subreq, cluster_wait_join_sent, req);
181 }
182
183 static void cluster_wait_join_sent(struct tevent_req *subreq)
184 {
185         struct tevent_req *req = tevent_req_callback_data(
186                 subreq, struct tevent_req);
187         struct cluster_wait_state *state = tevent_req_data(
188                 req, struct cluster_wait_state);
189         bool status;
190         int ret;
191
192         status = ctdb_client_message_recv(subreq, &ret);
193         TALLOC_FREE(subreq);
194         if (! status) {
195                 tevent_req_error(req, ret);
196                 return;
197         }
198
199         subreq = tevent_wakeup_send(state, state->ev,
200                                     tevent_timeval_current_ofs(1, 0));
201         if (tevent_req_nomem(subreq, req)) {
202                 return;
203         }
204         tevent_req_set_callback(subreq, cluster_wait_join, req);
205 }
206
207 static void cluster_wait_join_handler(uint64_t srvid, TDB_DATA data,
208                                       void *private_data)
209 {
210         struct tevent_req *req = talloc_get_type_abort(
211                 private_data, struct tevent_req);
212         struct cluster_wait_state *state = tevent_req_data(
213                 req, struct cluster_wait_state);
214         struct tevent_req *subreq;
215         uint32_t pnn;
216         int i;
217
218         if (srvid != MSG_ID_JOIN) {
219                 return;
220         }
221
222         if (data.dsize != sizeof(uint32_t)) {
223                 return;
224         }
225
226         pnn = *(uint32_t *)data.dptr;
227
228         if (pnn > state->num_nodes) {
229                 return;
230         }
231
232         state->ready[pnn] = true;
233
234         for (i=0; i<state->num_nodes; i++) {
235                 if (! state->ready[i]) {
236                         return;
237                 }
238         }
239
240         if (state->join_done) {
241                 return;
242         }
243
244         state->join_done = true;
245         subreq = ctdb_client_remove_message_handler_send(
246                                         state, state->ev, state->client,
247                                         MSG_ID_JOIN, req);
248         if (tevent_req_nomem(subreq, req)) {
249                 return;
250         }
251         tevent_req_set_callback(subreq, cluster_wait_join_unregistered, req);
252 }
253
254 static void cluster_wait_join_unregistered(struct tevent_req *subreq)
255 {
256         struct tevent_req *req = tevent_req_callback_data(
257                 subreq, struct tevent_req);
258         struct cluster_wait_state *state = tevent_req_data(
259                 req, struct cluster_wait_state);
260         struct ctdb_req_message msg;
261         bool status;
262         int ret;
263
264         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
265         if (! status) {
266                 tevent_req_error(req, ret);
267                 return;
268         }
269
270         msg.srvid = MSG_ID_SYNC;
271         msg.data.data = tdb_null;
272
273         subreq = ctdb_client_message_send(state, state->ev, state->client,
274                                           CTDB_BROADCAST_CONNECTED, &msg);
275         if (tevent_req_nomem(subreq, req)) {
276                 return;
277         }
278         tevent_req_set_callback(subreq, cluster_wait_sync_sent, req);
279 }
280
281 static void cluster_wait_sync_sent(struct tevent_req *subreq)
282 {
283         struct tevent_req *req = tevent_req_callback_data(
284                 subreq, struct tevent_req);
285         bool status;
286         int ret;
287
288         status = ctdb_client_message_recv(subreq, &ret);
289         TALLOC_FREE(subreq);
290         if (! status) {
291                 tevent_req_error(req, ret);
292                 return;
293         }
294 }
295
296 static void cluster_wait_sync_handler(uint64_t srvid, TDB_DATA data,
297                                       void *private_data)
298 {
299         struct tevent_req *req = talloc_get_type_abort(
300                 private_data, struct tevent_req);
301         struct cluster_wait_state *state = tevent_req_data(
302                 req, struct cluster_wait_state);
303         struct tevent_req *subreq;
304
305         if (srvid != MSG_ID_SYNC) {
306                 return;
307         }
308
309         subreq = ctdb_client_remove_message_handler_send(
310                                         state, state->ev, state->client,
311                                         MSG_ID_SYNC, req);
312         if (tevent_req_nomem(subreq, req)) {
313                 return;
314         }
315         tevent_req_set_callback(subreq, cluster_wait_sync_unregistered, req);
316 }
317
318 static void cluster_wait_sync_unregistered(struct tevent_req *subreq)
319 {
320         struct tevent_req *req = tevent_req_callback_data(
321                 subreq, struct tevent_req);
322         bool status;
323         int ret;
324
325         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
326         TALLOC_FREE(subreq);
327         if (! status) {
328                 tevent_req_error(req, ret);
329                 return;
330         }
331
332         tevent_req_done(req);
333 }
334
335 bool cluster_wait_recv(struct tevent_req *req, int *perr)
336 {
337         int err;
338
339         if (tevent_req_is_unix_error(req, &err)) {
340                 if (perr != NULL) {
341                         *perr = err;
342                 }
343                 return false;
344         }
345         return true;
346 }