diff --git a/L0 - Introduction to MapReduce data processing model/MapReduceExamples.ipynb b/L0 - Introduction to MapReduce data processing model/MapReduceExamples.ipynb new file mode 100644 index 0000000..439f033 --- /dev/null +++ b/L0 - Introduction to MapReduce data processing model/MapReduceExamples.ipynb @@ -0,0 +1,1485 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "colab": { + "name": "MapReduceExamples.ipynb", + "provenance": [], + "collapsed_sections": [] + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + } + }, + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "82OvPKEiEqjc" + }, + "source": [ + "# Введение в MapReduce модель на Python\n" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "JQ2cvXLjICmI" + }, + "source": [ + "from typing import NamedTuple # requires python 3.6+\n", + "from typing import Iterator" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "yjPHumVwEyEg" + }, + "source": [ + "def MAP(_, row:NamedTuple):\n", + " if (row.gender == 'female'):\n", + " yield (row.age, row)\n", + " \n", + "def REDUCE(age:str, rows:Iterator[NamedTuple]):\n", + " sum = 0\n", + " count = 0\n", + " for row in rows:\n", + " sum += row.social_contacts\n", + " count += 1\n", + " if (count > 0):\n", + " yield (age, sum/count)\n", + " else:\n", + " yield (age, 0)" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "vBKMgpG_ilaZ" + }, + "source": [ + "Модель элемента данных" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "Rv-XIjhTJPx3" + }, + "source": [ + "class User(NamedTuple):\n", + " id: int\n", + " age: str\n", + " social_contacts: int\n", + " gender: str" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "5KV0Ze2vQgu5" + }, + "source": [ + "input_collection = [\n", + " User(id=0, age=55, gender='male', social_contacts=20),\n", + " User(id=1, age=25, gender='female', social_contacts=240),\n", + " User(id=2, age=25, gender='female', social_contacts=500),\n", + " User(id=3, age=33, gender='female', social_contacts=800)\n", + "]" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "YFeqzyZxZIFZ" + }, + "source": [ + "Функция RECORDREADER моделирует чтение элементов с диска или по сети." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "S5HR4E_GQoMJ" + }, + "source": [ + "def RECORDREADER():\n", + " return [(u.id, u) for u in input_collection]" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "NeEoWla-ROUy", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 85 + }, + "outputId": "94ca6e0e-4644-4282-acbf-1759d7ba2918" + }, + "source": [ + "list(RECORDREADER())" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "[(0, User(id=0, age=55, social_contacts=20, gender='male')),\n", + " (1, User(id=1, age=25, social_contacts=240, gender='female')),\n", + " (2, User(id=2, age=25, social_contacts=500, gender='female')),\n", + " (3, User(id=3, age=33, social_contacts=800, gender='female'))]" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 20 + } + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "YB8orgPSZs8M" + }, + "source": [ + "def flatten(nested_iterable):\n", + " for iterable in nested_iterable:\n", + " for element in iterable:\n", + " yield element" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "74oyvDLaRmd5", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 68 + }, + "outputId": "c6147702-7153-47c7-a574-d5fe6abe29a8" + }, + "source": [ + "map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))\n", + "map_output = list(map_output) # materialize\n", + "map_output" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "[(25, User(id=1, age=25, social_contacts=240, gender='female')),\n", + " (25, User(id=2, age=25, social_contacts=500, gender='female')),\n", + " (33, User(id=3, age=33, social_contacts=800, gender='female'))]" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 22 + } + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "8ncYDJ3-VzDn" + }, + "source": [ + "def groupbykey(iterable):\n", + " t = {}\n", + " for (k2, v2) in iterable:\n", + " t[k2] = t.get(k2, []) + [v2]\n", + " return t.items()" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "metadata": { + "id": "cKzY_6COWOA2", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 85 + }, + "outputId": "e6791b12-e409-47e9-bcd4-e9f8ca8611bd" + }, + "source": [ + "shuffle_output = groupbykey(map_output)\n", + "shuffle_output = list(shuffle_output)\n", + "shuffle_output" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "[(25,\n", + " [User(id=1, age=25, social_contacts=240, gender='female'),\n", + " User(id=2, age=25, social_contacts=500, gender='female')]),\n", + " (33, [User(id=3, age=33, social_contacts=800, gender='female')])]" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 24 + } + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "NlA7lkDDYL0t", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 34 + }, + "outputId": "6b25d03f-5c92-4f3b-f500-6d70acd598b7" + }, + "source": [ + "reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))\n", + "reduce_output = list(reduce_output)\n", + "reduce_output" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "[(25, 370.0), (33, 800.0)]" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 25 + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "xf6qhHEtd6bI" + }, + "source": [ + "Все действия одним конвейером!" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "dZaQGYxCdpw5", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 34 + }, + "outputId": "3f5c6425-e5c5-49d2-b2cd-ce58a9acc33c" + }, + "source": [ + "list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "[(25, 370.0), (33, 800.0)]" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 26 + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Vq3EWRIpwSiJ" + }, + "source": [ + "# **MapReduce**\n", + "Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных. \n", + "\n", + "Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "V1PZeQMwwVjc" + }, + "source": [ + "def flatten(nested_iterable):\n", + " for iterable in nested_iterable:\n", + " for element in iterable:\n", + " yield element\n", + "\n", + "def groupbykey(iterable):\n", + " t = {}\n", + " for (k2, v2) in iterable:\n", + " t[k2] = t.get(k2, []) + [v2]\n", + " return t.items()\n", + "\n", + "def MapReduce(RECORDREADER, MAP, REDUCE):\n", + " return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "iFIVrimep678" + }, + "source": [ + "## Спецификация MapReduce\n", + "\n", + "\n", + "\n", + "```\n", + "f (k1, v1) -> (k2,v2)*\n", + "g (k2, v2*) -> (k3,v3)*\n", + " \n", + "mapreduce ((k1,v1)*) -> (k3,v3)*\n", + "groupby ((k2,v2)*) -> (k2,v2*)*\n", + "flatten (e2**) -> e2*\n", + " \n", + "mapreduce .map(f).flatten.groupby(k2).map(g).flatten\n", + "```\n", + "\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "QtTFyqke3KGe" + }, + "source": [ + "# Примеры" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "BNhh5763w5Vn" + }, + "source": [ + "## SQL " + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "QkyurnvGxBGk", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 34 + }, + "outputId": "84761282-d2ba-435a-e8d7-a85150730e10" + }, + "source": [ + "from typing import NamedTuple # requires python 3.6+\n", + "from typing import Iterator\n", + "\n", + "class User(NamedTuple):\n", + " id: int\n", + " age: str\n", + " social_contacts: int\n", + " gender: str\n", + " \n", + "input_collection = [\n", + " User(id=0, age=55, gender='male', social_contacts=20),\n", + " User(id=1, age=25, gender='female', social_contacts=240),\n", + " User(id=2, age=25, gender='female', social_contacts=500),\n", + " User(id=3, age=33, gender='female', social_contacts=800)\n", + "]\n", + "\n", + "def MAP(_, row:NamedTuple):\n", + " if (row.gender == 'female'):\n", + " yield (row.age, row)\n", + " \n", + "def REDUCE(age:str, rows:Iterator[NamedTuple]):\n", + " sum = 0\n", + " count = 0\n", + " for row in rows:\n", + " sum += row.social_contacts\n", + " count += 1\n", + " if (count > 0):\n", + " yield (age, sum/count)\n", + " else:\n", + " yield (age, 0)\n", + " \n", + "def RECORDREADER():\n", + " return [(u.id, u) for u in input_collection]\n", + "\n", + "output = MapReduce(RECORDREADER, MAP, REDUCE)\n", + "output = list(output)\n", + "output" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "[(25, 370.0), (33, 800.0)]" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 28 + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "kNKYIeerx0nY" + }, + "source": [ + "## Matrix-Vector multiplication " + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "rwcntRcCyi1V", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 102 + }, + "outputId": "606737ab-6b55-455c-931f-4fc45155f8a9" + }, + "source": [ + "from typing import Iterator\n", + "import numpy as np\n", + "\n", + "mat = np.ones((5,4))\n", + "vec = np.random.rand(5) # in-memory vector in all map tasks\n", + "\n", + "def MAP(coordinates:(int, int), value:int):\n", + " i, j = coordinates\n", + " yield (i, value*vec[j])\n", + " \n", + "def REDUCE(i:int, products:Iterator[NamedTuple]):\n", + " sum = 0\n", + " for p in products:\n", + " sum += p\n", + " yield (i, sum)\n", + "\n", + "def RECORDREADER():\n", + " for i in range(mat.shape[0]):\n", + " for j in range(mat.shape[1]):\n", + " yield ((i, j), mat[i,j])\n", + " \n", + "output = MapReduce(RECORDREADER, MAP, REDUCE)\n", + "output = list(output)\n", + "output" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "[(0, 2.905589809636405),\n", + " (1, 2.905589809636405),\n", + " (2, 2.905589809636405),\n", + " (3, 2.905589809636405),\n", + " (4, 2.905589809636405)]" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 29 + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "ruZREYdi2o4O" + }, + "source": [ + "## Inverted index " + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "vt9H9Alf3TYv", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 102 + }, + "outputId": "51aeffc9-e111-4607-bd84-cfcc7b56f238" + }, + "source": [ + "from typing import Iterator\n", + "\n", + "d1 = \"it is what it is\"\n", + "d2 = \"what is it\"\n", + "d3 = \"it is a banana\"\n", + "documents = [d1, d2, d3]\n", + "\n", + "def RECORDREADER():\n", + " for (docid, document) in enumerate(documents):\n", + " yield (\"{}\".format(docid), document)\n", + " \n", + "def MAP(docId:str, body:str):\n", + " for word in set(body.split(' ')):\n", + " yield (word, docId)\n", + " \n", + "def REDUCE(word:str, docIds:Iterator[str]):\n", + " yield (word, sorted(docIds))\n", + "\n", + "output = MapReduce(RECORDREADER, MAP, REDUCE)\n", + "output = list(output)\n", + "output" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "[('what', ['0', '1']),\n", + " ('is', ['0', '1', '2']),\n", + " ('it', ['0', '1', '2']),\n", + " ('a', ['2']),\n", + " ('banana', ['2'])]" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 30 + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "R7az-6DA6qr2" + }, + "source": [ + "## WordCount" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "dN-nbtgG6uYG", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 34 + }, + "outputId": "24117576-7931-401d-a581-28e246b23453" + }, + "source": [ + "from typing import Iterator\n", + "\n", + "d1 = \"\"\"\n", + "it is what it is\n", + "it is what it is\n", + "it is what it is\"\"\"\n", + "d2 = \"\"\"\n", + "what is it\n", + "what is it\"\"\"\n", + "d3 = \"\"\"\n", + "it is a banana\"\"\"\n", + "documents = [d1, d2, d3]\n", + "\n", + "def RECORDREADER():\n", + " for (docid, document) in enumerate(documents):\n", + " for (lineid, line) in enumerate(document.split('\\n')):\n", + " yield (\"{}:{}\".format(docid,lineid), line)\n", + "\n", + "def MAP(docId:str, line:str):\n", + " for word in line.split(\" \"): \n", + " yield (word, 1)\n", + " \n", + "def REDUCE(word:str, counts:Iterator[int]):\n", + " sum = 0\n", + " for c in counts:\n", + " sum += c\n", + " yield (word, sum)\n", + "\n", + "output = MapReduce(RECORDREADER, MAP, REDUCE)\n", + "output = list(output)\n", + "output" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 31 + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "h-jRAcYCAkkk" + }, + "source": [ + "# MapReduce Distributed\n", + "\n", + "Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям." + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "nw-b-xJsApgW" + }, + "source": [ + "def flatten(nested_iterable):\n", + " for iterable in nested_iterable:\n", + " for element in iterable:\n", + " yield element\n", + "\n", + "def groupbykey(iterable):\n", + " t = {}\n", + " for (k2, v2) in iterable:\n", + " t[k2] = t.get(k2, []) + [v2]\n", + " return t.items()\n", + " \n", + "def groupbykey_distributed(map_partitions, PARTITIONER):\n", + " global reducers\n", + " partitions = [dict() for _ in range(reducers)]\n", + " for map_partition in map_partitions:\n", + " for (k2, v2) in map_partition:\n", + " p = partitions[PARTITIONER(k2)]\n", + " p[k2] = p.get(k2, []) + [v2]\n", + " return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]\n", + " \n", + "def PARTITIONER(obj):\n", + " global reducers\n", + " return hash(obj) % reducers\n", + " \n", + "def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):\n", + " map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())\n", + " if COMBINER != None:\n", + " map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)\n", + " reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle\n", + " reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)\n", + " \n", + " print(\"{} key-value pairs were sent over a network.\".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))\n", + " return reduce_outputs" + ], + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "kxirlf3XqZxY" + }, + "source": [ + "## Спецификация MapReduce Distributed\n", + "\n", + "\n", + "```\n", + "f (k1, v1) -> (k2,v2)*\n", + "g (k2, v2*) -> (k3,v3)*\n", + " \n", + "e1 (k1, v1)\n", + "e2 (k2, v2)\n", + "partition1 (k2, v2)*\n", + "partition2 (k2, v2*)*\n", + " \n", + "flatmap (e1->e2*, e1*) -> partition1*\n", + "groupby (partition1*) -> partition2*\n", + "\n", + "mapreduce ((k1,v1)*) -> (k3,v3)*\n", + "mapreduce .flatmap(f).groupby(k2).flatmap(g)\n", + "```\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "CWYw_CpbbY3C" + }, + "source": [ + "## WordCount " + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "uR_zfGFkMZlp", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 68 + }, + "outputId": "c8d46167-473d-43b9-881a-2396991b3731" + }, + "source": [ + "from typing import Iterator\n", + "import numpy as np\n", + "\n", + "d1 = \"\"\"\n", + "it is what it is\n", + "it is what it is\n", + "it is what it is\"\"\"\n", + "d2 = \"\"\"\n", + "what is it\n", + "what is it\"\"\"\n", + "d3 = \"\"\"\n", + "it is a banana\"\"\"\n", + "documents = [d1, d2, d3, d1, d2, d3]\n", + "\n", + "maps = 3\n", + "reducers = 2\n", + "\n", + "def INPUTFORMAT():\n", + " global maps\n", + " \n", + " def RECORDREADER(split):\n", + " for (docid, document) in enumerate(split):\n", + " for (lineid, line) in enumerate(document.split('\\n')):\n", + " yield (\"{}:{}\".format(docid,lineid), line)\n", + " \n", + " split_size = int(np.ceil(len(documents)/maps))\n", + " for i in range(0, len(documents), split_size):\n", + " yield RECORDREADER(documents[i:i+split_size])\n", + "\n", + "def MAP(docId:str, line:str):\n", + " for word in line.split(\" \"): \n", + " yield (word, 1)\n", + " \n", + "def REDUCE(word:str, counts:Iterator[int]):\n", + " sum = 0\n", + " for c in counts:\n", + " sum += c\n", + " yield (word, sum)\n", + " \n", + "# try to set COMBINER=REDUCER and look at the number of values sent over the network \n", + "partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None) \n", + "partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]\n", + "partitioned_output" + ], + "execution_count": null, + "outputs": [ + { + "output_type": "stream", + "text": [ + "56 key-value pairs were sent over a network.\n" + ], + "name": "stdout" + }, + { + "output_type": "execute_result", + "data": { + "text/plain": [ + "[(0, [('', 6), ('a', 2), ('banana', 2), ('is', 18), ('it', 18), ('what', 10)]),\n", + " (1, [])]" + ] + }, + "metadata": { + "tags": [] + }, + "execution_count": 33 + } + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "gCJGx8IQ87xS" + }, + "source": [ + "## TeraSort" + ] + }, + { + "cell_type": "code", + "metadata": { + "id": "P2v8v1v_8_YR", + "colab": { + "base_uri": "https://localhost:8080/", + "height": 578 + }, + "outputId": "e0987c25-9757-46cb-8e55-d5d2adfbee2b" + }, + "source": [ + "import numpy as np\n", + "\n", + "input_values = np.random.rand(30)\n", + "maps = 3\n", + "reducers = 2\n", + "min_value = 0.0\n", + "max_value = 1.0\n", + "\n", + "def INPUTFORMAT():\n", + " global maps\n", + " \n", + " def RECORDREADER(split):\n", + " for value in split:\n", + " yield (value, None)\n", + " \n", + " split_size = int(np.ceil(len(input_values)/maps))\n", + " for i in range(0, len(input_values), split_size):\n", + " yield RECORDREADER(input_values[i:i+split_size])\n", + " \n", + "def MAP(value:int, _):\n", + " yield (value, None)\n", + " \n", + "def PARTITIONER(key):\n", + " global reducers\n", + " global max_value\n", + " global min_value\n", + " bucket_size = (max_value-min_value)/reducers\n", + " bucket_id = 0\n", + " while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size