# pr0206.py
import os
from extract_data import extract_data
from insert_influx import insert_influx
if __name__ == "__main__":
    # Parse every dataset file under base_dir (one list of rows per file)
    # and push all of them to InfluxDB in a single call.
    base_dir = "./datasets/"
    coin_data = []
    # sorted() makes the processing order reproducible; os.listdir order is arbitrary.
    for file_name in sorted(os.listdir(base_dir)):
        path = os.path.join(base_dir, file_name)
        if not os.path.isfile(path):
            # Skip subdirectories and other non-regular entries that open() cannot read.
            continue
        coin_data.append(extract_data(path))
    insert_influx(coin_data)
# extract_data.py
from datetime import datetime, timezone
def extract_data(file_name: str) -> list[tuple[int, str, str, str, str, str, str, str, str, str]]:
    """Parse one coin CSV file into a list of 10-element row tuples.

    The first line is treated as a header and skipped; blank lines (including
    trailing ones) are ignored, and the last row is kept whether or not the
    file ends with a newline.

    Each returned tuple is::

        (int(col0), col1, col2, date, col4, col5, col6, col7, col8, col9)

    where ``date`` is column 3 parsed with ``"%Y-%m-%d %H:%M:%S"``, interpreted
    as UTC and rendered as ISO-8601 with a ``"Z"`` suffix
    (e.g. ``"2020-10-05T23:59:59Z"``). Columns 4-9 are returned as the raw
    strings read from the file (no numeric conversion happens here).
    NOTE(review): the layout presumably is
    SNo,Name,Symbol,Date,High,Low,Open,Close,Volume,Marketcap — confirm
    against the dataset files.

    Args:
        file_name: Path of the CSV file to read (decoded as UTF-8).

    Returns:
        One tuple per non-blank data row, in file order.

    Raises:
        ValueError: If column 0 is not an integer or column 3 does not match
            the date format.
        IndexError: If a non-blank row has fewer than 10 columns.
    """
    import csv  # local import: only this function needs the csv parser

    values = []
    # newline="" lets csv handle \n and \r\n line endings itself.
    with open(file_name, newline="", encoding="utf-8") as file:
        reader = csv.reader(file)
        next(reader, None)  # skip the header row (no-op on an empty file)
        for row in reader:
            if not any(field.strip() for field in row):
                # Blank or whitespace-only line: nothing to parse.
                continue
            dt_utc = datetime.strptime(row[3], "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
            # isoformat() yields "+00:00" for UTC; emit the compact "Z" form instead.
            time_date = dt_utc.isoformat().replace("+00:00", "Z")
            values.append((int(row[0]), row[1], row[2], time_date,
                           row[4], row[5], row[6], row[7], row[8], row[9]))
    return values
# insert_influx.py
from influxdb_client.client.write_api import ASYNCHRONOUS
from influxdb_client.client.influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write.point import Point
from influxdb_client.client.write_api import WriteOptions, WriteApi
from urllib3.exceptions import NewConnectionError
from datetime import datetime
from typing import Any
import os

# Connection settings. They can be overridden through environment variables so
# real credentials need not be committed; the defaults are the previous
# hard-coded local values.
INFLUX_URL = os.environ.get("INFLUX_URL", "http://localhost:8086")
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "MyInitialAdminToken0==")

# Keys of the per-row dicts; the six numeric ones are also used as the
# InfluxDB field names.
SNo_key = "SNo"
name_key = "Name"
symbol_key = "Symbol"
date_key = "Date"
high_key = "High"
low_key = "Low"
open_key = "Open"
close_key = "Close"
volume_key = "Volume"
marketcap_key = "Marketcap"
def insert_influx(data: list[list[tuple[int, str, str, datetime, float, float, float, float, float, float]]]):
    """Send every row of every coin in ``data`` to InfluxDB.

    Args:
        data: One list per coin. Each row is a 10-tuple whose positions are
            mapped, in order, to the keys SNo, Name, Symbol, Date, High, Low,
            Open, Close, Volume, Marketcap before the resulting dict is passed
            to ``_insert_date``.

    A client is opened against ``INFLUX_URL`` (org ``"docs"``) with an
    ASYNCHRONOUS write API. "New coin" is printed before each coin, and
    after each coin the running totals (cumulative across coins) of rows for
    which ``_insert_date`` returned True / False are printed.

    A ``NewConnectionError`` or Ctrl+C is reported on stdout instead of
    propagating.

    NOTE(review): because writes are asynchronous, server-side failures may
    occur in the background after ``write()`` returns, so the
    "No insertados" counter probably undercounts — confirm.
    """
    # Tuple position -> dict key, in the order the rows are laid out.
    row_keys = (
        SNo_key, name_key, symbol_key, date_key, high_key,
        low_key, open_key, close_key, volume_key, marketcap_key,
    )
    try:
        # The context managers close the write API and the client on exit
        # (normal, error, or Ctrl+C), releasing their pools instead of leaving
        # queued asynchronous writes to interpreter shutdown.
        with InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org="docs") as client:
            options = WriteOptions(write_type=ASYNCHRONOUS)  # pyright: ignore
            with client.write_api(write_options=options) as write_api:
                inserted = 0
                not_inserted = 0
                for coin_data in data:
                    print("New coin")
                    for row in coin_data:
                        if _insert_date(dict(zip(row_keys, row)), write_api):
                            inserted += 1
                        else:
                            not_inserted += 1
                    print("Insertados: ", inserted)
                    print("No insertados: ", not_inserted)
    except NewConnectionError as nce:
        print("Error conectando: ", nce)
    except KeyboardInterrupt:
        print("Finalizado por usuario")
def _insert_date(data: dict[str, Any], write_api: WriteApi) -> bool:
    """Build one ``crypto_value`` point from a row dict and write it.

    The point is tagged ``coin=data[name_key]``. It carries the High, Low,
    Open, Close, Volume and Marketcap values (converted with ``float()``) as
    fields, and is timestamped with ``data[date_key]``. It is written to
    bucket ``"crypto"`` in org ``"docs"``.

    Args:
        data: Row dict keyed by the module-level ``*_key`` constants.
        write_api: Write API used to send the point.

    Returns:
        True if building and writing the point raised nothing. False if a
        value could not be converted or parsed (``ValueError``) or the client
        raised ``InfluxDBError``. If ``write_api`` is asynchronous, True may
        only mean the write was submitted — confirm.

    Raises:
        KeyError: If ``data`` is missing one of the expected keys.
    """
    try:
        point = (
            Point("crypto_value")
            .tag("coin", data[name_key])
            .field(high_key, float(data[high_key]))
            .field(low_key, float(data[low_key]))
            .field(open_key, float(data[open_key]))
            .field(close_key, float(data[close_key]))
            .field(volume_key, float(data[volume_key]))
            .field(marketcap_key, float(data[marketcap_key]))
            .time(data[date_key])
        )
        write_api.write(bucket="crypto", org="docs", record=point)
    except (InfluxDBError, ValueError):
        # ValueError: a malformed row (e.g. empty/non-numeric value). Count it
        # as not inserted rather than aborting the whole import.
        return False
    return True

