gitdataai/apps/operator/src/controller/git_hook.rs
2026-04-15 09:08:09 +08:00

138 lines
4.4 KiB
Rust

//! Controller for the `GitHook` CRD — Deployment + ConfigMap.
use crate::context::ReconcileState;
use crate::controller::app::{apply_deployment, patch_status};
use crate::controller::helpers::{child_meta, env_var_to_json, merge_env, owner_ref, query_deployment_status, std_labels};
use crate::crd::{GitHook, GitHookSpec, JsonResource};
use serde_json::{Value, json};
use std::sync::Arc;
use tracing::info;
pub async fn reconcile(gh: Arc<GitHook>, ctx: Arc<ReconcileState>) -> Result<(), kube::Error> {
let ns = gh.metadata.namespace.as_deref().unwrap_or("default");
let name = gh.metadata.name.as_deref().unwrap_or("");
let spec = &gh.spec;
let client = &ctx.client;
let or = owner_ref(&gh.metadata, &gh.api_version, &gh.kind);
let labels = std_labels();
let cm_name = format!("{}-config", name);
// ---- ConfigMap ----
let configmap = build_configmap(ns, &cm_name, &or, &labels);
apply_configmap(client, ns, &cm_name, &configmap).await?;
// ---- Deployment ----
let deployment = build_deployment(ns, name, &cm_name, spec, &or, &labels);
apply_deployment(client, ns, name, &deployment).await?;
let (ready_replicas, phase) = query_deployment_status(client, ns, name).await?;
let status = json!({ "status": { "readyReplicas": ready_replicas, "phase": phase } });
patch_status::<GitHook>(client, ns, name, &status).await?;
Ok(())
}
fn build_configmap(
ns: &str,
cm_name: &str,
or: &crate::crd::OwnerReference,
labels: &std::collections::BTreeMap<String, String>,
) -> Value {
let pool_config = serde_yaml::to_string(&serde_json::json!({
"max_concurrent": 8,
"cpu_threshold": 80.0,
"redis_list_prefix": "{hook}",
"redis_log_channel": "hook:logs",
"redis_block_timeout_secs": 5,
"redis_max_retries": 3,
}))
.unwrap_or_default();
json!({
"metadata": child_meta(cm_name, ns, or, labels.clone()),
"data": {
"pool.yaml": pool_config
}
})
}
fn build_deployment(
ns: &str,
name: &str,
cm_name: &str,
spec: &GitHookSpec,
or: &crate::crd::OwnerReference,
labels: &std::collections::BTreeMap<String, String>,
) -> Value {
let env = merge_env(&[], &spec.env);
let image = if spec.image.is_empty() {
"myapp/git-hook:latest".to_string()
} else {
spec.image.clone()
};
let pull = if spec.image_pull_policy.is_empty() {
"IfNotPresent".to_string()
} else {
spec.image_pull_policy.clone()
};
let resources = super::app::build_resources(&spec.resources);
// Add WORKER_ID env
let worker_id = spec
.worker_id
.clone()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let mut env_vars: Vec<serde_json::Value> = env.iter().map(env_var_to_json).collect();
env_vars.push(json!({ "name": "HOOK_POOL_WORKER_ID", "value": worker_id }));
json!({
"metadata": child_meta(name, ns, or, labels.clone()),
"spec": {
"replicas": 1,
"selector": { "matchLabels": labels },
"template": {
"metadata": { "labels": labels.clone() },
"spec": {
"containers": [{
"name": "git-hook",
"image": image,
"env": env_vars,
"imagePullPolicy": pull,
"resources": resources,
"volumeMounts": [{ "name": "hook-config", "mountPath": "/config" }]
}],
"volumes": [{
"name": "hook-config",
"configMap": { "name": cm_name }
}]
}
}
}
})
}
async fn apply_configmap(
client: &kube::Client,
ns: &str,
name: &str,
body: &Value,
) -> Result<(), kube::Error> {
let api: kube::Api<JsonResource> = kube::Api::namespaced(client.clone(), ns);
let jr = JsonResource::new(Default::default(), body.clone());
match api.get(name).await {
Ok(_) => {
let _ = api
.replace(name, &kube::api::PostParams::default(), &jr)
.await?;
Ok(())
}
Err(kube::Error::Api(e)) if e.code == 404 => {
info!(name, ns, "creating git-hook configmap");
let _ = api.create(&kube::api::PostParams::default(), &jr).await?;
Ok(())
}
Err(e) => Err(e),
}
}