Skip to main content

oku_fs/fs/net/
users.rs

1use super::core::{home_replica_filters, ExportedUser};
2use crate::{
3    config::OkuFsConfig,
4    database::{
5        core::DATABASE,
6        posts::core::{OkuNote, OkuPost},
7        users::{OkuIdentity, OkuUser},
8    },
9    fs::OkuFs,
10};
11use futures::StreamExt;
12use iroh_blobs::Hash;
13use iroh_docs::sync::CapabilityKind;
14use iroh_docs::AuthorId;
15use iroh_docs::DocTicket;
16use iroh_docs::NamespaceId;
17use iroh_docs::{api::protocol::ShareMode, Author, NamespaceSecret};
18use log::debug;
19use miette::IntoDiagnostic;
20use rayon::iter::{
21    FromParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
22};
23use std::{collections::HashSet, path::Path, time::SystemTime};
24
25impl OkuFs {
26    /// Retrieve the content authorship ID used by the node.
27    ///
28    /// # Returns
29    ///
30    /// The content authorship ID used by the node.
31    pub async fn default_author(&self) -> AuthorId {
32        self.docs.author_default().await.unwrap_or_default()
33    }
34
35    /// Exports the local Oku user's credentials.
36    ///
37    /// # Returns
38    ///
39    /// The local Oku user's credentials, containing sensitive information.
40    pub async fn export_user(&self) -> Option<ExportedUser> {
41        let default_author = self.get_author().await.ok();
42        let home_replica = self.home_replica().await;
43        let home_replica_ticket = match home_replica {
44            Some(home_replica_id) => self
45                .create_document_ticket(&home_replica_id, &ShareMode::Write)
46                .await
47                .ok(),
48            None => None,
49        };
50        default_author.map(|author| ExportedUser {
51            author,
52            home_replica,
53            home_replica_ticket,
54        })
55    }
56
57    /// Imports Oku user credentials that were exported from another node.
58    ///
59    /// # Arguments
60    ///
61    /// * `exported_user` - Oku user credentials, which contain sensitive information.
62    pub async fn import_user(&self, exported_user: &ExportedUser) -> miette::Result<()> {
63        self.docs
64            .author_import(exported_user.author.clone())
65            .await
66            .map_err(|e| miette::miette!("{}", e))?;
67        self.docs
68            .author_set_default(exported_user.author.id())
69            .await
70            .map_err(|e| miette::miette!("{}", e))?;
71        match (
72            exported_user.home_replica,
73            exported_user.home_replica_ticket.clone(),
74        ) {
75            (Some(home_replica), Some(home_replica_ticket)) => match self
76                .fetch_replica_by_ticket(&home_replica_ticket, &None, &None)
77                .await
78            {
79                Ok(_) => (),
80                Err(_e) => self
81                    .fetch_replica_by_id(&home_replica, &None)
82                    .await
83                    .map_err(|e| miette::miette!("{}", e))?,
84            },
85            (Some(home_replica), None) => self
86                .fetch_replica_by_id(&home_replica, &None)
87                .await
88                .map_err(|e| miette::miette!("{}", e))?,
89            _ => (),
90        }
91        Ok(())
92    }
93
94    /// Exports the local Oku user's credentials in TOML format.
95    ///
96    /// # Returns
97    ///
98    /// The local Oku user's credentials, containing sensitive information.
99    pub async fn export_user_toml(&self) -> miette::Result<String> {
100        toml::to_string(
101            &self
102                .export_user()
103                .await
104                .ok_or(miette::miette!("No authorship credentials to export … "))?,
105        )
106        .into_diagnostic()
107    }
108
109    /// Imports Oku user credentials that were exported from another node.
110    ///
111    /// # Arguments
112    ///
113    /// * `exported_user` - Oku user credentials, encoded in TOML format. They contain sensitive information.
114    pub async fn import_user_toml(&self, exported_user_toml: &str) -> miette::Result<()> {
115        let exported_user: ExportedUser = toml::from_str(exported_user_toml).into_diagnostic()?;
116        self.import_user(&exported_user).await
117    }
118
119    /// Creates the home replica of a known author.
120    ///
121    /// # Arguments
122    ///
123    /// * `author` - An optional author keypair to create the home replica for. If not provided, a home replica for the default author is created. The author keypair must be one already imported.
124    ///
125    /// # Returns
126    ///
127    /// The replica ID of the created home replica.
128    pub async fn create_home_replica(
129        &self,
130        author: &Option<Author>,
131    ) -> miette::Result<NamespaceId> {
132        let given_author_id = author.as_ref().map(|x| x.id());
133        if let Some(given_author_id) = given_author_id.as_ref() {
134            let is_known_author_id = self
135                .docs
136                .author_list()
137                .await
138                .map_err(|e| miette::miette!(e))?
139                .any(|x| async move { x.map_or(false, |x| x == *given_author_id) })
140                .await;
141            if !is_known_author_id {
142                return Err(miette::miette!("Cannot create home replica for authors whose private key is unknown (author ID: {})", crate::fs::util::fmt_short(given_author_id)));
143            }
144        }
145
146        let default_author = self.get_author().await.map_err(|e| miette::miette!(e))?;
147        let author = author.as_ref().unwrap_or(&default_author);
148        debug!(
149            "Attempting to create home replica for author with ID {} … ",
150            crate::fs::util::fmt_short(author.id())
151        );
152        let home_replica = self
153            .docs
154            .import_namespace(iroh_docs::Capability::Write(NamespaceSecret::from_bytes(
155                &author.to_bytes(),
156            )))
157            .await
158            .map_err(|e| miette::miette!(e))?;
159        self.replica_sender.send_replace(());
160        debug!(
161            "Created home replica for author with ID {} … ",
162            crate::fs::util::fmt_short(author.id())
163        );
164        Ok(home_replica.id())
165    }
166
167    /// Retrieve the home replica of the Oku user, creating it if it does not yet exist.
168    ///
169    /// # Returns
170    ///
171    /// The home replica of the Oku user, if it already existed or was able to be created successfully.
172    pub async fn home_replica(&self) -> Option<NamespaceId> {
173        let home_replica = NamespaceId::from(self.default_author().await.as_bytes());
174        let home_replica_capability = self.get_replica_capability(&home_replica).await.ok();
175        let home_replica_exists = match home_replica_capability {
176            Some(CapabilityKind::Write) => Some(home_replica),
177            Some(CapabilityKind::Read) => None,
178            None => None,
179        };
180        if let None = home_replica_exists {
181            debug!("Home replica does not exist; creating … ");
182            self.create_home_replica(&None).await.ok()
183        } else {
184            home_replica_exists
185        }
186    }
187
188    /// Retrieves the OkuNet identity of the local user.
189    ///
190    /// # Returns
191    ///
192    /// The local user's OkuNet identity, if they have one.
193    pub async fn identity(&self) -> Option<OkuIdentity> {
194        let profile_bytes = self
195            .read_file(&self.home_replica().await?, &"/profile.toml".into())
196            .await
197            .ok()?;
198        toml::from_str(String::from_utf8_lossy(&profile_bytes).as_ref()).ok()
199    }
200
201    /// Replaces the current OkuNet identity of the local user.
202    ///
203    /// # Arguments
204    ///
205    /// * `identity` - The new OkuNet identity.
206    ///
207    /// # Returns
208    ///
209    /// The hash of the new identity file in the local user's home replica.
210    pub async fn set_identity(&self, identity: &OkuIdentity) -> miette::Result<Hash> {
211        // It is not valid to follow or unfollow yourself.
212        let mut validated_identity = identity.clone();
213        let me = self.default_author().await;
214        validated_identity.following.retain(|y| me != *y);
215        validated_identity.blocked.retain(|y| me != *y);
216        // It is not valid to follow blocked people.
217        validated_identity.following = validated_identity
218            .following
219            .difference(&validated_identity.blocked)
220            .copied()
221            .collect();
222
223        self.create_or_modify_file(
224            &self
225                .home_replica()
226                .await
227                .ok_or(miette::miette!("No home replica set … "))?,
228            &"/profile.toml".into(),
229            toml::to_string_pretty(&validated_identity).into_diagnostic()?,
230        )
231        .await
232    }
233
234    /// Replaces the current display name of the local user.
235    ///
236    /// # Arguments
237    ///
238    /// * `display_name` - The new display name.
239    ///
240    /// # Returns
241    ///
242    /// # The hash of the new identity file in the local user's home replica.
243    pub async fn set_display_name(&self, display_name: &String) -> miette::Result<Hash> {
244        let mut identity = self.identity().await.unwrap_or_default();
245        identity.name = display_name.to_string();
246        self.set_identity(&identity).await
247    }
248
249    /// Follow or unfollow a user.
250    ///
251    /// # Arguments
252    ///
253    /// * `author_id` - The user to follow or unfollow's content authorship ID.
254    ///
255    /// # Returns
256    ///
257    /// The hash of the new identity file in the local user's home replica.
258    pub async fn toggle_follow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
259        let mut identity = self.identity().await.unwrap_or_default();
260        match identity.following.contains(author_id) {
261            true => identity.following.remove(author_id),
262            false => identity.following.insert(*author_id),
263        };
264        self.set_identity(&identity).await
265    }
266
267    /// Follow a user.
268    ///
269    /// # Arguments
270    ///
271    /// * `author_id` - The user to follow's content authorship ID.
272    ///
273    /// # Returns
274    ///
275    /// The hash of the new identity file in the local user's home replica.
276    pub async fn follow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
277        let mut identity = self.identity().await.unwrap_or_default();
278        match identity.following.contains(author_id) {
279            true => (),
280            false => {
281                identity.following.insert(*author_id);
282            }
283        };
284        self.set_identity(&identity).await
285    }
286
287    /// Unfollow a user.
288    ///
289    /// # Arguments
290    ///
291    /// * `author_id` - The user to unfollow's content authorship ID.
292    ///
293    /// # Returns
294    ///
295    /// The hash of the new identity file in the local user's home replica.
296    pub async fn unfollow(&self, author_id: &AuthorId) -> miette::Result<Hash> {
297        let mut identity = self.identity().await.unwrap_or_default();
298        if identity.following.contains(author_id) {
299            identity.following.remove(author_id);
300        };
301        self.set_identity(&identity).await
302    }
303
304    /// Block or unblock a user.
305    ///
306    /// # Arguments
307    ///
308    /// * `author_id` - The user to block or unblock's content authorship ID.
309    ///
310    /// # Returns
311    ///
312    /// The hash of the new identity file in the local user's home replica.
313    pub async fn toggle_block(&self, author_id: &AuthorId) -> miette::Result<Hash> {
314        let mut identity = self.identity().await.unwrap_or_default();
315        match identity.blocked.contains(author_id) {
316            true => identity.blocked.remove(author_id),
317            false => identity.blocked.insert(*author_id),
318        };
319        self.set_identity(&identity).await
320    }
321
322    /// Block a user.
323    ///
324    /// # Arguments
325    ///
326    /// * `author_id` - The user to block's content authorship ID.
327    ///
328    /// # Returns
329    ///
330    /// The hash of the new identity file in the local user's home replica.
331    pub async fn block(&self, author_id: &AuthorId) -> miette::Result<Hash> {
332        let mut identity = self.identity().await.unwrap_or_default();
333        match identity.blocked.contains(author_id) {
334            true => (),
335            false => {
336                identity.blocked.insert(*author_id);
337            }
338        };
339        self.set_identity(&identity).await
340    }
341
342    /// Unblock a user.
343    ///
344    /// # Arguments
345    ///
346    /// * `author_id` - The user to unblock's content authorship ID.
347    ///
348    /// # Returns
349    ///
350    /// The hash of the new identity file in the local user's home replica.
351    pub async fn unblock(&self, author_id: &AuthorId) -> miette::Result<Hash> {
352        let mut identity = self.identity().await.unwrap_or_default();
353        if identity.blocked.contains(author_id) {
354            identity.blocked.remove(author_id);
355        };
356        self.set_identity(&identity).await
357    }
358
359    /// Check if a user is followed.
360    ///
361    /// # Arguments
362    ///
363    /// * `author_id` - The user's content authorship ID.
364    ///
365    /// # Returns
366    ///
367    /// Whether or not the user is followed.
368    pub async fn is_followed(&self, author_id: &AuthorId) -> bool {
369        self.identity()
370            .await
371            .map(|x| x.following.contains(author_id))
372            .unwrap_or(false)
373    }
374
375    /// Check if a user is blocked.
376    ///
377    /// # Arguments
378    ///
379    /// * `author_id` - The user's content authorship ID.
380    ///
381    /// # Returns
382    ///
383    /// Whether or not the user is blocked.
384    pub async fn is_blocked(&self, author_id: &AuthorId) -> bool {
385        self.identity()
386            .await
387            .map(|x| x.blocked.contains(author_id))
388            .unwrap_or(false)
389    }
390
391    /// Check whether or not an author ID is the local user's.
392    ///
393    /// # Arguments
394    ///
395    /// * `author_id` - A user's content authorship ID.
396    ///
397    /// # Returns
398    ///
399    /// Whether or not the user's authorship ID is the local user's.
400    pub async fn is_me(&self, author_id: &AuthorId) -> bool {
401        &self.default_author().await == author_id
402    }
403
404    /// Retrieves an [`OkuUser`] representing the local user.
405    ///
406    /// # Returns
407    ///
408    /// An [`OkuUser`] representing the current user, as if it were retrieved from another Oku user's database.
409    pub async fn user(&self) -> miette::Result<OkuUser> {
410        Ok(OkuUser {
411            author_id: self.default_author().await,
412            last_fetched: SystemTime::now(),
413            posts: self
414                .posts()
415                .await
416                .map(|x| x.into_par_iter().map(|y| y.entry).collect())
417                .unwrap_or_default(),
418            identity: self.identity().await,
419        })
420    }
421
422    /// Refreshes any user data last retrieved longer than [`crate::config::OkuFsConfig::get_republish_delay`] ago according to the system time; the users one is following, and the users they're following, are recorded locally.
423    /// Blocked users are not recorded.
424    pub async fn refresh_users(&self) -> miette::Result<()> {
425        // Wanted users: followed users
426        // Unwanted users: blocked users, unfollowed users
427        let (followed_users, blocked_users) = match self.identity().await {
428            Some(identity) => (identity.following, identity.blocked),
429            None => (HashSet::new(), HashSet::new()),
430        };
431        // In case a user is somehow followed and blocked (additional checks should already prevent this)
432        let users_to_add: HashSet<_> = followed_users
433            .difference(&blocked_users)
434            .map(|x| x.to_owned())
435            .collect();
436        let local_users: HashSet<_> = DATABASE.all_local_users().into_par_iter().collect();
437        let users_to_delete: HashSet<_> = local_users
438            .difference(&users_to_add)
439            .map(|x| x.to_owned())
440            .collect();
441
442        for user_id in users_to_add {
443            let user = self.get_or_fetch_user(&user_id).await?;
444            let (user_followed_users, user_blocked_users) = match user.identity {
445                Some(identity) => (identity.following, identity.blocked),
446                None => (HashSet::new(), HashSet::new()),
447            };
448            for user_user in user_followed_users.difference(&user_blocked_users) {
449                self.get_or_fetch_user(user_user).await?;
450            }
451        }
452        DATABASE.delete_by_author_ids(&Vec::from_par_iter(users_to_delete))?;
453        Ok(())
454    }
455
456    /// Retrieves user data regardless of when last retrieved; the users one is following, and the users they're following, are recorded locally.
457    /// Blocked users are not recorded.
458    pub async fn fetch_users(&self) -> miette::Result<()> {
459        // Wanted users: followed users
460        // Unwanted users: blocked users, unfollowed users
461        let (followed_users, blocked_users) = match self.identity().await {
462            Some(identity) => (identity.following, identity.blocked),
463            None => (HashSet::new(), HashSet::new()),
464        };
465        // In case a user is somehow followed and blocked (additional checks should already prevent this)
466        let users_to_add: HashSet<_> = followed_users
467            .difference(&blocked_users)
468            .map(|x| x.to_owned())
469            .collect();
470        let local_users: HashSet<_> = DATABASE.all_local_users().into_par_iter().collect();
471        let users_to_delete: HashSet<_> = local_users
472            .difference(&users_to_add)
473            .map(|x| x.to_owned())
474            .collect();
475
476        for user_id in users_to_add {
477            let user = self.fetch_user(&user_id).await?;
478            let (user_followed_users, user_blocked_users) = match user.identity {
479                Some(identity) => (identity.following, identity.blocked),
480                None => (HashSet::new(), HashSet::new()),
481            };
482            for user_user in user_followed_users.difference(&user_blocked_users) {
483                self.fetch_user(user_user).await?;
484            }
485        }
486        DATABASE.delete_by_author_ids(&Vec::from_par_iter(users_to_delete))?;
487        Ok(())
488    }
489
490    /// Use the mainline DHT to obtain a ticket for the home replica of the user with the given content authorship ID.
491    ///
492    /// # Arguments
493    ///
494    /// * `author_id` - A content authorship ID.
495    ///
496    /// # Returns
497    ///
498    /// A ticket for the home replica of the user with the given content authorship ID.
499    pub async fn resolve_author_id(&self, author_id: &AuthorId) -> anyhow::Result<DocTicket> {
500        self.okunet_fetch_sender.send_replace(true);
501        let ticket = self
502            .resolve_namespace_id(&NamespaceId::from(author_id.as_bytes()))
503            .await;
504        self.okunet_fetch_sender.send_replace(false);
505        ticket
506    }
507
508    /// Join a swarm to fetch the latest version of a home replica and obtain the OkuNet identity within it.
509    ///
510    /// # Arguments
511    ///
512    /// * `author_id` - A content authorship ID.
513    ///
514    /// # Returns
515    ///
516    /// The OkuNet identity within the home replica of the user with the given content authorship ID.
517    pub async fn fetch_profile(&self, ticket: &DocTicket) -> miette::Result<OkuIdentity> {
518        match self
519            .fetch_file_with_ticket(
520                ticket,
521                &"/profile.toml".into(),
522                &Some(home_replica_filters()),
523            )
524            .await
525        {
526            Ok(profile_bytes) => Ok(toml::from_str(
527                String::from_utf8_lossy(&profile_bytes).as_ref(),
528            )
529            .into_diagnostic()?),
530            Err(e) => Err(miette::miette!("{}", e)),
531        }
532    }
533
534    /// Join a swarm to fetch the latest version of a home replica and obtain the OkuNet posts within it.
535    ///
536    /// # Arguments
537    ///
538    /// * `author_id` - A content authorship ID.
539    ///
540    /// # Returns
541    ///
542    /// The OkuNet posts within the home replica of the user with the given content authorship ID.
543    pub async fn fetch_posts(&self, ticket: &DocTicket) -> miette::Result<Vec<OkuPost>> {
544        match self
545            .fetch_directory_with_ticket(
546                ticket,
547                Path::new("/posts/"),
548                &Some(home_replica_filters()),
549            )
550            .await
551        {
552            Ok(post_files) => Ok(post_files
553                .par_iter()
554                .filter_map(|(entry, bytes)| {
555                    toml::from_str::<OkuNote>(String::from_utf8_lossy(bytes).as_ref())
556                        .ok()
557                        .map(|x| OkuPost {
558                            entry: entry.clone(),
559                            note: x,
560                        })
561                })
562                .collect()),
563            Err(e) => Err(miette::miette!("{}", e)),
564        }
565    }
566
567    /// Obtain an OkuNet user's content, identified by their content authorship ID.
568    ///
569    /// If last retrieved longer than [`crate::config::OkuFsConfig::get_republish_delay`] ago according to the system time, a known user's content will be re-fetched.
570    ///
571    /// # Arguments
572    ///
573    /// * `author_id` - A content authorship ID.
574    ///
575    /// # Returns
576    ///
577    /// An OkuNet user's content.
578    pub async fn get_or_fetch_user(&self, author_id: &AuthorId) -> miette::Result<OkuUser> {
579        let config = OkuFsConfig::load_or_create_config().unwrap_or_default();
580        let republish_delay = config.get_republish_delay();
581        match DATABASE.get_user(author_id).ok().flatten() {
582            Some(user) => {
583                match SystemTime::now()
584                    .duration_since(user.last_fetched)
585                    .into_diagnostic()?
586                    > republish_delay
587                {
588                    true => self.fetch_user(author_id).await,
589                    false => Ok(user),
590                }
591            }
592            None => self.fetch_user(author_id).await,
593        }
594    }
595
596    /// Fetch the latest version of an OkuNet user's content, identified by their content authorship ID.
597    ///
598    /// # Arguments
599    ///
600    /// * `author_id` - A content authorship ID.
601    ///
602    /// # Returns
603    ///
604    /// The latest version of an OkuNet user's content.
605    pub async fn fetch_user(&self, author_id: &AuthorId) -> miette::Result<OkuUser> {
606        self.okunet_fetch_sender.send_replace(true);
607        let ticket = self
608            .resolve_author_id(author_id)
609            .await
610            .map_err(|e| miette::miette!("{}", e))?;
611
612        let profile = self.fetch_profile(&ticket).await.ok();
613        let posts = self.fetch_posts(&ticket).await.unwrap_or_default();
614        DATABASE.upsert_posts(&posts)?;
615        DATABASE.upsert_user(&OkuUser {
616            author_id: *author_id,
617            last_fetched: SystemTime::now(),
618            posts: posts.into_par_iter().map(|y| y.entry).collect(),
619            identity: profile,
620        })?;
621        self.okunet_fetch_sender.send_replace(false);
622        DATABASE
623            .get_user(author_id)?
624            .ok_or(miette::miette!("User {} not found … ", author_id))
625    }
626}