*** ВНИМАНИЕ: Блог переехал на другой адрес - demin.ws ***
Показаны сообщения с ярлыком erlang. Показать все сообщения
Показаны сообщения с ярлыком erlang. Показать все сообщения

суббота, 10 декабря 2011 г.

Задача расположения восьми ферзей на Erlang'e

Знаю, что баян, но для меня было весьма показательно.

Например, вот вариант, который я написал где-то за полчаса:

-module(queens_classic).
-export([solve/0]).

solve() ->
    solve(lists:seq(1, 8), lists:seq(1, 15 + 15), 1, []).

print_board([]) -> io:format("~n", []);
print_board([H|T]) ->
    Line = [string:copies(". ", H - 1), "@ ", string:copies(". ", 8 - H)],
    io:format("~s~n", [Line]),
    print_board(T).

solve(_, _, Cols, Result) when Cols > 8 ->
    io:format("~p~n", [Result]),
    print_board(Result);

solve(Rows, Diags, Col, Result) ->
    lists:foreach(fun(Row) ->
                     D1 = Row + Col,
                     D2 = Row - Col + 8 + 15,
                     T = lists:member(Row, Rows) andalso
                         lists:member(D1, Diags) andalso
                         lists:member(D2, Diags),
                     if T ->
                         Rows1 = Rows -- [Row],
                         Diags1 = Diags -- [D1, D2],
                         solve(Rows1, Diags1, Col + 1, [Row | Result]);
                        true -> void
                     end
                  end, Rows).

Выглядит ужасно, и стиль однозначно понятно какой: C/Python на стероидах (циклы, if'ы).

А вот над этим вариантом я провозился несколько часов:

-module(queens).
-export([solve/0]).

solve() ->
    solve(1, []).

print_board([]) -> io:format("~n", []);
print_board([{_X, Y}|T]) ->
    Line = [string:copies(". ", Y - 1), "@ ", string:copies(". ", 8 - Y)],
    io:format("~s~n", [Line]),
    print_board(T).

solve(X, Taken) when X > 8 ->
    io:format("~p~n", [Taken]),
    print_board(Taken);

solve(X, Taken) ->
    L = [{X, Y} || Y <- lists:seq(1, 8), not under_attack({X, Y}, Taken)],
    row(L, Taken).

row([], _) -> [];
row([{X, Y}|L], Taken) ->
    solve(X + 1, [{X, Y} | Taken]),
    row(L, Taken).

under_attack(_, []) -> false;
under_attack({X, Y}, [{Xt, Yt}|L]) ->
    Y == Yt orelse abs(Y - Yt) == abs(X - Xt) orelse
    under_attack({X, Y}, L).

Вся работа со списками вручную без циклоподобных конструкций.

Печатает типа такого:

[4,7,5,2,6,1,3,8]
. . . @ . . . .
. . . . . . @ .
. . . . @ . . .
. @ . . . . . .
. . . . . @ . .
@ . . . . . . .
. . @ . . . . .
. . . . . . . @

[5,7,2,6,3,1,4,8]
. . . . @ . . .
. . . . . . @ .
. @ . . . . . .
. . . . . @ . .
. . @ . . . . .
@ . . . . . . .
. . . @ . . . .
. . . . . . . @

...

Увы, но вот эта версия мне кажется более красивой с точки зрения фукнционального стиля.

На всякий случай Makefile для обоих вариантов:

target = queens

all:
    erlc $(target).erl
    erl -noshell -s $(target) solve -s init stop

classic:
    erlc $(target)_classic.erl
    erl -noshell -s $(target)_classic solve -s init stop

clean:
    -rm *.beam *.dump

среда, 30 ноября 2011 г.

MapReduce на Erlang'e

Я продолжаю погружение в Эрланг. Уже есть хитрый план переписать один из наших сервисов для мониторинга на Эрланге. Мы тут осваиваем облака Windows Azure и Amazon EC2 в качестве платформы для некоторых продуктов и внутренних задач типа QA, поэтому возможность использовать много ядер и машин без переписывания кода выглядить перспективно.

Итак, для начала простой, но реальный пример - есть проект ~2000 файлов. Надо составить список используемых переменных окружения. То есть найти вхождения строк "getenv(...)" и "GetVariable(...)" (это наш wrapper) и выдрать из них параметр.

Задача незамысловатая и давно решается программой на C++, которая даже обход каталогов не делает, а просто вызывает юниксовый "find", генерирующий список файлов по маске, и затем по списку лопатит файлы. На 2000 файлах работает пару секунд в один поток.

Теперь Эрланг. Тут хочется замутить что-нибудь более кучерявое, чем последовательный обход файлов. MapReduce как раз в тему - можно составить список файлов, затем анализ каждого файла делать параллельно (Map), аккумулируя найденные имена переменных, и в конце обработать все полученные входждение (Reduce), в нашем случае просто подсчитать количество вхождения каждой переменной.

Фактически мой код повторяет пример из "Programming Erlang" и использует модуль phofs (parallel higher-order functions) из этой же книги.

-module(find_variables).
-export([main/0, find_variables_in_file/2, process_found_variables/3]).

-define(PATH, "/Projects/interesting_project").
-define(MASK, "\\..*(cpp|c)").

main() ->
    io:format("Creating list of files...~n", []),
    % Стандартная функция обхода файловой системы. Последний параметр -
    % функтор, накапливающий имена в списке.
    Files = filelib:fold_files(?PATH, ?MASK, true,
                               fun(N, A) -> [N | A] end, []),
    io:format("Found ~b file(s)~n", [length(Files)]),
    F1 = fun find_variables_in_file/2,   % Map
    F2 = fun process_found_variables/3,  % Reduce
    % Вызываем MapReduce через функцию benchmark, считающую время
    % выполнения.
    benchmark(fun() ->
        L = phofs:mapreduce(F1, F2, [], Files),
        io:format("Found ~b variable(s)~n", [length(L)])
    end, "MapReduce").

benchmark(Worker, Title) ->
    {T, _} = timer:tc(fun() -> Worker() end),
    io:format("~s: ~f sec(s)~n", [Title, T/1000000]).

-define(REGEXP, "(getenv|GetVariable)\s*\\(\s*\"([^\"]+)\"\s*\\)").

% Map. Анализ одного файла.
find_variables_in_file(Pid, FileName) ->
    case file:open(FileName, [read]) of
        {ok, File} ->
            % Заранее компилируем регулярное выражение.
            {_, RE} = re:compile(?REGEXP),
            % Данный обратный вызов пошлет родительскому контролирующему
            % потому сообщение с именем найденной переменной.
            CallBack = fun(Var) -> Pid ! {Var, 1} end,
            find_variable_in_file(File, RE, CallBack),
            file:close(File);
        {error, Reason} ->
            io:format("Unable to process '~s', ~p~n", [FileName, Reason]),
            exit(1)
    end.

% Reduce. Анализ данных. Данная функция вызывается контролирующим
% процессом MapReduce для каждого найденного ключа вместе со списком
% значений, ассоциированных с ним. В нашем случае это будут пары
% {VarName, 1}. Мы просто подсчитаем для каждого VarName количество
% пришедших пар, то есть количество найденных вхождений этой переменной.
% Это и есть наш незамысловатый анализ.

process_found_variables(Key, Vals, A) ->
    [{Key, length(Vals)} | A].

% Построчный обход файла.
find_variable_in_file(File, RE, CallBack) ->
    case io:get_line(File, "") of
       eof -> void;
       Line ->
         scan_line_in_file(Line, RE, CallBack),
         find_variable_in_file(File, RE, CallBack)
    end.

% Поиск строки в строке по регулярному выражению (скомпилированному ранее),
% и в случае нахождение вызов CallBack с передачей ему имени найденной
% переменной.
scan_line_in_file(Line, RE, CallBack) ->
    case re:run(Line, RE) of
        {match, Captured} ->
            [_, _, {NameP, NameL}] = Captured,
            Name = string:substr(Line, NameP + 1, NameL),
            CallBack(Name);
        nomatch -> void
    end.

Для сборки программы нужен модуль phofs. Он является универсальным, независимым от конкретных функций Map и Reduce.

И Makefile на всякий случай:

target = find_variables

all:
    erlc $(target).erl
    erlc phofs.erl
    erl -noshell -s $(target) main -s init stop

clean:
    -rm *.beam *.dump

Пузомерка. Как я уже сказал, программа на C++ вместе со временем вызова "find" на моей машине работает 1-2 секунды. Версия на Erlang'e работает ~20 секунд. Плохо? Смотря как посмотреть. Если анализ каждого файла будет более длительным (то есть программа будет основное время тратить на анализ файла, а не обход каталогов), то тут уже не совсем очевидно, какое из решений будет более практично при увеличении числа файлов и сложности анализа.

Я новичок в Эрланге, поэтому будут признателен за критику кода.

Посты по теме:

суббота, 26 ноября 2011 г.

Улучшенный TCP/IP proxy на Erlang'e

Писал я про мое освоение Эрганга через написание программы для перехвата и удобного логирования TCP/IP соединений.

B итоге я окончательно допилил программу, и теперь она заменила мне версию на Питоне.

Что программа умеет особенно удобного (как мне кажется):

  • удобный вид лога, в котором отображается шестнадцатеричный дамп, и символьного представление для видимых кодов
  • в дампе отображается номер соединения (в случае смешивания выводов нескольких параллельных соединений)
  • для каждого соединения вычисляется длительность
  • ведутся дополнительные двоичные логи для каждой из сторон в соединении (для повторного "проигрывания" данных)

Про Эрланг. Меня начинает реально вставлять. Я почувствовал (для многих это и не новость), что тут можно написать что-то реальное, особенно связаное с сетью и многозадачностью.

Из насущных проблем:

  • Пока нет чувства разумного дробления на модули и даже функции. При общей тотальной иммутабельности сложно что-то напортачить, но когда количество функций разрастается, хочется их как-то группировать.
  • Нет чувства правильного форматирования кода. Вроде как 80-ти символьные строки и пробелы вместо табуляций меня пока никогда не подводили, но при функциональном стиле кода часто получаются длинные "лесенки".

Пузомерка. Я сделал тест на прокач шестидесятимегового файла через питоновскую и эрланговскую версию. Результаты интересные.

Кач напрямую:

curl http://www.erlang.org/download/otp_src_R14B04.tar.gz >direct

Через Питон:

Window 1: python pyspy.py -l 50000 -a www.erlang.org -p 80 -L log

Window 2: curl http://localhost:50000/download/otp_src_R14B04.tar.gz >via-proxy-python

Через Эрланг:

Window 1: escript tcp_proxy.erl 50000 www.erlang.org 80

Window 2: curl http://localhost:50000/download/otp_src_R14B04.tar.gz >via-proxy-erlang

Файл напрямую качается, условно, минуту. Питоновская версия прокачала файл за шесть минут при включенном логе на экран и файл. Причем сброс лога и непосредственно прокач заканчивались приблизительно в одно время (данные задачи выполняются параллельно, общаясь через очередь, и технически не обязаны завершаться одновременно, так как очередь надо выгрести).

На Эрланге картина иная. Файл прокачался практически за то же время, что и напрямую! Но вот полного сброса лога я так и дождался. Через шесть минут он успел сбросить где-то 10% лога.

Выводы: Видимо, поведение питоновской версии обусловлено тем, что поток лога и потока-качалка работаются примерно с одной скоростью, поэтому в среднем очеред обмена постоянно выгребается. Фактически, скорость программы ограничена пропускной способностью потока логирования, но так как визуально не видно, что поток качания заканчиватся значительно раньше, то можно предположить, что он работается примерно с такой же скоростью (напомню, ~6 минут).

На Эрланге же качалка работает, как мне показалось, очень быстро. Данные перекачиваются и параллельно загоняются в очередь на логирование. А вот производительность логирования оставляет желать лучшего. Ради эксперимента я закомментировал вызов функции создания шестнадцатеричного дампа, и время сброса лога также упало до минуты. Поэтому, как мне кажется, корень зла в моей кривой работе со строками и списками при создании дампа (возможно что-то где-то постоянно копируются, а в мире рекурсии и изменения данных только через копирование ошибки подобного рода дорого отражаются на производильности). А вот работа с сокетами и посылкой/приемом сообщений между потоками в Эрланге очень эффективная.

Я вообще заметил, что в Эрланге ты подсознательно начинашь писать многопотоковые программы. Например, тут в принципе нет глобальных объектов. И допустим, у тебя есть флаг, глобальная установка, которую хочется иметь везде. Так как глобально ее объявить нельзя, приходится таскать как параметр функций там и сям. А как вариант "навязанного конструктивного мышления", думаешь - а давай-ка я запущу этот кусок как поток и буду вызывать его функционал через посылку сообщений. В этом случае я могу передать мне нужный параметр один раз в начале при создании потока, тем самым сделав его типа глобальным для этого потока.

Наверное пример вышел немного скомканным, но общая идея такова - так как ты обязан передавать в функцию все ее параметры каждый раз, то начинаешь думать об максимальной независимости и дроблении функционала, что как следствие, позволяет запускать их в разных потоках.

Для интересующихся - исходник доступен.

Посты по теме:

вторник, 22 ноября 2011 г.

TCP/IP proxy на Erlang'e

По мотивам недавнего поста про изучение новых языков, я таки добил версию на Erlang'е. Если тут есть спецы по языку, буду признателен за критику.

Программа по функциям идентична версии на Питоне за исключением отсутствия дублирования лога в файл и продвинутого разбора флагов командной строки.

И так: программа многопоточна, и журналирование также происходит в отдельном потоке для обеспечения целостности многострочных дампов.

Про Эрланг. После многократных и пока полностью неуспешных заходов на Хаскелл и после все еще неудачных попыток на Lisp или Scheme написать что-то более менее реальное и жизненное, Эрланг был реальным прорывом для меня.

Удивительно, невозможность изменять переменные (представьте, что программируя на С++ надо все переменные делать const) является фантастическим способом борьбы с опечатками при cut-and-paste. Также когда делаешь циклы через хвостовую рекурсию, сразу осознаешь, как эффективно работать со списками, чтобы их не копировать, а всегда "таскать" за хвост или голову.

Ну а концепция легких потоков и обмена сообщениями между ними (как в Go), приправленная глобальной иммутабельностью, позволяет легко писать надежные многотопочные программы.

Например, истересен способ реализации многопотокового TCP/IP сервера. Обычно при его программировании есть распростраенный прием: один главный поток, принимающий соединения, и когда соединение принято, создается новый поток-исполнитель, который обрабатывает соединение и после этого умирает.

В Эрланге можно сделать иначе (функция acceptor()). Поток, ожидающий входящего соединения, после его получения рождает свой клон для ожидания следующего соединения и затем сам обрабабатывает запрос.

Для меня это было немного необычно.

-module(tcp_proxy).

-define(WIDTH, 16).

main([ListenPort, RemoteHost, RemotePort]) ->
    ListenPortN = list_to_integer(ListenPort),
    start(ListenPortN, RemoteHost, list_to_integer(RemotePort));

main(_) -> usage().

usage() ->
    io:format("~ntcp_proxy.erl local_port remote_port remote_host~n~n", []),
    io:format("Example:~n~n", []),
    io:format("tcp_proxy.erl 50000 google.com 80~n~n", []).

start(ListenPort, CalleeHost, CalleePort) ->
    io:format("Start listening on port ~p and forwarding data to ~s:~p~n",
              [ListenPort, CalleeHost, CalleePort]),
    {ok, ListenSocket} = gen_tcp:listen(ListenPort, [binary, {packet, 0},
                                                    {reuseaddr, true},
                                                    {active, true}]),
    io:format("Listener socket is started ~s~n", [socket_info(ListenSocket)]),
    spawn(fun() -> acceptor(ListenSocket, CalleeHost, CalleePort, 0) end),
    register(logger, spawn(fun() -> logger() end)),
    wait().

% Infinine loop to make sure that the main thread doesn't exit.
wait() -> receive _ -> true end, wait().

format_socket_info(Info) ->
    {ok, {{A, B, C, D}, Port}} = Info,
    lists:flatten(io_lib:format("~p.~p.~p.~p:~p", [A, B, C, D, Port])).

peer_info(Socket) -> format_socket_info(inet:peername(Socket)).

socket_info(Socket) -> format_socket_info(inet:sockname(Socket)).

acceptor(ListenSocket, RemoteHost, RemotePort, ConnN) ->
    case gen_tcp:accept(ListenSocket) of
      {ok, LocalSocket} ->
          spawn(fun() -> acceptor(ListenSocket, RemoteHost, RemotePort, ConnN + 1) end),
          LocalInfo = peer_info(LocalSocket),
          logger ! {message, "~4.16.0B: Incoming connection from ~s~n", [ConnN, LocalInfo]},
          case gen_tcp:connect(RemoteHost, RemotePort, [binary, {packet, 0}]) of
            {ok, RemoteSocket} ->
              RemoteInfo = peer_info(RemoteSocket),
              logger ! {message, "~4.16.0B: Connected to ~s~n", [ConnN, RemoteInfo]},
              exchange_data(LocalSocket, RemoteSocket, LocalInfo, RemoteInfo, ConnN, 0),
              logger ! {message, "~4.16.0B: Finished~n", [ConnN]};
            {error, Reason} ->
              logger ! {message, "~4.16.0B: Unable to connect to ~s:~s (error: ~p)~n",
                       [ConnN, RemoteHost, RemotePort, Reason]}
          end;
      {error, Reason} ->
          logger ! {message, "Socket accept error '~w'~n", [Reason]}
    end.

exchange_data(LocalSocket, RemoteSocket, LocalInfo, RemoteInfo, ConnN, PacketN) ->
    receive
        {tcp, RemoteSocket, Bin} ->
            logger ! {received, ConnN, Bin, RemoteInfo, PacketN},
            gen_tcp:send(LocalSocket, Bin),
            logger ! {sent, ConnN, LocalInfo, PacketN},
            exchange_data(LocalSocket, RemoteSocket, LocalInfo, RemoteInfo, ConnN, PacketN + 1);
        {tcp, LocalSocket, Bin} ->
            logger ! {received, ConnN, Bin, LocalInfo, PacketN},
            gen_tcp:send(RemoteSocket, Bin),
            logger ! {sent, ConnN, RemoteInfo, PacketN},
            exchange_data(LocalSocket, RemoteSocket, LocalInfo, RemoteInfo, ConnN, PacketN + 1);
        {tcp_closed, RemoteSocket} ->
            logger ! {message, "~4.16.0B: Disconnected from ~s~n", [ConnN, RemoteInfo]};
        {tcp_closed, LocalSocket} ->
            logger ! {message, "~4.16.0B: Disconnected from ~s~n", [ConnN, LocalInfo]}
    end.

logger() ->
    receive
        {received, Pid, Msg, From, PacketN} ->
            io:format("~4.16.0B: Received (#~p) ~p byte(s) from ~s~n",
                      [Pid, PacketN, byte_size(Msg), From]),
            dump_bin(Pid, Msg),
            logger();
        {sent, Pid, ToSocket, PacketN} ->
            io:format("~4.16.0B: Sent (#~p) to ~s~n", [Pid, PacketN, ToSocket]),
            logger();
        {message, Format, Args} ->
            io:format(Format, Args),
            logger()
    end.

dump_list(Prefix, L, Offset) ->
    {H, T} = lists:split(lists:min([?WIDTH, length(L)]), L),
    io:format("~4.16.0B: ", [Prefix]),
    io:format("~4.16.0B: ", [Offset]),
    io:format("~-*s| ", [?WIDTH * 3, dump_numbers(H)]),
    io:format("~-*s", [?WIDTH, dump_chars(H)]),
    io:format("~n", []),
    if length(T) > 0 -> dump_list(Prefix, T, Offset + 16); true -> [] end.

dump_numbers(L) when (is_list(L)) ->
    lists:flatten([io_lib:format("~2.16.0B ", [X]) || X <- L]).

dump_chars(L) ->
    lists:map(fun(X) ->
                if X >= 32 andalso X < 128 -> X;
                   true -> $.
                end
              end, L).

dump_bin(Prefix, Bin) ->
    dump_list(Prefix, binary_to_list(Bin), 0).

В работе может выводить примерно следующее:

alexander:erlang/>./tcp_proxy.sh 50000 pop.yandex.ru 110
Start listening on port 50000 and forwarding data to pop.yandex.ru:110
Listener socket is started 0.0.0.0:50000
0000: Incoming connection from 127.0.0.1:51402
0000: Connected to 213.180.204.37:110
0000: Received (#0) 38 byte(s) from 213.180.204.37:110
0000: 0000: 2B 4F 4B 20 50 4F 50 20 59 61 21 20 76 31 2E 30 | +OK POP Ya! v1.0
0000: 0010: 2E 30 6E 61 40 32 35 20 67 55 62 44 54 51 64 5A | .0na@25 gUbDTQdZ
0000: 0020: 6D 6D 49 31 0D 0A                               | mmI1..
0000: Sent (#0) to 127.0.0.1:51402
0000: Received (#1) 11 byte(s) from 127.0.0.1:51402
0000: 0000: 55 53 45 52 20 74 65 73 74 0D 0A                | USER test..
0000: Sent (#1) to 213.180.204.37:110
0000: Received (#2) 23 byte(s) from 213.180.204.37:110
0000: 0000: 2B 4F 4B 20 70 61 73 73 77 6F 72 64 2C 20 70 6C | +OK password, pl
0000: 0010: 65 61 73 65 2E 0D 0A                            | ease...
0000: Sent (#2) to 127.0.0.1:51402
0000: Received (#3) 11 byte(s) from 127.0.0.1:51402
0000: 0000: 50 41 53 53 20 70 61 73 73 0D 0A                | PASS pass..
0000: Sent (#3) to 213.180.204.37:110
0000: Received (#4) 72 byte(s) from 213.180.204.37:110
0000: 0000: 2D 45 52 52 20 5B 41 55 54 48 5D 20 6C 6F 67 69 | -ERR [AUTH] logi
0000: 0010: 6E 20 66 61 69 6C 75 72 65 20 6F 72 20 50 4F 50 | n failure or POP
0000: 0020: 33 20 64 69 73 61 62 6C 65 64 2C 20 74 72 79 20 | 3 disabled, try
0000: 0030: 6C 61 74 65 72 2E 20 73 63 3D 67 55 62 44 54 51 | later. sc=gUbDTQ
0000: 0040: 64 5A 6D 6D 49 31 0D 0A                         | dZmmI1..
0000: Sent (#4) to 127.0.0.1:51402
0000: Disconnected from 213.180.204.37:110
0000: Finished
0001: Incoming connection from 127.0.0.1:51405
0001: Connected to 213.180.204.37:110
0001: Received (#0) 38 byte(s) from 213.180.204.37:110
0001: 0000: 2B 4F 4B 20 50 4F 50 20 59 61 21 20 76 31 2E 30 | +OK POP Ya! v1.0
0001: 0010: 2E 30 6E 61 40 33 30 20 70 55 62 41 72 52 33 74 | .0na@30 pUbArR3t
0001: 0020: 6A 65 41 31 0D 0A                               | jeA1..
0001: Sent (#0) to 127.0.0.1:51405
0001: Received (#1) 6 byte(s) from 127.0.0.1:51405
0001: 0000: 51 55 49 54 0D 0A                               | QUIT..
0001: Sent (#1) to 213.180.204.37:110
0001: Received (#2) 20 byte(s) from 213.180.204.37:110
0001: 0000: 2B 4F 4B 20 73 68 75 74 74 69 6E 67 20 64 6F 77 | +OK shutting dow
0001: 0010: 6E 2E 0D 0A                                     | n...
0001: Sent (#2) to 127.0.0.1:51405
0001: Disconnected from 213.180.204.37:110
0001: Finished

Вывод: Эрланг - прекрасный вариант для начала функциональной карьеры.

Посты по теме: