Commit 7dc73f47 authored by David's avatar David

It makes a single run successfully

parent 90402347
......@@ -28,6 +28,7 @@ lazy val backupCoordinator = (project in file("."))
libraryDependencies ++= Seq(
akka, akkaStreams, akkaSlf4j, akkaTestkit, scalaTest, scalacticScalaTest, sshj, bouncycastle, jzlib,
) ++ log4j
) ++ log4j,
mainClass := Some("is.kow.backupcoordinator.Main")
)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.7")
\ No newline at end of file
......@@ -2,4 +2,10 @@ akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}
backup {
username = "root"
keyPath = "/dev/null"
keyPath = ${?BACKUP_KEY_PATH}
}
\ No newline at end of file
......@@ -2,6 +2,8 @@ package is.kow.backupcoordinator
import java.security.Security
import akka.actor.ActorSystem
import is.kow.backupcoordinator.actors.HostBackupActor
import org.bouncycastle.jce.provider.BouncyCastleProvider
object Main extends App {
......@@ -10,4 +12,27 @@ object Main extends App {
println("HELLO WORLD")
val system = ActorSystem("HostBackupSystem")
//Create a host to backup
val hostBackups = List(
HostBackup("malek.dark.kow.is",
"homedirs",
List("/home/rkowis", "/home/dkowis", "/home/git", "/home/znc")
),
//TODO: this one is kinda big, we should do something smaller.
HostBackup("kain.dark.kow.is",
"kainrepo",
List("/srv/budget", "/srv/pictures", "/srv/software", "/srv/music"))
)
val smallerHostBackups = List(
HostBackup("scm.dark.kow.is",
"gitrepos",
List("/home/gitbucket/.gitbucket"))
)
val hostBackupActor = system.actorOf(HostBackupActor.props(smallerHostBackups.head))
}
......@@ -8,7 +8,7 @@ import is.kow.backupcoordinator.actors.SSHConnectionActor.Connect
import net.schmizz.sshj.SSHClient
object HostBackupActor {
def props(hostBackup: HostBackup, statusActor: ActorRef): Props = Props(new HostBackupActor(hostBackup, statusActor))
def props(hostBackup: HostBackup): Props = Props(new HostBackupActor(hostBackup))
case object EstablishSSHSession
......@@ -16,7 +16,7 @@ object HostBackupActor {
}
class HostBackupActor(hostBackup: HostBackup, statusActor: ActorRef) extends Actor with ActorLogging {
class HostBackupActor(hostBackup: HostBackup, statusActor: Option[ActorRef] = None) extends Actor with ActorLogging {
import scala.concurrent.duration._
......@@ -33,7 +33,6 @@ class HostBackupActor(hostBackup: HostBackup, statusActor: ActorRef) extends Act
6. close SSH connection
*/
override def preStart(): Unit = {
//send myself a message to start up the SSH command
self ! EstablishSSHSession
......@@ -44,42 +43,50 @@ class HostBackupActor(hostBackup: HostBackup, statusActor: ActorRef) extends Act
//Start a tiny actor to do the actual establishment, so it can fail, and we can retry
val connectionActor = context.actorOf(SSHConnectionActor.props)
connectionActor ! Connect(hostBackup.hostname) //Assuming default port
context.setReceiveTimeout(3.seconds)
context.setReceiveTimeout(5.seconds)
context.become(awaitingConnection)
}
def awaitingConnection: Receive = {
case ReceiveTimeout =>
log.error("Did not receive connection in time! OH NOES")
throw new Exception("Did not receive connection in time!")
throw new Exception("Did not receive connection in time! This causes a restart")
case client: SSHClient =>
context.setReceiveTimeout(Duration.Undefined)
log.info("SSH Connection established")
//Have an ssh connection, time to send commands like make backup directory
val mkdirActor = context.actorOf(SSHCommandActor.props(client))
context.watch(mkdirActor)
mkdirActor ! ExecuteCommand("mkdir /mnt/auto-backup")
mkdirActor ! ExecuteCommand("mkdir -p /mnt/auto-backup")
context.become(awaitingMkdirComplete(client, mkdirActor))
context.setReceiveTimeout(1.second)
context.setReceiveTimeout(5.second)
}
def awaitingMkdirComplete(client: SSHClient, mkdirActor: ActorRef): Receive = {
case ReceiveTimeout =>
log.error("Unable to create directory in time! OH NOES")
throw new Exception("Did not create directory in time!")
//TODO: need to clean up!
case t@Terminated(`client`) =>
log.error(s"there was an exception and the actor died")
//TODO: need to clean up behind myself
case ExecutionCompleted(cmd, output, status) =>
case ExecutionCompleted(cmd, stdOut, stdErr, status) =>
context.setReceiveTimeout(Duration.Undefined)
log.info(s"Command Processed ($status): $cmd -- $output")
context.unwatch(mkdirActor)
log.info(
s"""
| MKDIR COMPLETE: Command Processed ($status): $cmd
| stdOut: $stdOut
| stdErr: $stdErr
""".stripMargin)
//Can move onto step three
val mountActor = context.actorOf(SSHCommandActor.props(client))
context.watch(mountActor)
mountActor ! ExecuteCommand("mount 10.10.220.92:/mnt/OldTank/backupTesting /mnt/auto-backup")
context.become(awaitingMountComplete(client, mountActor))
context.setReceiveTimeout(3.seconds)
context.setReceiveTimeout(5.seconds)
case ReceiveTimeout =>
log.error("Unable to create directory in time! OH NOES")
throw new Exception("Did not create directory in time!")
//TODO: need to clean up!
case t@Terminated(`mkdirActor`) =>
log.error(s"there was an exception and the actor died")
//TODO: need to clean up behind myself
}
def awaitingMountComplete(client: SSHClient, mountActor: ActorRef): Receive = {
......@@ -87,12 +94,22 @@ class HostBackupActor(hostBackup: HostBackup, statusActor: ActorRef) extends Act
log.error("Unable to mount nfs in time! OH NOES")
throw new Exception("Did mount nfs in time!")
//TODO: need to clean up behind me
case t@Terminated(`client`) =>
case t@Terminated(`mountActor`) =>
log.error(s"there was an exception and mounting command died")
case ExecutionCompleted(cmd, output, status) =>
case ExecutionCompleted(cmd, stdOut, stdErr, status) =>
context.unwatch(mountActor)
context.setReceiveTimeout(Duration.Undefined)
//Completed the call to mount the nfs directory
log.info(s"Command Processed ($status): $cmd -- $output")
if (status != 0) {
log.error("Mounting directory failed!!!!")
} else {
log.info(
s"""
| Mount Complete: Command Processed ($status): $cmd
| stdOut: $stdOut
| stdErr: $stdErr
""".stripMargin.trim)
}
//Step four!
doBackup(client, hostBackup.preBackupCommand, hostBackup.backupPaths, hostBackup.postBackupCommand)
......@@ -102,9 +119,9 @@ class HostBackupActor(hostBackup: HostBackup, statusActor: ActorRef) extends Act
case t@Terminated(`cmdActor`) =>
log.error("Pre-backup failed!")
//TODO: need to clean up hard
case ExecutionCompleted(cmd, output, status) =>
case ExecutionCompleted(cmd, stdOut, stdErr, status) =>
//Completed the call to do pre-backup commands
log.info(s"Command Processed ($status): $cmd -- $output")
log.info(s"Pre-Backup command complete! Command Processed ($status): $cmd -- $stdOut | $stdErr")
//Can now execute the actual backup command
doBackup(client, None, backupPaths, postBackup)
}
......@@ -113,9 +130,10 @@ class HostBackupActor(hostBackup: HostBackup, statusActor: ActorRef) extends Act
case t@Terminated(`cmdActor`) =>
log.error("Directory backup failed!")
//TODO: need to clean up hard
case ExecutionCompleted(cmd, output, status) =>
case ExecutionCompleted(cmd, stdOut, stdErr, status) =>
//A backup directory is completed!
log.info(s"Completed a directory backup!")
log.info(s"Directory Backup ${backupPaths.head}: Completed")
log.info(s"BACKUP RESULT: \n$stdOut")
doBackup(client, None, backupPaths.tail, postBackup)
}
......@@ -123,7 +141,7 @@ class HostBackupActor(hostBackup: HostBackup, statusActor: ActorRef) extends Act
case t@Terminated(`cmdActor`) =>
log.error("unmount failed!")
//TODO: need to clean up hard
case ExecutionCompleted(cmd, output, status) =>
case ExecutionCompleted(cmd, stdOut, stdErr, status) =>
log.info("ALL DONE, closing down")
//Done with this, now we can close our ssh connection
client.close() //Boom!
......@@ -137,7 +155,8 @@ class HostBackupActor(hostBackup: HostBackup, statusActor: ActorRef) extends Act
preBackupCmd ! ExecuteCommand(preBackup.get)
//TODO: what's a reasonable time for stopping? Should have some kind of tick to indicate processing
context.become(awaitingPreBackup(client, preBackupCmd, backupPaths, postBackup))
} else if (backupPaths.nonEmpty) {
}
else if (backupPaths.nonEmpty) {
//TODO: this assumes already initialized -- Need to add an initializer actor to handle that for me
//TODO: can check initialization with `borg info`
//have to build the backup command and issue one for each backup path
......@@ -146,12 +165,12 @@ class HostBackupActor(hostBackup: HostBackup, statusActor: ActorRef) extends Act
val backupName = path.split("/").last
val backupCmd =
s"""
| borg create \
| --info \
| --stats \
| --one-file-system \
| --compression zlib,7 \
| ${repoPath}::${backupName}-{now:%Y-%m-%d_%H:%M:%S} \
| borg create \\
| --info \\
| --stats \\
| --one-file-system \\
| --compression zlib,7 \\
| ${repoPath}::${backupName}-{now:%Y-%m-%d_%H:%M:%S} \\
| ${path}
|
""".stripMargin
......@@ -161,7 +180,9 @@ class HostBackupActor(hostBackup: HostBackup, statusActor: ActorRef) extends Act
backupDirectoryActor ! ExecuteCommand(backupCmd)
context.become(awaitingBackupDirectory(client, backupDirectoryActor, backupPaths, postBackup))
} else if (postBackup.nonEmpty) {
}
else if (postBackup.nonEmpty) {
log.error("TODO Post backup stuff isn't actually ever run")
//Backup paths should be empty at this stage
doBackup(client, None, backupPaths, None)
......@@ -173,6 +194,7 @@ class HostBackupActor(hostBackup: HostBackup, statusActor: ActorRef) extends Act
context.become(awaitingUnmount(client, actor))
}
}
}
......@@ -2,7 +2,7 @@ package is.kow.backupcoordinator.actors
import java.util.concurrent.{Executors, TimeUnit}
import akka.actor.{Actor, ActorLogging, Props}
import akka.actor.{Actor, ActorLogging, PoisonPill, Props}
import is.kow.backupcoordinator.actors.SSHCommandActor.{ExecuteCommand, ExecutionCompleted}
import net.schmizz.sshj.SSHClient
import net.schmizz.sshj.connection.channel.direct.Session
......@@ -19,7 +19,7 @@ object SSHCommandActor {
case class ExecuteCommand(command: String) extends SSHCommandActorMessages
case class ExecutionCompleted(command: ExecuteCommand, output: String, exitValue: Integer) extends SSHCommandActorMessages
case class ExecutionCompleted(command: ExecuteCommand, stdOut: String, stdErr: String, exitValue: Integer) extends SSHCommandActorMessages
}
......@@ -39,25 +39,30 @@ class SSHCommandActor(ssh: SSHClient) extends Actor with ActorLogging{
override def receive: Receive = {
case e: ExecuteCommand =>
//This is really the only thing we're going to work on
log.debug(s"Starting to execute command ${e.command}")
log.debug(s"Starting to execute command || ${e.command} ||")
val command = session.exec(e.command)
val replyTo = sender()
//This guy is fully synchronous and blocking so be sure to stick it into another thread
Future {
//Collect all the output
val output = Source.fromInputStream(command.getInputStream).mkString
val stdOut = Source.fromInputStream(command.getInputStream).mkString
val stdErr = Source.fromInputStream(command.getErrorStream).mkString
command.join(5, TimeUnit.SECONDS)
val exitStatus = command.getExitStatus
//join on it, and then also get the exit status
ExecutionCompleted(e, output, exitStatus)
log.debug("Command Execution completed!") //Has to be last thing!
ExecutionCompleted(e, stdOut, stdErr, exitStatus)
}.onComplete {
case Success(ec) =>
//Send the completed message back to the sender and stop myself
log.debug(s"Sending $ec to $replyTo")
replyTo ! ec
context.stop(self)
log.debug(s"STOPPING MYSELF: $self")
context.stop(self) //this sends a message, should work fine
case Failure(throwable) =>
log.error("COMMAND EXECUTION FAILED!!!!")
//I think I just need to die, and throw the exception, which should allow my supervisor
// to do something about it
throw throwable
......
......@@ -3,6 +3,7 @@ package is.kow.backupcoordinator.actors
import java.security.{PrivateKey, PublicKey}
import akka.actor.{Actor, ActorLogging, Props}
import com.typesafe.config.ConfigFactory
import is.kow.backupcoordinator.actors.SSHConnectionActor.Connect
import net.schmizz.sshj.SSHClient
import net.schmizz.sshj.common.KeyType
......@@ -18,6 +19,8 @@ object SSHConnectionActor {
class SSHConnectionActor extends Actor with ActorLogging {
val config = ConfigFactory.load()
//TODO: implement key loading from config
class DarkKeyProvider extends KeyProvider {
override def getPrivate: PrivateKey = {
......@@ -48,12 +51,33 @@ class SSHConnectionActor extends Actor with ActorLogging {
val keyProvider = new DarkKeyProvider()
val client = new SSHClient()
client.authPublickey("root", keyProvider)
client.addHostKeyVerifier(new DarkHostKeyVerifier)
client.connect(hostname)
val sshClient = new SSHClient()
val username = config.getString("backup.username")
val keyPath = config.getString("backup.keyPath")
try {
log.info(s"Establishing SSH Connection: username: $username -- $keyPath")
log.info("HostKeyVerifier")
sshClient.addHostKeyVerifier(new DarkHostKeyVerifier)
log.debug(s"RIGHT BEFORE CONNECT TO $hostname")
sshClient.connect(hostname)
log.info("Authenticating with public key")
sshClient.authPublickey(username, keyPath)
//client.authPassword(username, password)
log.info("Using compression")
sshClient.useCompression()
context.parent ! sshClient
log.info("SSH Connection established, I'm finished")
context.parent ! client
} catch {
case e: Exception =>
log.error(e, "OH NOES, something went horribly wrong during getting the connection: {}")
}
//I'm all done, this is my only purpose
context.stop(self)
}
......
......@@ -34,7 +34,7 @@ object SSHActorExperiment extends App with EUtils {
val result = Await.result(resultFuture, 10.seconds)
println(s"COMMAND: ${cmd}\nOutput:\n${result.output}\nExitStatus: ${result.exitValue}\n\n")
println(s"COMMAND: ${cmd}\nOutput:\n${result.stdOut}\nExitStatus: ${result.exitValue}\n\n")
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment