Skip to content

Commit

Permalink
Merge branch 'main' into CU-86c11rv16_automated-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
brett-onions committed Nov 21, 2024
2 parents 30ddd51 + e3e5994 commit 735f76f
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ MINIO_ACCESS_KEY=tCroZpZ3usDUcvPM3QT6
MINIO_SECRET_KEY=suVjMHUpVIGyWx8fFJHTiZiT88dHhKgVpzvYTOKK
MINIO_PREFIX=
MINIO_SUFFIX=

MINIO_BUCKET_REGION=us-east-1
# Clickhouse Configuration
CLICKHOUSE_URL=http://localhost:8123
CLICKHOUSE_USER=
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export const getConfig = () => {
port: process.env.MINIO_PORT ? parseInt(process.env.MINIO_PORT) : 9000,
useSSL: process.env.MINIO_USE_SSL === 'true' ? true : false,
buckets: process.env.MINIO_BUCKETS || 'climate-mediator',
bucket: process.env.MINIO_BUCKET || 'climate-mediator',
bucketRegion: process.env.MINIO_BUCKET_REGION || 'us-east-1',
accessKey: process.env.MINIO_ACCESS_KEY || 'tCroZpZ3usDUcvPM3QT6',
secretKey: process.env.MINIO_SECRET_KEY || 'suVjMHUpVIGyWx8fFJHTiZiT88dHhKgVpzvYTOKK',
prefix: process.env.MINIO_PREFIX || '',
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ app.listen(getConfig().port, () => {
setupMediator(path.resolve(__dirname, './openhim/mediatorConfig.json'));
}
});

80 changes: 67 additions & 13 deletions src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import express from 'express';
import multer from 'multer';
import { getConfig } from '../config/config';
import { getCsvHeaders } from '../utils/file-validators';
import { createTable } from '../utils/clickhouse';
import logger from '../logger';

import fs from 'fs';
import path from 'path';
import e from 'express';
import { uploadToMinio } from '../utils/minio';
const routes = express.Router();

