Concurrency & Thread Safety

This document explains Kepler's concurrency patterns, thread safety guarantees, and how the system achieves deterministic results regardless of concurrent access patterns.

Core Principle: Deterministic Processing

"Parallel vs Serial processing should not produce different results"

Kepler's architecture ensures that concurrent access never produces different results than serial access, maintaining data consistency and predictable behavior.

Thread Safety Guarantees

Component-Level Thread Safety

Component                     | Thread Safety   | Access Pattern                  | Notes
------------------------------|-----------------|---------------------------------|-------------------------
PowerMonitor (public methods) | ✅ Thread-safe  | Multiple readers, single writer | Except Init()
Device Layer                  | ❌ Not required | Single goroutine access         | Called only from monitor
Resource Layer                | ❌ Not required | Single goroutine access         | Called only from monitor
Snapshot                      | ✅ Immutable    | Multiple readers                | Copy-on-write semantics
Exporters                     | ✅ Thread-safe  | Multiple concurrent exports     | Independent collection
Service Framework             | ✅ Thread-safe  | Concurrent lifecycle management | Coordinated shutdown

PowerMonitor Thread Safety

The PowerMonitor is the only component that requires explicit thread safety since it's accessed concurrently by multiple exporters:

type PowerMonitor struct {
    // Thread-safe fields
    snapshot     atomic.Pointer[Snapshot]  // Atomic pointer for lock-free reads
    computeGroup singleflight.Group        // Prevent redundant calculations
    exported     atomic.Bool               // Track export state

    // Single-writer fields (no concurrent access)
    cpu       device.CPUPowerMeter
    resources resource.Informer

    // Configuration (read-only after init)
    interval     time.Duration
    maxStaleness time.Duration
}

Concurrency Patterns

1. Single Writer, Multiple Readers

The fundamental pattern that ensures data consistency:

// Single writer: Only the collection goroutine updates snapshots
func (pm *PowerMonitor) refreshSnapshot() error {
    newSnapshot := NewSnapshot()

    // Perform all calculations
    pm.calculatePower(newSnapshot)

    // Atomic update - visible to all readers instantly
    pm.snapshot.Store(newSnapshot)

    return nil
}

// Multiple readers: Exporters read snapshots concurrently
func (pm *PowerMonitor) Snapshot() (*Snapshot, error) {
    return pm.snapshot.Load(), nil  // Lock-free read
}

Benefits:

  • No locks required for reads
  • Consistent view across all readers
  • No partial updates possible

2. Singleflight Protection

Prevents redundant calculations when multiple goroutines request fresh data simultaneously:

func (pm *PowerMonitor) synchronizedPowerRefresh() error {
    _, err, _ := pm.computeGroup.Do("compute", func() (any, error) {
        // Double-check pattern: a caller entering Do after a previous
        // computation finished re-verifies freshness before recomputing
        if pm.isFresh() {
            return nil, nil  // Another goroutine already computed fresh data
        }

        return nil, pm.refreshSnapshot()
    })

    return err
}

Scenario Protected:

Goroutine 1: isFresh() → false → Do("compute") → runs the computation
Goroutine 2: isFresh() → false → Do("compute") while G1 is in flight → waits, then shares G1's result
Goroutine 3: Do("compute") just after G1 finishes → isFresh() → true → returns without recomputing

3. Immutable Snapshots

Snapshots are immutable after creation, ensuring thread-safe access:

type Snapshot struct {
    Timestamp time.Time  // Set once during creation

    // All fields are populated once and never modified
    Node              *Node
    Processes         map[string]*Process
    Containers        map[string]*Container
    VirtualMachines   map[string]*VirtualMachine
    Pods              map[string]*Pod

    // Terminated workloads (also immutable)
    TerminatedProcesses       []*Process
    TerminatedContainers      []*Container
    TerminatedVirtualMachines []*VirtualMachine
    TerminatedPods            []*Pod
}

func NewSnapshot() *Snapshot {
    return &Snapshot{
        Node:              &Node{Zones: make(NodeZoneUsageMap)},
        Processes:         make(map[string]*Process),
        Containers:        make(map[string]*Container),
        VirtualMachines:   make(map[string]*VirtualMachine),
        Pods:              make(map[string]*Pod),
        // Terminated slices initialized during calculation
    }
}
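
Because a published snapshot is never mutated, a reader can keep using one it loaded while the writer installs a replacement. A minimal sketch of that copy-on-write guarantee, using an illustrative stand-in for Kepler's Snapshot type:

package main

import (
    "fmt"
    "sync/atomic"
)

// Illustrative stand-in for Kepler's Snapshot.
type snapshot struct {
    values map[string]int
}

func main() {
    var current atomic.Pointer[snapshot]
    current.Store(&snapshot{values: map[string]int{"power": 1}})

    // Reader: holds the currently published snapshot across an update.
    held := current.Load()

    // Writer: builds a brand-new snapshot instead of mutating the old one.
    current.Store(&snapshot{values: map[string]int{"power": 2}})

    // The held snapshot is untouched; reading it needs no lock.
    fmt.Println(held.values["power"], current.Load().values["power"]) // 1 2
}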

4. Atomic State Management

Simple state changes use atomic operations to avoid locks:

type PowerMonitor struct {
    exported atomic.Bool  // Track whether current snapshot has been exported
}

// Mark snapshot as exported (thread-safe)
func (pm *PowerMonitor) Snapshot() (*Snapshot, error) {
    snapshot := pm.snapshot.Load()
    pm.exported.Store(true)  // Atomic flag update
    return snapshot, nil
}

// Check export state during collection (thread-safe)
func (pm *PowerMonitor) refreshSnapshot() error {
    if pm.exported.Load() {
        // Clear terminated workloads after export
        pm.terminatedProcessesTracker.Clear()
    }

    // Reset export flag
    pm.exported.Store(false)

    return nil
}

Service Framework Concurrency

Coordinated Lifecycle Management

The service framework manages concurrent service execution using oklog/run:

func Run(ctx context.Context, logger *slog.Logger, services []Service) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    var g run.Group

    // Add each service to the run group
    for _, s := range services {
        if runner, ok := s.(Runner); ok {
            svc := s  // Capture for closure
            r := runner

            g.Add(
                func() error {
                    return r.Run(ctx)  // Execute service
                },
                func(err error) {
                    cancel()  // Cancel all services on any failure

                    // Graceful shutdown
                    if shutdowner, ok := svc.(Shutdowner); ok {
                        shutdowner.Shutdown()
                    }
                },
            )
        }
    }

    return g.Run()  // Execute all services concurrently
}

Concurrency Features:

  • Concurrent Execution: All services run in parallel
  • Failure Propagation: Any service failure cancels all others
  • Graceful Shutdown: Services shut down in proper order
  • Context Cancellation: Clean cancellation propagation
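
These behaviors can be seen directly with oklog/run. A hedged standalone sketch (toy actors, not Kepler's services) in which one actor's failure stops the other:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/oklog/run"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var g run.Group

    // A long-running actor that exits when the shared context is cancelled.
    g.Add(
        func() error {
            <-ctx.Done()
            fmt.Println("worker: shutting down")
            return ctx.Err()
        },
        func(error) { cancel() },
    )

    // An actor that fails shortly after startup.
    g.Add(
        func() error {
            time.Sleep(100 * time.Millisecond)
            return errors.New("boom")
        },
        func(error) { cancel() },
    )

    // Run returns the first error; each interrupt function cancels the
    // shared context, so the other actor stops as well.
    fmt.Println("run group exited:", g.Run())
}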

Service Independence

Services are designed to be independent and not share mutable state:

// Each service has its own dependencies and state
func createServices(cfg *config.Config) []service.Service {
    // Create independent instances
    cpuPowerMeter := device.NewCPUPowerMeter(cfg.Host.SysFS)
    resourceInformer := resource.NewInformer(cfg.Host.ProcFS)
    powerMonitor := monitor.NewPowerMonitor(cpuPowerMeter, resourceInformer)

    // Services communicate through well-defined interfaces
    promExporter := prometheus.NewExporter(powerMonitor, apiServer)
    stdoutExporter := stdout.NewExporter(powerMonitor)

    return []service.Service{
        resourceInformer,
        cpuPowerMeter,
        powerMonitor,
        promExporter,
        stdoutExporter,
    }
}

Exporter Concurrency

Independent Collection

Each exporter independently accesses the PowerMonitor without coordination:

// Prometheus exporter collects metrics independently
func (c *PowerCollector) Collect(ch chan<- prometheus.Metric) {
    snapshot, err := c.pm.Snapshot()  // Thread-safe access
    if err != nil {
        return
    }

    // Process snapshot data independently
    c.collectNodeMetrics(ch, snapshot.Node)
    c.collectProcessMetrics(ch, snapshot.Processes)
    // ... other metrics
}

// Stdout exporter also accesses independently
func (e *Exporter) Run(ctx context.Context) error {
    for {
        select {
        case <-e.monitor.DataChannel():
            snapshot, _ := e.monitor.Snapshot()  // Same thread-safe access
            e.printSnapshot(snapshot)
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

Benefits:

  • No coordination required between exporters
  • Each exporter can have different collection frequencies
  • Independent failure handling
  • Easy to add new exporters

Data Channel Notifications

The PowerMonitor notifies exporters of new data without blocking:

func (pm *PowerMonitor) signalNewData() {
    select {
    case pm.dataCh <- struct{}{}:
        // Successfully notified
    default:
        // Channel full, skip notification (non-blocking)
    }
}

func (pm *PowerMonitor) DataChannel() <-chan struct{} {
    return pm.dataCh  // Read-only channel for exporters
}

Resource Layer Concurrency

Parallel Resource Processing

Within the resource informer, independent workload types are processed in parallel:

func (ri *resourceInformer) Refresh() error {
    // 1. Refresh processes first (foundation for other types)
    containerProcs, vmProcs, err := ri.refreshProcesses()
    if err != nil {
        return err
    }

    // 2. Process independent workload types in parallel
    var wg sync.WaitGroup
    var cntrErrs, vmErrs, nodeErrs, podErrs error

    wg.Add(3)

    // Containers and pods (containers → pods dependency)
    go func() {
        defer wg.Done()
        cntrErrs = ri.refreshContainers(containerProcs)
        podErrs = ri.refreshPods()  // Depends on containers
    }()

    // VMs (independent)
    go func() {
        defer wg.Done()
        vmErrs = ri.refreshVMs(vmProcs)
    }()

    // Node stats (independent)
    go func() {
        defer wg.Done()
        nodeErrs = ri.refreshNode()
    }()

    wg.Wait()

    return errors.Join(cntrErrs, vmErrs, nodeErrs, podErrs)
}

Safety Considerations:

  • Different goroutines operate on completely separate data structures
  • No shared mutable state between parallel operations
  • Dependencies respected (containers before pods)

Power Attribution Concurrency

Single-Threaded Attribution

Power attribution is intentionally single-threaded to ensure deterministic results:

func (pm *PowerMonitor) refreshSnapshot() error {
    newSnapshot := NewSnapshot()
    prevSnapshot := pm.snapshot.Load()

    // All attribution calculations run in a single goroutine
    var err error
    if prevSnapshot == nil {
        err = pm.firstReading(newSnapshot)
    } else {
        err = pm.calculatePower(prevSnapshot, newSnapshot)
    }

    // Atomic update makes results visible to all readers
    newSnapshot.Timestamp = pm.clock.Now()
    pm.snapshot.Store(newSnapshot)

    return err
}

Why Single-Threaded?

  • Deterministic Results: Same inputs always produce same outputs
  • Mathematical Consistency: Energy conservation enforced across all levels
  • Simplified Reasoning: No race conditions in complex attribution logic
  • Performance: Attribution is CPU-bound, not I/O-bound

Testing Concurrency

Race Detection

All concurrent code is tested with Go's race detector:

# Run tests with race detection
go test -race ./...

# Run specific concurrency tests
go test -race -run TestConcurrency ./internal/monitor/

Stress Testing

Concurrency stress tests validate behavior under high contention:

func TestPowerMonitor_ConcurrentAccess(t *testing.T) {
    pm := setupPowerMonitor(t)

    // Start multiple goroutines accessing snapshots
    var wg sync.WaitGroup
    const numGoroutines = 100

    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            // Multiple concurrent calls should not race
            for j := 0; j < 100; j++ {
                snapshot, err := pm.Snapshot()
                assert.NoError(t, err)
                assert.NotNil(t, snapshot)
            }
        }()
    }

    wg.Wait()
}

Determinism Testing

Tests verify that concurrent access produces identical results to serial access:

func TestPowerMonitor_DeterministicResults(t *testing.T) {
    pm := setupPowerMonitor(t)

    // Trigger calculation
    snapshot1, _ := pm.Snapshot()

    // Multiple concurrent accesses should return identical data
    var results []*Snapshot
    var mu sync.Mutex
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            snapshot, _ := pm.Snapshot()

            mu.Lock()
            results = append(results, snapshot)
            mu.Unlock()
        }()
    }

    wg.Wait()

    // All results should be identical
    for _, result := range results {
        assert.Equal(t, snapshot1, result)
    }
}

Common Concurrency Patterns

1. Reader-Writer Pattern

type SafeCounter struct {
    mu    sync.RWMutex
    value int64
}

func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *SafeCounter) Value() int64 {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.value
}

Used in: Terminated workload tracking with priority queues

2. Atomic Operations

type AtomicFlag struct {
    flag int64
}

func (f *AtomicFlag) Set() {
    atomic.StoreInt64(&f.flag, 1)
}

func (f *AtomicFlag) IsSet() bool {
    return atomic.LoadInt64(&f.flag) == 1
}

Used in: Export state tracking, freshness flags
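
Since Go 1.19 the standard library offers typed atomics, so the same flag needs no manual int64 encoding; this is the form the PowerMonitor's exported field already uses:

// Equivalent flag built on sync/atomic's typed Bool (Go 1.19+).
type AtomicBoolFlag struct {
    flag atomic.Bool
}

func (f *AtomicBoolFlag) Set()        { f.flag.Store(true) }
func (f *AtomicBoolFlag) IsSet() bool { return f.flag.Load() }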

3. Channel-Based Coordination

type Coordinator struct {
    dataCh chan struct{}
    done   chan struct{}
}

func (c *Coordinator) Signal() {
    select {
    case c.dataCh <- struct{}{}:
    default: // Non-blocking
    }
}

func (c *Coordinator) Wait(ctx context.Context) error {
    select {
    case <-c.dataCh:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

Used in: Data change notifications, service coordination

Performance Implications

Lock-Free Reads

Most reads in Kepler are lock-free, providing excellent read performance:

// O(1) lock-free read
func (pm *PowerMonitor) Snapshot() (*Snapshot, error) {
    return pm.snapshot.Load(), nil
}
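
One hedged way to quantify the benefit is a micro-benchmark that compares an atomic-pointer load with an RWMutex-guarded read (illustrative types, not Kepler's):

package monitor_test

import (
    "sync"
    "sync/atomic"
    "testing"
)

type benchSnapshot struct{ v int }

// Lock-free read through an atomic pointer.
func BenchmarkAtomicLoad(b *testing.B) {
    var p atomic.Pointer[benchSnapshot]
    p.Store(&benchSnapshot{v: 1})
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            _ = p.Load()
        }
    })
}

// The same read guarded by an RWMutex, for comparison.
func BenchmarkMutexRead(b *testing.B) {
    var mu sync.RWMutex
    s := &benchSnapshot{v: 1}
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.RLock()
            _ = s
            mu.RUnlock()
        }
    })
}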

Single Writer Bottleneck

The single-writer pattern funnels all updates through one goroutine. This serialization point prevents:

  • Inconsistent state
  • Race conditions in complex calculations
  • Need for complex locking schemes

Memory Consistency

Atomic pointer updates provide memory consistency guarantees:

  • Readers see either old snapshot or new snapshot, never partial updates
  • Memory barriers ensure visibility across CPU cores
  • No cache coherency issues
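
A small self-contained check of the no-partial-updates guarantee: a writer publishes pairs whose fields must agree, and readers verify they never observe a torn value (run with -race for extra assurance):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// Two fields that must always agree; a partial update would break the invariant.
type pair struct{ a, b int }

func main() {
    var p atomic.Pointer[pair]
    p.Store(&pair{})

    var wg sync.WaitGroup

    // Writer: always publishes fully-built values.
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 1; i <= 100000; i++ {
            p.Store(&pair{a: i, b: i})
        }
    }()

    // Readers: must never see a half-written pair.
    for r := 0; r < 4; r++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 100000; i++ {
                if v := p.Load(); v.a != v.b {
                    panic("torn read observed")
                }
            }
        }()
    }

    wg.Wait()
    fmt.Println("all reads saw consistent pairs")
}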

Next Steps

After understanding concurrency patterns:

  • Interfaces: Learn the contracts that enable safe concurrent access
  • Configuration: Configure collection intervals and concurrency parameters
