基本完成

This commit is contained in:
2024-05-03 16:15:40 +08:00
parent 1267ba6c90
commit ad78dad7c7
24 changed files with 566 additions and 190 deletions

35
src/common.cpp Normal file
View File

@@ -0,0 +1,35 @@
# include <hpcstat/common.hpp>
# include <boost/filesystem.hpp>
# include <boost/process.hpp>
# include <boost/dll.hpp>
namespace hpcstat
{
std::optional<std::string> exec
(std::filesystem::path program, std::vector<std::string> args, std::optional<std::string> stdin)
{
namespace bp = boost::process;
bp::ipstream output;
bp::opstream input;
std::unique_ptr<bp::child> process;
if (stdin)
{
process = std::make_unique<bp::child>
(program.string(), bp::args(args), bp::std_out > output, bp::std_err > stderr, bp::std_in < input);
input << *stdin;
input.close();
}
else process = std::make_unique<bp::child>
(program.string(), bp::args(args), bp::std_out > output, bp::std_err > stderr, bp::std_in < bp::null);
process->wait();
if (process->exit_code() != 0) return std::nullopt;
std::stringstream ss;
ss << output.rdbuf();
return ss.str();
}
long now()
{
return std::chrono::duration_cast<std::chrono::seconds>
(std::chrono::system_clock::now().time_since_epoch()).count();
}
}

19
src/env.cpp Normal file
View File

@@ -0,0 +1,19 @@
# include <iostream>
# include <hpcstat/env.hpp>
# include <fmt/format.h>
# include <unistd.h>
namespace hpcstat::env
{
bool interactive() { return isatty(fileno(stdin)); }
std::optional<std::string> env(std::string name, bool required)
{
if (auto value = std::getenv(name.c_str()); !value)
{
if (required) std::cerr << fmt::format("Failed to get environment variable {}\n", name);
return std::nullopt;
}
else return value;
}
// XDG_SESSION_ID HPCSTAT_SUBACCOUNT SSH_CONNECTION
}

17
src/keys.cpp Normal file
View File

@@ -0,0 +1,17 @@
# include <hpcstat/keys.hpp>
namespace hpcstat
{
std::map<std::string, Key> Keys
{
{ "LNoYfq/SM7l8sFAy325WpC+li+kZl3jwST7TmP72Tz8", { "chn", "Haonan Chen" } },
{ "VJT5wgkb2RcIeVNTA+/NKxokctbYnJ/KgH6IxrKqIGE", { "gb", "Bin Gong" } },
{ "umC3/RB1vS8TQBHsY3IzhOiyqVrOSw2fB3rIpDQSmf4", { "xll", "Leilei Xiang" } },
{ "fdq5k13N2DAzIK/2a1Mm4/ZVsDUgT623TSOXsVswxT8", { "yjq", "Junqi Yao" } },
{ "8USxEYi8ePPpLhk5FYBo2udT7/NFmEe8c2+oQajGXzA", { "zem", "Enming Zhang" } },
{ "7bmG24muNsaAZkCy7mQ9Nf2HuNafmvUO+Hf1bId9zts", { "00", "Yaping Wu" } },
{ "dtx0QxdgFrXn2SYxtIRz43jIAH6rLgJidSdTvuTuews", { "01", "Jing Li" } },
{ "8crUO9u4JiVqw3COyjXfzZe87s6XZFhvi0LaY0Mv6bg", { "02", "Huahan Zhan" } },
{ "QkmIYw7rmDEAP+LDWxm6L2/XLnAqTwRUB7B0pxYlOUs", { "03", "Na Gao" } }
};
}

100
src/lfs.cpp Normal file
View File

