# dse-assignment/components/x_way/x_way_server.py
#
# 86 lines
# 2.3 KiB
# Python

import flask
import flask_restx
import requests
from bson import json_util
from flask import Flask, jsonify
from flask_cors import CORS
from flask import request
import json;
# Flask application object; the plain @app.route handlers in this file attach to it.
app = Flask(__name__)
# Apply the flask_cors extension to the whole app (called with its default settings).
CORS(app)
# Blueprint mounted at /api/v1/resources that carries the flask-restx API.
api_bp = flask.Blueprint("api", __name__, url_prefix="/api/v1/resources")
API = flask_restx.Api(api_bp)
# NOTE(review): the blueprint is registered before the namespace and its
# resource are declared; keep this statement order unless the flask-restx
# registration behaviour has been verified.
app.register_blueprint(api_bp)
# Namespace used to route and document the CarsEvents resource
# (presumably served below /api/v1/resources/car_events — confirm).
NAMESPACE = API.namespace("car_events")
# Base URL of the entity-ident service; handlers append 'cars' / 'traffic_lights'.
ENTITY_IDENT_URL = 'http://entityident:5002/api/v1/resources/'
# Base URL of the event store; handlers append '<prefix>:<identifier>/0/'.
EVENT_STORE_URL = 'http://eventstore:5001/api/keys/'
@NAMESPACE.route('/')
class CarsEvents(flask_restx.Resource):
    """REST resource exposing the stored events of a single car."""

    @NAMESPACE.doc(params={'vin': {'description': 'Vehicle Identifier Number of car',
                                   'type': 'String', 'default': '5GZCZ43D13S812715'}})
    def get(self):
        """Return the event-store entries for the car given by ``vin``.

        Reads the ``vin`` query parameter, fetches ``DAF:<vin>/0/`` below
        ``EVENT_STORE_URL`` and returns the decoded payload as a JSON response.
        A missing or empty ``vin`` is answered with HTTP 400. A
        ``requests.exceptions.ConnectionError`` from the event store is
        re-raised after a hint has been printed.
        """
        vin = request.args.get('vin')
        if not vin:
            # The 'default' declared in the doc params above is only shown in
            # the generated API documentation; it is not applied to incoming
            # requests. Without this guard a missing parameter reaches the
            # string concatenation below as None and raises TypeError.
            flask_restx.abort(400, "Missing required query parameter 'vin'")
        try:
            # Only the network call can raise ConnectionError, so only it is guarded.
            response = requests.get(EVENT_STORE_URL + 'DAF:' + vin + '/0/')
        except requests.exceptions.ConnectionError:
            print("Is the EVENT_STORE_URL running and reachable?")
            raise
        cars = json.loads(response.text)
        return jsonify(cars)
@app.route('/api/v1/resources/traffic_light_events', methods=['GET'])
def get_traffic_light_events():
    """Return the event-store entries for one traffic light.

    Reads the ``id`` query parameter, fetches ``TL:<id>/0/`` below
    ``EVENT_STORE_URL`` and returns the decoded payload re-serialised with
    ``json_util.dumps``.

    Returns:
        The serialised events as the response body, or a JSON error object
        with status 400 when ``id`` is missing or empty.

    Raises:
        requests.exceptions.ConnectionError: re-raised, after printing a
            hint, when the request to the event store fails to connect.
    """
    # Named light_id rather than id so the builtin id() is not shadowed.
    light_id = request.args.get('id')
    if not light_id:
        # Without this guard a missing parameter reaches the string
        # concatenation below as None and raises TypeError.
        return jsonify({'message': "Missing required query parameter 'id'"}), 400
    try:
        # Only the network call can raise ConnectionError, so only it is guarded.
        response = requests.get(EVENT_STORE_URL + 'TL:' + light_id + '/0/')
    except requests.exceptions.ConnectionError:
        print("Is the EVENT_STORE_URL running and reachable?")
        raise
    traffic_lights = json.loads(response.text)
    return json_util.dumps(traffic_lights)
@app.route('/api/v1/resources/cars', methods=['GET'])
def get_cars():
    """List the cars reported by the entity-ident service.

    Requests ``cars`` below ``ENTITY_IDENT_URL``, takes the value stored
    under the ``'cursor'`` key of the JSON reply and returns it serialised
    with ``json_util.dumps``. A ``requests.exceptions.ConnectionError`` is
    re-raised after a hint has been printed.
    """
    cars_url = ENTITY_IDENT_URL + 'cars'
    try:
        car_records = requests.get(cars_url).json()['cursor']
    except requests.exceptions.ConnectionError as conn_err:
        print("Is the entity_ident_server running and reachable?")
        raise conn_err
    return json_util.dumps(car_records)
@app.route('/api/v1/resources/traffic_lights', methods=['GET'])
def get_traffic_lights():
    """List the traffic lights reported by the entity-ident service.

    Requests ``traffic_lights`` below ``ENTITY_IDENT_URL``, takes the value
    stored under the ``'cursor'`` key of the JSON reply and returns it
    serialised with ``json_util.dumps``. A
    ``requests.exceptions.ConnectionError`` is re-raised after a hint has
    been printed.
    """
    lights_url = ENTITY_IDENT_URL + 'traffic_lights'
    try:
        light_records = requests.get(lights_url).json()['cursor']
    except requests.exceptions.ConnectionError as conn_err:
        print("Is the entity_ident_server running and reachable?")
        raise conn_err
    return json_util.dumps(light_records)
if __name__ == '__main__':
    # When executed as a script, start Flask's built-in server bound to
    # 0.0.0.0 on port 5004.
    app.run(host='0.0.0.0', port=5004)