1use super::core::*;
2use super::posts::core::OkuPost;
3use iroh_docs::rpc::client::docs::Entry;
4use iroh_docs::AuthorId;
5use log::error;
6use miette::IntoDiagnostic;
7use native_db::*;
8use native_model::{native_model, Model};
9use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
10use serde::{Deserialize, Serialize};
11use std::hash::{Hash, Hasher};
12use std::{collections::HashSet, time::SystemTime};
13
/// A user record stored in the database, keyed by the bytes of its author ID.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[native_model(id = 1, version = 1)]
#[native_db(
    primary_key(author_id -> Vec<u8>)
)]
pub struct OkuUser {
    /// The author ID identifying this user; its bytes form the primary key.
    pub author_id: AuthorId,
    /// NOTE(review): presumably the time this user's data was last fetched — confirm against the code that sets it.
    pub last_fetched: SystemTime,
    /// Document entries associated with this user (presumably the user's posts — confirm).
    pub posts: Vec<Entry>,
    /// The identity details recorded for this user, if any.
    pub identity: Option<OkuIdentity>,
}
30
31impl PartialEq for OkuUser {
32 fn eq(&self, other: &Self) -> bool {
33 self.author_id == other.author_id
34 }
35}
// Marker impl: declares the `author_id`-based equality of `OkuUser` to be a full equivalence relation.
impl Eq for OkuUser {}
37impl Hash for OkuUser {
38 fn hash<H: Hasher>(&self, state: &mut H) {
39 self.author_id.hash(state);
40 }
41}
42
43impl OkuUser {
44 fn author_id(&self) -> Vec<u8> {
45 self.author_id.as_bytes().to_vec()
46 }
47}
48
/// Identity details attached to a user: a name plus sets of followed and blocked authors.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone, Default)]
pub struct OkuIdentity {
    /// The user's name (presumably a display name — confirm).
    pub name: String,
    /// Author IDs this user follows.
    pub following: HashSet<AuthorId>,
    /// Author IDs this user has blocked.
    pub blocked: HashSet<AuthorId>,
}
61
62impl OkuDatabase {
63 pub fn upsert_user(&self, user: &OkuUser) -> miette::Result<Option<OkuUser>> {
73 let rw = self.database.rw_transaction().into_diagnostic()?;
74 let old_value: Option<OkuUser> = rw.upsert(user.to_owned()).into_diagnostic()?;
75 rw.commit().into_diagnostic()?;
76 Ok(old_value)
77 }
78
79 pub fn delete_user(&self, user: &OkuUser) -> miette::Result<OkuUser> {
89 let rw = self.database.rw_transaction().into_diagnostic()?;
90 let removed_user = rw.remove(user.to_owned()).into_diagnostic()?;
91 rw.commit().into_diagnostic()?;
92 Ok(removed_user)
93 }
94
95 pub fn delete_users(&self, users: &[OkuUser]) -> miette::Result<Vec<OkuUser>> {
105 let rw = self.database.rw_transaction().into_diagnostic()?;
106 let removed_users = users
107 .iter()
108 .filter_map(|user| rw.remove(user.to_owned()).ok())
109 .collect();
110 rw.commit().into_diagnostic()?;
111 Ok(removed_users)
112 }
113
114 pub fn delete_users_with_posts(&self, users: &[OkuUser]) -> miette::Result<Vec<OkuPost>> {
124 Ok(self
125 .delete_users(users)?
126 .par_iter()
127 .filter_map(|x| self.get_posts_by_author(&x.author_id).ok())
128 .collect::<Vec<_>>()
129 .into_par_iter()
130 .flat_map(|x| self.delete_posts(&x).ok())
131 .collect::<Vec<_>>()
132 .concat())
133 }
134
135 pub fn delete_by_author_ids(&self, author_ids: &Vec<AuthorId>) -> miette::Result<()> {
143 let users: Vec<_> = author_ids
144 .par_iter()
145 .filter_map(|x| self.get_user(x).ok().flatten())
146 .collect();
147 let posts: Vec<_> = author_ids
148 .into_par_iter()
149 .filter_map(|x| self.get_posts_by_author(x).ok())
150 .flatten()
151 .collect();
152 if let Err(e) = self.delete_users(&users) {
153 error!("{}", e);
154 }
155 if let Err(e) = self.delete_posts(&posts) {
156 error!("{}", e);
157 }
158 Ok(())
159 }
160
161 pub fn all_local_users(&self) -> Vec<AuthorId> {
169 let user_records: HashSet<_> = self
170 .get_users()
171 .unwrap_or_default()
172 .par_iter()
173 .map(|x| x.author_id)
174 .collect();
175 let post_record_users: HashSet<_> = self
176 .get_posts()
177 .unwrap_or_default()
178 .par_iter()
179 .map(|x| x.entry.author())
180 .collect();
181 user_records
182 .union(&post_record_users)
183 .map(|x| x.to_owned())
184 .collect()
185 }
186
187 pub fn get_users(&self) -> miette::Result<Vec<OkuUser>> {
193 let r = self.database.r_transaction().into_diagnostic()?;
194 r.scan()
195 .primary()
196 .into_diagnostic()?
197 .all()
198 .into_diagnostic()?
199 .collect::<Result<Vec<_>, _>>()
200 .into_diagnostic()
201 }
202
203 pub fn get_user(&self, author_id: &AuthorId) -> miette::Result<Option<OkuUser>> {
213 let r = self.database.r_transaction().into_diagnostic()?;
214 r.get()
215 .primary(author_id.as_bytes().to_vec())
216 .into_diagnostic()
217 }
218}