Linux-EROFS Archive mirror
 help / color / mirror / Atom feed
* [PATCH v4 0/5] erofs-utils: mkfs: introduce multi-threaded compression
@ 2024-02-28 16:16 Yifan Zhao
  2024-02-28 16:16 ` [PATCH v4 1/5] erofs-utils: introduce multi-threading framework Yifan Zhao
                   ` (4 more replies)
  0 siblings, 5 replies; 10+ messages in thread
From: Yifan Zhao @ 2024-02-28 16:16 UTC (permalink / raw
  To: hsiangkao; +Cc: linux-erofs

changelog since v3:
- remove unnecessary struct z_erofs_write_index_ctx, and add a
  helper z_erofs_mt_fix_index to fix extents->blkaddr before
  writing index
- rename: z_erofs_vle_compress_ctx -> z_erofs_compressed_segment_ctx
- rename: z_erofs_file_compress_ctx -> z_erofs_compressed_inode_ctx
- leave tryrecompress_trailing() unchanged as it's only used in the
  ztailpacking scenario, which is not supported yet

Gao Xiang (2):
  erofs-utils: add a helper to get available processors
  erofs-utils: lib: introduce atomic operations

Yifan Zhao (3):
  erofs-utils: introduce multi-threading framework
  erofs-utils: mkfs: add --worker=# parameter
  erofs-utils: mkfs: introduce inner-file multi-threaded compression

 configure.ac                |  17 +
 include/erofs/atomic.h      |  28 ++
 include/erofs/compress.h    |   1 +
 include/erofs/config.h      |   5 +
 include/erofs/internal.h    |   3 +
 include/erofs/workqueue.h   |  37 ++
 lib/Makefile.am             |   4 +
 lib/compress.c              | 703 +++++++++++++++++++++++++++++-------
 lib/compressor.c            |   2 +
 lib/compressor_deflate.c    |  11 +-
 lib/compressor_libdeflate.c |   6 +-
 lib/compressor_liblzma.c    |   5 +-
 lib/config.c                |  16 +
 lib/workqueue.c             | 132 +++++++
 mkfs/main.c                 |  38 ++
 15 files changed, 877 insertions(+), 131 deletions(-)
 create mode 100644 include/erofs/atomic.h
 create mode 100644 include/erofs/workqueue.h
 create mode 100644 lib/workqueue.c

-- 
2.44.0


^ permalink raw reply	[flat|nested] 10+ messages in thread

* [PATCH v4 1/5] erofs-utils: introduce multi-threading framework
  2024-02-28 16:16 [PATCH v4 0/5] erofs-utils: mkfs: introduce multi-threaded compression Yifan Zhao
@ 2024-02-28 16:16 ` Yifan Zhao
  2024-02-29  9:43   ` Gao Xiang
  2024-02-28 16:16 ` [PATCH v4 2/5] erofs-utils: add a helper to get available processors Yifan Zhao
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 10+ messages in thread
From: Yifan Zhao @ 2024-02-28 16:16 UTC (permalink / raw
  To: hsiangkao; +Cc: linux-erofs

Add a workqueue implementation for multi-threading support inspired by
xfsprogs.

Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
Suggested-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 configure.ac              |  16 +++++
 include/erofs/internal.h  |   3 +
 include/erofs/workqueue.h |  37 +++++++++++
 lib/Makefile.am           |   4 ++
 lib/workqueue.c           | 132 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 192 insertions(+)
 create mode 100644 include/erofs/workqueue.h
 create mode 100644 lib/workqueue.c

diff --git a/configure.ac b/configure.ac
index 4b59230..3ccd6bb 100644
--- a/configure.ac
+++ b/configure.ac
@@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY],
 
 AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which erofs-utils supports])
 
+AC_MSG_CHECKING([whether to enable multi-threading support])
+AC_ARG_ENABLE([multithreading],
+    AS_HELP_STRING([--enable-multithreading],
+                   [enable multi-threading support @<:@default=no@:>@]),
+    [enable_multithreading="$enableval"],
+    [enable_multithreading="no"])
+AC_MSG_RESULT([$enable_multithreading])
+
 AC_ARG_ENABLE([debug],
     [AS_HELP_STRING([--enable-debug],
                     [enable debugging mode @<:@default=no@:>@])],
@@ -280,6 +288,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [
                              [erofs_cv_max_block_size=4096]))
 ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE])
 
+# Configure multi-threading support
+AS_IF([test "x$enable_multithreading" != "xno"], [
+    AC_CHECK_HEADERS([pthread.h])
+    AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], AC_MSG_ERROR([libpthread is required for multi-threaded build]))
+    AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support])
+], [])
+
 # Configure debug mode
 AS_IF([test "x$enable_debug" != "xno"], [], [
   dnl Turn off all assert checking.
@@ -471,6 +486,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [
 AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"])
 
 # Set up needed symbols, conditionals and compiler/linker flags
+AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" != "xno"])
 AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"])
 AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"])
 AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"])
diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 82797e1..954aef4 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -22,6 +22,9 @@ typedef unsigned short umode_t;
 #include <sys/types.h> /* for off_t definition */
 #include <sys/stat.h> /* for S_ISCHR definition */
 #include <stdio.h>
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
 
 #ifndef PATH_MAX
 #define PATH_MAX        4096    /* # chars in a path name including nul */
diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
new file mode 100644
index 0000000..b4b3901
--- /dev/null
+++ b/include/erofs/workqueue.h
@@ -0,0 +1,37 @@
+/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
+#ifndef __EROFS_WORKQUEUE_H
+#define __EROFS_WORKQUEUE_H
+
+#include "internal.h"
+
+struct erofs_work;
+
+typedef void erofs_wq_func_t(struct erofs_work *);
+typedef void erofs_wq_priv_fini_t(void *);
+
+struct erofs_work {
+	void (*func)(struct erofs_work *work);
+	struct erofs_work *next;
+	void *priv;
+};
+
+struct erofs_workqueue {
+	struct erofs_work *head, *tail;
+	pthread_mutex_t lock;
+	pthread_cond_t cond_empty;
+	pthread_cond_t cond_full;
+	pthread_t *workers;
+	unsigned int nworker;
+	unsigned int max_jobs;
+	unsigned int job_count;
+	bool shutdown;
+	size_t priv_size;
+	erofs_wq_priv_fini_t *priv_fini;
+};
+
+int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
+			 unsigned int max_jobs, size_t priv_size,
+			 erofs_wq_priv_fini_t *priv_fini);
+int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work);
+int erofs_destroy_workqueue(struct erofs_workqueue *wq);
+#endif
\ No newline at end of file
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 54b9c9c..7307f7b 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c
 if ENABLE_LIBDEFLATE
 liberofs_la_SOURCES += compressor_libdeflate.c
 endif
+if ENABLE_EROFS_MT
+liberofs_la_CFLAGS += -lpthread
+liberofs_la_SOURCES += workqueue.c
+endif
diff --git a/lib/workqueue.c b/lib/workqueue.c
new file mode 100644
index 0000000..138afd5
--- /dev/null
+++ b/lib/workqueue.c
@@ -0,0 +1,132 @@
+// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
+#include <pthread.h>
+#include <stdlib.h>
+#include "erofs/workqueue.h"
+
+static void *worker_thread(void *arg)
+{
+	struct erofs_workqueue *wq = arg;
+	struct erofs_work *work;
+	void *priv = NULL;
+
+	if (wq->priv_size) {
+		priv = calloc(wq->priv_size, 1);
+		assert(priv);
+	}
+
+	while (true) {
+		pthread_mutex_lock(&wq->lock);
+
+		while (wq->job_count == 0 && !wq->shutdown)
+			pthread_cond_wait(&wq->cond_empty, &wq->lock);
+		if (wq->job_count == 0 && wq->shutdown) {
+			pthread_mutex_unlock(&wq->lock);
+			break;
+		}
+
+		work = wq->head;
+		wq->head = work->next;
+		if (!wq->head)
+			wq->tail = NULL;
+		wq->job_count--;
+
+		if (wq->job_count == wq->max_jobs - 1)
+			pthread_cond_broadcast(&wq->cond_full);
+
+		pthread_mutex_unlock(&wq->lock);
+
+		work->priv = priv;
+		work->func(work);
+	}
+
+	if (priv) {
+		assert(wq->priv_fini);
+		(wq->priv_fini)(priv);
+		free(priv);
+	}
+
+	return NULL;
+}
+
+int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
+			 unsigned int max_jobs, size_t priv_size,
+			 erofs_wq_priv_fini_t *priv_fini)
+{
+	unsigned int i;
+
+	if (!wq || nworker <= 0 || max_jobs <= 0)
+		return -EINVAL;
+
+	wq->head = wq->tail = NULL;
+	wq->nworker = nworker;
+	wq->max_jobs = max_jobs;
+	wq->job_count = 0;
+	wq->shutdown = false;
+	wq->priv_size = priv_size;
+	wq->priv_fini = priv_fini;
+	pthread_mutex_init(&wq->lock, NULL);
+	pthread_cond_init(&wq->cond_empty, NULL);
+	pthread_cond_init(&wq->cond_full, NULL);
+
+	wq->workers = malloc(nworker * sizeof(pthread_t));
+	if (!wq->workers)
+		return -ENOMEM;
+
+	for (i = 0; i < nworker; i++) {
+		if (pthread_create(&wq->workers[i], NULL, worker_thread, wq)) {
+			while (i--)
+				pthread_cancel(wq->workers[i]);
+			free(wq->workers);
+			return -ENOMEM;
+		}
+	}
+
+	return 0;
+}
+
+int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work)
+{
+	if (!wq || !work)
+		return -EINVAL;
+
+	pthread_mutex_lock(&wq->lock);
+
+	while (wq->job_count == wq->max_jobs)
+		pthread_cond_wait(&wq->cond_full, &wq->lock);
+
+	work->next = NULL;
+	if (!wq->head)
+		wq->head = work;
+	else
+		wq->tail->next = work;
+	wq->tail = work;
+	wq->job_count++;
+
+	pthread_cond_signal(&wq->cond_empty);
+	pthread_mutex_unlock(&wq->lock);
+
+	return 0;
+}
+
+int erofs_destroy_workqueue(struct erofs_workqueue *wq)
+{
+	unsigned int i;
+
+	if (!wq)
+		return -EINVAL;
+
+	pthread_mutex_lock(&wq->lock);
+	wq->shutdown = true;
+	pthread_cond_broadcast(&wq->cond_empty);
+	pthread_mutex_unlock(&wq->lock);
+
+	for (i = 0; i < wq->nworker; i++)
+		pthread_join(wq->workers[i], NULL);
+
+	free(wq->workers);
+	pthread_mutex_destroy(&wq->lock);
+	pthread_cond_destroy(&wq->cond_empty);
+	pthread_cond_destroy(&wq->cond_full);
+
+	return 0;
+}
\ No newline at end of file
-- 
2.44.0


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH v4 2/5] erofs-utils: add a helper to get available processors
  2024-02-28 16:16 [PATCH v4 0/5] erofs-utils: mkfs: introduce multi-threaded compression Yifan Zhao
  2024-02-28 16:16 ` [PATCH v4 1/5] erofs-utils: introduce multi-threading framework Yifan Zhao
@ 2024-02-28 16:16 ` Yifan Zhao
  2024-02-28 16:16 ` [PATCH v4 3/5] erofs-utils: mkfs: add --worker=# parameter Yifan Zhao
                   ` (2 subsequent siblings)
  4 siblings, 0 replies; 10+ messages in thread
From: Yifan Zhao @ 2024-02-28 16:16 UTC (permalink / raw
  To: hsiangkao; +Cc: linux-erofs

From: Gao Xiang <hsiangkao@linux.alibaba.com>

Add a helper which returns the number of online processors in order to
prepare for multi-threaded decompression.

Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 configure.ac           |  1 +
 include/erofs/config.h |  1 +
 lib/config.c           | 12 ++++++++++++
 3 files changed, 14 insertions(+)

diff --git a/configure.ac b/configure.ac
index 3ccd6bb..2e69260 100644
--- a/configure.ac
+++ b/configure.ac
@@ -256,6 +256,7 @@ AC_CHECK_FUNCS(m4_flatten([
 	strerror
 	strrchr
 	strtoull
+	sysconf
 	tmpfile64
 	utimensat]))
 
diff --git a/include/erofs/config.h b/include/erofs/config.h
index eecf575..73e3ac2 100644
--- a/include/erofs/config.h
+++ b/include/erofs/config.h
@@ -109,6 +109,7 @@ static inline int erofs_selabel_open(const char *file_contexts)
 
 void erofs_update_progressinfo(const char *fmt, ...);
 char *erofs_trim_for_progressinfo(const char *str, int placeholder);
+unsigned int erofs_get_available_processors(void);
 
 #ifdef __cplusplus
 }
diff --git a/lib/config.c b/lib/config.c
index 1096cd1..947a183 100644
--- a/lib/config.c
+++ b/lib/config.c
@@ -14,6 +14,9 @@
 #ifdef HAVE_SYS_IOCTL_H
 #include <sys/ioctl.h>
 #endif
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
 
 struct erofs_configure cfg;
 struct erofs_sb_info sbi;
@@ -177,3 +180,12 @@ void erofs_update_progressinfo(const char *fmt, ...)
 	fputs(msg, stdout);
 	fputc('\n', stdout);
 }
+
+unsigned int erofs_get_available_processors(void)
+{
+#if defined(HAVE_UNISTD_H) && defined(HAVE_SYSCONF)
+	return sysconf(_SC_NPROCESSORS_ONLN);
+#else
+	return 0;
+#endif
+}
-- 
2.44.0


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH v4 3/5] erofs-utils: mkfs: add --worker=# parameter
  2024-02-28 16:16 [PATCH v4 0/5] erofs-utils: mkfs: introduce multi-threaded compression Yifan Zhao
  2024-02-28 16:16 ` [PATCH v4 1/5] erofs-utils: introduce multi-threading framework Yifan Zhao
  2024-02-28 16:16 ` [PATCH v4 2/5] erofs-utils: add a helper to get available processors Yifan Zhao
@ 2024-02-28 16:16 ` Yifan Zhao
  2024-02-29  9:55   ` Gao Xiang
  2024-02-28 16:16 ` [PATCH v4 4/5] erofs-utils: lib: introduce atomic operations Yifan Zhao
  2024-02-28 16:16 ` [PATCH v4 5/5] erofs-utils: mkfs: introduce inner-file multi-threaded compression Yifan Zhao
  4 siblings, 1 reply; 10+ messages in thread
From: Yifan Zhao @ 2024-02-28 16:16 UTC (permalink / raw
  To: hsiangkao; +Cc: linux-erofs

This patch introduces a --workers=# parameter for the incoming
multi-threaded compression support.

It also introduces a concept called `segment size`, which is used to
split large files for multi-threaded compression. It has the default
value 16MB for now and cannot be modified.

Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
---
 include/erofs/config.h |  4 ++++
 lib/config.c           |  4 ++++
 mkfs/main.c            | 38 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 46 insertions(+)

diff --git a/include/erofs/config.h b/include/erofs/config.h
index 73e3ac2..d2f91ff 100644
--- a/include/erofs/config.h
+++ b/include/erofs/config.h
@@ -75,6 +75,10 @@ struct erofs_configure {
 	char c_force_chunkformat;
 	/* < 0, xattr disabled and INT_MAX, always use inline xattrs */
 	int c_inline_xattr_tolerance;
+#ifdef EROFS_MT_ENABLED
+	u64 c_segment_size;
+	u32 c_mt_workers;
+#endif
 
 	u32 c_pclusterblks_max, c_pclusterblks_def, c_pclusterblks_packed;
 	u32 c_max_decompressed_extent_bytes;
diff --git a/lib/config.c b/lib/config.c
index 947a183..2530274 100644
--- a/lib/config.c
+++ b/lib/config.c
@@ -38,6 +38,10 @@ void erofs_init_configure(void)
 	cfg.c_pclusterblks_max = 1;
 	cfg.c_pclusterblks_def = 1;
 	cfg.c_max_decompressed_extent_bytes = -1;
+#ifdef EROFS_MT_ENABLED
+	cfg.c_segment_size = 16ULL * 1024 * 1024;
+	cfg.c_mt_workers = 1;
+#endif
 
 	erofs_stdout_tty = isatty(STDOUT_FILENO);
 }
diff --git a/mkfs/main.c b/mkfs/main.c
index 258c1ce..ce9c28b 100644
--- a/mkfs/main.c
+++ b/mkfs/main.c
@@ -74,6 +74,9 @@ static struct option long_options[] = {
 	{"ungzip", optional_argument, NULL, 517},
 #endif
 	{"offset", required_argument, NULL, 518},
+#ifdef EROFS_MT_ENABLED
+	{"workers", required_argument, NULL, 519},
+#endif
 	{0, 0, 0, 0},
 };
 
@@ -179,6 +182,9 @@ static void usage(int argc, char **argv)
 		" --product-out=X       X=product_out directory\n"
 		" --fs-config-file=X    X=fs_config file\n"
 		" --block-list-file=X   X=block_list file\n"
+#endif
+#ifdef EROFS_MT_ENABLED
+		" --workers=#            set the number of worker threads to # (default=1)\n"
 #endif
 		);
 }
@@ -408,6 +414,13 @@ static void erofs_rebuild_cleanup(void)
 	rebuild_src_count = 0;
 }
 
+#ifdef EROFS_MT_ENABLED
+static u32 mkfs_max_worker_num() {
+	u32 ncpu = erofs_get_available_processors();
+	return ncpu ? ncpu : 16;
+}
+#endif
+
 static int mkfs_parse_options_cfg(int argc, char *argv[])
 {
 	char *endptr;
@@ -650,6 +663,21 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
 				return -EINVAL;
 			}
 			break;
+#ifdef EROFS_MT_ENABLED
+		case 519:
+			cfg.c_mt_workers = strtoul(optarg, &endptr, 0);
+			if (errno || *endptr != '\0') {
+				erofs_err("invalid worker number %s", optarg);
+				return -EINVAL;
+			}
+			if (cfg.c_mt_workers > mkfs_max_worker_num()) {
+				erofs_warn(
+					"worker number %s is too large, setting to %ud",
+					optarg, mkfs_max_worker_num());
+				cfg.c_mt_workers = mkfs_max_worker_num();
+			}
+			break;
+#endif
 		case 'V':
 			version();
 			exit(0);
@@ -803,6 +831,16 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
 		}
 		cfg.c_pclusterblks_packed = pclustersize_packed >> sbi.blkszbits;
 	}
+
+#ifdef EROFS_MT_ENABLED
+	if (cfg.c_mt_workers > 1 &&
+	    (cfg.c_dedupe || cfg.c_fragments || cfg.c_ztailpacking)) {
+		cfg.c_mt_workers = 1;
+		erofs_warn("Please note that dedupe/fragments/ztailpacking"
+			   "is NOT supported in multi-threaded mode now, using worker=1.");
+	}
+#endif
+
 	return 0;
 }
 
-- 
2.44.0


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH v4 4/5] erofs-utils: lib: introduce atomic operations
  2024-02-28 16:16 [PATCH v4 0/5] erofs-utils: mkfs: introduce multi-threaded compression Yifan Zhao
                   ` (2 preceding siblings ...)
  2024-02-28 16:16 ` [PATCH v4 3/5] erofs-utils: mkfs: add --worker=# parameter Yifan Zhao
@ 2024-02-28 16:16 ` Yifan Zhao
  2024-02-28 16:16 ` [PATCH v4 5/5] erofs-utils: mkfs: introduce inner-file multi-threaded compression Yifan Zhao
  4 siblings, 0 replies; 10+ messages in thread
From: Yifan Zhao @ 2024-02-28 16:16 UTC (permalink / raw
  To: hsiangkao; +Cc: linux-erofs

From: Gao Xiang <hsiangkao@linux.alibaba.com>

Add some helpers (relaxed semantics) in order to prepare for the
upcoming multi-threaded support.

For example, a compressor may be initialized more than once in different
worker threads, resulting in noisy warnings.

This patch makes sure that each message will be printed only once by
adding `__warnonce` atomic booleans to each erofs_compressor_init().

Cc: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
---
 include/erofs/atomic.h      | 28 ++++++++++++++++++++++++++++
 lib/compressor_deflate.c    | 11 ++++++++---
 lib/compressor_libdeflate.c |  6 +++++-
 lib/compressor_liblzma.c    |  5 ++++-
 4 files changed, 45 insertions(+), 5 deletions(-)
 create mode 100644 include/erofs/atomic.h

diff --git a/include/erofs/atomic.h b/include/erofs/atomic.h
new file mode 100644
index 0000000..214cdb1
--- /dev/null
+++ b/include/erofs/atomic.h
@@ -0,0 +1,28 @@
+/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
+/*
+ * Copyright (C) 2024 Alibaba Cloud
+ */
+#ifndef __EROFS_ATOMIC_H
+#define __EROFS_ATOMIC_H
+
+/*
+ * Just use GCC/clang built-in functions for now
+ * See: https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html
+ */
+typedef unsigned long erofs_atomic_t;
+typedef char erofs_atomic_bool_t;
+
+#define erofs_atomic_read(ptr) ({ \
+	typeof(*ptr) __n;    \
+	__atomic_load(ptr, &__n, __ATOMIC_RELAXED); \
+__n;})
+
+#define erofs_atomic_set(ptr, n) do { \
+	typeof(*ptr) __n = (n);    \
+	__atomic_store(ptr, &__n, __ATOMIC_RELAXED); \
+} while(0)
+
+#define erofs_atomic_test_and_set(ptr) \
+	__atomic_test_and_set(ptr, __ATOMIC_RELAXED)
+
+#endif
diff --git a/lib/compressor_deflate.c b/lib/compressor_deflate.c
index 8629415..e482224 100644
--- a/lib/compressor_deflate.c
+++ b/lib/compressor_deflate.c
@@ -7,6 +7,7 @@
 #include "erofs/print.h"
 #include "erofs/config.h"
 #include "compressor.h"
+#include "erofs/atomic.h"
 
 void *kite_deflate_init(int level, unsigned int dict_size);
 void kite_deflate_end(void *s);
@@ -36,6 +37,8 @@ static int compressor_deflate_exit(struct erofs_compress *c)
 
 static int compressor_deflate_init(struct erofs_compress *c)
 {
+	static erofs_atomic_bool_t __warnonce;
+
 	if (c->private_data) {
 		kite_deflate_end(c->private_data);
 		c->private_data = NULL;
@@ -44,9 +47,11 @@ static int compressor_deflate_init(struct erofs_compress *c)
 	if (IS_ERR_VALUE(c->private_data))
 		return PTR_ERR(c->private_data);
 
-	erofs_warn("EXPERIMENTAL DEFLATE algorithm in use. Use at your own risk!");
-	erofs_warn("*Carefully* check filesystem data correctness to avoid corruption!");
-	erofs_warn("Please send a report to <linux-erofs@lists.ozlabs.org> if something is wrong.");
+	if (!erofs_atomic_test_and_set(&__warnonce)) {
+		erofs_warn("EXPERIMENTAL DEFLATE algorithm in use. Use at your own risk!");
+		erofs_warn("*Carefully* check filesystem data correctness to avoid corruption!");
+		erofs_warn("Please send a report to <linux-erofs@lists.ozlabs.org> if something is wrong.");
+	}
 	return 0;
 }
 
diff --git a/lib/compressor_libdeflate.c b/lib/compressor_libdeflate.c
index 62d93f7..14cbce4 100644
--- a/lib/compressor_libdeflate.c
+++ b/lib/compressor_libdeflate.c
@@ -4,6 +4,7 @@
 #include "erofs/config.h"
 #include <libdeflate.h>
 #include "compressor.h"
+#include "erofs/atomic.h"
 
 static int libdeflate_compress_destsize(const struct erofs_compress *c,
 				        const void *src, unsigned int *srcsize,
@@ -82,12 +83,15 @@ static int compressor_libdeflate_exit(struct erofs_compress *c)
 
 static int compressor_libdeflate_init(struct erofs_compress *c)
 {
+	static erofs_atomic_bool_t __warnonce;
+
 	libdeflate_free_compressor(c->private_data);
 	c->private_data = libdeflate_alloc_compressor(c->compression_level);
 	if (!c->private_data)
 		return -ENOMEM;
 
-	erofs_warn("EXPERIMENTAL libdeflate compressor in use. Use at your own risk!");
+	if (!erofs_atomic_test_and_set(&__warnonce))
+		erofs_warn("EXPERIMENTAL libdeflate compressor in use. Use at your own risk!");
 	return 0;
 }
 
diff --git a/lib/compressor_liblzma.c b/lib/compressor_liblzma.c
index 712f44f..2f19a93 100644
--- a/lib/compressor_liblzma.c
+++ b/lib/compressor_liblzma.c
@@ -9,6 +9,7 @@
 #include "erofs/config.h"
 #include "erofs/print.h"
 #include "erofs/internal.h"
+#include "erofs/atomic.h"
 #include "compressor.h"
 
 struct erofs_liblzma_context {
@@ -85,6 +86,7 @@ static int erofs_compressor_liblzma_init(struct erofs_compress *c)
 {
 	struct erofs_liblzma_context *ctx;
 	u32 preset;
+	static erofs_atomic_bool_t __warnonce;
 
 	ctx = malloc(sizeof(*ctx));
 	if (!ctx)
@@ -103,7 +105,8 @@ static int erofs_compressor_liblzma_init(struct erofs_compress *c)
 	ctx->opt.dict_size = c->dict_size;
 
 	c->private_data = ctx;
-	erofs_warn("It may take a longer time since MicroLZMA is still single-threaded for now.");
+	if (!erofs_atomic_test_and_set(&__warnonce))
+		erofs_warn("It may take a longer time since MicroLZMA is still single-threaded for now.");
 	return 0;
 }
 
-- 
2.44.0


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH v4 5/5] erofs-utils: mkfs: introduce inner-file multi-threaded compression
  2024-02-28 16:16 [PATCH v4 0/5] erofs-utils: mkfs: introduce multi-threaded compression Yifan Zhao
                   ` (3 preceding siblings ...)
  2024-02-28 16:16 ` [PATCH v4 4/5] erofs-utils: lib: introduce atomic operations Yifan Zhao
@ 2024-02-28 16:16 ` Yifan Zhao
  4 siblings, 0 replies; 10+ messages in thread
From: Yifan Zhao @ 2024-02-28 16:16 UTC (permalink / raw
  To: hsiangkao; +Cc: linux-erofs

Currently, the creation of EROFS compressed images is single-threaded,
which suffers from performance issues. This patch attempts to address
it by compressing large files in parallel.

Specifically, each input file larger than 16MB is split into segments,
and each worker thread compresses a segment as if it were a separate
file. Finally, the main thread merges all the compressed segments.

Multi-threaded compression is not compatible with -Ededupe,
-E(all-)fragments and -Eztailpacking for now.

Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn>
---
 include/erofs/compress.h |   1 +
 lib/compress.c           | 703 ++++++++++++++++++++++++++++++++-------
 lib/compressor.c         |   2 +
 3 files changed, 580 insertions(+), 126 deletions(-)

diff --git a/include/erofs/compress.h b/include/erofs/compress.h
index 046640b..2699334 100644
--- a/include/erofs/compress.h
+++ b/include/erofs/compress.h
@@ -15,6 +15,7 @@ extern "C"
 #include "internal.h"
 
 #define EROFS_CONFIG_COMPR_MAX_SZ           (4000 * 1024)
+#define EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2)
 
 void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
 int erofs_write_compressed_file(struct erofs_inode *inode, int fd);
diff --git a/lib/compress.c b/lib/compress.c
index 9611102..f586117 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -8,6 +8,9 @@
 #ifndef _LARGEFILE64_SOURCE
 #define _LARGEFILE64_SOURCE
 #endif
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
 #include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
@@ -20,6 +23,12 @@
 #include "erofs/block_list.h"
 #include "erofs/compress_hints.h"
 #include "erofs/fragments.h"
+#ifdef EROFS_MT_ENABLED
+#include "erofs/workqueue.h"
+#endif
+#ifdef HAVE_LINUX_FALLOC_H
+#include <linux/falloc.h>
+#endif
 
 /* compressing configuration specified by users */
 struct erofs_compress_cfg {
@@ -33,29 +42,81 @@ struct z_erofs_extent_item {
 	struct z_erofs_inmem_extent e;
 };
 
-struct z_erofs_vle_compress_ctx {
-	u8 queue[EROFS_CONFIG_COMPR_MAX_SZ * 2];
+struct z_erofs_compressed_inode_ctx {
+	struct erofs_inode *inode;
+	int fd;
+	unsigned int pclustersize;
+
+	u32 tof_chksum;
+	bool fix_dedupedfrag;
+	bool fragemitted;
+
+	/* fields for write indexes */
+	u8 *metacur;
+	struct list_head *extents;
+	u16 clusterofs;
+};
+
+struct z_erofs_compressed_segment_ctx {
+	struct z_erofs_compressed_inode_ctx *ictx;
+
+	u8 *queue;
 	struct list_head extents;
 	struct z_erofs_extent_item *pivot;
 
-	struct erofs_inode *inode;
-	struct erofs_compress_cfg *ccfg;
+	struct erofs_compress *chandle;
+	char *destbuf;
 
-	u8 *metacur;
 	unsigned int head, tail;
 	erofs_off_t remaining;
-	unsigned int pclustersize;
 	erofs_blk_t blkaddr;		/* pointing to the next blkaddr */
+	erofs_blk_t compressed_blocks;
 	u16 clusterofs;
 
-	u32 tof_chksum;
-	bool fix_dedupedfrag;
-	bool fragemitted;
+	int seg_num, seg_idx;
+	FILE *tmpfile;
+	off_t tmpfile_off;
+};
+
+#ifdef EROFS_MT_ENABLED
+struct erofs_compress_wq_private {
+	bool init;
+	u8 *queue;
+	char *destbuf;
+	struct erofs_compress_cfg *ccfg;
+	FILE* tmpfile;
+};
+
+struct erofs_compress_work {
+	/* Note: struct erofs_work must be the first member */
+	struct erofs_work work;
+	struct z_erofs_compressed_segment_ctx ctx;
+
+	unsigned int alg_id;
+	char *alg_name;
+	unsigned int comp_level;
+	unsigned int dict_size;
+
+	int ret;
+
+	struct erofs_compress_work *next;
 };
 
+static struct {
+	struct erofs_workqueue wq;
+	struct erofs_compress_work *idle;
+	pthread_mutex_t mutex;
+	pthread_cond_t cond;
+	int nfini;
+} z_erofs_mt_ctrl;
+#endif
+
+static bool z_erofs_mt_enabled;
+static u8 *z_erofs_global_queue;
+
 #define Z_EROFS_LEGACY_MAP_HEADER_SIZE	Z_EROFS_FULL_INDEX_ALIGN(0)
 
-static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
+static void z_erofs_write_indexes_final(struct z_erofs_compressed_inode_ctx *ctx)
 {
 	const unsigned int type = Z_EROFS_LCLUSTER_TYPE_PLAIN;
 	struct z_erofs_lcluster_index di;
@@ -71,7 +132,7 @@ static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
 	ctx->metacur += sizeof(di);
 }
 
-static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
+static void z_erofs_write_extent(struct z_erofs_compressed_inode_ctx *ctx,
 				 struct z_erofs_inmem_extent *e)
 {
 	struct erofs_inode *inode = ctx->inode;
@@ -170,12 +231,12 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
 	ctx->clusterofs = clusterofs + count;
 }
 
-static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
+static void z_erofs_write_indexes(struct z_erofs_compressed_inode_ctx *ctx)
 {
 	struct z_erofs_extent_item *ei, *n;
 
 	ctx->clusterofs = 0;
-	list_for_each_entry_safe(ei, n, &ctx->extents, list) {
+	list_for_each_entry_safe(ei, n, ctx->extents, list) {
 		z_erofs_write_extent(ctx, &ei->e);
 
 		list_del(&ei->list);
@@ -184,15 +245,16 @@ static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
 	z_erofs_write_indexes_final(ctx);
 }
 
-static bool z_erofs_need_refill(struct z_erofs_vle_compress_ctx *ctx)
+static bool z_erofs_need_refill(struct z_erofs_compressed_segment_ctx *ctx)
 {
 	const bool final = !ctx->remaining;
 	unsigned int qh_aligned, qh_after;
+	struct erofs_inode *inode = ctx->ictx->inode;
 
 	if (final || ctx->head < EROFS_CONFIG_COMPR_MAX_SZ)
 		return false;
 
-	qh_aligned = round_down(ctx->head, erofs_blksiz(ctx->inode->sbi));
+	qh_aligned = round_down(ctx->head, erofs_blksiz(inode->sbi));
 	qh_after = ctx->head - qh_aligned;
 	memmove(ctx->queue, ctx->queue + qh_aligned, ctx->tail - qh_aligned);
 	ctx->tail -= qh_aligned;
@@ -204,7 +266,7 @@ static struct z_erofs_extent_item dummy_pivot = {
 	.e.length = 0
 };
 
-static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx,
+static void z_erofs_commit_extent(struct z_erofs_compressed_segment_ctx *ctx,
 				  struct z_erofs_extent_item *ei)
 {
 	if (ei == &dummy_pivot)
@@ -212,14 +274,13 @@ static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx,
 
 	list_add_tail(&ei->list, &ctx->extents);
 	ctx->clusterofs = (ctx->clusterofs + ei->e.length) &
-			(erofs_blksiz(ctx->inode->sbi) - 1);
-
+			  (erofs_blksiz(ctx->ictx->inode->sbi) - 1);
 }
 
-static int z_erofs_compress_dedupe(struct z_erofs_vle_compress_ctx *ctx,
+static int z_erofs_compress_dedupe(struct z_erofs_compressed_segment_ctx *ctx,
 				   unsigned int *len)
 {
-	struct erofs_inode *inode = ctx->inode;
+	struct erofs_inode *inode = ctx->ictx->inode;
 	const unsigned int lclustermask = (1 << inode->z_logical_clusterbits) - 1;
 	struct erofs_sb_info *sbi = inode->sbi;
 	struct z_erofs_extent_item *ei = ctx->pivot;
@@ -315,16 +376,17 @@ out:
 	return 0;
 }
 
-static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
+static int write_uncompressed_extent(struct z_erofs_compressed_segment_ctx *ctx,
 				     unsigned int len, char *dst)
 {
-	struct erofs_sb_info *sbi = ctx->inode->sbi;
+	struct erofs_inode *inode = ctx->ictx->inode;
+	struct erofs_sb_info *sbi = inode->sbi;
 	unsigned int count = min(erofs_blksiz(sbi), len);
 	unsigned int interlaced_offset, rightpart;
 	int ret;
 
 	/* write interlaced uncompressed data if needed */
-	if (ctx->inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
+	if (inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
 		interlaced_offset = ctx->clusterofs;
 	else
 		interlaced_offset = 0;
@@ -335,11 +397,19 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
 	memcpy(dst + interlaced_offset, ctx->queue + ctx->head, rightpart);
 	memcpy(dst, ctx->queue + ctx->head + rightpart, count - rightpart);
 
-	erofs_dbg("Writing %u uncompressed data to block %u",
-		  count, ctx->blkaddr);
-	ret = blk_write(sbi, dst, ctx->blkaddr, 1);
-	if (ret)
-		return ret;
+	if (ctx->tmpfile) {
+		erofs_dbg("Writing %u uncompressed data to tmpfile", count);
+		ret = fwrite(dst, erofs_blksiz(sbi), 1, ctx->tmpfile);
+		if (ret != 1)
+			return -EIO;
+		fflush(ctx->tmpfile);
+	} else {
+		erofs_dbg("Writing %u uncompressed data to block %u", count,
+			  ctx->blkaddr);
+		ret = blk_write(sbi, dst, ctx->blkaddr, 1);
+		if (ret)
+			return ret;
+	}
 	return count;
 }
 
@@ -379,12 +449,12 @@ static int z_erofs_fill_inline_data(struct erofs_inode *inode, void *data,
 	return len;
 }
 
-static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
-				   struct erofs_compress *ec,
-				   void *in, unsigned int *insize,
-				   void *out, unsigned int *compressedsize)
+static void tryrecompress_trailing(struct z_erofs_compressed_segment_ctx *ctx,
+				   struct erofs_compress *ec, void *in,
+				   unsigned int *insize, void *out,
+				   unsigned int *compressedsize)
 {
-	struct erofs_sb_info *sbi = ctx->inode->sbi;
+	struct erofs_sb_info *sbi = ctx->ictx->inode->sbi;
 	static char tmp[Z_EROFS_PCLUSTER_MAX_SIZE];
 	unsigned int count;
 	int ret = *compressedsize;
@@ -406,10 +476,10 @@ static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
 	*compressedsize = ret;
 }
 
-static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
+static bool z_erofs_fixup_deduped_fragment(struct z_erofs_compressed_segment_ctx *ctx,
 					   unsigned int len)
 {
-	struct erofs_inode *inode = ctx->inode;
+	struct erofs_inode *inode = ctx->ictx->inode;
 	struct erofs_sb_info *sbi = inode->sbi;
 	const unsigned int newsize = ctx->remaining + len;
 
@@ -417,9 +487,10 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
 
 	/* try to fix again if it gets larger (should be rare) */
 	if (inode->fragment_size < newsize) {
-		ctx->pclustersize = min_t(erofs_off_t, z_erofs_get_max_pclustersize(inode),
-					  roundup(newsize - inode->fragment_size,
-						  erofs_blksiz(sbi)));
+		ctx->ictx->pclustersize =
+			min_t(erofs_off_t, z_erofs_get_max_pclustersize(inode),
+			      roundup(newsize - inode->fragment_size,
+				      erofs_blksiz(sbi)));
 		return false;
 	}
 
@@ -436,29 +507,34 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
 	return true;
 }
 
-static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
+static int __z_erofs_compress_one(struct z_erofs_compressed_segment_ctx *ctx,
 				  struct z_erofs_inmem_extent *e)
 {
-	static char dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
-	struct erofs_inode *inode = ctx->inode;
+	static char
+		global_dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
+	char *dstbuf = ctx->destbuf ?: global_dstbuf;
+	struct z_erofs_compressed_inode_ctx *ictx = ctx->ictx;
+	struct erofs_inode *inode = ictx->inode;
 	struct erofs_sb_info *sbi = inode->sbi;
 	unsigned int blksz = erofs_blksiz(sbi);
 	char *const dst = dstbuf + blksz;
-	struct erofs_compress *const h = &ctx->ccfg->handle;
+	struct erofs_compress *const h = ctx->chandle;
 	unsigned int len = ctx->tail - ctx->head;
 	bool is_packed_inode = erofs_is_packed_inode(inode);
 	bool final = !ctx->remaining;
-	bool may_packing = (cfg.c_fragments && final && !is_packed_inode);
-	bool may_inline = (cfg.c_ztailpacking && final && !may_packing);
+	bool may_packing = (cfg.c_fragments && final && !is_packed_inode &&
+			    !z_erofs_mt_enabled);
+	bool may_inline = (cfg.c_ztailpacking && final && !may_packing &&
+			   !z_erofs_mt_enabled);
 	unsigned int compressedsize;
 	int ret;
 
-	if (len <= ctx->pclustersize) {
+	if (len <= ictx->pclustersize) {
 		if (!final || !len)
 			return 1;
 		if (may_packing) {
-			if (inode->fragment_size && !ctx->fix_dedupedfrag) {
-				ctx->pclustersize = roundup(len, blksz);
+			if (inode->fragment_size && !ictx->fix_dedupedfrag) {
+				ictx->pclustersize = roundup(len, blksz);
 				goto fix_dedupedfrag;
 			}
 			e->length = len;
@@ -470,7 +546,7 @@ static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
 
 	e->length = min(len, cfg.c_max_decompressed_extent_bytes);
 	ret = erofs_compress_destsize(h, ctx->queue + ctx->head,
-				      &e->length, dst, ctx->pclustersize);
+				      &e->length, dst, ictx->pclustersize);
 	if (ret <= 0) {
 		erofs_err("failed to compress %s: %s", inode->i_srcpath,
 			  erofs_strerror(ret));
@@ -507,16 +583,16 @@ nocompression:
 		e->compressedblks = 1;
 		e->raw = true;
 	} else if (may_packing && len == e->length &&
-		   compressedsize < ctx->pclustersize &&
-		   (!inode->fragment_size || ctx->fix_dedupedfrag)) {
+		   compressedsize < ictx->pclustersize &&
+		   (!inode->fragment_size || ictx->fix_dedupedfrag)) {
 frag_packing:
 		ret = z_erofs_pack_fragments(inode, ctx->queue + ctx->head,
-					     len, ctx->tof_chksum);
+					     len, ictx->tof_chksum);
 		if (ret < 0)
 			return ret;
 		e->compressedblks = 0; /* indicate a fragment */
 		e->raw = false;
-		ctx->fragemitted = true;
+		ictx->fragemitted = true;
 	/* tailpcluster should be less than 1 block */
 	} else if (may_inline && len == e->length && compressedsize < blksz) {
 		if (ctx->clusterofs + len <= blksz) {
@@ -545,8 +621,8 @@ frag_packing:
 		 */
 		if (may_packing && len == e->length &&
 		    (compressedsize & (blksz - 1)) &&
-		    ctx->tail < sizeof(ctx->queue)) {
-			ctx->pclustersize = roundup(compressedsize, blksz);
+		    ctx->tail < EROFS_COMPR_QUEUE_SZ) {
+			ictx->pclustersize = roundup(compressedsize, blksz);
 			goto fix_dedupedfrag;
 		}
 
@@ -569,13 +645,24 @@ frag_packing:
 		}
 
 		/* write compressed data */
-		erofs_dbg("Writing %u compressed data to %u of %u blocks",
-			  e->length, ctx->blkaddr, e->compressedblks);
+		if (ctx->tmpfile) {
+			erofs_dbg("Writing %u compressed data to tmpfile of %u blocks",
+				  e->length, e->compressedblks);
+
+			ret = fwrite(dst - padding, erofs_blksiz(sbi),
+				     e->compressedblks, ctx->tmpfile);
+			if (ret != e->compressedblks)
+				return -EIO;
+			fflush(ctx->tmpfile);
+		} else {
+			erofs_dbg("Writing %u compressed data to %u of %u blocks",
+				  e->length, ctx->blkaddr, e->compressedblks);
 
-		ret = blk_write(sbi, dst - padding, ctx->blkaddr,
-				e->compressedblks);
-		if (ret)
-			return ret;
+			ret = blk_write(sbi, dst - padding, ctx->blkaddr,
+					e->compressedblks);
+			if (ret)
+				return ret;
+		}
 		e->raw = false;
 		may_inline = false;
 		may_packing = false;
@@ -591,14 +678,15 @@ frag_packing:
 fix_dedupedfrag:
 	DBG_BUGON(!inode->fragment_size);
 	ctx->remaining += inode->fragment_size;
-	ctx->fix_dedupedfrag = true;
+	ictx->fix_dedupedfrag = true;
 	return 1;
 }
 
-static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx)
+static int z_erofs_compress_one(struct z_erofs_compressed_segment_ctx *ctx)
 {
 	unsigned int len = ctx->tail - ctx->head;
 	struct z_erofs_extent_item *ei;
+	struct z_erofs_compressed_inode_ctx *ictx = ctx->ictx;
 
 	while (len) {
 		int ret = z_erofs_compress_dedupe(ctx, &len);
@@ -624,7 +712,7 @@ static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx)
 
 		len -= ei->e.length;
 		ctx->pivot = ei;
-		if (ctx->fix_dedupedfrag && !ctx->fragemitted &&
+		if (ictx->fix_dedupedfrag && !ictx->fragemitted &&
 		    z_erofs_fixup_deduped_fragment(ctx, len))
 			break;
 
@@ -912,11 +1000,328 @@ void z_erofs_drop_inline_pcluster(struct erofs_inode *inode)
 	inode->eof_tailraw = NULL;
 }
 
+int z_erofs_compress_file(struct z_erofs_compressed_segment_ctx *ctx,
+			  u64 offset, erofs_blk_t blkaddr)
+{
+	struct z_erofs_compressed_inode_ctx *ictx = ctx->ictx;
+	struct erofs_inode *inode = ictx->inode;
+	int ret = 0;
+
+	while (ctx->remaining) {
+		const u64 rx = min_t(u64, ctx->remaining,
+				     EROFS_COMPR_QUEUE_SZ - ctx->tail);
+
+		ret = pread(ictx->fd, ctx->queue + ctx->tail, rx, offset);
+		if (ret != rx)
+			return -errno;
+		ctx->remaining -= rx;
+		ctx->tail += rx;
+		offset += rx;
+
+		ret = z_erofs_compress_one(ctx);
+		if (ret)
+			return ret;
+	}
+	DBG_BUGON(ctx->head != ctx->tail);
+
+	ctx->compressed_blocks = ctx->blkaddr - blkaddr;
+	DBG_BUGON(ctx->compressed_blocks < !!inode->idata_size);
+	ctx->compressed_blocks -= !!inode->idata_size;
+
+	if (ctx->pivot) {
+		z_erofs_commit_extent(ctx, ctx->pivot);
+		ctx->pivot = NULL;
+	}
+
+	return 0;
+}
+
+#ifdef EROFS_MT_ENABLED
+#if defined(HAVE_FALLOCATE) && defined(FALLOC_FL_PUNCH_HOLE)
+#define USE_PER_WORKER_TMPFILE 1
+#endif
+
+int z_erofs_mt_private_init(struct erofs_sb_info *sbi,
+			    struct erofs_compress_wq_private *priv,
+			    unsigned int alg_id, char *alg_name,
+			    unsigned int comp_level, unsigned int dict_size)
+{
+	struct erofs_compress_cfg *lc;
+	int ret;
+
+	if (unlikely(!priv->init)) {
+		priv->init = true;
+
+		priv->queue = malloc(EROFS_COMPR_QUEUE_SZ);
+		if (!priv->queue)
+			return -ENOMEM;
+
+		priv->destbuf = calloc(1, EROFS_CONFIG_COMPR_MAX_SZ +
+						  EROFS_MAX_BLOCK_SIZE);
+		if (!priv->destbuf)
+			return -ENOMEM;
+
+		priv->ccfg = calloc(EROFS_MAX_COMPR_CFGS,
+				    sizeof(struct erofs_compress_cfg));
+		if (!priv->ccfg)
+			return -ENOMEM;
+#ifdef USE_PER_WORKER_TMPFILE
+#ifndef HAVE_TMPFILE64
+		priv->tmpfile = tmpfile();
+#else
+		priv->tmpfile = tmpfile64();
+#endif
+		if (!priv->tmpfile)
+			return -errno;
+#endif
+	}
+
+	lc = &priv->ccfg[alg_id];
+	if (!lc->enable) {
+		lc->enable = true;
+		lc->algorithmtype = alg_id;
+
+		ret = erofs_compressor_init(sbi, &lc->handle, alg_name,
+					    comp_level, dict_size);
+		if (ret)
+			return ret;
+	}
+
+	return 0;
+}
+
+void z_erofs_mt_private_fini(void *private)
+{
+	struct erofs_compress_wq_private *priv = private;
+	int i;
+
+	if (priv->init) {
+		for (i = 0; i < EROFS_MAX_COMPR_CFGS; i++) {
+			if (priv->ccfg[i].enable)
+				erofs_compressor_exit(&priv->ccfg[i].handle);
+		}
+		free(priv->ccfg);
+		free(priv->destbuf);
+		free(priv->queue);
+#ifdef USE_PER_WORKER_TMPFILE
+		fclose(priv->tmpfile);
+#endif
+		priv->init = false;
+	}
+}
+
+void z_erofs_mt_work(struct erofs_work *work)
+{
+	struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
+	struct z_erofs_compressed_segment_ctx *ctx = &cwork->ctx;
+	struct erofs_compress_wq_private *priv = work->priv;
+	erofs_blk_t blkaddr = ctx->blkaddr;
+	u64 offset = ctx->seg_idx * cfg.c_segment_size;
+	int ret = 0;
+
+	ret = z_erofs_mt_private_init(ctx->ictx->inode->sbi, priv,
+				      cwork->alg_id, cwork->alg_name,
+				      cwork->comp_level, cwork->dict_size);
+	if (ret)
+		goto out;
+
+	ctx->queue = priv->queue;
+	ctx->destbuf = priv->destbuf;
+	ctx->chandle = &priv->ccfg[cwork->alg_id].handle;
+#ifdef USE_PER_WORKER_TMPFILE
+	ctx->tmpfile = priv->tmpfile;
+	ctx->tmpfile_off = ftell(ctx->tmpfile);
+	if (ctx->tmpfile_off == -1) {
+		ret = -errno;
+		goto out;
+	}
+#else
+#ifdef HAVE_TMPFILE64
+	ctx->tmpfile = tmpfile64();
+#else
+	ctx->tmpfile = tmpfile();
+#endif
+	if (!ctx->tmpfile) {
+		ret = -errno;
+		goto out;
+	}
+	ctx->tmpfile_off = 0;
+#endif
+
+	ret = z_erofs_compress_file(ctx, offset, blkaddr);
+	if (ret)
+		goto out;
+
+	fflush(ctx->tmpfile);
+
+out:
+	cwork->ret = ret;
+	pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
+	++z_erofs_mt_ctrl.nfini;
+	pthread_cond_signal(&z_erofs_mt_ctrl.cond);
+	pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
+}
+
+int z_erofs_mt_merge(struct erofs_compress_work *cur, erofs_blk_t blkaddr,
+		     erofs_blk_t *compressed_blocks)
+{
+	struct z_erofs_compressed_segment_ctx *ctx, *listhead = NULL;
+	struct erofs_sb_info *sbi = cur->ctx.ictx->inode->sbi;
+	struct erofs_compress_work *tmp;
+	char *memblock = NULL;
+	size_t size = 0;
+	int ret = 0, lret;
+
+	while (cur != NULL) {
+		ctx = &cur->ctx;
+
+		if (!listhead)
+			listhead = ctx;
+		else
+			list_splice_tail(&ctx->extents, &listhead->extents);
+
+		if (cur->ret != 0) {
+			if (!ret)
+				ret = cur->ret;
+			goto out;
+		}
+
+		size = ctx->compressed_blocks * erofs_blksiz(sbi);
+		memblock = realloc(memblock, size);
+		if (!memblock) {
+			if (!ret)
+				ret = -ENOMEM;
+			goto out;
+		}
+
+		lret = pread(fileno(ctx->tmpfile), memblock, size,
+			     ctx->tmpfile_off);
+		if (lret != size) {
+			if (!ret)
+				ret = errno ? -errno : -EIO;
+			goto out;
+		}
+
+#ifdef USE_PER_WORKER_TMPFILE
+		lret = fallocate(fileno(ctx->tmpfile),
+				 FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
+				 ctx->tmpfile_off, size);
+		if (lret) {
+			if (!ret)
+				ret = -errno;
+			goto out;
+		}
+#endif
+
+		lret = blk_write(sbi, memblock, blkaddr + *compressed_blocks,
+				 ctx->compressed_blocks);
+		if (lret) {
+			if (!ret)
+				ret = lret;
+			goto out;
+		}
+		*compressed_blocks += ctx->compressed_blocks;
+
+out:
+#ifndef USE_PER_WORKER_TMPFILE
+		fclose(ctx->tmpfile);
+#endif
+		tmp = cur->next;
+		cur->next = z_erofs_mt_ctrl.idle;
+		z_erofs_mt_ctrl.idle = cur;
+		cur = tmp;
+	}
+
+	free(memblock);
+
+	return ret;
+}
+
+void z_erofs_mt_fix_index(struct z_erofs_compressed_inode_ctx *ctx,
+			  erofs_blk_t blkaddr)
+{
+	struct z_erofs_extent_item *ei;
+	erofs_blk_t blkoff = 0;
+
+	list_for_each_entry(ei, ctx->extents, list) {
+		ei->e.blkaddr = blkaddr + blkoff;
+		blkoff += ei->e.compressedblks;
+	}
+}
+
+int z_erofs_mt_compress(struct z_erofs_compressed_segment_ctx *ctx,
+			struct erofs_compress_cfg *ccfg,
+			erofs_blk_t *compressed_blocks)
+{
+	struct erofs_compress_work *work, *head = NULL, **last = &head;
+	struct z_erofs_compressed_inode_ctx *ictx = ctx->ictx;
+	struct erofs_inode *inode = ictx->inode;
+	erofs_blk_t blkaddr = ctx->blkaddr;
+	int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size);
+	int ret;
+
+	z_erofs_mt_ctrl.nfini = 0;
+
+	for (int i = 0; i < nsegs; i++) {
+		if (z_erofs_mt_ctrl.idle) {
+			work = z_erofs_mt_ctrl.idle;
+			z_erofs_mt_ctrl.idle = work->next;
+			work->next = NULL;
+		} else {
+			work = calloc(1, sizeof(*work));
+			if (!work)
+				return -ENOMEM;
+		}
+		*last = work;
+		last = &work->next;
+
+		memset(&work->ctx, 0, sizeof(work->ctx));
+		if (i == nsegs - 1)
+			work->ctx.remaining = inode->i_size -
+					      inode->fragment_size -
+					      i * cfg.c_segment_size;
+		else
+			work->ctx.remaining = cfg.c_segment_size;
+		work->ctx.seg_num = nsegs;
+		work->ctx.seg_idx = i;
+		work->ctx.blkaddr = blkaddr;
+		init_list_head(&work->ctx.extents);
+		work->ctx.ictx = ictx;
+
+		work->alg_id = ccfg->handle.alg->id;
+		work->alg_name = ccfg->handle.alg->name;
+		work->comp_level = ccfg->handle.compression_level;
+		work->dict_size = ccfg->handle.dict_size;
+
+		work->work.func = z_erofs_mt_work;
+
+		erofs_queue_work(&z_erofs_mt_ctrl.wq, &work->work);
+	}
+
+	pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
+	while (z_erofs_mt_ctrl.nfini != nsegs)
+		pthread_cond_wait(&z_erofs_mt_ctrl.cond,
+				  &z_erofs_mt_ctrl.mutex);
+	pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
+
+	ictx->extents = &head->ctx.extents;
+
+	ret = z_erofs_mt_merge(head, blkaddr, compressed_blocks);
+	if (ret)
+		return ret;
+
+	z_erofs_mt_fix_index(ictx, blkaddr);
+	return 0;
+}
+#endif
+
 int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 {
 	struct erofs_buffer_head *bh;
-	static struct z_erofs_vle_compress_ctx ctx;
-	erofs_blk_t blkaddr, compressed_blocks;
+	static struct z_erofs_compressed_inode_ctx ictx;
+	static struct z_erofs_compressed_segment_ctx ctx;
+	struct erofs_compress_cfg *ccfg;
+	erofs_blk_t blkaddr, compressed_blocks = 0;
 	unsigned int legacymetasize;
 	int ret;
 	struct erofs_sb_info *sbi = inode->sbi;
@@ -963,8 +1368,8 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 		}
 	}
 #endif
-	ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
-	inode->z_algorithmtype[0] = ctx.ccfg[0].algorithmtype;
+	ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
+	inode->z_algorithmtype[0] = ccfg[0].algorithmtype;
 	inode->z_algorithmtype[1] = 0;
 
 	inode->idata_size = 0;
@@ -975,82 +1380,87 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 	 * parts into the packed inode.
 	 */
 	if (cfg.c_fragments && !erofs_is_packed_inode(inode)) {
-		ret = z_erofs_fragments_dedupe(inode, fd, &ctx.tof_chksum);
+		ret = z_erofs_fragments_dedupe(inode, fd, &ictx.tof_chksum);
 		if (ret < 0)
 			goto err_bdrop;
 	}
 
 	blkaddr = erofs_mapbh(bh->block);	/* start_blkaddr */
-	ctx.inode = inode;
-	ctx.pclustersize = z_erofs_get_max_pclustersize(inode);
+
+	ictx.inode = inode;
+	ictx.fd = fd;
+	ictx.fix_dedupedfrag = false;
+	ictx.fragemitted = false;
+	ictx.pclustersize = z_erofs_get_max_pclustersize(inode);
+	ictx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
+
+	memset(&ctx, 0, sizeof(ctx));
+	ctx.ictx = &ictx;
 	ctx.blkaddr = blkaddr;
-	ctx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
-	ctx.head = ctx.tail = 0;
-	ctx.clusterofs = 0;
-	ctx.pivot = &dummy_pivot;
 	init_list_head(&ctx.extents);
-	ctx.remaining = inode->i_size - inode->fragment_size;
-	ctx.fix_dedupedfrag = false;
-	ctx.fragemitted = false;
-	if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
-	    !inode->fragment_size) {
-		ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum);
+
+	if (z_erofs_mt_enabled) {
+#ifdef EROFS_MT_ENABLED
+		if (inode->i_size <= cfg.c_segment_size)
+			goto single_thread_comp;
+
+		ret = z_erofs_mt_compress(&ctx, ccfg, &compressed_blocks);
 		if (ret)
 			goto err_free_idata;
+#endif
 	} else {
-		while (ctx.remaining) {
-			const u64 rx = min_t(u64, ctx.remaining,
-					     sizeof(ctx.queue) - ctx.tail);
-
-			ret = read(fd, ctx.queue + ctx.tail, rx);
-			if (ret != rx) {
-				ret = -errno;
-				goto err_bdrop;
-			}
-			ctx.remaining -= rx;
-			ctx.tail += rx;
+		if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
+		    !inode->fragment_size) {
+			ret = z_erofs_pack_file_from_fd(inode, fd,
+							ictx.tof_chksum);
+			if (ret)
+				goto err_free_idata;
 
-			ret = z_erofs_compress_one(&ctx);
+			ictx.extents = &ctx.extents;
+		} else {
+#ifdef EROFS_MT_ENABLED
+single_thread_comp:
+#endif
+			ctx.queue = z_erofs_global_queue;
+			ctx.destbuf = NULL;
+			ctx.chandle = &ccfg->handle;
+			ctx.remaining = inode->i_size - inode->fragment_size;
+			ctx.seg_num = 1;
+			ctx.seg_idx = 0;
+
+			ret = z_erofs_compress_file(&ctx, 0, blkaddr);
 			if (ret)
 				goto err_free_idata;
+
+			compressed_blocks = ctx.compressed_blocks;
+			ictx.extents = &ctx.extents;
 		}
-	}
-	DBG_BUGON(ctx.head != ctx.tail);
 
-	/* fall back to no compression mode */
-	compressed_blocks = ctx.blkaddr - blkaddr;
-	DBG_BUGON(compressed_blocks < !!inode->idata_size);
-	compressed_blocks -= !!inode->idata_size;
+		/* generate an extent for the deduplicated fragment */
+		if (inode->fragment_size && !ictx.fragemitted) {
+			struct z_erofs_extent_item *ei;
 
-	if (ctx.pivot) {
-		z_erofs_commit_extent(&ctx, ctx.pivot);
-		ctx.pivot = NULL;
-	}
-
-	/* generate an extent for the deduplicated fragment */
-	if (inode->fragment_size && !ctx.fragemitted) {
-		struct z_erofs_extent_item *ei;
+			ei = malloc(sizeof(*ei));
+			if (!ei) {
+				ret = -ENOMEM;
+				goto err_free_idata;
+			}
 
-		ei = malloc(sizeof(*ei));
-		if (!ei) {
-			ret = -ENOMEM;
-			goto err_free_idata;
+			ei->e = (struct z_erofs_inmem_extent){
+				.length = inode->fragment_size,
+				.compressedblks = 0,
+				.raw = false,
+				.partial = false,
+				.blkaddr = ctx.blkaddr,
+			};
+			init_list_head(&ei->list);
+			z_erofs_commit_extent(&ctx, ei);
 		}
-
-		ei->e = (struct z_erofs_inmem_extent) {
-			.length = inode->fragment_size,
-			.compressedblks = 0,
-			.raw = false,
-			.partial = false,
-			.blkaddr = ctx.blkaddr,
-		};
-		init_list_head(&ei->list);
-		z_erofs_commit_extent(&ctx, ei);
+		z_erofs_fragments_commit(inode);
 	}
-	z_erofs_fragments_commit(inode);
 
-	z_erofs_write_indexes(&ctx);
-	legacymetasize = ctx.metacur - compressmeta;
+	z_erofs_write_indexes(&ictx);
+	legacymetasize = ictx.metacur - compressmeta;
 	/* estimate if data compression saves space or not */
 	if (!inode->fragment_size &&
 	    compressed_blocks * erofs_blksiz(sbi) + inode->idata_size +
@@ -1062,7 +1472,7 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
 	z_erofs_dedupe_commit(false);
 	z_erofs_write_mapheader(inode, compressmeta);
 
-	if (!ctx.fragemitted)
+	if (!ictx.fragemitted)
 		sbi->saved_by_deduplication += inode->fragment_size;
 
 	/* if the entire file is a fragment, a simplified form is used. */
@@ -1257,8 +1667,32 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
 		return -EINVAL;
 	}
 
-	if (erofs_sb_has_compr_cfgs(sbi))
-		return z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
+	if (erofs_sb_has_compr_cfgs(sbi)) {
+		ret = z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
+		if (ret)
+			return ret;
+	}
+
+#ifdef EROFS_MT_ENABLED
+	if (cfg.c_mt_workers == 1) {
+		z_erofs_mt_enabled = false;
+	} else {
+		pthread_mutex_init(&z_erofs_mt_ctrl.mutex, NULL);
+		pthread_cond_init(&z_erofs_mt_ctrl.cond, NULL);
+		ret = erofs_alloc_workqueue(
+			&z_erofs_mt_ctrl.wq, cfg.c_mt_workers,
+			cfg.c_mt_workers << 2,
+			sizeof(struct erofs_compress_wq_private),
+			z_erofs_mt_private_fini);
+		z_erofs_mt_enabled = !ret;
+	}
+#else
+	mt_enabled = false;
+#endif
+	z_erofs_global_queue = malloc(EROFS_COMPR_QUEUE_SZ);
+	if (!z_erofs_global_queue)
+		return -ENOMEM;
+
 	return 0;
 }
 
@@ -1271,5 +1705,22 @@ int z_erofs_compress_exit(void)
 		if (ret)
 			return ret;
 	}
+
+	if (z_erofs_mt_enabled) {
+#ifdef EROFS_MT_ENABLED
+		ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
+		if (ret)
+			return ret;
+		while (z_erofs_mt_ctrl.idle) {
+			struct erofs_compress_work *tmp =
+				z_erofs_mt_ctrl.idle->next;
+			free(z_erofs_mt_ctrl.idle);
+			z_erofs_mt_ctrl.idle = tmp;
+		}
+#endif
+	}
+
+	free(z_erofs_global_queue);
+
 	return 0;
 }
diff --git a/lib/compressor.c b/lib/compressor.c
index 4720e72..97732d1 100644
--- a/lib/compressor.c
+++ b/lib/compressor.c
@@ -86,6 +86,8 @@ int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
 
 	/* should be written in "minimum compression ratio * 100" */
 	c->compress_threshold = 100;
+	c->compression_level = -1;
+	c->dict_size = 0;
 
 	if (!alg_name) {
 		c->alg = NULL;
-- 
2.44.0


^ permalink raw reply related	[flat|nested] 10+ messages in thread

* Re: [PATCH v4 1/5] erofs-utils: introduce multi-threading framework
  2024-02-28 16:16 ` [PATCH v4 1/5] erofs-utils: introduce multi-threading framework Yifan Zhao
@ 2024-02-29  9:43   ` Gao Xiang
  2024-02-29  9:50     ` Gao Xiang
  2024-02-29 12:09     ` Yifan Zhao
  0 siblings, 2 replies; 10+ messages in thread
From: Gao Xiang @ 2024-02-29  9:43 UTC (permalink / raw
  To: Yifan Zhao; +Cc: linux-erofs

Hi Yifan,

On 2024/2/29 00:16, Yifan Zhao wrote:
> Add a workqueue implementation for multi-threading support inspired by
> xfsprogs.
> 
> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
> Suggested-by: Gao Xiang <hsiangkao@linux.alibaba.com>
> ---
>   configure.ac              |  16 +++++
>   include/erofs/internal.h  |   3 +
>   include/erofs/workqueue.h |  37 +++++++++++
>   lib/Makefile.am           |   4 ++
>   lib/workqueue.c           | 132 ++++++++++++++++++++++++++++++++++++++
>   5 files changed, 192 insertions(+)
>   create mode 100644 include/erofs/workqueue.h
>   create mode 100644 lib/workqueue.c
> 
> diff --git a/configure.ac b/configure.ac
> index 4b59230..3ccd6bb 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY],
>   
>   AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which erofs-utils supports])
>   
> +AC_MSG_CHECKING([whether to enable multi-threading support])
> +AC_ARG_ENABLE([multithreading],
> +    AS_HELP_STRING([--enable-multithreading],
> +                   [enable multi-threading support @<:@default=no@:>@]),
> +    [enable_multithreading="$enableval"],
> +    [enable_multithreading="no"])
> +AC_MSG_RESULT([$enable_multithreading])
> +
>   AC_ARG_ENABLE([debug],
>       [AS_HELP_STRING([--enable-debug],
>                       [enable debugging mode @<:@default=no@:>@])],
> @@ -280,6 +288,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [
>                                [erofs_cv_max_block_size=4096]))
>   ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE])
>   
> +# Configure multi-threading support
> +AS_IF([test "x$enable_multithreading" != "xno"], [
> +    AC_CHECK_HEADERS([pthread.h])
> +    AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], AC_MSG_ERROR([libpthread is required for multi-threaded build]))
> +    AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support])
> +], [])
> +
>   # Configure debug mode
>   AS_IF([test "x$enable_debug" != "xno"], [], [
>     dnl Turn off all assert checking.
> @@ -471,6 +486,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [
>   AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"])
>   
>   # Set up needed symbols, conditionals and compiler/linker flags
> +AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" != "xno"])
>   AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"])
>   AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"])
>   AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"])
> diff --git a/include/erofs/internal.h b/include/erofs/internal.h
> index 82797e1..954aef4 100644
> --- a/include/erofs/internal.h
> +++ b/include/erofs/internal.h
> @@ -22,6 +22,9 @@ typedef unsigned short umode_t;
>   #include <sys/types.h> /* for off_t definition */
>   #include <sys/stat.h> /* for S_ISCHR definition */
>   #include <stdio.h>
> +#ifdef HAVE_PTHREAD_H
> +#include <pthread.h>
> +#endif
>   
>   #ifndef PATH_MAX
>   #define PATH_MAX        4096    /* # chars in a path name including nul */
> diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
> new file mode 100644
> index 0000000..b4b3901
> --- /dev/null
> +++ b/include/erofs/workqueue.h
> @@ -0,0 +1,37 @@
> +/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
> +#ifndef __EROFS_WORKQUEUE_H
> +#define __EROFS_WORKQUEUE_H
> +
> +#include "internal.h"
> +
> +struct erofs_work;
> +
> +typedef void erofs_wq_func_t(struct erofs_work *);
> +typedef void erofs_wq_priv_fini_t(void *);
> +
> +struct erofs_work {
> +	void (*func)(struct erofs_work *work);
> +	struct erofs_work *next;
> +	void *priv;
> +};
> +
> +struct erofs_workqueue {
> +	struct erofs_work *head, *tail;
> +	pthread_mutex_t lock;
> +	pthread_cond_t cond_empty;
> +	pthread_cond_t cond_full;
> +	pthread_t *workers;
> +	unsigned int nworker;
> +	unsigned int max_jobs;
> +	unsigned int job_count;
> +	bool shutdown;
> +	size_t priv_size;

Honestly, I don't like this approach. How about
	..
	erofs_wq_func_t on_start, on_exit;
	void *private;
	..

much like:
https://www.gnu.org/software/libc/manual/html_node/Cleanups-on-Exit.html

> +	erofs_wq_priv_fini_t *priv_fini;
> +};
> +
> +int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
> +			 unsigned int max_jobs, size_t priv_size,
> +			 erofs_wq_priv_fini_t *priv_fini);
> +int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work);
> +int erofs_destroy_workqueue(struct erofs_workqueue *wq);
> +#endif
> \ No newline at end of file
> diff --git a/lib/Makefile.am b/lib/Makefile.am
> index 54b9c9c..7307f7b 100644
> --- a/lib/Makefile.am
> +++ b/lib/Makefile.am
> @@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c
>   if ENABLE_LIBDEFLATE
>   liberofs_la_SOURCES += compressor_libdeflate.c
>   endif
> +if ENABLE_EROFS_MT
> +liberofs_la_CFLAGS += -lpthread
> +liberofs_la_SOURCES += workqueue.c
> +endif
> diff --git a/lib/workqueue.c b/lib/workqueue.c
> new file mode 100644
> index 0000000..138afd5
> --- /dev/null
> +++ b/lib/workqueue.c
> @@ -0,0 +1,132 @@
> +// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
> +#include <pthread.h>
> +#include <stdlib.h>
> +#include "erofs/workqueue.h"
> +
> +static void *worker_thread(void *arg)
> +{
> +	struct erofs_workqueue *wq = arg;
> +	struct erofs_work *work;
> +	void *priv = NULL;
> +
> +	if (wq->priv_size) {
> +		priv = calloc(wq->priv_size, 1);
> +		assert(priv);
> +	}

	if (wq->on_start)
		wq->on_start(wq);

> +
> +	while (true) {
> +		pthread_mutex_lock(&wq->lock);
> +
> +		while (wq->job_count == 0 && !wq->shutdown)
> +			pthread_cond_wait(&wq->cond_empty, &wq->lock);
> +		if (wq->job_count == 0 && wq->shutdown) {
> +			pthread_mutex_unlock(&wq->lock);
> +			break;
> +		}
> +
> +		work = wq->head;
> +		wq->head = work->next;
> +		if (!wq->head)
> +			wq->tail = NULL;
> +		wq->job_count--;
> +
> +		if (wq->job_count == wq->max_jobs - 1)
> +			pthread_cond_broadcast(&wq->cond_full);
> +
> +		pthread_mutex_unlock(&wq->lock);
> +
> +		work->priv = priv;
> +		work->func(work);
> +	}
> +
> +	if (priv) {
> +		assert(wq->priv_fini);
> +		(wq->priv_fini)(priv);
> +		free(priv);
> +	}

	if (wq->on_exit)
		wq->on_exit(wq);

> +
> +	return NULL;
> +}
> +
> +int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
> +			 unsigned int max_jobs, size_t priv_size,
> +			 erofs_wq_priv_fini_t *priv_fini)
> +{
> +	unsigned int i;
> +
> +	if (!wq || nworker <= 0 || max_jobs <= 0)
> +		return -EINVAL;
> +
> +	wq->head = wq->tail = NULL;
> +	wq->nworker = nworker;
> +	wq->max_jobs = max_jobs;
> +	wq->job_count = 0;
> +	wq->shutdown = false;
> +	wq->priv_size = priv_size;
> +	wq->priv_fini = priv_fini;
> +	pthread_mutex_init(&wq->lock, NULL);
> +	pthread_cond_init(&wq->cond_empty, NULL);
> +	pthread_cond_init(&wq->cond_full, NULL);
> +
> +	wq->workers = malloc(nworker * sizeof(pthread_t));
> +	if (!wq->workers)
> +		return -ENOMEM;
> +
> +	for (i = 0; i < nworker; i++) {
> +		if (pthread_create(&wq->workers[i], NULL, worker_thread, wq)) {
> +			while (i--)
> +				pthread_cancel(wq->workers[i]);

How about
			while (i)
				pthread_cancel(wq->workers[--i]);

I'd prefer this form, since the unsigned `i` then never wraps around below 0.

Thanks,
Gao Xiang

^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [PATCH v4 1/5] erofs-utils: introduce multi-threading framework
  2024-02-29  9:43   ` Gao Xiang
@ 2024-02-29  9:50     ` Gao Xiang
  2024-02-29 12:09     ` Yifan Zhao
  1 sibling, 0 replies; 10+ messages in thread
From: Gao Xiang @ 2024-02-29  9:50 UTC (permalink / raw
  To: Yifan Zhao; +Cc: linux-erofs



On 2024/2/29 17:43, Gao Xiang wrote:
> Hi Yifan,
> 
> On 2024/2/29 00:16, Yifan Zhao wrote:
>> Add a workqueue implementation for multi-threading support inspired by
>> xfsprogs.
>>
>> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
>> Suggested-by: Gao Xiang <hsiangkao@linux.alibaba.com>
>> ---

..

>> index 54b9c9c..7307f7b 100644
>> --- a/lib/Makefile.am
>> +++ b/lib/Makefile.am
>> @@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c
>>   if ENABLE_LIBDEFLATE
>>   liberofs_la_SOURCES += compressor_libdeflate.c
>>   endif
>> +if ENABLE_EROFS_MT
>> +liberofs_la_CFLAGS += -lpthread

By the way, this line should be
liberofs_la_LDFLAGS = -lpthread

Otherwise, the build can fail with clang.

Thanks,
Gao Xiang

^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [PATCH v4 3/5] erofs-utils: mkfs: add --worker=# parameter
  2024-02-28 16:16 ` [PATCH v4 3/5] erofs-utils: mkfs: add --worker=# parameter Yifan Zhao
@ 2024-02-29  9:55   ` Gao Xiang
  0 siblings, 0 replies; 10+ messages in thread
From: Gao Xiang @ 2024-02-29  9:55 UTC (permalink / raw
  To: Yifan Zhao; +Cc: linux-erofs



On 2024/2/29 00:16, Yifan Zhao wrote:
> This patch introduces a --worker=# parameter for the incoming
> multi-threaded compression support. It also introduces a segment size
> used in multi-threaded compression, which has the default value 16MB
> and cannot be modified.
> 
> It also introduces a concept called `segment size` to split large files
> for multi-threading, which has the default value 16MB for now.
> 
> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
> ---
>   include/erofs/config.h |  4 ++++
>   lib/config.c           |  4 ++++
>   mkfs/main.c            | 38 ++++++++++++++++++++++++++++++++++++++
>   3 files changed, 46 insertions(+)
> 
> diff --git a/include/erofs/config.h b/include/erofs/config.h
> index 73e3ac2..d2f91ff 100644
> --- a/include/erofs/config.h
> +++ b/include/erofs/config.h
> @@ -75,6 +75,10 @@ struct erofs_configure {
>   	char c_force_chunkformat;
>   	/* < 0, xattr disabled and INT_MAX, always use inline xattrs */
>   	int c_inline_xattr_tolerance;
> +#ifdef EROFS_MT_ENABLED
> +	u64 c_segment_size;
> +	u32 c_mt_workers;
> +#endif
>   
>   	u32 c_pclusterblks_max, c_pclusterblks_def, c_pclusterblks_packed;
>   	u32 c_max_decompressed_extent_bytes;
> diff --git a/lib/config.c b/lib/config.c
> index 947a183..2530274 100644
> --- a/lib/config.c
> +++ b/lib/config.c
> @@ -38,6 +38,10 @@ void erofs_init_configure(void)
>   	cfg.c_pclusterblks_max = 1;
>   	cfg.c_pclusterblks_def = 1;
>   	cfg.c_max_decompressed_extent_bytes = -1;
> +#ifdef EROFS_MT_ENABLED
> +	cfg.c_segment_size = 16ULL * 1024 * 1024;
> +	cfg.c_mt_workers = 1;
> +#endif
>   
>   	erofs_stdout_tty = isatty(STDOUT_FILENO);
>   }
> diff --git a/mkfs/main.c b/mkfs/main.c
> index 258c1ce..ce9c28b 100644
> --- a/mkfs/main.c
> +++ b/mkfs/main.c
> @@ -74,6 +74,9 @@ static struct option long_options[] = {
>   	{"ungzip", optional_argument, NULL, 517},
>   #endif
>   	{"offset", required_argument, NULL, 518},
> +#ifdef EROFS_MT_ENABLED
> +	{"workers", required_argument, NULL, 519},
> +#endif
>   	{0, 0, 0, 0},
>   };
>   
> @@ -179,6 +182,9 @@ static void usage(int argc, char **argv)
>   		" --product-out=X       X=product_out directory\n"
>   		" --fs-config-file=X    X=fs_config file\n"
>   		" --block-list-file=X   X=block_list file\n"
> +#endif
> +#ifdef EROFS_MT_ENABLED
> +		" --workers=#            set the number of worker threads to # (default=1)\n"
>   #endif
>   		);
>   }
> @@ -408,6 +414,13 @@ static void erofs_rebuild_cleanup(void)
>   	rebuild_src_count = 0;
>   }
>   
> +#ifdef EROFS_MT_ENABLED
> +static u32 mkfs_max_worker_num() {

static unsigned int erofs_mkfs_max_worker_num()
{
	return erofs_get_available_processors() ? : 16;
}

> +	u32 ncpu = erofs_get_available_processors();
> +	return ncpu ? ncpu : 16;
> +}
> +#endif
> +
>   static int mkfs_parse_options_cfg(int argc, char *argv[])
>   {
>   	char *endptr;
> @@ -650,6 +663,21 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
>   				return -EINVAL;
>   			}
>   			break;
> +#ifdef EROFS_MT_ENABLED
> +		case 519:
> +			cfg.c_mt_workers = strtoul(optarg, &endptr, 0);
> +			if (errno || *endptr != '\0') {
> +				erofs_err("invalid worker number %s", optarg);
> +				return -EINVAL;
> +			}
> +			if (cfg.c_mt_workers > mkfs_max_worker_num()) {
> +				erofs_warn(
> +					"worker number %s is too large, setting to %ud",
> +					optarg, mkfs_max_worker_num());
Let's not break the line right after erofs_{err,warn,...}(; keep the message on the same line as the call, i.e.:

				cfg.c_mt_workers = mkfs_max_worker_num();
				erofs_warn("worker number %s is too large, resetting to %u",
					   optarg, cfg.c_mt_workers);

> +				cfg.c_mt_workers = mkfs_max_worker_num();
> +			}
> +			break;
> +#endif
>   		case 'V':
>   			version();
>   			exit(0);
> @@ -803,6 +831,16 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
>   		}
>   		cfg.c_pclusterblks_packed = pclustersize_packed >> sbi.blkszbits;
>   	}
> +
> +#ifdef EROFS_MT_ENABLED
> +	if (cfg.c_mt_workers > 1 &&
> +	    (cfg.c_dedupe || cfg.c_fragments || cfg.c_ztailpacking)) {
> +		cfg.c_mt_workers = 1;
> +		erofs_warn("Please note that dedupe/fragments/ztailpacking"
> +			   "is NOT supported in multi-threaded mode now, using worker=1.");
> +	}
> +#endif

It would be better to move this part into the next patch.

Thanks,
Gao Xiang

^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [PATCH v4 1/5] erofs-utils: introduce multi-threading framework
  2024-02-29  9:43   ` Gao Xiang
  2024-02-29  9:50     ` Gao Xiang
@ 2024-02-29 12:09     ` Yifan Zhao
  1 sibling, 0 replies; 10+ messages in thread
From: Yifan Zhao @ 2024-02-29 12:09 UTC (permalink / raw
  To: Gao Xiang; +Cc: linux-erofs


On 2/29/24 17:43, Gao Xiang wrote:
> Hi Yifan,
>
> On 2024/2/29 00:16, Yifan Zhao wrote:
>> Add a workqueue implementation for multi-threading support inspired by
>> xfsprogs.
>>
>> Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
>> Suggested-by: Gao Xiang <hsiangkao@linux.alibaba.com>
>> ---
>>   configure.ac              |  16 +++++
>>   include/erofs/internal.h  |   3 +
>>   include/erofs/workqueue.h |  37 +++++++++++
>>   lib/Makefile.am           |   4 ++
>>   lib/workqueue.c           | 132 ++++++++++++++++++++++++++++++++++++++
>>   5 files changed, 192 insertions(+)
>>   create mode 100644 include/erofs/workqueue.h
>>   create mode 100644 lib/workqueue.c
>>
>> diff --git a/configure.ac b/configure.ac
>> index 4b59230..3ccd6bb 100644
>> --- a/configure.ac
>> +++ b/configure.ac
>> @@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY],
>>     AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which 
>> erofs-utils supports])
>>   +AC_MSG_CHECKING([whether to enable multi-threading support])
>> +AC_ARG_ENABLE([multithreading],
>> +    AS_HELP_STRING([--enable-multithreading],
>> +                   [enable multi-threading support 
>> @<:@default=no@:>@]),
>> +    [enable_multithreading="$enableval"],
>> +    [enable_multithreading="no"])
>> +AC_MSG_RESULT([$enable_multithreading])
>> +
>>   AC_ARG_ENABLE([debug],
>>       [AS_HELP_STRING([--enable-debug],
>>                       [enable debugging mode @<:@default=no@:>@])],
>> @@ -280,6 +288,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [
>>                                [erofs_cv_max_block_size=4096]))
>>   ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE])
>>   +# Configure multi-threading support
>> +AS_IF([test "x$enable_multithreading" != "xno"], [
>> +    AC_CHECK_HEADERS([pthread.h])
>> +    AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], 
>> AC_MSG_ERROR([libpthread is required for multi-threaded build]))
>> +    AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support])
>> +], [])
>> +
>>   # Configure debug mode
>>   AS_IF([test "x$enable_debug" != "xno"], [], [
>>     dnl Turn off all assert checking.
>> @@ -471,6 +486,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [
>>   AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"])
>>     # Set up needed symbols, conditionals and compiler/linker flags
>> +AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" 
>> != "xno"])
>>   AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"])
>>   AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"])
>>   AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"])
>> diff --git a/include/erofs/internal.h b/include/erofs/internal.h
>> index 82797e1..954aef4 100644
>> --- a/include/erofs/internal.h
>> +++ b/include/erofs/internal.h
>> @@ -22,6 +22,9 @@ typedef unsigned short umode_t;
>>   #include <sys/types.h> /* for off_t definition */
>>   #include <sys/stat.h> /* for S_ISCHR definition */
>>   #include <stdio.h>
>> +#ifdef HAVE_PTHREAD_H
>> +#include <pthread.h>
>> +#endif
>>     #ifndef PATH_MAX
>>   #define PATH_MAX        4096    /* # chars in a path name including 
>> nul */
>> diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
>> new file mode 100644
>> index 0000000..b4b3901
>> --- /dev/null
>> +++ b/include/erofs/workqueue.h
>> @@ -0,0 +1,37 @@
>> +/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
>> +#ifndef __EROFS_WORKQUEUE_H
>> +#define __EROFS_WORKQUEUE_H
>> +
>> +#include "internal.h"
>> +
>> +struct erofs_work;
>> +
>> +typedef void erofs_wq_func_t(struct erofs_work *);
>> +typedef void erofs_wq_priv_fini_t(void *);
>> +
>> +struct erofs_work {
>> +    void (*func)(struct erofs_work *work);
>> +    struct erofs_work *next;
>> +    void *priv;
>> +};
>> +
>> +struct erofs_workqueue {
>> +    struct erofs_work *head, *tail;
>> +    pthread_mutex_t lock;
>> +    pthread_cond_t cond_empty;
>> +    pthread_cond_t cond_full;
>> +    pthread_t *workers;
>> +    unsigned int nworker;
>> +    unsigned int max_jobs;
>> +    unsigned int job_count;
>> +    bool shutdown;
>> +    size_t priv_size;
>
> I don't like this way honestly, how about
>     ..
>     erofs_wq_func_t on_start, on_exit;
>     void *private;
>     ..
>
> much like:
> https://www.gnu.org/software/libc/manual/html_node/Cleanups-on-Exit.html
>
I believe `private` is a per-worker field, so it could not live in the shared `struct erofs_workqueue`, could it?

Also, the per-worker private data is now initialized on demand (we don't
know in advance whether a given compressor will be needed in a worker
thread), so I don't think it can be replaced with `on_start`, which would
initialize it at worker thread creation.


Thanks,

Yifan Zhao

>> +    erofs_wq_priv_fini_t *priv_fini;
>> +};
>> +
>> +int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int 
>> nworker,
>> +             unsigned int max_jobs, size_t priv_size,
>> +             erofs_wq_priv_fini_t *priv_fini);
>> +int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work 
>> *work);
>> +int erofs_destroy_workqueue(struct erofs_workqueue *wq);
>> +#endif
>> \ No newline at end of file
>> diff --git a/lib/Makefile.am b/lib/Makefile.am
>> index 54b9c9c..7307f7b 100644
>> --- a/lib/Makefile.am
>> +++ b/lib/Makefile.am
>> @@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c 
>> compressor_deflate.c
>>   if ENABLE_LIBDEFLATE
>>   liberofs_la_SOURCES += compressor_libdeflate.c
>>   endif
>> +if ENABLE_EROFS_MT
>> +liberofs_la_CFLAGS += -lpthread
>> +liberofs_la_SOURCES += workqueue.c
>> +endif
>> diff --git a/lib/workqueue.c b/lib/workqueue.c
>> new file mode 100644
>> index 0000000..138afd5
>> --- /dev/null
>> +++ b/lib/workqueue.c
>> @@ -0,0 +1,132 @@
>> +// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
>> +#include <pthread.h>
>> +#include <stdlib.h>
>> +#include "erofs/workqueue.h"
>> +
>> +static void *worker_thread(void *arg)
>> +{
>> +    struct erofs_workqueue *wq = arg;
>> +    struct erofs_work *work;
>> +    void *priv = NULL;
>> +
>> +    if (wq->priv_size) {
>> +        priv = calloc(wq->priv_size, 1);
>> +        assert(priv);
>> +    }
>
>     if (wq->on_start)
>         wq->on_start(wq);
>
>> +
>> +    while (true) {
>> +        pthread_mutex_lock(&wq->lock);
>> +
>> +        while (wq->job_count == 0 && !wq->shutdown)
>> +            pthread_cond_wait(&wq->cond_empty, &wq->lock);
>> +        if (wq->job_count == 0 && wq->shutdown) {
>> +            pthread_mutex_unlock(&wq->lock);
>> +            break;
>> +        }
>> +
>> +        work = wq->head;
>> +        wq->head = work->next;
>> +        if (!wq->head)
>> +            wq->tail = NULL;
>> +        wq->job_count--;
>> +
>> +        if (wq->job_count == wq->max_jobs - 1)
>> +            pthread_cond_broadcast(&wq->cond_full);
>> +
>> +        pthread_mutex_unlock(&wq->lock);
>> +
>> +        work->priv = priv;
>> +        work->func(work);
>> +    }
>> +
>> +    if (priv) {
>> +        assert(wq->priv_fini);
>> +        (wq->priv_fini)(priv);
>> +        free(priv);
>> +    }
>
>     if (wq->on_exit)
>         wq->on_exit(wq);
>
>> +
>> +    return NULL;
>> +}
>> +
>> +int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int 
>> nworker,
>> +             unsigned int max_jobs, size_t priv_size,
>> +             erofs_wq_priv_fini_t *priv_fini)
>> +{
>> +    unsigned int i;
>> +
>> +    if (!wq || nworker <= 0 || max_jobs <= 0)
>> +        return -EINVAL;
>> +
>> +    wq->head = wq->tail = NULL;
>> +    wq->nworker = nworker;
>> +    wq->max_jobs = max_jobs;
>> +    wq->job_count = 0;
>> +    wq->shutdown = false;
>> +    wq->priv_size = priv_size;
>> +    wq->priv_fini = priv_fini;
>> +    pthread_mutex_init(&wq->lock, NULL);
>> +    pthread_cond_init(&wq->cond_empty, NULL);
>> +    pthread_cond_init(&wq->cond_full, NULL);
>> +
>> +    wq->workers = malloc(nworker * sizeof(pthread_t));
>> +    if (!wq->workers)
>> +        return -ENOMEM;
>> +
>> +    for (i = 0; i < nworker; i++) {
>> +        if (pthread_create(&wq->workers[i], NULL, worker_thread, wq)) {
>> +            while (i--)
>> +                pthread_cancel(wq->workers[i]);
>
> How about
>             while (i)
>                 pthread_cancel(wq->workers[--i]);
>
> I preferred this since i won't be < 0.
>
> Thanks,
> Gao Xiang

^ permalink raw reply	[flat|nested] 10+ messages in thread

end of thread, other threads:[~2024-02-29 12:09 UTC | newest]

Thread overview: 10+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-02-28 16:16 [PATCH v4 0/5] erofs-utils: mkfs: introduce multi-threaded compression Yifan Zhao
2024-02-28 16:16 ` [PATCH v4 1/5] erofs-utils: introduce multi-threading framework Yifan Zhao
2024-02-29  9:43   ` Gao Xiang
2024-02-29  9:50     ` Gao Xiang
2024-02-29 12:09     ` Yifan Zhao
2024-02-28 16:16 ` [PATCH v4 2/5] erofs-utils: add a helper to get available processors Yifan Zhao
2024-02-28 16:16 ` [PATCH v4 3/5] erofs-utils: mkfs: add --worker=# parameter Yifan Zhao
2024-02-29  9:55   ` Gao Xiang
2024-02-28 16:16 ` [PATCH v4 4/5] erofs-utils: lib: introduce atomic operations Yifan Zhao
2024-02-28 16:16 ` [PATCH v4 5/5] erofs-utils: mkfs: introduce inner-file multi-threaded compression Yifan Zhao

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).