
asyncstream

UNDER CONSTRUCTION

A simple library to compress and uncompress async streams using file iterators and readers.

It supports the following compression formats:

  • gzip
  • bzip2
  • snappy
  • zstandard
  • parquet (experimental)
  • orc (experimental)

Getting started

Install the library as follows:

pip install asyncstream

Compress a regular file to gzip (examples/simple_compress_gzip.py):

import asyncstream
import asyncio


async def run():
    async with asyncstream.open('samples/animals.txt', 'rb') as fd:
        async with asyncstream.open('samples/animals.txt.gz', 'wb', compression='gzip') as gzfd:
            async for line in fd:
                await gzfd.write(line)

if __name__ == '__main__':
    asyncio.run(run())

You can also write through an async file descriptor opened with aiofiles (examples/simple_compress_gzip_with_aiofiles.py):

pip install aiofiles

And then run the following code:

import aiofiles
import asyncstream
import asyncio


async def run():
    async with aiofiles.open('examples/animals.txt', 'rb') as fd:
        async with aiofiles.open('/tmp/animals.txt.gz', 'wb') as wfd:
            async with asyncstream.open(wfd, 'wb', compression='gzip') as gzfd:
                async for line in fd:
                    await gzfd.write(line)


if __name__ == '__main__':
    asyncio.run(run())

You can also uncompress an S3 file on the fly using aiobotocore:

pip install aiobotocore

And then run the following code (examples/simple_uncompress_bzip2_from_s3.py):

import asyncstream
import asyncio
from aiobotocore.session import get_session

async def run():
    # aiobotocore >= 2.0 exposes get_session via aiobotocore.session
    session = get_session()
    async with session.create_client('s3') as s3:
        obj = await s3.get_object(Bucket='test-bucket', Key='path/to/file.bz2')
        async with asyncstream.open(obj['Body'], 'rt', compression='bzip2') as fd:
            async for line in fd:
                print(line)
    

if __name__ == '__main__':
    asyncio.run(run())

Convert a gzip file to a snappy file

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('samples/animals.txt.gz', 'rb', compression='gzip') as inc_fd:
        async with asyncstream.open('samples/animals.txt.snappy', 'wb', compression='snappy') as outc_fd:
            async for line in inc_fd:
                await outc_fd.write(line)


if __name__ == '__main__':
    asyncio.run(run())

Use an async reader and writer to filter and update data on the fly

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('/tmp/animals.txt.bz2', 'rt', compression='bzip2') as in_fd:
        async with asyncstream.open('/tmp/animals.txt.snappy', 'wt', compression='snappy') as out_fd:
            async with asyncstream.reader(in_fd) as reader:
                async with asyncstream.writer(out_fd) as writer:
                    async for name, color, age in reader:
                        if color != 'white':
                            # columns are read as strings by default, so cast age before doubling
                            await writer.writerow([name, color, int(age) * 2])

asyncio.run(run())

Simple parquet encoding

You will need to install pyarrow and pandas:

pip install pyarrow pandas

To compress using snappy, install python-snappy:

pip install python-snappy

The code below reads a CSV file and converts it to parquet:

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('examples/animals.txt', 'rb') as fd:
        async with asyncstream.open('output.parquet', 'wb', encoding='parquet', compression='snappy') as wfd:
            async with asyncstream.writer(wfd) as writer:
                async for line in fd:
                    await writer.write(line)

asyncio.run(run())

Simple parquet decoding

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('output.parquet', 'rb', encoding='parquet') as fd:
        async with asyncstream.reader(fd) as reader:
            async for line in reader:
                print(line)

asyncio.run(run())

Simple orc encoding

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('examples/animals.txt', 'rb') as fd:
        async with asyncstream.open('output.orc.snappy', 'wb', encoding='orc', compression='snappy') as wfd:
            async with asyncstream.writer(wfd) as writer:
                async for line in fd:
                    await writer.write(line)

asyncio.run(run())

Simple orc decoding

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('output.orc.snappy', 'rb', encoding='orc') as fd:
        async with asyncstream.reader(fd) as reader:
            async for line in reader:
                print(line)

asyncio.run(run())

Usage

asyncstream.open(afd: Union[str, AsyncBufferedIOBase], mode=None, encoding=None, compression=None, compress_level=-1)

Open an async file (by filename or from an AsyncBufferedIOBase) and compress or decompress it on the fly, depending on the mode (r or w).

Inputs:

  • mode: r or w, combined with t (text) or b (binary)
  • encoding: None, parquet or orc
  • compression: see the Compression supported section
  • compress_level: optional compression level, used when compression is set

The AsyncFileObj object returned has the following methods:

  • flush(): flush buffered data; only meaningful in write mode
  • close(): close the file descriptor and release resources; called automatically when the async with block exits
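
A minimal write-mode sketch. The output path is a placeholder, and this assumes that flush() is awaitable like write() and that compress_level follows the usual gzip 1-9 scale:

import asyncstream
import asyncio

async def run():
    # placeholder path; compress_level=9 assumes the usual gzip 1-9 convention
    async with asyncstream.open('/tmp/example.txt.gz', 'wb', compression='gzip', compress_level=9) as fd:
        await fd.write(b'hello\n')
        await fd.flush()
    # close() runs automatically when the async with block exits

asyncio.run(run())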

asyncstream.reader(afd: AsyncFileObj, columns: Optional[Iterable[str]] = None, column_types: Optional[Iterable[str]] = None, has_header: bool = False, sep=',', eol='\n')

Create an async reader from the AsyncFileObj returned by asyncstream.open. The file must be opened in text mode (t).

Inputs:

  • afd: AsyncFileObj created with asyncstream.open
  • columns: optional list of column names. If not set and has_header is true, the first row is used as the column names
  • column_types: optional list of column types (by default, all columns are treated as strings)
  • has_header: whether the first line of the file is a header row
  • sep: separator between values
  • eol: end-of-line character
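
As a sketch, assuming a hypothetical headerless CSV at samples/animals.txt with explicitly supplied column names:

import asyncstream
import asyncio

async def run():
    # the path and column names below are illustrative only
    async with asyncstream.open('samples/animals.txt', 'rt') as fd:
        async with asyncstream.reader(fd, columns=['name', 'color', 'age'], has_header=False) as reader:
            async for name, color, age in reader:
                print(name, color, age)

asyncio.run(run())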

asyncstream.writer(afd: AsyncFileObj, columns: Optional[Iterable[str]] = None, column_types: Optional[Iterable[str]] = None, has_header: bool = True, sep=',', eol='\n')

Create an async writer from the AsyncFileObj returned by asyncstream.open. The file must be opened in text mode (t) and write mode (w).

Inputs:

  • afd: AsyncFileObj created with asyncstream.open in text mode (t) and write mode (w)
  • columns: optional list of column names to use
  • column_types: optional list of column types (by default, all columns are treated as strings)
  • has_header: whether to write a header row as the first line
  • sep: separator between values
  • eol: end-of-line character
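
A minimal writer sketch, using a hypothetical output path and column names; since has_header defaults to true, the column names should be written out as the first row:

import asyncstream
import asyncio

async def run():
    # hypothetical output path and columns
    async with asyncstream.open('/tmp/animals_out.csv', 'wt') as out_fd:
        async with asyncstream.writer(out_fd, columns=['name', 'color', 'age']) as writer:
            # values are joined with sep=',' and terminated with eol='\n'
            await writer.writerow(['duck', 'yellow', '2'])

asyncio.run(run())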

Compression supported

  • gzip / zlib
  • bzip2
  • snappy
  • zstd

Parquet

  • none
  • brotli
  • bzip2
  • gzip
  • snappy
  • zstd
  • zlib

Orc

  • none
  • bzip2
  • gzip / zlib
  • snappy
  • zstd
