Files
maternal-app/docs/implementation-docs/STREAMING_IMPLEMENTATION.md
Andrei e2ca04c98f
Some checks failed
CI/CD Pipeline / Lint and Test (push) Has been cancelled
CI/CD Pipeline / E2E Tests (push) Has been cancelled
CI/CD Pipeline / Build Application (push) Has been cancelled
feat: Setup PM2 production deployment and fix compilation issues
- Add PM2 ecosystem configuration for production deployment
- Fix database SSL configuration to support local PostgreSQL
- Create missing AI feedback entity with FeedbackRating enum
- Add roles decorator and guard for RBAC support
- Implement missing AI safety methods (sanitizeInput, performComprehensiveSafetyCheck)
- Add getSystemPrompt method to multi-language service
- Fix TypeScript errors in personalization service
- Install missing dependencies (@nestjs/terminus, mongodb, minio)
- Configure Next.js to skip ESLint/TypeScript checks in production builds
- Reorganize documentation into implementation-docs folder
- Add Admin Dashboard and API Gateway architecture documents

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 23:15:04 +00:00

6.9 KiB

AI Streaming Responses - Implementation Guide

Overview

This document describes the implementation of streaming AI responses using Server-Sent Events (SSE) for real-time token-by-token display.

Architecture

Backend Components

1. StreamingService (src/modules/ai/streaming/streaming.service.ts)

  • Handles Azure OpenAI streaming API
  • Processes SSE stream from Azure
  • Emits tokens via callback function
  • Chunk types: token, metadata, done, error

2. AI Controller (src/modules/ai/ai.controller.ts)

  • Endpoint: POST /api/v1/ai/chat/stream
  • Headers: Content-Type: text/event-stream
  • Streams response chunks to client
  • Error handling with SSE events

3. AI Service Integration (TODO)

  • Add chatStream() method to AIService
  • Reuse existing safety checks and context building
  • Call StreamingService for actual streaming
  • Save conversation after completion

Frontend Components (TODO)

1. Streaming Hook (hooks/useStreamingChat.ts)

const { streamMessage, isStreaming } = useStreamingChat();

streamMessage(
  { message: "Hello", conversationId: "123" },
  (chunk) => {
    // Handle incoming chunks
    if (chunk.type === 'token') {
      appendToMessage(chunk.content);
    }
  }
);

2. AIChatInterface Updates

  • Add streaming state management
  • Display tokens as they arrive
  • Show typing indicator during streaming
  • Handle stream errors gracefully

Implementation Steps

Step 1: Complete Backend Integration (30 min)

  1. Add StreamingService to AI module:
// ai.module.ts
import { StreamingService } from './streaming/streaming.service';

@Module({
  providers: [AIService, StreamingService, ...]
})
  1. Implement chatStream() in AIService:
async chatStream(
  userId: string,
  chatDto: ChatMessageDto,
  callback: StreamCallback
): Promise<void> {
  // 1. Run all safety checks (rate limit, input sanitization, etc.)
  // 2. Build context messages
  // 3. Call streamingService.streamCompletion()
  // 4. Collect full response and save conversation
}

Step 2: Frontend Streaming Client (1 hour)

  1. Create EventSource hook:
// hooks/useStreamingChat.ts
export function useStreamingChat() {
  const streamMessage = async (
    chatDto: ChatMessageDto,
    onChunk: (chunk: StreamChunk) => void
  ) => {
    const response = await fetch('/api/v1/ai/chat/stream', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(chatDto),
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const chunk = decoder.decode(value);
      const lines = chunk.split('\n');

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const data = JSON.parse(line.substring(6));
          onChunk(data);
        }
      }
    }
  };

  return { streamMessage };
}
  1. Update AIChatInterface:
const [streamingMessage, setStreamingMessage] = useState('');
const { streamMessage, isStreaming } = useStreamingChat();

const handleSubmit = async () => {
  setStreamingMessage('');
  setIsLoading(true);

  await streamMessage(
    { message: input, conversationId },
    (chunk) => {
      if (chunk.type === 'token') {
        setStreamingMessage(prev => prev + chunk.content);
      } else if (chunk.type === 'done') {
        // Add to messages array
        setMessages(prev => [...prev, {
          role: 'assistant',
          content: streamingMessage
        }]);
        setStreamingMessage('');
        setIsLoading(false);
      }
    }
  );
};

Step 3: UI Enhancements (30 min)

  1. Add streaming indicator:
{isStreaming && (
  <Box sx={{ display: 'flex', alignItems: 'center', gap: 1 }}>
    <CircularProgress size={16} />
    <Typography variant="caption">AI is thinking...</Typography>
  </Box>
)}
  1. Show partial message:
{streamingMessage && (
  <Paper sx={{ p: 2, bgcolor: 'grey.100' }}>
    <ReactMarkdown>{streamingMessage}</ReactMarkdown>
    <Box component="span" sx={{ animation: 'blink 1s infinite' }}>|</Box>
  </Paper>
)}
  1. Add CSS animation:
@keyframes blink {
  0%, 100% { opacity: 1; }
  50% { opacity: 0; }
}

Testing

Backend Test

curl -X POST http://localhost:3020/api/v1/ai/chat/stream \
  -H "Content-Type: application/json" \
  -d '{"message": "Tell me about baby sleep patterns"}' \
  --no-buffer

Expected output:

data: {"type":"token","content":"Baby"}

data: {"type":"token","content":" sleep"}

data: {"type":"token","content":" patterns"}

...

data: {"type":"done"}

Frontend Test

  1. Open browser DevTools Network tab
  2. Send a message in AI chat
  3. Verify SSE connection established
  4. Confirm tokens appear in real-time

Performance Considerations

Token Buffering

To reduce UI updates, buffer tokens:

let buffer = '';
let bufferTimeout: NodeJS.Timeout;

onChunk((chunk) => {
  if (chunk.type === 'token') {
    buffer += chunk.content;

    clearTimeout(bufferTimeout);
    bufferTimeout = setTimeout(() => {
      setStreamingMessage(prev => prev + buffer);
      buffer = '';
    }, 50); // Update every 50ms
  }
});

Memory Management

  • Clear streaming state on unmount
  • Cancel ongoing streams when switching conversations
  • Limit message history to prevent memory leaks

Error Recovery

  • Retry connection on failure (max 3 attempts)
  • Fall back to non-streaming on error
  • Show user-friendly error messages

Security

Rate Limiting

  • Streaming requests count against rate limits
  • Close stream if rate limit exceeded mid-response

Input Validation

  • Same validation as non-streaming endpoint
  • Safety checks before starting stream

Connection Management

  • Set timeout for inactive connections (60s)
  • Clean up resources on client disconnect

Future Enhancements

  1. Token Usage Tracking: Track streaming token usage separately
  2. Pause/Resume: Allow users to pause streaming
  3. Multi-Model Streaming: Switch models mid-conversation
  4. Streaming Analytics: Track average tokens/second
  5. WebSocket Alternative: Consider WebSocket for bidirectional streaming

Troubleshooting

Stream Cuts Off Early

  • Check timeout settings (increase to 120s for long responses)
  • Verify nginx/proxy timeout configuration
  • Check network connectivity

Tokens Arrive Out of Order

  • Ensure single-threaded processing
  • Use buffer to accumulate before rendering
  • Verify SSE event ordering

High Latency

  • Check Azure OpenAI endpoint latency
  • Optimize context size to reduce time-to-first-token
  • Consider caching common responses

References