From 2b4d9d1a1fa6fe2c70e071c3d7cc2c4394097397 Mon Sep 17 00:00:00 2001 From: TropiiDev Date: Fri, 18 Apr 2025 00:59:30 -0400 Subject: [PATCH] Initial Commit v0.0.1 --- .gitignore | 5 +++ api.py | 110 +++++++++++++++++++++++++++++++++++++++++++++++ auth.py | 50 +++++++++++++++++++++ functions.py | 31 +++++++++++++ requirements.txt | 50 +++++++++++++++++++++ sql.py | 32 ++++++++++++++ tables.py | 51 ++++++++++++++++++++++ 7 files changed, 329 insertions(+) create mode 100644 .gitignore create mode 100644 api.py create mode 100644 auth.py create mode 100644 functions.py create mode 100644 requirements.txt create mode 100644 sql.py create mode 100644 tables.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4887442 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.venv/ +.idea/ +__pycache__/ +database.db +.env \ No newline at end of file diff --git a/api.py b/api.py new file mode 100644 index 0000000..613c466 --- /dev/null +++ b/api.py @@ -0,0 +1,110 @@ +from auth import * +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.security import OAuth2PasswordRequestForm +from functions import * +import random +from sql import * +from tables import * + +# fastapi init +app = FastAPI(lifespan=lifespan) +origins = [ + "http://localhost:3000", + "https://task.fstropii.com", + "https://www.task.fstropii.com", + "https://task.fstropii.com/", + "http://localhost" +] +app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# routes +@app.get("/") +async def root(): + return {"message": "Hello World"} + +@app.post("/user/create", response_model=UserPublic) +def create_user(user: UserCreate, session: SessionDep): + salt = random.randint(00000, 99999) + existing_user = session.query(User).filter(User.email == user.email).first() + if existing_user: + raise HTTPException(status_code=400, detail="Email already registered") + + user.password = hash_password(user.password, salt) + user.salt = salt + + db_user = User.model_validate(user) + session.add(db_user) + session.commit() + session.refresh(db_user) + return db_user + +@app.patch("/user/update", response_model=UserPublic) +async def update_user(user: UserUpdate, session: SessionDep, current_user: User = Depends(get_current_user)): + user_db = session.get(User, current_user.id) + user_data = user.model_dump(exclude_unset=True) + print(f"l52_db: {user_db.tasks}") + if 'tasks' in user_data: + if user_db.tasks is None: + user_db.tasks = [] + + for i in range(len(user_db.tasks)): # loop through all tasks + if user_db.tasks[i]['title'] == user_data['tasks'][i]['title']: # check if there is a task with the same title + # if there is, check if the is_completed status has changed + # if it has, update the task in the db + # if it hasn't, do nothing + if user_db.tasks[i]['is_completed'] != user_data['tasks'][i]['is_completed']: + # update the task in the db + updated_task = user_db.tasks[i] + updated_task['is_completed'] = user_data['tasks'][i]['is_completed'] + break + else: + current_tasks = user_db.tasks + new_task = user_data['tasks'] + user_data.tasks = current_tasks + new_task + print(f"l71_update_db: {user_db.tasks}") + print(f"l72_update_user_data: {user_data}") + user_db.sqlmodel_update(user_data) + print(f"l74_user_data {user_data}") + print(f"l75_user_db: {user_db.tasks}") + session.add(user_db) + print("l##_db: ", user_db.tasks) + session.commit() + print("l76_not_refresh_db: ", user_db.tasks) # randomly sets is_completed to false even though its true every other time + session.refresh(user_db) + print(f"l77_db: {user_db.tasks}") + return user_db + +@app.get('/user/login') +async def verify_user(user: VerifyUser, session: SessionDep): + existing_user = session.query(User).filter(User.email == user.email).first() + if not existing_user: + raise HTTPException(status_code=400, detail="Email not registered") + + is_password_valid = validate_password(user.password, existing_user.password, existing_user.salt) + + if not is_password_valid: + raise HTTPException(status_code=400, detail="Password not valid") + + return {"is_password_valid": is_password_valid, "email": existing_user.email, "id": existing_user.id} + + +@app.post('/token', response_model=Token) +async def login_for_access_token(session: SessionDep, form_data: OAuth2PasswordRequestForm = Depends()): + user = authenticate_user(form_data.username, form_data.password, session) + if not user: + raise HTTPException(status_code=401, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}) + + access_token_expires = timedelta(minutes=int(os.getenv("ACCESS_TOKEN_EXPIRES_MINUTES"))) + access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires) + return {"access_token": access_token, "token_type": "bearer"} + +@app.get("/users/me/", response_model=UserPublic) +async def read_users_me(session: SessionDep, current_user: User = Depends(get_current_user)): + return current_user \ No newline at end of file diff --git a/auth.py b/auth.py new file mode 100644 index 0000000..1d843d0 --- /dev/null +++ b/auth.py @@ -0,0 +1,50 @@ +import os + +from datetime import datetime, timedelta +from fastapi import Depends, HTTPException +from fastapi.security import OAuth2PasswordBearer +from jose import JWTError, jwt +from passlib.context import CryptContext +from tables import * +from sql import * +from functions import * + +from dotenv import load_dotenv + +load_dotenv() + +secret_key = os.getenv("SECRET_KEY") +algo = os.getenv("ALGORITHM") + +pwd_context = CryptContext(schemes=['bcrypt'], deprecated="auto") +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + +def create_access_token(data: dict, expires_delta: timedelta | None = None): + to_encode = data.copy() + + if expires_delta: + expire = datetime.now(datetime.timezone.utc) + expires_delta + else: + expire = datetime.now(datetime.timezone.utc) + timedelta(minutes=15) + + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algo) + return encoded_jwt + +async def get_current_user(session: SessionDep, token: str = Depends(oauth2_scheme)): + credential_exception = HTTPException(status_code=401, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}) + try: + payload = jwt.decode(token, secret_key, algorithms=[algo]) + username: str = payload.get("sub") + if username is None: + raise credential_exception + + token_data = TokenData(username=username) + except JWTError: + raise credential_exception + + user = get_user_by_username(token_data.username, session) + if user is None: + raise credential_exception + + return user \ No newline at end of file diff --git a/functions.py b/functions.py new file mode 100644 index 0000000..82845ec --- /dev/null +++ b/functions.py @@ -0,0 +1,31 @@ +import hashlib + +from tables import * +from sql import * + +def hash_password(password: str, salt: int = None): + password = f"{password}{salt}" + return hashlib.sha256(password.encode()).hexdigest() + +def get_user_by_email(email: str, session) -> User | None: + return session.query(User).filter(User.email == email).first() + +def get_user_by_username(username: str, session) -> User | None: + return session.query(User).filter(User.username == username).first() + +def validate_password(password: str, hash: str, salt: int) -> bool: + hashed_pass = hash_password(password, salt) + if hash == hashed_pass: + return True + + return False + +def authenticate_user(username: str, password: str, session: SessionDep): + user = get_user_by_username(username, session) + if not user: + return False + + if not validate_password(password, user.password, user.salt): + return False + + return user \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c295b25 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,50 @@ +annotated-types==0.7.0 +anyio==4.9.0 +certifi==2025.1.31 +charset-normalizer==3.4.1 +click==8.1.8 +crypto==1.4.1 +dnspython==2.7.0 +ecdsa==0.19.1 +email_validator==2.2.0 +fastapi==0.115.12 +fastapi-cli==0.0.7 +greenlet==3.2.0 +h11==0.14.0 +httpcore==1.0.8 +httptools==0.6.4 +httpx==0.28.1 +idna==3.10 +Jinja2==3.1.6 +markdown-it-py==3.0.0 +MarkupSafe==3.0.2 +mdurl==0.1.2 +Naked==0.1.32 +passlib==1.7.4 +pyasn1==0.4.8 +pydantic==2.11.3 +pydantic_core==2.33.1 +Pygments==2.19.1 +python-dotenv==1.1.0 +python-jose==3.4.0 +python-multipart==0.0.20 +PyYAML==6.0.2 +requests==2.32.3 +rich==14.0.0 +rich-toolkit==0.14.1 +rsa==4.9.1 +shellescape==3.8.1 +shellingham==1.5.4 +six==1.17.0 +sniffio==1.3.1 +SQLAlchemy==2.0.40 +sqlmodel==0.0.24 +starlette==0.46.2 +typer==0.15.2 +typing-inspection==0.4.0 +typing_extensions==4.13.2 +urllib3==2.4.0 +uvicorn==0.34.1 +uvloop==0.21.0 +watchfiles==1.0.5 +websockets==15.0.1 diff --git a/sql.py b/sql.py new file mode 100644 index 0000000..c27628f --- /dev/null +++ b/sql.py @@ -0,0 +1,32 @@ +from contextlib import asynccontextmanager +from fastapi import Depends, FastAPI + +from sqlmodel import Session, SQLModel, create_engine +from typing import Annotated + +# SQLite file naming + +sqlite_file_name = "database.db" +sqlite_url = f"sqlite:///{sqlite_file_name}" + +# Create the engine + +connect_args = {"check_same_thread": False} +engine = create_engine(sqlite_url, connect_args=connect_args) + +# Create the tables in the db +def create_db_and_tables(): + SQLModel.metadata.create_all(engine) + +# Get the session dep +def get_session(): + with Session(engine) as session: + yield session + +SessionDep = Annotated[Session, Depends(get_session)] + +# Run the create_db_and_tables on startup +@asynccontextmanager +async def lifespan(api: FastAPI): + create_db_and_tables() + yield \ No newline at end of file diff --git a/tables.py b/tables.py new file mode 100644 index 0000000..c407670 --- /dev/null +++ b/tables.py @@ -0,0 +1,51 @@ +from pydantic import BaseModel +from sqlmodel import Field, SQLModel +from sqlalchemy import Column, JSON + +# Token models +class Token(BaseModel): + access_token: str + token_type: str + +class TokenData(BaseModel): + username: str | None = None + +# Task models +class Task(BaseModel): + title: str + description: str | None = None + is_completed: bool = False + +# User models +class UserBase(SQLModel): + name: str + email: str = Field(index=True) + username: str + +class User(UserBase, table=True): + id: int = Field(default=None, primary_key=True, index=True) + is_active: bool = True + username: str = Field(index=True, unique=True) + password: str + salt: int + tasks: list[Task] | None = Field(default=None, sa_column=Column(JSON)) + +class UserPublic(UserBase): + id: int + is_active: bool + tasks: list[Task] | None = Field(default=None, sa_column=Column(JSON)) + +class UserCreate(UserBase): + password: str + salt: int | None = Field(default=0) + is_active: bool = True + +class UserUpdate(SQLModel): + name: str | None = None + email: str | None = None + password: str | None = None + tasks: list[Task] | None = None + +class VerifyUser(BaseModel): + email: str + password: str \ No newline at end of file