gitdataai/apps/operator/src/controller/mod.rs
2026-04-15 09:08:09 +08:00

189 lines
5.0 KiB
Rust

//! Kubernetes Controllers — one per CRD type.
pub mod app;
pub mod email_worker;
pub mod git_hook;
pub mod gitserver;
pub mod helpers;
pub mod migrate;
use crate::context::ReconcileCtx;
use crate::crd::{App, EmailWorker, GitHook, GitServer, Migrate};
use futures::StreamExt;
use kube::runtime::{Controller, controller::Action};
use std::sync::Arc;
/// Shared error policy used by every controller in this module.
///
/// Logs the object whose reconcile failed together with the error, then asks
/// the controller runtime to try again after a fixed delay.
///
/// Returning `Action::await_change()` here would leave an object that hit a
/// transient failure (API conflict, short network outage, ...) unreconciled
/// until somebody edits it or one of its owned children changes, so a timed
/// requeue is used instead.
///
/// * `obj` - the resource whose reconcile failed (only used for logging).
/// * `err` - the error returned by the reconciler.
/// * `_`   - the shared reconcile context; unused here.
fn error_policy<K: std::fmt::Debug>(
    obj: Arc<K>,
    err: &kube::Error,
    _: Arc<ReconcileCtx>,
) -> Action {
    /// Delay before a failed reconcile is attempted again.
    const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(30);

    tracing::error!(?obj, %err, "reconcile error");
    Action::requeue(RETRY_DELAY)
}
/// Start the App controller.
pub async fn start_app(client: kube::Client, ctx: Arc<ReconcileCtx>) -> anyhow::Result<()> {
Controller::new(kube::Api::<App>::all(client.clone()), Default::default())
.owns::<k8s_openapi::api::apps::v1::Deployment>(
kube::Api::all(client.clone()),
Default::default(),
)
.owns::<k8s_openapi::api::core::v1::Service>(
kube::Api::all(client.clone()),
Default::default(),
)
.run(
|o, c| {
let c = c.clone();
async move {
app::reconcile(o, c).await?;
Ok::<_, kube::Error>(Action::await_change())
}
},
error_policy,
ctx.clone(),
)
.for_each(|r| async move {
if let Err(e) = r {
tracing::error!(%e, "app controller stream error");
}
})
.await;
Ok(())
}
/// Start the GitServer controller.
pub async fn start_gitserver(client: kube::Client, ctx: Arc<ReconcileCtx>) -> anyhow::Result<()> {
Controller::new(
kube::Api::<GitServer>::all(client.clone()),
Default::default(),
)
.owns::<k8s_openapi::api::apps::v1::Deployment>(
kube::Api::all(client.clone()),
Default::default(),
)
.owns::<k8s_openapi::api::core::v1::Service>(kube::Api::all(client.clone()), Default::default())
.owns::<k8s_openapi::api::core::v1::PersistentVolumeClaim>(
kube::Api::all(client.clone()),
Default::default(),
)
.run(
|o, c| {
let c = c.clone();
async move {
gitserver::reconcile(o, c).await?;
Ok::<_, kube::Error>(Action::await_change())
}
},
error_policy,
ctx.clone(),
)
.for_each(|r| async move {
if let Err(e) = r {
tracing::error!(%e, "gitserver controller stream error");
}
})
.await;
Ok(())
}
/// Start the EmailWorker controller.
pub async fn start_email_worker(
client: kube::Client,
ctx: Arc<ReconcileCtx>,
) -> anyhow::Result<()> {
Controller::new(
kube::Api::<EmailWorker>::all(client.clone()),
Default::default(),
)
.owns::<k8s_openapi::api::apps::v1::Deployment>(
kube::Api::all(client.clone()),
Default::default(),
)
.run(
|o, c| {
let c = c.clone();
async move {
email_worker::reconcile(o, c).await?;
Ok::<_, kube::Error>(Action::await_change())
}
},
error_policy,
ctx.clone(),
)
.for_each(|r| async move {
if let Err(e) = r {
tracing::error!(%e, "email_worker controller stream error");
}
})
.await;
Ok(())
}
/// Start the GitHook controller.
pub async fn start_git_hook(client: kube::Client, ctx: Arc<ReconcileCtx>) -> anyhow::Result<()> {
Controller::new(
kube::Api::<GitHook>::all(client.clone()),
Default::default(),
)
.owns::<k8s_openapi::api::apps::v1::Deployment>(
kube::Api::all(client.clone()),
Default::default(),
)
.owns::<k8s_openapi::api::core::v1::ConfigMap>(
kube::Api::all(client.clone()),
Default::default(),
)
.run(
|o, c| {
let c = c.clone();
async move {
git_hook::reconcile(o, c).await?;
Ok::<_, kube::Error>(Action::await_change())
}
},
error_policy,
ctx.clone(),
)
.for_each(|r| async move {
if let Err(e) = r {
tracing::error!(%e, "git_hook controller stream error");
}
})
.await;
Ok(())
}
/// Start the Migrate controller.
pub async fn start_migrate(client: kube::Client, ctx: Arc<ReconcileCtx>) -> anyhow::Result<()> {
Controller::new(
kube::Api::<Migrate>::all(client.clone()),
Default::default(),
)
.owns::<k8s_openapi::api::batch::v1::Job>(kube::Api::all(client.clone()), Default::default())
.run(
|o, c| {
let c = c.clone();
async move {
migrate::reconcile(o, c).await?;
Ok::<_, kube::Error>(Action::await_change())
}
},
error_policy,
ctx.clone(),
)
.for_each(|r| async move {
if let Err(e) = r {
tracing::error!(%e, "migrate controller stream error");
}
})
.await;
Ok(())
}