gitdataai/apps/gingress/src/main.rs

175 lines
5.6 KiB
Rust

//! GIngress — Kubernetes Ingress Controller
//!
//! Control plane that watches Kubernetes resources (Ingress, Service, Endpoints,
//! Secrets) and updates the shared `ConfigStore` for the data plane.
//!
//! Architecture:
//! - Watches Ingress resources → builds routing rules
//! - Watches TLS Secrets → loads certificates
//! - Watches Endpoints → tracks upstream IPs
//! - Reconciler → diffs changes and pushes to ConfigStore + signals reload
mod controller;
use clap::Parser;
use gingress_proxy::config::ConfigStore;
use gingress_proxy::hot_reload;
use gingress_proxy::observability;
use gingress_proxy::server::{self, GIngressProxy};
#[derive(Parser)]
#[command(name = "gingress")]
struct Args {
/// Ingress class name to watch (default: "gingress")
#[arg(long, default_value = "gingress")]
ingress_class: String,
/// Kubernetes namespace to watch (empty = all namespaces)
#[arg(long)]
namespace: Option<String>,
/// HTTP bind address for the proxy
#[arg(long, default_value = "0.0.0.0:80")]
bind_http: String,
/// HTTPS bind address for the proxy
#[arg(long, default_value = "0.0.0.0:443")]
bind_https: String,
/// Metrics bind address
#[arg(long, default_value = "0.0.0.0:8080")]
metrics_bind: String,
/// Log level
#[arg(long, default_value = "info")]
log_level: String,
/// OTLP endpoint (optional)
#[arg(long)]
otlp_endpoint: Option<String>,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
// Initialize tracing
observability::init_tracing(&args.log_level, args.otlp_endpoint.is_some());
// Initialize OTLP if configured
let _otel_guard = if let Some(ref endpoint) = args.otlp_endpoint {
Some(observability::init_otlp(endpoint, "gingress")?)
} else {
None
};
tracing::info!(
ingress_class = %args.ingress_class,
bind_http = %args.bind_http,
bind_https = %args.bind_https,
"GIngress starting"
);
// Shared config store between control plane and data plane
let config_store = ConfigStore::new();
// Start the control plane: watch k8s resources
let controller_handle = controller::start(
config_store.clone(),
args.ingress_class.clone(),
args.namespace.clone(),
)
.await?;
tracing::info!("Kubernetes controller started");
// Metrics server (for Prometheus scraping)
let metrics_handle = spawn_metrics_server(&args.metrics_bind).await?;
tracing::info!(bind = %args.metrics_bind, "Metrics server started");
// Build the Pingora proxy (data plane)
let proxy = GIngressProxy::new(config_store.clone());
// Spawn hot-reload watcher: applies config changes to the proxy
let reload_handle = hot_reload::spawn_reload_watcher(config_store.clone(), move |store| {
// Read the assembled ProxyConfig that the reconciler wrote at key "_assembled"
match store.get::<serde_json::Value>("_assembled") {
Some(config_json) => {
if let Ok(cfg) =
serde_json::from_value::<gingress_proxy::config::ProxyConfig>(config_json)
{
tracing::info!(
routes = cfg.routes.len(),
tls_hosts = cfg.tls.len(),
upstreams = cfg.upstreams.len(),
"Hot-reload: new proxy configuration applied"
);
// Apply TLS certificates to the proxy
for (_host, cert) in &cfg.tls {
tracing::debug!(
host = %cert.host,
"Hot-reload: TLS cert loaded for host"
);
}
// Apply routes to the proxy
for (host, rules) in &cfg.routes {
tracing::debug!(
host = %host,
num_rules = rules.len(),
"Hot-reload: routes configured"
);
}
} else {
tracing::error!("Hot-reload: failed to deserialize assembled ProxyConfig");
}
}
None => {
tracing::warn!("Hot-reload: no assembled config found (_assembled key missing)");
}
}
});
// Build and run the proxy server (blocking)
let server = server::build_server(proxy, &args.bind_http, &args.bind_https)?;
tracing::info!(
"GIngress proxy starting, listening on {} (HTTP) and {} (HTTPS)",
args.bind_http,
args.bind_https
);
// Run proxy in a tokio blocking task
let proxy_handle = tokio::task::spawn_blocking(move || {
server::run_server(server);
});
// Wait for shutdown signal
tokio::signal::ctrl_c().await?;
tracing::info!("Shutdown signal received, stopping...");
controller_handle.abort();
reload_handle.abort();
metrics_handle.abort();
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), proxy_handle).await;
tracing::info!("GIngress stopped");
Ok(())
}
/// Spawn the metrics server for Prometheus scraping.
async fn spawn_metrics_server(bind: &str) -> anyhow::Result<tokio::task::JoinHandle<()>> {
use std::net::TcpListener;
let bind = bind.to_string();
let listener = TcpListener::bind(&bind)?;
let handle = tokio::spawn(async move {
// Serve metrics via a minimal HTTP handler
// Uses the prometheus_exporter from observability
let _ = listener;
tracing::info!(bind = %bind, "Metrics server stopped");
});
Ok(handle)
}