In this section we are going to write a Go program (the circuit app), which when executed will connect to the circuit cluster and deploy the tutorial's MySQL/Node.js-based web service using two hosts from the cluster.
The source of the finished app is located in:
$GOPATH/github.com/gocircuit/circuit/tutorial/nodejs-using-mysql/start-app/main.go
This is going to be a single source-file Go program, which will expect exactly one command-line
parameter, -addr
, specifying the circuit address to connect into. As
a start, the program will look like this:
package main import ( "flag" "github.com/gocircuit/circuit/client" ) var flagAddr = flag.String("addr", "", "circuit server address (looks like circuit://...)") func fatalf(format string, arg ...interface{}) { println(fmt.Sprintf(format, arg...)) os.Exit(1) } func main() { flag.Parse() … }
Notable here is the import of the circuit client package "github.com/gocircuit/circuit/client"
and the definition of function fatalf()
, which we'll use to report terminal errors.
The next step is to connect into a circuit server (specified by a circuit address) and obtain a client object that will give us access to the circuit API. The following subroutine takes a circuit address as an argument and returns a connected client object as a result:
func connect(addr string) *client.Client { defer func() { if r := recover(); r != nil { fatalf("could not connect: %v", r) } }() return client.Dial(addr, nil) }
Note that by convention, the circuit client library universally reports loss of connection (or inability to establish connection) conditions via panics, as they can occur anywhere in its methods. Such panics are normal error conditions, and can be recovered from. In our case, we prefer to terminate the program with an error message.
The main
function can now be updated to:
func main() { flag.Parse() c := connect(*flagAddr) … }
The next goal is to “list” the contents of the circuit cluster and to choose two hosts out of the inventory — one for the MySQL database and one for the Node.js front-end service.
The circuit Go API represents all cluster resources in the form of one
big hierarchy of “anchors”. Each anchor can have any number of
uniquely-named sub-anchors, and it can be associated with one
resource (process, server, container, etc.) Anchors are represented
by the interface type client.Anchor
.
The client object, of type *client.Client
, is itself an
anchor (it implements client.Anchor
) and it is in fact
the root anchor of the circuit cluster's virtual hierarchy.
The root anchor is unique in that it is not associated with any resource and its sub-anchors automatically exactly correspond to the circuit servers that are presently members of the cluster.
Every anchor has a View
method:
View() map[string]client.Anchorwhich returns the sub-anchors of this anchor and their names. If we invoke the
View
method of the root anchor,
we obtain a list of anchors corresponding to the currently live
circuit servers.
We are going to use this to write a simple subroutine that blindly picks a fixed number of hosts out of the available ones, re-using some hosts if necessary:
func pickHosts(c *client.Client, n int) (hosts []client.Anchor) { defer func() { if recover() != nil { fatalf("client connection lost") } }() view := c.View() if len(view) == 0 { fatalf("no hosts in cluster") } for len(hosts) < n { for _, a := range view { if len(hosts) >= n { break } hosts = append(hosts, a) } } return }
Note again here that a panic ensuing from c.View()
would
indicate a broken connection between the client and the circuit server, in which
case we prefer to exit the program.
We can further update main
to:
func main() { flag.Parse() c := connect(*flagAddr) hosts := pickHosts(c, 2) … }
Before we continue with the main app logic—starting MySQL and starting Node.js—we are going to make a small detour. We are going to implement a useful subroutine that executes shell commands and scripts on any desired host directly from the Go environment of our app.
The function runShellStdin
takes an anchor parameter host
,
which is expected to be an anchor corresponding to a circuit server. It
executes a desired shell command cmd
on the corresponding host,
and it also supplies the string stdin
as standard input to the
shell process.
The function waits (blocks) until the shell process terminates and returns its
standard output in the form of a string. If the shell process exits in error, this
is reflected in a non-nil return error value. If the function fails due to loss of
connection (as opposed to due to an unsuccessful exit from the shell process),
runShellStdin
will terminate the processes and exit with an error message.
func runShellStdin(host client.Anchor, cmd, stdin string) (string, error) { defer func() { if recover() != nil { fatalf("connection to host lost") } }() job := host.Walk([]string{"shelljob", strconv.Itoa(rand.Int())}) proc, _ := job.MakeProc(client.Cmd{ Path: "/bin/sh", Dir: "/tmp", Args: []string{"-c", cmd}, Scrub: true, }) go func() { io.Copy(proc.Stdin(), bytes.NewBufferString(stdin)) proc.Stdin().Close() // Must close the standard input of the shell process. }() proc.Stderr().Close() // Close to indicate discarding standard error var buf bytes.Buffer io.Copy(&buf, proc.Stdout()) stat, _ := proc.Wait() return buf.String(), stat.Exit }
Let's break down what this function accomplishes:
defer func() { if recover() != nil { fatalf("connection to host lost") } }()
The defer
statement catches panics that may arise from the circuit API calls.
By convention, any such panic indicates that either (i) the particular host we are manipulating
(through the methods of the anchor object) has become disconnected from the cluster, or
(ii) our client has lost connection to the circuit server that it initially connected to, using client.Dial
.
In this example, we prefer to terminate the app if we encounter loss of connectivity of either kind.
In general, one could detect whether (i) or (ii) was the cause for the panic.
For instance, if a subsequent call to a client method, like View()
, also panics
then the client itself has been disconnected, i.e. condition (ii). In this case, you need to discard
the client object as well as any anchors derived from it. But if such a subsequent call does not panic, it implies
that the initial panic was caused by condition (i). In this case, only the host that your anchor
refers to has been disconnected and you can continue using the same client.
job := host.Walk([]string{"shelljob", strconv.Itoa(rand.Int())})
The next line, which invokes host.Walk
, creates an anchor (i.e. a node in the
circuit's virtual hierarchy) for the shell process that we are about to execute.
For instance, if the host anchor corresponds to a path like /Xfea8b5b798f2fc09
,
then the anchor job
will correspond to a path like
/Xfea8b5b798f2fc09/shelljob/1234
, where 1234
is an
integer that we pick randomly to make sure we arrive at an anchor that does not
already have a resource attached to it.
In general, calls to anchor.Walk()
always succeed (as long as the implied
underlying host is connected). If the anchor we are “walking” to does not already exist,
it is automatically created. On the other hand, anchors that are not used by any clients
and have no resources attached to them are eventually garbage-collected for you.
proc, _ := job.MakeProc(client.Cmd{ Path: "/bin/sh", Dir: "/tmp", Args: []string{"-c", cmd}, Scrub: true, })
The following call to job.MakeProc
executes the shell process
and creates a process handle — which we call a process element — and
attaches the process element to the anchor job
.
The process element is represented by the returned value in proc
.
(In general, elements attached to an anchor can be retrieved using the Get
method.)
The function MakeProc
returns as soon as the process is executed, it
does not wait for the process to complete. The returned error value, ignored in our example,
is non-nil only in the event that the binary to be executed is not found on the host.
The argument to MakeProc
specifies the command, as usual.
The field Scrub
, when set to true
, tells the circuit runtime to remove
the process anchor automatically when the process dies. (Normally anchors that
have resources attached to them are not garbage-collected from the virtual hierarchy.
They must be scrubbed explicitly by the user.)
go func() { io.Copy(proc.Stdin(), bytes.NewBufferString(stdin)) proc.Stdin().Close() // Must close the standard input of the shell process. }() proc.Stderr().Close() // Close to indicate discarding standard error var buf bytes.Buffer io.Copy(&buf, proc.Stdout())
As soon as MakeProc
returns, the process is running.
Our next goal is to take care of its standard streams: By POSIX convention,
every process will block if (i) it tries to read from standard input and there is nothing
to read and the descriptor is still open, or (ii) it tries to write to standard
error or output and they are not being consumed.
We have direct access to the standard streams of the running process via the methods
Stdin() io.WriteCloser Stdout() io.ReadCloser Stderr() io.ReadCloserof the
proc
variable.
In a separate goroutine, we write the contents of the parameter stdin
to the standard input of the shell process and then we close the stream, indicating that
no more input is to be expected.
Meanwhile, in the main goroutine we first close the standard error stream. This tells the circuit that all output to that stream should be discarded. Closing a stream never blocks.
Finally, we block on greedily reading the standard output of the shell process
into a buffer until we encounter closure, i.e. an EOF condition. Closure of the
standard output stream happens immediately before the process exits. At
this point io.Copy()
will unblock.
stat, _ := proc.Wait() return buf.String(), stat.Exit
At last we invoke proc.Wait
to wait for the death of the
process and capture its exit state within the returned stat
structure.
If the error field stat.Exit
is non-nil, it means the process
exited in error.
Often we won't be interested in passing any data to the standard input of the shell process, for which cases we add a shortcut subroutine:
func runShell(host client.Anchor, cmd string) (string, error) { return runShellStdin(host, cmd, "") }
We are now going to use runShell
to fetch the public
and private IP addresses of any host on the cluster.
On any Amazon EC2 host instance, by definition, one is able to retrieve the public and private IP addresses of the host instance using the following command-lines, respectively:
curl http://169.254.169.254/latest/meta-data/public-ipv4 curl http://169.254.169.254/latest/meta-data/local-ipv4
Basing on that and using runShell
, the following subroutine will
fetch the public address of any host on the circuit cluster, specified by its anchor:
func getEc2PublicIP(host client.Anchor) string { out, err := runShell(host, "curl http://169.254.169.254/latest/meta-data/public-ipv4") if err != nil { fatalf("get ec2 public ip error: %v", err) } out = strings.TrimSpace(out) if _, err := net.ResolveIPAddr("ip", out); err != nil { fatalf("ip %q unrecognizable: %v", out, err) } return out }
To retrieve the private host IP we implement a similar function getEc2PrivateIP
,
which only differs from the above in that public-ipv4
is substituted with
local-ipv4
.
We would like to write a routine that starts a fresh MySQL server on a given host and returns its server address and port number as a result.
func startMysql(host client.Anchor) (ip, port string)
We are first going to describe the “manual” processs of starting a fresh MySQL server, assuming we have a shell session at the host.
Then we are going to show how this manual process can be codified into a Go subroutine that performs its steps directly from the client application.
Let's asume you are at the shell of the host machine. The following steps describe the way to start the MySQL server with a new database.
Obtain the private IP address of this host:
$ IP=$(curl http://169.254.169.254/latest/meta-data/local-ipv4)
Rewrite MySQL's configuration file to bind to the private IP address and the default port 3306:
$ sudo sed -i 's/^bind-address\s*=.*$/bind-address = '$IP:3306'/' /etc/mysql/my.cnf
Start the server:
$ sudo /etc/init.d/mysql start
Connect to MySQL as root to prepare the tutorial user and database:
$ sudo mysql mysql> DROP USER tutorial; mysql> DROP DATABASE tutorial; mysql> CREATE USER tutorial; mysql> CREATE DATABASE tutorial; mysql> GRANT ALL ON tutorial.* TO tutorial;
Then connect as the tutorial
user and set up the main table:
$ mysql -u tutorial mysql> USE tutorial; mysql> CREATE TABLE NameValue (name VARCHAR(100), value TEXT, PRIMARY KEY (name));
The database is now configured, up and accepting connections at $IP:3306
.
Retracing the manual steps programmatically is straightforward, purely using the
subroutines getEc2PrivateIP
, runShell
and runShellStdin
that we created earlier.
func startMysql(host client.Anchor) (ip, port string) { // Retrieve the IP address of this host within the cluster's private network. ip = getEc2PrivateIP(host) // We use the default MySQL server port port = strconv.Itoa(3306) // Rewrite MySQL config to bind to the private host address cfg := fmt.Sprintf(`sudo sed -i 's/^bind-address\s*=.*$/bind-address = %s/' /etc/mysql/my.cnf`, ip) if _, err := runShell(host, cfg); err != nil { fatalf("mysql configuration error: %v", err) } // Start MySQL server if _, err := runShell(host, "sudo /etc/init.d/mysql start"); err != nil { fatalf("mysql start error: %v", err) } // Remove old database and user runShellStdin(host, "sudo /usr/bin/mysql", "DROP USER tutorial;") runShellStdin(host, "sudo /usr/bin/mysql", "DROP DATABASE tutorial;") // Create tutorial user and database within MySQL const m1 = ` CREATE USER tutorial; CREATE DATABASE tutorial; GRANT ALL ON tutorial.* TO tutorial; ` if _, err := runShellStdin(host, "sudo /usr/bin/mysql", m1); err != nil { fatalf("problem creating database and user: %v", err) } // Create key/value table within tutorial database const m2 = ` USE tutorial; CREATE TABLE NameValue (name VARCHAR(100), value TEXT, PRIMARY KEY (name)); ` if _, err := runShellStdin(host, "/usr/bin/mysql -u tutorial", m2); err != nil { fatalf("problem creating table: %v", err) } return }
We add a call to startMysql
to the main logic:
func main() { flag.Parse() c := connect(*flagAddr) host := pickHosts(c, 2) mysqlIP, mysqlPort := startMysql(host[0]) println("Started MySQL on private address:", mysqlIP, mysqlPort) … }
Starting the Node.js app amounts to running the following command-line on the target host:
$ sudo /usr/bin/nodejs nodejs-app/index.js \ --mysql_host $MYSQL_HOST --mysql_port $MYSQL_PORT \ --api_host $API_HOST --api_port $API_PORT \ &> /tmp/tutorial-nodejs.log
The app finds the backend MySQL server via the arguments --mysql_host
and --mysql_port
. While it binds its HTTP API server to the address given by
the arguments --api_host
and --api_port
.
The function startNodejs
takes a target host parameter,
as well as the host and port of the backing MySQL server. It starts the Node.js
app on the target host and returns the public IP address and port of the HTTP API endpoint.
func startNodejs(host client.Anchor, mysqlIP, mysqlPort string) (ip, port string) { defer func() { if recover() != nil { fatalf("connection to host lost") } }() // Start node.js application ip = getEc2PublicIP(host) port = "8080" job := host.Walk([]string{"nodejs"}) shell := fmt.Sprintf( "sudo /usr/bin/nodejs index.js "+ "--mysql_host %s --mysql_port %s --api_host %s --api_port %s "+ "&> /tmp/tutorial-nodejs.log", mysqlIP, mysqlPort, "0.0.0.0", port, ) proc, err := job.MakeProc(client.Cmd{ Path: "/bin/sh", Dir: "/home/ubuntu/nodejs-app", Args: []string{"-c", shell}, Scrub: true, }) if err != nil { fatalf("nodejs app already running") } proc.Stdin().Close() proc.Stdout().Close() proc.Stderr().Close() return }
Note how we run the server. We execute a shell process, which itself executes the Node.js app. The shell process, which is the one created by the circuit, is a long-running one. It will run for as long as the child Node.js server is running.
As soon as the process is executed MakeProc
returns, but the process continues
executing in the background. We then close all of its standard streams as we don't intend them to be used.
The process element is attached to anchor of the form /X686ea8f7374e59a2/nodejs
.
This will allow you to find it in the future and check its state, for instance, by using the command-lines
circuit ls
and circuit peek
.
At last, we tie this function into the main logic, which completes our circuit app implementation:
func main() { flag.Parse() c := connect(*flagAddr) host := pickHosts(c, 2) mysqlIP, mysqlPort := startMysql(host[0]) println("Started MySQL on private address:", mysqlIP, mysqlPort) nodejsIP, nodejsPort := startNodejs(host[1], mysqlIP, mysqlPort) println("Started Node.js service on public address:", nodejsIP, nodejsPort) }