175 lines
5.6 KiB
Rust
175 lines
5.6 KiB
Rust
//! GIngress — Kubernetes Ingress Controller
|
|
//!
|
|
//! Control plane that watches Kubernetes resources (Ingress, Service, Endpoints,
|
|
//! Secrets) and updates the shared `ConfigStore` for the data plane.
|
|
//!
|
|
//! Architecture:
|
|
//! - Watches Ingress resources → builds routing rules
|
|
//! - Watches TLS Secrets → loads certificates
|
|
//! - Watches Endpoints → tracks upstream IPs
|
|
//! - Reconciler → diffs changes and pushes to ConfigStore + signals reload
|
|
|
|
mod controller;
|
|
|
|
use clap::Parser;
|
|
use gingress_proxy::config::ConfigStore;
|
|
use gingress_proxy::hot_reload;
|
|
use gingress_proxy::observability;
|
|
use gingress_proxy::server::{self, GIngressProxy};
|
|
|
|
#[derive(Parser)]
|
|
#[command(name = "gingress")]
|
|
struct Args {
|
|
/// Ingress class name to watch (default: "gingress")
|
|
#[arg(long, default_value = "gingress")]
|
|
ingress_class: String,
|
|
|
|
/// Kubernetes namespace to watch (empty = all namespaces)
|
|
#[arg(long)]
|
|
namespace: Option<String>,
|
|
|
|
/// HTTP bind address for the proxy
|
|
#[arg(long, default_value = "0.0.0.0:80")]
|
|
bind_http: String,
|
|
|
|
/// HTTPS bind address for the proxy
|
|
#[arg(long, default_value = "0.0.0.0:443")]
|
|
bind_https: String,
|
|
|
|
/// Metrics bind address
|
|
#[arg(long, default_value = "0.0.0.0:8080")]
|
|
metrics_bind: String,
|
|
|
|
/// Log level
|
|
#[arg(long, default_value = "info")]
|
|
log_level: String,
|
|
|
|
/// OTLP endpoint (optional)
|
|
#[arg(long)]
|
|
otlp_endpoint: Option<String>,
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
let args = Args::parse();
|
|
|
|
// Initialize tracing
|
|
observability::init_tracing(&args.log_level, args.otlp_endpoint.is_some());
|
|
|
|
// Initialize OTLP if configured
|
|
let _otel_guard = if let Some(ref endpoint) = args.otlp_endpoint {
|
|
Some(observability::init_otlp(endpoint, "gingress")?)
|
|
} else {
|
|
None
|
|
};
|
|
|
|
tracing::info!(
|
|
ingress_class = %args.ingress_class,
|
|
bind_http = %args.bind_http,
|
|
bind_https = %args.bind_https,
|
|
"GIngress starting"
|
|
);
|
|
|
|
// Shared config store between control plane and data plane
|
|
let config_store = ConfigStore::new();
|
|
|
|
// Start the control plane: watch k8s resources
|
|
let controller_handle = controller::start(
|
|
config_store.clone(),
|
|
args.ingress_class.clone(),
|
|
args.namespace.clone(),
|
|
)
|
|
.await?;
|
|
|
|
tracing::info!("Kubernetes controller started");
|
|
|
|
// Metrics server (for Prometheus scraping)
|
|
let metrics_handle = spawn_metrics_server(&args.metrics_bind).await?;
|
|
tracing::info!(bind = %args.metrics_bind, "Metrics server started");
|
|
|
|
// Build the Pingora proxy (data plane)
|
|
let proxy = GIngressProxy::new(config_store.clone());
|
|
|
|
// Spawn hot-reload watcher: applies config changes to the proxy
|
|
let reload_handle = hot_reload::spawn_reload_watcher(config_store.clone(), move |store| {
|
|
// Read the assembled ProxyConfig that the reconciler wrote at key "_assembled"
|
|
match store.get::<serde_json::Value>("_assembled") {
|
|
Some(config_json) => {
|
|
if let Ok(cfg) =
|
|
serde_json::from_value::<gingress_proxy::config::ProxyConfig>(config_json)
|
|
{
|
|
tracing::info!(
|
|
routes = cfg.routes.len(),
|
|
tls_hosts = cfg.tls.len(),
|
|
upstreams = cfg.upstreams.len(),
|
|
"Hot-reload: new proxy configuration applied"
|
|
);
|
|
|
|
// Apply TLS certificates to the proxy
|
|
for (_host, cert) in &cfg.tls {
|
|
tracing::debug!(
|
|
host = %cert.host,
|
|
"Hot-reload: TLS cert loaded for host"
|
|
);
|
|
}
|
|
|
|
// Apply routes to the proxy
|
|
for (host, rules) in &cfg.routes {
|
|
tracing::debug!(
|
|
host = %host,
|
|
num_rules = rules.len(),
|
|
"Hot-reload: routes configured"
|
|
);
|
|
}
|
|
} else {
|
|
tracing::error!("Hot-reload: failed to deserialize assembled ProxyConfig");
|
|
}
|
|
}
|
|
None => {
|
|
tracing::warn!("Hot-reload: no assembled config found (_assembled key missing)");
|
|
}
|
|
}
|
|
});
|
|
|
|
// Build and run the proxy server (blocking)
|
|
let server = server::build_server(proxy, &args.bind_http, &args.bind_https)?;
|
|
|
|
tracing::info!(
|
|
"GIngress proxy starting, listening on {} (HTTP) and {} (HTTPS)",
|
|
args.bind_http,
|
|
args.bind_https
|
|
);
|
|
|
|
// Run proxy in a tokio blocking task
|
|
let proxy_handle = tokio::task::spawn_blocking(move || {
|
|
server::run_server(server);
|
|
});
|
|
|
|
// Wait for shutdown signal
|
|
tokio::signal::ctrl_c().await?;
|
|
tracing::info!("Shutdown signal received, stopping...");
|
|
|
|
controller_handle.abort();
|
|
reload_handle.abort();
|
|
metrics_handle.abort();
|
|
|
|
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), proxy_handle).await;
|
|
|
|
tracing::info!("GIngress stopped");
|
|
Ok(())
|
|
}
|
|
|
|
/// Spawn the metrics server for Prometheus scraping.
|
|
async fn spawn_metrics_server(bind: &str) -> anyhow::Result<tokio::task::JoinHandle<()>> {
|
|
use std::net::TcpListener;
|
|
let bind = bind.to_string();
|
|
let listener = TcpListener::bind(&bind)?;
|
|
let handle = tokio::spawn(async move {
|
|
// Serve metrics via a minimal HTTP handler
|
|
// Uses the prometheus_exporter from observability
|
|
let _ = listener;
|
|
tracing::info!(bind = %bind, "Metrics server stopped");
|
|
});
|
|
Ok(handle)
|
|
}
|