Skip to content

odmantic.engine

Bases: BaseEngine

The AIOEngine object is responsible for handling database operations with MongoDB in an asynchronous way using motor.

Source code in odmantic/engine.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
class AIOEngine(BaseEngine):
    """The AIOEngine object is responsible for handling database operations with MongoDB
    in an asynchronous way using motor.
    """

    client: "AsyncIOMotorClient"
    database: "AsyncIOMotorDatabase"

    def __init__(
        self,
        client: Union["AsyncIOMotorClient", None] = None,
        database: str = "test",
    ):
        """Engine constructor.

        Args:
            client: instance of an AsyncIO motor client. If None, a default one
                    will be created
            database: name of the database to use

        <!---
        #noqa: DAR401 RuntimeError
        -->
        """
        if not motor:
            raise RuntimeError(
                "motor is required to use AIOEngine, install it with:\n\n"
                + 'pip install "odmantic[motor]"'
            )
        if client is None:
            client = AsyncIOMotorClient()
        super().__init__(client=client, database=database)

    def get_collection(self, model: Type[ModelType]) -> "AsyncIOMotorCollection":
        """Get the motor collection associated to a Model.

        Args:
            model: model class

        Returns:
            the AsyncIO motor collection object
        """
        return self.database[model.__collection__]

    @staticmethod
    def _get_session(
        session: Union[AIOSessionType, AIOSessionBase],
    ) -> Optional[AsyncIOMotorClientSession]:
        if isinstance(session, (AIOSession, AIOTransaction)):
            return session.get_driver_session()
        assert not isinstance(session, AIOSessionBase)  # Abstract class
        return session

    async def configure_database(
        self,
        models: Sequence[Type[ModelType]],
        *,
        update_existing_indexes: bool = False,
        session: SyncSessionType = None,
    ) -> None:
        """Apply model constraints to the database.

        Args:
            models: list of models to initialize the database with
            update_existing_indexes: conflicting indexes will be dropped before creation
            session: an optional session to use for the operation

        <!---
        #noqa: DAR401 pymongo.errors.OperationFailure
        -->
        """
        driver_session = self._get_session(session)
        for model in models:
            collection = self.get_collection(model)
            for index in model.__indexes__():
                pymongo_index = (
                    index.get_pymongo_index()
                    if isinstance(index, ODMBaseIndex)
                    else index
                )
                try:
                    await collection.create_indexes(
                        [pymongo_index], session=driver_session
                    )
                except pymongo.errors.OperationFailure as exc:
                    if update_existing_indexes and getattr(exc, "code", None) in (
                        85,  # aka IndexOptionsConflict
                        86,  # aka IndexKeySpecsConflict for MongoDB > 5
                    ):
                        await collection.drop_index(
                            pymongo_index.document["name"], session=driver_session
                        )
                        await collection.create_indexes(
                            [pymongo_index], session=driver_session
                        )
                    else:
                        raise

    def session(self) -> AIOSession:
        """Get a new session for the engine to allow ordering sequential operations.

        Returns:
            a new session object

        Example usage:

        ```python
        engine = AIOEngine(...)
        async with engine.session() as session:
            john = await session.find(User, User.name == "John")
            john.name = "Doe"
            await session.save(john)
        ```
        """
        return AIOSession(self)

    def transaction(self) -> AIOTransaction:
        """Get a new transaction for the engine to aggregate sequential operations.

        Returns:
            a new transaction object

        Example usage:
        ```python
        engine = AIOEngine(...)
        async with engine.transaction() as transaction:
            john = transaction.find(User, User.name == "John")
            john.name = "Doe"
            await transaction.save(john)
            await transaction.commit()
        ```

        Warning:
            MongoDB transaction are only supported on replicated clusters: either
            directly a replicaSet or a sharded cluster with replication enabled.
        """
        return AIOTransaction(self)

    def find(
        self,
        model: Type[ModelType],
        *queries: Union[
            QueryExpression, Dict, bool
        ],  # bool: allow using binary operators with mypy
        sort: Optional[Any] = None,
        skip: int = 0,
        limit: Optional[int] = None,
        session: AIOSessionType = None,
    ) -> AIOCursor[ModelType]:
        """Search for Model instances matching the query filter provided

        Args:
            model: model to perform the operation on
            *queries: query filter to apply
            sort: sort expression
            skip: number of document to skip
            limit: maximum number of instance fetched
            session: an optional session to use for the operation

        Raises:
            DocumentParsingError: unable to parse one of the resulting documents

        Returns:
            [odmantic.engine.AIOCursor][] of the query

        <!---
        #noqa: DAR401 ValueError
        #noqa: DAR401 TypeError
        #noqa: DAR402 DocumentParsingError
        -->
        """
        pipeline = self._prepare_find_pipeline(
            model,
            *queries,
            sort=sort,
            skip=skip,
            limit=limit,
        )
        collection = self.get_collection(model)
        motor_cursor = collection.aggregate(
            pipeline, session=self._get_session(session)
        )
        return AIOCursor(model, motor_cursor)

    async def find_one(
        self,
        model: Type[ModelType],
        *queries: Union[
            QueryExpression, Dict, bool
        ],  # bool: allow using binary operators w/o plugin
        sort: Optional[Any] = None,
        session: AIOSessionType = None,
    ) -> Optional[ModelType]:
        """Search for a Model instance matching the query filter provided

        Args:
            model: model to perform the operation on
            *queries: query filter to apply
            sort: sort expression
            session: an optional session to use for the operation

        Raises:
            DocumentParsingError: unable to parse the resulting document

        Returns:
            the fetched instance if found otherwise None

        <!---
        #noqa: DAR401 TypeError
        #noqa: DAR402 DocumentParsingError
        -->
        """
        if not lenient_issubclass(model, Model):
            raise TypeError("Can only call find_one with a Model class")
        results = await self.find(model, *queries, sort=sort, limit=1, session=session)
        if len(results) == 0:
            return None
        return results[0]

    async def _save(
        self, instance: ModelType, session: "AsyncIOMotorClientSession"
    ) -> ModelType:
        """Perform an atomic save operation in the specified session"""
        for ref_field_name in instance.__references__:
            sub_instance = cast(Model, getattr(instance, ref_field_name))
            await self._save(sub_instance, session)

        fields_to_update = instance.__fields_modified__ | instance.__mutable_fields__
        if len(fields_to_update) > 0:
            doc = instance.model_dump_doc(include=fields_to_update)
            collection = self.get_collection(type(instance))
            try:
                await collection.update_one(
                    instance.model_dump_doc(include={instance.__primary_field__}),
                    {"$set": doc},
                    upsert=True,
                    session=session,
                )
            except pymongo.errors.DuplicateKeyError as e:
                raise DuplicateKeyError(instance, e)
            object.__setattr__(instance, "__fields_modified__", set())
        return instance

    async def save(
        self,
        instance: ModelType,
        *,
        session: AIOSessionType = None,
    ) -> ModelType:
        """Persist an instance to the database

        This method behaves as an 'upsert' operation. If a document already exists
        with the same primary key, it will be overwritten.

        All the other models referenced by this instance will be saved as well.

        Args:
            instance: instance to persist
            session: An optional session to use for the operation. If not provided, an
                     internal session will be used to persist the instance and
                     sub-instances.

        Returns:
            the saved instance

        Raises:
            DuplicateKeyError: the instance is duplicated according to a unique index.

        NOTE:
            The save operation actually modify the instance argument in place. However,
            the instance is still returned for convenience.

        <!---
        #noqa: DAR401 TypeError
        #noqa: DAR402 DuplicateKeyError
        -->
        """
        if not isinstance(instance, Model):
            raise TypeError("Can only call find_one with a Model class")
        if session:
            await self._save(instance, self._get_session(session))
        else:
            async with await self.client.start_session() as local_session:
                await self._save(instance, local_session)
        return instance

    async def save_all(
        self,
        instances: Sequence[ModelType],
        *,
        session: AIOSessionType = None,
    ) -> List[ModelType]:
        """Persist instances to the database

        This method behaves as multiple 'upsert' operations. If one of the document
        already exists with the same primary key, it will be overwritten.

        All the other models referenced by this instance will be recursively saved as
        well.

        Args:
            instances: instances to persist
            session: An optional session to use for the operation. If not provided, an
                     internal session will be used to persist the instances.

        Returns:
            the saved instances

        Raises:
            DuplicateKeyError: an instance is duplicated according to a unique index.

        NOTE:
            The save_all operation actually modify the arguments in place. However, the
            instances are still returned for convenience.

        <!---
        #noqa: DAR402 DuplicateKeyError
        -->
        """
        if session:
            added_instances = [
                await self._save(instance, self._get_session(session))
                for instance in instances
            ]
        else:
            async with await self.client.start_session() as local_session:
                added_instances = [
                    await self._save(instance, local_session) for instance in instances
                ]
        return added_instances

    async def delete(
        self,
        instance: ModelType,
        *,
        session: AIOSessionType = None,
    ) -> None:
        """Delete an instance from the database

        Args:
            instance: the instance to delete
            session: an optional session to use for the operation


        Raises:
            DocumentNotFoundError: the instance has not been persisted to the database
        """
        # TODO handle cascade deletion
        collection = self.database[instance.__collection__]
        pk_name = instance.__primary_field__
        result = await collection.delete_many(
            {"_id": getattr(instance, pk_name)}, session=self._get_session(session)
        )
        count = int(result.deleted_count)
        if count == 0:
            raise DocumentNotFoundError(instance)

    async def remove(
        self,
        model: Type[ModelType],
        *queries: Union[QueryExpression, Dict, bool],
        just_one: bool = False,
        session: AIOSessionType = None,
    ) -> int:
        """Delete Model instances matching the query filter provided

        Args:
            model: model to perform the operation on
            *queries: query filter to apply
            just_one: limit the deletion to just one document
            session: an optional session to use for the operation

        Returns:
            the number of instances deleted from the database.

        """
        query = AIOEngine._build_query(*queries)
        collection = self.get_collection(model)

        if just_one:
            result = await collection.delete_one(
                query, session=self._get_session(session)
            )
        else:
            result = await collection.delete_many(
                query, session=self._get_session(session)
            )

        return cast(int, result.deleted_count)

    async def count(
        self,
        model: Type[ModelType],
        *queries: Union[QueryExpression, Dict, bool],
        session: AIOSessionType = None,
    ) -> int:
        """Get the count of documents matching a query

        Args:
            model: model to perform the operation on
            *queries: query filters to apply
            session: an optional session to use for the operation

        Returns:
            number of document matching the query

        <!---
        #noqa: DAR401 TypeError
        -->
        """
        if not lenient_issubclass(model, Model):
            raise TypeError("Can only call count with a Model class")
        query = BaseEngine._build_query(*queries)
        collection = self.database[model.__collection__]
        count = await collection.count_documents(
            query, session=self._get_session(session)
        )
        return int(count)

