Skip to content

Commit bbec794

Browse files
author
flowcore-platform
committed
feat(index): ✨ Add ingest tool for event ingestion with API key support
1 parent 0f5c59b commit bbec794

File tree

3 files changed

+84
-0
lines changed

3 files changed

+84
-0
lines changed

src/index.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
getFlowTypeHandler,
2626
getTenantHandler,
2727
getTimeBucketsHandler,
28+
ingestHandler,
2829
listDataCoresHandler,
2930
listEventTypesHandler,
3031
listFlowTypesHandler,
@@ -47,6 +48,7 @@ const { values, positionals } = parseArgs({
4748
options: {
4849
serviceAccountId: { type: "string" },
4950
serviceAccountKey: { type: "string" },
51+
apiKey: { type: "string", optional: true },
5052
},
5153
allowPositionals: true,
5254
})
@@ -56,6 +58,10 @@ if (positionals.length > 0) {
5658
console.warn(`Warning: Unexpected positional arguments: ${positionals.join(", ")}`)
5759
}
5860

61+
if (!values.apiKey) {
62+
console.warn("Ingestion will be disabled because no API key was provided")
63+
}
64+
5965
const serviceAccountId = values.serviceAccountId as string
6066
const serviceAccountKey = values.serviceAccountKey as string
6167

@@ -195,6 +201,23 @@ server.tool(
195201
)
196202

197203
// Write tools
204+
if (values.apiKey) {
205+
server.tool(
206+
"ingest",
207+
"Ingest events into an event type. This is useful for ingesting events into an event type, and then using the get_events tool to get the events for a specific time bucket. The events are stored in time buckets, and can be fetched by using the get_time_buckets tool. When you fetch events from a time bucket, you can use the cursor to paginate through the events. When ingesting events, you can ingest a single event or an array of events. The format of the event can be anything you want, as long as it's valid JSON. It will be the payload of the event.",
208+
{
209+
tenant: z.string().describe("The tenant name to ingest events for"),
210+
dataCoreId: z.string().describe("The data core ID to ingest events for"),
211+
flowTypeName: z.string().describe("The flow type name to ingest events for"),
212+
eventTypeName: z.string().describe("The event type name to ingest events for"),
213+
events: z
214+
.union([z.array(z.unknown()), z.unknown()])
215+
.describe("The events to ingest in the format of an array of events or a single event"),
216+
},
217+
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
218+
ingestHandler(values.apiKey as string) as any,
219+
)
220+
}
198221
server.tool(
199222
"create_data_core",
200223
"Create a data core in a tenant",

src/tools/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export * from "./get-events"
88
export * from "./get-flow-type"
99
export * from "./get-tenant"
1010
export * from "./get-time-buckets"
11+
export * from "./ingest"
1112
export * from "./list-data-cores"
1213
export * from "./list-event-types"
1314
export * from "./list-flow-types"

src/tools/ingest.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
export type IngestInput = {
2+
tenant: string
3+
dataCoreId: string
4+
flowTypeName: string
5+
eventTypeName: string
6+
events: unknown[] | unknown
7+
}
8+
9+
export const ingestHandler = (apiKey: string) => async ({
10+
tenant,
11+
dataCoreId,
12+
flowTypeName,
13+
eventTypeName,
14+
events,
15+
}: IngestInput) => {
16+
try {
17+
let type = "event"
18+
19+
if (Array.isArray(events)) {
20+
type = "events"
21+
}
22+
23+
const webhookUrl = `https://webhook.api.flowcore.io/${type}/${tenant}/${dataCoreId}/${flowTypeName}/${eventTypeName}`
24+
25+
// Execute the command manually since it's a custom command
26+
const response = await fetch(webhookUrl, {
27+
method: "POST",
28+
headers: {
29+
"Content-Type": "application/json",
30+
Authorization: `${apiKey}`,
31+
},
32+
body: JSON.stringify(events),
33+
})
34+
35+
if (!response.ok) {
36+
throw new Error(`HTTP error ${response.status}: ${await response.text()}`)
37+
}
38+
39+
const result = await response.json()
40+
41+
return {
42+
content: [
43+
{
44+
type: "text" as const,
45+
text: JSON.stringify(result),
46+
},
47+
],
48+
}
49+
} catch (error) {
50+
// Create properly typed content array for error
51+
const content = [
52+
{
53+
type: "text" as const,
54+
text: `Failed to ingest events with error: ${error}`,
55+
},
56+
]
57+
58+
return { isError: true, content }
59+
}
60+
}

0 commit comments

Comments
 (0)