Compare commits

...

10 Commits

Author SHA1 Message Date
Vincent Breitmoser
0b69a2bc87 about: document /vks/v1/updates endpoint 2021-07-22 23:27:32 +02:00
Vincent Breitmoser
5aff7abd61 nginx: add cached route for the updates endpoint 2021-07-22 23:27:32 +02:00
Vincent Breitmoser
9965e61108 web: introduce a cache for loaded epochs 2021-07-22 23:27:32 +02:00
Vincent Breitmoser
e4696e5f1d web: introduce dynamic /updates endpoint 2021-07-22 23:27:32 +02:00
Vincent Breitmoser
17e28cac5a db: add read_log_epoch method 2021-07-22 23:27:32 +02:00
Vincent Breitmoser
b70b042194 db: represent update manifests as Vec<u32> 2021-07-21 09:45:20 +02:00
Vincent Breitmoser
9364f037f6 main: add merge_util with k-merge 2021-07-21 09:45:20 +02:00
Vincent Breitmoser
5de5e987b4 script: translate-log-to-epoch 2021-07-21 09:45:20 +02:00
Vincent Breitmoser
4b18a96fa0 db: log into epoch instead of date files 2021-07-21 09:45:20 +02:00
Justus Winter
adc00fa469 db: add update manifest data structures 2021-07-21 09:45:20 +02:00
14 changed files with 584 additions and 10 deletions

1
Cargo.lock generated
View File

