
Chaos Monkey - MySQL-backed Store for Schedules and Terminations [Go]

Status: PUBLISHED
Project: Chaos Monkey
Project home page: https://github.com/Netflix/chaosmonkey
Language: Go
Tags: #data-access #sql #dao #chaos-engineering

Table of contents
  1. Context
  2. Problem
  3. Overview
  4. Implementation details
  5. Testing
  6. References
  7. Copyright notice

Context

Chaos Monkey randomly terminates virtual machine instances and containers that run inside of your production environment. Exposing engineers to failures more frequently incentivizes them to build resilient services. Chaos Monkey is an example of a tool that follows the Principles of Chaos Engineering.

Chaos Monkey uses MySQL to store termination schedules and a record of the instance terminations it has performed.

Problem

It’s often considered a good practice to separate application logic from persistence. How is it done in Chaos Monkey?

Overview

The persistence logic of Chaos Monkey is encapsulated in the MySQL struct. It’s a variation of the Data Access Object (DAO) pattern or of the Repository pattern; the difference between the two is rather subtle.

The structure provides methods to:

  • Retrieve the schedule for a given date.
  • Publish a schedule.
  • Check if a termination is permitted.

It’s common for DAOs and Repositories to try to abstract away the nature of the storage as much as possible. Here, while the interface does not give away the nature of the storage, the name, MySQL, does.

Implementation details

Structure definition. The only field is the underlying database.

// MySQL represents a MySQL-backed store for schedules and terminations
type MySQL struct {
	db *sql.DB
}

Retrieving the schedule for the given date. It runs a SELECT statement and maps the results to a Schedule object.

// Retrieve  retrieves the schedule for the given date
func (m MySQL) Retrieve(date time.Time) (sched *schedule.Schedule, err error) {
	rows, err := m.db.Query("SELECT time, app, account, region, stack, cluster FROM schedules WHERE date = DATE(?)", utcDate(date))
	if err != nil {
		return nil, errors.Wrapf(err, "failed to retrieve schedule for %s", date)
	}

	sched = schedule.New()

	defer func() {
		if cerr := rows.Close(); cerr != nil && err == nil {
			err = errors.Wrap(cerr, "rows.Close() failed")
		}
	}()

	for rows.Next() {
		var tm time.Time
		var app, account, region, stack, cluster string

		err = rows.Scan(&tm, &app, &account, &region, &stack, &cluster)
		if err != nil {
			return nil, errors.Wrap(err, "failed to scan row")
		}

		sched.Add(tm, grp.New(app, account, region, stack, cluster))
	}

	err = rows.Err()
	if err != nil {
		return nil, errors.Wrap(err, "rows.Err() errored")
	}

	return sched, nil

}

Publishing a schedule. It checks if a schedule for the date already exists and, if it doesn’t, runs an INSERT statement. Note the delay that allows testing for race conditions.

It could run a little faster if statements were prepared once during initialization rather than on every call. Read more about using prepared statements in Go.

// Publish publishes the schedule for the given date
func (m MySQL) Publish(date time.Time, sched *schedule.Schedule) error {
	return m.PublishWithDelay(date, sched, 0)
}

// PublishWithDelay publishes the schedule with a delay between checking the schedule
// exists and writing it. The delay is used only for testing race conditions
func (m MySQL) PublishWithDelay(date time.Time, sched *schedule.Schedule, delay time.Duration) (err error) {
	// First, we check to see if there is a schedule present
	tx, err := m.db.Begin()
	if err != nil {
		return errors.Wrap(err, "failed to begin transaction")
	}

	// We must either commit or rollback at the end
	defer func() {
		switch err {
		case nil:
			err = tx.Commit()
		case schedstore.ErrAlreadyExists:
			// We want to return ErrAlreadyExists even if the transaction commit
			// fails
			_ = tx.Commit()
		default:
			_ = tx.Rollback()
		}
	}()

	exists, err := schedExists(tx, date)
	if err != nil {
		return err
	}

	if exists {
		return schedstore.ErrAlreadyExists
	}

	if delay > 0 {
		time.Sleep(delay)
	}
	query := "INSERT INTO schedules (date, time, app, account, region, stack, cluster) VALUES (?, ?, ?, ?, ?, ?, ?)"
	stmt, err := tx.Prepare(query)
	if err != nil {
		return errors.Wrapf(err, "failed to prepare sql statement: %s", query)
	}

	for _, entry := range sched.Entries() {
		var app, account, region, stack, cluster string
		app = entry.Group.App()
		account = entry.Group.Account()
		if val, ok := entry.Group.Region(); ok {
			region = val
		}
		if val, ok := entry.Group.Stack(); ok {
			stack = val
		}
		if val, ok := entry.Group.Cluster(); ok {
			cluster = val
		}

		_, err = stmt.Exec(utcDate(date), entry.Time.In(time.UTC), app, account, region, stack, cluster)
		if err != nil {
			return errors.Wrapf(err, "failed to execute prepared query")
		}
	}

	return nil
}

Checking if a termination is permitted. The name of the method seems to imply that it’s side-effect free, but it actually records the termination time.

It’s not obvious that this logic belongs to the persistence layer. Perhaps it was done this way to ensure that the check and the recording are done in the same transaction.

// Check checks if a termination is permitted and, if so, records the
// termination time on the server
func (m MySQL) Check(term chaosmonkey.Termination, appCfg chaosmonkey.AppConfig, endHour int, loc *time.Location) error {
	return m.CheckWithDelay(term, appCfg, endHour, loc, 0)
}

// CheckWithDelay is the same as Check, but adds a delay between reading and
// writing to the database (used for testing only)
func (m MySQL) CheckWithDelay(term chaosmonkey.Termination, appCfg chaosmonkey.AppConfig, endHour int, loc *time.Location, delay time.Duration) error {
	tx, err := m.db.Begin()
	if err != nil {
		return errors.Wrap(err, "failed to begin transaction")
	}

	defer func() {
		switch err {
		case nil:
			err = tx.Commit()
		default:
			_ = tx.Rollback()
		}
	}()

	err = respectsMinTimeBetweenKills(tx, term.Time, term, appCfg, endHour, loc)
	if err != nil {
		return err
	}

	if delay > 0 {
		time.Sleep(delay)
	}

	err = recordTermination(tx, term, loc)
	return err

}
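
One detail that may be worth checking against the upstream source: as quoted here, CheckWithDelay declares an unnamed error result, unlike PublishWithDelay. In Go, a deferred function can change the value a caller receives only through a named result, so in this shape an error from tx.Commit() would not reach the caller. The sketch below demonstrates the language behavior with two hypothetical functions; it is not a patch to Chaos Monkey.

```go
package main

import "errors"

var errCommit = errors.New("commit failed")

// unnamedResult mirrors the shape of CheckWithDelay as quoted: err is a
// local variable, so the deferred assignment happens after the return
// value has already been copied.
func unnamedResult() error {
	var err error
	defer func() {
		if err == nil {
			err = errCommit // updates the local only; the caller never sees it
		}
	}()
	return err
}

// namedResult declares err as a named result, so the deferred assignment
// changes what the caller receives.
func namedResult() (err error) {
	defer func() {
		if err == nil {
			err = errCommit
		}
	}()
	return err
}
```

Calling unnamedResult() yields nil, while namedResult() yields errCommit.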

Testing

Basic happy-path test:

// Test we can publish and then retrieve a schedule
func TestPublishRetrieve(t *testing.T) {
	err := initDB()
	if err != nil {
		t.Fatal(err)
	}

	m, err := mysql.New("localhost", port, "root", password, "chaosmonkey")
	if err != nil {
		t.Fatal(err)
	}

	loc, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		t.Fatal(err)
	}

	sched := schedule.New()

	t1 := time.Date(2016, time.June, 20, 11, 40, 0, 0, loc)
	sched.Add(t1, grp.New("chaosguineapig", "test", "us-east-1", "", "chaosguineapig-test"))

	date := time.Date(2016, time.June, 20, 0, 0, 0, 0, loc)

	// Code under test:
	err = m.Publish(date, sched)
	if err != nil {
		t.Fatal(err)
	}
	sched, err = m.Retrieve(date)
	if err != nil {
		t.Fatal(err)
	}

	entries := sched.Entries()
	if got, want := len(entries), 1; got != want {
		t.Fatalf("got len(entries)=%d, want %d", got, want)
	}

	entry := entries[0]

	if !t1.Equal(entry.Time) {
		t.Errorf("%s != %s", t1, entry.Time)
	}
}

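Testing for race conditions. Note that Go’s built-in race detector wouldn’t catch race conditions like these: it detects unsynchronized memory accesses within a single process, whereas here two transactions compete inside the database.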

func TestScheduleAlreadyExistsConcurrency(t *testing.T) {
	// ...

	// Try to publish the schedule twice. At least one schedule should return an
	// error
	ch := make(chan error, 2)

	date := time.Date(2016, time.June, 20, 0, 0, 0, 0, loc)

	go func() {
		ch <- m.PublishWithDelay(date, psched1, 3*time.Second)
	}()

	go func() {
		ch <- m.PublishWithDelay(date, psched2, 0)
	}()

	// Retrieve the two error values from the two calls

	var success int
	var txDeadlock int
	for i := 0; i < 2; i++ {
		err := <-ch
		switch {
		case err == nil:
			success++
		case mysql.TxDeadlock(err):
			txDeadlock++
		default:
			t.Fatalf("Unexpected error: %+v", err)
		}
	}

	if got, want := success, 1; got != want {
		t.Errorf("got %d succeses, want: %d", got, want)
	}

	// Should cause a deadlock
	if got, want := txDeadlock, 1; got != want {
		t.Errorf("got %d txDeadlock, want: %d", got, want)
	}
}
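
The shape of the test above (two goroutines, a buffered channel, and a tally of outcomes) can be tried without a database. In this sketch, memStore, publish, publishTwice and errAlreadyExists are hypothetical names, and a mutex held across the check and the write plays the role of the transaction. The losing call therefore gets errAlreadyExists, not the deadlock error that the MySQL-backed test expects.

```go
package main

import (
	"errors"
	"sync"
)

var errAlreadyExists = errors.New("schedule already exists")

// memStore is a hypothetical in-memory store. Holding the mutex across the
// check and the write plays the role of the database transaction.
type memStore struct {
	mu    sync.Mutex
	dates map[string]bool
}

// publish fails with errAlreadyExists if the date was already published.
func (s *memStore) publish(date string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.dates[date] {
		return errAlreadyExists
	}
	s.dates[date] = true
	return nil
}

// publishTwice publishes the same date from two goroutines and counts
// the outcomes, following the shape of the test above.
func publishTwice(s *memStore, date string) (success, rejected int) {
	ch := make(chan error, 2)
	for i := 0; i < 2; i++ {
		go func() { ch <- s.publish(date) }()
	}
	for i := 0; i < 2; i++ {
		if err := <-ch; err == nil {
			success++
		} else if errors.Is(err, errAlreadyExists) {
			rejected++
		}
	}
	return success, rejected
}
```

Whichever goroutine acquires the mutex first succeeds, so the counts are the same on every run even though the winner is not.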

References

Copyright notice

Chaos Monkey is licensed under the Apache License 2.0.

Copyright 2015 Netflix, Inc.