return lowest_lsn;
}
+/*
+ * Keep processing entries in the iclog callback list until we come around and
+ * it is empty. We need to atomically see that the list is empty and change the
+ * state to DIRTY so that we don't miss any more callbacks being added.
+ *
+ * This function is called with the icloglock held and returns with it held. We
+ * drop it while running callbacks, however, as holding it over thousands of
+ * callbacks is unnecessary and causes excessive contention if we do.
+ */
+static void
+xlog_state_do_iclog_callbacks(
+ struct xlog *log,
+ struct xlog_in_core *iclog,
+ bool aborted)
+{
+ spin_unlock(&log->l_icloglock);
+ spin_lock(&iclog->ic_callback_lock);
+ while (!list_empty(&iclog->ic_callbacks)) {
+ LIST_HEAD(tmp);
+
+ list_splice_init(&iclog->ic_callbacks, &tmp);
+
+ spin_unlock(&iclog->ic_callback_lock);
+ xlog_cil_process_committed(&tmp, aborted);
+ spin_lock(&iclog->ic_callback_lock);
+ }
+
+ /*
+ * Pick up the icloglock while still holding the callback lock so we
+ * serialise against anyone trying to add more callbacks to this iclog
+ * now we've finished processing.
+ */
+ spin_lock(&log->l_icloglock);
+ spin_unlock(&iclog->ic_callback_lock);
+}
+
#ifdef DEBUG
/*
* Make one last gasp attempt to see if iclogs are being left in limbo. If the
int flushcnt = 0;
xfs_lsn_t lowest_lsn;
int ioerrors; /* counter: iclogs with errors */
- int loopdidcallbacks; /* flag: inner loop did callbacks*/
- int funcdidcallbacks; /* flag: function did callbacks */
+ bool cycled_icloglock;
+ bool did_callbacks;
int repeats; /* for issuing console warnings if
* looping too many times */
spin_lock(&log->l_icloglock);
first_iclog = iclog = log->l_iclog;
ioerrors = 0;
- funcdidcallbacks = 0;
+ did_callbacks = 0;
repeats = 0;
do {
*/
first_iclog = log->l_iclog;
iclog = log->l_iclog;
- loopdidcallbacks = 0;
+ cycled_icloglock = false;
repeats++;
do {
} else
ioerrors++;
- spin_unlock(&log->l_icloglock);
-
/*
- * Keep processing entries in the callback list until
- * we come around and it is empty. We need to
- * atomically see that the list is empty and change the
- * state to DIRTY so that we don't miss any more
- * callbacks being added.
+ * Running callbacks will drop the icloglock which means
+ * we'll have to run at least one more complete loop.
*/
- spin_lock(&iclog->ic_callback_lock);
- while (!list_empty(&iclog->ic_callbacks)) {
- LIST_HEAD(tmp);
+ cycled_icloglock = true;
+ xlog_state_do_iclog_callbacks(log, iclog, aborted);
- list_splice_init(&iclog->ic_callbacks, &tmp);
-
- spin_unlock(&iclog->ic_callback_lock);
- xlog_cil_process_committed(&tmp, aborted);
- spin_lock(&iclog->ic_callback_lock);
- }
-
- loopdidcallbacks++;
- funcdidcallbacks++;
-
- spin_lock(&log->l_icloglock);
- spin_unlock(&iclog->ic_callback_lock);
if (!(iclog->ic_state & XLOG_STATE_IOERROR))
iclog->ic_state = XLOG_STATE_DIRTY;
iclog = iclog->ic_next;
} while (first_iclog != iclog);
+ did_callbacks |= cycled_icloglock;
+
if (repeats > 5000) {
flushcnt += repeats;
repeats = 0;
"%s: possible infinite loop (%d iterations)",
__func__, flushcnt);
}
- } while (!ioerrors && loopdidcallbacks);
+ } while (!ioerrors && cycled_icloglock);
- if (funcdidcallbacks)
+ if (did_callbacks)
xlog_state_callback_check_state(log);
if (log->l_iclog->ic_state & (XLOG_STATE_ACTIVE|XLOG_STATE_IOERROR))