189 lines
5.0 KiB
Rust
189 lines
5.0 KiB
Rust
//! Kubernetes Controllers — one per CRD type.
|
|
|
|
pub mod app;
|
|
pub mod email_worker;
|
|
pub mod git_hook;
|
|
pub mod gitserver;
|
|
pub mod helpers;
|
|
pub mod migrate;
|
|
|
|
use crate::context::ReconcileCtx;
|
|
use crate::crd::{App, EmailWorker, GitHook, GitServer, Migrate};
|
|
use futures::StreamExt;
|
|
use kube::runtime::{Controller, controller::Action};
|
|
use std::sync::Arc;
|
|
|
|
fn error_policy<K: std::fmt::Debug>(
|
|
obj: Arc<K>,
|
|
err: &kube::Error,
|
|
_: Arc<ReconcileCtx>,
|
|
) -> Action {
|
|
tracing::error!(?obj, %err, "reconcile error");
|
|
Action::await_change()
|
|
}
|
|
|
|
/// Start the App controller.
|
|
pub async fn start_app(client: kube::Client, ctx: Arc<ReconcileCtx>) -> anyhow::Result<()> {
|
|
Controller::new(kube::Api::<App>::all(client.clone()), Default::default())
|
|
.owns::<k8s_openapi::api::apps::v1::Deployment>(
|
|
kube::Api::all(client.clone()),
|
|
Default::default(),
|
|
)
|
|
.owns::<k8s_openapi::api::core::v1::Service>(
|
|
kube::Api::all(client.clone()),
|
|
Default::default(),
|
|
)
|
|
.run(
|
|
|o, c| {
|
|
let c = c.clone();
|
|
async move {
|
|
app::reconcile(o, c).await?;
|
|
Ok::<_, kube::Error>(Action::await_change())
|
|
}
|
|
},
|
|
error_policy,
|
|
ctx.clone(),
|
|
)
|
|
.for_each(|r| async move {
|
|
if let Err(e) = r {
|
|
tracing::error!(%e, "app controller stream error");
|
|
}
|
|
})
|
|
.await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Start the GitServer controller.
|
|
pub async fn start_gitserver(client: kube::Client, ctx: Arc<ReconcileCtx>) -> anyhow::Result<()> {
|
|
Controller::new(
|
|
kube::Api::<GitServer>::all(client.clone()),
|
|
Default::default(),
|
|
)
|
|
.owns::<k8s_openapi::api::apps::v1::Deployment>(
|
|
kube::Api::all(client.clone()),
|
|
Default::default(),
|
|
)
|
|
.owns::<k8s_openapi::api::core::v1::Service>(kube::Api::all(client.clone()), Default::default())
|
|
.owns::<k8s_openapi::api::core::v1::PersistentVolumeClaim>(
|
|
kube::Api::all(client.clone()),
|
|
Default::default(),
|
|
)
|
|
.run(
|
|
|o, c| {
|
|
let c = c.clone();
|
|
async move {
|
|
gitserver::reconcile(o, c).await?;
|
|
Ok::<_, kube::Error>(Action::await_change())
|
|
}
|
|
},
|
|
error_policy,
|
|
ctx.clone(),
|
|
)
|
|
.for_each(|r| async move {
|
|
if let Err(e) = r {
|
|
tracing::error!(%e, "gitserver controller stream error");
|
|
}
|
|
})
|
|
.await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Start the EmailWorker controller.
|
|
pub async fn start_email_worker(
|
|
client: kube::Client,
|
|
ctx: Arc<ReconcileCtx>,
|
|
) -> anyhow::Result<()> {
|
|
Controller::new(
|
|
kube::Api::<EmailWorker>::all(client.clone()),
|
|
Default::default(),
|
|
)
|
|
.owns::<k8s_openapi::api::apps::v1::Deployment>(
|
|
kube::Api::all(client.clone()),
|
|
Default::default(),
|
|
)
|
|
.run(
|
|
|o, c| {
|
|
let c = c.clone();
|
|
async move {
|
|
email_worker::reconcile(o, c).await?;
|
|
Ok::<_, kube::Error>(Action::await_change())
|
|
}
|
|
},
|
|
error_policy,
|
|
ctx.clone(),
|
|
)
|
|
.for_each(|r| async move {
|
|
if let Err(e) = r {
|
|
tracing::error!(%e, "email_worker controller stream error");
|
|
}
|
|
})
|
|
.await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Start the GitHook controller.
|
|
pub async fn start_git_hook(client: kube::Client, ctx: Arc<ReconcileCtx>) -> anyhow::Result<()> {
|
|
Controller::new(
|
|
kube::Api::<GitHook>::all(client.clone()),
|
|
Default::default(),
|
|
)
|
|
.owns::<k8s_openapi::api::apps::v1::Deployment>(
|
|
kube::Api::all(client.clone()),
|
|
Default::default(),
|
|
)
|
|
.owns::<k8s_openapi::api::core::v1::ConfigMap>(
|
|
kube::Api::all(client.clone()),
|
|
Default::default(),
|
|
)
|
|
.run(
|
|
|o, c| {
|
|
let c = c.clone();
|
|
async move {
|
|
git_hook::reconcile(o, c).await?;
|
|
Ok::<_, kube::Error>(Action::await_change())
|
|
}
|
|
},
|
|
error_policy,
|
|
ctx.clone(),
|
|
)
|
|
.for_each(|r| async move {
|
|
if let Err(e) = r {
|
|
tracing::error!(%e, "git_hook controller stream error");
|
|
}
|
|
})
|
|
.await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Start the Migrate controller.
|
|
pub async fn start_migrate(client: kube::Client, ctx: Arc<ReconcileCtx>) -> anyhow::Result<()> {
|
|
Controller::new(
|
|
kube::Api::<Migrate>::all(client.clone()),
|
|
Default::default(),
|
|
)
|
|
.owns::<k8s_openapi::api::batch::v1::Job>(kube::Api::all(client.clone()), Default::default())
|
|
.run(
|
|
|o, c| {
|
|
let c = c.clone();
|
|
async move {
|
|
migrate::reconcile(o, c).await?;
|
|
Ok::<_, kube::Error>(Action::await_change())
|
|
}
|
|
},
|
|
error_policy,
|
|
ctx.clone(),
|
|
)
|
|
.for_each(|r| async move {
|
|
if let Err(e) = r {
|
|
tracing::error!(%e, "migrate controller stream error");
|
|
}
|
|
})
|
|
.await;
|
|
|
|
Ok(())
|
|
}
|