Streaming Guide
Rocco provides built-in support for Server-Sent Events (SSE), enabling real-time data streaming from server to client over HTTP.
When to Use SSE
| Use Case | SSE | WebSocket | Polling |
|---|---|---|---|
| Server → Client updates | ✓ Best | ✓ | ✓ |
| Bidirectional communication | ✗ | ✓ Best | ✗ |
| Browser reconnection | ✓ Built-in | ✗ Manual | N/A |
| Proxy/firewall compatibility | ✓ Excellent | ✗ Can be blocked | ✓ |
| Binary data | ✗ | ✓ | ✓ |
SSE is ideal for: notifications, live feeds, progress updates, dashboards, and any scenario where the server pushes updates to clients.
Basic Usage
Creating a Stream Handler
// PriceUpdate is the payload of a single streamed price event.
type PriceUpdate struct {
	// Symbol is serialized under the JSON key "symbol".
	Symbol string `json:"symbol"`
	// Price is serialized under the JSON key "price".
	Price float64 `json:"price"`
}
handler := rocco.NewStreamHandler[rocco.NoBody, PriceUpdate](
"price-stream",
http.MethodGet,
"/prices/stream",
func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[PriceUpdate]) error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-stream.Done():
// Client disconnected
return nil
case <-ticker.C:
if err := stream.Send(PriceUpdate{
Symbol: "BTC",
Price: getCurrentPrice(),
}); err != nil {
return err
}
}
}
},
).WithSummary("Stream price updates")
The Stream Interface
// Stream is the handle a stream handler uses to push events of type T to
// the connected client.
type Stream[T any] interface {
	// Send emits data as an event with no event name.
	Send(data T) error

	// SendEvent emits data under the given event name.
	SendEvent(event string, data T) error

	// SendComment emits a comment line (useful for keep-alive).
	SendComment(comment string) error

	// Done returns a channel that is closed when the client disconnects.
	Done() <-chan struct{}
}
Event Types
Data Events
The simplest form sends the value as JSON in a data-only event:
stream.Send(PriceUpdate{Symbol: "ETH", Price: 2500.00})
Output:
data: {"symbol":"ETH","price":2500}
Named Events
Named events allow clients to filter by event type:
stream.SendEvent("price", PriceUpdate{Symbol: "ETH", Price: 2500.00})
stream.SendEvent("volume", VolumeUpdate{Symbol: "ETH", Volume: 1000000})
Output:
event: price
data: {"symbol":"ETH","price":2500}
event: volume
data: {"symbol":"ETH","volume":1000000}
Client-side handling:
const source = new EventSource('/prices/stream');

// One listener per named event; each payload arrives as JSON text in e.data.
const labels = [
  ['price', 'Price update:'],
  ['volume', 'Volume update:'],
];

for (const [eventName, label] of labels) {
  source.addEventListener(eventName, (e) => {
    const payload = JSON.parse(e.data);
    console.log(label, payload);
  });
}
Comments (Keep-Alive)
Comments are ignored by clients but keep the connection alive:
// Send keep-alive every 30 seconds; forward real updates as they arrive.
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop() // release the ticker when the handler returns

for {
	select {
	case <-ticker.C:
		if err := stream.SendComment("keep-alive"); err != nil {
			return err
		}
	case data := <-updates:
		if err := stream.Send(data); err != nil {
			return err
		}
	case <-stream.Done():
		return nil
	}
}
Output:
: keep-alive
Client Disconnection
Always check stream.Done() to detect client disconnection:
// Forwards events from eventChannel until the client disconnects or a send
// fails.
func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Event]) error {
	for {
		select {
		case ev := <-eventChannel:
			if sendErr := stream.Send(ev); sendErr != nil {
				return sendErr
			}
		case <-stream.Done():
			// Clean up resources
			log.Println("Client disconnected")
			return nil
		}
	}
}
Authentication
Stream handlers support the same authentication as regular handlers:
handler := rocco.NewStreamHandler[rocco.NoBody, Notification](
	"notifications",
	http.MethodGet,
	"/notifications/stream",
	func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Notification]) error {
		// The authenticated identity decides whose notifications are streamed.
		userID := req.Identity.ID()

		// Stream notifications for this user
		for notification := range getUserNotifications(userID) {
			// Stop on the first failed send instead of silently dropping it.
			if err := stream.Send(notification); err != nil {
				return err
			}
		}
		return nil
	},
).
	WithAuthentication().
	WithScopes("notifications:read")
Request Input
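Stream handlers can accept request bodies (useful for POST streams with initial configuration). Note that the browser EventSource API only issues GET requests, so a POST stream must be consumed with fetch or another HTTP client: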
// StreamConfig is the request body a client posts to configure a stream.
type StreamConfig struct {
	// Symbols lists the symbols to stream; the validate tag marks it
	// required with min=1.
	Symbols []string `json:"symbols" validate:"required,min=1"`
	// Interval is the delay between updates; the validate tag bounds it to
	// min=100, max=60000.
	Interval int `json:"interval" validate:"min=100,max=60000"`
}
handler := rocco.NewStreamHandler[StreamConfig, PriceUpdate](
	"configured-stream",
	http.MethodPost,
	"/prices/stream",
	func(req *rocco.Request[StreamConfig], stream rocco.Stream[PriceUpdate]) error {
		// The request body supplies the tick interval in milliseconds.
		ticker := time.NewTicker(time.Duration(req.Body.Interval) * time.Millisecond)
		defer ticker.Stop()

		for {
			select {
			case <-stream.Done():
				return nil
			case <-ticker.C:
				// Emit one update per requested symbol on every tick.
				for _, symbol := range req.Body.Symbols {
					update := PriceUpdate{
						Symbol: symbol,
						Price:  getPrice(symbol),
					}
					if err := stream.Send(update); err != nil {
						return err // stop on the first failed send
					}
				}
			}
		}
	},
)
Path and Query Parameters
handler := rocco.NewStreamHandler[rocco.NoBody, Event](
	"channel-stream",
	http.MethodGet,
	"/channels/{channel}/events",
	func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Event]) error {
		// Path and query values are read from req.Params by name.
		channel := req.Params.Path["channel"]
		filter := req.Params.Query["filter"]

		events := subscribeToChannel(channel, filter)
		for event := range events {
			// Stop on the first failed send instead of silently dropping it.
			if err := stream.Send(event); err != nil {
				return err
			}
		}
		return nil
	},
).
	WithPathParams("channel").
	WithQueryParams("filter")
OpenAPI Documentation
Stream handlers are documented in OpenAPI with the text/event-stream content type:
/prices/stream:
  get:
    summary: Stream price updates
    responses:
      '200':
        description: Server-Sent Events stream
        content:
          text/event-stream:
            schema:
              type: string
              description: SSE stream emitting PriceUpdate events as JSON
Best Practices
1. Always Handle Disconnection
select {
case <-stream.Done():
	return nil // Clean exit
case data := <-source:
	// Propagate a failed send so the handler stops instead of carrying on.
	if err := stream.Send(data); err != nil {
		return err
	}
}
2. Use Keep-Alives for Long-Lived Streams
keepAlive := time.NewTicker(30 * time.Second)
defer keepAlive.Stop()

for {
	select {
	case <-keepAlive.C:
		// A failed keep-alive ends the handler rather than being ignored.
		if err := stream.SendComment("ping"); err != nil {
			return err
		}
	// ... other cases
	}
}
3. Clean Up Resources
// Forwards subscription events to the client; the deferred Close runs on
// every return path, including a failed send.
func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Event]) error {
	subscription := subscribe()
	defer subscription.Close() // Always clean up

	for {
		select {
		case <-stream.Done():
			return nil
		case event := <-subscription.Events():
			if err := stream.Send(event); err != nil {
				return err
			}
		}
	}
}
4. Consider Backpressure
If events arrive faster than they can be sent, stop streaming to clients that cannot keep up:
for {
	select {
	case <-stream.Done():
		return nil
	case event := <-fastSource:
		// Stream.Send takes no context, so a per-send deadline cannot be
		// attached here. A failed send is the signal to stop streaming to
		// this client.
		if err := stream.Send(event); err != nil {
			return err // Client too slow, disconnect
		}
	}
}
Observability
Stream handlers emit lifecycle signals:
| Signal | When |
|---|---|
| http.stream.executing | Handler starts |
| http.stream.started | Headers sent, stream established |
| http.stream.ended | Handler completed normally |
| http.stream.client.disconnected | Client disconnected |
| http.stream.error | Error during streaming |
Hook into these for metrics and logging:
// Keep a gauge of open streams: count up when a stream starts and down
// when it ends.
// NOTE(review): confirm whether StreamEnded also fires after a client
// disconnect or a stream error; if not, those signals need a Dec too.
capitan.Subscribe(rocco.StreamStarted, func(_ map[string]any) {
	metrics.StreamsActive.Inc()
})

capitan.Subscribe(rocco.StreamEnded, func(_ map[string]any) {
	metrics.StreamsActive.Dec()
})