Написание заданий ETL на чистом C#

ОГЛАВЛЕНИЕ

Если вы любите писать на C# и не любите использовать DTS/SSIS для создания заданий ETL (или общую идею щелканья по конструктору с целью сделать работу), то эта статья для вас.

•    Скачать исходники - 3.57 Мб

Введение

Если вы не любите писать на C# и/или не против создания заданий ETL посредством конструктора и считаете, что конструктор включает в себя все нужные вам сценарии, то эта статья не для вас.

Пример процесса в данной статье имеет 4 операции. Две – для чтения из разных источников пользователя, одна – для их объединения и одна –  для записи объединенных данных:

public class MainProcess : EtlProcess
{
    protected override void Initialize()
    {
        Register(new JoinUserRecords()
            .Left(new UserNameRead(Settings.Default.NamesFile))
            .Right(new UserAddressRead(Settings.Default.AddressesFile))
        );

        Register(new UserFullWrite(Settings.Default.OutputFile));
    }
}

Файл имен пользователей:

Id Name
1 Bob
2 John
3 Frank

Файл адресов:

Id Address
1 123 Main St.
2 42 Everywhich way
3 1 Microsoft way

Выходной файл из процесса:

Id Name Address
1 Bob 123 Main St.
2 John 42 Everywhich way
3 Frank 1 Microsoft way

Если, смотря на вышесказанное, вы не понимаете, что делается, то этот метод не будет интересен вам. Но если вам ясен смысл того, что делает вышеназванный процесс, и легче понять его, чем щелкать по куче диалоговых окон, то вам наверняка очень понравится Rhino ETL.

ETL

ETL означает – извлечь, преобразовать, загрузить. Например, вы получаете файлы или другие данные от поставщиков или других третьих сторон, которые надо каким-то образом обработать и затем вставить в базу данных.

Получение Rhino ETL

Сейчас нелегко получить рабочую копию Rhino ETL, поэтому это и было написано. Это стимулирует интерес к нему и приведет к добавлению дополнительных возможностей и улучшений, более частым выходам версий и т.д. На многих сайтах люди пытаются выяснить, как начать, и для этого им приходится скачивать и компилировать исходники инструментов Rhino. Данная статья даст вам то, что поможет быстро и просто скачать и посмотреть, как это работает и подойдет ли вам. Включенные сюда DLL(динамически подключаемые библиотеки) скомпилированы в режиме отладки. В итоге вы поймете, как скачать и скомпилировать исходники раньше, чем появятся стандартные версии и загрузки для него.

Чтобы скачать и скомпилировать Rhino ETL, вам потребуется:

Получить TortoiseSVN или любой другой клиент SVN(система управления версиями): Скачать исходники инструментов Rhino из http://rhino-tools.svn.sourceforge.net/svnroot/rhino-tools. Скомпилировать решение в подкаталоге ETL – недостаточно просто открыть решение и собрать его, надо заставить работать правильно сборку Nant. Чтобы избежать этого, были удалены все ссылочные файлы AssemblyInfo.cs, и ссылки были изменены, чтобы можно было просто собирать проекты, необходимые для запуска тестов ETL.

Использование Rhino ETL

Изучение блочных тестов в проекте ETL позволило понять суть работы Rhino ETL. Они помогают понять тонкости всех доступных опций. Документация отсутствует, поэтому указанный способ, позволяющий понять принципы его работы, единственный. При возникновении проблемы найдите похожий блочный тест и пройдите код по шагам. Так решается большинство появляющихся трудностей.

Объяснение мягкого возврата

Rhino ETL широко использует мягкий возврат, понимание которого необходимо для создания сложных заданий ETL. Здесь объясняется, что надо знать для работы с ним.

Если имеется класс, генерирующий числа до максимально целого значения таким образом:

private class NumberGenerator
{
    public int NumbersGenerated { get; set; }
   
