You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
big_data/L1 - Introduction to Apache.../L1_noninteractive_bike_anal...

458 lines
30 KiB
Markdown

# Создание локального проекта
Разработка программы на Spark может быть выполнена на нескольких языках: Python, R, Scala, Java. В данном руководстве рассмотрим разработку на последних двух, так как они имеют самую полную поддержку API обработки данных.
Разработка приложения может производиться в любом текстовом редакторе, затем быть собрана системой сборки в отдельное приложение и запущена на Spark кластере с помощью консольной команды spark-submit.
В данной лабораторной работе мы будем использовать IntelliJ IDEA. IDE предоставляет набор возможностей для упрощения разработки: автодополнение, индексация проекта, статическая проверка кода, подсветка синтаксиса, интеграция с системами контроля версий и системами сборки.
Для работы необходима установленная последняя версия IDE IntelliJ IDEA Community Edition. Данная среда разработки доступа для скачивания по адресу https://www.jetbrains.com/idea/.
Для создания проекта в IntelliJ IDEA запустите среду разработки, выберите Create New Project.
![](images/8_.png)
## Разработка с использованием системы сборки SBT на языке Scala
Для создания Scala + SBT проекта выберите слева в меню Scala и затем SBT.
![](images/9_.png)
Далее укажите имя проекта, версию Java, версию SBT и версию Scala компилятора. Для разработки на Spark 2.4.0 рекомендуется выбрать версию Scala 2.12.9.
*Примечание.* Установите флаг Use auto-import для того, чтобы не обновлять зависимости вручную при изменениях в проекте.
![](images/10_.png)
После нажатия Finish откроется главное окно среды разработки.
![](images/11_.png)
Подождите, когда SBT скачает зависимости.
В дереве проекта должен появиться файл `build.sbt`, являющийся основным файлом для настройки сборки и указания зависимостей проекта SBT. В файле на момент создания указаны: имя проекта, версия проекта, версия языка Scala.
*Примечание.* Появление предупреждений о конфликте имён в SBT 0.13.8 является известной ошибкой https://github.com/sbt/sbt/issues/1933. Одно из решений — использование более ранней версии или скрытие всех предупреждений установкой степени логирования `logLevel := Level.Error`.
Код Scala помещается в папку `src/main/scala` или `src/main/scala-2.12`.
Создайте в папке scala объект Main c методом main. Данный метод будет точкой входа в программу.
![](images/12_.png)
![](images/13_.png)
*Примечание.* Аналогом объекта object в Java является паттерн Singleton. Выполнения тела объекта происходит при его загрузке в память, аналогично инициализации в конструкторе, методы object доступны без создания объекта оператором new, аналогично публичным статическим методам.
```scala
object Main {
def main(args: Array[String]) {
println("Hello world")
}
}
```
В контекстном меню выберите Run 'Main', либо нажмите сочетание клавиш Ctrl+Shift+F10.
![](images/14_.png)
После выполнения в консоли должно появиться приветствие.
![](images/15_.png)
Добавьте к проекту зависимость Spark (версия на MapR кластере), записав следующие строки в конце файла build.sbt:
```scala
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.0"
)
```
Сохраните изменения и обновите проект.
![](images/16_.png)
Подождите, когда SBT скачает все зависимые библиотеки.
Измените код `Main.scala` и создайте простейшую Spark программу. Импортируйте классы пакета `org.apache.spark`.
```scala
import org.apache.spark._
```
Создайте конфигурацию Spark с помощью класса SparkConf. Укажите обязательные параметры: имя запускаемой задачи (имя контекста задачи) и режим запуска (список режимов http://spark.apache.org/docs/latest/submitting-applications.html#master-urls). В нашем случае в качестве режима будет указан параметр `local[2]`, означающий запуск с двумя потоками на локальной машине. В качестве режима может быть указан адрес главного узла.
```scala
val cfg = new SparkConf()
.setAppName("Test").setMaster("local[2]")
```
*Примечание.* В Scala различаются два вида переменных: `val` и `var`. Переменные `val` являются неизменяемыми и инициализируются один раз, в отличие от `var`, которой можно присваивать новые значения несколько раз.
Инициализируйте контекст Spark в главном методе.
```scala
val sc = new SparkContext(cfg)
```
Добавьте в конец файла команду остановки контекста
```scala
sc.stop()
```
После инициализации контекста вы можете обращаться к командам Spark. Считайте любой текстовый файл из локальной файловой системы и выведите его по строкам в консоль.
*Примечание.* Путь к файлу в локальной файловой системе имеет определённый формат, имеющий префикс "file:///". https://tools.ietf.org/html/rfc8089
```scala
val textFile = sc.textFile("file:///c:/temp/file.txt")
textFile.foreach(println)
```
![](images/17_.png)
*Примечание.* При работе без winutils.exe запись в файловую систему будет порождать ошибку. Известным решением является скачивание данного файла из проекта Hadoop в файловую систему в папку с названием bin и указанием переменной Spark `hadoop.home.dir`. В переменной `hadoop.home.dir` хранится путь к папке c Hadoop определённой версии. Установить переменную среды JVM вы можете кодом `System`.`setProperty(key, value)` . Другим решением проблемы является установка переменной среды `HADOOP_HOME` (потребуется перезапуск IDE). https://issues.apache.org/jira/browse/SPARK-2356.
## Анализ данных велопарковок
Тестовыми данными являются список поездок на велосипедах `trips.csv` и список велостоянок проката велосипедов `stations.csv`.
![](images/18_.png)
Создайте по одному RDD на основе каждого файла `stations.csv`, `trips.csv`. Считайте данные в переменную, затем запомните заголовок. Объявите новую переменную с данными, в которых не будет заголовка, а строки преобразованы в массивы строк в соответствии с разделителем — запятая.
*Примечание.* Существует более эффективный, но громоздкий способ исключить заголовок из данных с использованием метода mapPartitionWithIndex. Пример присутствует в первой части лабораторной работы в разделе нахождения десяти популярных номеров такси.
```scala
val tripData = sc.textFile("file:///z:/data/trips.csv")
// запомним заголовок, чтобы затем его исключить
val tripsHeader = tripData.first
val trips = tripData.filter(row=>row!=tripsHeader).map(row=>row.split(",",-1))
val stationData = sc.textFile("file:///z:/data/stations.csv")
val stationsHeader = stationData.first
val stations = stationData.filter(row=>row!=stationsHeader).map(row=>row.split(",",-1))
```
*Примечание.* Использование в качестве второго параметра -1 в `row.split(",",-1)` позволяет не отбрасывать пустые строки. Например `"1,2,".split(",")` вернёт `Array("1","2")`, а `"1,2,".split(",",-1)` вернёт `Array("1","2","")`.
Выведите заголовки таблиц и изучите колонки csv файлов.
```scala
stationsHeader
tripsHeader
```
Выведите несколько элементов данных в trips и stations.
*Примечание.* Убрать информационные строки логов из выдачи можно следующим образом:
```scala
import org.apache.log4j.{Logger, Level}
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)
```
Объявите `stationsIndexed` так, чтобы результатом был список пар ключ-значение с целочисленным ключом из первой колонки. Таким образом вы создаёте индекс на основе первой колонки - номера велостоянки
```scala
val stationsIndexed = stations.keyBy(row=>row(0).toInt)
```
*Примечание.* Обращение к массиву в Scala производится в круглых скобках. Например Array(1,2,3)(0) вернёт первый элемент.
Выведите часть данных нового RDD.
Аналогичное действие проделайте для индексирования коллекции trips по колонкам Start Terminal и End Terminal и сохраните результат в переменные, например tripsByStartTerminals и tripsByEndTerminals.
Выполните операцию объединения коллекций по ключу с помощью функции join. Объедините stationsIndexed и tripsByStartTerminals, stationsIndexed и tripsByEndTerminals.
```scala
val startTrips = stationsIndexed.join(tripsByStartTerminals)
val endTrips = stationsIndexed.join(tripsByEndTerminals)
```
Объявление последовательности трансформаций приводит к созданию ацикличного ориентированного графа. Вывести полученный граф можно для любого RDD.
```scala
startTrips.toDebugString
endTrips.toDebugString
```
![](images/19_.png)
Выполните объявленные графы трансформаций вызовом команды count.
```scala
startTrips.count()
endTrips.count()
```
Если вы знаете распределение ключей заранее, вы можете выбрать оптимальный способ хеширования ключей по разделам `Partition`. Например, если один ключ встречается на порядки чаще, чем другие ключи, то использование `HashPartitioner` будет не лучшим выбором, так как данные связанные с этим ключом будут собираться в одном разделе. Это приведёт к неравномерной нагрузке на вычислительные ресурсы.
Выбрать определённую реализацию класса распределения по разделам можно с помощью функции RDD `partitionBy`. Например, для RDD `stationsIndexed` выбирается `HashPartitioner` с количеством разделов равным количеству разделов trips RDD.
```scala
stationsIndexed.partitionBy(new HashPartitioner(trips.partitions.size))
```
Также можно создавать свои классы для распределения ключей. Узнать какой класс назначен для текущего RDD можно обращением к полю partitioner.
```scala
stationsIndexed.partitioner
```
## Создание модели данных
Для более эффективной обработки и получения дополнительных возможностей мы можем объявить классы сущностей предметной области и преобразовать исходные строковые данные в объявленное представление.
В Scala часто для объявления структур данных используется конструкция case class. Особенностью такого объявления класса являются: автоматическое создание методов доступа get для аргументов конструктора, автоматическое определение методов hashcode и equals, возможность case классов быть разобранными по шаблону (pattern matching). Например, для определения
```scala
case class IntNumber(val value:Integer)
```
выполнение
```scala
new IntNumber(4).value
```
вернёт значение 4.
Объявите case классы для представления строк таблиц в соответствии с именами заголовков.
```scala
case class Station(
stationId:Integer,
name:String,
lat:Double,
long:Double,
dockcount:Integer,
landmark:String,
installation:String,
notes:String)
case class Trip(
tripId:Integer,
duration:Integer,
startDate:LocalDateTime,
startStation:String,
startTerminal:Integer,
endDate:LocalDateTime,
endStation:String,
endTerminal:Integer,
bikeId: Integer,
subscriptionType: String,
zipCode: String)
```
Для конвертации времени будем использовать пакет java.time. Краткое введение в работу с пакетом находится в Приложении Б. Объявим формат данных.
```scala
val timeFormat = DateTimeFormatter.ofPattern("M/d/yyyy H:m")
```
Объявим trips с учётом преобразования во внутреннее представление.
```scala
val tripsInternal = trips.mapPartitions(rows => {
val timeFormat = DateTimeFormatter.ofPattern("M/d/yyyy H:m")
rows.map( row =>
new Trip(tripId=row(0).toInt,
duration=row(1).toInt,
startDate= LocalDate.parse(row(2), timeFormat),
startStation=row(3),
startTerminal=row(4).toInt,
endDate=LocalDate.parse(row(5), timeFormat),
endStation=row(6),
endTerminal=row(7).toInt,
bikeId=row(8).toInt,
subscriptionType=row(9),
zipCode=row(10)))})
```
Изучите полученные данные. Например, вызовом следующих команд:
```scala
tripsInternal.first
tripsInternal.first.startDate
```
*Примечание.* В связи с тем, что timeFormat содержит несериализуемый объект, его необходимо создавать на каждом узле для каждой партиции.
То же можно проделать и для station RDD
```scala
val stationsInternal = stations.map(row=>
new Station(stationId=row(0).toInt,
name=row(1),
lat=row(2).toDouble,
long=row(3).toDouble,
dockcount=row(4).toInt,
landmark=row(5),
installation=row(6)
notes=null))
```
*Примечание.* Восьмая колонка не присутствует в таблице, так как в данных она пустая. Если в будущем она не будет использоваться, имеет смысл её убрать из описания case класса.
*Примечание.* В данных присутствуют различные форматы времени.
Посчитаем среднее время поездки, используя groupByKey.
Для этого потребуется преобразовать trips RDD в RDD коллекцию пар ключ-значение аналогично тому, как мы совершали это ранее методом keyBy.
```scala
val tripsByStartStation = tripsInternal.keyBy(record => record.startStation)
```
Рассчитаем среднее время поездки для каждого стартового парковочного места
```scala
val avgDurationByStartStation = tripsByStartStation
.mapValues(x=>x.duration)
.groupByKey()
.mapValues(col=>col.reduce((a,b)=>a+b)/col.size)
```
Выведем первые 10 результатов
```scala
avgDurationByStartStation.take(10).foreach(println)
```
Выполнение операции groupByKey приводит к интенсивным передачам данных. Если группировка делается для последующей редукции элементов лучше использовать трансформацию reduceByKey или aggregateByKey. Их выполнение приведёт сначала к локальной редукции над разделом Partition, а затем будет произведено окончательное суммирование над полученными частичными суммами.
*Примечание.* Выполнение reduceByKey логически сходно с выполнением Combine и Reduce фазы MapReduce работы.
Функция aggregateByKey является аналогом reduceByKey с возможностью указывать начальный элемент.
Рассчитаем среднее значение с помощью aggregateByKey. Одновременно будут вычисляться два значения для каждого стартового терминала: сумма времён и количество поездок.
```scala
val avgDurationByStartStation2 = tripsByStartStation
.mapValues(x=>x.duration)
.aggregateByKey((0,0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1+acc2._1, acc1._2+acc2._2))
.mapValues(acc=>acc._1/acc._2)
```
В первых скобках передаётся начальное значение. В нашем случае это пара нулей. Первая анонимная функция предназначена для прохода по коллекции раздела. На этом проходе значение элементов помещаются средой в переменную value, а переменная «аккумулятора» acc накапливает значения. Вторая анонимная функция предназначена для этапа редукции частично посчитанных локальных результатов.
Сравните результаты `avgDurationByStartStation` и `avgDurationByStartStation2` и их время выполнения.
Теперь найдём первую поездку для каждой велостоянки. Для решения опять потребуется группировка. Ещё одним недостатком `groupByKey` данных является то, что для группировки данные должны поместиться в оперативной памяти. Это может привести к ошибке `OutOfMemoryException` для больших объёмов данных.
Сгруппируем поездки по велостоянкам и отсортируем поездки в группах по возрастанию даты.
```scala
val firstGrouped = tripsByStartStation
.groupByKey()
.mapValues(x =>
x.toList.sortWith((trip1, trip2) =>
trip1.startDate.compareTo(trip2.startDate)<0))
```
![](images/20_.png)
Лучшим вариантом с точки зрения эффективности будет использование трансформации reduceByKey
```scala
val firstGrouped = tripsByStartStation
.reduceByKey((trip1,trip2) =>
if (trip1.startDate.compareTo(trip2.startDate)<0)
trip1 else trip2)
```
В данном случае «передаваться дальше» будет меньшее из значений ключа.
![](images/21_.png)
## Запуск проекта в кластере
Для запуска собранного проекта на сервере используйте команду `spark-submit`. Однако прежде чем собрать проект, необходимо его изменить, так как в данный момент в коде зашиты пути к файлам с данными в локальной системе и режим запуска (`setMaster(“local[2]“)`).
Параметризуйте эти значения аргументами передаваемыми в программу при запуске.
```scala
val Seq(masterURL, tripDataPath, stationDataPath) = args.toSeq
val cfg = new SparkConfig().setAppName("Test").setMaster(masterURL)
val tripData = sc.textFile(tripDataPath)
val stationData = sc.textFile(stationDataPath)
```
В конфигурации запуска добавьте значения аргументов:
Проверьте, что проект работает на локальном компьютере.
Соберите JAR с помощью sbt команды package. Файл появится в директории `target/scala-2.12`. Скопируйте его на сервер с помощью `scp` и запустите.
```
$ spark-submit --deploy-mode cluster untitled4_2.11-0.1.jar yarn /labs/lab1/trips.csv /labs/lab1/stations.csv
```
Логи YARN контейнеров вы можете найти в директории `/mapr/tmp/studX/`. Проверьте, что выдача вашей программы на сервере идентична выдаче в IDE при запуске на локальном компьютере.
# Приложение А
## Краткое описание файловой системы HDFS
HDFS — распределенная файловая система, используемая в проекте Hadoop. HDFS-кластер в первую очередь состоит из NameNоde-сервера и DataNode-серверов, которые хранят данные. NameNode-сервер управляет пространством имен файловой системы и доступом клиентов к данным. Чтобы разгрузить NameNode-сервер, передача данных осуществляется только между клиентом и DataNode-сервером.
Развёртывание экземпляра HDFS предусматривает наличие центрального узла имён (англ. name node), хранящего метаданные файловой системы и метаинформацию о распределении блоков, и серии узлов данных (англ. data node), непосредственно хранящих блоки файлов. Узел имён отвечает за обработку операций уровня файлов и каталогов — открытие и закрытие файлов, манипуляция с каталогами, узлы данных непосредственно отрабатывают операции по записи и чтению данных. Узел имён и узлы данных снабжаются веб-серверами, отображающими текущий статус узлов и позволяющими просматривать содержимое файловой системы. Административные функции доступны из интерфейса командной строки.
# Приложение Б
## Основные понятия java.time
Для представления времени в Java 8 рекомендуется использовать пакет `java.time`, реализующий стандарт JSR 310. Документация пакета `java.time` доступна по адресу https://docs.oracle.com/javase/8/docs/api/java/time/package-summary.html.
Далее приводится работа с основными классами представления времени `java.time`. Для экспериментов удобно использовать REPL консоль. Если вы находитесь в среде разработки IDEA Scala консоль может быть запущена нажатием Ctrl+Shift+D, либо через контекстное меню Intellij IDEA.
Примечание. REPL (от сокращения `read`, `eval`, `print`, `loop` - считать, выполнить, напечатать, повторять в цикле)  интерактивный цикл взаимодействия программной среды с пользователем.
![](images/23_.png)
![](images/24_.png)
Примечание. Консоль также можно запустить в командном окне операционной системы с помощью sbt console, находясь в папке с проектом. В обоих вариантах зависимости проекта подключаются автоматически так, что вы можете работать со сторонними библиотеками.
В пакете java.time различаются представления времени:
- класс Instant — хранит числовую метку;
- класс LocalDate — хранит дату без времени;
- класс LocalTime — хранит время без даты;
- класс LocalDateTime — хранит время и дату;
- класс ZonedDateTime — хранит дату, время и часовой пояс.
Узнайте в консоли текущее время, вызвав статический метод now() у каждого класса, изучите возвращаемое представление. Например,
``` scala
import java.time._
Instant.now()
```
Перед использованием классов объявите их импорт. Символ «_» импортирует все классы данного пакета.
Enter используется для переноса строки. Для выполнения нажмите сочетание клавиш Ctrl+Enter.
![](images/25_.png)
Создайте примечательную вам дату с помощью статического конструктора of классов `LocalDate`, `LocalDateTime`, `ZonedDateTime`. Воспользуйтесь подсказками среды разработки или документацией для определения количества аргументов метода of и их значения.
Изучите создание времён и дат с помощью метода parse данных классов. Используйте форматирование, которое выдавалось системой при возвращении значения в консоль.
![](images/26_.png)
Для задания пользовательского формата считывания вы можете использовать класс DateTimeFormatter. Описание класса и символов шаблона располагается по адресу https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html.