Compare commits

...

4 Commits

Author SHA1 Message Date
Vladimir Protsenko 352021f75c Update for 2026 4 weeks ago
Vladimir Protsenko 938a629719 Update 'README.md' 1 year ago
Vladimir Protsenko 1b3a359c2d how defend 2 years ago
Vladimir Protsenko 00a4f61990 2024 2 years ago

@ -1,684 +0,0 @@
# Введение
Крупномасштабным распределённым приложениям требуются различные формы синхронизации для достижения консенсуса о базовой информации окружения. Выделение примитивов синхронизации в API позволяет выделить ядро координации из логики приложения в отдельный сервис. Являясь частью критической инфраструктуры, к такому сервису предъявляются в первую очередь требования надёжности и доступности. Подобный дизайн делает разработку и поддержку распределённого приложения проще.
ZooKeeper является проектом с открытым исходным кодом, который предоставляет отказоустойчивый распределённый сервис хранения критичных для работы кластера данных. Хранимыми данными могут быть: конфигурационная информация, иерархическое пространство имён, url ссылки, идентификаторы задач и прочее. ZooKeeper, созданный для внутренних нужд компании Yahoo!”, в настоящее время стал использоваться такими открытыми технологиями, как: Apache HBase, HDFS, Apache Storm, Apache Kafka и др.
Сайт проекта http://zookeeper.apache.org/.
Статья: Hunt P. et al. ZooKeeper: Wait-free Coordination for Internet-scale Systems //USENIX annual technical conference. 2010. Т. 8. №. 9.
# Цель работы
- запустить ZooKeeper,
- изучить директорию с установкой ZooKeeper,
- запустить интерактивную сессию ZooKeeper CLI и освоить её команды,
- научиться проводить мониторинг ZooKeeper,
- разработать приложение с барьерной синхронизацией, основанной на ZooKeeper,
- запустить и проверить работу приложения.
Данная лабораторная может выполняться на виртуальной машине создаваемой по Vagrant конфигурации в приложении А или с ZooKeeper, установленным в Windows.
# Изучение ZooKeeper
## Установка
Данный шаг может быть пропущен, если вы будете работать с Vagrant.
Перейдите на страницу Download на официальном сайте https://zookeeper.apache.org/ и скачайте последнюю стабильную версию (на момент написания 3.4.14).
Архив содержит скрипты как для Windows, так и для Unix операционных систем. Необходимым условием для работы Zookeeper является наличие в системе Java Runtime Environment.
Распакуйте архив в директорию `С:\Temp` в Windows. Если вы используете Unix систему, распакуйте и переместите содержимое архива в `/opt`. Набор команд, устанавливающих ZooKeeper в CentOS 7, вы можете найти в Vagrantfile из приложения А.
Перед первым запуском переименуйте файл `zoo_sample.cfg` в директории `conf` в `zoo.cfg`. Файл `zoo.cfg`, который на данный момент содержит базовые настройки, используется для конфигурации сервера.
*Примечание.* В Windows системе в `Панель управления\Система и безопасность\Система -> Дополнительные параметры системы -> Переменные среды` можно установить `JAVA_HOME`, если она не была сконфигурирована вами или администратором после установки Java в систему.
## Запуск
### Windows
В Windows запустите сервер двойным кликом по скрипту `zkServer.cmd` в папке `./bin/` или из терминала, набрав:
```
zkServer.cmd
```
### Linux
```
zkServer.sh
```
Проверьте, что zookeeper работает
```
systemctl status zookeeper
```
## Изучение директории установки ZooKeeper
Перейдите в директорию установки ZooKeeper.
Изучите содержимое директории.
В директории находятся следующие папки:
- bin с исполняемыми файлами для запуска, остановки и взаимодействия с ZooKeeper,
- conf с конфигурационными файлами,
- contrib с инструментами для интеграции ZooKeeper в другие системы: rest, fuse, perl и python библиотеки,
- dist-maven артефакты Maven,
- docs в которой хранится документация,
- recipes различные рецепты, помогающие решать задачи с использованием ZooKeeper (выбор лидера, блокировки, очереди),
- src с исходным кодом и тестовыми скриптами.
## Взаимодействие с ZooKeeper через командный интерфейс CLI
Одним из способов взаимодействия с ZooKeeper является консольный интерфейс ZooKeeper CLI. В реальных задачах вы скорее всего будете использовать ZooKeeper клиентскую библиотеку, однако взаимодействие через CLI является прекрасной возможностью изучить систему и также полезно для ряда задач.
Прежде всего нам понадобится перейти в папку bin директории установки ZooKeeper.
Для запуска интерактивной сессии ZooKeeper CLI используйте скрипт zkCli с расширением в зависимости от той среды, в который вы его запускаете (sh - unix, cmd - windows).
Следующая команда устанавливает подключение к ZooKeeper CLI сессии:
```
./zkCli.sh -server 127.0.0.1:2181
```
*Примечание.* При запуске zkCli.sh без параметров по умолчанию подключение производится к localhost:2181, поэтому явно указанные выше параметры вы можете опустить.
Подключение установлено. Для вывода всех возможных команд наберите help.
```
[zk: localhost:2181(CONNECTED) 0] help
ZooKeeper -server host:port -client-configuration properties-file cmd args
addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path [-b batch size]
delquota [-n|-b|-N|-B] path
get [-s] [-w] path
getAcl [-s] path
getAllChildrenNumber path
getEphemerals path
history
listquota path
ls [-s] [-w] [-R] path
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b|-N|-B val path
stat [-w] path
sync path
version
whoami
```
Выйти из консоли вы можете с помощью команды quit или отправив EOF символ сочетанием Ctrl+D.
Далее последует изучение возможностей CLI интерфейса. Вы научитесь добавлять и удалять разные типы узлов znode, считывать и записывать данные в znode из CLI, разбираться в управлении конфигурациями на базовых примерах.
Находясь в консоли CLI введите команду `ls /`.
```
[zk: localhost:2181(CONNECTED) 2] ls /
[zookeeper]
```
В результе вы должны получить список узлов в корне иерархической структуры данных ZooKeeper. В данном случае выводится один узел. Аналогично вы можете изучать некорневые узлы. Выведите список дочерних узлов `/zookeeper`.
*Примечание.* Используйте автоматическое дополнение при наборе, срабатываемое по нажатию клавиши TAB.
Теперь в корне создайте свой узел `/mynode` с данными "first_version" следующей командой:
```
create /mynode 'first_version'
```
Проверьте, что в корне появился новый узел.
```
[zk: localhost:2181(CONNECTED) 4] ls /
[mynode, zookeeper]
```
Следующие команды возвращают данные и метаданные узла:
```
get /mynode
stat /mynode
```
```
[zk: localhost:2181(CONNECTED) 10] get /mynode
first_version
[zk: localhost:2181(CONNECTED) 13] stat /mynode
cZxid = 0x2
ctime = Wed Sep 01 10:51:29 SAMT 2021
mZxid = 0x2
mtime = Wed Sep 01 10:51:29 SAMT 2021
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 0
```
Изучим структуру, хранимую в узле
|Поле | Описание |
| ----| ----|
|'first_version'| Хранимые данные|
|cZxid| Номер транзакции создания узла в системе|
|ctime| Время создания узла|
|mZxid| Номер транзакции модификации узла|
|mtime| Время модификации узла|
|pZxid| Номер транзакции модификации дочерних узлов|
|cversion| Количество изменений дочерних узлов|
|dataVersion| Количество изменений данных узла|
|aclVersion| Количество изменений прав доступа к данному узлу|
|ephemeralOwner| Идентификатор сессии владельца узла, если узел эфимерный. Иначе значение равно нулю.|
|dataLength| Размер данных|
|numChildren| Количество дочерних узлов|
Измените данные узла на "second_version":
```
set /mynode 'second_version'
```
```
[zk: localhost:2181(CONNECTED) 15] get /mynode
second_version
[zk: localhost:2181(CONNECTED) 16] stat /mynode
cZxid = 0x2
ctime = Wed Sep 01 10:51:29 SAMT 2021
mZxid = 0x3
mtime = Wed Sep 01 10:56:11 SAMT 2021
pZxid = 0x2
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 14
numChildren = 0
```
В качестве результата мы получим обновлённые метаданные узла. Обратите внимание на те значения, которые изменились.
Изменилась дата модификации `mtime` и значение поля `dataVersion` стало больше на единицу, так как мы провели одно изменение. Также изменился размер данных.
Теперь создайте два нумерованных (sequential) узла в качестве дочерних mynode:
```
create -s /mynode/child 'im_sequential'
create -s /mynode/child 'me_too'
```
```
[zk: localhost:2181(CONNECTED) 17] create -s /mynode/child 'im_sequential'
Created /mynode/child0000000000
[zk: localhost:2181(CONNECTED) 18] create -s /mynode/child 'me_too'
Created /mynode/child0000000001
```
Передав дополнительно флаг -s, мы указали, что создаваемый узел нумерованный. Этот способ позволяет создавать узлы с уникальными именами, по которым можно узнать порядок поступления запросов на сервер.
*Пример.* Принадлежность клиентов к группе
Несмотря на то, что ZooKeeper используется, как правило, из программного кода, мы можем эмулировать простой сценарий мониторинга принадлежности клиентов к группе в CLI.
В данном примере в корне zookeeper файловой системы будет создан узел под именем mygroup. Затем несколько сессий CLI будут эмулировать клиентов, добавляющих себя в эту группу. Клиент будет добавлять эфимерный узел в mygroup иерархию. При закрытии сессии узел автоматически будет удаляться.
*Примечание.* При использовании Vagrant управляемой виртуальной машины из VisualCode создайте несколько терминалов и установите ssh соединение в каждом командой `vagrant ssh`.
Этот сценарий может применяться для реализации сервиса разрешения имён (DNS) узлов кластера. Каждый узел регистрирует себя под своим именем и сохраняет свой url или ip адрес. Узлы, которые временно недоступны или аварийно завершили работу, в списке отсутствуют. Таким образом директория хранит актуальный список работающих узлов с их адресами.
Внутри CLI сессии, создайте узел mygroup.
```
create /mygroup 'top_node'
```
Откройте две новых CLI консоли и в каждой создайте по дочернему узлу в mygroup:
*Примечание.* Удобный способ открыть CLI консоль в windows `Shift + Правая кнопка мыши -> Открыть окно PowerShell здесь` в директории zookeeper, затем выполнить `bin/zkCli.cmd`.
Консоль 1 - grue.
```
create -e /mygroup/grue 'iam_grue'
```
Консоль 2 - bleen.
```
create -e /mygroup/bleen 'iam_bleen'
```
Эфимерный тип узла задаётся ключом `-e`.
Проверьте в исходной консоли, что grue и bleen являются членами группы mygroup.
![](images/1_.png)
Представим теперь, что одному из клиентов нужна информация о другом клиенте (к качестве клиентов могут выступать узлы кластера). Этот сценарий эмулируется получением информации командой `get`, которую мы уже запускали ранее. Выберите консоль grue и обратитесь к информации узла bleen.
```
get /mygroup/bleen
```
Информацией, которая хранится в узле клиента может быть url адрес клиента, либо любая другая информация требуемая для работы распределённого приложения.
Теперь эмулируйте аварийное отключение любого клиента. Нажмите сочетание клавиш Ctrl+D в одной из консолей, создавшей эфимерный узел.
Проверьте, что соответствующий узел пропал из mygroup. Изменение списка дочерних узлов может произойти не сразу — от 2 до 20 `tickTime`, значение которого вы можете посмотреть в `zoo.cfg`.
```
[zk: localhost:2181(CONNECTED) 53] ls /mygroup
[bleen, grue]
[zk: localhost:2181(CONNECTED) 54] ls /mygroup
[bleen, grue]
[zk: localhost:2181(CONNECTED) 55] ls /mygroup
[bleen, grue]
[zk: localhost:2181(CONNECTED) 56] ls /mygroup
[bleen, grue]
[zk: localhost:2181(CONNECTED) 57] ls /mygroup
[bleen, grue]
[zk: localhost:2181(CONNECTED) 58] ls /mygroup
[bleen, grue]
[zk: localhost:2181(CONNECTED) 59] ls /mygroup
[bleen, grue]
[zk: localhost:2181(CONNECTED) 60] ls /mygroup
[bleen]
[zk: localhost:2181(CONNECTED) 61]
```
Таким образом клиенты могут получать информацию о появлении и отключении других клиентов.
В заключении удалите узел `/mygroup`.
```
delete /mygroup
```
### Пример управления конфигурацией распределённого приложения
Хранение конфигурационной информации в ZooKeeper одно из наиболее популярных приложений. Мы будем эмулировать данную концепцию также с помощью CLI.
Использование ZooKeeper для хранения конфигурационной информации имеет два преимущества. Первое состоит в том, что новые клиенты могут узнавать конфигурацию кластера и определять свою роль самостоятельно. Второе преимущество заключается в возможности подписки на обновление конфигурационных параметров. Это позволяет динамически реагировать на изменения в конфигурации во время выполнения, что необходимо в режиме работы 24/7.
Создадим в корне узел "myconfig" в задачу которого будет входить хранение конфигурации. В нашем случае узел будет хранить строку `'sheep_count=1'`.
Во всех случаях, когда конфигурационная информация нашего гипотетического распределённого приложения будет изменяться, мы будем обновлять znode строкой с новым значением. Другим клиентам распределённого приложения достаточно проверять хранимые в этом узле данные.
Откройте новую консоль и подключитесь к ZooKeeper. Данная консоль будет играть роль физического сервера, который ожидает получить оповещение в случае изменения конфигурационной информации, записанной в `/myconfig` znode.
Следующая команда устанавливает watch-триггер на узел:
```
get /myconfig -w true
```
Вернитесь к первому терминалу и измените значение myconfig:
```
set /myconfig 'sheep_count=2'
```
Во втором терминале должно появиться оповещение об изменении данных!
```
[zk: localhost:2181(CONNECTED) 87]
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/myconfig
```
Триггер сбрасывается после одного срабатывания, а значит его придётся 'взводить' каждый раз заново. Как правило, в приложении, в логике обработчика события присутствует такая процедура.
Удалите узел `/myconfig`. Проверьте, что эта команда выполнилась.
Примеры приложений:
- Эфимерные узлы в сочетании возможностью быть нумерованными позволяют реализовать механизм аварийного переключения https://ru.wikipedia.org/wiki/Аварийное_переключение, производить выбор лидера, обеспечивать координацию доступа к ресурсам.
- Асинхронная передача и рассылка сообщений.
Другие примеры использования ZooKeeper https://zookeeper.apache.org/doc/r3.4.5/recipes.html.
## Мониторинг ZooKeeper
ZooKeeper позволяет проводить мониторинг его состояния с использованием 4 буквенных команд: `conf`, `cons`, `stat` и других. Полный список команд доступен по адресу https://zookeeper.apache.org/doc/r3.7.0/zookeeperAdmin.html#sc_4lw .
**Примечание. С коммита https://github.com/apache/zookeeper/commit/5fe68506f217246c7ebd96803f9c78e13ec2f11a Zookeeper 4-буквенные команды отключены по умолчанию. Вы можете их включить при запуске сервера с параметром JVM `-Dzookeeper.4lw.commands.whitelist=*`.**
Команды можно отправлять на сервер из linux терминала с помощью утилит `echo` и `netcat`. Для некоторых linux систем `netcat` может быть вызван по сокращённому имени `nc`. Синтаксис команды:
```
echo <команда> | nc <имя или адрес хоста zookeeper> <порт>
```
В Windows вы можете установить подключение к zookeeper командой telnet
```
telnet <имя или адрес хоста zookeeper> <порт>
```
Для следующей команды сервер должен прислать ответ "imok", если он функционирует и доступен:
```
echo ruok | nc localhost 2181
```
Узнать версию zookeeper позволяет следующая команда:
```
echo stat | nc localhost 2181
```
Следующая команда возвращает конфигурационные параметры ZooKeeper сервера:
```
echo conf | nc localhost 2181
```
После выполнения распечатываются следующие параметры:
- порт клиента (`clientPort`),
- путь к директории, где хранятся данные (`dataDir`),
- путь к директории, где хранятся логи сервера (`dataLogDir`),
- интервал часов, указанный миллисекундах (`tickTime`),
- максимальное количество подключений к серверу (`maxClientCnxns`),
- минимальный и максимальный таймаут сессии (`minSessionTimeout`, `maxSessionTimeout`),
- идентификатор сервера.
Вышеперечисленные параметры могут быть указана в конфигурационном файле в директории conf (полный путь `/opt/ibm/biginsights/zookeeper/conf/`).
Отправьте команду `cons` для получения списка подключений всех клиентов с детальной информацией о сессиях.
```
echo cons | nc localhost 2181
```
Команды cons и conf дают детальную информацию. Для получения более общей информации используйте stat.
```
echo stat | nc localhost 2181
```
В распределённом варианте работы ZooKeeper команду dump для вывода текущих сессий и связанных с ними эфимерных узлов следует выполнять на узле-лидере. В текущей лабораторной работе все данные будут связаны с локально выполняемыми клиентскими процессами.
```
echo dump | nc localhost 2181
```
Для того, чтобы посмотреть информацию о watch-триггерах используйте команду wchs:
```
echo wchs | nc localhost 2181
```
Существуют и другие команды, найти которые вы можете в документации по администрированию ZooKeeper (Administrator's Guide).
На этом первая часть работы завершена.
# Разработка распределённого приложения
ZooKeeper поставляется с двумя клиентами для языков C и Java. В текущей лабораторной мы будем использовать Java API для реализации приложения с барьерной синхронизацией. Аналогия с животными и зоопарком позволит лучше понять концепты ZooKeeper.
Вы создадите зоопарк, который будет представлен корневым узлом `/zoo/`. Каждое животное, ваше приложение, будет входить в зоопарк, оно будет создавать дочерний эфимерный узел в зоопарке со своим именем. После того, как все животные будут в сборе, каждое начнёт бежать и остановится через определённый период времени. В конце приложения эфимерные узлы будут явно удалены.
## Настройка среды и проекта
Разработка распределённого приложения будет вестись на языке Scala в IDE IntelliJ Idea.
Создайте новый проект SBT аналогично тому, как это делалось в лабораторной работе 4. Дождитесь когда SBT инициализирует проект. Это может занять несколько минут.
Добавьте в качестве зависимости библиотеку ZooKeeper в `build.sbt` как `provided`. Определите версию ZooKeeper. Координаты библиотеки соответствующей версии вы можете найти в https://mvnrepository.com/.
![](images/2_.png)
*Примечание.* При указании zookeeper зависимости может потребоваться исключения из зависимостей: `com.sun.jdmk`, `com.sun.jmx`, `javax.jms`.
## Реализация логики приложения
Инициализируйте пакет `zoo` в папке `src/main/scala/`.
В пакете `zoo` создайте scala объект `Main` и поместите туда следующий код.
```scala
package zoo
object Main {
val sleepTime = 100
def main(args: Array[String]): Unit = {
println("Starting animal runner")
val Seq(animalName, hostPort, partySize) = args.toSeq
val animal = Animal(animalName, hostPort, "/zoo", partySize.toInt)
try {
} catch {
case e: Exception => println("Animal was not permitted to the zoo. " + e)
}
}
}
```
Этот код объявляет главный класс с методом `main`. Программа ожидает в качестве аргументов список: имя животного, адрес и порт zookeeper, размер группы животных. Далее следует создание объекта `Animal` на основе параметров: имя животного, адрес и порт zookeeper, путь к корневому узлу для узлов животных, величина группы животных. В конце метода main располагается try/catch блок, в котором будет выполняться код взаимодействующий с ZooKeeper.
*Примечание.* Приём, с помощью которого присваиваются списку имён переменных соответствующие значения списка аргументов, называется "сопоставление по шаблону" (pattern matching).
Нашей следующей задачей будет реализация класса `Animal` и заполнение тела try блока.
В методе enter объект `Animal` должен связываться с ZooKeeper, создавать эфимерный узел с именем `animalName` и подписываться на обновления группы `/zoo`.
```scala
animal.enter()
println(s"${animal.name} entered.")
```
Напишите цикл, в котором с интервалом sleepTime в миллисекундах печатается сообщение о работе процесса. Количество итераций может быть случайным.
```scala
for (i <- 1 to Random.nextInt(100)) {
Thread.sleep(sleepTime)
println(s"${animal.name} is running...")
}
animal.leave()
```
Перейдём к реализации класса Animal. Для удобства обращения к полям класса будем использовать case class. Инициализация Animal заключается в установлении соединения с ZooKeeper, определении переменных mutex и animalPath.
```scala
package zoo
import org.apache.zookeeper._
case class Animal(name:String, hostPort:String, root:String, partySize:Integer) extends Watcher {
val zk = new ZooKeeper(hostPort, 3000, this)
val mutex = new Object()
val animalPath = root+"/"+name
if (zk == null) throw new Exception("ZK is NULL.")
}
```
Для реакции на события от ZooKeeper класс должен реализовывать метод process интерфейса Watcher.
```scala
override def process(event: WatchedEvent): Unit = {
// код реакции на событие
}
```
Так как обработка событий и проверка условия барьера выполняются в разных потоках нам понадобится выполнять код методов в синхронизованном блоке. Синхронизация в Scala похожа на Java и выполняется на объекте-мьютексе. Ниже приведён пример блока синхронизации.
```scala
mutex.synchronized {
// код
}
```
Реализуем простую реакцию $-$ вывод на экран события.
```scala
override def process(event: WatchedEvent): Unit = {
mutex.synchronized {
println(s"Event from keeper: ${event.getType}")
}
}
```
Далее перейдём к реализации метода enter.
```scala
def enter():Boolean = {
// код создания узла и ожидания у барьера
}
```
В начале метода создайте эфимерный узел.
```scala
zk.create(animalPath, Array.emptyByteArray, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
```
Затем в синхронизованном блоке в цикле напишите код, ожидающий появления в корневом узле `/zoo` всех животных.
```scala
mutex.synchronized {
while (true) {
val party = zk.getChildren(root, this)
if (party.size() < partySize) {
println("Waiting for the others.")
mutex.wait()
println("Noticed someone.")
} else {
return true
}
}
}
return false
}
```
Реализуйте оставшийся метод, который вызывается в конце приложения и удаляет созданный в начале эфимерный узел с помощью метода `delete`.
## Проверка работоспособности приложения
Запустите несколько клиентов распределённого приложения и проверьте его работу.
![](images/3_.png)
*Примечание.* Перед запуском создайте корневой узел животных `/zoo`, если он ещё не создан.
*Примечание.* Убедитесь, что библиотеки, которые используются проектом (zookeeper, log4j и другие) находятся в области видимости `CLASSPATH`. Окружения этапов компиляции, тестирования и исполнения как правило различны.
## Упражнения
С использованием Zookeeper сервиса:
1. Решите проблему обедающих философов (каждый философ - отдельный процесс в системе)
2. Реализуйте двуфазный коммит протокол для high-available регистра (каждый регистр - отдельный процесс в системе)
# Приложение А. Инициализация виртуальной машины с помощью Vagrant
Для работы Vagrant необходим гипервизор Oracle VM VirtualBox. Для инициализации виртуальной машины поместите следующие файлы в пустую папку, находясь в ней откройте терминал и запустите команду `vagrant up`.
Vagrantfile
```Vagrantfile
# -*- mode: ruby -*-
# vi: set ft=ruby :
# All Vagrant configuration is done below. The "2" in Vagrant.configure
# configures the configuration version (we support older styles for
# backwards compatibility). Please don't change it unless you know what
# you're doing.
Vagrant.configure("2") do |config|
# The most common configuration options are documented and commented below.
# For a complete reference, please see the online documentation at
# https://docs.vagrantup.com.
# Every Vagrant development environment requires a box. You can search for
# boxes at https://vagrantcloud.com/search.
config.vm.box = "centos/7"
# config.vm.provider "virtualbox" do |vb|
# # Display the VirtualBox GUI when booting the machine
# vb.gui = true
#
# # Customize the amount of memory on the VM:
# vb.memory = "1024"
# end
#
# View the documentation for the provider you are using for more
# information on available options.
# Enable provisioning with a shell script. Additional provisioners such as
# Puppet, Chef, Ansible, Salt, and Docker are also available. Please see the
# documentation for more information about their specific syntax and use.
config.vm.provision "file", source: "zookeeper.service", destination: "zookeeper.service"
config.vm.provision "shell", inline: <<-SHELL
yum install nc -y
yum install java-11-openjdk -y
yum install wget -y
wget http://apache-mirror.rbc.ru/pub/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
tar zxf zookeeper-3.4.14.tar.gz
mv zookeeper-3.4.14 /opt/
cp /opt/zookeeper-3.4.14/conf/zoo_sample.cfg /opt/zookeeper-3.4.14/conf/zoo.cfg
mv zookeeper.service /etc/systemd/system/zookeeper.service
chmod 664 /etc/systemd/system/zookeeper.service
systemctl start zookeeper
SHELL
end
zookeeper.service
[Unit]
Description=Zookeeper
After=syslog.target
[Service]
SyslogIdentifier=zookeeper
TimeoutStartSec=10min
Type=forking
ExecStart=/opt/zookeeper-3.4.14/bin/zkServer.sh start
ExecStop=/opt/zookeeper-3.4.14/bin/zkServer.sh stop
[Install]
WantedBy=multi-user.target
```
# Дополнительные ссылки
1. https://www.youtube.com/c/DistributedSystemsCourse
2. https://www.youtube.com/c/lindseykuperwithasharpie
3. https://lamport.azurewebsites.net/video/videos.html (recent talk: https://www.youtube.com/watch?v=Ocxczi-CvRQ)
4. https://hydraconf.com/ (free talks: https://www.youtube.com/channel/UCcwI0q9tsGZYZDvz5mLsXZA)
5. Lynch, Nancy A. Distributed Algorithms. 1996. https://dl.acm.org/doi/book/10.5555/2821576
6. Martin Kleppmann. Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems
7. Alex Petrov. Database Internals: A Deep Dive Into How Distributed Data Systems Work.

@ -1,56 +0,0 @@
# Подсказки к упражениям
## Двухфазный коммит протокол на основе ZooKeeper
На основе материалов: https://zookeeper.apache.org/doc/r3.4.2/recipes.html#sc_recipes_twoPhasedCommit и https://stackoverflow.com/questions/24635777/how-to-implement-2pc-in-zookeeper-cluster
[![IMAGE_ALT](https://img.youtube.com/vi/yu2TZF7S1Mg/3.jpg)](https://youtu.be/yu2TZF7S1Mg?t=964)
Протокол двухфазной коммита — это алгоритм, позволяющий всем клиентам в распределенной системе договориться либо о коммите транзакции, либо о ее откате.
Взаимодействующие компоненты:
- создатель транзакции,
- координатор транзакции,
- исполнители транзакции,
- zookeeper,
- (опционально) прямой канал связи создателя транзакции и координатора,
- (опционально) прямые каналы связи между координатором и исполнителями.
### Вариант 1.
В ZooKeeper вы можете реализовать двухфазный коммит протокол, если координатор создаст узел транзакции, скажем "/app/Tx", и один дочерний узел для каждого исполнителя транзакции, скажем "/app/Tx/node_i". Когда *координатор* создает дочерний узел, он оставляет его содержимое неопределенным.
Первая фаза:
Как только каждый исполнитель, участвующий в транзакции, получает транзакцию от координатора, исполнитель читает каждый дочерний узел "node_i" и подписывается на события изменения транзакционного узла. Затем каждый исполнитель обрабатывает запрос и принимает решение "commit" или "abort", записывая данные в свой узел.
Вторая фаза:
Как только запись завершается, другие исполнители получают уведомление, и как только все исполнители получат все голоса, они могут принять решение либо "commit", либо "abort". Обратите внимание, что узел может принять решение абортировать транзакцию раньше, если какой-либо исполнитель проголосует за "abort".
Интересным аспектом этой реализации является то, что единственная роль координатора заключается в определении группы исполнителей, создании узлов ZooKeeper и распространении транзакции на соответствующим исполнителям. Фактически, даже распространение транзакции может быть сделано через ZooKeeper путем записи в узле транзакции.
У описанного выше подхода есть два важных недостатка:
— это сложность сообщений, которая составляет O(n²).
— невозможность обнаружения аварийного завершения исполнителей.
Для решения первой проблемы можно сделать так, чтобы об изменениях в узлах транзакций уведомлялся только координатор, а затем уведомлять исполнителей, как только координатор примет решение. Обратите внимание, что этот подход остаётся масштабируемым, но он медленнее, поскольку требует, чтобы все коммуникации проходили через координатора.
Для решения второй проблемы можно сделать так, чтобы координатор распространял транзакцию исполнителям, а каждый исполнитель создавал свой собственный эфемерный узел.
### Вариант 2.
1. Координатор C регистрирует транзакционный узел /app/tx
Первая фаза:
2. Координатор уведомляет исполнителей о транзакции
3. Координатор подписывается на изменения транзакционного узла (устанавливает WATCH на /app/tx)
4. Каждый исполнитель создает эфемерных узел /app/tx/node_i с решением commit/abort
5. Исполнитель подписывается на события своего узла для получения решения от координатора ( вторая фаза )
Вторая фаза:
6. Координатор принимает решение о commit/abort после ожидания таймаута или после создания всех узлов исполнителей с решением commit
7. Координатор изменяет значение эфемерных узлов для каждого исполнителя на commit / abort
8. Исполнители применяют / прерывают транзакцию
9. Исполнители обновляют значение узла на committed

Binary file not shown.

Before

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 111 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 148 KiB

585
QnA.md

@ -1,585 +0,0 @@
# QnA
## Содержание
- [Apache Spark](#apache-spark)
- [Hive](#hive)
- [MapReduce](#mapreduce)
- [Потоковая обработка (Apache Storm)](#потоковая-обработка-apache-storm)
- [Архитектуры BigData решений](#архитектуры-bigdata-решений)
## Apache Spark
[содержание](#)
### С чего начать первое знакомство с Apache Spark?
Рекомендуется начать с установки IDEA Community Edition, создания sbt проекта и подключения Apache Spark библиотеки последней версии https://mvnrepository.com/artifact/org.apache.spark/spark-core. В среде разработки вы можете реализовать классическое приложение, создать scratch файл или запустить scala консоль для более интерактивной разработки.
После освоения Spark команд на локальных данных, отработайте навыки импорта/экспорта данных в распределённую файловую систему и запуска Apache Spark приложений на одной из доступных виртуальных машин с Hadoop экосистемой:
- https://mapr.com/try-mapr/sandbox/,
- https://www.cloudera.com/downloads/hortonworks-sandbox/hdp.html
или на одном из серверов в облаке:
- https://docs.microsoft.com/ru-ru/azure/hdinsight/spark/apache-spark-overview,
- https://aws.amazon.com/ru/emr/features/spark/,
- https://cloud.google.com/learn/what-is-apache-spark.
В подготовленной администратором среде вам должны быть доступны команды `spark-shell` для интерактивного программного взаимодействия, `spark-shell` для запуска задачи, `spark-sql` для интерактивного выполнения Spark SQL запросов. Также доступен `spark-class` скрипт, который используется всеми вышеперечисленными командами, для более тонкой настройки запуска.
### Как подключить Apache Spark библиотеку к проекту на Java, Scala?
Как и любую другую библиотеку в зависимости от менеджера зависимостей в конфигурационном файле. Например, для Maven это будет файл `maven.conf`. Существуют сайты репозитариев с функцией поиска библиотек, например mvnrepository.com. На нём на странице найденной библиотеки приводится инструкция о том, как её добавить.
### Какой командой можно загрузить текстовый файл в Spark?
```
sc.textFile(“/user/user1/example.txt”)
```
### Какие виды источников данных могут быть использованы в Spark?
В первую очередь Spark предназначен для обработки файлов в DFS (распределённой файловой системе). К нему также можно подключить различные облачные хранилища. Для тестирования на локальной машине возможно обращение к локальной файловой системе с префиксом `file://` в пути. В Spark Streaming возможно использование коннекторов с системами очередей, например к Apache Kafka и реляционным базам данных.
Более полный список из документации: Connecting to SQL Databases using JDBC, Amazon Redshift, Amazon S3, Amazon S3 Select, Azure Blob Storage, Azure Data Lake Storage Gen1, Azure Data Lake Storage Gen2, Azure Cosmos DB, Azure SQL Data Warehouse, Cassandra, Couchbase, ElasticSearch, Images, Import Hive Tables, MongoDB, Neo4j, Oracle, Avro Files, CSV Files, JSON Files, LZO Compressed Files, Parquet Files, Redis, Riak Time Series, Snowflake, Zip Files, Apache Kafka, Amazon Kinesis, Optimized S3 File Source with SQS, Azure Event Hubs, Databricks Delta Tables, Read and Write Streaming Avro Data with DataFrames.
### С каким типом данных (c какой абстракцией) работает Spark?
Resilient Distributed Dataset (RDD), DataSet, DataFrame.
### Как указать в качестве источника файл на локальной файловой системе?
В пути к файлу должен присутствовать префикс `file://` (указание схемы “file” в формате URI).
### Какие два вида операций доступны в Apache Spark? В чём их различие?
Операции трансформаций и операции действия (Операции, которые только объявляют какие действия с данными нужно сделать и операции, которые обязательно приводят к запуску расчётов, в том числе действий, от которых такая операции зависит).
### К какому типу операций относятся функции высшего порядка map, filter, reduce?
К трансформациям, за исключением reduce.
### Что является моделью описания обработки данных?
Направленный ациклический граф (DAG).
### Какие функции доступны для сохранения данных?
Из RDD — saveAsObjectFile, saveAsTextFile. Из DataFrame и DataSet — write.
### Чем определяется распределение данных по кластеру?
Количеством, размером файлов и размером блока в распределённой файловой системе. При обработке указанным параллелизмом операции.
### Как распределён список размера N на M узлах с указанием параллелизма 3?
Наверное, по N/M элементов. При чём здесь параллелизм.
### Можно ли реализовать итеративный алгоритм на Apache Spark?
Да, это одна из мотиваций создания фреймворка.
### Чем отличаются функции join и union?
Join работает со списками пар ключ/значение, семантика операции требует наличие идентификатора значения. Uninon не требует допущений относительно элементов коллекции, работает с коллекциями как с мультимножествами, возвращая мультимножество (не убирает дубликаты).
### Является ли результат выполнения операции union множеством?
Нет, операция ведёт себя как объединение мультимножеств или как конкатенация списков. Дубликаты не убираются.
### Когда следует использовать кэширование?
Когда посчитанные на определённом этапе данные в DAG будут использованы далее в нескольких местах.
### Какие виды хранения существуют для распределённых данных?
Непонятный вопрос. (in memory, ...)
### Каким способом можно сгруппировать данные?
С помощью функций `aggregate`, `reduce`, `fold`.
### Чем отличаются функции aggregate от reduce?
Тип возвращаемого значения может отличаться от типа входных значений.
### Какая трансформация будет выполняться дольше groupBy->reduce или reduceByKey? Почему?
Трансформация groupBy->reduce будет выполняться дольше, так как фреймворк не будет выполнять частичную редукцию до передачи данных по сети. Это приведёт к большему объёму переданных по сети данных и большей нагрузке reduce обработчиков.
### Где (на каком узле) выполняется операция collect?
Операция выполняется на driver узле, с которого была запущена задача, например посредством `spark-submit` или `spark-shell`.
### Приведите пример, в котором нужно использовать flatMap.
В классической задаче подсчёта частоты встречаемости слов в тексте этап создания слов из строк файла реализуется с помощью flatMap. `“text.flatMap(line => line.split(“ ”)).map(word => (word, 1)).reduceByKey((a, b) => a + b)”`
### На каких узлах и в каком количестве выполняется функция, переданная в mapPartitions?
Функция выполняемая для одной партиции гарантированно выполняется на одном узле в одном Java процессе.
### Что произойдёт при выключении одного из узлов при обработке map операции? В середине обработки конвейера из трансформаций?
Будет выполнено несколько попыток перезапуска.
### Как поведёт себя система в случае отказа узла при установленном флаге checkpoint?
Оператор, после которого вызвана функция checkpoint, будет восстанавливать состояние из контрольной точки, а не путём пересчёта данных с нулевой отметки.
### Каким образом получить список пар ключ-значение, отсортированный по ключу?
```
list.sortBy(x => x._1)
```
### Чем отличается операция fold от reduce в Spark?
В fold можно добавить начальное значение.
### Как выглядит простейшая программа на Apache Spark?
```
sc.map(x=>x+1).take(5) ?
```
### Как объявить анонимную функцию?
```
e => print(e) или (a,b) => a + b
```
### Для чего используются анонимные функции и какими свойствами они обладают?
Функции без имени в scala обладают теми же возможностями, что и обычные функции с именами. Обычно используются для коротких выражений, выполнение которых производится не главным потоком или отложено во времени. В Spark разделение между описанием действием и тем, как оно будет вычисляться присутствует естественным образом в DAG.
### Что такое замыкание?
Это функция, в которой происходит обращение к переменным лексического контекста объявленными за пределами тела функции. В этом случае функция “захватывает” переменную и должна при передаче по сети или сохранении на диск сериализоваться вместе с данными переменной.
### Где выполняется пользовательская функция в локальном режиме и в режиме выполнения на кластере?
Функция в локально режиме выполняется в JVM driver процесса, в распределённом режиме рабочий процесс выполнения определяется планировщиком Spark.
### Какие виды процессов существуют в Spark кластере?
Существует 2 вида процессов: driver и worker.
### Верно ли, что переменная аккумулятор может быть использована для суммирования результатов, полученных в процессе выполнения задачи?
Да. Пример `sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)` .
### Можно ли считывать переменную аккумулятор на рабочих узлах?
Нет. Аккумулирующую переменную можно считать только в driver процессе.
### Где выполняется команды вывода для rdd.map(println), rdd.foreach(println), rdd.take(5).foreach(println)?
Первая команда выполнится, если будет вызвана операция-действие, вторая на разных узлах, где лежат данные, третья соберёт и напечатает данные в driver.
### Каким способом могут быть разосланы данные, например словарь, по всем узлам для дальнейшего использования в функциях трансформации?
С помощью broadcast переменной. `val broadcastVar = sc.broadcast(Array(1,2,3)) `
## Hive
[содержание](#)
### Какие типы таблиц существуют в Hive?
Существует два типа таблиц: управляемая (managed) и внешняя (external). Hive управляет и данными, и схемой управляемой таблицы. Данные внешней таблицы находятся за пределами ответственности Hive.
### Подходит ли Hive роль OLTP (обработка транзакций в реальном времени)?
Нет, Hive не даёт возможности вставки и обновления на уровне строк. И, следовательно, обрабатывать данные в реальном времени.
### Можно ли переименовывать таблицы после создания?
Да. `ALTER TABLE old_name RENAME TO new_name`.
### Можно ли изменить тип данных колонки?
Да, используя параметр `REPLACE в ALTER TABLE. ALTER TABLE table_name REPLACE COLUMNS ...`
### Дайте определение metastore.
Это реляционная база данных для хранения метаданных таблиц, партиций, hive баз данных.
### Для чего писать свой SerDe класс?
В зависимости от характера данных пользователя встроенные SerDe реализации могут быть неподходящими для их формата.
### Для чего создавался Hive? Какую роль он играет в экосистеме Hadoop?
Hive является интерфейсом к данным в HDFS, позволяющим выполнять подобные SQL запросы к данным на языке HiveQL. Его используют для получения и анализа данных.
### В какой директории по умолчанию хранятся табличные данные?
```
hdfs://namenode_server/user/hive/warehouse
```
### В каких трёх режимах может быть запущен Hive?
Локальном (Local), Распределённом (Distributed), Псевдораспределённом (Pseudodistributed)
### Существует ли понятие типа данных в Hive?
Да, например `float`, `int`, `ARRAY`.
### Существует ли тип данных для даты?
Да, `TIMESTAMP`. Дата хранится в формате `java.sql.timestamp`.
### Какие составные типы данных есть в Hive?
`ARRAY`, `MAP`, `STRUCT`
### Можем ли мы запустить линукс консольную команду из Hive интерпретатора?
Используя восклицательный знак. Например, “!pwd”
### Для чего нужны Hive переменные?
Через переменные вы можете передавать значения в Hive запросы при их запуске.
### Могут ли запросы выполняться из скриптов? Как?
```
hive> source /path/file.hql
```
### Какую роль играет файл `_hiverc` в Hive?
В файле хранится список комманд, запускаемых при старте интерпретатора Hive. Например, там может быть установлен строгий режим выполнения задачи (strict mode).
### Какие разделители используются по умолчанию для разграничения строк и столбцов?
Для строк символ новой строки “\n”, для столбцов (полей) символы “\001”, “\002”, “\003”.
### В какой момент происходит валидация схемы - во время чтения и/или во время записи?
Только время чтения.
### Можно ли командах SHOW использовать шаблоны поиска?
Да, `SHOW DATABASES LIKE p.*`
### Как вывести все базы данные Hive, начинающиеся с буквы p?
```
SHOW DATABASES LIKE p.*
```
### Для чего используется команда USE?
Для выбора текущей базы данных, для которой будут выполняться последующие запросы.
### Как удалить `DBPROPERTY`?
Удалить `DBPROPERTY` нельзя.
### Что означает строка `“set hive.mapred.mode = strict;”` ?
Она переключает MapReduce задачи в строгий режим. В строгом режиме запросы не могут выполняться над партиционированными таблицами без слова WHERE. Это предотвращает выполнение больших задач длительное время.
### Каким образом проверить существование конкретной партиции?
```
SHOW PARTITIONS table_name PARTITION(partition_column=partition_value)
```
### Какие Java классы отвечают за кодирование данных в файлы, хранящих таблицы Hive?
`org.apache.hadoop.mapred.TextInputFormat`
### Какие Java классы отвечают за раскодирование данных в файлы, хранящих таблицы Hive?
`org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat`
### Что произойдет при попытке удаления таблицы с выражением IF EXISTS?
Hive выкинет ошибку, если таблица не существует.
### Что произойдёт с данными, если поменять путь к партиции таблицы?
Данные останутся в прежнем место, их нужно перемещать отдельной командой.
### Как вставить дополнительные столбцы в таблицу перед существующей колонкой?
```
ALTER TABLE table_name CHANGE COLUMN new_col INT BEFORE x_col
```
### Экономит ли место архивированная таблица в HDFS?
Нет, только уменьшает нагрузку на NameNode
### Можно ли исключить партицию таблицы из результата запроса?
Да, выставив свойство `ENABLE OFFLINE в ALTER TABLE`.
### Каким образом при загрузке данных в Hive указать путь к HDFS файлу или к локальному файлу линукс системы.
Для указания локального файла требуется дополнительное выражение local.
### Что делает следующий запрос `“INSERT OVERWRITE TABLE employees PARTITION (country, state) SELECT …, se.cntry, se.st FROM stage_employees se;”`?
Он создаёт раздел для таблицы employees со значениями из колонок SELECT выражения.
### Что такое генерирующая функция в Hive?
Генерирующая функция принимает один столбец и разворачивает его в несколько.
### В чём разница между LIKE и RLIKE операторами Hive?
`LIKE` ведёт себя аналогично обычному SQL LIKE. Например, `street_name like %Chi`. RLIKE позволяет использовать регулярные выражения. Например, `street_name RLIKE .*(Chi|Oho).*`.
### Можно ли выполнить декартово произведение двух таблиц в Hive.
Нет. Но почему?
### Как преобразовать строку “51.2” в число с плавающей точкой?
```
cast(price as FLOAT)
```
### Может ли имя представления (view) совпадать с именем таблицы?
Нет, имена таблиц и представлений уникальны для одной базы данных
### Можно ли загрузить данные в представление?
Нет
### Какими накладными расходами чревато создание индексов?
Индексы занимают дополнительной пространство и требуют дополнительной обработки данных при создании
### Назовите команду для вывода списка индексов.
```
SHOW INDEX ON table_name
```
### Что такое бакетирование?
Шардинг? Значения колонки хешируются в разные бакеты, количество которых определено пользователем. Это один из способов избежать создания нескольких или вложенных партиций.
### Для чего предназначена команда /* streamtable(table_name) */?
Это подсказка для Hive, что перед запросом данные должны быть сначала помещены в память. Это техника оптимизации запроса.
### Может ли партиция быть заархивирована? Какие существуют преимущества и недостатки?
Да, партиция может быть заархивирована. Преимущество в том, что количество файлов, учитываемых NameNode и заархивированный файл может быть предметом запроса на hiveql. Недостаток в том, что выполнение запроса будет производиться менее эффективно.
### Что такое UDF?
UDF является Java программой, которая создаётся для определённой задачи, которая не может быть решена стандартными функциями Hive. В программе можно определить тип аргумента и вернуть соответствующий результат.
### Следующее выражение не выполнилось - `LOAD DATA LOCAL INPATH {env:HOME}/country/state/ OVERWRITE INTO TABLE address;`. В чём причина?
В качестве аргумента `LOCAL INPATH` должен передаваться путь к файлу, а не к директории.
### Как указать создателя таблицы при создании таблицы?
```
TBLPROPERTIES(creator = Joan)
```
## MapReduce
[содержание](#)
### Как работает механизм репликации в HDFS?
Chain репликация выполняется при параметре `dfs.replication` в `hdfs-size.xml` большим, чем 1. Данные записываются в локальный блок HDFS, затем транслируются в блок в соседнем узле той же стойки, затем в блок на узле из другой стойки. Запись заканчивается, когда, начиная с последнего узла цепочки до первого, проходит подтверждение об успешной записи.
### За счёт какой оптимизации в MapReduce минимизируется объём переданных данных по сети?
Выполнения обработки на том же узле, на котором хранятся данные.
### Необходимо ли указывать тип входного и выходного формата при запуске MapReduce?
Нет, значение по умолчанию “text”.
### Можем ли мы переименовать выходной файл?
Да, реализуя класс MultipleFormatOutput.
### Что подразумевается под перемешиванием и сортировкой в MapReduce?
Сразу после выполнения map задачи полученные результаты сортируются перед и после передачи на сторону reduce. По принятой терминологии этот процесс называется shuffle (перемешивание).
### Объясните, как происходит процесс сброса на диск (spilling) во время MapReduce.
Выдача map задачи сначала записывается в циклический буфер в памяти, а при его заполнении на заданный процент данные сбрасываются на диск. По умолчанию размер буфера `mapreduce.task.io.sort.mb` равен 100МБ, а пороговое значение `mapreduce.map.sort.spill.percent` заполнения 80%.
### Что такое распределённый кэш в фреймворке MapReduce?
Распределённый кэш — это дополнительная утилита фреймворка для хранения файлов, необходимых для выполнения задачи. После регистрации в кэше для данной задачи файлов, фреймворк сделает их доступными локально на каждом узле, на котором будет исполняться задача. Так, что вы сможете получить доступ из Mapper и Reducer.
### Что такое Combiner? В каких случаях его следует использовать?
Это функция аналогичная reduce для частичной агрегации данных на стороне Map до передачи данных по сети.
### Почему результаты Map задач сохраняются на локальном диске, а не в HDFS?
Результаты Map являются промежуточными в MapReduce задачи и после выполнения могут быть удалены. Поэтому смысла в их репликации нет.
### Что произойдёт, если map задача прервётся из-за ошибки до того, как её результаты будут переданы в reducer?
Будет создана новая задача на другом узле для расчёта результата заново.
### В чём заключается роль MapReduce Partitioner?
Задачей этого класса является распределение пар ключ/значение по партициям, число которых равно количеству Reduce процессов. Каждая партиция будет обработана своим reduce процессом. Распределения производится хэш-функцией над одним или группой ключей. В поставку Hadoop для этой цели входит класс HashPartitioner.
### Как мы можем быть уверены, что значения определённого ключа попадут в один reducer?
Мы можем контролировать куда попадут пары, используя класс распределяющий по партициям.
### В чём разница между InputSplit и HDFS блоком?
Объект класс InputSplit хранит логическое разделение исходных данных, а размер HDFS блока определяет физическое разделение.
### Что такое InputFormat?
Наследник InputFormat определяет то, как: проверяет путь к входным файлам на корректность, исходные данные делятся на логические части InputSplit, создаёт объект класса RecordReader для чтения записей из логической части.
### Для чего используется TextInputFormat?
Этот класс является классом по умолчанию для формата входных данных. Согласно ему исходный файл читается по строкам, что формирует пары номер строки/содержимое строки для Map функции.
### Какую роль играет RecordReader?
Логическое разбиение задаёт поле для работы map функции, но не определяет то, как эти данные читать (парсить). Задача преобразования входных данных в пары ключ/значение передаётся RecordReader-у, который создаётся классом InputFormat.
### Какие конфигурационные параметры необходимы для запуска MapReduce задачи?
Путь к входным файлам, путь к выходным файлам, формат входных данных, формат выходных данных, класс содержащий map функцию, класс содержащий reduce функцию, jar c этими классами плюс driver класс.
### Когда вам следует использовать SequenceFileInputFormat?
Этот класс позволяет читать данные из файла, записанного в сжатом бинарном формате `SequenceFileOutputFormat`, который применяется для эффективной передачи данных между MapReduce задачами.
### Что такое identity Mapper и identity Reducer?
Это классы Mapper и Reducer по умолчанию, которые использует фреймворк, если пользователем они не переопределены. Map возвращает идентичный результат полученному от RecordReader, Reduce возвращает идентичный результат полученному от shuffle фазы.
### В чём заключается преимущество выполнения join на стороне Map?
Уменьшает накладные расходы в фазе перемешивания и сортировки, а также уменьшает время выполнения задачи.
### Можно ли установить количество Reduce задач равным нулю?
Да, например, в случае, когда вам не нужно проводить редукцию результатов Map. В этом случае результаты буду записаны не локально, а напрямую в HDFS.
### Как планово остановить MapReduce задачу?
С помощью команды `hadoop job -kill JOBID`
### Каким образом можно добавить к исполняемому jar файлу дополнительные ресурсы: медиа файлы, статичные файлы с табличными данными?
Вы можете добавить их в распределённый кэш. Файлы будут скопированы один раз в начале задачи.
### Дайте определение спекулятивному выполнению задач.
Это оптимизация, которая позволяет бороться с медленными обработчиками. В конце Map или Reduce фазы задачи для логического разбиения запускаются не в одном, а нескольких экземплярах. Та задача, которая выигрывает гонку, первой записывает данные локально или в HDFS, а дубликаты уничтожаются.
### Что такое NameNode и DataNode?
NameNode — главный процесс в HDFS, который управляет процессами DataNode, хранит метаданные файлов распределённой файловой системы. DataNode — обеспечивает доступ к локальным данным, контролирует целостность данных, за которые отвечает.
### Имеет ли смысл устанавливаться NameNode и DataNode на машины с одинаковыми характеристиками?
Нет, для хранения метаданных требуется большой объём оперативной памяти и небольшое пространство на жёстком диске. Для хранения данных требуются большие объёмы дискового пространства.
### В чём различие между NAS (Network Attached Storage) и HDFS?
NAS является единой точкой доступа к данным, единой точкой отказа, не позволяет обрабатывать данные без предварительной передачи по сети на узел или узлы обработки.
### Чем отличается реляционное хранилище от HDFS?
Схема проверяется при записи в РБД и при чтении в HDFS
В РБД доступны эффективное чтение и вставка отдельных записей. В HDFS за счёт параллельного чтения с множества узлов достигается очень высокая пропускная способность.
Hadoop наилучшим образом подходит для OLAP, РБД для OLTP.
### За счёт чего HDFS позволяет достичь высокой пропускной способности на чтение?
За счёт модели “один пишет — многие читают”
### Какую задачу решает SecondaryNameNode? (Верно ли, что SecondaryNameNode используется для достижения высокой доступности?)
Это процесс-помощник, который периодически применяет изменения EditLogs к FsImage для предотвращения разрастания EditLogs.
### Какую информацию в HDFS можно считать метаданными?
Метаданные отражают структуру HDFS директорий и файлов. Они включают в себя данные о: владельце, правах, квотах, степени репликации. В NameNode присутствует два файла с метаданными:
- FsImage — содержит полное описание файловой системы с начала работы процесса,
- EditLogs — содержит все последние изменения, произведённые в файловой системе
### В чём заключается проблема хранения большого количества небольших файлов в HDFS?
Несмотря на наличие места в HDFS, файловая система может перестать нормально работать из-за нехватки оперативной памяти для хранения метаданных файлов, размер которых равен приблизительно 150 байтам на файл.
Объясните работу механизма Heartbeat.
Это периодический сигнал от DataNode к NameNode, наличие которого сигнализирует о штатной работе и наличии соединения между процессами. По умолчанию интервал между сигналами `dfs.heartbeat.interval` составляет 3 секунды.
### Каким образом можно узнать о статусе работы NameNode в системе?
Одним из способов является утилита JPS.
### Что такое HDFS блок?
Блок — это непрерывная последовательность байт с данными на диске. HDFS хранит каждый файл блоками, размер которых по умолчанию равен 128 МБ, в отличие от Linux файловой системы с размером блоков 4 КБ. Мотивацией для таких больших размеров блока является снижение накладных расходов на поиск считывающей головкой в жестких дисках место считывания и уменьшения количества метаданных.
### Предположим, что в HDFS хранится файл размером 514 МБ с параметром репликации и размером блока по умолчанию. Какое количество блоков хранится в системе, каков их размер, и сколько всего данных будет храниться на дисках?
При размере блока 128 МБ и репликацией 3, в HDFS будет храниться 4 блока по 128 МБ и один блок 2 МБ, каждый в трёх копиях. Всего на дисках будет храниться 1542 МБ.
### Можно ли скопировать файл в HDFS c другим параметром размера блока, чем тот, что задан администратором HDFS?
Да, при копировании нужно указать опцию “-Ddfs.blocksize=block_size”, где размер указывается в байтах. Например,
```
hadoop fs -Ddfs.blocksize=33554432 -copyFromLocal test.txt /sample_hdfs
```
Проверить атрибуты файла можно командой:
```
hadoop fs -stat %o /sample_hdfs/test.txt
```
### Можно ли поменять размер блока для всех файлов хранимых в HDFS?
Да, можно, в конфигурационном файле `hdfs-site.xml`. Но изменения произойдут только после перезапуска распределённой файловой системы.
### Что такое сканнер блоков HDFS?
DataNode периодически сканирует файлы, хранимые на его узле, на предмет ошибок. Если битые файлы найдены выполняется следующая последовательность действий:
1. Отправляется отчёт об испорченных блоках в NameNode;
2. NameNode инициализирует процесс создания новых реплик данных из оставшихся корректных блоков;
3. После успешной репликации испорченные блоки будут удаляются.
Посмотреть отчёт сканнера можно на web странице DataNode в разделе /blockScannerReport.
### За счёт чего достигается отказоустойчивость HDFS?
Отказоустойчивость достигается за счёт репликации, heartbeat мониторинга и перезапуска вылетевших задач.
### Можете ли вы для выбранных файлов в HDFS изменить степень репликации? Покажите, как это сделать на примере.
Да. Например, для файла text.xml с репликацией 1 можно установить количество реплик равное 3 командой:
```
hadoop fs -setrwp -w 3 test.xml
```
Проверить изменения можно командами:
```
hadoop fs -ls или hadoop fsck test.xml -files
```
### Что означает rack awareness алгоритм?
При репликации копии будут записываться в разные наиболее друг от друга независимые с точки зрения причины отказа оборудования места. Если первая реплика будет сохранена локально, то вторая будет сохранена на другой машине в той же стойке, а третья в машине в другой стойке. Это позволяет улучшить доступность данных по сети и уменьшить вероятность потери данных.
### Опишите последовательность действий алгоритма записи данных в HDFS.
Допустим клиент хочет записать файл в HDFS. Тогда процесс пройдёт следующие этапы:
- клиент разделит файл на блоки и отправит запрос на запись в NameNode,
- для каждого блока будет возвращён список из нескольких адресов DataNode (3 при степени репликации 3) в которые можно отправлять соответствующие блоки,
- блок будет скопирован клиентом в один из DataNode, который автоматически запустит репликацию на следующие два в цепочке.
### Можете ли вы изменить существующий файл?
Нет, в HDFS данные файлов не изменяемые. Можно добавлять данные в конец файла.
### Могут ли несколько пользователей параллельно записывать данные в один HDFS файл?
Нет. HDFS работает согласно модели “пишет один — читают многие”.
### Можно ли считывать из файла, который в данный момент пишется программой другого пользователя?
Да, можно, но HDFS не даёт гарантий, что новые данные, которые другой пользователь по его заявлению успешно записал, будут видны при чтении. Пишущая программа может периодически вызывать hflush для ускорения появления новых данных для чтения другими пользователями.
### Каким образом HDFS отслеживает целостность хранимых данных в кластере?
В распределённом хранилище существует ненулевая вероятность того, что данные могли повредиться при записи или чтение. Поэтому HDFS также хранит контрольные суммы для данных, которые всегда проверяются при чтении данных. Кроме того, периодически каждый DataNode выполняет дежурную операцию обхода данных с проверкой контрольных сумм.
### Для чего используется конфигурация HighAvailability у NameNode? Как она работает?
Для решения проблемы единой точки отказа, которой является процесс NameNode. Был добавлен в Hadoop 2.x. При такой конфигурации в кластере присутствуют два NameNode процесса в активно/пассивном режиме. В случае, если один завершается с ошибкой или становится недоступным — другой переключается в активный режим и подхватывает обслуживание запросов.
### Для чего в HDFS предоставляется возможность архивирования?
Эта возможность была добавлена для решения проблемы, возрастающей со временем нагрузки на память процессом NameNode. Процессом архивации, накопившееся множество мелких файлов, объединяется в небольшое количество крупных. Это позволяет существенно сократить объём метаданных, хранимых для каждого файла.
### Каким образом можно совершить миграцию данных из одного Hadoop кластера в другой?
```
hadoop distcp hdfs://<source NameNode> hdfs://<target NameNode>
```
## Потоковая обработка (Apache Storm)
[содержание](#)
### Что такое Apache Storm?
Система потоковой обработки с открытым кодом.
### Дайте определение топологии в Apache Storm и назовите её составляющие элементы.
Топология — ориентированный граф описывающий способ обработки данных в Storm. Может трактоваться как описание запроса в более низкоуровневых терминах, чем SQL. Состоит из источников (spouts), вводящих поток записей из внешних систем в систему Storm, и обработчиков (bolts), выполняющих преобразования над потоком записей.
### Какими отличительными свойствами обладают потоковые данные?
Количество данных в потоке потенциально бесконечно, характеристики (интенсивность, распределение) определяются внешним для потоковой системы окружением
### Можно ли обновить топологию в Apache Storm?
Единственным вариантом обновления топологии является остановка старой топологии и запуск новой. В реальный рабочих условиях может потребоваться запуск новой топологии параллельно старой.
### Объясните механизм гарантии доставки сообщений в Apache Storm.
Источник закрепляет за каждым созданным им сообщением уникальный идентификатор и хранит его до тех пор, пока сообщение или производные от него сообщения не пройдут по графу обработки. Перед каждой передачей узел-отправитель генерирует случайное число и отправляет вместе с идентификатором специальному процессу `Acker`. Каждый получатель, после успешной обработки сообщения отправляет те же два числа — идентификатор и сгенерированное случайное число, которые он узнал из сообщения — процессу `Acker` второй раз. По получению сообщения Acker в своей хэш-таблице записывает результат исключающего или между хранимым значением случайным числом и полученным из сообщения под ключом идентификатора. Обработка считает выполненной, если значение ключа идентификатора стало равным нулю.
### Какой механизм обмена сообщениями используется в Apache Storm между рабочими процессами, а какой между потоками процесса? Можно ли его заменить?
По умолчанию используется Netty для передачи сообщений. Раньше использовался ZeroMQ, который может быть включен в настройках. Внутри процесса обмен происходит через циклический буфер, названный разработчиками LMAX Disruptor.
### Приведите несколько примеров задач, хорошо решаемых потоковыми системами?
Мониторинг сети на предмет аномалий и взломов. Веб аналитика кликов пользователей на интернет-страницах.
## Архитектуры BigData решений
[содержание](#)
### Объясните, что такое Lambda архитектура.
Lambda architecture = CQRS (batch layer + serving layer) + speed layer
https://martinfowler.com/bliki/CQRS.html
### Чем Kappa архитектура отличается от Lambda архитектуры? В чём преимущества?
Главное отличие в том, что вместо двух фреймворков обработки данных используется один фреймворк потоковой обработки.
### О чём говорит CAP теорема? Приведите неформальное доказательство.
Вы не можете одновременно добиться трёх свойств от системы: согласованности данных, доступности, распределённости. В реальности распределённость является неотъемлемым свойством, поэтому перед разработчиком системы встаёт выбор между гарантией согласованности данных или гарантией высокой доступности.
### Для чего используется Zookeeper?
Zookeeper используется многими фреймворками (Kafka, Storm, Flink, Hive) для хранения конфигурационной информации, метаданных о системе. Отсутствие единой точки отказа, гарантия согласованности данных, отказоустойчивость делают его хорошим кандидатом для хранения небольшого, но жизненно важного объёма информации о состоянии системы. Хранение данных в памяти позволяет быстро отдавать данные.

@ -1,20 +1,16 @@
# Материалы курса "Большие данные" 2023
# Материалы курса "Большие данные" 2026 (бакалавры)
### График выполнения и форма отчётности
| Время | Лабораторная работа | Форма отчётности |
| ------ | ------ | ------ |
| Сентябрь | [Введение в Apache Spark](./L1%20-%20Introduction%20to%20Apache%20Spark/) | проект с выполненными заданиями и отчёт |
| Октябрь | [Формирование отчётов в Apache Spark](./L2%20-%20Reports%20with%20Apache%20Spark) | скрипт/проект и отчёт |
| Ноябрь | [Потоковая обработка в Apache Flink](./L3%20-%20Stream%20processing%20with%20Apache%20Flink/README.md) | проект с выполненными заданиями и отчёт (с зелёными тестами) |
| Декабрь | [ZooKeeper](./L4%20-%20ZooKeeper/L4_Zookeeper.md) | проект и отчёт |
| Февраль | [Введение в Mapreduce](./L0%20-%20Introduction%20to%20MapReduce%20data%20processing%20model/) | jupyter ноутбук с выполненными заданиями |
| Март | [Введение в Apache Spark](./L1%20-%20Introduction%20to%20Apache%20Spark/) | jupyter ноутбук или проект с выполненными заданиями и отчёт |
| Апрель | [Формирование отчётов в Apache Spark](./L2%20-%20Reports%20with%20Apache%20Spark) | jupyter ноутбук или скрипт/проект и отчёт |
| Май | [Потоковая обработка в Apache Flink](./L3%20-%20Stream%20processing%20with%20Apache%20Flink/README.md) | проект с выполненными заданиями и отчёт (с зелёными тестами) |
Первые две лабораторные могут выполняться на оборудовании университета на сайте https://mapr.space. Пишите, для регистрации пользователя, преподавателю.
Spark лабораторные могут выполняться в Google Colab, наподобие того, как это сделано здесь https://colab.research.google.com/drive/1G894WS7ltIUTusWWmsCnF_zQhQqZCDOc.
### События
В заданиях используйте выборки данных из папки [data](./data/) .
### Архив событий
27-30 июня, Data + AI Summit NA 2022: https://databricks.com/dataaisummit/north-america-2022
26-28 октября, Flink Forward Global 2021: https://www.youtube.com/playlist?list=PLDX4T_cnKjD0J2LFr7yBk2aSS_o2l-7ue
24-28 мая, Data + AI Summit NA 2021: https://www.youtube.com/playlist?list=PLTPXxbhUt-YVtufaAKCRfyPYsjgpq5DRL
9-12 декабря, SmartData 2020: https://www.youtube.com/playlist?list=PLeN_80lmoMY1ugdDLg2mWht5eQDq6CoNQ
Для сдачи выкладывайте решения в репозиторий на github (или иной хостинг на основе системы контроля версий). Защита проходит онлайн в виде проверки выполнения заданий и 1-2 вопросов по каждой лабораторной.
Loading…
Cancel
Save