Scoped Dependencies
Scoped dependencies live for the duration of a specific scope, typically a web request or processing context.
Basic Scoped Usage
from nexusdi import scoped, scope_cleanup
import time
@scoped
class RequestContext:
def __init__(self):
self.request_id = str(uuid.uuid4())
self.start_time = time.time()
self.user_id = None
def set_user(self, user_id: int):
self.user_id = user_id
@scoped
class AuditLogger:
def __init__(self, context: RequestContext):
self.context = context
self.events = []
def log_event(self, event: str):
self.events.append({
"event": event,
"request_id": self.context.request_id,
"user_id": self.context.user_id,
"timestamp": time.time()
})
Scope Lifecycle
Within the same scope, dependencies are reused:
def handle_request(user_id: int):
try:
# First resolution creates the scope
context = _container.resolve(RequestContext)
context.set_user(user_id)
# Second resolution returns same instance
audit = _container.resolve(AuditLogger)
audit.log_event("request_started")
# Third resolution also returns same context
another_context = _container.resolve(RequestContext)
assert context is another_context # Same instance
# Process request...
result = process_user_data(user_id)
audit.log_event("request_completed")
return result
finally:
# Clean up scoped instances
scope_cleanup()
Web Application Integration
Flask Integration
from flask import Flask, g
from nexusdi import scoped, scope_cleanup, _container
app = Flask(__name__)
@scoped
class RequestData:
def __init__(self):
self.request_start = time.time()
self.user_id = None
@app.before_request
def before_request():
# Scoped dependencies are automatically created per request
request_data = _container.resolve(RequestData)
g.request_data = request_data
@app.teardown_request
def teardown_request(exception):
# Clean up scoped dependencies
scope_cleanup()
@app.route('/users/<int:user_id>')
def get_user(user_id):
request_data = _container.resolve(RequestData)
request_data.user_id = user_id
# Process user...
return f"User {user_id}"
FastAPI Integration
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
@asynccontextmanager
async def request_scope():
try:
yield
finally:
scope_cleanup()
app = FastAPI()
@app.middleware("http")
async def scope_middleware(request, call_next):
async with request_scope():
response = await call_next(request)
return response
@app.get("/users/{user_id}")
async def get_user(user_id: int):
request_data = _container.resolve(RequestData)
request_data.user_id = user_id
return {"user_id": user_id}
Thread-Based Scopes
Each thread gets its own scope:
import threading
from nexusdi import scoped, _container
@scoped
class ThreadData:
def __init__(self):
self.thread_id = threading.get_ident()
self.data = {}
def worker(worker_id: int):
# Each thread gets its own ThreadData instance
thread_data = _container.resolve(ThreadData)
thread_data.data['worker_id'] = worker_id
# Same instance within the thread
same_data = _container.resolve(ThreadData)
assert thread_data is same_data
print(f"Worker {worker_id} on thread {thread_data.thread_id}")
# Start multiple threads
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
Automatic Scope Management
NexusDI automatically creates scopes when needed:
@scoped
class SessionData:
def __init__(self):
self.session_id = str(uuid.uuid4())
print(f"Created session: {self.session_id}")
# First access creates new scope
session1 = _container.resolve(SessionData)
# Second access returns same instance
session2 = _container.resolve(SessionData)
assert session1 is session2
# After cleanup, new scope is created
scope_cleanup()
session3 = _container.resolve(SessionData)
assert session1 is not session3
Error Handling in Scoped Dependencies
from nexusdi.exceptions import LifecycleException
@scoped
class DatabaseTransaction:
def __init__(self):
self.transaction = self._begin_transaction()
def _begin_transaction(self):
# Could fail during initialization
return "transaction_handle"
def commit(self):
print("Transaction committed")
def rollback(self):
print("Transaction rolled back")
def process_with_transaction():
try:
tx = _container.resolve(DatabaseTransaction)
# Do work...
tx.commit()
except Exception as e:
tx = _container.resolve(DatabaseTransaction)
tx.rollback()
raise
finally:
scope_cleanup()
Scoped vs Singleton vs Transient
from nexusdi import singleton, transient, scoped
@singleton
class AppConfig: # Shared across entire app
def __init__(self):
self.database_url = "postgresql://..."
@scoped
class UserSession: # One per request/scope
def __init__(self, config: AppConfig):
self.config = config
self.logged_in_user = None
@transient
class EmailSender: # New instance each time
def __init__(self, session: UserSession):
self.session = session
Best Practices
- Always call
scope_cleanup()when scope ends - Use scoped dependencies for request-specific data
- Avoid heavy initialization in scoped constructors
- Consider thread safety for multi-threaded applications
- Clean up resources properly in scope cleanup
Common Patterns
Request Processing Pipeline
@scoped
class RequestProcessor:
def __init__(self):
self.steps = []
def add_step(self, step: str):
self.steps.append(step)
def get_summary(self):
return f"Processed {len(self.steps)} steps"
def process_pipeline():
processor = _container.resolve(RequestProcessor)
processor.add_step("validation")
processor.add_step("processing")
processor.add_step("response")
return processor.get_summary()