Skip to content

Commit

Permalink
Treat all binary types equally throughout transport (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshMock authored Oct 24, 2024
1 parent bd15309 commit b858063
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 22 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@elastic/transport",
"version": "8.9.0",
"version": "8.9.1",
"description": "Transport classes and utilities shared among Node.js Elastic client libraries",
"main": "./index.js",
"types": "index.d.ts",
Expand Down
14 changes: 2 additions & 12 deletions src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
ErrorOptions
} from './errors'
import { Connection, ConnectionRequestParams } from './connection'
import { isBinary } from './connection/BaseConnection'
import Diagnostic from './Diagnostic'
import Serializer from './Serializer'
import { Readable as ReadableStream } from 'node:stream'
Expand Down Expand Up @@ -556,18 +557,7 @@ export default class Transport {
body = await unzip(body)
}

const binaryTypes = [
'application/vnd.mapbox-vector-tile',
'application/vnd.apache.arrow.stream',
'application/vnd.elasticsearch+arrow+stream',
'application/smile',
'application/vnd.elasticsearch+smile',
'application/cbor',
'application/vnd.elasticsearch+cbor'
]
const contentType = headers['content-type'] ?? ''
const isBinary = binaryTypes.map(type => contentType.includes(type)).includes(true)
if (Buffer.isBuffer(body) && !isBinary) {
if (Buffer.isBuffer(body) && !isBinary(headers['content-type'] ?? '')) {
body = body.toString()
}

Expand Down
16 changes: 16 additions & 0 deletions src/connection/BaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,19 @@ export function isCaFingerprintMatch (cert1: string | null, cert2: string | null
}
return cert1 === cert2
}

export function isBinary (contentType: string | string[]): boolean {
const binaryTypes = [
'application/vnd.mapbox-vector-tile',
'application/vnd.apache.arrow.stream',
'application/vnd.elasticsearch+arrow+stream',
'application/smile',
'application/vnd.elasticsearch+smile',
'application/cbor',
'application/vnd.elasticsearch+cbor'
]

return binaryTypes
.map(type => contentType.includes(type))
.includes(true)
}
13 changes: 7 additions & 6 deletions src/connection/HttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import BaseConnection, {
ConnectionRequestResponse,
ConnectionRequestResponseAsStream,
getIssuerCertificate,
isCaFingerprintMatch
isCaFingerprintMatch,
isBinary
} from './BaseConnection'
import { kCaFingerprint } from '../symbols'
import { Readable as ReadableStream, pipeline } from 'node:stream'
Expand Down Expand Up @@ -147,7 +148,7 @@ export default class HttpConnection extends BaseConnection {

const contentEncoding = (response.headers['content-encoding'] ?? '').toLowerCase()
const isCompressed = contentEncoding.includes('gzip') || contentEncoding.includes('deflate')
const isVectorTile = (response.headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile')
const bodyIsBinary = isBinary(response.headers['content-type'] ?? '')

/* istanbul ignore else */
if (response.headers['content-length'] !== undefined) {
Expand All @@ -167,8 +168,8 @@ export default class HttpConnection extends BaseConnection {

// if the response is compressed, we must handle it
// as buffer for allowing decompression later
let payload = isCompressed || isVectorTile ? new Array<Buffer>() : ''
const onData = isCompressed || isVectorTile ? onDataAsBuffer : onDataAsString
let payload = isCompressed || bodyIsBinary ? new Array<Buffer>() : ''
const onData = isCompressed || bodyIsBinary ? onDataAsBuffer : onDataAsString

let currentLength = 0
function onDataAsBuffer (chunk: Buffer): void {
Expand Down Expand Up @@ -208,13 +209,13 @@ export default class HttpConnection extends BaseConnection {
}

resolve({
body: isCompressed || isVectorTile ? Buffer.concat(payload as Buffer[]) : payload as string,
body: isCompressed || bodyIsBinary ? Buffer.concat(payload as Buffer[]) : payload as string,
statusCode: response.statusCode as number,
headers: response.headers
})
}

if (!isCompressed && !isVectorTile) {
if (!isCompressed && !bodyIsBinary) {
response.setEncoding('utf8')
}

Expand Down
7 changes: 4 additions & 3 deletions src/connection/UndiciConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import BaseConnection, {
ConnectionRequestResponse,
ConnectionRequestResponseAsStream,
getIssuerCertificate,
isCaFingerprintMatch
isCaFingerprintMatch,
isBinary
} from './BaseConnection'
import { Pool, buildConnector, Dispatcher } from 'undici'
import {
Expand Down Expand Up @@ -182,7 +183,7 @@ export default class Connection extends BaseConnection {
// @ts-expect-error Assume header is not string[] for now.
const contentEncoding = (response.headers['content-encoding'] ?? '').toLowerCase()
const isCompressed = contentEncoding.includes('gzip') || contentEncoding.includes('deflate') // eslint-disable-line
const isVectorTile = (response.headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile')
const bodyIsBinary = isBinary(response.headers['content-type'] ?? '')

/* istanbul ignore else */
if (response.headers['content-length'] !== undefined) {
Expand All @@ -198,7 +199,7 @@ export default class Connection extends BaseConnection {

this.diagnostic.emit('deserialization', null, options)
try {
if (isCompressed || isVectorTile) { // eslint-disable-line
if (isCompressed || bodyIsBinary) { // eslint-disable-line
let currentLength = 0
const payload: Buffer[] = []
for await (const chunk of response.body) {
Expand Down
22 changes: 22 additions & 0 deletions test/unit/http-connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,28 @@ test('Support mapbox vector tile', async t => {
server.stop()
})

test('Support Apache Arrow', async t => {
t.plan(1)

const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='

function handler (req: http.IncomingMessage, res: http.ServerResponse) {
res.setHeader('Content-Type', 'application/vnd.apache.arrow.stream')
res.end(Buffer.from(binaryContent, 'base64'))
}

const [{ port }, server] = await buildServer(handler)
const connection = new HttpConnection({
url: new URL(`http://localhost:${port}`)
})
const res = await connection.request({
path: '/_query',
method: 'POST',
}, options)
t.equal(res.body.toString('base64'), Buffer.from(binaryContent, 'base64').toString('base64'))
server.stop()
})

test('Check server fingerprint (success)', async t => {
t.plan(1)

Expand Down
22 changes: 22 additions & 0 deletions test/unit/undici-connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,28 @@ test('Support mapbox vector tile', async t => {
server.stop()
})

test('Support Apache Arrow', async t => {
t.plan(1)

const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='

function handler (_req: http.IncomingMessage, res: http.ServerResponse) {
res.setHeader('Content-Type', 'application/vnd.apache.arrow.stream')
res.end(Buffer.from(binaryContent, 'base64'))
}

const [{ port }, server] = await buildServer(handler)
const connection = new UndiciConnection({
url: new URL(`http://localhost:${port}`)
})
const res = await connection.request({
path: '/_query',
method: 'POST',
}, options)
t.equal(res.body.toString('base64'), Buffer.from(binaryContent, 'base64').toString('base64'))
server.stop()
})

test('Check server fingerprint (success)', async t => {
t.plan(1)

Expand Down

0 comments on commit b858063

Please sign in to comment.