Building a search API with Elasticsearch requires careful consideration of data modeling, query construction, and response handling. In this article, we’ll explore how to create a robust search API using Python and FastAPI, focusing on best practices and performance optimization.
Introduction
Modern applications often require sophisticated search capabilities. By combining Elasticsearch with Python’s FastAPI framework, we can create powerful search APIs that deliver fast, relevant results. We’ll cover the implementation using Python’s ecosystem of tools and libraries.
Project Setup
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install runtime dependencies
# - "elasticsearch[async]" pulls in the HTTP client that AsyncElasticsearch needs
# - "fastapi-cache2" provides the fastapi_cache package used in the caching section
pip install fastapi uvicorn "elasticsearch[async]" python-dotenv redis fastapi-cache2
# Install test dependencies (httpx is required by FastAPI's TestClient)
pip install pytest pytest-asyncio httpx
Basic API Structure
# main.py
import logging
import os
from typing import Optional, List, Dict, Any

from dotenv import load_dotenv
from elasticsearch import AsyncElasticsearch
from fastapi import Depends, FastAPI, HTTPException, Query
load_dotenv()

app = FastAPI()

# Elasticsearch client shared by every route. The URL and the basic-auth
# credentials are read from environment variables.
es = AsyncElasticsearch(
    os.environ.get("ELASTICSEARCH_URL"),
    basic_auth=(
        os.environ.get("ELASTICSEARCH_USERNAME"),
        os.environ.get("ELASTICSEARCH_PASSWORD"),
    ),
)
@app.get("/api/search")
async def search(
    query: str,
    page: int = Query(default=1, ge=1),
    size: int = Query(default=10, ge=1, le=100),
):
    """Run a paginated full-text search over the ``products`` index.

    Args:
        query: Text matched against ``title`` (boosted via ``title^3``),
            ``description`` and ``category``.
        page: 1-based page number. Values below 1 are rejected during
            request validation, so a negative offset never reaches
            Elasticsearch.
        size: Number of hits per page, restricted to 1-100 (the same bounds
            ``SearchParams`` uses).

    Returns:
        A dict with the raw hits under ``results``, the reported hit count
        under ``total``, and the echoed ``page`` and ``size``.

    Raises:
        HTTPException: Status 500 carrying the error text when the
            Elasticsearch call or the response unpacking fails.
    """
    try:
        result = await es.search(
            index="products",
            # Elasticsearch offsets are 0-based; pages are 1-based.
            from_=(page - 1) * size,
            size=size,
            query={
                "multi_match": {
                    "query": query,
                    "fields": ["title^3", "description", "category"],
                }
            },
        )
        return {
            "results": result["hits"]["hits"],
            "total": result["hits"]["total"]["value"],
            "page": page,
            "size": size,
        }
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # uvicorn is only needed when this module is started directly as a script.
    import uvicorn

    # Listen on all interfaces, port 3000.
    uvicorn.run(app, port=3000, host="0.0.0.0")
