// Package kvcache implements a paged KV cache with global prefix caching.
package kvcache

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"sync"

	"makarna/pkg/backend/cpu"
	"makarna/pkg/backend/cuda"
	"makarna/pkg/tensor"
)
// BlockHash is the hash of a block's token content for prefix caching.
type BlockHash [32]byte

// KVBlock represents a single block of KV cache memory.
type KVBlock struct {
	ID int
	K  tensor.Tensor // [blockSize, kvDim]
	V  tensor.Tensor // [blockSize, kvDim]

	// pk/pv are float32 scratch buffers sized to hold the block's K/V;
	// they are allocated only for CPU-resident blocks.
	pk []float32
	pv []float32

	Hash     *BlockHash // nil if not cached/committed
	RefCount int        // number of requests using this block
	Layer    int        // which layer this block belongs to
	GPU      int        // CUDA device index, or -1 for CPU blocks
}
// BlockPoolConfig configures the block pool.
type BlockPoolConfig struct {
	NumLayers  int
	NumKVHeads int
	HeadDim    int
	BlockSize  int
	NumBlocks  int // total blocks per layer
	Device     tensor.DeviceType
	GPU        int
	LayerGPUs  []int

	// LayerPlacements optionally overrides per-layer device placement.
	// If provided, it must have length NumLayers and may mix CPU/CUDA layers.
	// When set, Device/GPU/LayerGPUs are treated as defaults for layers that
	// don't specify a valid placement.
	LayerPlacements []tensor.DevicePlacement

	Preallocate bool
}
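
// exampleMixedPlacementConfig is a usage sketch, not part of the pool's API:
// it shows one way to fill BlockPoolConfig with mixed per-layer placements,
// putting the first few layers on GPU 0 and the rest on the CPU. All sizes
// and the layer split are illustrative assumptions.
func exampleMixedPlacementConfig() BlockPoolConfig {
	placements := make([]tensor.DevicePlacement, 8)
	for i := range placements {
		if i < 4 {
			placements[i] = tensor.DevicePlacement{Type: tensor.CUDA, GPU: 0}
		} else {
			placements[i] = tensor.DevicePlacement{Type: tensor.CPU, GPU: -1}
		}
	}
	return BlockPoolConfig{
		NumLayers:       8,
		NumKVHeads:      8,
		HeadDim:         128,
		BlockSize:       16,
		NumBlocks:       1024,
		Device:          tensor.CPU, // fallback for layers without a valid placement
		LayerPlacements: placements,
	}
}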
// BlockPool manages a pool of KV cache blocks with prefix caching.
type BlockPool struct {
	cfg BlockPoolConfig
	mu  sync.RWMutex

	// blocks[layer][blockID] = *KVBlock
	blocks [][]*KVBlock
	// Free block IDs per layer (LIFO for locality).
	freeBlocks [][]int
	// Hash → block mapping for prefix caching (per layer):
	// hashToBlock[layer][hash] = blockID
	hashToBlock []map[BlockHash]int

	// Stats
	stats PrefixCacheStats

	// CUDA preallocation: one contiguous buffer per layer for K and V.
	// When set, ensureAllocatedLocked creates block views into these buffers.
	kvDim int
	kBase []*cuda.Tensor
	vBase []*cuda.Tensor

	layerGPUs       []int
	layerPlacements []tensor.DevicePlacement
}
// PrefixCacheStats tracks cache performance.
type PrefixCacheStats struct {
	Hits        int64
	Misses      int64
	Evictions   int64
	Allocations int64
}
// NewBlockPool creates a new block pool.
func NewBlockPool(cfg BlockPoolConfig) (*BlockPool, error) {
	if cfg.BlockSize <= 0 {
		cfg.BlockSize = 16
	}
	if cfg.NumBlocks <= 0 {
		cfg.NumBlocks = 1024
	}
	bp := &BlockPool{
		cfg:         cfg,
		blocks:      make([][]*KVBlock, cfg.NumLayers),
		freeBlocks:  make([][]int, cfg.NumLayers),
		hashToBlock: make([]map[BlockHash]int, cfg.NumLayers),
		kvDim:       cfg.NumKVHeads * cfg.HeadDim,
	}
	if len(cfg.LayerPlacements) != 0 {
		if len(cfg.LayerPlacements) != cfg.NumLayers {
			return nil, fmt.Errorf("LayerPlacements length %d != NumLayers %d", len(cfg.LayerPlacements), cfg.NumLayers)
		}
		bp.layerPlacements = make([]tensor.DevicePlacement, cfg.NumLayers)
		for i := range cfg.LayerPlacements {
			bp.layerPlacements[i] = cfg.LayerPlacements[i].Normalize()
		}
	}
	if cfg.Device == tensor.CUDA || bp.layerPlacements != nil {
		if len(cfg.LayerGPUs) != 0 && len(cfg.LayerGPUs) != cfg.NumLayers {
			return nil, fmt.Errorf("LayerGPUs length %d != NumLayers %d", len(cfg.LayerGPUs), cfg.NumLayers)
		}
		if len(cfg.LayerGPUs) == cfg.NumLayers {
			bp.layerGPUs = append([]int(nil), cfg.LayerGPUs...)
		} else {
			bp.layerGPUs = make([]int, cfg.NumLayers)
			for i := range bp.layerGPUs {
				bp.layerGPUs[i] = cfg.GPU
			}
		}
	}
	// Initialize block metadata. K/V tensors are allocated lazily on first use.
	for layer := 0; layer < cfg.NumLayers; layer++ {
		bp.blocks[layer] = make([]*KVBlock, cfg.NumBlocks)
		bp.freeBlocks[layer] = make([]int, 0, cfg.NumBlocks)
		bp.hashToBlock[layer] = make(map[BlockHash]int)
		for b := 0; b < cfg.NumBlocks; b++ {
			p := bp.LayerDevice(layer)
			gpu := p.GPU
			if p.Type != tensor.CUDA {
				gpu = -1
			}
			bp.blocks[layer][b] = &KVBlock{ID: b, Layer: layer, GPU: gpu}
			bp.freeBlocks[layer] = append(bp.freeBlocks[layer], b)
		}
	}
	if cfg.Preallocate && cuda.Available() {
		if err := bp.preallocateCUDA(); err != nil {
			bp.Free()
			return nil, err
		}
	}
	return bp, nil
}
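
// exampleNewCUDAPool is a hedged construction sketch, not a prescribed setup:
// it builds a pool whose layers all live on GPU 0 and requests the contiguous
// per-layer K/V preallocation. The sizes are illustrative assumptions;
// Preallocate only takes effect when cuda.Available() reports true.
func exampleNewCUDAPool() (*BlockPool, error) {
	return NewBlockPool(BlockPoolConfig{
		NumLayers:   32,
		NumKVHeads:  8,
		HeadDim:     128,
		BlockSize:   16,
		NumBlocks:   2048,
		Device:      tensor.CUDA,
		GPU:         0,
		Preallocate: true,
	})
}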
// preallocateCUDA allocates one contiguous [NumBlocks*BlockSize, kvDim] K and
// V buffer per CUDA-resident layer; CPU layers are skipped.
func (bp *BlockPool) preallocateCUDA() error {
	if bp.kvDim <= 0 {
		return fmt.Errorf("invalid kvDim %d", bp.kvDim)
	}
	if bp.cfg.BlockSize <= 0 || bp.cfg.NumBlocks <= 0 {
		return fmt.Errorf("invalid block pool sizes: blockSize=%d numBlocks=%d", bp.cfg.BlockSize, bp.cfg.NumBlocks)
	}
	totalTokens := bp.cfg.NumBlocks * bp.cfg.BlockSize
	shape := tensor.Shape{totalTokens, bp.kvDim}
	bp.kBase = make([]*cuda.Tensor, bp.cfg.NumLayers)
	bp.vBase = make([]*cuda.Tensor, bp.cfg.NumLayers)
	for layer := 0; layer < bp.cfg.NumLayers; layer++ {
		p := bp.LayerDevice(layer)
		if p.Type != tensor.CUDA {
			continue
		}
		gpu := p.GPU
		k, err := cuda.NewTensor(shape, tensor.Float16, gpu)
		if err != nil {
			return fmt.Errorf("preallocate K layer %d: %w", layer, err)
		}
		v, err := cuda.NewTensor(shape, tensor.Float16, gpu)
		if err != nil {
			k.Free()
			return fmt.Errorf("preallocate V layer %d: %w", layer, err)
		}
		bp.kBase[layer] = k
		bp.vBase[layer] = v
	}
	return nil
}
// ensureAllocatedLocked lazily creates a block's K/V tensors, either as views
// into the preallocated per-layer CUDA buffers or as standalone allocations.
// The caller must hold bp.mu.
func (bp *BlockPool) ensureAllocatedLocked(layer, blockID int) (*KVBlock, error) {
	if layer < 0 || layer >= len(bp.blocks) {
		return nil, fmt.Errorf("invalid layer %d", layer)
	}
	if blockID < 0 || blockID >= len(bp.blocks[layer]) {
		return nil, fmt.Errorf("invalid blockID %d", blockID)
	}
	block := bp.blocks[layer][blockID]
	if block == nil {
		p := bp.LayerDevice(layer)
		gpu := p.GPU
		if p.Type != tensor.CUDA {
			gpu = -1
		}
		block = &KVBlock{ID: blockID, Layer: layer, GPU: gpu}
		bp.blocks[layer][blockID] = block
	}
	if block.K != nil && block.V != nil {
		return block, nil
	}
	kvDim := bp.kvDim
	p := bp.LayerDevice(layer)
	if p.Type == tensor.CUDA && bp.kBase != nil && bp.vBase != nil {
		if layer < len(bp.kBase) && layer < len(bp.vBase) && bp.kBase[layer] != nil && bp.vBase[layer] != nil {
			elemSize := uintptr(tensor.Float16.Size())
			blockElems := uintptr(bp.cfg.BlockSize * kvDim)
			offsetBytes := uintptr(blockID) * blockElems * elemSize
			k, err := bp.kBase[layer].ViewAt(tensor.Shape{bp.cfg.BlockSize, kvDim}, offsetBytes)
			if err != nil {
				return nil, fmt.Errorf("create K view: %w", err)
			}
			v, err := bp.vBase[layer].ViewAt(tensor.Shape{bp.cfg.BlockSize, kvDim}, offsetBytes)
			if err != nil {
				return nil, fmt.Errorf("create V view: %w", err)
			}
			block.K = k
			block.V = v
		}
	}
	if block.K == nil || block.V == nil {
		k, v, err := allocateBlock(bp.cfg.BlockSize, kvDim, p.Type, p.GPU)
		if err != nil {
			return nil, err
		}
		block.K = k
		block.V = v
	}
	if p.Type == tensor.CPU {
		bufSize := bp.cfg.NumKVHeads * bp.cfg.BlockSize * bp.cfg.HeadDim
		block.pk = make([]float32, bufSize)
		block.pv = make([]float32, bufSize)
	}
	return block, nil
}
// allocateBlock allocates standalone K and V tensors for a single block.
func allocateBlock(blockSize, kvDim int, device tensor.DeviceType, gpu int) (tensor.Tensor, tensor.Tensor, error) {
	shape := tensor.Shape{blockSize, kvDim}
	if device == tensor.CUDA && cuda.Available() {
		k, err := cuda.NewTensor(shape, tensor.Float16, gpu)
		if err != nil {
			return nil, nil, err
		}
		v, err := cuda.NewTensor(shape, tensor.Float16, gpu)
		if err != nil {
			// Don't leak K if V allocation fails.
			k.Free()
			return nil, nil, err
		}
		return k, v, nil
	}
	k, err := cpu.NewTensorU16(shape, tensor.Float16, nil)
	if err != nil {
		return nil, nil, err
	}
	v, err := cpu.NewTensorU16(shape, tensor.Float16, nil)
	if err != nil {
		return nil, nil, err
	}
	return k, v, nil
}
// ComputeBlockHash computes the hash for a block of token IDs, chained with
// the parent block's hash. Identical token prefixes therefore produce
// identical hash chains, which is what enables prefix caching.
func ComputeBlockHash(tokens []int, parentHash *BlockHash) BlockHash {
	h := sha256.New()
	if parentHash != nil {
		h.Write(parentHash[:])
	}
	buf := make([]byte, 4)
	for _, t := range tokens {
		binary.LittleEndian.PutUint32(buf, uint32(t))
		h.Write(buf)
	}
	var hash BlockHash
	copy(hash[:], h.Sum(nil))
	return hash
}
// ComputeBlockHashes computes hashes for all blocks in a token sequence.
func ComputeBlockHashes(tokens []int, blockSize int) []BlockHash {
	numBlocks := (len(tokens) + blockSize - 1) / blockSize
	hashes := make([]BlockHash, numBlocks)
	var parentHash *BlockHash
	for i := 0; i < numBlocks; i++ {
		start := i * blockSize
		end := start + blockSize
		if end > len(tokens) {
			end = len(tokens)
		}
		hashes[i] = ComputeBlockHash(tokens[start:end], parentHash)
		parentHash = &hashes[i]
	}
	return hashes
}
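
// exampleSharedPrefixHashes is a small illustration of why the hashes are
// chained: two token sequences that share their first block produce the same
// leading hash, so their KV blocks can be shared, while later blocks diverge.
// The token values are arbitrary.
func exampleSharedPrefixHashes() bool {
	a := ComputeBlockHashes([]int{1, 2, 3, 4, 5, 6, 7, 8}, 4)
	b := ComputeBlockHashes([]int{1, 2, 3, 4, 9, 9, 9, 9}, 4)
	// First blocks match (same tokens, no parent); second blocks differ.
	return a[0] == b[0] && a[1] != b[1]
}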
// FindCachedBlocks finds the longest prefix of blocks that are already cached.
// Returns block IDs and the number of tokens covered.
func (bp *BlockPool) FindCachedBlocks(layer int, hashes []BlockHash) ([]int, int) {
	// Take the write lock: the hit/miss counters are mutated below.
	bp.mu.Lock()
	defer bp.mu.Unlock()
	if layer < 0 || layer >= len(bp.hashToBlock) {
		return nil, 0
	}
	cached := make([]int, 0, len(hashes))
	for _, hash := range hashes {
		blockID, ok := bp.hashToBlock[layer][hash]
		if !ok {
			bp.stats.Misses++
			break // prefix caching requires contiguous hits
		}
		cached = append(cached, blockID)
		bp.stats.Hits++
	}
	return cached, len(cached) * bp.cfg.BlockSize
}
// AllocateBlocks allocates new blocks for a request. If the free list runs
// out, it evicts cached blocks whose refcount has dropped to zero.
func (bp *BlockPool) AllocateBlocks(layer int, count int) ([]int, error) {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	if layer < 0 || layer >= len(bp.freeBlocks) {
		return nil, fmt.Errorf("invalid layer %d", layer)
	}
	allocated := make([]int, 0, count)
	// First try free blocks.
	for len(allocated) < count && len(bp.freeBlocks[layer]) > 0 {
		n := len(bp.freeBlocks[layer])
		blockID := bp.freeBlocks[layer][n-1]
		bp.freeBlocks[layer] = bp.freeBlocks[layer][:n-1]
		if _, err := bp.ensureAllocatedLocked(layer, blockID); err != nil {
			// Return blockID to the free list and fail.
			bp.freeBlocks[layer] = append(bp.freeBlocks[layer], blockID)
			return allocated, fmt.Errorf("allocate block %d: %w", blockID, err)
		}
		allocated = append(allocated, blockID)
		bp.stats.Allocations++
	}
	// If we still need more, evict cached blocks with refcount == 0.
	if len(allocated) < count {
		for hash, blockID := range bp.hashToBlock[layer] {
			block := bp.blocks[layer][blockID]
			if block.RefCount != 0 {
				continue
			}
			// Evict.
			delete(bp.hashToBlock[layer], hash)
			block.Hash = nil
			if _, err := bp.ensureAllocatedLocked(layer, blockID); err != nil {
				return allocated, fmt.Errorf("allocate evicted block %d: %w", blockID, err)
			}
			allocated = append(allocated, blockID)
			bp.stats.Evictions++
			bp.stats.Allocations++
			if len(allocated) >= count {
				break
			}
		}
	}
	if len(allocated) < count {
		return allocated, fmt.Errorf("not enough blocks: need %d, got %d", count, len(allocated))
	}
	// Increment ref counts.
	for _, blockID := range allocated {
		bp.blocks[layer][blockID].RefCount++
	}
	return allocated, nil
}
// TouchBlocks increments ref counts for cached blocks being reused.
func (bp *BlockPool) TouchBlocks(layer int, blockIDs []int) {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	if layer < 0 || layer >= len(bp.blocks) {
		return
	}
	for _, blockID := range blockIDs {
		if blockID >= 0 && blockID < len(bp.blocks[layer]) {
			bp.blocks[layer][blockID].RefCount++
		}
	}
}
// CacheBlocks registers blocks in the hash cache after they're computed.
func (bp *BlockPool) CacheBlocks(layer int, blockIDs []int, hashes []BlockHash) {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	for i, blockID := range blockIDs {
		if i >= len(hashes) {
			break
		}
		block, err := bp.ensureAllocatedLocked(layer, blockID)
		if err != nil {
			continue
		}
		hash := hashes[i]
		block.Hash = &hash
		bp.hashToBlock[layer][hash] = blockID
	}
}
// FreeBlocks decrements ref counts; blocks with refcount == 0 become eviction candidates.
func (bp *BlockPool) FreeBlocks(layer int, blockIDs []int) {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	if layer < 0 || layer >= len(bp.blocks) {
		return
	}
	for _, blockID := range blockIDs {
		if blockID >= 0 && blockID < len(bp.blocks[layer]) {
			block := bp.blocks[layer][blockID]
			if block.RefCount > 0 {
				block.RefCount--
			}
			// Cached blocks are not returned to the free list; they stay in the
			// hash map as eviction candidates. Only uncached blocks with no
			// remaining references become free again.
			if block.RefCount == 0 && block.Hash == nil {
				bp.freeBlocks[layer] = append(bp.freeBlocks[layer], blockID)
			}
		}
	}
}
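
// exampleLayerPrefixFlow sketches one layer's block lifecycle for a single
// request, under the assumption that the caller writes K/V into the fresh
// blocks elsewhere: reuse cached prefix blocks, allocate the remainder,
// register the newly computed blocks, then drop all references when done.
func exampleLayerPrefixFlow(bp *BlockPool, layer int, tokens []int) error {
	hashes := ComputeBlockHashes(tokens, bp.cfg.BlockSize)
	cached, _ := bp.FindCachedBlocks(layer, hashes)
	bp.TouchBlocks(layer, cached) // take a reference on reused blocks
	fresh, err := bp.AllocateBlocks(layer, len(hashes)-len(cached))
	if err != nil {
		bp.FreeBlocks(layer, cached)
		return err
	}
	// ... attention writes K/V into the fresh blocks here ...
	bp.CacheBlocks(layer, fresh, hashes[len(cached):]) // make them reusable
	bp.FreeBlocks(layer, append(cached, fresh...))     // release this request's refs
	return nil
}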
// GetBlock returns a block by ID, allocating its K/V tensors on first access.
// It returns nil if the layer or block ID is out of range or allocation fails.
func (bp *BlockPool) GetBlock(layer, blockID int) *KVBlock {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	block, err := bp.ensureAllocatedLocked(layer, blockID)
	if err != nil {
		return nil
	}
	return block
}
// LayerDevice returns the device placement for the given layer, falling back
// to the pool-wide Device/GPU defaults when no per-layer placement is set.
func (bp *BlockPool) LayerDevice(layer int) tensor.DevicePlacement {
	if bp == nil {
		return tensor.DevicePlacement{Type: tensor.CPU, GPU: -1}
	}
	if bp.layerPlacements != nil && layer >= 0 && layer < len(bp.layerPlacements) {
		p := bp.layerPlacements[layer].Normalize()
		// Defensive: normalize invalid CUDA GPU ids.
		if p.Type == tensor.CUDA && p.GPU < 0 {
			p.GPU = 0
		}
		return p
	}
	if bp.cfg.Device != tensor.CUDA {
		return tensor.DevicePlacement{Type: bp.cfg.Device, GPU: -1}
	}
	gpu := bp.cfg.GPU
	if bp.layerGPUs != nil && layer >= 0 && layer < len(bp.layerGPUs) {
		gpu = bp.layerGPUs[layer]
	}
	return tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu}
}
// Stats returns current cache statistics.
func (bp *BlockPool) Stats() PrefixCacheStats {
	bp.mu.RLock()
	defer bp.mu.RUnlock()
	return bp.stats
}
// NumFreeBlocks returns the free block count for a layer.
func (bp *BlockPool) NumFreeBlocks(layer int) int {
	bp.mu.RLock()
	defer bp.mu.RUnlock()
	if layer < 0 || layer >= len(bp.freeBlocks) {
		return 0
	}
	return len(bp.freeBlocks[layer])
}
// Usage returns cache utilization in [0.0, 1.0], averaged across layers.
func (bp *BlockPool) Usage() float64 {
	bp.mu.RLock()
	defer bp.mu.RUnlock()
	if bp.cfg.NumBlocks == 0 || len(bp.freeBlocks) == 0 {
		return 0
	}
	totalFree := 0
	for _, free := range bp.freeBlocks {
		totalFree += len(free)
	}
	avgFree := float64(totalFree) / float64(len(bp.freeBlocks))
	return 1.0 - (avgFree / float64(bp.cfg.NumBlocks))
}
// Free releases all GPU memory.
func (bp *BlockPool) Free() {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	// Free contiguous CUDA buffers first (if present).
	for i := range bp.kBase {
		if bp.kBase[i] != nil {
			bp.kBase[i].Free()
			bp.kBase[i] = nil
		}
	}
	for i := range bp.vBase {
		if bp.vBase[i] != nil {
			bp.vBase[i].Free()
			bp.vBase[i] = nil
		}
	}
	bp.kBase = nil
	bp.vBase = nil
	for _, layerBlocks := range bp.blocks {
		for _, block := range layerBlocks {
			if block == nil {
				continue
			}
			if ct, ok := block.K.(*cuda.Tensor); ok && ct != nil {
				ct.Free()
			}
			if ct, ok := block.V.(*cuda.Tensor); ok && ct != nil {
				ct.Free()
			}
		}
	}
	bp.blocks = nil
	bp.freeBlocks = nil
	bp.hashToBlock = nil
}