Source code for pykomodo.multi_dirs_chunker

import os
import fnmatch
import re
import concurrent.futures
from typing import Optional, List, Tuple
from pykomodo.tree_generator import TreeGenerator
from pykomodo.pdf_processor import PDFProcessor
import ast

# Default ignore patterns. ParallelChunker.__init__ copies this list into
# ``ignore_patterns`` and then appends any user-supplied patterns.
# Patterns containing ``**`` are matched segment-by-segment against the path;
# patterns without a slash are additionally matched against the file's basename.
BUILTIN_IGNORES = [
    # Version-control metadata
    "**/.git/**",
    "**/.svn/**",
    "**/.hg/**",
    # Editor / IDE settings
    "**/.idea/**",
    "**/.vscode/**",
    # Python bytecode and tool caches
    "**/__pycache__/**",
    "**/*.pyc",
    "**/*.pyo",
    "**/.pytest_cache/**",
    "**/.coverage",
    "**/.tox/**",
    "**/.eggs/**",
    "**/Cython/Debugger/**",    
    # Packaging / build output
    "**/*.egg-info/**",
    "**/build/**",
    "**/dist/**",
    # Virtual environments and installed packages
    "**/venv/**",
    "**/.venv/**",
    "**/env/**",
    "**/ENV/**",
    "**/virtualenv/**",
    "**/site-packages/**",
    "**/pip/**",
    # OS-generated files
    "**/.DS_Store",
    "**/Thumbs.db",
    # JavaScript dependencies
    "**/node_modules/**",
    # Environment files
    "**/*.env",
    "**/.env", 
    # Images, audio and video
    "**/*.png",
    "**/*.jpg",
    "**/*.jpeg",
    "**/*.gif",
    "**/*.webp",
    "**/*.bmp",
    "**/*.mp3",
    "**/*.mp4",
    # Temporary directories
    "**/tmp/**",
    "**/temp/**",
    "**/var/folders/**",
    # Test fixtures / data directories
    "**/test/data/**",
    "**/tests/data/**",
    "**/test_data/**",
    "**/tests_data/**",
    # Slash-free patterns (also compared against the basename of each file)
    "__pycache__",
    "*.pyc",
    "*.pyo",
    "target",
    "venv"
]

class PriorityRule:
    """Associate a glob pattern with the priority score it grants."""

    def __init__(self, pattern, score):
        """Store ``pattern`` and ``score`` unchanged on the instance."""
        self.pattern, self.score = pattern, score
class ChunkWriterInterface:
    """Thin adapter that forwards chunk writes to a chunker's ``_write_chunk``."""

    def __init__(self, chunker):
        """Remember the chunker whose ``_write_chunk`` will do the writing."""
        self.chunker = chunker

    def write_chunk(self, content_bytes, chunk_num):
        """Delegate to ``self.chunker._write_chunk`` with the same arguments."""
        writer = self.chunker._write_chunk
        writer(content_bytes, chunk_num)
