// server-queue.h

  #pragma once

  #include "server-task.h"

  #include <condition_variable>
  #include <cstddef>
  #include <deque>
  #include <functional>
  #include <mutex>
  #include <unordered_set>
  #include <vector>
  7. struct server_queue {
  8. private:
  9. int id = 0;
  10. bool running;
  11. // queues
  12. std::deque<server_task> queue_tasks;
  13. std::deque<server_task> queue_tasks_deferred;
  14. std::mutex mutex_tasks;
  15. std::condition_variable condition_tasks;
  16. // callback functions
  17. std::function<void(server_task &&)> callback_new_task;
  18. std::function<void(void)> callback_update_slots;
  19. public:
  20. // Add a new task to the end of the queue
  21. int post(server_task && task, bool front = false);
  22. // multi-task version of post()
  23. int post(std::vector<server_task> && tasks, bool front = false);
  24. // Add a new task, but defer until one slot is available
  25. void defer(server_task && task);
  26. // Get the next id for creating a new task
  27. int get_new_id();
  28. // Register function to process a new task
  29. void on_new_task(std::function<void(server_task &&)> callback);
  30. // Register the function to be called when all slots data is ready to be processed
  31. void on_update_slots(std::function<void(void)> callback);
  32. // Call when the state of one slot is changed, it will move one task from deferred to main queue
  33. void pop_deferred_task();
  34. // end the start_loop routine
  35. void terminate();
  36. /**
  37. * Main loop consists of these steps:
  38. * - Wait until a new task arrives
  39. * - Process the task (i.e. maybe copy data into slot)
  40. * - Check if multitask is finished
  41. * - Update all slots
  42. */
  43. void start_loop();
  44. // for metrics
  45. size_t queue_tasks_deferred_size() {
  46. std::unique_lock<std::mutex> lock(mutex_tasks);
  47. return queue_tasks_deferred.size();
  48. }
  49. private:
  50. void cleanup_pending_task(int id_target);
  51. };
  52. struct server_response {
  53. private:
  54. bool running = true;
  55. // for keeping track of all tasks waiting for the result
  56. std::unordered_set<int> waiting_task_ids;
  57. // the main result queue (using ptr for polymorphism)
  58. std::vector<server_task_result_ptr> queue_results;
  59. std::mutex mutex_results;
  60. std::condition_variable condition_results;
  61. public:
  62. // add the id_task to the list of tasks waiting for response
  63. void add_waiting_task_id(int id_task);
  64. void add_waiting_tasks(const std::vector<server_task> & tasks);
  65. // when the request is finished, we can remove task associated with it
  66. void remove_waiting_task_id(int id_task);
  67. // remove multiple tasks from waiting list
  68. void remove_waiting_task_ids(const std::unordered_set<int> & id_tasks);
  69. // This function blocks the thread until there is a response for one of the id_tasks
  70. server_task_result_ptr recv(const std::unordered_set<int> & id_tasks);
  71. // same as recv(), but have timeout in seconds
  72. // if timeout is reached, nullptr is returned
  73. server_task_result_ptr recv_with_timeout(const std::unordered_set<int> & id_tasks, int timeout);
  74. // single-task version of recv()
  75. server_task_result_ptr recv(int id_task);
  76. // Send a new result to a waiting id_task
  77. void send(server_task_result_ptr && result);
  78. // terminate the waiting loop
  79. void terminate();
  80. };