Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use registry for runtime messaging #121

Merged
merged 2 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 128 additions & 132 deletions lib/next_ls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule NextLS do
:runtime_task_supervisor,
:dynamic_supervisor,
:extensions,
:extension_registry,
:registry,
:symbol_table
])

Expand All @@ -55,7 +55,8 @@ defmodule NextLS do
task_supervisor = Keyword.fetch!(args, :task_supervisor)
runtime_task_supervisor = Keyword.fetch!(args, :runtime_task_supervisor)
dynamic_supervisor = Keyword.fetch!(args, :dynamic_supervisor)
extension_registry = Keyword.fetch!(args, :extension_registry)

registry = Keyword.fetch!(args, :registry)
extensions = Keyword.get(args, :extensions, [NextLS.ElixirExtension])
cache = Keyword.fetch!(args, :cache)
symbol_table = Keyword.fetch!(args, :symbol_table)
Expand All @@ -72,9 +73,8 @@ defmodule NextLS do
task_supervisor: task_supervisor,
runtime_task_supervisor: runtime_task_supervisor,
dynamic_supervisor: dynamic_supervisor,
extension_registry: extension_registry,
registry: registry,
extensions: extensions,
runtime_tasks: nil,
ready: false,
client_capabilities: nil
)}
Expand Down Expand Up @@ -208,39 +208,44 @@ defmodule NextLS do
def handle_request(%TextDocumentFormatting{params: %{text_document: %{uri: uri}}}, lsp) do
document = lsp.assigns.documents[uri]

{_, %{runtime: runtime}} =
Enum.find(lsp.assigns.runtimes, fn {_name, %{uri: wuri}} -> String.starts_with?(uri, wuri) end)

with {:ok, {formatter, _}} <- Runtime.call(runtime, {Mix.Tasks.Format, :formatter_for_file, [".formatter.exs"]}),
{:ok, response} when is_binary(response) or is_list(response) <-
Runtime.call(runtime, {Kernel, :apply, [formatter, [Enum.join(document, "\n")]]}) do
{:reply,
[
%TextEdit{
new_text: IO.iodata_to_binary(response),
range: %Range{
start: %Position{line: 0, character: 0},
end: %Position{
line: length(document),
character: document |> List.last() |> String.length() |> Kernel.-(1) |> max(0)
}
}
}
], lsp}
else
{:error, :not_ready} ->
GenLSP.notify(lsp, %GenLSP.Notifications.WindowShowMessage{
params: %GenLSP.Structures.ShowMessageParams{
type: GenLSP.Enumerations.MessageType.info(),
message: "The NextLS runtime is still initializing!"
}
})

{:reply, nil, lsp}
[resp] =
dispatch(lsp.assigns.registry, :runtimes, fn entries ->
for {runtime, %{uri: wuri}} <- entries, String.starts_with?(uri, wuri) do
with {:ok, {formatter, _}} <-
Runtime.call(runtime, {Mix.Tasks.Format, :formatter_for_file, [".formatter.exs"]}),
{:ok, response} when is_binary(response) or is_list(response) <-
Runtime.call(runtime, {Kernel, :apply, [formatter, [Enum.join(document, "\n")]]}) do
{:reply,
[
%TextEdit{
new_text: IO.iodata_to_binary(response),
range: %Range{
start: %Position{line: 0, character: 0},
end: %Position{
line: length(document),
character: document |> List.last() |> String.length() |> Kernel.-(1) |> max(0)
}
}
}
], lsp}
else
{:error, :not_ready} ->
GenLSP.notify(lsp, %GenLSP.Notifications.WindowShowMessage{
params: %GenLSP.Structures.ShowMessageParams{
type: GenLSP.Enumerations.MessageType.info(),
message: "The NextLS runtime is still initializing!"
}
})

{:reply, nil, lsp}

_ ->
{:reply, nil, lsp}
end
end
end)

_ ->
{:reply, nil, lsp}
end
resp
end

def handle_request(%Shutdown{}, lsp) do
Expand All @@ -267,87 +272,79 @@ defmodule NextLS do
{:ok, _} =
DynamicSupervisor.start_child(
lsp.assigns.dynamic_supervisor,
{extension, cache: lsp.assigns.cache, registry: lsp.assigns.extension_registry, publisher: self()}
{extension, cache: lsp.assigns.cache, registry: lsp.assigns.registry, publisher: self()}
)
end

GenLSP.log(lsp, "[NextLS] Booting runtime...")

runtimes =
for %{uri: uri, name: name} <- lsp.assigns.workspace_folders do
token = token()
progress_start(lsp, token, "Initializing NextLS runtime for folder #{name}...")
GenLSP.log(lsp, "[NextLS] Booting runtimes...")

