npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

dwh-service

v1.0.3

Published

dwh-service

Downloads

15

Readme

dwh-service

Содержание

Описание

dwh-service является монорепой для ELT-скриптов обработки данных и включает в себя:

| ПО | Версия | Описание | | -- | -------| -------- | | Apache Airflow | 2.3.3 | Оркестрация ELT-операций | | dbt | 1.2.0 | (T)ransform-операции в SQL-нотации | | Spark, PySpark | 3.3.0 | (E)xtract- и (L)oad-операции | | ADB | 6.21.1 | Хранилище данных | | ADQM | 22.3.7 | Витрины данных |

Дополнительно в состав проекта включены конфиги docker для развёртывания локального окружения, эмулирующего целевое. Подробную информацию о локальном стенде, разворачиваемом из docker-образов, см. в описании.

Структура проекта

dwh-service
├───airflow
├───dbt
├───docker
├───spark
├───sql_scripts
└───README.md

Сквозной процесс работы с одной сущностью в проекте

Для примера возьмём таблицу statement.decision в БД ТСР.

Необходимо загрузить таблицу в БД ХОАД, сделать необходимые преобразования, протащить её до схемы dwh_core и сделать витрину, зависящую от этой таблицы.

Если таблица уже добавлена в конфиг и нужно только обновить в ней данные, то достаточно выполнить только пункты 4 и 5.

1. Добавление/обновление метаданных таблицы в конфигурационной БД

Инструмент - локальная консоль (python).

Используем скрипт spark_development/get_db_structure.py. Вызываем его локально следующим образом:

python get_db_structure.py --db "ид БД ТСР в dwh_stg_config.load_data_config_sources" --sch "схема для загрузки (statement)" --meta_db "DEV или TEST"

ВАЖНО! Если БД источник - HBase, то для корректной работы необходимо установить Java (JDK), а также положить драйвер для подключения к HBase (phoenix-5.0.0-HBase-2.0-thin-client.jar) в папку spark/packages (предварительно папку надо создать)

скрипт работает только на python 3.10. Если установлена более ранняя версия python, то можно попросить загрузить кого-нибудь из коллег, у кого установлен python 3.10

В результате выполнения скрипта в таблице dwh_stg_config.source_db_structure добавятся/обновятся данные о всех таблицах в схеме statement в БД ТСР.

2. Добавление таблицы в конфигурационную таблицу

Инструмент - средство для работы с БД (например, DBeaver) с подключением к конфигурационной БД.

Используемые функции:

  1. dwh_stg_config.get_config_from_source - просмотр конфигурационных данных
  2. dwh_stg_config.insert_jdbc_config_from_source - добавление конфигурации JDBC
  3. dwh_stg_config.update_jdbc_config_from_source - обновление существующей конфигурации JDBC
  4. dwh_stg_config.insert_kafka_config_from_source - добавление новых агрегатов Kafka
  5. dwh_stg_config.update_kafka_config_from_source - обновление агрегатов Kafka
  6. dwh_stg_config.insert_full_config_from_source - добавление конфигурации JDBC и агрегата Kafka вместе
  7. dwh_stg_config.update_full_config_from_source - обновление конфигурации JDBC и добавление/обновление агрегата Kafka вместе

Для работы в данном случае нужны только insert_full_config_from_source и update_full_config_from_source. Остальные функции используются внутри этих двух.

Примеры соответствующих вызовов функций:

SELECT *
FROM dwh_stg_config.source_db_structure str
cross JOIN json_to_record (
    dwh_stg_config.get_config_from_source(
        _db_id := str.source_db,
        _schema_name := str.schema_name,
        _table_name := str.table_name,
        _aggregate_name := null,
        _result_table_name := null
        )
) as (
    app_name                text,
    kafka_config_id         int,
    load_mode               text,
    aggregate_name          text,
    source_from_id          int,
    schema_from_name        text,
    table_from_name         text,
    select_text             text,
    filter_col              text,
    default_filter_val      text,
    source_to_id            int,
    src_jdbc_table_name     text,
    src_kafka_table_name    text,
    json_fields             text,
    table_columns_json      text,
    dbt_generate_jdbc_stg   text,
    dbt_generate_kafka_stg  text,
    dbt_generate_kafka_test text,
    dbt_generate_core       text
)
WHERE str.table_name = 'decision'
  and str.schema_name = 'statement'
  and str.source_db = 8
