oku_fs/database/dht.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
use super::core::*;
use miette::IntoDiagnostic;
use native_db::*;
use native_model::{native_model, Model};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
#[native_model(id = 3, version = 1)]
#[native_db(
primary_key(primary_key -> (Vec<u8>, Vec<u8>))
)]
/// A record of a replica announcement on the DHT.
pub struct ReplicaAnnouncement {
/// The public key of the announcement.
#[primary_key]
pub key: Vec<u8>,
/// The signature of the announcement.
pub signature: Vec<u8>,
}
impl OkuDatabase {
/// Insert or update a replica announcement record.
///
/// # Arguments
///
/// * `announcement` - A replica announcement record to upsert.
///
/// # Returns
///
/// The previous record of the announcement, if one existed.
pub fn upsert_announcement(
&self,
announcement: &ReplicaAnnouncement,
) -> miette::Result<Option<ReplicaAnnouncement>> {
let rw = self.database.rw_transaction().into_diagnostic()?;
let old_value: Option<ReplicaAnnouncement> =
rw.upsert(announcement.to_owned()).into_diagnostic()?;
rw.commit().into_diagnostic()?;
Ok(old_value)
}
/// Insert or update multiple replica announcement records.
///
/// # Arguments
///
/// * `announcements` - A list of replica announcement records to upsert.
///
/// # Returns
///
/// A list containing the previous record of each announcement, if one existed.
pub fn upsert_announcements(
&self,
announcements: &[ReplicaAnnouncement],
) -> miette::Result<Vec<Option<ReplicaAnnouncement>>> {
let rw = self.database.rw_transaction().into_diagnostic()?;
let old_announcements: Vec<_> = announcements
.iter()
.cloned()
.filter_map(|announcement| rw.upsert(announcement).ok())
.collect();
rw.commit().into_diagnostic()?;
Ok(old_announcements)
}
/// Delete a replica announcement record.
///
/// # Arguments
///
/// * `announcement` - A replica announcement record to delete.
///
/// # Returns
///
/// The deleted replica announcement record.
pub fn delete_announcement(
&self,
announcement: &ReplicaAnnouncement,
) -> miette::Result<ReplicaAnnouncement> {
let rw = self.database.rw_transaction().into_diagnostic()?;
let removed_announcement = rw.remove(announcement.to_owned()).into_diagnostic()?;
rw.commit().into_diagnostic()?;
Ok(removed_announcement)
}
/// Delete multiple replica announcement records.
///
/// # Arguments
///
/// * `announcements` - A list of replica announcement records to delete.
///
/// # Returns
///
/// A list containing the deleted replica announcement records.
pub fn delete_announcements(
&self,
announcements: &[ReplicaAnnouncement],
) -> miette::Result<Vec<ReplicaAnnouncement>> {
let rw = self.database.rw_transaction().into_diagnostic()?;
let removed_announcements = announcements
.iter()
.filter_map(|announcement| rw.remove(announcement.to_owned()).ok())
.collect();
rw.commit().into_diagnostic()?;
Ok(removed_announcements)
}
/// Gets the replica announcements recorded by this node.
///
/// # Returns
///
/// The replica announcements recorded by this node.
pub fn get_announcements(&self) -> miette::Result<Vec<ReplicaAnnouncement>> {
let r = self.database.r_transaction().into_diagnostic()?;
r.scan()
.primary()
.into_diagnostic()?
.all()
.into_diagnostic()?
.collect::<Result<Vec<_>, _>>()
.into_diagnostic()
}
/// Gets a replica announcement record by its public key.
///
/// # Arguments
///
/// * `key` - The public key of the DHT announcement.
///
/// # Returns
///
/// A replica announcement record.
pub fn get_announcement(&self, key: &Vec<u8>) -> miette::Result<Option<ReplicaAnnouncement>> {
let r = self.database.r_transaction().into_diagnostic()?;
r.get().primary(key.to_owned()).into_diagnostic()
}
}