#include "common/logging.h"
#include "common/common.h"
-#define QUEUE_BUFFER_SIZE (16*1024)
-
/* structures for packet queueing - see common/ctdb_io.c */
struct ctdb_buffer {
uint8_t *data;
ctdb_queue_cb_fn_t callback;
bool *destroyed;
const char *name;
+ uint32_t buffer_size;
};
}
if (queue->buffer.length < pkt_size) {
- if (pkt_size > QUEUE_BUFFER_SIZE) {
+ if (pkt_size > queue->buffer_size) {
queue->buffer.extend = pkt_size;
}
return;
tevent_schedule_immediate(queue->im, queue->ctdb->ev,
queue_process_event, queue);
} else {
- if (queue->buffer.size > QUEUE_BUFFER_SIZE) {
+ if (queue->buffer.size > queue->buffer_size) {
TALLOC_FREE(queue->buffer.data);
queue->buffer.size = 0;
}
if (queue->buffer.data == NULL) {
/* starting fresh, allocate buf to read data */
- queue->buffer.data = talloc_size(queue, QUEUE_BUFFER_SIZE);
+ queue->buffer.data = talloc_size(queue, queue->buffer_size);
if (queue->buffer.data == NULL) {
DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
goto failed;
}
- queue->buffer.size = QUEUE_BUFFER_SIZE;
+ queue->buffer.size = queue->buffer_size;
} else if (queue->buffer.extend > 0) {
/* extending buffer */
data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
}
talloc_set_destructor(queue, queue_destructor);
+ queue->buffer_size = ctdb->tunable.queue_buffer_size;
+ /* In client code, ctdb->tunable is not initialized.
+ * This does not affect recovery daemon.
+ */
+ if (queue->buffer_size == 0) {
+ queue->buffer_size = 1024;
+ }
+
return queue;
}
offsetof(struct ctdb_tunable_list, lock_processes_per_db) },
{ "RecBufferSizeLimit", 1000*1000, false,
offsetof(struct ctdb_tunable_list, rec_buffer_size_limit) },
+ { "QueueBufferSize", 16*1024, false,
+ offsetof(struct ctdb_tunable_list, queue_buffer_size) },
{ NULL, 0, true, }
};