Explorar o código

server: (router) allow child process to report status via stdout (#18110)

* server: (router) allow child process to report status via stdout

* apply suggestions
Xuan-Son Nguyen hai 1 mes
pai
achega
bde461de8c
Modificáronse 3 ficheiros con 21 adicións e 44 borrados
  1. 19 40
      tools/server/server-models.cpp
  2. 1 2
      tools/server/server-models.h
  3. 1 2
      tools/server/server.cpp

+ 19 - 40
tools/server/server-models.cpp

@@ -17,6 +17,7 @@
 #include <chrono>
 #include <queue>
 #include <filesystem>
+#include <cstring>
 
 #ifdef _WIN32
 #include <winsock2.h>
@@ -33,7 +34,8 @@
 #include <limits.h>
 #endif
 
-#define CMD_EXIT "exit"
+#define CMD_ROUTER_TO_CHILD_EXIT  "cmd_router_to_child:exit"
+#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"
 
 // address for child process, this is needed because router may run on 0.0.0.0
 // ref: https://github.com/ggml-org/llama.cpp/issues/17862
@@ -534,6 +536,8 @@ void server_models::load(const std::string & name) {
         std::vector<char *> argv = to_char_ptr_array(child_args);
         std::vector<char *> envp = to_char_ptr_array(child_env);
 
+        // TODO @ngxson : maybe separate stdout and stderr in the future
+        //                so that we can use stdout for commands and stderr for logging
         int options = subprocess_option_no_window | subprocess_option_combined_stdout_stderr;
         int result = subprocess_create_ex(argv.data(), options, envp.data(), inst.subproc.get());
         if (result != 0) {
@@ -547,11 +551,17 @@ void server_models::load(const std::string & name) {
     // captured variables are guaranteed to be destroyed only after the thread is joined
     inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
         // read stdout/stderr and forward to main server log
+        bool state_received = false; // true if child state received
         FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
         if (p_stdout_stderr) {
             char buffer[4096];
             while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
                 LOG("[%5d] %s", port, buffer);
+                if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
+                    // child process is ready
+                    this->update_status(name, SERVER_MODEL_STATUS_LOADED);
+                    state_received = true;
+                }
             }
         } else {
             SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
@@ -595,7 +605,7 @@ static void interrupt_subprocess(FILE * stdin_file) {
     // because subprocess.h does not provide a way to send SIGINT,
     // we will send a command to the child process to exit gracefully
     if (stdin_file) {
-        fprintf(stdin_file, "%s\n", CMD_EXIT);
+        fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
         fflush(stdin_file);
     }
 }
@@ -707,32 +717,13 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co
     return proxy;
 }
 
-std::thread server_models::setup_child_server(const common_params & base_params, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler) {
+std::thread server_models::setup_child_server(const std::function<void(int)> & shutdown_handler) {
     // send a notification to the router server that a model instance is ready
-    // TODO @ngxson : use HTTP client from libcommon
-    httplib::Client cli(base_params.hostname, router_port);
-    cli.set_connection_timeout(0, 200000); // 200 milliseconds
-
-    httplib::Request req;
-    req.method = "POST";
-    req.path   = "/models/status";
-    req.set_header("Content-Type", "application/json");
-    if (!base_params.api_keys.empty()) {
-        req.set_header("Authorization", "Bearer " + base_params.api_keys[0]);
-    }
-
-    json body;
-    body["model"] = name;
-    body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED);
-    req.body = body.dump();
-
-    SRV_INF("notifying router server (port=%d) that model %s is ready\n", router_port, name.c_str());
-    auto result = cli.send(std::move(req));
-    if (result.error() != httplib::Error::Success) {
-        auto err_str = httplib::to_string(result.error());
-        SRV_ERR("failed to notify router server: %s\n", err_str.c_str());
-        exit(1); // force exit
-    }
+    common_log_pause(common_log_main());
+    fflush(stdout);
+    fprintf(stdout, "%s\n", CMD_CHILD_TO_ROUTER_READY);
+    fflush(stdout);
+    common_log_resume(common_log_main());
 
     // setup thread for monitoring stdin
     return std::thread([shutdown_handler]() {
@@ -746,7 +737,7 @@ std::thread server_models::setup_child_server(const common_params & base_params,
                 eof = true;
                 break;
             }
-            if (line.find(CMD_EXIT) != std::string::npos) {
+            if (line.find(CMD_ROUTER_TO_CHILD_EXIT) != std::string::npos) {
                 SRV_INF("%s", "exit command received, exiting...\n");
                 shutdown_handler(0);
                 break;
@@ -869,18 +860,6 @@ void server_models_routes::init_routes() {
         return res;
     };
 
-    // used by child process to notify the router about status change
-    // TODO @ngxson : maybe implement authentication for this endpoint in the future
-    this->post_router_models_status = [this](const server_http_req & req) {
-        auto res = std::make_unique<server_http_res>();
-        json body = json::parse(req.body);
-        std::string model = json_value(body, "model", std::string());
-        std::string value = json_value(body, "value", std::string());
-        models.update_status(model, server_model_status_from_string(value));
-        res_ok(res, {{"success", true}});
-        return res;
-    };
-
     this->get_router_models = [this](const server_http_req &) {
         auto res = std::make_unique<server_http_res>();
         json models_json = json::array();

+ 1 - 2
tools/server/server-models.h

@@ -144,7 +144,7 @@ public:
 
     // notify the router server that a model instance is ready
     // return the monitoring thread (to be joined by the caller)
-    static std::thread setup_child_server(const common_params & base_params, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler);
+    static std::thread setup_child_server(const std::function<void(int)> & shutdown_handler);
 };
 
 struct server_models_routes {
@@ -162,7 +162,6 @@ struct server_models_routes {
     server_http_context::handler_t proxy_post;
     server_http_context::handler_t get_router_models;
     server_http_context::handler_t post_router_models_load;
-    server_http_context::handler_t post_router_models_status;
     server_http_context::handler_t post_router_models_unload;
 };
 

+ 1 - 2
tools/server/server.cpp

@@ -153,7 +153,6 @@ int main(int argc, char ** argv, char ** envp) {
         routes.get_models = models_routes->get_router_models;
         ctx_http.post("/models/load",   ex_wrapper(models_routes->post_router_models_load));
         ctx_http.post("/models/unload", ex_wrapper(models_routes->post_router_models_unload));
-        ctx_http.post("/models/status", ex_wrapper(models_routes->post_router_models_status));
     }
 
     ctx_http.get ("/health",              ex_wrapper(routes.get_health)); // public endpoint (no API key check)
@@ -291,7 +290,7 @@ int main(int argc, char ** argv, char ** envp) {
         const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
         std::thread monitor_thread;
         if (router_port != nullptr) {
-            monitor_thread = server_models::setup_child_server(params, std::atoi(router_port), params.model_alias, shutdown_handler);
+            monitor_thread = server_models::setup_child_server(shutdown_handler);
         }
 
         // this call blocks the main thread until queue_tasks.terminate() is called