diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2cf0f81 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.ipynb_checkpoints diff --git a/L1 - Introduction to Apache Spark/L1_Apache_Spark.md b/L1 - Introduction to Apache Spark/L1_Apache_Spark.md index 42a6406..c0dca36 100644 --- a/L1 - Introduction to Apache Spark/L1_Apache_Spark.md +++ b/L1 - Introduction to Apache Spark/L1_Apache_Spark.md @@ -23,21 +23,16 @@ Apache Spark — программный каркас с открытым исх - MapR https://mapr.com/try-mapr/sandbox/, - Hortonworks https://www.cloudera.com/downloads/hortonworks-sandbox.html . -Либо, следуя инструкциями в приложении В, запустите Spark кластер в облаке Microsoft Azure. +# Работа в консоли spark-shell/pyspark -MapR-FS — API совместимая с HDFS реализация распределённой файловой системы (РФС) от MapR. - -Подключитесь к машине по ssh, используя ip или имя узла кластера. Узнать ip адрес виртуальной машины можно выполнив команду ifconfig. В облаке вашей машине присваивается имя, которое вы можете найти на главной странице HDInsight кластера в вкладке Overview. - -# Работа в консоли spark-shell - -В первой части задания вы работаете с 3 файловыми системами: -- локальной файловой системой, в которой находятся файлы с заданиями, +В первой части задания вы работаете с 2 файловыми системами: - файловой системой Linux узла кластера, - распределённой файловой системой (краткое описание приведено в приложении А). -## Основные операции взаимодействия с распределённой файловой системой +## Основные операции взаимодействия с распределённой файловой системой MapR-FS + +MapR-FS — API совместимая с HDFS реализация распределённой файловой системы (РФС) от MapR. Для импорта/экспорта данных в РФС используйте команды `hadoop fs –put` и `hadoop fs –get`: ``` @@ -66,6 +61,7 @@ warandsociety.txt 100% 5204KB 5 $ hadoop fs -put * . ``` Проверьте, переместились ли файлы. + ```bash $ hadoop fs -ls ``` @@ -75,7 +71,7 @@ $ hadoop fs -ls ![spark shell command results](images/2_spark_shell_command.png) -Веб-страница запущенной сессии spark-shell. +Веб-страница запущенной сессии. ![spark shell command results](images/3_spark_shell_web.png) @@ -98,36 +94,70 @@ $ hadoop fs -ls Создайте RDD для текстового файла warandpeace.txt. Для подробного списка операций считывания файлов обращайтесь к документации класса SparkContext https://spark.apache.org/docs/2.4.0/api/scala/index.html#org.apache.spark.SparkContext. *Примечание.* При наборе команд используйте TAB $-$ функцию автодополнения. + +Scala ```scala -val warandpeace = sc.textFile("warandpeace.txt") +val warandpeace = sc.textFile("warandsociety.txt") +``` +Python +```python +warandpeace = sc.textFile("warandsociety.txt") ``` + В данной команде указывается относительный путь, который начинается с вашей папки в РФС. Выведите количество строк файла. + +Scala ```scala warandpeace.count ``` *Примечание.* При отсутствии у функции аргументов, в scala скобки можно опускать. +Python +```python +warandpeace.count() +``` Попробуйте считать несуществующий файл, например `nil`, а затем вывести количество его строк на экран + +Scala ```scala val nilFile = sc.textFile("nil") nilFile.count ``` + +Python +```python +nilFile = sc.textFile("nil") +nilFile.count() +``` ![](images/4_textFile.png) Заметьте, что первая команда выполняется успешно, а вторая выводит сообщение, что такого файла нет. Это происходит потому, что выполнение обработки в Spark является ленивым и не запускается, до встречи команды действия(action). `count` $-$ первая команда действия, с которой вы познакомились. -Считайте первые 10 строк файла warandpeace.txt. +Считайте первые 10 строк файла warandsociety.txt. + +Scala ```scala warandpeace.take(10) ``` + +Python +```python +warandpeace.take(10) +``` + Эта команда не требует считывания и передачи на главный узел всех данных RDD. Узнайте на сколько частей разделились данные в кластере. + +Scala ```scala warandpeace.partitions ``` +~Python~ + + Если используется определённый метод распределения вы можете получить данные о нём командой `partitioner`. Начиная с версии 1.6.0 доступна команда `warAndPeaceFile.getNumPartitions` для получения информации о количестве разделов. Создайте распределённую коллекцию из нескольких элементов и для каждого элемента верните ip адрес, на котором он расположен: @@ -137,20 +167,35 @@ sc.parallelize(Array(1,2,3)).map(x => java.net.InetAddress.getLocalHost).collect ## Обработка текста Найдите строки, в которых содержится слово "война". + +Scala ```scala val linesWithWar = warandpeace.filter(x => x.contains("война")) ``` +Python +```python +linesWithWar = warandpeace.filter(x => "война" in x) +``` *Примечание.* Аргументом filter является лямбда функция $-$ функция без имени. До обозначения => в скобках через запятую следуют переменные аргументов функции, затем следует команда языка Scala. При использовании фигурных скобок язык позволяет описывать лямбда функции с цепочкой команд в теле, аналогично именованным функциям. Запросите первую строку. Строкой в данном файле является целый абзац, так как только по завершению абзаца содержится символ переноса строки. + +Scala ```scala linesWithWar.first ``` +Python +```python +linesWithWar.first() +``` + Данные могут быть перемещены в кэш. Этот приём очень полезен при повторном обращении к данным, для запросов "горячих" данных или запуска итеративных алгоритмов. Перед подсчётом количества элементов вызовите команду кэширования `cache()`. Трансформации не будут обработаны, пока не будет запущена одна из команд - действий. Воспользуйтесь следующим блоком кода для замера времени выполнения команды. + +Scala ```scala def time[R](block: => R): R = { val t0 = System.nanoTime() @@ -172,19 +217,41 @@ time{ linesWithWar.count() } ![](images/6_count_elapsed_time.png) +Python +```python +def time(f): + import time + t = time.process_time() + f() + print(f"Elapsed time: {int((time.process_time() - t)*1e9)} ns") + +``` + +```python +linesWithWar.cache() +time(lambda: linesWithWar.count() ) +time(lambda: linesWithWar.count() ) +``` + При выполнении команды count второй раз вы должны заметить небольшое ускорение. Кэширование небольших файлов не даёт большого преимущества, однако для огромных файлов, распределённых по сотням или тысячам узлов, разница во времени выполнения может быть существенной. Вторая команда `linesWithWar.count()` выполняется над результатом от предшествующих команде cache трансформаций и на больших объёмах данных будет ускорять выполнение последующих команд. Найдите гистограмму слов: +Scala ```scala val wordCounts = linesWithWar.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) ``` +Python +```python +wordCounts = linesWithWar.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) +``` Spark существенно упростил реализацию многих задач, ранее решаемых с использованием MapReduce. Эта однострочная программа $-$ WordCount $-$ является наиболее популярным примером задачи, эффективно распараллеливаемой в Hadoop кластере. Её реализация в MapReduce занимается около 130 строк кода. Сохраните результаты в файл, а затем, найдите данные в HDFS и выведите данные в linux консоли с помощью команды `hadoop fs -cat warandpeace_histogram.txt/*` (здесь используется относительный путь). +Scala/Python ```scala wordCounts.saveAsTextFile("warandpeace_histogram.txt") ``` @@ -195,9 +262,17 @@ $ hadoop fs -cat warandpeace_histogram.txt/* *Упражнение.* Улучшите процедуру, убирая из слов лишние символы и трансформируя все слова в нижний регистр. Используйте регулярные выражения. Например, по регулярному выражению "\\w*".r следующий код +Scala ```scala "\\w*".r.findAllIn("a b c").toArray.foreach(println) ``` +Python +```python +import re +p = re.compile('\w+') +letters = p.findall("a b c") +[print(l) for l in letters] +``` выведет на каждой строке по букве. Кроме Scala консоли для тестирования регулярных выражений вы можете использовать сайты: - https://regex101.com/, @@ -208,34 +283,44 @@ $ hadoop fs -cat warandpeace_histogram.txt/* Инициализируйте два множества - +Scala ```scala val a = sc.parallelize(Array(1,2,3,4)) val b = sc.parallelize(Array(3,4,6,7)) ``` +Python +```python +a = sc.parallelize([1,2,3,4]) +b = sc.parallelize([3,4,6,7]) +``` Найдите объединение a и b и соберите данные на главный узел с помощью функции collect. +Scala/Python ```scala a.union(b).collect ``` + Обратите внимание, что общие элементы дублируются, поэтому результат не является классическим множеством на самом деле. Такое поведение делает это операцию очень дешёвой, так как обновляется только информация о местонахождении данных для данного RDD. Уберите дубликаты с помощью distinct. +Scala/Python ```scala -a.union(b).distinct().collect +a.union(b).distinct().collect() ``` Найдите пересечение множеств. +Scala/Python ```scala -a.intersection(b).collect +a.intersection(b).collect() ``` Найдите разность множеств. +Scala/Python ```scala -a.subtract(b).collect +a.subtract(b).collect() ``` *Примечание.* При запуске collect на центральный узел - driver передаются все данные из распределённого RDD. При работе с большим объемом данных выполнение данной команды может заполнить всю оперативную память driver узла. @@ -252,12 +337,18 @@ a.subtract(b).collect В консоли, с которой вы работали в предыдущем разделе, создайте широковещательную переменную. Наберите: +Scala ```scala val broadcastVar = sc.broadcast(Array(1,2,3)) ``` +Python +```python +broadcastVar = sc.broadcast([1,2,3]) +``` Для получения значения обратитесь к полю value: +Scala/Python ```scala broadcastVar.value ``` @@ -270,18 +361,29 @@ broadcastVar.value Потренируйтесь в создании аккумулирующих переменных: +Scala ```scala val accum = sc.longAccumulator ``` +Python +```python +accum = sc.accumulator(0) +``` Следующим шагом запустите параллельную обработку массива и в каждом параллельном задании добавьте к аккумулирующей переменной значение элемента массива: +Scala ```scala sc.parallelize(Array(1,2,3,4)).foreach(x => accum.add(x)) ``` +Python +```python +sc.parallelize([1,2,3,4]).foreach(lambda x: accum.add(x)) +``` Для получения текущего значения вызовите команду: +Scala/Python ```scala accum.value ``` @@ -290,21 +392,36 @@ accum.value Пары ключ-значение Создайте пару ключ-значение из двух букв: +Scala ```scala val pair = ('a', 'b') ``` +Python +```python +pair = ('a', 'b') +``` Для доступа к первому значению обратитесь к полю _1: +Scala ```scala pair._1 ``` +Python +```python +pair[0] +``` Для доступа к второму значению к полю _2: +Scala ```scala pair._2 ``` +Python +```python +pair[1] +``` Если распределённая коллекция состоит из пар, то они трактуются как для ключ-значение и для таких коллекций доступны дополнительные операции. Наиболее распространённые, это: группировка по ключу, агрегирование значений с одинаковыми ключами, объединение двух коллекций по ключу. @@ -320,47 +437,86 @@ pair._2 Создайте RDD на основе загруженных данных nyctaxi.csv: +Scala ```scala val taxi = sc.textFile("nyctaxi.csv") ``` +Python +```python +taxi = sc.textFile("nyctaxi.csv") +``` Выведите первые 5 строк из данной таблицы: +Scala ```scala taxi.take(5).foreach(println) ``` +Python +``` +for t in taxi.take(5): + print(t) +``` Обратите внимание, что первая строка является заголовком. Её как правило нужно будет отфильтровать. Одним из эффективных способов является следующий: +Scala ```scala taxi.mapPartitionsWithIndex{(idx,iter)=> if (idx==0) iter.drop(1) else iter } ``` +Python +```python +import itertools +taxi.mapPartitionsWithIndex(lambda idx, it: itertools.islice(it,1,None) if (idx==0) else it ) +``` *Примечание.* Для анализа структурированных табличных данных рассматривайте в качестве альтернативы использование SQL API и DataSet API. Для разбора значений потребуется создать RDD, где каждая строка разбита на массив подстрок. Используйте запятую в качестве разделителя. Наберите: +Scala ```scala val taxiParse = taxi.map(line=>line.split(",")) ``` +Python +```python +taxiParse = taxi.map(lambda line: line.split(",")) +``` Теперь преобразуем массив строк в массив пар ключ-значение, где ключом будет служить номер такси (6 колонка), а значением единица. +Scala ```scala val taxiMedKey = taxiParse.map(row => (row(6), 1)) ``` +Python +```python +taxiMedKey = taxiParse.map(lambda row: (row[6], 1)) +``` Следом мы можем найти количество поездок каждого номера такси: +Scala ```scala val taxiMedCounts = taxiMedKey.reduceByKey((v1, v2) => v1+v2) ``` +Python +```python +taxiMedCounts = taxiMedKey.reduceByKey(lambda v1, v2: v1+v2) +``` Выведем полученные результаты в отсортированном виде: +Scala ```scala taxiMedCounts.map(_.swap).top(10).map(_.swap).foreach(println) ``` +Python +```python +top10 = taxiMedCounts.map(lambda x: x[::-1]).top(10) +for x in top10: + print(x[::-1]) +``` *Примечание.* Нотация `_.swap` является объявлением анонимной функции от одного аргумента, аналог записи `x => x.swap`. @@ -368,33 +524,55 @@ taxiMedCounts.map(_.swap).top(10).map(_.swap).foreach(println) Вы также можете сгруппировать все описанные выше трансформации, преобразующие исходные данные в одну цепочку: +Scala ```scala val taxiCounts = taxi.map(line=>line.split(",")).map(row=>(row(6),1)).reduceByKey(_ + _) ``` +Python +```python +taxiCounts = taxi.map(lambda line: line.split(",")).map(lambda row: (row[6],1)).reduceByKey(lambda a,b: a + b) +``` *Примечание.* Нотация `_ + _` является объявлением анонимной функции от двух аргументов, аналог более многословной записи `(a,b) => a + b`. Попробуйте найти общее количество номеров такси несколько раз, предварительно объявив RDD taxiCounts как сохраняемую в кэше: +Scala/Python ```scala taxiCounts.cache() ``` Сравните время, которое трансформации выполняются первый раз и второй. Чем больше данные, тем существеннее разница. +Scala ```scala time{ taxiCounts.count() } time{ taxiCounts.count() } ``` +Python +```python +time(lambda: taxiCounts.count()) +time(lambda: taxiCounts.count()) +``` + ## Настройка способа хранения RDD В данной части будет рассмотрена настройка способов хранения RDD. Вы сравните различные способы хранения, включая: хранение в сериализованном виде, в исходном, с репликацией. +Scala ```scala -trips.persist(StorageLevel.MEMORY_ONLY) +taxi.persist(StorageLevel.MEMORY_ONLY) ``` +Python +```python +import pyspark +taxi.persist(storageLevel=pyspark.StorageLevel.MEMORY_ONLY) +``` + +https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html?highlight=storagelevel#pyspark.StorageLevel + ![](images/7_persistance_type.png) Другими способами хранения являются: @@ -406,470 +584,13 @@ trips.persist(StorageLevel.MEMORY_ONLY) - MEMORY_AND_DISK_2, - OFF_HEAP. -Подробнее о способах хранения вы можете узнать по адресу http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence - - -# Создание локального проекта - -Разработка программы на Spark может быть выполнена на нескольких языках: Python, R, Scala, Java. В данном руководстве рассмотрим разработку на последних двух, так как они имеют самую полную поддержку API обработки данных. - -Разработка приложения может производиться в любом текстовом редакторе, затем быть собрана системой сборки в отдельное приложение и запущена на Spark кластере с помощью консольной команды spark-submit. - -В данной лабораторной работе мы будем использовать IntelliJ IDEA. IDE предоставляет набор возможностей для упрощения разработки: автодополнение, индексация проекта, статическая проверка кода, подсветка синтаксиса, интеграция с системами контроля версий и системами сборки. - -Для работы необходима установленная последняя версия IDE IntelliJ IDEA Community Edition. Данная среда разработки доступа для скачивания по адресу https://www.jetbrains.com/idea/. - -Для создания проекта в IntelliJ IDEA запустите среду разработки, выберите Create New Project. - -![](images/8_.png) - -## Разработка с использованием системы сборки SBT на языке Scala - -Для создания Scala + SBT проекта выберите слева в меню Scala и затем SBT. - -![](images/9_.png) - -Далее укажите имя проекта, версию Java, версию SBT и версию Scala компилятора. Для разработки на Spark 2.4.0 рекомендуется выбрать версию Scala 2.12.9. - -*Примечание.* Установите флаг Use auto-import для того, чтобы не обновлять зависимости вручную при изменениях в проекте. - -![](images/10_.png) - -После нажатия Finish откроется главное окно среды разработки. - -![](images/11_.png) - -Подождите, когда SBT скачает зависимости. - - -В дереве проекта должен появиться файл `build.sbt`, являющийся основным файлом для настройки сборки и указания зависимостей проекта SBT. В файле на момент создания указаны: имя проекта, версия проекта, версия языка Scala. - -*Примечание.* Появление предупреждений о конфликте имён в SBT 0.13.8 является известной ошибкой https://github.com/sbt/sbt/issues/1933. Одно из решений — использование более ранней версии или скрытие всех предупреждений установкой степени логирования `logLevel := Level.Error`. - - -Код Scala помещается в папку `src/main/scala` или `src/main/scala-2.12`. - -Создайте в папке scala объект Main c методом main. Данный метод будет точкой входа в программу. - -![](images/12_.png) -![](images/13_.png) - -*Примечание.* Аналогом объекта object в Java является паттерн Singleton. Выполнения тела объекта происходит при его загрузке в память, аналогично инициализации в конструкторе, методы object доступны без создания объекта оператором new, аналогично публичным статическим методам. - -```scala -object Main { - def main(args: Array[String]) { - println("Hello world") - } -} -``` - -В контекстном меню выберите Run 'Main', либо нажмите сочетание клавиш Ctrl+Shift+F10. - -![](images/14_.png) - -После выполнения в консоли должно появиться приветствие. - -![](images/15_.png) - -Добавьте к проекту зависимость Spark (версия на MapR кластере), записав следующие строки в конце файла build.sbt: - -```scala -libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-core" % "2.4.0" -) -``` - -Сохраните изменения и обновите проект. - -![](images/16_.png) - -Подождите, когда SBT скачает все зависимые библиотеки. - -Измените код `Main.scala` и создайте простейшую Spark программу. Импортируйте классы пакета `org.apache.spark`. - -```scala -import org.apache.spark._ -``` - -Создайте конфигурацию Spark с помощью класса SparkConf. Укажите обязательные параметры: имя запускаемой задачи (имя контекста задачи) и режим запуска (список режимов http://spark.apache.org/docs/latest/submitting-applications.html#master-urls). В нашем случае в качестве режима будет указан параметр `local[2]`, означающий запуск с двумя потоками на локальной машине. В качестве режима может быть указан адрес главного узла. - -```scala -val cfg = new SparkConf() - .setAppName("Test").setMaster("local[2]") -``` - -*Примечание.* В Scala различаются два вида переменных: `val` и `var`. Переменные `val` являются неизменяемыми и инициализируются один раз, в отличие от `var`, которой можно присваивать новые значения несколько раз. - -Инициализируйте контекст Spark в главном методе. - -```scala -val sc = new SparkContext(cfg) -``` - -Добавьте в конец файла команду остановки контекста - -```scala -sc.stop() -``` - -После инициализации контекста вы можете обращаться к командам Spark. Считайте любой текстовый файл из локальной файловой системы и выведите его по строкам в консоль. - -*Примечание.* Путь к файлу в локальной файловой системе имеет определённый формат, имеющий префикс "file:///". https://tools.ietf.org/html/rfc8089 - -```scala -val textFile = sc.textFile("file:///c:/temp/file.txt") -textFile.foreach(println) -``` - -![](images/17_.png) - -*Примечание.* При работе без winutils.exe запись в файловую систему будет порождать ошибку. Известным решением является скачивание данного файла из проекта Hadoop в файловую систему в папку с названием bin и указанием переменной Spark `hadoop.home.dir`. В переменной `hadoop.home.dir` хранится путь к папке c Hadoop определённой версии. Установить переменную среды JVM вы можете кодом `System`.`setProperty(key, value)` . Другим решением проблемы является установка переменной среды `HADOOP_HOME` (потребуется перезапуск IDE). https://issues.apache.org/jira/browse/SPARK-2356. - -## Анализ данных велопарковок - -Тестовыми данными являются список поездок на велосипедах `trips.csv` и список велостоянок проката велосипедов `stations.csv`. - - ![](images/18_.png) - -Создайте по одному RDD на основе каждого файла `stations.csv`, `trips.csv`. Считайте данные в переменную, затем запомните заголовок. Объявите новую переменную с данными, в которых не будет заголовка, а строки преобразованы в массивы строк в соответствии с разделителем — запятая. - -*Примечание.* Существует более эффективный, но громоздкий способ исключить заголовок из данных с использованием метода mapPartitionWithIndex. Пример присутствует в первой части лабораторной работы в разделе нахождения десяти популярных номеров такси. - -```scala -val tripData = sc.textFile("file:///z:/data/trips.csv") - -// запомним заголовок, чтобы затем его исключить -val tripsHeader = tripData.first -val trips = tripData.filter(row=>row!=tripsHeader) - .map(row=>row.split(",",-1)) - -val stationData = sc.textFile("file:///z:/data/stations.csv") -val stationsHeader = stationData.first -val stations = stationData.filter(row=>row!=stationsHeader) .map(row=>row.split(",",-1)) -``` - -*Примечание.* Использование в качестве второго параметра -1 в `row.split(",",-1)` позволяет не отбрасывать пустые строки. Например `"1,2,".split(",")` вернёт `Array("1","2")`, а `"1,2,".split(",",-1)` вернёт `Array("1","2","")`. - -Выведите заголовки таблиц и изучите колонки csv файлов. - -```scala -stationsHeader -tripsHeader -``` - -Выведите несколько элементов данных в trips и stations. - -*Примечание.* Убрать информационные строки логов из выдачи можно следующим образом: - -```scala -import org.apache.log4j.{Logger, Level} -Logger.getLogger("org.apache.spark").setLevel(Level.WARN) -Logger.getLogger("org.spark-project").setLevel(Level.WARN) -``` - -Объявите `stationsIndexed` так, чтобы результатом был список пар ключ-значение с целочисленным ключом из первой колонки. Таким образом вы создаёте индекс на основе первой колонки - номера велостоянки - -```scala -val stationsIndexed = stations.keyBy(row=>row(0).toInt) -``` - -*Примечание.* Обращение к массиву в Scala производится в круглых скобках. Например Array(1,2,3)(0) вернёт первый элемент. - -Выведите часть данных нового RDD. - -Аналогичное действие проделайте для индексирования коллекции trips по колонкам Start Terminal и End Terminal и сохраните результат в переменные, например tripsByStartTerminals и tripsByEndTerminals. - -Выполните операцию объединения коллекций по ключу с помощью функции join. Объедините stationsIndexed и tripsByStartTerminals, stationsIndexed и tripsByEndTerminals. - -```scala -val startTrips = stationsIndexed.join(tripsByStartTerminals) -val endTrips = stationsIndexed.join(tripsByEndTerminals) -``` - -Объявление последовательности трансформаций приводит к созданию ацикличного ориентированного графа. Вывести полученный граф можно для любого RDD. - -```scala -startTrips.toDebugString -endTrips.toDebugString -``` - -![](images/19_.png) - - -Выполните объявленные графы трансформаций вызовом команды count. - -```scala -startTrips.count() -endTrips.count() -``` - -Если вы знаете распределение ключей заранее, вы можете выбрать оптимальный способ хеширования ключей по разделам `Partition`. Например, если один ключ встречается на порядки чаще, чем другие ключи, то использование `HashPartitioner` будет не лучшим выбором, так как данные связанные с этим ключом будут собираться в одном разделе. Это приведёт к неравномерной нагрузке на вычислительные ресурсы. - -Выбрать определённую реализацию класса распределения по разделам можно с помощью функции RDD `partitionBy`. Например, для RDD `stationsIndexed` выбирается `HashPartitioner` с количеством разделов равным количеству разделов trips RDD. - -```scala -stationsIndexed.partitionBy(new HashPartitioner(trips.partitions.size)) -``` - -Также можно создавать свои классы для распределения ключей. Узнать какой класс назначен для текущего RDD можно обращением к полю partitioner. - -```scala -stationsIndexed.partitioner -``` - -## Создание модели данных - -Для более эффективной обработки и получения дополнительных возможностей мы можем объявить классы сущностей предметной области и преобразовать исходные строковые данные в объявленное представление. - -В Scala часто для объявления структур данных используется конструкция case class. Особенностью такого объявления класса являются: автоматическое создание методов доступа get для аргументов конструктора, автоматическое определение методов hashcode и equals, возможность case классов быть разобранными по шаблону (pattern matching). Например, для определения - -```scala -case class IntNumber(val value:Integer) -``` - -выполнение - -```scala -new IntNumber(4).value -``` - -вернёт значение 4. - -Объявите case классы для представления строк таблиц в соответствии с именами заголовков. - -```scala -case class Station( - stationId:Integer, - name:String, - lat:Double, - long:Double, - dockcount:Integer, - landmark:String, - installation:String, - notes:String) - -case class Trip( - tripId:Integer, - duration:Integer, - startDate:LocalDateTime, - startStation:String, - startTerminal:Integer, - endDate:LocalDateTime, - endStation:String, - endTerminal:Integer, - bikeId: Integer, - subscriptionType: String, - zipCode: String) -``` - -Для конвертации времени будем использовать пакет java.time. Краткое введение в работу с пакетом находится в Приложении Б. Объявим формат данных. - -```scala -val timeFormat = DateTimeFormatter.ofPattern("M/d/yyyy H:m") -``` - -Объявим trips с учётом преобразования во внутреннее представление. - -```scala -val tripsInternal = trips.mapPartitions(rows => { - val timeFormat = DateTimeFormatter.ofPattern("M/d/yyyy H:m") - rows.map( row => - new Trip(tripId=row(0).toInt, - duration=row(1).toInt, - startDate= LocalDate.parse(row(2), timeFormat), - startStation=row(3), - startTerminal=row(4).toInt, - endDate=LocalDate.parse(row(5), timeFormat), - endStation=row(6), - endTerminal=row(7).toInt, - bikeId=row(8).toInt, - subscriptionType=row(9), - zipCode=row(10)))}) -``` - -Изучите полученные данные. Например, вызовом следующих команд: - -```scala -tripsInternal.first -tripsInternal.first.startDate -``` - -*Примечание.* В связи с тем, что timeFormat содержит несериализуемый объект, его необходимо создавать на каждом узле для каждой партиции. - -То же можно проделать и для station RDD - -```scala -val stationsInternal = stations.map(row=> - new Station(stationId=row(0).toInt, - name=row(1), - lat=row(2).toDouble, - long=row(3).toDouble, - dockcount=row(4).toInt, - landmark=row(5), - installation=row(6) - notes=null)) -``` - -*Примечание.* Восьмая колонка не присутствует в таблице, так как в данных она пустая. Если в будущем она не будет использоваться, имеет смысл её убрать из описания case класса. - -*Примечание.* В данных присутствуют различные форматы времени. - -Посчитаем среднее время поездки, используя groupByKey. - -Для этого потребуется преобразовать trips RDD в RDD коллекцию пар ключ-значение аналогично тому, как мы совершали это ранее методом keyBy. - -```scala -val tripsByStartStation = tripsInternal.keyBy(record => record.startStation) -``` - -Рассчитаем среднее время поездки для каждого стартового парковочного места - -```scala -val avgDurationByStartStation = tripsByStartStation - .mapValues(x=>x.duration) - .groupByKey() - .mapValues(col=>col.reduce((a,b)=>a+b)/col.size) -``` - -Выведем первые 10 результатов - -```scala -avgDurationByStartStation.take(10).foreach(println) -``` - -Выполнение операции groupByKey приводит к интенсивным передачам данных. Если группировка делается для последующей редукции элементов лучше использовать трансформацию reduceByKey или aggregateByKey. Их выполнение приведёт сначала к локальной редукции над разделом Partition, а затем будет произведено окончательное суммирование над полученными частичными суммами. - -*Примечание.* Выполнение reduceByKey логически сходно с выполнением Combine и Reduce фазы MapReduce работы. - -Функция aggregateByKey является аналогом reduceByKey с возможностью указывать начальный элемент. - -Рассчитаем среднее значение с помощью aggregateByKey. Одновременно будут вычисляться два значения для каждого стартового терминала: сумма времён и количество поездок. - -```scala -val avgDurationByStartStation2 = tripsByStartStation - .mapValues(x=>x.duration) - .aggregateByKey((0,0))( - (acc, value) => (acc._1 + value, acc._2 + 1), - (acc1, acc2) => (acc1._1+acc2._1, acc1._2+acc2._2)) - .mapValues(acc=>acc._1/acc._2) -``` - -В первых скобках передаётся начальное значение. В нашем случае это пара нулей. Первая анонимная функция предназначена для прохода по коллекции раздела. На этом проходе значение элементов помещаются средой в переменную value, а переменная «аккумулятора» acc накапливает значения. Вторая анонимная функция предназначена для этапа редукции частично посчитанных локальных результатов. - -Сравните результаты `avgDurationByStartStation` и `avgDurationByStartStation2` и их время выполнения. - -Теперь найдём первую поездку для каждой велостоянки. Для решения опять потребуется группировка. Ещё одним недостатком `groupByKey` данных является то, что для группировки данные должны поместиться в оперативной памяти. Это может привести к ошибке `OutOfMemoryException` для больших объёмов данных. - -Сгруппируем поездки по велостоянкам и отсортируем поездки в группах по возрастанию даты. - -```scala -val firstGrouped = tripsByStartStation - .groupByKey() - .mapValues(x => - x.toList.sortWith((trip1, trip2) => - trip1.startDate.compareTo(trip2.startDate)<0)) -``` - -![](images/20_.png) - -Лучшим вариантом с точки зрения эффективности будет использование трансформации reduceByKey - -```scala -val firstGrouped = tripsByStartStation - .reduceByKey((trip1,trip2) => - if (trip1.startDate.compareTo(trip2.startDate)<0) - trip1 else trip2) -``` - -В данном случае «передаваться дальше» будет меньшее из значений ключа. - -![](images/21_.png) - -## Задачи: - -1. Найти велосипед с максимальным пробегом. -2. Найти наибольшее расстояние между станциями. -3. Найти путь велосипеда с максимальным пробегом через станции. -4. Найти количество велосипедов в системе. -5. Найти пользователей потративших на поездки более 3 часов. - -## Запуск проекта в кластере - -Для запуска собранного проекта на сервере используйте команду `spark-submit`. Однако прежде чем собрать проект, необходимо его изменить, так как в данный момент в коде зашиты пути к файлам с данными в локальной системе и режим запуска (`setMaster(“local[2]“)`). - -Параметризуйте эти значения аргументами передаваемыми в программу при запуске. - -```scala -val Seq(masterURL, tripDataPath, stationDataPath) = args.toSeq -val cfg = new SparkConfig().setAppName("Test").setMaster(masterURL) - -val tripData = sc.textFile(tripDataPath) -val stationData = sc.textFile(stationDataPath) -``` - -В конфигурации запуска добавьте значения аргументов: - - -Проверьте, что проект работает на локальном компьютере. - -Соберите JAR с помощью sbt команды package. Файл появится в директории `target/scala-2.12`. Скопируйте его на сервер с помощью `scp` и запустите. - -``` -$ spark-submit --deploy-mode cluster untitled4_2.11-0.1.jar yarn /labs/lab1/trips.csv /labs/lab1/stations.csv -``` - -Логи YARN контейнеров вы можете найти в директории `/mapr/tmp/studX/`. Проверьте, что выдача вашей программы на сервере идентична выдаче в IDE при запуске на локальном компьютере. - - -# Приложение А -## Краткое описание файловой системы HDFS - -HDFS — распределенная файловая система, используемая в проекте Hadoop. HDFS-кластер в первую очередь состоит из NameNоde-сервера и DataNode-серверов, которые хранят данные. NameNode-сервер управляет пространством имен файловой системы и доступом клиентов к данным. Чтобы разгрузить NameNode-сервер, передача данных осуществляется только между клиентом и DataNode-сервером. - - -Развёртывание экземпляра HDFS предусматривает наличие центрального узла имён (англ. name node), хранящего метаданные файловой системы и метаинформацию о распределении блоков, и серии узлов данных (англ. data node), непосредственно хранящих блоки файлов. Узел имён отвечает за обработку операций уровня файлов и каталогов — открытие и закрытие файлов, манипуляция с каталогами, узлы данных непосредственно отрабатывают операции по записи и чтению данных. Узел имён и узлы данных снабжаются веб-серверами, отображающими текущий статус узлов и позволяющими просматривать содержимое файловой системы. Административные функции доступны из интерфейса командной строки. - -# Приложение Б -## Основные понятия java.time - -Для представления времени в Java 8 рекомендуется использовать пакет `java.time`, реализующий стандарт JSR 310. Документация пакета `java.time` доступна по адресу https://docs.oracle.com/javase/8/docs/api/java/time/package-summary.html. - -Далее приводится работа с основными классами представления времени `java.time`. Для экспериментов удобно использовать REPL консоль. Если вы находитесь в среде разработки IDEA Scala консоль может быть запущена нажатием Ctrl+Shift+D, либо через контекстное меню Intellij IDEA. - -Примечание. REPL (от сокращения `read`, `eval`, `print`, `loop` - считать, выполнить, напечатать, повторять в цикле)  интерактивный цикл взаимодействия программной среды с пользователем. - -![](images/23_.png) -![](images/24_.png) - -Примечание. Консоль также можно запустить в командном окне операционной системы с помощью sbt console, находясь в папке с проектом. В обоих вариантах зависимости проекта подключаются автоматически так, что вы можете работать со сторонними библиотеками. - -В пакете java.time различаются представления времени: -- класс Instant — хранит числовую метку; -- класс LocalDate — хранит дату без времени; -- класс LocalTime — хранит время без даты; -- класс LocalDateTime — хранит время и дату; -- класс ZonedDateTime — хранит дату, время и часовой пояс. - -Узнайте в консоли текущее время, вызвав статический метод now() у каждого класса, изучите возвращаемое представление. Например, - -``` scala -import java.time._ -Instant.now() -``` - -Перед использованием классов объявите их импорт. Символ «_» импортирует все классы данного пакета. - -Enter используется для переноса строки. Для выполнения нажмите сочетание клавиш Ctrl+Enter. - -![](images/25_.png) - -Создайте примечательную вам дату с помощью статического конструктора of классов `LocalDate`, `LocalDateTime`, `ZonedDateTime`. Воспользуйтесь подсказками среды разработки или документацией для определения количества аргументов метода of и их значения. - - +Подробнее о способах хранения вы можете узнать по адресу http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence -Изучите создание времён и дат с помощью метода parse данных классов. Используйте форматирование, которое выдавалось системой при возвращении значения в консоль. +# Следующие шаги -![](images/26_.png) +Проведите анализ данных велопарковок на языке Python в интерактивном режиме из Jupyter книги `L1_interactive_bike_analysis_python.ipynb`. -Для задания пользовательского формата считывания вы можете использовать класс DateTimeFormatter. Описание класса и символов шаблона располагается по адресу https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html. +Проведите анализ данных велопарковок на языке Scala или Python в неинтерактивном режиме (`--deploy-mode cluster`). Инструкции по созданию и запуску приложений: + - Scala `L1_noninteractive_bike_analysis_scala.md` + - Python `L1_noninteractive_bike_analysis_scala.py` + diff --git a/L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python.ipynb b/L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python.ipynb new file mode 100644 index 0000000..df33a48 --- /dev/null +++ b/L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python.ipynb @@ -0,0 +1,1012 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "848f4d20-86d3-44e8-a864-aa09ca222e59", + "metadata": {}, + "source": [ + "# Лабораторная 1. Интерактивный анализ данных велопарковок SF Bay Area Bike Share в Apache Spark\n", + "\n", + "## Описание данных\n", + "\n", + "https://www.kaggle.com/benhamner/sf-bay-area-bike-share\n", + "\n", + "stations.csv схема:\n", + "\n", + "```\n", + "id: station ID number\n", + "name: name of station\n", + "lat: latitude\n", + "long: longitude\n", + "dock_count: number of total docks at station\n", + "city: city (San Francisco, Redwood City, Palo Alto, Mountain View, San Jose)\n", + "installation_date: original date that station was installed. If station was moved, it is noted below.\n", + "```\n", + "\n", + "trips.csv схема:\n", + "\n", + "```\n", + "id: numeric ID of bike trip\n", + "duration: time of trip in seconds\n", + "start_date: start date of trip with date and time, in PST\n", + "start_station_name: station name of start station\n", + "start_station_id: numeric reference for start station\n", + "end_date: end date of trip with date and time, in PST\n", + "end_station_name: station name for end station\n", + "end_station_id: numeric reference for end station\n", + "bike_id: ID of bike used\n", + "subscription_type: Subscriber = annual or 30-day member; Customer = 24-hour or 3-day member\n", + "zip_code: Home zip code of subscriber (customers can choose to manually enter zip at kiosk however data is unreliable)\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 124, + "id": "30002669-3799-4a39-831e-d276a4708f9a", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark import SparkContext, SparkConf " + ] + }, + { + "cell_type": "code", + "execution_count": 125, + "id": "3ed7b961-7879-4937-ac24-11d615e091b8", + "metadata": {}, + "outputs": [], + "source": [ + "conf = SparkConf().setAppName(\"L1_interactive_bike_analysis\").setMaster('yarn')" + ] + }, + { + "cell_type": "code", + "execution_count": 126, + "id": "0da718e8-7ad8-42f1-872f-c8805fd3c41c", + "metadata": {}, + "outputs": [], + "source": [ + "sc = SparkContext(conf=conf)" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "6f486cc0-d591-43b8-8c92-a50415f3a141", + "metadata": {}, + "outputs": [], + "source": [ + "tripData = sc.textFile(\"trips.csv\")\n", + "# запомним заголовок, чтобы затем его исключить из данных\n", + "tripsHeader = tripData.first()\n", + "trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(\",\", -1))\n", + "\n", + "stationData = sc.textFile(\"stations.csv\")\n", + "stationsHeader = stationData.first()\n", + "stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(\",\", -1))" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "bb7a57bc-bbef-452f-97b8-95a5412c6aef", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[(0, 'id'),\n", + " (1, 'duration'),\n", + " (2, 'start_date'),\n", + " (3, 'start_station_name'),\n", + " (4, 'start_station_id'),\n", + " (5, 'end_date'),\n", + " (6, 'end_station_name'),\n", + " (7, 'end_station_id'),\n", + " (8, 'bike_id'),\n", + " (9, 'subscription_type'),\n", + " (10, 'zip_code')]" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "list(enumerate(tripsHeader.split(\",\")))" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "537d7ba4-0a47-4041-93f5-71148fa39821", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[(0, 'id'),\n", + " (1, 'name'),\n", + " (2, 'lat'),\n", + " (3, 'long'),\n", + " (4, 'dock_count'),\n", + " (5, 'city'),\n", + " (6, 'installation_date')]" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "list(enumerate(stationsHeader.split(\",\")))" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "9c745434-7d5f-4bc2-8681-4deaf7d764f0", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[['4576',\n", + " '63',\n", + " '8/29/2013 14:13',\n", + " 'South Van Ness at Market',\n", + " '66',\n", + " '8/29/2013 14:14',\n", + " 'South Van Ness at Market',\n", + " '66',\n", + " '520',\n", + " 'Subscriber',\n", + " '94127'],\n", + " ['4607',\n", + " '70',\n", + " '8/29/2013 14:42',\n", + " 'San Jose City Hall',\n", + " '10',\n", + " '8/29/2013 14:43',\n", + " 'San Jose City Hall',\n", + " '10',\n", + " '661',\n", + " 'Subscriber',\n", + " '95138']]" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "trips.take(2)" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "ab452ac6-83e1-4116-91eb-f7eed5cdd60e", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[['2',\n", + " 'San Jose Diridon Caltrain Station',\n", + " '37.329732',\n", + " '-121.90178200000001',\n", + " '27',\n", + " 'San Jose',\n", + " '8/6/2013'],\n", + " ['3',\n", + " 'San Jose Civic Center',\n", + " '37.330698',\n", + " '-121.888979',\n", + " '15',\n", + " 'San Jose',\n", + " '8/5/2013']]" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "stations.take(2)" + ] + }, + { + "cell_type": "markdown", + "id": "64f9d459-525c-432b-8715-529a73def432", + "metadata": {}, + "source": [ + "Объявите `stationsIndexed` так, чтобы результатом был список пар ключ-значение с целочисленным ключом из первой колонки. Таким образом вы создаёте индекс на основе первой колонки - номера велостоянки" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "632d36ea-3112-4466-9b48-618d762e390f", + "metadata": {}, + "outputs": [], + "source": [ + "stationsIndexed = stations.keyBy(lambda station: station[0])" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "64515328-9ea3-4997-8da4-e0f6102b280f", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[('2',\n", + " ['2',\n", + " 'San Jose Diridon Caltrain Station',\n", + " '37.329732',\n", + " '-121.90178200000001',\n", + " '27',\n", + " 'San Jose',\n", + " '8/6/2013']),\n", + " ('3',\n", + " ['3',\n", + " 'San Jose Civic Center',\n", + " '37.330698',\n", + " '-121.888979',\n", + " '15',\n", + " 'San Jose',\n", + " '8/5/2013'])]" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "stationsIndexed.take(2)" + ] + }, + { + "cell_type": "markdown", + "id": "79de40a1-ebf9-408a-826f-ebf95d6490e9", + "metadata": {}, + "source": [ + "Аналогичное действие проделайте для индексирования коллекции trips по колонкам start_station_id и end_station_id и сохраните результат в переменные, например tripsByStartTerminals и tripsByEndTerminals." + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "1ea1aa3e-d972-4a64-b2ef-3e60e21093a4", + "metadata": {}, + "outputs": [], + "source": [ + "tripsByStartTerminals = \n", + "tripsByEndTerminals = " + ] + }, + { + "cell_type": "markdown", + "id": "3bcffb6f-68b0-45bf-9359-f97e4ec281b1", + "metadata": {}, + "source": [ + "Выполните операцию объединения коллекций по ключу с помощью функции join. Объедините stationsIndexed и tripsByStartTerminals, stationsIndexed и tripsByEndTerminals." + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "33248614-5149-4c63-b222-060b5fec4b9d", + "metadata": {}, + "outputs": [], + "source": [ + "startTrips = stationsIndexed.join(tripsByStartTerminals)\n", + "endTrips = stationsIndexed.join(tripsByEndTerminals)" + ] + }, + { + "cell_type": "markdown", + "id": "d8db4683-3f4b-46f3-8705-bcbad0f17e0b", + "metadata": {}, + "source": [ + "Объявление последовательности трансформаций приводит к созданию ацикличного ориентированного графа. Вывести полученный граф можно для любого RDD." + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "d61bec0b-4839-48b6-a8c5-7fbd24b30bf3", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(4) PythonRDD[33] at RDD at PythonRDD.scala:53 []\n", + " | MapPartitionsRDD[24] at mapPartitions at PythonRDD.scala:145 []\n", + " | ShuffledRDD[23] at partitionBy at NativeMethodAccessorImpl.java:0 []\n", + " +-(4) PairwiseRDD[22] at join at :1 []\n", + " | PythonRDD[21] at join at :1 []\n", + " | UnionRDD[20] at union at NativeMethodAccessorImpl.java:0 []\n", + " | PythonRDD[18] at RDD at PythonRDD.scala:53 []\n", + " | stations.csv MapPartitionsRDD[13] at textFile at NativeMethodAccessorImpl.java:0 []\n", + " | stations.csv HadoopRDD[12] at textFile at NativeMethodAccessorImpl.java:0 []\n", + " | PythonRDD[19] at RDD at PythonRDD.scala:53 []\n", + " | trips.csv MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0 []\n", + " | trips.csv HadoopRDD[9] at textFile at NativeMethodAccessorImpl.java:0 []\n" + ] + } + ], + "source": [ + "print(startTrips.toDebugString().decode(\"utf-8\"))" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "id": "6bd0c246-509d-46e1-aad3-2bbe4679d4df", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(4) PythonRDD[32] at RDD at PythonRDD.scala:53 []\n", + " | MapPartitionsRDD[31] at mapPartitions at PythonRDD.scala:145 []\n", + " | ShuffledRDD[30] at partitionBy at NativeMethodAccessorImpl.java:0 []\n", + " +-(4) PairwiseRDD[29] at join at :2 []\n", + " | PythonRDD[28] at join at :2 []\n", + " | UnionRDD[27] at union at NativeMethodAccessorImpl.java:0 []\n", + " | PythonRDD[25] at RDD at PythonRDD.scala:53 []\n", + " | stations.csv MapPartitionsRDD[13] at textFile at NativeMethodAccessorImpl.java:0 []\n", + " | stations.csv HadoopRDD[12] at textFile at NativeMethodAccessorImpl.java:0 []\n", + " | PythonRDD[26] at RDD at PythonRDD.scala:53 []\n", + " | trips.csv MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0 []\n", + " | trips.csv HadoopRDD[9] at textFile at NativeMethodAccessorImpl.java:0 []\n" + ] + } + ], + "source": [ + "print(endTrips.toDebugString().decode(\"utf-8\"))" + ] + }, + { + "cell_type": "markdown", + "id": "35e6fdec-4552-417a-9a79-5ac3c1604ac6", + "metadata": {}, + "source": [ + "Выполните объявленные графы трансформаций вызовом команды count." + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "id": "b3d7bd21-ab5f-4119-8484-d816725cec3b", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "669959" + ] + }, + "execution_count": 31, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "startTrips.count()" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "id": "cef28792-2d71-49ef-b390-6d6e9ce7320d", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "669959" + ] + }, + "execution_count": 32, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "endTrips.count()" + ] + }, + { + "cell_type": "markdown", + "id": "14c6b808-f130-4265-bbdc-4c02a7fc32ce", + "metadata": {}, + "source": [ + "Если вы знаете распределение ключей заранее, вы можете выбрать оптимальный способ хеширования ключей по разделам `Partition`. Например, если один ключ встречается на порядки чаще, чем другие ключи, то использование `HashPartitioner` будет не лучшим выбором, так как данные связанные с этим ключом будут собираться в одном разделе. Это приведёт к неравномерной нагрузке на вычислительные ресурсы.\n", + "\n", + "Выбрать определённую реализацию класса распределения по разделам можно с помощью функции RDD `partitionBy`. Например, для RDD `stationsIndexed` выбирается `portable_hash(idx)` с количеством разделов равным количеству разделов trips RDD." + ] + }, + { + "cell_type": "code", + "execution_count": 55, + "id": "2a17ea3a-f109-40e6-8959-48beed7ca672", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MapPartitionsRDD[39] at mapPartitions at PythonRDD.scala:145" + ] + }, + "execution_count": 55, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from pyspark.rdd import portable_hash\n", + "\n", + "stationsIndexed.partitionBy(numPartitions=trips.getNumPartitions(), partitionFunc=lambda x: portable_hash(x[0]))" + ] + }, + { + "cell_type": "markdown", + "id": "bfb7f922-ac6b-41f4-a1d6-ecd43b60a3ab", + "metadata": {}, + "source": [ + "Узнать какой класс назначен для текущего RDD можно обращением к полю partitioner." + ] + }, + { + "cell_type": "code", + "execution_count": 58, + "id": "8d4353d6-5db9-4e9f-9d3a-20f85c37fcf9", + "metadata": {}, + "outputs": [], + "source": [ + "stationsIndexed.partitioner" + ] + }, + { + "cell_type": "markdown", + "id": "797aa041-b72e-4b6a-8992-ee7f1a715096", + "metadata": {}, + "source": [ + "## Создание модели данных\n", + "\n", + "Для более эффективной обработки и получения дополнительных возможностей мы можем объявить классы сущностей предметной области и преобразовать исходные строковые данные в объявленное представление.\n", + "\n", + "В Scala часто для объявления структур данных используется конструкция case class. Особенностью такого объявления класса являются: автоматическое создание методов доступа get для аргументов конструктора, автоматическое определение методов hashcode и equals, возможность case классов быть разобранными по шаблону (pattern matching)." + ] + }, + { + "cell_type": "code", + "execution_count": 62, + "id": "84aa084c-8e7a-427b-9bd9-1b7e106c1ea2", + "metadata": {}, + "outputs": [], + "source": [ + "from typing import NamedTuple\n", + "from datetime import datetime" + ] + }, + { + "cell_type": "code", + "execution_count": 63, + "id": "d92a6f7e-b57a-4f79-836d-07d5e3286012", + "metadata": {}, + "outputs": [], + "source": [ + "def initStation(stations):\n", + " class Station(NamedTuple):\n", + " station_id: int\n", + " name: str\n", + " lat: float\n", + " long: float\n", + " dockcount: int\n", + " landmark: str\n", + " installation: str\n", + " \n", + " for station in stations:\n", + " yield Station(\n", + " station_id = int(station[0]),\n", + " name = station[1],\n", + " lat = float(station[2]),\n", + " long = float(station[3]),\n", + " dockcount = int(station[4]),\n", + " landmark = station[5],\n", + " installation = datetime.strptime(station[6], '%m/%d/%Y')\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": 71, + "id": "a09036d1-2ba1-4184-86af-41ea7505a36b", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))" + ] + }, + "execution_count": 71, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "stationsInternal = stations.mapPartitions(initStation)\n", + "stationsInternal.first()" + ] + }, + { + "cell_type": "code", + "execution_count": 80, + "id": "9f728f0d-86ee-4d9e-9292-31ca3c1fae35", + "metadata": {}, + "outputs": [], + "source": [ + "def initTrip(trips):\n", + " class Trip(NamedTuple):\n", + " trip_id: int\n", + " duration: int\n", + " start_date: datetime\n", + " start_station_name: str\n", + " start_station_id: int\n", + " end_date: datetime\n", + " end_station_name: str\n", + " end_station_id: int\n", + " bike_id: int\n", + " subscription_type: str\n", + " zip_code: str\n", + " \n", + " for trip in trips:\n", + " yield Trip( \n", + " trip_id = int(trip[0]),\n", + " duration = int(trip[1]),\n", + " start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),\n", + " start_station_name = trip[3],\n", + " start_station_id = int(trip[4]),\n", + " end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),\n", + " end_station_name = trip[6],\n", + " end_station_id = trip[7],\n", + " bike_id = int(trip[8]),\n", + " subscription_type = trip[9],\n", + " zip_code = trip[10]\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": 81, + "id": "c265426b-f20b-426e-aeba-7295bc797f3b", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')" + ] + }, + "execution_count": 81, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "tripsInternal = trips.mapPartitions(initTrip)\n", + "tripsInternal.first()" + ] + }, + { + "cell_type": "markdown", + "id": "111e1fba-9ffe-4a55-81d7-021041f09865", + "metadata": {}, + "source": [ + "Для каждой стартовой станции найдем среднее время поездки. Будем использовать метод groupByKey.\n", + "\n", + "Для этого потребуется преобразовать trips RDD в RDD коллекцию пар ключ-значение аналогично тому, как мы совершали это ранее методом keyBy." + ] + }, + { + "cell_type": "code", + "execution_count": 82, + "id": "5f5cf9b5-b470-4042-8c46-15d1afa22527", + "metadata": {}, + "outputs": [], + "source": [ + "tripsByStartStation = tripsInternal.keyBy(lambda trip: trip.start_station_name)" + ] + }, + { + "cell_type": "markdown", + "id": "efcfc45a-465f-4e58-a55a-98069804d13f", + "metadata": {}, + "source": [ + "Рассчитаем среднее время поездки для каждого стартового парковочного места" + ] + }, + { + "cell_type": "code", + "execution_count": 85, + "id": "106a733b-540b-4830-a781-b7bd91fe8785", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "\n", + "avgDurationByStartStation = tripsByStartStation\\\n", + " .mapValues(lambda trip: trip.duration)\\\n", + " .groupByKey()\\\n", + " .mapValues(lambda trip_durations: np.mean(list(trip_durations)))" + ] + }, + { + "cell_type": "markdown", + "id": "b8facf2f-77f1-45b2-bf12-b57b95886bfc", + "metadata": {}, + "source": [ + "Выведем первые 10 результатов" + ] + }, + { + "cell_type": "code", + "execution_count": 94, + "id": "3b8c0ad3-e939-4d6f-916c-1b558e50d17f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 6.79 ms, sys: 5.3 ms, total: 12.1 ms\n", + "Wall time: 127 ms\n" + ] + }, + { + "data": { + "text/plain": [ + "[('University and Emerson', 7090.239417989418),\n", + " ('California Ave Caltrain Station', 4628.005847953216),\n", + " ('Redwood City Public Library', 4579.234741784037),\n", + " ('Park at Olive', 4438.1613333333335),\n", + " ('San Jose Civic Center', 4208.016938519448),\n", + " ('Rengstorff Avenue / California Street', 4174.082373782108),\n", + " ('Redwood City Medical Center', 3959.491961414791),\n", + " ('Palo Alto Caltrain Station', 3210.6489815253435),\n", + " ('San Mateo County Center', 2716.7700348432054),\n", + " ('Broadway at Main', 2481.2537313432836)]" + ] + }, + "execution_count": 94, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%time\n", + "\n", + "avgDurationByStartStation.top(10, key=lambda x: x[1])" + ] + }, + { + "cell_type": "markdown", + "id": "8acf13df-5033-4397-be08-5524b9d01c50", + "metadata": {}, + "source": [ + "Выполнение операции groupByKey приводит к интенсивным передачам данных. Если группировка делается для последующей редукции элементов лучше использовать трансформацию reduceByKey или aggregateByKey. Их выполнение приведёт сначала к локальной редукции над разделом Partition, а затем будет произведено окончательное суммирование над полученными частичными суммами.\n", + "\n", + "*Примечание.* Выполнение reduceByKey логически сходно с выполнением Combine и Reduce фазы MapReduce работы.\n", + "\n", + "Функция aggregateByKey является аналогом reduceByKey с возможностью указывать начальный элемент.\n", + "\n", + "Рассчитаем среднее значение с помощью aggregateByKey. Одновременно будут вычисляться два значения для каждого стартового терминала: сумма времён и количество поездок." + ] + }, + { + "cell_type": "code", + "execution_count": 88, + "id": "53c746c5-ee60-42b9-a1f3-465bbbdaa02b", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "\u001b[0;31mSignature:\u001b[0m\n", + " \u001b[0mtripsByStartStation\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0maggregateByKey\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mzeroValue\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mseqFunc\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mcombFunc\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mnumPartitions\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mpartitionFunc\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m<\u001b[0m\u001b[0mfunction\u001b[0m \u001b[0mportable_hash\u001b[0m \u001b[0mat\u001b[0m \u001b[0;36m0x7fb9a437c310\u001b[0m\u001b[0;34m>\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mDocstring:\u001b[0m\n", + "Aggregate the values of each key, using given combine functions and a neutral\n", + "\"zero value\". This function can return a different result type, U, than the type\n", + "of the values in this RDD, V. Thus, we need one operation for merging a V into\n", + "a U and one operation for merging two U's, The former operation is used for merging\n", + "values within a partition, and the latter is used for merging values between\n", + "partitions. To avoid memory allocation, both of these functions are\n", + "allowed to modify and return their first argument instead of creating a new U.\n", + "\u001b[0;31mFile:\u001b[0m ~/.local/lib/python3.9/site-packages/pyspark/rdd.py\n", + "\u001b[0;31mType:\u001b[0m method\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "? tripsByStartStation.aggregateByKey" + ] + }, + { + "cell_type": "code", + "execution_count": 90, + "id": "fcdafcfb-ed3c-4c18-9051-44cf3632625c", + "metadata": {}, + "outputs": [], + "source": [ + "def seqFunc(acc, duration):\n", + " duration_sum, count = acc\n", + " return (duration_sum + duration, count + 1)\n", + "\n", + "def combFunc(acc1, acc2):\n", + " duration_sum1, count1 = acc1\n", + " duration_sum2, count2 = acc2\n", + " return (duration_sum1+duration_sum2, count1+count2)\n", + "\n", + "def meanFunc(acc):\n", + " duration_sum, count = acc\n", + " return duration_sum/count\n", + "\n", + "avgDurationByStartStation2 = tripsByStartStation\\\n", + " .mapValues(lambda trip: trip.duration)\\\n", + " .aggregateByKey(\n", + " zeroValue=(0,0),\n", + " seqFunc=seqFunc,\n", + " combFunc=combFunc)\\\n", + " .mapValues(meanFunc)" + ] + }, + { + "cell_type": "markdown", + "id": "71665ff6-6ce3-48b1-a183-1d02b9c2e80e", + "metadata": {}, + "source": [ + "В `zeroValue` передаётся начальное значение. В нашем случае это пара нулей. Первая функция `seqFunc` предназначена для прохода по коллекции партиции. На этом проходе значение элементов помещаются средой в переменную duration, а переменная «аккумулятора» acc накапливает значения. Вторая функция `combFunc` предназначена для этапа редукции частично посчитанных локальных результатов.\n", + "\n", + "Сравните результаты `avgDurationByStartStation` и `avgDurationByStartStation2` и их время выполнения." + ] + }, + { + "cell_type": "code", + "execution_count": 95, + "id": "c69d5013-f26e-4d6b-8fb6-277390a0f267", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 8.47 ms, sys: 3.23 ms, total: 11.7 ms\n", + "Wall time: 65.6 ms\n" + ] + }, + { + "data": { + "text/plain": [ + "[('University and Emerson', 7090.239417989418),\n", + " ('California Ave Caltrain Station', 4628.005847953216),\n", + " ('Redwood City Public Library', 4579.234741784037),\n", + " ('Park at Olive', 4438.1613333333335),\n", + " ('San Jose Civic Center', 4208.016938519448),\n", + " ('Rengstorff Avenue / California Street', 4174.082373782108),\n", + " ('Redwood City Medical Center', 3959.491961414791),\n", + " ('Palo Alto Caltrain Station', 3210.6489815253435),\n", + " ('San Mateo County Center', 2716.7700348432054),\n", + " ('Broadway at Main', 2481.2537313432836)]" + ] + }, + "execution_count": 95, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%time\n", + "\n", + "avgDurationByStartStation2.top(10, key=lambda x: x[1])" + ] + }, + { + "cell_type": "markdown", + "id": "39ce42ac-d6ed-441d-9236-76fce6b35136", + "metadata": {}, + "source": [ + "Теперь найдём первую поездку для каждой велостоянки. Для решения опять потребуется группировка. Ещё одним недостатком `groupByKey` данных является то, что для группировки данные должны поместиться в оперативной памяти. Это может привести к ошибке `OutOfMemoryException` для больших объёмов данных.\n", + "\n", + "Найдем самую раннюю поездку для каждой станции. Сгруппируем поездки по станциям, возьмём первую поездку из отсортированного списка:" + ] + }, + { + "cell_type": "code", + "execution_count": 115, + "id": "2f37ab32-5e2c-4596-ba09-9d6283e9fd6b", + "metadata": {}, + "outputs": [], + "source": [ + "def earliestTrip(trips):\n", + " if trips is None:\n", + " return None\n", + " if len(trips)==0:\n", + " return trips\n", + " trips = list(trips)\n", + " min_date = trips[0].start_date\n", + " min_trip = trips[0]\n", + " for trip in trips[1:]:\n", + " if min_date > trip.start_date:\n", + " min_date = trip.start_date\n", + " min_trip = trip\n", + " return min_trip\n", + "\n", + "firstGrouped = tripsByStartStation\\\n", + " .groupByKey()\\\n", + " .mapValues(lambda trips: earliestTrip(trips))" + ] + }, + { + "cell_type": "code", + "execution_count": 116, + "id": "7f74e315-c036-4992-9111-dff2d4150183", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 3.15 ms, sys: 15.3 ms, total: 18.4 ms\n", + "Wall time: 25.3 s\n" + ] + }, + { + "data": { + "text/plain": [ + "[('Market at 4th',\n", + " Trip(trip_id=4134, duration=574, start_date=datetime.datetime(2013, 8, 29, 10, 19), start_station_name='Market at 4th', start_station_id=76, end_date=datetime.datetime(2013, 8, 29, 10, 29), end_station_name='2nd at South Park', end_station_id='64', bike_id=426, subscription_type='Customer', zip_code='94117')),\n", + " ('Market at Sansome',\n", + " Trip(trip_id=4321, duration=505, start_date=datetime.datetime(2013, 8, 29, 12, 10), start_station_name='Market at Sansome', start_station_id=77, end_date=datetime.datetime(2013, 8, 29, 12, 19), end_station_name='Harry Bridges Plaza (Ferry Building)', end_station_id='50', bike_id=625, subscription_type='Subscriber', zip_code='94110')),\n", + " ('San Jose Diridon Caltrain Station',\n", + " Trip(trip_id=4547, duration=1580, start_date=datetime.datetime(2013, 8, 29, 13, 52), start_station_name='San Jose Diridon Caltrain Station', start_station_id=2, end_date=datetime.datetime(2013, 8, 29, 14, 18), end_station_name='San Jose City Hall', end_station_id='10', bike_id=107, subscription_type='Customer', zip_code='94306')),\n", + " ('Howard at 2nd',\n", + " Trip(trip_id=4524, duration=579, start_date=datetime.datetime(2013, 8, 29, 13, 39), start_station_name='Howard at 2nd', start_station_id=63, end_date=datetime.datetime(2013, 8, 29, 13, 48), end_station_name='San Francisco Caltrain (Townsend at 4th)', end_station_id='70', bike_id=358, subscription_type='Subscriber', zip_code='94117')),\n", + " ('San Francisco City Hall',\n", + " Trip(trip_id=4265, duration=151, start_date=datetime.datetime(2013, 8, 29, 11, 40), start_station_name='San Francisco City Hall', start_station_id=58, end_date=datetime.datetime(2013, 8, 29, 11, 42), end_station_name='San Francisco City Hall', end_station_id='58', bike_id=520, subscription_type='Subscriber', zip_code='94110'))]" + ] + }, + "execution_count": 116, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%time\n", + "\n", + "firstGrouped.take(5)" + ] + }, + { + "cell_type": "markdown", + "id": "c6020067-3a75-47e8-9bb1-9e36cded29a7", + "metadata": {}, + "source": [ + "Лучшим вариантом с точки зрения эффективности будет использование трансформации `reduceByKey`" + ] + }, + { + "cell_type": "code", + "execution_count": 117, + "id": "750007e2-2706-4997-9b35-a44269b55200", + "metadata": {}, + "outputs": [], + "source": [ + "firstGrouped = tripsByStartStation\\\n", + " .reduceByKey(lambda tripA, tripB: tripA if tripA.start_date < tripB.start_date else tripB)" + ] + }, + { + "cell_type": "code", + "execution_count": 118, + "id": "b98199bd-26dd-4112-87e4-e9505338bd3c", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 15.5 ms, sys: 2.89 ms, total: 18.4 ms\n", + "Wall time: 16 s\n" + ] + }, + { + "data": { + "text/plain": [ + "[('Market at 4th',\n", + " Trip(trip_id=4134, duration=574, start_date=datetime.datetime(2013, 8, 29, 10, 19), start_station_name='Market at 4th', start_station_id=76, end_date=datetime.datetime(2013, 8, 29, 10, 29), end_station_name='2nd at South Park', end_station_id='64', bike_id=426, subscription_type='Customer', zip_code='94117')),\n", + " ('Market at Sansome',\n", + " Trip(trip_id=4320, duration=520, start_date=datetime.datetime(2013, 8, 29, 12, 10), start_station_name='Market at Sansome', start_station_id=77, end_date=datetime.datetime(2013, 8, 29, 12, 19), end_station_name='Harry Bridges Plaza (Ferry Building)', end_station_id='50', bike_id=616, subscription_type='Subscriber', zip_code='94109')),\n", + " ('San Jose Diridon Caltrain Station',\n", + " Trip(trip_id=4547, duration=1580, start_date=datetime.datetime(2013, 8, 29, 13, 52), start_station_name='San Jose Diridon Caltrain Station', start_station_id=2, end_date=datetime.datetime(2013, 8, 29, 14, 18), end_station_name='San Jose City Hall', end_station_id='10', bike_id=107, subscription_type='Customer', zip_code='94306')),\n", + " ('Howard at 2nd',\n", + " Trip(trip_id=4525, duration=650, start_date=datetime.datetime(2013, 8, 29, 13, 39), start_station_name='Howard at 2nd', start_station_id=63, end_date=datetime.datetime(2013, 8, 29, 13, 50), end_station_name='San Francisco Caltrain (Townsend at 4th)', end_station_id='70', bike_id=359, subscription_type='Subscriber', zip_code='94401')),\n", + " ('San Francisco City Hall',\n", + " Trip(trip_id=4265, duration=151, start_date=datetime.datetime(2013, 8, 29, 11, 40), start_station_name='San Francisco City Hall', start_station_id=58, end_date=datetime.datetime(2013, 8, 29, 11, 42), end_station_name='San Francisco City Hall', end_station_id='58', bike_id=520, subscription_type='Subscriber', zip_code='94110'))]" + ] + }, + "execution_count": 118, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%%time\n", + "\n", + "firstGrouped.take(5)" + ] + }, + { + "cell_type": "code", + "execution_count": 123, + "id": "c98261f7-283c-4c7e-b915-3778ff972f5f", + "metadata": {}, + "outputs": [], + "source": [ + "sc.stop()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e8bc4000-9d08-408a-b037-fbd9796ed459", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/L1 - Introduction to Apache Spark/L1_noninteractive_bike_analysis_python.py b/L1 - Introduction to Apache Spark/L1_noninteractive_bike_analysis_python.py new file mode 100644 index 0000000..a4f85b3 --- /dev/null +++ b/L1 - Introduction to Apache Spark/L1_noninteractive_bike_analysis_python.py @@ -0,0 +1,132 @@ +# +# Лабораторная 1. Неинтерактивный анализ данных велопарковок SF Bay Area Bike Share в Apache Spark +# +# Для каждой стартовой станции найдем среднее время поездки и самую первую поездку. +# +# Команда для запуска +# spark-submit --master yarn --deploy-mode cluster L1_noninteractive_bike_analysis.py +# +# используйте --conf spark.yarn.submit.waitAppCompletion=true, чтобы ждать завершения выполнения спарк задачи +# + +from pyspark import SparkContext, SparkConf +from typing import NamedTuple +from datetime import datetime +import numpy as np + +conf = SparkConf().setAppName("Lab1_Script") +# conf.set("spark.yarn.submit.waitAppCompletion", "false") + +sc = SparkContext(conf=conf) + +tripData = sc.textFile("trips.csv") +# запомним заголовок, чтобы затем его исключить из данных +tripsHeader = tripData.first() +trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1)) + +stationData = sc.textFile("stations.csv") +stationsHeader = stationData.first() +stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1)) + +def initStation(stations): + class Station(NamedTuple): + station_id: int + name: str + lat: float + long: float + dockcount: int + landmark: str + installation: str + + for station in stations: + yield Station( + station_id = int(station[0]), + name = station[1], + lat = float(station[2]), + long = float(station[3]), + dockcount = int(station[4]), + landmark = station[5], + installation = datetime.strptime(station[6], '%m/%d/%Y') + ) + +def initTrip(trips): + class Trip(NamedTuple): + trip_id: int + duration: int + start_date: datetime + start_station_name: str + start_station_id: int + end_date: datetime + end_station_name: str + end_station_id: int + bike_id: int + subscription_type: str + zip_code: str + + for trip in trips: + yield Trip( + trip_id = int(trip[0]), + duration = int(trip[1]), + start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'), + start_station_name = trip[3], + start_station_id = int(trip[4]), + end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'), + end_station_name = trip[6], + end_station_id = trip[7], + bike_id = int(trip[8]), + subscription_type = trip[9], + zip_code = trip[10] + ) + +stationsInternal = stations.mapPartitions(initStation) +tripsInternal = trips.mapPartitions(initTrip) + +tripsByStartStation = tripsInternal.keyBy(lambda trip: trip.start_station_name) + +# среднее время поездок стартовой станции + +def seqFunc(acc, duration): + duration_sum, count = acc + return (duration_sum + duration, count + 1) + +def combFunc(acc1, acc2): + duration_sum1, count1 = acc1 + duration_sum2, count2 = acc2 + return (duration_sum1+duration_sum2, count1+count2) + +def meanFunc(acc): + duration_sum, count = acc + return duration_sum/count + +avgDurationByStartStation = tripsByStartStation\ + .mapValues(lambda trip: trip.duration)\ + .aggregateByKey( + zeroValue=(0,0), + seqFunc=seqFunc, + combFunc=combFunc)\ + .mapValues(meanFunc)\ + .sortBy(lambda x: x[1], ascending=False) + +# +# первые поездки стартовых станций +# + +firstStationsTrip = tripsByStartStation\ + .reduceByKey(lambda tripA, tripB: tripA if tripA.start_date < tripB.start_date else tripB) + +# +# старт выполнения задач и сохранение результата в HDFS +# + +avgDurationByStartStation.saveAsTextFile("avg_duration_of_start_stations") +firstStationsTrip.saveAsTextFile("first_station_trips") + +# результаты сохранятся в директории распределённой файловой системы в домашней папке пользователя /user/$USER/: +# - avg_duration_of_start_stations +# - first_station_trips +# +# получить результаты в виде одного файла можно командой `hadoop fs -getmerge`: +# hadoop fs -getmerge avg_duration_of_start_stations avg_duration_of_start_stations.txt +# hadoop fs -getmerge first_station_trips first_station_trips.txt + +sc.stop() \ No newline at end of file diff --git a/L1 - Introduction to Apache Spark/L1_noninteractive_bike_analysis_scala.md b/L1 - Introduction to Apache Spark/L1_noninteractive_bike_analysis_scala.md new file mode 100644 index 0000000..63b4a0b --- /dev/null +++ b/L1 - Introduction to Apache Spark/L1_noninteractive_bike_analysis_scala.md @@ -0,0 +1,465 @@ + +# Создание локального проекта + +Разработка программы на Spark может быть выполнена на нескольких языках: Python, R, Scala, Java. В данном руководстве рассмотрим разработку на последних двух, так как они имеют самую полную поддержку API обработки данных. + +Разработка приложения может производиться в любом текстовом редакторе, затем быть собрана системой сборки в отдельное приложение и запущена на Spark кластере с помощью консольной команды spark-submit. + +В данной лабораторной работе мы будем использовать IntelliJ IDEA. IDE предоставляет набор возможностей для упрощения разработки: автодополнение, индексация проекта, статическая проверка кода, подсветка синтаксиса, интеграция с системами контроля версий и системами сборки. + +Для работы необходима установленная последняя версия IDE IntelliJ IDEA Community Edition. Данная среда разработки доступа для скачивания по адресу https://www.jetbrains.com/idea/. + +Для создания проекта в IntelliJ IDEA запустите среду разработки, выберите Create New Project. + +![](images/8_.png) + +## Разработка с использованием системы сборки SBT на языке Scala + +Для создания Scala + SBT проекта выберите слева в меню Scala и затем SBT. + +![](images/9_.png) + +Далее укажите имя проекта, версию Java, версию SBT и версию Scala компилятора. Для разработки на Spark 2.4.0 рекомендуется выбрать версию Scala 2.12.9. + +*Примечание.* Установите флаг Use auto-import для того, чтобы не обновлять зависимости вручную при изменениях в проекте. + +![](images/10_.png) + +После нажатия Finish откроется главное окно среды разработки. + +![](images/11_.png) + +Подождите, когда SBT скачает зависимости. + + +В дереве проекта должен появиться файл `build.sbt`, являющийся основным файлом для настройки сборки и указания зависимостей проекта SBT. В файле на момент создания указаны: имя проекта, версия проекта, версия языка Scala. + +*Примечание.* Появление предупреждений о конфликте имён в SBT 0.13.8 является известной ошибкой https://github.com/sbt/sbt/issues/1933. Одно из решений — использование более ранней версии или скрытие всех предупреждений установкой степени логирования `logLevel := Level.Error`. + + +Код Scala помещается в папку `src/main/scala` или `src/main/scala-2.12`. + +Создайте в папке scala объект Main c методом main. Данный метод будет точкой входа в программу. + +![](images/12_.png) +![](images/13_.png) + +*Примечание.* Аналогом объекта object в Java является паттерн Singleton. Выполнения тела объекта происходит при его загрузке в память, аналогично инициализации в конструкторе, методы object доступны без создания объекта оператором new, аналогично публичным статическим методам. + +```scala +object Main { + def main(args: Array[String]) { + println("Hello world") + } +} +``` + +В контекстном меню выберите Run 'Main', либо нажмите сочетание клавиш Ctrl+Shift+F10. + +![](images/14_.png) + +После выполнения в консоли должно появиться приветствие. + +![](images/15_.png) + +Добавьте к проекту зависимость Spark (версия на MapR кластере), записав следующие строки в конце файла build.sbt: + +```scala +libraryDependencies ++= Seq( + "org.apache.spark" %% "spark-core" % "2.4.0" +) +``` + +Сохраните изменения и обновите проект. + +![](images/16_.png) + +Подождите, когда SBT скачает все зависимые библиотеки. + +Измените код `Main.scala` и создайте простейшую Spark программу. Импортируйте классы пакета `org.apache.spark`. + +```scala +import org.apache.spark._ +``` + +Создайте конфигурацию Spark с помощью класса SparkConf. Укажите обязательные параметры: имя запускаемой задачи (имя контекста задачи) и режим запуска (список режимов http://spark.apache.org/docs/latest/submitting-applications.html#master-urls). В нашем случае в качестве режима будет указан параметр `local[2]`, означающий запуск с двумя потоками на локальной машине. В качестве режима может быть указан адрес главного узла. + +```scala +val cfg = new SparkConf() + .setAppName("Test").setMaster("local[2]") +``` + +*Примечание.* В Scala различаются два вида переменных: `val` и `var`. Переменные `val` являются неизменяемыми и инициализируются один раз, в отличие от `var`, которой можно присваивать новые значения несколько раз. + +Инициализируйте контекст Spark в главном методе. + +```scala +val sc = new SparkContext(cfg) +``` + +Добавьте в конец файла команду остановки контекста + +```scala +sc.stop() +``` + +После инициализации контекста вы можете обращаться к командам Spark. Считайте любой текстовый файл из локальной файловой системы и выведите его по строкам в консоль. + +*Примечание.* Путь к файлу в локальной файловой системе имеет определённый формат, имеющий префикс "file:///". https://tools.ietf.org/html/rfc8089 + +```scala +val textFile = sc.textFile("file:///c:/temp/file.txt") +textFile.foreach(println) +``` + +![](images/17_.png) + +*Примечание.* При работе без winutils.exe запись в файловую систему будет порождать ошибку. Известным решением является скачивание данного файла из проекта Hadoop в файловую систему в папку с названием bin и указанием переменной Spark `hadoop.home.dir`. В переменной `hadoop.home.dir` хранится путь к папке c Hadoop определённой версии. Установить переменную среды JVM вы можете кодом `System`.`setProperty(key, value)` . Другим решением проблемы является установка переменной среды `HADOOP_HOME` (потребуется перезапуск IDE). https://issues.apache.org/jira/browse/SPARK-2356. + +## Анализ данных велопарковок + +Тестовыми данными являются список поездок на велосипедах `trips.csv` и список велостоянок проката велосипедов `stations.csv`. + + ![](images/18_.png) + +Создайте по одному RDD на основе каждого файла `stations.csv`, `trips.csv`. Считайте данные в переменную, затем запомните заголовок. Объявите новую переменную с данными, в которых не будет заголовка, а строки преобразованы в массивы строк в соответствии с разделителем — запятая. + +*Примечание.* Существует более эффективный, но громоздкий способ исключить заголовок из данных с использованием метода mapPartitionWithIndex. Пример присутствует в первой части лабораторной работы в разделе нахождения десяти популярных номеров такси. + +```scala +val tripData = sc.textFile("file:///z:/data/trips.csv") + +// запомним заголовок, чтобы затем его исключить +val tripsHeader = tripData.first +val trips = tripData.filter(row=>row!=tripsHeader).map(row=>row.split(",",-1)) + +val stationData = sc.textFile("file:///z:/data/stations.csv") +val stationsHeader = stationData.first +val stations = stationData.filter(row=>row!=stationsHeader).map(row=>row.split(",",-1)) +``` + +*Примечание.* Использование в качестве второго параметра -1 в `row.split(",",-1)` позволяет не отбрасывать пустые строки. Например `"1,2,".split(",")` вернёт `Array("1","2")`, а `"1,2,".split(",",-1)` вернёт `Array("1","2","")`. + +Выведите заголовки таблиц и изучите колонки csv файлов. + +```scala +stationsHeader +tripsHeader +``` + +Выведите несколько элементов данных в trips и stations. + +*Примечание.* Убрать информационные строки логов из выдачи можно следующим образом: + +```scala +import org.apache.log4j.{Logger, Level} +Logger.getLogger("org.apache.spark").setLevel(Level.WARN) +Logger.getLogger("org.spark-project").setLevel(Level.WARN) +``` + +Объявите `stationsIndexed` так, чтобы результатом был список пар ключ-значение с целочисленным ключом из первой колонки. Таким образом вы создаёте индекс на основе первой колонки - номера велостоянки + +```scala +val stationsIndexed = stations.keyBy(row=>row(0).toInt) +``` + +*Примечание.* Обращение к массиву в Scala производится в круглых скобках. Например Array(1,2,3)(0) вернёт первый элемент. + +Выведите часть данных нового RDD. + +Аналогичное действие проделайте для индексирования коллекции trips по колонкам Start Terminal и End Terminal и сохраните результат в переменные, например tripsByStartTerminals и tripsByEndTerminals. + +Выполните операцию объединения коллекций по ключу с помощью функции join. Объедините stationsIndexed и tripsByStartTerminals, stationsIndexed и tripsByEndTerminals. + +```scala +val startTrips = stationsIndexed.join(tripsByStartTerminals) +val endTrips = stationsIndexed.join(tripsByEndTerminals) +``` + +Объявление последовательности трансформаций приводит к созданию ацикличного ориентированного графа. Вывести полученный граф можно для любого RDD. + +```scala +startTrips.toDebugString +endTrips.toDebugString +``` + +![](images/19_.png) + + +Выполните объявленные графы трансформаций вызовом команды count. + +```scala +startTrips.count() +endTrips.count() +``` + +Если вы знаете распределение ключей заранее, вы можете выбрать оптимальный способ хеширования ключей по разделам `Partition`. Например, если один ключ встречается на порядки чаще, чем другие ключи, то использование `HashPartitioner` будет не лучшим выбором, так как данные связанные с этим ключом будут собираться в одном разделе. Это приведёт к неравномерной нагрузке на вычислительные ресурсы. + +Выбрать определённую реализацию класса распределения по разделам можно с помощью функции RDD `partitionBy`. Например, для RDD `stationsIndexed` выбирается `HashPartitioner` с количеством разделов равным количеству разделов trips RDD. + +```scala +stationsIndexed.partitionBy(new HashPartitioner(trips.partitions.size)) +``` + +Также можно создавать свои классы для распределения ключей. Узнать какой класс назначен для текущего RDD можно обращением к полю partitioner. + +```scala +stationsIndexed.partitioner +``` + +## Создание модели данных + +Для более эффективной обработки и получения дополнительных возможностей мы можем объявить классы сущностей предметной области и преобразовать исходные строковые данные в объявленное представление. + +В Scala часто для объявления структур данных используется конструкция case class. Особенностью такого объявления класса являются: автоматическое создание методов доступа get для аргументов конструктора, автоматическое определение методов hashcode и equals, возможность case классов быть разобранными по шаблону (pattern matching). Например, для определения + +```scala +case class IntNumber(val value:Integer) +``` + +выполнение + +```scala +new IntNumber(4).value +``` + +вернёт значение 4. + +Объявите case классы для представления строк таблиц в соответствии с именами заголовков. + +```scala +case class Station( + stationId:Integer, + name:String, + lat:Double, + long:Double, + dockcount:Integer, + landmark:String, + installation:String, + notes:String) + +case class Trip( + tripId:Integer, + duration:Integer, + startDate:LocalDateTime, + startStation:String, + startTerminal:Integer, + endDate:LocalDateTime, + endStation:String, + endTerminal:Integer, + bikeId: Integer, + subscriptionType: String, + zipCode: String) +``` + +Для конвертации времени будем использовать пакет java.time. Краткое введение в работу с пакетом находится в Приложении Б. Объявим формат данных. + +```scala +val timeFormat = DateTimeFormatter.ofPattern("M/d/yyyy H:m") +``` + +Объявим trips с учётом преобразования во внутреннее представление. + +```scala +val tripsInternal = trips.mapPartitions(rows => { + val timeFormat = DateTimeFormatter.ofPattern("M/d/yyyy H:m") + rows.map( row => + new Trip(tripId=row(0).toInt, + duration=row(1).toInt, + startDate= LocalDate.parse(row(2), timeFormat), + startStation=row(3), + startTerminal=row(4).toInt, + endDate=LocalDate.parse(row(5), timeFormat), + endStation=row(6), + endTerminal=row(7).toInt, + bikeId=row(8).toInt, + subscriptionType=row(9), + zipCode=row(10)))}) +``` + +Изучите полученные данные. Например, вызовом следующих команд: + +```scala +tripsInternal.first +tripsInternal.first.startDate +``` + +*Примечание.* В связи с тем, что timeFormat содержит несериализуемый объект, его необходимо создавать на каждом узле для каждой партиции. + +То же можно проделать и для station RDD + +```scala +val stationsInternal = stations.map(row=> + new Station(stationId=row(0).toInt, + name=row(1), + lat=row(2).toDouble, + long=row(3).toDouble, + dockcount=row(4).toInt, + landmark=row(5), + installation=row(6) + notes=null)) +``` + +*Примечание.* Восьмая колонка не присутствует в таблице, так как в данных она пустая. Если в будущем она не будет использоваться, имеет смысл её убрать из описания case класса. + +*Примечание.* В данных присутствуют различные форматы времени. + +Посчитаем среднее время поездки, используя groupByKey. + +Для этого потребуется преобразовать trips RDD в RDD коллекцию пар ключ-значение аналогично тому, как мы совершали это ранее методом keyBy. + +```scala +val tripsByStartStation = tripsInternal.keyBy(record => record.startStation) +``` + +Рассчитаем среднее время поездки для каждого стартового парковочного места + +```scala +val avgDurationByStartStation = tripsByStartStation + .mapValues(x=>x.duration) + .groupByKey() + .mapValues(col=>col.reduce((a,b)=>a+b)/col.size) +``` + +Выведем первые 10 результатов + +```scala +avgDurationByStartStation.take(10).foreach(println) +``` + +Выполнение операции groupByKey приводит к интенсивным передачам данных. Если группировка делается для последующей редукции элементов лучше использовать трансформацию reduceByKey или aggregateByKey. Их выполнение приведёт сначала к локальной редукции над разделом Partition, а затем будет произведено окончательное суммирование над полученными частичными суммами. + +*Примечание.* Выполнение reduceByKey логически сходно с выполнением Combine и Reduce фазы MapReduce работы. + +Функция aggregateByKey является аналогом reduceByKey с возможностью указывать начальный элемент. + +Рассчитаем среднее значение с помощью aggregateByKey. Одновременно будут вычисляться два значения для каждого стартового терминала: сумма времён и количество поездок. + +```scala +val avgDurationByStartStation2 = tripsByStartStation + .mapValues(x=>x.duration) + .aggregateByKey((0,0))( + (acc, value) => (acc._1 + value, acc._2 + 1), + (acc1, acc2) => (acc1._1+acc2._1, acc1._2+acc2._2)) + .mapValues(acc=>acc._1/acc._2) +``` + +В первых скобках передаётся начальное значение. В нашем случае это пара нулей. Первая анонимная функция предназначена для прохода по коллекции раздела. На этом проходе значение элементов помещаются средой в переменную value, а переменная «аккумулятора» acc накапливает значения. Вторая анонимная функция предназначена для этапа редукции частично посчитанных локальных результатов. + +Сравните результаты `avgDurationByStartStation` и `avgDurationByStartStation2` и их время выполнения. + +Теперь найдём первую поездку для каждой велостоянки. Для решения опять потребуется группировка. Ещё одним недостатком `groupByKey` данных является то, что для группировки данные должны поместиться в оперативной памяти. Это может привести к ошибке `OutOfMemoryException` для больших объёмов данных. + +Сгруппируем поездки по велостоянкам и отсортируем поездки в группах по возрастанию даты. + +```scala +val firstGrouped = tripsByStartStation + .groupByKey() + .mapValues(x => + x.toList.sortWith((trip1, trip2) => + trip1.startDate.compareTo(trip2.startDate)<0)) +``` + +![](images/20_.png) + +Лучшим вариантом с точки зрения эффективности будет использование трансформации reduceByKey + +```scala +val firstGrouped = tripsByStartStation + .reduceByKey((trip1,trip2) => + if (trip1.startDate.compareTo(trip2.startDate)<0) + trip1 else trip2) +``` + +В данном случае «передаваться дальше» будет меньшее из значений ключа. + +![](images/21_.png) + +## Задачи: + +1. Найти велосипед с максимальным пробегом. +2. Найти наибольшее расстояние между станциями. +3. Найти путь велосипеда с максимальным пробегом через станции. +4. Найти количество велосипедов в системе. +5. Найти пользователей потративших на поездки более 3 часов. + +## Запуск проекта в кластере + +Для запуска собранного проекта на сервере используйте команду `spark-submit`. Однако прежде чем собрать проект, необходимо его изменить, так как в данный момент в коде зашиты пути к файлам с данными в локальной системе и режим запуска (`setMaster(“local[2]“)`). + +Параметризуйте эти значения аргументами передаваемыми в программу при запуске. + +```scala +val Seq(masterURL, tripDataPath, stationDataPath) = args.toSeq +val cfg = new SparkConfig().setAppName("Test").setMaster(masterURL) + +val tripData = sc.textFile(tripDataPath) +val stationData = sc.textFile(stationDataPath) +``` + +В конфигурации запуска добавьте значения аргументов: + + +Проверьте, что проект работает на локальном компьютере. + +Соберите JAR с помощью sbt команды package. Файл появится в директории `target/scala-2.12`. Скопируйте его на сервер с помощью `scp` и запустите. + +``` +$ spark-submit --deploy-mode cluster untitled4_2.11-0.1.jar yarn /labs/lab1/trips.csv /labs/lab1/stations.csv +``` + +Логи YARN контейнеров вы можете найти в директории `/mapr/tmp/studX/`. Проверьте, что выдача вашей программы на сервере идентична выдаче в IDE при запуске на локальном компьютере. + + +# Приложение А +## Краткое описание файловой системы HDFS + +HDFS — распределенная файловая система, используемая в проекте Hadoop. HDFS-кластер в первую очередь состоит из NameNоde-сервера и DataNode-серверов, которые хранят данные. NameNode-сервер управляет пространством имен файловой системы и доступом клиентов к данным. Чтобы разгрузить NameNode-сервер, передача данных осуществляется только между клиентом и DataNode-сервером. + + +Развёртывание экземпляра HDFS предусматривает наличие центрального узла имён (англ. name node), хранящего метаданные файловой системы и метаинформацию о распределении блоков, и серии узлов данных (англ. data node), непосредственно хранящих блоки файлов. Узел имён отвечает за обработку операций уровня файлов и каталогов — открытие и закрытие файлов, манипуляция с каталогами, узлы данных непосредственно отрабатывают операции по записи и чтению данных. Узел имён и узлы данных снабжаются веб-серверами, отображающими текущий статус узлов и позволяющими просматривать содержимое файловой системы. Административные функции доступны из интерфейса командной строки. + +# Приложение Б +## Основные понятия java.time + +Для представления времени в Java 8 рекомендуется использовать пакет `java.time`, реализующий стандарт JSR 310. Документация пакета `java.time` доступна по адресу https://docs.oracle.com/javase/8/docs/api/java/time/package-summary.html. + +Далее приводится работа с основными классами представления времени `java.time`. Для экспериментов удобно использовать REPL консоль. Если вы находитесь в среде разработки IDEA Scala консоль может быть запущена нажатием Ctrl+Shift+D, либо через контекстное меню Intellij IDEA. + +Примечание. REPL (от сокращения `read`, `eval`, `print`, `loop` - считать, выполнить, напечатать, повторять в цикле)  интерактивный цикл взаимодействия программной среды с пользователем. + +![](images/23_.png) +![](images/24_.png) + +Примечание. Консоль также можно запустить в командном окне операционной системы с помощью sbt console, находясь в папке с проектом. В обоих вариантах зависимости проекта подключаются автоматически так, что вы можете работать со сторонними библиотеками. + +В пакете java.time различаются представления времени: +- класс Instant — хранит числовую метку; +- класс LocalDate — хранит дату без времени; +- класс LocalTime — хранит время без даты; +- класс LocalDateTime — хранит время и дату; +- класс ZonedDateTime — хранит дату, время и часовой пояс. + +Узнайте в консоли текущее время, вызвав статический метод now() у каждого класса, изучите возвращаемое представление. Например, + +``` scala +import java.time._ +Instant.now() +``` + +Перед использованием классов объявите их импорт. Символ «_» импортирует все классы данного пакета. + +Enter используется для переноса строки. Для выполнения нажмите сочетание клавиш Ctrl+Enter. + +![](images/25_.png) + +Создайте примечательную вам дату с помощью статического конструктора of классов `LocalDate`, `LocalDateTime`, `ZonedDateTime`. Воспользуйтесь подсказками среды разработки или документацией для определения количества аргументов метода of и их значения. + + + +Изучите создание времён и дат с помощью метода parse данных классов. Используйте форматирование, которое выдавалось системой при возвращении значения в консоль. + +![](images/26_.png) + +Для задания пользовательского формата считывания вы можете использовать класс DateTimeFormatter. Описание класса и символов шаблона располагается по адресу https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html. +