Data Flow & Power Attribution

This document explains how Kepler transforms raw hardware energy readings into accurate, fair power attribution across workloads at every level of the hierarchy: node, process, container, virtual machine, and pod.

Overview

Kepler's power attribution algorithm follows a 4-phase process that ensures mathematical consistency and fair energy distribution:

Phase 1: Hardware Collection → Phase 2: Node Breakdown → Phase 3: Workload Attribution → Phase 4: Hierarchical Aggregation

System Data Flow

Kepler Architecture Diagram

Data Flow Sequence:

graph TD
    A[Hardware RAPL Sensors] --> B[Device Layer]
    C[proc filesystem] --> D[Resource Layer]

    B --> E[Power Monitor]
    D --> E

    E --> F[Power Attribution Algorithm]
    F --> G[Snapshot Creation]

    G --> H[Prometheus Exporter]
    G --> I[Stdout Exporter]

    H --> J[Metrics Endpoint]
    I --> K[Debug Output]

Phase 1: Hardware Energy Collection

RAPL Sensor Reading

The Device Layer reads energy consumption from Intel RAPL (Running Average Power Limit) sensors:

func (pm *PowerMonitor) collectHardwareEnergy() (map[EnergyZone]Energy, error) {
    zones, err := pm.cpu.Zones()
    if err != nil {
        return nil, err
    }

    energyReadings := make(map[EnergyZone]Energy)
    for _, zone := range zones {
        energy, err := zone.Energy()  // Read from /sys/class/powercap/intel-rapl
        if err != nil {
            continue // skip zones that fail to read; attribution uses the remaining zones
        }
        energyReadings[zone] = energy
    }

    return energyReadings, nil
}

Counter Wraparound Handling

RAPL counters are finite and wrap around at maximum values:

func calculateEnergyDelta(current, previous, maxEnergy Energy) Energy {
    if current >= previous {
        return current - previous  // Normal case
    }

    // Handle wraparound: counter reset to 0
    return (maxEnergy - previous) + current
}
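
As a worked illustration with made-up values: if a zone's counter wraps at 262144 J, the previous reading was 260000 J, and the current reading is 100 J, the delta is (262144 - 260000) + 100 = 2244 J rather than the large negative number a naive subtraction would produce.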

Energy Zone Types

Zone    | Description            | Coverage                  | Priority
------- | ---------------------- | ------------------------- | --------
psys    | Platform system energy | CPU + Memory + I/O        | Highest
package | CPU package energy     | Cores + Uncore + Cache    | High
core    | CPU cores only         | Processing units          | Medium
dram    | Memory energy          | System memory             | Medium
uncore  | Uncore energy          | Cache, memory controller  | Low
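
The priority column suggests which reading to prefer when zones overlap (psys, for example, covers what package and dram report), so overlapping energy is not double counted. Below is a minimal sketch of how that ordering could be encoded; zonePriority and sortZonesByPriority are illustrative names rather than Kepler's actual API, and the snippet assumes the standard library sort package.

// Illustrative priority table; a higher value means broader coverage.
var zonePriority = map[string]int{
    "psys":    4,
    "package": 3,
    "core":    2,
    "dram":    2,
    "uncore":  1,
}

// sortZonesByPriority orders zone names so the most comprehensive zones
// come first, making it easy to prefer a single authoritative reading
// when several overlapping zones are available.
func sortZonesByPriority(names []string) {
    sort.Slice(names, func(i, j int) bool {
        return zonePriority[names[i]] > zonePriority[names[j]]
    })
}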

Phase 2: Node Energy Breakdown

Active vs Idle Split

Node energy is split based on CPU usage ratio from /proc/stat:

func (pm *PowerMonitor) calculateNodePower(prev, current *Node) error {
    // Get CPU usage ratio from /proc/stat
    cpuUsageRatio := pm.resources.Node().CPUUsageRatio

    // energyDeltas (per-zone energy since the previous reading, wraparound
    // adjusted) and timeDelta (time since the previous reading) are computed
    // earlier in the collection cycle and omitted here for brevity.
    for zone, energyDelta := range energyDeltas {
        // Split energy between active and idle
        activeEnergy := Energy(float64(energyDelta) * cpuUsageRatio)
        idleEnergy := energyDelta - activeEnergy

        current.Zones[zone] = NodeUsage{
            EnergyTotal:       prev.Zones[zone].EnergyTotal + energyDelta,
            Power:            Power(energyDelta / timeDelta),
            ActiveEnergyTotal: prev.Zones[zone].ActiveEnergyTotal + activeEnergy,
            ActivePower:      Power(activeEnergy / timeDelta),
            IdleEnergyTotal:  prev.Zones[zone].IdleEnergyTotal + idleEnergy,
            IdlePower:        Power(idleEnergy / timeDelta),
            activeEnergy:     activeEnergy,  // Internal field for attribution
        }
    }

    return nil
}

Mathematical Relationship

Total Node Energy = Active Energy + Idle Energy
Active Energy = Total Energy × CPU Usage Ratio
Idle Energy = Total Energy × (1 - CPU Usage Ratio)
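
As a worked example with illustrative numbers: a 60 J package delta over a 3 s interval with a CPU usage ratio of 0.25 splits into 15 J active (5 W) and 45 J idle (15 W), for 20 W of total power.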

Phase 3: Workload Attribution

CPU Time-Based Attribution

Active energy is distributed to workloads proportionally based on their CPU time consumption:

func (pm *PowerMonitor) calculateProcessPower(prev, current *Snapshot) error {
    node := current.Node
    processes := pm.resources.Processes().Running

    // Calculate total CPU time delta across all processes
    var nodeCPUTimeDelta float64
    for _, proc := range processes {
        nodeCPUTimeDelta += proc.CPUTimeDelta
    }

    // Distribute active energy proportionally
    for _, proc := range processes {
        process := newProcess(proc, node.Zones)

        for zone, nodeZoneUsage := range node.Zones {
            if nodeZoneUsage.ActivePower == 0 || nodeCPUTimeDelta == 0 {
                continue
            }

            // Calculate this process's share
            cpuTimeRatio := proc.CPUTimeDelta / nodeCPUTimeDelta
            activeEnergy := Energy(cpuTimeRatio * float64(nodeZoneUsage.activeEnergy))

            // Accumulate absolute energy
            absoluteEnergy := activeEnergy
            if prevProcess, exists := prev.Processes[proc.StringID()]; exists {
                if prevUsage, hasZone := prevProcess.Zones[zone]; hasZone {
                    absoluteEnergy += prevUsage.EnergyTotal
                }
            }

            process.Zones[zone] = Usage{
                Power:       Power(cpuTimeRatio * float64(nodeZoneUsage.ActivePower)),
                EnergyTotal: absoluteEnergy,
            }
        }

        current.Processes[process.StringID()] = process
    }

    return nil
}

Attribution Formula

For each process and each energy zone:

Process Energy Share = (Process CPU Time Delta / Total CPU Time Delta) × Active Energy
Process Power = Process Energy Share / Time Delta
Process Total Energy = Previous Total + Current Energy Share
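
As a worked example with made-up numbers: a process that accumulated 150 ms of CPU time during an interval in which all processes together accumulated 600 ms has a ratio of 0.25, so it receives 25% of each zone's active energy for that interval.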

Energy Conservation Validation

The system validates that energy attribution is mathematically consistent:

// Validation: Sum of process energies should equal node active energy
var totalProcessEnergy Energy
for _, process := range snapshot.Processes {
    for zone, usage := range process.Zones {
        totalProcessEnergy += usage.EnergyTotal - prevProcessEnergy[zone]
    }
}

assert.Equal(nodeActiveEnergyDelta, totalProcessEnergy)

Phase 4: Hierarchical Aggregation

Container Aggregation

Container energy is the sum of all processes within that container:

func (pm *PowerMonitor) calculateContainerPower(prev, current *Snapshot) error {
    containers := pm.resources.Containers().Running

    for _, cntr := range containers {
        container := newContainer(cntr, current.Node.Zones)

        // Sum energy from all processes in this container
        for _, proc := range current.Processes {
            if proc.ContainerID == cntr.ID {
                for zone, processUsage := range proc.Zones {
                    containerUsage := container.Zones[zone]
                    containerUsage.Power += processUsage.Power
                    containerUsage.EnergyTotal += processUsage.EnergyTotal - getPrevProcessEnergy(prev, proc, zone)
                    container.Zones[zone] = containerUsage
                }
            }
        }

        current.Containers[container.StringID()] = container
    }

    return nil
}

VM Aggregation

Virtual machine energy follows the same pattern as containers:

func (pm *PowerMonitor) calculateVMPower(prev, current *Snapshot) error {
    vms := pm.resources.VirtualMachines().Running

    for _, vm := range vms {
        virtualMachine := newVM(vm, current.Node.Zones)

        // Sum energy from all processes associated with this VM
        for _, proc := range current.Processes {
            if proc.VirtualMachineID == vm.ID {
                // Same aggregation logic as containers
                aggregateProcessEnergyToWorkload(proc, virtualMachine)
            }
        }

        current.VirtualMachines[virtualMachine.StringID()] = virtualMachine
    }

    return nil
}

Pod Aggregation

Pod energy is calculated from constituent containers:

func (pm *PowerMonitor) calculatePodPower(prev, current *Snapshot) error {
    pods := pm.resources.Pods().Running

    for _, p := range pods {
        pod := newPod(p, current.Node.Zones)

        // Sum energy from all containers in this pod
        for _, container := range current.Containers {
            if container.PodID == p.ID {
                for zone, containerUsage := range container.Zones {
                    podUsage := pod.Zones[zone]
                    podUsage.Power += containerUsage.Power
                    podUsage.EnergyTotal += containerUsage.EnergyTotal - getPrevContainerEnergy(prev, container, zone)
                    pod.Zones[zone] = podUsage
                }
            }
        }

        current.Pods[pod.StringID()] = pod
    }

    return nil
}

Hierarchical Consistency

The system enforces mathematical relationships across all levels:

Node Active Energy = Σ(Process Energy Deltas)
Container Energy = Σ(Container Process Energy Deltas)
VM Energy = Σ(VM Process Energy Deltas)
Pod Energy = Σ(Pod Container Energy Deltas)
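
As an illustration of the kind of check these identities allow, here is a hedged sketch that verifies a container's power equals the sum of its processes' power for one zone. The field names mirror the structs shown above, but the helper itself is hypothetical and assumes the standard library math package.

// Hypothetical check: for a given zone, a container's reported power
// should equal the sum of the power of the processes it contains.
func containerPowerMatchesProcesses(c *Container, procs map[string]*Process, zone EnergyZone) bool {
    var sum Power
    for _, p := range procs {
        if p.ContainerID == c.ID {
            sum += p.Zones[zone].Power
        }
    }
    // Allow a small tolerance for floating-point accumulation error.
    return math.Abs(float64(c.Zones[zone].Power)-float64(sum)) < 1e-9
}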

Terminated Workload Handling

Fair Attribution Problem

Traditional monitoring only reports running workloads, leading to unfair attribution:

  • Process consumes 100J of energy
  • Process terminates before next collection
  • Energy is lost or attributed to other workloads

Solution: Terminated Workload Tracking

type TerminatedResourceTracker[T Resource] struct {
    items    []T
    capacity int
    minEnergyThreshold Energy
}

func (t *TerminatedResourceTracker[T]) Add(resource T) {
    // Only track workloads above energy threshold
    if resource.TotalEnergy() < t.minEnergyThreshold {
        return
    }

    // Insert and keep the list sorted by energy (highest first)
    t.items = append(t.items, resource)
    sort.Slice(t.items, func(i, j int) bool {
        return t.items[i].TotalEnergy() > t.items[j].TotalEnergy()
    })

    // Maintain capacity limit
    if len(t.items) > t.capacity {
        t.items = t.items[:t.capacity]
    }
}

Export-Triggered Cleanup

Terminated workloads are only cleared after export to prevent data loss:

func (pm *PowerMonitor) calculateProcessPower(prev, current *Snapshot) error {
    // Clear terminated workloads if previous snapshot was exported
    if pm.exported.Load() {
        pm.terminatedProcessesTracker.Clear()
        pm.terminatedContainersTracker.Clear()
        pm.terminatedVMsTracker.Clear()
        pm.terminatedPodsTracker.Clear()
    }

    // Handle newly terminated processes
    for id := range prev.Processes {
        if _, stillRunning := current.Processes[id]; !stillRunning {
            pm.terminatedProcessesTracker.Add(prev.Processes[id].Clone())
        }
    }

    // Populate terminated workloads in snapshot
    current.TerminatedProcesses = pm.terminatedProcessesTracker.Items()

    return nil
}

Data Freshness & Caching

Staleness Control

Data is automatically refreshed when stale:

func (pm *PowerMonitor) Snapshot() (*Snapshot, error) {
    if !pm.isFresh() {
        if err := pm.synchronizedPowerRefresh(); err != nil {
            return nil, err
        }
    }
    return pm.snapshot.Load(), nil
}

func (pm *PowerMonitor) isFresh() bool {
    snapshot := pm.snapshot.Load()
    if snapshot == nil {
        return false
    }

    age := pm.clock.Now().Sub(snapshot.Timestamp)
    return age <= pm.maxStaleness
}

Singleflight Protection

Prevents redundant calculations during concurrent requests:

func (pm *PowerMonitor) synchronizedPowerRefresh() error {
    _, err, _ := pm.computeGroup.Do("compute", func() (any, error) {
        // Double-check freshness once inside the singleflight group
        if pm.isFresh() {
            return nil, nil
        }

        return nil, pm.refreshSnapshot()
    })

    return err
}

Collection Timing & Intervals

Configurable Collection

monitor:
  interval: 3s        # How often to collect new data
  staleness: 10s      # Maximum age before data is considered stale
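
With these defaults, staleness exceeds interval, so an exporter scrape that lands between collection ticks is normally served from the cached snapshot. A minimal sketch of how the settings might be represented in Go follows; MonitorConfig and its fields are illustrative, not Kepler's actual configuration types.

// Illustrative settings mirroring the YAML above.
type MonitorConfig struct {
    Interval  time.Duration // how often to collect new data
    Staleness time.Duration // maximum snapshot age before a refresh is forced
}

func defaultMonitorConfig() MonitorConfig {
    return MonitorConfig{
        Interval:  3 * time.Second,
        Staleness: 10 * time.Second,
    }
}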

Collection Lifecycle

func (pm *PowerMonitor) Run(ctx context.Context) error {
    ticker := pm.clock.NewTicker(pm.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C():
            pm.scheduleNextCollection()
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func (pm *PowerMonitor) scheduleNextCollection() {
    go func() {
        if err := pm.synchronizedPowerRefresh(); err != nil {
            pm.logger.Error("Collection failed", "error", err)
        } else {
            pm.signalNewData()  // Notify exporters
        }
    }()
}

Error Handling & Edge Cases

Missing Hardware Support

func (pm *PowerMonitor) refreshSnapshot() error {
    zones, err := pm.cpu.Zones()
    if err != nil {
        return fmt.Errorf("failed to get energy zones: %w", err)
    }

    if len(zones) == 0 {
        return fmt.Errorf("no RAPL zones available")
    }

    // Continue with available zones
    return pm.calculatePowerWithZones(zones)
}

Process Lifecycle Edge Cases

  • New Processes: Start with zero previous energy, contribute immediately
  • Terminated Processes: Tracked until exported, prevent energy loss
  • Process Migration: CPU time tracking handles process movement
  • Short-lived Processes: Minimum energy threshold prevents noise

Energy Measurement Edge Cases

  • Counter Wraparound: Proper delta calculation across wraparound boundaries
  • Negative Deltas: Clock adjustments and measurement errors handled gracefully (see the sketch after this list)
  • Zero Energy Periods: Idle systems with minimal energy consumption
  • High-frequency Collection: Prevent measurement noise from frequent sampling
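
For the negative and implausible-delta cases above, here is a hedged sketch of a defensive guard. It assumes Energy and Power are float64-based types and uses the standard library time package; Kepler's actual guard may differ.

// sanitizeDelta drops readings that are negative or that imply an
// implausibly high power over the interval, attributing zero energy instead.
func sanitizeDelta(delta Energy, interval time.Duration, maxPlausible Power) Energy {
    if delta < 0 {
        return 0 // negative delta: clock or counter adjustment
    }
    if Power(float64(delta)/interval.Seconds()) > maxPlausible {
        return 0 // implausible spike: likely a measurement glitch
    }
    return delta
}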

Performance Considerations

Algorithmic Complexity

  • Process Attribution: O(N × Z) where N = processes, Z = zones
  • Container Aggregation: O(C × Z) where C = containers
  • Pod Aggregation: O(P × Z) where P = pods
  • Total Complexity: O((N + C + P) × Z) - linear scaling

Memory Usage

  • Snapshot Storage: Single atomic pointer, copy-on-write (see the sketch after this list)
  • Terminated Tracking: Configurable capacity with priority-based retention
  • Cache Management: Automatic cleanup prevents memory leaks
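
For the snapshot storage point above, a minimal sketch of the atomic-pointer, copy-on-write pattern using Go's sync/atomic; the snapshotStore type and its method names are illustrative.

// The collector builds a complete new Snapshot and publishes it with a
// single atomic pointer swap; readers never observe a partially built one.
type snapshotStore struct {
    current atomic.Pointer[Snapshot]
}

func (s *snapshotStore) publish(next *Snapshot) {
    s.current.Store(next)
}

func (s *snapshotStore) read() *Snapshot {
    return s.current.Load()
}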

I/O Optimization

  • Batch procfs Reads: Single scan for all process information
  • Hardware Sensor Caching: Zone discovery cached, energy read on-demand
  • Parallel Processing: Independent workload types processed concurrently

Next Steps

After understanding data flow and attribution:

  • Concurrency: Learn how thread safety is maintained during attribution
  • Interfaces: Understand the contracts that enable this data flow
  • Configuration: Configure collection intervals and attribution parameters
