1use super::core::{home_replica_filters, ExportedUser};
2use crate::{
3 config::OkuFsConfig,
4 database::{
5 core::DATABASE,
6 dht::ReplicaAnnouncement,
7 posts::core::{OkuNote, OkuPost},
8 users::{OkuIdentity, OkuUser},
9 },
10 discovery::REPUBLISH_DELAY,
11 fs::{util::merge_tickets, OkuFs},
12};
13use anyhow::anyhow;
14use futures::StreamExt;
15use iroh_base::ticket::Ticket;
16use iroh_blobs::Hash;
17use iroh_docs::rpc::client::docs::ShareMode;
18use iroh_docs::sync::CapabilityKind;
19use iroh_docs::AuthorId;
20use iroh_docs::DocTicket;
21use iroh_docs::NamespaceId;
22use miette::IntoDiagnostic;
23use rayon::iter::{
24 FromParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
25};
26use std::{collections::HashSet, path::Path, time::SystemTime};
27
28impl OkuFs {
29 pub async fn default_author(&self) -> AuthorId {
35 self.docs
36 .client()
37 .authors()
38 .default()
39 .await
40 .unwrap_or_default()
41 }
42
43 pub async fn export_user(&self) -> Option<ExportedUser> {
49 let default_author = self.get_author().await.ok();
50 let home_replica = self.home_replica().await;
51 let home_replica_ticket = match home_replica {
52 Some(home_replica_id) => self
53 .create_document_ticket(&home_replica_id, &ShareMode::Write)
54 .await
55 .ok(),
56 None => None,
57 };
58 default_author.map(|author| ExportedUser {
59 author,
60 home_replica,
61 home_replica_ticket,
62 })
63 }
64
65 pub async fn import_user(&self, exported_user: &ExportedUser) -> miette::Result<()> {
71 self.docs
72 .client()
73 .authors()
74 .import(exported_user.author.clone())
75 .await
76 .map_err(|e| miette::miette!("{}", e))?;
77 self.docs
78 .client()
79 .authors()
80 .set_default(exported_user.author.id())
81 .await
82 .map_err(|e| miette::miette!("{}", e))?;
83 match (
84 exported_user.home_replica,
85 exported_user.home_replica_ticket.clone(),
86 ) {
87 (Some(home_replica), Some(home_replica_ticket)) => match self
88 .fetch_replica_by_ticket(&home_replica_ticket, &None, &None)
89 .await
90 {
91 Ok(_) => (),
92 Err(_e) => self
93 .fetch_replica_by_id(&home_replica, &None)
94 .await
95 .map_err(|e| miette::miette!("{}", e))?,
96 },
97 (Some(home_replica), None) => self
98 .fetch_replica_by_id(&home_replica, &None)
99 .await
100 .map_err(|e| miette::miette!("{}", e))?,
101 _ => (),
102 }
103 self.set_home_replica(&exported_user.home_replica)
104 }
105
106 pub async fn export_user_toml(&self) -> miette::Result<String> {
112 toml::to_string(
113 &self
114 .export_user()
115 .await
116 .ok_or(miette::miette!("No authorship credentials to export … "))?,
117 )
118 .into_diagnostic()
119 }
120
121 pub async fn import_user_toml(&self, exported_user_toml: &str) -> miette::Result<()> {
127 let exported_user: ExportedUser = toml::from_str(exported_user_toml).into_diagnostic()?;
128 self.import_user(&exported_user).await
129 }
130
131 pub async fn home_replica(&self) -> Option<NamespaceId> {
137 let config = OkuFsConfig::load_or_create_config().ok()?;
138 let home_replica = config.home_replica().ok().flatten()?;
139 let home_replica_capability = self.get_replica_capability(&home_replica).await.ok()?;
140 match home_replica_capability {
141 CapabilityKind::Write => Some(home_replica),
142 CapabilityKind::Read => None,
143 }
144 }
145
146 pub fn set_home_replica(&self, home_replica: &Option<NamespaceId>) -> miette::Result<()> {
152 let config = OkuFsConfig::load_or_create_config()?;
153 config.set_home_replica(home_replica)?;
154 config.save()?;
155 self.replica_sender.send_replace(());
156 Ok(())
157 }
158
159 pub async fn identity(&self) -> Option<OkuIdentity> {
165 let profile_bytes = self
166 .read_file(&self.home_replica().await?, &"/profile.toml".into())
167 .await
168 .ok()?;
169 toml::from_str(String::from_utf8_lossy(&profile_bytes).as_ref()).ok()
170 }
171
172 pub async fn set_identity(&self, identity: &OkuIdentity) -> miette::Result<Hash> {
182 let mut validated_identity = identity.clone();
184 let me = self.default_author().await;
185 validated_identity.following.retain(|y| me != *y);
186 validated_identity.blocked.retain(|y| me != *y);
187 validated_identity.following = validated_identity
189 .following
190 .difference(&validated_identity.blocked)
191 .copied()
192 .collect();
193
194 self.create_or_modify_file(
195 &self
196 .home_replica()
197 .await
198 .ok_or(miette::miette!("No home replica set … "))?,
199 &"/profile.toml".into(),
200 toml::to_string_pretty(&validated_identity).into_diagnostic()?,
201 )
202 .await
203 }
204
205 pub async fn set_display_name(&self, display_name: &String) -> miette::Result<Hash> {
215 let mut identity = self.identity().await.unwrap_or_default();
216 identity.name = display_name.to_string();
217 self.set_identity(&identity).await
218 }
219
220 pub async fn toggle_follow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
230 let mut identity = self.identity().await.unwrap_or_default();
231 match identity.following.contains(author_id) {
232 true => identity.following.remove(author_id),
233 false => identity.following.insert(*author_id),
234 };
235 self.set_identity(&identity).await
236 }
237
238 pub async fn follow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
248 let mut identity = self.identity().await.unwrap_or_default();
249 match identity.following.contains(author_id) {
250 true => (),
251 false => {
252 identity.following.insert(*author_id);
253 }
254 };
255 self.set_identity(&identity).await
256 }
257
258 pub async fn unfollow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
268 let mut identity = self.identity().await.unwrap_or_default();
269 if identity.following.contains(author_id) {
270 identity.following.remove(author_id);
271 };
272 self.set_identity(&identity).await
273 }
274
275 pub async fn toggle_block(&self, author_id: &AuthorId) -> miette::Result<Hash> {
285 let mut identity = self.identity().await.unwrap_or_default();
286 match identity.blocked.contains(author_id) {
287 true => identity.blocked.remove(author_id),
288 false => identity.blocked.insert(*author_id),
289 };
290 self.set_identity(&identity).await
291 }
292
293 pub async fn block(&self, author_id: &AuthorId) -> miette::Result<Hash> {
303 let mut identity = self.identity().await.unwrap_or_default();
304 match identity.blocked.contains(author_id) {
305 true => (),
306 false => {
307 identity.blocked.insert(*author_id);
308 }
309 };
310 self.set_identity(&identity).await
311 }
312
313 pub async fn unblock(&self, author_id: &AuthorId) -> miette::Result<Hash> {
323 let mut identity = self.identity().await.unwrap_or_default();
324 if identity.blocked.contains(author_id) {
325 identity.blocked.remove(author_id);
326 };
327 self.set_identity(&identity).await
328 }
329
330 pub async fn is_followed(&self, author_id: &AuthorId) -> bool {
340 self.identity()
341 .await
342 .map(|x| x.following.contains(author_id))
343 .unwrap_or(false)
344 }
345
346 pub async fn is_blocked(&self, author_id: &AuthorId) -> bool {
356 self.identity()
357 .await
358 .map(|x| x.blocked.contains(author_id))
359 .unwrap_or(false)
360 }
361
362 pub async fn is_me(&self, author_id: &AuthorId) -> bool {
372 &self.default_author().await == author_id
373 }
374
375 pub async fn user(&self) -> miette::Result<OkuUser> {
381 Ok(OkuUser {
382 author_id: self.default_author().await,
383 last_fetched: SystemTime::now(),
384 posts: self
385 .posts()
386 .await
387 .map(|x| x.into_par_iter().map(|y| y.entry).collect())
388 .unwrap_or_default(),
389 identity: self.identity().await,
390 })
391 }
392
393 pub async fn refresh_users(&self) -> miette::Result<()> {
396 let (followed_users, blocked_users) = match self.identity().await {
399 Some(identity) => (identity.following, identity.blocked),
400 None => (HashSet::new(), HashSet::new()),
401 };
402 let users_to_add: HashSet<_> = followed_users
404 .difference(&blocked_users)
405 .map(|x| x.to_owned())
406 .collect();
407 let local_users: HashSet<_> = DATABASE.all_local_users().into_par_iter().collect();
408 let users_to_delete: HashSet<_> = local_users
409 .difference(&users_to_add)
410 .map(|x| x.to_owned())
411 .collect();
412
413 for user_id in users_to_add {
414 let user = self.get_or_fetch_user(&user_id).await?;
415 let (user_followed_users, user_blocked_users) = match user.identity {
416 Some(identity) => (identity.following, identity.blocked),
417 None => (HashSet::new(), HashSet::new()),
418 };
419 for user_user in user_followed_users.difference(&user_blocked_users) {
420 self.get_or_fetch_user(user_user).await?;
421 }
422 }
423 DATABASE.delete_by_author_ids(&Vec::from_par_iter(users_to_delete))?;
424 Ok(())
425 }
426
427 pub async fn fetch_users(&self) -> miette::Result<()> {
430 let (followed_users, blocked_users) = match self.identity().await {
433 Some(identity) => (identity.following, identity.blocked),
434 None => (HashSet::new(), HashSet::new()),
435 };
436 let users_to_add: HashSet<_> = followed_users
438 .difference(&blocked_users)
439 .map(|x| x.to_owned())
440 .collect();
441 let local_users: HashSet<_> = DATABASE.all_local_users().into_par_iter().collect();
442 let users_to_delete: HashSet<_> = local_users
443 .difference(&users_to_add)
444 .map(|x| x.to_owned())
445 .collect();
446
447 for user_id in users_to_add {
448 let user = self.fetch_user(&user_id).await?;
449 let (user_followed_users, user_blocked_users) = match user.identity {
450 Some(identity) => (identity.following, identity.blocked),
451 None => (HashSet::new(), HashSet::new()),
452 };
453 for user_user in user_followed_users.difference(&user_blocked_users) {
454 self.fetch_user(user_user).await?;
455 }
456 }
457 DATABASE.delete_by_author_ids(&Vec::from_par_iter(users_to_delete))?;
458 Ok(())
459 }
460
461 pub async fn resolve_author_id(&self, author_id: &AuthorId) -> anyhow::Result<DocTicket> {
471 self.okunet_fetch_sender.send_replace(true);
472 let get_stream = self.dht.get_mutable(author_id.as_bytes(), None, None)?;
473 tokio::pin!(get_stream);
474 let mut tickets = Vec::new();
475 while let Some(mutable_item) = get_stream.next().await {
476 let _ = DATABASE.upsert_announcement(&ReplicaAnnouncement {
477 key: mutable_item.key().to_vec(),
478 signature: mutable_item.signature().to_vec(),
479 });
480 tickets.push(DocTicket::from_bytes(mutable_item.value())?)
481 }
482 self.okunet_fetch_sender.send_replace(false);
483 merge_tickets(&tickets).ok_or(anyhow!(
484 "Could not find tickets for {} … ",
485 crate::fs::util::fmt(author_id)
486 ))
487 }
488
489 pub async fn fetch_profile(&self, ticket: &DocTicket) -> miette::Result<OkuIdentity> {
499 match self
500 .fetch_file_with_ticket(
501 ticket,
502 &"/profile.toml".into(),
503 &Some(home_replica_filters()),
504 )
505 .await
506 {
507 Ok(profile_bytes) => Ok(toml::from_str(
508 String::from_utf8_lossy(&profile_bytes).as_ref(),
509 )
510 .into_diagnostic()?),
511 Err(e) => Err(miette::miette!("{}", e)),
512 }
513 }
514
515 pub async fn fetch_posts(&self, ticket: &DocTicket) -> miette::Result<Vec<OkuPost>> {
525 match self
526 .fetch_directory_with_ticket(
527 ticket,
528 Path::new("/posts/"),
529 &Some(home_replica_filters()),
530 )
531 .await
532 {
533 Ok(post_files) => Ok(post_files
534 .par_iter()
535 .filter_map(|(entry, bytes)| {
536 toml::from_str::<OkuNote>(String::from_utf8_lossy(bytes).as_ref())
537 .ok()
538 .map(|x| OkuPost {
539 entry: entry.clone(),
540 note: x,
541 })
542 })
543 .collect()),
544 Err(e) => Err(miette::miette!("{}", e)),
545 }
546 }
547
548 pub async fn get_or_fetch_user(&self, author_id: &AuthorId) -> miette::Result<OkuUser> {
560 match DATABASE.get_user(author_id).ok().flatten() {
561 Some(user) => {
562 match SystemTime::now()
563 .duration_since(user.last_fetched)
564 .into_diagnostic()?
565 > REPUBLISH_DELAY
566 {
567 true => self.fetch_user(author_id).await,
568 false => Ok(user),
569 }
570 }
571 None => self.fetch_user(author_id).await,
572 }
573 }
574
575 pub async fn fetch_user(&self, author_id: &AuthorId) -> miette::Result<OkuUser> {
585 self.okunet_fetch_sender.send_replace(true);
586 let ticket = self
587 .resolve_author_id(author_id)
588 .await
589 .map_err(|e| miette::miette!("{}", e))?;
590
591 let profile = self.fetch_profile(&ticket).await.ok();
592 let posts = self.fetch_posts(&ticket).await.unwrap_or_default();
593 DATABASE.upsert_posts(&posts)?;
594 DATABASE.upsert_user(&OkuUser {
595 author_id: *author_id,
596 last_fetched: SystemTime::now(),
597 posts: posts.into_par_iter().map(|y| y.entry).collect(),
598 identity: profile,
599 })?;
600 self.okunet_fetch_sender.send_replace(false);
601 DATABASE
602 .get_user(author_id)?
603 .ok_or(miette::miette!("User {} not found … ", author_id))
604 }
605}