backup snapshot finalized
parent 1f2baef062
commit fac6d9b620

cmd/root.go (18 lines changed)
@@ -26,6 +26,17 @@ var createBackupSessionCmd = &cobra.Command{
    },
}

+var backupCmd = &cobra.Command{
+    Use:   "backup",
+    Short: "Backup paths",
+    Run: func(cmd *cobra.Command, args []string) {
+        backupSessionName, _ := cmd.Flags().GetString("name")
+        backupSessionNamespace, _ := cmd.Flags().GetString("namespace")
+        targetName, _ := cmd.Flags().GetString("target-name")
+        standalone.BackupPaths(backupSessionName, backupSessionNamespace, targetName, args...)
+    },
+}
+
var startRestoreSessionCmd = &cobra.Command{
    Use:   "start",
    Short: "Restore a restic snapshot",
@@ -101,6 +112,7 @@ func init() {
    rootCmd.AddCommand(restoreSessionCmd)
    rootCmd.AddCommand(snapshotCmd)
    backupSessionCmd.AddCommand(createBackupSessionCmd)
+    backupSessionCmd.AddCommand(backupCmd)
    restoreSessionCmd.AddCommand(startRestoreSessionCmd)
    snapshotCmd.AddCommand(deleteSnapshotCmd)
    rootCmd.AddCommand(startServerCmd)
@@ -108,6 +120,12 @@ func init() {
    createBackupSessionCmd.Flags().String("name", "", "The name of the BackupConfiguration containing the information about the backup.")
    createBackupSessionCmd.MarkFlagRequired("namespace")
    createBackupSessionCmd.MarkFlagRequired("name")
+    backupCmd.Flags().String("target-name", "", "The name of target being restored")
+    backupCmd.Flags().String("namespace", "", "The namespace of the BackupConfiguration containing the information about the backup.")
+    backupCmd.Flags().String("name", "", "The name of the BackupConfiguration containing the information about the backup.")
+    backupCmd.MarkFlagRequired("namespace")
+    backupCmd.MarkFlagRequired("name")
+    backupCmd.MarkFlagRequired("target-name")
    startRestoreSessionCmd.Flags().String("namespace", "", "The namespace of RestoreSession")
    startRestoreSessionCmd.Flags().String("name", "", "The name of RestoreSession")
    startRestoreSessionCmd.Flags().String("target-name", "", "The name of target being restored")

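Note: judging from the sidecar arguments assembled in backupSnapshot() further down in this commit, the new backup subcommand is meant to be run inside the backup Job, roughly as follows (the binary name formolcli is assumed here):

    formolcli backupsession backup --namespace <namespace> --name <backupsession> --target-name <target> <path> [<path>...]

The positional arguments are passed straight through to standalone.BackupPaths as the list of paths to back up. The hunks that follow belong to the BackupSession controller and the shared Session code rather than to cmd/root.go.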
@@ -16,6 +16,8 @@ import (

type BackupSessionReconciler struct {
    Session
+    backupSession formolv1alpha1.BackupSession
+    backupConf    formolv1alpha1.BackupConfiguration
}

func (r *BackupSessionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -32,6 +34,7 @@ func (r *BackupSessionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        }
        return ctrl.Result{}, err
    }
+    r.backupSession = backupSession
    if len(backupSession.Status.Targets) == 0 {
        // The main BackupSession controller hasn't assigned a backup task yet
        // Wait a bit
@@ -49,6 +52,7 @@ func (r *BackupSessionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        }
        return ctrl.Result{}, err
    }
+    r.backupConf = backupConf

    // we don't want a copy because we will modify and update it.
    var target formolv1alpha1.Target
@@ -107,7 +111,7 @@ func (r *BackupSessionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
            }
        case formolv1alpha1.OnlineKind:
            backupPaths := strings.Split(os.Getenv(formolv1alpha1.BACKUP_PATHS), string(os.PathListSeparator))
-            if backupResult, result := r.backupPaths(backupPaths); result != nil {
+            if backupResult, result := r.BackupPaths(backupPaths); result != nil {
                r.Log.Error(result, "unable to backup paths", "target name", targetName, "paths", backupPaths)
                newSessionState = formolv1alpha1.Failure
            } else {
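In the OnlineKind branch the paths to back up come from the BACKUP_PATHS environment variable, split on os.PathListSeparator (':' on Linux), so the variable is expected to hold something like /data:/var/lib/app (illustrative value). The same separator is used further down when backupSnapshot() joins the snapshot mount paths into the Job sidecar's environment.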
@@ -118,15 +122,14 @@ func (r *BackupSessionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
            }
        case formolv1alpha1.SnapshotKind:
            if err := r.backupSnapshot(target); err != nil {
-                switch err.(type) {
-                case *NotReadyToUseError:
+                if IsNotReadyToUse(err) {
                    r.Log.V(0).Info("Volume snapshots are not ready. Requeueing")
                    return ctrl.Result{
                        Requeue: true,
                    }, nil
-                default:
+                } else {
                    r.Log.Error(err, "unable to do snapshot backup")
                    // TODO: cleanup existing snapshots
                    return ctrl.Result{}, err
                }
            }
        }
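Neither NotReadyToUseError nor IsNotReadyToUse is defined in this diff. A minimal sketch that is consistent with how they are used here and in backupSnapshot() below (the real definitions may differ) would be:

    // Hypothetical sketch, not part of this commit; needs the standard "errors" package.
    type NotReadyToUseError struct{}

    func (e *NotReadyToUseError) Error() string {
        return "volume snapshot is not ready to use yet"
    }

    func IsNotReadyToUse(err error) bool {
        var notReady *NotReadyToUseError
        return errors.As(err, &notReady)
    }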
@@ -138,10 +141,15 @@ func (r *BackupSessionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        if result = r.runFinalizeSteps(target); result != nil {
            r.Log.Error(err, "unable to run finalize steps")
        }
-        if targetStatus.SnapshotId == "" {
-            newSessionState = formolv1alpha1.Failure
+        if target.BackupType == formolv1alpha1.SnapshotKind {
+            // SnapshotKind special state where we wait for the backup Job to finish
+            newSessionState = formolv1alpha1.WaitingForJob
        } else {
-            newSessionState = formolv1alpha1.Success
+            if targetStatus.SnapshotId == "" {
+                newSessionState = formolv1alpha1.Failure
+            } else {
+                newSessionState = formolv1alpha1.Success
+            }
        }
    case formolv1alpha1.Success:
        // Target backup is a success
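For SnapshotKind targets the session is now parked in WaitingForJob instead of being marked Success or Failure immediately: the actual restic run happens in the Job created by backupSnapshot(), and it is the standalone BackupPaths() in the last hunk of this commit that flips the target to Success, records the SnapshotId and Duration, and cleans up the temporary VolumeSnapshots and PVCs.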
@@ -1,56 +1,20 @@
package controllers

import (
-    "bufio"
-    "encoding/json"
    formolv1alpha1 "github.com/desmo999r/formol/api/v1alpha1"
    volumesnapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
-    "io"
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "os"
-    "os/exec"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "strings"
)

-type BackupResult struct {
-    SnapshotId string
-    Duration   float64
-}
-
-func (r *BackupSessionReconciler) backupPaths(paths []string) (result BackupResult, err error) {
-    if err = r.CheckRepo(); err != nil {
-        r.Log.Error(err, "unable to setup repo", "repo", os.Getenv(formolv1alpha1.RESTIC_REPOSITORY))
-        return
-    }
-    r.Log.V(0).Info("backing up paths", "paths", paths)
-    cmd := exec.Command(RESTIC_EXEC, append([]string{"backup", "--json", "--tag", r.Name}, paths...)...)
-    stdout, _ := cmd.StdoutPipe()
-    stderr, _ := cmd.StderrPipe()
-    _ = cmd.Start()
-
-    scanner := bufio.NewScanner(io.MultiReader(stdout, stderr))
-    scanner.Split(bufio.ScanLines)
-    var data map[string]interface{}
-    for scanner.Scan() {
-        if err := json.Unmarshal(scanner.Bytes(), &data); err != nil {
-            r.Log.Error(err, "unable to unmarshal json", "data", scanner.Text())
-            continue
-        }
-        switch data["message_type"].(string) {
-        case "summary":
-            result.SnapshotId = data["snapshot_id"].(string)
-            result.Duration = data["total_duration"].(float64)
-        case "status":
-            r.Log.V(0).Info("backup running", "percent done", data["percent_done"].(float64))
-        }
-    }
-
-    err = cmd.Wait()
-    return
-}
+const (
+    JOBTTL int32 = 7200
+)

func (r *BackupSessionReconciler) backupJob(target formolv1alpha1.Target) (result BackupResult, err error) {
    paths := []string{}
@@ -72,11 +36,11 @@ func (r *BackupSessionReconciler) backupJob(target formolv1alpha1.Target) (result BackupResult, err error) {
            paths = append(paths, container.SharePath)
        }
    }
-    result, err = r.backupPaths(paths)
+    result, err = r.BackupPaths(paths)
    return
}

-func (r *BackupSessionReconciler) backupSnapshot(target formolv1alpha1.Target) error {
+func (r *BackupSessionReconciler) backupSnapshot(target formolv1alpha1.Target) (e error) {
    targetObject, targetPodSpec := formolv1alpha1.GetTargetObjects(target.TargetKind)
    if err := r.Get(r.Context, client.ObjectKey{
        Namespace: r.Namespace,
@@ -93,16 +57,54 @@ func (r *BackupSessionReconciler) backupSnapshot(target formolv1alpha1.Target) error {
            // replace the volumes in the container struct with the snapshot volumes
            // use formolv1alpha1.GetVolumeMounts to get the volume mounts for the Job
            // sidecar := formolv1alpha1.GetSidecar(backupConf, target)
-            _, vms := formolv1alpha1.GetVolumeMounts(container, targetContainer)
+            paths, vms := formolv1alpha1.GetVolumeMounts(container, targetContainer)
            if err := r.snapshotVolumes(vms, targetPodSpec); err != nil {
                if IsNotReadyToUse(err) {
                    r.Log.V(0).Info("Some volumes are still not ready to use")
                    defer func() { e = &NotReadyToUseError{} }()
                } else {
                    r.Log.Error(err, "cannot snapshot the volumes")
                    return err
                }
            } else {
+                r.Log.V(1).Info("Creating a Job to backup the Snapshot volumes")
+                sidecar := formolv1alpha1.GetSidecar(r.backupConf, target)
+                sidecar.Args = append([]string{"backupsession", "backup", "--namespace", r.Namespace, "--name", r.Name, "--target-name", target.TargetName}, paths...)
+                sidecar.VolumeMounts = vms
+                if env, err := r.getResticEnv(r.backupConf); err != nil {
+                    r.Log.Error(err, "unable to get restic env")
+                    return err
+                } else {
+                    sidecar.Env = append(sidecar.Env, env...)
+                }
+                sidecar.Env = append(sidecar.Env, corev1.EnvVar{
+                    Name:  formolv1alpha1.BACKUP_PATHS,
+                    Value: strings.Join(paths, string(os.PathListSeparator)),
+                })
+                job := batchv1.Job{
+                    ObjectMeta: metav1.ObjectMeta{
+                        Namespace: r.Namespace,
+                        Name:      "backupsnapshot-" + r.Name,
+                    },
+                    Spec: batchv1.JobSpec{
+                        TTLSecondsAfterFinished: func() *int32 { ttl := JOBTTL; return &ttl }(),
+                        Template: corev1.PodTemplateSpec{
+                            Spec: corev1.PodSpec{
+                                Volumes: targetPodSpec.Volumes,
+                                Containers: []corev1.Container{
+                                    sidecar,
+                                },
+                                RestartPolicy: corev1.RestartPolicyNever,
+                            },
+                        },
+                    },
+                }
+                if err := r.Create(r.Context, &job); err != nil {
+                    r.Log.Error(err, "unable to create the snapshot volumes backup job", "job", job, "container", sidecar)
+                    return err
+                }
+                r.Log.V(1).Info("snapshot volumes backup job created", "job", job.Name)
+            }

        }
    }
}
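TTLSecondsAfterFinished takes an *int32, which is why JOBTTL is wrapped in an inline closure. A tiny helper does the same job; the sketch below is not part of this commit and assumes Go 1.18+ (the k8s.io/utils/ptr package offers ptr.To for the same purpose):

    // Hypothetical helper, equivalent to the inline closure above.
    func ptrTo[T any](v T) *T { return &v }

    // usage:
    // TTLSecondsAfterFinished: ptrTo(JOBTTL), // Job is garbage-collected 7200s after it finishes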
@@ -166,6 +168,9 @@ func (r *BackupSessionReconciler) snapshotVolume(volume corev1.Volume) (*volumes
        ObjectMeta: metav1.ObjectMeta{
            Namespace: r.Namespace,
            Name:      volumeSnapshotName,
+            Labels: map[string]string{
+                "backupsession": r.Name,
+            },
        },
        Spec: volumesnapshotv1.VolumeSnapshotSpec{
            VolumeSnapshotClassName: &volumeSnapshotClass.Name,
@@ -210,6 +215,7 @@ func (r *BackupSessionReconciler) createVolumeFromSnapshot(vs *volumesnapshotv1.
    // The Volume does not exist. Create it.
    pv := corev1.PersistentVolume{}
    pvName, _ := strings.CutPrefix(vs.Name, strings.Join([]string{"vs", r.Name}, "-"))
    pvName = pvName[1:]
    if err = r.Get(r.Context, client.ObjectKey{
        Name: pvName,
    }, &pv); err != nil {
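The PV name derivation above assumes a VolumeSnapshot naming scheme of the form vs-<backupsession>-<pv-name> (the helper that builds volumeSnapshotName is not part of this diff). As a worked example with illustrative names: for r.Name = "mybackup" and vs.Name = "vs-mybackup-pvc-0f3c", strings.CutPrefix removes "vs-mybackup" leaving "-pvc-0f3c", and pvName[1:] drops the remaining dash, leaving "pvc-0f3c", which is then fetched as a cluster-scoped PersistentVolume.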
@@ -220,11 +226,17 @@ func (r *BackupSessionReconciler) createVolumeFromSnapshot(vs *volumesnapshotv1.
        ObjectMeta: metav1.ObjectMeta{
            Namespace: r.Namespace,
            Name:      backupPVCName,
+            Labels: map[string]string{
+                "backupsession": r.Name,
+            },
        },
        Spec: corev1.PersistentVolumeClaimSpec{
            StorageClassName: &pv.Spec.StorageClassName,
            //AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadOnlyMany},
            AccessModes: pv.Spec.AccessModes,
            Resources: corev1.ResourceRequirements{
                Requests: pv.Spec.Capacity,
            },
            DataSource: &corev1.TypedLocalObjectReference{
                APIGroup: func() *string { s := "snapshot.storage.k8s.io"; return &s }(),
                Kind:     "VolumeSnapshot",
@@ -260,6 +272,7 @@ func (r *BackupSessionReconciler) snapshotVolumes(vms []corev1.VolumeMount, podS
            return
        }
        if vs != nil {
            // The snapshot is ready. We create a PVC from it.
            backupPVCName, err := r.createVolumeFromSnapshot(vs)
            if err != nil {
                r.Log.Error(err, "unable to create volume from snapshot", "vs", vs)

@@ -4,6 +4,7 @@ import (
    "bufio"
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    formolv1alpha1 "github.com/desmo999r/formol/api/v1alpha1"
    "github.com/go-logr/logr"
@@ -23,13 +24,18 @@ import (

type Session struct {
    client.Client
-    Log    logr.Logger
-    Scheme *runtime.Scheme
+    context.Context
+    Log       logr.Logger
+    Scheme    *runtime.Scheme
    Namespace string
    Name      string
}

+type BackupResult struct {
+    SnapshotId string
+    Duration   float64
+}

const (
    RESTIC_EXEC = "/usr/bin/restic"
)
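With client.Client and context.Context both embedded, anything holding a Session can talk to the API server directly. A minimal sketch of the pattern used throughout this commit (the helper name is illustrative; Client, Context, Namespace and Name are filled in by the caller, the controller manager on one side and the standalone CLI's init() on the other):

    // Sketch only, not part of this commit.
    func (s Session) fetchSecret(name string) (corev1.Secret, error) {
        secret := corev1.Secret{}
        err := s.Get(s.Context, client.ObjectKey{Namespace: s.Namespace, Name: name}, &secret)
        return secret, err
    }

Because BackupSessionReconciler embeds Session and the standalone package keeps a package-level Session, BackupPaths, CheckRepo and getSecretData are shared between the in-cluster controller and the backup Job.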
@@ -40,7 +46,7 @@ func (s Session) getResticEnv(backupConf formolv1alpha1.BackupConfiguration) (en
        Namespace: backupConf.Namespace,
        Name:      backupConf.Spec.Repository,
    }, &repo); err != nil {
-        s.Log.Error(err, "unable to get repo")
+        s.Log.Error(err, "unable to get repo", "backupconf", backupConf)
        return
    }
    if repo.Spec.Backend.S3 != nil {
@@ -94,6 +100,38 @@ func (s Session) CheckRepo() error {
    return err
}

+func (s Session) BackupPaths(paths []string) (result BackupResult, err error) {
+    if err = s.CheckRepo(); err != nil {
+        s.Log.Error(err, "unable to setup repo", "repo", os.Getenv(formolv1alpha1.RESTIC_REPOSITORY))
+        return
+    }
+    s.Log.V(0).Info("backing up paths", "paths", paths)
+    cmd := exec.Command(RESTIC_EXEC, append([]string{"backup", "--json", "--tag", s.Name}, paths...)...)
+    stdout, _ := cmd.StdoutPipe()
+    stderr, _ := cmd.StderrPipe()
+    _ = cmd.Start()
+
+    scanner := bufio.NewScanner(io.MultiReader(stdout, stderr))
+    scanner.Split(bufio.ScanLines)
+    var data map[string]interface{}
+    for scanner.Scan() {
+        if err := json.Unmarshal(scanner.Bytes(), &data); err != nil {
+            s.Log.Error(err, "unable to unmarshal json", "data", scanner.Text())
+            continue
+        }
+        switch data["message_type"].(string) {
+        case "summary":
+            result.SnapshotId = data["snapshot_id"].(string)
+            result.Duration = data["total_duration"].(float64)
+        case "status":
+            s.Log.V(0).Info("backup running", "percent done", data["percent_done"].(float64))
+        }
+    }
+
+    err = cmd.Wait()
+    return
+}

func (s Session) getSecretData(name string) map[string][]byte {
    secret := corev1.Secret{}
    if err := s.Get(s.Context, client.ObjectKey{

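BackupPaths shells out to restic backup --json and scans its line-delimited JSON output; the keys read above correspond to restic's status and summary messages, for example (values illustrative, trimmed to the fields actually used):

    {"message_type":"status","percent_done":0.42}
    {"message_type":"summary","snapshot_id":"5c0fe7e6","total_duration":12.7}

Only the snapshot id and total duration from the final summary line end up in BackupResult; any non-JSON line (stderr is merged into the same scanner) is logged and skipped. The remaining hunks bump the formol submodule and extend the standalone package, the code that runs inside the backup Job.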
formol (2 lines changed)
@@ -1 +1 @@
-Subproject commit 8975f77e5858ee167508ef0359c3b9d6cbaba6ee
+Subproject commit ea1c1bd2e31cc6f67621ed71659e738ca5f5d8c8
@@ -4,10 +4,12 @@ import (
    "context"
    formolv1alpha1 "github.com/desmo999r/formol/api/v1alpha1"
    "github.com/desmo999r/formolcli/controllers"
+    volumesnapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
+    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
@@ -44,8 +46,9 @@ func init() {
        }
    }
    session.Scheme = runtime.NewScheme()
-    _ = formolv1alpha1.AddToScheme(session.Scheme)
-    _ = clientgoscheme.AddToScheme(session.Scheme)
+    utilruntime.Must(formolv1alpha1.AddToScheme(session.Scheme))
+    utilruntime.Must(volumesnapshotv1.AddToScheme(session.Scheme))
+    utilruntime.Must(clientgoscheme.AddToScheme(session.Scheme))
    session.Client, err = client.New(config, client.Options{Scheme: session.Scheme})
    if err != nil {
        log.Error(err, "unable to get client")
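Replacing the discarded-error registrations with utilruntime.Must makes the CLI panic at startup if a scheme registration fails instead of continuing with a half-populated scheme, and registering volumesnapshotv1 is what lets the new BackupPaths below List and Delete VolumeSnapshot objects through the same client.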
@@ -53,6 +56,66 @@ func init() {
    }
}

+func BackupPaths(
+    backupSessionName string,
+    backupSessionNamespace string,
+    targetName string,
+    paths ...string) error {
+    log := session.Log.WithName("BackupPaths")
+    backupResult, err := session.BackupPaths(paths)
+    log.V(0).Info("Backup Job is over", "target", targetName, "snapshotID", backupResult.SnapshotId, "duration", backupResult.Duration)
+    if err != nil {
+        log.Error(err, "unable to backup paths", "paths", paths)
+        return err
+    }
+    backupSession := formolv1alpha1.BackupSession{}
+    if err := session.Get(session.Context, client.ObjectKey{
+        Name:      backupSessionName,
+        Namespace: backupSessionNamespace,
+    }, &backupSession); err != nil {
+        log.Error(err, "unable to get backupsession", "name", backupSessionName, "namespace", backupSessionNamespace)
+        return err
+    }
+    for i, target := range backupSession.Status.Targets {
+        if target.TargetName == targetName {
+            backupSession.Status.Targets[i].SessionState = formolv1alpha1.Success
+            backupSession.Status.Targets[i].SnapshotId = backupResult.SnapshotId
+            backupSession.Status.Targets[i].Duration = &metav1.Duration{Duration: time.Now().Sub(backupSession.Status.Targets[i].StartTime.Time)}
+            if err := session.Status().Update(session.Context, &backupSession); err != nil {
+                log.Error(err, "unable to update backupSession status")
+                return err
+            }
+        }
+    }
+    // Now find the PVC, VolumeSnapshots with the right label backupsession
+    // and delete them
+    vss := volumesnapshotv1.VolumeSnapshotList{}
+    if err := session.List(session.Context, &vss, client.InNamespace(backupSessionNamespace), client.MatchingLabels{"backupsession": backupSessionName}); err != nil {
+        log.Error(err, "unable to list the volumesnapshots", "backupsession", backupSessionName)
+        return err
+    }
+    for _, vs := range vss.Items {
+        if err := session.Delete(session.Context, &vs); err != nil {
+            log.Error(err, "unable to delete volumesnapshot", "vs", vs.Name)
+            return err
+        }
+        log.V(0).Info("volumesnapshot deleted", "vs", vs.Name)
+    }
+    pvcs := corev1.PersistentVolumeClaimList{}
+    if err := session.List(session.Context, &pvcs, client.InNamespace(backupSessionNamespace), client.MatchingLabels{"backupsession": backupSessionName}); err != nil {
+        log.Error(err, "unable to list the PVCs", "backupsession", backupSessionName)
+        return err
+    }
+    for _, pvc := range pvcs.Items {
+        if err := session.Delete(session.Context, &pvc); err != nil {
+            log.Error(err, "unable to delete PVC", "pvc", pvc.Name)
+            return err
+        }
+        log.V(0).Info("PVC deleted", "pvc", pvc.Name)
+    }
+    return nil
+}
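The cleanup at the end relies on the backupsession label that snapshotVolume() and createVolumeFromSnapshot() now attach to every VolumeSnapshot and PVC they create, so only objects belonging to this BackupSession are removed. The reported duration is time.Now().Sub(StartTime), for which time.Since(StartTime) is the equivalent shorthand.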

func StartRestore(
    restoreSessionName string,
    restoreSessionNamespace string,