package engine

import (
    "log"
    "sort"
    "strconv"
    "strings"

    "makarna/pkg/backend/cuda"
    "makarna/pkg/loader"
    "makarna/pkg/tensor"
)

var (
    cudaDeviceCountFn      = cuda.DeviceCount
    cudaMemoryInfoDeviceFn = cuda.MemoryInfoDevice
)

const defaultGPUReserveBytes = uint64(1024 << 20) // 1 GiB headroom for scratch/overheads

// PlanLayerDevices decides per-layer placement based on a GPU budget fraction.
// If GPU info is unavailable, it defaults to CPU.
func PlanLayerDevices(md *loader.ModelData, cfg *loader.ModelConfig, budgetFraction float64, gpuDevices []int) []tensor.DevicePlacement {
    if budgetFraction <= 0 || budgetFraction > 1 {
        budgetFraction = 0.9
    }
    if cfg == nil || cfg.Params == nil {
        return nil
    }
    numLayers := intFromAny(cfg.Params["num_hidden_layers"], 0)
    if numLayers <= 0 {
        return nil
    }
    gpus := normalizedAutoGPUList(gpuDevices)
    if len(gpus) == 0 {
        log.Printf("placement: no usable GPUs, defaulting to CPU")
        return makeCPUPlacements(numLayers)
    }
    budgets := make(map[int]uint64, len(gpus))
    var totalBudget uint64
    for _, gpu := range gpus {
        totalMem, freeMem, err := cudaMemoryInfoDeviceFn(gpu)
        if err != nil {
            log.Printf("placement: cuda mem info gpu=%d unavailable (%v), skipping", gpu, err)
            continue
        }
        target := uint64(float64(totalMem) * budgetFraction)
        if target > freeMem {
            target = freeMem
        }
        if target <= defaultGPUReserveBytes {
            log.Printf("placement: gpu=%d budget too small (%d bytes), skipping", gpu, target)
            continue
        }
        target -= defaultGPUReserveBytes
        budgets[gpu] = target
        totalBudget += target
    }
    if totalBudget == 0 {
        log.Printf("placement: no usable GPU budgets, defaulting to CPU")
        return makeCPUPlacements(numLayers)
    }
    layerWeights := estimateLayerWeightBytes(md, numLayers)
    layerCache := estimateLayerCacheBytes(cfg, numLayers)
    layerNeed := make([]uint64, numLayers)
    var totalNeed uint64
    for i := 0; i < numLayers; i++ {
        layerNeed[i] = layerWeights[i] + layerCache[i]
        totalNeed += layerNeed[i]
    }
    placements := makeCPUPlacements(numLayers)
    if totalNeed == 0 {
        return placements
    }
    // If everything fits in aggregate, distribute layers roughly proportionally to GPU budgets.
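    // For example, with budgets of 12 GiB on gpu0 and 4 GiB on gpu1 and a total need of
    // 8 GiB, the proportional targets come out to 6 GiB and 2 GiB (8 * 12/16 and 8 * 4/16);
    // the last GPU in the ordered list absorbs any rounding remainder, so the targets sum
    // exactly to totalNeed.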
    if totalNeed <= totalBudget {
        // Build per-GPU target usage.
        targetByGPU := make(map[int]uint64, len(budgets))
        var ordered []int
        for _, gpu := range gpus {
            if b := budgets[gpu]; b > 0 {
                ordered = append(ordered, gpu)
            }
        }
        sort.Ints(ordered)
        var acc uint64
        for i, gpu := range ordered {
            b := budgets[gpu]
            share := uint64(float64(totalNeed) * (float64(b) / float64(totalBudget)))
            if i == len(ordered)-1 {
                share = totalNeed - acc
            }
            targetByGPU[gpu] = share
            acc += share
        }
        used := make(map[int]uint64, len(ordered))
        cur := 0
        for layer := numLayers - 1; layer >= 0; layer-- {
            need := layerNeed[layer]
            placed := false
            for cur < len(ordered) {
                gpu := ordered[cur]
                b := budgets[gpu]
                if b-used[gpu] < need {
                    cur++
                    continue
                }
                // Prefer moving to next GPU once we cross the proportional target, if next can fit.
                if cur < len(ordered)-1 && used[gpu]+need > targetByGPU[gpu] {
                    next := ordered[cur+1]
                    if budgets[next]-used[next] >= need {
                        cur++
                        continue
                    }
                }
                placements[layer] = tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu}
                used[gpu] += need
                placed = true
                break
            }
            if !placed {
                placements[layer] = tensor.DevicePlacement{Type: tensor.CPU, GPU: -1}
            }
        }
        return placements
    }

    // Otherwise: greedy fill GPUs in order, spill the rest to CPU.
    ordered := make([]int, 0, len(budgets))
    for _, gpu := range gpus {
        if budgets[gpu] > 0 {
            ordered = append(ordered, gpu)
        }
    }
    sort.Ints(ordered)
    used := make(map[int]uint64, len(ordered))
    cur := 0
    for layer := numLayers - 1; layer >= 0; layer-- {
        need := layerNeed[layer]
        placed := false
        for cur < len(ordered) {
            gpu := ordered[cur]
            if budgets[gpu]-used[gpu] >= need {
                placements[layer] = tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu}
                used[gpu] += need
                placed = true
                break
            }
            cur++
        }
        if !placed {
            placements[layer] = tensor.DevicePlacement{Type: tensor.CPU, GPU: -1}
        }
    }
    return placements
}
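
// makeCPUPlacements returns n placements that all target the CPU device.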
func makeCPUPlacements(n int) []tensor.DevicePlacement {
    p := make([]tensor.DevicePlacement, n)
    for i := range p {
        p[i] = tensor.DevicePlacement{Type: tensor.CPU, GPU: -1}
    }
    return p
}
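
// normalizedAutoGPUList drops negative IDs and de-duplicates and sorts the rest.
// When no GPUs are explicitly configured, it enumerates all CUDA devices reported
// by the runtime.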
func normalizedAutoGPUList(gpuDevices []int) []int {
    seen := make(map[int]bool)
    out := make([]int, 0, len(gpuDevices))
    for _, g := range gpuDevices {
        if g < 0 {
            continue
        }
        if !seen[g] {
            seen[g] = true
            out = append(out, g)
        }
    }
    if len(out) > 0 {
        sort.Ints(out)
        return out
    }
    // Auto-detect all devices when not explicitly configured.
    n, err := cudaDeviceCountFn()
    if err != nil || n <= 0 {
        return nil
    }
    out = make([]int, 0, n)
    for i := 0; i < n; i++ {
        out = append(out, i)
    }
    return out
}
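
// estimateLayerWeightBytes approximates per-layer weight bytes from tensor metadata.
// Tensors without a parsable layer index are attributed to the first layer (embeddings)
// or the last layer (lm_head/output and shared tensors) so they still count toward a budget.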
func estimateLayerWeightBytes(md *loader.ModelData, numLayers int) []uint64 {
    out := make([]uint64, numLayers)
    if md == nil {
        return out
    }
    for name, t := range md.Metadata.Tensors {
        sz := effectiveTensorBytes(t)
        if idx, ok := parseLayerIndex(name); ok && idx >= 0 && idx < numLayers {
            out[idx] += sz
            continue
        }
        ln := strings.ToLower(name)
        switch {
        case strings.Contains(ln, "embed_tokens") || strings.Contains(ln, "token_emb"):
            out[0] += sz
        case strings.Contains(ln, "lm_head") || strings.Contains(ln, "output"):
            out[numLayers-1] += sz
        default:
            // Treat as "shared"; add to last layer for budgeting.
            out[numLayers-1] += sz
        }
    }
    return out
}
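
// effectiveTensorBytes approximates the runtime byte footprint of a single tensor.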
func effectiveTensorBytes(t loader.TensorEntry) uint64 {
    // Placement needs to approximate the *runtime* footprint; prefer exact on-disk size
    // when available, fall back to dtype-based estimates otherwise.
    if t.Size > 0 {
        return t.Size
    }
    var elems uint64 = 1
    for _, d := range t.Shape {
        if d == 0 {
            return 0
        }
        elems *= d
    }
    if elems == 0 {
        return 0
    }
    switch t.DType {
    case loader.F16, loader.BF16:
        return elems * 2
    case loader.F32:
        return elems * 4
    }
    // Fallback: assume fp16 weight cache for large 2D quant matrices (matmul weights).
    if t.DType.Info().BlockSize > 0 && len(t.Shape) == 2 && t.Shape[0] > 1 && t.Shape[1] > 1 {
        return elems * 2
    }
    // Fallback to metadata bytes-per-element.
    bpe := float64(t.DType.Info().BytesPerEl)
    if bpe <= 0 {
        return 0
    }
    return uint64(float64(elems) * bpe)
}
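
// estimateLayerCacheBytes approximates per-layer cache bytes: a constant recurrent
// state plus short-conv states for Kimi-style linear-attention layers, otherwise an
// fp16 KV cache sized by max_position_embeddings.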
func estimateLayerCacheBytes(cfg *loader.ModelConfig, numLayers int) []uint64 {
    out := make([]uint64, numLayers)
    if cfg == nil || cfg.Params == nil || numLayers <= 0 {
        return out
    }
    arch := strings.ToLower(cfg.Architecture)
    params := cfg.Params
    // KimiLinear: recurrent cache (per-layer constant state) + short conv states.
    if strings.Contains(arch, "kimi") {
        lac, _ := params["linear_attn_config"].(map[string]any)
        numHeads := intFromAny(lac["num_heads"], intFromAny(params["num_attention_heads"], 0))
        headDim := intFromAny(lac["head_dim"], 0)
        kernel := intFromAny(lac["short_conv_kernel_size"], 0)
        if numHeads <= 0 || headDim <= 0 {
            return out
        }
        if kernel <= 1 {
            kernel = 1
        }
        convLen := kernel - 1
        projSize := numHeads * headDim
        recurrentBytes := uint64(numHeads * headDim * headDim * 4)
        convBytes := uint64(projSize * convLen * 4)
        perLayer := recurrentBytes + 3*convBytes
        kdaLayers := parseIntListAny(lac["kda_layers"])
        if len(kdaLayers) == 0 {
            // Conservative: assume all layers use recurrent state.
            for i := range out {
                out[i] = perLayer
            }
            return out
        }
        for _, oneBased := range kdaLayers {
            idx := oneBased - 1
            if idx >= 0 && idx < len(out) {
                out[idx] = perLayer
            }
        }
        return out
    }
    // Default: paged KV cache with fp16 K/V (2 bytes per element).
    numHeads := intFromAny(params["num_attention_heads"], 0)
    numKVHeads := intFromAny(params["num_key_value_heads"], 0)
    if numKVHeads == 0 {
        numKVHeads = numHeads
    }
    hidden := intFromAny(params["hidden_size"], 0)
    headDim := intFromAny(params["head_dim"], 0)
    if headDim == 0 && numHeads > 0 {
        headDim = hidden / numHeads
    }
    maxSeq := intFromAny(params["max_position_embeddings"], 0)
    if maxSeq <= 0 {
        maxSeq = 4096
    }
    if numKVHeads <= 0 || headDim <= 0 {
        return out
    }
    kvDim := numKVHeads * headDim
    perLayer := uint64(maxSeq * kvDim * 4) // K+V fp16 => 2B + 2B = 4B
    for i := range out {
        out[i] = perLayer
    }
    return out
}
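
// parseLayerIndex extracts the layer index n from tensor names of the form
// "model.layers.<n>.<rest>" or "layers.<n>.<rest>".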
func parseLayerIndex(name string) (int, bool) {
    for _, prefix := range []string{"model.layers.", "layers."} {
        if !strings.HasPrefix(name, prefix) {
            continue
        }
        rest := name[len(prefix):]
        dot := strings.IndexByte(rest, '.')
        if dot <= 0 {
            return 0, false
        }
        n, err := strconv.Atoi(rest[:dot])
        if err != nil {
            return 0, false
        }
        return n, true
    }
    return 0, false
}
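
// intFromAny coerces ints, floats, and numeric strings to int, falling back to def.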
func intFromAny(v any, def int) int {
    switch x := v.(type) {
    case int:
        return x
    case int64:
        return int(x)
    case float64:
        return int(x)
    case float32:
        return int(x)
    case string:
        if n, err := strconv.Atoi(x); err == nil {
            return n
        }
    }
    return def
}
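
// parseIntListAny converts a decoded JSON list ([]any of numbers or []float64) to
// []int, ignoring non-numeric elements.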
func parseIntListAny(v any) []int {
    arr, ok := v.([]any)
    if !ok {
        if f, ok := v.([]float64); ok {
            out := make([]int, len(f))
            for i := range f {
                out[i] = int(f[i])
            }
            return out
        }
        return nil
    }
    out := make([]int, 0, len(arr))
    for _, it := range arr {
        switch x := it.(type) {
        case float64:
            out = append(out, int(x))
        case int:
            out = append(out, x)
        }
    }
    return out
}

// cudaMemInfo queries free/total GPU memory; it returns a non-nil error if CUDA is
// not available.
func cudaMemInfo() (total uint64, free uint64, err error) {
    return cudaMemoryInfo()
}
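
// Illustrative usage (a minimal sketch; the real call site depends on how the engine
// wires up model loading, which lives outside this file):
//
//  placements := PlanLayerDevices(md, cfg, 0.9, nil)
//
// which budgets 90% of each auto-detected GPU and leaves on the CPU any layers that
// do not fit.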