Optimistic locking
Optimistic locking prevents concurrent updates from overwriting each other. When two processes try to update the same item, the second one fails instead of silently overwriting the first.
How it works
Add a VersionAttribute to your model. pydynox handles the rest:
- First save sets version to 1
- Each save increments version by 1
- Before saving, pydynox checks that the version in DynamoDB matches the local version
- If versions don't match, save fails with ConditionCheckFailedError
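The steps above can be sketched without DynamoDB at all. The following is a minimal in-memory simulation, not pydynox's implementation: `FakeTable`, `VersionConflict`, and `save` are hypothetical names used only for illustration, and a real table would perform the comparison atomically on the server.

```python
class VersionConflict(Exception):
    """Stand-in for ConditionCheckFailedError in this simulation."""


class FakeTable:
    """Hypothetical in-memory table mapping pk -> stored item dict."""

    def __init__(self):
        self.items = {}

    def save(self, pk, data, local_version):
        """Write `data` only if the stored version equals `local_version`.

        `local_version` is None for an item that was never saved.
        Returns the new version number on success.
        """
        stored = self.items.get(pk)
        stored_version = stored["version"] if stored else None
        if stored_version != local_version:
            raise VersionConflict(f"expected {local_version}, found {stored_version}")
        new_version = 1 if local_version is None else local_version + 1
        self.items[pk] = {**data, "version": new_version}
        return new_version


table = FakeTable()
v1 = table.save("DOC#1", {"content": "Hello"}, local_version=None)  # first save
v2 = table.save("DOC#1", {"content": "Hello World"}, local_version=v1)  # update

# A writer still holding the old version is rejected instead of overwriting.
try:
    table.save("DOC#1", {"content": "Stale write"}, local_version=v1)
    conflict = False
except VersionConflict:
    conflict = True
```

The stale writer's data never reaches the table, which is the property the version check exists to guarantee.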
Basic usage
from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
class Document(Model):
    model_config = ModelConfig(table="documents")
    pk = StringAttribute(hash_key=True)
    content = StringAttribute()
    version = VersionAttribute()
# Create new document
doc = Document(pk="DOC#VERSION", content="Hello")
print(doc.version) # None
doc.save()
print(doc.version) # 1
# Update document
doc.content = "Hello World"
doc.save()
print(doc.version) # 2
# Load from DB - version is preserved
loaded = Document.get(pk="DOC#VERSION")
print(loaded.version) # 2
Concurrent updates
When two processes load the same item and try to update it:
from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionCheckFailedError
class Document(Model):
    model_config = ModelConfig(table="documents")
    pk = StringAttribute(hash_key=True)
    content = StringAttribute()
    version = VersionAttribute()
# Create document
doc = Document(pk="DOC#CONCURRENT", content="Original")
doc.save()
# Two processes load the same document
process_a = Document.get(pk="DOC#CONCURRENT")
process_b = Document.get(pk="DOC#CONCURRENT")
# Both have version 1
print(process_a.version) # 1
print(process_b.version) # 1
# Process A updates first - succeeds
process_a.content = "Updated by A"
process_a.save()
print(process_a.version) # 2
# Process B tries to update - fails!
process_b.content = "Updated by B"
try:
    process_b.save()
except ConditionCheckFailedError:
    print("Conflict! Someone else updated the document.")
Handling conflicts
When a save fails due to version mismatch, reload the item and retry:
from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionCheckFailedError
class Counter(Model):
    model_config = ModelConfig(table="counters")
    pk = StringAttribute(hash_key=True)
    count = NumberAttribute()
    version = VersionAttribute()

def increment_with_retry(pk: str, max_retries: int = 3) -> Counter:
    """Increment counter with retry on conflict."""
    for attempt in range(max_retries):
        counter = Counter.get(pk=pk)
        if counter is None:
            counter = Counter(pk=pk, count=0)
        # Increment
        counter.count = counter.count + 1
        try:
            counter.save()
            return counter
        except ConditionCheckFailedError:
            if attempt == max_retries - 1:
                raise
            print(f"Conflict on attempt {attempt + 1}, retrying...")
    raise RuntimeError("Should not reach here")
# Usage
counter = increment_with_retry("COUNTER#RETRY")
print(f"Count: {counter.count}, Version: {counter.version}")
Async with high concurrency
For async code with many concurrent operations, always use retry with backoff:
import asyncio
from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionCheckFailedError
class Counter(Model):
    model_config = ModelConfig(table="counters")
    pk = StringAttribute(hash_key=True)
    value = StringAttribute()
    version = VersionAttribute()

async def increment_with_retry(pk: str, max_retries: int = 5) -> Counter:
    """Increment counter with retry on conflict."""
    for attempt in range(max_retries):
        counter = await Counter.async_get(pk=pk)
        if counter is None:
            counter = Counter(pk=pk, value="0")
        counter.value = str(int(counter.value) + 1)
        try:
            await counter.async_save()
            return counter
        except ConditionCheckFailedError:
            if attempt == max_retries - 1:
                raise
            # Small delay before retry
            await asyncio.sleep(0.01 * (attempt + 1))
    raise RuntimeError("Should not reach here")
async def main():
    # Create counter
    counter = Counter(pk="COUNTER#1", value="0")
    await counter.async_save()
    # Run 10 concurrent increments
    tasks = [increment_with_retry("COUNTER#1") for _ in range(10)]
    # gather() re-raises ConditionCheckFailedError if any task runs out of retries
    await asyncio.gather(*tasks)
    # Final value is 10 when every task succeeded within max_retries
    final = await Counter.async_get(pk="COUNTER#1")
    print(f"Final value: {final.value}")  # 10
    print(f"Final version: {final.version}")  # 11 (1 create + 10 updates)

if __name__ == "__main__":
    asyncio.run(main())
Delete with version check
Delete also checks the version. If someone else updated the item, delete fails:
from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionCheckFailedError
class Document(Model):
    model_config = ModelConfig(table="documents")
    pk = StringAttribute(hash_key=True)
    content = StringAttribute()
    version = VersionAttribute()
# Create and update document
doc = Document(pk="DOC#DELETE", content="Hello")
doc.save()
doc.content = "Updated"
doc.save()
print(f"Version: {doc.version}") # 2
# Load stale copy
stale = Document.get(pk="DOC#DELETE")
# Update again
doc.content = "Updated again"
doc.save()
print(f"Version: {doc.version}") # 3
# Try to delete with stale version - fails!
try:
    stale.delete()
except ConditionCheckFailedError:
    print("Can't delete - version mismatch")
# Delete with current version - succeeds
doc.delete()
print("Deleted successfully")
Combining with user conditions
You can add your own conditions. They're combined with the version check using AND:
from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionCheckFailedError
class Document(Model):
    model_config = ModelConfig(table="documents")
    pk = StringAttribute(hash_key=True)
    status = StringAttribute()
    content = StringAttribute()
    version = VersionAttribute()
# Create document
doc = Document(pk="DOC#CONDITION", status="draft", content="Hello")
doc.save()
# Update only if status is "draft"
# This combines with version check: (status = "draft" AND version = 1)
doc.content = "Updated content"
doc.save(condition=Document.status == "draft")
print(f"Updated! Version: {doc.version}") # 2
# Change status
doc.status = "published"
doc.save()
print(f"Published! Version: {doc.version}") # 3
# Try to update draft-only - fails because status is "published"
doc.content = "Another update"
try:
    doc.save(condition=Document.status == "draft")
except ConditionCheckFailedError:
    print("Can't update - not a draft")
When to use
| Use case | Examples | Recommendation |
|---|---|---|
| Counters and balances | Page views, account balances, inventory | ✅ Use it |
| Documents with edits | Wiki pages, configs, user profiles | ✅ Use it |
| State machines | Order status, workflow steps | ✅ Use it |
| Shared resources | Seat reservations, appointment slots | ✅ Use it |
| High-frequency updates | Hot keys, real-time counters | ❌ Use transactions |
| Simple increments | Like counts, view counts | ❌ Use update() with add() |
| Single writer per item | Background jobs, migrations | ❌ Skip it |
Things to know
Version increments before save. pydynox bumps the local version before writing, so after a failed save your local object holds a version that was never stored. Always reload after a failed save.
update() does not use versioning. Only save() and delete() check and increment the version. If you need atomic field updates with versioning, reload and save.
New items check for existence. Creating an item with VersionAttribute uses attribute_not_exists condition. Creating the same item twice fails.
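In DynamoDB terms, both the existence check and the version check are condition expressions attached to the write. The sketch below builds the kind of low-level `PutItem` parameters that could express them. `build_put_params` is a hypothetical helper, and the exact expressions pydynox generates (including which attribute it tests with `attribute_not_exists`) are an assumption here, not taken from its source.

```python
def build_put_params(table, item, expected_version):
    """Build low-level PutItem parameters with an optimistic-lock condition.

    `item` uses DynamoDB's typed format, e.g. {"pk": {"S": "DOC#1"}}.
    `expected_version` is None for a brand-new item.
    """
    params = {"TableName": table, "Item": item}
    if expected_version is None:
        # New item: succeed only if nothing is stored under this key yet.
        # Testing the hash key is an assumption made for this sketch.
        params["ConditionExpression"] = "attribute_not_exists(#pk)"
        params["ExpressionAttributeNames"] = {"#pk": "pk"}
    else:
        # Existing item: the stored version must equal the one we loaded.
        params["ConditionExpression"] = "#v = :expected"
        params["ExpressionAttributeNames"] = {"#v": "version"}
        params["ExpressionAttributeValues"] = {
            ":expected": {"N": str(expected_version)}
        }
    return params


create = build_put_params(
    "documents",
    {"pk": {"S": "DOC#1"}, "content": {"S": "Hello"}, "version": {"N": "1"}},
    expected_version=None,
)
update = build_put_params(
    "documents",
    {"pk": {"S": "DOC#1"}, "content": {"S": "Hello World"}, "version": {"N": "2"}},
    expected_version=1,
)
```

When the condition evaluates to false, DynamoDB rejects the write, and the client library reports that as a condition-check failure.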
Use retry with backoff. In high-concurrency scenarios, add exponential backoff between retries to reduce contention.
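One way to compute such delays is exponential backoff with random jitter. This is a minimal sketch using only the standard library; `backoff_delay`, `base`, and `cap` are hypothetical names, not part of pydynox.

```python
import random


def backoff_delay(attempt, base=0.01, cap=1.0):
    """Return a sleep time in seconds for a zero-based retry attempt.

    The upper bound doubles on each attempt (base, 2*base, 4*base, ...)
    up to `cap`. The delay is drawn uniformly from [0, bound] so that
    competing writers do not all retry at the same moment.
    """
    bound = min(cap, base * (2 ** attempt))
    return random.uniform(0, bound)


# Upper bounds for the first five attempts, before jitter is applied.
bounds = [min(1.0, 0.01 * (2 ** n)) for n in range(5)]
```

In the async retry loop, `await asyncio.sleep(backoff_delay(attempt))` could take the place of the fixed linear delay.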