scheduler.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package engine
import (
	"errors"
	"fmt"

	"makarna/pkg/backend/cpu"
	"makarna/pkg/graph"
	"makarna/pkg/model"
	"makarna/pkg/tensor"
)
// Scheduler orchestrates prefill and decode while reusing a static graph plan.
// It supports multi-token steps (len(tokens) > 1) for continuous batching.
type Scheduler struct {
	engine *Engine             // engine whose Forward is invoked each step
	plan   graph.ExecutionPlan // static plan; plan.MaxContext caps total tokens
	cache  model.KVCache       // KV cache; SeqLen/Commit track consumed tokens
}
  16. // NewScheduler binds a model engine with a graph plan and KV cache.
  17. func (e *Engine) NewScheduler(plan graph.ExecutionPlan, cache model.KVCache) *Scheduler {
  18. return &Scheduler{
  19. engine: e,
  20. plan: plan,
  21. cache: cache,
  22. }
  23. }
  24. // RemainingContext returns how many tokens can still fit in the reserved plan.
  25. func (s *Scheduler) RemainingContext() int {
  26. return s.plan.MaxContext - s.cache.SeqLen()
  27. }
  28. // Prefill runs a single step prefill (full prompt) and advances the cache.
  29. func (s *Scheduler) Prefill(tokens []int) (tensor.Tensor, error) {
  30. return s.run(tokens)
  31. }
  32. // Decode runs multi-token decode in one step. The caller is responsible for
  33. // picking the next tokens from the returned logits.
  34. func (s *Scheduler) Decode(tokens []int) (tensor.Tensor, error) {
  35. return s.run(tokens)
  36. }
  37. func (s *Scheduler) run(tokens []int) (tensor.Tensor, error) {
  38. if len(tokens) == 0 {
  39. return nil, fmt.Errorf("no tokens to run")
  40. }
  41. if s.cache.SeqLen()+len(tokens) > s.plan.MaxContext {
  42. return nil, fmt.Errorf("context limit exceeded: need %d, max %d", s.cache.SeqLen()+len(tokens), s.plan.MaxContext)
  43. }
  44. input := cpu.NewTensor(tensor.Shape{len(tokens)}, nil)
  45. for i, id := range tokens {
  46. input.DataFloat32()[i] = float32(id)
  47. }
  48. pos := cpu.NewTensor(tensor.Shape{len(tokens)}, nil)
  49. base := s.cache.SeqLen()
  50. for i := range tokens {
  51. pos.DataFloat32()[i] = float32(base + i)
  52. }
  53. before := s.cache.SeqLen()
  54. logits, err := s.engine.Forward(nil, input, pos, s.cache)
  55. if err != nil {
  56. return nil, err
  57. }
  58. // Some model implementations advance the cache internally.
  59. // If the cache didn't advance, commit here.
  60. if s.cache.SeqLen() == before {
  61. s.cache.Commit(len(tokens))
  62. }
  63. return logits, nil
  64. }