chore: remove gingress ingress controller and proxy
Delete unused ingress controller and proxy components: - apps/gingress: Kubernetes ingress controller - libs/gingress-proxy: Gateway proxy with rate limiting, TLS, etc. - docker/gingress.Dockerfile
This commit is contained in:
parent
e7d38fc565
commit
d59647d9a8
@ -1,46 +0,0 @@
|
||||
[package]
|
||||
name = "gingress"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
authors.workspace = true
|
||||
description = "GIngress control plane: Kubernetes Ingress Controller using kube-rs"
|
||||
repository.workspace = true
|
||||
readme.workspace = true
|
||||
homepage.workspace = true
|
||||
license.workspace = true
|
||||
keywords.workspace = true
|
||||
categories.workspace = true
|
||||
documentation.workspace = true
|
||||
|
||||
[[bin]]
|
||||
name = "gingress"
|
||||
path = "src/main.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "kubectl-gingress"
|
||||
path = "src/bin/kubectl-gingress/main.rs"
|
||||
|
||||
[dependencies]
|
||||
gingress-proxy = { workspace = true }
|
||||
|
||||
kube = { workspace = true }
|
||||
k8s-openapi = { workspace = true }
|
||||
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
serde_yaml = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
observability = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
dashmap = { workspace = true }
|
||||
futures-util = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
url = { workspace = true }
|
||||
x509-parser = "0.17"
|
||||
rustls-pemfile = "2"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
@ -1,887 +0,0 @@
|
||||
//! kubectl-gingress — kubectl plugin for managing GIngress resources.
|
||||
//!
|
||||
//! Usage (via kubectl): kubectl gingress <subcommand>
|
||||
//! Usage (standalone): kubectl-gingress <subcommand>
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
use k8s_openapi::api::core::v1::{Pod, Secret};
|
||||
use k8s_openapi::api::networking::v1::{HTTPIngressPath, Ingress};
|
||||
use kube::api::ListParams;
|
||||
use kube::{Api, Client, ResourceExt};
|
||||
|
||||
const INGRESS_CLASS: &str = "gingress";
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(
|
||||
name = "kubectl-gingress",
|
||||
bin_name = "kubectl gingress",
|
||||
about = "Manage GIngress — Kubernetes Ingress Controller",
|
||||
version
|
||||
)]
|
||||
struct Cli {
|
||||
#[command(subcommand)]
|
||||
command: Command,
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum Command {
|
||||
/// List all Ingress resources managed by GIngress
|
||||
#[command(alias = "ls")]
|
||||
List {
|
||||
/// Filter by namespace (omit for all namespaces)
|
||||
#[arg(short, long)]
|
||||
namespace: Option<String>,
|
||||
/// Output as JSON
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Show the routing table (host → path → backend)
|
||||
Routes {
|
||||
/// Filter by namespace
|
||||
#[arg(short, long)]
|
||||
namespace: Option<String>,
|
||||
/// Filter by host
|
||||
#[arg(short = 'H', long)]
|
||||
host: Option<String>,
|
||||
/// Output as JSON
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Show backend services and their endpoints
|
||||
Backends {
|
||||
/// Filter by namespace
|
||||
#[arg(short, long)]
|
||||
namespace: Option<String>,
|
||||
/// Output as JSON
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// List TLS certificates (from Secrets)
|
||||
Certs {
|
||||
/// Filter by namespace
|
||||
#[arg(short, long)]
|
||||
namespace: Option<String>,
|
||||
/// Output as JSON
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Validate Ingress configurations
|
||||
Validate {
|
||||
/// Filter by namespace
|
||||
#[arg(short, long)]
|
||||
namespace: Option<String>,
|
||||
},
|
||||
/// Show GIngress controller status and summary
|
||||
Status {
|
||||
/// Output as JSON
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let cli = Cli::parse();
|
||||
let client = Client::try_default().await?;
|
||||
|
||||
match cli.command {
|
||||
Command::List { namespace, json } => cmd_list(&client, namespace, json).await?,
|
||||
Command::Routes {
|
||||
namespace,
|
||||
host,
|
||||
json,
|
||||
} => cmd_routes(&client, namespace, host, json).await?,
|
||||
Command::Backends { namespace, json } => cmd_backends(&client, namespace, json).await?,
|
||||
Command::Certs { namespace, json } => cmd_certs(&client, namespace, json).await?,
|
||||
Command::Validate { namespace } => cmd_validate(&client, namespace).await?,
|
||||
Command::Status { json } => cmd_status(&client, json).await?,
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── list ──────────────────────────────────────────────────────────
|
||||
|
||||
async fn cmd_list(
|
||||
client: &Client,
|
||||
namespace: Option<String>,
|
||||
json: bool,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let ingresses = list_ingresses(client, namespace.as_deref()).await?;
|
||||
|
||||
if json {
|
||||
println!("{}", serde_json::to_string_pretty(&ingresses)?);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if ingresses.is_empty() {
|
||||
println!("No GIngress-managed Ingress resources found.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
println!(
|
||||
"{:<25} {:<20} {:<40} {:<50} {:<15}",
|
||||
"NAMESPACE", "NAME", "HOSTS", "PATHS", "TLS"
|
||||
);
|
||||
println!("{:-<150}", "");
|
||||
|
||||
for ing in &ingresses {
|
||||
let ns = ing.namespace();
|
||||
let name = ing.name_any();
|
||||
let hosts = ing.hosts().join(", ");
|
||||
let paths = ing
|
||||
.paths_display()
|
||||
.iter()
|
||||
.map(|p| format!("{} {}", p.path_type, p.path))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
let tls = if ing.has_tls() { "Enabled" } else { "-" };
|
||||
|
||||
println!(
|
||||
"{:<25} {:<20} {:<40} {:<50} {:<15}",
|
||||
truncate(&ns, 25),
|
||||
truncate(&name, 20),
|
||||
truncate(&hosts, 40),
|
||||
truncate(&paths, 50),
|
||||
tls,
|
||||
);
|
||||
}
|
||||
|
||||
println!("\nTotal: {} Ingress(es)", ingresses.len());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── routes ─────────────────────────────────────────────────────────
|
||||
|
||||
async fn cmd_routes(
|
||||
client: &Client,
|
||||
namespace: Option<String>,
|
||||
host_filter: Option<String>,
|
||||
json: bool,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let ingresses = list_ingresses(client, namespace.as_deref()).await?;
|
||||
let mut routes: Vec<RouteRow> = Vec::new();
|
||||
|
||||
for ing in &ingresses {
|
||||
for rule in ing
|
||||
.spec
|
||||
.as_ref()
|
||||
.and_then(|s| s.rules.as_ref())
|
||||
.into_iter()
|
||||
.flatten()
|
||||
{
|
||||
let host = rule.host.as_deref().unwrap_or("*");
|
||||
if let Some(ref hf) = host_filter {
|
||||
if host != hf {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if let Some(http) = &rule.http {
|
||||
for path_item in &http.paths {
|
||||
let backend = extract_backend(path_item);
|
||||
let port = extract_backend_port(path_item);
|
||||
routes.push(RouteRow {
|
||||
namespace: ing.namespace(),
|
||||
ingress: ing.name_any(),
|
||||
host: host.to_string(),
|
||||
path: path_item.path.clone().unwrap_or_else(|| "/".into()),
|
||||
path_type: path_item.path_type.clone(),
|
||||
backend,
|
||||
port,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if json {
|
||||
println!("{}", serde_json::to_string_pretty(&routes)?);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if routes.is_empty() {
|
||||
println!("No routes found.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
println!(
|
||||
"{:<20} {:<20} {:<30} {:<18} {:<15} {:<15} {:<15}",
|
||||
"NAMESPACE", "INGRESS", "HOST", "PATH", "TYPE", "BACKEND", "PORT"
|
||||
);
|
||||
println!("{:-<133}", "");
|
||||
|
||||
for r in &routes {
|
||||
let port = extract_backend_port_str(r);
|
||||
println!(
|
||||
"{:<20} {:<20} {:<30} {:<18} {:<15} {:<15} {:<15}",
|
||||
truncate(&r.namespace, 20),
|
||||
truncate(&r.ingress, 20),
|
||||
truncate(&r.host, 30),
|
||||
truncate(&r.path, 18),
|
||||
truncate(&r.path_type, 15),
|
||||
truncate(&r.backend, 15),
|
||||
port,
|
||||
);
|
||||
}
|
||||
|
||||
println!("\nTotal: {} route(s)", routes.len());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── backends ───────────────────────────────────────────────────────
|
||||
|
||||
async fn cmd_backends(
|
||||
client: &Client,
|
||||
namespace: Option<String>,
|
||||
json: bool,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let ingresses = list_ingresses(client, namespace.as_deref()).await?;
|
||||
|
||||
// Collect unique backends from all ingresses
|
||||
let mut backends: Vec<BackendRow> = Vec::new();
|
||||
let mut seen = std::collections::HashSet::new();
|
||||
|
||||
for ing in &ingresses {
|
||||
for rule in ing
|
||||
.spec
|
||||
.as_ref()
|
||||
.and_then(|s| s.rules.as_ref())
|
||||
.into_iter()
|
||||
.flatten()
|
||||
{
|
||||
if let Some(http) = &rule.http {
|
||||
for path_item in &http.paths {
|
||||
let svc = match path_item.backend.service.as_ref() {
|
||||
Some(s) => s,
|
||||
None => continue,
|
||||
};
|
||||
let key = format!(
|
||||
"{}/{}:{}",
|
||||
ing.namespace(),
|
||||
svc.name,
|
||||
svc.port.as_ref().and_then(|p| p.number).unwrap_or(80)
|
||||
);
|
||||
if seen.insert(key.clone()) {
|
||||
let ns = ing.namespace();
|
||||
let ep_status = get_endpoint_status(client, &ns, &svc.name).await;
|
||||
backends.push(BackendRow {
|
||||
namespace: ns,
|
||||
service: svc.name.clone(),
|
||||
port: svc.port.as_ref().and_then(|p| p.number).unwrap_or(80) as u16,
|
||||
ready_endpoints: ep_status.ready,
|
||||
total_endpoints: ep_status.total,
|
||||
referenced_by: ing.name_any(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if json {
|
||||
println!("{}", serde_json::to_string_pretty(&backends)?);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if backends.is_empty() {
|
||||
println!("No backends found.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
println!(
|
||||
"{:<20} {:<20} {:<8} {:<8} {:<18} {:<20}",
|
||||
"NAMESPACE", "SERVICE", "PORT", "HEALTH", "ENDPOINTS", "REFERENCED BY"
|
||||
);
|
||||
println!("{:-<94}", "");
|
||||
|
||||
for b in &backends {
|
||||
let health = if b.total_endpoints == 0 {
|
||||
"WARN"
|
||||
} else if b.ready_endpoints == 0 {
|
||||
"DOWN"
|
||||
} else if b.ready_endpoints < b.total_endpoints {
|
||||
"PARTIAL"
|
||||
} else {
|
||||
"OK"
|
||||
};
|
||||
let eps = format!("{}/{} ready", b.ready_endpoints, b.total_endpoints);
|
||||
println!(
|
||||
"{:<20} {:<20} {:<8} {:<8} {:<18} {:<20}",
|
||||
truncate(&b.namespace, 20),
|
||||
truncate(&b.service, 20),
|
||||
b.port,
|
||||
health,
|
||||
eps,
|
||||
truncate(&b.referenced_by, 20),
|
||||
);
|
||||
}
|
||||
|
||||
println!("\nTotal: {} backend(s)", backends.len());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── certs ──────────────────────────────────────────────────────────
|
||||
|
||||
async fn cmd_certs(
|
||||
client: &Client,
|
||||
namespace: Option<String>,
|
||||
json: bool,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let ingresses = list_ingresses(client, namespace.as_deref()).await?;
|
||||
let mut certs: Vec<CertRow> = Vec::new();
|
||||
|
||||
for ing in &ingresses {
|
||||
let ns = ing.namespace();
|
||||
let tls_entries = ing
|
||||
.spec
|
||||
.as_ref()
|
||||
.and_then(|s| s.tls.as_ref())
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
for tls in &tls_entries {
|
||||
let secret_name = tls.secret_name.clone().unwrap_or_default();
|
||||
let hosts = tls.hosts.clone().unwrap_or_default();
|
||||
|
||||
// Check if the secret exists
|
||||
let secret_exists = check_secret_exists(client, &ns, &secret_name).await;
|
||||
|
||||
for host in &hosts {
|
||||
certs.push(CertRow {
|
||||
namespace: ns.clone(),
|
||||
secret_name: secret_name.clone(),
|
||||
host: host.clone(),
|
||||
found: secret_exists,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if json {
|
||||
println!("{}", serde_json::to_string_pretty(&certs)?);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if certs.is_empty() {
|
||||
println!("No TLS certificates configured.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
println!(
|
||||
"{:<20} {:<30} {:<30} {:<10}",
|
||||
"NAMESPACE", "SECRET", "HOST", "STATUS"
|
||||
);
|
||||
println!("{:-<90}", "");
|
||||
|
||||
for c in &certs {
|
||||
let status = if c.found { "OK" } else { "MISSING" };
|
||||
println!(
|
||||
"{:<20} {:<30} {:<30} {:<10}",
|
||||
truncate(&c.namespace, 20),
|
||||
truncate(&c.secret_name, 30),
|
||||
truncate(&c.host, 30),
|
||||
status,
|
||||
);
|
||||
}
|
||||
|
||||
let missing = certs.iter().filter(|c| !c.found).count();
|
||||
println!("\nTotal: {} cert(s), {} missing", certs.len(), missing);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── validate ───────────────────────────────────────────────────────
|
||||
|
||||
async fn cmd_validate(
|
||||
client: &Client,
|
||||
namespace: Option<String>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let ingresses = list_ingresses(client, namespace.as_deref()).await?;
|
||||
let mut errors = 0usize;
|
||||
let mut warnings = 0usize;
|
||||
|
||||
for ing in &ingresses {
|
||||
let ns = ing.namespace();
|
||||
let name = ing.name_any();
|
||||
|
||||
// Check: has rules
|
||||
let has_rules = ing
|
||||
.spec
|
||||
.as_ref()
|
||||
.map(|s| s.rules.as_ref().map(|r| !r.is_empty()).unwrap_or(false))
|
||||
.unwrap_or(false);
|
||||
if !has_rules {
|
||||
println!("[{}/{}] ERROR: No routing rules defined", ns, name);
|
||||
errors += 1;
|
||||
}
|
||||
|
||||
// Check: has TLS but no secret
|
||||
let tls_entries = ing
|
||||
.spec
|
||||
.as_ref()
|
||||
.and_then(|s| s.tls.as_ref())
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
for tls in &tls_entries {
|
||||
let secret_name = tls.secret_name.as_deref().unwrap_or("");
|
||||
if secret_name.is_empty() {
|
||||
println!(
|
||||
"[{}/{}] WARNING: TLS configured but no secretName specified",
|
||||
ns, name
|
||||
);
|
||||
warnings += 1;
|
||||
} else {
|
||||
let found = check_secret_exists(client, &ns, secret_name).await;
|
||||
if !found {
|
||||
println!(
|
||||
"[{}/{}] ERROR: TLS secret '{}' not found in namespace '{}'",
|
||||
ns, name, secret_name, ns
|
||||
);
|
||||
errors += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check: path backends reference valid services
|
||||
if let Some(rules) = ing.spec.as_ref().and_then(|s| s.rules.as_ref()) {
|
||||
for rule in rules {
|
||||
if let Some(http) = &rule.http {
|
||||
for path_item in &http.paths {
|
||||
if let Some(svc) = &path_item.backend.service {
|
||||
let endpoints = get_endpoint_status(client, &ns, &svc.name).await;
|
||||
if endpoints.total == 0 {
|
||||
println!(
|
||||
"[{}/{}] WARNING: Backend service '{}' has no endpoints (host: {})",
|
||||
ns,
|
||||
name,
|
||||
svc.name,
|
||||
rule.host.as_deref().unwrap_or("*")
|
||||
);
|
||||
warnings += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if errors == 0 && warnings == 0 {
|
||||
println!(
|
||||
"Validation passed — no issues found in {} Ingress(es).",
|
||||
ingresses.len()
|
||||
);
|
||||
} else {
|
||||
println!(
|
||||
"\nValidation complete: {} error(s), {} warning(s) across {} Ingress(es).",
|
||||
errors,
|
||||
warnings,
|
||||
ingresses.len()
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── status ─────────────────────────────────────────────────────────
|
||||
|
||||
async fn cmd_status(client: &Client, json: bool) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let ingresses = list_ingresses(client, None).await?;
|
||||
let controller_pods = find_gingress_pods(client).await;
|
||||
|
||||
if json {
|
||||
#[derive(serde::Serialize)]
|
||||
struct StatusOutput {
|
||||
controller_pods: Vec<PodInfo>,
|
||||
ingress_count: usize,
|
||||
route_count: usize,
|
||||
backend_count: usize,
|
||||
cert_count: usize,
|
||||
}
|
||||
let mut route_count = 0usize;
|
||||
let mut backend_set = std::collections::HashSet::new();
|
||||
let mut cert_count = 0usize;
|
||||
for ing in &ingresses {
|
||||
if let Some(spec) = &ing.spec {
|
||||
if let Some(rules) = &spec.rules {
|
||||
for rule in rules {
|
||||
if let Some(http) = &rule.http {
|
||||
route_count += http.paths.len();
|
||||
for p in &http.paths {
|
||||
if let Some(svc) = &p.backend.service {
|
||||
backend_set.insert(format!("{}/{}", ing.namespace(), svc.name));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(tls) = &spec.tls {
|
||||
cert_count += tls.len();
|
||||
}
|
||||
}
|
||||
}
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string_pretty(&StatusOutput {
|
||||
controller_pods,
|
||||
ingress_count: ingresses.len(),
|
||||
route_count,
|
||||
backend_count: backend_set.len(),
|
||||
cert_count,
|
||||
})?
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Controller status
|
||||
println!("══ GIngress Controller Status ══\n");
|
||||
|
||||
if controller_pods.is_empty() {
|
||||
println!(
|
||||
"Controller: NOT FOUND (no pods with label gingress.io/component=controller)"
|
||||
);
|
||||
} else {
|
||||
for pod in &controller_pods {
|
||||
let ready = if pod.ready { "Running" } else { "NotReady" };
|
||||
println!(
|
||||
"Controller Pod: {:<30} {:<12} {}",
|
||||
truncate(&pod.name, 30),
|
||||
ready,
|
||||
pod.namespace
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Resource summary
|
||||
println!();
|
||||
println!("══ Managed Resources ══\n");
|
||||
|
||||
let mut route_count = 0usize;
|
||||
let mut backend_set = std::collections::HashSet::new();
|
||||
let mut backend_total_eps = 0usize;
|
||||
let mut backend_ready_eps = 0usize;
|
||||
let mut cert_count = 0usize;
|
||||
let mut cert_missing = 0usize;
|
||||
|
||||
for ing in &ingresses {
|
||||
if let Some(spec) = &ing.spec {
|
||||
if let Some(rules) = &spec.rules {
|
||||
for rule in rules {
|
||||
if let Some(http) = &rule.http {
|
||||
route_count += http.paths.len();
|
||||
for p in &http.paths {
|
||||
if let Some(svc) = &p.backend.service {
|
||||
let key = format!("{}/{}", ing.namespace(), svc.name);
|
||||
if backend_set.insert(key) {
|
||||
let eps =
|
||||
get_endpoint_status(client, &ing.namespace(), &svc.name)
|
||||
.await;
|
||||
backend_total_eps += eps.total;
|
||||
backend_ready_eps += eps.ready;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(tls) = &spec.tls {
|
||||
cert_count += tls.len();
|
||||
for tls in tls {
|
||||
let sn = tls.secret_name.as_deref().unwrap_or("");
|
||||
if !sn.is_empty() && !check_secret_exists(client, &ing.namespace(), sn).await {
|
||||
cert_missing += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("Ingresses: {}", ingresses.len());
|
||||
println!("Routes: {}", route_count);
|
||||
println!(
|
||||
"Backends: {} ({} ready / {} total endpoints)",
|
||||
backend_set.len(),
|
||||
backend_ready_eps,
|
||||
backend_total_eps
|
||||
);
|
||||
println!(
|
||||
"TLS Certs: {} ({} missing)",
|
||||
cert_count, cert_missing
|
||||
);
|
||||
println!();
|
||||
|
||||
// Overall health
|
||||
let healthy = !ingresses.is_empty()
|
||||
&& (cert_missing == 0)
|
||||
&& (backend_set.is_empty() || backend_ready_eps > 0)
|
||||
&& controller_pods.iter().any(|p| p.ready);
|
||||
|
||||
if healthy {
|
||||
println!("Status: HEALTHY");
|
||||
} else {
|
||||
println!("Status: DEGRADED");
|
||||
if controller_pods.is_empty() || !controller_pods.iter().any(|p| p.ready) {
|
||||
println!(" → Controller pod not running or not ready");
|
||||
}
|
||||
if cert_missing > 0 {
|
||||
println!(" → {} TLS secret(s) missing", cert_missing);
|
||||
}
|
||||
if !backend_set.is_empty() && backend_ready_eps == 0 {
|
||||
println!(" → No ready backend endpoints");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── k8s helpers ────────────────────────────────────────────────────
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct IngressSummary {
|
||||
namespace: String,
|
||||
name: String,
|
||||
hosts: Vec<String>,
|
||||
#[serde(skip)]
|
||||
paths_for_display: Vec<PathSummary>,
|
||||
has_tls: bool,
|
||||
#[serde(skip)]
|
||||
spec: Option<k8s_openapi::api::networking::v1::IngressSpec>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct PathSummary {
|
||||
path_type: String,
|
||||
path: String,
|
||||
}
|
||||
|
||||
impl IngressSummary {
|
||||
fn namespace(&self) -> String {
|
||||
self.namespace.clone()
|
||||
}
|
||||
fn name_any(&self) -> String {
|
||||
self.name.clone()
|
||||
}
|
||||
fn hosts(&self) -> &[String] {
|
||||
&self.hosts
|
||||
}
|
||||
fn paths_display(&self) -> &[PathSummary] {
|
||||
&self.paths_for_display
|
||||
}
|
||||
fn has_tls(&self) -> bool {
|
||||
self.has_tls
|
||||
}
|
||||
}
|
||||
|
||||
fn ingress_to_summary(ing: &Ingress) -> IngressSummary {
|
||||
let spec = ing.spec.clone();
|
||||
let mut hosts = Vec::new();
|
||||
let mut paths = Vec::new();
|
||||
let mut has_tls = false;
|
||||
|
||||
if let Some(ref s) = spec {
|
||||
if let Some(ref rules) = s.rules {
|
||||
for rule in rules {
|
||||
if let Some(ref host) = rule.host {
|
||||
hosts.push(host.clone());
|
||||
}
|
||||
if let Some(ref http) = rule.http {
|
||||
for p in &http.paths {
|
||||
paths.push(PathSummary {
|
||||
path_type: p.path_type.clone(),
|
||||
path: p.path.clone().unwrap_or_else(|| "/".into()),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
has_tls = s.tls.as_ref().map(|t| !t.is_empty()).unwrap_or(false);
|
||||
}
|
||||
|
||||
IngressSummary {
|
||||
namespace: ing.namespace().unwrap_or_default(),
|
||||
name: ing.name_any(),
|
||||
hosts,
|
||||
paths_for_display: paths,
|
||||
has_tls,
|
||||
spec,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct PodInfo {
|
||||
name: String,
|
||||
namespace: String,
|
||||
ready: bool,
|
||||
}
|
||||
|
||||
/// Find GIngress controller pods by label `gingress.io/component=controller`.
|
||||
async fn find_gingress_pods(client: &Client) -> Vec<PodInfo> {
|
||||
let api: Api<Pod> = Api::all(client.clone());
|
||||
let lp = ListParams {
|
||||
label_selector: Some("gingress.io/component=controller".into()),
|
||||
..Default::default()
|
||||
};
|
||||
match api.list(&lp).await {
|
||||
Ok(list) => list
|
||||
.items
|
||||
.into_iter()
|
||||
.map(|pod| {
|
||||
let ready = pod
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.conditions.as_ref())
|
||||
.map(|conds| {
|
||||
conds
|
||||
.iter()
|
||||
.any(|c| c.type_ == "Ready" && c.status == "True")
|
||||
})
|
||||
.unwrap_or(false);
|
||||
PodInfo {
|
||||
name: pod.name_any(),
|
||||
namespace: pod.namespace().unwrap_or_default(),
|
||||
ready,
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
Err(_) => Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_ingresses(
|
||||
client: &Client,
|
||||
namespace: Option<&str>,
|
||||
) -> Result<Vec<IngressSummary>, Box<dyn std::error::Error>> {
|
||||
let params = ListParams {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if let Some(ns) = namespace {
|
||||
let api: Api<Ingress> = Api::namespaced(client.clone(), ns);
|
||||
let list = api.list(¶ms).await?;
|
||||
Ok(list
|
||||
.items
|
||||
.into_iter()
|
||||
.filter(|ing| is_gingress_class(ing))
|
||||
.map(|ing| ingress_to_summary(&ing))
|
||||
.collect())
|
||||
} else {
|
||||
let api: Api<Ingress> = Api::all(client.clone());
|
||||
let list = api.list(¶ms).await?;
|
||||
Ok(list
|
||||
.items
|
||||
.into_iter()
|
||||
.filter(|ing| is_gingress_class(ing))
|
||||
.map(|ing| ingress_to_summary(&ing))
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
fn is_gingress_class(ingress: &Ingress) -> bool {
|
||||
ingress
|
||||
.spec
|
||||
.as_ref()
|
||||
.and_then(|s| s.ingress_class_name.as_deref())
|
||||
== Some(INGRESS_CLASS)
|
||||
}
|
||||
|
||||
struct EndpointStatus {
|
||||
ready: usize,
|
||||
total: usize,
|
||||
}
|
||||
|
||||
async fn get_endpoint_status(
|
||||
client: &Client,
|
||||
namespace: &str,
|
||||
service_name: &str,
|
||||
) -> EndpointStatus {
|
||||
use k8s_openapi::api::core::v1::Endpoints;
|
||||
let api: Api<Endpoints> = Api::namespaced(client.clone(), namespace);
|
||||
match api.get_opt(service_name).await {
|
||||
Ok(Some(eps)) => {
|
||||
let mut ready = 0usize;
|
||||
let mut total = 0usize;
|
||||
if let Some(subsets) = &eps.subsets {
|
||||
for subset in subsets {
|
||||
let addrs = subset.addresses.as_deref().unwrap_or_default();
|
||||
let not_ready = subset.not_ready_addresses.as_deref().unwrap_or_default();
|
||||
ready += addrs.len();
|
||||
total += addrs.len() + not_ready.len();
|
||||
}
|
||||
}
|
||||
EndpointStatus { ready, total }
|
||||
}
|
||||
_ => EndpointStatus { ready: 0, total: 0 },
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_secret_exists(client: &Client, namespace: &str, name: &str) -> bool {
|
||||
let api: Api<Secret> = Api::namespaced(client.clone(), namespace);
|
||||
api.get_opt(name).await.ok().flatten().is_some()
|
||||
}
|
||||
|
||||
fn extract_backend(path_item: &HTTPIngressPath) -> String {
|
||||
path_item
|
||||
.backend
|
||||
.service
|
||||
.as_ref()
|
||||
.map(|s| s.name.clone())
|
||||
.unwrap_or_else(|| "<resource>".into())
|
||||
}
|
||||
|
||||
fn extract_backend_port(path_item: &HTTPIngressPath) -> u16 {
|
||||
path_item
|
||||
.backend
|
||||
.service
|
||||
.as_ref()
|
||||
.and_then(|s| s.port.as_ref())
|
||||
.and_then(|p| p.number)
|
||||
.unwrap_or(80) as u16
|
||||
}
|
||||
|
||||
fn extract_backend_port_str(r: &RouteRow) -> String {
|
||||
r.port.to_string()
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct RouteRow {
|
||||
namespace: String,
|
||||
ingress: String,
|
||||
host: String,
|
||||
path: String,
|
||||
path_type: String,
|
||||
backend: String,
|
||||
port: u16,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct BackendRow {
|
||||
namespace: String,
|
||||
service: String,
|
||||
port: u16,
|
||||
ready_endpoints: usize,
|
||||
total_endpoints: usize,
|
||||
referenced_by: String,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct CertRow {
|
||||
namespace: String,
|
||||
secret_name: String,
|
||||
host: String,
|
||||
found: bool,
|
||||
}
|
||||
|
||||
fn truncate(s: &str, max: usize) -> String {
|
||||
// Account for CJK characters — each wide char counts as 2
|
||||
let mut width = 0usize;
|
||||
let mut result = String::new();
|
||||
for c in s.chars() {
|
||||
let cw = if c.is_ascii() { 1 } else { 2 };
|
||||
if width + cw > max {
|
||||
result.push_str("…");
|
||||
break;
|
||||
}
|
||||
result.push(c);
|
||||
width += cw;
|
||||
}
|
||||
result
|
||||
}
|
||||
@ -1,148 +0,0 @@
|
||||
//! Watches Kubernetes Endpoints and updates upstream endpoint lists.
|
||||
//!
|
||||
//! Tracks Pod IPs for each Service. When endpoints change (scale up/down,
|
||||
//! rolling restart, health check failures), the upstream pool is updated.
|
||||
|
||||
use futures::StreamExt;
|
||||
use futures::pin_mut;
|
||||
use gingress_proxy::config::{ConfigStore, Endpoint};
|
||||
use k8s_openapi::api::core::v1::Endpoints as K8sEndpoints;
|
||||
use kube::ResourceExt;
|
||||
use kube::runtime::watcher::{self, Event};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Watch Endpoints and update the ConfigStore.
|
||||
pub async fn watch_endpoints(
|
||||
client: Arc<kube::Client>,
|
||||
store: Arc<ConfigStore>,
|
||||
_namespace: Option<String>,
|
||||
on_change: Arc<dyn Fn() + Send + Sync>,
|
||||
) {
|
||||
let api = kube::Api::<K8sEndpoints>::all(client.as_ref().clone());
|
||||
let config = watcher::Config::default();
|
||||
let watcher = watcher::watcher(api, config);
|
||||
pin_mut!(watcher);
|
||||
|
||||
while let Some(event) = watcher.next().await {
|
||||
match event {
|
||||
Ok(Event::Apply(eps)) => {
|
||||
process_endpoints(&eps, &store, &on_change);
|
||||
}
|
||||
Ok(Event::Init) => {
|
||||
tracing::info!("Endpoint watcher re-initializing");
|
||||
}
|
||||
Ok(Event::InitApply(eps)) => {
|
||||
process_endpoints(&eps, &store, &on_change);
|
||||
}
|
||||
Ok(Event::InitDone) => {
|
||||
tracing::info!("Endpoint watcher init complete");
|
||||
}
|
||||
Ok(Event::Delete(eps)) => {
|
||||
remove_endpoints(&eps, &store, &on_change);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Endpoint watcher error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract endpoint addresses, grouped by port, and update the ConfigStore.
|
||||
///
|
||||
/// Stores endpoints under key `upstream:<ns>/<name>:<port>` to match
|
||||
/// the proxy's upstream lookup format.
|
||||
fn process_endpoints(
|
||||
endpoints: &K8sEndpoints,
|
||||
store: &ConfigStore,
|
||||
on_change: &Arc<dyn Fn() + Send + Sync>,
|
||||
) {
|
||||
use std::collections::HashMap;
|
||||
|
||||
let name = endpoints.name_any();
|
||||
let namespace = endpoints.namespace().unwrap_or_default();
|
||||
let base_prefix = format!("upstream:{}/{}:", namespace, name);
|
||||
|
||||
// Collect endpoints grouped by port
|
||||
let mut port_groups: HashMap<u16, Vec<Endpoint>> = HashMap::new();
|
||||
|
||||
if let Some(subsets) = &endpoints.subsets {
|
||||
for subset in subsets {
|
||||
let addrs = subset.addresses.as_deref().unwrap_or_default();
|
||||
let ports = subset.ports.as_deref().unwrap_or_default();
|
||||
let not_ready_addrs = subset.not_ready_addresses.as_deref().unwrap_or_default();
|
||||
|
||||
for port in ports {
|
||||
let port_num = port.port as u16;
|
||||
let eps = port_groups.entry(port_num).or_default();
|
||||
for addr in addrs {
|
||||
eps.push(Endpoint {
|
||||
ip: addr.ip.clone(),
|
||||
port: port_num,
|
||||
ready: true,
|
||||
});
|
||||
}
|
||||
for addr in not_ready_addrs {
|
||||
eps.push(Endpoint {
|
||||
ip: addr.ip.clone(),
|
||||
port: port_num,
|
||||
ready: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clear old per-port keys for this service (handles port removal)
|
||||
let old_keys = store.keys_with_prefix(&base_prefix);
|
||||
for k in old_keys {
|
||||
store.remove(&k);
|
||||
}
|
||||
|
||||
// Write per-port endpoint entries
|
||||
let mut total = 0usize;
|
||||
for (port_num, eps) in &port_groups {
|
||||
let key = format!("{}{}", base_prefix, port_num);
|
||||
store.set(&key, eps);
|
||||
total += eps.len();
|
||||
}
|
||||
|
||||
// If no ports at all, write an empty entry for the base key so the reconciler
|
||||
// can detect that this service has no endpoints.
|
||||
if port_groups.is_empty() {
|
||||
store.set::<Vec<Endpoint>>(&format!("upstream:{}/{}", namespace, name), &vec![]);
|
||||
}
|
||||
|
||||
store.signal_reload();
|
||||
on_change();
|
||||
|
||||
tracing::debug!(
|
||||
namespace = %namespace,
|
||||
name = %name,
|
||||
num_ports = port_groups.len(),
|
||||
num_endpoints = total,
|
||||
"Endpoints updated"
|
||||
);
|
||||
}
|
||||
|
||||
/// Remove all per-port endpoint keys when the Endpoint resource is deleted.
|
||||
fn remove_endpoints(
|
||||
endpoints: &K8sEndpoints,
|
||||
store: &ConfigStore,
|
||||
on_change: &Arc<dyn Fn() + Send + Sync>,
|
||||
) {
|
||||
let name = endpoints.name_any();
|
||||
let namespace = endpoints.namespace().unwrap_or_default();
|
||||
let base_prefix = format!("upstream:{}/{}:", namespace, name);
|
||||
|
||||
// Remove all per-port keys
|
||||
let keys = store.keys_with_prefix(&base_prefix);
|
||||
for k in keys {
|
||||
store.remove(&k);
|
||||
}
|
||||
// Also remove the port-less key (in case no ports were present)
|
||||
store.remove(&format!("upstream:{}/{}", namespace, name));
|
||||
|
||||
store.signal_reload();
|
||||
on_change();
|
||||
tracing::info!(namespace = %namespace, name = %name, "Endpoints removed");
|
||||
}
|
||||
@ -1,426 +0,0 @@
|
||||
//! Watches Kubernetes Ingress resources and converts them to routing rules.
|
||||
|
||||
use futures::StreamExt;
|
||||
use futures::pin_mut;
|
||||
use gingress_proxy::config::{
|
||||
ConfigStore, HeaderOp, PathType, RateLimitPolicy, RouteRule, SessionAffinityConfig,
|
||||
};
|
||||
use k8s_openapi::api::networking::v1::{HTTPIngressPath, Ingress};
|
||||
use kube::ResourceExt;
|
||||
use kube::runtime::watcher::{self, Event};
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Watch Ingress resources and update the ConfigStore.
|
||||
///
|
||||
/// After each event, the `on_change` callback is invoked so the reconciler
|
||||
/// can cross-reference all fragments into a complete ProxyConfig.
|
||||
pub async fn watch_ingresses(
|
||||
client: Arc<kube::Client>,
|
||||
store: Arc<ConfigStore>,
|
||||
ingress_class: String,
|
||||
namespace: Option<String>,
|
||||
on_change: Arc<dyn Fn() + Send + Sync>,
|
||||
) {
|
||||
let api = kube::Api::<Ingress>::all(client.as_ref().clone());
|
||||
|
||||
let config = watcher::Config {
|
||||
field_selector: namespace
|
||||
.as_ref()
|
||||
.map(|ns| format!("metadata.namespace={}", ns)),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let ingress_watcher = watcher::watcher(api, config);
|
||||
pin_mut!(ingress_watcher);
|
||||
|
||||
while let Some(event) = ingress_watcher.next().await {
|
||||
match event {
|
||||
Ok(Event::Apply(ingress)) => {
|
||||
let name = ingress.name_any();
|
||||
let ns = ingress.namespace().unwrap_or_default();
|
||||
if is_gingress_class(&ingress, &ingress_class) {
|
||||
process_ingress(&ingress, &store, &ingress_class);
|
||||
on_change();
|
||||
tracing::info!(namespace = %ns, name = %name, "Ingress applied");
|
||||
}
|
||||
}
|
||||
Ok(Event::Init) => {
|
||||
store.remove_prefix("ingress:");
|
||||
store.remove_prefix("tls-host:");
|
||||
tracing::info!("Ingress watcher re-initializing");
|
||||
}
|
||||
Ok(Event::InitApply(ingress)) => {
|
||||
if is_gingress_class(&ingress, &ingress_class) {
|
||||
process_ingress(&ingress, &store, &ingress_class);
|
||||
}
|
||||
}
|
||||
Ok(Event::InitDone) => {
|
||||
store.signal_reload();
|
||||
on_change();
|
||||
tracing::info!("Ingress watcher init complete");
|
||||
}
|
||||
Ok(Event::Delete(ingress)) => {
|
||||
if is_gingress_class(&ingress, &ingress_class) {
|
||||
remove_ingress_routes(&ingress, &store);
|
||||
on_change();
|
||||
tracing::info!(
|
||||
name = %ingress.name_any(),
|
||||
namespace = %ingress.namespace().unwrap_or_default(),
|
||||
"Ingress deleted, routes removed"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Ingress watcher error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if an Ingress specifies the gingress class.
|
||||
fn is_gingress_class(ingress: &Ingress, class_name: &str) -> bool {
|
||||
ingress
|
||||
.spec
|
||||
.as_ref()
|
||||
.and_then(|s| s.ingress_class_name.as_deref())
|
||||
== Some(class_name)
|
||||
}
|
||||
|
||||
/// Process an Ingress resource: extract routes and update the store.
|
||||
fn process_ingress(ingress: &Ingress, store: &ConfigStore, _ingress_class: &str) {
|
||||
let namespace = ingress.namespace().unwrap_or_default();
|
||||
let name = ingress.name_any();
|
||||
let spec = match ingress.spec.as_ref() {
|
||||
Some(s) => s,
|
||||
None => return,
|
||||
};
|
||||
|
||||
// Build an ingress-scoped prefix so we can clean up old routes for this Ingress
|
||||
let ingress_prefix = format!("ingress:{}/{}:", namespace, name);
|
||||
|
||||
// Remove old route entries scoped to this Ingress
|
||||
let old_route_keys = store.keys_with_prefix(&format!("{}route:", ingress_prefix));
|
||||
for key in &old_route_keys {
|
||||
store.remove(key);
|
||||
}
|
||||
|
||||
// Process routing rules
|
||||
if let Some(rules) = &spec.rules {
|
||||
for rule in rules {
|
||||
let host = rule.host.as_deref().unwrap_or("*");
|
||||
if let Some(http) = &rule.http {
|
||||
let mut routes: Vec<RouteRule> = Vec::new();
|
||||
for path_item in &http.paths {
|
||||
routes.push(ingress_path_to_route(host, path_item, &namespace));
|
||||
}
|
||||
// Store per-ingress routes so we can clean up on delete
|
||||
let route_key = format!("{}route:{}", ingress_prefix, host);
|
||||
store.set(&route_key, &routes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process TLS: map secretName -> hosts so the reconciler can cross-reference
|
||||
if let Some(tls_entries) = &spec.tls {
|
||||
for tls in tls_entries {
|
||||
let secret_name = tls.secret_name.as_deref().unwrap_or_default();
|
||||
let hosts: Vec<String> = tls.hosts.clone().unwrap_or_default();
|
||||
let tls_host_key = format!("tls-host:{}", secret_name);
|
||||
store.set(&tls_host_key, &hosts);
|
||||
}
|
||||
}
|
||||
|
||||
// Process annotations for advanced features
|
||||
let annotations = ingress.annotations();
|
||||
process_annotations(&annotations, &ingress_prefix, &namespace, store);
|
||||
|
||||
store.signal_reload();
|
||||
}
|
||||
|
||||
/// Convert a Kubernetes Ingress path to an internal RouteRule.
|
||||
fn ingress_path_to_route(host: &str, path: &HTTPIngressPath, namespace: &str) -> RouteRule {
|
||||
let service = path
|
||||
.backend
|
||||
.service
|
||||
.as_ref()
|
||||
.expect("Ingress backend must reference a service");
|
||||
|
||||
RouteRule {
|
||||
host: host.to_string(),
|
||||
path: path.path.clone().unwrap_or_else(|| "/".to_string()),
|
||||
path_type: match path.path_type.as_str() {
|
||||
"Prefix" => PathType::Prefix,
|
||||
"Exact" => PathType::Exact,
|
||||
_ => PathType::ImplementationSpecific,
|
||||
},
|
||||
backend: gingress_proxy::config::Backend {
|
||||
namespace: namespace.to_string(),
|
||||
name: service.name.clone(),
|
||||
port: service.port.as_ref().and_then(|p| p.number).unwrap_or(80) as u16,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Annotation keys for GIngress features.
|
||||
const ANN_RATE_LIMIT: &str = "gingress.io/rate-limit";
|
||||
const ANN_RATE_LIMIT_BURST: &str = "gingress.io/rate-limit-burst";
|
||||
const ANN_REQUEST_HEADERS: &str = "gingress.io/request-headers";
|
||||
const ANN_WEBSOCKET: &str = "gingress.io/websocket";
|
||||
const ANN_SESSION_AFFINITY: &str = "gingress.io/session-affinity";
|
||||
const ANN_GIT_BACKEND: &str = "gingress.io/git-backend";
|
||||
|
||||
/// Parse Ingress annotations and write corresponding ConfigStore entries.
|
||||
///
|
||||
/// Supported annotations:
|
||||
/// - `gingress.io/rate-limit` — "RPS" or "RPS/BURST" (e.g., "100" or "100/200")
|
||||
/// - `gingress.io/rate-limit-burst` — Override burst size
|
||||
/// - `gingress.io/request-headers` — JSON array of header operations
|
||||
/// - `gingress.io/websocket` — "true" to enable WebSocket upgrade for this host
|
||||
/// - `gingress.io/session-affinity` — "cookie" or "cookie:NAME:TTL_SECONDS"
|
||||
fn process_annotations(
|
||||
annotations: &BTreeMap<String, String>,
|
||||
ingress_prefix: &str,
|
||||
namespace: &str,
|
||||
store: &ConfigStore,
|
||||
) {
|
||||
// Collect hosts from the ingress routes that were just stored
|
||||
let route_keys = store.keys_with_prefix(&format!("{}route:", ingress_prefix));
|
||||
let hosts: Vec<String> = route_keys
|
||||
.iter()
|
||||
.filter_map(|k| k.split(":route:").nth(1).map(String::from))
|
||||
.collect();
|
||||
|
||||
if hosts.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Remove old per-host annotation keys (handles annotation removal/update)
|
||||
for host in &hosts {
|
||||
store.remove(&format!("rate_limit:{}", host));
|
||||
store.remove(&format!("headers:{}", host));
|
||||
store.remove(&format!("session_affinity:{}", host));
|
||||
}
|
||||
|
||||
// Remove this ingress's hosts from the global websocket list
|
||||
prune_websocket_hosts(store, &hosts);
|
||||
|
||||
// ── Rate limiting ──
|
||||
if let Some(val) = annotations.get(ANN_RATE_LIMIT) {
|
||||
let (rps, burst) = parse_rate_limit(val, annotations.get(ANN_RATE_LIMIT_BURST));
|
||||
for host in &hosts {
|
||||
store.set(
|
||||
&format!("rate_limit:{}", host),
|
||||
&RateLimitPolicy {
|
||||
host: host.clone(),
|
||||
requests_per_second: rps,
|
||||
burst_size: burst,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Header operations (request) ──
|
||||
if let Some(val) = annotations.get(ANN_REQUEST_HEADERS) {
|
||||
if let Ok(ops) = parse_header_ops(val) {
|
||||
for host in &hosts {
|
||||
store.set(&format!("headers:{}", host), &ops);
|
||||
}
|
||||
} else {
|
||||
tracing::warn!(annotation = %ANN_REQUEST_HEADERS, value = %val, "Invalid header ops JSON");
|
||||
}
|
||||
}
|
||||
|
||||
// ── WebSocket ──
|
||||
if let Some(val) = annotations.get(ANN_WEBSOCKET) {
|
||||
if val.trim().to_lowercase() == "true" {
|
||||
let mut ws_hosts: Vec<String> = hosts.clone();
|
||||
// Merge with hosts from other ingresses (already pruned above)
|
||||
if let Some(existing) = store.get::<Vec<String>>("websocket:hosts") {
|
||||
for h in existing {
|
||||
if !ws_hosts.contains(&h) {
|
||||
ws_hosts.push(h);
|
||||
}
|
||||
}
|
||||
}
|
||||
store.set("websocket:hosts", &ws_hosts);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Session affinity ──
|
||||
if let Some(val) = annotations.get(ANN_SESSION_AFFINITY) {
|
||||
// Format: "cookie" or "cookie:COOKIE_NAME:TTL_SECONDS"
|
||||
let (enabled, cookie_name, ttl) = parse_session_affinity(val);
|
||||
for host in &hosts {
|
||||
let key = format!("session_affinity:{}", host);
|
||||
store.set(
|
||||
&key,
|
||||
&SessionAffinityConfig {
|
||||
enabled,
|
||||
cookie_name: cookie_name.clone(),
|
||||
cookie_ttl_seconds: ttl,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Git backend ──
|
||||
// When present, requests with Git User-Agent (git/*, JGit/*) are routed to
|
||||
// this backend instead of normal host+path matching.
|
||||
// Value format: "namespace/service-name:port" or "service-name:port" (namespace from Ingress)
|
||||
if let Some(val) = annotations.get(ANN_GIT_BACKEND) {
|
||||
if let Some(backend) = parse_git_backend(val, &namespace) {
|
||||
store.set("git_backend", &backend);
|
||||
tracing::info!(
|
||||
backend = format!("{}/{}:{}", backend.namespace, backend.name, backend.port),
|
||||
"Git backend configured"
|
||||
);
|
||||
} else {
|
||||
tracing::warn!(annotation = %ANN_GIT_BACKEND, value = %val, "Invalid git-backend format, expected 'namespace/name:port' or 'name:port'");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_rate_limit(val: &str, burst_override: Option<&String>) -> (u32, u32) {
|
||||
let val = val.trim();
|
||||
if let Some((rps_str, burst_str)) = val.split_once('/') {
|
||||
let rps = rps_str.parse().unwrap_or(0);
|
||||
let burst = burst_str.parse().unwrap_or(rps);
|
||||
(rps, burst)
|
||||
} else {
|
||||
let rps = val.parse().unwrap_or(0);
|
||||
let burst = burst_override
|
||||
.and_then(|b| b.parse().ok())
|
||||
.unwrap_or(rps * 2);
|
||||
(rps, burst)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct HeaderOpAnnotation {
|
||||
op: String,
|
||||
name: String,
|
||||
#[serde(default)]
|
||||
value: Option<String>,
|
||||
}
|
||||
|
||||
fn parse_header_ops(val: &str) -> anyhow::Result<Vec<HeaderOp>> {
|
||||
let items: Vec<HeaderOpAnnotation> = serde_json::from_str(val)?;
|
||||
items
|
||||
.into_iter()
|
||||
.map(|item| {
|
||||
Ok(match item.op.as_str() {
|
||||
"set" => HeaderOp::Set {
|
||||
name: item.name,
|
||||
value: item.value.unwrap_or_default(),
|
||||
},
|
||||
"add" => HeaderOp::Add {
|
||||
name: item.name,
|
||||
value: item.value.unwrap_or_default(),
|
||||
},
|
||||
"remove" => HeaderOp::Remove { name: item.name },
|
||||
_ => anyhow::bail!("Unknown header op: {}", item.op),
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn parse_session_affinity(val: &str) -> (bool, String, u64) {
|
||||
let val = val.trim();
|
||||
if val.eq_ignore_ascii_case("cookie") || val.eq_ignore_ascii_case("true") {
|
||||
return (true, "GINGRESS_AFFINITY".into(), 3600);
|
||||
}
|
||||
// Format: "cookie:COOKIE_NAME:TTL"
|
||||
let parts: Vec<&str> = val.split(':').collect();
|
||||
if parts.len() >= 3 {
|
||||
let name = parts[1].to_string();
|
||||
let ttl = parts[2].parse().unwrap_or(3600);
|
||||
(true, name, ttl)
|
||||
} else if parts.len() == 2 {
|
||||
let name = parts[1].to_string();
|
||||
(true, name, 3600)
|
||||
} else {
|
||||
(false, String::new(), 0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse git-backend annotation value.
|
||||
///
|
||||
/// Format: "namespace/name:port" or "name:port" (namespace defaults to Ingress namespace).
|
||||
fn parse_git_backend(
|
||||
val: &str,
|
||||
default_namespace: &str,
|
||||
) -> Option<gingress_proxy::config::Backend> {
|
||||
let val = val.trim();
|
||||
// Split off port: "namespace/name:port" → ("namespace/name", "port")
|
||||
let (ns_name, port_str) = val.rsplit_once(':').unwrap_or((val, ""));
|
||||
let port: u16 = port_str.parse().ok()?;
|
||||
|
||||
// Split namespace and name: "namespace/name" → ("namespace", "name")
|
||||
let (namespace, name) = if let Some((ns, n)) = ns_name.rsplit_once('/') {
|
||||
(ns.to_string(), n.to_string())
|
||||
} else {
|
||||
// No namespace specified — use the Ingress namespace
|
||||
(default_namespace.to_string(), ns_name.to_string())
|
||||
};
|
||||
|
||||
if name.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(gingress_proxy::config::Backend {
|
||||
namespace,
|
||||
name,
|
||||
port,
|
||||
})
|
||||
}
|
||||
|
||||
/// Remove a set of hosts from the global websocket host list (scoped cleanup).
|
||||
fn prune_websocket_hosts(store: &ConfigStore, hosts_to_remove: &[String]) {
|
||||
if let Some(mut existing) = store.get::<Vec<String>>("websocket:hosts") {
|
||||
existing.retain(|h| !hosts_to_remove.contains(h));
|
||||
if existing.is_empty() {
|
||||
store.remove("websocket:hosts");
|
||||
} else {
|
||||
store.set("websocket:hosts", &existing);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove all routes associated with a deleted Ingress.
|
||||
fn remove_ingress_routes(ingress: &Ingress, store: &ConfigStore) {
|
||||
let namespace = ingress.namespace().unwrap_or_default();
|
||||
let name = ingress.name_any();
|
||||
let ingress_prefix = format!("ingress:{}/{}:", namespace, name);
|
||||
|
||||
// Collect hosts before deleting routes so we can clean up per-host annotation keys
|
||||
let host_keys: Vec<String> = store
|
||||
.keys_with_prefix(&format!("{}route:", ingress_prefix))
|
||||
.iter()
|
||||
.filter_map(|k| k.split(":route:").nth(1).map(String::from))
|
||||
.collect();
|
||||
|
||||
// Remove all route entries for this Ingress
|
||||
store.remove_prefix(&ingress_prefix);
|
||||
|
||||
// Remove per-host annotation-derived keys
|
||||
for host in &host_keys {
|
||||
store.remove(&format!("rate_limit:{}", host));
|
||||
store.remove(&format!("headers:{}", host));
|
||||
store.remove(&format!("session_affinity:{}", host));
|
||||
}
|
||||
// Scoped: only remove this ingress's hosts from the global websocket list
|
||||
prune_websocket_hosts(store, &host_keys);
|
||||
|
||||
// Remove TLS host mappings
|
||||
if let Some(spec) = ingress.spec.as_ref() {
|
||||
if let Some(tls_entries) = &spec.tls {
|
||||
for tls in tls_entries {
|
||||
let sn = tls.secret_name.as_deref().unwrap_or_default();
|
||||
store.remove(&format!("tls-host:{}", sn));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
store.signal_reload();
|
||||
}
|
||||
@ -1,88 +0,0 @@
|
||||
//! Kubernetes controller for GIngress.
|
||||
//!
|
||||
//! Watches Ingress, Service, EndpointSlice, and Secret resources,
|
||||
//! reconciles them into the shared `ConfigStore`.
|
||||
|
||||
mod endpoint_watcher;
|
||||
mod ingress_watcher;
|
||||
mod reconciler;
|
||||
mod secret_watcher;
|
||||
|
||||
use anyhow::Context;
|
||||
use gingress_proxy::config::ConfigStore;
|
||||
use kube::Client;
|
||||
use std::sync::Arc;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
/// Start all controller watchers and the reconcile loop.
|
||||
///
|
||||
/// Each watcher:
|
||||
/// 1. Watches a specific K8s resource type
|
||||
/// 2. Writes its fragment to the `ConfigStore`
|
||||
/// 3. Calls `reconciler.reconcile()` to cross-reference all fragments
|
||||
/// into a complete `ProxyConfig`
|
||||
/// 4. The reconiler signals `ConfigStore::signal_reload()`
|
||||
/// 5. The data plane's `HotReloadWatcher` picks up the change
|
||||
///
|
||||
/// Returns a `JoinHandle` that can be aborted on shutdown.
|
||||
pub async fn start(
|
||||
store: ConfigStore,
|
||||
ingress_class: String,
|
||||
namespace: Option<String>,
|
||||
) -> anyhow::Result<JoinHandle<()>> {
|
||||
let client = Client::try_default().await.context(
|
||||
"Failed to create Kubernetes client. Are you running in a cluster or have a kubeconfig?",
|
||||
)?;
|
||||
|
||||
tracing::info!("Kubernetes client initialized");
|
||||
|
||||
let store = Arc::new(store);
|
||||
let client = Arc::new(client);
|
||||
|
||||
let reconciler = Arc::new(reconciler::Reconciler::new(store.clone()));
|
||||
|
||||
// Callback invoked by every watcher after processing an event.
|
||||
// This is where cross-referencing happens: routes + certs + endpoints
|
||||
// are assembled into a complete ProxyConfig.
|
||||
let on_change: Arc<dyn Fn() + Send + Sync> = {
|
||||
let r = reconciler.clone();
|
||||
Arc::new(move || {
|
||||
r.reconcile();
|
||||
})
|
||||
};
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let ingress_handle = ingress_watcher::watch_ingresses(
|
||||
client.clone(),
|
||||
store.clone(),
|
||||
ingress_class,
|
||||
namespace.clone(),
|
||||
on_change.clone(),
|
||||
);
|
||||
|
||||
let secret_handle = secret_watcher::watch_secrets(
|
||||
client.clone(),
|
||||
store.clone(),
|
||||
namespace.clone(),
|
||||
on_change.clone(),
|
||||
);
|
||||
|
||||
let endpoint_handle = endpoint_watcher::watch_endpoints(
|
||||
client.clone(),
|
||||
store.clone(),
|
||||
namespace,
|
||||
on_change.clone(),
|
||||
);
|
||||
|
||||
tracing::info!("All watchers started");
|
||||
|
||||
// If any watcher dies, log the error and attempt restart
|
||||
tokio::select! {
|
||||
r = ingress_handle => tracing::error!("Ingress watcher exited: {:?}", r),
|
||||
r = secret_handle => tracing::error!("Secret watcher exited: {:?}", r),
|
||||
r = endpoint_handle => tracing::error!("Endpoint watcher exited: {:?}", r),
|
||||
}
|
||||
});
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
@ -1,241 +0,0 @@
|
||||
//! Reconcile loop for the GIngress controller.
|
||||
//!
|
||||
//! After any watcher detects a change (Ingress, Secret, Endpoints),
|
||||
//! the reconciler reads all fragments from the ConfigStore, cross-references them,
|
||||
//! assembles a complete `ProxyConfig`, validates it, and signals a reload.
|
||||
|
||||
use gingress_proxy::config::{ConfigStore, Endpoint, ProxyConfig, RouteRule, TlsCert};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Reconcile the full proxy configuration from current k8s state.
|
||||
pub struct Reconciler {
|
||||
store: Arc<ConfigStore>,
|
||||
}
|
||||
|
||||
impl Reconciler {
|
||||
pub fn new(store: Arc<ConfigStore>) -> Self {
|
||||
Self { store }
|
||||
}
|
||||
|
||||
/// Trigger a full reconciliation.
|
||||
///
|
||||
/// 1. Reads all route fragments (from Ingress watcher)
|
||||
/// 2. Reads all TLS certs (from Secret watcher)
|
||||
/// 3. Reads all upstream endpoints (from Endpoint watcher)
|
||||
/// 4. Cross-references: matches TLS secrets to Ingress hosts,
|
||||
/// matches upstreams to route backends
|
||||
/// 5. Validates the configuration
|
||||
/// 6. Writes the assembled ProxyConfig to the store
|
||||
/// 7. Signals reload
|
||||
pub fn reconcile(&self) {
|
||||
tracing::debug!("Reconciliation started");
|
||||
|
||||
// Step 1: Gather all routes from ingress-scoped keys
|
||||
// Keys look like: "ingress:<ns>/<name>:route:<host>"
|
||||
let mut routes: HashMap<String, Vec<RouteRule>> = HashMap::new();
|
||||
for key in self.store.keys_with_prefix("ingress:") {
|
||||
if !key.contains(":route:") {
|
||||
continue;
|
||||
}
|
||||
// Extract host from "ingress:<ns>/<name>:route:<host>"
|
||||
if let Some(host) = key.split(":route:").nth(1) {
|
||||
if let Some(rules) = self.store.get::<Vec<RouteRule>>(&key) {
|
||||
if !rules.is_empty() {
|
||||
routes.entry(host.to_string()).or_default().extend(rules);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Step 2: Gather all TLS certs
|
||||
let mut tls_certs: HashMap<String, TlsCert> = HashMap::new();
|
||||
for key in self.store.keys_with_prefix("tls:") {
|
||||
if let Some(cert) = self.store.get::<TlsCert>(&key) {
|
||||
tls_certs.insert(cert.host.clone(), cert);
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: Gather all upstreams keyed by backend ("<ns>/<name>:<port>")
|
||||
let mut upstreams: HashMap<String, Vec<Endpoint>> = HashMap::new();
|
||||
for key in self.store.keys_with_prefix("upstream:") {
|
||||
if let Some(eps) = self.store.get::<Vec<Endpoint>>(&key) {
|
||||
if !eps.is_empty() {
|
||||
upstreams.insert(key.clone(), eps);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Step 4: Gather rate limits, headers, session affinity, and websocket hosts
|
||||
let rate_limits = self.collect_rate_limits(&routes);
|
||||
let headers = self.collect_headers();
|
||||
let session_affinity = self.collect_session_affinity(&routes);
|
||||
let websocket_hosts = self.collect_websocket_hosts();
|
||||
let git_upstream = self.collect_git_backend();
|
||||
|
||||
// Step 5: Build the complete ProxyConfig
|
||||
let cfg = ProxyConfig {
|
||||
routes,
|
||||
tls: tls_certs,
|
||||
upstreams,
|
||||
rate_limits,
|
||||
headers,
|
||||
session_affinity,
|
||||
websocket_hosts,
|
||||
git_upstream,
|
||||
};
|
||||
|
||||
// Step 6: Validate
|
||||
let warnings = self.validate_config(&cfg);
|
||||
for w in &warnings {
|
||||
tracing::warn!("{}", w);
|
||||
}
|
||||
|
||||
// Step 7: Store the assembled config as the canonical snapshot
|
||||
self.store.set(
|
||||
"_assembled",
|
||||
&serde_json::to_value(&cfg).unwrap_or_default(),
|
||||
);
|
||||
|
||||
self.store.signal_reload();
|
||||
tracing::info!(
|
||||
routes = cfg.routes.len(),
|
||||
tls_hosts = cfg.tls.len(),
|
||||
upstreams = cfg.upstreams.len(),
|
||||
warnings = warnings.len(),
|
||||
"Reconciliation complete"
|
||||
);
|
||||
}
|
||||
|
||||
/// Cross-reference: for each Ingress TLS entry, find the Secret cert.
|
||||
///
|
||||
/// The Ingress TLS section maps: `hosts: [example.com]` → `secretName: my-cert`.
|
||||
/// The Secret watcher stores the cert at key `tls:<secretName>`.
|
||||
/// We already map secretName → host in ingress_watcher, so this is a no-op
|
||||
/// when the ingress_watcher uses correct key mapping.
|
||||
#[allow(dead_code)]
|
||||
pub fn cross_reference_tls(&self) -> HashMap<String, TlsCert> {
|
||||
let mut host_certs: HashMap<String, TlsCert> = HashMap::new();
|
||||
|
||||
// TLS secret name → host mapping is stored by the ingress watcher
|
||||
// at key: "tls-host:<secretName>" → Vec<String> (hosts)
|
||||
for key in self.store.keys_with_prefix("tls-host:") {
|
||||
let secret_name = &key["tls-host:".len()..];
|
||||
let hosts: Vec<String> = self.store.get::<Vec<String>>(&key).unwrap_or_default();
|
||||
|
||||
// Look up the actual cert: key "tls:<host>" (stored by secret watcher
|
||||
// using the certificate's SAN/CN host, but also "tls-secret:<secretName>")
|
||||
let cert_key = format!("tls-secret:{}", secret_name);
|
||||
if let Some(cert) = self.store.get::<TlsCert>(&cert_key) {
|
||||
for host in hosts {
|
||||
host_certs.insert(host, cert.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
host_certs
|
||||
}
|
||||
|
||||
/// Validate the assembled configuration. Returns warnings.
|
||||
fn validate_config(&self, cfg: &ProxyConfig) -> Vec<String> {
|
||||
let mut warnings = Vec::new();
|
||||
|
||||
// Check: every TLS host has a route
|
||||
for host in cfg.tls.keys() {
|
||||
if !cfg.routes.contains_key(host) {
|
||||
warnings.push(format!(
|
||||
"TLS configured for host '{}' but no routes exist for this host",
|
||||
host
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Check: every route backend has upstream endpoints
|
||||
for (host, rules) in &cfg.routes {
|
||||
for rule in rules {
|
||||
let backend_key =
|
||||
format!("upstream:{}/{}", rule.backend.namespace, rule.backend.name);
|
||||
|
||||
if !cfg.upstreams.contains_key(&backend_key) {
|
||||
warnings.push(format!(
|
||||
"Host '{}' routes to backend {}/{}:{} but no endpoints found",
|
||||
host, rule.backend.namespace, rule.backend.name, rule.backend.port
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check: orphaned upstreams (no route references them)
|
||||
let mut referenced_backends: HashSet<String> = HashSet::new();
|
||||
for rules in cfg.routes.values() {
|
||||
for rule in rules {
|
||||
let bk = format!("upstream:{}/{}", rule.backend.namespace, rule.backend.name);
|
||||
referenced_backends.insert(bk);
|
||||
}
|
||||
}
|
||||
for upstream_key in cfg.upstreams.keys() {
|
||||
if !referenced_backends.contains(upstream_key) {
|
||||
warnings.push(format!(
|
||||
"Upstream '{}' has no routes referencing it (orphaned)",
|
||||
upstream_key
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
warnings
|
||||
}
|
||||
|
||||
/// Collect rate limit policies for all hosts that have routes.
|
||||
fn collect_rate_limits(
|
||||
&self,
|
||||
routes: &HashMap<String, Vec<RouteRule>>,
|
||||
) -> HashMap<String, gingress_proxy::config::RateLimitPolicy> {
|
||||
let mut limits = HashMap::new();
|
||||
for host in routes.keys() {
|
||||
let key = format!("rate_limit:{}", host);
|
||||
if let Some(policy) = self.store.get(&key) {
|
||||
limits.insert(host.clone(), policy);
|
||||
}
|
||||
}
|
||||
limits
|
||||
}
|
||||
|
||||
/// Collect header operations for all hosts.
|
||||
fn collect_headers(&self) -> HashMap<String, Vec<gingress_proxy::config::HeaderOp>> {
|
||||
let mut headers = HashMap::new();
|
||||
for key in self.store.keys_with_prefix("headers:") {
|
||||
let host = &key["headers:".len()..];
|
||||
if let Some(ops) = self.store.get(&key) {
|
||||
headers.insert(host.to_string(), ops);
|
||||
}
|
||||
}
|
||||
headers
|
||||
}
|
||||
|
||||
/// Collect session affinity configs for all hosts that have routes.
|
||||
fn collect_session_affinity(
|
||||
&self,
|
||||
_routes: &HashMap<String, Vec<RouteRule>>,
|
||||
) -> HashMap<String, gingress_proxy::config::SessionAffinityConfig> {
|
||||
let mut affinity = HashMap::new();
|
||||
for key in self.store.keys_with_prefix("session_affinity:") {
|
||||
let host = &key["session_affinity:".len()..];
|
||||
if let Some(cfg) = self.store.get(&key) {
|
||||
affinity.insert(host.to_string(), cfg);
|
||||
}
|
||||
}
|
||||
affinity
|
||||
}
|
||||
|
||||
/// Collect WebSocket-enabled hosts.
|
||||
fn collect_websocket_hosts(&self) -> Vec<String> {
|
||||
self.store
|
||||
.get::<Vec<String>>("websocket:hosts")
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Collect git backend configuration from annotation.
|
||||
fn collect_git_backend(&self) -> Option<gingress_proxy::config::Backend> {
|
||||
self.store.get("git_backend")
|
||||
}
|
||||
}
|
||||
@ -1,166 +0,0 @@
|
||||
//! Watches Kubernetes TLS Secrets and loads certificates.
|
||||
//!
|
||||
//! Compatible with cert-manager: watches for Secret creation/update events
|
||||
//! and parses `tls.crt` and `tls.key` into the ConfigStore for TLS termination.
|
||||
//!
|
||||
//! Key convention:
|
||||
//! - `tls-secret:<secretName>` — the raw cert, cross-referenced by reconciler
|
||||
//! via the `tls-host:<secretName>` mapping written by the ingress watcher.
|
||||
//! - After reconciliation, the reconciler copies certs to `tls:<host>` for
|
||||
//! direct SNI lookup by the proxy.
|
||||
|
||||
use futures::StreamExt;
|
||||
use futures::pin_mut;
|
||||
use gingress_proxy::config::{ConfigStore, TlsCert};
|
||||
use kube::ResourceExt;
|
||||
use kube::runtime::watcher::{self, Event};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Watch Secrets of type `kubernetes.io/tls` and update the ConfigStore.
|
||||
///
|
||||
/// After each event, the `on_change` callback is invoked so the reconciler
|
||||
/// can cross-reference certs with routes.
|
||||
pub async fn watch_secrets(
|
||||
client: Arc<kube::Client>,
|
||||
store: Arc<ConfigStore>,
|
||||
_namespace: Option<String>,
|
||||
on_change: Arc<dyn Fn() + Send + Sync>,
|
||||
) {
|
||||
let api = kube::Api::<k8s_openapi::api::core::v1::Secret>::all(client.as_ref().clone());
|
||||
|
||||
let config = watcher::Config {
|
||||
field_selector: Some("type=kubernetes.io/tls".to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let secret_watcher = watcher::watcher(api, config);
|
||||
pin_mut!(secret_watcher);
|
||||
|
||||
while let Some(event) = secret_watcher.next().await {
|
||||
match event {
|
||||
Ok(Event::Apply(secret)) => {
|
||||
process_tls_secret(&secret, &store);
|
||||
on_change();
|
||||
tracing::info!(
|
||||
name = %secret.name_any(),
|
||||
namespace = %secret.namespace().unwrap_or_default(),
|
||||
"TLS Secret applied"
|
||||
);
|
||||
}
|
||||
Ok(Event::Init) => {
|
||||
store.remove_prefix("tls-secret:");
|
||||
tracing::info!("Secret watcher re-initializing");
|
||||
}
|
||||
Ok(Event::InitApply(secret)) => {
|
||||
process_tls_secret(&secret, &store);
|
||||
}
|
||||
Ok(Event::InitDone) => {
|
||||
store.signal_reload();
|
||||
on_change();
|
||||
tracing::info!("Secret watcher init complete");
|
||||
}
|
||||
Ok(Event::Delete(secret)) => {
|
||||
remove_tls_cert(&secret, &store);
|
||||
on_change();
|
||||
tracing::info!(
|
||||
name = %secret.name_any(),
|
||||
"TLS Secret deleted, cert removed"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Secret watcher error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a TLS secret and store the certificate.
|
||||
fn process_tls_secret(secret: &k8s_openapi::api::core::v1::Secret, store: &ConfigStore) {
|
||||
let data = match &secret.data {
|
||||
Some(d) => d,
|
||||
None => return,
|
||||
};
|
||||
|
||||
let cert_pem = match data
|
||||
.get("tls.crt")
|
||||
.and_then(|v| std::str::from_utf8(&v.0).ok())
|
||||
{
|
||||
Some(v) => v.to_string(),
|
||||
None => {
|
||||
tracing::warn!(name = %secret.name_any(), "TLS Secret missing tls.crt");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let key_pem = match data
|
||||
.get("tls.key")
|
||||
.and_then(|v| std::str::from_utf8(&v.0).ok())
|
||||
{
|
||||
Some(v) => v.to_string(),
|
||||
None => {
|
||||
tracing::warn!(name = %secret.name_any(), "TLS Secret missing tls.key");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let secret_name = secret.name_any();
|
||||
|
||||
// Extract SANs from the certificate to determine which hosts this cert covers
|
||||
let hosts = extract_sans_from_pem(&cert_pem).unwrap_or_else(|| vec![secret_name.clone()]);
|
||||
|
||||
let tls_cert = TlsCert {
|
||||
host: hosts.first().cloned().unwrap_or(secret_name.clone()),
|
||||
cert_pem,
|
||||
key_pem,
|
||||
};
|
||||
|
||||
// Store under the secret name for cross-referencing
|
||||
store.set(&format!("tls-secret:{}", secret_name), &tls_cert);
|
||||
|
||||
// Also store directly under each SAN host for SNI lookup
|
||||
for host in &hosts {
|
||||
store.set(&format!("tls:{}", host), &tls_cert);
|
||||
}
|
||||
|
||||
store.signal_reload();
|
||||
}
|
||||
|
||||
/// Extract Subject Alternative Names from a PEM certificate.
|
||||
fn extract_sans_from_pem(pem_data: &str) -> Option<Vec<String>> {
|
||||
use x509_parser::prelude::*;
|
||||
|
||||
let mut reader = std::io::BufReader::new(pem_data.as_bytes());
|
||||
let certs: Vec<_> = rustls_pemfile::certs(&mut reader)
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.ok()?;
|
||||
let cert_der = certs.first()?;
|
||||
let (_, cert) = X509Certificate::from_der(cert_der).ok()?;
|
||||
|
||||
let mut hosts: Vec<String> = Vec::new();
|
||||
|
||||
if let Ok(Some(san)) = cert.subject_alternative_name() {
|
||||
for name in &san.value.general_names {
|
||||
if let GeneralName::DNSName(dns) = name {
|
||||
hosts.push(dns.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: use CN
|
||||
if hosts.is_empty() {
|
||||
if let Some(cn) = cert.subject().iter_common_name().next() {
|
||||
hosts.push(cn.as_str().unwrap_or_default().to_string());
|
||||
}
|
||||
}
|
||||
|
||||
if hosts.is_empty() { None } else { Some(hosts) }
|
||||
}
|
||||
|
||||
/// Remove a TLS certificate when the Secret is deleted.
|
||||
fn remove_tls_cert(secret: &k8s_openapi::api::core::v1::Secret, store: &ConfigStore) {
|
||||
let secret_name = secret.name_any();
|
||||
store.remove(&format!("tls-secret:{}", secret_name));
|
||||
// Also clean up tls-host mapping
|
||||
store.remove(&format!("tls-host:{}", secret_name));
|
||||
store.signal_reload();
|
||||
}
|
||||
@ -1,174 +0,0 @@
|
||||
//! GIngress — Kubernetes Ingress Controller
|
||||
//!
|
||||
//! Control plane that watches Kubernetes resources (Ingress, Service, Endpoints,
|
||||
//! Secrets) and updates the shared `ConfigStore` for the data plane.
|
||||
//!
|
||||
//! Architecture:
|
||||
//! - Watches Ingress resources → builds routing rules
|
||||
//! - Watches TLS Secrets → loads certificates
|
||||
//! - Watches Endpoints → tracks upstream IPs
|
||||
//! - Reconciler → diffs changes and pushes to ConfigStore + signals reload
|
||||
|
||||
mod controller;
|
||||
|
||||
use clap::Parser;
|
||||
use gingress_proxy::config::ConfigStore;
|
||||
use gingress_proxy::hot_reload;
|
||||
use gingress_proxy::observability;
|
||||
use gingress_proxy::server::{self, GIngressProxy};
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(name = "gingress")]
|
||||
struct Args {
|
||||
/// Ingress class name to watch (default: "gingress")
|
||||
#[arg(long, default_value = "gingress")]
|
||||
ingress_class: String,
|
||||
|
||||
/// Kubernetes namespace to watch (empty = all namespaces)
|
||||
#[arg(long)]
|
||||
namespace: Option<String>,
|
||||
|
||||
/// HTTP bind address for the proxy
|
||||
#[arg(long, default_value = "0.0.0.0:80")]
|
||||
bind_http: String,
|
||||
|
||||
/// HTTPS bind address for the proxy
|
||||
#[arg(long, default_value = "0.0.0.0:443")]
|
||||
bind_https: String,
|
||||
|
||||
/// Metrics bind address
|
||||
#[arg(long, default_value = "0.0.0.0:8080")]
|
||||
metrics_bind: String,
|
||||
|
||||
/// Log level
|
||||
#[arg(long, default_value = "info")]
|
||||
log_level: String,
|
||||
|
||||
/// OTLP endpoint (optional)
|
||||
#[arg(long)]
|
||||
otlp_endpoint: Option<String>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
// Initialize tracing
|
||||
observability::init_tracing(&args.log_level, args.otlp_endpoint.is_some());
|
||||
|
||||
// Initialize OTLP if configured
|
||||
let _otel_guard = if let Some(ref endpoint) = args.otlp_endpoint {
|
||||
Some(observability::init_otlp(endpoint, "gingress")?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
ingress_class = %args.ingress_class,
|
||||
bind_http = %args.bind_http,
|
||||
bind_https = %args.bind_https,
|
||||
"GIngress starting"
|
||||
);
|
||||
|
||||
// Shared config store between control plane and data plane
|
||||
let config_store = ConfigStore::new();
|
||||
|
||||
// Start the control plane: watch k8s resources
|
||||
let controller_handle = controller::start(
|
||||
config_store.clone(),
|
||||
args.ingress_class.clone(),
|
||||
args.namespace.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::info!("Kubernetes controller started");
|
||||
|
||||
// Metrics server (for Prometheus scraping)
|
||||
let metrics_handle = spawn_metrics_server(&args.metrics_bind).await?;
|
||||
tracing::info!(bind = %args.metrics_bind, "Metrics server started");
|
||||
|
||||
// Build the Pingora proxy (data plane)
|
||||
let proxy = GIngressProxy::new(config_store.clone());
|
||||
|
||||
// Spawn hot-reload watcher: applies config changes to the proxy
|
||||
let reload_handle = hot_reload::spawn_reload_watcher(config_store.clone(), move |store| {
|
||||
// Read the assembled ProxyConfig that the reconciler wrote at key "_assembled"
|
||||
match store.get::<serde_json::Value>("_assembled") {
|
||||
Some(config_json) => {
|
||||
if let Ok(cfg) =
|
||||
serde_json::from_value::<gingress_proxy::config::ProxyConfig>(config_json)
|
||||
{
|
||||
tracing::info!(
|
||||
routes = cfg.routes.len(),
|
||||
tls_hosts = cfg.tls.len(),
|
||||
upstreams = cfg.upstreams.len(),
|
||||
"Hot-reload: new proxy configuration applied"
|
||||
);
|
||||
|
||||
// Apply TLS certificates to the proxy
|
||||
for (_host, cert) in &cfg.tls {
|
||||
tracing::debug!(
|
||||
host = %cert.host,
|
||||
"Hot-reload: TLS cert loaded for host"
|
||||
);
|
||||
}
|
||||
|
||||
// Apply routes to the proxy
|
||||
for (host, rules) in &cfg.routes {
|
||||
tracing::debug!(
|
||||
host = %host,
|
||||
num_rules = rules.len(),
|
||||
"Hot-reload: routes configured"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
tracing::error!("Hot-reload: failed to deserialize assembled ProxyConfig");
|
||||
}
|
||||
}
|
||||
None => {
|
||||
tracing::warn!("Hot-reload: no assembled config found (_assembled key missing)");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Build and run the proxy server (blocking)
|
||||
let server = server::build_server(proxy, &args.bind_http, &args.bind_https)?;
|
||||
|
||||
tracing::info!(
|
||||
"GIngress proxy starting, listening on {} (HTTP) and {} (HTTPS)",
|
||||
args.bind_http,
|
||||
args.bind_https
|
||||
);
|
||||
|
||||
// Run proxy in a tokio blocking task
|
||||
let proxy_handle = tokio::task::spawn_blocking(move || {
|
||||
server::run_server(server);
|
||||
});
|
||||
|
||||
// Wait for shutdown signal
|
||||
tokio::signal::ctrl_c().await?;
|
||||
tracing::info!("Shutdown signal received, stopping...");
|
||||
|
||||
controller_handle.abort();
|
||||
reload_handle.abort();
|
||||
metrics_handle.abort();
|
||||
|
||||
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), proxy_handle).await;
|
||||
|
||||
tracing::info!("GIngress stopped");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Spawn the metrics server for Prometheus scraping.
|
||||
async fn spawn_metrics_server(bind: &str) -> anyhow::Result<tokio::task::JoinHandle<()>> {
|
||||
use std::net::TcpListener;
|
||||
let bind = bind.to_string();
|
||||
let listener = TcpListener::bind(&bind)?;
|
||||
let handle = tokio::spawn(async move {
|
||||
// Serve metrics via a minimal HTTP handler
|
||||
// Uses the prometheus_exporter from observability
|
||||
let _ = listener;
|
||||
tracing::info!(bind = %bind, "Metrics server stopped");
|
||||
});
|
||||
Ok(handle)
|
||||
}
|
||||
88
build.sh
88
build.sh
@ -1,88 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
# ── helpers ──────────────────────────────────────────────────────────
|
||||
RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; NC='\033[0m'
|
||||
log() { echo -e "${GREEN}[OK]${NC} $*"; }
|
||||
warn() { echo -e "${YELLOW}[WARN]${NC} $*"; }
|
||||
err() { echo -e "${RED}[ERR]${NC} $*"; exit 1; }
|
||||
|
||||
command_exists() { command -v "$1" &>/dev/null; }
|
||||
|
||||
# ── 1. Rust ─────────────────────────────────────────────────────────
|
||||
if command_exists rustc; then
|
||||
log "Rust $(rustc --version)"
|
||||
else
|
||||
warn "Rust not found, installing via rustup..."
|
||||
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
|
||||
# shellcheck disable=SC1091
|
||||
source "$HOME/.cargo/env"
|
||||
log "Rust installed: $(rustc --version)"
|
||||
fi
|
||||
|
||||
# ── 2. Node.js ──────────────────────────────────────────────────────
|
||||
if command_exists node; then
|
||||
log "Node.js $(node --version)"
|
||||
else
|
||||
warn "Node.js not found, installing via nvm..."
|
||||
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.1/install.sh | bash
|
||||
# shellcheck disable=SC1090
|
||||
export NVM_DIR="${HOME}/.nvm"
|
||||
[ -s "$NVM_DIR/nvm.sh" ] && source "$NVM_DIR/nvm.sh"
|
||||
nvm install --lts
|
||||
log "Node.js installed: $(node --version)"
|
||||
fi
|
||||
|
||||
# ── 2b. Bun ─────────────────────────────────────────────────────────
|
||||
if command_exists bun; then
|
||||
log "Bun $(bun --version)"
|
||||
else
|
||||
warn "Bun not found, installing..."
|
||||
curl -fsSL https://bun.sh/install | bash
|
||||
# shellcheck disable=SC1091
|
||||
[ -s "$HOME/.bun/_bun" ] && export PATH="$HOME/.bun/bin:$PATH"
|
||||
log "Bun installed: $(bun --version)"
|
||||
fi
|
||||
|
||||
# ── 3. Docker ───────────────────────────────────────────────────────
|
||||
if command_exists docker; then
|
||||
log "Docker $(docker --version)"
|
||||
else
|
||||
warn "Docker not found, installing..."
|
||||
curl -fsSL https://get.docker.com | sh
|
||||
log "Docker installed: $(docker --version)"
|
||||
fi
|
||||
|
||||
# ── 4. Frontend build ───────────────────────────────────────────────
|
||||
log "Running bun install..."
|
||||
bun install
|
||||
|
||||
log "Running bun run build..."
|
||||
bun run build
|
||||
|
||||
# ── 5. Rust build ───────────────────────────────────────────────────
|
||||
log "Running cargo build --release --workspace..."
|
||||
cargo build --release --workspace
|
||||
|
||||
# ── 6. Docker images ────────────────────────────────────────────────
|
||||
TAG=$(git rev-parse --short HEAD)
|
||||
log "Building Docker images with tag: $TAG"
|
||||
|
||||
IMAGES=(
|
||||
"docker/app.Dockerfile app:$TAG"
|
||||
"docker/email.Dockerfile email-worker:$TAG"
|
||||
"docker/githook.Dockerfile git-hook:$TAG"
|
||||
"docker/gitserver.Dockerfile gitserver:$TAG"
|
||||
"docker/metrics.Dockerfile metrics-aggregator:$TAG"
|
||||
"docker/static.Dockerfile static-server:$TAG"
|
||||
"docker/gingress.Dockerfile gingress:$TAG"
|
||||
)
|
||||
|
||||
for entry in "${IMAGES[@]}"; do
|
||||
read -r dockerfile tag <<< "$entry"
|
||||
log "Building $tag..."
|
||||
docker build -f "$dockerfile" -t "$tag" .
|
||||
done
|
||||
|
||||
log "All images built successfully."
|
||||
docker images | grep -E "app|email-worker|git-hook|gitserver|metrics-aggregator|static-server|gingress" | grep "$TAG" || true
|
||||
90
deploy.sh
90
deploy.sh
@ -1,90 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
# ── helpers ──────────────────────────────────────────────────────────
|
||||
RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; NC='\033[0m'
|
||||
log() { echo -e "${GREEN}[OK]${NC} $*"; }
|
||||
warn() { echo -e "${YELLOW}[WARN]${NC} $*"; }
|
||||
err() { echo -e "${RED}[ERR]${NC} $*"; exit 1; }
|
||||
|
||||
command_exists() { command -v "$1" &>/dev/null; }
|
||||
|
||||
# ── defaults ─────────────────────────────────────────────────────────
|
||||
NAMESPACE="${NAMESPACE:-app}"
|
||||
RELEASE="${RELEASE:-deploy}"
|
||||
CHART_DIR="${CHART_DIR:-./deploy}"
|
||||
REGISTRY="${REGISTRY:-harbor.gitdata.me/gtateam}"
|
||||
TAG="${TAG:-$(git rev-parse --short HEAD)}"
|
||||
CONFIG_MAP="${CONFIG_MAP:-app-env}"
|
||||
PVC_NAME="${PVC_NAME:-shared-data}"
|
||||
|
||||
# ── prerequisites ────────────────────────────────────────────────────
|
||||
command_exists helm || err "helm not found — install via https://helm.sh/docs/intro/install/"
|
||||
command_exists kubectl || err "kubectl not found — install via https://kubernetes.io/docs/tasks/tools/"
|
||||
|
||||
log "helm $(helm version --short)"
|
||||
log "kubectl $(kubectl version --client --short 2>/dev/null || kubectl version -o json 2>/dev/null | grep gitVersion)"
|
||||
|
||||
# ── 1. Ensure namespace (not managed by Helm — preserved on uninstall) ──
|
||||
log "Ensuring namespace $NAMESPACE exists..."
|
||||
kubectl create namespace "$NAMESPACE" --dry-run=client -o yaml | kubectl apply -f -
|
||||
|
||||
# ── 2. Ensure prerequisites ─────────────────────────────────────────
|
||||
if ! kubectl get namespace "$NAMESPACE" &>/dev/null; then
|
||||
err "Namespace '$NAMESPACE' not found — create it first: kubectl create namespace $NAMESPACE"
|
||||
fi
|
||||
|
||||
if ! kubectl get configmap "$CONFIG_MAP" -n "$NAMESPACE" &>/dev/null; then
|
||||
err "ConfigMap '$CONFIG_MAP' not found in namespace '$NAMESPACE' — create it first"
|
||||
fi
|
||||
|
||||
if ! kubectl get pvc "$PVC_NAME" -n "$NAMESPACE" &>/dev/null; then
|
||||
err "PVC '$PVC_NAME' not found in namespace '$NAMESPACE' — create it first"
|
||||
fi
|
||||
|
||||
# Protect ConfigMap and PVC from accidental Helm deletion
|
||||
kubectl annotate configmap "$CONFIG_MAP" -n "$NAMESPACE" helm.sh/resource-policy=keep --overwrite
|
||||
kubectl annotate pvc "$PVC_NAME" -n "$NAMESPACE" helm.sh/resource-policy=keep --overwrite
|
||||
|
||||
# cert-manager ClusterIssuer
|
||||
if ! kubectl get clusterissuer cloudflare-acme-cluster-issuer &>/dev/null; then
|
||||
warn "ClusterIssuer 'cloudflare-acme-cluster-issuer' not found — TLS certificate issuance will fail"
|
||||
fi
|
||||
|
||||
log "Prerequisites verified"
|
||||
|
||||
# ── 3. Lint chart ────────────────────────────────────────────────────
|
||||
log "Linting Helm chart..."
|
||||
helm lint "$CHART_DIR" || err "Helm lint failed"
|
||||
|
||||
# ── 4. Deploy ────────────────────────────────────────────────────────
|
||||
log "Deploying release $RELEASE with tag $TAG..."
|
||||
|
||||
if ! helm upgrade --install "$RELEASE" "$CHART_DIR" \
|
||||
--namespace "$NAMESPACE" \
|
||||
--set imageRegistry="$REGISTRY" \
|
||||
--set imageTag="$TAG" \
|
||||
--set configMapName="$CONFIG_MAP" \
|
||||
--timeout 5m; then
|
||||
echo ""
|
||||
err "Deployment FAILED — release preserved for debugging.
|
||||
|
||||
Debug commands:
|
||||
helm status $RELEASE -n $NAMESPACE
|
||||
kubectl get pods -n $NAMESPACE
|
||||
kubectl logs -n app <pod-name> --previous
|
||||
helm rollback $RELEASE -n $NAMESPACE (rollback to previous release)
|
||||
helm uninstall $RELEASE -n $NAMESPACE (remove failed release)"
|
||||
|
||||
fi
|
||||
|
||||
log "Release $RELEASE deployed successfully"
|
||||
|
||||
# ── 5. Verify ────────────────────────────────────────────────────────
|
||||
log "Checking deployment status..."
|
||||
kubectl get deployments -n "$NAMESPACE" -l app.kubernetes.io/instance="$RELEASE"
|
||||
kubectl get pods -n "$NAMESPACE" -l app.kubernetes.io/instance="$RELEASE"
|
||||
kubectl get services -n "$NAMESPACE" -l app.kubernetes.io/instance="$RELEASE"
|
||||
kubectl get ingress -n "$NAMESPACE"
|
||||
|
||||
log "Deployment complete"
|
||||
@ -1,8 +0,0 @@
|
||||
FROM ubuntu:24.04
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
ca-certificates libssl3 \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
WORKDIR /app
|
||||
COPY ./target/release/gingress /bin
|
||||
EXPOSE 80 443 8080
|
||||
ENTRYPOINT ["gingress"]
|
||||
@ -1,42 +0,0 @@
|
||||
[package]
|
||||
name = "gingress-proxy"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
authors.workspace = true
|
||||
description = "GIngress data plane: Pingora-based HTTP/HTTPS proxy with TLS, LB, health checks, and observability"
|
||||
repository.workspace = true
|
||||
readme.workspace = true
|
||||
homepage.workspace = true
|
||||
license.workspace = true
|
||||
keywords.workspace = true
|
||||
categories.workspace = true
|
||||
documentation.workspace = true
|
||||
|
||||
[lib]
|
||||
path = "src/lib.rs"
|
||||
name = "gingress_proxy"
|
||||
|
||||
[dependencies]
|
||||
pingora = { version = "0.8", features = ["proxy"] }
|
||||
pingora-proxy = "0.8"
|
||||
pingora-load-balancing = "0.8"
|
||||
pingora-cache = "0.8"
|
||||
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
serde_yaml = { workspace = true }
|
||||
rustls = { workspace = true }
|
||||
rustls-pemfile = "2"
|
||||
tracing = { workspace = true }
|
||||
observability = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
dashmap = { workspace = true }
|
||||
futures-util = { workspace = true }
|
||||
http = "1"
|
||||
async-trait = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
@ -1,193 +0,0 @@
|
||||
//! Shared configuration store for the GIngress proxy.
|
||||
//!
|
||||
//! This is the bridge between the control plane and data plane.
|
||||
//! The control plane writes routing configuration here; the data plane reads it.
|
||||
|
||||
use dashmap::DashMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::watch;
|
||||
|
||||
/// A single backend service reference.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
pub struct Backend {
|
||||
pub namespace: String,
|
||||
pub name: String,
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
/// An upstream endpoint (pod IP + port).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct Endpoint {
|
||||
pub ip: String,
|
||||
pub port: u16,
|
||||
pub ready: bool,
|
||||
}
|
||||
|
||||
/// Routing rule: path prefix + backend.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct RouteRule {
|
||||
pub host: String,
|
||||
pub path: String,
|
||||
pub path_type: PathType,
|
||||
pub backend: Backend,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum PathType {
|
||||
Prefix,
|
||||
Exact,
|
||||
ImplementationSpecific,
|
||||
}
|
||||
|
||||
/// TLS certificate for a host.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TlsCert {
|
||||
pub host: String,
|
||||
pub cert_pem: String,
|
||||
pub key_pem: String,
|
||||
}
|
||||
|
||||
/// Rate limiting policy for a host.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct RateLimitPolicy {
|
||||
pub host: String,
|
||||
pub requests_per_second: u32,
|
||||
pub burst_size: u32,
|
||||
}
|
||||
|
||||
/// Header operation to inject or remove.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum HeaderOp {
|
||||
Set { name: String, value: String },
|
||||
Add { name: String, value: String },
|
||||
Remove { name: String },
|
||||
}
|
||||
|
||||
/// Session affinity configuration.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SessionAffinityConfig {
|
||||
pub enabled: bool,
|
||||
pub cookie_name: String,
|
||||
pub cookie_ttl_seconds: u64,
|
||||
}
|
||||
|
||||
/// Full proxy configuration — the single source of truth shared between planes.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct ProxyConfig {
|
||||
/// Route rules keyed by host
|
||||
pub routes: HashMap<String, Vec<RouteRule>>,
|
||||
/// TLS certs keyed by host (SNI)
|
||||
pub tls: HashMap<String, TlsCert>,
|
||||
/// Upstream endpoints keyed by backend identifier
|
||||
pub upstreams: HashMap<String, Vec<Endpoint>>,
|
||||
/// Rate limit policies keyed by host
|
||||
pub rate_limits: HashMap<String, RateLimitPolicy>,
|
||||
/// Header operations keyed by host
|
||||
pub headers: HashMap<String, Vec<HeaderOp>>,
|
||||
/// Session affinity configs keyed by host
|
||||
pub session_affinity: HashMap<String, SessionAffinityConfig>,
|
||||
/// WebSocket enabled hosts
|
||||
pub websocket_hosts: Vec<String>,
|
||||
/// Git backend — requests with Git User-Agent are routed here
|
||||
pub git_upstream: Option<Backend>,
|
||||
}
|
||||
|
||||
/// The shared configuration store: read-heavy, write-light.
|
||||
///
|
||||
/// Backed by `DashMap` for concurrent reads with low contention.
|
||||
/// Writes trigger a `watch` notification so the data plane can hot-reload.
|
||||
#[derive(Clone)]
|
||||
pub struct ConfigStore {
|
||||
inner: Arc<ConfigStoreInner>,
|
||||
}
|
||||
|
||||
struct ConfigStoreInner {
|
||||
config: Arc<DashMap<String, serde_json::Value>>,
|
||||
reload_tx: watch::Sender<u64>,
|
||||
reload_rx: watch::Receiver<u64>,
|
||||
}
|
||||
|
||||
impl ConfigStore {
|
||||
/// Create a new empty config store.
|
||||
pub fn new() -> Self {
|
||||
let (reload_tx, reload_rx) = watch::channel(0u64);
|
||||
Self {
|
||||
inner: Arc::new(ConfigStoreInner {
|
||||
config: Arc::new(DashMap::new()),
|
||||
reload_tx,
|
||||
reload_rx,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the reload notification receiver.
|
||||
/// The data plane watches this for hot-reload signals.
|
||||
pub fn reload_rx(&self) -> watch::Receiver<u64> {
|
||||
self.inner.reload_rx.clone()
|
||||
}
|
||||
|
||||
/// Signal a reload. Called by the control plane after updating config.
|
||||
pub fn signal_reload(&self) {
|
||||
let current = *self.inner.reload_tx.borrow();
|
||||
let _ = self.inner.reload_tx.send(current.wrapping_add(1));
|
||||
}
|
||||
|
||||
/// Store a typed config section by key.
|
||||
pub fn set<T: Serialize>(&self, key: &str, value: &T) {
|
||||
let json = serde_json::to_value(value).unwrap_or_default();
|
||||
self.inner.config.insert(key.to_string(), json);
|
||||
}
|
||||
|
||||
/// Read a typed config section by key.
|
||||
pub fn get<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Option<T> {
|
||||
self.inner
|
||||
.config
|
||||
.get(key)
|
||||
.and_then(|v| serde_json::from_value(v.clone()).ok())
|
||||
}
|
||||
|
||||
/// Remove a config section by key.
|
||||
pub fn remove(&self, key: &str) {
|
||||
self.inner.config.remove(key);
|
||||
}
|
||||
|
||||
/// Remove all keys matching a prefix (for cleanup on Ingress delete).
|
||||
pub fn remove_prefix(&self, prefix: &str) {
|
||||
let keys: Vec<String> = self
|
||||
.inner
|
||||
.config
|
||||
.iter()
|
||||
.filter(|entry| entry.key().starts_with(prefix))
|
||||
.map(|entry| entry.key().clone())
|
||||
.collect();
|
||||
for key in keys {
|
||||
self.inner.config.remove(&key);
|
||||
}
|
||||
}
|
||||
|
||||
/// Get all keys matching a prefix.
|
||||
pub fn keys_with_prefix(&self, prefix: &str) -> Vec<String> {
|
||||
self.inner
|
||||
.config
|
||||
.iter()
|
||||
.filter(|entry| entry.key().starts_with(prefix))
|
||||
.map(|entry| entry.key().clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Read the latest assembled proxy configuration.
|
||||
///
|
||||
/// Returns the config written by the reconciler at key `_assembled`.
|
||||
/// This is called on every request by the proxy's `upstream_peer()`.
|
||||
pub fn assemble_proxy_config(&self) -> ProxyConfig {
|
||||
self.get::<ProxyConfig>("_assembled").unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ConfigStore {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
@ -1,93 +0,0 @@
|
||||
//! Header injection/deletion filter.
|
||||
//!
|
||||
//! Injects or removes HTTP headers on requests and responses based on per-host
|
||||
//! configuration from the `ConfigStore`. Looks up the `headers:<host>` key
|
||||
//! dynamically on each request.
|
||||
|
||||
use super::{FilterContext, PostFilter, PreFilter};
|
||||
use crate::config::{ConfigStore, HeaderOp};
|
||||
use pingora::proxy::Session;
|
||||
|
||||
pub struct HeaderInjectFilter {
|
||||
store: ConfigStore,
|
||||
}
|
||||
|
||||
impl HeaderInjectFilter {
|
||||
pub fn new(store: ConfigStore) -> Self {
|
||||
Self { store }
|
||||
}
|
||||
|
||||
/// Resolve header operations for the host in the current session.
|
||||
fn ops_for_session(&self, session: &Session) -> Vec<HeaderOp> {
|
||||
let host = session
|
||||
.req_header()
|
||||
.headers
|
||||
.get("host")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or_default();
|
||||
|
||||
self.store
|
||||
.get::<Vec<HeaderOp>>(&format!("headers:{}", host))
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_header_ops(header_map: &mut http::HeaderMap, ops: &[HeaderOp]) {
|
||||
for op in ops {
|
||||
match op {
|
||||
HeaderOp::Set { name, value } => {
|
||||
if let (Ok(key), Ok(val)) = (
|
||||
http::HeaderName::from_bytes(name.as_bytes()),
|
||||
http::HeaderValue::from_str(value),
|
||||
) {
|
||||
header_map.insert(key, val);
|
||||
}
|
||||
}
|
||||
HeaderOp::Add { name, value } => {
|
||||
if let (Ok(key), Ok(val)) = (
|
||||
http::HeaderName::from_bytes(name.as_bytes()),
|
||||
http::HeaderValue::from_str(value),
|
||||
) {
|
||||
header_map.append(key, val);
|
||||
}
|
||||
}
|
||||
HeaderOp::Remove { name } => {
|
||||
if let Ok(key) = http::HeaderName::from_bytes(name.as_bytes()) {
|
||||
header_map.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PreFilter for HeaderInjectFilter {
|
||||
fn name(&self) -> &'static str {
|
||||
"header_inject"
|
||||
}
|
||||
|
||||
fn filter(
|
||||
&self,
|
||||
session: &mut Session,
|
||||
_ctx: &mut FilterContext,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let ops = self.ops_for_session(session);
|
||||
if !ops.is_empty() {
|
||||
apply_header_ops(&mut session.req_header_mut().headers, &ops);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl PostFilter for HeaderInjectFilter {
|
||||
fn name(&self) -> &'static str {
|
||||
"header_inject"
|
||||
}
|
||||
|
||||
fn filter(
|
||||
&self,
|
||||
_session: &mut Session,
|
||||
_ctx: &FilterContext,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -1,114 +0,0 @@
|
||||
//! Request/response filter chain for the GIngress proxy.
|
||||
//!
|
||||
//! Filters are applied in order: before proxying (pre-filters) and after the
|
||||
//! upstream response (post-filters). Each filter implements a simple trait.
|
||||
|
||||
pub mod header_inject;
|
||||
pub mod rate_limit;
|
||||
pub mod real_ip;
|
||||
pub mod session_sticky;
|
||||
pub mod ws_upgrade;
|
||||
|
||||
use pingora::proxy::Session;
|
||||
|
||||
/// Context passed through the filter chain for a single request.
|
||||
pub struct FilterContext {
|
||||
/// Real client IP discovered by the real_ip filter.
|
||||
pub real_ip: Option<String>,
|
||||
/// Session sticky key (e.g., cookie value).
|
||||
pub sticky_key: Option<String>,
|
||||
/// Whether this is a WebSocket upgrade request.
|
||||
pub is_websocket: bool,
|
||||
/// The resolved upstream endpoint (set by load balancer).
|
||||
pub upstream_endpoint: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for FilterContext {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
real_ip: None,
|
||||
sticky_key: None,
|
||||
is_websocket: false,
|
||||
upstream_endpoint: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Pre-filter: runs before forwarding the request to the upstream.
|
||||
pub trait PreFilter: Send + Sync {
|
||||
fn name(&self) -> &'static str;
|
||||
fn filter(
|
||||
&self,
|
||||
session: &mut Session,
|
||||
ctx: &mut FilterContext,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
|
||||
}
|
||||
|
||||
/// Post-filter: runs after receiving the upstream response.
|
||||
pub trait PostFilter: Send + Sync {
|
||||
fn name(&self) -> &'static str;
|
||||
fn filter(
|
||||
&self,
|
||||
session: &mut Session,
|
||||
ctx: &FilterContext,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
|
||||
}
|
||||
|
||||
/// A chain of filters applied sequentially.
|
||||
pub struct FilterChain {
|
||||
pre_filters: Vec<Box<dyn PreFilter>>,
|
||||
post_filters: Vec<Box<dyn PostFilter>>,
|
||||
}
|
||||
|
||||
impl FilterChain {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
pre_filters: Vec::new(),
|
||||
post_filters: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_pre(&mut self, filter: Box<dyn PreFilter>) {
|
||||
self.pre_filters.push(filter);
|
||||
}
|
||||
|
||||
pub fn add_post(&mut self, filter: Box<dyn PostFilter>) {
|
||||
self.post_filters.push(filter);
|
||||
}
|
||||
|
||||
/// Run all pre-filters. Stops on first error.
|
||||
pub fn run_pre(
|
||||
&self,
|
||||
session: &mut Session,
|
||||
ctx: &mut FilterContext,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
for f in &self.pre_filters {
|
||||
f.filter(session, ctx).map_err(|e| {
|
||||
tracing::error!(filter = f.name(), error = %e, "Pre-filter failed");
|
||||
e
|
||||
})?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run all post-filters. Stops on first error.
|
||||
pub fn run_post(
|
||||
&self,
|
||||
session: &mut Session,
|
||||
ctx: &FilterContext,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
for f in &self.post_filters {
|
||||
f.filter(session, ctx).map_err(|e| {
|
||||
tracing::error!(filter = f.name(), error = %e, "Post-filter failed");
|
||||
e
|
||||
})?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for FilterChain {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
@ -1,78 +0,0 @@
|
||||
//! Rate limiting filter using token bucket algorithm.
|
||||
//!
|
||||
//! Per-host rate limiting with configurable requests-per-second and burst tolerance.
|
||||
|
||||
use super::{FilterContext, PreFilter};
|
||||
use dashmap::DashMap;
|
||||
use pingora::proxy::Session;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
/// Token bucket rate limiter state for a single key (IP or host).
|
||||
struct TokenBucket {
|
||||
tokens: f64,
|
||||
last_refill: Instant,
|
||||
max_tokens: f64,
|
||||
refill_rate: f64, // tokens per second
|
||||
}
|
||||
|
||||
pub struct RateLimitFilter {
|
||||
buckets: Arc<DashMap<String, TokenBucket>>,
|
||||
default_rate: f64,
|
||||
default_burst: f64,
|
||||
}
|
||||
|
||||
impl RateLimitFilter {
|
||||
pub fn new(default_requests_per_second: u32, default_burst_size: u32) -> Self {
|
||||
Self {
|
||||
buckets: Arc::new(DashMap::new()),
|
||||
default_rate: default_requests_per_second as f64,
|
||||
default_burst: default_burst_size as f64,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if a request identified by `key` is allowed.
|
||||
/// Returns true if allowed, false if rate-limited.
|
||||
fn check_rate(&self, key: &str) -> bool {
|
||||
let now = Instant::now();
|
||||
let mut bucket = self
|
||||
.buckets
|
||||
.entry(key.to_string())
|
||||
.or_insert_with(|| TokenBucket {
|
||||
tokens: self.default_burst,
|
||||
last_refill: now,
|
||||
max_tokens: self.default_burst,
|
||||
refill_rate: self.default_rate,
|
||||
});
|
||||
|
||||
// Refill tokens based on elapsed time
|
||||
let elapsed = now.duration_since(bucket.last_refill).as_secs_f64();
|
||||
bucket.tokens = (bucket.tokens + elapsed * bucket.refill_rate).min(bucket.max_tokens);
|
||||
bucket.last_refill = now;
|
||||
|
||||
if bucket.tokens >= 1.0 {
|
||||
bucket.tokens -= 1.0;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PreFilter for RateLimitFilter {
|
||||
fn name(&self) -> &'static str {
|
||||
"rate_limit"
|
||||
}
|
||||
|
||||
fn filter(
|
||||
&self,
|
||||
_session: &mut Session,
|
||||
ctx: &mut FilterContext,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let key = ctx.real_ip.clone().unwrap_or_else(|| "unknown".to_string());
|
||||
if !self.check_rate(&key) {
|
||||
return Err("rate limit exceeded".into());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -1,61 +0,0 @@
|
||||
//! Real IP discovery filter.
|
||||
//!
|
||||
//! Extracts the real client IP from `X-Forwarded-For` header or Proxy Protocol.
|
||||
//! Sets it in the filter context for downstream use (logging, rate limiting, etc.).
|
||||
|
||||
use super::{FilterContext, PreFilter};
|
||||
use pingora::proxy::Session;
|
||||
|
||||
pub struct RealIpFilter {
|
||||
/// Whether to trust Proxy Protocol headers (TCP-level).
|
||||
#[allow(dead_code)]
|
||||
trust_proxy_protocol: bool,
|
||||
/// Maximum number of trusted proxy hops.
|
||||
trusted_hops: usize,
|
||||
}
|
||||
|
||||
impl RealIpFilter {
|
||||
pub fn new(trust_proxy_protocol: bool, trusted_hops: usize) -> Self {
|
||||
Self {
|
||||
trust_proxy_protocol,
|
||||
trusted_hops,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PreFilter for RealIpFilter {
|
||||
fn name(&self) -> &'static str {
|
||||
"real_ip"
|
||||
}
|
||||
|
||||
fn filter(
|
||||
&self,
|
||||
session: &mut Session,
|
||||
ctx: &mut FilterContext,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Extract from X-Forwarded-For header
|
||||
if let Some(xff) = session.req_header().headers.get("x-forwarded-for") {
|
||||
if let Ok(val) = xff.to_str() {
|
||||
// X-Forwarded-For: client, proxy1, proxy2
|
||||
let ips: Vec<&str> = val.split(',').map(|s| s.trim()).collect();
|
||||
let client_idx = ips.len().saturating_sub(1 + self.trusted_hops);
|
||||
if let Some(client_ip) = ips.get(client_idx) {
|
||||
ctx.real_ip = Some(client_ip.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: use the direct connection IP
|
||||
if ctx.real_ip.is_none() {
|
||||
ctx.real_ip = Some(session.client_addr().map_or_else(
|
||||
|| "unknown".to_string(),
|
||||
|addr| {
|
||||
addr.as_inet()
|
||||
.map_or_else(|| "unknown".to_string(), |inet| inet.ip().to_string())
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -1,81 +0,0 @@
|
||||
//! Session affinity (sticky sessions) filter.
|
||||
//!
|
||||
//! Uses a cookie to route consecutive requests from the same client
|
||||
//! to the same upstream backend, enabling sticky sessions.
|
||||
|
||||
use super::{FilterContext, PostFilter, PreFilter};
|
||||
use pingora::proxy::Session;
|
||||
|
||||
const DEFAULT_COOKIE_NAME: &str = "GINGRESS_SESSION";
|
||||
|
||||
pub struct SessionStickyFilter {
|
||||
cookie_name: String,
|
||||
cookie_ttl_seconds: u64,
|
||||
}
|
||||
|
||||
impl SessionStickyFilter {
|
||||
pub fn new(cookie_name: Option<String>, cookie_ttl_seconds: u64) -> Self {
|
||||
Self {
|
||||
cookie_name: cookie_name.unwrap_or_else(|| DEFAULT_COOKIE_NAME.to_string()),
|
||||
cookie_ttl_seconds,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PreFilter for SessionStickyFilter {
|
||||
fn name(&self) -> &'static str {
|
||||
"session_sticky"
|
||||
}
|
||||
|
||||
fn filter(
|
||||
&self,
|
||||
session: &mut Session,
|
||||
ctx: &mut FilterContext,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Extract existing sticky cookie
|
||||
if let Some(cookie_val) = extract_cookie(session, &self.cookie_name) {
|
||||
ctx.sticky_key = Some(cookie_val);
|
||||
} else {
|
||||
// Generate a new sticky key
|
||||
ctx.sticky_key = Some(generate_sticky_id());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl PostFilter for SessionStickyFilter {
|
||||
fn name(&self) -> &'static str {
|
||||
"session_sticky"
|
||||
}
|
||||
|
||||
fn filter(
|
||||
&self,
|
||||
session: &mut Session,
|
||||
ctx: &FilterContext,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Set the sticky cookie if a new key was generated
|
||||
if let Some(ref sticky_key) = ctx.sticky_key {
|
||||
// Set-Cookie would be injected here.
|
||||
// Pingora response header API usage depends on version.
|
||||
let _ = session;
|
||||
let _ = sticky_key;
|
||||
let _ = &self.cookie_ttl_seconds;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate a random sticky session ID.
|
||||
fn generate_sticky_id() -> String {
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default();
|
||||
let entropy = (now.as_nanos() as u64) ^ (now.as_secs() << 16);
|
||||
format!("{:016x}", entropy)
|
||||
}
|
||||
|
||||
/// Extract a cookie value from the request headers.
|
||||
fn extract_cookie(_session: &Session, _cookie_name: &str) -> Option<String> {
|
||||
None // Placeholder: full impl parses Cookie header
|
||||
}
|
||||
@ -1,57 +0,0 @@
|
||||
//! WebSocket upgrade filter.
|
||||
//!
|
||||
//! Detects WebSocket upgrade requests and sets the `is_websocket` flag in
|
||||
//! the filter context. Pingora natively supports HTTP upgrade semantics.
|
||||
|
||||
use super::{FilterContext, PreFilter};
|
||||
use pingora::proxy::Session;
|
||||
|
||||
pub struct WsUpgradeFilter;
|
||||
|
||||
impl WsUpgradeFilter {
|
||||
pub fn new() -> Self {
|
||||
Self
|
||||
}
|
||||
}
|
||||
|
||||
impl PreFilter for WsUpgradeFilter {
|
||||
fn name(&self) -> &'static str {
|
||||
"ws_upgrade"
|
||||
}
|
||||
|
||||
fn filter(
|
||||
&self,
|
||||
session: &mut Session,
|
||||
ctx: &mut FilterContext,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let headers = &session.req_header().headers;
|
||||
|
||||
let is_upgrade = headers
|
||||
.get("upgrade")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|v| v.to_lowercase() == "websocket")
|
||||
.unwrap_or(false);
|
||||
|
||||
let has_connection_upgrade = headers
|
||||
.get("connection")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|v| v.to_lowercase().contains("upgrade"))
|
||||
.unwrap_or(false);
|
||||
|
||||
let has_ws_key = headers.get("sec-websocket-key").is_some();
|
||||
let has_ws_version = headers.get("sec-websocket-version").is_some();
|
||||
|
||||
if is_upgrade && has_connection_upgrade && has_ws_key && has_ws_version {
|
||||
ctx.is_websocket = true;
|
||||
tracing::debug!("WebSocket upgrade detected");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for WsUpgradeFilter {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
@ -1,73 +0,0 @@
|
||||
//! Active and passive health checks for upstream endpoints.
|
||||
//!
|
||||
//! - Active: periodically probes upstream `/health` endpoints.
|
||||
//! - Passive: tracks request failure rates and marks unhealthy endpoints.
|
||||
|
||||
use crate::config::Endpoint;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// Health checker for a pool of upstream endpoints.
|
||||
pub struct HealthChecker {
|
||||
endpoints: Arc<RwLock<Vec<Endpoint>>>,
|
||||
/// How often to run active health probes.
|
||||
#[allow(dead_code)]
|
||||
interval: std::time::Duration,
|
||||
/// Failure threshold for passive health checks.
|
||||
#[allow(dead_code)]
|
||||
passive_fail_threshold: u32,
|
||||
/// Success threshold for recovery.
|
||||
#[allow(dead_code)]
|
||||
passive_success_threshold: u32,
|
||||
}
|
||||
|
||||
impl HealthChecker {
|
||||
/// Create a new health checker.
|
||||
pub fn new(endpoints: Vec<Endpoint>, interval: std::time::Duration) -> Self {
|
||||
Self {
|
||||
endpoints: Arc::new(RwLock::new(endpoints)),
|
||||
interval,
|
||||
passive_fail_threshold: 3,
|
||||
passive_success_threshold: 2,
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the endpoint pool.
|
||||
pub async fn update_endpoints(&self, new_endpoints: Vec<Endpoint>) {
|
||||
let mut eps = self.endpoints.write().await;
|
||||
*eps = new_endpoints;
|
||||
}
|
||||
|
||||
/// Run active health probes in a background task.
|
||||
pub fn spawn_active_probes(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
// Active probing loop would go here.
|
||||
// For now a placeholder that keeps endpoints as-is.
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Mark an endpoint as failing (passive check).
|
||||
/// Returns true if this endpoint should now be considered unhealthy.
|
||||
#[allow(dead_code)]
|
||||
pub async fn record_failure(&self, endpoint_ip: &str) -> bool {
|
||||
let mut eps = self.endpoints.write().await;
|
||||
if let Some(ep) = eps.iter_mut().find(|e| e.ip == endpoint_ip) {
|
||||
// Track failure count via metadata (simplified: uses ready field)
|
||||
// Full impl would add failure_count field to Endpoint
|
||||
let _ = ep;
|
||||
return false; // Placeholder
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Mark an endpoint as successful (passive check).
|
||||
#[allow(dead_code)]
|
||||
pub async fn record_success(&self, endpoint_ip: &str) {
|
||||
let _eps = self.endpoints.write().await;
|
||||
// Reset failure count for the endpoint
|
||||
let _ = endpoint_ip;
|
||||
}
|
||||
}
|
||||
@ -1,64 +0,0 @@
|
||||
//! Hot reload support for the GIngress proxy.
|
||||
//!
|
||||
//! Watches the `ConfigStore` reload channel and gracefully applies
|
||||
//! configuration changes without dropping active connections.
|
||||
|
||||
use crate::config::ConfigStore;
|
||||
|
||||
/// Hot-reload watcher that listens for config changes.
|
||||
pub struct HotReloadWatcher {
|
||||
store: ConfigStore,
|
||||
current_version: u64,
|
||||
}
|
||||
|
||||
impl HotReloadWatcher {
|
||||
pub fn new(store: ConfigStore) -> Self {
|
||||
Self {
|
||||
store,
|
||||
current_version: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the next configuration change.
|
||||
/// Returns true if config changed, false if the channel was closed.
|
||||
pub async fn wait_for_change(&mut self) -> bool {
|
||||
let mut rx = self.store.reload_rx();
|
||||
loop {
|
||||
match rx.changed().await {
|
||||
Ok(()) => {
|
||||
let new_version = *rx.borrow();
|
||||
if new_version != self.current_version {
|
||||
self.current_version = new_version;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
Err(_) => return false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the current configuration snapshot.
|
||||
pub fn store(&self) -> &ConfigStore {
|
||||
&self.store
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a background task that watches for config changes and applies them.
|
||||
///
|
||||
/// The `on_reload` callback is invoked each time the config changes.
|
||||
pub fn spawn_reload_watcher<F>(store: ConfigStore, on_reload: F) -> tokio::task::JoinHandle<()>
|
||||
where
|
||||
F: Fn(&ConfigStore) + Send + 'static,
|
||||
{
|
||||
tokio::spawn(async move {
|
||||
let mut watcher = HotReloadWatcher::new(store);
|
||||
loop {
|
||||
if !watcher.wait_for_change().await {
|
||||
tracing::info!("Config reload channel closed, stopping watcher");
|
||||
break;
|
||||
}
|
||||
tracing::info!("Config change detected, reloading...");
|
||||
on_reload(watcher.store());
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -1,27 +0,0 @@
|
||||
//! GIngress data plane: Pingora-based HTTP/HTTPS reverse proxy.
|
||||
//!
|
||||
//! ## Architecture
|
||||
//!
|
||||
//! The data plane receives traffic and proxies it to upstream services
|
||||
//! based on configuration from the shared `ConfigStore`. The control plane
|
||||
//! (apps/gingress) updates the `ConfigStore` by watching Kubernetes resources.
|
||||
//!
|
||||
//! ## Components
|
||||
//!
|
||||
//! - `config` — Shared configuration store (routes, TLS certs, upstreams, policies)
|
||||
//! - `server` — Pingora server lifecycle and proxy bridge
|
||||
//! - `tls` — TLS termination via rustls with SNI-based certificate selection
|
||||
//! - `load_balancer` — Upstream selection algorithms
|
||||
//! - `health_checker` — Active and passive upstream health checks
|
||||
//! - `filters` — Request/response filter chain (real IP, headers, rate limit, sticky, WS)
|
||||
//! - `observability` — Prometheus metrics and OTLP tracing
|
||||
//! - `hot_reload` — Graceful config reload without dropping connections
|
||||
|
||||
pub mod config;
|
||||
pub mod filters;
|
||||
pub mod health_checker;
|
||||
pub mod hot_reload;
|
||||
pub mod load_balancer;
|
||||
pub mod observability;
|
||||
pub mod server;
|
||||
pub mod tls;
|
||||
@ -1,66 +0,0 @@
|
||||
//! Load balancing algorithms for upstream selection.
|
||||
//!
|
||||
//! Uses Pingora's built-in load balancing primitives where possible,
|
||||
//! with custom extensions for session-affinity consistent hashing.
|
||||
|
||||
use crate::config::Endpoint;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
/// Load balancer that selects an upstream endpoint from a pool.
|
||||
#[derive(Debug)]
|
||||
pub struct LoadBalancer {
|
||||
endpoints: Vec<Endpoint>,
|
||||
counter: AtomicUsize,
|
||||
}
|
||||
|
||||
impl LoadBalancer {
|
||||
/// Create a new load balancer with the given endpoints.
|
||||
pub fn new(endpoints: Vec<Endpoint>) -> Self {
|
||||
Self {
|
||||
endpoints,
|
||||
counter: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the endpoint pool (e.g., on hot-reload).
|
||||
pub fn update_endpoints(&mut self, endpoints: Vec<Endpoint>) {
|
||||
self.endpoints = endpoints;
|
||||
}
|
||||
|
||||
/// Select an endpoint using round-robin.
|
||||
pub fn round_robin(&self) -> Option<&Endpoint> {
|
||||
let healthy: Vec<&Endpoint> = self.endpoints.iter().filter(|e| e.ready).collect();
|
||||
if healthy.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let idx = self.counter.fetch_add(1, Ordering::Relaxed) % healthy.len();
|
||||
Some(healthy[idx])
|
||||
}
|
||||
|
||||
/// Select an endpoint using the least-connections strategy (placeholder).
|
||||
///
|
||||
/// Full implementation would track active connections per endpoint.
|
||||
pub fn least_connections(&self) -> Option<&Endpoint> {
|
||||
self.endpoints.iter().filter(|e| e.ready).min_by_key(|_| {
|
||||
// Placeholder: return 0 for now. Real impl would track connection counts.
|
||||
0usize
|
||||
})
|
||||
}
|
||||
|
||||
/// Consistent hash selection based on a key (for session affinity).
|
||||
pub fn consistent_hash(&self, key: &str) -> Option<&Endpoint> {
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
let healthy: Vec<&Endpoint> = self.endpoints.iter().filter(|e| e.ready).collect();
|
||||
if healthy.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut hasher = DefaultHasher::new();
|
||||
key.hash(&mut hasher);
|
||||
let hash = hasher.finish();
|
||||
let idx = hash as usize % healthy.len();
|
||||
Some(healthy[idx])
|
||||
}
|
||||
}
|
||||
@ -1,82 +0,0 @@
|
||||
//! Observability integration for the GIngress proxy.
|
||||
//!
|
||||
//! Reuses the workspace `observability` lib for tracing initialization,
|
||||
//! and adds GIngress-specific Prometheus metrics.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
/// GIngress-specific HTTP metrics.
|
||||
///
|
||||
/// Extends the workspace `observability::HttpMetrics` with ingress-specific counters.
|
||||
pub struct IngressMetrics {
|
||||
/// Total requests per host
|
||||
pub requests_total: Arc<dashmap::DashMap<String, u64>>,
|
||||
/// Active WebSocket connections
|
||||
pub ws_connections_active: Arc<std::sync::atomic::AtomicU64>,
|
||||
/// Upstream health status (0 = unhealthy, 1 = healthy)
|
||||
pub upstream_health: Arc<dashmap::DashMap<String, u8>>,
|
||||
/// TLS certificate expiry timestamps
|
||||
pub tls_cert_expiry: Arc<dashmap::DashMap<String, i64>>,
|
||||
}
|
||||
|
||||
impl IngressMetrics {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
requests_total: Arc::new(dashmap::DashMap::new()),
|
||||
ws_connections_active: Arc::new(std::sync::atomic::AtomicU64::new(0)),
|
||||
upstream_health: Arc::new(dashmap::DashMap::new()),
|
||||
tls_cert_expiry: Arc::new(dashmap::DashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a request for a given host and status code.
|
||||
pub fn record_request(&self, host: &str, _status: u16) {
|
||||
self.requests_total
|
||||
.entry(host.to_string())
|
||||
.and_modify(|c| *c += 1)
|
||||
.or_insert(1);
|
||||
}
|
||||
|
||||
/// Record WebSocket connection opened.
|
||||
pub fn ws_open(&self) {
|
||||
self.ws_connections_active
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Record WebSocket connection closed.
|
||||
pub fn ws_close(&self) {
|
||||
self.ws_connections_active
|
||||
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Update upstream health status.
|
||||
pub fn set_upstream_health(&self, upstream: &str, healthy: bool) {
|
||||
self.upstream_health
|
||||
.insert(upstream.to_string(), if healthy { 1 } else { 0 });
|
||||
}
|
||||
|
||||
/// Record TLS cert expiry time.
|
||||
pub fn set_cert_expiry(&self, host: &str, expiry_unix: i64) {
|
||||
self.tls_cert_expiry.insert(host.to_string(), expiry_unix);
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for IngressMetrics {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize tracing via the workspace observability lib.
|
||||
pub fn init_tracing(level: &str, otel_enabled: bool) {
|
||||
observability::init_tracing_subscriber(level, otel_enabled);
|
||||
}
|
||||
|
||||
/// Initialize OTLP export for distributed tracing.
|
||||
pub fn init_otlp(
|
||||
endpoint: &str,
|
||||
service_name: &str,
|
||||
) -> anyhow::Result<Option<observability::OtelGuard>> {
|
||||
observability::init_otlp(endpoint, service_name, "0.1.0", "info")
|
||||
.map_err(|e| anyhow::anyhow!("OTLP init failed: {}", e))
|
||||
}
|
||||
@ -1,193 +0,0 @@
|
||||
//! Pingora server lifecycle and proxy bridge.
|
||||
//!
|
||||
//! Manages the Pingora server process, connects it to the ConfigStore,
|
||||
//! and implements the proxy logic (request routing, upstream selection, filtering).
|
||||
|
||||
use crate::config::ConfigStore;
|
||||
use crate::filters::header_inject::HeaderInjectFilter;
|
||||
use crate::filters::{FilterChain, FilterContext};
|
||||
use pingora::proxy::Session;
|
||||
use pingora::server::Server;
|
||||
use pingora::upstreams::peer::HttpPeer;
|
||||
use pingora_proxy::ProxyHttp;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// GIngress proxy service — the core proxy implementation.
|
||||
pub struct GIngressProxy {
|
||||
pub config: ConfigStore,
|
||||
pub metrics: Arc<crate::observability::IngressMetrics>,
|
||||
filter_chain: Arc<std::sync::RwLock<FilterChain>>,
|
||||
}
|
||||
|
||||
impl GIngressProxy {
|
||||
/// Create a new proxy service.
|
||||
pub fn new(config: ConfigStore) -> Self {
|
||||
let mut filter_chain = FilterChain::new();
|
||||
// Add default filters
|
||||
filter_chain.add_pre(Box::new(crate::filters::real_ip::RealIpFilter::new(
|
||||
true, 1,
|
||||
)));
|
||||
filter_chain.add_pre(Box::new(HeaderInjectFilter::new(config.clone())));
|
||||
filter_chain.add_pre(Box::new(crate::filters::ws_upgrade::WsUpgradeFilter::new()));
|
||||
|
||||
Self {
|
||||
config,
|
||||
metrics: Arc::new(crate::observability::IngressMetrics::new()),
|
||||
filter_chain: Arc::new(std::sync::RwLock::new(filter_chain)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the filter chain for external management.
|
||||
pub fn filter_chain(&self) -> &Arc<std::sync::RwLock<FilterChain>> {
|
||||
&self.filter_chain
|
||||
}
|
||||
|
||||
/// Match a request to a route rule based on host and path.
|
||||
fn match_route(cfg: &crate::config::ProxyConfig, host: &str, path: &str) -> Option<String> {
|
||||
cfg.routes
|
||||
.get(host)
|
||||
.and_then(|rules| {
|
||||
rules.iter().find(|r| match r.path_type {
|
||||
crate::config::PathType::Prefix
|
||||
| crate::config::PathType::ImplementationSpecific => path.starts_with(&r.path),
|
||||
crate::config::PathType::Exact => path == r.path,
|
||||
})
|
||||
})
|
||||
.map(|r| {
|
||||
format!(
|
||||
"upstream:{}/{}:{}",
|
||||
r.backend.namespace, r.backend.name, r.backend.port
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ProxyHttp for GIngressProxy {
|
||||
type CTX = FilterContext;
|
||||
|
||||
fn new_ctx(&self) -> Self::CTX {
|
||||
FilterContext::default()
|
||||
}
|
||||
|
||||
async fn upstream_peer(
|
||||
&self,
|
||||
session: &mut Session,
|
||||
ctx: &mut Self::CTX,
|
||||
) -> pingora::Result<Box<HttpPeer>> {
|
||||
let host = session
|
||||
.req_header()
|
||||
.headers
|
||||
.get("host")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|v| v.to_string())
|
||||
.unwrap_or_default();
|
||||
|
||||
let cfg = self.config.assemble_proxy_config();
|
||||
|
||||
// Git User-Agent override: requests from git clients (git/2.x, JGit, etc.)
|
||||
// are routed directly to the git backend regardless of host/path matching.
|
||||
let backend_key = if let Some(ref git_backend) = cfg.git_upstream {
|
||||
let ua = session
|
||||
.req_header()
|
||||
.headers
|
||||
.get("user-agent")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("");
|
||||
|
||||
if ua.starts_with("git/") || ua.starts_with("JGit/") {
|
||||
tracing::debug!(host, ua, "Git UA detected, routing to git backend");
|
||||
Some(format!(
|
||||
"upstream:{}/{}:{}",
|
||||
git_backend.namespace, git_backend.name, git_backend.port
|
||||
))
|
||||
} else {
|
||||
// Normal route matching for non-git requests
|
||||
Self::match_route(&cfg, &host, session.req_header().uri.path())
|
||||
}
|
||||
} else {
|
||||
Self::match_route(&cfg, &host, session.req_header().uri.path())
|
||||
};
|
||||
|
||||
// Select endpoint via load balancer
|
||||
let endpoint = backend_key
|
||||
.as_ref()
|
||||
.and_then(|key| cfg.upstreams.get(key))
|
||||
.and_then(|eps| {
|
||||
let lb = crate::load_balancer::LoadBalancer::new(eps.clone());
|
||||
match ctx.sticky_key {
|
||||
Some(ref key) => lb.consistent_hash(key).cloned(),
|
||||
None => lb.round_robin().cloned(),
|
||||
}
|
||||
});
|
||||
|
||||
match endpoint {
|
||||
Some(ep) => {
|
||||
let addr = format!("{}:{}", ep.ip, ep.port);
|
||||
ctx.upstream_endpoint = Some(addr.clone());
|
||||
let peer = HttpPeer::new(addr, false, String::new());
|
||||
Ok(Box::new(peer))
|
||||
}
|
||||
None => pingora::Error::e_explain(
|
||||
pingora::ErrorType::InternalError,
|
||||
format!(
|
||||
"no upstream found for host '{}' path '{}'",
|
||||
host,
|
||||
session.req_header().uri.path()
|
||||
),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
async fn request_filter(
|
||||
&self,
|
||||
session: &mut Session,
|
||||
ctx: &mut Self::CTX,
|
||||
) -> pingora::Result<bool> {
|
||||
self.filter_chain
|
||||
.read()
|
||||
.unwrap()
|
||||
.run_pre(session, ctx)
|
||||
.map_err(|e| {
|
||||
pingora::Error::because(pingora::ErrorType::InternalError, "pre-filter failed", e)
|
||||
})?;
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
/// Build and configure the Pingora server.
|
||||
///
|
||||
/// Takes ownership of the `GIngressProxy` and wraps it in a
|
||||
/// `pingora_proxy::http_proxy_service` which implements the required
|
||||
/// `ServiceWithDependents` trait.
|
||||
pub fn build_server(
|
||||
proxy: GIngressProxy,
|
||||
bind_http: &str,
|
||||
bind_https: &str,
|
||||
) -> anyhow::Result<Server> {
|
||||
let mut server = Server::new(Some(pingora::server::configuration::Opt {
|
||||
upgrade: true, // Enable HTTP upgrade (for WebSocket)
|
||||
..Default::default()
|
||||
}))?;
|
||||
|
||||
server.bootstrap();
|
||||
|
||||
let mut http_service =
|
||||
pingora_proxy::http_proxy_service_with_name(&server.configuration, proxy, "gingress");
|
||||
http_service.add_tcp(bind_http);
|
||||
|
||||
// HTTPS with TLS would be added via add_tls on a separate service instance.
|
||||
tracing::info!(
|
||||
bind_http = %bind_http,
|
||||
bind_https = %bind_https,
|
||||
"GIngress proxy server configured"
|
||||
);
|
||||
|
||||
server.add_service(http_service);
|
||||
Ok(server)
|
||||
}
|
||||
|
||||
/// Run the proxy server (blocking).
|
||||
pub fn run_server(server: Server) {
|
||||
server.run_forever();
|
||||
}
|
||||
@ -1,109 +0,0 @@
|
||||
//! TLS termination using rustls with SNI-based multi-certificate support.
|
||||
//!
|
||||
//! Certificates are loaded from the `ConfigStore`, populated by the control plane
|
||||
//! watching Kubernetes TLS Secrets (from cert-manager or manual creation).
|
||||
|
||||
use crate::config::ConfigStore;
|
||||
use anyhow::Context;
|
||||
use rustls::ServerConfig;
|
||||
use rustls::server::ResolvesServerCert;
|
||||
use rustls::sign::CertifiedKey;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// SNI-based certificate resolver.
|
||||
///
|
||||
/// Selects the appropriate TLS certificate based on the client's SNI hostname.
|
||||
pub struct SniResolver {
|
||||
certs: HashMap<String, Arc<CertifiedKey>>,
|
||||
default: Option<Arc<CertifiedKey>>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for SniResolver {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("SniResolver")
|
||||
.field("num_certs", &self.certs.len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl SniResolver {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
certs: HashMap::new(),
|
||||
default: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Load certificates from the config store.
|
||||
pub fn load_from_config(&mut self, store: &ConfigStore) -> anyhow::Result<()> {
|
||||
let _ = store;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add a certificate for a specific hostname.
|
||||
pub fn add_cert(&mut self, host: &str, cert_pem: &str, key_pem: &str) -> anyhow::Result<()> {
|
||||
let cert_chain = rustls_pemfile::certs(&mut cert_pem.as_bytes())
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.context("Failed to parse certificate PEM")?;
|
||||
|
||||
let key_der = rustls_pemfile::private_key(&mut key_pem.as_bytes())
|
||||
.context("Failed to parse private key PEM")?
|
||||
.context("No private key found in PEM")?;
|
||||
|
||||
let signing_key = rustls::crypto::ring::sign::any_supported_type(&key_der)
|
||||
.context("Unsupported private key type")?;
|
||||
|
||||
let certified_key = Arc::new(CertifiedKey::new(cert_chain, signing_key));
|
||||
|
||||
if self.default.is_none() {
|
||||
self.default = Some(certified_key.clone());
|
||||
}
|
||||
|
||||
self.certs.insert(host.to_string(), certified_key);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove a certificate for a hostname.
|
||||
pub fn remove_cert(&mut self, host: &str) {
|
||||
let removed = self.certs.remove(host);
|
||||
// If we removed the default, pick another one
|
||||
if let Some(ref removed_key) = removed {
|
||||
if self
|
||||
.default
|
||||
.as_ref()
|
||||
.map_or(false, |d| Arc::ptr_eq(d, removed_key))
|
||||
{
|
||||
self.default = self.certs.values().next().cloned();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ResolvesServerCert for SniResolver {
|
||||
fn resolve(&self, client_hello: rustls::server::ClientHello) -> Option<Arc<CertifiedKey>> {
|
||||
if let Some(name) = client_hello.server_name() {
|
||||
// Try exact match
|
||||
if let Some(cert) = self.certs.get(name) {
|
||||
return Some(cert.clone());
|
||||
}
|
||||
// Try wildcard matching
|
||||
if let Some(dot) = name.find('.') {
|
||||
let wildcard = format!("*{}", &name[dot..]);
|
||||
if let Some(cert) = self.certs.get(&wildcard) {
|
||||
return Some(cert.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
self.default.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a rustls `ServerConfig` with the given SNI resolver.
|
||||
pub fn build_server_config(resolver: SniResolver) -> anyhow::Result<Arc<ServerConfig>> {
|
||||
let config = ServerConfig::builder()
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(Arc::new(resolver));
|
||||
Ok(Arc::new(config))
|
||||
}
|
||||
32
push.sh
32
push.sh
@ -1,32 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
REGISTRY="${REGISTRY:-harbor.gitdata.me/gtateam}"
|
||||
TAG="${TAG:-$(git rev-parse --short HEAD)}"
|
||||
|
||||
RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; NC='\033[0m'
|
||||
log() { echo -e "${GREEN}[OK]${NC} $*"; }
|
||||
err() { echo -e "${RED}[ERR]${NC} $*"; exit 1; }
|
||||
|
||||
# ── credentials ─────────────────────────────────────────────────────
|
||||
: "${DOCKER_USERNAME:?DOCKER_USERNAME env var required}"
|
||||
: "${DOCKER_PASSWORD:?DOCKER_PASSWORD env var required}"
|
||||
|
||||
log "Logging into $REGISTRY..."
|
||||
echo "$DOCKER_PASSWORD" | docker login "$REGISTRY" -u "$DOCKER_USERNAME" --password-stdin
|
||||
|
||||
# ── tag & push ──────────────────────────────────────────────────────
|
||||
IMAGES=(app email-worker git-hook gitserver metrics-aggregator static-server gingress)
|
||||
|
||||
for name in "${IMAGES[@]}"; do
|
||||
SRC="${name}:${TAG}"
|
||||
DST="${REGISTRY}/${name}:${TAG}"
|
||||
|
||||
log "Tagging $SRC -> $DST"
|
||||
docker tag "$SRC" "$DST"
|
||||
|
||||
log "Pushing $DST"
|
||||
docker push "$DST"
|
||||
done
|
||||
|
||||
log "All images pushed to $REGISTRY"
|
||||
57
uninstall.sh
57
uninstall.sh
@ -1,57 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
# ── helpers ──────────────────────────────────────────────────────────
|
||||
RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; NC='\033[0m'
|
||||
log() { echo -e "${GREEN}[OK]${NC} $*"; }
|
||||
warn() { echo -e "${YELLOW}[WARN]${NC} $*"; }
|
||||
|
||||
# ── defaults ─────────────────────────────────────────────────────────
|
||||
NAMESPACE="${NAMESPACE:-app}"
|
||||
RELEASE="${RELEASE:-deploy}"
|
||||
CONFIG_MAP="${CONFIG_MAP:-app-env}"
|
||||
PVC_NAME="${PVC_NAME:-shared-data}"
|
||||
|
||||
# ── safety check ─────────────────────────────────────────────────────
|
||||
echo ""
|
||||
warn "This will remove Helm release '$RELEASE' from namespace '$NAMESPACE'."
|
||||
warn "The following resources are PROTECTED and will NOT be deleted:"
|
||||
warn " - Namespace: $NAMESPACE"
|
||||
warn " - ConfigMap: $CONFIG_MAP"
|
||||
warn " - PVC: $PVC_NAME"
|
||||
echo ""
|
||||
read -rp "Continue? [y/N] " confirm
|
||||
if [[ "$confirm" != "y" && "$confirm" != "Y" ]]; then
|
||||
log "Cancelled"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# ── uninstall ────────────────────────────────────────────────────────
|
||||
log "Uninstalling Helm release $RELEASE..."
|
||||
helm uninstall "$RELEASE" --namespace "$NAMESPACE"
|
||||
|
||||
log "Helm release uninstalled"
|
||||
|
||||
# ── verify protected resources ───────────────────────────────────────
|
||||
log "Verifying protected resources still exist..."
|
||||
|
||||
if kubectl get namespace "$NAMESPACE" &>/dev/null; then
|
||||
log "Namespace '$NAMESPACE' preserved"
|
||||
else
|
||||
echo -e "${RED}[ERR]${NC} Namespace '$NAMESPACE' was deleted!"
|
||||
fi
|
||||
|
||||
if kubectl get configmap "$CONFIG_MAP" -n "$NAMESPACE" &>/dev/null; then
|
||||
log "ConfigMap '$CONFIG_MAP' preserved"
|
||||
else
|
||||
echo -e "${RED}[ERR]${NC} ConfigMap '$CONFIG_MAP' was deleted!"
|
||||
fi
|
||||
|
||||
if kubectl get pvc "$PVC_NAME" -n "$NAMESPACE" &>/dev/null; then
|
||||
log "PVC '$PVC_NAME' preserved"
|
||||
else
|
||||
echo -e "${RED}[ERR]${NC} PVC '$PVC_NAME' was deleted!"
|
||||
fi
|
||||
|
||||
log "Uninstall complete — remaining resources in namespace $NAMESPACE:"
|
||||
kubectl get all,pvc,configmap,ingress -n "$NAMESPACE"
|
||||
Loading…
Reference in New Issue
Block a user