Skip to content

Core

SQLiteConnectionState

Bases: pw._ConnectionState

Defaults to make SQLite DB async-compatible (according to FastAPI/Pydantic)

Source code in samudra/conf/database/core.py
class SQLiteConnectionState(pw._ConnectionState):
    """Defaults to make SQLite DB async-compatible (according to FastAPI/Pydantic)"""

    def __init__(self, **kwargs):
        super().__setattr__("_state", db_state)
        super().__init__(**kwargs)

    def __setattr__(self, name, value):
        self._state.get()[name] = value

    def __getattr__(self, name):
        return self._state.get()[name]

get_database(name)

Returns the connection class based on the name.

Source code in samudra/conf/database/core.py
def get_database(name: str) -> pw.Database:
    """Returns the connection class based on the name."""
    info = read_database_info(name)
    if info.get("engine") == DatabaseEngine.SQLite:
        return_db = pw.SqliteDatabase(info.get("path"), check_same_thread=False)
        return_db._state = SQLiteConnectionState()
        return return_db
    if info.get("engine") == DatabaseEngine.MySQL:
        return pw.MySQLDatabase(info.get("path"))

new_database(name, engine, path)

Create and register a SQLite database or just register a database if not SQLite

Source code in samudra/conf/database/core.py
def new_database(name: str, engine: DatabaseEngine, path: str) -> pw.Database:
    """Create and register a SQLite database or just register a database if not SQLite"""
    # ? Should this be a function parameters?
    DATABASE_HOST = os.getenv("DATABASE_HOST")
    DATABASE_PORT = int(os.getenv("DATABASE_PORT"))
    DATABASE_OPTIONS = os.getenv("DATABASE_OPTIONS")
    USERNAME = os.getenv("DATABASE_USERNAME")
    PASSWORD = os.getenv("DATABASE_PASSWORD")
    SSL_MODE = os.getenv("SSL_MODE")
    if engine is None or engine not in DatabaseEngine.__members__.values():
        raise ValueError(
            "Invalid engine. You entered {}. Valid values are: \n - {}".format(
                engine, "\n - ".join(DatabaseEngine.__members__.values())
            )
        )
    if engine == DatabaseEngine.SQLite:
        return create_sqlite(name=name, path=Path(path))
    if engine == DatabaseEngine.MySQL:
        conn_str = f"mysql://{USERNAME}:{PASSWORD}@{DATABASE_HOST}:{DATABASE_PORT}/{name}?ssl-mode=REQUIRED"
        return_db = pw.MySQLDatabase(conn_str)
        append_database_list(name=name, path=conn_str, engine=engine)
        logging.info(f"Connecting to {return_db.database} as {USERNAME}")
    else:
        raise NotImplementedError("Invalid engine")

set_active_database(name)

Sets the database as the currently active database

Source code in samudra/conf/database/core.py
def set_active_database(name: str) -> None:
    """Sets the database as the currently active database"""
    # Check if the name is already registered in .samudra/databases.toml
    db_obj = read_database_info(name=name)
    if db_obj is None:
        raise FileNotFoundError(
            f"The database name `{name}` is not found. Perhaps it is not created yet."
        )
    # Write the info in .samudra/config.toml
    write_config({"active": name})