do
$$                    
declare
    _config_id int;
begin
    _config_id := dwh_stg_config.insert_jdbc_config_from_source(
                        _db_id 				:= 8,
                        _schema_name 		:= 'statement',
                        _table_name 		:= 'decision',
                        _aggregate_name     := null,
                        _result_table_name 	:= 'statement__decision');

    raise notice '%', _config_id::text;

end;
$$;
do
$$                    
declare
    _config_id int;
    _table_name text;
    _aggregate_name text;
begin
    select j.config_id,
            case
                when position( '_jdbc__' in j.table_name_to) = 0
                then null
                else substring(j.table_name_to from position( '_jdbc__' in j.table_name_to)+7)
            end as table_name,
            a.aggregate_name
    into _config_id, _table_name, _aggregate_name
    from dwh_stg_config.load_data_config_jdbc j
    left join dwh_stg_config.load_kafka_aggregate_tables a
        on j.table_name_to = a.jdbc_table_name
        and a.active_record = 1
    where j.config_id = 3533; --ид обновляемой записи


    _config_id := dwh_stg_config.update_jdbc_config_from_source(
                        _config_id 			:= _config_id,
                        _aggregate_name     := _aggregate_name,
                        _result_table_name 	:= _table_name
    );
    raise notice '%', _config_id::text;

end;
$$;	 
do
$$                    
declare
    _aggregate_table_id int;
begin
    _aggregate_table_id := dwh_stg_config.insert_kafka_config_from_source(
                        _config_id_jdbc := 1600,
                        _aggregate_name := 'this.is.aggregate.name');

    raise notice '%', _aggregate_table_id::text;

end;
$$;
do
$$                    
declare
    _aggregate_table_id int;
begin
    _aggregate_table_id := dwh_stg_config.update_kafka_config_from_source(
                        _aggregate_table_id := 1600,
                        _jdbc_config_id := null,
                        _aggregate_name := null,
                        _result_table_name := null
    );

    raise notice '%', _aggregate_table_id::text;

end;
$$;
do
$$                    
declare
    res text;
begin
    res := dwh_stg_config.insert_full_config_from_source(
                        _db_id 				:= 8,
                        _schema_name 		:= 'statement',
                        _table_name 		:= 'person',
                        _aggregate_name     := 'ru.gov.pfr.ecp.apso.tsr.statement.pojo.Person',
                        _result_table_name 	:= 'stm__pers__tt');

    raise notice '%', res;

end;
$$;
do
$$                    
declare
    res text;
begin
    res := dwh_stg_config.update_full_config_from_source(
                        _config_id 			:= 3627,
                        _aggregate_name 	:= null,
                        _result_table_name 	:= null
    );

    raise notice '%', res;

end;
$$;  

Для каждой таблицы очень желательно сразу указать агрегат (атрибут _aggregate_name в функциях).

  1. Если таблица НЕ НСИ, то можно воспользоваться запросом в БД Greenplum вида:
SELECT "aggregateName", count(*)
FROM dwh_src.src_kafka__iw_multipart
where lower("aggregateName") like '%tsr%'
group by "aggregateName"
order by 1, 2

Если в entityName или aggregateName есть соответствующий агрегат, то необходимо привязать его к созданному конфигу JDBC. Если есть сомнения по поводу корректности агрегата, можно проверить набор его полей запросами:

select value_json
from dwh_src.src_kafka__iw_multipart
where "aggregateName" = 'ru.gov.pfr.ecp.rs.rs_fl_ip.pojo.GpDogovor'

Набор полей должен соотноситься с набором полей в таблице-источнике.

  1. Если таблица НСИ, то название агрегата = названию таблицы в CamelCase. Название можно найти, выполнив в БД НСИ запрос вида:
select * 
from wso.dict
where lower(name) like '%table%'

Для версионных справочников НСИ, в которых имеются несколько записей по одному code с различными периодами действия startdate / enddate. При добавлении/обновлении подобных справочников при запуске функций dwh_stg_config.update_full_config_from_source или dwh_stg_config.insert_full_config_from_source необходимо передавать параметр _id_field со значением 'code'. В остальных случаях передавать параметр не нужно. Пример соответствующих вызовов функций:

do
$$                    
declare
    res text;
begin
    res := dwh_stg_config.insert_full_config_from_source(
                        _db_id 				:= 3,
                        _schema_name 		:= 'wso',
                        _table_name 		:= 'dictofficemse',
                        _aggregate_name     := 'DictOfficeMse',
                        _result_table_name 	:= 'dictofficemse',
                        _id_field           := 'code');

    raise notice '%', res;

end;
$$;
do
$$                    
declare
    res text;
begin
    res := dwh_stg_config.update_full_config_from_source(
                        _config_id 			:= 3580,
                        _aggregate_name 	:= 'DictOfficeMse',
                        _result_table_name 	:= 'dictofficemse',
                        _id_field           := 'code'
    );

    raise notice '%', res;

end;
$$;  

После добавления/обновления конфига необходимо проверить корректность данных в dwh_stg_config.load_data_config_jdbc и dwh_stg_config.load_kafka_aggregate_tables.

ВАЖНО! По умолчанию конфиг добавляется с флагом active = 0. Для того, чтобы он попал в загрузку, необходимо изменить флаг на 1.

3. Создание модели в dbt для транформации в stg и core

Инструменты:

  • средство для работы с БД (например, DBeaver) с подключением к конфигурационной БД для просмотра добавленых ранее конфигов
  • локальная консоль (python) для выполнения скриптов
  • средство просмотра файлов в файловой системе (проводник, VSCode)

Если агрегат был добавлен, то в dwh_src необходимо создать также таблицу с данными kafka. Есть два варианта сделать это:

  1. Для НЕ НСИ: Запустить DAG HOAD_..._KAFKA_load_config_iw_system с параметрами {"offsets": "{'0': -2}", "aggregate_name": "aggr.name"} - указать название добавляемого агрегата (не забыть перед этим запустить DAG HOAD_..._update_config_file).
  2. Для НСИ, либо если для НЕ НСИ по первому варианту таблица не создалась: использовать запрос sql_scripts/old/114_select_to_create_kafka_table.sql. Запрос необходиимо выполнить в конфигурационной БД. В последнем столбце будет SQL-код для создания таблицы. Его необходимо выполнить в БД Greenplum.

На предыдущем шаге в конфигурационную таблицу в том числе добавляются вызовы скриптов генерации моделей dbt.

  1. Модель jdbc stg.

Для создания модели открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_data_config_jdbc.dbt_to_stg. В результате выполнения команды в папке dbt/models/stg/jdbc/generated_views будет создана необходимая модель.

  1. Модель kafka stg.

Если агрегат неизвестен и не был указан в конфигурации, то этот шаг пропускается.

Для создания модели открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_kafka_aggregate_tables.dbt_to_stg. В результате выполнения команды в папке dbt/models/stg/kafka/generated_views будет создана необходимая модель.

2.1. Тест данных kafka.

Если агрегат неизвестен и не был указан в конфигурации, то этот шаг пропускается.

Для создания теста открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_kafka_aggregate_tables.dbt_test. В результате выполнения команды в папке dbt/tests/stg/kafka/generated_tests будет создан необходимый тест.

Описание генератора тестов находится в dbt/tests/stg/README.md.

  1. Модель core.

Для создания модели открыть в консоли папку dbt/py_scripts, выполнить команду из dwh_stg_config.load_data_config_jdbc.dbt_to_core. В результате выполнения команды в папке dbt/models/core/generated_tables будет создана необходимая модель.

  1. Проверить созданные модели и тесты и, если всё ок, перенести их в соответствующие папки модуля, к которому они относятся.

4.1. В случае, если в модели необходимо создать индексы на определенные поля, которые могут потребоваться в витринах при соединении таблиц или в фильтрах, необходимо дополнить поле post_hook следующей конструкцией - {{create_index(<schema_name>, <table_name>, <column_name>, <_number>)}}, где

  • <schema_name> - название схемы, в которой распологается модель
  • <table_name> - название модели
  • <column_name> - название индексируемой колонки(можно создать составной индекс, указав колонки через запятую - 'column1,column2,column3', при этом пробелы использовать нельзя, т.к. эта переменная так же включается в название)
  • <_number> - порядковый номер индекса, начинается с 0 (по канону)

Создаются индексы только типа B-дерево

При использовании нескольких макросов в post_hook необходимо разделять их точкой с запятой (;) - post_hook: "{{macros1()}} ; {{macros2()}}"

ВАЖНО! Созданные модели являются только шаблонами, при необходимости в них можно вносить изменения.

4. Загрузка таблицы JDBC

Инструменты - Airflow

  1. Запустить DAG HOAD_<среда>_update_config_file для загрузки актуальной конфигурации из БД в конфигурационный файл dag_utils/config_file_data.py
  2. DAG'и в настоящий момент распределяются по модулям. Например, в DAG HOAD_<среда>_JDBC_load_TSR попадают конфиги, app_name которых начинается на "tsr". Если таблица относится к модулю, для которого уже создан DAG, то необходимо открыть этот DAG и посмотреть, что соответствующие таски с указанной таблицей появились. Если такого DAG нет, его необходимо создать по аналогии с ранее созданными DAG.
  3. Запустить выбранный DAG, либо дождаться его выполнения, если для него установлено расписание, проверить результат его выполнения.
  4. Ожидаемый результат DAG в целевой БД:
    1. Должна создаться/обновиться таблица, указанная в dwh_stg_config.load_data_config_jdbc.table_name_to
    2. Должны создаться/обновиться объекты, созданные в dbt (вью в схеме dwh_stg, таблица в схеме dwh_core). ВАЖНО! На данном этапе п.2 не выполнится, т.к. модели dbt ещё не загружены на стенд Airflow. Для создания объектов в БД необходимо локально запустить dbt командой dbt run --select tag:название_тега_сущности

5. Загрузка таблицы Kafka

Инструменты - Airflow

  1. Запустить DAG HOAD_<среда>_update_config_file для загрузки актуальной конфигурации из БД в конфигурационный файл dag_utils/config_file_data.py.
  2. Если при создании конфигурации был указан агрегат в kafka, то изменения по нему загружаются в рамках DAG HOAD_<среда>_KAFKA_load_config_iw_system.
  3. Запустить указанный DAG, либо дождаться его выполнения, если для него установлено расписание, проверить результат его выполнения.
  4. Ожидаемый результат DAG в целевой БД:
    1. Должна создаться/обновиться таблица, указанная в dwh_stg_config.load_kafka_aggregate_tables.kafka_table_name.
    2. Должны создаться/обновиться объекты, созданные в dbt (вью в схеме dwh_stg, таблица в схеме dwh_core) ВАЖНО! На данном этапе п.2 не выполнится, т.к. модели dbt ещё не загружены на стенд Airflow. Для создания объектов в БД необходимо локально запустить dbt командой dbt run --select tag:название_тега_сущности

6. Создание витрин

Инструменты:

  • средство для работы с БД (например, DBeaver) с подключением к целевой БД
  • локальный редактор файлов (Notepad++, VSCode)

Необходимый README создан в папке dbt/models/marts.

  1. Написать необходимые запросы для формирования витрины, проверить.
  2. Создать модель в dbt в папке dbt/models/marts по аналогии с ранее созданными витринами.
  3. Указать описание витрины в файле dbt/models/marts/dwh_marts.yml