Skip to content

Commit

Permalink
ft: multiple buckets and mediator registration
Browse files Browse the repository at this point in the history
  • Loading branch information
brett-onions committed Nov 20, 2024
1 parent 9ea8276 commit bec35fc
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 80 deletions.
2 changes: 1 addition & 1 deletion .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ BODY_SIZE_LIMIT=50mb
MINIO_ENDPOINT=localhost
MINIO_PORT=9000
MINIO_USE_SSL=false
MINIO_BUCKET=climate-mediator
MINIO_BUCKETS=climate-mediator
MINIO_ACCESS_KEY=tCroZpZ3usDUcvPM3QT6
MINIO_SECRET_KEY=suVjMHUpVIGyWx8fFJHTiZiT88dHhKgVpzvYTOKK
MINIO_PREFIX=
Expand Down
2 changes: 1 addition & 1 deletion src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export const getConfig = () => {
endPoint: process.env.MINIO_ENDPOINT || 'localhost',
port: process.env.MINIO_PORT ? parseInt(process.env.MINIO_PORT) : 9000,
useSSL: process.env.MINIO_USE_SSL === 'true' ? true : false,
bucket: process.env.MINIO_BUCKET || 'climate-mediator',
buckets: process.env.MINIO_BUCKETS || 'climate-mediator',
accessKey: process.env.MINIO_ACCESS_KEY || 'tCroZpZ3usDUcvPM3QT6',
secretKey: process.env.MINIO_SECRET_KEY || 'suVjMHUpVIGyWx8fFJHTiZiT88dHhKgVpzvYTOKK',
prefix: process.env.MINIO_PREFIX || '',
Expand Down
84 changes: 8 additions & 76 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,91 +1,23 @@
import express from 'express';
import * as Minio from 'minio';
import path from 'path';
import { getConfig } from './config/config';
import logger from './logger';
import routes from './routes/index';
import { setupMediator } from './openhim/openhim';
import { validateJsonFile, getCsvHeaders } from './utils/file-validators';
import { readFile, rm } from 'fs/promises';
import { createTable, flattenJson } from './utils/clickhouse';
import { setupMinio } from './utils/minio';

const app = express();

app.use('/', routes);
const prefix = getConfig().runningMode === 'testing' ? '/' : '/climate';

if (getConfig().runningMode !== 'testing') {
app.listen(getConfig().port, () => {
logger.info(`Server is running on port - ${getConfig().port}`);
app.use(prefix, routes);

if (getConfig().registerMediator) {
setupMediator(path.resolve(__dirname, './openhim/mediatorConfig.json'));
}
});
}

async function setupMinio() {
const { bucket, endPoint, port, useSSL, accessKey, secretKey, prefix, suffix } =
getConfig().minio;
setupMinio();

const minioClient = new Minio.Client({
endPoint,
port,
useSSL,
accessKey,
secretKey,
});
app.listen(getConfig().port, () => {
logger.info(`Server is running on port - ${getConfig().port}`);

try {
// Test connection by attempting to list buckets
await minioClient.listBuckets();
logger.info(`Successfully connected to Minio at ${endPoint}:${port}/${bucket}`);
} catch (error) {
logger.error(`Failed to connect to Minio: ${error}`);
throw error;
if (getConfig().runningMode !== 'testing' && getConfig().registerMediator) {
setupMediator(path.resolve(__dirname, './openhim/mediatorConfig.json'));
}
const listener = minioClient.listenBucketNotification(bucket, prefix, suffix, [
's3:ObjectCreated:*',
]);

listener.on('notification', async (notification) => {
//@ts-ignore
const file = notification.s3.object.key;

//@ts-ignore
const tableName = notification.s3.bucket.name;

//@ts-ignore
minioClient.fGetObject(bucket, file, `tmp/${file}`, async (err) => {
if (err) {
logger.error(err);
} else {
const fileBuffer = await readFile(`tmp/${file}`);

//get the file extension
const extension = file.split('.').pop();
logger.info(`File Downloaded - Type: ${extension}`);

if (extension === 'json' && validateJsonFile(fileBuffer)) {
// flatten the json and pass it to clickhouse
//const fields = flattenJson(JSON.parse(fileBuffer.toString()));
//await createTable(fields, tableName);
logger.warn(`File type not currently supported- ${extension}`);
} else if (extension === 'csv' && getCsvHeaders(fileBuffer)) {
//get the first line of the csv file
const fields = (await readFile(`tmp/${file}`, 'utf8')).split('\n')[0].split(',');

await createTable(fields, tableName);
} else {
logger.warn(`Unknown file type - ${extension}`);
}
await rm(`tmp/${file}`);
}
});
});
}

setupMinio();

app.listen(3000, () => {
logger.info(`Server is running on port - ${3000}`);
});
3 changes: 1 addition & 2 deletions src/openhim/mediatorConfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
"host": "climate-mediator",
"port": "3000",
"primary": true,
"type": "http",
"path": "/climate/upload"
"type": "http"
}
],
"allow": [
Expand Down
72 changes: 72 additions & 0 deletions src/utils/minio.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import * as Minio from 'minio';
import { getConfig } from '../config/config';
import logger from '../logger';
import { readFile, rm } from 'fs/promises';
import { createTable } from './clickhouse';
import { validateJsonFile, getCsvHeaders } from './file-validators';

export async function setupMinio() {
const { buckets, endPoint, port, useSSL, accessKey, secretKey, prefix, suffix } =
getConfig().minio;

const minioClient = new Minio.Client({
endPoint,
port,
useSSL,
accessKey,
secretKey,
});

try {
// Test connection by attempting to list buckets
await minioClient.listBuckets();
logger.info(`Successfully connected to Minio at ${endPoint}:${port}`);
} catch (error) {
logger.error(`Failed to connect to Minio: ${error}`);
throw error;
}

const listOfBuckets = buckets.split(',');

for (const bucket of listOfBuckets) {
const listener = minioClient.listenBucketNotification(bucket, prefix, suffix, [
's3:ObjectCreated:*',
]);

listener.on('notification', async (notification) => {
//@ts-ignore
const file = notification.s3.object.key;

//@ts-ignore
const tableName = notification.s3.bucket.name;

//@ts-ignore
minioClient.fGetObject(bucket, file, `tmp/${file}`, async (err) => {
if (err) {
logger.error(err);
} else {
const fileBuffer = await readFile(`tmp/${file}`);

//get the file extension
const extension = file.split('.').pop();
logger.info(`File Downloaded - Type: ${extension}`);

if (extension === 'json' && validateJsonFile(fileBuffer)) {
// flatten the json and pass it to clickhouse
//const fields = flattenJson(JSON.parse(fileBuffer.toString()));
//await createTable(fields, tableName);
logger.warn(`File type not currently supported- ${extension}`);
} else if (extension === 'csv' && getCsvHeaders(fileBuffer)) {
//get the first line of the csv file
const fields = (await readFile(`tmp/${file}`, 'utf8')).split('\n')[0].split(',');

await createTable(fields, tableName);
} else {
logger.warn(`Unknown file type - ${extension}`);
}
await rm(`tmp/${file}`);
}
});
});
}
}

0 comments on commit bec35fc

Please sign in to comment.