Circuit: self-managed infrastructure, programmatic monitoring and orchestration

Write the circuit app

In this section we are going to write a Go program (the circuit app) which, when executed, will connect to the circuit cluster and deploy the tutorial's MySQL/Node.js-based web service using two hosts from the cluster.

The source of the finished app is located in:

	$GOPATH/src/github.com/gocircuit/circuit/tutorial/nodejs-using-mysql/start-app/main.go

First strokes

This is going to be a single source-file Go program, which will expect exactly one command-line parameter, -addr, specifying the circuit address to connect into. As a start, the program will look like this:

package main

import (
	"flag"
	"fmt"
	"os"

	"github.com/gocircuit/circuit/client"
)

var flagAddr = flag.String("addr", "", "circuit server address (looks like circuit://...)")

func fatalf(format string, arg ...interface{}) {
	println(fmt.Sprintf(format, arg...))
	os.Exit(1)
}

func main() {
	flag.Parse()
	…
}

Notable here are the import of the circuit client package "github.com/gocircuit/circuit/client" and the definition of the function fatalf(), which we'll use to report fatal errors.

Connecting to the cluster

The next step is to connect into a circuit server (specified by a circuit address) and obtain a client object that will give us access to the circuit API. The following subroutine takes a circuit address as an argument and returns a connected client object as a result:

func connect(addr string) *client.Client {
	defer func() {
		if r := recover(); r != nil {
			fatalf("could not connect: %v", r)
		}
	}()
	return client.Dial(addr, nil)
}

Note that, by convention, the circuit client library reports loss of connection (or failure to establish a connection) via panics, since such conditions can arise anywhere in its methods. These panics are normal error conditions and can be recovered from. In our case, we prefer to terminate the program with an error message.
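As a standalone illustration of this convention (independent of the circuit library), a panic raised inside a call can be converted back into an ordinary error value; dial and tryDial below are hypothetical stand-ins, not part of the circuit API:

```go
package main

import (
	"errors"
	"fmt"
)

// dial stands in for a circuit API call that reports a lost
// connection by panicking. (Hypothetical; not the real client.Dial.)
func dial(ok bool) string {
	if !ok {
		panic("connection lost")
	}
	return "connected"
}

// tryDial converts the panic-on-disconnect convention into an
// ordinary error return, instead of terminating like connect() does.
func tryDial(ok bool) (s string, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = errors.New(fmt.Sprint(r))
		}
	}()
	return dial(ok), nil
}

func main() {
	if _, err := tryDial(false); err != nil {
		fmt.Println("recovered:", err) // prints "recovered: connection lost"
	}
}
```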

The main function can now be updated to:

func main() {
	flag.Parse()
	c := connect(*flagAddr)
	…
}

Selecting hosts

The next goal is to “list” the contents of the circuit cluster and to choose two hosts out of the inventory — one for the MySQL database and one for the Node.js front-end service.

The circuit Go API represents all cluster resources in the form of one big hierarchy of “anchors”. Each anchor can have any number of uniquely-named sub-anchors, and it can be associated with one resource (process, server, container, etc.). Anchors are represented by the interface type client.Anchor.

The client object, of type *client.Client, is itself an anchor (it implements client.Anchor) and it is in fact the root anchor of the circuit cluster's virtual hierarchy.

The root anchor is unique in that it is not associated with any resource and its sub-anchors automatically exactly correspond to the circuit servers that are presently members of the cluster.

Every anchor has a View method:

	View() map[string]client.Anchor

which returns the sub-anchors of this anchor and their names. If we invoke the View method of the root anchor, we obtain a list of anchors corresponding to the currently live circuit servers.

We are going to use this to write a simple subroutine that blindly picks a fixed number of hosts out of the available ones, re-using some hosts if necessary:

func pickHosts(c *client.Client, n int) (hosts []client.Anchor) {
	defer func() {
		if recover() != nil {
			fatalf("client connection lost")
		}
	}()
	view := c.View()
	if len(view) == 0 {
		fatalf("no hosts in cluster")
	}
	for len(hosts) < n {
		for _, a := range view {
			if len(hosts) >= n {
				break
			}
			hosts = append(hosts, a)
		}
	}
	return
}

Note again here that a panic ensuing from c.View() would indicate a broken connection between the client and the circuit server, in which case we prefer to exit the program.
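The reuse behavior of the selection loop can be sketched in isolation, replacing the circuit view with a plain map of names to hosts (pick and its map contents are hypothetical stand-ins for the circuit types):

```go
package main

import "fmt"

// pick mirrors the selection loop in pickHosts, but over a plain
// map so it can run without a circuit cluster.
func pick(view map[string]string, n int) (hosts []string) {
	for len(hosts) < n {
		for _, a := range view {
			if len(hosts) >= n {
				break
			}
			hosts = append(hosts, a)
		}
	}
	return
}

func main() {
	view := map[string]string{"Xa": "host-a", "Xb": "host-b"}
	// Asking for more hosts than are available reuses entries.
	fmt.Println(len(pick(view, 5))) // prints 5
}
```

Note that Go map iteration order is unspecified, so the order of the picked hosts varies between runs; only the count and the reuse guarantee are deterministic.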

We can further update main to:

func main() {
	flag.Parse()
	c := connect(*flagAddr)
	hosts := pickHosts(c, 2)
	…
}

Before we continue with the main app logic—starting MySQL and starting Node.js—we are going to make a small detour. We are going to implement a useful subroutine that executes shell commands and scripts on any desired host directly from the Go environment of our app.

A versatile orchestration subroutine

The function runShellStdin takes an anchor parameter host, which is expected to be an anchor corresponding to a circuit server. It executes a desired shell command cmd on the corresponding host, and it also supplies the string stdin as standard input to the shell process.

The function waits (blocks) until the shell process terminates and returns its standard output as a string. If the shell process exits in error, this is reflected in a non-nil return error value. If the function fails due to loss of connection (as opposed to an unsuccessful exit from the shell process), runShellStdin terminates the entire program with an error message.

func runShellStdin(host client.Anchor, cmd, stdin string) (string, error) {
	defer func() {
		if recover() != nil {
			fatalf("connection to host lost")
		}
	}()
	job := host.Walk([]string{"shelljob", strconv.Itoa(rand.Int())})
	proc, _ := job.MakeProc(client.Cmd{
		Path:  "/bin/sh",
		Dir:   "/tmp",
		Args:  []string{"-c", cmd},
		Scrub: true,
	})
	go func() {
		io.Copy(proc.Stdin(), bytes.NewBufferString(stdin))
		proc.Stdin().Close() // Must close the standard input of the shell process.
	}()
	proc.Stderr().Close() // Close to indicate discarding standard error
	var buf bytes.Buffer
	io.Copy(&buf, proc.Stdout())
	stat, _ := proc.Wait()
	return buf.String(), stat.Exit
}

Let's break down what this function accomplishes:

  1. 	defer func() {
    		if recover() != nil {
    			fatalf("connection to host lost")
    		}
    	}()
    

    The defer statement catches panics that may arise from the circuit API calls. By convention, any such panic indicates that either (i) the particular host we are manipulating (through the methods of the anchor object) has become disconnected from the cluster, or (ii) our client has lost connection to the circuit server that it initially connected to, using client.Dial.

    In this example, we prefer to terminate the app if we encounter loss of connectivity of either kind.

    In general, one could detect whether (i) or (ii) was the cause for the panic. For instance, if a subsequent call to a client method, like View(), also panics then the client itself has been disconnected, i.e. condition (ii). In this case, you need to discard the client object as well as any anchors derived from it. But if such a subsequent call does not panic, it implies that the initial panic was caused by condition (i). In this case, only the host that your anchor refers to has been disconnected and you can continue using the same client.

  2. 	job := host.Walk([]string{"shelljob", strconv.Itoa(rand.Int())})
    

    The next line, which invokes host.Walk, creates an anchor (i.e. a node in the circuit's virtual hierarchy) for the shell process that we are about to execute. For instance, if the host anchor corresponds to a path like /Xfea8b5b798f2fc09, then the anchor job will correspond to a path like /Xfea8b5b798f2fc09/shelljob/1234, where 1234 is an integer that we pick randomly to make sure we arrive at an anchor that does not already have a resource attached to it.

    In general, calls to anchor.Walk() always succeed (as long as the implied underlying host is connected). If the anchor we are “walking” to does not already exist, it is automatically created. On the other hand, anchors that are not used by any clients and have no resources attached to them are eventually garbage-collected for you.

  3. 	proc, _ := job.MakeProc(client.Cmd{
    		Path:  "/bin/sh",
    		Dir:   "/tmp",
    		Args:  []string{"-c", cmd},
    		Scrub: true,
    	})
    

    The following call to job.MakeProc executes the shell process and creates a process handle — which we call a process element — and attaches the process element to the anchor job. The process element is represented by the returned value in proc. (In general, elements attached to an anchor can be retrieved using the Get method.)

    The function MakeProc returns as soon as the process starts; it does not wait for the process to complete. The returned error value, ignored in our example, is non-nil only in the event that the binary to be executed is not found on the host.

    The argument to MakeProc specifies the command, as usual. The field Scrub, when set to true, tells the circuit runtime to remove the process anchor automatically when the process dies. (Normally anchors that have resources attached to them are not garbage-collected from the virtual hierarchy. They must be scrubbed explicitly by the user.)

  4. 	go func() {
    		io.Copy(proc.Stdin(), bytes.NewBufferString(stdin))
    		proc.Stdin().Close() // Must close the standard input of the shell process.
    	}()
    	proc.Stderr().Close() // Close to indicate discarding standard error
    	var buf bytes.Buffer
    	io.Copy(&buf, proc.Stdout())
    

    As soon as MakeProc returns, the process is running. Our next goal is to take care of its standard streams: By POSIX convention, every process will block if (i) it tries to read from standard input and there is nothing to read and the descriptor is still open, or (ii) it tries to write to standard error or output and they are not being consumed.

    We have direct access to the standard streams of the running process via the methods

    	Stdin() io.WriteCloser
    	Stdout() io.ReadCloser
    	Stderr() io.ReadCloser
    
    of the proc variable.

    In a separate goroutine, we write the contents of the parameter stdin to the standard input of the shell process and then we close the stream, indicating that no more input is to be expected.

    Meanwhile, in the main goroutine we first close the standard error stream. This tells the circuit that all output to that stream should be discarded. Closing a stream never blocks.

    Finally, we block on greedily reading the standard output of the shell process into a buffer until we encounter closure, i.e. an EOF condition. Closure of the standard output stream happens immediately before the process exits. At this point io.Copy() will unblock.

  5. 	stat, _ := proc.Wait()
    	return buf.String(), stat.Exit
    

    At last we invoke proc.Wait to wait for the death of the process and capture its exit state within the returned stat structure. If the error field stat.Exit is non-nil, it means the process exited in error.
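The same stream discipline can be exercised locally with the standard os/exec package, with no circuit cluster involved. runLocalShellStdin below is a hypothetical local analogue that mirrors runShellStdin step for step: feed and close stdin in a goroutine, discard stderr, drain stdout until EOF, then wait for the exit status:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os/exec"
)

// runLocalShellStdin mimics runShellStdin against the local host
// using os/exec instead of the circuit API.
func runLocalShellStdin(cmd, stdin string) (string, error) {
	c := exec.Command("/bin/sh", "-c", cmd)
	in, err := c.StdinPipe()
	if err != nil {
		return "", err
	}
	out, err := c.StdoutPipe()
	if err != nil {
		return "", err
	}
	// Stderr is left nil, so the child's standard error is discarded.
	if err := c.Start(); err != nil {
		return "", err
	}
	go func() {
		io.Copy(in, bytes.NewBufferString(stdin))
		in.Close() // close stdin so the shell sees EOF
	}()
	var buf bytes.Buffer
	io.Copy(&buf, out) // unblocks when the process closes its stdout
	err = c.Wait()     // non-nil if the shell exited in error
	return buf.String(), err
}

func main() {
	out, err := runLocalShellStdin("tr a-z A-Z", "hello")
	fmt.Printf("%q %v\n", out, err) // prints "HELLO" <nil>
}
```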

Often we won't be interested in passing any data to the standard input of the shell process, so we add a shortcut subroutine for that case:

func runShell(host client.Anchor, cmd string) (string, error) {
	return runShellStdin(host, cmd, "")
}

We are now going to use runShell to fetch the public and private IP addresses of any host on the cluster.

Retrieving EC2 host public and private IP addresses

On any Amazon EC2 host instance, one can retrieve the public and private IP addresses of the instance using the following command lines, respectively:

curl http://169.254.169.254/latest/meta-data/public-ipv4
curl http://169.254.169.254/latest/meta-data/local-ipv4

Building on that and using runShell, the following subroutine will fetch the public address of any host on the circuit cluster, specified by its anchor:

func getEc2PublicIP(host client.Anchor) string {
	out, err := runShell(host, "curl http://169.254.169.254/latest/meta-data/public-ipv4")
	if err != nil {
		fatalf("get ec2 public ip error: %v", err)
	}
	out = strings.TrimSpace(out)
	if _, err := net.ResolveIPAddr("ip", out); err != nil {
		fatalf("ip %q unrecognizable: %v", out, err)
	}
	return out
}

To retrieve the private host IP we implement a similar function getEc2PrivateIP, which only differs from the above in that public-ipv4 is substituted with local-ipv4.

Starting the MySQL database on host A

We would like to write a routine that starts a fresh MySQL server on a given host and returns its server address and port number as a result.

	func startMysql(host client.Anchor) (ip, port string)

We are first going to describe the “manual” process of starting a fresh MySQL server, assuming we have a shell session at the host.

Then we are going to show how this manual process can be codified into a Go subroutine that performs its steps directly from the client application.

Manually starting MySQL at the host

Let's assume you are at the shell of the host machine. The following steps describe how to start the MySQL server with a new database.

Obtain the private IP address of this host:

	$ IP=$(curl http://169.254.169.254/latest/meta-data/local-ipv4)

Rewrite MySQL's configuration file so the server binds to the private IP address (the port remains the default, 3306):

	$ sudo sed -i 's/^bind-address\s*=.*$/bind-address = '$IP'/' /etc/mysql/my.cnf

Start the server:

	$ sudo /etc/init.d/mysql start

Connect to MySQL as root to prepare the tutorial user and database:

	$ sudo mysql
	mysql> DROP USER tutorial;
	mysql> DROP DATABASE tutorial;
	mysql> CREATE USER tutorial;
	mysql> CREATE DATABASE tutorial;
	mysql> GRANT ALL ON tutorial.*  TO tutorial;

Then connect as the tutorial user and set up the main table:

	$ mysql -u tutorial
	mysql> USE tutorial;
	mysql> CREATE TABLE NameValue (name VARCHAR(100), value TEXT, PRIMARY KEY (name));

The database is now configured, up and accepting connections at $IP:3306.

Programmatically starting MySQL from the app

Retracing the manual steps programmatically is straightforward, purely using the subroutines getEc2PrivateIP, runShell and runShellStdin that we created earlier.

	func startMysql(host client.Anchor) (ip, port string) {

		// Retrieve the IP address of this host within the cluster's private network.
		ip = getEc2PrivateIP(host)

		// We use the default MySQL server port
		port = strconv.Itoa(3306)

		// Rewrite MySQL config to bind to the private host address
		cfg := fmt.Sprintf(`sudo sed -i 's/^bind-address\s*=.*$/bind-address = %s/' /etc/mysql/my.cnf`, ip)
		if _, err := runShell(host, cfg); err != nil {
			fatalf("mysql configuration error: %v", err)
		}

		// Start MySQL server
		if _, err := runShell(host, "sudo /etc/init.d/mysql start"); err != nil {
			fatalf("mysql start error: %v", err)
		}

		// Remove old database and user, if present (errors are deliberately ignored)
		runShellStdin(host, "sudo /usr/bin/mysql", "DROP USER tutorial;")
		runShellStdin(host, "sudo /usr/bin/mysql", "DROP DATABASE tutorial;")

		// Create tutorial user and database within MySQL
		const m1 = `
	CREATE USER tutorial;
	CREATE DATABASE tutorial;
	GRANT ALL ON tutorial.*  TO tutorial;
	`
		if _, err := runShellStdin(host, "sudo /usr/bin/mysql", m1); err != nil {
			fatalf("problem creating database and user: %v", err)
		}

		// Create key/value table within tutorial database
		const m2 = `
	USE tutorial;
	CREATE TABLE NameValue (name VARCHAR(100), value TEXT, PRIMARY KEY (name));
	`
		if _, err := runShellStdin(host, "/usr/bin/mysql -u tutorial", m2); err != nil {
			fatalf("problem creating table: %v", err)
		}

		return
	}

We add a call to startMysql to the main logic:

	func main() {
		flag.Parse()
		c := connect(*flagAddr)
		hosts := pickHosts(c, 2)

		mysqlIP, mysqlPort := startMysql(hosts[0])
		println("Started MySQL on private address:", mysqlIP, mysqlPort)
		…
	}

Starting the Node.js app on host B

Starting the Node.js app amounts to running the following command-line on the target host:

	$ sudo /usr/bin/nodejs nodejs-app/index.js \
		--mysql_host $MYSQL_HOST --mysql_port $MYSQL_PORT \
		--api_host $API_HOST --api_port $API_PORT \
		&> /tmp/tutorial-nodejs.log

The app finds the backend MySQL server via the arguments --mysql_host and --mysql_port, while it binds its HTTP API server to the address given by the arguments --api_host and --api_port.

The function startNodejs takes a target host parameter, as well as the host and port of the backing MySQL server. It starts the Node.js app on the target host and returns the public IP address and port of the HTTP API endpoint.

	func startNodejs(host client.Anchor, mysqlIP, mysqlPort string) (ip, port string) {
		defer func() {
			if recover() != nil {
				fatalf("connection to host lost")
			}
		}()

		// Start node.js application
		ip = getEc2PublicIP(host)
		port = "8080"
		job := host.Walk([]string{"nodejs"})
		shell := fmt.Sprintf(
			"sudo /usr/bin/nodejs index.js "+
				"--mysql_host %s --mysql_port %s --api_host %s --api_port %s "+
				"&> /tmp/tutorial-nodejs.log",
			mysqlIP, mysqlPort,
			"0.0.0.0", port,
		)
		proc, err := job.MakeProc(client.Cmd{
			Path:  "/bin/sh",
			Dir:   "/home/ubuntu/nodejs-app",
			Args:  []string{"-c", shell},
			Scrub: true,
		})
		if err != nil {
			fatalf("nodejs app already running")
		}
		proc.Stdin().Close()
		proc.Stdout().Close()
		proc.Stderr().Close()

		return
	}

Note how we run the server. We execute a shell process, which itself executes the Node.js app. The shell process, which is the one created by the circuit, is a long-running one. It will run for as long as the child Node.js server is running.

As soon as the process starts, MakeProc returns, while the process continues executing in the background. We then close all of its standard streams, as we don't intend to use them.

The process element is attached to an anchor of the form /X686ea8f7374e59a2/nodejs. This will allow you to find it in the future and check its state, for instance, using the command lines circuit ls and circuit peek.

At last, we tie this function into the main logic, which completes our circuit app implementation:

	func main() {
		flag.Parse()
		c := connect(*flagAddr)
		hosts := pickHosts(c, 2)

		mysqlIP, mysqlPort := startMysql(hosts[0])
		println("Started MySQL on private address:", mysqlIP, mysqlPort)

		nodejsIP, nodejsPort := startNodejs(hosts[1], mysqlIP, mysqlPort)
		println("Started Node.js service on public address:", nodejsIP, nodejsPort)
	}