ctdb-common: Add line based I/O
authorAmitay Isaacs <amitay@gmail.com>
Wed, 18 Jul 2018 08:42:10 +0000 (18:42 +1000)
committerMartin Schwenke <martins@samba.org>
Sat, 28 Jul 2018 01:50:11 +0000 (03:50 +0200)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=13520

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
ctdb/common/line.c [new file with mode: 0644]
ctdb/common/line.h [new file with mode: 0644]
ctdb/tests/cunit/line_test_001.sh [new file with mode: 0755]
ctdb/tests/src/line_test.c [new file with mode: 0644]
ctdb/wscript

diff --git a/ctdb/common/line.c b/ctdb/common/line.c
new file mode 100644 (file)
index 0000000..c4c6726
--- /dev/null
@@ -0,0 +1,145 @@
+/*
+   Line based I/O over fds
+
+   Copyright (C) Amitay Isaacs  2018
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+
+#include <talloc.h>
+
+#include "lib/util/sys_rw.h"
+
+#include "common/line.h"
+
+struct line_read_state {
+       line_process_fn_t callback;
+       void *private_data;
+       char *buf;
+       size_t hint, len, offset;
+       int num_lines;
+};
+
+static bool line_read_one(char *buf, size_t start, size_t len, size_t *pos)
+{
+       size_t i;
+
+       for (i=start; i<len; i++) {
+               if (buf[i] == '\n' || buf[i] == '\0') {
+                       *pos = i;
+                       return true;
+               }
+       }
+
+       return false;
+}
+
+static int line_read_process(struct line_read_state *state)
+{
+       size_t start = 0;
+       size_t pos = 0;
+
+       while (1) {
+               int ret;
+               bool ok;
+
+               ok = line_read_one(state->buf, start, state->offset, &pos);
+               if (! ok) {
+                       break;
+               }
+
+               state->buf[pos] = '\0';
+               state->num_lines += 1;
+
+               ret = state->callback(state->buf + start, state->private_data);
+               if (ret != 0) {
+                       return ret;
+               }
+
+               start = pos+1;
+       }
+
+       if (pos > 0) {
+               if (pos+1 < state->offset) {
+                       memmove(state->buf,
+                               state->buf + pos+1,
+                               state->offset - (pos+1));
+               }
+               state->offset -= (pos+1);
+       }
+
+       return 0;
+}
+
+int line_read(int fd,
+             size_t length,
+             TALLOC_CTX *mem_ctx,
+             line_process_fn_t callback,
+             void *private_data,
+             int *num_lines)
+{
+       struct line_read_state state;
+
+       if (length < 32) {
+               length = 32;
+       }
+
+       state = (struct line_read_state) {
+               .callback = callback,
+               .private_data = private_data,
+               .hint = length,
+       };
+
+       while (1) {
+               ssize_t n;
+               int ret;
+
+               if (state.offset == state.len) {
+                       state.len += state.hint;
+                       state.buf = talloc_realloc_size(mem_ctx,
+                                                       state.buf,
+                                                       state.len);
+                       if (state.buf == NULL) {
+                               return ENOMEM;
+                       }
+               }
+
+               n = sys_read(fd,
+                            state.buf + state.offset,
+                            state.len - state.offset);
+               if (n < 0) {
+                       return errno;
+               }
+               if (n == 0) {
+                       break;
+               }
+
+               state.offset += n;
+
+               ret = line_read_process(&state);
+               if (ret != 0) {
+                       if (num_lines != NULL) {
+                               *num_lines = state.num_lines;
+                       }
+                       return ret;
+               }
+       }
+
+       if (num_lines != NULL) {
+               *num_lines = state.num_lines;
+       }
+       return 0;
+}
diff --git a/ctdb/common/line.h b/ctdb/common/line.h
new file mode 100644 (file)
index 0000000..6b67f1e
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+   Line based I/O over fds
+
+   Copyright (C) Amitay Isaacs  2018
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __CTDB_LINE_H__
+#define __CTDB_LINE_H__
+
+#include <talloc.h>
+
+/**
+ * @file line.h
+ *
+ * @brief Line based I/O over pipes and sockets
+ */
+
+/**
+ * @brief The callback routine called to process a line
+ *
+ * @param[in]  line The line read
+ * @param[in]  private_data Private data for callback
+ * @return 0 to continue processing lines, non-zero to stop reading
+ */
+typedef int (*line_process_fn_t)(char *line, void *private_data);
+
+/**
+ * @brief Read a line (terminated by \n or \0)
+ *
+ * If there is any read error on fd, then errno will be returned.
+ * If callback function returns a non-zero value, then that value will be
+ * returned.
+ *
+ * @param[in]  fd The file descriptor
+ * @param[in]  length The expected length of a line (this is only a hint)
+ * @param[in]  mem_ctx Talloc memory context
+ * @param[in]  callback Callback function called when a line is read
+ * @param[in]  private_data Private data for callback
+ * @param[out] num_lines Number of lines read so far
+ * @return 0 on on success, errno on failure
+ */
+int line_read(int fd,
+             size_t length,
+             TALLOC_CTX *mem_ctx,
+             line_process_fn_t callback,
+             void *private_data,
+             int *num_lines);
+
+#endif /* __CTDB_LINE_H__ */
diff --git a/ctdb/tests/cunit/line_test_001.sh b/ctdb/tests/cunit/line_test_001.sh
new file mode 100755 (executable)
index 0000000..991d01a
--- /dev/null
@@ -0,0 +1,90 @@
+#!/bin/sh
+
+. "${TEST_SCRIPTS_DIR}/unit.sh"
+
+tfile="${TEST_VAR_DIR}/line.$$"
+
+remove_files ()
+{
+       rm -f "$tfile"
+}
+
+test_cleanup remove_files
+
+> "$tfile"
+
+ok_null
+unit_test line_test "$tfile"
+
+printf "\0" > "$tfile"
+
+required_result 1 <<EOF
+
+EOF
+
+unit_test line_test "$tfile"
+
+echo -n "hello" > "$tfile"
+
+ok_null
+unit_test line_test "$tfile"
+
+cat <<EOF > "$tfile"
+hello
+world
+EOF
+
+required_result 2 << EOF
+hello
+world
+EOF
+unit_test line_test "$tfile"
+
+required_result 2 << EOF
+hello
+world
+EOF
+unit_test line_test "$tfile"
+
+cat <<EOF > "$tfile"
+This is a really long long line full of random words and hopefully it will be read properly by the line test program and identified as a single line
+EOF
+
+required_result 1 <<EOF
+This is a really long long line full of random words and hopefully it will be read properly by the line test program and identified as a single line
+EOF
+unit_test line_test "$tfile"
+
+cat <<EOF > "$tfile"
+line number one
+line number two
+line number one
+line number two
+line number one
+EOF
+
+required_result 5 <<EOF
+line number one
+line number two
+line number one
+line number two
+line number one
+EOF
+unit_test line_test "$tfile" 64
+
+cat <<EOF > "$tfile"
+this is line number one
+this is line number two
+this is line number three
+this is line number four
+this is line number five
+EOF
+
+required_result 5 <<EOF
+this is line number one
+this is line number two
+this is line number three
+this is line number four
+this is line number five
+EOF
+unit_test line_test "$tfile" 64
diff --git a/ctdb/tests/src/line_test.c b/ctdb/tests/src/line_test.c
new file mode 100644 (file)
index 0000000..0c5a821
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+   Test code for line based I/O over fds
+
+   Copyright (C) Amitay Isaacs  2018
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+
+#include <talloc.h>
+#include <assert.h>
+
+#include "common/line.c"
+
+static int line_print(char *line, void *private_data)
+{
+       printf("%s\n", line);
+       fflush(stdout);
+
+       return 0;
+}
+
+int main(int argc, const char **argv)
+{
+       TALLOC_CTX *mem_ctx;
+       size_t hint = 32;
+       pid_t pid;
+       int ret, lines = 0;
+       int pipefd[2];
+
+       if (argc < 2 || argc > 3) {
+               fprintf(stderr, "Usage: %s <filename> [<hint>]\n", argv[0]);
+               exit(1);
+       }
+
+       if (argc == 3) {
+               long value;
+
+               value = atol(argv[2]);
+               assert(value > 0);
+               hint = value;
+       }
+
+       ret = pipe(pipefd);
+       assert(ret == 0);
+
+       pid = fork();
+       assert(pid != -1);
+
+       if (pid == 0) {
+               char buffer[16];
+               ssize_t n, n2;
+               int fd;
+
+               close(pipefd[0]);
+
+               fd = open(argv[1], O_RDONLY);
+               assert(fd != -1);
+
+               while (1) {
+                       n = read(fd, buffer, sizeof(buffer));
+                       assert(n >= 0 && n <= sizeof(buffer));
+
+                       if (n == 0) {
+                               break;
+                       }
+
+                       n2 = write(pipefd[1], buffer, n);
+                       assert(n2 == n);
+               }
+
+               close(pipefd[1]);
+               close(fd);
+
+               exit(0);
+       }
+
+       close(pipefd[1]);
+
+       mem_ctx = talloc_new(NULL);
+       assert(mem_ctx != NULL);
+
+       ret = line_read(pipefd[0], hint, NULL, line_print, NULL, &lines);
+       assert(ret == 0);
+
+       talloc_free(mem_ctx);
+
+       return lines;
+}
index c26bd8c0d9a29838f56508d13bd5fc438fa16c5d..6d69545b6aa505257c3bc61f01168860768af285 100644 (file)
@@ -404,7 +404,7 @@ def build(bld):
                                              pidfile.c run_proc.c
                                              hash_count.c run_event.c
                                              sock_client.c version.c
-                                             cmdline.c path.c conf.c
+                                             cmdline.c path.c conf.c line.c
                                           '''),
                         deps='''samba-util sys_rw tevent-util
                                 replace talloc tevent tdb popt''')
@@ -868,6 +868,7 @@ def build(bld):
         'run_event_test',
         'cmdline_test',
         'conf_test',
+        'line_test',
     ]
 
     for target in ctdb_unit_tests: