11import { EndpointContext } from '@chainlink/external-adapter-framework/adapter'
2+ import { ResponseCache } from '@chainlink/external-adapter-framework/cache/response'
23import { TransportDependencies } from '@chainlink/external-adapter-framework/transports'
34import { SubscriptionTransport } from '@chainlink/external-adapter-framework/transports/abstract/subscription'
4- import { sleep } from '@chainlink/external-adapter-framework/util'
5- import SftpClient from 'ssh2-sftp-client'
6- import { BaseEndpointTypes } from '../endpoint/sftp'
5+ import { AdapterResponse , makeLogger , sleep } from '@chainlink/external-adapter-framework/util'
6+ import { AdapterInputError } from '@chainlink/external-adapter-framework/validation/error'
7+ import SftpClient , { FileInfo } from 'ssh2-sftp-client'
8+ import { BaseEndpointTypes , IndexResponseData , inputParameters } from '../endpoint/sftp'
9+ import { CSVParserFactory } from '../parsing/factory'
10+ import {
11+ instrumentToFilePathMap ,
12+ instrumentToFileRegexMap ,
13+ isInstrumentSupported ,
14+ } from './constants'
15+
16+ const logger = makeLogger ( 'FTSE SFTP Adapter' )
17+
18+ type RequestParams = typeof inputParameters . validated
19+
20+ interface SftpConnectionConfig {
21+ host : string
22+ port : number
23+ username : string
24+ password : string
25+ readyTimeout : number
26+ }
727
828export class SftpTransport extends SubscriptionTransport < BaseEndpointTypes > {
929 config ! : BaseEndpointTypes [ 'Settings' ]
1030 endpointName ! : string
31+ name ! : string
32+ responseCache ! : ResponseCache < BaseEndpointTypes >
1133 sftpClient : SftpClient
1234
1335 constructor ( ) {
@@ -24,12 +46,156 @@ export class SftpTransport extends SubscriptionTransport<BaseEndpointTypes> {
2446 await super . initialize ( dependencies , adapterSettings , endpointName , transportName )
2547 this . config = adapterSettings
2648 this . endpointName = endpointName
49+ this . name = transportName
50+ this . responseCache = dependencies . responseCache
2751 }
2852
29- async backgroundHandler ( context : EndpointContext < BaseEndpointTypes > ) : Promise < void > {
53+ async backgroundHandler (
54+ context : EndpointContext < BaseEndpointTypes > ,
55+ entries : RequestParams [ ] ,
56+ ) : Promise < void > {
57+ await Promise . all ( entries . map ( async ( param ) => this . handleRequest ( param ) ) )
3058 await sleep ( context . adapterSettings . BACKGROUND_EXECUTE_MS )
3159 }
3260
61+ async handleRequest ( param : RequestParams ) {
62+ let response : AdapterResponse < BaseEndpointTypes [ 'Response' ] >
63+ try {
64+ response = await this . _handleRequest ( param )
65+ } catch ( e ) {
66+ const errorMessage = e instanceof Error ? e . message : 'Unknown error occurred'
67+ response = {
68+ statusCode : 502 ,
69+ errorMessage,
70+ timestamps : {
71+ providerDataRequestedUnixMs : 0 ,
72+ providerDataReceivedUnixMs : 0 ,
73+ providerIndicatedTimeUnixMs : undefined ,
74+ } ,
75+ }
76+ } finally {
77+ try {
78+ await this . sftpClient . end ( )
79+ logger . info ( 'SFTP connection closed' )
80+ } catch ( error ) {
81+ logger . error ( 'Error closing SFTP connection:' , error )
82+ }
83+ }
84+
85+ await this . responseCache . write ( this . name , [ { params : param , response } ] )
86+ }
87+
88+ async _handleRequest (
89+ param : RequestParams ,
90+ ) : Promise < AdapterResponse < BaseEndpointTypes [ 'Response' ] > > {
91+ const providerDataRequestedUnixMs = Date . now ( )
92+
93+ await this . connectToSftp ( )
94+
95+ const parsedData = await this . tryDownloadAndParseFile ( param . instrument )
96+
97+ // Extract the numeric result based on the data type
98+ let result : number
99+ if ( 'gbpIndex' in parsedData ) {
100+ // FTSE data
101+ result = ( parsedData . gbpIndex as number ) ?? 0
102+ } else if ( 'close' in parsedData ) {
103+ // Russell data
104+ result = parsedData . close as number
105+ } else {
106+ throw new Error ( 'Unknown data format received from parser' )
107+ }
108+
109+ logger . info ( `Successfully processed data for instrument: ${ param . instrument } ` )
110+ return {
111+ data : {
112+ result : parsedData ,
113+ } ,
114+ statusCode : 200 ,
115+ result,
116+ timestamps : {
117+ providerDataRequestedUnixMs,
118+ providerDataReceivedUnixMs : Date . now ( ) ,
119+ providerIndicatedTimeUnixMs : undefined ,
120+ } ,
121+ }
122+ }
123+
124+ private async connectToSftp ( ) : Promise < void > {
125+ const connectConfig : SftpConnectionConfig = {
126+ host : this . config . SFTP_HOST ,
127+ port : this . config . SFTP_PORT || 22 ,
128+ username : this . config . SFTP_USERNAME ,
129+ password : this . config . SFTP_PASSWORD ,
130+ readyTimeout : 30000 ,
131+ }
132+
133+ try {
134+ // Create a new client instance to avoid connection state issues
135+ this . sftpClient = new SftpClient ( )
136+ await this . sftpClient . connect ( connectConfig )
137+ logger . info ( 'Successfully connected to SFTP server' )
138+ } catch ( error ) {
139+ logger . error ( error , 'Failed to connect to SFTP server' )
140+ throw new AdapterInputError ( {
141+ statusCode : 500 ,
142+ message : `Failed to connect to SFTP server: ${
143+ error instanceof Error ? error . message : 'Unknown error'
144+ } `,
145+ } )
146+ }
147+ }
148+
149+ private async tryDownloadAndParseFile ( instrument : string ) : Promise < IndexResponseData > {
150+ // Validate that the instrument is supported
151+ if ( ! isInstrumentSupported ( instrument ) ) {
152+ throw new AdapterInputError ( {
153+ statusCode : 400 ,
154+ message : `Unsupported instrument: ${ instrument } ` ,
155+ } )
156+ }
157+
158+ const filePath = instrumentToFilePathMap [ instrument ]
159+ const fileRegex = instrumentToFileRegexMap [ instrument ]
160+
161+ const fileList = await this . sftpClient . list ( filePath )
162+ // Filter files based on the regex pattern
163+ const matchingFiles = fileList
164+ . map ( ( file : FileInfo ) => file . name )
165+ . filter ( ( fileName : string ) => fileRegex . test ( fileName ) )
166+
167+ if ( matchingFiles . length === 0 ) {
168+ throw new AdapterInputError ( {
169+ statusCode : 500 ,
170+ message : `No files matching pattern ${ fileRegex } found in directory: ${ filePath } ` ,
171+ } )
172+ } else if ( matchingFiles . length > 1 ) {
173+ throw new AdapterInputError ( {
174+ statusCode : 500 ,
175+ message : `Multiple files matching pattern ${ fileRegex } found in directory: ${ filePath } .` ,
176+ } )
177+ }
178+ const fullPath = `${ filePath } ${ matchingFiles [ 0 ] } `
179+
180+ // Log the download attempt
181+ logger . info ( `Downloading file: ${ fullPath } ` )
182+
183+ const fileContent = await this . sftpClient . get ( fullPath )
184+ // we need latin1 here because the file contains special characters like "®"
185+ const csvContent = fileContent . toString ( 'latin1' )
186+
187+ const parser = CSVParserFactory . detectParserByInstrument ( instrument )
188+
189+ if ( ! parser ) {
190+ throw new AdapterInputError ( {
191+ statusCode : 500 ,
192+ message : `Parser initialization failed for instrument: ${ instrument } ` ,
193+ } )
194+ }
195+
196+ return ( await parser . parse ( csvContent ) ) as IndexResponseData
197+ }
198+
33199 getSubscriptionTtlFromConfig ( adapterSettings : BaseEndpointTypes [ 'Settings' ] ) : number {
34200 return adapterSettings . BACKGROUND_EXECUTE_MS || 60000
35201 }
0 commit comments