1
+ use std:: io:: Write ;
2
+ use std:: sync:: LazyLock ;
1
3
use std:: {
2
4
collections:: HashMap ,
3
5
fs:: File ,
4
- io:: { BufRead , BufReader , Lines , Write } ,
5
- iter:: Enumerate ,
6
- sync:: LazyLock ,
6
+ io:: { BufRead , BufReader } ,
7
7
} ;
8
8
9
9
use anyhow:: { bail, Context , Result } ;
10
10
use regex:: { Captures , Regex } ;
11
+ use rusqlite:: Transaction ;
11
12
use types:: { Channel , Source } ;
12
13
13
14
use crate :: {
14
- log, media_type, source_type, sql,
15
+ log, media_type, source_type,
16
+ sql:: { self , set_channel_group_id} ,
15
17
types:: { self , ChannelHttpHeaders } ,
16
18
} ;
17
19
@@ -33,6 +35,17 @@ static HTTP_REFERRER_REGEX: LazyLock<Regex> =
33
35
static HTTP_USER_AGENT_REGEX : LazyLock < Regex > =
34
36
LazyLock :: new ( || Regex :: new ( r#"http-user-agent=(?P<user_agent>.+)"# ) . unwrap ( ) ) ;
35
37
38
+ struct M3UProcessing {
39
+ channel_line : Option < String > ,
40
+ channel_headers : Option < ChannelHttpHeaders > ,
41
+ channel_headers_set : bool ,
42
+ last_non_empty_line : Option < String > ,
43
+ groups : HashMap < String , i64 > ,
44
+ source_id : i64 ,
45
+ use_tvg_id : Option < bool > ,
46
+ line_count : usize ,
47
+ }
48
+
36
49
pub fn read_m3u8 ( mut source : Source , wipe : bool ) -> Result < ( ) > {
37
50
let path = match source. source_type {
38
51
source_type:: M3U_LINK => get_tmp_path ( ) ,
@@ -41,93 +54,110 @@ pub fn read_m3u8(mut source: Source, wipe: bool) -> Result<()> {
41
54
let file = File :: open ( path) . context ( "Failed to open m3u8 file" ) ?;
42
55
let reader = BufReader :: new ( file) ;
43
56
let mut lines = reader. lines ( ) . enumerate ( ) ;
44
- let mut problematic_lines: usize = 0 ;
45
- let mut lines_count: usize = 0 ;
46
- let mut groups: HashMap < String , i64 > = HashMap :: new ( ) ;
47
57
let mut sql = sql:: get_conn ( ) ?;
48
58
let tx = sql. transaction ( ) ?;
49
- let mut found_first_valid_channel: bool = false ;
50
59
if wipe {
51
60
sql:: wipe ( & tx, source. id . context ( "no source id" ) ?) ?;
52
61
} else {
53
62
source. id = Some ( sql:: create_or_find_source_by_name ( & tx, & source) ?) ;
54
63
}
55
- while let ( Some ( ( c1, l1) ) , Some ( ( c2, l2) ) ) = ( lines. next ( ) , lines. next ( ) ) {
56
- lines_count = c2;
57
- let mut l1 = match l1. with_context ( || format ! ( "(l1) Error on line: {c1}, skipping" ) ) {
58
- Ok ( line) => line,
64
+ let mut processing = M3UProcessing {
65
+ channel_headers : None ,
66
+ channel_headers_set : false ,
67
+ channel_line : None ,
68
+ groups : HashMap :: new ( ) ,
69
+ last_non_empty_line : None ,
70
+ source_id : source. id . context ( "no source id" ) ?,
71
+ use_tvg_id : source. use_tvg_id ,
72
+ line_count : 0 ,
73
+ } ;
74
+ while let Some ( ( c1, l1) ) = lines. next ( ) {
75
+ processing. line_count = c1;
76
+ let l1 = match l1. with_context ( || format ! ( "Failed to process line {c1}" ) ) {
77
+ Ok ( r) => r,
59
78
Err ( e) => {
60
79
log:: log ( format ! ( "{:?}" , e) ) ;
61
- problematic_lines += 1 ;
62
80
continue ;
63
81
}
64
82
} ;
65
- let mut l2 = match l2. with_context ( || format ! ( "(l2) Error on line: {c2}, skipping" ) ) {
66
- Ok ( line) => line,
67
- Err ( e) => {
68
- log:: log ( format ! ( "{:?}" , e) ) ;
69
- problematic_lines += 1 ;
70
- continue ;
83
+ let l1_upper = l1. to_uppercase ( ) ;
84
+ if l1_upper. starts_with ( "#EXTINF" ) {
85
+ try_commit_channel ( & mut processing, & tx) ;
86
+ processing. channel_line = Some ( l1) ;
87
+ processing. channel_headers_set = false ;
88
+ } else if l1_upper. starts_with ( "#EXTVLCOPT" ) {
89
+ if processing. channel_headers . is_none ( ) {
90
+ processing. channel_headers = Some ( ChannelHttpHeaders {
91
+ ..Default :: default ( )
92
+ } ) ;
71
93
}
72
- } ;
73
- while l1. trim ( ) . is_empty ( )
74
- || !( found_first_valid_channel || l1. to_lowercase ( ) . starts_with ( "#extinf" ) )
75
- {
76
- l1 = l2. clone ( ) ;
77
- if let Some ( next) = lines. next ( ) {
78
- let line_number = next. 0 ;
79
- l2 = next. 1 . with_context ( || format ! ( "Tried to skip empty/gibberish line (bad m3u mitigation), error on line {line_number}" ) ) ?;
80
- } else {
81
- break ;
94
+ if set_http_headers (
95
+ & l1,
96
+ processing. channel_headers . as_mut ( ) . context ( "no headers" ) ?,
97
+ ) {
98
+ processing. channel_headers_set = true ;
82
99
}
100
+ } else if !l1. trim ( ) . is_empty ( ) {
101
+ processing. last_non_empty_line = Some ( l1) ;
83
102
}
84
- if !found_first_valid_channel {
85
- found_first_valid_channel = true ;
86
- }
87
- let mut headers : Option < ChannelHttpHeaders > = None ;
88
- if l2 . starts_with ( "#EXTVLCOPT" ) {
89
- let ( fail , _headers ) = extract_headers ( & mut l2 , & mut lines ) ? ;
90
- if fail {
91
- continue ;
92
- }
93
- headers = _headers ;
103
+ }
104
+ try_commit_channel ( & mut processing , & tx ) ;
105
+ tx . commit ( ) ? ;
106
+ Ok ( ( ) )
107
+ }
108
+
109
+ fn try_commit_channel ( processing : & mut M3UProcessing , tx : & Transaction ) {
110
+ if let Some ( channel ) = processing . channel_line . take ( ) {
111
+ if !processing . channel_headers_set {
112
+ processing . channel_headers = None ;
94
113
}
95
- let mut channel = match get_channel_from_lines (
96
- l1,
97
- l2,
98
- source. id . context ( "no source id" ) ?,
99
- source. use_tvg_id ,
100
- )
101
- . with_context ( || format ! ( "Failed to process lines #{c1} #{c2}, skipping" ) )
102
- {
103
- Ok ( val) => val,
104
- Err ( e) => {
105
- log:: log ( format ! ( "{:?}" , e) ) ;
106
- problematic_lines += 2 ;
107
- continue ;
108
- }
109
- } ;
110
- sql:: set_channel_group_id (
111
- & mut groups,
112
- & mut channel,
114
+ commit_channel (
115
+ channel,
116
+ processing. last_non_empty_line . take ( ) ,
117
+ & mut processing. groups ,
118
+ processing. channel_headers . take ( ) ,
119
+ processing. source_id ,
120
+ processing. use_tvg_id ,
113
121
& tx,
114
- & source. id . context ( "no source id" ) ?,
115
122
)
116
- . unwrap_or_else ( |e| log:: log ( format ! ( "{:?}" , e) ) ) ;
117
- sql:: insert_channel ( & tx, channel) ?;
118
- if let Some ( mut headers) = headers {
119
- headers. channel_id = Some ( tx. last_insert_rowid ( ) ) ;
120
- sql:: insert_channel_headers ( & tx, headers) ?;
121
- }
123
+ . with_context ( || {
124
+ format ! (
125
+ "Failed to process channel ending at line {}" ,
126
+ processing. line_count
127
+ )
128
+ } )
129
+ . unwrap_or_else ( |e| {
130
+ log:: log ( format ! ( "{:?}" , e) ) ;
131
+ } ) ;
122
132
}
123
- if problematic_lines > lines_count / 2 {
124
- tx. rollback ( )
125
- . unwrap_or_else ( |e| log:: log ( format ! ( "{:?}" , e) ) ) ;
126
- return Err ( anyhow:: anyhow!(
127
- "Too many problematic lines, read considered failed"
128
- ) ) ;
133
+ }
134
+
135
+ fn commit_channel (
136
+ channel_line : String ,
137
+ last_line : Option < String > ,
138
+ groups : & mut HashMap < String , i64 > ,
139
+ headers : Option < ChannelHttpHeaders > ,
140
+ source_id : i64 ,
141
+ use_tvg_id : Option < bool > ,
142
+ tx : & Transaction ,
143
+ ) -> Result < ( ) > {
144
+ let mut channel = get_channel_from_lines (
145
+ channel_line,
146
+ last_line. context ( "missing last line" ) ?,
147
+ source_id,
148
+ use_tvg_id,
149
+ ) ?;
150
+ set_channel_group_id ( groups, & mut channel, tx, & source_id) . unwrap_or_else ( |e| {
151
+ log:: log ( format ! (
152
+ "Failed to set group id for channel: {}, Error: {:?}" ,
153
+ channel. name, e
154
+ ) )
155
+ } ) ;
156
+ sql:: insert_channel ( tx, channel) ?;
157
+ if let Some ( mut headers) = headers {
158
+ headers. channel_id = Some ( tx. last_insert_rowid ( ) ) ;
159
+ sql:: insert_channel_headers ( tx, headers) ?;
129
160
}
130
- tx. commit ( ) ?;
131
161
Ok ( ( ) )
132
162
}
133
163
@@ -161,46 +191,6 @@ fn extract_non_empty_capture(caps: Captures) -> Option<String> {
161
191
. filter ( |s| !s. trim ( ) . is_empty ( ) )
162
192
}
163
193
164
- fn extract_headers (
165
- l2 : & mut String ,
166
- lines : & mut Enumerate < Lines < BufReader < File > > > ,
167
- ) -> Result < ( bool , Option < ChannelHttpHeaders > ) > {
168
- let mut headers = ChannelHttpHeaders {
169
- id : None ,
170
- channel_id : None ,
171
- http_origin : None ,
172
- referrer : None ,
173
- user_agent : None ,
174
- ignore_ssl : None ,
175
- } ;
176
- let mut at_least_one: bool = false ;
177
- while l2. starts_with ( "#EXTVLCOPT" ) {
178
- let result = set_http_headers ( & l2, & mut headers) ;
179
- if result && !at_least_one {
180
- at_least_one = true ;
181
- }
182
- let result = lines. next ( ) . context ( "EOF?" ) ?;
183
- if let Ok ( line) = result. 1 {
184
- l2. clear ( ) ;
185
- l2. push_str ( & line) ;
186
- } else {
187
- log:: log ( format ! (
188
- "{:?}" ,
189
- result
190
- . 1
191
- . context( format!( "Failed to get line at {}" , result. 0 ) )
192
- . unwrap_err( )
193
- ) ) ;
194
- return Ok ( ( true , None ) ) ;
195
- }
196
- }
197
- if at_least_one {
198
- return Ok ( ( false , Some ( headers) ) ) ;
199
- } else {
200
- return Ok ( ( true , None ) ) ;
201
- }
202
- }
203
-
204
194
fn set_http_headers ( line : & str , headers : & mut ChannelHttpHeaders ) -> bool {
205
195
if let Some ( origin) = HTTP_ORIGIN_REGEX
206
196
. captures ( & line)
0 commit comments