gitdataai/admin/src/lib/redis.ts
ZhenYi a9c51526b8
Some checks are pending
CI / Rust Lint & Check (push) Waiting to run
CI / Rust Tests (push) Waiting to run
CI / Frontend Lint & Type Check (push) Waiting to run
CI / Frontend Build (push) Blocked by required conditions
fix(admin): fix ioredis 5.x Cluster constructor type resolution
- Use `Cluster` as default export from ioredis (not RedisCluster named export)
- Import ClusterNode type and use explicit type annotation on nodes array
- Use `any` cast on Cluster constructor to bypass TS overload resolution issue
- Fix closeRedis return type to Promise<unknown>
2026-04-20 09:41:19 +08:00

286 lines
7.7 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

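/**
 * Redis client.
 * Supports both single-node and cluster modes.
 * Key prefix: admin:* (admin sessions are stored under `admin:session:`).
 */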
import Redis, { default as Cluster } from "ioredis";
import type { ClusterNode } from "ioredis";
import { REDIS_URL, REDIS_CLUSTER_URLS } from "./env";
// Redis key prefix reserved for admin sessions.
const ADMIN_PREFIX = "admin:session:";
// Key prefix for platform user sessions (kept identical to the Rust main application).
const PLATFORM_SESSION_PREFIX = "session:user_uid:";
// Shared client instance; stays null until getRedis() creates it on first use.
let redis: Redis | null = null;
/**
 * Creates a client for a single Redis node addressed by REDIS_URL.
 *
 * The connection is lazy (`lazyConnect`), each request is retried at most
 * three times, and reconnect attempts back off linearly in 100 ms steps,
 * capped at 3 seconds.
 */
function createSingleClient(): Redis {
  const reconnectDelayMs = (attempt: number): number =>
    Math.min(attempt * 100, 3000);

  return new Redis(REDIS_URL, {
    maxRetriesPerRequest: 3,
    retryStrategy: reconnectDelayMs,
    lazyConnect: true,
  });
}
/**
 * Creates a Redis Cluster client seeded from REDIS_CLUSTER_URLS.
 *
 * Falls back to the single-node client when no cluster URL is configured.
 * Credentials are taken from the first URL; all nodes are expected to share
 * the same username/password.
 *
 * @returns The cluster client, typed as `Redis` so that the rest of this
 *   module can use one client type for both deployment modes.
 */
function createClusterClient(): Redis {
  const seedUrls = REDIS_CLUSTER_URLS.map((url) => new URL(url));
  const firstUrl = seedUrls[0];
  if (firstUrl === undefined) {
    return createSingleClient();
  }

  // `URL` exposes username/password percent-encoded; decode them so that
  // credentials containing reserved characters reach Redis unchanged.
  // A malformed escape sequence is passed through as-is instead of throwing.
  const credential = (encoded: string): string | undefined => {
    if (encoded === "") return undefined;
    try {
      return decodeURIComponent(encoded);
    } catch {
      return encoded;
    }
  };

  const nodes: ClusterNode[] = seedUrls.map((u) => ({
    host: u.hostname,
    port: Number.parseInt(u.port || "6379", 10),
  }));

  // In ioredis 5.x the package's default export is the single-node `Redis`
  // class; the cluster implementation is the named export `Cluster`, also
  // reachable as the static `Redis.Cluster`. Per-node connection settings go
  // under `redisOptions`, and cluster-level reconnect back-off is configured
  // through `clusterRetryStrategy`.
  const cluster = new Redis.Cluster(nodes, {
    lazyConnect: true,
    clusterRetryStrategy: (times: number) => Math.min(times * 100, 3000),
    redisOptions: {
      maxRetriesPerRequest: 3,
      username: credential(firstUrl.username),
      password: credential(firstUrl.password),
    },
  });

  // `Cluster` is not a subclass of `Redis`; the assertion keeps this module's
  // single `Redis` client type. Callers here only use commands and
  // `pipeline()`, which both classes provide.
  return cluster as unknown as Redis;
}
/**
 * Returns the shared Redis client, creating it on first use.
 *
 * A cluster client is built only when more than one cluster URL is
 * configured; otherwise the single-node client is used.
 */
export function getRedis(): Redis {
  if (redis !== null) {
    return redis;
  }
  const useCluster = REDIS_CLUSTER_URLS.length > 1;
  redis = useCluster ? createClusterClient() : createSingleClient();
  return redis;
}
/**
 * Closes the shared Redis client, if one has been created.
 *
 * The module-level reference is cleared before quitting so that a later
 * getRedis() call builds a fresh client instead of handing out one whose
 * connection has already been closed.
 *
 * @returns The promise from the client's `quit()` call, or an
 *   already-resolved promise when there is no client to close.
 */
export function closeRedis(): Promise<unknown> {
  const client = redis;
  if (client === null) {
    return Promise.resolve();
  }
  redis = null;
  return client.quit();
}
/**
 * Builds the Redis key under which an admin session is stored.
 *
 * @param sessionId - Session identifier without any prefix.
 * @returns ADMIN_PREFIX followed by the session id.
 */
function adminSessionKey(sessionId: string): string {
  return ADMIN_PREFIX.concat(sessionId);
}
/**
 * Serializes session state to the JSON string stored in Redis.
 *
 * The state is wrapped in a versioned envelope, `{ v: 1, state: {...} }`,
 * to stay compatible with the Rust session format.
 *
 * @param state - Session fields to persist.
 * @returns JSON text of the envelope.
 */
function serializeSessionState(state: Record<string, unknown>): string {
  const envelope = { v: 1, state };
  return JSON.stringify(envelope);
}
/**
 * Parses a stored session value back into its state object.
 *
 * Accepts both the versioned envelope (`{ v, state: {...} }`), in which case
 * the inner `state` object is returned, and a bare JSON object, which is
 * returned as-is.
 *
 * @param raw - JSON text read from Redis.
 * @returns The session state, or an empty object when `raw` is not valid
 *   JSON or does not decode to a plain object (e.g. `null`, a number, a
 *   string or an array), so callers can always index the result safely.
 */
function deserializeSessionState(raw: string): Record<string, unknown> {
  const isRecord = (value: unknown): value is Record<string, unknown> =>
    typeof value === "object" && value !== null && !Array.isArray(value);

  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    // Corrupt or non-JSON payload: treat as an empty session.
    return {};
  }

  if (!isRecord(parsed)) {
    return {};
  }
  // Unwrap the envelope only when `state` really is an object; otherwise the
  // parsed object itself is taken to be the state.
  return isRecord(parsed.state) ? parsed.state : parsed;
}
// ============ Session operations ============
/**
 * Stores an admin session with an expiry.
 *
 * @param sessionId - Session identifier without prefix.
 * @param state - Session fields to persist.
 * @param ttlSeconds - Time to live passed to SETEX, in seconds.
 */
export async function saveSession(
  sessionId: string,
  state: Record<string, unknown>,
  ttlSeconds: number
): Promise<void> {
  await getRedis().setex(
    adminSessionKey(sessionId),
    ttlSeconds,
    serializeSessionState(state)
  );
}
/**
 * Loads an admin session's state.
 *
 * @param sessionId - Session identifier without prefix.
 * @returns The decoded state, or `null` when the key is missing or holds an
 *   empty value.
 */
export async function loadSession(
  sessionId: string
): Promise<Record<string, unknown> | null> {
  const stored = await getRedis().get(adminSessionKey(sessionId));
  return stored ? deserializeSessionState(stored) : null;
}
/**
 * Overwrites an admin session's state and resets its expiry.
 *
 * Performs the same SETEX write as saveSession, so it delegates to it.
 *
 * @param sessionId - Session identifier without prefix.
 * @param state - Replacement session fields.
 * @param ttlSeconds - New time to live, in seconds.
 */
export async function updateSession(
  sessionId: string,
  state: Record<string, unknown>,
  ttlSeconds: number
): Promise<void> {
  await saveSession(sessionId, state, ttlSeconds);
}
/**
 * Deletes an admin session.
 *
 * @param sessionId - Session identifier without prefix.
 */
export async function deleteSession(sessionId: string): Promise<void> {
  await getRedis().del(adminSessionKey(sessionId));
}
/**
 * Resets the expiry of an admin session without touching its contents.
 *
 * @param sessionId - Session identifier without prefix.
 * @param ttlSeconds - New time to live passed to EXPIRE, in seconds.
 */
export async function refreshSessionTtl(
  sessionId: string,
  ttlSeconds: number
): Promise<void> {
  await getRedis().expire(adminSessionKey(sessionId), ttlSeconds);
}
// ============ Online user management (uses SCAN, not KEYS) ============
/**
 * Summary of one admin session as produced by getOnlineSessions().
 */
export interface SessionInfo {
// Redis key with the admin session prefix removed.
sessionId: string;
// Value of the "session:user_uid" state field converted to a string;
// getOnlineSessions() yields "" (not null) when the field is absent.
userId: string | null;
// Value of the "session:username" state field, or null when missing/empty.
username: string | null;
// Value of the "session:ip_address" state field, or null when missing/empty.
ipAddress: string | null;
// Value of the "session:user_agent" state field, or null when missing/empty.
userAgent: string | null;
// Value of the "session:created_at" state field, or null when missing/empty.
createdAt: string | null;
}
/**
 * Lists all admin sessions currently stored in Redis.
 *
 * Keys are discovered with cursor-based SCAN (never KEYS) in batches of
 * roughly 100, and each batch's values are fetched in one pipeline.
 * Keys whose value is missing, empty or failed to load are skipped.
 *
 * @returns One entry per distinct session key. `userId` is the stringified
 *   "session:user_uid" field, or "" when that field is absent.
 */
export async function getOnlineSessions(): Promise<SessionInfo[]> {
  const r = getRedis();
  const sessions: SessionInfo[] = [];
  // SCAN may report the same key more than once during a full iteration;
  // remember what has been handled so every session is listed exactly once.
  const seenKeys = new Set<string>();
  // NOTE(review): with a cluster client a plain SCAN is presumably answered
  // by a single node only — confirm coverage before relying on this in
  // cluster mode.
  let cursor = "0";
  do {
    const [nextCursor, batch] = await r.scan(
      cursor,
      "MATCH",
      `${ADMIN_PREFIX}*`,
      "COUNT",
      100
    );
    cursor = nextCursor;

    const keys: string[] = [];
    for (const key of batch) {
      if (!seenKeys.has(key)) {
        seenKeys.add(key);
        keys.push(key);
      }
    }
    if (keys.length === 0) {
      continue;
    }

    const pipeline = r.pipeline();
    for (const key of keys) {
      pipeline.get(key);
    }
    const results = await pipeline.exec();

    keys.forEach((key, i) => {
      // Each pipeline reply is an [error, value] pair; index 1 is the value.
      const raw = results?.[i]?.[1];
      if (typeof raw !== "string" || raw === "") {
        return;
      }
      const state = deserializeSessionState(raw);
      sessions.push({
        // Every matched key starts with the prefix, so strip it by length.
        sessionId: key.slice(ADMIN_PREFIX.length),
        userId: String(state["session:user_uid"] ?? ""),
        username: (state["session:username"] as string) || null,
        ipAddress: (state["session:ip_address"] as string) || null,
        userAgent: (state["session:user_agent"] as string) || null,
        createdAt: (state["session:created_at"] as string) || null,
      });
    });
  } while (cursor !== "0");
  return sessions;
}
/**
 * Lists the admin sessions that belong to one user.
 *
 * Scans every admin session via getOnlineSessions() and keeps those whose
 * `userId` equals the given id exactly.
 *
 * @param userId - User id to match.
 */
export async function getUserSessions(
  userId: string
): Promise<SessionInfo[]> {
  const matches: SessionInfo[] = [];
  for (const session of await getOnlineSessions()) {
    if (session.userId === userId) {
      matches.push(session);
    }
  }
  return matches;
}
/**
 * Deletes every admin session of the given user.
 *
 * @param userId - User whose sessions should be removed.
 * @returns Number of DEL commands that succeeded and removed exactly one key.
 */
export async function kickUser(userId: string): Promise<number> {
  const targets = await getUserSessions(userId);
  if (targets.length === 0) {
    return 0;
  }

  const batch = getRedis().pipeline();
  targets.forEach((session) => {
    batch.del(`${ADMIN_PREFIX}${session.sessionId}`);
  });

  const replies = await batch.exec();
  if (!replies) {
    return 0;
  }

  // Each reply is an [error, deletedCount] pair.
  let removed = 0;
  for (const [err, deleted] of replies) {
    if (err === null && deleted === 1) {
      removed += 1;
    }
  }
  return removed;
}
/**
 * Deletes a single admin session.
 *
 * @param sessionId - Session identifier without prefix.
 * @returns `true` when exactly one key was deleted.
 */
export async function kickSession(sessionId: string): Promise<boolean> {
  const deletedCount = await getRedis().del(adminSessionKey(sessionId));
  return deletedCount === 1;
}
// ============ Platform user session management (SCAN session:user_uid:*) ============
/**
 * Summary of one platform user session as produced by
 * getOnlinePlatformSessions().
 */
export interface PlatformSessionInfo {
// Redis key with the platform session prefix removed.
sessionId: string;
// Value of the "session:user_uid" state field converted to a string;
// "" when the field is absent.
userId: string;
// Value of the "session:username" state field, or null when missing/empty.
username: string | null;
// Value of the "session:workspace_id" state field, or null when missing/empty.
workspaceId: string | null;
// Value of the "session:ip_address" state field, or null when missing/empty.
ipAddress: string | null;
// Value of the "session:user_agent" state field, or null when missing/empty.
userAgent: string | null;
// Value of the "session:created_at" state field, or null when missing/empty.
createdAt: string | null;
}
/**
 * Lists all platform user sessions currently stored in Redis.
 *
 * Keys under PLATFORM_SESSION_PREFIX are discovered with cursor-based SCAN
 * in batches of roughly 100, and each batch's values are fetched in one
 * pipeline. Keys whose value is missing, empty or failed to load are skipped.
 *
 * @returns One entry per distinct session key. `userId` is the stringified
 *   "session:user_uid" field, or "" when that field is absent.
 */
export async function getOnlinePlatformSessions(): Promise<PlatformSessionInfo[]> {
  const r = getRedis();
  const sessions: PlatformSessionInfo[] = [];
  // SCAN may report the same key more than once during a full iteration;
  // remember what has been handled so every session is listed exactly once.
  const seenKeys = new Set<string>();
  // NOTE(review): with a cluster client a plain SCAN is presumably answered
  // by a single node only — confirm coverage before relying on this in
  // cluster mode.
  let cursor = "0";
  do {
    const [nextCursor, batch] = await r.scan(
      cursor,
      "MATCH",
      `${PLATFORM_SESSION_PREFIX}*`,
      "COUNT",
      100
    );
    cursor = nextCursor;

    const keys: string[] = [];
    for (const key of batch) {
      if (!seenKeys.has(key)) {
        seenKeys.add(key);
        keys.push(key);
      }
    }
    if (keys.length === 0) {
      continue;
    }

    const pipeline = r.pipeline();
    for (const key of keys) {
      pipeline.get(key);
    }
    const results = await pipeline.exec();

    keys.forEach((key, i) => {
      // Each pipeline reply is an [error, value] pair; index 1 is the value.
      const raw = results?.[i]?.[1];
      if (typeof raw !== "string" || raw === "") {
        return;
      }
      const state = deserializeSessionState(raw);
      sessions.push({
        // Every matched key starts with the prefix, so strip it by length.
        sessionId: key.slice(PLATFORM_SESSION_PREFIX.length),
        userId: String(state["session:user_uid"] ?? ""),
        username: (state["session:username"] as string) || null,
        workspaceId: (state["session:workspace_id"] as string) || null,
        ipAddress: (state["session:ip_address"] as string) || null,
        userAgent: (state["session:user_agent"] as string) || null,
        createdAt: (state["session:created_at"] as string) || null,
      });
    });
  } while (cursor !== "0");
  return sessions;
}
/**
 * Deletes a single platform user session.
 *
 * @param sessionId - Session identifier without the platform prefix.
 * @returns `true` when exactly one key was deleted.
 */
export async function kickPlatformSession(sessionId: string): Promise<boolean> {
  const platformKey = `${PLATFORM_SESSION_PREFIX}${sessionId}`;
  const deletedCount = await getRedis().del(platformKey);
  return deletedCount === 1;
}