-
Notifications
You must be signed in to change notification settings - Fork 183
/
sessions.py
1345 lines (1149 loc) · 54.9 KB
/
sessions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from .collections import DottedDict
from .edit import apply_workspace_edit
from .edit import parse_workspace_edit
from .logging import debug
from .logging import exception_log
from .open import open_externally
from .open import open_file_and_center_async
from .promise import PackagedTask
from .promise import Promise
from .protocol import CodeAction
from .protocol import Command
from .protocol import CompletionItemTag
from .protocol import Error
from .protocol import ErrorCode
from .protocol import ExecuteCommandParams
from .protocol import Notification
from .protocol import Request
from .protocol import Response
from .protocol import WorkspaceFolder
from .settings import client_configs
from .transports import Transport
from .transports import TransportCallbacks
from .types import Capabilities
from .types import ClientConfig
from .types import ClientStates
from .types import debounced
from .types import diff
from .types import DocumentSelector
from .types import method_to_capability
from .types import SettingsRegistration
from .typing import Callable, cast, Dict, Any, Optional, List, Tuple, Generator, Type, Protocol, Mapping, Union
from .url import uri_to_filename
from .version import __version__
from .views import COMPLETION_KINDS
from .views import extract_variables
from .views import get_storage_path
from .views import SYMBOL_KINDS
from .workspace import is_subpath_of
from abc import ABCMeta
from abc import abstractmethod
from weakref import WeakSet
import functools
import os
import sublime
import weakref
InitCallback = Callable[['Session', bool], None]
class Manager(metaclass=ABCMeta):
"""
A Manager is a container of Sessions.
"""
# Observers
@abstractmethod
def window(self) -> sublime.Window:
"""
Get the window associated with this manager.
"""
pass
@abstractmethod
def sessions(self, view: sublime.View, capability: Optional[str] = None) -> 'Generator[Session, None, None]':
"""
Iterate over the sessions stored in this manager, applicable to the given view, with the given capability.
"""
pass
@abstractmethod
def get_project_path(self, file_path: str) -> Optional[str]:
"""
Get the project path for the given file.
"""
pass
# Mutators
@abstractmethod
def start_async(self, configuration: ClientConfig, initiating_view: sublime.View) -> None:
"""
Start a new Session with the given configuration. The initiating view is the view that caused this method to
be called.
A normal flow of calls would be start -> on_post_initialize -> do language server things -> on_post_exit.
However, it is possible that the subprocess cannot start, in which case on_post_initialize will never be called.
"""
pass
@abstractmethod
def update_diagnostics_panel_async(self) -> None:
pass
@abstractmethod
def show_diagnostics_panel_async(self) -> None:
pass
@abstractmethod
def hide_diagnostics_panel_async(self) -> None:
pass
# Event callbacks
@abstractmethod
def on_post_exit_async(self, session: 'Session', exit_code: int, exception: Optional[Exception]) -> None:
"""
The given Session has stopped with the given exit code.
"""
pass
def get_initialize_params(variables: Dict[str, str], workspace_folders: List[WorkspaceFolder],
config: ClientConfig) -> dict:
completion_kinds = list(range(1, len(COMPLETION_KINDS) + 1))
symbol_kinds = list(range(1, len(SYMBOL_KINDS) + 1))
completion_tag_value_set = [v for k, v in CompletionItemTag.__dict__.items() if not k.startswith('_')]
first_folder = workspace_folders[0] if workspace_folders else None
capabilities = {
"general": {
"regularExpressions": {
"engine": "ECMAScript"
}
},
"textDocument": {
"synchronization": {
"dynamicRegistration": True, # exceptional
"didSave": True,
"willSave": True,
"willSaveWaitUntil": True
},
"hover": {
"dynamicRegistration": True,
"contentFormat": ["markdown", "plaintext"]
},
"completion": {
"dynamicRegistration": True,
"completionItem": {
"snippetSupport": True,
"deprecatedSupport": True,
"documentationFormat": ["markdown", "plaintext"],
"tagSupport": {
"valueSet": completion_tag_value_set
}
},
"completionItemKind": {
"valueSet": completion_kinds
}
},
"signatureHelp": {
"dynamicRegistration": True,
"signatureInformation": {
"documentationFormat": ["markdown", "plaintext"],
"parameterInformation": {
"labelOffsetSupport": True
}
}
},
"references": {
"dynamicRegistration": True
},
"documentHighlight": {
"dynamicRegistration": True
},
"documentSymbol": {
"dynamicRegistration": True,
"hierarchicalDocumentSymbolSupport": True,
"symbolKind": {
"valueSet": symbol_kinds
}
},
"formatting": {
"dynamicRegistration": True # exceptional
},
"rangeFormatting": {
"dynamicRegistration": True
},
"declaration": {
"dynamicRegistration": True,
"linkSupport": True
},
"definition": {
"dynamicRegistration": True,
"linkSupport": True
},
"typeDefinition": {
"dynamicRegistration": True,
"linkSupport": True
},
"implementation": {
"dynamicRegistration": True,
"linkSupport": True
},
"codeAction": {
"dynamicRegistration": True,
"codeActionLiteralSupport": {
"codeActionKind": {
"valueSet": [
"quickfix",
"refactor",
"refactor.extract",
"refactor.inline",
"refactor.rewrite",
"source.organizeImports"
]
}
},
"dataSupport": True,
"resolveSupport": {
"properties": [
"edit",
"command"
]
}
},
"rename": {
"dynamicRegistration": True,
"prepareSupport": True
},
"colorProvider": {
"dynamicRegistration": True # exceptional
},
"publishDiagnostics": {
"relatedInformation": True
},
"selectionRange": {
"dynamicRegistration": True
}
},
"workspace": {
"applyEdit": True,
"didChangeConfiguration": {
"dynamicRegistration": True
},
"executeCommand": {},
"workspaceEdit": {
"documentChanges": True,
"failureHandling": "abort",
},
"workspaceFolders": True,
"symbol": {
"dynamicRegistration": True, # exceptional
"symbolKind": {
"valueSet": symbol_kinds
}
},
"configuration": True
},
"window": {
"showDocument": {
"support": True
},
"showMessage": {
"messageActionItem": {
"additionalPropertiesSupport": True
}
},
"workDoneProgress": True
}
}
if config.experimental_capabilities is not None:
capabilities['experimental'] = config.experimental_capabilities
return {
"processId": os.getpid(),
"clientInfo": {
"name": "Sublime Text LSP",
"version": ".".join(map(str, __version__))
},
"rootUri": first_folder.uri() if first_folder else None,
"rootPath": first_folder.path if first_folder else None,
"workspaceFolders": [folder.to_lsp() for folder in workspace_folders] if workspace_folders else None,
"capabilities": capabilities,
"initializationOptions": config.init_options.get_resolved(variables)
}
class SessionViewProtocol(Protocol):
session = None # type: Session
view = None # type: sublime.View
listener = None # type: Any
session_buffer = None # type: Any
def on_capability_added_async(self, registration_id: str, capability_path: str, options: Dict[str, Any]) -> None:
...
def on_capability_removed_async(self, registration_id: str, discarded_capabilities: Dict[str, Any]) -> None:
...
def has_capability_async(self, capability_path: str) -> bool:
...
def shutdown_async(self) -> None:
...
def present_diagnostics_async(self, flags: int) -> None:
...
def on_request_started_async(self, request_id: int, request: Request) -> None:
...
def on_request_finished_async(self, request_id: int) -> None:
...
class SessionBufferProtocol(Protocol):
session = None # type: Session
session_views = None # type: WeakSet[SessionViewProtocol]
file_name = None # type: str
language_id = None # type: str
def register_capability_async(
self,
registration_id: str,
capability_path: str,
registration_path: str,
options: Dict[str, Any]
) -> None:
...
def unregister_capability_async(
self,
registration_id: str,
capability_path: str,
registration_path: str
) -> None:
...
def on_diagnostics_async(self, diagnostics: List[Dict[str, Any]], version: Optional[int]) -> None:
...
class AbstractPlugin(metaclass=ABCMeta):
"""
Inherit from this class to handle non-standard requests and notifications.
Given a request/notification, replace the non-alphabetic characters with an underscore, and prepend it with "m_".
This will be the name of your method.
For instance, to implement the non-standard eslint/openDoc request, define the Python method
def m_eslint_openDoc(self, params, request_id):
session = self.weaksession()
if session:
webbrowser.open_tab(params['url'])
session.send_response(Response(request_id, None))
To handle the non-standard eslint/status notification, define the Python method
def m_eslint_status(self, params):
pass
To understand how this works, see the __getattr__ method of the Session class.
"""
@classmethod
@abstractmethod
def name(cls) -> str:
"""
A human-friendly name. If your plugin is called "LSP-foobar", then this should return "foobar". If you also
have your settings file called "LSP-foobar.sublime-settings", then you don't even need to re-implement the
configuration method (see below).
"""
raise NotImplementedError()
@classmethod
def configuration(cls) -> Tuple[sublime.Settings, str]:
"""
Return the Settings object that defines the "command", "languages", and optionally the "initializationOptions",
"default_settings", "env" and "tcp_port" as the first element in the tuple, and the path to the base settings
filename as the second element in the tuple.
The second element in the tuple is used to handle "settings" overrides from users properly. For example, if your
plugin is called LSP-foobar, you would return "Packages/LSP-foobar/LSP-foobar.sublime-settings".
The "command", "initializationOptions" and "env" are subject to template string substitution. The following
template strings are recognized:
$file
$file_base_name
$file_extension
$file_name
$file_path
$platform
$project
$project_base_name
$project_extension
$project_name
$project_path
These are just the values from window.extract_variables(). Additionally,
$storage_path The path to the package storage (see AbstractPlugin.storage_path)
$cache_path sublime.cache_path()
$temp_dir tempfile.gettempdir()
$home os.path.expanduser('~')
$port A random free TCP-port on localhost in case "tcp_port" is set to 0. This string template can only
be used in the "command"
The "command" and "env" are expanded upon starting the subprocess of the Session. The "initializationOptions"
are expanded upon doing the initialize request. "initializationOptions" does not expand $port.
When you're managing your own server binary, you would typically place it in sublime.cache_path(). So your
"command" should look like this: "command": ["$cache_path/LSP-foobar/server_binary", "--stdio"]
"""
name = cls.name()
basename = "LSP-{}.sublime-settings".format(name)
filepath = "Packages/LSP-{}/{}".format(name, basename)
return sublime.load_settings(basename), filepath
@classmethod
def additional_variables(cls) -> Optional[Dict[str, str]]:
"""
In addition to the above variables, add more variables here to be expanded.
"""
return None
@classmethod
def storage_path(cls) -> str:
"""
The storage path. Use this as your base directory to install server files. Its path is '$DATA/Package Storage'.
You should have an additional subdirectory preferrably the same name as your plugin. For instance:
```python
from LSP.plugin import AbstractPlugin
import os
class MyPlugin(AbstractPlugin):
@classmethod
def name(cls) -> str:
return "my-plugin"
@classmethod
def basedir(cls) -> str:
# Do everything relative to this directory
return os.path.join(cls.storage_path(), cls.name())
```
"""
return get_storage_path()
@classmethod
def needs_update_or_installation(cls) -> bool:
"""
If this plugin manages its own server binary, then this is the place to check whether the binary needs
an update, or whether it needs to be installed before starting the language server.
"""
return False
@classmethod
def install_or_update(cls) -> None:
"""
Do the actual update/installation of the server binary. This runs in a separate thread, so don't spawn threads
yourself here.
"""
pass
@classmethod
def can_start(cls, window: sublime.Window, initiating_view: sublime.View,
workspace_folders: List[WorkspaceFolder], configuration: ClientConfig) -> Optional[str]:
"""
Determines ability to start. This is called after needs_update_or_installation and after install_or_update.
So you may assume that if you're managing your server binary, then it is already installed when this
classmethod is called.
:param window: The window
:param initiating_view: The initiating view
:param workspace_folders: The workspace folders
:param configuration: The configuration
:returns: A string describing the reason why we should not start a language server session, or None if we
should go ahead and start a session.
"""
return None
@classmethod
def on_pre_start(cls, window: sublime.Window, initiating_view: sublime.View,
workspace_folders: List[WorkspaceFolder], configuration: ClientConfig) -> Optional[str]:
"""
Callback invoked just before the language server subprocess is started. This is the place to do last-minute
adjustments to your "command" or "init_options" in the passed-in "configuration" argument, or change the
order of the workspace folders. You can also choose to return a custom working directory, but consider that a
language server should not care about the working directory.
:param window: The window
:param initiating_view: The initiating view
:param workspace_folders: The workspace folders, you can modify these
:param configuration: The configuration, you can modify this one
:returns: A desired working directory, or None if you don't care
"""
return None
@classmethod
def on_post_start(cls, window: sublime.Window, initiating_view: sublime.View,
workspace_folders: List[WorkspaceFolder], configuration: ClientConfig) -> None:
"""
Callback invoked when the subprocess was just started.
:param window: The window
:param initiating_view: The initiating view
:param workspace_folders: The workspace folders
:param configuration: The configuration
"""
pass
def __init__(self, weaksession: 'weakref.ref[Session]') -> None:
"""
Constructs a new instance. Your instance is constructed after a response to the initialize request.
:param weaksession: A weak reference to the Session. You can grab a strong reference through
self.weaksession(), but don't hold on to that reference.
"""
self.weaksession = weaksession
def on_settings_changed(self, settings: DottedDict) -> None:
"""
Override this method to alter the settings that are returned to the server for the
workspace/didChangeConfiguration notification and the workspace/configuration requests.
:param settings: The settings that the server should receive.
"""
pass
def on_workspace_configuration(self, params: Dict, configuration: Any) -> None:
"""
Override to augment configuration returned for the workspace/configuration request.
:param params: A ConfigurationItem for which configuration is requested.
:param configuration: The resolved configuration for given params.
"""
pass
def on_pre_server_command(self, command: Mapping[str, Any], done_callback: Callable[[], None]) -> bool:
"""
Intercept a command that is about to be sent to the language server.
:param command: The payload containing a "command" and optionally "arguments".
:param done_callback: The callback that you promise to invoke when you return true.
:returns: True if *YOU* will handle this command plugin-side, false otherwise. You must invoke the
passed `done_callback` when you're done.
"""
return False
_plugins = {} # type: Dict[str, Tuple[Type[AbstractPlugin], SettingsRegistration]]
def _register_plugin_impl(plugin: Type[AbstractPlugin], notify_listener: bool) -> None:
global _plugins
name = plugin.name()
if name in _plugins:
return
try:
settings, base_file = plugin.configuration()
if client_configs.add_external_config(name, settings, base_file, notify_listener):
on_change = functools.partial(client_configs.update_external_config, name, settings, base_file)
_plugins[name] = (plugin, SettingsRegistration(settings, on_change))
except Exception as ex:
exception_log('Failed to register plugin "{}"'.format(name), ex)
def register_plugin(plugin: Type[AbstractPlugin], notify_listener: bool = True) -> None:
"""
Register an LSP plugin in LSP.
You should put a call to this function in your `plugin_loaded` callback. This way, when your package is disabled
by a user and then re-enabled again by a user, the changes in state are picked up by LSP, and your language server
will start for the relevant views.
While your helper package may still work without calling `register_plugin` in `plugin_loaded`, the user will have a
better experience when you do call this function.
Your implementation should look something like this:
```python
from LSP.plugin import register_plugin
from LSP.plugin import unregister_plugin
from LSP.plugin import AbstractPlugin
class MyPlugin(AbstractPlugin):
...
def plugin_loaded():
register_plugin(MyPlugin)
def plugin_unloaded():
unregister_plugin(MyPlugin)
```
If you need to install supplementary files (e.g. javascript source code that implements the actual server), do so
in `AbstractPlugin.install_or_update` in a blocking manner, without the use of Python's `threading` module.
"""
if notify_listener:
# There is a bug in Sublime Text's `plugin_loaded` callback. When the package is in the list of
# `"ignored_packages"` in Packages/User/Preferences.sublime-settings, and then removed from that list, the
# sublime.Settings object has missing keys/values. To circumvent this, we run the actual registration one tick
# later. At that point, the settings object is fully loaded. At least, it seems that way. For more context,
# see https://github.com/sublimehq/sublime_text/issues/3379
# and https://github.com/sublimehq/sublime_text/issues/2099
sublime.set_timeout(lambda: _register_plugin_impl(plugin, notify_listener))
else:
_register_plugin_impl(plugin, notify_listener)
def unregister_plugin(plugin: Type[AbstractPlugin]) -> None:
"""
Unregister an LSP plugin in LSP.
You should put a call to this function in your `plugin_unloaded` callback. this way, when your package is disabled
by a user, your language server is shut down for the views that it is attached to. This results in a good user
experience.
"""
global _plugins
name = plugin.name()
try:
_plugins.pop(name, None)
client_configs.remove_external_config(name)
except Exception as ex:
exception_log('Failed to unregister plugin "{}"'.format(name), ex)
def get_plugin(name: str) -> Optional[Type[AbstractPlugin]]:
global _plugins
tup = _plugins.get(name, None)
return tup[0] if tup else None
class Logger(metaclass=ABCMeta):
@abstractmethod
def stderr_message(self, message: str) -> None:
pass
@abstractmethod
def outgoing_response(self, request_id: Any, params: Any) -> None:
pass
@abstractmethod
def outgoing_error_response(self, request_id: Any, error: Error) -> None:
pass
@abstractmethod
def outgoing_request(self, request_id: int, method: str, params: Any) -> None:
pass
@abstractmethod
def outgoing_notification(self, method: str, params: Any) -> None:
pass
@abstractmethod
def incoming_response(self, request_id: int, params: Any, is_error: bool) -> None:
pass
@abstractmethod
def incoming_request(self, request_id: Any, method: str, params: Any) -> None:
pass
@abstractmethod
def incoming_notification(self, method: str, params: Any, unhandled: bool) -> None:
pass
def print_to_status_bar(error: Dict[str, Any]) -> None:
sublime.status_message(error["message"])
def method2attr(method: str) -> str:
# window/messageRequest -> m_window_messageRequest
# $/progress -> m___progress
# client/registerCapability -> m_client_registerCapability
return 'm_' + ''.join(map(lambda c: c if c.isalpha() else '_', method))
class _RegistrationData:
__slots__ = ("registration_id", "capability_path", "registration_path", "options", "session_buffers", "selector")
def __init__(
self,
registration_id: str,
capability_path: str,
registration_path: str,
options: Dict[str, Any]
) -> None:
self.registration_id = registration_id
self.registration_path = registration_path
self.capability_path = capability_path
document_selector = options.pop("documentSelector", None)
if not isinstance(document_selector, list):
document_selector = []
self.selector = DocumentSelector(document_selector)
self.options = options
self.session_buffers = WeakSet() # type: WeakSet[SessionBufferProtocol]
def __del__(self) -> None:
for sb in self.session_buffers:
sb.unregister_capability_async(self.registration_id, self.capability_path, self.registration_path)
def check_applicable(self, sb: SessionBufferProtocol) -> None:
for sv in sb.session_views:
if self.selector.matches(sv.view):
self.session_buffers.add(sb)
sb.register_capability_async(
self.registration_id, self.capability_path, self.registration_path, self.options)
return
class Session(TransportCallbacks):
def __init__(self, manager: Manager, logger: Logger, workspace_folders: List[WorkspaceFolder],
config: ClientConfig, plugin_class: Optional[Type[AbstractPlugin]]) -> None:
self.transport = None # type: Optional[Transport]
self.request_id = 0 # Our request IDs are always integers.
self._logger = logger
self._response_handlers = {} # type: Dict[int, Tuple[Request, Callable, Optional[Callable[[Any], None]]]]
self.config = config
self.manager = weakref.ref(manager)
self.window = manager.window()
self.state = ClientStates.STARTING
self.capabilities = Capabilities()
self.exiting = False
self._registrations = {} # type: Dict[str, _RegistrationData]
self._init_callback = None # type: Optional[InitCallback]
self._initialize_error = None # type: Optional[Tuple[int, Optional[Exception]]]
self._views_opened = 0
self._workspace_folders = workspace_folders
self._session_views = WeakSet() # type: WeakSet[SessionViewProtocol]
self._session_buffers = WeakSet() # type: WeakSet[SessionBufferProtocol]
self._progress = {} # type: Dict[Any, Dict[str, str]]
self._plugin_class = plugin_class
self._plugin = None # type: Optional[AbstractPlugin]
def __del__(self) -> None:
debug(self.config.command, "ended")
for token in self._progress.keys():
key = self._progress_status_key(token)
for sv in self.session_views_async():
if sv.view.is_valid():
sv.view.erase_status(key)
def __getattr__(self, name: str) -> Any:
"""
If we don't have a request/notification handler, look up the request/notification handler in the plugin.
"""
if name.startswith('m_'):
attr = getattr(self._plugin, name)
if attr is not None:
return attr
raise AttributeError(name)
# TODO: Create an assurance that the API doesn't change here as it can be used by plugins.
def get_workspace_folders(self) -> List[WorkspaceFolder]:
return self._workspace_folders
# --- session view management --------------------------------------------------------------------------------------
def register_session_view_async(self, sv: SessionViewProtocol) -> None:
self._session_views.add(sv)
self._views_opened += 1
def unregister_session_view_async(self, sv: SessionViewProtocol) -> None:
self._session_views.discard(sv)
if not self._session_views:
current_count = self._views_opened
debounced(self.end_async, 3000, lambda: self._views_opened == current_count, async_thread=True)
def session_views_async(self) -> Generator[SessionViewProtocol, None, None]:
"""
It is only safe to iterate over this in the async thread
"""
yield from self._session_views
def session_view_for_view_async(self, view: sublime.View) -> Optional[SessionViewProtocol]:
for sv in self.session_views_async():
if sv.view == view:
return sv
return None
def set_window_status_async(self, key: str, message: str) -> None:
for sv in self.session_views_async():
sv.view.set_status(key, message)
def erase_window_status_async(self, key: str) -> None:
for sv in self.session_views_async():
sv.view.erase_status(key)
# --- session buffer management ------------------------------------------------------------------------------------
def register_session_buffer_async(self, sb: SessionBufferProtocol) -> None:
self._session_buffers.add(sb)
for data in self._registrations.values():
data.check_applicable(sb)
def unregister_session_buffer_async(self, sb: SessionBufferProtocol) -> None:
self._session_buffers.discard(sb)
def session_buffers_async(self) -> Generator[SessionBufferProtocol, None, None]:
"""
It is only safe to iterate over this in the async thread
"""
yield from self._session_buffers
def get_session_buffer_for_uri_async(self, uri: str) -> Optional[SessionBufferProtocol]:
file_name = uri_to_filename(uri)
for sb in self.session_buffers_async():
try:
if sb.file_name == file_name or os.path.samefile(file_name, sb.file_name):
return sb
except FileNotFoundError:
pass
return None
# --- capability observers -----------------------------------------------------------------------------------------
def can_handle(self, view: sublime.View, capability: Optional[str], inside_workspace: bool) -> bool:
file_name = view.file_name() or ''
if (self.config.match_view(view)
and self.state == ClientStates.READY
and self.handles_path(file_name, inside_workspace)):
# If there's no capability requirement then this session can handle the view
if capability is None:
return True
sv = self.session_view_for_view_async(view)
if sv:
return sv.has_capability_async(capability)
else:
return self.has_capability(capability)
return False
def has_capability(self, capability: str) -> bool:
value = self.get_capability(capability)
return value is not False and value is not None
def get_capability(self, capability: str) -> Optional[Any]:
return self.capabilities.get(capability)
def should_notify_did_open(self) -> bool:
return self.capabilities.should_notify_did_open()
def text_sync_kind(self) -> int:
return self.capabilities.text_sync_kind()
def should_notify_did_change(self) -> bool:
return self.capabilities.should_notify_did_change()
def should_notify_did_change_workspace_folders(self) -> bool:
return self.capabilities.should_notify_did_change_workspace_folders()
def should_notify_will_save(self) -> bool:
return self.capabilities.should_notify_will_save()
def should_notify_did_save(self) -> Tuple[bool, bool]:
return self.capabilities.should_notify_did_save()
def should_notify_did_close(self) -> bool:
return self.capabilities.should_notify_did_close()
# --- misc methods -------------------------------------------------------------------------------------------------
def handles_path(self, file_path: Optional[str], inside_workspace: bool) -> bool:
if self._supports_workspace_folders():
# A workspace-aware language server handles any path, both inside and outside the workspaces.
return True
# If we end up here then the language server is workspace-unaware. This means there can be more than one
# language server with the same config name. So we have to actually do the subpath checks.
if not file_path:
return False
if not self._workspace_folders or not inside_workspace:
return True
for folder in self._workspace_folders:
if is_subpath_of(file_path, folder.path):
return True
return False
def update_folders(self, folders: List[WorkspaceFolder]) -> None:
if self.should_notify_did_change_workspace_folders():
added, removed = diff(self._workspace_folders, folders)
if added or removed:
params = {
"event": {
"added": [a.to_lsp() for a in added],
"removed": [r.to_lsp() for r in removed]
}
}
self.send_notification(Notification.didChangeWorkspaceFolders(params))
if self._supports_workspace_folders():
self._workspace_folders = folders
else:
self._workspace_folders = folders[:1]
def initialize_async(self, variables: Dict[str, str], transport: Transport, init_callback: InitCallback) -> None:
self.transport = transport
params = get_initialize_params(variables, self._workspace_folders, self.config)
self._init_callback = init_callback
self.send_request_async(
Request.initialize(params), self._handle_initialize_success, self._handle_initialize_error)
def _handle_initialize_success(self, result: Any) -> None:
self.capabilities.assign(result.get('capabilities', dict()))
if self._workspace_folders and not self._supports_workspace_folders():
self._workspace_folders = self._workspace_folders[:1]
self.state = ClientStates.READY
if self._plugin_class is not None:
self._plugin = self._plugin_class(weakref.ref(self))
self.send_notification(Notification.initialized())
self._maybe_send_did_change_configuration()
execute_commands = self.get_capability('executeCommandProvider.commands')
if execute_commands:
debug("{}: Supported execute commands: {}".format(self.config.name, execute_commands))
code_action_kinds = self.get_capability('codeActionProvider.codeActionKinds')
if code_action_kinds:
debug('{}: supported code action kinds: {}'.format(self.config.name, code_action_kinds))
if self._init_callback:
self._init_callback(self, False)
self._init_callback = None
def _handle_initialize_error(self, result: Any) -> None:
self._initialize_error = (result.get('code', -1), Exception(result.get('message', 'Error initializing server')))
# Init callback called after transport is closed to avoid pre-mature GC of Session.
self.end_async()
def call_manager(self, method: str, *args: Any) -> None:
mgr = self.manager()
if mgr:
getattr(mgr, method)(*args)
def clear_diagnostics_async(self) -> None:
# XXX: Remove this functionality?
for sb in self.session_buffers_async():
sb.on_diagnostics_async([], None)
def on_stderr_message(self, message: str) -> None:
self.call_manager('handle_stderr_log', self, message)
self._logger.stderr_message(message)
def _supports_workspace_folders(self) -> bool:
return self.has_capability("workspace.workspaceFolders.supported")
def _maybe_send_did_change_configuration(self) -> None:
if self.config.settings:
if self._plugin:
self._plugin.on_settings_changed(self.config.settings)
variables = self._template_variables()
resolved = self.config.settings.get_resolved(variables)
self.send_notification(Notification("workspace/didChangeConfiguration", {"settings": resolved}))
def _template_variables(self) -> Dict[str, str]:
variables = extract_variables(self.window)
if self._plugin_class is not None:
extra_vars = self._plugin_class.additional_variables()
if extra_vars:
variables.update(extra_vars)
return variables
def execute_command(self, command: ExecuteCommandParams) -> Promise:
"""Run a command from any thread. Your .then() continuations will run in Sublime's worker thread."""
if self._plugin:
task = Promise.packaged_task() # type: PackagedTask[None]
promise, resolve = task
if self._plugin.on_pre_server_command(command, resolve):
return promise
# TODO: Our Promise class should be able to handle errors/exceptions
return Promise(
lambda resolve: self.send_request(
Request.executeCommand(command),
resolve,
lambda err: resolve(Error(err["code"], err["message"], err.get("data")))
)
)
def run_code_action_async(self, code_action: Union[Command, CodeAction]) -> Promise:
command = code_action.get("command")
if isinstance(command, str):
code_action = cast(Command, code_action)
# This is actually a command.
command_params = {'command': command} # type: ExecuteCommandParams
arguments = code_action.get('arguments', None)
if isinstance(arguments, list):
command_params['arguments'] = arguments
return self.execute_command(command_params)
# At this point it cannot be a command anymore, it has to be a proper code action.
# A code action can have an edit and/or command. Note that it can have *both*. In case both are present, we
# must apply the edits before running the command.
code_action = cast(CodeAction, code_action)
return self._maybe_resolve_code_action(code_action).then(self._apply_code_action_async)
def _maybe_resolve_code_action(self, code_action: CodeAction) -> Promise[Union[CodeAction, Error]]:
if self.has_capability("codeActionProvider.resolveSupport"):
# TODO: Should we accept a SessionBuffer? What if this capability is registered with a documentSelector?
# We must first resolve the command and edit properties, because they can potentially be absent.
request = Request("codeAction/resolve", code_action)
return self.send_request_task(request)
return Promise.resolve(code_action)
def _apply_code_action_async(self, code_action: Union[CodeAction, Error, None]) -> Promise: