The Chatbot That Never Says “I’m Sorry, I’m Having Trouble Right Now”
Imagine this scenario.
You’ve just launched your AI-powered customer support chatbot. It’s smart. It’s fast. It’s powered by Claude 3.5 Sonnet, and your beta testers love it. The launch day goes perfectly. Users are delighted. Your boss is thrilled. You’re already planning the celebration.
Then June 2nd happens.
At 11:34 AM, your monitoring dashboard explodes. Error rates spike to 100%. Customer support tickets flood in. “The chatbot isn’t working.” “I’m getting error messages.” “Why is your AI broken?”
You scramble to figure out what’s happening. You check your code – it’s fine. You check your servers – they’re fine. You check Claude’s status page – and there it is. “Elevated errors affecting Claude Opus 4.6.”
Your chatbot is dead. Not because you did anything wrong. Not because your code is buggy. But because a service you depend on – a service you trusted – is having a bad day.
And here’s the painful truth: your users don’t care why the chatbot is broken. They only care that it’s broken.
This is the story of how to build a chatbot that never has to say “I’m sorry.” A chatbot that fails gracefully, degrades intelligently, and keeps working even when its primary brain takes an unexpected vacation.
This is the story of building a fault-tolerant AI chatbot in Python, with Claude as your star player and a reliable fallback ready to step in when the star needs a break.
Part 1: Why “Just Using Claude” Isn’t Good Enough
The Illusion of Reliability
Let’s start with a confession: I used to believe that relying on a single AI provider was perfectly fine. After all, these are massive companies with world-class infrastructure. They have redundancies. They have SLAs. They have teams of engineers whose entire job is keeping the lights on.
Then I watched three different production systems fail during the June 2nd outage.
The problem isn’t Claude. Claude is extraordinary. The problem is that every complex system eventually fails. Google has gone down. AWS has gone down. Cloudflare has gone down. And yes, Anthropic has gone down.
The June 2nd outage lasted nearly six hours. Six hours where thousands of businesses – from solo entrepreneurs to Fortune 500 companies – had AI-powered features that simply didn’t work.
But here’s what’s interesting. Some chatbots kept working. Not at full capacity, maybe. Not with Claude’s brilliance, certainly. But they kept responding. Kept helping users. Kept providing value.
Those chatbots had one thing in common: fallback strategies.
The Fallback Philosophy
A fallback isn’t a backup. A backup is something you restore after a failure. A fallback is something that seamlessly takes over during a failure.
Think of it like this: if your primary chef calls in sick, a backup means you close the restaurant for the day while you find a replacement. A fallback means your sous-chef immediately starts cooking, and your customers never even know there was a problem.
In AI terms, a fallback means:
- When Claude is unavailable, your chatbot automatically switches to another model
- When all cloud models are unavailable, it uses a tiny local model
- When even that fails, it serves cached responses to common questions
- When everything fails, it offers a graceful “we’ll get back to you” message instead of an error
The goal isn’t perfection. The goal is never showing an error message to your user.
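To make the tiers above concrete, here is a minimal sketch of a fallback chain using only the standard library. The handler functions and the `answer_with_fallbacks` name are hypothetical stand-ins rather than part of any SDK; real tiers would wrap Claude, another cloud model, a local model, and a cache.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

# Hypothetical handler type: takes the user's question, returns an answer or raises.
Handler = Callable[[str], Awaitable[str]]

GRACEFUL_MESSAGE = "We'll get back to you shortly."


async def answer_with_fallbacks(
    question: str, tiers: List[Tuple[str, Handler]]
) -> Tuple[str, str]:
    """Try each tier in priority order and return (tier_name, answer)."""
    for name, handler in tiers:
        try:
            return name, await handler(question)
        except Exception:
            # Any failure simply moves us down to the next tier.
            continue
    # Every tier failed: degrade to a friendly sentence, never a stack trace.
    return "graceful_message", GRACEFUL_MESSAGE


async def primary_model(question: str) -> str:
    raise ConnectionError("simulated outage")  # stand-in for the primary being down


async def secondary_model(question: str) -> str:
    return f"(secondary) answer to: {question}"


async def demo() -> None:
    tiers = [("primary", primary_model), ("secondary", secondary_model)]
    print(await answer_with_fallbacks("Where is my order?", tiers))
    # With only a failing tier, the user still gets a sentence, not an error.
    print(await answer_with_fallbacks("Where is my order?", [("primary", primary_model)]))


if __name__ == "__main__":
    asyncio.run(demo())
```

The rest of this guide builds a sturdier version of this loop, adding circuit breakers and caching around the same try-the-next-tier idea.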
What We’re Building
By the end of this guide, you’ll have built a production-ready, fault-tolerant AI chatbot with:
- Claude 3.5 Sonnet as your primary model (smart, expressive, excellent for most tasks)
- GPT-4 Turbo as your first fallback (still very capable, different strengths)
- Gemini 1.5 Pro as your second fallback (great for specific tasks, different pricing)
- A tiny local LLM as your third fallback (always available, never depends on APIs)
- Intelligent caching for common questions
- Circuit breakers to prevent cascade failures
- Health checks that proactively detect outages
- Proper logging and monitoring so you know exactly what’s happening
And we’ll build it all in Python, using modern libraries and patterns that won’t make your eyes glaze over.
Part 2: The Foundation – Setting Up Your Python Environment
Project Structure That Scales
Before we write a single line of chatbot logic, let’s set up a project structure that won’t collapse under its own weight. Create a directory and file structure like this:
fault_tolerant_chatbot/
├── .env                      # API keys (never commit this!)
├── requirements.txt          # Dependencies
├── docker-compose.yml        # For local development with Redis
├── src/
│   ├── __init__.py
│   ├── main.py               # FastAPI/Streamlit entry point
│   ├── providers/
│   │   ├── __init__.py
│   │   ├── base.py           # Abstract base class for all providers
│   │   ├── claude.py         # Claude implementation
│   │   ├── gpt.py            # OpenAI GPT implementation
│   │   ├── gemini.py         # Google Gemini implementation
│   │   └── local.py          # Local LLM (Ollama/transformers)
│   ├── fallback/
│   │   ├── __init__.py
│   │   ├── router.py         # Routes requests to available providers
│   │   ├── circuit_breaker.py
│   │   └── health.py
│   ├── cache/
│   │   ├── __init__.py
│   │   └── redis_cache.py
│   └── models/
│       ├── __init__.py
│       └── conversation.py   # Chat history, user sessions
├── tests/
│   ├── test_providers.py
│   └── test_fallback.py
└── README.md
Installing the Essentials
Your requirements.txt should include:
# Core AI providers
anthropic>=0.30.0
openai>=1.30.0
google-generativeai>=0.7.0
# Local LLM support
ollama>=0.3.0 # Or use transformers for fully local
# Resilience and reliability
tenacity>=8.2.0
circuitbreaker>=2.0.0
backoff>=2.2.0
# Caching and async (asyncio ships with Python, so it is not listed here)
redis>=5.0.0
aiohttp>=3.9.0
# Web framework (choose one)
fastapi>=0.110.0
uvicorn>=0.27.0
# OR streamlit>=1.32.0 for quick prototyping
# Monitoring and logging
structlog>=24.1.0
prometheus-client>=0.19.0
opentelemetry-api>=1.22.0
# Utilities
python-dotenv>=1.0.0
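pydantic>=2.6.0
# Testing (the tests in Part 8 use pytest with the asyncio plugin)
pytest>=8.0.0
pytest-asyncio>=0.23.0
Install everything with:
pip install -r requirements.txt
Environment Configuration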
Create your .env file (and add it to .gitignore immediately):
# API Keys
ANTHROPIC_API_KEY=sk-ant-...
OPENAI_API_KEY=sk-...
GOOGLE_API_KEY=AIza...
# Redis (for caching)
REDIS_URL=redis://localhost:6379
# Local LLM (Ollama)
OLLAMA_URL=http://localhost:11434
LOCAL_MODEL_NAME=llama3:8b
# Circuit breaker settings
CIRCUIT_BREAKER_FAILURE_THRESHOLD=5
CIRCUIT_BREAKER_RECOVERY_TIMEOUT=60
# Timeouts (seconds)
CLAUDE_TIMEOUT=30
GPT_TIMEOUT=25
GEMINI_TIMEOUT=20
LOCAL_TIMEOUT=10
# Feature flags
ENABLE_MULTI_PROVIDER=true
ENABLE_CACHING=true
ENABLE_FALLBACK_LOCAL=true
Part 3: The Provider Abstraction – Treating All AIs Equally
The Base Provider Class
The secret to fault tolerance is abstraction. If your code directly calls anthropic.Anthropic().messages.create(), you’re locked in. If instead you call provider.chat(prompt), you can swap providers without changing a single line of business logic.
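As a toy illustration of that point, the sketch below uses two hypothetical providers (`UppercaseProvider` and `ReverseProvider`, invented here purely for demonstration) behind a single `chat(prompt)` interface. The business function never learns which one it is talking to.

```python
import asyncio
from typing import Protocol


class ChatProvider(Protocol):
    """Anything with an async chat(prompt) method counts as a provider."""

    async def chat(self, prompt: str) -> str: ...


class UppercaseProvider:
    """Hypothetical stand-in for a real SDK-backed provider."""

    async def chat(self, prompt: str) -> str:
        return prompt.upper()


class ReverseProvider:
    """A second hypothetical provider with different behaviour."""

    async def chat(self, prompt: str) -> str:
        return prompt[::-1]


async def greet_customer(provider: ChatProvider, name: str) -> str:
    # Business logic depends only on the chat() interface, not on a vendor SDK.
    return await provider.chat(f"hello {name}")


if __name__ == "__main__":
    # Swapping providers is a one-argument change; greet_customer is untouched.
    print(asyncio.run(greet_customer(UppercaseProvider(), "ada")))
    print(asyncio.run(greet_customer(ReverseProvider(), "ada")))
```

The real interface needs richer message and response types than a bare string, which is what the base class provides.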
Here’s your abstract base class:
# src/providers/base.py
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Optional

import structlog
from pydantic import BaseModel, Field

logger = structlog.get_logger()

class ChatMessage(BaseModel):
    role: str  # 'user', 'assistant', 'system'
    content: str
    # default_factory stamps each message when it is created; a bare
    # datetime.now() default would be evaluated only once, at import time.
    timestamp: datetime = Field(default_factory=datetime.now)

class ProviderResponse(BaseModel):
    content: str
    provider_name: str
    model_name: str
    latency_ms: float
    tokens_used: Optional[int] = None
    success: bool = True
    error: Optional[str] = None

class AIProvider(ABC):
    """Abstract base class for all AI providers"""

    def __init__(self, name: str, model: str, timeout: int = 30):
        self.name = name
        self.model = model
        self.timeout = timeout
        self._healthy = True

    @abstractmethod
    async def chat(self, messages: List[ChatMessage], **kwargs) -> ProviderResponse:
        """Send a chat conversation to the provider and get a response"""
        pass

    @abstractmethod
    async def health_check(self) -> bool:
        """Check if the provider is currently available"""
        pass

    def mark_unhealthy(self, error: str):
        """Mark this provider as temporarily unhealthy"""
        self._healthy = False
        logger.warning("provider_marked_unhealthy",
                       provider=self.name,
                       error=error)

    def mark_healthy(self):
        """Mark this provider as healthy again"""
        self._healthy = True
        logger.info("provider_marked_healthy", provider=self.name)

    @property
    def is_healthy(self) -> bool:
        return self._healthy
Implementing Claude Provider
Now let’s implement the Claude provider. Notice how it follows the abstract interface exactly:
# src/providers/claude.py
import time
from typing import List

import anthropic
import structlog

from .base import AIProvider, ChatMessage, ProviderResponse

logger = structlog.get_logger()

class ClaudeProvider(AIProvider):
    def __init__(self, api_key: str, timeout: int = 30):
        # Model ID for Claude 3.5 Sonnet, the primary model named in this guide
        super().__init__(name="claude", model="claude-3-5-sonnet-20240620", timeout=timeout)
        self.client = anthropic.AsyncAnthropic(api_key=api_key, timeout=timeout)

    async def chat(self, messages: List[ChatMessage], **kwargs) -> ProviderResponse:
        start_time = time.time()
        try:
            # Convert our message format to Claude's format
            claude_messages = [
                {"role": msg.role, "content": msg.content}
                for msg in messages if msg.role != "system"
            ]
            # Extract system message if present
            system_message = next(
                (msg.content for msg in messages if msg.role == "system"),
                None
            )
            # Only send `system` when there is one; passing None is rejected
            request_kwargs = {}
            if system_message is not None:
                request_kwargs["system"] = system_message
            response = await self.client.messages.create(
                model=self.model,
                messages=claude_messages,
                max_tokens=kwargs.get("max_tokens", 1000),
                temperature=kwargs.get("temperature", 0.7),
                **request_kwargs
            )
            latency_ms = (time.time() - start_time) * 1000
            tokens_used = response.usage.input_tokens + response.usage.output_tokens
            logger.info("claude_response_success",
                        latency_ms=latency_ms,
                        tokens_used=tokens_used)
            return ProviderResponse(
                content=response.content[0].text,
                provider_name=self.name,
                model_name=self.model,
                latency_ms=latency_ms,
                tokens_used=tokens_used,
                success=True
            )
        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000
            logger.error("claude_response_failed",
                         error=str(e),
                         latency_ms=latency_ms)
            return ProviderResponse(
                content="",
                provider_name=self.name,
                model_name=self.model,
                latency_ms=latency_ms,
                success=False,
                error=str(e)
            )

    async def health_check(self) -> bool:
        """Quick health check using a minimal request"""
        try:
            # Use a tiny request to verify connectivity
            await self.client.messages.create(
                model=self.model,
                messages=[{"role": "user", "content": "Hi"}],
                max_tokens=5,
                timeout=5
            )
            return True
        except Exception:
            return False
Implementing GPT and Gemini Providers
The beauty of abstraction is that you can add providers without changing anything else. Here’s the GPT version; the Gemini provider (src/providers/gemini.py) follows the same pattern:
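# src/providers/gpt.py
import time
from typing import List

import openai
import structlog

from .base import AIProvider, ChatMessage, ProviderResponse

logger = structlog.get_logger()

class GPTProvider(AIProvider):
    def __init__(self, api_key: str, timeout: int = 25):
        super().__init__(name="gpt", model="gpt-4-turbo-preview", timeout=timeout)
        self.client = openai.AsyncOpenAI(api_key=api_key, timeout=timeout)

    async def chat(self, messages: List[ChatMessage], **kwargs) -> ProviderResponse:
        start_time = time.time()
        try:
            # OpenAI accepts system messages inline, so no extraction is needed
            gpt_messages = [
                {"role": msg.role, "content": msg.content}
                for msg in messages
            ]
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=gpt_messages,
                max_tokens=kwargs.get("max_tokens", 1000),
                temperature=kwargs.get("temperature", 0.7)
            )
            return ProviderResponse(
                content=response.choices[0].message.content or "",
                provider_name=self.name,
                model_name=self.model,
                latency_ms=(time.time() - start_time) * 1000,
                tokens_used=response.usage.total_tokens,
                success=True
            )
        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000
            logger.error("gpt_response_failed", error=str(e), latency_ms=latency_ms)
            return ProviderResponse(
                content="",
                provider_name=self.name,
                model_name=self.model,
                latency_ms=latency_ms,
                success=False,
                error=str(e)
            )

    async def health_check(self) -> bool:
        """Same approach as Claude: a tiny request with a short timeout"""
        try:
            await self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": "Hi"}],
                max_tokens=5,
                timeout=5
            )
            return True
        except Exception:
            return False
Part 4: The Fallback Router – The Heart of Fault Tolerance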
The Intelligent Router
Now for the magic. The fallback router is what makes your chatbot fault-tolerant. It tries providers in order, fails fast, and keeps trying until someone succeeds:
# src/fallback/router.py
import hashlib
from typing import List, Optional

import structlog

from src.providers.base import AIProvider, ChatMessage, ProviderResponse
from src.fallback.circuit_breaker import CircuitBreaker
from src.cache.redis_cache import ResponseCache

logger = structlog.get_logger()

class FallbackRouter:
    def __init__(self, providers: List[AIProvider], cache: Optional[ResponseCache] = None):
        self.providers = providers
        self.cache = cache
        self.circuit_breakers = {
            provider.name: CircuitBreaker(
                failure_threshold=5,
                recovery_timeout=60,
                name=provider.name
            )
            for provider in providers
        }

    async def chat(self, messages: List[ChatMessage], **kwargs) -> ProviderResponse:
        """Try providers in order until one succeeds"""
        # Check cache first
        cache_key = self._generate_cache_key(messages)
        if self.cache:
            cached_response = await self.cache.get(cache_key)
            if cached_response:
                logger.info("cache_hit", key=cache_key)
                # Label the hit so callers can tell it came from the cache
                return cached_response.model_copy(update={"provider_name": "cache"})
        # Try each provider in priority order
        for provider in self.providers:
            # Skip if circuit breaker is open
            cb = self.circuit_breakers[provider.name]
            if cb.is_open:
                logger.warning("circuit_breaker_open", provider=provider.name)
                continue
            try:
                # Attempt the call with circuit breaker protection
                response = await cb.call(provider.chat, messages, **kwargs)
                if response.success:
                    logger.info("provider_succeeded",
                                provider=provider.name,
                                latency_ms=response.latency_ms)
                    # Cache successful response
                    if self.cache:
                        await self.cache.set(cache_key, response)
                    return response
                else:
                    logger.warning("provider_failed",
                                   provider=provider.name,
                                   error=response.error)
            except Exception as e:
                # The circuit breaker has already recorded this failure
                logger.error("provider_exception",
                             provider=provider.name,
                             error=str(e))
        # All providers failed
        logger.error("all_providers_failed")
        return ProviderResponse(
            content="I'm having trouble connecting right now. Please try again in a moment.",
            provider_name="fallback",
            model_name="error_handler",
            latency_ms=0,
            success=False,
            error="All providers unavailable"
        )

    def _generate_cache_key(self, messages: List[ChatMessage]) -> str:
        """Create a cache key from the conversation"""
        conversation = "|".join([f"{m.role}:{m.content}" for m in messages])
        return hashlib.md5(conversation.encode()).hexdigest()
The Circuit Breaker Implementation
The circuit breaker prevents your system from repeatedly hammering a failed provider:
# src/fallback/circuit_breaker.py
import time
from enum import Enum
from typing import Optional

import structlog

logger = structlog.get_logger()

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation, calls go through
    OPEN = "open"            # Failure detected, calls fail fast
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    def __init__(self, failure_threshold: int, recovery_timeout: int, name: str):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.name = name
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None

    @property
    def is_open(self) -> bool:
        """Check if circuit is open (failing fast)"""
        if self.state == CircuitState.OPEN:
            # Check if it's time to try again
            if time.time() - self.last_failure_time > self.recovery_timeout:
                logger.info("circuit_transition",
                            breaker=self.name,
                            from_state="open",
                            to_state="half_open")
                self.state = CircuitState.HALF_OPEN
                return False
            return True
        return False

    async def call(self, func, *args, **kwargs):
        """Execute the function with circuit breaker protection"""
        if self.is_open:
            raise Exception(f"Circuit breaker '{self.name}' is open")
        try:
            result = await func(*args, **kwargs)
            # Providers report errors via success=False rather than raising
            if hasattr(result, 'success') and not result.success:
                raise Exception(result.error)
        except Exception:
            self._record_failure()
            raise
        if self.state == CircuitState.HALF_OPEN:
            # The trial call succeeded, so close the circuit again
            self._reset()
        else:
            # Count consecutive failures only: any success clears the tally
            self.failure_count = 0
        return result

    def _record_failure(self):
        """Record a failure and potentially open the circuit"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            if self.state != CircuitState.OPEN:
                logger.warning("circuit_opened",
                               breaker=self.name,
                               failure_count=self.failure_count)
            self.state = CircuitState.OPEN

    def _reset(self):
        """Reset the circuit to closed state"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        logger.info("circuit_closed", breaker=self.name)
Part 5: Caching for Speed and Resilience
Redis-Powered Response Cache
Caching serves two purposes in a fault-tolerant chatbot:
- Speed – Common questions get instant responses
- Resilience – During outages, serve cached answers
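Both purposes can be seen in a small in-memory sketch. This is a hypothetical variation, not the Redis class itself: `StaleTolerantCache` and its `allow_stale` flag are names invented here, and the clock is injectable only so the demo can fast-forward time.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class StaleTolerantCache:
    """In-memory cache: fresh entries for speed, stale entries for outages."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so a demo or test can control time
        self._store: Dict[str, Tuple[float, str]] = {}

    def set(self, key: str, value: str) -> None:
        self._store[key] = (self.clock(), value)

    def get(self, key: str, allow_stale: bool = False) -> Optional[str]:
        entry = self._store.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        is_fresh = self.clock() - stored_at <= self.ttl
        # Fresh entries are always served; stale ones only when the caller
        # opts in, for example because every provider is down.
        return value if (is_fresh or allow_stale) else None


if __name__ == "__main__":
    now = [0.0]
    cache = StaleTolerantCache(ttl_seconds=60, clock=lambda: now[0])
    cache.set("refund-policy", "Refunds take 5 days.")
    print(cache.get("refund-policy"))                    # fresh hit
    now[0] = 120.0                                       # two minutes later
    print(cache.get("refund-policy"))                    # expired
    print(cache.get("refund-policy", allow_stale=True))  # outage mode
```

One design difference worth noting: the Redis class that follows relies on key expiry, so an expired answer is gone rather than kept around for outages. Keeping stale entries, as sketched here, is a trade-off you would have to opt into deliberately.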
# src/cache/redis_cache.py
from typing import Optional

import redis.asyncio as redis
import structlog

from src.providers.base import ProviderResponse

logger = structlog.get_logger()

class ResponseCache:
    def __init__(self, redis_url: str, ttl_seconds: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl_seconds

    async def get(self, key: str) -> Optional[ProviderResponse]:
        """Get cached response if available"""
        try:
            cached = await self.redis.get(key)
            if cached:
                logger.debug("cache_hit", key=key)
                # Pydantic v2 parses the stored JSON straight into the model
                return ProviderResponse.model_validate_json(cached)
        except Exception as e:
            # A broken cache must never break the chat path
            logger.warning("cache_get_failed", error=str(e))
        return None

    async def set(self, key: str, response: ProviderResponse):
        """Cache a response"""
        try:
            await self.redis.setex(
                key,
                self.ttl,
                response.model_dump_json()
            )
            logger.debug("cache_set", key=key)
        except Exception as e:
            logger.warning("cache_set_failed", error=str(e))

    async def invalidate_pattern(self, pattern: str):
        """Invalidate all keys matching a pattern"""
        try:
            keys = await self.redis.keys(pattern)
            if keys:
                await self.redis.delete(*keys)
                logger.info("cache_invalidated", count=len(keys))
        except Exception as e:
            logger.warning("cache_invalidate_failed", error=str(e))
Part 6: The Local LLM – Your Last Line of Defense
Running a Local Model with Ollama
The most underrated fallback is a local model. It’s never subject to API outages, rate limits, or network issues. It’s slower and less capable, but it’s always there.
First, install Ollama and pull a model:
curl -fsSL https://ollama.com/install.sh | sh
ollama pull llama3:8b
Now implement the provider:
# src/providers/local.py
import time
from typing import List

import aiohttp
import structlog

from .base import AIProvider, ChatMessage, ProviderResponse

logger = structlog.get_logger()

class LocalLLMProvider(AIProvider):
    def __init__(self, ollama_url: str, model_name: str, timeout: int = 10):
        super().__init__(name="local", model=model_name, timeout=timeout)
        self.ollama_url = ollama_url
        self.model_name = model_name

    async def chat(self, messages: List[ChatMessage], **kwargs) -> ProviderResponse:
        start_time = time.time()
        try:
            # Convert conversation to prompt format
            prompt = self._format_prompt(messages)
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.ollama_url}/api/generate",
                    json={
                        "model": self.model_name,
                        "prompt": prompt,
                        "stream": False,
                        "options": {
                            "num_predict": kwargs.get("max_tokens", 500),
                            "temperature": kwargs.get("temperature", 0.7)
                        }
                    },
                    timeout=aiohttp.ClientTimeout(total=self.timeout)
                ) as response:
                    # Treat HTTP errors as failures instead of parsing them
                    response.raise_for_status()
                    result = await response.json()
            latency_ms = (time.time() - start_time) * 1000
            return ProviderResponse(
                content=result.get("response", ""),
                provider_name=self.name,
                model_name=self.model_name,
                latency_ms=latency_ms,
                tokens_used=result.get("eval_count", 0),
                success=True
            )
        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000
            logger.error("local_llm_failed", error=str(e))
            return ProviderResponse(
                content="",
                provider_name=self.name,
                model_name=self.model_name,
                latency_ms=latency_ms,
                success=False,
                error=str(e)
            )

    def _format_prompt(self, messages: List[ChatMessage]) -> str:
        """Convert conversation to a single prompt string"""
        prompt_parts = []
        for msg in messages:
            if msg.role == "system":
                prompt_parts.append(f"System: {msg.content}")
            elif msg.role == "user":
                prompt_parts.append(f"User: {msg.content}")
            elif msg.role == "assistant":
                prompt_parts.append(f"Assistant: {msg.content}")
        # Trailing cue so the model continues as the assistant
        prompt_parts.append("Assistant:")
        return "\n".join(prompt_parts)

    async def health_check(self) -> bool:
        """Check if Ollama is running and the model has been pulled"""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.ollama_url}/api/tags",
                    timeout=aiohttp.ClientTimeout(total=5)
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        models = [m["name"] for m in data.get("models", [])]
                        return self.model_name in models
        except Exception:
            return False
        return False
Part 7: Putting It All Together – The Chatbot API
The Complete Chatbot Application
Now let’s wire everything together into a working FastAPI application:
# src/main.py
import os
from typing import List, Optional

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from src.providers.base import ChatMessage  # ChatMessage is defined in base.py
from src.providers.claude import ClaudeProvider
from src.providers.gpt import GPTProvider
from src.providers.gemini import GeminiProvider  # same pattern as GPTProvider
from src.providers.local import LocalLLMProvider
from src.fallback.router import FallbackRouter
from src.cache.redis_cache import ResponseCache

load_dotenv()

app = FastAPI(title="Fault-Tolerant AI Chatbot")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize providers in priority order
providers = []
# Primary: Claude
if os.getenv("ANTHROPIC_API_KEY"):
    providers.append(ClaudeProvider(api_key=os.getenv("ANTHROPIC_API_KEY")))
# First fallback: GPT
if os.getenv("OPENAI_API_KEY"):
    providers.append(GPTProvider(api_key=os.getenv("OPENAI_API_KEY")))
# Second fallback: Gemini
if os.getenv("GOOGLE_API_KEY"):
    providers.append(GeminiProvider(api_key=os.getenv("GOOGLE_API_KEY")))
# Last resort: Local LLM
if os.getenv("ENABLE_FALLBACK_LOCAL") == "true":
    providers.append(LocalLLMProvider(
        ollama_url=os.getenv("OLLAMA_URL", "http://localhost:11434"),
        model_name=os.getenv("LOCAL_MODEL_NAME", "llama3:8b")
    ))

# Initialize cache
cache = ResponseCache(redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"))
# Create router
router = FallbackRouter(providers=providers, cache=cache)

class ChatRequest(BaseModel):
    messages: List[dict]  # [{"role": "user", "content": "Hello"}]
    max_tokens: Optional[int] = 1000
    temperature: Optional[float] = 0.7

class ChatResponse(BaseModel):
    content: str
    provider_used: str
    model_used: str
    latency_ms: float
    cached: bool = False

@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    """Main chat endpoint with automatic failover"""
    # Convert request to our internal format
    messages = [
        ChatMessage(role=msg["role"], content=msg["content"])
        for msg in request.messages
    ]
    # Let the router handle everything
    response = await router.chat(
        messages,
        max_tokens=request.max_tokens,
        temperature=request.temperature
    )
    if not response.success:
        raise HTTPException(status_code=503, detail=response.error)
    return ChatResponse(
        content=response.content,
        provider_used=response.provider_name,
        model_used=response.model_name,
        latency_ms=response.latency_ms,
        cached=response.provider_name == "cache"
    )

@app.get("/health")
async def health_check():
    """Health check endpoint for load balancers"""
    healthy_providers = []
    for provider in providers:
        is_healthy = await provider.health_check()
        if is_healthy:
            healthy_providers.append(provider.name)
    return {
        "status": "healthy" if healthy_providers else "degraded",
        "available_providers": healthy_providers,
        "total_providers": len(providers)
    }

@app.get("/metrics")
async def metrics():
    """Circuit breaker states as JSON (a simplified metrics view)"""
    return {
        "circuit_breaker_states": {
            name: cb.state.value
            for name, cb in router.circuit_breakers.items()
        }
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Part 8: Testing Your Fault-Tolerant Chatbot
The Chaos Engineering Approach
You don’t know if your fallbacks work until you test them. Here’s how to simulate failures:
# tests/test_fallback.py
import pytest

from src.fallback.router import FallbackRouter
from src.providers.base import AIProvider, ProviderResponse

class FailingProvider(AIProvider):
    """A provider that always fails"""

    async def chat(self, messages, **kwargs):
        return ProviderResponse(
            content="", provider_name=self.name,
            model_name=self.model, latency_ms=0,
            success=False, error="Simulated failure"
        )

    async def health_check(self):
        return False

class WorkingProvider(AIProvider):
    """A provider that always works"""

    async def chat(self, messages, **kwargs):
        return ProviderResponse(
            content="I'm working!",
            provider_name=self.name,
            model_name=self.model,
            latency_ms=10,
            success=True
        )

    async def health_check(self):
        return True

@pytest.mark.asyncio
async def test_fallback_when_primary_fails():
    # Primary fails, secondary works
    providers = [
        FailingProvider(name="failing", model="test"),
        WorkingProvider(name="working", model="test")
    ]
    router = FallbackRouter(providers=providers)
    response = await router.chat([])
    assert response.success
    assert response.provider_name == "working"
    assert response.content == "I'm working!"

@pytest.mark.asyncio
async def test_circuit_breaker_opens_after_failures():
    failing = FailingProvider(name="failing", model="test")
    router = FallbackRouter(providers=[failing])
    # Make multiple failing calls
    for _ in range(6):  # More than threshold of 5
        response = await router.chat([])
        assert not response.success
    # Circuit should now be open
    cb = router.circuit_breakers["failing"]
    assert cb.state.value == "open"
Manual Testing Script
Run this to see your chatbot gracefully degrade:
# test_manual.py
import asyncio
import os

from dotenv import load_dotenv

from src.providers.base import ChatMessage, ProviderResponse
from src.providers.claude import ClaudeProvider
from src.fallback.router import FallbackRouter

load_dotenv()  # pick up ANTHROPIC_API_KEY from .env

async def test_fault_tolerance():
    print("Testing fault-tolerant chatbot...\n")
    # Initialize with only Claude (for testing failures)
    providers = [
        ClaudeProvider(api_key=os.getenv("ANTHROPIC_API_KEY"))
    ]
    router = FallbackRouter(providers=providers)
    messages = [ChatMessage(role="user", content="What's the capital of France?")]

    # Normal call
    print("Test 1: Normal operation")
    response = await router.chat(messages)
    print(f"✅ Provider: {response.provider_name}")
    print(f"✅ Response: {response.content[:100]}...\n")

    # Simulate failure by monkey patching (evil but effective for testing)
    print("Test 2: Simulating Claude failure...")
    original_method = providers[0].chat

    async def failing_chat(*args, **kwargs):
        return ProviderResponse(
            content="", provider_name="claude",
            model_name="test", latency_ms=0,
            success=False, error="Simulated outage"
        )

    providers[0].chat = failing_chat
    # This should now fail gracefully
    response = await router.chat(messages)
    if not response.success:
        print("✅ Graceful failure message returned")
        print(f" Message: {response.content}\n")
    # Restore original method
    providers[0].chat = original_method
    print("Test complete! Your chatbot handles failures gracefully.")

if __name__ == "__main__":
    asyncio.run(test_fault_tolerance())
Part 9: Deployment and Monitoring
Docker Configuration
Make your chatbot portable with Docker:
# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  ollama:
    image: ollama/ollama:latest
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama
    command: serve
  chatbot:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - OLLAMA_URL=http://ollama:11434
    env_file:
      - .env
    depends_on:
      - redis
      - ollama
    restart: unless-stopped
volumes:
  ollama_data:
Production Monitoring with Prometheus
Add these metrics to track fallback effectiveness:
# Add to main.py
from fastapi import Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest

chat_requests = Counter('chat_requests_total', 'Total chat requests')
chat_fallbacks = Counter('chat_fallbacks_total', 'Requests that required fallback')
provider_usage = Counter('provider_usage_total', 'Usage by provider', ['provider'])
response_latency = Histogram('response_latency_seconds', 'Response latency')

@app.get("/prometheus")
async def prometheus_metrics():
    # Serve metrics in the text exposition format Prometheus scrapes
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
Conclusion: Your Chatbot Will Survive
You’ve just built something remarkable. Not just a chatbot – a fault-tolerant system that gracefully handles failure, intelligently falls back, and keeps providing value even when things go wrong.
Let’s recap what you’ve created:
- ✅ Primary provider (Claude) for best-in-class responses
- ✅ Two cloud fallbacks (GPT, Gemini) for redundancy
- ✅ Local LLM fallback for total independence
- ✅ Intelligent caching for speed and offline resilience
- ✅ Circuit breakers to prevent cascade failures
- ✅ Health checks to proactively detect issues
- ✅ Proper monitoring to understand what’s happening
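One item in that recap deserves a caveat: in the code above, health checks only run when something calls the /health endpoint. If you want them to be proactive, a background polling loop is one option. The sketch below is a minimal, standard-library illustration under stated assumptions: `monitor_health`, the `status` dictionary, and the `rounds` parameter are hypothetical names, and a real deployment would loop indefinitely and pass in each provider's `health_check` method as a probe.

```python
import asyncio
from typing import Awaitable, Callable, Dict

# Hypothetical probe type: an async callable that reports True when healthy.
HealthProbe = Callable[[], Awaitable[bool]]


async def monitor_health(
    probes: Dict[str, HealthProbe],
    status: Dict[str, bool],
    interval_seconds: float,
    rounds: int,
) -> None:
    """Poll each probe `rounds` times, recording the latest result in `status`."""
    for _ in range(rounds):
        for name, probe in probes.items():
            try:
                status[name] = await asyncio.wait_for(probe(), timeout=5)
            except Exception:
                # A probe that raises or times out counts as unhealthy.
                status[name] = False
        await asyncio.sleep(interval_seconds)


async def always_up() -> bool:
    return True


async def always_down() -> bool:
    raise ConnectionError("simulated outage")


if __name__ == "__main__":
    status: Dict[str, bool] = {}
    probes = {"up": always_up, "down": always_down}
    asyncio.run(monitor_health(probes, status, interval_seconds=0.01, rounds=2))
    print(status)
```

A router could consult a status map like this before attempting a provider, so a known-bad provider is skipped without spending a request on it.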


