# feat: MarkdownHeaderSplitter #9660
```python
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import re
from typing import Literal, Optional

from haystack import Document, component, logging
from haystack.components.preprocessors import DocumentSplitter

logger = logging.getLogger(__name__)


@component
class MarkdownHeaderSplitter:
    """
    Split documents at ATX-style Markdown headers (#), with optional secondary splitting.

    This component processes text documents by:
    - Splitting them into chunks at Markdown headers (e.g., '#', '##', etc.), preserving header hierarchy as metadata.
    - Optionally applying a secondary split (by word, passage, period, or line) to each chunk
      (using Haystack's DocumentSplitter).
    - Preserving and propagating metadata such as parent headers, page numbers, and split IDs.
    """

    def __init__(
        self,
        *,
        page_break_character: str = "\f",
        keep_headers: bool = True,
        secondary_split: Literal["word", "passage", "period", "line"] | None = None,
        split_length: int = 200,
        split_overlap: int = 0,
        split_threshold: int = 0,
        skip_empty_documents: bool = True,
    ):
        """
        Initialize the MarkdownHeaderSplitter.

        :param page_break_character: Character used to identify page breaks. Defaults to form feed ("\f").
        :param keep_headers: If True, headers are kept in the content. If False, headers are moved to metadata.
            Defaults to True.
```
> **Comment on lines +41 to +42**
>
> **Contributor:** Does this mean that if …
>
> **Author:** Yes! I thought it didn't make sense to keep them in meta if they're still in the content
>
> **Contributor:** I think it could be useful to always have them in the meta since we may want to use metadata filtering to quickly find relevant sections. So I think we should always store them in meta!
```python
        :param secondary_split: Optional secondary split condition after header splitting.
            Options are None, "word", "passage", "period", "line". Defaults to None.
        :param split_length: The maximum number of units in each split when using secondary splitting. Defaults to 200.
        :param split_overlap: The number of overlapping units for each split when using secondary splitting.
            Defaults to 0.
        :param split_threshold: The minimum number of units per split when using secondary splitting. Defaults to 0.
        :param skip_empty_documents: Choose whether to skip documents with empty content. Default is True.
            Set to False when downstream components in the Pipeline (like LLMDocumentContentExtractor) can extract
            text from non-textual documents.
        """
        self.page_break_character = page_break_character
        self.secondary_split = secondary_split
        self.split_length = split_length
        self.split_overlap = split_overlap
        self.split_threshold = split_threshold
        self.skip_empty_documents = skip_empty_documents
        self.keep_headers = keep_headers
        self._header_pattern = re.compile(r"(?m)^(#{1,6}) (.+)$")  # ATX-style .md headers
        self._is_warmed_up = False

        # initialize secondary_splitter only if needed
        if self.secondary_split:
            self.secondary_splitter = DocumentSplitter(
                split_by=self.secondary_split,
                split_length=self.split_length,
                split_overlap=self.split_overlap,
                split_threshold=self.split_threshold,
            )
```
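For reference, the ATX header pattern compiled above can be exercised on its own. This is a standalone sketch using only the standard library (no Haystack dependency), showing which lines the regex treats as headers:

```python
import re

# Same pattern as in the component: one to six '#' characters followed by a
# single space and the header text, matched line-by-line via the (?m) flag.
header_pattern = re.compile(r"(?m)^(#{1,6}) (.+)$")

text = "# Title\nIntro text\n## Section\nBody\n####### not a header\n#NoSpace\n"

# Collect (level, header text) pairs; seven hashes or a missing space
# after the hashes do not match, per the ATX convention.
matches = [(len(m.group(1)), m.group(2)) for m in header_pattern.finditer(text)]
print(matches)  # [(1, 'Title'), (2, 'Section')]
```

Lines with more than six hashes, or no space after the hashes, are deliberately left as plain content.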
```python
    def warm_up(self):
        """
        Warm up the MarkdownHeaderSplitter.
        """
        if self.secondary_split and not self._is_warmed_up:
            self.secondary_splitter.warm_up()
            self._is_warmed_up = True

    def _split_text_by_markdown_headers(self, text: str, doc_id: str) -> list[dict]:
        """Split text by ATX-style headers (#) and create chunks with appropriate metadata."""
        logger.debug("Splitting text by markdown headers")

        # find headers
        matches = list(re.finditer(self._header_pattern, text))

        # return unsplit if no headers found
        if not matches:
            logger.info(
                "No headers found in document {doc_id}; returning full document as single chunk.", doc_id=doc_id
            )
            return [{"content": text, "meta": {}}]

        # process headers and build chunks
        chunks: list[dict] = []
        header_stack: list[Optional[str]] = [None] * 6
        active_parents: list[str] = []  # track active parent headers
        pending_headers: list[str] = []  # store empty headers to prepend to next content
        has_content = False  # flag to track if any header has content

        for i, match in enumerate(matches):
            # extract header info
            header_prefix = match.group(1)
            header_text = match.group(2)
            level = len(header_prefix)

            # get content
            start = match.end()
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            content = text[start:end]
            if not self.keep_headers and content.startswith("\n"):
                content = content[1:]  # remove leading newline if headers not kept
```
> **Comment on lines +111 to +112**
>
> **Contributor:** I'd say let's drop this update and keep the leading newline character. We should utilize a DocumentCleaner after this splitter if we want to clean up this kind of leading and trailing whitespace type characters.
```python
            # update header stack to track nesting
            header_stack[level - 1] = header_text
            for j in range(level, 6):
                header_stack[j] = None

            # skip splits w/o content
            if not content.strip():  # this strip is needed to avoid counting whitespace as content
                # add as parent for subsequent headers
                active_parents = [h for h in header_stack[: level - 1] if h is not None]
                active_parents.append(header_text)
                if self.keep_headers:
```
> **Comment on lines +119 to +124**
>
> **Contributor:** I think this can produce an unwanted edge-case, which is if …
```python
                    header_line = f"{header_prefix} {header_text}"
                    pending_headers.append(header_line)
                continue
```
```python
            has_content = True  # at least one header has content
            parent_headers = list(active_parents)

            logger.debug(
                "Creating chunk for header '{header_text}' at level {level}", header_text=header_text, level=level
            )

            if self.keep_headers:
                header_line = f"{header_prefix} {header_text}"
                # add pending & current header to content
                chunk_content = ""
                if pending_headers:
                    chunk_content += "\n".join(pending_headers) + "\n"
                chunk_content += f"{header_line}{content}"
                chunks.append(
                    {
                        "content": chunk_content,
                        "meta": {"header": header_text, "parent_headers": parent_headers},
                    }
                )
                pending_headers = []  # reset pending headers
            else:
                chunks.append({"content": content, "meta": {"header": header_text, "parent_headers": parent_headers}})

            # reset active parents
            active_parents = [h for h in header_stack[: level - 1] if h is not None]

        # return doc unchunked if no headers have content
        if not has_content:
            logger.info(
                "Document {doc_id} contains only headers with no content; returning original document.", doc_id=doc_id
            )
            return [{"content": text, "meta": {}}]

        return chunks
```
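As a rough standalone illustration of the header-stack bookkeeping in the method above (plain Python, independent of this component and its pending-header and keep-header options), splitting at ATX headers while recording each chunk's parent headers can look like:

```python
import re

HEADER_RE = re.compile(r"(?m)^(#{1,6}) (.+)$")

def split_by_headers(text):
    """Split text at ATX headers, attaching each chunk's parent headers."""
    matches = list(HEADER_RE.finditer(text))
    stack = [None] * 6  # most recent header seen at each level (1..6)
    chunks = []
    for i, m in enumerate(matches):
        level = len(m.group(1))
        stack[level - 1] = m.group(2)
        for j in range(level, 6):  # a new header invalidates deeper levels
            stack[j] = None
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        parents = [h for h in stack[: level - 1] if h is not None]
        chunks.append(
            {"header": m.group(2), "parents": parents, "content": text[m.end():end].strip()}
        )
    return chunks

doc = "# A\nintro\n## B\nbody\n### C\ndetail\n## D\nmore\n"
for c in split_by_headers(doc):
    print(c["header"], c["parents"])
# A []
# B ['A']
# C ['A', 'B']
# D ['A']
```

Note how `D` drops `B` and `C` from its parents: setting level 2 resets everything deeper in the stack, which is exactly what the `header_stack[j] = None` loop does in the component.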
```python
    def _apply_secondary_splitting(self, documents: list[Document]) -> list[Document]:
        """
        Apply secondary splitting while preserving header metadata and structure.

        Ensures page counting is maintained across splits.
        """
        result_docs = []
        current_split_id = 0  # track split_id across all secondary splits from the same parent

        for doc in documents:
            if doc.content is None:
                result_docs.append(doc)
                continue

            content_for_splitting: str = doc.content

            if not self.keep_headers:  # skip header extraction if keep_headers
                # extract header information
                header_match = re.search(self._header_pattern, doc.content)
                if header_match:
                    content_for_splitting = doc.content[header_match.end() :]

            # track page from meta
            current_page = doc.meta.get("page_number", 1)

            # create a clean meta dict without split_id for secondary splitting
            clean_meta = {k: v for k, v in doc.meta.items() if k != "split_id"}

            secondary_splits = self.secondary_splitter.run(
                documents=[Document(content=content_for_splitting, meta=clean_meta)]
            )["documents"]

            # split processing
            for i, split in enumerate(secondary_splits):
                # calculate page number for this split
                if i > 0 and secondary_splits[i - 1].content:
                    current_page = self._update_page_number_with_breaks(secondary_splits[i - 1].content, current_page)

                # set page number and split_id in meta
                split.meta["page_number"] = current_page
                split.meta["split_id"] = current_split_id
                # ensure source_id is preserved from the original document
                if "source_id" in doc.meta:
                    split.meta["source_id"] = doc.meta["source_id"]
                current_split_id += 1

                # preserve header metadata if we're not keeping headers in content
                if not self.keep_headers:
                    for key in ["header", "parent_headers"]:
                        if key in doc.meta:
                            split.meta[key] = doc.meta[key]

                result_docs.append(split)

        logger.debug(
            "Secondary splitting complete. Final count: {final_count} documents.", final_count=len(result_docs)
        )
        return result_docs
```
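The secondary pass above delegates the actual chunking to Haystack's `DocumentSplitter`. As a rough stdlib-only approximation (not the library's implementation) of what a `split_by="word"` pass with overlap does to each header chunk:

```python
def split_by_word(text, split_length, split_overlap=0):
    """Greedy word-based chunking with overlap: each chunk holds up to
    split_length words and repeats the last split_overlap words of the
    previous chunk, loosely mimicking a word-based DocumentSplitter pass."""
    words = text.split()
    step = split_length - split_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start : start + split_length]))
        if start + split_length >= len(words):
            break  # last window already covered the tail
    return chunks

print(split_by_word("one two three four five six seven", split_length=3, split_overlap=1))
# ['one two three', 'three four five', 'five six seven']
```

The real `DocumentSplitter` also handles sentence and passage boundaries and metadata; this sketch only shows the windowing arithmetic.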
```python
    def _update_page_number_with_breaks(self, content: str, current_page: int) -> int:
        """
        Update page number based on page breaks in content.

        :param content: Content to check for page breaks
        :param current_page: Current page number
        :return: New current page number
        """
        if not isinstance(content, str):
            return current_page

        page_breaks = content.count(self.page_break_character)
        new_page_number = current_page + page_breaks

        if page_breaks > 0:
            logger.debug(
                "Found {page_breaks} page breaks, page number updated: {old} → {new}",
                page_breaks=page_breaks,
                old=current_page,
                new=new_page_number,
            )

        return new_page_number
```
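The page tracking relies only on counting the page-break character; a minimal standalone check of that arithmetic, using the form-feed default:

```python
# Standalone sketch of the page-break arithmetic used above: each form feed
# ("\f") encountered in a chunk advances the running page counter.
PAGE_BREAK = "\f"

def advance_page(content, current_page):
    return current_page + content.count(PAGE_BREAK)

page = 1
for chunk in ["first page text", "spans\fonto page two", "two breaks\f\fhere"]:
    page = advance_page(chunk, page)
print(page)  # 4
```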
```python
    def _split_documents_by_markdown_headers(self, documents: list[Document]) -> list[Document]:
        """Split a list of documents by markdown headers, preserving metadata."""
        result_docs = []
        for doc in documents:
            logger.debug("Splitting document with id={doc_id}", doc_id=doc.id)
            # mypy: doc.content is Optional[str], so we must check for None before passing to splitting method
            if doc.content is None:
                continue
```
```python
            splits = self._split_text_by_markdown_headers(doc.content, doc.id)
            docs = []

            current_page = doc.meta.get("page_number", 1) if doc.meta else 1
            total_pages = doc.content.count(self.page_break_character) + 1
            logger.debug(
                "Processing page number: {current_page} out of {total_pages}",
                current_page=current_page,
                total_pages=total_pages,
            )
```
> **Comment on lines +260 to +266**
>
> **Contributor:** Correct me if I'm wrong but this doesn't sound quite right. The incoming document is usually a converted PDF file from a converter that hasn't yet been split. So this would mean the … Either way the message … Also, currently you don't offset total pages by current page, so we could end up with a message like …
```python
            for split_idx, split in enumerate(splits):
                meta = doc.meta.copy() if doc.meta else {}
                meta.update({"source_id": doc.id, "page_number": current_page, "split_id": split_idx})
                if split.get("meta"):
                    meta.update(split["meta"])
                current_page = self._update_page_number_with_breaks(split["content"], current_page)
                docs.append(Document(content=split["content"], meta=meta))
            logger.debug(
                "Split into {num_docs} documents for id={doc_id}, final page: {current_page}",
                num_docs=len(docs),
                doc_id=doc.id,
                current_page=current_page,
            )
            result_docs.extend(docs)
        return result_docs
```
```python
    @component.output_types(documents=list[Document])
    def run(self, documents: list[Document]) -> dict[str, list[Document]]:
        """
        Run the markdown header splitter with optional secondary splitting.

        :param documents: List of documents to split

        :returns: A dictionary with the following key:
            - `documents`: List of documents with the split texts. Each document includes:
                - A metadata field `source_id` to track the original document.
                - A metadata field `page_number` to track the original page number.
                - A metadata field `split_id` to identify the split chunk index within its parent document.
                - All other metadata copied from the original document.
        """
        if self.secondary_split and not self._is_warmed_up:
            self.warm_up()

        # validate input documents
        for doc in documents:
            if doc.content is None:
                raise ValueError(
                    (
                        "MarkdownHeaderSplitter only works with text documents but content for document ID"
                        f" {doc.id} is None."
                    )
                )
            if not isinstance(doc.content, str):
                raise ValueError("MarkdownHeaderSplitter only works with text documents (str content).")

        final_docs = []
        for doc in documents:
            # handle empty documents
            if not doc.content or not doc.content.strip():  # avoid counting whitespace as content
                if self.skip_empty_documents:
                    logger.warning("Document ID {doc_id} has an empty content. Skipping this document.", doc_id=doc.id)
                    continue
                # keep empty documents
                final_docs.append(doc)
                logger.warning(
                    "Document ID {doc_id} has an empty content. Keeping this document as per configuration.",
                    doc_id=doc.id,
                )
                continue

            # split this document by headers
            header_split_docs = self._split_documents_by_markdown_headers([doc])

            # apply secondary splitting if configured
            if self.secondary_split:
                doc_splits = self._apply_secondary_splitting(header_split_docs)
            else:
                doc_splits = header_split_docs

            final_docs.extend(doc_splits)

        return {"documents": final_docs}
```
The accompanying release note:

```yaml
---
features:
  - |
    Introduced the `MarkdownHeaderSplitter` component:
    - Splits documents into chunks at Markdown headers (`#`, `##`, etc.), preserving header hierarchy as metadata.
    - Optionally infers and rewrites header levels for documents where header structure is ambiguous (e.g. documents parsed using Docling).
    - Supports secondary splitting (by word, passage, period, or line) for further chunking after header-based splitting using Haystack's `DocumentSplitter`.
    - Preserves and propagates metadata such as parent headers and page numbers.
    - Handles edge cases such as documents with no headers, empty content, and non-text documents.
```
> **Contributor:** Apologies, we have moved to using Python 3.10 types since this PR was opened. So if you could drop `Optional` and instead use `| None` that would be great!
>
> **Contributor:** If you could also update your branch with current main, then the formatting scripts and tests should catch this change for you.