Atomicity — Атомарность
Гарантирует, что каждая транзакция будет выполнена полностью или не будет выполнена совсем.
Consistency — Согласованность
Гарантирует, что транзакция переводит базу данных из одного согласованного состояния в другое.
Isolation — Изолированность
Во время выполнения транзакции параллельные транзакции не должны оказывать влияния на её результат.
Durability — Надёжность
Гарантирует, что после успешного завершения транзакции все изменения, сделанные в рамках этой транзакции, сохраняются в базе данных даже в случае сбоя системы.
Две транзакции меняют один и тот же блок данных параллельно и одно из изменений теряется
Первая транзакция поменяла данные, вторая их прочла, но первая откатилась
Первая транзакция прочла это поле, вторая транзакция изменила это поле, первая транзакция спустя время снова прочла это поле
Как неповторяющееся чтение, но тут не изменили поля, а добавили новые поля из-за которых выборка поменяла результат
SUM() стал другим из-за новых чисел
Блокировка данных, на время выполнения команды изменения.
Помогает предотвратить проблему потерянного обновления
Помогает предовратить грязное чтение
Уровень, при котором читающая транзакция «не видит» изменения данных, которые были ею ранее прочитаны.
Помогает предотвратить неповторяющееся чтение
Результат выполнения нескольких параллельных транзакций должен быть таким, как если бы они выполнялись последовательно.
Помогает предотвратить фантомное чтение
Кластеризованный индекс — это индекс, который сортирует строки с данными в таблице. Он хранит данные в листьях индекса, где все значения отсортированы в определённом порядке — либо по возрастанию, либо по убыванию. Благодаря этому существенно возрастает скорость поиска данных (при условии последовательного доступа к данным).
В таблице может присутствовать только один кластеризованный индекс.
Хранится в одном файле с данными
Физически сортирует строки таблицы по значению индекса
Некластеризованный индекс — индекс, который используется для применения индексов к неключевым столбцам. Главное отличие от кластеризованного индекса заключается в том, что некластеризованный индекс не упорядочивает данные физически. Он хранит данные и индексы в разных местах. Листья некластеризованного индекса содержат только те столбцы таблицы, по которым определён данный индекс. Это означает, что системе запросов необходима дополнительная операция для извлечения требуемых данных. Некластеризованные индексы нельзя отсортировать, в отличие от кластеризованных, однако существует возможность создания более одного некластеризованного индекса.
Хранит ссылки на физическое расположение строк и ключевые значения в таблице.
В таблице может быть много некластерных индексов
Хранится отдельно от данных (в другом файле)
Каждая запись в таблице А, связана с одной и только одной записью в таблице B (уникальность записи)
Каждая запись в таблице А может быть связана с одной или несколькими записями через внешний ключ
Любая запись в таблице А может быть связана с любой записью в таблице B (промежуточная таблица)
FROM
JOIN
WHERE
GROUP BY
HAVING
SELECT
COUNT(*)OVER() – если есть оконная ф-ция
QUALIFY --фильтрация по оконной ф-ции (в таких бд как Clickhouse или Snowflake)
DISTINCT
ORDER BY
OFFSET
LIMIT
B-tree: Самый популярный. Поддерживает >, < =, >=, <=, LIKE ‘abc%’, NULL, IS NULL, логарифмическая сложность O(log n)
Hash: Используется для быстрого поиска по равенству. (когда много операций =)
GIN: Подходит для массивов и полнотекстового поиска.
GiST: Используется для геометрических данных и полнотекстового поиска.
SP-GiST: Для наборов данных, которые подразумевают естественную упорядоченность, например телефонная книга
BRIN: Полезен на огромных наборов данных, которые упорядочены
Оконная ф-ция определяется ключевым словом OVER.
Планировщик будет решать какое сканирование использовать для извлечения данных(если индексов нет, то полное, а если есть то он будет думать)
index scan - Если есть большой набор проиндексированных данных и мы осуществляем поиск по индексу и потенциально в результирующий набор попадает малое кол-во строк(например 1), а в таблице содержится 1000000 строк, то оптимизатор включит index scan.
index only scan - Некоторые индексы(не все), хранят вместе с идентификаторами строк сами проиндексированные значения и это позволяет им просто читать сам индекс без обращения к таблицам и забирать результат прямо из самого индекса. Вместо чтения значения из таблиц при таком сканировании необходимо лишь заглянуть в карту видимости, чтобы выяснить актуальность индексных записей. Потому что карта видимости ставит 1, показывая что можно выполнить такое сканирование и данные актуальные
bitmap scan - надо выбрать кол-во строк не так мало как при index scan, но и не так много как при sequential scan. Например 300к из млн
sequential scan - Даже если поиск идёт по индексированному столбцу, а там куча данных и так забирать надо, то планировщик может наплевать и выбрать последовательное сканирование.
DDL - для работы с объектами БД(такие как таблицы, функции, индексы)
DML - для работы с данными внутри объектов (записи)
DCL - привилегии ролям даём (GRANT - дать, REVOKE - забрать привилегии у роли)
TCL - для контроля любой транзакции (автокоммит по дефолту есть, который выполняет за вас СУБД)
Self JOIN (Таблица соединяется сама с собой)
Одна таблица(например какого-то подразделения) где надо найти у какого начальника какой подчиненный
Right JOIN
Берутся все данные с правой таблицы и объединяются с теми данные которые есть в левой таблице(по ключу). Если есть значения в правой таблице, но по ключу в левой нет к ним данных, то будет NULL писаться
Full JOIN
Полное объединение левой и правой таблицы, если есть не совпадающие значения по ключам, то будут NULL
Inner JOIN
Только пересечение
CROSS JOIN
Каждая строка одной таблицы, совмещается с каждой строкой другой таблицы (декартово произведение).
Например (найти пару самых продаваемых продуктов, cross join + distinct)
Left JOIN
Как Right JOIN но наоборот
Nested loops join (соединения вложенных циклов)
Вложенные циклы. каждая строка из одной таблицы сравнивается с каждой строкой из другой таблицы и соединяются
Merge join (соединение слиянием)
Самые быстрый. Используется когда 2 большие таблицы. Требует чтобы обе таблицы имели индекс по ключу(т.е отсортированы).
Одна строка сравнивается с другой, когда нет совпадения, то переключается на другую строку, которая сравнивается с этой
Если повторяющиеся значения:
Записывает любые повторяющиеся значения из второй таблицы во временную таблицу и выполняет сравнения там. Затем, если эти значения также дублируются в первой таблице, сравнивает их со значениями, которые уже сохранены во временной таблице.
Hash match join (хеш-соединение)
Используется для больших таблиц, когда отсутствуют индексы или когда данные не отсортированы, а условие соединения предполагает точное совпадение
Берем меньшую из таблиц, для каждой строки создаем запись в хэш-таблице. Затем берем большую, сканируем и проверяем каждую строку на совпадение с хэш-таблицей, если совпадают, то соединяем
Есть внешняя таблица, есть внутренняя. Из внешней мы берем одно значение, отправляем через какую-то хэш-функцию и результат этой хэш-функции приходит в какой-то бакет. Потом мы получаем ключ от другой таблицы и проверяем, есть ли этот ключ в бакете, если есть, то попадается в результат
varchar - значение переменной длины(можем ограничивать значение которое мы можем вставлять) [переменные символы, например ФИО]
char - значение фиксированной длины(именно такой длины, не больше, не меньше) [когда определенно ограниченной длины, например номер телефона]
Разница VARCHAR(10) и CHAR(10) в том что CHAR(10) добивает пустые значения до 10 байт. Считывание по CHAR быстрее, т.к. VARCHAR имеет доп информацию о том сколько места занимает это значение
text - сколько хотим, столько и пишем
interval - нужен чтобы к дате можно было прибавить определенный промежуток времени
uuid - позволяет строить уникальный идентификатор вне зависимости от значения(для хэш индексов например)
Оконные функции: вычисляют агрегаты/ранги/сдвиги «по окну» над строками, не схлопывая строки, по OVER (PARTITION BY ... ORDER BY ...).
OVER (PARTITION BY ... ORDER BY ...)
Границы окна: ROWS/RANGE/ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW и т.п.
ROWS
RANGE
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
SUM(x) OVER (PARTITION BY k ORDER BY dt ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
SUM(...) OVER (PARTITION BY k) без ORDER BY — одна сумма на партицию (константа по группе). С ORDER BY (и дефолтным фреймом) — накопительная сумма.
SUM(...) OVER (PARTITION BY k)
DENSE_RANK через ROW_NUMBER:
DENSE_RANK
ROW_NUMBER
WITH ranks AS (
SELECT DISTINCT key_col, ROW_NUMBER() OVER (ORDER BY key_col) AS dense_rnk
FROM t
) SELECT t.*, r.dense_rnk
JOIN ranks r USING (key_col);
В разных БД(Например Clickhouse) кол-во агрегатных ф-ций больше
В RANK пропускаем значение при дубликатов, в отличие от DENCE_RANK
Если нам важна последовательность то DENCE_RANK
ROW_NUMBER просто считает строки последовательно
LAG - колонка сдвинется на 1(или n) вверх
LEAD - колонка сдвинется на 1(или n) вниз
FIRST_VALUE - первое значение из окна таблицы
LAST_VALUE - последнее значение из окна таблицы
Оконная ф-ция определяется ключевым словом OVER. А само окно определяется с помощью PARTITION BY
COUNT по столбцу не учитывает NULL
EXPLAIN - дает план запроса, который БД будет выполнять на основе статистических данных, которые БД сама высчитывает.
EXPLAIN ANALYZE - фактическое время и затраты при выполнении запроса. Если запрос выполняется очень долго, то делать EXPLAIN ANALYZE нет смысла.
VACUUM - проверяет индексы, удаленные строки, неудаленные строки.(DELETE не удаляет строку физически[типо просто помечает галкой], её можно откатить)
ANALYZE - обновляет статистику. Если вы вставили очень много строк, не поленитесь сделать ANALYZE
Сначала посмотрю. Если запрос выполнялся долго то выполню команду EXPLAIN, посмотрю на cost'ы, если они огромные в каком-то шаге, то буду этот запрос переписывать(именно этот промежуток), посмотрю на подзапросы, возможно человек не делал CTE(а сделал 5 одинаковых подзапросов, которые на каждом шаге выполнялся заново и строил временную таблицу).
Я бы выполнил команду ANALYZE, может у меня до этого миллион строк залили, потом удалили через DELETE команду и залили миллиард, а статистика осталась на том миллионе, поэтому я бы выполнил команду ANALYZE или VACUUM ANALYZE
Можно много всего накидывать тут
Откуда берутся cost. Это мнимые стоимости, но на них надо смотреть
В плане запросов может быть написано SPILL и кол-во строк. SPILL file означает, что не хватило у нас оперативной памяти и БД пришлось выгружать какое-то кол-во данных на диск локальный(SSD или HDD) это плохая тенденция. Надо делать так, чтобы их не было.
Все арифметические действия с NULL вернут NULL
Могут спросить (в постгресе нет функции div0, деления на 0) а как сделать так, чтобы деление на 0 возвращало пустое значение, без ошибки.
Надо чтобы делитель был = NULL. Можно достичь этого с помощью ф-ции NULLIF. Либо с помощью CASE WHEN
При COUNT(*) считаются все строки в столбце id, не важно NULL они или нет
При COUNT(id) с указанием конкретной колонки уже не учитываются NULL значения
SUM, AVG, MIN, MAX тоже не учитывают NULL
Когда мы выполняем AVG по колонке, мы не учитываем значения NULL, это эквивалентно SUM(id)::numeric/COUNT(id)
Но если мы хотим при подсчете среднего учитывать NULL значения то необходимо писать SUM(id)::numeric/COUNT(*)
Либо второй вариант, где мы NULL заменяем 0, AVG(COALESCE(id, 0))
Примечание: numeric пишется, чтобы при делении целого числа на целое число вернулось с остатком(одно из чисел должно быть десятичным)
Мы не можем ставить операции >, <, = NULL и т.д. Фильтровать можно только IS NULL или IS NOT NULL
На id != null некоторые СУБД дают ошибку, постгрес на это выдаст NULL
id != null
Могут спросить, а как отсортировать чтобы в вначале были NULL, либо в конце. В Postgres, Greenplum и Clickhouse
Разница COUNT(id) в том что он не считает NULL, в отличие от остальных.
COUNT(*) отличается от COUNT(1) ничем. cost у них одинаковый. Можно что угодно вставлять в COUNT() он будет всё равно воспринимать его как COUNT(*), константа
SELECT col1, COUNT(1) FROM t1 GROUP BY col2
SELECT DISTINCT col1, COUNT(*) OVER(PARTITION BY col2) FROM t1
Ничем не отличаются, в плане результата, в плане выполнения с оконной ф-цией медленнее будет выполняться.
Эта проверка что оконная ф-ция проставляет своё значение для каждого значения
Есть 2 таблицы 10 строк и 100 строк. Какое максимальное и минимальное кол-во возвращаемых строк при разных видах JOIN будет?
Вариативный вопрос. Может быть 10 строк и 10 строк, 5 строк и 7 строк. Ответ не меняется
У тебя есть 100 миллионов строк и тебе необходимо удалить 90 миллионов, как ты это сделаешь и почему?
Проверка на понимание работы БД. Пример с постгресом, таблицы хранятся как heap(куча). Есть сегмент = 1 ГБ, который внутри состоит из страниц данных, вроде 1 стр = 8 Кб. Есть определенный заголовок версии строки и сама строка данных. Надо понимать, если мы хотим 90 миллионов просто удалить, например пишем DELETE FROM .. TABLE WHERE id > 10.. То физически они не удалятся из этой таблицы. В заголовке строки есть 2 параметра типо xmin и xmax. в xmin ставится номер транзакции когда данная строка обновлена или удалена или что-то ещё. А в xmax ставится дата окончания, когда эту строку либо обновили, либо удалили. То есть номер транзакции которая удалила эту строчку. Мы просто в эти 90 миллионов ставим xmax(Номер транзакции которая удалила эту строку). Соответственно это долго. Есть пример 2х способов(на самом деле их больше)
В основе хотят услышать, что мы должны не просто в тупую удалять строки через DELETE, а именно удалить саму таблицу, потому что DROP TABLE и TRUNCATE TABLE работаю так, что они удаляют таблицу.
TRUNCATE(запоминает структуру таблицы, удаляет сегментный файл, а потом пересоздает его, DDL, но индексы и статистику не обновляет)
DROP(удаляет всё и строки и статистику и индексы)
Снизу 2 способа.
Есть запрос, который работает ночью и строит отчет. Ежедневно он работал нормально и создавал отчет за 2 часа. Сегодня утром ты пришёл на работу, а отчета нет. Смотришь свой пайплайн, а он все еще крутится на чтении запроса. Что могло произойти?
Этот вопрос на воображение. Обычно от нас хотят услышать, что мы залезем в EXPLAIN, посмотрим какой у нас скрипт, посмотрим cost, не навернулась ли у нас статистика, проверить SPILL есть ли и от этого уже отталкиваться.
Что могло произойти?
Например товарищ что-то так настроил и допустил ошибку, что записи повторяются циклично и кучу раз повторились
Вакуум не успел собрать статистику
Что такое и для чего нужен подзапрос? Что такое коррелируемый и не коррелируемый подзапрос? В чем отличие СТЕ от подзапроса?
Коррелируемый подзапрос — Внутри подзапроса пишем колонку из внешнего запроса. Подзапрос выполняется для каждой строчки. Часто используются в конструкции EXIST
Пример коррелируемого подзапроса:
Некоррелируемый подзапрос — Выполнятся 1 раз и результат используется внутри внешнего запроса
Пример некоррелируемого подзапроса:
Общие табличные выражения (CTE) — Создает временную таблицу, которую можно переиспользовать. [Сохраняется в кэше и можно переиспользовать]
Когда использовать?
Хорошая читаемость кода
Идёт какой-то большой джойн, а потом по этому джойну идут какие-то дополнительные вычисления(дальнейшие).
Для чего нужна команда UNION и в чем её различим с UNION ALL? Какая команда работает быстрее и почему? Какие еще операции над множествами ты знаешь и что они делают?
UNION:
Удаляет дублирующиеся строки.
Может быть медленнее из-за необходимости проверки на дубликаты.
UNION ALL:
Включает все строки, включая дубликаты.
Обычно работает быстрее, так как не требует проверки на дубликаты.
INTERSECT - выбирает общие значения из 2х множеств
EXCEPT - вычитает одно множество из другого
Расскажи про условия в SQL. Для чего нужна конструкция CASE, как она записывается и в каких конструкциях запроса её можно использовать?
Может использоваться в различных частях SQL-запроса, таких как SELECT, WHERE, ORDER BY и других.
WHERE фильтрует данные до группировки
HAVING фильтрует данные после группировки
QUALIFY фильтрует данные после применения оконных ф-ций
Пример QUALIFY
MPP (Massively Parallel Processing) это архитектура, где каждый узел
обрабатывает часть данных параллельно и независимо.
Как устроено:
Есть один главный узел (мастер).
Есть много рабочих узлов (сегменты).
У каждого свои СPU, память, диск.
Все узлы соединены быстрой сетью.
Правильный DISTRIBUTED BY - Распределяй таблицы по ключам, по которым часто происходит JOIN (Высокая селективность/Много уникальных значений)
Избегай data motion - Проверяй план через EXPLAIN ищи Motion
Проверяй план через EXPLAIN ищи Motion
Избегай select *
Фильтры (WHERE) применяй до JOIN, чтобы не тянуть лишние строки
Data Motion(Перекосы данных) в Greenplum (и других МРР-системах) - это процесс перемещения данных между сегментами (нодами) во время выполнения запроса.
Когда возникает:
Таблицы распределены по разным ключам, и нужно сделать JOIN, GROUP BY, ORDER BY, DISTINCT, которые требуют собрать или перестроить данные
Результат нужно вернуть на мастер (координатор)
Виды Data Motion:
Broadcast копирует строки со всех сегментов на каждый сегмент. Возникает если 1 таблица маленькая
Redistribute (Hash Motion) пересылает строки по хешу нового ключа . Возникает при JOIN или ORDER BY по другому ключу.
Gather собирает данные с сегментов на мастер. Возникает при ORDER BY, LIMIT и RETURN
Когда ты создаёшь таблицу, ты указываешь DISTRIBUTED BY ()
Принцип работы
Greenplum считает хэш от значения
По хешу выбирает номер сегмента, куда записать строку
Каждая строка попадает на один конкретный сегмент
Правильный DISTRIBUTED BY - Распределяй таблицы по ключам, по которым часто происходит JOIN
https://habr.com/ru/companies/X5Tech/articles/851386/
Greenplum vs HDFS: Greenplum — MPP СУБД (SQL, планировщик, индексы, ACID-частично); HDFS — слой хранения/ФС, не БД/SQL.
HDFS (Hadoop Distributed File System) — это распределенная файловая система, разработанная для хранения и обработки больших объемов данных на кластерах из недорогого оборудования.
Характеристики:
Масштабируемость (увеличение узлов)
Отказоустойчивость (репликации)
Высокая пропускная способность (Оптимизирована для пакетной обработки больших объемов данных)
HDFS: распределённая FS Hadoop; хранит большие файлы блоками на DataNode, метаданные у NameNode; репликация, «write once, read many».
Проблемы HDFS: миллионы мелких файлов, зависимость от NameNode (HA частично решает), write-once, ребалансировка, сеть/IO узкие места, тюнинг репликации/блоков.
MR - модель распределённых вычислений. Либо парадигма обработки данных
Суть MapReduce состоит в разделении информационного массива на части, параллельной обработке каждой части на отдельном узле и финальном объединении всех результатов.
Map - предварительная обработка данных в виде большиго списка значений.
Master распределяет данные по Worker-ам, worker-ы делают Map.
Shuffle - рабочие узлы (Spark Worker) перераспределяют данные на основе ключей (ранее созданных в Map) таким образом, чтобы все данные одного ключа лежали на одном работчем узле.
Reduce - параллельная обработка каждым Worker-ом по порядку следования ключей и склеивание на Master Node.
Это фреймворк с открытым исходным кодом для реализации распределённой обработки данных.
Он был разработан как альтернатива Hadoop MapReduce, предлагая более высокую производительность и удобство для сложных задач анализа данных
В него входят
MLlib. Набор библиотек для машинного обучения.
SQL. Компонент, который отвечает за запрос данных.
GraphX. Модуль для работы с графами (абстрактными представлениями связей между множеством объектов).
Streaming. Средство для обработки потоков Big Data в реальном времени.
Spark - распределённый движок вычислений (ETL, SQL, Streaming, ML); абстракции RDD/DataFrame, работает поверх YARN/K8s/Standalone, память-центричен.
Spill в Spark — это термин для обозначения процесса перемещения данных из памяти на диск, а затем снова обратно в память.
Он происходит, когда данных слишком много, и они не помещаются в выделенный для задачи раздел памяти. Фреймворк вынужден выполнять дорогостоящие операции чтения и записи на диск, чтобы освободить локальную оперативную память и избежать ошибки нехватки памяти (OutOfMemory, OOM), которая может привести к сбою приложения.
Некоторые причины spill-эффекта в Spark:
Большое значение параметра spark.sql.files.maxPartitionBytes. По умолчанию это количество байтов для упаковки в один раздел при чтении файлов формата Parquet, JSON и ORC. Если установить раздел, считываемый Spark намного больше, например, 1 ГБ, активное поглощение может вызвать spill-эффект.
Операция explode() на небольшом массиве данных с соединениями и декартовыми соединениями (CrossJoin) двух таблиц, результат которого может превысить размер раздела.
Агрегации по искажённым данным, которые неравномерно распределены по узлам кластера, также потенциально могут создать очень большой раздел и вызвать перенос данных из памяти на диск и обратно.
—Еще объяснения—
Shuffle - дорогое перемешивание данных между узлами.
Когда происходит: groupBy, join, distinct.
Как:
1. Write: Данные пишутся на диск.
2. Read: Читаются по сети для перегруппировки.
Проблемы:
Перекос (skew) : Один ключ с большими данными. Медленно и риск OOM(Out Of Memory).
Spill - сброс данных из памяти на диск при нехватке RAM.
Когда происходит:
Shuffle
Агрегации (groupBy, join)
Кэширование
Зачем избегать: Чтобы избежать ООМ(ошибка когда не хватает памяти) и обработать большие данные.
Минусы: Сильно медленнее (диск vs оперативная память).
Решение:
Увеличить число партиций
Настроить память (spark.memory.fraction)
Добавить RAM исполнителям
SparkSession — это точка входа для использования всех функций Apache Spark
Параметры, которыми задаётся SparkSession:
appName — имя приложения
config — параметры конфигурации, например, для настройки источника данных (CSV-файлы, JSON-файлы, базы данных) и записи данных в эти источники.
SparkContext использовали до версии 2.0
Broadcast join в Spark — это метод, который оптимизирует соединение больших и малых датафреймов. Он позволяет избежать перетасовки данных и сократить накладные расходы на передачу по сети.
Настроить broadcast join в Spark можно с помощью подсказок (хинтов). Они позволяют пользователю предложить Spark применять нужные подходы для создания плана выполнения SQL-запроса.
Чтобы указать, что надо сделать broadcast какой-то небольшой таблицы или датасета (до ~1Gb), можно в SQL-запросе указывать хинт /+ BROADCAST(t)/, где t — алиас таблицы или датасета.
Важно использовать broadcast join только для небольших фреймов данных, так как трансляция больших фреймов может вызвать проблемы с памятью из-за репликации данных на всех рабочих узлах.
Workflow приложения в Spark:
Приложение запускается и инициализирует SparkContext.
Программа-драйвер запрашивает у менеджера кластеров ресурсы для запуска исполнителей.
Менеджер кластеров запускает исполнителей.
Драйвер запускает код Spark.
Исполнители запускают задания и отправляют результаты драйверу.
SparkContext останавливается, а исполнители закрываются и возвращают ресурсы обратно в кластер.
Выделение ресурсов в Spark может происходить двумя способами:
Статическое выделение. Объём памяти и количество ядер выделяют на этапе запуска приложения и оставляют неизменными в течение всего выполнения. Подходит для задач, в которых требуется предсказуемое и стабильное распределение ресурсов на протяжении всего выполнения.
Динамическое выделение. Система автоматически адаптирует объём памяти и количество ядер исполнителей, чтобы удовлетворить текущим потребностям приложения. Этот метод позволяет управлять ресурсами в режиме реального времени, что делает его подходящим для приложений с переменной нагрузкой.
Когда отдаются ресурсы, при динамическом выделении приложение может возвращать ресурсы в кластер, если они больше не используются, и запрашивать их снова позже, когда появится спрос.
В Spark существуют следующие виды кэширования и их отличия:
Cache(). Хранит данные в оперативной памяти каждого отдельного узла, если для этого есть место, в противном случае — на диске.
Persist(уровень). Может сохранять данные в памяти, на диске или из кэша в сериализованном или несериализованном формате в соответствии со стратегией кэширования, указанной уровнем.
Также существуют следующие уровни хранения для кэширования данных:
DISK_ONLY. Данные сохраняются на диске в сериализованном формате.
MEMORY_ONLY. Данные сохраняются в оперативной памяти в десериализованном формате.
MEMORY_AND_DISK. Данные сохраняются в оперативной памяти, и если памяти недостаточно, вытесненные блоки будут сохранены на диске.
OFF_HEAP. Данные сохраняются в памяти вне кучи.
YARN в Apache Spark — это фреймворк управления ресурсами. Другими словами, это почти операционная система на кластерном уровне.
Что делает YARN: контролирует управление ресурсами, планирование и безопасность запущенных на нём Spark-приложений в любом режиме (кластера или клиента). При запуске Spark в YARN исполнитель Spark работает как контейнер YARN.
Зачем YARN нужен: позволяет использовать один пул ресурсов кластера для всех фреймворков, которые на нём запускаются, и настраивать его. Это даёт возможность разным типам приложений сосуществовать в одном кластере.
Кроме того, YARN централизует логи приложений, агрегируя их в HDFS или похожую систему хранения. Это упрощает отладку и позволяет проводить исторический анализ для обнаружения узких мест в производительности.
Ленивые вычисления
Преобразование в Spark — это операция над коллекциями данных, результатом которой служат новые данные. Вычисление преобразованных данных откладывается до того момента, когда к ним будут применены действия.
Действие в Spark — это тип операции, который возвращает конкретное значение. Действия применяются, когда необходимо вывести конкретное значение в консоль. Примеры действий: collect(), take(n) и count() и show().
Трансформации (например, map, filter):
Создают план вычислений (DAG)
Ленивые ничего не считают сразу
Действия (например, count, save, show):
Запускают выполнение всего плана
Возвращают результат или пишут данные
Ленивые вычисления:
Spark ждёт вызова действия, чтобы оптимизировать и выполнить весь план трансформаций разом
Экономит ресурсы и позволяет избежать лишних вычислений
Параллелизм в Apache Spark — это способность выполнять несколько задач одновременно на кластере. Spark использует модель параллелизма на уровне операций, что означает, что каждая операция в коде может быть выполнена параллельно на разных узлах кластера.
Уровень параллелизма определяет, сколько задач будет выполнено одновременно на кластере. Это может быть настроено через параметры конфигурации Spark. Важно подобрать подходящее значение в зависимости от характеристик кластера и характера обрабатываемых данных.
По умолчанию значение уровня параллелизма (spark.default.parallelism) равно общему количеству ядер на кластере.
Также в Spark есть параллелизм второго порядка, который позволяет выполнять несколько действий параллельно. Например, когда исполнители завершат первое действие, они немедленно начнут работу над вторым и не будут бездействовать, пока оба не будут завершены.
5 механизмов выполнения операций соединения в Apache Spark SQL
Broadcast Hash Join. Один из входных наборов данных транслируется всем исполнителям. Для транслируемого набора данных создаётся хеш-таблица, после чего каждый раздел не транслируемого входного набора данных присоединяется независимо к другому набору данных, доступному как локальная хеш-таблица.
Shuffle Hash Join. Наборы данных выравниваются по выбранной схеме разделения. Если нужно, выполняется шаффлинг для соответствия схеме разделения. Затем выполняется join в каждой партиции с использованием hash join.
Sort Merge Join. Аналогично shuffle hash join, но требует сортировки данных перед join.
Cartesian Join. Используется только для cross join. Создаёт все возможные пары записей из обоих наборов данных.
Broadcast Nested Loop Join. Является механизмом join по умолчанию, когда нельзя выбрать другие механизмы.
1. Broadcast Hash Join (BHJ)
Когда: Одна из таблиц МАЛЕНЬКАЯ (меньше порога spark.sql.autoBroadcastJoinThreshold, по умолчанию 10МБ).
Как: Малая таблица рассылается (broadcast) на все ноды. Быстро, т.к. избегает shuffle.
2. Sort Merge Join (SMJ)
Когда: Дефолтный join для больших таблиц. Обе стороны большие, и данные отсортированы по ключу join'a или требуется отсортированный вывод.
Как: Данные шарятся (shuffle), сортируются по ключу и мержатся. Эффективно для больших данных, но требует shuffle и сортировки.
3. Broadcast Nested Loop Join (BNLJ)
Когда: Если join condition НЕ равенство (например, df1.value > df2.value). Очень медленный, но иногда единственный вариант.
Как: Рассылает одну из таблиц и для каждой строки проверяет condition вложенным циклом. Избегайте, если возможно.
4. Cartesian Join (Cross Join)
Когда: Явно запрошен (через crossJoin или с условием 1=1).
Декартово произведение очень дорого.
Как: Полный shuffle и объединение всех строк
Используйте только если осознанно
Основное отличие методов repartition и coalesce в Spark заключается в их назначении:
Repartition используется для увеличения или уменьшения количества партиций. Он выполняет shuffle данных, что может быть дорогостоящей операцией, но обеспечивает равномерное распределение данных между партициями.
Coalesce используется для уменьшения количества партиций. Он выполняет это без shuffle данных, что делает его менее затратным по сравнению с repartition. Однако coalesce эффективен только для уменьшения количества партиций.
Таким образом, repartition создаёт разделы одинакового размера, выполняя полное перемешивание данных, в то время как coalesce объединяет существующие разделы, чтобы избежать полного перемешивания.
—Еще объяснение—
Repartition:
Увеличивает или уменьшает число партиций.
Всегда выполняет полный shuffle (дорогая операция).
Используй: Когда нужно увеличить параллелизм или равномерно распределить данные.
Coalesce:
Только уменьшает число партиций.
Избегает shuffle, объединяя партиции на тех же узлах.
Используй: Когда нужно уменьшить число партиций без shuffle
(например, перед записью в few files)
Adaptive Query Execution (AQE) — это механизм оптимизации запросов в Apache Spark в реальном времени. Он динамически оптимизирует планы выполнения запросов на основе статистики и характеристик данных во время выполнения.
AQE позволяет более эффективно использовать ресурсы и снижать время выполнения. Например, он может разделить большие задачи на более мелкие, что уменьшает узкие места.
AQE также позволяет отложить решение о стратегии объединения до времени выполнения, выбрав наиболее подходящий метод на основе фактических размеров данных.
AQE используется в трёх сценариях, где традиционная оптимизация может не работать:
Перекос данных. Spark может определить перекос данных во время выполнения и скорректировать выполнение задач.
Неизвестные шаблоны объединений. В запросе с неизвестными шаблонами объединений AQE может отложить решение о стратегии объединения до времени выполнения, выбрав наиболее подходящий метод.
Изменение характеристик данных. Размер и распределение наборов данных могут меняться, делая статически определённые планы запросов устаревшими.
Persistence в Spark позволяет контролировать уровень хранения данных (в памяти, на диске или комбинации). Это полезно для больших наборов данных, которые не помещаются целиком в память, или для обеспечения отказоустойчивости.
Существуют следующие уровни хранения в Spark:
MEMORY_ONLY. Хранит данные только в памяти. Это самый быстрый уровень хранения, но и самый изменчивый, так как данные можно удалить из памяти без пространства.
MEMORY_AND_DISK. Хранит данные в памяти, а если памяти недостаточно, сохраняет на диск.
DISK_ONLY. Хранит данные только на диске. Это самый надёжный уровень хранения, но и самый медленный.
MEMORY_ONLY_SER. Хранит данные в памяти в сериализованном виде. Экономит память, но увеличивает затраты на сериализацию/десериализацию.
MEMORY_AND_DISK_SER. Комбинация MEMORY_ONLY_SER и DISK_ONLY.
По умолчанию каждому исполнителю (worker) в приложении Spark выделяется 1 ГБ оперативной памяти.
Драйвер в Spark — это управляющий процесс, который запускает метод main() приложения. Он создаёт объекты SparkSession и SparkContext, а также преобразует код в операции преобразования и действия. Также драйвер создаёт логические и физические планы, планирует и координирует задачи с менеджером кластера.
Исполнитель в Spark — распределённый процесс, который отвечает за выполнение задач. У каждого приложения Spark собственный набор исполнителей. Они делают всю обработку данных задания Spark. Исполнители сохраняют результаты в памяти, а на диске — только тогда, когда это специально указывается в программе-драйвере. Возвращает результаты драйверу после их завершения.
На одном воркере может быть несколько экзекьюторов
RDD (Resilient Distributed Dataset) — это неизменяемая распределённая совокупность элементов данных, которые могут храниться в памяти или на диске в кластере машин. RDD позволяют совершать низкоуровневые трансформации над неструктурированными данными (как медиа или текст).
DataFrame — это распределённая коллекция данных, организованная в именованные столбцы. Концептуально он соответствует таблице в реляционной базе данных с оптимизацией для распределённых вычислений.
Dataset — это расширение API DataFrame, обеспечивающее функциональность объектно ориентированного подхода, производительность оптимизатора запросов Catalyst и механизм хранения вне кучи.
RDD работают «под капотом» путём разделения данных на несколько разделов (Partition), которые хранятся на каждом узле-исполнителе. Каждый узел выполняет работу только на собственных разделах. RDD отказоустойчивы, так как отслеживают поток данных для автоматического восстановления потерянных данных в случае сбоя.
——Еще объяснение——
RDD (Разбитые Данные):
Как кирпичики - базовые блоки Spark.
Без схемы (не знаешь, что внутри объекта).
Требуют ручной оптимизации (медленнее).
Для сложных низкоуровневых задач.
DataFrame (Таблицы):
Как Excel-таблица со столбцами и названиями.
Со схемой (известны типы данных: число, строка).
Автоматическая оптимизация (быстрее, умный движок).
Для большинства задач (ETL, аналитика).
Dataset (Улучшенные Таблицы):
Как типизированные DataFrame.
Проверка типов на этапе компиляции (меньше ошибок).
Только для Scala/Java
Avro — это строковый формат, который хранит схему в формате JSON, облегчая её чтение и интерпретацию любой программой. Сами данные лежат в двоичном формате, компактно и эффективно. Лучше подходит для операций записи, где данные нужно быстро сериализовать и десериализовать.
Parquet — колоночный формат, который оптимизирован для операций чтения и аналитики. Он эффективен в плане сжатия и кодирования, что делает его идеальным для сценариев, где важна эффективность чтения и хранения.
ORC — колоночный формат, похожий на Parquet, но оптимизированный для операций чтения и записи. Он эффективен в плане сжатия, что снижает затраты на хранение и ускоряет извлечение данных.
Команда для взаимодействия с HDFS
Джоба работала 7 дней, потом упала, заказчик не получил результат. В чем проблема и где начать смотреть?
Возможные причины проблемы:
1. Ошибка в коде:
Непредвиденная ошибка в логике программы
Проблема с обработкой данных или выходными данными
2. Отсутствие ресурсов:
Выделение недостаточных ресурсов для выполнения задачи
Перегрузка кластера Hadoop
3. Проблемы с конфигурацией:
Неправильная настройка параметров Spark или Hadoop
Конфликт между версиями компонентов
4. Сбой инфраструктуры:
Проблемы с сетевым соединением
Сбой узлов кластера
5. Проблемы с мониторингом:
Отсутствие эффективного мониторинга процесса
Неисправность системы мониторинга
Где начать поиск:
1. Логи:
Проверьте логи джобы и кластера
Используйте команды:`cat /path/to/job/logs/`* `tail -f /path/to/job/logs/latest.log`
2. Метрики производительности:
Проверьте метрики CPU, памяти и дискового пространства
Используйте инструменты мониторинга, такие как Nagios или Prometheus
3. Статус задачи:
Проверьте статус задачи через интерфейс управления задачами (например, YARN UI)Используйте команду:
`yarn application -list`4. Результаты выполнения:
Проверьте директорию с результатами выполнения
Используйте команду:
`hadoop fs -ls /path/to/results/directory`
5. Мониторинг ресурсов:
Проверьте использование ресурсов кластера
Используйте команду:* `yarn cluster-info`
Некоторые другие отличия:
CTE определяются в начале запроса, а подзапросы — inline.
CTE всегда имеют имя, а подзапросы — только в PostgreSQL.
CTE можно использовать много раз внутри запроса, а подзапрос — только один раз.
Подзапросы можно использовать в предложении WHERE в сочетании с ключевыми словами IN или EXISTS, а с CTE это сделать нельзя.
Совмещать использование GROUP BY и оконных функций в SQL можно, но с определёнными ограничениями.
В соответствии с порядком логической обработки запросов, оконные функции обрабатываются на этапе SELECT или ORDER BY, то есть после GROUP BY. Поэтому при оценке GROUP BY оконные функции ещё не вычислены.
Один из способов совместить эти понятия — использовать подзапрос. В нём вычислить оконную функцию, а затем использовать её в основном запросе. Например:
SELECT quartile, min(points), max(points), count(*) FROM (SELECT ntile(4) OVER (ORDER BY points) AS quartile, points FROM midterm) groups GROUP BY quartile
В подзапросе используется функция NTILE() для разделения студентов на группы, а в основном запросе вычисляется статистика: минимум, максимум и количество студентов.
Ещё одна возможность — использовать общее табличное выражение (CTE). Его можно определить на основе запроса, вычисляющего групповой агрегат, а во внешнем запросе вычислить оконный агрегат.
Разница между использованием GROUP BY и оконных функций в SQL заключается в следующем:
GROUP BY сокращает количество строк в запросе с помощью их группировки. После группировки остаётся только одна запись для каждого значения, использованного в столбце.
Оконные функции не уменьшают количество строк в запросе по сравнению с исходной таблицей. Они работают с выделенным набором строк (окном) и выполняют вычисление для этого набора строк в отдельном столбце. Результат работы оконных функций добавляется к результатирующей выборке в ещё одно поле.
Кроме того, оконные функции могут вычислять скользящие средние и кумулятивные суммы, а также обращаться к другим строкам.
Разница между функциями COUNT(*) и COUNT(имя столбца) в SQL заключается в том, что COUNT(*) возвращает общее количество строк в таблице, включая те, где есть значения NULL, а COUNT(имя столбца) — количество всех ненулевых значений в определённой колонке.
Таким образом, COUNT(*) учитывает все без исключения, а COUNT(имя столбца) исключает пустые значения и подсчитывает только заполненные поля.
Функции и хранимые процедуры отличаются по следующим параметрам:
Возвращаемые значения. Функции всегда возвращают значение. Хранимые процедуры не обязательно возвращают значения.
Вызов. Функции могут вызываться внутри SELECT-запросов, либо вызываться как результат самого запроса. Хранимые процедуры вызываются командой CALL название_процедуры()
Для удаления дубликатов строк в SQL можно использовать следующие методы:
Метод с дублирующей таблицей. Нужно переместить один экземпляр любой повторяющейся строки в исходной таблице в дублирующую таблицу, затем удалить все строки из исходной таблицы, которые также находятся в дублирующей таблице. После этого строки в дублирующей таблице перемещают обратно в исходную таблицу и удаляют дублирующую таблицу.
Метод с функцией ROW_NUMBER. Функция ROW_NUMBER разделяет данные на основе определённого столбца (или нескольких столбцов, разделённых запятыми) и удаляет все записи, которые получили значение, превышающее 1. Это указывает на то, что записи являются дубликатами.
Перед удалением данных рекомендуется создать резервную копию таблицы.
Также в PostgreSQL для нахождения медианы можно использовать функцию percentile_count:
SELECT percentile_cont (0.5) WITHIN GROUP (ORDER BY sales) FROM table
Колоночная база данных — это система управления базами данных, которая хранит данные столбцами, а не строками, как в традиционных реляционных базах данных.
Преимущества колоночных баз данных:
Ускорение аналитических запросов. При агрегациях и фильтрациях задействуются только те столбцы, которые участвуют в запросе. При этом данные одного типа лежат последовательно.
Эффективное сжатие данных. Данные в одном столбце имеют схожую природу, что позволяет применять более эффективные алгоритмы сжатия.
Снижение нагрузки на I/O. Извлечение только нужных столбцов уменьшает объём операций ввода-вывода.
Недостатки колоночных баз данных:
Меньшая эффективность для транзакционных операций. Записи и обновления, которые влияют на большое количество столбцов, требуют больше операций, чем в реляционных базах.
Усложнение работы с несжатыми или разреженными данными. Если данные столбца не поддаются сжатию или содержат много пропусков, преимущества колоночного подхода снижаются.
Стороковая БД - данные на диске хранятся блоками. Даже если тебе нужна только одна колонка, ты не сможешь вытащить только её. В широких таблицах (много столбцов) будет очень долго.
Столбцовая БД - данные хранятся по колонкам, а не по строкам. Каждая колонка хранится отдельно на диске. А значения в колонке храняться последовательно.
Из-за того что база загружает только нужные данные - она идеально
подходит для аналитических запросов
Строковые БД: OLTP, частые записи/малогабаритные выборки «вся строка». Колоночные: OLAP, агрегации по столбцам, скан немногих колонок, сильная компрессия.
Колоночные: чтение отдельных колонок быстрее; массовые UPDATE/DELETE обычно медленнее (append+compaction), point-update не их сильная сторона.
В строковых все поля строки лежат рядом → один I/O для всей строки; у колоночных поля разнесены по колонкам.
B-tree(Balanced-Tree): собранное сбалансированное дерево, упорядоченные ключи; поиск/вставка/удаление O(log n), диапазоны O(log n + k).
Хранение: колоночный формат, данные записываются «частями» (parts) внутри партиций (PARTITION BY).
Сортировка: каждая часть отсортирована по ORDER BY (ключ сортировки/первичный ключ по умолчанию).
Индексация: создаётся разреженный первичный индекс (по ORDER BY) с «метками» (granules) для чтения только нужных диапазонов.
Слияния: фоновые мерджи объединяют мелкие части в крупные внутри партиции; для производных движков (Replacing/Summing/…) при мерджах применяется их логика.
Чтение: отбрасывание партиций по PARTITION BY, затем «data skipping» по ORDER BY, PREWHERE/WHERE, вторичным индексам/skip-индексам; минимизация I/O.
Задает ключ сортировки в каждой части (и первичный ключ по умолчанию), на котором строится разреженный индекс.
Главный эффект: работа «data skipping» — резкое ускорение фильтрации/диапазонных запросов.
Влияние на мерджи и поведение производных движков (напр., что считать «одинаковой» строкой).
Не гарантирует порядок результата SELECT — для упорядоченного вывода используйте ORDER BY в запросе.
Замена дубликатов выполняется только внутри одной партиции и для строк с одинаковым ключом сортировки (ORDER BY).
Версия: с version оставляется запись с максимальной версией; без версии — «последняя» по порядку мерджа (не детерминировано на репликах).
Когда срабатывает: физически — при мерджах/OPTIMIZE … FINAL; логически — при SELECT … FINAL.
Через границы партиций замены не происходят.
В SELECT заставляет «доделать» результат для движков семейства MergeTree:
Replacing — убрать дубликаты,
Summing/Aggregating/Collapsing — применить финализацию состояний/сворачивание.
Дорого: читает больше данных и делает дополнительную обработку на лету; применяйте точечно.
На обычном MergeTree эффект отсутствует.
SELECT … FINAL: одноразовая логическая финализация на чтении; не меняет хранение; платите стоимостью каждого запроса.
OPTIMIZE … FINAL: физически мерджит все части партиции до одной, применяя финализацию; изменяет данные на диске; дорогая одноразовая операция, после неё обычные запросы быстрее (часто без FINAL).
На репликах OPTIMIZE исполняется как совместимая операция (через журнал репликации); для кластера используйте ON CLUSTER.
Логическая прокладка над локальными таблицами на шардах, описанных в cluster.
Чтение: координация на инициаторе, максимально возможный pushdown (WHERE/AGG/LIMIT/…) на шарды, объединение промежуточных результатов.
Запись: маршрутизация по шард-ключу к нужному шару(дам); далее — в локальные (обычно ReplicatedMergeTree) таблицы.
Роутинг по sharding_key; для шарда с несколькими репликами:
internal_replication=1 — отправка на одну реплику, далее репликация разнесёт.
internal_replication=0 — отправка на все реплики шарда.
Надёжность: при недоступности узлов данные буферизуются в distributed/ и досылаются фоново монитором.
Режимы: синхронная/асинхронная отправка (insert_distributed_sync), принудительный выбор шарда (insert_shard_id), случайный шард (insert_distributed_one_random_shard).
Используйте EXPLAIN:
EXPLAIN PIPELINE | PLAN | SYNTAX — видно шаги Remote, где выполняются фильтры/агрегации/лимиты на шардах.
Системные логи:
system.query_log (поля is_initial_query, статистика по чтению/агрегациям),
увеличьте send_logs_level='debug' и посмотрите, что было «pushed down».
Косвенно — по настройкам/паттернам: distributed_group_by_no_merge, optimize_distributed_group_by_sharding_key, distributed_push_down_limit, optimize_skip_unused_shards.
Переписать запрос под pushdown:
Фильтровать по шард-ключу (включая его в WHERE) и включить optimize_skip_unused_shards.
Делать GROUP BY, включающий шард-ключ → optimize_distributed_group_by_sharding_key=1.
Для тяжёлых агрегаций: distributed_group_by_no_merge=1 (оставить итог по шардам) или двухфазная агрегация.
Джойны:
Для маленьких таблиц — GLOBAL JOIN/IN (бродкаст на шарды).
Для больших — соположить данные: одинаковый шард-ключ у обеих таблиц; использовать Join-движок/матвью/распределённые словари.
Структурные решения:
cluster()/remote() для распределённого выполнения над локальными таблицами.
Матвью/предагрегации/проекции на шардах, чтобы тяжелое считалось локально.
distributed_push_down_limit=1, перенос ORDER BY … LIMIT на шарды, где возможно.
MergeTree. Базовый движок для создания таблиц в ClickHouse. Обеспечивает быструю вставку, поддерживает большие объёмы данных.
ReplacingMergeTree. Позволяет «схлапывать» значения по ключу сортировки. Это помогает исключить дублирование значений по ключу сортировки и держать таблицу в актуальном состоянии.
AggregatingMergeTree. Хранит предагрегированные состояния (AggregateFunction ( . . . ) )
SummingMergeTree. Автоматически агрегирует строки с одинаковыми ключами
CollapsingMergeTree. Удаляет пары строк с противоположными знаками sign (+1 / -1)
VersionedCollapsingMergeTree. Kak Collapsing, но учитывает version оставляет последнюю актуальную строку
Log. Простые движки с минимальной функциональностью. Они эффективны, когда нужно быстро записать много небольших таблиц (до примерно 1 миллиона строк) и прочитать их позже целиком.
Движки для интеграции. Используются для связи с другими системами хранения и обработки данных. Например, Kafka, MySQL, ODBC, JDBC.
Memory. Хранит данные в памяти, подходит для небольших таблиц объёмом менее 100 миллионов строк, которые не требуют сохранения данных.
Индексы ClickHouse: разреженный первичный ключ MergeTree (data skipping по гранулам), вторичные data-skipping индексы (minmax, bloom, set, tokenbf/ngrambf), проекции; у новых версий — векторные индексы для ANN.
Индексы отдельно задаются для ускорения фильтрации по неключевым колонкам, Помогают пропустить блоки, в которых точно нет нужных данных
Типы:
minmax - Сохраняет min/max значений на блок
set - Сохраняет set уникальных значений
bloom_filter - Быстрый поиск no exact match
tokenbf_v1 - Полнотекстовый поиск по словам
ngrambf_v1 - Полнотекст по подстрокам (n-граммам)
JOIN в ClickHouse: в основном эквисоединения (hash join в память правой стороны, при больших — grace hash/partial merge/merge), есть ASOF для «нестрогого равенства по времени». На кластере — локально по шардам с последующим объединением.
ASOF
JOIN в ClickHouse не по строчке, а блочно
Всё работает в потоках и блоками, для максимальной скорости
Нет nested loop, нет индексов -> важно, как устроены данные
INNER JOIN, LEFT JOIN, RIGHT JOIN - работают также как в SQL
ANY INNER / ANY LEFT - взять любую подходящую строку, если их несколько. Это важно потому что clickhouse не поддерживает FULL JOIN
ASOF JOIN - Используется в временных данных, берёт ближайшее "раньше по времени" значение.
SEMI JOIN - это фильтрация по существованию
Мутации - пересборка частей данных в фоне.
Update - Через мутации. Построит новые части с измененными
строками, а старые удалит.
Распределение таблиц CH: Distributed-таблица маршрутизирует по sharding_key на локальные MergeTree на шардах; внутри — PARTITION BY (партиции) и ORDER BY (сортировка в частях).
Distributed
sharding_key
MergeTree
PARTITION BY
Координатор CH: узел, получивший запрос (инициатор) — он координирует выполнение; центрального коордиатора нет.
CAP CH: для ReplicatedMergeTree — склонность к C+P (консистентность с ClickHouse Keeper/ZooKeeper; при проблемах с координацией записи ограничиваются), с итоговой согласованностью между репликами и кворумными настройками.
ReplicatedMergeTree
Нет JOIN по неравенству, т.к. движок оптимизирован под эквисоединения (hash/merge). Исключение — ASOF (интервалы/«ближайшее по времени»).
Когда выберет MergeJOIN: когда оба входа отсортированы по ключам соединения (или могут быть эффективно отсортированы) и включён join_algorithm=auto/merge; большие входы, жёсткие лимиты памяти для hash join.
join_algorithm=auto/merge
MergeJOIN: одновременный проход по двум отсортированным потокам, как в фазе merge; малопамятный, амортиз. O(n+m), хорошо для диапазонов и множественных совпадений.
В PostgreSQL MergeJoin берёт отсортированные источники (индексный скан по B-tree или явная сортировка). В ClickHouse глобальная переcортировка «на лету» по произвольному ключу для очень больших наборов и распределённо не всегда рациональна; MergeTree фиксированно отсортирован по ORDER BY, поэтому MergeJOIN целесообразен/возможен только при совпадении ключей сортировки или приемлемой цене сортировки.
Топ-5 оптимизаций:
Выбирайте правильные PARTITION BY/ORDER BY/первичный ключ (чтобы отсекать чтение).
Читайте только нужные колонки; избегайте SELECT *.
SELECT *
Предагрегируйте/материализуйте тяжёлые вычисления; используйте проекции/materialized views.
Используйте подходящие data-skipping индексы и LowCardinality для строковых ключей.
LowCardinality
Следите за перекосом: правильные ключи разбиения/перепартиционирование.
Еще советы
Если запрос медленный:
1. Добавь индексы по полям фильтрации, джойна и сортировки. Всегда выбирай только нужные поля без SELECT … WHERE до JOIN, LIMIT ДО ORDER BY, по возможности.
2. Проверь, можно ли агрегацию сделать на меньшем объёме.
3. Используй оконные функции, если не нужна группировка.
4. Всегда смотри план запроса. Найдёшь слабые места: full scan, nested loop, лишние сортировки
Перекос данных: неравномерное распределение ключей/партиций → «тяжёлые» таски и хвосты. Лечение: соли/репартITION, переопределение ключа, AQE, pre-agg, sampling.
Шардирование/репликация: шардирование — горизонтальное деление данных для масштабирования; репликация — копии для отказоустойчивости и параллельного чтения.
Шардирование ускоряет? Да, для полноскановых/параллелящихся запросов; может замедлять при межшардовых JOIN/Shuffle.
Подходы DWH: Кимболл (Dimensional/Bus, звёзды), Инмон (EDW 3NF + витрины), Data Vault, ODS, Data Lake/Lakehouse.
Data Lake vs DWH: Lake — schema-on-read, дешёвое сырое/полусырые данные; DWH — schema-on-write, строгость, качество, BI. Lakehouse — гибрид с ACID-таблицами.
Кимболл vs Инмон: Кимболл — витрины в звезде от источников; Инмон — корпоративный нормализованный EDW, от него витрины.
SCD: медленно меняющиеся измерения. Типы: 1 (перезапись), 2 (история с valid_from/valid_to/is_current), 3 (хранение прежнего значения), 4/6 — вариации.
valid_from
valid_to
is_current
SCD2 процесс:
Новая NK: вставка с valid_from=now, valid_to=NULL, is_current=1.
valid_from=now
valid_to=NULL
is_current=1
Изменение: закрыть старую (valid_to=now, is_current=0), вставить новую версию.
valid_to=now
is_current=0
Удаление: «мягко» закрыть интервал (или флаг удаления).
Как правильно работать с таблицами, если 1я- это просто справочник(например пользователи) построеная по SCD2, а 2я- это покупки пользователей, и необходимо найти все покупки пользователя с актуальными данными на день покупки.
SCD2-справочник + факты: соединять по NK и по интервалу актуальности на дату покупки (лучше — факт хранит суррогат ключ версии).
SELECT f.*, d.*
FROM fact_purchases f
JOIN dim_users d
ON d.user_nk = f.user_nk
AND f.purchase_dt >= d.valid_from
AND f.purchase_dt < COALESCE(d.valid_to, 'infinity');
Парадигмы (столпы) ООП: инкапсуляция, наследование, полиморфизм, абстракция.
Некоторые виды магических методов в Python:
Метод init. Используется для инициализации объекта. Вызывается автоматически при создании нового экземпляра класса.
Методы int, float и complex. Преобразуют сложные объекты в примитивный тип int, float и complex соответственно.
Метод bool. Принимает один позиционный аргумент и возвращает либо true, либо false. Его цель — проверить, является ли объект true или false, либо явно преобразовать в логическое значение.
Методы str и repr. Определяют строковое представление объекта и его машиночитаемое представление соответственно.
Методы getattr(self, name), setattr(self, name, value) и delattr(self, name). Вызываются при обращении к атрибуту класса, который не существует, при назначении значения атрибуту класса и при удалении атрибута класса соответственно.
Декораторы в Python — это функции, которые принимают другую функцию в качестве аргумента, добавляют к ней дополнительную функциональность и возвращают функцию с изменённым поведением.
Они позволяют динамически менять, расширять, дополнять логику и поведение функций, классов, методов без изменения исходного кода.
Декораторы: функция, оборачивающая другую для добавления поведения, не меняя вызовы; отделяют кросс-срезочные задачи (логирование, кэш, контроль доступа), меняют структуру через @decorator
@decorator
import time, functools
def timeit(fn):
@functools.wraps(fn)
def wrapper(*a, **kw):
t = time.perf_counter()
try:
return fn(*a, **kw)
finally:
print(f"{fn.__name__}: {time.perf_counter()-t:.6f}s")
return wrapper
@timeit
def work(n):
return sum(range(n))
Итератор и генератор — это два понятия в языке Python, которые позволяют работать с коллекциями данных.
Итератор — это объект, который обеспечивает доступ к элементам коллекции по одному. Он может работать с любым видом коллекции, а не только с последовательностью значений.
Генератор — это особый тип функции, позволяющий создавать последовательность значений по одному, вместо того чтобы возвращать всю последовательность сразу. Когда вызывается функция-генератор, она возвращает объект-итератор, который управляет потоком данных из генератора. Функция генератора продолжает выполняться до тех пор, пока не достигнет конца своей итерации, после чего она прекращает выполнение.
Генератор: ленивый итератор, выдающий значения по yield (или генераторным выражением). Экономит память, удобен для потоков данных.
yield
def countdown(n):
while n > 0:
yield n
n -= 1
g = (x*x for x in range(5))
Они под капотом из себя представляют хэш-таблицы.
Хэш-таблица это такая структура данных, ключом которой могут быть только хэшируемые объекты.
То есть объект у которого хэш-значение на протяжении жизни не меняется вообще.
Значением соответственно может быть всё что угодно
Когда ты добавляешь какой ключ с значением в словарь, то проверяется хэш значение этого ключа, если оно имеется и ф-ция хэш не вызывает ошибки при добавлении этого ключа, тогда добавляется
Различия изменяемых/неизменяемых: изменяемые меняют состояние без смены id (нельзя ключом dict если не хешируемы), неизменяемые при «изменении» создают новый объект (хороши как ключи, потокобезопаснее на чтение).
Список — упорядоченный изменяемый;
кортеж — упорядоченный неизменяемый;
множество — неупорядоченное уникальных элементов;
словарь — отображение ключ→значение (упорядочен по вставке с 3.7).
Список нужен, если нам надо что-то внутрь складировать
А вот операции считывания, итерирования, хранения лучше в кортеже. Так как он неизменяемый, по нему итерирование быстрее происходит
Кортеж потребляет меньше памяти
Со списками обычно работают когда внутри них однотипные обёекты
Специальный объект для управления ресурсами, который потом с гарантией освобождает эти ресурсы
Как работает:
Объект должен реализовывать методы:
_enter_() - вход в контекст, возвращает ресурс
_exit_() - выход из контекста, освобождение ресурса
Как устроен:
1. Хеш-функция: Для ключа вычисляется хеш (через hash()).
2. Индекс: На основе хеша вычисляется индекс ячейки в массиве (с помощью маски).
3. Ячейка (bucket): Каждая ячейка хранит ссылку на запись: [hash, key, value].
Решение коллизий (Collision Resolution)
Метод открытой адресации (Open Addressing):
Если ячейка занята, ищется следующая свободная (линейное пробирование) .
В современных версиях Python используется более сложная версия (например, псевдослучайное пробирование) .
Что происходит при добавлении:
1 Вычисляется хеш ключа.
2. Находится ячейка по индексу.
3. Если ячейка занята ищется следующая свободная.
4. Данные сохраняются в первую свободную ячейку.
При поиске:
2. Проверяется ячейка по индексу.
3. Если ключ не совпадает проверяются следующие ячейки, пока не найдётся совпадение или пустая ячейка.
Это следствие ошибки при выполнения кода(при Runtime), либо синтаксической ошибки(при компиляции). Останавливается выполнения всей программы.
в try (тот код, который потенциально может вызвать ошибку)
в except (то, что надо выполнить при возникновении ошибки)
необязательные:
в finaly (то что выполнится в конце в любом случае)
else (выполняется, если в try не было никакого исключения)
[1,2,3,4]
Переменная b становится ссылкой на тот же список, что и a. Это означает, что a и b теперь указывают на один и тот же объект в памяти.
b
a
1ый print выведет - c d
2ой print выведет - e
3ий print выведет - пустой список
Выпадет исключение, что мы не можем изменить кортеж, но при этом список меняется внутри
dict (встроенный)
counter
chainmap
ordereddict
defaultdict
mappingproxy
a = 10
del a
# имя 'a' удалено; обращение к a -> NameError. Объект 10 останется, если где-то ещё ссылаются
(i for I in range(1,100)): создаст генератор. При первом next() — NameError, т.к. в выражении используется i, а переменная связывания — I (если i не определена снаружи).
(i for I in range(1,100))
next()
NameError
i
I
Использовать изменяемый тип данных в качестве ключа в словаре Python нельзя, это приведёт к ошибке.
Ключи в словарях Python должны быть неизменяемыми типами данных, такими как строки, числа или кортежи.
В буквальном смысле, анонимная функция — это функция без имени.
Для чего используют lambda
lambda функция хорошо подходит для сортировки многомерных списков по разным параметрам, например, если нужно отсортировать список словарей по разным ключам.
Анонимная функция вида lambda args: expr. Нужна для кратких одноразовых функций
lambda args: expr
(например, sorted(items, key=lambda x: x[1])). Не для сложной логики.
sorted(items, key=lambda x: x[1])
Разница между операторами «==» и «is» в Python заключается в том, что они служат разным целям:
Оператор «==» проверяет равенство значений. Он оценивает, совпадают ли значения двух объектов, и возвращает True, если это так, и False в противном случае. 12
Оператор «is» проверяет идентичность. Он определяет, указывают ли две переменные на один и тот же объект в памяти, и возвращает True, если оба объекта указывают на одно и то же место в памяти, и False в противном случае. 12
Таким образом, «==» сравнивает значения, а «is» проверяет, указывают ли объекты на одну и ту же область памяти.
Self в Python — это ссылка на текущий экземпляр класса. Через self можно получить доступ к атрибутам и методам класса внутри него.
Для чего нужен self:
Доступ к атрибутам и методам экземпляра. С помощью переменной self методы экземпляра могут легко получить доступ к различным атрибутам и другим методам одного и того же объекта.
Изменение состояния экземпляра. Переменная self также способна изменять состояние объекта.
Читаемость и понятность кода. Использование self подчёркивает, что метод применяется к конкретному экземпляру класса, что делает код более ясным и понятным.
Как и где использовать self: self является явным параметром для того, чтобы подчеркнуть, что метод применяется к конкретному экземпляру класса. Это делается при определении метода, когда ему первым аргументом передаётся self. Также переменная self может использоваться для доступа к полю переменной внутри определения класса.
Разница между func и func() в Python заключается в том, что первое представляет собой ссылку на функцию, а второе — вызов (выполнение) этой функции.
func — это представляющий функцию объект, который можно назначить переменной или передать другой функции. Когда используется только имя функции без круглых скобок, это означает, что к самой функции обращаются как к объекту, но не вызывают её.
func() — это вызов функции. Когда после имени функции добавляются круглые скобки, это означает, что её код фактически вызывается (выполняется).
Таким образом, использование func просто ссылается на объект функции, не вызывая её код, а с использованием func(), функция вызывается и выполняет свой код
Сложность алгоритма рассчитывается с использованием верхней (наихудшей) оценки, которая выражается с использованием нотации O.
Выделяют следующие основные категории алгоритмической сложности в O-нотации:
Постоянное время: O(1). Время выполнения не зависит от количества элементов во входном наборе данных. Пример: операции присваивания, сложения, взятия элемента списка по индексу и др..
Линейное время: O(N). Время выполнения пропорционально количеству элементов в коллекции. Пример: найти имя в телефонной книге простым перелистыванием, почистить ковёр пылесосом и т.д..
Пример расчёта сложности алгоритма на примере списка: если len(alist) — это N, тогда цикл for i in range(len(alist)) будет иметь сложность O(N), так как цикл выполняется N раз.
Ещё один пример: итоговая сложность двух вложенных действий равна произведению их сложностей. Например, если некоторая функция f(...) имеет класс сложности O(N2), а её выполняют в цикле N раз, то сложность этого кода будет равна: O(N) × O(N2) = O(N×N2) = O(N3).
super() в Python — это встроенная функция, которая позволяет дочернему классу ссылаться на свой родительский класс. Она даёт возможность вызывать методы, определённые в суперклассе, из подкласса, что позволяет расширять и настраивать функциональность, унаследованную от родительского класса.
Зачем нужен super():
Позволяет не явно ссылаться на базовые классы по имени. Это удобно для доступа к переопределённым методам и предотвращения дублирования кода.
Гарантирует, что все конструкторы суперклассов вызываются в правильном порядке. Это предотвращает проблемы с инициализацией и позволяет каждому классу в иерархии наследования вносить свой вклад в конечное состояние объекта.
Итерация в Python — это процесс обхода элементов итерируемого объекта. То есть это процедура взятия элементов чего-то по очереди. В более общем смысле — последовательность инструкций, которая повторяется определённое количество раз или до выполнения указанного условия.
Итерируемый объект в Python — это любой объект, от которого можно получить итератор. Такими объектами являются, например, списки, кортежи, строки и словари. Итерируемыми объектами могут быть и пользовательские объекты, если в их классе реализован специальный метод iter().
Итератор в Python — это объект, который реализует метод next(), возвращающий следующий элемент итерируемого объекта при каждом вызове, и бросающий исключение StopIteration, когда элементы закончились. Итератор получают с помощью функции iter().
Пример итерации:
numbers = [1, 2, 3, 4, 5]
for num in numbers:
print(num)
Итерабельность: __iter__ возвращает итератор; итератор реализует __next__. Встроенные: iter(), next(), enumerate, zip, map, filter, sorted, reversed, sum, any/all.
__iter__
__next__
iter()
enumerate
zip
map
filter
sorted
reversed
sum
any
all
ER «многие-ко-многим» (3NF): через связующую таблицу (junction) с двумя внешними ключами на сущности и (обычно) составным PK из них; атрибуты связи — в этой таблице.
Порядок разрешения методов (Method Resolution Order, MRO) в Python определяет последовательность, в которой Python ищет методы и атрибуты в иерархии классов. Это особенно важно при работе с множественным наследованием, когда класс может наследовать атрибуты и методы от нескольких родительских классов.
Алгоритм C3-линеаризации определяет MRO путём комбинирования:
Самого класса. Всегда начинаем с самого класса, в котором вызван метод
Списка родительских классов в порядке их перечисления. После текущего класса проверяем базовые классы в том порядке, в каком они указаны при наследовании.
MRO родительских классов в том же порядке. Если один и тот же базовый класс наследуется через несколько путей, он проверяется только один раз и в правильном порядке (все остальные разы он будет пропущен).
Порядок разрешения методов следует линейной последовательности. Это означает, что Python ищет метод от дочернего класса к родительскому, следуя порядку, указанному в определении класса.
Проверить порядок обхода методов и полей класса в Python можно, используя атрибут mro или функцию mro().
Память в Python работает следующим образом:
При запуске программы операционная система создаёт новый процесс и выделяет под него ресурсы. В эту память загружается интерпретатор Python вместе со всеми необходимыми ему для работы данными, включая код программы.
Программа не сама выполняет сохранение и освобождение памяти. Интерпретатор лишь запрашивает это у диспетчера памяти.
Диспетчер памяти делегирует работу, связанную с хранением данных, аллокаторам — распределителям памяти. Непосредственно с оперативной памятью взаимодействует распределитель сырой памяти. Поверх него работают аллокаторы, реализующие стратегии управления памятью, специфичные для отдельных типов объектов.
Виртуальная память Python представляет иерархическую структуру:
Арена — фрагмент памяти, расположенный в пределах непрерывного блока оперативной памяти объёмом 256 Кб. Объекты размером более 256 Кб направляются в стандартный аллокатор C.
Пул — блок памяти внутри арены, занимающий 4 Кб, что соответствует одной странице виртуальной памяти.
Блок — элемент пула размером от 16 до 512 байт. В пределах пула все блоки имеют одинаковый размер.
Для освобождения памяти используются два механизма: счётчик ссылок и сборщик мусора. Счётчик ссылок увеличивается на единицу, когда создаётся что-то, что обращается к объекту, например, сохраняется объект в новой переменной. И наоборот, счётчик уменьшается на единицу, когда перестаётся ссылаться на объект. Если содержимое всех переменных — ссылок на объект изменится, счётчик обнулится. В этот момент Python освободит ячейку памяти, и по её адресу можно будет хранить новый объект.
Сборщик мусора: освобождает память недостижимых объектов. В CPython — счётчики ссылок + циклический GC для контейнеров; финализаторы вызываются по правилам.
В большинстве случаев сборщик мусора работает автоматически, очищая неиспользуемые объекты без какого-либо вмешательства. Однако в некоторых случаях можно запустить сборку мусора вручную с помощью функции gc.collect()
class Solution:
def twoSum(self, nums: List[int], target: int) -> List[int]:
# Dictionary to store number -> index mapping
num_map = {}
for i, num in enumerate(nums):
complement = target - num
# Check if complement exists in our map
if complement in num_map:
return [num_map[complement], i]
# Store current number and its index
num_map[num] = i
# Since problem guarantees exactly one solution,
#we don't need a return here
# Definition for singly-linked list.
# class ListNode:
# def __init__(self, val=0, next=None):
# self.val = val
# self.next = next
def mergeTwoLists(self,
list1: Optional[ListNode],
list2: Optional[ListNode]) -> Optional[ListNode]:
# Create a dummy node to simplify the merging process
dummy = ListNode(0)
current = dummy
# Traverse both lists while both have nodes
while list1 and list2:
if list1.val <= list2.val:
current.next = list1
list1 = list1.next
else:
current.next = list2
list2 = list2.next
current = current.next
# Attach the remaining nodes from whichever list is not exhausted
current.next = list1 if list1 else list2
# Return the head of the merged list (skip the dummy node)
return dummy.next
Есть например большая таблица в 30 строк, есть PK и куча колонок(30,40 или 50 или n).
XCom:
Обмен небольшими данными между задачами в рамках одного запуска DAG.
Привязаны к task instance/dag run; живут недолго, хранятся в БД, не для больших объектов.
Используются через xcom_push/xcom_pull; хороши для промежуточных результатов и метаданных.
Для чего: Передача маленьких данных между
ключи, флаги, статусы, небольшие объекты Не для больших файлов/данных
Данные хранятся в Metadata Database
Сериализация
через Base64
Ограничение размера (~64КВ в зависимости от БД)
Variables:
Глобальное key-value хранилище конфигурации для DAG-ов/окружений.
Долговечные, не зависят от конкретного запуска; управляются через UI/CLI/код, могут браться из Secret Backends.
Не предназначены для передачи промежуточных результатов между задачами.
Хуки (Hook): низкоуровневые адаптеры к внешним системам (БД, S3, HTTP и т.д.). Инкапсулируют подключение/аутентификацию, предоставляют удобные методы работы. Не являются задачами DAG, не планируются, обычно вызываются внутри операторов/сенсоров.
Операторы (Operator): определяют единицу работы (задачу) в DAG. Имеют расписание/зависимости, запускаются планировщиком. Часто используют хуки для реальной работы. Сенсоры — это специализированные операторы ожидания условия.
Хук - Соединение с внешней системой
Пример: PostgresHook, S3Hook
Оператор - Конкретное действие или задача
Пример: PythonOperator, BashOperator
Выполняет работу, использует хуки внутри
Отличие:
Хук = подключение, Оператор = действие
Операторы используют хуки для доступа к системам и выполнения
работы.
Есть алгоритм, называется поиск островов.
Есть ПО которое формирует csv файлы, его загружал в Clickhouse. Программа закрывалась несколько раз, а она не должна. Каждый день в 9 часов, выкладывался отчет со строками, сколько минут простаивала прога. Принцип алгоритма в том, что у вас есть последовательно идущие числа. Островом являются числа 1-3, 5-7, 10-13. Нужно найти минимальное и максимальное значение острова row_number применяется
Webserver
UI для мониторинга и управления
Показывает DAGs, логи, статусы задач
Scheduler
Мозг системы - планирует и запускает задачи
Определяет когда выполнять DAGs и таски
Работает с метаданными в БД
Executor
Исполнитель - как запускать задачи:
SequentialExecutor - последовательно (dev)
LocalExecutor - мультипроцессорный
CeleryExecutor - распределенный
KubernetesExecutor - B K8s pods
Workers
Воркеры - выполняют задачи (для Celery/K8s)
Масштабируются горизонтально
Metadata Database
БД (PostgreSQL/MySQL) - хранит:
DAGS метаданные
Статусы задач
ХСоm, переменные, connections
mode="poke" (по умолчанию): периодически «пикает» условие и удерживает воркер-слот до срабатывания или таймаута.
mode="reschedule": освобождает воркер-слот между проверками, задача пере-планируется на следующую проверку.
Дополнительно в Airflow 2.2+ существуют деферрящиеся сенсоры (deferrable=True), которые передают ожидание триггеру (Triggerer), полностью разгружая воркеры до наступления события.
SequentialExecutor: один процесс, по одной задаче за раз. Подходит для разработки/отладки, не для продакшена.
LocalExecutor: параллельное выполнение на одной машине (многопроцессность). Простой деплой, ограничен ресурсами ноды.
CeleryExecutor: распределённое выполнение через Celery, брокер (Redis/RabbitMQ) и result backend. Горизонтально масштабируется за счёт воркеров.
KubernetesExecutor: каждая задача — отдельный Pod в Kubernetes. Тонкая настройка ресурсов/изоляции, авто‑масштабирование без внешнего брокера.
CeleryKubernetesExecutor: гибрид Celery и Kubernetes — позволяет направлять часть задач в Celery, часть в K8s в зависимости от очередей/политик.
DaskExecutor: Распределенные вычисления через Dask. Для вычислительно сложных задач
Ключевые различия: способ параллелизма (локально vs распределённо), потребность во внешних компонентах (брокер/кластер K8s), изоляция задач, сложность эксплуатации и масштабируемость.
Модель — это SELECT в .sql с Jinja, конфигом и зависимостями через ref()/source(), который dbt компилирует в DAG и материализует (view/table/incremental/ephemeral).
К модели привязываются тесты/доки через schema.yml, теги, pre/post hooks.
Короткий ответ:
dbt‑модель — это SQL+Jinja SELECT с config, зависящий от ref()/source(). При выполнении dbt материализует его по стратегии (view/table/incremental/ephemeral), учитывая тесты и зависимости.
-- models/mart/sales.sql
{{ config(materialized='incremental', unique_key='order_id', on_schema_change='append_new_columns') }}
with src as (select * from {{ source('raw','orders') }})
select order_id, amount, updated_at
from src
{% if is_incremental() %}
where updated_at > (select coalesce(max(updated_at), '1900-01-01') from {{ this }})
{% endif %}
Базовые: view, table, incremental, ephemeral (не создаёт объект в БД).
У incremental важно упомянуть стратегии: append, merge/upsert, insert_overwrite (зависят от адаптера), и параметры unique_key, on_schema_change.
view/table/incremental/ephemeral; для incremental выбираем стратегию (append/merge/insert_overwrite), задаём unique_key, on_schema_change.
CSV в seeds/, грузятся dbt seed, можно типы/квотинг/схему, хорошо для небольших справочников/маппингов, версионируются.
seed — CSV из репозитория, грузимый dbt seed в БД; используем для небольших статичных справочников и маппингов, настраиваем типы и схему в dbt_project.yml.
seeds:
+schema: ref
my_project:
countries.csv:
column_types:
code: string
At‑least‑once: возможны дубли из‑за ретраев.
Exactly‑once (EOS): достигается идемпотентным и транзакционным продюсером (enable.idempotence=true, transactional.id), с атомарной записью и коммитом оффсетов. End‑to‑end EOS требует идемпотентного/транзакционного sink’а.
At‑least‑once — без потерь, но с дублями при ретраях. Exactly‑once — атомарная обработка (read‑process‑write) с идемпотентным/транзакционным продюсером; дороже, но без дублей при корректном sink’е.
Только “проверки у consumer” недостаточно; нужны идемпотентность на продюсере и/или upsert на приёмнике.
Короткий ответ (варианты, комбинируемые):
Идемпотентный продюсер: enable.idempotence=true, acks=all, retries>0.
Ключ + уникальный event_id; дедуп по окну в стрим‑движке (state store, TTL) или log compaction по ключу.
Идемпотентный sink: MERGE/UPSERT в DWH (unique_key), INSERT ... ON CONFLICT ... в OLTP.
Транзакционная обработка: запись данных и коммит оффсета в одной транзакции (Kafka Streams/Flink 2PC, outbox‑pattern).
Разделить: оркестратор vs DWH/compute; измерить, а не “в целом оптимизировать”.
Короткий ответ (шаги):
Локализуем: метрики времени по таскам/моделям, профилируем (Airflow: concurrency/pools/SLAs/кванты очередей; DWH: query runtime).
Анализ SQL: EXPLAIN/ANALYZE, партиции/кластеризация, порядок джоинов, селективность, статистики; устраняем shuffle/skew, фильтруем раньше.
Стратегия сборки: материализовать тяжёлые CTE (не ephemeral), перейти на incremental, insert_overwrite по партициям.
Ресурсы: размер кластера, параллелизм, лимиты пула, локи; после крупных MERGE — VACUUM/ANALYZE (где применимо).
Не обязательно “пересобрать всё целиком”; обычно бэкфилл по партициям + инкрементальная логика.
Разделить: это атрибут измерения (SCD2) или факта?
Если измерение: делаем SCD2 (dbt snapshots или свой MERGE) с valid_from/valid_to/is_current; джойним факт по времени события.
Если факт: добавляем колонку, включаем on_schema_change: append_new_columns, пишем бэкфилл‑модель по партициям (год/месяц) с insert_overwrite/merge.
План: side‑by‑side таблица/вью → бэкфилл батчами → валидация (counts/max_ts/sample diffs) → атомарный свитч.
Помимо “таблицы с max‑датой” нужны freshness‑чекы и тесты качества; плюс системные метрики.
Freshness: dbt source freshness с SLO и алертами.
Тесты: not_null/unique/relationships/accepted_values, дрейф объёма/распределений, data‑diff между stage↔mart.
Аудит: техтаблица/метрики на каждый mart (loaded_at, max_event_ts, row_count, checksum/hash) → Prometheus/Grafana/Alerting (Telegram/Slack).
Оркестратор/стрим: SLA, consumer‑lag, ретраи/дедлеттер, процент ошибок sink’а.
-- пример аудита после обновления витрины
insert into audit.marts (model, loaded_at, max_event_ts, row_count)
select 'mart_sales', current_timestamp, max(event_ts), count(*) from {{ ref('mart_sales') }};
Last changed14 days ago