Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,31 @@ npm run server

### Usage

1. Open Grafana at http://localhost:3000.
1. By default, Grafana listens on `0.0.0.0:3000`. Open `http://<GRAFANA_IP>:3000` in the browser.

2. Connect the datasource to a CLP API server:
- Navigate to **Connections > Data sources > CLP**.
- Navigate to **Connections > Data sources > CLP** on the left side panel.
- Enter the API server URL (e.g., `http://<CLP_API_SERVER_HOST>:<PORT>`).
- Click **Save & test** to verify connectivity.

3. Build a dashboard:
- Add a visualization and choose **CLP** as the datasource.
3. Explore log data:
- Click **Explore** on the left side panel.
- Configure your query in the query editor:
- **Dataset**: the dataset to search (defaults to `default`).
- **Query Text**: the search query string.
- **Ignore Case**: whether to perform a case-insensitive search.
- **Max Results**: the maximum number of results to return.
- Set the desired time range and click **Run query** in the top right.
- The matching results will be displayed below the query editor.

4. Build a dashboard:
- Add a visualization and choose **CLP** as the datasource.
- Configure your query in the query editor.
- Set the desired time range and click **Refresh** to run the query.
- To view results in the Logs panel:
1. Select the **Logs** visualization in the top right.
2. Add an **Extract fields** transformation and choose **JSON** as the format.
3. Add a **Convert field type** transformation to convert your timestamp field to the
**Time** type.
- To view results in the Logs panel, select the **Logs** visualization in the top right.
- To enable Grafana log level detection:
1. Add an **Extract fields** transformation and choose **JSON** as the format.
2. Extract the JSON field path for the log level into a field named `severity`.

## Test the plugin in an existing Grafana deployment

Expand Down
61 changes: 51 additions & 10 deletions src/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from '@grafana/data';

import { SearchQuery, ClpDataSourceOptions, DEFAULT_QUERY } from './types';
import { Observable, forkJoin, lastValueFrom } from 'rxjs';
import { Observable, forkJoin, zip, lastValueFrom } from 'rxjs';
import { map, switchMap, reduce } from 'rxjs/operators';
import { createParser, type EventSourceMessage, type ParseError } from 'eventsource-parser';

Expand Down Expand Up @@ -99,8 +99,26 @@ export class DataSource extends DataSourceApi<SearchQuery, ClpDataSourceOptions>
);
}

/**
 * Fetches the names of the timestamp-typed columns for the given dataset
 * from the CLP API server's column-metadata endpoint.
 *
 * @param dataset The dataset whose timestamp column names should be fetched.
 * @returns An observable emitting the list of timestamp column names.
 */
#fetchTimestampColumnNames(dataset: string): Observable<string[]> {
  const url = `${this.baseUrl}/column_metadata/${dataset}/timestamp`;
  const response$ = getBackendSrv().fetch<string[]>({ method: 'GET', url });
  // Unwrap the HTTP envelope so subscribers receive only the column names.
  return response$.pipe(map((resp) => resp.data));
}

/**
 * Resolves a dot-delimited column name against a parsed log message and
 * returns the value at that path. A `\.` sequence in the column name is
 * treated as a literal dot inside a single key rather than a path separator.
 *
 * NOTE: if an intermediate value along the path is null or undefined, the
 * property access throws a TypeError — callers are expected to catch it.
 *
 * @param message Parsed JSON log message to read from.
 * @param columnName Dot-delimited path of the field to extract.
 * @returns The value found at the given path (possibly undefined).
 */
#extractField(message: unknown, columnName: string): unknown {
  // Split on unescaped dots only, then unescape `\.` back to a literal dot.
  const segments = columnName
    .split(/(?<!\\)\./)
    .map((segment) => segment.replace(/\\\./g, '.'));
  // Walk the path one key at a time, starting from the whole message.
  return segments.reduce<unknown>(
    (value, key) => (value as Record<string, unknown>)[key],
    message
  );
}

query(options: DataQueryRequest<SearchQuery>): Observable<DataQueryResponse> {
const observables = options.targets.map((target) =>
const queryResultsObservables = options.targets.map((target) =>
this.#submitQuery(target, options.range).pipe(
switchMap((uri) => {
const searchJobId = uri.split('/').pop()!;
Expand All @@ -126,25 +144,48 @@ export class DataSource extends DataSourceApi<SearchQuery, ClpDataSourceOptions>
}
};
});
}),
map((dataBuffer) => ({ target, dataBuffer }))
})
)
);

return forkJoin(observables).pipe(
map((results) => ({
data: results.map(({ target, dataBuffer }) => {
const timestampColumnNamesObservables = options.targets.map((target) =>
this.#fetchTimestampColumnNames(target.dataset ?? 'default')
);

const dataframeObservables = options.targets.map((target, i) =>
zip(timestampColumnNamesObservables[i], queryResultsObservables[i]).pipe(
map(([timestampColumnNames, dataBuffer]) => {
const fields = [];

const values = target.maxNumResults ? dataBuffer.slice(0, target.maxNumResults) : dataBuffer;
fields.push({ name: 'body', values, type: FieldType.string });

const [timestampColumnName] = timestampColumnNames;
if ('undefined' !== typeof timestampColumnName) {
const parsedValues = values.map((line) => JSON.parse(line));
const timestamps = parsedValues.map((parsedValue) => {
try {
return this.#extractField(parsedValue, timestampColumnName);
} catch (err: unknown) {
return null;
}
});
fields.push({ name: 'timestamp', values: timestamps, type: FieldType.time });
}

return createDataFrame({
refId: target.refId,
fields: [{ name: target.refId, values, type: FieldType.string }],
fields: fields,
meta: {
type: DataFrameType.LogLines,
preferredVisualisationType: 'logs',
},
});
}),
}))
})
)
);

return forkJoin(dataframeObservables).pipe(map((data) => ({ data })));
}

async testDatasource() {
Expand Down