Advanced Search Features
Faceted Search
@app.get("/api/search/faceted")
async def faceted_search(
    query: str,
    filters: Optional[Dict[str, str]] = None
):
    """Search ``products`` and return the hits together with facet data.

    Args:
        query: Text matched against ``title`` (boosted via ``title^3``) and
            ``description``.
        filters: Optional mapping of field name to value; every pair becomes
            one ``term`` clause in the bool query's ``filter`` list.

    Returns:
        A dict with the raw hits under ``results`` and the aggregation
        results (``categories`` terms and three ``price_ranges`` buckets)
        under ``facets``.

    Raises:
        HTTPException: Status 500 carrying the error text on any failure.

    NOTE(review): FastAPI normally reads a dict-typed parameter from the
    request body, which is unusual for a GET route - confirm how clients are
    expected to send ``filters``.
    """
    try:
        text_clause = {
            "multi_match": {
                "query": query,
                "fields": ["title^3", "description"],
            }
        }
        search_query = {"bool": {"must": [text_clause]}}

        # The "filter" key is only added when at least one filter was given.
        if filters:
            term_clauses = []
            for field_name, wanted in filters.items():
                term_clauses.append({"term": {field_name: wanted}})
            search_query["bool"]["filter"] = term_clauses

        price_buckets = [
            {"to": 50},
            {"from": 50, "to": 100},
            {"from": 100},
        ]
        facet_aggs = {
            "categories": {"terms": {"field": "category"}},
            "price_ranges": {
                "range": {"field": "price", "ranges": price_buckets}
            },
        }

        response = await es.search(
            index="products",
            query=search_query,
            aggs=facet_aggs,
        )
        return {
            "results": response["hits"]["hits"],
            "facets": response["aggregations"],
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
Autocomplete
@app.get("/api/autocomplete")
async def autocomplete(prefix: str):
    """Return up to five completion suggestions for ``prefix``.

    Sends a completion suggester named ``product-suggest`` against the
    ``suggest`` field of the ``products`` index.

    Args:
        prefix: The text typed so far.

    Returns:
        A dict whose ``suggestions`` key holds the ``options`` list of the
        first ``product-suggest`` entry in the response.

    Raises:
        HTTPException: Status 500 carrying the error text on any failure.
    """
    try:
        suggester = {
            "product-suggest": {
                "prefix": prefix,
                "completion": {"field": "suggest", "size": 5},
            }
        }
        response = await es.search(index="products", suggest=suggester)
        first_entry = response["suggest"]["product-suggest"][0]
        return {"suggestions": first_entry["options"]}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
Error Handling and Validation
Input Validation
from pydantic import BaseModel, Field
# Validated search parameters:
#   query - required search text
#   page  - 1-based page number, must be >= 1 (default 1)
#   size  - hits per page, must be between 1 and 100 (default 10)
class SearchParams(BaseModel):
    query: str
    page: int = Field(1, ge=1)
    size: int = Field(10, ge=1, le=100)
@app.get("/api/search/validated")
async def validated_search(params: SearchParams = Depends()):
    """Run a paginated search using parameters validated by ``SearchParams``.

    ``Depends()`` makes FastAPI build ``SearchParams`` from the query string
    (``?query=...&page=...&size=...``). Without it, a Pydantic model
    parameter is read from the request body, which does not fit a GET route.

    NOTE(review): depending on the FastAPI/Pydantic versions in use, a
    constraint violation raised while building ``SearchParams`` may not be
    reported as a 422 - confirm against the deployed versions.

    Args:
        params: Search text plus the bounded ``page`` and ``size`` values.

    Returns:
        A dict with the raw hits under ``results``, the reported hit count
        under ``total``, and the echoed ``page`` and ``size``.

    Raises:
        HTTPException: Status 500 carrying the error text when the
            Elasticsearch call or the response unpacking fails.
    """
    try:
        result = await es.search(
            index="products",
            # Elasticsearch offsets are 0-based; pages are 1-based.
            from_=(params.page - 1) * params.size,
            size=params.size,
            query={
                "multi_match": {
                    "query": params.query,
                    "fields": ["title^3", "description", "category"],
                }
            },
        )
        return {
            "results": result["hits"]["hits"],
            "total": result["hits"]["total"]["value"],
            "page": params.page,
            "size": params.size,
        }
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
Error Handling
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# CORS: any origin may call the API, but without credentials.
# A wildcard origin must not be combined with allow_credentials=True; to
# accept cookies or HTTP auth from browsers, replace "*" with an explicit
# list of trusted origins and only then enable credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Turn any otherwise unhandled exception into a JSON 500 response.

    The full error is written to the server log; the client only receives a
    generic message so internal details (hosts, queries, stack information)
    are not exposed.

    Args:
        request: The request that was being processed.
        exc: The exception that was not handled elsewhere.

    Returns:
        A ``JSONResponse`` with status 500 and an ``error`` message.
    """
    logging.getLogger(__name__).error(
        "Unhandled error while processing %s %s",
        request.method,
        request.url.path,
        exc_info=exc,
    )
    return JSONResponse(
        status_code=500,
        content={"error": "Internal server error"},
    )
Performance Optimization
Caching
import redis
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
# Initialize Redis
# The cache backend awaits every Redis call, so it needs the asyncio client
# from redis-py rather than the blocking redis.Redis client.
from redis import asyncio as aioredis

redis_client = aioredis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", 6379)),
    db=0,
)


@app.on_event("startup")
async def startup():
    """Register the Redis-backed cache when the application starts."""
    FastAPICache.init(RedisBackend(redis_client))
@app.get("/api/search/cached")
@cache(expire=3600)
async def cached_search(
    query: str,
    page: int = Query(default=1, ge=1),
    size: int = Query(default=10, ge=1, le=100),
):
    """Paginated product search whose responses are cached (``expire=3600``).

    Args:
        query: Text matched against ``title`` (boosted via ``title^3``),
            ``description`` and ``category``.
        page: 1-based page number. Values below 1 are rejected during
            request validation.
        size: Number of hits per page, restricted to 1-100 (the same bounds
            ``SearchParams`` uses), which also bounds the size of each
            cached entry.

    Returns:
        A dict with the raw hits under ``results``, the reported hit count
        under ``total``, and the echoed ``page`` and ``size``.

    Raises:
        HTTPException: Status 500 carrying the error text when the
            Elasticsearch call or the response unpacking fails.
    """
    try:
        result = await es.search(
            index="products",
            # Elasticsearch offsets are 0-based; pages are 1-based.
            from_=(page - 1) * size,
            size=size,
            query={
                "multi_match": {
                    "query": query,
                    "fields": ["title^3", "description", "category"],
                }
            },
        )
        return {
            "results": result["hits"]["hits"],
            "total": result["hits"]["total"]["value"],
            "page": page,
            "size": size,
        }
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
Query Optimization
def optimize_query(query: str) -> Dict[str, Any]:
    """Build a bool query that prefers exact phrase hits in the title.

    Two ``should`` clauses are combined, at least one of which must match:
    a ``match_phrase`` on ``title`` with boost 3 and a plain ``match`` on
    ``description``.

    Args:
        query: The user's search text, used unchanged in both clauses.

    Returns:
        A dict with a single top-level ``query`` key wrapping the bool query.
    """
    title_phrase = {"match_phrase": {"title": {"query": query, "boost": 3}}}
    description_match = {"match": {"description": query}}
    bool_clause = {
        "should": [title_phrase, description_match],
        "minimum_should_match": 1,
    }
    return {"query": {"bool": bool_clause}}
Testing
Unit Tests
# test_search.py
import pytest
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)


def test_search_endpoint():
    """The basic search route answers 200 with ``results`` (a list) and ``total``."""
    resp = client.get("/api/search?query=laptop")
    assert resp.status_code == 200

    payload = resp.json()
    assert "results" in payload
    assert "total" in payload
    assert isinstance(payload["results"], list)
Integration Tests
# test_integration.py
import pytest
from elasticsearch import AsyncElasticsearch
from main import app, es
@pytest.mark.asyncio
async def test_search_integration():
    """A ``match_all`` search on ``products`` returns a ``hits`` section with a ``total``."""
    # Test with actual Elasticsearch connection
    response = await es.search(query={"match_all": {}}, index="products")

    assert "hits" in response
    hits_section = response["hits"]
    assert "total" in hits_section
Deployment Considerations
Environment Configuration
# .env file
ELASTICSEARCH_URL=http://localhost:9200
ELASTICSEARCH_USERNAME=admin
ELASTICSEARCH_PASSWORD=secret
REDIS_HOST=localhost
REDIS_PORT=6379
Docker Configuration
# Dockerfile
# Container image that serves the search API with uvicorn on port 3000.
FROM python:3.9-slim
WORKDIR /app
# Install dependencies from requirements.txt before copying the application code.
# NOTE(review): requirements.txt is not shown in the visible part of this guide -
# confirm it exists in the build context and lists the packages installed during setup.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# NOTE(review): this copies the whole build context; confirm that local files such
# as .env are excluded (e.g. via .dockerignore) if they must not end up in the image.
COPY . .
# Same port that main.py passes to uvicorn.run().
EXPOSE 3000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "3000"]
Best Practices
- API Design
  - Use FastAPI’s built-in validation
  - Implement proper error handling
  - Add request validation with Pydantic
  - Document endpoints with OpenAPI
- Performance
  - Implement Redis caching
  - Optimize Elasticsearch queries
  - Use async/await for I/O operations
  - Monitor response times
- Security
  - Validate inputs with Pydantic
  - Implement rate limiting
  - Use proper authentication
  - Sanitize outputs
Common Issues and Solutions
Performance Issues
- Implement Redis caching
- Optimize query structure
- Use appropriate indexes
- Monitor resource usage
Scalability
- Use connection pooling
- Implement load balancing
- Monitor cluster health
- Plan for horizontal scaling
Next Steps
After building your search API:
- Add authentication with JWT
- Implement monitoring with Prometheus
- Add more search features
- Optimize for production
Conclusion
Building a search API with Elasticsearch and Python requires:
- Proper project structure with FastAPI
- Efficient query handling
- Robust error management
- Performance optimization
Remember to:
- Follow best practices
- Test thoroughly
- Monitor performance
- Plan for scale
Stay tuned for our next article on advanced Elasticsearch features and optimizations.