Ver código fonte

llama : Async DirectIO model loading on Linux (#18012)

* Uncached model read

* Removing additional --mmap arg

* Removing trailing whitespaces

* Adding fallback when O_DIRECT is not supported

* Remove branching in llama-model-loader.cpp and reduce code duplications in llama-mmap.cpp

* Adding maybe unused keyword for Mac and Windows.

* File seek aligned

* Removing all branches for direct_io in llama-model-loader.cpp

* Always use alignment from llama_file

* use_mmap=true
Julius Tischbein 1 mês atrás
pai
commit
4d4f4cacd1
3 arquivos alterados com 184 adições e 42 exclusões
  1. 123 28
      src/llama-mmap.cpp
  2. 5 1
      src/llama-mmap.h
  3. 56 13
      src/llama-model-loader.cpp

+ 123 - 28
src/llama-mmap.cpp

@@ -13,9 +13,10 @@
 #ifdef __has_include
     #if __has_include(<unistd.h>)
         #include <unistd.h>
+        #include <fcntl.h>
+        #include <sys/stat.h>
         #if defined(_POSIX_MAPPED_FILES)
             #include <sys/mman.h>
-            #include <fcntl.h>
         #endif
         #if defined(_POSIX_MEMLOCK_RANGE)
             #include <sys/resource.h>
@@ -74,7 +75,7 @@ struct llama_file::impl {
         return ret;
     }
 
-    impl(const char * fname, const char * mode) {
+    impl(const char * fname, const char * mode, [[maybe_unused]] const bool use_direct_io = false) {
         fp = ggml_fopen(fname, mode);
         if (fp == NULL) {
             throw std::runtime_error(format("failed to open %s: %s", fname, strerror(errno)));
@@ -153,13 +154,40 @@ struct llama_file::impl {
         write_raw(&val, sizeof(val));
     }
 
+    void read_aligned_chunk(size_t offset, void * dest, size_t size) const {
+        throw std::runtime_error("DirectIO is not implemented on Windows.");
+    }
+
     ~impl() {
         if (fp) {
             std::fclose(fp);
         }
     }
 #else
-    impl(const char * fname, const char * mode) {
+    impl(const char * fname, const char * mode, [[maybe_unused]] const bool use_direct_io = false) {
+#ifdef __linux__
+        // Try unbuffered I/O for read only
+        if (use_direct_io && std::strcmp(mode, "rb") == 0) {
+            fd = open(fname, O_RDONLY | O_DIRECT);
+
+            if (fd != -1) {
+                struct stat file_stats{};
+                fstat(fd, &file_stats);
+
+                size = file_stats.st_size;
+                alignment = file_stats.st_blksize;
+
+                off_t ret = lseek(fd, 0, SEEK_SET);
+                if (ret == -1) {
+                    throw std::runtime_error(format("seek error: %s", strerror(errno)));
+                }
+                return;
+            }
+
+            LLAMA_LOG_WARN("Failed to open model %s with error: %s. Falling back to buffered I/O",
+                fname, strerror(errno));
+        }
+#endif
         fp = ggml_fopen(fname, mode);
         if (fp == NULL) {
             throw std::runtime_error(format("failed to open %s: %s", fname, strerror(errno)));
@@ -170,27 +198,30 @@ struct llama_file::impl {
     }
 
     size_t tell() const {
-// TODO: this ifdef is never true?
-#ifdef _WIN32
-        __int64 ret = _ftelli64(fp);
-#else
-        long ret = std::ftell(fp);
-#endif
-        if (ret == -1) {
-            throw std::runtime_error(format("ftell error: %s", strerror(errno)));
+        if (fd == -1) {
+            long ret = std::ftell(fp);
+            if (ret == -1) {
+                throw std::runtime_error(format("ftell error: %s", strerror(errno)));
+            }
+
+            return (size_t) ret;
         }
 
-        return (size_t) ret;
+        off_t pos = lseek(fd, 0, SEEK_CUR);
+        if (pos == -1) {
+            throw std::runtime_error(format("lseek error: %s", strerror(errno)));
+        }
+        return (size_t) pos;
     }
 
     void seek(size_t offset, int whence) const {
-// TODO: this ifdef is never true?
-#ifdef _WIN32
-        int ret = _fseeki64(fp, (__int64) offset, whence);
-#else
-        int ret = std::fseek(fp, (long) offset, whence);
-#endif
-        if (ret != 0) {
+        off_t ret = 0;
+        if (fd == -1) {
+            ret = std::fseek(fp, (long) offset, whence);
+        } else {
+            ret = lseek(fd, offset, whence);
+        }
+        if (ret == -1) {
             throw std::runtime_error(format("seek error: %s", strerror(errno)));
         }
     }
@@ -200,13 +231,55 @@ struct llama_file::impl {
             return;
         }
         errno = 0;
-        std::size_t ret = std::fread(ptr, len, 1, fp);
-        if (ferror(fp)) {
-            throw std::runtime_error(format("read error: %s", strerror(errno)));
+        if (fd == -1) {
+            std::size_t ret = std::fread(ptr, len, 1, fp);
+            if (ferror(fp)) {
+                throw std::runtime_error(format("read error: %s", strerror(errno)));
+            }
+            if (ret != 1) {
+                throw std::runtime_error("unexpectedly reached end of file");
+            }
+        } else {
+            bool successful = false;
+            while (!successful) {
+                off_t ret = read(fd, ptr, len);
+
+                if (ret == -1) {
+                    if (errno == EINTR) {
+                        continue;  // Interrupted by signal, retry
+                    }
+                    throw std::runtime_error(format("read error: %s", strerror(errno)));
+                }
+                if (ret == 0) {
+                    throw std::runtime_error("unexpectedly reached end of file");
+                }
+
+                successful = true;
+            }
         }
-        if (ret != 1) {
-            throw std::runtime_error("unexpectedly reached end of file");
+    }
+
+    void read_aligned_chunk(size_t offset, void * dest, size_t size) const {
+        off_t aligned_offset = offset & ~(alignment - 1);
+        off_t offset_from_alignment = offset - aligned_offset;
+        size_t bytes_to_read = (offset_from_alignment + size + alignment - 1) & ~(alignment - 1);
+
+        void * raw_buffer = nullptr;
+        int ret = posix_memalign(&raw_buffer, alignment, bytes_to_read);
+        if (ret != 0) {
+            throw std::runtime_error(format("posix_memalign failed with error %d", ret));
         }
+
+        struct aligned_buffer_deleter {
+            void operator()(void * p) const { free(p); }
+        };
+        std::unique_ptr<void, aligned_buffer_deleter> buffer(raw_buffer);
+
+        seek(aligned_offset, SEEK_SET);
+        read_raw(buffer.get(), bytes_to_read);
+
+        uintptr_t actual_data = reinterpret_cast<uintptr_t>(buffer.get()) + offset_from_alignment;
+        memcpy(dest, reinterpret_cast<void *>(actual_data), size);
     }
 
     uint32_t read_u32() const {
@@ -231,22 +304,43 @@ struct llama_file::impl {
     }
 
     ~impl() {
-        if (fp) {
+        if (fd != -1) {
+            close(fd);
+        } else {
             std::fclose(fp);
         }
     }
+    int fd = -1;
 #endif
 
-    FILE * fp;
-    size_t size;
+    void read_raw_at(void * ptr, size_t len, size_t offset) const {
+        if (alignment != 1) {
+            read_aligned_chunk(offset, ptr, len);
+        } else {
+            seek(offset, SEEK_SET);
+            read_raw(ptr, len);
+        }
+    }
+
+    size_t read_alignment() const {
+        return alignment;
+    }
+
+    size_t alignment = 1;
+
+    FILE * fp{};
+    size_t size{};
 };
 
-llama_file::llama_file(const char * fname, const char * mode) : pimpl(std::make_unique<impl>(fname, mode)) {}
+llama_file::llama_file(const char * fname, const char * mode, const bool use_direct_io) :
+    pimpl(std::make_unique<impl>(fname, mode, use_direct_io)) {}
 llama_file::~llama_file() = default;
 
 size_t llama_file::tell() const { return pimpl->tell(); }
 size_t llama_file::size() const { return pimpl->size; }
 
+size_t llama_file::read_alignment() const { return pimpl->read_alignment(); }
+
 int llama_file::file_id() const {
 #ifdef _WIN32
     return _fileno(pimpl->fp);
@@ -261,6 +355,7 @@ int llama_file::file_id() const {
 
 void llama_file::seek(size_t offset, int whence) const { pimpl->seek(offset, whence); }
 void llama_file::read_raw(void * ptr, size_t len) const { pimpl->read_raw(ptr, len); }
+void llama_file::read_raw_at(void * ptr, size_t len, size_t offset) const { pimpl->read_raw_at(ptr, len, offset); }
 
 uint32_t llama_file::read_u32() const { return pimpl->read_u32(); }
 

+ 5 - 1
src/llama-mmap.h

@@ -3,6 +3,7 @@
 #include <cstdint>
 #include <memory>
 #include <vector>
+#include <cstdio>
 
 struct llama_file;
 struct llama_mmap;
@@ -13,7 +14,7 @@ using llama_mmaps  = std::vector<std::unique_ptr<llama_mmap>>;
 using llama_mlocks = std::vector<std::unique_ptr<llama_mlock>>;
 
 struct llama_file {
-    llama_file(const char * fname, const char * mode);
+    llama_file(const char * fname, const char * mode, bool use_direct_io = false);
     ~llama_file();
 
     size_t tell() const;
@@ -24,11 +25,14 @@ struct llama_file {
     void seek(size_t offset, int whence) const;
 
     void read_raw(void * ptr, size_t len) const;
+    void read_raw_at(void * ptr, size_t len, size_t offset) const;
+    void read_aligned_chunk(size_t offset, void * dest, size_t size) const;
     uint32_t read_u32() const;
 
     void write_raw(const void * ptr, size_t len) const;
     void write_u32(uint32_t val) const;
 
+    size_t read_alignment() const;
 private:
     struct impl;
     std::unique_ptr<impl> pimpl;

+ 56 - 13
src/llama-model-loader.cpp

@@ -504,7 +504,7 @@ llama_model_loader::llama_model_loader(
     get_key(llm_kv(LLM_KV_GENERAL_ARCHITECTURE), arch_name, false);
     llm_kv = LLM_KV(llm_arch_from_string(arch_name));
 
-    files.emplace_back(new llama_file(fname.c_str(), "rb"));
+    files.emplace_back(new llama_file(fname.c_str(), "rb", !use_mmap));
     contexts.emplace_back(ctx);
 
     // Save tensors data offset of the main file.
@@ -572,7 +572,7 @@ llama_model_loader::llama_model_loader(
                 }
             }
 
-            files.emplace_back(new llama_file(fname_split, "rb"));
+            files.emplace_back(new llama_file(fname_split, "rb", !use_mmap));
             contexts.emplace_back(ctx);
 
             // Save tensors data offset info of the shard.
@@ -935,7 +935,15 @@ bool llama_model_loader::load_all_data(
     // 4 staging buffers for async uploads, each sized 1MB seems to be a good default for single NVMe drives.
     // NVMe raid configurations might require more / larger buffers.
     constexpr size_t n_buffers = 4;
-    constexpr size_t buffer_size = 1 * 1024 * 1024; // 1MB
+
+    size_t alignment = 1;
+    for (const auto & file : files) {
+        alignment = std::max(file->read_alignment(), alignment);
+    }
+
+    // Buffer size: balance between memory usage and I/O efficiency
+    // 64MB works well for NVMe drives
+    const size_t buffer_size = alignment != 1 ? 64 * 1024 * 1024 + 2 * alignment : 1 * 1024 * 1024;
 
     std::vector<ggml_backend_buffer_t> host_buffers;
     std::vector<ggml_backend_event_t> events;
@@ -985,6 +993,7 @@ bool llama_model_loader::load_all_data(
         // If the backend is supported, create pinned memory buffers and events for synchronisation.
         for (size_t idx = 0; idx < n_buffers; ++idx) {
             auto * buf = ggml_backend_buft_alloc_buffer(host_buft, buffer_size);
+
             if (!buf) {
                 LLAMA_LOG_DEBUG("%s: failed to allocate host buffer for async uploads for device %s\n", func,
                     ggml_backend_dev_name(dev));
@@ -1066,9 +1075,9 @@ bool llama_model_loader::load_all_data(
             }
         } else {
             const auto & file = files.at(weight->idx);
+
             if (ggml_backend_buffer_is_host(cur->buffer)) {
-                file->seek(weight->offs, SEEK_SET);
-                file->read_raw(cur->data, n_size);
+                file->read_raw_at(cur->data, n_size, weight->offs);
                 if (check_tensors) {
                     validation_result.emplace_back(std::async(std::launch::async, [cur, n_size] {
                         return std::make_pair(cur, ggml_validate_row_data(cur->type, cur->data, n_size));
@@ -1077,26 +1086,60 @@ bool llama_model_loader::load_all_data(
             } else {
                 // If upload_backend is valid load the tensor in chunks to pinned memory and upload the buffers asynchronously to the GPU.
                 if (upload_backend) {
-                    file->seek(weight->offs, SEEK_SET);
+                    auto offset = (off_t) weight->offs;
+                    alignment = file->read_alignment();
+                    off_t aligned_offset = offset & ~(alignment - 1);
+                    off_t offset_from_alignment = offset - aligned_offset;
+                    file->seek(aligned_offset, SEEK_SET);
+
+                    // Calculate aligned read boundaries
+                    size_t read_start = aligned_offset;
+                    size_t read_end = (offset + n_size + alignment - 1) & ~(alignment - 1);
 
                     size_t bytes_read = 0;
+                    size_t data_read = 0;  // Actual tensor data copied (excluding padding)
+
+                    while (bytes_read < read_end - read_start) {
+                        size_t read_size = std::min<size_t>(buffer_size, read_end - read_start - bytes_read);
 
-                    while (bytes_read < n_size) {
-                        size_t read_iteration = std::min<size_t>(buffer_size, n_size - bytes_read);
+                        // Align the destination pointer within the pinned buffer
+                        uintptr_t ptr_dest_aligned = (reinterpret_cast<uintptr_t>(host_ptrs[buffer_idx]) + alignment - 1) & ~(alignment - 1);
 
+                        // Wait for previous upload to complete before reusing buffer
                         ggml_backend_event_synchronize(events[buffer_idx]);
-                        file->read_raw(host_ptrs[buffer_idx], read_iteration);
-                        ggml_backend_tensor_set_async(upload_backend, cur, host_ptrs[buffer_idx], bytes_read, read_iteration);
+
+                        // Read aligned chunk from file
+                        file->read_raw(reinterpret_cast<void *>(ptr_dest_aligned), read_size);
+
+                        // Calculate actual data portion (excluding alignment padding)
+                        uintptr_t ptr_data = ptr_dest_aligned;
+                        size_t data_to_copy = read_size;
+
+                        // Skip alignment padding at start of first chunk
+                        if (bytes_read == 0) {
+                            ptr_data += offset_from_alignment;
+                            data_to_copy -= offset_from_alignment;
+                        }
+
+                        // Trim alignment padding at end of last chunk
+                        if (aligned_offset + bytes_read + read_size > offset + n_size) {
+                            data_to_copy -= (read_end - (offset + n_size));
+                        }
+
+                        // Async upload actual data to GPU
+                        ggml_backend_tensor_set_async(upload_backend, cur,
+                                                      reinterpret_cast<void *>(ptr_data), data_read, data_to_copy);
                         ggml_backend_event_record(events[buffer_idx], upload_backend);
 
-                        bytes_read += read_iteration;
+                        data_read += data_to_copy;
+                        bytes_read += read_size;
+
                         ++buffer_idx;
                         buffer_idx %= n_buffers;
                     }
                 } else {
                     read_buf.resize(n_size);
-                    file->seek(weight->offs, SEEK_SET);
-                    file->read_raw(read_buf.data(), n_size);
+                    file->read_raw_at(read_buf.data(), n_size, weight->offs);
                     ggml_backend_tensor_set(cur, read_buf.data(), 0, n_size);
                     if (check_tensors && !ggml_validate_row_data(cur->type, read_buf.data(), n_size)) {
                         throw std::runtime_error(format("tensor '%s' has invalid data", ggml_get_name(cur)));