Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions core/src/services/sftp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use serde::Deserialize;

use super::error::is_not_found;
use super::error::is_sftp_protocol_error;
use super::error::parse_sftp_error;
use super::error::parse_ssh_error;
use super::lister::SftpLister;
use super::writer::SftpWriter;
use crate::raw::*;
Expand Down Expand Up @@ -291,7 +293,7 @@ impl Accessor for SftpBackend {
if let Err(e) = res {
// ignore error if dir already exists
if !is_sftp_protocol_error(&e) {
return Err(e.into());
return Err(parse_sftp_error(e));
}
}
fs.set_cwd(&current);
Expand All @@ -305,7 +307,7 @@ impl Accessor for SftpBackend {
let mut fs = client.fs();
fs.set_cwd(&self.root);

let meta: Metadata = fs.metadata(path).await?.into();
let meta: Metadata = fs.metadata(path).await.map_err(parse_sftp_error)?.into();

Ok(RpStat::new(meta))
}
Expand All @@ -315,9 +317,12 @@ impl Accessor for SftpBackend {

let mut fs = client.fs();
fs.set_cwd(&self.root);
let path = fs.canonicalize(path).await?;
let path = fs.canonicalize(path).await.map_err(parse_sftp_error)?;

let f = client.open(path.as_path()).await?;
let f = client
.open(path.as_path())
.await
.map_err(parse_sftp_error)?;

// Sorry for the ugly code...
//
Expand All @@ -339,7 +344,7 @@ impl Accessor for SftpBackend {

let mut fs = client.fs();
fs.set_cwd(&self.root);
let path = fs.canonicalize(path).await?;
let path = fs.canonicalize(path).await.map_err(parse_sftp_error)?;

let mut option = client.options();
option.create(true);
Expand All @@ -349,7 +354,7 @@ impl Accessor for SftpBackend {
option.write(true);
}

let file = option.open(path).await?;
let file = option.open(path).await.map_err(parse_sftp_error)?;

Ok((RpWrite::new(), SftpWriter::new(file)))
}
Expand All @@ -368,15 +373,15 @@ impl Accessor for SftpBackend {
if is_not_found(&e) {
return Ok(RpDelete::default());
} else {
return Err(e.into());
return Err(parse_sftp_error(e));
}
}
}
.read_dir()
.boxed();

while let Some(file) = dir.next().await {
let file = file?;
let file = file.map_err(parse_sftp_error)?;
let file_name = file.filename().to_str();
if file_name == Some(".") || file_name == Some("..") {
continue;
Expand All @@ -394,14 +399,14 @@ impl Accessor for SftpBackend {

match fs.remove_dir(path).await {
Err(e) if !is_not_found(&e) => {
return Err(e.into());
return Err(parse_sftp_error(e));
}
_ => {}
}
} else {
match fs.remove_file(path).await {
Err(e) if !is_not_found(&e) => {
return Err(e.into());
return Err(parse_sftp_error(e));
}
_ => {}
}
Expand All @@ -423,7 +428,7 @@ impl Accessor for SftpBackend {
if is_not_found(&e) {
return Ok((RpList::default(), None));
} else {
return Err(e.into());
return Err(parse_sftp_error(e));
}
}
}
Expand All @@ -445,12 +450,15 @@ impl Accessor for SftpBackend {
self.create_dir(dir, OpCreateDir::default()).await?;
}

let src = fs.canonicalize(from).await?;
let dst = fs.canonicalize(to).await?;
let mut src_file = client.open(&src).await?;
let mut dst_file = client.create(dst).await?;
let src = fs.canonicalize(from).await.map_err(parse_sftp_error)?;
let dst = fs.canonicalize(to).await.map_err(parse_sftp_error)?;
let mut src_file = client.open(&src).await.map_err(parse_sftp_error)?;
let mut dst_file = client.create(dst).await.map_err(parse_sftp_error)?;

src_file.copy_all_to(&mut dst_file).await?;
src_file
.copy_all_to(&mut dst_file)
.await
.map_err(parse_sftp_error)?;

Ok(RpCopy::default())
}
Expand All @@ -464,7 +472,7 @@ impl Accessor for SftpBackend {
if let Some((dir, _)) = to.rsplit_once('/') {
self.create_dir(dir, OpCreateDir::default()).await?;
}
fs.rename(from, to).await?;
fs.rename(from, to).await.map_err(parse_sftp_error)?;

Ok(RpRename::default())
}
Expand Down Expand Up @@ -517,9 +525,11 @@ async fn connect_sftp(

session.known_hosts_check(known_hosts_strategy);

let session = session.connect(&endpoint).await?;
let session = session.connect(&endpoint).await.map_err(parse_ssh_error)?;

let sftp = Sftp::from_session(session, SftpOptions::default()).await?;
let sftp = Sftp::from_session(session, SftpOptions::default())
.await
.map_err(parse_sftp_error)?;

if !root.is_empty() {
let mut fs = sftp.fs();
Expand All @@ -533,7 +543,7 @@ async fn connect_sftp(
if let Err(e) = res {
// ignore error if dir already exists
if !is_sftp_protocol_error(&e) {
return Err(e.into());
return Err(parse_sftp_error(e));
}
}
fs.set_cwd(&current);
Expand Down
58 changes: 13 additions & 45 deletions core/src/services/sftp/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,55 +22,23 @@ use openssh_sftp_client::Error as SftpClientError;
use crate::Error;
use crate::ErrorKind;

#[derive(Debug)]
pub enum SftpError {
SftpClientError(SftpClientError),
SshError(SshError),
}

impl From<SftpClientError> for Error {
fn from(e: SftpClientError) -> Self {
let kind = match &e {
SftpClientError::UnsupportedSftpProtocol { version: _ } => ErrorKind::Unsupported,
SftpClientError::SftpError(kind, _msg) => match kind {
SftpErrorKind::NoSuchFile => ErrorKind::NotFound,
SftpErrorKind::PermDenied => ErrorKind::PermissionDenied,
SftpErrorKind::OpUnsupported => ErrorKind::Unsupported,
_ => ErrorKind::Unexpected,
},
pub fn parse_sftp_error(e: SftpClientError) -> Error {
let kind = match &e {
SftpClientError::UnsupportedSftpProtocol { version: _ } => ErrorKind::Unsupported,
SftpClientError::SftpError(kind, _msg) => match kind {
SftpErrorKind::NoSuchFile => ErrorKind::NotFound,
SftpErrorKind::PermDenied => ErrorKind::PermissionDenied,
SftpErrorKind::OpUnsupported => ErrorKind::Unsupported,
_ => ErrorKind::Unexpected,
};

Error::new(kind, "sftp error").set_source(e)
}
}

/// REMOVE ME: it's not allowed to impl `<T>` for Error.
impl From<SshError> for Error {
fn from(e: SshError) -> Self {
Error::new(ErrorKind::Unexpected, "ssh error").set_source(e)
}
}

impl From<SftpClientError> for SftpError {
fn from(e: SftpClientError) -> Self {
SftpError::SftpClientError(e)
}
}
},
_ => ErrorKind::Unexpected,
};

impl From<SshError> for SftpError {
fn from(e: SshError) -> Self {
SftpError::SshError(e)
}
Error::new(kind, "sftp error").set_source(e)
}

impl From<SftpError> for Error {
fn from(e: SftpError) -> Self {
match e {
SftpError::SftpClientError(e) => e.into(),
SftpError::SshError(e) => e.into(),
}
}
pub fn parse_ssh_error(e: SshError) -> Error {
Error::new(ErrorKind::Unexpected, "ssh error").set_source(e)
}

pub(super) fn is_not_found(e: &SftpClientError) -> bool {
Expand Down
6 changes: 5 additions & 1 deletion core/src/services/sftp/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use openssh_sftp_client::fs::ReadDir;
use crate::raw::oio;
use crate::Result;

use super::error::parse_sftp_error;

pub struct SftpLister {
dir: Pin<Box<ReadDir>>,
prefix: String,
Expand All @@ -47,7 +49,9 @@ impl SftpLister {
#[async_trait]
impl oio::List for SftpLister {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
let item = ready!(self.dir.poll_next_unpin(cx)).transpose()?;
let item = ready!(self.dir.poll_next_unpin(cx))
.transpose()
.map_err(parse_sftp_error)?;

match item {
Some(e) => {
Expand Down