package compute

import (
	"fmt"
	"unsafe"

	"makarna/pkg/backend/cpu"
	"makarna/pkg/backend/cpu/nn"
	"makarna/pkg/backend/cuda"
	"makarna/pkg/backend/device"
	"makarna/pkg/tensor"
)
// MoEConfig describes the routing scheme and expert layout of a
// mixture-of-experts layer.
type MoEConfig struct {
	NumExperts           int
	TopK                 int // routed experts per token
	IntermediateSize     int
	RouterActivationFunc string // "softmax" or "sigmoid"
	UseGroupedTopK       bool
	NumExpertGroup       int
	TopKGroup            int
	Renormalize          bool // renormalize the selected top-k weights
	RoutedScalingFactor  float32
	NumSharedExperts     int
}
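// As a rough illustration only (the numbers below are hypothetical and not
// taken from any particular checkpoint), a sigmoid-routed, grouped top-k
// configuration might look like:
//
//	cfg := MoEConfig{
//		NumExperts:           64,
//		TopK:                 6,
//		IntermediateSize:     1408,
//		RouterActivationFunc: "sigmoid",
//		UseGroupedTopK:       true,
//		NumExpertGroup:       8,
//		TopKGroup:            3,
//		Renormalize:          true,
//		RoutedScalingFactor:  2.5,
//		NumSharedExperts:     2,
//	}
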
// MoEWeights holds the router and expert projection weights.
type MoEWeights struct {
	GateW    tensor.Tensor   // router projection: hidden -> per-expert scores
	GateBias tensor.Tensor   // per-expert selection bias
	W1       []tensor.Tensor // gate projections per expert
	W2       []tensor.Tensor // down projections per expert
	W3       []tensor.Tensor // up projections per expert

	SharedGate, SharedUp, SharedDown tensor.Tensor
}
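// Each expert (routed or shared) is a SwiGLU MLP: for an input token x,
//
//	y = W2( silu(W1·x) * (W3·x) )
//
// which is exactly what both the GPU and CPU paths below compute.
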
// HybridMoE runs a mixture-of-experts layer, choosing between the GPU and
// CPU implementations based on context flags and tensor placement.
func HybridMoE(
	ctx *Context,
	hidden *Activation,
	weights *MoEWeights,
	cfg MoEConfig,
	out *Activation,
) error {
	// If CPUMoE is enabled, keep expert weights on CPU. This trades speed for
	// GPU memory, which matters for large MoE models whose expert weights
	// dominate the parameter count.
	if ctx != nil && ctx.CPUMoE {
		return hybridMoECPU(ctx, hidden, weights, cfg, out)
	}
	if ctx != nil && ctx.IsGPU() && hidden.IsGPU() && device.CUDAAvailable() && cuda.Available() {
		if err := hybridMoEGPU(ctx, hidden, weights, cfg, out); err == nil {
			return nil
		}
		// The GPU path failed (most likely out of memory); fall through to
		// the CPU implementation rather than aborting the forward pass.
	}
	return hybridMoECPU(ctx, hidden, weights, cfg, out)
}
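
// Caller sketch (the layer fields and the NewActivation placement shown here
// are illustrative assumptions, not a fixed API):
//
//	out, _ := NewActivation(hidden.Shape(), tensor.DevicePlacement{Type: tensor.CUDA, GPU: 0})
//	defer FreeActivation(out)
//	if err := HybridMoE(ctx, hidden, layer.MoEWeights, layer.MoECfg, out); err != nil {
//		return err
//	}
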
// hybridMoEGPU routes on the CPU and runs the expert matmuls on the GPU one
// token at a time. A fused implementation would batch tokens per expert;
// this version favors simplicity and bounded GPU memory over throughput.
func hybridMoEGPU(
	ctx *Context,
	hidden *Activation,
	weights *MoEWeights,
	cfg MoEConfig,
	out *Activation,
) error {
	gpu := ctx.Placement().GPU
	seqLen := hidden.Shape()[0]
	hiddenSize := hidden.Shape()[1]

	// 1. Router: hidden -> gate scores.
	gateAct, _ := NewActivation(tensor.Shape{seqLen, cfg.NumExperts}, tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu})
	defer FreeActivation(gateAct)
	if err := HybridLinear(ctx, hidden, weights.GateW, gateAct); err != nil {
		return err
	}
	gateCUDA, err := gateAct.AsCUDA(gpu)
	if err != nil {
		return err
	}
	gatePtr := gateCUDA.Data().(unsafe.Pointer)
	// 2. Apply the router activation (softmax or sigmoid) in place on the GPU.
	if cfg.RouterActivationFunc == "softmax" {
		if err := cuda.SoftmaxRows(gatePtr, seqLen, cfg.NumExperts, gpu); err != nil {
			return err
		}
	} else {
		if err := cuda.Sigmoid(gatePtr, seqLen*cfg.NumExperts, gpu); err != nil {
			return err
		}
	}
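	// Routing recap: a softmax router yields p_i = exp(g_i) / Σ_j exp(g_j)
	// over all experts, while a sigmoid router (DeepSeek-V3-style) scores
	// each expert independently as p_i = 1/(1+e^(-g_i)); with sigmoid gates
	// the Renormalize step below restores a convex combination over the
	// selected top-k.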
	// 3. Bias and top-k selection. The routing logic is branchy, so for now
	// the scores are downloaded to the CPU, routed there, and the result
	// uploaded once at the end.
	gateCPU, err := gateAct.AsCPU()
	if err != nil {
		return err
	}
	gateData := gateCPU.DataFloat32()
	biasCPU, ok := weights.GateBias.(*cpu.Tensor)
	if !ok {
		return fmt.Errorf("moe gate bias is not a cpu tensor")
	}
	biasData := biasCPU.DataFloat32()

	// Validate that the output lives on this GPU before doing any work; the
	// accumulator itself is a zero-initialized host buffer.
	outCUDA, err := out.AsCUDA(gpu)
	if err != nil {
		return err
	}
	outCPU := make([]float32, seqLen*hiddenSize)
	hiddenCPU, err := hidden.AsCPU()
	if err != nil {
		return err
	}
	inData := hiddenCPU.DataFloat32()
	// Pre-allocate reusable GPU buffers once per call. Allocating these
	// inside the token loop would churn device allocations on every token;
	// CUDA memory is not garbage collected, so each buffer is freed
	// explicitly via defer when the call returns.
	tokAct, _ := NewActivation(tensor.Shape{1, hiddenSize}, tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu})
	defer FreeActivation(tokAct)
	a1, _ := NewActivation(tensor.Shape{1, cfg.IntermediateSize}, tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu})
	defer FreeActivation(a1)
	a3, _ := NewActivation(tensor.Shape{1, cfg.IntermediateSize}, tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu})
	defer FreeActivation(a3)
	exOut, _ := NewActivation(tensor.Shape{1, hiddenSize}, tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu})
	defer FreeActivation(exOut)
	sharedGateBuf, _ := NewActivation(tensor.Shape{1, cfg.IntermediateSize}, tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu})
	defer FreeActivation(sharedGateBuf)
	sharedUpBuf, _ := NewActivation(tensor.Shape{1, cfg.IntermediateSize}, tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu})
	defer FreeActivation(sharedUpBuf)
	sharedOutBuf, _ := NewActivation(tensor.Shape{1, hiddenSize}, tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu})
	defer FreeActivation(sharedOutBuf)

	// Host-side staging buffer for the per-token uploads.
	tokCPU := cpu.NewTensor(tensor.Shape{1, hiddenSize}, nil)
	for t := 0; t < seqLen; t++ {
		row := gateData[t*cfg.NumExperts : (t+1)*cfg.NumExperts]
		scores := make([]float32, cfg.NumExperts)
		copy(scores, row)
		// The gate bias influences which experts are selected but not the
		// mixing weights: selection uses scores+bias, mixing uses raw scores.
		scoresForChoice := make([]float32, cfg.NumExperts)
		for i := 0; i < cfg.NumExperts; i++ {
			scoresForChoice[i] = scores[i] + biasData[i]
		}
		masked := scoresForChoice
		if cfg.UseGroupedTopK {
			masked = nn.GroupedTopKMask(scoresForChoice, cfg.NumExpertGroup, cfg.TopKGroup)
		}
		choices := nn.SelectTopKExperts(masked, cfg.TopK, scores)
		if cfg.Renormalize && len(choices) > 1 {
			nn.RenormalizeMoEWeights(choices)
		}
		nn.ScaleMoEWeights(choices, cfg.RoutedScalingFactor)
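		// At this point choices holds at most TopK (expert index, weight)
		// pairs; with Renormalize set, the weights sum to RoutedScalingFactor.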
		// Upload the token into the reusable GPU buffer. A silently skipped
		// upload would make every expert consume stale data, so propagate
		// the error instead of ignoring it.
		copy(tokCPU.DataFloat32(), inData[t*hiddenSize:(t+1)*hiddenSize])
		tokCUDA, err := tokAct.AsCUDA(gpu)
		if err != nil {
			return err
		}
		tokCUDA.CopyFrom(tokCPU.DataFloat32())
		acc := outCPU[t*hiddenSize : (t+1)*hiddenSize]
		for _, ch := range choices {
			ex := ch.Idx
			if ex >= len(weights.W1) || ex >= len(weights.W2) || ex >= len(weights.W3) {
				continue
			}
			w1 := weights.W1[ex]
			w2 := weights.W2[ex]
			w3 := weights.W3[ex]
			if w1 == nil || w2 == nil || w3 == nil {
				continue
			}
			// SwiGLU expert MLP on the GPU, reusing the pre-allocated
			// buffers. Matmul errors must propagate so the caller's CPU
			// fallback can take over.
			if err := HybridLinear(ctx, tokAct, w1, a1); err != nil {
				return err
			}
			if err := HybridLinear(ctx, tokAct, w3, a3); err != nil {
				return err
			}
			// SwiGLU on GPU: a1 = silu(a1) * a3.
			HybridSiLU(ctx, a1)
			HybridMul(ctx, a1, a3)
			if err := HybridLinear(ctx, a1, w2, exOut); err != nil {
				return err
			}
			exCPU, err := exOut.AsCPU()
			if err != nil {
				return err
			}
			exData := exCPU.DataFloat32()
			wd := ch.Weight
			for j := 0; j < hiddenSize; j++ {
				acc[j] += wd * exData[j]
			}
		}
		// Shared experts run on every token with unit weight.
		if cfg.NumSharedExperts > 0 && weights.SharedGate != nil && weights.SharedUp != nil && weights.SharedDown != nil {
			if err := HybridLinear(ctx, tokAct, weights.SharedGate, sharedGateBuf); err != nil {
				return err
			}
			if err := HybridLinear(ctx, tokAct, weights.SharedUp, sharedUpBuf); err != nil {
				return err
			}
			// SwiGLU on GPU.
			HybridSiLU(ctx, sharedGateBuf)
			HybridMul(ctx, sharedGateBuf, sharedUpBuf)
			if err := HybridLinear(ctx, sharedGateBuf, weights.SharedDown, sharedOutBuf); err != nil {
				return err
			}
			sCPU, err := sharedOutBuf.AsCPU()
			if err != nil {
				return err
			}
			sData := sCPU.DataFloat32()
			for j := 0; j < hiddenSize; j++ {
				acc[j] += sData[j]
			}
		}
	}

	// Upload the accumulated result to the GPU output in a single transfer.
	outCUDA.CopyFrom(outCPU)
	return nil
}

// hybridMoECPU is the pure-CPU implementation: routing, expert MLPs, and
// accumulation all happen in host memory; only the final result is copied
// back into the output activation, which may live on the GPU.
func hybridMoECPU(
	ctx *Context,
	hidden *Activation,
	weights *MoEWeights,
	cfg MoEConfig,
	out *Activation,
) error {
	seqLen := hidden.Shape()[0]
	hiddenSize := hidden.Shape()[1]
	gateAct, _ := NewActivation(tensor.Shape{seqLen, cfg.NumExperts}, tensor.DevicePlacement{Type: tensor.CPU, GPU: -1})
	if err := HybridLinear(ctx, hidden, weights.GateW, gateAct); err != nil {
		return err
	}
	gateCPU, err := gateAct.AsCPU()
	if err != nil {
		return err
	}
	gateData := gateCPU.DataFloat32()
	biasCPU, ok := weights.GateBias.(*cpu.Tensor)
	if !ok {
		return fmt.Errorf("moe gate bias is not a cpu tensor")
	}
	biasData := biasCPU.DataFloat32()
	outCPU, err := out.AsCPU()
	if err != nil {
		return err
	}
	outData := outCPU.DataFloat32()
	hiddenCPU, err := hidden.AsCPU()
	if err != nil {
		return err
	}
	inData := hiddenCPU.DataFloat32()

	// Zero the output buffer; expert contributions are accumulated into it.
	for i := range outData {
		outData[i] = 0
	}
	for t := 0; t < seqLen; t++ {
		row := gateData[t*cfg.NumExperts : (t+1)*cfg.NumExperts]
		// Unlike the GPU path, the router activation has not been applied
		// yet, so apply it here to the raw scores.
		scores := make([]float32, cfg.NumExperts)
		copy(scores, row)
		if cfg.RouterActivationFunc == "softmax" {
			nn.SoftmaxInplaceSimple(scores)
		} else {
			for i := range scores {
				scores[i] = nn.Sigmoid(scores[i])
			}
		}
		// As in the GPU path, the bias affects selection only; the mixing
		// weights come from the unbiased scores.
		scoresForChoice := make([]float32, cfg.NumExperts)
		for i := 0; i < cfg.NumExperts; i++ {
			scoresForChoice[i] = scores[i] + biasData[i]
		}
		masked := scoresForChoice
		if cfg.UseGroupedTopK {
			masked = nn.GroupedTopKMask(scoresForChoice, cfg.NumExpertGroup, cfg.TopKGroup)
		}
		choices := nn.SelectTopKExperts(masked, cfg.TopK, scores)
		if cfg.Renormalize && len(choices) > 1 {
			nn.RenormalizeMoEWeights(choices)
		}
		nn.ScaleMoEWeights(choices, cfg.RoutedScalingFactor)
		// Per-token buffers here are plain Go slices, so allocation is left
		// to the garbage collector rather than pre-allocated as in the GPU
		// path.
		tok := cpu.NewTensor(tensor.Shape{1, hiddenSize}, nil)
		copy(tok.DataFloat32(), inData[t*hiddenSize:(t+1)*hiddenSize])
		tokAct := NewActivationFrom(tok)
		acc := outData[t*hiddenSize : (t+1)*hiddenSize]
		for _, ch := range choices {
			ex := ch.Idx
			if ex >= len(weights.W1) || ex >= len(weights.W2) || ex >= len(weights.W3) {
				continue
			}
			w1 := weights.W1[ex]
			w2 := weights.W2[ex]
			w3 := weights.W3[ex]
			if w1 == nil || w2 == nil || w3 == nil {
				continue
			}
			a1, _ := NewActivation(tensor.Shape{1, cfg.IntermediateSize}, tensor.DevicePlacement{Type: tensor.CPU, GPU: -1})
			a3, _ := NewActivation(tensor.Shape{1, cfg.IntermediateSize}, tensor.DevicePlacement{Type: tensor.CPU, GPU: -1})
			if err := HybridLinear(ctx, tokAct, w1, a1); err != nil {
				return err
			}
			if err := HybridLinear(ctx, tokAct, w3, a3); err != nil {
				return err
			}
			a1CPU, _ := a1.AsCPU()
			a3CPU, _ := a3.AsCPU()
			a1d := a1CPU.DataFloat32()
			a3d := a3CPU.DataFloat32()
			// SwiGLU in place: a1 = silu(a1) * a3, with silu(x) = x*sigmoid(x).
			for j := 0; j < cfg.IntermediateSize; j++ {
				a1d[j] *= nn.Sigmoid(a1d[j])
				a1d[j] *= a3d[j]
			}
			mix := NewActivationFrom(a1CPU)
			exOut, _ := NewActivation(tensor.Shape{1, hiddenSize}, tensor.DevicePlacement{Type: tensor.CPU, GPU: -1})
			if err := HybridLinear(ctx, mix, w2, exOut); err != nil {
				return err
			}
			exCPU, _ := exOut.AsCPU()
			exData := exCPU.DataFloat32()
			wd := ch.Weight
			for j := 0; j < hiddenSize; j++ {
				acc[j] += wd * exData[j]
			}
		}
		// Shared experts (applied to every token with unit weight).
		if cfg.NumSharedExperts > 0 && weights.SharedGate != nil && weights.SharedUp != nil && weights.SharedDown != nil {
			sharedGate, _ := NewActivation(tensor.Shape{1, cfg.IntermediateSize}, tensor.DevicePlacement{Type: tensor.CPU, GPU: -1})
			sharedUp, _ := NewActivation(tensor.Shape{1, cfg.IntermediateSize}, tensor.DevicePlacement{Type: tensor.CPU, GPU: -1})
			if err := HybridLinear(ctx, tokAct, weights.SharedGate, sharedGate); err != nil {
				return err
			}
			if err := HybridLinear(ctx, tokAct, weights.SharedUp, sharedUp); err != nil {
				return err
			}
			gCPU, _ := sharedGate.AsCPU()
			uCPU, _ := sharedUp.AsCPU()
			gD := gCPU.DataFloat32()
			uD := uCPU.DataFloat32()
			// SwiGLU in place on the shared branch.
			for j := 0; j < len(gD) && j < len(uD); j++ {
				gD[j] *= nn.Sigmoid(gD[j])
				gD[j] *= uD[j]
			}
			mix := NewActivationFrom(gCPU)
			sharedOut, _ := NewActivation(tensor.Shape{1, hiddenSize}, tensor.DevicePlacement{Type: tensor.CPU, GPU: -1})
			if err := HybridLinear(ctx, mix, weights.SharedDown, sharedOut); err != nil {
				return err
			}
			sCPU, _ := sharedOut.AsCPU()
			sData := sCPU.DataFloat32()
			for j := 0; j < hiddenSize; j++ {
				acc[j] += sData[j]
			}
		}
	}
	// Copy the accumulated result back into the output activation, which may
	// live on the GPU.
	return out.CopyFrom(outCPU)
}
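
// A property worth covering in tests (sketch only; the scores slice and the
// expected sum below are hypothetical): with Renormalize enabled, the
// selected weights should sum to RoutedScalingFactor regardless of the raw
// router scores.
//
//	scores := []float32{0.9, 0.1, 0.4, 0.6}
//	choices := nn.SelectTopKExperts(scores, 2, scores)
//	nn.RenormalizeMoEWeights(choices)
//	nn.ScaleMoEWeights(choices, 2.5)
//	var sum float32
//	for _, ch := range choices {
//		sum += ch.Weight
//	}
//	// sum should now be approximately 2.5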
|