diff --git a/README.md b/README.md index 37e9c73..6bd6feb 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Paper Search MCP -A Model Context Protocol (MCP) server for searching and downloading academic papers from multiple sources, including arXiv, PubMed, bioRxiv, and Sci-Hub (optional). Designed for seamless integration with large language models like Claude Desktop. +A Model Context Protocol (MCP) server for searching and downloading academic papers from multiple sources, including arXiv, PubMed, bioRxiv, medRxiv, Google Scholar, IACR ePrint Archive, Semantic Scholar, and Zenodo. Designed for seamless integration with large language models like Claude Desktop. ![PyPI](https://img.shields.io/pypi/v/paper-search-mcp.svg) ![License](https://img.shields.io/badge/license-MIT-blue.svg) ![Python](https://img.shields.io/badge/python-3.10+-blue.svg) [![smithery badge](https://smithery.ai/badge/@openags/paper-search-mcp)](https://smithery.ai/server/@openags/paper-search-mcp) @@ -33,12 +33,24 @@ A Model Context Protocol (MCP) server for searching and downloading academic pap ## Features -- **Multi-Source Support**: Search and download papers from arXiv, PubMed, bioRxiv, medRxiv, Google Scholar, IACR ePrint Archive, Semantic Scholar. +- **Multi-Source Support**: Search and download papers from arXiv, PubMed, bioRxiv, medRxiv, Google Scholar, IACR ePrint Archive, Semantic Scholar, and Zenodo. - **Standardized Output**: Papers are returned in a consistent dictionary format via the `Paper` class. - **Asynchronous Tools**: Efficiently handles network requests using `httpx`. - **MCP Integration**: Compatible with MCP clients for LLM context enhancement. - **Extensible Design**: Easily add new academic platforms by extending the `academic_platforms` module. 
+### Zenodo tools overview + +The Zenodo tools help you find and retrieve research papers recorded on Zenodo (and interact with their records): + +- `search_zenodo`: Search research papers recorded on Zenodo (supports Lucene query and filters like community, year, resource_type, subtype, creators, keywords, sort, order). +- `download_zenodo`: Download the PDF of a research paper recorded on Zenodo when the record includes a PDF. +- `read_zenodo_paper`: Extract text from the PDF of a research paper recorded on Zenodo. +- `search_zenodo_communities`: Discover Zenodo communities (by title/slug/description) to find collections of research papers. +- `get_zenodo_record_details`: Retrieve the raw Zenodo record JSON for a paper or any record. +- `list_zenodo_files`: List files attached to a research paper recorded on Zenodo (or any record). +- `search_zenodo_by_creator`: Convenience search for research papers recorded on Zenodo by a single creator/author. + --- ## Installation @@ -157,6 +169,7 @@ We welcome contributions! 
class PaperSource:
    """Abstract interface implemented by every paper source.

    The base methods only raise ``NotImplementedError``; concrete sources
    override all three.
    """

    def search(self, query: str, **kwargs) -> "List[Paper]":
        """Return the papers matching *query*."""
        raise NotImplementedError

    def download_pdf(self, paper_id: str, save_path: str) -> str:
        """Download the PDF for *paper_id* into *save_path* and return its path."""
        raise NotImplementedError

    def read_paper(self, paper_id: str, save_path: str) -> str:
        """Return the text extracted from the PDF of *paper_id*."""
        raise NotImplementedError


class ZenodoSearcher(PaperSource):
    """Zenodo paper and dataset search implementation.

    Talks to ``{BASE_URL}/api/records`` and ``{BASE_URL}/api/communities``
    through a shared ``requests.Session``. Both class attributes below are
    read from the environment once, when the class body is executed.
    """

    # A trailing slash in the override would otherwise yield '//api/...'.
    BASE_URL = os.environ.get("ZENODO_BASE_URL", "https://zenodo.org").rstrip("/")
    API_TOKEN = os.environ.get("ZENODO_ACCESS_TOKEN", "").strip()

    BROWSERS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    ]

    def __init__(self):
        self._setup_session()

    def _setup_session(self):
        """Initialize HTTP session with random user agent and optional token."""
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": random.choice(self.BROWSERS),
                "Accept": "application/json, text/plain, */*",
                "Accept-Language": "en-US,en;q=0.9",
            }
        )
        if self.API_TOKEN:
            # Zenodo uses OAuth Bearer token
            self.session.headers["Authorization"] = f"Bearer {self.API_TOKEN}"

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------
    def _parse_date(self, date_str: Optional[str]) -> Optional[datetime]:
        """Parse a Zenodo date string, returning ``None`` when it is unusable.

        Accepts full ISO 8601 timestamps (with ``Z`` or a numeric offset),
        ``YYYY-MM-DD``, ``YYYY-MM`` and ``YYYY``. For an interval written as
        ``start/end`` the start is used (or the end when the start is empty).
        Timestamps carrying an offset come back timezone-aware; date-only
        values come back naive.
        """
        if not date_str:
            return None
        s = str(date_str).strip()
        if "/" in s:
            start, _, end = s.partition("/")
            s = start.strip() or end.strip()
        # Try ISO 8601 first (e.g., 2025-08-07T22:52:05.283108+00:00 or ...Z)
        try:
            return datetime.fromisoformat(s.replace("Z", "+00:00"))
        except ValueError:
            pass
        # If a timestamp, try the date part only
        if "T" in s:
            try:
                return datetime.strptime(s.split("T")[0], "%Y-%m-%d")
            except ValueError:
                pass
        # Reduced-precision forms that fromisoformat may reject
        for fmt in ("%Y-%m-%d", "%Y-%m", "%Y"):
            try:
                return datetime.strptime(s, fmt)
            except ValueError:
                continue
        # Avoid noisy warnings; log at debug level
        logger.debug(f"Zenodo: could not parse date: {s}")
        return None

    def _select_pdf_file(self, rec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Return the first file entry of *rec* that looks like a PDF, else ``None``.

        An entry qualifies when its ``key`` ends in ``.pdf`` (case-insensitive),
        its ``type`` is ``"pdf"`` or its ``mimetype`` is ``application/pdf``.
        Entries that are not dicts, or whose key is missing/``None``, are
        skipped instead of raising.
        """
        files = rec.get("files") or []
        if not isinstance(files, list):
            return None
        for entry in files:
            if not isinstance(entry, dict):
                continue
            key = str(entry.get("key") or "")
            if (
                key.lower().endswith(".pdf")
                or entry.get("type") == "pdf"
                or entry.get("mimetype") == "application/pdf"
            ):
                return entry
        # No PDF found
        return None

    @staticmethod
    def _pick_publication_date(rec: Dict[str, Any], metadata: Dict[str, Any]) -> Optional[str]:
        """Choose the raw publication date string for a record.

        Preference order: ``metadata.publication_date``; an entry of
        ``metadata.dates`` typed issued/published/publication; the first dated
        entry of ``metadata.dates``; ``rec.updated``; ``rec.created``.
        """
        raw = metadata.get("publication_date")
        if raw:
            return raw
        dates_field = metadata.get("dates")
        if isinstance(dates_field, list):
            fallback = None
            for d in dates_field:
                if not (isinstance(d, dict) and d.get("date")):
                    continue
                date_type = d.get("type", "")
                if isinstance(date_type, dict):
                    # Some payloads nest the type as {"id": "issued", ...}
                    date_type = date_type.get("id", "")
                if str(date_type).lower() in {"issued", "published", "publication"}:
                    return d["date"]
                if fallback is None:
                    fallback = d["date"]
            if fallback:
                return fallback
        return rec.get("updated") or rec.get("created")

    def _record_to_paper(self, rec: Dict[str, Any]) -> "Optional[Paper]":
        """Map a raw Zenodo record dict to a ``Paper``.

        Returns ``None`` (after logging a warning) when the record cannot be
        mapped, so a single malformed hit never aborts a whole search.
        """
        try:
            metadata = rec.get("metadata", {}) or {}
            title = metadata.get("title", "")
            creators = metadata.get("creators") or []
            authors = [c.get("name") for c in creators if isinstance(c, dict) and c.get("name")]
            description = metadata.get("description") or ""

            published_date = self._parse_date(self._pick_publication_date(rec, metadata))
            doi = rec.get("doi") or metadata.get("doi") or ""
            links = rec.get("links") or {}
            url = links.get("html") or links.get("latest_html") or ""

            pdf_url = ""
            selected_file = self._select_pdf_file(rec)
            if selected_file:
                file_links = selected_file.get("links") or {}
                pdf_url = file_links.get("download") or file_links.get("self") or ""

            keywords = metadata.get("keywords") or []
            if isinstance(keywords, str):
                keywords = [keywords]

            categories: List[str] = []
            resource_type = metadata.get("resource_type") or {}
            if isinstance(resource_type, dict) and resource_type.get("type"):
                categories.append(resource_type["type"])  # e.g., 'publication' or 'dataset'

            # Fall back to any other stable handle when the record has no id.
            paper_id = str(rec.get("id")) if rec.get("id") is not None else (doi or url or title)

            extra = {
                "conceptdoi": rec.get("conceptdoi"),
                "resource_type": metadata.get("resource_type"),
                "communities": metadata.get("communities"),
            }

            return Paper(
                paper_id=paper_id,
                title=title,
                authors=authors,
                abstract=description,
                url=url,
                pdf_url=pdf_url,
                published_date=published_date if published_date else None,
                updated_date=self._parse_date(rec.get("updated")) if rec.get("updated") else None,
                source="zenodo",
                categories=categories,
                keywords=keywords,
                doi=doi,
                citations=0,
                extra=extra,
            )
        except Exception as e:
            logger.warning(f"Failed to map Zenodo record to Paper: {e}")
            return None

    # ------------------------------------------------------------------
    # Query construction
    # ------------------------------------------------------------------
    def _year_filter(self, year: Optional[str]) -> Optional[str]:
        """Convert year argument to a Lucene publication_date filter.

        Supports: "2025", "2016-2020", "2010-", "-2015". A missing bound of a
        range becomes ``*``. Returns ``None`` for an empty or blank argument.
        """
        if not year:
            return None
        y = str(year).strip()
        if not y:
            return None
        if "-" in y:
            parts = y.split("-")
            # An empty side ('2010-' or '-2015') means "unbounded".
            start = parts[0].strip() or "*"
            end = parts[1].strip() or "*"
            return f"metadata.publication_date:[{start} TO {end}]"
        # single year
        return f"metadata.publication_date:[{y} TO {y}]"

    @staticmethod
    def _quote_term(term: str) -> str:
        """Wrap *term* in double quotes, escaping embedded ``\\`` and ``"``."""
        escaped = str(term).replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    def _build_query(
        self,
        query: str = "",
        community: Optional[str] = None,
        year: Optional[str] = None,
        resource_type: Optional[str] = None,
        subtype: Optional[str] = None,
        creators: Optional[List[str]] = None,
        keywords: Optional[List[str]] = None,
    ) -> str:
        """Combine the free-text query and filters into one Lucene query string.

        Every supplied clause is AND-ed; creator names are OR-ed among
        themselves, as are keywords. With nothing supplied the result is
        ``"*"`` (match everything). The free-text *query* is passed through
        verbatim so callers can use Lucene syntax; creator and keyword values
        are quoted and escaped.
        """
        parts: List[str] = []
        if query:
            parts.append(f"({query})")
        if community:
            # Zenodo community slug, e.g., kios-coe
            parts.append(f"communities:{community}")
        yf = self._year_filter(year)
        if yf:
            parts.append(yf)
        if resource_type:
            parts.append(f"resource_type.type:{resource_type}")
        if subtype:
            parts.append(f"resource_type.subtype:{subtype}")
        if creators:
            # Match any of the creators' names
            names = " OR ".join(self._quote_term(c) for c in creators if c)
            if names:
                parts.append(f"creators.name:({names})")
        if keywords:
            kws = " OR ".join(self._quote_term(k) for k in keywords if k)
            if kws:
                # Some records index as 'keywords'
                parts.append(f"keywords:({kws})")
        return " AND ".join(parts) if parts else "*"

    # ------------------------------------------------------------------
    # HTTP helpers
    # ------------------------------------------------------------------
    def _iter_hits(self, endpoint: str, base_params: Dict[str, Any], page_size: int, label: str):
        """Yield raw hit dicts from a paginated search endpoint.

        Pages are requested lazily, one request per page, so a caller that
        stops iterating early triggers no further requests. Iteration ends on
        the first non-200 response (logged with *label*) or the first empty
        page. Network and JSON errors propagate to the caller.
        """
        url = f"{self.BASE_URL}{endpoint}"
        page = 1
        while True:
            params = dict(base_params, page=page, size=page_size)
            resp = self.session.get(url, params=params, timeout=30)
            if resp.status_code != 200:
                logger.error(
                    f"{label} failed: HTTP {resp.status_code}: {resp.text[:200]}"
                )
                return
            data = resp.json() or {}
            hits = (data.get("hits") or {}).get("hits") or []
            if not hits:
                return
            yield from hits
            page += 1

    @staticmethod
    def _extract_record_id(paper_id: Any) -> str:
        """Reduce a record reference to the identifier used in the API path.

        Handles a bare numeric id, a Zenodo DOI (``10.5281/zenodo.<id>``, with
        or without a resolver prefix) and a record URL (query string and
        fragment ignored). Anything unrecognised is returned stripped but
        otherwise unchanged.
        """
        candidate = str(paper_id).strip()
        for sep in ("?", "#"):
            candidate = candidate.split(sep, 1)[0]
        candidate = candidate.rstrip("/")
        lowered = candidate.lower()

        marker = "zenodo."
        idx = lowered.rfind(marker)
        if idx != -1:
            tail = candidate[idx + len(marker):]
            if tail.isdigit():
                return tail

        if lowered.startswith("http"):
            # Expected patterns like https://zenodo.org/records/1234567
            for part in candidate.split("/"):
                if part.isdigit():
                    return part
        return candidate

    def _get_record(self, paper_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a Zenodo record by numeric ID, Zenodo DOI or record URL.

        Returns the decoded JSON on HTTP 200, otherwise ``None`` (the failure
        is logged).
        """
        try:
            record_id = self._extract_record_id(paper_id)
            url = f"{self.BASE_URL}/api/records/{record_id}"
            resp = self.session.get(url, timeout=30)
            if resp.status_code == 200:
                return resp.json()
            logger.error(f"Failed to fetch Zenodo record {record_id}: HTTP {resp.status_code}")
            return None
        except Exception as e:
            logger.error(f"Error fetching Zenodo record {paper_id}: {e}")
            return None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def search(
        self,
        query: str = "",
        max_results: int = 10,
        *,
        community: Optional[str] = None,
        year: Optional[str] = None,
        resource_type: Optional[str] = None,
        subtype: Optional[str] = None,
        creators: Optional[List[str]] = None,
        keywords: Optional[List[str]] = None,
        sort: Optional[str] = None,
        order: Optional[str] = None,
    ) -> "List[Paper]":
        """
        Search Zenodo records using the public API.

        Args:
            query: Free-text query (Lucene syntax supported by Zenodo)
            max_results: Maximum number of results to return
            community: Community slug (e.g., 'kios-coe')
            year: Year or range, supports '2025', '2016-2020', '2010-', '-2015'
            resource_type: e.g., 'publication', 'dataset'
            subtype: e.g., 'conferencepaper', 'article'
            creators: List of author names to match
            keywords: List of keywords to match
            sort: Field to sort by (e.g., 'mostrecent', 'bestmatch', 'version')
            order: 'asc' or 'desc'
        Returns:
            List[Paper]; on a request error, whatever was collected so far.
        """
        papers: List[Paper] = []
        if max_results <= 0:
            return papers
        try:
            base_params: Dict[str, Any] = {
                "q": self._build_query(
                    query=query,
                    community=community,
                    year=year,
                    resource_type=resource_type,
                    subtype=subtype,
                    creators=creators,
                    keywords=keywords,
                )
            }
            if sort:
                base_params["sort"] = sort
            if order:
                base_params["order"] = order

            hits = self._iter_hits(
                "/api/records", base_params, min(max_results, 100), "Zenodo search"
            )
            for rec in hits:
                paper = self._record_to_paper(rec)
                if paper:
                    papers.append(paper)
                    if len(papers) >= max_results:
                        break
        except Exception as e:
            logger.error(f"Zenodo search error: {e}")
        return papers[:max_results]

    def download_pdf(self, paper_id: str, save_path: str = "./downloads") -> str:
        """
        Download a PDF file from a Zenodo record if available.

        The file is streamed to ``<name>.part`` and renamed on success, so a
        failed transfer never leaves a truncated PDF at the final path.

        Args:
            paper_id: Zenodo record ID (numeric), Zenodo DOI or record URL
            save_path: Directory to save the PDF
        Returns:
            Path to the downloaded PDF, or error message
        """
        try:
            rec = self._get_record(paper_id)
            if not rec:
                return f"Error: Could not fetch Zenodo record {paper_id}"
            file_entry = self._select_pdf_file(rec)
            if not file_entry:
                return "Error: No PDF file available for this record"

            links = file_entry.get("links") or {}
            download_url = links.get("download") or links.get("self")
            if not download_url:
                return "Error: No downloadable link for the selected file"

            os.makedirs(save_path, exist_ok=True)
            filename = file_entry.get("key") or f"zenodo_{rec.get('id', 'file')}.pdf"
            if not filename.lower().endswith(".pdf"):
                filename += ".pdf"
            # basename() keeps a key such as '../x.pdf' inside save_path.
            outfile = os.path.join(save_path, f"zenodo_{str(rec.get('id'))}_{os.path.basename(filename)}")
            partial = outfile + ".part"

            try:
                with self.session.get(download_url, timeout=60, stream=True) as r:
                    r.raise_for_status()
                    with open(partial, "wb") as f:
                        for chunk in r.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                os.replace(partial, outfile)
            finally:
                if os.path.exists(partial):
                    os.remove(partial)
            return outfile
        except Exception as e:
            logger.error(f"Zenodo PDF download error: {e}")
            return f"Error downloading PDF: {e}"

    def read_paper(self, paper_id: str, save_path: str = "./downloads") -> str:
        """
        Download and extract text from a Zenodo record's PDF if available.

        Returns the extracted text, or a message string when the download
        failed or no text could be extracted.
        """
        try:
            pdf_path = self.download_pdf(paper_id, save_path)
            if not os.path.isfile(pdf_path):
                # When download_pdf returns an error message
                return pdf_path

            chunks: List[str] = []
            reader = PdfReader(pdf_path)
            for page in reader.pages:
                try:
                    page_text = page.extract_text()
                except Exception as e:
                    logger.warning(f"Failed to extract text from a page: {e}")
                    continue
                if page_text:
                    chunks.append(page_text + "\n")

            text = "".join(chunks)
            if not text.strip():
                return f"PDF downloaded to {pdf_path}, but unable to extract readable text"
            return text.strip()
        except Exception as e:
            logger.error(f"Zenodo read paper error: {e}")
            return f"Error reading paper: {e}"

    def search_communities(
        self,
        query: str = "",
        max_results: int = 20,
        *,
        sort: Optional[str] = None,
        order: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Search Zenodo communities.

        Args:
            query: Free-text query for community title/slug/description.
            max_results: Maximum number of communities to return.
            sort: Sort field (e.g., 'newest', 'bestmatch').
            order: 'asc' or 'desc'.
        Returns:
            A list of community metadata dictionaries.
        """
        results: List[Dict[str, Any]] = []
        if max_results <= 0:
            return results
        try:
            base_params: Dict[str, Any] = {"q": query or "*"}
            if sort:
                base_params["sort"] = sort
            if order:
                base_params["order"] = order

            hits = self._iter_hits(
                "/api/communities", base_params, min(max_results, 100), "Zenodo community search"
            )
            for com in hits:
                # Title/description may sit at the top level or under 'metadata'.
                meta = com.get("metadata") if isinstance(com.get("metadata"), dict) else {}
                results.append(
                    {
                        "id": com.get("id"),
                        "slug": com.get("slug"),
                        "title": com.get("title") or meta.get("title"),
                        "description": com.get("description") or meta.get("description"),
                        "created": com.get("created"),
                        "updated": com.get("updated"),
                        "links": (com.get("links") or {}),
                    }
                )
                if len(results) >= max_results:
                    break
        except Exception as e:
            logger.error(f"Zenodo communities search error: {e}")
        return results[:max_results]

    def get_record_details(self, paper_id: str) -> Optional[Dict[str, Any]]:
        """Public method to fetch the raw Zenodo record JSON by numeric ID, DOI or URL."""
        try:
            return self._get_record(paper_id)
        except Exception as e:
            logger.error(f"Zenodo get_record_details error: {e}")
            return None

    def list_files(self, paper_id: str) -> List[Dict[str, Any]]:
        """List files for a given record ID/URL with basic metadata and download links.

        Returns an empty list when the record cannot be fetched or carries no
        file list. File entries that are not dicts are skipped.
        """
        files_info: List[Dict[str, Any]] = []
        try:
            rec = self._get_record(paper_id)
            if not rec:
                return files_info
            files = rec.get("files") or []
            if not isinstance(files, list):
                return files_info
            for f in files:
                if not isinstance(f, dict):
                    continue
                links = f.get("links") or {}
                files_info.append(
                    {
                        "key": f.get("key"),
                        "size": f.get("size"),
                        "checksum": f.get("checksum"),
                        "type": f.get("type"),
                        "mimetype": f.get("mimetype"),
                        "download": links.get("download") or links.get("self"),
                    }
                )
        except Exception as e:
            logger.error(f"Zenodo list_files error: {e}")
        return files_info

    def search_by_creator(
        self,
        creator: str,
        max_results: int = 10,
        *,
        community: Optional[str] = None,
        year: Optional[str] = None,
        resource_type: Optional[str] = None,
        subtype: Optional[str] = None,
        sort: Optional[str] = None,
        order: Optional[str] = None,
    ) -> "List[Paper]":
        """Convenience wrapper to search by a single creator name.

        An empty *creator* applies no creator filter at all.
        """
        try:
            return self.search(
                query="",
                max_results=max_results,
                community=community,
                year=year,
                resource_type=resource_type,
                subtype=subtype,
                creators=[creator] if creator else None,
                keywords=None,
                sort=sort,
                order=order,
            )
        except Exception as e:
            logger.error(f"Zenodo search_by_creator error: {e}")
            return []
@mcp.tool()
async def search_zenodo(
    query: str,
    max_results: int = 10,
    *,
    community: Optional[str] = None,
    year: Optional[str] = None,
    resource_type: Optional[str] = None,
    subtype: Optional[str] = None,
    creators: Optional[List[str]] = None,
    keywords: Optional[List[str]] = None,
    sort: Optional[str] = None,
    order: Optional[str] = None,
) -> List[Dict]:
    """Search research papers recorded on Zenodo (and optionally other Zenodo records).

    Use this to find publications archived on Zenodo. Supports Zenodo's Lucene
    query syntax and common filters (community, year, resource_type, subtype, creators, keywords, sort, order).

    Args:
        query: Free-text or Lucene query (e.g., 'anomaly detection').
        max_results: Maximum number of records to return (default: 10).
        community: Community slug (e.g., 'kios-coe').
        year: Year or range (e.g., '2025', '2016-2020', '2010-', '-2015').
        resource_type: e.g., 'publication', 'dataset'.
        subtype: e.g., 'conferencepaper', 'article'.
        creators: List of author names to match.
        keywords: List of keywords to match.
        sort: Field to sort by (e.g., 'mostrecent', 'bestmatch', 'version').
        order: 'asc' or 'desc'.
    Returns:
        List of Zenodo record metadata (papers prioritized) as dictionaries.
    """
    # The searcher owns its own HTTP session, so no client is opened here.
    papers = zenodo_searcher.search(
        query,
        max_results,
        community=community,
        year=year,
        resource_type=resource_type,
        subtype=subtype,
        creators=creators,
        keywords=keywords,
        sort=sort,
        order=order,
    )
    return [paper.to_dict() for paper in papers] if papers else []


@mcp.tool()
async def download_zenodo(paper_id: str, save_path: str = "./downloads") -> str:
    """Download the PDF of a research paper recorded on Zenodo (if the record includes a PDF).

    Args:
        paper_id: Zenodo record ID (numeric) or record URL.
        save_path: Directory to save the PDF (default: './downloads').
    Returns:
        Path to the downloaded PDF file or error message.
    """
    return zenodo_searcher.download_pdf(paper_id, save_path)


@mcp.tool()
async def read_zenodo_paper(paper_id: str, save_path: str = "./downloads") -> str:
    """Read and extract text from the PDF of a research paper recorded on Zenodo.

    Args:
        paper_id: Zenodo record ID (numeric) or record URL.
        save_path: Directory where the PDF is/will be saved (default: './downloads').
    Returns:
        str: The extracted text content of the paper or an error message.
    """
    try:
        return zenodo_searcher.read_paper(paper_id, save_path)
    except Exception as e:
        # The server is run with transport="stdio", where stdout carries the
        # protocol messages; diagnostics therefore go to stderr.
        import sys

        print(f"Error reading Zenodo paper {paper_id}: {e}", file=sys.stderr)
        return ""


@mcp.tool()
async def search_zenodo_communities(
    query: str = "",
    max_results: int = 20,
    sort: Optional[str] = None,
    order: Optional[str] = None,
) -> List[Dict]:
    """Search Zenodo communities to discover collections of research papers (by title/slug/description).

    Args:
        query: Free-text query for communities.
        max_results: Maximum number of communities to return.
        sort: Sort field (e.g., 'newest', 'bestmatch').
        order: 'asc' or 'desc'.
    Returns:
        List of community metadata dictionaries.
    """
    results = zenodo_searcher.search_communities(
        query=query, max_results=max_results, sort=sort, order=order
    )
    return results if results else []


@mcp.tool()
async def get_zenodo_record_details(paper_id: str) -> Dict:
    """Get the raw Zenodo record JSON for a research paper (or any Zenodo record) by ID or URL."""
    rec = zenodo_searcher.get_record_details(paper_id)
    # An empty dict signals "not found / fetch failed".
    return rec or {}


@mcp.tool()
async def list_zenodo_files(paper_id: str) -> List[Dict]:
    """List files attached to a research paper recorded on Zenodo (or any Zenodo record)."""
    files = zenodo_searcher.list_files(paper_id)
    return files if files else []


@mcp.tool()
async def search_zenodo_by_creator(
    creator: str,
    max_results: int = 10,
    community: Optional[str] = None,
    year: Optional[str] = None,
    resource_type: Optional[str] = None,
    subtype: Optional[str] = None,
    sort: Optional[str] = None,
    order: Optional[str] = None,
) -> List[Dict]:
    """Convenience search to find research papers recorded on Zenodo by a single creator/author name."""
    papers = zenodo_searcher.search_by_creator(
        creator=creator,
        max_results=max_results,
        community=community,
        year=year,
        resource_type=resource_type,
        subtype=subtype,
        sort=sort,
        order=order,
    )
    return [p.to_dict() for p in papers] if papers else []
# Result of the one-off accessibility probe; None means "not probed yet".
_zenodo_accessible_cache = None


def check_zenodo_accessible():
    """Check if Zenodo API is accessible.

    The probe is sent at most once per process and its result reused: this
    function is evaluated by every ``skipUnless`` decorator below while the
    module is being imported, and again in ``setUpClass``.
    """
    global _zenodo_accessible_cache
    if _zenodo_accessible_cache is None:
        try:
            r = requests.get(
                "https://zenodo.org/api/records",
                params={"q": "*", "size": 1},
                timeout=8,
            )
            _zenodo_accessible_cache = r.status_code == 200
        except Exception:
            _zenodo_accessible_cache = False
    return _zenodo_accessible_cache


def _is_pdf_entry(file_info):
    """Return True when a ``list_files`` entry looks like a PDF."""
    return (file_info.get("mimetype") == "application/pdf") or str(
        file_info.get("key", "")
    ).lower().endswith(".pdf")


class TestZenodoSearcher(unittest.TestCase):
    """Live integration tests; each is skipped when Zenodo cannot be reached."""

    @classmethod
    def setUpClass(cls):
        cls.zenodo_accessible = check_zenodo_accessible()
        if not cls.zenodo_accessible:
            print("\nWarning: Zenodo is not accessible, some tests will be skipped")
        cls.searcher = ZenodoSearcher()

        # Try to locate a generic sample publication for deeper tests
        cls.sample_record_id = None
        cls.sample_pdf_available = False
        if cls.zenodo_accessible:
            try:
                papers = cls.searcher.search(
                    query="machine learning",
                    max_results=3,
                    resource_type="publication",
                    sort="mostrecent",
                )
                if papers:
                    cls.sample_record_id = str(papers[0].paper_id)
                    cls.sample_pdf_available = bool(papers[0].pdf_url)
                    print(f"Sample Zenodo record selected for tests: {cls.sample_record_id}")
            except Exception as e:
                print(f"Could not prefetch sample record: {e}")

    def setUp(self):
        self.searcher = self.__class__.searcher

    @unittest.skipUnless(check_zenodo_accessible(), "Zenodo not accessible")
    def test_search_basic(self):
        papers = self.searcher.search("control systems", max_results=3)
        self.assertIsInstance(papers, list)
        self.assertLessEqual(len(papers), 3)
        if papers:
            p = papers[0]
            self.assertEqual(p.source, "zenodo")
            self.assertTrue(hasattr(p, "title"))
            self.assertTrue(p.url.startswith("http"))

    @unittest.skipUnless(check_zenodo_accessible(), "Zenodo not accessible")
    def test_search_with_date_filter(self):
        papers = self.searcher.search(
            query="metadata.publication_date:[2025-01-01 TO 2025-12-31]",
            max_results=5,
            resource_type="publication",
            sort="mostrecent",
        )
        self.assertIsInstance(papers, list)
        for p in papers:
            self.assertEqual(p.source, "zenodo")
            # published_date should parse ISO 8601 or YYYY-MM-DD
            if p.published_date:
                self.assertIsInstance(p.published_date, datetime)

    @unittest.skipUnless(check_zenodo_accessible(), "Zenodo not accessible")
    def test_search_communities(self):
        communities = self.searcher.search_communities(query="open", max_results=10, sort="bestmatch")
        self.assertIsInstance(communities, list)
        if communities:
            # each community dict should have slug/title/links
            c = communities[0]
            self.assertIn("slug", c)
            self.assertIn("links", c)

    @unittest.skipUnless(check_zenodo_accessible(), "Zenodo not accessible")
    def test_get_record_details_and_list_files(self):
        if not self.__class__.sample_record_id:
            self.skipTest("No sample record id available")
        details = self.searcher.get_record_details(self.__class__.sample_record_id)
        self.assertTrue(details)
        self.assertIn("id", details)
        files = self.searcher.list_files(self.__class__.sample_record_id)
        self.assertIsInstance(files, list)
        # Non-fatal if no files or PDF not present
        if not any(_is_pdf_entry(f) for f in files):
            print("No PDF file listed for the sample record; proceeding without download test")

    @unittest.skipUnless(check_zenodo_accessible(), "Zenodo not accessible")
    def test_download_and_read_pdf_if_available(self):
        if not self.__class__.sample_record_id:
            self.skipTest("No sample record id available")
        # Check whether a PDF seems to be available
        files = self.searcher.list_files(self.__class__.sample_record_id)
        if not any(_is_pdf_entry(f) for f in files):
            self.skipTest("Sample record has no PDF to download")

        temp_dir = tempfile.mkdtemp(prefix="zenodo_test_")
        try:
            pdf_path = self.searcher.download_pdf(self.__class__.sample_record_id, temp_dir)
            # download_pdf returns a file path on success, or an error string
            if isinstance(pdf_path, str) and os.path.isfile(pdf_path):
                self.assertTrue(pdf_path.endswith('.pdf'))
                self.assertGreater(os.path.getsize(pdf_path), 1024)

                # Try reading text (best-effort)
                result = self.searcher.read_paper(self.__class__.sample_record_id, temp_dir)
                self.assertIsInstance(result, str)
                # Do not strictly assert text length due to varied PDFs
            else:
                print(f"PDF download not successful: {pdf_path}")
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

    @unittest.skipUnless(check_zenodo_accessible(), "Zenodo not accessible")
    def test_search_by_creator(self):
        # Best-effort; ensure call succeeds and returns a list
        results = self.searcher.search_by_creator("Hinton", max_results=3)
        self.assertIsInstance(results, list)
        if results:
            self.assertEqual(results[0].source, "zenodo")


if __name__ == "__main__":
    unittest.main()