    public IEnumerable<int> GenerateNumbers()
    {
        for (int i = 1; i < int.MaxValue; i++)
        {
            Console.WriteLine("NumberGenerator generated a number");
            NumbersGenerated++;
            yield return i;
        }
    }
}

то сгенерируются только те числа, по которым прошел цикл. Например, следующий тест подтверждает, что цикл не проходит по всему набору целых чисел от 1 до MaxValue, если результаты вообще не перечисляются:

[TestMethod]
public void UnenumeratedDoesNoWork()
{
    var generator = new NumberGenerator();

    generator.GenerateNumbers();

    Assert.AreEqual(0, generator.NumbersGenerated);
}

Аналогично, тест ниже подтверждает, что перечисление только первых пяти элементов генерирует 5 чисел, вместо генерации всех чисел и взятия первых пяти. Работа выполняется лишь в точности для того, что используется, и не более того.

[TestMethod]
public void GenerateFiveNumbers()
{
    var generator = new NumberGenerator();
   
    foreach (int number in generator.GenerateNumbers())
    {
        Console.WriteLine(number);

        if (number == 5) break;
    }

    Assert.AreEqual(5, generator.NumbersGenerated);
}

Последний и более интересный пример, соединяющий перечислители так, что результаты одного используются для другого.

private class ChainedNumberGenerator
{
    public int NumbersGenerated { get; set; }

    public IEnumerable<int> GenerateNumbers(IEnumerable<int> inNumbers)
    {
        foreach(int i in inNumbers)
        {
            Console.WriteLine("ChainedNumberGenerator generated a number");
            NumbersGenerated++;
            yield return i;
        }
    }
}

[TestMethod]
public void ChainedNumberGeneratorsAreDependentOnEachOther()
{
    var firstGenerator = new NumberGenerator();
    var lastGenerator = new ChainedNumberGenerator();

    foreach (int number in
      lastGenerator.GenerateNumbers(firstGenerator.GenerateNumbers()))
    {
        Console.WriteLine(number);
        if (number == 5) break;
    }

    Assert.AreEqual(5, lastGenerator.NumbersGenerated);
    Assert.AreEqual(5, firstGenerator.NumbersGenerated);
}

Этот код выводит следующее:

NumberGenerator generated a number
ChainedNumberGenerator generated a number
1

NumberGenerator generated a number
ChainedNumberGenerator generated a number
2

NumberGenerator generated a number
ChainedNumberGenerator generated a number
3

NumberGenerator generated a number
ChainedNumberGenerator generated a number
4

NumberGenerator generated a number
ChainedNumberGenerator generated a number
5

Если вышеуказанное непонятно –  изучите мягкий возврат, прежде чем идти дальше. Если пример понятен –  приступайте к использованию Rhino ETL.

Конвейер

Важно понимать мягкий возврат, потому что именно так строки отправляются от одной операции к другой в Rhino ETL. Если имеется процесс с операцией чтения, и процесс после него что-то делает с этими строками, то ожидается, что все строки  будут прочитаны, а затем запустится следующая операция.
Но на самом деле произойдет следующее (в зависимости от того, как итерируются строки): первая операция обработает одну строку, затем вторая операция обработает эту же самую строку, и т.д. на протяжении всего конвейера, который повторится для второй, третьей и т.д., строк. Аналогично, если бы во второй операции строки вообще не итерировались, первая не сделала бы вообще никакой работы.

Это вызывает путаницу, но дает повышение производительности – вместо многократного прохода в цикле по входным строкам цикл выполняется один раз, и выполняются все нужные действия.

Если надо обработать все строки на первом шаге и затем перейти ко второму, это возможно путем потребления (итерирования) всех строк в начале следующего процесса, прежде чем он выдаст собственные возвращаемые строки.

Объяснение "словаря кряканья"

Объект строки использует нечто под названием «словарь кряканья», позволяющий легко добавлять/обновлять элементы для строки и извлекать их без проверки на пустоту перед каждым обращением. Например, следующее правильно:

object a = row["SomemadeupstringThatDoesntExist"]
row["Column1"] = "abc";
row["Column1"] = "123";

