Skip to content

Search

transcript_indexer.search

Query helpers over the indexed corpus.

Pure functions over a sqlite3.Connection. Used by both the CLI (Phase 3) and the MCP server (Phase 7). Semantic search lands in Phase 5; this module covers FTS5 and metadata search only.

search_conversations(conn, *, since=None, until=None, participant=None, kind=None, has_notes=None, title_query=None, limit=None)

List conversations matching the given metadata filters.

Source code in src/transcript_indexer/search.py
def search_conversations(
    conn: sqlite3.Connection,
    *,
    since: str | None = None,
    until: str | None = None,
    participant: str | None = None,
    kind: str | None = None,
    has_notes: bool | None = None,
    title_query: str | None = None,
    limit: int | None = None,
) -> list[ConversationSummary]:
    """Return summaries of the conversations that satisfy every given filter.

    Filters whose value is falsy (``None`` or an empty string) are ignored;
    the remaining ones are AND-ed together. Results are ordered by
    ``started_at`` descending, one row per conversation.

    Args:
        conn: Open SQLite connection. Rows are read by column name, so the
            connection needs a name-addressable row factory
            (presumably ``sqlite3.Row`` -- confirm against the caller).
        since: Inclusive lower bound compared against ``conversations.started_at``.
        until: Inclusive upper bound compared against ``conversations.started_at``.
        participant: Matched with ``COLLATE NOCASE`` against a participant's
            speaker label, the linked person's canonical name, or any alias
            of that person.
        kind: Exact match on ``conversations.kind``.
        has_notes: ``True`` keeps only conversations with at least one note,
            ``False`` only those without; ``None`` does not filter.
        title_query: Substring matched with ``LIKE '%...%'`` against the
            title. ``%`` and ``_`` inside the value are not escaped, so they
            act as LIKE wildcards.
        limit: Maximum number of rows; ``None`` omits the LIMIT clause.

    Returns:
        One ``ConversationSummary`` per matching conversation, built by
        ``_row_to_summary`` with the note count computed in the same query.
    """
    joins: list[str] = []
    conditions: list[str] = []
    bindings: list[object] = []

    if participant:
        # The joins can fan out to several rows per conversation (one per
        # participant/alias pair); the GROUP BY below collapses them again.
        joins.append(
            "JOIN participants p ON p.conversation_id = c.id "
            "LEFT JOIN people pe ON pe.id = p.person_id "
            "LEFT JOIN person_aliases pa ON pa.person_id = p.person_id"
        )
        conditions.append(
            "(p.speaker = ? COLLATE NOCASE "
            "OR pe.canonical = ? COLLATE NOCASE "
            "OR pa.alias = ? COLLATE NOCASE)"
        )
        bindings += [participant] * 3

    # (predicate, bound value) pairs; a falsy value means "filter not requested".
    value_filters = (
        ("c.started_at >= ?", since),
        ("c.started_at <= ?", until),
        ("c.kind = ?", kind),
        ("c.title LIKE ?", f"%{title_query}%" if title_query else None),
    )
    for predicate, value in value_filters:
        if value:
            conditions.append(predicate)
            bindings.append(value)

    # Identity checks on purpose: only the literal True/False select a branch.
    notes_exist = "EXISTS (SELECT 1 FROM notes n WHERE n.conversation_id = c.id)"
    if has_notes is True:
        conditions.append(notes_exist)
    elif has_notes is False:
        conditions.append("NOT " + notes_exist)

    statement = [
        "SELECT c.id, c.source_path, c.kind, c.source, c.started_at, c.title, "
        "  (SELECT COUNT(*) FROM notes n WHERE n.conversation_id = c.id) AS note_count "
        "FROM conversations c",
        *joins,
    ]
    if conditions:
        statement.append("WHERE " + " AND ".join(conditions))
    statement.append("GROUP BY c.id ORDER BY c.started_at DESC")
    if limit is not None:
        statement.append("LIMIT ?")
        bindings.append(int(limit))

    # Materialise the rows first; _row_to_summary also receives the connection.
    records = conn.execute(" ".join(statement), bindings).fetchall()
    summaries = []
    for record in records:
        summaries.append(
            _row_to_summary(conn, record, note_count=int(record["note_count"]))
        )
    return summaries

get_conversation(conn, conversation_id, *, include_notes=True)

Fetch a single conversation with its turns and (optionally) notes.

Source code in src/transcript_indexer/search.py
def get_conversation(
    conn: sqlite3.Connection, conversation_id: int, *, include_notes: bool = True
) -> ConversationDetail | None:
    """Load one conversation together with its turns and, optionally, its notes.

    Args:
        conn: Open SQLite connection. Rows are read by column name, so the
            connection needs a name-addressable row factory
            (presumably ``sqlite3.Row`` -- confirm against the caller).
        conversation_id: Primary key of the conversation to load.
        include_notes: When false, the note rows are not queried and the
            returned detail carries an empty ``notes`` list. The note count
            passed to the summary is computed either way.

    Returns:
        A ``ConversationDetail`` with turns ordered by ``turn_idx`` and notes
        ordered by note id, or ``None`` when no conversation has that id.
    """
    key = (conversation_id,)

    header = conn.execute(
        "SELECT id, source_path, kind, source, started_at, title "
        "FROM conversations WHERE id = ?",
        key,
    ).fetchone()
    if header is None:
        return None

    count_row = conn.execute(
        "SELECT COUNT(*) AS n FROM notes WHERE conversation_id = ?", key
    ).fetchone()
    summary = _row_to_summary(conn, header, note_count=int(count_row["n"]))

    turns: list[TurnRow] = []
    turn_records = conn.execute(
        "SELECT turn_idx, speaker, person_id, timestamp_sec, text FROM turns "
        "WHERE conversation_id = ? ORDER BY turn_idx",
        key,
    ).fetchall()
    for record in turn_records:
        # person_id and timestamp_sec are nullable; keep NULL as None.
        person = record["person_id"]
        stamp = record["timestamp_sec"]
        turns.append(
            TurnRow(
                idx=int(record["turn_idx"]),
                speaker=str(record["speaker"]),
                person_id=None if person is None else int(person),
                timestamp_seconds=None if stamp is None else int(stamp),
                text=str(record["text"]),
            )
        )

    if not include_notes:
        return ConversationDetail(summary=summary, turns=turns, notes=[])

    note_records = conn.execute(
        "SELECT n.id, n.conversation_id, c.title AS conversation_title, n.kind, "
        "  n.source_path, n.indexed_at "
        "FROM notes n JOIN conversations c ON c.id = n.conversation_id "
        "WHERE n.conversation_id = ? ORDER BY n.id",
        key,
    ).fetchall()
    return ConversationDetail(
        summary=summary,
        turns=turns,
        notes=[_note_row_to_summary(record) for record in note_records],
    )

find_notes(conn, *, since=None, until=None, kind=None, conversation_kind=None, limit=None)

List notes filtered by note kind, parent conversation kind, and the parent conversation's start date.