__init__(client=None, database='test')

Engine constructor.

Parameters:

Name Type Description Default
client Union[AsyncIOMotorClient, None]

instance of an AsyncIO motor client. If None, a default one will be created

None
database str

name of the database to use

'test'
Source code in odmantic/engine.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def __init__(
    self,
    client: Union["AsyncIOMotorClient", None] = None,
    database: str = "test",
):
    """Engine constructor.

    Args:
        client: instance of an AsyncIO motor client. If None, a default one
                will be created
        database: name of the database to use

    <!---
    #noqa: DAR401 RuntimeError
    -->
    """
    if not motor:
        raise RuntimeError(
            "motor is required to use AIOEngine, install it with:\n\n"
            + 'pip install "odmantic[motor]"'
        )
    if client is None:
        client = AsyncIOMotorClient()
    super().__init__(client=client, database=database)

configure_database(models, *, update_existing_indexes=False, session=None) async

Apply model constraints to the database.

Parameters:

Name Type Description Default
models Sequence[Type[ModelType]]

list of models to initialize the database with

required
update_existing_indexes bool

conflicting indexes will be dropped before creation

False
session SyncSessionType

an optional session to use for the operation

None
Source code in odmantic/engine.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
async def configure_database(
    self,
    models: Sequence[Type[ModelType]],
    *,
    update_existing_indexes: bool = False,
    session: SyncSessionType = None,
) -> None:
    """Apply model constraints to the database.

    Args:
        models: list of models to initialize the database with
        update_existing_indexes: conflicting indexes will be dropped before creation
        session: an optional session to use for the operation

    <!---
    #noqa: DAR401 pymongo.errors.OperationFailure
    -->
    """
    driver_session = self._get_session(session)
    for model in models:
        collection = self.get_collection(model)
        for index in model.__indexes__():
            pymongo_index = (
                index.get_pymongo_index()
                if isinstance(index, ODMBaseIndex)
                else index
            )
            try:
                await collection.create_indexes(
                    [pymongo_index], session=driver_session
                )
            except pymongo.errors.OperationFailure as exc:
                if update_existing_indexes and getattr(exc, "code", None) in (
                    85,  # aka IndexOptionsConflict
                    86,  # aka IndexKeySpecsConflict for MongoDB > 5
                ):
                    await collection.drop_index(
                        pymongo_index.document["name"], session=driver_session
                    )
                    await collection.create_indexes(
                        [pymongo_index], session=driver_session
                    )
                else:
                    raise

