Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Media opening (slight reorder & tidy of #1058) #1223

Merged
134 changes: 134 additions & 0 deletions tests/helper/test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,20 @@
canonicalize_color,
classify_unread_counts,
display_error_if_present,
download_media,
get_unused_fence,
hash_util_decode,
index_messages,
notify_if_message_sent_outside_narrow,
open_media,
powerset,
process_media,
)


MODULE = "zulipterminal.helper"
MODEL = "zulipterminal.model.Model"
SERVER_URL = "https://chat.zulip.org"


def test_index_messages_narrow_all_messages(
Expand Down Expand Up @@ -481,3 +485,133 @@ def test_get_unused_fence(message_content: str, expected_fence: str) -> None:
generated_fence = get_unused_fence(message_content)

assert generated_fence == expected_fence


def test_download_media(
mocker: MockerFixture,
media_path: str = "/tmp/zt-somerandomtext-image.png",
url: str = SERVER_URL + "/user_uploads/path/image.png",
) -> None:
mocker.patch(MODULE + ".requests")
mocker.patch(MODULE + ".open")
callback = mocker.patch("zulipterminal.ui.View.set_footer_text")
(
mocker.patch(
MODULE + ".NamedTemporaryFile"
).return_value.__enter__.return_value.name
) = media_path
controller = mocker.Mock()

assert media_path == download_media(controller, url, callback)


@pytest.mark.parametrize(
"platform, download_media_called, show_media_called, tool, modified_media_path",
[
("Linux", True, True, "xdg-open", "/path/to/media"),
("MacOS", True, True, "open", "/path/to/media"),
("WSL", True, True, "explorer.exe", "\\path\\to\\media"),
("UnknownOS", True, False, "unknown-tool", "/path/to/media"),
],
ids=[
"Linux_os_user",
"Mac_os_user",
"WSL_os_user",
"Unsupported_os_user",
],
)
def test_process_media(
mocker: MockerFixture,
platform: str,
download_media_called: bool,
show_media_called: bool,
tool: str,
modified_media_path: str,
media_path: str = "/path/to/media",
link: str = "/url/of/media",
) -> None:
controller = mocker.Mock()
mocked_download_media = mocker.patch(
MODULE + ".download_media", return_value=media_path
)
mocked_open_media = mocker.patch(MODULE + ".open_media")
mocker.patch(MODULE + ".PLATFORM", platform)
mocker.patch("zulipterminal.core.Controller.show_media_confirmation_popup")

process_media(controller, link)

assert mocked_download_media.called == download_media_called
assert controller.show_media_confirmation_popup.called == show_media_called
if show_media_called:
controller.show_media_confirmation_popup.assert_called_once_with(
mocked_open_media, tool, modified_media_path
)


def test_process_media_empty_url(
mocker: MockerFixture,
link: str = "",
) -> None:
controller = mocker.Mock()
mocker.patch("zulipterminal.core.Controller.report_error")
mocked_download_media = mocker.patch(MODULE + ".download_media")
mocker.patch("zulipterminal.core.Controller.show_media_confirmation_popup")

process_media(controller, link)

mocked_download_media.assert_not_called()
controller.show_media_confirmation_popup.assert_not_called()
controller.report_error.assert_called_once_with("The media link is empty")


@pytest.mark.parametrize(
"returncode, error",
[
(0, []),
(
1,
[
" The tool ",
("footer_contrast", "xdg-open"),
" did not run successfully" ". Exited with ",
("footer_contrast", "1"),
],
),
],
)
def test_open_media(
mocker: MockerFixture,
returncode: int,
error: List[Any],
tool: str = "xdg-open",
media_path: str = "/tmp/zt-somerandomtext-image.png",
) -> None:
mocked_run = mocker.patch(MODULE + ".subprocess.run")
mocked_run.return_value.returncode = returncode
controller = mocker.Mock()

open_media(controller, tool, media_path)

assert mocked_run.called
if error:
controller.report_error.assert_called_once_with(error)
else:
controller.report_error.assert_not_called()


def test_open_media_tool_exception(
mocker: MockerFixture,
media_path: str = "/tmp/zt-somerandomtext-image.png",
tool: str = "unsupported-tool",
error: List[Any] = [
" The tool ",
("footer_contrast", "unsupported-tool"),
" could not be found",
],
) -> None:
mocker.patch(MODULE + ".subprocess.run", side_effect=FileNotFoundError())
controller = mocker.Mock()

open_media(controller, tool, media_path)

controller.report_error.assert_called_once_with(error)
43 changes: 42 additions & 1 deletion tests/platform_code/test_platform_code.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import pytest
from pytest_mock import MockerFixture

from zulipterminal.platform_code import AllPlatforms, SupportedPlatforms, notify
from zulipterminal.platform_code import (
AllPlatforms,
SupportedPlatforms,
normalized_file_path,
notify,
successful_GUI_return_code,
)


MODULE = "zulipterminal.platform_code"
Expand Down Expand Up @@ -67,3 +73,38 @@ def test_notify_quotes(
assert len(params[0][0][0]) == cmd_length

# NOTE: If there is a quoting error, we may get a ValueError too


@pytest.mark.parametrize(
"PLATFORM, expected_return_code",
[
("Linux", 0),
("MacOS", 0),
("WSL", 1),
],
)
def test_successful_GUI_return_code(
mocker: MockerFixture,
PLATFORM: SupportedPlatforms,
expected_return_code: int,
) -> None:
mocker.patch(MODULE + ".PLATFORM", PLATFORM)
assert successful_GUI_return_code() == expected_return_code


@pytest.mark.parametrize(
"PLATFORM, expected_path",
[
("Linux", "/path/to/file"),
("MacOS", "/path/to/file"),
("WSL", "\\path\\to\\file"),
],
)
def test_normalized_file_path(
mocker: MockerFixture,
PLATFORM: SupportedPlatforms,
expected_path: str,
path: str = "/path/to/file",
) -> None:
mocker.patch(MODULE + ".PLATFORM", PLATFORM)
assert normalized_file_path(path) == expected_path
21 changes: 16 additions & 5 deletions tests/ui_tools/test_buttons.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,11 +493,15 @@ def test_update_widget(
assert isinstance(mocked_button._w, AttrMap)

@pytest.mark.parametrize(
"link, handle_narrow_link_called",
[
(SERVER_URL + "/#narrow/stream/1-Stream-1", True),
(SERVER_URL + "/user_uploads/some/path/image.png", False),
("https://foo.com", False),
"link",
"handle_narrow_link_called",
"process_media_called",
],
[
(SERVER_URL + "/#narrow/stream/1-Stream-1", True, False),
(SERVER_URL + "/user_uploads/some/path/image.png", False, True),
("https://foo.com", False, False),
],
ids=[
"internal_narrow_link",
Expand All @@ -506,15 +510,22 @@ def test_update_widget(
],
)
def test_handle_link(
self, mocker: MockerFixture, link: str, handle_narrow_link_called: bool
self,
mocker: MockerFixture,
link: str,
handle_narrow_link_called: bool,
process_media_called: bool,
) -> None:
self.controller.model.server_url = SERVER_URL
self.handle_narrow_link = mocker.patch(MSGLINKBUTTON + ".handle_narrow_link")
self.controller.loop.widget = mocker.Mock(spec=Overlay)
self.process_media = mocker.patch(MODULE + ".process_media")
mocked_button = self.message_link_button(link=link)

mocked_button.handle_link()

assert self.handle_narrow_link.called == handle_narrow_link_called
assert self.process_media.called == process_media_called

@pytest.mark.parametrize(
"stream_data, expected_response",
Expand Down
17 changes: 17 additions & 0 deletions zulipterminal/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,23 @@ def report_warning(
"""
self.view.set_footer_text(text, "task:warning", duration)

def show_media_confirmation_popup(
self, func: Any, tool: str, media_path: str
) -> None:
callback = partial(func, self, tool, media_path)
question = urwid.Text(
[
"Your requested media has been downloaded to:\n",
("bold", media_path),
"\n\nDo you want the application to open it with ",
("bold", tool),
"?",
]
)
self.loop.widget = PopUpConfirmationView(
self, question, callback, location="center"
)

def search_messages(self, text: str) -> None:
# Search for a text in messages
self.model.index["search"].clear()
Expand Down
95 changes: 94 additions & 1 deletion zulipterminal/helper.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
import subprocess
import time
from collections import OrderedDict, defaultdict
from contextlib import contextmanager
from functools import wraps
from functools import partial, wraps
from itertools import chain, combinations
from re import ASCII, MULTILINE, findall, match
from tempfile import NamedTemporaryFile
from threading import Thread
from typing import (
Any,
Expand All @@ -23,6 +25,7 @@
)
from urllib.parse import unquote

import requests
from typing_extensions import TypedDict

from zulipterminal.api_types import Composition, EmojiType, Message
Expand All @@ -33,6 +36,11 @@
REGEX_QUOTED_FENCE_LENGTH,
)
from zulipterminal.config.ui_mappings import StreamAccessType
from zulipterminal.platform_code import (
PLATFORM,
normalized_file_path,
successful_GUI_return_code,
)


class StreamData(TypedDict):
Expand Down Expand Up @@ -723,3 +731,88 @@ def suppress_output() -> Iterator[None]:
finally:
os.dup2(stdout, 1)
os.dup2(stderr, 2)


@asynch
def process_media(controller: Any, link: str) -> None:
"""
Helper to process media links.
"""
if not link:
controller.report_error("The media link is empty")
return

show_download_status = partial(
controller.view.set_footer_text, "Downloading your media..."
)
media_path = download_media(controller, link, show_download_status)
tool = ""

# TODO: Add support for other platforms as well.
if PLATFORM == "WSL":
tool = "explorer.exe"
# Modifying path to backward slashes instead of forward slashes
media_path = media_path.replace("/", "\\")
elif PLATFORM == "Linux":
tool = "xdg-open"
elif PLATFORM == "MacOS":
tool = "open"
else:
controller.report_error("Media not supported for this platform")
return

controller.show_media_confirmation_popup(open_media, tool, media_path)


def download_media(
controller: Any, url: str, show_download_status: Callable[..., None]
) -> str:
"""
Helper to download media from given link. Returns the path to downloaded media.
"""
media_name = url.split("/")[-1]
client = controller.client
auth = requests.auth.HTTPBasicAuth(client.email, client.api_key)

with requests.get(url, auth=auth, stream=True) as response:
response.raise_for_status()
local_path = ""
with NamedTemporaryFile(
mode="wb", delete=False, prefix="zt-", suffix=f"-{media_name}"
) as file:
local_path = file.name
for chunk in response.iter_content(chunk_size=8192):
if chunk: # Filter out keep-alive new chunks.
file.write(chunk)
show_download_status()

controller.report_success([" Downloaded ", ("bold", media_name)])
return normalized_file_path(local_path)

return ""


@asynch
def open_media(controller: Any, tool: str, media_path: str) -> None:
"""
Helper to open a media file given its path and tool.
"""
error = []
command = [tool, media_path]
try:
process = subprocess.run(
command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
exit_status = process.returncode
if exit_status != successful_GUI_return_code():
error = [
" The tool ",
("footer_contrast", tool),
" did not run successfully" ". Exited with ",
("footer_contrast", str(exit_status)),
]
except FileNotFoundError:
error = [" The tool ", ("footer_contrast", tool), " could not be found"]

if error:
controller.report_error(error)
Loading