Пример кода – объединение двух файлов в один

Включенный пример имеет блочные тесты для каждой операции, чтобы можно было подробно изучить каждую из них. Также можно запустить консольное приложение, выполняющее все сразу. Оно считывает два файла, объединяет их по общему свойству (Id) и записывает конечные результаты в другой текстовый файл.

Ниже приведен код главного процесса:

public class MainProcess : EtlProcess
{
    protected override void Initialize()
    {
        Register(new JoinUserRecords()
            .Left(new UserNameRead(Settings.Default.NamesFile))
            .Right(new UserAddressRead(Settings.Default.AddressesFile))
        );

        Register(new UserFullWrite(Settings.Default.OutputFile));
    }
}

Показаны первые две операции:

public class UserNameRead : AbstractOperation
{
    public UserNameRead(string filePath)
    {
        this.filePath = filePath;
    }

    string filePath = null;

    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        using (FileEngine file = FluentFile.For<UserNameRecord>().From(filePath))
        {
            foreach (object obj in file)
            {
                yield return Row.FromObject(obj);
            }
        }
    }
}

public class UserAddressRead : AbstractOperation
{
    public UserAddressRead(string filePath)
    {
        this.filePath = filePath;
    }

    string filePath = null;

    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        using (FileEngine file = FluentFile.For<UserAddressRecord>().From(filePath))
        {
            foreach (object obj in file)
            {
                yield return Row.FromObject(obj);
            }
        }
    }
}

Обратите внимание на использование FileHelpers через Rhino ETL для выполнения ввода-вывода в файл, избавляющего от необходимости разбираться с тем, как разобрать CSV, с файлом с разделителями табуляции и т.д. Он считывает строго типизированные данные C# из файлов, а затем преобразует их в объект Row, используемый Rhino ETL для передачи данных от операции к операции. Итак, эти операции создают объект строки для каждой строки во входных файлах и передают их следующей операции –  объединению:

public class JoinUserRecords : JoinOperation
{
    protected override void SetupJoinConditions()
    {
        InnerJoin
            .Left("Id")
            .Right("Id");
    }

    protected override Row MergeRows(Row leftRow, Row rightRow)
    {
        Row row = new Row();
        row.Copy(leftRow);

        //скопировать все свойства, не входящие в записи пользователя
        row["Address"] = rightRow["Address"];

        return row;
    }
}

Строки объединяются по их общему свойству в методе SetupJoinConditions, а затем надо реализовать еще один абстрактный метод, чтобы указать, что надо делать с объединенными строками. Например, может требоваться всего один столбец из объединенной строки, или могут требоваться все столбцы. Метод MergeRows управляет этим.

Конечный процесс выводит объединенные строки:

public class UserFullWrite : AbstractOperation
{
    public UserFullWrite(string filePath)
    {
        this.filePath = filePath;
    }

    string filePath = null;

    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        FluentFile engine = FluentFile.For<UserFullRecord>();
        engine.HeaderText = "Id\tName\tAddress";
        using (FileEngine file = engine.To(filePath))
        {
            foreach (Row row in rows)
            {
                file.Write(row.ToObject<UserFullRecord>());

                //при необходимости передать строки другой следующей операции
                yield return row;
            }
        }
    }
}

Снова используются FileHelpers через вспомогательные методы/классы Rhino ETL.

Напоследок

Здесь были показаны только операции с файлами, потому что их было проще распространить и заставить работать на машинах других людей. SQL, разумеется, поддерживается, и есть много операций для считывания таблицы и автоматического заполнения строк сразу. Более подробные данные о них смотрите в блочных тестах.

Настоящая мощность проявляется при выполнении более сложных задач, например, вызов веб-службы, использование объектов COM, организация поточной обработки и т.д., в задании ETL. Теперь можно использовать имеющийся набор навыков и опыт для выполнения указанных задач, а не пытаться придумать обходные пути и приемы в продукте с конструктором.