Skip to content

Database

transcript_indexer.db.connection

Connection factory: WAL, sqlite-vec, migrations, and meta consistency.

The chunk_embeddings vec0 table is dimension-locked at create time, so it is created here at runtime from config rather than in the migration. The chosen dimensions/provider/model are recorded in the meta table; if config disagrees with meta on a later open, we refuse to start and direct the user to txi reembed --change-dimension.

open_connection(cfg, *, read_only=False)

Open a SQLite connection with sqlite-vec loaded and FK enforcement. Unless read_only is set, WAL mode is also enabled, migrations are applied, and chunk_embeddings is ensured.

Source code in src/transcript_indexer/db/connection.py
def open_connection(cfg: Config, *, read_only: bool = False) -> sqlite3.Connection:
    """Open a SQLite connection with sqlite-vec loaded and FK enforcement.

    In the default read-write mode the database's parent directory is created
    if missing, WAL journaling is enabled, pending migrations are applied, and
    ``chunk_embeddings`` is ensured. In read-only mode the database is opened
    through a ``mode=ro`` URI and none of those write steps run.

    Args:
        cfg: Configuration; ``cfg.paths.db`` locates the database file and the
            whole object is passed on to ``_ensure_chunk_embeddings``.
        read_only: Open the database read-only and skip every write step.

    Returns:
        A connection whose ``row_factory`` is ``sqlite3.Row``.

    Raises:
        Any exception raised while configuring the connection is propagated
        unchanged; the half-initialised connection is closed first so the
        file handle is not leaked.
    """
    db_path = cfg.paths.db.expanduser()

    if read_only:
        # NOTE(review): the path is interpolated into the URI unescaped, so a
        # path containing '?' or '#' would presumably be misparsed - confirm.
        uri = f"file:{db_path}?mode=ro"
        conn = sqlite3.connect(uri, uri=True)
    else:
        db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(db_path)

    try:
        conn.row_factory = sqlite3.Row

        # Extension loading is enabled only for the duration of the
        # sqlite-vec load, then switched off again.
        conn.enable_load_extension(True)
        sqlite_vec.load(conn)
        conn.enable_load_extension(False)

        if not read_only:
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA foreign_keys=ON")
            schema.migrate(conn)
            _ensure_chunk_embeddings(conn, cfg)
        else:
            conn.execute("PRAGMA foreign_keys=ON")
    except BaseException:
        # Setup failed part-way: release the connection before propagating,
        # since the caller never receives a handle it could close itself.
        conn.close()
        raise

    return conn

open_for_path(db_path, dimensions=1024)

Test-friendly opener: bypass Config, point at an explicit DB path.

Source code in src/transcript_indexer/db/connection.py
def open_for_path(db_path: Path, dimensions: int = 1024) -> sqlite3.Connection:
    """Open a read-write connection to ``db_path`` without a user-supplied Config.

    Intended for tests: a default ``Config`` is built, its database path and
    embedding dimensions are overridden with the given arguments, and the
    result is handed to ``open_connection``.
    """
    config = Config()
    config.embedding.dimensions = dimensions
    config.paths.db = db_path
    return open_connection(config)

transcript_indexer.db.schema

Migration runner. Tracks schema version via PRAGMA user_version.

migrate(conn)

Apply any pending migrations. Returns the new user_version.

Source code in src/transcript_indexer/db/schema.py
def migrate(conn: sqlite3.Connection) -> int:
    """Apply any pending migrations. Returns the new user_version.

    Each migration whose version is greater than the current
    ``PRAGMA user_version`` is executed together with its version bump inside
    one explicit transaction, so a failing migration is rolled back as a whole
    instead of leaving a partially applied schema behind.

    Args:
        conn: Open connection to the database being migrated.

    Returns:
        The ``user_version`` read back after all pending migrations ran.

    Raises:
        Any exception raised while executing a migration script is propagated
        after the open transaction has been rolled back.
    """
    have = current_user_version(conn)
    for version, _name, sql in _list_migrations():
        if version <= have:
            continue
        # ``executescript`` runs outside sqlite3's implicit transaction
        # handling, so wrapping it in ``with conn:`` gives no atomicity. The
        # BEGIN/COMMIT therefore has to be part of the script itself.
        # NOTE(review): assumes migration scripts do not issue their own
        # BEGIN/COMMIT - confirm against the migration files.
        # The lone ';' terminates a script whose last statement lacks one.
        script = (
            "BEGIN;\n"
            f"{sql}\n"
            ";\n"
            f"PRAGMA user_version = {version};\n"
            "COMMIT;"
        )
        try:
            conn.executescript(script)
        except Exception:
            # Undo whatever part of the migration ran before the failure.
            if conn.in_transaction:
                conn.rollback()
            raise
    return current_user_version(conn)