@@ -0,0 +1,100 @@
# include <regex>
# include <iostream>
# include <set>
# include <hpcstat/lfs.hpp>
# include <hpcstat/common.hpp>
# include <hpcstat/env.hpp>
# include <boost/process.hpp>
# include <fmt/format.h>
# include <nlohmann/json.hpp>
namespace hpcstat::lfs
{
std::optional<std::pair<unsigned, std::string>> bsub(std::vector<std::string> args)
{
if (auto bsub = env::env("HPCSTAT_BSUB", true); !bsub)
return std::nullopt;
else
{
std::set<std::string> valid_args = { "J" "q" "n" "R" "o" };
for (auto it = args.begin(); it != args.end(); it++)
{
if (it->length() > 0 && (*it)[0] == '-')
{
if (!valid_args.contains(it->substr(1)))
{
std::cerr << fmt::format("Unknown bsub argument: {}\n", *it)
<< "bsub might support this argument, but hpcstat currently does not support it.\n"
"If you are sure this argument is supported by bsub,\n"
"please submit issue on [github](https://github.com/CHN-beta/hpcstat) or contact chn@chn.moe.\n";
return std::nullopt;
}
if (it + 1 != args.end() && ((it + 1)->length() == 0 || (*(it + 1))[0] != '-')) it++;
}
else break;
}
if (auto result = exec(*bsub, args); result) return std::nullopt;
else
{
// Job <462270> is submitted to queue <normal_1day>.
std::regex re(R"r(Job <(\d+)> is submitted to queue <(\w+)>.)r");
std::smatch match;
if (std::regex_search(*result, match, re))
return std::make_pair(std::stoi(match[1]), match[2]);
else
{
std::cerr << fmt::format("Failed to parse job id from output: {}\n", *result);
return std::nullopt;
}
}
}
}
std::optional<std::map<unsigned, std::tuple<std::string, std::string, double>>> bjobs_list()
{
if
(
auto result = exec
(
boost::process::search_path("bjobs").string(),
{ "-a", "-o", "jobid submit_time stat cpu_used", "-json" }
);
!result
)
return std::nullopt;
else
{
nlohmann::json j;
try { j = nlohmann::json::parse(*result); }
catch (nlohmann::json::parse_error& e)
{
std::cerr << fmt::format("Failed to parse bjobs output: {}\n", e.what());
return std::nullopt;
}
std::map<unsigned, std::tuple<std::string, std::string, double>> jobs;
for (auto& job : j["RECORDS"])
{
std::string status = job["STAT"];
if (!std::set<std::string>{ "DONE", "EXIT" }.contains(status)) continue;
std::string submit_time = job["SUBMIT_TIME"];
std::string cpu_used_str = job["CPU_USED"];
double cpu_used = std::stof(cpu_used_str.substr(0, cpu_used_str.find(' ')));
jobs[std::stoi(job["JOBID"].get<std::string>())] = { submit_time, status, cpu_used };
}
return jobs;
}
}
std::optional<std::string> bjobs_detail(unsigned jobid)
{
if
(
auto result = exec
(
boost::process::search_path("bjobs").string(),
{ "-l", std::to_string(jobid) }
);
!result
)
return std::nullopt;
else return *result;
}
}

View File

@@ -1,213 +1,112 @@
# include <vector>
# include <string>
# include <optional>
# include <sstream>
# include <map>
# include <filesystem>
# include <regex>
# include <iostream>
# include <chrono>
# include <boost/filesystem.hpp>
# include <boost/process.hpp>
# include <boost/dll.hpp>
# include <hpcstat/sql.hpp>
# include <hpcstat/ssh.hpp>
# include <hpcstat/env.hpp>
# include <hpcstat/common.hpp>
# include <hpcstat/keys.hpp>
# include <hpcstat/lfs.hpp>
# include <fmt/format.h>
# include <fmt/ranges.h>
# include <zxorm/zxorm.hpp>
using namespace std::literals;
// ssh fingerprint -> username
const std::map<std::string, std::string> Username
{
{ "LNoYfq/SM7l8sFAy325WpC+li+kZl3jwST7TmP72Tz8", "Haonan Chen" },
{ "VJT5wgkb2RcIeVNTA+/NKxokctbYnJ/KgH6IxrKqIGE", "Bin Gong" },
{ "umC3/RB1vS8TQBHsY3IzhOiyqVrOSw2fB3rIpDQSmf4", "Leilei Xiang" },
{ "fdq5k13N2DAzIK/2a1Mm4/ZVsDUgT623TSOXsVswxT8", "Junqi Yao" },
{ "8USxEYi8ePPpLhk5FYBo2udT7/NFmEe8c2+oQajGXzA", "Enming Zhang" },
{ "7bmG24muNsaAZkCy7mQ9Nf2HuNafmvUO+Hf1bId9zts", "Yaping Wu" },
{ "dtx0QxdgFrXn2SYxtIRz43jIAH6rLgJidSdTvuTuews", "Jing Li" },
{ "8crUO9u4JiVqw3COyjXfzZe87s6XZFhvi0LaY0Mv6bg", "Huahan Zhan" },
{ "QkmIYw7rmDEAP+LDWxm6L2/XLnAqTwRUB7B0pxYlOUs", "Na Gao" }
};
// program path, set at start of main, e.g. /gpfs01/.../bin/hpcstat
std::filesystem::path Program;
// run a program, wait until it exit, return its stdout if it return 0, otherwise nullopt
std::optional<std::string> exec(boost::filesystem::path program, std::vector<std::string> args)
{
namespace bp = boost::process;
bp::ipstream output;
auto process = bp::child
(program, bp::args(args), bp::std_out > output, bp::std_err > stderr, bp::std_in < bp::null);
process.wait();
if (process.exit_code() != 0) return std::nullopt;
std::stringstream ss;
ss << output.rdbuf();
return ss.str();
}
// detect ssh fingerprint using ssh-add
// always assume sha256 fingerprint
std::optional<std::vector<std::string>> fingerprints()
{
auto output =
exec(Program.replace_filename("ssh-add"), { "-l" });
if (!output) { std::cerr << "Failed to get ssh fingerprints\n"; return std::nullopt; }
auto fingerprint = output->substr(0, 47);
// search for all strings that match the fingerprint pattern: sha256:...
std::regex pattern(R"r(\b(?:sha|SHA)256:([0-9A-Za-z+/=]{43})\b)r");
std::smatch match;
std::vector<std::string> fingerprints;
for (auto i = std::sregex_iterator(output->begin(), output->end(), pattern); i != std::sregex_iterator(); i++)
fingerprints.push_back(i->str(1));
return fingerprints;
}
// get an authenticated fingerprint and username
std::optional<std::pair<std::string, std::string>> authenticated()
{
auto fps = fingerprints();
if (!fps) return std::nullopt;
for (auto& fp : *fps)
if (Username.contains(fp)) return std::make_pair(fp, Username.at(fp));
std::cerr << fmt::format("No valid fingerprint found, available fingerprints: {}\n", *fps);
return std::nullopt;
}
// initialize the database
struct LoginData
{
unsigned Id = 0; long Time = 0;
std::string Key, SessionId;
std::optional<std::string> Subaccount;
bool Interactive;
};
using LoginTable = zxorm::Table
<
"login", LoginData,
zxorm::Column<"id", &LoginData::Id, zxorm::PrimaryKey<>>,
zxorm::Column<"time", &LoginData::Time>,
zxorm::Column<"key", &LoginData::Key>,
zxorm::Column<"session_id", &LoginData::SessionId>,
zxorm::Column<"sub_account", &LoginData::Subaccount>,
zxorm::Column<"interactive", &LoginData::Interactive>
>;
struct LogoutData { unsigned Id = 0; long Time = 0; std::string SessionId; };
using LogoutTable = zxorm::Table
<
"logout", LogoutData,
zxorm::Column<"id", &LogoutData::Id, zxorm::PrimaryKey<>>,
zxorm::Column<"time", &LogoutData::Time>, zxorm::Column<"sessionid", &LogoutData::SessionId>
>;
struct SubmitJobData
{ unsigned Id = 0; long Time = 0; int JobId; std::string Key, SessionId, Subaccount, SubmitDir, JobCommand; };
using SubmitJobTable = zxorm::Table
<
"submitjob", SubmitJobData,
zxorm::Column<"id", &SubmitJobData::Id, zxorm::PrimaryKey<>>,
zxorm::Column<"time", &SubmitJobData::Time>,
zxorm::Column<"job_id", &SubmitJobData::JobId>,
zxorm::Column<"key", &SubmitJobData::Key>,
zxorm::Column<"session_id", &SubmitJobData::SessionId>,
zxorm::Column<"sub_account", &SubmitJobData::Subaccount>,
zxorm::Column<"submit_dir", &SubmitJobData::SubmitDir>,
zxorm::Column<"job_command", &SubmitJobData::JobCommand>
>;
struct FinishJobData { unsigned Id = 0; long Time = 0; int JobId; std::string JobResult; double CpuTime; };
using FinishJobTable = zxorm::Table
<
"finishjob", FinishJobData,
zxorm::Column<"id", &FinishJobData::Id, zxorm::PrimaryKey<>>,
zxorm::Column<"time", &FinishJobData::Time>,
zxorm::Column<"job_id", &FinishJobData::JobId>,
zxorm::Column<"job_result", &FinishJobData::JobResult>,
zxorm::Column<"cpu_time", &FinishJobData::CpuTime>
>;
struct QueryJobData { unsigned Id = 0; int JobId; };
using QueryJobTable = zxorm::Table
<
"queryjob", QueryJobData,
zxorm::Column<"id", &QueryJobData::Id, zxorm::PrimaryKey<>>, zxorm::Column<"job_id", &QueryJobData::JobId>
>;
void initdb()
{
auto dbfile = Program.replace_filename("hpcstat.db").string();
zxorm::Connection<LoginTable, LogoutTable, SubmitJobTable, FinishJobTable, QueryJobTable>
conn(dbfile.c_str());
conn.create_tables();
}
void writedb(auto value)
{
auto dbfile = Program.replace_filename("hpcstat.db").string();
zxorm::Connection<LoginTable, LogoutTable, SubmitJobTable, FinishJobTable, QueryJobTable>
conn(dbfile.c_str());
value.Time = std::chrono::duration_cast<std::chrono::seconds>
(std::chrono::system_clock::now().time_since_epoch()).count();
conn.insert_record(value);
}
bool interactive() { return isatty(fileno(stdin)); }
// get value of XDG_SESSION_ID
std::optional<std::string> session_id()
{
if (auto value = std::getenv("XDG_SESSION_ID"); !value)
{ std::cerr << "Failed to get session id\n"; return std::nullopt; }
else return value;
}
// get value of HPCSTAT_SUBACCOUNT
std::optional<std::string> subaccount()
{ if (auto value = std::getenv("HPCSTAT_SUBACCOUNT"); value) return value; else return std::nullopt; }
# include <range/v3/view.hpp>
int main(int argc, const char** argv)
{
using namespace hpcstat;
std::vector<std::string> args(argv, argv + argc);
Program = boost::dll::program_location().string();
if (args.size() == 1) { std::cout << "Usage: hpcstat initdb|login|logout|submitjob|finishjob\n"; return 1; }
else if (args[1] == "initdb")
initdb();
{
if (!sql::initdb()) { std::cerr << "Failed to initialize database\n"; return 1; }
}
else if (args[1] == "login")
{
std::cout << "Checking your ssh certification..." << std::flush;
if (auto key = authenticated(); !key) return 1;
else if (auto session = session_id(); !session) return 1;
std::cout << "Communicating with the agent..." << std::flush;
if (auto fp = ssh::fingerprint(); !fp) return 1;
else if (auto session = env::env("XDG_SESSION_ID", true); !session)
return 1;
else
{
writedb(LoginData
{.Key = key->first, .SessionId = *session, .Subaccount = subaccount(), .Interactive = interactive()});
std::cout << fmt::format("\33[2K\rLogged in as {}.\n", key->second);
sql::LoginData data
{
.Time = now(), .Key = *fp, .SessionId = *session, .Subaccount = env::env("HPCSTAT_SUBACCOUNT"),
.Ip = env::env("SSH_CONNECTION"), .Interactive = env::interactive()
};
auto signature = ssh::sign(sql::serialize(data), *fp);
if (!signature) return 1;
data.Signature = *signature;
sql::writedb(data);
std::cout << fmt::format("\33[2K\rLogged in as {}.\n", *fp);
}
}
else if (args[1] == "logout")
{
if (auto session = session_id(); !session) return 1;
else writedb(LogoutData{.SessionId = *session});
if (auto session_id = env::env("XDG_SESSION_ID", true); !session_id)
return 1;
else sql::writedb(sql::LogoutData{ .Time = now(), .SessionId = *session_id });
}
else if (args[1] == "submitjob")
{
if (args.size() < 4) { std::cerr << "Usage: hpcstat submitjob <jobid> <submitdir> <jobcommand>\n"; return 1; }
if (auto key = authenticated(); !key) return 1;
else if (auto session = session_id(); !session) return 1;
else writedb(SubmitJobData
if (args.size() < 3) { std::cerr << "Usage: hpcstat submitjob <args passed to bsub>\n"; return 1; }
if (auto fp = ssh::fingerprint(); !fp) return 1;
else if (auto session = env::env("XDG_SESSION_ID", true); !session)
return 1;
else if
(auto bsub = lfs::bsub(args | ranges::views::drop(2) | ranges::to<std::vector<std::string>>); !bsub)
return 1;
else
{
.JobId = std::stoi(args[2]), .Key = key->first, .SessionId = *session,
.SubmitDir = std::filesystem::current_path().string(),
.JobCommand = [&]
{ std::stringstream ss; for (int i = 3; i < args.size(); i++) ss << args[i] << " "; return ss.str(); }()
});
sql::SubmitJobData data
{
.Time = now(), .JobId = bsub->first, .Key = *fp, .SessionId = *session,
.SubmitDir = std::filesystem::current_path().string(),
.JobCommand = args | ranges::views::drop(2) | ranges::views::join(' ') | ranges::to<std::string>(),
.Subaccount = env::env("HPCSTAT_SUBACCOUNT"), .Ip = env::env("SSH_CONNECTION")
};
auto signature = ssh::sign(sql::serialize(data), *fp);
if (!signature) return 1;
data.Signature = *signature;
sql::writedb(data);
std::cout << fmt::format
("Job {} was submitted to {} by {}.\n", bsub->first, bsub->second, Keys[*fp].Username);
}
}
else if (args[1] == "finishjob")
{
}
else
{
std::cerr << "Unknown command\n";
return 1;
if (auto fp = ssh::fingerprint(); !fp) return 1;
else if (auto session = env::env("XDG_SESSION_ID", true); !session)
return 1;
else if (auto all_jobs = lfs::bjobs_list(); !all_jobs) return 1;
else if
(
auto not_recorded = sql::finishjob_remove_existed
(
*all_jobs
| ranges::views::transform([](auto& it) { return std::pair{ it.first, std::get<0>(it.second) }; })
| ranges::to<std::map<unsigned, std::string>>
);
!not_recorded
)
return 1;
else for (auto jobid : *not_recorded)
{
if (auto detail = lfs::bjobs_detail(jobid); !detail) return 1;
else
{
sql::FinishJobData data
{
.Time = now(), .JobId = jobid, .JobResult = std::get<1>(all_jobs->at(jobid)),
.SubmitTime = std::get<1>(all_jobs->at(jobid)), .CpuTime = std::get<2>(all_jobs->at(jobid)),
};
if
(
auto signature = ssh::sign(sql::serialize(data), *fp);
!signature
)
return 1;
else { data.Signature = *signature; sql::writedb(data); }
}
}
}
else { std::cerr << "Unknown command.\n"; return 1; }
return 0;
}

