zoobzio December 26, 2025 Edit this page

Streaming Guide

Rocco provides built-in support for Server-Sent Events (SSE), enabling real-time data streaming from server to client over HTTP.

When to Use SSE

| Use Case | SSE | WebSocket | Polling |
|---|---|---|---|
| Server → Client updates | ✓ Best | | |
| Bidirectional communication | | ✓ Best | |
| Browser reconnection | ✓ Built-in | ✗ Manual | N/A |
| Proxy/firewall compatibility | ✓ Excellent | ✗ Can be blocked | |
| Binary data | | | |

SSE is ideal for: notifications, live feeds, progress updates, dashboards, and any scenario where the server pushes updates to clients.

Basic Usage

Creating a Stream Handler

// PriceUpdate is the event payload sent on the price stream. It is encoded
// as JSON with the keys "symbol" and "price".
type PriceUpdate struct {
    Symbol string  `json:"symbol"`
    Price  float64 `json:"price"`
}

handler := rocco.NewStreamHandler[rocco.NoBody, PriceUpdate](
    "price-stream",
    http.MethodGet,
    "/prices/stream",
    func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[PriceUpdate]) error {
        // Emit one update per second until the client goes away or a send fails.
        tick := time.NewTicker(time.Second)
        defer tick.Stop()

        for {
            select {
            case <-tick.C:
                update := PriceUpdate{Symbol: "BTC", Price: getCurrentPrice()}
                if err := stream.Send(update); err != nil {
                    return err
                }
            case <-stream.Done():
                // Client disconnected
                return nil
            }
        }
    },
).WithSummary("Stream price updates")

The Stream Interface

// Stream is the handler-side view of an SSE connection that carries events
// whose payload type is T.
type Stream[T any] interface {
    // Send sends a data-only event. Callers should check the returned error.
    Send(data T) error

    // SendEvent sends a named event with data; event becomes the SSE
    // "event:" field that clients can register listeners for.
    SendEvent(event string, data T) error

    // SendComment sends a comment (useful for keep-alive)
    SendComment(comment string) error

    // Done returns a channel closed when client disconnects
    Done() <-chan struct{}
}

Event Types

Data Events

The simplest form - sends JSON data:

// The error return is dropped here for brevity; check it in real handlers.
stream.Send(PriceUpdate{Symbol: "ETH", Price: 2500.00})

Output:

data: {"symbol":"ETH","price":2500}

Named Events

Named events allow clients to filter by event type:

// NOTE(review): SendEvent takes the stream's element type T, so sending both
// PriceUpdate and VolumeUpdate on one stream presumably requires T to be a
// common type (e.g. an interface) — confirm against the handler's signature.
stream.SendEvent("price", PriceUpdate{Symbol: "ETH", Price: 2500.00})
stream.SendEvent("volume", VolumeUpdate{Symbol: "ETH", Volume: 1000000})

Output:

event: price
data: {"symbol":"ETH","price":2500}

event: volume
data: {"symbol":"ETH","volume":1000000}

Client-side handling:

const source = new EventSource('/prices/stream');

// Each named SSE event is delivered to the listener registered under that name.
source.addEventListener('price', ({ data }) => {
    const price = JSON.parse(data);
    console.log('Price update:', price);
});

source.addEventListener('volume', ({ data }) => {
    const volume = JSON.parse(data);
    console.log('Volume update:', volume);
});

Comments (Keep-Alive)

Comments are ignored by clients but keep the connection alive:

// Send keep-alive every 30 seconds
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop() // release the ticker when the handler returns
for {
    select {
    case <-ticker.C:
        // Propagate write failures instead of silently dropping them.
        if err := stream.SendComment("keep-alive"); err != nil {
            return err
        }
    case data := <-updates:
        if err := stream.Send(data); err != nil {
            return err
        }
    case <-stream.Done():
        return nil
    }
}

Output:

: keep-alive

Client Disconnection

Always check stream.Done() to detect client disconnection:

func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Event]) error {
    for {
        select {
        case ev := <-eventChannel:
            // Forward the event; a send failure ends the handler.
            err := stream.Send(ev)
            if err != nil {
                return err
            }
        case <-stream.Done():
            // Clean up resources
            log.Println("Client disconnected")
            return nil
        }
    }
}

Authentication

Stream handlers support the same authentication as regular handlers:

handler := rocco.NewStreamHandler[rocco.NoBody, Notification](
    "notifications",
    http.MethodGet,
    "/notifications/stream",
    func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Notification]) error {
        userID := req.Identity.ID()
        // Stream notifications for this user
        for notification := range getUserNotifications(userID) {
            // Stop on the first failed send rather than continuing to
            // consume notifications that can no longer be delivered.
            if err := stream.Send(notification); err != nil {
                return err
            }
        }
        return nil
    },
).
    WithAuthentication().
    WithScopes("notifications:read")

Request Input

Stream handlers can accept request bodies (useful for POST streams with initial configuration):

// StreamConfig is the request body that configures a price stream.
type StreamConfig struct {
    // Symbols lists the symbols to stream (validated as required, min=1).
    Symbols  []string `json:"symbols" validate:"required,min=1"`
    // Interval is the tick period in milliseconds (validated as 100–60000).
    Interval int      `json:"interval" validate:"min=100,max=60000"`
}

handler := rocco.NewStreamHandler[StreamConfig, PriceUpdate](
    "configured-stream",
    http.MethodPost,
    "/prices/stream",
    func(req *rocco.Request[StreamConfig], stream rocco.Stream[PriceUpdate]) error {
        // The client-supplied interval is expressed in milliseconds.
        ticker := time.NewTicker(time.Duration(req.Body.Interval) * time.Millisecond)
        defer ticker.Stop()

        for {
            select {
            case <-stream.Done():
                return nil
            case <-ticker.C:
                // Send one update per requested symbol on every tick.
                for _, symbol := range req.Body.Symbols {
                    update := PriceUpdate{
                        Symbol: symbol,
                        Price:  getPrice(symbol),
                    }
                    // Propagate write failures instead of ignoring them.
                    if err := stream.Send(update); err != nil {
                        return err
                    }
                }
            }
        }
    },
)

Path and Query Parameters

handler := rocco.NewStreamHandler[rocco.NoBody, Event](
    "channel-stream",
    http.MethodGet,
    "/channels/{channel}/events",
    func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Event]) error {
        // Path and query values are read from the request's Params maps.
        channel := req.Params.Path["channel"]
        filter := req.Params.Query["filter"]

        events := subscribeToChannel(channel, filter)
        for event := range events {
            // Stop on the first failed send rather than draining the
            // subscription for a stream that can no longer be written.
            if err := stream.Send(event); err != nil {
                return err
            }
        }
        return nil
    },
).
    WithPathParams("channel").
    WithQueryParams("filter")

OpenAPI Documentation

Stream handlers are documented in OpenAPI with text/event-stream content type:

/prices/stream:
  get:
    summary: Stream price updates
    responses:
      '200':
        description: Server-Sent Events stream
        content:
          text/event-stream:
            schema:
              type: string
              description: SSE stream emitting PriceUpdate events as JSON

Best Practices

1. Always Handle Disconnection

select {
case <-stream.Done():
    return nil  // Clean exit
case data := <-source:
    // Propagate write failures instead of ignoring them.
    if err := stream.Send(data); err != nil {
        return err
    }
}

2. Use Keep-Alives for Long-Lived Streams

keepAlive := time.NewTicker(30 * time.Second)
defer keepAlive.Stop()

for {
    select {
    case <-keepAlive.C:
        // Propagate write failures instead of ignoring them.
        if err := stream.SendComment("ping"); err != nil {
            return err
        }
    // ... other cases
    }
}

3. Clean Up Resources

func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Event]) error {
    subscription := subscribe()
    defer subscription.Close()  // Always clean up

    for {
        select {
        case <-stream.Done():
            return nil
        case event := <-subscription.Events():
            // Returning on a failed send still runs the deferred Close above.
            if err := stream.Send(event); err != nil {
                return err
            }
        }
    }
}

4. Consider Backpressure

If events arrive faster than they can be sent:

for {
    select {
    case <-stream.Done():
        return nil
    case event := <-fastSource:
        // Send takes no context or deadline, so it cannot be bounded from
        // here; while it is in flight no further events are read from
        // fastSource.
        // NOTE(review): if a per-send timeout is required, confirm whether
        // rocco or the HTTP server's write timeout provides one.
        if err := stream.Send(event); err != nil {
            return err // Send failed; stop streaming to this client
        }
    }
}

Observability

Stream handlers emit lifecycle signals:

| Signal | When |
|---|---|
| http.stream.executing | Handler starts |
| http.stream.started | Headers sent, stream established |
| http.stream.ended | Handler completed normally |
| http.stream.client.disconnected | Client disconnected |
| http.stream.error | Error during streaming |

Hook into these for metrics and logging:

// Count a stream as active once it has started.
capitan.Subscribe(rocco.StreamStarted, func(fields map[string]any) {
    metrics.StreamsActive.Inc()
})

// Decrement the active count when the stream ends.
// NOTE(review): confirm StreamEnded also fires after a client disconnect or
// a streaming error; otherwise this count would drift upward.
capitan.Subscribe(rocco.StreamEnded, func(fields map[string]any) {
    metrics.StreamsActive.Dec()
})