Back
0
Mar 1, 2025

Concurrency Patterns in Go for Payment Processing Systems

Go's concurrency model, built around goroutines and channels, makes it an excellent choice for building high-throughput payment processing systems. In this article, we'll explore practical patterns for handling concurrent payment operations while maintaining correctness, idempotency, and fault tolerance.

Why Go for Payment Systems?

Go offers several advantages for fintech applications:

  • Lightweight concurrency: Goroutines use ~2KB of stack space vs threads (~1MB)
  • Built-in race detection: go run -race catches concurrency bugs
  • Fast compilation: Quick iteration during development
  • Strong standard library: HTTP, JSON, crypto out of the box
  • Static typing: Catch errors at compile time

Goroutines vs Threads

// Traditional threading (expensive)
// Each thread: ~1MB stack
// 10,000 threads = ~10GB memory

// Go goroutines (lightweight)
// Each goroutine: ~2KB stack
// 10,000 goroutines = ~20MB memory

func main() {
    // Spawn 100,000 concurrent payment processors
    for i := 0; i < 100000; i++ {
        go processPayment(i) // Only ~200MB total!
    }
}

Pattern 1: Worker Pool for Payment Processing

Handle high-volume payment requests efficiently.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Payment struct {
    ID            string
    Amount        float64
    Currency      string
    IdempotencyKey string
}

type PaymentResult struct {
    Payment *Payment
    Success bool
    Error   error
}

// Worker pool implementation
type PaymentProcessor struct {
    workers    int
    jobQueue   chan *Payment
    resultChan chan *PaymentResult
    wg         sync.WaitGroup
}

func NewPaymentProcessor(workers int, queueSize int) *PaymentProcessor {
    return &PaymentProcessor{
        workers:    workers,
        jobQueue:   make(chan *Payment, queueSize),
        resultChan: make(chan *PaymentResult, queueSize),
    }
}

func (p *PaymentProcessor) Start(ctx context.Context) {
    // Start worker goroutines
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go p.worker(ctx, i)
    }
}

func (p *PaymentProcessor) worker(ctx context.Context, id int) {
    defer p.wg.Done()
    
    fmt.Printf("Worker %d started\n", id)
    
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d shutting down\n", id)
            return
            
        case payment, ok := <-p.jobQueue:
            if !ok {
                return
            }
            
            // Process payment
            result := p.processPayment(payment)
            
            // Send result
            select {
            case p.resultChan <- result:
            case <-ctx.Done():
                return
            }
        }
    }
}

func (p *PaymentProcessor) processPayment(payment *Payment) *PaymentResult {
    // Simulate payment processing
    time.Sleep(100 * time.Millisecond)
    
    // Validate payment
    if payment.Amount <= 0 {
        return &PaymentResult{
            Payment: payment,
            Success: false,
            Error:   fmt.Errorf("invalid amount: %f", payment.Amount),
        }
    }
    
    // Process with payment gateway
    // In real implementation: call Stripe, PayPal, etc.
    
    return &PaymentResult{
        Payment: payment,
        Success: true,
        Error:   nil,
    }
}

func (p *PaymentProcessor) Submit(payment *Payment) {
    p.jobQueue <- payment
}

func (p *PaymentProcessor) Results() <-chan *PaymentResult {
    return p.resultChan
}

func (p *PaymentProcessor) Shutdown() {
    close(p.jobQueue)
    p.wg.Wait()
    close(p.resultChan)
}

// Usage
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    // Create processor with 10 workers
    processor := NewPaymentProcessor(10, 100)
    processor.Start(ctx)
    
    // Submit payments
    go func() {
        for i := 0; i < 100; i++ {
            payment := &Payment{
                ID:       fmt.Sprintf("PAY-%d", i),
                Amount:   100.50,
                Currency: "USD",
            }
            processor.Submit(payment)
        }
    }()
    
    // Collect results
    processed := 0
    for result := range processor.Results() {
        if result.Success {
            fmt.Printf("Payment %s processed successfully\n", result.Payment.ID)
        } else {
            fmt.Printf("Payment %s failed: %v\n", result.Payment.ID, result.Error)
        }
        
        processed++
        if processed == 100 {
            break
        }
    }
    
    processor.Shutdown()
}

