Commit a4e38a00 authored by neop's avatar neop
Browse files

simplify configuration system

- adding or modifying options does not require you to touch 3 places anymore
- don't softcode concurrency, process_timeout_minutes, http_agent
- finally restrict args count, introduced in 8bde568c
- config_file was a terrible global
- remove bad defaults for some options
parent b7c8acd8
*
!*/
# whitelist
!gradle/*
......
......@@ -5,3 +5,4 @@ failed_to_upload_sensors_file = /progress/failed_to_upload.txt
pid_reg_server_cert = /handle_auth/pid_reg_server_cert.pem
pid_reg_keystore = /handle_auth/pid_reg_keystore.p12
neo4j_uri=bolt://sensordata.open-forecast.eu:7687
influxdb_uri=https://metrics.gwdg.de:8086
......@@ -17,7 +17,7 @@ import org.neo4j.driver.*;
import sensor2graph.webdirget.*;
class DayUploader {
private static ForkJoinPool thread_pool = new ForkJoinPool(Main.concurrency);
private static ForkJoinPool thread_pool = new ForkJoinPool(10);
private static TransactionConfig transaction_config = TransactionConfig.builder().withTimeout(Duration.ofMinutes(1)).build();
private static FileAttribute<?> daily_folder_perms = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxr-xr-x"));
......@@ -34,7 +34,7 @@ class DayUploader {
Main.glogger.fine("creating session in thread " + Thread.currentThread().getName());
try {
var saved_csv_file = new File(Main.csv_directory, new URL(file_uri).getFile());
var saved_csv_file = new File(Main.config.getProperty("csv_directory"), new URL(file_uri).getFile());
download_csv_to_disk(saved_csv_file, file_uri);
......@@ -44,7 +44,7 @@ class DayUploader {
put_data_into_influxdb(saved_csv_file, sensor);
Main.glogger.info("uploading sensor " + file_uri);
try (var session = Main.driver.session(SessionConfig.forDatabase(Main.database_name))) {
try (var session = Main.driver.session(SessionConfig.forDatabase(Main.config.getProperty("database_name")))) {
sensor.process(session, transaction_config);
}
Main.glogger.fine("uploaded sensor " + file_uri);
......@@ -52,9 +52,9 @@ class DayUploader {
} catch(Exception e) {
Main.glogger.log(Level.SEVERE, "sensor " + file_uri + " not uploaded because: " + e.getMessage(), e);
try {
Util.append_to_file(Main.failed_to_upload_sensors_file, file_uri);
Util.append_to_file(Main.config.getProperty("failed_to_upload_sensors_file"), file_uri);
} catch (Exception e1) {
Main.glogger.severe("Could not write to failed sensors file " + Main.failed_to_upload_sensors_file + " because: " + e1.getMessage());
Main.glogger.severe("Could not write to failed sensors file " + Main.config.getProperty("failed_to_upload_sensors_file") + " because: " + e1.getMessage());
throw new RuntimeException(e1);
}
return Stream.empty();
......@@ -181,7 +181,7 @@ class DayUploader {
remove_daily_folder(new URL(daily_folder_url));
Main.glogger.fine("adding " + daily_folder_url + " to list of successful days");
Util.append_to_file(Main.already_uploaded_days_file, daily_folder_url);
Util.append_to_file(Main.config.getProperty("already_uploaded_days_file"), daily_folder_url);
}
public static void create_daily_archive(URL daily_folder_url) throws Exception {
......@@ -190,10 +190,10 @@ class DayUploader {
Main.glogger.info("archiving " + date + " to " + tarname);
Files.createDirectories(new File(Main.csv_directory, date).toPath(), daily_folder_perms);
Files.createDirectories(new File(Main.config.getProperty("csv_directory"), date).toPath(), daily_folder_perms);
var tar_zstd = new ProcessBuilder("tar", "--zstd", "-cf", tarname, date)
.directory(Main.csv_directory)
.directory(new File(Main.config.getProperty("csv_directory")))
.redirectErrorStream(true);
run_process(tar_zstd, "tar");
......@@ -206,7 +206,7 @@ class DayUploader {
//easier than stdlib:
var rm = new ProcessBuilder("rm", "-rf", date)
.directory(Main.csv_directory)
.directory(new File(Main.config.getProperty("csv_directory")))
.redirectErrorStream(true);
run_process(rm, "rm");
......@@ -216,7 +216,7 @@ class DayUploader {
long start_time = System.currentTimeMillis();
var p = process_builder.start();
if (p.waitFor(Main.process_timeout_minutes, TimeUnit.MINUTES) == false /*waiting time elapsed*/) {
if (p.waitFor(120, TimeUnit.MINUTES) == false /*waiting time elapsed*/) {
throw new TimeoutException(name + " process took too long");
}
......
......@@ -14,7 +14,7 @@ public class Handle {
private static TrustManager[] get_trust_managers() throws Exception {
var server_self_signed_certificate = CertificateFactory.getInstance("X.509")
.generateCertificate(new FileInputStream(Main.pid_reg_server_cert));
.generateCertificate(new FileInputStream(Main.config.getProperty("pid_reg_server_cert")));
var server_cert_public_key_store = KeyStore.getInstance(KeyStore.getDefaultType());
server_cert_public_key_store.load(null, null);
......@@ -27,7 +27,7 @@ public class Handle {
private static KeyManager[] get_key_managers() throws Exception {
var private_key_store = KeyStore.getInstance(KeyStore.getDefaultType());
private_key_store.load(new FileInputStream(Main.pid_reg_keystore), "asdfasdf".toCharArray());
private_key_store.load(new FileInputStream(Main.config.getProperty("pid_reg_keystore")), "asdfasdf".toCharArray());
var key_manager_factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
key_manager_factory.init(private_key_store, "asdfasdf".toCharArray());
return key_manager_factory.getKeyManagers();
......
......@@ -20,28 +20,7 @@ import java.util.logging.*;
import java.util.stream.*;
public class Main {
// configurable through command line arguments {
private static String config_file = "sensor2graph.properties";
// }
// configurable in configuration file {
public static String already_uploaded_days_file;
public static String failed_to_upload_sensors_file;
public static String database_name;
public static File csv_directory;
public static File geocoding_cache;
public static int concurrency;
public static int process_timeout_minutes;
private static String neo4j_uri;
private static String influxdb_uri;
private static String influxdb_database_name;
private static String archive_uri;
private static String sensor_type_info;
public static String pid_registry_uri;
public static String prefix_for_new_pids;
public static String pid_reg_server_cert;
public static String pid_reg_keystore;
// }
public static Properties config;
public static Driver driver;
public static InfluxDBClient influxDBClient;
......@@ -56,7 +35,7 @@ public class Main {
private static void initialize_neo4j_driver() {
driver = GraphDatabase.driver(
neo4j_uri,
config.getProperty("neo4j_uri"),
AuthTokens.basic(System.getenv("SENSOR2GRAPH_GRAPHDB_USER"), System.getenv("SENSOR2GRAPH_GRAPHDB_PASS")),
Config.builder()
.withLogging(Logging.console(Level.INFO))
......@@ -65,7 +44,7 @@ public class Main {
.withMaxConnectionLifetime(30, TimeUnit.SECONDS)
//.withMaxTransactionRetryTime(3, TimeUnit.SECONDS)
//.withLeakedSessionsLogging()
.withMaxConnectionPoolSize(concurrency)
.withMaxConnectionPoolSize(10)
.withEncryption()
.build()
);
......@@ -75,12 +54,12 @@ public class Main {
// inspired by https://github.com/influxdata/influxdb-client-java/blob/86bda85ca6f0acf5bdea9883645f47e93d501003/client/src/main/java/com/influxdb/client/InfluxDBClientFactory.java#L173 with added timeout
influxDBClient = InfluxDBClientFactory.create(
InfluxDBClientOptions.builder()
.url(Main.influxdb_uri)
.url(Main.config.getProperty("influxdb_uri"))
.authenticateToken(String.format("%s:%s",
System.getenv("SENSOR2GRAPH_INFLUXDB_USER"),
System.getenv("SENSOR2GRAPH_INFLUXDB_PASS")
).toCharArray())
.bucket(Main.influxdb_database_name)
.bucket(Main.config.getProperty("influxdb_database_name"))
.org("-")
.okHttpClient(
new OkHttpClient().newBuilder()
......@@ -116,13 +95,13 @@ public class Main {
private static void try_uploading_failed_sensors() throws Exception {
Main.logger.info("Trying again to upload failed csv files in case of transient errors");
Set<String> failed_csv_uris = parse_line_separated_values_file(failed_to_upload_sensors_file);
Set<String> failed_csv_uris = parse_line_separated_values_file(config.getProperty("failed_to_upload_sensors_file"));
Path backup = Path.of(failed_to_upload_sensors_file + ".backup");
Path backup = Path.of(config.getProperty("failed_to_upload_sensors_file") + ".backup");
//those who fail again will get rewritten
try {
Files.move(Path.of(failed_to_upload_sensors_file), backup);
Files.move(Path.of(config.getProperty("failed_to_upload_sensors_file")), backup);
} catch (FileAlreadyExistsException e) {
Main.logger.severe(backup + " already existed. Looks like the program was abruptly terminated last time. Consider manually restoring it by removing the .backup postfix.");
throw new Exception(e);
......@@ -132,13 +111,13 @@ public class Main {
try {
DayUploader.upload_from_csv_uris(failed_csv_uris.stream());
} catch (Exception e) {
Main.logger.log(Level.SEVERE, "uploading failed csvfiles failed fatally, restoring " + failed_to_upload_sensors_file, e);
Files.move(backup, Path.of(failed_to_upload_sensors_file), StandardCopyOption.REPLACE_EXISTING);
Main.logger.log(Level.SEVERE, "uploading failed csvfiles failed fatally, restoring " + config.getProperty("failed_to_upload_sensors_file"), e);
Files.move(backup, Path.of(config.getProperty("failed_to_upload_sensors_file")), StandardCopyOption.REPLACE_EXISTING);
throw e;
}
if (!Files.deleteIfExists(backup)) {
Main.logger.warning(failed_to_upload_sensors_file + " somehow did not exist anymore");
Main.logger.warning(config.getProperty("failed_to_upload_sensors_file") + " somehow did not exist anymore");
}
Main.logger.info("Finished trying to upload failed csv files");
......@@ -148,13 +127,13 @@ public class Main {
private static void upload_all_days() throws Exception {
Main.logger.info("Start to upload all sensors");
Set<String> already_uploaded_day_uris = parse_line_separated_values_file(already_uploaded_days_file);
Set<String> already_uploaded_day_uris = parse_line_separated_values_file(config.getProperty("already_uploaded_days_file"));
Stream<String> daily_folders;
try {
daily_folders = DirectoryIndexer.get_daily_folders_in_html(Util.reader_of_uri(archive_uri), archive_uri);
daily_folders = DirectoryIndexer.get_daily_folders_in_html(Util.reader_of_uri(Main.config.getProperty("archive_uri")), Main.config.getProperty("archive_uri"));
} catch (Exception e) {
throw new Exception("could not read list of folders from " + archive_uri, e);
throw new Exception("could not read list of folders from " + Main.config.getProperty("archive_uri"), e);
}
daily_folders
......@@ -176,11 +155,17 @@ public class Main {
public static void main(String[] args) {
Main.logger.fine("Commandline arguments are: " + Stream.of(args).collect(Collectors.joining(", ")));
if (!check_args(args)) return;
String config_file = null;
try {
config_file = parse_args(args);
} catch (Exception e) {
Main.logger.log(Level.SEVERE, "Usage: <this_program> [config_file]", e);
System.exit(-1);
}
Main.logger.fine("parsing config file");
try {
initialize_config_vars();
config = load_user_defined_properties_with_defaults(config_file, get_default_properties());
} catch (Exception e) {
Main.logger.log(Level.SEVERE, "Failed parsing the configuration file.", e);
System.exit(-1);
......@@ -218,31 +203,9 @@ public class Main {
}
private static void initialize_config_vars() throws Exception {
Properties config = load_user_defined_properties_with_defaults(get_default_properties());
already_uploaded_days_file = config.getProperty("already_uploaded_days_file" );
failed_to_upload_sensors_file = config.getProperty("failed_to_upload_sensors_file");
database_name = config.getProperty("database_name" );
csv_directory = new File(config.getProperty("csv_directory" ));
geocoding_cache = new File(config.getProperty("geocoding_cache" ));
concurrency = Integer.parseInt(config.getProperty("concurrency" ));
neo4j_uri = config.getProperty("neo4j_uri" );
archive_uri = config.getProperty("archive_uri" );
sensor_type_info = config.getProperty("sensor_type_info" );
pid_registry_uri = config.getProperty("pid_registry_uri" );
prefix_for_new_pids = config.getProperty("prefix_for_new_pids" );
pid_reg_server_cert = config.getProperty("pid_reg_server_cert" );
pid_reg_keystore = config.getProperty("pid_reg_keystore" );
process_timeout_minutes = Integer.parseInt(config.getProperty("process_timeout_minutes" ));
influxdb_uri = config.getProperty("influxdb_uri" );
influxdb_database_name = config.getProperty("influxdb_database_name" );
}
private static Properties load_user_defined_properties_with_defaults(Properties defaults) throws Exception {
private static Properties load_user_defined_properties_with_defaults(String fname, Properties defaults) throws Exception {
var config = new Properties(defaults);
config.load(new FileInputStream(config_file));
config.load(new FileInputStream(fname));
return config;
}
......@@ -254,47 +217,36 @@ public class Main {
defaults.setProperty("database_name", "neo4j" );
defaults.setProperty("csv_directory", "data/csv_files/" );
defaults.setProperty("geocoding_cache", "data/geocoding_cache/" );
defaults.setProperty("concurrency", "10" );
defaults.setProperty("neo4j_uri", "bolt://localhost:7687" );
defaults.setProperty("archive_uri", "https://archive.sensor.community/" );
defaults.setProperty("sensor_type_info", "sensor_type_info.json" );
defaults.setProperty("pid_registry_uri", "https://vm13.pid.gwdg.de:8000/api/handles/");
defaults.setProperty("prefix_for_new_pids", "21.11138/" );
defaults.setProperty("pid_reg_server_cert", "pid_reg_server_cert.pem" );
defaults.setProperty("pid_reg_keystore", "pid_reg_keystore.p12" );
defaults.setProperty("process_timeout_minutes", "60" );
defaults.setProperty("influxdb_uri", "https://metrics.gwdg.de:8086" );
defaults.setProperty("influxdb_database_name", "openforecast" );
return defaults;
}
private static boolean check_args(String[] args) {
private static String parse_args(String[] args) throws Exception {
Main.logger.fine("checking for correct arg count");
if (args.length == 0 || args.length == 4) {
return true;
if (args.length == 0) {
return "sensor2graph.properties";
}
if (args.length == 1) {
config_file = args[0];
return true;
}
if (args.length == 5) {
config_file = args[4];
return true;
return args[0];
}
System.out.println("Usage: <this_program> [config_file]");
return false;
throw new Exception("bad CLI arg count");
}
private static void pre_run_graphdb_actions() throws Exception {
Map<String, Object> parameters = new Gson().fromJson(new FileReader(sensor_type_info), new TypeToken <Map<String, Object>>() {}.getType());
try (var session = Main.driver.session(SessionConfig.forDatabase(Main.database_name))) {
Map<String, Object> parameters = new Gson().fromJson(new FileReader(Main.config.getProperty("sensor_type_info")), new TypeToken <Map<String, Object>>() {}.getType());
try (var session = Main.driver.session(SessionConfig.forDatabase(Main.config.getProperty("database_name")))) {
session.writeTransaction(tx -> {
tx.run(
"unwind $sensor_type_info as sensor_type_info\n" +
......@@ -314,7 +266,7 @@ public class Main {
private static void test_auth() {
Main.logger.fine("testing neo4j authentication");
try (Session session = Main.driver.session(SessionConfig.forDatabase(Main.database_name))) {
try (Session session = Main.driver.session(SessionConfig.forDatabase(Main.config.getProperty("database_name")))) {
session.readTransaction(tx -> tx.run(new Query("match (n) return n limit 3")));
}
}
......
......@@ -43,7 +43,7 @@ class Sensor {
}
private String register_new_pid(Record rec) throws Exception {
String response_body = request_to_registry("PUT", Main.prefix_for_new_pids + UUID.randomUUID().toString(), rec);
String response_body = request_to_registry("PUT", Main.config.getProperty("prefix_for_new_pids") + UUID.randomUUID().toString(), rec);
var created_pid = new Gson().fromJson(response_body, JsonObject.class).get("handle").getAsString();
Main.glogger.info("created pid: " + created_pid);
return created_pid;
......@@ -151,14 +151,14 @@ class Sensor {
Request request = new Request.Builder()
.url(Main.pid_registry_uri + target_uri)
.url(Main.config.getProperty("pid_registry_uri") + target_uri)
.method(method, RequestBody.create(body.toString(), MediaType.parse("application/json")))
.header("Accept", "application/json")
.header("Content-Type", "application/json")
.header("Authorization", "Handle clientCert=\"true\"")
.build();
Main.glogger.fine("request uri: " + Main.pid_registry_uri + target_uri);
Main.glogger.fine("request uri: " + Main.config.getProperty("pid_registry_uri") + target_uri);
try (Response response = Handle.http_client.newCall(request).execute()) {
var response_body = response.body().string();
......
......@@ -86,7 +86,7 @@ public class Util {
} catch (DateTimeParseException ex) {}
// save until Sa 20. Nov 18:46:40 CET 2286
// safe until 2262
var as_long = Long.parseLong(str);
switch (str.length()) {
case 10:
......@@ -96,7 +96,6 @@ public class Util {
return Instant.ofEpochMilli(as_long);
case 16:
// may suffer from overlow
return Instant.ofEpochSecond(0, as_long * 1000);
case 19:
......
......@@ -65,7 +65,7 @@ public class ReverseGeocoder {
}
private Document rom_cached_request(String query_params, DocumentBuilder document_builder) throws Exception {
File xml_geodata_file = new File(Main.geocoding_cache, query_params + ".xml");
File xml_geodata_file = new File(Main.config.getProperty("geocoding_cache"), query_params + ".xml");
Document xml_geodata;
if (xml_geodata_file.exists()) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment