Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/dev-playground/app.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
env:
- name: DATABRICKS_WAREHOUSE_ID
valueFrom: sql-warehouse

permissions:
user_authorization:
scopes:
- sql
17 changes: 17 additions & 0 deletions apps/dev-playground/client/src/routes/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from "@tanstack/react-router";
import { Button, Card } from "@databricks/appkit-ui/react";
import { ThemeSelector } from "@/components/theme-selector";
import { useEffect } from "react";

export const Route = createFileRoute("/")({
component: IndexRoute,
Expand All @@ -16,6 +17,22 @@ export const Route = createFileRoute("/")({
function IndexRoute() {
const navigate = useNavigate();

useEffect(() => {
fetch("/sp")
.then((res) => res.json())
.then((data) => {
console.log(data);
});
}, []);

// useEffect(() => {
// fetch("/obo")
// .then((res) => res.json())
// .then((data) => {
// console.log(data);
// });
// }, []);

return (
<div className="min-h-screen bg-background">
<div className="absolute top-4 right-4">
Expand Down
48 changes: 47 additions & 1 deletion apps/dev-playground/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,51 @@ import { reconnect } from "./reconnect-plugin";
import { telemetryExamples } from "./telemetry-example-plugin";

createApp({
plugins: [server(), reconnect(), telemetryExamples(), analytics({})],
plugins: [
server({ autoStart: false }),
reconnect(),
telemetryExamples(),
analytics({}),
],
}).then((appkit) => {
appkit.server
.extend((app) => {
// Debug endpoint to inspect headers
app.get("/debug-headers", (req, res) => {
const token = req.headers["x-forwarded-access-token"] as string;
res.json({
hasToken: !!token,
tokenLength: token?.length,
tokenPrefix: token?.substring(0, 30),
userId: req.headers["x-forwarded-user"],
allHeaders: Object.keys(req.headers),
});
});

app.get("/sp", (_req, res) => {
appkit.analytics.query("SELECT 1").then((result) => {
console.log(result);
res.json(result);
});
});

app.get("/obo", (req, res) => {
appkit.analytics
.asUser(req)
.query("SELECT 1")
.then((result) => {
console.log(result);
res.json(result);
})
.catch((error) => {
console.error("OBO Error:", error);
res.status(500).json({
error: error.message,
errorCode: error.errorCode,
statusCode: error.statusCode,
});
});
});
})
.start();
});
92 changes: 53 additions & 39 deletions packages/appkit/src/analytics/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import type {
StreamExecutionSettings,
} from "shared";
import { SQLWarehouseConnector } from "../connectors";
import {
getCurrentUserId,
getWarehouseId,
getWorkspaceClient,
} from "../context";
import { Plugin, toPlugin } from "../plugin";
import type { Request, Response } from "../utils";
import { getRequestContext, getWorkspaceClient } from "../utils";
import { queryDefaults } from "./defaults";
import { QueryProcessor } from "./query";
import type {
Expand All @@ -20,7 +24,6 @@ import type {
export class AnalyticsPlugin extends Plugin {
name = "analytics";
envVars = [];
requiresDatabricksClient = true;

protected static description = "Analytics plugin for data analysis";
protected declare config: IAnalyticsConfig;
Expand All @@ -41,6 +44,7 @@ export class AnalyticsPlugin extends Plugin {
}

injectRoutes(router: IAppRouter) {
// Service principal endpoints
this.route(router, {
name: "arrow",
method: "get",
Expand All @@ -50,12 +54,22 @@ export class AnalyticsPlugin extends Plugin {
},
});

this.route<AnalyticsQueryResponse>(router, {
name: "query",
method: "post",
path: "/query/:query_key",
handler: async (req: Request, res: Response) => {
await this._handleQueryRoute(req, res);
},
});

// User context endpoints - use asUser(req) to execute with user's identity
this.route(router, {
name: "arrowAsUser",
method: "get",
path: "/users/me/arrow-result/:jobId",
handler: async (req: Request, res: Response) => {
await this._handleArrowRoute(req, res, { asUser: true });
await this.asUser(req)._handleArrowRoute(req, res);
},
});

Expand All @@ -64,29 +78,19 @@ export class AnalyticsPlugin extends Plugin {
method: "post",
path: "/users/me/query/:query_key",
handler: async (req: Request, res: Response) => {
await this._handleQueryRoute(req, res, { asUser: true });
},
});

this.route<AnalyticsQueryResponse>(router, {
name: "query",
method: "post",
path: "/query/:query_key",
handler: async (req: Request, res: Response) => {
await this._handleQueryRoute(req, res, { asUser: false });
await this.asUser(req)._handleQueryRoute(req, res);
},
});
}

private async _handleArrowRoute(
req: Request,
res: Response,
{ asUser = false }: { asUser?: boolean } = {},
): Promise<void> {
/**
* Handle Arrow data download requests.
* When called via asUser(req), uses the user's Databricks credentials.
*/
async _handleArrowRoute(req: Request, res: Response): Promise<void> {
try {
const { jobId } = req.params;

const workspaceClient = getWorkspaceClient(asUser);
const workspaceClient = getWorkspaceClient();

console.log(
`Processing Arrow job request: ${jobId} for plugin: ${this.name}`,
Expand All @@ -111,11 +115,11 @@ export class AnalyticsPlugin extends Plugin {
}
}

private async _handleQueryRoute(
req: Request,
res: Response,
{ asUser = false }: { asUser?: boolean } = {},
): Promise<void> {
/**
* Handle SQL query execution requests.
* When called via asUser(req), uses the user's Databricks credentials.
*/
async _handleQueryRoute(req: Request, res: Response): Promise<void> {
const { query_key } = req.params;
const { parameters, format = "JSON" } = req.body as IAnalyticsQueryRequest;
const queryParameters =
Expand All @@ -131,10 +135,8 @@ export class AnalyticsPlugin extends Plugin {
type: "result",
};

const requestContext = getRequestContext();
const userKey = asUser
? requestContext.userId
: requestContext.serviceUserId;
// Get user key from current context (automatically includes user ID when in user context)
const userKey = getCurrentUserId();

if (!query_key) {
res.status(400).json({ error: "query_key is required" });
Expand Down Expand Up @@ -164,7 +166,7 @@ export class AnalyticsPlugin extends Plugin {
JSON.stringify(parameters),
JSON.stringify(format),
hashedQuery,
userKey,
// userKey is automatically set based on context
],
},
};
Expand All @@ -186,9 +188,6 @@ export class AnalyticsPlugin extends Plugin {
processedParams,
queryParameters.formatParameters,
signal,
{
asUser,
},
);

return { type: queryParameters.type, ...result };
Expand All @@ -198,15 +197,29 @@ export class AnalyticsPlugin extends Plugin {
);
}

/**
* Execute a SQL query using the current execution context.
*
* When called directly: uses service principal credentials.
* When called via asUser(req).query(...): uses user's credentials.
*
* @example
* ```typescript
* // Service principal execution
* const result = await analytics.query("SELECT * FROM table")
*
* // User context execution (in route handler)
* const result = await this.asUser(req).query("SELECT * FROM table")
* ```
*/
async query(
query: string,
parameters?: Record<string, SQLTypeMarker | null | undefined>,
formatParameters?: Record<string, any>,
signal?: AbortSignal,
{ asUser = false }: { asUser?: boolean } = {},
): Promise<any> {
const requestContext = getRequestContext();
const workspaceClient = getWorkspaceClient(asUser);
const workspaceClient = getWorkspaceClient();
const warehouseId = await getWarehouseId();

const { statement, parameters: sqlParameters } =
this.queryProcessor.convertToSQLParameters(query, parameters);
Expand All @@ -215,7 +228,7 @@ export class AnalyticsPlugin extends Plugin {
workspaceClient,
{
statement,
warehouse_id: await requestContext.warehouseId,
warehouse_id: warehouseId,
parameters: sqlParameters,
...formatParameters,
},
Expand All @@ -225,8 +238,9 @@ export class AnalyticsPlugin extends Plugin {
return response.result;
}

// If we need arrow stream in more plugins we can define this as a base method in the core plugin class
// and have a generic endpoint for each plugin that consumes this arrow data.
/**
* Get Arrow-formatted data for a completed query job.
*/
protected async getArrowData(
workspaceClient: WorkspaceClient,
jobId: string,
Expand Down
5 changes: 2 additions & 3 deletions packages/appkit/src/analytics/query.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createHash } from "node:crypto";
import type { sql } from "@databricks/sdk-experimental";
import { isSQLTypeMarker, type SQLTypeMarker, sql as sqlHelpers } from "shared";
import { getRequestContext } from "../utils";
import { getWorkspaceId } from "../context";

type SQLParameterValue = SQLTypeMarker | null | undefined;

Expand All @@ -18,8 +18,7 @@ export class QueryProcessor {

// auto-inject workspaceId if needed and not provided
if (queryParams.has("workspaceId") && !processed.workspaceId) {
const requestContext = getRequestContext();
const workspaceId = await requestContext.workspaceId;
const workspaceId = await getWorkspaceId();
if (workspaceId) {
processed.workspaceId = sqlHelpers.string(workspaceId);
}
Expand Down
12 changes: 6 additions & 6 deletions packages/appkit/src/connectors/lakebase/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,22 +268,22 @@ export class LakebaseConnector {
this.close();
}

/** Get Databricks workspace client - from config or request context */
/** Get Databricks workspace client - from config or execution context */
private getWorkspaceClient(): WorkspaceClient {
if (this.config.workspaceClient) {
return this.config.workspaceClient;
}

try {
const { getRequestContext } = require("../../utils");
const { serviceDatabricksClient } = getRequestContext();
const { getWorkspaceClient: getClient } = require("../../context");
const client = getClient();

// cache it for subsequent calls
this.config.workspaceClient = serviceDatabricksClient;
return serviceDatabricksClient;
this.config.workspaceClient = client;
return client;
} catch (_error) {
throw new Error(
"Databricks workspace client not available. Either pass it in config or use within AppKit request context.",
"Databricks workspace client not available. Either pass it in config or ensure ServiceContext is initialized.",
);
}
}
Expand Down
Loading