Pattern 2: Idempotent Payment Handler

Ensure payments can be safely retried.

package main

import (
    "context"
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "sync"
    "time"
)

type IdempotentPaymentService struct {
    cache      sync.Map // Thread-safe map for idempotency
    mu         sync.RWMutex
    processing map[string]*sync.Mutex // Per-key locks
}

func NewIdempotentPaymentService() *IdempotentPaymentService {
    return &IdempotentPaymentService{
        processing: make(map[string]*sync.Mutex),
    }
}

func (s *IdempotentPaymentService) ProcessPayment(
    ctx context.Context,
    payment *Payment,
) (*PaymentResult, error) {
    
    // Generate idempotency key if not provided
    if payment.IdempotencyKey == "" {
        payment.IdempotencyKey = s.generateIdempotencyKey(payment)
    }
    
    // Check cache first (fast path)
    if cached, ok := s.cache.Load(payment.IdempotencyKey); ok {
        return cached.(*PaymentResult), nil
    }
    
    // Get or create mutex for this idempotency key
    mutex := s.getOrCreateMutex(payment.IdempotencyKey)
    mutex.Lock()
    defer mutex.Unlock()
    
    // Double-check cache after acquiring lock
    if cached, ok := s.cache.Load(payment.IdempotencyKey); ok {
        return cached.(*PaymentResult), nil
    }
    
    // Process payment
    result := &PaymentResult{
        Payment: payment,
        Success: true,
    }
    
    // Simulate payment gateway call
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case <-time.After(100 * time.Millisecond):
        // Payment processed
    }
    
    // Cache result (TTL: 24 hours in production)
    s.cache.Store(payment.IdempotencyKey, result)
    
    // Schedule cache cleanup
    go s.cleanupCache(payment.IdempotencyKey, 24*time.Hour)
    
    return result, nil
}

func (s *IdempotentPaymentService) getOrCreateMutex(key string) *sync.Mutex {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    if mutex, ok := s.processing[key]; ok {
        return mutex
    }
    
    mutex := &sync.Mutex{}
    s.processing[key] = mutex
    return mutex
}

func (s *IdempotentPaymentService) generateIdempotencyKey(payment *Payment) string {
    data := fmt.Sprintf("%s:%f:%s", payment.ID, payment.Amount, payment.Currency)
    hash := sha256.Sum256([]byte(data))
    return hex.EncodeToString(hash[:])
}

func (s *IdempotentPaymentService) cleanupCache(key string, ttl time.Duration) {
    time.Sleep(ttl)
    s.cache.Delete(key)
    
    s.mu.Lock()
    delete(s.processing, key)
    s.mu.Unlock()
}

Pattern 3: Rate Limiting with Token Bucket

Protect payment gateway from overload.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type TokenBucket struct {
    capacity   int
    tokens     int
    refillRate time.Duration
    mu         sync.Mutex
    lastRefill time.Time
}

func NewTokenBucket(capacity int, refillRate time.Duration) *TokenBucket {
    return &TokenBucket{
        capacity:   capacity,
        tokens:     capacity,
        refillRate: refillRate,
        lastRefill: time.Now(),
    }
}

func (tb *TokenBucket) Allow() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    
    // Refill tokens based on elapsed time
    now := time.Now()
    elapsed := now.Sub(tb.lastRefill)
    tokensToAdd := int(elapsed / tb.refillRate)
    
    if tokensToAdd > 0 {
        tb.tokens = min(tb.capacity, tb.tokens+tokensToAdd)
        tb.lastRefill = now
    }
    
    // Check if token available
    if tb.tokens > 0 {
        tb.tokens--
        return true
    }
    
    return false
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

