4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "lib/util/tevent_unix.h"
30 #include "common/reqid.h"
31 #include "common/srvid.h"
32 #include "common/comm.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
45 struct ctdb_client_message_state {
46 struct ctdb_client_context *client;
50 static int ctdb_client_message_state_destructor(
51 struct ctdb_client_message_state *state);
52 static void ctdb_client_message_done(struct tevent_req *subreq);
54 struct tevent_req *ctdb_client_message_send(TALLOC_CTX *mem_ctx,
55 struct tevent_context *ev,
56 struct ctdb_client_context *client,
58 struct ctdb_req_message *message)
60 struct tevent_req *req, *subreq;
61 struct ctdb_client_message_state *state;
62 struct ctdb_req_header h;
65 size_t datalen, buflen;
68 req = tevent_req_create(mem_ctx, &state,
69 struct ctdb_client_message_state);
74 reqid = reqid_new(client->idr, state);
75 if (reqid == REQID_INVALID) {
80 state->client = client;
83 talloc_set_destructor(state, ctdb_client_message_state_destructor);
85 ctdb_req_header_fill(&h, 0, CTDB_REQ_MESSAGE, destnode,
88 datalen = ctdb_req_message_len(&h, message);
89 ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
91 tevent_req_error(req, ret);
92 return tevent_req_post(req, ev);
95 ret = ctdb_req_message_push(&h, message, buf, &buflen);
97 tevent_req_error(req, ret);
98 return tevent_req_post(req, ev);
101 subreq = comm_write_send(state, ev, client->comm, buf, buflen);
102 if (tevent_req_nomem(subreq, req)) {
103 return tevent_req_post(req, ev);
105 tevent_req_set_callback(subreq, ctdb_client_message_done, req);
110 static int ctdb_client_message_state_destructor(
111 struct ctdb_client_message_state *state)
113 reqid_remove(state->client->idr, state->reqid);
117 static void ctdb_client_message_done(struct tevent_req *subreq)
119 struct tevent_req *req = tevent_req_callback_data(
120 subreq, struct tevent_req);
124 status = comm_write_recv(subreq, &ret);
127 tevent_req_error(req, ret);
131 tevent_req_done(req);
134 bool ctdb_client_message_recv(struct tevent_req *req, int *perr)
138 if (tevent_req_is_unix_error(req, &err)) {
148 void ctdb_client_req_message(struct ctdb_client_context *client,
149 uint8_t *buf, size_t buflen, uint32_t reqid)
151 struct ctdb_req_header h;
152 struct ctdb_req_message_data message;
153 TALLOC_CTX *tmp_ctx = talloc_new(client);
156 ret = ctdb_req_message_data_pull(buf, buflen, &h, tmp_ctx, &message);
161 srvid_dispatch(client->srv, message.srvid, CTDB_SRVID_ALL,
163 talloc_free(tmp_ctx);
167 * Handle multiple nodes
170 struct ctdb_client_message_multi_state {
178 struct message_index_state {
179 struct tevent_req *req;
183 static void ctdb_client_message_multi_done(struct tevent_req *subreq);
185 struct tevent_req *ctdb_client_message_multi_send(
187 struct tevent_context *ev,
188 struct ctdb_client_context *client,
189 uint32_t *pnn_list, int count,
190 struct ctdb_req_message *message)
192 struct tevent_req *req, *subreq;
193 struct ctdb_client_message_multi_state *state;
196 if (pnn_list == NULL || count == 0) {
200 req = tevent_req_create(mem_ctx, &state,
201 struct ctdb_client_message_multi_state);
206 state->pnn_list = pnn_list;
207 state->count = count;
210 state->err_list = talloc_zero_array(state, int, count);
211 if (tevent_req_nomem(state->err_list, req)) {
212 return tevent_req_post(req, ev);
215 for (i=0; i<count; i++) {
216 struct message_index_state *substate;
218 subreq = ctdb_client_message_send(state, ev, client,
219 pnn_list[i], message);
220 if (tevent_req_nomem(subreq, req)) {
221 return tevent_req_post(req, ev);
224 substate = talloc(subreq, struct message_index_state);
225 if (tevent_req_nomem(substate, req)) {
226 return tevent_req_post(req, ev);
232 tevent_req_set_callback(subreq, ctdb_client_message_multi_done,
239 static void ctdb_client_message_multi_done(struct tevent_req *subreq)
241 struct message_index_state *substate = tevent_req_callback_data(
242 subreq, struct message_index_state);
243 struct tevent_req *req = substate->req;
244 int idx = substate->index;
245 struct ctdb_client_message_multi_state *state = tevent_req_data(
246 req, struct ctdb_client_message_multi_state);
250 status = ctdb_client_message_recv(subreq, &ret);
253 if (state->err == 0) {
255 state->err_list[idx] = state->err;
261 if (state->done == state->count) {
262 tevent_req_done(req);
266 bool ctdb_client_message_multi_recv(struct tevent_req *req, int *perr,
267 TALLOC_CTX *mem_ctx, int **perr_list)
269 struct ctdb_client_message_multi_state *state = tevent_req_data(
270 req, struct ctdb_client_message_multi_state);
273 if (tevent_req_is_unix_error(req, &err)) {
277 if (perr_list != NULL) {
278 *perr_list = talloc_steal(mem_ctx, state->err_list);
287 if (perr_list != NULL) {
288 *perr_list = talloc_steal(mem_ctx, state->err_list);
291 if (state->err != 0) {
299 * sync version of message send
302 int ctdb_client_message(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
303 struct ctdb_client_context *client,
304 uint32_t destnode, struct ctdb_req_message *message)
307 struct tevent_req *req;
311 tmp_ctx = talloc_new(client);
312 if (tmp_ctx == NULL) {
316 req = ctdb_client_message_send(tmp_ctx, ev, client, destnode, message);
318 talloc_free(tmp_ctx);
322 tevent_req_poll(req, ev);
324 status = ctdb_client_message_recv(req, &ret);
326 talloc_free(tmp_ctx);
330 talloc_free(tmp_ctx);
334 struct ctdb_client_set_message_handler_state {
335 struct ctdb_client_context *client;
337 srvid_handler_fn handler;
341 static void ctdb_client_set_message_handler_done(struct tevent_req *subreq);
343 struct tevent_req *ctdb_client_set_message_handler_send(
345 struct tevent_context *ev,
346 struct ctdb_client_context *client,
348 srvid_handler_fn handler,
351 struct tevent_req *req, *subreq;
352 struct ctdb_client_set_message_handler_state *state;
353 struct ctdb_req_control request;
355 req = tevent_req_create(mem_ctx, &state,
356 struct ctdb_client_set_message_handler_state);
361 state->client = client;
362 state->srvid = srvid;
363 state->handler = handler;
364 state->private_data = private_data;
366 ctdb_req_control_register_srvid(&request, srvid);
367 subreq = ctdb_client_control_send(state, ev, client, client->pnn,
368 tevent_timeval_zero(), &request);
369 if (tevent_req_nomem(subreq, req)) {
370 return tevent_req_post(req, ev);
372 tevent_req_set_callback(subreq, ctdb_client_set_message_handler_done,
378 static void ctdb_client_set_message_handler_done(struct tevent_req *subreq)
380 struct tevent_req *req = tevent_req_callback_data(
381 subreq, struct tevent_req);
382 struct ctdb_client_set_message_handler_state *state = tevent_req_data(
383 req, struct ctdb_client_set_message_handler_state);
384 struct ctdb_reply_control *reply;
388 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
391 tevent_req_error(req, ret);
395 ret = ctdb_reply_control_register_srvid(reply);
398 tevent_req_error(req, ret);
402 ret = srvid_register(state->client->srv, state->client, state->srvid,
403 state->handler, state->private_data);
405 tevent_req_error(req, ret);
409 tevent_req_done(req);
412 bool ctdb_client_set_message_handler_recv(struct tevent_req *req, int *perr)
416 if (tevent_req_is_unix_error(req, &err)) {
425 struct ctdb_client_remove_message_handler_state {
426 struct ctdb_client_context *client;
431 static void ctdb_client_remove_message_handler_done(struct tevent_req *subreq);
433 struct tevent_req *ctdb_client_remove_message_handler_send(
435 struct tevent_context *ev,
436 struct ctdb_client_context *client,
440 struct tevent_req *req, *subreq;
441 struct ctdb_client_remove_message_handler_state *state;
442 struct ctdb_req_control request;
444 req = tevent_req_create(mem_ctx, &state,
445 struct ctdb_client_remove_message_handler_state);
450 state->client = client;
451 state->srvid = srvid;
452 state->private_data = private_data;
454 ctdb_req_control_deregister_srvid(&request, srvid);
455 subreq = ctdb_client_control_send(state, ev, client, client->pnn,
456 tevent_timeval_zero(), &request);
457 if (tevent_req_nomem(subreq, req)) {
458 return tevent_req_post(req, ev);
460 tevent_req_set_callback(subreq,
461 ctdb_client_remove_message_handler_done, req);
466 static void ctdb_client_remove_message_handler_done(struct tevent_req *subreq)
468 struct tevent_req *req = tevent_req_callback_data(
469 subreq, struct tevent_req);
470 struct ctdb_client_remove_message_handler_state *state = tevent_req_data(
471 req, struct ctdb_client_remove_message_handler_state);
472 struct ctdb_reply_control *reply;
476 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
479 tevent_req_error(req, ret);
483 ret = ctdb_reply_control_deregister_srvid(reply);
486 tevent_req_error(req, ret);
490 ret = srvid_deregister(state->client->srv, state->srvid,
491 state->private_data);
493 tevent_req_error(req, ret);
497 tevent_req_done(req);
500 bool ctdb_client_remove_message_handler_recv(struct tevent_req *req, int *perr)
504 if (tevent_req_is_unix_error(req, &err)) {
513 int ctdb_client_set_message_handler(struct tevent_context *ev,
514 struct ctdb_client_context *client,
515 uint64_t srvid, srvid_handler_fn handler,
521 mem_ctx = talloc_new(client);
522 if (mem_ctx == NULL) {
526 ret = ctdb_ctrl_register_srvid(mem_ctx, ev, client, client->pnn,
527 tevent_timeval_zero(), srvid);
528 talloc_free(mem_ctx);
533 return srvid_register(client->srv, client, srvid,
534 handler, private_data);
537 int ctdb_client_remove_message_handler(struct tevent_context *ev,
538 struct ctdb_client_context *client,
539 uint64_t srvid, void *private_data)
544 mem_ctx = talloc_new(client);
545 if (mem_ctx == NULL) {
549 ret = ctdb_ctrl_deregister_srvid(mem_ctx, ev, client, client->pnn,
550 tevent_timeval_zero(), srvid);
551 talloc_free(mem_ctx);
556 return srvid_deregister(client->srv, srvid, private_data);