/* * Copyright (c) 2018 Markus Hennecke * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice appear in all copies. * * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ #include #include #include #include #include #include #include #include "db.h" #include static PGconn *conn = NULL; static bool atexit_regdone = false; static bool _exec_in_tx(struct kreq *, const char *, const char * const *, const size_t); static bool _query(struct kreq *, char **, const char *, const char *, const char *); bool db_connect(struct kreq *r) { if (conn) return true; conn = PQconnectdb( "postgresql://" DATABASE_USER ":" DATABASE_PASSWORD "@" DATABASE_HOSTNAME ":" DATABASE_PORT "/weatherdata" "?application_name=weatherdata_input"); if (PQstatus(conn) != CONNECTION_OK) { kutil_warnx(r, NULL, "Connection to database failed: %s", PQerrorMessage(conn)); } if (!atexit_regdone) atexit_regdone = (atexit(db_disconnect) == 0); return (PQstatus(conn) == CONNECTION_OK); } void db_disconnect(void) { if (conn) { PQfinish(conn); conn = NULL; } } bool _exec_in_tx(struct kreq *r, const char *sql, const char * const params[], const size_t n) { if (! db_connect(r)) return false; PGresult *res = PQexec(conn, "BEGIN"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { kutil_warnx(r, NULL, "BEGIN command failed: %s", PQerrorMessage(conn)); PQclear(res); return false; } PQclear(res); res = PQexecParams(conn, sql, n, NULL, params, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_COMMAND_OK) { kutil_warnx(r, NULL, "stmt \"%s\" failed: %s", sql, PQerrorMessage(conn)); PQclear(res); return false; } PQclear(res); res = PQexec(conn, "COMMIT"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { kutil_warnx(r, NULL, "COMMIT command failed: %s", PQerrorMessage(conn)); PQclear(res); return false; } PQclear(res); return true; } bool db_add_error(struct kreq *r, const char *device, const char *timestamp, const char *errormsg) { if (timestamp == NULL) { const char * const params[2] = { device, errormsg }; const char * sql = "INSERT INTO \"errors\" (" "\"device\", \"error\") VALUES ($1, $2)"; return _exec_in_tx(r, sql, params, 2); } else { const char * const params[3] = { device, timestamp, errormsg }; const char * sql = "INSERT INTO \"errors\" (" "\"device\", \"timestamp\", \"error\"" ") VALUES ($1, $2, $3)"; return _exec_in_tx(r, sql, params, 3); } } bool db_add_data(struct kreq *r, const char *temp, const char *humidity, const char *pressure, const char *battery) { const char * const params[4] = { temp, humidity, pressure, battery }; const char * sql = "INSERT INTO \"data\" (" "\"timestamp\", \"temp\", \"humidity\", \"pressure\", \"battery\") " "VALUES (NOW(), $1, $2, $3, $4)"; return _exec_in_tx(r, sql, params, 4); } /* * Accepts SQL query strings and replaces %s%s%s with a WHERE statement * depending on the passed _from and _until parameters. If both parameters * are not NULL the WHERE statement looks like this: * WHERE (timestamp >= $1::timestamptz AND timestamp < $2::timestamptz) * For only one parameter being not NULL corresponding part of the * statement above is used alone. * If both _from and _until are NULL an empty string will be used. */ static bool _query(struct kreq *r, char **_json, const char *_sqlfmt, const char *_from, const char *_until) { if (! db_connect(r)) return false; int num_params = 0; if (_from) num_params += 1; if (_until) num_params += 1; const char *where = (num_params != 0) ? " WHERE (" : ""; const char *from = (_from) ? "\"timestamp\" >= $1::timestamptz" : ""; const char *until = (_until) ? ( (num_params == 2) ? " AND \"timestamp\" < $2::timestamptz)" : "\"timestamp\" < $1::timestamptz)") : (num_params == 1) ? ")" : ""; char *sql; if (asprintf(&sql, _sqlfmt, where, from, until) == -1) { kutil_warnx(r, NULL, "Unable to create query string"); return false; } //kutil_info(r, NULL, "query = %s", sql); const char * const params[2] = { ((num_params == 1) && _until) ? _until: _from, _until }; PGresult *res = PQexecParams(conn, sql, num_params, NULL, params, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_TUPLES_OK) { kutil_warnx(r, NULL, "Query %s failed: %s", sql, PQerrorMessage(conn)); PQclear(res); free(sql); return false; } free(sql); const int rows = PQntuples(res); if (rows == 0) { *_json = strdup("[]"); PQclear(res); return true; } else if (rows != 1) { kutil_warnx(r, NULL, "Got %d rows, no json?", rows); PQclear(res); return false; } *_json = strdup(PQgetvalue(res, 0, 0)); PQclear(res); return true; } bool db_get_statistics_data_json(struct kreq *r, char **_json, const char *_from, const char *_until) { return _query(r, _json, "SELECT coalesce(row_to_json(\"d\"), '[]'::json)" " AS \"data\" " "FROM " "(WITH \"statistics\" AS (" "SELECT \"timestamp\"::date," " round(min(\"temp\")::numeric, 2) AS \"min_temp\"," " round(max(\"temp\")::numeric, 2) AS \"max_temp\"," " round(avg(\"humidity\")::numeric, 2) AS \"avg_humidity\" " "FROM \"data\" WHERE \"timestamp\"::date IN (" "SELECT DISTINCT ON (\"timestamp\"::date) " "\"timestamp\"::date FROM \"data\" " "%s%s%s) " "GROUP BY \"timestamp\"::date " "ORDER BY \"timestamp\"::date ASC) " "SELECT array_agg(\"timestamp\") AS \"labels\", " " array_agg(\"min_temp\") AS \"min_temp\"," " array_agg(\"max_temp\") AS \"max_temp\"," " array_agg(\"avg_humidity\") AS \"avg_humidity\" " "FROM \"statistics\") \"d\"", _from, _until); } bool db_get_data_json(struct kreq *r, char **_json, const char *_from, const char *_until) { return _query(r, _json, "SELECT coalesce(row_to_json(\"d\"), '[]'::json)" " AS \"data\" " "FROM " "(WITH \"weatherdata\" AS (" "SELECT \"timestamp\", \"temp\", \"humidity\"," " \"pressure\", \"battery\" FROM \"data\" %s%s%s " "ORDER BY \"timestamp\" ASC) " "SELECT array_agg(\"timestamp\") AS \"labels\", " " array_agg(\"temp\") AS \"temp\", " " array_agg(\"humidity\") AS \"humidity\", " " array_agg(\"pressure\") AS \"pressure\", " " array_agg(\"battery\") AS \"battery\" " "FROM \"weatherdata\") \"d\"", _from, _until); } bool db_get_latest_json(struct kreq *r, char **_json, const char *_from_unused, const char *_until_unused) { return _query(r, _json, "SELECT coalesce(row_to_json(\"d\"), '[]'::json)" " AS \"data\" " "FROM " "(SELECT date_trunc('second', \"timestamp\") as \"timestamp\"," " \"avg_temp\", \"avg_humidity\", \"temp\"," " \"humidity\", \"pressure\", \"battery\" FROM (" "SELECT 'j' AS j," " round(avg(\"temp\")::numeric, 2) AS \"avg_temp\", " " round(avg(\"humidity\")::numeric, 2) AS \"avg_humidity\" " "FROM \"data\" " "WHERE \"timestamp\" >= now() - '1 hour'::interval" ") AS \"averages\" " "JOIN" " (SELECT 'j' AS \"j\", \"temp\", \"humidity\"," " \"pressure\", \"battery\", \"timestamp\" " "FROM \"data\" " "WHERE \"timestamp\" = " "(SELECT max(\"timestamp\") FROM \"data\"))" " AS \"latest\" ON \"averages\".\"j\" = \"latest\".\"j\") \"d\"", NULL, NULL); } bool db_get_errors_json(struct kreq *r, char **json, const char *device) { if (! db_connect(r)) return false; const char *select = "SELECT coalesce(jsonb_agg(\"e\"), '[]'::jsonb)" " AS \"errors\" " "FROM " "(SELECT " " date_trunc('second', \"timestamp\") AS \"timestamp\"," " \"device\", \"error\" FROM \"errors\") AS \"e\""; const char *where = (device == NULL) ? "" : " WHERE \"device\" = ?"; char *sql; if (-1 == asprintf(&sql, "%s%s", select, where)) { kutil_warnx(r, NULL, "Unable to create query string"); return false; } const size_t num_params = (device != NULL) ? 1 : 0; //kutil_info(r, NULL, "query = %s", sql); const char * const params[1] = { device, }; PGresult *res = PQexecParams(conn, sql, num_params, NULL, params, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_TUPLES_OK) { kutil_warnx(r, NULL, "Query %s failed: %s", sql, PQerrorMessage(conn)); PQclear(res); free(sql); return false; } free(sql); const int rows = PQntuples(res); if (rows == 0) { *json = strdup("[]"); PQclear(res); return true; } else if (rows != 1) { kutil_warnx(r, NULL, "Got %d rows, no json?", rows); PQclear(res); return false; } *json = strdup(PQgetvalue(res, 0, 0)); PQclear(res); return true; }