zoobzio December 26, 2025 Edit this page

Real-time Patterns Cookbook

Practical recipes for building real-time features with Server-Sent Events.

Price Ticker

Stream financial data updates to clients:

// PriceUpdate is the event payload streamed by NewPriceTickerHandler; each
// value received from the price subscription is sent to the client as JSON
// using the field names in the tags below.
type PriceUpdate struct {
    // Symbol names the instrument this update refers to (presumably one of
    // the symbols requested in TickerConfig — confirm against PriceService).
    Symbol    string    `json:"symbol"`
    // Price is the quoted price. NOTE(review): currency/units are not
    // specified anywhere in this recipe.
    Price     float64   `json:"price"`
    // Change is the price movement. NOTE(review): absolute vs. percentage
    // and the reference point are not specified here — confirm.
    Change    float64   `json:"change"`
    // Timestamp is when the update was produced (time zone not specified).
    Timestamp time.Time `json:"timestamp"`
}

// TickerConfig is the request body accepted by NewPriceTickerHandler.
type TickerConfig struct {
    // Symbols lists the instruments to subscribe to. The validate tag marks
    // the field as required with min=1 and max=10 (presumably a bound on the
    // number of symbols — confirm against the validator rocco uses).
    Symbols []string `json:"symbols" validate:"required,min=1,max=10"`
}

// NewPriceTickerHandler builds the POST /prices/stream SSE endpoint.
//
// The handler subscribes to priceService for the symbols in the request body
// and forwards every update to the client with stream.Send. It also writes a
// "ping" comment every 30 seconds (presumably to keep idle connections from
// being closed by intermediaries).
//
// The stream ends, and the subscription is released, when:
//   - the client disconnects (stream.Done fires),
//   - the subscription channel is closed by the service, or
//   - any write to the stream fails (the write error is returned).
func NewPriceTickerHandler(priceService *PriceService) *rocco.StreamHandler[TickerConfig, PriceUpdate] {
    return rocco.NewStreamHandler[TickerConfig, PriceUpdate](
        "price-ticker",
        http.MethodPost,
        "/prices/stream",
        func(req *rocco.Request[TickerConfig], stream rocco.Stream[PriceUpdate]) error {
            // Subscribe to price updates for requested symbols
            updates := priceService.Subscribe(req.Body.Symbols)
            defer priceService.Unsubscribe(updates)

            ticker := time.NewTicker(30 * time.Second)
            defer ticker.Stop()

            for {
                select {
                case <-stream.Done():
                    return nil
                case <-ticker.C:
                    // A failed keep-alive means the connection is gone; stop
                    // instead of holding the subscription open.
                    if err := stream.SendComment("ping"); err != nil {
                        return err
                    }
                case update, ok := <-updates:
                    if !ok {
                        // A closed channel would otherwise yield zero-value
                        // updates forever in a busy loop.
                        return nil
                    }
                    if err := stream.Send(update); err != nil {
                        return err
                    }
                }
            }
        },
    ).
        WithSummary("Stream price updates").
        WithDescription("Streams real-time price updates for specified symbols").
        WithTags("prices", "streaming")
}

User Notifications

Stream notifications for authenticated users:

// Notification is the event payload streamed by NewNotificationHandler, both
// for the initial backlog ("unread" events) and for live ("new") events.
type Notification struct {
    // ID identifies the notification.
    ID        string    `json:"id"`
    // Type categorizes the notification. NOTE(review): the set of valid
    // values is not defined in this recipe.
    Type      string    `json:"type"`
    // Title and Body carry the display text.
    Title     string    `json:"title"`
    Body      string    `json:"body"`
    // Read reports whether the notification has been read.
    Read      bool      `json:"read"`
    // CreatedAt is the creation time (time zone not specified).
    CreatedAt time.Time `json:"created_at"`
}

// NewNotificationHandler builds the authenticated GET /notifications/stream
// SSE endpoint (scope "notifications:read").
//
// For the authenticated user it first replays every unread notification as an
// "unread" event, then forwards live notifications as "new" events until the
// client disconnects, the subscription channel is closed, or a write fails.
// Errors from GetUnread and from stream writes are returned to the caller.
func NewNotificationHandler(notifService *NotificationService) *rocco.StreamHandler[rocco.NoBody, Notification] {
    return rocco.NewStreamHandler[rocco.NoBody, Notification](
        "notifications",
        http.MethodGet,
        "/notifications/stream",
        func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Notification]) error {
            userID := req.Identity.ID()

            // Send unread notifications first
            unread, err := notifService.GetUnread(userID)
            if err != nil {
                return err
            }
            for _, n := range unread {
                if err := stream.SendEvent("unread", n); err != nil {
                    return err
                }
            }

            // Stream new notifications
            // NOTE(review): a notification created after GetUnread returns
            // but before Subscribe is registered appears in neither set.
            // Whether that gap matters depends on NotificationService —
            // confirm, and subscribe before fetching if it does.
            newNotifs := notifService.Subscribe(userID)
            defer notifService.Unsubscribe(userID, newNotifs)

            for {
                select {
                case <-stream.Done():
                    return nil
                case notif, ok := <-newNotifs:
                    if !ok {
                        // A closed channel would otherwise emit zero-value
                        // notifications in a busy loop.
                        return nil
                    }
                    if err := stream.SendEvent("new", notif); err != nil {
                        return err
                    }
                }
            }
        },
    ).
        WithSummary("Stream notifications").
        WithAuthentication().
        WithScopes("notifications:read")
}

