From ab2447c309aeea325081b09fa4a4b10914c6cb24 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?K=C3=A9vin=20Dunglas?=
Date: Thu, 14 May 2020 14:46:25 +0200
Subject: [PATCH 1/3] Implementation of the spec changes
---
docs/getting-started.md | 8 +--
docs/hub/config.md | 3 +-
docs/hub/troubleshooting.md | 6 +-
examples/chat-python-flask/chat.py | 7 +-
examples/publisher-node.js | 2 +-
examples/publisher-php.php | 2 +-
examples/publisher-ruby.rb | 2 +-
gatling/LoadTest.scala | 2 +-
go.mod | 2 +
go.sum | 4 ++
hub/authorization.go | 40 +++++++----
hub/authorization_test.go | 62 +++++-----------
hub/bolt_transport_test.go | 20 +++---
hub/config.go | 1 -
hub/config_test.go | 2 +-
hub/hub.go | 27 ++-----
hub/hub_test.go | 6 +-
hub/log.go | 14 +---
hub/metrics_test.go | 12 ++--
hub/publish.go | 49 +++++--------
hub/publish_test.go | 34 ++++-----
hub/subscribe.go | 81 ++-------------------
hub/subscribe_test.go | 27 ++++---
hub/subscriber.go | 111 +++++++++--------------------
hub/subscriber_test.go | 20 +-----
hub/topic_selector.go | 86 ++++++++++++++++++++++
hub/topic_selector_test.go | 7 ++
hub/transport_test.go | 26 ++++---
hub/update.go | 12 ++--
hub/update_test.go | 22 ++++++
public/app.js | 9 +--
public/index.html | 33 +++++----
spec/openapi.yaml | 8 +--
33 files changed, 330 insertions(+), 417 deletions(-)
create mode 100644 hub/topic_selector.go
create mode 100644 hub/topic_selector_test.go
create mode 100644 hub/update_test.go
diff --git a/docs/getting-started.md b/docs/getting-started.md
index 65fa05c3..7f0f8d02 100644
--- a/docs/getting-started.md
+++ b/docs/getting-started.md
@@ -71,7 +71,7 @@ To dispatch an update, the publisher (an application server, a web browser...) n
```http
POST example.com HTTP/1.1
-Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InN1YnNjcmliZSI6WyJmb28iLCJiYXIiXSwicHVibGlzaCI6WyJmb28iXX19.afLx2f2ut3YgNVFStCx95Zm_UND1mZJ69OenXaDuZL8
+Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOlsiKiJdLCJzdWJzY3JpYmUiOlsiaHR0cHM6Ly9leGFtcGxlLmNvbS9teS1wcml2YXRlLXRvcGljIiwiaHR0cDovL2xvY2FsaG9zdDozMDAwL2RlbW8vYm9va3Mve2lkfS5qc29ubGQiXSwicGF5bG9hZCI6eyJ1c2VyIjoiaHR0cHM6Ly9leGFtcGxlLmNvbS91c2Vycy9kdW5nbGFzIiwicmVtb3RlX2FkZHIiOiIxMjcuMC4wLjEifX19.bRUavgS2H9GyCHq7eoPUL_rZm2L7fGujtyyzUhiOsnw
topic=https://example.com/books/1&data={"foo": "updated value"}
```
@@ -95,8 +95,8 @@ const req = https.request({
path: '/.well-known/mercure',
method: 'POST',
headers: {
- Authorization: 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InN1YnNjcmliZSI6WyJmb28iLCJiYXIiXSwicHVibGlzaCI6WyJmb28iXX19.afLx2f2ut3YgNVFStCx95Zm_UND1mZJ69OenXaDuZL8',
- // the JWT must have a mercure.publish key containing an array of targets (can be empty for public updates)
+ Authorization: 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOlsiKiJdLCJzdWJzY3JpYmUiOlsiaHR0cHM6Ly9leGFtcGxlLmNvbS9teS1wcml2YXRlLXRvcGljIiwiaHR0cDovL2xvY2FsaG9zdDozMDAwL2RlbW8vYm9va3Mve2lkfS5qc29ubGQiXSwicGF5bG9hZCI6eyJ1c2VyIjoiaHR0cHM6Ly9leGFtcGxlLmNvbS91c2Vycy9kdW5nbGFzIiwicmVtb3RlX2FkZHIiOiIxMjcuMC4wLjEifX19.bRUavgS2H9GyCHq7eoPUL_rZm2L7fGujtyyzUhiOsnw',
+ // the JWT must have a mercure.publish key containing an array of topic selectors (can contain "*" for all topics, and be empty for public updates)
// the JWT key must be shared between the hub and the server
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': Buffer.byteLength(postData),
@@ -109,7 +109,7 @@ req.end();
// but any HTTP client, written in any language, will be just fine.
```
-The JWT must contain a `publish` property containing an array of targets. This array can be empty to allow publishing anonymous updates only. To create and read JWTs try [jwt.io](https://jwt.io) ([demo token](https://jwt.io/#debugger-io?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InN1YnNjcmliZSI6WyJmb28iLCJiYXIiXSwicHVibGlzaCI6WyJmb28iXX19.afLx2f2ut3YgNVFStCx95Zm_UND1mZJ69OenXaDuZL8), key: `!ChangeMe!`).
+The JWT must contain a `publish` property containing an array of topic selectors. This array can be empty to allow publishing anonymous updates only. The topic selector `*` can be used to allow publishing private updates for all topics. To create and read JWTs try [jwt.io](https://jwt.io) ([demo token](https://jwt.io/#debugger-io?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOlsiKiJdLCJzdWJzY3JpYmUiOlsiaHR0cHM6Ly9leGFtcGxlLmNvbS9teS1wcml2YXRlLXRvcGljIiwiaHR0cDovL2xvY2FsaG9zdDozMDAwL2RlbW8vYm9va3Mve2lkfS5qc29ubGQiXSwicGF5bG9hZCI6eyJ1c2VyIjoiaHR0cHM6Ly9leGFtcGxlLmNvbS91c2Vycy9kdW5nbGFzIiwicmVtb3RlX2FkZHIiOiIxMjcuMC4wLjEifX19.bRUavgS2H9GyCHq7eoPUL_rZm2L7fGujtyyzUhiOsnw), key: `!ChangeMe!`).
## Going Further
diff --git a/docs/hub/config.md b/docs/hub/config.md
index 65e3b7f7..38c822e3 100644
--- a/docs/hub/config.md
+++ b/docs/hub/config.md
@@ -28,7 +28,7 @@ When using environment variables, list must be space separated. As flags paramet
| `cors_allowed_origins` | a list of allowed CORS origins, can be `*` for all |
| `debug` | set to `true` to enable the debug mode, **dangerous, don't enable in production** (logs updates' content, why an update is not send to a specific subscriber and recovery stack traces) |
| `demo` | set to `true` to enable the demo mode (automatically enabled when `debug=true`) |
-| `dispatch_subscriptions` | set to `true` to dispatch updates when a subscription between the Hub and a subscriber is established or closed. The topic follows the template `https://mercure.rocks/subscriptions/{subscriptionID}`. To receive connection updates, subscribers must have `https://mercure.rocks/targets/subscriptions` or an URL matching the template `https://mercure.rocks/targets/subscriptions/{topic}` (`{topic}` is URL-encoded topic of the subscription) as targets |
+| `dispatch_subscriptions` | set to `true` to dispatch private updates when a subscription between the Hub and a subscriber is established or closed. The topic follows the template `/.well-known/mercure/subscriptions/{subscriptionID}/{topic}` |
| `heartbeat_interval` | interval between heartbeats (useful with some proxies, and old browsers), defaults to `15s`, set to `0s` to disable |
| `jwt_key` | the JWT key to use for both publishers and subscribers |
| `jwt_algorithm` | the JWT verification algorithm to use for both publishers and subscribers, e.g. HS256 (default) or RS512 |
@@ -42,7 +42,6 @@ When using environment variables, list must be space separated. As flags paramet
| `read_timeout` | maximum duration for reading the entire request, including the body, set to `0s` to disable (default), example: `2m` |
| `subscriber_jwt_key` | must contain the secret key to valid subscribers' JWT, can be omitted if `jwt_key` is set |
| `subscriber_jwt_algorithm` | the JWT verification algorithm to use for subscribers, e.g. HS256 (default) or RS512 |
-| `subscriptions_include_ip` | set to `true` to include the subscriber's IP in the subscription update |
| `transport_url` | URL representation of the history database. Provided database are `null` to disabled history, `bolt` to use [bbolt](https://github.com/etcd-io/bbolt) (example `bolt:///var/run/mercure.db?size=100&cleanup_frequency=0.4`), defaults to `bolt://updates.db` |
| `update_buffer_size` | maximum number of updates to allow buffering before closing the connection |
| `update_buffer_full_timeout` | time to wait before closing the connection after the buffer is full |
diff --git a/docs/hub/troubleshooting.md b/docs/hub/troubleshooting.md
index 53aeb971..81091b81 100644
--- a/docs/hub/troubleshooting.md
+++ b/docs/hub/troubleshooting.md
@@ -5,10 +5,10 @@
* Check the logs written by the hub on `stderr`, they contain the exact reason why the token has been rejected
* Be sure to set a **secret key** (and not a JWT) in `JWT_KEY` (or in `SUBSCRIBER_JWT_KEY` and `PUBLISHER_JWT_KEY`)
* If the secret key contains special characters, be sure to escape them properly, especially if you set the environment variable in a shell, or in a YAML file (Kubernetes...)
-* The publisher always needs a valid JWT, even if `ALLOW_ANONYMOUS` is set to `1`, this JWT **must** have a property named `publish` and containing an array of targets ([example](https://jwt.io/#debugger-io?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOltdfX0.473isprbLWLjXmAaVZj6FIVkCdjn37SQpGjzWws-xa0))
-* The subscriber needs a valid JWT only if `ALLOW_ANONYMOUS` is set to `0` (default), or to subscribe to private updates, in this case the JWT **must** have a property named `subscribe` and containing an array of targets ([example](https://jwt.io/#debugger-io?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InN1YnNjcmliZSI6W119fQ.s-6MlTvJ6vpsZ7ftmz3dvWpZznRxnxI0KlrZOHVo8Qc))
+* The publisher always needs a valid JWT, even if `ALLOW_ANONYMOUS` is set to `1`, this JWT **must** have a property named `publish`. To dispatch private updates, the `publish` property must contain the list of topic selectors this publisher can use ([example](https://jwt.io/#debugger-io?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOlsiKiJdLCJzdWJzY3JpYmUiOlsiaHR0cHM6Ly9leGFtcGxlLmNvbS9teS1wcml2YXRlLXRvcGljIiwiaHR0cDovL2xvY2FsaG9zdDozMDAwL2RlbW8vYm9va3Mve2lkfS5qc29ubGQiXSwicGF5bG9hZCI6eyJ1c2VyIjoiaHR0cHM6Ly9leGFtcGxlLmNvbS91c2Vycy9kdW5nbGFzIiwicmVtb3RlX2FkZHIiOiIxMjcuMC4wLjEifX19.bRUavgS2H9GyCHq7eoPUL_rZm2L7fGujtyyzUhiOsnw))
+* The subscriber needs a valid JWT only if `ALLOW_ANONYMOUS` is set to `0` (default), or to subscribe to private updates, in this case the JWT **must** have a property named `subscribe` and containing an array of topic selectors ([example](https://jwt.io/#debugger-io?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOlsiKiJdLCJzdWJzY3JpYmUiOlsiaHR0cHM6Ly9leGFtcGxlLmNvbS9teS1wcml2YXRlLXRvcGljIiwiaHR0cDovL2xvY2FsaG9zdDozMDAwL2RlbW8vYm9va3Mve2lkfS5qc29ubGQiXSwicGF5bG9hZCI6eyJ1c2VyIjoiaHR0cHM6Ly9leGFtcGxlLmNvbS91c2Vycy9kdW5nbGFzIiwicmVtb3RlX2FkZHIiOiIxMjcuMC4wLjEifX19.bRUavgS2H9GyCHq7eoPUL_rZm2L7fGujtyyzUhiOsnw))
-For both the `publish` and `subscribe` properties, the array can be empty to publish only public updates, or set it to `["*"]` to allow accessing to all targets.
+For both the `publish` and `subscribe` properties, the array can be empty to publish only public updates, or set it to `["*"]` to allow publishing updates for all topics.
## Browser Issues
diff --git a/examples/chat-python-flask/chat.py b/examples/chat-python-flask/chat.py
index 252d24fc..2bd0effe 100644
--- a/examples/chat-python-flask/chat.py
+++ b/examples/chat-python-flask/chat.py
@@ -20,7 +20,6 @@
JWT_KEY: the JWT key to use (must be shared with the Mercure hub)
HUB_URL: the URL of the Mercure hub (default: http://localhost:3000/.well-known/mercure)
TOPIC: the topic to use (default: http://example.com/chat)
- TARGET: the target to use (default: chan)
COOKIE_DOMAIN: the cookie domain (default: None)
"""
@@ -32,16 +31,14 @@
@app.route("/")
def chat():
- targets = [os.environ.get('TARGET', 'chan')]
+ topic = os.environ.get('TOPIC', 'http://example.com/chat')
token = jwt.encode(
- {'mercure': {'subscribe': targets, 'publish': targets}},
+ {'mercure': {'subscribe': [topics], 'publish': [topics]}},
os.environ.get('JWT_KEY', '!ChangeMe!'),
algorithm='HS256'
)
hub_url = os.environ.get('HUB_URL', 'http://localhost:3000/.well-known/mercure')
- topic = os.environ.get('TOPIC', 'http://example.com/chat')
-
resp = make_response(render_template('chat.html', config={
'hubURL': hub_url, 'topic': topic}))
resp.set_cookie('mercureAuthorization', token, httponly=True, path='/.well-known/mercure',
diff --git a/examples/publisher-node.js b/examples/publisher-node.js
index 512c8e55..c42124a8 100644
--- a/examples/publisher-node.js
+++ b/examples/publisher-node.js
@@ -2,7 +2,7 @@ const http = require("http");
const querystring = require("querystring");
const demoJwt =
- "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InN1YnNjcmliZSI6WyJmb28iLCJiYXIiXSwicHVibGlzaCI6WyJmb28iXX19.afLx2f2ut3YgNVFStCx95Zm_UND1mZJ69OenXaDuZL8";
+ "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOlsiKiJdLCJzdWJzY3JpYmUiOlsiaHR0cHM6Ly9leGFtcGxlLmNvbS9teS1wcml2YXRlLXRvcGljIiwiaHR0cDovL2xvY2FsaG9zdDozMDAwL2RlbW8vYm9va3Mve2lkfS5qc29ubGQiXSwicGF5bG9hZCI6eyJ1c2VyIjoiaHR0cHM6Ly9leGFtcGxlLmNvbS91c2Vycy9kdW5nbGFzIiwicmVtb3RlX2FkZHIiOiIxMjcuMC4wLjEifX19.bRUavgS2H9GyCHq7eoPUL_rZm2L7fGujtyyzUhiOsnw";
const postData = querystring.stringify({
topic: "http://localhost:3000/demo/books/1.jsonld",
diff --git a/examples/publisher-php.php b/examples/publisher-php.php
index 25324826..087462c3 100644
--- a/examples/publisher-php.php
+++ b/examples/publisher-php.php
@@ -1,6 +1,6 @@
'http://localhost:3000/demo/books/1.jsonld',
diff --git a/examples/publisher-ruby.rb b/examples/publisher-ruby.rb
index 7a43b3b3..99aa2c95 100644
--- a/examples/publisher-ruby.rb
+++ b/examples/publisher-ruby.rb
@@ -1,7 +1,7 @@
require 'json'
require 'net/http'
-token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InN1YnNjcmliZSI6WyJmb28iLCJiYXIiXSwicHVibGlzaCI6WyJmb28iXX19.afLx2f2ut3YgNVFStCx95Zm_UND1mZJ69OenXaDuZL8'
+token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOlsiKiJdLCJzdWJzY3JpYmUiOlsiaHR0cHM6Ly9leGFtcGxlLmNvbS9teS1wcml2YXRlLXRvcGljIiwiaHR0cDovL2xvY2FsaG9zdDozMDAwL2RlbW8vYm9va3Mve2lkfS5qc29ubGQiXSwicGF5bG9hZCI6eyJ1c2VyIjoiaHR0cHM6Ly9leGFtcGxlLmNvbS91c2Vycy9kdW5nbGFzIiwicmVtb3RlX2FkZHIiOiIxMjcuMC4wLjEifX19.bRUavgS2H9GyCHq7eoPUL_rZm2L7fGujtyyzUhiOsnw'
Net::HTTP.start('localhost', 3000) do |http|
req = Net::HTTP::Post.new('/.well-known/mercure')
diff --git a/gatling/LoadTest.scala b/gatling/LoadTest.scala
index eeaffb6a..c64b76ef 100644
--- a/gatling/LoadTest.scala
+++ b/gatling/LoadTest.scala
@@ -27,7 +27,7 @@ class LoadTest extends Simulation {
/** The hub URL */
val HubUrl = Properties.envOrElse("HUB_URL", "http://localhost:3001/.well-known/mercure")
/** JWT to use to publish */
- val Jwt = Properties.envOrElse("JWT", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InN1YnNjcmliZSI6WyJmb28iLCJiYXIiXSwicHVibGlzaCI6WyJmb28iXX19.afLx2f2ut3YgNVFStCx95Zm_UND1mZJ69OenXaDuZL8")
+ val Jwt = Properties.envOrElse("JWT", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOlsiKiJdLCJzdWJzY3JpYmUiOlsiaHR0cHM6Ly9leGFtcGxlLmNvbS9teS1wcml2YXRlLXRvcGljIiwiaHR0cDovL2xvY2FsaG9zdDozMDAwL2RlbW8vYm9va3Mve2lkfS5qc29ubGQiXSwicGF5bG9hZCI6eyJ1c2VyIjoiaHR0cHM6Ly9leGFtcGxlLmNvbS91c2Vycy9kdW5nbGFzIiwicmVtb3RlX2FkZHIiOiIxMjcuMC4wLjEifX19.bRUavgS2H9GyCHq7eoPUL_rZm2L7fGujtyyzUhiOsnw")
/** Number of concurrent subscribers initially connected */
val InitialSubscribers = Properties.envOrElse("INITIAL_SUBSCRIBERS", "100").toInt
/** Additional subscribers rate (per second) */
diff --git a/go.mod b/go.mod
index 8c49c0ec..034ba97b 100644
--- a/go.mod
+++ b/go.mod
@@ -12,8 +12,10 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/mitchellh/mapstructure v1.3.0 // indirect
github.com/pelletier/go-toml v1.7.0 // indirect
+ github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/prometheus/client_golang v1.6.0
github.com/prometheus/client_model v0.2.0
+ github.com/sasha-s/go-deadlock v0.2.0
github.com/sirupsen/logrus v1.5.0
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cast v1.3.1 // indirect
diff --git a/go.sum b/go.sum
index c0db1b30..4aca7d3b 100644
--- a/go.sum
+++ b/go.sum
@@ -120,6 +120,8 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIGuI=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
+github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
+github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -147,6 +149,8 @@ github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y=
+github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
diff --git a/hub/authorization.go b/hub/authorization.go
index 7ea5fae3..4d641ddb 100644
--- a/hub/authorization.go
+++ b/hub/authorization.go
@@ -168,26 +168,36 @@ func validateJWT(encodedToken string, key []byte, signingAlgorithm jwt.SigningMe
return nil, ErrInvalidJWT
}
-func authorizedTargets(claims *claims, publisher bool) (all bool, targets map[string]struct{}) {
- if claims == nil {
- return false, map[string]struct{}{}
+func canReceive(s *topicSelectorStore, topics, topicSelectors []string) bool {
+ for _, topic := range topics {
+ for _, topicSelector := range topicSelectors {
+ if s.match(topic, topicSelector, true) {
+ return true
+ }
+ }
}
- var providedTargets []string
- if publisher {
- providedTargets = claims.Mercure.Publish
- } else {
- providedTargets = claims.Mercure.Subscribe
- }
+ return false
+}
- authorizedTargets := make(map[string]struct{}, len(providedTargets))
- for _, target := range providedTargets {
- if target == "*" {
- return true, nil
+func canDispatch(s *topicSelectorStore, topics, topicSelectors []string) bool {
+ for _, topic := range topics {
+ var matched bool
+ for _, topicSelector := range topicSelectors {
+ if topicSelector == "*" {
+ return true
+ }
+
+ if s.match(topic, topicSelector, false) {
+ matched = true
+ break
+ }
}
- authorizedTargets[target] = struct{}{}
+ if !matched {
+ return false
+ }
}
- return false, authorizedTargets
+ return true
}
diff --git a/hub/authorization_test.go b/hub/authorization_test.go
index d64e4479..acb56136 100644
--- a/hub/authorization_test.go
+++ b/hub/authorization_test.go
@@ -344,50 +344,24 @@ func TestAuthorizeCookieOriginHasPriorityRsa(t *testing.T) {
assert.Nil(t, err)
}
-func TestAuthorizedNilClaim(t *testing.T) {
- all, targets := authorizedTargets(nil, true)
- assert.False(t, all)
- assert.Empty(t, targets)
-}
-
-func TestAuthorizedTargetsPublisher(t *testing.T) {
- c := &claims{Mercure: mercureClaim{
- Publish: []string{"foo", "bar"},
- }}
-
- all, targets := authorizedTargets(c, true)
- assert.False(t, all)
- assert.Equal(t, map[string]struct{}{"foo": {}, "bar": {}}, targets)
-}
-
-func TestAuthorizedAllTargetsPublisher(t *testing.T) {
- c := &claims{Mercure: mercureClaim{
- Publish: []string{"*"},
- }}
-
- all, targets := authorizedTargets(c, true)
- assert.True(t, all)
- assert.Empty(t, targets)
-}
-
-func TestAuthorizedTargetsSubscriber(t *testing.T) {
- c := &claims{Mercure: mercureClaim{
- Subscribe: []string{"foo", "bar"},
- }}
-
- all, targets := authorizedTargets(c, false)
- assert.False(t, all)
- assert.Equal(t, map[string]struct{}{"foo": {}, "bar": {}}, targets)
-}
-
-func TestAuthorizedAllTargetsSubscriber(t *testing.T) {
- c := &claims{Mercure: mercureClaim{
- Subscribe: []string{"*"},
- }}
-
- all, targets := authorizedTargets(c, false)
- assert.True(t, all)
- assert.Empty(t, targets)
+func TestCanReceive(t *testing.T) {
+ s := newTopicSelectorStore()
+ assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"foo", "bar"}))
+ assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"bar"}))
+ assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"*"}))
+ assert.False(t, canReceive(s, []string{"foo", "bar"}, []string{}))
+ assert.False(t, canReceive(s, []string{"foo", "bar"}, []string{"baz"}))
+ assert.False(t, canReceive(s, []string{"foo", "bar"}, []string{"baz", "bat"}))
+}
+
+func TestCanDispatch(t *testing.T) {
+ s := newTopicSelectorStore()
+ assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"foo", "bar"}))
+ assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"*"}))
+ assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{}))
+ assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{"foo"}))
+ assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{"baz"}))
+ assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{"baz", "bat"}))
}
func TestGetJWTKeyInvalid(t *testing.T) {
diff --git a/hub/bolt_transport_test.go b/hub/bolt_transport_test.go
index 9e27b253..d385d7c7 100644
--- a/hub/bolt_transport_test.go
+++ b/hub/bolt_transport_test.go
@@ -26,9 +26,8 @@ func TestBoltTransportHistory(t *testing.T) {
})
}
- s := newSubscriber("8")
+ s := newSubscriber("8", newTopicSelectorStore())
s.Topics = topics
- s.RawTopics = topics
go s.start()
err := transport.AddSubscriber(s)
@@ -60,9 +59,8 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
})
}
- s := newSubscriber("8")
+ s := newSubscriber("8", newTopicSelectorStore())
s.Topics = topics
- s.RawTopics = topics
go s.start()
err := transport.AddSubscriber(s)
@@ -148,7 +146,7 @@ func TestBoltTransportDoNotDispatchedUntilListen(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)
- s := newSubscriber("")
+ s := newSubscriber("", newTopicSelectorStore())
go s.start()
err := transport.AddSubscriber(s)
@@ -184,9 +182,8 @@ func TestBoltTransportDispatch(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)
- s := newSubscriber("")
+ s := newSubscriber("", newTopicSelectorStore())
s.Topics = []string{"https://example.com/foo"}
- s.RawTopics = s.Topics
go s.start()
err := transport.AddSubscriber(s)
@@ -209,9 +206,8 @@ func TestBoltTransportClosed(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)
- s := newSubscriber("")
+ s := newSubscriber("", newTopicSelectorStore())
s.Topics = []string{"https://example.com/foo"}
- s.RawTopics = s.Topics
go s.start()
err := transport.AddSubscriber(s)
@@ -237,12 +233,14 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")
- s1 := newSubscriber("")
+ tss := newTopicSelectorStore()
+
+ s1 := newSubscriber("", tss)
go s1.start()
err := transport.AddSubscriber(s1)
require.Nil(t, err)
- s2 := newSubscriber("")
+ s2 := newSubscriber("", tss)
go s2.start()
err = transport.AddSubscriber(s2)
require.Nil(t, err)
diff --git a/hub/config.go b/hub/config.go
index 09406bdb..f9f19991 100644
--- a/hub/config.go
+++ b/hub/config.go
@@ -28,7 +28,6 @@ func SetConfigDefaults(v *viper.Viper) {
v.SetDefault("use_forwarded_headers", false)
v.SetDefault("demo", false)
v.SetDefault("dispatch_subscriptions", false)
- v.SetDefault("subscriptions_include_ip", false)
v.SetDefault("metrics", false)
v.SetDefault("metrics_login", "")
v.SetDefault("metrics_password", "")
diff --git a/hub/config_test.go b/hub/config_test.go
index c15d7f62..ad3c4333 100644
--- a/hub/config_test.go
+++ b/hub/config_test.go
@@ -37,7 +37,7 @@ func TestSetFlags(t *testing.T) {
fs := pflag.NewFlagSet("test", pflag.PanicOnError)
SetFlags(fs, v)
- assert.Subset(t, v.AllKeys(), []string{"cert_file", "compress", "demo", "jwt_algorithm", "transport_url", "acme_hosts", "acme_cert_dir", "subscriber_jwt_key", "log_format", "jwt_key", "allow_anonymous", "debug", "read_timeout", "publisher_jwt_algorithm", "write_timeout", "key_file", "use_forwarded_headers", "subscriber_jwt_algorithm", "addr", "publisher_jwt_key", "heartbeat_interval", "cors_allowed_origins", "publish_allowed_origins", "dispatch_subscriptions", "subscriptions_include_ip", "metrics", "metrics_login", "metrics_password", "dispatch_timeout"})
+ assert.Subset(t, v.AllKeys(), []string{"cert_file", "compress", "demo", "jwt_algorithm", "transport_url", "acme_hosts", "acme_cert_dir", "subscriber_jwt_key", "log_format", "jwt_key", "allow_anonymous", "debug", "read_timeout", "publisher_jwt_algorithm", "write_timeout", "key_file", "use_forwarded_headers", "subscriber_jwt_algorithm", "addr", "publisher_jwt_key", "heartbeat_interval", "cors_allowed_origins", "publish_allowed_origins", "dispatch_subscriptions", "metrics", "metrics_login", "metrics_password", "dispatch_timeout"})
}
func TestInitConfig(t *testing.T) {
diff --git a/hub/hub.go b/hub/hub.go
index 1725f551..ab898f35 100644
--- a/hub/hub.go
+++ b/hub/hub.go
@@ -3,32 +3,17 @@ package hub
import (
"log"
"net/http"
- "sync"
"github.com/spf13/viper"
- "github.com/yosida95/uritemplate"
)
-// uriTemplates caches uritemplate.Template to improve memory and CPU usage.
-type uriTemplates struct {
- sync.RWMutex
- m map[string]*templateCache
-}
-
-type templateCache struct {
- // counter stores the number of subsribers currently using this topic
- counter uint32
- // the uritemplate.Template instance, of nil if it's a raw string
- template *uritemplate.Template
-}
-
// Hub stores channels with clients currently subscribed and allows to dispatch updates.
type Hub struct {
- config *viper.Viper
- transport Transport
- server *http.Server
- uriTemplates uriTemplates
- metrics *Metrics
+ config *viper.Viper
+ transport Transport
+ server *http.Server
+ topicSelectorStore *topicSelectorStore
+ metrics *Metrics
}
// Stop stops disconnect all connected clients.
@@ -56,7 +41,7 @@ func NewHubWithTransport(v *viper.Viper, t Transport) *Hub {
v,
t,
nil,
- uriTemplates{m: make(map[string]*templateCache)},
+ newTopicSelectorStore(),
NewMetrics(),
}
}
diff --git a/hub/hub_test.go b/hub/hub_test.go
index e13f4311..1948d77c 100644
--- a/hub/hub_test.go
+++ b/hub/hub_test.go
@@ -87,16 +87,16 @@ func createDummyWithTransportAndConfig(t Transport, v *viper.Viper) *Hub {
return NewHubWithTransport(v, t)
}
-func createDummyAuthorizedJWT(h *Hub, r role, targets []string) string {
+func createDummyAuthorizedJWT(h *Hub, r role, topicSelectors []string) string {
token := jwt.New(jwt.SigningMethodHS256)
key := h.getJWTKey(r)
switch r {
case publisherRole:
- token.Claims = &claims{mercureClaim{Publish: targets}, jwt.StandardClaims{}}
+ token.Claims = &claims{mercureClaim{Publish: topicSelectors}, jwt.StandardClaims{}}
case subscriberRole:
- token.Claims = &claims{mercureClaim{Subscribe: targets}, jwt.StandardClaims{}}
+ token.Claims = &claims{mercureClaim{Subscribe: topicSelectors}, jwt.StandardClaims{}}
}
tokenString, _ := token.SignedString(key)
diff --git a/hub/log.go b/hub/log.go
index 12877749..bf01221b 100644
--- a/hub/log.go
+++ b/hub/log.go
@@ -11,7 +11,7 @@ func addUpdateFields(f log.Fields, u *Update, debug bool) log.Fields {
f["event_type"] = u.Type
f["event_retry"] = u.Retry
f["update_topics"] = u.Topics
- f["update_targets"] = targetsMapToSlice(u.Targets)
+ f["update_private"] = u.Private
if debug {
f["update_data"] = u.Data
@@ -29,18 +29,6 @@ func createFields(u *Update, s *Subscriber) log.Fields {
return f
}
-func targetsMapToSlice(t map[string]struct{}) []string {
- targets := make([]string, len(t))
-
- var i int
- for target := range t {
- targets[i] = target
- i++
- }
-
- return targets
-}
-
// InitLogrus configures the global logger.
func InitLogrus() {
if viper.GetBool("debug") {
diff --git a/hub/metrics_test.go b/hub/metrics_test.go
index 42c6f864..fc75f677 100644
--- a/hub/metrics_test.go
+++ b/hub/metrics_test.go
@@ -11,13 +11,15 @@ import (
func TestNumberOfRunningSubscribers(t *testing.T) {
m := NewMetrics()
- s1 := newSubscriber("")
+ sst := newTopicSelectorStore()
+
+ s1 := newSubscriber("", sst)
s1.Topics = []string{"topic1", "topic2"}
m.NewSubscriber(s1)
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic1")
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic2")
- s2 := newSubscriber("")
+ s2 := newSubscriber("", sst)
s2.Topics = []string{"topic2"}
m.NewSubscriber(s2)
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic1")
@@ -35,13 +37,15 @@ func TestNumberOfRunningSubscribers(t *testing.T) {
func TestTotalNumberOfHandledSubscribers(t *testing.T) {
m := NewMetrics()
- s1 := newSubscriber("")
+ sst := newTopicSelectorStore()
+
+ s1 := newSubscriber("", sst)
s1.Topics = []string{"topic1", "topic2"}
m.NewSubscriber(s1)
assertCounterValue(t, 1.0, m.subscribersTotal, "topic1")
assertCounterValue(t, 1.0, m.subscribersTotal, "topic2")
- s2 := newSubscriber("")
+ s2 := newSubscriber("", sst)
s2.Topics = []string{"topic2"}
m.NewSubscriber(s2)
assertCounterValue(t, 1.0, m.subscribersTotal, "topic1")
diff --git a/hub/publish.go b/hub/publish.go
index 00eb67ef..5f6dc7d2 100644
--- a/hub/publish.go
+++ b/hub/publish.go
@@ -1,8 +1,6 @@
package hub
import (
- "errors"
- "fmt"
"io"
"net/http"
"strconv"
@@ -10,8 +8,6 @@ import (
log "github.com/sirupsen/logrus"
)
-var ErrTargetNotAuthorized = errors.New("target not authorized")
-
// PublishHandler allows publisher to broadcast updates to all subscribers.
func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request) {
claims, err := authorize(r, h.getJWTKey(publisherRole), h.getJWTAlgorithm(publisherRole), h.config.GetStringSlice("publish_allowed_origins"))
@@ -32,18 +28,6 @@ func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request) {
return
}
- data := r.PostForm.Get("data")
- if data == "" {
- http.Error(w, "Missing \"data\" parameter", http.StatusBadRequest)
- return
- }
-
- targets, err := getAuthorizedTargets(claims, r.PostForm["target"])
- if err != nil {
- http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
- return
- }
-
var retry uint64
retryString := r.PostForm.Get("retry")
if retryString != "" {
@@ -54,7 +38,22 @@ func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request) {
}
}
- u := newUpdate(Event{data, r.PostForm.Get("id"), r.PostForm.Get("type"), retry}, topics, targets)
+ private := len(r.PostForm["private"]) != 0
+ if private && !canDispatch(h.topicSelectorStore, topics, claims.Mercure.Publish) {
+ http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
+ return
+ }
+
+ u := newUpdate(
+ topics,
+ private,
+ Event{
+ r.PostForm.Get("data"),
+ r.PostForm.Get("id"),
+ r.PostForm.Get("type"),
+ retry,
+ },
+ )
// Broadcast the update
if err := h.transport.Dispatch(u); err != nil {
@@ -66,19 +65,3 @@ func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request) {
h.metrics.NewUpdate(u)
}
-
-func getAuthorizedTargets(claims *claims, t []string) (map[string]struct{}, error) {
- authorizedAlltargets, authorizedTargets := authorizedTargets(claims, true)
- targets := make(map[string]struct{}, len(t))
- for _, t := range t {
- if !authorizedAlltargets {
- _, ok := authorizedTargets[t]
- if !ok {
- return nil, fmt.Errorf("%q: %w", t, ErrTargetNotAuthorized)
- }
- }
- targets[t] = struct{}{}
- }
-
- return targets, nil
-}
diff --git a/hub/publish_test.go b/hub/publish_test.go
index abe42eb7..6cad3d7d 100644
--- a/hub/publish_test.go
+++ b/hub/publish_test.go
@@ -96,7 +96,7 @@ func TestPublishNoData(t *testing.T) {
req := httptest.NewRequest("POST", defaultHubURL, strings.NewReader(form.Encode()))
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
- req.Header.Add("Authorization", "Bearer "+createDummyAuthorizedJWT(hub, publisherRole, []string{}))
+ req.Header.Add("Authorization", "Bearer "+createDummyAuthorizedJWT(hub, publisherRole, []string{"*"}))
w := httptest.NewRecorder()
hub.PublishHandler(w, req)
@@ -104,8 +104,7 @@ func TestPublishNoData(t *testing.T) {
resp := w.Result()
defer resp.Body.Close()
- assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
- assert.Equal(t, "Missing \"data\" parameter\n", w.Body.String())
+ assert.Equal(t, http.StatusOK, resp.StatusCode)
}
func TestPublishInvalidRetry(t *testing.T) {
@@ -130,13 +129,13 @@ func TestPublishInvalidRetry(t *testing.T) {
assert.Equal(t, "Invalid \"retry\" parameter\n", w.Body.String())
}
-func TestPublishNotAuthorizedTarget(t *testing.T) {
+func TestPublishNotAuthorizedTopicSelector(t *testing.T) {
hub := createDummy()
form := url.Values{}
form.Add("topic", "http://example.com/books/1")
form.Add("data", "foo")
- form.Add("target", "not-allowed")
+ form.Add("private", "on")
req := httptest.NewRequest("POST", defaultHubURL, strings.NewReader(form.Encode()))
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
@@ -155,10 +154,9 @@ func TestPublishOK(t *testing.T) {
hub := createDummy()
defer hub.Stop()
- s := newSubscriber("")
+ s := newSubscriber("", newTopicSelectorStore())
s.Topics = []string{"http://example.com/books/1"}
- s.RawTopics = s.Topics
- s.Targets = map[string]struct{}{"foo": {}}
+ s.Claims = &claims{Mercure: mercureClaim{Subscribe: s.Topics}}
go s.start()
err := hub.transport.AddSubscriber(s)
@@ -172,22 +170,20 @@ func TestPublishOK(t *testing.T) {
assert.True(t, ok)
require.NotNil(t, u)
assert.Equal(t, "id", u.ID)
- assert.Equal(t, []string{"http://example.com/books/1"}, u.Topics)
+ assert.Equal(t, s.Topics, u.Topics)
assert.Equal(t, "Hello!", u.Data)
- assert.Equal(t, struct{}{}, u.Targets["foo"])
- assert.Equal(t, struct{}{}, u.Targets["bar"])
+ assert.True(t, u.Private)
}(&wg)
form := url.Values{}
form.Add("id", "id")
form.Add("topic", "http://example.com/books/1")
form.Add("data", "Hello!")
- form.Add("target", "foo")
- form.Add("target", "bar")
+ form.Add("private", "on")
req := httptest.NewRequest("POST", defaultHubURL, strings.NewReader(form.Encode()))
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
- req.Header.Add("Authorization", "Bearer "+createDummyAuthorizedJWT(hub, publisherRole, []string{"foo", "bar"}))
+ req.Header.Add("Authorization", "Bearer "+createDummyAuthorizedJWT(hub, publisherRole, s.Topics))
w := httptest.NewRecorder()
hub.PublishHandler(w, req)
@@ -206,10 +202,8 @@ func TestPublishGenerateUUID(t *testing.T) {
h := createDummy()
defer h.Stop()
- s := newSubscriber("")
+ s := newSubscriber("", newTopicSelectorStore())
s.Topics = []string{"http://example.com/books/1"}
- s.RawTopics = s.Topics
- s.Targets = map[string]struct{}{"foo": {}}
go s.start()
h.transport.AddSubscriber(s)
@@ -231,7 +225,6 @@ func TestPublishGenerateUUID(t *testing.T) {
req := httptest.NewRequest("POST", defaultHubURL, strings.NewReader(form.Encode()))
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
- //req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, publisherRole, []string{})})
req.Header.Add("Authorization", "Bearer "+createDummyAuthorizedJWT(h, publisherRole, []string{}))
w := httptest.NewRecorder()
@@ -264,12 +257,11 @@ func TestPublishWithErrorInTransport(t *testing.T) {
form.Add("id", "id")
form.Add("topic", "http://example.com/books/1")
form.Add("data", "Hello!")
- form.Add("target", "foo")
- form.Add("target", "bar")
+ form.Add("private", "on")
req := httptest.NewRequest("POST", defaultHubURL, strings.NewReader(form.Encode()))
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
- req.Header.Add("Authorization", "Bearer "+createDummyAuthorizedJWT(hub, publisherRole, []string{"foo", "bar"}))
+ req.Header.Add("Authorization", "Bearer "+createDummyAuthorizedJWT(hub, publisherRole, []string{"foo", "http://example.com/books/1"}))
w := httptest.NewRecorder()
hub.PublishHandler(w, req)
diff --git a/hub/subscribe.go b/hub/subscribe.go
index c31d7881..bcbd6bd2 100644
--- a/hub/subscribe.go
+++ b/hub/subscribe.go
@@ -4,14 +4,11 @@ import (
"encoding/json"
"fmt"
"io"
- "net"
"net/http"
"net/url"
- "strings"
"time"
log "github.com/sirupsen/logrus"
- "github.com/yosida95/uritemplate"
)
type subscription struct {
@@ -20,7 +17,6 @@ type subscription struct {
Topic string `json:"topic"`
Active bool `json:"active"`
mercureClaim
- Address string `json:"address,omitempty"`
}
// SubscribeHandler creates a keep alive connection and sends the events to the subscribers.
@@ -77,14 +73,14 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
// registerSubscriber initializes the connection.
func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug bool) *Subscriber {
- s := newSubscriber(retrieveLastEventID(r))
+ s := newSubscriber(retrieveLastEventID(r), h.topicSelectorStore)
s.Debug = debug
s.LogFields["remote_addr"] = r.RemoteAddr
claims, err := authorize(r, h.getJWTKey(subscriberRole), h.getJWTAlgorithm(subscriberRole), nil)
if claims != nil {
s.Claims = claims
- s.LogFields["subscriber_targets"] = claims.Mercure.Subscribe
+ s.LogFields["subscriber_topic_selectors"] = claims.Mercure.Subscribe
}
if err != nil || (claims == nil && !h.config.GetBool("allow_anonymous")) {
http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
@@ -99,16 +95,11 @@ func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug b
}
s.LogFields["subscriber_topics"] = s.Topics
- s.RawTopics, s.TemplateTopics = h.parseTopics(s.Topics)
s.EscapedTopics = escapeTopics(s.Topics)
- s.AllTargets, s.Targets = authorizedTargets(claims, false)
s.RemoteAddr = r.RemoteAddr
go s.start()
- if h.config.GetBool("subscriptions_include_ip") {
- s.RemoteHost, _, _ = net.SplitHostPort(r.RemoteAddr)
- }
h.dispatchSubscriptionUpdate(s, true)
if err := h.transport.AddSubscriber(s); err != nil {
http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
@@ -124,40 +115,6 @@ func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug b
return s
}
-func (h *Hub) parseTopics(topics []string) (rawTopics []string, templateTopics []*uritemplate.Template) {
- rawTopics = make([]string, 0, len(topics))
- templateTopics = make([]*uritemplate.Template, 0, len(topics))
- for _, topic := range topics {
- if tpl := h.getURITemplate(topic); tpl == nil {
- rawTopics = append(rawTopics, topic)
- } else {
- templateTopics = append(templateTopics, tpl)
- }
- }
-
- return rawTopics, templateTopics
-}
-
-// getURITemplate retrieves or creates the uritemplate.Template associated with this topic, or nil if it's not a template.
-func (h *Hub) getURITemplate(topic string) *uritemplate.Template {
- var tpl *uritemplate.Template
- h.uriTemplates.Lock()
- defer h.uriTemplates.Unlock()
- if tplCache, ok := h.uriTemplates.m[topic]; ok {
- tpl = tplCache.template
- tplCache.counter++
-
- return tpl
- }
- if strings.Contains(topic, "{") { // If it's definitely not an URI template, skip to save some resources
- tpl, _ = uritemplate.New(topic) // Returns nil in case of error, will be considered as a raw string
- }
-
- h.uriTemplates.m[topic] = &templateCache{1, tpl}
-
- return tpl
-}
-
// sendHeaders sends correct HTTP headers to create a keep-alive connection.
func sendHeaders(w http.ResponseWriter) {
// Keep alive, useful only for HTTP 1 clients https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Keep-Alive
@@ -222,24 +179,6 @@ func (h *Hub) shutdown(s *Subscriber) {
h.dispatchSubscriptionUpdate(s, false)
log.WithFields(s.LogFields).Info("Subscriber disconnected")
h.metrics.SubscriberDisconnect(s)
-
- // Remove unused uritemplate.Template instances from memory.
- keys := make([]string, 0, len(s.RawTopics)+len(s.TemplateTopics))
- copy(s.RawTopics, keys)
- for _, uriTemplate := range s.TemplateTopics {
- keys = append(keys, uriTemplate.Raw())
- }
-
- h.uriTemplates.Lock()
- for _, key := range keys {
- counter := h.uriTemplates.m[key].counter
- if counter == 0 {
- delete(h.uriTemplates.m, key)
- } else {
- h.uriTemplates.m[key].counter = counter - 1
- }
- }
- h.uriTemplates.Unlock()
}
func (h *Hub) dispatchSubscriptionUpdate(s *Subscriber, active bool) {
@@ -249,11 +188,10 @@ func (h *Hub) dispatchSubscriptionUpdate(s *Subscriber, active bool) {
for k, topic := range s.Topics {
connection := &subscription{
- ID: "https://mercure.rocks/subscriptions/" + s.EscapedTopics[k] + "/" + s.EscapedID,
- Type: "https://mercure.rocks/Subscription",
- Topic: topic,
- Active: active,
- Address: s.RemoteHost,
+ ID: "/.well-known/mercure/subscriptions/" + s.EscapedID + "/" + s.EscapedTopics[k],
+ Type: "https://mercure.rocks/Subscription",
+ Topic: topic,
+ Active: active,
}
if s.Claims != nil {
@@ -271,12 +209,7 @@ func (h *Hub) dispatchSubscriptionUpdate(s *Subscriber, active bool) {
panic(err)
}
- u := newUpdate(
- Event{Data: string(json)},
- []string{connection.ID},
- map[string]struct{}{"https://mercure.rocks/targets/subscriptions": {}, "https://mercure.rocks/targets/subscriptions/" + s.EscapedTopics[k]: {}},
- )
-
+ u := newUpdate([]string{connection.ID}, true, Event{Data: string(json)})
h.transport.Dispatch(u)
}
}
diff --git a/hub/subscribe_test.go b/hub/subscribe_test.go
index 4a363f75..e13bc791 100644
--- a/hub/subscribe_test.go
+++ b/hub/subscribe_test.go
@@ -287,7 +287,7 @@ func TestUnsubscribe(t *testing.T) {
wg.Wait()
}
-func TestSubscribeTarget(t *testing.T) {
+func TestSubscribePrivate(t *testing.T) {
hub := createDummy()
hub.config.Set("debug", true)
s, _ := hub.transport.(*LocalTransport)
@@ -303,19 +303,19 @@ func TestSubscribeTarget(t *testing.T) {
}
hub.transport.Dispatch(&Update{
- Targets: map[string]struct{}{"baz": {}},
Topics: []string{"http://example.com/reviews/21"},
Event: Event{Data: "Foo", ID: "a"},
+ Private: true,
})
hub.transport.Dispatch(&Update{
- Targets: map[string]struct{}{},
Topics: []string{"http://example.com/reviews/22"},
Event: Event{Data: "Hello World", ID: "b", Type: "test"},
+ Private: true,
})
hub.transport.Dispatch(&Update{
- Targets: map[string]struct{}{"hello": {}, "bar": {}},
Topics: []string{"http://example.com/reviews/23"},
Event: Event{Data: "Great", ID: "c", Retry: 1},
+ Private: true,
})
return
}
@@ -323,7 +323,7 @@ func TestSubscribeTarget(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
req := httptest.NewRequest("GET", defaultHubURL+"?topic=http://example.com/reviews/{id}", nil).WithContext(ctx)
- req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, subscriberRole, []string{"foo", "bar"})})
+ req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, subscriberRole, []string{"http://example.com/reviews/22", "http://example.com/reviews/23"})})
w := &responseTester{
expectedStatusCode: http.StatusOK,
@@ -339,7 +339,6 @@ func TestSubscribeTarget(t *testing.T) {
func TestSubscriptionEvents(t *testing.T) {
hub := createDummy()
hub.config.Set("dispatch_subscriptions", true)
- hub.config.Set("subscriptions_include_ip", true)
var wg sync.WaitGroup
ctx1, cancel1 := context.WithCancel(context.Background())
@@ -348,8 +347,8 @@ func TestSubscriptionEvents(t *testing.T) {
go func() {
// Authorized to receive connection events
defer wg.Done()
- req := httptest.NewRequest("GET", defaultHubURL+"?topic=https://mercure.rocks/subscriptions/{topic}/{connectionID}", nil).WithContext(ctx1)
- req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, subscriberRole, []string{"https://mercure.rocks/targets/subscriptions"})})
+ req := httptest.NewRequest("GET", defaultHubURL+"?topic=/.well-known/mercure/subscriptions/{subscriptionID}/{topic}", nil).WithContext(ctx1)
+ req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, subscriberRole, []string{"/.well-known/mercure/subscriptions/{subscriptionID}/{topic}"})})
w := httptest.NewRecorder()
hub.SubscribeHandler(w, req)
@@ -359,20 +358,20 @@ func TestSubscriptionEvents(t *testing.T) {
assert.Equal(t, http.StatusOK, resp.StatusCode)
bodyContent := string(body)
- assert.Contains(t, bodyContent, `data: "@id": "https://mercure.rocks/subscriptions/https%3A%2F%2Fexample.com/`)
+ assert.Contains(t, bodyContent, `data: "@id": "/.well-known/mercure/subscriptions/`)
+ assert.Contains(t, bodyContent, `/https%3A%2F%2Fexample.com`)
assert.Contains(t, bodyContent, `data: "@type": "https://mercure.rocks/Subscription",`)
assert.Contains(t, bodyContent, `data: "topic": "https://example.com",`)
assert.Contains(t, bodyContent, `data: "publish": [],`)
assert.Contains(t, bodyContent, `data: "subscribe": []`)
assert.Contains(t, bodyContent, `data: "active": true,`)
assert.Contains(t, bodyContent, `data: "active": false,`)
- assert.Contains(t, bodyContent, `data: "address": "`)
}()
go func() {
// Not authorized to receive connection events
defer wg.Done()
- req := httptest.NewRequest("GET", defaultHubURL+"?topic=https://mercure.rocks/subscriptions/{topic}/{connectionID}", nil).WithContext(ctx2)
+ req := httptest.NewRequest("GET", defaultHubURL+"?topic=/.well-known/mercure/subscriptions/{subscriptionID}/{topic}", nil).WithContext(ctx2)
req.AddCookie(&http.Cookie{Name: "mercureAuthorization", Value: createDummyAuthorizedJWT(hub, subscriberRole, []string{})})
w := httptest.NewRecorder()
hub.SubscribeHandler(w, req)
@@ -419,7 +418,7 @@ func TestSubscriptionEvents(t *testing.T) {
hub.Stop()
}
-func TestSubscribeAllTargets(t *testing.T) {
+func TestSubscribeAll(t *testing.T) {
hub := createDummy()
s, _ := hub.transport.(*LocalTransport)
@@ -434,14 +433,14 @@ func TestSubscribeAllTargets(t *testing.T) {
}
hub.transport.Dispatch(&Update{
- Targets: map[string]struct{}{"foo": {}},
Topics: []string{"http://example.com/reviews/21"},
Event: Event{Data: "Foo", ID: "a"},
+ Private: true,
})
hub.transport.Dispatch(&Update{
- Targets: map[string]struct{}{"bar": {}},
Topics: []string{"http://example.com/reviews/22"},
Event: Event{Data: "Hello World", ID: "b", Type: "test"},
+ Private: true,
})
return
diff --git a/hub/subscriber.go b/hub/subscriber.go
index cae93371..25e3ac83 100644
--- a/hub/subscriber.go
+++ b/hub/subscriber.go
@@ -5,7 +5,6 @@ import (
"github.com/gofrs/uuid"
log "github.com/sirupsen/logrus"
- "github.com/yosida95/uritemplate"
)
type updateSource struct {
@@ -15,29 +14,24 @@ type updateSource struct {
// Subscriber represents a client subscribed to a list of topics.
type Subscriber struct {
- ID string
- EscapedID string
- Claims *claims
- Targets map[string]struct{}
- Topics []string
- EscapedTopics []string
- RawTopics []string
- TemplateTopics []*uritemplate.Template
- LastEventID string
- RemoteAddr string
- RemoteHost string
- LogFields log.Fields
- AllTargets bool
- Debug bool
-
- out chan *Update
- disconnected chan struct{}
- matchCache map[string]bool
- history updateSource
- live updateSource
+ ID string
+ EscapedID string
+ Claims *claims
+ Topics []string
+ EscapedTopics []string
+ LastEventID string
+ RemoteAddr string
+ LogFields log.Fields
+ Debug bool
+
+ out chan *Update
+ disconnected chan struct{}
+ history updateSource
+ live updateSource
+ topicSelectorStore *topicSelectorStore
}
-func newSubscriber(lastEventID string) *Subscriber {
+func newSubscriber(lastEventID string, uriTemplates *topicSelectorStore) *Subscriber {
id := "urn:uuid:" + uuid.Must(uuid.NewV4()).String()
s := &Subscriber{
ID: id,
@@ -47,11 +41,11 @@ func newSubscriber(lastEventID string) *Subscriber {
"subscriber_id": id,
"last_event_id": lastEventID,
},
- history: updateSource{},
- live: updateSource{in: make(chan *Update)},
- out: make(chan *Update),
- disconnected: make(chan struct{}),
- matchCache: make(map[string]bool),
+ history: updateSource{},
+ live: updateSource{in: make(chan *Update)},
+ out: make(chan *Update),
+ disconnected: make(chan struct{}),
+ topicSelectorStore: uriTemplates,
}
if lastEventID != "" {
@@ -64,6 +58,7 @@ func newSubscriber(lastEventID string) *Subscriber {
// start stores incoming updates in an history and a live buffer and dispatch them.
// Updates coming from the history are always dispatched first.
func (s *Subscriber) start() {
+ defer s.cleanup()
for {
select {
case <-s.disconnected:
@@ -91,6 +86,13 @@ func (s *Subscriber) start() {
}
}
+func (s *Subscriber) cleanup() {
+ s.topicSelectorStore.cleanup(s.Topics)
+ if s.Claims != nil && s.Claims.Mercure.Subscribe != nil {
+ s.topicSelectorStore.cleanup(s.Claims.Mercure.Subscribe)
+ }
+}
+
// outChan returns the out channel if buffers aren't empty, or nil to block.
func (s *Subscriber) outChan() chan<- *Update {
if len(s.live.buffer) > 0 || len(s.history.buffer) > 0 {
@@ -163,62 +165,15 @@ func (s *Subscriber) Disconnected() <-chan struct{} {
// CanDispatch checks if an update can be dispatched to this subsriber.
func (s *Subscriber) CanDispatch(u *Update) bool {
- if !s.IsAuthorized(u) {
- log.WithFields(createFields(u, s)).Debug("Subscriber not authorized to receive this update (no targets matching)")
+ if !canReceive(s.topicSelectorStore, u.Topics, s.Topics) {
+ log.WithFields(createFields(u, s)).Debug("Subscriber has not subscribed to this update")
return false
}
- if !s.IsSubscribed(u) {
- log.WithFields(createFields(u, s)).Debug("Subscriber has not subscribed to this update (no topics matching)")
+ if u.Private && (s.Claims == nil || s.Claims.Mercure.Subscribe == nil || !canReceive(s.topicSelectorStore, u.Topics, s.Claims.Mercure.Subscribe)) {
+ log.WithFields(createFields(u, s)).Debug("Subscriber not authorized to receive this update")
return false
}
return true
}
-
-// IsAuthorized checks if the subscriber can access to at least one of the update's intended targets.
-// Don't forget to also call IsSubscribed.
-func (s *Subscriber) IsAuthorized(u *Update) bool {
- if s.AllTargets || len(u.Targets) == 0 {
- return true
- }
-
- for t := range s.Targets {
- if _, ok := u.Targets[t]; ok {
- return true
- }
- }
-
- return false
-}
-
-// IsSubscribed checks if the subscriber has subscribed to this update.
-// Don't forget to also call IsAuthorized.
-func (s *Subscriber) IsSubscribed(u *Update) bool {
- for _, ut := range u.Topics {
- if match, ok := s.matchCache[ut]; ok {
- if match {
- return true
- }
- continue
- }
-
- for _, rt := range s.RawTopics {
- if ut == rt {
- s.matchCache[ut] = true
- return true
- }
- }
-
- for _, tt := range s.TemplateTopics {
- if tt.Match(ut) != nil {
- s.matchCache[ut] = true
- return true
- }
- }
-
- s.matchCache[ut] = false
- }
-
- return false
-}
diff --git a/hub/subscriber_test.go b/hub/subscriber_test.go
index 835119a1..eacaf05e 100644
--- a/hub/subscriber_test.go
+++ b/hub/subscriber_test.go
@@ -7,25 +7,9 @@ import (
"github.com/stretchr/testify/assert"
)
-func TestIsSubscribed(t *testing.T) {
- s := newSubscriber("")
- s.Topics = []string{"foo", "bar"}
- s.RawTopics = s.Topics
-
- assert.Len(t, s.matchCache, 0)
- assert.False(t, s.IsSubscribed(&Update{Topics: []string{"baz", "bat"}}))
- assert.True(t, s.IsSubscribed(&Update{Topics: []string{"baz", "bar"}}))
- assert.Len(t, s.matchCache, 3)
-
- // assert cache is used
- assert.True(t, s.IsSubscribed(&Update{Topics: []string{"bar", "qux"}}))
- assert.Len(t, s.matchCache, 3)
-}
-
func TestDispatch(t *testing.T) {
- s := newSubscriber("1")
+ s := newSubscriber("1", newTopicSelectorStore())
s.Topics = []string{"http://example.com"}
- s.RawTopics = s.Topics
go s.start()
defer s.Disconnect()
@@ -44,7 +28,7 @@ func TestDispatch(t *testing.T) {
}
func TestDisconnect(t *testing.T) {
- s := newSubscriber("")
+ s := newSubscriber("", newTopicSelectorStore())
s.Disconnect()
// can be called two times without crashing
s.Disconnect()
diff --git a/hub/topic_selector.go b/hub/topic_selector.go
new file mode 100644
index 00000000..83ec98ee
--- /dev/null
+++ b/hub/topic_selector.go
@@ -0,0 +1,86 @@
+package hub
+
+import (
+ "strings"
+ "sync"
+
+ "github.com/yosida95/uritemplate"
+)
+
+type selector struct {
+ sync.Mutex
+ // counter stores the number of subsribers currently using this topic
+ counter uint32
+ // the uritemplate.Template instance, of nil if it's a raw string
+ template *uritemplate.Template
+ matchCache map[string]bool
+}
+
+// topicSelectorStore caches uritemplate.Template to improve memory and CPU usage.
+type topicSelectorStore struct {
+ sync.Mutex
+ m map[string]*selector
+}
+
+func newTopicSelectorStore() *topicSelectorStore {
+ return &topicSelectorStore{m: make(map[string]*selector)}
+}
+
+func (tss *topicSelectorStore) match(topic, topicSelector string, addToCache bool) bool {
+ // Always do an exact matching comparision first
+ // Also check if the topic selector is the reserved keyword *
+ if topicSelector == "*" || topic == topicSelector {
+ return true
+ }
+
+ templateStore := tss.getTemplateStore(topicSelector, addToCache)
+ templateStore.Lock()
+ defer templateStore.Unlock()
+ if match, ok := templateStore.matchCache[topic]; ok {
+ return match
+ }
+
+ match := templateStore.template != nil && templateStore.template.Match(topic) != nil
+ templateStore.matchCache[topic] = match
+
+ return match
+}
+
+// getTemplateStore retrieves or creates the uritemplate.Template associated with this topic, or nil if it's not a template.
+func (tss *topicSelectorStore) getTemplateStore(topicSelector string, addToCache bool) (s *selector) {
+ tss.Lock()
+ defer tss.Unlock()
+ if store, ok := tss.m[topicSelector]; ok {
+ if addToCache {
+ store.counter++
+ }
+
+ return store
+ }
+
+ s = &selector{matchCache: make(map[string]bool)}
+ if strings.Contains(topicSelector, "{") { // If it's definitely not an URI template, skip to save some resources
+ s.template, _ = uritemplate.New(topicSelector) // Returns nil in case of error, will be considered as a raw string
+ }
+
+ if addToCache {
+ tss.m[topicSelector] = s
+ }
+
+ return s
+}
+
+// Remove unused uritemplate.Template instances from memory.
+func (tss *topicSelectorStore) cleanup(topics []string) {
+ tss.Lock()
+ defer tss.Unlock()
+ for _, topic := range topics {
+ if tc, ok := tss.m[topic]; ok {
+ if tc.counter <= 0 {
+ delete(tss.m, topic)
+ } else {
+ tc.counter--
+ }
+ }
+ }
+}
diff --git a/hub/topic_selector_test.go b/hub/topic_selector_test.go
new file mode 100644
index 00000000..e206bf49
--- /dev/null
+++ b/hub/topic_selector_test.go
@@ -0,0 +1,7 @@
+package hub
+
+import "testing"
+
+func TestMatch(t *testing.T) {
+ // TODO
+}
diff --git a/hub/transport_test.go b/hub/transport_test.go
index 9d3b0e76..b568a038 100644
--- a/hub/transport_test.go
+++ b/hub/transport_test.go
@@ -15,16 +15,12 @@ func TestLocalTransportDoNotDispatchUntilListen(t *testing.T) {
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)
- u := &Update{
- Topics: []string{"http://example.com/books/1"},
- }
+ u := &Update{Topics: []string{"http://example.com/books/1"}}
err := transport.Dispatch(u)
require.Nil(t, err)
- s := newSubscriber("")
+ s := newSubscriber("", newTopicSelectorStore())
s.Topics = u.Topics
- s.RawTopics = u.Topics
- s.Targets = map[string]struct{}{"foo": {}}
go s.start()
err = transport.AddSubscriber(s)
@@ -57,9 +53,8 @@ func TestLocalTransportDispatch(t *testing.T) {
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)
- s := newSubscriber("")
+ s := newSubscriber("", newTopicSelectorStore())
s.Topics = []string{"http://example.com/foo"}
- s.RawTopics = s.Topics
go s.start()
err := transport.AddSubscriber(s)
@@ -79,14 +74,16 @@ func TestLocalTransportClosed(t *testing.T) {
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)
- s := newSubscriber("")
+ tss := newTopicSelectorStore()
+
+ s := newSubscriber("", tss)
err := transport.AddSubscriber(s)
require.Nil(t, err)
err = transport.Close()
assert.Nil(t, err)
- err = transport.AddSubscriber(newSubscriber(""))
+ err = transport.AddSubscriber(newSubscriber("", tss))
assert.Equal(t, err, ErrClosedTransport)
err = transport.Dispatch(&Update{})
@@ -100,13 +97,15 @@ func TestLiveCleanDisconnectedSubscribers(t *testing.T) {
transport := NewLocalTransport()
defer transport.Close()
- s1 := newSubscriber("")
+ tss := newTopicSelectorStore()
+
+ s1 := newSubscriber("", tss)
go s1.start()
err := transport.AddSubscriber(s1)
require.Nil(t, err)
- s2 := newSubscriber("")
+ s2 := newSubscriber("", tss)
go s2.start()
err = transport.AddSubscriber(s2)
@@ -132,9 +131,8 @@ func TestLiveReading(t *testing.T) {
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)
- s := newSubscriber("")
+ s := newSubscriber("", newTopicSelectorStore())
s.Topics = []string{"https://example.com"}
- s.RawTopics = s.Topics
go s.start()
err := transport.AddSubscriber(s)
diff --git a/hub/update.go b/hub/update.go
index 38226882..632675c4 100644
--- a/hub/update.go
+++ b/hub/update.go
@@ -4,13 +4,13 @@ import "github.com/gofrs/uuid"
// Update represents an update to send to subscribers.
type Update struct {
- // The target audience.
- Targets map[string]struct{}
-
// The topics' Internationalized Resource Identifier (RFC3987) (will most likely be URLs).
// The first one is the canonical IRI, while next ones are alternate IRIs.
Topics []string
+ // Private updates can only be dispatched to subscribers authorized to receive them.
+ Private bool
+
// The Server-Sent Event to send.
Event
}
@@ -20,11 +20,11 @@ type serializedUpdate struct {
event string
}
-func newUpdate(event Event, topics []string, targets map[string]struct{}) *Update {
+func newUpdate(topics []string, private bool, event Event) *Update {
u := &Update{
- Event: event,
Topics: topics,
- Targets: targets,
+ Private: private,
+ Event: event,
}
if u.ID == "" {
u.ID = "urn:uuid:" + uuid.Must(uuid.NewV4()).String()
diff --git a/hub/update_test.go b/hub/update_test.go
new file mode 100644
index 00000000..cb0d274d
--- /dev/null
+++ b/hub/update_test.go
@@ -0,0 +1,22 @@
+package hub
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/gofrs/uuid"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewUpdate(t *testing.T) {
+ u := newUpdate([]string{"foo"}, true, Event{Retry: 3})
+
+ assert.Equal(t, []string{"foo"}, u.Topics)
+ assert.True(t, u.Private)
+ assert.Equal(t, uint64(3), u.Retry)
+
+ assert.True(t, strings.HasPrefix(u.ID, "urn:uuid:"))
+
+ _, err := uuid.FromString(strings.TrimPrefix(u.ID, "urn:uuid:"))
+ assert.Nil(t, err)
+}
diff --git a/public/app.js b/public/app.js
index 53dbf3ab..b8fd3190 100644
--- a/public/app.js
+++ b/public/app.js
@@ -4,7 +4,7 @@
const origin = window.location.origin;
const defaultTopic = origin + "/demo/books/1.jsonld";
const defaultJwt =
- "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InN1YnNjcmliZSI6WyJmb28iLCJiYXIiXSwicHVibGlzaCI6WyJmb28iXX19.afLx2f2ut3YgNVFStCx95Zm_UND1mZJ69OenXaDuZL8";
+ "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJtZXJjdXJlIjp7InB1Ymxpc2giOlsiKiJdLCJzdWJzY3JpYmUiOlsiaHR0cHM6Ly9leGFtcGxlLmNvbS9teS1wcml2YXRlLXRvcGljIiwiaHR0cDovL2xvY2FsaG9zdDozMDAwL2RlbW8vYm9va3Mve2lkfS5qc29ubGQiXSwicGF5bG9hZCI6eyJ1c2VyIjoiaHR0cHM6Ly9leGFtcGxlLmNvbS91c2Vycy9kdW5nbGFzIiwicmVtb3RlX2FkZHIiOiIxMjcuMC4wLjEifX19.bRUavgS2H9GyCHq7eoPUL_rZm2L7fGujtyyzUhiOsnw";
const updates = document.querySelector("#updates");
const settingsForm = document.forms.settings;
@@ -146,7 +146,7 @@ foo`;
publishForm.onsubmit = function (e) {
e.preventDefault();
const {
- elements: { topics, data, targets, id, type, retry },
+ elements: { topics, data, priv, id, type, retry },
} = this;
const body = new URLSearchParams({
@@ -157,10 +157,7 @@ foo`;
});
topics.value.split("\n").forEach((topic) => body.append("topic", topic));
- targets.value !== "" &&
- targets.value
- .split("\n")
- .forEach((target) => body.append("target", target));
+ priv.checked && body.append("private", "on")
const opt = { method: "POST", body };
if (settingsForm.authorization.value === "header")
diff --git a/public/index.html b/public/index.html
index b90a1dd6..68d7769d 100644
--- a/public/index.html
+++ b/public/index.html
@@ -62,11 +62,11 @@
Settings
Claim structure to use:
{
"mercure": {
- "subscribe": ["list of authorized targets, * for all, omit for public only"],
- "publish": ["list of authorized targets, * for all, omit to not allow to publish"]
+ "subscribe": ["list of topic selectors, * for all, omit for public only"],
+ "publish": ["list of topic selectors, * for all, omit to not allow to publish"]
}
}