@@ -844,7 +844,6 @@ version = "0.1.0"
dependencies = [
"anyhow 1.0.34 (registry+https://github.com/rust-lang/crates.io-index)",
"base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.19 (registry+https://github.com/rust-lang/crates.io-index)",
"fs2 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@@ -21,7 +21,6 @@ pathdiff = "0.1"
idna = "0.1"
fs2 = "0.4"
walkdir = "2.2"
chrono = "0.4"
zbase32 = "0.1.2"
[lib]

View File

@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, io::{BufRead, BufReader}};
use std::convert::TryFrom;
use std::fs::{OpenOptions, File, create_dir_all, read_link, remove_file, rename, set_permissions, Permissions};
use std::io::Write;
@@ -22,6 +22,8 @@ use tempfile::NamedTempFile;
use openpgp::Cert;
use openpgp_utils::POLICY;
use crate::updates::Epoch;
pub struct Filesystem {
tmp_dir: PathBuf,
@@ -401,6 +403,35 @@ impl Database for Filesystem {
Ok(())
}
/// Reads the update log for a completed epoch, returning the sorted
/// 32-bit fingerprint prefixes of all certs updated during that epoch.
///
/// Returns `Ok(None)` if no log file exists for the epoch, and an error
/// if the epoch has not yet completed (current or future epochs).
fn read_log_epoch(&self, epoch: Epoch) -> Result<Option<Vec<u32>>> {
    if epoch >= Epoch::current().expect("before end of time") {
        return Err(anyhow!("Epoch must be in the past to read"));
    }

    // Epoch log files are named after the epoch's decimal number.
    let path = self.keys_dir_log.join(epoch.to_string());
    let file = match std::fs::File::open(&path) {
        Ok(file) => file,
        // NOTE(review): any open error (not just NotFound, e.g. a
        // permission problem) is treated as "no data for this epoch".
        Err(_) => return Ok(None),
    };

    let mut result: Vec<u32> = Vec::new();
    for (i, line) in BufReader::new(file).lines().enumerate() {
        let line = line?;
        let mut fields = line.split_whitespace();
        // timestamp field - ignore
        fields.next()
            .ok_or_else(|| anyhow!("Malformed line {:?}:{}: {:?}", path, i + 1, line))?;
        // fingerprint field
        let field = fields.next()
            .ok_or_else(|| anyhow!("Malformed line {:?}:{}: {:?}", path, i + 1, line))?;
        // Parse only the leading 8 hex digits (4 bytes) of the fingerprint.
        // Use .get() rather than indexing: a field shorter than 8 bytes
        // must produce an error, not a panic.
        let prefix = field.get(0..8)
            .and_then(|p| u32::from_str_radix(p, 16).ok())
            .ok_or_else(|| anyhow!("Malformed fingerprint in line {:?}:{}", path, i + 1))?;
        result.push(prefix);
    }
    // Sorted output is required by Manifest::contains (binary search).
    result.sort_unstable();
    Ok(Some(result))
}
fn move_tmp_to_full(&self, file: NamedTempFile, fpr: &Fingerprint) -> Result<()> {
if self.dry_run {
return Ok(());

View File

@@ -6,8 +6,6 @@ use std::str::FromStr;
use openpgp::serialize::SerializeInto;
use chrono::prelude::Utc;
#[macro_use]
extern crate anyhow;
use anyhow::Result;
@@ -24,7 +22,6 @@ extern crate time;
extern crate url;
extern crate hex;
extern crate walkdir;
extern crate chrono;
extern crate zbase32;
use tempfile::NamedTempFile;
@@ -39,9 +36,11 @@ use openpgp::{
pub mod types;
use types::{Email, Fingerprint, KeyID};
use updates::Epoch;
pub mod wkd;
pub mod sync;
pub mod updates;
mod fs;
pub use self::fs::Filesystem as KeyDatabase;
@@ -179,6 +178,8 @@ pub trait Database: Sync + Send {
fn by_email(&self, email: &Email) -> Option<String>;
fn by_email_wkd(&self, email: &Email) -> Option<Vec<u8>>;
fn read_log_epoch(&self, epoch: Epoch) -> Result<Option<Vec<u32>>>;
/// Complex operation that updates a Cert in the database.
///
/// 1. Merge new Cert with old, full Cert
@@ -346,7 +347,7 @@ pub trait Database: Sync + Send {
}
fn get_current_log_filename(&self) -> String {
Utc::now().format("%Y-%m-%d").to_string()
Epoch::current().expect("not the end of time").to_string()
}
fn get_tpk_status(&self, fpr_primary: &Fingerprint, known_addresses: &[Email]) -> Result<TpkStatus> {

View File

@@ -135,6 +135,13 @@ impl FromStr for Fingerprint {
}
}
impl Fingerprint {
    /// Returns the raw fingerprint bytes as a slice.
    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }
}
#[derive(Serialize, Deserialize, Clone, Debug, Hash, PartialEq, Eq)]
pub struct KeyID([u8; 8]);

345
database/src/updates.rs Normal file
View File

@@ -0,0 +1,345 @@
use std::{convert::{TryFrom, TryInto}, fmt, io, ops};
use crate::types::Fingerprint;
/// The number of seconds in one epoch.
pub const SECONDS_PER_EPOCH: u64 = 1 << 15;
/// Set of certificate updates.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Manifest {
start: Epoch,
end: Epoch,
prefixes: Vec<u32>,
}
impl Manifest {
/// Magic string for Update Manifests.
pub const MAGIC: &'static [u8] = b"\xE4\x2B\xAF\xBD\xD5\x75\x77\x0A";
/// Creates a new Update Manifest
pub fn new<S, E>(start: S, end: E, prefixes: Vec<u32>)
-> anyhow::Result<Manifest>
where
S: Into<Epoch>,
E: Into<Epoch>,
{
let start = start.into();
let end = end.into();
if start <= end {
Ok(Manifest {
start,
end,
prefixes,
})
} else {
Err(anyhow::anyhow!("End epoch predates start epoch"))
}
}
/// Computes the prefix of the given Fingerprint.
fn prefix(fingerprint: &Fingerprint) -> u32 {
let mut prefix = [0u8; 4];
prefix.copy_from_slice(&fingerprint.as_bytes()[..4]);
u32::from_be_bytes(prefix)
}
/// Tests whether a cert is included in the Update Manifest.
///
/// Note: Due to the privacy-preserving nature of Update Manifests
/// that store only fingerprint prefixes, this may return false
/// positives.
pub fn contains(&self, fingerprint: &Fingerprint) -> bool {
self.prefixes.binary_search(&Self::prefix(fingerprint)).is_ok()
}
/// Tests whether an epoch is included in the Update Manifest.
pub fn contains_epoch(&self, epoch: Epoch) -> bool {
// Both start and end are inclusive, therefore:
self.start <= epoch && epoch <= self.end
}
/// Iterates over all epochs contained in this Update Manifest.
pub fn epochs(&self) -> impl Iterator<Item = Epoch> {
(self.start.0..self.end.0 + 1).into_iter()
.map(|n| Epoch(n))
}
/// Returns the number of epochs in this manifest.
pub fn epoch_count(&self) -> u32 {
self.end.0 - self.start.0
}
/// Returns the start epoch.
pub fn start(&self) -> Epoch {
self.start
}
/// Returns the end epoch.
pub fn end(&self) -> Epoch {
self.end
}
/// Returns the number of fingerprint prefixes in this Update
/// Manifest.
pub fn len(&self) -> usize {
self.prefixes.len()
}
/// Writes the Update Manifest to a Vec<u8>
pub fn to_vec(&self) -> Vec<u8> {
let mut result = Vec::with_capacity(Self::MAGIC.len() + 4 + 4 + self.len() * 4);
self.serialize(&mut result).expect("Writing to pre-allocated vector cannot fail.");
result
}
/// Writes the Update Manifest to the given `io::Write`r.
pub fn serialize(&self, sink: &mut dyn io::Write) -> io::Result<()> {
sink.write_all(Self::MAGIC)?;
self.start.serialize(sink)?;
self.end.serialize(sink)?;
for prefix in &self.prefixes {
sink.write_all(&prefix.to_be_bytes())?;
}
Ok(())
}
/// Reads the Epoch from the given `io::Read`er.
pub fn parse(source: &mut dyn io::Read) -> io::Result<Self> {
let mut magic = [0; 8];
source.read_exact(&mut magic)?;
if &magic[..] != Self::MAGIC {
return Err(io::Error::new(io::ErrorKind::Other,
anyhow::anyhow!("Bad magic string")));
}
let start = Epoch::parse(source)?;
let end = Epoch::parse(source)?;
if start > end {
return Err(io::Error::new(
io::ErrorKind::Other,
anyhow::anyhow!("End epoch predates start epoch")));
}
let mut prefixes = Vec::default();
let mut prefix = [0; 4];
'parse: loop {
let mut read = 0;
loop {
let n = source.read(&mut prefix[read..])?;
if n == 0 {
match read {
0 => break 'parse,
_ => return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Truncated fingerprint prefix")),
}
}
read += n;
if read == 4 {
prefixes.push(u32::from_be_bytes(prefix.clone()));
continue 'parse;
}
}
}
Ok(Manifest {
start,
end,
prefixes,
})
}
}
/// A fixed-length time slice (`SECONDS_PER_EPOCH` = 2^15 seconds, about
/// 9.1 hours) used to bucket update log entries. Wraps the number of
/// such slices elapsed since the UNIX epoch.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Epoch(u32);
impl fmt::Display for Epoch {
    /// Renders the epoch as its bare decimal number.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
impl From<u32> for Epoch {
fn from(e: u32) -> Self {
Epoch(e)
}
}
impl TryFrom<std::time::SystemTime> for Epoch {
    type Error = anyhow::Error;

    /// Converts a wall-clock time into the epoch containing it.
    /// Fails for times before the UNIX epoch.
    fn try_from(t: std::time::SystemTime) -> anyhow::Result<Self> {
        let secs = t.duration_since(std::time::UNIX_EPOCH)?.as_secs();
        Self::try_from_unix(secs)
    }
}
impl Epoch {
/// Returns the currently active Epoch.
pub fn current() -> anyhow::Result<Epoch> {
std::time::SystemTime::now().try_into()
}
/// Returns the epoch for the given UNIX epoch.
pub fn try_from_unix(t: u64) -> anyhow::Result<Self> {
Ok(Epoch((t / SECONDS_PER_EPOCH).try_into()?))
}
/// Returns the previous Epoch, if any.
pub fn pred(&self) -> Option<Epoch> {
self.0.checked_sub(1).map(|e| Epoch(e))
}
/// Returns the next Epoch, if any.
pub fn succ(&self) -> Option<Epoch> {
self.0.checked_add(1).map(|e| Epoch(e))
}
/// Returns the start unix timestamp of this Epoch.
pub fn unix_start(&self) -> u64 {
self.0 as u64 * SECONDS_PER_EPOCH
}
/// Returns an iterator over all epochs starting from this one to
/// `other`, in ascending order, excluding `other`.
pub fn until(&self, other: Epoch)
-> anyhow::Result<impl Iterator<Item = Epoch>> {
if *self > other {
return Err(anyhow::anyhow!("self is later than other"));
}
Ok((self.0..other.0).into_iter().map(|e| Epoch(e)))
}
/// Writes the Epoch to the given `io::Write`r.
pub fn serialize(&self, sink: &mut dyn io::Write) -> io::Result<()> {
sink.write_all(&self.0.to_be_bytes())
}
/// Reads the Epoch from the given `io::Read`er.
pub fn parse(source: &mut dyn io::Read) -> io::Result<Self> {
let mut be_bytes = [0; 4];
source.read_exact(&mut be_bytes)?;
Ok(Self(u32::from_be_bytes(be_bytes)))
}
}
impl std::str::FromStr for Epoch {
    type Err = anyhow::Error;

    /// Parses either a decimal epoch number or the literal "current",
    /// which resolves to the currently active epoch.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            "current" => Epoch::current(),
            _ => Ok(Epoch(s.parse()?)),
        }
    }
}
// Maps an epoch to its starting instant as a UTC `time::Tm` — used to
// build HTTP date headers (e.g. the Expires header on the updates
// endpoint).
impl From<Epoch> for time::Tm {
    fn from(e: Epoch) -> time::Tm {
        time::at_utc(time::Timespec::new(e.unix_start() as i64, 0))
    }
}
impl ops::Add<Epoch> for Epoch {
type Output = i64;
fn add(self, other: Epoch) -> i64 {
(self.0 as i64) + (other.0 as i64)
}
}
impl ops::Sub<Epoch> for Epoch {
type Output = i64;
fn sub(self, other: Epoch) -> i64 {
(self.0 as i64) - (other.0 as i64)
}
}
impl ops::Add<u32> for Epoch {
type Output = Self;
fn add(self, other: u32) -> Self {
Self(self.0 + other)
}
}
impl ops::Sub<u32> for Epoch {
type Output = Self;
fn sub(self, other: u32) -> Self {
Self(self.0 - other)
}
}
// Unit tests for Epoch and Manifest serialization.
#[cfg(test)]
mod tests {
    use super::*;

    // Sanity check: computing the currently active epoch must succeed.
    #[test]
    fn current_epoch() -> crate::Result<()> {
        let _ = Epoch::current()?;
        Ok(())
    }

    /// Checks serialization using the sample Update Manifest.
    ///
    /// ```text
    /// Below is a hexdump of an update manifest that covers epochs from 15296
    /// to 15322 (inclusive). This corresponds to the time range from
    /// 1985-11-18T22:35:28 through 1985-11-29T04:21:03 (inclusive). The
    /// observed updated certificates had five primary key fingerprint
    /// prefixes:
    ///
    /// 32144D9D, 65FB1218, 7E91F402, 9ED85A5E, EA71546A
    ///
    /// 00000000 e4 2b af bd d5 75 77 0a 00 00 3b c0 00 00 3b da
    /// 00000010 32 14 4d 9d 65 fb 12 18 7e 91 f4 02 9e d8 5a 5e
    /// 00000020 ea 71 54 6a
    /// ```
    fn sample_manifest() -> crate::Result<(Epoch, Epoch,
                                           Vec<Fingerprint>,
                                           &'static[u8])> {
        let start = Epoch(15296);
        let end = Epoch(15322);
        // Only the first four bytes of each fingerprint matter; the
        // remainder is filler.
        let fp0: Fingerprint =
            "32144D9DAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".parse()?;
        let fp1: Fingerprint =
            "65FB1218AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".parse()?;
        let fp2: Fingerprint =
            "7E91F402AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".parse()?;
        let fp3: Fingerprint =
            "9ED85A5EAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".parse()?;
        let fp4: Fingerprint =
            "EA71546AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".parse()?;
        let bytes = b"\
            \xe4\x2b\xaf\xbd\xd5\x75\x77\x0a\x00\x00\x3b\xc0\x00\x00\x3b\xda\
            \x32\x14\x4d\x9d\x65\xfb\x12\x18\x7e\x91\xf4\x02\x9e\xd8\x5a\x5e\
            \xea\x71\x54\x6a";
        Ok((start, end, vec![fp0, fp1, fp2, fp3, fp4], bytes))
    }

    // Parses the sample manifest bytes and checks bounds, membership,
    // and prefix count.
    #[test]
    fn parse() -> crate::Result<()> {
        let (start, end, fingerprints, bytes) = sample_manifest()?;
        let manifest = Manifest::parse(&mut io::Cursor::new(bytes))?;
        assert_eq!(manifest.start, start);
        assert_eq!(manifest.end, end);
        assert!(manifest.contains(&fingerprints[3]));
        assert!(manifest.contains(&fingerprints[1]));
        assert!(manifest.contains(&fingerprints[0]));
        assert!(manifest.contains(&fingerprints[4]));
        assert!(manifest.contains(&fingerprints[2]));
        assert_eq!(manifest.len(), 5);
        Ok(())
    }
}

View File

@@ -44,6 +44,19 @@
</p>
</li>
<li>
<tt>GET /vks/v1/updates/&lt;EPOCH&gt;</tt>
<p>
Retrieves a <a href="https://gitlab.com/hagrid-keyserver/keystore-updates">keystore update manifest</a>
from the specified <tt>epoch</tt> to the latest completed one.
An update manifest contains fingerprint prefixes of keys that received updates
during the requested time span.
This can be used by a client to find which keys in its local storage have received updates in a single request.
The server will return an HTTP <tt>404</tt> error if there is no data for the given time span.
The returned data has a content-type of <code>application/pgp-keystore-update-manifest</code>.
</p>
</li>
<li>
<tt>POST /vks/v1/upload</tt>
<p>

View File

@@ -76,6 +76,14 @@ location /vks {
# try_files /keys/links/by-email/$1/$2/$3 =404;
}
location /vks/v1/updates {
add_header 'Access-Control-Allow-Origin' '*' always;
etag off;
proxy_cache update_manifests;
proxy_pass http://127.0.0.1:8080;
}
add_header 'Access-Control-Allow-Origin' '*' always;
add_header 'Cache-Control' 'no-cache' always;
etag off;

View File

@@ -24,7 +24,8 @@ http {
limit_req_zone $limit_loose zone=search_email_loose:10m rate=1r/m;
limit_req_zone $binary_remote_addr zone=search_fpr_keyid:10m rate=5r/s;
proxy_cache_path /tmp/nginx_cache use_temp_path=off keys_zone=static_cache:10m;
proxy_cache_path /tmp/nginx_cache/static keys_zone=static_cache:1m use_temp_path=off;
proxy_cache_path /tmp/nginx_cache/updates keys_zone=update_manifests:1m use_temp_path=off inactive=480m;
proxy_cache_valid 200 5m;
server {

View File

@@ -38,6 +38,7 @@ mod i18n_helpers;
mod gettext_strings;
mod web;
mod template_helpers;
mod merge_util;
fn main() {
if let Err(e) = web::serve() {

61
src/merge_util.rs Normal file
View File

@@ -0,0 +1,61 @@
// Blog post: https://creativcoder.dev/merge-k-sorted-arrays-rust
// Merge k sorted arrays
use std::collections::BinaryHeap;
use std::cmp::Reverse;
use std::cmp::Ordering;
/// Cursor into one of the input slices, ordered by the element it
/// currently points at. Invariant: `idx < arr.len()` whenever the item
/// is inside the heap, so `get_item` cannot panic there.
#[derive(Debug, Eq)]
struct Item<'a> {
    arr: &'a [u32],
    idx: usize,
}

impl<'a> Item<'a> {
    /// Creates a cursor at the start of a non-empty slice.
    fn new(arr: &'a [u32]) -> Self {
        Self { arr, idx: 0 }
    }

    /// Returns the element currently pointed at.
    fn get_item(&self) -> u32 {
        self.arr[self.idx]
    }
}

impl PartialEq for Item<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.get_item() == other.get_item()
    }
}

impl PartialOrd for Item<'_> {
    // Delegate to `cmp` so the partial order is guaranteed consistent
    // with the total order.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Item<'_> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.get_item().cmp(&other.get_item())
    }
}

/// Merges k already-sorted slices into one sorted vector (k-way merge
/// via a min-heap; `Reverse` turns std's max-heap into a min-heap).
///
/// Empty input slices are skipped, so they no longer cause a panic
/// when the heap compares a cursor pointing past the end.
pub fn merge_vectors(arrays: Vec<&[u32]>) -> Vec<u32> {
    let mut sorted = Vec::with_capacity(arrays.iter().map(|a| a.len()).sum());
    let mut heap = BinaryHeap::with_capacity(arrays.len());
    for &arr in &arrays {
        if !arr.is_empty() {
            heap.push(Reverse(Item::new(arr)));
        }
    }
    while let Some(mut it) = heap.pop() {
        sorted.push(it.0.get_item());
        it.0.idx += 1;
        if it.0.idx < it.0.arr.len() {
            heap.push(it);
        }
    }
    sorted
}

View File

@@ -1,4 +1,3 @@
use rocket;
use rocket::http::{Header, Status};
use rocket::request;
use rocket::outcome::Outcome;
@@ -30,6 +29,8 @@ use crate::database::{Database, KeyDatabase, Query};
use crate::database::types::Fingerprint;
use crate::Result;
use crate::web::vks_updates::UpdateEpochCache;
use std::convert::TryInto;
mod hkp;
@@ -38,6 +39,7 @@ mod maintenance;
mod vks;
mod vks_web;
mod vks_api;
mod vks_updates;
mod debug_web;
use crate::web::maintenance::MaintenanceMode;
@@ -417,6 +419,8 @@ fn rocket_factory(mut rocket: rocket::Rocket) -> Result<rocket::Rocket> {
vks_web::verify_confirm_form,
vks_web::quick_upload,
vks_web::quick_upload_proceed,
// Update Manifests
vks_updates::get_update_manifest,
// Debug
debug_web::debug_info,
// HKP
@@ -443,6 +447,7 @@ fn rocket_factory(mut rocket: rocket::Rocket) -> Result<rocket::Rocket> {
let rate_limiter = configure_rate_limiter(rocket.config())?;
let maintenance_mode = configure_maintenance_mode(rocket.config())?;
let localized_template_list = configure_localized_template_list(rocket.config())?;
let update_cache = UpdateEpochCache::new();
println!("{:?}", localized_template_list);
let prometheus = configure_prometheus(rocket.config());
@@ -462,6 +467,7 @@ fn rocket_factory(mut rocket: rocket::Rocket) -> Result<rocket::Rocket> {
.manage(db_service)
.manage(rate_limiter)
.manage(localized_template_list)
.manage(update_cache)
.mount("/", routes);
if let Some(prometheus) = prometheus {
@@ -568,7 +574,7 @@ pub mod tests {
use std::io::Write;
use std::path::Path;
use tempfile::{tempdir, TempDir};
use super::rocket;
use rocket;
use rocket::local::{Client, LocalResponse};
use rocket::http::Status;
use rocket::http::ContentType;

85
src/web/vks_updates.rs Normal file
View File

@@ -0,0 +1,85 @@
use std::{collections::HashMap, sync::RwLock};
use rocket::http::hyper::header::{Expires, HttpDate};
use crate::{database::updates::{Epoch, Manifest}, merge_util::merge_vectors};
use crate::database::{Database, KeyDatabase};
use crate::Result;
const EPOCH_SERVE_LIMIT: u32 = 120;
/// Process-wide cache mapping each completed epoch to the sorted
/// fingerprint prefixes read from its on-disk update log.
pub struct UpdateEpochCache(RwLock<HashMap<Epoch, Vec<u32>>>);

impl UpdateEpochCache {
    /// Creates an empty cache.
    pub fn new() -> Self {
        Self(RwLock::new(HashMap::new()))
    }
}
/// HTTP responses produced by the update manifest endpoint.
#[derive(Responder)]
pub enum ManifestUpdateResponse {
    // Serialized Update Manifest, with an Expires header attached.
    #[response(status = 200, content_type = "application/pgp-keystore-update-manifest")]
    Binary(Vec<u8>, Expires),
    // No data available for the requested epoch range.
    #[response(status = 404)]
    NotFound(String),
    // Requested epoch is current or in the future.
    #[response(status = 400)]
    BadRequest(String),
}
#[get("/vks/v1/updates/<epoch>")]
pub fn get_update_manifest(
db: rocket::State<KeyDatabase>,
cache: rocket::State<UpdateEpochCache>,
epoch: u32,
) -> ManifestUpdateResponse {
let epoch_now = Epoch::current().unwrap();
let epoch_since = Epoch::from(epoch);
if epoch_since >= epoch_now {
return ManifestUpdateResponse::BadRequest("Requested epoch must be in the past and completed".to_owned());
}
if epoch_now - epoch_since > (EPOCH_SERVE_LIMIT as i64) {
return ManifestUpdateResponse::NotFound(
format!("Updafe manifest data only available for the last {} epochs", EPOCH_SERVE_LIMIT));
}
if let Err(e) = provision_cache_since(&db, &cache, &epoch_since) {
return ManifestUpdateResponse::NotFound(e.to_string());
}
let cache_lock = cache.0.read().expect("lock can't be poisoned");
let mut epoch_data: Vec<&[u32]> = Vec::with_capacity((epoch_now - epoch_since) as usize);
for e in epoch_since.until(epoch_now).expect("epoch_since is before epoch_now") {
if let Some(v) = cache_lock.get(&e) {
epoch_data.push(&v);
}
}
let prefixes = merge_vectors(epoch_data);
let manifest = Manifest::new(epoch_since, epoch_now.pred().unwrap(), prefixes).unwrap();
let expires = Expires(HttpDate(epoch_now.succ().expect("We're not at the end of time").into()));
ManifestUpdateResponse::Binary(manifest.to_vec(), expires)
}
/// Ensures the cache holds prefix data for every epoch from
/// `epoch_since` (inclusive) up to the current epoch (exclusive).
///
/// Fails if any epoch in that range has no on-disk log data. Also
/// evicts cache entries that have fallen out of the serve window.
fn provision_cache_since(db: &KeyDatabase, cache: &UpdateEpochCache, epoch_since: &Epoch) -> Result<()> {
    let epoch_now = Epoch::current().expect("not the end of time");
    let mut cache_lock = cache.0.write().expect("lock can't be poisoned");
    for epoch in epoch_since.until(epoch_now).expect("epoch_since is before epoch_now") {
        if cache_lock.contains_key(&epoch) {
            continue;
        }
        match db.read_log_epoch(epoch) {
            Err(e) => {
                // Log the underlying cause server-side, but don't leak
                // it to the client.
                eprintln!("{:?}", e);
                return Err(anyhow!("No update manifest data available for requested epoch"));
            },
            Ok(None) => return Err(anyhow!("No update manifest data available for requested epoch")),
            Ok(Some(prefixes)) => cache_lock.insert(epoch, prefixes),
        };
    }

    // Evict everything older than the serve window.
    let epoch_earliest = epoch_now - EPOCH_SERVE_LIMIT;
    cache_lock.retain(|k, _| *k > epoch_earliest);
    Ok(())
}

17
translate-log-to-epoch Executable file
View File

@@ -0,0 +1,17 @@
#!/usr/bin/env zsh
# this script translates hagrid's update logs from "per-day" format into "per-epoch" format.
# Input lines start with a UNIX timestamp; output files are named after
# the epoch number (timestamp / 2^15). Input must be in timestamp order.

(( $# > 0 )) || { echo "Usage: $0 logfiles.." >&2; exit 1; }

for f in "$@"; do
    integer next_epoch_time=0
    # IFS= preserves leading whitespace; -r keeps backslashes literal.
    while IFS= read -r line; do
        timestamp=${line%% *}
        if (( timestamp >= next_epoch_time )); then
            current_epoch=$(( timestamp / (1<<15) ))
            next_epoch_time=$(( (current_epoch + 1) * (1<<15) ))
            echo "writing epoch $current_epoch"
        fi
        # print -r --: zsh's echo would interpret backslash escapes and
        # treat a leading '-' as options, corrupting log lines.
        print -r -- "$line" >> "$current_epoch"
    done < "$f"
done