// Rate-limited payment processor
type RateLimitedPaymentProcessor struct {
    limiter *TokenBucket
    processor *PaymentProcessor
}

func NewRateLimitedPaymentProcessor(
    rps int,
    workers int,
) *RateLimitedPaymentProcessor {
    
    return &RateLimitedPaymentProcessor{
        limiter:   NewTokenBucket(rps, time.Second/time.Duration(rps)),
        processor: NewPaymentProcessor(workers, 1000),
    }
}

func (p *RateLimitedPaymentProcessor) ProcessPayment(
    ctx context.Context,
    payment *Payment,
) error {
    
    // Wait for rate limit
    for {
        if p.limiter.Allow() {
            break
        }
        
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(10 * time.Millisecond):
            // Retry
        }
    }
    
    // Submit to processor
    p.processor.Submit(payment)
    return nil
}

Pattern 4: Circuit Breaker for Payment Gateway

Prevent cascading failures when payment gateway is down.

package main

import (
    "errors"
    "sync"
    "time"
)

type CircuitState int

const (
    StateClosed CircuitState = iota
    StateOpen
    StateHalfOpen
)

type CircuitBreaker struct {
    maxFailures  int
    timeout      time.Duration
    state        CircuitState
    failures     int
    lastFailTime time.Time
    mu           sync.RWMutex
}

func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures: maxFailures,
        timeout:     timeout,
        state:       StateClosed,
    }
}

func (cb *CircuitBreaker) Call(fn func() error) error {
    cb.mu.Lock()
    
    // Check if circuit should transition from Open to HalfOpen
    if cb.state == StateOpen {
        if time.Since(cb.lastFailTime) > cb.timeout {
            cb.state = StateHalfOpen
            cb.failures = 0
        } else {
            cb.mu.Unlock()
            return errors.New("circuit breaker is open")
        }
    }
    
    cb.mu.Unlock()
    
    // Execute function
    err := fn()
    
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    if err != nil {
        cb.failures++
        cb.lastFailTime = time.Now()
        
        if cb.failures >= cb.maxFailures {
            cb.state = StateOpen
        }
        
        return err
    }
    
    // Success - reset circuit
    if cb.state == StateHalfOpen {
        cb.state = StateClosed
    }
    cb.failures = 0
    
    return nil
}

// Payment gateway with circuit breaker
type ResilientPaymentGateway struct {
    breaker *CircuitBreaker
}

func NewResilientPaymentGateway() *ResilientPaymentGateway {
    return &ResilientPaymentGateway{
        breaker: NewCircuitBreaker(5, 30*time.Second),
    }
}

func (g *ResilientPaymentGateway) Charge(payment *Payment) error {
    return g.breaker.Call(func() error {
        // Actual payment gateway call
        return g.callPaymentGateway(payment)
    })
}

func (g *ResilientPaymentGateway) callPaymentGateway(payment *Payment) error {
    // Simulate gateway call
    time.Sleep(50 * time.Millisecond)
    return nil
}

Pattern 5: Distributed Lock for Concurrent Payments

Prevent double-charging using distributed locks.

package main

import (
    "context"
    "fmt"
    "time"
    
    "github.com/go-redis/redis/v8"
)

type DistributedLock struct {
    client *redis.Client
}

func NewDistributedLock(redisAddr string) *DistributedLock {
    return &DistributedLock{
        client: redis.NewClient(&redis.Options{
            Addr: redisAddr,
        }),
    }
}

func (dl *DistributedLock) AcquireLock(
    ctx context.Context,
    key string,
    ttl time.Duration,
) (bool, error) {
    
    // SET key value NX EX ttl
    result, err := dl.client.SetNX(ctx, key, "locked", ttl).Result()
    return result, err
}

