Engine

This engine documentation presents how to work with both the sync (SyncEngine) and the async (AIOEngine) engines. The methods available on both are very similar; the main difference is that the async engine exposes coroutines where the sync engine exposes regular functions.

Creating the engine

In the previous examples, we created the engine using default parameters:

  • MongoDB: running on localhost port 27017

  • Database name: test

It's possible to provide a custom client (AsyncIOMotorClient for the AIOEngine, or PyMongo's MongoClient for the SyncEngine) to the engine constructor using the client keyword argument. In the same way, the database name can be changed using the database keyword argument.

from motor.motor_asyncio import AsyncIOMotorClient

from odmantic import AIOEngine

client = AsyncIOMotorClient("mongodb://localhost:27017/")
engine = AIOEngine(client=client, database="example_db")

from pymongo import MongoClient

from odmantic import SyncEngine

client = MongoClient("mongodb://localhost:27017/")
engine = SyncEngine(client=client, database="example_db")

For additional information about MongoDB connection strings, see the connection string section of the MongoDB documentation.

Usage with DNS SRV records

If you decide to use the DNS Seed List Connection Format (i.e. mongodb+srv://...), you will need to install the dnspython package.
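As a quick pre-flight check, the sketch below inspects the scheme of a connection string and reports whether dnspython (imported as the dns module) would be needed. The helper names uses_srv_format, dnspython_available and check_uri, as well as the host cluster.example.com, are hypothetical and not part of ODMantic or the MongoDB drivers.

```python
from importlib.util import find_spec
from urllib.parse import urlsplit


def uses_srv_format(uri: str) -> bool:
    """Return True when the URI uses the DNS Seed List Connection Format."""
    return urlsplit(uri).scheme == "mongodb+srv"


def dnspython_available() -> bool:
    """Return True when the dnspython package (module name: dns) is importable."""
    return find_spec("dns") is not None


def check_uri(uri: str) -> str:
    """Describe what, if anything, must be installed before using the URI."""
    if uses_srv_format(uri) and not dnspython_available():
        return "install dnspython before connecting"
    return "ready to connect"


print(uses_srv_format("mongodb://localhost:27017/"))
#> False
print(uses_srv_format("mongodb+srv://cluster.example.com/"))
#> True
```

Running check_uri on the connection string before creating the client gives an early, readable hint instead of a driver error at connection time.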

Create

There are two ways of persisting instances to the database (i.e. creating new documents):

  • engine.save: to save a single instance

  • engine.save_all: to save multiple instances at once

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

leeroy = Player(name="Leeroy Jenkins", game="World of Warcraft")
await engine.save(leeroy)

players = [
    Player(name="Shroud", game="Counter-Strike"),
    Player(name="Serral", game="Starcraft"),
    Player(name="TLO", game="Starcraft"),
]
await engine.save_all(players)

from odmantic import SyncEngine, Model


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

leeroy = Player(name="Leeroy Jenkins", game="World of Warcraft")
engine.save(leeroy)

players = [
    Player(name="Shroud", game="Counter-Strike"),
    Player(name="Serral", game="Starcraft"),
    Player(name="TLO", game="Starcraft"),
]
engine.save_all(players)

Resulting documents in the player collection

{
  "_id": ObjectId("5f85f36d6dfecacc68428a46"),
  "game": "World of Warcraft",
  "name": "Leeroy Jenkins"
}
{
  "_id": ObjectId("5f85f36d6dfecacc68428a47"),
  "game": "Counter-Strike",
  "name": "Shroud"
}
{
  "_id": ObjectId("5f85f36d6dfecacc68428a49"),
  "game": "Starcraft",
  "name": "TLO"
}
{
  "_id": ObjectId("5f85f36d6dfecacc68428a48"),
  "game": "Starcraft",
  "name": "Serral"
}

Referenced instances

When calling engine.save or engine.save_all, the referenced models are persisted as well.

Upsert behavior

The save and save_all methods behave as upsert operations. Hence, you might overwrite documents if you save instances whose primary key already exists in the database.
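To make the overwrite risk concrete, here is a toy in-memory sketch of upsert semantics keyed on the primary key. It only models the create-or-overwrite outcome; the collection dictionary, the upsert function and the shortened "a46" key are illustrative assumptions, not how ODMantic or MongoDB apply the write.

```python
from typing import Any, Dict

# Toy stand-in for a collection: documents indexed by their primary key.
collection: Dict[str, Dict[str, Any]] = {}


def upsert(document: Dict[str, Any]) -> str:
    """Store the document, overwriting any stored one sharing its "_id"."""
    key = document["_id"]
    outcome = "updated" if key in collection else "created"
    collection[key] = dict(document)
    return outcome


print(upsert({"_id": "a46", "name": "Leeroy Jenkins", "game": "World of Warcraft"}))
#> created
print(upsert({"_id": "a46", "name": "Leeroy Jenkins", "game": "Valorant"}))
#> updated
print(len(collection), collection["a46"]["game"])
#> 1 Valorant
```

The second call silently replaces the first document's content, which is the situation to watch for when primary keys are assigned manually.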

Read

Examples database content

The following examples assume that the player collection is populated with the documents created previously.

Fetch a single instance

As with the regular MongoDB driver, you can use the engine.find_one method to get at most one instance of a specific Model. This method returns either an instance matching the specified criteria or None if no instance was found.

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

player = await engine.find_one(Player, Player.name == "Serral")
print(repr(player))
#> Player(id=ObjectId(...), name='Serral', game='Starcraft')

another_player = await engine.find_one(
    Player, Player.name == "Player_Not_Stored_In_Database"
)
print(another_player)
#> None

from odmantic import Model, SyncEngine


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

player = engine.find_one(Player, Player.name == "Serral")
print(repr(player))
#> Player(id=ObjectId(...), name='Serral', game='Starcraft')

another_player = engine.find_one(
    Player, Player.name == "Player_Not_Stored_In_Database"
)
print(another_player)
#> None
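Because find_one may return None, code that immediately reads attributes from the result can fail with an AttributeError. One way to guard against that is a small wrapper that raises a clear error instead; find_one_or_raise below is a hypothetical helper, written for a SyncEngine (with an AIOEngine the find_one call would need to be awaited).

```python
from typing import Any


def find_one_or_raise(engine: Any, model: Any, *queries: Any) -> Any:
    """Return the first matching instance or raise LookupError."""
    # Assumes a sync engine: find_one returns an instance or None.
    instance = engine.find_one(model, *queries)
    if instance is None:
        raise LookupError(f"no {model.__name__} instance matches the given queries")
    return instance
```

With the models above, find_one_or_raise(engine, Player, Player.name == "Serral") would return the stored player, while an unknown name would raise LookupError.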

Missing values in documents

While parsing the MongoDB documents into Model instances, ODMantic will use the provided default values to populate the missing fields.

See this section for more details about document parsing.

Fetch using sort

We can use the sort parameter to fetch the Player instance whose name comes first in ascending order:

await engine.find_one(Player, sort=Player.name)

Find out more on sort in the dedicated section.

Fetch multiple instances

To get more than one instance from the database at once, you can use the engine.find method.

This method will return a cursor: an AIOCursor object for the AIOEngine and a SyncCursor object for the SyncEngine.

These cursors can be used in two main ways:

Usage as an iterator

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

async for player in engine.find(Player, Player.game == "Starcraft"):
    print(repr(player))

#> Player(id=ObjectId(...), name='TLO', game='Starcraft')
#> Player(id=ObjectId(...), name='Serral', game='Starcraft')

from odmantic import Model, SyncEngine


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

for player in engine.find(Player, Player.game == "Starcraft"):
    print(repr(player))

#> Player(id=ObjectId(...), name='TLO', game='Starcraft')
#> Player(id=ObjectId(...), name='Serral', game='Starcraft')

Ordering instances

The sort parameter lets you order the query results in ascending or descending order on one or more fields.

engine.find(Player, sort=(Player.name, Player.game.desc()))

Find out more on sort in the dedicated section.

Usage as an awaitable/list

Even if the iterator usage should be preferred, in some cases it might be required to gather all the documents from the database before processing them.

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

players = await engine.find(Player, Player.game != "Starcraft")
print(players)
#> [
#>     Player(id=ObjectId(...), name='Leeroy Jenkins', game='World of Warcraft'),
#>     Player(id=ObjectId(...), name='Shroud', game='Counter-Strike'),
#> ]

from odmantic import SyncEngine, Model


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

players = list(engine.find(Player, Player.game != "Starcraft"))
print(players)
#> [
#>     Player(id=ObjectId(...), name='Leeroy Jenkins', game='World of Warcraft'),
#>     Player(id=ObjectId(...), name='Shroud', game='Counter-Strike'),
#> ]

Pagination

When using AIOEngine.find or SyncEngine.find, you can also use the skip and limit keyword arguments, respectively to skip a given number of instances and to limit the number of instances fetched.
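Page-based interfaces usually think in page numbers rather than offsets. The sketch below converts a 1-based page number into the skip and limit values; page_window is a hypothetical helper, not an ODMantic function.

```python
from typing import Dict


def page_window(page: int, per_page: int) -> Dict[str, int]:
    """Translate a 1-based page number into skip/limit keyword arguments."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be positive")
    return {"skip": (page - 1) * per_page, "limit": per_page}


print(page_window(page=1, per_page=2))
#> {'skip': 0, 'limit': 2}
print(page_window(page=3, per_page=2))
#> {'skip': 4, 'limit': 2}
```

The resulting dictionary can be unpacked into a query, for example engine.find(Player, sort=Player.name, **page_window(2, 2)). Passing a sort alongside skip and limit keeps the pages stable from one call to the next.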

Referenced instances

When calling engine.find or engine.find_one, the referenced models are recursively resolved as well, by design.

Passing the model class to find and find_one

When using these methods to retrieve instances from the database, you have to specify the Model you want to query as the first positional parameter. Internally, this enables ODMantic to properly type the results.

Count instances

You can count instances in the database by using the engine.count method. As with the other read methods, you can pass filtering queries to it.

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

player_count = await engine.count(Player)
print(player_count)
#> 4
cs_count = await engine.count(Player, Player.game == "Counter-Strike")
print(cs_count)
#> 1
valorant_count = await engine.count(Player, Player.game == "Valorant")
print(valorant_count)
#> 0

from odmantic import Model, SyncEngine


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

player_count = engine.count(Player)
print(player_count)
#> 4
cs_count = engine.count(Player, Player.game == "Counter-Strike")
print(cs_count)
#> 1
valorant_count = engine.count(Player, Player.game == "Valorant")
print(valorant_count)
#> 0

Combining multiple queries in read operations

When using find, find_one or count, you may pass as many queries as you want as positional arguments. They are implicitly combined into a single and_ query.
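The implicit combination has the same effect as requiring every query to match. The sketch below models that behavior with plain Python predicates over dictionaries; field_equals and all_of are hypothetical helpers used for illustration, not ODMantic functions.

```python
from typing import Any, Callable, Dict, List

Document = Dict[str, Any]
Predicate = Callable[[Document], bool]


def field_equals(field: str, value: Any) -> Predicate:
    """Build a predicate that is true when the document's field equals value."""
    return lambda doc: doc.get(field) == value


def all_of(*predicates: Predicate) -> Predicate:
    """Combine predicates so that a document must satisfy every one of them."""
    return lambda doc: all(predicate(doc) for predicate in predicates)


players: List[Document] = [
    {"name": "Leeroy Jenkins", "game": "World of Warcraft"},
    {"name": "Shroud", "game": "Counter-Strike"},
    {"name": "Serral", "game": "Starcraft"},
    {"name": "TLO", "game": "Starcraft"},
]

matches = all_of(field_equals("game", "Starcraft"), field_equals("name", "TLO"))
print([doc["name"] for doc in players if matches(doc)])
#> ['TLO']
```

With the engine, the corresponding call would pass both queries positionally: engine.find(Player, Player.game == "Starcraft", Player.name == "TLO").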

Update

Updating an instance in the database can be done by modifying the instance locally and saving it again to the database.

The engine.save and engine.save_all methods actually behave as upsert operations. In other words, if the instance already exists, it will be updated. Otherwise, the related document will be created in the database.

Modifying one field

Modifying a single field can be achieved by directly changing the instance attribute and saving the instance.

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()
shroud = await engine.find_one(Player, Player.name == "Shroud")
print(shroud.game)
#> Counter-Strike
shroud.game = "Valorant"
await engine.save(shroud)

from odmantic import SyncEngine, Model


class Player(Model):
    name: str
    game: str


engine = SyncEngine()
shroud = engine.find_one(Player, Player.name == "Shroud")
print(shroud.game)
#> Counter-Strike
shroud.game = "Valorant"
engine.save(shroud)

Resulting documents in the player collection

{
  "_id": ObjectId("5f85f36d6dfecacc68428a46"),
  "game": "World of Warcraft",
  "name": "Leeroy Jenkins"
}
{
  "_id": ObjectId("5f85f36d6dfecacc68428a47"),
  "game": "Valorant",
  "name": "Shroud"
}
{
  "_id": ObjectId("5f85f36d6dfecacc68428a49"),
  "game": "Starcraft",
  "name": "TLO"
}
{
  "_id": ObjectId("5f85f36d6dfecacc68428a48"),
  "game": "Starcraft",
  "name": "Serral"
}

Patching multiple fields at once

The easiest way to change multiple fields at once is to use the Model.model_update method. This method will take either a Pydantic object or a dictionary and update the matching fields of the instance.

From a Pydantic Model

from pydantic import BaseModel

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()


player_tlo = await engine.find_one(Player, Player.name == "TLO")
print(repr(player_tlo))
#> Player(id=ObjectId(...), name='TLO', game='Starcraft')

# Create the structure of the patch object with pydantic
class PatchPlayerSchema(BaseModel):
    name: str
    game: str


# Create the patch object containing the new values
patch_object = PatchPlayerSchema(name="TheLittleOne", game="Starcraft II")
# Apply the patch to the instance
player_tlo.model_update(patch_object)

print(repr(player_tlo))
#> Player(id=ObjectId(...), name='TheLittleOne', game='Starcraft II')

# Finally, persist the updated instance
await engine.save(player_tlo)

from pydantic import BaseModel

from odmantic import Model, SyncEngine


class Player(Model):
    name: str
    game: str


engine = SyncEngine()


player_tlo = engine.find_one(Player, Player.name == "TLO")
print(repr(player_tlo))
#> Player(id=ObjectId(...), name='TLO', game='Starcraft')

# Create the structure of the patch object with pydantic
class PatchPlayerSchema(BaseModel):
    name: str
    game: str


# Create the patch object containing the new values
patch_object = PatchPlayerSchema(name="TheLittleOne", game="Starcraft II")
# Apply the patch to the instance
player_tlo.model_update(patch_object)

print(repr(player_tlo))
#> Player(id=ObjectId(...), name='TheLittleOne', game='Starcraft II')

# Finally, persist the updated instance
engine.save(player_tlo)

From a dictionary

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

player_tlo = await engine.find_one(Player, Player.name == "TLO")
print(repr(player_tlo))
#> Player(id=ObjectId(...), name='TLO', game='Starcraft')

# Create the patch dictionary containing the new values
patch_object = {"name": "TheLittleOne", "game": "Starcraft II"}
# Update the local instance
player_tlo.model_update(patch_object)

print(repr(player_tlo))
#> Player(id=ObjectId(...), name='TheLittleOne', game='Starcraft II')

# Finally persist the instance
await engine.save(player_tlo)

from odmantic import Model, SyncEngine


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

player_tlo = engine.find_one(Player, Player.name == "TLO")
print(repr(player_tlo))
#> Player(id=ObjectId(...), name='TLO', game='Starcraft')

# Create the patch dictionary containing the new values
patch_object = {"name": "TheLittleOne", "game": "Starcraft II"}
# Update the local instance
player_tlo.model_update(patch_object)

print(repr(player_tlo))
#> Player(id=ObjectId(...), name='TheLittleOne', game='Starcraft II')

# Finally persist the instance
engine.save(player_tlo)

Resulting document associated to the player

{
  "_id": ObjectId("5f85f36d6dfecacc68428a49"),
  "game": "Starcraft II",
  "name": "TheLittleOne"
}

Changing the primary field

Directly changing the primary field value as explained above is not possible and a NotImplementedError exception will be raised if you try to do so.

The easiest way to change the primary field of an instance is to make a local copy of the instance using the Model.copy method.

from bson import ObjectId

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

shroud = await engine.find_one(Player, Player.name == "Shroud")
print(shroud.id)
#> 5f86074f6dfecacc68428a62
new_id = ObjectId("ffffffffffffffffffffffff")
# Copy the player instance with a new primary key
new_shroud = shroud.copy(update={"id": new_id})
# Delete the initial player instance
await engine.delete(shroud)
# Finally, persist the new instance
await engine.save(new_shroud)

from bson import ObjectId

from odmantic import SyncEngine, Model


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

shroud = engine.find_one(Player, Player.name == "Shroud")
print(shroud.id)
#> 5f86074f6dfecacc68428a62
new_id = ObjectId("ffffffffffffffffffffffff")
# Copy the player instance with a new primary key
new_shroud = shroud.copy(update={"id": new_id})
# Delete the initial player instance
engine.delete(shroud)
# Finally, persist the new instance
engine.save(new_shroud)

Resulting document associated to the player

{
    "_id": ObjectId("ffffffffffffffffffffffff"),
    "game": "Valorant",
    "name": "Shroud"
}

Update data used with the copy

The data updated by the copy method is not validated: you should absolutely trust this data.

Delete

Delete a single instance

You can delete an instance by passing it to the engine.delete method.

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

players = await engine.find(Player)

for player in players:
    await engine.delete(player)

from odmantic import SyncEngine, Model


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

players = engine.find(Player)

for player in players:
    engine.delete(player)

Remove

You can delete instances that match a filter by using the engine.remove method.

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

delete_count = await engine.remove(Player, Player.game == "Warzone")
print(delete_count)
#> 2

from odmantic import SyncEngine, Model


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

delete_count = engine.remove(Player, Player.game == "Warzone")
print(delete_count)
#> 2

Just one

You can limit engine.remove to removing at most one instance by passing just_one=True.

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

delete_count = await engine.remove(
    Player, Player.game == "Warzone", just_one=True
)
print(delete_count)
#> 1

from odmantic import Model, SyncEngine


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

delete_count = engine.remove(
    Player, Player.game == "Warzone", just_one=True
)
print(delete_count)
#> 1

Consistency

Using a Session

Why are sessions needed?

A session is a way to guarantee that the data you read is consistent with the data you write. This is especially useful when you need to perform multiple operations on the same data.

See this document for more details on causal consistency.

You can create a session by using the engine.session method. This method returns either a SyncSession or an AIOSession object, depending on the type of engine used. These session objects are context managers and can be used with the with or async with keywords. Once the context is entered, the session object exposes the same database operation methods as the related engine object but executes each operation in the session context.

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

leeroy = Player(name="Leeroy Jenkins", game="World of Warcraft")

async with engine.session() as session:
    await session.save_all(
        [
            Player(name="Shroud", game="Counter-Strike"),
            Player(name="Serral", game="Starcraft"),
            Player(name="TLO", game="Starcraft"),
        ]
    )
    player_count = await session.count(Player)
    print(player_count)
    #> 3

from odmantic import SyncEngine, Model


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

leeroy = Player(name="Leeroy Jenkins", game="World of Warcraft")

with engine.session() as session:
    session.save_all(
        [
            Player(name="Shroud", game="Counter-Strike"),
            Player(name="Serral", game="Starcraft"),
            Player(name="TLO", game="Starcraft"),
        ]
    )
    player_count = session.count(Player)
    print(player_count)
    #> 3

Directly using driver sessions

Every single engine method also accepts a session parameter. You can use this parameter to provide an existing driver (motor or PyMongo) session that you created manually.
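As a minimal sketch of this parameter, the function below opens a PyMongo driver session with start_session and hands it to engine.count. It assumes a SyncEngine together with the MongoClient it was built from; count_with_driver_session is a hypothetical helper name.

```python
from typing import Any


def count_with_driver_session(engine: Any, client: Any, model: Any) -> int:
    """Count the instances of model inside a manually created driver session.

    Assumes a SyncEngine and the PyMongo MongoClient it was created with.
    """
    # start_session() opens a PyMongo driver session; using it as a context
    # manager ends the session when the block exits.
    with client.start_session() as driver_session:
        return engine.count(model, session=driver_session)
```

Reusing the client and engine objects from the "Creating the engine" section, this would be called as count_with_driver_session(engine, client, Player).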

Accessing the underlying driver session object

The session.get_driver_session method exposes the underlying driver session. This is useful if you want to use the driver session directly to perform raw operations.

Using a Transaction

Why are transactions needed?

A transaction is a mechanism that allows you to execute multiple operations as a single atomic operation. This is useful when you want to ensure that a set of operations is performed atomically on a specific document.

MongoDB transaction support

Transactions are only supported in replica sets (Mongo 4.0+) or sharded clusters with replication enabled (Mongo 4.2+). If you use them on a standalone MongoDB instance, an error will be raised.

You can create a transaction directly from the engine by using the engine.transaction method. This method returns either a SyncTransaction or an AIOTransaction object. As with sessions, transaction objects expose the same database operation methods as the related engine object but execute each operation in a transactional context.

In order to terminate a transaction you must either call the commit method to persist all the changes or call the abort method to drop the changes introduced in the transaction.

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

async with engine.transaction() as transaction:
    await transaction.save(Player(name="Leeroy Jenkins", game="WoW"))
    await transaction.commit()

print(await engine.count(Player))
#> 1

async with engine.transaction() as transaction:
    await transaction.save(Player(name="Shroud", game="Counter-Strike"))
    await transaction.save(Player(name="Serral", game="Starcraft"))
    await transaction.abort()

print(await engine.count(Player))
#> 1

from odmantic import Model, SyncEngine


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

with engine.transaction() as transaction:
    transaction.save(Player(name="Leeroy Jenkins", game="WoW"))
    transaction.commit()

print(engine.count(Player))
#> 1

with engine.transaction() as transaction:
    transaction.save(Player(name="Shroud", game="Counter-Strike"))
    transaction.save(Player(name="Serral", game="Starcraft"))
    transaction.abort()

print(engine.count(Player))
#> 1
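The commit-or-abort rule can be wrapped in a small helper that commits only when every operation succeeded and aborts otherwise. This is a sketch assuming a SyncEngine; save_atomically is a hypothetical name, and the explicit abort in the except branch is a defensive choice rather than documented ODMantic behavior.

```python
from typing import Any, Iterable


def save_atomically(engine: Any, instances: Iterable[Any]) -> None:
    """Save every instance in one transaction, or none of them."""
    with engine.transaction() as transaction:
        try:
            for instance in instances:
                transaction.save(instance)
        except Exception:
            # Drop everything written so far, then let the error propagate.
            transaction.abort()
            raise
        # Persist all the changes only when every save succeeded.
        transaction.commit()
```

As with any transaction, this requires a replica set or a sharded cluster; on a standalone MongoDB instance the call would fail.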

It is also possible to create a transaction within an existing session by using the session.transaction method:

from odmantic import AIOEngine, Model


class Player(Model):
    name: str
    game: str


engine = AIOEngine()

async with engine.session() as session:
    leeroy = await session.save(Player(name="Leeroy Jenkins", game="WoW"))
    shroud = await session.save(Player(name="Shroud", game="Counter-Strike"))
    async with session.transaction() as transaction:
        leeroy.game = "Fortnite"
        await transaction.save(leeroy)
        shroud.game = "Fortnite"
        await transaction.save(shroud)
        await transaction.commit()

print(await engine.count(Player, Player.game == "Fortnite"))
#> 2

from odmantic import Model, SyncEngine


class Player(Model):
    name: str
    game: str


engine = SyncEngine()

with engine.session() as session:
    leeroy = session.save(Player(name="Leeroy Jenkins", game="WoW"))
    shroud = session.save(Player(name="Shroud", game="Counter-Strike"))
    with session.transaction() as transaction:
        leeroy.game = "Fortnite"
        transaction.save(leeroy)
        shroud.game = "Fortnite"
        transaction.save(shroud)
        transaction.commit()

print(engine.count(Player, Player.game == "Fortnite"))
#> 2