/* 
   Unix SMB/CIFS implementation.
   
   WINS Replication server
   
   Copyright (C) Stefan Metzmacher      2005
   
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/

#include "includes.h"
#include "dlinklist.h"
#include "lib/events/events.h"
#include "lib/socket/socket.h"
#include "smbd/service_task.h"
#include "smbd/service_stream.h"
#include "lib/messaging/irpc.h"
#include "librpc/gen_ndr/ndr_winsrepl.h"
#include "wrepl_server/wrepl_server.h"
#include "nbt_server/wins/winsdb.h"
#include "ldb/include/ldb.h"
#include "libcli/composite/composite.h"
#include "libcli/wrepl/winsrepl.h"
#include "wrepl_server/wrepl_out_helpers.h"

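/* forward declaration: the pull completion handler below re-arms the timer
   handler and the timer handler hands completion back to it */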
static void wreplsrv_pull_handler_te(struct event_context *ev, struct timed_event *te,
				     struct timeval t, void *ptr);

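/*
  completion handler for a pull cycle: record the result, free the cycle
  state and schedule the next run. On success the next pull happens after
  the configured pull interval and the error count is reset; on failure the
  delay grows with the error count (error_count * retry_interval), capped
  at the pull interval.
*/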
static void wreplsrv_pull_handler_creq(struct composite_context *creq)
{
	struct wreplsrv_partner *partner = talloc_get_type(creq->async.private_data, struct wreplsrv_partner);
	uint32_t interval;

	partner->pull.last_status = wreplsrv_pull_cycle_recv(partner->pull.creq);
	partner->pull.creq = NULL;
	talloc_free(partner->pull.cycle_io);
	partner->pull.cycle_io = NULL;

	if (!NT_STATUS_IS_OK(partner->pull.last_status)) {
		interval = partner->pull.error_count * partner->pull.retry_interval;
		interval = MIN(interval, partner->pull.interval);
		partner->pull.error_count++;

		DEBUG(1,("wreplsrv_pull_cycle(%s): %s: next: %us\n",
			 partner->address, nt_errstr(partner->pull.last_status),
			 interval));
	} else {
		interval = partner->pull.interval;
		partner->pull.error_count = 0;

		DEBUG(2,("wreplsrv_pull_cycle(%s): %s: next: %us\n",
			 partner->address, nt_errstr(partner->pull.last_status),
			 interval));
	}

	partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner,
					   timeval_current_ofs(interval, 0),
					   wreplsrv_pull_handler_te, partner);
	if (!partner->pull.te) {
		DEBUG(0,("wreplsrv_pull_handler_creq: event_add_timed() failed! no memory!\n"));
	}
}

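/*
  timer handler: start a pull cycle against the partner with no explicit
  owner list and hand completion to wreplsrv_pull_handler_creq. If the
  cycle cannot be started, requeue the timer after the retry interval.
*/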
static void wreplsrv_pull_handler_te(struct event_context *ev, struct timed_event *te,
				     struct timeval t, void *ptr)
{
	struct wreplsrv_partner *partner = talloc_get_type(ptr, struct wreplsrv_partner);

	partner->pull.te = NULL;

	partner->pull.cycle_io = talloc(partner, struct wreplsrv_pull_cycle_io);
	if (!partner->pull.cycle_io) {
		goto requeue;
	}

	partner->pull.cycle_io->in.partner	= partner;
	partner->pull.cycle_io->in.num_owners	= 0;
	partner->pull.cycle_io->in.owners	= NULL;
	partner->pull.cycle_io->in.wreplconn	= NULL;
	partner->pull.creq = wreplsrv_pull_cycle_send(partner->pull.cycle_io, partner->pull.cycle_io);
	if (!partner->pull.creq) {
		DEBUG(1,("wreplsrv_pull_cycle_send(%s) failed\n",
			 partner->address));
		goto requeue;
	}

	partner->pull.creq->async.fn		= wreplsrv_pull_handler_creq;
	partner->pull.creq->async.private_data	= partner;

	return;
requeue:
	talloc_free(partner->pull.cycle_io);
	partner->pull.cycle_io = NULL;
	/* retry later */
	partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner,
					   timeval_add(&t, partner->pull.retry_interval, 0),
					   wreplsrv_pull_handler_te, partner);
	if (!partner->pull.te) {
		DEBUG(0,("wreplsrv_pull_handler_te: event_add_timed() failed! no memory!\n"));
	}
}

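/*
  schedule an immediate pull in response to a partner's inform message:
  if a pull is already in progress we are done, otherwise cancel the
  scheduled periodic pull and start a pull cycle restricted to the owners
  listed in inform_in.
*/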
NTSTATUS wreplsrv_sched_inform_action(struct wreplsrv_partner *partner, struct wrepl_table *inform_in)
{
	if (partner->pull.creq) {
		/* there's already a pull in progress, so we're done */
		return NT_STATUS_OK;
	}

	/* remove the scheduled pull */
	talloc_free(partner->pull.te);
	partner->pull.te = NULL;

	partner->pull.cycle_io = talloc(partner, struct wreplsrv_pull_cycle_io);
	if (!partner->pull.cycle_io) {
		goto requeue;
	}

	partner->pull.cycle_io->in.partner	= partner;
	partner->pull.cycle_io->in.num_owners	= inform_in->partner_count;
	partner->pull.cycle_io->in.owners	= inform_in->partners;
	talloc_steal(partner->pull.cycle_io, inform_in->partners);
	partner->pull.cycle_io->in.wreplconn	= NULL;
	partner->pull.creq = wreplsrv_pull_cycle_send(partner->pull.cycle_io, partner->pull.cycle_io);
	if (!partner->pull.creq) {
		DEBUG(1,("wreplsrv_pull_cycle_send(%s) failed\n",
			 partner->address));
		goto requeue;
	}

	partner->pull.creq->async.fn		= wreplsrv_pull_handler_creq;
	partner->pull.creq->async.private_data	= partner;

	return NT_STATUS_OK;
requeue:
	talloc_free(partner->pull.cycle_io);
	partner->pull.cycle_io = NULL;
	/* retry later */
	partner->pull.te = event_add_timed(partner->service->task->event_ctx, partner,
					   timeval_current_ofs(partner->pull.retry_interval, 0),
					   wreplsrv_pull_handler_te, partner);
	if (!partner->pull.te) {
		DEBUG(0,("wreplsrv_pull_handler_te: event_add_timed() failed! no memory!\n"));
	}

	return NT_STATUS_OK;
}

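/*
  presumably called during service startup: arm an immediate pull timer
  (timeval_zero()) for every partner configured as a pull partner with a
  non-zero pull interval.
*/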
NTSTATUS wreplsrv_setup_out_connections(struct wreplsrv_service *service)
{
	struct wreplsrv_partner *cur;

	for (cur = service->partners; cur; cur = cur->next) {
		if ((cur->type & WINSREPL_PARTNER_PULL) && cur->pull.interval) {
			cur->pull.te = event_add_timed(service->task->event_ctx, cur,
						       timeval_zero(), wreplsrv_pull_handler_te, cur);
			NT_STATUS_HAVE_NO_MEMORY(cur->pull.te);
		}
	}

	return NT_STATUS_OK;
}