babylon.rag.rag_pipeline

Main RAG pipeline service that orchestrates document ingestion and query processing.

Functions

quick_ingest_text(content, source_id[, ...])

Quickly ingest text content using default settings.

quick_query(query[, collection_name, top_k])

Quickly query the RAG system using default settings.

Classes

IngestionResult(**data)

Result of document ingestion process.

RagConfig(**data)

Configuration for RAG pipeline.

RagPipeline([config, chroma_manager, ...])

Main RAG pipeline that orchestrates ingestion and retrieval.

class babylon.rag.rag_pipeline.IngestionResult(**data)

Bases: BaseModel

Result of document ingestion process.

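Parameters:
  • success (bool)

  • chunks_processed (int)

  • chunks_stored (int)

  • processing_time_ms (float)

  • embedding_time_ms (float)

  • storage_time_ms (float)

  • errors (list[str])

  • source_files (list[str])
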
model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, which should be a dictionary conforming to pydantic.config.ConfigDict.

success: bool
chunks_processed: int
chunks_stored: int
processing_time_ms: float
embedding_time_ms: float
storage_time_ms: float
errors: list[str]
source_files: list[str]
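A minimal sketch of consuming an IngestionResult, relying only on the field names listed above. The helper `summarize_ingestion` is hypothetical and not part of babylon.rag; it reads any object with these attributes, so the stand-in below uses `types.SimpleNamespace` instead of a real result.

```python
from types import SimpleNamespace  # only needed for the stand-in below


def summarize_ingestion(result) -> str:
    """Return a one-line report for an IngestionResult-like object (hypothetical helper)."""
    status = "ok" if result.success else "FAILED"
    line = (
        f"[{status}] {result.chunks_stored}/{result.chunks_processed} chunks stored "
        f"from {len(result.source_files)} file(s) in {result.processing_time_ms:.1f} ms "
        f"(embedding {result.embedding_time_ms:.1f} ms, storage {result.storage_time_ms:.1f} ms)"
    )
    if result.errors:
        # Report the error count and only the first message to keep the line short
        line += f"; {len(result.errors)} error(s), first: {result.errors[0]}"
    return line


# Stand-in carrying the same fields as IngestionResult (illustration only)
example = SimpleNamespace(
    success=True, chunks_processed=12, chunks_stored=12,
    processing_time_ms=85.0, embedding_time_ms=60.5, storage_time_ms=20.0,
    errors=[], source_files=["notes.txt"],
)
print(summarize_ingestion(example))
```

Because model_config sets frozen=True, pydantic rejects attribute assignment on these results, so helpers like this one should only read them.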

class babylon.rag.rag_pipeline.RagConfig(**data)

Bases: BaseModel

Configuration for RAG pipeline.

Parameters:
  • chunk_size (int)

  • chunk_overlap (int)

  • min_chunk_length (int)

  • max_chunk_length (int)

  • embedding_batch_size (int)

  • max_concurrent_embeds (int)

  • default_top_k (int)

  • default_similarity_threshold (float)

  • collection_name (str)

  • use_persistent_storage (bool)

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, which should be a dictionary conforming to pydantic.config.ConfigDict.

chunk_size: int
chunk_overlap: int
min_chunk_length: int
max_chunk_length: int
embedding_batch_size: int
max_concurrent_embeds: int
default_top_k: int
default_similarity_threshold: float
collection_name: str
use_persistent_storage: bool
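The chunking fields are listed without their semantics. The sketch below shows one common interpretation, assuming character-based sliding windows: each window is chunk_size characters (capped at max_chunk_length), consecutive windows share chunk_overlap characters, and trimmed windows shorter than min_chunk_length are dropped. The function `chunk_text` is hypothetical; babylon's actual chunker may split on sentences or tokens instead.

```python
def chunk_text(
    text: str,
    chunk_size: int,
    chunk_overlap: int,
    min_chunk_length: int,
    max_chunk_length: int,
) -> list[str]:
    """Split text into overlapping character windows (hypothetical sketch)."""
    window = min(chunk_size, max_chunk_length)  # never exceed the hard cap
    if window <= 0 or not 0 <= chunk_overlap < window:
        raise ValueError("need 0 <= chunk_overlap < min(chunk_size, max_chunk_length)")
    step = window - chunk_overlap  # how far each window advances
    chunks: list[str] = []
    for start in range(0, len(text), step):
        piece = text[start:start + window].strip()
        if len(piece) >= min_chunk_length:  # drop fragments that are too short
            chunks.append(piece)
        if start + window >= len(text):  # this window already reached the end
            break
    return chunks


print(chunk_text("abcdefghij", chunk_size=4, chunk_overlap=2, min_chunk_length=1, max_chunk_length=100))
```

The overlap check matters: if chunk_overlap were not smaller than the window, the step would be zero or negative and the loop could never advance through the text.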

class babylon.rag.rag_pipeline.RagPipeline(config=None, chroma_manager=None, embedding_manager=None)

Bases: object

Main RAG pipeline that orchestrates ingestion and retrieval.

__init__(config=None, chroma_manager=None, embedding_manager=None)

Initialize the RAG pipeline.

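Parameters:
  • config – Optional pipeline configuration

  • chroma_manager – Optional ChromaDB manager

  • embedding_manager – Optional embedding manager

async aingest_text(content, source_id, metadata=None)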

Asynchronously ingest text content into the RAG system.

Parameters:
  • content (str) – Text content to ingest

  • source_id (str) – Unique identifier for the content source

  • metadata (dict[str, Any] | None) – Optional metadata to attach to chunks

Return type:

IngestionResult

Returns:

IngestionResult with processing statistics

Raises:

RagError – If ingestion fails
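RagConfig exposes embedding_batch_size and max_concurrent_embeds, and IngestionResult reports embedding_time_ms, which suggests that ingestion embeds chunks in batches. The sketch below shows one way those two settings could interact, assuming an async `embed_batch` callable that returns one vector per input text. Both `embed_in_batches` and `embed_batch` are hypothetical names, not part of babylon.rag.

```python
import asyncio
from collections.abc import Awaitable, Callable, Sequence

Vector = list[float]


async def embed_in_batches(
    chunks: Sequence[str],
    embed_batch: Callable[[list[str]], Awaitable[list[Vector]]],
    batch_size: int,
    max_concurrent: int,
) -> list[Vector]:
    """Embed chunks batch by batch, keeping at most max_concurrent batches in flight."""
    if batch_size < 1 or max_concurrent < 1:
        raise ValueError("batch_size and max_concurrent must be >= 1")
    batches = [list(chunks[i:i + batch_size]) for i in range(0, len(chunks), batch_size)]
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run(batch: list[str]) -> list[Vector]:
        async with semaphore:  # limits concurrent embedding requests
            return await embed_batch(batch)

    # gather returns per-batch results in batch order, so flattening keeps chunk order
    per_batch = await asyncio.gather(*(run(b) for b in batches))
    return [vector for vectors in per_batch for vector in vectors]
```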

ingest_text(content, source_id, metadata=None)

Synchronously ingest text content into the RAG system.

This is a convenience wrapper around aingest_text for synchronous code. For better performance in async contexts, use aingest_text directly.

Parameters:
  • content (str) – Text content to ingest

  • source_id (str) – Unique identifier for the content source

  • metadata (dict[str, Any] | None) – Optional metadata to attach to chunks

Return type:

IngestionResult

Returns:

IngestionResult with processing statistics
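ingest_text is described as a synchronous wrapper around aingest_text. A common way to build such a wrapper is asyncio.run, sketched here with a hypothetical `EchoPipeline` stand-in; this page does not say how babylon implements its wrapper. One relevant property of the pattern: asyncio.run raises RuntimeError when called from a thread whose event loop is already running, which is one reason async code should await aingest_text directly.

```python
from __future__ import annotations

import asyncio
from typing import Any


class EchoPipeline:
    """Hypothetical stand-in illustrating the sync-over-async wrapper pattern."""

    async def aingest_text(
        self, content: str, source_id: str, metadata: dict[str, Any] | None = None
    ) -> dict[str, Any]:
        await asyncio.sleep(0)  # placeholder for real async chunking, embedding, storage
        return {"source_id": source_id, "chars": len(content), "metadata": metadata or {}}

    def ingest_text(
        self, content: str, source_id: str, metadata: dict[str, Any] | None = None
    ) -> dict[str, Any]:
        # Create a fresh event loop, run the coroutine to completion, then close the loop
        return asyncio.run(self.aingest_text(content, source_id, metadata))


print(EchoPipeline().ingest_text("hello", "doc-1"))
```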

async aingest_file(file_path, encoding='utf-8')

Asynchronously ingest a text file into the RAG system.

Parameters:
  • file_path (str) – Path to the text file

  • encoding (str) – File encoding (default: utf-8)

Return type:

IngestionResult

Returns:

IngestionResult with processing statistics

ingest_file(file_path, encoding='utf-8')

Synchronously ingest a text file into the RAG system.

Parameters:
  • file_path (str) – Path to the text file

  • encoding (str) – File encoding (default: utf-8)

Return type:

IngestionResult

Returns:

IngestionResult with processing statistics

async aingest_files(file_paths, encoding='utf-8', max_concurrent=5)

Asynchronously ingest multiple files concurrently.

Parameters:
  • file_paths (list[str]) – List of file paths to ingest

  • encoding (str) – File encoding (default: utf-8)

  • max_concurrent (int) – Maximum number of concurrent ingestions

Return type:

list[IngestionResult]

Returns:

List of IngestionResult objects
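max_concurrent caps how many files are processed at once. A minimal sketch of that bounding with asyncio.Semaphore and asyncio.gather, assuming a per-file coroutine along the lines of aingest_file; `ingest_files_bounded` and `ingest_one` are hypothetical names, and this page does not state whether babylon uses a semaphore internally.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def ingest_files_bounded(
    file_paths: list[str],
    ingest_one: Callable[[str], Awaitable[T]],
    max_concurrent: int = 5,
) -> list[T]:
    """Run ingest_one over every path with at most max_concurrent running at once."""
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be >= 1")
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(path: str) -> T:
        async with semaphore:  # waits while max_concurrent files are already in progress
            return await ingest_one(path)

    # gather returns results in input order, so results[i] belongs to file_paths[i]
    return await asyncio.gather(*(guarded(p) for p in file_paths))
```

With gather's default return_exceptions=False, the first failing file raises out of the call; passing return_exceptions=True would collect exceptions in the result list instead. How babylon reports per-file failures is not documented here.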

ingest_files(file_paths, encoding='utf-8', max_concurrent=5)

Synchronously ingest multiple files.

Parameters:
  • file_paths (list[str]) – List of file paths to ingest

  • encoding (str) – File encoding (default: utf-8)

  • max_concurrent (int) – Maximum number of concurrent ingestions

Return type:

list[IngestionResult]

Returns:

List of IngestionResult objects

async aquery(query, top_k=None, similarity_threshold=None, metadata_filter=None)

Asynchronously query the RAG system for relevant content.

Parameters:
  • query (str) – Query text to search for

  • top_k (int | None) – Number of results to return (uses config default if None)

  • similarity_threshold (float | None) – Minimum similarity score (uses config default if None)

  • metadata_filter (dict[str, Any] | None) – Optional filters for chunk metadata

Return type:

QueryResponse

Returns:

QueryResponse with search results

Raises:

RagError – If query processing fails
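When top_k or similarity_threshold is None, aquery falls back to the config defaults (RagConfig.default_top_k and RagConfig.default_similarity_threshold). The sketch below illustrates that fallback plus threshold and metadata filtering over already-scored hits. The `Hit` type, the `select_hits` function, the exact-match metadata filter, scores being similarities (higher is better), and filtering before truncation are all assumptions, not babylon's documented behavior.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class Hit:
    """Hypothetical search hit: chunk text, similarity score, chunk metadata."""
    text: str
    score: float
    metadata: dict[str, Any] = field(default_factory=dict)


def select_hits(
    hits: list[Hit],
    top_k: int | None,
    similarity_threshold: float | None,
    metadata_filter: dict[str, Any] | None,
    default_top_k: int,
    default_similarity_threshold: float,
) -> list[Hit]:
    # Use `is None` so explicit values such as top_k=0 or a 0.0 threshold are respected
    k = default_top_k if top_k is None else top_k
    threshold = (
        default_similarity_threshold if similarity_threshold is None else similarity_threshold
    )
    wanted = metadata_filter or {}
    kept = [
        h for h in hits
        if h.score >= threshold
        and all(key in h.metadata and h.metadata[key] == value for key, value in wanted.items())
    ]
    kept.sort(key=lambda h: h.score, reverse=True)  # best matches first
    return kept[:k]
```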

query(query, top_k=None, similarity_threshold=None, metadata_filter=None)

Synchronously query the RAG system for relevant content.

Parameters:
  • query (str) – Query text to search for

  • top_k (int | None) – Number of results to return (uses config default if None)

  • similarity_threshold (float | None) – Minimum similarity score (uses config default if None)

  • metadata_filter (dict[str, Any] | None) – Optional filters for chunk metadata

Return type:

QueryResponse

Returns:

QueryResponse with search results

get_stats()

Get statistics about the RAG system.

Return type:

dict[str, Any]

Returns:

Dictionary with system statistics

clear_collection()

Clear all documents from the collection.

WARNING: This will delete all ingested documents!

Return type:

None

async aclose()

Asynchronously close the RAG pipeline and clean up resources.

Return type:

None

close()

Synchronously close the RAG pipeline and clean up resources.

Return type:

None

__enter__()

Context manager entry.

Return type:

Self

__exit__(exc_type, exc_val, exc_tb)

Context manager exit with cleanup.

Return type:

None

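Parameters:
  • exc_type – Exception class raised inside the with block, or None

  • exc_val – Exception instance raised inside the with block, or None

  • exc_tb – Traceback of that exception, or None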
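__enter__ returns the object itself and __exit__ performs cleanup, so a with block ensures cleanup runs even if the body raises. A minimal sketch of that protocol with a hypothetical `ClosingResource` stand-in, assuming __exit__ simply delegates to close(). Returning None (falsy) from __exit__, as the documented return type indicates, lets any exception propagate rather than being suppressed.

```python
from __future__ import annotations

from types import TracebackType


class ClosingResource:
    """Hypothetical stand-in for an object that must release resources on exit."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True  # a real pipeline would release clients or connections here

    def __enter__(self) -> ClosingResource:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.close()  # runs on normal exit and when the with block raises


with ClosingResource() as resource:
    print("open:", not resource.closed)
print("closed:", resource.closed)
```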

async babylon.rag.rag_pipeline.quick_ingest_text(content, source_id, collection_name='quick_rag')

Quickly ingest text content using default settings.

Parameters:
  • content (str) – Text content to ingest

  • source_id (str) – Unique identifier for the content

  • collection_name (str) – ChromaDB collection name

Return type:

IngestionResult

Returns:

IngestionResult with processing statistics

async babylon.rag.rag_pipeline.quick_query(query, collection_name='quick_rag', top_k=5)

Quickly query the RAG system using default settings.

Parameters:
  • query (str) – Query text to search for

  • collection_name (str) – ChromaDB collection name

  • top_k (int) – Number of results to return

Return type:

QueryResponse

Returns:

QueryResponse with search results