2 Cluster wide synchronization
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
23 #include "lib/util/tevent_unix.h"
25 #include "client/client.h"
27 #include "tests/src/cluster_wait.h"
/*
 * Message ids used by the cluster wait sequence, both derived from
 * CTDB_SRVID_TEST_RANGE.
 * MSG_ID_JOIN carries a node's PNN and is sent repeatedly by
 * cluster_wait_join(); MSG_ID_SYNC is broadcast with an empty payload by
 * cluster_wait_join_unregistered() once the JOIN handler has been removed.
 */
29 #define MSG_ID_JOIN (CTDB_SRVID_TEST_RANGE | 0x1)
30 #define MSG_ID_SYNC (CTDB_SRVID_TEST_RANGE | 0x2)
32 /* Wait for all the clients to initialize */
/*
 * Per-request state of the cluster wait computation.
 * NOTE(review): only the first two members are visible in this listing;
 * the functions below also read and write state->num_nodes, state->ready
 * and state->join_done, so those members must be declared here as well.
 */
34 struct cluster_wait_state {
/* Event context used for wakeup timers and client sub-requests. */
35 struct tevent_context *ev;
/* CTDB client connection used to register handlers and send messages. */
36 struct ctdb_client_context *client;
/*
 * Forward declarations of the tevent callbacks and CTDB message handlers
 * that make up the join/sync sequence.  The two *_handler functions are
 * message handlers taking (srvid, data, ...); the rest are sub-request
 * completion callbacks.
 */
42 static void cluster_wait_join_registered(struct tevent_req *subreq);
43 static void cluster_wait_sync_registered(struct tevent_req *subreq);
44 static void cluster_wait_join(struct tevent_req *subreq);
45 static void cluster_wait_join_sent(struct tevent_req *subreq);
46 static void cluster_wait_join_handler(uint64_t srvid, TDB_DATA data,
48 static void cluster_wait_join_unregistered(struct tevent_req *subreq);
49 static void cluster_wait_sync_sent(struct tevent_req *subreq);
50 static void cluster_wait_sync_handler(uint64_t srvid, TDB_DATA data,
52 static void cluster_wait_sync_unregistered(struct tevent_req *subreq);
/*
 * Start the asynchronous "wait for the whole cluster" computation.
 *
 * Creates the request together with its cluster_wait_state, stores the
 * client and the node count in the state and clears join_done.  The
 * MSG_ID_SYNC handler is registered unconditionally; when this client's
 * PNN is 0 the ready[] array (num_nodes zero-initialised bools) is
 * allocated and the MSG_ID_JOIN handler is registered as well.  Finally
 * an end time 30 seconds from now is requested for the whole request.
 *
 * mem_ctx: talloc context the request is created on
 * ev:      tevent context used for sub-requests and tevent_req_post()
 * client:  CTDB client connection
 * A num_nodes argument is used below; its parameter line is not visible
 * in this listing.
 *
 * When an allocation fails (tevent_req_nomem), the request is returned
 * through tevent_req_post().
 *
 * NOTE(review): this listing has lost short lines (braces, declarations
 * such as "ok", some call arguments); confirm against the real file.
 */
54 struct tevent_req *cluster_wait_send(TALLOC_CTX *mem_ctx,
55 struct tevent_context *ev,
56 struct ctdb_client_context *client,
59 struct tevent_req *req, *subreq;
60 struct cluster_wait_state *state;
63 req = tevent_req_create(mem_ctx, &state, struct cluster_wait_state);
69 state->client = client;
70 state->num_nodes = num_nodes;
72 state->join_done = false;
/* Only the client with PNN 0 tracks JOIN messages from the nodes. */
74 if (ctdb_client_pnn(client) == 0) {
75 state->ready = talloc_zero_array(state, bool, num_nodes);
76 if (tevent_req_nomem(state->ready, req)) {
77 return tevent_req_post(req, ev);
80 subreq = ctdb_client_set_message_handler_send(
81 state, ev, client, MSG_ID_JOIN,
82 cluster_wait_join_handler, req);
83 if (tevent_req_nomem(subreq, req)) {
84 return tevent_req_post(req, ev);
86 tevent_req_set_callback(subreq, cluster_wait_join_registered,
/* The SYNC handler registration is not inside the PNN 0 branch's calls above;
 * its completion callback starts the periodic JOIN sending. */
90 subreq = ctdb_client_set_message_handler_send(
91 state, ev, client, MSG_ID_SYNC,
92 cluster_wait_sync_handler, req);
93 if (tevent_req_nomem(subreq, req)) {
94 return tevent_req_post(req, ev);
96 tevent_req_set_callback(subreq, cluster_wait_sync_registered, req);
98 /* If cluster is not synchronized within 30 seconds, time out */
99 ok = tevent_req_set_endtime(
102 tevent_timeval_current_ofs(30, 0));
104 return tevent_req_post(req, ev);
/*
 * Completion callback for the MSG_ID_JOIN handler registration started in
 * cluster_wait_send().
 *
 * Fetches the registration result with
 * ctdb_client_set_message_handler_recv(); the error path fails the overall
 * request with the error code written to ret.  Otherwise a progress
 * message is printed to stdout.
 * NOTE(review): the condition guarding the error path is not visible in
 * this listing (presumably a check of status).
 */
110 static void cluster_wait_join_registered(struct tevent_req *subreq)
112 struct tevent_req *req = tevent_req_callback_data(
113 subreq, struct tevent_req);
117 status = ctdb_client_set_message_handler_recv(subreq, &ret);
120 tevent_req_error(req, ret);
124 printf("Waiting for cluster\n");
/*
 * Completion callback for the MSG_ID_SYNC handler registration started in
 * cluster_wait_send().
 *
 * Fetches the registration result; the error path fails the overall
 * request with the returned error code.  On success a one second wakeup
 * timer is armed whose callback, cluster_wait_join(), sends the JOIN
 * message.
 */
128 static void cluster_wait_sync_registered(struct tevent_req *subreq)
130 struct tevent_req *req = tevent_req_callback_data(
131 subreq, struct tevent_req);
132 struct cluster_wait_state *state = tevent_req_data(
133 req, struct cluster_wait_state);
137 status = ctdb_client_set_message_handler_recv(subreq, &ret);
140 tevent_req_error(req, ret);
/* Delay the first JOIN by one second. */
144 subreq = tevent_wakeup_send(state, state->ev,
145 tevent_timeval_current_ofs(1, 0));
146 if (tevent_req_nomem(subreq, req)) {
149 tevent_req_set_callback(subreq, cluster_wait_join, req);
/*
 * Wakeup-timer callback: send one MSG_ID_JOIN message.
 *
 * Collects the timer result with tevent_wakeup_recv(); the error path
 * fails the overall request with EIO.  Otherwise a message with srvid
 * MSG_ID_JOIN is built whose payload is this client's PNN and handed to
 * ctdb_client_message_send(); cluster_wait_join_sent() runs when the send
 * completes.
 * NOTE(review): the destination argument of the send is on a line that is
 * not visible in this listing.
 */
152 static void cluster_wait_join(struct tevent_req *subreq)
154 struct tevent_req *req = tevent_req_callback_data(
155 subreq, struct tevent_req);
156 struct cluster_wait_state *state = tevent_req_data(
157 req, struct cluster_wait_state);
158 struct ctdb_req_message msg;
162 status = tevent_wakeup_recv(subreq);
165 tevent_req_error(req, EIO);
169 pnn = ctdb_client_pnn(state->client);
/* The payload points directly at the local pnn variable, sizeof(pnn) bytes. */
171 msg.srvid = MSG_ID_JOIN;
172 msg.data.data.dsize = sizeof(pnn);
173 msg.data.data.dptr = (uint8_t *)&pnn;
175 subreq = ctdb_client_message_send(state, state->ev, state->client,
177 if (tevent_req_nomem(subreq, req)) {
180 tevent_req_set_callback(subreq, cluster_wait_join_sent, req);
/*
 * Completion callback for the JOIN message sent by cluster_wait_join().
 *
 * Fetches the send result with ctdb_client_message_recv(); the error path
 * fails the overall request with the returned error code.  On success a
 * new one second wakeup timer is armed with cluster_wait_join() as its
 * callback, so the JOIN message is re-sent about once per second
 * (join -> join_sent -> wakeup -> join).
 */
183 static void cluster_wait_join_sent(struct tevent_req *subreq)
185 struct tevent_req *req = tevent_req_callback_data(
186 subreq, struct tevent_req);
187 struct cluster_wait_state *state = tevent_req_data(
188 req, struct cluster_wait_state);
192 status = ctdb_client_message_recv(subreq, &ret);
195 tevent_req_error(req, ret);
/* Re-arm the timer for the next JOIN. */
199 subreq = tevent_wakeup_send(state, state->ev,
200 tevent_timeval_current_ofs(1, 0));
201 if (tevent_req_nomem(subreq, req)) {
204 tevent_req_set_callback(subreq, cluster_wait_join, req);
207 static void cluster_wait_join_handler(uint64_t srvid, TDB_DATA data,
210 struct tevent_req *req = talloc_get_type_abort(
211 private_data, struct tevent_req);
212 struct cluster_wait_state *state = tevent_req_data(
213 req, struct cluster_wait_state);
214 struct tevent_req *subreq;
218 if (srvid != MSG_ID_JOIN) {
222 if (data.dsize != sizeof(uint32_t)) {
226 pnn = *(uint32_t *)data.dptr;
228 if (pnn > state->num_nodes) {
232 state->ready[pnn] = true;
234 for (i=0; i<state->num_nodes; i++) {
235 if (! state->ready[i]) {
240 if (state->join_done) {
244 state->join_done = true;
245 subreq = ctdb_client_remove_message_handler_send(
246 state, state->ev, state->client,
248 if (tevent_req_nomem(subreq, req)) {
251 tevent_req_set_callback(subreq, cluster_wait_join_unregistered, req);
/*
 * Completion callback for the removal of the JOIN message handler started
 * in cluster_wait_join_handler().
 *
 * Fetches the removal result; the error path fails the overall request
 * with the returned error code.  Otherwise a MSG_ID_SYNC message with an
 * empty payload (tdb_null) is sent to CTDB_BROADCAST_CONNECTED, and
 * cluster_wait_sync_sent() runs when the send completes.
 */
254 static void cluster_wait_join_unregistered(struct tevent_req *subreq)
256 struct tevent_req *req = tevent_req_callback_data(
257 subreq, struct tevent_req);
258 struct cluster_wait_state *state = tevent_req_data(
259 req, struct cluster_wait_state);
260 struct ctdb_req_message msg;
264 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
266 tevent_req_error(req, ret);
/* SYNC carries no data; its arrival alone is the signal. */
270 msg.srvid = MSG_ID_SYNC;
271 msg.data.data = tdb_null;
273 subreq = ctdb_client_message_send(state, state->ev, state->client,
274 CTDB_BROADCAST_CONNECTED, &msg);
275 if (tevent_req_nomem(subreq, req)) {
278 tevent_req_set_callback(subreq, cluster_wait_sync_sent, req);
/*
 * Completion callback for the SYNC broadcast sent by
 * cluster_wait_join_unregistered().
 *
 * Fetches the send result with ctdb_client_message_recv(); the error path
 * fails the overall request with the returned error code.  No further
 * step is started from the lines visible here.
 */
281 static void cluster_wait_sync_sent(struct tevent_req *subreq)
283 struct tevent_req *req = tevent_req_callback_data(
284 subreq, struct tevent_req);
288 status = ctdb_client_message_recv(subreq, &ret);
291 tevent_req_error(req, ret);
/*
 * Message handler for MSG_ID_SYNC (registered in cluster_wait_send()).
 *
 * private_data is the tevent_req of the overall cluster wait computation.
 * After comparing srvid with MSG_ID_SYNC, the handler starts removing a
 * message handler and continues in cluster_wait_sync_unregistered() when
 * the removal completes.
 * NOTE(review): the body of the srvid check and the srvid passed to the
 * removal are on lines not visible in this listing; presumably a mismatch
 * returns early and the handler removed is the MSG_ID_SYNC one.
 */
296 static void cluster_wait_sync_handler(uint64_t srvid, TDB_DATA data,
299 struct tevent_req *req = talloc_get_type_abort(
300 private_data, struct tevent_req);
301 struct cluster_wait_state *state = tevent_req_data(
302 req, struct cluster_wait_state);
303 struct tevent_req *subreq;
305 if (srvid != MSG_ID_SYNC) {
309 subreq = ctdb_client_remove_message_handler_send(
310 state, state->ev, state->client,
312 if (tevent_req_nomem(subreq, req)) {
315 tevent_req_set_callback(subreq, cluster_wait_sync_unregistered, req);
/*
 * Completion callback for the handler removal started in
 * cluster_wait_sync_handler(); this is the last step of the sequence.
 *
 * Fetches the removal result; the error path fails the overall request
 * with the returned error code, otherwise the request is marked done.
 */
318 static void cluster_wait_sync_unregistered(struct tevent_req *subreq)
320 struct tevent_req *req = tevent_req_callback_data(
321 subreq, struct tevent_req);
325 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
328 tevent_req_error(req, ret);
332 tevent_req_done(req);
335 bool cluster_wait_recv(struct tevent_req *req, int *perr)
339 if (tevent_req_is_unix_error(req, &err)) {