Sfoglia il codice sorgente

Fix race conditions in threadpool when dealing with dynamic/frequent n_threads changes (#17748)

* tests: update barrier test to check for race condition in active threads

* cpu: combine n_graph and n_threads into a single atomic update

* tests: add multi-graph test for test_barrier
Max Krasnyansky 1 mese fa
parent
commit
e1f4921980
2 ha cambiato i file con 190 aggiunte e 53 eliminazioni
  1. 34 39
      ggml/src/ggml-cpu/ggml-cpu.c
  2. 156 14
      tests/test-barrier.cpp

+ 34 - 39
ggml/src/ggml-cpu/ggml-cpu.c

@@ -187,6 +187,9 @@ typedef void * thread_ret_t;
 
 typedef pthread_t ggml_thread_t;
 
+#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
+#define GGML_THREADPOOL_N_THREADS_BITS (16)
+
 #if defined(__APPLE__)
 #include <unistd.h>
 #include <mach/mach.h>
@@ -449,7 +452,7 @@ struct ggml_threadpool {
     struct ggml_cplan  * cplan;
 
     // synchronization primitives
-    atomic_int n_graph;       // incremented when there is work to be done (i.e each graph)
+    atomic_int n_graph;       // updated when there is work to be done (i.e each graph) holds graph and active thread counts.
     atomic_int GGML_CACHE_ALIGN n_barrier;
     atomic_int GGML_CACHE_ALIGN n_barrier_passed;
     atomic_int GGML_CACHE_ALIGN current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
@@ -457,12 +460,10 @@ struct ggml_threadpool {
     // these are atomic as an annotation for thread-sanitizer
     atomic_bool stop;         // Used for stopping the threadpool altogether
     atomic_bool pause;        // Used for pausing the threadpool or individual threads
-    atomic_int abort;         // Used for aborting processing of a graph
+    atomic_int  abort;        // Used for aborting processing of a graph
 
     struct ggml_compute_state * workers;   // per thread state
-    int          n_threads_max; // number of threads in the pool
-    atomic_int   n_threads_cur; // number of threads used in the current graph
-
+    int          n_threads;   // Number of threads in the pool
     int32_t      prio;        // Scheduling priority
     uint32_t     poll;        // Polling level (0 - no polling)
 
@@ -539,7 +540,7 @@ struct ggml_state {
 static struct ggml_state g_state = {0};
 
 void ggml_barrier(struct ggml_threadpool * tp) {
-    int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
+    int n_threads = atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK;
     if (n_threads == 1) {
         return;
     }
@@ -556,7 +557,7 @@ void ggml_barrier(struct ggml_threadpool * tp) {
         // last thread
         atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
 
-        // exit barrier (fill seq-cst fence)
+        // exit barrier (full seq-cst fence)
         atomic_fetch_add_explicit(&tp->n_barrier_passed, 1, memory_order_seq_cst);
         return;
     }
@@ -2628,7 +2629,7 @@ static void ggml_thread_cpumask_next(const bool * global_mask, bool * local_mask
 void ggml_threadpool_free(struct ggml_threadpool* threadpool) {
     if (!threadpool) return;
 
-    const int n_threads = threadpool->n_threads_max;
+    const int n_threads = threadpool->n_threads;
 
 #ifndef GGML_USE_OPENMP
     struct ggml_compute_state* workers = threadpool->workers;
@@ -2704,7 +2705,7 @@ struct ggml_cplan ggml_graph_plan(
         //GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads);
     }
     if (n_threads <= 0) {
-        n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
+        n_threads = threadpool ? threadpool->n_threads : GGML_DEFAULT_N_THREADS;
     }
 
 #if defined(__EMSCRIPTEN__) && !defined(__EMSCRIPTEN_PTHREADS__)
@@ -2912,12 +2913,14 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
 
     struct ggml_compute_params params = {
         /*.ith       =*/ state->ith,
-        /*.nth       =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
+        /*.nth       =*/ atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK,
         /*.wsize     =*/ cplan->work_size,
         /*.wdata     =*/ cplan->work_data,
         /*.threadpool=*/ tp,
     };
 
+    GGML_PRINT_DEBUG("thread #%d compute-start cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
+
     for (int node_n = 0; node_n < cgraph->n_nodes && atomic_load_explicit(&tp->abort, memory_order_relaxed) != node_n; node_n++) {
         struct ggml_tensor * node = cgraph->nodes[node_n];
 
@@ -2939,6 +2942,8 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
         }
     }
 
+    GGML_PRINT_DEBUG("thread #%d compute-done cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
+
     ggml_barrier(state->threadpool);
 
     return 0;
@@ -2946,27 +2951,23 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
 
 #ifndef GGML_USE_OPENMP
 
-// check if thread is active
-static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
-    struct ggml_threadpool * threadpool = state->threadpool;
-    int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
-    return (state->ith < n_threads);
-}
-
 // check if thread is ready to proceed (exit from polling or sleeping)
+// returns true if loops should exit, sets state->pending to indicate new work
 static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
     if (state->pending || threadpool->stop || threadpool->pause) { return true; }
 
     // check for new graph/work
-    int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
-    if (new_graph != state->last_graph) {
-        state->pending    = ggml_graph_compute_thread_active(state);
-        state->last_graph = new_graph;
+    int n_graph   = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
+    int n_threads = n_graph & GGML_THREADPOOL_N_THREADS_MASK;
+    if (n_graph != state->last_graph) {
+        state->pending    = (state->ith < n_threads);
+        state->last_graph = n_graph;
+        return true;
     }
 
-    return state->pending;
+    return false;
 }
 
 // sync thread state after polling
@@ -2983,11 +2984,6 @@ static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * st
 static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
-    // Skip polling for unused threads
-    if (!ggml_graph_compute_thread_active(state)) {
-        return state->pending;
-    }
-
     // This seems to make 0 ... 100 a decent range for polling level across modern processors.
     // Perhaps, we can adjust it dynamically based on load and things.
     const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
@@ -3049,7 +3045,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
         ggml_graph_compute_check_for_work(state);
         if (state->pending) {
             state->pending = false;
-
             ggml_graph_compute_thread(state);
         }
     }
@@ -3064,14 +3059,15 @@ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int
 
     ggml_mutex_lock(&threadpool->mutex);
 
-    GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
+    // Update the number of active threads and the graph count
+    int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed) >> GGML_THREADPOOL_N_THREADS_BITS;
+    n_graph = ((n_graph + 1) << GGML_THREADPOOL_N_THREADS_BITS) | (n_threads & GGML_THREADPOOL_N_THREADS_MASK);
 
-    // Update the number of active threads
-    atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+    GGML_PRINT_DEBUG("compute-kickoff: n_threads %d n_graph %d\n", n_threads, n_graph);
 
     // Indicate the graph is ready to be processed
     // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
-    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
+    atomic_store_explicit(&threadpool->n_graph, n_graph, memory_order_seq_cst);
 
     if (threadpool->pause) {
        // Update main thread prio and affinity to match the threadpool settings
@@ -3109,8 +3105,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
         threadpool->pause            = tpp->paused;
         threadpool->abort            = -1;
         threadpool->workers          = NULL;
-        threadpool->n_threads_max    = tpp->n_threads;
-        threadpool->n_threads_cur    = tpp->n_threads;
+        threadpool->n_threads        = tpp->n_threads;
         threadpool->poll             = tpp->poll;
         threadpool->prio             = tpp->prio;
         threadpool->ec               = GGML_STATUS_SUCCESS;
@@ -3205,7 +3200,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
             {
                 // update the number of threads from the actual number of threads that we got from OpenMP
                 n_threads = omp_get_num_threads();
-                atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+                atomic_store_explicit(&threadpool->n_graph, n_threads, memory_order_relaxed);
             }
 
             // Apply thread CPU mask and priority
@@ -3218,13 +3213,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
             ggml_graph_compute_thread(&threadpool->workers[ith]);
         }
     } else {
-        atomic_store_explicit(&threadpool->n_threads_cur, 1, memory_order_relaxed);
+        atomic_store_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
         ggml_graph_compute_thread(&threadpool->workers[0]);
     }
 #else
-    if (n_threads > threadpool->n_threads_max) {
-        GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
-        n_threads = threadpool->n_threads_max;
+    if (n_threads > threadpool->n_threads) {
+        GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads);
+        n_threads = threadpool->n_threads;
     }
 
     // Kick all threads to start the new graph

+ 156 - 14
tests/test-barrier.cpp

@@ -11,19 +11,7 @@
 
 #define MAX_NARGS 2
 
-int main(int argc, char *argv[]) {
-
-    int n_threads = std::max(1, std::min(4, (int) std::thread::hardware_concurrency()));
-    int n_rounds  = 100;
-
-    if (argc > 1) {
-        n_threads = std::atoi(argv[1]);
-    }
-
-    if (argc > 2) {
-        n_rounds  = std::atoi(argv[2]);
-    }
-
+static void test_barrier(int n_threads, int n_rounds) {
     struct ggml_init_params params = {
         /* .mem_size   = */ 1024*1024*1024,
         /* .mem_buffer = */ NULL,
@@ -56,7 +44,7 @@ int main(int argc, char *argv[]) {
         exit(1);
     }
 
-    // Create compute plan
+    // The test runs with constant number of threads
     struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, threadpool);
 
     std::vector<uint8_t> work_data(cplan.work_size);
@@ -89,6 +77,160 @@ int main(int argc, char *argv[]) {
 
     ggml_threadpool_free(threadpool);
     ggml_free(ctx);
+}
+
+static void test_active(int n_threads, int n_rounds) {
+    struct ggml_init_params params = {
+        /* .mem_size   = */ 1024*1024*1024,
+        /* .mem_buffer = */ NULL,
+        /* .no_alloc   = */ false,
+    };
+
+    struct ggml_context * ctx = ggml_init(params);
+
+    // Create graph
+    struct ggml_cgraph * gf = ggml_new_graph(ctx);
+
+    // Small graph with, parallel ops with barriers
+    struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32,  64);
+    for (int i = 0; i < 2; i++) {
+        struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
+        out = ggml_mul_mat(ctx, a, out);
+
+        struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
+        out = ggml_mul_mat(ctx, d, out);
+    }
+
+    ggml_build_forward_expand(gf, out);
+    int n_nodes = ggml_graph_n_nodes(gf);
+
+    // Create threadpool
+    struct ggml_threadpool_params tpp  = ggml_threadpool_params_default(n_threads);
+    struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
+    if (!threadpool) {
+        fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
+        exit(1);
+    }
+
+    std::cerr << "graph-compute with"
+              << "\n n_threads: " << n_threads
+              << "\n   n_nodes: " << n_nodes
+              << "\n  n_rounds: " << n_rounds
+              << "\n";
+    // ggml_graph_print(gf);
+
+    // In this test we keep changing the number of threads every 4th iteration
+    // to test for race conditions in that path
+
+    for (int i=0; i < n_rounds; i++) {
+        struct ggml_cplan cplan = ggml_graph_plan(gf, (i % 4) == 0 ? 1 : n_threads, threadpool);
+
+        std::vector<uint8_t> work_data(cplan.work_size);
+        cplan.work_data = work_data.data();
+
+        ggml_graph_compute(gf, &cplan);
+    }
+
+    ggml_threadpool_free(threadpool);
+    ggml_free(ctx);
+}
+
+static void test_multi_graph(int n_threads, int n_rounds) {
+    struct ggml_init_params params = {
+        /* .mem_size   = */ 1024*1024*1024,
+        /* .mem_buffer = */ NULL,
+        /* .no_alloc   = */ false,
+    };
+
+    struct ggml_context * ctx = ggml_init(params);
+
+    // Create graphs
+    struct ggml_cgraph * gf0 = ggml_new_graph(ctx);
+    {
+        // Small graph with parallel ops with barriers
+        struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32,  64);
+        for (int i = 0; i < 2; i++) {
+            struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
+            out = ggml_mul_mat(ctx, a, out);
+
+            struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
+            out = ggml_mul_mat(ctx, d, out);
+        }
+
+        ggml_build_forward_expand(gf0, out);
+    }
+
+    struct ggml_cgraph * gf1 = ggml_new_graph(ctx);
+    {
+        // Small graph with parallel ops with barriers
+        // Use larger tensors to make sure work_data size is larger than gf0
+        struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32,  256);
+        for (int i = 0; i < 4; i++) {
+            struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 256, 128);
+            out = ggml_mul_mat(ctx, a, out);
+
+            struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 256);
+            out = ggml_mul_mat(ctx, d, out);
+        }
+
+        ggml_build_forward_expand(gf1, out);
+    }
+
+
+    // Create threadpool
+    struct ggml_threadpool_params tpp  = ggml_threadpool_params_default(n_threads);
+    struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
+    if (!threadpool) {
+        fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
+        exit(1);
+    }
+
+    std::cerr << "graph-compute with"
+              << "\n gf0 n_nodes: " << ggml_graph_n_nodes(gf0)
+              << "\n gf1 n_nodes: " << ggml_graph_n_nodes(gf1)
+              << "\n   n_threads: " << n_threads
+              << "\n    n_rounds: " << n_rounds
+              << "\n";
+
+    // In this test we keep changing the number of threads every 4th iteration
+    // and we compute two graphs back to back to test graph frequent graph switching
+
+    for (int i=0; i < n_rounds; i++) {
+        struct ggml_cplan cplan0 = ggml_graph_plan(gf0, (i % 4) == 0 ? 1 : n_threads, threadpool);
+        std::vector<uint8_t> work_data0(cplan0.work_size);
+        cplan0.work_data = work_data0.data();
+
+        struct ggml_cplan cplan1 = ggml_graph_plan(gf1, (i % 4) == 0 ? 1 : n_threads, threadpool);
+        std::vector<uint8_t> work_data1(cplan1.work_size);
+        cplan1.work_data = work_data1.data();
+
+        ggml_graph_compute(gf0, &cplan0);
+        ggml_graph_compute(gf1, &cplan1);
+    }
+
+    ggml_threadpool_free(threadpool);
+    ggml_free(ctx);
+}
+
+
+int main(int argc, char *argv[]) {
+
+    int n_threads = std::max(1, std::min(4, (int) std::thread::hardware_concurrency()));
+    int n_rounds  = 100;
+
+    if (argc > 1) {
+        n_threads = std::atoi(argv[1]);
+    }
+
+    if (argc > 2) {
+        n_rounds  = std::atoi(argv[2]);
+    }
+
+    test_barrier(n_threads, n_rounds);
+
+    test_active(n_threads,  n_rounds * 100);
+
+    test_multi_graph(n_threads,  n_rounds * 10);
 
     return 0;
 }