Returns stdout to the controller to update the status

This commit is contained in:
Jean-Marc ANDRE 2020-12-06 11:29:06 +01:00
parent aa640afff2
commit a531a997bb
4 changed files with 152 additions and 32 deletions

View File

@ -21,6 +21,9 @@ rules:
- apiGroups: ["formol.desmojim.fr"]
resources: ["backupsessions", "backupconfigurations"]
verbs: ["get", "list", "watch"]
- apiGroups: ["formol.desmojim.fr"]
resources: ["backupsessions/status"]
verbs: ["update"]
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole

View File

@ -2,9 +2,12 @@ package backup
import (
"strings"
"bufio"
"os"
"os/exec"
"log"
"go.uber.org/zap"
"github.com/go-logr/logr"
"github.com/go-logr/zapr"
)
var (
@ -13,31 +16,47 @@ var (
aws_access_key_id string
aws_secret_access_key string
resticExec = "/usr/local/bin/restic"
logger logr.Logger
)
func init() {
if repository = os.Getenv("RESTIC_REPOSITORY"); repository == "" {
log.Fatal("RESTIC_REPOSITORY not set")
}
if passwordFile = os.Getenv("RESTIC_PASSWORD"); passwordFile == "" {
log.Fatal("RESTIC_PASSWORD not set")
}
if aws_access_key_id = os.Getenv("AWS_ACCESS_KEY_ID"); aws_access_key_id == "" {
log.Fatal("AWS_ACCESS_KEY_ID not set")
}
if aws_secret_access_key = os.Getenv("AWS_SECRET_ACCESS_KEY"); aws_secret_access_key == "" {
log.Fatal("AWS_SECRET_ACCESS_KEY not set")
}
zapLog, _ := zap.NewDevelopment()
logger = zapr.NewLogger(zapLog)
repository = os.Getenv("RESTIC_REPOSITORY")
passwordFile = os.Getenv("RESTIC_PASSWORD")
aws_access_key_id = os.Getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.Getenv("AWS_SECRET_ACCESS_KEY")
}
func checkRepo(repo string) error {
log := logger.WithName("backup-checkrepo")
cmd := exec.Command(resticExec, "check", "-r", repo)
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout
err := cmd.Run()
stderr, err := cmd.StderrPipe()
if err != nil {
log.Error(err, "unable to pipe stderr")
return err
}
if err := cmd.Start(); err != nil {
log.Error(err, "cannot start repo check")
return err
}
if err := cmd.Wait(); err != nil {
log.V(0).Info("initializing new repo", "repo", repo)
cmd = exec.Command(resticExec, "init", "-r", repo)
err = cmd.Run()
if err := cmd.Start(); err != nil {
log.Error(err, "cannot start repo init")
return err
}
go func(){
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
log.V(0).Info("and error happened", "stderr", scanner.Text())
}
}()
if err := cmd.Wait(); err != nil {
log.Error(err, "something went wrong during repo init")
return err
}
}
return err
}
@ -46,16 +65,47 @@ func BackupVolume(path string) error {
return nil
}
func BackupDeployment(prefix string, paths []string) error {
func BackupDeployment(prefix string, paths []string, c chan []byte) (error) {
log := logger.WithName("backup-deployment")
newrepo := repository
if prefix != "" {
newrepo = repository + "/" + prefix
}
if err := checkRepo(newrepo); err != nil {
log.Fatal("unable to setup newrepo", "newrepo", newrepo)
log.Error(err, "unable to setup newrepo", "newrepo", newrepo)
return err
}
cmd := exec.Command(resticExec, "backup", "--json", "-r", newrepo, strings.Join(paths, " "))
stderr, err := cmd.StderrPipe()
if err != nil {
log.Error(err, "unable to pipe stderr")
return err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Error(err, "unable to pipe stdout")
return err
}
if err := cmd.Start(); err != nil {
log.Error(err, "cannot start backup")
return err
}
go func(){
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
log.V(0).Info("and error happened", "stderr", scanner.Text())
}
}()
go func(c chan []byte){
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
c <- scanner.Bytes()
}
}(c)
if err := cmd.Wait(); err != nil {
log.Error(err, "something went wrong during the backup")
return err
}
cmd := exec.Command(resticExec, "backup", "-r", newrepo, strings.Join(paths, " "))
return cmd.Run()
return nil
}

View File

@ -1,11 +1,14 @@
package controllers
import (
"time"
"encoding/json"
"context"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"os"
"path/filepath"
@ -22,14 +25,17 @@ var (
)
func init() {
log := zap.New(zap.UseDevMode(true)).WithName("init")
namespace := os.Getenv("POD_NAMESPACE")
if namespace == "" {
panic("No POD_NAMESPACE env var")
return
}
config, err := rest.InClusterConfig()
if err != nil {
config, err = clientcmd.BuildConfigFromFlags("", filepath.Join(os.Getenv("HOME"), ".kube", "config",))
if err != nil {
log.Error(err, "unable to get config")
panic(err.Error())
}
}
@ -45,6 +51,7 @@ func init() {
pod, err := clientset.CoreV1().Pods(namespace).Get(hostname, metav1.GetOptions{})
if err != nil {
log.Error(err, "unable to get pod")
panic("unable to get pod")
}
@ -81,6 +88,10 @@ func (r *BackupSessionReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
log.V(1).Info("backupSession.Namespace", "namespace", backupSession.Namespace)
log.V(1).Info("backupSession.Spec.Ref.Name", "name", backupSession.Spec.Ref.Name)
if backupSession.Status.BackupSessionState != "" {
log.V(0).Info("State is not null. Skipping", "state", backupSession.Status.BackupSessionState)
return ctrl.Result{}, nil
}
backupConf := &formolv1alpha1.BackupConfiguration{}
if err := r.Get(ctx, client.ObjectKey{
Namespace: backupSession.Namespace,
@ -93,14 +104,48 @@ func (r *BackupSessionReconciler) Reconcile(req ctrl.Request) (ctrl.Result, erro
log.V(1).Info("Found BackupConfiguration", "BackupConfiguration", backupConf)
// Found the BackupConfiguration.
if backupConf.Spec.Target.Name != deploymentName {
log.V(0).Info("Not for us", "target", backupConf.Spec.Target.Name, "us", deploymentName)
return ctrl.Result{}, nil
}
log.V(0).Info("before", "backupsession", backupSession)
backupSession.Status.BackupSessionState = formolv1alpha1.Running
if err := r.Client.Status().Update(ctx, backupSession); err != nil {
log.Error(err, "unable to update status", "backupsession", backupSession)
return ctrl.Result{}, err
}
c := make(chan []byte)
go func(){
for msg := range c {
var dat map[string]interface{}
if err := json.Unmarshal(msg, &dat); err != nil {
log.Error(err, "unable to unmarshal json", "msg", msg)
continue
}
log.V(1).Info("message on stdout", "stdout", dat)
if message_type, ok := dat["message_type"]; ok && message_type == "summary"{
backupSession.Status.SnapshotId = dat["snapshot_id"].(string)
backupSession.Status.Duration = &metav1.Duration{Duration : time.Duration(dat["total_duration"].(float64) * 1000) * time.Millisecond}
}
}
}()
result := formolv1alpha1.Failure
defer func() {
close(c)
backupSession.Status.BackupSessionState = result
if err := r.Status().Update(ctx, backupSession); err != nil {
log.Error(err, "unable to update status")
}
}()
switch backupConf.Spec.Target.Kind {
case "Deployment":
if err := backup.BackupDeployment("", backupConf.Spec.Paths); err != nil {
backupSession.Status.StartTime = &metav1.Time {Time: time.Now()}
if err := backup.BackupDeployment("", backupConf.Spec.Paths, c); err != nil {
log.Error(err, "unable to backup deployment")
return ctrl.Result{}, nil
return ctrl.Result{}, err
}
default:
return ctrl.Result{}, nil
result = formolv1alpha1.Success
}
return ctrl.Result{}, nil
}

View File

@ -1,7 +1,6 @@
package create
import (
"fmt"
"strings"
"time"
"context"
@ -14,22 +13,43 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
ctrl "sigs.k8s.io/controller-runtime"
formolv1alpha1 "github.com/desmo999r/formol/api/v1alpha1"
)
func CreateBackupSession(name string, namespace string) {
fmt.Println("CreateBackupSession called")
log := zap.New(zap.UseDevMode(true)).WithName("CreateBackupSession")
ctrl.SetLogger(log)
log.V(0).Info("CreateBackupSession called")
config, err := rest.InClusterConfig()
if err != nil {
config, err = clientcmd.BuildConfigFromFlags("", filepath.Join(os.Getenv("HOME"), ".kube", "config",))
if err != nil {
panic(err.Error())
log.Error(err, "unable to get config")
os.Exit(1)
}
}
scheme := runtime.NewScheme()
_ = formolv1alpha1.AddToScheme(scheme)
_ = clientgoscheme.AddToScheme(scheme)
cl, err := client.New(config, client.Options{Scheme: scheme})
if err != nil {
log.Error(err, "unable to get client")
os.Exit(1)
}
backupConfList := &formolv1alpha1.BackupConfigurationList{}
if err := cl.List(context.TODO(), backupConfList, client.InNamespace(namespace)); err != nil {
log.Error(err, "unable to get backupconf")
os.Exit(1)
}
backupConf := &formolv1alpha1.BackupConfiguration{}
for _, bc := range backupConfList.Items {
if bc.Name == name {
*backupConf = bc
}
}
backupSession := &formolv1alpha1.BackupSession{
ObjectMeta: metav1.ObjectMeta{
@ -43,10 +63,12 @@ func CreateBackupSession(name string, namespace string) {
},
Status: formolv1alpha1.BackupSessionStatus{},
}
if err != nil {
panic(err.Error())
if err := ctrl.SetControllerReference(backupConf, backupSession, scheme); err != nil {
log.Error(err, "unable to set controller reference")
os.Exit(1)
}
if err := cl.Create(context.TODO(), backupSession); err != nil {
panic(err.Error())
log.Error(err, "unable to create backupsession")
os.Exit(1)
}
}