|
| 1 | +use std::io::Write; |
| 2 | +use std::sync::LazyLock; |
1 | 3 | use std::{
|
2 | 4 | collections::HashMap,
|
3 | 5 | fs::File,
|
4 |
| - io::{BufRead, BufReader, Lines, Write}, |
5 |
| - iter::Enumerate, |
6 |
| - sync::LazyLock, |
| 6 | + io::{BufRead, BufReader}, |
7 | 7 | };
|
8 | 8 |
|
9 | 9 | use anyhow::{bail, Context, Result};
|
10 | 10 | use regex::{Captures, Regex};
|
| 11 | +use rusqlite::Transaction; |
11 | 12 | use types::{Channel, Source};
|
12 | 13 |
|
13 | 14 | use crate::{
|
14 |
| - log, media_type, source_type, sql, |
| 15 | + log, media_type, source_type, |
| 16 | + sql::{self, set_channel_group_id}, |
15 | 17 | types::{self, ChannelHttpHeaders},
|
16 | 18 | };
|
17 | 19 |
|
@@ -40,97 +42,96 @@ pub fn read_m3u8(mut source: Source, wipe: bool) -> Result<()> {
|
40 | 42 | };
|
41 | 43 | let file = File::open(path).context("Failed to open m3u8 file")?;
|
42 | 44 | let reader = BufReader::new(file);
|
43 |
| - let mut lines = reader.lines().enumerate(); |
44 |
| - let mut problematic_lines: usize = 0; |
45 |
| - let mut lines_count: usize = 0; |
| 45 | + let mut lines = reader.lines().enumerate().peekable(); |
46 | 46 | let mut groups: HashMap<String, i64> = HashMap::new();
|
47 | 47 | let mut sql = sql::get_conn()?;
|
48 | 48 | let tx = sql.transaction()?;
|
49 |
| - let mut found_first_valid_channel: bool = false; |
50 | 49 | if wipe {
|
51 | 50 | sql::wipe(&tx, source.id.context("no source id")?)?;
|
52 | 51 | } else {
|
53 | 52 | source.id = Some(sql::create_or_find_source_by_name(&tx, &source)?);
|
54 | 53 | }
|
55 |
| - while let (Some((c1, l1)), Some((c2, l2))) = (lines.next(), lines.next()) { |
56 |
| - lines_count = c2; |
57 |
| - let mut l1 = match l1.with_context(|| format!("(l1) Error on line: {c1}, skipping")) { |
58 |
| - Ok(line) => line, |
| 54 | + let mut channel_line: Option<String> = None; |
| 55 | + let mut channel_headers: Option<ChannelHttpHeaders> = None; |
| 56 | + let mut channel_headers_set: bool = false; |
| 57 | + let mut last_non_empty_line: Option<String> = None; |
| 58 | + while let Some((c1, l1)) = lines.next() { |
| 59 | + let l1 = match l1.with_context(|| format!("Failed to process line {c1}")) { |
| 60 | + Ok(r) => r, |
59 | 61 | Err(e) => {
|
60 | 62 | log::log(format!("{:?}", e));
|
61 |
| - problematic_lines += 1; |
62 | 63 | continue;
|
63 | 64 | }
|
64 | 65 | };
|
65 |
| - let mut l2 = match l2.with_context(|| format!("(l2) Error on line: {c2}, skipping")) { |
66 |
| - Ok(line) => line, |
67 |
| - Err(e) => { |
68 |
| - log::log(format!("{:?}", e)); |
69 |
| - problematic_lines += 1; |
70 |
| - continue; |
71 |
| - } |
72 |
| - }; |
73 |
| - while l1.trim().is_empty() |
74 |
| - || !(found_first_valid_channel || l1.to_lowercase().starts_with("#extinf")) |
75 |
| - { |
76 |
| - l1 = l2.clone(); |
77 |
| - if let Some(next) = lines.next() { |
78 |
| - let line_number = next.0; |
79 |
| - l2 = next.1.with_context(|| format!("Tried to skip empty/gibberish line (bad m3u mitigation), error on line {line_number}"))?; |
80 |
| - } else { |
81 |
| - break; |
| 66 | + let l1_upper = l1.to_uppercase(); |
| 67 | + if l1_upper.starts_with("#EXTINF") || lines.peek().is_none() { |
| 68 | + if let Some(channel) = channel_line { |
| 69 | + if !channel_headers_set { |
| 70 | + channel_headers = None; |
| 71 | + } |
| 72 | + commit_channel( |
| 73 | + channel, |
| 74 | + last_non_empty_line.take(), |
| 75 | + &mut groups, |
| 76 | + channel_headers.take(), |
| 77 | + source.id.context("missing source id")?, |
| 78 | + source.use_tvg_id, |
| 79 | + &tx, |
| 80 | + ) |
| 81 | + .with_context(|| format!("Failed to process channel ending at line {c1}")) |
| 82 | + .unwrap_or_else(|e| { |
| 83 | + log::log(format!("{:?}", e)); |
| 84 | + }); |
82 | 85 | }
|
83 |
| - } |
84 |
| - if !found_first_valid_channel { |
85 |
| - found_first_valid_channel = true; |
86 |
| - } |
87 |
| - let mut headers: Option<ChannelHttpHeaders> = None; |
88 |
| - if l2.starts_with("#EXTVLCOPT") { |
89 |
| - let (fail, _headers) = extract_headers(&mut l2, &mut lines)?; |
90 |
| - if fail { |
91 |
| - continue; |
| 86 | + channel_line = Some(l1); |
| 87 | + channel_headers_set = false; |
| 88 | + } else if l1_upper.starts_with("#EXTVLCOPT") { |
| 89 | + if channel_headers.is_none() { |
| 90 | + channel_headers = Some(ChannelHttpHeaders { |
| 91 | + ..Default::default() |
| 92 | + }); |
92 | 93 | }
|
93 |
| - headers = _headers; |
94 |
| - } |
95 |
| - let mut channel = match get_channel_from_lines( |
96 |
| - l1, |
97 |
| - l2, |
98 |
| - source.id.context("no source id")?, |
99 |
| - source.use_tvg_id, |
100 |
| - ) |
101 |
| - .with_context(|| format!("Failed to process lines #{c1} #{c2}, skipping")) |
102 |
| - { |
103 |
| - Ok(val) => val, |
104 |
| - Err(e) => { |
105 |
| - log::log(format!("{:?}", e)); |
106 |
| - problematic_lines += 2; |
107 |
| - continue; |
| 94 | + if set_http_headers(&l1, channel_headers.as_mut().context("no headers")?) { |
| 95 | + channel_headers_set = true; |
108 | 96 | }
|
109 |
| - }; |
110 |
| - sql::set_channel_group_id( |
111 |
| - &mut groups, |
112 |
| - &mut channel, |
113 |
| - &tx, |
114 |
| - &source.id.context("no source id")?, |
115 |
| - ) |
116 |
| - .unwrap_or_else(|e| log::log(format!("{:?}", e))); |
117 |
| - sql::insert_channel(&tx, channel)?; |
118 |
| - if let Some(mut headers) = headers { |
119 |
| - headers.channel_id = Some(tx.last_insert_rowid()); |
120 |
| - sql::insert_channel_headers(&tx, headers)?; |
| 97 | + } else if !l1.trim().is_empty() { |
| 98 | + last_non_empty_line = Some(l1); |
121 | 99 | }
|
122 | 100 | }
|
123 |
| - if problematic_lines > lines_count / 2 { |
124 |
| - tx.rollback() |
125 |
| - .unwrap_or_else(|e| log::log(format!("{:?}", e))); |
126 |
| - return Err(anyhow::anyhow!( |
127 |
| - "Too many problematic lines, read considered failed" |
128 |
| - )); |
129 |
| - } |
| 101 | + |
130 | 102 | tx.commit()?;
|
131 | 103 | Ok(())
|
132 | 104 | }
|
133 | 105 |
|
| 106 | +fn commit_channel( |
| 107 | + channel_line: String, |
| 108 | + last_line: Option<String>, |
| 109 | + groups: &mut HashMap<String, i64>, |
| 110 | + headers: Option<ChannelHttpHeaders>, |
| 111 | + source_id: i64, |
| 112 | + use_tvg_id: Option<bool>, |
| 113 | + tx: &Transaction, |
| 114 | +) -> Result<()> { |
| 115 | + let mut channel = get_channel_from_lines( |
| 116 | + channel_line, |
| 117 | + last_line.context("missing last line")?, |
| 118 | + source_id, |
| 119 | + use_tvg_id, |
| 120 | + )?; |
| 121 | + set_channel_group_id(groups, &mut channel, tx, &source_id).unwrap_or_else(|e| { |
| 122 | + log::log(format!( |
| 123 | + "Failed to set group id for channel: {}, Error: {:?}", |
| 124 | + channel.name, e |
| 125 | + )) |
| 126 | + }); |
| 127 | + sql::insert_channel(tx, channel)?; |
| 128 | + if let Some(mut headers) = headers { |
| 129 | + headers.channel_id = Some(tx.last_insert_rowid()); |
| 130 | + sql::insert_channel_headers(tx, headers)?; |
| 131 | + } |
| 132 | + Ok(()) |
| 133 | +} |
| 134 | + |
134 | 135 | pub async fn get_m3u8_from_link(source: Source, wipe: bool) -> Result<()> {
|
135 | 136 | let client = reqwest::Client::new();
|
136 | 137 | let url = source.url.clone().context("Invalid source")?;
|
@@ -161,46 +162,6 @@ fn extract_non_empty_capture(caps: Captures) -> Option<String> {
|
161 | 162 | .filter(|s| !s.trim().is_empty())
|
162 | 163 | }
|
163 | 164 |
|
164 |
| -fn extract_headers( |
165 |
| - l2: &mut String, |
166 |
| - lines: &mut Enumerate<Lines<BufReader<File>>>, |
167 |
| -) -> Result<(bool, Option<ChannelHttpHeaders>)> { |
168 |
| - let mut headers = ChannelHttpHeaders { |
169 |
| - id: None, |
170 |
| - channel_id: None, |
171 |
| - http_origin: None, |
172 |
| - referrer: None, |
173 |
| - user_agent: None, |
174 |
| - ignore_ssl: None, |
175 |
| - }; |
176 |
| - let mut at_least_one: bool = false; |
177 |
| - while l2.starts_with("#EXTVLCOPT") { |
178 |
| - let result = set_http_headers(&l2, &mut headers); |
179 |
| - if result && !at_least_one { |
180 |
| - at_least_one = true; |
181 |
| - } |
182 |
| - let result = lines.next().context("EOF?")?; |
183 |
| - if let Ok(line) = result.1 { |
184 |
| - l2.clear(); |
185 |
| - l2.push_str(&line); |
186 |
| - } else { |
187 |
| - log::log(format!( |
188 |
| - "{:?}", |
189 |
| - result |
190 |
| - .1 |
191 |
| - .context(format!("Failed to get line at {}", result.0)) |
192 |
| - .unwrap_err() |
193 |
| - )); |
194 |
| - return Ok((true, None)); |
195 |
| - } |
196 |
| - } |
197 |
| - if at_least_one { |
198 |
| - return Ok((false, Some(headers))); |
199 |
| - } else { |
200 |
| - return Ok((true, None)); |
201 |
| - } |
202 |
| -} |
203 |
| - |
204 | 165 | fn set_http_headers(line: &str, headers: &mut ChannelHttpHeaders) -> bool {
|
205 | 166 | if let Some(origin) = HTTP_ORIGIN_REGEX
|
206 | 167 | .captures(&line)
|
|
0 commit comments