Skip to content

How to read video from an IStreamReader? #1150

@horizon86

Description

@horizon86

In the OpenCV type hints (.../miniforge3/envs/main/lib/python3.11/site-packages/cv2/__init__.pyi), the VideoCapture class is defined to accept an IStreamReader object via its constructor:

class IStreamReader:
    # Functions
    def read(self, buffer: str, size: int) -> int: ...

    def seek(self, offset: int, origin: int) -> int: ...

class VideoCapture:
    # Functions
    ...
    @_typing.overload
    def __init__(self, source: IStreamReader, apiPreference: int, params: _typing.Sequence[int]) -> None: ...

However, when I create a subclass of IStreamReader and implement the read() and seek() methods, I receive the following error when initializing VideoCapture:

cv2.error: OpenCV(4.12.0) :-1: error: (-5:Bad argument) in function 'VideoCapture'
> Overload resolution failed:
>  - VideoCapture() takes at most 2 arguments (3 given)
>  - Expected 'filename' to be a str or path-like object
>  - VideoCapture() takes at most 2 arguments (3 given)
>  - Argument 'index' is required to be an integer
>  - Input stream should be derived from io.BufferedIOBase

This error persists even when using BytesIO instead of a subclass of IStreamReader.

You can reproduce the issue with the following code (using a valid URL to download a video):

import cv2
import requests
import time
from typing import Optional
from logging import Logger, getLogger


class URLStreamReader(cv2.IStreamReader):
    """
    URL video stream reader that inherits from cv2.IStreamReader.

    The whole video is downloaded into memory when the object is created;
    ``read`` and ``seek`` then serve that in-memory buffer.

    NOTE(review): the OpenCV error quoted in this report says the input
    stream "should be derived from io.BufferedIOBase" -- confirm whether
    subclassing cv2.IStreamReader from Python is supported at all.
    """

    # Seconds `requests` waits for the connection / for the next bytes before
    # giving up; without a timeout a stalled server blocks construction forever.
    DOWNLOAD_TIMEOUT = 30.0

    def __init__(self, video_url: str, logger: Optional[Logger] = None):
        """
        Initialize the reader and pre-download the video to memory.

        Args:
            video_url: URL address of the video
            logger: Logger instance, uses the module logger if not provided

        Raises:
            Exception: whatever the download raised (re-raised after logging)
        """
        super().__init__()
        self.video_url = video_url
        self.video_data = b''
        self.position = 0
        self.logger = logger if logger is not None else getLogger(__name__)
        self._download_video()

    def _download_video(self):
        """Download the video into ``self.video_data``; log and re-raise on failure."""
        self.logger.info(f"Starting video download: {self.video_url}")
        start_time = time.time()

        try:
            # The context manager returns the streamed connection to the pool
            # even when the download is interrupted half-way.
            with requests.get(
                self.video_url, stream=True, timeout=self.DOWNLOAD_TIMEOUT
            ) as response:
                response.raise_for_status()
                # Empty keep-alive chunks are filtered out before joining.
                self.video_data = b''.join(
                    chunk
                    for chunk in response.iter_content(chunk_size=8192)
                    if chunk
                )
        except Exception as e:
            self.logger.error(f"Video download failed: {e}")
            raise

        total_size = len(self.video_data)
        download_time = time.time() - start_time
        self.logger.info(f"Video download completed, size: {total_size / 1024 / 1024:.2f}MB, time: {download_time:.2f}s")

    @staticmethod
    def _copy_into(buffer, chunk: bytes) -> int:
        """
        Copy ``chunk`` into ``buffer`` and return how many bytes were stored.

        Supports mutable indexable buffers (bytearray, memoryview, ...) and
        file-like objects exposing ``write``.

        Raises:
            TypeError: if ``buffer`` offers no way to receive the data
        """
        if hasattr(buffer, '__setitem__') and hasattr(buffer, '__len__'):
            count = min(len(chunk), len(buffer))
            try:
                # Bulk copy; works for bytearray, memoryview and list.
                buffer[:count] = chunk[:count]
            except (TypeError, ValueError):
                # Some containers reject bytes in a slice assignment;
                # fall back to storing one item at a time.
                for index in range(count):
                    buffer[index] = chunk[index]
            return count

        if hasattr(buffer, 'write'):
            written = buffer.write(chunk)
            # Some writers return None instead of a byte count.
            return written if isinstance(written, int) else len(chunk)

        raise TypeError("buffer is not writable")

    def read(self, buffer, size: int) -> int:
        """
        Read video data into ``buffer``.

        The read position advances only by the number of bytes that were
        actually stored, so no data is skipped when the buffer is smaller
        than ``size`` or cannot be written to.

        Args:
            buffer: Target buffer (mutable byte container or file-like object)
            size: Number of bytes to read

        Returns:
            Actual number of bytes stored in ``buffer``; 0 at end of data or
            when the buffer could not be written
        """
        available = len(self.video_data) - self.position
        bytes_to_read = min(size, available)
        if bytes_to_read <= 0:
            return 0

        chunk = self.video_data[self.position:self.position + bytes_to_read]

        try:
            copied = self._copy_into(buffer, chunk)
        except Exception as e:
            self.logger.warning(f"Cannot write to buffer: {e}, buffer type: {type(buffer)}")
            # Nothing was delivered, so report 0 and keep the position.
            return 0

        self.position += copied
        return copied

    def seek(self, offset: int, origin: int) -> int:
        """
        Set the read position.

        Args:
            offset: Offset value
            origin: Starting position
                   0 (SEEK_SET): From file beginning
                   1 (SEEK_CUR): From current position
                   2 (SEEK_END): From file end

        Returns:
            New position, clamped to ``[0, len(video_data)]``

        Raises:
            ValueError: if ``origin`` is not 0, 1 or 2
        """
        data_len = len(self.video_data)
        bases = {0: 0, 1: self.position, 2: data_len}
        if origin not in bases:
            raise ValueError(f"Invalid origin value: {origin}")

        # Ensure position is within valid range
        self.position = max(0, min(bases[origin] + offset, data_len))
        return self.position

    def close(self) -> None:
        """Release the in-memory video buffer and reset the read position."""
        self.video_data = b''
        self.position = 0
