-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
122 lines (100 loc) · 3.8 KB
/
main.py
File metadata and controls
122 lines (100 loc) · 3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json
import os
from datetime import datetime, timezone

import asyncpg
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries
# Synchronous SQLAlchemy URL (psycopg driver); overridable via the DATABASE_URL env var.
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+psycopg://fastapi_user:fastapi_password@localhost:5432/fastapi_db")
# asyncpg expects a plain postgresql:// scheme, so strip the SQLAlchemy driver suffix.
ASYNC_DATABASE_URL = DATABASE_URL.replace("postgresql+psycopg://", "postgresql://")
# Engine and session factory for the synchronous (SQLAlchemy ORM) code path.
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base shared by all ORM models in this module.
Base = declarative_base()
class User(Base):
    """SQLAlchemy ORM model for the ``users`` table."""

    __tablename__ = "users"
    # Surrogate integer primary key, generated by the database.
    id = Column(Integer, primary_key=True, index=True)
    # Display name; indexed to support lookups by name.
    name = Column(String, index=True)
# Create SQLAlchemy tables at import time (idempotent: existing tables are skipped).
Base.metadata.create_all(bind=engine)
app = FastAPI()
# Initialize pgqueuer tables
async def init_pgqueuer():
    """Initialize pgqueuer's schema on startup (idempotent).

    Opens a dedicated asyncpg connection, checks whether the ``pgqueuer``
    table already exists, and only runs ``install``/``upgrade`` when it
    does not. Failures are logged and swallowed so a transient DB issue
    does not prevent application startup (deliberate best-effort).
    """
    conn = await asyncpg.connect(ASYNC_DATABASE_URL)
    try:
        # Probe information_schema so install() is only attempted once.
        table_exists = await conn.fetchval(
            "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'pgqueuer')"
        )
        if not table_exists:
            # Queries is already imported at module level; the original
            # re-imported it here, shadowing that import for no benefit.
            driver = AsyncpgDriver(conn)
            queries = Queries(driver)
            await queries.install()
            await queries.upgrade()
            print("pgqueuer tables initialized successfully!")
        else:
            print("pgqueuer tables already exist, skipping initialization")
    except Exception as e:
        # Best-effort: report and continue startup rather than crash.
        print(f"pgqueuer initialization: {e}")
    finally:
        await conn.close()
# NOTE(review): @app.on_event is deprecated in newer FastAPI in favor of
# lifespan handlers — confirm the pinned FastAPI version before migrating.
@app.on_event("startup")
async def startup_event():
    """Run one-time pgqueuer schema initialization when the app starts."""
    await init_pgqueuer()
class UserCreate(BaseModel):
    """Request body for creating a user."""

    name: str
class UserResponse(BaseModel):
    """Response body representing a persisted user."""

    id: int
    name: str

    class Config:
        # NOTE(review): orm_mode is the Pydantic v1 spelling; v2 renamed it
        # to from_attributes — confirm which Pydantic version is pinned.
        orm_mode = True
def get_db():
    """FastAPI dependency: yield a SQLAlchemy session, closing it afterwards."""
    session = SessionLocal()
    try:
        yield session
    finally:
        # Always release the connection back to the pool, even on error.
        session.close()
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
    """
    Create a user and enqueue a matching audit job atomically.

    Both the user INSERT and the pgqueuer enqueue run inside a single
    asyncpg transaction, so they commit or roll back together.

    Raises:
        HTTPException: 500 when either operation fails (the transaction
            is rolled back automatically by asyncpg).
    """
    conn = await asyncpg.connect(ASYNC_DATABASE_URL)
    try:
        async with conn.transaction():
            # 1. Insert the user row; RETURNING yields the generated id.
            user_id = await conn.fetchval(
                "INSERT INTO users (name) VALUES ($1) RETURNING id",
                user.name,
            )
            # 2. Build the audit payload.
            # BUG FIX: the original set "timestamp" to str(user_id),
            # i.e. the user's id, not a time. Record a timezone-aware
            # UTC timestamp instead.
            audit_payload = {
                "event_type": "user_created",
                "user_id": user_id,
                "user_name": user.name,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
            # 3. Enqueue the audit job on the SAME connection so it joins
            # the open transaction.
            driver = AsyncpgDriver(conn)
            queries = Queries(driver)
            await queries.enqueue(
                ["audit_events"],                      # entrypoint names
                [json.dumps(audit_payload).encode()],  # payloads as bytes
                [0],                                   # priorities
            )
        # Transaction committed; respond with the persisted user.
        return UserResponse(id=user_id, name=user.name)
    except Exception as e:
        # asyncpg rolled the transaction back when the exception escaped
        # the `async with` block above.
        raise HTTPException(status_code=500, detail=f"Failed to create user: {str(e)}")
    finally:
        await conn.close()
@app.get("/")
def read_root():
    """Landing endpoint: identifies the stack this service runs on."""
    payload = {"message": "FastAPI with SQLAlchemy and PostgreSQL"}
    return payload