Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ pub fn main() {
let assert Ok(session_b) = game.start(FireDice)
let assert Ok(session_c) = game.start(PlayChip)

chip.register(registry, GroupA, session_a)
chip.register(registry, GroupB, session_b)
chip.register(registry, GroupA, session_c)
chip.register(registry.data, GroupA, session_a.data)
chip.register(registry.data, GroupB, session_b.data)
chip.register(registry.data, GroupA, session_c.data)

chip.members(registry, GroupA, 50)
|> list.each(fn(session) { game.next(session) })
|> list.each(fn(session) { game.next(session.data) })
}
```

Expand Down
6 changes: 3 additions & 3 deletions gleam.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ pages = [

[dependencies]
gleam_stdlib = ">= 0.36.0 and < 2.0.0"
gleam_erlang = ">= 0.24.0 and < 2.0.0"
gleam_otp = ">= 0.10.0 and < 2.0.0"
lamb = ">= 0.1.0 and < 2.0.0"
gleam_erlang = ">= 1.0.0 and < 2.0.0"
gleam_otp = ">= 1.0.0 and < 2.0.0"
lamb = { git = "https://github.com/trescenzi/lamb.git", ref = "9b9b0e2" }

[dev-dependencies]
gleeunit = ">= 1.0.0 and < 2.0.0"
Expand Down
18 changes: 9 additions & 9 deletions manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@
# You typically do not need to edit this file

packages = [
{ name = "benchee", version = "1.3.1", build_tools = ["mix"], requirements = ["deep_merge", "statistex", "table"], otp_app = "benchee", source = "hex", outer_checksum = "76224C58EA1D0391C8309A8ECBFE27D71062878F59BD41A390266BF4AC1CC56D" },
{ name = "benchee", version = "1.4.0", build_tools = ["mix"], requirements = ["deep_merge", "statistex", "table"], otp_app = "benchee", source = "hex", outer_checksum = "299CD10DD8CE51C9EA3DDB74BB150F93D25E968F93E4C1FA31698A8E4FA5D715" },
{ name = "deep_merge", version = "1.0.0", build_tools = ["mix"], requirements = [], otp_app = "deep_merge", source = "hex", outer_checksum = "CE708E5F094B9CD4E8F2BE4F00D2F4250C4095BE93F8CD6D018C753894885430" },
{ name = "gleam_erlang", version = "0.33.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "A1D26B80F01901B59AABEE3475DD4C18D27D58FA5C897D922FCB9B099749C064" },
{ name = "gleam_otp", version = "0.16.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "FA0EB761339749B4E82D63016C6A18C4E6662DA05BAB6F1346F9AF2E679E301A" },
{ name = "gleam_stdlib", version = "0.47.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "3B22D46743C46498C8355365243327AC731ECD3959216344FA9CF9AD348620AC" },
{ name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" },
{ name = "lamb", version = "0.6.1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "lamb", source = "hex", outer_checksum = "A74714DE60B3BADB623DFFF910C843793AE660222A9AD63C70053D33C0C3D311" },
{ name = "gleam_erlang", version = "1.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "D7A2E71CE7F6B513E62F9A9EF6DFDE640D9607598C477FCCADEF751C45FD82E7" },
{ name = "gleam_otp", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "7020E652D18F9ABAC9C877270B14160519FA0856EE80126231C505D719AD68DA" },
{ name = "gleam_stdlib", version = "0.60.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "621D600BB134BC239CB2537630899817B1A42E60A1D46C5E9F3FAE39F88C800B" },
{ name = "gleeunit", version = "1.5.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "D33B7736CF0766ED3065F64A1EBB351E72B2E8DE39BAFC8ADA0E35E92A6A934F" },
{ name = "lamb", version = "0.6.1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], source = "git", repo = "https://github.com/trescenzi/lamb.git", commit = "9b9b0e2593213f0f94a0950a1193a810a57f1f1b" },
{ name = "statistex", version = "1.0.0", build_tools = ["mix"], requirements = [], otp_app = "statistex", source = "hex", outer_checksum = "FF9D8BEE7035028AB4742FF52FC80A2AA35CECE833CF5319009B52F1B5A86C27" },
]

[requirements]
benchee = { version = ">= 1.3.0 and < 2.0.0" }
gleam_erlang = { version = ">= 0.24.0 and < 2.0.0" }
gleam_otp = { version = ">= 0.10.0 and < 2.0.0" }
gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" }
gleam_otp = { version = ">= 1.0.0 and < 2.0.0" }
gleam_stdlib = { version = ">= 0.36.0 and < 2.0.0" }
gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
lamb = { version = ">= 0.1.0 and < 2.0.0" }
lamb = { git = "https://github.com/trescenzi/lamb.git", ref = "9b9b0e2" }
55 changes: 32 additions & 23 deletions src/chip.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
//// automatically delist dead processes.

import gleam/dynamic
import gleam/erlang
import gleam/erlang/atom
import gleam/erlang/process.{type Pid, type Subject}
import gleam/function
import gleam/erlang/reference.{type Reference}
import gleam/otp/actor
import gleam/result.{try}
import lamb.{Bag, Private, Protected, Public, Set}
Expand Down Expand Up @@ -47,9 +46,12 @@ pub type Named {
/// ```gleam
/// > let _ = chip.start(chip.Named("sessions"))
/// ```
pub fn start(named: Named) -> Result(Registry(msg, group), actor.StartError) {
let init = fn() { init(named) }
actor.start_spec(actor.Spec(init: init, init_timeout: 100, loop: loop))
pub fn start(named: Named)
-> Result(actor.Started(Subject(Message(a, b))), actor.StartError) {
let init = fn(self) { init(self, named) }
actor.new_with_initialiser(100, init)
|> actor.on_message(loop)
|> actor.start()
}

/// Retrieves a previously named registry.
Expand Down Expand Up @@ -139,7 +141,7 @@ pub fn members(
group: group,
timeout: Int,
) -> List(Subject(msg)) {
let group_store = process.call(registry, GroupStore2(_), timeout)
let group_store = process.call(registry, timeout, GroupStore2)

lamb.lookup(group_store, group)
}
Expand All @@ -159,7 +161,7 @@ pub fn stop(registry: Registry(msg, group)) -> Nil {
// Server Code ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::

type Monitor =
erlang.Reference
Reference

/// Chip's internal message type.
pub opaque type Message(msg, group) {
Expand All @@ -185,10 +187,8 @@ type ProcessDown {
ProcessDown(monitor: Monitor, pid: Pid)
}

fn init(
named: Named,
) -> actor.InitResult(State(msg, group), Message(msg, group)) {
let self = process.new_subject()
fn init(self, named: Named) {
//let self = process.new_subject()

let table = initialize_named_registries_store()

Expand All @@ -206,16 +206,19 @@ fn init(

let selector =
process.new_selector()
|> process.selecting(self, function.identity)
|> process.selecting_anything(process_down)
|> process.select(self)
|> process.select_other(process_down)

actor.Ready(state, selector)
actor.initialised(state)
|> actor.selecting(selector)
|> actor.returning(self)
|> Ok
}

fn loop(
message: Message(msg, group),
state: State(msg, group),
) -> actor.Next(Message(msg, group), State(msg, group)) {
message: Message(msg, group),
) -> actor.Next(State(msg, group), Message(msg, group)) {
case message {
GroupStore2(client) -> {
// priority is given through selective receive
Expand All @@ -228,9 +231,15 @@ fn loop(
Register(subject, group) -> {
let pid = process.subject_owner(subject)

let Nil = monitor(state.monitors, pid)
lamb.insert(state.monitors, pid, Nil)
lamb.insert(state.groups, group, subject)
case pid {
Ok(pid) -> {
let Nil = monitor(state.monitors, pid)
lamb.insert(state.monitors, pid, Nil)
lamb.insert(state.groups, group, subject)
}
Error(_) -> Nil
//TODO what do we want to do in this case?
}

state
|> actor.continue()
Expand All @@ -256,7 +265,7 @@ fn loop(
}

Stop -> {
actor.Stop(process.Normal)
actor.stop()
}
}
}
Expand Down Expand Up @@ -311,7 +320,7 @@ fn monitor(monitors: lamb.Table(Pid, Nil), pid: Pid) -> Nil {
case lamb.any(monitors, pid) {
True -> Nil
False -> {
let _monitor = process.monitor_process(pid)
let _monitor = process.monitor(pid)
Nil
}
}
Expand All @@ -321,7 +330,7 @@ fn monitor(monitors: lamb.Table(Pid, Nil), pid: Pid) -> Nil {
fn decode_down_message(message: dynamic.Dynamic) -> Result(ProcessDown, Nil)

fn schedulers() -> Int {
ffi_system_info(atom.create_from_string("schedulers"))
ffi_system_info(atom.create("schedulers"))
}

type Option =
Expand All @@ -331,7 +340,7 @@ type Option =
fn ffi_system_info(option: Option) -> Int

fn demonitor(reference: Monitor) -> Nil {
let _ = ffi_demonitor(reference, [atom.create_from_string("flush")])
let _ = ffi_demonitor(reference, [atom.create("flush")])
Nil
}

Expand Down
49 changes: 19 additions & 30 deletions test/artifacts/clock.gleam
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import chip
import gleam/erlang/process
import gleam/function.{identity}
import gleam/option
import gleam/otp/actor
import gleam/otp/supervisor

pub opaque type Message {
Inc
Expand All @@ -18,19 +15,10 @@ pub type Group {
}

pub fn start(registry: chip.Registry(Message, Group), group: Group, count: Int) {
let init = fn() { init(registry, group, count) }
actor.start_spec(actor.Spec(init: init, init_timeout: 10, loop: loop))
}

pub fn childspec(count) {
supervisor.worker(fn(param) {
let #(registry, group) = param
start(registry, group, count)
})
|> supervisor.returning(fn(param, _self) {
let #(registry, group) = param
#(registry, group, count)
})
let init = fn(self) { init(self, registry, group, count) }
actor.new_with_initialiser(10, init)
|> actor.on_message(loop)
|> actor.start()
}

pub fn stop(counter: process.Subject(Message)) -> Nil {
Expand All @@ -42,38 +30,39 @@ pub fn increment(counter: process.Subject(Message)) -> Nil {
}

pub fn current(counter: process.Subject(Message)) -> Int {
actor.call(counter, Current(_), 10)
actor.call(counter, 10, Current)
}

fn init(registry: chip.Registry(Message, Group), group: Group, count: Int) {
// Create a reference to self
let self = process.new_subject()

fn init(self, registry: chip.Registry(Message, Group), group: Group, count: Int) {
// Register the counter under an id on initialization
chip.register(registry, group, self)

let selector =
process.new_selector()
|> process.select(self)

// The registry may send messages through the self subject to this actor
// adding self to this actor selector will allow us to handle those messages.
actor.Ready(
count,
process.new_selector()
|> process.selecting(self, identity),
)

actor.initialised(count)
|> actor.selecting(selector)
|> actor.returning(self)
|> Ok
}

fn loop(message: Message, count: Int) {
fn loop(count: Int, message: Message) {
case message {
Inc -> {
actor.Continue(count + 1, option.None)
actor.continue(count + 1)
}

Current(client) -> {
process.send(client, count)
actor.Continue(count, option.None)
actor.continue(count)
}

Stop -> {
actor.Stop(process.Normal)
actor.stop()
}
}
}
42 changes: 22 additions & 20 deletions test/artifacts/game.gleam
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import chip
import gleam/erlang/process
import gleam/function
import gleam/otp/actor
import gleam/otp/supervisor
import gleam/otp/supervision as supervisor

type SessionRegistry =
chip.Registry(Message, Int)
Expand All @@ -27,52 +26,55 @@ pub type Session {
}

pub fn start(action) {
actor.start(action, loop)
actor.new(action)
|> actor.on_message(loop)
|> actor.start
}

pub fn start_with(registry: SessionRegistry, id: Int, action: Action) {
let init = fn() { init(registry, id, action) }
actor.start_spec(actor.Spec(init: init, init_timeout: 10, loop: loop))
pub fn start_with(id: Int, registry: SessionRegistry, action: Action) {
let init = fn(self) { init(self, registry, id, action) }
actor.new_with_initialiser(10, init)
|> actor.on_message(loop)
|> actor.start
}

pub fn childspec(registry: SessionRegistry, action) {
supervisor.worker(fn(id) { start_with(registry, id, action) })
|> supervisor.returning(fn(id, _self) { id + 1 })
pub fn childspec(id: Int, registry: SessionRegistry, action) {
supervisor.worker(fn() { start_with(id, registry, action) })
}

pub fn next(game: Game) -> Nil {
actor.send(game, Next)
}

pub fn current(game: Game) -> String {
actor.call(game, Current(_), 100)
actor.call(game, 100, Current)
}

pub fn stop(game: Game) -> Nil {
actor.send(game, Stop)
}

fn init(registry, id, action) {
// Create a reference to self
let self = process.new_subject()

fn init(self, registry, id, action) {
// Register the counter under an id on initialization
chip.register(registry, id, self)

// The registry may send messages through the self subject to this actor
// adding self to this actor selector will allow us to handle those messages.
actor.Ready(
action,
let selector =
process.new_selector()
|> process.selecting(self, function.identity),
)
|> process.select(self)

actor.initialised(action)
|> actor.selecting(selector)
|> actor.returning(self)
|> Ok
}

fn loop(message, action) {
fn loop(action, message) {
case message {
Next -> next_state(action)
Current(client) -> send_unicode(client, action)
Stop -> actor.Stop(process.Normal)
Stop -> actor.stop()
}
}

Expand Down
5 changes: 2 additions & 3 deletions test/artifacts/pubsub.gleam
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import chip
import gleam/erlang/process
import gleam/list
import gleam/otp/supervisor
import gleam/otp/supervision

pub type PubSub(message, channel) =
chip.Registry(message, channel)
Expand All @@ -11,8 +11,7 @@ pub fn start() {
}

pub fn childspec() {
supervisor.worker(fn(_param) { start() })
|> supervisor.returning(fn(_param, pubsub) { pubsub })
supervision.worker(fn() { start() })
}

pub fn subscribe(
Expand Down
Loading