class ParallelChunker:
    """Collect text files from directories and write them out as chunk files.

    Exactly one of ``equal_chunks`` (split everything into N balanced chunks)
    or ``max_chunk_size`` (start a new chunk once a size budget is exceeded)
    must be given. Files are filtered by ignore/unignore glob patterns, loaded
    in a thread pool, ordered by priority, and written to ``output_dir`` as
    ``chunk-<n>.txt``.

    The instance can be used as a context manager; ``close`` is currently a
    no-op.
    """

    # Directory names pruned while walking, before any pattern matching.
    DIR_IGNORE_NAMES = [
        "venv",
        ".venv",
        "env",
        "node_modules",
        ".git",
        ".svn",
        ".hg",
        "__pycache__",
        ".pytest_cache",
        ".tox",
        ".eggs",
        "build",
        "dist",
    ]

    # A quoted string containing a run of 20+ key-like characters.
    _API_KEY_PATTERN = re.compile(r'[\'"].*[a-zA-Z0-9_-]{20,}.*[\'"]')

    def __init__(
        self,
        equal_chunks: Optional[int] = None,
        max_chunk_size: Optional[int] = None,
        output_dir: str = "chunks",
        user_ignore: Optional[List[str]] = None,
        user_unignore: Optional[List[str]] = None,
        binary_extensions: Optional[List[str]] = None,
        priority_rules: Optional[List[Tuple[str, int]]] = None,
        num_threads: int = 4,
        dry_run: bool = False,
        semantic_chunking: bool = False,
        file_type: Optional[str] = None,
        verbose: bool = False
    ) -> None:
        """Configure the chunker.

        Args:
            equal_chunks: Number of balanced chunks to produce.
            max_chunk_size: Size budget per chunk (mutually exclusive with
                ``equal_chunks``).
            output_dir: Directory that receives the ``chunk-<n>.txt`` files.
            user_ignore: Extra ignore patterns appended to ``BUILTIN_IGNORES``.
            user_unignore: Patterns that rescue a file matched by an ignore
                pattern.
            binary_extensions: Extensions (without dot) always treated as
                binary; defaults to ``exe``, ``dll``, ``so``.
            priority_rules: ``(pattern, score)`` tuples or ``PriorityRule``
                instances.
            num_threads: Worker threads used to read files.
            dry_run: Only list the files that would be processed.
            semantic_chunking: Split Python files along top-level
                functions/classes.
            file_type: If set, only files with this extension are collected.
            verbose: Stored on the instance; not read by this class.

        Raises:
            ValueError: If both or neither of ``equal_chunks`` and
                ``max_chunk_size`` are given.
        """
        if equal_chunks is not None and max_chunk_size is not None:
            raise ValueError("Cannot specify both equal_chunks and max_chunk_size")
        if equal_chunks is None and max_chunk_size is None:
            raise ValueError("Must specify either equal_chunks or max_chunk_size")

        # Copy so per-instance changes never leak into the class-level list.
        self.dir_ignore_names = list(self.DIR_IGNORE_NAMES)
        self.equal_chunks = equal_chunks
        self.max_chunk_size = max_chunk_size
        self.output_dir = output_dir
        self.num_threads = num_threads
        self.dry_run = dry_run
        self.semantic_chunking = semantic_chunking
        self.file_type = file_type.lower() if file_type else None
        self.verbose = verbose

        if user_ignore is None:
            user_ignore = []
        if user_unignore is None:
            user_unignore = []

        self.ignore_patterns = BUILTIN_IGNORES[:]
        self.ignore_patterns.extend(user_ignore)
        self.unignore_patterns = list(user_unignore)

        # Unless the caller explicitly un-ignores a virtualenv/site-packages
        # location, every ``*.py`` file is un-ignored by default.
        found_venv = any(
            "site-packages" in pattern or "venv" in pattern
            for pattern in user_unignore
        )
        if not found_venv:
            self.unignore_patterns.append("*.py")

        if binary_extensions is None:
            binary_extensions = ["exe", "dll", "so"]
        self.binary_exts = {ext.lower() for ext in binary_extensions}

        self.priority_rules = []
        if priority_rules:
            for rule_data in priority_rules:
                if isinstance(rule_data, PriorityRule):
                    self.priority_rules.append(rule_data)
                else:
                    pat, score = rule_data
                    self.priority_rules.append(PriorityRule(pat, score))

        # Entries are (path, content_bytes, priority) tuples.
        self.loaded_files = []
        self.current_walk_root = None
        self.tree_generator = TreeGenerator()

        pdf_chunk_size = max_chunk_size if max_chunk_size else 1000
        self.pdf_processor = PDFProcessor(pdf_chunk_size)

    def _get_text_content(self, path, content_bytes):
        """Return the text of a file: PDF extraction, or decoded + key-filtered text."""
        if path.endswith(".pdf"):
            return self.pdf_processor.extract_text_from_pdf(path)
        text = content_bytes.decode("utf-8", errors="replace")
        return self._filter_api_keys(text)

    def is_absolute_pattern(self, pattern):
        """Return True if ``pattern`` starts with ``/`` or a ``X:\\`` drive prefix."""
        if pattern.startswith("/"):
            return True
        if re.match(r"^[a-zA-Z]:\\", pattern):
            return True
        return False

    def _contains_api_key(self, line: str) -> bool:
        """Return True if ``line`` has a quoted string with a 20+ char key-like run."""
        return bool(self._API_KEY_PATTERN.search(line))

    def _filter_api_keys(self, text: str) -> str:
        """Replace every line that looks like it holds an API key with a marker.

        Lines are re-joined with ``\\n``; a trailing newline is not preserved.
        """
        return "\n".join(
            "[API_KEY_REDACTED]" if self._contains_api_key(line) else line
            for line in text.splitlines()
        )

    def _match_segments(self, path_segs, pattern_segs, pi=0, pj=0):
        """Recursively match path segments against pattern segments.

        ``**`` matches zero or more whole segments; every other pattern
        segment is matched against one path segment with ``fnmatch``.
        """
        if pj == len(pattern_segs):
            return pi == len(path_segs)
        if pi == len(path_segs):
            # Path exhausted: only trailing ``**`` segments may remain.
            return all(seg == "**" for seg in pattern_segs[pj:])
        seg_pat = pattern_segs[pj]
        if seg_pat == "**":
            # Either ``**`` consumes nothing, or it swallows one more segment.
            if self._match_segments(path_segs, pattern_segs, pi, pj + 1):
                return True
            return self._match_segments(path_segs, pattern_segs, pi + 1, pj)
        if fnmatch.fnmatch(path_segs[pi], seg_pat):
            return self._match_segments(path_segs, pattern_segs, pi + 1, pj + 1)
        return False

    def _double_star_fnmatch(self, path, pattern):
        """Match ``path`` against a ``**``-style glob, treating ``\\`` as ``/``."""
        path = path.replace("\\", "/")
        pattern = pattern.replace("\\", "/")
        return self._match_segments(path.split("/"), pattern.split("/"))

    def _matches_pattern(self, abs_path, rel_path, pattern):
        """Return True if the file matches ``pattern``.

        Absolute patterns are compared with ``abs_path``, others with
        ``rel_path``. Slash-free relative patterns are also tried against the
        file's basename.
        """
        is_absolute = self.is_absolute_pattern(pattern)
        target = abs_path if is_absolute else rel_path

        if "**" in pattern:
            if self._double_star_fnmatch(target, pattern):
                return True
        elif fnmatch.fnmatch(target, pattern):
            return True

        if not is_absolute and "/" not in pattern:
            if fnmatch.fnmatch(os.path.basename(abs_path), pattern):
                return True
        return False

    def _read_ignore_file(self, directory):
        """Append patterns from ``.pykomodo-ignore`` and ``.gitignore`` in ``directory``.

        ``.gitignore`` entries without ``**`` are widened: a ``**/`` prefix is
        added unless the entry starts with ``/``, and a trailing ``/`` becomes
        ``/**``. Patterns already present are not added again, so repeated
        calls do not grow the list.
        """
        for filename in [".pykomodo-ignore", ".gitignore"]:
            ignore_file_path = os.path.join(directory, filename)
            if not os.path.exists(ignore_file_path):
                continue
            try:
                with open(ignore_file_path, "r", encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if not line or line.startswith("#"):
                            continue
                        if filename == ".gitignore" and "**" not in line:
                            if not line.startswith("/"):
                                line = f"**/{line}"
                            if line.endswith("/"):
                                line = f"{line}**"
                        if line not in self.ignore_patterns:
                            self.ignore_patterns.append(line)
            except (OSError, UnicodeError):
                print(f"Error reading {filename}")

    def should_ignore_file(self, path):
        """Return True if ``path`` matches an ignore pattern and no unignore pattern.

        Only the first matching ignore pattern is considered: if any unignore
        pattern also matches, the file is kept.
        """
        abs_path = os.path.abspath(path)
        root = self.current_walk_root or os.path.dirname(abs_path)
        rel_path = os.path.relpath(abs_path, start=root).replace("\\", "/")

        for pat in self.ignore_patterns:
            if self._matches_pattern(abs_path, rel_path, pat):
                rescued = any(
                    self._matches_pattern(abs_path, rel_path, unignore_pat)
                    for unignore_pat in self.unignore_patterns
                )
                return not rescued
        return False

    def is_binary_file(self, path):
        """Guess whether ``path`` is binary.

        ``py`` and ``pdf`` are never binary; configured extensions always are.
        Otherwise the first 8192 bytes are scanned for a NUL byte. Unreadable
        files are reported as binary.
        """
        ext = path.split(".")[-1].lower()
        if ext in {"py", "pdf"}:
            return False
        if ext in self.binary_exts:
            return True
        try:
            with open(path, "rb") as f:
                chunk = f.read(8192)
        except OSError:
            return True
        return b"\0" in chunk

    def _is_inside_output_dir(self, abs_path, abs_output_dir):
        """Return True if ``abs_path`` is the output directory or lies beneath it."""
        try:
            # commonpath compares whole path components, unlike commonprefix,
            # so "chunks_old" is not mistaken for being inside "chunks".
            return os.path.commonpath([abs_output_dir, abs_path]) == abs_output_dir
        except ValueError:
            # Raised e.g. for paths on different drives.
            return False

    def _collect_paths(self, dir_list):
        """Walk every directory in ``dir_list`` and return the files to process.

        Skips pruned directory names, files not matching ``file_type``, files
        inside ``output_dir`` and files rejected by ``should_ignore_file``.
        ``current_walk_root`` is set to each directory as it is walked.
        """
        collected = []
        abs_output_dir = os.path.abspath(self.output_dir)
        for directory in dir_list:
            self.current_walk_root = os.path.abspath(directory)
            for root, dirs, files in os.walk(directory):
                # In-place assignment so os.walk does not descend into them.
                dirs[:] = [d for d in dirs if d not in self.dir_ignore_names]
                for filename in files:
                    full_path = os.path.join(root, filename)
                    if self.file_type:
                        if not filename.lower().endswith(f".{self.file_type}"):
                            continue
                    if self._is_inside_output_dir(
                        os.path.abspath(full_path), abs_output_dir
                    ):
                        continue
                    if self.should_ignore_file(full_path):
                        continue
                    collected.append(full_path)
        return collected

    def _load_file_data(self, path):
        """Read ``path`` and return ``(path, content_bytes, priority)``.

        Returns ``(path, None, 0)`` if the file cannot be read.
        """
        try:
            with open(path, "rb") as f:
                content = f.read()
        except OSError:
            return path, None, 0
        return path, content, self.calculate_priority(path)

    def calculate_priority(self, path):
        """Return the highest score among rules matching the basename (at least 0)."""
        highest = 0
        basename = os.path.basename(path)
        for rule in self.priority_rules:
            if fnmatch.fnmatch(basename, rule.pattern):
                highest = max(highest, rule.score)
        return highest

    def process_directories(self, dirs: List[str]) -> None:
        """Collect, load, sort and chunk all eligible files under ``dirs``.

        In dry-run mode the candidate files are only printed.
        """
        if dirs:
            self.current_walk_root = os.path.abspath(dirs[0])
        self.tree_generator.reset()
        for directory in dirs:
            self._read_ignore_file(directory)

        all_paths = self._collect_paths(dirs)
        self.loaded_files.clear()
        if self.dry_run:
            self._handle_dry_run(all_paths)
            return

        self._load_files_parallel(all_paths)
        # Highest priority first, ties broken by path.
        self.loaded_files.sort(key=lambda x: (-x[2], x[0]))
        self._process_chunks()

    def _handle_dry_run(self, paths):
        """Print the files that would be processed, highest priority first."""
        print("[DRY-RUN] The following files would be processed (in priority order):")
        files = [(path, self.calculate_priority(path)) for path in paths]
        files.sort(key=lambda x: -x[1])
        for path, priority in files:
            print(f" - {path} (priority={priority})")

    def _load_files_parallel(self, all_paths):
        """Read all files in a thread pool; keep non-empty, non-binary ones."""
        # The with-block guarantees the pool is shut down even if a task raises.
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.num_threads
        ) as executor:
            futures = [
                executor.submit(self._load_file_data, path) for path in all_paths
            ]
            for future in concurrent.futures.as_completed(futures):
                path, content, priority = future.result()
                if content and not self.is_binary_file(path):
                    self.loaded_files.append((path, content, priority))

    def process_file(
        self,
        file_path: str,
        custom_chunk_size: Optional[int] = None,
        force_process: bool = False
    ) -> None:
        """Chunk a single file.

        Args:
            file_path: File to process.
            custom_chunk_size: Temporary ``max_chunk_size`` override; ignored
                when the chunker is in ``equal_chunks`` mode.
            force_process: Process the file even if it is ignored or binary.

        Raises:
            ValueError: If ``file_path`` is not an existing file.
        """
        if not os.path.isfile(file_path):
            raise ValueError(f"File not found: {file_path}")

        if self.should_ignore_file(file_path) and not force_process and not self.dry_run:
            print(f"Skipping ignored file: {file_path}")
            return

        if self.dry_run:
            priority = self.calculate_priority(file_path)
            print(f"[DRY-RUN] Would process file: {file_path} (priority={priority})")
            return

        if self.is_binary_file(file_path) and not file_path.endswith(".pdf") and not force_process:
            print(f"Skipping binary file: {file_path}")
            return

        path, content, priority = self._load_file_data(file_path)
        if content is None:
            print(f"Error loading file: {file_path}")
            return

        self.loaded_files = [(path, content, priority)]

        # Track the override with a flag so the previous value is always
        # restored, whatever it was.
        override = custom_chunk_size is not None and not self.equal_chunks
        original_max_chunk_size = self.max_chunk_size
        if override:
            self.max_chunk_size = custom_chunk_size
        try:
            self._process_chunks()
        finally:
            if override:
                self.max_chunk_size = original_max_chunk_size

    def process_directory(self, directory):
        """Process a single directory; shorthand for ``process_directories``."""
        self.process_directories([directory])

    def _split_tokens(self, content_bytes):
        """Decode ``content_bytes`` as UTF-8 and split on whitespace."""
        return content_bytes.decode("utf-8", errors="replace").split()

    def _write_chunk(self, content_bytes, chunk_num):
        """Write one chunk to ``<output_dir>/chunk-<chunk_num>.txt``.

        Chunk 0 is prefixed with the tree header when a walk root is known.
        On failure an error is printed and the raw payload is written without
        the header as a best-effort fallback.
        """
        os.makedirs(self.output_dir, exist_ok=True)
        chunk_path = os.path.join(self.output_dir, f"chunk-{chunk_num}.txt")
        try:
            tree_header = ""
            if chunk_num == 0 and self.current_walk_root:
                tree_header = self.tree_generator.prepare_tree_header(
                    self.current_walk_root
                )
            if isinstance(content_bytes, bytes):
                chunk_content = content_bytes.decode("utf-8", errors="replace")
            else:
                chunk_content = str(content_bytes)
            with open(chunk_path, "w", encoding="utf-8") as f:
                f.write(tree_header + chunk_content)
        except Exception:
            print(f"Error writing chunk {chunk_num}")
            try:
                if isinstance(content_bytes, bytes):
                    raw = content_bytes
                else:
                    raw = str(content_bytes).encode("utf-8", errors="replace")
                with open(chunk_path, "wb") as f:
                    f.write(raw)
            except OSError as exc:
                print(f"Error writing fallback for chunk {chunk_num}: {exc}")

    def pdf_chunking(self, path, idx):
        """Delegate PDF chunking to the PDF processor and return its result.

        The result is used by ``_chunk_by_size`` as the next chunk index.
        """
        chunk_writer = ChunkWriterInterface(self)
        return self.pdf_processor.process_pdf_for_chunking(path, idx, chunk_writer)

    def _process_chunks(self):
        """Dispatch to the semantic, equal-parts or size-based strategy."""
        if not self.loaded_files:
            return
        if self.semantic_chunking:
            self._chunk_by_semantic()
        elif self.equal_chunks:
            self._chunk_by_equal_parts()
        else:
            self._chunk_by_size()

    def _extract_pdf_paragraphs(self, path):
        """Return the paragraphs of a PDF via the PDF processor."""
        return self.pdf_processor.extract_pdf_paragraphs(path)

    def _chunk_by_equal_parts(self) -> None:
        """Distribute text blocks over ``equal_chunks`` chunks of similar word count.

        Blocks are sorted largest-first and each is placed into the currently
        smallest chunk. Empty chunks are not written.
        """
        text_blocks = []
        for path, content_bytes, _ in self.loaded_files:
            if path.endswith(".pdf"):
                for para in self._extract_pdf_paragraphs(path):
                    s = len(para.split())
                    if s > 0:
                        text_blocks.append((path, para, s))
            else:
                text = self._get_text_content(path, content_bytes)
                if text:
                    text_blocks.append((path, text, len(text.split())))

        if not text_blocks:
            return

        n_chunks = self.equal_chunks
        text_blocks.sort(key=lambda x: -x[2])
        chunks = [[] for _ in range(n_chunks)]
        chunk_sizes = [0] * n_chunks
        for block in text_blocks:
            # min() returns the first smallest index, i.e. ties go to the
            # lowest-numbered chunk.
            min_idx = min(range(n_chunks), key=chunk_sizes.__getitem__)
            chunks[min_idx].append(block)
            chunk_sizes[min_idx] += block[2]

        for i, chunk in enumerate(chunks):
            if chunk:
                self._write_equal_chunk([(path, text) for path, text, _ in chunk], i)

    def _write_equal_chunk(self, chunk_data, chunk_num):
        """Write one equal-parts chunk made of ``(path, text)`` pairs."""
        parts = []
        if chunk_num == 0 and self.current_walk_root:
            parts.append(
                self.tree_generator.prepare_tree_header(self.current_walk_root)
            )
        parts.append(
            "=" * 80 + "\n"
            + f"CHUNK {chunk_num + 1} OF {self.equal_chunks}\n"
            + "=" * 80 + "\n\n"
        )
        for path, text in chunk_data:
            parts.append(
                "=" * 40 + "\n" + f"File: {path}\n" + "=" * 40 + "\n" + text + "\n"
            )

        os.makedirs(self.output_dir, exist_ok=True)
        chunk_path = os.path.join(self.output_dir, f"chunk-{chunk_num}.txt")
        with open(chunk_path, "w", encoding="utf-8") as f:
            f.write("".join(parts))

    def _build_chunk_header(self, num, file_path):
        """Return the header lines for chunk number ``num`` of ``file_path``."""
        return [
            "=" * 80,
            f"CHUNK {num}",
            "=" * 80,
            "",
            "=" * 40,
            f"File: {file_path}",
            "=" * 40,
            "",
        ]

    def _write_file_chunk(self, path, lines, chunk_num):
        """Write ``lines`` of ``path`` as 1-based chunk ``chunk_num``."""
        header = self._build_chunk_header(chunk_num, path)
        chunk_data = "\n".join(header + lines) + "\n"
        # Chunk files are numbered from 0, headers from 1.
        self._write_chunk(chunk_data.encode("utf-8"), chunk_num - 1)

    def _chunk_by_size(self):
        """Split each file into chunks of at most ``max_chunk_size`` words.

        A new chunk starts when adding a line would exceed the budget; blank
        lines never trigger a split. PDFs are handed to ``pdf_chunking``.
        """
        chunk_num = 1
        for path, content_bytes, _ in self.loaded_files:
            text = self._get_text_content(path, content_bytes)
            if not text:
                header = self._build_chunk_header(chunk_num, path)
                data = "\n".join(header + ["[Empty File]"]) + "\n"
                self._write_chunk(data.encode("utf-8"), chunk_num - 1)
                chunk_num += 1
                continue

            if path.endswith(".pdf"):
                next_idx = self.pdf_chunking(path, chunk_num - 1)
                chunk_num = next_idx + 1
                continue

            current_lines = []
            word_count = 0
            for line in text.splitlines():
                if not line.strip():
                    current_lines.append(line)
                    continue
                words = len(line.split())
                if word_count + words > self.max_chunk_size and current_lines:
                    self._write_file_chunk(path, current_lines, chunk_num)
                    chunk_num += 1
                    current_lines = []
                    word_count = 0
                current_lines.append(line)
                word_count += words

            if current_lines:
                self._write_file_chunk(path, current_lines, chunk_num)
                chunk_num += 1

    def _chunk_by_semantic(self):
        """Chunk Python files along top-level definitions, other files by size."""
        chunk_index = 0
        # loaded_files holds (path, content, priority); priority is unused here.
        for path, content_bytes, _ in self.loaded_files:
            text = self._get_text_content(path, content_bytes)
            if not text and not path.endswith(".pdf"):
                continue
            if path.endswith(".py"):
                chunk_index = self._chunk_python_file_ast(path, text, chunk_index)
            else:
                chunk_index = self._chunk_nonpython_file_by_size(path, text, chunk_index)

    def _chunk_nonpython_file_by_size(self, path, text, chunk_index):
        """Write ``text`` in chunks of at most ``max_chunk_size`` words.

        Returns the next free chunk index. Empty or missing text yields a
        single ``[Empty File]`` chunk.
        """
        source_lines = (text or "").splitlines()
        if not source_lines:
            t = (
                "=" * 80 + "\n"
                + f"CHUNK {chunk_index + 1}\n"
                + "=" * 80 + "\n\n"
                + "=" * 40 + "\n"
                + f"File: {path}\n"
                + "=" * 40 + "\n"
                + "[Empty File]\n"
            )
            self._write_chunk(t.encode("utf-8"), chunk_index)
            return chunk_index + 1

        # Keep the accumulator separate from the input lines being iterated.
        current_lines = []
        current_size = 0
        idx = chunk_index
        for line in source_lines:
            line_size = len(line.split())
            if (
                self.max_chunk_size
                and (current_size + line_size) > self.max_chunk_size
                and current_lines
            ):
                chunk_data = self._format_chunk_content(path, current_lines, idx)
                self._write_chunk(chunk_data.encode("utf-8"), idx)
                idx += 1
                current_lines = []
                current_size = 0
            current_lines.append(line)
            current_size += line_size

        if current_lines:
            chunk_data = self._format_chunk_content(path, current_lines, idx)
            self._write_chunk(chunk_data.encode("utf-8"), idx)
            idx += 1
        return idx

    def _format_chunk_content(self, path, lines, idx):
        """Return header plus ``lines`` for 0-based chunk ``idx`` as one string."""
        h = [
            "=" * 80,
            f"CHUNK {idx + 1}",
            "=" * 80,
            "",
            "=" * 40,
            f"File: {path}",
            "=" * 40,
            "",
        ]
        return "\n".join(h + lines) + "\n"

    def _write_python_chunk(self, path, blocks, chunk_index):
        """Write ``blocks`` of a Python file as one chunk; return the next index."""
        chunk_data = "\n\n".join(blocks)
        final_text = f"{'=' * 80}\nFILE: {path}\n{'=' * 80}\n\n{chunk_data}"
        self._write_chunk(final_text.encode("utf-8"), chunk_index)
        return chunk_index + 1

    def _chunk_python_file_ast(self, path, text, chunk_index):
        """Chunk a Python file along its top-level functions and classes.

        The source is split into labelled blocks (functions, classes, and the
        "GLOBAL CODE" between them). Blocks are packed into chunks of at most
        ``max_chunk_size`` lines; an oversized block gets a chunk of its own.
        Unparseable source is written as a single chunk. Returns the next free
        chunk index.
        """
        try:
            tree = ast.parse(text, filename=path)
        except (SyntaxError, ValueError):
            return self._write_python_chunk(path, [text], chunk_index)

        lines = text.splitlines()

        node_boundaries = []
        for node in tree.body:
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                label = f"Function: {node.name}"
            elif isinstance(node, ast.ClassDef):
                label = f"Class: {node.name}"
            else:
                continue
            start = node.lineno
            end = getattr(node, "end_lineno", start)
            node_boundaries.append((start, end, label))
        node_boundaries.sort(key=lambda x: x[0])

        # Fill the gaps between definitions with "GLOBAL CODE" blocks.
        expanded_blocks = []
        prev_end = 1
        for start, end, label in node_boundaries:
            if start > prev_end:
                expanded_blocks.append((prev_end, start - 1, "GLOBAL CODE"))
            expanded_blocks.append((start, end, label))
            prev_end = end + 1
        if prev_end <= len(lines):
            expanded_blocks.append((prev_end, len(lines), "GLOBAL CODE"))

        code_blocks = [
            f"{label} (lines {start}-{end})\n" + "\n".join(lines[start - 1:end])
            for start, end, label in expanded_blocks
        ]

        current_blocks = []
        current_count = 0
        for block in code_blocks:
            block_size = len(block.splitlines())

            if not self.max_chunk_size:
                # No budget: everything ends up in one chunk.
                current_blocks.append(block)
                current_count += block_size
                continue

            if block_size > self.max_chunk_size:
                # Flush what we have, then give the big block its own chunk.
                if current_blocks:
                    chunk_index = self._write_python_chunk(
                        path, current_blocks, chunk_index
                    )
                    current_blocks = []
                    current_count = 0
                chunk_index = self._write_python_chunk(path, [block], chunk_index)
                continue

            if current_count + block_size > self.max_chunk_size and current_blocks:
                chunk_index = self._write_python_chunk(
                    path, current_blocks, chunk_index
                )
                current_blocks = []
                current_count = 0

            current_blocks.append(block)
            current_count += block_size

        if current_blocks:
            chunk_index = self._write_python_chunk(path, current_blocks, chunk_index)
        return chunk_index

    def close(self):
        """Release resources; currently nothing to release."""
        pass

    def __enter__(self):
        """Return the chunker itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
        """Close the chunker; never suppresses an in-flight exception."""
        self.close()
        return False