 	}
 	for (ev = tevent_contexts; ev != NULL; ev = ev->next) {
+		struct tevent_threaded_context *tctx;
+
+		for (tctx = ev->threaded_contexts; tctx != NULL;
+		     tctx = tctx->next) {
+			ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+			if (ret != 0) {
+				tevent_abort(ev, "pthread_mutex_lock failed");
+			}
+		}
+
 		ret = pthread_mutex_lock(&ev->scheduled_mutex);
 		if (ret != 0) {
 			tevent_abort(ev, "pthread_mutex_lock failed");
@@ ... @@ static void tevent_atfork_parent(void)
 	for (ev = DLIST_TAIL(tevent_contexts); ev != NULL;
 	     ev = DLIST_PREV(ev)) {
+		struct tevent_threaded_context *tctx;
+
 		ret = pthread_mutex_unlock(&ev->scheduled_mutex);
 		if (ret != 0) {
 			tevent_abort(ev, "pthread_mutex_unlock failed");
 		}
+
+		for (tctx = DLIST_TAIL(ev->threaded_contexts); tctx != NULL;
+		     tctx = DLIST_PREV(tctx)) {
+			ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+			if (ret != 0) {
+				tevent_abort(
+					ev, "pthread_mutex_unlock failed");
+			}
+		}
 	}
 	ret = pthread_mutex_unlock(&tevent_contexts_mutex);
@@ ... @@ static void tevent_atfork_child(void)
 	     ev = DLIST_PREV(ev)) {
 		struct tevent_threaded_context *tctx;
-		for (tctx = ev->threaded_contexts; tctx != NULL;
-		     tctx = tctx->next) {
+		for (tctx = DLIST_TAIL(ev->threaded_contexts); tctx != NULL;
+		     tctx = DLIST_PREV(tctx)) {
 			tctx->event_ctx = NULL;
+
+			ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+			if (ret != 0) {
+				tevent_abort(
+					ev, "pthread_mutex_unlock failed");
+			}
 		}
 		ev->threaded_contexts = NULL;
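
The three hunks above extend what appear to be tevent's fork handlers: the prepare step now also locks every threaded context's event_ctx_mutex (in addition to the per-context scheduled_mutex), the parent step unlocks everything again in reverse order, and the child step clears the per-context pointers while unlocking, presumably because only the forking thread survives in the child. For readers less familiar with pthread_atfork(), here is a minimal standalone sketch of that prepare/parent/child discipline; big_lock and the handler names are illustrative and not part of the patch:

	#include <pthread.h>
	#include <stdlib.h>
	#include <sys/wait.h>
	#include <unistd.h>

	/* One global lock standing in for the mutexes the patch touches. */
	static pthread_mutex_t big_lock = PTHREAD_MUTEX_INITIALIZER;

	/* Runs in the parent right before fork(): take the lock so the
	 * child cannot inherit it in a half-locked, owner-less state. */
	static void prepare(void)
	{
		if (pthread_mutex_lock(&big_lock) != 0) {
			abort();
		}
	}

	/* Runs in the parent after fork(): simply drop the lock again. */
	static void parent(void)
	{
		if (pthread_mutex_unlock(&big_lock) != 0) {
			abort();
		}
	}

	/* Runs in the child after fork(): the forking thread took the lock
	 * in prepare() and is the only thread left here, so release it. */
	static void child(void)
	{
		if (pthread_mutex_unlock(&big_lock) != 0) {
			abort();
		}
	}

	int main(void)
	{
		pid_t pid;

		if (pthread_atfork(prepare, parent, child) != 0) {
			abort();
		}

		pid = fork();
		if (pid == 0) {
			_exit(0);		/* child */
		}
		waitpid(pid, NULL, 0);		/* parent */
		return 0;
	}
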
@@ ... @@ static int tevent_common_context_destructor(struct tevent_context *ev)
 	if (ret != 0) {
 		abort();
 	}
-#endif
-	if (ev->threaded_contexts != NULL) {
+	while (ev->threaded_contexts != NULL) {
+		struct tevent_threaded_context *tctx = ev->threaded_contexts;
+
+		ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+		if (ret != 0) {
+			abort();
+		}
+
 		/*
-		 * Threaded contexts are indicators that threads are
-		 * about to send us immediates via
-		 * tevent_threaded_schedule_immediate. The caller
-		 * needs to make sure that the tevent context lives
-		 * long enough to receive immediates from all threads.
+		 * Indicate to the thread that the tevent_context is
+		 * gone. The counterpart of this is in
+		 * _tevent_threaded_schedule_immediate, there we read
+		 * this under the threaded_context's mutex.
 		 */
-		tevent_abort(ev, "threaded contexts exist");
+
+		tctx->event_ctx = NULL;
+
+		ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+		if (ret != 0) {
+			abort();
+		}
+
+		DLIST_REMOVE(ev->threaded_contexts, tctx);
 	}
+#endif
 	tevent_common_wakeup_fini(ev);
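
With this hunk the tevent_context destructor no longer aborts when threaded contexts are still registered. Instead it walks the list and, holding each context's event_ctx_mutex, sets tctx->event_ctx to NULL; the scheduling side (see the _tevent_threaded_schedule_immediate hunks further down) re-reads that pointer under the same mutex and returns early when it finds NULL. A stripped-down sketch of that handshake, using hypothetical owner/handle names instead of the tevent types:

	#include <pthread.h>
	#include <stdio.h>
	#include <stdlib.h>

	/* Stand-ins for tevent_context / tevent_threaded_context. */
	struct owner {
		int id;
	};

	struct handle {
		pthread_mutex_t mutex;	/* plays the role of event_ctx_mutex */
		struct owner *owner;	/* plays the role of tctx->event_ctx */
	};

	/* Destructor side: announce, under the mutex, that the owner is gone. */
	static void handle_detach(struct handle *h)
	{
		if (pthread_mutex_lock(&h->mutex) != 0) {
			abort();
		}
		h->owner = NULL;
		if (pthread_mutex_unlock(&h->mutex) != 0) {
			abort();
		}
	}

	/* Scheduling side: take a snapshot of the pointer under the same mutex. */
	static struct owner *handle_get_owner(struct handle *h)
	{
		struct owner *o;

		if (pthread_mutex_lock(&h->mutex) != 0) {
			abort();
		}
		o = h->owner;
		if (pthread_mutex_unlock(&h->mutex) != 0) {
			abort();
		}
		return o;
	}

	static struct owner the_owner = { .id = 1 };

	static struct handle h = {
		.mutex = PTHREAD_MUTEX_INITIALIZER,
		.owner = &the_owner,
	};

	int main(void)
	{
		handle_detach(&h);

		if (handle_get_owner(&h) == NULL) {
			printf("owner already gone, drop the request\n");
		}
		return 0;
	}

As in the patch, the reading side only takes a snapshot under the mutex and drops the lock again before acting on it.
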
@@ ... @@
 static int tevent_threaded_context_destructor(
 	struct tevent_threaded_context *tctx)
 {
+	int ret;
+
 	if (tctx->event_ctx != NULL) {
 		DLIST_REMOVE(tctx->event_ctx->threaded_contexts, tctx);
 	}
+
+	ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
+	if (ret != 0) {
+		abort();
+	}
+
 	return 0;
 }
@@ ... @@ struct tevent_threaded_context *tevent_threaded_context_create(
 		return NULL;
 	}
 	tctx->event_ctx = ev;
+	tctx->wakeup_fd = ev->wakeup_fd;
+
+	ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
+	if (ret != 0) {
+		TALLOC_FREE(tctx);
+		return NULL;
+	}
 	DLIST_ADD(ev->threaded_contexts, tctx);
 	talloc_set_destructor(tctx, tevent_threaded_context_destructor);
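
tevent_threaded_context_create() now also caches ev->wakeup_fd in the threaded context. That is what lets the final hunk below call tevent_common_wakeup_fd(tctx->wakeup_fd) instead of tevent_common_wakeup(ev): to make the main loop's poll return, the helper thread only needs the file descriptor, not the event context itself. A rough standalone illustration of waking a poll loop through such a cached descriptor; the pipe and the signaller() helper are invented for the example and do not claim to match tevent's internal wakeup implementation:

	#include <poll.h>
	#include <pthread.h>
	#include <stdio.h>
	#include <stdlib.h>
	#include <unistd.h>

	/* Helper thread: it only ever holds the cached fd, never the loop. */
	static void *signaller(void *arg)
	{
		int wakeup_fd = *(int *)arg;
		char c = 0;

		/* Same spirit as tevent_common_wakeup_fd(): one write is
		 * enough to make the main thread's poll() return. */
		if (write(wakeup_fd, &c, 1) != 1) {
			abort();
		}
		return NULL;
	}

	int main(void)
	{
		int fds[2];
		pthread_t t;
		struct pollfd pfd;
		char buf[16];

		if (pipe(fds) != 0) {
			abort();
		}
		if (pthread_create(&t, NULL, signaller, &fds[1]) != 0) {
			abort();
		}

		pfd = (struct pollfd) { .fd = fds[0], .events = POLLIN };
		if (poll(&pfd, 1, -1) != 1) {
			abort();
		}

		/* Drain the wakeup; a real loop would now run the immediates
		 * that the other thread scheduled before waking us. */
		(void)read(fds[0], buf, sizeof(buf));

		pthread_join(t, NULL);
		printf("woken up\n");
		return 0;
	}
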
@@ ... @@
 					 const char *location)
 {
 #ifdef HAVE_PTHREAD
-	struct tevent_context *ev = tctx->event_ctx;
+	struct tevent_context *ev;
 	int ret;
+	ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+	if (ret != 0) {
+		abort();
+	}
+
+	ev = tctx->event_ctx;
+
+	ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+	if (ret != 0) {
+		abort();
+	}
+
+	if (ev == NULL) {
+		/*
+		 * Our event context is already gone.
+		 */
+		return;
+	}
+
 	if ((im->event_ctx != NULL) || (handler == NULL)) {
 		abort();
 	}
@@ ... @@ void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
 	 * than a noncontended one. So I'd opt for the lower footprint
 	 * initially. Maybe we have to change that later.
 	 */
-	tevent_common_wakeup(ev);
+	tevent_common_wakeup_fd(tctx->wakeup_fd);
 #else
 	/*
 	 * tevent_threaded_context_create() returned NULL with ENOSYS...