babylon.rag

RAG (Retrieval Augmented Generation) system implementation.

This module implements a Retrieval Augmented Generation system with:

  • Object lifecycle management

  • Embeddings and debeddings

  • Document chunking and preprocessing

  • Vector storage and retrieval

  • End-to-end RAG pipeline

  • Pre-embeddings system

  • Context window management

  • Priority queuing

  • Working set management with tiered contexts

Main Components:

  • RagPipeline: Main orchestrator for ingestion and querying

  • DocumentProcessor: Text preprocessing and chunking

  • EmbeddingManager: Embedding generation via Ollama (local) or the OpenAI API

  • VectorStore: ChromaDB-based vector storage

  • Retriever: High-level retrieval interface

Usage:

Basic usage with the main pipeline:

```python
from babylon.rag import RagPipeline, RagConfig

# Initialize pipeline with custom config
config = RagConfig(chunk_size=1000, default_top_k=5)
pipeline = RagPipeline(config=config)

# Ingest documents
result = pipeline.ingest_text("Your document content here", "doc_1")

# Query for relevant content
response = pipeline.query("What is this document about?")

# Get combined context for LLM
context = response.get_combined_context(max_length=2000)
```

Or use individual components:

```python
from babylon.rag import DocumentProcessor, EmbeddingManager, VectorStore

# Process documents
processor = DocumentProcessor()
chunks = processor.process_text("Your content", "source_id")

# Generate embeddings
embedding_manager = EmbeddingManager()
embedded_chunks = await embedding_manager.aembed_batch(chunks)

# Store in vector database
vector_store = VectorStore("my_collection")
vector_store.add_chunks(embedded_chunks)
```

class babylon.rag.DocumentProcessor(preprocessor=None, chunker=None)[source]

Bases: object

High-level document processor that combines preprocessing and chunking.

__init__(preprocessor=None, chunker=None)[source]

Initialize the document processor.

Parameters:
  • preprocessor (Preprocessor | None) – Optional custom preprocessor (uses default if None)

  • chunker (TextChunker | None) – Optional custom chunker (uses default if None)

process_file(file_path, encoding='utf-8')[source]

Process a text file into document chunks.

Parameters:
  • file_path (str) – Path to the text file

  • encoding (str) – File encoding (default: utf-8)

Return type:

list[DocumentChunk]

Returns:

List of processed DocumentChunk objects

Raises:
  • FileNotFoundError – If file doesn’t exist

  • PreprocessingError – If preprocessing fails

  • ChunkingError – If chunking fails

process_text(content, source_file=None, metadata=None)[source]

Process raw text into document chunks.

Parameters:
  • content (str) – Raw text content

  • source_file (str | None) – Optional source file path

  • metadata (dict[str, Any] | None) – Optional metadata to attach to chunks

Return type:

list[DocumentChunk]

Returns:

List of processed DocumentChunk objects

Raises:
  • PreprocessingError – If preprocessing fails

  • ChunkingError – If chunking fails

class babylon.rag.DocumentChunk(**data)[source]

Bases: BaseModel

Represents a chunk of a document with metadata.

generate_id_if_empty()[source]

Generate ID if not provided.

Return type:

DocumentChunk

model_config: ClassVar[ConfigDict] = {'validate_assignment': True}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict.

id: str
content: str
source_file: str | None
chunk_index: int
start_char: int
end_char: int
metadata: dict[str, Any] | None
embedding: list[float] | None
class babylon.rag.TextChunker(chunk_size=1000, overlap_size=100, preserve_paragraphs=True, preserve_sentences=True)[source]

Bases: object

Chunks text into smaller, contextually meaningful pieces.

Parameters:
  • chunk_size (int)

  • overlap_size (int)

  • preserve_paragraphs (bool)

  • preserve_sentences (bool)

__init__(chunk_size=1000, overlap_size=100, preserve_paragraphs=True, preserve_sentences=True)[source]

Initialize the chunker.

Parameters:
  • chunk_size (int) – Target size for each chunk in characters

  • overlap_size (int) – Number of overlapping characters between chunks

  • preserve_paragraphs (bool) – Try to avoid splitting paragraphs

  • preserve_sentences (bool) – Try to avoid splitting sentences

chunk_text(content, source_file=None, metadata=None)[source]

Chunk text content into DocumentChunk objects.

Parameters:
  • content (str) – Text content to chunk

  • source_file (str | None) – Optional source file path

  • metadata (dict[str, Any] | None) – Optional metadata to attach to chunks

Return type:

list[DocumentChunk]

Returns:

List of DocumentChunk objects

Raises:

ChunkingError – If chunking fails
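The overlap behavior described by chunk_size and overlap_size can be sketched in plain Python. This is a hypothetical `chunk_with_overlap` helper, not the library's implementation; it ignores the paragraph/sentence-preservation options for brevity:

```python
def chunk_with_overlap(content: str, chunk_size: int = 1000,
                       overlap_size: int = 100) -> list[str]:
    """Split content into fixed-size windows that share overlap_size characters."""
    if overlap_size >= chunk_size:
        raise ValueError("overlap_size must be smaller than chunk_size")
    step = chunk_size - overlap_size  # advance this far for each new chunk
    chunks = []
    for start in range(0, len(content), step):
        chunks.append(content[start:start + chunk_size])
        if start + chunk_size >= len(content):
            break  # last window already reached the end of the text
    return chunks
```

Each chunk repeats the final overlap_size characters of its predecessor, so a sentence cut at a boundary still appears whole in one of the two chunks.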

class babylon.rag.Preprocessor(min_content_length=50, max_content_length=100000, remove_extra_whitespace=True, normalize_unicode=True)[source]

Bases: object

Preprocesses documents for chunking and embedding.

Parameters:
  • min_content_length (int)

  • max_content_length (int)

  • remove_extra_whitespace (bool)

  • normalize_unicode (bool)

__init__(min_content_length=50, max_content_length=100000, remove_extra_whitespace=True, normalize_unicode=True)[source]

Initialize the preprocessor.

Parameters:
  • min_content_length (int) – Minimum acceptable content length

  • max_content_length (int) – Maximum acceptable content length

  • remove_extra_whitespace (bool) – Whether to normalize whitespace

  • normalize_unicode (bool) – Whether to normalize unicode characters

preprocess(content, content_id=None)[source]

Preprocess content for chunking.

Parameters:
  • content (str) – Raw content to preprocess

  • content_id (str | None) – Optional identifier for error reporting

Return type:

str

Returns:

Preprocessed content

Raises:

PreprocessingError – If content fails validation or preprocessing
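The normalization and length-validation steps can be sketched with the standard library. This is a standalone approximation of the documented options, not the real Preprocessor:

```python
import re
import unicodedata

def preprocess(content: str, min_len: int = 50, max_len: int = 100000,
               remove_extra_whitespace: bool = True,
               normalize_unicode: bool = True) -> str:
    """Normalize raw text and validate its length before chunking."""
    if normalize_unicode:
        # NFKC folds compatibility characters (e.g. full-width forms) to canonical ones
        content = unicodedata.normalize("NFKC", content)
    if remove_extra_whitespace:
        # collapse runs of whitespace into single spaces
        content = re.sub(r"\s+", " ", content).strip()
    if not (min_len <= len(content) <= max_len):
        raise ValueError(f"content length {len(content)} outside [{min_len}, {max_len}]")
    return content
```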

class babylon.rag.LifecycleManager[source]

Bases: object

Manages the lifecycle of RAG objects.

Tracks object states, transitions, and provides metrics for monitoring system health.

__init__()[source]

Initialize the lifecycle manager.

Return type:

None

clear()[source]

Clear all tracked objects and history.

Return type:

None

get_object(object_id)[source]

Get full object information.

Parameters:

object_id (str) – Unique identifier for the object

Return type:

dict[str, Any] | None

Returns:

Object data dict or None if not found

get_object_state(object_id)[source]

Get the current state of an object.

Parameters:

object_id (str) – Unique identifier for the object

Return type:

ObjectState | str | None

Returns:

Current state or None if object not found

get_objects_in_state(state)[source]

Get all objects currently in a given state.

Parameters:

state (ObjectState) – The state to filter by

Return type:

list[str]

Returns:

List of object IDs in the specified state

get_performance_metrics()[source]

Get current performance metrics.

Return type:

PerformanceMetrics

Returns:

PerformanceMetrics dataclass with current stats

get_state_history(object_id)[source]

Get the state transition history for an object.

Parameters:

object_id (str) – Unique identifier for the object

Return type:

list[tuple[ObjectState, datetime]]

Returns:

List of (state, timestamp) tuples

register_object(object_id, initial_state=ObjectState.CREATED)[source]

Register a new object for lifecycle tracking.

Parameters:
  • object_id (str) – Unique identifier for the object

  • initial_state (ObjectState) – Initial state for the object

Return type:

None

update_object_state(object_id, new_state)[source]

Update the state of a tracked object.

Parameters:
  • object_id (str) – Unique identifier for the object

  • new_state (ObjectState | str) – New state for the object (ObjectState enum or string)

Return type:

None
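The tracking pattern behind these methods (a current state plus a timestamped history per object) can be sketched as follows. `MiniLifecycle` is a hypothetical stand-in, not the actual class:

```python
from datetime import datetime, timezone

class MiniLifecycle:
    """Tracks a current state and a (state, timestamp) history per object id."""

    def __init__(self) -> None:
        self._states: dict[str, str] = {}
        self._history: dict[str, list[tuple[str, datetime]]] = {}

    def register_object(self, object_id: str, initial_state: str = "created") -> None:
        self._states[object_id] = initial_state
        self._history[object_id] = [(initial_state, datetime.now(timezone.utc))]

    def update_object_state(self, object_id: str, new_state: str) -> None:
        if object_id not in self._states:
            raise KeyError(f"unknown object: {object_id}")
        self._states[object_id] = new_state
        self._history[object_id].append((new_state, datetime.now(timezone.utc)))

    def get_objects_in_state(self, state: str) -> list[str]:
        return [oid for oid, s in self._states.items() if s == state]
```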

class babylon.rag.ObjectState(*values)[source]

Bases: Enum

Possible states for RAG objects in their lifecycle.

CREATED = 'created'
PREPROCESSED = 'preprocessed'
CHUNKED = 'chunked'
EMBEDDED = 'embedded'
STORED = 'stored'
RETRIEVED = 'retrieved'
REMOVED_FROM_CONTEXT = 'removed_from_context'
ARCHIVED = 'archived'
ERROR = 'error'
class babylon.rag.PerformanceMetrics(hot_objects=<factory>, cache_hit_rate=<factory>, avg_token_usage=0.0, latency_stats=<factory>, memory_profile=<factory>)[source]

Bases: object

Performance metrics for RAG operations.

__init__(hot_objects=<factory>, cache_hit_rate=<factory>, avg_token_usage=0.0, latency_stats=<factory>, memory_profile=<factory>)
Return type:

None

avg_token_usage: float = 0.0
hot_objects: list[str]
cache_hit_rate: dict[str, float]
latency_stats: dict[str, float]
memory_profile: dict[str, float]
exception babylon.rag.RagError(message, error_code='RAG_001', details=None)[source]

Bases: ObserverError

Unified exception class for all RAG system errors.

Use error_code to distinguish between different error types:

  • RAG_001-099: General errors

  • RAG_1XX: Lifecycle errors (invalid objects, state transitions)

  • RAG_2XX: Embedding errors (API failures, batch errors)

  • RAG_3XX: Query/retrieval errors

  • RAG_4XX: Pre-embedding errors (chunking, preprocessing)

  • RAG_5XX: Context window errors (capacity, optimization)

Return type:

None

__init__(message, error_code='RAG_001', details=None)[source]
Return type:

None
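Since the aliases below all point at the single RagError class, handlers typically branch on error_code rather than on exception type. A sketch of that dispatch, using a hypothetical stand-in class with the documented constructor shape:

```python
class FakeRagError(Exception):
    """Stand-in with the documented (message, error_code, details) shape."""

    def __init__(self, message, error_code="RAG_001", details=None):
        super().__init__(message)
        self.error_code = error_code
        self.details = details or {}

def classify(err) -> str:
    """Map an error code to its documented family by its leading digit."""
    families = {"1": "lifecycle", "2": "embedding", "3": "retrieval",
                "4": "pre-embedding", "5": "context-window"}
    digits = err.error_code.split("_", 1)[1]  # e.g. "204" from "RAG_204"
    if digits.startswith("0"):
        return "general"  # RAG_001-099
    return families.get(digits[0], "unknown")
```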

babylon.rag.LifecycleError

alias of RagError

babylon.rag.InvalidObjectError

alias of RagError

babylon.rag.StateTransitionError

alias of RagError

babylon.rag.CorruptStateError

alias of RagError

babylon.rag.PreEmbeddingError

alias of RagError

babylon.rag.PreprocessingError

alias of RagError

babylon.rag.ChunkingError

alias of RagError

babylon.rag.CacheError

alias of RagError

class babylon.rag.RagPipeline(config=None, chroma_manager=None, embedding_manager=None)[source]

Bases: object

Main RAG pipeline that orchestrates ingestion and retrieval.

__enter__()[source]

Context manager entry.

Return type:

Self

__exit__(exc_type, exc_val, exc_tb)[source]

Context manager exit with cleanup.

Return type:

None

__init__(config=None, chroma_manager=None, embedding_manager=None)[source]

Initialize the RAG pipeline.

async aclose()[source]

Asynchronously close the RAG pipeline and clean up resources.

Return type:

None

async aingest_file(file_path, encoding='utf-8')[source]

Asynchronously ingest a text file into the RAG system.

Parameters:
  • file_path (str) – Path to the text file

  • encoding (str) – File encoding (default: utf-8)

Return type:

IngestionResult

Returns:

IngestionResult with processing statistics

Raises:

RagError – If ingestion fails

async aingest_files(file_paths, encoding='utf-8', max_concurrent=5)[source]

Asynchronously ingest multiple files concurrently.

Parameters:
  • file_paths (list[str]) – List of file paths to ingest

  • encoding (str) – File encoding (default: utf-8)

  • max_concurrent (int) – Maximum number of concurrent ingestions

Return type:

list[IngestionResult]

Returns:

List of IngestionResult objects

async aingest_text(content, source_id, metadata=None)[source]

Asynchronously ingest text content into the RAG system.

Parameters:
  • content (str) – Text content to ingest

  • source_id (str) – Unique identifier for the content source

  • metadata (dict[str, Any] | None) – Optional metadata to attach to chunks

Return type:

IngestionResult

Returns:

IngestionResult with processing statistics

Raises:

RagError – If ingestion fails

async aquery(query, top_k=None, similarity_threshold=None, metadata_filter=None)[source]

Asynchronously query the RAG system for relevant content.

Parameters:
  • query (str) – Query text to search for

  • top_k (int | None) – Number of results to return (uses config default if None)

  • similarity_threshold (float | None) – Minimum similarity score (uses config default if None)

  • metadata_filter (dict[str, Any] | None) – Optional filters for chunk metadata

Return type:

QueryResponse

Returns:

QueryResponse with search results

Raises:

RagError – If query processing fails

clear_collection()[source]

Clear all documents from the collection.

WARNING: This will delete all ingested documents!

Return type:

None

close()[source]

Synchronously close the RAG pipeline and clean up resources.

Return type:

None

get_stats()[source]

Get statistics about the RAG system.

Return type:

dict[str, Any]

Returns:

Dictionary with system statistics

ingest_file(file_path, encoding='utf-8')[source]

Synchronously ingest a text file into the RAG system.

Parameters:
  • file_path (str) – Path to the text file

  • encoding (str) – File encoding (default: utf-8)

Return type:

IngestionResult

Returns:

IngestionResult with processing statistics

ingest_files(file_paths, encoding='utf-8', max_concurrent=5)[source]

Synchronously ingest multiple files.

Parameters:
  • file_paths (list[str]) – List of file paths to ingest

  • encoding (str) – File encoding (default: utf-8)

  • max_concurrent (int) – Maximum number of concurrent ingestions

Return type:

list[IngestionResult]

Returns:

List of IngestionResult objects

ingest_text(content, source_id, metadata=None)[source]

Synchronously ingest text content into the RAG system.

This is a convenience wrapper around aingest_text for synchronous code. For better performance in async contexts, use aingest_text directly.

Parameters:
  • content (str) – Text content to ingest

  • source_id (str) – Unique identifier for the content source

  • metadata (dict[str, Any] | None) – Optional metadata to attach to chunks

Return type:

IngestionResult

Returns:

IngestionResult with processing statistics

query(query, top_k=None, similarity_threshold=None, metadata_filter=None)[source]

Synchronously query the RAG system for relevant content.

Parameters:
  • query (str) – Query text to search for

  • top_k (int | None) – Number of results to return (uses config default if None)

  • similarity_threshold (float | None) – Minimum similarity score (uses config default if None)

  • metadata_filter (dict[str, Any] | None) – Optional filters for chunk metadata

Return type:

QueryResponse

Returns:

QueryResponse with search results

class babylon.rag.RagConfig(**data)[source]

Bases: BaseModel

Configuration for RAG pipeline.

Parameters:
  • chunk_size (int)

  • chunk_overlap (int)

  • min_chunk_length (int)

  • max_chunk_length (int)

  • embedding_batch_size (int)

  • max_concurrent_embeds (int)

  • default_top_k (int)

  • default_similarity_threshold (float)

  • collection_name (str)

  • use_persistent_storage (bool)

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict.

chunk_size: int
chunk_overlap: int
min_chunk_length: int
max_chunk_length: int
embedding_batch_size: int
max_concurrent_embeds: int
default_top_k: int
default_similarity_threshold: float
collection_name: str
use_persistent_storage: bool
class babylon.rag.IngestionResult(**data)[source]

Bases: BaseModel

Result of document ingestion process.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict.

success: bool
chunks_processed: int
chunks_stored: int
processing_time_ms: float
embedding_time_ms: float
storage_time_ms: float
errors: list[str]
source_files: list[str]
async babylon.rag.quick_ingest_text(content, source_id, collection_name='quick_rag')[source]

Quickly ingest text content using default settings.

Parameters:
  • content (str) – Text content to ingest

  • source_id (str) – Unique identifier for the content

  • collection_name (str) – ChromaDB collection name

Return type:

IngestionResult

Returns:

IngestionResult with processing statistics

async babylon.rag.quick_query(query, collection_name='quick_rag', top_k=5)[source]

Quickly query the RAG system using default settings.

Parameters:
  • query (str) – Query text to search for

  • collection_name (str) – ChromaDB collection name

  • top_k (int) – Number of results to return

Return type:

QueryResponse

Returns:

QueryResponse with search results

class babylon.rag.EmbeddingManager(embedding_dimension=None, batch_size=None, max_cache_size=1000, max_concurrent_requests=4)[source]

Bases: object

Manages embeddings for RAG objects.

The EmbeddingManager handles:

  • Generating embeddings via Ollama (local) or OpenAI (cloud) API

  • Caching embeddings for reuse with LRU eviction

  • Rate limiting and retry logic

  • Batch operations for efficiency

  • Error handling and recovery

  • Performance metrics collection

  • Concurrent operations

Default: Ollama with embeddinggemma for fully offline operation.

Parameters:
  • embedding_dimension (int | None)

  • batch_size (int | None)

  • max_cache_size (int)

  • max_concurrent_requests (int)

__init__(embedding_dimension=None, batch_size=None, max_cache_size=1000, max_concurrent_requests=4)[source]

Initialize the embedding manager.

Parameters:
  • embedding_dimension (int | None) – Size of embedding vectors (default: from LLMConfig)

  • batch_size (int | None) – Number of objects to embed in each batch (default: from LLMConfig)

  • max_cache_size (int) – Maximum number of embeddings to keep in cache (default: 1000)

  • max_concurrent_requests (int) – Maximum number of concurrent embedding requests (default: 4)

Raises:

ValueError – If embedding configuration is invalid

async aembed(obj)[source]

Asynchronously generate and attach embedding for a single object.

Parameters:

obj (TypeVar(E, bound= Embeddable)) – Object to embed

Return type:

TypeVar(E, bound= Embeddable)

Returns:

Object with embedding attached

Raises:

EmbeddingError – If embedding generation fails

async aembed_batch(objects)[source]

Asynchronously generate embeddings for multiple objects efficiently.

Parameters:

objects (Sequence[TypeVar(E, bound= Embeddable)]) – List of objects to embed

Return type:

list[TypeVar(E, bound= Embeddable)]

Returns:

List of objects with embeddings attached

Raises:

EmbeddingError – If any object’s embedding generation fails

property cache_size: int

Get current number of embeddings in cache.

async close()[source]

Close resources used by the embedding manager.

Return type:

None

debed(obj)[source]

Remove embedding from an object.

Parameters:

obj (TypeVar(E, bound= Embeddable)) – Object to remove embedding from

Return type:

TypeVar(E, bound= Embeddable)

Returns:

Object with embedding removed

debed_batch(objects)[source]

Remove embeddings from multiple objects.

Parameters:

objects (Sequence[TypeVar(E, bound= Embeddable)]) – List of objects to remove embeddings from

Return type:

list[TypeVar(E, bound= Embeddable)]

Returns:

List of objects with embeddings removed

embed(obj)[source]

Synchronously generate and attach embedding for a single object.

This is a convenience wrapper around aembed for synchronous code. For better performance in async contexts, use aembed directly.

Parameters:

obj (TypeVar(E, bound= Embeddable)) – Object to embed

Return type:

TypeVar(E, bound= Embeddable)

Returns:

Object with embedding attached

Raises:

EmbeddingError – If embedding generation fails

embed_batch(objects)[source]

Synchronously generate embeddings for multiple objects efficiently.

This is a convenience wrapper around aembed_batch for synchronous code. For better performance in async contexts, use aembed_batch directly.

Parameters:

objects (Sequence[TypeVar(E, bound= Embeddable)]) – List of objects to embed

Return type:

list[TypeVar(E, bound= Embeddable)]

Returns:

List of objects with embeddings attached

Raises:

EmbeddingError – If any object’s embedding generation fails

class babylon.rag.VectorStore(collection_name='documents', chroma_manager=None)[source]

Bases: object

Interface to ChromaDB for storing and retrieving document vectors.

__init__(collection_name='documents', chroma_manager=None)[source]

Initialize the vector store.

Parameters:
  • collection_name (str) – Name of the ChromaDB collection

  • chroma_manager (ChromaManager | None) – Optional ChromaManager instance (creates new if None)

add_chunks(chunks)[source]

Add document chunks to the vector store.

Parameters:

chunks (list[DocumentChunk]) – List of DocumentChunk objects with embeddings

Raises:

RagError – If chunks are missing embeddings or storage fails

Return type:

None

property collection: Any

Get or create the ChromaDB collection.

delete_chunks(chunk_ids)[source]

Delete chunks from the vector store.

Parameters:

chunk_ids (list[str]) – List of chunk IDs to delete

Raises:

RagError – If deletion fails

Return type:

None

get_collection_count()[source]

Get the number of chunks in the collection.

Return type:

int

query_similar(query_embedding, k=10, where=None, include=None)[source]

Query for similar chunks using embedding.

Parameters:
  • query_embedding (list[float]) – Query vector embedding

  • k (int) – Number of results to return

  • where (dict[str, Any] | None) – Optional metadata filters

  • include (list[str] | None) – Fields to include in results

Return type:

tuple[list[str], list[str], list[list[float]], list[dict[str, Any]], list[float]]

Returns:

Tuple of (ids, documents, embeddings, metadatas, distances)

Raises:

RagError – If query fails
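Callers usually zip the parallel lists returned by query_similar back together and filter by similarity. A sketch of that post-processing, assuming cosine distance so that similarity = 1 - distance (an assumption about the distance metric, not something this API guarantees):

```python
def to_results(ids: list[str], documents: list[str],
               distances: list[float], threshold: float = 0.0) -> list[dict]:
    """Pair parallel result lists and keep hits above a similarity threshold."""
    results = []
    for chunk_id, doc, dist in zip(ids, documents, distances):
        similarity = 1.0 - dist  # assumes cosine distance
        if similarity >= threshold:
            results.append({"id": chunk_id, "content": doc, "similarity": similarity})
    # highest similarity first
    return sorted(results, key=lambda r: r["similarity"], reverse=True)
```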

class babylon.rag.Retriever(vector_store, embedding_manager)[source]

Bases: object

High-level retrieval interface for RAG queries.

__init__(vector_store, embedding_manager)[source]

Initialize the retriever.

Parameters:
  • vector_store (VectorStore) – VectorStore instance for similarity search

  • embedding_manager (EmbeddingManager) – EmbeddingManager for query embedding

async aquery(query, k=10, similarity_threshold=0.0, metadata_filter=None)[source]

Asynchronously query for relevant document chunks.

Parameters:
  • query (str) – Query text to search for

  • k (int) – Number of results to return

  • similarity_threshold (float) – Minimum similarity score for results

  • metadata_filter (dict[str, Any] | None) – Optional filters for chunk metadata

Return type:

QueryResponse

Returns:

QueryResponse with results and timing information

Raises:

RagError – If query processing fails

query(query, k=10, similarity_threshold=0.0, metadata_filter=None)[source]

Synchronously query for relevant document chunks.

This is a convenience wrapper around aquery for synchronous code. For better performance in async contexts, use aquery directly.

Parameters:
  • query (str) – Query text to search for

  • k (int) – Number of results to return

  • similarity_threshold (float) – Minimum similarity score for results

  • metadata_filter (dict[str, Any] | None) – Optional filters for chunk metadata

Return type:

QueryResponse

Returns:

QueryResponse with results and timing information

Raises:

RagError – If query processing fails

class babylon.rag.QueryResponse(**data)[source]

Bases: BaseModel

Represents the complete response to a query.

get_combined_context(max_length=4000, separator='\n\n')[source]

Combine result chunks into a single context string.

Return type:

str

Parameters:
  • max_length (int)

  • separator (str)

get_top_k(k)[source]

Get the top k results by similarity score.

Return type:

list[QueryResult]

Parameters:

k (int)

model_config: ClassVar[ConfigDict] = {'validate_assignment': True}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict.

query: str
results: list[QueryResult]
total_results: int
processing_time_ms: float
embedding_time_ms: float
search_time_ms: float
metadata: dict[str, Any] | None
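
The length-capped joining performed by get_combined_context can be sketched as follows. `combine_context` is a hypothetical helper mirroring the documented parameters, operating on plain chunk strings:

```python
def combine_context(chunks: list[str], max_length: int = 4000,
                    separator: str = "\n\n") -> str:
    """Join chunk texts with a separator, stopping before max_length is exceeded."""
    parts: list[str] = []
    length = 0
    for chunk in chunks:
        # a separator is only added between chunks, not before the first one
        extra = len(chunk) + (len(separator) if parts else 0)
        if length + extra > max_length:
            break
        parts.append(chunk)
        length += extra
    return separator.join(parts)
```

Chunks are taken in order, so passing results already ranked by similarity keeps the most relevant content when the budget runs out.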
class babylon.rag.QueryResult(**data)[source]

Bases: BaseModel

Represents a single query result with similarity score.

convert_similarity_to_distance()[source]

Convert similarity score to distance if not provided.

Return type:

QueryResult

model_config: ClassVar[ConfigDict] = {'validate_assignment': True}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict.

chunk: DocumentChunk
similarity_score: float
distance: float
metadata: dict[str, Any] | None

Modules

chunker

Document chunking and preprocessing for the RAG system.

context_window

Context window management for the RAG system.

embeddings

Embedding management for the RAG system.

exceptions

Custom exceptions for the RAG system.

lifecycle

Object lifecycle management for the RAG system.

pre_embeddings

Pre-embedding pipeline for RAG document processing.

rag_pipeline

Main RAG pipeline service that orchestrates document ingestion and query processing.

retrieval

Query and retrieval interface for the RAG system.