Commit 73ad5968 authored by j.hoerdt's avatar j.hoerdt
Browse files

Merge branch 'pids' into master

parents daada96c 008646db
......@@ -78,4 +78,14 @@ The latest artifacts can be browsed [here](https://gitlab.gwdg.de/j.hoerdt/senso
The internal api docs can be built with `./gradlew javadoc`.
1. Proceed with [point 2 of via jar file](#runjar).
\ No newline at end of file
1. Proceed with [point 2 of via jar file](#runjar).
## Developers:
### Updating dependencies:
Look for new versions here:
- https://search.maven.org/search?q=g:com.google.code.gson%20AND%20a:gson
- https://search.maven.org/search?q=g:org.neo4j.driver%20AND%20a:neo4j-java-driver
and update the entries in the `build.gradle` file.
......@@ -11,6 +11,7 @@ repositories {
dependencies {
testImplementation 'junit:junit:4.12'
implementation 'org.neo4j.driver:neo4j-java-driver:4.1.1'
implementation 'com.google.code.gson:gson:2.8.6'
}
......
......@@ -28,8 +28,7 @@ class DayUploader {
return () -> {
Main.glogger.fine("creating result stream");
Stream<Result> results = csv_file_uris.flatMap(file_uri -> {
Result res;
Stream<Integer> results = csv_file_uris.flatMap(file_uri -> {
Main.glogger.fine("creating session in thread " + Thread.currentThread().getName());
try (
......@@ -38,22 +37,15 @@ class DayUploader {
) {
csv.mark(2000);
Map<String, String> one_csv_line = read_one_csv_line(csv);
csv.reset();
File saved_csv_file = new File(Main.csv_directory, new URL(file_uri).getFile());
Files.createDirectories(saved_csv_file.getParentFile().toPath(), daily_folder_perms);
try (FileWriter csv_file_writer = new FileWriter(saved_csv_file)) {
Main.glogger.info("copying csv to " + saved_csv_file);
csv.transferTo(csv_file_writer);
}
save_csv_to_disk(file_uri, csv);
// init sensor after saving to get broken csvs also
Sensor sensor = Sensor.fromCSV(file_uri, one_csv_line);
Main.glogger.info("uploading sensor " + file_uri);
res = session.writeTransaction(tx -> tx.run(sensor.get_creation_query()), transaction_config);
Main.glogger.fine("uploaded sensor " + file_uri);
sensor.process(session, transaction_config);
Main.glogger.info("uploading sensor " + file_uri);
} catch(Exception e) {
Main.glogger.log(Level.SEVERE, "sensor " + file_uri + " not uploaded because: " + e.getMessage(), e);
try {
......@@ -66,7 +58,7 @@ class DayUploader {
}
Main.glogger.fine("closed session");
return Stream.of(res);
return Stream.of(1);
});
Main.glogger.fine("result stream complete, counting results");
......@@ -77,6 +69,15 @@ class DayUploader {
Main.glogger.info("finished uploading");
};
}
/**
 * Writes the remaining content of {@code csv} to a file below {@code Main.csv_directory}.
 * The location inside that directory is the file part of {@code file_uri}; missing parent
 * folders are created with {@code daily_folder_perms}.
 *
 * @param file_uri URI the csv was read from; parsed as a URL to derive the target location
 * @param csv      reader whose remaining content is copied to disk
 * @throws MalformedURLException if {@code file_uri} cannot be parsed as a URL
 * @throws IOException           if the folders or the file cannot be created or written
 */
private static void save_csv_to_disk(String file_uri, BufferedReader csv) throws MalformedURLException, IOException {
    String location_below_csv_directory = new URL(file_uri).getFile();
    File target = new File(Main.csv_directory, location_below_csv_directory);

    // the parent folders have to exist before the writer can open the file
    File target_folder = target.getParentFile();
    Files.createDirectories(target_folder.toPath(), daily_folder_perms);

    try (FileWriter sink = new FileWriter(target)) {
        Main.glogger.info("copying csv to " + target);
        csv.transferTo(sink);
    }
}
private static Map<String, String> read_one_csv_line(BufferedReader csv_reader) throws IOException {
......
......@@ -6,6 +6,7 @@ import org.neo4j.driver.Config.*;
import sensor2graph.webdirget.*;
import java.io.*;
import java.net.PasswordAuthentication;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
......@@ -16,6 +17,7 @@ import java.util.stream.*;
public class Main {
// configurable through command line arguments {
private static String config_file = "sensor2graph.properties";
public static PasswordAuthentication pid_registry_credentials;
// }
// configurable in configuration file {
......@@ -29,6 +31,8 @@ public class Main {
private static String neo4j_uri;
private static File neo4j_server_certificate_file;
private static String archive_uri;
public static String pid_registry_uri;
public static String prefix_for_new_pids;
// }
public static Driver driver;
......@@ -158,6 +162,9 @@ public class Main {
Main.logger.fine("initializing neo4j driver.");
initialize_driver(AuthTokens.basic(args[0], args[1]));
Main.logger.fine("setting pid registry credentials");
pid_registry_credentials = new PasswordAuthentication(args[2], args[3].toCharArray());
try {
test_auth();
run();
......@@ -184,6 +191,8 @@ public class Main {
neo4j_uri = config.getProperty("neo4j_uri" );
neo4j_server_certificate_file = new File(config.getProperty("neo4j_server_certificate_file"));
archive_uri = config.getProperty("archive_uri" );
pid_registry_uri = config.getProperty("pid_registry_uri" );
prefix_for_new_pids = config.getProperty("prefix_for_new_pids" );
}
......@@ -200,16 +209,18 @@ public class Main {
/**
 * Builds the built-in defaults for every key of the configuration file.
 * Each key is set exactly once.
 *
 * @return a fresh {@link Properties} object holding the default values
 */
private static Properties get_default_properties() {
    Properties defaults = new Properties();

    defaults.setProperty("already_uploaded_days_file",    "already_uploaded_days.txt"             );
    defaults.setProperty("failed_to_upload_sensors_file", "failed_to_upload.txt"                  );
    defaults.setProperty("database_name",                 "sensor"                                );
    defaults.setProperty("csv_directory",                 "data/csv_files/"                       );
    defaults.setProperty("geocoding_cache",               "data/geocoding_cache/"                 );
    defaults.setProperty("concurrency",                   "50"                                    );
    defaults.setProperty("http_agent",                    "Please/0.5"                            );
    defaults.setProperty("neo4j_uri",                     "bolt://graphdb.gwdg.de:7687"           );
    defaults.setProperty("neo4j_server_certificate_file", "graphdb.gwdg.de.pem"                   );
    defaults.setProperty("archive_uri",                   "https://archive.sensor.community/"     );
    defaults.setProperty("pid_registry_uri",              "http://vm04.pid.gwdg.de:8081/handles/" );
    defaults.setProperty("prefix_for_new_pids",           "21.T11998/"                            );

    return defaults;
}
......@@ -217,16 +228,16 @@ public class Main {
private static boolean check_args(String[] args) {
Main.logger.fine("checking for correct arg count");
if (args.length == 2) {
if (args.length == 4) {
return true;
}
if (args.length == 3) {
config_file = args[2];
if (args.length == 5) {
config_file = args[4];
return true;
}
System.out.println("Usage: <this_program> <username> <password> [config_file]");
System.out.println("Usage: <this_program> <graphdb_username> <graphdb_password> <pid_registry_username> <pid_registry_password> [config_file]");
return false;
}
......
......@@ -3,21 +3,106 @@ package sensor2graph;
import sensor2graph.georev.*;
import org.neo4j.driver.*;
import org.neo4j.driver.Record;
import com.google.gson.*;
import java.net.*;
import java.net.http.*;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.util.*;
import java.util.function.*;
import java.util.logging.*;
import java.util.regex.*;
class Sensor {
// Shared HTTP client used to send requests to the pid registry.
// Connection attempts time out after 20 seconds.
private static HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(20))
.authenticator(new Authenticator() {
// Hands out the credentials stored in Main.pid_registry_credentials
// whenever the client asks for authentication.
protected PasswordAuthentication getPasswordAuthentication() {
return Main.pid_registry_credentials;
}
}).build();
private Map<String, Object> properties = new HashMap<String, Object>();
public Query get_creation_query() {
/**
 * Runs the creation query for this sensor and keeps its pid record in sync.
 * If the sensor node has no pid yet, a new one is registered and written back to the node;
 * otherwise the existing pid record is updated.
 *
 * @param session            session used for the write transactions
 * @param transaction_config configuration applied to each write transaction
 * @throws Exception if a database transaction or a request to the pid registry fails
 */
public void process(Session session, TransactionConfig transaction_config) throws Exception {
    org.neo4j.driver.Record rec = session.writeTransaction(tx -> {
        Result res = tx.run(get_creation_query());
        return res.single();
    }, transaction_config);

    // Ask the driver value itself whether the pid is missing. Comparing the string form
    // with == only compares references and depends on string interning to ever be true.
    boolean pid_missing = rec.get("s.pid").isNull();

    // remembered so that put_pid_in_graphdb can address the node by its id
    properties.put("node_id", rec.get("id(s)").asObject());

    if (pid_missing) {
        // no pid exists for this sensor
        properties.put("pid", register_new_pid(rec));
        put_pid_in_graphdb(session, transaction_config);
    } else {
        update_pid(rec.get("s.pid").asString(), rec);
    }
}
/**
 * Stores the pid held in {@code properties} on the node whose id is held in
 * {@code properties} under {@code node_id}.
 *
 * @param session            session used for the write transaction
 * @param transaction_config configuration applied to the write transaction
 */
private void put_pid_in_graphdb(Session session, TransactionConfig transaction_config) {
    String cypher = "match (n) where id(n) = $node_id set n.pid = $pid";
    Query store_pid = new Query(cypher).withParameters(properties);
    session.writeTransaction(tx -> tx.run(store_pid), transaction_config);
}
/**
 * Registers a new pid below {@code Main.prefix_for_new_pids} for the sensor described by
 * {@code rec}.
 *
 * @param rec record returned by the creation query
 * @return the value of the {@code epic-pid} member of the registry's JSON answer
 * @throws Exception if the request to the pid registry fails
 */
private String register_new_pid(Record rec) throws Exception {
    String registry_answer = request_to_registry("POST", Main.prefix_for_new_pids, rec);
    JsonObject parsed_answer = new Gson().fromJson(registry_answer, JsonObject.class);
    return parsed_answer.get("epic-pid").getAsString();
}
/**
 * Sends the current sensor data to the pid registry with a PUT request for an existing pid.
 *
 * @param pid pid of the sensor, appended to the registry uri as request target
 * @param rec record returned by the creation query
 * @throws Exception if the request to the pid registry fails
 */
private void update_pid(String pid, Record rec) throws Exception {
// the response body is not needed here and is dropped
request_to_registry("PUT", pid, rec);
}
/**
 * Sends the pid record of this sensor to the pid registry.
 * The body is a JSON array of attributes: the sensor id as alternate identifier, the dates
 * of the first and last measurement, and the sensor type.
 *
 * @param method     HTTP method to use, e.g. {@code POST} or {@code PUT}
 * @param target_uri appended to {@code Main.pid_registry_uri} to form the request uri
 * @param rec        record returned by the creation query
 * @return the body of the registry's response
 * @throws Exception if sending fails or the response status is not 2xx
 */
private String request_to_registry(String method, String target_uri, Record rec) throws Exception {
    // The nested documents are built with Gson instead of string concatenation so that
    // quotes or backslashes in the values cannot break the JSON.
    JsonObject alternate_identifier = new JsonObject();
    alternate_identifier.addProperty("AlternateIdentifierValue", String.valueOf(properties.get("sensor_id")));
    alternate_identifier.addProperty("alternateIdentifierType", "serialNumber");
    JsonObject alternate_identifier_entry = new JsonObject();
    alternate_identifier_entry.add("AlternateIdentifier", alternate_identifier);
    JsonArray alternate_identifiers = new JsonArray();
    alternate_identifiers.add(alternate_identifier_entry);

    JsonArray dates = new JsonArray();
    dates.add(date_entry(rec.get("s.first_msg").asLocalDate().toString(), "first measurement"));
    dates.add(date_entry(rec.get("s.last_msg").asLocalDate().toString(), "last measurement"));

    JsonArray body = new JsonArray();
    // parsed_data carries the nested document in serialized form
    add_attribute(body, "21.T11148/eb3c713572f681e6c4c3", alternate_identifiers.toString());
    add_attribute(body, "21.T11148/22c62082a4d2d9ae2602", dates.toString());
    add_attribute(body, "21.T11148/709a23220f2c3d64d1e1", rec.get("type.sensor_type").asString());

    Main.glogger.fine("sent body: " + body);

    HttpRequest request = HttpRequest.newBuilder(URI.create(Main.pid_registry_uri + target_uri))
        .header("Content-Type", "application/json")
        .header("Accept", "application/json")
        .method(method, BodyPublishers.ofString(body.toString())).build();

    HttpResponse<String> response = client.send(request, BodyHandlers.ofString());

    Main.glogger.fine("status: " + response.statusCode());
    Main.glogger.fine("resp: " + response.body());

    // every status outside the 2xx range counts as a failure
    if (response.statusCode() / 100 != 2) {
        throw new Exception("request to pid registry failed with status code " + response.statusCode() + ", response: " + response.body());
    }

    return response.body();
}

// Builds one {"Date":{"date":...,"dateType":...}} entry of the dates attribute.
private static JsonObject date_entry(String date, String date_type) {
    JsonObject date_fields = new JsonObject();
    date_fields.addProperty("date", date);
    date_fields.addProperty("dateType", date_type);
    JsonObject entry = new JsonObject();
    entry.add("Date", date_fields);
    return entry;
}
/**
 * Appends one attribute object with the members {@code type} and {@code parsed_data}
 * to {@code body}.
 *
 * @param body            array the attribute is appended to
 * @param registered_type value of the {@code type} member
 * @param parsed_data     value of the {@code parsed_data} member
 */
private void add_attribute(JsonArray body, String registered_type, String parsed_data) {
    JsonObject attribute = new JsonObject();
    attribute.addProperty("type", registered_type);
    attribute.addProperty("parsed_data", parsed_data);
    body.add(attribute);
}
private Query get_creation_query() {
BiFunction<String, String, String> string_if_exists = (dependency, merge_string) -> (properties.get(dependency) != null ? merge_string : "");
Query template = new Query(
return new Query(
"with date($date) as date_ " +
string_if_exists.apply("country", "merge (country:Country {code: $country_code, name: $country})") +
string_if_exists.apply("state",
......@@ -49,10 +134,9 @@ class Sensor {
"set" +
" s.first_msg = case when date_ > s.first_msg then s.first_msg else date_ end," +
" s.last_msg = case when date_ < s.last_msg then s.last_msg else date_ end"
);
return template.withParameters(properties);
" s.last_msg = case when date_ < s.last_msg then s.last_msg else date_ end " +
"return s.pid, id(s), s.first_msg, s.last_msg, type.sensor_type"
).withParameters(properties);
}
private static Predicate<String> contains_indoor_string = Pattern.compile(".+indoor.csv").asMatchPredicate();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment