r12391: use the new periodic schedule system for the pull replication too
[kai/samba.git] / source4 / wrepl_server / wrepl_in_call.c
1 /* 
2    Unix SMB/CIFS implementation.
3    
4    WINS Replication server
5    
6    Copyright (C) Stefan Metzmacher      2005
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 #include "includes.h"
24 #include "dlinklist.h"
25 #include "lib/events/events.h"
26 #include "lib/socket/socket.h"
27 #include "smbd/service_task.h"
28 #include "smbd/service_stream.h"
29 #include "lib/messaging/irpc.h"
30 #include "librpc/gen_ndr/ndr_winsrepl.h"
31 #include "librpc/gen_ndr/ndr_nbt.h"
32 #include "libcli/wrepl/winsrepl.h"
33 #include "wrepl_server/wrepl_server.h"
34 #include "wrepl_server/wrepl_out_helpers.h"
35 #include "libcli/composite/composite.h"
36 #include "nbt_server/wins/winsdb.h"
37 #include "lib/ldb/include/ldb.h"
38 #include "lib/ldb/include/ldb_errors.h"
39
40 static NTSTATUS wreplsrv_in_start_association(struct wreplsrv_in_call *call)
41 {
42         struct wrepl_start *start       = &call->req_packet.message.start;
43         struct wrepl_start *start_reply = &call->rep_packet.message.start_reply;
44
45         if (call->req_packet.opcode & WREPL_OPCODE_BITS) {
46                 /*
47                  *if the assoc_ctx doesn't match ignore the packet
48                  */
49                 if ((call->req_packet.assoc_ctx != call->wreplconn->assoc_ctx.our_ctx)
50                    && (call->req_packet.assoc_ctx != 0)) {
51                         return ERROR_INVALID_PARAMETER;
52                 }
53         } else {
54                 call->wreplconn->assoc_ctx.our_ctx = WREPLSRV_INVALID_ASSOC_CTX;
55                 return NT_STATUS_OK;
56         }
57
58         if (start->minor_version != 2 || start->major_version != 5) {
59                 /* w2k terminate the connection if the versions doesn't match */
60                 return NT_STATUS_UNKNOWN_REVISION;
61         }
62
63         call->wreplconn->assoc_ctx.stopped      = False;
64         call->wreplconn->assoc_ctx.our_ctx      = WREPLSRV_VALID_ASSOC_CTX;
65         call->wreplconn->assoc_ctx.peer_ctx     = start->assoc_ctx;
66
67         call->rep_packet.mess_type              = WREPL_START_ASSOCIATION_REPLY;
68         start_reply->assoc_ctx                  = call->wreplconn->assoc_ctx.our_ctx;
69         start_reply->minor_version              = 2;
70         start_reply->major_version              = 5;
71
72         return NT_STATUS_OK;
73 }
74
75 static NTSTATUS wreplsrv_in_stop_assoc_ctx(struct wreplsrv_in_call *call)
76 {
77         struct wrepl_stop *stop_out             = &call->rep_packet.message.stop;
78
79         call->wreplconn->assoc_ctx.stopped      = True;
80
81         call->rep_packet.mess_type              = WREPL_STOP_ASSOCIATION;
82         stop_out->reason                        = 4;
83
84         return NT_STATUS_OK;
85 }
86
87 static NTSTATUS wreplsrv_in_stop_association(struct wreplsrv_in_call *call)
88 {
89         /*
90          * w2k only check the assoc_ctx if the opcode has the 0x00007800 bits are set
91          */
92         if (call->req_packet.opcode & WREPL_OPCODE_BITS) {
93                 /*
94                  *if the assoc_ctx doesn't match ignore the packet
95                  */
96                 if (call->req_packet.assoc_ctx != call->wreplconn->assoc_ctx.our_ctx) {
97                         return ERROR_INVALID_PARAMETER;
98                 }
99                 /* when the opcode bits are set the connection should be directly terminated */
100                 return NT_STATUS_CONNECTION_RESET;
101         }
102
103         if (call->wreplconn->assoc_ctx.stopped) {
104                 /* this causes the connection to be directly terminated */
105                 return NT_STATUS_CONNECTION_RESET;
106         }
107
108         /* this will cause to not receive packets anymore and terminate the connection if the reply is send */
109         call->terminate_after_send = True;
110         return wreplsrv_in_stop_assoc_ctx(call);
111 }
112
113 static NTSTATUS wreplsrv_in_table_query(struct wreplsrv_in_call *call)
114 {
115         struct wreplsrv_service *service = call->wreplconn->service;
116         struct wrepl_replication *repl_out = &call->rep_packet.message.replication;
117         struct wrepl_table *table_out = &call->rep_packet.message.replication.info.table;
118         const char *our_ip = call->wreplconn->our_ip;
119
120         repl_out->command = WREPL_REPL_TABLE_REPLY;
121
122         return wreplsrv_fill_wrepl_table(service, call, table_out,
123                                          our_ip, our_ip, True);
124 }
125
126 static int wreplsrv_in_sort_wins_name(struct wrepl_wins_name *n1,
127                                       struct wrepl_wins_name *n2)
128 {
129         if (n1->id < n2->id) return -1;
130         if (n1->id > n2->id) return 1;
131         return 0;
132 }
133
134 static NTSTATUS wreplsrv_record2wins_name(TALLOC_CTX *mem_ctx,
135                                           const char *our_address,
136                                           struct wrepl_wins_name *name,
137                                           struct winsdb_record *rec)
138 {
139         uint32_t num_ips, i;
140         struct wrepl_ip *ips;
141
142         name->name              = rec->name;
143         talloc_steal(mem_ctx, rec->name);
144
145         name->id                = rec->version;
146         name->unknown           = WINSDB_GROUP_ADDRESS;
147
148         name->flags             = WREPL_NAME_FLAGS(rec->type, rec->state, rec->node, rec->is_static);
149
150         switch (name->flags & 2) {
151         case 0:
152                 name->addresses.ip                      = rec->addresses[0]->address;
153                 talloc_steal(mem_ctx, rec->addresses[0]->address);
154                 break;
155         case 2:
156                 num_ips = winsdb_addr_list_length(rec->addresses);
157                 ips     = talloc_array(mem_ctx, struct wrepl_ip, num_ips);
158                 NT_STATUS_HAVE_NO_MEMORY(ips);
159
160                 for (i = 0; i < num_ips; i++) {
161                         if (strcasecmp(WINSDB_OWNER_LOCAL, rec->addresses[i]->wins_owner) == 0) {
162                                 ips[i].owner    = talloc_strdup(ips, our_address);
163                                 NT_STATUS_HAVE_NO_MEMORY(ips[i].owner);
164                         } else {
165                                 ips[i].owner    = rec->addresses[i]->wins_owner;
166                                 talloc_steal(ips, rec->addresses[i]->wins_owner);
167                         }
168                         ips[i].ip       = rec->addresses[i]->address;
169                         talloc_steal(ips, rec->addresses[i]->address);
170                 }
171
172                 name->addresses.addresses.num_ips       = num_ips;
173                 name->addresses.addresses.ips           = ips;
174                 break;
175         }
176
177         return NT_STATUS_OK;
178 }
179
180 static NTSTATUS wreplsrv_in_send_request(struct wreplsrv_in_call *call)
181 {
182         struct wreplsrv_service *service = call->wreplconn->service;
183         struct wrepl_wins_owner *owner_in = &call->req_packet.message.replication.info.owner;
184         struct wrepl_replication *repl_out = &call->rep_packet.message.replication;
185         struct wrepl_send_reply *reply_out = &call->rep_packet.message.replication.info.reply;
186         struct wreplsrv_owner local_owner;
187         struct wreplsrv_owner *owner;
188         const char *filter;
189         struct ldb_result *res = NULL;
190         int ret;
191         struct wrepl_wins_name *names;
192         struct winsdb_record *rec;
193         NTSTATUS status;
194         uint32_t i;
195
196         if (strcmp(call->wreplconn->our_ip, owner_in->address) == 0) {
197                 ZERO_STRUCT(local_owner);
198                 local_owner.owner.address       = WINSDB_OWNER_LOCAL;
199                 local_owner.owner.min_version   = 0;
200                 local_owner.owner.max_version   = wreplsrv_local_max_version(service);
201                 local_owner.owner.type          = 1;
202                 owner = &local_owner;
203         } else {
204                 owner = wreplsrv_find_owner(service->table, owner_in->address);
205         }
206
207         repl_out->command       = WREPL_REPL_SEND_REPLY;
208         reply_out->num_names    = 0;
209         reply_out->names        = NULL;
210
211         /*
212          * if we didn't know this owner, must be a bug in the partners client code...
213          * return an empty list.
214          */
215         if (!owner) {
216                 return NT_STATUS_OK;
217         }
218
219         /*
220          * the client sends a max_version of 0, interpret it as
221          * (uint64_t)-1
222          */
223         if (owner_in->max_version == 0) {
224                 owner_in->max_version = (uint64_t)-1;
225         }
226
227         /*
228          * if the partner ask for nothing, or give invalid ranges,
229          * return an empty list.
230          */
231         if (owner_in->min_version > owner_in->max_version) {
232                 return NT_STATUS_OK;
233         }
234
235         /*
236          * if the partner has already all records for nothing, or give invalid ranges,
237          * return an empty list.
238          */
239         if (owner_in->min_version > owner->owner.max_version) {
240                 return NT_STATUS_OK;
241         }
242
243         filter = talloc_asprintf(call,
244                                  "(&(winsOwner=%s)(objectClass=winsRecord)"
245                                  "(|(recordState=%u)(recordState=%u))"
246                                  "(versionID>=%llu)(versionID<=%llu))",
247                                  owner->owner.address,
248                                  WREPL_STATE_ACTIVE, WREPL_STATE_TOMBSTONE,
249                                  (long long)owner_in->min_version, 
250                                  (long long)owner_in->max_version);
251         NT_STATUS_HAVE_NO_MEMORY(filter);
252         ret = ldb_search(service->wins_db, NULL, LDB_SCOPE_SUBTREE, filter, NULL, &res);
253         if (ret != LDB_SUCCESS) return NT_STATUS_INTERNAL_DB_CORRUPTION;
254         talloc_steal(call, res);
255         if (res->count == 0) {
256                 DEBUG(2,("WINSREPL:reply [%u] records owner[%s] min[%llu] max[%llu] to partner[%s]\n",
257                         res->count, owner_in->address, 
258                         (long long)owner_in->min_version, 
259                         (long long)owner_in->max_version,
260                         call->wreplconn->partner->address));
261                 return NT_STATUS_OK;
262         }
263
264         names = talloc_array(call, struct wrepl_wins_name, res->count);
265         NT_STATUS_HAVE_NO_MEMORY(names);
266
267         for (i = 0; i < res->count; i++) {
268                 status = winsdb_record(res->msgs[i], call, &rec);
269                 NT_STATUS_NOT_OK_RETURN(status);
270
271                 status = wreplsrv_record2wins_name(names, call->wreplconn->our_ip, &names[i], rec);
272                 NT_STATUS_NOT_OK_RETURN(status);
273                 talloc_free(rec);
274                 talloc_free(res->msgs[i]);
275         }
276
277         /* sort the names before we send them */
278         qsort(names, res->count, sizeof(struct wrepl_wins_name), (comparison_fn_t)wreplsrv_in_sort_wins_name);
279
280         DEBUG(2,("WINSREPL:reply [%u] records owner[%s] min[%llu] max[%llu] to partner[%s]\n",
281                 res->count, owner_in->address, 
282                 (long long)owner_in->min_version, 
283                 (long long)owner_in->max_version,
284                 call->wreplconn->partner->address));
285
286         reply_out->num_names    = res->count;
287         reply_out->names        = names;
288
289         return NT_STATUS_OK;
290 }
291
292 struct wreplsrv_in_update_state {
293         struct wreplsrv_in_connection *wrepl_in;
294         struct wreplsrv_out_connection *wrepl_out;
295         struct composite_context *creq;
296         struct wreplsrv_pull_cycle_io cycle_io;
297 };
298
299 static void wreplsrv_in_update_handler(struct composite_context *creq)
300 {
301         struct wreplsrv_in_update_state *update_state = talloc_get_type(creq->async.private_data,
302                                                         struct wreplsrv_in_update_state);
303         NTSTATUS status;
304
305         status = wreplsrv_pull_cycle_recv(creq);
306
307         talloc_free(update_state->wrepl_out);
308
309         wreplsrv_terminate_in_connection(update_state->wrepl_in, nt_errstr(status));
310 }
311
312 static NTSTATUS wreplsrv_in_update(struct wreplsrv_in_call *call)
313 {
314         struct wreplsrv_in_connection *wrepl_in = call->wreplconn;
315         struct wreplsrv_out_connection *wrepl_out;
316         struct wrepl_table *update_in = &call->req_packet.message.replication.info.table;
317         struct wreplsrv_in_update_state *update_state;
318         uint16_t fde_flags;
319
320         DEBUG(2,("WREPL_REPL_UPDATE: partner[%s] initiator[%s] num_owners[%u]\n",
321                 call->wreplconn->partner->address,
322                 update_in->initiator, update_in->partner_count));
323
324         /* 
325          * we need to flip the connection into a client connection
326          * and do a WREPL_REPL_SEND_REQUEST's on the that connection
327          * and then stop this connection
328          */
329         fde_flags = event_get_fd_flags(wrepl_in->conn->event.fde);
330         talloc_free(wrepl_in->conn->event.fde);
331         wrepl_in->conn->event.fde = NULL;
332
333         update_state = talloc(wrepl_in, struct wreplsrv_in_update_state);
334         NT_STATUS_HAVE_NO_MEMORY(update_state);
335
336         wrepl_out = talloc(update_state, struct wreplsrv_out_connection);
337         NT_STATUS_HAVE_NO_MEMORY(wrepl_out);
338         wrepl_out->service              = wrepl_in->service;
339         wrepl_out->partner              = wrepl_in->partner;
340         wrepl_out->assoc_ctx.our_ctx    = wrepl_in->assoc_ctx.our_ctx;
341         wrepl_out->assoc_ctx.peer_ctx   = wrepl_in->assoc_ctx.peer_ctx;
342         wrepl_out->sock                 = wrepl_socket_merge(wrepl_out,
343                                                              wrepl_in->conn->event.ctx,
344                                                              wrepl_in->conn->socket,
345                                                              wrepl_in->packet);
346         NT_STATUS_HAVE_NO_MEMORY(wrepl_out->sock);
347
348         event_set_fd_flags(wrepl_out->sock->event.fde, fde_flags);
349
350         update_state->wrepl_in                  = wrepl_in;
351         update_state->wrepl_out                 = wrepl_out;
352         update_state->cycle_io.in.partner       = wrepl_out->partner;
353         update_state->cycle_io.in.num_owners    = update_in->partner_count;
354         update_state->cycle_io.in.owners        = update_in->partners;
355         talloc_steal(update_state, update_in->partners);
356         update_state->cycle_io.in.wreplconn     = wrepl_out;
357         update_state->creq = wreplsrv_pull_cycle_send(update_state, &update_state->cycle_io);
358         if (!update_state->creq) {
359                 return NT_STATUS_INTERNAL_ERROR;
360         }
361
362         update_state->creq->async.fn            = wreplsrv_in_update_handler;
363         update_state->creq->async.private_data  = update_state;
364
365         return ERROR_INVALID_PARAMETER;
366 }
367
368 static NTSTATUS wreplsrv_in_update2(struct wreplsrv_in_call *call)
369 {
370         return wreplsrv_in_update(call);
371 }
372
373 static NTSTATUS wreplsrv_in_inform(struct wreplsrv_in_call *call)
374 {
375         struct wrepl_table *inform_in = &call->req_packet.message.replication.info.table;
376
377         DEBUG(2,("WREPL_REPL_INFORM: partner[%s] initiator[%s] num_owners[%u]\n",
378                 call->wreplconn->partner->address,
379                 inform_in->initiator, inform_in->partner_count));
380
381         wreplsrv_out_partner_pull(call->wreplconn->partner, inform_in);
382
383         /* we don't reply to WREPL_REPL_INFORM messages */
384         return ERROR_INVALID_PARAMETER;
385 }
386
387 static NTSTATUS wreplsrv_in_inform2(struct wreplsrv_in_call *call)
388 {
389         return wreplsrv_in_inform(call);
390 }
391
392 static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
393 {
394         struct wrepl_replication *repl_in = &call->req_packet.message.replication;
395         NTSTATUS status;
396
397         /*
398          * w2k only check the assoc_ctx if the opcode has the 0x00007800 bits are set
399          */
400         if (call->req_packet.opcode & WREPL_OPCODE_BITS) {
401                 /*
402                  *if the assoc_ctx doesn't match ignore the packet
403                  */
404                 if (call->req_packet.assoc_ctx != call->wreplconn->assoc_ctx.our_ctx) {
405                         return ERROR_INVALID_PARAMETER;
406                 }
407         }
408
409         if (!call->wreplconn->partner) {
410                 return wreplsrv_in_stop_assoc_ctx(call);
411         }
412
413         switch (repl_in->command) {
414                 case WREPL_REPL_TABLE_QUERY:
415                         if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PUSH)) {
416                                 return wreplsrv_in_stop_assoc_ctx(call);
417                         }
418                         status = wreplsrv_in_table_query(call);
419                         break;
420
421                 case WREPL_REPL_TABLE_REPLY:
422                         return ERROR_INVALID_PARAMETER;
423
424                 case WREPL_REPL_SEND_REQUEST:
425                         if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PUSH)) {
426                                 return wreplsrv_in_stop_assoc_ctx(call);
427                         }
428                         status = wreplsrv_in_send_request(call);
429                         break;
430
431                 case WREPL_REPL_SEND_REPLY:
432                         return ERROR_INVALID_PARAMETER;
433         
434                 case WREPL_REPL_UPDATE:
435                         if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
436                                 return wreplsrv_in_stop_assoc_ctx(call);
437                         }
438                         status = wreplsrv_in_update(call);
439                         break;
440
441                 case WREPL_REPL_UPDATE2:
442                         if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
443                                 return wreplsrv_in_stop_assoc_ctx(call);
444                         }
445                         status = wreplsrv_in_update2(call);
446                         break;
447
448                 case WREPL_REPL_INFORM:
449                         if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
450                                 return wreplsrv_in_stop_assoc_ctx(call);
451                         }
452                         status = wreplsrv_in_inform(call);
453                         break;
454
455                 case WREPL_REPL_INFORM2:
456                         if (!(call->wreplconn->partner->type & WINSREPL_PARTNER_PULL)) {
457                                 return wreplsrv_in_stop_assoc_ctx(call);
458                         }
459                         status = wreplsrv_in_inform2(call);
460                         break;
461
462                 default:
463                         return ERROR_INVALID_PARAMETER;
464         }
465
466         if (NT_STATUS_IS_OK(status)) {
467                 call->rep_packet.mess_type = WREPL_REPLICATION;
468         }
469
470         return status;
471 }
472
473 static NTSTATUS wreplsrv_in_invalid_assoc_ctx(struct wreplsrv_in_call *call)
474 {
475         struct wrepl_start *start       = &call->rep_packet.message.start;
476
477         call->rep_packet.opcode         = 0x00008583;
478         call->rep_packet.assoc_ctx      = 0;
479         call->rep_packet.mess_type      = WREPL_START_ASSOCIATION;
480
481         start->assoc_ctx                = 0x0000000a;
482         start->minor_version            = 0x0001;
483         start->major_version            = 0x0000;
484
485         call->rep_packet.padding        = data_blob_talloc(call, NULL, 4);
486         memset(call->rep_packet.padding.data, '\0', call->rep_packet.padding.length);
487
488         return NT_STATUS_OK;
489 }
490
491 NTSTATUS wreplsrv_in_call(struct wreplsrv_in_call *call)
492 {
493         NTSTATUS status;
494
495         if (!(call->req_packet.opcode & WREPL_OPCODE_BITS)
496             && (call->wreplconn->assoc_ctx.our_ctx == WREPLSRV_INVALID_ASSOC_CTX)) {
497                 return wreplsrv_in_invalid_assoc_ctx(call);
498         }
499
500         switch (call->req_packet.mess_type) {
501                 case WREPL_START_ASSOCIATION:
502                         status = wreplsrv_in_start_association(call);
503                         break;
504                 case WREPL_START_ASSOCIATION_REPLY:
505                         /* this is not valid here, so we ignore it */
506                         return ERROR_INVALID_PARAMETER;
507
508                 case WREPL_STOP_ASSOCIATION:
509                         status = wreplsrv_in_stop_association(call);
510                         break;
511
512                 case WREPL_REPLICATION:
513                         status = wreplsrv_in_replication(call);
514                         break;
515                 default:
516                         /* everythingelse is also not valid here, so we ignore it */
517                         return ERROR_INVALID_PARAMETER;
518         }
519
520         if (call->wreplconn->assoc_ctx.our_ctx == WREPLSRV_INVALID_ASSOC_CTX) {
521                 return wreplsrv_in_invalid_assoc_ctx(call);
522         }
523
524         if (NT_STATUS_IS_OK(status)) {
525                 /* let the backend to set some of the opcode bits, but always add the standards */
526                 call->rep_packet.opcode         |= WREPL_OPCODE_BITS;
527                 call->rep_packet.assoc_ctx      = call->wreplconn->assoc_ctx.peer_ctx;
528         }
529
530         return status;
531 }