diff --git a/.github/services/memcached/memcached_with_unix/action.yml b/.github/services/memcached/memcached_with_unix/action.yml new file mode 100644 index 000000000000..1fd09e689fd9 --- /dev/null +++ b/.github/services/memcached/memcached_with_unix/action.yml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: memcached_with_unix +description: 'Behavior test for memcached with unix socket' + +runs: + using: "composite" + steps: + - name: Setup memcached server with Unix socket + shell: bash + working-directory: fixtures/memcached + run: | + # Create directory for the socket + sudo mkdir -p /tmp/memcached + sudo chmod 777 /tmp/memcached + docker compose -f docker-compose-memcached-unix.yml up -d --wait + - name: Wait for socket + shell: bash + run: | + # Wait for the socket file to be created + for i in {1..30}; do + if [ -S /tmp/memcached/memcached.sock ]; then + echo "Socket is ready" + break + fi + echo "Waiting for socket... ($i/30)" + sleep 1 + done + # Verify socket exists + ls -la /tmp/memcached/ + - name: Setup + shell: bash + run: | + cat << EOF >> $GITHUB_ENV + OPENDAL_MEMCACHED_ENDPOINT=unix:///tmp/memcached/memcached.sock + OPENDAL_MEMCACHED_ROOT=/ + EOF diff --git a/core/Cargo.lock b/core/Cargo.lock index 6610251347fc..9c18a7c86991 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -6766,10 +6766,10 @@ name = "opendal-service-memcached" version = "0.55.0" dependencies = [ "fastpool", - "http 1.4.0", "opendal-core", "serde", "tokio", + "url", ] [[package]] diff --git a/core/services/memcached/Cargo.toml b/core/services/memcached/Cargo.toml index fa8ea87aaab6..f5c27e6f5b7e 100644 --- a/core/services/memcached/Cargo.toml +++ b/core/services/memcached/Cargo.toml @@ -32,10 +32,10 @@ all-features = true [dependencies] fastpool = "1.0.2" -http = { workspace = true } opendal-core = { path = "../../core", version = "0.55.0", default-features = false } serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["net", "io-util"] } +url = "2.5.7" [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/core/services/memcached/src/backend.rs b/core/services/memcached/src/backend.rs index 12ab94f0b221..4ea91c0c9dbf 100644 --- a/core/services/memcached/src/backend.rs +++ b/core/services/memcached/src/backend.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. +use std::borrow::Cow; use std::sync::Arc; +use url::Url; use opendal_core::raw::*; use opendal_core::*; @@ -94,53 +96,73 @@ impl Builder for MemcachedBuilder { type Config = MemcachedConfig; fn build(self) -> Result { - let endpoint = self.config.endpoint.clone().ok_or_else(|| { + let endpoint_raw = self.config.endpoint.clone().ok_or_else(|| { Error::new(ErrorKind::ConfigInvalid, "endpoint is empty") .with_context("service", MEMCACHED_SCHEME) })?; - let uri = http::Uri::try_from(&endpoint).map_err(|err| { + + let url_str = if !endpoint_raw.contains("://") { + Cow::Owned(format!("tcp://{}", endpoint_raw)) + } else { + Cow::Borrowed(endpoint_raw.as_str()) + }; + + let parsed = Url::parse(&url_str).map_err(|err| { Error::new(ErrorKind::ConfigInvalid, "endpoint is invalid") .with_context("service", MEMCACHED_SCHEME) - .with_context("endpoint", &endpoint) + .with_context("endpoint", &endpoint_raw) .set_source(err) })?; - match uri.scheme_str() { - // If scheme is none, we will use tcp by default. - None => (), - Some(scheme) => { - // We only support tcp by now. - if scheme != "tcp" { + let endpoint = match parsed.scheme() { + "tcp" => { + let host = parsed.host_str().ok_or_else(|| { + Error::new(ErrorKind::ConfigInvalid, "tcp endpoint doesn't have host") + .with_context("service", MEMCACHED_SCHEME) + .with_context("endpoint", &endpoint_raw) + })?; + let port = parsed.port().ok_or_else(|| { + Error::new(ErrorKind::ConfigInvalid, "tcp endpoint doesn't have port") + .with_context("service", MEMCACHED_SCHEME) + .with_context("endpoint", &endpoint_raw) + })?; + Endpoint::Tcp(format!("{host}:{port}")) + } + + #[cfg(unix)] + "unix" => { + let path = parsed.path(); + if path.is_empty() { return Err(Error::new( ErrorKind::ConfigInvalid, - "endpoint is using invalid scheme", + "unix endpoint doesn't have path", ) .with_context("service", MEMCACHED_SCHEME) - .with_context("endpoint", &endpoint) - .with_context("scheme", scheme.to_string())); + .with_context("endpoint", &endpoint_raw)); } + Endpoint::Unix(path.to_string()) } - }; - let host = if let Some(host) = uri.host() { - host.to_string() - } else { - return Err( - Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have host") - .with_context("service", MEMCACHED_SCHEME) - .with_context("endpoint", &endpoint), - ); - }; - let port = if let Some(port) = uri.port_u16() { - port - } else { - return Err( - Error::new(ErrorKind::ConfigInvalid, "endpoint doesn't have port") - .with_context("service", MEMCACHED_SCHEME) - .with_context("endpoint", &endpoint), - ); + #[cfg(not(unix))] + "unix" => { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "unix socket is not supported on this platform", + ) + .with_context("service", MEMCACHED_SCHEME) + .with_context("endpoint", &endpoint_raw)); + } + + scheme => { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "endpoint is using invalid scheme, only tcp and unix are supported", + ) + .with_context("service", MEMCACHED_SCHEME) + .with_context("endpoint", &endpoint_raw) + .with_context("scheme", scheme)); + } }; - let endpoint = format!("{host}:{port}",); let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str()); diff --git a/core/services/memcached/src/binary.rs b/core/services/memcached/src/binary.rs index 205b94fcaeb3..ec9c72abcffc 100644 --- a/core/services/memcached/src/binary.rs +++ b/core/services/memcached/src/binary.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. +use crate::core::SocketStream; use opendal_core::raw::*; use opendal_core::*; use tokio::io; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; -use tokio::net::TcpStream; pub(super) mod constants { pub const OK_STATUS: u16 = 0x0; @@ -60,7 +60,7 @@ pub struct PacketHeader { } impl PacketHeader { - pub async fn write(self, writer: &mut TcpStream) -> io::Result<()> { + pub async fn write(self, writer: &mut SocketStream) -> io::Result<()> { writer.write_u8(self.magic).await?; writer.write_u8(self.opcode).await?; writer.write_u16(self.key_length).await?; @@ -73,7 +73,7 @@ impl PacketHeader { Ok(()) } - pub async fn read(reader: &mut TcpStream) -> Result { + pub async fn read(reader: &mut SocketStream) -> Result { let header = PacketHeader { magic: reader.read_u8().await?, opcode: reader.read_u8().await?, @@ -98,11 +98,11 @@ pub struct Response { #[derive(Debug)] pub struct Connection { - io: BufReader, + io: BufReader, } impl Connection { - pub fn new(io: TcpStream) -> Self { + pub fn new(io: SocketStream) -> Self { Self { io: BufReader::new(io), } @@ -246,7 +246,7 @@ impl Connection { } } -pub async fn parse_response(reader: &mut TcpStream) -> Result { +pub async fn parse_response(reader: &mut SocketStream) -> Result { let header = PacketHeader::read(reader).await.map_err(new_std_io_error)?; if header.vbucket_id_or_status != constants::OK_STATUS diff --git a/core/services/memcached/src/core.rs b/core/services/memcached/src/core.rs index f05b8f48f82a..1221bb08ed7f 100644 --- a/core/services/memcached/src/core.rs +++ b/core/services/memcached/src/core.rs @@ -22,22 +22,104 @@ use fastpool::ObjectStatus; use fastpool::bounded; use opendal_core::raw::*; use opendal_core::*; +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; +#[cfg(unix)] +use tokio::net::UnixStream; use super::binary; +#[derive(Debug)] +pub enum SocketStream { + Tcp(TcpStream), + #[cfg(unix)] + Unix(UnixStream), +} + +impl SocketStream { + pub async fn connect_tcp(addr_str: &str) -> io::Result { + let socket_addr: SocketAddr = addr_str + .parse() + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let stream = TcpStream::connect(socket_addr).await?; + Ok(SocketStream::Tcp(stream)) + } + + #[cfg(unix)] + pub async fn connect_unix(path: &str) -> io::Result { + let stream = UnixStream::connect(path).await?; + Ok(SocketStream::Unix(stream)) + } +} + +impl AsyncRead for SocketStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match self.get_mut() { + SocketStream::Tcp(s) => Pin::new(s).poll_read(cx, buf), + #[cfg(unix)] + SocketStream::Unix(s) => Pin::new(s).poll_read(cx, buf), + } + } +} + +impl AsyncWrite for SocketStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match self.get_mut() { + SocketStream::Tcp(s) => Pin::new(s).poll_write(cx, buf), + #[cfg(unix)] + SocketStream::Unix(s) => Pin::new(s).poll_write(cx, buf), + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + SocketStream::Tcp(s) => Pin::new(s).poll_flush(cx), + #[cfg(unix)] + SocketStream::Unix(s) => Pin::new(s).poll_flush(cx), + } + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.get_mut() { + SocketStream::Tcp(s) => Pin::new(s).poll_shutdown(cx), + #[cfg(unix)] + SocketStream::Unix(s) => Pin::new(s).poll_shutdown(cx), + } + } +} + +/// Endpoint for memcached connection. +#[derive(Clone, Debug)] +pub enum Endpoint { + Tcp(String), // host:port + #[cfg(unix)] + Unix(String), // socket path +} + /// A connection manager for `memcache_async::ascii::Protocol`. #[derive(Clone)] struct MemcacheConnectionManager { - address: String, + endpoint: Endpoint, username: Option, password: Option, } impl MemcacheConnectionManager { - fn new(address: &str, username: Option, password: Option) -> Self { + fn new(endpoint: Endpoint, username: Option, password: Option) -> Self { Self { - address: address.to_string(), + endpoint, username, password, } @@ -48,11 +130,17 @@ impl ManageObject for MemcacheConnectionManager { type Object = binary::Connection; type Error = Error; - /// TODO: Implement unix stream support. async fn create(&self) -> Result { - let conn = TcpStream::connect(&self.address) - .await - .map_err(new_std_io_error)?; + let conn = match &self.endpoint { + Endpoint::Tcp(addr) => SocketStream::connect_tcp(addr) + .await + .map_err(new_std_io_error)?, + #[cfg(unix)] + Endpoint::Unix(path) => SocketStream::connect_unix(path) + .await + .map_err(new_std_io_error)?, + }; + let mut conn = binary::Connection::new(conn); if let (Some(username), Some(password)) = (self.username.as_ref(), self.password.as_ref()) { @@ -81,7 +169,7 @@ pub struct MemcachedCore { impl MemcachedCore { pub fn new( - endpoint: String, + endpoint: Endpoint, username: Option, password: Option, default_ttl: Option, @@ -89,7 +177,7 @@ impl MemcachedCore { ) -> Self { let conn = bounded::Pool::new( bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)), - MemcacheConnectionManager::new(endpoint.as_str(), username, password), + MemcacheConnectionManager::new(endpoint, username, password), ); Self { default_ttl, conn } diff --git a/fixtures/memcached/docker-compose-memcached-unix.yml b/fixtures/memcached/docker-compose-memcached-unix.yml new file mode 100644 index 000000000000..41c0ab5ad51c --- /dev/null +++ b/fixtures/memcached/docker-compose-memcached-unix.yml @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +services: + memcached: + image: bitnami/memcached:latest + environment: + # memcache's max item size is 1MiB, But opendal's behavior tests + # will produce larger file. + # + # Specify the setting here to make our test happy. + MEMCACHED_MAX_ITEM_SIZE: 16777216 + # Enable Unix socket with extra flags + # -s: socket path + # -a: socket access mask (0777 for world-readable/writable) + MEMCACHED_EXTRA_FLAGS: "-s /tmp/memcached/memcached.sock -a 0777" + volumes: + - /tmp/memcached:/tmp/memcached + healthcheck: + test: ["CMD-SHELL", "test -S /tmp/memcached/memcached.sock"] + interval: 5s + timeout: 5s + retries: 5