diff --git a/apps/gingress/Cargo.toml b/apps/gingress/Cargo.toml deleted file mode 100644 index bfd83f4..0000000 --- a/apps/gingress/Cargo.toml +++ /dev/null @@ -1,46 +0,0 @@ -[package] -name = "gingress" -version.workspace = true -edition.workspace = true -authors.workspace = true -description = "GIngress control plane: Kubernetes Ingress Controller using kube-rs" -repository.workspace = true -readme.workspace = true -homepage.workspace = true -license.workspace = true -keywords.workspace = true -categories.workspace = true -documentation.workspace = true - -[[bin]] -name = "gingress" -path = "src/main.rs" - -[[bin]] -name = "kubectl-gingress" -path = "src/bin/kubectl-gingress/main.rs" - -[dependencies] -gingress-proxy = { workspace = true } - -kube = { workspace = true } -k8s-openapi = { workspace = true } - -tokio = { workspace = true, features = ["full"] } -serde = { workspace = true, features = ["derive"] } -serde_json = { workspace = true } -serde_yaml = { workspace = true } -tracing = { workspace = true } -observability = { workspace = true } -anyhow = { workspace = true } -thiserror = { workspace = true } -dashmap = { workspace = true } -futures-util = { workspace = true } -futures = { workspace = true } -clap = { workspace = true } -url = { workspace = true } -x509-parser = "0.17" -rustls-pemfile = "2" - -[lints] -workspace = true diff --git a/apps/gingress/src/bin/kubectl-gingress/main.rs b/apps/gingress/src/bin/kubectl-gingress/main.rs deleted file mode 100644 index b8cfd03..0000000 --- a/apps/gingress/src/bin/kubectl-gingress/main.rs +++ /dev/null @@ -1,887 +0,0 @@ -//! kubectl-gingress — kubectl plugin for managing GIngress resources. -//! -//! Usage (via kubectl): kubectl gingress -//! Usage (standalone): kubectl-gingress - -use clap::{Parser, Subcommand}; -use k8s_openapi::api::core::v1::{Pod, Secret}; -use k8s_openapi::api::networking::v1::{HTTPIngressPath, Ingress}; -use kube::api::ListParams; -use kube::{Api, Client, ResourceExt}; - -const INGRESS_CLASS: &str = "gingress"; - -#[derive(Parser)] -#[command( - name = "kubectl-gingress", - bin_name = "kubectl gingress", - about = "Manage GIngress — Kubernetes Ingress Controller", - version -)] -struct Cli { - #[command(subcommand)] - command: Command, -} - -#[derive(Subcommand)] -enum Command { - /// List all Ingress resources managed by GIngress - #[command(alias = "ls")] - List { - /// Filter by namespace (omit for all namespaces) - #[arg(short, long)] - namespace: Option, - /// Output as JSON - #[arg(long)] - json: bool, - }, - /// Show the routing table (host → path → backend) - Routes { - /// Filter by namespace - #[arg(short, long)] - namespace: Option, - /// Filter by host - #[arg(short = 'H', long)] - host: Option, - /// Output as JSON - #[arg(long)] - json: bool, - }, - /// Show backend services and their endpoints - Backends { - /// Filter by namespace - #[arg(short, long)] - namespace: Option, - /// Output as JSON - #[arg(long)] - json: bool, - }, - /// List TLS certificates (from Secrets) - Certs { - /// Filter by namespace - #[arg(short, long)] - namespace: Option, - /// Output as JSON - #[arg(long)] - json: bool, - }, - /// Validate Ingress configurations - Validate { - /// Filter by namespace - #[arg(short, long)] - namespace: Option, - }, - /// Show GIngress controller status and summary - Status { - /// Output as JSON - #[arg(long)] - json: bool, - }, -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - let cli = Cli::parse(); - let client = Client::try_default().await?; - - match cli.command { - Command::List { namespace, json } => cmd_list(&client, namespace, json).await?, - Command::Routes { - namespace, - host, - json, - } => cmd_routes(&client, namespace, host, json).await?, - Command::Backends { namespace, json } => cmd_backends(&client, namespace, json).await?, - Command::Certs { namespace, json } => cmd_certs(&client, namespace, json).await?, - Command::Validate { namespace } => cmd_validate(&client, namespace).await?, - Command::Status { json } => cmd_status(&client, json).await?, - } - - Ok(()) -} - -// ── list ────────────────────────────────────────────────────────── - -async fn cmd_list( - client: &Client, - namespace: Option, - json: bool, -) -> Result<(), Box> { - let ingresses = list_ingresses(client, namespace.as_deref()).await?; - - if json { - println!("{}", serde_json::to_string_pretty(&ingresses)?); - return Ok(()); - } - - if ingresses.is_empty() { - println!("No GIngress-managed Ingress resources found."); - return Ok(()); - } - - println!( - "{:<25} {:<20} {:<40} {:<50} {:<15}", - "NAMESPACE", "NAME", "HOSTS", "PATHS", "TLS" - ); - println!("{:-<150}", ""); - - for ing in &ingresses { - let ns = ing.namespace(); - let name = ing.name_any(); - let hosts = ing.hosts().join(", "); - let paths = ing - .paths_display() - .iter() - .map(|p| format!("{} {}", p.path_type, p.path)) - .collect::>() - .join(", "); - let tls = if ing.has_tls() { "Enabled" } else { "-" }; - - println!( - "{:<25} {:<20} {:<40} {:<50} {:<15}", - truncate(&ns, 25), - truncate(&name, 20), - truncate(&hosts, 40), - truncate(&paths, 50), - tls, - ); - } - - println!("\nTotal: {} Ingress(es)", ingresses.len()); - Ok(()) -} - -// ── routes ───────────────────────────────────────────────────────── - -async fn cmd_routes( - client: &Client, - namespace: Option, - host_filter: Option, - json: bool, -) -> Result<(), Box> { - let ingresses = list_ingresses(client, namespace.as_deref()).await?; - let mut routes: Vec = Vec::new(); - - for ing in &ingresses { - for rule in ing - .spec - .as_ref() - .and_then(|s| s.rules.as_ref()) - .into_iter() - .flatten() - { - let host = rule.host.as_deref().unwrap_or("*"); - if let Some(ref hf) = host_filter { - if host != hf { - continue; - } - } - if let Some(http) = &rule.http { - for path_item in &http.paths { - let backend = extract_backend(path_item); - let port = extract_backend_port(path_item); - routes.push(RouteRow { - namespace: ing.namespace(), - ingress: ing.name_any(), - host: host.to_string(), - path: path_item.path.clone().unwrap_or_else(|| "/".into()), - path_type: path_item.path_type.clone(), - backend, - port, - }); - } - } - } - } - - if json { - println!("{}", serde_json::to_string_pretty(&routes)?); - return Ok(()); - } - - if routes.is_empty() { - println!("No routes found."); - return Ok(()); - } - - println!( - "{:<20} {:<20} {:<30} {:<18} {:<15} {:<15} {:<15}", - "NAMESPACE", "INGRESS", "HOST", "PATH", "TYPE", "BACKEND", "PORT" - ); - println!("{:-<133}", ""); - - for r in &routes { - let port = extract_backend_port_str(r); - println!( - "{:<20} {:<20} {:<30} {:<18} {:<15} {:<15} {:<15}", - truncate(&r.namespace, 20), - truncate(&r.ingress, 20), - truncate(&r.host, 30), - truncate(&r.path, 18), - truncate(&r.path_type, 15), - truncate(&r.backend, 15), - port, - ); - } - - println!("\nTotal: {} route(s)", routes.len()); - Ok(()) -} - -// ── backends ─────────────────────────────────────────────────────── - -async fn cmd_backends( - client: &Client, - namespace: Option, - json: bool, -) -> Result<(), Box> { - let ingresses = list_ingresses(client, namespace.as_deref()).await?; - - // Collect unique backends from all ingresses - let mut backends: Vec = Vec::new(); - let mut seen = std::collections::HashSet::new(); - - for ing in &ingresses { - for rule in ing - .spec - .as_ref() - .and_then(|s| s.rules.as_ref()) - .into_iter() - .flatten() - { - if let Some(http) = &rule.http { - for path_item in &http.paths { - let svc = match path_item.backend.service.as_ref() { - Some(s) => s, - None => continue, - }; - let key = format!( - "{}/{}:{}", - ing.namespace(), - svc.name, - svc.port.as_ref().and_then(|p| p.number).unwrap_or(80) - ); - if seen.insert(key.clone()) { - let ns = ing.namespace(); - let ep_status = get_endpoint_status(client, &ns, &svc.name).await; - backends.push(BackendRow { - namespace: ns, - service: svc.name.clone(), - port: svc.port.as_ref().and_then(|p| p.number).unwrap_or(80) as u16, - ready_endpoints: ep_status.ready, - total_endpoints: ep_status.total, - referenced_by: ing.name_any(), - }); - } - } - } - } - } - - if json { - println!("{}", serde_json::to_string_pretty(&backends)?); - return Ok(()); - } - - if backends.is_empty() { - println!("No backends found."); - return Ok(()); - } - - println!( - "{:<20} {:<20} {:<8} {:<8} {:<18} {:<20}", - "NAMESPACE", "SERVICE", "PORT", "HEALTH", "ENDPOINTS", "REFERENCED BY" - ); - println!("{:-<94}", ""); - - for b in &backends { - let health = if b.total_endpoints == 0 { - "WARN" - } else if b.ready_endpoints == 0 { - "DOWN" - } else if b.ready_endpoints < b.total_endpoints { - "PARTIAL" - } else { - "OK" - }; - let eps = format!("{}/{} ready", b.ready_endpoints, b.total_endpoints); - println!( - "{:<20} {:<20} {:<8} {:<8} {:<18} {:<20}", - truncate(&b.namespace, 20), - truncate(&b.service, 20), - b.port, - health, - eps, - truncate(&b.referenced_by, 20), - ); - } - - println!("\nTotal: {} backend(s)", backends.len()); - Ok(()) -} - -// ── certs ────────────────────────────────────────────────────────── - -async fn cmd_certs( - client: &Client, - namespace: Option, - json: bool, -) -> Result<(), Box> { - let ingresses = list_ingresses(client, namespace.as_deref()).await?; - let mut certs: Vec = Vec::new(); - - for ing in &ingresses { - let ns = ing.namespace(); - let tls_entries = ing - .spec - .as_ref() - .and_then(|s| s.tls.as_ref()) - .cloned() - .unwrap_or_default(); - - for tls in &tls_entries { - let secret_name = tls.secret_name.clone().unwrap_or_default(); - let hosts = tls.hosts.clone().unwrap_or_default(); - - // Check if the secret exists - let secret_exists = check_secret_exists(client, &ns, &secret_name).await; - - for host in &hosts { - certs.push(CertRow { - namespace: ns.clone(), - secret_name: secret_name.clone(), - host: host.clone(), - found: secret_exists, - }); - } - } - } - - if json { - println!("{}", serde_json::to_string_pretty(&certs)?); - return Ok(()); - } - - if certs.is_empty() { - println!("No TLS certificates configured."); - return Ok(()); - } - - println!( - "{:<20} {:<30} {:<30} {:<10}", - "NAMESPACE", "SECRET", "HOST", "STATUS" - ); - println!("{:-<90}", ""); - - for c in &certs { - let status = if c.found { "OK" } else { "MISSING" }; - println!( - "{:<20} {:<30} {:<30} {:<10}", - truncate(&c.namespace, 20), - truncate(&c.secret_name, 30), - truncate(&c.host, 30), - status, - ); - } - - let missing = certs.iter().filter(|c| !c.found).count(); - println!("\nTotal: {} cert(s), {} missing", certs.len(), missing); - Ok(()) -} - -// ── validate ─────────────────────────────────────────────────────── - -async fn cmd_validate( - client: &Client, - namespace: Option, -) -> Result<(), Box> { - let ingresses = list_ingresses(client, namespace.as_deref()).await?; - let mut errors = 0usize; - let mut warnings = 0usize; - - for ing in &ingresses { - let ns = ing.namespace(); - let name = ing.name_any(); - - // Check: has rules - let has_rules = ing - .spec - .as_ref() - .map(|s| s.rules.as_ref().map(|r| !r.is_empty()).unwrap_or(false)) - .unwrap_or(false); - if !has_rules { - println!("[{}/{}] ERROR: No routing rules defined", ns, name); - errors += 1; - } - - // Check: has TLS but no secret - let tls_entries = ing - .spec - .as_ref() - .and_then(|s| s.tls.as_ref()) - .cloned() - .unwrap_or_default(); - for tls in &tls_entries { - let secret_name = tls.secret_name.as_deref().unwrap_or(""); - if secret_name.is_empty() { - println!( - "[{}/{}] WARNING: TLS configured but no secretName specified", - ns, name - ); - warnings += 1; - } else { - let found = check_secret_exists(client, &ns, secret_name).await; - if !found { - println!( - "[{}/{}] ERROR: TLS secret '{}' not found in namespace '{}'", - ns, name, secret_name, ns - ); - errors += 1; - } - } - } - - // Check: path backends reference valid services - if let Some(rules) = ing.spec.as_ref().and_then(|s| s.rules.as_ref()) { - for rule in rules { - if let Some(http) = &rule.http { - for path_item in &http.paths { - if let Some(svc) = &path_item.backend.service { - let endpoints = get_endpoint_status(client, &ns, &svc.name).await; - if endpoints.total == 0 { - println!( - "[{}/{}] WARNING: Backend service '{}' has no endpoints (host: {})", - ns, - name, - svc.name, - rule.host.as_deref().unwrap_or("*") - ); - warnings += 1; - } - } - } - } - } - } - } - - if errors == 0 && warnings == 0 { - println!( - "Validation passed — no issues found in {} Ingress(es).", - ingresses.len() - ); - } else { - println!( - "\nValidation complete: {} error(s), {} warning(s) across {} Ingress(es).", - errors, - warnings, - ingresses.len() - ); - } - - Ok(()) -} - -// ── status ───────────────────────────────────────────────────────── - -async fn cmd_status(client: &Client, json: bool) -> Result<(), Box> { - let ingresses = list_ingresses(client, None).await?; - let controller_pods = find_gingress_pods(client).await; - - if json { - #[derive(serde::Serialize)] - struct StatusOutput { - controller_pods: Vec, - ingress_count: usize, - route_count: usize, - backend_count: usize, - cert_count: usize, - } - let mut route_count = 0usize; - let mut backend_set = std::collections::HashSet::new(); - let mut cert_count = 0usize; - for ing in &ingresses { - if let Some(spec) = &ing.spec { - if let Some(rules) = &spec.rules { - for rule in rules { - if let Some(http) = &rule.http { - route_count += http.paths.len(); - for p in &http.paths { - if let Some(svc) = &p.backend.service { - backend_set.insert(format!("{}/{}", ing.namespace(), svc.name)); - } - } - } - } - } - if let Some(tls) = &spec.tls { - cert_count += tls.len(); - } - } - } - println!( - "{}", - serde_json::to_string_pretty(&StatusOutput { - controller_pods, - ingress_count: ingresses.len(), - route_count, - backend_count: backend_set.len(), - cert_count, - })? - ); - return Ok(()); - } - - // Controller status - println!("══ GIngress Controller Status ══\n"); - - if controller_pods.is_empty() { - println!( - "Controller: NOT FOUND (no pods with label gingress.io/component=controller)" - ); - } else { - for pod in &controller_pods { - let ready = if pod.ready { "Running" } else { "NotReady" }; - println!( - "Controller Pod: {:<30} {:<12} {}", - truncate(&pod.name, 30), - ready, - pod.namespace - ); - } - } - - // Resource summary - println!(); - println!("══ Managed Resources ══\n"); - - let mut route_count = 0usize; - let mut backend_set = std::collections::HashSet::new(); - let mut backend_total_eps = 0usize; - let mut backend_ready_eps = 0usize; - let mut cert_count = 0usize; - let mut cert_missing = 0usize; - - for ing in &ingresses { - if let Some(spec) = &ing.spec { - if let Some(rules) = &spec.rules { - for rule in rules { - if let Some(http) = &rule.http { - route_count += http.paths.len(); - for p in &http.paths { - if let Some(svc) = &p.backend.service { - let key = format!("{}/{}", ing.namespace(), svc.name); - if backend_set.insert(key) { - let eps = - get_endpoint_status(client, &ing.namespace(), &svc.name) - .await; - backend_total_eps += eps.total; - backend_ready_eps += eps.ready; - } - } - } - } - } - } - if let Some(tls) = &spec.tls { - cert_count += tls.len(); - for tls in tls { - let sn = tls.secret_name.as_deref().unwrap_or(""); - if !sn.is_empty() && !check_secret_exists(client, &ing.namespace(), sn).await { - cert_missing += 1; - } - } - } - } - } - - println!("Ingresses: {}", ingresses.len()); - println!("Routes: {}", route_count); - println!( - "Backends: {} ({} ready / {} total endpoints)", - backend_set.len(), - backend_ready_eps, - backend_total_eps - ); - println!( - "TLS Certs: {} ({} missing)", - cert_count, cert_missing - ); - println!(); - - // Overall health - let healthy = !ingresses.is_empty() - && (cert_missing == 0) - && (backend_set.is_empty() || backend_ready_eps > 0) - && controller_pods.iter().any(|p| p.ready); - - if healthy { - println!("Status: HEALTHY"); - } else { - println!("Status: DEGRADED"); - if controller_pods.is_empty() || !controller_pods.iter().any(|p| p.ready) { - println!(" → Controller pod not running or not ready"); - } - if cert_missing > 0 { - println!(" → {} TLS secret(s) missing", cert_missing); - } - if !backend_set.is_empty() && backend_ready_eps == 0 { - println!(" → No ready backend endpoints"); - } - } - - Ok(()) -} - -// ── k8s helpers ──────────────────────────────────────────────────── - -#[derive(serde::Serialize)] -struct IngressSummary { - namespace: String, - name: String, - hosts: Vec, - #[serde(skip)] - paths_for_display: Vec, - has_tls: bool, - #[serde(skip)] - spec: Option, -} - -#[derive(Clone)] -struct PathSummary { - path_type: String, - path: String, -} - -impl IngressSummary { - fn namespace(&self) -> String { - self.namespace.clone() - } - fn name_any(&self) -> String { - self.name.clone() - } - fn hosts(&self) -> &[String] { - &self.hosts - } - fn paths_display(&self) -> &[PathSummary] { - &self.paths_for_display - } - fn has_tls(&self) -> bool { - self.has_tls - } -} - -fn ingress_to_summary(ing: &Ingress) -> IngressSummary { - let spec = ing.spec.clone(); - let mut hosts = Vec::new(); - let mut paths = Vec::new(); - let mut has_tls = false; - - if let Some(ref s) = spec { - if let Some(ref rules) = s.rules { - for rule in rules { - if let Some(ref host) = rule.host { - hosts.push(host.clone()); - } - if let Some(ref http) = rule.http { - for p in &http.paths { - paths.push(PathSummary { - path_type: p.path_type.clone(), - path: p.path.clone().unwrap_or_else(|| "/".into()), - }); - } - } - } - } - has_tls = s.tls.as_ref().map(|t| !t.is_empty()).unwrap_or(false); - } - - IngressSummary { - namespace: ing.namespace().unwrap_or_default(), - name: ing.name_any(), - hosts, - paths_for_display: paths, - has_tls, - spec, - } -} - -#[derive(serde::Serialize)] -struct PodInfo { - name: String, - namespace: String, - ready: bool, -} - -/// Find GIngress controller pods by label `gingress.io/component=controller`. -async fn find_gingress_pods(client: &Client) -> Vec { - let api: Api = Api::all(client.clone()); - let lp = ListParams { - label_selector: Some("gingress.io/component=controller".into()), - ..Default::default() - }; - match api.list(&lp).await { - Ok(list) => list - .items - .into_iter() - .map(|pod| { - let ready = pod - .status - .as_ref() - .and_then(|s| s.conditions.as_ref()) - .map(|conds| { - conds - .iter() - .any(|c| c.type_ == "Ready" && c.status == "True") - }) - .unwrap_or(false); - PodInfo { - name: pod.name_any(), - namespace: pod.namespace().unwrap_or_default(), - ready, - } - }) - .collect(), - Err(_) => Vec::new(), - } -} - -async fn list_ingresses( - client: &Client, - namespace: Option<&str>, -) -> Result, Box> { - let params = ListParams { - ..Default::default() - }; - - if let Some(ns) = namespace { - let api: Api = Api::namespaced(client.clone(), ns); - let list = api.list(¶ms).await?; - Ok(list - .items - .into_iter() - .filter(|ing| is_gingress_class(ing)) - .map(|ing| ingress_to_summary(&ing)) - .collect()) - } else { - let api: Api = Api::all(client.clone()); - let list = api.list(¶ms).await?; - Ok(list - .items - .into_iter() - .filter(|ing| is_gingress_class(ing)) - .map(|ing| ingress_to_summary(&ing)) - .collect()) - } -} - -fn is_gingress_class(ingress: &Ingress) -> bool { - ingress - .spec - .as_ref() - .and_then(|s| s.ingress_class_name.as_deref()) - == Some(INGRESS_CLASS) -} - -struct EndpointStatus { - ready: usize, - total: usize, -} - -async fn get_endpoint_status( - client: &Client, - namespace: &str, - service_name: &str, -) -> EndpointStatus { - use k8s_openapi::api::core::v1::Endpoints; - let api: Api = Api::namespaced(client.clone(), namespace); - match api.get_opt(service_name).await { - Ok(Some(eps)) => { - let mut ready = 0usize; - let mut total = 0usize; - if let Some(subsets) = &eps.subsets { - for subset in subsets { - let addrs = subset.addresses.as_deref().unwrap_or_default(); - let not_ready = subset.not_ready_addresses.as_deref().unwrap_or_default(); - ready += addrs.len(); - total += addrs.len() + not_ready.len(); - } - } - EndpointStatus { ready, total } - } - _ => EndpointStatus { ready: 0, total: 0 }, - } -} - -async fn check_secret_exists(client: &Client, namespace: &str, name: &str) -> bool { - let api: Api = Api::namespaced(client.clone(), namespace); - api.get_opt(name).await.ok().flatten().is_some() -} - -fn extract_backend(path_item: &HTTPIngressPath) -> String { - path_item - .backend - .service - .as_ref() - .map(|s| s.name.clone()) - .unwrap_or_else(|| "".into()) -} - -fn extract_backend_port(path_item: &HTTPIngressPath) -> u16 { - path_item - .backend - .service - .as_ref() - .and_then(|s| s.port.as_ref()) - .and_then(|p| p.number) - .unwrap_or(80) as u16 -} - -fn extract_backend_port_str(r: &RouteRow) -> String { - r.port.to_string() -} - -#[derive(serde::Serialize)] -struct RouteRow { - namespace: String, - ingress: String, - host: String, - path: String, - path_type: String, - backend: String, - port: u16, -} - -#[derive(serde::Serialize)] -struct BackendRow { - namespace: String, - service: String, - port: u16, - ready_endpoints: usize, - total_endpoints: usize, - referenced_by: String, -} - -#[derive(serde::Serialize)] -struct CertRow { - namespace: String, - secret_name: String, - host: String, - found: bool, -} - -fn truncate(s: &str, max: usize) -> String { - // Account for CJK characters — each wide char counts as 2 - let mut width = 0usize; - let mut result = String::new(); - for c in s.chars() { - let cw = if c.is_ascii() { 1 } else { 2 }; - if width + cw > max { - result.push_str("…"); - break; - } - result.push(c); - width += cw; - } - result -} diff --git a/apps/gingress/src/controller/endpoint_watcher.rs b/apps/gingress/src/controller/endpoint_watcher.rs deleted file mode 100644 index 6aaeefc..0000000 --- a/apps/gingress/src/controller/endpoint_watcher.rs +++ /dev/null @@ -1,148 +0,0 @@ -//! Watches Kubernetes Endpoints and updates upstream endpoint lists. -//! -//! Tracks Pod IPs for each Service. When endpoints change (scale up/down, -//! rolling restart, health check failures), the upstream pool is updated. - -use futures::StreamExt; -use futures::pin_mut; -use gingress_proxy::config::{ConfigStore, Endpoint}; -use k8s_openapi::api::core::v1::Endpoints as K8sEndpoints; -use kube::ResourceExt; -use kube::runtime::watcher::{self, Event}; -use std::sync::Arc; - -/// Watch Endpoints and update the ConfigStore. -pub async fn watch_endpoints( - client: Arc, - store: Arc, - _namespace: Option, - on_change: Arc, -) { - let api = kube::Api::::all(client.as_ref().clone()); - let config = watcher::Config::default(); - let watcher = watcher::watcher(api, config); - pin_mut!(watcher); - - while let Some(event) = watcher.next().await { - match event { - Ok(Event::Apply(eps)) => { - process_endpoints(&eps, &store, &on_change); - } - Ok(Event::Init) => { - tracing::info!("Endpoint watcher re-initializing"); - } - Ok(Event::InitApply(eps)) => { - process_endpoints(&eps, &store, &on_change); - } - Ok(Event::InitDone) => { - tracing::info!("Endpoint watcher init complete"); - } - Ok(Event::Delete(eps)) => { - remove_endpoints(&eps, &store, &on_change); - } - Err(e) => { - tracing::error!("Endpoint watcher error: {}", e); - } - } - } -} - -/// Extract endpoint addresses, grouped by port, and update the ConfigStore. -/// -/// Stores endpoints under key `upstream:/:` to match -/// the proxy's upstream lookup format. -fn process_endpoints( - endpoints: &K8sEndpoints, - store: &ConfigStore, - on_change: &Arc, -) { - use std::collections::HashMap; - - let name = endpoints.name_any(); - let namespace = endpoints.namespace().unwrap_or_default(); - let base_prefix = format!("upstream:{}/{}:", namespace, name); - - // Collect endpoints grouped by port - let mut port_groups: HashMap> = HashMap::new(); - - if let Some(subsets) = &endpoints.subsets { - for subset in subsets { - let addrs = subset.addresses.as_deref().unwrap_or_default(); - let ports = subset.ports.as_deref().unwrap_or_default(); - let not_ready_addrs = subset.not_ready_addresses.as_deref().unwrap_or_default(); - - for port in ports { - let port_num = port.port as u16; - let eps = port_groups.entry(port_num).or_default(); - for addr in addrs { - eps.push(Endpoint { - ip: addr.ip.clone(), - port: port_num, - ready: true, - }); - } - for addr in not_ready_addrs { - eps.push(Endpoint { - ip: addr.ip.clone(), - port: port_num, - ready: false, - }); - } - } - } - } - - // Clear old per-port keys for this service (handles port removal) - let old_keys = store.keys_with_prefix(&base_prefix); - for k in old_keys { - store.remove(&k); - } - - // Write per-port endpoint entries - let mut total = 0usize; - for (port_num, eps) in &port_groups { - let key = format!("{}{}", base_prefix, port_num); - store.set(&key, eps); - total += eps.len(); - } - - // If no ports at all, write an empty entry for the base key so the reconciler - // can detect that this service has no endpoints. - if port_groups.is_empty() { - store.set::>(&format!("upstream:{}/{}", namespace, name), &vec![]); - } - - store.signal_reload(); - on_change(); - - tracing::debug!( - namespace = %namespace, - name = %name, - num_ports = port_groups.len(), - num_endpoints = total, - "Endpoints updated" - ); -} - -/// Remove all per-port endpoint keys when the Endpoint resource is deleted. -fn remove_endpoints( - endpoints: &K8sEndpoints, - store: &ConfigStore, - on_change: &Arc, -) { - let name = endpoints.name_any(); - let namespace = endpoints.namespace().unwrap_or_default(); - let base_prefix = format!("upstream:{}/{}:", namespace, name); - - // Remove all per-port keys - let keys = store.keys_with_prefix(&base_prefix); - for k in keys { - store.remove(&k); - } - // Also remove the port-less key (in case no ports were present) - store.remove(&format!("upstream:{}/{}", namespace, name)); - - store.signal_reload(); - on_change(); - tracing::info!(namespace = %namespace, name = %name, "Endpoints removed"); -} diff --git a/apps/gingress/src/controller/ingress_watcher.rs b/apps/gingress/src/controller/ingress_watcher.rs deleted file mode 100644 index 4d0963b..0000000 --- a/apps/gingress/src/controller/ingress_watcher.rs +++ /dev/null @@ -1,426 +0,0 @@ -//! Watches Kubernetes Ingress resources and converts them to routing rules. - -use futures::StreamExt; -use futures::pin_mut; -use gingress_proxy::config::{ - ConfigStore, HeaderOp, PathType, RateLimitPolicy, RouteRule, SessionAffinityConfig, -}; -use k8s_openapi::api::networking::v1::{HTTPIngressPath, Ingress}; -use kube::ResourceExt; -use kube::runtime::watcher::{self, Event}; -use std::collections::BTreeMap; -use std::sync::Arc; - -/// Watch Ingress resources and update the ConfigStore. -/// -/// After each event, the `on_change` callback is invoked so the reconciler -/// can cross-reference all fragments into a complete ProxyConfig. -pub async fn watch_ingresses( - client: Arc, - store: Arc, - ingress_class: String, - namespace: Option, - on_change: Arc, -) { - let api = kube::Api::::all(client.as_ref().clone()); - - let config = watcher::Config { - field_selector: namespace - .as_ref() - .map(|ns| format!("metadata.namespace={}", ns)), - ..Default::default() - }; - - let ingress_watcher = watcher::watcher(api, config); - pin_mut!(ingress_watcher); - - while let Some(event) = ingress_watcher.next().await { - match event { - Ok(Event::Apply(ingress)) => { - let name = ingress.name_any(); - let ns = ingress.namespace().unwrap_or_default(); - if is_gingress_class(&ingress, &ingress_class) { - process_ingress(&ingress, &store, &ingress_class); - on_change(); - tracing::info!(namespace = %ns, name = %name, "Ingress applied"); - } - } - Ok(Event::Init) => { - store.remove_prefix("ingress:"); - store.remove_prefix("tls-host:"); - tracing::info!("Ingress watcher re-initializing"); - } - Ok(Event::InitApply(ingress)) => { - if is_gingress_class(&ingress, &ingress_class) { - process_ingress(&ingress, &store, &ingress_class); - } - } - Ok(Event::InitDone) => { - store.signal_reload(); - on_change(); - tracing::info!("Ingress watcher init complete"); - } - Ok(Event::Delete(ingress)) => { - if is_gingress_class(&ingress, &ingress_class) { - remove_ingress_routes(&ingress, &store); - on_change(); - tracing::info!( - name = %ingress.name_any(), - namespace = %ingress.namespace().unwrap_or_default(), - "Ingress deleted, routes removed" - ); - } - } - Err(e) => { - tracing::error!("Ingress watcher error: {}", e); - } - } - } -} - -/// Check if an Ingress specifies the gingress class. -fn is_gingress_class(ingress: &Ingress, class_name: &str) -> bool { - ingress - .spec - .as_ref() - .and_then(|s| s.ingress_class_name.as_deref()) - == Some(class_name) -} - -/// Process an Ingress resource: extract routes and update the store. -fn process_ingress(ingress: &Ingress, store: &ConfigStore, _ingress_class: &str) { - let namespace = ingress.namespace().unwrap_or_default(); - let name = ingress.name_any(); - let spec = match ingress.spec.as_ref() { - Some(s) => s, - None => return, - }; - - // Build an ingress-scoped prefix so we can clean up old routes for this Ingress - let ingress_prefix = format!("ingress:{}/{}:", namespace, name); - - // Remove old route entries scoped to this Ingress - let old_route_keys = store.keys_with_prefix(&format!("{}route:", ingress_prefix)); - for key in &old_route_keys { - store.remove(key); - } - - // Process routing rules - if let Some(rules) = &spec.rules { - for rule in rules { - let host = rule.host.as_deref().unwrap_or("*"); - if let Some(http) = &rule.http { - let mut routes: Vec = Vec::new(); - for path_item in &http.paths { - routes.push(ingress_path_to_route(host, path_item, &namespace)); - } - // Store per-ingress routes so we can clean up on delete - let route_key = format!("{}route:{}", ingress_prefix, host); - store.set(&route_key, &routes); - } - } - } - - // Process TLS: map secretName -> hosts so the reconciler can cross-reference - if let Some(tls_entries) = &spec.tls { - for tls in tls_entries { - let secret_name = tls.secret_name.as_deref().unwrap_or_default(); - let hosts: Vec = tls.hosts.clone().unwrap_or_default(); - let tls_host_key = format!("tls-host:{}", secret_name); - store.set(&tls_host_key, &hosts); - } - } - - // Process annotations for advanced features - let annotations = ingress.annotations(); - process_annotations(&annotations, &ingress_prefix, &namespace, store); - - store.signal_reload(); -} - -/// Convert a Kubernetes Ingress path to an internal RouteRule. -fn ingress_path_to_route(host: &str, path: &HTTPIngressPath, namespace: &str) -> RouteRule { - let service = path - .backend - .service - .as_ref() - .expect("Ingress backend must reference a service"); - - RouteRule { - host: host.to_string(), - path: path.path.clone().unwrap_or_else(|| "/".to_string()), - path_type: match path.path_type.as_str() { - "Prefix" => PathType::Prefix, - "Exact" => PathType::Exact, - _ => PathType::ImplementationSpecific, - }, - backend: gingress_proxy::config::Backend { - namespace: namespace.to_string(), - name: service.name.clone(), - port: service.port.as_ref().and_then(|p| p.number).unwrap_or(80) as u16, - }, - } -} - -/// Annotation keys for GIngress features. -const ANN_RATE_LIMIT: &str = "gingress.io/rate-limit"; -const ANN_RATE_LIMIT_BURST: &str = "gingress.io/rate-limit-burst"; -const ANN_REQUEST_HEADERS: &str = "gingress.io/request-headers"; -const ANN_WEBSOCKET: &str = "gingress.io/websocket"; -const ANN_SESSION_AFFINITY: &str = "gingress.io/session-affinity"; -const ANN_GIT_BACKEND: &str = "gingress.io/git-backend"; - -/// Parse Ingress annotations and write corresponding ConfigStore entries. -/// -/// Supported annotations: -/// - `gingress.io/rate-limit` — "RPS" or "RPS/BURST" (e.g., "100" or "100/200") -/// - `gingress.io/rate-limit-burst` — Override burst size -/// - `gingress.io/request-headers` — JSON array of header operations -/// - `gingress.io/websocket` — "true" to enable WebSocket upgrade for this host -/// - `gingress.io/session-affinity` — "cookie" or "cookie:NAME:TTL_SECONDS" -fn process_annotations( - annotations: &BTreeMap, - ingress_prefix: &str, - namespace: &str, - store: &ConfigStore, -) { - // Collect hosts from the ingress routes that were just stored - let route_keys = store.keys_with_prefix(&format!("{}route:", ingress_prefix)); - let hosts: Vec = route_keys - .iter() - .filter_map(|k| k.split(":route:").nth(1).map(String::from)) - .collect(); - - if hosts.is_empty() { - return; - } - - // Remove old per-host annotation keys (handles annotation removal/update) - for host in &hosts { - store.remove(&format!("rate_limit:{}", host)); - store.remove(&format!("headers:{}", host)); - store.remove(&format!("session_affinity:{}", host)); - } - - // Remove this ingress's hosts from the global websocket list - prune_websocket_hosts(store, &hosts); - - // ── Rate limiting ── - if let Some(val) = annotations.get(ANN_RATE_LIMIT) { - let (rps, burst) = parse_rate_limit(val, annotations.get(ANN_RATE_LIMIT_BURST)); - for host in &hosts { - store.set( - &format!("rate_limit:{}", host), - &RateLimitPolicy { - host: host.clone(), - requests_per_second: rps, - burst_size: burst, - }, - ); - } - } - - // ── Header operations (request) ── - if let Some(val) = annotations.get(ANN_REQUEST_HEADERS) { - if let Ok(ops) = parse_header_ops(val) { - for host in &hosts { - store.set(&format!("headers:{}", host), &ops); - } - } else { - tracing::warn!(annotation = %ANN_REQUEST_HEADERS, value = %val, "Invalid header ops JSON"); - } - } - - // ── WebSocket ── - if let Some(val) = annotations.get(ANN_WEBSOCKET) { - if val.trim().to_lowercase() == "true" { - let mut ws_hosts: Vec = hosts.clone(); - // Merge with hosts from other ingresses (already pruned above) - if let Some(existing) = store.get::>("websocket:hosts") { - for h in existing { - if !ws_hosts.contains(&h) { - ws_hosts.push(h); - } - } - } - store.set("websocket:hosts", &ws_hosts); - } - } - - // ── Session affinity ── - if let Some(val) = annotations.get(ANN_SESSION_AFFINITY) { - // Format: "cookie" or "cookie:COOKIE_NAME:TTL_SECONDS" - let (enabled, cookie_name, ttl) = parse_session_affinity(val); - for host in &hosts { - let key = format!("session_affinity:{}", host); - store.set( - &key, - &SessionAffinityConfig { - enabled, - cookie_name: cookie_name.clone(), - cookie_ttl_seconds: ttl, - }, - ); - } - } - - // ── Git backend ── - // When present, requests with Git User-Agent (git/*, JGit/*) are routed to - // this backend instead of normal host+path matching. - // Value format: "namespace/service-name:port" or "service-name:port" (namespace from Ingress) - if let Some(val) = annotations.get(ANN_GIT_BACKEND) { - if let Some(backend) = parse_git_backend(val, &namespace) { - store.set("git_backend", &backend); - tracing::info!( - backend = format!("{}/{}:{}", backend.namespace, backend.name, backend.port), - "Git backend configured" - ); - } else { - tracing::warn!(annotation = %ANN_GIT_BACKEND, value = %val, "Invalid git-backend format, expected 'namespace/name:port' or 'name:port'"); - } - } -} - -fn parse_rate_limit(val: &str, burst_override: Option<&String>) -> (u32, u32) { - let val = val.trim(); - if let Some((rps_str, burst_str)) = val.split_once('/') { - let rps = rps_str.parse().unwrap_or(0); - let burst = burst_str.parse().unwrap_or(rps); - (rps, burst) - } else { - let rps = val.parse().unwrap_or(0); - let burst = burst_override - .and_then(|b| b.parse().ok()) - .unwrap_or(rps * 2); - (rps, burst) - } -} - -#[derive(serde::Deserialize)] -struct HeaderOpAnnotation { - op: String, - name: String, - #[serde(default)] - value: Option, -} - -fn parse_header_ops(val: &str) -> anyhow::Result> { - let items: Vec = serde_json::from_str(val)?; - items - .into_iter() - .map(|item| { - Ok(match item.op.as_str() { - "set" => HeaderOp::Set { - name: item.name, - value: item.value.unwrap_or_default(), - }, - "add" => HeaderOp::Add { - name: item.name, - value: item.value.unwrap_or_default(), - }, - "remove" => HeaderOp::Remove { name: item.name }, - _ => anyhow::bail!("Unknown header op: {}", item.op), - }) - }) - .collect() -} - -fn parse_session_affinity(val: &str) -> (bool, String, u64) { - let val = val.trim(); - if val.eq_ignore_ascii_case("cookie") || val.eq_ignore_ascii_case("true") { - return (true, "GINGRESS_AFFINITY".into(), 3600); - } - // Format: "cookie:COOKIE_NAME:TTL" - let parts: Vec<&str> = val.split(':').collect(); - if parts.len() >= 3 { - let name = parts[1].to_string(); - let ttl = parts[2].parse().unwrap_or(3600); - (true, name, ttl) - } else if parts.len() == 2 { - let name = parts[1].to_string(); - (true, name, 3600) - } else { - (false, String::new(), 0) - } -} - -/// Parse git-backend annotation value. -/// -/// Format: "namespace/name:port" or "name:port" (namespace defaults to Ingress namespace). -fn parse_git_backend( - val: &str, - default_namespace: &str, -) -> Option { - let val = val.trim(); - // Split off port: "namespace/name:port" → ("namespace/name", "port") - let (ns_name, port_str) = val.rsplit_once(':').unwrap_or((val, "")); - let port: u16 = port_str.parse().ok()?; - - // Split namespace and name: "namespace/name" → ("namespace", "name") - let (namespace, name) = if let Some((ns, n)) = ns_name.rsplit_once('/') { - (ns.to_string(), n.to_string()) - } else { - // No namespace specified — use the Ingress namespace - (default_namespace.to_string(), ns_name.to_string()) - }; - - if name.is_empty() { - return None; - } - - Some(gingress_proxy::config::Backend { - namespace, - name, - port, - }) -} - -/// Remove a set of hosts from the global websocket host list (scoped cleanup). -fn prune_websocket_hosts(store: &ConfigStore, hosts_to_remove: &[String]) { - if let Some(mut existing) = store.get::>("websocket:hosts") { - existing.retain(|h| !hosts_to_remove.contains(h)); - if existing.is_empty() { - store.remove("websocket:hosts"); - } else { - store.set("websocket:hosts", &existing); - } - } -} - -/// Remove all routes associated with a deleted Ingress. -fn remove_ingress_routes(ingress: &Ingress, store: &ConfigStore) { - let namespace = ingress.namespace().unwrap_or_default(); - let name = ingress.name_any(); - let ingress_prefix = format!("ingress:{}/{}:", namespace, name); - - // Collect hosts before deleting routes so we can clean up per-host annotation keys - let host_keys: Vec = store - .keys_with_prefix(&format!("{}route:", ingress_prefix)) - .iter() - .filter_map(|k| k.split(":route:").nth(1).map(String::from)) - .collect(); - - // Remove all route entries for this Ingress - store.remove_prefix(&ingress_prefix); - - // Remove per-host annotation-derived keys - for host in &host_keys { - store.remove(&format!("rate_limit:{}", host)); - store.remove(&format!("headers:{}", host)); - store.remove(&format!("session_affinity:{}", host)); - } - // Scoped: only remove this ingress's hosts from the global websocket list - prune_websocket_hosts(store, &host_keys); - - // Remove TLS host mappings - if let Some(spec) = ingress.spec.as_ref() { - if let Some(tls_entries) = &spec.tls { - for tls in tls_entries { - let sn = tls.secret_name.as_deref().unwrap_or_default(); - store.remove(&format!("tls-host:{}", sn)); - } - } - } - - store.signal_reload(); -} diff --git a/apps/gingress/src/controller/mod.rs b/apps/gingress/src/controller/mod.rs deleted file mode 100644 index 5edb20d..0000000 --- a/apps/gingress/src/controller/mod.rs +++ /dev/null @@ -1,88 +0,0 @@ -//! Kubernetes controller for GIngress. -//! -//! Watches Ingress, Service, EndpointSlice, and Secret resources, -//! reconciles them into the shared `ConfigStore`. - -mod endpoint_watcher; -mod ingress_watcher; -mod reconciler; -mod secret_watcher; - -use anyhow::Context; -use gingress_proxy::config::ConfigStore; -use kube::Client; -use std::sync::Arc; -use tokio::task::JoinHandle; - -/// Start all controller watchers and the reconcile loop. -/// -/// Each watcher: -/// 1. Watches a specific K8s resource type -/// 2. Writes its fragment to the `ConfigStore` -/// 3. Calls `reconciler.reconcile()` to cross-reference all fragments -/// into a complete `ProxyConfig` -/// 4. The reconiler signals `ConfigStore::signal_reload()` -/// 5. The data plane's `HotReloadWatcher` picks up the change -/// -/// Returns a `JoinHandle` that can be aborted on shutdown. -pub async fn start( - store: ConfigStore, - ingress_class: String, - namespace: Option, -) -> anyhow::Result> { - let client = Client::try_default().await.context( - "Failed to create Kubernetes client. Are you running in a cluster or have a kubeconfig?", - )?; - - tracing::info!("Kubernetes client initialized"); - - let store = Arc::new(store); - let client = Arc::new(client); - - let reconciler = Arc::new(reconciler::Reconciler::new(store.clone())); - - // Callback invoked by every watcher after processing an event. - // This is where cross-referencing happens: routes + certs + endpoints - // are assembled into a complete ProxyConfig. - let on_change: Arc = { - let r = reconciler.clone(); - Arc::new(move || { - r.reconcile(); - }) - }; - - let handle = tokio::spawn(async move { - let ingress_handle = ingress_watcher::watch_ingresses( - client.clone(), - store.clone(), - ingress_class, - namespace.clone(), - on_change.clone(), - ); - - let secret_handle = secret_watcher::watch_secrets( - client.clone(), - store.clone(), - namespace.clone(), - on_change.clone(), - ); - - let endpoint_handle = endpoint_watcher::watch_endpoints( - client.clone(), - store.clone(), - namespace, - on_change.clone(), - ); - - tracing::info!("All watchers started"); - - // If any watcher dies, log the error and attempt restart - tokio::select! { - r = ingress_handle => tracing::error!("Ingress watcher exited: {:?}", r), - r = secret_handle => tracing::error!("Secret watcher exited: {:?}", r), - r = endpoint_handle => tracing::error!("Endpoint watcher exited: {:?}", r), - } - }); - - Ok(handle) -} diff --git a/apps/gingress/src/controller/reconciler.rs b/apps/gingress/src/controller/reconciler.rs deleted file mode 100644 index c18490b..0000000 --- a/apps/gingress/src/controller/reconciler.rs +++ /dev/null @@ -1,241 +0,0 @@ -//! Reconcile loop for the GIngress controller. -//! -//! After any watcher detects a change (Ingress, Secret, Endpoints), -//! the reconciler reads all fragments from the ConfigStore, cross-references them, -//! assembles a complete `ProxyConfig`, validates it, and signals a reload. - -use gingress_proxy::config::{ConfigStore, Endpoint, ProxyConfig, RouteRule, TlsCert}; -use std::collections::{HashMap, HashSet}; -use std::sync::Arc; - -/// Reconcile the full proxy configuration from current k8s state. -pub struct Reconciler { - store: Arc, -} - -impl Reconciler { - pub fn new(store: Arc) -> Self { - Self { store } - } - - /// Trigger a full reconciliation. - /// - /// 1. Reads all route fragments (from Ingress watcher) - /// 2. Reads all TLS certs (from Secret watcher) - /// 3. Reads all upstream endpoints (from Endpoint watcher) - /// 4. Cross-references: matches TLS secrets to Ingress hosts, - /// matches upstreams to route backends - /// 5. Validates the configuration - /// 6. Writes the assembled ProxyConfig to the store - /// 7. Signals reload - pub fn reconcile(&self) { - tracing::debug!("Reconciliation started"); - - // Step 1: Gather all routes from ingress-scoped keys - // Keys look like: "ingress:/:route:" - let mut routes: HashMap> = HashMap::new(); - for key in self.store.keys_with_prefix("ingress:") { - if !key.contains(":route:") { - continue; - } - // Extract host from "ingress:/:route:" - if let Some(host) = key.split(":route:").nth(1) { - if let Some(rules) = self.store.get::>(&key) { - if !rules.is_empty() { - routes.entry(host.to_string()).or_default().extend(rules); - } - } - } - } - - // Step 2: Gather all TLS certs - let mut tls_certs: HashMap = HashMap::new(); - for key in self.store.keys_with_prefix("tls:") { - if let Some(cert) = self.store.get::(&key) { - tls_certs.insert(cert.host.clone(), cert); - } - } - - // Step 3: Gather all upstreams keyed by backend ("/:") - let mut upstreams: HashMap> = HashMap::new(); - for key in self.store.keys_with_prefix("upstream:") { - if let Some(eps) = self.store.get::>(&key) { - if !eps.is_empty() { - upstreams.insert(key.clone(), eps); - } - } - } - - // Step 4: Gather rate limits, headers, session affinity, and websocket hosts - let rate_limits = self.collect_rate_limits(&routes); - let headers = self.collect_headers(); - let session_affinity = self.collect_session_affinity(&routes); - let websocket_hosts = self.collect_websocket_hosts(); - let git_upstream = self.collect_git_backend(); - - // Step 5: Build the complete ProxyConfig - let cfg = ProxyConfig { - routes, - tls: tls_certs, - upstreams, - rate_limits, - headers, - session_affinity, - websocket_hosts, - git_upstream, - }; - - // Step 6: Validate - let warnings = self.validate_config(&cfg); - for w in &warnings { - tracing::warn!("{}", w); - } - - // Step 7: Store the assembled config as the canonical snapshot - self.store.set( - "_assembled", - &serde_json::to_value(&cfg).unwrap_or_default(), - ); - - self.store.signal_reload(); - tracing::info!( - routes = cfg.routes.len(), - tls_hosts = cfg.tls.len(), - upstreams = cfg.upstreams.len(), - warnings = warnings.len(), - "Reconciliation complete" - ); - } - - /// Cross-reference: for each Ingress TLS entry, find the Secret cert. - /// - /// The Ingress TLS section maps: `hosts: [example.com]` → `secretName: my-cert`. - /// The Secret watcher stores the cert at key `tls:`. - /// We already map secretName → host in ingress_watcher, so this is a no-op - /// when the ingress_watcher uses correct key mapping. - #[allow(dead_code)] - pub fn cross_reference_tls(&self) -> HashMap { - let mut host_certs: HashMap = HashMap::new(); - - // TLS secret name → host mapping is stored by the ingress watcher - // at key: "tls-host:" → Vec (hosts) - for key in self.store.keys_with_prefix("tls-host:") { - let secret_name = &key["tls-host:".len()..]; - let hosts: Vec = self.store.get::>(&key).unwrap_or_default(); - - // Look up the actual cert: key "tls:" (stored by secret watcher - // using the certificate's SAN/CN host, but also "tls-secret:") - let cert_key = format!("tls-secret:{}", secret_name); - if let Some(cert) = self.store.get::(&cert_key) { - for host in hosts { - host_certs.insert(host, cert.clone()); - } - } - } - - host_certs - } - - /// Validate the assembled configuration. Returns warnings. - fn validate_config(&self, cfg: &ProxyConfig) -> Vec { - let mut warnings = Vec::new(); - - // Check: every TLS host has a route - for host in cfg.tls.keys() { - if !cfg.routes.contains_key(host) { - warnings.push(format!( - "TLS configured for host '{}' but no routes exist for this host", - host - )); - } - } - - // Check: every route backend has upstream endpoints - for (host, rules) in &cfg.routes { - for rule in rules { - let backend_key = - format!("upstream:{}/{}", rule.backend.namespace, rule.backend.name); - - if !cfg.upstreams.contains_key(&backend_key) { - warnings.push(format!( - "Host '{}' routes to backend {}/{}:{} but no endpoints found", - host, rule.backend.namespace, rule.backend.name, rule.backend.port - )); - } - } - } - - // Check: orphaned upstreams (no route references them) - let mut referenced_backends: HashSet = HashSet::new(); - for rules in cfg.routes.values() { - for rule in rules { - let bk = format!("upstream:{}/{}", rule.backend.namespace, rule.backend.name); - referenced_backends.insert(bk); - } - } - for upstream_key in cfg.upstreams.keys() { - if !referenced_backends.contains(upstream_key) { - warnings.push(format!( - "Upstream '{}' has no routes referencing it (orphaned)", - upstream_key - )); - } - } - - warnings - } - - /// Collect rate limit policies for all hosts that have routes. - fn collect_rate_limits( - &self, - routes: &HashMap>, - ) -> HashMap { - let mut limits = HashMap::new(); - for host in routes.keys() { - let key = format!("rate_limit:{}", host); - if let Some(policy) = self.store.get(&key) { - limits.insert(host.clone(), policy); - } - } - limits - } - - /// Collect header operations for all hosts. - fn collect_headers(&self) -> HashMap> { - let mut headers = HashMap::new(); - for key in self.store.keys_with_prefix("headers:") { - let host = &key["headers:".len()..]; - if let Some(ops) = self.store.get(&key) { - headers.insert(host.to_string(), ops); - } - } - headers - } - - /// Collect session affinity configs for all hosts that have routes. - fn collect_session_affinity( - &self, - _routes: &HashMap>, - ) -> HashMap { - let mut affinity = HashMap::new(); - for key in self.store.keys_with_prefix("session_affinity:") { - let host = &key["session_affinity:".len()..]; - if let Some(cfg) = self.store.get(&key) { - affinity.insert(host.to_string(), cfg); - } - } - affinity - } - - /// Collect WebSocket-enabled hosts. - fn collect_websocket_hosts(&self) -> Vec { - self.store - .get::>("websocket:hosts") - .unwrap_or_default() - } - - /// Collect git backend configuration from annotation. - fn collect_git_backend(&self) -> Option { - self.store.get("git_backend") - } -} diff --git a/apps/gingress/src/controller/secret_watcher.rs b/apps/gingress/src/controller/secret_watcher.rs deleted file mode 100644 index db8e4d1..0000000 --- a/apps/gingress/src/controller/secret_watcher.rs +++ /dev/null @@ -1,166 +0,0 @@ -//! Watches Kubernetes TLS Secrets and loads certificates. -//! -//! Compatible with cert-manager: watches for Secret creation/update events -//! and parses `tls.crt` and `tls.key` into the ConfigStore for TLS termination. -//! -//! Key convention: -//! - `tls-secret:` — the raw cert, cross-referenced by reconciler -//! via the `tls-host:` mapping written by the ingress watcher. -//! - After reconciliation, the reconciler copies certs to `tls:` for -//! direct SNI lookup by the proxy. - -use futures::StreamExt; -use futures::pin_mut; -use gingress_proxy::config::{ConfigStore, TlsCert}; -use kube::ResourceExt; -use kube::runtime::watcher::{self, Event}; -use std::sync::Arc; - -/// Watch Secrets of type `kubernetes.io/tls` and update the ConfigStore. -/// -/// After each event, the `on_change` callback is invoked so the reconciler -/// can cross-reference certs with routes. -pub async fn watch_secrets( - client: Arc, - store: Arc, - _namespace: Option, - on_change: Arc, -) { - let api = kube::Api::::all(client.as_ref().clone()); - - let config = watcher::Config { - field_selector: Some("type=kubernetes.io/tls".to_string()), - ..Default::default() - }; - - let secret_watcher = watcher::watcher(api, config); - pin_mut!(secret_watcher); - - while let Some(event) = secret_watcher.next().await { - match event { - Ok(Event::Apply(secret)) => { - process_tls_secret(&secret, &store); - on_change(); - tracing::info!( - name = %secret.name_any(), - namespace = %secret.namespace().unwrap_or_default(), - "TLS Secret applied" - ); - } - Ok(Event::Init) => { - store.remove_prefix("tls-secret:"); - tracing::info!("Secret watcher re-initializing"); - } - Ok(Event::InitApply(secret)) => { - process_tls_secret(&secret, &store); - } - Ok(Event::InitDone) => { - store.signal_reload(); - on_change(); - tracing::info!("Secret watcher init complete"); - } - Ok(Event::Delete(secret)) => { - remove_tls_cert(&secret, &store); - on_change(); - tracing::info!( - name = %secret.name_any(), - "TLS Secret deleted, cert removed" - ); - } - Err(e) => { - tracing::error!("Secret watcher error: {}", e); - } - } - } -} - -/// Parse a TLS secret and store the certificate. -fn process_tls_secret(secret: &k8s_openapi::api::core::v1::Secret, store: &ConfigStore) { - let data = match &secret.data { - Some(d) => d, - None => return, - }; - - let cert_pem = match data - .get("tls.crt") - .and_then(|v| std::str::from_utf8(&v.0).ok()) - { - Some(v) => v.to_string(), - None => { - tracing::warn!(name = %secret.name_any(), "TLS Secret missing tls.crt"); - return; - } - }; - - let key_pem = match data - .get("tls.key") - .and_then(|v| std::str::from_utf8(&v.0).ok()) - { - Some(v) => v.to_string(), - None => { - tracing::warn!(name = %secret.name_any(), "TLS Secret missing tls.key"); - return; - } - }; - - let secret_name = secret.name_any(); - - // Extract SANs from the certificate to determine which hosts this cert covers - let hosts = extract_sans_from_pem(&cert_pem).unwrap_or_else(|| vec![secret_name.clone()]); - - let tls_cert = TlsCert { - host: hosts.first().cloned().unwrap_or(secret_name.clone()), - cert_pem, - key_pem, - }; - - // Store under the secret name for cross-referencing - store.set(&format!("tls-secret:{}", secret_name), &tls_cert); - - // Also store directly under each SAN host for SNI lookup - for host in &hosts { - store.set(&format!("tls:{}", host), &tls_cert); - } - - store.signal_reload(); -} - -/// Extract Subject Alternative Names from a PEM certificate. -fn extract_sans_from_pem(pem_data: &str) -> Option> { - use x509_parser::prelude::*; - - let mut reader = std::io::BufReader::new(pem_data.as_bytes()); - let certs: Vec<_> = rustls_pemfile::certs(&mut reader) - .collect::, _>>() - .ok()?; - let cert_der = certs.first()?; - let (_, cert) = X509Certificate::from_der(cert_der).ok()?; - - let mut hosts: Vec = Vec::new(); - - if let Ok(Some(san)) = cert.subject_alternative_name() { - for name in &san.value.general_names { - if let GeneralName::DNSName(dns) = name { - hosts.push(dns.to_string()); - } - } - } - - // Fallback: use CN - if hosts.is_empty() { - if let Some(cn) = cert.subject().iter_common_name().next() { - hosts.push(cn.as_str().unwrap_or_default().to_string()); - } - } - - if hosts.is_empty() { None } else { Some(hosts) } -} - -/// Remove a TLS certificate when the Secret is deleted. -fn remove_tls_cert(secret: &k8s_openapi::api::core::v1::Secret, store: &ConfigStore) { - let secret_name = secret.name_any(); - store.remove(&format!("tls-secret:{}", secret_name)); - // Also clean up tls-host mapping - store.remove(&format!("tls-host:{}", secret_name)); - store.signal_reload(); -} diff --git a/apps/gingress/src/main.rs b/apps/gingress/src/main.rs deleted file mode 100644 index 6d9099f..0000000 --- a/apps/gingress/src/main.rs +++ /dev/null @@ -1,174 +0,0 @@ -//! GIngress — Kubernetes Ingress Controller -//! -//! Control plane that watches Kubernetes resources (Ingress, Service, Endpoints, -//! Secrets) and updates the shared `ConfigStore` for the data plane. -//! -//! Architecture: -//! - Watches Ingress resources → builds routing rules -//! - Watches TLS Secrets → loads certificates -//! - Watches Endpoints → tracks upstream IPs -//! - Reconciler → diffs changes and pushes to ConfigStore + signals reload - -mod controller; - -use clap::Parser; -use gingress_proxy::config::ConfigStore; -use gingress_proxy::hot_reload; -use gingress_proxy::observability; -use gingress_proxy::server::{self, GIngressProxy}; - -#[derive(Parser)] -#[command(name = "gingress")] -struct Args { - /// Ingress class name to watch (default: "gingress") - #[arg(long, default_value = "gingress")] - ingress_class: String, - - /// Kubernetes namespace to watch (empty = all namespaces) - #[arg(long)] - namespace: Option, - - /// HTTP bind address for the proxy - #[arg(long, default_value = "0.0.0.0:80")] - bind_http: String, - - /// HTTPS bind address for the proxy - #[arg(long, default_value = "0.0.0.0:443")] - bind_https: String, - - /// Metrics bind address - #[arg(long, default_value = "0.0.0.0:8080")] - metrics_bind: String, - - /// Log level - #[arg(long, default_value = "info")] - log_level: String, - - /// OTLP endpoint (optional) - #[arg(long)] - otlp_endpoint: Option, -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - let args = Args::parse(); - - // Initialize tracing - observability::init_tracing(&args.log_level, args.otlp_endpoint.is_some()); - - // Initialize OTLP if configured - let _otel_guard = if let Some(ref endpoint) = args.otlp_endpoint { - Some(observability::init_otlp(endpoint, "gingress")?) - } else { - None - }; - - tracing::info!( - ingress_class = %args.ingress_class, - bind_http = %args.bind_http, - bind_https = %args.bind_https, - "GIngress starting" - ); - - // Shared config store between control plane and data plane - let config_store = ConfigStore::new(); - - // Start the control plane: watch k8s resources - let controller_handle = controller::start( - config_store.clone(), - args.ingress_class.clone(), - args.namespace.clone(), - ) - .await?; - - tracing::info!("Kubernetes controller started"); - - // Metrics server (for Prometheus scraping) - let metrics_handle = spawn_metrics_server(&args.metrics_bind).await?; - tracing::info!(bind = %args.metrics_bind, "Metrics server started"); - - // Build the Pingora proxy (data plane) - let proxy = GIngressProxy::new(config_store.clone()); - - // Spawn hot-reload watcher: applies config changes to the proxy - let reload_handle = hot_reload::spawn_reload_watcher(config_store.clone(), move |store| { - // Read the assembled ProxyConfig that the reconciler wrote at key "_assembled" - match store.get::("_assembled") { - Some(config_json) => { - if let Ok(cfg) = - serde_json::from_value::(config_json) - { - tracing::info!( - routes = cfg.routes.len(), - tls_hosts = cfg.tls.len(), - upstreams = cfg.upstreams.len(), - "Hot-reload: new proxy configuration applied" - ); - - // Apply TLS certificates to the proxy - for (_host, cert) in &cfg.tls { - tracing::debug!( - host = %cert.host, - "Hot-reload: TLS cert loaded for host" - ); - } - - // Apply routes to the proxy - for (host, rules) in &cfg.routes { - tracing::debug!( - host = %host, - num_rules = rules.len(), - "Hot-reload: routes configured" - ); - } - } else { - tracing::error!("Hot-reload: failed to deserialize assembled ProxyConfig"); - } - } - None => { - tracing::warn!("Hot-reload: no assembled config found (_assembled key missing)"); - } - } - }); - - // Build and run the proxy server (blocking) - let server = server::build_server(proxy, &args.bind_http, &args.bind_https)?; - - tracing::info!( - "GIngress proxy starting, listening on {} (HTTP) and {} (HTTPS)", - args.bind_http, - args.bind_https - ); - - // Run proxy in a tokio blocking task - let proxy_handle = tokio::task::spawn_blocking(move || { - server::run_server(server); - }); - - // Wait for shutdown signal - tokio::signal::ctrl_c().await?; - tracing::info!("Shutdown signal received, stopping..."); - - controller_handle.abort(); - reload_handle.abort(); - metrics_handle.abort(); - - let _ = tokio::time::timeout(std::time::Duration::from_secs(5), proxy_handle).await; - - tracing::info!("GIngress stopped"); - Ok(()) -} - -/// Spawn the metrics server for Prometheus scraping. -async fn spawn_metrics_server(bind: &str) -> anyhow::Result> { - use std::net::TcpListener; - let bind = bind.to_string(); - let listener = TcpListener::bind(&bind)?; - let handle = tokio::spawn(async move { - // Serve metrics via a minimal HTTP handler - // Uses the prometheus_exporter from observability - let _ = listener; - tracing::info!(bind = %bind, "Metrics server stopped"); - }); - Ok(handle) -} diff --git a/build.sh b/build.sh deleted file mode 100644 index 3f7b71f..0000000 --- a/build.sh +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# ── helpers ────────────────────────────────────────────────────────── -RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; NC='\033[0m' -log() { echo -e "${GREEN}[OK]${NC} $*"; } -warn() { echo -e "${YELLOW}[WARN]${NC} $*"; } -err() { echo -e "${RED}[ERR]${NC} $*"; exit 1; } - -command_exists() { command -v "$1" &>/dev/null; } - -# ── 1. Rust ───────────────────────────────────────────────────────── -if command_exists rustc; then - log "Rust $(rustc --version)" -else - warn "Rust not found, installing via rustup..." - curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y - # shellcheck disable=SC1091 - source "$HOME/.cargo/env" - log "Rust installed: $(rustc --version)" -fi - -# ── 2. Node.js ────────────────────────────────────────────────────── -if command_exists node; then - log "Node.js $(node --version)" -else - warn "Node.js not found, installing via nvm..." - curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.1/install.sh | bash - # shellcheck disable=SC1090 - export NVM_DIR="${HOME}/.nvm" - [ -s "$NVM_DIR/nvm.sh" ] && source "$NVM_DIR/nvm.sh" - nvm install --lts - log "Node.js installed: $(node --version)" -fi - -# ── 2b. Bun ───────────────────────────────────────────────────────── -if command_exists bun; then - log "Bun $(bun --version)" -else - warn "Bun not found, installing..." - curl -fsSL https://bun.sh/install | bash - # shellcheck disable=SC1091 - [ -s "$HOME/.bun/_bun" ] && export PATH="$HOME/.bun/bin:$PATH" - log "Bun installed: $(bun --version)" -fi - -# ── 3. Docker ─────────────────────────────────────────────────────── -if command_exists docker; then - log "Docker $(docker --version)" -else - warn "Docker not found, installing..." - curl -fsSL https://get.docker.com | sh - log "Docker installed: $(docker --version)" -fi - -# ── 4. Frontend build ─────────────────────────────────────────────── -log "Running bun install..." -bun install - -log "Running bun run build..." -bun run build - -# ── 5. Rust build ─────────────────────────────────────────────────── -log "Running cargo build --release --workspace..." -cargo build --release --workspace - -# ── 6. Docker images ──────────────────────────────────────────────── -TAG=$(git rev-parse --short HEAD) -log "Building Docker images with tag: $TAG" - -IMAGES=( - "docker/app.Dockerfile app:$TAG" - "docker/email.Dockerfile email-worker:$TAG" - "docker/githook.Dockerfile git-hook:$TAG" - "docker/gitserver.Dockerfile gitserver:$TAG" - "docker/metrics.Dockerfile metrics-aggregator:$TAG" - "docker/static.Dockerfile static-server:$TAG" - "docker/gingress.Dockerfile gingress:$TAG" -) - -for entry in "${IMAGES[@]}"; do - read -r dockerfile tag <<< "$entry" - log "Building $tag..." - docker build -f "$dockerfile" -t "$tag" . -done - -log "All images built successfully." -docker images | grep -E "app|email-worker|git-hook|gitserver|metrics-aggregator|static-server|gingress" | grep "$TAG" || true diff --git a/deploy.sh b/deploy.sh deleted file mode 100644 index 230c15c..0000000 --- a/deploy.sh +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# ── helpers ────────────────────────────────────────────────────────── -RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; NC='\033[0m' -log() { echo -e "${GREEN}[OK]${NC} $*"; } -warn() { echo -e "${YELLOW}[WARN]${NC} $*"; } -err() { echo -e "${RED}[ERR]${NC} $*"; exit 1; } - -command_exists() { command -v "$1" &>/dev/null; } - -# ── defaults ───────────────────────────────────────────────────────── -NAMESPACE="${NAMESPACE:-app}" -RELEASE="${RELEASE:-deploy}" -CHART_DIR="${CHART_DIR:-./deploy}" -REGISTRY="${REGISTRY:-harbor.gitdata.me/gtateam}" -TAG="${TAG:-$(git rev-parse --short HEAD)}" -CONFIG_MAP="${CONFIG_MAP:-app-env}" -PVC_NAME="${PVC_NAME:-shared-data}" - -# ── prerequisites ──────────────────────────────────────────────────── -command_exists helm || err "helm not found — install via https://helm.sh/docs/intro/install/" -command_exists kubectl || err "kubectl not found — install via https://kubernetes.io/docs/tasks/tools/" - -log "helm $(helm version --short)" -log "kubectl $(kubectl version --client --short 2>/dev/null || kubectl version -o json 2>/dev/null | grep gitVersion)" - -# ── 1. Ensure namespace (not managed by Helm — preserved on uninstall) ── -log "Ensuring namespace $NAMESPACE exists..." -kubectl create namespace "$NAMESPACE" --dry-run=client -o yaml | kubectl apply -f - - -# ── 2. Ensure prerequisites ───────────────────────────────────────── -if ! kubectl get namespace "$NAMESPACE" &>/dev/null; then - err "Namespace '$NAMESPACE' not found — create it first: kubectl create namespace $NAMESPACE" -fi - -if ! kubectl get configmap "$CONFIG_MAP" -n "$NAMESPACE" &>/dev/null; then - err "ConfigMap '$CONFIG_MAP' not found in namespace '$NAMESPACE' — create it first" -fi - -if ! kubectl get pvc "$PVC_NAME" -n "$NAMESPACE" &>/dev/null; then - err "PVC '$PVC_NAME' not found in namespace '$NAMESPACE' — create it first" -fi - -# Protect ConfigMap and PVC from accidental Helm deletion -kubectl annotate configmap "$CONFIG_MAP" -n "$NAMESPACE" helm.sh/resource-policy=keep --overwrite -kubectl annotate pvc "$PVC_NAME" -n "$NAMESPACE" helm.sh/resource-policy=keep --overwrite - -# cert-manager ClusterIssuer -if ! kubectl get clusterissuer cloudflare-acme-cluster-issuer &>/dev/null; then - warn "ClusterIssuer 'cloudflare-acme-cluster-issuer' not found — TLS certificate issuance will fail" -fi - -log "Prerequisites verified" - -# ── 3. Lint chart ──────────────────────────────────────────────────── -log "Linting Helm chart..." -helm lint "$CHART_DIR" || err "Helm lint failed" - -# ── 4. Deploy ──────────────────────────────────────────────────────── -log "Deploying release $RELEASE with tag $TAG..." - -if ! helm upgrade --install "$RELEASE" "$CHART_DIR" \ - --namespace "$NAMESPACE" \ - --set imageRegistry="$REGISTRY" \ - --set imageTag="$TAG" \ - --set configMapName="$CONFIG_MAP" \ - --timeout 5m; then - echo "" - err "Deployment FAILED — release preserved for debugging. - -Debug commands: - helm status $RELEASE -n $NAMESPACE - kubectl get pods -n $NAMESPACE - kubectl logs -n app --previous - helm rollback $RELEASE -n $NAMESPACE (rollback to previous release) - helm uninstall $RELEASE -n $NAMESPACE (remove failed release)" - -fi - -log "Release $RELEASE deployed successfully" - -# ── 5. Verify ──────────────────────────────────────────────────────── -log "Checking deployment status..." -kubectl get deployments -n "$NAMESPACE" -l app.kubernetes.io/instance="$RELEASE" -kubectl get pods -n "$NAMESPACE" -l app.kubernetes.io/instance="$RELEASE" -kubectl get services -n "$NAMESPACE" -l app.kubernetes.io/instance="$RELEASE" -kubectl get ingress -n "$NAMESPACE" - -log "Deployment complete" \ No newline at end of file diff --git a/docker/gingress.Dockerfile b/docker/gingress.Dockerfile deleted file mode 100644 index 8a9709e..0000000 --- a/docker/gingress.Dockerfile +++ /dev/null @@ -1,8 +0,0 @@ -FROM ubuntu:24.04 -RUN apt-get update && apt-get install -y --no-install-recommends \ - ca-certificates libssl3 \ - && rm -rf /var/lib/apt/lists/* -WORKDIR /app -COPY ./target/release/gingress /bin -EXPOSE 80 443 8080 -ENTRYPOINT ["gingress"] diff --git a/libs/gingress-proxy/Cargo.toml b/libs/gingress-proxy/Cargo.toml deleted file mode 100644 index bd1e939..0000000 --- a/libs/gingress-proxy/Cargo.toml +++ /dev/null @@ -1,42 +0,0 @@ -[package] -name = "gingress-proxy" -version.workspace = true -edition.workspace = true -authors.workspace = true -description = "GIngress data plane: Pingora-based HTTP/HTTPS proxy with TLS, LB, health checks, and observability" -repository.workspace = true -readme.workspace = true -homepage.workspace = true -license.workspace = true -keywords.workspace = true -categories.workspace = true -documentation.workspace = true - -[lib] -path = "src/lib.rs" -name = "gingress_proxy" - -[dependencies] -pingora = { version = "0.8", features = ["proxy"] } -pingora-proxy = "0.8" -pingora-load-balancing = "0.8" -pingora-cache = "0.8" - -tokio = { workspace = true } -serde = { workspace = true, features = ["derive"] } -serde_json = { workspace = true } -serde_yaml = { workspace = true } -rustls = { workspace = true } -rustls-pemfile = "2" -tracing = { workspace = true } -observability = { workspace = true } -anyhow = { workspace = true } -thiserror = { workspace = true } -once_cell = { workspace = true } -dashmap = { workspace = true } -futures-util = { workspace = true } -http = "1" -async-trait = { workspace = true } - -[lints] -workspace = true diff --git a/libs/gingress-proxy/src/config.rs b/libs/gingress-proxy/src/config.rs deleted file mode 100644 index 0e27d56..0000000 --- a/libs/gingress-proxy/src/config.rs +++ /dev/null @@ -1,193 +0,0 @@ -//! Shared configuration store for the GIngress proxy. -//! -//! This is the bridge between the control plane and data plane. -//! The control plane writes routing configuration here; the data plane reads it. - -use dashmap::DashMap; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::watch; - -/// A single backend service reference. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] -pub struct Backend { - pub namespace: String, - pub name: String, - pub port: u16, -} - -/// An upstream endpoint (pod IP + port). -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct Endpoint { - pub ip: String, - pub port: u16, - pub ready: bool, -} - -/// Routing rule: path prefix + backend. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RouteRule { - pub host: String, - pub path: String, - pub path_type: PathType, - pub backend: Backend, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum PathType { - Prefix, - Exact, - ImplementationSpecific, -} - -/// TLS certificate for a host. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TlsCert { - pub host: String, - pub cert_pem: String, - pub key_pem: String, -} - -/// Rate limiting policy for a host. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RateLimitPolicy { - pub host: String, - pub requests_per_second: u32, - pub burst_size: u32, -} - -/// Header operation to inject or remove. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum HeaderOp { - Set { name: String, value: String }, - Add { name: String, value: String }, - Remove { name: String }, -} - -/// Session affinity configuration. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SessionAffinityConfig { - pub enabled: bool, - pub cookie_name: String, - pub cookie_ttl_seconds: u64, -} - -/// Full proxy configuration — the single source of truth shared between planes. -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -pub struct ProxyConfig { - /// Route rules keyed by host - pub routes: HashMap>, - /// TLS certs keyed by host (SNI) - pub tls: HashMap, - /// Upstream endpoints keyed by backend identifier - pub upstreams: HashMap>, - /// Rate limit policies keyed by host - pub rate_limits: HashMap, - /// Header operations keyed by host - pub headers: HashMap>, - /// Session affinity configs keyed by host - pub session_affinity: HashMap, - /// WebSocket enabled hosts - pub websocket_hosts: Vec, - /// Git backend — requests with Git User-Agent are routed here - pub git_upstream: Option, -} - -/// The shared configuration store: read-heavy, write-light. -/// -/// Backed by `DashMap` for concurrent reads with low contention. -/// Writes trigger a `watch` notification so the data plane can hot-reload. -#[derive(Clone)] -pub struct ConfigStore { - inner: Arc, -} - -struct ConfigStoreInner { - config: Arc>, - reload_tx: watch::Sender, - reload_rx: watch::Receiver, -} - -impl ConfigStore { - /// Create a new empty config store. - pub fn new() -> Self { - let (reload_tx, reload_rx) = watch::channel(0u64); - Self { - inner: Arc::new(ConfigStoreInner { - config: Arc::new(DashMap::new()), - reload_tx, - reload_rx, - }), - } - } - - /// Get the reload notification receiver. - /// The data plane watches this for hot-reload signals. - pub fn reload_rx(&self) -> watch::Receiver { - self.inner.reload_rx.clone() - } - - /// Signal a reload. Called by the control plane after updating config. - pub fn signal_reload(&self) { - let current = *self.inner.reload_tx.borrow(); - let _ = self.inner.reload_tx.send(current.wrapping_add(1)); - } - - /// Store a typed config section by key. - pub fn set(&self, key: &str, value: &T) { - let json = serde_json::to_value(value).unwrap_or_default(); - self.inner.config.insert(key.to_string(), json); - } - - /// Read a typed config section by key. - pub fn get Deserialize<'de>>(&self, key: &str) -> Option { - self.inner - .config - .get(key) - .and_then(|v| serde_json::from_value(v.clone()).ok()) - } - - /// Remove a config section by key. - pub fn remove(&self, key: &str) { - self.inner.config.remove(key); - } - - /// Remove all keys matching a prefix (for cleanup on Ingress delete). - pub fn remove_prefix(&self, prefix: &str) { - let keys: Vec = self - .inner - .config - .iter() - .filter(|entry| entry.key().starts_with(prefix)) - .map(|entry| entry.key().clone()) - .collect(); - for key in keys { - self.inner.config.remove(&key); - } - } - - /// Get all keys matching a prefix. - pub fn keys_with_prefix(&self, prefix: &str) -> Vec { - self.inner - .config - .iter() - .filter(|entry| entry.key().starts_with(prefix)) - .map(|entry| entry.key().clone()) - .collect() - } - - /// Read the latest assembled proxy configuration. - /// - /// Returns the config written by the reconciler at key `_assembled`. - /// This is called on every request by the proxy's `upstream_peer()`. - pub fn assemble_proxy_config(&self) -> ProxyConfig { - self.get::("_assembled").unwrap_or_default() - } -} - -impl Default for ConfigStore { - fn default() -> Self { - Self::new() - } -} diff --git a/libs/gingress-proxy/src/filters/header_inject.rs b/libs/gingress-proxy/src/filters/header_inject.rs deleted file mode 100644 index e9be030..0000000 --- a/libs/gingress-proxy/src/filters/header_inject.rs +++ /dev/null @@ -1,93 +0,0 @@ -//! Header injection/deletion filter. -//! -//! Injects or removes HTTP headers on requests and responses based on per-host -//! configuration from the `ConfigStore`. Looks up the `headers:` key -//! dynamically on each request. - -use super::{FilterContext, PostFilter, PreFilter}; -use crate::config::{ConfigStore, HeaderOp}; -use pingora::proxy::Session; - -pub struct HeaderInjectFilter { - store: ConfigStore, -} - -impl HeaderInjectFilter { - pub fn new(store: ConfigStore) -> Self { - Self { store } - } - - /// Resolve header operations for the host in the current session. - fn ops_for_session(&self, session: &Session) -> Vec { - let host = session - .req_header() - .headers - .get("host") - .and_then(|v| v.to_str().ok()) - .unwrap_or_default(); - - self.store - .get::>(&format!("headers:{}", host)) - .unwrap_or_default() - } -} - -fn apply_header_ops(header_map: &mut http::HeaderMap, ops: &[HeaderOp]) { - for op in ops { - match op { - HeaderOp::Set { name, value } => { - if let (Ok(key), Ok(val)) = ( - http::HeaderName::from_bytes(name.as_bytes()), - http::HeaderValue::from_str(value), - ) { - header_map.insert(key, val); - } - } - HeaderOp::Add { name, value } => { - if let (Ok(key), Ok(val)) = ( - http::HeaderName::from_bytes(name.as_bytes()), - http::HeaderValue::from_str(value), - ) { - header_map.append(key, val); - } - } - HeaderOp::Remove { name } => { - if let Ok(key) = http::HeaderName::from_bytes(name.as_bytes()) { - header_map.remove(key); - } - } - } - } -} - -impl PreFilter for HeaderInjectFilter { - fn name(&self) -> &'static str { - "header_inject" - } - - fn filter( - &self, - session: &mut Session, - _ctx: &mut FilterContext, - ) -> Result<(), Box> { - let ops = self.ops_for_session(session); - if !ops.is_empty() { - apply_header_ops(&mut session.req_header_mut().headers, &ops); - } - Ok(()) - } -} - -impl PostFilter for HeaderInjectFilter { - fn name(&self) -> &'static str { - "header_inject" - } - - fn filter( - &self, - _session: &mut Session, - _ctx: &FilterContext, - ) -> Result<(), Box> { - Ok(()) - } -} diff --git a/libs/gingress-proxy/src/filters/mod.rs b/libs/gingress-proxy/src/filters/mod.rs deleted file mode 100644 index 53f12e4..0000000 --- a/libs/gingress-proxy/src/filters/mod.rs +++ /dev/null @@ -1,114 +0,0 @@ -//! Request/response filter chain for the GIngress proxy. -//! -//! Filters are applied in order: before proxying (pre-filters) and after the -//! upstream response (post-filters). Each filter implements a simple trait. - -pub mod header_inject; -pub mod rate_limit; -pub mod real_ip; -pub mod session_sticky; -pub mod ws_upgrade; - -use pingora::proxy::Session; - -/// Context passed through the filter chain for a single request. -pub struct FilterContext { - /// Real client IP discovered by the real_ip filter. - pub real_ip: Option, - /// Session sticky key (e.g., cookie value). - pub sticky_key: Option, - /// Whether this is a WebSocket upgrade request. - pub is_websocket: bool, - /// The resolved upstream endpoint (set by load balancer). - pub upstream_endpoint: Option, -} - -impl Default for FilterContext { - fn default() -> Self { - Self { - real_ip: None, - sticky_key: None, - is_websocket: false, - upstream_endpoint: None, - } - } -} - -/// Pre-filter: runs before forwarding the request to the upstream. -pub trait PreFilter: Send + Sync { - fn name(&self) -> &'static str; - fn filter( - &self, - session: &mut Session, - ctx: &mut FilterContext, - ) -> Result<(), Box>; -} - -/// Post-filter: runs after receiving the upstream response. -pub trait PostFilter: Send + Sync { - fn name(&self) -> &'static str; - fn filter( - &self, - session: &mut Session, - ctx: &FilterContext, - ) -> Result<(), Box>; -} - -/// A chain of filters applied sequentially. -pub struct FilterChain { - pre_filters: Vec>, - post_filters: Vec>, -} - -impl FilterChain { - pub fn new() -> Self { - Self { - pre_filters: Vec::new(), - post_filters: Vec::new(), - } - } - - pub fn add_pre(&mut self, filter: Box) { - self.pre_filters.push(filter); - } - - pub fn add_post(&mut self, filter: Box) { - self.post_filters.push(filter); - } - - /// Run all pre-filters. Stops on first error. - pub fn run_pre( - &self, - session: &mut Session, - ctx: &mut FilterContext, - ) -> Result<(), Box> { - for f in &self.pre_filters { - f.filter(session, ctx).map_err(|e| { - tracing::error!(filter = f.name(), error = %e, "Pre-filter failed"); - e - })?; - } - Ok(()) - } - - /// Run all post-filters. Stops on first error. - pub fn run_post( - &self, - session: &mut Session, - ctx: &FilterContext, - ) -> Result<(), Box> { - for f in &self.post_filters { - f.filter(session, ctx).map_err(|e| { - tracing::error!(filter = f.name(), error = %e, "Post-filter failed"); - e - })?; - } - Ok(()) - } -} - -impl Default for FilterChain { - fn default() -> Self { - Self::new() - } -} diff --git a/libs/gingress-proxy/src/filters/rate_limit.rs b/libs/gingress-proxy/src/filters/rate_limit.rs deleted file mode 100644 index 9a99d8b..0000000 --- a/libs/gingress-proxy/src/filters/rate_limit.rs +++ /dev/null @@ -1,78 +0,0 @@ -//! Rate limiting filter using token bucket algorithm. -//! -//! Per-host rate limiting with configurable requests-per-second and burst tolerance. - -use super::{FilterContext, PreFilter}; -use dashmap::DashMap; -use pingora::proxy::Session; -use std::sync::Arc; -use std::time::Instant; - -/// Token bucket rate limiter state for a single key (IP or host). -struct TokenBucket { - tokens: f64, - last_refill: Instant, - max_tokens: f64, - refill_rate: f64, // tokens per second -} - -pub struct RateLimitFilter { - buckets: Arc>, - default_rate: f64, - default_burst: f64, -} - -impl RateLimitFilter { - pub fn new(default_requests_per_second: u32, default_burst_size: u32) -> Self { - Self { - buckets: Arc::new(DashMap::new()), - default_rate: default_requests_per_second as f64, - default_burst: default_burst_size as f64, - } - } - - /// Check if a request identified by `key` is allowed. - /// Returns true if allowed, false if rate-limited. - fn check_rate(&self, key: &str) -> bool { - let now = Instant::now(); - let mut bucket = self - .buckets - .entry(key.to_string()) - .or_insert_with(|| TokenBucket { - tokens: self.default_burst, - last_refill: now, - max_tokens: self.default_burst, - refill_rate: self.default_rate, - }); - - // Refill tokens based on elapsed time - let elapsed = now.duration_since(bucket.last_refill).as_secs_f64(); - bucket.tokens = (bucket.tokens + elapsed * bucket.refill_rate).min(bucket.max_tokens); - bucket.last_refill = now; - - if bucket.tokens >= 1.0 { - bucket.tokens -= 1.0; - true - } else { - false - } - } -} - -impl PreFilter for RateLimitFilter { - fn name(&self) -> &'static str { - "rate_limit" - } - - fn filter( - &self, - _session: &mut Session, - ctx: &mut FilterContext, - ) -> Result<(), Box> { - let key = ctx.real_ip.clone().unwrap_or_else(|| "unknown".to_string()); - if !self.check_rate(&key) { - return Err("rate limit exceeded".into()); - } - Ok(()) - } -} diff --git a/libs/gingress-proxy/src/filters/real_ip.rs b/libs/gingress-proxy/src/filters/real_ip.rs deleted file mode 100644 index 95a5c7f..0000000 --- a/libs/gingress-proxy/src/filters/real_ip.rs +++ /dev/null @@ -1,61 +0,0 @@ -//! Real IP discovery filter. -//! -//! Extracts the real client IP from `X-Forwarded-For` header or Proxy Protocol. -//! Sets it in the filter context for downstream use (logging, rate limiting, etc.). - -use super::{FilterContext, PreFilter}; -use pingora::proxy::Session; - -pub struct RealIpFilter { - /// Whether to trust Proxy Protocol headers (TCP-level). - #[allow(dead_code)] - trust_proxy_protocol: bool, - /// Maximum number of trusted proxy hops. - trusted_hops: usize, -} - -impl RealIpFilter { - pub fn new(trust_proxy_protocol: bool, trusted_hops: usize) -> Self { - Self { - trust_proxy_protocol, - trusted_hops, - } - } -} - -impl PreFilter for RealIpFilter { - fn name(&self) -> &'static str { - "real_ip" - } - - fn filter( - &self, - session: &mut Session, - ctx: &mut FilterContext, - ) -> Result<(), Box> { - // Extract from X-Forwarded-For header - if let Some(xff) = session.req_header().headers.get("x-forwarded-for") { - if let Ok(val) = xff.to_str() { - // X-Forwarded-For: client, proxy1, proxy2 - let ips: Vec<&str> = val.split(',').map(|s| s.trim()).collect(); - let client_idx = ips.len().saturating_sub(1 + self.trusted_hops); - if let Some(client_ip) = ips.get(client_idx) { - ctx.real_ip = Some(client_ip.to_string()); - } - } - } - - // Fallback: use the direct connection IP - if ctx.real_ip.is_none() { - ctx.real_ip = Some(session.client_addr().map_or_else( - || "unknown".to_string(), - |addr| { - addr.as_inet() - .map_or_else(|| "unknown".to_string(), |inet| inet.ip().to_string()) - }, - )); - } - - Ok(()) - } -} diff --git a/libs/gingress-proxy/src/filters/session_sticky.rs b/libs/gingress-proxy/src/filters/session_sticky.rs deleted file mode 100644 index 7f96fb2..0000000 --- a/libs/gingress-proxy/src/filters/session_sticky.rs +++ /dev/null @@ -1,81 +0,0 @@ -//! Session affinity (sticky sessions) filter. -//! -//! Uses a cookie to route consecutive requests from the same client -//! to the same upstream backend, enabling sticky sessions. - -use super::{FilterContext, PostFilter, PreFilter}; -use pingora::proxy::Session; - -const DEFAULT_COOKIE_NAME: &str = "GINGRESS_SESSION"; - -pub struct SessionStickyFilter { - cookie_name: String, - cookie_ttl_seconds: u64, -} - -impl SessionStickyFilter { - pub fn new(cookie_name: Option, cookie_ttl_seconds: u64) -> Self { - Self { - cookie_name: cookie_name.unwrap_or_else(|| DEFAULT_COOKIE_NAME.to_string()), - cookie_ttl_seconds, - } - } -} - -impl PreFilter for SessionStickyFilter { - fn name(&self) -> &'static str { - "session_sticky" - } - - fn filter( - &self, - session: &mut Session, - ctx: &mut FilterContext, - ) -> Result<(), Box> { - // Extract existing sticky cookie - if let Some(cookie_val) = extract_cookie(session, &self.cookie_name) { - ctx.sticky_key = Some(cookie_val); - } else { - // Generate a new sticky key - ctx.sticky_key = Some(generate_sticky_id()); - } - Ok(()) - } -} - -impl PostFilter for SessionStickyFilter { - fn name(&self) -> &'static str { - "session_sticky" - } - - fn filter( - &self, - session: &mut Session, - ctx: &FilterContext, - ) -> Result<(), Box> { - // Set the sticky cookie if a new key was generated - if let Some(ref sticky_key) = ctx.sticky_key { - // Set-Cookie would be injected here. - // Pingora response header API usage depends on version. - let _ = session; - let _ = sticky_key; - let _ = &self.cookie_ttl_seconds; - } - Ok(()) - } -} - -/// Generate a random sticky session ID. -fn generate_sticky_id() -> String { - use std::time::{SystemTime, UNIX_EPOCH}; - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default(); - let entropy = (now.as_nanos() as u64) ^ (now.as_secs() << 16); - format!("{:016x}", entropy) -} - -/// Extract a cookie value from the request headers. -fn extract_cookie(_session: &Session, _cookie_name: &str) -> Option { - None // Placeholder: full impl parses Cookie header -} diff --git a/libs/gingress-proxy/src/filters/ws_upgrade.rs b/libs/gingress-proxy/src/filters/ws_upgrade.rs deleted file mode 100644 index a44eb42..0000000 --- a/libs/gingress-proxy/src/filters/ws_upgrade.rs +++ /dev/null @@ -1,57 +0,0 @@ -//! WebSocket upgrade filter. -//! -//! Detects WebSocket upgrade requests and sets the `is_websocket` flag in -//! the filter context. Pingora natively supports HTTP upgrade semantics. - -use super::{FilterContext, PreFilter}; -use pingora::proxy::Session; - -pub struct WsUpgradeFilter; - -impl WsUpgradeFilter { - pub fn new() -> Self { - Self - } -} - -impl PreFilter for WsUpgradeFilter { - fn name(&self) -> &'static str { - "ws_upgrade" - } - - fn filter( - &self, - session: &mut Session, - ctx: &mut FilterContext, - ) -> Result<(), Box> { - let headers = &session.req_header().headers; - - let is_upgrade = headers - .get("upgrade") - .and_then(|v| v.to_str().ok()) - .map(|v| v.to_lowercase() == "websocket") - .unwrap_or(false); - - let has_connection_upgrade = headers - .get("connection") - .and_then(|v| v.to_str().ok()) - .map(|v| v.to_lowercase().contains("upgrade")) - .unwrap_or(false); - - let has_ws_key = headers.get("sec-websocket-key").is_some(); - let has_ws_version = headers.get("sec-websocket-version").is_some(); - - if is_upgrade && has_connection_upgrade && has_ws_key && has_ws_version { - ctx.is_websocket = true; - tracing::debug!("WebSocket upgrade detected"); - } - - Ok(()) - } -} - -impl Default for WsUpgradeFilter { - fn default() -> Self { - Self::new() - } -} diff --git a/libs/gingress-proxy/src/health_checker.rs b/libs/gingress-proxy/src/health_checker.rs deleted file mode 100644 index 3a366e6..0000000 --- a/libs/gingress-proxy/src/health_checker.rs +++ /dev/null @@ -1,73 +0,0 @@ -//! Active and passive health checks for upstream endpoints. -//! -//! - Active: periodically probes upstream `/health` endpoints. -//! - Passive: tracks request failure rates and marks unhealthy endpoints. - -use crate::config::Endpoint; -use std::sync::Arc; -use tokio::sync::RwLock; - -/// Health checker for a pool of upstream endpoints. -pub struct HealthChecker { - endpoints: Arc>>, - /// How often to run active health probes. - #[allow(dead_code)] - interval: std::time::Duration, - /// Failure threshold for passive health checks. - #[allow(dead_code)] - passive_fail_threshold: u32, - /// Success threshold for recovery. - #[allow(dead_code)] - passive_success_threshold: u32, -} - -impl HealthChecker { - /// Create a new health checker. - pub fn new(endpoints: Vec, interval: std::time::Duration) -> Self { - Self { - endpoints: Arc::new(RwLock::new(endpoints)), - interval, - passive_fail_threshold: 3, - passive_success_threshold: 2, - } - } - - /// Update the endpoint pool. - pub async fn update_endpoints(&self, new_endpoints: Vec) { - let mut eps = self.endpoints.write().await; - *eps = new_endpoints; - } - - /// Run active health probes in a background task. - pub fn spawn_active_probes(self: Arc) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { - // Active probing loop would go here. - // For now a placeholder that keeps endpoints as-is. - loop { - tokio::time::sleep(std::time::Duration::from_secs(10)).await; - } - }) - } - - /// Mark an endpoint as failing (passive check). - /// Returns true if this endpoint should now be considered unhealthy. - #[allow(dead_code)] - pub async fn record_failure(&self, endpoint_ip: &str) -> bool { - let mut eps = self.endpoints.write().await; - if let Some(ep) = eps.iter_mut().find(|e| e.ip == endpoint_ip) { - // Track failure count via metadata (simplified: uses ready field) - // Full impl would add failure_count field to Endpoint - let _ = ep; - return false; // Placeholder - } - false - } - - /// Mark an endpoint as successful (passive check). - #[allow(dead_code)] - pub async fn record_success(&self, endpoint_ip: &str) { - let _eps = self.endpoints.write().await; - // Reset failure count for the endpoint - let _ = endpoint_ip; - } -} diff --git a/libs/gingress-proxy/src/hot_reload.rs b/libs/gingress-proxy/src/hot_reload.rs deleted file mode 100644 index a05d278..0000000 --- a/libs/gingress-proxy/src/hot_reload.rs +++ /dev/null @@ -1,64 +0,0 @@ -//! Hot reload support for the GIngress proxy. -//! -//! Watches the `ConfigStore` reload channel and gracefully applies -//! configuration changes without dropping active connections. - -use crate::config::ConfigStore; - -/// Hot-reload watcher that listens for config changes. -pub struct HotReloadWatcher { - store: ConfigStore, - current_version: u64, -} - -impl HotReloadWatcher { - pub fn new(store: ConfigStore) -> Self { - Self { - store, - current_version: 0, - } - } - - /// Wait for the next configuration change. - /// Returns true if config changed, false if the channel was closed. - pub async fn wait_for_change(&mut self) -> bool { - let mut rx = self.store.reload_rx(); - loop { - match rx.changed().await { - Ok(()) => { - let new_version = *rx.borrow(); - if new_version != self.current_version { - self.current_version = new_version; - return true; - } - } - Err(_) => return false, - } - } - } - - /// Get the current configuration snapshot. - pub fn store(&self) -> &ConfigStore { - &self.store - } -} - -/// Spawn a background task that watches for config changes and applies them. -/// -/// The `on_reload` callback is invoked each time the config changes. -pub fn spawn_reload_watcher(store: ConfigStore, on_reload: F) -> tokio::task::JoinHandle<()> -where - F: Fn(&ConfigStore) + Send + 'static, -{ - tokio::spawn(async move { - let mut watcher = HotReloadWatcher::new(store); - loop { - if !watcher.wait_for_change().await { - tracing::info!("Config reload channel closed, stopping watcher"); - break; - } - tracing::info!("Config change detected, reloading..."); - on_reload(watcher.store()); - } - }) -} diff --git a/libs/gingress-proxy/src/lib.rs b/libs/gingress-proxy/src/lib.rs deleted file mode 100644 index c8f60d4..0000000 --- a/libs/gingress-proxy/src/lib.rs +++ /dev/null @@ -1,27 +0,0 @@ -//! GIngress data plane: Pingora-based HTTP/HTTPS reverse proxy. -//! -//! ## Architecture -//! -//! The data plane receives traffic and proxies it to upstream services -//! based on configuration from the shared `ConfigStore`. The control plane -//! (apps/gingress) updates the `ConfigStore` by watching Kubernetes resources. -//! -//! ## Components -//! -//! - `config` — Shared configuration store (routes, TLS certs, upstreams, policies) -//! - `server` — Pingora server lifecycle and proxy bridge -//! - `tls` — TLS termination via rustls with SNI-based certificate selection -//! - `load_balancer` — Upstream selection algorithms -//! - `health_checker` — Active and passive upstream health checks -//! - `filters` — Request/response filter chain (real IP, headers, rate limit, sticky, WS) -//! - `observability` — Prometheus metrics and OTLP tracing -//! - `hot_reload` — Graceful config reload without dropping connections - -pub mod config; -pub mod filters; -pub mod health_checker; -pub mod hot_reload; -pub mod load_balancer; -pub mod observability; -pub mod server; -pub mod tls; diff --git a/libs/gingress-proxy/src/load_balancer.rs b/libs/gingress-proxy/src/load_balancer.rs deleted file mode 100644 index 34708c2..0000000 --- a/libs/gingress-proxy/src/load_balancer.rs +++ /dev/null @@ -1,66 +0,0 @@ -//! Load balancing algorithms for upstream selection. -//! -//! Uses Pingora's built-in load balancing primitives where possible, -//! with custom extensions for session-affinity consistent hashing. - -use crate::config::Endpoint; -use std::sync::atomic::{AtomicUsize, Ordering}; - -/// Load balancer that selects an upstream endpoint from a pool. -#[derive(Debug)] -pub struct LoadBalancer { - endpoints: Vec, - counter: AtomicUsize, -} - -impl LoadBalancer { - /// Create a new load balancer with the given endpoints. - pub fn new(endpoints: Vec) -> Self { - Self { - endpoints, - counter: AtomicUsize::new(0), - } - } - - /// Update the endpoint pool (e.g., on hot-reload). - pub fn update_endpoints(&mut self, endpoints: Vec) { - self.endpoints = endpoints; - } - - /// Select an endpoint using round-robin. - pub fn round_robin(&self) -> Option<&Endpoint> { - let healthy: Vec<&Endpoint> = self.endpoints.iter().filter(|e| e.ready).collect(); - if healthy.is_empty() { - return None; - } - let idx = self.counter.fetch_add(1, Ordering::Relaxed) % healthy.len(); - Some(healthy[idx]) - } - - /// Select an endpoint using the least-connections strategy (placeholder). - /// - /// Full implementation would track active connections per endpoint. - pub fn least_connections(&self) -> Option<&Endpoint> { - self.endpoints.iter().filter(|e| e.ready).min_by_key(|_| { - // Placeholder: return 0 for now. Real impl would track connection counts. - 0usize - }) - } - - /// Consistent hash selection based on a key (for session affinity). - pub fn consistent_hash(&self, key: &str) -> Option<&Endpoint> { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; - - let healthy: Vec<&Endpoint> = self.endpoints.iter().filter(|e| e.ready).collect(); - if healthy.is_empty() { - return None; - } - - let mut hasher = DefaultHasher::new(); - key.hash(&mut hasher); - let hash = hasher.finish(); - let idx = hash as usize % healthy.len(); - Some(healthy[idx]) - } -} diff --git a/libs/gingress-proxy/src/observability.rs b/libs/gingress-proxy/src/observability.rs deleted file mode 100644 index 44be0da..0000000 --- a/libs/gingress-proxy/src/observability.rs +++ /dev/null @@ -1,82 +0,0 @@ -//! Observability integration for the GIngress proxy. -//! -//! Reuses the workspace `observability` lib for tracing initialization, -//! and adds GIngress-specific Prometheus metrics. - -use std::sync::Arc; - -/// GIngress-specific HTTP metrics. -/// -/// Extends the workspace `observability::HttpMetrics` with ingress-specific counters. -pub struct IngressMetrics { - /// Total requests per host - pub requests_total: Arc>, - /// Active WebSocket connections - pub ws_connections_active: Arc, - /// Upstream health status (0 = unhealthy, 1 = healthy) - pub upstream_health: Arc>, - /// TLS certificate expiry timestamps - pub tls_cert_expiry: Arc>, -} - -impl IngressMetrics { - pub fn new() -> Self { - Self { - requests_total: Arc::new(dashmap::DashMap::new()), - ws_connections_active: Arc::new(std::sync::atomic::AtomicU64::new(0)), - upstream_health: Arc::new(dashmap::DashMap::new()), - tls_cert_expiry: Arc::new(dashmap::DashMap::new()), - } - } - - /// Record a request for a given host and status code. - pub fn record_request(&self, host: &str, _status: u16) { - self.requests_total - .entry(host.to_string()) - .and_modify(|c| *c += 1) - .or_insert(1); - } - - /// Record WebSocket connection opened. - pub fn ws_open(&self) { - self.ws_connections_active - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - - /// Record WebSocket connection closed. - pub fn ws_close(&self) { - self.ws_connections_active - .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); - } - - /// Update upstream health status. - pub fn set_upstream_health(&self, upstream: &str, healthy: bool) { - self.upstream_health - .insert(upstream.to_string(), if healthy { 1 } else { 0 }); - } - - /// Record TLS cert expiry time. - pub fn set_cert_expiry(&self, host: &str, expiry_unix: i64) { - self.tls_cert_expiry.insert(host.to_string(), expiry_unix); - } -} - -impl Default for IngressMetrics { - fn default() -> Self { - Self::new() - } -} - -/// Initialize tracing via the workspace observability lib. -pub fn init_tracing(level: &str, otel_enabled: bool) { - observability::init_tracing_subscriber(level, otel_enabled); -} - -/// Initialize OTLP export for distributed tracing. -pub fn init_otlp( - endpoint: &str, - service_name: &str, -) -> anyhow::Result> { - observability::init_otlp(endpoint, service_name, "0.1.0", "info") - .map_err(|e| anyhow::anyhow!("OTLP init failed: {}", e)) -} diff --git a/libs/gingress-proxy/src/server.rs b/libs/gingress-proxy/src/server.rs deleted file mode 100644 index 612ca83..0000000 --- a/libs/gingress-proxy/src/server.rs +++ /dev/null @@ -1,193 +0,0 @@ -//! Pingora server lifecycle and proxy bridge. -//! -//! Manages the Pingora server process, connects it to the ConfigStore, -//! and implements the proxy logic (request routing, upstream selection, filtering). - -use crate::config::ConfigStore; -use crate::filters::header_inject::HeaderInjectFilter; -use crate::filters::{FilterChain, FilterContext}; -use pingora::proxy::Session; -use pingora::server::Server; -use pingora::upstreams::peer::HttpPeer; -use pingora_proxy::ProxyHttp; -use std::sync::Arc; - -/// GIngress proxy service — the core proxy implementation. -pub struct GIngressProxy { - pub config: ConfigStore, - pub metrics: Arc, - filter_chain: Arc>, -} - -impl GIngressProxy { - /// Create a new proxy service. - pub fn new(config: ConfigStore) -> Self { - let mut filter_chain = FilterChain::new(); - // Add default filters - filter_chain.add_pre(Box::new(crate::filters::real_ip::RealIpFilter::new( - true, 1, - ))); - filter_chain.add_pre(Box::new(HeaderInjectFilter::new(config.clone()))); - filter_chain.add_pre(Box::new(crate::filters::ws_upgrade::WsUpgradeFilter::new())); - - Self { - config, - metrics: Arc::new(crate::observability::IngressMetrics::new()), - filter_chain: Arc::new(std::sync::RwLock::new(filter_chain)), - } - } - - /// Get the filter chain for external management. - pub fn filter_chain(&self) -> &Arc> { - &self.filter_chain - } - - /// Match a request to a route rule based on host and path. - fn match_route(cfg: &crate::config::ProxyConfig, host: &str, path: &str) -> Option { - cfg.routes - .get(host) - .and_then(|rules| { - rules.iter().find(|r| match r.path_type { - crate::config::PathType::Prefix - | crate::config::PathType::ImplementationSpecific => path.starts_with(&r.path), - crate::config::PathType::Exact => path == r.path, - }) - }) - .map(|r| { - format!( - "upstream:{}/{}:{}", - r.backend.namespace, r.backend.name, r.backend.port - ) - }) - } -} - -#[async_trait::async_trait] -impl ProxyHttp for GIngressProxy { - type CTX = FilterContext; - - fn new_ctx(&self) -> Self::CTX { - FilterContext::default() - } - - async fn upstream_peer( - &self, - session: &mut Session, - ctx: &mut Self::CTX, - ) -> pingora::Result> { - let host = session - .req_header() - .headers - .get("host") - .and_then(|v| v.to_str().ok()) - .map(|v| v.to_string()) - .unwrap_or_default(); - - let cfg = self.config.assemble_proxy_config(); - - // Git User-Agent override: requests from git clients (git/2.x, JGit, etc.) - // are routed directly to the git backend regardless of host/path matching. - let backend_key = if let Some(ref git_backend) = cfg.git_upstream { - let ua = session - .req_header() - .headers - .get("user-agent") - .and_then(|v| v.to_str().ok()) - .unwrap_or(""); - - if ua.starts_with("git/") || ua.starts_with("JGit/") { - tracing::debug!(host, ua, "Git UA detected, routing to git backend"); - Some(format!( - "upstream:{}/{}:{}", - git_backend.namespace, git_backend.name, git_backend.port - )) - } else { - // Normal route matching for non-git requests - Self::match_route(&cfg, &host, session.req_header().uri.path()) - } - } else { - Self::match_route(&cfg, &host, session.req_header().uri.path()) - }; - - // Select endpoint via load balancer - let endpoint = backend_key - .as_ref() - .and_then(|key| cfg.upstreams.get(key)) - .and_then(|eps| { - let lb = crate::load_balancer::LoadBalancer::new(eps.clone()); - match ctx.sticky_key { - Some(ref key) => lb.consistent_hash(key).cloned(), - None => lb.round_robin().cloned(), - } - }); - - match endpoint { - Some(ep) => { - let addr = format!("{}:{}", ep.ip, ep.port); - ctx.upstream_endpoint = Some(addr.clone()); - let peer = HttpPeer::new(addr, false, String::new()); - Ok(Box::new(peer)) - } - None => pingora::Error::e_explain( - pingora::ErrorType::InternalError, - format!( - "no upstream found for host '{}' path '{}'", - host, - session.req_header().uri.path() - ), - ), - } - } - - async fn request_filter( - &self, - session: &mut Session, - ctx: &mut Self::CTX, - ) -> pingora::Result { - self.filter_chain - .read() - .unwrap() - .run_pre(session, ctx) - .map_err(|e| { - pingora::Error::because(pingora::ErrorType::InternalError, "pre-filter failed", e) - })?; - Ok(false) - } -} - -/// Build and configure the Pingora server. -/// -/// Takes ownership of the `GIngressProxy` and wraps it in a -/// `pingora_proxy::http_proxy_service` which implements the required -/// `ServiceWithDependents` trait. -pub fn build_server( - proxy: GIngressProxy, - bind_http: &str, - bind_https: &str, -) -> anyhow::Result { - let mut server = Server::new(Some(pingora::server::configuration::Opt { - upgrade: true, // Enable HTTP upgrade (for WebSocket) - ..Default::default() - }))?; - - server.bootstrap(); - - let mut http_service = - pingora_proxy::http_proxy_service_with_name(&server.configuration, proxy, "gingress"); - http_service.add_tcp(bind_http); - - // HTTPS with TLS would be added via add_tls on a separate service instance. - tracing::info!( - bind_http = %bind_http, - bind_https = %bind_https, - "GIngress proxy server configured" - ); - - server.add_service(http_service); - Ok(server) -} - -/// Run the proxy server (blocking). -pub fn run_server(server: Server) { - server.run_forever(); -} diff --git a/libs/gingress-proxy/src/tls.rs b/libs/gingress-proxy/src/tls.rs deleted file mode 100644 index 9512cbc..0000000 --- a/libs/gingress-proxy/src/tls.rs +++ /dev/null @@ -1,109 +0,0 @@ -//! TLS termination using rustls with SNI-based multi-certificate support. -//! -//! Certificates are loaded from the `ConfigStore`, populated by the control plane -//! watching Kubernetes TLS Secrets (from cert-manager or manual creation). - -use crate::config::ConfigStore; -use anyhow::Context; -use rustls::ServerConfig; -use rustls::server::ResolvesServerCert; -use rustls::sign::CertifiedKey; -use std::collections::HashMap; -use std::fmt; -use std::sync::Arc; - -/// SNI-based certificate resolver. -/// -/// Selects the appropriate TLS certificate based on the client's SNI hostname. -pub struct SniResolver { - certs: HashMap>, - default: Option>, -} - -impl fmt::Debug for SniResolver { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SniResolver") - .field("num_certs", &self.certs.len()) - .finish() - } -} - -impl SniResolver { - pub fn new() -> Self { - Self { - certs: HashMap::new(), - default: None, - } - } - - /// Load certificates from the config store. - pub fn load_from_config(&mut self, store: &ConfigStore) -> anyhow::Result<()> { - let _ = store; - Ok(()) - } - - /// Add a certificate for a specific hostname. - pub fn add_cert(&mut self, host: &str, cert_pem: &str, key_pem: &str) -> anyhow::Result<()> { - let cert_chain = rustls_pemfile::certs(&mut cert_pem.as_bytes()) - .collect::, _>>() - .context("Failed to parse certificate PEM")?; - - let key_der = rustls_pemfile::private_key(&mut key_pem.as_bytes()) - .context("Failed to parse private key PEM")? - .context("No private key found in PEM")?; - - let signing_key = rustls::crypto::ring::sign::any_supported_type(&key_der) - .context("Unsupported private key type")?; - - let certified_key = Arc::new(CertifiedKey::new(cert_chain, signing_key)); - - if self.default.is_none() { - self.default = Some(certified_key.clone()); - } - - self.certs.insert(host.to_string(), certified_key); - Ok(()) - } - - /// Remove a certificate for a hostname. - pub fn remove_cert(&mut self, host: &str) { - let removed = self.certs.remove(host); - // If we removed the default, pick another one - if let Some(ref removed_key) = removed { - if self - .default - .as_ref() - .map_or(false, |d| Arc::ptr_eq(d, removed_key)) - { - self.default = self.certs.values().next().cloned(); - } - } - } -} - -impl ResolvesServerCert for SniResolver { - fn resolve(&self, client_hello: rustls::server::ClientHello) -> Option> { - if let Some(name) = client_hello.server_name() { - // Try exact match - if let Some(cert) = self.certs.get(name) { - return Some(cert.clone()); - } - // Try wildcard matching - if let Some(dot) = name.find('.') { - let wildcard = format!("*{}", &name[dot..]); - if let Some(cert) = self.certs.get(&wildcard) { - return Some(cert.clone()); - } - } - } - self.default.clone() - } -} - -/// Build a rustls `ServerConfig` with the given SNI resolver. -pub fn build_server_config(resolver: SniResolver) -> anyhow::Result> { - let config = ServerConfig::builder() - .with_no_client_auth() - .with_cert_resolver(Arc::new(resolver)); - Ok(Arc::new(config)) -} diff --git a/push.sh b/push.sh deleted file mode 100644 index 1718ba0..0000000 --- a/push.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -REGISTRY="${REGISTRY:-harbor.gitdata.me/gtateam}" -TAG="${TAG:-$(git rev-parse --short HEAD)}" - -RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; NC='\033[0m' -log() { echo -e "${GREEN}[OK]${NC} $*"; } -err() { echo -e "${RED}[ERR]${NC} $*"; exit 1; } - -# ── credentials ───────────────────────────────────────────────────── -: "${DOCKER_USERNAME:?DOCKER_USERNAME env var required}" -: "${DOCKER_PASSWORD:?DOCKER_PASSWORD env var required}" - -log "Logging into $REGISTRY..." -echo "$DOCKER_PASSWORD" | docker login "$REGISTRY" -u "$DOCKER_USERNAME" --password-stdin - -# ── tag & push ────────────────────────────────────────────────────── -IMAGES=(app email-worker git-hook gitserver metrics-aggregator static-server gingress) - -for name in "${IMAGES[@]}"; do - SRC="${name}:${TAG}" - DST="${REGISTRY}/${name}:${TAG}" - - log "Tagging $SRC -> $DST" - docker tag "$SRC" "$DST" - - log "Pushing $DST" - docker push "$DST" -done - -log "All images pushed to $REGISTRY" diff --git a/src/app/chat/ChatConversationList.tsx b/src/components/chat/ChatConversationList.tsx similarity index 100% rename from src/app/chat/ChatConversationList.tsx rename to src/components/chat/ChatConversationList.tsx diff --git a/src/app/chat/ChatHeader.tsx b/src/components/chat/ChatHeader.tsx similarity index 100% rename from src/app/chat/ChatHeader.tsx rename to src/components/chat/ChatHeader.tsx diff --git a/src/app/chat/ChatMessageBubble.tsx b/src/components/chat/ChatMessageBubble.tsx similarity index 100% rename from src/app/chat/ChatMessageBubble.tsx rename to src/components/chat/ChatMessageBubble.tsx diff --git a/src/app/chat/ChatMessageInput.tsx b/src/components/chat/ChatMessageInput.tsx similarity index 100% rename from src/app/chat/ChatMessageInput.tsx rename to src/components/chat/ChatMessageInput.tsx diff --git a/src/app/chat/ChatMessageList.tsx b/src/components/chat/ChatMessageList.tsx similarity index 100% rename from src/app/chat/ChatMessageList.tsx rename to src/components/chat/ChatMessageList.tsx diff --git a/src/app/chat/ChatModelSelector.tsx b/src/components/chat/ChatModelSelector.tsx similarity index 100% rename from src/app/chat/ChatModelSelector.tsx rename to src/components/chat/ChatModelSelector.tsx diff --git a/src/app/chat/ChatPageContext.ts b/src/components/chat/ChatPageContext.ts similarity index 100% rename from src/app/chat/ChatPageContext.ts rename to src/components/chat/ChatPageContext.ts diff --git a/src/app/chat/ChatSlashCommandMenu.tsx b/src/components/chat/ChatSlashCommandMenu.tsx similarity index 100% rename from src/app/chat/ChatSlashCommandMenu.tsx rename to src/components/chat/ChatSlashCommandMenu.tsx diff --git a/src/app/chat/chatSlashContext.ts b/src/components/chat/chatSlashContext.ts similarity index 100% rename from src/app/chat/chatSlashContext.ts rename to src/components/chat/chatSlashContext.ts diff --git a/uninstall.sh b/uninstall.sh deleted file mode 100644 index a709216..0000000 --- a/uninstall.sh +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# ── helpers ────────────────────────────────────────────────────────── -RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; NC='\033[0m' -log() { echo -e "${GREEN}[OK]${NC} $*"; } -warn() { echo -e "${YELLOW}[WARN]${NC} $*"; } - -# ── defaults ───────────────────────────────────────────────────────── -NAMESPACE="${NAMESPACE:-app}" -RELEASE="${RELEASE:-deploy}" -CONFIG_MAP="${CONFIG_MAP:-app-env}" -PVC_NAME="${PVC_NAME:-shared-data}" - -# ── safety check ───────────────────────────────────────────────────── -echo "" -warn "This will remove Helm release '$RELEASE' from namespace '$NAMESPACE'." -warn "The following resources are PROTECTED and will NOT be deleted:" -warn " - Namespace: $NAMESPACE" -warn " - ConfigMap: $CONFIG_MAP" -warn " - PVC: $PVC_NAME" -echo "" -read -rp "Continue? [y/N] " confirm -if [[ "$confirm" != "y" && "$confirm" != "Y" ]]; then - log "Cancelled" - exit 0 -fi - -# ── uninstall ──────────────────────────────────────────────────────── -log "Uninstalling Helm release $RELEASE..." -helm uninstall "$RELEASE" --namespace "$NAMESPACE" - -log "Helm release uninstalled" - -# ── verify protected resources ─────────────────────────────────────── -log "Verifying protected resources still exist..." - -if kubectl get namespace "$NAMESPACE" &>/dev/null; then - log "Namespace '$NAMESPACE' preserved" -else - echo -e "${RED}[ERR]${NC} Namespace '$NAMESPACE' was deleted!" -fi - -if kubectl get configmap "$CONFIG_MAP" -n "$NAMESPACE" &>/dev/null; then - log "ConfigMap '$CONFIG_MAP' preserved" -else - echo -e "${RED}[ERR]${NC} ConfigMap '$CONFIG_MAP' was deleted!" -fi - -if kubectl get pvc "$PVC_NAME" -n "$NAMESPACE" &>/dev/null; then - log "PVC '$PVC_NAME' preserved" -else - echo -e "${RED}[ERR]${NC} PVC '$PVC_NAME' was deleted!" -fi - -log "Uninstall complete — remaining resources in namespace $NAMESPACE:" -kubectl get all,pvc,configmap,ingress -n "$NAMESPACE" \ No newline at end of file