Commit 21a759be authored by j.hoerdt's avatar j.hoerdt
Browse files

Merge branch 'data-importer-integration' into 'master'

Data importer integration

Closes #21

See merge request !11
parents 2294427d f684b747
*
!*/
# whitelist
!gradle/*
......
......@@ -13,6 +13,7 @@ dependencies {
implementation 'org.neo4j.driver:neo4j-java-driver:4.3.5'
implementation 'com.google.code.gson:gson:2.8.9'
implementation 'com.squareup.okhttp3:okhttp:4.9.2'
implementation 'com.influxdb:influxdb-client-java:3.4.0'
}
compileJava {
......
......@@ -5,4 +5,5 @@ failed_to_upload_sensors_file = test_sensor.txt
pid_reg_server_cert = /home/neop/Documents/Uni/bachelor/ba/sensordata-ansible/roles/sensor2graph/files/sensor2graph/handle_auth/pid_reg_server_cert.pem
pid_reg_keystore = /home/neop/Documents/Uni/bachelor/ba/sensordata-ansible/roles/sensor2graph/files/sensor2graph/handle_auth/pid_reg_keystore.p12
neo4j_server_certificate_file = /home/neop/Documents/Uni/bachelor/ba/sensordata-ansible/roles/nginx-reverse-proxy/files/nginx-reverse-proxy/sensordata_cert.pem
sensor_type_info = config/sensor_type_info.json
\ No newline at end of file
sensor_type_info = config/sensor_type_info.json
influxdb_uri = http://localhost:8086
\ No newline at end of file
......@@ -10,6 +10,8 @@ import java.net.*;
import java.nio.file.*;
import java.nio.file.attribute.*;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import org.neo4j.driver.*;
import sensor2graph.webdirget.*;
......@@ -35,8 +37,11 @@ class DayUploader {
var saved_csv_file = new File(Main.csv_directory, new URL(file_uri).getFile());
download_csv_to_disk(saved_csv_file, file_uri);
// init sensor after saving to get broken csvs also
var sensor = make_sensor_from_csv(saved_csv_file, file_uri);
put_data_into_influxdb(saved_csv_file, sensor);
Main.glogger.info("uploading sensor " + file_uri);
try (var session = Main.driver.session(SessionConfig.forDatabase(Main.database_name))) {
......@@ -68,6 +73,54 @@ class DayUploader {
};
}
private static void put_data_into_influxdb(File saved_csv_file, Sensor sensor) throws FileNotFoundException, IOException {
var tag_columns = List.of("sensor_id", "sensor_type", "location");
var example_parsing_exception = new Object(){ NumberFormatException e = null; };
try (var csv = new BufferedReader(new FileReader(saved_csv_file))) {
var index_of_field = get_index_of_field_map(csv.readLine());
var field_columns = new HashSet<>(index_of_field.keySet());
field_columns.remove("timestamp");
field_columns.removeAll(tag_columns);
csv.lines()
.map(line -> line.split(";", -1))
.forEach(data -> {
var point = Point.measurement("sensor")
.time(Util.parseAnyDateTime(data[index_of_field.get("timestamp")]), WritePrecision.NS);
for (var tag : tag_columns) {
point.addTag(tag, (String) sensor.properties.get(tag));
}
for (var field : field_columns) {
try {
point.addField(field, Double.parseDouble(data[index_of_field.get(field)]));
} catch (NumberFormatException e) {
example_parsing_exception.e = e;
}
}
Main.influxdb_write_api.writePoint(point);
}
);
}
if (example_parsing_exception.e != null) {
Main.glogger.fine("there were lines with columns where I could not parse double from string \"%s\" due to for example " + example_parsing_exception.e);
}
Main.influxdb_write_api.flush();
}
private static Map<String, Integer> get_index_of_field_map(String header_line) throws IOException {
var header = header_line.split(";", -1);
var index_of_field = new HashMap<String, Integer>();
for (int i = 0; i < header.length; ++i) {
index_of_field.put(header[i], i);
}
return index_of_field;
}
private static Sensor make_sensor_from_csv(File saved_csv_file, String file_uri) throws Exception {
try (var csv = new BufferedReader(new FileReader(saved_csv_file))) {
return Sensor.fromCSV(file_uri, read_one_csv_line(csv));
......
package sensor2graph;
import com.influxdb.client.*;
import org.neo4j.driver.*;
import org.neo4j.driver.Config.*;
import okhttp3.OkHttpClient;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
......@@ -31,6 +34,8 @@ public class Main {
public static int process_timeout_minutes;
private static String http_agent;
private static String neo4j_uri;
private static String influxdb_uri;
private static String influxdb_database_name;
private static File neo4j_server_certificate_file;
private static String archive_uri;
private static String sensor_type_info;
......@@ -41,6 +46,8 @@ public class Main {
// }
public static Driver driver;
public static InfluxDBClient influxDBClient;
public static WriteApi influxdb_write_api;
public static String handle_registry_session_id;
public static java.util.logging.Logger glogger;
private static java.util.logging.Logger logger;
......@@ -49,7 +56,7 @@ public class Main {
logger = java.util.logging.Logger.getLogger(Main.class.getName());
}
private static void initialize_driver() {
private static void initialize_neo4j_driver() {
driver = GraphDatabase.driver(
neo4j_uri,
AuthTokens.basic(System.getenv("SENSOR2GRAPH_GRAPHDB_USER"), System.getenv("SENSOR2GRAPH_GRAPHDB_PASS")),
......@@ -67,6 +74,29 @@ public class Main {
);
}
private static void initialize_influxdb_driver() {
// inspired by https://github.com/influxdata/influxdb-client-java/blob/86bda85ca6f0acf5bdea9883645f47e93d501003/client/src/main/java/com/influxdb/client/InfluxDBClientFactory.java#L173 with added timeout
influxDBClient = InfluxDBClientFactory.create(
InfluxDBClientOptions.builder()
.url(Main.influxdb_uri)
.authenticateToken(String.format("%s:%s",
System.getenv("SENSOR2GRAPH_INFLUXDB_USER"),
System.getenv("SENSOR2GRAPH_INFLUXDB_PASS")
).toCharArray())
.bucket(Main.influxdb_database_name)
.org("-")
.okHttpClient(
new OkHttpClient().newBuilder()
.connectTimeout(10, TimeUnit.MINUTES)
.readTimeout(10, TimeUnit.MINUTES)
.writeTimeout(10, TimeUnit.MINUTES)
)
.build()
).enableGzip();
influxdb_write_api = influxDBClient.makeWriteApi();
}
private static Set<String> parse_line_separated_values_file(String fname) throws Exception {
File file = new File(fname);
if (file.createNewFile()) {
......@@ -80,14 +110,12 @@ public class Main {
}
}
private static void run() throws Exception {
try_uploading_failed_sensors();
upload_all_days();
try_uploading_failed_sensors();
}
private static void try_uploading_failed_sensors() throws Exception {
Main.logger.info("Trying again to upload failed csv files in case of transient errors");
......@@ -166,26 +194,32 @@ public class Main {
System.setProperty("http.agent", http_agent);
Main.logger.fine("initializing neo4j driver.");
initialize_driver();
initialize_neo4j_driver();
try {
Handle.http_client = Handle.get_client();
test_auth();
pre_run_graphdb_actions();
run();
} catch (org.neo4j.driver.exceptions.AuthenticationException e) {
Main.logger.log(Level.SEVERE, "Authentication failed, are your credentials correct?");
} catch (Exception e) {
Main.logger.log(Level.SEVERE, "An unrecoverable exception occured, terminating...", e);
Main.logger.fine("initializing influxdb driver.");
initialize_influxdb_driver();
try {
try {
Handle.http_client = Handle.get_client();
test_auth();
pre_run_graphdb_actions();
run();
} catch (org.neo4j.driver.exceptions.AuthenticationException e) {
Main.logger.log(Level.SEVERE, "Authentication failed, are your credentials correct?");
} catch (Exception e) {
Main.logger.log(Level.SEVERE, "An unrecoverable exception occured, terminating...", e);
}
} finally {
influxdb_write_api.close();
influxDBClient.close();
}
} finally {
driver.close();
}
}
private static void initialize_config_vars() throws Exception{
private static void initialize_config_vars() throws Exception {
Properties config = load_user_defined_properties_with_defaults(get_default_properties());
already_uploaded_days_file = config.getProperty("already_uploaded_days_file" );
......@@ -204,6 +238,8 @@ public class Main {
pid_reg_server_cert = config.getProperty("pid_reg_server_cert" );
pid_reg_keystore = config.getProperty("pid_reg_keystore" );
process_timeout_minutes = Integer.parseInt(config.getProperty("process_timeout_minutes" ));
influxdb_uri = config.getProperty("influxdb_uri" );
influxdb_database_name = config.getProperty("influxdb_database_name" );
}
......@@ -232,6 +268,9 @@ public class Main {
defaults.setProperty("pid_reg_server_cert", "pid_reg_server_cert.pem" );
defaults.setProperty("pid_reg_keystore", "pid_reg_keystore.p12" );
defaults.setProperty("process_timeout_minutes", "60" );
defaults.setProperty("influxdb_uri", "https://metrics.gwdg.de:8086" );
defaults.setProperty("influxdb_database_name", "openforecast" );
return defaults;
}
......@@ -279,7 +318,7 @@ public class Main {
private static void test_auth() {
Main.logger.fine("testing authentication");
Main.logger.fine("testing neo4j authentication");
try (Session session = Main.driver.session(SessionConfig.forDatabase(Main.database_name))) {
session.readTransaction(tx -> tx.run(new Query("match (n) return n limit 3")));
}
......
......@@ -16,7 +16,7 @@ import java.util.logging.*;
import java.util.regex.*;
class Sensor {
private Map<String, Object> properties = new HashMap<String, Object>();
public Map<String, Object> properties = new HashMap<String, Object>();
public void process(Session session, TransactionConfig transaction_config) throws Exception {
var rec = session.writeTransaction(tx -> {
......
......@@ -8,6 +8,8 @@ import java.util.stream.*;
import javax.xml.parsers.*;
import org.w3c.dom.*;
import org.xml.sax.*;
import java.time.*;
import java.time.format.*;
public class Util {
private static int timeout_millis = 120000;
......@@ -63,6 +65,46 @@ public class Util {
}
Files.writeString(Path.of(fname), line + '\n', StandardOpenOption.APPEND);
}
public static Instant parseAnyDateTime(String str) {
// formatters for times directly convertible to Instant
for (var formatter : List.of(
DateTimeFormatter.ISO_INSTANT,
DateTimeFormatter.ISO_OFFSET_DATE_TIME
)) {
try {
return formatter.parse(str, Instant::from);
} catch (DateTimeParseException ex) {}
}
// formatters for local times assumed to be UTC
try {
return Instant.from(
DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(str, LocalDateTime::from)
.atOffset(ZoneOffset.UTC)
);
} catch (DateTimeParseException ex) {}
// save until Sa 20. Nov 18:46:40 CET 2286
var as_long = Long.parseLong(str);
switch (str.length()) {
case 10:
return Instant.ofEpochSecond(as_long);
case 13:
return Instant.ofEpochMilli(as_long);
case 16:
// may suffer from overlow
return Instant.ofEpochSecond(0, as_long * 1000);
case 19:
return Instant.ofEpochSecond(0, as_long);
}
throw new DateTimeException("what kind of timestamp is this supposed to be?: " + str);
}
/*
public static void main(String[] args) throws MalformedURLException, IOException {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment