oku_fs/fs/net/
posts.rs

1use super::core::home_replica_filters;
2use crate::fs::util::entry_key_to_path;
3use crate::{
4    database::{
5        core::DATABASE,
6        posts::core::{OkuNote, OkuPost},
7        users::OkuUser,
8    },
9    fs::OkuFs,
10};
11use dashmap::DashMap;
12use iroh_blobs::Hash;
13use iroh_docs::rpc::client::docs::Entry;
14use iroh_docs::AuthorId;
15use log::error;
16use miette::IntoDiagnostic;
17use rayon::iter::{
18    FromParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
19};
20use std::{
21    collections::{HashMap, HashSet},
22    path::{Path, PathBuf},
23    sync::atomic::AtomicUsize,
24};
25use url::Url;
26
27impl OkuFs {
28    /// Retrieves the OkuNet posts by the local user, if any.
29    ///
30    /// # Returns
31    ///
32    /// A list of the OkuNet posts by the local user.
33    pub async fn posts(&self) -> Option<Vec<OkuPost>> {
34        let post_files = self
35            .read_directory(&self.home_replica().await?, Path::new("/posts/"))
36            .await
37            .ok()
38            .unwrap_or_default();
39        Some(
40            post_files
41                .par_iter()
42                .filter(|(entry, _)| {
43                    entry_key_to_path(entry.key())
44                        .map(|x| matches!(x.extension(), Some(y) if y == "toml"))
45                        .unwrap_or(false)
46                })
47                .filter_map(|(entry, bytes)| {
48                    toml::from_str::<OkuNote>(String::from_utf8_lossy(bytes).as_ref())
49                        .ok()
50                        .map(|x| OkuPost {
51                            entry: entry.clone(),
52                            note: x,
53                        })
54                })
55                .collect(),
56        )
57    }
58
59    /// Retrieve all posts known to this Oku node.
60    ///
61    /// # Returns
62    ///
63    /// All posts known to this Oku node.
64    pub async fn all_posts(&self) -> HashSet<OkuPost> {
65        let mut posts = HashSet::<_>::from_par_iter(self.posts().await.unwrap_or_default());
66        posts.extend(DATABASE.get_posts().unwrap_or_default());
67        posts
68    }
69
70    /// Filters posts containing at least one of the given tags.
71    ///
72    /// # Arguments
73    ///
74    /// * `posts` - A set of posts.
75    ///
76    /// * `tags` - A set of tags.
77    ///
78    /// # Returns
79    ///
80    /// A list of OkuNet posts with the given tags.
81    pub async fn posts_with_tags(&self, posts: &[OkuPost], tags: &HashSet<String>) -> Vec<OkuPost> {
82        posts
83            .to_owned()
84            .into_par_iter()
85            .filter(|x| !x.note.tags.is_disjoint(tags))
86            .collect()
87    }
88
89    /// Retrieves the set of all tags that appear in the given posts.
90    ///
91    /// # Arguments
92    ///
93    /// * `posts` - A set of posts.
94    ///
95    /// # Returns
96    ///
97    /// All tags that appear across the posts.
98    pub async fn all_tags(&self, posts: &HashSet<OkuPost>) -> HashSet<String> {
99        HashSet::<_>::from_par_iter(posts.into_par_iter().flat_map(|x| x.note.tags.clone()))
100    }
101
102    /// Retrieves a mapping of tags to the number of posts containing them.
103    ///
104    /// # Arguments
105    ///
106    /// * `posts` - A set of posts.
107    ///
108    /// # Returns
109    ///
110    /// All tags that appear across the posts, and how often they appear.
111    pub async fn count_tags(&self, posts: &HashSet<OkuPost>) -> HashMap<String, usize> {
112        let result: DashMap<String, AtomicUsize> = DashMap::new();
113        posts.into_par_iter().for_each(|x| {
114            x.note.tags.par_iter().for_each(|y| match result.get(y) {
115                None => {
116                    result.insert(y.to_owned(), AtomicUsize::new(1));
117                }
118                Some(v) => {
119                    v.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
120                }
121            });
122        });
123        result
124            .into_par_iter()
125            .map(|(k, v)| (k, v.into_inner()))
126            .collect()
127    }
128
129    /// Retrieves an OkuNet post authored by the local user using its path.
130    ///
131    /// # Arguments
132    ///
133    /// * `path` - A path to a post in the user's home replica.
134    ///
135    /// # Returns
136    ///
137    /// The OkuNet post at the given path.
138    pub async fn post(&self, path: &PathBuf) -> miette::Result<OkuPost> {
139        let namespace_id = self
140            .home_replica()
141            .await
142            .ok_or(miette::miette!("Home replica not set … "))?;
143        match self.read_file(&namespace_id, path).await {
144            Ok(bytes) => {
145                let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
146                    .into_diagnostic()?;
147                Ok(OkuPost {
148                    entry: self.get_entry(&namespace_id, path).await?,
149                    note,
150                })
151            }
152            Err(e) => Err(miette::miette!("{}", e)),
153        }
154    }
155
156    /// Attempts to retrieve an OkuNet post from a file entry.
157    ///
158    /// # Arguments
159    ///
160    /// * `entry` - The file entry to parse.
161    ///
162    /// # Returns
163    ///
164    /// An OkuNet post, if the entry represents one.
165    pub async fn post_from_entry(&self, entry: &Entry) -> miette::Result<OkuPost> {
166        let bytes = self
167            .content_bytes(entry)
168            .await
169            .map_err(|e| miette::miette!("{}", e))?;
170        let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
171            .into_diagnostic()?;
172        Ok(OkuPost {
173            entry: entry.clone(),
174            note,
175        })
176    }
177
178    /// Retrieves OkuNet posts from the file entries in an [`OkuUser`].
179    ///
180    /// # Arguments
181    ///
182    /// * `user` - The OkuNet user record containing the file entries.
183    ///
184    /// # Returns
185    ///
186    /// A list of OkuNet posts contained within the user record.
187    pub async fn posts_from_user(&self, user: &OkuUser) -> miette::Result<Vec<OkuPost>> {
188        let mut posts: Vec<_> = Vec::new();
189        for post in user.posts.clone() {
190            posts.push(self.post_from_entry(&post).await?);
191        }
192        Ok(posts)
193    }
194
195    /// Create or modify an OkuNet post in the user's home replica.
196    ///
197    /// # Arguments
198    ///
199    /// * `path` - The path to create, or modify, the post at; a suggested path is generated if none is provided.
200    ///
201    /// * `url` - The URL the post is regarding.
202    ///
203    /// * `title` - The title of the post.
204    ///
205    /// * `body` - The body of the post.
206    ///
207    /// * `tags` - A list of tags associated with the post.
208    ///
209    /// # Returns
210    ///
211    /// A hash of the post's content.
212    pub async fn create_or_modify_post(
213        &self,
214        url: &Url,
215        title: &String,
216        body: &String,
217        tags: &HashSet<String>,
218    ) -> miette::Result<Hash> {
219        let home_replica_id = self
220            .home_replica()
221            .await
222            .ok_or(miette::miette!("No home replica set … "))?;
223        let new_note = OkuNote {
224            url: url.clone(),
225            title: title.to_string(),
226            body: body.to_string(),
227            tags: tags.clone(),
228        };
229        let post_path = &new_note.post_path().into();
230        self.create_or_modify_file(
231            &home_replica_id,
232            post_path,
233            toml::to_string_pretty(&new_note).into_diagnostic()?,
234        )
235        .await
236    }
237
238    /// Delete an OkuNet post in the user's home replica.
239    ///
240    /// # Arguments
241    ///
242    /// * `path` - A path to a post in the user's home replica.
243    ///
244    /// # Returns
245    ///
246    /// The number of entries deleted in the replica, which should be 1 if the file was successfully deleted.
247    pub async fn delete_post(&self, path: &PathBuf) -> miette::Result<usize> {
248        let home_replica_id = self
249            .home_replica()
250            .await
251            .ok_or(miette::miette!("No home replica set … "))?;
252        self.delete_file(&home_replica_id, path).await
253    }
254
255    /// Join a swarm to fetch the latest version of an OkuNet post.
256    ///
257    /// # Arguments
258    ///
259    /// * `author_id` - The authorship ID of the post's author.
260    ///
261    /// * `path` - The path to the post in the author's home replica.
262    ///
263    /// # Returns
264    ///
265    /// The requested OkuNet post.
266    pub async fn fetch_post(
267        &self,
268        author_id: &AuthorId,
269        path: &PathBuf,
270    ) -> miette::Result<OkuPost> {
271        let ticket = self
272            .resolve_author_id(author_id)
273            .await
274            .map_err(|e| miette::miette!("{}", e))?;
275        let namespace_id = ticket.capability.id();
276        match self
277            .fetch_file_with_ticket(&ticket, path, &Some(home_replica_filters()))
278            .await
279        {
280            Ok(bytes) => {
281                let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
282                    .into_diagnostic()?;
283                if let Err(e) = self
284                    .fetch_post_embeddings(&ticket, author_id, note.url.as_ref())
285                    .await
286                {
287                    error!("{e}")
288                }
289                Ok(OkuPost {
290                    entry: self.get_entry(&namespace_id, path).await?,
291                    note,
292                })
293            }
294            Err(e) => Err(miette::miette!("{}", e)),
295        }
296    }
297
298    /// Retrieves an OkuNet post from the database, or from the mainline DHT if not found locally.
299    ///
300    /// # Arguments
301    ///
302    /// * `author_id` - The authorship ID of the post's author.
303    ///
304    /// * `path` - The path to the post in the author's home replica.
305    ///
306    /// # Returns
307    ///
308    /// The requested OkuNet post.
309    pub async fn get_or_fetch_post(
310        &self,
311        author_id: &AuthorId,
312        path: &PathBuf,
313    ) -> miette::Result<OkuPost> {
314        match DATABASE.get_post(author_id, path).ok().flatten() {
315            Some(post) => Ok(post),
316            None => self.fetch_post(author_id, path).await,
317        }
318    }
319}