
# TODO: add an async version of this module
"""
Synchronous Information Extractor module for extracting structured information from documents using an LLM with Docling preprocessing.
"""
import os
import json
from typing import List, Dict, Any, Optional, Union, Generator
from pathlib import Path
from pydantic import BaseModel
from langchain.output_parsers import PydanticOutputParser
from langchain.output_parsers.json import parse_json_markdown
from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from dllmforge.langchain_api import LangchainAPI
from dllmforge.IE_agent_config import IEAgentConfig, ExtractorConfig
import base64

# Docling imports
try:
    from docling.document_converter import DocumentConverter
    DOCLING_AVAILABLE = True
except Exception:
    # If docling or its submodules are not importable in this environment,
    # provide a lightweight fallback DocumentConverter that is callable so
    # tests and code paths that only instantiate the converter won't crash.
    DOCLING_AVAILABLE = False

    class DocumentConverter:
        """Fallback stub used when the real `docling` package isn't available.

        The stub is intentionally minimal: it can be instantiated safely, but its
        `convert` method raises a RuntimeError. Test suites can still patch
        `DocumentConverter` where they need to simulate conversions.
        """

        def __init__(self, *args, **kwargs):
            return None

        def convert(self, *args, **kwargs):
            raise RuntimeError("docling is not available in this environment")
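
# Minimal sketch of how a test might exercise this fallback path: because the module
# looks up `DocumentConverter` in its own namespace, a test can patch it to simulate a
# conversion without the real docling package (module path assumed from this file):
#
#     from unittest.mock import MagicMock, patch
#
#     with patch("dllmforge.IE_agent_extractor_docling.DocumentConverter") as MockConverter:
#         MockConverter.return_value.convert.return_value = MagicMock()
#         processor = DoclingDocumentProcessor(config)  # uses the patched converter
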
class DoclingProcessedDocument:
    """Class representing a document processed by Docling"""

    def __init__(self,
                 content: Union[str, bytes],
                 content_type: str,
                 metadata: Optional[Dict[str, Any]] = None,
                 docling_result=None):
        self.content = content
        self.content_type = content_type
        self.metadata = metadata or {}
        self.docling_result = docling_result  # Store the full Docling result for advanced features

class DocumentChunk:
    """Class representing a chunk of document content"""

    def __init__(self,
                 content: Union[str, bytes],
                 content_type: str,
                 metadata: Optional[Dict[str, Any]] = None,
                 docling_elements: Optional[List] = None):
        self.content = content
        self.content_type = content_type
        self.metadata = metadata or {}
        self.docling_elements = docling_elements or []  # Store Docling elements for structure awareness

class DoclingDocumentProcessor:
    """Document processor using Docling for advanced PDF processing"""

    def __init__(self, config):
        self.config = config
        # Use the simple DocumentConverter without problematic pipeline options.
        # The default configuration should work fine for most use cases.
        self.converter = DocumentConverter()

    def encode_image_base64(self, image_data: bytes) -> str:
        """Encode image data to a base64 string"""
        return base64.b64encode(image_data).decode('utf-8')

    def process_document(self, file_path: Path) -> Optional[DoclingProcessedDocument]:
        """Process a single document using Docling"""
        try:
            # Convert document using Docling
            result = self.converter.convert(str(file_path))

            # Create metadata from the Docling result
            metadata = {
                'source_file': str(file_path),
                'file_name': file_path.name,
                'file_type': file_path.suffix.lower(),
                'num_pages': len(result.document.pages) if hasattr(result.document, 'pages') else 1,
                'num_tables': len(result.document.tables),
                'num_figures': len(result.document.pictures),
                'processing_method': 'docling'
            }

            # Extract structured text content
            content_parts = []
            for text_value in result.document.texts:
                sub_metadata = metadata.copy()
                sub_metadata['page_no'] = text_value.prov[0].page_no
                doc_chunk = DocumentChunk(content=text_value.text, content_type="text", metadata=sub_metadata)
                content_parts.append(doc_chunk)

            # Extract tables, if any, as markdown chunks
            for table in result.document.tables:
                table_md = table.export_to_markdown()
                sub_metadata = metadata.copy()
                sub_metadata['page_no'] = table.prov[0].page_no
                content_parts.append(
                    DocumentChunk(content=f"\n\n## Table\n{table_md}", content_type="text", metadata=sub_metadata))

            # Combine all content
            full_content = content_parts

            # Add document metadata if available
            if hasattr(result.document, 'meta') and result.document.meta:
                metadata.update({
                    'title': getattr(result.document.meta, 'title', ''),
                    'author': getattr(result.document.meta, 'author', ''),
                    'subject': getattr(result.document.meta, 'subject', ''),
                })

            return DoclingProcessedDocument(content=full_content,
                                            content_type='chunks',
                                            metadata=metadata,
                                            docling_result=result)
        except Exception as e:
            print(f"Error processing document {file_path}: {e}")
            return None

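    # Return shape (informal note): the resulting DoclingProcessedDocument has
    # content_type='chunks' and its `content` is a list of DocumentChunk objects,
    # one per Docling text element or table, each carrying the source page number in
    # metadata['page_no']. DoclingInfoExtractor.process_document below relies on this.
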
    def process_directory(self) -> List[DoclingProcessedDocument]:
        """Process all documents in the configured directory"""
        input_dir = Path(self.config.document.input_dir)
        if not input_dir.exists():
            print(f"Input directory does not exist: {input_dir}")
            return []

        pattern = self.config.document.file_pattern
        files = list(input_dir.glob(pattern))
        if not files:
            print(f"No files found matching pattern {pattern} in {input_dir}")
            return []

        processed_docs = []
        for file_path in files:
            print(f"Processing: {file_path}")
            doc = self.process_document(file_path)
            if doc:
                processed_docs.append(doc)
        return processed_docs


class DoclingInfoExtractor:
    """Class for extracting information from documents using an LLM with Docling preprocessing"""

    def __init__(self, config: IEAgentConfig, output_schema: type[BaseModel], llm_api: Optional[LangchainAPI] = None):
        """Initialize the information extractor"""
        self.config = config
        self.output_schema = output_schema
        self.llm_api = llm_api or LangchainAPI()
        self.output_parser = PydanticOutputParser(pydantic_object=output_schema)
        self.doc_processor = DoclingDocumentProcessor(config)
        self.system_prompt = self.refine_system_prompt(config.schema.task_description)

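    # Minimal usage sketch (the schema here is a hypothetical hand-written one; the
    # __main__ block at the bottom of this file generates a schema with SchemaGenerator
    # instead, and `config`/`llm` are assumed to be set up as in that example):
    #
    #     class PaperInfo(BaseModel):
    #         title: str
    #         authors: List[str]
    #
    #     extractor = DoclingInfoExtractor(config=config, output_schema=PaperInfo, llm_api=llm)
    #     doc = extractor.doc_processor.process_document(Path("paper.pdf"))
    #     results = extractor.process_document(doc) if doc else []
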
    def refine_system_prompt(self, task_description: str) -> str:
        """Use LLM to refine user's task description into a proper system prompt"""
        system_template = """You are an expert at creating clear and effective system prompts for LLMs.
Your task is to refine a user's task description into a well-structured system prompt.

Guidelines for prompt refinement:
1. Maintain the core objective of the task
2. Add clear instructions and constraints
3. Include relevant context and examples if needed
4. Structure the prompt in a logical order
5. Use clear, unambiguous language
6. Consider that the input will be processed by Docling for better structure recognition
"""

        human_template = """Please refine this task description into a proper system prompt:

{task_description}

Create a well-structured system prompt that will guide the LLM in extracting information
according to the task requirements. The input will be preprocessed by Docling, which means
tables, figures, and document structure will be well-preserved in markdown format.
Be thorough but concise.
"""

        prompt = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template(system_template),
            HumanMessagePromptTemplate.from_template(human_template)
        ])
        messages = prompt.format_messages(task_description=task_description)
        response = self.llm_api.chat_completion(messages)
        refined_prompt = response["response"] if response else task_description
        print(f"Refined system prompt: {refined_prompt}")
        return refined_prompt

    def chunk_document(self, doc: DoclingProcessedDocument) -> Generator[DocumentChunk, None, None]:
        """Split document into chunks based on Docling structure if needed"""
        content = doc.content
        chunk_size = self.config.extractor.chunk_size
        overlap = self.config.extractor.chunk_overlap

        # For Docling-processed documents, we can be smarter about chunking
        # by respecting document structure (sections, tables, etc.)
        if len(content) <= chunk_size:
            # Document is small enough, return as a single chunk
            yield DocumentChunk(
                content=content,
                content_type='text',
                metadata=doc.metadata,
                docling_elements=[]  # Could extract specific elements here
            )
            return

        # Smart chunking based on markdown sections
        lines = content.split('\n')
        current_chunk = []
        current_size = 0

        for line in lines:
            line_size = len(line) + 1  # +1 for newline

            # If adding this line would exceed the chunk size and we already have content
            if current_size + line_size > chunk_size and current_chunk:
                # Yield the current chunk
                chunk_content = '\n'.join(current_chunk)
                yield DocumentChunk(content=chunk_content,
                                    content_type='text',
                                    metadata={
                                        **doc.metadata,
                                        'chunk_size': len(chunk_content),
                                        'chunk_type': 'docling_smart'
                                    })

                # Start a new chunk with overlap
                overlap_lines = current_chunk[-overlap // 50:] if overlap > 0 else []
                current_chunk = overlap_lines + [line]
                current_size = sum(len(l) + 1 for l in current_chunk)
            else:
                current_chunk.append(line)
                current_size += line_size

        # Yield the final chunk if any content remains
        if current_chunk:
            chunk_content = '\n'.join(current_chunk)
            yield DocumentChunk(content=chunk_content,
                                content_type='text',
                                metadata={
                                    **doc.metadata,
                                    'chunk_size': len(chunk_content),
                                    'chunk_type': 'docling_smart'
                                })

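    # Overlap heuristic (informal note): the character overlap is approximated as whole
    # lines, assuming roughly 50 characters per line. For example, if chunk_overlap were
    # 200, about the last 4 lines of a chunk would be repeated at the start of the next one.
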
    def create_text_extraction_prompt(self) -> ChatPromptTemplate:
        """Create prompt template for text-based information extraction with Docling awareness"""
        system_message_prompt = SystemMessagePromptTemplate.from_template(self.system_prompt + """

Additional context: The input text has been processed by Docling, which means:
- Tables are properly formatted in markdown
- Document structure is preserved
- Figures and captions are identified
- Text quality is enhanced through advanced PDF processing

Pay special attention to structured elements like tables and figures when extracting information.
""")

        human_template = """Please extract the required information from the following Docling-processed text:

{content}

Extract the information according to this schema:
{format_instructions}

The text has been processed by Docling for better structure recognition. Pay attention to
tables (marked with ## Table), figures (marked with ## Figure), and other structured elements
in the markdown format.

Return the extracted information in the specified JSON format.
"""
        human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
        return ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])

    def process_text_chunk(self, chunk: DocumentChunk) -> Optional[Dict[str, Any]]:
        """Process a text document chunk with Docling enhancements"""
        try:
            prompt = self.create_text_extraction_prompt()
            messages = prompt.format_messages(
                content=chunk.content,
                format_instructions="/no_think " + self.output_parser.get_format_instructions())
            response = self.llm_api.chat_completion(messages)
            if not response:
                return None

            parsed_json = parse_json_markdown(response["response"])

            # Validate against schema
            validated_response = self.output_schema(**parsed_json)
            return validated_response
        except Exception as e:
            print(f"Error processing text chunk: {e}")
            return None

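    # Parsing note (informal): `parse_json_markdown` accepts both raw JSON and JSON wrapped
    # in a fenced ```json block, so either style of model reply can be validated against the
    # schema, e.g. (the "learning_rate" field is purely illustrative):
    #
    #     '{"learning_rate": 0.001}'
    #     '```json\n{"learning_rate": 0.001}\n```'
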
    def create_multimodal_extraction_prompt(self) -> ChatPromptTemplate:
        """Create prompt template for multimodal extraction with Docling structure"""
        system_message_prompt = SystemMessagePromptTemplate.from_template(self.system_prompt + """

Additional context: You are analyzing a document that has been processed by Docling,
which provides enhanced structure recognition. The content includes both text and visual
elements that have been identified and structured.
""")

        human_template = """Please extract the required information from the provided document content.

The document has been processed with Docling for enhanced structure recognition.

Extract the information according to this schema:
{format_instructions}

Return the extracted information in the specified JSON format.
"""
        human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
        return ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])

    def process_multimodal_chunk(self, chunk: DocumentChunk, doc: DoclingProcessedDocument) -> Optional[Dict[str, Any]]:
        """Process chunk with access to original Docling result for multimodal content"""
        try:
            prompt = self.create_multimodal_extraction_prompt()

            # Prepare content - combine text with image references
            content_parts = [{"type": "text", "text": chunk.content}]

            # If we have the Docling result, we can extract images
            if doc.docling_result and hasattr(doc.docling_result.document, 'pictures'):
                for i, picture in enumerate(doc.docling_result.document.pictures):
                    # Try to get image data if available
                    if hasattr(picture, 'image') and picture.image:
                        try:
                            # Convert image to base64
                            image_data = picture.image
                            if isinstance(image_data, bytes):
                                image_b64 = self.doc_processor.encode_image_base64(image_data)
                                content_parts.append({
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:image/jpeg;base64,{image_b64}"
                                    }
                                })
                        except Exception as e:
                            print(f"Could not process image {i}: {e}")

            messages = prompt.format_messages(format_instructions=self.output_parser.get_format_instructions())

            # Update the human message with multimodal content
            if len(content_parts) > 1:
                messages[1].content = content_parts

            response = self.llm_api.chat_completion(messages)
            if not response:
                return None

            parsed_json = parse_json_markdown(response["response"])

            # Validate against schema
            validated_response = self.output_schema(**parsed_json)
            return validated_response
        except Exception as e:
            print(f"Error processing multimodal chunk: {e}")
            # Fallback to text-only processing
            return self.process_text_chunk(chunk)

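    # Message format note (informal): the `content_parts` list above follows the OpenAI-style
    # multimodal message layout that LangChain chat models generally accept, i.e. a list of
    # {"type": "text", ...} and {"type": "image_url", "image_url": {"url": ...}} entries.
    # Whether the images are actually used depends on the underlying LangchainAPI model
    # supporting vision (see process_chunk below).
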
    def process_chunk(self, chunk: DocumentChunk, doc: DoclingProcessedDocument) -> Optional[Dict[str, Any]]:
        """Process a document chunk with Docling context"""
        # Check if we can do multimodal processing
        if (hasattr(self.llm_api, 'supports_vision') and self.llm_api.supports_vision()
                and doc.docling_result and hasattr(doc.docling_result.document, 'pictures')
                and doc.docling_result.document.pictures):
            return self.process_multimodal_chunk(chunk, doc)
        else:
            return self.process_text_chunk(chunk)

    def process_document(self, doc: Union[DoclingProcessedDocument, List[DoclingProcessedDocument]]) -> List[Dict[str, Any]]:
        """Process document and extract information"""
        # Handle both single documents and lists
        docs = [doc] if isinstance(doc, DoclingProcessedDocument) else doc

        # Process each document
        all_results = []
        for d in docs:
            if d.content_type != 'chunks':
                # Create chunks for the document
                chunks = list(self.chunk_document(d))
            else:
                chunks = d.content

            # Process chunks with document context
            doc_results = []
            for chunk in chunks:
                result = self.process_chunk(chunk, d)
                if result is not None:
                    doc_results.append(result)

            all_results.extend(doc_results)

        return all_results

    def save_results(self, results: List[Any], output_path: Path) -> None:
        """Save extraction results to JSON file"""
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Convert Pydantic models to dictionaries
        json_results = []
        for result in results:
            if hasattr(result, 'model_dump'):  # Pydantic v2
                json_results.append(result.model_dump())
            elif hasattr(result, 'dict'):  # Pydantic v1
                json_results.append(result.dict())
            else:
                json_results.append(result)  # Already a dict or other JSON-serializable object

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(json_results, f, indent=2, ensure_ascii=False)
        print(f"Results saved to {output_path}")

    def process_all(self) -> None:
        """Process all documents in configured directory"""
        # Process documents using Docling
        processed_docs = self.doc_processor.process_directory()
        if not processed_docs:
            print("No documents to process")
            return

        # Process each document
        for doc in processed_docs:
            try:
                results = self.process_document(doc)

                # Generate output path from source file name
                source_file = Path(doc.metadata['source_file']).stem
                output_path = Path(self.config.document.output_dir) / f"{source_file}_extracted_docling.json"
                self.save_results(results, output_path)
            except Exception as e:
                print(f"Error processing document: {e}")
                continue


if __name__ == "__main__":
    import importlib.util
    import re

    from dllmforge.IE_agent_config import DocumentConfig, SchemaConfig
    from dllmforge.IE_agent_schema_generator import SchemaGenerator
    from dllmforge.LLMs.Deltares_LLMs import DeltaresOllamaLLM

    # Setup paths
    current_dir = Path(__file__).parent
    schema_dir = current_dir / "generated_schemas"
    schema_dir.mkdir(exist_ok=True)
    schema_file = schema_dir / "model_hyperparameters_docling.py"

    # Create schema configuration for model hyperparameters
    schema_config = SchemaConfig(
        task_description="""/no_think Generate a Pydantic schema class named ModelHyperparameters to extract
machine learning model hyperparameters from research papers and documentation. The schema should capture:
model architecture details (type, layers, neurons, etc.), training parameters (learning rate, batch size, epochs),
optimization settings (optimizer, loss function), regularization techniques (dropout, etc.).
Note: Input will be processed by Docling for enhanced structure recognition including tables and figures.""",
        output_path=str(schema_file))

    # Generate and save the schema
    llm = DeltaresOllamaLLM(base_url="https://chat-api.directory.intra", model_name="qwen3:latest", temperature=0.8)
    schema_generator = SchemaGenerator(schema_config, llm_api=llm)
    schema_code = schema_generator.generate_schema()

    # Find all class names in the generated code
    class_matches = re.finditer(r"class\s+(\w+)\s*\(", schema_code)
    class_names = [match.group(1) for match in class_matches]
    if not class_names:
        raise ValueError("Could not find any class names in generated schema")

    # Get the last class as it's typically the main schema
    schema_class_name = class_names[-1]
    print(f"\nFound schema classes: {', '.join(class_names)}")
    print(f"Using main schema class: {schema_class_name}")

    # Save the schema
    schema_generator.save_schema(schema_code)

    # Import the generated schema module
    spec = importlib.util.spec_from_file_location("model_hyperparameters_docling", schema_file)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    # Get the schema class dynamically
    if not hasattr(module, schema_class_name):
        raise ValueError(f"Generated schema does not contain class {schema_class_name}")
    SchemaClass = getattr(module, schema_class_name)

    # Create configuration for the extractor
    config = IEAgentConfig(
        schema=schema_config,  # Reuse the same schema config
        document=DocumentConfig(
            input_dir=r"tests\test_input",
            output_dir=r"tests\test_output",
            file_pattern="*.pdf",  # Process PDF files
            output_type="text"  # Extract as text
        ),
        extractor=ExtractorConfig())

    # Example 1: Process single document with Docling
    print("\nExample 1: Processing single document with Docling...")
    single_doc_path = Path(
        r"tests\test_input\piping_documents\Campos Montero et al. - 2025 - SchemaGAN A conditional Generative Adversarial Network for geotechnical subsurface schematisation.pdf"
    )

    # Create extractor with Docling support
    extractor = DoclingInfoExtractor(config=config, output_schema=SchemaClass, llm_api=llm)

    # Process the document
    doc = extractor.doc_processor.process_document(single_doc_path)
    if doc:
        results = extractor.process_document(doc)
        output_path = Path(config.document.output_dir) / f"{single_doc_path.stem}_extracted_docling.json"
        extractor.save_results(results, output_path)

    # Example 2: Process entire directory with Docling
    # print("\nExample 2: Processing entire directory with Docling...")
    # # Create new extractor instance with the same schema
    # extractor = DoclingInfoExtractor(config=config, output_schema=SchemaClass, llm_api=llm)
    # extractor.process_all()