Commit ca733824 authored by neop

try to write to local influxdb

parent 26af7b13
@@ -2,6 +2,7 @@ package sensor2graph;
 import java.io.*;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.concurrent.*;
 import java.util.logging.*;
 import java.util.stream.*;
@@ -10,6 +11,7 @@ import java.net.*;
 import java.nio.file.*;
 import java.nio.file.attribute.*;
+import com.influxdb.client.domain.WritePrecision;
 import com.influxdb.client.write.Point;
 import org.neo4j.driver.*;
@@ -44,7 +46,7 @@ class DayUploader {
         Main.glogger.info("uploading sensor " + file_uri);
         try (var session = Main.driver.session(SessionConfig.forDatabase(Main.database_name))) {
-            sensor.process(session, transaction_config);
+            // sensor.process(session, transaction_config);
         }
         Main.glogger.fine("uploaded sensor " + file_uri);
@@ -75,15 +77,26 @@
     private static void put_data_into_influxdb(File saved_csv_file, Sensor sensor) throws FileNotFoundException, IOException {
         try (var csv = new BufferedReader(new FileReader(saved_csv_file))) {
             var index_of_field = get_index_of_field_map(csv.readLine());
-            csv.lines().map(line -> line.split(";")).forEach(data -> {
-                index_of_field.get("sensor_id");
-                Main.influxdb_write_api.writePoint(Point.measurement("sensor")
-                    .addTag("sensor_id", (String) sensor.properties.get("sensor_id"))
-                    .addTag("sensor_type", (String) sensor.properties.get("sensor_type"))
-                    .addTag("location", (String) sensor.properties.get("location"))
-                    .addField("used_percent", 29.43234543));
-            });
+            var tag_columns = Arrays.asList("sensor_id", "sensor_type", "location");
+            var field_columns = new HashSet<>(index_of_field.keySet());
+            field_columns.remove("timestamp");
+            field_columns.removeAll(tag_columns);
+            csv.lines()
+                .map(line -> line.split(";"))
+                .map(data -> {
+                    var point = Point.measurement("sensor")
+                        .time(Instant.parse(data[index_of_field.get("timestamp")] + "Z"), WritePrecision.S);
+                    for (var tag_column : tag_columns) {
+                        point.addTag(tag_column, (String) sensor.properties.get(tag_column));
+                    }
+                    for (var field_column : field_columns) {
+                        point.addField(field_column, data[index_of_field.get(field_column)]);
+                    }
+                    return point;
+                })
+                .forEach(point -> Main.influxdb_write_api.writePoint("bucket0", "openforecast", point));
         }
         Main.influxdb_write_api.flush();
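
The rewrite derives the schema from the CSV header instead of hard-coding it: the three sensor properties become tags, every remaining non-timestamp column becomes a field, and each row's timestamp (with a "Z" appended, since the archive timestamps carry no zone suffix) becomes the point time at second precision. A minimal self-contained sketch of that per-row mapping, with made-up column names and values standing in for a real CSV row:

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;

import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;

public class PointSketch {
    public static void main(String[] args) {
        // A fake parsed CSV row keyed by column name; stands in for
        // index_of_field plus one split line. All values are made up.
        Map<String, String> row = Map.of(
                "timestamp", "2021-06-01T12:00:00",
                "sensor_id", "3659",
                "sensor_type", "SDS011",
                "location", "1847",
                "P1", "12.30",
                "P2", "7.90");
        List<String> tag_columns = List.of("sensor_id", "sensor_type", "location");

        // Archive timestamps carry no zone, hence the appended "Z", as in the diff.
        Point point = Point.measurement("sensor")
                .time(Instant.parse(row.get("timestamp") + "Z"), WritePrecision.S);
        for (String tag : tag_columns) {
            point.addTag(tag, row.get(tag));
        }
        for (var entry : row.entrySet()) {
            // Everything that is neither the timestamp nor a tag becomes a field.
            if (!entry.getKey().equals("timestamp") && !tag_columns.contains(entry.getKey())) {
                point.addField(entry.getKey(), entry.getValue());
            }
        }
        System.out.println(point.toLineProtocol());
    }
}
```

Note that field values stay raw strings here, exactly as in the diff, so InfluxDB stores them as string fields; parsing them to doubles before addField would make them usable in numeric aggregations.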
@@ -80,12 +80,9 @@ public class Main {
     influxDBClient = InfluxDBClientFactory.create(
         InfluxDBClientOptions.builder()
             .url(Main.influxdb_uri)
-            .org("-")
-            .authenticateToken(String.format("%s:%s",
-                System.getenv("SENSOR2GRAPH_INFLUXDB_USER"),
-                System.getenv("SENSOR2GRAPH_INFLUXDB_PASS")
-            ).toCharArray())
-            .bucket(influxdb_database_name)
+            .authenticateToken("hunter2".toCharArray())
+            .bucket("bucket0")
+            .org("openforecast")
             .okHttpClient(
                 new OkHttpClient().newBuilder()
                     .connectTimeout(10, TimeUnit.MINUTES)
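
The client bootstrap swaps the v1-compatibility credentials (a user:pass token with org "-", read from the environment) for a native InfluxDB 2.x token with an explicit bucket and org, hard-coding throwaway dev values for now. A standalone sketch of the same setup, assuming a local InfluxDB at http://localhost:8086 (the URL is an assumption; the diff takes it from Main.influxdb_uri):

```java
import java.util.concurrent.TimeUnit;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.InfluxDBClientOptions;
import okhttp3.OkHttpClient;

public class InfluxClientSketch {
    public static void main(String[] args) {
        // Token, bucket and org mirror the hard-coded dev values in the diff.
        InfluxDBClient client = InfluxDBClientFactory.create(
                InfluxDBClientOptions.builder()
                        .url("http://localhost:8086")
                        .authenticateToken("hunter2".toCharArray())
                        .bucket("bucket0")
                        .org("openforecast")
                        // generous timeouts, matching the diff's OkHttp tuning
                        .okHttpClient(new OkHttpClient().newBuilder()
                                .connectTimeout(10, TimeUnit.MINUTES)
                                .readTimeout(10, TimeUnit.MINUTES)
                                .writeTimeout(10, TimeUnit.MINUTES))
                        .build());
        // ping() is available in recent influxdb-client-java versions
        System.out.println("connected: " + client.ping());
        client.close();
    }
}
```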
@@ -112,21 +109,21 @@ public class Main {
 }
 private static void run() throws Exception {
-    System.out.println("starting influx query");
-    var tables = influxDBClient.getQueryApi().query("from(bucket:\"openforecast\") |> range(start: -600d1h, stop: -600d)");
+    // System.out.println("starting influx query");
+    // var tables = influxDBClient.getQueryApi().query("from(bucket:\"openforecast\") |> range(start: -600d1h, stop: -600d)");
-    for (var fluxTable : tables) {
-        System.out.println("table " + fluxTable);
-        for (var record : fluxTable.getRecords()) {
-            System.out.println(String.format("%s %s: %s %s", record.getTime(), record.getMeasurement(), record.getField(), record.getValue()));
-            record.getValues().forEach((key, value) -> System.out.println(key + ":" + value));
-        }
-    }
-    System.out.println("finished influx query");
-    // try_uploading_failed_sensors();
-    // upload_all_days();
-    // try_uploading_failed_sensors();
+    // for (var fluxTable : tables) {
+    //     System.out.println("table " + fluxTable);
+    //     for (var record : fluxTable.getRecords()) {
+    //         System.out.println(String.format("%s %s: %s %s", record.getTime(), record.getMeasurement(), record.getField(), record.getValue()));
+    //         record.getValues().forEach((key, value) -> System.out.println(key + ":" + value));
+    //     }
+    // }
+    // System.out.println("finished influx query");
+    try_uploading_failed_sensors();
+    upload_all_days();
+    try_uploading_failed_sensors();
 }
 private static void try_uploading_failed_sensors() throws Exception {
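
run() re-enables the real upload passes and keeps the earlier Flux smoke test around as comments. For reference, a self-contained sketch of that read path, with assumed connection values matching the dev setup above; note the query reads from a bucket named "openforecast", while the write path above targets bucket "bucket0" in org "openforecast":

```java
import java.util.List;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;

public class FluxQuerySketch {
    public static void main(String[] args) {
        try (InfluxDBClient client = InfluxDBClientFactory.create(
                "http://localhost:8086", "hunter2".toCharArray(), "openforecast", "bucket0")) {
            // Same window as the commented-out query: one hour ending 600 days ago.
            List<FluxTable> tables = client.getQueryApi().query(
                    "from(bucket:\"openforecast\") |> range(start: -600d1h, stop: -600d)");
            for (FluxTable table : tables) {
                for (FluxRecord record : table.getRecords()) {
                    System.out.printf("%s %s: %s %s%n",
                            record.getTime(), record.getMeasurement(),
                            record.getField(), record.getValue());
                }
            }
        }
    }
}
```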
@@ -208,24 +205,26 @@ public class Main {
         Main.logger.fine("initializing neo4j driver.");
         initialize_neo4j_driver();
-        Main.logger.fine("initializing influxdb driver.");
-        initialize_influxdb_driver();
-        try {
-            Handle.http_client = Handle.get_client();
-            test_auth();
-            pre_run_graphdb_actions();
-            run();
-        } catch (org.neo4j.driver.exceptions.AuthenticationException e) {
-            Main.logger.log(Level.SEVERE, "Authentication failed, are your credentials correct?");
-        } catch (Exception e) {
-            Main.logger.log(Level.SEVERE, "An unrecoverable exception occured, terminating...", e);
+        Main.logger.fine("initializing influxdb driver.");
+        initialize_influxdb_driver();
+        try {
+            try {
+                Handle.http_client = Handle.get_client();
+                test_auth();
+                pre_run_graphdb_actions();
+                run();
+            } catch (org.neo4j.driver.exceptions.AuthenticationException e) {
+                Main.logger.log(Level.SEVERE, "Authentication failed, are your credentials correct?");
+            } catch (Exception e) {
+                Main.logger.log(Level.SEVERE, "An unrecoverable exception occured, terminating...", e);
+            }
+        } finally {
+            influxdb_write_api.close();
+            influxDBClient.close();
+        }
     } finally {
         driver.close();
-        influxdb_write_api.close();
-        influxDBClient.close();
     }
 }
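
The shutdown rework nests a second try/finally so the InfluxDB write API and client are always closed, flushing any buffered points, before the outer finally releases the Neo4j driver. A compact sketch of that ordering, with assumed connection values and a placeholder standing in for the real work:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;

public class ShutdownSketch {
    static final Logger logger = Logger.getLogger(ShutdownSketch.class.getName());

    public static void main(String[] args) {
        // Both connection strings and credentials are assumptions.
        Driver driver = GraphDatabase.driver("bolt://localhost:7687",
                AuthTokens.basic("neo4j", "secret"));
        try {
            InfluxDBClient influxDBClient = InfluxDBClientFactory.create(
                    "http://localhost:8086", "hunter2".toCharArray(), "openforecast", "bucket0");
            WriteApi writeApi = influxDBClient.makeWriteApi();
            try {
                try {
                    logger.fine("working..."); // stands in for test_auth()/run()
                } catch (Exception e) {
                    logger.log(Level.SEVERE, "An unrecoverable exception occurred, terminating...", e);
                }
            } finally {
                // close the write API first so buffered points get flushed,
                // then the client that owns it
                writeApi.close();
                influxDBClient.close();
            }
        } finally {
            // the Neo4j driver is released last, as in the diff
            driver.close();
        }
    }
}
```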