55
src/sql.cpp Normal file
View File

@@ -0,0 +1,55 @@
# include <filesystem>
# include <set>
# include <hpcstat/sql.hpp>
# include <hpcstat/env.hpp>
# include <range/v3/range.hpp>
# include <range/v3/view.hpp>
namespace hpcstat::sql
{
std::string serialize(auto data)
{
auto [serialized_data_byte, out] = zpp::bits::data_out();
out(data).or_throw();
static_assert(sizeof(char) == sizeof(std::byte));
return { reinterpret_cast<char*>(serialized_data_byte.data()), serialized_data_byte.size() };
}
template std::string serialize(LoginData);
template std::string serialize(SubmitJobData);
template std::string serialize(FinishJobData);
std::optional<zxorm::Connection<LoginTable, LogoutTable, SubmitJobTable, FinishJobTable>> connect()
{
if (auto datadir = env::env("HPCSTAT_DATADIR", true); !datadir)
return std::nullopt;
else
{
auto dbfile = std::filesystem::path(*datadir) / "hpcstat.db";
return std::make_optional<zxorm::Connection<LoginTable, LogoutTable, SubmitJobTable, FinishJobTable>>
(dbfile.c_str());
}
}
bool initdb()
{ if (auto conn = connect(); !conn) return false; else { conn->create_tables(); return true; } }
bool writedb(auto value)
{ if (auto conn = connect(); !conn) return false; else { conn->insert_record(value); return true; } }
template bool writedb(LoginData);
template bool writedb(LogoutData);
template bool writedb(SubmitJobData);
template bool writedb(FinishJobData);
std::optional<std::set<unsigned>> finishjob_remove_existed(std::map<unsigned, std::string> jobid_submit_time)
{
if (auto conn = connect(); !conn) return std::nullopt;
else
{
auto all_job = jobid_submit_time | ranges::views::keys | ranges::to<std::vector<unsigned>>;
auto not_logged_job = all_job | ranges::to<std::set<unsigned>>;
for (auto it : conn->select_query<FinishJobData>()
.order_by<FinishJobTable::field_t<"id">>(zxorm::order_t::DESC)
.where_many(FinishJobTable::field_t<"id">().in(all_job))
.exec())
if (jobid_submit_time[it.JobId] == it.SubmitTime)
not_logged_job.erase(it.JobId);
return not_logged_job;
}
}
}