count(model, *queries, session=None) async

Get the count of documents matching a query

Parameters:

Name Type Description Default
model Type[ModelType]

model to perform the operation on

required
*queries Union[QueryExpression, Dict, bool]

query filters to apply

()
session AIOSessionType

an optional session to use for the operation

None

Returns:

Type Description
int

number of document matching the query

Source code in odmantic/engine.py
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
async def count(
    self,
    model: Type[ModelType],
    *queries: Union[QueryExpression, Dict, bool],
    session: AIOSessionType = None,
) -> int:
    """Get the count of documents matching a query

    Args:
        model: model to perform the operation on
        *queries: query filters to apply
        session: an optional session to use for the operation

    Returns:
        number of document matching the query

    <!---
    #noqa: DAR401 TypeError
    -->
    """
    if not lenient_issubclass(model, Model):
        raise TypeError("Can only call count with a Model class")
    query = BaseEngine._build_query(*queries)
    collection = self.database[model.__collection__]
    count = await collection.count_documents(
        query, session=self._get_session(session)
    )
    return int(count)

delete(instance, *, session=None) async

Delete an instance from the database

Parameters:

Name Type Description Default
instance ModelType

the instance to delete

required
session AIOSessionType

an optional session to use for the operation

None

Raises:

Type Description
DocumentNotFoundError

the instance has not been persisted to the database

Source code in odmantic/engine.py
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
async def delete(
    self,
    instance: ModelType,
    *,
    session: AIOSessionType = None,
) -> None:
    """Delete an instance from the database

    Args:
        instance: the instance to delete
        session: an optional session to use for the operation


    Raises:
        DocumentNotFoundError: the instance has not been persisted to the database
    """
    # TODO handle cascade deletion
    collection = self.database[instance.__collection__]
    pk_name = instance.__primary_field__
    result = await collection.delete_many(
        {"_id": getattr(instance, pk_name)}, session=self._get_session(session)
    )
    count = int(result.deleted_count)
    if count == 0:
        raise DocumentNotFoundError(instance)

find(model, *queries, sort=None, skip=0, limit=None, session=None)

Search for Model instances matching the query filter provided

Parameters:

Name Type Description Default
model Type[ModelType]

model to perform the operation on

required
*queries Union[QueryExpression, Dict, bool]

query filter to apply

()
sort Optional[Any]

sort expression

None
skip int

number of document to skip

0
limit Optional[int]

maximum number of instance fetched

None
session AIOSessionType

an optional session to use for the operation

None

Raises:

Type Description
DocumentParsingError

unable to parse one of the resulting documents

Returns:

Type Description
AIOCursor[ModelType]
Source code in odmantic/engine.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
def find(
    self,
    model: Type[ModelType],
    *queries: Union[
        QueryExpression, Dict, bool
    ],  # bool: allow using binary operators with mypy
    sort: Optional[Any] = None,
    skip: int = 0,
    limit: Optional[int] = None,
    session: AIOSessionType = None,
) -> AIOCursor[ModelType]:
    """Search for Model instances matching the query filter provided

    Args:
        model: model to perform the operation on
        *queries: query filter to apply
        sort: sort expression
        skip: number of document to skip
        limit: maximum number of instance fetched
        session: an optional session to use for the operation

    Raises:
        DocumentParsingError: unable to parse one of the resulting documents

    Returns:
        [odmantic.engine.AIOCursor][] of the query

    <!---
    #noqa: DAR401 ValueError
    #noqa: DAR401 TypeError
    #noqa: DAR402 DocumentParsingError
    -->
    """
    pipeline = self._prepare_find_pipeline(
        model,
        *queries,
        sort=sort,
        skip=skip,
        limit=limit,
    )
    collection = self.get_collection(model)
    motor_cursor = collection.aggregate(
        pipeline, session=self._get_session(session)
    )
    return AIOCursor(model, motor_cursor)

find_one(model, *queries, sort=None, session=None) async

Search for a Model instance matching the query filter provided

Parameters:

Name Type Description Default
model Type[ModelType]

model to perform the operation on

required
*queries Union[QueryExpression, Dict, bool]

query filter to apply

()
sort Optional[Any]

sort expression

None
session AIOSessionType

an optional session to use for the operation

None

Raises:

Type Description
DocumentParsingError

unable to parse the resulting document

Returns:

Type Description
Optional[ModelType]

the fetched instance if found otherwise None

Source code in odmantic/engine.py
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
async def find_one(
    self,
    model: Type[ModelType],
    *queries: Union[
        QueryExpression, Dict, bool
    ],  # bool: allow using binary operators w/o plugin
    sort: Optional[Any] = None,
    session: AIOSessionType = None,
) -> Optional[ModelType]:
    """Search for a Model instance matching the query filter provided

    Args:
        model: model to perform the operation on
        *queries: query filter to apply
        sort: sort expression
        session: an optional session to use for the operation

    Raises:
        DocumentParsingError: unable to parse the resulting document

    Returns:
        the fetched instance if found otherwise None

    <!---
    #noqa: DAR401 TypeError
    #noqa: DAR402 DocumentParsingError
    -->
    """
    if not lenient_issubclass(model, Model):
        raise TypeError("Can only call find_one with a Model class")
    results = await self.find(model, *queries, sort=sort, limit=1, session=session)
    if len(results) == 0:
        return None
    return results[0]

get_collection(model)

Get the motor collection associated to a Model.

Parameters:

Name Type Description Default
model Type[ModelType]

model class

required

Returns:

Type Description
AsyncIOMotorCollection

the AsyncIO motor collection object

Source code in odmantic/engine.py
328
329
330
331
332
333
334
335
336
337
def get_collection(self, model: Type[ModelType]) -> "AsyncIOMotorCollection":
    """Get the motor collection associated to a Model.

    Args:
        model: model class

    Returns:
        the AsyncIO motor collection object
    """
    return self.database[model.__collection__]

remove(model, *queries, just_one=False, session=None) async

Delete Model instances matching the query filter provided

Parameters:

Name Type Description Default
model Type[ModelType]

model to perform the operation on

required
*queries Union[QueryExpression, Dict, bool]

query filter to apply

()
just_one bool

limit the deletion to just one document

False
session AIOSessionType

an optional session to use for the operation

None

Returns:

Type Description
int

the number of instances deleted from the database.

