oku_fs/database/posts/
operations.rs

1use super::super::core::*;
2use super::core::OkuPost;
3use super::core::POST_INDEX;
4use super::core::POST_INDEX_READER;
5use super::core::POST_INDEX_WRITER;
6use super::core::POST_SCHEMA;
7use crate::fs::util::path_to_entry_key;
8use iroh_docs::AuthorId;
9use miette::IntoDiagnostic;
10use native_db::*;
11use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
12use std::{collections::HashSet, path::PathBuf};
13use tantivy::{collector::TopDocs, query::QueryParser, TantivyDocument};
14
15impl OkuDatabase {
16    /// Search OkuNet posts with a query string.
17    ///
18    /// # Arguments
19    ///
20    /// * `query_string` - The string used to query for posts.
21    ///
22    /// * `result_limit` - The maximum number of results to get (defaults to 10).
23    ///
24    /// # Returns
25    ///
26    /// A list of OkuNet posts.
27    pub fn search_posts(
28        query_string: &str,
29        result_limit: &Option<usize>,
30    ) -> miette::Result<Vec<OkuPost>> {
31        let searcher = POST_INDEX_READER.searcher();
32        let query_parser = QueryParser::for_index(
33            &POST_INDEX,
34            vec![
35                POST_SCHEMA.1["author_id"],
36                POST_SCHEMA.1["path"],
37                POST_SCHEMA.1["title"],
38                POST_SCHEMA.1["body"],
39                POST_SCHEMA.1["tag"],
40            ],
41        );
42        let query = query_parser.parse_query(query_string).into_diagnostic()?;
43        let limit = result_limit.unwrap_or(10);
44        let top_docs = searcher
45            .search(&query, &TopDocs::with_limit(limit))
46            .into_diagnostic()?;
47        Ok(top_docs
48            .par_iter()
49            .filter_map(|x| searcher.doc(x.1).ok())
50            .collect::<Vec<TantivyDocument>>()
51            .into_par_iter()
52            .filter_map(|x| TryInto::try_into(x).ok())
53            .collect())
54    }
55
56    /// Insert or update an OkuNet post.
57    ///
58    /// # Arguments
59    ///
60    /// * `post` - An OkuNet post to upsert.
61    ///
62    /// # Returns
63    ///
64    /// The previous version of the post, if one existed.
65    pub fn upsert_post(&self, post: &OkuPost) -> miette::Result<Option<OkuPost>> {
66        let rw: transaction::RwTransaction<'_> =
67            self.database.rw_transaction().into_diagnostic()?;
68        let old_value: Option<OkuPost> = rw.upsert(post.clone()).into_diagnostic()?;
69        rw.commit().into_diagnostic()?;
70
71        let mut index_writer = POST_INDEX_WRITER
72            .clone()
73            .try_lock_owned()
74            .into_diagnostic()?;
75        if let Some(old_post) = old_value.clone() {
76            index_writer.delete_term(old_post.index_term());
77        }
78        index_writer
79            .add_document(post.to_owned().into())
80            .into_diagnostic()?;
81        index_writer.commit().into_diagnostic()?;
82
83        Ok(old_value)
84    }
85
86    /// Insert or update multiple OkuNet posts.
87    ///
88    /// # Arguments
89    ///
90    /// * `posts` - A list of OkuNet posts to upsert.
91    ///
92    /// # Returns
93    ///
94    /// A list containing the previous version of each post, if one existed.
95    pub fn upsert_posts(&self, posts: &Vec<OkuPost>) -> miette::Result<Vec<Option<OkuPost>>> {
96        let rw = self.database.rw_transaction().into_diagnostic()?;
97        let old_posts: Vec<_> = posts
98            .clone()
99            .into_iter()
100            .filter_map(|post| rw.upsert(post).ok())
101            .collect();
102        rw.commit().into_diagnostic()?;
103
104        let mut index_writer = POST_INDEX_WRITER
105            .clone()
106            .try_lock_owned()
107            .into_diagnostic()?;
108        old_posts.par_iter().for_each(|old_post| {
109            if let Some(old_post) = old_post {
110                index_writer.delete_term(old_post.index_term());
111            }
112        });
113        posts.par_iter().for_each(|post| {
114            let _ = index_writer.add_document(post.clone().into());
115        });
116        index_writer.commit().into_diagnostic()?;
117
118        Ok(old_posts)
119    }
120
121    /// Delete an OkuNet post.
122    ///
123    /// # Arguments
124    ///
125    /// * `post` - An OkuNet post to delete.
126    ///
127    /// # Returns
128    ///
129    /// The deleted post.
130    pub fn delete_post(&self, post: &OkuPost) -> miette::Result<OkuPost> {
131        let rw = self.database.rw_transaction().into_diagnostic()?;
132        let removed_post = rw.remove(post.to_owned()).into_diagnostic()?;
133        rw.commit().into_diagnostic()?;
134
135        let mut index_writer = POST_INDEX_WRITER
136            .clone()
137            .try_lock_owned()
138            .into_diagnostic()?;
139        index_writer.delete_term(removed_post.index_term());
140        index_writer.commit().into_diagnostic()?;
141
142        Ok(removed_post)
143    }
144
145    /// Delete multiple OkuNet posts.
146    ///
147    /// # Arguments
148    ///
149    /// * `posts` - A list of OkuNet posts to delete.
150    ///
151    /// # Returns
152    ///
153    /// A list containing the deleted posts.
154    pub fn delete_posts(&self, posts: &[OkuPost]) -> miette::Result<Vec<OkuPost>> {
155        let rw = self.database.rw_transaction().into_diagnostic()?;
156        let removed_posts: Vec<_> = posts
157            .iter()
158            .filter_map(|post| rw.remove(post.to_owned()).ok())
159            .collect();
160        rw.commit().into_diagnostic()?;
161
162        let mut index_writer = POST_INDEX_WRITER
163            .clone()
164            .try_lock_owned()
165            .into_diagnostic()?;
166        removed_posts.par_iter().for_each(|removed_post| {
167            index_writer.delete_term(removed_post.index_term());
168        });
169        index_writer.commit().into_diagnostic()?;
170
171        Ok(removed_posts)
172    }
173
174    /// Retrieves all known OkuNet posts.
175    ///
176    /// # Returns
177    ///
178    /// A list of all known OkuNet posts.
179    pub fn get_posts(&self) -> miette::Result<Vec<OkuPost>> {
180        let r = self.database.r_transaction().into_diagnostic()?;
181        r.scan()
182            .primary()
183            .into_diagnostic()?
184            .all()
185            .into_diagnostic()?
186            .collect::<Result<Vec<_>, _>>()
187            .into_diagnostic()
188    }
189
190    /// Retrieves all known OkuNet posts by a given author.
191    ///
192    /// # Arguments
193    ///
194    /// * `author_id` - A content authorship ID.
195    ///
196    /// # Returns
197    ///
198    /// A list of all known OkuNet posts by the given author.
199    pub fn get_posts_by_author(&self, author_id: &AuthorId) -> miette::Result<Vec<OkuPost>> {
200        Ok(self
201            .get_posts()?
202            .into_par_iter()
203            .filter(|x| x.entry.author() == *author_id)
204            .collect())
205    }
206
207    /// Retrieves all known OkuNet posts by a given tag.
208    ///
209    /// # Arguments
210    ///
211    /// * `tag` - A tag.
212    ///
213    /// # Returns
214    ///
215    /// A list of all known OkuNet posts with the given tag.
216    pub fn get_posts_by_tag(&self, tag: &String) -> miette::Result<Vec<OkuPost>> {
217        Ok(self
218            .get_posts()?
219            .into_par_iter()
220            .filter(|x| x.note.tags.contains(tag))
221            .collect())
222    }
223
224    /// Retrieves all distinct tags used in OkuNet posts.
225    ///
226    /// # Returns
227    ///
228    /// A list of all tags that appear in an OkuNet post.
229    pub fn get_tags(&self) -> miette::Result<HashSet<String>> {
230        Ok(self
231            .get_posts()?
232            .into_iter()
233            .flat_map(|x| x.note.tags)
234            .collect())
235    }
236
237    /// Retrieves an OkuNet post.
238    ///
239    /// # Arguments
240    ///
241    /// * `author_id` - A content authorship ID.
242    ///
243    /// * `path` - A path to a post in the author's home replica.
244    ///
245    /// # Returns
246    ///
247    /// The OkuNet post by the given author at the given path, if one exists.
248    pub fn get_post(
249        &self,
250        author_id: &AuthorId,
251        path: &PathBuf,
252    ) -> miette::Result<Option<OkuPost>> {
253        let r = self.database.r_transaction().into_diagnostic()?;
254        let entry_key = (
255            author_id.as_bytes().to_vec(),
256            path_to_entry_key(path).to_vec(),
257        );
258        r.get().primary(entry_key).into_diagnostic()
259    }
260}