2
2
import tempfile
3
3
4
4
from django .conf import settings
5
+ from django_q .tasks import async_task
5
6
7
+ from documents import tasks
8
+ from documents .consumer import ConsumerError
6
9
from documents .models import Document
7
10
8
11
from pikepdf import Pdf
@@ -12,55 +15,123 @@ class MergeError(Exception):
12
15
pass
13
16
14
17
15
- def execute_split_merge_plan ( plan , metadata : str , delete_source : bool , preview : bool ) :
18
class PdfCache:
    """Cache of open pikepdf ``Pdf`` handles, keyed by document primary key.

    Avoids re-opening the same source document when it contributes pages to
    several merge targets. Call :meth:`close_all` when finished so the file
    handles are released.
    """

    def __init__(self):
        # Maps Document.pk -> open Pdf handle.
        self.cache = {}

    def open_from_document(self, document: "Document") -> "Pdf":
        """Return an open ``Pdf`` for *document*, opening it on first use.

        Uses the original file when it already is a PDF, otherwise falls
        back to the archived PDF version.

        Raises:
            MergeError: if the document has no PDF representation or the
                file is missing on disk.
        """
        if document.pk in self.cache:
            return self.cache[document.pk]

        if document.mime_type == 'application/pdf':
            filename = document.source_path
        elif document.has_archive_version:
            filename = document.archive_path
        else:
            raise MergeError(
                f"Document {document.pk} has no PDF version to merge."
            )

        if not os.path.exists(filename):
            raise MergeError(f"File {filename} does not exist on disk.")

        pdf = Pdf.open(filename)
        self.cache[document.pk] = pdf

        return pdf

    def close_all(self):
        """Close every cached ``Pdf`` and empty the cache."""
        for pdf in self.cache.values():
            pdf.close()

        self.cache.clear()
35
47
36
- try :
37
- source_document : Document = Document .objects .get (id = source_document_id )
38
- except Document .DoesNotExist :
39
- raise MergeError ()
40
48
41
- if source_document .mime_type == 'application/pdf' :
42
- source_pdf : Pdf = Pdf .open (source_document .source_path )
43
- elif source_document .has_archive_version :
44
- source_pdf : Pdf = Pdf .open (source_document .archive_path )
45
- else :
46
- raise MergeError ()
49
def consume_many_files(kwargs_list, delete_document_ids=None):
    """Consume several files in sequence, all-or-nothing.

    Each entry of *kwargs_list* is passed as keyword arguments to
    ``tasks.consume_file``. If any consumption fails with a
    ``ConsumerError``, every document created so far is deleted and the
    error is re-raised. On full success, the documents listed in
    *delete_document_ids* (if any) are deleted.
    """
    created_ids = []

    try:
        for consume_kwargs in kwargs_list:
            created_ids.append(tasks.consume_file(**consume_kwargs))
    except ConsumerError:
        # Roll back: remove every document created before the failure.
        for created_id in created_ids:
            Document.objects.get(id=created_id).delete()
        raise
    else:
        # Success path: optionally remove the source documents.
        for source_id in (delete_document_ids or []):
            Document.objects.get(id=source_id).delete()
59
66
60
- if not preview :
61
- pass
62
67
63
- if delete_source :
64
- pass
68
def execute_split_merge_plan(plan, tempdir: str, metadata: str = "redo", delete_source: bool = False, preview: bool = True):
    """Build merged/split PDF files according to *plan*.

    Args:
        plan: list of target document specs. Each target is a list of
            ``{"document": <id>, "pages": [<zero-based page>, ...]}``
            source specs; ``"pages"`` is optional and defaults to all pages.
        tempdir: directory that receives the generated PDF files.
        metadata: ``"copy_first"`` copies correspondent, document type and
            tags from the first source document of each target; any other
            value copies only the title.
        delete_source: when True (and not *preview*), the async consumer
            deletes all source documents after successful consumption.
        preview: when True, only produce the files; do not enqueue
            consumption.

    Returns:
        The base names of the generated PDF files, one per target.

    Raises:
        MergeError: if a source document does not exist or a requested
            page is out of range.
    """
    consume_tasks = []
    cache = PdfCache()
    source_documents = set()

    try:
        for target_document_spec in plan:
            # Create a new PDF from the documents in target_document_spec.
            target_pdf: Pdf = Pdf.new()
            version = target_pdf.pdf_version

            for source_document_spec in target_document_spec:
                source_document_id = source_document_spec['document']
                source_documents.add(source_document_id)

                pages = source_document_spec.get('pages')

                try:
                    source_document: Document = Document.objects.get(id=source_document_id)
                except Document.DoesNotExist:
                    raise MergeError(
                        f"Document {source_document_id} does not exist."
                    )

                source_pdf: Pdf = cache.open_from_document(source_document)
                # The output must use the highest PDF version of all
                # contributing sources.
                version = max(version, source_pdf.pdf_version)

                if pages is not None:
                    for page in pages:
                        if page >= len(source_pdf.pages):
                            raise MergeError(
                                f"Page {page} is out of range of document "
                                f"{source_document_id}."
                            )
                        target_pdf.pages.append(source_pdf.pages[page])
                else:
                    target_pdf.pages.extend(source_pdf.pages)

            # delete=False and an explicit close: the default auto-delete
            # would remove the file as soon as the handle is closed or
            # garbage-collected, before Pdf.save() writes to the name.
            with tempfile.NamedTemporaryFile(suffix="_pdf", dir=tempdir, delete=False) as f:
                target_pdf_filename = f.name
            target_pdf.remove_unreferenced_resources()
            target_pdf.save(target_pdf_filename, min_version=version)
            target_pdf.close()

            consume_task = {"path": target_pdf_filename}

            first_id = target_document_spec[0]["document"]
            first_doc: Document = Document.objects.get(id=first_id)

            consume_task["override_title"] = first_doc.title

            if metadata == "copy_first":
                if first_doc.correspondent:
                    consume_task["override_correspondent_id"] = first_doc.correspondent.id
                if first_doc.document_type:
                    # Fixed: previously copied document_type.hidden instead
                    # of the id, producing a bogus override value.
                    consume_task["override_document_type_id"] = first_doc.document_type.id
                if first_doc.tags.count() > 0:
                    # .all() is required: a Django many-to-many manager is
                    # not itself iterable.
                    consume_task["override_tag_ids"] = [tag.id for tag in first_doc.tags.all()]

            consume_tasks.append(consume_task)
    finally:
        # Release cached source PDFs even when a MergeError aborts the plan.
        cache.close_all()

    if not preview:
        async_task(
            "documents.merge.consume_many_files",
            kwargs_list=consume_tasks,
            delete_document_ids=list(source_documents) if delete_source else None
        )

    return [os.path.basename(t["path"]) for t in consume_tasks]
0 commit comments