oku_fs/fs/net/
users.rs

1use super::core::{home_replica_filters, ExportedUser};
2use crate::{
3    config::OkuFsConfig,
4    database::{
5        core::DATABASE,
6        dht::ReplicaAnnouncement,
7        posts::core::{OkuNote, OkuPost},
8        users::{OkuIdentity, OkuUser},
9    },
10    discovery::REPUBLISH_DELAY,
11    fs::{util::merge_tickets, OkuFs},
12};
13use anyhow::anyhow;
14use futures::StreamExt;
15use iroh_base::ticket::Ticket;
16use iroh_blobs::Hash;
17use iroh_docs::rpc::client::docs::ShareMode;
18use iroh_docs::sync::CapabilityKind;
19use iroh_docs::AuthorId;
20use iroh_docs::DocTicket;
21use iroh_docs::NamespaceId;
22use miette::IntoDiagnostic;
23use rayon::iter::{
24    FromParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
25};
26use std::{collections::HashSet, path::Path, time::SystemTime};
27
28impl OkuFs {
29    /// Retrieve the content authorship ID used by the node.
30    ///
31    /// # Returns
32    ///
33    /// The content authorship ID used by the node.
34    pub async fn default_author(&self) -> AuthorId {
35        self.docs
36            .client()
37            .authors()
38            .default()
39            .await
40            .unwrap_or_default()
41    }
42
43    /// Exports the local Oku user's credentials.
44    ///
45    /// # Returns
46    ///
47    /// The local Oku user's credentials, containing sensitive information.
48    pub async fn export_user(&self) -> Option<ExportedUser> {
49        let default_author = self.get_author().await.ok();
50        let home_replica = self.home_replica().await;
51        let home_replica_ticket = match home_replica {
52            Some(home_replica_id) => self
53                .create_document_ticket(&home_replica_id, &ShareMode::Write)
54                .await
55                .ok(),
56            None => None,
57        };
58        default_author.map(|author| ExportedUser {
59            author,
60            home_replica,
61            home_replica_ticket,
62        })
63    }
64
65    /// Imports Oku user credentials that were exported from another node.
66    ///
67    /// # Arguments
68    ///
69    /// * `exported_user` - Oku user credentials, which contain sensitive information.
70    pub async fn import_user(&self, exported_user: &ExportedUser) -> miette::Result<()> {
71        self.docs
72            .client()
73            .authors()
74            .import(exported_user.author.clone())
75            .await
76            .map_err(|e| miette::miette!("{}", e))?;
77        self.docs
78            .client()
79            .authors()
80            .set_default(exported_user.author.id())
81            .await
82            .map_err(|e| miette::miette!("{}", e))?;
83        match (
84            exported_user.home_replica,
85            exported_user.home_replica_ticket.clone(),
86        ) {
87            (Some(home_replica), Some(home_replica_ticket)) => match self
88                .fetch_replica_by_ticket(&home_replica_ticket, &None, &None)
89                .await
90            {
91                Ok(_) => (),
92                Err(_e) => self
93                    .fetch_replica_by_id(&home_replica, &None)
94                    .await
95                    .map_err(|e| miette::miette!("{}", e))?,
96            },
97            (Some(home_replica), None) => self
98                .fetch_replica_by_id(&home_replica, &None)
99                .await
100                .map_err(|e| miette::miette!("{}", e))?,
101            _ => (),
102        }
103        self.set_home_replica(&exported_user.home_replica)
104    }
105
106    /// Exports the local Oku user's credentials in TOML format.
107    ///
108    /// # Returns
109    ///
110    /// The local Oku user's credentials, containing sensitive information.
111    pub async fn export_user_toml(&self) -> miette::Result<String> {
112        toml::to_string(
113            &self
114                .export_user()
115                .await
116                .ok_or(miette::miette!("No authorship credentials to export … "))?,
117        )
118        .into_diagnostic()
119    }
120
121    /// Imports Oku user credentials that were exported from another node.
122    ///
123    /// # Arguments
124    ///
125    /// * `exported_user` - Oku user credentials, encoded in TOML format. They contain sensitive information.
126    pub async fn import_user_toml(&self, exported_user_toml: &str) -> miette::Result<()> {
127        let exported_user: ExportedUser = toml::from_str(exported_user_toml).into_diagnostic()?;
128        self.import_user(&exported_user).await
129    }
130
131    /// Retrieve the home replica of the Oku user.
132    ///
133    /// # Returns
134    ///
135    /// The home replica of the Oku user.
136    pub async fn home_replica(&self) -> Option<NamespaceId> {
137        let config = OkuFsConfig::load_or_create_config().ok()?;
138        let home_replica = config.home_replica().ok().flatten()?;
139        let home_replica_capability = self.get_replica_capability(&home_replica).await.ok()?;
140        match home_replica_capability {
141            CapabilityKind::Write => Some(home_replica),
142            CapabilityKind::Read => None,
143        }
144    }
145
146    /// Set the home replica of the Oku user.
147    ///
148    /// # Arguments
149    ///
150    /// * `home_replica` - The ID of the intended new home replica.
151    pub fn set_home_replica(&self, home_replica: &Option<NamespaceId>) -> miette::Result<()> {
152        let config = OkuFsConfig::load_or_create_config()?;
153        config.set_home_replica(home_replica)?;
154        config.save()?;
155        self.replica_sender.send_replace(());
156        Ok(())
157    }
158
159    /// Retrieves the OkuNet identity of the local user.
160    ///
161    /// # Returns
162    ///
163    /// The local user's OkuNet identity, if they have one.
164    pub async fn identity(&self) -> Option<OkuIdentity> {
165        let profile_bytes = self
166            .read_file(&self.home_replica().await?, &"/profile.toml".into())
167            .await
168            .ok()?;
169        toml::from_str(String::from_utf8_lossy(&profile_bytes).as_ref()).ok()
170    }
171
172    /// Replaces the current OkuNet identity of the local user.
173    ///
174    /// # Arguments
175    ///
176    /// * `identity` - The new OkuNet identity.
177    ///
178    /// # Returns
179    ///
180    /// The hash of the new identity file in the local user's home replica.
181    pub async fn set_identity(&self, identity: &OkuIdentity) -> miette::Result<Hash> {
182        // It is not valid to follow or unfollow yourself.
183        let mut validated_identity = identity.clone();
184        let me = self.default_author().await;
185        validated_identity.following.retain(|y| me != *y);
186        validated_identity.blocked.retain(|y| me != *y);
187        // It is not valid to follow blocked people.
188        validated_identity.following = validated_identity
189            .following
190            .difference(&validated_identity.blocked)
191            .copied()
192            .collect();
193
194        self.create_or_modify_file(
195            &self
196                .home_replica()
197                .await
198                .ok_or(miette::miette!("No home replica set … "))?,
199            &"/profile.toml".into(),
200            toml::to_string_pretty(&validated_identity).into_diagnostic()?,
201        )
202        .await
203    }
204
205    /// Replaces the current display name of the local user.
206    ///
207    /// # Arguments
208    ///
209    /// * `display_name` - The new display name.
210    ///
211    /// # Returns
212    ///
213    /// # The hash of the new identity file in the local user's home replica.
214    pub async fn set_display_name(&self, display_name: &String) -> miette::Result<Hash> {
215        let mut identity = self.identity().await.unwrap_or_default();
216        identity.name = display_name.to_string();
217        self.set_identity(&identity).await
218    }
219
220    /// Follow or unfollow a user.
221    ///
222    /// # Arguments
223    ///
224    /// * `author_id` - The user to follow or unfollow's content authorship ID.
225    ///
226    /// # Returns
227    ///
228    /// The hash of the new identity file in the local user's home replica.
229    pub async fn toggle_follow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
230        let mut identity = self.identity().await.unwrap_or_default();
231        match identity.following.contains(author_id) {
232            true => identity.following.remove(author_id),
233            false => identity.following.insert(*author_id),
234        };
235        self.set_identity(&identity).await
236    }
237
238    /// Follow a user.
239    ///
240    /// # Arguments
241    ///
242    /// * `author_id` - The user to follow's content authorship ID.
243    ///
244    /// # Returns
245    ///
246    /// The hash of the new identity file in the local user's home replica.
247    pub async fn follow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
248        let mut identity = self.identity().await.unwrap_or_default();
249        match identity.following.contains(author_id) {
250            true => (),
251            false => {
252                identity.following.insert(*author_id);
253            }
254        };
255        self.set_identity(&identity).await
256    }
257
258    /// Unfollow a user.
259    ///
260    /// # Arguments
261    ///
262    /// * `author_id` - The user to unfollow's content authorship ID.
263    ///
264    /// # Returns
265    ///
266    /// The hash of the new identity file in the local user's home replica.
267    pub async fn unfollow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
268        let mut identity = self.identity().await.unwrap_or_default();
269        if identity.following.contains(author_id) {
270            identity.following.remove(author_id);
271        };
272        self.set_identity(&identity).await
273    }
274
275    /// Block or unblock a user.
276    ///
277    /// # Arguments
278    ///
279    /// * `author_id` - The user to block or unblock's content authorship ID.
280    ///
281    /// # Returns
282    ///
283    /// The hash of the new identity file in the local user's home replica.
284    pub async fn toggle_block(&self, author_id: &AuthorId) -> miette::Result<Hash> {
285        let mut identity = self.identity().await.unwrap_or_default();
286        match identity.blocked.contains(author_id) {
287            true => identity.blocked.remove(author_id),
288            false => identity.blocked.insert(*author_id),
289        };
290        self.set_identity(&identity).await
291    }
292
293    /// Block a user.
294    ///
295    /// # Arguments
296    ///
297    /// * `author_id` - The user to block's content authorship ID.
298    ///
299    /// # Returns
300    ///
301    /// The hash of the new identity file in the local user's home replica.
302    pub async fn block(&self, author_id: &AuthorId) -> miette::Result<Hash> {
303        let mut identity = self.identity().await.unwrap_or_default();
304        match identity.blocked.contains(author_id) {
305            true => (),
306            false => {
307                identity.blocked.insert(*author_id);
308            }
309        };
310        self.set_identity(&identity).await
311    }
312
313    /// Unblock a user.
314    ///
315    /// # Arguments
316    ///
317    /// * `author_id` - The user to unblock's content authorship ID.
318    ///
319    /// # Returns
320    ///
321    /// The hash of the new identity file in the local user's home replica.
322    pub async fn unblock(&self, author_id: &AuthorId) -> miette::Result<Hash> {
323        let mut identity = self.identity().await.unwrap_or_default();
324        if identity.blocked.contains(author_id) {
325            identity.blocked.remove(author_id);
326        };
327        self.set_identity(&identity).await
328    }
329
330    /// Check if a user is followed.
331    ///
332    /// # Arguments
333    ///
334    /// * `author_id` - The user's content authorship ID.
335    ///
336    /// # Returns
337    ///
338    /// Whether or not the user is followed.
339    pub async fn is_followed(&self, author_id: &AuthorId) -> bool {
340        self.identity()
341            .await
342            .map(|x| x.following.contains(author_id))
343            .unwrap_or(false)
344    }
345
346    /// Check if a user is blocked.
347    ///
348    /// # Arguments
349    ///
350    /// * `author_id` - The user's content authorship ID.
351    ///
352    /// # Returns
353    ///
354    /// Whether or not the user is blocked.
355    pub async fn is_blocked(&self, author_id: &AuthorId) -> bool {
356        self.identity()
357            .await
358            .map(|x| x.blocked.contains(author_id))
359            .unwrap_or(false)
360    }
361
362    /// Check whether or not an author ID is the local user's.
363    ///
364    /// # Arguments
365    ///
366    /// * `author_id` - A user's content authorship ID.
367    ///
368    /// # Returns
369    ///
370    /// Whether or not the user's authorship ID is the local user's.
371    pub async fn is_me(&self, author_id: &AuthorId) -> bool {
372        &self.default_author().await == author_id
373    }
374
375    /// Retrieves an [`OkuUser`] representing the local user.
376    ///
377    /// # Returns
378    ///
379    /// An [`OkuUser`] representing the current user, as if it were retrieved from another Oku user's database.
380    pub async fn user(&self) -> miette::Result<OkuUser> {
381        Ok(OkuUser {
382            author_id: self.default_author().await,
383            last_fetched: SystemTime::now(),
384            posts: self
385                .posts()
386                .await
387                .map(|x| x.into_par_iter().map(|y| y.entry).collect())
388                .unwrap_or_default(),
389            identity: self.identity().await,
390        })
391    }
392
393    /// Refreshes any user data last retrieved longer than [`REPUBLISH_DELAY`] ago according to the system time; the users one is following, and the users they're following, are recorded locally.
394    /// Blocked users are not recorded.
395    pub async fn refresh_users(&self) -> miette::Result<()> {
396        // Wanted users: followed users
397        // Unwanted users: blocked users, unfollowed users
398        let (followed_users, blocked_users) = match self.identity().await {
399            Some(identity) => (identity.following, identity.blocked),
400            None => (HashSet::new(), HashSet::new()),
401        };
402        // In case a user is somehow followed and blocked (additional checks should already prevent this)
403        let users_to_add: HashSet<_> = followed_users
404            .difference(&blocked_users)
405            .map(|x| x.to_owned())
406            .collect();
407        let local_users: HashSet<_> = DATABASE.all_local_users().into_par_iter().collect();
408        let users_to_delete: HashSet<_> = local_users
409            .difference(&users_to_add)
410            .map(|x| x.to_owned())
411            .collect();
412
413        for user_id in users_to_add {
414            let user = self.get_or_fetch_user(&user_id).await?;
415            let (user_followed_users, user_blocked_users) = match user.identity {
416                Some(identity) => (identity.following, identity.blocked),
417                None => (HashSet::new(), HashSet::new()),
418            };
419            for user_user in user_followed_users.difference(&user_blocked_users) {
420                self.get_or_fetch_user(user_user).await?;
421            }
422        }
423        DATABASE.delete_by_author_ids(&Vec::from_par_iter(users_to_delete))?;
424        Ok(())
425    }
426
427    /// Retrieves user data regardless of when last retrieved; the users one is following, and the users they're following, are recorded locally.
428    /// Blocked users are not recorded.
429    pub async fn fetch_users(&self) -> miette::Result<()> {
430        // Wanted users: followed users
431        // Unwanted users: blocked users, unfollowed users
432        let (followed_users, blocked_users) = match self.identity().await {
433            Some(identity) => (identity.following, identity.blocked),
434            None => (HashSet::new(), HashSet::new()),
435        };
436        // In case a user is somehow followed and blocked (additional checks should already prevent this)
437        let users_to_add: HashSet<_> = followed_users
438            .difference(&blocked_users)
439            .map(|x| x.to_owned())
440            .collect();
441        let local_users: HashSet<_> = DATABASE.all_local_users().into_par_iter().collect();
442        let users_to_delete: HashSet<_> = local_users
443            .difference(&users_to_add)
444            .map(|x| x.to_owned())
445            .collect();
446
447        for user_id in users_to_add {
448            let user = self.fetch_user(&user_id).await?;
449            let (user_followed_users, user_blocked_users) = match user.identity {
450                Some(identity) => (identity.following, identity.blocked),
451                None => (HashSet::new(), HashSet::new()),
452            };
453            for user_user in user_followed_users.difference(&user_blocked_users) {
454                self.fetch_user(user_user).await?;
455            }
456        }
457        DATABASE.delete_by_author_ids(&Vec::from_par_iter(users_to_delete))?;
458        Ok(())
459    }
460
461    /// Use the mainline DHT to obtain a ticket for the home replica of the user with the given content authorship ID.
462    ///
463    /// # Arguments
464    ///
465    /// * `author_id` - A content authorship ID.
466    ///
467    /// # Returns
468    ///
469    /// A ticket for the home replica of the user with the given content authorship ID.
470    pub async fn resolve_author_id(&self, author_id: &AuthorId) -> anyhow::Result<DocTicket> {
471        self.okunet_fetch_sender.send_replace(true);
472        let get_stream = self.dht.get_mutable(author_id.as_bytes(), None, None)?;
473        tokio::pin!(get_stream);
474        let mut tickets = Vec::new();
475        while let Some(mutable_item) = get_stream.next().await {
476            let _ = DATABASE.upsert_announcement(&ReplicaAnnouncement {
477                key: mutable_item.key().to_vec(),
478                signature: mutable_item.signature().to_vec(),
479            });
480            tickets.push(DocTicket::from_bytes(mutable_item.value())?)
481        }
482        self.okunet_fetch_sender.send_replace(false);
483        merge_tickets(&tickets).ok_or(anyhow!(
484            "Could not find tickets for {} … ",
485            crate::fs::util::fmt(author_id)
486        ))
487    }
488
489    /// Join a swarm to fetch the latest version of a home replica and obtain the OkuNet identity within it.
490    ///
491    /// # Arguments
492    ///
493    /// * `author_id` - A content authorship ID.
494    ///
495    /// # Returns
496    ///
497    /// The OkuNet identity within the home replica of the user with the given content authorship ID.
498    pub async fn fetch_profile(&self, ticket: &DocTicket) -> miette::Result<OkuIdentity> {
499        match self
500            .fetch_file_with_ticket(
501                ticket,
502                &"/profile.toml".into(),
503                &Some(home_replica_filters()),
504            )
505            .await
506        {
507            Ok(profile_bytes) => Ok(toml::from_str(
508                String::from_utf8_lossy(&profile_bytes).as_ref(),
509            )
510            .into_diagnostic()?),
511            Err(e) => Err(miette::miette!("{}", e)),
512        }
513    }
514
515    /// Join a swarm to fetch the latest version of a home replica and obtain the OkuNet posts within it.
516    ///
517    /// # Arguments
518    ///
519    /// * `author_id` - A content authorship ID.
520    ///
521    /// # Returns
522    ///
523    /// The OkuNet posts within the home replica of the user with the given content authorship ID.
524    pub async fn fetch_posts(&self, ticket: &DocTicket) -> miette::Result<Vec<OkuPost>> {
525        match self
526            .fetch_directory_with_ticket(
527                ticket,
528                Path::new("/posts/"),
529                &Some(home_replica_filters()),
530            )
531            .await
532        {
533            Ok(post_files) => Ok(post_files
534                .par_iter()
535                .filter_map(|(entry, bytes)| {
536                    toml::from_str::<OkuNote>(String::from_utf8_lossy(bytes).as_ref())
537                        .ok()
538                        .map(|x| OkuPost {
539                            entry: entry.clone(),
540                            note: x,
541                        })
542                })
543                .collect()),
544            Err(e) => Err(miette::miette!("{}", e)),
545        }
546    }
547
548    /// Obtain an OkuNet user's content, identified by their content authorship ID.
549    ///
550    /// If last retrieved longer than [`REPUBLISH_DELAY`] ago according to the system time, a known user's content will be re-fetched.
551    ///
552    /// # Arguments
553    ///
554    /// * `author_id` - A content authorship ID.
555    ///
556    /// # Returns
557    ///
558    /// An OkuNet user's content.
559    pub async fn get_or_fetch_user(&self, author_id: &AuthorId) -> miette::Result<OkuUser> {
560        match DATABASE.get_user(author_id).ok().flatten() {
561            Some(user) => {
562                match SystemTime::now()
563                    .duration_since(user.last_fetched)
564                    .into_diagnostic()?
565                    > REPUBLISH_DELAY
566                {
567                    true => self.fetch_user(author_id).await,
568                    false => Ok(user),
569                }
570            }
571            None => self.fetch_user(author_id).await,
572        }
573    }
574
575    /// Fetch the latest version of an OkuNet user's content, identified by their content authorship ID.
576    ///
577    /// # Arguments
578    ///
579    /// * `author_id` - A content authorship ID.
580    ///
581    /// # Returns
582    ///
583    /// The latest version of an OkuNet user's content.
584    pub async fn fetch_user(&self, author_id: &AuthorId) -> miette::Result<OkuUser> {
585        self.okunet_fetch_sender.send_replace(true);
586        let ticket = self
587            .resolve_author_id(author_id)
588            .await
589            .map_err(|e| miette::miette!("{}", e))?;
590
591        let profile = self.fetch_profile(&ticket).await.ok();
592        let posts = self.fetch_posts(&ticket).await.unwrap_or_default();
593        DATABASE.upsert_posts(&posts)?;
594        DATABASE.upsert_user(&OkuUser {
595            author_id: *author_id,
596            last_fetched: SystemTime::now(),
597            posts: posts.into_par_iter().map(|y| y.entry).collect(),
598            identity: profile,
599        })?;
600        self.okunet_fetch_sender.send_replace(false);
601        DATABASE
602            .get_user(author_id)?
603            .ok_or(miette::miette!("User {} not found … ", author_id))
604    }
605}