Source code in odmantic/engine.py
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
async def remove(
    self,
    model: Type[ModelType],
    *queries: Union[QueryExpression, Dict, bool],
    just_one: bool = False,
    session: AIOSessionType = None,
) -> int:
    """Delete Model instances matching the query filter provided

    Args:
        model: model to perform the operation on
        *queries: query filter to apply
        just_one: limit the deletion to just one document
        session: an optional session to use for the operation

    Returns:
        the number of instances deleted from the database.

    """
    query = AIOEngine._build_query(*queries)
    collection = self.get_collection(model)

    if just_one:
        result = await collection.delete_one(
            query, session=self._get_session(session)
        )
    else:
        result = await collection.delete_many(
            query, session=self._get_session(session)
        )

    return cast(int, result.deleted_count)

save(instance, *, session=None) async

Persist an instance to the database

This method behaves as an 'upsert' operation. If a document already exists with the same primary key, it will be overwritten.

All the other models referenced by this instance will be saved as well.

Parameters:

Name Type Description Default
instance ModelType

instance to persist

required
session AIOSessionType

An optional session to use for the operation. If not provided, an internal session will be used to persist the instance and sub-instances.

None

Returns:

Type Description
ModelType

the saved instance

Raises:

Type Description
DuplicateKeyError

the instance is duplicated according to a unique index.

NOTE

The save operation actually modify the instance argument in place. However, the instance is still returned for convenience.

Source code in odmantic/engine.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
async def save(
    self,
    instance: ModelType,
    *,
    session: AIOSessionType = None,
) -> ModelType:
    """Persist an instance to the database

    This method behaves as an 'upsert' operation. If a document already exists
    with the same primary key, it will be overwritten.

    All the other models referenced by this instance will be saved as well.

    Args:
        instance: instance to persist
        session: An optional session to use for the operation. If not provided, an
                 internal session will be used to persist the instance and
                 sub-instances.

    Returns:
        the saved instance

    Raises:
        DuplicateKeyError: the instance is duplicated according to a unique index.

    NOTE:
        The save operation actually modify the instance argument in place. However,
        the instance is still returned for convenience.

    <!---
    #noqa: DAR401 TypeError
    #noqa: DAR402 DuplicateKeyError
    -->
    """
    if not isinstance(instance, Model):
        raise TypeError("Can only call find_one with a Model class")
    if session:
        await self._save(instance, self._get_session(session))
    else:
        async with await self.client.start_session() as local_session:
            await self._save(instance, local_session)
    return instance

save_all(instances, *, session=None) async

Persist instances to the database

This method behaves as multiple 'upsert' operations. If one of the document already exists with the same primary key, it will be overwritten.

All the other models referenced by this instance will be recursively saved as well.

Parameters:

Name Type Description Default
instances Sequence[ModelType]

instances to persist

required
session AIOSessionType

An optional session to use for the operation. If not provided, an internal session will be used to persist the instances.

None

Returns:

Type Description
List[ModelType]

the saved instances

Raises:

Type Description
DuplicateKeyError

an instance is duplicated according to a unique index.

NOTE

The save_all operation actually modify the arguments in place. However, the instances are still returned for convenience.

Source code in odmantic/engine.py
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
async def save_all(
    self,
    instances: Sequence[ModelType],
    *,
    session: AIOSessionType = None,
) -> List[ModelType]:
    """Persist instances to the database

    This method behaves as multiple 'upsert' operations. If one of the document
    already exists with the same primary key, it will be overwritten.

    All the other models referenced by this instance will be recursively saved as
    well.

    Args:
        instances: instances to persist
        session: An optional session to use for the operation. If not provided, an
                 internal session will be used to persist the instances.

    Returns:
        the saved instances

    Raises:
        DuplicateKeyError: an instance is duplicated according to a unique index.

    NOTE:
        The save_all operation actually modify the arguments in place. However, the
        instances are still returned for convenience.

    <!---
    #noqa: DAR402 DuplicateKeyError
    -->
    """
    if session:
        added_instances = [
            await self._save(instance, self._get_session(session))
            for instance in instances
        ]
    else:
        async with await self.client.start_session() as local_session:
            added_instances = [
                await self._save(instance, local_session) for instance in instances
            ]
    return added_instances

session()

Get a new session for the engine to allow ordering sequential operations.

Returns:

Type Description
AIOSession

a new session object

Example usage:

engine = AIOEngine(...)
async with engine.session() as session:
    john = await session.find(User, User.name == "John")
    john.name = "Doe"
    await session.save(john)
Source code in odmantic/engine.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def session(self) -> AIOSession:
    """Get a new session for the engine to allow ordering sequential operations.

    Returns:
        a new session object

    Example usage:

    ```python
    engine = AIOEngine(...)
    async with engine.session() as session:
        john = await session.find(User, User.name == "John")
        john.name = "Doe"
        await session.save(john)
    ```
    """
    return AIOSession(self)

transaction()

Get a new transaction for the engine to aggregate sequential operations.

Returns:

Type Description
AIOTransaction

a new transaction object

Example usage:

engine = AIOEngine(...)
async with engine.transaction() as transaction:
    john = transaction.find(User, User.name == "John")
    john.name = "Doe"
    await transaction.save(john)
    await transaction.commit()

Warning

MongoDB transaction are only supported on replicated clusters: either directly a replicaSet or a sharded cluster with replication enabled.

Source code in odmantic/engine.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def transaction(self) -> AIOTransaction:
    """Get a new transaction for the engine to aggregate sequential operations.

    Returns:
        a new transaction object

    Example usage:
    ```python
    engine = AIOEngine(...)
    async with engine.transaction() as transaction:
        john = transaction.find(User, User.name == "John")
        john.name = "Doe"
        await transaction.save(john)
        await transaction.commit()
    ```

    Warning:
        MongoDB transaction are only supported on replicated clusters: either
        directly a replicaSet or a sharded cluster with replication enabled.
    """
    return AIOTransaction(self)

Bases: BaseCursor[ModelType], AsyncIterable[ModelType], Awaitable[List[ModelType]]

This object has to be built from the odmantic.engine.AIOEngine.find method.

An AIOCursor object support multiple async operations:

  • async for: asynchronously iterate over the query results
  • await : when awaited it will return a list of the fetched models
Source code in odmantic/engine.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class AIOCursor(
    BaseCursor[ModelType], AsyncIterable[ModelType], Awaitable[List[ModelType]]
):
    """This object has to be built from the [odmantic.engine.AIOEngine.find][] method.

    An AIOCursor object support multiple async operations:

      - **async for**: asynchronously iterate over the query results
      - **await** : when awaited it will return a list of the fetched models
    """

    _cursor: "AsyncIOMotorCursor"

    def __init__(self, model: Type[ModelType], cursor: "AsyncIOMotorCursor"):
        super().__init__(model=model, cursor=cursor)

    def __await__(self) -> Generator[None, None, List[ModelType]]:
        if self._results is not None:
            return self._results
        raw_docs = yield from self._cursor.to_list(length=None).__await__()
        instances = []
        for raw_doc in raw_docs:
            instances.append(self._parse_document(raw_doc))
            yield
        self._results = instances
        return instances

    async def __aiter__(self) -> AsyncGenerator[ModelType, None]:
        if self._results is not None:
            for res in self._results:
                yield res
            return
        results = []
        async for raw_doc in self._cursor:
            instance = self._parse_document(raw_doc)
            results.append(instance)
            yield instance
        self._results = results

Bases: BaseEngine

The SyncEngine object is responsible for handling database operations with MongoDB in an synchronous way using pymongo.

Source code in odmantic/engine.py
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
class SyncEngine(BaseEngine):
    """The SyncEngine object is responsible for handling database operations with
    MongoDB in an synchronous way using pymongo.
    """

    client: "MongoClient"
    database: "Database"

    def __init__(
        self,
        client: "Union[MongoClient, None]" = None,
        database: str = "test",
    ):
        """Engine constructor.

        Args:
            client: instance of a PyMongo client. If None, a default one
                    will be created
            database: name of the database to use
        """
        if client is None:
            client = MongoClient()
        super().__init__(client=client, database=database)

    def get_collection(self, model: Type[ModelType]) -> "Collection":
        """Get the pymongo collection associated to a Model.

        Args:
            model: model class

        Returns:
            the pymongo collection object
        """
        collection = self.database[model.__collection__]
        return collection

    @staticmethod
    def _get_session(
        session: Union[SyncSessionType, SyncSessionBase],
    ) -> Optional[ClientSession]:
        if isinstance(session, (SyncSession, SyncTransaction)):
            return session.get_driver_session()
        assert not isinstance(session, SyncSessionBase)  # Abstract class
        return session

    def configure_database(
        self,
        models: Sequence[Type[ModelType]],
        *,
        update_existing_indexes: bool = False,
        session: SyncSessionType = None,
    ) -> None:
        """Apply model constraints to the database.

        Args:
            models: list of models to initialize the database with
            update_existing_indexes: conflicting indexes will be dropped before creation
            session: an optional session to use for the operation

        <!---
        #noqa: DAR401 pymongo.errors.OperationFailure
        -->
        """
        driver_session = self._get_session(session)
        for model in models:
            collection = self.get_collection(model)
            for index in model.__indexes__():
                pymongo_index = (
                    index.get_pymongo_index()
                    if isinstance(index, ODMBaseIndex)
                    else index
                )
                try:
                    collection.create_indexes([pymongo_index], session=driver_session)
                except pymongo.errors.OperationFailure as exc:
                    if update_existing_indexes and getattr(exc, "code", None) in (
                        85,  # aka IndexOptionsConflict
                        86,  # aka IndexKeySpecsConflict for MongoDB > 5
                    ):
                        collection.drop_index(
                            pymongo_index.document["name"], session=driver_session
                        )
                        collection.create_indexes(
                            [pymongo_index], session=driver_session
                        )
                    else:
                        raise

    def session(self) -> SyncSession:
        """Get a new session for the engine to allow ordering sequential operations.

        Returns:
            a new session object

        Example usage:

        ```python
        engine = SyncEngine(...)
        with engine.session() as session:
            john = session.find(User, User.name == "John")
            john.name = "Doe"
            session.save(john)
        ```
        """
        return SyncSession(self)

    def transaction(self) -> SyncTransaction:
        """Get a new transaction for the engine to aggregate sequential operations.

        Returns:
            a new transaction object

        Example usage:
        ```python
        engine = SyncEngine(...)
        with engine.transaction() as transaction:
            john = transaction.find(User, User.name == "John")
            john.name = "Doe"
            transaction.save(john)
            transaction.commit()
        ```

        Warning:
            MongoDB transaction are only supported on replicated clusters: either
            directly a replicaSet or a sharded cluster with replication enabled.
        """
        return SyncTransaction(self)

    def find(
        self,
        model: Type[ModelType],
        *queries: Union[
            QueryExpression, Dict, bool
        ],  # bool: allow using binary operators with mypy
        sort: Optional[Any] = None,
        skip: int = 0,
        limit: Optional[int] = None,
        session: SyncSessionType = None,
    ) -> SyncCursor[ModelType]:
        """Search for Model instances matching the query filter provided

        Args:
            model: model to perform the operation on
            *queries: query filter to apply
            sort: sort expression
            skip: number of document to skip
            limit: maximum number of instance fetched
            session: an optional session to use for the operation

        Raises:
            DocumentParsingError: unable to parse one of the resulting documents

        Returns:
            [odmantic.engine.SyncCursor][] of the query

        <!---
        #noqa: DAR401 ValueError
        #noqa: DAR401 TypeError
        #noqa: DAR402 DocumentParsingError
        -->
        """
        pipeline = self._prepare_find_pipeline(
            model,
            *queries,
            sort=sort,
            skip=skip,
            limit=limit,
        )
        collection = self.get_collection(model)
        cursor = collection.aggregate(pipeline, session=self._get_session(session))
        return SyncCursor(model, cursor)

    def find_one(
        self,
        model: Type[ModelType],
        *queries: Union[
            QueryExpression, Dict, bool
        ],  # bool: allow using binary operators w/o plugin
        sort: Optional[Any] = None,
        session: SyncSessionType = None,
    ) -> Optional[ModelType]:
        """Search for a Model instance matching the query filter provided

        Args:
            model: model to perform the operation on
            *queries: query filter to apply
            sort: sort expression
            session: an optional session to use for the operation

        Raises:
            DocumentParsingError: unable to parse the resulting document

        Returns:
            the fetched instance if found otherwise None

        <!---
        #noqa: DAR401 TypeError
        #noqa: DAR402 DocumentParsingError
        -->
        """
        if not lenient_issubclass(model, Model):
            raise TypeError("Can only call find_one with a Model class")
        results = list(self.find(model, *queries, sort=sort, limit=1, session=session))
        if len(results) == 0:
            return None
        return results[0]

    def _save(self, instance: ModelType, session: "ClientSession") -> ModelType:
        """Perform an atomic save operation in the specified session"""
        for ref_field_name in instance.__references__:
            sub_instance = cast(Model, getattr(instance, ref_field_name))
            self._save(sub_instance, session)

        fields_to_update = instance.__fields_modified__ | instance.__mutable_fields__
        if len(fields_to_update) > 0:
            doc = instance.model_dump_doc(include=fields_to_update)
            collection = self.get_collection(type(instance))
            try:
                collection.update_one(
                    instance.model_dump_doc(include={instance.__primary_field__}),
                    {"$set": doc},
                    upsert=True,
                    session=session,
                )
            except pymongo.errors.DuplicateKeyError as e:
                raise DuplicateKeyError(instance, e)
            object.__setattr__(instance, "__fields_modified__", set())
        return instance

    def save(
        self,
        instance: ModelType,
        *,
        session: SyncSessionType = None,
    ) -> ModelType:
        """Persist an instance to the database

        This method behaves as an 'upsert' operation. If a document already exists
        with the same primary key, it will be overwritten.

        All the other models referenced by this instance will be saved as well.

        Args:
            instance: instance to persist
            session: An optional session to use for the operation. If not provided, an
                     internal session will be used to persist the instance and
                     sub-instances.

        Returns:
            the saved instance

        Raises:
            DuplicateKeyError: the instance is duplicated according to a unique index.

        NOTE:
            The save operation actually modify the instance argument in place. However,
            the instance is still returned for convenience.

        <!---
        #noqa: DAR401 TypeError
        #noqa: DAR402 DuplicateKeyError
        -->
        """
        if not isinstance(instance, Model):
            raise TypeError("Can only call find_one with a Model class")

        if session:
            self._save(instance, self._get_session(session))  # type: ignore
        else:
            with self.client.start_session() as local_session:
                self._save(instance, local_session)
        return instance

    def save_all(
        self,
        instances: Sequence[ModelType],
        *,
        session: SyncSessionType = None,
    ) -> List[ModelType]:
        """Persist instances to the database

        This method behaves as multiple 'upsert' operations. If one of the document
        already exists with the same primary key, it will be overwritten.

        All the other models referenced by this instance will be recursively saved as
        well.

        Args:
            instances: instances to persist
            session: An optional session to use for the operation. If not provided an
                     internal session will be used to persist the instances.

        Returns:
            the saved instances

        Raises:
            DuplicateKeyError: an instance is duplicated according to a unique index.

        NOTE:
            The save_all operation actually modify the arguments in place. However, the
            instances are still returned for convenience.
        <!---
        #noqa: DAR402 DuplicateKeyError
        -->
        """
        if session:
            added_instances = [
                self._save(instance, self._get_session(session))  # type: ignore
                for instance in instances
            ]
        else:
            with self.client.start_session() as local_session:
                added_instances = [
                    self._save(instance, local_session) for instance in instances
                ]
        return added_instances

    def delete(
        self,
        instance: ModelType,
        session: SyncSessionType = None,
    ) -> None:
        """Delete an instance from the database

        Args:
            instance: the instance to delete
            session: an optional session to use for the operation

        Raises:
            DocumentNotFoundError: the instance has not been persisted to the database

        """
        # TODO handle cascade deletion
        collection = self.database[instance.__collection__]
        pk_name = instance.__primary_field__
        result = collection.delete_many(
            {"_id": getattr(instance, pk_name)}, session=self._get_session(session)
        )
        count = result.deleted_count
        if count == 0:
            raise DocumentNotFoundError(instance)

    def remove(
        self,
        model: Type[ModelType],
        *queries: Union[QueryExpression, Dict, bool],
        just_one: bool = False,
        session: SyncSessionType = None,
    ) -> int:
        """Delete Model instances matching the query filter provided

        Args:
            model: model to perform the operation on
            *queries: query filter to apply
            just_one: limit the deletion to just one document
            session: an optional session to use for the operation

        Returns:
            the number of instances deleted from the database.
        """
        query = SyncEngine._build_query(*queries)
        collection = self.get_collection(model)

        if just_one:
            result = collection.delete_one(query, session=self._get_session(session))
        else:
            result = collection.delete_many(query, session=self._get_session(session))

        return result.deleted_count

    def count(
        self,
        model: Type[ModelType],
        *queries: Union[QueryExpression, Dict, bool],
        session: SyncSessionType = None,
    ) -> int:
        """Get the count of documents matching a query

        Args:
            model: model to perform the operation on
            *queries: query filters to apply
            session: an optional session to use for the operation

        Returns:
            number of document matching the query

        <!---
        #noqa: DAR401 TypeError
        -->
        """
        if not lenient_issubclass(model, Model):
            raise TypeError("Can only call count with a Model class")
        query = BaseEngine._build_query(*queries)
        collection = self.database[model.__collection__]
        count = collection.count_documents(query, session=self._get_session(session))
        return int(count)

