diff --git a/backend/celery_config.py b/backend/celery_config.py index bb7eaa2b2d8..0ea6f8f7725 100644 --- a/backend/celery_config.py +++ b/backend/celery_config.py @@ -8,6 +8,7 @@ celery = Celery(__name__) + if CELERY_BROKER_URL.startswith("sqs"): broker_transport_options = { CELERY_BROKER_QUEUE_NAME: { @@ -37,5 +38,4 @@ else: raise ValueError(f"Unsupported broker URL: {CELERY_BROKER_URL}") - -celery.autodiscover_tasks(["modules.sync", "modules", "middlewares", "packages"]) +celery.autodiscover_tasks(["modules.sync", "modules","modules.assistant.ito", "middlewares", "packages"]) diff --git a/backend/modules/assistant/ito/ito.py b/backend/modules/assistant/ito/ito.py index dc008178113..873605349b3 100644 --- a/backend/modules/assistant/ito/ito.py +++ b/backend/modules/assistant/ito/ito.py @@ -13,6 +13,9 @@ from modules.assistant.ito.utils.pdf_generator import PDFGenerator, PDFModel from modules.chat.controller.chat.utils import update_user_usage from modules.contact_support.controller.settings import ContactsSettings +from modules.notification.dto.inputs import NotificationUpdatableProperties +from modules.notification.entity.notification import NotificationsStatusEnum +from modules.notification.service.notification_service import NotificationService from modules.upload.controller.upload_routes import upload_file from modules.user.entity.user_identity import UserIdentity from modules.user.service.user_usage import UserUsage @@ -22,6 +25,8 @@ logger = get_logger(__name__) +notification_service = NotificationService() + class ITO(BaseModel): input: InputAssistant @@ -62,31 +67,36 @@ def increase_usage_user(self): def calculate_pricing(self): return 20 - def generate_pdf(self, filename: str, title: str, content: str): - pdf_model = PDFModel(title=title, content=content) - pdf = PDFGenerator(pdf_model) - pdf.print_pdf() - pdf.output(filename, "F") - @abstractmethod async def process_assistant(self): pass + +async def uploadfile_to_file(uploadFile: UploadFile): + # Transform the UploadFile object to a file object with same name and content + tmp_file = NamedTemporaryFile(delete=False) + tmp_file.write(uploadFile.file.read()) + tmp_file.flush() # Make sure all data is written to disk + return tmp_file + + +class OutputHandler(BaseModel): async def send_output_by_email( self, - file: UploadFile, filename: str, + file: UploadFile, task_name: str, custom_message: str, brain_id: str = None, + user_email: str = None, ): settings = ContactsSettings() - file = await self.uploadfile_to_file(file) + file = await uploadfile_to_file(file) domain_quivr = os.getenv("QUIVR_DOMAIN", "https://chat.quivr.app/") with open(file.name, "rb") as f: mail_from = settings.resend_contact_sales_from - mail_to = self.current_user.email + mail_to = user_email body = f"""
Quivr Logo @@ -116,20 +126,35 @@ async def send_output_by_email( "subject": "Quivr Ingestion Processed", "reply_to": "no-reply@quivr.app", "html": body, - "attachments": [{"filename": filename, "content": list(f.read())}], + "attachments": [ + { + "filename": filename, + "content": list(f.read()), + "type": "application/pdf", + } + ], } logger.info(f"Sending email to {mail_to} with file {filename}") send_email(params) - async def uploadfile_to_file(self, uploadFile: UploadFile): - # Transform the UploadFile object to a file object with same name and content - tmp_file = NamedTemporaryFile(delete=False) - tmp_file.write(uploadFile.file.read()) - tmp_file.flush() # Make sure all data is written to disk - return tmp_file + def generate_pdf(self, filename: str, title: str, content: str): + pdf_model = PDFModel(title=title, content=content) + pdf = PDFGenerator(pdf_model) + pdf.print_pdf() + pdf.output(filename, "F") async def create_and_upload_processed_file( - self, processed_content: str, original_filename: str, file_description: str + self, + processed_content: str, + original_filename: str, + file_description: str, + content: str, + task_name: str, + custom_message: str, + brain_id: str = None, + email_activated: bool = False, + current_user: UserIdentity = None, + notification_id: str = None, ) -> dict: """Handles creation and uploading of the processed file.""" # remove any special characters from the filename that aren't http safe @@ -164,32 +189,36 @@ async def create_and_upload_processed_file( headers={"content-type": "application/pdf"}, ) - if self.input.outputs.email.activated: + logger.info(f"current_user: {current_user}") + if email_activated: await self.send_output_by_email( - file_to_upload, new_filename, + file_to_upload, "Summary", f"{file_description} of {original_filename}", - brain_id=( - self.input.outputs.brain.value - if ( - self.input.outputs.brain.activated - and self.input.outputs.brain.value - ) - else None - ), + brain_id=brain_id, + user_email=current_user["email"], ) # Reset to start of file before upload file_to_upload.file.seek(0) - if self.input.outputs.brain.activated: + UserIdentity(**current_user) + if brain_id: await upload_file( uploadFile=file_to_upload, - brain_id=self.input.outputs.brain.value, - current_user=self.current_user, + brain_id=brain_id, + current_user=current_user, chat_id=None, ) os.remove(new_filename) + notification_service.update_notification_by_id( + notification_id, + NotificationUpdatableProperties( + status=NotificationsStatusEnum.SUCCESS, + description=f"Summary of {original_filename} generated successfully", + ), + ) + return {"message": f"{file_description} generated successfully"} diff --git a/backend/modules/assistant/ito/summary.py b/backend/modules/assistant/ito/summary.py index 4c713aa2f95..509b271513c 100644 --- a/backend/modules/assistant/ito/summary.py +++ b/backend/modules/assistant/ito/summary.py @@ -1,6 +1,7 @@ import tempfile from typing import List +from celery_config import celery from fastapi import UploadFile from langchain.chains import ( MapReduceDocumentsChain, @@ -23,9 +24,12 @@ Outputs, ) from modules.assistant.ito.ito import ITO +from modules.notification.dto.inputs import CreateNotification +from modules.notification.service.notification_service import NotificationService from modules.user.entity.user_identity import UserIdentity logger = get_logger(__name__) +notification_service = NotificationService() class SummaryAssistant(ITO): @@ -69,97 +73,119 @@ def check_input(self): return True async def process_assistant(self): - try: - self.increase_usage_user() - except Exception as e: - logger.error(f"Error increasing usage: {e}") - return {"error": str(e)} - - # Create a temporary file with the uploaded file as a temporary file and then pass it to the loader - tmp_file = tempfile.NamedTemporaryFile(delete=False) - - # Write the file to the temporary file - tmp_file.write(self.files[0].file.read()) - - # Now pass the path of the temporary file to the loader - - loader = UnstructuredPDFLoader(tmp_file.name) - - tmp_file.close() - - data = loader.load() - - llm = ChatLiteLLM(model="gpt-4o", max_tokens=2000) - - map_template = """The following is a document that has been divided into multiple sections: - {docs} - - Please carefully analyze each section and identify the following: - - 1. Main Themes: What are the overarching ideas or topics in this section? - 2. Key Points: What are the most important facts, arguments, or ideas presented in this section? - 3. Important Information: Are there any crucial details that stand out? This could include data, quotes, specific events, entity, or other relevant information. - 4. People: Who are the key individuals mentioned in this section? What roles do they play? - 5. Reasoning: What logic or arguments are used to support the key points? - 6. Chapters: If the document is divided into chapters, what is the main focus of each chapter? - - Remember to consider the language and context of the document. This will help in understanding the nuances and subtleties of the text.""" - map_prompt = PromptTemplate.from_template(map_template) - map_chain = LLMChain(llm=llm, prompt=map_prompt) + notification = notification_service.add_notification( + CreateNotification( + user_id=self.current_user.id, + status="info", + title=f"Creating Summary for {self.files[0].filename}", + ) + ) + # Create a temporary file with the uploaded file as a temporary file and then pass it to the loader + tmp_file = tempfile.NamedTemporaryFile(delete=False) - # Reduce - reduce_template = """The following is a set of summaries for parts of the document: - {docs} - Take these and distill it into a final, consolidated summary of the document. Make sure to include the main themes, key points, and important information such as data, quotes,people and specific events. - Use markdown such as bold, italics, underlined. For example, **bold**, *italics*, and _underlined_ to highlight key points. - Please provide the final summary with sections using bold headers. - Sections should always be Summary and Key Points, but feel free to add more sections as needed. - Always use bold text for the sections headers. - Keep the same language as the documents. - Answer:""" - reduce_prompt = PromptTemplate.from_template(reduce_template) + # Write the file to the temporary file + tmp_file.write(self.files[0].file.read()) - # Run chain - reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt) + # Now pass the path of the temporary file to the loader - # Takes a list of documents, combines them into a single string, and passes this to an LLMChain - combine_documents_chain = StuffDocumentsChain( - llm_chain=reduce_chain, document_variable_name="docs" - ) + loader = UnstructuredPDFLoader(tmp_file.name) - # Combines and iteratively reduces the mapped documents - reduce_documents_chain = ReduceDocumentsChain( - # This is final chain that is called. - combine_documents_chain=combine_documents_chain, - # If documents exceed context for `StuffDocumentsChain` - collapse_documents_chain=combine_documents_chain, - # The maximum number of tokens to group documents into. - token_max=4000, - ) + tmp_file.close() - # Combining documents by mapping a chain over them, then combining results - map_reduce_chain = MapReduceDocumentsChain( - # Map chain - llm_chain=map_chain, - # Reduce chain - reduce_documents_chain=reduce_documents_chain, - # The variable name in the llm_chain to put the documents in - document_variable_name="docs", - # Return the results of the map steps in the output - return_intermediate_steps=False, - ) + data = loader.load() - text_splitter = CharacterTextSplitter.from_tiktoken_encoder( - chunk_size=1000, chunk_overlap=100 - ) - split_docs = text_splitter.split_documents(data) + text_splitter = CharacterTextSplitter.from_tiktoken_encoder( + chunk_size=1000, chunk_overlap=100 + ) + split_docs = text_splitter.split_documents(data) + logger.info(f"Split {len(split_docs)} documents") + # Jsonify the split docs + split_docs = [doc.to_json() for doc in split_docs] + ## Turn this into a task + brain_id = ( + self.input.outputs.brain.value + if self.input.outputs.brain.activated + else None + ) + email_activated = self.input.outputs.email.activated + celery.send_task( + name="task_summary", + args=( + split_docs, + self.files[0].filename, + brain_id, + email_activated, + self.current_user.model_dump(mode="json"), + notification.id, + ), + ) + except Exception as e: + logger.error(f"Error processing summary: {e}") + + +def map_reduce_chain(): + llm = ChatLiteLLM(model="gpt-4o", max_tokens=2000) + + map_template = """The following is a document that has been divided into multiple sections: + {docs} + + Please carefully analyze each section and identify the following: + + 1. Main Themes: What are the overarching ideas or topics in this section? + 2. Key Points: What are the most important facts, arguments, or ideas presented in this section? + 3. Important Information: Are there any crucial details that stand out? This could include data, quotes, specific events, entity, or other relevant information. + 4. People: Who are the key individuals mentioned in this section? What roles do they play? + 5. Reasoning: What logic or arguments are used to support the key points? + 6. Chapters: If the document is divided into chapters, what is the main focus of each chapter? + + Remember to consider the language and context of the document. This will help in understanding the nuances and subtleties of the text.""" + map_prompt = PromptTemplate.from_template(map_template) + map_chain = LLMChain(llm=llm, prompt=map_prompt) + + # Reduce + reduce_template = """The following is a set of summaries for parts of the document : + {docs} + Take these and distill it into a final, consolidated summary of the document. Make sure to include the main themes, key points, and important information such as data, quotes,people and specific events. + Use markdown such as bold, italics, underlined. For example, **bold**, *italics*, and _underlined_ to highlight key points. + Please provide the final summary with sections using bold headers. + Sections should always be Summary and Key Points, but feel free to add more sections as needed. + Always use bold text for the sections headers. + Keep the same language as the documents. + Answer:""" + reduce_prompt = PromptTemplate.from_template(reduce_template) + + # Run chain + llm = ChatLiteLLM(model="gpt-4o", max_tokens=2000) + reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt) + + # Takes a list of documents, combines them into a single string, and passes this to an LLMChain + combine_documents_chain = StuffDocumentsChain( + llm_chain=reduce_chain, document_variable_name="docs" + ) - content = map_reduce_chain.run(split_docs) + # Combines and iteratively reduces the mapped documents + reduce_documents_chain = ReduceDocumentsChain( + # This is final chain that is called. + combine_documents_chain=combine_documents_chain, + # If documents exceed context for `StuffDocumentsChain` + collapse_documents_chain=combine_documents_chain, + # The maximum number of tokens to group documents into. + token_max=4000, + ) - return await self.create_and_upload_processed_file( - content, self.files[0].filename, "Summary" - ) + # Combining documents by mapping a chain over them, then combining results + map_reduce_chain = MapReduceDocumentsChain( + # Map chain + llm_chain=map_chain, + # Reduce chain + reduce_documents_chain=reduce_documents_chain, + # The variable name in the llm_chain to put the documents in + document_variable_name="docs", + # Return the results of the map steps in the output + return_intermediate_steps=False, + ) + return map_reduce_chain def summary_inputs(): diff --git a/backend/modules/assistant/ito/tasks.py b/backend/modules/assistant/ito/tasks.py new file mode 100644 index 00000000000..904d8f6a49d --- /dev/null +++ b/backend/modules/assistant/ito/tasks.py @@ -0,0 +1,45 @@ +import asyncio + +from celery_config import celery +from langchain_core.documents import Document +from logger import get_logger + +from .ito import OutputHandler +from .summary import map_reduce_chain + +logger = get_logger(__name__) + + +@celery.task(name="task_summary") +def task_summary( + split_docs, filename, brain_id, email_activated, current_user, notification_id +): + loop = asyncio.get_event_loop() + # turn split_docs into a list of Document objects + logger.info("split_docs: %s", split_docs) + split_docs = [ + Document( + page_content=doc["kwargs"]["page_content"], + metadata=doc["kwargs"]["metadata"], + ) + for doc in split_docs + if "kwargs" in doc + and "page_content" in doc["kwargs"] + and "metadata" in doc["kwargs"] + ] + content = map_reduce_chain().run(split_docs) + output_handler = OutputHandler() + return loop.run_until_complete( + output_handler.create_and_upload_processed_file( + content, + filename, + "Summary", + content, + "Summary", + "Summary", + brain_id, + email_activated, + current_user, + notification_id, + ) + ) diff --git a/docker-compose.yml b/docker-compose.yml index 7713d5f4e3e..90d8f5e6e34 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -45,8 +45,6 @@ services: - "--workers" - "6" restart: always - volumes: - - ./backend/:/code/ ports: - 5050:5050