From a531a997bbeda0228b0c113cdab44bcd71cf5581 Mon Sep 17 00:00:00 2001 From: Jean-Marc ANDRE Date: Sun, 6 Dec 2020 11:29:06 +0100 Subject: [PATCH] Returns stdout to the controller to update the status --- manifests/formolcli-rbac.yaml | 3 + src/backup/root.go | 92 ++++++++++++++++----- src/controllers/backupsession_controller.go | 55 ++++++++++-- src/create/root.go | 34 ++++++-- 4 files changed, 152 insertions(+), 32 deletions(-) diff --git a/manifests/formolcli-rbac.yaml b/manifests/formolcli-rbac.yaml index 5795222..51cb035 100644 --- a/manifests/formolcli-rbac.yaml +++ b/manifests/formolcli-rbac.yaml @@ -21,6 +21,9 @@ rules: - apiGroups: ["formol.desmojim.fr"] resources: ["backupsessions", "backupconfigurations"] verbs: ["get", "list", "watch"] + - apiGroups: ["formol.desmojim.fr"] + resources: ["backupsessions/status"] + verbs: ["update"] --- apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRole diff --git a/src/backup/root.go b/src/backup/root.go index 61129f7..3043730 100644 --- a/src/backup/root.go +++ b/src/backup/root.go @@ -2,9 +2,12 @@ package backup import ( "strings" + "bufio" "os" "os/exec" - "log" + "go.uber.org/zap" + "github.com/go-logr/logr" + "github.com/go-logr/zapr" ) var ( @@ -13,31 +16,47 @@ var ( aws_access_key_id string aws_secret_access_key string resticExec = "/usr/local/bin/restic" + logger logr.Logger ) func init() { - if repository = os.Getenv("RESTIC_REPOSITORY"); repository == "" { - log.Fatal("RESTIC_REPOSITORY not set") - } - if passwordFile = os.Getenv("RESTIC_PASSWORD"); passwordFile == "" { - log.Fatal("RESTIC_PASSWORD not set") - } - if aws_access_key_id = os.Getenv("AWS_ACCESS_KEY_ID"); aws_access_key_id == "" { - log.Fatal("AWS_ACCESS_KEY_ID not set") - } - if aws_secret_access_key = os.Getenv("AWS_SECRET_ACCESS_KEY"); aws_secret_access_key == "" { - log.Fatal("AWS_SECRET_ACCESS_KEY not set") - } + zapLog, _ := zap.NewDevelopment() + logger = zapr.NewLogger(zapLog) + repository = os.Getenv("RESTIC_REPOSITORY") + passwordFile = os.Getenv("RESTIC_PASSWORD") + aws_access_key_id = os.Getenv("AWS_ACCESS_KEY_ID") + aws_secret_access_key = os.Getenv("AWS_SECRET_ACCESS_KEY") } func checkRepo(repo string) error { + log := logger.WithName("backup-checkrepo") cmd := exec.Command(resticExec, "check", "-r", repo) - cmd.Stderr = os.Stderr - cmd.Stdout = os.Stdout - err := cmd.Run() + stderr, err := cmd.StderrPipe() if err != nil { + log.Error(err, "unable to pipe stderr") + return err + } + if err := cmd.Start(); err != nil { + log.Error(err, "cannot start repo check") + return err + } + if err := cmd.Wait(); err != nil { + log.V(0).Info("initializing new repo", "repo", repo) cmd = exec.Command(resticExec, "init", "-r", repo) - err = cmd.Run() + if err := cmd.Start(); err != nil { + log.Error(err, "cannot start repo init") + return err + } + go func(){ + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + log.V(0).Info("and error happened", "stderr", scanner.Text()) + } + }() + if err := cmd.Wait(); err != nil { + log.Error(err, "something went wrong during repo init") + return err + } } return err } @@ -46,16 +65,47 @@ func BackupVolume(path string) error { return nil } -func BackupDeployment(prefix string, paths []string) error { +func BackupDeployment(prefix string, paths []string, c chan []byte) (error) { + log := logger.WithName("backup-deployment") newrepo := repository if prefix != "" { newrepo = repository + "/" + prefix } if err := checkRepo(newrepo); err != nil { - log.Fatal("unable to setup newrepo", "newrepo", newrepo) + log.Error(err, "unable to setup newrepo", "newrepo", newrepo) + return err + } + cmd := exec.Command(resticExec, "backup", "--json", "-r", newrepo, strings.Join(paths, " ")) + stderr, err := cmd.StderrPipe() + if err != nil { + log.Error(err, "unable to pipe stderr") + return err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + log.Error(err, "unable to pipe stdout") + return err + } + if err := cmd.Start(); err != nil { + log.Error(err, "cannot start backup") + return err + } + go func(){ + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + log.V(0).Info("and error happened", "stderr", scanner.Text()) + } + }() + go func(c chan []byte){ + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + c <- scanner.Bytes() + } + }(c) + if err := cmd.Wait(); err != nil { + log.Error(err, "something went wrong during the backup") return err } - cmd := exec.Command(resticExec, "backup", "-r", newrepo, strings.Join(paths, " ")) - return cmd.Run() + return nil } diff --git a/src/controllers/backupsession_controller.go b/src/controllers/backupsession_controller.go index 400b1a4..f5ad38f 100644 --- a/src/controllers/backupsession_controller.go +++ b/src/controllers/backupsession_controller.go @@ -1,11 +1,14 @@ package controllers import ( + "time" + "encoding/json" "context" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log/zap" "os" "path/filepath" @@ -22,14 +25,17 @@ var ( ) func init() { + log := zap.New(zap.UseDevMode(true)).WithName("init") + namespace := os.Getenv("POD_NAMESPACE") if namespace == "" { - panic("No POD_NAMESPACE env var") + return } config, err := rest.InClusterConfig() if err != nil { config, err = clientcmd.BuildConfigFromFlags("", filepath.Join(os.Getenv("HOME"), ".kube", "config",)) if err != nil { + log.Error(err, "unable to get config") panic(err.Error()) } } @@ -45,6 +51,7 @@ func init() { pod, err := clientset.CoreV1().Pods(namespace).Get(hostname, metav1.GetOptions{}) if err != nil { + log.Error(err, "unable to get pod") panic("unable to get pod") } @@ -81,6 +88,10 @@ func (r *BackupSessionReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro log.V(1).Info("backupSession.Namespace", "namespace", backupSession.Namespace) log.V(1).Info("backupSession.Spec.Ref.Name", "name", backupSession.Spec.Ref.Name) + if backupSession.Status.BackupSessionState != "" { + log.V(0).Info("State is not null. Skipping", "state", backupSession.Status.BackupSessionState) + return ctrl.Result{}, nil + } backupConf := &formolv1alpha1.BackupConfiguration{} if err := r.Get(ctx, client.ObjectKey{ Namespace: backupSession.Namespace, @@ -93,14 +104,48 @@ func (r *BackupSessionReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro log.V(1).Info("Found BackupConfiguration", "BackupConfiguration", backupConf) // Found the BackupConfiguration. + if backupConf.Spec.Target.Name != deploymentName { + log.V(0).Info("Not for us", "target", backupConf.Spec.Target.Name, "us", deploymentName) + return ctrl.Result{}, nil + } + log.V(0).Info("before", "backupsession", backupSession) + backupSession.Status.BackupSessionState = formolv1alpha1.Running + if err := r.Client.Status().Update(ctx, backupSession); err != nil { + log.Error(err, "unable to update status", "backupsession", backupSession) + return ctrl.Result{}, err + } + c := make(chan []byte) + + go func(){ + for msg := range c { + var dat map[string]interface{} + if err := json.Unmarshal(msg, &dat); err != nil { + log.Error(err, "unable to unmarshal json", "msg", msg) + continue + } + log.V(1).Info("message on stdout", "stdout", dat) + if message_type, ok := dat["message_type"]; ok && message_type == "summary"{ + backupSession.Status.SnapshotId = dat["snapshot_id"].(string) + backupSession.Status.Duration = &metav1.Duration{Duration : time.Duration(dat["total_duration"].(float64) * 1000) * time.Millisecond} + } + } + }() + result := formolv1alpha1.Failure + defer func() { + close(c) + backupSession.Status.BackupSessionState = result + if err := r.Status().Update(ctx, backupSession); err != nil { + log.Error(err, "unable to update status") + } + }() switch backupConf.Spec.Target.Kind { case "Deployment": - if err := backup.BackupDeployment("", backupConf.Spec.Paths); err != nil { + backupSession.Status.StartTime = &metav1.Time {Time: time.Now()} + if err := backup.BackupDeployment("", backupConf.Spec.Paths, c); err != nil { log.Error(err, "unable to backup deployment") - return ctrl.Result{}, nil + return ctrl.Result{}, err } - default: - return ctrl.Result{}, nil + result = formolv1alpha1.Success } return ctrl.Result{}, nil } diff --git a/src/create/root.go b/src/create/root.go index 48c4a25..f98e00c 100644 --- a/src/create/root.go +++ b/src/create/root.go @@ -1,7 +1,6 @@ package create import ( - "fmt" "strings" "time" "context" @@ -14,22 +13,43 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + ctrl "sigs.k8s.io/controller-runtime" formolv1alpha1 "github.com/desmo999r/formol/api/v1alpha1" ) func CreateBackupSession(name string, namespace string) { - fmt.Println("CreateBackupSession called") + log := zap.New(zap.UseDevMode(true)).WithName("CreateBackupSession") + ctrl.SetLogger(log) + log.V(0).Info("CreateBackupSession called") config, err := rest.InClusterConfig() if err != nil { config, err = clientcmd.BuildConfigFromFlags("", filepath.Join(os.Getenv("HOME"), ".kube", "config",)) if err != nil { - panic(err.Error()) + log.Error(err, "unable to get config") + os.Exit(1) } } scheme := runtime.NewScheme() _ = formolv1alpha1.AddToScheme(scheme) _ = clientgoscheme.AddToScheme(scheme) cl, err := client.New(config, client.Options{Scheme: scheme}) + if err != nil { + log.Error(err, "unable to get client") + os.Exit(1) + } + + backupConfList := &formolv1alpha1.BackupConfigurationList{} + if err := cl.List(context.TODO(), backupConfList, client.InNamespace(namespace)); err != nil { + log.Error(err, "unable to get backupconf") + os.Exit(1) + } + backupConf := &formolv1alpha1.BackupConfiguration{} + for _, bc := range backupConfList.Items { + if bc.Name == name { + *backupConf = bc + } + } backupSession := &formolv1alpha1.BackupSession{ ObjectMeta: metav1.ObjectMeta{ @@ -43,10 +63,12 @@ func CreateBackupSession(name string, namespace string) { }, Status: formolv1alpha1.BackupSessionStatus{}, } - if err != nil { - panic(err.Error()) + if err := ctrl.SetControllerReference(backupConf, backupSession, scheme); err != nil { + log.Error(err, "unable to set controller reference") + os.Exit(1) } if err := cl.Create(context.TODO(), backupSession); err != nil { - panic(err.Error()) + log.Error(err, "unable to create backupsession") + os.Exit(1) } }