Просмотр исходного кода

vulkan: change graph_compute to be async and enable get_tensor_async (#17158)

* vulkan: change graph_compute to be async and enable get_tensor_async

This allows some additional CPU/GPU overlap for large pp workloads. Also seems
to help a bit for token gen, maybe getting rid of a small bubble between
graph_compute and get_tensor.

Async set and copy functions seem to be very rarely used, so I didn't enable
them because I didn't have a good way to test them.

The async commands need to be ordered against each other, so put them all on
the compute queue. The non-async commands still use the transfer queue.

The fence for graph_compute/get_tensor_async is submitted and waited on in
ggml_vk_synchronize.

* fix thread safety errors

* teardown context cleanly

* Handle async read to non-pinned dst
Jeff Bolz 2 месяцев назад
Родитель
Сommit
38eaf32af1
1 измененных файлов с 96 добавлено и 45 удалено
  1. 96 45
      ggml/src/ggml-vulkan/ggml-vulkan.cpp

+ 96 - 45
ggml/src/ggml-vulkan/ggml-vulkan.cpp

@@ -234,6 +234,7 @@ class vk_memory_logger;
 #endif
 class vk_perf_logger;
 static void ggml_vk_destroy_buffer(vk_buffer& buf);
+static void ggml_vk_synchronize(ggml_backend_vk_context * ctx);
 
 static constexpr uint32_t mul_mat_vec_max_cols = 8;
 static constexpr uint32_t p021_max_gqa_ratio = 8;
@@ -1581,8 +1582,9 @@ struct ggml_backend_vk_context {
     size_t semaphore_idx, event_idx;
     ggml_vk_garbage_collector gc;
     size_t prealloc_size_x, prealloc_size_y, prealloc_size_split_k, prealloc_size_add_rms_partials, prealloc_size_add_rms_partials_offset;
-    vk_buffer prealloc_x, prealloc_y, prealloc_split_k, prealloc_add_rms_partials;
+    vk_buffer prealloc_x, prealloc_y, prealloc_split_k, prealloc_add_rms_partials, sync_staging;
     vk::Fence fence, almost_ready_fence;
+    bool submit_pending {};
     bool almost_ready_fence_pending {};
     // Set before op_add and unset after op_rms_norm to indicate that the add should
     // write partial sums to accumulate the square of the vector components
@@ -5602,6 +5604,16 @@ static void ggml_vk_ensure_sync_staging_buffer(vk_device& device, size_t size) {
     }
 }
 
+static void ggml_vk_ensure_sync_staging_buffer(ggml_backend_vk_context * ctx, size_t size) {
+    if (ctx->sync_staging == nullptr || ctx->sync_staging->size < size) {
+        VK_LOG_MEMORY("ggml_vk_ensure_sync_staging_buffer(" << size << ")");
+        ggml_vk_destroy_buffer(ctx->sync_staging);
+        ctx->sync_staging = ggml_vk_create_buffer_check(ctx->device, size,
+            vk::MemoryPropertyFlagBits::eHostVisible | vk::MemoryPropertyFlagBits::eHostCoherent | vk::MemoryPropertyFlagBits::eHostCached,
+            vk::MemoryPropertyFlagBits::eHostVisible | vk::MemoryPropertyFlagBits::eHostCoherent);
+    }
+}
+
 static void ggml_vk_buffer_write_nc_async(ggml_backend_vk_context * ctx, vk_context& subctx, vk_buffer& dst, size_t offset, const ggml_tensor * tensor, bool sync_staging = false) {
     VK_LOG_DEBUG("ggml_vk_buffer_write_nc_async(" << tensor << ")");
     GGML_ASSERT(!ggml_is_contiguous(tensor));
@@ -5803,7 +5815,7 @@ static void ggml_vk_buffer_write(vk_buffer& dst, size_t offset, const void * src
     ggml_vk_buffer_write_2d(dst, offset, src, 0, size, 1);
 }
 
-static void ggml_vk_buffer_read_2d_async(vk_context subctx, vk_buffer& src, size_t offset, void * dst, size_t spitch, size_t dpitch, size_t width, size_t height, bool sync_staging = false) {
+static bool ggml_vk_buffer_read_2d_async(vk_context subctx, vk_buffer& src, size_t offset, void * dst, size_t spitch, size_t dpitch, size_t width, size_t height, bool sync_staging = false) {
     VK_LOG_DEBUG("ggml_vk_buffer_read_2d_async(offset=" << offset << ", width=" << width << ", height=" << height << ")");
     GGML_ASSERT(width > 0);
     GGML_ASSERT(height > 0);
@@ -5836,12 +5848,13 @@ static void ggml_vk_buffer_read_2d_async(vk_context subctx, vk_buffer& src, size
         ggml_vk_sync_buffers(nullptr, subctx);
         subctx->s->buffer.copyBuffer(src->buffer, buf->buffer, slices);
 
-        return;
+        return true;
     }
     VK_LOG_DEBUG("STAGING");
 
     if (!sync_staging) {
-        GGML_ABORT("Asynchronous read from non-pinned memory not supported");
+        // copy was not handled caller needs to fall back
+        return false;
     }
 
     // Fall back to staging buffer
@@ -5854,9 +5867,10 @@ static void ggml_vk_buffer_read_2d_async(vk_context subctx, vk_buffer& src, size
     subctx->s->buffer.copyBuffer(src->buffer, staging_buffer->buffer, slices);
 
     deferred_memcpy(dst, staging_buffer->ptr, copy_size, &subctx->out_memcpys);
+    return true;
 }
 
-static void ggml_vk_buffer_read_async(vk_context subctx, vk_buffer& src, size_t offset, void * dst, size_t size, bool sync_staging = false) {
+static bool ggml_vk_buffer_read_async(vk_context subctx, vk_buffer& src, size_t offset, void * dst, size_t size, bool sync_staging = false) {
     return ggml_vk_buffer_read_2d_async(subctx, src, offset, dst, size, size, size, 1, sync_staging);
 }
 
@@ -5875,7 +5889,8 @@ static void ggml_vk_buffer_read(vk_buffer& src, size_t offset, void * dst, size_
 
         vk_context subctx = ggml_vk_create_temporary_context(src->device->transfer_queue.cmd_pool);
         ggml_vk_ctx_begin(src->device, subctx);
-        ggml_vk_buffer_read_async(subctx, src, offset, dst, size, true);
+        bool ret = ggml_vk_buffer_read_async(subctx, src, offset, dst, size, true);
+        GGML_ASSERT(ret);
         ggml_vk_ctx_end(subctx);
 
         ggml_vk_submit(subctx, src->device->fence);
@@ -11204,8 +11219,9 @@ static void ggml_vk_preallocate_buffers(ggml_backend_vk_context * ctx, vk_contex
     if (subctx) {
         // Submit and wait for any pending work before reallocating the buffers
         ggml_vk_ctx_end(subctx);
-        ggml_vk_submit(subctx, ctx->fence);
-        ggml_vk_wait_for_fence(ctx);
+        ggml_vk_submit(subctx, {});
+        ctx->submit_pending = true;
+        ggml_vk_synchronize(ctx);
         ggml_vk_ctx_begin(ctx->device, subctx);
     }
 
@@ -11243,7 +11259,7 @@ static void ggml_vk_preallocate_buffers(ggml_backend_vk_context * ctx, vk_contex
     }
 }
 
-static bool ggml_vk_compute_forward(ggml_backend_vk_context* ctx, ggml_cgraph * cgraph, ggml_tensor* tensor, int tensor_idx, bool use_fence, bool almost_ready);
+static bool ggml_vk_compute_forward(ggml_backend_vk_context* ctx, ggml_cgraph * cgraph, ggml_tensor* tensor, int tensor_idx, bool almost_ready);
 
 // Returns true if node has enqueued work into the queue, false otherwise
 // If submit is true the current all operations queued so far are being submitted to Vulkan to overlap cmdlist creation and GPU execution.
@@ -11787,7 +11803,7 @@ static bool ggml_vk_build_graph(ggml_backend_vk_context * ctx, ggml_cgraph * cgr
 
         ctx->compute_ctx.reset();
 
-        bool ok = ggml_vk_compute_forward(ctx, cgraph, node_begin, node_idx_begin, false, almost_ready);
+        bool ok = ggml_vk_compute_forward(ctx, cgraph, node_begin, node_idx_begin, almost_ready);
         if (!ok) {
             if (node->op == GGML_OP_UNARY) {
                 std::cerr << __func__ << ": error: op not supported UNARY " << node->name << " (" << ggml_unary_op_name(static_cast<ggml_unary_op>(node->op_params[0])) << ")" << std::endl;
@@ -11802,7 +11818,7 @@ static bool ggml_vk_build_graph(ggml_backend_vk_context * ctx, ggml_cgraph * cgr
     return true;
 }
 
-static bool ggml_vk_compute_forward(ggml_backend_vk_context * ctx, ggml_cgraph * cgraph, ggml_tensor * tensor, int tensor_idx, bool use_fence = true, bool almost_ready = false) {
+static bool ggml_vk_compute_forward(ggml_backend_vk_context * ctx, ggml_cgraph * cgraph, ggml_tensor * tensor, int tensor_idx, bool almost_ready = false) {
     GGML_UNUSED(cgraph);
     ggml_backend_buffer * buf = nullptr;
 
@@ -11919,16 +11935,10 @@ static bool ggml_vk_compute_forward(ggml_backend_vk_context * ctx, ggml_cgraph *
 
     vk_context subctx = ctx->tensor_ctxs[tensor_idx].lock();
 
-    // always wait for the GPU work to be done for the last submit
-    if (tensor_idx == subctx->exit_tensor_idx) {
-        use_fence = true;
-    }
-
     // Only run if ctx hasn't been submitted yet
     if (!subctx->seqs.empty()) {
 #ifdef GGML_VULKAN_CHECK_RESULTS
         ggml_vk_check_results_0(ctx, cgraph, tensor_idx);
-        use_fence = true;
 #endif
 
         // Do staging buffer copies
@@ -11940,17 +11950,16 @@ static bool ggml_vk_compute_forward(ggml_backend_vk_context * ctx, ggml_cgraph *
             memset(mset.dst, mset.val, mset.n);
         }
 
-        if (almost_ready && !ctx->almost_ready_fence_pending && !use_fence) {
+        if (almost_ready && !ctx->almost_ready_fence_pending) {
             ggml_vk_submit(subctx, ctx->almost_ready_fence);
             ctx->almost_ready_fence_pending = true;
         } else {
-            ggml_vk_submit(subctx, use_fence ? ctx->fence : vk::Fence{});
+            ggml_vk_submit(subctx, {});
         }
+        ctx->submit_pending = true;
 
-        if (use_fence) {
-            ggml_vk_wait_for_fence(ctx);
-        }
 #ifdef GGML_VULKAN_CHECK_RESULTS
+        ggml_vk_synchronize(ctx);
         ggml_vk_check_results_1(ctx, cgraph, tensor_idx);
 #endif
     }
@@ -12006,11 +12015,19 @@ static void ggml_vk_graph_cleanup(ggml_backend_vk_context * ctx) {
 // Clean up on backend free
 static void ggml_vk_cleanup(ggml_backend_vk_context * ctx) {
     VK_LOG_DEBUG("ggml_vk_cleanup(" << ctx->name << ")");
+    // discard any unsubmitted command buffers
+    ctx->transfer_ctx.reset();
+    // wait for any pending command buffers to finish
+    ggml_vk_synchronize(ctx);
+
     ggml_vk_graph_cleanup(ctx);
 
     ggml_vk_destroy_buffer(ctx->prealloc_x);
     ggml_vk_destroy_buffer(ctx->prealloc_y);
     ggml_vk_destroy_buffer(ctx->prealloc_split_k);
+    ggml_vk_destroy_buffer(ctx->prealloc_add_rms_partials);
+    ggml_vk_destroy_buffer(ctx->sync_staging);
+
     ctx->prealloc_y_last_pipeline_used = nullptr;
 
     ctx->prealloc_size_x = 0;
@@ -12305,7 +12322,7 @@ static void ggml_backend_vk_set_tensor_async(ggml_backend_t backend, ggml_tensor
 
     if (ctx->transfer_ctx.expired()) {
         // Initialize new transfer context
-        transfer_ctx = ggml_vk_create_context(ctx, ctx->transfer_cmd_pool);
+        transfer_ctx = ggml_vk_create_context(ctx, ctx->compute_cmd_pool);
         ctx->transfer_ctx = transfer_ctx;
         ggml_vk_ctx_begin(ctx->device, transfer_ctx);
     } else {
@@ -12328,7 +12345,7 @@ static void ggml_backend_vk_get_tensor_async(ggml_backend_t backend, const ggml_
 
     if (ctx->transfer_ctx.expired()) {
         // Initialize new transfer context
-        transfer_ctx = ggml_vk_create_context(ctx, ctx->transfer_cmd_pool);
+        transfer_ctx = ggml_vk_create_context(ctx, ctx->compute_cmd_pool);
         ctx->transfer_ctx = transfer_ctx;
         ggml_vk_ctx_begin(ctx->device, transfer_ctx);
     } else {
@@ -12337,7 +12354,23 @@ static void ggml_backend_vk_get_tensor_async(ggml_backend_t backend, const ggml_
 
     vk_buffer buf = buf_ctx->dev_buffer;
 
-    ggml_vk_buffer_read_async(transfer_ctx, buf, vk_tensor_offset(tensor) + tensor->view_offs + offset, data, size);
+    auto src_offset = vk_tensor_offset(tensor) + tensor->view_offs + offset;
+    bool ret = ggml_vk_buffer_read_async(transfer_ctx, buf, src_offset, data, size);
+
+    // If that failed, copy synchronously through a staging buffer
+    if (!ret) {
+        ggml_vk_ensure_sync_staging_buffer(ctx, size);
+        ggml_vk_sync_buffers(nullptr, transfer_ctx);
+
+        vk::BufferCopy buffer_cpy;
+        buffer_cpy.srcOffset = src_offset;
+        buffer_cpy.dstOffset = 0;
+        buffer_cpy.size = size;
+
+        transfer_ctx->s->buffer.copyBuffer(buf->buffer, ctx->sync_staging->buffer, { buffer_cpy });
+        deferred_memcpy(data, ctx->sync_staging->ptr, size, &transfer_ctx->out_memcpys);
+        ggml_vk_synchronize(ctx);
+    }
 }
 
 static bool ggml_backend_vk_cpy_tensor_async(ggml_backend_t backend, const ggml_tensor * src, ggml_tensor * dst) {
@@ -12351,7 +12384,7 @@ static bool ggml_backend_vk_cpy_tensor_async(ggml_backend_t backend, const ggml_
 
         if (ctx->transfer_ctx.expired()) {
             // Initialize new transfer context
-            transfer_ctx = ggml_vk_create_context(ctx, ctx->transfer_cmd_pool);
+            transfer_ctx = ggml_vk_create_context(ctx, ctx->compute_cmd_pool);
             ctx->transfer_ctx = transfer_ctx;
             ggml_vk_ctx_begin(ctx->device, transfer_ctx);
         } else {
@@ -12368,29 +12401,49 @@ static bool ggml_backend_vk_cpy_tensor_async(ggml_backend_t backend, const ggml_
     return false;
 }
 
-static void ggml_backend_vk_synchronize(ggml_backend_t backend) {
-    VK_LOG_DEBUG("ggml_backend_vk_synchronize()");
-    ggml_backend_vk_context * ctx = (ggml_backend_vk_context *)backend->context;
-    if(ctx->transfer_ctx.expired()) {
-        return;
-    }
+static void ggml_vk_synchronize(ggml_backend_vk_context * ctx) {
+    VK_LOG_DEBUG("ggml_vk_synchronize()");
 
-    vk_context transfer_ctx = ctx->transfer_ctx.lock();
+    bool do_transfer = !ctx->transfer_ctx.expired();
 
-    ggml_vk_ctx_end(transfer_ctx);
+    vk_context transfer_ctx;
+    if (do_transfer) {
+        transfer_ctx = ctx->transfer_ctx.lock();
 
-    for (auto& cpy : transfer_ctx->in_memcpys) {
-        memcpy(cpy.dst, cpy.src, cpy.n);
+        ggml_vk_ctx_end(transfer_ctx);
+
+        for (auto& cpy : transfer_ctx->in_memcpys) {
+            memcpy(cpy.dst, cpy.src, cpy.n);
+        }
+
+        ggml_vk_submit(transfer_ctx, {});
+        ctx->submit_pending = true;
     }
 
-    ggml_vk_submit(transfer_ctx, ctx->fence);
-    ggml_vk_wait_for_fence(ctx);
+    if (ctx->submit_pending) {
+        {
+            std::lock_guard<std::mutex> guard(queue_mutex);
+            ctx->device->compute_queue.queue.submit({}, ctx->fence);
+        }
+        ggml_vk_wait_for_fence(ctx);
+        ctx->submit_pending = false;
+    }
 
-    for (auto& cpy : transfer_ctx->out_memcpys) {
-        memcpy(cpy.dst, cpy.src, cpy.n);
+    if (do_transfer) {
+        for (auto& cpy : transfer_ctx->out_memcpys) {
+            memcpy(cpy.dst, cpy.src, cpy.n);
+        }
+        ctx->transfer_ctx.reset();
     }
+}
 
-    ctx->transfer_ctx.reset();
+static void ggml_backend_vk_synchronize(ggml_backend_t backend) {
+    VK_LOG_DEBUG("ggml_backend_vk_synchronize()");
+    ggml_backend_vk_context * ctx = (ggml_backend_vk_context *)backend->context;
+
+    ggml_vk_synchronize(ctx);
+
+    ggml_vk_graph_cleanup(ctx);
 }
 
 static bool ggml_vk_is_empty(ggml_tensor * node) {
@@ -12938,8 +12991,6 @@ static ggml_status ggml_backend_vk_graph_compute(ggml_backend_t backend, ggml_cg
         ctx->device->perf_logger->print_timings();
     }
 
-    ggml_vk_graph_cleanup(ctx);
-
     return GGML_STATUS_SUCCESS;
 
     UNUSED(backend);
@@ -13168,9 +13219,9 @@ static ggml_backend_i ggml_backend_vk_interface = {
     /* .get_name                = */ ggml_backend_vk_name,
     /* .free                    = */ ggml_backend_vk_free,
     /* .set_tensor_async        = */ NULL,  // ggml_backend_vk_set_tensor_async,
-    /* .get_tensor_async        = */ NULL,  // ggml_backend_vk_get_tensor_async,
+    /* .get_tensor_async        = */ ggml_backend_vk_get_tensor_async,
     /* .cpy_tensor_async        = */ NULL,  // ggml_backend_vk_cpy_tensor_async,
-    /* .synchronize             = */ NULL,  // ggml_backend_vk_synchronize,
+    /* .synchronize             = */ ggml_backend_vk_synchronize,
     /* .graph_plan_create       = */ NULL,
     /* .graph_plan_free         = */ NULL,
     /* .graph_plan_update       = */ NULL,