feat: initial implementation of data ingestion and streaming API

2025-09-20 14:26:19 +02:00
commit 5f3c31ec3f
4 changed files with 180 additions and 0 deletions

src/llm/.gitignore

@@ -0,0 +1,4 @@
dune_db/
books/
venv/
__pycache__/

src/llm/ingest.py

@@ -0,0 +1,82 @@
import os
import re
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings

# --- CONFIGURATION ---
EMBEDDING_MODEL_NAME = "nomic-ai/nomic-embed-text-v1.5"
DATA_PATH = "books"
DB_PATH = "dune_db"
CHUNK_SIZE = 2048
CHUNK_OVERLAP = 256


def main():
    """
    Main function to run the data ingestion process.
    """
    create_vector_store()


def create_vector_store():
    """
    Creates a Chroma vector store from documents in the DATA_PATH directory.
    """
    print("Loading and processing documents...")
    book_files = sorted([f for f in os.listdir(DATA_PATH) if f.endswith(".txt")])

    all_splits = []
    for book_file in book_files:
        try:
            order_str, title_ext = book_file.split('_', 1)
            book_order = int(order_str)
            book_title = os.path.splitext(title_ext)[0].replace('_', ' ')
        except ValueError:
            print(f"Skipping file with unexpected format: {book_file}")
            continue

        print(f"  - Processing Book {book_order}: {book_title}")
        file_path = os.path.join(DATA_PATH, book_file)
        loader = TextLoader(file_path, encoding="utf-8")
        documents = loader.load()

        for doc in documents:
            # Collapse runs of blank lines and trim surrounding whitespace.
            doc.page_content = re.sub(r'\n{3,}', '\n\n', doc.page_content)
            doc.page_content = doc.page_content.strip()
            doc.metadata = {
                "source": book_file,
                "book_title": book_title,
                "book_order": book_order
            }

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
        splits = text_splitter.split_documents(documents)
        all_splits.extend(splits)

    print(f"Created {len(all_splits)} text chunks.")

    print(f"Initializing embedding model: {EMBEDDING_MODEL_NAME}")
    embedding_model = HuggingFaceEmbeddings(
        model_name=EMBEDDING_MODEL_NAME,
        model_kwargs={'trust_remote_code': True}  # nomic-embed requires trust_remote_code
    )

    print(f"Creating vector store and embedding {len(all_splits)} chunks...")
    vector_store = Chroma.from_documents(
        documents=all_splits,
        embedding=embedding_model,
        persist_directory=DB_PATH
    )

    print("--------------------------------------------------")
    print("Ingestion complete!")
    print(f"Vector store created at: {DB_PATH}")
    print("--------------------------------------------------")


if __name__ == "__main__":
    main()
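The split('_', 1) logic above implies a naming convention for the source texts: every file in books/ is expected to look like <order>_<Title_With_Underscores>.txt, for example 1_Dune.txt, and anything else is skipped. A minimal pre-flight check for that convention, as a sketch (the check_books.py name and the example output are illustrative, not part of this commit):

# check_books.py (hypothetical): verify books/ matches the naming convention ingest.py expects
import os

DATA_PATH = "books"

for name in sorted(os.listdir(DATA_PATH)):
    if not name.endswith(".txt"):
        continue
    try:
        order_str, _ = name.split("_", 1)
        int(order_str)  # the prefix must parse as an integer book order
        print(f"OK:      {name}")
    except ValueError:
        print(f"SKIPPED: {name} (expected <order>_<title>.txt)")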

src/llm/main.py

@@ -0,0 +1,74 @@
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import requests
import json
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings

# --- Configuration ---
DB_PATH = "dune_db"
# Must match the embedding model used in ingest.py, otherwise the query vectors
# will not line up with the dimensions of the persisted Chroma collection.
EMBEDDING_MODEL_NAME = "nomic-ai/nomic-embed-text-v1.5"
OLLAMA_API_URL = "http://localhost:11434/api/generate"
OLLAMA_MODEL = "llama3:8b"

PROMPT_TEMPLATE = """
You are a helpful AI assistant and an expert on the Dune book series.
Use the following pieces of context from the books to answer the user's question.
If you don't know the answer from the context provided, just say that you don't know; don't try to make up an answer.

Context:
{context}

Question:
{question}

Answer:
"""


# --- Pydantic models ---
class AskRequest(BaseModel):
    question: str


# --- Initialize FastAPI and load resources ---
app = FastAPI()
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL_NAME, model_kwargs={'trust_remote_code': True})
vector_store = Chroma(persist_directory=DB_PATH, embedding_function=embeddings)
retriever = vector_store.as_retriever(search_kwargs={"k": 5})


# --- Streaming endpoint ---
@app.post("/ask-stream")
async def ask_question_stream(request: AskRequest):
    print(f"🔍 Streaming request for: {request.question}")

    # 1. Retrieve context (this part is still blocking).
    retrieved_docs = retriever.invoke(request.question)
    context = "\n\n---\n\n".join([doc.page_content for doc in retrieved_docs])
    prompt = PROMPT_TEMPLATE.format(context=context, question=request.question)

    # 2. Define the generator for the streaming response.
    async def stream_generator():
        try:
            ollama_payload = {
                "model": OLLAMA_MODEL,
                "prompt": prompt,
                "stream": True  # ask Ollama to stream tokens as they are generated
            }
            # stream=True makes requests yield the response body incrementally.
            with requests.post(OLLAMA_API_URL, json=ollama_payload, stream=True) as response:
                response.raise_for_status()
                # Ollama streams newline-delimited JSON objects.
                for line in response.iter_lines():
                    if line:
                        chunk = json.loads(line)
                        # Yield only the text part of the token.
                        yield chunk.get("response", "")
        except requests.RequestException as e:
            print(f"❌ Error communicating with Ollama: {e}")
            yield "Error: Could not connect to the language model."
        except Exception as e:
            print(f"❌ An unexpected error occurred: {e}")
            yield "Error: An unexpected error occurred while generating the answer."

    # 3. Return the generator wrapped in a StreamingResponse.
    return StreamingResponse(stream_generator(), media_type="text/plain")
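Because /ask-stream returns plain-text chunks as they are generated, a client has to read the response body incrementally instead of waiting for a complete JSON payload. A minimal client sketch, assuming the app is served locally with uvicorn main:app on port 8000 (the port and the question are illustrative):

# stream_client.py (hypothetical): consume the /ask-stream endpoint chunk by chunk
import requests

resp = requests.post(
    "http://localhost:8000/ask-stream",
    json={"question": "Who are the Bene Gesserit?"},
    stream=True,
)
resp.raise_for_status()

# Print each text chunk as soon as the server sends it.
for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="", flush=True)
print()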

src/llm/requirements.txt

@@ -0,0 +1,20 @@
# --- Core AI Framework ---
langchain
langchain-community
langchain-text-splitters

# --- API & Server ---
fastapi
uvicorn[standard]

# --- Vector Database ---
chromadb

# --- Embedding Model Dependencies ---
sentence-transformers
nomic

# --- Deep Learning Backend for Embeddings ---
torch
einops