babylon.rag
RAG (Retrieval Augmented Generation) system implementation.
This module implements a Retrieval Augmented Generation system with:

- Object lifecycle management
- Embeddings and debeddings
- Document chunking and preprocessing
- Vector storage and retrieval
- End-to-end RAG pipeline
- Pre-embeddings system
- Context window management
- Priority queuing
- Working set management with tiered contexts
Main Components:

- RagPipeline: Main orchestrator for ingestion and querying
- DocumentProcessor: Text preprocessing and chunking
- EmbeddingManager: Embedding generation via Ollama (local) or OpenAI (cloud)
- VectorStore: ChromaDB-based vector storage
- Retriever: High-level retrieval interface
Usage:
Basic usage with the main pipeline:
```python
from babylon.rag import RagPipeline, RagConfig

# Initialize pipeline with custom config
config = RagConfig(chunk_size=1000, default_top_k=5)
pipeline = RagPipeline(config=config)

# Ingest documents
result = pipeline.ingest_text("Your document content here", "doc_1")

# Query for relevant content
response = pipeline.query("What is this document about?")

# Get combined context for LLM
context = response.get_combined_context(max_length=2000)
```
Or use individual components:
```python
from babylon.rag import DocumentProcessor, EmbeddingManager, VectorStore

# Process documents
processor = DocumentProcessor()
chunks = processor.process_text("Your content", "source_id")

# Generate embeddings
embedding_manager = EmbeddingManager()
embedded_chunks = await embedding_manager.aembed_batch(chunks)

# Store in vector database
vector_store = VectorStore("my_collection")
vector_store.add_chunks(embedded_chunks)
```
- class babylon.rag.DocumentProcessor(preprocessor=None, chunker=None)[source]
Bases: object

High-level document processor that combines preprocessing and chunking.
- Parameters:
preprocessor (Preprocessor | None)
chunker (TextChunker | None)
- __init__(preprocessor=None, chunker=None)[source]
Initialize the document processor.
- Parameters:
preprocessor (Preprocessor | None) – Optional custom preprocessor (uses default if None)
chunker (TextChunker | None) – Optional custom chunker (uses default if None)
- process_file(file_path, encoding='utf-8')[source]
Process a text file into document chunks.
- Parameters:
- Return type:
- Returns:
List of processed DocumentChunk objects
- Raises:
FileNotFoundError – If file doesn’t exist
PreprocessingError – If preprocessing fails
ChunkingError – If chunking fails
- class babylon.rag.DocumentChunk(**data)[source]
Bases: BaseModel

Represents a chunk of a document with metadata.
- Parameters:
- model_config: ClassVar[ConfigDict] = {'validate_assignment': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class babylon.rag.TextChunker(chunk_size=1000, overlap_size=100, preserve_paragraphs=True, preserve_sentences=True)[source]
Bases: object

Chunks text into smaller, contextually meaningful pieces.
- Parameters:
- __init__(chunk_size=1000, overlap_size=100, preserve_paragraphs=True, preserve_sentences=True)[source]
Initialize the chunker.
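The interaction of chunk_size and overlap_size can be illustrated with a minimal sliding-window sketch. This is not the actual TextChunker implementation (which also preserves paragraph and sentence boundaries); it only shows how overlapping windows carry shared context between adjacent chunks:

```python
# Illustrative sliding-window chunking; the real TextChunker additionally
# respects paragraph and sentence boundaries.
def sliding_chunks(text: str, chunk_size: int = 1000, overlap_size: int = 100) -> list[str]:
    step = chunk_size - overlap_size  # each window starts this far after the previous one
    return [text[i:i + chunk_size] for i in range(0, max(len(text) - overlap_size, 1), step)]

# Consecutive chunks share overlap_size characters of context.
chunks = sliding_chunks("a" * 2500, chunk_size=1000, overlap_size=100)
```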
- class babylon.rag.Preprocessor(min_content_length=50, max_content_length=100000, remove_extra_whitespace=True, normalize_unicode=True)[source]
Bases: object

Preprocesses documents for chunking and embedding.
- Parameters:
- __init__(min_content_length=50, max_content_length=100000, remove_extra_whitespace=True, normalize_unicode=True)[source]
Initialize the preprocessor.
- class babylon.rag.LifecycleManager[source]
Bases: object

Manages the lifecycle of RAG objects.
Tracks object states, transitions, and provides metrics for monitoring system health.
- get_object_state(object_id)[source]
Get the current state of an object.
- Parameters:
object_id (str) – Unique identifier for the object
- Return type:
ObjectState | str | None
- Returns:
Current state, or None if the object is not found
- get_objects_in_state(state)[source]
Get all objects currently in a given state.
- Parameters:
state (ObjectState) – The state to filter by
- Return type:
- Returns:
List of object IDs in the specified state
- get_performance_metrics()[source]
Get current performance metrics.
- Return type:
- Returns:
PerformanceMetrics dataclass with current stats
- get_state_history(object_id)[source]
Get the state transition history for an object.
- Parameters:
object_id (str) – Unique identifier for the object
- Return type:
- Returns:
List of (state, timestamp) tuples
- register_object(object_id, initial_state=ObjectState.CREATED)[source]
Register a new object for lifecycle tracking.
- Parameters:
object_id (str) – Unique identifier for the object
initial_state (ObjectState) – Initial state for the object
- Return type:
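A hedged sketch of the tracking pattern these methods describe: register objects, record (state, timestamp) transitions, and query by state. This is illustrative only, not the real LifecycleManager, which also validates transitions and collects performance metrics:

```python
import time

# Minimal lifecycle tracker mirroring the documented method names.
class MiniLifecycle:
    def __init__(self):
        self._state = {}
        self._history = {}

    def register_object(self, object_id, initial_state="created"):
        self._state[object_id] = initial_state
        self._history[object_id] = [(initial_state, time.time())]

    def transition(self, object_id, new_state):
        self._state[object_id] = new_state
        self._history[object_id].append((new_state, time.time()))

    def get_object_state(self, object_id):
        return self._state.get(object_id)

    def get_objects_in_state(self, state):
        return [oid for oid, s in self._state.items() if s == state]

    def get_state_history(self, object_id):
        return self._history.get(object_id, [])
```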
- class babylon.rag.ObjectState(*values)[source]
Bases: Enum

Possible states for RAG objects in their lifecycle.
- CREATED = 'created'
- PREPROCESSED = 'preprocessed'
- CHUNKED = 'chunked'
- EMBEDDED = 'embedded'
- STORED = 'stored'
- RETRIEVED = 'retrieved'
- REMOVED_FROM_CONTEXT = 'removed_from_context'
- ARCHIVED = 'archived'
- ERROR = 'error'
- class babylon.rag.PerformanceMetrics(hot_objects=<factory>, cache_hit_rate=<factory>, avg_token_usage=0.0, latency_stats=<factory>, memory_profile=<factory>)[source]
Bases: object

Performance metrics for RAG operations.
- Parameters:
- __init__(hot_objects=<factory>, cache_hit_rate=<factory>, avg_token_usage=0.0, latency_stats=<factory>, memory_profile=<factory>)
- exception babylon.rag.RagError(message, error_code='RAG_001', details=None)[source]
Bases: ObserverError

Unified exception class for all RAG system errors.
Use error_code to distinguish between different error types:

- RAG_001-099: General errors
- RAG_1XX: Lifecycle errors (invalid objects, state transitions)
- RAG_2XX: Embedding errors (API failures, batch errors)
- RAG_3XX: Query/retrieval errors
- RAG_4XX: Pre-embedding errors (chunking, preprocessing)
- RAG_5XX: Context window errors (capacity, optimization)
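As an illustration of how the documented ranges partition the codes, a hypothetical helper (not part of the babylon.rag API) could classify a concrete code string:

```python
def rag_error_category(error_code: str) -> str:
    # Hypothetical helper: maps a "RAG_nnn" code onto the documented ranges.
    n = int(error_code.split("_", 1)[1])
    if n < 100:
        return "general"
    return {
        1: "lifecycle",
        2: "embedding",
        3: "query/retrieval",
        4: "pre-embedding",
        5: "context window",
    }.get(n // 100, "unknown")
```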
- class babylon.rag.RagPipeline(config=None, chroma_manager=None, embedding_manager=None)[source]
Bases: object

Main RAG pipeline that orchestrates ingestion and retrieval.
- Parameters:
config (RagConfig | None)
chroma_manager (ChromaManager | None)
embedding_manager (EmbeddingManager | None)
- __exit__(exc_type, exc_val, exc_tb)[source]
Context manager exit with cleanup.
- Return type:
- Parameters:
exc_type (type[BaseException] | None)
exc_val (BaseException | None)
exc_tb (TracebackType | None)
- __init__(config=None, chroma_manager=None, embedding_manager=None)[source]
Initialize the RAG pipeline.
- Parameters:
config (RagConfig | None) – RAG configuration (uses default if None)
chroma_manager (ChromaManager | None) – ChromaDB manager (creates new if None)
embedding_manager (EmbeddingManager | None) – Embedding manager (creates new if None)
- async aingest_file(file_path, encoding='utf-8')[source]
Asynchronously ingest a text file into the RAG system.
- Parameters:
- Return type:
- Returns:
IngestionResult with processing statistics
- Raises:
FileNotFoundError – If file doesn’t exist
RagError – If ingestion fails
- async aingest_files(file_paths, encoding='utf-8', max_concurrent=5)[source]
Asynchronously ingest multiple files concurrently.
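A max_concurrent bound like this is commonly enforced with a semaphore; a minimal sketch of that pattern (an assumption about the mechanism, not the pipeline's actual code; `worker` stands in for per-file ingestion):

```python
import asyncio

# Cap in-flight tasks with a semaphore while gathering results in order.
async def ingest_many(paths, worker, max_concurrent=5):
    sem = asyncio.Semaphore(max_concurrent)

    async def one(path):
        async with sem:
            return await worker(path)

    return await asyncio.gather(*(one(p) for p in paths))
```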
- async aingest_text(content, source_id, metadata=None)[source]
Asynchronously ingest text content into the RAG system.
- Parameters:
- Return type:
- Returns:
IngestionResult with processing statistics
- Raises:
RagError – If ingestion fails
- async aquery(query, top_k=None, similarity_threshold=None, metadata_filter=None)[source]
Asynchronously query the RAG system for relevant content.
- Parameters:
- Return type:
- Returns:
QueryResponse with search results
- Raises:
RagError – If query processing fails
- clear_collection()[source]
Clear all documents from the collection.
WARNING: This will delete all ingested documents!
- Return type:
- ingest_file(file_path, encoding='utf-8')[source]
Synchronously ingest a text file into the RAG system.
- Parameters:
- Return type:
- Returns:
IngestionResult with processing statistics
- ingest_files(file_paths, encoding='utf-8', max_concurrent=5)[source]
Synchronously ingest multiple files.
- ingest_text(content, source_id, metadata=None)[source]
Synchronously ingest text content into the RAG system.
This is a convenience wrapper around aingest_text for synchronous code. For better performance in async contexts, use aingest_text directly.
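The sync-over-async convenience pattern described here can be sketched generically (not the pipeline's actual code; `Service` and its return value are placeholders):

```python
import asyncio

class Service:
    # Stand-in for the async ingestion method.
    async def aingest_text(self, content, source_id):
        return {"source_id": source_id, "length": len(content)}

    # Synchronous convenience wrapper; prefer aingest_text inside async code,
    # since asyncio.run cannot be called from a running event loop.
    def ingest_text(self, content, source_id):
        return asyncio.run(self.aingest_text(content, source_id))
```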
- class babylon.rag.RagConfig(**data)[source]
Bases: BaseModel

Configuration for RAG pipeline.
- Parameters:
- model_config: ClassVar[ConfigDict] = {'frozen': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class babylon.rag.IngestionResult(**data)[source]
Bases: BaseModel

Result of document ingestion process.
- Parameters:
- model_config: ClassVar[ConfigDict] = {'frozen': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- async babylon.rag.quick_ingest_text(content, source_id, collection_name='quick_rag')[source]
Quickly ingest text content using default settings.
- Parameters:
- Return type:
- Returns:
IngestionResult with processing statistics
- async babylon.rag.quick_query(query, collection_name='quick_rag', top_k=5)[source]
Quickly query the RAG system using default settings.
- Parameters:
- Return type:
- Returns:
QueryResponse with search results
- class babylon.rag.EmbeddingManager(embedding_dimension=None, batch_size=None, max_cache_size=1000, max_concurrent_requests=4)[source]
Bases: object

Manages embeddings for RAG objects.
The EmbeddingManager handles:

- Generating embeddings via Ollama (local) or OpenAI (cloud) API
- Caching embeddings for reuse with LRU eviction
- Rate limiting and retry logic
- Batch operations for efficiency
- Error handling and recovery
- Performance metrics collection
- Concurrent operations

Default: Ollama with embeddinggemma for fully offline operation.
- Parameters:
- __init__(embedding_dimension=None, batch_size=None, max_cache_size=1000, max_concurrent_requests=4)[source]
Initialize the embedding manager.
- Parameters:
embedding_dimension (int | None) – Size of embedding vectors (default: from LLMConfig)
batch_size (int | None) – Number of objects to embed in each batch (default: from LLMConfig)
max_cache_size (int) – Maximum number of embeddings to keep in cache (default: 1000)
max_concurrent_requests (int) – Maximum number of concurrent embedding requests (default: 4)
- Raises:
ValueError – If embedding configuration is invalid
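The class description mentions an LRU-evicting embedding cache bounded by max_cache_size; a minimal sketch of that mechanism (an assumed detail, not the actual implementation):

```python
from collections import OrderedDict

# Least-recently-used cache: reads refresh recency, inserts evict the
# stalest entry once max_size is exceeded.
class LruEmbeddingCache:
    def __init__(self, max_size=1000):
        self.max_size = max_size
        self._cache = OrderedDict()

    def get(self, key):
        if key not in self._cache:
            return None
        self._cache.move_to_end(key)  # mark as most recently used
        return self._cache[key]

    def put(self, key, vector):
        self._cache[key] = vector
        self._cache.move_to_end(key)
        if len(self._cache) > self.max_size:
            self._cache.popitem(last=False)  # evict least recently used
```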
- async aembed(obj)[source]
Asynchronously generate and attach embedding for a single object.
- Parameters:
obj (TypeVar(E, bound=Embeddable)) – Object to embed
- Return type:
TypeVar(E, bound=Embeddable)
- Returns:
Object with embedding attached
- Raises:
ValueError – If object content is invalid
EmbeddingError – If embedding generation fails
- async aembed_batch(objects)[source]
Asynchronously generate embeddings for multiple objects efficiently.
- Parameters:
objects (Sequence[TypeVar(E, bound=Embeddable)]) – List of objects to embed
- Return type:
list[TypeVar(E, bound=Embeddable)]
- Returns:
List of objects with embeddings attached
- Raises:
EmbeddingError – If any object’s embedding generation fails
- debed(obj)[source]
Remove embedding from an object.
- Parameters:
obj (TypeVar(E, bound=Embeddable)) – Object to remove embedding from
- Return type:
TypeVar(E, bound=Embeddable)
- Returns:
Object with embedding removed
- debed_batch(objects)[source]
Remove embeddings from multiple objects.
- Parameters:
objects (Sequence[TypeVar(E, bound=Embeddable)]) – List of objects to remove embeddings from
- Return type:
list[TypeVar(E, bound=Embeddable)]
- Returns:
List of objects with embeddings removed
- embed(obj)[source]
Synchronously generate and attach embedding for a single object.
This is a convenience wrapper around aembed for synchronous code. For better performance in async contexts, use aembed directly.
- Parameters:
obj (TypeVar(E, bound=Embeddable)) – Object to embed
- Return type:
TypeVar(E, bound=Embeddable)
- Returns:
Object with embedding attached
- Raises:
ValueError – If object content is invalid
EmbeddingError – If embedding generation fails
- embed_batch(objects)[source]
Synchronously generate embeddings for multiple objects efficiently.
This is a convenience wrapper around aembed_batch for synchronous code. For better performance in async contexts, use aembed_batch directly.
- Parameters:
objects (Sequence[TypeVar(E, bound=Embeddable)]) – List of objects to embed
- Return type:
list[TypeVar(E, bound=Embeddable)]
- Returns:
List of objects with embeddings attached
- Raises:
EmbeddingError – If any object’s embedding generation fails
- class babylon.rag.VectorStore(collection_name='documents', chroma_manager=None)[source]
Bases: object

Interface to ChromaDB for storing and retrieving document vectors.
- Parameters:
collection_name (str)
chroma_manager (ChromaManager | None)
- __init__(collection_name='documents', chroma_manager=None)[source]
Initialize the vector store.
- Parameters:
collection_name (str) – Name of the ChromaDB collection
chroma_manager (ChromaManager | None) – Optional ChromaManager instance (creates new if None)
- add_chunks(chunks)[source]
Add document chunks to the vector store.
- Parameters:
chunks (list[DocumentChunk]) – List of DocumentChunk objects with embeddings
- Raises:
RagError – If chunks are missing embeddings or storage fails
- Return type:
- query_similar(query_embedding, k=10, where=None, include=None)[source]
Query for similar chunks using embedding.
- class babylon.rag.Retriever(vector_store, embedding_manager)[source]
Bases: object

High-level retrieval interface for RAG queries.
- Parameters:
vector_store (VectorStore)
embedding_manager (EmbeddingManager)
- __init__(vector_store, embedding_manager)[source]
Initialize the retriever.
- Parameters:
vector_store (VectorStore) – VectorStore instance for similarity search
embedding_manager (EmbeddingManager) – EmbeddingManager for query embedding
- async aquery(query, k=10, similarity_threshold=0.0, metadata_filter=None)[source]
Asynchronously query for relevant document chunks.
- Parameters:
- Return type:
- Returns:
QueryResponse with results and timing information
- Raises:
RagError – If query processing fails
- query(query, k=10, similarity_threshold=0.0, metadata_filter=None)[source]
Synchronously query for relevant document chunks.
This is a convenience wrapper around aquery for synchronous code. For better performance in async contexts, use aquery directly.
- Parameters:
- Return type:
- Returns:
QueryResponse with results and timing information
- Raises:
RagError – If query processing fails
- class babylon.rag.QueryResponse(**data)[source]
Bases: BaseModel

Represents the complete response to a query.
- Parameters:
- get_combined_context(max_length=4000, separator='\\n\\n')[source]
Combine result chunks into a single context string.
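One plausible way such combination could work is a greedy join under a length budget; this is an assumption for illustration, and the method's actual truncation behavior may differ:

```python
def combine_context(texts, max_length=4000, separator="\n\n"):
    # Greedily append chunk texts until adding another would exceed max_length.
    parts, total = [], 0
    for text in texts:
        extra = len(text) + (len(separator) if parts else 0)
        if total + extra > max_length:
            break
        parts.append(text)
        total += extra
    return separator.join(parts)
```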
- model_config: ClassVar[ConfigDict] = {'validate_assignment': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- results: list[QueryResult]
- class babylon.rag.QueryResult(**data)[source]
Bases: BaseModel

Represents a single query result with similarity score.
- Parameters:
- convert_similarity_to_distance()[source]
Convert similarity score to distance if not provided.
- Return type:
- model_config: ClassVar[ConfigDict] = {'validate_assignment': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- chunk: DocumentChunk
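For cosine-style similarity scores, a common convention is distance = 1 - similarity; a hedged sketch of such a conversion (the actual convert_similarity_to_distance validator may use a different metric):

```python
def similarity_to_distance(similarity, distance=None):
    # Assumed cosine-style convention; fills in distance only when it
    # was not already provided, mirroring the validator's stated intent.
    return distance if distance is not None else 1.0 - similarity
```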
Modules

- Document chunking and preprocessing for the RAG system.
- Context window management for the RAG system.
- Embedding management for the RAG system.
- Custom exceptions for the RAG system.
- Object lifecycle management for the RAG system.
- Pre-embedding pipeline for RAG document processing.
- Main RAG pipeline service that orchestrates document ingestion and query processing.
- Query and retrieval interface for the RAG system.