Source code in src/transcript_indexer/search.py
def find_notes(
    conn: sqlite3.Connection,
    *,
    since: str | None = None,
    until: str | None = None,
    kind: str | None = None,
    conversation_kind: str | None = None,
    limit: int | None = None,
) -> list[NoteSummary]:
    """Return note summaries filtered by note kind, conversation kind, and date.

    Filters whose value is falsy (``None`` or an empty string) are ignored;
    the rest are AND-ed together. Notes are ordered by their parent
    conversation's ``started_at`` descending, then by note id.

    Args:
        conn: Open SQLite connection.
        since: Inclusive lower bound on the parent conversation's ``started_at``.
        until: Inclusive upper bound on the parent conversation's ``started_at``.
        kind: Exact match on ``notes.kind``.
        conversation_kind: Exact match on the parent ``conversations.kind``.
        limit: Maximum number of rows; ``None`` omits the LIMIT clause.

    Returns:
        One ``NoteSummary`` per matching note, built by ``_note_row_to_summary``.
    """
    # (predicate, bound value) pairs in the order they appear in the WHERE clause.
    candidate_filters = (
        ("n.kind = ?", kind),
        ("c.kind = ?", conversation_kind),
        ("c.started_at >= ?", since),
        ("c.started_at <= ?", until),
    )
    active = [(predicate, value) for predicate, value in candidate_filters if value]

    statement = (
        "SELECT n.id, n.conversation_id, c.title AS conversation_title, n.kind, "
        "  n.source_path, n.indexed_at "
        "FROM notes n JOIN conversations c ON c.id = n.conversation_id"
    )
    if active:
        statement += " WHERE " + " AND ".join(predicate for predicate, _ in active)
    statement += " ORDER BY c.started_at DESC, n.id"

    bindings: list[object] = [value for _, value in active]
    if limit is not None:
        statement += " LIMIT ?"
        bindings.append(int(limit))

    records = conn.execute(statement, bindings).fetchall()
    return [_note_row_to_summary(record) for record in records]

search_turns_fts(conn, query, *, speaker=None, since=None, until=None, limit=20)

Run an FTS5 query against turns and return ranked hits.

Source code in src/transcript_indexer/search.py
def search_turns_fts(
    conn: sqlite3.Connection,
    query: str,
    *,
    speaker: str | None = None,
    since: str | None = None,
    until: str | None = None,
    limit: int = 20,
) -> list[TurnHit]:
    """Full-text search over turns via the ``turns_fts`` FTS5 table.

    The raw query is passed through ``_escape_fts_query`` first; when that
    yields a falsy value no SQL is run and an empty list is returned.

    Args:
        conn: Open SQLite connection. Rows are read by column name, so the
            connection needs a name-addressable row factory
            (presumably ``sqlite3.Row`` -- confirm against the caller).
        query: User-supplied search text.
        speaker: Optional exact speaker match using ``COLLATE NOCASE``.
        since: Optional inclusive lower bound on the conversation's ``started_at``.
        until: Optional inclusive upper bound on the conversation's ``started_at``.
        limit: Maximum number of hits.

    Returns:
        ``TurnHit`` objects ordered by ascending ``bm25(turns_fts)`` value.
        Each hit carries a snippet of column 0 with matches wrapped in
        ``[`` and ``]``; ``started_at`` is parsed with
        ``datetime.fromisoformat``.
    """
    match_expr = _escape_fts_query(query)
    if not match_expr:
        return []

    fragments = [
        "SELECT t.id AS turn_id, t.conversation_id, c.title AS conversation_title, "
        "  c.started_at, t.turn_idx, t.speaker, t.text, "
        "  snippet(turns_fts, 0, '[', ']', '...', 16) AS snippet, "
        "  bm25(turns_fts) AS rank "
        "FROM turns_fts "
        "JOIN turns t ON t.id = turns_fts.rowid "
        "JOIN conversations c ON c.id = t.conversation_id "
        "WHERE turns_fts MATCH ?"
    ]
    bindings: list[object] = [match_expr]

    # Optional predicates; a falsy value means the filter was not requested.
    optional_predicates = (
        ("AND t.speaker = ? COLLATE NOCASE", speaker),
        ("AND c.started_at >= ?", since),
        ("AND c.started_at <= ?", until),
    )
    for predicate, value in optional_predicates:
        if value:
            fragments.append(predicate)
            bindings.append(value)

    fragments.append("ORDER BY rank LIMIT ?")
    bindings.append(int(limit))

    hits = []
    for record in conn.execute(" ".join(fragments), bindings).fetchall():
        hits.append(
            TurnHit(
                turn_id=int(record["turn_id"]),
                conversation_id=int(record["conversation_id"]),
                conversation_title=str(record["conversation_title"]),
                conversation_started_at=datetime.fromisoformat(str(record["started_at"])),
                turn_idx=int(record["turn_idx"]),
                speaker=str(record["speaker"]),
                text=str(record["text"]),
                snippet=str(record["snippet"]),
                rank=float(record["rank"]),
            )
        )
    return hits

