// Package kvcache implements a paged KV cache with global prefix caching.
package kvcache

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sync"

	"makarna/pkg/backend/cpu"
	"makarna/pkg/backend/cuda"
	"makarna/pkg/tensor"
)

// BlockHash is the hash of a block's token content for prefix caching.
type BlockHash [32]byte

// KVBlock represents a single block of KV cache memory.
type KVBlock struct {
	ID       int
	K        tensor.Tensor // [blockSize, kvDim]
	V        tensor.Tensor // [blockSize, kvDim]
	pk       []float32
	pv       []float32
	Hash     *BlockHash // nil if not cached/committed
	RefCount int        // number of requests using this block
	Layer    int        // which layer this block belongs to
	GPU      int
}

// BlockPoolConfig configures the block pool.
type BlockPoolConfig struct {
	NumLayers  int
	NumKVHeads int
	HeadDim    int
	BlockSize  int
	NumBlocks  int // total blocks per layer
	Device     tensor.DeviceType
	GPU        int
	LayerGPUs  []int
	// LayerPlacements optionally overrides per-layer device placement.
	// If provided, it must have length NumLayers and may mix CPU/CUDA layers.
	// When set, Device/GPU/LayerGPUs are treated as defaults for layers that
	// don't specify a valid placement.
	LayerPlacements []tensor.DevicePlacement
	Preallocate     bool
}

// BlockPool manages a pool of KV cache blocks with prefix caching.
type BlockPool struct {
	cfg BlockPoolConfig
	mu  sync.RWMutex

	// blocks[layer][blockID] = *KVBlock
	blocks [][]*KVBlock

	// Free block IDs per layer (LIFO for locality)
	freeBlocks [][]int

	// Hash → block mapping for prefix caching (per layer)
	// hashToBlock[layer][hash] = blockID
	hashToBlock []map[BlockHash]int

	// Stats
	stats PrefixCacheStats

	// CUDA preallocation: one contiguous buffer per layer for K and V.
	// When set, ensureAllocatedLocked creates block views into these buffers.
	kvDim           int
	kBase           []*cuda.Tensor
	vBase           []*cuda.Tensor
	layerGPUs       []int
	layerPlacements []tensor.DevicePlacement
}

// PrefixCacheStats tracks cache performance.
type PrefixCacheStats struct {
	Hits        int64
	Misses      int64
	Evictions   int64
	Allocations int64
}

// NewBlockPool creates a new block pool.
func NewBlockPool(cfg BlockPoolConfig) (*BlockPool, error) {
	if cfg.BlockSize <= 0 {
		cfg.BlockSize = 16
	}
	if cfg.NumBlocks <= 0 {
		cfg.NumBlocks = 1024
	}
	bp := &BlockPool{
		cfg:         cfg,
		blocks:      make([][]*KVBlock, cfg.NumLayers),
		freeBlocks:  make([][]int, cfg.NumLayers),
		hashToBlock: make([]map[BlockHash]int, cfg.NumLayers),
		kvDim:       cfg.NumKVHeads * cfg.HeadDim,
	}
	if len(cfg.LayerPlacements) != 0 {
		if len(cfg.LayerPlacements) != cfg.NumLayers {
			return nil, fmt.Errorf("LayerPlacements length %d != NumLayers %d", len(cfg.LayerPlacements), cfg.NumLayers)
		}
		bp.layerPlacements = make([]tensor.DevicePlacement, cfg.NumLayers)
		for i := range cfg.LayerPlacements {
			bp.layerPlacements[i] = cfg.LayerPlacements[i].Normalize()
		}
	}
	if cfg.Device == tensor.CUDA || bp.layerPlacements != nil {
		if len(cfg.LayerGPUs) != 0 && len(cfg.LayerGPUs) != cfg.NumLayers {
			return nil, fmt.Errorf("LayerGPUs length %d != NumLayers %d", len(cfg.LayerGPUs), cfg.NumLayers)
		}
		if len(cfg.LayerGPUs) == cfg.NumLayers {
			bp.layerGPUs = append([]int(nil), cfg.LayerGPUs...)
		} else {
			bp.layerGPUs = make([]int, cfg.NumLayers)
			for i := range bp.layerGPUs {
				bp.layerGPUs[i] = cfg.GPU
			}
		}
	}
	// Initialize block metadata. K/V tensors are allocated lazily on first use.
	for layer := 0; layer < cfg.NumLayers; layer++ {
		bp.blocks[layer] = make([]*KVBlock, cfg.NumBlocks)
		bp.freeBlocks[layer] = make([]int, 0, cfg.NumBlocks)
		bp.hashToBlock[layer] = make(map[BlockHash]int)
		p := bp.LayerDevice(layer)
		gpu := p.GPU
		if p.Type != tensor.CUDA {
			gpu = -1
		}
		for b := 0; b < cfg.NumBlocks; b++ {
			bp.blocks[layer][b] = &KVBlock{ID: b, Layer: layer, GPU: gpu}
			bp.freeBlocks[layer] = append(bp.freeBlocks[layer], b)
		}
	}
	if cfg.Preallocate && cuda.Available() {
		if err := bp.preallocateCUDA(); err != nil {
			bp.Free()
			return nil, err
		}
	}
	return bp, nil
}

func (bp *BlockPool) preallocateCUDA() error {
	if bp.kvDim <= 0 {
		return fmt.Errorf("invalid kvDim %d", bp.kvDim)
	}
	if bp.cfg.BlockSize <= 0 || bp.cfg.NumBlocks <= 0 {
		return fmt.Errorf("invalid block pool sizes: blockSize=%d numBlocks=%d", bp.cfg.BlockSize, bp.cfg.NumBlocks)
	}
	totalTokens := bp.cfg.NumBlocks * bp.cfg.BlockSize
	shape := tensor.Shape{totalTokens, bp.kvDim}
	bp.kBase = make([]*cuda.Tensor, bp.cfg.NumLayers)
	bp.vBase = make([]*cuda.Tensor, bp.cfg.NumLayers)
	for layer := 0; layer < bp.cfg.NumLayers; layer++ {
		p := bp.LayerDevice(layer)
		if p.Type != tensor.CUDA {
			continue
		}
		gpu := p.GPU
		k, err := cuda.NewTensor(shape, tensor.Float16, gpu)
		if err != nil {
			return fmt.Errorf("preallocate K layer %d: %w", layer, err)
		}
		v, err := cuda.NewTensor(shape, tensor.Float16, gpu)
		if err != nil {
			k.Free()
			return fmt.Errorf("preallocate V layer %d: %w", layer, err)
		}
		bp.kBase[layer] = k
		bp.vBase[layer] = v
	}
	return nil
}

func (bp *BlockPool) ensureAllocatedLocked(layer, blockID int) (*KVBlock, error) {
	if layer < 0 || layer >= len(bp.blocks) {
		return nil, fmt.Errorf("invalid layer %d", layer)
	}
	if blockID < 0 || blockID >= len(bp.blocks[layer]) {
		return nil, fmt.Errorf("invalid blockID %d", blockID)
	}
	block := bp.blocks[layer][blockID]
	if block == nil {
		p := bp.LayerDevice(layer)
		gpu := p.GPU
		if p.Type != tensor.CUDA {
			gpu = -1
		}
		block = &KVBlock{ID: blockID, Layer: layer, GPU: gpu}
		bp.blocks[layer][blockID] = block
	}
	if block.K != nil && block.V != nil {
		return block, nil
	}
	kvDim := bp.kvDim
	p := bp.LayerDevice(layer)
	if p.Type == tensor.CUDA && bp.kBase != nil && bp.vBase != nil {
		if layer < len(bp.kBase) && layer < len(bp.vBase) && bp.kBase[layer] != nil && bp.vBase[layer] != nil {
			elemSize := uintptr(tensor.Float16.Size())
			blockElems := uintptr(bp.cfg.BlockSize * kvDim)
			offsetBytes := uintptr(blockID) * blockElems * elemSize
			k, err := bp.kBase[layer].ViewAt(tensor.Shape{bp.cfg.BlockSize, kvDim}, offsetBytes)
			if err != nil {
				return nil, fmt.Errorf("create K view: %w", err)
			}
			v, err := bp.vBase[layer].ViewAt(tensor.Shape{bp.cfg.BlockSize, kvDim}, offsetBytes)
			if err != nil {
				return nil, fmt.Errorf("create V view: %w", err)
			}
			block.K = k
			block.V = v
		}
	}
	if block.K == nil || block.V == nil {
		k, v, err := allocateBlock(bp.cfg.BlockSize, kvDim, p.Type, p.GPU)
		if err != nil {
			return nil, err
		}
		block.K = k
		block.V = v
	}
	if p.Type == tensor.CPU {
		bufSize := bp.cfg.NumKVHeads * bp.cfg.BlockSize * bp.cfg.HeadDim
		block.pk = make([]float32, bufSize)
		block.pv = make([]float32, bufSize)
	}
	return block, nil
}

func allocateBlock(blockSize, kvDim int, device tensor.DeviceType, gpu int) (tensor.Tensor, tensor.Tensor, error) {
	shape := tensor.Shape{blockSize, kvDim}
	if device == tensor.CUDA && cuda.Available() {
		k, err := cuda.NewTensor(shape, tensor.Float16, gpu)
		if err != nil {
			return nil, nil, err
		}
		v, err := cuda.NewTensor(shape, tensor.Float16, gpu)
		if err != nil {
			// Avoid leaking K when V allocation fails.
			k.Free()
			return nil, nil, err
		}
		return k, v, nil
	}
	k, err := cpu.NewTensorU16(shape, tensor.Float16, nil)
	if err != nil {
		return nil, nil, err
	}
	v, err := cpu.NewTensorU16(shape, tensor.Float16, nil)
	if err != nil {
		return nil, nil, err
	}
	return k, v, nil
}

// ComputeBlockHash computes the hash of a block's token IDs, chained with the
// parent block's hash so that a block hash identifies the whole prefix up to
// and including that block. This is used for prefix caching: the same token
// prefix always produces the same hash.
func ComputeBlockHash(tokens []int, parentHash *BlockHash) BlockHash {
	h := sha256.New()
	if parentHash != nil {
		h.Write(parentHash[:])
	}
	buf := make([]byte, 4)
	for _, t := range tokens {
		binary.LittleEndian.PutUint32(buf, uint32(t))
		h.Write(buf)
	}
	var hash BlockHash
	copy(hash[:], h.Sum(nil))
	return hash
}

// ComputeBlockHashes computes chained hashes for all blocks in a token sequence.
func ComputeBlockHashes(tokens []int, blockSize int) []BlockHash {
	numBlocks := (len(tokens) + blockSize - 1) / blockSize
	hashes := make([]BlockHash, numBlocks)
	var parentHash *BlockHash
	for i := 0; i < numBlocks; i++ {
		start := i * blockSize
		end := start + blockSize
		if end > len(tokens) {
			end = len(tokens)
		}
		hashes[i] = ComputeBlockHash(tokens[start:end], parentHash)
		parentHash = &hashes[i]
	}
	return hashes
}

// FindCachedBlocks finds the longest prefix of blocks that are already cached.
// Returns the cached block IDs and the number of tokens they cover.
func (bp *BlockPool) FindCachedBlocks(layer int, hashes []BlockHash) ([]int, int) {
	// Takes the write lock because it updates hit/miss statistics.
	bp.mu.Lock()
	defer bp.mu.Unlock()
	if layer < 0 || layer >= len(bp.hashToBlock) {
		return nil, 0
	}
	cached := make([]int, 0, len(hashes))
	for _, hash := range hashes {
		if blockID, ok := bp.hashToBlock[layer][hash]; ok {
			cached = append(cached, blockID)
			bp.stats.Hits++
		} else {
			bp.stats.Misses++
			break // prefix caching requires contiguous hits
		}
	}
	return cached, len(cached) * bp.cfg.BlockSize
}

// AllocateBlocks allocates blocks for a request. It prefers free blocks and,
// if needed, evicts cached blocks whose refcount is zero (in arbitrary order).
func (bp *BlockPool) AllocateBlocks(layer int, count int) ([]int, error) {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	if layer < 0 || layer >= len(bp.freeBlocks) {
		return nil, fmt.Errorf("invalid layer %d", layer)
	}
	allocated := make([]int, 0, count)
	// First try free blocks.
	for len(allocated) < count && len(bp.freeBlocks[layer]) > 0 {
		n := len(bp.freeBlocks[layer])
		blockID := bp.freeBlocks[layer][n-1]
		bp.freeBlocks[layer] = bp.freeBlocks[layer][:n-1]
		if _, err := bp.ensureAllocatedLocked(layer, blockID); err != nil {
			// Return blockID to the free list and fail.
			bp.freeBlocks[layer] = append(bp.freeBlocks[layer], blockID)
			return allocated, fmt.Errorf("allocate block %d: %w", blockID, err)
		}
		allocated = append(allocated, blockID)
		bp.stats.Allocations++
	}
	// If still short, evict cached blocks with refcount == 0.
	if len(allocated) < count {
		for hash, blockID := range bp.hashToBlock[layer] {
			block := bp.blocks[layer][blockID]
			if block.RefCount == 0 {
				// Evict
				delete(bp.hashToBlock[layer], hash)
				block.Hash = nil
				if _, err := bp.ensureAllocatedLocked(layer, blockID); err != nil {
					return allocated, fmt.Errorf("allocate evicted block %d: %w", blockID, err)
				}
				allocated = append(allocated, blockID)
				bp.stats.Evictions++
				bp.stats.Allocations++
				if len(allocated) >= count {
					break
				}
			}
		}
	}
	if len(allocated) < count {
		return allocated, fmt.Errorf("not enough blocks: need %d, got %d", count, len(allocated))
	}
	// Increment ref counts.
	for _, blockID := range allocated {
		bp.blocks[layer][blockID].RefCount++
	}
	return allocated, nil
}

// TouchBlocks increments ref counts for cached blocks being reused.
func (bp *BlockPool) TouchBlocks(layer int, blockIDs []int) {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	if layer < 0 || layer >= len(bp.blocks) {
		return
	}
	for _, blockID := range blockIDs {
		if blockID >= 0 && blockID < len(bp.blocks[layer]) {
			bp.blocks[layer][blockID].RefCount++
		}
	}
}

// CacheBlocks registers blocks in the hash cache after they're computed.
func (bp *BlockPool) CacheBlocks(layer int, blockIDs []int, hashes []BlockHash) {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	for i, blockID := range blockIDs {
		if i >= len(hashes) {
			break
		}
		block, err := bp.ensureAllocatedLocked(layer, blockID)
		if err != nil {
			continue
		}
		hash := hashes[i]
		block.Hash = &hash
		bp.hashToBlock[layer][hash] = blockID
	}
}

// FreeBlocks decrements ref counts; blocks with refcount 0 become eviction candidates.
func (bp *BlockPool) FreeBlocks(layer int, blockIDs []int) {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	if layer < 0 || layer >= len(bp.blocks) {
		return
	}
	for _, blockID := range blockIDs {
		if blockID >= 0 && blockID < len(bp.blocks[layer]) {
			block := bp.blocks[layer][blockID]
			if block.RefCount > 0 {
				block.RefCount--
			}
			// Blocks still registered in the hash cache are not returned to
			// the free list; they stay resident as eviction candidates.
			if block.RefCount == 0 && block.Hash == nil {
				bp.freeBlocks[layer] = append(bp.freeBlocks[layer], blockID)
			}
		}
	}
}

// GetBlock returns a block by ID, allocating its K/V tensors if needed.
func (bp *BlockPool) GetBlock(layer, blockID int) *KVBlock {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	block, err := bp.ensureAllocatedLocked(layer, blockID)
	if err != nil {
		return nil
	}
	return block
}

// LayerDevice returns the device placement used for the given layer.
func (bp *BlockPool) LayerDevice(layer int) tensor.DevicePlacement {
	if bp == nil {
		return tensor.DevicePlacement{Type: tensor.CPU, GPU: -1}
	}
	if bp.layerPlacements != nil && layer >= 0 && layer < len(bp.layerPlacements) {
		p := bp.layerPlacements[layer].Normalize()
		// Defensive: normalize invalid CUDA GPU ids.
		if p.Type == tensor.CUDA && p.GPU < 0 {
			p.GPU = 0
		}
		return p
	}
	if bp.cfg.Device != tensor.CUDA {
		return tensor.DevicePlacement{Type: bp.cfg.Device, GPU: -1}
	}
	gpu := bp.cfg.GPU
	if bp.layerGPUs != nil && layer >= 0 && layer < len(bp.layerGPUs) {
		gpu = bp.layerGPUs[layer]
	}
	return tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu}
}

// Stats returns current cache statistics.
func (bp *BlockPool) Stats() PrefixCacheStats {
	bp.mu.RLock()
	defer bp.mu.RUnlock()
	return bp.stats
}

// NumFreeBlocks returns the free block count for a layer.
func (bp *BlockPool) NumFreeBlocks(layer int) int {
	bp.mu.RLock()
	defer bp.mu.RUnlock()
	if layer < 0 || layer >= len(bp.freeBlocks) {
		return 0
	}
	return len(bp.freeBlocks[layer])
}

// Usage returns cache utilization (0.0 to 1.0), averaged across layers.
func (bp *BlockPool) Usage() float64 {
	bp.mu.RLock()
	defer bp.mu.RUnlock()
	if bp.cfg.NumBlocks == 0 || len(bp.freeBlocks) == 0 {
		return 0
	}
	totalFree := 0
	for _, free := range bp.freeBlocks {
		totalFree += len(free)
	}
	avgFree := float64(totalFree) / float64(len(bp.freeBlocks))
	return 1.0 - (avgFree / float64(bp.cfg.NumBlocks))
}

// Free releases all GPU memory.
func (bp *BlockPool) Free() {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	// Free contiguous CUDA buffers first (if present).
	for i := range bp.kBase {
		if bp.kBase[i] != nil {
			bp.kBase[i].Free()
			bp.kBase[i] = nil
		}
	}
	for i := range bp.vBase {
		if bp.vBase[i] != nil {
			bp.vBase[i].Free()
			bp.vBase[i] = nil
		}
	}
	bp.kBase = nil
	bp.vBase = nil
	for _, layerBlocks := range bp.blocks {
		for _, block := range layerBlocks {
			if block == nil {
				continue
			}
			if ct, ok := block.K.(*cuda.Tensor); ok && ct != nil {
				ct.Free()
			}
			if ct, ok := block.V.(*cuda.Tensor); ok && ct != nil {
				ct.Free()
			}
		}
	}
	bp.blocks = nil
	bp.freeBlocks = nil
	bp.hashToBlock = nil
}
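
// exampleLayerLifecycle is an illustrative sketch, not part of the package
// API: it shows the per-layer call sequence this pool appears to assume,
// namely hashing the prompt tokens, reusing any cached prefix blocks,
// allocating blocks for the remaining tokens, and registering the newly
// computed blocks so later requests can hit the prefix cache. Copying the
// actual K/V data into the allocated blocks is assumed to happen elsewhere,
// between AllocateBlocks and CacheBlocks. The config values are arbitrary.
func exampleLayerLifecycle(tokens []int) error {
	bp, err := NewBlockPool(BlockPoolConfig{
		NumLayers:  1,
		NumKVHeads: 8,
		HeadDim:    64,
		BlockSize:  16,
		NumBlocks:  256,
		Device:     tensor.CPU,
	})
	if err != nil {
		return err
	}
	defer bp.Free()

	const layer = 0
	hashes := ComputeBlockHashes(tokens, bp.cfg.BlockSize)

	// Reuse whatever prefix is already cached and pin it for this request.
	cached, cachedTokens := bp.FindCachedBlocks(layer, hashes)
	bp.TouchBlocks(layer, cached)
	_ = cachedTokens // the caller would skip recomputing this many tokens

	// Allocate blocks for the tokens not covered by the cached prefix.
	fresh, err := bp.AllocateBlocks(layer, len(hashes)-len(cached))
	if err != nil {
		bp.FreeBlocks(layer, cached)
		return err
	}

	// ... the forward pass would write K/V into the fresh blocks here ...

	// Register the freshly computed blocks under their content hashes.
	bp.CacheBlocks(layer, fresh, hashes[len(cached):])

	// Release this request's references. Blocks still in the hash cache stay
	// resident as eviction candidates; uncached blocks return to the free list.
	bp.FreeBlocks(layer, cached)
	bp.FreeBlocks(layer, fresh)
	return nil
}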
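
// exampleMixedPlacement is a hypothetical configuration sketch showing how
// LayerPlacements can pin individual layers to the CPU or to specific GPUs
// while Device acts as the default. The layer count, dimensions, and GPU ids
// are illustrative only; whether the CUDA layers are actually preallocated
// depends on cuda.Available() at construction time.
func exampleMixedPlacement() (*BlockPool, error) {
	return NewBlockPool(BlockPoolConfig{
		NumLayers:  3,
		NumKVHeads: 8,
		HeadDim:    64,
		BlockSize:  16,
		NumBlocks:  512,
		Device:     tensor.CUDA, // default for layers without an explicit placement
		LayerPlacements: []tensor.DevicePlacement{
			{Type: tensor.CUDA, GPU: 0},
			{Type: tensor.CUDA, GPU: 1},
			{Type: tensor.CPU, GPU: -1},
		},
		Preallocate: true,
	})
}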