Progress Updates

Stream progress for long-running tasks:

// ProgressUpdate is the event payload streamed by NewProgressHandler to
// report how far a long-running task has advanced.
type ProgressUpdate struct {
    // TaskID is presumably the ID of the task being reported — confirm
    // against TaskService, which populates it.
    TaskID string `json:"task_id"`

    // Status is one of: pending, running, completed, failed.
    Status string `json:"status"`

    // Progress ranges from 0.0 to 1.0.
    Progress float64 `json:"progress"`

    // Message is optional and omitted from the JSON when empty.
    Message string `json:"message,omitempty"`

    // Error is optional and omitted from the JSON when empty.
    Error string `json:"error,omitempty"`
}

// NewProgressHandler builds the authenticated GET /tasks/{taskId}/progress
// SSE endpoint.
//
// It subscribes to progress for the task named by the taskId path parameter
// and forwards each update to the client. The stream ends without error when
// the client disconnects, when the progress channel is closed, or right after
// an update whose Status is "completed" or "failed" has been sent. A failed
// write ends the stream with that write's error.
func NewProgressHandler(taskService *TaskService) *rocco.StreamHandler[rocco.NoBody, ProgressUpdate] {
    handle := func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[ProgressUpdate]) error {
        id := req.Params.Path["taskId"]

        progress := taskService.SubscribeProgress(id)
        defer taskService.UnsubscribeProgress(id, progress)

        for {
            select {
            case <-stream.Done():
                return nil

            case step, open := <-progress:
                // A closed channel means the task has finished.
                if !open {
                    return nil
                }
                if err := stream.Send(step); err != nil {
                    return err
                }
                // Terminal states end the stream once they are delivered.
                switch step.Status {
                case "completed", "failed":
                    return nil
                }
            }
        }
    }

    return rocco.NewStreamHandler[rocco.NoBody, ProgressUpdate](
        "task-progress",
        http.MethodGet,
        "/tasks/{taskId}/progress",
        handle,
    ).
        WithSummary("Stream task progress").
        WithPathParams("taskId").
        WithAuthentication()
}

Chat / Pub-Sub Pattern

Stream messages for a channel:

// ChatMessage describes a single chat message. It is not referenced by
// NewChatHandler directly; presumably it travels as the Payload of a
// ChannelEvent whose Type is "message" — confirm against ChatService.
type ChatMessage struct {
    // ID identifies the message.
    ID        string    `json:"id"`
    // Channel names the channel the message was posted to.
    Channel   string    `json:"channel"`
    // UserID and Username identify the author.
    UserID    string    `json:"user_id"`
    Username  string    `json:"username"`
    // Content is the message text.
    Content   string    `json:"content"`
    // Timestamp is when the message was posted (time zone not specified).
    Timestamp time.Time `json:"timestamp"`
}

// ChannelEvent is the envelope streamed by NewChatHandler. The handler also
// uses Type as the SSE event name when forwarding subscription events.
//
// NOTE(review): NewChatHandler emits an initial event with Type "joined",
// which is not among the values listed on the Type field below — confirm
// which spelling clients should expect.
type ChannelEvent struct {
    Type    string `json:"type"` // message, join, leave, typing
    // Payload carries the event-specific data; its concrete type depends on
    // Type (the handler itself sends a map[string]string for "joined").
    Payload any    `json:"payload"`
}

// NewChatHandler builds the authenticated GET /channels/{channel}/stream SSE
// endpoint.
//
// The handler joins the channel named by the path parameter on behalf of the
// authenticated user, confirms the join to that client with a "system" event,
// and then forwards every subscription event using the event's Type as the
// SSE event name. A "ping" comment is written every 30 seconds as keep-alive.
//
// Any Join failure is reported as rocco.ErrForbidden with the message
// "cannot join channel"; the underlying error is not surfaced. The stream
// ends when the client disconnects, the subscription's event channel is
// closed, or a write fails (the write error is returned). Leave is always
// called on exit once Join has succeeded.
func NewChatHandler(chatService *ChatService) *rocco.StreamHandler[rocco.NoBody, ChannelEvent] {
    return rocco.NewStreamHandler[rocco.NoBody, ChannelEvent](
        "chat-stream",
        http.MethodGet,
        "/channels/{channel}/stream",
        func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[ChannelEvent]) error {
            channel := req.Params.Path["channel"]
            userID := req.Identity.ID()

            // Join channel
            sub, err := chatService.Join(channel, userID)
            if err != nil {
                return rocco.ErrForbidden.WithMessage("cannot join channel")
            }
            defer chatService.Leave(channel, userID)

            // Confirm the join to this client. This writes to the caller's
            // own stream; notifying other members is up to chatService.
            joined := ChannelEvent{
                Type:    "joined",
                Payload: map[string]string{"channel": channel},
            }
            if err := stream.SendEvent("system", joined); err != nil {
                return err
            }

            keepAlive := time.NewTicker(30 * time.Second)
            defer keepAlive.Stop()

            for {
                select {
                case <-stream.Done():
                    return nil
                case <-keepAlive.C:
                    // A failed keep-alive means the connection is gone; stop
                    // so the deferred Leave runs promptly.
                    if err := stream.SendComment("ping"); err != nil {
                        return err
                    }
                case event, ok := <-sub.Events:
                    if !ok {
                        // A closed channel would otherwise emit zero-value
                        // events in a busy loop.
                        return nil
                    }
                    if err := stream.SendEvent(event.Type, event); err != nil {
                        return err
                    }
                }
            }
        },
    ).
        WithSummary("Stream channel events").
        WithPathParams("channel").
        WithAuthentication()
}

Dashboard Metrics

Stream live metrics for dashboards:

// DashboardMetrics is the snapshot streamed once per tick by
// NewMetricsHandler.
//
// NOTE(review): apart from the "Ms" suffix on AvgLatencyMs, units and ranges
// (for example whether the usage and rate fields are fractions or
// percentages) are not specified in this recipe — confirm against
// MetricsService.
type DashboardMetrics struct {
    // Timestamp is when the snapshot was taken.
    Timestamp time.Time `json:"timestamp"`

    // Traffic figures.
    ActiveUsers    int     `json:"active_users"`
    RequestsPerSec float64 `json:"requests_per_sec"`
    ErrorRate      float64 `json:"error_rate"`
    AvgLatencyMs   float64 `json:"avg_latency_ms"`

    // Host resource figures.
    CPUUsage    float64 `json:"cpu_usage"`
    MemoryUsage float64 `json:"memory_usage"`
}

// NewMetricsHandler builds the GET /admin/metrics/stream SSE endpoint, which
// requires authentication and the "admin" role.
//
// Once per second it reads the current snapshot from metricsService and sends
// it to the client. The first snapshot goes out after the first tick, not
// immediately on connect. The stream ends without error when the client
// disconnects, or with the write error if a send fails.
func NewMetricsHandler(metricsService *MetricsService) *rocco.StreamHandler[rocco.NoBody, DashboardMetrics] {
    push := func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[DashboardMetrics]) error {
        const interval = time.Second

        tick := time.NewTicker(interval)
        defer tick.Stop()

        for {
            select {
            case <-stream.Done():
                return nil
            case <-tick.C:
                if err := stream.Send(metricsService.Current()); err != nil {
                    return err
                }
            }
        }
    }

    return rocco.NewStreamHandler[rocco.NoBody, DashboardMetrics](
        "dashboard-metrics",
        http.MethodGet,
        "/admin/metrics/stream",
        push,
    ).
        WithSummary("Stream dashboard metrics").
        WithAuthentication().
        WithRoles("admin")
}

Connection Management Pattern

Cap the number of concurrent streaming connections each user may hold:

// ErrMaxConnections is returned by (*ConnectionManager).Acquire when the user
// already holds the maximum number of connections. It is a package-level
// sentinel so callers can match it with errors.Is.
var ErrMaxConnections = errors.New("max connections exceeded")

// ConnectionManager enforces a per-user cap on concurrently held connections.
// All methods are safe for concurrent use; a ConnectionManager must not be
// copied after first use because it contains a mutex.
type ConnectionManager struct {
    // mu guards connections. A plain Mutex is used because every access
    // mutates the map; there are no read-only paths for an RWMutex to serve.
    mu          sync.Mutex
    connections map[string]int // userID -> connection count
    maxPerUser  int
}

// NewConnectionManager returns a manager that allows each user at most
// maxPerUser simultaneous connections. With maxPerUser <= 0 every Acquire
// call fails.
func NewConnectionManager(maxPerUser int) *ConnectionManager {
    return &ConnectionManager{
        connections: make(map[string]int),
        maxPerUser:  maxPerUser,
    }
}

// Acquire reserves one connection slot for userID. It returns
// ErrMaxConnections, and reserves nothing, when the user is already at the
// limit. Every successful Acquire must be paired with one Release.
func (cm *ConnectionManager) Acquire(userID string) error {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    if cm.connections[userID] >= cm.maxPerUser {
        return ErrMaxConnections
    }
    cm.connections[userID]++
    return nil
}

// Release frees one connection slot for userID. The user's entry is removed
// once the count reaches zero so the map does not grow with every user ever
// seen. Releasing for a user with no recorded connections is a no-op.
func (cm *ConnectionManager) Release(userID string) {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    count, ok := cm.connections[userID]
    if !ok {
        return
    }
    if count <= 1 {
        delete(cm.connections, userID)
        return
    }
    cm.connections[userID] = count - 1
}

// NewManagedStreamHandler shows how to use a ConnectionManager inside a
// stream handler: it builds the authenticated GET /events SSE endpoint and
// admits a request only if cm.Acquire succeeds for the authenticated user.
//
// Any Acquire failure is reported as rocco.ErrTooManyRequests with the
// message "too many connections". The slot is released by the deferred
// cm.Release when the handler returns. The streaming logic itself is elided
// in this skeleton; as written it only waits for the client to disconnect.
func NewManagedStreamHandler(cm *ConnectionManager) *rocco.StreamHandler[rocco.NoBody, Event] {
    return rocco.NewStreamHandler[rocco.NoBody, Event](
        "managed-stream",
        http.MethodGet,
        "/events",
        func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Event]) error {
            userID := req.Identity.ID()

            // Reserve a slot before doing any work; reject when over the cap.
            if err := cm.Acquire(userID); err != nil {
                return rocco.ErrTooManyRequests.WithMessage("too many connections")
            }
            // Registered only after a successful Acquire, so Release always
            // pairs with a reservation.
            defer cm.Release(userID)

            // ... stream logic
            for {
                select {
                case <-stream.Done():
                    return nil
                // ...
                }
            }
        },
    ).WithAuthentication()
}

Client-Side Usage

JavaScript EventSource

// Basic usage: open the stream and handle unnamed ("message") events.
const source = new EventSource('/events');

source.onmessage = ({ data }) => {
    console.log('Received:', JSON.parse(data));
};

source.onerror = (err) => {
    console.error('SSE error:', err);
};

// Named events: each entry maps an SSE event name to the function that
// consumes its parsed JSON payload. The consumers are looked up when an event
// arrives, not when this table is built.
const namedEventHandlers = {
    price: (payload) => updatePriceDisplay(payload),
    notification: (payload) => showNotification(payload),
};

for (const [eventName, consume] of Object.entries(namedEventHandlers)) {
    source.addEventListener(eventName, ({ data }) => {
        consume(JSON.parse(data));
    });
}

// With authentication: EventSource cannot set custom request headers (such
// as Authorization) and cannot issue POST requests, so read the stream with
// fetch and a ReadableStream reader instead.
//
// streamWithAuth(url, token) requests `url` with a Bearer token and calls the
// global handleEvent(data) with the parsed JSON of every `data:` line until
// the server closes the stream.
//
// Throws if the response status is not 2xx, if the request or a read fails,
// or if a data line is not valid JSON.
//
// Limitation: each `data:` line is treated as one complete JSON payload.
// `event:`, `id:` and `retry:` fields and multi-line data are ignored.
async function streamWithAuth(url, token) {
    const response = await fetch(url, {
        headers: { 'Authorization': `Bearer ${token}` }
    });
    if (!response.ok) {
        // Without this check an error body would be silently read and dropped.
        throw new Error(`SSE request failed with status ${response.status}`);
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    // Handles one complete line of the event stream.
    const dispatchLine = (line) => {
        if (!line.startsWith('data:')) return;
        // The SSE format allows one optional space after the colon.
        const payload = line.slice(5).replace(/^ /, '');
        handleEvent(JSON.parse(payload));
    };

    // Network chunks do not align with line boundaries, so keep the trailing
    // partial line here until the rest of it arrives.
    let pending = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // stream: true keeps multi-byte characters that straddle two chunks
        // intact instead of decoding them as replacement characters.
        pending += decoder.decode(value, { stream: true });

        // Parse SSE format: lines end with LF, CR, or CRLF.
        const lines = pending.split(/\r\n|\r|\n/);
        pending = lines.pop();
        for (const line of lines) {
            dispatchLine(line);
        }
    }

    // Flush the decoder and handle a final line that had no terminator.
    pending += decoder.decode();
    if (pending) {
        dispatchLine(pending);
    }
}

See Also