__init__(client=None, database='test')

Engine constructor.

Parameters:

Name Type Description Default
client Union[MongoClient, None]

instance of a PyMongo client. If None, a default one will be created

None
database str

name of the database to use

'test'
Source code in odmantic/engine.py
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
def __init__(
    self,
    client: "Union[MongoClient, None]" = None,
    database: str = "test",
):
    """Engine constructor.

    Args:
        client: instance of a PyMongo client. If None, a default one
                will be created
        database: name of the database to use
    """
    if client is None:
        client = MongoClient()
    super().__init__(client=client, database=database)

configure_database(models, *, update_existing_indexes=False, session=None)

Apply model constraints to the database.

Parameters:

Name Type Description Default
models Sequence[Type[ModelType]]

list of models to initialize the database with

required
update_existing_indexes bool

conflicting indexes will be dropped before creation

False
session SyncSessionType

an optional session to use for the operation

None
Source code in odmantic/engine.py
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
def configure_database(
    self,
    models: Sequence[Type[ModelType]],
    *,
    update_existing_indexes: bool = False,
    session: SyncSessionType = None,
) -> None:
    """Apply model constraints to the database.

    Args:
        models: list of models to initialize the database with
        update_existing_indexes: conflicting indexes will be dropped before creation
        session: an optional session to use for the operation

    <!---
    #noqa: DAR401 pymongo.errors.OperationFailure
    -->
    """
    driver_session = self._get_session(session)
    for model in models:
        collection = self.get_collection(model)
        for index in model.__indexes__():
            pymongo_index = (
                index.get_pymongo_index()
                if isinstance(index, ODMBaseIndex)
                else index
            )
            try:
                collection.create_indexes([pymongo_index], session=driver_session)
            except pymongo.errors.OperationFailure as exc:
                if update_existing_indexes and getattr(exc, "code", None) in (
                    85,  # aka IndexOptionsConflict
                    86,  # aka IndexKeySpecsConflict for MongoDB > 5
                ):
                    collection.drop_index(
                        pymongo_index.document["name"], session=driver_session
                    )
                    collection.create_indexes(
                        [pymongo_index], session=driver_session
                    )
                else:
                    raise

