# Source code for dllmforge.rag_embedding
"""
This module provides embedding functionality for RAG (Retrieval-Augmented Generation) pipelines.
It can be used to 1) vectorize document chunks, and 2) vectorize user queries.
The module uses Azure OpenAI embeddings model as an example of using hosted embedding APIs. Note you need
Azure OpenAI service and a deployed embedding model on Azure to use this module.
"""
import base64
import os
import re
from typing import Any, Dict, List, Optional, Union

from dotenv import load_dotenv
try:
from langchain_openai import AzureOpenAIEmbeddings
LANGCHAIN_OPENAI_AVAILABLE = True
except ImportError:
LANGCHAIN_OPENAI_AVAILABLE = False
AzureOpenAIEmbeddings = None
[docs]
class AzureOpenAIEmbeddingModel:
    """Embed queries and document chunks using Azure OpenAI Embeddings.

    Wraps ``langchain_openai.AzureOpenAIEmbeddings`` and can 1) vectorize a
    single user query string, and 2) vectorize a list of document chunks for
    indexing (e.g. in Azure Cognitive Search).
    """

    def __init__(self,
                 model: str = "text-embedding-3-large",
                 api_base: Optional[str] = None,
                 deployment_name_embeddings: Optional[str] = None,
                 api_key: Optional[str] = None,
                 api_version: Optional[str] = None):
        """
        Initialize the embedding model from arguments or environment variables.

        Explicit arguments take precedence; any value left as ``None`` falls
        back to the corresponding ``AZURE_OPENAI_*`` environment variable
        (``.env`` files are honored via ``load_dotenv``).

        Args:
            model: Name of the embedding model to use.
            api_base: Azure OpenAI API base URL (``AZURE_OPENAI_API_BASE``).
            deployment_name_embeddings: Azure OpenAI deployment name for
                embeddings (``AZURE_OPENAI_DEPLOYMENT_EMBEDDINGS``).
            api_key: Azure OpenAI API key (``AZURE_OPENAI_API_KEY``).
            api_version: Azure OpenAI API version (``AZURE_OPENAI_API_VERSION``).

        Raises:
            ImportError: If the optional ``langchain_openai`` dependency is
                not installed.
        """
        if not LANGCHAIN_OPENAI_AVAILABLE:
            raise ImportError("langchain_openai is required for AzureOpenAIEmbeddingModel. "
                              "Install it with: pip install dllmforge[api]")
        load_dotenv()
        api_base = api_base or os.getenv('AZURE_OPENAI_API_BASE')
        deployment_name_embeddings = (deployment_name_embeddings
                                      or os.getenv('AZURE_OPENAI_DEPLOYMENT_EMBEDDINGS'))
        api_key = api_key or os.getenv('AZURE_OPENAI_API_KEY')
        api_version = api_version or os.getenv('AZURE_OPENAI_API_VERSION')
        self.embeddings = AzureOpenAIEmbeddings(model=model,
                                                azure_endpoint=api_base,
                                                azure_deployment=deployment_name_embeddings,
                                                api_key=api_key,
                                                openai_api_version=api_version)

    @staticmethod
    def validate_embedding(embedding: List[float]) -> bool:
        """Return True if *embedding* is a non-empty sequence of numbers."""
        if not embedding:
            return False
        # Reject vectors containing non-numeric entries (bools pass as ints).
        return all(isinstance(x, (int, float)) for x in embedding)

    @staticmethod
    def encode_filename(filename: str) -> str:
        """Encode a filename to be safe for Azure Cognitive Search document keys.

        The extension is dropped, non-alphanumeric characters (other than
        ``_`` and ``-``) are replaced with underscores, and the result is
        URL-safe base64 encoded with padding stripped.

        Args:
            filename: Original file name, possibly with an extension.

        Returns:
            A URL-safe, padding-free base64 string usable as a document key.
        """
        name_without_ext = os.path.splitext(filename)[0]
        safe_name = re.sub(r'[^a-zA-Z0-9_-]', '_', name_without_ext)
        encoded = base64.urlsafe_b64encode(safe_name.encode()).decode()
        # Azure Search keys must not contain '=' padding.
        return encoded.rstrip('=')

    def embed(self, query_or_chunks: Union[str, List[Dict[str, Any]]]) -> Union[List[float], List[Dict[str, Any]]]:
        """
        Embed a single query string or a list of document chunks.

        Args:
            query_or_chunks: A string (query) or a list of dictionaries
                (document chunks). Each dictionary must have the keys
                "text", "file_name", "page_number", and "chunk_index"
                (used to build a unique "chunk_id").

        Returns:
            For a string query: list of floats (embedding vector).
            For document chunks: list of dictionaries with keys "chunk_id",
            "chunk", "page_number", "file_name", "text_vector".

        Raises:
            ValueError: If the input is neither a string nor a list of
                dictionaries, or if an embedding fails validation.
        """
        if isinstance(query_or_chunks, str):
            vectorized_query = self.embeddings.embed_query(query_or_chunks)
            if not self.validate_embedding(vectorized_query):
                raise ValueError("Invalid embedding generated for query.")
            return vectorized_query
        elif isinstance(query_or_chunks, list) and all(isinstance(t, dict) for t in query_or_chunks):
            vectorized_chunks = []
            for chunk in query_or_chunks:
                embedding = self.embeddings.embed_query(chunk["text"])
                if not self.validate_embedding(embedding):
                    raise ValueError(f"Invalid embedding generated for chunk: {chunk}")
                # Encode file_name to be safe for Azure Cognitive Search keys.
                safe_filename = self.encode_filename(chunk["file_name"])
                vectorized_chunks.append({
                    "chunk_id": f"{safe_filename}_i{chunk['chunk_index']}",
                    "chunk": chunk["text"],
                    "page_number": chunk["page_number"],
                    "file_name": chunk["file_name"],
                    "text_vector": embedding
                })
            return vectorized_chunks
        else:
            raise ValueError("Input must be a string or a list of dictionaries.")
if __name__ == "__main__":
    # Example usage; requires Azure OpenAI credentials in the environment
    # (or a .env file) and the optional langchain_openai dependency.
    model = AzureOpenAIEmbeddingModel()

    # Example 1: embed a single query string.
    query = "What is the capital of France?"
    query_embedding = model.embed(query)
    print("Query embedding (first 5 values):", query_embedding[:5])

    # Example 2: embed document chunks produced by the preprocessing module.
    # Import the names actually used instead of a wildcard import.
    from pathlib import Path

    from rag_preprocess_documents import PDFLoader, TextChunker

    data_dir = Path(r'c:\Users\deng_jg\work\16centralized_agents\test_data')
    pdf_path = data_dir / "lstm_low_flow.pdf"

    # Load the PDF document and chunk it with custom settings.
    loader = PDFLoader()
    pages, file_name = loader.load(pdf_path)
    chunker = TextChunker(chunk_size=1000, overlap_size=200)
    chunks = chunker.chunk_text(pages, file_name)

    # Embed the document chunks and preview the first four results.
    chunk_embeddings = model.embed(chunks)
    print(f"Generated {len(chunk_embeddings)} embeddings for document chunks.")
    for i, emb in enumerate(chunk_embeddings[:4]):
        print(f"Chunk {i+1} - File: {emb['file_name']}, Page: {emb['page_number']}")
        print(f"  Text preview: {emb['chunk'][:100]}...")
        print(f"  Embedding (first 5 values): {emb['text_vector'][:5]}")
        print()