const bodySizeLimit = getConfig().bodySizeLimit;
Expand All @@ -15,6 +17,35 @@ const jsonBodyParser = express.json({

const upload = multer({ storage: multer.memoryStorage() });

/**
 * Writes an uploaded file's contents into the `tmp` directory under the
 * current working directory and returns the path of the written file.
 *
 * Only the final path component of `fileName` is used, so a client-supplied
 * name such as `../../etc/passwd` cannot place the file outside `tmp`.
 *
 * @param fileBuffer - Contents to write
 * @param fileName - Name for the file; any directory components are discarded
 * @returns Path of the file that was written
 * @throws Error when `fileName` has no usable base name (empty, `.` or `..`)
 */
const saveCsvToTmp = (fileBuffer: Buffer, fileName: string): string => {
  const tmpDir = path.join(process.cwd(), 'tmp');

  // Strip directory components so the write always lands inside tmpDir
  const safeName = path.basename(fileName);
  if (safeName === '' || safeName === '.' || safeName === '..') {
    throw new Error(`Invalid file name: ${fileName}`);
  }

  // recursive: true makes this a no-op when the directory already exists,
  // avoiding the check-then-create race of existsSync + mkdirSync
  fs.mkdirSync(tmpDir, { recursive: true });

  const fileUrl = path.join(tmpDir, safeName);
  fs.writeFileSync(fileUrl, fileBuffer);
  logger.info(`fileUrl: ${fileUrl}`);

  return fileUrl;
};

/**
 * Tells whether the uploaded file's reported MIME type is one this route
 * accepts (CSV or JSON).
 *
 * @param file - Uploaded file as provided by multer
 * @returns true for `text/csv` or `application/json`, false otherwise
 */
const isValidFileType = (file: Express.Multer.File): boolean => {
  switch (file.mimetype) {
    case 'text/csv':
    case 'application/json':
      return true;
    default:
      return false;
  }
};

/**
 * Checks that a buffer's text content parses as JSON.
 *
 * @param buffer - Raw file contents
 * @returns true when the content parses as JSON, false when parsing throws
 */
function validateJsonFile(buffer: Buffer): boolean {
  let parsedOk = false;
  try {
    // The parsed value is discarded; only the success of parsing matters
    JSON.parse(buffer.toString());
    parsedOk = true;
  } catch {
    parsedOk = false;
  }
  return parsedOk;
}

routes.post('/upload', upload.single('file'), async (req, res) => {
const file = req.file;
const bucket = req.query.bucket;
Expand All @@ -29,21 +60,44 @@ routes.post('/upload', upload.single('file'), async (req, res) => {
return res.status(400).send('No bucket provided');
}

const headers = getCsvHeaders(file.buffer);

if (!headers) {
return res.status(400).send('Invalid file type, please upload a valid CSV file');
if (!isValidFileType(file)) {
logger.error(`Invalid file type: ${file.mimetype}`);
return res.status(400).send('Invalid file type. Please upload either a CSV or JSON file');
}

const tableCreated = await createTable(headers, bucket as string);
// For CSV files, validate headers
if (file.mimetype === 'text/csv') {
const headers = getCsvHeaders(file.buffer);
if (!headers) {
return res.status(400).send('Invalid CSV file format');
}
const fileUrl = saveCsvToTmp(file.buffer, file.originalname);
try {
const uploadResult = await uploadToMinio(fileUrl, file.originalname, bucket as string, file.mimetype);
// Clean up the temporary file
fs.unlinkSync(fileUrl);

if (!tableCreated) {
return res
.status(500)
.send('Failed to create table, please check csv or use another name for the bucket');
}
if (uploadResult) {
return res.status(201).send(`File ${file.originalname} uploaded in bucket ${bucket}`);
} else {
return res.status(400).send(`Object ${file.originalname} already exists in bucket ${bucket}`);
}
} catch (error) {
// Clean up the temporary file in case of error
fs.unlinkSync(fileUrl);
logger.error('Error uploading file to Minio:', error);
return res.status(500).send('Error uploading file');
}
} else if (file.mimetype === 'application/json') {
if (!validateJsonFile(file.buffer)) {
return res.status(400).send('Invalid JSON file format');
}

return res.status(201).send('File uploaded successfully');
return res.status(200).send('JSON file is valid - Future implementation');
} else {
return res.status(400).send('Invalid file type. Please upload either a CSV or JSON file');
}

});

export default routes;
45 changes: 41 additions & 4 deletions src/utils/clickhouse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ export async function createTable(fields: string[], tableName: string) {
}

try {
console.debug(`Creating table ${normalizedTableName} with fields ${fields.join(', ')}`);
logger.debug(`Creating table ${normalizedTableName} with fields ${fields.join(', ')}`);
const result = await client.query({
query: generateDDL(fields, normalizedTableName),
});
console.log('Table created successfully');
logger.info(`Table ${normalizedTableName} created successfully`);
} catch (error) {
console.log('Error checking/creating table');
console.error(error);
logger.error(`Error checking/creating table ${normalizedTableName}`);
logger.debug(JSON.stringify(error));
return false;
}

Expand Down Expand Up @@ -68,3 +68,40 @@ export function flattenJson(json: any, prefix = ''): string[] {
const fieldsSet = new Set(fields);
return Array.from(fieldsSet);
}

/**
 * Inserts the rows of a CSV object into a ClickHouse table by selecting from
 * the `s3` table function with the `CSVWithNames` format.
 *
 * Hyphens in `tableName` are replaced with underscores before use. The S3
 * path and credentials are escaped before being embedded as SQL string
 * literals, and the secret key is never written to the log.
 *
 * @param tableName - Destination table name (in the `default` database)
 * @param s3Path - URL of the object to read
 * @param s3Config - Access and secret key passed to the `s3` table function
 * @returns true when the insert succeeded, false when it failed (the error is logged)
 */
export async function insertFromS3(tableName: string, s3Path: string, s3Config: {
  accessKey: string,
  secretKey: string
}) {
  logger.info(`Inside the insertFromS3 function`);
  const client = createClient({
    url,
    password,
  });
  logger.info(`s3Path: ${s3Path}`);
  const normalizedTableName = tableName.replace(/-/g, '_');

  // Render a value as a single-quoted SQL string literal, escaping backslashes
  // and single quotes so the value cannot terminate the literal early
  const toSqlLiteral = (value: string): string =>
    `'${value.replace(/\\/g, '\\\\').replace(/'/g, "\\'")}'`;

  // The secret literal is a parameter so the same text can be produced with a
  // placeholder for logging
  const buildQuery = (secretLiteral: string): string => `
    INSERT INTO \`default\`.${normalizedTableName}
    SELECT * FROM s3(
      ${toSqlLiteral(s3Path)},
      ${toSqlLiteral(s3Config.accessKey)},
      ${secretLiteral},
      'CSVWithNames'
    )
  `;

  try {
    logger.debug(`Inserting data into ${normalizedTableName} from ${s3Path}`);
    // Log the query with the secret key redacted
    logger.debug(`Query: ${buildQuery("'[REDACTED]'")}`);
    await client.query({ query: buildQuery(toSqlLiteral(s3Config.secretKey)) });
    logger.info(`Successfully inserted data into ${normalizedTableName}`);
    return true;
  } catch (error) {
    logger.error('Error inserting data from S3');
    logger.error(error);
    return false;
  } finally {
    // Release the client whether the insert succeeded or failed
    await client.close();
  }
}

123 changes: 118 additions & 5 deletions src/utils/minio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,111 @@ import * as Minio from 'minio';
import { getConfig } from '../config/config';
import logger from '../logger';
import { readFile, rm } from 'fs/promises';
import { createTable } from './clickhouse';
import { createTable, insertFromS3 } from './clickhouse';
import { validateJsonFile, getCsvHeaders } from './file-validators';

export async function createMinioBucketListeners() {
const { buckets, endPoint, port, useSSL, accessKey, secretKey, prefix, suffix } =
getConfig().minio;
const { endPoint, port, useSSL, bucketRegion, accessKey, secretKey, prefix, suffix, buckets } =
getConfig().minio;

/**
 * Uploads a local file to a Minio bucket, creating the bucket first when it
 * does not exist.
 *
 * The upload is skipped when `checkFileExists` reports that an object with the
 * same name and content type is already in the bucket.
 *
 * @param sourceFile - Path to the local file to upload
 * @param destinationObject - Name for the uploaded object
 * @param bucket - Bucket to upload into
 * @param fileType - MIME type, stored as the object's `Content-Type` metadata
 * @param customMetadata - Optional extra metadata; its entries override the defaults
 * @returns true when the file was uploaded, false when the object already exists
 * @throws Rethrows any error raised while checking for or uploading the object,
 *   so callers can tell a failure apart from "already exists"
 */
export async function uploadToMinio(
  sourceFile: string,
  destinationObject: string,
  bucket: string,
  fileType: string,
  customMetadata = {}
): Promise<boolean> {
  const minioClient = new Minio.Client({
    endPoint,
    port,
    useSSL,
    accessKey,
    secretKey,
  });

  // Check if bucket exists, create if it doesn't
  const exists = await minioClient.bucketExists(bucket);
  if (!exists) {
    await minioClient.makeBucket(bucket, bucketRegion);
    logger.info(`Bucket ${bucket} created in "${bucketRegion}".`);
  }

  try {
    const fileExists = await checkFileExists(destinationObject, bucket, fileType);
    if (fileExists) {
      return false;
    }

    const metaData = {
      'Content-Type': fileType,
      'X-Upload-Id': crypto.randomUUID(),
      ...customMetadata,
    };

    // Upload the file
    await minioClient.fPutObject(bucket, destinationObject, sourceFile, metaData);
    logger.info(
      `File ${sourceFile} uploaded as object ${destinationObject} in bucket ${bucket}`
    );
    return true;
  } catch (error) {
    // Swallowing the error here would resolve to undefined, which callers
    // testing the result for truthiness would misread as "already exists"
    const reason = error instanceof Error ? error.message : String(error);
    logger.error(
      `Error uploading ${sourceFile} as ${destinationObject} to bucket ${bucket}: ${reason}`
    );
    throw error;
  }
}

/**
 * Reports whether an object with the given name and content type is present
 * in a Minio bucket.
 *
 * @param fileName - Name of the object to look for
 * @param bucket - Bucket name
 * @param fileType - Content type the stored object must have to count as existing
 * @returns true when the object is found with a matching `content-type`;
 *   false when the bucket is missing, the object is not found, or the
 *   content type differs
 * @throws Rethrows any lookup error whose `code` is not `NotFound`
 */
export async function checkFileExists(
  fileName: string,
  bucket: string,
  fileType: string
): Promise<boolean> {
  const client = new Minio.Client({ endPoint, port, useSSL, accessKey, secretKey });

  try {
    // A bucket that does not exist cannot hold the object
    if (!(await client.bucketExists(bucket))) {
      logger.info(`Bucket ${bucket} does not exist`);
      return false;
    }

    // statObject rejects when the object is absent; that case is handled in catch
    const objectStat = await client.statObject(bucket, fileName);
    const found = objectStat.metaData
      ? objectStat.metaData['content-type'] === fileType
      : false;

    logger.info(
      found
        ? `File ${fileName} exists in bucket ${bucket}`
        : `File ${fileName} does not exist in bucket ${bucket}`
    );
    return found;
  } catch (err: any) {
    if (err.code !== 'NotFound') {
      // Anything other than a missing object is unexpected: log and propagate
      logger.error(`Error checking file existence: ${err.message}`);
      throw err;
    }
    logger.debug(`File ${fileName} not found in bucket ${bucket}`);
    return false;
  }
}

export async function createMinioBucketListeners() {
const minioClient = new Minio.Client({
endPoint,
port,
Expand Down Expand Up @@ -36,12 +134,15 @@ export async function createMinioBucketListeners() {
logger.debug(`Listening for notifications on bucket ${bucket}`);

listener.on('notification', async (notification) => {

//@ts-ignore
const file = notification.s3.object.key;

//@ts-ignore
const tableName = notification.s3.bucket.name;

logger.info(`File received: ${file} from bucket ${tableName}`);

//@ts-ignore
minioClient.fGetObject(bucket, file, `tmp/${file}`, async (err) => {
if (err) {
Expand All @@ -63,10 +164,22 @@ export async function createMinioBucketListeners() {
const fields = (await readFile(`tmp/${file}`, 'utf8')).split('\n')[0].split(',');

await createTable(fields, tableName);

// If running locally and using docker compose, the minio host is 'minio'. This is to allow clickhouse to connect to the minio server
const host = getConfig().runningMode === 'testing' ? 'minio' : endPoint;
// Construct the S3-style URL for the file
const minioUrl = `http://${host}:${port}/${bucket}/${file}`;

// Insert data into clickhouse
await insertFromS3(tableName, minioUrl, {
accessKey,
secretKey,
});
} else {
logger.warn(`Unknown file type - ${extension}`);
}
await rm(`tmp/${file}`);
logger.debug(`File ${file} deleted from tmp directory`);
}
});
});
Expand Down

0 comments on commit 735f76f

Please sign in to comment.