count(model, *queries, session=None)

Get the count of documents matching a query

Parameters:

Name Type Description Default
model Type[ModelType]

model to perform the operation on

required
*queries Union[QueryExpression, Dict, bool]

query filters to apply

()
session SyncSessionType

an optional session to use for the operation

None

Returns:

Type Description
int

number of document matching the query

Source code in odmantic/engine.py
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
def count(
    self,
    model: Type[ModelType],
    *queries: Union[QueryExpression, Dict, bool],
    session: SyncSessionType = None,
) -> int:
    """Get the count of documents matching a query

    Args:
        model: model to perform the operation on
        *queries: query filters to apply
        session: an optional session to use for the operation

    Returns:
        number of document matching the query

    <!---
    #noqa: DAR401 TypeError
    -->
    """
    if not lenient_issubclass(model, Model):
        raise TypeError("Can only call count with a Model class")
    query = BaseEngine._build_query(*queries)
    collection = self.database[model.__collection__]
    count = collection.count_documents(query, session=self._get_session(session))
    return int(count)

delete(instance, session=None)

Delete an instance from the database

Parameters:

Name Type Description Default
instance ModelType

the instance to delete

required
session SyncSessionType

an optional session to use for the operation

None

Raises:

Type Description
DocumentNotFoundError

the instance has not been persisted to the database

Source code in odmantic/engine.py
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
def delete(
    self,
    instance: ModelType,
    session: SyncSessionType = None,
) -> None:
    """Delete an instance from the database

    Args:
        instance: the instance to delete
        session: an optional session to use for the operation

    Raises:
        DocumentNotFoundError: the instance has not been persisted to the database

    """
    # TODO handle cascade deletion
    collection = self.database[instance.__collection__]
    pk_name = instance.__primary_field__
    result = collection.delete_many(
        {"_id": getattr(instance, pk_name)}, session=self._get_session(session)
    )
    count = result.deleted_count
    if count == 0:
        raise DocumentNotFoundError(instance)

find(model, *queries, sort=None, skip=0, limit=None, session=None)

Search for Model instances matching the query filter provided

Parameters:

Name Type Description Default
model Type[ModelType]

model to perform the operation on

required
*queries Union[QueryExpression, Dict, bool]

query filter to apply

()
sort Optional[Any]

sort expression

None
skip int

number of document to skip

0
limit Optional[int]

maximum number of instance fetched

None
session SyncSessionType

an optional session to use for the operation

None

Raises:

Type Description
DocumentParsingError

unable to parse one of the resulting documents

Returns:

Type Description
SyncCursor[ModelType]
Source code in odmantic/engine.py
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
def find(
    self,
    model: Type[ModelType],
    *queries: Union[
        QueryExpression, Dict, bool
    ],  # bool: allow using binary operators with mypy
    sort: Optional[Any] = None,
    skip: int = 0,
    limit: Optional[int] = None,
    session: SyncSessionType = None,
) -> SyncCursor[ModelType]:
    """Search for Model instances matching the query filter provided

    Args:
        model: model to perform the operation on
        *queries: query filter to apply
        sort: sort expression
        skip: number of document to skip
        limit: maximum number of instance fetched
        session: an optional session to use for the operation

    Raises:
        DocumentParsingError: unable to parse one of the resulting documents

    Returns:
        [odmantic.engine.SyncCursor][] of the query

    <!---
    #noqa: DAR401 ValueError
    #noqa: DAR401 TypeError
    #noqa: DAR402 DocumentParsingError
    -->
    """
    pipeline = self._prepare_find_pipeline(
        model,
        *queries,
        sort=sort,
        skip=skip,
        limit=limit,
    )
    collection = self.get_collection(model)
    cursor = collection.aggregate(pipeline, session=self._get_session(session))
    return SyncCursor(model, cursor)

find_one(model, *queries, sort=None, session=None)

Search for a Model instance matching the query filter provided

Parameters:

Name Type Description Default
model Type[ModelType]

model to perform the operation on

required
*queries Union[QueryExpression, Dict, bool]

query filter to apply

()
sort Optional[Any]

sort expression

None
session SyncSessionType

an optional session to use for the operation

None

Raises:

Type Description
DocumentParsingError

unable to parse the resulting document

Returns:

Type Description
Optional[ModelType]

the fetched instance if found otherwise None

Source code in odmantic/engine.py
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
def find_one(
    self,
    model: Type[ModelType],
    *queries: Union[
        QueryExpression, Dict, bool
    ],  # bool: allow using binary operators w/o plugin
    sort: Optional[Any] = None,
    session: SyncSessionType = None,
) -> Optional[ModelType]:
    """Search for a Model instance matching the query filter provided

    Args:
        model: model to perform the operation on
        *queries: query filter to apply
        sort: sort expression
        session: an optional session to use for the operation

    Raises:
        DocumentParsingError: unable to parse the resulting document

    Returns:
        the fetched instance if found otherwise None

    <!---
    #noqa: DAR401 TypeError
    #noqa: DAR402 DocumentParsingError
    -->
    """
    if not lenient_issubclass(model, Model):
        raise TypeError("Can only call find_one with a Model class")
    results = list(self.find(model, *queries, sort=sort, limit=1, session=session))
    if len(results) == 0:
        return None
    return results[0]

get_collection(model)

Get the pymongo collection associated to a Model.

Parameters:

Name Type Description Default
model Type[ModelType]

model class

required

Returns:

Type Description
Collection

the pymongo collection object

Source code in odmantic/engine.py
739
740
741
742
743
744
745
746
747
748
749
def get_collection(self, model: Type[ModelType]) -> "Collection":
    """Get the pymongo collection associated to a Model.

    Args:
        model: model class

    Returns:
        the pymongo collection object
    """
    collection = self.database[model.__collection__]
    return collection

remove(model, *queries, just_one=False, session=None)

Delete Model instances matching the query filter provided

Parameters:

Name Type Description Default
model Type[ModelType]

model to perform the operation on

required
*queries Union[QueryExpression, Dict, bool]

query filter to apply

()
just_one bool

limit the deletion to just one document

False
session SyncSessionType

an optional session to use for the operation

None

Returns:

Type Description
int

the number of instances deleted from the database.

