Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions apps/agent/Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ tasks:
cmds:
- golangci-lint run

migrate:
cmds:
- goose -dir=./pkg/clickhouse/schema clickhouse "tcp://127.0.0.1:9000" up

generate:
cmds:
- go get github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen
Expand Down
10 changes: 5 additions & 5 deletions apps/agent/pkg/clickhouse/schema/000_README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ Examples:
### Aggregation Suffixes

For aggregated or summary tables, use suffixes like:
- `_daily`
- `_monthly`
- `_per_day`
- `_per_month`
- `_summary`

## Materialized View Naming Convention
Expand All @@ -54,19 +54,19 @@ Format: `mv_[description]_[aggregation]`
`raw_sales_transactions_v1`

2. Materialized View:
`mv_active_users_daily_v2`
`mv_active_users_per_day_v2`

3. Temporary Table:
`tmp_andreas_user_analysis_v1`

4. Aggregated Table:
`mv_sales_summary_daily_v1`
`mv_sales_summary_per_hour_v1`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

New time-based suffix introduced without documentation.

The example has been updated to use '_per_hour', which is a new time-based suffix not mentioned in the "Aggregation Suffixes" section. Consider adding '_per_hour' to the list of suffixes in that section for completeness and consistency.


## Consistency Across Related Objects

Maintain consistent naming across related tables, views, and other objects:

- `raw_user_activity_v1`
- `mv_user_activity_daily_v1`
- `mv_user_activity_per_day_v1`

By following these conventions, we ensure a clear, consistent, and scalable naming structure for our ClickHouse setup.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- +goose up
CREATE TABLE default.raw_telemetry_sdks_v1(
-- the api request id, so we can correlate the telemetry with traces and logs
request_id String,

-- unix milli
time Int64,

-- ie: node@20
runtime String,
-- ie: vercel
platform String,

-- ie: [ "@unkey/api@1.2.3", "@unkey/ratelimit@4.5.6" ]
versions Array(String)
)
ENGINE = MergeTree()
ORDER BY (request_id, time)
;
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- +goose up
CREATE TABLE default.key_verifications_per_day_v1
(
time DateTime,
workspace_id String,
key_space_id String,
identity_id String,
key_id String,
outcome LowCardinality(String),
count AggregateFunction(count, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (workspace_id, key_space_id, time, identity_id, key_id)
;
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- +goose up
CREATE MATERIALIZED VIEW default.mv_key_verifications_per_day_v1 TO default.key_verifications_per_day_v1 AS
SELECT
workspace_id,
key_space_id,
identity_id,
key_id,
outcome,
countState() as count,
toStartOfDay(fromUnixTimestamp64Milli(time)) AS time
FROM default.raw_key_verifications_v1
GROUP BY
workspace_id,
key_space_id,
identity_id,
key_id,
outcome,
time
;
23 changes: 19 additions & 4 deletions apps/api/src/pkg/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ export class Analytics {
this.clickhouse = opts.clickhouse ? new ch.Client({ url: opts.clickhouse.url }) : new ch.Noop();
}

public get insertSdkTelemetry() {
return this.clickhouse.insert({
table: "default.raw_telemetry_sdks_v1",
schema: z.object({
request_id: z.string(),
time: z.number().int(),
runtime: z.string(),
platform: z.string(),
versions: z.array(z.string()),
}),
});
}
//tinybird, to be removed
public get ingestSdkTelemetry() {
return this.writeClient.buildIngestEndpoint({
datasource: "sdk_telemetry__v1",
Expand All @@ -54,7 +67,8 @@ export class Analytics {
});
}

public ingestUnkeyAuditLogs(logs: MaybeArray<UnkeyAuditLog>) {
//tinybird
public ingestUnkeyAuditLogsTinybird(logs: MaybeArray<UnkeyAuditLog>) {
return this.writeClient.buildIngestEndpoint({
datasource: "audit_logs__v2",
event: auditLogSchemaV1
Expand All @@ -78,7 +92,8 @@ export class Analytics {
})(logs);
}

public get ingestGenericAuditLogs() {
//tinybird
public get ingestGenericAuditLogsTinybird() {
return this.writeClient.buildIngestEndpoint({
datasource: "audit_logs__v2",
event: auditLogSchemaV1.transform((l) => ({
Expand All @@ -92,7 +107,7 @@ export class Analytics {
})),
});
}

//tinybird
public get ingestRatelimit() {
return this.writeClient.buildIngestEndpoint({
datasource: "ratelimits__v2",
Expand Down Expand Up @@ -146,7 +161,7 @@ export class Analytics {
}),
});
}

// replaced by insertKeyVerification
public get ingestKeyVerification() {
Comment on lines +164 to 165
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Consider proper deprecation of ingestKeyVerification.

The comment indicates that this method has been replaced by insertKeyVerification. However, the method still exists in the codebase, which could lead to confusion and potential misuse.

To address this:

  1. Properly deprecate the method using TypeScript's @deprecated tag.
  2. Add a more descriptive comment explaining why it's deprecated and what to use instead.
  3. Consider creating a ticket to remove this method in a future update.

Here's a suggested change:

-  // replaced by insertKeyVerification
+  /**
+   * @deprecated This method has been replaced by `insertKeyVerification`.
+   * Please use `insertKeyVerification` for all new code.
+   * This method will be removed in a future version.
+   */
  public get ingestKeyVerification() {

Additionally, you might want to add a console warning when this method is called to alert developers of its deprecated status.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// replaced by insertKeyVerification
public get ingestKeyVerification() {
/**
* @deprecated This method has been replaced by `insertKeyVerification`.
* Please use `insertKeyVerification` for all new code.
* This method will be removed in a future version.
*/
public get ingestKeyVerification() {

return this.writeClient.buildIngestEndpoint({
datasource: "key_verifications__v2",
Expand Down
106 changes: 106 additions & 0 deletions apps/api/src/pkg/audit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import type { Context } from "@/pkg/hono/app";
import { auditLogSchemaV1, unkeyAuditLogEvents } from "@unkey/schema/src/auditlog";
import { type Transaction, schema } from "./db";

import { newId } from "@unkey/id";
import { z } from "zod";
import type { UnkeyAuditLog } from "./analytics";

export async function insertUnkeyAuditLog(
c: Context,
tx: Transaction | undefined,
auditLogs: UnkeyAuditLog | Array<UnkeyAuditLog>,
): Promise<void> {
const schema = auditLogSchemaV1.merge(
z.object({
event: unkeyAuditLogEvents,
auditLogId: z.string().default(newId("auditLog")),
bucket: z.string().default("unkey_mutations"),
time: z.number().default(Date.now()),
}),
);
Comment on lines +14 to +21
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid variable shadowing of schema to prevent confusion

The local variable schema declared in insertUnkeyAuditLog shadows the imported schema from "./db". This could lead to confusion or unexpected behavior. Consider renaming the local variable to avoid this shadowing.

Apply this diff to rename the local variable:

-  const schema = auditLogSchemaV1.merge(
+  const validationSchema = auditLogSchemaV1.merge(

Update its usage accordingly:

-    arr.map((l) => schema.parse(l)),
+    arr.map((l) => validationSchema.parse(l)),

Committable suggestion was skipped due to low confidence.


const arr = Array.isArray(auditLogs) ? auditLogs : [auditLogs];
return insertGenericAuditLogs(
c,
tx,
arr.map((l) => schema.parse(l)),
);
}

export async function insertGenericAuditLogs(
c: Context,
tx: Transaction | undefined,
auditLogs: z.infer<typeof auditLogSchemaV1> | z.infer<typeof auditLogSchemaV1>[],
): Promise<void> {
const arr = Array.isArray(auditLogs) ? auditLogs : [auditLogs];

if (arr.length === 0) {
return;
}

const { cache, logger, db } = c.get("services");

for (const log of arr) {
const { val: bucket, err } = await cache.auditLogBucketByWorkspaceIdAndName.swr(
[log.workspaceId, log.bucket].join(":"),
async () => {
const bucket = await (tx ?? db.primary).query.auditLogBucket.findFirst({
where: (table, { eq, and }) =>
and(eq(table.workspaceId, log.workspaceId), eq(table.name, log.bucket)),
});
if (!bucket) {
return undefined;
}
return {
id: bucket.id,
};
},
);
if (err) {
logger.error("Could not find audit log bucket for workspace", {
workspaceId: log.workspaceId,
error: err.message,
});
continue;
}

if (!bucket) {
logger.error("Could not find audit log bucket for workspace", {
workspaceId: log.workspaceId,
});
continue;
}
Comment on lines +69 to +73
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Enhance error logging to include bucket name

When logging the error for a missing audit log bucket, consider including the bucket name to provide more context for debugging.

Apply this diff to include the bucket name in the log:

     logger.error("Could not find audit log bucket for workspace", {
       workspaceId: log.workspaceId,
+      bucket: log.bucket,
     });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.error("Could not find audit log bucket for workspace", {
workspaceId: log.workspaceId,
});
continue;
}
logger.error("Could not find audit log bucket for workspace", {
workspaceId: log.workspaceId,
bucket: log.bucket,
});
continue;
}


const auditLogId = newId("auditLog");
await (tx ?? db.primary).insert(schema.auditLog).values({
id: auditLogId,
workspaceId: log.workspaceId,
bucketId: bucket.id,
event: log.event,
time: log.time,

display: log.description ?? "",

remoteIp: log.context?.location,

userAgent: log.context?.userAgent,
actorType: log.actor.type,
actorId: log.actor.id,
actorName: log.actor.name,
actorMeta: log.actor.meta,
});
Comment on lines +75 to +92
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Use provided auditLogId from logs instead of generating a new one

In insertGenericAuditLogs, a new auditLogId is generated even though the log may already contain an auditLogId. To maintain consistency and avoid potential duplication, consider using log.auditLogId if it exists.

Apply this diff to use log.auditLogId if available:

-    const auditLogId = newId("auditLog");
+    const auditLogId = log.auditLogId || newId("auditLog");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const auditLogId = newId("auditLog");
await (tx ?? db.primary).insert(schema.auditLog).values({
id: auditLogId,
workspaceId: log.workspaceId,
bucketId: bucket.id,
event: log.event,
time: log.time,
display: log.description ?? "",
remoteIp: log.context?.location,
userAgent: log.context?.userAgent,
actorType: log.actor.type,
actorId: log.actor.id,
actorName: log.actor.name,
actorMeta: log.actor.meta,
});
const auditLogId = log.auditLogId || newId("auditLog");
await (tx ?? db.primary).insert(schema.auditLog).values({
id: auditLogId,
workspaceId: log.workspaceId,
bucketId: bucket.id,
event: log.event,
time: log.time,
display: log.description ?? "",
remoteIp: log.context?.location,
userAgent: log.context?.userAgent,
actorType: log.actor.type,
actorId: log.actor.id,
actorName: log.actor.name,
actorMeta: log.actor.meta,
});

await (tx ?? db.primary).insert(schema.auditLogTarget).values(
log.resources.map((r) => ({
workspaceId: log.workspaceId,
bucketId: bucket.id,
auditLogId,
displayName: r.name ?? "",
type: r.type,
id: r.id,
name: r.name,
meta: r.meta,
})),
);
}
}
3 changes: 3 additions & 0 deletions apps/api/src/pkg/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ export function initCache(c: Context<HonoEnv>, metrics: Metrics): C<CacheNamespa
c.executionCtx,
defaultOpts,
),
auditLogBucketByWorkspaceIdAndName: new Namespace<
CacheNamespaces["auditLogBucketByWorkspaceIdAndName"]
>(c.executionCtx, defaultOpts),
});
}

Expand Down
4 changes: 4 additions & 0 deletions apps/api/src/pkg/cache/namespaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ export type CacheNamespaces = {
total: number;
};
identityByExternalId: Identity | null;
// uses a compound key of [workspaceId, name]
auditLogBucketByWorkspaceIdAndName: {
id: string;
};
};

export type CacheNamespace = keyof CacheNamespaces;
3 changes: 1 addition & 2 deletions apps/api/src/pkg/db.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Client } from "@planetscale/database";
import { type PlanetScaleDatabase, drizzle, schema } from "@unkey/db";
import { type Database, drizzle, schema } from "@unkey/db";
import type { Logger } from "@unkey/worker-logging";
import { instrumentedFetch } from "./util/instrument-fetch";
export type Database = PlanetScaleDatabase<typeof schema>;

type ConnectionOptions = {
host: string;
Expand Down
6 changes: 3 additions & 3 deletions apps/api/src/pkg/key_migration/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export async function migrateKey(
environment: message.environment,
});

await analytics.ingestUnkeyAuditLogs({
await analytics.ingestUnkeyAuditLogsTinybird({
workspaceId: message.workspaceId,
event: "key.create",
actor: {
Expand Down Expand Up @@ -149,7 +149,7 @@ export async function migrateKey(

await tx.insert(schema.keysRoles).values(roleConnections);

await analytics.ingestUnkeyAuditLogs(
await analytics.ingestUnkeyAuditLogsTinybird(
roleConnections.map((rc) => ({
workspaceId: message.workspaceId,
actor: { type: "key", id: message.rootKeyId },
Expand Down Expand Up @@ -184,7 +184,7 @@ export async function migrateKey(

await tx.insert(schema.keysPermissions).values(permissionConnections);

await analytics.ingestUnkeyAuditLogs(
await analytics.ingestUnkeyAuditLogsTinybird(
permissionConnections.map((pc) => ({
workspaceId: message.workspaceId,
actor: { type: "key", id: message.rootKeyId },
Expand Down
18 changes: 17 additions & 1 deletion apps/api/src/pkg/middleware/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export function metrics(): MiddlewareHandler<HonoEnv> {

c.executionCtx.waitUntil(
analytics.ingestSdkTelemetry(event).catch((err) => {
logger.error("Error ingesting SDK telemetry", {
logger.error("Error ingesting SDK telemetry into tinybird", {
method: c.req.method,
path: c.req.path,
error: err.message,
Expand All @@ -62,6 +62,22 @@ export function metrics(): MiddlewareHandler<HonoEnv> {
});
}),
);
c.executionCtx.waitUntil(
analytics
.insertSdkTelemetry({
...event,
request_id: event.requestId,
})
.catch((err) => {
logger.error("Error inserting SDK telemetry", {
method: c.req.method,
path: c.req.path,
error: err.message,
telemetry,
event,
});
}),
);
}

await next();
Expand Down
24 changes: 23 additions & 1 deletion apps/api/src/routes/legacy_keys_createKey.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { App } from "@/pkg/hono/app";
import { createRoute, z } from "@hono/zod-openapi";

import { insertUnkeyAuditLog } from "@/pkg/audit";
import { rootKeyAuth } from "@/pkg/auth/root_key";
import { UnkeyApiError, openApiErrorResponses } from "@/pkg/errors";
import { schema } from "@unkey/db";
Expand Down Expand Up @@ -227,7 +228,28 @@ export const registerLegacyKeysCreate = (app: App) =>
deletedAt: null,
});

await analytics.ingestUnkeyAuditLogs({
await analytics.ingestUnkeyAuditLogsTinybird({
workspaceId: authorizedWorkspaceId,
actor: { type: "key", id: rootKeyId },
event: "key.create",
description: `Created ${keyId}`,
resources: [
{
type: "key",
id: keyId,
},
{
type: "keyAuth",
id: api.keyAuthId!,
},
{ type: "api", id: api.id },
],
context: {
location: c.get("location"),
userAgent: c.get("userAgent"),
},
});
await insertUnkeyAuditLog(c, tx, {
Comment on lines +231 to +252
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor to eliminate duplicated audit logging code

The calls to analytics.ingestUnkeyAuditLogsTinybird and insertUnkeyAuditLog use identical parameters. To improve maintainability and reduce redundancy, consider extracting the shared parameters into a single auditLogData object. This will make future updates easier and minimize the risk of inconsistencies.

Apply this diff to refactor the code:

+          const auditLogData = {
+            workspaceId: authorizedWorkspaceId,
+            actor: { type: "key", id: rootKeyId },
+            event: "key.create",
+            description: `Created ${keyId}`,
+            resources: [
+              {
+                type: "key",
+                id: keyId,
+              },
+              {
+                type: "keyAuth",
+                id: api.keyAuthId!,
+              },
+              { type: "api", id: api.id },
+            ],
+            context: {
+              location: c.get("location"),
+              userAgent: c.get("userAgent"),
+            },
+          };
-          await analytics.ingestUnkeyAuditLogsTinybird({
-            workspaceId: authorizedWorkspaceId,
-            actor: { type: "key", id: rootKeyId },
-            event: "key.create",
-            description: `Created ${keyId}`,
-            resources: [
-              {
-                type: "key",
-                id: keyId,
-              },
-              {
-                type: "keyAuth",
-                id: api.keyAuthId!,
-              },
-              { type: "api", id: api.id },
-            ],
-            context: {
-              location: c.get("location"),
-              userAgent: c.get("userAgent"),
-            },
-          });
+          await analytics.ingestUnkeyAuditLogsTinybird(auditLogData);
-          await insertUnkeyAuditLog(c, tx, {
-            workspaceId: authorizedWorkspaceId,
-            actor: { type: "key", id: rootKeyId },
-            event: "key.create",
-            description: `Created ${keyId}`,
-            resources: [
-              {
-                type: "key",
-                id: keyId,
-              },
-              {
-                type: "keyAuth",
-                id: api.keyAuthId!,
-              },
-              { type: "api", id: api.id },
-            ],
-            context: {
-              location: c.get("location"),
-              userAgent: c.get("userAgent"),
-            },
-          });
+          await insertUnkeyAuditLog(c, tx, auditLogData);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await analytics.ingestUnkeyAuditLogsTinybird({
workspaceId: authorizedWorkspaceId,
actor: { type: "key", id: rootKeyId },
event: "key.create",
description: `Created ${keyId}`,
resources: [
{
type: "key",
id: keyId,
},
{
type: "keyAuth",
id: api.keyAuthId!,
},
{ type: "api", id: api.id },
],
context: {
location: c.get("location"),
userAgent: c.get("userAgent"),
},
});
await insertUnkeyAuditLog(c, tx, {
const auditLogData = {
workspaceId: authorizedWorkspaceId,
actor: { type: "key", id: rootKeyId },
event: "key.create",
description: `Created ${keyId}`,
resources: [
{
type: "key",
id: keyId,
},
{
type: "keyAuth",
id: api.keyAuthId!,
},
{ type: "api", id: api.id },
],
context: {
location: c.get("location"),
userAgent: c.get("userAgent"),
},
};
await analytics.ingestUnkeyAuditLogsTinybird(auditLogData);
await insertUnkeyAuditLog(c, tx, auditLogData);

workspaceId: authorizedWorkspaceId,
actor: { type: "key", id: rootKeyId },
event: "key.create",
Expand Down
Loading