search_notes_fts(conn, query, *, kind=None, limit=20)

Run an FTS5 query against notes and return ranked hits.

Source code in src/transcript_indexer/search.py
def search_notes_fts(
    conn: sqlite3.Connection,
    query: str,
    *,
    kind: str | None = None,
    limit: int = 20,
) -> list[NoteHit]:
    """Full-text search over notes via the ``notes_fts`` FTS5 table.

    The raw query is passed through ``_escape_fts_query`` first; when that
    yields a falsy value no SQL is run and an empty list is returned.

    Args:
        conn: Open SQLite connection. Rows are read by column name, so the
            connection needs a name-addressable row factory
            (presumably ``sqlite3.Row`` -- confirm against the caller).
        query: User-supplied search text.
        kind: Optional exact match on ``notes.kind``; ignored when falsy.
        limit: Maximum number of hits.

    Returns:
        ``NoteHit`` objects ordered by ascending ``bm25(notes_fts)`` value,
        each with a snippet of column 0 whose matches are wrapped in
        ``[`` and ``]``.
    """
    match_expr = _escape_fts_query(query)
    if not match_expr:
        return []

    fragments = [
        "SELECT n.id AS note_id, n.conversation_id, c.title AS conversation_title, n.kind, "
        "  snippet(notes_fts, 0, '[', ']', '...', 16) AS snippet, "
        "  bm25(notes_fts) AS rank "
        "FROM notes_fts "
        "JOIN notes n ON n.id = notes_fts.rowid "
        "JOIN conversations c ON c.id = n.conversation_id "
        "WHERE notes_fts MATCH ?"
    ]
    bindings: list[object] = [match_expr]

    if kind:
        fragments.append("AND n.kind = ?")
        bindings.append(kind)

    fragments.append("ORDER BY rank LIMIT ?")
    bindings.append(int(limit))

    hits = []
    for record in conn.execute(" ".join(fragments), bindings).fetchall():
        hits.append(
            NoteHit(
                note_id=int(record["note_id"]),
                conversation_id=int(record["conversation_id"]),
                conversation_title=str(record["conversation_title"]),
                kind=str(record["kind"]),
                snippet=str(record["snippet"]),
                rank=float(record["rank"]),
            )
        )
    return hits

search_fts(conn, query, *, artifact='any', speaker=None, since=None, until=None, note_kind=None, limit=20)

Run FTS against the requested artifact set.

Source code in src/transcript_indexer/search.py
def search_fts(
    conn: sqlite3.Connection,
    query: str,
    *,
    artifact: Artifact = "any",
    speaker: str | None = None,
    since: str | None = None,
    until: str | None = None,
    note_kind: str | None = None,
    limit: int = 20,
) -> tuple[list[TurnHit], list[NoteHit]]:
    """Dispatch a full-text query to the turn and/or note search.

    Args:
        conn: Open SQLite connection.
        query: User-supplied search text, forwarded unchanged.
        artifact: ``"turns"`` searches turns only, ``"notes"`` searches notes
            only, ``"any"`` searches both. Any other value searches neither.
        speaker: Forwarded to the turn search only.
        since: Forwarded to the turn search only.
        until: Forwarded to the turn search only.
        note_kind: Forwarded to the note search as ``kind``.
        limit: Applied to each search separately, so up to ``limit`` turn
            hits plus ``limit`` note hits can be returned.

    Returns:
        A ``(turn_hits, note_hits)`` pair; the list for an artifact that was
        not searched is empty.
    """
    turn_hits: list[TurnHit] = (
        search_turns_fts(
            conn, query, speaker=speaker, since=since, until=until, limit=limit
        )
        if artifact in ("any", "turns")
        else []
    )
    note_hits: list[NoteHit] = (
        search_notes_fts(conn, query, kind=note_kind, limit=limit)
        if artifact in ("any", "notes")
        else []
    )
    return turn_hits, note_hits