Возникла задача: у нас есть компилятор собственного языка программирования, которым мы компилируем некоторый диалект бейсика в исходник на C.
К сожалению, по историческим причинам, у нас не было четкого регрессионного тестирования для этого компилятора. Но сейчас, на основе исходников бизнес-приложения, написанного на этом бейсике, решили сделать полноценное тестирование.
План таков: принять какую-то текущую версию компилятора, на которую нет открытых жалоб от клиентов, за эталон. Скомпилировать этой версией приличное количество исходников, сохранить результат, и затем каждый раз при внесении в компилятор изменений прогонять все эти исходники и смотреть, генерируется ли точно такой же вывод. Это не защитит от появления ошибок в целом, но по крайне мере будет уверенность, что существующий бизнес код все еще компилируется правильно.
Несложная задача. Только есть одно "но". Количество исходников, которые планируется использовать как эталонные - около 15 тысяч файлов, суммарным объемом чуть меньше гига (для удобства они завернуты в один TAR). Подобный "прогон" может быть весьма долгим. И есть естественное желание сделать тест максимально быстрым, используя многопроцессорную машину, ибо задача прекрасно распараллеливается.
Как вариант - можно сделать Makefile и запускать его с ключом "-j" в GNU Make. Но если написать специализированную многопоточную программу, то можно достичь лучшей производительности.
Итак, очевидно: вместо последовательного выполнения нужно запускать компиляцию каждого файла в параллельных потоках. Но так как файлов много (~15 тысяч), неэффективно просто одновременно запустить столько много потоков. Разумнее всего будет иметь пул потоков, где их количество будет определяться, например, количеством процессоров (например, умноженное на 2). Пул будет назначать очередную задачу на свободный поток, и если все потоки заняты, он будет блокироваться до тех пор, пока не появиться свободный.
Таким образом, мы будем поддерживать занятыми N потоков, обеспечивая оптимальную загрузку процессоров, не тратя время на лишние переключения контекста и постоянное создание и уничтожение потоков.
Сначала я решил написать все на С++ и pthreads. После нескольких часов танцов вокруг функторов, мьютексов, семафоров и условных переменных, у меня так ничего реально работающего не вышло. И тут я вспомнил про Go. Не поверите - через час работы у меня была готова первая версия, включая мелочевку типа работы с TAR, командной строкой и запуском внешнего процесса.
Итак: данная программа берет TAR с исходниками, распаковывает его, и каждый файл прогоняет через компилятор.
Сразу скажу, цель того, что я все это пишу тут, это продемонстрировать (и не более того), как просто и удобно на Go можно писать многопоточные императивные программы.
Главная концепция, которая используется в этой программе - это каналы. По каналам можно синхронно передавать данные и функции между потоками (Go-рутинами).
Далее, можно просто смотреть по исходнику. Самое интересное место там, где видно, как функция "compile()" может вызываться из нескольких потоков без каких-либо изменений.
package main
import (
"archive/tar"
"container/vector"
"exec"
"flag"
"fmt"
"io"
"os"
"strings"
)
// Два флага: количество потоков и имя компилятора.
var jobs *int = flag.Int("jobs", 0, "number of concurrent jobs")
var compiler *string = flag.String("cc", "bcom", "compiler name")
func main() {
flag.Parse()
os.Args = flag.Args()
args := os.Args
ar := args[0]
r, err := os.Open(ar, os.O_RDONLY, 0666);
if err != nil {
fmt.Printf("unable to open TAR %s\n", ar)
os.Exit(1)
}
// defer - это аналог "finally {}", гарантированное выполнение
// кода при выходе из блока.
defer r.Close()
// Цикл распаковки TAR.
fmt.Printf("- extracting %s\n", ar)
// Создаем контекст для распаковки.
tr := tar.NewReader(r)
tests := new(vector.StringVector)
// Последовательный проход по архиву, сохранение файлов и составление
// списка для компиляции.
for {
// Получаем дескриптор следующего файла в архиве.
hdr, _ := tr.Next()
if hdr == nil {
break
}
name := &hdr.Name
// Если это не заголовочный файл, сохраним имя.
if !strings.HasPrefix(*name, "HDR_") {
tests.Push(*name)
}
// Создаем новый файл.
w, err := os.Open("data/" + *name, os.O_CREAT | os.O_RDWR, 0666)
if err != nil {
fmt.Printf("unable to create %s\n", *name)
os.Exit(1)
}
// Копируем содержимое в текущий файл.
io.Copy(w, tr)
w.Close()
}
fmt.Printf("- compiling...\n")
*compiler , _ = exec.LookPath(*compiler)
fmt.Printf("- compiler %s\n", *compiler)
if *jobs == 0 {
// Вызываем "compile()" последовательно, в основном потоке.
fmt.Printf("- running sequentially\n")
for i := 0; i < tests.Len(); i++ {
compile(tests.At(i))
}
} else {
// Запускаем "compile()" в параллельных потоках.
fmt.Printf("- running %d concurrent job(s)\n", *jobs)
// Канал задач: в этот канал мы будем класть имена файлов,
// которые надо скомпилировать. Потоки-runner'ы будут ждать
// сообщений из этого канала. Канал имеет ограничение по
// длине. Это аналог семафора, чтобы блокировать главный
// поток, если все runner'ы заняты.
tasks := make(chan string, *jobs)
// Канал подтверждения полного завершение потока-runner'а.
// Главный поток будет ждать, пока все runner'ы ответят
// по этому каналу. Тип сообщений тут не важен.
done := make(chan bool)
// Запускаем runner'ы.
for i := 0; i < *jobs; i++ {
go runner(tasks, done)
}
// Передаем в канал имена файлов для обработки. При
// достижении максимального размера канала, главный поток
// будет заблокирован.
for i := 0; i < tests.Len(); i++ {
tasks <- tests.At(i)
}
// Посылаем всем потокам команду завершиться и ждем
// подтверждения о нормальном выходе от каждого потока.
for i := 0; i < *jobs; i++ {
tasks <- ""
<- done
}
}
}
// Поток-runner.
func runner(tasks chan string, done chan bool) {
// Бесконечный цикл.
for {
// Ждем сообщения из канала. Обычно, поток заблокирован
// на этом месте.
name := <- tasks
// Если имя пустое, нас просят завершиться.
if len(name) == 0 {
break
}
// Компилируем файл.
compile(name)
}
// Посылаем сообщение, что поток завершился.
done <- true
}
func compile(name string) {
// Вызываем компилятор.
c, err := exec.Run(*compiler, []string{*compiler, name},
os.Environ(), "./data", exec.DevNull,
exec.PassThrough, exec.PassThrough)
if err != nil {
fmt.Printf("unable to compile %s (%s)\n", name, err.String())
os.Exit(1)
}
c.Wait(0)
}
Makefile:
target = tar_extractor
all:
6g $(target).go
6l -o $(target) $(target).6
Я погонял это добро под Линуксом 64-бит на восьми процессорном блейде. Во время тестирования я был на машине один, так что результаты разных прогонов можно сравнивать. Файл "huge.tar" содержит ~15 тысяч исходников и имеет размер один гигабайт.
Так выглядит загрузка процессоров, когда машина ничего не делает (все процессоры почти на 100% в idle):
Cpu0 : 0.0%us, 0.0%sy, 0.0%ni,100.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 0.0%us, 0.0%sy, 0.0%ni, 99.7%id, 0.3%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu2 : 0.0%us, 0.0%sy, 0.0%ni,100.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu3 : 0.0%us, 0.0%sy, 0.0%ni,100.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu4 : 0.0%us, 0.0%sy, 0.0%ni,100.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu5 : 0.0%us, 0.3%sy, 0.0%ni, 99.3%id, 0.3%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu6 : 0.0%us, 0.0%sy, 0.0%ni,100.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu7 : 0.0%us, 0.0%sy, 0.0%ni,100.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Запускаем в последовательном режиме (-jobs 0):
make && time -p ./tar_extractor -jobs 0 huge.tar
Время работы:
real 213.81
user 187.32
sys 61.33
Практически все процессоры на 70-80% ничего не делают (все снимки я делал во время стадии компиляции):
Cpu0 : 11.9%us, 4.3%sy, 0.0%ni, 82.5%id, 1.3%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 9.6%us, 2.7%sy, 0.0%ni, 87.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu2 : 4.3%us, 1.3%sy, 0.0%ni, 92.7%id, 1.7%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu3 : 16.0%us, 6.0%sy, 0.0%ni, 78.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu4 : 12.6%us, 4.3%sy, 0.0%ni, 82.7%id, 0.3%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu5 : 11.6%us, 3.3%sy, 0.0%ni, 85.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu6 : 4.7%us, 1.3%sy, 0.0%ni, 94.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu7 : 16.6%us, 6.3%sy, 0.0%ni, 77.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Суммарная загрузка процессоров - 2.7%:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
15054 tester 18 0 41420 4980 1068 S 2.7 0.1 0:02.96 tar_extractor
Теперь запускаем с пулом потоков, но только с одним каналом (-jobs 1).
Время:
real 217.87
user 191.42
sys 62.53
Процессоры:
Cpu0 : 5.7%us, 1.7%sy, 0.0%ni, 92.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 13.3%us, 5.3%sy, 0.0%ni, 81.3%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu2 : 7.0%us, 2.7%sy, 0.0%ni, 89.3%id, 0.7%wa, 0.0%hi, 0.3%si, 0.0%st
Cpu3 : 15.3%us, 5.7%sy, 0.0%ni, 77.7%id, 1.3%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu4 : 6.0%us, 2.0%sy, 0.0%ni, 92.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu5 : 14.3%us, 7.3%sy, 0.0%ni, 78.4%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu6 : 7.0%us, 2.3%sy, 0.0%ni, 90.7%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu7 : 15.3%us, 6.6%sy, 0.0%ni, 78.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Понятно, что картина такая же, так как реально мы также гоняем один поток.
А теперь включаем пул потоков (-jobs 32):
make && time -p ./tar_extractor -jobs 32 huge.tar
Время работы упало почти в семь раз:
real 38.38
user 195.55
sys 69.92
Общая загрузка процессоров (во время стадии компиляции) возросла до 23%:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
17488 tester 16 0 45900 9732 1076 S 23.6 0.1 0:06.40 tar_extractor
Видно, что все процессоры реально заняты:
Cpu0 : 56.3%us, 26.3%sy, 0.0%ni, 17.3%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 55.5%us, 27.9%sy, 0.0%ni, 15.6%id, 1.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu2 : 56.1%us, 25.9%sy, 0.0%ni, 15.0%id, 0.7%wa, 0.3%hi, 2.0%si, 0.0%st
Cpu3 : 58.1%us, 26.2%sy, 0.0%ni, 15.6%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu4 : 57.2%us, 25.8%sy, 0.0%ni, 17.1%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu5 : 56.8%us, 26.2%sy, 0.0%ni, 16.9%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu6 : 59.0%us, 26.3%sy, 0.0%ni, 13.0%id, 1.7%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu7 : 56.5%us, 27.2%sy, 0.0%ni, 16.3%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Данное тестирование предназначено исключительно для понимания, как простейший параллельный код ускоряет все в разы. И также для демонстрации, как просто и относительно безопасно можно программировать параллельные вычисления в Go.
Посты по теме Go: