oku_fs/fs/net/
posts.rs

1use super::core::home_replica_filters;
2use crate::fs::util::entry_key_to_path;
3use crate::{
4    database::{
5        core::DATABASE,
6        posts::core::{OkuNote, OkuPost},
7        users::OkuUser,
8    },
9    fs::OkuFs,
10};
11use dashmap::DashMap;
12use iroh_blobs::Hash;
13use iroh_docs::rpc::client::docs::Entry;
14use iroh_docs::AuthorId;
15use log::error;
16use miette::IntoDiagnostic;
17use rayon::iter::{
18    FromParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
19};
20use std::{
21    collections::{HashMap, HashSet},
22    path::{Path, PathBuf},
23    sync::atomic::AtomicUsize,
24};
25use url::Url;
26
27impl OkuFs {
28    /// Retrieves the OkuNet posts by the local user, if any.
29    ///
30    /// # Returns
31    ///
32    /// A list of the OkuNet posts by the local user.
33    pub async fn posts(&self) -> Option<Vec<OkuPost>> {
34        let post_files = self
35            .read_directory(&self.home_replica().await?, Path::new("/posts/"))
36            .await
37            .ok()
38            .unwrap_or_default();
39        Some(
40            post_files
41                .par_iter()
42                .filter(|(entry, _)| {
43                    entry_key_to_path(entry.key())
44                        .map(|x| matches!(x.extension(), Some(y) if y == "okupost"))
45                        .unwrap_or(false)
46                })
47                .filter_map(|(entry, bytes)| {
48                    toml::from_str::<OkuNote>(String::from_utf8_lossy(bytes).as_ref())
49                        .ok()
50                        .map(|x| OkuPost {
51                            entry: entry.clone(),
52                            note: x,
53                        })
54                })
55                .collect(),
56        )
57    }
58
59    /// Retrieve all posts known to this Oku node.
60    ///
61    /// # Returns
62    ///
63    /// All posts known to this Oku node.
64    pub async fn all_posts(&self) -> HashSet<OkuPost> {
65        let mut posts = HashSet::<_>::from_par_iter(self.posts().await.unwrap_or_default());
66        posts.extend(DATABASE.get_posts().unwrap_or_default());
67        posts
68    }
69
70    /// Filters posts containing at least one of the given tags.
71    ///
72    /// # Arguments
73    ///
74    /// * `posts` - A set of posts.
75    ///
76    /// * `tags` - A set of tags.
77    ///
78    /// # Returns
79    ///
80    /// A list of OkuNet posts with the given tags.
81    pub async fn posts_with_tags(&self, posts: &[OkuPost], tags: &HashSet<String>) -> Vec<OkuPost> {
82        posts
83            .to_owned()
84            .into_par_iter()
85            .filter(|x| !x.note.tags.is_disjoint(tags))
86            .collect()
87    }
88
89    /// Retrieves the set of all tags that appear in the given posts.
90    ///
91    /// # Arguments
92    ///
93    /// * `posts` - A set of posts.
94    ///
95    /// # Returns
96    ///
97    /// All tags that appear across the posts.
98    pub async fn all_tags(&self, posts: &HashSet<OkuPost>) -> HashSet<String> {
99        HashSet::<_>::from_par_iter(posts.into_par_iter().flat_map(|x| x.note.tags.clone()))
100    }
101
102    /// Retrieves a mapping of tags to the number of posts containing them.
103    ///
104    /// # Arguments
105    ///
106    /// * `posts` - A set of posts.
107    ///
108    /// # Returns
109    ///
110    /// All tags that appear across the posts, and how often they appear.
111    pub async fn count_tags(&self, posts: &HashSet<OkuPost>) -> HashMap<String, usize> {
112        let result: DashMap<String, AtomicUsize> = DashMap::new();
113        posts.into_par_iter().for_each(|x| {
114            x.note.tags.par_iter().for_each(|y| match result.get(y) {
115                None => {
116                    result.insert(y.to_owned(), AtomicUsize::new(1));
117                }
118                Some(v) => {
119                    v.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
120                }
121            });
122        });
123        result
124            .into_par_iter()
125            .map(|(k, v)| (k, v.into_inner()))
126            .collect()
127    }
128
129    /// Retrieves an OkuNet post authored by the local user using its path.
130    ///
131    /// # Arguments
132    ///
133    /// * `path` - A path to a post in the user's home replica.
134    ///
135    /// # Returns
136    ///
137    /// The OkuNet post at the given path.
138    pub async fn post(&self, path: &PathBuf) -> miette::Result<OkuPost> {
139        let namespace_id = self
140            .home_replica()
141            .await
142            .ok_or(miette::miette!("Home replica not set … "))?;
143        match self.read_file(&namespace_id, path).await {
144            Ok(bytes) => {
145                let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
146                    .into_diagnostic()?;
147                Ok(OkuPost {
148                    entry: self.get_entry(&namespace_id, path).await?,
149                    note,
150                })
151            }
152            Err(e) => Err(miette::miette!("{}", e)),
153        }
154    }
155
156    /// Attempts to retrieve an OkuNet post from a file entry.
157    ///
158    /// # Arguments
159    ///
160    /// * `entry` - The file entry to parse.
161    ///
162    /// # Returns
163    ///
164    /// An OkuNet post, if the entry represents one.
165    pub async fn post_from_entry(&self, entry: &Entry) -> miette::Result<OkuPost> {
166        let bytes = self
167            .content_bytes(entry)
168            .await
169            .map_err(|e| miette::miette!("{}", e))?;
170        let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
171            .into_diagnostic()?;
172        Ok(OkuPost {
173            entry: entry.clone(),
174            note,
175        })
176    }
177
178    /// Retrieves OkuNet posts from the file entries in an [`OkuUser`].
179    ///
180    /// # Arguments
181    ///
182    /// * `user` - The OkuNet user record containing the file entries.
183    ///
184    /// # Returns
185    ///
186    /// A list of OkuNet posts contained within the user record.
187    pub async fn posts_from_user(&self, user: &OkuUser) -> miette::Result<Vec<OkuPost>> {
188        let mut posts: Vec<_> = Vec::new();
189        for post in user.posts.clone() {
190            posts.push(self.post_from_entry(&post).await?);
191        }
192        Ok(posts)
193    }
194
195    /// Create or modify an OkuNet post in the user's home replica.
196    ///
197    /// # Arguments
198    ///
199    /// * `path` - The path to create, or modify, the post at; a suggested path is generated if none is provided.
200    ///
201    /// * `url` - The URL the post is regarding.
202    ///
203    /// * `title` - The title of the post.
204    ///
205    /// * `body` - The body of the post.
206    ///
207    /// * `tags` - A list of tags associated with the post.
208    ///
209    /// # Returns
210    ///
211    /// A hash of the post's content.
212    pub async fn create_or_modify_post(
213        &self,
214        path: &Option<PathBuf>,
215        url: &Url,
216        title: &String,
217        body: &String,
218        tags: &HashSet<String>,
219    ) -> miette::Result<Hash> {
220        let home_replica_id = self
221            .home_replica()
222            .await
223            .ok_or(miette::miette!("No home replica set … "))?;
224        let new_note = OkuNote {
225            url: url.clone(),
226            title: title.to_string(),
227            body: body.to_string(),
228            tags: tags.clone(),
229        };
230        let post_path = match path {
231            Some(given_path) => given_path,
232            None => &new_note.suggested_post_path().into(),
233        };
234        self.create_or_modify_file(
235            &home_replica_id,
236            post_path,
237            toml::to_string_pretty(&new_note).into_diagnostic()?,
238        )
239        .await
240    }
241
242    /// Delete an OkuNet post in the user's home replica.
243    ///
244    /// # Arguments
245    ///
246    /// * `path` - A path to a post in the user's home replica.
247    ///
248    /// # Returns
249    ///
250    /// The number of entries deleted in the replica, which should be 1 if the file was successfully deleted.
251    pub async fn delete_post(&self, path: &PathBuf) -> miette::Result<usize> {
252        let home_replica_id = self
253            .home_replica()
254            .await
255            .ok_or(miette::miette!("No home replica set … "))?;
256        self.delete_file(&home_replica_id, path).await
257    }
258
259    /// Join a swarm to fetch the latest version of an OkuNet post.
260    ///
261    /// # Arguments
262    ///
263    /// * `author_id` - The authorship ID of the post's author.
264    ///
265    /// * `path` - The path to the post in the author's home replica.
266    ///
267    /// # Returns
268    ///
269    /// The requested OkuNet post.
270    pub async fn fetch_post(
271        &self,
272        author_id: &AuthorId,
273        path: &PathBuf,
274    ) -> miette::Result<OkuPost> {
275        let ticket = self
276            .resolve_author_id(author_id)
277            .await
278            .map_err(|e| miette::miette!("{}", e))?;
279        let namespace_id = ticket.capability.id();
280        match self
281            .fetch_file_with_ticket(&ticket, path, &Some(home_replica_filters()))
282            .await
283        {
284            Ok(bytes) => {
285                let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
286                    .into_diagnostic()?;
287                let mut embedding_path = path.clone();
288                embedding_path.set_extension("okuembed");
289                if let Err(e) = self
290                    .fetch_post_embeddings(&ticket, &embedding_path, note.url.as_ref())
291                    .await
292                {
293                    error!("{e}")
294                }
295                Ok(OkuPost {
296                    entry: self.get_entry(&namespace_id, path).await?,
297                    note,
298                })
299            }
300            Err(e) => Err(miette::miette!("{}", e)),
301        }
302    }
303
304    /// Retrieves an OkuNet post from the database, or from the mainline DHT if not found locally.
305    ///
306    /// # Arguments
307    ///
308    /// * `author_id` - The authorship ID of the post's author.
309    ///
310    /// * `path` - The path to the post in the author's home replica.
311    ///
312    /// # Returns
313    ///
314    /// The requested OkuNet post.
315    pub async fn get_or_fetch_post(
316        &self,
317        author_id: &AuthorId,
318        path: &PathBuf,
319    ) -> miette::Result<OkuPost> {
320        match DATABASE.get_post(author_id, path).ok().flatten() {
321            Some(post) => Ok(post),
322            None => self.fetch_post(author_id, path).await,
323        }
324    }
325}