Skip to content

Commit

Permalink
Updates and refactors
Browse files Browse the repository at this point in the history
Channel_file.go - fix issue with segments not correctly ending when they were supposed to

Log_type.go - moved log type to it's own file, setup global logging (touches on issue teacat#47)

Main.go - added update_log_level handler, setting global log level

Channel.go, channel_internal.go, channel_util.go - updated to use new log_type

Manager.go - updated to use new log_type, update from .com to .global (issue teacat#74)

Channel_update.go, create_channel.go, delete_channel.go, get_channel.go, get_settings.go, listen_update.go, pause_channel.go, resume_channel.go, terminal_program.go - go fmt / go vet

Chaturbate_channels.json.sample - added sample json of the channels file, for mapping in docker config

List_channels.go - refactored to sort by online status, so online is always at the first ones you see

Script.js - adjust default settings, added pagination, added global log logic

Index.html - updated to use online version of tocas ui, added pagination, added global log logic, visual improvements

Removal of local tocas folder since using online version
  • Loading branch information
J0nDoe committed Oct 22, 2024
1 parent 9fb2916 commit 3bdae1b
Show file tree
Hide file tree
Showing 14 changed files with 998 additions and 697 deletions.
20 changes: 10 additions & 10 deletions chaturbate/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Channel struct {
IsPaused bool
isStopped bool
Logs []string
logType logType
LogType LogType

bufferLock sync.Mutex
buffer map[int][]byte
Expand All @@ -61,32 +61,32 @@ type Channel struct {
// Run
func (w *Channel) Run() {
if w.Username == "" {
w.log(logTypeError, "username is empty, use `-u USERNAME` to specify")
w.log(LogTypeError, "username is empty, use `-u USERNAME` to specify")
return
}

for {
if w.IsPaused {
w.log(logTypeInfo, "channel is paused")
w.log(LogTypeInfo, "channel is paused")
<-w.ResumeChannel // blocking
w.log(logTypeInfo, "channel is resumed")
w.log(LogTypeInfo, "channel is resumed")
}
if w.isStopped {
w.log(logTypeInfo, "channel is stopped")
w.log(LogTypeInfo, "channel is stopped")
break
}

body, err := w.requestChannelBody()
if err != nil {
w.log(logTypeError, "body request error: %w", err)
w.log(LogTypeError, "body request error: %v", err)
}
if strings.Contains(body, "playlist.m3u8") {
w.IsOnline = true
w.LastStreamedAt = time.Now().Format("2006-01-02 15:04:05")
w.log(logTypeInfo, "channel is online, start fetching...")
w.log(LogTypeInfo, "channel is online, start fetching...")

if err := w.record(body); err != nil { // blocking
w.log(logTypeError, "record error: %w", err)
w.log(LogTypeError, "record error: %v", err)
}
continue // this excutes when recording is over/interrupted
}
Expand All @@ -95,11 +95,11 @@ func (w *Channel) Run() {
// close file when offline so user can move/delete it
if w.file != nil {
if err := w.releaseFile(); err != nil {
w.log(logTypeError, "release file: %w", err)
w.log(LogTypeError, "release file: %v", err)
}
}

w.log(logTypeInfo, "channel is offline, check again %d min(s) later", w.Interval)
w.log(LogTypeInfo, "channel is offline, check again %d min(s) later", w.Interval)
<-time.After(time.Duration(w.Interval) * time.Minute) // minutes cooldown to check online status
}
}
Expand Down
60 changes: 36 additions & 24 deletions chaturbate/channel_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ import (
"time"
)

// filename
// filename generates the filename based on the session pattern and current split index.
func (w *Channel) filename() (string, error) {
data := w.sessionPattern
if data == nil {
data = map[string]any{
if w.sessionPattern == nil {
w.sessionPattern = map[string]any{
"Username": w.Username,
"Year": time.Now().Format("2006"),
"Month": time.Now().Format("01"),
Expand All @@ -23,69 +22,82 @@ func (w *Channel) filename() (string, error) {
"Second": time.Now().Format("05"),
"Sequence": 0,
}
w.sessionPattern = data
} else {
data["Sequence"] = w.splitIndex
}
t, err := template.New("filename").Parse(w.filenamePattern)

w.sessionPattern["Sequence"] = w.splitIndex

var buf bytes.Buffer
tmpl, err := template.New("filename").Parse(w.filenamePattern)
if err != nil {
return "", err
return "", fmt.Errorf("filename pattern error: %w", err)
}
var buf bytes.Buffer
if err := t.Execute(&buf, data); err != nil {
return "", err
if err := tmpl.Execute(&buf, w.sessionPattern); err != nil {
return "", fmt.Errorf("template execution error: %w", err)
}

return buf.String(), nil
}

// newFile
// newFile creates a new file and prepares it for writing stream data.
func (w *Channel) newFile() error {
filename, err := w.filename()
if err != nil {
return fmt.Errorf("filename pattern error: %w", err)
return err
}

if err := os.MkdirAll(filepath.Dir(filename), 0777); err != nil {
return fmt.Errorf("create folder: %w", err)
}

file, err := os.OpenFile(filename+".ts", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0777)
if err != nil {
return fmt.Errorf("cannot open file: %s: %w", filename, err)
}
w.log(logTypeInfo, "the stream will be saved as %s.ts", filename)

w.log(LogTypeInfo, "the stream will be saved as %s.ts", filename)
w.file = file
return nil
}

// releaseFile
// releaseFile closes the current file and removes it if empty.
func (w *Channel) releaseFile() error {
if w.file == nil {
return nil
}
// close the file to remove it

if err := w.file.Close(); err != nil {
return fmt.Errorf("close file: %s: %w", w.file.Name(), err)
}
// remove it if it was empty
if w.SegmentFilesize == 0 {
w.log(logTypeInfo, "%s was removed because it was empty", w.file.Name())

if w.SegmentFilesize == 0 {
w.log(LogTypeInfo, "%s was removed because it was empty", w.file.Name())
if err := os.Remove(w.file.Name()); err != nil {
return fmt.Errorf("remove zero file: %s: %w", w.file.Name(), err)
}
}

w.file = nil
return nil
}

// nextFile
func (w *Channel) nextFile() error {
// nextFile handles the transition to a new file segment, ensuring correct timing.
func (w *Channel) nextFile(startTime time.Time) error {
// Release the current file before creating a new one.
if err := w.releaseFile(); err != nil {
w.log(logTypeError, "release file: %w", err)
w.log(LogTypeError, "release file: %v", err)
return err
}

// Increment the split index for the next file.
w.splitIndex++

// Reset segment data.
w.SegmentFilesize = 0
w.SegmentDuration = 0

// Calculate the actual segment duration using the elapsed time.
elapsed := int(time.Since(startTime).Minutes())
w.SegmentDuration = elapsed

// Create the new file.
return w.newFile()
}
68 changes: 44 additions & 24 deletions chaturbate/channel_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,17 @@ func (w *Channel) resolveSource(body string) (string, string, error) {
if variant == nil {
return "", "", fmt.Errorf("no available resolution")
}
w.log(logTypeInfo, "resolution %dp is used", variant.width)
w.log(LogTypeInfo, "resolution %dp is used", variant.width)

url, ok := variant.framerate[w.Framerate]
// If the framerate is not found, fallback to the first found framerate, this block pretends there're only 30 and 60 fps.
// no complex logic here, im lazy.
if ok {
w.log(logTypeInfo, "framerate %dfps is used", w.Framerate)
w.log(LogTypeInfo, "framerate %dfps is used", w.Framerate)
} else {
for k, v := range variant.framerate {
url = v
w.log(logTypeWarning, "framerate %dfps not found, fallback to %dfps", w.Framerate, k)
w.log(LogTypeWarning, "framerate %dfps not found, fallback to %dfps", w.Framerate, k)
w.Framerate = k
break
}
Expand All @@ -196,66 +196,86 @@ func (w *Channel) resolveSource(body string) (string, string, error) {
return rootURL, sourceURL, nil
}

// mergeSegments is a async function that runs in background for the channel,
// and it merges the segments from buffer to the file.
// mergeSegments runs in the background and merges segments from the buffer to the file.
func (w *Channel) mergeSegments() {
var segmentRetries int
startTime := time.Now() // Track the start time of the current segment.

for {
if w.IsPaused || w.isStopped {
break
}

// Handle segment retries if not found.
if segmentRetries > 5 {
w.log(logTypeWarning, "segment #%d not found in buffer, skipped", w.bufferIndex)
w.log(LogTypeWarning, "segment #%d not found in buffer, skipped", w.bufferIndex)
w.bufferIndex++
segmentRetries = 0
continue
}

// If buffer is empty, wait and retry.
if len(w.buffer) == 0 {
<-time.After(1 * time.Second)
time.Sleep(1 * time.Second)
continue
}

// Retrieve segment from buffer.
w.bufferLock.Lock()
buf, ok := w.buffer[w.bufferIndex]
w.bufferLock.Unlock()

if !ok {
segmentRetries++
<-time.After(time.Duration(segmentRetries) * time.Second)
time.Sleep(time.Duration(segmentRetries) * time.Second)
continue
}

// Write the segment to the file.
lens, err := w.file.Write(buf)
if err != nil {
w.log(logTypeError, "segment #%d written error: %v", w.bufferIndex, err)
w.log(LogTypeError, "segment #%d written error: %v", w.bufferIndex, err)
w.retries++
continue
}
w.log(logTypeInfo, "segment #%d written", w.bufferIndex)
w.log(logTypeDebug, "duration: %s, size: %s", DurationStr(w.SegmentDuration), ByteStr(w.SegmentFilesize))

// Update segment size and log progress.
w.SegmentFilesize += lens
segmentRetries = 0
w.log(LogTypeInfo, "segment #%d written", w.bufferIndex)
w.log(LogTypeDebug, "duration: %s, size: %s", DurationStr(w.SegmentDuration), ByteStr(w.SegmentFilesize))

// Check if the file size limit has been reached.
if w.SplitFilesize > 0 && w.SegmentFilesize >= w.SplitFilesize*1024*1024 {
w.log(logTypeInfo, "filesize exceeded, creating new file")
w.log(LogTypeInfo, "filesize exceeded, creating new file")

if err := w.nextFile(); err != nil {
w.log(logTypeError, "next file error: %v", err)
if err := w.nextFile(startTime); err != nil {
w.log(LogTypeError, "next file error: %v", err)
break
}
} else if w.SplitDuration > 0 && w.SegmentDuration >= w.SplitDuration*60 {
w.log(logTypeInfo, "duration exceeded, creating new file")

if err := w.nextFile(); err != nil {
w.log(logTypeError, "next file error: %v", err)
startTime = time.Now() // Reset start time for the new segment.
}

// Check if the duration limit has been reached.
elapsed := int(time.Since(startTime).Minutes())
if w.SplitDuration > 0 && elapsed >= w.SplitDuration {
w.log(LogTypeInfo, "duration exceeded, creating new file")

if err := w.nextFile(startTime); err != nil {
w.log(LogTypeError, "next file error: %v", err)
break
}

startTime = time.Now() // Reset start time for the new segment.
}

// Remove the processed segment from the buffer.
w.bufferLock.Lock()
delete(w.buffer, w.bufferIndex)
w.bufferLock.Unlock()

w.bufferIndex++
w.bufferIndex++ // Move to the next segment.
segmentRetries = 0 // Reset retries for the next segment.
}
}

Expand All @@ -276,15 +296,15 @@ func (w *Channel) fetchSegments() {
break
}

w.log(logTypeError, "segment list error, will try again [%d/10]: %v", disconnectRetries, err)
w.log(LogTypeError, "segment list error, will try again [%d/10]: %v", disconnectRetries, err)
disconnectRetries++

<-time.After(time.Duration(wait) * time.Second)
continue
}

if disconnectRetries > 0 {
w.log(logTypeInfo, "channel is back online!")
w.log(LogTypeInfo, "channel is back online!")
w.IsOnline = true
disconnectRetries = 0
}
Expand All @@ -296,7 +316,7 @@ func (w *Channel) fetchSegments() {

go func(index int, uri string) {
if err := w.requestSegment(uri, index); err != nil {
w.log(logTypeError, "segment #%d request error, ignored: %v", index, err)
w.log(LogTypeError, "segment #%d request error, ignored: %v", index, err)
return
}
}(w.segmentIndex, v.URI)
Expand Down Expand Up @@ -379,7 +399,7 @@ func (w *Channel) requestSegment(url string, index int) error {
return fmt.Errorf("read body: %w", err)
}

w.log(logTypeDebug, "segment #%d fetched", index)
w.log(LogTypeDebug, "segment #%d fetched", index)

w.bufferLock.Lock()
w.buffer[index] = body
Expand Down
33 changes: 14 additions & 19 deletions chaturbate/channel_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,28 @@ import (
"time"
)

type logType string

const (
logTypeDebug logType = "DEBUG"
logTypeInfo logType = "INFO"
logTypeWarning logType = "WARN"
logTypeError logType = "ERROR"
)

// log
func (w *Channel) log(typ logType, message string, v ...interface{}) {
switch w.logType {
case logTypeInfo:
if typ == logTypeDebug {
func (w *Channel) log(typ LogType, message string, v ...interface{}) {
// Check the global log level
currentLogLevel := GetGlobalLogLevel()

switch currentLogLevel {
case LogTypeInfo:
if typ == LogTypeDebug {
return
}
case logTypeWarning:
if typ == logTypeDebug || typ == logTypeInfo {
case LogTypeWarning:
if typ == LogTypeDebug || typ == LogTypeInfo {
return
}
case logTypeError:
if typ == logTypeDebug || typ == logTypeInfo || typ == logTypeWarning {
case LogTypeError:
if typ == LogTypeDebug || typ == LogTypeInfo || typ == LogTypeWarning {
return
}
}

updateLog := fmt.Sprintf("[%s] [%s] %s", time.Now().Format("2006-01-02 15:04:05"), typ, fmt.Errorf(message, v...))
consoleLog := fmt.Sprintf("[%s] [%s] [%s] %s", time.Now().Format("2006-01-02 15:04:05"), typ, w.Username, fmt.Errorf(message, v...))
updateLog := fmt.Sprintf("[%s] [%s] %s", time.Now().Format("2006-01-02 15:04:05"), typ, fmt.Sprintf(message, v...))
consoleLog := fmt.Sprintf("[%s] [%s] [%s] %s", time.Now().Format("2006-01-02 15:04:05"), typ, w.Username, fmt.Sprintf(message, v...))

update := &Update{
Username: w.Username,
Expand All @@ -43,6 +37,7 @@ func (w *Channel) log(typ logType, message string, v ...interface{}) {
SegmentDuration: w.SegmentDuration,
SegmentFilesize: w.SegmentFilesize,
}

if w.file != nil {
update.Filename = w.file.Name()
}
Expand Down
Loading

0 comments on commit 3bdae1b

Please sign in to comment.