Source code in odmantic/engine.py
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
def remove(
    self,
    model: Type[ModelType],
    *queries: Union[QueryExpression, Dict, bool],
    just_one: bool = False,
    session: SyncSessionType = None,
) -> int:
    """Delete Model instances matching the query filter provided

    Args:
        model: model to perform the operation on
        *queries: query filter to apply
        just_one: limit the deletion to just one document
        session: an optional session to use for the operation

    Returns:
        the number of instances deleted from the database.
    """
    query = SyncEngine._build_query(*queries)
    collection = self.get_collection(model)

    if just_one:
        result = collection.delete_one(query, session=self._get_session(session))
    else:
        result = collection.delete_many(query, session=self._get_session(session))

    return result.deleted_count

save(instance, *, session=None)

Persist an instance to the database

This method behaves as an 'upsert' operation. If a document already exists with the same primary key, it will be overwritten.

All the other models referenced by this instance will be saved as well.

Parameters:

Name Type Description Default
instance ModelType

instance to persist

required
session SyncSessionType

An optional session to use for the operation. If not provided, an internal session will be used to persist the instance and sub-instances.

None

Returns:

Type Description
ModelType

the saved instance

Raises:

Type Description
DuplicateKeyError

the instance is duplicated according to a unique index.

NOTE

The save operation actually modify the instance argument in place. However, the instance is still returned for convenience.

Source code in odmantic/engine.py
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
def save(
    self,
    instance: ModelType,
    *,
    session: SyncSessionType = None,
) -> ModelType:
    """Persist an instance to the database

    This method behaves as an 'upsert' operation. If a document already exists
    with the same primary key, it will be overwritten.

    All the other models referenced by this instance will be saved as well.

    Args:
        instance: instance to persist
        session: An optional session to use for the operation. If not provided, an
                 internal session will be used to persist the instance and
                 sub-instances.

    Returns:
        the saved instance

    Raises:
        DuplicateKeyError: the instance is duplicated according to a unique index.

    NOTE:
        The save operation actually modify the instance argument in place. However,
        the instance is still returned for convenience.

    <!---
    #noqa: DAR401 TypeError
    #noqa: DAR402 DuplicateKeyError
    -->
    """
    if not isinstance(instance, Model):
        raise TypeError("Can only call find_one with a Model class")

    if session:
        self._save(instance, self._get_session(session))  # type: ignore
    else:
        with self.client.start_session() as local_session:
            self._save(instance, local_session)
    return instance

save_all(instances, *, session=None)

Persist instances to the database

This method behaves as multiple 'upsert' operations. If one of the document already exists with the same primary key, it will be overwritten.

All the other models referenced by this instance will be recursively saved as well.

Parameters:

Name Type Description Default
instances Sequence[ModelType]

instances to persist

required
session SyncSessionType

An optional session to use for the operation. If not provided an internal session will be used to persist the instances.

None

Returns:

Type Description
List[ModelType]

the saved instances

Raises:

Type Description
DuplicateKeyError

an instance is duplicated according to a unique index.

NOTE

The save_all operation actually modify the arguments in place. However, the instances are still returned for convenience.

Source code in odmantic/engine.py
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
def save_all(
    self,
    instances: Sequence[ModelType],
    *,
    session: SyncSessionType = None,
) -> List[ModelType]:
    """Persist instances to the database

    This method behaves as multiple 'upsert' operations. If one of the document
    already exists with the same primary key, it will be overwritten.

    All the other models referenced by this instance will be recursively saved as
    well.

    Args:
        instances: instances to persist
        session: An optional session to use for the operation. If not provided an
                 internal session will be used to persist the instances.

    Returns:
        the saved instances

    Raises:
        DuplicateKeyError: an instance is duplicated according to a unique index.

    NOTE:
        The save_all operation actually modify the arguments in place. However, the
        instances are still returned for convenience.
    <!---
    #noqa: DAR402 DuplicateKeyError
    -->
    """
    if session:
        added_instances = [
            self._save(instance, self._get_session(session))  # type: ignore
            for instance in instances
        ]
    else:
        with self.client.start_session() as local_session:
            added_instances = [
                self._save(instance, local_session) for instance in instances
            ]
    return added_instances

session()

Get a new session for the engine to allow ordering sequential operations.

Returns:

Type Description
SyncSession

a new session object

Example usage:

engine = SyncEngine(...)
with engine.session() as session:
    john = session.find(User, User.name == "John")
    john.name = "Doe"
    session.save(john)
Source code in odmantic/engine.py
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
def session(self) -> SyncSession:
    """Get a new session for the engine to allow ordering sequential operations.

    Returns:
        a new session object

    Example usage:

    ```python
    engine = SyncEngine(...)
    with engine.session() as session:
        john = session.find(User, User.name == "John")
        john.name = "Doe"
        session.save(john)
    ```
    """
    return SyncSession(self)

transaction()

Get a new transaction for the engine to aggregate sequential operations.

Returns:

Type Description
SyncTransaction

a new transaction object

Example usage:

engine = SyncEngine(...)
with engine.transaction() as transaction:
    john = transaction.find(User, User.name == "John")
    john.name = "Doe"
    transaction.save(john)
    transaction.commit()

Warning

MongoDB transaction are only supported on replicated clusters: either directly a replicaSet or a sharded cluster with replication enabled.

Source code in odmantic/engine.py
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
def transaction(self) -> SyncTransaction:
    """Get a new transaction for the engine to aggregate sequential operations.

    Returns:
        a new transaction object

    Example usage:
    ```python
    engine = SyncEngine(...)
    with engine.transaction() as transaction:
        john = transaction.find(User, User.name == "John")
        john.name = "Doe"
        transaction.save(john)
        transaction.commit()
    ```

    Warning:
        MongoDB transaction are only supported on replicated clusters: either
        directly a replicaSet or a sharded cluster with replication enabled.
    """
    return SyncTransaction(self)

Bases: BaseCursor[ModelType], Iterable[ModelType]

This object has to be built from the odmantic.engine.SyncEngine.find method.

A SyncCursor object supports iterating over the query results using for.

To get a list of all the results you can wrap it with list, as in list(cursor).

Source code in odmantic/engine.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class SyncCursor(BaseCursor[ModelType], Iterable[ModelType]):
    """This object has to be built from the [odmantic.engine.SyncEngine.find][] method.

    A SyncCursor object supports iterating over the query results using **`for`**.

    To get a list of all the results you can wrap it with `list`, as in `list(cursor)`.
    """

    _cursor: "CommandCursor"

    def __init__(self, model: Type[ModelType], cursor: "CommandCursor"):
        super().__init__(model=model, cursor=cursor)

    def __iter__(self) -> Iterator[ModelType]:
        if self._results is not None:
            for res in self._results:
                yield res
            return
        results = []
        for raw_doc in self._cursor:
            instance = self._parse_document(raw_doc)
            results.append(instance)
            yield instance
        self._results = results