# Usage example
if __name__ == "__main__":

    # Example URL (please replace with actual video URL)
    video_url = "https://lf26-bot-platform-tos-sign.coze.cn/bot-studio-bot-platform/bot_files/353388233497496/video/mp4/7560697486898741257/upload?lk3s=50ccb0c5&x-expires=1760967080&x-signature=%2FHQ8qDwVP%2Ftoq2xPSKw5mP2mUdM%3D"

    # Downloads the whole video into memory before returning.
    url_reader = URLStreamReader(video_url)
    try:
        # NOTE(review): the quoted stub types `params` as Sequence[int];
        # None is kept here because it is what the report reproduces with.
        cap = cv2.VideoCapture(url_reader, cv2.CAP_FFMPEG, None)
        try:
            if cap.isOpened():
                print(f"Video opened successfully, total frames: {cap.get(cv2.CAP_PROP_FRAME_COUNT)}")
                print(f"Video FPS: {cap.get(cv2.CAP_PROP_FPS)}")

                ret, frame = cap.read()
                if ret:
                    print(f"Successfully read first frame, size: {frame.shape}")
        finally:
            # Release the capture even if reading a frame raised.
            cap.release()
    finally:
        # Only call close() when the reader actually provides it, so cleanup
        # cannot raise AttributeError and mask the real error.
        close = getattr(url_reader, "close", None)
        if callable(close):
            close()

Additional Question
I know that passing a URL directly works, but I’d like to know if there’s a way to read and process a video directly from memory (e.g., from a byte buffer or an in-memory stream) without writing it to disk.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions