Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When subscribed to a new_topic, send endpoints to all apps #70

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions app/src/main/java/io/heckel/ntfy/msg/ApiService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import java.io.IOException
import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.TimeUnit
import kotlin.random.Random

class ApiService {
private val client = OkHttpClient.Builder()
Expand Down Expand Up @@ -111,7 +110,7 @@ class ApiService {
unifiedPushTopics: String,
since: String?,
user: User?,
notify: (topic: String, Notification) -> Unit,
notify: (Message) -> Unit,
fail: (Exception) -> Unit
): Call {
val sinceVal = since ?: "all"
Expand All @@ -128,10 +127,8 @@ class ApiService {
val source = response.body?.source() ?: throw Exception("Unexpected response for $url: body is empty")
while (!source.exhausted()) {
val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null")
val notification = parser.parseWithTopic(line, notificationId = Random.nextInt(), subscriptionId = 0) // subscriptionId to be set downstream
if (notification != null) {
notify(notification.topic, notification.notification)
}
val message = parser.parseMessage(line)
if (message != null) notify(message)
}
} catch (e: Exception) {
Log.e(TAG, "Connection to $url failed (1): ${e.message}", e)
Expand Down Expand Up @@ -175,6 +172,8 @@ class ApiService {

// These constants have corresponding values in the server codebase!
const val CONTROL_TOPIC = "~control"
const val EVENT_OPEN_PARAM_NEW_TOPIC = "new_topic"
const val EVENT_OPEN = "open"
const val EVENT_MESSAGE = "message"
const val EVENT_KEEPALIVE = "keepalive"
const val EVENT_POLL_REQUEST = "poll_request"
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/io/heckel/ntfy/msg/Message.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data class Message(
val icon: String?,
val actions: List<MessageAction>?,
val title: String?,
val message: String,
val message: String?,
val encoding: String?,
val attachment: MessageAttachment?,
)
Expand Down
12 changes: 8 additions & 4 deletions app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import java.lang.reflect.Type
class NotificationParser {
private val gson = Gson()

fun parseMessage(s: String) : Message? {
return gson.fromJson(s, Message::class.java)
}

fun parse(s: String, subscriptionId: Long = 0, notificationId: Int = 0): Notification? {
val notificationWithTopic = parseWithTopic(s, subscriptionId = subscriptionId, notificationId = notificationId)
val message = parseMessage(s) ?: return null
val notificationWithTopic = parseNotificationWithTopic(message, subscriptionId = subscriptionId, notificationId = notificationId)
return notificationWithTopic?.notification
}

fun parseWithTopic(s: String, subscriptionId: Long = 0, notificationId: Int = 0): NotificationWithTopic? {
val message = gson.fromJson(s, Message::class.java)
fun parseNotificationWithTopic(message: Message, subscriptionId: Long = 0, notificationId: Int = 0): NotificationWithTopic? {
if (message.event != ApiService.EVENT_MESSAGE) {
return null
}
Expand Down Expand Up @@ -56,7 +60,7 @@ class NotificationParser {
subscriptionId = subscriptionId,
timestamp = message.time,
title = message.title ?: "",
message = message.message,
message = message.message?: "",
encoding = message.encoding ?: "",
priority = toPriority(message.priority),
tags = joinTags(message.tags),
Expand Down
11 changes: 4 additions & 7 deletions app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.heckel.ntfy.service
import io.heckel.ntfy.db.*
import io.heckel.ntfy.util.Log
import io.heckel.ntfy.msg.ApiService
import io.heckel.ntfy.msg.Message
import io.heckel.ntfy.util.topicUrl
import kotlinx.coroutines.*
import okhttp3.Call
Expand All @@ -16,7 +17,7 @@ class JsonConnection(
private val user: User?,
private val sinceId: String?,
private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val notificationListener: (ConnectionId, Message) -> String?,
private val serviceActive: () -> Boolean
) : Connection {
private val baseUrl = connectionId.baseUrl
Expand All @@ -40,12 +41,8 @@ class JsonConnection(
while (isActive && serviceActive()) {
Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds")
val startTime = System.currentTimeMillis()
val notify = notify@ { topic: String, notification: Notification ->
since = notification.id
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@notify
val subscription = repository.getSubscription(subscriptionId) ?: return@notify
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
val notify = { message : Message ->
since = notificationListener(ConnectionId(baseUrl, topicsToSubscriptionIds, topicIsUnifiedPush), message)?: since
}
val failed = AtomicBoolean(false)
val fail = { _: Exception ->
Expand Down
36 changes: 34 additions & 2 deletions app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import io.heckel.ntfy.R
import io.heckel.ntfy.app.Application
import io.heckel.ntfy.db.ConnectionState
import io.heckel.ntfy.db.Repository
import io.heckel.ntfy.db.Subscription
import io.heckel.ntfy.msg.ApiService
import io.heckel.ntfy.msg.Message
import io.heckel.ntfy.msg.NotificationDispatcher
import io.heckel.ntfy.msg.NotificationParser
import io.heckel.ntfy.ui.Colors
import io.heckel.ntfy.ui.MainActivity
import io.heckel.ntfy.util.Log
Expand All @@ -28,6 +29,7 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random

/**
* The subscriber service manages the foreground service for instant delivery.
Expand Down Expand Up @@ -67,6 +69,7 @@ class SubscriberService : Service() {
private var notificationManager: NotificationManager? = null
private var serviceNotification: Notification? = null
private val refreshMutex = Mutex() // Ensure refreshConnections() is only run one at a time
private val parser = NotificationParser()

override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
Log.d(TAG, "onStartCommand executed with startId: $startId")
Expand Down Expand Up @@ -247,11 +250,39 @@ class SubscriberService : Service() {
}
}

private fun onConnectionOpen(connectionId: ConnectionId, message: String?) {
Log.d(TAG, "Received open from connection ${connectionId.baseUrl} with message: $message")
// this check is sufficient for now, and the message can be upgraded to include other parameters in the future
if (message?.contains(ApiService.EVENT_OPEN_PARAM_NEW_TOPIC) == true) {
GlobalScope.launch(Dispatchers.IO) {
for (topic in connectionId.topicsToSubscriptionIds.keys) {
if (connectionId.topicIsUnifiedPush[topic] == true) {
Log.d(TAG, "Attempting to re-register ${connectionId.baseUrl}/$topic")
io.heckel.ntfy.up.BroadcastReceiver.sendRegistration(baseContext, connectionId.baseUrl, topic)
// TODO is that the right context - looks like it works???
Comment on lines +261 to +262
Copy link
Author

@karmanyaahm karmanyaahm Dec 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should that work? Or do I need to make a whole chain of contexts being passed down and stored? Not really sure what context is used for when creating the repository downstream. It seems to work as is.

}
}
}
}
}
private fun onStateChanged(subscriptionIds: Collection<Long>, state: ConnectionState) {
repository.updateState(subscriptionIds, state)
}

private fun onNotificationReceived(subscription: Subscription, notification: io.heckel.ntfy.db.Notification) {
// Process messages received from the server, and dispatch a notification if required.
// Return the ID of the notification if successfully processed, else null.
private fun onNotificationReceived(connectionId: ConnectionId, message: Message) : String? {
if (message.event == ApiService.EVENT_OPEN) {
onConnectionOpen(connectionId, message.message)
return null
}

val (topic, notificationWithoutId) = parser.parseNotificationWithTopic(message, notificationId = Random.nextInt(), subscriptionId = 0)
?: return null // subscriptionId to be set downstream
val subscriptionId = connectionId.topicsToSubscriptionIds[topic] ?: return null
val subscription = repository.getSubscription(subscriptionId) ?: return null
val notification = notificationWithoutId.copy(subscriptionId = subscription.id)

// Wakelock while notifications are being dispatched
// Wakelocks are reference counted by default so that should work neatly here
wakeLock?.acquire(NOTIFICATION_RECEIVED_WAKELOCK_TIMEOUT_MILLIS)
Expand All @@ -269,6 +300,7 @@ class SubscriberService : Service() {
}
}
}
return notification.id
}

private fun createNotificationChannel(): NotificationManager? {
Expand Down
27 changes: 13 additions & 14 deletions app/src/main/java/io/heckel/ntfy/service/WsConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import android.os.Handler
import android.os.Looper
import io.heckel.ntfy.db.*
import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder
import io.heckel.ntfy.msg.Message
import io.heckel.ntfy.msg.NotificationParser
import io.heckel.ntfy.util.Log
import io.heckel.ntfy.util.topicShortUrl
Expand All @@ -18,7 +19,6 @@ import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import kotlin.random.Random

/**
* Connect to ntfy server via WebSockets. This connection represents a single connection to a server, with
Expand All @@ -36,7 +36,7 @@ class WsConnection(
private val user: User?,
private val sinceId: String?,
private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val notificationListener: (ConnectionId, Message) -> String?,
private val alarmManager: AlarmManager
) : Connection {
private val parser = NotificationParser()
Expand All @@ -59,7 +59,8 @@ class WsConnection(
private val topicIsUnifiedPush = connectionId.topicIsUnifiedPush
private val subscriptionIds = topicsToSubscriptionIds.values
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
private val unifiedPushTopicsStr = topicIsUnifiedPush.filter { entry -> entry.value }.keys.joinToString(separator = ",")
private val unifiedPushTopicsStr = topicIsUnifiedPush.filter { entry -> entry.value
}.keys.joinToString(separator = ",")
private val shortUrl = topicShortUrl(baseUrl, topicsStr)

init {
Expand Down Expand Up @@ -137,18 +138,16 @@ class WsConnection(
override fun onMessage(webSocket: WebSocket, text: String) {
synchronize("onMessage") {
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Received message: $text")
val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notificationId = Random.nextInt())
if (notificationWithTopic == null) {
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.")
return@synchronize
val message = parser.parseMessage(text) ?: return@synchronize
val id = notificationListener(
ConnectionId(baseUrl, topicsToSubscriptionIds, topicIsUnifiedPush),
message
)
if (id != null) {
since.set(id)
} else {
Log.d(WsConnection.TAG,"$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.")
}
val topic = notificationWithTopic.topic
val notification = notificationWithTopic.notification
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@synchronize
val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
since.set(notification.id)
}
}

Expand Down
28 changes: 27 additions & 1 deletion app/src/main/java/io/heckel/ntfy/up/BroadcastReceiver.kt
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ class BroadcastReceiver : android.content.BroadcastReceiver() {
try {
// Note, this may fail due to a SQL constraint exception, see https://github.com/binwiederhier/ntfy/issues/185
repository.addSubscription(subscription)
distributor.sendEndpoint(appId, connectorToken, endpoint)
/* We don't send the endpoint here anymore, the foreground service will do that after
registering with the push server. This avoids a race condition where the application server
is rejected before ntfy even establishes that this topic exists.
This is fine from an application perspective, because other distributors can't even register
without a connection to the push server.
Unless the app sends registration twice. Then it'll get the endpoint.*/
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ Pointing this out, hopefully is fine

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, assuming the Android app is pushed out before the server changes, I'll have to leave the endpoint sending in for now. Otherwise, the new Android app will break for old servers.


// Refresh (and maybe start) foreground service
SubscriberServiceManager.refresh(app)
Expand Down Expand Up @@ -143,5 +148,26 @@ class BroadcastReceiver : android.content.BroadcastReceiver() {
private const val TOPIC_RANDOM_ID_LENGTH = 12

val mutex = Mutex() // https://github.com/binwiederhier/ntfy/issues/230

// TODO Where's the best place to put this function? This seems to be the only place
// with the access to the locks, but also globally accessible
// but also, broadcast receiver is for *receiving Android broadcasts*
public fun sendRegistration(context: Context, baseUrl : String, topic : String) {
val app = context.applicationContext as Application
val repository = app.repository
val distributor = Distributor(app)
GlobalScope.launch(Dispatchers.IO) {
// We're doing all of this inside a critical section, because of possible races.
// See https://github.com/binwiederhier/ntfy/issues/230 for details.

mutex.withLock {
val existingSubscription = repository.getSubscription(baseUrl, topic) ?: return@launch
val appId = existingSubscription.upAppId ?: return@launch
val connectorToken = existingSubscription.upConnectorToken ?: return@launch
val endpoint = topicUrlUp(existingSubscription.baseUrl, existingSubscription.topic)
distributor.sendEndpoint(appId, connectorToken, endpoint)
Comment on lines +153 to +170
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where should I put this function?

}
}
}
}
}