“Claude Went Down—Did Your Python Code Survive? Build This Instant AI Fallback Script”

The Chatbot That Never Says “I’m Sorry, I’m Having Trouble Right Now”

Imagine this scenario.

You’ve just launched your AI-powered customer support chatbot. It’s smart. It’s fast. It’s powered by Claude 3.5 Sonnet, and your beta testers love it. The launch day goes perfectly. Users are delighted. Your boss is thrilled. You’re already planning the celebration.

Then June 2nd happens.

At 11:34 AM, your monitoring dashboard explodes. Error rates spike to 100%. Customer support tickets flood in. “The chatbot isn’t working.” “I’m getting error messages.” “Why is your AI broken?”

You scramble to figure out what’s happening. You check your code – it’s fine. You check your servers – they’re fine. You check Claude’s status page – and there it is. “Elevated errors affecting Claude Opus 4.6.”

Your chatbot is dead. Not because you did anything wrong. Not because your code is buggy. But because a service you depend on – a service you trusted – is having a bad day.

And here’s the painful truth: your users don’t care why the chatbot is broken. They only care that it’s broken.

This is the story of how to build a chatbot that never has to say “I’m sorry.” A chatbot that fails gracefully, degrades intelligently, and keeps working even when its primary brain takes an unexpected vacation.

This is the story of building a fault-tolerant AI chatbot in Python, with Claude as your star player and a reliable fallback ready to step in when the star needs a break.


Part 1: Why “Just Using Claude” Isn’t Good Enough

The Illusion of Reliability

Let’s start with a confession: I used to believe that relying on a single AI provider was perfectly fine. After all, these are massive companies with world-class infrastructure. They have redundancies. They have SLAs. They have teams of engineers whose entire job is keeping the lights on.

Then I watched three different production systems fail during the June 2nd outage.

The problem isn’t Claude. Claude is extraordinary. The problem is that every complex system eventually fails. Google has gone down. AWS has gone down. Cloudflare has gone down. And yes, Anthropic has gone down.

The June 2nd outage lasted nearly six hours. Six hours where thousands of businesses – from solo entrepreneurs to Fortune 500 companies – had AI-powered features that simply didn’t work.

But here’s what’s interesting. Some chatbots kept working. Not at full capacity, maybe. Not with Claude’s brilliance, certainly. But they kept responding. Kept helping users. Kept providing value.

Those chatbots had one thing in common: fallback strategies.

The Fallback Philosophy

A fallback isn’t a backup. A backup is something you restore after a failure. A fallback is something that seamlessly takes over during a failure.

Think of it like this: if your primary chef calls in sick, a backup means you close the restaurant for the day while you find a replacement. A fallback means your sous-chef immediately starts cooking, and your customers never even know there was a problem.

In AI terms, a fallback means:

  • When Claude is unavailable, your chatbot automatically switches to another model
  • When all cloud models are unavailable, it uses a tiny local model
  • When even that fails, it serves cached responses to common questions
  • When everything fails, it offers a graceful “we’ll get back to you” message instead of an error

The goal isn’t perfection. The goal is never showing an error message to your user.
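
The tiers above can be pictured as an ordered list of handlers tried in turn. What follows is a minimal sketch of that idea, not the router built later in this guide. The names `answer_with_fallback`, `flaky_cloud_model`, and `canned_answer` are hypothetical stand-ins for real providers.

```python
from typing import Callable, List

# Each tier is a callable that returns a reply, or raises on failure.
Tier = Callable[[str], str]

GRACEFUL_MESSAGE = "We're having trouble right now. We'll get back to you shortly."

def answer_with_fallback(question: str, tiers: List[Tier]) -> str:
    """Try each tier in order and return the first reply that succeeds."""
    for tier in tiers:
        try:
            return tier(question)
        except Exception:
            continue  # this tier is down, fall through to the next one
    # Every tier failed: return a friendly message instead of an error
    return GRACEFUL_MESSAGE

def flaky_cloud_model(question: str) -> str:
    """Hypothetical primary model that is currently unavailable."""
    raise ConnectionError("simulated outage")

def canned_answer(question: str) -> str:
    """Hypothetical last-resort tier that serves a stored answer."""
    return f"(cached) Here is what we know about: {question}"

if __name__ == "__main__":
    print(answer_with_fallback("refund policy", [flaky_cloud_model, canned_answer]))
    print(answer_with_fallback("refund policy", [flaky_cloud_model]))
```

The caller never sees an exception: it gets either a real answer from some tier or the graceful message.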

What We’re Building

By the end of this guide, you’ll have built a production-ready, fault-tolerant AI chatbot with:

  1. Claude 3.5 Sonnet as your primary model (smart, expressive, excellent for most tasks)
  2. GPT-4 Turbo as your first fallback (still very capable, different strengths)
  3. Gemini 1.5 Pro as your second fallback (great for specific tasks, different pricing)
  4. A tiny local LLM as your third fallback (always available, never depends on APIs)
  5. Intelligent caching for common questions
  6. Circuit breakers to prevent cascade failures
  7. Health checks that proactively detect outages
  8. Proper logging and monitoring so you know exactly what’s happening

And we’ll build it all in Python, using modern libraries and patterns that won’t make your eyes glaze over.


Part 2: The Foundation – Setting Up Your Python Environment

Project Structure That Scales

Before we write a single line of chatbot logic, let’s set up a project structure that won’t collapse under its own weight. Create a directory and file structure like this:

fault_tolerant_chatbot/
├── .env                      # API keys (never commit this!)
├── requirements.txt          # Dependencies
├── docker-compose.yml        # For local development with Redis
├── src/
│   ├── __init__.py
│   ├── main.py              # FastAPI/Streamlit entry point
│   ├── providers/
│   │   ├── __init__.py
│   │   ├── base.py          # Abstract base class for all providers
│   │   ├── claude.py        # Claude implementation
│   │   ├── gpt.py           # OpenAI GPT implementation
│   │   ├── gemini.py        # Google Gemini implementation
│   │   └── local.py         # Local LLM (Ollama/transformers)
│   ├── fallback/
│   │   ├── __init__.py
│   │   ├── router.py        # Routes requests to available providers
│   │   ├── circuit_breaker.py
│   │   └── health.py
│   ├── cache/
│   │   ├── __init__.py
│   │   └── redis_cache.py
│   └── models/
│       ├── __init__.py
│       └── conversation.py   # Chat history, user sessions
├── tests/
│   ├── test_providers.py
│   └── test_fallback.py
└── README.md

Installing the Essentials

Your requirements.txt should include:

# Core AI providers
anthropic>=0.30.0
openai>=1.30.0
google-generativeai>=0.7.0

# Local LLM support
ollama>=0.3.0  # Or use transformers for fully local

# Resilience and reliability
tenacity>=8.2.0
circuitbreaker>=2.0.0
backoff>=2.2.0

# Caching and async (asyncio ships with Python; do not install it from PyPI)
redis>=5.0.0
aiohttp>=3.9.0

# Web framework (choose one)
fastapi>=0.110.0
uvicorn>=0.27.0
# OR streamlit>=1.32.0 for quick prototyping

# Monitoring and logging
structlog>=24.1.0
prometheus-client>=0.19.0
opentelemetry-api>=1.22.0

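# Utilities
python-dotenv>=1.0.0
pydantic>=2.6.0

# Testing (pytest-asyncio provides the @pytest.mark.asyncio marker used in Part 8)
pytest>=8.0.0
pytest-asyncio>=0.23.0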

Install everything with:

pip install -r requirements.txt

Environment Configuration

Create your .env file (and add it to .gitignore immediately):

# API Keys
ANTHROPIC_API_KEY=sk-ant-...
OPENAI_API_KEY=sk-...
GOOGLE_API_KEY=AIza...

# Redis (for caching)
REDIS_URL=redis://localhost:6379

# Local LLM (Ollama)
OLLAMA_URL=http://localhost:11434
LOCAL_MODEL_NAME=llama3:8b

# Circuit breaker settings
CIRCUIT_BREAKER_FAILURE_THRESHOLD=5
CIRCUIT_BREAKER_RECOVERY_TIMEOUT=60

# Timeouts (seconds)
CLAUDE_TIMEOUT=30
GPT_TIMEOUT=25
GEMINI_TIMEOUT=20
LOCAL_TIMEOUT=10

# Feature flags
ENABLE_MULTI_PROVIDER=true
ENABLE_CACHING=true
ENABLE_FALLBACK_LOCAL=true
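
Environment variables always arrive as strings, so the timeouts and feature flags above need converting before use. One possible way to do that in a single place is sketched below. The `Settings` dataclass and the `load_settings` and `_env_bool` helpers are hypothetical names, and the defaults simply mirror the sample .env file.

```python
import os
from dataclasses import dataclass

def _env_bool(name: str, default: bool) -> bool:
    """Read a boolean flag, accepting common spellings such as true/1/yes/on."""
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}

@dataclass(frozen=True)
class Settings:
    claude_timeout: int
    gpt_timeout: int
    gemini_timeout: int
    local_timeout: int
    failure_threshold: int
    recovery_timeout: int
    enable_caching: bool
    enable_fallback_local: bool

def load_settings() -> Settings:
    """Build typed settings from the environment, falling back to the sample defaults."""
    return Settings(
        claude_timeout=int(os.getenv("CLAUDE_TIMEOUT", "30")),
        gpt_timeout=int(os.getenv("GPT_TIMEOUT", "25")),
        gemini_timeout=int(os.getenv("GEMINI_TIMEOUT", "20")),
        local_timeout=int(os.getenv("LOCAL_TIMEOUT", "10")),
        failure_threshold=int(os.getenv("CIRCUIT_BREAKER_FAILURE_THRESHOLD", "5")),
        recovery_timeout=int(os.getenv("CIRCUIT_BREAKER_RECOVERY_TIMEOUT", "60")),
        enable_caching=_env_bool("ENABLE_CACHING", True),
        enable_fallback_local=_env_bool("ENABLE_FALLBACK_LOCAL", True),
    )

if __name__ == "__main__":
    print(load_settings())
```

Call `load_dotenv()` from python-dotenv before `load_settings()` so that values in .env are visible to `os.getenv`.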

Part 3: The Provider Abstraction – Treating All AIs Equally

The Base Provider Class

The secret to fault tolerance is abstraction. If your code directly calls anthropic.Anthropic().messages.create(), you’re locked in. If instead you call provider.chat(prompt), you can swap providers without changing a single line of business logic.

Here’s your abstract base class:

# src/providers/base.py

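from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from pydantic import BaseModel, Field
from datetime import datetime
import structlog

logger = structlog.get_logger()

class ChatMessage(BaseModel):
    role: str  # 'user', 'assistant', 'system'
    content: str
    # default_factory runs once per message; a bare datetime.now() default would be
    # evaluated a single time at import and shared by every instance
    timestamp: datetime = Field(default_factory=datetime.now)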

class ProviderResponse(BaseModel):
    content: str
    provider_name: str
    model_name: str
    latency_ms: float
    tokens_used: Optional[int] = None
    success: bool = True
    error: Optional[str] = None

class AIProvider(ABC):
    """Abstract base class for all AI providers"""

    def __init__(self, name: str, model: str, timeout: int = 30):
        self.name = name
        self.model = model
        self.timeout = timeout
        self._healthy = True

    @abstractmethod
    async def chat(self, messages: List[ChatMessage], **kwargs) -> ProviderResponse:
        """Send a chat conversation to the provider and get a response"""
        pass

    @abstractmethod
    async def health_check(self) -> bool:
        """Check if the provider is currently available"""
        pass

    def mark_unhealthy(self, error: str):
        """Mark this provider as temporarily unhealthy"""
        self._healthy = False
        logger.warning("provider_marked_unhealthy", 
                      provider=self.name, 
                      error=error)

    def mark_healthy(self):
        """Mark this provider as healthy again"""
        self._healthy = True
        logger.info("provider_marked_healthy", provider=self.name)

    @property
    def is_healthy(self) -> bool:
        return self._healthy

Implementing Claude Provider

Now let’s implement the Claude provider. Notice how it follows the abstract interface exactly:

# src/providers/claude.py

import anthropic
import asyncio
import time
from typing import List
from .base import AIProvider, ChatMessage, ProviderResponse
import structlog

logger = structlog.get_logger()

class ClaudeProvider(AIProvider):
    def __init__(self, api_key: str, timeout: int = 30):
        # Claude 3.5 Sonnet, the primary model named in Part 1
        super().__init__(name="claude", model="claude-3-5-sonnet-20240620", timeout=timeout)
        self.client = anthropic.AsyncAnthropic(api_key=api_key, timeout=timeout)

    async def chat(self, messages: List[ChatMessage], **kwargs) -> ProviderResponse:
        start_time = time.time()

        try:
            # Convert our message format to Claude's format
            claude_messages = [
                {"role": msg.role, "content": msg.content}
                for msg in messages if msg.role != "system"
            ]

            # Extract system message if present
            system_message = next(
                (msg.content for msg in messages if msg.role == "system"), 
                None
            )

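            request_args = {
                "model": self.model,
                "messages": claude_messages,
                "max_tokens": kwargs.get("max_tokens", 1000),
                "temperature": kwargs.get("temperature", 0.7),
            }
            # Omit `system` entirely when there is no system message,
            # rather than sending None
            if system_message is not None:
                request_args["system"] = system_message

            response = await self.client.messages.create(**request_args)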

            latency_ms = (time.time() - start_time) * 1000

            logger.info("claude_response_success", 
                       latency_ms=latency_ms,
                       tokens_used=response.usage.input_tokens + response.usage.output_tokens)

            return ProviderResponse(
                content=response.content[0].text,
                provider_name=self.name,
                model_name=self.model,
                latency_ms=latency_ms,
                tokens_used=response.usage.input_tokens + response.usage.output_tokens,
                success=True
            )

        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000
            logger.error("claude_response_failed", 
                        error=str(e),
                        latency_ms=latency_ms)

            return ProviderResponse(
                content="",
                provider_name=self.name,
                model_name=self.model,
                latency_ms=latency_ms,
                success=False,
                error=str(e)
            )

    async def health_check(self) -> bool:
        """Quick health check using a minimal request"""
        try:
            # Use a tiny request to verify connectivity
            await self.client.messages.create(
                model=self.model,
                messages=[{"role": "user", "content": "Hi"}],
                max_tokens=5,
                timeout=5
            )
            return True
        except Exception:
            return False
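
The provider above treats every exception the same way. In practice it can help to separate transient failures (timeouts, rate limits, server errors) from permanent ones such as a bad API key, since only the former indicate an outage. Below is a minimal sketch of that classification by HTTP status code. The `is_transient` helper and the `TRANSIENT_STATUS_CODES` set are hypothetical names, and the exact list of codes is an assumption to tune for your providers.

```python
from typing import Optional

# Status codes that usually mean "try again later or elsewhere".
# 529 is the code Anthropic uses for an overloaded API.
TRANSIENT_STATUS_CODES = {408, 429, 500, 502, 503, 504, 529}

def is_transient(status_code: Optional[int]) -> bool:
    """Return True when a failure looks like an outage rather than a bad request."""
    if status_code is None:
        # No HTTP response at all: timeout, DNS failure, connection reset
        return True
    return status_code in TRANSIENT_STATUS_CODES

if __name__ == "__main__":
    for code in (None, 401, 429, 503):
        print(code, "transient" if is_transient(code) else "permanent")
```

A permanent failure such as a 400 or 401 usually points to a bug or misconfiguration on your side, so it is worth alerting on instead of silently counting it toward the circuit breaker.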

Implementing GPT and Gemini Providers

The beauty of abstraction is that you can add providers without changing anything else. Here’s a condensed version for GPT; a Gemini provider follows the same pattern:

# src/providers/gpt.py

import openai
import time
from typing import List
from .base import AIProvider, ChatMessage, ProviderResponse

class GPTProvider(AIProvider):
    def __init__(self, api_key: str, timeout: int = 25):
        super().__init__(name="gpt", model="gpt-4-turbo-preview", timeout=timeout)
        self.client = openai.AsyncOpenAI(api_key=api_key, timeout=timeout)

    async def chat(self, messages: List[ChatMessage], **kwargs) -> ProviderResponse:
        start_time = time.time()

        try:
            gpt_messages = [
                {"role": msg.role, "content": msg.content}
                for msg in messages
            ]

            response = await self.client.chat.completions.create(
                model=self.model,
                messages=gpt_messages,
                max_tokens=kwargs.get("max_tokens", 1000),
                temperature=kwargs.get("temperature", 0.7)
            )

            return ProviderResponse(
                content=response.choices[0].message.content,
                provider_name=self.name,
                model_name=self.model,
                latency_ms=(time.time() - start_time) * 1000,
                tokens_used=response.usage.total_tokens,
                success=True
            )
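        except Exception as e:
            # Report the failure in the same shape as a success so the
            # router can move on to the next provider
            return ProviderResponse(
                content="",
                provider_name=self.name,
                model_name=self.model,
                latency_ms=(time.time() - start_time) * 1000,
                success=False,
                error=str(e)
            )

    async def health_check(self) -> bool:
        """Quick health check using a minimal request, mirroring ClaudeProvider"""
        try:
            await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": "Hi"}],
                max_tokens=5,
                timeout=5
            )
            return True
        except Exception:
            return False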
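
The Gemini provider is not shown in this guide. One piece it would need is message conversion, because Gemini's API names the assistant role `model`, wraps text in a `parts` list, and takes system instructions separately from the conversation. The sketch below covers only that conversion, using plain dicts so it stands alone. The `to_gemini_contents` helper is a hypothetical name, and how you pass the returned system text depends on the SDK version you use.

```python
from typing import Dict, List, Optional, Tuple

def to_gemini_contents(
    messages: List[Dict[str, str]],
) -> Tuple[Optional[str], List[dict]]:
    """Split out system text and map roles to Gemini's 'user' / 'model' convention."""
    system_parts: List[str] = []
    contents: List[dict] = []
    for msg in messages:
        if msg["role"] == "system":
            system_parts.append(msg["content"])
            continue
        # Gemini calls the assistant role "model"
        role = "model" if msg["role"] == "assistant" else "user"
        contents.append({"role": role, "parts": [msg["content"]]})
    system_text = "\n".join(system_parts) if system_parts else None
    return system_text, contents

if __name__ == "__main__":
    system, contents = to_gemini_contents([
        {"role": "system", "content": "Be brief."},
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there"},
    ])
    print(system)
    print(contents)
```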

Part 4: The Fallback Router – The Heart of Fault Tolerance

The Intelligent Router

Now for the magic. The fallback router is what makes your chatbot fault-tolerant. It tries providers in order, fails fast, and keeps trying until someone succeeds:

# src/fallback/router.py

import asyncio
from typing import List, Optional
from src.providers.base import AIProvider, ChatMessage, ProviderResponse
from src.fallback.circuit_breaker import CircuitBreaker
from src.cache.redis_cache import ResponseCache
import structlog

logger = structlog.get_logger()

class FallbackRouter:
    def __init__(self, providers: List[AIProvider], cache: Optional[ResponseCache] = None):
        self.providers = providers
        self.cache = cache
        self.circuit_breakers = {
            provider.name: CircuitBreaker(
                failure_threshold=5,
                recovery_timeout=60,
                name=provider.name
            )
            for provider in providers
        }

    async def chat(self, messages: List[ChatMessage], **kwargs) -> ProviderResponse:
        """Try providers in order until one succeeds"""

        # Check cache first
        if self.cache:
            cache_key = self._generate_cache_key(messages)
            cached_response = await self.cache.get(cache_key)
            if cached_response:
                logger.info("cache_hit", key=cache_key)
                return cached_response

        # Try each provider in priority order
        for provider in self.providers:
            # Skip if circuit breaker is open
            cb = self.circuit_breakers[provider.name]
            if cb.is_open:
                logger.warning("circuit_breaker_open", provider=provider.name)
                continue

            try:
                # Attempt the call with circuit breaker protection
                response = await cb.call(provider.chat, messages, **kwargs)

                if response.success:
                    logger.info("provider_succeeded", 
                              provider=provider.name,
                              latency_ms=response.latency_ms)

                    # Cache successful response
                    if self.cache and response.success:
                        await self.cache.set(cache_key, response)

                    return response
                else:
                    logger.warning("provider_failed",
                                  provider=provider.name,
                                  error=response.error)

            except Exception as e:
                logger.error("provider_exception",
                            provider=provider.name,
                            error=str(e))
                # Circuit breaker will record failure automatically

        # All providers failed
        logger.error("all_providers_failed")
        return ProviderResponse(
            content="I'm having trouble connecting right now. Please try again in a moment.",
            provider_name="fallback",
            model_name="error_handler",
            latency_ms=0,
            success=False,
            error="All providers unavailable"
        )

    def _generate_cache_key(self, messages: List[ChatMessage]) -> str:
        """Create a cache key from the conversation"""
        import hashlib
        conversation = "|".join([f"{m.role}:{m.content}" for m in messages])
        return hashlib.md5(conversation.encode()).hexdigest()
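
The cache key above depends only on the message text, so two requests with different `temperature` or `max_tokens` values share one cached answer, and a message containing the `|` separator could collide with a different conversation. One possible alternative, sketched here with a hypothetical `make_cache_key` helper, serializes the messages and generation parameters to canonical JSON before hashing. It assumes all parameters are JSON-serializable.

```python
import hashlib
import json
from typing import Any, Dict, List

def make_cache_key(messages: List[Dict[str, str]], **params: Any) -> str:
    """Hash the conversation plus generation parameters into a stable key."""
    payload = {
        "messages": [{"role": m["role"], "content": m["content"]} for m in messages],
        "params": params,
    }
    # sort_keys makes the output independent of dict and keyword ordering
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return "chat:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()

if __name__ == "__main__":
    msgs = [{"role": "user", "content": "What's the capital of France?"}]
    print(make_cache_key(msgs, temperature=0.7, max_tokens=1000))
```

The `chat:` prefix is an arbitrary namespace that makes pattern-based invalidation easier.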

The Circuit Breaker Implementation

The circuit breaker prevents your system from repeatedly hammering a failed provider:

# src/fallback/circuit_breaker.py

import time
import asyncio
from enum import Enum
import structlog

logger = structlog.get_logger()

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation, calls go through
    OPEN = "open"          # Failure detected, calls fail fast
    HALF_OPEN = "half_open" # Testing if service recovered

class CircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: int, name: str):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.name = name

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    @property
    def is_open(self) -> bool:
        """Check if circuit is open (failing fast)"""
        if self.state == CircuitState.OPEN:
            # Check if it's time to try again
            if time.time() - self.last_failure_time > self.recovery_timeout:
                logger.info("circuit_transition", 
                           breaker=self.name, 
                           from_state="open", 
                           to_state="half_open")
                self.state = CircuitState.HALF_OPEN
                return False
            return True
        return False

    async def call(self, func, *args, **kwargs):
        """Execute the function with circuit breaker protection"""

        if self.is_open:
            raise Exception(f"Circuit breaker '{self.name}' is open")

        try:
            result = await func(*args, **kwargs)

            # Check if the result indicates success
            if hasattr(result, 'success') and not result.success:
                raise Exception(result.error)

            # Success! Close the circuit if we were probing in HALF_OPEN;
            # otherwise clear the count so only consecutive failures trip it
            if self.state == CircuitState.HALF_OPEN:
                self._reset()
            else:
                self.failure_count = 0

            return result

        except Exception:
            self._record_failure()
            raise

    def _record_failure(self):
        """Record a failure and potentially open the circuit"""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            if self.state != CircuitState.OPEN:
                logger.warning("circuit_opened",
                              breaker=self.name,
                              failure_count=self.failure_count)
                self.state = CircuitState.OPEN

    def _reset(self):
        """Reset the circuit to closed state"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        logger.info("circuit_closed", breaker=self.name)
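
To watch the three states in action without waiting out a real recovery timeout, it helps to inject the clock. The sketch below is a compact, standalone re-implementation for illustration, not the class above. `MiniBreaker` and its `clock` parameter are hypothetical additions made for testability.

```python
import time
from typing import Callable, Optional

class MiniBreaker:
    """Simplified circuit breaker whose notion of time can be swapped out."""

    def __init__(self, failure_threshold: int, recovery_timeout: float,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.recovery_timeout:
            return "half_open"  # timeout elapsed, allow a trial request
        return "open"

    def allow_request(self) -> bool:
        return self.state != "open"

    def record_success(self) -> None:
        # Any success closes the circuit and clears the failure count
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        # A failed trial request, or too many failures, (re)opens the circuit
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.opened_at = self.clock()

if __name__ == "__main__":
    now = [0.0]  # fake clock we can advance by hand
    breaker = MiniBreaker(failure_threshold=3, recovery_timeout=60, clock=lambda: now[0])
    for _ in range(3):
        breaker.record_failure()
    print(breaker.state)   # open
    now[0] = 61.0
    print(breaker.state)   # half_open
    breaker.record_success()
    print(breaker.state)   # closed
```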

Part 5: Caching for Speed and Resilience

Redis-Powered Response Cache

Caching serves two purposes in a fault-tolerant chatbot:

  1. Speed – Common questions get instant responses
  2. Resilience – During outages, serve cached answers

# src/cache/redis_cache.py

import redis.asyncio as redis
import json
from typing import Optional
from src.providers.base import ProviderResponse
import structlog

logger = structlog.get_logger()

class ResponseCache:
    def __init__(self, redis_url: str, ttl_seconds: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl_seconds

    async def get(self, key: str) -> Optional[ProviderResponse]:
        """Get cached response if available"""
        try:
            cached = await self.redis.get(key)
            if cached:
                data = json.loads(cached)
                logger.debug("cache_hit", key=key)
                return ProviderResponse(**data)
        except Exception as e:
            logger.warning("cache_get_failed", error=str(e))
        return None

    async def set(self, key: str, response: ProviderResponse):
        """Cache a response"""
        try:
            await self.redis.setex(
                key,
                self.ttl,
                response.model_dump_json()  # Pydantic v2 replacement for json.dumps(response.dict())
            )
            logger.debug("cache_set", key=key)
        except Exception as e:
            logger.warning("cache_set_failed", error=str(e))

    async def invalidate_pattern(self, pattern: str):
        """Invalidate all keys matching a pattern"""
        try:
            keys = await self.redis.keys(pattern)
            if keys:
                await self.redis.delete(*keys)
                logger.info("cache_invalidated", count=len(keys))
        except Exception as e:
            logger.warning("cache_invalidate_failed", error=str(e))
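
With a single TTL, cached answers disappear after an hour, so they cannot help during a longer outage. One way to serve the resilience goal is to keep two windows: a short one for normal traffic and a longer one that is honored only when every provider is down. The in-memory sketch below illustrates the idea. `StaleTolerantCache` and its parameters are hypothetical and are not part of the Redis class above.

```python
import time
from typing import Callable, Dict, Optional, Tuple

class StaleTolerantCache:
    """Cache with a fresh window for normal use and a stale window for outages."""

    def __init__(self, fresh_ttl: float, stale_ttl: float,
                 clock: Callable[[], float] = time.monotonic):
        self.fresh_ttl = fresh_ttl
        self.stale_ttl = stale_ttl
        self.clock = clock
        self._store: Dict[str, Tuple[float, str]] = {}

    def set(self, key: str, value: str) -> None:
        self._store[key] = (self.clock(), value)

    def get(self, key: str, allow_stale: bool = False) -> Optional[str]:
        entry = self._store.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        age = self.clock() - stored_at
        limit = self.stale_ttl if allow_stale else self.fresh_ttl
        if age <= limit:
            return value
        if age > self.stale_ttl:
            del self._store[key]  # too old even for outage use
        return None

if __name__ == "__main__":
    now = [0.0]
    cache = StaleTolerantCache(fresh_ttl=3600, stale_ttl=86400, clock=lambda: now[0])
    cache.set("faq:refunds", "Refunds take 5 business days.")
    now[0] = 4000.0  # past the fresh window
    print(cache.get("faq:refunds"))                    # None
    print(cache.get("faq:refunds", allow_stale=True))  # the stored answer
```

A router could call `get(key)` first and retry with `allow_stale=True` only after every provider has failed.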

Part 6: The Local LLM – Your Last Line of Defense

Running a Local Model with Ollama

The most underrated fallback is a local model. It isn’t subject to third-party API outages, rate limits, or internet connectivity problems. It’s slower and less capable, but it runs on hardware you control.

First, install Ollama and pull a model:

curl -fsSL https://ollama.com/install.sh | sh
ollama pull llama3:8b

Now implement the provider:

# src/providers/local.py

import aiohttp
import time
from typing import List
from .base import AIProvider, ChatMessage, ProviderResponse
import structlog

logger = structlog.get_logger()

class LocalLLMProvider(AIProvider):
    def __init__(self, ollama_url: str, model_name: str, timeout: int = 10):
        super().__init__(name="local", model=model_name, timeout=timeout)
        self.ollama_url = ollama_url
        self.model_name = model_name

    async def chat(self, messages: List[ChatMessage], **kwargs) -> ProviderResponse:
        start_time = time.time()

        try:
            # Convert conversation to prompt format
            prompt = self._format_prompt(messages)

            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.ollama_url}/api/generate",
                    json={
                        "model": self.model_name,
                        "prompt": prompt,
                        "stream": False,
                        "options": {
                            "num_predict": kwargs.get("max_tokens", 500),
                            "temperature": kwargs.get("temperature", 0.7)
                        }
                    },
                    timeout=aiohttp.ClientTimeout(total=self.timeout)
                ) as response:
                    # Treat HTTP errors as failures instead of parsing them as replies
                    response.raise_for_status()
                    result = await response.json()

                    latency_ms = (time.time() - start_time) * 1000

                    return ProviderResponse(
                        content=result.get("response", ""),
                        provider_name=self.name,
                        model_name=self.model_name,
                        latency_ms=latency_ms,
                        tokens_used=result.get("eval_count", 0),
                        success=True
                    )

        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000
            logger.error("local_llm_failed", error=str(e))

            return ProviderResponse(
                content="",
                provider_name=self.name,
                model_name=self.model_name,
                latency_ms=latency_ms,
                success=False,
                error=str(e)
            )

    def _format_prompt(self, messages: List[ChatMessage]) -> str:
        """Convert conversation to a single prompt string"""
        prompt_parts = []
        for msg in messages:
            if msg.role == "system":
                prompt_parts.append(f"System: {msg.content}")
            elif msg.role == "user":
                prompt_parts.append(f"User: {msg.content}")
            elif msg.role == "assistant":
                prompt_parts.append(f"Assistant: {msg.content}")

        prompt_parts.append("Assistant:")
        return "\n".join(prompt_parts)

    async def health_check(self) -> bool:
        """Check if Ollama is running and model is loaded"""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.ollama_url}/api/tags",
                    timeout=aiohttp.ClientTimeout(total=5)
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        models = [m["name"] for m in data.get("models", [])]
                        return self.model_name in models
        except Exception:
            return False
        return False
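
The `_format_prompt` helper flattens the conversation into one string. Ollama also exposes a chat endpoint, `/api/chat`, that accepts role-tagged messages directly and returns the reply under a `message` key. A sketch of the request body and reply extraction for that endpoint follows. The helper names `build_ollama_chat_payload` and `extract_reply` are hypothetical, and the defaults mirror the provider above.

```python
from typing import Any, Dict, List

def build_ollama_chat_payload(
    model: str,
    messages: List[Dict[str, str]],
    max_tokens: int = 500,
    temperature: float = 0.7,
) -> Dict[str, Any]:
    """Build the JSON body for a non-streaming POST to {ollama_url}/api/chat."""
    return {
        "model": model,
        "messages": [{"role": m["role"], "content": m["content"]} for m in messages],
        "stream": False,
        "options": {"num_predict": max_tokens, "temperature": temperature},
    }

def extract_reply(result: Dict[str, Any]) -> str:
    """Pull the assistant text out of a chat response, or '' if it is missing."""
    return result.get("message", {}).get("content", "")

if __name__ == "__main__":
    body = build_ollama_chat_payload(
        "llama3:8b", [{"role": "user", "content": "Hello"}]
    )
    print(body)
```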

Part 7: Putting It All Together – The Chatbot API

The Complete Chatbot Application

Now let’s wire everything together into a working FastAPI application:

# src/main.py

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import os
from dotenv import load_dotenv

from src.providers.claude import ClaudeProvider
from src.providers.gpt import GPTProvider
from src.providers.gemini import GeminiProvider  # not shown above; follows the same pattern as GPTProvider
from src.providers.local import LocalLLMProvider
from src.fallback.router import FallbackRouter
from src.cache.redis_cache import ResponseCache
from src.providers.base import ChatMessage  # ChatMessage is defined in src/providers/base.py

load_dotenv()

app = FastAPI(title="Fault-Tolerant AI Chatbot")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize providers in priority order
providers = []

# Primary: Claude
if os.getenv("ANTHROPIC_API_KEY"):
    providers.append(ClaudeProvider(api_key=os.getenv("ANTHROPIC_API_KEY")))

# First fallback: GPT
if os.getenv("OPENAI_API_KEY"):
    providers.append(GPTProvider(api_key=os.getenv("OPENAI_API_KEY")))

# Second fallback: Gemini
if os.getenv("GOOGLE_API_KEY"):
    providers.append(GeminiProvider(api_key=os.getenv("GOOGLE_API_KEY")))

# Last resort: Local LLM
if os.getenv("ENABLE_FALLBACK_LOCAL") == "true":
    providers.append(LocalLLMProvider(
        ollama_url=os.getenv("OLLAMA_URL", "http://localhost:11434"),
        model_name=os.getenv("LOCAL_MODEL_NAME", "llama3:8b")
    ))

# Initialize cache
cache = ResponseCache(redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"))

# Create router
router = FallbackRouter(providers=providers, cache=cache)

class ChatRequest(BaseModel):
    messages: List[dict]  # [{"role": "user", "content": "Hello"}]
    max_tokens: Optional[int] = 1000
    temperature: Optional[float] = 0.7

class ChatResponse(BaseModel):
    content: str
    provider_used: str
    model_used: str
    latency_ms: float
    cached: bool = False

@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    """Main chat endpoint with automatic failover"""

    # Convert request to our internal format
    messages = [
        ChatMessage(role=msg["role"], content=msg["content"])
        for msg in request.messages
    ]

    # Let the router handle everything
    response = await router.chat(
        messages,
        max_tokens=request.max_tokens,
        temperature=request.temperature
    )

    if not response.success:
        # Surface the router's friendly message rather than the raw internal error
        raise HTTPException(status_code=503, detail=response.content)

    return ChatResponse(
        content=response.content,
        provider_used=response.provider_name,
        model_used=response.model_name,
        latency_ms=response.latency_ms,
        cached=response.provider_name == "cache"
    )

@app.get("/health")
async def health_check():
    """Health check endpoint for load balancers"""
    healthy_providers = []
    for provider in providers:
        is_healthy = await provider.health_check()
        if is_healthy:
            healthy_providers.append(provider.name)

    return {
        "status": "healthy" if healthy_providers else "degraded",
        "available_providers": healthy_providers,
        "total_providers": len(providers)
    }

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint (simplified)"""
    return {
        "circuit_breaker_states": {
            name: cb.state.value 
            for name, cb in router.circuit_breakers.items()
        }
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
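
The /health endpoint above probes providers only when something asks. To detect outages proactively, a background task can run the same checks on a schedule and keep the latest results in memory. A minimal sketch using only asyncio follows. The `probe_all` and `monitor` functions and the 30-second interval are hypothetical choices.

```python
import asyncio
from typing import Awaitable, Callable, Dict

HealthCheck = Callable[[], Awaitable[bool]]

async def probe_all(checks: Dict[str, HealthCheck], timeout: float = 5.0) -> Dict[str, bool]:
    """Run every health check concurrently; a timeout or exception counts as unhealthy."""

    async def probe(check: HealthCheck) -> bool:
        try:
            return bool(await asyncio.wait_for(check(), timeout=timeout))
        except Exception:
            return False

    results = await asyncio.gather(*(probe(check) for check in checks.values()))
    return dict(zip(checks.keys(), results))

async def monitor(checks: Dict[str, HealthCheck], status: Dict[str, bool],
                  interval: float = 30.0) -> None:
    """Refresh `status` in place until cancelled; intended to run as a background task."""
    while True:
        status.update(await probe_all(checks))
        await asyncio.sleep(interval)

if __name__ == "__main__":
    async def always_up() -> bool:
        return True

    async def always_down() -> bool:
        raise RuntimeError("simulated outage")

    print(asyncio.run(probe_all({"up": always_up, "down": always_down})))
```

Each provider's bound `health_check` method already matches the `HealthCheck` shape, so a mapping such as `{p.name: p.health_check for p in providers}` could be passed in, with `monitor` started via `asyncio.create_task` when the application starts.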

Part 8: Testing Your Fault-Tolerant Chatbot

The Chaos Engineering Approach

You don’t know if your fallbacks work until you test them. Here’s how to simulate failures:

# tests/test_fallback.py

import pytest
from unittest.mock import patch, AsyncMock
from src.fallback.router import FallbackRouter
from src.providers.base import AIProvider, ProviderResponse

class FailingProvider(AIProvider):
    """A provider that always fails"""
    async def chat(self, messages, **kwargs):
        return ProviderResponse(
            content="", provider_name=self.name,
            model_name=self.model, latency_ms=0,
            success=False, error="Simulated failure"
        )

    async def health_check(self):
        return False

class WorkingProvider(AIProvider):
    """A provider that always works"""
    async def chat(self, messages, **kwargs):
        return ProviderResponse(
            content="I'm working!",
            provider_name=self.name,
            model_name=self.model,
            latency_ms=10,
            success=True
        )

    async def health_check(self):
        return True

@pytest.mark.asyncio
async def test_fallback_when_primary_fails():
    # Primary fails, secondary works
    providers = [
        FailingProvider(name="failing", model="test"),
        WorkingProvider(name="working", model="test")
    ]

    router = FallbackRouter(providers=providers)
    response = await router.chat([])

    assert response.success
    assert response.provider_name == "working"
    assert response.content == "I'm working!"

@pytest.mark.asyncio
async def test_circuit_breaker_opens_after_failures():
    failing = FailingProvider(name="failing", model="test")
    router = FallbackRouter(providers=[failing])

    # Make multiple failing calls
    for _ in range(6):  # More than threshold of 5
        response = await router.chat([])
        assert not response.success

    # Circuit should now be open
    cb = router.circuit_breakers["failing"]
    assert cb.state.value == "open"

Manual Testing Script

Run this to see your chatbot gracefully degrade:

# test_manual.py

import asyncio
import os
from dotenv import load_dotenv
from src.providers.claude import ClaudeProvider
from src.fallback.router import FallbackRouter
from src.providers.base import ChatMessage, ProviderResponse

load_dotenv()  # pick up ANTHROPIC_API_KEY from .env

async def test_fault_tolerance():
    print("Testing fault-tolerant chatbot...\n")

    # Initialize with only Claude (for testing failures)
    providers = [
        ClaudeProvider(api_key=os.getenv("ANTHROPIC_API_KEY"))
    ]

    router = FallbackRouter(providers=providers)

    messages = [ChatMessage(role="user", content="What's the capital of France?")]

    # Normal call
    print("Test 1: Normal operation")
    response = await router.chat(messages)
    print(f"✅ Provider: {response.provider_name}")
    print(f"✅ Response: {response.content[:100]}...\n")

    # Simulate failure by monkey patching (evil but effective for testing)
    print("Test 2: Simulating Claude failure...")
    original_method = providers[0].chat

    async def failing_chat(*args, **kwargs):
        return ProviderResponse(
            content="", provider_name="claude",
            model_name="test", latency_ms=0,
            success=False, error="Simulated outage"
        )

    providers[0].chat = failing_chat

    # This should now fail gracefully
    response = await router.chat(messages)
    if not response.success:
        print("✅ Graceful failure message returned")
        print(f"   Message: {response.content}\n")

    # Restore original method
    providers[0].chat = original_method

    print("Test complete! Your chatbot handles failures gracefully.")

if __name__ == "__main__":
    asyncio.run(test_fault_tolerance())

Part 9: Deployment and Monitoring

Docker Configuration

Make your chatbot portable with Docker:

# docker-compose.yml

version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  ollama:
    image: ollama/ollama:latest
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama
    command: serve

  chatbot:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - OLLAMA_URL=http://ollama:11434
    env_file:
      - .env
    depends_on:
      - redis
      - ollama
    restart: unless-stopped

volumes:
  ollama_data:

Production Monitoring with Prometheus

Add these metrics to track fallback effectiveness:

# Add to main.py

from fastapi import Response
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST

chat_requests = Counter('chat_requests_total', 'Total chat requests')
chat_fallbacks = Counter('chat_fallbacks_total', 'Requests that required fallback')
provider_usage = Counter('provider_usage_total', 'Usage by provider', ['provider'])
response_latency = Histogram('response_latency_seconds', 'Response latency')

@app.get("/prometheus")
async def prometheus_metrics():
    # Serve metrics in the text exposition format Prometheus scrapes
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

Conclusion: Your Chatbot Will Survive

You’ve just built something remarkable. Not just a chatbot – a fault-tolerant system that gracefully handles failure, intelligently falls back, and keeps providing value even when things go wrong.

Let’s recap what you’ve created:

  • Primary provider (Claude) for best-in-class responses
  • Two cloud fallbacks (GPT, Gemini) for redundancy
  • Local LLM fallback for total independence
  • Intelligent caching for speed and offline resilience
  • Circuit breakers to prevent cascade failures
  • Health checks to proactively detect issues
  • Proper monitoring to understand what’s happening