2 #include "system/network.h"
7 #include "lib/util/debug.h"
8 #include "lib/util/blocking.h"
9 #include "lib/util/tevent_unix.h"
10 #include "lib/util/byteorder.h"
11 #include "lib/messaging/messages_dgm.h"
13 #include "common/path.h"
14 #include "common/pkt_read.h"
15 #include "common/pkt_write.h"
17 #include "transport/transport.h"
18 #include "transport/transport_header.h"
19 #include "transport/transport_packet.h"
20 #include "transport/transport_db.h"
21 #include "transport/transport_api.h"
23 struct transport_context {
24 struct transport_db_context *db;
28 void (*disconnect_cb)(void *private_data);
29 void (*read_cb)(struct transport_endpoint *src,
30 struct transport_packet *pkt,
35 static int transport_context_destructor(struct transport_context *transport);
36 static void transport_read(struct tevent_context *ev,
43 struct transport_context *transport_init(
45 struct tevent_context *ev,
47 void (*disconnect_cb)(void *private_data),
48 void (*read_cb)(struct transport_endpoint *src,
49 struct transport_packet *pkt,
53 struct transport_context *transport;
56 transport = talloc(mem_ctx, struct transport_context);
57 if (transport == NULL) {
61 transport->db = transport_db_register(transport, &endpoint_id);
62 if (transport->db == NULL) {
65 transport->endpoint_id = endpoint_id;
67 transport->disconnect_cb = disconnect_cb;
68 transport->read_cb = read_cb;
69 transport->private_data = private_data;
71 ret = messaging_dgm_init(ev,
78 D_ERR("transport: Failed to initialize messaging\n");
82 talloc_set_destructor(transport, transport_context_destructor);
87 talloc_free(transport);
91 static int transport_context_destructor(struct transport_context *transport)
93 messaging_dgm_destroy();
98 static void transport_read(struct tevent_context *ev,
105 struct transport_context *transport = talloc_get_type_abort(
106 private_data, struct transport_context);
107 struct transport_packet *pkt;
108 struct transport_header header;
112 pkt = transport_packet_init(transport, msg, msg_len);
114 D_ERR("transport: Dropping packet\n");
118 ret = transport_header_pull(pkt, &header);
120 D_ERR("transport: Invalid packet, dropping\n");
124 ok = transport_header_verify(&header);
126 D_ERR("transport: Invalid header, dropping\n");
130 if (header.dst.endpoint != transport->endpoint_id) {
131 D_ERR("transport: Wrong destination 0x%x, dropping\n",
132 header.dst.endpoint);
136 transport->read_cb(&header.src, pkt, transport->private_data);
142 int transport_write(struct transport_context *transport,
143 struct transport_endpoint *dst,
144 struct transport_packet *pkt)
147 struct transport_header header;
148 struct transport_endpoint src;
154 pid = transport_db_lookup(transport->db, dst->endpoint);
156 D_ERR("transport: Unknown endpoint id 0x%x\n",
161 src.node = CTDB_NODE_LOCAL;
162 src.endpoint = transport->endpoint_id;
164 transport_header_fill(&header, &src, dst);
166 ret = transport_packet_finish(pkt, &buf, &buflen);
171 iov[0].iov_base = &header;
172 iov[0].iov_len = transport_header_len(&header);
174 iov[1].iov_base = buf;
175 iov[1].iov_len = buflen;
177 ret = messaging_dgm_send(pid, iov, 2, NULL, 0);
179 D_ERR("transport: Failed to send message\n");