func (dl *DistributedLock) ReleaseLock(ctx context.Context, key string) error {
    return dl.client.Del(ctx, key).Err()
}

// Payment service with distributed locking
type DistributedPaymentService struct {
    lock *DistributedLock
}

func (s *DistributedPaymentService) ProcessPayment(
    ctx context.Context,
    payment *Payment,
) (*PaymentResult, error) {
    
    lockKey := fmt.Sprintf("payment:lock:%s", payment.ID)
    
    // Try to acquire lock
    acquired, err := s.lock.AcquireLock(ctx, lockKey, 30*time.Second)
    if err != nil {
        return nil, err
    }
    
    if !acquired {
        return nil, errors.New("payment already being processed")
    }
    
    defer s.lock.ReleaseLock(ctx, lockKey)
    
    // Process payment
    result := &PaymentResult{
        Payment: payment,
        Success: true,
    }
    
    return result, nil
}

Pattern 6: Fan-Out/Fan-In for Multi-Gateway Processing

Try multiple payment gateways concurrently.

package main

import (
    "context"
    "fmt"
    "sync"
)

type Gateway interface {
    Charge(ctx context.Context, payment *Payment) (*PaymentResult, error)
    Name() string
}

type StripeGateway struct{}
func (g *StripeGateway) Name() string { return "Stripe" }
func (g *StripeGateway) Charge(ctx context.Context, p *Payment) (*PaymentResult, error) {
    // Stripe implementation
    return &PaymentResult{Payment: p, Success: true}, nil
}

type PayPalGateway struct{}
func (g *PayPalGateway) Name() string { return "PayPal" }
func (g *PayPalGateway) Charge(ctx context.Context, p *Payment) (*PaymentResult, error) {
    // PayPal implementation
    return &PaymentResult{Payment: p, Success: true}, nil
}

type MultiGatewayProcessor struct {
    gateways []Gateway
}

func NewMultiGatewayProcessor(gateways []Gateway) *MultiGatewayProcessor {
    return &MultiGatewayProcessor{gateways: gateways}
}

func (p *MultiGatewayProcessor) ProcessPayment(
    ctx context.Context,
    payment *Payment,
) (*PaymentResult, error) {
    
    // Fan-out: try all gateways concurrently
    results := make(chan *PaymentResult, len(p.gateways))
    errors := make(chan error, len(p.gateways))
    
    var wg sync.WaitGroup
    
    for _, gateway := range p.gateways {
        wg.Add(1)
        
        go func(gw Gateway) {
            defer wg.Done()
            
            result, err := gw.Charge(ctx, payment)
            if err != nil {
                errors <- fmt.Errorf("%s failed: %w", gw.Name(), err)
                return
            }
            
            results <- result
        }(gateway)
    }
    
    // Close channels when all goroutines complete
    go func() {
        wg.Wait()
        close(results)
        close(errors)
    }()
    
    // Fan-in: return first successful result
    select {
    case result := <-results:
        return result, nil
    case err := <-errors:
        // Log error but continue waiting for success
        fmt.Printf("Gateway error: %v\n", err)
    case <-ctx.Done():
        return nil, ctx.Err()
    }
    
    return nil, fmt.Errorf("all gateways failed")
}

Pattern 7: Graceful Shutdown

Ensure in-flight payments complete before shutdown.

package main

