from elasticsearch import Elasticsearch, exceptions import os import time from flask import Flask, jsonify, request, render_template import sys import requests es_scheme = sys.argv[1] es_host = sys.argv[2] es_port = int(sys.argv[3]) es_user = sys.argv[4] es_password = sys.argv[5] connection_string = f"{es_scheme}://{es_host}:{es_port}" print(f"Elastic server: {es_scheme}://{es_host}:{es_port}, auth: {es_user}:{es_password}") es = Elasticsearch( connection_string, basic_auth=(es_user, es_password), verify_certs=False ) app = Flask(__name__) def load_data_in_es(): """ creates an index in elasticsearch """ url = "http://data.sfgov.org/resource/rqzj-sfat.json" r = requests.get(url) data = r.json() print("Loading data in elasticsearch ...") for id, truck in enumerate(data): res = es.index(index="sfdata", id=id, body=truck) print("Total trucks loaded: ", len(data)) def safe_check_index(index, retry=60): """ connect to ES with retry """ if not retry: print("Out of retries. Bailing out...") sys.exit(1) try: status = es.indices.exists(index=index) return status except exceptions.ConnectionError as e: print(f"Unable to connect to {connection_string}. Retrying in 5 secs...", flush=True) time.sleep(5) safe_check_index(index, retry-1) def format_fooditems(string): items = [x.strip().lower() for x in string.split(":")] return items[1:] if items[0].find("cold truck") > -1 else items def check_and_load_index(): """ checks if index exits and loads the data accordingly """ if not safe_check_index('sfdata'): print("Index not found...") load_data_in_es() ########### ### APP ### ########### @app.route('/') def index(): return render_template('index.html') @app.route('/debug') def test_es(): resp = {} try: msg = es.cat.indices() resp["msg"] = msg resp["status"] = "success" except: resp["status"] = "failure" resp["msg"] = "Unable to reach ES" return jsonify(resp) @app.route('/search') def search(): key = request.args.get('q') if not key: return jsonify({ "status": "failure", "msg": "Please provide a query" }) try: res = es.search( index="sfdata", body={ "query": {"match": {"fooditems": key}}, "size": 750 # max document size }) except Exception as e: return jsonify({ "status": "failure", "msg": "error in reaching elasticsearch" }) # filtering results vendors = set([x["_source"]["applicant"] for x in res["hits"]["hits"]]) temp = {v: [] for v in vendors} fooditems = {v: "" for v in vendors} for r in res["hits"]["hits"]: applicant = r["_source"]["applicant"] if "location" in r["_source"]: truck = { "hours" : r["_source"].get("dayshours", "NA"), "schedule" : r["_source"].get("schedule", "NA"), "address" : r["_source"].get("address", "NA"), "location" : r["_source"]["location"] } fooditems[applicant] = r["_source"]["fooditems"] temp[applicant].append(truck) # building up results results = {"trucks": []} for v in temp: results["trucks"].append({ "name": v, "fooditems": format_fooditems(fooditems[v]), "branches": temp[v], "drinks": fooditems[v].find("COLD TRUCK") > -1 }) hits = len(results["trucks"]) locations = sum([len(r["branches"]) for r in results["trucks"]]) return jsonify({ "trucks": results["trucks"], "hits": hits, "locations": locations, "status": "success" }) if __name__ == "__main__": ENVIRONMENT_DEBUG = os.environ.get("DEBUG", False) check_and_load_index() app.run(host='0.0.0.0', port=5000, debug=ENVIRONMENT_DEBUG)