Multiple databases #400
-
Hi, does Beanie support working with multiple databases?
Replies: 9 comments 14 replies
-
Hi,
-
In the provided snippet, all instances of
-
@roman-right I'm using a multitenant architecture, and the setup you provided works fine unless there are concurrent requests. To reproduce the problem, set up two simple FastAPI endpoints that work with two different databases, then use the Postman runner to send concurrent requests. In that case we end up with the following error: raise InvalidOperation("Can only use session with the MongoClient that started it") Additionally calling
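The concurrency problem described above can be illustrated without MongoDB. The sketch below is a plain-asyncio simulation, not Beanie code: `FakeDocument`, `fake_init`, and `handle_request` are hypothetical stand-ins, and it assumes that re-initializing per request rebinds one class-level database handle shared by all requests. It may not be the exact cause of the `InvalidOperation` error, but it shows how interleaved requests can end up on the wrong database.

```python
import asyncio


class FakeDocument:
    """Hypothetical stand-in for a document class with one class-level binding."""

    database = None  # shared by every coroutine in the process


async def fake_init(database_name: str) -> None:
    # Stand-in for a per-request re-initialization: rebinds the shared handle.
    FakeDocument.database = database_name


async def handle_request(database_name: str, delay: float) -> tuple[str, str]:
    await fake_init(database_name)
    # Simulated I/O: the event loop may run another request here.
    await asyncio.sleep(delay)
    # Report which database this request wanted and which one it would hit.
    return database_name, FakeDocument.database


async def main() -> list[tuple[str, str]]:
    return await asyncio.gather(
        handle_request("db_one", 0.02),
        handle_request("db_two", 0.01),
    )


results = asyncio.run(main())
for wanted, actual in results:
    print(f"wanted={wanted} actual={actual}")
```

In this simulation the first request asks for `db_one`, but by the time it resumes the second request has already rebound the shared handle to `db_two`.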
-
@roman-right any updates on this?
-
I suggest adopting the Django approach: a database mapping, plus .using('db_map_name') in queries.
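To make the suggestion concrete, here is a minimal sketch of what a Django-style `using()` could look like. Everything in it is hypothetical: `DATABASES`, `BoundQuery`, and `Model.using` are illustrative names, and nothing in this thread indicates that Beanie offers such a method. The point is only that the alias is resolved per call, so no shared class state is rebound.

```python
from dataclasses import dataclass

# Hypothetical alias-to-database mapping, in the spirit of Django's DATABASES setting.
DATABASES = {"default": "main_db", "tenant_a": "tenant_a_db"}


@dataclass(frozen=True)
class BoundQuery:
    """A query target bound to one database; nothing global is mutated."""

    collection: str
    database: str

    def describe(self) -> str:
        return f"{self.database}.{self.collection}"


class Model:
    collection = "items"

    @classmethod
    def using(cls, alias: str) -> BoundQuery:
        # Resolve the alias per call instead of rebinding the class.
        try:
            database = DATABASES[alias]
        except KeyError:
            raise ValueError(f"Unknown database alias: {alias!r}") from None
        return BoundQuery(collection=cls.collection, database=database)


print(Model.using("default").describe())
print(Model.using("tenant_a").describe())
```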
-
I also have a multitenant setup. My solution was to keep a cache of initialized Beanie document classes, with one entry per tenant database and document class:

    import asyncio

    from beanie import Document, init_beanie

    # Method of a larger class; `client`, `logger`, and `ConnectionCache`
    # are defined elsewhere in the author's code.
    @classmethod
    async def _init_beanie_with_document(
        cls,
        database_name: str,
        document_class: type[Document],
        retries=0,
        skip_indexes=False,
    ) -> type[Document]:
        """
        Initialize Beanie with a document class for a specific database.
        Uses a cache with expiration and locking to avoid redundant initializations.

        Args:
            database_name: database name
            document_class: document class to initialize
            retries: Number of attempts already made (for recursion)
            skip_indexes: passed through to init_beanie; the result is not cached when True
        Returns:
            The initialized document class
        """
        doc_class_name = document_class.__name__
        connection_cache = ConnectionCache.get_instance()

        # Check if the connection is in the cache
        cached_class = await connection_cache.get(database_name, doc_class_name)
        if cached_class is not None:
            return cached_class

        # Acquire a lock to avoid concurrent initialization.
        lock = await connection_cache.get_lock(database_name, doc_class_name)
        async with lock:
            # Check again after acquiring the lock (double-checked locking pattern)
            cached_class = await connection_cache.get(database_name, doc_class_name)
            if cached_class is not None:
                return cached_class

            # Initialize the Beanie connection
            try:
                logger.debug(
                    f"####################### INIT BEANIE WITH DOCUMENT ->>>>> {database_name, doc_class_name}",
                )
                await init_beanie(
                    database=client.get_database(database_name),
                    document_models=[document_class],
                    multiprocessing_mode=False,
                    skip_indexes=skip_indexes,
                )
                motor = document_class.get_motor_collection()
                if motor is None:
                    if retries == 5:
                        logger.warning(
                            f"Init failure after 5 tries for {database_name}.{doc_class_name}"
                        )
                        return document_class
                    logger.debug(
                        f"MOTOR EMPTY for db: {database_name} - document: {doc_class_name}, retry {retries + 1}"
                    )
                    await asyncio.sleep(0.05)
                    # NOTE: this recursive call runs while `lock` is still held. If
                    # get_lock() returns the same non-reentrant asyncio.Lock, it blocks here.
                    initialized_beanie = await cls._init_beanie_with_document(
                        database_name,
                        document_class,
                        retries=retries + 1,
                        skip_indexes=skip_indexes,  # was dropped on retry in the original
                    )
                    return initialized_beanie

                # Cache the connection
                if not skip_indexes:
                    await connection_cache.set(
                        database_name, doc_class_name, document_class
                    )
                return document_class
            except Exception as e:
                logger.error(
                    f"Beanie Initialization error for {database_name}.{doc_class_name}: {e}"
                )
                raise
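The snippet above relies on a `ConnectionCache` that is not shown in the thread. The following is a minimal sketch of what such a helper might look like, inferred only from the calls the snippet makes (`get_instance`, `get`, `set`, `get_lock`). The TTL value and the internal layout are assumptions, not the author's implementation.

```python
import asyncio
import time
from typing import Any, Optional


class ConnectionCache:
    """Hypothetical cache keyed by (database name, document class name)."""

    _instance: Optional["ConnectionCache"] = None

    def __init__(self, ttl_seconds: float = 300.0) -> None:
        self._ttl = ttl_seconds  # assumed expiration window
        self._entries: dict[tuple[str, str], tuple[float, Any]] = {}
        self._locks: dict[tuple[str, str], asyncio.Lock] = {}

    @classmethod
    def get_instance(cls) -> "ConnectionCache":
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    async def get(self, database_name: str, doc_class_name: str) -> Any:
        key = (database_name, doc_class_name)
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if time.monotonic() - stored_at > self._ttl:
            # Expired: drop the entry so the caller re-initializes.
            del self._entries[key]
            return None
        return value

    async def set(self, database_name: str, doc_class_name: str, value: Any) -> None:
        self._entries[(database_name, doc_class_name)] = (time.monotonic(), value)

    async def get_lock(self, database_name: str, doc_class_name: str) -> asyncio.Lock:
        # One lock per key; there is no await here, so no other task can interleave.
        return self._locks.setdefault((database_name, doc_class_name), asyncio.Lock())


async def demo() -> tuple[Any, Any]:
    cache = ConnectionCache.get_instance()
    miss = await cache.get("tenant_a", "User")  # nothing cached yet
    lock = await cache.get_lock("tenant_a", "User")
    async with lock:
        await cache.set("tenant_a", "User", "initialized-class")
    hit = await cache.get("tenant_a", "User")
    return miss, hit


result = asyncio.run(demo())
print(result)
```

Because `asyncio.Lock` is not reentrant, any retry logic built on a cache like this should run outside the `async with lock` block or use a loop rather than recursion.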
-
I have the same feature request. In our case, one client's database gets "polluted" with another client's data, which is critical. A simple example of the issue looks like this: calling both endpoints will write both documents to "client.db_two".
-
Not sure about the state of this. My setup is like the OP's, i.e. an n-to-1 mapping from documents to database clients. Things seem to be working fine. I only call
-
Hi everyone, we've been dealing with the same multi-tenant challenge (FastAPI + Beanie,
We noticed that @roman-right mentioned a new initializer with DB switch

What we found in the source code

Looking at

    # beanie/odm/interfaces/getters.py (OtherGettersInterface)
    @classmethod
    def get_pymongo_collection(cls) -> AsyncCollection:
        return cls.get_settings().pymongo_collection

Every Beanie operation -- find, insert, save, replace, delete,
This means

The implementation

Override

    from __future__ import annotations

    from contextvars import ContextVar, Token
    from contextlib import contextmanager
    from typing import Any, ClassVar

    from beanie import Document
    from motor.motor_asyncio import AsyncIOMotorDatabase

    # -- Tenant context --------------------------------------------------------
    _current_tenant: ContextVar[str | None] = ContextVar(
        "current_tenant", default=None
    )


    @contextmanager
    def tenant_context(tenant_id: str):
        """Set the active tenant for the current async context."""
        token: Token = _current_tenant.set(tenant_id)
        try:
            yield
        finally:
            _current_tenant.reset(token)  # restores previous value, not None


    def get_current_tenant() -> str | None:
        return _current_tenant.get()


    # -- Tenant DB registry ----------------------------------------------------
    _tenant_dbs: dict[str, AsyncIOMotorDatabase] = {}


    def register_tenant_db(tenant_id: str, db: AsyncIOMotorDatabase) -> None:
        if not isinstance(db, AsyncIOMotorDatabase):
            raise TypeError(
                f"Expected AsyncIOMotorDatabase, got {type(db).__name__}"
            )
        _tenant_dbs[tenant_id] = db


    def get_tenant_db(tenant_id: str) -> AsyncIOMotorDatabase:
        try:
            return _tenant_dbs[tenant_id]
        except KeyError:
            raise ValueError(f"Unknown tenant: {tenant_id!r}")


    # -- Settings cache --------------------------------------------------------
    _settings_cache: dict[tuple, Any] = {}


    def clear_settings_cache() -> None:
        _settings_cache.clear()


    # -- Base document ---------------------------------------------------------
    class TenantDocument(Document):
        """
        Base class for all documents that should be tenant-aware.
        Inherit from this instead of Document.
        """

        class Settings:
            is_root = True  # so Beanie doesn't treat subclasses as union types

        @classmethod
        def get_settings(cls):
            tenant = get_current_tenant()
            if tenant is None:
                return super().get_settings()

            cache_key = (cls, tenant)  # cls itself, not cls.__name__
            cached = _settings_cache.get(cache_key)
            if cached is not None:
                return cached

            original = super().get_settings()
            db = get_tenant_db(tenant)
            settings = original.model_copy()
            settings.pymongo_collection = db[original.name]
            settings.pymongo_db = db
            _settings_cache[cache_key] = settings
            return settings
Usage with FastAPI

Tenant detection happens in raw ASGI middleware (not Starlette's

    class TenantMiddleware:
        def __init__(self, app):
            self.app = app

        async def __call__(self, scope, receive, send):
            if scope["type"] not in ("http", "websocket"):
                await self.app(scope, receive, send)
                return

            tenant_id = extract_tenant_from_jwt(scope)  # your JWT logic
            if tenant_id:
                with tenant_context(tenant_id):
                    await self.app(scope, receive, send)
            else:
                await self.app(scope, receive, send)

What we've tested

We tried this in a FastAPI application where the tenant ID is carried in

Known limitation: indexes

Since
Our workaround is to create indexes when registering a new tenant database:

    async def ensure_tenant_indexes(
        document_models: list[type[TenantDocument]],
        tenant_id: str,
    ) -> None:
        """Create indexes on a tenant database for all document models."""
        with tenant_context(tenant_id):
            for model in document_models:
                try:
                    await model._ensure_indexes()
                except AttributeError:
                    # Method name may differ across Beanie versions
                    await model.ensure_indexes()

This runs once per tenant at registration time, not per request. We're wondering if there's a cleaner way -- maybe by calling parts

Feedback welcome

We've tested this extensively and it works well so far, but we're sure
Any other thoughts or "have you thought about X?" equally welcome.
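The approach in the comment above depends on `ContextVar` values staying separate between concurrent requests. The sketch below checks that property in isolation using only the standard library. It has no Beanie or FastAPI in it, and `handle_request` is a hypothetical stand-in for a request handler wrapped by tenant middleware.

```python
import asyncio
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

current_tenant: ContextVar[Optional[str]] = ContextVar("current_tenant", default=None)


@contextmanager
def tenant_context(tenant_id: str) -> Iterator[None]:
    token = current_tenant.set(tenant_id)
    try:
        yield
    finally:
        current_tenant.reset(token)


async def handle_request(tenant_id: str, delay: float) -> tuple[str, Optional[str]]:
    with tenant_context(tenant_id):
        await asyncio.sleep(delay)  # other tasks run while this one waits
        # Report the tenant this request set and the tenant it observes afterwards.
        return tenant_id, current_tenant.get()


async def main() -> tuple[list, Optional[str]]:
    # gather() wraps each coroutine in a Task, and each Task gets its own
    # copy of the current context.
    results = await asyncio.gather(
        handle_request("tenant_a", 0.02),
        handle_request("tenant_b", 0.01),
    )
    return list(results), current_tenant.get()


results, outer = asyncio.run(main())
print(results, outer)
```

Each task sees only the tenant it set, and the surrounding context is left untouched. That is the difference from rebinding a class-level database handle, which every concurrent request shares.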
Hi,
There is no "good practice" option for this right now. I'm working on a new initializer at the moment and plan to add database swaps there.