transform.py
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import time
from typing import Any

import pyarrow as pa
from data_processing.data_access import DataAccessLocal
from data_processing.transform import AbstractTableTransform
from data_processing.utils import get_logger
from dpk_connector import crawl
from dpk_web2parquet.utils import get_file_info

user_agent = "Mozilla/5.0 (X11; Linux i686; rv:125.0) Gecko/20100101 Firefox/125.0"
logger = get_logger(__name__, "DEBUG")

class Web2ParquetTransform(AbstractTableTransform):
    """
    Crawl the web and load content into a pyarrow Table.
    """

    def __init__(self, config: dict[str, Any]):
        """
        Initialize based on the dictionary of configuration information.
        Example:
            config = {'urls': ['https://thealliance.ai/'], 'depth': 1, 'downloads': 1}
            Web2ParquetTransform(config)
        or, via the keyword-argument convenience subclass:
            Web2Parquet(urls=['https://thealliance.ai/'], depth=1, downloads=1)
        """
        # Make sure that each param name corresponds to the name used in the
        # apply_input_params method of the transform configuration class.
        logger.debug(f"Received configuration: {config}")
        super().__init__(config)
        self.seed_urls = config.get("urls", [])
        self.depth = config.get("depth", 1)
        self.downloads = config.get("downloads", 10)
        self.allow_mime_types = config.get(
            "mime_types",
            ["application/pdf", "text/html", "text/markdown", "text/plain"],
        )
        self.folder = config.get("folder", None)
        assert self.seed_urls, "Must specify a list of URLs to crawl. URLs cannot be None"
        # Users may be tempted to provide a single URL; wrap it in a list of one.
        if not isinstance(self.seed_urls, list):
            self.seed_urls = [self.seed_urls]
        self.count = 0
        self.docs = []
    def on_download(self, url: str, body: bytes, headers: dict) -> None:
        """
        Callback function called when a page has been downloaded.
        You have access to the request URL, response body and headers.
        """
        doc = get_file_info(url, headers)
        doc['url'] = url
        doc['contents'] = body
        logger.debug(f"url: {doc['url']}, filename: {doc['filename']}, content_type: {doc['content_type']}")
        # Enforce the download limit
        if len(self.docs) < self.downloads:
            self.docs.append(doc)
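
        # Note (illustrative, not a library guarantee): each record appended above
        # is a plain dict of the form
        #     {'url': ..., 'filename': ..., 'content_type': ..., 'contents': b'...'}
        # where 'filename' and 'content_type' are filled in by get_file_info().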
    def transform(self, table: pa.Table = None, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
        """
        Crawl the configured seed URLs and convert the downloaded documents
        into a single pyarrow Table. Also returns a dictionary of arbitrary
        execution statistics.
        """
        start_time = time.time()
        crawl(
            self.seed_urls,
            self.on_download,
            user_agent=user_agent,
            depth_limit=self.depth,
            download_limit=self.downloads,
            allow_mime_types=self.allow_mime_types,
        )  # blocking call
        end_time = time.time()

        table = pa.Table.from_pylist(self.docs)
        metadata = {
            "count": len(self.docs),
            "requested_seeds": len(self.seed_urls),
            "requested_depth": self.depth,
            "requested_downloads": self.downloads,
        }
        logger.info(f"Crawling completed in {end_time - start_time:.2f} seconds")
        logger.info(f"{metadata = }")

        #############################################################################
        # The same transform can also be used to store crawled files to a local folder
        if self.folder:
            dao = DataAccessLocal(local_config={'output_folder': self.folder, 'input_folder': '.'})
            for x in self.docs:
                dao.save_file(self.folder + '/' + x['filename'], x['contents'])
        return [table], metadata
class Web2Parquet(Web2ParquetTransform):
    """
    Crawl the web and load content into a pyarrow Table,
    accepting the configuration as keyword arguments.
    """

    def __init__(self, **kwargs):
        super().__init__(dict(kwargs))
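

# A minimal usage sketch, not part of the original file: it assumes the package
# layout imported above and writes the resulting table out with pyarrow. The
# seed URL and output path are illustrative placeholders.
if __name__ == "__main__":
    import pyarrow.parquet as pq

    # Crawl one site, one level deep, keeping at most 5 downloaded documents.
    transform = Web2Parquet(urls=['https://thealliance.ai/'], depth=1, downloads=5)
    tables, metadata = transform.transform()

    # Persist the crawled documents; each row holds url, filename,
    # content_type and the raw contents bytes.
    pq.write_table(tables[0], 'crawl_output.parquet')
    logger.info(f"Wrote {metadata['count']} documents to crawl_output.parquet")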