{:ok, runtime} =
DynamicSupervisor.start_child(
lsp.assigns.dynamic_supervisor,
{NextLS.Runtime,
task_supervisor: lsp.assigns.runtime_task_supervisor,
extension_registry: lsp.assigns.extension_registry,
working_dir: URI.parse(uri).path,
parent: self(),
logger: lsp.assigns.logger}
)
for %{uri: uri, name: name} <- lsp.assigns.workspace_folders do
token = token()
progress_start(lsp, token, "Initializing NextLS runtime for folder #{name}...")
parent = self()

Process.monitor(runtime)

{name,
%{uri: uri, runtime: runtime, refresh_ref: {token, "NextLS runtime for folder #{name} has initialized!"}}}
end

lsp = assign(lsp, runtimes: Map.new(runtimes))

tasks =
for {name, workspace} <- runtimes do
Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn ->
with false <- wait_until(fn -> NextLS.Runtime.ready?(workspace.runtime) end) do
GenLSP.error(lsp, "[NextLS] Failed to start runtime for folder #{name}")
raise "Failed to boot runtime"
end
{:ok, runtime} =
DynamicSupervisor.start_child(
lsp.assigns.dynamic_supervisor,
{NextLS.Runtime,
name: name,
task_supervisor: lsp.assigns.runtime_task_supervisor,
registry: lsp.assigns.registry,
working_dir: URI.parse(uri).path,
uri: uri,
parent: self(),
on_initialized: fn status ->
if status == :ready do
progress_end(lsp, token, "NextLS runtime for folder #{name} has initialized!")
GenLSP.log(lsp, "[NextLS] Runtime for folder #{name} is ready...")
send(parent, {:runtime_ready, name, self()})
else
progress_end(lsp, token)
GenLSP.error(lsp, "[NextLS] Runtime for folder #{name} failed to initialize")
end
end,
logger: lsp.assigns.logger}
)

GenLSP.log(lsp, "[NextLS] Runtime for folder #{name} is ready...")
ref = Process.monitor(runtime)

{name, :ready}
end)
end
Process.put(ref, name)

refresh_refs =
tasks |> Enum.zip_with(runtimes, fn task, {_name, runtime} -> {task.ref, runtime.refresh_ref} end) |> Map.new()
{name, %{uri: uri, runtime: runtime}}
end

{:noreply,
assign(lsp,
refresh_refs: Map.merge(lsp.assigns.refresh_refs, refresh_refs),
runtime_tasks: tasks
)}
{:noreply, lsp}
end

def handle_notification(%TextDocumentDidSave{}, %{assigns: %{ready: false}} = lsp) do
{:noreply, lsp}
end

# TODO: add some test cases for saving files in multiple workspaces
def handle_notification(
%TextDocumentDidSave{
params: %GenLSP.Structures.DidSaveTextDocumentParams{text: text, text_document: %{uri: uri}}
},
%{assigns: %{ready: true}} = lsp
) do
for task <- Task.Supervisor.children(lsp.assigns.task_supervisor),
task not in for(t <- lsp.assigns.runtime_tasks, do: t.pid) do
for task <- Task.Supervisor.children(lsp.assigns.task_supervisor) do
Process.exit(task, :kill)
end

token = token()
refresh_refs =
dispatch(lsp.assigns.registry, :runtimes, fn entries ->
for {pid, %{name: name, uri: wuri}} <- entries, String.starts_with?(uri, wuri), into: %{} do
token = token()
progress_start(lsp, token, "Compiling...")

progress_start(lsp, token, "Compiling...")
runtimes = Enum.to_list(lsp.assigns.runtimes)
task =
Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn ->
{name, Runtime.compile(pid)}
end)

tasks =
for {name, r} <- runtimes do
Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn -> {name, Runtime.compile(r.runtime)} end)
end

refresh_refs =
tasks |> Enum.zip_with(runtimes, fn task, {_name, runtime} -> {task.ref, runtime.refresh_ref} end) |> Map.new()
{task.ref, {token, "Compiled!"}}
end
end)

{:noreply,
lsp
Expand All @@ -363,8 +360,7 @@ defmodule NextLS do
%TextDocumentDidChange{params: %{text_document: %{uri: uri}, content_changes: [%{text: text}]}},
lsp
) do
for task <- Task.Supervisor.children(lsp.assigns.task_supervisor),
task not in for(t <- lsp.assigns.runtime_tasks, do: t.pid) do
for task <- Task.Supervisor.children(lsp.assigns.task_supervisor) do
Process.exit(task, :kill)
end

Expand Down Expand Up @@ -420,30 +416,27 @@ defmodule NextLS do
{:noreply, lsp}
end

def handle_info({ref, resp}, %{assigns: %{refresh_refs: refs}} = lsp) when is_map_key(refs, ref) do
Process.demonitor(ref, [:flush])
{{token, msg}, refs} = Map.pop(refs, ref)
def handle_info({:runtime_ready, name, runtime_pid}, lsp) do
token = token()
progress_start(lsp, token, "Compiling...")

progress_end(lsp, token, msg)
task =
Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn ->
{name, Runtime.compile(runtime_pid)}
end)

lsp =
case resp do
{name, :ready} ->
token = token()
progress_start(lsp, token, "Compiling...")
refresh_refs = Map.put(lsp.assigns.refresh_refs, task.ref, {token, "Compiled!"})

task =
Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn ->
{name, Runtime.compile(lsp.assigns.runtimes[name].runtime)}
end)
{:noreply, assign(lsp, ready: true, refresh_refs: refresh_refs)}
end

assign(lsp, ready: true, refresh_refs: Map.put(refs, task.ref, {token, "Compiled!"}))
def handle_info({ref, _resp}, %{assigns: %{refresh_refs: refs}} = lsp) when is_map_key(refs, ref) do
Process.demonitor(ref, [:flush])
{{token, msg}, refs} = Map.pop(refs, ref)

_ ->
assign(lsp, refresh_refs: refs)
end
progress_end(lsp, token, msg)

{:noreply, lsp}
{:noreply, assign(lsp, refresh_refs: refs)}
end

def handle_info({:DOWN, ref, :process, _pid, _reason}, %{assigns: %{refresh_refs: refs}} = lsp)
Expand All @@ -455,35 +448,20 @@ defmodule NextLS do
{:noreply, assign(lsp, refresh_refs: refs)}
end

def handle_info({:DOWN, _ref, :process, runtime, _reason}, %{assigns: %{runtimes: runtimes}} = lsp) do
{name, _} = Enum.find(runtimes, fn {_name, %{runtime: r}} -> r == runtime end)
def handle_info({:DOWN, ref, :process, _runtime, _reason}, lsp) do
name = Process.get(ref)
Process.delete(ref)

GenLSP.error(lsp, "[NextLS] The runtime for #{name} has crashed")

{:noreply, assign(lsp, runtimes: Map.drop(runtimes, name))}
{:noreply, lsp}
end

def handle_info(message, lsp) do
GenLSP.log(lsp, "[NextLS] Unhanded message: #{inspect(message)}")
GenLSP.log(lsp, "[NextLS] Unhandled message: #{inspect(message)}")
{:noreply, lsp}
end

defp wait_until(cb) do
wait_until(120, cb)
end

defp wait_until(0, _cb) do
false
end

defp wait_until(n, cb) do
if cb.() do
true
else
Process.sleep(1000)
wait_until(n - 1, cb)
end
end

defp progress_start(lsp, token, msg) do
GenLSP.notify(lsp, %GenLSP.Notifications.DollarProgress{
params: %GenLSP.Structures.ProgressParams{
Expand Down Expand Up @@ -527,4 +505,22 @@ defmodule NextLS do

defp elixir_kind_to_lsp_kind(kind) when kind in [:def, :defp, :defmacro, :defmacrop],
do: GenLSP.Enumerations.SymbolKind.function()

# NOTE: this is only possible because the registry is not partitioned
# if it is partitioned, then the callback is called multiple times
# and this method of extracting the result doesn't really make sense
defp dispatch(registry, key, callback) do
ref = make_ref()
me = self()

Registry.dispatch(registry, key, fn entries ->
result = callback.(entries)

send(me, {ref, result})
end)

receive do
{^ref, result} -> result
end
end
end
2 changes: 1 addition & 1 deletion lib/next_ls/extensions/elixir_extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule NextLS.ElixirExtension do
registry = Keyword.fetch!(args, :registry)
publisher = Keyword.fetch!(args, :publisher)

Registry.register(registry, :extension, :elixir)
Registry.register(registry, :extensions, :elixir)

{:ok, %{cache: cache, registry: registry, publisher: publisher}}
end
Expand Down
4 changes: 2 additions & 2 deletions lib/next_ls/lsp_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ defmodule NextLS.LSPSupervisor do
{GenLSP.Buffer, buffer_opts},
{NextLS.DiagnosticCache, name: :diagnostic_cache},
{NextLS.SymbolTable, name: :symbol_table, path: hidden_folder},
{Registry, name: NextLS.ExtensionRegistry, keys: :duplicate},
{Registry, name: NextLS.Registry, keys: :duplicate},
{NextLS,
cache: :diagnostic_cache,
symbol_table: :symbol_table,
task_supervisor: NextLS.TaskSupervisor,
runtime_task_supervisor: :runtime_task_supervisor,
dynamic_supervisor: NextLS.DynamicSupervisor,
extension_registry: NextLS.ExtensionRegistry}
registry: NextLS.Registry}
]

Supervisor.init(children, strategy: :one_for_one)
Expand Down
Loading
Loading