Open Source RAG Pipeline Tutorial¶
This tutorial demonstrates how to build a complete Retrieval-Augmented Generation (RAG) pipeline using open source components from the DLLMForge library. The pipeline includes document preprocessing, open source embeddings, vector storage, and RAG evaluation.
Overview¶
The RAG pipeline consists of several key components:
Document Loading: Load PDF documents and extract text
Text Chunking: Split documents into manageable chunks
Embedding Generation: Create vector embeddings using open source models
Vector Storage: Store embeddings in a FAISS vector database
Retrieval: Find relevant document chunks for queries
Generation: Generate answers using an open source LLM
Evaluation: Assess the quality of the RAG system
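Before diving into the library, the stages above can be sketched end-to-end in plain Python with toy stand-ins for the real components (the function names and the toy vowel-count "embedding" are illustrative only, not part of DLLMForge):

```python
def load(path):
    # Stand-in for a PDF loader: return raw text instead of parsing a PDF
    return "schemaGAN produces images of 512 x 32 pixels. " * 10

def chunk(text, size=100):
    # Fixed-size chunking without overlap
    return [text[i:i + size] for i in range(0, len(text), size)]

def embed(texts):
    # Toy embedding: vowel-frequency vectors (real models use transformers)
    return [[t.count(c) for c in "aeiou"] for t in texts]

def retrieve(query_vec, index, k=2):
    # Nearest neighbours by squared L2 distance over the toy vectors
    dists = [(sum((a - b) ** 2 for a, b in zip(query_vec, v)), i)
             for i, v in enumerate(index)]
    return [i for _, i in sorted(dists)[:k]]

text = load("documents/example.pdf")
chunks = chunk(text)
index = embed(chunks)
hits = retrieve(embed(["image size"])[0], index)
print("Retrieved chunk indices:", hits)
```

The real pipeline replaces each stand-in with the corresponding DLLMForge or LangChain component and adds the generation and evaluation stages on top.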
Step-by-Step Implementation¶
1. Import Required Modules¶
Start by importing all necessary components:
from dllmforge.rag_embedding_open_source import LangchainHFEmbeddingModel
from dllmforge.rag_evaluation import RAGEvaluator
from dllmforge.LLMs.Deltares_LLMs import DeltaresOllamaLLM
from dllmforge.rag_preprocess_documents import PDFLoader, TextChunker
from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore
from pathlib import Path
import faiss
2. Initialize the Embedding Model¶
Create an embedding model using open source HuggingFace transformers:
# Initialize the embedding model
# Default model: "sentence-transformers/all-MiniLM-L6-v2"
model = LangchainHFEmbeddingModel("intfloat/multilingual-e5-large")
The LangchainHFEmbeddingModel class supports any HuggingFace sentence transformer model and provides:
Automatic model downloading and caching
Batch embedding for efficient processing
Input validation for embeddings
We recommend starting with the default model and experimenting with others based on your use case. A ranked list of popular models is available on the MTEB leaderboard: https://huggingface.co/spaces/mteb/leaderboard. Note that models with more parameters generally perform better but require more resources: a large model may not fit into memory on smaller machines and can take a long time to download.
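Retrieval works by comparing these embedding vectors, typically with cosine similarity. The following self-contained sketch uses made-up three-dimensional vectors (real embeddings have hundreds of dimensions) to show the computation:

```python
import math

def cosine_similarity(a, b):
    # Dot product divided by the product of the Euclidean norms
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

v_query = [0.1, 0.9, 0.2]   # toy stand-in for an embedded query
v_close = [0.2, 0.8, 0.1]   # similar direction -> high similarity
v_far = [0.9, 0.1, 0.4]     # different direction -> low similarity

print(round(cosine_similarity(v_query, v_close), 3))
print(round(cosine_similarity(v_query, v_far), 3))
```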
3. Load and Process Documents¶
First, you will need to download some PDF documents to use as your knowledge base. In this example, we use the schemaGAN paper from ScienceDirect (https://www.sciencedirect.com/science/article/pii/S0266352X25001260). Download this PDF to a local directory and update the path below. In our case, we copy the PDF into a folder named “documents” in the root of the repository.
Two important parameters to consider when chunking documents are chunk_size and overlap_size.
chunk_size: The maximum size of each text chunk (in characters). Smaller chunks may improve retrieval performance but increase the number of chunks.
overlap_size: The number of overlapping characters between consecutive chunks. Overlapping chunks can help preserve context but may increase redundancy.
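To see how the two parameters interact, here is a minimal sliding-window chunker in plain Python (a simplified stand-in, not the actual TextChunker implementation):

```python
def sliding_window_chunks(text, chunk_size=1000, overlap_size=200):
    # Consecutive chunks start (chunk_size - overlap_size) characters
    # apart, so each pair of neighbours shares overlap_size characters.
    step = chunk_size - overlap_size
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
    return chunks

text = "".join(chr(65 + i % 26) for i in range(2500))
chunks = sliding_window_chunks(text, chunk_size=1000, overlap_size=200)
print(len(chunks))                          # 2500 chars -> 3 chunks
print(chunks[0][-200:] == chunks[1][:200])  # neighbours share 200 chars
```

Smaller chunk_size yields more (and more focused) chunks; larger overlap_size increases redundancy but reduces the chance of splitting an answer across a chunk boundary.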
Load PDF documents and create text chunks:
# Define the directory containing PDF documents
data_dir = Path(r'documents')
pdfs = list(data_dir.glob("*.pdf"))
# Initialize document loader and chunker
loader = PDFLoader()
chunker = TextChunker(chunk_size=1000, overlap_size=200)
global_embeddings = []
metadatas = []
# Process each PDF file
for pdf_path in pdfs:
    # Load the PDF document
    pages, file_name, metadata = loader.load(pdf_path)
    # Create chunks with overlap for better context preservation
    chunks = chunker.chunk_text(pages, file_name, metadata)
    # Generate embeddings for the chunks
    chunk_embeddings = model.embed(chunks)
    # Store embeddings and metadata
    global_embeddings.extend(chunk_embeddings)
    metadatas.extend([chunk["metadata"] for chunk in chunks])
    print(f"Embedded {len(chunk_embeddings)} chunks from {file_name}.")
print(f"Total embeddings generated: {len(global_embeddings)}")
After running this code, you should see output indicating the number of chunks embedded from each PDF and the total embeddings generated. In this example, you will see the following output:
Embedded 107 chunks from 1-s2.0-S0266352X25001260-main.pdf. Total embeddings generated: 107
4. Create Vector Store¶
Set up a FAISS vector store for efficient similarity search. This example uses a FAISS index, but other vector stores, such as MongoDB Atlas Vector Search or Weaviate, can be used as well.
# Get embedding dimension
embedding_dim = len(global_embeddings[0]["text_vector"])
# Create FAISS index for L2 (Euclidean) distance
index = faiss.IndexFlatL2(embedding_dim)
# Initialize vector store
vector_store = FAISS(
    embedding_function=model.embeddings,
    index=index,
    docstore=InMemoryDocstore(),
    index_to_docstore_id={},
)
# Add embeddings to the vector store
for chunk, meta in zip(global_embeddings, metadatas):
    vector_store.add_texts(
        texts=[chunk["chunk"]],
        metadatas=[meta],
        ids=[chunk["chunk_id"]],
        embeddings=[chunk["text_vector"]],
    )
Alternative Index Types:
IndexFlatL2: Exact L2 (Euclidean) distance search (slower but accurate)
IndexFlatIP: Inner product similarity
IndexIVFFlat: Faster approximate search for large datasets
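The choice of index affects ranking: L2 distance and inner product only agree when vectors are unit-normalized, because then the squared distance reduces to 2 − 2·⟨a, b⟩. A quick pure-Python check of that identity (no FAISS required):

```python
import math

def l2_sq(a, b):
    # Squared Euclidean (L2) distance
    return sum((x - y) ** 2 for x, y in zip(a, b))

def inner(a, b):
    # Inner (dot) product
    return sum(x * y for x, y in zip(a, b))

def normalize(v):
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]

a = normalize([0.3, 0.7])
b = normalize([0.6, 0.2])
# For unit vectors: ||a - b||^2 = 2 - 2 * <a, b>,
# so L2 and inner-product searches rank results identically.
assert abs(l2_sq(a, b) - (2 - 2 * inner(a, b))) < 1e-9
print("identity holds for unit vectors")
```

For unnormalized embeddings, pick the index type that matches the similarity the model was trained with.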
5. Test Retrieval¶
Test the vector store with a sample query:
# Query the vector store directly
results = vector_store.similarity_search_with_score(
    query="Size of images for schema GAN",
    k=5
)
print("Query result:", results)
# Each result is a (Document, similarity_score) pair
for doc, score in results:
    print(f"Score: {score}")
    print(f"Content: {doc.page_content[:200]}...")
    print(f"Metadata: {doc.metadata}")
    print("---")
6. Initialize the LLM¶
Set up the open source language model using Ollama, in this case the Qwen3 model:
# Initialize Ollama LLM
llm = DeltaresOllamaLLM(
    base_url="https://chat-api.directory.intra",
    model_name="qwen3:latest",  # Or another available model
    temperature=0.8
)
7. Create Retriever and Generate Answers¶
Set up the retriever and generate answers to questions:
# Create retriever with similarity threshold
retriever = vector_store.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={
        "score_threshold": 0.1,  # Minimum similarity score
        "k": 10,  # Maximum number of documents to retrieve
    },
)
# Generate answer using RAG
question = "Size of images produced by schemaGAN? give me the answer in axb format"
chat_result = llm.ask_with_retriever(question, retriever)
answer = chat_result.generations[0].message.content
print("Answer:", answer)
The answer should be relevant to the question based on the retrieved documents. If the answer is not satisfactory, consider refining the query or adjusting the retriever and/or embedding model.
8. Evaluate the RAG System¶
Use the built-in evaluation framework to assess RAG performance:
# Define test questions with ground truth answers
TEST_QUESTIONS = [{
    "question": "Size of images produced by schemaGAN?",
    "ground_truth": "The images produced by schemaGAN have a size of **512 × 32 pixels**."
}, {
    "question": "What is the network architecture based on?",
    "ground_truth": "the pix2pix method from Isola et al. (2017)"
}]
# Initialize evaluator
evaluator = RAGEvaluator(llm_provider="deltares", deltares_llm=llm)
results = []
for q_data in TEST_QUESTIONS:
    question = q_data["question"]
    ground_truth = q_data["ground_truth"]
    # Generate answer
    chat_result = llm.ask_with_retriever(question, retriever)
    answer = chat_result.generations[0].message.content
    answer = answer.split("</think>")[-1].strip()  # Strip the model's reasoning block
    # Get retrieved contexts
    retrieved_contexts = retriever.invoke(question)
    # Evaluate the RAG pipeline
    evaluation = evaluator.evaluate_rag_pipeline(
        question=question,
        generated_answer=answer,
        retrieved_contexts=retrieved_contexts,
        ground_truth_answer=ground_truth
    )
    # Store results
    result = {
        'question': question,
        'ground_truth': ground_truth,
        'response': answer,
        'context': retrieved_contexts,
        'evaluation': evaluation
    }
    results.append(result)
    # Print evaluation metrics
    print(f"Question: {question}")
    print(f"RAGAS Score: {evaluation.ragas_score:.3f}")
    print(f"Answer Relevancy: {evaluation.answer_relevancy.score:.3f}")
    print(f"Faithfulness: {evaluation.faithfulness.score:.3f}")
    print(f"Context Recall: {evaluation.context_recall.score:.3f}")
    print(f"Context Relevancy: {evaluation.context_relevancy.score:.3f}")
    print("---")
Evaluation Metrics Explained¶
The RAG evaluation provides four key metrics plus an overall RAGAS score:
- 1. Context Relevancy (0-1)
Measures how relevant the retrieved documents are to the question. Higher scores indicate better retrieval. If the score is low, it indicates a potential issue with the embedding model or vector store.
- 2. Context Recall (0-1)
Measures whether all necessary information was retrieved. Compares retrieved context with ground truth. Low scores suggest that important documents may be missing from the retrieval. There is possible room for improvement by adjusting chunk size, overlap, or using a different embedding model.
- 3. Faithfulness (0-1)
Measures factual accuracy and absence of hallucinations. Checks if the answer is grounded in the retrieved context. Low scores indicate that the LLM may be generating information not supported by the context. Adjust the LLM temperature or try a different model to improve faithfulness.
- 4. Answer Relevancy (0-1)
Measures how directly the answer addresses the question. Penalizes verbose or off-topic responses. Low scores suggest that the LLM may not be effectively using the retrieved context. Experiment with prompt engineering or different LLMs to enhance answer relevancy.
- RAGAS Score
Overall score combining all metrics, providing a single measure of RAG system quality.
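RAGAS-style overall scores are commonly computed as the harmonic mean of the individual metrics, which punishes a single weak metric far more than an arithmetic mean would (the exact aggregation used by RAGEvaluator may differ). A short sketch:

```python
def harmonic_mean(scores):
    # Harmonic mean: n / sum(1/s); a single near-zero metric
    # pulls the overall score toward zero.
    if any(s == 0 for s in scores):
        return 0.0
    return len(scores) / sum(1 / s for s in scores)

metrics = [0.9, 0.8, 0.85, 0.3]  # one weak metric (e.g. low context recall)
print(round(harmonic_mean(metrics), 3))       # harmonic mean
print(round(sum(metrics) / len(metrics), 3))  # arithmetic mean, for comparison
```

This is why a mediocre overall score usually points at one specific weak metric rather than uniformly poor performance.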
Advanced Configuration¶
Optimizing Chunk Size¶
Experiment with different chunk sizes based on your documents:
# For technical documents with detailed information
chunker = TextChunker(chunk_size=1500, overlap_size=300)
# For shorter, conversational content
chunker = TextChunker(chunk_size=500, overlap_size=100)
# For very long documents
chunker = TextChunker(chunk_size=2000, overlap_size=400)
Using Different Embedding Models¶
Try different embedding models for better performance:
# More capable but larger model
model = LangchainHFEmbeddingModel(
    model_name="sentence-transformers/all-mpnet-base-v2"
)
# Multilingual model
model = LangchainHFEmbeddingModel(
    model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
)
# Domain-specific model (if available)
model = LangchainHFEmbeddingModel(
    model_name="sentence-transformers/allenai-specter"
)
Improving Retrieval¶
Fine-tune retrieval parameters:
# For high precision (fewer but more relevant results)
retriever = vector_store.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={
        "score_threshold": 0.3,  # Higher threshold
        "k": 5,  # Fewer results
    },
)
# For high recall (more results, potentially less relevant)
retriever = vector_store.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 20,  # More results
    },
)
# For diverse results using Maximum Marginal Relevance
retriever = vector_store.as_retriever(
    search_type="mmr",
    search_kwargs={
        "k": 10,
        "lambda_mult": 0.7,  # Balance between relevance and diversity
    },
)
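Maximum Marginal Relevance greedily picks the document maximizing λ·sim(query, d) − (1 − λ)·max sim(d, selected), trading relevance against redundancy. A self-contained sketch of the idea with toy 2-D vectors (illustrative, not the LangChain implementation):

```python
import math

def cos(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def mmr(query, docs, k=2, lambda_mult=0.7):
    # docs: mapping doc_id -> vector. Greedily select k documents.
    selected = []
    candidates = dict(docs)
    while candidates and len(selected) < k:
        def score(item):
            doc_id, vec = item
            relevance = cos(query, vec)
            redundancy = max((cos(vec, docs[s]) for s in selected), default=0.0)
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy
        best_id = max(candidates.items(), key=score)[0]
        selected.append(best_id)
        del candidates[best_id]
    return selected

query = [1.0, 0.0]
docs = {
    "d1": [0.9, 0.1],    # most relevant
    "d2": [0.89, 0.11],  # near-duplicate of d1
    "d3": [0.8, -0.2],   # less relevant but adds diversity
}
# A plain top-2 by relevance would return ["d1", "d2"];
# MMR swaps the near-duplicate for the diverse document.
print(mmr(query, docs, k=2, lambda_mult=0.5))  # ['d1', 'd3']
```

Lower lambda_mult values favour diversity; lambda_mult=1.0 reduces to plain similarity search.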
Docling Automatic chunker¶
If the documents are not very well structured, you can use the automatic chunker from Docling. This chunker uses the document's parsed layout and a tokenizer to create chunks based on the content and structure of the document. Docling chunks are typically more semantically meaningful than fixed-size chunks and can improve retrieval performance.
The following code snippet shows how to use the docling chunker. You should replace the previous chunking code with this code.
from langchain_docling import DoclingLoader
from docling.chunking import HybridChunker
from langchain_docling.loader import ExportType
# find all PDF files in the directory
pdfs = list(data_dir.glob("*.pdf"))
global_chunks = []
for pdf_path in pdfs:
    loader = DoclingLoader(
        file_path=pdf_path,
        export_type=ExportType.DOC_CHUNKS,
        chunker=HybridChunker(
            tokenizer=model.embeddings.model_name,
            max_tokens=512,  # Max sequence length of the embedding model
        ),
    )
    docs = loader.load()
    global_chunks.extend(docs)
print(f"Total chunks generated: {len(global_chunks)}")
# Now create the FAISS vector store
index = faiss.IndexFlatL2(len(model.embeddings.embed_query("test")))
vector_store = FAISS(
    embedding_function=model.embeddings,
    index=index,
    docstore=InMemoryDocstore({}),
    index_to_docstore_id={},
)
vector_store.add_documents(global_chunks)
# Query the vector store directly to check retrieval
results = vector_store.similarity_search_with_score(query="Size of images for schema GAN in pixels", k=5)
print("Query result:", results)