Commit 26af7b13 authored by neop's avatar neop
Browse files

start implementing csv to influxdb point conversion

parent 8380bbe2
......@@ -5,4 +5,5 @@ failed_to_upload_sensors_file = test_sensor.txt
pid_reg_server_cert = /home/neop/Documents/Uni/bachelor/ba/sensordata-ansible/roles/sensor2graph/files/sensor2graph/handle_auth/pid_reg_server_cert.pem
pid_reg_keystore = /home/neop/Documents/Uni/bachelor/ba/sensordata-ansible/roles/sensor2graph/files/sensor2graph/handle_auth/pid_reg_keystore.p12
neo4j_server_certificate_file = /home/neop/Documents/Uni/bachelor/ba/sensordata-ansible/roles/nginx-reverse-proxy/files/nginx-reverse-proxy/sensordata_cert.pem
sensor_type_info = config/sensor_type_info.json
\ No newline at end of file
sensor_type_info = config/sensor_type_info.json
influxdb_uri = http://localhost:8086
\ No newline at end of file
......@@ -10,6 +10,7 @@ import java.net.*;
import java.nio.file.*;
import java.nio.file.attribute.*;
import com.influxdb.client.write.Point;
import org.neo4j.driver.*;
import sensor2graph.webdirget.*;
......@@ -35,8 +36,11 @@ class DayUploader {
var saved_csv_file = new File(Main.csv_directory, new URL(file_uri).getFile());
download_csv_to_disk(saved_csv_file, file_uri);
// init sensor after saving to get broken csvs also
var sensor = make_sensor_from_csv(saved_csv_file, file_uri);
put_data_into_influxdb(saved_csv_file, sensor);
Main.glogger.info("uploading sensor " + file_uri);
try (var session = Main.driver.session(SessionConfig.forDatabase(Main.database_name))) {
......@@ -68,6 +72,32 @@ class DayUploader {
};
}
/**
 * Streams the rows of a downloaded sensor CSV into InfluxDB as points,
 * tagged with the sensor's identity, then flushes the write API.
 *
 * @param saved_csv_file CSV file previously downloaded to disk (';'-separated,
 *                       first line is the header)
 * @param sensor         sensor whose properties supply the point tags
 * @throws FileNotFoundException if the saved CSV file does not exist
 * @throws IOException           on read errors
 */
private static void put_data_into_influxdb(File saved_csv_file, Sensor sensor) throws FileNotFoundException, IOException {
	try (var csv = new BufferedReader(new FileReader(saved_csv_file))) {
		var header_line = csv.readLine();
		// Broken/empty CSVs are downloaded on purpose (see caller); an empty
		// file has no header, so there is nothing to write.
		if (header_line == null) {
			return;
		}
		var index_of_field = get_index_of_field_map(header_line);
		csv.lines().map(line -> line.split(";")).forEach(data -> {
			// TODO(review): work in progress — the row values in `data`
			// (looked up via index_of_field) are not used yet; a hard-coded
			// placeholder field is written for every row.
			Main.influxdb_write_api.writePoint(Point.measurement("sensor")
					.addTag("sensor_id", (String) sensor.properties.get("sensor_id"))
					.addTag("sensor_type", (String) sensor.properties.get("sensor_type"))
					.addTag("location", (String) sensor.properties.get("location"))
					.addField("used_percent", 29.43234543));
		});
	}
	Main.influxdb_write_api.flush();
}
/**
 * Builds a lookup from CSV column name to its zero-based column position.
 *
 * @param header_line the first line of the CSV, fields separated by ';'
 * @return mutable map of field name to column index; if a name repeats,
 *         the last occurrence wins
 */
private static Map<String, Integer> get_index_of_field_map(String header_line) throws IOException {
	var index_of_field = new HashMap<String, Integer>();
	var position = 0;
	for (var field_name : header_line.split(";")) {
		index_of_field.put(field_name, position++);
	}
	return index_of_field;
}
private static Sensor make_sensor_from_csv(File saved_csv_file, String file_uri) throws Exception {
try (var csv = new BufferedReader(new FileReader(saved_csv_file))) {
return Sensor.fromCSV(file_uri, read_one_csv_line(csv));
......
......@@ -113,7 +113,7 @@ public class Main {
private static void run() throws Exception {
System.out.println("starting influx query");
var tables = influxDBClient.getQueryApi().query("from(bucket:\"openforecast\") |> range(start: -600d1h, stop: -600d) |> limit(100)");
var tables = influxDBClient.getQueryApi().query("from(bucket:\"openforecast\") |> range(start: -600d1h, stop: -600d)");
for (var fluxTable : tables) {
System.out.println("table " + fluxTable);
......@@ -224,6 +224,7 @@ public class Main {
Main.logger.log(Level.SEVERE, "An unrecoverable exception occured, terminating...", e);
} finally {
driver.close();
influxdb_write_api.close();
influxDBClient.close();
}
}
......
......@@ -16,7 +16,7 @@ import java.util.logging.*;
import java.util.regex.*;
class Sensor {
private Map<String, Object> properties = new HashMap<String, Object>();
public Map<String, Object> properties = new HashMap<String, Object>();
public void process(Session session, TransactionConfig transaction_config) throws Exception {
var rec = session.writeTransaction(tx -> {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment