299 lines
9.7 KiB
Python
299 lines
9.7 KiB
Python
import os
|
|
import base64
|
|
import requests
|
|
import json
|
|
import time
|
|
from datetime import datetime
|
|
|
|
def print_response(response):
|
|
print("Code: " + str(response.status_code) + "; Body: " + str(response.json()))
|
|
|
|
def unwrap_file(file_str: str) -> bytes:
|
|
return base64.b64decode(file_str)
|
|
|
|
def send_image(identifier, image_path, metadata_payload):
|
|
print("Sending image with identifier " + identifier)
|
|
if not os.path.isfile(image_path):
|
|
print("No image found at path " + image_path)
|
|
return False
|
|
file_encoded_b64 = None
|
|
with open(image_path, "rb") as file_to_send:
|
|
file_encoded_b64 = base64.b64encode(file_to_send.read())
|
|
|
|
if file_encoded_b64 is None:
|
|
print("Error reading file")
|
|
return False
|
|
|
|
file_payload = file_encoded_b64.decode('utf-8')
|
|
baseurl = "http://127.0.0.1:8000"
|
|
post_url = "/image/post"
|
|
body = {
|
|
'id': identifier,
|
|
'metadata': metadata_payload,
|
|
'image_data': file_payload
|
|
}
|
|
try:
|
|
response = requests.post(baseurl + post_url, json=body)
|
|
print_response(response)
|
|
except os.error:
|
|
print("Error sending request")
|
|
return True
|
|
|
|
|
|
def get_image(identifier):
|
|
print("Getting image with identifier "+identifier)
|
|
|
|
baseurl = "http://127.0.0.1:8000"
|
|
get_url = "/image/get/"+identifier
|
|
|
|
try:
|
|
response = requests.get(baseurl + get_url)
|
|
payload = response.json()
|
|
print(payload['id'])
|
|
print(payload['metadata'])
|
|
b64encoded_image = unwrap_file(payload['image_data'])
|
|
goal_folder_name = "images_fetch"
|
|
path = "../../"+goal_folder_name+"/"+payload['metadata']['filename']
|
|
goal_folder_path = os.path.sep.join(os.path.abspath(__file__).split(os.path.sep)[:-3]) + os.path.sep + goal_folder_name
|
|
out = open(path, 'wb')
|
|
out.write(b64encoded_image)
|
|
out.close()
|
|
os.system(goal_folder_path + os.path.sep + payload['metadata']['filename'])
|
|
except os.error:
|
|
print("Error sending request")
|
|
return True
|
|
|
|
|
|
|
|
def get_all():
|
|
print("Getting all images")
|
|
|
|
baseurl = "http://127.0.0.1:8000"
|
|
get_url = "/image/get/all"
|
|
|
|
try:
|
|
response = requests.get(baseurl + get_url)
|
|
print_response(response)
|
|
except os.error:
|
|
print("Error sending request")
|
|
return True
|
|
|
|
metadata = None
|
|
index = 0
|
|
metadata_folder = "." + os.path.sep
|
|
metadata_file = "metadata_short" + ".json"
|
|
metadata_path = metadata_folder + os.path.sep + metadata_file
|
|
|
|
image_folder = "." + os.path.sep + "images"
|
|
|
|
|
|
if (os.path.isfile(metadata_path)):
|
|
print("Loading metadata to memory from " + metadata_path)
|
|
metadata = sorted(json.load(open(metadata_file, "r")), key=lambda k: datetime.strptime(k['datetime'], '%d-%b-%Y (%H:%M:%S.%f)'))
|
|
else:
|
|
print("Default metadata_file not found.")
|
|
|
|
if (os.path.isdir(image_folder)):
|
|
print("Image folder set to " + image_folder)
|
|
else:
|
|
print("Default image folder not found.")
|
|
|
|
#main input loop
|
|
command = "dummy"
|
|
while (command.lower() not in ["exit", "quit", "end"]):
|
|
|
|
try:
|
|
command = input("Please enter command: ")
|
|
except:
|
|
print()
|
|
print("Error while reading keyboard input")
|
|
break
|
|
command_split = command.split(" ")
|
|
command = command_split[0]
|
|
attributes = command_split[1:]
|
|
|
|
if command.lower() == "help" or command == "?":
|
|
print()
|
|
print("List of commands:")
|
|
print("exit - exits the program")
|
|
print("help - lists all commands")
|
|
print("dir - list current selected image folder")
|
|
print("len - length of current metadata set")
|
|
print("index - display index of current lined up image")
|
|
print("info - display info of current lined up image")
|
|
print("metadata <filepath> - load new metadata file from path")
|
|
print("imagefolder <path> - selected new image folder")
|
|
print("show - opens image next in line in default os program")
|
|
print("trigger - next image in line is sent to backend")
|
|
print("fetch - gets the next image from database if exists")
|
|
print("fetchall - get all images")
|
|
print("rapid <#amount> - next <amound> images in line are sent to backend in 2 second intervals")
|
|
print("skip [<#amount>] - skips the next <amount> number of images in line or 1 if no <amount> is given")
|
|
print("set <#index> - sets image on given <index> as next in line")
|
|
print("select |<identifier> - select image with given <identifier> as next in line")
|
|
print()
|
|
print()
|
|
|
|
if command.lower() == "dir":
|
|
print("Current image folder is " + image_folder)
|
|
|
|
if command.lower() == "len":
|
|
if metadata is None:
|
|
print("No metadata selected")
|
|
else:
|
|
print("Length of metadata list " + str(len(metadata)))
|
|
|
|
if command.lower() == "index":
|
|
print("Current lineup index is " + str(index))
|
|
|
|
if command.lower() == "info":
|
|
if metadata is None:
|
|
print("No metadata selected")
|
|
else:
|
|
if (index < len(metadata)):
|
|
print("Metadata on index " + str(index))
|
|
print(metadata[index])
|
|
|
|
if command.lower() == "metadata":
|
|
if (len(attributes) < 1):
|
|
print("Error: No path supplied")
|
|
continue
|
|
path = attributes[0]
|
|
if not os.path.isfile(path):
|
|
print("Error: No file on path " + path)
|
|
continue
|
|
print("Loading metadata to memory from " + path)
|
|
metadata = sorted(json.load(open(metadata_file, "r")), key=lambda k: datetime.strptime(k['datetime'], '%d-%b-%Y (%H:%M:%S.%f)'))
|
|
index = 0
|
|
|
|
if command.lower() == "imagefolder":
|
|
if (len(attributes) < 1):
|
|
print("Error: No path supplied")
|
|
continue
|
|
path = attributes[0]
|
|
if not os.path.isdir(path):
|
|
print("Error: Path is no valid directory " + path)
|
|
continue
|
|
print("New image folder selected " + path)
|
|
|
|
if command.lower() == "show":
|
|
if metadata is None:
|
|
print("No metadata loaded")
|
|
continue
|
|
if image_folder is None:
|
|
print("No image folder selected")
|
|
continue
|
|
meta_payload = metadata[index]
|
|
filename = meta_payload['filename']
|
|
os.system(image_folder + os.path.sep + filename)
|
|
|
|
if command.lower() == "trigger":
|
|
if metadata is None:
|
|
print("No metadata loaded")
|
|
continue
|
|
if image_folder is None:
|
|
print("No image folder selected")
|
|
continue
|
|
meta_payload = metadata[index]
|
|
filename = meta_payload['filename']
|
|
send_image(filename[:-4], image_folder + os.path.sep + filename, meta_payload)
|
|
index = index + 1
|
|
|
|
if command.lower() == "fetch":
|
|
if metadata is None:
|
|
print("No metadata loaded")
|
|
continue
|
|
if image_folder is None:
|
|
print("No image folder selected")
|
|
continue
|
|
meta_payload = metadata[index]
|
|
filename = meta_payload['filename']
|
|
get_image(filename[:-4])
|
|
index = index + 1
|
|
|
|
|
|
if command.lower() == "fetchall":
|
|
if metadata is None:
|
|
print("No metadata loaded")
|
|
continue
|
|
if image_folder is None:
|
|
print("No image folder selected")
|
|
continue
|
|
get_all()
|
|
index = index + 1
|
|
|
|
if command.lower() == "rapid":
|
|
if metadata is None:
|
|
print("No metadata loaded")
|
|
continue
|
|
if image_folder is None:
|
|
print("No image folder selected")
|
|
continue
|
|
if (len(attributes) < 1):
|
|
print("Error: No amount supplied")
|
|
continue
|
|
if not attributes[0].isnumeric():
|
|
print("Error: amount is no number")
|
|
continue
|
|
amount = int(attributes[0])
|
|
for i in range(0,amount):
|
|
meta_payload = metadata[index]
|
|
filename = meta_payload['filename']
|
|
send_image(filename[:-4], image_folder + os.path.sep + filename, meta_payload)
|
|
index = index + 1
|
|
if index >= len(metadata) or index < 0:
|
|
index = 0
|
|
try:
|
|
time.sleep(2)
|
|
except:
|
|
print("Error on sleep")
|
|
|
|
if command.lower() == "skip":
|
|
if metadata is None:
|
|
print("Please first select metadata")
|
|
continue
|
|
if (len(attributes) < 1):
|
|
index = index + 1
|
|
else:
|
|
amount = attributes[0]
|
|
if amount.isnumeric():
|
|
index = index + int(amount)
|
|
if index >= len(metadata) or index < 0:
|
|
index = 0
|
|
|
|
if command.lower() == "set":
|
|
if metadata is None:
|
|
print("Please first select metadata")
|
|
continue
|
|
if (len(attributes) < 1):
|
|
print("Error: No index supplied")
|
|
continue
|
|
if not attributes[0].isnumeric():
|
|
print("Index is no number")
|
|
continue
|
|
ind = int(attributes[0])
|
|
if ind >= len(metadata) or ind < 0:
|
|
print("Index outside length of metadata")
|
|
continue
|
|
index = ind
|
|
|
|
|
|
if command.lower() == "select":
|
|
if metadata is None:
|
|
print("Please first select metadata")
|
|
continue
|
|
if (len(attributes) < 1):
|
|
print("Error: No identifier supplied")
|
|
continue
|
|
identifier = attributes[0]
|
|
found = False
|
|
for i, elem in enumerate(metadata):
|
|
if elem['filename'][:-4] == identifier:
|
|
index = i
|
|
print("Found image at index " + str(i))
|
|
found = True
|
|
break
|
|
if not found:
|
|
print("Could not find image by identifier " + identifier)
|
|
|
|
quit("Bye") |