Skip to content

Commit

Permalink
Pandas-ify Create Base Documents (#1209)
Browse files: browse the repository at this point in the history
  • Loading branch information
AlonsoGuevara authored Sep 25, 2024
1 parent f518c8b commit dda4edd
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 30 deletions.
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20240925001101245840.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Optimize Create Base Documents subflow"
}
57 changes: 27 additions & 30 deletions graphrag/index/workflows/v1/subflows/create_base_documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
)
from datashaper.table_store.types import VerbResult, create_verb_result

from graphrag.index.verbs.overrides.aggregate import aggregate_df


@verb(name="create_base_documents", treats_input_tables_as_immutable=True)
def create_base_documents(
Expand All @@ -26,55 +24,54 @@ def create_base_documents(
source = cast(pd.DataFrame, input.get_input())
text_units = cast(pd.DataFrame, input.get_others()[0])

text_units = cast(
pd.DataFrame, text_units.explode("document_ids")[["id", "document_ids", "text"]]
)
text_units.rename(
columns={
"document_ids": "chunk_doc_id",
"id": "chunk_id",
"text": "chunk_text",
},
inplace=True,
text_units = (
text_units.explode("document_ids")
.loc[:, ["id", "document_ids", "text"]]
.rename(
columns={
"document_ids": "chunk_doc_id",
"id": "chunk_id",
"text": "chunk_text",
}
)
)

joined = text_units.merge(
source,
left_on="chunk_doc_id",
right_on="id",
how="inner",
copy=False,
)

docs_with_text_units = aggregate_df(
joined,
groupby=["id"],
aggregations=[
{
"column": "chunk_id",
"operation": "array_agg",
"to": "text_units",
}
],
docs_with_text_units = joined.groupby("id", sort=False).agg(
text_units=("chunk_id", list)
)

rejoined = docs_with_text_units.merge(
source,
on="id",
how="right",
)
copy=False,
).reset_index(drop=True)

rejoined.rename(columns={"text": "raw_content"}, inplace=True)
rejoined["id"] = rejoined["id"].astype(str)

# attribute columns are converted to strings and then collapsed into a single json object
# Convert attribute columns to strings and collapse them into a JSON object
if document_attribute_columns:
for column in document_attribute_columns:
rejoined[column] = rejoined[column].astype(str)
rejoined["attributes"] = rejoined[document_attribute_columns].apply(
lambda row: {**row},
axis=1,
# Convert all specified columns to string at once
rejoined[document_attribute_columns] = rejoined[
document_attribute_columns
].astype(str)

# Collapse the document_attribute_columns into a single JSON object column
rejoined["attributes"] = rejoined[document_attribute_columns].to_dict(
orient="records"
)

# Drop the original attribute columns after collapsing them
rejoined.drop(columns=document_attribute_columns, inplace=True)
rejoined.reset_index()

return create_verb_result(
cast(
Expand Down

0 comments on commit dda4edd

Please sign in to comment.