Building a search API with Elasticsearch requires careful consideration of data modeling, query construction, and response handling. In this article, we’ll explore how to create a robust search API using Python and FastAPI, focusing on best practices and performance optimization.

Introduction

Modern applications often require sophisticated search capabilities. By combining Elasticsearch with Python’s FastAPI framework, we can create powerful search APIs that deliver fast, relevant results. We’ll cover the implementation using Python’s ecosystem of tools and libraries.

Project Setup

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install fastapi uvicorn elasticsearch python-dotenv redis

Basic API Structure

# main.py
from fastapi import FastAPI, HTTPException
from elasticsearch import AsyncElasticsearch
from dotenv import load_dotenv
import os
from typing import Optional, List, Dict, Any

load_dotenv()

app = FastAPI()

# Initialize Elasticsearch client
es = AsyncElasticsearch(
    os.getenv("ELASTICSEARCH_URL"),
    basic_auth=(
        os.getenv("ELASTICSEARCH_USERNAME"),
        os.getenv("ELASTICSEARCH_PASSWORD")
    )
)

@app.get("/api/search")
async def search(
    query: str,
    page: int = 1,
    size: int = 10
):
    try:
        result = await es.search(
            index="products",
            from_=(page - 1) * size,
            size=size,
            query={
                "multi_match": {
                    "query": query,
                    "fields": ["title^3", "description", "category"]
                }
            }
        )
        
        return {
            "results": result["hits"]["hits"],
            "total": result["hits"]["total"]["value"],
            "page": page,
            "size": size
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=3000)

Advanced Search Features

@app.get("/api/search/faceted")
async def faceted_search(
    query: str,
    filters: Optional[Dict[str, str]] = None
):
    try:
        search_query = {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": query,
                            "fields": ["title^3", "description"]
                        }
                    }
                ]
            }
        }
        
        if filters:
            search_query["bool"]["filter"] = [
                {"term": {field: value}}
                for field, value in filters.items()
            ]
        
        result = await es.search(
            index="products",
            query=search_query,
            aggs={
                "categories": {"terms": {"field": "category"}},
                "price_ranges": {
                    "range": {
                        "field": "price",
                        "ranges": [
                            {"to": 50},
                            {"from": 50, "to": 100},
                            {"from": 100}
                        ]
                    }
                }
            }
        )
        
        return {
            "results": result["hits"]["hits"],
            "facets": result["aggregations"]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Autocomplete

@app.get("/api/autocomplete")
async def autocomplete(prefix: str):
    try:
        result = await es.search(
            index="products",
            suggest={
                "product-suggest": {
                    "prefix": prefix,
                    "completion": {
                        "field": "suggest",
                        "size": 5
                    }
                }
            }
        )
        
        return {
            "suggestions": result["suggest"]["product-suggest"][0]["options"]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Error Handling and Validation

Input Validation

from pydantic import BaseModel, Field

class SearchParams(BaseModel):
    query: str
    page: int = Field(default=1, ge=1)
    size: int = Field(default=10, ge=1, le=100)

@app.get("/api/search/validated")
async def validated_search(params: SearchParams):
    try:
        result = await es.search(
            index="products",
            from_=(params.page - 1) * params.size,
            size=params.size,
            query={
                "multi_match": {
                    "query": params.query,
                    "fields": ["title^3", "description", "category"]
                }
            }
        )
        
        return {
            "results": result["hits"]["hits"],
            "total": result["hits"]["total"]["value"],
            "page": params.page,
            "size": params.size
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Error Handling

from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    return JSONResponse(
        status_code=500,
        content={"error": str(exc)}
    )

Performance Optimization

Caching

import redis
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache

# Initialize Redis
redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", 6379)),
    db=0
)

@app.on_event("startup")
async def startup():
    FastAPICache.init(RedisBackend(redis_client))

@app.get("/api/search/cached")
@cache(expire=3600)
async def cached_search(query: str, page: int = 1, size: int = 10):
    try:
        result = await es.search(
            index="products",
            from_=(page - 1) * size,
            size=size,
            query={
                "multi_match": {
                    "query": query,
                    "fields": ["title^3", "description", "category"]
                }
            }
        )
        
        return {
            "results": result["hits"]["hits"],
            "total": result["hits"]["total"]["value"],
            "page": page,
            "size": size
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Query Optimization

def optimize_query(query: str) -> Dict[str, Any]:
    return {
        "query": {
            "bool": {
                "should": [
                    {
                        "match_phrase": {
                            "title": {
                                "query": query,
                                "boost": 3
                            }
                        }
                    },
                    {
                        "match": {
                            "description": query
                        }
                    }
                ],
                "minimum_should_match": 1
            }
        }
    }

Testing

Unit Tests

# test_search.py
import pytest
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

def test_search_endpoint():
    response = client.get("/api/search?query=laptop")
    assert response.status_code == 200
    data = response.json()
    assert "results" in data
    assert "total" in data
    assert isinstance(data["results"], list)

Integration Tests

# test_integration.py
import pytest
from elasticsearch import AsyncElasticsearch
from main import app, es

@pytest.mark.asyncio
async def test_search_integration():
    # Test with actual Elasticsearch connection
    result = await es.search(
        index="products",
        query={"match_all": {}}
    )
    assert "hits" in result
    assert "total" in result["hits"]

Deployment Considerations

Environment Configuration

# .env file
ELASTICSEARCH_URL=http://localhost:9200
ELASTICSEARCH_USERNAME=admin
ELASTICSEARCH_PASSWORD=secret
REDIS_HOST=localhost
REDIS_PORT=6379

Docker Configuration

# Dockerfile
FROM python:3.9-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 3000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "3000"]

Best Practices

  1. API Design

    • Use FastAPI’s built-in validation
    • Implement proper error handling
    • Add request validation with Pydantic
    • Document endpoints with OpenAPI
  2. Performance

    • Implement Redis caching
    • Optimize Elasticsearch queries
    • Use async/await for I/O operations
    • Monitor response times
  3. Security

    • Validate inputs with Pydantic
    • Implement rate limiting
    • Use proper authentication
    • Sanitize outputs

Common Issues and Solutions

Performance Issues

  • Implement Redis caching
  • Optimize query structure
  • Use appropriate indexes
  • Monitor resource usage

Scalability

  • Use connection pooling
  • Implement load balancing
  • Monitor cluster health
  • Plan for horizontal scaling

Next Steps

After building your search API:

  1. Add authentication with JWT
  2. Implement monitoring with Prometheus
  3. Add more search features
  4. Optimize for production

Conclusion

Building a search API with Elasticsearch and Python requires:

  • Proper project structure with FastAPI
  • Efficient query handling
  • Robust error management
  • Performance optimization

Remember to:

  • Follow best practices
  • Test thoroughly
  • Monitor performance
  • Plan for scale

Stay tuned for our next article on advanced Elasticsearch features and optimizations.