1use super::core::{home_replica_filters, ExportedUser};
2use crate::{
3 config::OkuFsConfig,
4 database::{
5 core::DATABASE,
6 posts::core::{OkuNote, OkuPost},
7 users::{OkuIdentity, OkuUser},
8 },
9 fs::OkuFs,
10};
11use futures::StreamExt;
12use iroh_blobs::Hash;
13use iroh_docs::sync::CapabilityKind;
14use iroh_docs::AuthorId;
15use iroh_docs::DocTicket;
16use iroh_docs::NamespaceId;
17use iroh_docs::{api::protocol::ShareMode, Author, NamespaceSecret};
18use log::debug;
19use miette::IntoDiagnostic;
20use rayon::iter::{
21 FromParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
22};
23use std::{collections::HashSet, path::Path, time::SystemTime};
24
25impl OkuFs {
26 pub async fn default_author(&self) -> AuthorId {
32 self.docs.author_default().await.unwrap_or_default()
33 }
34
35 pub async fn export_user(&self) -> Option<ExportedUser> {
41 let default_author = self.get_author().await.ok();
42 let home_replica = self.home_replica().await;
43 let home_replica_ticket = match home_replica {
44 Some(home_replica_id) => self
45 .create_document_ticket(&home_replica_id, &ShareMode::Write)
46 .await
47 .ok(),
48 None => None,
49 };
50 default_author.map(|author| ExportedUser {
51 author,
52 home_replica,
53 home_replica_ticket,
54 })
55 }
56
57 pub async fn import_user(&self, exported_user: &ExportedUser) -> miette::Result<()> {
63 self.docs
64 .author_import(exported_user.author.clone())
65 .await
66 .map_err(|e| miette::miette!("{}", e))?;
67 self.docs
68 .author_set_default(exported_user.author.id())
69 .await
70 .map_err(|e| miette::miette!("{}", e))?;
71 match (
72 exported_user.home_replica,
73 exported_user.home_replica_ticket.clone(),
74 ) {
75 (Some(home_replica), Some(home_replica_ticket)) => match self
76 .fetch_replica_by_ticket(&home_replica_ticket, &None, &None)
77 .await
78 {
79 Ok(_) => (),
80 Err(_e) => self
81 .fetch_replica_by_id(&home_replica, &None)
82 .await
83 .map_err(|e| miette::miette!("{}", e))?,
84 },
85 (Some(home_replica), None) => self
86 .fetch_replica_by_id(&home_replica, &None)
87 .await
88 .map_err(|e| miette::miette!("{}", e))?,
89 _ => (),
90 }
91 Ok(())
92 }
93
94 pub async fn export_user_toml(&self) -> miette::Result<String> {
100 toml::to_string(
101 &self
102 .export_user()
103 .await
104 .ok_or(miette::miette!("No authorship credentials to export … "))?,
105 )
106 .into_diagnostic()
107 }
108
109 pub async fn import_user_toml(&self, exported_user_toml: &str) -> miette::Result<()> {
115 let exported_user: ExportedUser = toml::from_str(exported_user_toml).into_diagnostic()?;
116 self.import_user(&exported_user).await
117 }
118
119 pub async fn create_home_replica(
129 &self,
130 author: &Option<Author>,
131 ) -> miette::Result<NamespaceId> {
132 let given_author_id = author.as_ref().map(|x| x.id());
133 if let Some(given_author_id) = given_author_id.as_ref() {
134 let is_known_author_id = self
135 .docs
136 .author_list()
137 .await
138 .map_err(|e| miette::miette!(e))?
139 .any(|x| async move { x.map_or(false, |x| x == *given_author_id) })
140 .await;
141 if !is_known_author_id {
142 return Err(miette::miette!("Cannot create home replica for authors whose private key is unknown (author ID: {})", crate::fs::util::fmt_short(given_author_id)));
143 }
144 }
145
146 let default_author = self.get_author().await.map_err(|e| miette::miette!(e))?;
147 let author = author.as_ref().unwrap_or(&default_author);
148 debug!(
149 "Attempting to create home replica for author with ID {} … ",
150 crate::fs::util::fmt_short(author.id())
151 );
152 let home_replica = self
153 .docs
154 .import_namespace(iroh_docs::Capability::Write(NamespaceSecret::from_bytes(
155 &author.to_bytes(),
156 )))
157 .await
158 .map_err(|e| miette::miette!(e))?;
159 self.replica_sender.send_replace(());
160 debug!(
161 "Created home replica for author with ID {} … ",
162 crate::fs::util::fmt_short(author.id())
163 );
164 Ok(home_replica.id())
165 }
166
167 pub async fn home_replica(&self) -> Option<NamespaceId> {
173 let home_replica = NamespaceId::from(self.default_author().await.as_bytes());
174 let home_replica_capability = self.get_replica_capability(&home_replica).await.ok();
175 let home_replica_exists = match home_replica_capability {
176 Some(CapabilityKind::Write) => Some(home_replica),
177 Some(CapabilityKind::Read) => None,
178 None => None,
179 };
180 if let None = home_replica_exists {
181 debug!("Home replica does not exist; creating … ");
182 self.create_home_replica(&None).await.ok()
183 } else {
184 home_replica_exists
185 }
186 }
187
188 pub async fn identity(&self) -> Option<OkuIdentity> {
194 let profile_bytes = self
195 .read_file(&self.home_replica().await?, &"/profile.toml".into())
196 .await
197 .ok()?;
198 toml::from_str(String::from_utf8_lossy(&profile_bytes).as_ref()).ok()
199 }
200
201 pub async fn set_identity(&self, identity: &OkuIdentity) -> miette::Result<Hash> {
211 let mut validated_identity = identity.clone();
213 let me = self.default_author().await;
214 validated_identity.following.retain(|y| me != *y);
215 validated_identity.blocked.retain(|y| me != *y);
216 validated_identity.following = validated_identity
218 .following
219 .difference(&validated_identity.blocked)
220 .copied()
221 .collect();
222
223 self.create_or_modify_file(
224 &self
225 .home_replica()
226 .await
227 .ok_or(miette::miette!("No home replica set … "))?,
228 &"/profile.toml".into(),
229 toml::to_string_pretty(&validated_identity).into_diagnostic()?,
230 )
231 .await
232 }
233
234 pub async fn set_display_name(&self, display_name: &String) -> miette::Result<Hash> {
244 let mut identity = self.identity().await.unwrap_or_default();
245 identity.name = display_name.to_string();
246 self.set_identity(&identity).await
247 }
248
249 pub async fn toggle_follow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
259 let mut identity = self.identity().await.unwrap_or_default();
260 match identity.following.contains(author_id) {
261 true => identity.following.remove(author_id),
262 false => identity.following.insert(*author_id),
263 };
264 self.set_identity(&identity).await
265 }
266
267 pub async fn follow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
277 let mut identity = self.identity().await.unwrap_or_default();
278 match identity.following.contains(author_id) {
279 true => (),
280 false => {
281 identity.following.insert(*author_id);
282 }
283 };
284 self.set_identity(&identity).await
285 }
286
287 pub async fn unfollow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
297 let mut identity = self.identity().await.unwrap_or_default();
298 if identity.following.contains(author_id) {
299 identity.following.remove(author_id);
300 };
301 self.set_identity(&identity).await
302 }
303
304 pub async fn toggle_block(&self, author_id: &AuthorId) -> miette::Result<Hash> {
314 let mut identity = self.identity().await.unwrap_or_default();
315 match identity.blocked.contains(author_id) {
316 true => identity.blocked.remove(author_id),
317 false => identity.blocked.insert(*author_id),
318 };
319 self.set_identity(&identity).await
320 }
321
322 pub async fn block(&self, author_id: &AuthorId) -> miette::Result<Hash> {
332 let mut identity = self.identity().await.unwrap_or_default();
333 match identity.blocked.contains(author_id) {
334 true => (),
335 false => {
336 identity.blocked.insert(*author_id);
337 }
338 };
339 self.set_identity(&identity).await
340 }
341
342 pub async fn unblock(&self, author_id: &AuthorId) -> miette::Result<Hash> {
352 let mut identity = self.identity().await.unwrap_or_default();
353 if identity.blocked.contains(author_id) {
354 identity.blocked.remove(author_id);
355 };
356 self.set_identity(&identity).await
357 }
358
359 pub async fn is_followed(&self, author_id: &AuthorId) -> bool {
369 self.identity()
370 .await
371 .map(|x| x.following.contains(author_id))
372 .unwrap_or(false)
373 }
374
375 pub async fn is_blocked(&self, author_id: &AuthorId) -> bool {
385 self.identity()
386 .await
387 .map(|x| x.blocked.contains(author_id))
388 .unwrap_or(false)
389 }
390
391 pub async fn is_me(&self, author_id: &AuthorId) -> bool {
401 &self.default_author().await == author_id
402 }
403
404 pub async fn user(&self) -> miette::Result<OkuUser> {
410 Ok(OkuUser {
411 author_id: self.default_author().await,
412 last_fetched: SystemTime::now(),
413 posts: self
414 .posts()
415 .await
416 .map(|x| x.into_par_iter().map(|y| y.entry).collect())
417 .unwrap_or_default(),
418 identity: self.identity().await,
419 })
420 }
421
422 pub async fn refresh_users(&self) -> miette::Result<()> {
425 let (followed_users, blocked_users) = match self.identity().await {
428 Some(identity) => (identity.following, identity.blocked),
429 None => (HashSet::new(), HashSet::new()),
430 };
431 let users_to_add: HashSet<_> = followed_users
433 .difference(&blocked_users)
434 .map(|x| x.to_owned())
435 .collect();
436 let local_users: HashSet<_> = DATABASE.all_local_users().into_par_iter().collect();
437 let users_to_delete: HashSet<_> = local_users
438 .difference(&users_to_add)
439 .map(|x| x.to_owned())
440 .collect();
441
442 for user_id in users_to_add {
443 let user = self.get_or_fetch_user(&user_id).await?;
444 let (user_followed_users, user_blocked_users) = match user.identity {
445 Some(identity) => (identity.following, identity.blocked),
446 None => (HashSet::new(), HashSet::new()),
447 };
448 for user_user in user_followed_users.difference(&user_blocked_users) {
449 self.get_or_fetch_user(user_user).await?;
450 }
451 }
452 DATABASE.delete_by_author_ids(&Vec::from_par_iter(users_to_delete))?;
453 Ok(())
454 }
455
456 pub async fn fetch_users(&self) -> miette::Result<()> {
459 let (followed_users, blocked_users) = match self.identity().await {
462 Some(identity) => (identity.following, identity.blocked),
463 None => (HashSet::new(), HashSet::new()),
464 };
465 let users_to_add: HashSet<_> = followed_users
467 .difference(&blocked_users)
468 .map(|x| x.to_owned())
469 .collect();
470 let local_users: HashSet<_> = DATABASE.all_local_users().into_par_iter().collect();
471 let users_to_delete: HashSet<_> = local_users
472 .difference(&users_to_add)
473 .map(|x| x.to_owned())
474 .collect();
475
476 for user_id in users_to_add {
477 let user = self.fetch_user(&user_id).await?;
478 let (user_followed_users, user_blocked_users) = match user.identity {
479 Some(identity) => (identity.following, identity.blocked),
480 None => (HashSet::new(), HashSet::new()),
481 };
482 for user_user in user_followed_users.difference(&user_blocked_users) {
483 self.fetch_user(user_user).await?;
484 }
485 }
486 DATABASE.delete_by_author_ids(&Vec::from_par_iter(users_to_delete))?;
487 Ok(())
488 }
489
490 pub async fn resolve_author_id(&self, author_id: &AuthorId) -> anyhow::Result<DocTicket> {
500 self.okunet_fetch_sender.send_replace(true);
501 let ticket = self
502 .resolve_namespace_id(&NamespaceId::from(author_id.as_bytes()))
503 .await;
504 self.okunet_fetch_sender.send_replace(false);
505 ticket
506 }
507
508 pub async fn fetch_profile(&self, ticket: &DocTicket) -> miette::Result<OkuIdentity> {
518 match self
519 .fetch_file_with_ticket(
520 ticket,
521 &"/profile.toml".into(),
522 &Some(home_replica_filters()),
523 )
524 .await
525 {
526 Ok(profile_bytes) => Ok(toml::from_str(
527 String::from_utf8_lossy(&profile_bytes).as_ref(),
528 )
529 .into_diagnostic()?),
530 Err(e) => Err(miette::miette!("{}", e)),
531 }
532 }
533
534 pub async fn fetch_posts(&self, ticket: &DocTicket) -> miette::Result<Vec<OkuPost>> {
544 match self
545 .fetch_directory_with_ticket(
546 ticket,
547 Path::new("/posts/"),
548 &Some(home_replica_filters()),
549 )
550 .await
551 {
552 Ok(post_files) => Ok(post_files
553 .par_iter()
554 .filter_map(|(entry, bytes)| {
555 toml::from_str::<OkuNote>(String::from_utf8_lossy(bytes).as_ref())
556 .ok()
557 .map(|x| OkuPost {
558 entry: entry.clone(),
559 note: x,
560 })
561 })
562 .collect()),
563 Err(e) => Err(miette::miette!("{}", e)),
564 }
565 }
566
567 pub async fn get_or_fetch_user(&self, author_id: &AuthorId) -> miette::Result<OkuUser> {
579 let config = OkuFsConfig::load_or_create_config().unwrap_or_default();
580 let republish_delay = config.get_republish_delay();
581 match DATABASE.get_user(author_id).ok().flatten() {
582 Some(user) => {
583 match SystemTime::now()
584 .duration_since(user.last_fetched)
585 .into_diagnostic()?
586 > republish_delay
587 {
588 true => self.fetch_user(author_id).await,
589 false => Ok(user),
590 }
591 }
592 None => self.fetch_user(author_id).await,
593 }
594 }
595
596 pub async fn fetch_user(&self, author_id: &AuthorId) -> miette::Result<OkuUser> {
606 self.okunet_fetch_sender.send_replace(true);
607 let ticket = self
608 .resolve_author_id(author_id)
609 .await
610 .map_err(|e| miette::miette!("{}", e))?;
611
612 let profile = self.fetch_profile(&ticket).await.ok();
613 let posts = self.fetch_posts(&ticket).await.unwrap_or_default();
614 DATABASE.upsert_posts(&posts)?;
615 DATABASE.upsert_user(&OkuUser {
616 author_id: *author_id,
617 last_fetched: SystemTime::now(),
618 posts: posts.into_par_iter().map(|y| y.entry).collect(),
619 identity: profile,
620 })?;
621 self.okunet_fetch_sender.send_replace(false);
622 DATABASE
623 .get_user(author_id)?
624 .ok_or(miette::miette!("User {} not found … ", author_id))
625 }
626}