Real-time Patterns Cookbook
Practical recipes for building real-time features with Server-Sent Events.
Price Ticker
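Stream financial data updates to clients. The endpoint reads the symbol list from a POST body, so browser clients cannot use `EventSource` (which only issues GET requests) and should read the stream with `fetch` instead: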
// PriceUpdate is the payload type streamed by NewPriceTickerHandler. The
// struct tags give the JSON field names used on the wire.
type PriceUpdate struct {
Symbol string `json:"symbol"`
// NOTE(review): currency/units of Price are not specified here — confirm.
Price float64 `json:"price"`
// NOTE(review): presumably the delta versus a previous price; whether it is
// absolute or a percentage is not specified here — confirm.
Change float64 `json:"change"`
Timestamp time.Time `json:"timestamp"`
}
// TickerConfig is the request body type accepted by NewPriceTickerHandler.
type TickerConfig struct {
// Symbols is passed to the price service's Subscribe call. The validate tag
// declares it required with between 1 and 10 entries.
Symbols []string `json:"symbols" validate:"required,min=1,max=10"`
}
// NewPriceTickerHandler builds the "price-ticker" stream handler, registered
// as POST /prices/stream.
//
// The request body (TickerConfig) names the symbols to follow. The handler
// subscribes to priceService for those symbols and forwards every PriceUpdate
// to the client. It returns nil when the client disconnects or the
// subscription channel is closed, and returns the send error if forwarding an
// update fails. The subscription is released on every exit path.
func NewPriceTickerHandler(priceService *PriceService) *rocco.StreamHandler[TickerConfig, PriceUpdate] {
	return rocco.NewStreamHandler[TickerConfig, PriceUpdate](
		"price-ticker",
		http.MethodPost,
		"/prices/stream",
		func(req *rocco.Request[TickerConfig], stream rocco.Stream[PriceUpdate]) error {
			// Subscribe to price updates for requested symbols
			updates := priceService.Subscribe(req.Body.Symbols)
			defer priceService.Unsubscribe(updates)

			// Emit an SSE comment every 30 seconds so the stream is never
			// silent for long between price updates.
			keepAlive := time.NewTicker(30 * time.Second)
			defer keepAlive.Stop()

			for {
				select {
				case <-stream.Done():
					return nil
				case <-keepAlive.C:
					// Best-effort ping: its result is not inspected here. A
					// dead connection is expected to surface through
					// stream.Done() or the next Send.
					stream.SendComment("ping")
				case update, ok := <-updates:
					if !ok {
						// A closed channel would otherwise yield zero-value
						// updates forever; treat it as end of stream.
						return nil
					}
					if err := stream.Send(update); err != nil {
						return err
					}
				}
			}
		},
	).
		WithSummary("Stream price updates").
		WithDescription("Streams real-time price updates for specified symbols").
		WithTags("prices", "streaming")
}
User Notifications
Stream notifications for authenticated users:
// Notification is the payload type streamed by NewNotificationHandler. The
// struct tags give the JSON field names used on the wire.
type Notification struct {
ID string `json:"id"`
// NOTE(review): the set of valid Type values is not defined here — confirm
// against the notification service.
Type string `json:"type"`
Title string `json:"title"`
Body string `json:"body"`
Read bool `json:"read"`
CreatedAt time.Time `json:"created_at"`
}
// NewNotificationHandler builds the "notifications" stream handler, registered
// as GET /notifications/stream. It requires authentication and the
// "notifications:read" scope.
//
// For the authenticated user it first replays the unread backlog as "unread"
// events and then forwards live notifications as "new" events. It returns nil
// when the client disconnects or the subscription channel is closed, and
// returns the underlying error if loading the backlog or sending an event
// fails.
func NewNotificationHandler(notifService *NotificationService) *rocco.StreamHandler[rocco.NoBody, Notification] {
	return rocco.NewStreamHandler[rocco.NoBody, Notification](
		"notifications",
		http.MethodGet,
		"/notifications/stream",
		func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Notification]) error {
			userID := req.Identity.ID()

			// Subscribe BEFORE loading the backlog. Subscribing afterwards
			// leaves a window in which a notification is neither part of the
			// backlog snapshot nor delivered on the live channel.
			newNotifs := notifService.Subscribe(userID)
			defer notifService.Unsubscribe(userID, newNotifs)

			// Send unread notifications first
			unread, err := notifService.GetUnread(userID)
			if err != nil {
				return err
			}
			// IDs already delivered from the backlog, so a notification that
			// shows up in both the snapshot and the live channel is sent once.
			replayed := make(map[string]struct{}, len(unread))
			for _, n := range unread {
				if err := stream.SendEvent("unread", n); err != nil {
					return err
				}
				replayed[n.ID] = struct{}{}
			}

			// Stream new notifications
			for {
				select {
				case <-stream.Done():
					return nil
				case notif, ok := <-newNotifs:
					if !ok {
						// Subscription closed by the service: end the stream
						// instead of emitting zero-value notifications.
						return nil
					}
					if _, dup := replayed[notif.ID]; dup {
						// Already sent as "unread"; drop the live copy and
						// forget the ID, since an overlap can occur only once.
						delete(replayed, notif.ID)
						continue
					}
					if err := stream.SendEvent("new", notif); err != nil {
						return err
					}
				}
			}
		},
	).
		WithSummary("Stream notifications").
		WithAuthentication().
		WithScopes("notifications:read")
}
Progress Updates
Stream progress for long-running tasks:
// ProgressUpdate is the payload type streamed by NewProgressHandler, which
// ends the stream after sending an update whose Status is "completed" or
// "failed".
type ProgressUpdate struct {
TaskID string `json:"task_id"`
Status string `json:"status"` // pending, running, completed, failed
Progress float64 `json:"progress"` // 0.0 to 1.0
// Message and Error are omitted from the JSON output when empty.
Message string `json:"message,omitempty"`
Error string `json:"error,omitempty"`
}
// NewProgressHandler builds the "task-progress" stream handler, registered as
// GET /tasks/{taskId}/progress with authentication required.
//
// It relays progress updates for the task named by the taskId path parameter
// and stops (returning nil) when the client disconnects, when the progress
// channel is closed, or right after relaying an update whose Status is
// "completed" or "failed". A failed send is returned as the handler's error.
func NewProgressHandler(taskService *TaskService) *rocco.StreamHandler[rocco.NoBody, ProgressUpdate] {
	relay := func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[ProgressUpdate]) error {
		id := req.Params.Path["taskId"]

		progress := taskService.SubscribeProgress(id)
		defer taskService.UnsubscribeProgress(id, progress)

		for {
			select {
			case <-stream.Done():
				return nil
			case next, open := <-progress:
				// A closed channel means the task has finished.
				if !open {
					return nil
				}
				if err := stream.Send(next); err != nil {
					return err
				}
				// Terminal states end the stream once they have been sent.
				switch next.Status {
				case "completed", "failed":
					return nil
				}
			}
		}
	}

	return rocco.NewStreamHandler[rocco.NoBody, ProgressUpdate](
		"task-progress",
		http.MethodGet,
		"/tasks/{taskId}/progress",
		relay,
	).
		WithSummary("Stream task progress").
		WithPathParams("taskId").
		WithAuthentication()
}
Chat / Pub-Sub Pattern
Stream a channel's events (messages as well as join, leave and typing events) to each connected member:
// ChatMessage describes a single chat message. The struct tags give the JSON
// field names used on the wire.
// NOTE(review): no handler in this section references ChatMessage directly;
// presumably it is carried as ChannelEvent.Payload for "message" events —
// confirm.
type ChatMessage struct {
ID string `json:"id"`
Channel string `json:"channel"`
UserID string `json:"user_id"`
Username string `json:"username"`
Content string `json:"content"`
Timestamp time.Time `json:"timestamp"`
}
// ChannelEvent is the payload type streamed by NewChatHandler, which also uses
// Type as the SSE event name when forwarding subscription events.
type ChannelEvent struct {
// NOTE(review): NewChatHandler also emits an event with Type "joined", which
// is not in the list below — confirm the intended set of values.
Type string `json:"type"` // message, join, leave, typing
// Payload is untyped; its shape presumably depends on Type — confirm.
Payload any `json:"payload"`
}
// NewChatHandler builds the "chat-stream" stream handler, registered as
// GET /channels/{channel}/stream with authentication required.
//
// The authenticated user joins the channel named by the channel path
// parameter. If joining fails, the handler returns rocco.ErrForbidden. The
// handler confirms the join to the client with a "system" event, then forwards
// every event from the subscription using the event's Type as the SSE event
// name. It returns nil when the client disconnects or the subscription's event
// channel is closed, and returns the send error if an event cannot be written.
// The user leaves the channel on every exit path after a successful join.
func NewChatHandler(chatService *ChatService) *rocco.StreamHandler[rocco.NoBody, ChannelEvent] {
	return rocco.NewStreamHandler[rocco.NoBody, ChannelEvent](
		"chat-stream",
		http.MethodGet,
		"/channels/{channel}/stream",
		func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[ChannelEvent]) error {
			channel := req.Params.Path["channel"]
			userID := req.Identity.ID()

			// Join channel
			sub, err := chatService.Join(channel, userID)
			if err != nil {
				return rocco.ErrForbidden.WithMessage("cannot join channel")
			}
			defer chatService.Leave(channel, userID)

			// Confirm the join to this client. This writes to the caller's
			// own stream only; it does not notify other members.
			joined := ChannelEvent{
				Type:    "joined",
				Payload: map[string]string{"channel": channel},
			}
			if err := stream.SendEvent("system", joined); err != nil {
				// The connection is already unusable; stop instead of
				// entering the loop.
				return err
			}

			// Emit an SSE comment every 30 seconds so the stream is never
			// silent for long between channel events.
			keepAlive := time.NewTicker(30 * time.Second)
			defer keepAlive.Stop()

			for {
				select {
				case <-stream.Done():
					return nil
				case <-keepAlive.C:
					// Best-effort ping: its result is not inspected here. A
					// dead connection is expected to surface through
					// stream.Done() or the next SendEvent.
					stream.SendComment("ping")
				case event, ok := <-sub.Events:
					if !ok {
						// Subscription closed: end the stream instead of
						// forwarding zero-value events forever.
						return nil
					}
					if err := stream.SendEvent(event.Type, event); err != nil {
						return err
					}
				}
			}
		},
	).
		WithSummary("Stream channel events").
		WithPathParams("channel").
		WithAuthentication()
}
Dashboard Metrics
Stream live metrics for dashboards:
// DashboardMetrics is the payload type streamed by NewMetricsHandler, which
// sends one value per second. The struct tags give the JSON field names used
// on the wire.
type DashboardMetrics struct {
Timestamp time.Time `json:"timestamp"`
ActiveUsers int `json:"active_users"`
RequestsPerSec float64 `json:"requests_per_sec"`
// NOTE(review): ErrorRate, CPUUsage and MemoryUsage do not state whether they
// are fractions (0-1) or percentages (0-100) — confirm.
ErrorRate float64 `json:"error_rate"`
AvgLatencyMs float64 `json:"avg_latency_ms"`
CPUUsage float64 `json:"cpu_usage"`
MemoryUsage float64 `json:"memory_usage"`
}
// NewMetricsHandler builds the "dashboard-metrics" stream handler, registered
// as GET /admin/metrics/stream. It requires authentication and the "admin"
// role.
//
// Once per second it reads metricsService.Current() and sends the result to
// the client. It returns nil when the client disconnects and returns the send
// error if a snapshot cannot be written.
func NewMetricsHandler(metricsService *MetricsService) *rocco.StreamHandler[rocco.NoBody, DashboardMetrics] {
	push := func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[DashboardMetrics]) error {
		tick := time.NewTicker(time.Second)
		defer tick.Stop()

		for {
			// Wait for either a disconnect or the next tick.
			select {
			case <-stream.Done():
				return nil
			case <-tick.C:
			}

			// Reached only after a tick: publish the current snapshot.
			if err := stream.Send(metricsService.Current()); err != nil {
				return err
			}
		}
	}

	return rocco.NewStreamHandler[rocco.NoBody, DashboardMetrics](
		"dashboard-metrics",
		http.MethodGet,
		"/admin/metrics/stream",
		push,
	).
		WithSummary("Stream dashboard metrics").
		WithAuthentication().
		WithRoles("admin")
}
Connection Management Pattern
Limit how many concurrent streaming connections each user may hold open:
// ErrMaxConnections is returned by (*ConnectionManager).Acquire when the user
// already holds the maximum number of connections. Callers can match it with
// errors.Is.
var ErrMaxConnections = errors.New("max connections exceeded")

// ConnectionManager tracks how many connections each user currently holds and
// enforces a per-user upper bound. All methods are safe for concurrent use.
// Because it contains a mutex it must not be copied; use the pointer returned
// by NewConnectionManager.
type ConnectionManager struct {
	// mu guards connections. Every access mutates the map, so a plain Mutex
	// is used; a RWMutex would add cost without ever taking a read lock.
	mu          sync.Mutex
	connections map[string]int // userID -> connection count
	maxPerUser  int
}

// NewConnectionManager returns a ConnectionManager that allows each user at
// most maxPerUser simultaneous connections. With maxPerUser <= 0 every
// Acquire call fails.
func NewConnectionManager(maxPerUser int) *ConnectionManager {
	return &ConnectionManager{
		connections: make(map[string]int),
		maxPerUser:  maxPerUser,
	}
}

// Acquire reserves one connection slot for userID. It returns
// ErrMaxConnections, without changing any state, if the user is already at
// the limit. Every successful Acquire must be paired with one Release.
func (cm *ConnectionManager) Acquire(userID string) error {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	if cm.connections[userID] >= cm.maxPerUser {
		return ErrMaxConnections
	}
	cm.connections[userID]++
	return nil
}

// Release gives back one slot previously reserved for userID. The user's
// entry is removed once the count reaches zero so the map does not grow with
// idle users. Releasing for a user with no tracked connections is a no-op.
func (cm *ConnectionManager) Release(userID string) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	held, tracked := cm.connections[userID]
	if !tracked {
		return
	}
	if held <= 1 {
		delete(cm.connections, userID)
		return
	}
	cm.connections[userID] = held - 1
}
// NewManagedStreamHandler shows how to apply a ConnectionManager limit inside
// a stream handler. The handler is registered as "managed-stream" on
// GET /events with authentication enabled.
//
// It acquires a slot for the caller's identity before streaming and releases
// it when the handler returns. If Acquire fails, the handler returns
// rocco.ErrTooManyRequests. This is a skeleton: the Event type is not defined
// in this section and the streaming logic is elided.
func NewManagedStreamHandler(cm *ConnectionManager) *rocco.StreamHandler[rocco.NoBody, Event] {
return rocco.NewStreamHandler[rocco.NoBody, Event](
"managed-stream",
http.MethodGet,
"/events",
func(req *rocco.Request[rocco.NoBody], stream rocco.Stream[Event]) error {
userID := req.Identity.ID()
// Any Acquire error is reported to the client as "too many connections";
// the original error value is not propagated.
if err := cm.Acquire(userID); err != nil {
return rocco.ErrTooManyRequests.WithMessage("too many connections")
}
// Deferred only after a successful Acquire, so the count stays balanced.
defer cm.Release(userID)
// ... stream logic
for {
select {
case <-stream.Done():
return nil
// ...
}
}
},
).WithAuthentication()
}
Client-Side Usage
JavaScript EventSource
// Basic usage: open the stream and log every unnamed ("message") event.
const source = new EventSource('/events');

source.onmessage = ({ data }) => {
  console.log('Received:', JSON.parse(data));
};

source.onerror = (err) => {
  console.error('SSE error:', err);
};

// Named events: each listener fires only for events carrying that event name.
source.addEventListener('price', ({ data }) => {
  updatePriceDisplay(JSON.parse(data));
});

source.addEventListener('notification', ({ data }) => {
  showNotification(JSON.parse(data));
});
// With authentication: EventSource cannot attach custom request headers (and
// only issues GET requests), so read the stream with fetch and a
// ReadableStream instead.
//
// Sends a GET to `url` with `token` as a Bearer credential and calls
// handleEvent(parsed) once per SSE event, where `parsed` is the JSON-decoded
// data of that event. Resolves when the server closes the stream. Rejects if
// the response status is not 2xx, if the response has no body, or if an
// event's data is not valid JSON.
async function streamWithAuth(url, token) {
  const response = await fetch(url, {
    headers: { 'Authorization': `Bearer ${token}` }
  });
  if (!response.ok || !response.body) {
    // Without this check an error page would be parsed as if it were a stream.
    throw new Error(`SSE request failed: HTTP ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let pending = '';   // text received so far that is not yet newline-terminated
  let dataLines = []; // "data:" values of the event currently being assembled

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Network chunks do not align with event boundaries. stream: true keeps a
    // multi-byte character that straddles two chunks intact.
    pending += decoder.decode(value, { stream: true });

    // Only complete lines are processed; the unterminated tail is carried over.
    const lines = pending.split('\n');
    pending = lines.pop();

    for (let line of lines) {
      if (line.endsWith('\r')) line = line.slice(0, -1);

      if (line === '') {
        // A blank line ends the event: dispatch what has been collected.
        if (dataLines.length > 0) {
          const payload = dataLines.join('\n');
          dataLines = [];
          handleEvent(JSON.parse(payload));
        }
      } else if (line.startsWith('data:')) {
        // The single space after the colon is optional in the SSE format.
        const fieldValue = line.slice(5);
        dataLines.push(fieldValue.startsWith(' ') ? fieldValue.slice(1) : fieldValue);
      }
      // Comment lines (":...") and other fields (event, id, retry) are ignored.
    }
  }
}
See Also
- Streaming Guide - Complete SSE documentation
- Authentication Guide - Securing streams
- Events Reference - Stream lifecycle events