From d947b561a2f71fdeeeee7d46f4996cb5e10d3cb3 Mon Sep 17 00:00:00 2001 From: Vladimir Protsenko Date: Fri, 3 Dec 2021 20:15:57 +0400 Subject: [PATCH] Added DataFrame/SQL examples. Introduced two errors in first two rows of trips dataset to practice with non-available values. --- ...bike_analysis_python_with_dataframes.ipynb | 1104 +++++++++++++++++ ...ctive_bike_analysis_python_with_rdd.ipynb} | 48 +- data/trips.csv | 4 +- 3 files changed, 1142 insertions(+), 14 deletions(-) create mode 100644 L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python_with_dataframes.ipynb rename L1 - Introduction to Apache Spark/{L1_interactive_bike_analysis_python.ipynb => L1_interactive_bike_analysis_python_with_rdd.ipynb} (90%) diff --git a/L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python_with_dataframes.ipynb b/L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python_with_dataframes.ipynb new file mode 100644 index 0000000..2d41ac9 --- /dev/null +++ b/L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python_with_dataframes.ipynb @@ -0,0 +1,1104 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "848f4d20-86d3-44e8-a864-aa09ca222e59", + "metadata": {}, + "source": [ + "# Лабораторная 1. Интерактивный анализ данных велопарковок SF Bay Area Bike Share в Apache Spark с использованием Spark SQL и DataFrame API\n", + "\n", + "## Описание данных\n", + "\n", + "https://www.kaggle.com/benhamner/sf-bay-area-bike-share\n", + "\n", + "stations.csv схема:\n", + "\n", + "```\n", + "id: station ID number\n", + "name: name of station\n", + "lat: latitude\n", + "long: longitude\n", + "dock_count: number of total docks at station\n", + "city: city (San Francisco, Redwood City, Palo Alto, Mountain View, San Jose)\n", + "installation_date: original date that station was installed. If station was moved, it is noted below.\n", + "```\n", + "\n", + "trips.csv схема:\n", + "\n", + "```\n", + "id: numeric ID of bike trip\n", + "duration: time of trip in seconds\n", + "start_date: start date of trip with date and time, in PST\n", + "start_station_name: station name of start station\n", + "start_station_id: numeric reference for start station\n", + "end_date: end date of trip with date and time, in PST\n", + "end_station_name: station name for end station\n", + "end_station_id: numeric reference for end station\n", + "bike_id: ID of bike used\n", + "subscription_type: Subscriber = annual or 30-day member; Customer = 24-hour or 3-day member\n", + "zip_code: Home zip code of subscriber (customers can choose to manually enter zip at kiosk however data is unreliable)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "6ee3eea7-9267-44e8-a321-a90139e1a00c", + "metadata": {}, + "source": [ + "https://spark.apache.org/docs/latest/sql-programming-guide.html" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "30002669-3799-4a39-831e-d276a4708f9a", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark import SparkContext, SparkConf \n", + "from pyspark.sql import SparkSession\n", + "import pyspark.sql as sql" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "3ed7b961-7879-4937-ac24-11d615e091b8", + "metadata": {}, + "outputs": [], + "source": [ + "conf = SparkConf().setAppName(\"L1_interactive_bike_analysis\").setMaster('yarn')" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "0da718e8-7ad8-42f1-872f-c8805fd3c41c", + "metadata": {}, + "outputs": [], + "source": [ + "sc = SparkContext(conf=conf)\n", + "spark = SparkSession(sc)" + ] + }, + { + "cell_type": "markdown", + "id": "03fe57b4-a7a6-48b8-8fee-e42c4491b5ad", + "metadata": {}, + "source": [ + "# Пример чтения csv файлов и работы с дефектными данными\n", + "\n", + "Список опций чтения и записи для CSV файлов https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option\n", + "\n", + "Формат паттерна временной метки Spark SQL отличается от python библиотеки datetime.\n", + "https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "id": "35951dcd-6d23-415d-84d0-6e351f9c7329", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[id: int, duration: int, start_date: timestamp, start_station_name: string, start_station_id: int, end_date: timestamp, end_station_name: string, end_station_id: int, bike_id: int, subscription_type: string, zip_code: string]" + ] + }, + "execution_count": 32, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "tripData = spark.read\\\n", + ".option(\"header\", True)\\\n", + ".option(\"inferSchema\", True)\\\n", + ".option(\"timestampFormat\", 'M/d/y H:m')\\\n", + ".csv(\"tripd_with_error.csv\")\n", + "\n", + "tripData" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "id": "79061437-08a1-4cd5-b840-0ac91c8f3adb", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- id: integer (nullable = true)\n", + " |-- duration: integer (nullable = true)\n", + " |-- start_date: timestamp (nullable = true)\n", + " |-- start_station_name: string (nullable = true)\n", + " |-- start_station_id: integer (nullable = true)\n", + " |-- end_date: timestamp (nullable = true)\n", + " |-- end_station_name: string (nullable = true)\n", + " |-- end_station_id: integer (nullable = true)\n", + " |-- bike_id: integer (nullable = true)\n", + " |-- subscription_type: string (nullable = true)\n", + " |-- zip_code: string (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "tripData.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "id": "3d9f3fce-7529-4aaa-9e39-9a50ab6d8a10", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+\n", + "| id|duration| start_date| start_station_name|start_station_id| end_date| end_station_name|end_station_id|bike_id|subscription_type|zip_code|\n", + "+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+\n", + "|4576| 63| null|South Van Ness at...| 66|2013-08-29 14:14:00|South Van Ness at...| 66| 520| Subscriber| 94127|\n", + "|4607| null|2013-08-29 14:42:00| San Jose City Hall| 10|2013-08-29 14:43:00| San Jose City Hall| 10| 661| Subscriber| 95138|\n", + "|4130| 71|2013-08-29 10:16:00|Mountain View Cit...| 27|2013-08-29 10:17:00|Mountain View Cit...| 27| 48| Subscriber| 97214|\n", + "|4251| 77|2013-08-29 11:29:00| San Jose City Hall| 10|2013-08-29 11:30:00| San Jose City Hall| 10| 26| Subscriber| 95060|\n", + "|4299| 83|2013-08-29 12:02:00|South Van Ness at...| 66|2013-08-29 12:04:00| Market at 10th| 67| 319| Subscriber| 94103|\n", + "+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+\n", + "only showing top 5 rows\n", + "\n" + ] + } + ], + "source": [ + "tripData.show(n=5)" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "5b0aa3c6-dbac-4827-983f-bde7555e08bb", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "\u001b[0;31mSignature:\u001b[0m \u001b[0mtripData\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdropna\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mhow\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m'any'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mthresh\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msubset\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mDocstring:\u001b[0m\n", + "Returns a new :class:`DataFrame` omitting rows with null values.\n", + ":func:`DataFrame.dropna` and :func:`DataFrameNaFunctions.drop` are aliases of each other.\n", + "\n", + ".. versionadded:: 1.3.1\n", + "\n", + "Parameters\n", + "----------\n", + "how : str, optional\n", + " 'any' or 'all'.\n", + " If 'any', drop a row if it contains any nulls.\n", + " If 'all', drop a row only if all its values are null.\n", + "thresh: int, optional\n", + " default None\n", + " If specified, drop rows that have less than `thresh` non-null values.\n", + " This overwrites the `how` parameter.\n", + "subset : str, tuple or list, optional\n", + " optional list of column names to consider.\n", + "\n", + "Examples\n", + "--------\n", + ">>> df4.na.drop().show()\n", + "+---+------+-----+\n", + "|age|height| name|\n", + "+---+------+-----+\n", + "| 10| 80|Alice|\n", + "+---+------+-----+\n", + "\u001b[0;31mFile:\u001b[0m ~/.local/lib/python3.9/site-packages/pyspark/sql/dataframe.py\n", + "\u001b[0;31mType:\u001b[0m method\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "? tripData.dropna" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "id": "b60a4a3b-1a69-4337-94b1-e91ac8a3b967", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+\n", + "| id|duration| start_date| start_station_name|start_station_id| end_date| end_station_name|end_station_id|bike_id|subscription_type|zip_code|\n", + "+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+\n", + "|4130| 71|2013-08-29 10:16:00|Mountain View Cit...| 27|2013-08-29 10:17:00|Mountain View Cit...| 27| 48| Subscriber| 97214|\n", + "|4251| 77|2013-08-29 11:29:00| San Jose City Hall| 10|2013-08-29 11:30:00| San Jose City Hall| 10| 26| Subscriber| 95060|\n", + "|4299| 83|2013-08-29 12:02:00|South Van Ness at...| 66|2013-08-29 12:04:00| Market at 10th| 67| 319| Subscriber| 94103|\n", + "|4927| 103|2013-08-29 18:54:00| Golden Gate at Polk| 59|2013-08-29 18:56:00| Golden Gate at Polk| 59| 527| Subscriber| 94109|\n", + "|4500| 109|2013-08-29 13:25:00|Santa Clara at Al...| 4|2013-08-29 13:27:00| Adobe on Almaden| 5| 679| Subscriber| 95112|\n", + "+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+\n", + "only showing top 5 rows\n", + "\n" + ] + } + ], + "source": [ + "tripData.dropna().show(n=5)" + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "id": "45266a31-4e69-4a50-b4b2-c966bb75dd39", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+------------------+------------------+--------------------+------------------+--------------------+------------------+------------------+-----------------+-------------------+\n", + "|summary| id| duration| start_station_name| start_station_id| end_station_name| end_station_id| bike_id|subscription_type| zip_code|\n", + "+-------+------------------+------------------+--------------------+------------------+--------------------+------------------+------------------+-----------------+-------------------+\n", + "| count| 985352| 985351| 985352| 985352| 985352| 985352| 985352| 985352| 978484|\n", + "| mean|521401.35102481145|1092.1337716204682| null|58.060558054380564| null|58.043319544690625|426.07050779822845| null| 2008421.845039644|\n", + "| stddev|245889.28553182338| 25689.80328664839| null|16.998928220336474| null| 17.10456379871572| 155.4614377036669| null|1.243190730418017E9|\n", + "| min| 4069| 60| 2nd at Folsom| 2| 2nd at Folsom| 2| 9| Customer| 0|\n", + "| max| 913460| 17270400|Yerba Buena Cente...| 84|Yerba Buena Cente...| 84| 878| Subscriber| v6z2x|\n", + "+-------+------------------+------------------+--------------------+------------------+--------------------+------------------+------------------+-----------------+-------------------+\n", + "\n" + ] + } + ], + "source": [ + "tripData.describe().show()" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "id": "6f486cc0-d591-43b8-8c92-a50415f3a141", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- id: integer (nullable = true)\n", + " |-- name: string (nullable = true)\n", + " |-- lat: double (nullable = true)\n", + " |-- long: double (nullable = true)\n", + " |-- dock_count: integer (nullable = true)\n", + " |-- city: string (nullable = true)\n", + " |-- installation_date: timestamp (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "stationData = spark.read\\\n", + ".option(\"header\", True)\\\n", + ".option(\"inferSchema\", True)\\\n", + ".option(\"timestampFormat\", 'M/d/y')\\\n", + ".csv(\"stations.csv\")\n", + "\n", + "stationData.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "id": "e831a0c6-0ced-43c2-80dc-2b7058859eda", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+--------------------+------------------+-------------------+----------+--------+-------------------+\n", + "| id| name| lat| long|dock_count| city| installation_date|\n", + "+---+--------------------+------------------+-------------------+----------+--------+-------------------+\n", + "| 2|San Jose Diridon ...| 37.329732|-121.90178200000001| 27|San Jose|2013-08-06 00:00:00|\n", + "| 3|San Jose Civic Ce...| 37.330698| -121.888979| 15|San Jose|2013-08-05 00:00:00|\n", + "| 4|Santa Clara at Al...| 37.333988| -121.894902| 11|San Jose|2013-08-06 00:00:00|\n", + "| 5| Adobe on Almaden| 37.331415| -121.8932| 19|San Jose|2013-08-05 00:00:00|\n", + "| 6| San Pedro Square|37.336721000000004| -121.894074| 15|San Jose|2013-08-07 00:00:00|\n", + "+---+--------------------+------------------+-------------------+----------+--------+-------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + } + ], + "source": [ + "stationData.show(n=5)" + ] + }, + { + "cell_type": "code", + "execution_count": 41, + "id": "c7633ce0-8860-4a5d-aa6a-41d89418b56f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+------------------+--------------------+-------------------+-------------------+-----------------+-------------+\n", + "|summary| id| name| lat| long| dock_count| city|\n", + "+-------+------------------+--------------------+-------------------+-------------------+-----------------+-------------+\n", + "| count| 70| 70| 70| 70| 70| 70|\n", + "| mean| 43.0| null| 37.59024338428572|-122.21841616428571|17.65714285714286| null|\n", + "| stddev|24.166091947189145| null|0.20347253639672502|0.20944604979644524|4.010441857493954| null|\n", + "| min| 2| 2nd at Folsom| 37.329732| -122.418954| 11|Mountain View|\n", + "| max| 84|Yerba Buena Cente...| 37.80477| -121.877349| 27| San Jose|\n", + "+-------+------------------+--------------------+-------------------+-------------------+-----------------+-------------+\n", + "\n" + ] + } + ], + "source": [ + "stationData.describe().show()" + ] + }, + { + "cell_type": "markdown", + "id": "5944fca6-54c9-4bce-a065-aafe2c84226c", + "metadata": {}, + "source": [ + "# Пример использования DataFrame API" + ] + }, + { + "cell_type": "markdown", + "id": "3bcffb6f-68b0-45bf-9359-f97e4ec281b1", + "metadata": {}, + "source": [ + "Выполните операцию объединения коллекций по ключу с помощью функции join. Объедините stationsIndexed и tripsByStartTerminals, stationsIndexed и tripsByEndTerminals." + ] + }, + { + "cell_type": "markdown", + "id": "22fe9167-4833-4086-af57-4f94ae0b343f", + "metadata": {}, + "source": [ + "https://spark.apache.org/docs/latest/sql-getting-started.html#untyped-dataset-operations-aka-dataframe-operations" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "id": "98a3fffd-c4f3-4047-a2c1-8ed651a2dbf0", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- id: integer (nullable = true)\n", + " |-- duration: integer (nullable = true)\n", + " |-- start_date: timestamp (nullable = true)\n", + " |-- start_station_name: string (nullable = true)\n", + " |-- start_station_id: integer (nullable = true)\n", + " |-- end_date: timestamp (nullable = true)\n", + " |-- end_station_name: string (nullable = true)\n", + " |-- end_station_id: integer (nullable = true)\n", + " |-- bike_id: integer (nullable = true)\n", + " |-- subscription_type: string (nullable = true)\n", + " |-- zip_code: string (nullable = true)\n", + "\n", + "root\n", + " |-- id: integer (nullable = true)\n", + " |-- name: string (nullable = true)\n", + " |-- lat: double (nullable = true)\n", + " |-- long: double (nullable = true)\n", + " |-- dock_count: integer (nullable = true)\n", + " |-- city: string (nullable = true)\n", + " |-- installation_date: timestamp (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "tripData.printSchema()\n", + "stationData.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": 45, + "id": "e89bb540-49c1-46a4-83f0-5cf280a02656", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+--------------------+------------------+-------------------+\n", + "| id| name| lat| long|\n", + "+---+--------------------+------------------+-------------------+\n", + "| 2|San Jose Diridon ...| 37.329732|-121.90178200000001|\n", + "| 3|San Jose Civic Ce...| 37.330698| -121.888979|\n", + "| 4|Santa Clara at Al...| 37.333988| -121.894902|\n", + "| 5| Adobe on Almaden| 37.331415| -121.8932|\n", + "| 6| San Pedro Square|37.336721000000004| -121.894074|\n", + "| 7|Paseo de San Antonio| 37.333798|-121.88694299999999|\n", + "| 8| San Salvador at 1st| 37.330165|-121.88583100000001|\n", + "| 9| Japantown| 37.348742|-121.89471499999999|\n", + "| 10| San Jose City Hall| 37.337391| -121.886995|\n", + "| 11| MLK Library| 37.335885|-121.88566000000002|\n", + "| 12|SJSU 4th at San C...| 37.332808|-121.88389099999999|\n", + "| 13| St James Park| 37.339301|-121.88993700000002|\n", + "| 14|Arena Green / SAP...| 37.332692| -121.900084|\n", + "| 16|SJSU - San Salvad...|37.333954999999996| -121.877349|\n", + "| 21| Franklin at Maple| 37.481758| -122.226904|\n", + "| 22|Redwood City Calt...|37.486078000000006|-122.23208899999999|\n", + "| 23|San Mateo County ...|37.487615999999996| -122.229951|\n", + "| 24|Redwood City Publ...| 37.484219| -122.227424|\n", + "| 25|Stanford in Redwo...| 37.48537|-122.20328799999999|\n", + "| 26|Redwood City Medi...| 37.487682| -122.223492|\n", + "+---+--------------------+------------------+-------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "stationsView = stationData.select(stationData['id'], stationData['name'], stationData['lat'], stationData['long'])\n", + "stationsView.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 56, + "id": "4d44dd69-b9f2-4c0e-afc1-c5eb556d7978", + "metadata": {}, + "outputs": [], + "source": [ + "startTrips = tripData.select(tripData.id, tripData.duration, tripData.start_station_id).withColumnRenamed('id', 'trip_id').join(stationsView, tripData.start_station_id == stationsView.id)\n", + "startTrips = startTrips.drop('id')" + ] + }, + { + "cell_type": "code", + "execution_count": 57, + "id": "be7c7bbf-4603-400f-a38c-bf75fe93410a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+--------+----------------+--------------------+------------------+-------------------+\n", + "|trip_id|duration|start_station_id| name| lat| long|\n", + "+-------+--------+----------------+--------------------+------------------+-------------------+\n", + "| 4576| 63| 66|South Van Ness at...| 37.774814| -122.418954|\n", + "| 4607| null| 10| San Jose City Hall| 37.337391| -121.886995|\n", + "| 4130| 71| 27|Mountain View Cit...| 37.389218| -122.081896|\n", + "| 4251| 77| 10| San Jose City Hall| 37.337391| -121.886995|\n", + "| 4299| 83| 66|South Van Ness at...| 37.774814| -122.418954|\n", + "| 4927| 103| 59| Golden Gate at Polk| 37.781332| -122.418603|\n", + "| 4500| 109| 4|Santa Clara at Al...| 37.333988| -121.894902|\n", + "| 4563| 111| 8| San Salvador at 1st| 37.330165|-121.88583100000001|\n", + "| 4760| 113| 66|South Van Ness at...| 37.774814| -122.418954|\n", + "| 4258| 114| 10| San Jose City Hall| 37.337391| -121.886995|\n", + "| 4549| 125| 49| Spear at Folsom|37.790302000000004|-122.39063700000001|\n", + "| 4498| 126| 6| San Pedro Square|37.336721000000004| -121.894074|\n", + "| 4965| 129| 28|Mountain View Cal...|37.394358000000004|-122.07671299999998|\n", + "| 4557| 130| 64| 2nd at South Park| 37.782259| -122.392738|\n", + "| 4386| 134| 41| Clay at Battery| 37.795001| -122.39997|\n", + "| 4749| 138| 47| Post at Kearney| 37.788975| -122.403452|\n", + "| 4242| 141| 10| San Jose City Hall| 37.337391| -121.886995|\n", + "| 4329| 142| 67| Market at 10th|37.776619000000004|-122.41738500000001|\n", + "| 5097| 142| 74| Steuart at Market| 37.794139| -122.394434|\n", + "| 5084| 144| 39| Powell Street BART|37.783871000000005| -122.408433|\n", + "+-------+--------+----------------+--------------------+------------------+-------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "startTrips.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 63, + "id": "562a2b9e-4e84-4a11-a33f-905d424ff990", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- id: integer (nullable = true)\n", + " |-- duration: integer (nullable = true)\n", + " |-- start_date: timestamp (nullable = true)\n", + " |-- start_station_name: string (nullable = true)\n", + " |-- start_station_id: integer (nullable = true)\n", + " |-- end_date: timestamp (nullable = true)\n", + " |-- end_station_name: string (nullable = true)\n", + " |-- end_station_id: integer (nullable = true)\n", + " |-- bike_id: integer (nullable = true)\n", + " |-- subscription_type: string (nullable = true)\n", + " |-- zip_code: string (nullable = true)\n", + "\n", + "root\n", + " |-- id: integer (nullable = true)\n", + " |-- name: string (nullable = true)\n", + " |-- lat: double (nullable = true)\n", + " |-- long: double (nullable = true)\n", + " |-- dock_count: integer (nullable = true)\n", + " |-- city: string (nullable = true)\n", + " |-- installation_date: timestamp (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "tripData.printSchema()\n", + "stationData.printSchema()" + ] + }, + { + "cell_type": "markdown", + "id": "c389458c-098c-4239-a069-ee33e7bd6e69", + "metadata": { + "tags": [] + }, + "source": [ + "# Пример использования Spark SQL API" + ] + }, + { + "cell_type": "markdown", + "id": "adb410d7-e319-4f78-b5d1-eafb594dea34", + "metadata": {}, + "source": [ + "https://spark.apache.org/docs/latest/sql-getting-started.html#running-sql-queries-programmatically" + ] + }, + { + "cell_type": "code", + "execution_count": 65, + "id": "33248614-5149-4c63-b222-060b5fec4b9d", + "metadata": {}, + "outputs": [], + "source": [ + "stationData.createOrReplaceTempView(\"stations\")\n", + "tripData.createOrReplaceTempView(\"trips\")" + ] + }, + { + "cell_type": "markdown", + "id": "14b88c1e-fdce-4e5e-92db-8ee974c77c4c", + "metadata": {}, + "source": [ + "https://spark.apache.org/docs/latest/sql-ref.html" + ] + }, + { + "cell_type": "code", + "execution_count": 74, + "id": "e54a726b-8e00-4a69-82b7-c7eee3f4ac68", + "metadata": {}, + "outputs": [], + "source": [ + "endTrips = spark.sql(\"\"\"\n", + "SELECT trips.id as trip_id, trips.end_station_id, trips.duration, stations.name as station_name, stations.lat, stations.long \n", + "FROM trips INNER JOIN stations \n", + " ON trips.end_station_id==stations.id\n", + "\"\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 75, + "id": "9f756e1d-0ac3-41a0-b175-d2bcfd93a069", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+--------------+--------+--------------------+------------------+-------------------+\n", + "|trip_id|end_station_id|duration| station_name| lat| long|\n", + "+-------+--------------+--------+--------------------+------------------+-------------------+\n", + "| 4576| 66| 63|South Van Ness at...| 37.774814| -122.418954|\n", + "| 4607| 10| null| San Jose City Hall| 37.337391| -121.886995|\n", + "| 4130| 27| 71|Mountain View Cit...| 37.389218| -122.081896|\n", + "| 4251| 10| 77| San Jose City Hall| 37.337391| -121.886995|\n", + "| 4299| 67| 83| Market at 10th|37.776619000000004|-122.41738500000001|\n", + "| 4927| 59| 103| Golden Gate at Polk| 37.781332| -122.418603|\n", + "| 4500| 5| 109| Adobe on Almaden| 37.331415| -121.8932|\n", + "| 4563| 8| 111| San Salvador at 1st| 37.330165|-121.88583100000001|\n", + "| 4760| 66| 113|South Van Ness at...| 37.774814| -122.418954|\n", + "| 4258| 11| 114| MLK Library| 37.335885|-121.88566000000002|\n", + "| 4549| 54| 125|Embarcadero at Br...| 37.787152|-122.38801299999999|\n", + "| 4498| 4| 126|Santa Clara at Al...| 37.333988| -121.894902|\n", + "| 4965| 28| 129|Mountain View Cal...|37.394358000000004|-122.07671299999998|\n", + "| 4557| 64| 130| 2nd at South Park| 37.782259| -122.392738|\n", + "| 4386| 56| 134| Beale at Market| 37.792251|-122.39708600000002|\n", + "| 4749| 47| 138| Post at Kearney| 37.788975| -122.403452|\n", + "| 4242| 10| 141| San Jose City Hall| 37.337391| -121.886995|\n", + "| 4329| 67| 142| Market at 10th|37.776619000000004|-122.41738500000001|\n", + "| 5097| 50| 142|Harry Bridges Pla...| 37.795392| -122.394203|\n", + "| 5084| 76| 144| Market at 4th| 37.786305|-122.40496599999999|\n", + "+-------+--------------+--------+--------------------+------------------+-------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "endTrips.show()" + ] + }, + { + "cell_type": "markdown", + "id": "111e1fba-9ffe-4a55-81d7-021041f09865", + "metadata": {}, + "source": [ + "Для каждой стартовой станции найдем среднее время поездки. " + ] + }, + { + "cell_type": "markdown", + "id": "efcfc45a-465f-4e58-a55a-98069804d13f", + "metadata": {}, + "source": [ + "Рассчитаем среднее время поездки для каждого стартового парковочного места" + ] + }, + { + "cell_type": "code", + "execution_count": 225, + "id": "106a733b-540b-4830-a781-b7bd91fe8785", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------------+------------------+\n", + "| start_station_name| avg(duration)|\n", + "+--------------------+------------------+\n", + "|University and Em...| 7230.231034482758|\n", + "|Redwood City Medi...| 4764.68287037037|\n", + "|San Jose Civic Ce...| 4720.621422376409|\n", + "| Park at Olive| 4686.397612488521|\n", + "|California Ave Ca...| 4502.620639534884|\n", + "|Redwood City Publ...|3697.0892307692307|\n", + "|Rengstorff Avenue...| 3544.797270955166|\n", + "|Palo Alto Caltrai...| 3158.331498866947|\n", + "|San Mateo County ...|3002.0827067669175|\n", + "|South Van Ness at...|2936.8873503613395|\n", + "|San Antonio Shopp...| 2508.434736091298|\n", + "|Cowper at University| 2493.220572640509|\n", + "| Broadway at Main|2481.2537313432836|\n", + "|Redwood City Calt...| 2405.29409190372|\n", + "| Japantown|2297.0913838120105|\n", + "|San Antonio Caltr...|2103.7238932071646|\n", + "|SJSU 4th at San C...| 1995.366021236727|\n", + "|Washington at Kea...|1979.3077445652175|\n", + "| Mezes Park|1918.1354359925788|\n", + "|Arena Green / SAP...|1888.3390476190475|\n", + "+--------------------+------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"\"\"\n", + "SELECT start_station_name, avg(duration)\n", + "FROM trips\n", + "GROUP BY trips.start_station_name\n", + "ORDER BY avg(duration) DESC\n", + "\"\"\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "0fa268d7-ddd4-42f1-9688-ecaf60019143", + "metadata": {}, + "source": [ + "# Пример подготовки данных c Spark SQL, pandas, h3 для их визуализации на карте folium" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b302c7d6-f4f8-4c60-9bdc-bf815e8615f0", + "metadata": {}, + "outputs": [], + "source": [ + "# ! pip install h3 h3_pyspark pandas folium" + ] + }, + { + "cell_type": "markdown", + "id": "4c826bda-4237-4b13-b4e3-34be3eb43c78", + "metadata": {}, + "source": [ + "Найдём велосипеды, которые ездили в рождество 2014 года.\n", + "https://spark.apache.org/docs/latest/api/sql/#make_timestamp" + ] + }, + { + "cell_type": "code", + "execution_count": 86, + "id": "3551e682-6db3-4cd4-95c1-76f0d45fc5ba", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+-------------------+-------------------+\n", + "|bike_id| start_date| end_date|\n", + "+-------+-------------------+-------------------+\n", + "| 379|2014-12-25 22:10:00|2014-12-25 22:18:00|\n", + "| 709|2014-12-25 21:21:00|2014-12-25 21:27:00|\n", + "| 376|2014-12-25 20:40:00|2014-12-25 20:46:00|\n", + "| 541|2014-12-25 20:27:00|2014-12-25 20:32:00|\n", + "| 283|2014-12-25 19:56:00|2014-12-25 20:01:00|\n", + "| 519|2014-12-25 19:56:00|2014-12-25 20:01:00|\n", + "| 583|2014-12-25 19:05:00|2014-12-25 19:07:00|\n", + "| 495|2014-12-25 18:42:00|2014-12-25 18:44:00|\n", + "| 541|2014-12-25 18:28:00|2014-12-25 18:37:00|\n", + "| 585|2014-12-25 18:27:00|2014-12-25 18:37:00|\n", + "| 574|2014-12-25 18:12:00|2014-12-25 18:21:00|\n", + "| 630|2014-12-25 18:12:00|2014-12-25 18:22:00|\n", + "| 583|2014-12-25 18:05:00|2014-12-25 18:22:00|\n", + "| 290|2014-12-25 18:01:00|2014-12-25 18:15:00|\n", + "| 451|2014-12-25 17:55:00|2014-12-25 18:04:00|\n", + "| 630|2014-12-25 17:55:00|2014-12-25 17:59:00|\n", + "| 574|2014-12-25 17:54:00|2014-12-25 17:59:00|\n", + "| 463|2014-12-25 17:46:00|2014-12-25 17:53:00|\n", + "| 628|2014-12-25 17:46:00|2014-12-25 17:53:00|\n", + "| 58|2014-12-25 16:32:00|2014-12-25 17:04:00|\n", + "+-------+-------------------+-------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "# year - the year to represent, from 1 to 9999 \n", + "# month - the month-of-year to represent, from 1 (January) to 12 (December) \n", + "# day - the day-of-month to represent, from 1 to 31 \n", + "# hour - the hour-of-day to represent, from 0 to 23 \n", + "# min - the minute-of-hour to represent, from 0 to 59 \n", + "# sec - the second-of-minute and its micro-fraction to represent, from 0 to 60. The value can be either an integer like 13 , or a fraction like 13.123. If the sec argument equals to 60, the seconds field is set to 0 and 1 minute is added to the final timestamp. \n", + "# timezone - the time zone identifier. For example, CET, UTC and etc.\n", + "\n", + "spark.sql(\"\"\"\n", + "SELECT bike_id, start_date, end_date \n", + "FROM trips \n", + "WHERE \n", + " start_date > make_timestamp(2014, 12, 25, 0, 0, 0) \n", + " AND start_date < make_timestamp(2014, 12, 26, 0, 0, 0)\n", + "\"\"\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "9a8f64a4-2a00-497e-b0a5-f423af29c077", + "metadata": {}, + "source": [ + "Найдём станции через которые проехал один из велосипедов, найденных ранее." + ] + }, + { + "cell_type": "code", + "execution_count": 93, + "id": "ac997bad-296b-4f4a-9a1b-fed9d78e3fba", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+-------------------+-------------------+--------------+\n", + "|bike_id| start_date| end_date| name|\n", + "+-------+-------------------+-------------------+--------------+\n", + "| 583|2014-12-25 19:05:00|2014-12-25 19:07:00|Market at 10th|\n", + "| 583|2014-12-25 18:05:00|2014-12-25 18:22:00|Market at 10th|\n", + "| 583|2014-12-25 12:14:00|2014-12-25 12:21:00| Market at 4th|\n", + "| 583|2014-12-25 19:05:00|2014-12-25 19:07:00|Market at 10th|\n", + "| 583|2014-12-25 18:05:00|2014-12-25 18:22:00|Market at 10th|\n", + "| 583|2014-12-25 12:14:00|2014-12-25 12:21:00| Market at 4th|\n", + "+-------+-------------------+-------------------+--------------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"\"\"\n", + "SELECT trips.bike_id, trips.start_date, trips.end_date, stations.name\n", + "FROM trips INNER JOIN stations\n", + " ON trips.start_station_id == stations.id\n", + "WHERE \n", + " bike_id == 583 \n", + " AND start_date > make_timestamp(2014, 12, 25, 0, 0, 0) \n", + " AND start_date < make_timestamp(2014, 12, 26, 0, 0, 0)\n", + "\"\"\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "35e6fdec-4552-417a-9a79-5ac3c1604ac6", + "metadata": {}, + "source": [ + "Найдём все станции, которые попали в ту же клетку h3 координатной сетки что и станции, через которые проехал велосипед 583 25.12.2014." + ] + }, + { + "cell_type": "markdown", + "id": "8e554596-7c5f-4783-ab6d-5e7669e61ba3", + "metadata": {}, + "source": [ + "Отобразим gps координаты станций в координаты h3." + ] + }, + { + "cell_type": "code", + "execution_count": 96, + "id": "b3d7bd21-ab5f-4119-8484-d816725cec3b", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import functions as F\n", + "import h3_pyspark\n", + "import h3" + ] + }, + { + "cell_type": "markdown", + "id": "996cd067-d19b-42ad-b0b6-4431ebed5674", + "metadata": {}, + "source": [ + "H3 Grid Resolutions https://h3geo.org/docs/core-library/restable/" + ] + }, + { + "cell_type": "code", + "execution_count": 205, + "id": "cef28792-2d71-49ef-b390-6d6e9ce7320d", + "metadata": {}, + "outputs": [], + "source": [ + "resolution = 8\n", + "stationData.withColumn('h3', h3_pyspark.geo_to_h3('lat', 'long', sql.functions.lit(resolution))).createOrReplaceTempView(\"stations_h3\")" + ] + }, + { + "cell_type": "markdown", + "id": "2019f74d-afa5-44db-b103-bf3e298eaa85", + "metadata": {}, + "source": [ + "Используя вложенный sql запрос найдём h3 координаты станций, через который проехал велосипед 583. А затем отфильтруем поездки в рождество 2014 года, которые стартовали со станций с теми же h3 координатами, что мы нашли." + ] + }, + { + "cell_type": "code", + "execution_count": 166, + "id": "5f8a0d22-3f90-453f-9a00-e9a7829d33ca", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+-------------------+---------------+------------------+-------------------+--------------+\n", + "|bike_id| start_date| h3| lat| long| name|\n", + "+-------+-------------------+---------------+------------------+-------------------+--------------+\n", + "| 439|2014-12-25 01:40:00|8828308281fffff|37.776619000000004|-122.41738500000001|Market at 10th|\n", + "| 439|2014-12-25 01:40:00|8828308281fffff|37.776619000000004|-122.41738500000001|Market at 10th|\n", + "| 659|2014-12-25 09:49:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 465|2014-12-25 09:49:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 659|2014-12-25 09:49:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 465|2014-12-25 09:49:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 583|2014-12-25 12:14:00|88283082abfffff| 37.786305|-122.40496599999999| Market at 4th|\n", + "| 583|2014-12-25 12:14:00|88283082abfffff| 37.786305|-122.40496599999999| Market at 4th|\n", + "| 479|2014-12-25 12:22:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 331|2014-12-25 12:22:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 479|2014-12-25 12:22:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 331|2014-12-25 12:22:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 330|2014-12-25 12:27:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 438|2014-12-25 12:27:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 330|2014-12-25 12:27:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 438|2014-12-25 12:27:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 428|2014-12-25 12:35:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 428|2014-12-25 12:35:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 214|2014-12-25 12:38:00|88283082abfffff|37.781752000000004|-122.40512700000001| 5th at Howard|\n", + "| 292|2014-12-25 12:38:00|88283082abfffff| 37.786305|-122.40496599999999| Market at 4th|\n", + "+-------+-------------------+---------------+------------------+-------------------+--------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "christmas_583_contacts = spark.sql(\"\"\"\n", + "SELECT trips.bike_id, trips.start_date, stations_h3.h3, stations_h3.lat, stations_h3.long, stations_h3.name\n", + "FROM trips INNER JOIN stations_h3\n", + " ON trips.start_station_id == stations_h3.id\n", + "WHERE \n", + " stations_h3.h3 in (SELECT stations_h3.h3\n", + " FROM trips INNER JOIN stations_h3\n", + " ON trips.start_station_id == stations_h3.id\n", + " WHERE \n", + " bike_id == 583 \n", + " AND start_date > make_timestamp(2014, 12, 25, 0, 0, 0) \n", + " AND start_date < make_timestamp(2014, 12, 26, 0, 0, 0))\n", + " AND start_date > make_timestamp(2014, 12, 25, 0, 0, 0)\n", + " AND start_date < make_timestamp(2014, 12, 26, 0, 0, 0)\n", + "ORDER BY trips.start_date\n", + "\"\"\")\n", + "christmas_583_contacts.cache()\n", + "christmas_583_contacts.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 167, + "id": "772045d9-39ef-47ec-8797-92e92f629a1f", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import h3\n", + "\n", + "h3_places = christmas_583_contacts.select('lat','long', 'name', 'h3').toPandas()" + ] + }, + { + "cell_type": "code", + "execution_count": 215, + "id": "a5d599fd-9e74-4d08-89b0-3a67f1603377", + "metadata": {}, + "outputs": [], + "source": [ + "# source code from https://nbviewer.org/github/uber/h3-py-notebooks/blob/master/notebooks/usage.ipynb\n", + "import folium \n", + "\n", + "def init_map(hexagons, width=1100, height=900):\n", + " lats = []\n", + " longs = []\n", + " for hexagon in hexagons:\n", + " lat, long = h3.h3_to_geo(hexagon)\n", + " lats.append(lat)\n", + " longs.append(long)\n", + " return folium.Map(location=[sum(lats)/len(lats), sum(longs)/len(longs)], zoom_start=15, tiles='cartodbpositron', width=width, height=height)\n", + "\n", + "def visualize_hexagons(folium_map, hexagons, color=\"red\"):\n", + " \"\"\"\n", + " hexagons is a list of hexcluster. Each hexcluster is a list of hexagons. \n", + " eg. [[hex1, hex2], [hex3, hex4]]\n", + " \"\"\"\n", + " polylines = []\n", + " lat = []\n", + " lng = []\n", + " for hex in hexagons:\n", + " polygons = h3.h3_set_to_multi_polygon([hex], geo_json=False)\n", + " # flatten polygons into loops.\n", + " outlines = [loop for polygon in polygons for loop in polygon]\n", + " polyline = [outline + [outline[0]] for outline in outlines][0]\n", + " lat.extend(map(lambda v:v[0], polyline))\n", + " lng.extend(map(lambda v:v[1], polyline))\n", + " polylines.append(polyline)\n", + " \n", + " for polyline in polylines:\n", + " my_PolyLine = folium.PolyLine(locations=polyline, weight=8, color=color)\n", + " folium_map.add_child(my_PolyLine)\n", + " \n", + " return folium_map\n", + "\n", + "def visualize_stations(folium_map, stations, color=\"red\"):\n", + " \"\"\"\n", + " stations is a dataframe with columns: lat, long, station_name\n", + " \"\"\"\n", + " for idx, lat, long, station_name in stations.itertuples():\n", + " folium_map.add_child(folium.map.Marker(location=(lat, long)))\n", + " folium_map.add_child(folium.map.Marker(location=(lat, long), \n", + " icon=folium.features.DivIcon(\n", + " icon_size=(500,36),\n", + " icon_anchor=(-17,37),\n", + " html=f'
{station_name}
',\n", + " )))\n", + " return folium_map" + ] + }, + { + "cell_type": "code", + "execution_count": 217, + "id": "aa0de1f4-6070-4f29-9989-a36b36146c61", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
Make this Notebook Trusted to load map: File -> Trust Notebook
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "m = init_map(h3_places.h3.unique())\n", + "visualize_hexagons(m, h3_places.h3.unique(), color=\"black\")\n", + "visualize_stations(m, h3_places.loc[:, ['lat', 'long', 'name']])\n", + "display(m)" + ] + }, + { + "cell_type": "code", + "execution_count": 152, + "id": "c98261f7-283c-4c7e-b915-3778ff972f5f", + "metadata": {}, + "outputs": [], + "source": [ + "sc.stop()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python.ipynb b/L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python_with_rdd.ipynb similarity index 90% rename from L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python.ipynb rename to L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python_with_rdd.ipynb index 21b40df..0e98192 100644 --- a/L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python.ipynb +++ b/L1 - Introduction to Apache Spark/L1_interactive_bike_analysis_python_with_rdd.ipynb @@ -42,7 +42,7 @@ }, { "cell_type": "code", - "execution_count": 124, + "execution_count": 128, "id": "30002669-3799-4a39-831e-d276a4708f9a", "metadata": {}, "outputs": [], @@ -52,7 +52,7 @@ }, { "cell_type": "code", - "execution_count": 125, + "execution_count": 130, "id": "3ed7b961-7879-4937-ac24-11d615e091b8", "metadata": {}, "outputs": [], @@ -62,7 +62,7 @@ }, { "cell_type": "code", - "execution_count": 126, + "execution_count": 131, "id": "0da718e8-7ad8-42f1-872f-c8805fd3c41c", "metadata": {}, "outputs": [], @@ -70,6 +70,18 @@ "sc = SparkContext(conf=conf)" ] }, + { + "cell_type": "code", + "execution_count": 133, + "id": "47896fd0-906e-413d-8980-3948d73a3067", + "metadata": {}, + "outputs": [], + "source": [ + "tripData = sc.textFile(\"tripd_with_error.csv\")\n", + "tripsHeader = tripData.first()\n", + "trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(\",\", -1))" + ] + }, { "cell_type": "code", "execution_count": 10, @@ -492,7 +504,7 @@ }, { "cell_type": "code", - "execution_count": 62, + "execution_count": 134, "id": "84aa084c-8e7a-427b-9bd9-1b7e106c1ea2", "metadata": {}, "outputs": [], @@ -554,7 +566,7 @@ }, { "cell_type": "code", - "execution_count": 80, + "execution_count": 150, "id": "9f728f0d-86ee-4d9e-9292-31ca3c1fae35", "metadata": {}, "outputs": [], @@ -574,7 +586,8 @@ " zip_code: str\n", " \n", " for trip in trips:\n", - " yield Trip( \n", + " try:\n", + " yield Trip( \n", " trip_id = int(trip[0]),\n", " duration = int(trip[1]),\n", " start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),\n", @@ -586,29 +599,40 @@ " bike_id = int(trip[8]),\n", " subscription_type = trip[9],\n", " zip_code = trip[10]\n", - " )" + " ) \n", + " except:\n", + " pass" ] }, { "cell_type": "code", - "execution_count": 81, + "execution_count": 151, "id": "c265426b-f20b-426e-aeba-7295bc797f3b", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')" + "[Trip(trip_id=4607, duration=70, start_date=datetime.datetime(2013, 8, 29, 14, 42), start_station_name='San Jose City Hall', start_station_id=10, end_date=datetime.datetime(2013, 8, 29, 14, 43), end_station_name='San Jose City Hall', end_station_id='10', bike_id=661, subscription_type='Subscriber', zip_code='95138'),\n", + " Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214'),\n", + " Trip(trip_id=4251, duration=77, start_date=datetime.datetime(2013, 8, 29, 11, 29), start_station_name='San Jose City Hall', start_station_id=10, end_date=datetime.datetime(2013, 8, 29, 11, 30), end_station_name='San Jose City Hall', end_station_id='10', bike_id=26, subscription_type='Subscriber', zip_code='95060'),\n", + " Trip(trip_id=4299, duration=83, start_date=datetime.datetime(2013, 8, 29, 12, 2), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 12, 4), end_station_name='Market at 10th', end_station_id='67', bike_id=319, subscription_type='Subscriber', zip_code='94103'),\n", + " Trip(trip_id=4927, duration=103, start_date=datetime.datetime(2013, 8, 29, 18, 54), start_station_name='Golden Gate at Polk', start_station_id=59, end_date=datetime.datetime(2013, 8, 29, 18, 56), end_station_name='Golden Gate at Polk', end_station_id='59', bike_id=527, subscription_type='Subscriber', zip_code='94109'),\n", + " Trip(trip_id=4500, duration=109, start_date=datetime.datetime(2013, 8, 29, 13, 25), start_station_name='Santa Clara at Almaden', start_station_id=4, end_date=datetime.datetime(2013, 8, 29, 13, 27), end_station_name='Adobe on Almaden', end_station_id='5', bike_id=679, subscription_type='Subscriber', zip_code='95112'),\n", + " Trip(trip_id=4563, duration=111, start_date=datetime.datetime(2013, 8, 29, 14, 2), start_station_name='San Salvador at 1st', start_station_id=8, end_date=datetime.datetime(2013, 8, 29, 14, 4), end_station_name='San Salvador at 1st', end_station_id='8', bike_id=687, subscription_type='Subscriber', zip_code='95112'),\n", + " Trip(trip_id=4760, duration=113, start_date=datetime.datetime(2013, 8, 29, 17, 1), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 17, 3), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=553, subscription_type='Subscriber', zip_code='94103'),\n", + " Trip(trip_id=4258, duration=114, start_date=datetime.datetime(2013, 8, 29, 11, 33), start_station_name='San Jose City Hall', start_station_id=10, end_date=datetime.datetime(2013, 8, 29, 11, 35), end_station_name='MLK Library', end_station_id='11', bike_id=107, subscription_type='Subscriber', zip_code='95060'),\n", + " Trip(trip_id=4549, duration=125, start_date=datetime.datetime(2013, 8, 29, 13, 52), start_station_name='Spear at Folsom', start_station_id=49, end_date=datetime.datetime(2013, 8, 29, 13, 55), end_station_name='Embarcadero at Bryant', end_station_id='54', bike_id=368, subscription_type='Subscriber', zip_code='94109')]" ] }, - "execution_count": 81, + "execution_count": 151, "metadata": {}, "output_type": "execute_result" } ], "source": [ "tripsInternal = trips.mapPartitions(initTrip)\n", - "tripsInternal.first()" + "tripsInternal.take(10)" ] }, { @@ -969,7 +993,7 @@ }, { "cell_type": "code", - "execution_count": 127, + "execution_count": 152, "id": "c98261f7-283c-4c7e-b915-3778ff972f5f", "metadata": {}, "outputs": [], diff --git a/data/trips.csv b/data/trips.csv index 7656882..19b2dd7 100644 --- a/data/trips.csv +++ b/data/trips.csv @@ -1,6 +1,6 @@ id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code -4576,63,8/29/2013 14:13,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber,94127 -4607,70,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber,95138 +4576,63,,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber,94127 +4607,,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber,95138 4130,71,8/29/2013 10:16,Mountain View City Hall,27,8/29/2013 10:17,Mountain View City Hall,27,48,Subscriber,97214 4251,77,8/29/2013 11:29,San Jose City Hall,10,8/29/2013 11:30,San Jose City Hall,10,26,Subscriber,95060 4299,83,8/29/2013 12:02,South Van Ness at Market,66,8/29/2013 12:04,Market at 10th,67,319,Subscriber,94103