Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions src/parxy_cli/commands/parse.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Command line interface for Parxy document processing."""

import json
import tomllib
from datetime import timedelta
from pathlib import Path
from typing import Optional, List, Annotated
Expand All @@ -17,6 +19,75 @@
console = Console()


def _load_middleware_from_config(config_path: Path) -> List[str]:
"""Load middleware class paths from a config file.

Supports JSON, TOML, YAML and YML. The expected structure is either:
- A top-level list: ``["path.to.Middleware1", "path.to.Middleware2"]``
- An object with a ``middleware`` key: ``{"middleware": ["path.to.Middleware"]}``
"""
if not config_path.exists():
raise typer.BadParameter(f'Middleware config file not found: {config_path}')

suffix = config_path.suffix.lower()

if suffix == '.json':
raw_data = json.loads(config_path.read_text(encoding='utf-8'))
elif suffix == '.toml':
raw_data = tomllib.loads(config_path.read_text(encoding='utf-8'))
elif suffix in {'.yaml', '.yml'}:
try:
import yaml
except ImportError as exc:
raise typer.BadParameter(
'YAML config requires PyYAML. Install pyyaml or use JSON/TOML config.'
) from exc
raw_data = yaml.safe_load(config_path.read_text(encoding='utf-8'))
else:
raise typer.BadParameter(
'Unsupported middleware config format. Use .json, .toml, .yaml or .yml'
)

if isinstance(raw_data, list):
middleware_list = raw_data
elif isinstance(raw_data, dict):
middleware_list = raw_data.get('middleware', [])
if not isinstance(middleware_list, list):
raise typer.BadParameter(
'middleware_config: "middleware" key must be a list of class paths.'
)
else:
raise typer.BadParameter(
'Middleware config must be a list or an object with a "middleware" key.'
)

if not all(isinstance(item, str) for item in middleware_list):
raise typer.BadParameter('Middleware class paths must be strings.')

return middleware_list


def configure_middleware(
middleware: Optional[List[str]],
config_path: Optional[Path],
) -> None:
"""Configure global middleware from inline class paths and/or a config file."""
paths: List[str] = list(middleware or [])

if config_path is not None:
paths.extend(_load_middleware_from_config(config_path))

if not paths:
return

Parxy.clear_middleware()
Parxy.with_middleware(paths)

console.info(
f'Using {len(paths)} middleware class{"es" if len(paths) != 1 else ""}.'
)


def collect_files_with_depth(
directory: Path, pattern: str, max_depth: int, current_depth: int = 0
) -> List[Path]:
Expand Down Expand Up @@ -261,6 +332,22 @@ def parse(
min=1,
),
] = None,
middleware: Annotated[
Optional[List[str]],
typer.Option(
'--middleware',
'-p',
help='Middleware class path(s) to apply. Can be specified multiple times (e.g. --middleware my.pkg.MyMiddleware).',
),
] = None,
middleware_config: Annotated[
Optional[str],
typer.Option(
'--middleware-config',
envvar='PARXY_MIDDLEWARE_CONFIG',
help='Path to a .json/.toml/.yaml file with a list of middleware class paths to apply. Appended after inline middleware with --middleware',
),
] = None,
):
"""
Parse documents using one or more drivers.
Expand Down Expand Up @@ -312,6 +399,11 @@ def parse(
# Calculate total tasks
total_tasks = len(files) * len(drivers)

configure_middleware(
middleware=middleware,
config_path=Path(middleware_config) if middleware_config else None,
)

error_count = 0

# Show info
Expand Down
155 changes: 146 additions & 9 deletions src/parxy_core/drivers/abstract_driver.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import base64
import hashlib
import io
import time
from abc import ABC, abstractmethod
from logging import Logger
from typing import Dict, Any, Self, Tuple, Optional
from typing import Dict, Any, Self, Tuple, Optional, List, Union

import requests
import validators

from parxy_core.models import Document
from parxy_core.models import Document, ParsingRequest
from parxy_core.exceptions import (
FileNotFoundException,
ParsingException,
Expand All @@ -21,6 +20,7 @@
from parxy_core.models.config import BaseConfig
from parxy_core.logging import create_null_logger
from parxy_core.tracing import tracer
from parxy_core.middleware import Middleware


class Driver(ABC):
Expand Down Expand Up @@ -50,6 +50,9 @@ class Driver(ABC):

_logger: Logger

_middleware: List[Middleware]
"""Driver-specific middleware list"""

def __new__(cls, config: Dict[str, Any] = [], logger: Logger = None):
instance = super().__new__(cls)
instance.__init__(config=config, logger=logger)
Expand All @@ -62,6 +65,7 @@ def __init__(self, config: Dict[str, Any] = None, logger: Logger = None):
logger = create_null_logger(name=f'parxy.{self.__class__.__name__}')

self._logger = logger
self._middleware = [] # Initialize empty middleware list
self._initialize_driver()

def parse(
Expand Down Expand Up @@ -110,25 +114,30 @@ def parse(
driver=self.__class__.__name__,
level=level,
**kwargs,
) as span:
):
self._validate_level(level)
middleware_list = self._resolve_middleware()

try:
# Start timing
start_time = time.perf_counter()

document = self._handle(file=file, level=level, **kwargs)
if middleware_list:
document = self._parse_with_middleware(
file=file,
level=level,
middleware_list=middleware_list,
**kwargs,
)
else:
document = self._handle(file=file, level=level, **kwargs)

# Calculate elapsed time in milliseconds
end_time = time.perf_counter()
elapsed_ms = (end_time - start_time) * 1000

# Store elapsed time in parsing metadata
if document.parsing_metadata is None:
document.parsing_metadata = {}
document.parsing_metadata['driver_elapsed_time'] = elapsed_ms

# Increment the documents processed counter
tracer.count(
'documents.processed',
description='Total documents processed by each driver',
Expand Down Expand Up @@ -170,10 +179,138 @@ def parse(
tracer.error('Parsing failed', exception=str(parxy_exc))
raise parxy_exc from ex

def _resolve_middleware(self) -> List[Middleware]:
"""Resolve middleware for the current parse call.

External middleware is applied first, then driver-specific middleware.
"""
from parxy_core.drivers.factory import DriverFactory

combined = DriverFactory.build().get_middleware()
combined.extend(self._middleware)
return combined

def _parse_with_middleware(
self,
file: str | io.BytesIO | bytes,
level: str,
middleware_list: List[Middleware],
**kwargs,
) -> Document:
"""Parse file with middleware chain.

Parameters
----------
file : str | io.BytesIO | bytes
Path, URL or stream of the file to parse.
level : str
Desired extraction level.
middleware_list : List[Middleware]
List of middleware to apply.
**kwargs : dict
Additional keyword arguments.

Returns
-------
Document
The parsed document
"""
# Create parsing request
request = ParsingRequest(
driver=self.__class__.__name__,
file=file,
level=level,
config=kwargs,
)

with tracer.span('middleware-chain', count=len(middleware_list)):

def call_handle(index: int, req: ParsingRequest) -> Document:
if index >= len(middleware_list):
return self._handle(file=req.file, level=req.level, **req.config)

current_middleware = middleware_list[index]
with tracer.span(
'middleware.handle',
middleware=current_middleware.__class__.__name__,
index=index,
):
return current_middleware.handle(
req, lambda next_req: call_handle(index + 1, next_req)
)

def call_terminate(index: int, doc: Document) -> Document:
if index < 0:
return doc

current_middleware = middleware_list[index]
with tracer.span(
'middleware.terminate',
middleware=current_middleware.__class__.__name__,
index=index,
):
return current_middleware.terminate(
doc, lambda next_doc: call_terminate(index - 1, next_doc)
)

document = call_handle(0, request)
document = call_terminate(len(middleware_list) - 1, document)

return document

def _initialize_driver(self) -> Self:
"""Initialize driver internal logic. It is called automatically during class initialization"""
return self

def with_middleware(self, middleware: Union[Middleware, List[Middleware]]) -> Self:
"""Add middleware to this driver instance.

Note: Drivers are singletons, so middleware added to a driver instance
persists for all subsequent uses of that driver.

Parameters
----------
middleware : Union[Middleware, List[Middleware]]
A middleware instance or list of middleware instances to add

Returns
-------
Self
Returns self for chaining

Example
-------
>>> driver = Parxy.driver('pymupdf')
>>> driver.with_middleware(LoggingMiddleware())
>>> doc = driver.parse('document.pdf')
"""
if isinstance(middleware, list):
self._middleware.extend(middleware)
else:
self._middleware.append(middleware)
return self

def clear_middleware(self) -> Self:
"""Clear all middleware from this driver instance.

Returns
-------
Self
Returns self for chaining
"""
self._middleware.clear()
return self

def get_middleware(self) -> List[Middleware]:
"""Get the list of middleware for this driver.

Returns
-------
List[Middleware]
Copy of the current middleware list
"""
return list(self._middleware)

@abstractmethod
def _handle(
self,
Expand Down
Loading