84
src/ssh.cpp Normal file
View File

@@ -0,0 +1,84 @@
# include <filesystem>
# include <iostream>
# include <regex>
# include <hpcstat/ssh.hpp>
# include <hpcstat/keys.hpp>
# include <hpcstat/env.hpp>
# include <hpcstat/common.hpp>
# include <fmt/format.h>
# include <boost/filesystem.hpp>
# include <boost/process.hpp>
# include <boost/dll.hpp>
namespace hpcstat::ssh
{
std::optional<std::string> fingerprint()
{
if (auto datadir = env::env("HPCSTAT_DATADIR", true); !datadir)
return std::nullopt;
else if
(
auto output =
exec(std::filesystem::path(*datadir) / "ssh-add", { "-l" });
!output
)
{ std::cerr << "Failed to get ssh fingerprints\n"; return std::nullopt; }
else
{
std::regex pattern(R"r(\b(?:sha|SHA)256:([0-9A-Za-z+/=]{43})\b)r");
std::smatch match;
for
(
auto i = std::sregex_iterator(output->begin(), output->end(), pattern);
i != std::sregex_iterator(); i++
)
if (Keys.contains(i->str(1))) return i->str(1);
std::cerr << fmt::format("No valid fingerprint found in:\n{}\n", *output);
return std::nullopt;
}
}
std::optional<std::string> sign(std::string message, std::string fingerprint)
{
if (auto datadir = env::env("HPCSTAT_DATADIR", true); !datadir)
return std::nullopt;
else if
(
auto output = exec
(
std::filesystem::path(*datadir) / "ssh-keygen",
{
"-Y", "sign",
"-f", fmt::format("{}/keys/{}", *datadir, Keys[fingerprint].PubkeyFilename),
"-n", "hpcstat@chn.moe", "-"
},
message
);
!output
)
{ std::cerr << fmt::format("Failed to sign message: {}\n", message); return std::nullopt; }
else return *output;
}
bool verify(std::string message, std::string signature, std::string fingerprint)
{
if (auto datadir = env::env("HPCSTAT_DATADIR", true); !datadir)
return false;
else
{
namespace bf = boost::filesystem;
auto tempdir = bf::temp_directory_path() / bf::unique_path();
bf::create_directories(tempdir);
auto signaturefile = tempdir / "signature";
std::ofstream(signaturefile) << signature;
return exec
(
std::filesystem::path(*datadir) / "ssh-keygen",
{
"-Y", "verify",
"-f", fmt::format("{}/keys/{}", *datadir, Keys[fingerprint].PubkeyFilename),
"-n", "hpcstat@chn.moe", "-s", signaturefile.string()
},
message
).has_value();
}
}
}