@@ -182,8 +182,7 @@ def _save_jsonl(documents, output_path, start_index=0, max_index=10000, prefix=N
     """Worker function to write out the data to jsonl files"""

     def _output_json(document):
-        myjson = json.dumps(document, ensure_ascii=False)
-        return myjson.encode("utf-8")
+        return document.strip().encode('utf-8')

     def _name(start_index, npad, prefix, i):
         tag = str(start_index + i).rjust(npad, "0")
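Why the `json.dumps` round trip can go away (this pairs with the `read_text` change further down): each bag element is now the raw JSONL line rather than a parsed dict, so stripping and encoding passes the record through unchanged. A minimal sketch of the equivalence, using an illustrative record that is not from the PR:

```python
# Minimal sketch: the old path parsed and re-serialized every line;
# the new path passes the raw bytes through untouched.
import json

line = '{"text": "example", "id": 7}\n'  # one raw JSONL line, as read_text yields it

old = json.dumps(json.loads(line), ensure_ascii=False).encode("utf-8")  # old path
new = line.strip().encode('utf-8')  # new path: no parse/serialize round trip

assert json.loads(old) == json.loads(new)  # same record either way
```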
@@ -195,11 +194,19 @@ def _name(start_index, npad, prefix, i):

     output_glob_string = os.path.join(output_path, "*.jsonl")

-    documents.map(_output_json).to_textfiles(
+    output_files = documents.map(_output_json).to_textfiles(
         output_glob_string,
         name_function=name,
     )

+    # Delete empty files generated due to empty partitions in the bag
+    for output_file in output_files:
+        try:
+            if os.path.getsize(output_file) == 0:
+                os.remove(output_file)
+        except Exception as exception:
+            print(f"An exception occurred when trying to delete {output_file}.\n{exception}", flush=True)
+

 def reshard_jsonl(
     input_dir, output_dir, output_file_size="100M", start_index=0, file_prefix=""
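The cleanup pass relies on `to_textfiles` returning one path per partition: it writes a file even for an empty partition, leaving a zero-byte shard behind. A standalone sketch of that behavior, under assumed toy data and a throwaway temp directory:

```python
# Hedged sketch: a filter empties one partition, which still produces a
# zero-byte output file that the cleanup loop then removes.
import os
import tempfile

import dask.bag as db

outdir = tempfile.mkdtemp()

# Four records in two partitions; the filter empties the second partition.
bag = db.from_sequence(["a", "b", "c", "d"], npartitions=2).filter(
    lambda x: x in ("a", "b")
)

# to_textfiles returns one path per partition, including empty ones.
written = bag.to_textfiles(os.path.join(outdir, "*.jsonl"))

for path in written:
    if os.path.getsize(path) == 0:
        os.remove(path)  # drop the shard produced by the empty partition

print(sorted(os.listdir(outdir)))  # only the non-empty shard remains
```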
@@ -222,7 +229,7 @@ def reshard_jsonl(
     input_files = list(get_all_files_paths_under(input_dir))

     # Read in the dask bag
-    b = db.read_text(input_files, blocksize=blocksize).map(json.loads)
+    b = db.read_text(input_files, blocksize=blocksize)

     # Prepare the output
     output_dir = expand_outdir_and_mkdir(output_dir)
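With `.map(json.loads)` dropped, `read_text` yields raw lines with their trailing newlines included, which is exactly what the `.strip()` in `_output_json` removes before re-encoding. A small sketch using an assumed throwaway input file:

```python
# Hedged sketch: read_text without .map(json.loads) yields strings, not dicts,
# and each line keeps its trailing newline.
import os
import tempfile

import dask.bag as db

path = os.path.join(tempfile.mkdtemp(), "input.jsonl")
with open(path, "w") as f:
    f.write('{"id": 1}\n{"id": 2}\n')

bag = db.read_text(path, blocksize="128MiB")
print(bag.take(2))  # ('{"id": 1}\n', '{"id": 2}\n'), raw JSON lines
```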