import (
    "context"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

type GracefulPaymentService struct {
    processor  *PaymentProcessor
    wg         sync.WaitGroup
    shutdownCh chan struct{}
}

func NewGracefulPaymentService() *GracefulPaymentService {
    return &GracefulPaymentService{
        processor:  NewPaymentProcessor(10, 100),
        shutdownCh: make(chan struct{}),
    }
}

func (s *GracefulPaymentService) Start(ctx context.Context) {
    s.processor.Start(ctx)
    
    // Handle shutdown signals
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    
    go func() {
        <-sigCh
        fmt.Println("Shutdown signal received")
        s.Shutdown()
    }()
}

func (s *GracefulPaymentService) ProcessPayment(payment *Payment) {
    s.wg.Add(1)
    
    go func() {
        defer s.wg.Done()
        
        select {
        case <-s.shutdownCh:
            fmt.Printf("Rejecting payment %s due to shutdown\n", payment.ID)
            return
        default:
            s.processor.Submit(payment)
        }
    }()
}

func (s *GracefulPaymentService) Shutdown() {
    close(s.shutdownCh)
    
    // Wait for in-flight payments with timeout
    done := make(chan struct{})
    
    go func() {
        s.wg.Wait()
        close(done)
    }()
    
    select {
    case <-done:
        fmt.Println("All payments completed")
    case <-time.After(30 * time.Second):
        fmt.Println("Shutdown timeout - forcing exit")
    }
    
    s.processor.Shutdown()
}

Real-World Example: Complete Payment Service

package main

import (
    "context"
    "fmt"
    "time"
)

type PaymentService struct {
    processor   *PaymentProcessor
    idempotency *IdempotentPaymentService
    rateLimiter *TokenBucket
    breaker     *CircuitBreaker
}

func NewPaymentService() *PaymentService {
    return &PaymentService{
        processor:   NewPaymentProcessor(20, 1000),
        idempotency: NewIdempotentPaymentService(),
        rateLimiter: NewTokenBucket(100, 10*time.Millisecond), // 100 RPS
        breaker:     NewCircuitBreaker(10, 1*time.Minute),
    }
}

func (s *PaymentService) ProcessPayment(
    ctx context.Context,
    payment *Payment,
) (*PaymentResult, error) {
    
    // 1. Check idempotency
    result, err := s.idempotency.ProcessPayment(ctx, payment)
    if err == nil {
        return result, nil
    }
    
    // 2. Rate limiting
    if !s.rateLimiter.Allow() {
        return nil, fmt.Errorf("rate limit exceeded")
    }
    
    // 3. Circuit breaker
    err = s.breaker.Call(func() error {
        s.processor.Submit(payment)
        return nil
    })
    
    if err != nil {
        return nil, err
    }
    
    // 4. Wait for result
    select {
    case result := <-s.processor.Results():
        return result, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

func main() {
    ctx := context.Background()
    service := NewPaymentService()
    
    service.processor.Start(ctx)
    defer service.processor.Shutdown()
    
    // Process payment
    payment := &Payment{
        ID:       "PAY-123",
        Amount:   99.99,
        Currency: "USD",
    }
    
    result, err := service.ProcessPayment(ctx, payment)
    if err != nil {
        fmt.Printf("Payment failed: %v\n", err)
        return
    }
    
    fmt.Printf("Payment processed: %+v\n", result)
}

Best Practices

1. Always Use Context for Cancellation

func processPayment(ctx context.Context, payment *Payment) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
        // Process payment
    }
}

2. Avoid Goroutine Leaks

// BAD: Goroutine may leak
go func() {
    for {
        processPayment()
    }
}()

// GOOD: Goroutine can be stopped
go func() {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            processPayment()
        }
    }
}()

3. Use Buffered Channels Wisely

// Unbuffered: Sender blocks until receiver ready
ch := make(chan *Payment)

// Buffered: Sender blocks only when buffer full
ch := make(chan *Payment, 100)

4. Handle Panics in Goroutines

go func() {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("Recovered from panic: %v", r)
        }
    }()
    
    processPayment()
}()

Conclusion

Go's concurrency primitives make it ideal for building high-performance payment systems. Key takeaways:

  • Use worker pools for bounded concurrency
  • Implement idempotency for safe retries
  • Add rate limiting to protect downstream services
  • Use circuit breakers for fault tolerance
  • Always handle graceful shutdown
  • Test with race detector: go test -race

These patterns will help you build robust, scalable payment processing systems that can handle millions of transactions reliably.

References